Merge branch 'master' into test

This commit is contained in:
fuzhongyun 2026-01-07 09:10:59 +08:00
commit 0721a4e82e
37 changed files with 1778 additions and 629 deletions

View File

@ -6,6 +6,7 @@ package main
import (
"ai_scheduler/internal/biz"
"ai_scheduler/internal/biz/handle/dingtalk"
"ai_scheduler/internal/biz/handle/qywx"
"ai_scheduler/internal/biz/tools_regis"
"ai_scheduler/internal/config"
"ai_scheduler/internal/data/impl"
@ -36,6 +37,7 @@ func InitializeApp(*config.Config, log.AllLogger) (*server.Servers, func(), erro
impl.ProviderImpl,
utils.ProviderUtils,
dingtalk.ProviderSetDingTalk,
qywx.ProviderSetQywx,
tools_regis.ProviderToolsRegis,
// tool_callback.ProviderSetCallBackTools,
component.ProviderSet,

View File

@ -22,12 +22,6 @@ vllm:
coze:
base_url: "https://api.coze.cn"
redis:
host: 47.97.27.195:6379
type: node
pass:
lansexiongdi@666i_secret: "sat_AqvFcdNgesP8megy1ItTscWFXRcsHRzmM4NJ1KNavfcdT0EPwYuCPkDqGhItpx13"
lsxd:
# 统一登录
login_url: "https://api.user.1688sup.com/v1/login/phone"
@ -48,6 +42,16 @@ sys:
maxIdleTime: 30 #每个连接最大空闲时间,如果超过了这个时间会被关闭
tls: 30
db:
redis:
host: 47.97.27.195:6379
type: node
pass: lansexiongdi@666
key: report-api-test
pollSize: 5 #连接池大小不配置或配置为0表示不启用连接池
minIdleConns: 2 #最小空闲连接数
maxIdleTime: 30 #每个连接最大空闲时间,如果超过了这个时间会被关闭
tls: 30
db:
db:
driver: mysql
source: root:SD###sdf323r343@tcp(121.199.38.107:3306)/sys_ai?charset=utf8mb4&parseTime=true&loc=Asia%2FShanghai
@ -150,7 +154,18 @@ dingtalk:
sheet_id_or_name: "数据表"
# 机器人群组
bot_group_id:
bbxt: 28
bbxt: 29
qywx:
corp_id: "ww48151f694fb8ec67"
app_secret: "uYqtdwdtdH4Uv_P4is2AChuGzBCoB6cQDyRvpbW0Vmk"
token: "zJdukry6"
aes_key: "4VLH47qRGUogc2d3QLWuUhvJlk8Y0YuRjXzeBquBq8B"
init_account: "les."
chat_id_len: 16
default_config_id: 1
bot_group_id:
bbxt: 23
default_prompt:
img_recognize:

View File

@ -151,6 +151,22 @@ dingtalk:
bot_group_id:
bbxt: 23
qywx:
corp_id: "ww48151f694fb8ec67"
app_secret: "uYqtdwdtdH4Uv_P4is2AChuGzBCoB6cQDyRvpbW0Vmk"
token: "zJdukry6"
aes_key: "4VLH47qRGUogc2d3QLWuUhvJlk8Y0YuRjXzeBquBq8B"
#
# corp_id: "wx5823bf96d3bd56c7"
# token: "QDG6eK"
# aes_key: "jWmYm7qr5nMoAUwZRjGtBxmz3KA1tkAj3ykkR6q2B2C"
# app_secret: "uYqtdwdtdH4Uv_P4is2AChuGzBCoB6cQDyRvpbW0Vmk"
init_account: "les."
chat_id_len: 16
default_config_id: 1
bot_group_id:
bbxt: 23
default_prompt:
img_recognize:
system_prompt:

View File

@ -3,54 +3,47 @@ package biz
import (
"ai_scheduler/internal/biz/do"
"ai_scheduler/internal/biz/handle/dingtalk"
"ai_scheduler/internal/biz/tools_regis"
"ai_scheduler/internal/biz/handle/qywx"
"ai_scheduler/internal/config"
"ai_scheduler/internal/data/constants"
"ai_scheduler/internal/data/impl"
"ai_scheduler/internal/data/model"
"ai_scheduler/internal/domain/workflow/recharge"
"ai_scheduler/internal/domain/workflow/runtime"
"ai_scheduler/internal/entitys"
"ai_scheduler/internal/pkg/l_request"
"ai_scheduler/internal/pkg/utils_oss"
"ai_scheduler/internal/tools"
"ai_scheduler/internal/tools/bbxt"
"ai_scheduler/tmpl/dataTemp"
"io"
"net/http"
"strconv"
"time"
"unicode"
"ai_scheduler/internal/config"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"unicode"
"gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/chatbot"
"github.com/coze-dev/coze-go"
"github.com/gofiber/fiber/v2/log"
"xorm.io/builder"
)
// AiRouterBiz 智能路由服务
type DingTalkBotBiz struct {
do *do.Do
handle *do.Handle
botConfigImpl *impl.BotConfigImpl
replier *chatbot.ChatbotReplier
log log.Logger
dingTalkUser *dingtalk.User
botTools []model.AiBotTool
botGroupImpl *impl.BotGroupImpl
toolManager *tools.Manager
chatHis *impl.BotChatHisImpl
conf *config.Config
cardSend *dingtalk.SendCardClient
ossClient *utils_oss.Client
workflowManager *runtime.Registry
do *do.Do
handle *do.Handle
botConfigImpl *impl.BotConfigImpl
replier *chatbot.ChatbotReplier
log log.Logger
dingTalkUser *dingtalk.User
botGroupImpl *impl.BotGroupImpl
botGroupConfigImpl *impl.BotGroupConfigImpl
botGroupQywxImpl *impl.BotGroupQywxImpl
toolManager *tools.Manager
chatHis *impl.BotChatHisImpl
conf *config.Config
cardSend *dingtalk.SendCardClient
qywxGroupHandle *qywx.Group
groupConfigBiz *GroupConfigBiz
}
// NewDingTalkBotBiz
@ -60,28 +53,24 @@ func NewDingTalkBotBiz(
botConfigImpl *impl.BotConfigImpl,
botGroupImpl *impl.BotGroupImpl,
dingTalkUser *dingtalk.User,
tools *tools_regis.ToolRegis,
chatHis *impl.BotChatHisImpl,
toolManager *tools.Manager,
conf *config.Config,
cardSend *dingtalk.SendCardClient,
ossClient *utils_oss.Client,
workflowManager *runtime.Registry,
groupConfigBiz *GroupConfigBiz,
) *DingTalkBotBiz {
return &DingTalkBotBiz{
do: do,
handle: handle,
botConfigImpl: botConfigImpl,
replier: chatbot.NewChatbotReplier(),
dingTalkUser: dingTalkUser,
botTools: tools.BootTools,
botGroupImpl: botGroupImpl,
toolManager: toolManager,
chatHis: chatHis,
conf: conf,
cardSend: cardSend,
ossClient: ossClient,
workflowManager: workflowManager,
do: do,
handle: handle,
botConfigImpl: botConfigImpl,
replier: chatbot.NewChatbotReplier(),
dingTalkUser: dingTalkUser,
groupConfigBiz: groupConfigBiz,
botGroupImpl: botGroupImpl,
toolManager: toolManager,
chatHis: chatHis,
conf: conf,
cardSend: cardSend,
}
}
@ -148,8 +137,15 @@ func (d *DingTalkBotBiz) handleSingleChat(ctx context.Context, requireData *enti
func (d *DingTalkBotBiz) handleGroupChat(ctx context.Context, requireData *entitys.RequireDataDingTalkBot) (err error) {
group, err := d.initGroup(ctx, requireData.Req.ConversationId, requireData.Req.ConversationTitle, requireData.Req.RobotCode)
if err != nil {
return
}
groupConfig, err := d.groupConfigBiz.GetGroupConfig(ctx, group.ConfigID)
if err != nil {
return
}
//宏
err, isFinal := d.Macro(ctx, requireData, group)
err, isFinal := d.Macro(ctx, requireData, groupConfig)
if err != nil {
return
}
@ -157,7 +153,7 @@ func (d *DingTalkBotBiz) handleGroupChat(ctx context.Context, requireData *entit
return
}
requireData.ID = group.GroupID
groupTools, err := d.getGroupTools(ctx, group)
groupTools, err := d.groupConfigBiz.getGroupTools(ctx, groupConfig)
if err != nil {
return
}
@ -166,10 +162,10 @@ func (d *DingTalkBotBiz) handleGroupChat(ctx context.Context, requireData *entit
return
}
return d.handleMatch(ctx, rec, group)
return d.groupConfigBiz.handleMatch(ctx, rec, groupConfig)
}
func (d *DingTalkBotBiz) Macro(ctx context.Context, requireData *entitys.RequireDataDingTalkBot, group *model.AiBotGroup) (err error, isFinish bool) {
func (d *DingTalkBotBiz) Macro(ctx context.Context, requireData *entitys.RequireDataDingTalkBot, groupConfig *model.AiBotGroupConfig) (err error, isFinish bool) {
content := processString(requireData.Req.Text.Content)
if strings.Contains(content, "[利润同比报表]商品修改:") {
@ -177,10 +173,10 @@ func (d *DingTalkBotBiz) Macro(ctx context.Context, requireData *entitys.Require
if parts := strings.SplitN(content, "", 2); len(parts) == 2 {
itemInfo := strings.TrimSpace(parts[1])
log.Infof("商品修改信息: %s", itemInfo)
group.ProductName = itemInfo
groupConfig.ProductName = itemInfo
cond := builder.NewCond()
cond = cond.And(builder.Eq{"group_id": group.GroupID})
err = d.botGroupImpl.UpdateByCond(&cond, group)
cond = cond.And(builder.Eq{"config_id": groupConfig.ConfigID})
err = d.botGroupImpl.UpdateByCond(&cond, groupConfig)
if err != nil {
entitys.ResText(requireData.Ch, "", fmt.Sprintf("修改失败:%v", err))
}
@ -192,10 +188,10 @@ func (d *DingTalkBotBiz) Macro(ctx context.Context, requireData *entitys.Require
if strings.Contains(content, "[利润同比报表]商品列表") {
// 提取冒号后的内容
if len(group.ProductName) == 0 {
if len(groupConfig.ProductName) == 0 {
entitys.ResText(requireData.Ch, "", "暂未设置")
} else {
entitys.ResText(requireData.Ch, "", group.ProductName)
entitys.ResText(requireData.Ch, "", groupConfig.ProductName)
isFinish = true
}
return
@ -236,7 +232,6 @@ func (d *DingTalkBotBiz) initGroup(ctx context.Context, conversationId string, c
ConversationID: conversationId,
Title: conversationTitle,
RobotCode: robotCode,
ToolList: "",
}
//如果不存在则创建
_, err = d.botGroupImpl.Add(group)
@ -244,38 +239,6 @@ func (d *DingTalkBotBiz) initGroup(ctx context.Context, conversationId string, c
return
}
func (d *DingTalkBotBiz) getGroupTools(ctx context.Context, group *model.AiBotGroup) (tools []model.AiBotTool, err error) {
if len(d.botTools) == 0 {
return
}
var (
groupRegisTools = make(map[int]struct{})
)
if group.ToolList != "" {
groupToolList := strings.Split(group.ToolList, ",")
for _, tool := range groupToolList {
if tool == "" {
continue
}
num, _err := strconv.Atoi(tool)
if _err != nil {
continue
}
groupRegisTools[num] = struct{}{}
}
}
for _, v := range d.botTools {
if v.PermissionType == constants.PermissionTypeNone {
tools = append(tools, v)
continue
}
if _, ex := groupRegisTools[int(v.ToolID)]; ex {
tools = append(tools, v)
}
}
return
}
func (d *DingTalkBotBiz) recognize(ctx context.Context, requireData *entitys.RequireDataDingTalkBot, tools []model.AiBotTool) (rec *entitys.Recognize, err error) {
userContent, err := d.getUserContent(requireData.Req.Msgtype, requireData.Req.Text.Content)
@ -355,269 +318,6 @@ func (d *DingTalkBotBiz) getUserContent(msgType string, msgContent interface{})
return
}
func (d *DingTalkBotBiz) handleMatch(ctx context.Context, rec *entitys.Recognize, group *model.AiBotGroup) (err error) {
if !rec.Match.IsMatch {
if len(rec.Match.Chat) != 0 {
entitys.ResText(rec.Ch, "", rec.Match.Chat)
} else {
entitys.ResText(rec.Ch, "", rec.Match.Reasoning)
}
return
}
var pointTask *model.AiBotTool
for _, task := range d.botTools {
if task.Index == rec.Match.Index {
pointTask = &task
break
}
}
if pointTask == nil || pointTask.Index == "other" {
return d.otherTask(ctx, rec)
}
switch constants.TaskType(pointTask.Type) {
case constants.TaskTypeFunc:
return d.handleTask(ctx, rec, pointTask)
case constants.TaskTypeReport:
return d.handleReport(ctx, rec, pointTask, group)
case constants.TaskTypeCozeWorkflow:
return d.handleCozeWorkflow(ctx, rec, pointTask)
default:
return d.otherTask(ctx, rec)
}
return
}
func (d *DingTalkBotBiz) handleCozeWorkflow(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool) (err error) {
entitys.ResLoading(rec.Ch, task.Index, "正在执行工作流(coze)\n")
customClient := &http.Client{
Timeout: time.Minute * 30,
}
authCli := coze.NewTokenAuth(d.conf.Coze.ApiSecret)
cozeCli := coze.NewCozeAPI(
authCli,
coze.WithBaseURL(d.conf.Coze.BaseURL),
coze.WithHttpClient(customClient),
)
// 从参数中获取workflowID
type requestParams struct {
Request l_request.Request `json:"request"`
}
var config requestParams
err = json.Unmarshal([]byte(task.Config), &config)
if err != nil {
return err
}
workflowId, ok := config.Request.Json["workflow_id"].(string)
if !ok {
return fmt.Errorf("workflow_id不能为空")
}
// 提取参数
var data map[string]interface{}
err = json.Unmarshal([]byte(rec.Match.Parameters), &data)
req := &coze.RunWorkflowsReq{
WorkflowID: workflowId,
Parameters: data,
// IsAsync: true,
}
stream := config.Request.Json["stream"].(bool)
entitys.ResLog(rec.Ch, task.Index, "工作流执行中...")
if stream {
streamResp, err := cozeCli.Workflows.Runs.Stream(ctx, req)
if err != nil {
return err
}
handleCozeWorkflowEvents(ctx, streamResp, cozeCli, workflowId, rec.Ch, task.Index)
} else {
resp, err := cozeCli.Workflows.Runs.Create(ctx, req)
if err != nil {
return err
}
entitys.ResJson(rec.Ch, task.Index, resp.Data)
}
return
}
// handleCozeWorkflowEvents 处理 coze 工作流事件
func handleCozeWorkflowEvents(ctx context.Context, resp coze.Stream[coze.WorkflowEvent], cozeCli coze.CozeAPI, workflowID string, ch chan entitys.Response, index string) {
defer resp.Close()
for {
event, err := resp.Recv()
if errors.Is(err, io.EOF) {
fmt.Println("Stream finished")
break
}
if err != nil {
fmt.Println("Error receiving event:", err)
break
}
switch event.Event {
case coze.WorkflowEventTypeMessage:
entitys.ResStream(ch, index, event.Message.Content)
case coze.WorkflowEventTypeError:
entitys.ResError(ch, index, fmt.Sprintf("工作流执行错误: %s", event.Error))
case coze.WorkflowEventTypeDone:
entitys.ResEnd(ch, index, "工作流执行完成")
case coze.WorkflowEventTypeInterrupt:
resumeReq := &coze.ResumeRunWorkflowsReq{
WorkflowID: workflowID,
EventID: event.Interrupt.InterruptData.EventID,
ResumeData: "your data",
InterruptType: event.Interrupt.InterruptData.Type,
}
newResp, err := cozeCli.Workflows.Runs.Resume(ctx, resumeReq)
if err != nil {
entitys.ResError(ch, index, fmt.Sprintf("工作流恢复执行错误: %s", err.Error()))
return
}
entitys.ResLog(ch, index, "工作流恢复执行中...")
handleCozeWorkflowEvents(ctx, newResp, cozeCli, workflowID, ch, index)
}
}
fmt.Printf("done, log:%s\n", resp.Response().LogID())
}
func (d *DingTalkBotBiz) handleReport(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool, group *model.AiBotGroup) error {
var configData entitys.ConfigDataReport
err := json.Unmarshal([]byte(rec.Match.Parameters), &configData)
if err != nil {
return err
}
t, err := time.Parse(time.DateTime, configData.Time)
if err != nil {
t, err = time.Parse("2006-01-02 15:04", configData.Time)
if err != nil {
t, err = time.Parse("2006-01-02", configData.Time)
if err != nil {
log.Infof("时间识别失败:%s", configData.Time)
entitys.ResText(rec.Ch, "", "时间识别失败了可以给我一份比较具体的时间吗例如“2025-12-31 12:00,抱歉抱歉😀")
}
}
}
rep, err := bbxt.NewBbxtTools()
uploader := bbxt.NewUploader(d.ossClient)
if err != nil {
return err
}
var reports []*bbxt.ReportRes
switch rec.Match.Index {
case "report_loss_analysis":
repo, _err := rep.StatisOursProductLossSum(t)
if _err != nil {
return _err
}
reports = append(reports, repo...)
case "report_sales_analysis":
product := strings.Split(group.ProductName, ",")
repo, _err := rep.GetStatisOfficialProductSum(t, product)
if _err != nil {
return _err
}
reports = append(reports, repo)
case "report_ranking_of_distributors":
repo, _err := rep.GetProfitRankingSum(t)
if _err != nil {
return _err
}
reports = append(reports, repo)
case "report_daily":
product := strings.Split(group.ProductName, ",")
repo, _err := rep.DailyReport(t, bbxt.DownWardValue, product, bbxt.SumFilter, nil)
if _err != nil {
return _err
}
reports = append(reports, repo...)
case "report_daily_recharge":
product := strings.Split(group.ProductName, ",")
repo, _err := d.rechargeDailyReport(ctx, t, product, nil)
if _err != nil || len(repo) == 0 {
return _err
}
reports = append(reports, repo...)
case "report_sale_down_analysis":
product := strings.Split(group.ProductName, ",")
repo, _err := rep.GetStatisOfficialProductSumDecline(t, bbxt.DownWardValue, product, bbxt.SumFilter)
if _err != nil {
return _err
}
reports = append(reports, repo)
default:
return fmt.Errorf("未找到的报表:%s", rec.Match.Index)
}
for _, report := range reports {
err = uploader.Run(report)
if err != nil {
log.Error(err)
continue
}
entitys.ResText(rec.Ch, "", fmt.Sprintf("%s![图片](%s)", report.Title, report.Url))
//rec.Ch <- report.Title
//reportChan <- fmt.Sprintf("![图片](%s)", report.Url)
//err = d.SendReport(ctx, group, report)
//if err != nil {
// log.Error(err)
// continue
//}
}
return nil
}
func (d *DingTalkBotBiz) handleTask(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool) (err error) {
var configData entitys.ConfigDataTool
err = json.Unmarshal([]byte(task.Config), &configData)
if err != nil {
return
}
err = d.toolManager.ExecuteTool(ctx, configData.Tool, rec)
if err != nil {
return
}
return
}
func (d *DingTalkBotBiz) otherTask(ctx context.Context, rec *entitys.Recognize) (err error) {
entitys.ResText(rec.Ch, "", rec.Match.Reasoning)
return
}
//func (d *DingTalkBotBiz) HandleRes(ctx context.Context, data *chatbot.BotCallbackDataModel, resp entitys.Response, ch chan string) error {
// switch resp.Type {
// case entitys.ResponseText:
// return d.replyText(ctx, data.SessionWebhook, resp.Content)
// case entitys.ResponseStream:
//
// return d.replySteam(ctx, data, ch)
// case entitys.ResponseImg:
// return d.replyImg(ctx, data.SessionWebhook, resp.Content)
// case entitys.ResponseFile:
// return d.replyFile(ctx, data.SessionWebhook, resp.Content)
// case entitys.ResponseMarkdown:
// return d.replyMarkdown(ctx, data.SessionWebhook, resp.Content)
// case entitys.ResponseActionCard:
// return d.replyActionCard(ctx, data.SessionWebhook, resp.Content)
// default:
// return nil
// }
//}
func (d *DingTalkBotBiz) HandleStreamRes(ctx context.Context, data *chatbot.BotCallbackDataModel, content chan string) (err error) {
err = d.cardSend.NewCard(ctx, &dingtalk.CardSend{
RobotCode: data.RobotCode,
@ -631,70 +331,6 @@ func (d *DingTalkBotBiz) HandleStreamRes(ctx context.Context, data *chatbot.BotC
return
}
func (d *DingTalkBotBiz) GetReportLists(ctx context.Context, group *model.AiBotGroup) (reports []*bbxt.ReportRes, err error) {
var product []string
if group.ProductName != "" {
product = strings.Split(group.ProductName, ",")
}
reportList, err := bbxt.NewBbxtTools()
if err != nil {
return
}
reports, err = reportList.DailyReport(time.Now(), bbxt.DownWardValue, product, bbxt.SumFilter, d.ossClient)
if err != nil {
return
}
//product = []string{"优酷周卡", "优酷季卡", "优酷年卡", "爱奇艺黄金会员天卡"}
//追加电商充值系统统计 - 返回统一使用[]*bbxt.ReportRes
rechargeReports, err := d.rechargeDailyReport(ctx, time.Now(), nil, d.ossClient)
if err != nil || len(rechargeReports) == 0 {
return
}
reports = append(reports, rechargeReports...)
return
}
// rechargeDailyReport 获取电商充值系统统计报告
func (d *DingTalkBotBiz) rechargeDailyReport(ctx context.Context, now time.Time, productNames []string, ossClient *utils_oss.Client) (reports []*bbxt.ReportRes, err error) {
defer func() {
if err := recover(); err != nil {
log.Error(err)
}
}()
workflowId := recharge.WorkflowIDStatisticsOursProduct
args := &runtime.WorkflowArgs{
Args: map[string]any{
"product_names": productNames,
"now": now,
},
}
res, err := d.workflowManager.Invoke(ctx, workflowId, args)
if err != nil {
return
}
log.Infof("imgUrl: %s", res["url"].(string))
reports = []*bbxt.ReportRes{
{
ReportName: "我们的商品统计(电商充值系统)",
Title: fmt.Sprintf("%s 电商充值系统我们的商品统计", now.Format("2006-01-02")),
Path: res["path"].(string),
Url: res["url"].(string),
Data: res["data"].([][]string),
Desc: res["desc"].(string),
},
}
return
}
func (d *DingTalkBotBiz) SendReport(ctx context.Context, groupInfo *model.AiBotGroup, report *bbxt.ReportRes) (err error) {
reportChan := make(chan string, 10)

View File

@ -0,0 +1,406 @@
package biz
import (
"ai_scheduler/internal/biz/tools_regis"
"ai_scheduler/internal/config"
"ai_scheduler/internal/data/constants"
"ai_scheduler/internal/data/impl"
"ai_scheduler/internal/data/model"
"ai_scheduler/internal/domain/workflow/recharge"
"ai_scheduler/internal/domain/workflow/runtime"
"ai_scheduler/internal/entitys"
"ai_scheduler/internal/pkg/l_request"
"ai_scheduler/internal/pkg/utils_oss"
"ai_scheduler/internal/tools"
"ai_scheduler/internal/tools/bbxt"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/coze-dev/coze-go"
"github.com/gofiber/fiber/v2/log"
"xorm.io/builder"
)
// AiRouterBiz 智能路由服务
type GroupConfigBiz struct {
botGroupConfigImpl *impl.BotGroupConfigImpl
ossClient *utils_oss.Client
workflowManager *runtime.Registry
botTools []model.AiBotTool
toolManager *tools.Manager
conf *config.Config
}
// NewDingTalkBotBiz
func NewGroupConfigBiz(
tools *tools_regis.ToolRegis,
ossClient *utils_oss.Client,
botGroupConfigImpl *impl.BotGroupConfigImpl,
workflowManager *runtime.Registry,
conf *config.Config,
) *GroupConfigBiz {
return &GroupConfigBiz{
botTools: tools.BootTools,
ossClient: ossClient,
botGroupConfigImpl: botGroupConfigImpl,
workflowManager: workflowManager,
conf: conf,
}
}
func (g *GroupConfigBiz) GetGroupConfig(ctx context.Context, configId int32) (*model.AiBotGroupConfig, error) {
var groupConfig model.AiBotGroupConfig
cond := builder.NewCond()
cond = cond.And(builder.Eq{"config_id": configId})
err := g.botGroupConfigImpl.GetOneBySearchToStrut(&cond, &groupConfig)
return &groupConfig, err
}
func (g *GroupConfigBiz) GetReportLists(ctx context.Context, groupConfig *model.AiBotGroupConfig) (reports []*bbxt.ReportRes, err error) {
if groupConfig == nil {
return
}
var product []string
if groupConfig.ProductName != "" {
product = strings.Split(groupConfig.ProductName, ",")
}
reportList, err := bbxt.NewBbxtTools()
if err != nil {
return
}
reports, err = reportList.DailyReport(time.Now(), bbxt.DownWardValue, product, bbxt.SumFilter, g.ossClient)
if err != nil {
return
}
//product = []string{"优酷周卡", "优酷季卡", "优酷年卡", "爱奇艺黄金会员天卡"}
//追加电商充值系统统计 - 返回统一使用[]*bbxt.ReportRes
rechargeReports, err := g.rechargeDailyReport(ctx, time.Now(), nil, g.ossClient)
if err != nil || len(rechargeReports) == 0 {
return
}
reports = append(reports, rechargeReports...)
return
}
// rechargeDailyReport 获取电商充值系统统计报告
func (g *GroupConfigBiz) rechargeDailyReport(ctx context.Context, now time.Time, productNames []string, ossClient *utils_oss.Client) (reports []*bbxt.ReportRes, err error) {
defer func() {
if err := recover(); err != nil {
log.Error(err)
}
}()
workflowId := recharge.WorkflowIDStatisticsOursProduct
args := &runtime.WorkflowArgs{
Args: map[string]any{
"product_names": productNames,
"now": now,
},
}
res, err := g.workflowManager.Invoke(ctx, workflowId, args)
if err != nil {
return
}
log.Infof("imgUrl: %s", res["url"].(string))
reports = []*bbxt.ReportRes{
{
ReportName: "我们的商品统计(电商充值系统)",
Title: res["title"].(string),
Path: res["path"].(string),
Url: res["url"].(string),
Data: res["data"].([][]string),
},
}
return
}
func (g *GroupConfigBiz) handleReport(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool, groupConfig *model.AiBotGroupConfig) error {
var configData entitys.ConfigDataReport
err := json.Unmarshal([]byte(rec.Match.Parameters), &configData)
if err != nil {
return err
}
t, err := time.Parse(time.DateTime, configData.Time)
if err != nil {
t, err = time.Parse("2006-01-02 15:04", configData.Time)
if err != nil {
t, err = time.Parse("2006-01-02", configData.Time)
if err != nil {
log.Infof("时间识别失败:%s", configData.Time)
entitys.ResText(rec.Ch, "", "时间识别失败了可以给我一份比较具体的时间吗例如“2025-12-31 12:00,抱歉抱歉😀")
}
}
}
rep, err := bbxt.NewBbxtTools()
uploader := bbxt.NewUploader(g.ossClient)
if err != nil {
return err
}
var reports []*bbxt.ReportRes
switch rec.Match.Index {
case "report_loss_analysis":
repo, _err := rep.StatisOursProductLossSum(t)
if _err != nil {
return _err
}
reports = append(reports, repo...)
case "report_sales_analysis":
product := strings.Split(groupConfig.ProductName, ",")
repo, _err := rep.GetStatisOfficialProductSum(t, product)
if _err != nil {
return _err
}
reports = append(reports, repo)
case "report_ranking_of_distributors":
repo, _err := rep.GetProfitRankingSum(t)
if _err != nil {
return _err
}
reports = append(reports, repo)
case "report_daily":
product := strings.Split(groupConfig.ProductName, ",")
repo, _err := rep.DailyReport(t, bbxt.DownWardValue, product, bbxt.SumFilter, nil)
if _err != nil {
return _err
}
reports = append(reports, repo...)
rechargeReport, _err := g.rechargeDailyReport(ctx, t, product, nil)
if _err != nil || len(repo) == 0 {
return _err
}
reports = append(reports, rechargeReport...)
case "report_daily_recharge":
product := strings.Split(groupConfig.ProductName, ",")
repo, _err := g.rechargeDailyReport(ctx, t, product, nil)
if _err != nil || len(repo) == 0 {
return _err
}
reports = append(reports, repo...)
case "report_sale_down_analysis":
product := strings.Split(groupConfig.ProductName, ",")
repo, _err := rep.GetStatisOfficialProductSumDecline(t, bbxt.DownWardValue, product, bbxt.SumFilter)
if _err != nil {
return _err
}
reports = append(reports, repo)
default:
return fmt.Errorf("未找到的报表:%s", rec.Match.Index)
}
for _, report := range reports {
if report == nil {
continue
}
err = uploader.Run(report)
if err != nil {
log.Error(err)
continue
}
entitys.ResText(rec.Ch, "", fmt.Sprintf("%s![图片](%s)", report.Title, report.Url))
}
return nil
}
func (g *GroupConfigBiz) handleMatch(ctx context.Context, rec *entitys.Recognize, groupConfig *model.AiBotGroupConfig) (err error) {
if !rec.Match.IsMatch {
if len(rec.Match.Chat) != 0 {
entitys.ResText(rec.Ch, "", rec.Match.Chat)
} else {
entitys.ResText(rec.Ch, "", rec.Match.Reasoning)
}
return
}
var pointTask *model.AiBotTool
for _, task := range g.botTools {
if task.Index == rec.Match.Index {
pointTask = &task
break
}
}
if pointTask == nil || pointTask.Index == "other" {
return g.otherTask(ctx, rec)
}
switch constants.TaskType(pointTask.Type) {
case constants.TaskTypeFunc:
return g.handleTask(ctx, rec, pointTask)
case constants.TaskTypeReport:
return g.handleReport(ctx, rec, pointTask, groupConfig)
case constants.TaskTypeCozeWorkflow:
return g.handleCozeWorkflow(ctx, rec, pointTask)
default:
return g.otherTask(ctx, rec)
}
return
}
func (g *GroupConfigBiz) getGroupTools(ctx context.Context, groupConfig *model.AiBotGroupConfig) (tools []model.AiBotTool, err error) {
if len(g.botTools) == 0 {
return
}
var (
groupRegisTools = make(map[int]struct{})
)
if groupConfig.ToolList != "" {
groupToolList := strings.Split(groupConfig.ToolList, ",")
for _, tool := range groupToolList {
if tool == "" {
continue
}
num, _err := strconv.Atoi(tool)
if _err != nil {
continue
}
groupRegisTools[num] = struct{}{}
}
}
for _, v := range g.botTools {
if v.PermissionType == constants.PermissionTypeNone {
tools = append(tools, v)
continue
}
if _, ex := groupRegisTools[int(v.ToolID)]; ex {
tools = append(tools, v)
}
}
return
}
func (q *GroupConfigBiz) handleTask(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool) (err error) {
var configData entitys.ConfigDataTool
err = json.Unmarshal([]byte(task.Config), &configData)
if err != nil {
return
}
err = q.toolManager.ExecuteTool(ctx, configData.Tool, rec)
if err != nil {
return
}
return
}
func (g *GroupConfigBiz) handleCozeWorkflow(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool) (err error) {
entitys.ResLoading(rec.Ch, task.Index, "正在执行工作流(coze)\n")
customClient := &http.Client{
Timeout: time.Minute * 30,
}
authCli := coze.NewTokenAuth(g.conf.Coze.ApiSecret)
cozeCli := coze.NewCozeAPI(
authCli,
coze.WithBaseURL(g.conf.Coze.BaseURL),
coze.WithHttpClient(customClient),
)
// 从参数中获取workflowID
type requestParams struct {
Request l_request.Request `json:"request"`
}
var config requestParams
err = json.Unmarshal([]byte(task.Config), &config)
if err != nil {
return err
}
workflowId, ok := config.Request.Json["workflow_id"].(string)
if !ok {
return fmt.Errorf("workflow_id不能为空")
}
// 提取参数
var data map[string]interface{}
err = json.Unmarshal([]byte(rec.Match.Parameters), &data)
req := &coze.RunWorkflowsReq{
WorkflowID: workflowId,
Parameters: data,
// IsAsync: true,
}
stream := config.Request.Json["stream"].(bool)
entitys.ResLog(rec.Ch, task.Index, "工作流执行中...")
if stream {
streamResp, err := cozeCli.Workflows.Runs.Stream(ctx, req)
if err != nil {
return err
}
g.handleCozeWorkflowEvents(ctx, streamResp, cozeCli, workflowId, rec.Ch, task.Index)
} else {
resp, err := cozeCli.Workflows.Runs.Create(ctx, req)
if err != nil {
return err
}
entitys.ResJson(rec.Ch, task.Index, resp.Data)
}
return
}
// handleCozeWorkflowEvents 处理 coze 工作流事件
func (g *GroupConfigBiz) handleCozeWorkflowEvents(ctx context.Context, resp coze.Stream[coze.WorkflowEvent], cozeCli coze.CozeAPI, workflowID string, ch chan entitys.Response, index string) {
defer resp.Close()
for {
event, err := resp.Recv()
if errors.Is(err, io.EOF) {
fmt.Println("Stream finished")
break
}
if err != nil {
fmt.Println("Error receiving event:", err)
break
}
switch event.Event {
case coze.WorkflowEventTypeMessage:
entitys.ResStream(ch, index, event.Message.Content)
case coze.WorkflowEventTypeError:
entitys.ResError(ch, index, fmt.Sprintf("工作流执行错误: %v", event.Error))
case coze.WorkflowEventTypeDone:
entitys.ResEnd(ch, index, "工作流执行完成")
case coze.WorkflowEventTypeInterrupt:
resumeReq := &coze.ResumeRunWorkflowsReq{
WorkflowID: workflowID,
EventID: event.Interrupt.InterruptData.EventID,
ResumeData: "your data",
InterruptType: event.Interrupt.InterruptData.Type,
}
newResp, err := cozeCli.Workflows.Runs.Resume(ctx, resumeReq)
if err != nil {
entitys.ResError(ch, index, fmt.Sprintf("工作流恢复执行错误: %s", err.Error()))
return
}
entitys.ResLog(ch, index, "工作流恢复执行中...")
g.handleCozeWorkflowEvents(ctx, newResp, cozeCli, workflowID, ch, index)
}
}
fmt.Printf("done, log:%s\n", resp.Response().LogID())
}
func (g *GroupConfigBiz) otherTask(ctx context.Context, rec *entitys.Recognize) (err error) {
entitys.ResText(rec.Ch, "", rec.Match.Reasoning)
return
}

View File

@ -0,0 +1,82 @@
package qywx
import (
"ai_scheduler/internal/config"
"ai_scheduler/internal/data/constants"
"ai_scheduler/internal/pkg/l_request"
"ai_scheduler/utils"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"time"
"github.com/redis/go-redis/v9"
)
type Auth struct {
redis *redis.Client
cfg *config.Config
}
func NewAuth(cfg *config.Config, redis *utils.Rdb) *Auth {
return &Auth{
redis: redis.Rdb,
cfg: cfg,
}
}
func (a *Auth) GetAccessToken(ctx context.Context, corpid string, corpsecret string) (authInfo *AuthInfo, err error) {
if corpid == "" {
return nil, errors.New("corpid is empty")
}
accessToken := a.redis.Get(ctx, a.getKey(corpsecret)).Val()
var expire time.Duration
if accessToken == "" {
authRes, _err := a.getNewAccessToken(ctx, corpid, corpsecret)
if _err != nil {
return nil, _err
}
expire = time.Duration(authRes.ExpiresIn-60) * time.Second
err = a.redis.SetEx(ctx, a.getKey(corpsecret), authRes.AccessToken, expire).Err()
if err != nil {
return
}
accessToken = authRes.AccessToken
} else {
expire, _ = a.redis.TTL(ctx, a.getKey(corpsecret)).Result()
}
return &AuthInfo{
Corpid: corpid,
Corpsecret: corpsecret,
AccessToken: accessToken,
Expire: expire,
}, nil
}
func (a *Auth) getKey(corpsecret string) string {
return a.cfg.Redis.Key + ":" + constants.QywxAuthBaseKeyPrefix + ":" + corpsecret
}
func (a *Auth) getNewAccessToken(ctx context.Context, corpid string, corpsecret string) (auth AuthRes, err error) {
if corpid == "" || corpsecret == "" {
err = errors.New("corpid or corpsecret is empty")
return
}
req := l_request.Request{
Method: http.MethodGet,
Url: "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=" + corpid + "&corpsecret=" + corpsecret,
}
res, err := req.Send()
if err != nil {
return
}
err = json.Unmarshal(res.Content, &auth)
if auth.Errcode != 0 {
err = fmt.Errorf("请求失败:%s", auth.Errmsg)
return
}
return
}

View File

@ -0,0 +1,109 @@
package qywx
import (
"ai_scheduler/internal/data/impl"
"ai_scheduler/internal/pkg/l_request"
"ai_scheduler/internal/pkg/util"
"context"
"encoding/json"
"fmt"
"net/http"
)
type Group struct {
groupImpl *impl.BotGroupQywxImpl
auth *Auth
}
func NewGroup(groupImpl *impl.BotGroupQywxImpl, auth *Auth) *Group {
return &Group{
groupImpl: groupImpl,
auth: auth,
}
}
// Create 方法用于创建群聊
// 参数:
// - ctx: context.Context上下文用于控制请求的超时和取消
// - req: GroupCreateReq创建群聊的请求参数结构体
// - corpid: string企业的CorpID
// - corpsecret: string应用的Secret
//
// 返回值:
// - GroupCreateResp: 创建群聊的响应结果
// - error: 错误信息,如果请求失败则返回错误
func (g *Group) Create(ctx context.Context, req GroupCreateReq, corpid string, corpsecret string) (GroupCreateResp, error) {
// 声明一个GroupCreateResp结构体变量res用于存储响应结果
var res GroupCreateResp
// 将请求结构体req转换为map类型的参数param
// 如果转换失败,忽略错误
param, _ := util.StructToMap(req)
// 发送HTTP请求到企业微信API创建群聊
// 参数依次为上下文、请求参数、请求URL、响应结果存储指针、企业ID、应用密钥
_, err := g.request(ctx, param, "https://qyapi.weixin.qq.com/cgi-bin/appchat/create", &res, corpid, corpsecret)
// 如果请求过程中发生错误,返回响应结果和错误
if err != nil {
return res, err
}
// 请求成功返回响应结果和nil错误
return res, nil
}
// SendMarkDown 方法用于发送Markdown格式的消息到群聊
// 参数:
// - ctx: 上下文信息,用于控制请求的超时和取消
// - req: 群聊发送Markdown消息的请求参数结构体
// - corpid: 企业微信corp ID
// - corpsecret: 企业微信应用的secret
//
// 返回值:
// - error: 操作过程中发生的错误如果成功则为nil
func (g *Group) SendMarkDown(ctx context.Context, req GroupSendMarkDownReq, corpid string, corpsecret string) error {
// 设置消息类型为Markdown
req.Msgtype = "markdown"
// 将请求结构体转换为map类型便于后续的HTTP请求参数处理
param, _ := util.StructToMap(req)
// 调用request方法发送HTTP请求到企业微信API
// 参数依次为:上下文、请求参数、API URL、额外请求头、corpid、corpsecret
_, err := g.request(ctx, param, " https://qyapi.weixin.qq.com/cgi-bin/appchat/send", nil, corpid, corpsecret)
// 如果请求过程中发生错误,直接返回错误
if err != nil {
return err
}
// 请求成功返回nil
return nil
}
func (g *Group) request(ctx context.Context, param map[string]interface{}, url string, resData interface{}, corpid string, corpsecret string) ([]byte, error) {
auth, err := g.auth.GetAccessToken(ctx, corpid, corpsecret)
if err != nil {
return nil, err
}
req := l_request.Request{
Method: http.MethodPost,
Url: url + "?access_token=" + auth.AccessToken,
Json: param,
}
res, err := req.Send()
if err != nil {
return nil, err
}
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("request failed, status code: %d,reason: %s", res.StatusCode, res.Reason)
}
var code commonResp
if err = json.Unmarshal(res.Content, &code); err != nil {
return nil, fmt.Errorf("返回结构异常:%s", string(res.Content))
}
if code.Errcode != 0 {
return nil, fmt.Errorf("返回状态异常:%s", string(code.Errmsg))
}
if resData != nil {
if err = json.Unmarshal(res.Content, resData); err != nil {
return nil, fmt.Errorf("返回数据异常:%s", string(res.Content))
}
}
return res.Content, nil
}

View File

@ -0,0 +1,12 @@
# weworkapi_cplusplus
official lib of wework api https://work.weixin.qq.com/api/doc
# 注意事项
* 1.回调sdk json版本
* 2.wxbizjsonmsgcrypt.go文件中声明并实现了WXBizJsonMsgCrypt类提供用户接入企业微信的三个接口。sample.go文件提供了如何使用这三个接口的示例。
* 3.WXBizJsonMsgCrypt类封装了VerifyURL, DecryptMsg, EncryptMsg三个接口分别用于开发者验证回调url收到用户回复消息的解密以及开发者回复消息的加密过程。使用方法可以参考sample.go文件。
* 4.加解密协议请参考企业微信官方文档。

View File

@ -0,0 +1,126 @@
package json_callback
import (
"ai_scheduler/internal/biz/handle/qywx/json_callback/wxbizjsonmsgcrypt"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"strings"
)
const token = "gY1AGR3mjBhzy"
const receiverId = "wwabfd0cec7171e769"
const encodingAeskey = "g8VGfQEqluUhoKOlyjmmll8Q9C5tVFUTX5T2qkmI9Sv"
func getString(str, endstr string, start int, msg *string) int {
end := strings.Index(str, endstr)
*msg = str[start:end]
return end + len(endstr)
}
func VerifyURL(w http.ResponseWriter, r *http.Request) {
//httpstr := `&{GET /?msg_signature=825075c093249d5a60967fe4a613cae93146636b&timestamp=1597998748&nonce=1597483820&echostr=neLB8CftccHiz19tluVb%2BUBnUVMT3xpUMZU8qvDdD17eH8XfEsbPYC%2FkJyPsZOOc6GdsCeu8jSIa2noSJ%2Fez2w%3D%3D HTTP/1.1 1 1 map[Cache-Control:[no-cache] Accept:[*/*] Pragma:[no-cache] User-Agent:[Mozilla/4.0]] 0x86c180 0 [] false 100.108.211.112:8893 map[] map[] <nil> map[] 100.108.79.233:59663 /?msg_signature=825075c093249d5a60967fe4a613cae93146636b&timestamp=1597998748&nonce=1597483820&echostr=neLB8CftccHiz19tluVb%2BUBnUVMT3xpUMZU8qvDdD17eH8XfEsbPYC%2FkJyPsZOOc6GdsCeu8jSIa2noSJ%2Fez2w%3D%3D <nil>}`
fmt.Println(r, r.Body)
httpstr := r.URL.RawQuery
start := strings.Index(httpstr, "msg_signature=")
start += len("msg_signature=")
var msg_signature string
next := getString(httpstr, "&timestamp=", start, &msg_signature)
var timestamp string
next = getString(httpstr, "&nonce=", next, &timestamp)
var nonce string
next = getString(httpstr, "&echostr=", next, &nonce)
echostr := httpstr[next:len(httpstr)]
echostr, _ = url.QueryUnescape(echostr)
fmt.Println(msg_signature, timestamp, nonce, echostr, next)
wxcpt := wxbizjsonmsgcrypt.NewWXBizMsgCrypt(token, encodingAeskey, receiverId, wxbizjsonmsgcrypt.JsonType)
echoStr, cryptErr := wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr)
if nil != cryptErr {
fmt.Println("verifyUrl fail", cryptErr)
}
fmt.Println("verifyUrl success echoStr", string(echoStr))
fmt.Fprintf(w, string(echoStr))
}
type MsgContent struct {
ToUsername string `json:"ToUserName"`
FromUsername string `json:"FromUserName"`
CreateTime uint32 `json:"CreateTime"`
MsgType string `json:"MsgType"`
Content string `json:"Content"`
Msgid uint64 `json:"MsgId"`
Agentid uint32 `json:"AgentId"`
}
func MsgHandler(w http.ResponseWriter, r *http.Request) {
httpstr := r.URL.RawQuery
start := strings.Index(httpstr, "msg_signature=")
start += len("msg_signature=")
var msg_signature string
next := getString(httpstr, "&timestamp=", start, &msg_signature)
var timestamp string
next = getString(httpstr, "&nonce=", next, &timestamp)
nonce := httpstr[next:len(httpstr)]
fmt.Println(msg_signature, timestamp, nonce)
body, err := ioutil.ReadAll(r.Body)
fmt.Println(string(body), err)
wxcpt := wxbizjsonmsgcrypt.NewWXBizMsgCrypt(token, encodingAeskey, receiverId, wxbizjsonmsgcrypt.JsonType)
msg, err_ := wxcpt.DecryptMsg(msg_signature, timestamp, nonce, body)
fmt.Println(string(msg), err_)
var msgContent MsgContent
err = json.Unmarshal(msg, &msgContent)
if nil != err {
fmt.Println("Unmarshal fail", err)
} else {
fmt.Println("struct", msgContent)
}
fmt.Println(msgContent, err)
ToUsername := msgContent.ToUsername
msgContent.ToUsername = msgContent.FromUsername
msgContent.FromUsername = ToUsername
fmt.Println("replaymsg", msgContent)
replayJson, err := json.Marshal(&msgContent)
encryptMsg, cryptErr := wxcpt.EncryptMsg(string(replayJson), "1409659589", "1409659589")
if nil != cryptErr {
fmt.Println("DecryptMsg fail", cryptErr)
}
sEncryptMsg := string(encryptMsg)
fmt.Println("after encrypt sEncryptMsg: ", sEncryptMsg)
fmt.Fprintf(w, sEncryptMsg)
}
func CallbackHandler(w http.ResponseWriter, r *http.Request) {
httpstr := r.URL.RawQuery
echo := strings.Index(httpstr, "echostr")
if echo != -1 {
VerifyURL(w, r)
} else {
MsgHandler(w, r)
}
fmt.Println("finished CallbackHandler", httpstr)
}
func main() {
http.HandleFunc("/", CallbackHandler) // 设置访问路由
log.Fatal(http.ListenAndServe(":8893", nil))
}

View File

@ -0,0 +1,140 @@
package json_callback
//
//import (
// "ai_scheduler/internal/biz/handle/qywx/json_callback/wxbizjsonmsgcrypt"
// "encoding/json"
// "fmt"
//)
//
//type MsgContent struct {
// ToUsername string `json:"ToUserName"`
// FromUsername string `json:"FromUserName"`
// CreateTime uint32 `json:"CreateTime"`
// MsgType string `json:"MsgType"`
// Content string `json:"Content"`
// Msgid uint64 `json:"MsgId"`
// Agentid uint32 `json:"AgentId"`
//}
//
//func main() {
// token := "QDG6eK"
// receiverId := "wx5823bf96d3bd56c7"
// encodingAeskey := "jWmYm7qr5nMoAUwZRjGtBxmz3KA1tkAj3ykkR6q2B2C"
// wxcpt := wxbizjsonmsgcrypt.NewWXBizMsgCrypt(token, encodingAeskey, receiverId, wxbizjsonmsgcrypt.JsonType)
// /*
// ------------使用示例一验证回调URL---------------
// *企业开启回调模式时企业微信会向验证url发送一个get请求
// 假设点击验证时,企业收到类似请求:
// * GET /cgi-bin/wxpush?msg_signature=5c45ff5e21c57e6ad56bac8758b79b1d9ac89fd3&timestamp=1409659589&nonce=263014780&echostr=P9nAzCzyDtyTWESHep1vC5X9xho%2FqYX3Zpb4yKa9SKld1DsH3Iyt3tP3zNdtp%2B4RPcs8TgAE7OaBO%2BFZXvnaqQ%3D%3D
// * HTTP/1.1 Host: qy.weixin.qq.com
//
// 接收到该请求时,企业应
// 1.解析出Get请求的参数包括消息体签名(msg_signature),时间戳(timestamp),随机数字串(nonce)以及企业微信推送过来的随机加密字符串(echostr),
// 这一步注意作URL解码。
// 2.验证消息体签名的正确性
// 3. 解密出echostr原文将原文当作Get请求的response返回给企业微信
// 第23步可以用企业微信提供的库函数VerifyURL来实现。
//
// */
// // 解析出url上的参数值如下
// // verifyMsgSign := HttpUtils.ParseUrl("msg_signature")
// verifyMsgSign := "5c45ff5e21c57e6ad56bac8758b79b1d9ac89fd3"
// // verifyTimestamp := HttpUtils.ParseUrl("timestamp")
// verifyTimestamp := "1409659589"
// // verifyNonce := HttpUtils.ParseUrl("nonce")
// verifyNonce := "263014780"
// // verifyEchoStr := HttpUtils.ParseUrl("echoStr")
// verifyEchoStr := "P9nAzCzyDtyTWESHep1vC5X9xho/qYX3Zpb4yKa9SKld1DsH3Iyt3tP3zNdtp+4RPcs8TgAE7OaBO+FZXvnaqQ=="
// echoStr, cryptErr := wxcpt.VerifyURL(verifyMsgSign, verifyTimestamp, verifyNonce, verifyEchoStr)
// if nil != cryptErr {
// fmt.Println("verifyUrl fail", cryptErr)
// }
// fmt.Println("verifyUrl success echoStr", string(echoStr))
// // 验证URL成功将sEchoStr返回
// // HttpUtils.SetResponse(sEchoStr)
//
// /*
// ------------使用示例二:对用户回复的消息解密---------------
// 用户回复消息或者点击事件响应时企业会收到回调消息此消息是经过企业微信加密之后的密文以post形式发送给企业密文格式请参考官方文档
// 假设企业收到企业微信的回调消息如下:
// POST /cgi-bin/wxpush? msg_signature=477715d11cdb4164915debcba66cb864d751f3e6&timestamp=1409659813&nonce=1372623149 HTTP/1.1
// Host: qy.weixin.qq.com
// Content-Length: 613
// {
// "tousername":"wx5823bf96d3bd56c7",
// "encrypt":"CZWs4CWRpI4VolQlvn4dlPBlXke6+HgmuI7p0LueFp1fKH40TNL+YHWJZwqIiYV+3kTrhdNU7fZwc+PmtgBvxSczkFeRz+oaVSsomrrtP2Z91LE313djjbWujqInRT+7ChGbCeo7ZzszByf8xnDSunPBxRX1MfX3kAxpKq7dqduW1kpMAx8O8xUzZ9oC0TLuZchbpxaml4epzGfF21O+zyXDwTxbCEiO0E87mChtzuh/VPlznXYbfqVrnyLNZ5pr",
// "agentid":"218"
// }
//
// 企业收到post请求之后应该
// 1.解析出url上的参数包括消息体签名(msg_signature),时间戳(timestamp)以及随机数字串(nonce)
// 2.验证消息体签名的正确性。
// 3.将post请求的数据进行json解析并将"Encrypt"标签的内容进行解密,解密出来的明文即是用户回复消息的明文,明文格式请参考官方文档
// 第23步可以用企业微信提供的库函数DecryptMsg来实现。
// */
//
// // reqMsgSign := HttpUtils.ParseUrl("msg_signature")
// reqMsgSign := "0623cbc5a8cbee5bcc137c70de99575366fc2af3"
// // reqTimestamp := HttpUtils.ParseUrl("timestamp")
// reqTimestamp := "1409659813"
// // reqNonce := HttpUtils.ParseUrl("nonce")
// reqNonce := "1372623149"
// // post请求的密文数据
// // reqData = HttpUtils.PostData()
//
// reqData := []byte(`{"tousername":"wx5823bf96d3bd56c7","encrypt":"CZWs4CWRpI4VolQlvn4dlEC1alN2MUEY2VklGehgBVLBrlVF7SyT+SV+Toj43l4ayJ9UMGKphktKKmP7B2j/P1ey67XB8PBgS7Wr5/8+w/yWriZv3Vmoo/MH3/1HsIWZrPQ3N2mJrelStIfI2Y8kLKXA7EhfZgZX4o+ffdkZDM76SEl79Ib9mw7TGjZ9Aw/x/A2VjNbV1E8BtEbRxYYcQippYNw7hr8sFfa3nW1xLdxokt8QkRX83vK3DFP2F6TQFPL2Tu98UwhcUpPvdJBuu1/yiOQIScppV3eOuLWEsko=","agentid":"218"}`)
//
// msg, cryptErr := wxcpt.DecryptMsg(reqMsgSign, reqTimestamp, reqNonce, reqData)
// if nil != cryptErr {
// fmt.Println("DecryptMsg fail", cryptErr)
// }
// fmt.Println("after decrypt msg: ", string(msg))
// // TODO: 解析出明文json标签的内容进行处理
// // For example:
//
// var msgContent MsgContent
// err := json.Unmarshal(msg, &msgContent)
// if nil != err {
// fmt.Println("Unmarshal fail", err)
// } else {
// fmt.Println("struct", msgContent)
// }
//
// /*
// ------------使用示例三:企业回复用户消息的加密---------------
// 企业被动回复用户的消息也需要进行加密并且拼接成密文格式的json串。
// 假设企业需要回复用户的明文如下:
//
// {
// "ToUserName": "mycreate",
// "FromUserName":"wx5823bf96d3bd56c7",
// "CreateTime": 1348831860,
// "MsgType": "text",
// "Content": "this is a test",
// "MsgId": 1234567890123456,
// "AgentID": 128
// }
//
// 为了将此段明文回复给用户,企业应:
// 1.自己生成时间时间戳(timestamp),随机数字串(nonce)以便生成消息体签名也可以直接用从企业微信的post url上解析出的对应值。
// 2.将明文加密得到密文。
// 3.用密文步骤1生成的timestamp,nonce和企业在企业微信设定的token生成消息体签名。
// 4.将密文消息体签名时间戳随机数字串拼接成json格式的字符串发送给企业。
// 以上234步可以用企业微信提供的库函数EncryptMsg来实现。
// */
// respData := "{\"ToUserName\":\"wx5823bf96d3bd56c7\",\"FromUserName\":\"mycreate\",\"CreateTime\": 1409659813,\"MsgType\":\"text\",\"Content\":\"hello\",\"MsgId\":4561255354251345929,\"AgentID\": 218}"
// //respData := `{"ToUserName":"wx5823bf96d3bd56c7","FromUserName":"mycreate","CreateTime": 1409659813,"MsgType":"text","Content":"hello","MsgId":4561255354251345929,"AgentID": 218}`
// //respData := `{"FromUserName":"mycreate","CreateTime": 1409659813,"MsgType":"text","Content":"hello","MsgId":4561255354251345929,"AgentID": 218}`
// encryptMsg, cryptErr := wxcpt.EncryptMsg(respData, reqTimestamp, reqNonce)
// if nil != cryptErr {
// fmt.Println("DecryptMsg fail", cryptErr)
// }
//
// sEncryptMsg := string(encryptMsg)
//
// fmt.Println("after encrypt sEncryptMsg: ", sEncryptMsg)
// // 加密成功
// // TODO:
// // HttpUtils.SetResponse(sEncryptMsg)
//}

View File

@ -0,0 +1,310 @@
package wxbizjsonmsgcrypt
import (
"bytes"
"crypto/aes"
"crypto/cipher"
"crypto/sha1"
"encoding/base64"
"encoding/binary"
"encoding/json"
"fmt"
"math/rand"
"sort"
"strings"
)
const letterBytes = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
const (
ValidateSignatureError int = -40001
ParseJsonError int = -40002
ComputeSignatureError int = -40003
IllegalAesKey int = -40004
ValidateCorpidError int = -40005
EncryptAESError int = -40006
DecryptAESError int = -40007
IllegalBuffer int = -40008
EncodeBase64Error int = -40009
DecodeBase64Error int = -40010
GenJsonError int = -40011
IllegalProtocolType int = -40012
)
type ProtocolType int
const (
JsonType ProtocolType = 1
)
type CryptError struct {
ErrCode int
ErrMsg string
}
func NewCryptError(err_code int, err_msg string) *CryptError {
return &CryptError{ErrCode: err_code, ErrMsg: err_msg}
}
type WXBizJsonMsg4Recv struct {
Tousername string `json:"tousername"`
Encrypt string `json:"encrypt"`
Agentid string `json:"agentid"`
}
type WXBizJsonMsg4Send struct {
Encrypt string `json:"encrypt"`
Signature string `json:"msgsignature"`
Timestamp string `json:"timestamp"`
Nonce string `json:"nonce"`
}
func NewWXBizJsonMsg4Send(encrypt, signature, timestamp, nonce string) *WXBizJsonMsg4Send {
return &WXBizJsonMsg4Send{Encrypt: encrypt, Signature: signature, Timestamp: timestamp, Nonce: nonce}
}
type ProtocolProcessor interface {
parse(src_data []byte) (*WXBizJsonMsg4Recv, *CryptError)
serialize(msg_send *WXBizJsonMsg4Send) ([]byte, *CryptError)
}
type WXBizMsgCrypt struct {
token string
encoding_aeskey string
receiver_id string
protocol_processor ProtocolProcessor
}
type JsonProcessor struct {
}
func (self *JsonProcessor) parse(src_data []byte) (*WXBizJsonMsg4Recv, *CryptError) {
var msg4_recv WXBizJsonMsg4Recv
err := json.Unmarshal(src_data, &msg4_recv)
if nil != err {
fmt.Println("Unmarshal fail", err)
return nil, NewCryptError(ParseJsonError, "json to msg fail")
}
return &msg4_recv, nil
}
func (self *JsonProcessor) serialize(msg4_send *WXBizJsonMsg4Send) ([]byte, *CryptError) {
json_msg, err := json.Marshal(msg4_send)
if nil != err {
return nil, NewCryptError(GenJsonError, err.Error())
}
return json_msg, nil
}
func NewWXBizMsgCrypt(token, encoding_aeskey, receiver_id string, protocol_type ProtocolType) *WXBizMsgCrypt {
var protocol_processor ProtocolProcessor
if protocol_type != JsonType {
panic("unsupport protocal")
} else {
protocol_processor = new(JsonProcessor)
}
return &WXBizMsgCrypt{token: token, encoding_aeskey: (encoding_aeskey + "="), receiver_id: receiver_id, protocol_processor: protocol_processor}
}
func (self *WXBizMsgCrypt) randString(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = letterBytes[rand.Int63()%int64(len(letterBytes))]
}
return string(b)
}
func (self *WXBizMsgCrypt) pKCS7Padding(plaintext string, block_size int) []byte {
padding := block_size - (len(plaintext) % block_size)
padtext := bytes.Repeat([]byte{byte(padding)}, padding)
var buffer bytes.Buffer
buffer.WriteString(plaintext)
buffer.Write(padtext)
return buffer.Bytes()
}
func (self *WXBizMsgCrypt) pKCS7Unpadding(plaintext []byte, block_size int) ([]byte, *CryptError) {
plaintext_len := len(plaintext)
if nil == plaintext || plaintext_len == 0 {
return nil, NewCryptError(DecryptAESError, "pKCS7Unpadding error nil or zero")
}
if plaintext_len%block_size != 0 {
return nil, NewCryptError(DecryptAESError, "pKCS7Unpadding text not a multiple of the block size")
}
padding_len := int(plaintext[plaintext_len-1])
return plaintext[:plaintext_len-padding_len], nil
}
func (self *WXBizMsgCrypt) cbcEncrypter(plaintext string) ([]byte, *CryptError) {
aeskey, err := base64.StdEncoding.DecodeString(self.encoding_aeskey)
if nil != err {
return nil, NewCryptError(DecodeBase64Error, err.Error())
}
const block_size = 32
pad_msg := self.pKCS7Padding(plaintext, block_size)
block, err := aes.NewCipher(aeskey)
if err != nil {
return nil, NewCryptError(EncryptAESError, err.Error())
}
ciphertext := make([]byte, len(pad_msg))
iv := aeskey[:aes.BlockSize]
mode := cipher.NewCBCEncrypter(block, iv)
mode.CryptBlocks(ciphertext, pad_msg)
base64_msg := make([]byte, base64.StdEncoding.EncodedLen(len(ciphertext)))
base64.StdEncoding.Encode(base64_msg, ciphertext)
return base64_msg, nil
}
func (self *WXBizMsgCrypt) cbcDecrypter(base64_encrypt_msg string) ([]byte, *CryptError) {
aeskey, err := base64.StdEncoding.DecodeString(self.encoding_aeskey)
if nil != err {
return nil, NewCryptError(DecodeBase64Error, err.Error())
}
encrypt_msg, err := base64.StdEncoding.DecodeString(base64_encrypt_msg)
if nil != err {
return nil, NewCryptError(DecodeBase64Error, err.Error())
}
block, err := aes.NewCipher(aeskey)
if err != nil {
return nil, NewCryptError(DecryptAESError, err.Error())
}
if len(encrypt_msg) < aes.BlockSize {
return nil, NewCryptError(DecryptAESError, "encrypt_msg size is not valid")
}
iv := aeskey[:aes.BlockSize]
if len(encrypt_msg)%aes.BlockSize != 0 {
return nil, NewCryptError(DecryptAESError, "encrypt_msg not a multiple of the block size")
}
mode := cipher.NewCBCDecrypter(block, iv)
mode.CryptBlocks(encrypt_msg, encrypt_msg)
return encrypt_msg, nil
}
func (self *WXBizMsgCrypt) calSignature(timestamp, nonce, data string) string {
sort_arr := []string{self.token, timestamp, nonce, data}
sort.Strings(sort_arr)
var buffer bytes.Buffer
for _, value := range sort_arr {
buffer.WriteString(value)
}
sha := sha1.New()
sha.Write(buffer.Bytes())
signature := fmt.Sprintf("%x", sha.Sum(nil))
return string(signature)
}
func (self *WXBizMsgCrypt) ParsePlainText(plaintext []byte) ([]byte, uint32, []byte, []byte, *CryptError) {
const block_size = 32
plaintext, err := self.pKCS7Unpadding(plaintext, block_size)
if nil != err {
return nil, 0, nil, nil, err
}
text_len := uint32(len(plaintext))
if text_len < 20 {
return nil, 0, nil, nil, NewCryptError(IllegalBuffer, "plain is to small 1")
}
random := plaintext[:16]
msg_len := binary.BigEndian.Uint32(plaintext[16:20])
if text_len < (20 + msg_len) {
return nil, 0, nil, nil, NewCryptError(IllegalBuffer, "plain is to small 2")
}
msg := plaintext[20 : 20+msg_len]
receiver_id := plaintext[20+msg_len:]
return random, msg_len, msg, receiver_id, nil
}
func (self *WXBizMsgCrypt) VerifyURL(msg_signature, timestamp, nonce, echostr string) ([]byte, *CryptError) {
signature := self.calSignature(timestamp, nonce, echostr)
if strings.Compare(signature, msg_signature) != 0 {
return nil, NewCryptError(ValidateSignatureError, "signature not equal")
}
plaintext, err := self.cbcDecrypter(echostr)
if nil != err {
return nil, err
}
_, _, msg, receiver_id, err := self.ParsePlainText(plaintext)
if nil != err {
return nil, err
}
if len(self.receiver_id) > 0 && strings.Compare(string(receiver_id), self.receiver_id) != 0 {
fmt.Println(string(receiver_id), self.receiver_id, len(receiver_id), len(self.receiver_id))
return nil, NewCryptError(ValidateCorpidError, "receiver_id is not equil")
}
return msg, nil
}
func (self *WXBizMsgCrypt) EncryptMsg(reply_msg, timestamp, nonce string) ([]byte, *CryptError) {
rand_str := self.randString(16)
var buffer bytes.Buffer
buffer.WriteString(rand_str)
msg_len_buf := make([]byte, 4)
binary.BigEndian.PutUint32(msg_len_buf, uint32(len(reply_msg)))
buffer.Write(msg_len_buf)
buffer.WriteString(reply_msg)
buffer.WriteString(self.receiver_id)
tmp_ciphertext, err := self.cbcEncrypter(buffer.String())
if nil != err {
return nil, err
}
ciphertext := string(tmp_ciphertext)
signature := self.calSignature(timestamp, nonce, ciphertext)
msg4_send := NewWXBizJsonMsg4Send(ciphertext, signature, timestamp, nonce)
return self.protocol_processor.serialize(msg4_send)
}
func (self *WXBizMsgCrypt) DecryptMsg(msg_signature, timestamp, nonce string, post_data []byte) ([]byte, *CryptError) {
msg4_recv, crypt_err := self.protocol_processor.parse(post_data)
if nil != crypt_err {
return nil, crypt_err
}
signature := self.calSignature(timestamp, nonce, msg4_recv.Encrypt)
if strings.Compare(signature, msg_signature) != 0 {
return nil, NewCryptError(ValidateSignatureError, "signature not equal")
}
plaintext, crypt_err := self.cbcDecrypter(msg4_recv.Encrypt)
if nil != crypt_err {
return nil, crypt_err
}
_, _, msg, receiver_id, crypt_err := self.ParsePlainText(plaintext)
if nil != crypt_err {
return nil, crypt_err
}
if len(self.receiver_id) > 0 && strings.Compare(string(receiver_id), self.receiver_id) != 0 {
return nil, NewCryptError(ValidateCorpidError, "receiver_id is not equil")
}
return msg, nil
}

View File

@ -0,0 +1,10 @@
package qywx
import (
"github.com/google/wire"
)
var ProviderSetQywx = wire.NewSet(
NewAuth,
NewGroup,
)

View File

@ -0,0 +1,45 @@
package qywx
import "time"
type AuthRes struct {
Errcode int `json:"errcode"`
Errmsg string `json:"errmsg"`
AccessToken string `json:"access_token"`
ExpiresIn int `json:"expires_in"`
}
type AuthInfo struct {
Corpid string `json:"corpid"`
Corpsecret string `json:"corpsecret"`
AccessToken string `json:"accessToken"`
Expire time.Duration `json:"expireIn"`
}
type GroupCreateReq struct {
Name string `json:"name"`
Owner string `json:"owner"`
Userlist []string `json:"userlist"`
Chatid string `json:"chatid"`
}
type GroupCreateResp struct {
Errcode int `json:"errcode"`
Errmsg string `json:"errmsg"`
Chatid string `json:"chatid"`
}
type commonResp struct {
Errcode int `json:"errcode"`
Errmsg string `json:"errmsg"`
}
type GroupSendMarkDownReq struct {
Chatid string `json:"chatid"`
Msgtype string `json:"msgtype"`
Markdown MarkDown `json:"markdown"`
}
type MarkDown struct {
Content string `json:"content"`
}

View File

@ -1,10 +1,10 @@
package biz
import (
"ai_scheduler/internal/biz/do"
"ai_scheduler/internal/biz/llm_service"
"github.com/google/wire"
"ai_scheduler/internal/biz/do"
"ai_scheduler/internal/biz/llm_service"
"github.com/google/wire"
)
var ProviderSetBiz = wire.NewSet(
@ -15,7 +15,9 @@ var ProviderSetBiz = wire.NewSet(
llm_service.NewOllamaGenerate,
//handle.NewHandle,
do.NewDo,
do.NewHandle,
do.NewHandle,
NewTaskBiz,
NewDingTalkBotBiz,
NewQywxAppBiz,
NewGroupConfigBiz,
)

88
internal/biz/qywx_app.go Normal file
View File

@ -0,0 +1,88 @@
package biz
import (
"ai_scheduler/internal/biz/handle/qywx"
"ai_scheduler/internal/data/constants"
"ai_scheduler/internal/data/impl"
"ai_scheduler/internal/data/model"
"ai_scheduler/internal/pkg"
"ai_scheduler/internal/tools/bbxt"
"context"
"fmt"
"time"
"ai_scheduler/internal/config"
"xorm.io/builder"
)
// AiRouterBiz 智能路由服务
type QywxAppBiz struct {
conf *config.Config
botGroupQywxImpl *impl.BotGroupQywxImpl
qywxGroupHandle *qywx.Group
}
// NewDingTalkBotBiz
func NewQywxAppBiz(
conf *config.Config,
botGroupQywxImpl *impl.BotGroupQywxImpl,
qywxGroupHandle *qywx.Group,
) *QywxAppBiz {
return &QywxAppBiz{
conf: conf,
botGroupQywxImpl: botGroupQywxImpl,
qywxGroupHandle: qywxGroupHandle,
}
}
func (q *QywxAppBiz) InitGroup(ctx context.Context) (string, error) {
chatId := pkg.RandomString(q.conf.Qywx.ChatIdLen)
GroupInfo := &model.AiBotGroupQywx{
Title: "bot_group_" + time.Now().Format(time.DateOnly),
ChatID: chatId,
ConfigID: q.conf.Qywx.DefaultConfigId,
AppSecret: q.conf.Qywx.AppSecret,
}
_, err := q.botGroupQywxImpl.Add(GroupInfo)
if err != nil {
return "", err
}
resp, err := q.qywxGroupHandle.Create(
ctx,
qywx.GroupCreateReq{
Name: GroupInfo.Title,
Chatid: GroupInfo.ChatID,
Userlist: []string{
q.conf.Qywx.InitAccount,
},
},
q.conf.Qywx.CorpId,
GroupInfo.AppSecret,
)
if err != nil {
return "", err
}
return resp.Chatid, nil
}
func (q *QywxAppBiz) GetGroupInfo(ctx context.Context, groupId int) (group model.AiBotGroupQywx, err error) {
cond := builder.NewCond()
cond = cond.And(builder.Eq{"group_id": groupId})
cond = cond.And(builder.Eq{"status": constants.Enable})
err = q.botGroupQywxImpl.GetOneBySearchToStrut(&cond, &group)
return
}
func (q *QywxAppBiz) SendReport(ctx context.Context, groupInfo *model.AiBotGroupQywx, report *bbxt.ReportRes) (err error) {
confitent := fmt.Sprintf("%s\n%s", report.Title, fmt.Sprintf("![图片](%s)", report.Url))
err = q.qywxGroupHandle.SendMarkDown(ctx, qywx.GroupSendMarkDownReq{
Chatid: groupInfo.ChatID,
Markdown: qywx.MarkDown{
Content: confitent,
},
}, q.conf.Qywx.CorpId, groupInfo.AppSecret)
return
}

View File

@ -0,0 +1,32 @@
package biz
import (
"ai_scheduler/internal/biz/handle/qywx"
"ai_scheduler/internal/config"
"ai_scheduler/internal/data/impl"
"ai_scheduler/utils"
"context"
"testing"
)
func Test_InitGroup(t *testing.T) {
run()
chatId, err := qywxAppBiz.InitGroup(context.Background())
t.Log(chatId, err)
}
var (
configConfig *config.Config
qywxAppBiz *QywxAppBiz
)
func run() {
configConfig, _ = config.LoadConfigWithTest()
// 初始化数据库连接
db, _ := utils.NewGormDb(configConfig)
rdb := utils.NewRdb(configConfig)
botGroupQywxImpl := impl.NewBotGroupQywxImpl(db)
qywxAuth := qywx.NewAuth(configConfig, rdb)
group := qywx.NewGroup(botGroupQywxImpl, qywxAuth)
qywxAppBiz = NewQywxAppBiz(configConfig, botGroupQywxImpl, group)
}

View File

@ -1,122 +0,0 @@
package biz
// import (
// "ai_scheduler/internal/config"
// "ai_scheduler/internal/data/impl"
// "ai_scheduler/internal/data/model"
// "ai_scheduler/internal/entitys"
// "ai_scheduler/internal/pkg"
// "ai_scheduler/internal/pkg/utils_ollama"
// "ai_scheduler/internal/tools"
// "ai_scheduler/utils"
// "encoding/json"
// "flag"
// "fmt"
// "os"
// "path/filepath"
// "testing"
// "github.com/gofiber/fiber/v2/log"
// )
// func Test_task(t *testing.T) {
// var c entitys.TaskConfig
// config := `{"param": {"type": "object", "required": ["number"], "properties": {"number": {"type": "string", "description": "订单编号/流水号"}}}, "request": {"url": "http://www.baidu.com/${number}", "headers": {"Authorization": "${authorization}"}, "method": "GET"}}`
// err := json.Unmarshal([]byte(config), &c)
// t.Log(err)
// }
// type configData struct {
// Param map[string]interface{} `json:"param"`
// Do map[string]interface{} `json:"do"`
// }
// func Test_Order(t *testing.T) {
// routerBiz := in()
// ch := make(chan entitys.Response, 5)
// defer close(ch)
// err := routerBiz.handleTask(ch, nil, &entitys.Match{Index: "order_diagnosis", Parameters: `{"order_number":"822895927188791297"}`}, &model.AiTask{Config: `{"tool": "zltxOrderDetail", "param": {"type": "object", "optional": [], "required": ["order_number"], "properties": {"order_number": {"type": "string", "description": "订单编号/流水号"}}}}`})
// select {
// case v := <-ch: // 尝试接收
// fmt.Println("接收到值:", v)
// default:
// fmt.Println("无数据可接收")
// }
// t.Log(err)
// }
// func Test_OrderLog(t *testing.T) {
// routerBiz := in()
// ch := make(chan entitys.Response, 5)
// defer close(ch)
// err := routerBiz.handleTask(ch, nil, &entitys.Match{Index: "order_diagnosis", Parameters: `{"order_number":"822979421673758721","serial_number":"822979421979938817"}`}, &model.AiTask{Config: `{"tool": "zltxOrderDirectLog", "param": {"type": "object", "optional": [], "required": ["order_number"], "properties": {"order_number": {"type": "string", "description": "订单编号/流水号"}}}}`})
// t.Log(err)
// }
// func Test_ProductLog(t *testing.T) {
// routerBiz := in()
// ch := make(chan entitys.Response, 5)
// defer close(ch)
// err := routerBiz.handleTask(ch, nil, &entitys.Match{Index: "order_diagnosis", Parameters: `{"name":"利楚测试"}`}, &model.AiTask{Config: `{"tool": "zltxProduct", "param": {"type": "object", "optional": [], "required": ["order_number"], "properties": {"order_number": {"type": "string", "description": "订单编号/流水号"}}}}`})
// t.Log(err)
// }
// func Test_ZltxStatistics(t *testing.T) {
// routerBiz := in()
// ch := make(chan entitys.Response, 5)
// defer close(ch)
// err := routerBiz.handleTask(ch, nil, &entitys.Match{Index: "order_diagnosis", Parameters: `{"number":"13737882067"}`}, &model.AiTask{Config: `{"tool": "zltxOrderStatistics", "param": {"type": "object", "optional": [], "required": ["number"], "properties": {"number": {"type": "string", "description": "充值账号/分销商ID"}}}}`})
// t.Log(err)
// }
// func in() *AiRouterBiz {
// modDir, err := getModuleDir()
// if err != nil {
// panic("1")
// }
// configPath := flag.String("config", fmt.Sprintf("%s/config/config.yaml", modDir), "Path to configuration file")
// flag.Parse()
// configConfig, err := config.LoadConfig(*configPath)
// if err != nil {
// panic("加载配置失败")
// }
// client, _, err := utils_ollama.NewClient(configConfig)
// allLogger := log.DefaultLogger()
// utilOllama := utils_ollama.NewUtilOllama(configConfig, allLogger)
// manager := tools.NewManager(configConfig, client)
// db, _ := utils.NewGormDb(configConfig)
// sessionImpl := impl.NewSessionImpl(db)
// sysImpl := impl.NewSysImpl(db)
// taskImpl := impl.NewTaskImpl(db)
// chatImpl := impl.NewChatImpl(db)
// safeChannelPool, _ := pkg.NewSafeChannelPool(configConfig)
// routerBiz := NewAiRouterBiz(manager, sessionImpl, sysImpl, taskImpl, chatImpl, configConfig, utilOllama, safeChannelPool, client)
// return routerBiz
// }
// func getModuleDir() (string, error) {
// dir, err := os.Getwd()
// if err != nil {
// return "", err
// }
// for {
// modPath := filepath.Join(dir, "go.mod")
// if _, err := os.Stat(modPath); err == nil {
// return dir, nil // 找到 go.mod
// }
// // 向上查找父目录
// parent := filepath.Dir(dir)
// if parent == dir {
// break // 到达根目录,未找到
// }
// dir = parent
// }
// return "", fmt.Errorf("go.mod not found in current directory or parents")
// }

View File

@ -26,6 +26,7 @@ type Config struct {
PermissionConfig PermissionConfig `mapstructure:"permissionConfig"`
LLM LLM `mapstructure:"llm"`
Dingtalk DingtalkConfig `mapstructure:"dingtalk"`
Qywx QywxConfig `mapstructure:"qywx"`
}
type SysPrompt struct {
@ -72,6 +73,18 @@ type DingtalkConfig struct {
BotGroupID map[string]int `mapstructure:"bot_group_id"` // 机器人群组
}
// QywxConfig 企业微信配置
type QywxConfig struct {
CorpId string `mapstructure:"corp_id"`
DefaultConfigId int32 `mapstructure:"default_config_id"`
AppSecret string `mapstructure:"app_secret"`
InitAccount string `mapstructure:"init_account"`
Token string `mapstructure:"token"`
AES_KEY string `mapstructure:"aes_key"`
ChatIdLen int `mapstructure:"chat_id_len"`
BotGroupID map[string]int `mapstructure:"bot_group_id"` // 应用群
}
// TableDemandConfig 需求表配置
type AITableConfig struct {
Url string `mapstructure:"url"`

View File

@ -36,6 +36,8 @@ const DingTalkAuthBaseKeyPrefix = "dingTalk_auth"
const DingTalkAuthBaseKeyBotPrefix = "dingTalk_auth_bot"
const QywxAuthBaseKeyPrefix = "qywx_auth"
// PermissionType 工具使用权限
type PermissionType int32

View File

@ -0,0 +1,17 @@
package impl
import (
"ai_scheduler/internal/data/model"
"ai_scheduler/tmpl/dataTemp"
"ai_scheduler/utils"
)
type BotGroupConfigImpl struct {
dataTemp.DataTemp
}
func NewBotGroupConfigImpl(db *utils.Db) *BotGroupConfigImpl {
return &BotGroupConfigImpl{
DataTemp: *dataTemp.NewDataTemp(db, new(model.AiBotGroupConfig)),
}
}

View File

@ -0,0 +1,17 @@
package impl
import (
"ai_scheduler/internal/data/model"
"ai_scheduler/tmpl/dataTemp"
"ai_scheduler/utils"
)
type BotGroupQywxImpl struct {
dataTemp.DataTemp
}
func NewBotGroupQywxImpl(db *utils.Db) *BotGroupQywxImpl {
return &BotGroupQywxImpl{
DataTemp: *dataTemp.NewDataTemp(db, new(model.AiBotGroupQywx)),
}
}

View File

@ -15,4 +15,6 @@ var ProviderImpl = wire.NewSet(
NewBotChatHisImpl,
NewBotToolsImpl,
NewBotGroupImpl,
NewBotGroupConfigImpl,
NewBotGroupQywxImpl,
)

View File

@ -12,15 +12,14 @@ const TableNameAiBotGroup = "ai_bot_group"
// AiBotGroup mapped from table <ai_bot_group>
type AiBotGroup struct {
GroupID int32 `gorm:"column:group_id;primaryKey;autoIncrement:true" json:"group_id"`
ConversationID string `gorm:"column:conversation_id;not null;comment:会话ID" json:"conversation_id"` // 会话ID
RobotCode string `gorm:"column:robot_code;not null;comment:绑定机器人code" json:"robot_code"` // 绑定机器人code
Title string `gorm:"column:title;not null;comment:群名称" json:"title"` // 群名称
ToolList string `gorm:"column:tool_list;not null;comment:开通工具列表" json:"tool_list"` // 开通工具列表
ProductName string `gorm:"column:product_name;not null;comment:针对报表商品筛选快速实现,后期优化" json:"product_name"` // 针对报表商品筛选快速实现,后期优化
Status int32 `gorm:"column:status;not null;default:1" json:"status"`
DeleteAt time.Time `gorm:"column:delete_at" json:"delete_at"`
CreateAt time.Time `gorm:"column:create_at;default:CURRENT_TIMESTAMP" json:"create_at"`
GroupID int32 `gorm:"column:group_id;primaryKey;autoIncrement:true" json:"group_id"`
ConversationID string `gorm:"column:conversation_id;not null;comment:会话ID" json:"conversation_id"` // 会话ID
RobotCode string `gorm:"column:robot_code;not null;comment:绑定机器人code" json:"robot_code"` // 绑定机器人code
ConfigID int32 `gorm:"column:config_id;not null;comment:关联ai_bot_group_config" json:"config_id"` // 关联ai_bot_group_config
Title string `gorm:"column:title;not null;comment:群名称" json:"title"` // 群名称
Status int32 `gorm:"column:status;not null;default:1" json:"status"`
DeleteAt *time.Time `gorm:"column:delete_at" json:"delete_at"`
CreateAt time.Time `gorm:"column:create_at;default:CURRENT_TIMESTAMP" json:"create_at"`
}
// TableName AiBotGroup's table name

View File

@ -0,0 +1,19 @@
// Code generated by gorm.io/gen. DO NOT EDIT.
// Code generated by gorm.io/gen. DO NOT EDIT.
// Code generated by gorm.io/gen. DO NOT EDIT.
package model
const TableNameAiBotGroupConfig = "ai_bot_group_config"
// AiBotGroupConfig mapped from table <ai_bot_group_config>
type AiBotGroupConfig struct {
ConfigID int32 `gorm:"column:config_id;primaryKey;autoIncrement:true" json:"config_id"`
ToolList string `gorm:"column:tool_list;not null" json:"tool_list"`
ProductName string `gorm:"column:product_name;not null" json:"product_name"`
}
// TableName AiBotGroupConfig's table name
func (*AiBotGroupConfig) TableName() string {
return TableNameAiBotGroupConfig
}

View File

@ -0,0 +1,28 @@
// Code generated by gorm.io/gen. DO NOT EDIT.
// Code generated by gorm.io/gen. DO NOT EDIT.
// Code generated by gorm.io/gen. DO NOT EDIT.
package model
import (
"time"
)
const TableNameAiBotGroupQywx = "ai_bot_group_qywx"
// AiBotGroupQywx mapped from table <ai_bot_group_qywx>
type AiBotGroupQywx struct {
GroupID int32 `gorm:"column:group_id;primaryKey;autoIncrement:true" json:"group_id"`
ChatID string `gorm:"column:chat_id;not null;comment:会话ID" json:"chat_id"` // 会话ID
ConfigID int32 `gorm:"column:config_id;not null" json:"config_id"`
AppSecret string `gorm:"column:app_secret;not null;comment:绑定机器人code" json:"app_secret"` // 绑定机器人code
Title string `gorm:"column:title;not null;comment:群名称" json:"title"` // 群名称
Status int32 `gorm:"column:status;not null;default:1" json:"status"`
DeleteAt *time.Time `gorm:"column:delete_at" json:"delete_at"`
CreateAt time.Time `gorm:"column:create_at;default:CURRENT_TIMESTAMP" json:"create_at"`
}
// TableName AiBotGroupQywx's table name
func (*AiBotGroupQywx) TableName() string {
return TableNameAiBotGroupQywx
}

View File

@ -15,6 +15,7 @@ import (
"fmt"
"math/rand"
"path/filepath"
"sort"
"strconv"
"time"
@ -91,6 +92,7 @@ type StatisticsOursProductContext struct {
ProductData []statistics_ours_product.StatisticsOursProductItem
ImgUrl string
ExcelData [][]string
TotalLoss float64
}
func (w *statisticsOursProduct) buildWorkflow(ctx context.Context) (compose.Runnable[*StatisticsOursProductWorkflowInput, map[string]any], error) {
@ -122,7 +124,7 @@ func (w *statisticsOursProduct) formatContext(ctx context.Context, input *Statis
Time: time.Now(),
StartTime: startTime,
EndTime: endTime,
Title: fmt.Sprintf("截止 %s 我们的商品统计", endTimeStr),
Title: fmt.Sprintf("截止 %s 亏损100以上我们的商品统计", endTimeStr),
}, nil
}
@ -162,7 +164,7 @@ func (w *statisticsOursProduct) generateExcelAndUpload(ctx context.Context, stat
fileName := fmt.Sprintf("statistics_ours_product_%d%d", time.Now().Unix(), rand.Intn(1000))
// 2. 转换数据为 [][]string
excelData := w.convertDataToExcelFormat(state.ProductData)
excelData, totalLoss := w.convertDataToExcelFormat(state.ProductData)
// 3. 生成 Excel
req := &excel_generator.ExcelGeneratorRequest{
@ -191,13 +193,20 @@ func (w *statisticsOursProduct) generateExcelAndUpload(ctx context.Context, stat
state.ImgUrl = url
state.ExcelData = excelData
state.TotalLoss = totalLoss
return state, nil
}
// convertDataToExcelFormat 将业务数据转换为 Excel 生成器需要的二维字符串数组
func (w *statisticsOursProduct) convertDataToExcelFormat(data []statistics_ours_product.StatisticsOursProductItem) [][]string {
var result [][]string
func (w *statisticsOursProduct) convertDataToExcelFormat(data []statistics_ours_product.StatisticsOursProductItem) ([][]string, float64) {
type sortType struct {
Profit float64
cells []string
}
var sortList []sortType
var totalLoss float64
for _, item := range data {
var profitVal float64
@ -219,6 +228,11 @@ func (w *statisticsOursProduct) convertDataToExcelFormat(data []statistics_ours_
profitVal = 0
}
// 累加总亏损
if profitVal < 0 {
totalLoss += profitVal
}
// 过滤利润小于 -100 的记录
if profitVal > -100 {
continue
@ -236,16 +250,31 @@ func (w *statisticsOursProduct) convertDataToExcelFormat(data []statistics_ours_
fmt.Sprintf("%v", item.Profit),
}
result = append(result, row)
sortList = append(sortList, sortType{
Profit: profitVal,
cells: row,
})
}
return result
// 排序
sort.Slice(sortList, func(i, j int) bool {
return sortList[i].Profit < sortList[j].Profit
})
// 转换为 [][]string
result := make([][]string, 0, len(sortList))
for _, item := range sortList {
result = append(result, item.cells)
}
return result, totalLoss
}
func (w *statisticsOursProduct) convertToMap(ctx context.Context, state *StatisticsOursProductContext) (map[string]any, error) {
return map[string]any{
"path": "",
"url": state.ImgUrl,
"data": state.ExcelData,
"desc": state.Title,
"path": "",
"url": state.ImgUrl,
"data": state.ExcelData,
"title": state.Title + fmt.Sprintf("(总亏损 %.2f", state.TotalLoss),
}, nil
}

View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"net/url"
"reflect"
"strconv"
@ -422,3 +423,21 @@ func StructToQueryString(input interface{}, options ...URLValuesOptions) (string
}
return values.Encode(), nil
}
const (
letterBytes = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" // 62个字符
)
// RandomString 生成随机字符串,包含 0-9, a-z, A-Z
// length: 要生成的字符串长度
func RandomString(length int) string {
// 使用 crypto/rand 替代 math/rand更安全适用于密码学场景
// 但如果不需要高安全性math/rand 更快
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
result := make([]byte, length)
for i := range result {
result[i] = letterBytes[rng.Intn(len(letterBytes))]
}
return string(result)
}

View File

@ -40,7 +40,7 @@ func (c *CronServer) InitJobs(ctx context.Context) {
c.ctx = ctx
c.jobs = []*cronJob{
{
Func: c.cronService.CronReportSend,
Func: c.cronService.CronReportSendDingTalk,
Name: "直连天下报表推送",
Schedule: "0 12,18,23 * * *",
},
@ -66,13 +66,13 @@ func (c *CronServer) Run(ctx context.Context) {
}
c.log.Infof("任务[%d]:%s执行结束", jobID, job.Name)
}()
c.log.Infof("任务[%d]:%s执ddd", jobID, job.Name)
// 为每次执行创建新的上下文
//ctx := context.Background()
//err := job.Func(ctx)
//if err != nil {
// c.log.Errorf("任务[%d]:%s执行失败: %s", jobID, job.Name, err.Error())
//}
//c.log.Infof("任务[%d]:%s执ddd", jobID, job.Name)
//为每次执行创建新的上下文
ctx := context.Background()
err := job.Func(ctx)
if err != nil {
c.log.Errorf("任务[%d]:%s执行失败: %s", jobID, job.Name, err.Error())
}
})
if err != nil {
c.log.Errorf("添加任务失败:%s", err.Error())

View File

@ -66,6 +66,8 @@ func SetupRoutes(app *fiber.App, ChatService *services.ChatService, sessionServi
r.Post("/chat/useful", ChatService.Useful)
// 回调
r.Post("/callback", callbackService.Callback)
// 回调
r.Get("/qywx/callback", callbackService.QywxCallback)
//广播
r.Get("/broadcast", func(ctx *fiber.Ctx) error {
action := ctx.Query("action")
@ -127,7 +129,9 @@ func registerResponse(router fiber.Router) {
func registerCommon(c *fiber.Ctx, err error) error {
// 调用下一个中间件或路由处理函数
if c.Path() == "/api/v1/qywx/callback" {
return nil
}
bsErr, ok := err.(*errors.BusinessErr)
if !ok {
bsErr = errors.SystemError

View File

@ -1,6 +1,7 @@
package services
import (
"ai_scheduler/internal/biz/handle/qywx/json_callback/wxbizjsonmsgcrypt"
"ai_scheduler/internal/config"
"ai_scheduler/internal/data/constants"
errorcode "ai_scheduler/internal/data/error"
@ -13,10 +14,13 @@ import (
"ai_scheduler/internal/tool_callback"
"context"
"encoding/json"
"fmt"
"net/url"
"strings"
"time"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/log"
)
// CallbackService 统一回调入口
@ -103,40 +107,36 @@ func (s *CallbackService) Callback(c *fiber.Ctx) error {
}
}
// func validateTimestamp(ts string, window time.Duration) bool {
// // 期望毫秒时间戳或秒级,简单容错
// // 尝试解析为整数
// var n int64
// for _, base := range []int64{1, 1000} { // 秒或毫秒
// if v, ok := parseInt64(ts); ok {
// n = v
// // 归一为毫秒
// if base == 1 && len(ts) <= 10 {
// n = n * 1000
// }
// now := time.Now().UnixMilli()
// diff := now - n
// if diff < 0 {
// diff = -diff
// }
// if diff <= window.Milliseconds() {
// return true
// }
// }
// }
// return false
// }
func (s *CallbackService) CallbackQr(c *fiber.Ctx) error {
// 读取头
sourceKey := strings.TrimSpace(c.Get("X-Source-Key"))
ts := strings.TrimSpace(c.Get("X-Timestamp"))
// func parseInt64(s string) (int64, bool) {
// var n int64
// for _, ch := range s {
// if ch < '0' || ch > '9' {
// return 0, false
// }
// n = n*10 + int64(ch-'0')
// }
// return n, true
// }
// 时间窗口(如果提供了 ts 则校验,否则跳过),窗口 5 分钟
// if ts != "" && !validateTimestamp(ts, 5*time.Minute) {
if ts != "" && !util.IsInTimeWindow(ts, 5*time.Minute) {
return errorcode.AuthNotFound
}
// 解析 Envelope
var env Envelope
if err := json.Unmarshal(c.Body(), &env); err != nil {
return errorcode.ParamErrf("invalid json: %v", err)
}
if env.Action == "" || env.TaskID == "" {
return errorcode.ParamErrf("missing action/task_id")
}
if env.Data == nil {
return errorcode.ParamErrf("missing data")
}
switch sourceKey {
case "dingtalk":
return s.handleDingTalkCallback(c, env)
default:
return errorcode.AuthNotFound
}
}
func (s *CallbackService) handleDingTalkCallback(c *fiber.Ctx, env Envelope) error {
// 校验taskId
@ -331,3 +331,36 @@ func (s *CallbackService) handleBugOptimizationSubmitDone(ctx context.Context, t
return msg, nil
}
func (s *CallbackService) QywxCallback(c *fiber.Ctx) (err error) {
// 读取头
httpstr := string(c.Request().URI().QueryString())
start := strings.Index(httpstr, "msg_signature=")
start += len("msg_signature=")
var msgSignature string
next := getString(httpstr, "&timestamp=", start, &msgSignature)
var timestamp string
next = getString(httpstr, "&nonce=", next, &timestamp)
var nonce string
next = getString(httpstr, "&echostr=", next, &nonce)
echostr := httpstr[next:len(httpstr)]
echostr, _ = url.QueryUnescape(echostr)
fmt.Println(httpstr, msgSignature, timestamp, nonce, echostr)
wxcpt := wxbizjsonmsgcrypt.NewWXBizMsgCrypt(s.cfg.Qywx.Token, s.cfg.Qywx.AES_KEY, s.cfg.Qywx.CorpId, wxbizjsonmsgcrypt.JsonType)
echoStr, cryptErr := wxcpt.VerifyURL(msgSignature, timestamp, nonce, echostr)
if nil != cryptErr {
log.Errorf("%v", cryptErr)
return fmt.Errorf("%v", cryptErr)
}
fmt.Println("verifyUrl success echoStr", string(echoStr))
err = c.Send(echoStr)
return err
}
func getString(str, endstr string, start int, msg *string) int {
end := strings.Index(str, endstr)
*msg = str[start:end]
return end + len(endstr)
}

View File

@ -11,27 +11,39 @@ import (
type CronService struct {
config *config.Config
dingTalkBotBiz *biz.DingTalkBotBiz
qywxAppBiz *biz.QywxAppBiz
groupConfigBiz *biz.GroupConfigBiz
}
func NewCronService(config *config.Config, dingTalkBotBiz *biz.DingTalkBotBiz) *CronService {
func NewCronService(
config *config.Config,
dingTalkBotBiz *biz.DingTalkBotBiz,
qywxAppBiz *biz.QywxAppBiz,
groupConfigBiz *biz.GroupConfigBiz,
) *CronService {
return &CronService{
config: config,
dingTalkBotBiz: dingTalkBotBiz,
qywxAppBiz: qywxAppBiz,
groupConfigBiz: groupConfigBiz,
}
}
func (d *CronService) CronReportSend(ctx context.Context) error {
func (d *CronService) CronReportSendDingTalk(ctx context.Context) error {
groupId := d.config.Dingtalk.BotGroupID["bbxt"]
groupInfo, err := d.dingTalkBotBiz.GetGroupInfo(ctx, groupId)
if err != nil {
return err
}
reports, err := d.dingTalkBotBiz.GetReportLists(ctx, &groupInfo)
groupConfig, err := d.groupConfigBiz.GetGroupConfig(ctx, groupInfo.ConfigID)
if err != nil {
return err
}
//contentChan <- "截止今日23点利润亏损合计127917.0866元亏损500元以上的分销商和产品金额如下图"
//contentChan <- "![图片](https://lsxdmgoss.oss-cn-chengdu.aliyuncs.com/MarketingSaaS/image/V2/other/shanghu.png)"
reports, err := d.groupConfigBiz.GetReportLists(ctx, groupConfig)
if err != nil {
return err
}
for _, report := range reports {
err = d.dingTalkBotBiz.SendReport(ctx, &groupInfo, report)
if err != nil {
@ -41,3 +53,28 @@ func (d *CronService) CronReportSend(ctx context.Context) error {
}
return nil
}
func (d *CronService) CronReportSendQywx(ctx context.Context) error {
groupId := d.config.Qywx.BotGroupID["bbxt"]
groupInfo, err := d.qywxAppBiz.GetGroupInfo(ctx, groupId)
if err != nil {
return err
}
groupConfig, err := d.groupConfigBiz.GetGroupConfig(ctx, groupInfo.ConfigID)
if err != nil {
return err
}
reports, err := d.groupConfigBiz.GetReportLists(ctx, groupConfig)
if err != nil {
return err
}
for _, report := range reports {
err = d.qywxAppBiz.SendReport(ctx, &groupInfo, report)
if err != nil {
log.Error(err)
continue
}
}
return nil
}

View File

@ -4,6 +4,7 @@ import (
"ai_scheduler/internal/biz"
"ai_scheduler/internal/biz/do"
dingtalk2 "ai_scheduler/internal/biz/handle/dingtalk"
"ai_scheduler/internal/biz/handle/qywx"
"ai_scheduler/internal/biz/llm_service"
"ai_scheduler/internal/biz/tools_regis"
"ai_scheduler/internal/config"
@ -12,7 +13,6 @@ import (
"ai_scheduler/internal/domain/component/callback"
"ai_scheduler/internal/domain/repo"
"ai_scheduler/internal/domain/workflow"
"ai_scheduler/internal/domain/workflow/runtime"
"ai_scheduler/internal/pkg"
"ai_scheduler/internal/pkg/dingtalk"
"ai_scheduler/internal/pkg/lsxd"
@ -30,7 +30,13 @@ import (
func Test_Report(t *testing.T) {
run()
a := cronService.CronReportSend(context.Background())
a := cronService.CronReportSendDingTalk(context.Background())
t.Log(a)
}
func Test_Report_QYWX(t *testing.T) {
run()
a := cronService.CronReportSendQywx(context.Background())
t.Log(a)
}
@ -107,9 +113,13 @@ func run() {
// 初始化钉钉机器人业务逻辑
utils_ossClient, _ := utils_oss.NewClient(configConfig)
// 初始化工作流管理器
workflowManager := runtime.NewRegistry()
dingTalkBotBiz := biz.NewDingTalkBotBiz(doDo, handle, botConfigImpl, botGroupImpl, user, toolRegis, botChatHisImpl, manager, configConfig, sendCardClient, utils_ossClient, workflowManager)
botGroupConfigImpl := impl.NewBotGroupConfigImpl(db)
botGroupQywxImpl := impl.NewBotGroupQywxImpl(db)
qywxAuth := qywx.NewAuth(configConfig, rdb)
group := qywx.NewGroup(botGroupQywxImpl, qywxAuth)
qywxAppBiz := biz.NewQywxAppBiz(configConfig, botGroupQywxImpl, group)
groupConfigBiz := biz.NewGroupConfigBiz(toolRegis, utils_ossClient, botGroupConfigImpl, registry, configConfig)
dingTalkBotBiz := biz.NewDingTalkBotBiz(doDo, handle, botConfigImpl, botGroupImpl, user, botChatHisImpl, manager, configConfig, sendCardClient, groupConfigBiz)
// 初始化钉钉机器人服务
cronService = NewCronService(configConfig, dingTalkBotBiz)
dingBotService = NewDingBotService(configConfig, dingTalkBotBiz)
cronService = NewCronService(configConfig, dingTalkBotBiz, qywxAppBiz, groupConfigBiz)
}

View File

@ -18,7 +18,7 @@ const (
)
var (
DownWardValue int32 = 1500
DownWardValue int32 = 1000
SumFilter int32 = -150
)
@ -355,19 +355,20 @@ func (b *BbxtTools) GetStatisOfficialProductSumDecline(now time.Time, downWardVa
return
}
var (
productSumMap = make(map[int32]ProductSumDecline)
productSumMap = make(map[string]ProductSumDecline)
)
for _, v := range data.OfficialProductSumDecline {
if _, ex := productSumMap[v.OfficialProductId]; !ex {
productSumMap[v.OfficialProductId] = ProductSumDecline{
if _, ex := productSumMap[v.OfficialProductName]; !ex {
productSumMap[v.OfficialProductName] = ProductSumDecline{
OfficialProductName: v.OfficialProductName,
OfficialProductId: v.OfficialProductId,
ProductSumReseller: make(map[int32]ProductSumReseller),
Index: productMap[v.OfficialProductName],
}
}
if v.HistoryOneDiff <= sumFilter || v.HistoryTwoDiff <= sumFilter {
productSumMap[v.OfficialProductId].ProductSumReseller[v.ResellerId] = ProductSumReseller{
productSumMap[v.OfficialProductName].ProductSumReseller[v.ResellerId] = ProductSumReseller{
ResellerName: v.ResellerName,
CurrentNum: v.CurrentNum,
HistoryOneNum: v.HistoryOneNum,
@ -376,8 +377,14 @@ func (b *BbxtTools) GetStatisOfficialProductSumDecline(now time.Time, downWardVa
HistoryTwoDiff: v.HistoryTwoDiff,
}
}
}
var total = make([]ProductSumDecline, 0, len(productSumMap))
for k, _ := range productSumMap {
total = append(total, productSumMap[k])
}
sort.Slice(total, func(i, j int) bool {
return total[i].Index > total[j].Index
})
timeCh := now.Format("1月2日15点")
//title := "截至" + timeCh + "销量下滑大于" + fmt.Sprintf("%d", downWardValue) + "明细,分销商仅展示差额大于" + fmt.Sprintf("%d", -sumFilter)
title := "截至" + timeCh + "销量下滑较大商品"
@ -386,7 +393,7 @@ func (b *BbxtTools) GetStatisOfficialProductSumDecline(now time.Time, downWardVa
return
}
filePath := b.cacheDir + "/xlxhmx" + fmt.Sprintf("%d%d", time.Now().Unix(), rand.Intn(1000)) + ".xlsx"
err = b.OfficialProductSumDeclineExcel(b.excelTempDir+"/"+"/xlxhmx.xlsx", filePath, productSumMap, title)
err = b.OfficialProductSumDeclineExcel(b.excelTempDir+"/"+"/xlxhmx.xlsx", filePath, total, title)
return &ReportRes{
ReportName: "销售下滑明细",
Title: title,

View File

@ -60,7 +60,7 @@ func Test_GetStatisOfficialProductSumDecline(t *testing.T) {
if err != nil {
panic(err)
}
s := "官方--美团外卖红包5元,官方--美团外卖红包10元,官方--饿了么超级会员月卡,官方--网易云黑胶vip月卡,官方--喜马拉雅巅峰会员月卡,官方--芒果-PC季卡,官方--芒果-PC月卡,官方--芒果-PC周卡,官方--腾讯-周卡,官方--优酷周卡,官方--QQ音乐-绿钻月卡,官方--爱奇艺-周卡,官方--腾讯-月卡,官方--腾讯-季卡,官方--腾讯-年卡,官方--优酷月卡,官方--优酷季卡,官方--优酷年卡,官方--爱奇艺-月卡,官方--爱奇艺-季卡,官方--爱奇艺-年卡"
s := "官方--腾讯-周卡,官方--腾讯-月卡,官方--腾讯-季卡,官方--腾讯-年卡,官方--优酷周卡,官方--优酷月卡,官方--优酷季卡,官方--优酷年卡,官方--爱奇艺-周卡,官方--爱奇艺-月卡,官方--爱奇艺-季卡,官方--爱奇艺-年卡,官方--芒果-PC周卡,官方--芒果-PC月卡,官方--芒果-PC季卡,官方--美团外卖红包5元,官方--美团外卖红包10元,官方--QQ音乐-绿钻月卡,官方--饿了么超级会员月卡,官方--网易云黑胶vip月卡,官方--喜马拉雅巅峰会员月卡"
//s := "官方--QQ音乐-绿钻月卡"
report, err := o.GetStatisOfficialProductSumDecline(time.Now(), 1000, strings.Split(s, ","), -150)

View File

@ -26,6 +26,7 @@ type ProductSumDecline struct {
OfficialProductId int32
OfficialProductName string
ProductSumReseller map[int32]ProductSumReseller
Index int
}
type ProductSumReseller struct {

View File

@ -30,6 +30,9 @@ func NewUploader(oss *utils_oss.Client) *Uploader {
}
func (u *Uploader) Run(report *ReportRes) (err error) {
if report == nil {
return
}
if len(report.Path) == 0 {
return
}
@ -173,7 +176,7 @@ func (u *Uploader) uploadToOSS(fileName string, fileBytes []byte) string {
// return url
//}
func (b *BbxtTools) OfficialProductSumDeclineExcel(templatePath, outputPath string, sumMap map[int32]ProductSumDecline, title string) error {
func (b *BbxtTools) OfficialProductSumDeclineExcel(templatePath, outputPath string, sumSlice []ProductSumDecline, title string) error {
// 1. 读取模板
f, err := excelize.OpenFile(templatePath)
if err != nil {
@ -227,7 +230,7 @@ func (b *BbxtTools) OfficialProductSumDeclineExcel(templatePath, outputPath stri
currentRow := 3
pattern := `\$\{(.*?)\}`
re := regexp.MustCompile(pattern)
for _, product := range sumMap {
for _, product := range sumSlice {
// 排序 ProductLoss
var reseller []ProductSumReseller
for _, p := range product.ProductSumReseller {

View File

@ -53,7 +53,7 @@ func NewDataTemp(db *utils.Db, model interface{}) *DataTemp {
return &DataTemp{Db: db.Client, Model: model}
}
func (k DataTemp) GetById(id int) (data map[string]interface{}, err error) {
func (k DataTemp) GetById(id int32) (data map[string]interface{}, err error) {
err = k.Db.Model(k.Model).Where("id = ?", id).Find(&data).Error
if data == nil {
err = sql.ErrNoRows