ai_scheduler/internal/services/callback.go

866 lines
27 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package services
import (
"ai_scheduler/internal/biz"
"ai_scheduler/internal/biz/handle/qywx/json_callback/wxbizjsonmsgcrypt"
"ai_scheduler/internal/config"
"ai_scheduler/internal/data/constants"
errorcode "ai_scheduler/internal/data/error"
"ai_scheduler/internal/data/impl"
"ai_scheduler/internal/domain/component/callback"
"ai_scheduler/internal/domain/tools/common/knowledge_base"
"ai_scheduler/internal/entitys"
"ai_scheduler/internal/gateway"
"ai_scheduler/internal/pkg"
"ai_scheduler/internal/pkg/dingtalk"
"ai_scheduler/internal/pkg/util"
"ai_scheduler/internal/pkg/utils_ollama"
"ai_scheduler/internal/tool_callback"
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"net/url"
"strings"
"time"
"gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/card"
"gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/chatbot"
"github.com/alibabacloud-go/dingtalk/card_1_0"
"github.com/alibabacloud-go/tea/tea"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/log"
"github.com/ollama/ollama/api"
)
// CallbackService 统一回调入口
type CallbackService struct {
cfg *config.Config
gateway *gateway.Gateway
dingtalkOldClient *dingtalk.OldClient
dingtalkContactClient *dingtalk.ContactClient
dingtalkNotableClient *dingtalk.NotableClient
dingtalkCardClient *dingtalk.CardClient
callbackManager callback.Manager
dingTalkBotBiz *biz.DingTalkBotBiz
ollamaClient *utils_ollama.Client
botConfigImpl *impl.BotConfigImpl
}
func NewCallbackService(
cfg *config.Config,
gateway *gateway.Gateway,
dingtalkOldClient *dingtalk.OldClient,
dingtalkContactClient *dingtalk.ContactClient,
dingtalkNotableClient *dingtalk.NotableClient,
dingtalkCardClient *dingtalk.CardClient,
callbackManager callback.Manager,
dingTalkBotBiz *biz.DingTalkBotBiz,
ollamaClient *utils_ollama.Client,
botConfigImpl *impl.BotConfigImpl,
) *CallbackService {
return &CallbackService{
cfg: cfg,
gateway: gateway,
dingtalkOldClient: dingtalkOldClient,
dingtalkContactClient: dingtalkContactClient,
dingtalkNotableClient: dingtalkNotableClient,
dingtalkCardClient: dingtalkCardClient,
callbackManager: callbackManager,
dingTalkBotBiz: dingTalkBotBiz,
ollamaClient: ollamaClient,
botConfigImpl: botConfigImpl,
}
}
// Envelope 回调统一请求体
type Envelope struct {
Action string `json:"action"`
TaskID string `json:"task_id"`
Data json.RawMessage `json:"data"`
}
// bug_optimization_submit 工单回调
const (
ActionBugOptimizationSubmitProcess = "bug_optimization_submit_process" // 工单过程回调
ActionBugOptimizationSubmitDone = "bug_optimization_submit_done" // 工单完成回调
ActionBugOptimizationSubmitUpdate = "bug_optimization_submit_update" // 工单更新回调
)
// BugOptimizationSubmitDoneData 工单完成回调数据
type BugOptimizationSubmitDoneData struct {
Receivers []string `json:"receivers"`
DetailPage string `json:"detail_page"`
Msg string `json:"msg"`
}
// BugOptimizationSubmitUpdateData 工单更新回调数据
type BugOptimizationSubmitUpdateData struct {
BaseId string `json:"base_id"` // 表格ID
SheetId string `json:"sheet_id"` // 表单ID
RecordId string `json:"record_id"` // 记录ID
UnionId string `json:"union_id"` // 钉钉用户 UnionID
Creator string `json:"creator"` // 钉钉用户名称
}
// Callback 统一回调处理
// 头部X-Source-Key / X-Timestamp
func (s *CallbackService) Callback(c *fiber.Ctx) error {
// 读取头
sourceKey := strings.TrimSpace(c.Get("X-Source-Key"))
ts := strings.TrimSpace(c.Get("X-Timestamp"))
// 时间窗口(如果提供了 ts 则校验,否则跳过),窗口 5 分钟
// if ts != "" && !validateTimestamp(ts, 5*time.Minute) {
if ts != "" && !util.IsInTimeWindow(ts, 5*time.Minute) {
return errorcode.AuthNotFound
}
// 解析 Envelope
var env Envelope
if err := json.Unmarshal(c.Body(), &env); err != nil {
return errorcode.ParamErrf("invalid json: %v", err)
}
if env.Action == "" || env.TaskID == "" {
return errorcode.ParamErrf("missing action/task_id")
}
if env.Data == nil {
return errorcode.ParamErrf("missing data")
}
switch sourceKey {
case "dingtalk":
return s.handleDingTalkCallback(c, env)
default:
return errorcode.AuthNotFound
}
}
func (s *CallbackService) CallbackQr(c *fiber.Ctx) error {
// 读取头
sourceKey := strings.TrimSpace(c.Get("X-Source-Key"))
ts := strings.TrimSpace(c.Get("X-Timestamp"))
// 时间窗口(如果提供了 ts 则校验,否则跳过),窗口 5 分钟
// if ts != "" && !validateTimestamp(ts, 5*time.Minute) {
if ts != "" && !util.IsInTimeWindow(ts, 5*time.Minute) {
return errorcode.AuthNotFound
}
// 解析 Envelope
var env Envelope
if err := json.Unmarshal(c.Body(), &env); err != nil {
return errorcode.ParamErrf("invalid json: %v", err)
}
if env.Action == "" || env.TaskID == "" {
return errorcode.ParamErrf("missing action/task_id")
}
if env.Data == nil {
return errorcode.ParamErrf("missing data")
}
switch sourceKey {
case "dingtalk":
return s.handleDingTalkCallback(c, env)
default:
return errorcode.AuthNotFound
}
}
func (s *CallbackService) handleDingTalkCallback(c *fiber.Ctx, env Envelope) error {
// 校验taskId
ctx := c.Context()
sessionID, err := s.callbackManager.GetSession(ctx, env.TaskID)
if err != nil {
return errorcode.ParamErrf("failed to get session for task_id: %s, err: %v", env.TaskID, err)
}
if sessionID == "" {
return errorcode.ParamErrf("missing session_id for task_id: %s", env.TaskID)
}
switch env.Action {
case ActionBugOptimizationSubmitUpdate:
// 业务处理
msg, businessErr := s.handleBugOptimizationSubmitUpdate(ctx, env.Data)
if businessErr != nil {
return businessErr
}
s.sendStreamLog(sessionID, msg)
return c.JSON(fiber.Map{"code": 0, "message": "ok"})
case ActionBugOptimizationSubmitDone:
// 业务处理
msg, businessErr := s.handleBugOptimizationSubmitDone(ctx, env.Data)
if businessErr != nil {
return businessErr
}
// 发送日志
s.sendStreamTxt(sessionID, msg)
// 通知等待者
if err := s.callbackManager.Notify(ctx, env.TaskID, msg); err != nil {
// 记录错误但继续
}
return c.JSON(fiber.Map{"code": 0, "message": "ok"})
case ActionBugOptimizationSubmitProcess:
type processData struct {
Process string `json:"process"`
}
var data processData
if err := json.Unmarshal(env.Data, &data); err != nil {
return errorcode.ParamErrf("invalid json: %v", err)
}
s.sendStreamLoading(sessionID, data.Process)
return c.JSON(fiber.Map{"code": 0, "message": "ok"})
default:
return errorcode.ParamErrf("unknown action: %s", env.Action)
}
}
// getDingtalkReceivers 解析接收者字符串为 DingTalk 用户 ID 列表
func (s *CallbackService) getDingtalkReceivers(ctx context.Context, receiverIds []string) string {
var receiverNames []string
for _, receiverId := range receiverIds {
userDetails, err := s.dingtalkOldClient.QueryUserDetails(ctx, receiverId)
if err != nil {
return ""
}
if userDetails == nil {
return ""
}
receiverNames = append(receiverNames, "@"+userDetails.Name)
}
receivers := strings.Join(receiverNames, " ")
return receivers
}
// sendStreamLog 发送流式日志
func (s *CallbackService) sendStreamLog(sessionID string, content string) {
if content == "" {
return
}
streamLog := entitys.Response{
Index: string(constants.BotToolsBugOptimizationSubmit),
Content: content,
Type: entitys.ResponseLog,
}
streamLogBytes := pkg.JsonByteIgonErr(streamLog)
s.gateway.SendToUid(sessionID, streamLogBytes)
}
// sendStreamTxt 发送流式文本
func (s *CallbackService) sendStreamTxt(sessionID string, content string) {
if content == "" {
return
}
streamLog := entitys.Response{
Index: string(constants.BotToolsBugOptimizationSubmit),
Content: content,
Type: entitys.ResponseText,
}
streamLogBytes := pkg.JsonByteIgonErr(streamLog)
s.gateway.SendToUid(sessionID, streamLogBytes)
}
// sendStreamLoading 发送流式加载过程
func (s *CallbackService) sendStreamLoading(sessionID string, content string) {
if content == "" {
return
}
streamLog := entitys.Response{
Index: string(constants.BotToolsBugOptimizationSubmit),
Content: content,
Type: entitys.ResponseLoading,
}
streamLogBytes := pkg.JsonByteIgonErr(streamLog)
s.gateway.SendToUid(sessionID, streamLogBytes)
}
// handleBugOptimizationSubmitUpdate 处理 bug 优化提交更新回调
func (s *CallbackService) handleBugOptimizationSubmitUpdate(ctx context.Context, taskData json.RawMessage) (string, *errorcode.BusinessErr) {
var data BugOptimizationSubmitUpdateData
if err := json.Unmarshal(taskData, &data); err != nil {
return "", errorcode.ParamErrf("invalid data type: %v", err)
}
if data.Creator == "" {
return "", errorcode.ParamErrf("empty creator")
}
// 获取创建者uid
accessToken, _ := s.dingtalkOldClient.GetAccessToken()
creatorId, err := s.dingtalkContactClient.SearchUserOne(dingtalk.AppKey{AccessToken: accessToken}, data.Creator)
if err != nil {
return "", errorcode.ParamErrf("invalid data type: %v", err)
}
// 获取用户详情
userDetails, err := s.dingtalkOldClient.QueryUserDetails(ctx, creatorId)
if err != nil {
return "", errorcode.ParamErrf("invalid data type: %v", err)
}
if userDetails == nil {
return "", errorcode.ParamErrf("user details not found")
}
unionId := userDetails.UnionID
// 更新记录
ok, err := s.dingtalkNotableClient.UpdateRecord(dingtalk.AppKey{AccessToken: accessToken}, &dingtalk.UpdateRecordReq{
BaseId: data.BaseId,
SheetId: data.SheetId,
RecordId: data.RecordId,
OperatorId: tool_callback.BotBugOptimizationSubmitAdminUnionId,
CreatorUnionId: unionId,
})
if err != nil {
return "", errorcode.ParamErrf("invalid data type: %v", err)
}
if !ok {
return "", errorcode.ParamErrf("update record failed")
}
return "问题记录即将完成", nil
}
// handleBugOptimizationSubmitDone 处理 bug 优化提交完成回调
func (s *CallbackService) handleBugOptimizationSubmitDone(ctx context.Context, taskData json.RawMessage) (string, *errorcode.BusinessErr) {
var data BugOptimizationSubmitDoneData
if err := json.Unmarshal(taskData, &data); err != nil {
return "", errorcode.ParamErrf("invalid data type: %v", err)
}
if len(data.Receivers) == 0 {
return "", errorcode.ParamErrf("empty receivers")
}
// 构建接收者
receivers := s.getDingtalkReceivers(ctx, data.Receivers)
if receivers == "" {
return "", errorcode.ParamErrf("invalid receivers")
}
// 构建跳转链接
var detailPage string
if data.DetailPage != "" {
detailPage = util.BuildJumpLink(data.DetailPage, "去查看")
}
msg := data.Msg
msg = util.ReplacePlaceholder(msg, "receivers", receivers)
msg = util.ReplacePlaceholder(msg, "detail_page", detailPage)
return msg, nil
}
func (s *CallbackService) QywxCallback(c *fiber.Ctx) (err error) {
// 读取头
httpstr := string(c.Request().URI().QueryString())
start := strings.Index(httpstr, "msg_signature=")
start += len("msg_signature=")
var msgSignature string
next := getString(httpstr, "&timestamp=", start, &msgSignature)
var timestamp string
next = getString(httpstr, "&nonce=", next, &timestamp)
var nonce string
next = getString(httpstr, "&echostr=", next, &nonce)
echostr := httpstr[next:len(httpstr)]
echostr, _ = url.QueryUnescape(echostr)
fmt.Println(httpstr, msgSignature, timestamp, nonce, echostr)
wxcpt := wxbizjsonmsgcrypt.NewWXBizMsgCrypt(s.cfg.Qywx.Token, s.cfg.Qywx.AES_KEY, s.cfg.Qywx.CorpId, wxbizjsonmsgcrypt.JsonType)
echoStr, cryptErr := wxcpt.VerifyURL(msgSignature, timestamp, nonce, echostr)
if nil != cryptErr {
log.Errorf("%v", cryptErr)
return fmt.Errorf("%v", cryptErr)
}
fmt.Println("verifyUrl success echoStr", string(echoStr))
err = c.Send(echoStr)
return err
}
func getString(str, endstr string, start int, msg *string) int {
end := strings.Index(str, endstr)
*msg = str[start:end]
return end + len(endstr)
}
// CallbackDingtalkRobot 钉钉机器人回调
// 钉钉 callbackRouteKey: gateway.dev.cdlsxd.cn-dingtalk-robot
// 钉钉 apiSecret: aB3dE7fG9hI2jK4L5M6N7O8P9Q0R1S2T
func (s *CallbackService) CallbackDingtalkRobot(c *fiber.Ctx) (err error) {
// 获取body中的参数
body := c.Request().Body()
var data chatbot.BotCallbackDataModel
if err := json.Unmarshal(body, &data); err != nil {
return fmt.Errorf("invalid body: %v", err)
}
// token 校验 ? token 好像没带?
// 通过机器人ID路由到不同能力
switch data.RobotCode {
case constants.GroupTemplateRobotIdIssueHandling:
// 问题处理群机器人
err := s.issueHandling(c, data)
if err != nil {
return fmt.Errorf("issueHandling failed: %v", err)
}
default:
// 其他机器人
return nil
}
// ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
// defer cancel()
// 统一初始化请求参数
// requireData, err := s.dingTalkBotBiz.InitRequire(ctx, &data)
// if err != nil {
// return fmt.Errorf("初始化请求参数失败: %v", err)
// }
// 这里需要再实现一套HTTP形式的回调用于处理钉钉群模板机器人的回调
// 主程等待处理结果
// resChan := make(chan string, 10)
// defer close(resChan)
return nil
}
// issueHandling 问题处理群机器人回调
// 能力1 通过[内容提取] 宏分析用户QA问题调出QA表单卡片
// 能力2 通过[QA收集] 宏,收集用户反馈,写入知识库
// 能力3 通过[知识库查询] 宏,查询知识库,返回答案
func (s *CallbackService) issueHandling(c *fiber.Ctx, data chatbot.BotCallbackDataModel) error {
// 能力1、2分析用户QA问题写入知识库
if strings.Contains(data.Text.Content, "[内容提取]") || strings.Contains(data.Text.Content, "[QA收集]") {
s.issueHandlingExtractContent(data)
}
// 能力3查询知识库返回答案
if strings.Contains(data.Text.Content, "[知识库查询]") {
s.issueHandlingQueryKnowledgeBase(data)
}
return nil
}
// 问题处理群机器人内容提取
func (s *CallbackService) issueHandlingExtractContent(data chatbot.BotCallbackDataModel) {
systemPrompt := `你是一个【问题与答案生成助手】。
你的职责是:
- 分析用户输入的内容
- 识别其中隐含或明确的问题
- 基于输入内容本身,生成对应的问题与答案
当用户输入为【多条群聊聊天记录】时:
- 结合问题主题,判断聊天记录中正在讨论或试图解决的问题
- 一个群聊中可能包含多个相互独立的问题,但它们都围绕着一个主题,尽可能总结为一个问题
- 若确实问题很独立,需要分别识别,对每个问题,整理出清晰、可复用的“问题描述”和“对应答案”
生成答案时的原则:
- 答案必须来源于聊天内容中已经给出的信息或共识
- 不要引入外部知识,不要使用聊天记录中真实人名或敏感信息,适当总结
- 若聊天中未形成明确答案,应明确标记为“暂无明确结论”
- 若存在多种不同观点,应分别列出,不要擅自合并或裁决
【JSON 输出原则】:
- 你的最终输出必须是**合法的 JSON**
- 不得输出任何额外解释性文字
- JSON 结构必须严格符合以下约定
JSON 结构约定:
{
"items": [
{
"question": "清晰、独立、可复用的问题描述",
"answer": "基于聊天内容整理出的答案;如无结论则为“暂无明确结论”",
"confidence": "high | medium | low"
}
]
}
字段说明:
- items问题与答案列表若未识别到有效问题则返回空数组 []
- question抽象后的标准问题表述不包含具体聊天语句
- answer整理后的答案不得引入聊天之外的信息
- confidence根据聊天中信息的一致性和明确程度给出判断
如果无法从输入中识别出任何有效问题,返回:
{ "items": [] }
问题主题:
%s
用户输入:
%s
`
prompt := fmt.Sprintf(systemPrompt, "紧急加款,提示当前账户为离线账户,请输入银行流水号", data.Text.Content)
fmt.Println("prompt:", prompt)
generateResp, err := s.ollamaClient.Generation(context.Background(), &api.GenerateRequest{
Model: s.cfg.Ollama.GenerateModel,
Prompt: prompt,
Stream: util.AnyToPoint(false),
})
if err != nil {
log.Errorf("问题提取失败: %v", err)
return
}
// 解析 JSON 响应
var resp struct {
Items []struct {
Question string `json:"question"`
Answer string `json:"answer"`
Confidence string `json:"confidence"`
} `json:"items"`
}
if err := json.Unmarshal([]byte(generateResp.Response), &resp); err != nil {
log.Errorf("解析 JSON 响应失败: %v", err)
return
}
cardContentTpl := "问题:%s \n答案%s"
var cardContentList []string
for _, item := range resp.Items {
cardContentList = append(cardContentList, fmt.Sprintf(cardContentTpl, item.Question, item.Answer))
}
cardContent := strings.Join(cardContentList, "\n\n")
// 调用卡片
// 构建卡片 OutTrackId
outTrackId := constants.BuildCardOutTrackId(data.ConversationId, data.RobotCode)
_, err = s.dingtalkCardClient.CreateAndDeliver(
dingtalk.AppKey{
AppKey: "ding5wwvnf9hxeyjau7t",
AppSecret: "FxXVlTzxrKXvJ8h-9uK0s5TjaBfOJSXumpmrHal-NmQAtku9wOPxcss0Af6WHoAK",
},
&card_1_0.CreateAndDeliverRequest{
CardTemplateId: tea.String("3a447814-6a3e-4a02-b48a-92c57b349d77.schema"),
OutTrackId: tea.String(outTrackId),
CallbackType: tea.String("HTTP"),
CallbackRouteKey: tea.String("gateway.dev.cdlsxd.cn-dingtalk-card"),
CardData: &card_1_0.CreateAndDeliverRequestCardData{
CardParamMap: map[string]*string{
"title": tea.String("QA知识收集"),
"button_display": tea.String("true"),
"QA_details_now": tea.String(cardContent),
"textarea_display": tea.String("normal"),
"action_id": tea.String("collect_qa"),
"tenant_id": tea.String(constants.KnowledgeTenantIdDefault),
"_CARD_DEBUG_TOOL_ENTRY": tea.String(constants.CardDebugToolEntryShow), // 调试字段
},
},
ImGroupOpenSpaceModel: &card_1_0.CreateAndDeliverRequestImGroupOpenSpaceModel{
SupportForward: tea.Bool(false),
},
OpenSpaceId: tea.String("dtv1.card//im_group." + data.ConversationId),
ImGroupOpenDeliverModel: &card_1_0.CreateAndDeliverRequestImGroupOpenDeliverModel{
RobotCode: tea.String(constants.GroupTemplateRobotIdIssueHandling),
Recipients: []*string{
tea.String(data.SenderStaffId),
},
},
},
)
}
// 问题处理群机器人查询知识库
func (s *CallbackService) issueHandlingQueryKnowledgeBase(data chatbot.BotCallbackDataModel) {
// 获取应用主机器人
mainRobotCode := data.RobotCode
if robotCode, ok := constants.GroupTemplateRobotIdMap[data.RobotCode]; ok {
mainRobotCode = robotCode
}
// 获取应用机器人配置
robotConfig, err := s.botConfigImpl.GetRobotConfig(mainRobotCode)
if err != nil {
log.Errorf("应用机器人配置不存在: %s, err: %v", mainRobotCode, err)
return
}
// 创建卡片
outTrackId := constants.BuildCardOutTrackId(data.ConversationId, mainRobotCode)
_, err = s.dingtalkCardClient.CreateAndDeliver(
dingtalk.AppKey{
AppKey: robotConfig.ClientId,
AppSecret: robotConfig.ClientSecret,
},
&card_1_0.CreateAndDeliverRequest{
CardTemplateId: tea.String(constants.DingtalkCardTplBaseMsg),
CardData: &card_1_0.CreateAndDeliverRequestCardData{
CardParamMap: map[string]*string{
"title": tea.String(data.Text.Content),
"markdown": tea.String("知识库检索中..."),
},
},
OutTrackId: tea.String(outTrackId),
ImGroupOpenSpaceModel: &card_1_0.CreateAndDeliverRequestImGroupOpenSpaceModel{
SupportForward: tea.Bool(false),
},
OpenSpaceId: tea.String("dtv1.card//im_group." + data.ConversationId),
ImGroupOpenDeliverModel: &card_1_0.CreateAndDeliverRequestImGroupOpenDeliverModel{
RobotCode: tea.String(data.RobotCode),
Recipients: []*string{
tea.String(data.SenderStaffId),
},
},
},
)
// 查询知识库
knowledgeBase := knowledge_base.New(s.cfg.KnowledgeConfig)
knowledgeResp, err := knowledgeBase.Query(&knowledge_base.QueryRequest{
TenantID: constants.KnowledgeTenantIdDefault,
Query: data.Text.Content,
Mode: constants.KnowledgeModeMix,
Stream: false,
Think: false,
OnlyRAG: true,
})
if err != nil {
log.Errorf("查询知识库失败: %v", err)
return
}
knowledgeRespBytes, err := io.ReadAll(knowledgeResp)
if err != nil {
log.Errorf("读取知识库响应失败: %v", err)
return
}
// 卡片更新
message, isRetrieved, err := knowledge_base.ParseOpenAIHTTPData(string(knowledgeRespBytes))
if err != nil {
log.Errorf("读取知识库 SSE 数据失败: %v", err)
return
}
content := message.Content
if !isRetrieved {
content = "知识库未检测到匹配信息,请核查知识库数据是否正确。"
}
// 卡片更新
_, err = s.dingtalkCardClient.UpdateCard(
dingtalk.AppKey{
AppKey: robotConfig.ClientId,
AppSecret: robotConfig.ClientSecret,
},
&card_1_0.UpdateCardRequest{
OutTrackId: tea.String(outTrackId),
CardData: &card_1_0.UpdateCardRequestCardData{
CardParamMap: map[string]*string{
"markdown": tea.String(content),
},
},
CardUpdateOptions: &card_1_0.UpdateCardRequestCardUpdateOptions{
UpdateCardDataByKey: tea.Bool(true),
},
},
)
if err != nil {
log.Errorf("更新卡片失败: %v", err)
return
}
return
}
// 读取知识库 SSE 数据
func (s *CallbackService) readKnowledgeSSE(resp io.ReadCloser, channel chan string) (isRetrieved bool, err error) {
scanner := bufio.NewScanner(resp)
var buffer strings.Builder
for scanner.Scan() {
line := scanner.Text()
delta, done, err := knowledge_base.ParseOpenAIStreamData(line)
if err != nil {
return false, fmt.Errorf("解析SSE数据失败: %w", err)
}
if done {
break
}
if delta == nil {
continue
}
// 知识库未命中 输出提示后中断
if delta.XRagStatus == constants.KnowledgeRagStatusMiss {
var missContent string = "知识库未检测到匹配信息,即将为您创建群聊解决问题。"
channel <- missContent
return false, nil
}
// 推理内容
if delta.ReasoningContent != "" {
channel <- delta.ReasoningContent
continue
}
// 输出内容 - 段落
// 存入缓冲区
buffer.WriteString(delta.Content)
content := buffer.String()
// 检查是否有换行符,按段落输出
if idx := strings.LastIndex(content, "\n"); idx != -1 {
// 发送直到最后一个换行符的内容
toSend := content[:idx+1]
channel <- toSend
// 重置缓冲区,保留剩余部分
remaining := content[idx+1:]
buffer.Reset()
buffer.WriteString(remaining)
}
}
if err := scanner.Err(); err != nil {
return true, fmt.Errorf("读取SSE流中断: %w", err)
}
// 发送缓冲区剩余内容(仅在段落模式下需要)
if buffer.Len() > 0 {
channel <- buffer.String()
}
return true, nil
}
// CallbackDingtalkCard 处理钉钉卡片回调
// 钉钉 callbackRouteKey: gateway.dev.cdlsxd.cn-dingtalk-card
// 钉钉 apiSecret: aB3dE7fG9hI2jK4L5M6N7O8P9Q0R1S2T
func (s *CallbackService) CallbackDingtalkCard(c *fiber.Ctx) error {
// 获取body中的参数
body := c.Request().Body()
// HTTP 回调结构与SDK结构体不符包装结构体
tmp := struct {
card.CardRequest // 嵌入原结构体
UserIdType util.FlexibleType `json:"userIdType"` // 重写type字段
}{}
if err := json.Unmarshal(body, &tmp); err != nil {
return fmt.Errorf("invalid body: %v", err)
}
// 异常字段覆盖
data := tmp.CardRequest
data.UserIdType = tmp.UserIdType.Int()
if err := json.Unmarshal([]byte(data.Content), &data.CardActionData); err != nil {
return fmt.Errorf("invalid content: %v", err)
}
// 非回调类型不处理
if data.Type != constants.CardActionCallbackTypeAction {
return nil
}
// 处理卡片回调
var resp *card.CardResponse
for _, actionId := range data.CardActionData.CardPrivateData.ActionIdList {
switch actionId {
case "collect_qa":
// 问题处理群机器人 QA 收集
resp = s.issueHandlingCollectQA(data)
}
}
// 跳过响应包装
c.Locals("skip_response_wrap", true)
return c.JSON(resp)
}
// 问题处理群机器人 QA 收集
func (s *CallbackService) issueHandlingCollectQA(data card.CardRequest) *card.CardResponse {
// 确认提交,文本写入知识库
if data.CardActionData.CardPrivateData.Params["submit"] == "submit" {
content := data.CardActionData.CardPrivateData.Params["QA_details"].(string)
tenantID := data.CardActionData.CardPrivateData.Params["tenant_id"].(string)
// 协程执行耗时操作,防止阻塞
util.SafeGo("inject_knowledge_base", func() {
knowledgeBase := knowledge_base.New(s.cfg.KnowledgeConfig)
err := knowledgeBase.IngestText(&knowledge_base.IngestTextRequest{
TenantID: tenantID,
Text: content,
})
if err != nil {
log.Errorf("注入知识库失败: %v", err)
} else {
log.Infof("注入知识库成功: tenantID=%s", tenantID)
}
// 解析当前卡片的 ConversationId 和 robotCode
conversationId, robotCode := constants.ParseCardOutTrackId(data.OutTrackId)
// 获取主应用机器人(这里可能是群模板机器人)
mainRobotId := robotCode
if robotCode, ok := constants.GroupTemplateRobotIdMap[robotCode]; ok {
mainRobotId = robotCode
}
// 获取 robot 配置
robotConfig, err := s.botConfigImpl.GetRobotConfig(mainRobotId)
if err != nil {
log.Errorf("获取 robot 配置失败: %v", err)
return
}
// 发送卡片通知用户注入成功
outTrackId := constants.BuildCardOutTrackId(conversationId, robotCode)
s.dingtalkCardClient.CreateAndDeliver(
dingtalk.AppKey{
AppKey: robotConfig.ClientId,
AppSecret: robotConfig.ClientSecret,
},
&card_1_0.CreateAndDeliverRequest{
CardTemplateId: tea.String(constants.DingtalkCardTplBaseMsg),
OutTrackId: tea.String(outTrackId),
CardData: &card_1_0.CreateAndDeliverRequestCardData{
CardParamMap: map[string]*string{
"title": tea.String("QA知识收集结果"),
"markdown": tea.String("[Get] **成功**"),
},
},
ImGroupOpenSpaceModel: &card_1_0.CreateAndDeliverRequestImGroupOpenSpaceModel{
SupportForward: tea.Bool(false),
},
OpenSpaceId: tea.String("dtv1.card//im_group." + conversationId),
ImGroupOpenDeliverModel: &card_1_0.CreateAndDeliverRequestImGroupOpenDeliverModel{
RobotCode: tea.String(robotCode),
Recipients: []*string{
tea.String(data.UserId),
},
},
},
)
})
}
// 取消提交,禁用输入框
resp := &card.CardResponse{
CardUpdateOptions: &card.CardUpdateOptions{
UpdateCardDataByKey: true,
},
CardData: &card.CardDataDto{
CardParamMap: map[string]string{
"textarea_display": "disabled",
},
},
}
return resp
}