package services import ( "ai_scheduler/internal/biz" "ai_scheduler/internal/config" "ai_scheduler/internal/data/constants" "ai_scheduler/internal/data/impl" "ai_scheduler/internal/data/model" "ai_scheduler/internal/entitys" "ai_scheduler/internal/pkg/dingtalk" "context" "encoding/json" "log" "sync" "time" "gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/card" "gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/chatbot" "golang.org/x/sync/errgroup" "xorm.io/builder" ) type DingBotService struct { config *config.Config dingTalkBotBiz *biz.DingTalkBotBiz dingTalkOld *dingtalk.OldClient dingtalkCardClient *dingtalk.CardClient botGroupConfigImpl *impl.BotGroupConfigImpl botGroupImpl *impl.BotGroupImpl } func NewDingBotService( config *config.Config, dingTalkBotBiz *biz.DingTalkBotBiz, dingTalkOld *dingtalk.OldClient, dingtalkCardClient *dingtalk.CardClient, botGroupConfigImpl *impl.BotGroupConfigImpl, botGroupImpl *impl.BotGroupImpl, ) *DingBotService { return &DingBotService{ config: config, dingTalkBotBiz: dingTalkBotBiz, dingTalkOld: dingTalkOld, dingtalkCardClient: dingtalkCardClient, botGroupConfigImpl: botGroupConfigImpl, botGroupImpl: botGroupImpl, } } func (d *DingBotService) GetServiceCfg() ([]entitys.DingTalkBot, error) { return d.dingTalkBotBiz.GetDingTalkBotCfgList() } func (d *DingBotService) OnChatBotMessageReceived(ctx context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) { requireData, err := d.dingTalkBotBiz.InitRequire(ctx, data) if err != nil { return nil, err } // 启动后台任务(独立生命周期,带超时控制) go func() { defer func() { if r := recover(); r != nil { log.Printf("稍等一下,问的人太多了,有点转不过来,请稍等下再来🚀🚀🚀") } }() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() if err := d.runBackgroundTasks(ctx, data, requireData); err != nil { log.Printf("后台任务执行失败: %v", err) } }() return []byte("success"), nil } func (d *DingBotService) runBackgroundTasks(ctx context.Context, data *chatbot.BotCallbackDataModel, requireData *entitys.RequireDataDingTalkBot) error { g, ctx := errgroup.WithContext(ctx) var ( chat []string chatMu sync.Mutex resChan = make(chan string, 10) ) // 1. 流式处理协程 g.Go(func() error { defer func() { // 确保通道最终关闭 close(resChan) }() return d.dingTalkBotBiz.HandleStreamRes(ctx, data, resChan) }) // 2. 业务处理协程(负责关闭requireData.Ch) g.Go(func() error { // 在完成时关闭通道 defer close(requireData.Ch) //entitys.ResLoading(requireData.Ch, "", "![图片](") //entitys.ResLoading(requireData.Ch, "", "https://p6-img.") //entitys.ResLoading(requireData.Ch, "", "searchpstatp.com/") //entitys.ResLoading(requireData.Ch, "", "tos-cn-i-vvloioitz3/ab5ae998d8162b431f44fb2a0ed9ae33~tplv-vvloioitz3-6:190:124.jpeg)") return d.dingTalkBotBiz.Do(ctx, requireData) }) // 3. 结果收集协程(修改后的版本) resultDone := make(chan struct{}) g.Go(func() error { // 使用defer确保通道关闭 defer close(resultDone) // 处理通道中的数据 for { select { case resp, ok := <-requireData.Ch: if !ok { return nil // 通道已关闭,正常退出 } if resp.Type != entitys.ResponseLog { chatMu.Lock() chat = append(chat, resp.Content) chatMu.Unlock() select { case resChan <- resp.Content: case <-ctx.Done(): return ctx.Err() } } case <-ctx.Done(): return ctx.Err() // 上下文取消,提前退出 } } }) // 4. 统一关闭通道的协程(只关闭resChan) g.Go(func() error { <-resultDone // resChan已在流式处理协程关闭 return nil }) // 5. 历史记录保存协程 g.Go(func() error { <-resultDone chatMu.Lock() savedChat := make([]string, len(chat)) copy(savedChat, chat) chatMu.Unlock() if err := d.dingTalkBotBiz.SaveHis(ctx, requireData, savedChat); err != nil { log.Printf("保存历史记录失败: %v", err) return err } return nil }) // 阻塞直到所有协程完成或出错 if err := g.Wait(); err != nil { return err } return nil } // **一把梭先搞,后续规范化** func (d *DingBotService) OnCardMessageReceived(ctx context.Context, data *card.CardRequest) (resp *card.CardResponse, err error) { // 非回调类型暂不接受 if data.Type != constants.CardActionCallbackTypeAction { return nil, nil } // action 处理 - 这里先只处理第一个匹配的actionId for _, actionId := range data.CardActionData.CardPrivateData.ActionIdList { switch actionId { case constants.CardActionTypeCreateGroup: // 解析 OutTrackId 以获取 SpaceId 和 BotId spaceId, botId := constants.ParseCardOutTrackId(data.OutTrackId) // 群id+机器人id确认一个群配置 var botGroup *model.AiBotGroup botGroup, err = d.botGroupImpl.GetByConversationIdAndRobotCode(spaceId, botId) if err != nil { return nil, err } // 获取群配置 var groupConfig model.AiBotGroupConfig cond := builder.NewCond().And(builder.Eq{"config_id": botGroup.ConfigID}) err = d.botGroupConfigImpl.GetOneBySearchToStrut(&cond, &groupConfig) if err != nil { return nil, err } // 获取处理人列表 issueOwnerJson := groupConfig.IssueOwner type issueOwnerType struct { UserId string `json:"userid"` Name string `json:"name"` } var issueOwner []issueOwnerType if err = json.Unmarshal([]byte(issueOwnerJson), &issueOwner); err != nil { return nil, err } // 合并所有userid userIds := []string{data.UserId} // 当前用户为群主 for _, owner := range issueOwner { userIds = append(userIds, owner.UserId) } if data.CardActionData.CardPrivateData.Params["status"] == "confirm" { // 创建群聊 - 这里用的是“统一登录平台”这个应用的接口 // 不是很关心成功失败,ws中,后续考虑协程去创建 if _, err = d.dingTalkOld.CreateInternalGroupConversation(ctx, "问题处理群", userIds); err != nil { return nil, err } } // 构建关闭创建群组卡片按钮的响应 resp = d.buildCreateGroupCardResp() return } } return &card.CardResponse{}, nil } // 关闭创建群组卡片按钮 func (d *DingBotService) buildCreateGroupCardResp() *card.CardResponse { return &card.CardResponse{ CardData: &card.CardDataDto{ CardParamMap: map[string]string{ "button_display": "false", }, }, CardUpdateOptions: &card.CardUpdateOptions{ UpdateCardDataByKey: true, }, } }