package biz import ( "ai_scheduler/internal/biz/tools_regis" "ai_scheduler/internal/config" "ai_scheduler/internal/data/constants" "ai_scheduler/internal/data/impl" "ai_scheduler/internal/data/model" "ai_scheduler/internal/domain/workflow/recharge" "ai_scheduler/internal/domain/workflow/runtime" "ai_scheduler/internal/entitys" "ai_scheduler/internal/pkg" "ai_scheduler/internal/pkg/l_request" "ai_scheduler/internal/pkg/lsxd" "ai_scheduler/internal/pkg/utils_oss" "ai_scheduler/internal/tools" "ai_scheduler/internal/tools/bbxt" "ai_scheduler/utils" "context" "database/sql" "encoding/json" "errors" "fmt" "io" "net/http" "strconv" "strings" "time" "github.com/coze-dev/coze-go" "github.com/gofiber/fiber/v2/log" "xorm.io/builder" ) // AiRouterBiz 智能路由服务 type GroupConfigBiz struct { botGroupConfigImpl *impl.BotGroupConfigImpl reportDailyCacheImpl *impl.ReportDailyCacheImpl ossClient *utils_oss.Client workflowManager *runtime.Registry botTools []model.AiBotTool toolManager *tools.Manager conf *config.Config rdb *utils.Rdb } // NewDingTalkBotBiz func NewGroupConfigBiz( tools *tools_regis.ToolRegis, ossClient *utils_oss.Client, botGroupConfigImpl *impl.BotGroupConfigImpl, workflowManager *runtime.Registry, conf *config.Config, reportDailyCacheImpl *impl.ReportDailyCacheImpl, rdb *utils.Rdb, ) *GroupConfigBiz { return &GroupConfigBiz{ botTools: tools.BootTools, ossClient: ossClient, botGroupConfigImpl: botGroupConfigImpl, workflowManager: workflowManager, conf: conf, reportDailyCacheImpl: reportDailyCacheImpl, rdb: rdb, } } func (g *GroupConfigBiz) GetGroupConfig(ctx context.Context, configId int32) (*model.AiBotGroupConfig, error) { var groupConfig model.AiBotGroupConfig cond := builder.NewCond() cond = cond.And(builder.Eq{"config_id": configId}) err := g.botGroupConfigImpl.GetOneBySearchToStrut(&cond, &groupConfig) return &groupConfig, err } func (g *GroupConfigBiz) GetReportLists(ctx context.Context, groupConfig *model.AiBotGroupConfig) (reports []*bbxt.ReportRes, err error) { if groupConfig == nil { return } var product []string if groupConfig.ProductName != "" { product = strings.Split(groupConfig.ProductName, ",") } reportList, err := bbxt.NewBbxtTools(g.conf, lsxd.NewLogin(g.conf, g.rdb)) if err != nil { return } reports, err = reportList.DailyReport(ctx, time.Now(), bbxt.DownWardValue, product, bbxt.SumFilter, g.ossClient, g.GetReportCache) if err != nil { return } //追加电商充值系统统计 - 返回统一使用[]*bbxt.ReportRes rechargeReports, err := g.rechargeDailyReport(ctx, time.Now(), nil, g.ossClient) if err != nil || len(rechargeReports) == 0 { return } reports = append(reports, rechargeReports...) return } // rechargeDailyReport 获取电商充值系统统计报告 func (g *GroupConfigBiz) rechargeDailyReport(ctx context.Context, now time.Time, productNames []string, ossClient *utils_oss.Client) (reports []*bbxt.ReportRes, err error) { defer func() { if err := recover(); err != nil { log.Error(err) } }() workflowId := recharge.WorkflowIDStatisticsOursProduct args := &runtime.WorkflowArgs{ Args: map[string]any{ "product_names": productNames, "now": now, }, } res, err := g.workflowManager.Invoke(ctx, workflowId, args) if err != nil { return } log.Infof("imgUrl: %s", res["url"].(string)) reports = []*bbxt.ReportRes{ { ReportName: "我们的商品统计(电商充值系统)", Title: res["title"].(string), Path: res["path"].(string), Url: res["url"].(string), Data: res["data"].([][]string), }, } return } func (g *GroupConfigBiz) handleReport(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool, groupConfig *model.AiBotGroupConfig) error { var configData entitys.ConfigDataReport err := json.Unmarshal([]byte(rec.Match.Parameters), &configData) if err != nil { return err } t, err := time.Parse(time.DateTime, configData.Time) if err != nil { t, err = time.Parse("2006-01-02 15:04", configData.Time) if err != nil { t, err = time.Parse("2006-01-02", configData.Time) if err != nil { log.Infof("时间识别失败:%s", configData.Time) entitys.ResText(rec.Ch, "", "时间识别失败了!可以给我一份比较具体的时间吗,例如“2025-12-31 12:00,抱歉抱歉😀") } } } rep, err := bbxt.NewBbxtTools(g.conf, lsxd.NewLogin(g.conf, g.rdb)) uploader := bbxt.NewUploader(g.ossClient, g.conf) if err != nil { return err } var reports []*bbxt.ReportRes switch rec.Match.Index { case "report_loss_analysis": repo, _err := rep.StatisOursProductLossSum(ctx, t, g.GetReportCache) if _err != nil { return _err } reports = append(reports, repo...) case "report_sales_analysis": product := strings.Split(groupConfig.ProductName, ",") repo, _err := rep.GetStatisOfficialProductSum(t, product) if _err != nil { return _err } reports = append(reports, repo) case "report_ranking_of_distributors": repo, _err := rep.GetProfitRankingSum(t) if _err != nil { return _err } reports = append(reports, repo) case "report_daily": product := strings.Split(groupConfig.ProductName, ",") repo, _err := rep.DailyReport(ctx, t, bbxt.DownWardValue, product, bbxt.SumFilter, nil, g.GetReportCache) if _err != nil { return _err } reports = append(reports, repo...) rechargeReport, _err := g.rechargeDailyReport(ctx, t, product, nil) if _err != nil || len(repo) == 0 { return _err } reports = append(reports, rechargeReport...) case "report_daily_recharge": product := strings.Split(groupConfig.ProductName, ",") repo, _err := g.rechargeDailyReport(ctx, t, product, nil) if _err != nil || len(repo) == 0 { return _err } reports = append(reports, repo...) case "report_sale_down_analysis": product := strings.Split(groupConfig.ProductName, ",") repo, _err := rep.GetStatisOfficialProductSumDecline(t, bbxt.DownWardValue, product, bbxt.SumFilter) if _err != nil { return _err } reports = append(reports, repo) default: return fmt.Errorf("未找到的报表:%s", rec.Match.Index) } for _, report := range reports { if report == nil { continue } err = uploader.Run(report) if err != nil { log.Error(err) continue } entitys.ResText(rec.Ch, "", fmt.Sprintf("%s![图片](%s)", report.Title, report.Url)) } return nil } func (g *GroupConfigBiz) handleMatch(ctx context.Context, rec *entitys.Recognize, groupConfig *model.AiBotGroupConfig) (err error) { if !rec.Match.IsMatch { if len(rec.Match.Chat) != 0 { entitys.ResText(rec.Ch, "", rec.Match.Chat) } else { entitys.ResText(rec.Ch, "", rec.Match.Reasoning) } return } var pointTask *model.AiBotTool for _, task := range g.botTools { if task.Index == rec.Match.Index { pointTask = &task break } } if pointTask == nil || pointTask.Index == "other" { return g.otherTask(ctx, rec) } switch constants.TaskType(pointTask.Type) { case constants.TaskTypeFunc: return g.handleTask(ctx, rec, pointTask) case constants.TaskTypeReport: return g.handleReport(ctx, rec, pointTask, groupConfig) case constants.TaskTypeCozeWorkflow: return g.handleCozeWorkflow(ctx, rec, pointTask) default: return g.otherTask(ctx, rec) } return } func (g *GroupConfigBiz) getGroupTools(ctx context.Context, groupConfig *model.AiBotGroupConfig) (tools []model.AiBotTool, err error) { if len(g.botTools) == 0 { return } var ( groupRegisTools = make(map[int]struct{}) ) if groupConfig.ToolList != "" { groupToolList := strings.Split(groupConfig.ToolList, ",") for _, tool := range groupToolList { if tool == "" { continue } num, _err := strconv.Atoi(tool) if _err != nil { continue } groupRegisTools[num] = struct{}{} } } for _, v := range g.botTools { if v.PermissionType == constants.PermissionTypeNone { tools = append(tools, v) continue } if _, ex := groupRegisTools[int(v.ToolID)]; ex { tools = append(tools, v) } } return } func (q *GroupConfigBiz) handleTask(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool) (err error) { var configData entitys.ConfigDataTool err = json.Unmarshal([]byte(task.Config), &configData) if err != nil { return } err = q.toolManager.ExecuteTool(ctx, configData.Tool, rec) if err != nil { return } return } func (g *GroupConfigBiz) handleCozeWorkflow(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool) (err error) { entitys.ResLoading(rec.Ch, task.Index, "正在执行工作流(coze)\n") customClient := &http.Client{ Timeout: time.Minute * 30, } authCli := coze.NewTokenAuth(g.conf.Coze.ApiSecret) cozeCli := coze.NewCozeAPI( authCli, coze.WithBaseURL(g.conf.Coze.BaseURL), coze.WithHttpClient(customClient), ) // 从参数中获取workflowID type requestParams struct { Request l_request.Request `json:"request"` } var config requestParams err = json.Unmarshal([]byte(task.Config), &config) if err != nil { return err } workflowId, ok := config.Request.Json["workflow_id"].(string) if !ok { return fmt.Errorf("workflow_id不能为空") } // 提取参数 var data map[string]interface{} err = json.Unmarshal([]byte(rec.Match.Parameters), &data) req := &coze.RunWorkflowsReq{ WorkflowID: workflowId, Parameters: data, // IsAsync: true, } stream := config.Request.Json["stream"].(bool) entitys.ResLog(rec.Ch, task.Index, "工作流执行中...") if stream { streamResp, err := cozeCli.Workflows.Runs.Stream(ctx, req) if err != nil { return err } g.handleCozeWorkflowEvents(ctx, streamResp, cozeCli, workflowId, rec.Ch, task.Index) } else { resp, err := cozeCli.Workflows.Runs.Create(ctx, req) if err != nil { return err } entitys.ResJson(rec.Ch, task.Index, resp.Data) } return } // handleCozeWorkflowEvents 处理 coze 工作流事件 func (g *GroupConfigBiz) handleCozeWorkflowEvents(ctx context.Context, resp coze.Stream[coze.WorkflowEvent], cozeCli coze.CozeAPI, workflowID string, ch chan entitys.Response, index string) { defer resp.Close() for { event, err := resp.Recv() if errors.Is(err, io.EOF) { fmt.Println("Stream finished") break } if err != nil { fmt.Println("Error receiving event:", err) break } switch event.Event { case coze.WorkflowEventTypeMessage: entitys.ResStream(ch, index, event.Message.Content) case coze.WorkflowEventTypeError: entitys.ResError(ch, index, fmt.Sprintf("工作流执行错误: %v", event.Error)) case coze.WorkflowEventTypeDone: entitys.ResEnd(ch, index, "工作流执行完成") case coze.WorkflowEventTypeInterrupt: resumeReq := &coze.ResumeRunWorkflowsReq{ WorkflowID: workflowID, EventID: event.Interrupt.InterruptData.EventID, ResumeData: "your data", InterruptType: event.Interrupt.InterruptData.Type, } newResp, err := cozeCli.Workflows.Runs.Resume(ctx, resumeReq) if err != nil { entitys.ResError(ch, index, fmt.Sprintf("工作流恢复执行错误: %s", err.Error())) return } entitys.ResLog(ch, index, "工作流恢复执行中...") g.handleCozeWorkflowEvents(ctx, newResp, cozeCli, workflowID, ch, index) } } fmt.Printf("done, log:%s\n", resp.Response().LogID()) } func (g *GroupConfigBiz) otherTask(ctx context.Context, rec *entitys.Recognize) (err error) { entitys.ResText(rec.Ch, "", rec.Match.Reasoning) return } func (g *GroupConfigBiz) GetReportCache(ctx context.Context, day time.Time, totalDetail []*bbxt.ResellerLoss, bbxtObj *bbxt.BbxtTools) error { var ResellerProductRelation map[int32]*bbxt.ResellerLossSumProductRelation dayDate := day.Format(time.DateOnly) cond := builder.NewCond() cond = cond.And(builder.Eq{"index": bbxt.IndexLossSumDetail}) cond = cond.And(builder.Eq{"key": dayDate}) var cache model.AiReportDailyCache err := g.reportDailyCacheImpl.GetOneBySearchToStrut(&cond, &cache) if err != nil { if errors.Is(sql.ErrNoRows, err) { ResellerProductRelation, err = bbxtObj.GetResellerLossMannagerAndLossReasonFromApi(ctx, totalDetail) if err != nil { return err } cache = model.AiReportDailyCache{ Key: dayDate, Index: bbxt.IndexLossSumDetail, Value: pkg.JsonStringIgonErr(ResellerProductRelation), } _, err = g.reportDailyCacheImpl.Add(&cache) } } else { err = json.Unmarshal([]byte(cache.Value), &ResellerProductRelation) } if err != nil { return err } for _, v := range totalDetail { if _, ex := ResellerProductRelation[v.ResellerId]; !ex { continue } v.Manager = ResellerProductRelation[v.ResellerId].AfterSaleName for _, vv := range v.ProductLoss { if _, ex := ResellerProductRelation[v.ResellerId].Products[vv.ProductId]; !ex { continue } vv.LossReason = ResellerProductRelation[v.ResellerId].Products[vv.ProductId].LossReason } } return nil }