fix: eino 工作流 类型任务接入

This commit is contained in:
fuzhongyun 2025-12-12 18:00:20 +08:00
parent d310bf8104
commit 54321bacf8
7 changed files with 210 additions and 78 deletions

View File

@ -4,16 +4,16 @@
package main
import (
"ai_scheduler/internal/biz"
"ai_scheduler/internal/config"
"ai_scheduler/internal/data/impl"
"ai_scheduler/internal/pkg"
"ai_scheduler/internal/server"
"ai_scheduler/internal/services"
"ai_scheduler/internal/domain/workflow"
"ai_scheduler/internal/tools"
"ai_scheduler/internal/tools_bot"
"ai_scheduler/utils"
"ai_scheduler/internal/biz"
"ai_scheduler/internal/config"
"ai_scheduler/internal/data/impl"
"ai_scheduler/internal/domain/workflow"
"ai_scheduler/internal/pkg"
"ai_scheduler/internal/server"
"ai_scheduler/internal/services"
"ai_scheduler/internal/tools"
"ai_scheduler/internal/tools_bot"
"ai_scheduler/utils"
"github.com/gofiber/fiber/v2/log"
"github.com/google/wire"
@ -21,17 +21,16 @@ import (
// InitializeApp 初始化应用程序
func InitializeApp(*config.Config, log.AllLogger) (*server.Servers, func(), error) {
panic(wire.Build(
server.ProviderSetServer,
llm.ProviderSet,
workflow.ProviderSetWorkflow,
tools.ProviderSetTools,
pkg.ProviderSetClient,
services.ProviderSetServices,
biz.ProviderSetBiz,
impl.ProviderImpl,
utils.ProviderUtils,
tools_bot.ProviderSetBotTools,
))
panic(wire.Build(
server.ProviderSetServer,
workflow.ProviderSetWorkflow,
tools.ProviderSetTools,
pkg.ProviderSetClient,
services.ProviderSetServices,
biz.ProviderSetBiz,
impl.ProviderImpl,
utils.ProviderUtils,
tools_bot.ProviderSetBotTools,
))
}

View File

@ -288,23 +288,16 @@ func (r *Handle) handleApiTask(ctx context.Context, requireData *entitys.Require
func (r *Handle) handleEinoWorkflow(ctx context.Context, requireData *entitys.RequireData, task *model.AiTask) (err error) {
// token 写入ctx
ctx = util.SetTokenToContext(ctx, requireData.Auth)
// 解析入参workflow_id 与 input
var params map[string]any
if len(requireData.Match.Parameters) > 0 {
_ = json.Unmarshal([]byte(requireData.Match.Parameters), &params)
}
wfID, _ := params["workflow_id"].(string)
input, _ := params["input"].(map[string]any)
if wfID == "" {
return fmt.Errorf("workflow_id 不能为空")
}
entitys.ResLoading(requireData.Ch, requireData.Task.Index, "正在执行工作流")
res, err := r.workflowManager.Invoke(ctx, wfID, input)
// 工作流内部输出
workflowId := task.Index
_, err = r.workflowManager.Invoke(ctx, workflowId, requireData)
if err != nil {
return err
}
b, _ := json.Marshal(res)
entitys.ResJson(requireData.Ch, requireData.Task.Index, string(b))
return nil
}

View File

@ -10,13 +10,13 @@ import (
"fmt"
)
func Call(ctx context.Context, cfg config.ToolConfig, orderNumbers []string) (OrderAfterSaleResellerBatchData, error) {
func Call(ctx context.Context, cfg config.ToolConfig, orderNumbers []string) (*OrderAfterSaleResellerBatchResponse, error) {
if len(orderNumbers) == 0 {
return OrderAfterSaleResellerBatchData{}, errors.New("批充订单号不能为空")
return nil, errors.New("批充订单号不能为空")
}
token := util.GetTokenFromContext(ctx)
if token == "" {
return OrderAfterSaleResellerBatchData{}, errors.New("token 未注入")
return nil, errors.New("token 未注入")
}
r := l_request.Request{
Url: cfg.BaseURL,
@ -31,17 +31,18 @@ func Call(ctx context.Context, cfg config.ToolConfig, orderNumbers []string) (Or
}
res, err := r.Send()
if err != nil {
return OrderAfterSaleResellerBatchData{}, err
return nil, err
}
var response OrderAfterSaleResellerBatchResponse
response := &OrderAfterSaleResellerBatchResponse{}
if err = json.Unmarshal(res.Content, &response); err != nil {
return OrderAfterSaleResellerBatchData{}, err
return nil, err
}
if response.Code != 200 {
return OrderAfterSaleResellerBatchData{}, fmt.Errorf("售后订单查询异常: %s", response.Error)
return nil, fmt.Errorf("售后订单查询异常: %s", response.Error)
}
if len(response.Data.Data) == 0 {
return OrderAfterSaleResellerBatchData{}, errors.New("未查询到相应售后订单,请核实订单号是否正确")
return nil, errors.New("未查询到相应售后订单,请核实订单号是否正确")
}
return response.Data, nil
return response, nil
}

View File

@ -13,7 +13,7 @@ type Args struct {
}
func NewInvokable(cfg config.ToolConfig) tool.InvokableTool {
run := func(ctx context.Context, in Args) (OrderAfterSaleResellerBatchData, error) {
run := func(ctx context.Context, in Args) (*OrderAfterSaleResellerBatchResponse, error) {
return Call(ctx, cfg, in.OrderNumber)
}
t, err := toolutils.InferTool("zltxOrderAfterSaleResellerBatch", "直连天下下游分销商批充订单售后工具", run)

View File

@ -1,14 +1,14 @@
package order_after_reseller_batch
type OrderAfterSaleResellerBatchResponse struct {
Code int `json:"code"`
Error string `json:"error"`
Data OrderAfterSaleResellerBatchData `json:"data"`
Code int `json:"code"`
Error string `json:"error"`
Data *OrderAfterSaleResellerBatchData `json:"data"`
}
type OrderAfterSaleResellerBatchData struct {
Data []OrderAfterSaleResellerBatchBase `json:"data"`
ExtData map[string]OrderAfterSaleResellerBatchExtItem `json:"extraData"`
Data []*OrderAfterSaleResellerBatchBase `json:"data"`
ExtData map[string]*OrderAfterSaleResellerBatchExtItem `json:"extraData"`
}
type OrderAfterSaleResellerBatchBase struct {

View File

@ -2,6 +2,7 @@ package runtime
import (
"ai_scheduler/internal/config"
"ai_scheduler/internal/entitys"
"ai_scheduler/internal/pkg/utils_ollama"
"context"
"errors"
@ -11,7 +12,7 @@ import (
type Workflow interface {
ID() string
Schema() map[string]any
Invoke(ctx context.Context, input map[string]any) (map[string]any, error)
Invoke(ctx context.Context, requireData *entitys.RequireData) (map[string]any, error)
}
type Deps struct {
@ -62,10 +63,7 @@ func Default() *Registry {
return r
}
func (r *Registry) Invoke(ctx context.Context, id string, input map[string]any) (map[string]any, error) {
if input == nil {
input = map[string]any{}
}
func (r *Registry) Invoke(ctx context.Context, id string, requireData *entitys.RequireData) (map[string]any, error) {
regMu.RLock()
f, ok := factories[id]
regMu.RUnlock()
@ -91,5 +89,5 @@ func (r *Registry) Invoke(ctx context.Context, id string, input map[string]any)
r.mu.Unlock()
}
return w.Invoke(ctx, input)
return w.Invoke(ctx, requireData)
}

View File

@ -2,12 +2,17 @@ package zltx
import (
"ai_scheduler/internal/config"
"ai_scheduler/internal/data/model"
toolZoarb "ai_scheduler/internal/domain/tools/zltx/order_after_reseller_batch"
"ai_scheduler/internal/domain/workflow/runtime"
"ai_scheduler/internal/entitys"
"ai_scheduler/internal/pkg/util"
"context"
"encoding/json"
"errors"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
)
func init() {
@ -17,7 +22,57 @@ func init() {
}
type orderAfterSaleResellerBatch struct {
cfg config.ToolConfig
cfg config.ToolConfig
data *OrderAfterSaleResellerBatchWorkflowInput
}
// 工作流入参
type OrderAfterSaleResellerBatchWorkflowInput struct {
Ch chan entitys.Response // 响应通道
UserInput string // 用户输入文本
FileContent string // 文件解析结果
UserHistory []model.AiChatHi // 用户对话历史
ParameterResult string // 参数解析结果
Data *OrderAfterSaleResellerBatchNodeData // 节点所需参数
}
// 节点所需参数
type OrderAfterSaleResellerBatchNodeData struct {
OrderNumber []string `json:"orderNumber"` // 订单号
AfterType string `json:"afterType"` // 处理方式 1.退款 2.扣款
AfterSalesPrice string `json:"afterSalesPrice"` // 售后金额
AfterSalesReason string `json:"afterSalesReason"` // 售后原因
ResponsibleType string `json:"responsibleType"` // 费用承担者 1.供应商 2.商务 3.公司 4.无
ResponsiblePerson string `json:"responsiblePerson"` // 费用承担供应商
}
// 工作流出参
type OrderAfterSaleResellerBatchWorkflowOutput struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data []*OrderAfterSaleResellerBatchData `json:"data"`
}
type OrderAfterSaleResellerBatchData struct {
OrderType int `json:"orderType"`
OrderNumber string `json:"orderNumber"`
OrderAmount float64 `json:"orderAmount"`
OrderPrice float64 `json:"orderPrice"`
SignCompany int `json:"signCompany"`
OrderQuantity int `json:"orderQuantity"`
ResellerID int `json:"resellerId"`
ResellerName string `json:"resellerName"`
OurProductID int `json:"ourProductId"`
OurProductTitle string `json:"ourProductTitle"`
Account []string `json:"account"`
Platforms map[int]string `json:"platforms"`
AfterType int `json:"afterType"` // 处理方式 1.退款 2.扣款
Remark string `json:"remark"` // 售后原因
AfterAmount float64 `json:"afterAmount"` // 售后金额
ResponsibleType int `json:"responsibleType"` // 费用承担者 1.供应商 2.商务 3.公司 4.无
ResponsiblePerson string `json:"responsiblePerson"` // 费用承担供应商
IsExistsAfterSale bool `json:"isExistsAfterSale"` // 是否已存在售后
CreateTime int `json:"createTime"` // 创建时间
}
// ID 返回工作流唯一标识
@ -33,18 +88,22 @@ func (o *orderAfterSaleResellerBatch) Schema() map[string]any {
}
// Invoke 调用原有编排工作流并规范化输出
func (o *orderAfterSaleResellerBatch) Invoke(ctx context.Context, input map[string]any) (map[string]any, error) {
func (o *orderAfterSaleResellerBatch) Invoke(ctx context.Context, requireData *entitys.RequireData) (map[string]any, error) {
// 构建工作流
chain, err := o.buildWorkflow(ctx)
if err != nil {
return nil, err
}
var in OrderAfterSaleResellerBatchWorkflowInput
if v, ok := input["orderNumber"].([]string); ok {
in.OrderNumber = v
o.data = &OrderAfterSaleResellerBatchWorkflowInput{
Ch: requireData.Ch,
UserInput: requireData.Req.Text,
FileContent: "",
UserHistory: requireData.Histories,
ParameterResult: requireData.Match.Parameters,
}
_, err = chain.Invoke(ctx, in)
// 工作流过程输出,不关注最终输出
_, err = chain.Invoke(ctx, o.data)
if err != nil {
return nil, err
}
@ -55,35 +114,117 @@ func (o *orderAfterSaleResellerBatch) Invoke(ctx context.Context, input map[stri
return nil, nil
}
type OrderAfterSaleResellerBatchWorkflowInput struct {
OrderNumber []string `json:"orderNumber"`
}
var ErrInvalidOrderNumbers = errors.New("orderNumber 不能为空")
// buildWorkflow 构建工作流
func (o *orderAfterSaleResellerBatch) buildWorkflow(ctx context.Context) (compose.Runnable[OrderAfterSaleResellerBatchWorkflowInput, toolZoarb.OrderAfterSaleResellerBatchData], error) {
func (o *orderAfterSaleResellerBatch) buildWorkflow(ctx context.Context) (compose.Runnable[*OrderAfterSaleResellerBatchWorkflowInput, *OrderAfterSaleResellerBatchWorkflowOutput], error) {
// 定义工作流、出入参
c := compose.NewChain[OrderAfterSaleResellerBatchWorkflowInput, toolZoarb.OrderAfterSaleResellerBatchData]()
c := compose.NewChain[*OrderAfterSaleResellerBatchWorkflowInput, *OrderAfterSaleResellerBatchWorkflowOutput]()
// 1.入参解析与校验
c.AppendLambda(compose.InvokableLambda(func(_ context.Context, in OrderAfterSaleResellerBatchWorkflowInput) (OrderAfterSaleResellerBatchWorkflowInput, error) {
// 1.llm 推断参数 (若需要)
c.AppendLambda(compose.InvokableLambda(func(_ context.Context, in *OrderAfterSaleResellerBatchWorkflowInput) (*schema.Message, error) {
// 已推断完,直接使用
parameters := in.ParameterResult
return &schema.Message{Content: parameters}, nil
}))
// 2.参数解析为结构体
c.AppendLambda(compose.MessageParser(
schema.NewMessageJSONParser[*OrderAfterSaleResellerBatchNodeData](&schema.MessageJSONParseConfig{
ParseFrom: schema.MessageParseFromContent,
ParseKeyPath: "", // 如果仅需要 parse 子字段,可用 "key.sub.grandsub"
}),
))
// 3.参数校验
c.AppendLambda(compose.InvokableLambda(func(_ context.Context, in *OrderAfterSaleResellerBatchNodeData) (*OrderAfterSaleResellerBatchNodeData, error) {
// 校验必填项
if len(in.OrderNumber) == 0 {
return OrderAfterSaleResellerBatchWorkflowInput{}, ErrInvalidOrderNumbers
return nil, ErrInvalidOrderNumbers
}
o.data.Data = in
return in, nil
}))
// 2.调用工具
c.AppendLambda(compose.InvokableLambda(func(ctx context.Context, in OrderAfterSaleResellerBatchWorkflowInput) (toolZoarb.OrderAfterSaleResellerBatchData, error) {
return toolZoarb.Call(ctx, o.cfg, in.OrderNumber)
// 4.工具调用
c.AppendLambda(compose.InvokableLambda(func(_ context.Context, in *OrderAfterSaleResellerBatchNodeData) (*toolZoarb.OrderAfterSaleResellerBatchResponse, error) {
entitys.ResLoading(o.data.Ch, o.ID(), "数据拉取中")
toolRes, err := toolZoarb.Call(ctx, o.cfg, in.OrderNumber)
entitys.ResLog(o.data.Ch, o.ID(), "数据拉取完成")
return toolRes, err
}))
// 3.结果映射与整形
c.AppendLambda(compose.InvokableLambda(func(_ context.Context, in toolZoarb.OrderAfterSaleResellerBatchData) (toolZoarb.OrderAfterSaleResellerBatchData, error) {
return in, nil
}))
// 5.结果数据映射
c.AppendLambda(compose.InvokableLambda(o.dataMapping))
// 编译工作流
return c.Compile(ctx)
}
// 结果数据映射
func (o *orderAfterSaleResellerBatch) dataMapping(_ context.Context, in *toolZoarb.OrderAfterSaleResellerBatchResponse) (*OrderAfterSaleResellerBatchWorkflowOutput, error) {
entitys.ResLog(o.data.Ch, o.ID(), "数据整理中")
toolResp := &OrderAfterSaleResellerBatchWorkflowOutput{
Code: in.Code,
Msg: in.Error,
Data: make([]*OrderAfterSaleResellerBatchData, 0, len(in.Data.Data)),
}
// 转换数据
for _, item := range in.Data.Data {
// 处理方式
afterType := util.StringToInt(o.data.Data.AfterType)
if afterType == 0 {
afterType = 1 // 默认退款
}
// 费用承担者
responsibleType := util.StringToInt(o.data.Data.ResponsibleType)
if responsibleType == 0 {
responsibleType = 4 // 默认无
}
// 售后金额
afterSalesPrice := util.StringToFloat64(o.data.Data.AfterSalesPrice)
if afterSalesPrice == 0 {
afterSalesPrice = item.OrderPrice
}
toolResp.Data = append(toolResp.Data, &OrderAfterSaleResellerBatchData{
OrderType: item.OrderType,
OrderNumber: item.OrderNumber,
OrderAmount: item.OrderAmount,
OrderPrice: item.OrderPrice,
SignCompany: item.SignCompany,
OrderQuantity: item.OrderQuantity,
ResellerID: item.ResellerID,
ResellerName: item.ResellerName,
OurProductID: item.OurProductID,
OurProductTitle: item.OurProductTitle,
Account: item.Account,
Platforms: item.Platforms,
AfterType: afterType,
Remark: o.data.Data.AfterSalesReason,
AfterAmount: afterSalesPrice,
ResponsibleType: responsibleType,
ResponsiblePerson: o.data.Data.ResponsiblePerson,
})
}
// 追加扩展数据
for _, item := range toolResp.Data {
if extItem, ok := in.Data.ExtData[item.OrderNumber]; ok {
item.IsExistsAfterSale = item.OrderType > 100 // 102 批充&已售后
item.CreateTime = extItem.SerialCreateTime
}
}
toolRespJson, _ := json.Marshal(toolResp)
entitys.ResJson(o.data.Ch, o.ID(), string(toolRespJson))
return toolResp, nil
}