chore:add bak
This commit is contained in:
parent
e3448ae41e
commit
6c23fa34d6
|
|
@ -0,0 +1,170 @@
|
|||
package zltx
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"ai_scheduler/internal/domain/component/callback"
|
||||
"ai_scheduler/internal/domain/repo"
|
||||
"ai_scheduler/internal/domain/workflow/runtime"
|
||||
"ai_scheduler/internal/entitys"
|
||||
"ai_scheduler/internal/pkg"
|
||||
"ai_scheduler/internal/pkg/l_request"
|
||||
|
||||
"github.com/cloudwego/eino/compose"
|
||||
"github.com/google/uuid"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
const WorkflowIDBugOptimizationSubmitBak = "bug_optimization_submit_bak"
|
||||
|
||||
func init() {
|
||||
runtime.Register(WorkflowIDBugOptimizationSubmitBak, func(d *runtime.Deps) (runtime.Workflow, error) {
|
||||
// 从 Deps.Repos 获取 SessionRepo
|
||||
return &bugOptimizationSubmitBak{
|
||||
manager: d.Component.Callback,
|
||||
sessionRepo: d.Repos.Session,
|
||||
}, nil
|
||||
})
|
||||
}
|
||||
|
||||
type bugOptimizationSubmitBak struct {
|
||||
manager callback.Manager
|
||||
sessionRepo repo.SessionRepo
|
||||
redisCli *redis.Client
|
||||
}
|
||||
|
||||
func (w *bugOptimizationSubmitBak) ID() string {
|
||||
return WorkflowIDBugOptimizationSubmitBak
|
||||
}
|
||||
|
||||
type BugOptimizationSubmitBakInput struct {
|
||||
Ch chan entitys.Response
|
||||
RequireData *entitys.Recognize
|
||||
}
|
||||
|
||||
type BugOptimizationSubmitBakOutput struct {
|
||||
Msg string
|
||||
}
|
||||
|
||||
type contextWithTaskBak struct {
|
||||
Input *BugOptimizationSubmitBakInput
|
||||
TaskID string
|
||||
}
|
||||
|
||||
func (w *bugOptimizationSubmitBak) Invoke(ctx context.Context, recognize *entitys.Recognize) (map[string]any, error) {
|
||||
chain, err := w.buildWorkflow(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
input := &BugOptimizationSubmitBakInput{
|
||||
Ch: recognize.Ch,
|
||||
RequireData: recognize,
|
||||
}
|
||||
|
||||
out, err := chain.Invoke(ctx, input)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return map[string]any{"msg": out.Msg}, nil
|
||||
}
|
||||
|
||||
func (w *bugOptimizationSubmitBak) buildWorkflow(ctx context.Context) (compose.Runnable[*BugOptimizationSubmitBakInput, *BugOptimizationSubmitBakOutput], error) {
|
||||
c := compose.NewChain[*BugOptimizationSubmitBakInput, *BugOptimizationSubmitBakOutput]()
|
||||
|
||||
// Node 1: Prepare and Call
|
||||
c.AppendLambda(compose.InvokableLambda(w.prepareAndCall))
|
||||
|
||||
// Node 2: Wait
|
||||
c.AppendLambda(compose.InvokableLambda(w.waitCallback))
|
||||
|
||||
return c.Compile(ctx)
|
||||
}
|
||||
|
||||
func (w *bugOptimizationSubmitBak) prepareAndCall(ctx context.Context, in *BugOptimizationSubmitBakInput) (*contextWithTaskBak, error) {
|
||||
// 生成 TaskID
|
||||
taskID := uuid.New().String()
|
||||
|
||||
// Ext 中获取 sessionId
|
||||
sessionID := in.RequireData.GetSession()
|
||||
|
||||
// 注册回调映射
|
||||
if err := w.manager.Register(ctx, taskID, sessionID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 查询用户名
|
||||
userName := "unknown"
|
||||
if w.sessionRepo != nil {
|
||||
name, err := w.sessionRepo.GetUserName(ctx, sessionID)
|
||||
if err == nil && name != "" {
|
||||
userName = name
|
||||
}
|
||||
}
|
||||
|
||||
// 构建请求参数
|
||||
var fileUrls, fileContent string
|
||||
if len(in.RequireData.UserContent.File) > 0 {
|
||||
for _, file := range in.RequireData.UserContent.File {
|
||||
fileUrls += file.FileUrl + ","
|
||||
fileContent += file.FileRec + ","
|
||||
}
|
||||
fileUrls = fileUrls[:len(fileUrls)-1]
|
||||
fileContent = fileContent[:len(fileContent)-1]
|
||||
}
|
||||
|
||||
body := map[string]string{
|
||||
"mark": in.RequireData.Match.Index,
|
||||
"text": in.RequireData.UserContent.Text,
|
||||
"img": fileUrls,
|
||||
"img_content": fileContent,
|
||||
"creator": userName,
|
||||
"task_id": taskID,
|
||||
}
|
||||
|
||||
request := l_request.Request{
|
||||
Url: "https://connector.dingtalk.com/webhook/flow/10352c521dd02104cee9000c",
|
||||
Method: "POST",
|
||||
Headers: map[string]string{
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
JsonByte: pkg.JsonByteIgonErr(body),
|
||||
}
|
||||
|
||||
res, err := request.Send()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var data map[string]any
|
||||
if err := json.Unmarshal(res.Content, &data); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if success, ok := data["success"].(bool); !ok || !success {
|
||||
return nil, errors.New("dingtalk flow failed")
|
||||
}
|
||||
|
||||
entitys.ResLog(in.Ch, in.RequireData.Match.Index, "问题记录中")
|
||||
entitys.ResLoading(in.Ch, in.RequireData.Match.Index, "问题记录中...")
|
||||
|
||||
return &contextWithTaskBak{Input: in, TaskID: taskID}, nil
|
||||
}
|
||||
|
||||
func (w *bugOptimizationSubmitBak) waitCallback(ctx context.Context, in *contextWithTask) (*BugOptimizationSubmitBakOutput, error) {
|
||||
// 阻塞等待回调信号
|
||||
// 设置 5 分钟超时
|
||||
waitCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
res, err := w.manager.Wait(waitCtx, in.TaskID, 5*time.Minute)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &BugOptimizationSubmitBakOutput{Msg: res}, nil
|
||||
}
|
||||
Loading…
Reference in New Issue