后端入口

AiChatController.msg 端点获取到前端的消息会建立 SSE 双向请求链接

AiChatController.msg 是整个聊天流程的入口点,它接收前端传来的消息 key,并建立 Server-Sent Events (SSE) 连接,实现服务器向客户端的实时推送。

@Inner(value = false)
@GetMapping(value = "/msg/list", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<AiMessageResultDTO> msg(@RequestParam Long key) {
    try {
        return chatService.chatList(key).concatWithValues(new AiMessageResultDTO("[DONE]"));
    }
    catch (Exception e) {
        log.error("chat error", e);
        return Flux.just(new AiMessageResultDTO(e.getMessage())).concatWithValues(new AiMessageResultDTO("[DONE]"));
    }
}

这个端点依赖于之前的 createConnection 调用,即使接口设置为公开,没有有效的 message key 也无法调用。

消息预处理

AiChatServiceImpl.chatList 进行消息的处理

当 msg 端点接收到请求后,会调用 AiChatServiceImpl.chatList 方法进行消息处理:

  1. 首先根据 key 查询聊天记录
  2. 如果记录不存在,返回”链接已失效”
  3. 构建 ChatMessageDTO 对象,填充必要信息
  4. 执行风控逻辑

风控逻辑:flowRisk

// 如果开启了规则引擎
if (flowExecutorOptional.isPresent()) {
    Flux<AiMessageResultDTO> aiMessageResultDTO = flowRisk(chatMessageDTO);
    if (aiMessageResultDTO != null)
        return aiMessageResultDTO;
}

风控逻辑通过规则引擎实现,遵循以下流程:

<chain name="chat">
    THEN(
    // 敏感词判断,触发直接退出流程
    sensitive,
    // 判断是否执行风控规则,触发直接退出流程;  内部调用按用户名+总量控制,外部调用按IP+总量控制
    IF(isNoLimit, noLimit, IF(isInner, WHEN(user,tokens), WHEN(ip,tokens)))
    );
</chain>

风控逻辑主要包括:

  • 敏感词检测:检测用户输入是否包含敏感词
  • 限流控制:根据调用方式不同采取不同的限流策略
    • 内部调用:按用户名 + 总量控制
    • 外部调用:按 IP+ 总量控制

根据请求类型匹配处理规则

ChatTypeEnums chatTypeEnums = ChatTypeEnums.fromCode(chatMessageDTO.getDatasetId());
ChatMessageContextHolder.set(chatMessageDTO);
return chatRuleMap.get(chatTypeEnums.getType()).process(chatMessageDTO);

系统根据 datasetId 确定请求消息类型,主要包括:

Chat TypeCodeDescription
FUNCTION_CHAT-1L功能聊天
SIMPLE_CHAT0L简单聊天
DATABASE_CHAT-2L数据库聊天
IMAGE_CHAT-3L生成图片
MARKMAP_CHAT-4L生成脑图
FLOW_CHAT-5L编排
JSON_CHAT-6LJSON 聊天
REASON_CHAT-7L推理聊天
VECTOR_CHAT1L知识库聊天

聊天类型枚举

知识库聊天处理流程 (VectorChatRule)

当请求类型为知识库聊天时,系统会调用 VectorChatRule 进行处理:

@Override
public Flux<AiMessageResultDTO> process(ChatMessageDTO chatMessageDTO) {
    AiDatasetEntity dataset = aiDatasetService.getById(chatMessageDTO.getDatasetId());
    DimensionAwareEmbeddingModel embeddingModel = modelProvider.getEmbeddingModel(dataset.getEmbeddingModel());
    Embedding queryEmbedding = embeddingModel.embed(chatMessageDTO.getContent()).content();

    // 使用标注数据处理结果
    if (YesNoEnum.YES.getCode().equals(dataset.getStandardFlag())) {
        Flux<AiMessageResultDTO> q2qFluxResult = q2QStandardRagChatHandler
            .process(queryEmbedding, dataset, chatMessageDTO)
            .cache();
        // 如果 q2qFluxResult 不是 empty 则直接返回,如果是 empty 则继续执行 q2AVectorRagChatHandler
        return q2qFluxResult
            .switchIfEmpty(q2AVectorRagChatHandler.process(queryEmbedding, dataset, chatMessageDTO));
    }

    return q2AVectorRagChatHandler.process(queryEmbedding, dataset, chatMessageDTO);
}

处理流程包括:

  1. 获取知识库数据集信息
  2. 获取嵌入模型并将用户问题转换为向量
  3. 根据知识库配置决定处理方式:
    • 如果启用了标准问答(standardFlag=YES),先尝试问题匹配(Q2QStandardRagChatHandler)
    • 如果问题匹配无结果,或未启用标准问答,则使用答案匹配(Q2AVectorRagChatHandler)

向量搜索处理 (Q2AVectorRagChatHandler)

public Flux<AiMessageResultDTO> process(Embedding embeddedList, AiDatasetEntity dataset,
        ChatMessageDTO chatMessageDTO) {
    double minScore = NumberUtil.div(Double.parseDouble(dataset.getScore().toString()), Double.parseDouble("100"),
            2);

    EmbeddingSearchRequest embeddingSearchRequest = EmbeddingSearchRequest.builder()
        .queryEmbedding(embeddedList)
        .maxResults(dataset.getTopK())
        .filter(metadataKey(AiDocumentEntity.Fields.datasetId).isEqualTo(dataset.getId().toString())
            .and(metadataKey(DocumentTypeEnums.Fields.type).isEqualTo(DocumentTypeEnums.ANSWER.getType())))
        .minScore(minScore)
        .build();

    EmbeddingSearchResult<TextSegment> searchResult = embeddingStoreService
        .embeddingStore(dataset.getCollectionName())
        .search(embeddingSearchRequest);
    List<EmbeddingMatch<TextSegment>> embeddingMatchList = searchResult.matches();

    // 未匹配
    if (CollUtil.isEmpty(embeddingMatchList)) {
        return Flux.just(new AiMessageResultDTO(dataset.getEmptyDesc()));
    }

    // 更新命中次数
    List<String> embeddingIdList = embeddingMatchList.stream().map(EmbeddingMatch::embeddingId).toList();
    aiSliceService.updateHitCount(embeddingIdList);
    
    // 对向量结果进行总结
    Flux<AiMessageResultDTO> aiMessageResultDTOFlux = summaryResult(dataset, chatMessageDTO,
            embeddingMatchList.stream().map(EmbeddingMatch::embedded).map(TextSegment::text).toList())
        .cache();

    // 修改 map 逻辑在最后拼接一下参考资料
    AiMessageResultDTO aiMessageResultDTO = new AiMessageResultDTO();
    aiMessageResultDTO.setMessage(StrUtil.EMPTY);
    List<AiMessageResultDTO.ExtLink> extLinks = buildExtMessage(embeddingMatchList);
    aiMessageResultDTO.setExtLinks(extLinks);
    return aiMessageResultDTOFlux.concatWithValues(aiMessageResultDTO);
}

向量搜索处理流程:

  1. 根据知识库配置设置最小相似度分数
  2. 构建向量搜索请求,包括:
    • 查询向量
    • 最大结果数
    • 过滤条件(数据集 ID 和文档类型)
    • 最小分数
  3. 执行向量搜索
  4. 处理搜索结果:
    • 如果没有匹配结果,返回知识库配置的空结果描述
    • 如果有匹配结果,更新命中次数
  5. 调用大模型对搜索结果进行总结(summaryResult)
  6. 构建参考资料链接
  7. 返回总结结果和参考资料

结果总结处理

结果总结处理流程:

  1. 创建提示模板,加载系统提示(knowledge-system.st)
  2. 向模板添加参数:
    • 搜索结果内容
    • 用户问题
    • 空结果描述
  3. 获取大模型服务
  4. 调用大模型进行聊天,生成总结
  5. 将总结结果转换为 AiMessageResultDTO 对象并返回

summaryResult 方法代码

public Flux<AiMessageResultDTO> summaryResult(AiDatasetEntity dataset, ChatMessageDTO chatMessageDTO,
        List<String> resultList) {
    // 对结果进行总结
    PromptTemplate userTemplate = new PromptTemplate(systemResource);
    userTemplate.add("contents", CollUtil.join(resultList, StrUtil.CRLF));
    userTemplate.add("userMessage", chatMessageDTO.getContent());
    userTemplate.add("emptyDesc", dataset.getEmptyDesc());

    AiStreamAssistantService streamAssistantService = modelProvider
        .getAiStreamAssistant(chatMessageDTO.getModelName())
        .getValue();
    Flux<String> summaryResult = streamAssistantService.chat(chatMessageDTO.getConversationId(),
            userTemplate.render());
    return summaryResult.map(AiMessageResultDTO::new);
}

总结处理流程