package services import ( "ai_scheduler/internal/biz" "log" "time" "ai_scheduler/internal/config" "ai_scheduler/internal/entitys" "context" "github.com/open-dingtalk/dingtalk-stream-sdk-go/chatbot" ) type DingBotService struct { config *config.Config dingTalkBotBiz *biz.DingTalkBotBiz } func NewDingBotService(config *config.Config, DingTalkBotBiz *biz.DingTalkBotBiz) *DingBotService { return &DingBotService{config: config, dingTalkBotBiz: DingTalkBotBiz} } func (d *DingBotService) GetServiceCfg() ([]entitys.DingTalkBot, error) { return d.dingTalkBotBiz.GetDingTalkBotCfgList() } func (d *DingBotService) OnChatBotMessageReceived(ctx context.Context, data *chatbot.BotCallbackDataModel) (content []byte, err error) { var ( lastErr error chat []string ) requireData, err := d.dingTalkBotBiz.InitRequire(ctx, data) if err != nil { return } // 使用 ctx.Done() 通知 Do 方法提前终止 subCtx, cancel := context.WithCancel(ctx) defer func() { cancel() _ = d.dingTalkBotBiz.SaveHis(ctx, requireData, chat) }() // 异步执行 Do 方法 done := make(chan error, 1) go func() { done <- d.dingTalkBotBiz.Do(subCtx, requireData) }() for { select { case <-ctx.Done(): lastErr = ctx.Err() goto cleanup case resp, ok := <-requireData.Ch: if !ok { return []byte("success"), nil } if resp.Type == entitys.ResponseLog { continue } if resp.Type == entitys.ResponseText || resp.Type == entitys.ResponseStream || resp.Type == entitys.ResponseJson { chat = append(chat, resp.Content) } if err := d.dingTalkBotBiz.HandleRes(ctx, data, resp); err != nil { log.Printf("HandleRes 失败: %v", err) } } } cleanup: select { case _err := <-done: if _err != nil { panic(_err) } case <-time.After(1 * time.Second): log.Println("警告:等待 Do 方法超时,可能发生 goroutine 泄漏") } return nil, lastErr }