package services import ( "ai_scheduler/internal/biz" "ai_scheduler/internal/config" "ai_scheduler/internal/data/constants" "ai_scheduler/internal/data/impl" "ai_scheduler/internal/data/model" "ai_scheduler/internal/entitys" "ai_scheduler/internal/pkg/dingtalk" "context" "encoding/json" "log" "sync" "time" "gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/card" "gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/chatbot" "github.com/alibabacloud-go/dingtalk/card_1_0" "github.com/alibabacloud-go/tea/tea" "golang.org/x/sync/errgroup" "xorm.io/builder" ) type DingBotService struct { config *config.Config dingTalkBotBiz *biz.DingTalkBotBiz dingTalkOld *dingtalk.OldClient dingtalkCardClient *dingtalk.CardClient dingtalkImClient *dingtalk.ImClient dingtalkOauth2Client *dingtalk.Oauth2Client botGroupConfigImpl *impl.BotGroupConfigImpl botGroupImpl *impl.BotGroupImpl botConfigImpl *impl.BotConfigImpl } func NewDingBotService( config *config.Config, dingTalkBotBiz *biz.DingTalkBotBiz, dingTalkOld *dingtalk.OldClient, dingtalkCardClient *dingtalk.CardClient, dingtalkImClient *dingtalk.ImClient, dingtalkOauth2Client *dingtalk.Oauth2Client, botGroupConfigImpl *impl.BotGroupConfigImpl, botGroupImpl *impl.BotGroupImpl, botConfigImpl *impl.BotConfigImpl, ) *DingBotService { return &DingBotService{ config: config, dingTalkBotBiz: dingTalkBotBiz, dingTalkOld: dingTalkOld, dingtalkCardClient: dingtalkCardClient, dingtalkImClient: dingtalkImClient, dingtalkOauth2Client: dingtalkOauth2Client, botGroupConfigImpl: botGroupConfigImpl, botGroupImpl: botGroupImpl, botConfigImpl: botConfigImpl, } } func (d *DingBotService) GetServiceCfg() ([]entitys.DingTalkBot, error) { return d.dingTalkBotBiz.GetDingTalkBotCfgList() } func (d *DingBotService) OnChatBotMessageReceived(ctx context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) { requireData, err := d.dingTalkBotBiz.InitRequire(ctx, data) if err != nil { return nil, err } // 启动后台任务(独立生命周期,带超时控制) go func() { defer func() { if r := recover(); r != nil { log.Printf("稍等一下,问的人太多了,有点转不过来,请稍等下再来🚀🚀🚀") } }() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() if err := d.runBackgroundTasks(ctx, data, requireData); err != nil { log.Printf("后台任务执行失败: %v", err) } }() return []byte("success"), nil } func (d *DingBotService) runBackgroundTasks(ctx context.Context, data *chatbot.BotCallbackDataModel, requireData *entitys.RequireDataDingTalkBot) error { g, ctx := errgroup.WithContext(ctx) var ( chat []string chatMu sync.Mutex resChan = make(chan string, 10) ) // 1. 流式处理协程 g.Go(func() error { defer func() { // 确保通道最终关闭 close(resChan) }() return d.dingTalkBotBiz.HandleStreamRes(ctx, data, resChan) }) // 2. 业务处理协程(负责关闭requireData.Ch) g.Go(func() error { // 在完成时关闭通道 defer close(requireData.Ch) //entitys.ResLoading(requireData.Ch, "", "![图片](") //entitys.ResLoading(requireData.Ch, "", "https://p6-img.") //entitys.ResLoading(requireData.Ch, "", "searchpstatp.com/") //entitys.ResLoading(requireData.Ch, "", "tos-cn-i-vvloioitz3/ab5ae998d8162b431f44fb2a0ed9ae33~tplv-vvloioitz3-6:190:124.jpeg)") return d.dingTalkBotBiz.Do(ctx, requireData) }) // 3. 结果收集协程(修改后的版本) resultDone := make(chan struct{}) g.Go(func() error { // 使用defer确保通道关闭 defer close(resultDone) // 处理通道中的数据 for { select { case resp, ok := <-requireData.Ch: if !ok { return nil // 通道已关闭,正常退出 } if resp.Type != entitys.ResponseLog { chatMu.Lock() chat = append(chat, resp.Content) chatMu.Unlock() select { case resChan <- resp.Content: case <-ctx.Done(): return ctx.Err() } } case <-ctx.Done(): return ctx.Err() // 上下文取消,提前退出 } } }) // 4. 统一关闭通道的协程(只关闭resChan) g.Go(func() error { <-resultDone // resChan已在流式处理协程关闭 return nil }) // 5. 历史记录保存协程 g.Go(func() error { <-resultDone chatMu.Lock() savedChat := make([]string, len(chat)) copy(savedChat, chat) chatMu.Unlock() if err := d.dingTalkBotBiz.SaveHis(ctx, requireData, savedChat); err != nil { log.Printf("保存历史记录失败: %v", err) return err } return nil }) // 阻塞直到所有协程完成或出错 if err := g.Wait(); err != nil { return err } return nil } // **一把梭先搞,后续规范化** func (d *DingBotService) OnCardMessageReceived(ctx context.Context, data *card.CardRequest) (resp *card.CardResponse, err error) { // 非回调类型暂不接受 if data.Type != constants.CardActionCallbackTypeAction { return nil, nil } // action 处理 - 这里先只处理第一个匹配的actionId for _, actionId := range data.CardActionData.CardPrivateData.ActionIdList { switch actionId { case constants.CardActionTypeCreateGroup: // 解析 OutTrackId 以获取 SpaceId 和 BotId spaceId, botId := constants.ParseCardOutTrackId(data.OutTrackId) // 获取新群聊人员 var userIds []string userIds, err = d.buildNewGroupUserIds(ctx, spaceId, botId, data.UserId) if err != nil { return nil, err } // 创建群聊及群初始化(ws中,直接协程) if data.CardActionData.CardPrivateData.Params["status"] == "confirm" { go func() { err := d.createIssueHandlingGroupAndInit(ctx, data.CardActionData.CardPrivateData.Params, spaceId, botId, userIds) if err != nil { log.Printf("创建群聊及群初始化失败: %v", err) } }() } // 构建关闭创建群组卡片按钮的响应 resp = d.buildCreateGroupCardResp() return } } return &card.CardResponse{}, nil } // buildNewGroupUserIds 构建新群聊人员列表 func (d *DingBotService) buildNewGroupUserIds(ctx context.Context, spaceId, botId, groupOwner string) ([]string, error) { // 群id+机器人id确认一个群配置 botGroup, err := d.botGroupImpl.GetByConversationIdAndRobotCode(spaceId, botId) if err != nil { return nil, err } // 获取群配置 var groupConfig model.AiBotGroupConfig cond := builder.NewCond().And(builder.Eq{"config_id": botGroup.ConfigID}) err = d.botGroupConfigImpl.GetOneBySearchToStrut(&cond, &groupConfig) if err != nil { return nil, err } // 获取处理人列表 issueOwnerJson := groupConfig.IssueOwner type issueOwnerType struct { UserId string `json:"userid"` Name string `json:"name"` } var issueOwner []issueOwnerType if err = json.Unmarshal([]byte(issueOwnerJson), &issueOwner); err != nil { return nil, err } // 合并所有userid userIds := []string{groupOwner} // 当前用户为群主 for _, owner := range issueOwner { userIds = append(userIds, owner.UserId) } return userIds, nil } // createIssueHandlingGroupAndInit 创建问题处理群聊及群初始化 func (d *DingBotService) createIssueHandlingGroupAndInit(ctx context.Context, callbackParams map[string]any, spaceId, botId string, userIds []string) error { // 获取机器人配置 botConfig, err := d.botConfigImpl.GetRobotConfig(botId) if err != nil { return err } appKey := dingtalk.AppKey{ AppKey: botConfig.ClientId, AppSecret: botConfig.ClientSecret, } // 获取 access_token accessToken, err := d.dingtalkOauth2Client.GetAccessToken(appKey) if err != nil { return err } appKey.AccessToken = accessToken // 创建群聊 _, openConversationId, err := d.createIssueHandlingGroup(ctx, accessToken, spaceId, botId, userIds) if err != nil { return err } // 添加当前机器人到新群 - SDK 有问题,后续再考虑使用 // _, err = d.dingtalkImClient.AddRobotToConversation( // appKey, // &im_1_0.AddRobotToConversationRequest{ // OpenConversationId: tea.String(openConversationId), // RobotCode: tea.String(botId), // }) // if err != nil { // fmt.Printf("添加机器人到会话失败: %v", err) // } // 返回新群分享链接,直接进群 - SDK 有问题,后续再考虑使用 // newGroupShareLink, err = d.dingTalkOld.GetJoinGroupQrcode(ctx, chatId, data.UserId) // if err != nil { // fmt.Printf("获取入群二维码失败: %v", err) // } // 初始化群聊 // 1.开场白 // 群主题 groupScope := callbackParams["group_scope"].(string) // 构建卡片 OutTrackId outTrackId := constants.BuildCardOutTrackId(openConversationId, d.config.Dingtalk.SceneGroup.GroupTemplateRobotIDIssueHandling) _, err = d.dingtalkCardClient.CreateAndDeliver( appKey, &card_1_0.CreateAndDeliverRequest{ CardTemplateId: tea.String(d.config.Dingtalk.Card.Template.BaseMsg), OutTrackId: tea.String(outTrackId), CallbackType: tea.String("HTTP"), CardData: &card_1_0.CreateAndDeliverRequestCardData{ CardParamMap: map[string]*string{ "title": tea.String("当前会话主题"), "markdown": tea.String("问题:" + groupScope), }, }, ImGroupOpenSpaceModel: &card_1_0.CreateAndDeliverRequestImGroupOpenSpaceModel{ SupportForward: tea.Bool(false), }, OpenSpaceId: tea.String("dtv1.card//im_group." + openConversationId), ImGroupOpenDeliverModel: &card_1_0.CreateAndDeliverRequestImGroupOpenDeliverModel{ RobotCode: tea.String(d.config.Dingtalk.SceneGroup.GroupTemplateRobotIDIssueHandling), AtUserIds: map[string]*string{ "@ALL": tea.String("@ALL"), }, }, }, ) // 2. 机器人能力 // 构建卡片 OutTrackId outTrackId = constants.BuildCardOutTrackId(openConversationId, d.config.Dingtalk.SceneGroup.GroupTemplateRobotIDIssueHandling) _, err = d.dingtalkCardClient.CreateAndDeliver( appKey, &card_1_0.CreateAndDeliverRequest{ CardTemplateId: tea.String(d.config.Dingtalk.Card.Template.BaseMsg), OutTrackId: tea.String(outTrackId), CallbackType: tea.String("HTTP"), CardData: &card_1_0.CreateAndDeliverRequestCardData{ CardParamMap: map[string]*string{ "title": tea.String("当前机器人能力"), "markdown": tea.String("- 聊天内容提取(@机器人 [内容提取]{聊天记录/问答描述}) \n - QA知识收集(卡片信息收集) \n - QA知识问答(@机器人 [知识库查询]{问题描述})"), }, }, ImGroupOpenSpaceModel: &card_1_0.CreateAndDeliverRequestImGroupOpenSpaceModel{ SupportForward: tea.Bool(false), }, OpenSpaceId: tea.String("dtv1.card//im_group." + openConversationId), ImGroupOpenDeliverModel: &card_1_0.CreateAndDeliverRequestImGroupOpenDeliverModel{ RobotCode: tea.String(d.config.Dingtalk.SceneGroup.GroupTemplateRobotIDIssueHandling), AtUserIds: map[string]*string{ "@ALL": tea.String("@ALL"), }, }, }, ) return nil } // createGroupV1 创建普通内部群会话 // 这里用的是“统一登录平台”这个应用的接口加入群聊 - 这里用的是“统一登录平台”这个应用的接口 func (d *DingBotService) createIssueHandlingGroup(ctx context.Context, accessToken, spaceId, botId string, userIds []string) (chatId, openConversationId string, err error) { // 是否使用模板群开关 var useTemplateGroup bool = true // 创建内部群会话 if !useTemplateGroup { return d.dingTalkOld.CreateInternalGroupConversation(ctx, accessToken, "问题处理群", userIds) } // 根据群模板ID创建群 if useTemplateGroup { return d.dingTalkOld.CreateSceneGroupConversation(ctx, accessToken, "问题处理群", userIds, d.config.Dingtalk.SceneGroup.GroupTemplateIDIssueHandling) } return } // buildCreateGroupCardResp 构建关闭创建群组卡片按钮 func (d *DingBotService) buildCreateGroupCardResp() *card.CardResponse { return &card.CardResponse{ CardData: &card.CardDataDto{ CardParamMap: map[string]string{ "button_display": "false", }, }, CardUpdateOptions: &card.CardUpdateOptions{ UpdateCardDataByKey: true, }, } }