From 7076d6a9187e1dd217f523a599bfd34a2072b418 Mon Sep 17 00:00:00 2001 From: renzhiyuan <465386466@qq.com> Date: Tue, 11 Nov 2025 09:26:18 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=93=E6=9E=84=E4=BC=98=E5=8C=96=E4=B8=8E?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=E5=A2=9E=E5=BC=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README2.md | 1 + cmd/server/wire.go | 2 + internal/biz/do/ctx.go | 248 ++++++++++++++ internal/biz/do/handle.go | 254 ++++++++++++++ internal/biz/handle/handle.go | 7 - internal/biz/llm_service/ollama.go | 16 +- internal/biz/provider_set.go | 3 + internal/biz/router.go | 498 ++-------------------------- internal/data/constants/bot.go | 7 + internal/entitys/types.go | 2 +- internal/tools/zltx_order_detail.go | 2 +- internal/tools_bot/dtalk_bot.go | 53 ++- internal/tools_bot/provider_set.go | 9 + 13 files changed, 577 insertions(+), 525 deletions(-) create mode 100644 README2.md create mode 100644 internal/biz/do/ctx.go create mode 100644 internal/biz/do/handle.go create mode 100644 internal/data/constants/bot.go create mode 100644 internal/tools_bot/provider_set.go diff --git a/README2.md b/README2.md new file mode 100644 index 0000000..7e93b59 --- /dev/null +++ b/README2.md @@ -0,0 +1 @@ +您提供的信息中,**图片内容**是一张手机屏幕截图,显示的是一个支付成功的账单详情页面。我们先对这张图片中的信息进行解析和理解,再判断其问题类型、等级,并分析“系统一直在报错”是否与该图相关。\n\n---\n\n### 一、图片内容解析(从截图提取关键信息)\n\n#### 1. **基础信息**\n- **时间**:2024年8月16日 04:09:30\n- **金额**:-1166.00 元(支出)\n- **支付状态**:支付成功\n- **商户名称**:河南联合汇能清洁能源有限公司\n- **商品描述**:加气订单支付\n- **收单机构**:通联支付网络服务股份有限公司\n- **支付方式**:中国银行储蓄卡(9370)\n- **交易单号**:已部分遮挡(可识别为 `2024081611...`)\n- **商户单号**:可在支持的商户扫码退款\n\n#### 2. **附加信息**\n- 手机顶部显示时间为 **2:52**(可能是拍摄时间,非交易时间)\n- 图片右下角有水印:“河北新闻网 Hebnews.cn”\n\n---\n\n### 二、内容理解\n\n这是一笔在 **凌晨4点09分** 完成的 **天然气加气支付记录**,金额为 **1166元**,通过银行卡支付,由“河南联合汇能清洁能源有限公司”作为收款方,经“通联支付”清算完成。\n\n这笔交易是**真实发生的**,且系统显示“支付成功”,说明:\n- 支付流程已完成;\n- 资金已划出;\n- 用户账户已扣款。\n\n---\n\n### 三、问题分析:用户说“系统一直在报错”\n\n#### 1. 是否与该图片相关?\n\n目前来看:\n- 图片展示的是一个**成功的支付结果**,没有报错提示。\n- 界面干净,无异常弹窗或错误码。\n- 付款状态明确为“支付成功”。\n\n👉 **结论**:这张图片本身**不体现系统报错**,反而表明交易已经成功。\n\n但用户说“系统一直在报错”,可能意味着以下几种情况:\n\n| 可能情况 | 分析 |\n|--------|------|\n| ✅ 交易完成后,用户端仍持续收到“支付失败”或“订单异常”提醒 | 这属于**系统反馈异常**,即实际支付成功,但前端未同步状态 |\n| ❌ 用户误以为自己没支付成功,但实际上已扣款 | 需确认是否重复支付或存在混淆 |\n| 🔁 多次尝试支付时,系统反复报错,最终一次成功 | 可能是中间环节出现通信超时等临时故障 |\n| 🧩 支付系统与商户系统对接异常,导致订单状态不同步 | 比如商户后台未收到确认,用户端却显示成功 |\n\n---\n\n### 四、问题类型判断\n\n根据以上分析,该问题属于:\n\n> **【支付系统状态不同步问题】**\n\n具体分类如下:\n\n```json\n{\n \"problem_type\": \"支付系统状态不同步\",\n \"sub_type\": \"前端显示异常 / 后台数据未更新\",\n \"description\": \"用户支付成功后,系统仍持续报错或提示异常,导致用户体验混乱,怀疑交易未成功。\",\n \"possible_causes\": [\n \"支付平台与商户系统接口响应延迟或失败\",\n \"订单状态未及时回传至用户端\",\n \"缓存机制问题导致界面刷新不及时\",\n \"多端系统(APP/小程序/网页)状态不一致\"\n ]\n}\n```\n\n---\n\n### 五、问题等级划分(建议)\n\n| 等级 | 标准 | 本案例评估 |\n|------|------|------------|\n| ⚠️ **P2 - 中等优先级** | 影响用户体验,但不涉及资金损失或安全风险 | ✅ 符合 |\n| ❌ P1 - 高优先级 | 导致资金无法到账、重复扣款、欺诈风险等 | 不符合 |\n| ❌ P3 - 低优先级 | 仅界面轻微显示问题,不影响核心功能 | 不符合 |\n\n> **推荐等级:P2(中等)**\n\n理由:\n- 用户已支付成功,资金安全;\n- 但因系统持续报错,可能导致用户焦虑、重复操作、投诉;\n- 长期存在会影响品牌信誉和服务质量。\n\n---\n\n### 六、建议处理方案\n\n1. **前端排查**:\n - 检查是否有定时轮询接口返回错误;\n - 查看是否有旧订单状态未清除;\n - 强制刷新或手动查询订单状态。\n\n2. **后端对接检查**:\n - 确认支付回调通知是否正确发送到商户系统;\n - 检查商户订单状态更新逻辑是否健壮;\n - 是否存在异步任务失败未重试。\n\n3. **用户沟通**:\n - 提供支付凭证截图(如本图)作为证据;\n - 建议联系客服提供交易单号协助核实;\n - 明确告知“已成功支付,请勿重复操作”。\n\n---\n\n### 总结\n\n| 项目 | 内容 |\n|------|------|\n| 图片内容 | 成功支付1166元加气订单 |\n| 是否报错 | 图片中无报错,支付成功 |\n| 实际问题 | 用户感知“系统一直报错”,可能为状态不同步 |\n| 问题类型 | 支付系统状态不同步 |\n| 问题等级 | P2(中等优先级) |\n| 建议行动 | 排查前后端状态同步机制,加强用户沟通 |\n\n如有更多上下文(如用户多次尝试、支付失败记录、报错截图等),可进一步定位具体原因。 \ No newline at end of file diff --git a/cmd/server/wire.go b/cmd/server/wire.go index ba76674..f134ef9 100644 --- a/cmd/server/wire.go +++ b/cmd/server/wire.go @@ -11,6 +11,7 @@ import ( "ai_scheduler/internal/server" "ai_scheduler/internal/services" "ai_scheduler/internal/tools" + "ai_scheduler/internal/tools_bot" "ai_scheduler/utils" "github.com/gofiber/fiber/v2/log" @@ -27,6 +28,7 @@ func InitializeApp(*config.Config, log.AllLogger) (*server.Servers, func(), erro biz.ProviderSetBiz, impl.ProviderImpl, utils.ProviderUtils, + tools_bot.ProviderSetBotTools, )) } diff --git a/internal/biz/do/ctx.go b/internal/biz/do/ctx.go new file mode 100644 index 0000000..3915b83 --- /dev/null +++ b/internal/biz/do/ctx.go @@ -0,0 +1,248 @@ +package do + +import ( + "ai_scheduler/internal/config" + errors "ai_scheduler/internal/data/error" + "ai_scheduler/internal/data/impl" + "ai_scheduler/internal/data/model" + "ai_scheduler/internal/entitys" + "ai_scheduler/internal/pkg" + "ai_scheduler/tmpl/dataTemp" + "context" + "fmt" + "strconv" + "strings" + "time" + + "gitea.cdlsxd.cn/self-tools/l_request" + "github.com/gofiber/fiber/v2/log" + "github.com/gofiber/websocket/v2" + + "xorm.io/builder" +) + +type Do struct { + Ctx *entitys.RequireData + sessionImpl *impl.SessionImpl + sysImpl *impl.SysImpl + taskImpl *impl.TaskImpl + hisImpl *impl.ChatImpl + conf *config.Config +} + +func NewDo( + sysImpl *impl.SysImpl, + taskImpl *impl.TaskImpl, + hisImpl *impl.ChatImpl, + conf *config.Config, +) *Do { + return &Do{ + conf: conf, + sysImpl: sysImpl, + hisImpl: hisImpl, + taskImpl: taskImpl, + } +} + +func (d *Do) InitCtx(req *entitys.ChatSockRequest) *Do { + d.Ctx = &entitys.RequireData{ + Req: req, + } + return d +} + +func (d *Do) DataAuth(c *websocket.Conn) (err error) { + d.Ctx.Session = c.Query("x-session", "") + if len(d.Ctx.Session) == 0 { + err = errors.SessionNotFound + return + } + d.Ctx.Auth = c.Query("x-authorization", "") + if len(d.Ctx.Auth) == 0 { + err = errors.AuthNotFound + return + } + d.Ctx.Key = c.Query("x-app-key", "") + if len(d.Ctx.Key) == 0 { + err = errors.KeyNotFound + return + } + + d.Ctx.Sys, err = d.getSysInfo() + if err != nil { + err = errors.SysErr("获取系统信息失败:%v", err.Error()) + return + } + d.Ctx.Histories, err = d.getSessionChatHis() + if err != nil { + err = errors.SysErr("获取历史记录失败:%v", err.Error()) + return + } + + d.Ctx.Tasks, err = d.getTasks(d.Ctx.Sys.SysID) + if err != nil { + err = errors.SysErr("获取任务列表失败:%v", err.Error()) + return + } + if err = d.getImgData(); err != nil { + return + } + + return +} + +func (d *Do) MakeCh(c *websocket.Conn) (ctx context.Context, deferFunc func()) { + d.Ctx.Ch = make(chan entitys.Response) + ctx, cancel := context.WithCancel(context.Background()) + done := d.startMessageHandler(ctx, c, d.hisImpl) + return ctx, func() { + close(d.Ctx.Ch) //关闭主通道 + <-done // 等待消息处理完成 + cancel() + } +} + +func (d *Do) getImgData() (err error) { + if len(d.Ctx.Req.Img) == 0 { + return + } + imgs := strings.Split(d.Ctx.Req.Img, ",") + if len(imgs) == 0 { + return + } + if err = pkg.ValidateImageURL(d.Ctx.Req.Img); err != nil { + return err + } + for k, img := range imgs { + baseErr := "获取第" + strconv.Itoa(k+1) + "张图片失败:" + entitys.ResLog(d.Ctx.Ch, "", "获取第"+strconv.Itoa(k+1)+"张图片") + req := l_request.Request{ + Method: "GET", + Url: img, + } + res, _err := req.Send() + if _err != nil { + entitys.ResLog(d.Ctx.Ch, "", baseErr+_err.Error()) + continue + } + if _, ex := res.Headers["Content-Type"]; !ex { + entitys.ResLog(d.Ctx.Ch, "", baseErr+":Content-Type不存在") + continue + } + if !strings.HasPrefix(res.Headers["Content-Type"], "image/") { + entitys.ResLog(d.Ctx.Ch, "", baseErr+":expected image content") + continue + } + d.Ctx.ImgByte = append(d.Ctx.ImgByte, res.Content) + d.Ctx.ImgUrls = append(d.Ctx.ImgUrls, img) + } + + return +} + +func (d *Do) getRequireData() (err error) { + + return +} + +func (d *Do) getSysInfo() (sysInfo model.AiSy, err error) { + cond := builder.NewCond() + cond = cond.And(builder.Eq{"app_key": d.Ctx.Key}) + cond = cond.And(builder.IsNull{"delete_at"}) + cond = cond.And(builder.Eq{"status": 1}) + err = d.sysImpl.GetOneBySearchToStrut(&cond, &sysInfo) + return +} + +func (d *Do) getSessionChatHis() (his []model.AiChatHi, err error) { + + cond := builder.NewCond() + cond = cond.And(builder.Eq{"session_id": d.Ctx.Session}) + + _, err = d.hisImpl.GetListToStruct(&cond, &dataTemp.ReqPageBo{Limit: d.conf.Sys.SessionLen}, &his, "his_id desc") + + return +} + +func (d *Do) getTasks(sysId int32) (tasks []model.AiTask, err error) { + + cond := builder.NewCond() + cond = cond.And(builder.Eq{"sys_id": sysId}) + cond = cond.And(builder.IsNull{"delete_at"}) + cond = cond.And(builder.Eq{"status": 1}) + _, err = d.taskImpl.GetListToStruct(&cond, nil, &tasks, "") + + return +} + +// startMessageHandler 启动独立的消息处理协程 +func (d *Do) startMessageHandler( + ctx context.Context, + c *websocket.Conn, + hisImpl *impl.ChatImpl, +) <-chan struct{} { + done := make(chan struct{}) + var chat []string + + go func() { + defer func() { + close(done) + // 保存历史记录 + var his = []*model.AiChatHi{ + { + SessionID: d.Ctx.Session, + Role: "user", + Content: d.Ctx.Req.Text, // 用户输入在外部处理 + }, + } + if len(chat) > 0 { + his = append(his, &model.AiChatHi{ + SessionID: d.Ctx.Session, + Role: "assistant", + Content: strings.Join(chat, ""), + }) + } + for _, hi := range his { + hisImpl.Add(hi) + } + }() + + for v := range d.Ctx.Ch { // 自动检测通道关闭 + if err := sendWithTimeout(c, v, 2*time.Second); err != nil { + log.Errorf("Send error: %v", err) + return + } + if v.Type == entitys.ResponseText || v.Type == entitys.ResponseStream || v.Type == entitys.ResponseJson { + chat = append(chat, v.Content) + } + } + }() + + return done +} + +// 辅助函数:带超时的 WebSocket 发送 +func sendWithTimeout(c *websocket.Conn, data entitys.Response, timeout time.Duration) error { + sendCtx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + done := make(chan error, 1) + go func() { + defer func() { + if r := recover(); r != nil { + done <- fmt.Errorf("panic in MsgSend: %v", r) + } + close(done) + }() + // 如果 MsgSend 阻塞,这里会卡住 + err := entitys.MsgSend(c, data) + done <- err + }() + + select { + case err := <-done: + return err + case <-sendCtx.Done(): + return sendCtx.Err() + } +} diff --git a/internal/biz/do/handle.go b/internal/biz/do/handle.go new file mode 100644 index 0000000..a137f05 --- /dev/null +++ b/internal/biz/do/handle.go @@ -0,0 +1,254 @@ +package do + +import ( + "ai_scheduler/internal/biz/llm_service" + "ai_scheduler/internal/config" + "ai_scheduler/internal/data/constants" + errors "ai_scheduler/internal/data/error" + "ai_scheduler/internal/data/impl" + "ai_scheduler/internal/data/model" + "ai_scheduler/internal/entitys" + "ai_scheduler/internal/pkg" + "ai_scheduler/internal/pkg/l_request" + "ai_scheduler/internal/pkg/mapstructure" + "ai_scheduler/internal/tools" + "ai_scheduler/internal/tools_bot" + "context" + "encoding/json" + "fmt" + "strings" +) + +type Handle struct { + Ollama *llm_service.OllamaService + toolManager *tools.Manager + Bot *tools_bot.BotTool + conf *config.Config + sessionImpl *impl.SessionImpl +} + +func NewHandle( + Ollama *llm_service.OllamaService, + toolManager *tools.Manager, + conf *config.Config, + sessionImpl *impl.SessionImpl, + dTalkBot *tools_bot.BotTool, +) *Handle { + return &Handle{ + Ollama: Ollama, + toolManager: toolManager, + conf: conf, + sessionImpl: sessionImpl, + Bot: dTalkBot, + } +} + +func (r *Handle) Recognize(ctx context.Context, requireData *entitys.RequireData) (err error) { + entitys.ResLog(requireData.Ch, "", "准备意图识别") + + //意图识别 + recognizeMsg, err := r.Ollama.IntentRecognize(ctx, requireData) + if err != nil { + return + } + entitys.ResLog(requireData.Ch, "", recognizeMsg) + entitys.ResLog(requireData.Ch, "", "意图识别结束") + + var match entitys.Match + if err = json.Unmarshal([]byte(recognizeMsg), &match); err != nil { + err = errors.SysErr("数据结构错误:%v", err.Error()) + return + } + requireData.Match = &match + return +} + +func (r *Handle) handleOtherTask(ctx context.Context, requireData *entitys.RequireData) (err error) { + entitys.ResText(requireData.Ch, "", requireData.Match.Reasoning) + return +} + +func (r *Handle) HandleMatch(ctx context.Context, requireData *entitys.RequireData) (err error) { + + if !requireData.Match.IsMatch { + if len(requireData.Match.Chat) != 0 { + entitys.ResText(requireData.Ch, "", requireData.Match.Chat) + } else { + entitys.ResText(requireData.Ch, "", requireData.Match.Reasoning) + } + + return + } + var pointTask *model.AiTask + for _, task := range requireData.Tasks { + if task.Index == requireData.Match.Index { + pointTask = &task + break + } + } + + if pointTask == nil || pointTask.Index == "other" { + return r.OtherTask(ctx, requireData) + } + switch pointTask.Type { + case constants.TaskTypeApi: + return r.handleApiTask(ctx, requireData, pointTask) + case constants.TaskTypeFunc: + return r.handleTask(ctx, requireData, pointTask) + case constants.TaskTypeKnowle: + return r.handleKnowle(ctx, requireData, pointTask) + case constants.TaskTypeBot: + return r.handleBot(ctx, requireData, pointTask) + default: + return r.handleOtherTask(ctx, requireData) + } +} + +func (r *Handle) OtherTask(ctx context.Context, requireData *entitys.RequireData) (err error) { + entitys.ResText(requireData.Ch, "", requireData.Match.Reasoning) + return +} + +func (r *Handle) handleBot(ctx context.Context, requireData *entitys.RequireData, task *model.AiTask) (err error) { + var configData entitys.ConfigDataTool + err = json.Unmarshal([]byte(task.Config), &configData) + if err != nil { + return + } + err = r.Bot.Execute(ctx, configData.Tool, requireData) + if err != nil { + return + } + + return +} + +func (r *Handle) handleTask(ctx context.Context, requireData *entitys.RequireData, task *model.AiTask) (err error) { + var configData entitys.ConfigDataTool + err = json.Unmarshal([]byte(task.Config), &configData) + if err != nil { + return + } + + err = r.toolManager.ExecuteTool(ctx, configData.Tool, requireData) + if err != nil { + return + } + + return +} + +// 知识库 +func (r *Handle) handleKnowle(ctx context.Context, requireData *entitys.RequireData, task *model.AiTask) (err error) { + + var ( + configData entitys.ConfigDataTool + sessionIdKnowledge string + query string + host string + ) + err = json.Unmarshal([]byte(task.Config), &configData) + if err != nil { + return + } + + // 通过session 找到知识库session + var has bool + if len(requireData.Session) == 0 { + return errors.SessionNotFound + } + requireData.SessionInfo, has, err = r.sessionImpl.FindOne(r.sessionImpl.WithSessionId(requireData.Session)) + if err != nil { + return + } else if !has { + return errors.SessionNotFound + } + + // 找到知识库的host + { + tool, exists := r.toolManager.GetTool(configData.Tool) + if !exists { + return fmt.Errorf("tool not found: %s", configData.Tool) + } + + if knowledgeTool, ok := tool.(*tools.KnowledgeBaseTool); !ok { + return fmt.Errorf("未找到知识库Tool: %s", configData.Tool) + } else { + host = knowledgeTool.GetConfig().BaseURL + } + + } + + // 知识库的session为空,请求知识库获取, 并绑定 + if requireData.SessionInfo.KnowlegeSessionID == "" { + // 请求知识库 + if sessionIdKnowledge, err = tools.GetKnowledgeBaseSession(host, requireData.Sys.KnowlegeBaseID, requireData.Sys.KnowlegeTenantKey); err != nil { + return + } + + // 绑定知识库session,下次可以使用 + requireData.SessionInfo.KnowlegeSessionID = sessionIdKnowledge + if err = r.sessionImpl.Update(&requireData.SessionInfo, r.sessionImpl.WithSessionId(requireData.SessionInfo.SessionID)); err != nil { + return + } + } + + // 用户输入解析 + var ok bool + input := make(map[string]string) + if err = json.Unmarshal([]byte(requireData.Match.Parameters), &input); err != nil { + return + } + if query, ok = input["query"]; !ok { + return fmt.Errorf("query不能为空") + } + + requireData.KnowledgeConf = entitys.KnowledgeBaseRequest{ + Session: requireData.SessionInfo.KnowlegeSessionID, + ApiKey: requireData.Sys.KnowlegeTenantKey, + Query: query, + } + + // 执行工具 + err = r.toolManager.ExecuteTool(ctx, configData.Tool, requireData) + if err != nil { + return + } + + return +} + +func (r *Handle) handleApiTask(ctx context.Context, requireData *entitys.RequireData, task *model.AiTask) (err error) { + var ( + request l_request.Request + requestParam map[string]interface{} + ) + err = json.Unmarshal([]byte(requireData.Match.Parameters), &requestParam) + if err != nil { + return + } + request.Url = strings.ReplaceAll(task.Config, "${authorization}", requireData.Auth) + for k, v := range requestParam { + task.Config = strings.ReplaceAll(task.Config, "${"+k+"}", fmt.Sprintf("%v", v)) + } + var configData entitys.ConfigDataHttp + err = json.Unmarshal([]byte(task.Config), &configData) + if err != nil { + return + } + err = mapstructure.Decode(configData.Request, &request) + if err != nil { + return + } + if len(request.Url) == 0 { + err = errors.NewBusinessErr(422, "api地址获取失败") + return + } + res, err := request.Send() + if err != nil { + return + } + entitys.ResJson(requireData.Ch, "", pkg.JsonStringIgonErr(res.Text)) + + return +} diff --git a/internal/biz/handle/handle.go b/internal/biz/handle/handle.go index ec82d2b..391b6eb 100644 --- a/internal/biz/handle/handle.go +++ b/internal/biz/handle/handle.go @@ -2,9 +2,7 @@ package handle import ( "ai_scheduler/internal/config" - "ai_scheduler/internal/entitys" "ai_scheduler/internal/tools" - "context" ) type Handle struct { @@ -22,8 +20,3 @@ func NewHandle( conf: conf, } } - -func (r *Handle) OtherTask(ctx context.Context, requireData *entitys.RequireData) (err error) { - entitys.ResText(requireData.Ch, "", requireData.Match.Reasoning) - return -} diff --git a/internal/biz/llm_service/ollama.go b/internal/biz/llm_service/ollama.go index 5f4601b..070d884 100644 --- a/internal/biz/llm_service/ollama.go +++ b/internal/biz/llm_service/ollama.go @@ -10,6 +10,7 @@ import ( "encoding/json" "errors" "strings" + "time" "github.com/ollama/ollama/api" ) @@ -73,7 +74,7 @@ func (r *OllamaService) getPrompt(ctx context.Context, requireData *entitys.Requ Content: "### 聊天记录:" + pkg.JsonStringIgonErr(buildAssistant(requireData.Histories)), }, api.Message{ Role: "user", - Content: requireData.UserInput, + Content: requireData.Req.Text, //Images: requireData.ImgByte, }) @@ -86,7 +87,7 @@ func (r *OllamaService) getPrompt(ctx context.Context, requireData *entitys.Requ imgs.WriteString("### 用户上传图片解析内容:\n") prompt = append(prompt, api.Message{ - Role: "user", + Role: "image_desc", Content: "" + desc.Response, }) } @@ -100,11 +101,12 @@ func (r *OllamaService) RecognizeWithImg(ctx context.Context, requireData *entit entitys.ResLog(requireData.Ch, "", "图片识别中。。。") desc, err = r.client.Generation(ctx, &api.GenerateRequest{ - Model: r.config.Ollama.VlModel, - Stream: new(bool), - System: "提取出图片中的文字以及重要信息", - Prompt: requireData.UserInput, - Images: requireData.ImgByte, + Model: r.config.Ollama.VlModel, + Stream: new(bool), + System: "完整提取出图片中的文字以及重要信息,并对用户的需求进行预测", + Prompt: "完整提取出图片中的文字以及重要信息,并对用户的需求进行预测", //requireData.Req.Text, + Images: requireData.ImgByte, + KeepAlive: &api.Duration{Duration: 3600 * time.Second}, }) if err != nil { return diff --git a/internal/biz/provider_set.go b/internal/biz/provider_set.go index 16be159..c0e1d21 100644 --- a/internal/biz/provider_set.go +++ b/internal/biz/provider_set.go @@ -1,6 +1,7 @@ package biz import ( + "ai_scheduler/internal/biz/do" "ai_scheduler/internal/biz/handle" "ai_scheduler/internal/biz/llm_service" @@ -14,4 +15,6 @@ var ProviderSetBiz = wire.NewSet( llm_service.NewLangChainGenerate, llm_service.NewOllamaGenerate, handle.NewHandle, + do.NewDo, + do.NewHandle, ) diff --git a/internal/biz/router.go b/internal/biz/router.go index 89c88f0..1478ff5 100644 --- a/internal/biz/router.go +++ b/internal/biz/router.go @@ -1,507 +1,55 @@ package biz import ( - "ai_scheduler/internal/biz/handle" - "ai_scheduler/internal/biz/llm_service" - "ai_scheduler/internal/config" - "ai_scheduler/internal/data/constants" - errors "ai_scheduler/internal/data/error" - "ai_scheduler/internal/data/impl" - "ai_scheduler/internal/data/model" - "ai_scheduler/internal/entitys" - "ai_scheduler/internal/pkg" - "ai_scheduler/internal/pkg/mapstructure" - "ai_scheduler/internal/tools" - "ai_scheduler/tmpl/dataTemp" - "context" - "encoding/json" - "fmt" - "strconv" - "strings" - "time" + "ai_scheduler/internal/biz/do" + + "ai_scheduler/internal/entitys" - "gitea.cdlsxd.cn/self-tools/l_request" "github.com/gofiber/fiber/v2/log" "github.com/gofiber/websocket/v2" - "xorm.io/builder" ) // AiRouterBiz 智能路由服务 type AiRouterBiz struct { - toolManager *tools.Manager - sessionImpl *impl.SessionImpl - sysImpl *impl.SysImpl - taskImpl *impl.TaskImpl - hisImpl *impl.ChatImpl - conf *config.Config - rds *pkg.Rdb - langChain *llm_service.LangChainService - Ollama *llm_service.OllamaService - handle *handle.Handle + do *do.Do + handle *do.Handle } -// NewRouterService 创建路由服务 +// NewAiRouterBiz 创建路由服务 func NewAiRouterBiz( - sessionImpl *impl.SessionImpl, - sysImpl *impl.SysImpl, - taskImpl *impl.TaskImpl, - hisImpl *impl.ChatImpl, - conf *config.Config, - langChain *llm_service.LangChainService, - Ollama *llm_service.OllamaService, - handle *handle.Handle, + do *do.Do, + handle *do.Handle, ) *AiRouterBiz { return &AiRouterBiz{ - handle: handle, - sessionImpl: sessionImpl, - conf: conf, - sysImpl: sysImpl, - hisImpl: hisImpl, - taskImpl: taskImpl, - langChain: langChain, - Ollama: Ollama, + do: do, + handle: handle, } } func (r *AiRouterBiz) RouteWithSocket(c *websocket.Conn, req *entitys.ChatSockRequest) (err error) { //必要数据验证和获取 - var requireData entitys.RequireData - err = r.dataAuth(c, &requireData) - if err != nil { + dos := r.do.InitCtx(req) + + //初始化通道/上下文 + ctx, clearFunc := dos.MakeCh(c) + defer clearFunc() + + //数据验证和收集 + if err = dos.DataAuth(c); err != nil { + log.Errorf("数据验证和收集失败: %s", err.Error()) return } //初始化通道/上下文 - requireData.Ch = make(chan entitys.Response) - ctx, cancel := context.WithCancel(context.Background()) - // 启动独立的消息处理协程 - done := r.startMessageHandler(ctx, c, &requireData, req.Text) - defer func() { - close(requireData.Ch) //关闭主通道 - <-done // 等待消息处理完成 - cancel() - }() - - //获取图片信息 - err = r.getImgData(req.Img, &requireData) - if err != nil { - log.Errorf("GetImgData error: %v", err) + if err = r.handle.Recognize(ctx, dos.Ctx); err != nil { + log.Errorf("意图识别失败: %s", err.Error()) return } - //获取文字信息 - err = r.getRequireData(req.Text, &requireData) - if err != nil { - log.Errorf("SQL error: %v", err) - return - } - //意图识别 - err = r.recognize(ctx, &requireData) - if err != nil { - log.Errorf("LLM error: %v", err) - return - } //向下传递 - if err = r.handleMatch(ctx, &requireData); err != nil { - log.Errorf("Handle error: %v", err) - return - } - - return -} - -// startMessageHandler 启动独立的消息处理协程 -func (r *AiRouterBiz) startMessageHandler( - ctx context.Context, - c *websocket.Conn, - requireData *entitys.RequireData, - userInput string, -) <-chan struct{} { - done := make(chan struct{}) - var chat []string - - go func() { - defer func() { - close(done) - // 保存历史记录 - var his = []*model.AiChatHi{ - { - SessionID: requireData.Session, - Role: "user", - Content: userInput, // 用户输入在外部处理 - }, - } - if len(chat) > 0 { - his = append(his, &model.AiChatHi{ - SessionID: requireData.Session, - Role: "assistant", - Content: strings.Join(chat, ""), - }) - } - for _, hi := range his { - r.hisImpl.Add(hi) - } - }() - - for v := range requireData.Ch { // 自动检测通道关闭 - if err := sendWithTimeout(c, v, 2*time.Second); err != nil { - log.Errorf("Send error: %v", err) - return - } - if v.Type == entitys.ResponseText || v.Type == entitys.ResponseStream || v.Type == entitys.ResponseJson { - chat = append(chat, v.Content) - } - } - }() - - return done -} - -// 辅助函数:带超时的 WebSocket 发送 -func sendWithTimeout(c *websocket.Conn, data entitys.Response, timeout time.Duration) error { - sendCtx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - done := make(chan error, 1) - go func() { - defer func() { - if r := recover(); r != nil { - done <- fmt.Errorf("panic in MsgSend: %v", r) - } - close(done) - }() - // 如果 MsgSend 阻塞,这里会卡住 - err := entitys.MsgSend(c, data) - done <- err - }() - - select { - case err := <-done: - return err - case <-sendCtx.Done(): - return sendCtx.Err() - } -} - -func (r *AiRouterBiz) getImgData(imgUrl string, requireData *entitys.RequireData) (err error) { - if len(imgUrl) == 0 { - return - } - imgs := strings.Split(imgUrl, ",") - if len(imgs) == 0 { - return - } - if err = pkg.ValidateImageURL(imgUrl); err != nil { - return err - } - for k, img := range imgs { - baseErr := "获取第" + strconv.Itoa(k+1) + "张图片失败:" - entitys.ResLog(requireData.Ch, "", "获取第"+strconv.Itoa(k+1)+"张图片") - req := l_request.Request{ - Method: "GET", - Url: img, - } - res, _err := req.Send() - if _err != nil { - entitys.ResLog(requireData.Ch, "", baseErr+_err.Error()) - continue - } - if _, ex := res.Headers["Content-Type"]; !ex { - entitys.ResLog(requireData.Ch, "", baseErr+":Content-Type不存在") - continue - } - if !strings.HasPrefix(res.Headers["Content-Type"], "image/") { - entitys.ResLog(requireData.Ch, "", baseErr+":expected image content") - continue - } - requireData.ImgByte = append(requireData.ImgByte, res.Content) - requireData.ImgUrls = append(requireData.ImgUrls, img) - } - - return -} - -func (r *AiRouterBiz) recognize(ctx context.Context, requireData *entitys.RequireData) (err error) { - entitys.ResLog(requireData.Ch, "", "准备意图识别") - - //意图识别 - recognizeMsg, err := r.Ollama.IntentRecognize(ctx, requireData) - if err != nil { - return - } - entitys.ResLog(requireData.Ch, "", recognizeMsg) - entitys.ResLog(requireData.Ch, "", "意图识别结束") - - var match entitys.Match - if err = json.Unmarshal([]byte(recognizeMsg), &match); err != nil { - err = errors.SysErr("数据结构错误:%v", err.Error()) - return - } - requireData.Match = &match - return -} - -func (r *AiRouterBiz) getRequireData(userInput string, requireData *entitys.RequireData) (err error) { - requireData.Sys, err = r.getSysInfo(requireData.Key) - if err != nil { - err = errors.SysErr("获取系统信息失败:%v", err.Error()) - return - } - requireData.Histories, err = r.getSessionChatHis(requireData.Session) - if err != nil { - err = errors.SysErr("获取历史记录失败:%v", err.Error()) - return - } - - requireData.Tasks, err = r.getTasks(requireData.Sys.SysID) - if err != nil { - err = errors.SysErr("获取任务列表失败:%v", err.Error()) - return - } - - requireData.UserInput = userInput - if len(requireData.UserInput) == 0 { - err = errors.SysErr("获取用户输入失败") - return - } - if len(requireData.UserInput) == 0 { - err = errors.SysErr("获取用户输入失败") + if err = r.handle.HandleMatch(ctx, dos.Ctx); err != nil { + log.Errorf("任务处理失败: %s", err.Error()) return } return } - -func (r *AiRouterBiz) dataAuth(c *websocket.Conn, requireData *entitys.RequireData) (err error) { - requireData.Session = c.Query("x-session", "") - if len(requireData.Session) == 0 { - err = errors.SessionNotFound - return - } - requireData.Auth = c.Query("x-authorization", "") - if len(requireData.Auth) == 0 { - err = errors.AuthNotFound - return - } - requireData.Key = c.Query("x-app-key", "") - if len(requireData.Key) == 0 { - err = errors.KeyNotFound - return - } - return -} - -func (r *AiRouterBiz) handleOtherTask(ctx context.Context, requireData *entitys.RequireData) (err error) { - entitys.ResText(requireData.Ch, "", requireData.Match.Reasoning) - return -} - -func (r *AiRouterBiz) handleMatch(ctx context.Context, requireData *entitys.RequireData) (err error) { - - if !requireData.Match.IsMatch { - if len(requireData.Match.Chat) != 0 { - entitys.ResText(requireData.Ch, "", requireData.Match.Reasoning) - } else { - entitys.ResText(requireData.Ch, "", requireData.Match.Reasoning) - } - - return - } - var pointTask *model.AiTask - for _, task := range requireData.Tasks { - if task.Index == requireData.Match.Index { - pointTask = &task - break - } - } - - if pointTask == nil || pointTask.Index == "other" { - return r.handle.OtherTask(ctx, requireData) - } - switch pointTask.Type { - case constants.TaskTypeApi: - return r.handleApiTask(ctx, requireData, pointTask) - case constants.TaskTypeFunc: - return r.handleTask(ctx, requireData, pointTask) - case constants.TaskTypeKnowle: - return r.handleKnowle(ctx, requireData, pointTask) - case constants.TaskTypeBot: - return r.handleBot(ctx, requireData, pointTask) - default: - return r.handleOtherTask(ctx, requireData) - } -} - -func (r *AiRouterBiz) handleBot(ctx context.Context, requireData *entitys.RequireData, task *model.AiTask) (err error) { - var configData entitys.ConfigDataTool - err = json.Unmarshal([]byte(task.Config), &configData) - if err != nil { - return - } - - err = r.toolManager.ExecuteTool(ctx, configData.Tool, requireData) - if err != nil { - return - } - - return -} - -func (r *AiRouterBiz) handleTask(ctx context.Context, requireData *entitys.RequireData, task *model.AiTask) (err error) { - var configData entitys.ConfigDataTool - err = json.Unmarshal([]byte(task.Config), &configData) - if err != nil { - return - } - - err = r.toolManager.ExecuteTool(ctx, configData.Tool, requireData) - if err != nil { - return - } - - return -} - -// 知识库 -func (r *AiRouterBiz) handleKnowle(ctx context.Context, requireData *entitys.RequireData, task *model.AiTask) (err error) { - - var ( - configData entitys.ConfigDataTool - sessionIdKnowledge string - query string - host string - ) - err = json.Unmarshal([]byte(task.Config), &configData) - if err != nil { - return - } - - // 通过session 找到知识库session - var has bool - if len(requireData.Session) == 0 { - return errors.SessionNotFound - } - requireData.SessionInfo, has, err = r.sessionImpl.FindOne(r.sessionImpl.WithSessionId(requireData.Session)) - if err != nil { - return - } else if !has { - return errors.SessionNotFound - } - - // 找到知识库的host - { - tool, exists := r.toolManager.GetTool(configData.Tool) - if !exists { - return fmt.Errorf("tool not found: %s", configData.Tool) - } - - if knowledgeTool, ok := tool.(*tools.KnowledgeBaseTool); !ok { - return fmt.Errorf("未找到知识库Tool: %s", configData.Tool) - } else { - host = knowledgeTool.GetConfig().BaseURL - } - - } - - // 知识库的session为空,请求知识库获取, 并绑定 - if requireData.SessionInfo.KnowlegeSessionID == "" { - // 请求知识库 - if sessionIdKnowledge, err = tools.GetKnowledgeBaseSession(host, requireData.Sys.KnowlegeBaseID, requireData.Sys.KnowlegeTenantKey); err != nil { - return - } - - // 绑定知识库session,下次可以使用 - requireData.SessionInfo.KnowlegeSessionID = sessionIdKnowledge - if err = r.sessionImpl.Update(&requireData.SessionInfo, r.sessionImpl.WithSessionId(requireData.SessionInfo.SessionID)); err != nil { - return - } - } - - // 用户输入解析 - var ok bool - input := make(map[string]string) - if err = json.Unmarshal([]byte(requireData.Match.Parameters), &input); err != nil { - return - } - if query, ok = input["query"]; !ok { - return fmt.Errorf("query不能为空") - } - - requireData.KnowledgeConf = entitys.KnowledgeBaseRequest{ - Session: requireData.SessionInfo.KnowlegeSessionID, - ApiKey: requireData.Sys.KnowlegeTenantKey, - Query: query, - } - - // 执行工具 - err = r.toolManager.ExecuteTool(ctx, configData.Tool, requireData) - if err != nil { - return - } - - return -} - -func (r *AiRouterBiz) handleApiTask(ctx context.Context, requireData *entitys.RequireData, task *model.AiTask) (err error) { - var ( - request l_request.Request - requestParam map[string]interface{} - ) - err = json.Unmarshal([]byte(requireData.Match.Parameters), &requestParam) - if err != nil { - return - } - request.Url = strings.ReplaceAll(task.Config, "${authorization}", requireData.Auth) - for k, v := range requestParam { - task.Config = strings.ReplaceAll(task.Config, "${"+k+"}", fmt.Sprintf("%v", v)) - } - var configData entitys.ConfigDataHttp - err = json.Unmarshal([]byte(task.Config), &configData) - if err != nil { - return - } - err = mapstructure.Decode(configData.Request, &request) - if err != nil { - return - } - if len(request.Url) == 0 { - err = errors.NewBusinessErr(422, "api地址获取失败") - return - } - res, err := request.Send() - if err != nil { - return - } - entitys.ResJson(requireData.Ch, "", pkg.JsonStringIgonErr(res.Text)) - - return -} - -func (r *AiRouterBiz) getSessionChatHis(sessionId string) (his []model.AiChatHi, err error) { - - cond := builder.NewCond() - cond = cond.And(builder.Eq{"session_id": sessionId}) - - _, err = r.hisImpl.GetListToStruct(&cond, &dataTemp.ReqPageBo{Limit: r.conf.Sys.SessionLen}, &his, "his_id desc") - - return -} - -func (r *AiRouterBiz) getSysInfo(appKey string) (sysInfo model.AiSy, err error) { - cond := builder.NewCond() - cond = cond.And(builder.Eq{"app_key": appKey}) - cond = cond.And(builder.IsNull{"delete_at"}) - cond = cond.And(builder.Eq{"status": 1}) - err = r.sysImpl.GetOneBySearchToStrut(&cond, &sysInfo) - return -} - -func (r *AiRouterBiz) getTasks(sysId int32) (tasks []model.AiTask, err error) { - - cond := builder.NewCond() - cond = cond.And(builder.Eq{"sys_id": sysId}) - cond = cond.And(builder.IsNull{"delete_at"}) - cond = cond.And(builder.Eq{"status": 1}) - _, err = r.taskImpl.GetListToStruct(&cond, nil, &tasks, "") - - return -} diff --git a/internal/data/constants/bot.go b/internal/data/constants/bot.go new file mode 100644 index 0000000..6dc6680 --- /dev/null +++ b/internal/data/constants/bot.go @@ -0,0 +1,7 @@ +package constants + +type BotTools string + +const ( + BotToolsBugOptimizationSubmit = "bug_optimization_submit" // 系统的bug/优化建议 +) diff --git a/internal/entitys/types.go b/internal/entitys/types.go index cc8f56d..1f97bd4 100644 --- a/internal/entitys/types.go +++ b/internal/entitys/types.go @@ -147,7 +147,7 @@ type RequireData struct { SessionInfo model.AiSession Tasks []model.AiTask Match *Match - UserInput string + Req *ChatSockRequest Auth string Ch chan Response KnowledgeConf KnowledgeBaseRequest diff --git a/internal/tools/zltx_order_detail.go b/internal/tools/zltx_order_detail.go index f73f2bd..f253f59 100644 --- a/internal/tools/zltx_order_detail.go +++ b/internal/tools/zltx_order_detail.go @@ -164,7 +164,7 @@ func (w *ZltxOrderDetailTool) getZltxOrderDetail(requireData *entitys.RequireDat }, { Role: "user", - Content: requireData.UserInput, + Content: requireData.Req.Text, }, }, w.Name(), "") if err != nil { diff --git a/internal/tools_bot/dtalk_bot.go b/internal/tools_bot/dtalk_bot.go index d9cecf1..9f312c9 100644 --- a/internal/tools_bot/dtalk_bot.go +++ b/internal/tools_bot/dtalk_bot.go @@ -2,53 +2,38 @@ package tools_bot import ( "ai_scheduler/internal/config" + "ai_scheduler/internal/data/constants" + errors "ai_scheduler/internal/data/error" "ai_scheduler/internal/entitys" "ai_scheduler/internal/pkg/utils_ollama" "context" + + "github.com/gofiber/fiber/v2/log" ) type BotTool struct { - config config.ToolConfig + config *config.Config llm *utils_ollama.Client } // NewBotTool 创建直连天下订单详情工具 -func NewBotTool(config config.ToolConfig, llm *utils_ollama.Client) *BotTool { +func NewBotTool(config *config.Config, llm *utils_ollama.Client) *BotTool { return &BotTool{config: config, llm: llm} } -// Name 返回工具名称 -func (w *BotTool) Name() string { - return "DingTalkBotTool" -} - -// Description 返回工具描述 -func (w *BotTool) Description() string { - return "钉钉机器人调用" -} - -// Definition 返回工具定义 -func (w *BotTool) Definition() entitys.ToolDefinition { - return entitys.ToolDefinition{ - Type: "function", - Function: entitys.FunctionDef{ - Name: w.Name(), - Description: w.Description(), - Parameters: map[string]interface{}{ - "type": "object", - "properties": map[string]interface{}{ - "number": map[string]interface{}{ - "type": "string", - "description": "订单编号/流水号", - }, - }, - "required": []string{"number"}, - }, - }, - } -} - // Execute 执行直连天下订单详情查询 -func (w *BotTool) Execute(ctx context.Context, requireData *entitys.RequireData) (err error) { +func (w *BotTool) Execute(ctx context.Context, toolName string, requireData *entitys.RequireData) (err error) { + switch toolName { + case constants.BotToolsBugOptimizationSubmit: + err = w.BugOptimizationSubmit(ctx, requireData) + default: + log.Errorf("未知的工具类型:%s", toolName) + err = errors.ParamErr("未知的工具类型:%s", toolName) + } + return +} + +func (w *BotTool) BugOptimizationSubmit(ctx context.Context, requireData *entitys.RequireData) (err error) { + return } diff --git a/internal/tools_bot/provider_set.go b/internal/tools_bot/provider_set.go new file mode 100644 index 0000000..7bcff12 --- /dev/null +++ b/internal/tools_bot/provider_set.go @@ -0,0 +1,9 @@ +package tools_bot + +import ( + "github.com/google/wire" +) + +var ProviderSetBotTools = wire.NewSet( + NewBotTool, +)