527 lines
15 KiB
Go
527 lines
15 KiB
Go
package monitor
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/go-kratos/kratos/v2/log"
|
||
)
|
||
|
||
// Metrics is the in-memory snapshot of all monitoring counters and gauges.
// All fields are guarded by the owning monitor's mutex; external readers get
// a copy via Monitor.GetMetrics.
type Metrics struct {
	// Request statistics.
	TotalRequests   int64            `json:"total_requests"`
	SuccessRequests int64            `json:"success_requests"`
	ErrorRequests   int64            `json:"error_requests"`
	RequestsByType  map[string]int64 `json:"requests_by_type"`

	// Response-time statistics, in milliseconds.
	AvgResponseTime float64   `json:"avg_response_time_ms"`
	MaxResponseTime float64   `json:"max_response_time_ms"`
	MinResponseTime float64   `json:"min_response_time_ms"`
	ResponseTimes   []float64 `json:"-"` // raw samples used to derive the stats above; not serialized

	// AI-service statistics.
	AIRequests        int64   `json:"ai_requests"`
	AIErrors          int64   `json:"ai_errors"`
	AIAvgResponseTime float64 `json:"ai_avg_response_time_ms"`

	// Vector-database statistics.
	VectorSearches int64 `json:"vector_searches"`
	VectorInserts  int64 `json:"vector_inserts"`
	VectorErrors   int64 `json:"vector_errors"`

	// Session statistics.
	ActiveSessions int64 `json:"active_sessions"`
	TotalSessions  int64 `json:"total_sessions"`

	// Knowledge-base statistics.
	KnowledgeQueries  int64 `json:"knowledge_queries"`
	DocumentsUploaded int64 `json:"documents_uploaded"`

	// System resources. NOTE(review): nothing in this file writes these two
	// fields — presumably populated elsewhere; confirm before relying on them.
	MemoryUsage float64 `json:"memory_usage_mb"`
	CPUUsage    float64 `json:"cpu_usage_percent"`

	// Timestamp of the most recent update to any counter.
	LastUpdated time.Time `json:"last_updated"`
}
|
||
|
||
// Alert describes a single raised monitoring alert and its resolution state.
type Alert struct {
	ID        string                 `json:"id"`
	Type      string                 `json:"type"`  // e.g. error, warning, info (also used as a category key such as "error_rate")
	Level     string                 `json:"level"` // critical, high, medium, low
	Title     string                 `json:"title"`
	Message   string                 `json:"message"`
	Metadata  map[string]interface{} `json:"metadata"`
	Timestamp time.Time              `json:"timestamp"`
	Resolved  bool                   `json:"resolved"`
	ResolvedAt *time.Time            `json:"resolved_at,omitempty"` // nil until the alert is resolved
}
|
||
|
||
// Monitor is the interface for recording service metrics and managing alerts.
// All implementations in this package are safe for concurrent use.
type Monitor interface {
	// RecordRequest records one handled request of the given type with its
	// duration and success/failure outcome.
	RecordRequest(ctx context.Context, requestType string, duration time.Duration, success bool) error

	// RecordAIRequest records one AI-service call.
	RecordAIRequest(ctx context.Context, duration time.Duration, success bool) error

	// RecordVectorOperation records one vector-database operation
	// ("search" or "insert").
	RecordVectorOperation(ctx context.Context, operation string, success bool) error

	// RecordSessionOperation records a session lifecycle event
	// ("create" or "close").
	RecordSessionOperation(ctx context.Context, operation string) error

	// RecordKnowledgeOperation records a knowledge-base event
	// ("query" or "upload").
	RecordKnowledgeOperation(ctx context.Context, operation string) error

	// GetMetrics returns a snapshot of the current metrics.
	GetMetrics(ctx context.Context) (*Metrics, error)

	// CreateAlert raises a new alert with the given classification and payload.
	CreateAlert(ctx context.Context, alertType, level, title, message string, metadata map[string]interface{}) error

	// GetAlerts lists alerts filtered by their resolved state.
	GetAlerts(ctx context.Context, resolved bool) ([]*Alert, error)

	// ResolveAlert marks the alert with the given ID as resolved.
	ResolveAlert(ctx context.Context, alertID string) error

	// HealthCheck returns an error when key metrics breach their thresholds.
	HealthCheck(ctx context.Context) error

	// ResetMetrics clears all counters back to their zero state.
	ResetMetrics(ctx context.Context) error

	// RecordLLMUsage records one LLM usage event (lightweight: log line plus
	// basic counters; no tracing or periodic reporting).
	RecordLLMUsage(ctx context.Context, usage *LLMUsage) error
}
|
||
|
||
// monitor is the in-memory Monitor implementation. All mutable state
// (metrics and alerts) is guarded by mutex.
type monitor struct {
	metrics *Metrics
	alerts  map[string]*Alert
	mutex   sync.RWMutex
	logger  *log.Helper

	// Configuration.
	maxResponseTimes int                // cap on retained response-time samples (oldest dropped first)
	alertThresholds  map[string]float64 // keys: "error_rate", "response_time", "ai_error_rate"
}
|
||
|
||
// LLMUsage is a lightweight LLM usage event used to track model usage,
// token consumption, latency, and related context.
type LLMUsage struct {
	Model                 string            `json:"model"`
	SessionID             string            `json:"session_id"`
	UserID                string            `json:"user_id"`
	PromptPreview         string            `json:"prompt_preview"`
	PromptTokens          int               `json:"prompt_tokens"`
	CompletionTokens      int               `json:"completion_tokens"`
	TotalTokens           int               `json:"total_tokens"`
	LatencyMS             int64             `json:"latency_ms"`
	AgentThought          string            `json:"agent_thought"` // e.g. intent analysis or retrieval summary
	KnowledgeHits         int               `json:"knowledge_hits"`
	KnowledgeContextBytes int               `json:"knowledge_context_bytes"`
	Metadata              map[string]string `json:"metadata"`
	Timestamp             time.Time         `json:"timestamp"`
}
|
||
|
||
// NewMonitor 创建监控器
|
||
func NewMonitor(logger log.Logger) Monitor {
|
||
return &monitor{
|
||
metrics: &Metrics{
|
||
RequestsByType: make(map[string]int64),
|
||
ResponseTimes: make([]float64, 0),
|
||
LastUpdated: time.Now(),
|
||
},
|
||
alerts: make(map[string]*Alert),
|
||
mutex: sync.RWMutex{},
|
||
logger: log.NewHelper(logger),
|
||
maxResponseTimes: 1000, // 保留最近1000次响应时间
|
||
alertThresholds: map[string]float64{
|
||
"error_rate": 0.05, // 5%错误率
|
||
"response_time": 5000, // 5秒响应时间
|
||
"ai_error_rate": 0.10, // 10%AI错误率
|
||
},
|
||
}
|
||
}
|
||
|
||
// RecordRequest 记录请求
|
||
func (m *monitor) RecordRequest(ctx context.Context, requestType string, duration time.Duration, success bool) error {
|
||
m.mutex.Lock()
|
||
defer m.mutex.Unlock()
|
||
|
||
m.metrics.TotalRequests++
|
||
m.metrics.RequestsByType[requestType]++
|
||
|
||
if success {
|
||
m.metrics.SuccessRequests++
|
||
} else {
|
||
m.metrics.ErrorRequests++
|
||
}
|
||
|
||
// 记录响应时间
|
||
durationMs := float64(duration.Nanoseconds()) / 1e6
|
||
m.metrics.ResponseTimes = append(m.metrics.ResponseTimes, durationMs)
|
||
|
||
// 限制响应时间数组大小
|
||
if len(m.metrics.ResponseTimes) > m.maxResponseTimes {
|
||
m.metrics.ResponseTimes = m.metrics.ResponseTimes[1:]
|
||
}
|
||
|
||
// 更新响应时间统计
|
||
m.updateResponseTimeStats()
|
||
|
||
// 检查告警条件
|
||
m.checkAlerts()
|
||
|
||
m.metrics.LastUpdated = time.Now()
|
||
|
||
m.logger.Infof("Recorded request: type=%s, duration=%.2fms, success=%v", requestType, durationMs, success)
|
||
|
||
return nil
|
||
}
|
||
|
||
// RecordAIRequest 记录AI请求
|
||
func (m *monitor) RecordAIRequest(ctx context.Context, duration time.Duration, success bool) error {
|
||
m.mutex.Lock()
|
||
defer m.mutex.Unlock()
|
||
|
||
m.metrics.AIRequests++
|
||
|
||
if !success {
|
||
m.metrics.AIErrors++
|
||
}
|
||
|
||
// 更新AI平均响应时间
|
||
durationMs := float64(duration.Nanoseconds()) / 1e6
|
||
if m.metrics.AIRequests == 1 {
|
||
m.metrics.AIAvgResponseTime = durationMs
|
||
} else {
|
||
m.metrics.AIAvgResponseTime = (m.metrics.AIAvgResponseTime*float64(m.metrics.AIRequests-1) + durationMs) / float64(m.metrics.AIRequests)
|
||
}
|
||
|
||
m.metrics.LastUpdated = time.Now()
|
||
|
||
m.logger.Infof("Recorded AI request: duration=%.2fms, success=%v", durationMs, success)
|
||
|
||
return nil
|
||
}
|
||
|
||
// RecordVectorOperation 记录向量操作
|
||
func (m *monitor) RecordVectorOperation(ctx context.Context, operation string, success bool) error {
|
||
m.mutex.Lock()
|
||
defer m.mutex.Unlock()
|
||
|
||
switch operation {
|
||
case "search":
|
||
m.metrics.VectorSearches++
|
||
case "insert":
|
||
m.metrics.VectorInserts++
|
||
}
|
||
|
||
if !success {
|
||
m.metrics.VectorErrors++
|
||
}
|
||
|
||
m.metrics.LastUpdated = time.Now()
|
||
|
||
m.logger.Infof("Recorded vector operation: operation=%s, success=%v", operation, success)
|
||
|
||
return nil
|
||
}
|
||
|
||
// RecordSessionOperation 记录会话操作
|
||
func (m *monitor) RecordSessionOperation(ctx context.Context, operation string) error {
|
||
m.mutex.Lock()
|
||
defer m.mutex.Unlock()
|
||
|
||
switch operation {
|
||
case "create":
|
||
m.metrics.TotalSessions++
|
||
m.metrics.ActiveSessions++
|
||
case "close":
|
||
m.metrics.ActiveSessions--
|
||
if m.metrics.ActiveSessions < 0 {
|
||
m.metrics.ActiveSessions = 0
|
||
}
|
||
}
|
||
|
||
m.metrics.LastUpdated = time.Now()
|
||
|
||
m.logger.Infof("Recorded session operation: operation=%s", operation)
|
||
|
||
return nil
|
||
}
|
||
|
||
// RecordKnowledgeOperation 记录知识库操作
|
||
func (m *monitor) RecordKnowledgeOperation(ctx context.Context, operation string) error {
|
||
m.mutex.Lock()
|
||
defer m.mutex.Unlock()
|
||
|
||
switch operation {
|
||
case "query":
|
||
m.metrics.KnowledgeQueries++
|
||
case "upload":
|
||
m.metrics.DocumentsUploaded++
|
||
}
|
||
|
||
m.metrics.LastUpdated = time.Now()
|
||
|
||
m.logger.Infof("Recorded knowledge operation: operation=%s", operation)
|
||
|
||
return nil
|
||
}
|
||
|
||
// GetMetrics 获取指标
|
||
func (m *monitor) GetMetrics(ctx context.Context) (*Metrics, error) {
|
||
m.mutex.RLock()
|
||
defer m.mutex.RUnlock()
|
||
|
||
// 创建副本以避免并发修改
|
||
metricsCopy := *m.metrics
|
||
metricsCopy.RequestsByType = make(map[string]int64)
|
||
for k, v := range m.metrics.RequestsByType {
|
||
metricsCopy.RequestsByType[k] = v
|
||
}
|
||
|
||
return &metricsCopy, nil
|
||
}
|
||
|
||
// CreateAlert 创建告警
|
||
func (m *monitor) CreateAlert(ctx context.Context, alertType, level, title, message string, metadata map[string]interface{}) error {
|
||
m.mutex.Lock()
|
||
defer m.mutex.Unlock()
|
||
|
||
alertID := fmt.Sprintf("%s_%d", alertType, time.Now().UnixNano())
|
||
|
||
alert := &Alert{
|
||
ID: alertID,
|
||
Type: alertType,
|
||
Level: level,
|
||
Title: title,
|
||
Message: message,
|
||
Metadata: metadata,
|
||
Timestamp: time.Now(),
|
||
Resolved: false,
|
||
}
|
||
|
||
m.alerts[alertID] = alert
|
||
|
||
m.logger.Warnf("Created alert: type=%s, level=%s, title=%s", alertType, level, title)
|
||
|
||
return nil
|
||
}
|
||
|
||
// GetAlerts 获取告警列表
|
||
func (m *monitor) GetAlerts(ctx context.Context, resolved bool) ([]*Alert, error) {
|
||
m.mutex.RLock()
|
||
defer m.mutex.RUnlock()
|
||
|
||
alerts := make([]*Alert, 0)
|
||
for _, alert := range m.alerts {
|
||
if alert.Resolved == resolved {
|
||
alerts = append(alerts, alert)
|
||
}
|
||
}
|
||
|
||
return alerts, nil
|
||
}
|
||
|
||
// ResolveAlert 解决告警
|
||
func (m *monitor) ResolveAlert(ctx context.Context, alertID string) error {
|
||
m.mutex.Lock()
|
||
defer m.mutex.Unlock()
|
||
|
||
alert, exists := m.alerts[alertID]
|
||
if !exists {
|
||
return fmt.Errorf("alert not found: %s", alertID)
|
||
}
|
||
|
||
now := time.Now()
|
||
alert.Resolved = true
|
||
alert.ResolvedAt = &now
|
||
|
||
m.logger.Infof("Resolved alert: %s", alertID)
|
||
|
||
return nil
|
||
}
|
||
|
||
// HealthCheck 健康检查
|
||
func (m *monitor) HealthCheck(ctx context.Context) error {
|
||
m.mutex.RLock()
|
||
defer m.mutex.RUnlock()
|
||
|
||
// 检查基本指标
|
||
if m.metrics.TotalRequests > 0 {
|
||
errorRate := float64(m.metrics.ErrorRequests) / float64(m.metrics.TotalRequests)
|
||
if errorRate > m.alertThresholds["error_rate"] {
|
||
return fmt.Errorf("high error rate: %.2f%%", errorRate*100)
|
||
}
|
||
}
|
||
|
||
if m.metrics.AvgResponseTime > m.alertThresholds["response_time"] {
|
||
return fmt.Errorf("high response time: %.2fms", m.metrics.AvgResponseTime)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// ResetMetrics 重置指标
|
||
func (m *monitor) ResetMetrics(ctx context.Context) error {
|
||
m.mutex.Lock()
|
||
defer m.mutex.Unlock()
|
||
|
||
m.metrics = &Metrics{
|
||
RequestsByType: make(map[string]int64),
|
||
ResponseTimes: make([]float64, 0),
|
||
LastUpdated: time.Now(),
|
||
}
|
||
|
||
m.logger.Info("Reset all metrics")
|
||
|
||
return nil
|
||
}
|
||
|
||
// RecordLLMUsage 记录一次LLM使用事件(仅日志与轻量统计,不做复杂Trace与周期上报)
|
||
func (m *monitor) RecordLLMUsage(ctx context.Context, usage *LLMUsage) error {
|
||
if usage == nil {
|
||
return nil
|
||
}
|
||
|
||
m.mutex.Lock()
|
||
// 轻量统计:借用AIRequests/AIErrors与AIAvgResponseTime,避免新增复杂指标结构
|
||
m.metrics.AIRequests++
|
||
if usage.LatencyMS > 0 {
|
||
durMs := float64(usage.LatencyMS)
|
||
if m.metrics.AIRequests == 1 {
|
||
m.metrics.AIAvgResponseTime = durMs
|
||
} else {
|
||
m.metrics.AIAvgResponseTime = (m.metrics.AIAvgResponseTime*float64(m.metrics.AIRequests-1) + durMs) / float64(m.metrics.AIRequests)
|
||
}
|
||
}
|
||
m.metrics.LastUpdated = time.Now()
|
||
m.mutex.Unlock()
|
||
|
||
// 记录日志,便于快速检索使用情况
|
||
m.logger.WithContext(ctx).Infof("LLM usage | model=%s session=%s user=%s prompt_tokens=%d completion_tokens=%d total_tokens=%d latency_ms=%d intent=%s knowledge_hits=%d",
|
||
usage.Model, usage.SessionID, usage.UserID, usage.PromptTokens, usage.CompletionTokens, usage.TotalTokens, usage.LatencyMS, usage.AgentThought, usage.KnowledgeHits,
|
||
)
|
||
return nil
|
||
}
|
||
|
||
// updateResponseTimeStats 更新响应时间统计
|
||
func (m *monitor) updateResponseTimeStats() {
|
||
if len(m.metrics.ResponseTimes) == 0 {
|
||
return
|
||
}
|
||
|
||
var sum, min, max float64
|
||
min = m.metrics.ResponseTimes[0]
|
||
max = m.metrics.ResponseTimes[0]
|
||
|
||
for _, rt := range m.metrics.ResponseTimes {
|
||
sum += rt
|
||
if rt < min {
|
||
min = rt
|
||
}
|
||
if rt > max {
|
||
max = rt
|
||
}
|
||
}
|
||
|
||
m.metrics.AvgResponseTime = sum / float64(len(m.metrics.ResponseTimes))
|
||
m.metrics.MinResponseTime = min
|
||
m.metrics.MaxResponseTime = max
|
||
}
|
||
|
||
// checkAlerts 检查告警条件
|
||
func (m *monitor) checkAlerts() {
|
||
// 检查错误率
|
||
if m.metrics.TotalRequests > 10 { // 至少有10个请求才检查
|
||
errorRate := float64(m.metrics.ErrorRequests) / float64(m.metrics.TotalRequests)
|
||
if errorRate > m.alertThresholds["error_rate"] {
|
||
m.createErrorRateAlert(errorRate)
|
||
}
|
||
}
|
||
|
||
// 检查响应时间
|
||
if m.metrics.AvgResponseTime > m.alertThresholds["response_time"] {
|
||
m.createResponseTimeAlert(m.metrics.AvgResponseTime)
|
||
}
|
||
|
||
// 检查AI错误率
|
||
if m.metrics.AIRequests > 5 {
|
||
aiErrorRate := float64(m.metrics.AIErrors) / float64(m.metrics.AIRequests)
|
||
if aiErrorRate > m.alertThresholds["ai_error_rate"] {
|
||
m.createAIErrorRateAlert(aiErrorRate)
|
||
}
|
||
}
|
||
}
|
||
|
||
// createErrorRateAlert 创建错误率告警
|
||
func (m *monitor) createErrorRateAlert(errorRate float64) {
|
||
alertID := fmt.Sprintf("error_rate_%d", time.Now().Unix())
|
||
if _, exists := m.alerts[alertID]; exists {
|
||
return // 避免重复告警
|
||
}
|
||
|
||
alert := &Alert{
|
||
ID: alertID,
|
||
Type: "error_rate",
|
||
Level: "high",
|
||
Title: "高错误率告警",
|
||
Message: fmt.Sprintf("当前错误率: %.2f%%, 超过阈值: %.2f%%", errorRate*100, m.alertThresholds["error_rate"]*100),
|
||
Metadata: map[string]interface{}{"error_rate": errorRate, "threshold": m.alertThresholds["error_rate"]},
|
||
Timestamp: time.Now(),
|
||
Resolved: false,
|
||
}
|
||
|
||
m.alerts[alertID] = alert
|
||
}
|
||
|
||
// createResponseTimeAlert 创建响应时间告警
|
||
func (m *monitor) createResponseTimeAlert(responseTime float64) {
|
||
alertID := fmt.Sprintf("response_time_%d", time.Now().Unix())
|
||
if _, exists := m.alerts[alertID]; exists {
|
||
return
|
||
}
|
||
|
||
alert := &Alert{
|
||
ID: alertID,
|
||
Type: "response_time",
|
||
Level: "medium",
|
||
Title: "高响应时间告警",
|
||
Message: fmt.Sprintf("当前平均响应时间: %.2fms, 超过阈值: %.2fms", responseTime, m.alertThresholds["response_time"]),
|
||
Metadata: map[string]interface{}{"response_time": responseTime, "threshold": m.alertThresholds["response_time"]},
|
||
Timestamp: time.Now(),
|
||
Resolved: false,
|
||
}
|
||
|
||
m.alerts[alertID] = alert
|
||
}
|
||
|
||
// createAIErrorRateAlert 创建AI错误率告警
|
||
func (m *monitor) createAIErrorRateAlert(errorRate float64) {
|
||
alertID := fmt.Sprintf("ai_error_rate_%d", time.Now().Unix())
|
||
if _, exists := m.alerts[alertID]; exists {
|
||
return
|
||
}
|
||
|
||
alert := &Alert{
|
||
ID: alertID,
|
||
Type: "ai_error_rate",
|
||
Level: "high",
|
||
Title: "AI服务高错误率告警",
|
||
Message: fmt.Sprintf("AI服务错误率: %.2f%%, 超过阈值: %.2f%%", errorRate*100, m.alertThresholds["ai_error_rate"]*100),
|
||
Metadata: map[string]interface{}{"ai_error_rate": errorRate, "threshold": m.alertThresholds["ai_error_rate"]},
|
||
Timestamp: time.Now(),
|
||
Resolved: false,
|
||
}
|
||
|
||
m.alerts[alertID] = alert
|
||
} |