ai_scheduler/internal/services/callback.go

578 lines
18 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package services
import (
"ai_scheduler/internal/biz"
"ai_scheduler/internal/biz/handle/qywx/json_callback/wxbizjsonmsgcrypt"
"ai_scheduler/internal/config"
"ai_scheduler/internal/data/constants"
errorcode "ai_scheduler/internal/data/error"
"ai_scheduler/internal/domain/component/callback"
"ai_scheduler/internal/entitys"
"ai_scheduler/internal/gateway"
"ai_scheduler/internal/pkg"
"ai_scheduler/internal/pkg/dingtalk"
"ai_scheduler/internal/pkg/util"
"ai_scheduler/internal/pkg/utils_ollama"
"ai_scheduler/internal/tool_callback"
"context"
"encoding/json"
"fmt"
"net/url"
"strings"
"time"
"gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/chatbot"
"github.com/alibabacloud-go/dingtalk/card_1_0"
"github.com/alibabacloud-go/tea/tea"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/log"
"github.com/ollama/ollama/api"
)
// CallbackService 统一回调入口
type CallbackService struct {
cfg *config.Config
gateway *gateway.Gateway
dingtalkOldClient *dingtalk.OldClient
dingtalkContactClient *dingtalk.ContactClient
dingtalkNotableClient *dingtalk.NotableClient
dingtalkCardClient *dingtalk.CardClient
callbackManager callback.Manager
dingTalkBotBiz *biz.DingTalkBotBiz
ollamaClient *utils_ollama.Client
}
func NewCallbackService(
cfg *config.Config,
gateway *gateway.Gateway,
dingtalkOldClient *dingtalk.OldClient,
dingtalkContactClient *dingtalk.ContactClient,
dingtalkNotableClient *dingtalk.NotableClient,
dingtalkCardClient *dingtalk.CardClient,
callbackManager callback.Manager,
dingTalkBotBiz *biz.DingTalkBotBiz,
ollamaClient *utils_ollama.Client,
) *CallbackService {
return &CallbackService{
cfg: cfg,
gateway: gateway,
dingtalkOldClient: dingtalkOldClient,
dingtalkContactClient: dingtalkContactClient,
dingtalkNotableClient: dingtalkNotableClient,
dingtalkCardClient: dingtalkCardClient,
callbackManager: callbackManager,
dingTalkBotBiz: dingTalkBotBiz,
ollamaClient: ollamaClient,
}
}
// Envelope 回调统一请求体
type Envelope struct {
Action string `json:"action"`
TaskID string `json:"task_id"`
Data json.RawMessage `json:"data"`
}
// bug_optimization_submit 工单回调
const (
ActionBugOptimizationSubmitProcess = "bug_optimization_submit_process" // 工单过程回调
ActionBugOptimizationSubmitDone = "bug_optimization_submit_done" // 工单完成回调
ActionBugOptimizationSubmitUpdate = "bug_optimization_submit_update" // 工单更新回调
)
// BugOptimizationSubmitDoneData 工单完成回调数据
type BugOptimizationSubmitDoneData struct {
Receivers []string `json:"receivers"`
DetailPage string `json:"detail_page"`
Msg string `json:"msg"`
}
// BugOptimizationSubmitUpdateData 工单更新回调数据
type BugOptimizationSubmitUpdateData struct {
BaseId string `json:"base_id"` // 表格ID
SheetId string `json:"sheet_id"` // 表单ID
RecordId string `json:"record_id"` // 记录ID
UnionId string `json:"union_id"` // 钉钉用户 UnionID
Creator string `json:"creator"` // 钉钉用户名称
}
// Callback 统一回调处理
// 头部X-Source-Key / X-Timestamp
func (s *CallbackService) Callback(c *fiber.Ctx) error {
// 读取头
sourceKey := strings.TrimSpace(c.Get("X-Source-Key"))
ts := strings.TrimSpace(c.Get("X-Timestamp"))
// 时间窗口(如果提供了 ts 则校验,否则跳过),窗口 5 分钟
// if ts != "" && !validateTimestamp(ts, 5*time.Minute) {
if ts != "" && !util.IsInTimeWindow(ts, 5*time.Minute) {
return errorcode.AuthNotFound
}
// 解析 Envelope
var env Envelope
if err := json.Unmarshal(c.Body(), &env); err != nil {
return errorcode.ParamErrf("invalid json: %v", err)
}
if env.Action == "" || env.TaskID == "" {
return errorcode.ParamErrf("missing action/task_id")
}
if env.Data == nil {
return errorcode.ParamErrf("missing data")
}
switch sourceKey {
case "dingtalk":
return s.handleDingTalkCallback(c, env)
default:
return errorcode.AuthNotFound
}
}
func (s *CallbackService) CallbackQr(c *fiber.Ctx) error {
// 读取头
sourceKey := strings.TrimSpace(c.Get("X-Source-Key"))
ts := strings.TrimSpace(c.Get("X-Timestamp"))
// 时间窗口(如果提供了 ts 则校验,否则跳过),窗口 5 分钟
// if ts != "" && !validateTimestamp(ts, 5*time.Minute) {
if ts != "" && !util.IsInTimeWindow(ts, 5*time.Minute) {
return errorcode.AuthNotFound
}
// 解析 Envelope
var env Envelope
if err := json.Unmarshal(c.Body(), &env); err != nil {
return errorcode.ParamErrf("invalid json: %v", err)
}
if env.Action == "" || env.TaskID == "" {
return errorcode.ParamErrf("missing action/task_id")
}
if env.Data == nil {
return errorcode.ParamErrf("missing data")
}
switch sourceKey {
case "dingtalk":
return s.handleDingTalkCallback(c, env)
default:
return errorcode.AuthNotFound
}
}
func (s *CallbackService) handleDingTalkCallback(c *fiber.Ctx, env Envelope) error {
// 校验taskId
ctx := c.Context()
sessionID, err := s.callbackManager.GetSession(ctx, env.TaskID)
if err != nil {
return errorcode.ParamErrf("failed to get session for task_id: %s, err: %v", env.TaskID, err)
}
if sessionID == "" {
return errorcode.ParamErrf("missing session_id for task_id: %s", env.TaskID)
}
switch env.Action {
case ActionBugOptimizationSubmitUpdate:
// 业务处理
msg, businessErr := s.handleBugOptimizationSubmitUpdate(ctx, env.Data)
if businessErr != nil {
return businessErr
}
s.sendStreamLog(sessionID, msg)
return c.JSON(fiber.Map{"code": 0, "message": "ok"})
case ActionBugOptimizationSubmitDone:
// 业务处理
msg, businessErr := s.handleBugOptimizationSubmitDone(ctx, env.Data)
if businessErr != nil {
return businessErr
}
// 发送日志
s.sendStreamTxt(sessionID, msg)
// 通知等待者
if err := s.callbackManager.Notify(ctx, env.TaskID, msg); err != nil {
// 记录错误但继续
}
return c.JSON(fiber.Map{"code": 0, "message": "ok"})
case ActionBugOptimizationSubmitProcess:
type processData struct {
Process string `json:"process"`
}
var data processData
if err := json.Unmarshal(env.Data, &data); err != nil {
return errorcode.ParamErrf("invalid json: %v", err)
}
s.sendStreamLoading(sessionID, data.Process)
return c.JSON(fiber.Map{"code": 0, "message": "ok"})
default:
return errorcode.ParamErrf("unknown action: %s", env.Action)
}
}
// getDingtalkReceivers 解析接收者字符串为 DingTalk 用户 ID 列表
func (s *CallbackService) getDingtalkReceivers(ctx context.Context, receiverIds []string) string {
var receiverNames []string
for _, receiverId := range receiverIds {
userDetails, err := s.dingtalkOldClient.QueryUserDetails(ctx, receiverId)
if err != nil {
return ""
}
if userDetails == nil {
return ""
}
receiverNames = append(receiverNames, "@"+userDetails.Name)
}
receivers := strings.Join(receiverNames, " ")
return receivers
}
// sendStreamLog 发送流式日志
func (s *CallbackService) sendStreamLog(sessionID string, content string) {
if content == "" {
return
}
streamLog := entitys.Response{
Index: string(constants.BotToolsBugOptimizationSubmit),
Content: content,
Type: entitys.ResponseLog,
}
streamLogBytes := pkg.JsonByteIgonErr(streamLog)
s.gateway.SendToUid(sessionID, streamLogBytes)
}
// sendStreamTxt 发送流式文本
func (s *CallbackService) sendStreamTxt(sessionID string, content string) {
if content == "" {
return
}
streamLog := entitys.Response{
Index: string(constants.BotToolsBugOptimizationSubmit),
Content: content,
Type: entitys.ResponseText,
}
streamLogBytes := pkg.JsonByteIgonErr(streamLog)
s.gateway.SendToUid(sessionID, streamLogBytes)
}
// sendStreamLoading 发送流式加载过程
func (s *CallbackService) sendStreamLoading(sessionID string, content string) {
if content == "" {
return
}
streamLog := entitys.Response{
Index: string(constants.BotToolsBugOptimizationSubmit),
Content: content,
Type: entitys.ResponseLoading,
}
streamLogBytes := pkg.JsonByteIgonErr(streamLog)
s.gateway.SendToUid(sessionID, streamLogBytes)
}
// handleBugOptimizationSubmitUpdate 处理 bug 优化提交更新回调
func (s *CallbackService) handleBugOptimizationSubmitUpdate(ctx context.Context, taskData json.RawMessage) (string, *errorcode.BusinessErr) {
var data BugOptimizationSubmitUpdateData
if err := json.Unmarshal(taskData, &data); err != nil {
return "", errorcode.ParamErrf("invalid data type: %v", err)
}
if data.Creator == "" {
return "", errorcode.ParamErrf("empty creator")
}
// 获取创建者uid
accessToken, _ := s.dingtalkOldClient.GetAccessToken()
creatorId, err := s.dingtalkContactClient.SearchUserOne(dingtalk.AppKey{AccessToken: accessToken}, data.Creator)
if err != nil {
return "", errorcode.ParamErrf("invalid data type: %v", err)
}
// 获取用户详情
userDetails, err := s.dingtalkOldClient.QueryUserDetails(ctx, creatorId)
if err != nil {
return "", errorcode.ParamErrf("invalid data type: %v", err)
}
if userDetails == nil {
return "", errorcode.ParamErrf("user details not found")
}
unionId := userDetails.UnionID
// 更新记录
ok, err := s.dingtalkNotableClient.UpdateRecord(dingtalk.AppKey{AccessToken: accessToken}, &dingtalk.UpdateRecordReq{
BaseId: data.BaseId,
SheetId: data.SheetId,
RecordId: data.RecordId,
OperatorId: tool_callback.BotBugOptimizationSubmitAdminUnionId,
CreatorUnionId: unionId,
})
if err != nil {
return "", errorcode.ParamErrf("invalid data type: %v", err)
}
if !ok {
return "", errorcode.ParamErrf("update record failed")
}
return "问题记录即将完成", nil
}
// handleBugOptimizationSubmitDone 处理 bug 优化提交完成回调
func (s *CallbackService) handleBugOptimizationSubmitDone(ctx context.Context, taskData json.RawMessage) (string, *errorcode.BusinessErr) {
var data BugOptimizationSubmitDoneData
if err := json.Unmarshal(taskData, &data); err != nil {
return "", errorcode.ParamErrf("invalid data type: %v", err)
}
if len(data.Receivers) == 0 {
return "", errorcode.ParamErrf("empty receivers")
}
// 构建接收者
receivers := s.getDingtalkReceivers(ctx, data.Receivers)
if receivers == "" {
return "", errorcode.ParamErrf("invalid receivers")
}
// 构建跳转链接
var detailPage string
if data.DetailPage != "" {
detailPage = util.BuildJumpLink(data.DetailPage, "去查看")
}
msg := data.Msg
msg = util.ReplacePlaceholder(msg, "receivers", receivers)
msg = util.ReplacePlaceholder(msg, "detail_page", detailPage)
return msg, nil
}
func (s *CallbackService) QywxCallback(c *fiber.Ctx) (err error) {
// 读取头
httpstr := string(c.Request().URI().QueryString())
start := strings.Index(httpstr, "msg_signature=")
start += len("msg_signature=")
var msgSignature string
next := getString(httpstr, "&timestamp=", start, &msgSignature)
var timestamp string
next = getString(httpstr, "&nonce=", next, &timestamp)
var nonce string
next = getString(httpstr, "&echostr=", next, &nonce)
echostr := httpstr[next:len(httpstr)]
echostr, _ = url.QueryUnescape(echostr)
fmt.Println(httpstr, msgSignature, timestamp, nonce, echostr)
wxcpt := wxbizjsonmsgcrypt.NewWXBizMsgCrypt(s.cfg.Qywx.Token, s.cfg.Qywx.AES_KEY, s.cfg.Qywx.CorpId, wxbizjsonmsgcrypt.JsonType)
echoStr, cryptErr := wxcpt.VerifyURL(msgSignature, timestamp, nonce, echostr)
if nil != cryptErr {
log.Errorf("%v", cryptErr)
return fmt.Errorf("%v", cryptErr)
}
fmt.Println("verifyUrl success echoStr", string(echoStr))
err = c.Send(echoStr)
return err
}
func getString(str, endstr string, start int, msg *string) int {
end := strings.Index(str, endstr)
*msg = str[start:end]
return end + len(endstr)
}
// CallbackDingtalkRobot 钉钉机器人回调
func (s *CallbackService) CallbackDingtalkRobot(c *fiber.Ctx) (err error) {
// 获取body中的参数
body := c.Request().Body()
var data chatbot.BotCallbackDataModel
if err := json.Unmarshal(body, &data); err != nil {
return fmt.Errorf("invalid body: %v", err)
}
fmt.Println(string(body))
// token 校验 ? token 好像没带?
// 通过机器人ID路由到不同能力
switch data.RobotCode {
case constants.GroupTemplateRobotIdIssueHandling:
// 问题处理群机器人
err := s.issueHandling(c, data)
if err != nil {
return fmt.Errorf("issueHandling failed: %v", err)
}
default:
// 其他机器人
return nil
}
// ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
// defer cancel()
// 统一初始化请求参数
// requireData, err := s.dingTalkBotBiz.InitRequire(ctx, &data)
// if err != nil {
// return fmt.Errorf("初始化请求参数失败: %v", err)
// }
// 这里需要再实现一套HTTP形式的回调用于处理钉钉群模板机器人的回调
// 主程等待处理结果
// resChan := make(chan string, 10)
// defer close(resChan)
return nil
}
// issueHandling 问题处理群机器人回调
// 能力1 通过[内容提取] 宏分析用户QA问题调出QA表单卡片
// 能力2 通过[QA收集] 宏,收集用户反馈,写入知识库
func (s *CallbackService) issueHandling(c *fiber.Ctx, data chatbot.BotCallbackDataModel) error {
// 宏解析
if strings.Contains(data.Text.Content, "[内容提取]") {
s.issueHandlingExtractContent(data)
}
if strings.Contains(data.Text.Content, "[QA收集]") {
s.issueHandlingCollectQA()
}
return nil
}
// 问题处理群机器人内容提取
func (s *CallbackService) issueHandlingExtractContent(data chatbot.BotCallbackDataModel) {
systemPrompt := `你是一个【问题与答案生成助手】。
你的职责是:
- 分析用户输入的内容
- 识别其中隐含或明确的问题
- 基于输入内容本身,生成对应的问题与答案
当用户输入为【多条群聊聊天记录】时:
- 结合问题主题,判断聊天记录中正在讨论或试图解决的问题
- 一个群聊中可能包含多个相互独立的问题,但它们都围绕着一个主题,尽可能总结为一个问题
- 若确实问题很独立,需要分别识别,对每个问题,整理出清晰、可复用的“问题描述”和“对应答案”
生成答案时的原则:
- 答案必须来源于聊天内容中已经给出的信息或共识
- 不要引入外部知识,不要使用聊天记录中真实人名或敏感信息,适当总结
- 若聊天中未形成明确答案,应明确标记为“暂无明确结论”
- 若存在多种不同观点,应分别列出,不要擅自合并或裁决
【JSON 输出原则】:
- 你的最终输出必须是**合法的 JSON**
- 不得输出任何额外解释性文字
- JSON 结构必须严格符合以下约定
JSON 结构约定:
{
"items": [
{
"question": "清晰、独立、可复用的问题描述",
"answer": "基于聊天内容整理出的答案;如无结论则为“暂无明确结论”",
"confidence": "high | medium | low"
}
]
}
字段说明:
- items问题与答案列表若未识别到有效问题则返回空数组 []
- question抽象后的标准问题表述不包含具体聊天语句
- answer整理后的答案不得引入聊天之外的信息
- confidence根据聊天中信息的一致性和明确程度给出判断
如果无法从输入中识别出任何有效问题,返回:
{ "items": [] }
问题主题:
%s
用户输入:
%s
`
prompt := fmt.Sprintf(systemPrompt, "紧急加款,提示当前账户为离线账户,请输入银行流水号", data.Text.Content)
fmt.Println("prompt:", prompt)
generateResp, err := s.ollamaClient.Generation(context.Background(), &api.GenerateRequest{
Model: s.cfg.Ollama.GenerateModel,
Prompt: prompt,
Stream: util.AnyToPoint(false),
})
if err != nil {
log.Errorf("问题提取失败: %v", err)
return
}
// 解析 JSON 响应
var resp struct {
Items []struct {
Question string `json:"question"`
Answer string `json:"answer"`
Confidence string `json:"confidence"`
} `json:"items"`
}
if err := json.Unmarshal([]byte(generateResp.Response), &resp); err != nil {
log.Errorf("解析 JSON 响应失败: %v", err)
return
}
cardContentTpl := "问题:%s \n答案%s"
var cardContentList []string
for _, item := range resp.Items {
cardContentList = append(cardContentList, fmt.Sprintf(cardContentTpl, item.Question, item.Answer))
}
cardContent := strings.Join(cardContentList, "\n\n")
// 调用卡片
// 构建卡片 OutTrackId
outTrackId := constants.BuildCardOutTrackId(data.ConversationId, data.RobotCode)
_, err = s.dingtalkCardClient.CreateAndDeliver(
dingtalk.AppKey{
AppKey: "ding5wwvnf9hxeyjau7t",
AppSecret: "FxXVlTzxrKXvJ8h-9uK0s5TjaBfOJSXumpmrHal-NmQAtku9wOPxcss0Af6WHoAK",
},
&card_1_0.CreateAndDeliverRequest{
CardTemplateId: tea.String("3a447814-6a3e-4a02-b48a-92c57b349d77.schema"),
OutTrackId: tea.String(outTrackId),
CallbackType: tea.String("HTTP"),
CallbackRouteKey: tea.String("gateway.dev.cdlsxd.cn-dingtalk-robot"),
CardData: &card_1_0.CreateAndDeliverRequestCardData{
CardParamMap: map[string]*string{
"title": tea.String("QA知识收集"),
"button_display": tea.String("true"),
"QA_details_now": tea.String(cardContent),
"textarea_display": tea.String("normal"),
"action_id": tea.String("collect_qa"),
"tenant_id": tea.String(constants.KnowledgeTenantIdDefault),
"_CARD_DEBUG_TOOL_ENTRY": tea.String("show"), // debug字段
},
},
ImGroupOpenSpaceModel: &card_1_0.CreateAndDeliverRequestImGroupOpenSpaceModel{
SupportForward: tea.Bool(false),
},
OpenSpaceId: tea.String("dtv1.card//im_group." + data.ConversationId),
ImGroupOpenDeliverModel: &card_1_0.CreateAndDeliverRequestImGroupOpenDeliverModel{
RobotCode: tea.String(constants.GroupTemplateRobotIdIssueHandling),
Recipients: []*string{
tea.String(data.SenderStaffId),
},
},
},
)
}
// 问题处理群机器人 QA 收集
func (s *CallbackService) issueHandlingCollectQA() {
}