package zltx import ( "context" "encoding/json" "errors" "time" "ai_scheduler/internal/domain/component/callback" "ai_scheduler/internal/domain/repo" "ai_scheduler/internal/domain/workflow/runtime" "ai_scheduler/internal/entitys" "ai_scheduler/internal/pkg" "ai_scheduler/internal/pkg/l_request" "github.com/cloudwego/eino/compose" "github.com/google/uuid" "github.com/redis/go-redis/v9" ) const WorkflowIDBugOptimizationSubmit = "bug_optimization_submit" func init() { runtime.Register(WorkflowIDBugOptimizationSubmit, func(d *runtime.Deps) (runtime.Workflow, error) { // 从 Deps.Repos 获取 SessionRepo return &bugOptimizationSubmit{ manager: d.Component.Callback, sessionRepo: d.Repos.Session, }, nil }) } type bugOptimizationSubmit struct { manager callback.Manager sessionRepo repo.SessionRepo redisCli *redis.Client } func (w *bugOptimizationSubmit) ID() string { return WorkflowIDBugOptimizationSubmit } type BugOptimizationSubmitInput struct { Ch chan entitys.Response RequireData *entitys.Recognize } type BugOptimizationSubmitOutput struct { Msg string } type contextWithTask struct { Input *BugOptimizationSubmitInput TaskID string } func (w *bugOptimizationSubmit) Invoke(ctx context.Context, recognize *entitys.Recognize) (map[string]any, error) { chain, err := w.buildWorkflow(ctx) if err != nil { return nil, err } input := &BugOptimizationSubmitInput{ Ch: recognize.Ch, RequireData: recognize, } out, err := chain.Invoke(ctx, input) if err != nil { return nil, err } return map[string]any{"msg": out.Msg}, nil } func (w *bugOptimizationSubmit) buildWorkflow(ctx context.Context) (compose.Runnable[*BugOptimizationSubmitInput, *BugOptimizationSubmitOutput], error) { c := compose.NewChain[*BugOptimizationSubmitInput, *BugOptimizationSubmitOutput]() // Node 1: Prepare and Call c.AppendLambda(compose.InvokableLambda(w.prepareAndCall)) // Node 2: Wait c.AppendLambda(compose.InvokableLambda(w.waitCallback)) return c.Compile(ctx) } func (w *bugOptimizationSubmit) prepareAndCall(ctx context.Context, in *BugOptimizationSubmitInput) (*contextWithTask, error) { // 生成 TaskID taskID := uuid.New().String() // Ext 中获取 sessionId sessionID := in.RequireData.GetSession() // 注册回调映射 if err := w.manager.Register(ctx, taskID, sessionID); err != nil { return nil, err } // 查询用户名 userName := "unknown" if w.sessionRepo != nil { name, err := w.sessionRepo.GetUserName(ctx, sessionID) if err == nil && name != "" { userName = name } } // 构建请求参数 var fileUrls, fileContent string if len(in.RequireData.UserContent.File) > 0 { for _, file := range in.RequireData.UserContent.File { fileUrls += file.FileUrl + "," fileContent += file.FileRec + "," } fileUrls = fileUrls[:len(fileUrls)-1] fileContent = fileContent[:len(fileContent)-1] } body := map[string]string{ "mark": in.RequireData.Match.Index, "text": in.RequireData.UserContent.Text, "img": fileUrls, "img_content": fileContent, "creator": userName, "task_id": taskID, } request := l_request.Request{ Url: "https://connector.dingtalk.com/webhook/flow/10352c521dd02104cee9000c", Method: "POST", Headers: map[string]string{ "Content-Type": "application/json", }, JsonByte: pkg.JsonByteIgonErr(body), } res, err := request.Send() if err != nil { return nil, err } var data map[string]any if err := json.Unmarshal(res.Content, &data); err != nil { return nil, err } if success, ok := data["success"].(bool); !ok || !success { return nil, errors.New("dingtalk flow failed") } entitys.ResLog(in.Ch, in.RequireData.Match.Index, "问题记录中") entitys.ResLoading(in.Ch, in.RequireData.Match.Index, "问题记录中...") return &contextWithTask{Input: in, TaskID: taskID}, nil } func (w *bugOptimizationSubmit) waitCallback(ctx context.Context, in *contextWithTask) (*BugOptimizationSubmitOutput, error) { // 阻塞等待回调信号 // 设置 5 分钟超时 waitCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() res, err := w.manager.Wait(waitCtx, in.TaskID, 5*time.Minute) if err != nil { return nil, err } return &BugOptimizationSubmitOutput{Msg: res}, nil }