package hyt

import (
	"ai_scheduler/internal/config"
	errorcode "ai_scheduler/internal/data/error"
	toolManager "ai_scheduler/internal/domain/tools"
	"ai_scheduler/internal/domain/tools/hyt/goods_add"
	"ai_scheduler/internal/domain/tools/hyt/goods_category_add"
	"ai_scheduler/internal/domain/tools/hyt/goods_media_add"
	"ai_scheduler/internal/domain/workflow/runtime"
	"ai_scheduler/internal/entitys"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"strconv"
	"strings"
	"sync"

	"github.com/cloudwego/eino/compose"
)

// WorkflowIDGoodsAdd is the ID under which the goods-add workflow is registered.
const WorkflowIDGoodsAdd = "hyt.goodsAdd"

// init registers a factory for the goods-add workflow with the workflow runtime.
func init() {
	runtime.Register(WorkflowIDGoodsAdd, func(d *runtime.Deps) (runtime.Workflow, error) {
		return &goodsAdd{cfg: d.Conf, toolManager: d.ToolManager}, nil
	})
}

// goodsAdd is the workflow that creates a product from a JSON description and
// then attaches its category and images.
type goodsAdd struct {
	cfg         *config.Config
	toolManager *toolManager.Manager
	// data holds the input of the most recent Invoke call.
	// NOTE(review): this is written on every Invoke; if one instance is shared
	// across concurrent calls this field is racy — confirm how the runtime
	// reuses workflow instances.
	data *GoodsAddWorkflowInput
}

// GoodsAddWorkflowInput is the input fed into the compiled graph.
type GoodsAddWorkflowInput struct {
	Text string `mapstructure:"text"`
}

// ID returns the registered workflow ID.
func (o *goodsAdd) ID() string {
	return WorkflowIDGoodsAdd
}

// Invoke builds the graph, runs it with the user's text as input and returns
// the result map produced by the final node.
//
// Any failure while running the graph is converted with errorcode.WorkflowErr;
// when the error wraps another one, the message of the directly wrapped error
// is used instead of the outer message.
func (o *goodsAdd) Invoke(ctx context.Context, rec *entitys.Recognize) (map[string]any, error) {
	// Build the workflow.
	runnable, err := o.buildWorkflow(ctx)
	if err != nil {
		return nil, err
	}

	o.data = &GoodsAddWorkflowInput{
		Text: rec.UserContent.Text,
	}

	// Run the workflow.
	output, err := runnable.Invoke(ctx, o.data)
	if err != nil {
		errStr := err.Error()
		if u := errors.Unwrap(err); u != nil {
			errStr = u.Error()
		}
		return nil, errorcode.WorkflowErr(errStr)
	}

	return output, nil
}

// GoodsAddProductIngestData mirrors the structure of
// HYTGoodsAddPropertyTemplateZH: a JSON object whose keys are the Chinese
// field labels given in the struct tags.
type GoodsAddProductIngestData struct {
	Title             string   `json:"商品标题"`
	GoodsCode         string   `json:"商品编码"`
	SpuName           string   `json:"SPU名称"`
	SpuCode           string   `json:"SPU编码"`
	GoodsNum          string   `json:"商品货号"`
	GoodsBarCode      string   `json:"商品条形码"`
	Price             string   `json:"市场价"`
	SalesPrice        string   `json:"建议销售价"`
	ExternalPrice     string   `json:"电商销售价格"`
	Unit              string   `json:"单位"`
	Discount          string   `json:"折扣(%)"`
	TaxRate           string   `json:"税率(%)"`
	FreightTemplate   string   `json:"运费模版"`
	SellByDate        string   `json:"保质期"`
	SellByDateUnit    string   `json:"保质期单位"`
	Brand             string   `json:"品牌"`
	IsHot             string   `json:"是否热销主推"`
	ExternalUrl       string   `json:"外部平台链接"`
	Introduction      string   `json:"商品卖点"`
	GoodsAttributes   string   `json:"商品规格参数"`
	GoodsIllustration string   `json:"商品说明"`
	Remark            string   `json:"备注"`
	CategoryName      string   `json:"分类名称"`
	Images            []string `json:"电脑端主图"`
}

// GoodsAddContext is the state object passed between graph nodes.
type GoodsAddContext struct {
	// mu guards writes that nodes make to the shared state.
	mu *sync.Mutex

	InputText  string
	IngestData *GoodsAddProductIngestData

	// Core request body sent to the goods_add tool.
	AddGoodsReq *goods_add.GoodsAddRequest

	// Intermediate data.
	BrandId      int
	CategoryId   int
	BrandName    string
	CategoryName string

	// Execution results.
	GoodsId int
	Result  map[string]any
}

// buildWorkflow assembles and compiles the goods-add graph.
//
// Node layout:
//
//	START -> data_mapping -> get_brand_id    -> goods_add -> goods_category_add -> format_output -> END
//	                      -> get_category_id ----------------^
//	                                            goods_add -> goods_media_add    ---^
//
// It returns whatever g.Compile returns.
func (o *goodsAdd) buildWorkflow(ctx context.Context) (compose.Runnable[*GoodsAddWorkflowInput, map[string]any], error) {
	g := compose.NewGraph[*GoodsAddWorkflowInput, map[string]any]()

	// 1. data_mapping: parse the JSON input and fill the base request.
	g.AddLambdaNode("data_mapping", compose.InvokableLambda(func(ctx context.Context, in *GoodsAddWorkflowInput) (*GoodsAddContext, error) {
		state := &GoodsAddContext{
			mu:          &sync.Mutex{}, // initialise the lock
			InputText:   in.Text,
			AddGoodsReq: &goods_add.GoodsAddRequest{},
			Result:      make(map[string]any),
		}

		// Parse the Chinese-keyed JSON supplied by the user.
		var ingestData GoodsAddProductIngestData
		if err := json.Unmarshal([]byte(in.Text), &ingestData); err != nil {
			return nil, fmt.Errorf("解析商品数据失败: %w", err)
		}

		// Required-field validation.
		if ingestData.Title == "" {
			return nil, errors.New("商品标题不能为空")
		}
		if ingestData.GoodsCode == "" {
			return nil, errors.New("商品编码不能为空")
		}
		if ingestData.SpuName == "" {
			return nil, errors.New("SPU名称不能为空")
		}
		if ingestData.SpuCode == "" {
			return nil, errors.New("SPU编码不能为空")
		}
		if ingestData.Price == "" {
			return nil, errors.New("市场价不能为空")
		}
		if ingestData.SalesPrice == "" {
			return nil, errors.New("建议销售价不能为空")
		}
		if ingestData.Unit == "" {
			return nil, errors.New("价格单位不能为空")
		}
		if ingestData.Discount == "" {
			return nil, errors.New("折扣不能为空")
		}
		if ingestData.TaxRate == "" {
			return nil, errors.New("税率不能为空")
		}

		state.IngestData = &ingestData
		state.BrandName = ingestData.Brand
		state.CategoryName = ingestData.CategoryName

		// Map fields onto AddGoodsReq.
		state.AddGoodsReq.Title = ingestData.Title
		state.AddGoodsReq.GoodsCode = ingestData.GoodsCode
		state.AddGoodsReq.SpuName = ingestData.SpuName
		state.AddGoodsReq.SpuCode = ingestData.SpuCode
		state.AddGoodsReq.GoodsNum = ingestData.GoodsNum
		state.AddGoodsReq.GoodsBarCode = ingestData.GoodsBarCode

		// Prices: strip a trailing "元" and parse as float. A value that fails
		// to parse is skipped, leaving the request field at its zero value.
		// NOTE(review): parse failures of required numeric fields are silently
		// ignored here — confirm this best-effort behaviour is intended.
		if val, err := strconv.ParseFloat(strings.TrimSuffix(ingestData.Price, "元"), 64); err == nil {
			state.AddGoodsReq.Price = val
		}
		if val, err := strconv.ParseFloat(strings.TrimSuffix(ingestData.SalesPrice, "元"), 64); err == nil {
			state.AddGoodsReq.SalesPrice = val
		}
		// NOTE(review): ExternalPrice is only set when Price ended up as 0;
		// this condition looks unintended — confirm against the API contract.
		if val, err := strconv.ParseFloat(strings.TrimSuffix(ingestData.ExternalPrice, "元"), 64); err == nil && state.AddGoodsReq.Price == 0 {
			state.AddGoodsReq.ExternalPrice = val
		}

		state.AddGoodsReq.Unit = ingestData.Unit

		// Discount: "80%" -> 80.
		discountStr := strings.TrimSuffix(ingestData.Discount, "%")
		if val, err := strconv.Atoi(discountStr); err == nil {
			state.AddGoodsReq.Discount = val
		}

		// Tax rate: "13%" -> 13 (one trailing space before the "%" is also removed).
		taxStr := strings.TrimSuffix(strings.TrimSuffix(ingestData.TaxRate, "%"), " ")
		if val, err := strconv.Atoi(taxStr); err == nil {
			state.AddGoodsReq.TaxRate = val
		}

		// The freight template is not taken from the input for now; a fixed ID is used.
		state.AddGoodsReq.FreightId = 3

		// Shelf life: "180天" -> 180.
		sellByDateStr := strings.TrimSuffix(ingestData.SellByDate, "天")
		if val, err := strconv.Atoi(sellByDateStr); err == nil {
			state.AddGoodsReq.SellByDate = val
		}
		state.AddGoodsReq.SellByDateUnit = ingestData.SellByDateUnit

		// state.AddGoodsReq.BrandId is assigned later by get_brand_id.

		// IsHot is 1 only when the input says "是", otherwise 2.
		state.AddGoodsReq.IsHot = 2
		if ingestData.IsHot == "是" {
			state.AddGoodsReq.IsHot = 1
		}

		state.AddGoodsReq.ExternalUrl = ingestData.ExternalUrl
		state.AddGoodsReq.Introduction = ingestData.Introduction
		state.AddGoodsReq.GoodsAttributes = ingestData.GoodsAttributes
		state.AddGoodsReq.GoodsIllustration = ingestData.GoodsIllustration
		state.AddGoodsReq.Remark = ingestData.Remark

		state.AddGoodsReq.IsComposeGoods = 2 // not a composite product

		return state, nil
	}))

	// 2. get_brand_id: look up the brand ID (parallel branch).
	g.AddLambdaNode("get_brand_id", compose.InvokableLambda(func(ctx context.Context, state *GoodsAddContext) (*GoodsAddContext, error) {
		if state.BrandName == "" {
			return nil, errors.New("品牌名称不能为空")
		}
		brandId, err := o.toolManager.Hyt.GoodsBrandSearch.Call(ctx, state.BrandName)
		if err != nil {
			log.Printf("warning: 品牌ID获取失败,%s: %v\n", state.BrandName, err)
			// A failed lookup does not abort this node. The state must still
			// be forwarded: successors dereference it, so passing nil would
			// cause a nil-pointer dereference there. goods_add reports the
			// missing brand ID itself.
			return state, nil
		}
		state.mu.Lock()
		defer state.mu.Unlock()
		state.BrandId = brandId
		state.AddGoodsReq.BrandId = brandId
		return state, nil
	}))

	// 3. get_category_id: look up the category ID (parallel branch).
	g.AddLambdaNode("get_category_id", compose.InvokableLambda(func(ctx context.Context, state *GoodsAddContext) (*GoodsAddContext, error) {
		if state.CategoryName == "" {
			return nil, errors.New("分类名称不能为空")
		}
		categoryId, err := o.toolManager.Hyt.GoodsCategorySearch.Call(ctx, state.CategoryName)
		if err != nil {
			log.Printf("warning: 分类ID获取失败,%s: %v\n", state.CategoryName, err)
			// A failed lookup does not abort this node. The state must still
			// be forwarded: successors dereference it, so passing nil would
			// cause a nil-pointer dereference there. goods_category_add
			// reports the missing category ID itself.
			return state, nil
		}
		state.mu.Lock()
		defer state.mu.Unlock()
		state.CategoryId = categoryId
		return state, nil
	}))

	// 4. goods_add: create the product (depends on get_brand_id).
	g.AddLambdaNode("goods_add", compose.InvokableLambda(func(ctx context.Context, state *GoodsAddContext) (*GoodsAddContext, error) {
		// Validate BrandId.
		if state.AddGoodsReq.BrandId == 0 {
			return nil, errors.New("Missing Brand ID")
		}

		// Call the goods_add tool.
		goodsId, err := o.toolManager.Hyt.GoodsAdd.Call(ctx, state.AddGoodsReq)
		if err != nil {
			return nil, fmt.Errorf("新增商品失败: %w", err)
		}

		state.GoodsId = goodsId
		state.Result["goods_id"] = state.GoodsId
		state.Result["spu_code"] = state.AddGoodsReq.SpuCode
		return state, nil
	}))

	// 5. goods_category_add: link the product to its category
	// (depends on goods_add and get_category_id).
	g.AddLambdaNode("goods_category_add", compose.InvokableLambda(func(ctx context.Context, state *GoodsAddContext) (*GoodsAddContext, error) {
		if state.GoodsId == 0 {
			return nil, errors.New("goods_id is 0")
		}
		if state.CategoryId == 0 {
			return nil, errors.New("category_id is 0")
		}

		req := &goods_category_add.GoodsCategoryAddRequest{
			GoodsId:     state.GoodsId,
			CategoryIds: []int{state.CategoryId},
			IsCover:     false,
		}

		// A failed tool call is recorded in the result instead of failing the node.
		_, err := o.toolManager.Hyt.GoodsCategoryAdd.Call(ctx, req)
		if err != nil {
			log.Printf("warning: 关联分类失败: %v", err)
			state.mu.Lock()
			state.Result["category_error"] = err.Error()
			state.mu.Unlock()
		} else {
			state.mu.Lock()
			state.Result["category_added"] = true
			state.mu.Unlock()
		}
		return state, nil
	}))

	// 6. goods_media_add: attach product images (depends on goods_add).
	g.AddLambdaNode("goods_media_add", compose.InvokableLambda(func(ctx context.Context, state *GoodsAddContext) (*GoodsAddContext, error) {
		if state.GoodsId == 0 {
			return nil, errors.New("goods_id is 0")
		}
		// Nothing to upload.
		if len(state.IngestData.Images) == 0 {
			return state, nil
		}

		req := &goods_media_add.GoodsMediaAddRequest{
			GoodsId: state.GoodsId,
			IsCover: true,
			Data:    make([]goods_media_add.MediaItem, 0),
		}

		for i, url := range state.IngestData.Images {
			req.Data = append(req.Data, goods_media_add.MediaItem{
				Type: 1, // image
				Url:  url,
				Sort: i,
			})
		}

		// A failed tool call is recorded in the result instead of failing the node.
		_, err := o.toolManager.Hyt.GoodsMediaAdd.Call(ctx, req)
		if err != nil {
			log.Printf("warning: 添加图片失败: %v", err)
			state.mu.Lock()
			state.Result["media_error"] = err.Error()
			state.mu.Unlock()
		} else {
			state.mu.Lock()
			state.Result["media_added"] = true
			state.mu.Unlock()
		}
		return state, nil
	}))

	// 7. format_output: expose the accumulated result map.
	g.AddLambdaNode("format_output", compose.InvokableLambda(func(ctx context.Context, state *GoodsAddContext) (map[string]any, error) {
		return state.Result, nil
	}))

	// Wire the edges (DAG).

	// START -> data_mapping.
	g.AddEdge(compose.START, "data_mapping")

	// Fan-out: data_mapping -> get_brand_id and data_mapping -> get_category_id.
	g.AddEdge("data_mapping", "get_brand_id")
	g.AddEdge("data_mapping", "get_category_id")

	// goods_add needs the brand ID.
	g.AddEdge("get_brand_id", "goods_add")

	// goods_category_add needs both the goods ID and the category ID, so it has
	// two predecessors.
	// NOTE(review): this relies on the framework waiting for all predecessors
	// and on it being able to merge two *GoodsAddContext inputs — confirm the
	// graph's trigger mode and fan-in merge behaviour for this type.
	g.AddEdge("goods_add", "goods_category_add")
	g.AddEdge("get_category_id", "goods_category_add")

	// goods_media_add needs the goods ID.
	g.AddEdge("goods_add", "goods_media_add")

	// Final fan-in into format_output.
	g.AddEdge("goods_category_add", "format_output")
	g.AddEdge("goods_media_add", "format_output")

	g.AddEdge("format_output", compose.END)

	return g.Compile(ctx)
}