diff --git a/config/config_env.yaml b/config/config_env.yaml index 1b1a8bd..e480b7a 100644 --- a/config/config_env.yaml +++ b/config/config_env.yaml @@ -4,10 +4,10 @@ server: host: "0.0.0.0" ollama: - base_url: "http://192.168.6.109:11434" - model: "qwen3-coder:480b-cloud" - generate_model: "qwen3-coder:480b-cloud" - mapping_model: "deepseek-v3.2:cloud" + base_url: "http://192.168.6.115:11434" + model: "qwen3:8b" + generate_model: "qwen3:8b" + mapping_model: "qwen3:8b" vl_model: "qwen2.5vl:7b" timeout: "120s" level: "info" diff --git a/internal/biz/group_config.go b/internal/biz/group_config.go index f58ba03..0783a0f 100644 --- a/internal/biz/group_config.go +++ b/internal/biz/group_config.go @@ -10,6 +10,7 @@ import ( "ai_scheduler/internal/domain/workflow/runtime" "ai_scheduler/internal/entitys" "ai_scheduler/internal/pkg" + "ai_scheduler/internal/pkg/dingtalk" "ai_scheduler/internal/pkg/l_request" "ai_scheduler/internal/pkg/lsxd" "ai_scheduler/internal/pkg/utils_oss" @@ -42,6 +43,8 @@ type GroupConfigBiz struct { toolManager *tools.Manager conf *config.Config rdb *utils.Rdb + dingtalkOauth2Client *dingtalk.Oauth2Client + dingtalkRobotClient *dingtalk.RobotClient } // NewDingTalkBotBiz @@ -54,6 +57,8 @@ func NewGroupConfigBiz( reportDailyCacheImpl *impl.ReportDailyCacheImpl, rdb *utils.Rdb, toolManager *tools.Manager, + dingtalkOauth2Client *dingtalk.Oauth2Client, + dingtalkRobotClient *dingtalk.RobotClient, ) *GroupConfigBiz { return &GroupConfigBiz{ botTools: tools.BootTools, @@ -64,6 +69,8 @@ func NewGroupConfigBiz( reportDailyCacheImpl: reportDailyCacheImpl, rdb: rdb, toolManager: toolManager, + dingtalkOauth2Client: dingtalkOauth2Client, + dingtalkRobotClient: dingtalkRobotClient, } } @@ -488,11 +495,24 @@ func (g *GroupConfigBiz) handleKnowledgeV2(ctx context.Context, rec *entitys.Rec } defer resp.Body.Close() - err = g.connectAndReadSSE(resp, rec.Ch, true) + isRetrieved, err := g.connectAndReadSSE(resp, rec.Ch, true) if err != nil { return } + // 未检索到匹配信息,询问是否拉群 + if !isRetrieved { + // 获取dingtalk accessToken + accessToken, _ := g.dingtalkOauth2Client.GetAccessToken() + // 发送钉钉卡片 + _, err = g.dingtalkRobotClient.SendGroupMessages(accessToken, rec.UserContent.Text) + if err != nil { + return fmt.Errorf("发送钉钉卡片失败,err: %v", err) + } + // entitys.ResStream(rec.Ch, "", fmt.Sprintf("已发送卡片,查询ID: %s", queryKey)) + return + } + return } @@ -501,13 +521,37 @@ func (g *GroupConfigBiz) handleKnowledgeV2(ctx context.Context, rec *entitys.Rec // data: {"text": "1. 上下文检索中...\n"} // event: answer // data: {"text": "根据"} -func (g *GroupConfigBiz) connectAndReadSSE(resp *http.Response, channel chan entitys.Response, useParagraphMode bool) error { +func (g *GroupConfigBiz) connectAndReadSSE(resp *http.Response, channel chan entitys.Response, useParagraphMode bool) (isRetrieved bool, err error) { scanner := bufio.NewScanner(resp.Body) var buffer strings.Builder for scanner.Scan() { line := scanner.Text() + // 解析event行 + if strings.HasPrefix(line, "event:") { + eventStr := strings.TrimSpace(strings.TrimPrefix(line, "event:")) + if eventStr == "" { + continue + } + + // thinking不输出 + if eventStr == "thinking" { + continue + } + // system 事件输出 + if eventStr == "system" { + // 未检索到,直接返回 + dataStr := strings.TrimSpace(strings.TrimPrefix(line, "data:")) + if dataStr != "retrieved" { + return false, nil + } + continue + } + + continue + } + // 解析 data 行 if strings.HasPrefix(line, "data:") { dataStr := strings.TrimSpace(strings.TrimPrefix(line, "data:")) @@ -549,7 +593,7 @@ func (g *GroupConfigBiz) connectAndReadSSE(resp *http.Response, channel chan ent } if err := scanner.Err(); err != nil { - return fmt.Errorf("读取SSE流中断: %w", err) + return true, fmt.Errorf("读取SSE流中断: %w", err) } // 发送缓冲区剩余内容(仅在段落模式下需要) @@ -557,7 +601,7 @@ func (g *GroupConfigBiz) connectAndReadSSE(resp *http.Response, channel chan ent entitys.ResStream(channel, "", buffer.String()) } - return nil + return true, nil } // handleKnowledgeV3 处理知识库V3同步版本 diff --git a/internal/pkg/dingtalk/oauth2_client.go b/internal/pkg/dingtalk/oauth2_client.go new file mode 100644 index 0000000..409bba0 --- /dev/null +++ b/internal/pkg/dingtalk/oauth2_client.go @@ -0,0 +1,64 @@ +package dingtalk + +import ( + "ai_scheduler/internal/config" + errorcode "ai_scheduler/internal/data/error" + "ai_scheduler/utils" + "context" + "fmt" + "time" + + openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client" + oauth2 "github.com/alibabacloud-go/dingtalk/oauth2_1_0" + "github.com/alibabacloud-go/tea/tea" + "github.com/redis/go-redis/v9" +) + +type Oauth2Client struct { + config *config.Config + cli *oauth2.Client + redisCli *redis.Client +} + +func NewOauth2Client(config *config.Config, rds *utils.Rdb) (*Oauth2Client, error) { + cfg := &openapi.Config{ + AccessKeyId: tea.String(config.Tools.DingTalkBot.APIKey), + AccessKeySecret: tea.String(config.Tools.DingTalkBot.APISecret), + Protocol: tea.String("https"), + RegionId: tea.String("central"), + } + c, err := oauth2.NewClient(cfg) + if err != nil { + return nil, err + } + return &Oauth2Client{config: config, cli: c, redisCli: rds.Rdb}, nil +} + +func (c *Oauth2Client) GetAccessToken() (string, error) { + // 去cache + ctx := context.Background() + accessToken, err := c.redisCli.Get(ctx, "dingtalk:oauth2:access_token").Result() + if err == nil { + fmt.Println("get access token from cache:", accessToken) + return accessToken, nil + } + if err != redis.Nil { + return "", err + } + + resp, err := c.cli.GetAccessToken(&oauth2.GetAccessTokenRequest{ + AppKey: tea.String("ding5wwvnf9hxeyjau7t"), + AppSecret: tea.String("FxXVlTzxrKXvJ8h-9uK0s5TjaBfOJSXumpmrHal-NmQAtku9wOPxcss0Af6WHoAK"), + }) + if err != nil { + return "", err + } + + if resp.Body == nil { + return "", errorcode.ParamErrf("empty response body") + } + + c.redisCli.Set(ctx, "dingtalk:oauth2:access_token", *resp.Body.AccessToken, time.Duration(*resp.Body.ExpireIn)*time.Second) + + return *resp.Body.AccessToken, nil +} diff --git a/internal/pkg/dingtalk/robot_client.go b/internal/pkg/dingtalk/robot_client.go new file mode 100644 index 0000000..3f1f40f --- /dev/null +++ b/internal/pkg/dingtalk/robot_client.go @@ -0,0 +1,61 @@ +package dingtalk + +import ( + "ai_scheduler/internal/config" + errorcode "ai_scheduler/internal/data/error" + + openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client" + robot "github.com/alibabacloud-go/dingtalk/robot_1_0" + util "github.com/alibabacloud-go/tea-utils/v2/service" + "github.com/alibabacloud-go/tea/tea" +) + +type RobotClient struct { + config *config.Config + cli *robot.Client +} + +func NewRobotClient(config *config.Config) (*RobotClient, error) { + cfg := &openapi.Config{ + AccessKeyId: tea.String(config.Tools.DingTalkBot.APIKey), + AccessKeySecret: tea.String(config.Tools.DingTalkBot.APISecret), + Protocol: tea.String("https"), + RegionId: tea.String("central"), + } + c, err := robot.NewClient(cfg) + if err != nil { + return nil, err + } + return &RobotClient{config: config, cli: c}, nil +} + +type SendGroupMessagesReq struct { + FullMatchField int32 + QueryWord string + Offset int32 + Size int32 +} + +type SendGroupMessagesResp struct { + Body interface{} +} + +func (c *RobotClient) SendGroupMessages(accessToken string, name string) (string, error) { + headers := &robot.OrgGroupSendHeaders{} + headers.XAcsDingtalkAccessToken = tea.String(accessToken) + resp, err := c.cli.OrgGroupSendWithOptions(&robot.OrgGroupSendRequest{ + MsgKey: tea.String("sampleText"), + MsgParam: tea.String("{\"content\":\"今天吃肘子\"}"), + OpenConversationId: tea.String("cidwP24PLZhLVOS2dVIkEawLw=="), + RobotCode: tea.String("ding5wwvnf9hxeyjau7t"), + }, headers, &util.RuntimeOptions{}) + if err != nil { + return "", err + } + + if resp.Body == nil { + return "", errorcode.ParamErrf("empty response body") + } + + return *resp.Body.ProcessQueryKey, nil +} diff --git a/internal/pkg/provider_set.go b/internal/pkg/provider_set.go index 16bef95..b7120c6 100644 --- a/internal/pkg/provider_set.go +++ b/internal/pkg/provider_set.go @@ -21,6 +21,8 @@ var ProviderSetClient = wire.NewSet( dingtalk.NewOldClient, dingtalk.NewContactClient, dingtalk.NewNotableClient, + dingtalk.NewRobotClient, + dingtalk.NewOauth2Client, utils_oss.NewClient, lsxd.NewLogin,