393 lines
12 KiB
Go
393 lines
12 KiB
Go
package services
|
||
|
||
import (
|
||
"ai_scheduler/internal/biz"
|
||
"ai_scheduler/internal/config"
|
||
"ai_scheduler/internal/data/constants"
|
||
"ai_scheduler/internal/data/impl"
|
||
"ai_scheduler/internal/data/model"
|
||
"ai_scheduler/internal/entitys"
|
||
"ai_scheduler/internal/pkg/dingtalk"
|
||
"context"
|
||
"encoding/json"
|
||
"log"
|
||
"sync"
|
||
"time"
|
||
|
||
"gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/card"
|
||
"gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/chatbot"
|
||
"github.com/alibabacloud-go/dingtalk/card_1_0"
|
||
"github.com/alibabacloud-go/tea/tea"
|
||
"golang.org/x/sync/errgroup"
|
||
"xorm.io/builder"
|
||
)
|
||
|
||
type DingBotService struct {
|
||
config *config.Config
|
||
dingTalkBotBiz *biz.DingTalkBotBiz
|
||
dingTalkOld *dingtalk.OldClient
|
||
dingtalkCardClient *dingtalk.CardClient
|
||
dingtalkImClient *dingtalk.ImClient
|
||
dingtalkOauth2Client *dingtalk.Oauth2Client
|
||
botGroupConfigImpl *impl.BotGroupConfigImpl
|
||
botGroupImpl *impl.BotGroupImpl
|
||
botConfigImpl *impl.BotConfigImpl
|
||
}
|
||
|
||
func NewDingBotService(
|
||
config *config.Config,
|
||
dingTalkBotBiz *biz.DingTalkBotBiz,
|
||
dingTalkOld *dingtalk.OldClient,
|
||
dingtalkCardClient *dingtalk.CardClient,
|
||
dingtalkImClient *dingtalk.ImClient,
|
||
dingtalkOauth2Client *dingtalk.Oauth2Client,
|
||
botGroupConfigImpl *impl.BotGroupConfigImpl,
|
||
botGroupImpl *impl.BotGroupImpl,
|
||
botConfigImpl *impl.BotConfigImpl,
|
||
) *DingBotService {
|
||
return &DingBotService{
|
||
config: config,
|
||
dingTalkBotBiz: dingTalkBotBiz,
|
||
dingTalkOld: dingTalkOld,
|
||
dingtalkCardClient: dingtalkCardClient,
|
||
dingtalkImClient: dingtalkImClient,
|
||
dingtalkOauth2Client: dingtalkOauth2Client,
|
||
botGroupConfigImpl: botGroupConfigImpl,
|
||
botGroupImpl: botGroupImpl,
|
||
botConfigImpl: botConfigImpl,
|
||
}
|
||
}
|
||
|
||
func (d *DingBotService) GetServiceCfg() ([]entitys.DingTalkBot, error) {
|
||
return d.dingTalkBotBiz.GetDingTalkBotCfgList()
|
||
}
|
||
|
||
func (d *DingBotService) OnChatBotMessageReceived(ctx context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) {
|
||
requireData, err := d.dingTalkBotBiz.InitRequire(ctx, data)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 启动后台任务(独立生命周期,带超时控制)
|
||
go func() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
log.Printf("稍等一下,问的人太多了,有点转不过来,请稍等下再来🚀🚀🚀")
|
||
}
|
||
}()
|
||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
||
defer cancel()
|
||
if err := d.runBackgroundTasks(ctx, data, requireData); err != nil {
|
||
log.Printf("后台任务执行失败: %v", err)
|
||
}
|
||
}()
|
||
|
||
return []byte("success"), nil
|
||
}
|
||
|
||
func (d *DingBotService) runBackgroundTasks(ctx context.Context, data *chatbot.BotCallbackDataModel, requireData *entitys.RequireDataDingTalkBot) error {
|
||
g, ctx := errgroup.WithContext(ctx)
|
||
var (
|
||
chat []string
|
||
chatMu sync.Mutex
|
||
resChan = make(chan string, 10)
|
||
)
|
||
|
||
// 1. 流式处理协程
|
||
g.Go(func() error {
|
||
defer func() {
|
||
// 确保通道最终关闭
|
||
close(resChan)
|
||
}()
|
||
return d.dingTalkBotBiz.HandleStreamRes(ctx, data, resChan)
|
||
})
|
||
|
||
// 2. 业务处理协程(负责关闭requireData.Ch)
|
||
g.Go(func() error {
|
||
// 在完成时关闭通道
|
||
defer close(requireData.Ch)
|
||
|
||
//entitys.ResLoading(requireData.Ch, "", "
|
||
//entitys.ResLoading(requireData.Ch, "", "https://p6-img.")
|
||
//entitys.ResLoading(requireData.Ch, "", "searchpstatp.com/")
|
||
//entitys.ResLoading(requireData.Ch, "", "tos-cn-i-vvloioitz3/ab5ae998d8162b431f44fb2a0ed9ae33~tplv-vvloioitz3-6:190:124.jpeg)")
|
||
|
||
return d.dingTalkBotBiz.Do(ctx, requireData)
|
||
})
|
||
|
||
// 3. 结果收集协程(修改后的版本)
|
||
resultDone := make(chan struct{})
|
||
g.Go(func() error {
|
||
// 使用defer确保通道关闭
|
||
defer close(resultDone)
|
||
// 处理通道中的数据
|
||
for {
|
||
select {
|
||
case resp, ok := <-requireData.Ch:
|
||
if !ok {
|
||
return nil // 通道已关闭,正常退出
|
||
}
|
||
if resp.Type != entitys.ResponseLog {
|
||
chatMu.Lock()
|
||
chat = append(chat, resp.Content)
|
||
chatMu.Unlock()
|
||
|
||
select {
|
||
case resChan <- resp.Content:
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
}
|
||
}
|
||
case <-ctx.Done():
|
||
return ctx.Err() // 上下文取消,提前退出
|
||
}
|
||
}
|
||
})
|
||
|
||
// 4. 统一关闭通道的协程(只关闭resChan)
|
||
g.Go(func() error {
|
||
<-resultDone
|
||
// resChan已在流式处理协程关闭
|
||
return nil
|
||
})
|
||
|
||
// 5. 历史记录保存协程
|
||
g.Go(func() error {
|
||
<-resultDone
|
||
chatMu.Lock()
|
||
savedChat := make([]string, len(chat))
|
||
copy(savedChat, chat)
|
||
chatMu.Unlock()
|
||
|
||
if err := d.dingTalkBotBiz.SaveHis(ctx, requireData, savedChat); err != nil {
|
||
log.Printf("保存历史记录失败: %v", err)
|
||
return err
|
||
}
|
||
return nil
|
||
})
|
||
|
||
// 阻塞直到所有协程完成或出错
|
||
if err := g.Wait(); err != nil {
|
||
return err
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// **一把梭先搞,后续规范化**
|
||
func (d *DingBotService) OnCardMessageReceived(ctx context.Context, data *card.CardRequest) (resp *card.CardResponse, err error) {
|
||
// 非回调类型暂不接受
|
||
if data.Type != constants.CardActionCallbackTypeAction {
|
||
return nil, nil
|
||
}
|
||
|
||
// action 处理 - 这里先只处理第一个匹配的actionId
|
||
for _, actionId := range data.CardActionData.CardPrivateData.ActionIdList {
|
||
switch actionId {
|
||
case constants.CardActionTypeCreateGroup:
|
||
// 解析 OutTrackId 以获取 SpaceId 和 BotId
|
||
spaceId, botId := constants.ParseCardOutTrackId(data.OutTrackId)
|
||
|
||
// 获取新群聊人员
|
||
var userIds []string
|
||
userIds, err = d.buildNewGroupUserIds(ctx, spaceId, botId, data.UserId)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 创建群聊及群初始化(ws中,直接协程)
|
||
if data.CardActionData.CardPrivateData.Params["status"] == "confirm" {
|
||
go func() {
|
||
err := d.createIssueHandlingGroupAndInit(ctx, data.CardActionData.CardPrivateData.Params, spaceId, botId, userIds)
|
||
if err != nil {
|
||
log.Printf("创建群聊及群初始化失败: %v", err)
|
||
}
|
||
}()
|
||
}
|
||
|
||
// 构建关闭创建群组卡片按钮的响应
|
||
resp = d.buildCreateGroupCardResp()
|
||
return
|
||
}
|
||
}
|
||
|
||
return &card.CardResponse{}, nil
|
||
}
|
||
|
||
// buildNewGroupUserIds 构建新群聊人员列表
|
||
func (d *DingBotService) buildNewGroupUserIds(ctx context.Context, spaceId, botId, groupOwner string) ([]string, error) {
|
||
// 群id+机器人id确认一个群配置
|
||
botGroup, err := d.botGroupImpl.GetByConversationIdAndRobotCode(spaceId, botId)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 获取群配置
|
||
var groupConfig model.AiBotGroupConfig
|
||
cond := builder.NewCond().And(builder.Eq{"config_id": botGroup.ConfigID})
|
||
err = d.botGroupConfigImpl.GetOneBySearchToStrut(&cond, &groupConfig)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 获取处理人列表
|
||
issueOwnerJson := groupConfig.IssueOwner
|
||
type issueOwnerType struct {
|
||
UserId string `json:"userid"`
|
||
Name string `json:"name"`
|
||
}
|
||
var issueOwner []issueOwnerType
|
||
if err = json.Unmarshal([]byte(issueOwnerJson), &issueOwner); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 合并所有userid
|
||
userIds := []string{groupOwner} // 当前用户为群主
|
||
for _, owner := range issueOwner {
|
||
userIds = append(userIds, owner.UserId)
|
||
}
|
||
|
||
return userIds, nil
|
||
}
|
||
|
||
// createIssueHandlingGroupAndInit 创建问题处理群聊及群初始化
|
||
func (d *DingBotService) createIssueHandlingGroupAndInit(ctx context.Context, callbackParams map[string]any, spaceId, botId string, userIds []string) error {
|
||
// 获取机器人配置
|
||
botConfig, err := d.botConfigImpl.GetRobotConfig(botId)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
appKey := dingtalk.AppKey{
|
||
AppKey: botConfig.ClientId,
|
||
AppSecret: botConfig.ClientSecret,
|
||
}
|
||
|
||
// 获取 access_token
|
||
accessToken, err := d.dingtalkOauth2Client.GetAccessToken(appKey)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
appKey.AccessToken = accessToken
|
||
|
||
// 创建群聊
|
||
_, openConversationId, err := d.createIssueHandlingGroup(ctx, accessToken, spaceId, botId, userIds)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 添加当前机器人到新群 - SDK 有问题,后续再考虑使用
|
||
// _, err = d.dingtalkImClient.AddRobotToConversation(
|
||
// appKey,
|
||
// &im_1_0.AddRobotToConversationRequest{
|
||
// OpenConversationId: tea.String(openConversationId),
|
||
// RobotCode: tea.String(botId),
|
||
// })
|
||
// if err != nil {
|
||
// fmt.Printf("添加机器人到会话失败: %v", err)
|
||
// }
|
||
|
||
// 返回新群分享链接,直接进群 - SDK 有问题,后续再考虑使用
|
||
// newGroupShareLink, err = d.dingTalkOld.GetJoinGroupQrcode(ctx, chatId, data.UserId)
|
||
// if err != nil {
|
||
// fmt.Printf("获取入群二维码失败: %v", err)
|
||
// }
|
||
|
||
// 初始化群聊
|
||
// 1.开场白
|
||
|
||
// 构建卡片 OutTrackId
|
||
outTrackId := constants.BuildCardOutTrackId(openConversationId, constants.GroupTemplateRobotIdIssueHandling)
|
||
|
||
// 群主题
|
||
groupScope := callbackParams["group_scope"].(string)
|
||
|
||
_, err = d.dingtalkCardClient.CreateAndDeliver(
|
||
appKey,
|
||
&card_1_0.CreateAndDeliverRequest{
|
||
CardTemplateId: tea.String(constants.DingtalkCardTplBaseMsg),
|
||
OutTrackId: tea.String(outTrackId),
|
||
CallbackType: tea.String("HTTP"),
|
||
CardData: &card_1_0.CreateAndDeliverRequestCardData{
|
||
CardParamMap: map[string]*string{
|
||
"title": tea.String("当前会话主题"),
|
||
"markdown": tea.String("问题:" + groupScope),
|
||
},
|
||
},
|
||
ImGroupOpenSpaceModel: &card_1_0.CreateAndDeliverRequestImGroupOpenSpaceModel{
|
||
SupportForward: tea.Bool(false),
|
||
},
|
||
OpenSpaceId: tea.String("dtv1.card//im_group." + openConversationId),
|
||
ImGroupOpenDeliverModel: &card_1_0.CreateAndDeliverRequestImGroupOpenDeliverModel{
|
||
RobotCode: tea.String(constants.GroupTemplateRobotIdIssueHandling),
|
||
AtUserIds: map[string]*string{
|
||
"@ALL": tea.String("@ALL"),
|
||
},
|
||
},
|
||
},
|
||
)
|
||
|
||
// 2. 机器人能力
|
||
// 构建卡片 OutTrackId
|
||
outTrackId = constants.BuildCardOutTrackId(openConversationId, constants.GroupTemplateRobotIdIssueHandling)
|
||
_, err = d.dingtalkCardClient.CreateAndDeliver(
|
||
appKey,
|
||
&card_1_0.CreateAndDeliverRequest{
|
||
CardTemplateId: tea.String(constants.DingtalkCardTplBaseMsg),
|
||
OutTrackId: tea.String(outTrackId),
|
||
CallbackType: tea.String("HTTP"),
|
||
CardData: &card_1_0.CreateAndDeliverRequestCardData{
|
||
CardParamMap: map[string]*string{
|
||
"title": tea.String("当前机器人能力"),
|
||
"markdown": tea.String("- 聊天内容提取(@机器人 [内容提取]) \n - QA知识收集 (@机器人 [QA收集])"),
|
||
},
|
||
},
|
||
ImGroupOpenSpaceModel: &card_1_0.CreateAndDeliverRequestImGroupOpenSpaceModel{
|
||
SupportForward: tea.Bool(false),
|
||
},
|
||
OpenSpaceId: tea.String("dtv1.card//im_group." + openConversationId),
|
||
ImGroupOpenDeliverModel: &card_1_0.CreateAndDeliverRequestImGroupOpenDeliverModel{
|
||
RobotCode: tea.String(constants.GroupTemplateRobotIdIssueHandling),
|
||
AtUserIds: map[string]*string{
|
||
"@ALL": tea.String("@ALL"),
|
||
},
|
||
},
|
||
},
|
||
)
|
||
|
||
return nil
|
||
}
|
||
|
||
// createGroupV1 创建普通内部群会话
|
||
// 这里用的是“统一登录平台”这个应用的接口加入群聊 - 这里用的是“统一登录平台”这个应用的接口
|
||
func (d *DingBotService) createIssueHandlingGroup(ctx context.Context, accessToken, spaceId, botId string, userIds []string) (chatId, openConversationId string, err error) {
|
||
// 是否使用模板群开关
|
||
var useTemplateGroup bool = true
|
||
|
||
// 创建内部群会话
|
||
if !useTemplateGroup {
|
||
return d.dingTalkOld.CreateInternalGroupConversation(ctx, accessToken, "问题处理群", userIds)
|
||
}
|
||
|
||
// 根据群模板ID创建群
|
||
if useTemplateGroup {
|
||
return d.dingTalkOld.CreateSceneGroupConversation(ctx, accessToken, "问题处理群", userIds, constants.GroupTemplateIdIssueHandling)
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
// buildCreateGroupCardResp 构建关闭创建群组卡片按钮
|
||
func (d *DingBotService) buildCreateGroupCardResp() *card.CardResponse {
|
||
return &card.CardResponse{
|
||
CardData: &card.CardDataDto{
|
||
CardParamMap: map[string]string{
|
||
"button_display": "false",
|
||
},
|
||
},
|
||
CardUpdateOptions: &card.CardUpdateOptions{
|
||
UpdateCardDataByKey: true,
|
||
},
|
||
}
|
||
}
|