Просмотр исходного кода

feat(runtime): add message system & error message transmission (#515)

* feat(runtime): support message system

* feat(runtime): error message transmission
Louis Young 6 месяцев назад
Родитель
Сommit
e6fefa9177

+ 10 - 0
apps/demo-free-layout/src/components/testrun/node-status-bar/render/index.tsx

@@ -227,6 +227,16 @@ export const NodeStatusRender: FC<NodeStatusRenderProps> = ({ report }) => {
           padding: '0px 2px 10px 2px',
         }}
       >
+        {isNodeFailed && currentSnapshot?.error && (
+          <div
+            style={{
+              padding: 12,
+              color: 'red',
+            }}
+          >
+            {currentSnapshot.error}
+          </div>
+        )}
         {renderSnapshotNavigation()}
         <NodeStatusGroup title="Inputs" data={currentSnapshot?.inputs} />
         <NodeStatusGroup title="Outputs" data={currentSnapshot?.outputs} />

+ 14 - 6
apps/demo-free-layout/src/components/testrun/testrun-panel/index.tsx

@@ -25,7 +25,7 @@ export const TestRunSidePanel: FC<TestRunSidePanelProps> = ({ visible, onCancel
   const runtimeService = useService(WorkflowRuntimeService);
   const [isRunning, setRunning] = useState(false);
   const [value, setValue] = useState<string>(`{}`);
-  const [error, setError] = useState<string | undefined>();
+  const [errors, setErrors] = useState<string[]>();
   const [result, setResult] = useState<
     | {
         inputs: WorkflowInputs;
@@ -40,12 +40,12 @@ export const TestRunSidePanel: FC<TestRunSidePanelProps> = ({ visible, onCancel
       return;
     }
     setResult(undefined);
-    setError(undefined);
+    setErrors(undefined);
     setRunning(true);
     try {
       await runtimeService.taskRun(value);
     } catch (e: any) {
-      setError(e.message);
+      setErrors([e.message]);
     }
   };
 
@@ -57,9 +57,14 @@ export const TestRunSidePanel: FC<TestRunSidePanelProps> = ({ visible, onCancel
   };
 
   useEffect(() => {
-    const disposer = runtimeService.onTerminated(({ result }) => {
+    const disposer = runtimeService.onTerminated(({ result, errors }) => {
       setRunning(false);
       setResult(result);
+      if (errors) {
+        setErrors(errors.map((e) => `${e.nodeID}: ${e.message}`));
+      } else {
+        setErrors(undefined);
+      }
     });
     return () => disposer.dispose();
   }, []);
@@ -77,8 +82,11 @@ export const TestRunSidePanel: FC<TestRunSidePanelProps> = ({ visible, onCancel
       <div className={styles['code-editor-container']}>
         <CodeEditor languageId="json" value={value} onChange={setValue} />
       </div>
-      <div className={styles.error}>{error}</div>
-
+      {errors?.map((e) => (
+        <div className={styles.error} key={e}>
+          {e}
+        </div>
+      ))}
       <NodeStatusGroup title="Inputs" data={result?.inputs} optional disableCollapse />
       <NodeStatusGroup title="Outputs" data={result?.outputs} optional disableCollapse />
     </div>

+ 9 - 5
apps/demo-free-layout/src/plugins/runtime-plugin/runtime-service/index.ts

@@ -4,6 +4,7 @@
  */
 
 import {
+  IMessage,
   IReport,
   NodeReport,
   WorkflowInputs,
@@ -52,6 +53,7 @@ export class WorkflowRuntimeService {
   private resetEmitter = new Emitter<{}>();
 
   public terminatedEmitter = new Emitter<{
+    errors?: IMessage[];
     result?: {
       inputs: WorkflowInputs;
       outputs: WorkflowOutputs;
@@ -125,24 +127,26 @@ export class WorkflowRuntimeService {
     if (!this.taskID) {
       return;
     }
-    const output = await this.runtimeClient.TaskReport({
+    const report = await this.runtimeClient.TaskReport({
       taskID: this.taskID,
     });
-    if (!output) {
+    if (!report) {
       clearInterval(this.syncTaskReportIntervalID);
       console.error('Sync task report failed');
       return;
     }
-    const { workflowStatus, inputs, outputs } = output;
+    const { workflowStatus, inputs, outputs, messages } = report;
     if (workflowStatus.terminated) {
       clearInterval(this.syncTaskReportIntervalID);
       if (Object.keys(outputs).length > 0) {
         this.terminatedEmitter.fire({ result: { inputs, outputs } });
       } else {
-        this.terminatedEmitter.fire({});
+        this.terminatedEmitter.fire({
+          errors: messages.error,
+        });
       }
     }
-    this.updateReport(output);
+    this.updateReport(report);
   }
 
   private updateReport(report: IReport): void {

+ 2 - 0
packages/runtime/interface/src/runtime/context/index.ts

@@ -6,6 +6,7 @@
 import { IVariableStore } from '@runtime/variable';
 import { IStatusCenter } from '@runtime/status';
 import { ISnapshotCenter } from '@runtime/snapshot';
+import { IMessageCenter } from '@runtime/message';
 import { IIOCenter } from '@runtime/io-center';
 import { IState } from '../state';
 import { IReporter } from '../reporter';
@@ -19,6 +20,7 @@ export interface ContextData {
   ioCenter: IIOCenter;
   snapshotCenter: ISnapshotCenter;
   statusCenter: IStatusCenter;
+  messageCenter: IMessageCenter;
   reporter: IReporter;
 }
 

+ 1 - 0
packages/runtime/interface/src/runtime/index.ts

@@ -17,3 +17,4 @@ export * from './status';
 export * from './task';
 export * from './validation';
 export * from './variable';
+export * from './message';

+ 37 - 0
packages/runtime/interface/src/runtime/message/index.ts

@@ -0,0 +1,37 @@
+/**
+ * Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
+ * SPDX-License-Identifier: MIT
+ */
+
+export enum WorkflowMessageType {
+  Log = 'log',
+  Info = 'info',
+  Debug = 'debug',
+  Error = 'error',
+  Warn = 'warning',
+}
+
+export interface MessageData {
+  message: string;
+  nodeID?: string;
+  timestamp?: number;
+}
+
+export interface IMessage extends MessageData {
+  id: string;
+  type: WorkflowMessageType;
+  timestamp: number;
+}
+
+export type WorkflowMessages = Record<WorkflowMessageType, IMessage[]>;
+
+export interface IMessageCenter {
+  init(): void;
+  dispose(): void;
+  log(data: MessageData): IMessage;
+  info(data: MessageData): IMessage;
+  debug(data: MessageData): IMessage;
+  error(data: MessageData): IMessage;
+  warn(data: MessageData): IMessage;
+  export(): WorkflowMessages;
+}

+ 7 - 1
packages/runtime/interface/src/runtime/reporter/index.ts

@@ -3,25 +3,31 @@
  * SPDX-License-Identifier: MIT
  */
 
+import { IMessageCenter, WorkflowMessages } from '@runtime/message';
 import { StatusData, IStatusCenter } from '../status';
 import { Snapshot, ISnapshotCenter } from '../snapshot';
 import { WorkflowInputs, WorkflowOutputs } from '../base';
+
 export interface NodeReport extends StatusData {
   id: string;
   snapshots: Snapshot[];
 }
 
+export type WorkflowReports = Record<string, NodeReport>;
+
 export interface IReport {
   id: string;
   inputs: WorkflowInputs;
   outputs: WorkflowOutputs;
   workflowStatus: StatusData;
-  reports: Record<string, NodeReport>;
+  reports: WorkflowReports;
+  messages: WorkflowMessages;
 }
 
 export interface IReporter {
   snapshotCenter: ISnapshotCenter;
   statusCenter: IStatusCenter;
+  messageCenter: IMessageCenter;
   init(): void;
   dispose(): void;
   export(): IReport;

+ 2 - 1
packages/runtime/interface/src/runtime/snapshot/snapshot.ts

@@ -11,6 +11,7 @@ export interface SnapshotData {
   outputs: WorkflowOutputs;
   data: any;
   branch?: string;
+  error?: string;
 }
 
 export interface Snapshot extends SnapshotData {
@@ -20,7 +21,7 @@ export interface Snapshot extends SnapshotData {
 export interface ISnapshot {
   id: string;
   data: Partial<SnapshotData>;
-  addData(data: Partial<SnapshotData>): void;
+  update(data: Partial<SnapshotData>): void;
   validate(): boolean;
   export(): Snapshot;
 }

+ 4 - 2
packages/runtime/js-core/src/domain/__tests__/schemas/branch.test.ts

@@ -200,11 +200,13 @@ describe('WorkflowRuntime branch schema', () => {
     });
     expect(context.statusCenter.workflow.status).toBe(WorkflowStatus.Processing);
     const result = await processing;
-    expect(context.statusCenter.workflow.status).toBe(WorkflowStatus.Succeeded);
+    expect(context.statusCenter.workflow.status).toBe(WorkflowStatus.Failed);
     expect(result).toStrictEqual({});
 
     const report = context.reporter.export();
-    expect(report.workflowStatus.status).toBe(WorkflowStatus.Succeeded);
+    expect(report.messages.error.length).toBe(1);
+    expect(report.messages.error[0].nodeID).toBe('condition_0');
+    expect(report.workflowStatus.status).toBe(WorkflowStatus.Failed);
     expect(report.reports.start_0.status).toBe(WorkflowStatus.Succeeded);
     expect(report.reports.condition_0.status).toBe(WorkflowStatus.Failed);
     expect(report.reports.llm_1).toBeUndefined();

+ 16 - 1
packages/runtime/js-core/src/domain/context/index.ts

@@ -14,8 +14,10 @@ import {
   IReporter,
   IIOCenter,
   ContextData,
+  IMessageCenter,
 } from '@flowgram.ai/runtime-interface';
 
+import { WorkflowRuntimeMessageCenter } from '@workflow/message';
 import { uuid } from '@infra/utils';
 import { WorkflowRuntimeVariableStore } from '../variable';
 import { WorkflowRuntimeStatusCenter } from '../status';
@@ -40,6 +42,8 @@ export class WorkflowRuntimeContext implements IContext {
 
   public readonly statusCenter: IStatusCenter;
 
+  public readonly messageCenter: IMessageCenter;
+
   public readonly reporter: IReporter;
 
   private subContexts: IContext[] = [];
@@ -52,6 +56,7 @@ export class WorkflowRuntimeContext implements IContext {
     this.ioCenter = data.ioCenter;
     this.snapshotCenter = data.snapshotCenter;
     this.statusCenter = data.statusCenter;
+    this.messageCenter = data.messageCenter;
     this.reporter = data.reporter;
   }
 
@@ -63,6 +68,7 @@ export class WorkflowRuntimeContext implements IContext {
     this.ioCenter.init(inputs);
     this.snapshotCenter.init();
     this.statusCenter.init();
+    this.messageCenter.init();
     this.reporter.init();
   }
 
@@ -77,6 +83,7 @@ export class WorkflowRuntimeContext implements IContext {
     this.ioCenter.dispose();
     this.snapshotCenter.dispose();
     this.statusCenter.dispose();
+    this.messageCenter.dispose();
     this.reporter.dispose();
   }
 
@@ -89,6 +96,7 @@ export class WorkflowRuntimeContext implements IContext {
       ioCenter: this.ioCenter,
       snapshotCenter: this.snapshotCenter,
       statusCenter: this.statusCenter,
+      messageCenter: this.messageCenter,
       reporter: this.reporter,
       variableStore,
       state,
@@ -107,7 +115,13 @@ export class WorkflowRuntimeContext implements IContext {
     const ioCenter = new WorkflowRuntimeIOCenter();
     const snapshotCenter = new WorkflowRuntimeSnapshotCenter();
     const statusCenter = new WorkflowRuntimeStatusCenter();
-    const reporter = new WorkflowRuntimeReporter(ioCenter, snapshotCenter, statusCenter);
+    const messageCenter = new WorkflowRuntimeMessageCenter();
+    const reporter = new WorkflowRuntimeReporter(
+      ioCenter,
+      snapshotCenter,
+      statusCenter,
+      messageCenter
+    );
     return new WorkflowRuntimeContext({
       document,
       variableStore,
@@ -115,6 +129,7 @@ export class WorkflowRuntimeContext implements IContext {
       ioCenter,
       snapshotCenter,
       statusCenter,
+      messageCenter,
       reporter,
     });
   }

+ 17 - 8
packages/runtime/js-core/src/domain/engine/index.ts

@@ -46,11 +46,14 @@ export class WorkflowRuntimeEngine implements IEngine {
       return;
     }
     context.statusCenter.nodeStatus(node.id).process();
+    const snapshot = context.snapshotCenter.create({
+      nodeID: node.id,
+      data: node.data,
+    });
+    let nextNodes: INode[] = [];
     try {
       const inputs = context.state.getNodeInputs(node);
-      const snapshot = context.snapshotCenter.create({
-        nodeID: node.id,
-        data: node.data,
+      snapshot.update({
         inputs,
       });
       const result = await this.executor.execute({
@@ -63,17 +66,23 @@ export class WorkflowRuntimeEngine implements IEngine {
         return;
       }
       const { outputs, branch } = result;
-      snapshot.addData({ outputs, branch });
+      snapshot.update({ outputs, branch });
       context.state.setNodeOutputs({ node, outputs });
       context.state.addExecutedNode(node);
       context.statusCenter.nodeStatus(node.id).success();
-      const nextNodes = this.getNextNodes({ node, branch, context });
-      await this.executeNext({ node, nextNodes, context });
+      nextNodes = this.getNextNodes({ node, branch, context });
     } catch (e) {
+      const errorMessage = e instanceof Error ? e.message : 'An unknown error occurred';
+      snapshot.update({ error: errorMessage });
+      context.messageCenter.error({
+        nodeID: node.id,
+        message: errorMessage,
+      });
       context.statusCenter.nodeStatus(node.id).fail();
       console.error(e);
-      return;
+      throw e;
     }
+    await this.executeNext({ node, nextNodes, context });
   }
 
   private async process(context: IContext): Promise<WorkflowOutputs> {
@@ -86,7 +95,7 @@ export class WorkflowRuntimeEngine implements IEngine {
       return outputs;
     } catch (e) {
       context.statusCenter.workflow.fail();
-      throw e;
+      return {};
     }
   }
 

+ 6 - 0
packages/runtime/js-core/src/domain/message/index.ts

@@ -0,0 +1,6 @@
+/**
+ * Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
+ * SPDX-License-Identifier: MIT
+ */
+
+export { WorkflowRuntimeMessageCenter } from './message-center';

+ 294 - 0
packages/runtime/js-core/src/domain/message/message-center/index.test.ts

@@ -0,0 +1,294 @@
+/**
+ * Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
+ * SPDX-License-Identifier: MIT
+ */
+
+import { beforeEach, describe, expect, it } from 'vitest';
+import { WorkflowMessageType, MessageData } from '@flowgram.ai/runtime-interface';
+
+import { WorkflowRuntimeMessageCenter } from './index';
+
+describe('WorkflowRuntimeMessageCenter', () => {
+  let messageCenter: WorkflowRuntimeMessageCenter;
+  const mockMessageData: MessageData = {
+    nodeID: 'test-node-1',
+    message: 'Test message',
+    timestamp: Date.now(),
+  };
+
+  beforeEach(() => {
+    messageCenter = new WorkflowRuntimeMessageCenter();
+    messageCenter.init();
+  });
+
+  describe('init', () => {
+    it('should initialize with empty messages object', () => {
+      const messages = messageCenter.export();
+      expect(messages).toEqual({
+        [WorkflowMessageType.Log]: [],
+        [WorkflowMessageType.Info]: [],
+        [WorkflowMessageType.Debug]: [],
+        [WorkflowMessageType.Error]: [],
+        [WorkflowMessageType.Warn]: [],
+      });
+    });
+
+    it('should clear existing messages when called', () => {
+      messageCenter.log(mockMessageData);
+      const messagesAfterLog = messageCenter.export();
+      expect(messagesAfterLog[WorkflowMessageType.Log]).toHaveLength(1);
+
+      messageCenter.init();
+      const messagesAfterInit = messageCenter.export();
+      expect(messagesAfterInit).toEqual({
+        [WorkflowMessageType.Log]: [],
+        [WorkflowMessageType.Info]: [],
+        [WorkflowMessageType.Debug]: [],
+        [WorkflowMessageType.Error]: [],
+        [WorkflowMessageType.Warn]: [],
+      });
+    });
+  });
+
+  describe('dispose', () => {
+    it('should not throw error when called', () => {
+      expect(() => messageCenter.dispose()).not.toThrow();
+    });
+  });
+
+  describe('log', () => {
+    it('should create and store log message', () => {
+      const message = messageCenter.log(mockMessageData);
+
+      expect(message.type).toBe(WorkflowMessageType.Log);
+      expect(message.nodeID).toBe(mockMessageData.nodeID);
+      expect(message.message).toBe(mockMessageData.message);
+      expect(message.timestamp).toBe(mockMessageData.timestamp);
+      expect(message.id).toBeDefined();
+      expect(typeof message.id).toBe('string');
+
+      const messages = messageCenter.export();
+      expect(messages[WorkflowMessageType.Log]).toHaveLength(1);
+      expect(messages[WorkflowMessageType.Log][0]).toBe(message);
+    });
+
+    it('should handle message without nodeID', () => {
+      const dataWithoutNodeID = {
+        message: 'Test message without nodeID',
+        timestamp: Date.now(),
+      };
+
+      const message = messageCenter.log(dataWithoutNodeID);
+      expect(message.nodeID).toBeUndefined();
+      expect(message.message).toBe(dataWithoutNodeID.message);
+    });
+  });
+
+  describe('info', () => {
+    it('should create and store info message', () => {
+      const message = messageCenter.info(mockMessageData);
+
+      expect(message.type).toBe(WorkflowMessageType.Info);
+      expect(message.nodeID).toBe(mockMessageData.nodeID);
+      expect(message.message).toBe(mockMessageData.message);
+      expect(message.timestamp).toBe(mockMessageData.timestamp);
+      expect(message.id).toBeDefined();
+
+      const messages = messageCenter.export();
+      expect(messages[WorkflowMessageType.Info]).toHaveLength(1);
+      expect(messages[WorkflowMessageType.Info][0]).toBe(message);
+    });
+  });
+
+  describe('debug', () => {
+    it('should create and store debug message', () => {
+      const message = messageCenter.debug(mockMessageData);
+
+      expect(message.type).toBe(WorkflowMessageType.Debug);
+      expect(message.nodeID).toBe(mockMessageData.nodeID);
+      expect(message.message).toBe(mockMessageData.message);
+      expect(message.timestamp).toBe(mockMessageData.timestamp);
+      expect(message.id).toBeDefined();
+
+      const messages = messageCenter.export();
+      expect(messages[WorkflowMessageType.Debug]).toHaveLength(1);
+      expect(messages[WorkflowMessageType.Debug][0]).toBe(message);
+    });
+  });
+
+  describe('error', () => {
+    it('should create and store error message', () => {
+      const message = messageCenter.error(mockMessageData);
+
+      expect(message.type).toBe(WorkflowMessageType.Error);
+      expect(message.nodeID).toBe(mockMessageData.nodeID);
+      expect(message.message).toBe(mockMessageData.message);
+      expect(message.timestamp).toBe(mockMessageData.timestamp);
+      expect(message.id).toBeDefined();
+
+      const messages = messageCenter.export();
+      expect(messages[WorkflowMessageType.Error]).toHaveLength(1);
+      expect(messages[WorkflowMessageType.Error][0]).toBe(message);
+    });
+  });
+
+  describe('warn', () => {
+    it('should create and store warning message', () => {
+      const message = messageCenter.warn(mockMessageData);
+
+      expect(message.type).toBe(WorkflowMessageType.Warn);
+      expect(message.nodeID).toBe(mockMessageData.nodeID);
+      expect(message.message).toBe(mockMessageData.message);
+      expect(message.timestamp).toBe(mockMessageData.timestamp);
+      expect(message.id).toBeDefined();
+
+      const messages = messageCenter.export();
+      expect(messages[WorkflowMessageType.Warn]).toHaveLength(1);
+      expect(messages[WorkflowMessageType.Warn][0]).toBe(message);
+    });
+  });
+
+  describe('export', () => {
+    beforeEach(() => {
+      // Add different types of messages
+      messageCenter.log({ message: 'Log message', timestamp: 1 });
+      messageCenter.info({ message: 'Info message', timestamp: 2 });
+      messageCenter.debug({ message: 'Debug message', timestamp: 3 });
+      messageCenter.error({ message: 'Error message', timestamp: 4 });
+      messageCenter.warn({ message: 'Warning message', timestamp: 5 });
+    });
+
+    it('should return all messages grouped by type', () => {
+      const messages = messageCenter.export();
+
+      expect(messages[WorkflowMessageType.Log]).toHaveLength(1);
+      expect(messages[WorkflowMessageType.Info]).toHaveLength(1);
+      expect(messages[WorkflowMessageType.Debug]).toHaveLength(1);
+      expect(messages[WorkflowMessageType.Error]).toHaveLength(1);
+      expect(messages[WorkflowMessageType.Warn]).toHaveLength(1);
+
+      // Verify that a copy is returned, not the original array
+      messages[WorkflowMessageType.Log].pop();
+      const newMessages = messageCenter.export();
+      expect(newMessages[WorkflowMessageType.Log]).toHaveLength(1);
+    });
+
+    it('should return correct log messages', () => {
+      const messages = messageCenter.export();
+      const logMessages = messages[WorkflowMessageType.Log];
+      expect(logMessages).toHaveLength(1);
+      expect(logMessages[0].type).toBe(WorkflowMessageType.Log);
+      expect(logMessages[0].message).toBe('Log message');
+    });
+
+    it('should return correct info messages', () => {
+      const messages = messageCenter.export();
+      const infoMessages = messages[WorkflowMessageType.Info];
+      expect(infoMessages).toHaveLength(1);
+      expect(infoMessages[0].type).toBe(WorkflowMessageType.Info);
+      expect(infoMessages[0].message).toBe('Info message');
+    });
+
+    it('should return correct debug messages', () => {
+      const messages = messageCenter.export();
+      const debugMessages = messages[WorkflowMessageType.Debug];
+      expect(debugMessages).toHaveLength(1);
+      expect(debugMessages[0].type).toBe(WorkflowMessageType.Debug);
+      expect(debugMessages[0].message).toBe('Debug message');
+    });
+
+    it('should return correct error messages', () => {
+      const messages = messageCenter.export();
+      const errorMessages = messages[WorkflowMessageType.Error];
+      expect(errorMessages).toHaveLength(1);
+      expect(errorMessages[0].type).toBe(WorkflowMessageType.Error);
+      expect(errorMessages[0].message).toBe('Error message');
+    });
+
+    it('should return correct warning messages', () => {
+      const messages = messageCenter.export();
+      const warnMessages = messages[WorkflowMessageType.Warn];
+      expect(warnMessages).toHaveLength(1);
+      expect(warnMessages[0].type).toBe(WorkflowMessageType.Warn);
+      expect(warnMessages[0].message).toBe('Warning message');
+    });
+
+    it('should return empty arrays when no messages exist', () => {
+      messageCenter.init(); // Clear all messages
+
+      const messages = messageCenter.export();
+      expect(messages[WorkflowMessageType.Log]).toEqual([]);
+      expect(messages[WorkflowMessageType.Info]).toEqual([]);
+      expect(messages[WorkflowMessageType.Debug]).toEqual([]);
+      expect(messages[WorkflowMessageType.Error]).toEqual([]);
+      expect(messages[WorkflowMessageType.Warn]).toEqual([]);
+    });
+
+    it('should maintain message order within each type', () => {
+      // Add multiple messages of the same type
+      messageCenter.log({ message: 'Log message 2', timestamp: 6 });
+      messageCenter.log({ message: 'Log message 3', timestamp: 7 });
+
+      const messages = messageCenter.export();
+      const logMessages = messages[WorkflowMessageType.Log];
+
+      expect(logMessages).toHaveLength(3);
+      expect(logMessages[0].message).toBe('Log message');
+      expect(logMessages[1].message).toBe('Log message 2');
+      expect(logMessages[2].message).toBe('Log message 3');
+    });
+  });
+
+  describe('message uniqueness', () => {
+    it('should generate unique IDs for each message', () => {
+      const message1 = messageCenter.log(mockMessageData);
+      const message2 = messageCenter.log(mockMessageData);
+      const message3 = messageCenter.info(mockMessageData);
+
+      expect(message1.id).not.toBe(message2.id);
+      expect(message1.id).not.toBe(message3.id);
+      expect(message2.id).not.toBe(message3.id);
+    });
+  });
+
+  describe('integration tests', () => {
+    it('should handle multiple operations correctly', () => {
+      // Add various types of messages
+      const logMsg = messageCenter.log({ message: 'Log 1', timestamp: 1 });
+      const infoMsg = messageCenter.info({ message: 'Info 1', timestamp: 2 });
+      const errorMsg = messageCenter.error({ message: 'Error 1', timestamp: 3 });
+
+      expect(logMsg.type).toBe(WorkflowMessageType.Log);
+      expect(infoMsg.type).toBe(WorkflowMessageType.Info);
+      expect(errorMsg.type).toBe(WorkflowMessageType.Error);
+
+      // Verify messages are grouped by type
+      const messages = messageCenter.export();
+      expect(messages[WorkflowMessageType.Log]).toHaveLength(1);
+      expect(messages[WorkflowMessageType.Info]).toHaveLength(1);
+      expect(messages[WorkflowMessageType.Error]).toHaveLength(1);
+      expect(messages[WorkflowMessageType.Debug]).toHaveLength(0);
+      expect(messages[WorkflowMessageType.Warn]).toHaveLength(0);
+
+      // Verify message content
+      expect(messages[WorkflowMessageType.Log][0]).toBe(logMsg);
+      expect(messages[WorkflowMessageType.Info][0]).toBe(infoMsg);
+      expect(messages[WorkflowMessageType.Error][0]).toBe(errorMsg);
+
+      // Reinitialize
+      messageCenter.init();
+      const emptyMessages = messageCenter.export();
+      expect(emptyMessages[WorkflowMessageType.Log]).toHaveLength(0);
+      expect(emptyMessages[WorkflowMessageType.Info]).toHaveLength(0);
+      expect(emptyMessages[WorkflowMessageType.Error]).toHaveLength(0);
+      expect(emptyMessages[WorkflowMessageType.Debug]).toHaveLength(0);
+      expect(emptyMessages[WorkflowMessageType.Warn]).toHaveLength(0);
+
+      // Add new message
+      const newMsg = messageCenter.debug({ message: 'Debug after init', timestamp: 4 });
+      const newMessages = messageCenter.export();
+      expect(newMessages[WorkflowMessageType.Debug]).toHaveLength(1);
+      expect(newMessages[WorkflowMessageType.Debug][0]).toBe(newMsg);
+    });
+  });
+});

+ 85 - 0
packages/runtime/js-core/src/domain/message/message-center/index.ts

@@ -0,0 +1,85 @@
+/**
+ * Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
+ * SPDX-License-Identifier: MIT
+ */
+
+import {
+  IMessage,
+  IMessageCenter,
+  MessageData,
+  WorkflowMessages,
+  WorkflowMessageType,
+} from '@flowgram.ai/runtime-interface';
+
+import { WorkflowRuntimeMessage } from '../message-value-object';
+
+export class WorkflowRuntimeMessageCenter implements IMessageCenter {
+  private messages: WorkflowMessages;
+
+  public init(): void {
+    this.messages = {
+      [WorkflowMessageType.Log]: [],
+      [WorkflowMessageType.Info]: [],
+      [WorkflowMessageType.Debug]: [],
+      [WorkflowMessageType.Error]: [],
+      [WorkflowMessageType.Warn]: [],
+    };
+  }
+
+  public dispose(): void {}
+
+  public log(data: MessageData): IMessage {
+    const message = WorkflowRuntimeMessage.create({
+      type: WorkflowMessageType.Log,
+      ...data,
+    });
+    this.messages[WorkflowMessageType.Log].push(message);
+    return message;
+  }
+
+  public info(data: MessageData): IMessage {
+    const message = WorkflowRuntimeMessage.create({
+      type: WorkflowMessageType.Info,
+      ...data,
+    });
+    this.messages[WorkflowMessageType.Info].push(message);
+    return message;
+  }
+
+  public debug(data: MessageData): IMessage {
+    const message = WorkflowRuntimeMessage.create({
+      type: WorkflowMessageType.Debug,
+      ...data,
+    });
+    this.messages[WorkflowMessageType.Debug].push(message);
+    return message;
+  }
+
+  public error(data: MessageData): IMessage {
+    const message = WorkflowRuntimeMessage.create({
+      type: WorkflowMessageType.Error,
+      ...data,
+    });
+    this.messages[WorkflowMessageType.Error].push(message);
+    return message;
+  }
+
+  public warn(data: MessageData): IMessage {
+    const message = WorkflowRuntimeMessage.create({
+      type: WorkflowMessageType.Warn,
+      ...data,
+    });
+    this.messages[WorkflowMessageType.Warn].push(message);
+    return message;
+  }
+
+  public export(): WorkflowMessages {
+    return {
+      [WorkflowMessageType.Log]: this.messages[WorkflowMessageType.Log].slice(),
+      [WorkflowMessageType.Info]: this.messages[WorkflowMessageType.Info].slice(),
+      [WorkflowMessageType.Debug]: this.messages[WorkflowMessageType.Debug].slice(),
+      [WorkflowMessageType.Error]: this.messages[WorkflowMessageType.Error].slice(),
+      [WorkflowMessageType.Warn]: this.messages[WorkflowMessageType.Warn].slice(),
+    };
+  }
+}

+ 25 - 0
packages/runtime/js-core/src/domain/message/message-value-object/index.ts

@@ -0,0 +1,25 @@
+/**
+ * Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
+ * SPDX-License-Identifier: MIT
+ */
+
+import { IMessage, MessageData, WorkflowMessageType } from '@flowgram.ai/runtime-interface';
+
+import { uuid } from '@infra/utils';
+
+export namespace WorkflowRuntimeMessage {
+  export const create = (
+    params: MessageData & {
+      type: WorkflowMessageType;
+    }
+  ): IMessage => {
+    const message = {
+      id: uuid(),
+      ...params,
+    };
+    if (!params.timestamp) {
+      message.timestamp = Date.now();
+    }
+    return message as IMessage;
+  };
+}

+ 7 - 3
packages/runtime/js-core/src/domain/report/reporter/index.ts

@@ -10,6 +10,8 @@ import {
   IIOCenter,
   IReport,
   NodeReport,
+  WorkflowReports,
+  IMessageCenter,
 } from '@flowgram.ai/runtime-interface';
 
 import { WorkflowRuntimeReport } from '../report-value-object';
@@ -18,7 +20,8 @@ export class WorkflowRuntimeReporter implements IReporter {
   constructor(
     public readonly ioCenter: IIOCenter,
     public readonly snapshotCenter: ISnapshotCenter,
-    public readonly statusCenter: IStatusCenter
+    public readonly statusCenter: IStatusCenter,
+    public readonly messageCenter: IMessageCenter
   ) {}
 
   public init(): void {}
@@ -31,12 +34,13 @@ export class WorkflowRuntimeReporter implements IReporter {
       outputs: this.ioCenter.outputs,
       workflowStatus: this.statusCenter.workflow.export(),
       reports: this.nodeReports(),
+      messages: this.messageCenter.export(),
     });
     return report;
   }
 
-  private nodeReports(): Record<string, NodeReport> {
-    const reports: Record<string, NodeReport> = {};
+  private nodeReports(): WorkflowReports {
+    const reports: WorkflowReports = {};
     const statuses = this.statusCenter.exportNodeStatus();
     const snapshots = this.snapshotCenter.export();
     Object.keys(statuses).forEach((nodeID) => {

+ 1 - 1
packages/runtime/js-core/src/domain/snapshot/snapshot-entity/index.ts

@@ -17,7 +17,7 @@ export class WorkflowRuntimeSnapshot implements ISnapshot {
     this.data = data;
   }
 
-  public addData(data: Partial<SnapshotData>): void {
+  public update(data: Partial<SnapshotData>): void {
     Object.assign(this.data, data);
   }