feat: 新增知识库调用demo、段落输出demo

This commit is contained in:
fuzhongyun 2026-01-15 18:12:34 +08:00
parent 941827ce41
commit e8061799b8
2 changed files with 132 additions and 1 deletions

View File

@ -16,6 +16,7 @@ import (
"ai_scheduler/internal/tools"
"ai_scheduler/internal/tools/bbxt"
"ai_scheduler/utils"
"bufio"
"context"
"encoding/json"
"errors"
@ -52,6 +53,7 @@ func NewGroupConfigBiz(
conf *config.Config,
reportDailyCacheImpl *impl.ReportDailyCacheImpl,
rdb *utils.Rdb,
toolManager *tools.Manager,
) *GroupConfigBiz {
return &GroupConfigBiz{
botTools: tools.BootTools,
@ -61,6 +63,7 @@ func NewGroupConfigBiz(
conf: conf,
reportDailyCacheImpl: reportDailyCacheImpl,
rdb: rdb,
toolManager: toolManager,
}
}
@ -255,6 +258,9 @@ func (g *GroupConfigBiz) handleMatch(ctx context.Context, rec *entitys.Recognize
return g.handleReport(ctx, rec, pointTask, groupConfig)
case constants.TaskTypeCozeWorkflow:
return g.handleCozeWorkflow(ctx, rec, pointTask)
case constants.TaskTypeKnowle: // 知识库V2版本
return g.handleKnowledgeV2(ctx, rec, pointTask)
// return g.handleKnowledgeV3(ctx, rec, pointTask)
default:
return g.otherTask(ctx, rec)
}
@ -459,3 +465,128 @@ func (g *GroupConfigBiz) GetReportCache(ctx context.Context, day time.Time, tota
return nil
}
// handleKnowledgeV2 处理知识库V2版本
func (g *GroupConfigBiz) handleKnowledgeV2(ctx context.Context, rec *entitys.Recognize, pointTask *model.AiBotTool) (err error) {
req := l_request.Request{
Method: "POST",
Url: "http://127.0.0.1:9600/query",
Headers: map[string]string{
"Content-Type": "application/json",
"X-Tenant-ID": "default",
},
Json: map[string]interface{}{
"query": rec.UserContent.Text,
"mode": "naive",
"stream": true,
"think": false,
},
}
resp, err := req.SendNoParseResponse()
if err != nil {
return fmt.Errorf("请求失败err: %v", err)
}
defer resp.Body.Close()
err = g.connectAndReadSSE(resp, rec.Ch, true)
if err != nil {
return
}
return
}
// 连接 SSE 并读取数据
// event: thinking
// data: {"text": "1. 上下文检索中...\n"}
// event: answer
// data: {"text": "根据"}
func (g *GroupConfigBiz) connectAndReadSSE(resp *http.Response, channel chan entitys.Response, useParagraphMode bool) error {
scanner := bufio.NewScanner(resp.Body)
var buffer strings.Builder
for scanner.Scan() {
line := scanner.Text()
// 解析 data 行
if strings.HasPrefix(line, "data:") {
dataStr := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
if dataStr == "" {
continue
}
var data struct {
Text string `json:"text"`
}
if err := json.Unmarshal([]byte(dataStr), &data); err != nil {
log.Errorf("SSE数据解析失败: %v body: %s", err, dataStr)
continue
}
if data.Text != "" {
if useParagraphMode {
// 存入缓冲区
buffer.WriteString(data.Text)
content := buffer.String()
// 检查是否有换行符,按段落输出
if idx := strings.LastIndex(content, "\n"); idx != -1 {
// 发送直到最后一个换行符的内容
toSend := content[:idx+1]
entitys.ResStream(channel, "", toSend)
// 重置缓冲区,保留剩余部分
remaining := content[idx+1:]
buffer.Reset()
buffer.WriteString(remaining)
}
} else {
// 逐字输出模式:直接发送
entitys.ResStream(channel, "", data.Text)
}
}
}
}
if err := scanner.Err(); err != nil {
return fmt.Errorf("读取SSE流中断: %w", err)
}
// 发送缓冲区剩余内容(仅在段落模式下需要)
if useParagraphMode && buffer.Len() > 0 {
entitys.ResStream(channel, "", buffer.String())
}
return nil
}
// handleKnowledgeV3 处理知识库V3同步版本
func (g *GroupConfigBiz) handleKnowledgeV3(ctx context.Context, rec *entitys.Recognize, pointTask *model.AiBotTool) (err error) {
req := l_request.Request{
Method: "POST",
Url: "http://127.0.0.1:9600/query",
Headers: map[string]string{
"Content-Type": "application/json",
"X-Tenant-ID": "default",
},
Json: map[string]interface{}{
"query": rec.UserContent.Text,
"mode": "naive",
"stream": false,
"think": false,
},
}
resp, err := req.Send()
if err != nil {
return fmt.Errorf("请求失败err: %v", err)
}
obj := make(map[string]string)
if err := json.Unmarshal([]byte(resp.Text), &obj); err != nil {
return fmt.Errorf("解析响应失败err: %v", err)
}
entitys.ResText(rec.Ch, "", obj["response"])
return
}

View File

@ -20,7 +20,7 @@ import (
)
const DefaultInterval = 100 * time.Millisecond
const HeardBeatX = 100
const HeardBeatX = 1000
type SendCardClient struct {
Auth *Auth