ai-courseware/eino-project/internal/domain/monitor/monitor.go

527 lines
15 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package monitor
import (
"context"
"fmt"
"sync"
"time"
"github.com/go-kratos/kratos/v2/log"
)
// Metrics is the aggregate set of monitoring counters, gauges and
// derived statistics kept by the monitor. All counters are cumulative
// since process start or since the last ResetMetrics call.
type Metrics struct {
	// Request statistics.
	TotalRequests   int64            `json:"total_requests"`
	SuccessRequests int64            `json:"success_requests"`
	ErrorRequests   int64            `json:"error_requests"`
	RequestsByType  map[string]int64 `json:"requests_by_type"`

	// Response-time statistics, in milliseconds, derived from ResponseTimes.
	AvgResponseTime float64   `json:"avg_response_time_ms"`
	MaxResponseTime float64   `json:"max_response_time_ms"`
	MinResponseTime float64   `json:"min_response_time_ms"`
	ResponseTimes   []float64 `json:"-"` // raw samples; not serialized, kept only to compute the stats above

	// AI service statistics.
	AIRequests        int64   `json:"ai_requests"`
	AIErrors          int64   `json:"ai_errors"`
	AIAvgResponseTime float64 `json:"ai_avg_response_time_ms"`

	// Vector-database statistics.
	VectorSearches int64 `json:"vector_searches"`
	VectorInserts  int64 `json:"vector_inserts"`
	VectorErrors   int64 `json:"vector_errors"`

	// Session statistics.
	ActiveSessions int64 `json:"active_sessions"`
	TotalSessions  int64 `json:"total_sessions"`

	// Knowledge-base statistics.
	KnowledgeQueries  int64 `json:"knowledge_queries"`
	DocumentsUploaded int64 `json:"documents_uploaded"`

	// System resources. NOTE(review): nothing in this file writes these
	// two fields; presumably they are populated elsewhere — confirm.
	MemoryUsage float64 `json:"memory_usage_mb"`
	CPUUsage    float64 `json:"cpu_usage_percent"`

	// LastUpdated is the timestamp of the most recent metric mutation.
	LastUpdated time.Time `json:"last_updated"`
}
// Alert describes a single monitoring alert. Alerts are created either
// explicitly via CreateAlert or automatically by threshold checks, and
// stay in memory until marked resolved via ResolveAlert.
type Alert struct {
	ID         string                 `json:"id"`
	Type       string                 `json:"type"`  // alert category, e.g. error, warning, info
	Level      string                 `json:"level"` // severity: critical, high, medium, low
	Title      string                 `json:"title"`
	Message    string                 `json:"message"`
	Metadata   map[string]interface{} `json:"metadata"`
	Timestamp  time.Time              `json:"timestamp"`
	Resolved   bool                   `json:"resolved"`
	ResolvedAt *time.Time             `json:"resolved_at,omitempty"` // set when Resolved flips to true
}
// Monitor is the monitoring facade: it records request/AI/vector/
// session/knowledge events, maintains aggregate metrics, and manages
// threshold-based alerts.
type Monitor interface {
	// RecordRequest records one handled request of the given type,
	// with its duration and success flag.
	RecordRequest(ctx context.Context, requestType string, duration time.Duration, success bool) error
	// RecordAIRequest records one AI-service call.
	RecordAIRequest(ctx context.Context, duration time.Duration, success bool) error
	// RecordVectorOperation records one vector-database operation
	// ("search" or "insert").
	RecordVectorOperation(ctx context.Context, operation string, success bool) error
	// RecordSessionOperation records a session lifecycle event
	// ("create" or "close").
	RecordSessionOperation(ctx context.Context, operation string) error
	// RecordKnowledgeOperation records a knowledge-base event
	// ("query" or "upload").
	RecordKnowledgeOperation(ctx context.Context, operation string) error
	// GetMetrics returns a snapshot of the current metrics.
	GetMetrics(ctx context.Context) (*Metrics, error)
	// CreateAlert registers a new alert.
	CreateAlert(ctx context.Context, alertType, level, title, message string, metadata map[string]interface{}) error
	// GetAlerts lists alerts filtered by resolved state.
	GetAlerts(ctx context.Context, resolved bool) ([]*Alert, error)
	// ResolveAlert marks the alert with the given ID as resolved.
	ResolveAlert(ctx context.Context, alertID string) error
	// HealthCheck returns an error when key metrics breach thresholds.
	HealthCheck(ctx context.Context) error
	// ResetMetrics clears all collected metrics.
	ResetMetrics(ctx context.Context) error
	// RecordLLMUsage records a lightweight LLM usage event
	// (no tracing, no periodic reporting).
	RecordLLMUsage(ctx context.Context, usage *LLMUsage) error
}
// monitor is the in-memory Monitor implementation. All mutable state
// is guarded by mutex; nothing is persisted.
type monitor struct {
	metrics *Metrics          // aggregate counters and derived stats
	alerts  map[string]*Alert // alert ID -> alert, resolved and unresolved
	mutex   sync.RWMutex      // guards metrics and alerts
	logger  *log.Helper

	// Configuration.
	maxResponseTimes int                // cap on retained response-time samples
	alertThresholds  map[string]float64 // threshold values keyed by alert type
}
// LLMUsage is a lightweight LLM usage event used to track model usage,
// token consumption, latency and related context.
type LLMUsage struct {
	Model                 string            `json:"model"`
	SessionID             string            `json:"session_id"`
	UserID                string            `json:"user_id"`
	PromptPreview         string            `json:"prompt_preview"`
	PromptTokens          int               `json:"prompt_tokens"`
	CompletionTokens      int               `json:"completion_tokens"`
	TotalTokens           int               `json:"total_tokens"`
	LatencyMS             int64             `json:"latency_ms"`
	AgentThought          string            `json:"agent_thought"` // e.g. intent analysis or a retrieval summary
	KnowledgeHits         int               `json:"knowledge_hits"`
	KnowledgeContextBytes int               `json:"knowledge_context_bytes"`
	Metadata              map[string]string `json:"metadata"`
	Timestamp             time.Time         `json:"timestamp"`
}
// NewMonitor creates a Monitor with default thresholds — 5% overall
// error rate, 5000ms average response time, 10% AI error rate — and a
// sliding window of at most 1000 response-time samples.
func NewMonitor(logger log.Logger) Monitor {
	return &monitor{
		metrics: &Metrics{
			RequestsByType: make(map[string]int64),
			ResponseTimes:  make([]float64, 0),
			LastUpdated:    time.Now(),
		},
		alerts: make(map[string]*Alert),
		// mutex is intentionally left at its usable zero value; an explicit
		// sync.RWMutex{} initializer is redundant.
		logger:           log.NewHelper(logger),
		maxResponseTimes: 1000, // keep only the most recent 1000 latency samples
		alertThresholds: map[string]float64{
			"error_rate":    0.05, // 5% overall error rate
			"response_time": 5000, // 5s average response time (ms)
			"ai_error_rate": 0.10, // 10% AI error rate
		},
	}
}
// RecordRequest records one handled request: it bumps the total and
// per-type counters, tracks success/failure, appends the latency sample
// (bounded to maxResponseTimes), refreshes the derived latency stats
// and evaluates alert conditions.
func (m *monitor) RecordRequest(ctx context.Context, requestType string, duration time.Duration, success bool) error {
	elapsedMs := float64(duration.Nanoseconds()) / 1e6

	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.metrics.TotalRequests++
	m.metrics.RequestsByType[requestType]++
	if !success {
		m.metrics.ErrorRequests++
	} else {
		m.metrics.SuccessRequests++
	}

	// Maintain a bounded sliding window of latency samples.
	samples := append(m.metrics.ResponseTimes, elapsedMs)
	if len(samples) > m.maxResponseTimes {
		samples = samples[1:]
	}
	m.metrics.ResponseTimes = samples

	m.updateResponseTimeStats()
	m.checkAlerts()
	m.metrics.LastUpdated = time.Now()

	m.logger.Infof("Recorded request: type=%s, duration=%.2fms, success=%v", requestType, elapsedMs, success)
	return nil
}
// RecordAIRequest records one AI-service call, updating the AI request
// counter, the error counter, and the running average latency.
func (m *monitor) RecordAIRequest(ctx context.Context, duration time.Duration, success bool) error {
	elapsedMs := float64(duration.Nanoseconds()) / 1e6

	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.metrics.AIRequests++
	if !success {
		m.metrics.AIErrors++
	}

	// Incremental running mean; when n == 1 this reduces to elapsedMs.
	n := float64(m.metrics.AIRequests)
	m.metrics.AIAvgResponseTime = (m.metrics.AIAvgResponseTime*(n-1) + elapsedMs) / n

	m.metrics.LastUpdated = time.Now()
	m.logger.Infof("Recorded AI request: duration=%.2fms, success=%v", elapsedMs, success)
	return nil
}
// RecordVectorOperation records one vector-database operation. Known
// operations are "search" and "insert"; any other operation only
// affects the error counter (when success is false).
func (m *monitor) RecordVectorOperation(ctx context.Context, operation string, success bool) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	if operation == "search" {
		m.metrics.VectorSearches++
	} else if operation == "insert" {
		m.metrics.VectorInserts++
	}
	if !success {
		m.metrics.VectorErrors++
	}

	m.metrics.LastUpdated = time.Now()
	m.logger.Infof("Recorded vector operation: operation=%s, success=%v", operation, success)
	return nil
}
// RecordSessionOperation records a session lifecycle event: "create"
// bumps both the total and active session counters; "close" decrements
// the active counter, never letting it go below zero.
func (m *monitor) RecordSessionOperation(ctx context.Context, operation string) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	switch operation {
	case "create":
		m.metrics.TotalSessions++
		m.metrics.ActiveSessions++
	case "close":
		// Guard against underflow from unbalanced close events.
		if m.metrics.ActiveSessions > 0 {
			m.metrics.ActiveSessions--
		}
	}

	m.metrics.LastUpdated = time.Now()
	m.logger.Infof("Recorded session operation: operation=%s", operation)
	return nil
}
// RecordKnowledgeOperation records a knowledge-base event: "query"
// increments the query counter, "upload" the uploaded-documents
// counter; unknown operations are ignored.
func (m *monitor) RecordKnowledgeOperation(ctx context.Context, operation string) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	if operation == "query" {
		m.metrics.KnowledgeQueries++
	} else if operation == "upload" {
		m.metrics.DocumentsUploaded++
	}

	m.metrics.LastUpdated = time.Now()
	m.logger.Infof("Recorded knowledge operation: operation=%s", operation)
	return nil
}
// GetMetrics returns a snapshot copy of the current metrics. Both the
// per-type map and the response-time slice are cloned so the caller can
// read the snapshot without racing against ongoing recording.
func (m *monitor) GetMetrics(ctx context.Context) (*Metrics, error) {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	snapshot := *m.metrics
	snapshot.RequestsByType = make(map[string]int64, len(m.metrics.RequestsByType))
	for k, v := range m.metrics.RequestsByType {
		snapshot.RequestsByType[k] = v
	}
	// Clone the slice as well: the shallow struct copy above shares its
	// backing array with the live metrics, which RecordRequest keeps
	// appending to/trimming — a data race once the caller reads the
	// "snapshot" outside the lock.
	snapshot.ResponseTimes = append([]float64(nil), m.metrics.ResponseTimes...)
	return &snapshot, nil
}
// CreateAlert registers a new unresolved alert under a unique,
// nanosecond-timestamped ID and logs it at warning level.
func (m *monitor) CreateAlert(ctx context.Context, alertType, level, title, message string, metadata map[string]interface{}) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	id := fmt.Sprintf("%s_%d", alertType, time.Now().UnixNano())
	m.alerts[id] = &Alert{
		ID:        id,
		Type:      alertType,
		Level:     level,
		Title:     title,
		Message:   message,
		Metadata:  metadata,
		Timestamp: time.Now(),
		Resolved:  false,
	}

	m.logger.Warnf("Created alert: type=%s, level=%s, title=%s", alertType, level, title)
	return nil
}
// GetAlerts returns every alert whose resolved state matches the given
// flag. The returned slice is newly allocated, but the *Alert values
// are shared with the monitor's internal map.
func (m *monitor) GetAlerts(ctx context.Context, resolved bool) ([]*Alert, error) {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	matched := make([]*Alert, 0)
	for _, a := range m.alerts {
		if a.Resolved == resolved {
			matched = append(matched, a)
		}
	}
	return matched, nil
}
// ResolveAlert marks the alert with the given ID as resolved and stamps
// its resolution time. It returns an error when no such alert exists.
func (m *monitor) ResolveAlert(ctx context.Context, alertID string) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	a, ok := m.alerts[alertID]
	if !ok {
		return fmt.Errorf("alert not found: %s", alertID)
	}

	resolvedAt := time.Now()
	a.Resolved = true
	a.ResolvedAt = &resolvedAt

	m.logger.Infof("Resolved alert: %s", alertID)
	return nil
}
// HealthCheck reports whether the service currently looks healthy: it
// returns an error when the overall error rate or the average response
// time exceeds its configured threshold, and nil otherwise.
func (m *monitor) HealthCheck(ctx context.Context) error {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	// Error rate is only meaningful once at least one request was seen.
	if total := m.metrics.TotalRequests; total > 0 {
		rate := float64(m.metrics.ErrorRequests) / float64(total)
		if rate > m.alertThresholds["error_rate"] {
			return fmt.Errorf("high error rate: %.2f%%", rate*100)
		}
	}

	if avg := m.metrics.AvgResponseTime; avg > m.alertThresholds["response_time"] {
		return fmt.Errorf("high response time: %.2fms", avg)
	}
	return nil
}
// ResetMetrics discards all collected metrics and starts over from a
// fresh, zeroed Metrics value. Alerts are left untouched.
func (m *monitor) ResetMetrics(ctx context.Context) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	fresh := &Metrics{
		RequestsByType: make(map[string]int64),
		ResponseTimes:  make([]float64, 0),
		LastUpdated:    time.Now(),
	}
	m.metrics = fresh

	m.logger.Info("Reset all metrics")
	return nil
}
// RecordLLMUsage records a single lightweight LLM usage event: it folds
// the event into the shared AI counters and emits one structured log
// line. No tracing and no periodic reporting is done. A nil usage is a
// no-op.
//
// NOTE(review): this deliberately reuses AIRequests/AIAvgResponseTime,
// so those counters mix RecordAIRequest calls with LLM usage events.
// Events with LatencyMS <= 0 still increment AIRequests without
// contributing a latency sample, which dilutes the running average on
// later updates — confirm this is acceptable.
func (m *monitor) RecordLLMUsage(ctx context.Context, usage *LLMUsage) error {
	if usage == nil {
		return nil
	}
	m.mutex.Lock()
	// Lightweight accounting: borrow AIRequests/AIErrors and
	// AIAvgResponseTime instead of introducing a separate metric structure.
	m.metrics.AIRequests++
	if usage.LatencyMS > 0 {
		durMs := float64(usage.LatencyMS)
		if m.metrics.AIRequests == 1 {
			m.metrics.AIAvgResponseTime = durMs
		} else {
			// Incremental running mean over all AIRequests so far.
			m.metrics.AIAvgResponseTime = (m.metrics.AIAvgResponseTime*float64(m.metrics.AIRequests-1) + durMs) / float64(m.metrics.AIRequests)
		}
	}
	m.metrics.LastUpdated = time.Now()
	m.mutex.Unlock()
	// Log the event (outside the lock) so usage can be searched quickly.
	m.logger.WithContext(ctx).Infof("LLM usage | model=%s session=%s user=%s prompt_tokens=%d completion_tokens=%d total_tokens=%d latency_ms=%d intent=%s knowledge_hits=%d",
		usage.Model, usage.SessionID, usage.UserID, usage.PromptTokens, usage.CompletionTokens, usage.TotalTokens, usage.LatencyMS, usage.AgentThought, usage.KnowledgeHits,
	)
	return nil
}
// updateResponseTimeStats recomputes the average, minimum and maximum
// response time from the retained samples. It is a no-op when no
// samples exist. Caller must hold m.mutex.
func (m *monitor) updateResponseTimeStats() {
	samples := m.metrics.ResponseTimes
	if len(samples) == 0 {
		return
	}

	// lo/hi rather than min/max: avoids shadowing the Go 1.21 builtins.
	lo, hi := samples[0], samples[0]
	total := 0.0
	for _, sample := range samples {
		total += sample
		if sample < lo {
			lo = sample
		}
		if sample > hi {
			hi = sample
		}
	}

	m.metrics.AvgResponseTime = total / float64(len(samples))
	m.metrics.MinResponseTime = lo
	m.metrics.MaxResponseTime = hi
}
// checkAlerts evaluates every alert condition against the current
// metrics and raises the corresponding alert when a threshold is
// exceeded. Caller must hold m.mutex.
func (m *monitor) checkAlerts() {
	// Error rate: only meaningful once more than 10 requests were seen.
	if total := m.metrics.TotalRequests; total > 10 {
		if rate := float64(m.metrics.ErrorRequests) / float64(total); rate > m.alertThresholds["error_rate"] {
			m.createErrorRateAlert(rate)
		}
	}

	// Average response time.
	if avg := m.metrics.AvgResponseTime; avg > m.alertThresholds["response_time"] {
		m.createResponseTimeAlert(avg)
	}

	// AI error rate: only meaningful after more than 5 AI requests.
	if ai := m.metrics.AIRequests; ai > 5 {
		if rate := float64(m.metrics.AIErrors) / float64(ai); rate > m.alertThresholds["ai_error_rate"] {
			m.createAIErrorRateAlert(rate)
		}
	}
}
// createErrorRateAlert records a high-error-rate alert. At most one
// unresolved alert of this type is kept at a time: the old ID-based
// dedupe only collapsed duplicates within the same second (IDs use a
// Unix-seconds timestamp), so a persistent condition created a new
// alert object every second and grew m.alerts without bound.
// Caller must hold m.mutex.
func (m *monitor) createErrorRateAlert(errorRate float64) {
	for _, a := range m.alerts {
		if a.Type == "error_rate" && !a.Resolved {
			return // this condition is already being reported
		}
	}
	alertID := fmt.Sprintf("error_rate_%d", time.Now().Unix())
	m.alerts[alertID] = &Alert{
		ID:        alertID,
		Type:      "error_rate",
		Level:     "high",
		Title:     "高错误率告警",
		Message:   fmt.Sprintf("当前错误率: %.2f%%, 超过阈值: %.2f%%", errorRate*100, m.alertThresholds["error_rate"]*100),
		Metadata:  map[string]interface{}{"error_rate": errorRate, "threshold": m.alertThresholds["error_rate"]},
		Timestamp: time.Now(),
		Resolved:  false,
	}
}
// createResponseTimeAlert records a high-average-response-time alert.
// At most one unresolved alert of this type is kept at a time: the old
// ID-based dedupe only collapsed duplicates within the same second
// (IDs use a Unix-seconds timestamp), so a persistent condition
// created a new alert object every second and grew m.alerts without
// bound. Caller must hold m.mutex.
func (m *monitor) createResponseTimeAlert(responseTime float64) {
	for _, a := range m.alerts {
		if a.Type == "response_time" && !a.Resolved {
			return // this condition is already being reported
		}
	}
	alertID := fmt.Sprintf("response_time_%d", time.Now().Unix())
	m.alerts[alertID] = &Alert{
		ID:        alertID,
		Type:      "response_time",
		Level:     "medium",
		Title:     "高响应时间告警",
		Message:   fmt.Sprintf("当前平均响应时间: %.2fms, 超过阈值: %.2fms", responseTime, m.alertThresholds["response_time"]),
		Metadata:  map[string]interface{}{"response_time": responseTime, "threshold": m.alertThresholds["response_time"]},
		Timestamp: time.Now(),
		Resolved:  false,
	}
}
// createAIErrorRateAlert records a high-AI-error-rate alert. At most
// one unresolved alert of this type is kept at a time: the old
// ID-based dedupe only collapsed duplicates within the same second
// (IDs use a Unix-seconds timestamp), so a persistent condition
// created a new alert object every second and grew m.alerts without
// bound. Caller must hold m.mutex.
func (m *monitor) createAIErrorRateAlert(errorRate float64) {
	for _, a := range m.alerts {
		if a.Type == "ai_error_rate" && !a.Resolved {
			return // this condition is already being reported
		}
	}
	alertID := fmt.Sprintf("ai_error_rate_%d", time.Now().Unix())
	m.alerts[alertID] = &Alert{
		ID:        alertID,
		Type:      "ai_error_rate",
		Level:     "high",
		Title:     "AI服务高错误率告警",
		Message:   fmt.Sprintf("AI服务错误率: %.2f%%, 超过阈值: %.2f%%", errorRate*100, m.alertThresholds["ai_error_rate"]*100),
		Metadata:  map[string]interface{}{"ai_error_rate": errorRate, "threshold": m.alertThresholds["ai_error_rate"]},
		Timestamp: time.Now(),
		Resolved:  false,
	}
}