ai_scheduler/internal/services/dtalk_bot.go

329 lines
9.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package services
import (
"ai_scheduler/internal/biz"
"ai_scheduler/internal/config"
"ai_scheduler/internal/data/constants"
"ai_scheduler/internal/data/impl"
"ai_scheduler/internal/data/model"
"ai_scheduler/internal/entitys"
"ai_scheduler/internal/pkg/dingtalk"
"context"
"encoding/json"
"fmt"
"log"
"sync"
"time"
"gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/card"
"gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/chatbot"
"github.com/alibabacloud-go/dingtalk/card_1_0"
"github.com/alibabacloud-go/dingtalk/im_1_0"
"github.com/alibabacloud-go/tea/tea"
"golang.org/x/sync/errgroup"
"xorm.io/builder"
)
type DingBotService struct {
config *config.Config
dingTalkBotBiz *biz.DingTalkBotBiz
dingTalkOld *dingtalk.OldClient
dingtalkCardClient *dingtalk.CardClient
dingtalkImClient *dingtalk.ImClient
botGroupConfigImpl *impl.BotGroupConfigImpl
botGroupImpl *impl.BotGroupImpl
botConfigImpl *impl.BotConfigImpl
}
func NewDingBotService(
config *config.Config,
dingTalkBotBiz *biz.DingTalkBotBiz,
dingTalkOld *dingtalk.OldClient,
dingtalkCardClient *dingtalk.CardClient,
dingtalkImClient *dingtalk.ImClient,
botGroupConfigImpl *impl.BotGroupConfigImpl,
botGroupImpl *impl.BotGroupImpl,
botConfigImpl *impl.BotConfigImpl,
) *DingBotService {
return &DingBotService{
config: config,
dingTalkBotBiz: dingTalkBotBiz,
dingTalkOld: dingTalkOld,
dingtalkCardClient: dingtalkCardClient,
dingtalkImClient: dingtalkImClient,
botGroupConfigImpl: botGroupConfigImpl,
botGroupImpl: botGroupImpl,
botConfigImpl: botConfigImpl,
}
}
func (d *DingBotService) GetServiceCfg() ([]entitys.DingTalkBot, error) {
return d.dingTalkBotBiz.GetDingTalkBotCfgList()
}
func (d *DingBotService) OnChatBotMessageReceived(ctx context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) {
requireData, err := d.dingTalkBotBiz.InitRequire(ctx, data)
if err != nil {
return nil, err
}
// 启动后台任务(独立生命周期,带超时控制)
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("稍等一下,问的人太多了,有点转不过来,请稍等下再来🚀🚀🚀")
}
}()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
if err := d.runBackgroundTasks(ctx, data, requireData); err != nil {
log.Printf("后台任务执行失败: %v", err)
}
}()
return []byte("success"), nil
}
func (d *DingBotService) runBackgroundTasks(ctx context.Context, data *chatbot.BotCallbackDataModel, requireData *entitys.RequireDataDingTalkBot) error {
g, ctx := errgroup.WithContext(ctx)
var (
chat []string
chatMu sync.Mutex
resChan = make(chan string, 10)
)
// 1. 流式处理协程
g.Go(func() error {
defer func() {
// 确保通道最终关闭
close(resChan)
}()
return d.dingTalkBotBiz.HandleStreamRes(ctx, data, resChan)
})
// 2. 业务处理协程负责关闭requireData.Ch
g.Go(func() error {
// 在完成时关闭通道
defer close(requireData.Ch)
//entitys.ResLoading(requireData.Ch, "", "![图片](")
//entitys.ResLoading(requireData.Ch, "", "https://p6-img.")
//entitys.ResLoading(requireData.Ch, "", "searchpstatp.com/")
//entitys.ResLoading(requireData.Ch, "", "tos-cn-i-vvloioitz3/ab5ae998d8162b431f44fb2a0ed9ae33~tplv-vvloioitz3-6:190:124.jpeg)")
return d.dingTalkBotBiz.Do(ctx, requireData)
})
// 3. 结果收集协程(修改后的版本)
resultDone := make(chan struct{})
g.Go(func() error {
// 使用defer确保通道关闭
defer close(resultDone)
// 处理通道中的数据
for {
select {
case resp, ok := <-requireData.Ch:
if !ok {
return nil // 通道已关闭,正常退出
}
if resp.Type != entitys.ResponseLog {
chatMu.Lock()
chat = append(chat, resp.Content)
chatMu.Unlock()
select {
case resChan <- resp.Content:
case <-ctx.Done():
return ctx.Err()
}
}
case <-ctx.Done():
return ctx.Err() // 上下文取消,提前退出
}
}
})
// 4. 统一关闭通道的协程只关闭resChan
g.Go(func() error {
<-resultDone
// resChan已在流式处理协程关闭
return nil
})
// 5. 历史记录保存协程
g.Go(func() error {
<-resultDone
chatMu.Lock()
savedChat := make([]string, len(chat))
copy(savedChat, chat)
chatMu.Unlock()
if err := d.dingTalkBotBiz.SaveHis(ctx, requireData, savedChat); err != nil {
log.Printf("保存历史记录失败: %v", err)
return err
}
return nil
})
// 阻塞直到所有协程完成或出错
if err := g.Wait(); err != nil {
return err
}
return nil
}
// **一把梭先搞,后续规范化**
func (d *DingBotService) OnCardMessageReceived(ctx context.Context, data *card.CardRequest) (resp *card.CardResponse, err error) {
// 非回调类型暂不接受
if data.Type != constants.CardActionCallbackTypeAction {
return nil, nil
}
// 卡片同步回调超时时间为2s2s内同步返回2s后异步更新卡片
startTime := time.Now()
// action 处理 - 这里先只处理第一个匹配的actionId
for _, actionId := range data.CardActionData.CardPrivateData.ActionIdList {
switch actionId {
case constants.CardActionTypeCreateGroup:
// 解析 OutTrackId 以获取 SpaceId 和 BotId
spaceId, botId := constants.ParseCardOutTrackId(data.OutTrackId)
// 获取新群聊人员
var userIds []string
userIds, err = d.buildNewGroupUserIds(ctx, spaceId, botId, data.UserId)
if err != nil {
return nil, err
}
// 新群分享链接
newGroupShareLink := ""
timeOutLimit := 1500 * time.Millisecond
// 钉钉appKey
appKey := dingtalk.AppKey{}
if data.CardActionData.CardPrivateData.Params["status"] == "confirm" {
// 创建群聊 - 这里用的是“统一登录平台”这个应用的接口
// 不是很关心成功失败ws中后续考虑协程去创建
chatId, openConversationId, err := d.dingTalkOld.CreateInternalGroupConversation(ctx, "问题处理群", userIds)
if err != nil {
fmt.Printf("创建群聊失败: %v", err)
}
_ = openConversationId
// 获取机器人配置
var botConfig model.AiBotConfig
cond := builder.NewCond().And(builder.Eq{"robot_code": botId})
err = d.botConfigImpl.GetOneBySearchToStrut(&cond, &botConfig)
if err != nil {
return nil, err
}
// 解出 config
var config entitys.DingTalkBot
err = json.Unmarshal([]byte(botConfig.BotConfig), &config)
if err != nil {
log.Printf("配置解析失败 %v", err.Error())
}
appKey = dingtalk.AppKey{
AppKey: config.ClientId,
AppSecret: config.ClientSecret,
}
// 添加当前机器人到新群
_, err = d.dingtalkImClient.AddRobotToConversation(
appKey,
&im_1_0.AddRobotToConversationRequest{
OpenConversationId: tea.String(openConversationId),
RobotCode: tea.String(botId),
})
if err != nil {
fmt.Printf("添加机器人到会话失败: %v", err)
}
// 返回新群分享链接,直接进群
newGroupShareLink, err = d.dingTalkOld.GetJoinGroupQrcode(ctx, chatId, data.UserId)
if err != nil {
fmt.Printf("获取入群二维码失败: %v", err)
}
}
endTime := time.Now()
if endTime.Sub(startTime) > timeOutLimit {
// 异步更新卡片
d.dingtalkCardClient.UpdateCard(appKey, &card_1_0.UpdateCardRequest{
OutTrackId: tea.String(data.OutTrackId),
CardData: &card_1_0.UpdateCardRequestCardData{
CardParamMap: map[string]*string{
"button_display": tea.String("false"),
"new_group_share_link": tea.String(newGroupShareLink),
},
},
CardUpdateOptions: &card_1_0.UpdateCardRequestCardUpdateOptions{
UpdateCardDataByKey: tea.Bool(true),
},
})
return
}
// 构建关闭创建群组卡片按钮的响应
resp = d.buildCreateGroupCardResp(newGroupShareLink)
return
}
}
return &card.CardResponse{}, nil
}
// buildNewGroupUserIds 构建新群聊人员列表
func (d *DingBotService) buildNewGroupUserIds(ctx context.Context, spaceId, botId, groupOwner string) ([]string, error) {
// 群id+机器人id确认一个群配置
botGroup, err := d.botGroupImpl.GetByConversationIdAndRobotCode(spaceId, botId)
if err != nil {
return nil, err
}
// 获取群配置
var groupConfig model.AiBotGroupConfig
cond := builder.NewCond().And(builder.Eq{"config_id": botGroup.ConfigID})
err = d.botGroupConfigImpl.GetOneBySearchToStrut(&cond, &groupConfig)
if err != nil {
return nil, err
}
// 获取处理人列表
issueOwnerJson := groupConfig.IssueOwner
type issueOwnerType struct {
UserId string `json:"userid"`
Name string `json:"name"`
}
var issueOwner []issueOwnerType
if err = json.Unmarshal([]byte(issueOwnerJson), &issueOwner); err != nil {
return nil, err
}
// 合并所有userid
userIds := []string{groupOwner} // 当前用户为群主
for _, owner := range issueOwner {
userIds = append(userIds, owner.UserId)
}
return userIds, nil
}
// buildCreateGroupCardResp 构建关闭创建群组卡片按钮
func (d *DingBotService) buildCreateGroupCardResp(newGroupShareLink string) *card.CardResponse {
return &card.CardResponse{
CardData: &card.CardDataDto{
CardParamMap: map[string]string{
"button_display": "false",
"new_group_share_link": newGroupShareLink,
},
},
CardUpdateOptions: &card.CardUpdateOptions{
UpdateCardDataByKey: true,
},
}
}