2
0
Эх сурвалжийг харах

feat(runtime): http node (#546)

Louis Young 5 сар өмнө
parent
commit
8bbdc53a29

+ 15 - 0
apps/demo-free-layout/src/components/testrun/node-status-bar/viewer/index.tsx

@@ -74,6 +74,21 @@ const TreeNode: React.FC<TreeNodeProps> = ({ label, value, level, isLast = false
             {val.toString()}
           </span>
         );
+      case 'object':
+        // Handle empty objects and arrays
+        if (Array.isArray(val)) {
+          return (
+            <span className={styles.primitiveValue} onDoubleClick={() => handleCopy('[]')}>
+              []
+            </span>
+          );
+        } else {
+          return (
+            <span className={styles.primitiveValue} onDoubleClick={() => handleCopy('{}')}>
+              {'{}'}
+            </span>
+          );
+        }
       default:
         return (
           <span className={styles.primitiveValue} onDoubleClick={() => handleCopy(String(val))}>

+ 1 - 0
packages/runtime/interface/src/node/constant.ts

@@ -15,4 +15,5 @@ export enum FlowGramNode {
   Group = 'group',
   BlockStart = 'block-start',
   BlockEnd = 'block-end',
+  HTTP = 'http',
 }

+ 22 - 0
packages/runtime/interface/src/node/http/constant.ts

@@ -0,0 +1,22 @@
+/**
+ * Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
+ * SPDX-License-Identifier: MIT
+ */
+
+export enum HTTPMethod {
+  Get = 'GET',
+  Post = 'POST',
+  Put = 'PUT',
+  Delete = 'DELETE',
+  Patch = 'PATCH',
+  Head = 'HEAD',
+}
+
+export enum HTTPBodyType {
+  None = 'none',
+  FormData = 'form-data',
+  XWwwFormUrlencoded = 'x-www-form-urlencoded',
+  RawText = 'raw-text',
+  JSON = 'JSON',
+  Binary = 'binary',
+}

+ 39 - 0
packages/runtime/interface/src/node/http/index.ts

@@ -0,0 +1,39 @@
+/**
+ * Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
+ * SPDX-License-Identifier: MIT
+ */
+
+import { IFlowConstantRefValue, IFlowTemplateValue } from '@schema/value';
+import { WorkflowNodeSchema } from '@schema/node';
+import { IJsonSchema } from '@schema/json-schema';
+import { FlowGramNode } from '@node/constant';
+import { HTTPBodyType, HTTPMethod } from './constant';
+
+interface HTTPNodeData {
+  title: string;
+  outputs: IJsonSchema<'object'>;
+  api: {
+    method: HTTPMethod;
+    url: IFlowTemplateValue;
+  };
+  headers: IJsonSchema<'object'>;
+  headersValues: Record<string, IFlowConstantRefValue>;
+  params: IJsonSchema<'object'>;
+  paramsValues: Record<string, IFlowConstantRefValue>;
+  body: {
+    bodyType: HTTPBodyType;
+    json?: IFlowTemplateValue;
+    formData?: IJsonSchema<'object'>;
+    formDataValues?: Record<string, IFlowConstantRefValue>;
+    rawText?: IFlowTemplateValue;
+    binary?: IFlowTemplateValue;
+    xWwwFormUrlencoded?: IJsonSchema<'object'>;
+    xWwwFormUrlencodedValues?: Record<string, IFlowConstantRefValue>;
+  };
+  timeout: {
+    retryTimes: number;
+    timeout: number;
+  };
+}
+export { HTTPMethod, HTTPBodyType };
+export type HTTPNodeSchema = WorkflowNodeSchema<FlowGramNode.HTTP, HTTPNodeData>;

+ 1 - 0
packages/runtime/interface/src/node/index.ts

@@ -9,3 +9,4 @@ export { LLMNodeSchema } from './llm';
 export { StartNodeSchema } from './start';
 export { LoopNodeSchema } from './loop';
 export { ConditionNodeSchema, ConditionOperation, ConditionItem } from './condition';
+export { HTTPNodeSchema, HTTPMethod, HTTPBodyType } from './http';

+ 2 - 0
packages/runtime/interface/src/runtime/executor/node-executor.ts

@@ -4,6 +4,7 @@
  */
 
 import { FlowGramNode } from '@node/index';
+import { ISnapshot } from '../snapshot';
 import { INode } from '../document';
 import { IContext } from '../context';
 import { IContainer } from '../container';
@@ -14,6 +15,7 @@ export interface ExecutionContext {
   inputs: WorkflowInputs;
   container: IContainer;
   runtime: IContext;
+  snapshot: ISnapshot;
 }
 
 export interface ExecutionResult {

+ 8 - 1
packages/runtime/interface/src/runtime/state/index.ts

@@ -3,7 +3,13 @@
  * SPDX-License-Identifier: MIT
  */
 
-import { IFlowValue, IFlowRefValue, WorkflowVariableType, IFlowTemplateValue } from '@schema/index';
+import {
+  IFlowValue,
+  IFlowRefValue,
+  WorkflowVariableType,
+  IFlowTemplateValue,
+  IJsonSchema,
+} from '@schema/index';
 import { IVariableParseResult, IVariableStore } from '../variable';
 import { INode } from '../document';
 import { WorkflowInputs, WorkflowOutputs } from '../base';
@@ -15,6 +21,7 @@ export interface IState {
   dispose(): void;
   getNodeInputs(node: INode): WorkflowInputs;
   setNodeOutputs(params: { node: INode; outputs: WorkflowOutputs }): void;
+  parseInputs(params: { values: Record<string, IFlowValue>; declare: IJsonSchema }): WorkflowInputs;
   parseRef<T = unknown>(ref: IFlowRefValue): IVariableParseResult<T> | null;
   parseTemplate(template: IFlowTemplateValue): IVariableParseResult<string> | null;
   parseValue<T = unknown>(

+ 119 - 0
packages/runtime/js-core/src/domain/__tests__/schemas/http-real.test.ts

@@ -0,0 +1,119 @@
+/**
+ * Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
+ * SPDX-License-Identifier: MIT
+ */
+
+import { describe, expect, it } from 'vitest';
+import { IContainer, IEngine, WorkflowStatus } from '@flowgram.ai/runtime-interface';
+
+import { snapshotsToVOData } from '../utils';
+import { WorkflowRuntimeContainer } from '../../container';
+import { TestSchemas } from '.';
+
+const container: IContainer = WorkflowRuntimeContainer.instance;
+
+describe('WorkflowRuntime http schema', () => {
+  it('should execute a workflow with HTTP request', async () => {
+    if (process.env.ENABLE_REAL_TESTS !== 'true') {
+      return;
+    }
+    const engine = container.get<IEngine>(IEngine);
+    const { context, processing } = engine.invoke({
+      schema: TestSchemas.httpSchema,
+      inputs: {
+        host: 'httpbin.org',
+        path: '/post',
+      },
+    });
+    expect(context.statusCenter.workflow.status).toBe(WorkflowStatus.Processing);
+    const result = await processing;
+    expect(context.statusCenter.workflow.status).toBe(WorkflowStatus.Succeeded);
+
+    // Verify the result structure
+    expect(result).toHaveProperty('res');
+    expect(result).toHaveProperty('code');
+    expect(typeof result.res).toBe('string');
+    expect(typeof result.code).toBe('number');
+    expect(result.code).toBe(200);
+
+    const snapshots = snapshotsToVOData(context.snapshotCenter.exportAll());
+    expect(snapshots).toHaveLength(3);
+
+    // Verify start node snapshot
+    expect(snapshots[0]).toMatchObject({
+      nodeID: 'start_0',
+      inputs: {},
+      outputs: {
+        host: 'httpbin.org',
+        path: '/post',
+      },
+      data: {},
+    });
+
+    // Verify http node snapshot
+    expect(snapshots[1]).toMatchObject({
+      nodeID: 'http_0',
+      inputs: {
+        method: 'POST',
+        url: 'https://httpbin.org/post',
+        body: '{}',
+        headers: {},
+        params: {},
+        timeout: 10000,
+        retryTimes: 1,
+      },
+      data: {},
+    });
+    expect(snapshots[1].outputs).toHaveProperty('body');
+    expect(snapshots[1].outputs).toHaveProperty('headers');
+    expect(snapshots[1].outputs).toHaveProperty('statusCode');
+    expect(snapshots[1].outputs.statusCode).toBe(200);
+
+    // Verify end node snapshot
+    expect(snapshots[2]).toMatchObject({
+      nodeID: 'end_0',
+      data: {},
+    });
+    expect(snapshots[2].inputs).toHaveProperty('res');
+    expect(snapshots[2].inputs).toHaveProperty('code');
+    expect(snapshots[2].outputs).toHaveProperty('res');
+    expect(snapshots[2].outputs).toHaveProperty('code');
+    expect(snapshots[2].outputs.code).toBe(200);
+
+    const report = context.reporter.export();
+    expect(report.workflowStatus.status).toBe(WorkflowStatus.Succeeded);
+    expect(report.reports.start_0.status).toBe(WorkflowStatus.Succeeded);
+    expect(report.reports.http_0.status).toBe(WorkflowStatus.Succeeded);
+    expect(report.reports.end_0.status).toBe(WorkflowStatus.Succeeded);
+  });
+
+  it('should handle HTTP request with different inputs', async () => {
+    if (process.env.ENABLE_REAL_TESTS !== 'true') {
+      return;
+    }
+    const engine = container.get<IEngine>(IEngine);
+    const { context, processing } = engine.invoke({
+      schema: TestSchemas.httpSchema,
+      inputs: {
+        host: 'jsonplaceholder.typicode.com',
+        path: '/posts',
+      },
+    });
+    expect(context.statusCenter.workflow.status).toBe(WorkflowStatus.Processing);
+    const result = await processing;
+    expect(context.statusCenter.workflow.status).toBe(WorkflowStatus.Succeeded);
+
+    // Verify the result structure
+    expect(result).toHaveProperty('res');
+    expect(result).toHaveProperty('code');
+    expect(typeof result.res).toBe('string');
+    expect(typeof result.code).toBe('number');
+    expect(result.code).toBe(201);
+
+    const report = context.reporter.export();
+    expect(report.workflowStatus.status).toBe(WorkflowStatus.Succeeded);
+    expect(report.reports.start_0.status).toBe(WorkflowStatus.Succeeded);
+    expect(report.reports.http_0.status).toBe(WorkflowStatus.Succeeded);
+    expect(report.reports.end_0.status).toBe(WorkflowStatus.Succeeded);
+  });
+});

+ 241 - 0
packages/runtime/js-core/src/domain/__tests__/schemas/http.test.ts

@@ -0,0 +1,241 @@
+/**
+ * Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
+ * SPDX-License-Identifier: MIT
+ */
+
+import { describe, expect, it, vi, beforeEach } from 'vitest';
+import { IContainer, IEngine, WorkflowStatus } from '@flowgram.ai/runtime-interface';
+
+import { snapshotsToVOData } from '../utils';
+import { WorkflowRuntimeContainer } from '../../container';
+import { TestSchemas } from '.';
+
+const container: IContainer = WorkflowRuntimeContainer.instance;
+
+// Mock global fetch function
+const mockFetch = vi.fn();
+global.fetch = mockFetch;
+
+describe('WorkflowRuntime http schema', () => {
+  beforeEach(() => {
+    // Reset mock before each test
+    mockFetch.mockReset();
+  });
+
+  it('should execute a workflow with HTTP request', async () => {
+    // Mock successful HTTP response
+    mockFetch.mockResolvedValueOnce({
+      ok: true,
+      status: 200,
+      statusText: 'OK',
+      headers: new Headers({
+        'content-type': 'application/json',
+      }),
+      text: async () =>
+        JSON.stringify({
+          url: 'https://api.example.com/post',
+          json: {},
+          headers: {
+            'Content-Type': 'application/json',
+          },
+        }),
+    });
+
+    const engine = container.get<IEngine>(IEngine);
+    const { context, processing } = engine.invoke({
+      schema: TestSchemas.httpSchema,
+      inputs: {
+        host: 'api.example.com',
+        path: '/post',
+      },
+    });
+
+    expect(context.statusCenter.workflow.status).toBe(WorkflowStatus.Processing);
+    const result = await processing;
+    expect(context.statusCenter.workflow.status).toBe(WorkflowStatus.Succeeded);
+
+    // Verify the result structure
+    expect(result).toHaveProperty('res');
+    expect(result).toHaveProperty('code');
+    expect(typeof result.res).toBe('string');
+    expect(typeof result.code).toBe('number');
+    expect(result.code).toBe(200);
+
+    // Verify fetch was called with correct parameters
+    expect(mockFetch).toHaveBeenCalledTimes(1);
+    expect(mockFetch).toHaveBeenCalledWith(
+      'https://api.example.com/post',
+      expect.objectContaining({
+        method: 'POST',
+        body: '{}',
+        headers: expect.any(Object),
+      })
+    );
+
+    const snapshots = snapshotsToVOData(context.snapshotCenter.exportAll());
+    expect(snapshots).toHaveLength(3);
+
+    // Verify start node snapshot
+    expect(snapshots[0]).toMatchObject({
+      nodeID: 'start_0',
+      inputs: {},
+      outputs: {
+        host: 'api.example.com',
+        path: '/post',
+      },
+      data: {},
+    });
+
+    // Verify http node snapshot
+    expect(snapshots[1]).toMatchObject({
+      nodeID: 'http_0',
+      inputs: {
+        method: 'POST',
+        url: 'https://api.example.com/post',
+        body: '{}',
+        headers: {},
+        params: {},
+        timeout: 10000,
+        retryTimes: 1,
+      },
+      data: {},
+    });
+    expect(snapshots[1].outputs).toHaveProperty('body');
+    expect(snapshots[1].outputs).toHaveProperty('headers');
+    expect(snapshots[1].outputs).toHaveProperty('statusCode');
+    expect(snapshots[1].outputs.statusCode).toBe(200);
+
+    // Verify end node snapshot
+    expect(snapshots[2]).toMatchObject({
+      nodeID: 'end_0',
+      data: {},
+    });
+    expect(snapshots[2].inputs).toHaveProperty('res');
+    expect(snapshots[2].inputs).toHaveProperty('code');
+    expect(snapshots[2].outputs).toHaveProperty('res');
+    expect(snapshots[2].outputs).toHaveProperty('code');
+    expect(snapshots[2].outputs.code).toBe(200);
+
+    const report = context.reporter.export();
+    expect(report.workflowStatus.status).toBe(WorkflowStatus.Succeeded);
+    expect(report.reports.start_0.status).toBe(WorkflowStatus.Succeeded);
+    expect(report.reports.http_0.status).toBe(WorkflowStatus.Succeeded);
+    expect(report.reports.end_0.status).toBe(WorkflowStatus.Succeeded);
+  });
+
+  it('should handle HTTP request with different inputs and status codes', async () => {
+    // Mock HTTP response with 201 status
+    mockFetch.mockResolvedValueOnce({
+      ok: true,
+      status: 201,
+      statusText: 'Created',
+      headers: new Headers({
+        'content-type': 'application/json',
+      }),
+      text: async () =>
+        JSON.stringify({
+          id: 101,
+          title: 'foo',
+          body: 'bar',
+          userId: 1,
+        }),
+    });
+
+    const engine = container.get<IEngine>(IEngine);
+    const { context, processing } = engine.invoke({
+      schema: TestSchemas.httpSchema,
+      inputs: {
+        host: 'api.test.com',
+        path: '/posts',
+      },
+    });
+
+    expect(context.statusCenter.workflow.status).toBe(WorkflowStatus.Processing);
+    const result = await processing;
+    expect(context.statusCenter.workflow.status).toBe(WorkflowStatus.Succeeded);
+
+    // Verify the result structure
+    expect(result).toHaveProperty('res');
+    expect(result).toHaveProperty('code');
+    expect(typeof result.res).toBe('string');
+    expect(typeof result.code).toBe('number');
+    expect(result.code).toBe(201);
+
+    // Verify fetch was called with correct URL
+    expect(mockFetch).toHaveBeenCalledTimes(1);
+    expect(mockFetch).toHaveBeenCalledWith(
+      'https://api.test.com/posts',
+      expect.objectContaining({
+        method: 'POST',
+      })
+    );
+
+    const report = context.reporter.export();
+    expect(report.workflowStatus.status).toBe(WorkflowStatus.Succeeded);
+    expect(report.reports.start_0.status).toBe(WorkflowStatus.Succeeded);
+    expect(report.reports.http_0.status).toBe(WorkflowStatus.Succeeded);
+    expect(report.reports.end_0.status).toBe(WorkflowStatus.Succeeded);
+  });
+
+  it('should handle HTTP request failure', async () => {
+    // Mock HTTP error response
+    mockFetch.mockResolvedValueOnce({
+      ok: false,
+      status: 404,
+      statusText: 'Not Found',
+      headers: new Headers(),
+      text: async () => 'Not Found',
+    });
+
+    const engine = container.get<IEngine>(IEngine);
+    const { context, processing } = engine.invoke({
+      schema: TestSchemas.httpSchema,
+      inputs: {
+        host: 'api.mock.com',
+        path: '/nonexistent',
+      },
+    });
+
+    expect(context.statusCenter.workflow.status).toBe(WorkflowStatus.Processing);
+    const result = await processing;
+
+    // The workflow should still succeed but with error status code
+    expect(context.statusCenter.workflow.status).toBe(WorkflowStatus.Succeeded);
+    expect(result).toHaveProperty('res');
+    expect(result).toHaveProperty('code');
+    expect(result.code).toBe(404);
+
+    // Verify fetch was called
+    expect(mockFetch).toHaveBeenCalledTimes(1);
+    expect(mockFetch).toHaveBeenCalledWith(
+      'https://api.mock.com/nonexistent',
+      expect.objectContaining({
+        method: 'POST',
+      })
+    );
+  });
+
+  it('should handle network error', async () => {
+    // Mock network error
+    mockFetch.mockRejectedValueOnce(new Error('Network error'));
+
+    const engine = container.get<IEngine>(IEngine);
+    const { context, processing } = engine.invoke({
+      schema: TestSchemas.httpSchema,
+      inputs: {
+        host: 'api.invalid.test',
+        path: '/test',
+      },
+    });
+
+    expect(context.statusCenter.workflow.status).toBe(WorkflowStatus.Processing);
+    await processing;
+
+    // The workflow should fail due to network error
+    expect(context.statusCenter.workflow.status).toBe(WorkflowStatus.Failed);
+
+    const report = context.reporter.export();
+    expect(report.workflowStatus.status).toBe(WorkflowStatus.Failed);
+    expect(report.reports.http_0.status).toBe(WorkflowStatus.Failed);
+  });
+});

+ 133 - 0
packages/runtime/js-core/src/domain/__tests__/schemas/http.ts

@@ -0,0 +1,133 @@
+/**
+ * Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
+ * SPDX-License-Identifier: MIT
+ */
+
+import { WorkflowSchema } from '@flowgram.ai/runtime-interface';
+
+export const httpSchema: WorkflowSchema = {
+  nodes: [
+    {
+      id: 'start_0',
+      type: 'start',
+      meta: {
+        position: {
+          x: 180,
+          y: 125.5,
+        },
+      },
+      data: {
+        title: 'Start',
+        outputs: {
+          type: 'object',
+          properties: {
+            host: {
+              type: 'string',
+              extra: {
+                index: 0,
+              },
+            },
+            path: {
+              type: 'string',
+              extra: {
+                index: 1,
+              },
+            },
+          },
+          required: ['host', 'path'],
+        },
+      },
+    },
+    {
+      id: 'end_0',
+      type: 'end',
+      meta: {
+        position: {
+          x: 1100,
+          y: 125.5,
+        },
+      },
+      data: {
+        title: 'End',
+        inputsValues: {
+          res: {
+            type: 'ref',
+            content: ['http_0', 'body'],
+          },
+          code: {
+            type: 'ref',
+            content: ['http_0', 'statusCode'],
+          },
+        },
+        inputs: {
+          type: 'object',
+          properties: {
+            res: {
+              type: 'string',
+            },
+            code: {
+              type: 'integer',
+            },
+          },
+        },
+      },
+    },
+    {
+      id: 'http_0',
+      type: 'http',
+      meta: {
+        position: {
+          x: 640,
+          y: 0,
+        },
+      },
+      data: {
+        title: 'HTTP_0',
+        api: {
+          method: 'POST',
+          url: {
+            type: 'template',
+            content: 'https://{{start_0.host}}{{start_0.path}}',
+          },
+        },
+        body: {
+          bodyType: 'JSON',
+          json: {
+            type: 'template',
+            content: '{}',
+          },
+        },
+        headers: {},
+        params: {},
+        outputs: {
+          type: 'object',
+          properties: {
+            body: {
+              type: 'string',
+            },
+            headers: {
+              type: 'object',
+            },
+            statusCode: {
+              type: 'integer',
+            },
+          },
+        },
+        timeout: {
+          timeout: 10000,
+          retryTimes: 1,
+        },
+      },
+    },
+  ],
+  edges: [
+    {
+      sourceNodeID: 'start_0',
+      targetNodeID: 'http_0',
+    },
+    {
+      sourceNodeID: 'http_0',
+      targetNodeID: 'end_0',
+    },
+  ],
+};

+ 4 - 2
packages/runtime/js-core/src/domain/__tests__/schemas/index.ts

@@ -6,17 +6,19 @@
 import { validateInputsSchema } from './validate-inputs';
 import { twoLLMSchema } from './two-llm';
 import { loopSchema } from './loop';
+import { llmRealSchema } from './llm-real';
+import { httpSchema } from './http';
 import { branchTwoLayersSchema } from './branch-two-layers';
 import { branchSchema } from './branch';
-import { basicLLMSchema } from './basic-llm';
 import { basicSchema } from './basic';
 
 export const TestSchemas = {
   twoLLMSchema,
   basicSchema,
   branchSchema,
-  basicLLMSchema,
+  llmRealSchema,
   loopSchema,
   branchTwoLayersSchema,
   validateInputsSchema,
+  httpSchema,
 };

+ 3 - 3
packages/runtime/js-core/src/domain/__tests__/schemas/basic-llm.test.ts → packages/runtime/js-core/src/domain/__tests__/schemas/llm-real.test.ts

@@ -19,9 +19,9 @@ beforeEach(() => {
   executor.register(new LLMExecutor());
 });
 
-describe('workflow runtime basic test', () => {
+describe('workflow runtime real llm test', () => {
   it('should execute workflow', async () => {
-    if (process.env.ENABLE_MODEL_TEST !== 'true') {
+    if (process.env.ENABLE_REAL_TESTS !== 'true') {
       return;
     }
     if (!process.env.MODEL_NAME || !process.env.API_KEY || !process.env.API_HOST) {
@@ -32,7 +32,7 @@ describe('workflow runtime basic test', () => {
     const apiKey = process.env.API_KEY;
     const apiHost = process.env.API_HOST;
     const { context, processing } = engine.invoke({
-      schema: TestSchemas.basicLLMSchema,
+      schema: TestSchemas.llmRealSchema,
       inputs: {
         model_name: modelName,
         api_key: apiKey,

+ 1 - 1
packages/runtime/js-core/src/domain/__tests__/schemas/basic-llm.ts → packages/runtime/js-core/src/domain/__tests__/schemas/llm-real.ts

@@ -5,7 +5,7 @@
 
 import type { WorkflowSchema } from '@flowgram.ai/runtime-interface';
 
-export const basicLLMSchema: WorkflowSchema = {
+export const llmRealSchema: WorkflowSchema = {
   nodes: [
     {
       id: 'start_0',

+ 1 - 0
packages/runtime/js-core/src/domain/engine/index.ts

@@ -72,6 +72,7 @@ export class WorkflowRuntimeEngine implements IEngine {
         inputs,
         runtime: context,
         container: WorkflowRuntimeContainer.instance,
+        snapshot,
       });
       if (context.statusCenter.workflow.terminated) {
         return;

+ 33 - 21
packages/runtime/js-core/src/domain/state/index.ts

@@ -15,6 +15,7 @@ import {
   IVariableStore,
   WorkflowVariableType,
   IFlowTemplateValue,
+  IJsonSchema,
 } from '@flowgram.ai/runtime-interface';
 
 import { uuid, WorkflowRuntimeType } from '@infra/utils';
@@ -39,27 +40,10 @@ export class WorkflowRuntimeState implements IState {
   public getNodeInputs(node: INode): WorkflowInputs {
     const inputsDeclare = node.declare.inputs;
     const inputsValues = node.declare.inputsValues;
-    if (!inputsDeclare || !inputsValues) {
-      return {};
-    }
-    return Object.entries(inputsValues).reduce((prev, [key, inputValue]) => {
-      const typeInfo = inputsDeclare.properties?.[key];
-      if (!typeInfo) {
-        return prev;
-      }
-      const expectType = typeInfo.type as WorkflowVariableType;
-      // get value
-      const result = this.parseValue(inputValue);
-      if (!result) {
-        return prev;
-      }
-      const { value, type } = result;
-      if (!WorkflowRuntimeType.isTypeEqual(type, expectType)) {
-        return prev;
-      }
-      prev[key] = value;
-      return prev;
-    }, {} as WorkflowInputs);
+    return this.parseInputs({
+      values: inputsValues,
+      declare: inputsDeclare,
+    });
   }
 
   public setNodeOutputs(params: { node: INode; outputs: WorkflowOutputs }): void {
@@ -87,6 +71,34 @@ export class WorkflowRuntimeState implements IState {
     });
   }
 
+  public parseInputs(params: {
+    values?: Record<string, IFlowValue>;
+    declare?: IJsonSchema;
+  }): WorkflowInputs {
+    const { values, declare } = params;
+    if (!declare || !values) {
+      return {};
+    }
+    return Object.entries(values).reduce((prev, [key, inputValue]) => {
+      const typeInfo = declare.properties?.[key];
+      if (!typeInfo) {
+        return prev;
+      }
+      const expectType = typeInfo.type as WorkflowVariableType;
+      // get value
+      const result = this.parseValue(inputValue);
+      if (!result) {
+        return prev;
+      }
+      const { value, type } = result;
+      if (!WorkflowRuntimeType.isTypeEqual(type, expectType)) {
+        return prev;
+      }
+      prev[key] = value;
+      return prev;
+    }, {} as WorkflowInputs);
+  }
+
   public parseRef<T = unknown>(ref: IFlowRefValue): IVariableParseResult<T> | null {
     if (ref?.type !== 'ref') {
       throw new Error(`Invalid ref value: ${ref}`);

+ 275 - 0
packages/runtime/js-core/src/nodes/http/index.ts

@@ -0,0 +1,275 @@
+/**
+ * Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
+ * SPDX-License-Identifier: MIT
+ */
+
+import {
+  ExecutionContext,
+  ExecutionResult,
+  FlowGramNode,
+  HTTPBodyType,
+  HTTPMethod,
+  HTTPNodeSchema,
+  INode,
+  INodeExecutor,
+} from '@flowgram.ai/runtime-interface';
+
+export interface HTTPExecutorInputs {
+  method: HTTPMethod;
+  url: string;
+  headers: Record<string, string>;
+  params: Record<string, string>;
+  bodyType: HTTPBodyType;
+  body: string;
+  retryTimes: number;
+  timeout: number;
+}
+
+export class HTTPExecutor implements INodeExecutor {
+  public type = FlowGramNode.HTTP;
+
+  public async execute(context: ExecutionContext): Promise<ExecutionResult> {
+    const inputs = this.parseInputs(context);
+    const response = await this.request(inputs);
+
+    const responseHeaders: Record<string, string> = {};
+    response.headers.forEach((value, key) => {
+      responseHeaders[key] = value;
+    });
+
+    const responseBody = await response.text();
+
+    return {
+      outputs: {
+        headers: responseHeaders,
+        statusCode: response.status,
+        body: responseBody,
+      },
+    };
+  }
+
+  private async request(inputs: HTTPExecutorInputs): Promise<Response> {
+    const { method, url, headers, params, bodyType, body, retryTimes, timeout } = inputs;
+
+    // Build URL with query parameters
+    const urlWithParams = this.buildUrlWithParams(url, params);
+
+    // Prepare request options
+    const requestOptions: RequestInit = {
+      method,
+      headers: this.prepareHeaders(headers, bodyType),
+      signal: AbortSignal.timeout(timeout),
+    };
+
+    // Add body if method supports it
+    if (method !== 'GET' && method !== 'HEAD' && body) {
+      requestOptions.body = this.prepareBody(body, bodyType);
+    }
+
+    // Implement retry logic
+    let lastError: Error | null = null;
+    for (let attempt = 0; attempt <= retryTimes; attempt++) {
+      try {
+        const response = await fetch(urlWithParams, requestOptions);
+        return response;
+      } catch (error) {
+        lastError = error as Error;
+        if (attempt < retryTimes) {
+          // Wait before retry (exponential backoff)
+          await new Promise((resolve) => setTimeout(resolve, Math.pow(2, attempt) * 1000));
+        }
+      }
+    }
+
+    throw lastError || new Error('HTTP request failed after all retry attempts');
+  }
+
+  private parseInputs(context: ExecutionContext): HTTPExecutorInputs {
+    const httpNode = context.node as INode<HTTPNodeSchema['data']>;
+    const method = httpNode.data.api.method;
+    const urlVariable = context.runtime.state.parseTemplate(httpNode.data.api.url);
+    if (!urlVariable) {
+      throw new Error('HTTP url is required');
+    }
+    const url = urlVariable.value;
+    const headers = context.runtime.state.parseInputs({
+      values: httpNode.data.headersValues,
+      declare: httpNode.data.headers,
+    });
+    const params = context.runtime.state.parseInputs({
+      values: httpNode.data.paramsValues,
+      declare: httpNode.data.params,
+    });
+    const body = this.parseBody(context);
+    const retryTimes = httpNode.data.timeout.retryTimes;
+    const timeout = httpNode.data.timeout.timeout;
+    const inputs = {
+      method,
+      url,
+      headers,
+      params,
+      bodyType: body.bodyType,
+      body: body.body,
+      retryTimes,
+      timeout,
+    };
+    context.snapshot.update({
+      inputs: JSON.parse(JSON.stringify(inputs)),
+    });
+    return inputs;
+  }
+
+  private parseBody(context: ExecutionContext): {
+    bodyType: HTTPBodyType;
+    body: string;
+  } {
+    const httpNode = context.node as INode<HTTPNodeSchema['data']>;
+    const bodyType = httpNode.data.body.bodyType;
+    if (bodyType === HTTPBodyType.None) {
+      return {
+        bodyType,
+        body: '',
+      };
+    }
+    if (bodyType === HTTPBodyType.JSON) {
+      if (!httpNode.data.body.json) {
+        throw new Error('HTTP json body is required');
+      }
+      const jsonVariable = context.runtime.state.parseTemplate(httpNode.data.body.json);
+      if (!jsonVariable) {
+        throw new Error('HTTP json body is required');
+      }
+      return {
+        bodyType,
+        body: jsonVariable.value,
+      };
+    }
+    if (bodyType === HTTPBodyType.FormData) {
+      if (!httpNode.data.body.formData || !httpNode.data.body.formDataValues) {
+        throw new Error('HTTP form-data body is required');
+      }
+
+      const formData = context.runtime.state.parseInputs({
+        values: httpNode.data.body.formDataValues,
+        declare: httpNode.data.body.formData,
+      });
+      return {
+        bodyType,
+        body: JSON.stringify(formData),
+      };
+    }
+    if (bodyType === HTTPBodyType.RawText) {
+      if (!httpNode.data.body.json) {
+        throw new Error('HTTP json body is required');
+      }
+      const jsonVariable = context.runtime.state.parseTemplate(httpNode.data.body.json);
+      if (!jsonVariable) {
+        throw new Error('HTTP json body is required');
+      }
+      return {
+        bodyType,
+        body: jsonVariable.value,
+      };
+    }
+    if (bodyType === HTTPBodyType.Binary) {
+      if (!httpNode.data.body.binary) {
+        throw new Error('HTTP binary body is required');
+      }
+      const binaryVariable = context.runtime.state.parseTemplate(httpNode.data.body.binary);
+      if (!binaryVariable) {
+        throw new Error('HTTP binary body is required');
+      }
+      return {
+        bodyType,
+        body: binaryVariable.value,
+      };
+    }
+    if (bodyType === HTTPBodyType.XWwwFormUrlencoded) {
+      if (!httpNode.data.body.xWwwFormUrlencoded || !httpNode.data.body.xWwwFormUrlencodedValues) {
+        throw new Error('HTTP x-www-form-urlencoded body is required');
+      }
+      const xWwwFormUrlencoded = context.runtime.state.parseInputs({
+        values: httpNode.data.body.xWwwFormUrlencodedValues,
+        declare: httpNode.data.body.xWwwFormUrlencoded,
+      });
+      return {
+        bodyType,
+        body: JSON.stringify(xWwwFormUrlencoded),
+      };
+    }
+    throw new Error(`HTTP invalid body type "${bodyType}"`);
+  }
+
+  private buildUrlWithParams(url: string, params: Record<string, string>): string {
+    const urlObj = new URL(url);
+    Object.entries(params).forEach(([key, value]) => {
+      if (value !== undefined && value !== null && value !== '') {
+        urlObj.searchParams.set(key, value);
+      }
+    });
+    return urlObj.toString();
+  }
+
+  private prepareHeaders(
+    headers: Record<string, string>,
+    bodyType: HTTPBodyType
+  ): Record<string, string> {
+    const preparedHeaders = { ...headers };
+
+    // Set Content-Type based on body type if not already set
+    if (!preparedHeaders['Content-Type'] && !preparedHeaders['content-type']) {
+      switch (bodyType) {
+        case HTTPBodyType.JSON:
+          preparedHeaders['Content-Type'] = 'application/json';
+          break;
+        case HTTPBodyType.FormData:
+          // Don't set Content-Type for FormData, let browser set it with boundary
+          break;
+        case HTTPBodyType.XWwwFormUrlencoded:
+          preparedHeaders['Content-Type'] = 'application/x-www-form-urlencoded';
+          break;
+        case HTTPBodyType.RawText:
+          preparedHeaders['Content-Type'] = 'text/plain';
+          break;
+        case HTTPBodyType.Binary:
+          preparedHeaders['Content-Type'] = 'application/octet-stream';
+          break;
+      }
+    }
+
+    return preparedHeaders;
+  }
+
+  private prepareBody(body: string, bodyType: HTTPBodyType): string | FormData {
+    switch (bodyType) {
+      case HTTPBodyType.JSON:
+        return body;
+      case HTTPBodyType.FormData:
+        const formData = new FormData();
+        try {
+          const data = JSON.parse(body);
+          Object.entries(data).forEach(([key, value]) => {
+            formData.append(key, String(value));
+          });
+        } catch (error) {
+          throw new Error('Invalid FormData body format');
+        }
+        return formData;
+      case HTTPBodyType.XWwwFormUrlencoded:
+        try {
+          const data = JSON.parse(body);
+          const params = new URLSearchParams();
+          Object.entries(data).forEach(([key, value]) => {
+            params.append(key, String(value));
+          });
+          return params.toString();
+        } catch (error) {
+          throw new Error('Invalid x-www-form-urlencoded body format');
+        }
+      case HTTPBodyType.RawText:
+      case HTTPBodyType.Binary:
+      default:
+        return body;
+    }
+  }
+}

+ 2 - 0
packages/runtime/js-core/src/nodes/index.ts

@@ -8,6 +8,7 @@ import { INodeExecutorFactory } from '@flowgram.ai/runtime-interface';
 import { StartExecutor } from './start';
 import { LoopExecutor } from './loop';
 import { LLMExecutor } from './llm';
+import { HTTPExecutor } from './http';
 import { EndExecutor } from './end';
 import { BlockEndExecutor, BlockStartExecutor } from './empty';
 import { ConditionExecutor } from './condition';
@@ -20,4 +21,5 @@ export const WorkflowRuntimeNodeExecutors: INodeExecutorFactory[] = [
   LoopExecutor,
   BlockStartExecutor,
   BlockEndExecutor,
+  HTTPExecutor,
 ];