From e8061799b80eb4deb83971270d1c719bc710c459 Mon Sep 17 00:00:00 2001 From: fuzhongyun <15339891972@163.com> Date: Thu, 15 Jan 2026 18:12:34 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=E7=9F=A5=E8=AF=86?= =?UTF-8?q?=E5=BA=93=E8=B0=83=E7=94=A8demo=E3=80=81=E6=AE=B5=E8=90=BD?= =?UTF-8?q?=E8=BE=93=E5=87=BAdemo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/biz/group_config.go | 131 ++++++++++++++++++++++ internal/biz/handle/dingtalk/send_card.go | 2 +- 2 files changed, 132 insertions(+), 1 deletion(-) diff --git a/internal/biz/group_config.go b/internal/biz/group_config.go index b68a29d..f58ba03 100644 --- a/internal/biz/group_config.go +++ b/internal/biz/group_config.go @@ -16,6 +16,7 @@ import ( "ai_scheduler/internal/tools" "ai_scheduler/internal/tools/bbxt" "ai_scheduler/utils" + "bufio" "context" "encoding/json" "errors" @@ -52,6 +53,7 @@ func NewGroupConfigBiz( conf *config.Config, reportDailyCacheImpl *impl.ReportDailyCacheImpl, rdb *utils.Rdb, + toolManager *tools.Manager, ) *GroupConfigBiz { return &GroupConfigBiz{ botTools: tools.BootTools, @@ -61,6 +63,7 @@ func NewGroupConfigBiz( conf: conf, reportDailyCacheImpl: reportDailyCacheImpl, rdb: rdb, + toolManager: toolManager, } } @@ -255,6 +258,9 @@ func (g *GroupConfigBiz) handleMatch(ctx context.Context, rec *entitys.Recognize return g.handleReport(ctx, rec, pointTask, groupConfig) case constants.TaskTypeCozeWorkflow: return g.handleCozeWorkflow(ctx, rec, pointTask) + case constants.TaskTypeKnowle: // 知识库V2版本 + return g.handleKnowledgeV2(ctx, rec, pointTask) + // return g.handleKnowledgeV3(ctx, rec, pointTask) default: return g.otherTask(ctx, rec) } @@ -459,3 +465,128 @@ func (g *GroupConfigBiz) GetReportCache(ctx context.Context, day time.Time, tota return nil } + +// handleKnowledgeV2 处理知识库V2版本 +func (g *GroupConfigBiz) handleKnowledgeV2(ctx context.Context, rec *entitys.Recognize, pointTask *model.AiBotTool) (err error) { + req := l_request.Request{ + Method: "POST", + Url: "http://127.0.0.1:9600/query", + Headers: map[string]string{ + "Content-Type": "application/json", + "X-Tenant-ID": "default", + }, + Json: map[string]interface{}{ + "query": rec.UserContent.Text, + "mode": "naive", + "stream": true, + "think": false, + }, + } + resp, err := req.SendNoParseResponse() + if err != nil { + return fmt.Errorf("请求失败,err: %v", err) + } + defer resp.Body.Close() + + err = g.connectAndReadSSE(resp, rec.Ch, true) + if err != nil { + return + } + + return +} + +// 连接 SSE 并读取数据 +// event: thinking +// data: {"text": "1. 上下文检索中...\n"} +// event: answer +// data: {"text": "根据"} +func (g *GroupConfigBiz) connectAndReadSSE(resp *http.Response, channel chan entitys.Response, useParagraphMode bool) error { + scanner := bufio.NewScanner(resp.Body) + var buffer strings.Builder + + for scanner.Scan() { + line := scanner.Text() + + // 解析 data 行 + if strings.HasPrefix(line, "data:") { + dataStr := strings.TrimSpace(strings.TrimPrefix(line, "data:")) + if dataStr == "" { + continue + } + + var data struct { + Text string `json:"text"` + } + if err := json.Unmarshal([]byte(dataStr), &data); err != nil { + log.Errorf("SSE数据解析失败: %v body: %s", err, dataStr) + continue + } + + if data.Text != "" { + if useParagraphMode { + // 存入缓冲区 + buffer.WriteString(data.Text) + content := buffer.String() + + // 检查是否有换行符,按段落输出 + if idx := strings.LastIndex(content, "\n"); idx != -1 { + // 发送直到最后一个换行符的内容 + toSend := content[:idx+1] + entitys.ResStream(channel, "", toSend) + + // 重置缓冲区,保留剩余部分 + remaining := content[idx+1:] + buffer.Reset() + buffer.WriteString(remaining) + } + } else { + // 逐字输出模式:直接发送 + entitys.ResStream(channel, "", data.Text) + } + } + } + } + + if err := scanner.Err(); err != nil { + return fmt.Errorf("读取SSE流中断: %w", err) + } + + // 发送缓冲区剩余内容(仅在段落模式下需要) + if useParagraphMode && buffer.Len() > 0 { + entitys.ResStream(channel, "", buffer.String()) + } + + return nil +} + +// handleKnowledgeV3 处理知识库V3同步版本 +func (g *GroupConfigBiz) handleKnowledgeV3(ctx context.Context, rec *entitys.Recognize, pointTask *model.AiBotTool) (err error) { + req := l_request.Request{ + Method: "POST", + Url: "http://127.0.0.1:9600/query", + Headers: map[string]string{ + "Content-Type": "application/json", + "X-Tenant-ID": "default", + }, + Json: map[string]interface{}{ + "query": rec.UserContent.Text, + "mode": "naive", + "stream": false, + "think": false, + }, + } + resp, err := req.Send() + if err != nil { + return fmt.Errorf("请求失败,err: %v", err) + } + + obj := make(map[string]string) + if err := json.Unmarshal([]byte(resp.Text), &obj); err != nil { + return fmt.Errorf("解析响应失败,err: %v", err) + } + + entitys.ResText(rec.Ch, "", obj["response"]) + + return +} diff --git a/internal/biz/handle/dingtalk/send_card.go b/internal/biz/handle/dingtalk/send_card.go index d2e5cb7..4660f33 100644 --- a/internal/biz/handle/dingtalk/send_card.go +++ b/internal/biz/handle/dingtalk/send_card.go @@ -20,7 +20,7 @@ import ( ) const DefaultInterval = 100 * time.Millisecond -const HeardBeatX = 100 +const HeardBeatX = 1000 type SendCardClient struct { Auth *Auth