373 lines
11 KiB
Go
373 lines
11 KiB
Go
package hyt
|
||
|
||
import (
|
||
"ai_scheduler/internal/config"
|
||
errorcode "ai_scheduler/internal/data/error"
|
||
toolManager "ai_scheduler/internal/domain/tools"
|
||
"ai_scheduler/internal/domain/tools/hyt/goods_add"
|
||
"ai_scheduler/internal/domain/tools/hyt/goods_category_add"
|
||
"ai_scheduler/internal/domain/tools/hyt/goods_media_add"
|
||
"ai_scheduler/internal/domain/workflow/runtime"
|
||
"ai_scheduler/internal/entitys"
|
||
"context"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"log"
|
||
"strconv"
|
||
"strings"
|
||
"sync"
|
||
|
||
"github.com/cloudwego/eino/compose"
|
||
"golang.org/x/sync/errgroup"
|
||
)
|
||
|
||
const WorkflowIDGoodsAdd = "hyt.goodsAdd"
|
||
|
||
func init() {
|
||
runtime.Register(WorkflowIDGoodsAdd, func(d *runtime.Deps) (runtime.Workflow, error) {
|
||
return &goodsAdd{cfg: d.Conf, toolManager: d.ToolManager}, nil
|
||
})
|
||
}
|
||
|
||
type goodsAdd struct {
|
||
cfg *config.Config
|
||
toolManager *toolManager.Manager
|
||
data *GoodsAddWorkflowInput
|
||
}
|
||
|
||
type GoodsAddWorkflowInput struct {
|
||
Text string `mapstructure:"text"`
|
||
}
|
||
|
||
func (o *goodsAdd) ID() string { return WorkflowIDGoodsAdd }
|
||
|
||
func (o *goodsAdd) Invoke(ctx context.Context, rec *entitys.Recognize) (map[string]any, error) {
|
||
// 构建工作流
|
||
runnable, err := o.buildWorkflow(ctx)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
o.data = &GoodsAddWorkflowInput{
|
||
Text: rec.UserContent.Text,
|
||
}
|
||
// 工作流过程调用
|
||
output, err := runnable.Invoke(ctx, o.data)
|
||
if err != nil {
|
||
fmt.Println("Invoke err:", err)
|
||
errStr := err.Error()
|
||
if u := errors.Unwrap(err); u != nil {
|
||
errStr = u.Error()
|
||
}
|
||
return nil, errorcode.WorkflowErr(errStr)
|
||
}
|
||
|
||
return output, nil
|
||
}
|
||
|
||
// ProductIngestData 对应 HYTGoodsAddPropertyTemplateZH 的结构
|
||
type GoodsAddProductIngestData struct {
|
||
Title string `json:"商品标题"`
|
||
GoodsCode string `json:"商品编码"`
|
||
SpuName string `json:"SPU名称"`
|
||
SpuCode string `json:"SPU编码"`
|
||
GoodsNum string `json:"商品货号"`
|
||
GoodsBarCode string `json:"商品条形码"`
|
||
Price string `json:"市场价"`
|
||
SalesPrice string `json:"建议销售价"`
|
||
ExternalPrice string `json:"电商销售价格"`
|
||
Unit string `json:"单位"`
|
||
Discount string `json:"折扣"`
|
||
TaxRate string `json:"税率"`
|
||
FreightTemplate string `json:"运费模版"`
|
||
SellByDate string `json:"保质期"`
|
||
SellByDateUnit string `json:"保质期单位"`
|
||
Brand string `json:"品牌"`
|
||
IsHot string `json:"是否热销主推"`
|
||
ExternalUrl string `json:"外部平台链接"`
|
||
Introduction string `json:"商品卖点"`
|
||
GoodsAttributes string `json:"商品规格参数"`
|
||
GoodsIllustration string `json:"商品说明"`
|
||
Remark string `json:"备注"`
|
||
CategoryName string `json:"分类名称"`
|
||
Images []string `json:"电脑端主图"`
|
||
}
|
||
|
||
// GoodsAddContext Graph 执行上下文状态
|
||
type GoodsAddContext struct {
|
||
mu *sync.Mutex
|
||
InputText string
|
||
IngestData *GoodsAddProductIngestData
|
||
|
||
// 核心请求体
|
||
AddGoodsReq *goods_add.GoodsAddRequest
|
||
|
||
// 中间态数据
|
||
BrandId int
|
||
CategoryId int
|
||
BrandName string
|
||
CategoryName string
|
||
|
||
// 运行结果
|
||
GoodsAddResp *goods_add.GoodsAddResponse
|
||
GoodsCategoryAddResp bool
|
||
GoodsMediaAddResp bool
|
||
}
|
||
|
||
// buildWorkflow 构建基于 Graph 的并行工作流
|
||
func (o *goodsAdd) buildWorkflow(ctx context.Context) (compose.Runnable[*GoodsAddWorkflowInput, map[string]any], error) {
|
||
g := compose.NewGraph[*GoodsAddWorkflowInput, map[string]any]()
|
||
|
||
// 1. DataMapping 节点: 解析 JSON -> 填充基础 Request
|
||
g.AddLambdaNode("data_mapping", compose.InvokableLambda(func(ctx context.Context, in *GoodsAddWorkflowInput) (*GoodsAddContext, error) {
|
||
state := &GoodsAddContext{
|
||
mu: &sync.Mutex{}, // 初始化锁
|
||
InputText: in.Text,
|
||
AddGoodsReq: &goods_add.GoodsAddRequest{},
|
||
}
|
||
|
||
// 解析用户输入的中文 JSON
|
||
var ingestData GoodsAddProductIngestData
|
||
if err := json.Unmarshal([]byte(in.Text), &ingestData); err != nil {
|
||
return nil, fmt.Errorf("解析商品数据失败")
|
||
}
|
||
|
||
// 必填校验
|
||
if ingestData.Title == "" {
|
||
return nil, errors.New("商品标题不能为空")
|
||
}
|
||
if ingestData.GoodsCode == "" {
|
||
return nil, errors.New("商品编码不能为空")
|
||
}
|
||
if ingestData.SpuName == "" {
|
||
return nil, errors.New("SPU名称不能为空")
|
||
}
|
||
if ingestData.SpuCode == "" {
|
||
return nil, errors.New("SPU编码不能为空")
|
||
}
|
||
if ingestData.Price == "" {
|
||
return nil, errors.New("市场价不能为空")
|
||
}
|
||
if ingestData.SalesPrice == "" {
|
||
return nil, errors.New("建议销售价不能为空")
|
||
}
|
||
if ingestData.Unit == "" {
|
||
return nil, errors.New("价格单位不能为空")
|
||
}
|
||
if ingestData.Discount == "" {
|
||
return nil, errors.New("折扣不能为空")
|
||
}
|
||
if ingestData.TaxRate == "" {
|
||
return nil, errors.New("税率不能为空")
|
||
}
|
||
|
||
state.IngestData = &ingestData
|
||
state.BrandName = ingestData.Brand
|
||
state.CategoryName = ingestData.CategoryName
|
||
|
||
// 映射字段到 AddGoodsReq
|
||
state.AddGoodsReq.Title = ingestData.Title
|
||
state.AddGoodsReq.GoodsCode = ingestData.GoodsCode
|
||
state.AddGoodsReq.SpuName = ingestData.SpuName
|
||
state.AddGoodsReq.SpuCode = ingestData.SpuCode
|
||
state.AddGoodsReq.GoodsNum = ingestData.GoodsNum
|
||
state.AddGoodsReq.GoodsBarCode = ingestData.GoodsBarCode
|
||
|
||
// 价格处理
|
||
if val, err := strconv.ParseFloat(strings.TrimSuffix(ingestData.Price, "元"), 64); err == nil {
|
||
state.AddGoodsReq.Price = val
|
||
}
|
||
if val, err := strconv.ParseFloat(strings.TrimSuffix(ingestData.SalesPrice, "元"), 64); err == nil {
|
||
state.AddGoodsReq.SalesPrice = val
|
||
}
|
||
if val, err := strconv.ParseFloat(strings.TrimSuffix(ingestData.ExternalPrice, "元"), 64); err == nil {
|
||
state.AddGoodsReq.ExternalPrice = val
|
||
}
|
||
|
||
state.AddGoodsReq.Unit = ingestData.Unit
|
||
|
||
// 折扣处理 "80%" -> 80
|
||
discountStr := strings.TrimSuffix(ingestData.Discount, "%")
|
||
if val, err := strconv.Atoi(discountStr); err == nil {
|
||
state.AddGoodsReq.Discount = val
|
||
}
|
||
// 税率处理 "13%" -> 13
|
||
taxStr := strings.TrimSuffix(strings.TrimSuffix(ingestData.TaxRate, "%"), " ")
|
||
if val, err := strconv.Atoi(taxStr); err == nil {
|
||
state.AddGoodsReq.TaxRate = val
|
||
}
|
||
|
||
// 运费模板先不给 state.AddGoodsReq.FreightId = 3
|
||
|
||
// 保质期处理 "180天" -> 180
|
||
sellByDateStr := strings.TrimSuffix(ingestData.SellByDate, "天")
|
||
if val, err := strconv.Atoi(sellByDateStr); err == nil {
|
||
state.AddGoodsReq.SellByDate = val
|
||
}
|
||
state.AddGoodsReq.SellByDateUnit = ingestData.SellByDateUnit
|
||
|
||
// state.AddGoodsReq.BrandId 品牌ID后续赋值
|
||
|
||
state.AddGoodsReq.IsHot = 2
|
||
if ingestData.IsHot == "是" {
|
||
state.AddGoodsReq.IsHot = 1
|
||
}
|
||
|
||
state.AddGoodsReq.ExternalUrl = ingestData.ExternalUrl
|
||
state.AddGoodsReq.Introduction = ingestData.Introduction
|
||
state.AddGoodsReq.GoodsAttributes = ingestData.GoodsAttributes
|
||
state.AddGoodsReq.GoodsIllustration = ingestData.GoodsIllustration
|
||
state.AddGoodsReq.Remark = ingestData.Remark
|
||
state.AddGoodsReq.IsComposeGoods = 2 // 非组合商品
|
||
|
||
return state, nil
|
||
}))
|
||
|
||
// 2. 预处理节点: 并行获取 品牌ID 和 分类ID
|
||
g.AddLambdaNode("prepare_info", compose.InvokableLambda(func(ctx context.Context, state *GoodsAddContext) (*GoodsAddContext, error) {
|
||
eg, ctx := errgroup.WithContext(ctx)
|
||
|
||
// 任务1: 获取品牌ID
|
||
eg.Go(func() error {
|
||
if state.BrandName == "" {
|
||
return nil
|
||
}
|
||
brandId, err := o.toolManager.Hyt.GoodsBrandSearch.Call(ctx, state.BrandName)
|
||
if err != nil {
|
||
log.Printf("warning: 品牌ID获取失败,%s: %v\n", state.BrandName, err)
|
||
return nil
|
||
}
|
||
state.mu.Lock()
|
||
state.BrandId = brandId
|
||
state.AddGoodsReq.BrandId = brandId
|
||
state.mu.Unlock()
|
||
return nil
|
||
})
|
||
|
||
// 任务2: 获取分类ID
|
||
eg.Go(func() error {
|
||
if state.CategoryName == "" {
|
||
return nil
|
||
}
|
||
categoryId, err := o.toolManager.Hyt.GoodsCategorySearch.Call(ctx, state.CategoryName)
|
||
if err != nil {
|
||
log.Printf("warning: 分类ID获取失败,%s: %v\n", state.CategoryName, err)
|
||
return nil
|
||
}
|
||
state.mu.Lock()
|
||
state.CategoryId = categoryId
|
||
state.mu.Unlock()
|
||
return nil
|
||
})
|
||
|
||
// 等待所有任务完成
|
||
_ = eg.Wait()
|
||
|
||
return state, nil
|
||
}))
|
||
|
||
// 3. 新增商品 节点 (依赖 prepare_info)
|
||
g.AddLambdaNode("goods_add", compose.InvokableLambda(func(ctx context.Context, state *GoodsAddContext) (*GoodsAddContext, error) {
|
||
// 调用 goods_add 工具
|
||
respData, err := o.toolManager.Hyt.GoodsAdd.Call(ctx, state.AddGoodsReq)
|
||
if err != nil || respData == nil {
|
||
return nil, fmt.Errorf("新增商品失败")
|
||
}
|
||
|
||
state.GoodsAddResp = respData
|
||
|
||
return state, nil
|
||
}))
|
||
|
||
// 4. 后置处理节点: 并行执行 关联分类 和 添加图片
|
||
g.AddLambdaNode("post_process", compose.InvokableLambda(func(ctx context.Context, state *GoodsAddContext) (*GoodsAddContext, error) {
|
||
if state.GoodsAddResp.Id == 0 {
|
||
return nil, errors.New("商品不存在")
|
||
}
|
||
|
||
eg, ctx := errgroup.WithContext(ctx)
|
||
|
||
// 任务1: 关联分类
|
||
eg.Go(func() error {
|
||
if state.CategoryId == 0 {
|
||
return nil
|
||
}
|
||
req := &goods_category_add.GoodsCategoryAddRequest{
|
||
GoodsId: state.GoodsAddResp.Id,
|
||
CategoryIds: []int{state.CategoryId},
|
||
IsCover: false,
|
||
}
|
||
isSuccess, err := o.toolManager.Hyt.GoodsCategoryAdd.Call(ctx, req)
|
||
if err != nil {
|
||
log.Printf("warning: 关联分类失败: %v", err)
|
||
return nil
|
||
}
|
||
|
||
state.mu.Lock()
|
||
state.GoodsCategoryAddResp = isSuccess
|
||
state.mu.Unlock()
|
||
|
||
return nil
|
||
})
|
||
|
||
// 任务2: 添加图片
|
||
eg.Go(func() error {
|
||
if len(state.IngestData.Images) == 0 {
|
||
return nil
|
||
}
|
||
req := &goods_media_add.GoodsMediaAddRequest{
|
||
GoodsId: state.GoodsAddResp.Id,
|
||
IsCover: true,
|
||
Data: make([]goods_media_add.MediaItem, 0),
|
||
}
|
||
for i, url := range state.IngestData.Images {
|
||
req.Data = append(req.Data, goods_media_add.MediaItem{
|
||
Type: 1, // 图片
|
||
Url: url,
|
||
Sort: i,
|
||
})
|
||
}
|
||
isSuccess, err := o.toolManager.Hyt.GoodsMediaAdd.Call(ctx, req)
|
||
if err != nil {
|
||
log.Printf("warning: 添加图片失败: %v", err)
|
||
return nil
|
||
}
|
||
|
||
state.mu.Lock()
|
||
state.GoodsMediaAddResp = isSuccess
|
||
state.mu.Unlock()
|
||
|
||
return nil
|
||
})
|
||
|
||
// 等待所有任务完成
|
||
_ = eg.Wait()
|
||
|
||
return state, nil
|
||
}))
|
||
|
||
// 5. 结果格式化节点
|
||
g.AddLambdaNode("format_output", compose.InvokableLambda(func(ctx context.Context, state *GoodsAddContext) (map[string]any, error) {
|
||
if state.GoodsAddResp == nil {
|
||
return nil, fmt.Errorf("goods add response is nil")
|
||
}
|
||
|
||
return map[string]any{
|
||
"预览URL(货易通商品列表)": state.GoodsAddResp.PreviewUrl,
|
||
"SPU编码": state.GoodsAddResp.SpuCode,
|
||
"商品ID": state.GoodsAddResp.Id,
|
||
}, nil
|
||
}))
|
||
|
||
// 构建边 (线性拓扑)
|
||
g.AddEdge(compose.START, "data_mapping")
|
||
g.AddEdge("data_mapping", "prepare_info")
|
||
g.AddEdge("prepare_info", "goods_add")
|
||
g.AddEdge("goods_add", "post_process")
|
||
g.AddEdge("post_process", "format_output")
|
||
g.AddEdge("format_output", compose.END)
|
||
|
||
return g.Compile(ctx)
|
||
}
|