package services import ( "ai_scheduler/internal/biz" "ai_scheduler/internal/config" "ai_scheduler/internal/data/constants" "ai_scheduler/internal/data/impl" "ai_scheduler/internal/data/model" "ai_scheduler/internal/entitys" "ai_scheduler/internal/pkg/dingtalk" "context" "encoding/json" "fmt" "log" "sync" "time" "gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/card" "gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/chatbot" "github.com/alibabacloud-go/dingtalk/card_1_0" "github.com/alibabacloud-go/dingtalk/im_1_0" "github.com/alibabacloud-go/tea/tea" "golang.org/x/sync/errgroup" "xorm.io/builder" ) type DingBotService struct { config *config.Config dingTalkBotBiz *biz.DingTalkBotBiz dingTalkOld *dingtalk.OldClient dingtalkCardClient *dingtalk.CardClient dingtalkImClient *dingtalk.ImClient botGroupConfigImpl *impl.BotGroupConfigImpl botGroupImpl *impl.BotGroupImpl botConfigImpl *impl.BotConfigImpl } func NewDingBotService( config *config.Config, dingTalkBotBiz *biz.DingTalkBotBiz, dingTalkOld *dingtalk.OldClient, dingtalkCardClient *dingtalk.CardClient, dingtalkImClient *dingtalk.ImClient, botGroupConfigImpl *impl.BotGroupConfigImpl, botGroupImpl *impl.BotGroupImpl, botConfigImpl *impl.BotConfigImpl, ) *DingBotService { return &DingBotService{ config: config, dingTalkBotBiz: dingTalkBotBiz, dingTalkOld: dingTalkOld, dingtalkCardClient: dingtalkCardClient, dingtalkImClient: dingtalkImClient, botGroupConfigImpl: botGroupConfigImpl, botGroupImpl: botGroupImpl, botConfigImpl: botConfigImpl, } } func (d *DingBotService) GetServiceCfg() ([]entitys.DingTalkBot, error) { return d.dingTalkBotBiz.GetDingTalkBotCfgList() } func (d *DingBotService) OnChatBotMessageReceived(ctx context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) { requireData, err := d.dingTalkBotBiz.InitRequire(ctx, data) if err != nil { return nil, err } // 启动后台任务(独立生命周期,带超时控制) go func() { defer func() { if r := recover(); r != nil { log.Printf("稍等一下,问的人太多了,有点转不过来,请稍等下再来🚀🚀🚀") } }() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() if err := d.runBackgroundTasks(ctx, data, requireData); err != nil { log.Printf("后台任务执行失败: %v", err) } }() return []byte("success"), nil } func (d *DingBotService) runBackgroundTasks(ctx context.Context, data *chatbot.BotCallbackDataModel, requireData *entitys.RequireDataDingTalkBot) error { g, ctx := errgroup.WithContext(ctx) var ( chat []string chatMu sync.Mutex resChan = make(chan string, 10) ) // 1. 流式处理协程 g.Go(func() error { defer func() { // 确保通道最终关闭 close(resChan) }() return d.dingTalkBotBiz.HandleStreamRes(ctx, data, resChan) }) // 2. 业务处理协程(负责关闭requireData.Ch) g.Go(func() error { // 在完成时关闭通道 defer close(requireData.Ch) //entitys.ResLoading(requireData.Ch, "", "![图片](") //entitys.ResLoading(requireData.Ch, "", "https://p6-img.") //entitys.ResLoading(requireData.Ch, "", "searchpstatp.com/") //entitys.ResLoading(requireData.Ch, "", "tos-cn-i-vvloioitz3/ab5ae998d8162b431f44fb2a0ed9ae33~tplv-vvloioitz3-6:190:124.jpeg)") return d.dingTalkBotBiz.Do(ctx, requireData) }) // 3. 结果收集协程(修改后的版本) resultDone := make(chan struct{}) g.Go(func() error { // 使用defer确保通道关闭 defer close(resultDone) // 处理通道中的数据 for { select { case resp, ok := <-requireData.Ch: if !ok { return nil // 通道已关闭,正常退出 } if resp.Type != entitys.ResponseLog { chatMu.Lock() chat = append(chat, resp.Content) chatMu.Unlock() select { case resChan <- resp.Content: case <-ctx.Done(): return ctx.Err() } } case <-ctx.Done(): return ctx.Err() // 上下文取消,提前退出 } } }) // 4. 统一关闭通道的协程(只关闭resChan) g.Go(func() error { <-resultDone // resChan已在流式处理协程关闭 return nil }) // 5. 历史记录保存协程 g.Go(func() error { <-resultDone chatMu.Lock() savedChat := make([]string, len(chat)) copy(savedChat, chat) chatMu.Unlock() if err := d.dingTalkBotBiz.SaveHis(ctx, requireData, savedChat); err != nil { log.Printf("保存历史记录失败: %v", err) return err } return nil }) // 阻塞直到所有协程完成或出错 if err := g.Wait(); err != nil { return err } return nil } // **一把梭先搞,后续规范化** func (d *DingBotService) OnCardMessageReceived(ctx context.Context, data *card.CardRequest) (resp *card.CardResponse, err error) { // 非回调类型暂不接受 if data.Type != constants.CardActionCallbackTypeAction { return nil, nil } // 卡片同步回调超时时间为2s,2s内同步返回,2s后异步更新卡片 startTime := time.Now() // action 处理 - 这里先只处理第一个匹配的actionId for _, actionId := range data.CardActionData.CardPrivateData.ActionIdList { switch actionId { case constants.CardActionTypeCreateGroup: // 解析 OutTrackId 以获取 SpaceId 和 BotId spaceId, botId := constants.ParseCardOutTrackId(data.OutTrackId) // 获取新群聊人员 var userIds []string userIds, err = d.buildNewGroupUserIds(ctx, spaceId, botId, data.UserId) if err != nil { return nil, err } // 新群分享链接 newGroupShareLink := "" timeOutLimit := 1500 * time.Millisecond // 钉钉appKey appKey := dingtalk.AppKey{} if data.CardActionData.CardPrivateData.Params["status"] == "confirm" { // 创建群聊 - 这里用的是“统一登录平台”这个应用的接口 // 不是很关心成功失败,ws中,后续考虑协程去创建 chatId, openConversationId, err := d.dingTalkOld.CreateInternalGroupConversation(ctx, "问题处理群", userIds) if err != nil { fmt.Printf("创建群聊失败: %v", err) } _ = openConversationId // 获取机器人配置 var botConfig model.AiBotConfig cond := builder.NewCond().And(builder.Eq{"robot_code": botId}) err = d.botConfigImpl.GetOneBySearchToStrut(&cond, &botConfig) if err != nil { return nil, err } // 解出 config var config entitys.DingTalkBot err = json.Unmarshal([]byte(botConfig.BotConfig), &config) if err != nil { log.Printf("配置解析失败 %v", err.Error()) } appKey = dingtalk.AppKey{ AppKey: config.ClientId, AppSecret: config.ClientSecret, } // 添加当前机器人到新群 _, err = d.dingtalkImClient.AddRobotToConversation( appKey, &im_1_0.AddRobotToConversationRequest{ OpenConversationId: tea.String(openConversationId), RobotCode: tea.String(botId), }) if err != nil { fmt.Printf("添加机器人到会话失败: %v", err) } // 返回新群分享链接,直接进群 newGroupShareLink, err = d.dingTalkOld.GetJoinGroupQrcode(ctx, chatId, data.UserId) if err != nil { fmt.Printf("获取入群二维码失败: %v", err) } } endTime := time.Now() if endTime.Sub(startTime) > timeOutLimit { // 异步更新卡片 d.dingtalkCardClient.UpdateCard(appKey, &card_1_0.UpdateCardRequest{ OutTrackId: tea.String(data.OutTrackId), CardData: &card_1_0.UpdateCardRequestCardData{ CardParamMap: map[string]*string{ "button_display": tea.String("false"), "new_group_share_link": tea.String(newGroupShareLink), }, }, CardUpdateOptions: &card_1_0.UpdateCardRequestCardUpdateOptions{ UpdateCardDataByKey: tea.Bool(true), }, }) return } // 构建关闭创建群组卡片按钮的响应 resp = d.buildCreateGroupCardResp(newGroupShareLink) return } } return &card.CardResponse{}, nil } // buildNewGroupUserIds 构建新群聊人员列表 func (d *DingBotService) buildNewGroupUserIds(ctx context.Context, spaceId, botId, groupOwner string) ([]string, error) { // 群id+机器人id确认一个群配置 botGroup, err := d.botGroupImpl.GetByConversationIdAndRobotCode(spaceId, botId) if err != nil { return nil, err } // 获取群配置 var groupConfig model.AiBotGroupConfig cond := builder.NewCond().And(builder.Eq{"config_id": botGroup.ConfigID}) err = d.botGroupConfigImpl.GetOneBySearchToStrut(&cond, &groupConfig) if err != nil { return nil, err } // 获取处理人列表 issueOwnerJson := groupConfig.IssueOwner type issueOwnerType struct { UserId string `json:"userid"` Name string `json:"name"` } var issueOwner []issueOwnerType if err = json.Unmarshal([]byte(issueOwnerJson), &issueOwner); err != nil { return nil, err } // 合并所有userid userIds := []string{groupOwner} // 当前用户为群主 for _, owner := range issueOwner { userIds = append(userIds, owner.UserId) } return userIds, nil } // buildCreateGroupCardResp 构建关闭创建群组卡片按钮 func (d *DingBotService) buildCreateGroupCardResp(newGroupShareLink string) *card.CardResponse { return &card.CardResponse{ CardData: &card.CardDataDto{ CardParamMap: map[string]string{ "button_display": "false", "new_group_share_link": newGroupShareLink, }, }, CardUpdateOptions: &card.CardUpdateOptions{ UpdateCardDataByKey: true, }, } }