diff --git a/cmd/server/wire.go b/cmd/server/wire.go index 8a29212..e34c611 100644 --- a/cmd/server/wire.go +++ b/cmd/server/wire.go @@ -4,16 +4,16 @@ package main import ( - "ai_scheduler/internal/biz" - "ai_scheduler/internal/config" - "ai_scheduler/internal/data/impl" - "ai_scheduler/internal/pkg" - "ai_scheduler/internal/server" - "ai_scheduler/internal/services" - "ai_scheduler/internal/domain/workflow" - "ai_scheduler/internal/tools" - "ai_scheduler/internal/tools_bot" - "ai_scheduler/utils" + "ai_scheduler/internal/biz" + "ai_scheduler/internal/config" + "ai_scheduler/internal/data/impl" + "ai_scheduler/internal/domain/workflow" + "ai_scheduler/internal/pkg" + "ai_scheduler/internal/server" + "ai_scheduler/internal/services" + "ai_scheduler/internal/tools" + "ai_scheduler/internal/tools_bot" + "ai_scheduler/utils" "github.com/gofiber/fiber/v2/log" "github.com/google/wire" @@ -21,17 +21,16 @@ import ( // InitializeApp 初始化应用程序 func InitializeApp(*config.Config, log.AllLogger) (*server.Servers, func(), error) { - panic(wire.Build( - server.ProviderSetServer, - llm.ProviderSet, - workflow.ProviderSetWorkflow, - tools.ProviderSetTools, - pkg.ProviderSetClient, - services.ProviderSetServices, - biz.ProviderSetBiz, - impl.ProviderImpl, - utils.ProviderUtils, - tools_bot.ProviderSetBotTools, - )) + panic(wire.Build( + server.ProviderSetServer, + workflow.ProviderSetWorkflow, + tools.ProviderSetTools, + pkg.ProviderSetClient, + services.ProviderSetServices, + biz.ProviderSetBiz, + impl.ProviderImpl, + utils.ProviderUtils, + tools_bot.ProviderSetBotTools, + )) } diff --git a/internal/biz/do/handle.go b/internal/biz/do/handle.go index 2028f54..e5b30b4 100644 --- a/internal/biz/do/handle.go +++ b/internal/biz/do/handle.go @@ -288,23 +288,16 @@ func (r *Handle) handleApiTask(ctx context.Context, requireData *entitys.Require func (r *Handle) handleEinoWorkflow(ctx context.Context, requireData *entitys.RequireData, task *model.AiTask) (err error) { // token 写入ctx ctx = util.SetTokenToContext(ctx, requireData.Auth) - // 解析入参:workflow_id 与 input - var params map[string]any - if len(requireData.Match.Parameters) > 0 { - _ = json.Unmarshal([]byte(requireData.Match.Parameters), ¶ms) - } - wfID, _ := params["workflow_id"].(string) - input, _ := params["input"].(map[string]any) - if wfID == "" { - return fmt.Errorf("workflow_id 不能为空") - } + entitys.ResLoading(requireData.Ch, requireData.Task.Index, "正在执行工作流") - res, err := r.workflowManager.Invoke(ctx, wfID, input) + + // 工作流内部输出 + workflowId := task.Index + _, err = r.workflowManager.Invoke(ctx, workflowId, requireData) if err != nil { return err } - b, _ := json.Marshal(res) - entitys.ResJson(requireData.Ch, requireData.Task.Index, string(b)) + return nil } diff --git a/internal/domain/tools/zltx/order_after_reseller_batch/client.go b/internal/domain/tools/zltx/order_after_reseller_batch/client.go index 90da543..efcf29c 100644 --- a/internal/domain/tools/zltx/order_after_reseller_batch/client.go +++ b/internal/domain/tools/zltx/order_after_reseller_batch/client.go @@ -10,13 +10,13 @@ import ( "fmt" ) -func Call(ctx context.Context, cfg config.ToolConfig, orderNumbers []string) (OrderAfterSaleResellerBatchData, error) { +func Call(ctx context.Context, cfg config.ToolConfig, orderNumbers []string) (*OrderAfterSaleResellerBatchResponse, error) { if len(orderNumbers) == 0 { - return OrderAfterSaleResellerBatchData{}, errors.New("批充订单号不能为空") + return nil, errors.New("批充订单号不能为空") } token := util.GetTokenFromContext(ctx) if token == "" { - return OrderAfterSaleResellerBatchData{}, errors.New("token 未注入") + return nil, errors.New("token 未注入") } r := l_request.Request{ Url: cfg.BaseURL, @@ -31,17 +31,18 @@ func Call(ctx context.Context, cfg config.ToolConfig, orderNumbers []string) (Or } res, err := r.Send() if err != nil { - return OrderAfterSaleResellerBatchData{}, err + return nil, err } - var response OrderAfterSaleResellerBatchResponse + + response := &OrderAfterSaleResellerBatchResponse{} if err = json.Unmarshal(res.Content, &response); err != nil { - return OrderAfterSaleResellerBatchData{}, err + return nil, err } if response.Code != 200 { - return OrderAfterSaleResellerBatchData{}, fmt.Errorf("售后订单查询异常: %s", response.Error) + return nil, fmt.Errorf("售后订单查询异常: %s", response.Error) } if len(response.Data.Data) == 0 { - return OrderAfterSaleResellerBatchData{}, errors.New("未查询到相应售后订单,请核实订单号是否正确") + return nil, errors.New("未查询到相应售后订单,请核实订单号是否正确") } - return response.Data, nil + return response, nil } diff --git a/internal/domain/tools/zltx/order_after_reseller_batch/invokable.go b/internal/domain/tools/zltx/order_after_reseller_batch/invokable.go index 34da7e1..e7e99b5 100644 --- a/internal/domain/tools/zltx/order_after_reseller_batch/invokable.go +++ b/internal/domain/tools/zltx/order_after_reseller_batch/invokable.go @@ -13,7 +13,7 @@ type Args struct { } func NewInvokable(cfg config.ToolConfig) tool.InvokableTool { - run := func(ctx context.Context, in Args) (OrderAfterSaleResellerBatchData, error) { + run := func(ctx context.Context, in Args) (*OrderAfterSaleResellerBatchResponse, error) { return Call(ctx, cfg, in.OrderNumber) } t, err := toolutils.InferTool("zltxOrderAfterSaleResellerBatch", "直连天下下游分销商批充订单售后工具", run) diff --git a/internal/domain/tools/zltx/order_after_reseller_batch/types.go b/internal/domain/tools/zltx/order_after_reseller_batch/types.go index 9d2a071..3e35115 100644 --- a/internal/domain/tools/zltx/order_after_reseller_batch/types.go +++ b/internal/domain/tools/zltx/order_after_reseller_batch/types.go @@ -1,14 +1,14 @@ package order_after_reseller_batch type OrderAfterSaleResellerBatchResponse struct { - Code int `json:"code"` - Error string `json:"error"` - Data OrderAfterSaleResellerBatchData `json:"data"` + Code int `json:"code"` + Error string `json:"error"` + Data *OrderAfterSaleResellerBatchData `json:"data"` } type OrderAfterSaleResellerBatchData struct { - Data []OrderAfterSaleResellerBatchBase `json:"data"` - ExtData map[string]OrderAfterSaleResellerBatchExtItem `json:"extraData"` + Data []*OrderAfterSaleResellerBatchBase `json:"data"` + ExtData map[string]*OrderAfterSaleResellerBatchExtItem `json:"extraData"` } type OrderAfterSaleResellerBatchBase struct { diff --git a/internal/domain/workflow/runtime/registry.go b/internal/domain/workflow/runtime/registry.go index 1260173..8ae7a82 100644 --- a/internal/domain/workflow/runtime/registry.go +++ b/internal/domain/workflow/runtime/registry.go @@ -2,6 +2,7 @@ package runtime import ( "ai_scheduler/internal/config" + "ai_scheduler/internal/entitys" "ai_scheduler/internal/pkg/utils_ollama" "context" "errors" @@ -11,7 +12,7 @@ import ( type Workflow interface { ID() string Schema() map[string]any - Invoke(ctx context.Context, input map[string]any) (map[string]any, error) + Invoke(ctx context.Context, requireData *entitys.RequireData) (map[string]any, error) } type Deps struct { @@ -62,10 +63,7 @@ func Default() *Registry { return r } -func (r *Registry) Invoke(ctx context.Context, id string, input map[string]any) (map[string]any, error) { - if input == nil { - input = map[string]any{} - } +func (r *Registry) Invoke(ctx context.Context, id string, requireData *entitys.RequireData) (map[string]any, error) { regMu.RLock() f, ok := factories[id] regMu.RUnlock() @@ -91,5 +89,5 @@ func (r *Registry) Invoke(ctx context.Context, id string, input map[string]any) r.mu.Unlock() } - return w.Invoke(ctx, input) + return w.Invoke(ctx, requireData) } diff --git a/internal/domain/workflow/zltx/order_after_reseller_batch.go b/internal/domain/workflow/zltx/order_after_reseller_batch.go index e00b726..4d151d2 100644 --- a/internal/domain/workflow/zltx/order_after_reseller_batch.go +++ b/internal/domain/workflow/zltx/order_after_reseller_batch.go @@ -2,12 +2,17 @@ package zltx import ( "ai_scheduler/internal/config" + "ai_scheduler/internal/data/model" toolZoarb "ai_scheduler/internal/domain/tools/zltx/order_after_reseller_batch" "ai_scheduler/internal/domain/workflow/runtime" + "ai_scheduler/internal/entitys" + "ai_scheduler/internal/pkg/util" "context" + "encoding/json" "errors" "github.com/cloudwego/eino/compose" + "github.com/cloudwego/eino/schema" ) func init() { @@ -17,7 +22,57 @@ func init() { } type orderAfterSaleResellerBatch struct { - cfg config.ToolConfig + cfg config.ToolConfig + data *OrderAfterSaleResellerBatchWorkflowInput +} + +// 工作流入参 +type OrderAfterSaleResellerBatchWorkflowInput struct { + Ch chan entitys.Response // 响应通道 + UserInput string // 用户输入文本 + FileContent string // 文件解析结果 + UserHistory []model.AiChatHi // 用户对话历史 + ParameterResult string // 参数解析结果 + Data *OrderAfterSaleResellerBatchNodeData // 节点所需参数 +} + +// 节点所需参数 +type OrderAfterSaleResellerBatchNodeData struct { + OrderNumber []string `json:"orderNumber"` // 订单号 + AfterType string `json:"afterType"` // 处理方式 1.退款 2.扣款 + AfterSalesPrice string `json:"afterSalesPrice"` // 售后金额 + AfterSalesReason string `json:"afterSalesReason"` // 售后原因 + ResponsibleType string `json:"responsibleType"` // 费用承担者 1.供应商 2.商务 3.公司 4.无 + ResponsiblePerson string `json:"responsiblePerson"` // 费用承担供应商 +} + +// 工作流出参 +type OrderAfterSaleResellerBatchWorkflowOutput struct { + Code int `json:"code"` + Msg string `json:"msg"` + Data []*OrderAfterSaleResellerBatchData `json:"data"` +} + +type OrderAfterSaleResellerBatchData struct { + OrderType int `json:"orderType"` + OrderNumber string `json:"orderNumber"` + OrderAmount float64 `json:"orderAmount"` + OrderPrice float64 `json:"orderPrice"` + SignCompany int `json:"signCompany"` + OrderQuantity int `json:"orderQuantity"` + ResellerID int `json:"resellerId"` + ResellerName string `json:"resellerName"` + OurProductID int `json:"ourProductId"` + OurProductTitle string `json:"ourProductTitle"` + Account []string `json:"account"` + Platforms map[int]string `json:"platforms"` + AfterType int `json:"afterType"` // 处理方式 1.退款 2.扣款 + Remark string `json:"remark"` // 售后原因 + AfterAmount float64 `json:"afterAmount"` // 售后金额 + ResponsibleType int `json:"responsibleType"` // 费用承担者 1.供应商 2.商务 3.公司 4.无 + ResponsiblePerson string `json:"responsiblePerson"` // 费用承担供应商 + IsExistsAfterSale bool `json:"isExistsAfterSale"` // 是否已存在售后 + CreateTime int `json:"createTime"` // 创建时间 } // ID 返回工作流唯一标识 @@ -33,18 +88,22 @@ func (o *orderAfterSaleResellerBatch) Schema() map[string]any { } // Invoke 调用原有编排工作流并规范化输出 -func (o *orderAfterSaleResellerBatch) Invoke(ctx context.Context, input map[string]any) (map[string]any, error) { +func (o *orderAfterSaleResellerBatch) Invoke(ctx context.Context, requireData *entitys.RequireData) (map[string]any, error) { // 构建工作流 chain, err := o.buildWorkflow(ctx) if err != nil { return nil, err } - var in OrderAfterSaleResellerBatchWorkflowInput - if v, ok := input["orderNumber"].([]string); ok { - in.OrderNumber = v + o.data = &OrderAfterSaleResellerBatchWorkflowInput{ + Ch: requireData.Ch, + UserInput: requireData.Req.Text, + FileContent: "", + UserHistory: requireData.Histories, + ParameterResult: requireData.Match.Parameters, } - _, err = chain.Invoke(ctx, in) + // 工作流过程输出,不关注最终输出 + _, err = chain.Invoke(ctx, o.data) if err != nil { return nil, err } @@ -55,35 +114,117 @@ func (o *orderAfterSaleResellerBatch) Invoke(ctx context.Context, input map[stri return nil, nil } -type OrderAfterSaleResellerBatchWorkflowInput struct { - OrderNumber []string `json:"orderNumber"` -} - var ErrInvalidOrderNumbers = errors.New("orderNumber 不能为空") // buildWorkflow 构建工作流 -func (o *orderAfterSaleResellerBatch) buildWorkflow(ctx context.Context) (compose.Runnable[OrderAfterSaleResellerBatchWorkflowInput, toolZoarb.OrderAfterSaleResellerBatchData], error) { +func (o *orderAfterSaleResellerBatch) buildWorkflow(ctx context.Context) (compose.Runnable[*OrderAfterSaleResellerBatchWorkflowInput, *OrderAfterSaleResellerBatchWorkflowOutput], error) { // 定义工作流、出入参 - c := compose.NewChain[OrderAfterSaleResellerBatchWorkflowInput, toolZoarb.OrderAfterSaleResellerBatchData]() + c := compose.NewChain[*OrderAfterSaleResellerBatchWorkflowInput, *OrderAfterSaleResellerBatchWorkflowOutput]() - // 1.入参解析与校验 - c.AppendLambda(compose.InvokableLambda(func(_ context.Context, in OrderAfterSaleResellerBatchWorkflowInput) (OrderAfterSaleResellerBatchWorkflowInput, error) { + // 1.llm 推断参数 (若需要) + c.AppendLambda(compose.InvokableLambda(func(_ context.Context, in *OrderAfterSaleResellerBatchWorkflowInput) (*schema.Message, error) { + // 已推断完,直接使用 + parameters := in.ParameterResult + return &schema.Message{Content: parameters}, nil + })) + + // 2.参数解析为结构体 + c.AppendLambda(compose.MessageParser( + schema.NewMessageJSONParser[*OrderAfterSaleResellerBatchNodeData](&schema.MessageJSONParseConfig{ + ParseFrom: schema.MessageParseFromContent, + ParseKeyPath: "", // 如果仅需要 parse 子字段,可用 "key.sub.grandsub" + }), + )) + + // 3.参数校验 + c.AppendLambda(compose.InvokableLambda(func(_ context.Context, in *OrderAfterSaleResellerBatchNodeData) (*OrderAfterSaleResellerBatchNodeData, error) { + // 校验必填项 if len(in.OrderNumber) == 0 { - return OrderAfterSaleResellerBatchWorkflowInput{}, ErrInvalidOrderNumbers + return nil, ErrInvalidOrderNumbers } + + o.data.Data = in + return in, nil })) - // 2.调用工具 - c.AppendLambda(compose.InvokableLambda(func(ctx context.Context, in OrderAfterSaleResellerBatchWorkflowInput) (toolZoarb.OrderAfterSaleResellerBatchData, error) { - return toolZoarb.Call(ctx, o.cfg, in.OrderNumber) + // 4.工具调用 + c.AppendLambda(compose.InvokableLambda(func(_ context.Context, in *OrderAfterSaleResellerBatchNodeData) (*toolZoarb.OrderAfterSaleResellerBatchResponse, error) { + entitys.ResLoading(o.data.Ch, o.ID(), "数据拉取中") + + toolRes, err := toolZoarb.Call(ctx, o.cfg, in.OrderNumber) + + entitys.ResLog(o.data.Ch, o.ID(), "数据拉取完成") + + return toolRes, err })) - // 3.结果映射与整形 - c.AppendLambda(compose.InvokableLambda(func(_ context.Context, in toolZoarb.OrderAfterSaleResellerBatchData) (toolZoarb.OrderAfterSaleResellerBatchData, error) { - return in, nil - })) + // 5.结果数据映射 + c.AppendLambda(compose.InvokableLambda(o.dataMapping)) // 编译工作流 return c.Compile(ctx) } + +// 结果数据映射 +func (o *orderAfterSaleResellerBatch) dataMapping(_ context.Context, in *toolZoarb.OrderAfterSaleResellerBatchResponse) (*OrderAfterSaleResellerBatchWorkflowOutput, error) { + entitys.ResLog(o.data.Ch, o.ID(), "数据整理中") + + toolResp := &OrderAfterSaleResellerBatchWorkflowOutput{ + Code: in.Code, + Msg: in.Error, + Data: make([]*OrderAfterSaleResellerBatchData, 0, len(in.Data.Data)), + } + + // 转换数据 + for _, item := range in.Data.Data { + // 处理方式 + afterType := util.StringToInt(o.data.Data.AfterType) + if afterType == 0 { + afterType = 1 // 默认退款 + } + // 费用承担者 + responsibleType := util.StringToInt(o.data.Data.ResponsibleType) + if responsibleType == 0 { + responsibleType = 4 // 默认无 + } + // 售后金额 + afterSalesPrice := util.StringToFloat64(o.data.Data.AfterSalesPrice) + if afterSalesPrice == 0 { + afterSalesPrice = item.OrderPrice + } + + toolResp.Data = append(toolResp.Data, &OrderAfterSaleResellerBatchData{ + OrderType: item.OrderType, + OrderNumber: item.OrderNumber, + OrderAmount: item.OrderAmount, + OrderPrice: item.OrderPrice, + SignCompany: item.SignCompany, + OrderQuantity: item.OrderQuantity, + ResellerID: item.ResellerID, + ResellerName: item.ResellerName, + OurProductID: item.OurProductID, + OurProductTitle: item.OurProductTitle, + Account: item.Account, + Platforms: item.Platforms, + AfterType: afterType, + Remark: o.data.Data.AfterSalesReason, + AfterAmount: afterSalesPrice, + ResponsibleType: responsibleType, + ResponsiblePerson: o.data.Data.ResponsiblePerson, + }) + } + + // 追加扩展数据 + for _, item := range toolResp.Data { + if extItem, ok := in.Data.ExtData[item.OrderNumber]; ok { + item.IsExistsAfterSale = item.OrderType > 100 // 102 批充&已售后 + item.CreateTime = extItem.SerialCreateTime + } + } + + toolRespJson, _ := json.Marshal(toolResp) + entitys.ResJson(o.data.Ch, o.ID(), string(toolRespJson)) + + return toolResp, nil +}