400 lines
11 KiB
Go
400 lines
11 KiB
Go
package biz
|
||
|
||
import (
|
||
"ai_scheduler/internal/biz/tools_regis"
|
||
"ai_scheduler/internal/config"
|
||
"ai_scheduler/internal/data/constants"
|
||
"ai_scheduler/internal/data/impl"
|
||
"ai_scheduler/internal/data/model"
|
||
"ai_scheduler/internal/domain/workflow/recharge"
|
||
"ai_scheduler/internal/domain/workflow/runtime"
|
||
"ai_scheduler/internal/entitys"
|
||
"ai_scheduler/internal/pkg/l_request"
|
||
"ai_scheduler/internal/pkg/utils_oss"
|
||
"ai_scheduler/internal/tools"
|
||
"ai_scheduler/internal/tools/bbxt"
|
||
"context"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
"net/http"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/coze-dev/coze-go"
|
||
"github.com/gofiber/fiber/v2/log"
|
||
"xorm.io/builder"
|
||
)
|
||
|
||
// AiRouterBiz 智能路由服务
|
||
type GroupConfigBiz struct {
|
||
botGroupConfigImpl *impl.BotGroupConfigImpl
|
||
ossClient *utils_oss.Client
|
||
workflowManager *runtime.Registry
|
||
botTools []model.AiBotTool
|
||
toolManager *tools.Manager
|
||
conf *config.Config
|
||
}
|
||
|
||
// NewDingTalkBotBiz
|
||
func NewGroupConfigBiz(
|
||
tools *tools_regis.ToolRegis,
|
||
ossClient *utils_oss.Client,
|
||
botGroupConfigImpl *impl.BotGroupConfigImpl,
|
||
workflowManager *runtime.Registry,
|
||
conf *config.Config,
|
||
) *GroupConfigBiz {
|
||
return &GroupConfigBiz{
|
||
botTools: tools.BootTools,
|
||
ossClient: ossClient,
|
||
botGroupConfigImpl: botGroupConfigImpl,
|
||
workflowManager: workflowManager,
|
||
conf: conf,
|
||
}
|
||
}
|
||
|
||
func (g *GroupConfigBiz) GetGroupConfig(ctx context.Context, configId int32) (*model.AiBotGroupConfig, error) {
|
||
var groupConfig model.AiBotGroupConfig
|
||
cond := builder.NewCond()
|
||
cond = cond.And(builder.Eq{"config_id": configId})
|
||
err := g.botGroupConfigImpl.GetOneBySearchToStrut(&cond, &groupConfig)
|
||
return &groupConfig, err
|
||
}
|
||
|
||
func (g *GroupConfigBiz) GetReportLists(ctx context.Context, groupConfig *model.AiBotGroupConfig) (reports []*bbxt.ReportRes, err error) {
|
||
if groupConfig == nil {
|
||
return
|
||
}
|
||
var product []string
|
||
if groupConfig.ProductName != "" {
|
||
product = strings.Split(groupConfig.ProductName, ",")
|
||
}
|
||
|
||
reportList, err := bbxt.NewBbxtTools()
|
||
if err != nil {
|
||
return
|
||
}
|
||
|
||
reports, err = reportList.DailyReport(time.Now(), bbxt.DownWardValue, product, bbxt.SumFilter, g.ossClient)
|
||
if err != nil {
|
||
return
|
||
}
|
||
//product = []string{"优酷周卡", "优酷季卡", "优酷年卡", "爱奇艺黄金会员天卡"}
|
||
//追加电商充值系统统计 - 返回统一使用[]*bbxt.ReportRes
|
||
rechargeReports, err := g.rechargeDailyReport(ctx, time.Now(), nil, g.ossClient)
|
||
if err != nil || len(rechargeReports) == 0 {
|
||
return
|
||
}
|
||
|
||
reports = append(reports, rechargeReports...)
|
||
|
||
return
|
||
}
|
||
|
||
// rechargeDailyReport 获取电商充值系统统计报告
|
||
func (g *GroupConfigBiz) rechargeDailyReport(ctx context.Context, now time.Time, productNames []string, ossClient *utils_oss.Client) (reports []*bbxt.ReportRes, err error) {
|
||
defer func() {
|
||
if err := recover(); err != nil {
|
||
log.Error(err)
|
||
}
|
||
}()
|
||
|
||
workflowId := recharge.WorkflowIDStatisticsOursProduct
|
||
args := &runtime.WorkflowArgs{
|
||
Args: map[string]any{
|
||
"product_names": productNames,
|
||
"now": now,
|
||
},
|
||
}
|
||
res, err := g.workflowManager.Invoke(ctx, workflowId, args)
|
||
if err != nil {
|
||
return
|
||
}
|
||
|
||
log.Infof("imgUrl: %s", res["url"].(string))
|
||
|
||
reports = []*bbxt.ReportRes{
|
||
{
|
||
ReportName: "我们的商品统计(电商充值系统)",
|
||
Title: fmt.Sprintf("%s 电商充值系统我们的商品统计", now.Format("2006-01-02")),
|
||
Path: res["path"].(string),
|
||
Url: res["url"].(string),
|
||
Data: res["data"].([][]string),
|
||
Desc: res["desc"].(string),
|
||
},
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
func (g *GroupConfigBiz) handleReport(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool, groupConfig *model.AiBotGroupConfig) error {
|
||
var configData entitys.ConfigDataReport
|
||
err := json.Unmarshal([]byte(rec.Match.Parameters), &configData)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
t, err := time.Parse(time.DateTime, configData.Time)
|
||
if err != nil {
|
||
t, err = time.Parse("2006-01-02 15:04", configData.Time)
|
||
if err != nil {
|
||
t, err = time.Parse("2006-01-02", configData.Time)
|
||
if err != nil {
|
||
log.Infof("时间识别失败:%s", configData.Time)
|
||
entitys.ResText(rec.Ch, "", "时间识别失败了!可以给我一份比较具体的时间吗,例如“2025-12-31 12:00,抱歉抱歉😀")
|
||
}
|
||
}
|
||
}
|
||
rep, err := bbxt.NewBbxtTools()
|
||
uploader := bbxt.NewUploader(g.ossClient)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
var reports []*bbxt.ReportRes
|
||
switch rec.Match.Index {
|
||
case "report_loss_analysis":
|
||
repo, _err := rep.StatisOursProductLossSum(t)
|
||
if _err != nil {
|
||
return _err
|
||
}
|
||
reports = append(reports, repo...)
|
||
case "report_sales_analysis":
|
||
product := strings.Split(groupConfig.ProductName, ",")
|
||
repo, _err := rep.GetStatisOfficialProductSum(t, product)
|
||
if _err != nil {
|
||
return _err
|
||
}
|
||
reports = append(reports, repo)
|
||
|
||
case "report_ranking_of_distributors":
|
||
repo, _err := rep.GetProfitRankingSum(t)
|
||
if _err != nil {
|
||
return _err
|
||
}
|
||
reports = append(reports, repo)
|
||
case "report_daily":
|
||
product := strings.Split(groupConfig.ProductName, ",")
|
||
repo, _err := rep.DailyReport(t, bbxt.DownWardValue, product, bbxt.SumFilter, nil)
|
||
if _err != nil {
|
||
return _err
|
||
}
|
||
reports = append(reports, repo...)
|
||
case "report_daily_recharge":
|
||
product := strings.Split(groupConfig.ProductName, ",")
|
||
repo, _err := g.rechargeDailyReport(ctx, t, product, nil)
|
||
if _err != nil || len(repo) == 0 {
|
||
return _err
|
||
}
|
||
reports = append(reports, repo...)
|
||
case "report_sale_down_analysis":
|
||
product := strings.Split(groupConfig.ProductName, ",")
|
||
repo, _err := rep.GetStatisOfficialProductSumDecline(t, bbxt.DownWardValue, product, bbxt.SumFilter)
|
||
if _err != nil {
|
||
return _err
|
||
}
|
||
reports = append(reports, repo)
|
||
default:
|
||
return fmt.Errorf("未找到的报表:%s", rec.Match.Index)
|
||
}
|
||
|
||
for _, report := range reports {
|
||
err = uploader.Run(report)
|
||
if err != nil {
|
||
log.Error(err)
|
||
continue
|
||
}
|
||
|
||
entitys.ResText(rec.Ch, "", fmt.Sprintf("%s", report.Title, report.Url))
|
||
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (g *GroupConfigBiz) handleMatch(ctx context.Context, rec *entitys.Recognize, groupConfig *model.AiBotGroupConfig) (err error) {
|
||
|
||
if !rec.Match.IsMatch {
|
||
if len(rec.Match.Chat) != 0 {
|
||
entitys.ResText(rec.Ch, "", rec.Match.Chat)
|
||
} else {
|
||
entitys.ResText(rec.Ch, "", rec.Match.Reasoning)
|
||
}
|
||
return
|
||
}
|
||
var pointTask *model.AiBotTool
|
||
for _, task := range g.botTools {
|
||
if task.Index == rec.Match.Index {
|
||
pointTask = &task
|
||
|
||
break
|
||
}
|
||
}
|
||
if pointTask == nil || pointTask.Index == "other" {
|
||
return g.otherTask(ctx, rec)
|
||
}
|
||
switch constants.TaskType(pointTask.Type) {
|
||
case constants.TaskTypeFunc:
|
||
return g.handleTask(ctx, rec, pointTask)
|
||
case constants.TaskTypeReport:
|
||
return g.handleReport(ctx, rec, pointTask, groupConfig)
|
||
case constants.TaskTypeCozeWorkflow:
|
||
return g.handleCozeWorkflow(ctx, rec, pointTask)
|
||
default:
|
||
return g.otherTask(ctx, rec)
|
||
}
|
||
return
|
||
}
|
||
|
||
func (g *GroupConfigBiz) getGroupTools(ctx context.Context, groupConfig *model.AiBotGroupConfig) (tools []model.AiBotTool, err error) {
|
||
if len(g.botTools) == 0 {
|
||
return
|
||
}
|
||
var (
|
||
groupRegisTools = make(map[int]struct{})
|
||
)
|
||
if groupConfig.ToolList != "" {
|
||
groupToolList := strings.Split(groupConfig.ToolList, ",")
|
||
for _, tool := range groupToolList {
|
||
if tool == "" {
|
||
continue
|
||
}
|
||
num, _err := strconv.Atoi(tool)
|
||
if _err != nil {
|
||
continue
|
||
}
|
||
groupRegisTools[num] = struct{}{}
|
||
}
|
||
}
|
||
|
||
for _, v := range g.botTools {
|
||
if v.PermissionType == constants.PermissionTypeNone {
|
||
tools = append(tools, v)
|
||
continue
|
||
}
|
||
if _, ex := groupRegisTools[int(v.ToolID)]; ex {
|
||
tools = append(tools, v)
|
||
}
|
||
}
|
||
return
|
||
}
|
||
|
||
func (q *GroupConfigBiz) handleTask(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool) (err error) {
|
||
var configData entitys.ConfigDataTool
|
||
err = json.Unmarshal([]byte(task.Config), &configData)
|
||
if err != nil {
|
||
return
|
||
}
|
||
|
||
err = q.toolManager.ExecuteTool(ctx, configData.Tool, rec)
|
||
if err != nil {
|
||
return
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
func (g *GroupConfigBiz) handleCozeWorkflow(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool) (err error) {
|
||
entitys.ResLoading(rec.Ch, task.Index, "正在执行工作流(coze)\n")
|
||
|
||
customClient := &http.Client{
|
||
Timeout: time.Minute * 30,
|
||
}
|
||
|
||
authCli := coze.NewTokenAuth(g.conf.Coze.ApiSecret)
|
||
cozeCli := coze.NewCozeAPI(
|
||
authCli,
|
||
coze.WithBaseURL(g.conf.Coze.BaseURL),
|
||
coze.WithHttpClient(customClient),
|
||
)
|
||
|
||
// 从参数中获取workflowID
|
||
type requestParams struct {
|
||
Request l_request.Request `json:"request"`
|
||
}
|
||
var config requestParams
|
||
err = json.Unmarshal([]byte(task.Config), &config)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
workflowId, ok := config.Request.Json["workflow_id"].(string)
|
||
if !ok {
|
||
return fmt.Errorf("workflow_id不能为空")
|
||
}
|
||
// 提取参数
|
||
var data map[string]interface{}
|
||
err = json.Unmarshal([]byte(rec.Match.Parameters), &data)
|
||
|
||
req := &coze.RunWorkflowsReq{
|
||
WorkflowID: workflowId,
|
||
Parameters: data,
|
||
// IsAsync: true,
|
||
}
|
||
|
||
stream := config.Request.Json["stream"].(bool)
|
||
|
||
entitys.ResLog(rec.Ch, task.Index, "工作流执行中...")
|
||
|
||
if stream {
|
||
streamResp, err := cozeCli.Workflows.Runs.Stream(ctx, req)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
g.handleCozeWorkflowEvents(ctx, streamResp, cozeCli, workflowId, rec.Ch, task.Index)
|
||
} else {
|
||
resp, err := cozeCli.Workflows.Runs.Create(ctx, req)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
entitys.ResJson(rec.Ch, task.Index, resp.Data)
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
// handleCozeWorkflowEvents 处理 coze 工作流事件
|
||
func (g *GroupConfigBiz) handleCozeWorkflowEvents(ctx context.Context, resp coze.Stream[coze.WorkflowEvent], cozeCli coze.CozeAPI, workflowID string, ch chan entitys.Response, index string) {
|
||
defer resp.Close()
|
||
for {
|
||
event, err := resp.Recv()
|
||
if errors.Is(err, io.EOF) {
|
||
fmt.Println("Stream finished")
|
||
break
|
||
}
|
||
if err != nil {
|
||
fmt.Println("Error receiving event:", err)
|
||
break
|
||
}
|
||
|
||
switch event.Event {
|
||
case coze.WorkflowEventTypeMessage:
|
||
entitys.ResStream(ch, index, event.Message.Content)
|
||
case coze.WorkflowEventTypeError:
|
||
entitys.ResError(ch, index, fmt.Sprintf("工作流执行错误: %v", event.Error))
|
||
case coze.WorkflowEventTypeDone:
|
||
entitys.ResEnd(ch, index, "工作流执行完成")
|
||
case coze.WorkflowEventTypeInterrupt:
|
||
resumeReq := &coze.ResumeRunWorkflowsReq{
|
||
WorkflowID: workflowID,
|
||
EventID: event.Interrupt.InterruptData.EventID,
|
||
ResumeData: "your data",
|
||
InterruptType: event.Interrupt.InterruptData.Type,
|
||
}
|
||
newResp, err := cozeCli.Workflows.Runs.Resume(ctx, resumeReq)
|
||
if err != nil {
|
||
entitys.ResError(ch, index, fmt.Sprintf("工作流恢复执行错误: %s", err.Error()))
|
||
return
|
||
}
|
||
entitys.ResLog(ch, index, "工作流恢复执行中...")
|
||
g.handleCozeWorkflowEvents(ctx, newResp, cozeCli, workflowID, ch, index)
|
||
}
|
||
}
|
||
fmt.Printf("done, log:%s\n", resp.Response().LogID())
|
||
}
|
||
|
||
func (g *GroupConfigBiz) otherTask(ctx context.Context, rec *entitys.Recognize) (err error) {
|
||
entitys.ResText(rec.Ch, "", rec.Match.Reasoning)
|
||
return
|
||
}
|