feat: 增加电商充值系统我们的商品统计

This commit is contained in:
fuzhongyun 2025-12-31 12:02:12 +08:00
parent 86713dbb1a
commit e77da7875e
13 changed files with 127 additions and 55 deletions

View File

@ -7,6 +7,8 @@ import (
"ai_scheduler/internal/data/constants" "ai_scheduler/internal/data/constants"
"ai_scheduler/internal/data/impl" "ai_scheduler/internal/data/impl"
"ai_scheduler/internal/data/model" "ai_scheduler/internal/data/model"
"ai_scheduler/internal/domain/workflow/recharge"
"ai_scheduler/internal/domain/workflow/runtime"
"ai_scheduler/internal/entitys" "ai_scheduler/internal/entitys"
"ai_scheduler/internal/pkg/l_request" "ai_scheduler/internal/pkg/l_request"
"ai_scheduler/internal/pkg/utils_oss" "ai_scheduler/internal/pkg/utils_oss"
@ -48,6 +50,7 @@ type DingTalkBotBiz struct {
conf *config.Config conf *config.Config
cardSend *dingtalk.SendCardClient cardSend *dingtalk.SendCardClient
ossClient *utils_oss.Client ossClient *utils_oss.Client
workflowManager *runtime.Registry
} }
// NewDingTalkBotBiz // NewDingTalkBotBiz
@ -63,6 +66,7 @@ func NewDingTalkBotBiz(
conf *config.Config, conf *config.Config,
cardSend *dingtalk.SendCardClient, cardSend *dingtalk.SendCardClient,
ossClient *utils_oss.Client, ossClient *utils_oss.Client,
workflowManager *runtime.Registry,
) *DingTalkBotBiz { ) *DingTalkBotBiz {
return &DingTalkBotBiz{ return &DingTalkBotBiz{
do: do, do: do,
@ -77,6 +81,7 @@ func NewDingTalkBotBiz(
conf: conf, conf: conf,
cardSend: cardSend, cardSend: cardSend,
ossClient: ossClient, ossClient: ossClient,
workflowManager: workflowManager,
} }
} }
@ -552,6 +557,38 @@ func (d *DingTalkBotBiz) GetReportLists(ctx context.Context, group *model.AiBotG
return return
} }
// 追加电商充值系统统计 - 返回统一使用 []*bbxt.ReportRes
rechargeReports, err := d.rechargeDailyReport(ctx, time.Now(), []string{"官方-爱奇艺-星钻季卡", "官方-爱奇艺-星钻半年卡", "官方--腾讯-年卡", "官方--爱奇艺-月卡"}, d.ossClient)
reports = append(reports, rechargeReports...)
return
}
// rechargeDailyReport 获取电商充值系统统计报告
func (d *DingTalkBotBiz) rechargeDailyReport(ctx context.Context, now time.Time, productNames []string, ossClient *utils_oss.Client) (reports []*bbxt.ReportRes, err error) {
workflowId := recharge.WorkflowIDStatisticsOursProduct
args := &runtime.WorkflowArgs{
Args: map[string]any{
"product_names": productNames,
"now": now,
},
}
res, err := d.workflowManager.Invoke(ctx, workflowId, args)
if err != nil {
return
}
reports = []*bbxt.ReportRes{
{
ReportName: "我们的商品统计(电商充值系统)",
Title: fmt.Sprintf("%s 电商充值系统我们的商品统计", now.Format("2006-01-02")),
Path: res["path"].(string),
Url: res["url"].(string),
Data: res["data"].([][]string),
Desc: res["desc"].(string),
},
}
return return
} }

View File

@ -396,7 +396,7 @@ func (r *Handle) handleEinoWorkflow(ctx context.Context, rec *entitys.Recognize,
// 工作流内部输出 // 工作流内部输出
workflowId := task.Index workflowId := task.Index
_, err = r.workflowManager.Invoke(ctx, workflowId, rec) _, err = r.workflowManager.Invoke(ctx, workflowId, &runtime.WorkflowArgs{Recognize: rec})
if err != nil { if err != nil {
return err return err
} }

View File

@ -8,7 +8,6 @@ import (
"ai_scheduler/internal/domain/tools/hyt/goods_category_add" "ai_scheduler/internal/domain/tools/hyt/goods_category_add"
"ai_scheduler/internal/domain/tools/hyt/goods_media_add" "ai_scheduler/internal/domain/tools/hyt/goods_media_add"
"ai_scheduler/internal/domain/workflow/runtime" "ai_scheduler/internal/domain/workflow/runtime"
"ai_scheduler/internal/entitys"
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
@ -42,7 +41,7 @@ type GoodsAddWorkflowInput struct {
func (o *goodsAdd) ID() string { return WorkflowIDGoodsAdd } func (o *goodsAdd) ID() string { return WorkflowIDGoodsAdd }
func (o *goodsAdd) Invoke(ctx context.Context, rec *entitys.Recognize) (map[string]any, error) { func (o *goodsAdd) Invoke(ctx context.Context, rec *runtime.WorkflowArgs) (map[string]any, error) {
// 构建工作流 // 构建工作流
runnable, err := o.buildWorkflow(ctx) runnable, err := o.buildWorkflow(ctx)
if err != nil { if err != nil {

View File

@ -6,7 +6,6 @@ import (
toolManager "ai_scheduler/internal/domain/tools" toolManager "ai_scheduler/internal/domain/tools"
toolPu "ai_scheduler/internal/domain/tools/hyt/product_upload" toolPu "ai_scheduler/internal/domain/tools/hyt/product_upload"
"ai_scheduler/internal/domain/workflow/runtime" "ai_scheduler/internal/domain/workflow/runtime"
"ai_scheduler/internal/entitys"
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
@ -39,7 +38,7 @@ type ProductUploadWorkflowInput struct {
func (o *productUpload) ID() string { return WorkflowIDProductUpload } func (o *productUpload) ID() string { return WorkflowIDProductUpload }
func (o *productUpload) Invoke(ctx context.Context, rec *entitys.Recognize) (map[string]any, error) { func (o *productUpload) Invoke(ctx context.Context, rec *runtime.WorkflowArgs) (map[string]any, error) {
// 构建工作流 // 构建工作流
runnable, err := o.buildWorkflow(ctx) runnable, err := o.buildWorkflow(ctx)
if err != nil { if err != nil {

View File

@ -6,7 +6,6 @@ import (
toolManager "ai_scheduler/internal/domain/tools" toolManager "ai_scheduler/internal/domain/tools"
"ai_scheduler/internal/domain/tools/recharge/statistics_ours_product" "ai_scheduler/internal/domain/tools/recharge/statistics_ours_product"
"ai_scheduler/internal/domain/workflow/runtime" "ai_scheduler/internal/domain/workflow/runtime"
"ai_scheduler/internal/entitys"
"ai_scheduler/internal/pkg/utils_oss" "ai_scheduler/internal/pkg/utils_oss"
"context" "context"
"errors" "errors"
@ -37,28 +36,31 @@ type StatisticsOursProductWorkflowInput struct {
} }
type StatisticsOursProductWorkflowOutput struct { type StatisticsOursProductWorkflowOutput struct {
ImgUrl string `json:"img_url"` Path string `json:"path"`
Url string `json:"url"`
Data [][]string `json:"data"`
Desc string `json:"desc"`
} }
func (w *statisticsOursProduct) ID() string { return WorkflowIDStatisticsOursProduct } func (w *statisticsOursProduct) ID() string { return WorkflowIDStatisticsOursProduct }
func (w *statisticsOursProduct) Invoke(ctx context.Context, rec *entitys.Recognize) (map[string]any, error) { func (w *statisticsOursProduct) Invoke(ctx context.Context, args *runtime.WorkflowArgs) (map[string]any, error) {
// 构建工作流 // 构建工作流
runnable, err := w.buildWorkflow(ctx) runnable, err := w.buildWorkflow(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// 解析参数 (假设参数在 rec.Match.Parameters 中,或者根据实际情况解析) // 获取参数时间
// 这里简化处理,假设需要解析参数 now := args.Args["now"].(time.Time)
// 实际上这里应该根据 LLM 解析的结果来填充 Input
// 暂时假设 ParameterResult 是 JSON 字符串
input := &StatisticsOursProductWorkflowInput{ input := &StatisticsOursProductWorkflowInput{
// 默认值,具体应从 rec 解析 // 默认值,具体应从 rec 解析
StartTime: time.Now().Format("2006010200"), StartTime: now.Format("2006010200"),
EndTime: time.Now().Format("2006010223"), EndTime: now.Format("2006010215"),
} }
input.StartTime = "2025122300"
// 工作流过程调用 // 工作流过程调用
output, err := runnable.Invoke(ctx, input) output, err := runnable.Invoke(ctx, input)
if err != nil { if err != nil {
@ -70,11 +72,11 @@ func (w *statisticsOursProduct) Invoke(ctx context.Context, rec *entitys.Recogni
return nil, errorcode.WorkflowErr(errStr) return nil, errorcode.WorkflowErr(errStr)
} }
return map[string]any{"img_url": output.ImgUrl}, nil return output, nil
} }
func (w *statisticsOursProduct) buildWorkflow(ctx context.Context) (compose.Runnable[*StatisticsOursProductWorkflowInput, *StatisticsOursProductWorkflowOutput], error) { func (w *statisticsOursProduct) buildWorkflow(ctx context.Context) (compose.Runnable[*StatisticsOursProductWorkflowInput, map[string]any], error) {
c := compose.NewChain[*StatisticsOursProductWorkflowInput, *StatisticsOursProductWorkflowOutput]() c := compose.NewChain[*StatisticsOursProductWorkflowInput, map[string]any]()
// 1. 调用工具统计我们的商品 // 1. 调用工具统计我们的商品
c.AppendLambda(compose.InvokableLambda(w.callStatisticsTool)) c.AppendLambda(compose.InvokableLambda(w.callStatisticsTool))
@ -82,6 +84,9 @@ func (w *statisticsOursProduct) buildWorkflow(ctx context.Context) (compose.Runn
// 2. 生成 Excel 并转图片上传 // 2. 生成 Excel 并转图片上传
c.AppendLambda(compose.InvokableLambda(w.generateExcelAndUpload)) c.AppendLambda(compose.InvokableLambda(w.generateExcelAndUpload))
// 3. 转map输出
c.AppendLambda(compose.InvokableLambda(w.convertToMap))
return c.Compile(ctx) return c.Compile(ctx)
} }
@ -98,7 +103,7 @@ func (w *statisticsOursProduct) callStatisticsTool(ctx context.Context, input *S
func (w *statisticsOursProduct) generateExcelAndUpload(ctx context.Context, data []statistics_ours_product.StatisticsOursProductItem) (*StatisticsOursProductWorkflowOutput, error) { func (w *statisticsOursProduct) generateExcelAndUpload(ctx context.Context, data []statistics_ours_product.StatisticsOursProductItem) (*StatisticsOursProductWorkflowOutput, error) {
// 2. 获取模板路径 (假设在项目根目录的 assets/templates 下) // 2. 获取模板路径 (假设在项目根目录的 assets/templates 下)
cwd, _ := filepath.Abs(".") cwd, _ := filepath.Abs(".")
templatePath := filepath.Join(cwd, "assets", "templates", "statistics_ours_product.xlsx") templatePath := filepath.Join(cwd, "tmpl", "excel_temp", "recharge_statistics_ours_product.xlsx")
fileName := fmt.Sprintf("statistics_ours_product_%d", time.Now().Unix()) fileName := fmt.Sprintf("statistics_ours_product_%d", time.Now().Unix())
// 3. 转换数据为 [][]string // 3. 转换数据为 [][]string
@ -122,7 +127,14 @@ func (w *statisticsOursProduct) generateExcelAndUpload(ctx context.Context, data
return nil, fmt.Errorf("上传 OSS 失败: %v", err) return nil, fmt.Errorf("上传 OSS 失败: %v", err)
} }
return &StatisticsOursProductWorkflowOutput{ImgUrl: url}, nil res := &StatisticsOursProductWorkflowOutput{
Path: "",
Url: url,
Data: excelData,
Desc: "",
}
return res, nil
} }
// convertDataToExcelFormat 将业务数据转换为 Excel 生成器需要的二维字符串数组 // convertDataToExcelFormat 将业务数据转换为 Excel 生成器需要的二维字符串数组
@ -144,3 +156,12 @@ func (w *statisticsOursProduct) convertDataToExcelFormat(data []statistics_ours_
} }
return result return result
} }
func (w *statisticsOursProduct) convertToMap(ctx context.Context, output *StatisticsOursProductWorkflowOutput) (map[string]any, error) {
return map[string]any{
"path": output.Path,
"url": output.Url,
"data": output.Data,
"desc": output.Desc,
}, nil
}

View File

@ -15,7 +15,7 @@ import (
type Workflow interface { type Workflow interface {
ID() string ID() string
// Schema() map[string]any // Schema() map[string]any
Invoke(ctx context.Context, requireData *entitys.Recognize) (map[string]any, error) Invoke(ctx context.Context, requireData *WorkflowArgs) (map[string]any, error)
} }
type Deps struct { type Deps struct {
@ -28,6 +28,11 @@ type Deps struct {
type Factory func(deps *Deps) (Workflow, error) type Factory func(deps *Deps) (Workflow, error)
type WorkflowArgs struct {
*entitys.Recognize
Args map[string]any
}
var ( var (
regMu sync.RWMutex regMu sync.RWMutex
factories = map[string]Factory{} factories = map[string]Factory{}
@ -69,7 +74,7 @@ func Default() *Registry {
return r return r
} }
func (r *Registry) Invoke(ctx context.Context, id string, rec *entitys.Recognize) (map[string]any, error) { func (r *Registry) Invoke(ctx context.Context, id string, rec *WorkflowArgs) (map[string]any, error) {
regMu.RLock() regMu.RLock()
f, ok := factories[id] f, ok := factories[id]
regMu.RUnlock() regMu.RUnlock()

View File

@ -42,7 +42,7 @@ func (w *bugOptimizationSubmitBak) ID() string {
type BugOptimizationSubmitBakInput struct { type BugOptimizationSubmitBakInput struct {
Ch chan entitys.Response Ch chan entitys.Response
RequireData *entitys.Recognize RequireData *runtime.WorkflowArgs
} }
type BugOptimizationSubmitBakOutput struct { type BugOptimizationSubmitBakOutput struct {
@ -54,7 +54,7 @@ type contextWithTaskBak struct {
TaskID string TaskID string
} }
func (w *bugOptimizationSubmitBak) Invoke(ctx context.Context, recognize *entitys.Recognize) (map[string]any, error) { func (w *bugOptimizationSubmitBak) Invoke(ctx context.Context, recognize *runtime.WorkflowArgs) (map[string]any, error) {
chain, err := w.buildWorkflow(ctx) chain, err := w.buildWorkflow(ctx)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -42,7 +42,7 @@ func (w *bugOptimizationSubmit) ID() string {
type BugOptimizationSubmitInput struct { type BugOptimizationSubmitInput struct {
Ch chan entitys.Response Ch chan entitys.Response
RequireData *entitys.Recognize RequireData *runtime.WorkflowArgs
} }
type BugOptimizationSubmitOutput struct { type BugOptimizationSubmitOutput struct {
@ -54,7 +54,7 @@ type contextWithTask struct {
TaskID string TaskID string
} }
func (w *bugOptimizationSubmit) Invoke(ctx context.Context, recognize *entitys.Recognize) (map[string]any, error) { func (w *bugOptimizationSubmit) Invoke(ctx context.Context, recognize *runtime.WorkflowArgs) (map[string]any, error) {
chain, err := w.buildWorkflow(ctx) chain, err := w.buildWorkflow(ctx)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -78,7 +78,7 @@ type OrderAfterSaleResellerBatchData struct {
func (o *orderAfterSaleResellerBatch) ID() string { return "zltx.orderAfterSaleResellerBatch" } func (o *orderAfterSaleResellerBatch) ID() string { return "zltx.orderAfterSaleResellerBatch" }
// Invoke 调用原有编排工作流并规范化输出 // Invoke 调用原有编排工作流并规范化输出
func (o *orderAfterSaleResellerBatch) Invoke(ctx context.Context, rec *entitys.Recognize) (map[string]any, error) { func (o *orderAfterSaleResellerBatch) Invoke(ctx context.Context, rec *runtime.WorkflowArgs) (map[string]any, error) {
// 构建工作流 // 构建工作流
chain, err := o.buildWorkflow(ctx) chain, err := o.buildWorkflow(ctx)
if err != nil { if err != nil {

View File

@ -18,6 +18,7 @@ type HTTPServer struct {
callback *services.CallbackService callback *services.CallbackService
chatHis *services.HistoryService chatHis *services.HistoryService
capabilityService *services.CapabilityService capabilityService *services.CapabilityService
cronService *services.CronService
} }
func NewHTTPServer( func NewHTTPServer(
@ -28,10 +29,11 @@ func NewHTTPServer(
callback *services.CallbackService, callback *services.CallbackService,
chatHis *services.HistoryService, chatHis *services.HistoryService,
capabilityService *services.CapabilityService, capabilityService *services.CapabilityService,
cronService *services.CronService,
) *fiber.App { ) *fiber.App {
//构建 server //构建 server
app := initRoute() app := initRoute()
router.SetupRoutes(app, service, session, task, gateway, callback, chatHis, capabilityService) router.SetupRoutes(app, service, session, task, gateway, callback, chatHis, capabilityService, cronService)
return app return app
} }

View File

@ -21,12 +21,13 @@ type RouterServer struct {
gateway *gateway.Gateway gateway *gateway.Gateway
chatHist *services.HistoryService chatHist *services.HistoryService
capabilityService *services.CapabilityService capabilityService *services.CapabilityService
cronService *services.CronService
} }
// SetupRoutes 设置路由 // SetupRoutes 设置路由
func SetupRoutes(app *fiber.App, ChatService *services.ChatService, sessionService *services.SessionService, task *services.TaskService, func SetupRoutes(app *fiber.App, ChatService *services.ChatService, sessionService *services.SessionService, task *services.TaskService,
gateway *gateway.Gateway, callbackService *services.CallbackService, chatHist *services.HistoryService, gateway *gateway.Gateway, callbackService *services.CallbackService, chatHist *services.HistoryService,
capabilityService *services.CapabilityService, capabilityService *services.CapabilityService, cronService *services.CronService,
) { ) {
app.Use(func(c *fiber.Ctx) error { app.Use(func(c *fiber.Ctx) error {
// 设置 CORS 头 // 设置 CORS 头
@ -94,6 +95,11 @@ func SetupRoutes(app *fiber.App, ChatService *services.ChatService, sessionServi
// 能力 // 能力
r.Post("/capability/product/ingest", capabilityService.ProductIngest) // 商品数据提取 r.Post("/capability/product/ingest", capabilityService.ProductIngest) // 商品数据提取
r.Post("/capability/product/ingest/:thread_id/confirm", capabilityService.ProductIngestConfirm) // 商品数据提取确认 r.Post("/capability/product/ingest/:thread_id/confirm", capabilityService.ProductIngestConfirm) // 商品数据提取确认
// 测试任务
r.Post("/test/cron", func(c *fiber.Ctx) error {
return cronService.CronReportSend(c.Context())
})
} }
func routerSocket(app *fiber.App, chatService *services.ChatService) { func routerSocket(app *fiber.App, chatService *services.ChatService) {

View File

@ -198,7 +198,7 @@ func (s *CapabilityService) ProductIngestConfirm(c *fiber.Ctx) error {
// 调用eino工作流实现商品上传到目标系统 // 调用eino工作流实现商品上传到目标系统
rec := &entitys.Recognize{UserContent: &entitys.RecognizeUserContent{Text: req.Confirmed}} rec := &entitys.Recognize{UserContent: &entitys.RecognizeUserContent{Text: req.Confirmed}}
res, err := s.workflowManager.Invoke(ctx, workflowId, rec) res, err := s.workflowManager.Invoke(ctx, workflowId, &runtime.WorkflowArgs{Recognize: rec})
if err != nil { if err != nil {
return err return err
} }

View File

@ -12,6 +12,7 @@ import (
"ai_scheduler/internal/domain/component/callback" "ai_scheduler/internal/domain/component/callback"
"ai_scheduler/internal/domain/repo" "ai_scheduler/internal/domain/repo"
"ai_scheduler/internal/domain/workflow" "ai_scheduler/internal/domain/workflow"
"ai_scheduler/internal/domain/workflow/runtime"
"ai_scheduler/internal/pkg" "ai_scheduler/internal/pkg"
"ai_scheduler/internal/pkg/dingtalk" "ai_scheduler/internal/pkg/dingtalk"
"ai_scheduler/internal/pkg/utils_ollama" "ai_scheduler/internal/pkg/utils_ollama"
@ -102,7 +103,9 @@ func run() {
handle := do.NewHandle(ollamaService, manager, configConfig, sessionImpl, registry, oldClient, contactClient, notableClient) handle := do.NewHandle(ollamaService, manager, configConfig, sessionImpl, registry, oldClient, contactClient, notableClient)
// 初始化钉钉机器人业务逻辑 // 初始化钉钉机器人业务逻辑
utils_ossClient, _ := utils_oss.NewClient(configConfig) utils_ossClient, _ := utils_oss.NewClient(configConfig)
dingTalkBotBiz := biz.NewDingTalkBotBiz(doDo, handle, botConfigImpl, botGroupImpl, user, toolRegis, botChatHisImpl, manager, configConfig, sendCardClient, utils_ossClient) // 初始化工作流管理器
workflowManager := runtime.NewRegistry()
dingTalkBotBiz := biz.NewDingTalkBotBiz(doDo, handle, botConfigImpl, botGroupImpl, user, toolRegis, botChatHisImpl, manager, configConfig, sendCardClient, utils_ossClient, workflowManager)
// 初始化钉钉机器人服务 // 初始化钉钉机器人服务
cronService = NewCronService(configConfig, dingTalkBotBiz) cronService = NewCronService(configConfig, dingTalkBotBiz)
dingBotService = NewDingBotService(configConfig, dingTalkBotBiz) dingBotService = NewDingBotService(configConfig, dingTalkBotBiz)