98 lines
2.3 KiB
Go
98 lines
2.3 KiB
Go
package handlers
|
||
|
||
import (
|
||
"net/http"
|
||
"time"
|
||
|
||
"github.com/gin-gonic/gin"
|
||
|
||
"qr-scanner/models"
|
||
"qr-scanner/services"
|
||
)
|
||
|
||
type ProgressHandler struct {
|
||
store *services.TaskStore
|
||
}
|
||
|
||
func NewProgressHandler(store *services.TaskStore) *ProgressHandler {
|
||
return &ProgressHandler{store: store}
|
||
}
|
||
|
||
func (h *ProgressHandler) GetProgress(c *gin.Context) {
|
||
taskID := c.Param("taskID")
|
||
task, ok := h.store.Get(taskID)
|
||
if !ok {
|
||
fail(c, http.StatusNotFound, "任务不存在")
|
||
return
|
||
}
|
||
if task.Status == services.TaskExpired {
|
||
fail(c, http.StatusGone, "任务已过期")
|
||
return
|
||
}
|
||
|
||
var update models.ProgressUpdate
|
||
if task.Progress != nil {
|
||
update = task.Progress.Snapshot(string(task.Status))
|
||
} else {
|
||
update = models.ProgressUpdate{Total: len(task.Files), Status: string(task.Status), UpdatedAt: time.Now()}
|
||
}
|
||
respondOK(c, update)
|
||
}
|
||
|
||
func (h *ProgressHandler) StreamProgress(c *gin.Context) {
|
||
/*
|
||
SSE 进度推送:
|
||
- 连接建立后先发送一次快照,避免前端空白
|
||
- progress 内部用带缓冲 channel 做“最新进度覆盖”,避免慢客户端拖垮服务
|
||
- 任务达到终态(completed/canceled/failed)后主动结束 SSE
|
||
*/
|
||
taskID := c.Param("taskID")
|
||
task, ok := h.store.Get(taskID)
|
||
if !ok {
|
||
fail(c, http.StatusNotFound, "任务不存在")
|
||
return
|
||
}
|
||
if task.Status == services.TaskExpired {
|
||
fail(c, http.StatusGone, "任务已过期")
|
||
return
|
||
}
|
||
if task.Progress == nil {
|
||
fail(c, http.StatusConflict, "任务尚未开始")
|
||
return
|
||
}
|
||
|
||
c.Writer.Header().Set("Content-Type", "text/event-stream")
|
||
c.Writer.Header().Set("Cache-Control", "no-cache")
|
||
c.Writer.Header().Set("Connection", "keep-alive")
|
||
|
||
flusher, ok := c.Writer.(http.Flusher)
|
||
if !ok {
|
||
fail(c, http.StatusInternalServerError, "SSE不支持")
|
||
return
|
||
}
|
||
|
||
updateCh := make(chan models.ProgressUpdate, 16)
|
||
task.Progress.AddListener(updateCh)
|
||
defer task.Progress.RemoveListener(updateCh)
|
||
|
||
initial := task.Progress.Snapshot(string(task.Status))
|
||
_ = services.SSEWrite(c.Writer, initial)
|
||
flusher.Flush()
|
||
|
||
for {
|
||
select {
|
||
case <-c.Request.Context().Done():
|
||
return
|
||
case update, ok := <-updateCh:
|
||
if !ok {
|
||
return
|
||
}
|
||
_ = services.SSEWrite(c.Writer, update)
|
||
flusher.Flush()
|
||
if update.Status == string(services.TaskCompleted) || update.Status == string(services.TaskCanceled) || update.Status == string(services.TaskFailed) {
|
||
return
|
||
}
|
||
}
|
||
}
|
||
}
|