ai-courseware/eino-project/internal/pkg/sseutil/sseutil.go

104 lines
5.3 KiB
Go

package sseutil
import (
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/gorilla/websocket"
)
// Content is the textual part of a frame. The helpers in this package put
// incremental text in Chunk and complete text (final stream output or an
// error message) in FullMessage.
type Content struct {
	// Chunk holds an incremental piece of text; empty on final/done frames.
	Chunk string `json:"chunk"`
	// FullMessage holds the complete text; empty on incremental frames.
	FullMessage string `json:"full_message"`
	// IsFinal reports whether the sender marked this frame as final.
	IsFinal bool `json:"is_final"`
}
// Payload is the body of a Response.
type Payload struct {
	// DataType labels the kind of payload. This package emits
	// "thinking_process", "general_chat_stream" and "component_data".
	DataType string `json:"data_type"`
	// Content carries the text fields; it is always serialized.
	Content Content `json:"content"`
	// Component is free-form structured data attached to "component_data"
	// frames. It is left out of the JSON when empty.
	Component map[string]interface{} `json:"component,omitempty"`
}
// Response is the envelope serialized for every frame: the Write* helpers
// send it as an SSE "data:" line and the Build* helpers return it as JSON
// bytes.
type Response struct {
	// SessionID is the caller-supplied session identifier.
	SessionID string `json:"session_id"`
	// Timestamp is the creation time formatted as "2006-01-02 15:04:05".
	Timestamp string `json:"timestamp"`
	// Type is "data", "error" or "done" for frames built by this package.
	Type string `json:"type"`
	// Payload is the frame body.
	Payload Payload `json:"payload"`
}
// ts returns the current time, as reported by time.Now, formatted as
// "YYYY-MM-DD hh:mm:ss" (no zone information is included).
func ts() string {
	const layout = "2006-01-02 15:04:05"
	now := time.Now()
	return now.Format(layout)
}
// write serializes r as JSON and sends it to w as one Server-Sent Events
// frame ("data: <json>\n\n"), then flushes so the frame is pushed out
// immediately.
//
// If r cannot be encoded (Payload.Component is free-form and may hold values
// encoding/json rejects, such as channels, funcs or NaN floats), an error
// frame with the same session ID and timestamp is sent in its place instead
// of an empty "data:" line. The error frame uses the same Type, DataType and
// IsFinal values as WriteError. If writing to w fails, the flush is skipped.
// A nil fl is tolerated and simply means no flush is performed.
func write(w http.ResponseWriter, fl http.Flusher, r Response) {
	b, err := json.Marshal(r)
	if err != nil {
		// r is a by-value copy, so rewriting it here never touches the
		// caller's data. After this only strings and a bool remain.
		r.Type = "error"
		r.Payload.DataType = "thinking_process"
		r.Payload.Component = nil
		r.Payload.Content.Chunk = ""
		r.Payload.Content.FullMessage = "encode event: " + err.Error()
		r.Payload.Content.IsFinal = true
		if b, err = json.Marshal(r); err != nil {
			// Nothing well-formed can be sent; emit no frame at all.
			return
		}
	}
	if _, err := fmt.Fprintf(w, "data: %s\n\n", b); err != nil {
		// The write failed (typically the client went away); flushing
		// would accomplish nothing.
		return
	}
	if fl != nil {
		fl.Flush()
	}
}
func WriteLog(w http.ResponseWriter, fl http.Flusher, sessionID string, msg string) {
write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: msg, FullMessage: "", IsFinal: false}}})
}
func WriteLoading(w http.ResponseWriter, fl http.Flusher, sessionID string, msg string) {
write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: msg, FullMessage: "", IsFinal: false}}})
}
func WriteProcess(w http.ResponseWriter, fl http.Flusher, sessionID string, msg string, final bool) {
write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: msg, FullMessage: "", IsFinal: final}}})
}
func WriteStreamChunk(w http.ResponseWriter, fl http.Flusher, sessionID string, chunk string) {
write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "general_chat_stream", Content: Content{Chunk: chunk, FullMessage: "", IsFinal: false}}})
}
func WriteStreamFinal(w http.ResponseWriter, fl http.Flusher, sessionID string, full string) {
write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "general_chat_stream", Content: Content{Chunk: "", FullMessage: full, IsFinal: true}}})
}
func WriteJson(w http.ResponseWriter, fl http.Flusher, sessionID string, component map[string]interface{}, final bool) {
write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "component_data", Content: Content{Chunk: "", FullMessage: "", IsFinal: final}, Component: component}})
}
func WriteError(w http.ResponseWriter, fl http.Flusher, sessionID string, errMsg string) {
write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "error", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: "", FullMessage: errMsg, IsFinal: true}}})
}
func WriteDone(w http.ResponseWriter, fl http.Flusher, sessionID string) {
write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "done", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: "", FullMessage: "", IsFinal: true}}})
}
// Builders: construct a frame without writing it out, for use with the
// unified WebSocket protocol.

// BuildLog returns the JSON encoding of a non-final "thinking_process" data
// frame for sessionID with msg in Content.Chunk. Nothing is sent; the caller
// decides what to do with the bytes.
func BuildLog(sessionID string, msg string) []byte {
	frame := Response{
		SessionID: sessionID,
		Timestamp: ts(),
		Type:      "data",
		Payload: Payload{
			DataType: "thinking_process",
			Content:  Content{Chunk: msg},
		},
	}
	// The error is discarded: every populated field here is a string or a
	// bool and Component is nil, so there is nothing Marshal can reject.
	encoded, _ := json.Marshal(frame)
	return encoded
}
// BuildProcess returns the JSON encoding of a "thinking_process" data frame
// for sessionID with msg in Content.Chunk and final copied into
// Content.IsFinal. Nothing is sent.
func BuildProcess(sessionID string, msg string, final bool) []byte {
	var frame Response
	frame.SessionID = sessionID
	frame.Timestamp = ts()
	frame.Type = "data"
	frame.Payload.DataType = "thinking_process"
	frame.Payload.Content.Chunk = msg
	frame.Payload.Content.IsFinal = final
	// The error is discarded: only strings and a bool are populated, which
	// encoding/json always accepts.
	encoded, _ := json.Marshal(frame)
	return encoded
}
func BuildStreamChunk(sessionID string, chunk string) []byte {
b, _ := json.Marshal(Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "general_chat_stream", Content: Content{Chunk: chunk, FullMessage: "", IsFinal: false}}})
return b
}
func BuildStreamFinal(sessionID string, full string) []byte {
b, _ := json.Marshal(Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "general_chat_stream", Content: Content{Chunk: "", FullMessage: full, IsFinal: true}}})
return b
}
func BuildJson(sessionID string, component map[string]interface{}, final bool) []byte {
b, _ := json.Marshal(Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "component_data", Content: Content{Chunk: "", FullMessage: "", IsFinal: final}, Component: component}})
return b
}
// BuildError returns the JSON encoding of an "error" frame for sessionID:
// errMsg goes in Content.FullMessage, IsFinal is true and DataType is
// "thinking_process". Nothing is sent.
func BuildError(sessionID string, errMsg string) []byte {
	var frame Response
	frame.SessionID = sessionID
	frame.Timestamp = ts()
	frame.Type = "error"
	frame.Payload.DataType = "thinking_process"
	frame.Payload.Content.FullMessage = errMsg
	frame.Payload.Content.IsFinal = true
	// The error is discarded: only strings and a bool are populated, which
	// encoding/json always accepts.
	encoded, _ := json.Marshal(frame)
	return encoded
}
// BuildDone returns the JSON encoding of a "done" frame for sessionID. The
// frame carries no text: Chunk and FullMessage are empty, IsFinal is true
// and DataType is "thinking_process". Nothing is sent.
func BuildDone(sessionID string) []byte {
	frame := Response{
		SessionID: sessionID,
		Timestamp: ts(),
		Type:      "done",
		Payload: Payload{
			DataType: "thinking_process",
			Content:  Content{IsFinal: true},
		},
	}
	// The error is discarded: only strings and a bool are populated, which
	// encoding/json always accepts.
	encoded, _ := json.Marshal(frame)
	return encoded
}
// WebSocket utilities.

// WSWriteJSON sends payload over conn as a single WebSocket text message and
// returns whatever error conn.WriteMessage reports. Despite the name, no
// JSON encoding happens here: payload is sent exactly as given, so callers
// pass already-encoded bytes such as the output of the Build* helpers.
func WSWriteJSON(conn *websocket.Conn, payload []byte) error {
	err := conn.WriteMessage(websocket.TextMessage, payload)
	return err
}