236 lines
5.9 KiB
Go
236 lines
5.9 KiB
Go
package services
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"runtime"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"qr-scanner/models"
|
||
"qr-scanner/utils"
|
||
)
|
||
|
||
type Scanner struct {
|
||
store *TaskStore
|
||
debugDelay time.Duration
|
||
}
|
||
|
||
func NewScanner(store *TaskStore, debugDelay time.Duration) *Scanner {
|
||
return &Scanner{store: store, debugDelay: debugDelay}
|
||
}
|
||
|
||
func (s *Scanner) Start(task *Task) error {
|
||
/*
|
||
Start 只负责把任务从 uploaded 推进到 processing,并异步启动 worker。
|
||
为了让 HTTP handler 迅速返回,真正的 CPU/IO 工作放在 run() 里执行。
|
||
*/
|
||
if task == nil {
|
||
return errors.New("task is nil")
|
||
}
|
||
if task.Status == TaskProcessing {
|
||
return nil
|
||
}
|
||
if task.Status == TaskCompleted || task.Status == TaskCanceled || task.Status == TaskFailed {
|
||
return nil
|
||
}
|
||
|
||
if task.Progress == nil {
|
||
task.Progress = NewProgress(len(task.Files))
|
||
}
|
||
|
||
task.Status = TaskProcessing
|
||
task.UpdatedAt = time.Now()
|
||
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
task.SetCancel(cancel)
|
||
s.store.Put(task)
|
||
|
||
/*
|
||
并发数:优先使用 scan 请求传入的 Concurrency(由 handler 写入 task),
|
||
兜底为 runtime.NumCPU(),并且不会超过任务文件数,避免空转 goroutine。
|
||
*/
|
||
workers := task.Concurrency
|
||
if workers <= 0 {
|
||
workers = runtime.NumCPU()
|
||
}
|
||
if workers > len(task.Files) && len(task.Files) > 0 {
|
||
workers = len(task.Files)
|
||
}
|
||
if workers <= 0 {
|
||
workers = 1
|
||
}
|
||
|
||
/*
|
||
单张超时:每张图片的解码被包在 context timeout 里,
|
||
避免极端图片导致任务整体卡死。
|
||
*/
|
||
timeout := time.Duration(task.TimeoutS) * time.Second
|
||
if task.TimeoutS <= 0 {
|
||
timeout = 0
|
||
}
|
||
|
||
go func() {
|
||
s.run(ctx, task, workers, timeout)
|
||
}()
|
||
|
||
return nil
|
||
}
|
||
|
||
func (s *Scanner) run(ctx context.Context, task *Task, workers int, timeout time.Duration) {
|
||
/*
|
||
run 使用“任务内 worker 池”处理图片列表:
|
||
- jobs:按顺序投递 TaskFile
|
||
- workers:并发消费 jobs,识别二维码并回写 progress
|
||
- ctx:用于取消任务;取消后尽快停止投递/停止 worker
|
||
*/
|
||
jobs := make(chan TaskFile)
|
||
var wg sync.WaitGroup
|
||
|
||
for i := 0; i < workers; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
for tf := range jobs {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
default:
|
||
}
|
||
|
||
/*
|
||
Current 字段用于 UI 显示“当前处理文件”。
|
||
这里写入的是“压缩包内相对路径”或“上传文件名”,不会暴露服务器绝对路径。
|
||
*/
|
||
task.Progress.SetCurrent(tf.RelPath)
|
||
|
||
report := func(r models.ScanResult) {
|
||
if s.debugDelay > 0 {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case <-time.After(s.debugDelay):
|
||
}
|
||
}
|
||
task.Progress.Update(r, string(task.Status))
|
||
}
|
||
|
||
contents, err := utils.DecodeFile(ctx, tf.AbsPath, timeout)
|
||
res := models.ScanResult{
|
||
Index: tf.Index,
|
||
FilePath: tf.RelPath,
|
||
Contents: nil,
|
||
Success: false,
|
||
ProcessedAt: time.Now(),
|
||
}
|
||
|
||
if err == nil && len(contents) > 0 {
|
||
/*
|
||
业务规则:单张图片可能含多个二维码,DecodeFile 返回 []string。
|
||
这里做两件事:
|
||
1) 去空、去首尾空白
|
||
2) 如果识别到 http/https,则做 URL 基本格式校验
|
||
*/
|
||
validated := make([]string, 0, len(contents))
|
||
hasInvalidURL := false
|
||
for _, c := range contents {
|
||
c = strings.TrimSpace(c)
|
||
if c == "" {
|
||
continue
|
||
}
|
||
if strings.HasPrefix(c, "http://") || strings.HasPrefix(c, "https://") {
|
||
if !utils.IsHTTPURL(c) {
|
||
hasInvalidURL = true
|
||
continue
|
||
}
|
||
}
|
||
validated = append(validated, c)
|
||
}
|
||
if len(validated) > 0 {
|
||
res.Contents = validated
|
||
if hasInvalidURL {
|
||
/*
|
||
URL 校验策略(更严格):
|
||
- 如果识别结果包含非法 URL,则该图片标记为失败(便于前端/导出明确提示)。
|
||
- 同时保留 validated 中的有效内容,便于后续人工处理(Excel 中可看到)。
|
||
*/
|
||
res.Success = false
|
||
res.ErrorCode = "E_URL_INVALID"
|
||
res.ErrorMessage = "识别结果包含非法URL"
|
||
report(res)
|
||
continue
|
||
}
|
||
res.Success = true
|
||
report(res)
|
||
continue
|
||
}
|
||
if hasInvalidURL {
|
||
res.ErrorCode = "E_URL_INVALID"
|
||
res.ErrorMessage = "识别到疑似URL但格式不合法"
|
||
report(res)
|
||
continue
|
||
}
|
||
}
|
||
|
||
if err != nil {
|
||
/*
|
||
错误码归一化:便于前端展示“失败原因”并导出到 Excel。
|
||
不要把底层库的原始错误信息直接暴露给用户(信息噪音大且不稳定)。
|
||
*/
|
||
if errors.Is(err, context.DeadlineExceeded) {
|
||
res.ErrorCode = "E_TIMEOUT"
|
||
res.ErrorMessage = "处理超时"
|
||
} else if errors.Is(err, utils.ErrImageDecode) {
|
||
res.ErrorCode = "E_IMG_DECODE"
|
||
res.ErrorMessage = "图片无法解析"
|
||
} else if errors.Is(err, utils.ErrQRCodeNotFound) {
|
||
res.ErrorCode = "E_QR_NOT_FOUND"
|
||
res.ErrorMessage = "未检测到二维码"
|
||
} else {
|
||
res.ErrorCode = "E_QR_DECODE_FAIL"
|
||
res.ErrorMessage = "二维码解码失败"
|
||
}
|
||
} else if res.ErrorMessage == "" {
|
||
res.ErrorCode = "E_QR_NOT_FOUND"
|
||
res.ErrorMessage = "未检测到二维码"
|
||
}
|
||
report(res)
|
||
}
|
||
}()
|
||
}
|
||
|
||
go func() {
|
||
/*
|
||
投递线程:按 files 的顺序把任务文件发到 jobs。
|
||
一旦取消任务,停止继续投递。
|
||
*/
|
||
defer close(jobs)
|
||
for _, tf := range task.Files {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case jobs <- tf:
|
||
}
|
||
}
|
||
}()
|
||
|
||
wg.Wait()
|
||
|
||
/*
|
||
任务结束状态:worker 全部退出后,根据 ctx 是否被取消决定 completed/canceled。
|
||
此处完成最终一次 progress 推送,便于 SSE 客户端收敛到终态并跳转结果页。
|
||
*/
|
||
select {
|
||
case <-ctx.Done():
|
||
task.Status = TaskCanceled
|
||
default:
|
||
task.Status = TaskCompleted
|
||
}
|
||
|
||
task.EndedAt = time.Now()
|
||
task.UpdatedAt = time.Now()
|
||
task.Progress.Complete(string(task.Status))
|
||
s.store.Put(task)
|
||
}
|