ai_scheduler/internal/biz/group_config.go

638 lines
19 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package biz
import (
"ai_scheduler/internal/biz/tools_regis"
"ai_scheduler/internal/config"
"ai_scheduler/internal/data/constants"
"ai_scheduler/internal/data/impl"
"ai_scheduler/internal/data/model"
"ai_scheduler/internal/domain/tools/common/knowledge_base"
"ai_scheduler/internal/domain/workflow/recharge"
"ai_scheduler/internal/domain/workflow/runtime"
"ai_scheduler/internal/entitys"
"ai_scheduler/internal/pkg"
"ai_scheduler/internal/pkg/dingtalk"
"ai_scheduler/internal/pkg/l_request"
"ai_scheduler/internal/pkg/lsxd"
"ai_scheduler/internal/pkg/utils_oss"
"ai_scheduler/internal/tools"
"ai_scheduler/internal/tools/bbxt"
"ai_scheduler/utils"
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
"gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/chatbot"
"github.com/alibabacloud-go/dingtalk/card_1_0"
"github.com/alibabacloud-go/tea/tea"
"github.com/coze-dev/coze-go"
"github.com/gofiber/fiber/v2/log"
"xorm.io/builder"
)
// AiRouterBiz 智能路由服务
type GroupConfigBiz struct {
botGroupConfigImpl *impl.BotGroupConfigImpl
reportDailyCacheImpl *impl.ReportDailyCacheImpl
botConfigImpl *impl.BotConfigImpl
ossClient *utils_oss.Client
workflowManager *runtime.Registry
botTools []model.AiBotTool
toolManager *tools.Manager
conf *config.Config
rdb *utils.Rdb
dingtalkCardClient *dingtalk.CardClient
}
// NewDingTalkBotBiz
func NewGroupConfigBiz(
tools *tools_regis.ToolRegis,
ossClient *utils_oss.Client,
botGroupConfigImpl *impl.BotGroupConfigImpl,
botConfigImpl *impl.BotConfigImpl,
workflowManager *runtime.Registry,
conf *config.Config,
reportDailyCacheImpl *impl.ReportDailyCacheImpl,
rdb *utils.Rdb,
toolManager *tools.Manager,
dingtalkCardClient *dingtalk.CardClient,
) *GroupConfigBiz {
return &GroupConfigBiz{
botTools: tools.BootTools,
ossClient: ossClient,
botGroupConfigImpl: botGroupConfigImpl,
botConfigImpl: botConfigImpl,
workflowManager: workflowManager,
conf: conf,
reportDailyCacheImpl: reportDailyCacheImpl,
rdb: rdb,
toolManager: toolManager,
dingtalkCardClient: dingtalkCardClient,
}
}
func (g *GroupConfigBiz) GetGroupConfig(ctx context.Context, configId int32) (*model.AiBotGroupConfig, error) {
var groupConfig model.AiBotGroupConfig
cond := builder.NewCond()
cond = cond.And(builder.Eq{"config_id": configId})
err := g.botGroupConfigImpl.GetOneBySearchToStrut(&cond, &groupConfig)
return &groupConfig, err
}
func (g *GroupConfigBiz) GetReportLists(ctx context.Context, groupConfig *model.AiBotGroupConfig) (reports []*bbxt.ReportRes, err error) {
if groupConfig == nil {
return
}
var product []string
if groupConfig.ProductName != "" {
product = strings.Split(groupConfig.ProductName, ",")
}
reportList, err := bbxt.NewBbxtTools(g.conf, lsxd.NewLogin(g.conf, g.rdb))
if err != nil {
return
}
reports, err = reportList.DailyReport(ctx, time.Now(), bbxt.DownWardValue, product, bbxt.SumFilter, g.ossClient, g.GetReportCache)
if err != nil {
return
}
//追加电商充值系统统计 - 返回统一使用[]*bbxt.ReportRes
rechargeReports, err := g.rechargeDailyReport(ctx, time.Now(), nil, g.ossClient)
if err != nil || len(rechargeReports) == 0 {
return
}
reports = append(reports, rechargeReports...)
return
}
// rechargeDailyReport 获取电商充值系统统计报告
func (g *GroupConfigBiz) rechargeDailyReport(ctx context.Context, now time.Time, productNames []string, ossClient *utils_oss.Client) (reports []*bbxt.ReportRes, err error) {
defer func() {
if err := recover(); err != nil {
log.Error(err)
}
}()
workflowId := recharge.WorkflowIDStatisticsOursProduct
args := &runtime.WorkflowArgs{
Args: map[string]any{
"product_names": productNames,
"now": now,
},
}
res, err := g.workflowManager.Invoke(ctx, workflowId, args)
if err != nil {
return
}
log.Infof("imgUrl: %s", res["url"].(string))
reports = []*bbxt.ReportRes{
{
ReportName: "我们的商品统计(电商充值系统)",
Title: res["title"].(string),
Path: res["path"].(string),
Url: res["url"].(string),
Data: res["data"].([][]string),
},
}
return
}
func (g *GroupConfigBiz) handleReport(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool, groupConfig *model.AiBotGroupConfig) error {
var configData entitys.ConfigDataReport
err := json.Unmarshal([]byte(rec.Match.Parameters), &configData)
if err != nil {
return err
}
t, err := time.Parse(time.DateTime, configData.Time)
if err != nil {
t, err = time.Parse("2006-01-02 15:04", configData.Time)
if err != nil {
t, err = time.Parse("2006-01-02", configData.Time)
if err != nil {
log.Infof("时间识别失败:%s", configData.Time)
entitys.ResText(rec.Ch, "", "时间识别失败了可以给我一份比较具体的时间吗例如“2025-12-31 12:00,抱歉抱歉😀")
}
}
}
rep, err := bbxt.NewBbxtTools(g.conf, lsxd.NewLogin(g.conf, g.rdb))
uploader := bbxt.NewUploader(g.ossClient, g.conf)
if err != nil {
return err
}
var reports []*bbxt.ReportRes
switch rec.Match.Index {
case "report_loss_analysis":
repo, _err := rep.StatisOursProductLossSum(ctx, t, g.GetReportCache)
if _err != nil {
return _err
}
reports = append(reports, repo...)
case "report_sales_analysis":
product := strings.Split(groupConfig.ProductName, ",")
repo, _err := rep.GetStatisOfficialProductSum(t, product)
if _err != nil {
return _err
}
reports = append(reports, repo)
case "report_ranking_of_distributors":
repo, _err := rep.GetProfitRankingSum(t)
if _err != nil {
return _err
}
reports = append(reports, repo)
case "report_daily":
product := strings.Split(groupConfig.ProductName, ",")
repo, _err := rep.DailyReport(ctx, t, bbxt.DownWardValue, product, bbxt.SumFilter, nil, g.GetReportCache)
if _err != nil {
return _err
}
rechargeReport, _err := g.rechargeDailyReport(ctx, t, product, nil)
if _err != nil {
return _err
}
reports = append(reports, repo...)
reports = append(reports, rechargeReport...)
case "report_daily_recharge":
product := strings.Split(groupConfig.ProductName, ",")
repo, _err := g.rechargeDailyReport(ctx, t, product, nil)
if _err != nil || len(repo) == 0 {
return _err
}
reports = append(reports, repo...)
case "report_sale_down_analysis":
product := strings.Split(groupConfig.ProductName, ",")
repo, _err := rep.GetStatisOfficialProductSumDecline(t, bbxt.DownWardValue, product, bbxt.SumFilter)
if _err != nil {
return _err
}
reports = append(reports, repo)
default:
return fmt.Errorf("未找到的报表:%s", rec.Match.Index)
}
for _, report := range reports {
if report == nil {
continue
}
err = uploader.Run(report)
if err != nil {
log.Error(err)
continue
}
entitys.ResText(rec.Ch, "", fmt.Sprintf("%s![图片](%s)", report.Title, report.Url))
}
return nil
}
func (g *GroupConfigBiz) handleMatch(ctx context.Context, rec *entitys.Recognize, groupConfig *model.AiBotGroupConfig, callback *chatbot.BotCallbackDataModel) (err error) {
if !rec.Match.IsMatch {
if len(rec.Match.Chat) != 0 {
entitys.ResText(rec.Ch, "", rec.Match.Chat)
} else {
entitys.ResText(rec.Ch, "", rec.Match.Reasoning)
}
return
}
var pointTask *model.AiBotTool
for _, task := range g.botTools {
if task.Index == rec.Match.Index {
pointTask = &task
break
}
}
if pointTask == nil || pointTask.Index == "other" {
return g.otherTask(ctx, rec)
}
switch constants.TaskType(pointTask.Type) {
case constants.TaskTypeFunc:
return g.handleTask(ctx, rec, pointTask)
case constants.TaskTypeReport:
return g.handleReport(ctx, rec, pointTask, groupConfig)
case constants.TaskTypeCozeWorkflow:
return g.handleCozeWorkflow(ctx, rec, pointTask)
case constants.TaskTypeKnowle: // 知识库V2版本
return g.handleKnowledgeV2(ctx, rec, groupConfig, callback)
default:
return g.otherTask(ctx, rec)
}
return
}
func (g *GroupConfigBiz) getGroupTools(ctx context.Context, groupConfig *model.AiBotGroupConfig) (tools []model.AiBotTool, err error) {
if len(g.botTools) == 0 {
return
}
var (
groupRegisTools = make(map[int]struct{})
)
if groupConfig.ToolList != "" {
groupToolList := strings.Split(groupConfig.ToolList, ",")
for _, tool := range groupToolList {
if tool == "" {
continue
}
num, _err := strconv.Atoi(tool)
if _err != nil {
continue
}
groupRegisTools[num] = struct{}{}
}
}
for _, v := range g.botTools {
if v.PermissionType == constants.PermissionTypeNone {
tools = append(tools, v)
continue
}
if _, ex := groupRegisTools[int(v.ToolID)]; ex {
tools = append(tools, v)
}
}
return
}
func (q *GroupConfigBiz) handleTask(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool) (err error) {
var configData entitys.ConfigDataTool
err = json.Unmarshal([]byte(task.Config), &configData)
if err != nil {
return
}
err = q.toolManager.ExecuteTool(ctx, configData.Tool, rec)
if err != nil {
return
}
return
}
func (g *GroupConfigBiz) handleCozeWorkflow(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool) (err error) {
entitys.ResLoading(rec.Ch, task.Index, "正在执行工作流(coze)\n")
customClient := &http.Client{
Timeout: time.Minute * 30,
}
authCli := coze.NewTokenAuth(g.conf.Coze.ApiSecret)
cozeCli := coze.NewCozeAPI(
authCli,
coze.WithBaseURL(g.conf.Coze.BaseURL),
coze.WithHttpClient(customClient),
)
// 从参数中获取workflowID
type requestParams struct {
Request l_request.Request `json:"request"`
}
var config requestParams
err = json.Unmarshal([]byte(task.Config), &config)
if err != nil {
return err
}
workflowId, ok := config.Request.Json["workflow_id"].(string)
if !ok {
return fmt.Errorf("workflow_id不能为空")
}
// 提取参数
var data map[string]interface{}
err = json.Unmarshal([]byte(rec.Match.Parameters), &data)
req := &coze.RunWorkflowsReq{
WorkflowID: workflowId,
Parameters: data,
// IsAsync: true,
}
stream := config.Request.Json["stream"].(bool)
entitys.ResLog(rec.Ch, task.Index, "工作流执行中...")
if stream {
streamResp, err := cozeCli.Workflows.Runs.Stream(ctx, req)
if err != nil {
return err
}
g.handleCozeWorkflowEvents(ctx, streamResp, cozeCli, workflowId, rec.Ch, task.Index)
} else {
resp, err := cozeCli.Workflows.Runs.Create(ctx, req)
if err != nil {
return err
}
entitys.ResJson(rec.Ch, task.Index, resp.Data)
}
return
}
// handleCozeWorkflowEvents 处理 coze 工作流事件
func (g *GroupConfigBiz) handleCozeWorkflowEvents(ctx context.Context, resp coze.Stream[coze.WorkflowEvent], cozeCli coze.CozeAPI, workflowID string, ch chan entitys.Response, index string) {
defer resp.Close()
for {
event, err := resp.Recv()
if errors.Is(err, io.EOF) {
fmt.Println("Stream finished")
break
}
if err != nil {
fmt.Println("Error receiving event:", err)
break
}
switch event.Event {
case coze.WorkflowEventTypeMessage:
entitys.ResStream(ch, index, event.Message.Content)
case coze.WorkflowEventTypeError:
entitys.ResError(ch, index, fmt.Sprintf("工作流执行错误: %v", event.Error))
case coze.WorkflowEventTypeDone:
entitys.ResEnd(ch, index, "工作流执行完成")
case coze.WorkflowEventTypeInterrupt:
resumeReq := &coze.ResumeRunWorkflowsReq{
WorkflowID: workflowID,
EventID: event.Interrupt.InterruptData.EventID,
ResumeData: "your data",
InterruptType: event.Interrupt.InterruptData.Type,
}
newResp, err := cozeCli.Workflows.Runs.Resume(ctx, resumeReq)
if err != nil {
entitys.ResError(ch, index, fmt.Sprintf("工作流恢复执行错误: %s", err.Error()))
return
}
entitys.ResLog(ch, index, "工作流恢复执行中...")
g.handleCozeWorkflowEvents(ctx, newResp, cozeCli, workflowID, ch, index)
}
}
fmt.Printf("done, log:%s\n", resp.Response().LogID())
}
func (g *GroupConfigBiz) otherTask(ctx context.Context, rec *entitys.Recognize) (err error) {
entitys.ResText(rec.Ch, "", rec.Match.Reasoning)
return
}
func (g *GroupConfigBiz) GetReportCache(ctx context.Context, day time.Time, totalDetail []*bbxt.ResellerLoss, bbxtObj *bbxt.BbxtTools) error {
var ResellerProductRelation map[int32]*bbxt.ResellerLossSumProductRelation
dayDate := day.Format(time.DateOnly)
cond := builder.NewCond()
cond = cond.And(builder.Eq{"cache_index": bbxt.IndexLossSumDetail})
cond = cond.And(builder.Eq{"cache_key": dayDate})
var cache model.AiReportDailyCache
err := g.reportDailyCacheImpl.GetOneBySearchToStrut(&cond, &cache)
if err != nil {
return err
}
if cache.ID == 0 {
ResellerProductRelation, err = bbxtObj.GetResellerLossMannagerAndLossReasonFromApi(ctx, totalDetail)
if err != nil {
return err
}
cache = model.AiReportDailyCache{
CacheKey: dayDate,
CacheIndex: bbxt.IndexLossSumDetail,
Value: pkg.JsonStringIgonErr(ResellerProductRelation),
}
_, err = g.reportDailyCacheImpl.Add(&cache)
} else {
err = json.Unmarshal([]byte(cache.Value), &ResellerProductRelation)
}
if err != nil {
return err
}
for _, v := range totalDetail {
if _, ex := ResellerProductRelation[v.ResellerId]; !ex {
continue
}
v.Manager = ResellerProductRelation[v.ResellerId].AfterSaleName
for _, vv := range v.ProductLoss {
if _, ex := ResellerProductRelation[v.ResellerId].Products[vv.ProductId]; !ex {
continue
}
vv.LossReason = ResellerProductRelation[v.ResellerId].Products[vv.ProductId].LossReason
}
}
return nil
}
// handleKnowledgeV2 处理知识库V2版本
func (g *GroupConfigBiz) handleKnowledgeV2(ctx context.Context, rec *entitys.Recognize, groupConfig *model.AiBotGroupConfig, callback *chatbot.BotCallbackDataModel) (err error) {
// 请求知识库工具
knowledgeBase := knowledge_base.New(g.conf.KnowledgeConfig)
knowledgeResp, err := knowledgeBase.Call(&knowledge_base.ChatRequest{
TenantID: constants.KnowledgeTenantIdDefault, // 后续动态接参
Query: rec.UserContent.Text,
Mode: constants.KnowledgeModeMix,
Stream: true,
Think: false,
})
if err != nil {
return fmt.Errorf("请求知识库工具失败err: %v", err)
}
// 读取知识库SSE数据
isRetrieved, err := g.readKnowledgeSSE(knowledgeResp, rec.Ch, true)
if err != nil {
return
}
// 未检索到匹配信息,询问是否拉群
if !isRetrieved {
g.shouldCreateIssueHandlingGroup(ctx, rec, groupConfig, callback)
return nil
}
return
}
// 读取知识库 SSE 数据
func (g *GroupConfigBiz) readKnowledgeSSE(resp io.ReadCloser, channel chan entitys.Response, useParagraphMode bool) (isRetrieved bool, err error) {
scanner := bufio.NewScanner(resp)
var buffer strings.Builder
for scanner.Scan() {
line := scanner.Text()
delta, done, err := knowledge_base.ParseOpenAIStreamData(line)
if err != nil {
return false, fmt.Errorf("解析SSE数据失败: %w", err)
}
if done {
break
}
if delta == nil {
continue
}
// 知识库未命中 输出提示后中断
if delta.XRagStatus == constants.KnowledgeRagStatusMiss {
var missContent string = "知识库未检测到匹配信息,即将为您创建群聊解决问题。"
entitys.ResStream(channel, "", missContent)
return false, nil
}
// 推理内容
if delta.ReasoningContent != "" {
entitys.ResStream(channel, "", delta.ReasoningContent)
continue
}
// 输出内容 - 段落
if delta.Content != "" && useParagraphMode {
// 存入缓冲区
buffer.WriteString(delta.Content)
content := buffer.String()
// 检查是否有换行符,按段落输出
if idx := strings.LastIndex(content, "\n"); idx != -1 {
// 发送直到最后一个换行符的内容
toSend := content[:idx+1]
entitys.ResStream(channel, "", toSend)
// 重置缓冲区,保留剩余部分
remaining := content[idx+1:]
buffer.Reset()
buffer.WriteString(remaining)
}
}
// 输出内容 - 逐字
if delta.Content != "" && !useParagraphMode {
entitys.ResStream(channel, "", delta.Content)
}
}
if err := scanner.Err(); err != nil {
return true, fmt.Errorf("读取SSE流中断: %w", err)
}
// 发送缓冲区剩余内容(仅在段落模式下需要)
if useParagraphMode && buffer.Len() > 0 {
entitys.ResStream(channel, "", buffer.String())
}
return true, nil
}
// 询问是否创建群聊处理问题
func (g *GroupConfigBiz) shouldCreateIssueHandlingGroup(ctx context.Context, rec *entitys.Recognize, groupConfig *model.AiBotGroupConfig, callback *chatbot.BotCallbackDataModel) error {
// 获取群问题处理人
type issueOwnerType struct {
UserId string `json:"userid"`
Name string `json:"name"`
}
var issueOwner []issueOwnerType
if err := json.Unmarshal([]byte(groupConfig.IssueOwner), &issueOwner); err != nil {
return fmt.Errorf("解析群问题处理人失败err: %v", err)
}
// 合并所有name
var userNames []string
for _, owner := range issueOwner {
userNames = append(userNames, "@"+owner.Name)
}
issueOwnerStr := strings.Join(userNames, "、")
botConfig, err := g.botConfigImpl.GetRobotConfig(callback.RobotCode)
if err != nil {
return fmt.Errorf("获取机器人配置失败err: %v", err)
}
// 构建卡片 OutTrackId
outTrackId := constants.BuildCardOutTrackId(callback.ConversationId, botConfig.ClientId)
// 发送钉钉卡片
_, err = g.dingtalkCardClient.CreateAndDeliver(dingtalk.AppKey{
AppKey: botConfig.ClientId,
AppSecret: botConfig.ClientSecret,
}, &card_1_0.CreateAndDeliverRequest{
CardTemplateId: tea.String(constants.DingtalkCardTplCreateGroupApprove),
OutTrackId: tea.String(outTrackId),
CallbackType: tea.String("STREAM"),
CardData: &card_1_0.CreateAndDeliverRequestCardData{
CardParamMap: map[string]*string{
"title": tea.String("创建群聊提醒"),
"content": tea.String(fmt.Sprintf("**确认创建群聊?**\n\n将邀请以下成员加入群聊\n\n%s", issueOwnerStr)),
"remark": tea.String("注:如若无需,忽略即可"),
"button_left": tea.String("创建群聊"),
"button_right": tea.String("忽略"),
"action_id": tea.String("create_group"),
"button_display": tea.String("true"),
"group_scope": tea.String(strings.TrimSpace(rec.UserContent.Text)),
// "_CARD_DEBUG_TOOL_ENTRY": tea.String("show"), // debug字段
},
},
ImGroupOpenSpaceModel: &card_1_0.CreateAndDeliverRequestImGroupOpenSpaceModel{
SupportForward: tea.Bool(false),
},
OpenSpaceId: tea.String("dtv1.card//im_group." + callback.ConversationId),
ImGroupOpenDeliverModel: &card_1_0.CreateAndDeliverRequestImGroupOpenDeliverModel{
RobotCode: tea.String(callback.RobotCode),
Recipients: []*string{
tea.String(callback.SenderStaffId),
},
},
})
if err != nil {
return fmt.Errorf("发送钉钉卡片失败err: %v", err)
}
return nil
}