171 lines
4.2 KiB
Go
171 lines
4.2 KiB
Go
package zltx
|
|
|
|
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"

	"ai_scheduler/internal/domain/component/callback"
	"ai_scheduler/internal/domain/repo"
	"ai_scheduler/internal/domain/workflow/runtime"
	"ai_scheduler/internal/entitys"
	"ai_scheduler/internal/pkg"
	"ai_scheduler/internal/pkg/l_request"

	"github.com/cloudwego/eino/compose"
	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"
)
|
|
|
|
// WorkflowIDBugOptimizationSubmit is the identifier under which the
// bug/optimization submission workflow is registered with the workflow
// runtime and which its ID method reports.
const WorkflowIDBugOptimizationSubmit = "bug_optimization_submit"
|
|
|
|
func init() {
|
|
runtime.Register(WorkflowIDBugOptimizationSubmit, func(d *runtime.Deps) (runtime.Workflow, error) {
|
|
// 从 Deps.Repos 获取 SessionRepo
|
|
return &bugOptimizationSubmit{
|
|
manager: d.Component.Callback,
|
|
sessionRepo: d.Repos.Session,
|
|
}, nil
|
|
})
|
|
}
|
|
|
|
type bugOptimizationSubmit struct {
|
|
manager callback.Manager
|
|
sessionRepo repo.SessionRepo
|
|
redisCli *redis.Client
|
|
}
|
|
|
|
func (w *bugOptimizationSubmit) ID() string {
|
|
return WorkflowIDBugOptimizationSubmit
|
|
}
|
|
|
|
type BugOptimizationSubmitInput struct {
|
|
Ch chan entitys.Response
|
|
RequireData *entitys.Recognize
|
|
}
|
|
|
|
// BugOptimizationSubmitOutput is the final result of the submission chain.
type BugOptimizationSubmitOutput struct {
	// Msg is the message returned by the callback manager once the awaited
	// callback has arrived.
	Msg string
}
|
|
|
|
type contextWithTask struct {
|
|
Input *BugOptimizationSubmitInput
|
|
TaskID string
|
|
}
|
|
|
|
func (w *bugOptimizationSubmit) Invoke(ctx context.Context, recognize *entitys.Recognize) (map[string]any, error) {
|
|
chain, err := w.buildWorkflow(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
input := &BugOptimizationSubmitInput{
|
|
Ch: recognize.Ch,
|
|
RequireData: recognize,
|
|
}
|
|
|
|
out, err := chain.Invoke(ctx, input)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return map[string]any{"msg": out.Msg}, nil
|
|
}
|
|
|
|
func (w *bugOptimizationSubmit) buildWorkflow(ctx context.Context) (compose.Runnable[*BugOptimizationSubmitInput, *BugOptimizationSubmitOutput], error) {
|
|
c := compose.NewChain[*BugOptimizationSubmitInput, *BugOptimizationSubmitOutput]()
|
|
|
|
// Node 1: Prepare and Call
|
|
c.AppendLambda(compose.InvokableLambda(w.prepareAndCall))
|
|
|
|
// Node 2: Wait
|
|
c.AppendLambda(compose.InvokableLambda(w.waitCallback))
|
|
|
|
return c.Compile(ctx)
|
|
}
|
|
|
|
func (w *bugOptimizationSubmit) prepareAndCall(ctx context.Context, in *BugOptimizationSubmitInput) (*contextWithTask, error) {
|
|
// 生成 TaskID
|
|
taskID := uuid.New().String()
|
|
|
|
// Ext 中获取 sessionId
|
|
sessionID := in.RequireData.GetSession()
|
|
|
|
// 注册回调映射
|
|
if err := w.manager.Register(ctx, taskID, sessionID); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// 查询用户名
|
|
userName := "unknown"
|
|
if w.sessionRepo != nil {
|
|
name, err := w.sessionRepo.GetUserName(ctx, sessionID)
|
|
if err == nil && name != "" {
|
|
userName = name
|
|
}
|
|
}
|
|
|
|
// 构建请求参数
|
|
var fileUrls, fileContent string
|
|
if len(in.RequireData.UserContent.File) > 0 {
|
|
for _, file := range in.RequireData.UserContent.File {
|
|
fileUrls += file.FileUrl + ","
|
|
fileContent += file.FileRec + ","
|
|
}
|
|
fileUrls = fileUrls[:len(fileUrls)-1]
|
|
fileContent = fileContent[:len(fileContent)-1]
|
|
}
|
|
|
|
body := map[string]string{
|
|
"mark": in.RequireData.Match.Index,
|
|
"text": in.RequireData.UserContent.Text,
|
|
"img": fileUrls,
|
|
"img_content": fileContent,
|
|
"creator": userName,
|
|
"task_id": taskID,
|
|
}
|
|
|
|
request := l_request.Request{
|
|
Url: "https://connector.dingtalk.com/webhook/flow/10352c521dd02104cee9000c",
|
|
Method: "POST",
|
|
Headers: map[string]string{
|
|
"Content-Type": "application/json",
|
|
},
|
|
JsonByte: pkg.JsonByteIgonErr(body),
|
|
}
|
|
|
|
res, err := request.Send()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var data map[string]any
|
|
if err := json.Unmarshal(res.Content, &data); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if success, ok := data["success"].(bool); !ok || !success {
|
|
return nil, errors.New("dingtalk flow failed")
|
|
}
|
|
|
|
entitys.ResLog(in.Ch, in.RequireData.Match.Index, "问题记录中")
|
|
entitys.ResLoading(in.Ch, in.RequireData.Match.Index, "问题记录中...")
|
|
|
|
return &contextWithTask{Input: in, TaskID: taskID}, nil
|
|
}
|
|
|
|
func (w *bugOptimizationSubmit) waitCallback(ctx context.Context, in *contextWithTask) (*BugOptimizationSubmitOutput, error) {
|
|
// 阻塞等待回调信号
|
|
// 设置 5 分钟超时
|
|
waitCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
|
defer cancel()
|
|
|
|
res, err := w.manager.Wait(waitCtx, in.TaskID, 5*time.Minute)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &BugOptimizationSubmitOutput{Msg: res}, nil
|
|
}
|