package hyt import ( "ai_scheduler/internal/config" errorcode "ai_scheduler/internal/data/error" toolManager "ai_scheduler/internal/domain/tools" "ai_scheduler/internal/domain/tools/hyt/goods_add" "ai_scheduler/internal/domain/tools/hyt/goods_category_add" "ai_scheduler/internal/domain/tools/hyt/goods_media_add" "ai_scheduler/internal/domain/workflow/runtime" "ai_scheduler/internal/entitys" "context" "encoding/json" "errors" "fmt" "log" "strconv" "strings" "sync" "github.com/cloudwego/eino/compose" "golang.org/x/sync/errgroup" ) const WorkflowIDGoodsAdd = "hyt.goodsAdd" func init() { runtime.Register(WorkflowIDGoodsAdd, func(d *runtime.Deps) (runtime.Workflow, error) { return &goodsAdd{cfg: d.Conf, toolManager: d.ToolManager}, nil }) } type goodsAdd struct { cfg *config.Config toolManager *toolManager.Manager data *GoodsAddWorkflowInput } type GoodsAddWorkflowInput struct { Text string `mapstructure:"text"` } func (o *goodsAdd) ID() string { return WorkflowIDGoodsAdd } func (o *goodsAdd) Invoke(ctx context.Context, rec *entitys.Recognize) (map[string]any, error) { // 构建工作流 runnable, err := o.buildWorkflow(ctx) if err != nil { return nil, err } o.data = &GoodsAddWorkflowInput{ Text: rec.UserContent.Text, } // 工作流过程调用 output, err := runnable.Invoke(ctx, o.data) if err != nil { fmt.Println("Invoke err:", err) errStr := err.Error() if u := errors.Unwrap(err); u != nil { errStr = u.Error() } return nil, errorcode.WorkflowErr(errStr) } return output, nil } // ProductIngestData 对应 HYTGoodsAddPropertyTemplateZH 的结构 type GoodsAddProductIngestData struct { Title string `json:"商品标题"` GoodsCode string `json:"商品编码"` SpuName string `json:"SPU名称"` SpuCode string `json:"SPU编码"` GoodsNum string `json:"商品货号"` GoodsBarCode string `json:"商品条形码"` Price string `json:"市场价"` SalesPrice string `json:"建议销售价"` ExternalPrice string `json:"电商销售价格"` Unit string `json:"单位"` Discount string `json:"折扣"` TaxRate string `json:"税率"` FreightTemplate string `json:"运费模版"` SellByDate string `json:"保质期"` SellByDateUnit string `json:"保质期单位"` Brand string `json:"品牌"` IsHot string `json:"是否热销主推"` ExternalUrl string `json:"外部平台链接"` Introduction string `json:"商品卖点"` GoodsAttributes string `json:"商品规格参数"` GoodsIllustration string `json:"商品说明"` Remark string `json:"备注"` CategoryName string `json:"分类名称"` Images []string `json:"电脑端主图"` } // GoodsAddContext Graph 执行上下文状态 type GoodsAddContext struct { mu *sync.Mutex InputText string IngestData *GoodsAddProductIngestData // 核心请求体 AddGoodsReq *goods_add.GoodsAddRequest // 中间态数据 BrandId int CategoryId int BrandName string CategoryName string // 运行结果 GoodsAddResp *goods_add.GoodsAddResponse GoodsCategoryAddResp bool GoodsMediaAddResp bool } // buildWorkflow 构建基于 Graph 的并行工作流 func (o *goodsAdd) buildWorkflow(ctx context.Context) (compose.Runnable[*GoodsAddWorkflowInput, map[string]any], error) { g := compose.NewGraph[*GoodsAddWorkflowInput, map[string]any]() // 1. DataMapping 节点: 解析 JSON -> 填充基础 Request g.AddLambdaNode("data_mapping", compose.InvokableLambda(func(ctx context.Context, in *GoodsAddWorkflowInput) (*GoodsAddContext, error) { state := &GoodsAddContext{ mu: &sync.Mutex{}, // 初始化锁 InputText: in.Text, AddGoodsReq: &goods_add.GoodsAddRequest{}, } // 解析用户输入的中文 JSON var ingestData GoodsAddProductIngestData if err := json.Unmarshal([]byte(in.Text), &ingestData); err != nil { return nil, fmt.Errorf("解析商品数据失败") } // 必填校验 if ingestData.Title == "" { return nil, errors.New("商品标题不能为空") } if ingestData.GoodsCode == "" { return nil, errors.New("商品编码不能为空") } if ingestData.SpuName == "" { return nil, errors.New("SPU名称不能为空") } if ingestData.SpuCode == "" { return nil, errors.New("SPU编码不能为空") } if ingestData.Price == "" { return nil, errors.New("市场价不能为空") } if ingestData.SalesPrice == "" { return nil, errors.New("建议销售价不能为空") } if ingestData.Unit == "" { return nil, errors.New("价格单位不能为空") } if ingestData.Discount == "" { return nil, errors.New("折扣不能为空") } if ingestData.TaxRate == "" { return nil, errors.New("税率不能为空") } state.IngestData = &ingestData state.BrandName = ingestData.Brand state.CategoryName = ingestData.CategoryName // 映射字段到 AddGoodsReq state.AddGoodsReq.Title = ingestData.Title state.AddGoodsReq.GoodsCode = ingestData.GoodsCode state.AddGoodsReq.SpuName = ingestData.SpuName state.AddGoodsReq.SpuCode = ingestData.SpuCode state.AddGoodsReq.GoodsNum = ingestData.GoodsNum state.AddGoodsReq.GoodsBarCode = ingestData.GoodsBarCode // 价格处理 if val, err := strconv.ParseFloat(strings.TrimSuffix(ingestData.Price, "元"), 64); err == nil { state.AddGoodsReq.Price = val } if val, err := strconv.ParseFloat(strings.TrimSuffix(ingestData.SalesPrice, "元"), 64); err == nil { state.AddGoodsReq.SalesPrice = val } if val, err := strconv.ParseFloat(strings.TrimSuffix(ingestData.ExternalPrice, "元"), 64); err == nil { state.AddGoodsReq.ExternalPrice = val } state.AddGoodsReq.Unit = ingestData.Unit // 折扣处理 "80%" -> 80 discountStr := strings.TrimSuffix(ingestData.Discount, "%") if val, err := strconv.Atoi(discountStr); err == nil { state.AddGoodsReq.Discount = val } // 税率处理 "13%" -> 13 taxStr := strings.TrimSuffix(strings.TrimSuffix(ingestData.TaxRate, "%"), " ") if val, err := strconv.Atoi(taxStr); err == nil { state.AddGoodsReq.TaxRate = val } // 运费模板先不给 state.AddGoodsReq.FreightId = 3 // 保质期处理 "180天" -> 180 sellByDateStr := strings.TrimSuffix(ingestData.SellByDate, "天") if val, err := strconv.Atoi(sellByDateStr); err == nil { state.AddGoodsReq.SellByDate = val } state.AddGoodsReq.SellByDateUnit = ingestData.SellByDateUnit // state.AddGoodsReq.BrandId 品牌ID后续赋值 state.AddGoodsReq.IsHot = 2 if ingestData.IsHot == "是" { state.AddGoodsReq.IsHot = 1 } state.AddGoodsReq.ExternalUrl = ingestData.ExternalUrl state.AddGoodsReq.Introduction = ingestData.Introduction state.AddGoodsReq.GoodsAttributes = ingestData.GoodsAttributes state.AddGoodsReq.GoodsIllustration = ingestData.GoodsIllustration state.AddGoodsReq.Remark = ingestData.Remark state.AddGoodsReq.IsComposeGoods = 2 // 非组合商品 return state, nil })) // 2. 预处理节点: 并行获取 品牌ID 和 分类ID g.AddLambdaNode("prepare_info", compose.InvokableLambda(func(ctx context.Context, state *GoodsAddContext) (*GoodsAddContext, error) { eg, ctx := errgroup.WithContext(ctx) // 任务1: 获取品牌ID eg.Go(func() error { if state.BrandName == "" { return nil } brandId, err := o.toolManager.Hyt.GoodsBrandSearch.Call(ctx, state.BrandName) if err != nil { log.Printf("warning: 品牌ID获取失败,%s: %v\n", state.BrandName, err) return nil } state.mu.Lock() state.BrandId = brandId state.AddGoodsReq.BrandId = brandId state.mu.Unlock() return nil }) // 任务2: 获取分类ID eg.Go(func() error { if state.CategoryName == "" { return nil } categoryId, err := o.toolManager.Hyt.GoodsCategorySearch.Call(ctx, state.CategoryName) if err != nil { log.Printf("warning: 分类ID获取失败,%s: %v\n", state.CategoryName, err) return nil } state.mu.Lock() state.CategoryId = categoryId state.mu.Unlock() return nil }) // 等待所有任务完成 _ = eg.Wait() return state, nil })) // 3. 新增商品 节点 (依赖 prepare_info) g.AddLambdaNode("goods_add", compose.InvokableLambda(func(ctx context.Context, state *GoodsAddContext) (*GoodsAddContext, error) { // 调用 goods_add 工具 respData, err := o.toolManager.Hyt.GoodsAdd.Call(ctx, state.AddGoodsReq) if err != nil || respData == nil { log.Printf("warning: 新增商品失败: %v", err) return nil, fmt.Errorf("新增商品失败: %s", err.Error()) } state.GoodsAddResp = respData return state, nil })) // 4. 后置处理节点: 并行执行 关联分类 和 添加图片 g.AddLambdaNode("post_process", compose.InvokableLambda(func(ctx context.Context, state *GoodsAddContext) (*GoodsAddContext, error) { if state.GoodsAddResp.Id == 0 { return nil, errors.New("商品不存在") } eg, ctx := errgroup.WithContext(ctx) // 任务1: 关联分类 eg.Go(func() error { if state.CategoryId == 0 { return nil } req := &goods_category_add.GoodsCategoryAddRequest{ GoodsId: state.GoodsAddResp.Id, CategoryIds: []int{state.CategoryId}, IsCover: false, } isSuccess, err := o.toolManager.Hyt.GoodsCategoryAdd.Call(ctx, req) if err != nil { log.Printf("warning: 关联分类失败: %v", err) return nil } state.mu.Lock() state.GoodsCategoryAddResp = isSuccess state.mu.Unlock() return nil }) // 任务2: 添加图片 eg.Go(func() error { if len(state.IngestData.Images) == 0 { return nil } req := &goods_media_add.GoodsMediaAddRequest{ GoodsId: state.GoodsAddResp.Id, IsCover: true, Data: make([]goods_media_add.MediaItem, 0), } for i, url := range state.IngestData.Images { req.Data = append(req.Data, goods_media_add.MediaItem{ Type: 1, // 图片 Url: url, Sort: i, }) } isSuccess, err := o.toolManager.Hyt.GoodsMediaAdd.Call(ctx, req) if err != nil { log.Printf("warning: 添加图片失败: %v", err) return nil } state.mu.Lock() state.GoodsMediaAddResp = isSuccess state.mu.Unlock() return nil }) // 等待所有任务完成 _ = eg.Wait() return state, nil })) // 5. 结果格式化节点 g.AddLambdaNode("format_output", compose.InvokableLambda(func(ctx context.Context, state *GoodsAddContext) (map[string]any, error) { if state.GoodsAddResp == nil { return nil, fmt.Errorf("goods add response is nil") } return map[string]any{ "预览URL(货易通商品列表)": state.GoodsAddResp.PreviewUrl, "SPU编码": state.GoodsAddResp.SpuCode, "商品ID": state.GoodsAddResp.Id, }, nil })) // 构建边 (线性拓扑) g.AddEdge(compose.START, "data_mapping") g.AddEdge("data_mapping", "prepare_info") g.AddEdge("prepare_info", "goods_add") g.AddEdge("goods_add", "post_process") g.AddEdge("post_process", "format_output") g.AddEdge("format_output", compose.END) return g.Compile(ctx) }