本手册以 MCP 节点为例,详细说明如何在 PigX 工作流系统中新增一个自定义节点类型。工作流节点是系统的核心组件,每个节点负责执行特定的业务逻辑。

🏗️ AI Flow 架构设计

AiNodeProcessor 接口:定义节点处理器的基本接口,所有节点处理器必须实现此接口

AbstractNodeProcessor 抽象基类:提供通用的节点处理逻辑和工具方法,减少重复代码

具体节点处理器:实现特定类型节点的业务逻辑,如 MCPNodeProcessor、LLMNodeProcessor 等

🚀 节点开发快速指南

1

定义节点类型常量

NodeTypeConstants.java 中添加新的节点类型标识符

2

创建节点配置类

定义节点所需的配置参数结构

3

扩展节点定义

AiNodeDefinition.java 中添加新节点的配置字段

4

实现节点处理器

创建具体的节点处理器类,继承 AbstractNodeProcessor

5

注册节点处理器

通过 Spring 注解完成自动注册

🔧 详细实现步骤

第一步:定义节点类型常量

NodeTypeConstants.java 中添加新的节点类型:

// 文件位置: pigx-knowledge/src/main/java/com/pig4cloud/pigx/knowledge/support/flow/constants/NodeTypeConstants.java

/**
 * MCP节点
 * 用于调用MCP(Model Context Protocol)服务的节点类型
 */
String MCP = "mcp";

第二步:创建节点配置类

为了接收前端节点配置参数,我们需要创建相应的配置类来定义节点的参数结构。以MCP节点为例:

// 文件位置: pigx-knowledge/src/main/java/com/pig4cloud/pigx/knowledge/support/flow/model/nodes/AiMCPNode.java

package com.pig4cloud.pigx.knowledge.support.flow.model.nodes;

import lombok.Data;

/**
 * MCP节点配置
 * 用于定义MCP(Model Context Protocol)节点的配置参数
 */
@Data
public class AiMCPNode {
    private String mcpId;
    private String mcpName;
    private String prompt;
}

第三步:扩展节点定义

节点定义扩展

AiNodeDefinition.java 中添加新节点的配置字段,使系统能够识别和处理新的节点类型。

// 文件位置: pigx-knowledge/src/main/java/com/pig4cloud/pigx/knowledge/support/flow/model/AiNodeDefinition.java

/**
 * MCP节点参数配置
 */
private AiMCPNode mcpParams;

第四步:实现节点处理器

// 文件位置: pigx-knowledge/src/main/java/com/pig4cloud/pigx/knowledge/support/flow/model/processor/MCPNodeProcessor.java

/**
 * MCP节点处理器 
 * 负责处理流程中的MCP(Model Context Protocol)节点,直接调用McpChatRule避免代码重复
 *
 * @author lengleng
 * @date 2025/03/22
 */
@Slf4j
@Component(NodeTypeConstants.MCP)  // 重要:使用节点类型常量作为Spring Bean名称
@RequiredArgsConstructor
public class MCPNodeProcessor extends AbstractNodeProcessor {

    /**
     * 模板引擎,用于变量替换
     */
    public static final TemplateEngine engine = TemplateUtil.createEngine(new TemplateConfig());

    private final McpChatRule mcpChatRule;

    @Override
    protected Dict doExecute(AiNodeDefinition node, FlowContextHolder context) {
        try {
            // 验证节点配置
            AiMCPNode config = validateNodeConfig(node);

            // 获取输入参数并处理消息模板
            Dict variables = getInputVariables(node, context);
            String inputMessage = engine.getTemplate(config.getPrompt()).render(variables);

            // 构建ChatMessageDTO
            ChatMessageDTO chatMessageDTO = buildChatMessageDTO(config, inputMessage, context);

            // 调用McpChatRule处理MCP调用
            return processMcpCallViaRule(chatMessageDTO, context);

        } catch (Exception e) {
            throw FlowException.nodeError(node.getId(), "[MCP节点] -> " + e.getMessage());
        }
    }

    /**
     * 验证节点配置
     */
    private AiMCPNode validateNodeConfig(AiNodeDefinition node) {
        AiMCPNode config = node.getMcpParams();
        if (config == null) {
            throw FlowException.invalidParam("MCP节点配置无效");
        }
        
        if (StrUtil.isBlank(config.getMcpId())) {
            throw FlowException.invalidParam("MCP配置ID不能为空");
        }
        
        if (StrUtil.isBlank(config.getPrompt())) {
            throw FlowException.invalidParam("提示模板不能为空");
        }
        
        return config;
    }

    /**
     * 构建聊天消息DTO
     */
    private ChatMessageDTO buildChatMessageDTO(AiMCPNode config, String inputMessage, FlowContextHolder context) {
        return ChatMessageDTO.builder()
            .mcpId(config.getMcpId())
            .message(inputMessage)
            .conversationId(context.getParameters().getStr("conversationId"))
            .userId(context.getParameters().getStr("userId"))
            .build();
    }

    /**
     * 通过规则处理MCP调用
     */
    private Dict processMcpCallViaRule(ChatMessageDTO chatMessageDTO, FlowContextHolder context) {
        // 调用现有的MCP聊天规则
        String result = mcpChatRule.execute(chatMessageDTO);
        
        // 返回标准格式的结果
        return Dict.create()
            .set(FlowConstant.CONTENT, result)
            .set(FlowConstant.ROLE, "assistant")
            .set(FlowConstant.TIMESTAMP, System.currentTimeMillis());
    }
}

第五步:自动注册机制

Spring Bean 自动注册

节点处理器通过 Spring 的组件扫描机制自动注册,关键要点:

Bean 名称规范

必须使用节点类型常量
@Component(NodeTypeConstants.MCP)

自动发现机制

工厂自动识别
AiNodeProcessorFactory 会自动发现所有实现

@Component(NodeTypeConstants.MCP)  // Bean名称 = "mcp"
public class MCPNodeProcessor extends AbstractNodeProcessor {
    // 实现内容...
}

📋 开发规范指南

1. 类结构规范

2. 配置验证规范

每个节点处理器都应该包含严格的配置验证逻辑,确保运行时的稳定性。

private AiXXXNode validateNodeConfig(AiNodeDefinition node) {
    AiXXXNode config = node.getXxxParams();
    if (config == null) {
        throw FlowException.invalidParam("XXX节点配置无效");
    }
    
    // 具体参数验证
    if (StrUtil.isBlank(config.getRequiredParam())) {
        throw FlowException.invalidParam("必要参数不能为空");
    }
    
    return config;
}

3. 返回结果规范

标准返回格式

所有节点处理器必须返回统一格式的 Dict 对象,确保系统的一致性。

return Dict.create()
    .set(FlowConstant.CONTENT, resultContent)      // 主要内容
    .set(FlowConstant.ROLE, "assistant")           // 角色标识
    .set(FlowConstant.TOKENS, tokenCount)          // 令牌使用量
    .set(FlowConstant.TIMESTAMP, System.currentTimeMillis());  // 时间戳

🛠️ 工具方法说明

AbstractNodeProcessor 提供的工具方法

FlowContextHolder 上下文方法

上下文变量操作方法

  • getParameters(): 获取流程执行参数
  • getVariables(): 获取变量映射表
  • setVariable(key, value): 设置变量值
  • getVariable(key): 获取指定变量值

🌊 流式处理支持

对于需要实时响应的节点(如AI对话、长时间计算等),系统支持流式处理机制。

流式数据发送

// 检查是否支持流式回调
if (context.getAiFlowExecuteDTO() != null && 
    context.getAiFlowExecuteDTO().getCallback() != null) {
    
    // 发送流式数据
    context.getAiFlowExecuteDTO()
        .getCallback()
        .execute(AiFlowExecuteDTO.FlowCallbackResult.builder()
            .data(AiFlowExecuteDTO.FlowCallbackData.builder()
                .content(content)              // 内容片段
                .nodeId(node.getId())          // 节点ID
                .timestamp(System.currentTimeMillis())  // 时间戳
                .build())
            .build());
}