diff --git a/cmd/server/wire.go b/cmd/server/wire.go index 8357aae..ed710a3 100644 --- a/cmd/server/wire.go +++ b/cmd/server/wire.go @@ -6,6 +6,7 @@ package main import ( "ai_scheduler/internal/biz" "ai_scheduler/internal/biz/handle/dingtalk" + "ai_scheduler/internal/biz/handle/qywx" "ai_scheduler/internal/biz/tools_regis" "ai_scheduler/internal/config" "ai_scheduler/internal/data/impl" @@ -36,6 +37,7 @@ func InitializeApp(*config.Config, log.AllLogger) (*server.Servers, func(), erro impl.ProviderImpl, utils.ProviderUtils, dingtalk.ProviderSetDingTalk, + qywx.ProviderSetQywx, tools_regis.ProviderToolsRegis, // tool_callback.ProviderSetCallBackTools, component.ProviderSet, diff --git a/config/config.yaml b/config/config.yaml index b64cfc6..31255f7 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -152,6 +152,17 @@ dingtalk: bot_group_id: bbxt: 28 +qywx: + corp_id: "ww48151f694fb8ec67" + app_secret: "uYqtdwdtdH4Uv_P4is2AChuGzBCoB6cQDyRvpbW0Vmk" + token: "Jdukry6" + aes_key: "4VLH47qRGUogc2d3QLWuUhvJlk8Y0YuRjXzeBquBq8B" + init_account: "les." + chat_id_len: 16 + default_config_id: 1 + bot_group_id: + bbxt: 23 + default_prompt: img_recognize: system_prompt: diff --git a/config/config_test.yaml b/config/config_test.yaml index 71aada4..97903aa 100644 --- a/config/config_test.yaml +++ b/config/config_test.yaml @@ -151,6 +151,22 @@ dingtalk: bot_group_id: bbxt: 23 +qywx: +# corp_id: "ww48151f694fb8ec67" +# app_secret: "uYqtdwdtdH4Uv_P4is2AChuGzBCoB6cQDyRvpbW0Vmk" +# token: "uYqtdwdtdH4Uv_P4is2AChuGzBCoB6cQDyRvpbW0Vmk" +# aes_key: "4VLH47qRGUogc2d3QLWuUhvJlk8Y0YuRjXzeBquBq8B" + + corp_id: "wwabfd0cec7171e769" + app_secret: "uYqtdwdtdH4Uv_P4is2AChuGzBCoB6cQDyRvpbW0Vmk" + token: "gY1AGR3mjBhzy" + aes_key: "g8VGfQEqluUhoKOlyjmmll8Q9C5tVFUTX5T2qkmI9Sv" + init_account: "les." + chat_id_len: 16 + default_config_id: 1 + bot_group_id: + bbxt: 23 + default_prompt: img_recognize: system_prompt: diff --git a/internal/biz/ding_talk_bot.go b/internal/biz/ding_talk_bot.go index 726249f..1ad715c 100644 --- a/internal/biz/ding_talk_bot.go +++ b/internal/biz/ding_talk_bot.go @@ -3,54 +3,47 @@ package biz import ( "ai_scheduler/internal/biz/do" "ai_scheduler/internal/biz/handle/dingtalk" - "ai_scheduler/internal/biz/tools_regis" + "ai_scheduler/internal/biz/handle/qywx" + "ai_scheduler/internal/config" "ai_scheduler/internal/data/constants" "ai_scheduler/internal/data/impl" "ai_scheduler/internal/data/model" - "ai_scheduler/internal/domain/workflow/recharge" - "ai_scheduler/internal/domain/workflow/runtime" "ai_scheduler/internal/entitys" - "ai_scheduler/internal/pkg/l_request" - "ai_scheduler/internal/pkg/utils_oss" "ai_scheduler/internal/tools" "ai_scheduler/internal/tools/bbxt" "ai_scheduler/tmpl/dataTemp" - "io" - "net/http" - "strconv" - "time" - "unicode" - - "ai_scheduler/internal/config" "context" "database/sql" "encoding/json" "errors" "fmt" "strings" + "time" + "unicode" "gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/chatbot" - "github.com/coze-dev/coze-go" + "github.com/gofiber/fiber/v2/log" "xorm.io/builder" ) // AiRouterBiz 智能路由服务 type DingTalkBotBiz struct { - do *do.Do - handle *do.Handle - botConfigImpl *impl.BotConfigImpl - replier *chatbot.ChatbotReplier - log log.Logger - dingTalkUser *dingtalk.User - botTools []model.AiBotTool - botGroupImpl *impl.BotGroupImpl - toolManager *tools.Manager - chatHis *impl.BotChatHisImpl - conf *config.Config - cardSend *dingtalk.SendCardClient - ossClient *utils_oss.Client - workflowManager *runtime.Registry + do *do.Do + handle *do.Handle + botConfigImpl *impl.BotConfigImpl + replier *chatbot.ChatbotReplier + log log.Logger + dingTalkUser *dingtalk.User + botGroupImpl *impl.BotGroupImpl + botGroupConfigImpl *impl.BotGroupConfigImpl + botGroupQywxImpl *impl.BotGroupQywxImpl + toolManager *tools.Manager + chatHis *impl.BotChatHisImpl + conf *config.Config + cardSend *dingtalk.SendCardClient + qywxGroupHandle *qywx.Group + groupConfigBiz *GroupConfigBiz } // NewDingTalkBotBiz @@ -60,28 +53,24 @@ func NewDingTalkBotBiz( botConfigImpl *impl.BotConfigImpl, botGroupImpl *impl.BotGroupImpl, dingTalkUser *dingtalk.User, - tools *tools_regis.ToolRegis, chatHis *impl.BotChatHisImpl, toolManager *tools.Manager, conf *config.Config, cardSend *dingtalk.SendCardClient, - ossClient *utils_oss.Client, - workflowManager *runtime.Registry, + groupConfigBiz *GroupConfigBiz, ) *DingTalkBotBiz { return &DingTalkBotBiz{ - do: do, - handle: handle, - botConfigImpl: botConfigImpl, - replier: chatbot.NewChatbotReplier(), - dingTalkUser: dingTalkUser, - botTools: tools.BootTools, - botGroupImpl: botGroupImpl, - toolManager: toolManager, - chatHis: chatHis, - conf: conf, - cardSend: cardSend, - ossClient: ossClient, - workflowManager: workflowManager, + do: do, + handle: handle, + botConfigImpl: botConfigImpl, + replier: chatbot.NewChatbotReplier(), + dingTalkUser: dingTalkUser, + groupConfigBiz: groupConfigBiz, + botGroupImpl: botGroupImpl, + toolManager: toolManager, + chatHis: chatHis, + conf: conf, + cardSend: cardSend, } } @@ -148,8 +137,15 @@ func (d *DingTalkBotBiz) handleSingleChat(ctx context.Context, requireData *enti func (d *DingTalkBotBiz) handleGroupChat(ctx context.Context, requireData *entitys.RequireDataDingTalkBot) (err error) { group, err := d.initGroup(ctx, requireData.Req.ConversationId, requireData.Req.ConversationTitle, requireData.Req.RobotCode) + if err != nil { + return + } + groupConfig, err := d.groupConfigBiz.GetGroupConfig(ctx, group.ConfigID) + if err != nil { + return + } //宏 - err, isFinal := d.Macro(ctx, requireData, group) + err, isFinal := d.Macro(ctx, requireData, groupConfig) if err != nil { return } @@ -157,7 +153,7 @@ func (d *DingTalkBotBiz) handleGroupChat(ctx context.Context, requireData *entit return } requireData.ID = group.GroupID - groupTools, err := d.getGroupTools(ctx, group) + groupTools, err := d.groupConfigBiz.getGroupTools(ctx, groupConfig) if err != nil { return } @@ -166,10 +162,10 @@ func (d *DingTalkBotBiz) handleGroupChat(ctx context.Context, requireData *entit return } - return d.handleMatch(ctx, rec, group) + return d.groupConfigBiz.handleMatch(ctx, rec, groupConfig) } -func (d *DingTalkBotBiz) Macro(ctx context.Context, requireData *entitys.RequireDataDingTalkBot, group *model.AiBotGroup) (err error, isFinish bool) { +func (d *DingTalkBotBiz) Macro(ctx context.Context, requireData *entitys.RequireDataDingTalkBot, groupConfig *model.AiBotGroupConfig) (err error, isFinish bool) { content := processString(requireData.Req.Text.Content) if strings.Contains(content, "[利润同比报表]商品修改:") { @@ -177,10 +173,10 @@ func (d *DingTalkBotBiz) Macro(ctx context.Context, requireData *entitys.Require if parts := strings.SplitN(content, ":", 2); len(parts) == 2 { itemInfo := strings.TrimSpace(parts[1]) log.Infof("商品修改信息: %s", itemInfo) - group.ProductName = itemInfo + groupConfig.ProductName = itemInfo cond := builder.NewCond() - cond = cond.And(builder.Eq{"group_id": group.GroupID}) - err = d.botGroupImpl.UpdateByCond(&cond, group) + cond = cond.And(builder.Eq{"config_id": groupConfig.ConfigID}) + err = d.botGroupImpl.UpdateByCond(&cond, groupConfig) if err != nil { entitys.ResText(requireData.Ch, "", fmt.Sprintf("修改失败:%v", err)) } @@ -192,10 +188,10 @@ func (d *DingTalkBotBiz) Macro(ctx context.Context, requireData *entitys.Require if strings.Contains(content, "[利润同比报表]商品列表") { // 提取冒号后的内容 - if len(group.ProductName) == 0 { + if len(groupConfig.ProductName) == 0 { entitys.ResText(requireData.Ch, "", "暂未设置") } else { - entitys.ResText(requireData.Ch, "", group.ProductName) + entitys.ResText(requireData.Ch, "", groupConfig.ProductName) isFinish = true } return @@ -236,7 +232,6 @@ func (d *DingTalkBotBiz) initGroup(ctx context.Context, conversationId string, c ConversationID: conversationId, Title: conversationTitle, RobotCode: robotCode, - ToolList: "", } //如果不存在则创建 _, err = d.botGroupImpl.Add(group) @@ -244,38 +239,6 @@ func (d *DingTalkBotBiz) initGroup(ctx context.Context, conversationId string, c return } -func (d *DingTalkBotBiz) getGroupTools(ctx context.Context, group *model.AiBotGroup) (tools []model.AiBotTool, err error) { - if len(d.botTools) == 0 { - return - } - var ( - groupRegisTools = make(map[int]struct{}) - ) - if group.ToolList != "" { - groupToolList := strings.Split(group.ToolList, ",") - for _, tool := range groupToolList { - if tool == "" { - continue - } - num, _err := strconv.Atoi(tool) - if _err != nil { - continue - } - groupRegisTools[num] = struct{}{} - } - } - - for _, v := range d.botTools { - if v.PermissionType == constants.PermissionTypeNone { - tools = append(tools, v) - continue - } - if _, ex := groupRegisTools[int(v.ToolID)]; ex { - tools = append(tools, v) - } - } - return -} func (d *DingTalkBotBiz) recognize(ctx context.Context, requireData *entitys.RequireDataDingTalkBot, tools []model.AiBotTool) (rec *entitys.Recognize, err error) { userContent, err := d.getUserContent(requireData.Req.Msgtype, requireData.Req.Text.Content) @@ -355,269 +318,6 @@ func (d *DingTalkBotBiz) getUserContent(msgType string, msgContent interface{}) return } -func (d *DingTalkBotBiz) handleMatch(ctx context.Context, rec *entitys.Recognize, group *model.AiBotGroup) (err error) { - - if !rec.Match.IsMatch { - if len(rec.Match.Chat) != 0 { - entitys.ResText(rec.Ch, "", rec.Match.Chat) - } else { - entitys.ResText(rec.Ch, "", rec.Match.Reasoning) - } - return - } - var pointTask *model.AiBotTool - for _, task := range d.botTools { - if task.Index == rec.Match.Index { - pointTask = &task - - break - } - } - - if pointTask == nil || pointTask.Index == "other" { - return d.otherTask(ctx, rec) - } - switch constants.TaskType(pointTask.Type) { - case constants.TaskTypeFunc: - return d.handleTask(ctx, rec, pointTask) - case constants.TaskTypeReport: - return d.handleReport(ctx, rec, pointTask, group) - case constants.TaskTypeCozeWorkflow: - return d.handleCozeWorkflow(ctx, rec, pointTask) - default: - return d.otherTask(ctx, rec) - } - return -} - -func (d *DingTalkBotBiz) handleCozeWorkflow(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool) (err error) { - entitys.ResLoading(rec.Ch, task.Index, "正在执行工作流(coze)\n") - - customClient := &http.Client{ - Timeout: time.Minute * 30, - } - - authCli := coze.NewTokenAuth(d.conf.Coze.ApiSecret) - cozeCli := coze.NewCozeAPI( - authCli, - coze.WithBaseURL(d.conf.Coze.BaseURL), - coze.WithHttpClient(customClient), - ) - - // 从参数中获取workflowID - type requestParams struct { - Request l_request.Request `json:"request"` - } - var config requestParams - err = json.Unmarshal([]byte(task.Config), &config) - if err != nil { - return err - } - workflowId, ok := config.Request.Json["workflow_id"].(string) - if !ok { - return fmt.Errorf("workflow_id不能为空") - } - // 提取参数 - var data map[string]interface{} - err = json.Unmarshal([]byte(rec.Match.Parameters), &data) - - req := &coze.RunWorkflowsReq{ - WorkflowID: workflowId, - Parameters: data, - // IsAsync: true, - } - - stream := config.Request.Json["stream"].(bool) - - entitys.ResLog(rec.Ch, task.Index, "工作流执行中...") - - if stream { - streamResp, err := cozeCli.Workflows.Runs.Stream(ctx, req) - if err != nil { - return err - } - - handleCozeWorkflowEvents(ctx, streamResp, cozeCli, workflowId, rec.Ch, task.Index) - } else { - resp, err := cozeCli.Workflows.Runs.Create(ctx, req) - if err != nil { - return err - } - - entitys.ResJson(rec.Ch, task.Index, resp.Data) - } - - return -} - -// handleCozeWorkflowEvents 处理 coze 工作流事件 -func handleCozeWorkflowEvents(ctx context.Context, resp coze.Stream[coze.WorkflowEvent], cozeCli coze.CozeAPI, workflowID string, ch chan entitys.Response, index string) { - defer resp.Close() - for { - event, err := resp.Recv() - if errors.Is(err, io.EOF) { - fmt.Println("Stream finished") - break - } - if err != nil { - fmt.Println("Error receiving event:", err) - break - } - - switch event.Event { - case coze.WorkflowEventTypeMessage: - entitys.ResStream(ch, index, event.Message.Content) - case coze.WorkflowEventTypeError: - entitys.ResError(ch, index, fmt.Sprintf("工作流执行错误: %s", event.Error)) - case coze.WorkflowEventTypeDone: - entitys.ResEnd(ch, index, "工作流执行完成") - case coze.WorkflowEventTypeInterrupt: - resumeReq := &coze.ResumeRunWorkflowsReq{ - WorkflowID: workflowID, - EventID: event.Interrupt.InterruptData.EventID, - ResumeData: "your data", - InterruptType: event.Interrupt.InterruptData.Type, - } - newResp, err := cozeCli.Workflows.Runs.Resume(ctx, resumeReq) - if err != nil { - entitys.ResError(ch, index, fmt.Sprintf("工作流恢复执行错误: %s", err.Error())) - return - } - entitys.ResLog(ch, index, "工作流恢复执行中...") - handleCozeWorkflowEvents(ctx, newResp, cozeCli, workflowID, ch, index) - } - } - fmt.Printf("done, log:%s\n", resp.Response().LogID()) -} - -func (d *DingTalkBotBiz) handleReport(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool, group *model.AiBotGroup) error { - var configData entitys.ConfigDataReport - err := json.Unmarshal([]byte(rec.Match.Parameters), &configData) - if err != nil { - return err - } - t, err := time.Parse(time.DateTime, configData.Time) - if err != nil { - t, err = time.Parse("2006-01-02 15:04", configData.Time) - if err != nil { - t, err = time.Parse("2006-01-02", configData.Time) - if err != nil { - log.Infof("时间识别失败:%s", configData.Time) - entitys.ResText(rec.Ch, "", "时间识别失败了!可以给我一份比较具体的时间吗,例如“2025-12-31 12:00,抱歉抱歉😀") - } - } - } - rep, err := bbxt.NewBbxtTools() - uploader := bbxt.NewUploader(d.ossClient) - if err != nil { - return err - } - var reports []*bbxt.ReportRes - switch rec.Match.Index { - case "report_loss_analysis": - repo, _err := rep.StatisOursProductLossSum(t) - if _err != nil { - return _err - } - reports = append(reports, repo...) - case "report_sales_analysis": - product := strings.Split(group.ProductName, ",") - repo, _err := rep.GetStatisOfficialProductSum(t, product) - if _err != nil { - return _err - } - reports = append(reports, repo) - - case "report_ranking_of_distributors": - repo, _err := rep.GetProfitRankingSum(t) - if _err != nil { - return _err - } - reports = append(reports, repo) - case "report_daily": - product := strings.Split(group.ProductName, ",") - repo, _err := rep.DailyReport(t, bbxt.DownWardValue, product, bbxt.SumFilter, nil) - if _err != nil { - return _err - } - reports = append(reports, repo...) - case "report_daily_recharge": - product := strings.Split(group.ProductName, ",") - repo, _err := d.rechargeDailyReport(ctx, t, product, nil) - if _err != nil || len(repo) == 0 { - return _err - } - reports = append(reports, repo...) - case "report_sale_down_analysis": - product := strings.Split(group.ProductName, ",") - repo, _err := rep.GetStatisOfficialProductSumDecline(t, bbxt.DownWardValue, product, bbxt.SumFilter) - if _err != nil { - return _err - } - reports = append(reports, repo) - default: - return fmt.Errorf("未找到的报表:%s", rec.Match.Index) - } - - for _, report := range reports { - err = uploader.Run(report) - if err != nil { - log.Error(err) - continue - } - - entitys.ResText(rec.Ch, "", fmt.Sprintf("%s![图片](%s)", report.Title, report.Url)) - //rec.Ch <- report.Title - //reportChan <- fmt.Sprintf("![图片](%s)", report.Url) - //err = d.SendReport(ctx, group, report) - //if err != nil { - // log.Error(err) - // continue - //} - } - return nil -} - -func (d *DingTalkBotBiz) handleTask(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool) (err error) { - var configData entitys.ConfigDataTool - err = json.Unmarshal([]byte(task.Config), &configData) - if err != nil { - return - } - - err = d.toolManager.ExecuteTool(ctx, configData.Tool, rec) - if err != nil { - return - } - - return -} - -func (d *DingTalkBotBiz) otherTask(ctx context.Context, rec *entitys.Recognize) (err error) { - entitys.ResText(rec.Ch, "", rec.Match.Reasoning) - return -} - -//func (d *DingTalkBotBiz) HandleRes(ctx context.Context, data *chatbot.BotCallbackDataModel, resp entitys.Response, ch chan string) error { -// switch resp.Type { -// case entitys.ResponseText: -// return d.replyText(ctx, data.SessionWebhook, resp.Content) -// case entitys.ResponseStream: -// -// return d.replySteam(ctx, data, ch) -// case entitys.ResponseImg: -// return d.replyImg(ctx, data.SessionWebhook, resp.Content) -// case entitys.ResponseFile: -// return d.replyFile(ctx, data.SessionWebhook, resp.Content) -// case entitys.ResponseMarkdown: -// return d.replyMarkdown(ctx, data.SessionWebhook, resp.Content) -// case entitys.ResponseActionCard: -// return d.replyActionCard(ctx, data.SessionWebhook, resp.Content) -// default: -// return nil -// } -//} - func (d *DingTalkBotBiz) HandleStreamRes(ctx context.Context, data *chatbot.BotCallbackDataModel, content chan string) (err error) { err = d.cardSend.NewCard(ctx, &dingtalk.CardSend{ RobotCode: data.RobotCode, @@ -631,70 +331,6 @@ func (d *DingTalkBotBiz) HandleStreamRes(ctx context.Context, data *chatbot.BotC return } -func (d *DingTalkBotBiz) GetReportLists(ctx context.Context, group *model.AiBotGroup) (reports []*bbxt.ReportRes, err error) { - - var product []string - if group.ProductName != "" { - product = strings.Split(group.ProductName, ",") - } - - reportList, err := bbxt.NewBbxtTools() - if err != nil { - return - } - - reports, err = reportList.DailyReport(time.Now(), bbxt.DownWardValue, product, bbxt.SumFilter, d.ossClient) - if err != nil { - return - } - //product = []string{"优酷周卡", "优酷季卡", "优酷年卡", "爱奇艺黄金会员天卡"} - //追加电商充值系统统计 - 返回统一使用[]*bbxt.ReportRes - rechargeReports, err := d.rechargeDailyReport(ctx, time.Now(), nil, d.ossClient) - if err != nil || len(rechargeReports) == 0 { - return - } - - reports = append(reports, rechargeReports...) - - return -} - -// rechargeDailyReport 获取电商充值系统统计报告 -func (d *DingTalkBotBiz) rechargeDailyReport(ctx context.Context, now time.Time, productNames []string, ossClient *utils_oss.Client) (reports []*bbxt.ReportRes, err error) { - defer func() { - if err := recover(); err != nil { - log.Error(err) - } - }() - - workflowId := recharge.WorkflowIDStatisticsOursProduct - args := &runtime.WorkflowArgs{ - Args: map[string]any{ - "product_names": productNames, - "now": now, - }, - } - res, err := d.workflowManager.Invoke(ctx, workflowId, args) - if err != nil { - return - } - - log.Infof("imgUrl: %s", res["url"].(string)) - - reports = []*bbxt.ReportRes{ - { - ReportName: "我们的商品统计(电商充值系统)", - Title: fmt.Sprintf("%s 电商充值系统我们的商品统计", now.Format("2006-01-02")), - Path: res["path"].(string), - Url: res["url"].(string), - Data: res["data"].([][]string), - Desc: res["desc"].(string), - }, - } - - return -} - func (d *DingTalkBotBiz) SendReport(ctx context.Context, groupInfo *model.AiBotGroup, report *bbxt.ReportRes) (err error) { reportChan := make(chan string, 10) diff --git a/internal/biz/group_config.go b/internal/biz/group_config.go new file mode 100644 index 0000000..6d83751 --- /dev/null +++ b/internal/biz/group_config.go @@ -0,0 +1,399 @@ +package biz + +import ( + "ai_scheduler/internal/biz/tools_regis" + "ai_scheduler/internal/config" + "ai_scheduler/internal/data/constants" + "ai_scheduler/internal/data/impl" + "ai_scheduler/internal/data/model" + "ai_scheduler/internal/domain/workflow/recharge" + "ai_scheduler/internal/domain/workflow/runtime" + "ai_scheduler/internal/entitys" + "ai_scheduler/internal/pkg/l_request" + "ai_scheduler/internal/pkg/utils_oss" + "ai_scheduler/internal/tools" + "ai_scheduler/internal/tools/bbxt" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strconv" + "strings" + "time" + + "github.com/coze-dev/coze-go" + "github.com/gofiber/fiber/v2/log" + "xorm.io/builder" +) + +// AiRouterBiz 智能路由服务 +type GroupConfigBiz struct { + botGroupConfigImpl *impl.BotGroupConfigImpl + ossClient *utils_oss.Client + workflowManager *runtime.Registry + botTools []model.AiBotTool + toolManager *tools.Manager + conf *config.Config +} + +// NewDingTalkBotBiz +func NewGroupConfigBiz( + tools *tools_regis.ToolRegis, + ossClient *utils_oss.Client, + botGroupConfigImpl *impl.BotGroupConfigImpl, + workflowManager *runtime.Registry, + conf *config.Config, +) *GroupConfigBiz { + return &GroupConfigBiz{ + botTools: tools.BootTools, + ossClient: ossClient, + botGroupConfigImpl: botGroupConfigImpl, + workflowManager: workflowManager, + conf: conf, + } +} + +func (g *GroupConfigBiz) GetGroupConfig(ctx context.Context, configId int32) (*model.AiBotGroupConfig, error) { + var groupConfig model.AiBotGroupConfig + cond := builder.NewCond() + cond = cond.And(builder.Eq{"config_id": configId}) + err := g.botGroupConfigImpl.GetOneBySearchToStrut(&cond, &groupConfig) + return &groupConfig, err +} + +func (g *GroupConfigBiz) GetReportLists(ctx context.Context, groupConfig *model.AiBotGroupConfig) (reports []*bbxt.ReportRes, err error) { + if groupConfig == nil { + return + } + var product []string + if groupConfig.ProductName != "" { + product = strings.Split(groupConfig.ProductName, ",") + } + + reportList, err := bbxt.NewBbxtTools() + if err != nil { + return + } + + reports, err = reportList.DailyReport(time.Now(), bbxt.DownWardValue, product, bbxt.SumFilter, g.ossClient) + if err != nil { + return + } + //product = []string{"优酷周卡", "优酷季卡", "优酷年卡", "爱奇艺黄金会员天卡"} + //追加电商充值系统统计 - 返回统一使用[]*bbxt.ReportRes + rechargeReports, err := g.rechargeDailyReport(ctx, time.Now(), nil, g.ossClient) + if err != nil || len(rechargeReports) == 0 { + return + } + + reports = append(reports, rechargeReports...) + + return +} + +// rechargeDailyReport 获取电商充值系统统计报告 +func (g *GroupConfigBiz) rechargeDailyReport(ctx context.Context, now time.Time, productNames []string, ossClient *utils_oss.Client) (reports []*bbxt.ReportRes, err error) { + defer func() { + if err := recover(); err != nil { + log.Error(err) + } + }() + + workflowId := recharge.WorkflowIDStatisticsOursProduct + args := &runtime.WorkflowArgs{ + Args: map[string]any{ + "product_names": productNames, + "now": now, + }, + } + res, err := g.workflowManager.Invoke(ctx, workflowId, args) + if err != nil { + return + } + + log.Infof("imgUrl: %s", res["url"].(string)) + + reports = []*bbxt.ReportRes{ + { + ReportName: "我们的商品统计(电商充值系统)", + Title: fmt.Sprintf("%s 电商充值系统我们的商品统计", now.Format("2006-01-02")), + Path: res["path"].(string), + Url: res["url"].(string), + Data: res["data"].([][]string), + Desc: res["desc"].(string), + }, + } + + return +} + +func (g *GroupConfigBiz) handleReport(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool, groupConfig *model.AiBotGroupConfig) error { + var configData entitys.ConfigDataReport + err := json.Unmarshal([]byte(rec.Match.Parameters), &configData) + if err != nil { + return err + } + t, err := time.Parse(time.DateTime, configData.Time) + if err != nil { + t, err = time.Parse("2006-01-02 15:04", configData.Time) + if err != nil { + t, err = time.Parse("2006-01-02", configData.Time) + if err != nil { + log.Infof("时间识别失败:%s", configData.Time) + entitys.ResText(rec.Ch, "", "时间识别失败了!可以给我一份比较具体的时间吗,例如“2025-12-31 12:00,抱歉抱歉😀") + } + } + } + rep, err := bbxt.NewBbxtTools() + uploader := bbxt.NewUploader(g.ossClient) + if err != nil { + return err + } + var reports []*bbxt.ReportRes + switch rec.Match.Index { + case "report_loss_analysis": + repo, _err := rep.StatisOursProductLossSum(t) + if _err != nil { + return _err + } + reports = append(reports, repo...) + case "report_sales_analysis": + product := strings.Split(groupConfig.ProductName, ",") + repo, _err := rep.GetStatisOfficialProductSum(t, product) + if _err != nil { + return _err + } + reports = append(reports, repo) + + case "report_ranking_of_distributors": + repo, _err := rep.GetProfitRankingSum(t) + if _err != nil { + return _err + } + reports = append(reports, repo) + case "report_daily": + product := strings.Split(groupConfig.ProductName, ",") + repo, _err := rep.DailyReport(t, bbxt.DownWardValue, product, bbxt.SumFilter, nil) + if _err != nil { + return _err + } + reports = append(reports, repo...) + case "report_daily_recharge": + product := strings.Split(groupConfig.ProductName, ",") + repo, _err := g.rechargeDailyReport(ctx, t, product, nil) + if _err != nil || len(repo) == 0 { + return _err + } + reports = append(reports, repo...) + case "report_sale_down_analysis": + product := strings.Split(groupConfig.ProductName, ",") + repo, _err := rep.GetStatisOfficialProductSumDecline(t, bbxt.DownWardValue, product, bbxt.SumFilter) + if _err != nil { + return _err + } + reports = append(reports, repo) + default: + return fmt.Errorf("未找到的报表:%s", rec.Match.Index) + } + + for _, report := range reports { + err = uploader.Run(report) + if err != nil { + log.Error(err) + continue + } + + entitys.ResText(rec.Ch, "", fmt.Sprintf("%s![图片](%s)", report.Title, report.Url)) + + } + return nil +} + +func (g *GroupConfigBiz) handleMatch(ctx context.Context, rec *entitys.Recognize, groupConfig *model.AiBotGroupConfig) (err error) { + + if !rec.Match.IsMatch { + if len(rec.Match.Chat) != 0 { + entitys.ResText(rec.Ch, "", rec.Match.Chat) + } else { + entitys.ResText(rec.Ch, "", rec.Match.Reasoning) + } + return + } + var pointTask *model.AiBotTool + for _, task := range g.botTools { + if task.Index == rec.Match.Index { + pointTask = &task + + break + } + } + if pointTask == nil || pointTask.Index == "other" { + return g.otherTask(ctx, rec) + } + switch constants.TaskType(pointTask.Type) { + case constants.TaskTypeFunc: + return g.handleTask(ctx, rec, pointTask) + case constants.TaskTypeReport: + return g.handleReport(ctx, rec, pointTask, groupConfig) + case constants.TaskTypeCozeWorkflow: + return g.handleCozeWorkflow(ctx, rec, pointTask) + default: + return g.otherTask(ctx, rec) + } + return +} + +func (g *GroupConfigBiz) getGroupTools(ctx context.Context, groupConfig *model.AiBotGroupConfig) (tools []model.AiBotTool, err error) { + if len(g.botTools) == 0 { + return + } + var ( + groupRegisTools = make(map[int]struct{}) + ) + if groupConfig.ToolList != "" { + groupToolList := strings.Split(groupConfig.ToolList, ",") + for _, tool := range groupToolList { + if tool == "" { + continue + } + num, _err := strconv.Atoi(tool) + if _err != nil { + continue + } + groupRegisTools[num] = struct{}{} + } + } + + for _, v := range g.botTools { + if v.PermissionType == constants.PermissionTypeNone { + tools = append(tools, v) + continue + } + if _, ex := groupRegisTools[int(v.ToolID)]; ex { + tools = append(tools, v) + } + } + return +} + +func (q *GroupConfigBiz) handleTask(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool) (err error) { + var configData entitys.ConfigDataTool + err = json.Unmarshal([]byte(task.Config), &configData) + if err != nil { + return + } + + err = q.toolManager.ExecuteTool(ctx, configData.Tool, rec) + if err != nil { + return + } + + return +} + +func (g *GroupConfigBiz) handleCozeWorkflow(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool) (err error) { + entitys.ResLoading(rec.Ch, task.Index, "正在执行工作流(coze)\n") + + customClient := &http.Client{ + Timeout: time.Minute * 30, + } + + authCli := coze.NewTokenAuth(g.conf.Coze.ApiSecret) + cozeCli := coze.NewCozeAPI( + authCli, + coze.WithBaseURL(g.conf.Coze.BaseURL), + coze.WithHttpClient(customClient), + ) + + // 从参数中获取workflowID + type requestParams struct { + Request l_request.Request `json:"request"` + } + var config requestParams + err = json.Unmarshal([]byte(task.Config), &config) + if err != nil { + return err + } + workflowId, ok := config.Request.Json["workflow_id"].(string) + if !ok { + return fmt.Errorf("workflow_id不能为空") + } + // 提取参数 + var data map[string]interface{} + err = json.Unmarshal([]byte(rec.Match.Parameters), &data) + + req := &coze.RunWorkflowsReq{ + WorkflowID: workflowId, + Parameters: data, + // IsAsync: true, + } + + stream := config.Request.Json["stream"].(bool) + + entitys.ResLog(rec.Ch, task.Index, "工作流执行中...") + + if stream { + streamResp, err := cozeCli.Workflows.Runs.Stream(ctx, req) + if err != nil { + return err + } + + g.handleCozeWorkflowEvents(ctx, streamResp, cozeCli, workflowId, rec.Ch, task.Index) + } else { + resp, err := cozeCli.Workflows.Runs.Create(ctx, req) + if err != nil { + return err + } + + entitys.ResJson(rec.Ch, task.Index, resp.Data) + } + + return +} + +// handleCozeWorkflowEvents 处理 coze 工作流事件 +func (g *GroupConfigBiz) handleCozeWorkflowEvents(ctx context.Context, resp coze.Stream[coze.WorkflowEvent], cozeCli coze.CozeAPI, workflowID string, ch chan entitys.Response, index string) { + defer resp.Close() + for { + event, err := resp.Recv() + if errors.Is(err, io.EOF) { + fmt.Println("Stream finished") + break + } + if err != nil { + fmt.Println("Error receiving event:", err) + break + } + + switch event.Event { + case coze.WorkflowEventTypeMessage: + entitys.ResStream(ch, index, event.Message.Content) + case coze.WorkflowEventTypeError: + entitys.ResError(ch, index, fmt.Sprintf("工作流执行错误: %v", event.Error)) + case coze.WorkflowEventTypeDone: + entitys.ResEnd(ch, index, "工作流执行完成") + case coze.WorkflowEventTypeInterrupt: + resumeReq := &coze.ResumeRunWorkflowsReq{ + WorkflowID: workflowID, + EventID: event.Interrupt.InterruptData.EventID, + ResumeData: "your data", + InterruptType: event.Interrupt.InterruptData.Type, + } + newResp, err := cozeCli.Workflows.Runs.Resume(ctx, resumeReq) + if err != nil { + entitys.ResError(ch, index, fmt.Sprintf("工作流恢复执行错误: %s", err.Error())) + return + } + entitys.ResLog(ch, index, "工作流恢复执行中...") + g.handleCozeWorkflowEvents(ctx, newResp, cozeCli, workflowID, ch, index) + } + } + fmt.Printf("done, log:%s\n", resp.Response().LogID()) +} + +func (g *GroupConfigBiz) otherTask(ctx context.Context, rec *entitys.Recognize) (err error) { + entitys.ResText(rec.Ch, "", rec.Match.Reasoning) + return +} diff --git a/internal/biz/handle/qywx/group.go b/internal/biz/handle/qywx/group.go index 21fe52f..53f064a 100644 --- a/internal/biz/handle/qywx/group.go +++ b/internal/biz/handle/qywx/group.go @@ -1,6 +1,14 @@ package qywx -import "ai_scheduler/internal/data/impl" +import ( + "ai_scheduler/internal/data/impl" + "ai_scheduler/internal/pkg/l_request" + "ai_scheduler/internal/pkg/util" + "context" + "encoding/json" + "fmt" + "net/http" +) type Group struct { groupImpl *impl.BotGroupQywxImpl @@ -13,3 +21,89 @@ func NewGroup(groupImpl *impl.BotGroupQywxImpl, auth *Auth) *Group { auth: auth, } } + +// Create 方法用于创建群聊 +// 参数: +// - ctx: context.Context,上下文,用于控制请求的超时和取消 +// - req: GroupCreateReq,创建群聊的请求参数结构体 +// - corpid: string,企业的CorpID +// - corpsecret: string,应用的Secret +// +// 返回值: +// - GroupCreateResp: 创建群聊的响应结果 +// - error: 错误信息,如果请求失败则返回错误 +func (g *Group) Create(ctx context.Context, req GroupCreateReq, corpid string, corpsecret string) (GroupCreateResp, error) { + // 声明一个GroupCreateResp结构体变量res,用于存储响应结果 + var res GroupCreateResp + // 将请求结构体req转换为map类型的参数param + // 如果转换失败,忽略错误 + param, _ := util.StructToMap(req) + // 发送HTTP请求到企业微信API创建群聊 + // 参数依次为:上下文、请求参数、请求URL、响应结果存储指针、企业ID、应用密钥 + _, err := g.request(ctx, param, "https://qyapi.weixin.qq.com/cgi-bin/appchat/create", &res, corpid, corpsecret) + // 如果请求过程中发生错误,返回响应结果和错误 + if err != nil { + return res, err + } + // 请求成功,返回响应结果和nil错误 + return res, nil +} + +// SendMarkDown 方法用于发送Markdown格式的消息到群聊 +// 参数: +// - ctx: 上下文信息,用于控制请求的超时和取消 +// - req: 群聊发送Markdown消息的请求参数结构体 +// - corpid: 企业微信corp ID +// - corpsecret: 企业微信应用的secret +// +// 返回值: +// - error: 操作过程中发生的错误,如果成功则为nil +func (g *Group) SendMarkDown(ctx context.Context, req GroupSendMarkDownReq, corpid string, corpsecret string) error { + + // 设置消息类型为Markdown + req.Msgtype = "markdown" + // 将请求结构体转换为map类型,便于后续的HTTP请求参数处理 + param, _ := util.StructToMap(req) + // 调用request方法发送HTTP请求到企业微信API + // 参数依次为:上下文、请求参数、API URL、额外请求头、corpid、corpsecret + _, err := g.request(ctx, param, " https://qyapi.weixin.qq.com/cgi-bin/appchat/send", nil, corpid, corpsecret) + // 如果请求过程中发生错误,直接返回错误 + if err != nil { + return err + } + // 请求成功,返回nil + return nil +} + +func (g *Group) request(ctx context.Context, param map[string]interface{}, url string, resData interface{}, corpid string, corpsecret string) ([]byte, error) { + auth, err := g.auth.GetAccessToken(ctx, corpid, corpsecret) + if err != nil { + return nil, err + } + req := l_request.Request{ + Method: http.MethodPost, + Url: url + "?access_token=" + auth.AccessToken, + Json: param, + } + res, err := req.Send() + if err != nil { + return nil, err + } + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("request failed, status code: %d,reason: %s", res.StatusCode, res.Reason) + } + var code commonResp + if err = json.Unmarshal(res.Content, &code); err != nil { + return nil, fmt.Errorf("返回结构异常:%s", string(res.Content)) + } + if code.Errcode != 0 { + return nil, fmt.Errorf("返回状态异常:%s", string(code.Errmsg)) + } + if resData != nil { + if err = json.Unmarshal(res.Content, resData); err != nil { + return nil, fmt.Errorf("返回数据异常:%s", string(res.Content)) + } + } + + return res.Content, nil +} diff --git a/internal/biz/handle/qywx/json_callback/README.md b/internal/biz/handle/qywx/json_callback/README.md new file mode 100644 index 0000000..d5dbdd4 --- /dev/null +++ b/internal/biz/handle/qywx/json_callback/README.md @@ -0,0 +1,12 @@ +# weworkapi_cplusplus +official lib of wework api https://work.weixin.qq.com/api/doc + +# 注意事项 + +* 1.回调sdk json版本 + +* 2.wxbizjsonmsgcrypt.go文件中声明并实现了WXBizJsonMsgCrypt类,提供用户接入企业微信的三个接口。sample.go文件提供了如何使用这三个接口的示例。 + +* 3.WXBizJsonMsgCrypt类封装了VerifyURL, DecryptMsg, EncryptMsg三个接口,分别用于开发者验证回调url,收到用户回复消息的解密以及开发者回复消息的加密过程。使用方法可以参考sample.go文件。 + +* 4.加解密协议请参考企业微信官方文档。 \ No newline at end of file diff --git a/internal/biz/handle/qywx/json_callback/httpserver.go b/internal/biz/handle/qywx/json_callback/httpserver.go new file mode 100644 index 0000000..e5ba6a0 --- /dev/null +++ b/internal/biz/handle/qywx/json_callback/httpserver.go @@ -0,0 +1,126 @@ +package json_callback + +import ( + "ai_scheduler/internal/biz/handle/qywx/json_callback/wxbizjsonmsgcrypt" + + "encoding/json" + "fmt" + "io/ioutil" + "log" + "net/http" + "net/url" + "strings" +) + +const token = "gY1AGR3mjBhzy" +const receiverId = "wwabfd0cec7171e769" +const encodingAeskey = "g8VGfQEqluUhoKOlyjmmll8Q9C5tVFUTX5T2qkmI9Sv" + +func getString(str, endstr string, start int, msg *string) int { + end := strings.Index(str, endstr) + *msg = str[start:end] + return end + len(endstr) +} + +func VerifyURL(w http.ResponseWriter, r *http.Request) { + //httpstr := `&{GET /?msg_signature=825075c093249d5a60967fe4a613cae93146636b×tamp=1597998748&nonce=1597483820&echostr=neLB8CftccHiz19tluVb%2BUBnUVMT3xpUMZU8qvDdD17eH8XfEsbPYC%2FkJyPsZOOc6GdsCeu8jSIa2noSJ%2Fez2w%3D%3D HTTP/1.1 1 1 map[Cache-Control:[no-cache] Accept:[*/*] Pragma:[no-cache] User-Agent:[Mozilla/4.0]] 0x86c180 0 [] false 100.108.211.112:8893 map[] map[] map[] 100.108.79.233:59663 /?msg_signature=825075c093249d5a60967fe4a613cae93146636b×tamp=1597998748&nonce=1597483820&echostr=neLB8CftccHiz19tluVb%2BUBnUVMT3xpUMZU8qvDdD17eH8XfEsbPYC%2FkJyPsZOOc6GdsCeu8jSIa2noSJ%2Fez2w%3D%3D }` + fmt.Println(r, r.Body) + httpstr := r.URL.RawQuery + start := strings.Index(httpstr, "msg_signature=") + start += len("msg_signature=") + + var msg_signature string + next := getString(httpstr, "×tamp=", start, &msg_signature) + + var timestamp string + next = getString(httpstr, "&nonce=", next, ×tamp) + + var nonce string + next = getString(httpstr, "&echostr=", next, &nonce) + + echostr := httpstr[next:len(httpstr)] + + echostr, _ = url.QueryUnescape(echostr) + fmt.Println(msg_signature, timestamp, nonce, echostr, next) + + wxcpt := wxbizjsonmsgcrypt.NewWXBizMsgCrypt(token, encodingAeskey, receiverId, wxbizjsonmsgcrypt.JsonType) + echoStr, cryptErr := wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr) + if nil != cryptErr { + fmt.Println("verifyUrl fail", cryptErr) + } + fmt.Println("verifyUrl success echoStr", string(echoStr)) + fmt.Fprintf(w, string(echoStr)) +} + +type MsgContent struct { + ToUsername string `json:"ToUserName"` + FromUsername string `json:"FromUserName"` + CreateTime uint32 `json:"CreateTime"` + MsgType string `json:"MsgType"` + Content string `json:"Content"` + Msgid uint64 `json:"MsgId"` + Agentid uint32 `json:"AgentId"` +} + +func MsgHandler(w http.ResponseWriter, r *http.Request) { + httpstr := r.URL.RawQuery + start := strings.Index(httpstr, "msg_signature=") + start += len("msg_signature=") + + var msg_signature string + next := getString(httpstr, "×tamp=", start, &msg_signature) + + var timestamp string + next = getString(httpstr, "&nonce=", next, ×tamp) + + nonce := httpstr[next:len(httpstr)] + fmt.Println(msg_signature, timestamp, nonce) + + body, err := ioutil.ReadAll(r.Body) + fmt.Println(string(body), err) + wxcpt := wxbizjsonmsgcrypt.NewWXBizMsgCrypt(token, encodingAeskey, receiverId, wxbizjsonmsgcrypt.JsonType) + + msg, err_ := wxcpt.DecryptMsg(msg_signature, timestamp, nonce, body) + fmt.Println(string(msg), err_) + var msgContent MsgContent + err = json.Unmarshal(msg, &msgContent) + if nil != err { + fmt.Println("Unmarshal fail", err) + } else { + fmt.Println("struct", msgContent) + } + + fmt.Println(msgContent, err) + ToUsername := msgContent.ToUsername + msgContent.ToUsername = msgContent.FromUsername + msgContent.FromUsername = ToUsername + fmt.Println("replaymsg", msgContent) + replayJson, err := json.Marshal(&msgContent) + + encryptMsg, cryptErr := wxcpt.EncryptMsg(string(replayJson), "1409659589", "1409659589") + if nil != cryptErr { + fmt.Println("DecryptMsg fail", cryptErr) + } + + sEncryptMsg := string(encryptMsg) + + fmt.Println("after encrypt sEncryptMsg: ", sEncryptMsg) + fmt.Fprintf(w, sEncryptMsg) +} + +func CallbackHandler(w http.ResponseWriter, r *http.Request) { + httpstr := r.URL.RawQuery + echo := strings.Index(httpstr, "echostr") + if echo != -1 { + VerifyURL(w, r) + } else { + MsgHandler(w, r) + } + + fmt.Println("finished CallbackHandler", httpstr) +} + +func main() { + http.HandleFunc("/", CallbackHandler) // 设置访问路由 + log.Fatal(http.ListenAndServe(":8893", nil)) +} diff --git a/internal/biz/handle/qywx/json_callback/sample.go b/internal/biz/handle/qywx/json_callback/sample.go new file mode 100644 index 0000000..ba4fd07 --- /dev/null +++ b/internal/biz/handle/qywx/json_callback/sample.go @@ -0,0 +1,140 @@ +package json_callback + +// +//import ( +// "ai_scheduler/internal/biz/handle/qywx/json_callback/wxbizjsonmsgcrypt" +// "encoding/json" +// "fmt" +//) +// +//type MsgContent struct { +// ToUsername string `json:"ToUserName"` +// FromUsername string `json:"FromUserName"` +// CreateTime uint32 `json:"CreateTime"` +// MsgType string `json:"MsgType"` +// Content string `json:"Content"` +// Msgid uint64 `json:"MsgId"` +// Agentid uint32 `json:"AgentId"` +//} +// +//func main() { +// token := "QDG6eK" +// receiverId := "wx5823bf96d3bd56c7" +// encodingAeskey := "jWmYm7qr5nMoAUwZRjGtBxmz3KA1tkAj3ykkR6q2B2C" +// wxcpt := wxbizjsonmsgcrypt.NewWXBizMsgCrypt(token, encodingAeskey, receiverId, wxbizjsonmsgcrypt.JsonType) +// /* +// ------------使用示例一:验证回调URL--------------- +// *企业开启回调模式时,企业微信会向验证url发送一个get请求 +// 假设点击验证时,企业收到类似请求: +// * GET /cgi-bin/wxpush?msg_signature=5c45ff5e21c57e6ad56bac8758b79b1d9ac89fd3×tamp=1409659589&nonce=263014780&echostr=P9nAzCzyDtyTWESHep1vC5X9xho%2FqYX3Zpb4yKa9SKld1DsH3Iyt3tP3zNdtp%2B4RPcs8TgAE7OaBO%2BFZXvnaqQ%3D%3D +// * HTTP/1.1 Host: qy.weixin.qq.com +// +// 接收到该请求时,企业应 +// 1.解析出Get请求的参数,包括消息体签名(msg_signature),时间戳(timestamp),随机数字串(nonce)以及企业微信推送过来的随机加密字符串(echostr), +// 这一步注意作URL解码。 +// 2.验证消息体签名的正确性 +// 3. 解密出echostr原文,将原文当作Get请求的response,返回给企业微信 +// 第2,3步可以用企业微信提供的库函数VerifyURL来实现。 +// +// */ +// // 解析出url上的参数值如下: +// // verifyMsgSign := HttpUtils.ParseUrl("msg_signature") +// verifyMsgSign := "5c45ff5e21c57e6ad56bac8758b79b1d9ac89fd3" +// // verifyTimestamp := HttpUtils.ParseUrl("timestamp") +// verifyTimestamp := "1409659589" +// // verifyNonce := HttpUtils.ParseUrl("nonce") +// verifyNonce := "263014780" +// // verifyEchoStr := HttpUtils.ParseUrl("echoStr") +// verifyEchoStr := "P9nAzCzyDtyTWESHep1vC5X9xho/qYX3Zpb4yKa9SKld1DsH3Iyt3tP3zNdtp+4RPcs8TgAE7OaBO+FZXvnaqQ==" +// echoStr, cryptErr := wxcpt.VerifyURL(verifyMsgSign, verifyTimestamp, verifyNonce, verifyEchoStr) +// if nil != cryptErr { +// fmt.Println("verifyUrl fail", cryptErr) +// } +// fmt.Println("verifyUrl success echoStr", string(echoStr)) +// // 验证URL成功,将sEchoStr返回 +// // HttpUtils.SetResponse(sEchoStr) +// +// /* +// ------------使用示例二:对用户回复的消息解密--------------- +// 用户回复消息或者点击事件响应时,企业会收到回调消息,此消息是经过企业微信加密之后的密文以post形式发送给企业,密文格式请参考官方文档 +// 假设企业收到企业微信的回调消息如下: +// POST /cgi-bin/wxpush? msg_signature=477715d11cdb4164915debcba66cb864d751f3e6×tamp=1409659813&nonce=1372623149 HTTP/1.1 +// Host: qy.weixin.qq.com +// Content-Length: 613 +// { +// "tousername":"wx5823bf96d3bd56c7", +// "encrypt":"CZWs4CWRpI4VolQlvn4dlPBlXke6+HgmuI7p0LueFp1fKH40TNL+YHWJZwqIiYV+3kTrhdNU7fZwc+PmtgBvxSczkFeRz+oaVSsomrrtP2Z91LE313djjbWujqInRT+7ChGbCeo7ZzszByf8xnDSunPBxRX1MfX3kAxpKq7dqduW1kpMAx8O8xUzZ9oC0TLuZchbpxaml4epzGfF21O+zyXDwTxbCEiO0E87mChtzuh/VPlznXYbfqVrnyLNZ5pr", +// "agentid":"218" +// } +// +// 企业收到post请求之后应该: +// 1.解析出url上的参数,包括消息体签名(msg_signature),时间戳(timestamp)以及随机数字串(nonce) +// 2.验证消息体签名的正确性。 +// 3.将post请求的数据进行json解析,并将"Encrypt"标签的内容进行解密,解密出来的明文即是用户回复消息的明文,明文格式请参考官方文档 +// 第2,3步可以用企业微信提供的库函数DecryptMsg来实现。 +// */ +// +// // reqMsgSign := HttpUtils.ParseUrl("msg_signature") +// reqMsgSign := "0623cbc5a8cbee5bcc137c70de99575366fc2af3" +// // reqTimestamp := HttpUtils.ParseUrl("timestamp") +// reqTimestamp := "1409659813" +// // reqNonce := HttpUtils.ParseUrl("nonce") +// reqNonce := "1372623149" +// // post请求的密文数据 +// // reqData = HttpUtils.PostData() +// +// reqData := []byte(`{"tousername":"wx5823bf96d3bd56c7","encrypt":"CZWs4CWRpI4VolQlvn4dlEC1alN2MUEY2VklGehgBVLBrlVF7SyT+SV+Toj43l4ayJ9UMGKphktKKmP7B2j/P1ey67XB8PBgS7Wr5/8+w/yWriZv3Vmoo/MH3/1HsIWZrPQ3N2mJrelStIfI2Y8kLKXA7EhfZgZX4o+ffdkZDM76SEl79Ib9mw7TGjZ9Aw/x/A2VjNbV1E8BtEbRxYYcQippYNw7hr8sFfa3nW1xLdxokt8QkRX83vK3DFP2F6TQFPL2Tu98UwhcUpPvdJBuu1/yiOQIScppV3eOuLWEsko=","agentid":"218"}`) +// +// msg, cryptErr := wxcpt.DecryptMsg(reqMsgSign, reqTimestamp, reqNonce, reqData) +// if nil != cryptErr { +// fmt.Println("DecryptMsg fail", cryptErr) +// } +// fmt.Println("after decrypt msg: ", string(msg)) +// // TODO: 解析出明文json标签的内容进行处理 +// // For example: +// +// var msgContent MsgContent +// err := json.Unmarshal(msg, &msgContent) +// if nil != err { +// fmt.Println("Unmarshal fail", err) +// } else { +// fmt.Println("struct", msgContent) +// } +// +// /* +// ------------使用示例三:企业回复用户消息的加密--------------- +// 企业被动回复用户的消息也需要进行加密,并且拼接成密文格式的json串。 +// 假设企业需要回复用户的明文如下: +// +// { +// "ToUserName": "mycreate", +// "FromUserName":"wx5823bf96d3bd56c7", +// "CreateTime": 1348831860, +// "MsgType": "text", +// "Content": "this is a test", +// "MsgId": 1234567890123456, +// "AgentID": 128 +// } +// +// 为了将此段明文回复给用户,企业应: +// 1.自己生成时间时间戳(timestamp),随机数字串(nonce)以便生成消息体签名,也可以直接用从企业微信的post url上解析出的对应值。 +// 2.将明文加密得到密文。 +// 3.用密文,步骤1生成的timestamp,nonce和企业在企业微信设定的token生成消息体签名。 +// 4.将密文,消息体签名,时间戳,随机数字串拼接成json格式的字符串,发送给企业。 +// 以上2,3,4步可以用企业微信提供的库函数EncryptMsg来实现。 +// */ +// respData := "{\"ToUserName\":\"wx5823bf96d3bd56c7\",\"FromUserName\":\"mycreate\",\"CreateTime\": 1409659813,\"MsgType\":\"text\",\"Content\":\"hello\",\"MsgId\":4561255354251345929,\"AgentID\": 218}" +// //respData := `{"ToUserName":"wx5823bf96d3bd56c7","FromUserName":"mycreate","CreateTime": 1409659813,"MsgType":"text","Content":"hello","MsgId":4561255354251345929,"AgentID": 218}` +// //respData := `{"FromUserName":"mycreate","CreateTime": 1409659813,"MsgType":"text","Content":"hello","MsgId":4561255354251345929,"AgentID": 218}` +// encryptMsg, cryptErr := wxcpt.EncryptMsg(respData, reqTimestamp, reqNonce) +// if nil != cryptErr { +// fmt.Println("DecryptMsg fail", cryptErr) +// } +// +// sEncryptMsg := string(encryptMsg) +// +// fmt.Println("after encrypt sEncryptMsg: ", sEncryptMsg) +// // 加密成功 +// // TODO: +// // HttpUtils.SetResponse(sEncryptMsg) +//} diff --git a/internal/biz/handle/qywx/json_callback/wxbizjsonmsgcrypt/wxbizjsonmsgcrypt.go b/internal/biz/handle/qywx/json_callback/wxbizjsonmsgcrypt/wxbizjsonmsgcrypt.go new file mode 100644 index 0000000..1cddd5d --- /dev/null +++ b/internal/biz/handle/qywx/json_callback/wxbizjsonmsgcrypt/wxbizjsonmsgcrypt.go @@ -0,0 +1,310 @@ +package wxbizjsonmsgcrypt + +import ( + "bytes" + "crypto/aes" + "crypto/cipher" + "crypto/sha1" + "encoding/base64" + "encoding/binary" + "encoding/json" + "fmt" + "math/rand" + "sort" + "strings" +) + +const letterBytes = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + +const ( + ValidateSignatureError int = -40001 + ParseJsonError int = -40002 + ComputeSignatureError int = -40003 + IllegalAesKey int = -40004 + ValidateCorpidError int = -40005 + EncryptAESError int = -40006 + DecryptAESError int = -40007 + IllegalBuffer int = -40008 + EncodeBase64Error int = -40009 + DecodeBase64Error int = -40010 + GenJsonError int = -40011 + IllegalProtocolType int = -40012 +) + +type ProtocolType int + +const ( + JsonType ProtocolType = 1 +) + +type CryptError struct { + ErrCode int + ErrMsg string +} + +func NewCryptError(err_code int, err_msg string) *CryptError { + return &CryptError{ErrCode: err_code, ErrMsg: err_msg} +} + +type WXBizJsonMsg4Recv struct { + Tousername string `json:"tousername"` + Encrypt string `json:"encrypt"` + Agentid string `json:"agentid"` +} + +type WXBizJsonMsg4Send struct { + Encrypt string `json:"encrypt"` + Signature string `json:"msgsignature"` + Timestamp string `json:"timestamp"` + Nonce string `json:"nonce"` +} + +func NewWXBizJsonMsg4Send(encrypt, signature, timestamp, nonce string) *WXBizJsonMsg4Send { + return &WXBizJsonMsg4Send{Encrypt: encrypt, Signature: signature, Timestamp: timestamp, Nonce: nonce} +} + +type ProtocolProcessor interface { + parse(src_data []byte) (*WXBizJsonMsg4Recv, *CryptError) + serialize(msg_send *WXBizJsonMsg4Send) ([]byte, *CryptError) +} + +type WXBizMsgCrypt struct { + token string + encoding_aeskey string + receiver_id string + protocol_processor ProtocolProcessor +} + +type JsonProcessor struct { +} + +func (self *JsonProcessor) parse(src_data []byte) (*WXBizJsonMsg4Recv, *CryptError) { + var msg4_recv WXBizJsonMsg4Recv + err := json.Unmarshal(src_data, &msg4_recv) + if nil != err { + fmt.Println("Unmarshal fail", err) + return nil, NewCryptError(ParseJsonError, "json to msg fail") + } + return &msg4_recv, nil +} + +func (self *JsonProcessor) serialize(msg4_send *WXBizJsonMsg4Send) ([]byte, *CryptError) { + json_msg, err := json.Marshal(msg4_send) + if nil != err { + return nil, NewCryptError(GenJsonError, err.Error()) + } + + return json_msg, nil +} + +func NewWXBizMsgCrypt(token, encoding_aeskey, receiver_id string, protocol_type ProtocolType) *WXBizMsgCrypt { + var protocol_processor ProtocolProcessor + if protocol_type != JsonType { + panic("unsupport protocal") + } else { + protocol_processor = new(JsonProcessor) + } + + return &WXBizMsgCrypt{token: token, encoding_aeskey: (encoding_aeskey + "="), receiver_id: receiver_id, protocol_processor: protocol_processor} +} + +func (self *WXBizMsgCrypt) randString(n int) string { + b := make([]byte, n) + for i := range b { + b[i] = letterBytes[rand.Int63()%int64(len(letterBytes))] + } + return string(b) +} + +func (self *WXBizMsgCrypt) pKCS7Padding(plaintext string, block_size int) []byte { + padding := block_size - (len(plaintext) % block_size) + padtext := bytes.Repeat([]byte{byte(padding)}, padding) + var buffer bytes.Buffer + buffer.WriteString(plaintext) + buffer.Write(padtext) + return buffer.Bytes() +} + +func (self *WXBizMsgCrypt) pKCS7Unpadding(plaintext []byte, block_size int) ([]byte, *CryptError) { + plaintext_len := len(plaintext) + if nil == plaintext || plaintext_len == 0 { + return nil, NewCryptError(DecryptAESError, "pKCS7Unpadding error nil or zero") + } + if plaintext_len%block_size != 0 { + return nil, NewCryptError(DecryptAESError, "pKCS7Unpadding text not a multiple of the block size") + } + padding_len := int(plaintext[plaintext_len-1]) + return plaintext[:plaintext_len-padding_len], nil +} + +func (self *WXBizMsgCrypt) cbcEncrypter(plaintext string) ([]byte, *CryptError) { + aeskey, err := base64.StdEncoding.DecodeString(self.encoding_aeskey) + if nil != err { + return nil, NewCryptError(DecodeBase64Error, err.Error()) + } + const block_size = 32 + pad_msg := self.pKCS7Padding(plaintext, block_size) + + block, err := aes.NewCipher(aeskey) + if err != nil { + return nil, NewCryptError(EncryptAESError, err.Error()) + } + + ciphertext := make([]byte, len(pad_msg)) + iv := aeskey[:aes.BlockSize] + + mode := cipher.NewCBCEncrypter(block, iv) + + mode.CryptBlocks(ciphertext, pad_msg) + base64_msg := make([]byte, base64.StdEncoding.EncodedLen(len(ciphertext))) + base64.StdEncoding.Encode(base64_msg, ciphertext) + + return base64_msg, nil +} + +func (self *WXBizMsgCrypt) cbcDecrypter(base64_encrypt_msg string) ([]byte, *CryptError) { + aeskey, err := base64.StdEncoding.DecodeString(self.encoding_aeskey) + if nil != err { + return nil, NewCryptError(DecodeBase64Error, err.Error()) + } + + encrypt_msg, err := base64.StdEncoding.DecodeString(base64_encrypt_msg) + if nil != err { + return nil, NewCryptError(DecodeBase64Error, err.Error()) + } + + block, err := aes.NewCipher(aeskey) + if err != nil { + return nil, NewCryptError(DecryptAESError, err.Error()) + } + + if len(encrypt_msg) < aes.BlockSize { + return nil, NewCryptError(DecryptAESError, "encrypt_msg size is not valid") + } + + iv := aeskey[:aes.BlockSize] + + if len(encrypt_msg)%aes.BlockSize != 0 { + return nil, NewCryptError(DecryptAESError, "encrypt_msg not a multiple of the block size") + } + + mode := cipher.NewCBCDecrypter(block, iv) + + mode.CryptBlocks(encrypt_msg, encrypt_msg) + + return encrypt_msg, nil +} + +func (self *WXBizMsgCrypt) calSignature(timestamp, nonce, data string) string { + sort_arr := []string{self.token, timestamp, nonce, data} + sort.Strings(sort_arr) + var buffer bytes.Buffer + for _, value := range sort_arr { + buffer.WriteString(value) + } + + sha := sha1.New() + sha.Write(buffer.Bytes()) + signature := fmt.Sprintf("%x", sha.Sum(nil)) + return string(signature) +} + +func (self *WXBizMsgCrypt) ParsePlainText(plaintext []byte) ([]byte, uint32, []byte, []byte, *CryptError) { + const block_size = 32 + plaintext, err := self.pKCS7Unpadding(plaintext, block_size) + if nil != err { + return nil, 0, nil, nil, err + } + + text_len := uint32(len(plaintext)) + if text_len < 20 { + return nil, 0, nil, nil, NewCryptError(IllegalBuffer, "plain is to small 1") + } + random := plaintext[:16] + msg_len := binary.BigEndian.Uint32(plaintext[16:20]) + if text_len < (20 + msg_len) { + return nil, 0, nil, nil, NewCryptError(IllegalBuffer, "plain is to small 2") + } + + msg := plaintext[20 : 20+msg_len] + receiver_id := plaintext[20+msg_len:] + + return random, msg_len, msg, receiver_id, nil +} + +func (self *WXBizMsgCrypt) VerifyURL(msg_signature, timestamp, nonce, echostr string) ([]byte, *CryptError) { + signature := self.calSignature(timestamp, nonce, echostr) + + if strings.Compare(signature, msg_signature) != 0 { + return nil, NewCryptError(ValidateSignatureError, "signature not equal") + } + + plaintext, err := self.cbcDecrypter(echostr) + if nil != err { + return nil, err + } + + _, _, msg, receiver_id, err := self.ParsePlainText(plaintext) + if nil != err { + return nil, err + } + + if len(self.receiver_id) > 0 && strings.Compare(string(receiver_id), self.receiver_id) != 0 { + fmt.Println(string(receiver_id), self.receiver_id, len(receiver_id), len(self.receiver_id)) + return nil, NewCryptError(ValidateCorpidError, "receiver_id is not equil") + } + + return msg, nil +} + +func (self *WXBizMsgCrypt) EncryptMsg(reply_msg, timestamp, nonce string) ([]byte, *CryptError) { + rand_str := self.randString(16) + var buffer bytes.Buffer + buffer.WriteString(rand_str) + + msg_len_buf := make([]byte, 4) + binary.BigEndian.PutUint32(msg_len_buf, uint32(len(reply_msg))) + buffer.Write(msg_len_buf) + buffer.WriteString(reply_msg) + buffer.WriteString(self.receiver_id) + + tmp_ciphertext, err := self.cbcEncrypter(buffer.String()) + if nil != err { + return nil, err + } + ciphertext := string(tmp_ciphertext) + + signature := self.calSignature(timestamp, nonce, ciphertext) + + msg4_send := NewWXBizJsonMsg4Send(ciphertext, signature, timestamp, nonce) + return self.protocol_processor.serialize(msg4_send) +} + +func (self *WXBizMsgCrypt) DecryptMsg(msg_signature, timestamp, nonce string, post_data []byte) ([]byte, *CryptError) { + msg4_recv, crypt_err := self.protocol_processor.parse(post_data) + if nil != crypt_err { + return nil, crypt_err + } + + signature := self.calSignature(timestamp, nonce, msg4_recv.Encrypt) + + if strings.Compare(signature, msg_signature) != 0 { + return nil, NewCryptError(ValidateSignatureError, "signature not equal") + } + + plaintext, crypt_err := self.cbcDecrypter(msg4_recv.Encrypt) + if nil != crypt_err { + return nil, crypt_err + } + + _, _, msg, receiver_id, crypt_err := self.ParsePlainText(plaintext) + if nil != crypt_err { + return nil, crypt_err + } + + if len(self.receiver_id) > 0 && strings.Compare(string(receiver_id), self.receiver_id) != 0 { + return nil, NewCryptError(ValidateCorpidError, "receiver_id is not equil") + } + + return msg, nil +} diff --git a/internal/biz/handle/qywx/provider_set.go b/internal/biz/handle/qywx/provider_set.go new file mode 100644 index 0000000..6d80e56 --- /dev/null +++ b/internal/biz/handle/qywx/provider_set.go @@ -0,0 +1,10 @@ +package qywx + +import ( + "github.com/google/wire" +) + +var ProviderSetQywx = wire.NewSet( + NewAuth, + NewGroup, +) diff --git a/internal/biz/handle/qywx/types.go b/internal/biz/handle/qywx/types.go index 09fdf67..38b2d34 100644 --- a/internal/biz/handle/qywx/types.go +++ b/internal/biz/handle/qywx/types.go @@ -15,3 +15,31 @@ type AuthInfo struct { AccessToken string `json:"accessToken"` Expire time.Duration `json:"expireIn"` } + +type GroupCreateReq struct { + Name string `json:"name"` + Owner string `json:"owner"` + Userlist []string `json:"userlist"` + Chatid string `json:"chatid"` +} + +type GroupCreateResp struct { + Errcode int `json:"errcode"` + Errmsg string `json:"errmsg"` + Chatid string `json:"chatid"` +} + +type commonResp struct { + Errcode int `json:"errcode"` + Errmsg string `json:"errmsg"` +} + +type GroupSendMarkDownReq struct { + Chatid string `json:"chatid"` + Msgtype string `json:"msgtype"` + Markdown MarkDown `json:"markdown"` +} + +type MarkDown struct { + Content string `json:"content"` +} diff --git a/internal/biz/provider_set.go b/internal/biz/provider_set.go index 1bdc0f7..1f8595f 100644 --- a/internal/biz/provider_set.go +++ b/internal/biz/provider_set.go @@ -1,10 +1,10 @@ package biz import ( - "ai_scheduler/internal/biz/do" - "ai_scheduler/internal/biz/llm_service" - - "github.com/google/wire" + "ai_scheduler/internal/biz/do" + "ai_scheduler/internal/biz/llm_service" + + "github.com/google/wire" ) var ProviderSetBiz = wire.NewSet( @@ -15,7 +15,9 @@ var ProviderSetBiz = wire.NewSet( llm_service.NewOllamaGenerate, //handle.NewHandle, do.NewDo, - do.NewHandle, + do.NewHandle, NewTaskBiz, NewDingTalkBotBiz, + NewQywxAppBiz, + NewGroupConfigBiz, ) diff --git a/internal/biz/qywx_app.go b/internal/biz/qywx_app.go new file mode 100644 index 0000000..694fb1d --- /dev/null +++ b/internal/biz/qywx_app.go @@ -0,0 +1,88 @@ +package biz + +import ( + "ai_scheduler/internal/biz/handle/qywx" + "ai_scheduler/internal/data/constants" + "ai_scheduler/internal/data/impl" + "ai_scheduler/internal/data/model" + "ai_scheduler/internal/pkg" + "ai_scheduler/internal/tools/bbxt" + "context" + "fmt" + "time" + + "ai_scheduler/internal/config" + + "xorm.io/builder" +) + +// AiRouterBiz 智能路由服务 +type QywxAppBiz struct { + conf *config.Config + botGroupQywxImpl *impl.BotGroupQywxImpl + qywxGroupHandle *qywx.Group +} + +// NewDingTalkBotBiz +func NewQywxAppBiz( + conf *config.Config, + botGroupQywxImpl *impl.BotGroupQywxImpl, + qywxGroupHandle *qywx.Group, +) *QywxAppBiz { + return &QywxAppBiz{ + conf: conf, + botGroupQywxImpl: botGroupQywxImpl, + qywxGroupHandle: qywxGroupHandle, + } +} + +func (q *QywxAppBiz) InitGroup(ctx context.Context) (string, error) { + chatId := pkg.RandomString(q.conf.Qywx.ChatIdLen) + GroupInfo := &model.AiBotGroupQywx{ + Title: "bot_group_" + time.Now().Format(time.DateOnly), + ChatID: chatId, + ConfigID: q.conf.Qywx.DefaultConfigId, + AppSecret: q.conf.Qywx.AppSecret, + } + _, err := q.botGroupQywxImpl.Add(GroupInfo) + if err != nil { + return "", err + } + resp, err := q.qywxGroupHandle.Create( + ctx, + qywx.GroupCreateReq{ + Name: GroupInfo.Title, + Chatid: GroupInfo.ChatID, + Userlist: []string{ + q.conf.Qywx.InitAccount, + }, + }, + q.conf.Qywx.CorpId, + GroupInfo.AppSecret, + ) + if err != nil { + return "", err + } + return resp.Chatid, nil +} + +func (q *QywxAppBiz) GetGroupInfo(ctx context.Context, groupId int) (group model.AiBotGroupQywx, err error) { + + cond := builder.NewCond() + cond = cond.And(builder.Eq{"group_id": groupId}) + cond = cond.And(builder.Eq{"status": constants.Enable}) + err = q.botGroupQywxImpl.GetOneBySearchToStrut(&cond, &group) + + return +} + +func (q *QywxAppBiz) SendReport(ctx context.Context, groupInfo *model.AiBotGroupQywx, report *bbxt.ReportRes) (err error) { + confitent := fmt.Sprintf("%s\n%s", report.Title, fmt.Sprintf("![图片](%s)", report.Url)) + err = q.qywxGroupHandle.SendMarkDown(ctx, qywx.GroupSendMarkDownReq{ + Chatid: groupInfo.ChatID, + Markdown: qywx.MarkDown{ + Content: confitent, + }, + }, q.conf.Qywx.CorpId, groupInfo.AppSecret) + return +} diff --git a/internal/biz/qywx_app_test.go b/internal/biz/qywx_app_test.go new file mode 100644 index 0000000..542ced5 --- /dev/null +++ b/internal/biz/qywx_app_test.go @@ -0,0 +1,32 @@ +package biz + +import ( + "ai_scheduler/internal/biz/handle/qywx" + "ai_scheduler/internal/config" + "ai_scheduler/internal/data/impl" + "ai_scheduler/utils" + "context" + "testing" +) + +func Test_InitGroup(t *testing.T) { + run() + chatId, err := qywxAppBiz.InitGroup(context.Background()) + t.Log(chatId, err) +} + +var ( + configConfig *config.Config + qywxAppBiz *QywxAppBiz +) + +func run() { + configConfig, _ = config.LoadConfigWithTest() + // 初始化数据库连接 + db, _ := utils.NewGormDb(configConfig) + rdb := utils.NewRdb(configConfig) + botGroupQywxImpl := impl.NewBotGroupQywxImpl(db) + qywxAuth := qywx.NewAuth(configConfig, rdb) + group := qywx.NewGroup(botGroupQywxImpl, qywxAuth) + qywxAppBiz = NewQywxAppBiz(configConfig, botGroupQywxImpl, group) +} diff --git a/internal/biz/router_test.go b/internal/biz/router_test.go deleted file mode 100644 index 2fb9f65..0000000 --- a/internal/biz/router_test.go +++ /dev/null @@ -1,122 +0,0 @@ -package biz - -// import ( -// "ai_scheduler/internal/config" -// "ai_scheduler/internal/data/impl" -// "ai_scheduler/internal/data/model" -// "ai_scheduler/internal/entitys" -// "ai_scheduler/internal/pkg" -// "ai_scheduler/internal/pkg/utils_ollama" -// "ai_scheduler/internal/tools" -// "ai_scheduler/utils" -// "encoding/json" -// "flag" -// "fmt" -// "os" -// "path/filepath" -// "testing" - -// "github.com/gofiber/fiber/v2/log" -// ) - -// func Test_task(t *testing.T) { -// var c entitys.TaskConfig -// config := `{"param": {"type": "object", "required": ["number"], "properties": {"number": {"type": "string", "description": "订单编号/流水号"}}}, "request": {"url": "http://www.baidu.com/${number}", "headers": {"Authorization": "${authorization}"}, "method": "GET"}}` -// err := json.Unmarshal([]byte(config), &c) -// t.Log(err) -// } - -// type configData struct { -// Param map[string]interface{} `json:"param"` -// Do map[string]interface{} `json:"do"` -// } - -// func Test_Order(t *testing.T) { -// routerBiz := in() -// ch := make(chan entitys.Response, 5) -// defer close(ch) -// err := routerBiz.handleTask(ch, nil, &entitys.Match{Index: "order_diagnosis", Parameters: `{"order_number":"822895927188791297"}`}, &model.AiTask{Config: `{"tool": "zltxOrderDetail", "param": {"type": "object", "optional": [], "required": ["order_number"], "properties": {"order_number": {"type": "string", "description": "订单编号/流水号"}}}}`}) -// select { -// case v := <-ch: // 尝试接收 -// fmt.Println("接收到值:", v) -// default: -// fmt.Println("无数据可接收") -// } -// t.Log(err) -// } - -// func Test_OrderLog(t *testing.T) { -// routerBiz := in() -// ch := make(chan entitys.Response, 5) -// defer close(ch) -// err := routerBiz.handleTask(ch, nil, &entitys.Match{Index: "order_diagnosis", Parameters: `{"order_number":"822979421673758721","serial_number":"822979421979938817"}`}, &model.AiTask{Config: `{"tool": "zltxOrderDirectLog", "param": {"type": "object", "optional": [], "required": ["order_number"], "properties": {"order_number": {"type": "string", "description": "订单编号/流水号"}}}}`}) -// t.Log(err) -// } - -// func Test_ProductLog(t *testing.T) { -// routerBiz := in() -// ch := make(chan entitys.Response, 5) -// defer close(ch) -// err := routerBiz.handleTask(ch, nil, &entitys.Match{Index: "order_diagnosis", Parameters: `{"name":"利楚测试"}`}, &model.AiTask{Config: `{"tool": "zltxProduct", "param": {"type": "object", "optional": [], "required": ["order_number"], "properties": {"order_number": {"type": "string", "description": "订单编号/流水号"}}}}`}) -// t.Log(err) -// } - -// func Test_ZltxStatistics(t *testing.T) { -// routerBiz := in() -// ch := make(chan entitys.Response, 5) -// defer close(ch) -// err := routerBiz.handleTask(ch, nil, &entitys.Match{Index: "order_diagnosis", Parameters: `{"number":"13737882067"}`}, &model.AiTask{Config: `{"tool": "zltxOrderStatistics", "param": {"type": "object", "optional": [], "required": ["number"], "properties": {"number": {"type": "string", "description": "充值账号/分销商ID"}}}}`}) -// t.Log(err) -// } - -// func in() *AiRouterBiz { - -// modDir, err := getModuleDir() -// if err != nil { -// panic("1") -// } -// configPath := flag.String("config", fmt.Sprintf("%s/config/config.yaml", modDir), "Path to configuration file") -// flag.Parse() - -// configConfig, err := config.LoadConfig(*configPath) -// if err != nil { -// panic("加载配置失败") -// } -// client, _, err := utils_ollama.NewClient(configConfig) -// allLogger := log.DefaultLogger() -// utilOllama := utils_ollama.NewUtilOllama(configConfig, allLogger) -// manager := tools.NewManager(configConfig, client) - -// db, _ := utils.NewGormDb(configConfig) -// sessionImpl := impl.NewSessionImpl(db) -// sysImpl := impl.NewSysImpl(db) -// taskImpl := impl.NewTaskImpl(db) -// chatImpl := impl.NewChatImpl(db) -// safeChannelPool, _ := pkg.NewSafeChannelPool(configConfig) -// routerBiz := NewAiRouterBiz(manager, sessionImpl, sysImpl, taskImpl, chatImpl, configConfig, utilOllama, safeChannelPool, client) - -// return routerBiz -// } - -// func getModuleDir() (string, error) { -// dir, err := os.Getwd() -// if err != nil { -// return "", err -// } - -// for { -// modPath := filepath.Join(dir, "go.mod") -// if _, err := os.Stat(modPath); err == nil { -// return dir, nil // 找到 go.mod -// } - -// // 向上查找父目录 -// parent := filepath.Dir(dir) -// if parent == dir { -// break // 到达根目录,未找到 -// } -// dir = parent -// } - -// return "", fmt.Errorf("go.mod not found in current directory or parents") -// } diff --git a/internal/config/config.go b/internal/config/config.go index b8c917d..a313252 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -26,6 +26,7 @@ type Config struct { PermissionConfig PermissionConfig `mapstructure:"permissionConfig"` LLM LLM `mapstructure:"llm"` Dingtalk DingtalkConfig `mapstructure:"dingtalk"` + Qywx QywxConfig `mapstructure:"qywx"` } type SysPrompt struct { @@ -72,6 +73,18 @@ type DingtalkConfig struct { BotGroupID map[string]int `mapstructure:"bot_group_id"` // 机器人群组 } +// QywxConfig 企业微信配置 +type QywxConfig struct { + CorpId string `mapstructure:"corp_id"` + DefaultConfigId int32 `mapstructure:"default_config_id"` + AppSecret string `mapstructure:"app_secret"` + InitAccount string `mapstructure:"init_account"` + Token string `mapstructure:"token"` + AES_KEY string `mapstructure:"aes_key"` + ChatIdLen int `mapstructure:"chat_id_len"` + BotGroupID map[string]int `mapstructure:"bot_group_id"` // 应用群 +} + // TableDemandConfig 需求表配置 type AITableConfig struct { Url string `mapstructure:"url"` diff --git a/internal/data/impl/bot_group_config.go b/internal/data/impl/bot_group_config.go new file mode 100644 index 0000000..c72962f --- /dev/null +++ b/internal/data/impl/bot_group_config.go @@ -0,0 +1,17 @@ +package impl + +import ( + "ai_scheduler/internal/data/model" + "ai_scheduler/tmpl/dataTemp" + "ai_scheduler/utils" +) + +type BotGroupConfigImpl struct { + dataTemp.DataTemp +} + +func NewBotGroupConfigImpl(db *utils.Db) *BotGroupConfigImpl { + return &BotGroupConfigImpl{ + DataTemp: *dataTemp.NewDataTemp(db, new(model.AiBotGroupConfig)), + } +} diff --git a/internal/data/impl/bot_group_qywx.go b/internal/data/impl/bot_group_qywx.go index 87b5ce5..0522713 100644 --- a/internal/data/impl/bot_group_qywx.go +++ b/internal/data/impl/bot_group_qywx.go @@ -10,8 +10,8 @@ type BotGroupQywxImpl struct { dataTemp.DataTemp } -func NewBotGroupQywxImpl(db *utils.Db) *BotGroupImpl { - return &BotGroupImpl{ +func NewBotGroupQywxImpl(db *utils.Db) *BotGroupQywxImpl { + return &BotGroupQywxImpl{ DataTemp: *dataTemp.NewDataTemp(db, new(model.AiBotGroupQywx)), } } diff --git a/internal/data/impl/provider_set.go b/internal/data/impl/provider_set.go index 5624b3e..916b017 100644 --- a/internal/data/impl/provider_set.go +++ b/internal/data/impl/provider_set.go @@ -15,4 +15,6 @@ var ProviderImpl = wire.NewSet( NewBotChatHisImpl, NewBotToolsImpl, NewBotGroupImpl, + NewBotGroupConfigImpl, + NewBotGroupQywxImpl, ) diff --git a/internal/data/model/ai_bot_group.gen.go b/internal/data/model/ai_bot_group.gen.go index 3dec7b5..0e190e7 100644 --- a/internal/data/model/ai_bot_group.gen.go +++ b/internal/data/model/ai_bot_group.gen.go @@ -12,15 +12,14 @@ const TableNameAiBotGroup = "ai_bot_group" // AiBotGroup mapped from table type AiBotGroup struct { - GroupID int32 `gorm:"column:group_id;primaryKey;autoIncrement:true" json:"group_id"` - ConversationID string `gorm:"column:conversation_id;not null;comment:会话ID" json:"conversation_id"` // 会话ID - RobotCode string `gorm:"column:robot_code;not null;comment:绑定机器人code" json:"robot_code"` // 绑定机器人code - Title string `gorm:"column:title;not null;comment:群名称" json:"title"` // 群名称 - ToolList string `gorm:"column:tool_list;not null;comment:开通工具列表" json:"tool_list"` // 开通工具列表 - ProductName string `gorm:"column:product_name;not null;comment:针对报表商品筛选快速实现,后期优化" json:"product_name"` // 针对报表商品筛选快速实现,后期优化 - Status int32 `gorm:"column:status;not null;default:1" json:"status"` - DeleteAt time.Time `gorm:"column:delete_at" json:"delete_at"` - CreateAt time.Time `gorm:"column:create_at;default:CURRENT_TIMESTAMP" json:"create_at"` + GroupID int32 `gorm:"column:group_id;primaryKey;autoIncrement:true" json:"group_id"` + ConversationID string `gorm:"column:conversation_id;not null;comment:会话ID" json:"conversation_id"` // 会话ID + RobotCode string `gorm:"column:robot_code;not null;comment:绑定机器人code" json:"robot_code"` // 绑定机器人code + ConfigID int32 `gorm:"column:config_id;not null;comment:关联ai_bot_group_config" json:"config_id"` // 关联ai_bot_group_config + Title string `gorm:"column:title;not null;comment:群名称" json:"title"` // 群名称 + Status int32 `gorm:"column:status;not null;default:1" json:"status"` + DeleteAt *time.Time `gorm:"column:delete_at" json:"delete_at"` + CreateAt time.Time `gorm:"column:create_at;default:CURRENT_TIMESTAMP" json:"create_at"` } // TableName AiBotGroup's table name diff --git a/internal/data/model/ai_bot_group_config.gen.go b/internal/data/model/ai_bot_group_config.gen.go new file mode 100644 index 0000000..f839145 --- /dev/null +++ b/internal/data/model/ai_bot_group_config.gen.go @@ -0,0 +1,19 @@ +// Code generated by gorm.io/gen. DO NOT EDIT. +// Code generated by gorm.io/gen. DO NOT EDIT. +// Code generated by gorm.io/gen. DO NOT EDIT. + +package model + +const TableNameAiBotGroupConfig = "ai_bot_group_config" + +// AiBotGroupConfig mapped from table +type AiBotGroupConfig struct { + ConfigID int32 `gorm:"column:config_id;primaryKey;autoIncrement:true" json:"config_id"` + ToolList string `gorm:"column:tool_list;not null" json:"tool_list"` + ProductName string `gorm:"column:product_name;not null" json:"product_name"` +} + +// TableName AiBotGroupConfig's table name +func (*AiBotGroupConfig) TableName() string { + return TableNameAiBotGroupConfig +} diff --git a/internal/data/model/ai_bot_group_qywx.gen.go b/internal/data/model/ai_bot_group_qywx.gen.go new file mode 100644 index 0000000..7691350 --- /dev/null +++ b/internal/data/model/ai_bot_group_qywx.gen.go @@ -0,0 +1,28 @@ +// Code generated by gorm.io/gen. DO NOT EDIT. +// Code generated by gorm.io/gen. DO NOT EDIT. +// Code generated by gorm.io/gen. DO NOT EDIT. + +package model + +import ( + "time" +) + +const TableNameAiBotGroupQywx = "ai_bot_group_qywx" + +// AiBotGroupQywx mapped from table +type AiBotGroupQywx struct { + GroupID int32 `gorm:"column:group_id;primaryKey;autoIncrement:true" json:"group_id"` + ChatID string `gorm:"column:chat_id;not null;comment:会话ID" json:"chat_id"` // 会话ID + ConfigID int32 `gorm:"column:config_id;not null" json:"config_id"` + AppSecret string `gorm:"column:app_secret;not null;comment:绑定机器人code" json:"app_secret"` // 绑定机器人code + Title string `gorm:"column:title;not null;comment:群名称" json:"title"` // 群名称 + Status int32 `gorm:"column:status;not null;default:1" json:"status"` + DeleteAt *time.Time `gorm:"column:delete_at" json:"delete_at"` + CreateAt time.Time `gorm:"column:create_at;default:CURRENT_TIMESTAMP" json:"create_at"` +} + +// TableName AiBotGroupQywx's table name +func (*AiBotGroupQywx) TableName() string { + return TableNameAiBotGroupQywx +} diff --git a/internal/pkg/func.go b/internal/pkg/func.go index 648996c..47e78a9 100644 --- a/internal/pkg/func.go +++ b/internal/pkg/func.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "math/rand" "net/url" "reflect" "strconv" @@ -422,3 +423,21 @@ func StructToQueryString(input interface{}, options ...URLValuesOptions) (string } return values.Encode(), nil } + +const ( + letterBytes = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" // 62个字符 +) + +// RandomString 生成随机字符串,包含 0-9, a-z, A-Z +// length: 要生成的字符串长度 +func RandomString(length int) string { + // 使用 crypto/rand 替代 math/rand(更安全,适用于密码学场景) + // 但如果不需要高安全性,math/rand 更快 + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + + result := make([]byte, length) + for i := range result { + result[i] = letterBytes[rng.Intn(len(letterBytes))] + } + return string(result) +} diff --git a/internal/server/cron.go b/internal/server/cron.go index 798de27..3b662e1 100644 --- a/internal/server/cron.go +++ b/internal/server/cron.go @@ -40,7 +40,7 @@ func (c *CronServer) InitJobs(ctx context.Context) { c.ctx = ctx c.jobs = []*cronJob{ { - Func: c.cronService.CronReportSend, + Func: c.cronService.CronReportSendDingTalk, Name: "直连天下报表推送", Schedule: "0 12,18,23 * * *", }, @@ -66,13 +66,13 @@ func (c *CronServer) Run(ctx context.Context) { } c.log.Infof("任务[%d]:%s执行结束", jobID, job.Name) }() - c.log.Infof("任务[%d]:%s执ddd", jobID, job.Name) - // 为每次执行创建新的上下文 - //ctx := context.Background() - //err := job.Func(ctx) - //if err != nil { - // c.log.Errorf("任务[%d]:%s执行失败: %s", jobID, job.Name, err.Error()) - //} + //c.log.Infof("任务[%d]:%s执ddd", jobID, job.Name) + //为每次执行创建新的上下文 + ctx := context.Background() + err := job.Func(ctx) + if err != nil { + c.log.Errorf("任务[%d]:%s执行失败: %s", jobID, job.Name, err.Error()) + } }) if err != nil { c.log.Errorf("添加任务失败:%s", err.Error()) diff --git a/internal/server/router/router.go b/internal/server/router/router.go index 091c85f..c461740 100644 --- a/internal/server/router/router.go +++ b/internal/server/router/router.go @@ -66,6 +66,8 @@ func SetupRoutes(app *fiber.App, ChatService *services.ChatService, sessionServi r.Post("/chat/useful", ChatService.Useful) // 回调 r.Post("/callback", callbackService.Callback) + // 回调 + r.Get("/qywx/callback", callbackService.QywxCallback) //广播 r.Get("/broadcast", func(ctx *fiber.Ctx) error { action := ctx.Query("action") diff --git a/internal/services/callback.go b/internal/services/callback.go index b45ef41..70cf967 100644 --- a/internal/services/callback.go +++ b/internal/services/callback.go @@ -1,6 +1,7 @@ package services import ( + "ai_scheduler/internal/biz/handle/qywx/json_callback/wxbizjsonmsgcrypt" "ai_scheduler/internal/config" "ai_scheduler/internal/data/constants" errorcode "ai_scheduler/internal/data/error" @@ -13,10 +14,13 @@ import ( "ai_scheduler/internal/tool_callback" "context" "encoding/json" + "fmt" + "net/url" "strings" "time" "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/log" ) // CallbackService 统一回调入口 @@ -327,3 +331,23 @@ func (s *CallbackService) handleBugOptimizationSubmitDone(ctx context.Context, t return msg, nil } + +func (s *CallbackService) QywxCallback(c *fiber.Ctx) error { + // 读取头 + msgSignature := c.Query("msg_signature") + timestamp := c.Query("timestamp") + nonce := c.Query("nonce") + echostr := c.Query("echostr") + echostr, _ = url.QueryUnescape(echostr) + fmt.Println(msgSignature, timestamp, nonce, echostr) + wxcpt := wxbizjsonmsgcrypt.NewWXBizMsgCrypt(s.cfg.Qywx.Token, s.cfg.Qywx.AES_KEY, s.cfg.Qywx.CorpId, wxbizjsonmsgcrypt.JsonType) + echoStr, cryptErr := wxcpt.VerifyURL(msgSignature, timestamp, nonce, echostr) + if nil != cryptErr { + log.Errorf("%v", cryptErr) + return fmt.Errorf("%v", cryptErr) + } + fmt.Println("verifyUrl success echoStr", string(echoStr)) + err := c.Send(echoStr) + return err + +} diff --git a/internal/services/cron.go b/internal/services/cron.go index 7df3f6e..1cc49e1 100644 --- a/internal/services/cron.go +++ b/internal/services/cron.go @@ -11,27 +11,39 @@ import ( type CronService struct { config *config.Config dingTalkBotBiz *biz.DingTalkBotBiz + qywxAppBiz *biz.QywxAppBiz + groupConfigBiz *biz.GroupConfigBiz } -func NewCronService(config *config.Config, dingTalkBotBiz *biz.DingTalkBotBiz) *CronService { +func NewCronService( + config *config.Config, + dingTalkBotBiz *biz.DingTalkBotBiz, + qywxAppBiz *biz.QywxAppBiz, + groupConfigBiz *biz.GroupConfigBiz, +) *CronService { return &CronService{ config: config, dingTalkBotBiz: dingTalkBotBiz, + qywxAppBiz: qywxAppBiz, + groupConfigBiz: groupConfigBiz, } } -func (d *CronService) CronReportSend(ctx context.Context) error { +func (d *CronService) CronReportSendDingTalk(ctx context.Context) error { groupId := d.config.Dingtalk.BotGroupID["bbxt"] groupInfo, err := d.dingTalkBotBiz.GetGroupInfo(ctx, groupId) if err != nil { return err } - reports, err := d.dingTalkBotBiz.GetReportLists(ctx, &groupInfo) + groupConfig, err := d.groupConfigBiz.GetGroupConfig(ctx, groupInfo.ConfigID) if err != nil { return err } - //contentChan <- "截止今日23点利润亏损合计:127917.0866元,亏损500元以上的分销商和产品金额如下图:" - //contentChan <- "![图片](https://lsxdmgoss.oss-cn-chengdu.aliyuncs.com/MarketingSaaS/image/V2/other/shanghu.png)" + reports, err := d.groupConfigBiz.GetReportLists(ctx, groupConfig) + if err != nil { + return err + } + for _, report := range reports { err = d.dingTalkBotBiz.SendReport(ctx, &groupInfo, report) if err != nil { @@ -41,3 +53,28 @@ func (d *CronService) CronReportSend(ctx context.Context) error { } return nil } + +func (d *CronService) CronReportSendQywx(ctx context.Context) error { + groupId := d.config.Qywx.BotGroupID["bbxt"] + groupInfo, err := d.qywxAppBiz.GetGroupInfo(ctx, groupId) + if err != nil { + return err + } + groupConfig, err := d.groupConfigBiz.GetGroupConfig(ctx, groupInfo.ConfigID) + if err != nil { + return err + } + reports, err := d.groupConfigBiz.GetReportLists(ctx, groupConfig) + if err != nil { + return err + } + + for _, report := range reports { + err = d.qywxAppBiz.SendReport(ctx, &groupInfo, report) + if err != nil { + log.Error(err) + continue + } + } + return nil +} diff --git a/internal/services/dtalk_bot_test.go b/internal/services/dtalk_bot_test.go index d4fbaf1..c5303e0 100644 --- a/internal/services/dtalk_bot_test.go +++ b/internal/services/dtalk_bot_test.go @@ -4,6 +4,7 @@ import ( "ai_scheduler/internal/biz" "ai_scheduler/internal/biz/do" dingtalk2 "ai_scheduler/internal/biz/handle/dingtalk" + "ai_scheduler/internal/biz/handle/qywx" "ai_scheduler/internal/biz/llm_service" "ai_scheduler/internal/biz/tools_regis" "ai_scheduler/internal/config" @@ -12,7 +13,6 @@ import ( "ai_scheduler/internal/domain/component/callback" "ai_scheduler/internal/domain/repo" "ai_scheduler/internal/domain/workflow" - "ai_scheduler/internal/domain/workflow/runtime" "ai_scheduler/internal/pkg" "ai_scheduler/internal/pkg/dingtalk" "ai_scheduler/internal/pkg/lsxd" @@ -30,7 +30,13 @@ import ( func Test_Report(t *testing.T) { run() - a := cronService.CronReportSend(context.Background()) + a := cronService.CronReportSendDingTalk(context.Background()) + t.Log(a) +} + +func Test_Report_QYWX(t *testing.T) { + run() + a := cronService.CronReportSendQywx(context.Background()) t.Log(a) } @@ -107,9 +113,13 @@ func run() { // 初始化钉钉机器人业务逻辑 utils_ossClient, _ := utils_oss.NewClient(configConfig) // 初始化工作流管理器 - workflowManager := runtime.NewRegistry() - dingTalkBotBiz := biz.NewDingTalkBotBiz(doDo, handle, botConfigImpl, botGroupImpl, user, toolRegis, botChatHisImpl, manager, configConfig, sendCardClient, utils_ossClient, workflowManager) + botGroupConfigImpl := impl.NewBotGroupConfigImpl(db) + botGroupQywxImpl := impl.NewBotGroupQywxImpl(db) + qywxAuth := qywx.NewAuth(configConfig, rdb) + group := qywx.NewGroup(botGroupQywxImpl, qywxAuth) + qywxAppBiz := biz.NewQywxAppBiz(configConfig, botGroupQywxImpl, group) + groupConfigBiz := biz.NewGroupConfigBiz(toolRegis, utils_ossClient, botGroupConfigImpl, registry, configConfig) + dingTalkBotBiz := biz.NewDingTalkBotBiz(doDo, handle, botConfigImpl, botGroupImpl, user, botChatHisImpl, manager, configConfig, sendCardClient, groupConfigBiz) // 初始化钉钉机器人服务 - cronService = NewCronService(configConfig, dingTalkBotBiz) - dingBotService = NewDingBotService(configConfig, dingTalkBotBiz) + cronService = NewCronService(configConfig, dingTalkBotBiz, qywxAppBiz, groupConfigBiz) } diff --git a/internal/tools/bbxt/bbxt.go b/internal/tools/bbxt/bbxt.go index 688e446..d9a055b 100644 --- a/internal/tools/bbxt/bbxt.go +++ b/internal/tools/bbxt/bbxt.go @@ -18,7 +18,7 @@ const ( ) var ( - DownWardValue int32 = 1500 + DownWardValue int32 = 1000 SumFilter int32 = -150 ) diff --git a/internal/tools/bbxt/upload.go b/internal/tools/bbxt/upload.go index aa56ba1..0c95e45 100644 --- a/internal/tools/bbxt/upload.go +++ b/internal/tools/bbxt/upload.go @@ -30,6 +30,9 @@ func NewUploader(oss *utils_oss.Client) *Uploader { } func (u *Uploader) Run(report *ReportRes) (err error) { + if report == nil { + return + } if len(report.Path) == 0 { return } diff --git a/tmpl/dataTemp/queryTempl.go b/tmpl/dataTemp/queryTempl.go index 2761318..2c97141 100644 --- a/tmpl/dataTemp/queryTempl.go +++ b/tmpl/dataTemp/queryTempl.go @@ -53,7 +53,7 @@ func NewDataTemp(db *utils.Db, model interface{}) *DataTemp { return &DataTemp{Db: db.Client, Model: model} } -func (k DataTemp) GetById(id int) (data map[string]interface{}, err error) { +func (k DataTemp) GetById(id int32) (data map[string]interface{}, err error) { err = k.Db.Model(k.Model).Where("id = ?", id).Find(&data).Error if data == nil { err = sql.ErrNoRows