ai_scheduler/internal/services/dtalk_bot.go

83 lines
1.9 KiB
Go

package services
import (
"ai_scheduler/internal/biz"
"log"
"time"
"ai_scheduler/internal/config"
"ai_scheduler/internal/entitys"
"context"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/chatbot"
)
// DingBotService is the service-layer entry point for DingTalk chatbot
// callbacks. It holds the application configuration and the business-layer
// object its methods delegate to.
type DingBotService struct {
	// config is the configuration handed to NewDingBotService.
	config *config.Config

	// dingTalkBotBiz is the business-layer dependency that every method of
	// this service calls into.
	dingTalkBotBiz *biz.DingTalkBotBiz
}
// NewDingBotService returns a DingBotService wired with the given
// configuration and DingTalk bot business-layer object. Neither argument is
// validated or copied; the pointers are stored as passed.
func NewDingBotService(config *config.Config, DingTalkBotBiz *biz.DingTalkBotBiz) *DingBotService {
	svc := new(DingBotService)
	svc.config = config
	svc.dingTalkBotBiz = DingTalkBotBiz
	return svc
}
// GetServiceCfg returns the DingTalk bot configuration list exactly as
// reported by the business layer's GetDingTalkBotCfgList, including its error.
func (d *DingBotService) GetServiceCfg() ([]entitys.DingTalkBot, error) {
	bots, err := d.dingTalkBotBiz.GetDingTalkBotCfgList()
	return bots, err
}
// OnChatBotMessageReceived handles a single DingTalk chatbot callback.
//
// It initialises the request via InitRequire, starts dingTalkBotBiz.Do in a
// goroutine under a cancellable child context, and then consumes responses
// from requireData.Ch until that channel is closed or ctx is cancelled.
// Text, stream and JSON responses are accumulated and passed to SaveHis when
// the function returns; every non-log response is forwarded to HandleRes.
//
// Returns:
//   - nil and the InitRequire error if initialisation fails;
//   - []byte("success") and nil once requireData.Ch is closed;
//   - nil and ctx.Err() if ctx is cancelled first. In that case the function
//     waits up to one second for Do to finish and logs (rather than panics
//     on) any error Do reports.
func (d *DingBotService) OnChatBotMessageReceived(ctx context.Context, data *chatbot.BotCallbackDataModel) (content []byte, err error) {
	requireData, err := d.dingTalkBotBiz.InitRequire(ctx, data)
	if err != nil {
		return nil, err
	}

	// chat collects the response contents that are persisted by SaveHis in
	// the deferred function below; the closure reads its final value.
	var chat []string

	// subCtx lets us tell Do to stop early; it is also cancelled whenever
	// the parent ctx is.
	subCtx, cancel := context.WithCancel(ctx)
	defer func() {
		cancel()
		// Saving history is best-effort: a failure must not change the
		// callback result, but it should not vanish silently either.
		if saveErr := d.dingTalkBotBiz.SaveHis(ctx, requireData, chat); saveErr != nil {
			log.Printf("SaveHis 失败: %v", saveErr)
		}
	}()

	// Run Do asynchronously. The channel is buffered so the goroutine can
	// always deliver its result and exit, even if nobody reads it.
	done := make(chan error, 1)
	go func() {
		done <- d.dingTalkBotBiz.Do(subCtx, requireData)
	}()

	for {
		select {
		case <-ctx.Done():
			ctxErr := ctx.Err()
			// Give Do a short grace period to observe the cancellation.
			// Its error (most likely the cancellation itself) is logged
			// instead of panicking, so a cancelled request cannot take
			// the whole process down.
			select {
			case doErr := <-done:
				if doErr != nil {
					log.Printf("Do 失败: %v", doErr)
				}
			case <-time.After(1 * time.Second):
				log.Println("警告:等待 Do 方法超时,可能发生 goroutine 泄漏")
			}
			return nil, ctxErr

		case resp, ok := <-requireData.Ch:
			if !ok {
				// Channel closed: the response stream is finished.
				// NOTE(review): Do's result is not consulted on this
				// path (same as before); confirm that is intended.
				return []byte("success"), nil
			}

			switch resp.Type {
			case entitys.ResponseLog:
				// Log-type responses are neither recorded nor forwarded.
				continue
			case entitys.ResponseText, entitys.ResponseStream, entitys.ResponseJson:
				chat = append(chat, resp.Content)
			}

			// A failed delivery is logged and the loop keeps going so
			// later responses are still processed.
			if handleErr := d.dingTalkBotBiz.HandleRes(ctx, data, resp); handleErr != nil {
				log.Printf("HandleRes 失败: %v", handleErr)
			}
		}
	}
}