本手册以 MCP 节点为例,详细说明如何在 PigX 工作流系统中新增一个自定义节点类型。工作流节点是系统的核心组件,每个节点负责执行特定的业务逻辑。
🏗️ AI Flow 架构设计
AiNodeProcessor 接口:定义节点处理器的基本接口,所有节点处理器必须实现此接口
AbstractNodeProcessor 抽象基类:提供通用的节点处理逻辑和工具方法,减少重复代码
具体节点处理器:实现特定类型节点的业务逻辑,如 MCPNodeProcessor、LLMNodeProcessor 等
🚀 节点开发快速指南
定义节点类型常量
在 NodeTypeConstants.java
中添加新的节点类型标识符
扩展节点定义
在 AiNodeDefinition.java
中添加新节点的配置字段
实现节点处理器
创建具体的节点处理器类,继承 AbstractNodeProcessor
🔧 详细实现步骤
第一步:定义节点类型常量
在 NodeTypeConstants.java
中添加新的节点类型:
// 文件位置: pigx-knowledge/src/main/java/com/pig4cloud/pigx/knowledge/support/flow/constants/NodeTypeConstants.java
/**
* MCP节点
* 用于调用MCP(Model Context Protocol)服务的节点类型
*/
String MCP = "mcp";
第二步:创建节点配置类
为了接收前端节点配置参数,我们需要创建相应的配置类来定义节点的参数结构。以MCP节点为例:
// 文件位置: pigx-knowledge/src/main/java/com/pig4cloud/pigx/knowledge/support/flow/model/nodes/AiMCPNode.java
package com.pig4cloud.pigx.knowledge.support.flow.model.nodes;
import lombok.Data;
/**
* MCP节点配置
* 用于定义MCP(Model Context Protocol)节点的配置参数
*/
@Data
public class AiMCPNode {
private String mcpId;
private String mcpName;
private String prompt;
}
第三步:扩展节点定义
节点定义扩展
在 AiNodeDefinition.java
中添加新节点的配置字段,使系统能够识别和处理新的节点类型。
// 文件位置: pigx-knowledge/src/main/java/com/pig4cloud/pigx/knowledge/support/flow/model/AiNodeDefinition.java
/**
* MCP节点参数配置
*/
private AiMCPNode mcpParams;
第四步:实现节点处理器
// 文件位置: pigx-knowledge/src/main/java/com/pig4cloud/pigx/knowledge/support/flow/model/processor/MCPNodeProcessor.java
/**
* MCP节点处理器
* 负责处理流程中的MCP(Model Context Protocol)节点,直接调用McpChatRule避免代码重复
*
* @author lengleng
* @date 2025/03/22
*/
@Slf4j
@Component(NodeTypeConstants.MCP) // 重要:使用节点类型常量作为Spring Bean名称
@RequiredArgsConstructor
public class MCPNodeProcessor extends AbstractNodeProcessor {
/**
* 模板引擎,用于变量替换
*/
public static final TemplateEngine engine = TemplateUtil.createEngine(new TemplateConfig());
private final McpChatRule mcpChatRule;
@Override
protected Dict doExecute(AiNodeDefinition node, FlowContextHolder context) {
try {
// 验证节点配置
AiMCPNode config = validateNodeConfig(node);
// 获取输入参数并处理消息模板
Dict variables = getInputVariables(node, context);
String inputMessage = engine.getTemplate(config.getPrompt()).render(variables);
// 构建ChatMessageDTO
ChatMessageDTO chatMessageDTO = buildChatMessageDTO(config, inputMessage, context);
// 调用McpChatRule处理MCP调用
return processMcpCallViaRule(chatMessageDTO, context);
} catch (Exception e) {
throw FlowException.nodeError(node.getId(), "[MCP节点] -> " + e.getMessage());
}
}
/**
* 验证节点配置
*/
private AiMCPNode validateNodeConfig(AiNodeDefinition node) {
AiMCPNode config = node.getMcpParams();
if (config == null) {
throw FlowException.invalidParam("MCP节点配置无效");
}
if (StrUtil.isBlank(config.getMcpId())) {
throw FlowException.invalidParam("MCP配置ID不能为空");
}
if (StrUtil.isBlank(config.getPrompt())) {
throw FlowException.invalidParam("提示模板不能为空");
}
return config;
}
/**
* 构建聊天消息DTO
*/
private ChatMessageDTO buildChatMessageDTO(AiMCPNode config, String inputMessage, FlowContextHolder context) {
return ChatMessageDTO.builder()
.mcpId(config.getMcpId())
.message(inputMessage)
.conversationId(context.getParameters().getStr("conversationId"))
.userId(context.getParameters().getStr("userId"))
.build();
}
/**
* 通过规则处理MCP调用
*/
private Dict processMcpCallViaRule(ChatMessageDTO chatMessageDTO, FlowContextHolder context) {
// 调用现有的MCP聊天规则
String result = mcpChatRule.execute(chatMessageDTO);
// 返回标准格式的结果
return Dict.create()
.set(FlowConstant.CONTENT, result)
.set(FlowConstant.ROLE, "assistant")
.set(FlowConstant.TIMESTAMP, System.currentTimeMillis());
}
}
第五步:自动注册机制
Spring Bean 自动注册
节点处理器通过 Spring 的组件扫描机制自动注册,关键要点:
Bean 名称规范
必须使用节点类型常量
@Component(NodeTypeConstants.MCP)
自动发现机制
工厂自动识别
AiNodeProcessorFactory
会自动发现所有实现
@Component(NodeTypeConstants.MCP) // Bean名称 = "mcp"
public class MCPNodeProcessor extends AbstractNodeProcessor {
// 实现内容...
}
📋 开发规范指南
1. 类结构规范
@Slf4j
@Component(NodeTypeConstants.XXX) // 使用节点类型常量
@RequiredArgsConstructor
public class XXXNodeProcessor extends AbstractNodeProcessor {
// 依赖注入的服务
private final SomeService someService;
// 核心执行方法
@Override
protected Dict doExecute(AiNodeDefinition node, FlowContextHolder context) {
// 1. 参数验证
// 2. 业务逻辑执行
// 3. 结果处理
// 4. 返回标准格式
}
// 私有辅助方法
private void validateConfig() { }
private void processData() { }
}
2. 配置验证规范
每个节点处理器都应该包含严格的配置验证逻辑,确保运行时的稳定性。
private AiXXXNode validateNodeConfig(AiNodeDefinition node) {
AiXXXNode config = node.getXxxParams();
if (config == null) {
throw FlowException.invalidParam("XXX节点配置无效");
}
// 具体参数验证
if (StrUtil.isBlank(config.getRequiredParam())) {
throw FlowException.invalidParam("必要参数不能为空");
}
return config;
}
3. 返回结果规范
标准返回格式
所有节点处理器必须返回统一格式的 Dict 对象,确保系统的一致性。
return Dict.create()
.set(FlowConstant.CONTENT, resultContent) // 主要内容
.set(FlowConstant.ROLE, "assistant") // 角色标识
.set(FlowConstant.TOKENS, tokenCount) // 令牌使用量
.set(FlowConstant.TIMESTAMP, System.currentTimeMillis()); // 时间戳
🛠️ 工具方法说明
AbstractNodeProcessor 提供的工具方法
getInputVariables(node, context)
功能: 获取节点输入参数
用途: 从上下文中提取当前节点需要的输入变量
返回: Dict 对象,包含所有输入变量的键值对
getOutputVariables(node, result)
功能: 处理节点输出参数
用途: 将节点执行结果按照配置输出到上下文
参数: 节点定义和执行结果
addQueryParam(url, name, value)
功能: 添加URL查询参数
用途: 构建带参数的URL请求
返回: 拼接好参数的完整URL
FlowContextHolder 上下文方法
上下文变量操作方法:
getParameters()
: 获取流程执行参数
getVariables()
: 获取变量映射表
setVariable(key, value)
: 设置变量值
getVariable(key)
: 获取指定变量值
上下文变量操作方法:
getParameters()
: 获取流程执行参数
getVariables()
: 获取变量映射表
setVariable(key, value)
: 设置变量值
getVariable(key)
: 获取指定变量值
执行配置相关方法:
getAiFlowExecuteDTO()
: 获取执行配置对象
getUserId()
: 获取当前用户ID
getConversationId()
: 获取会话ID
🌊 流式处理支持
对于需要实时响应的节点(如AI对话、长时间计算等),系统支持流式处理机制。
流式数据发送
// 检查是否支持流式回调
if (context.getAiFlowExecuteDTO() != null &&
context.getAiFlowExecuteDTO().getCallback() != null) {
// 发送流式数据
context.getAiFlowExecuteDTO()
.getCallback()
.execute(AiFlowExecuteDTO.FlowCallbackResult.builder()
.data(AiFlowExecuteDTO.FlowCallbackData.builder()
.content(content) // 内容片段
.nodeId(node.getId()) // 节点ID
.timestamp(System.currentTimeMillis()) // 时间戳
.build())
.build());
}