353 lines
10 KiB
Go
353 lines
10 KiB
Go
package manager
|
||
|
||
import (
|
||
"fmt"
|
||
"geo/internal/config"
|
||
"geo/internal/publisher"
|
||
"geo/pkg"
|
||
"io"
|
||
"log"
|
||
"os"
|
||
"path/filepath"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"geo/utils"
|
||
)
|
||
|
||
type PublishManager struct {
|
||
AutoStatus bool
|
||
Conf *config.Config
|
||
TokenID int
|
||
running bool
|
||
mu sync.Mutex
|
||
stopCh chan struct{}
|
||
currentPublisher interface{}
|
||
db *utils.Db
|
||
}
|
||
|
||
var publishManager *PublishManager
|
||
var once sync.Once
|
||
|
||
func GetPublishManager(config *config.Config, db *utils.Db) *PublishManager {
|
||
once.Do(func() {
|
||
publishManager = &PublishManager{
|
||
AutoStatus: false,
|
||
Conf: config,
|
||
stopCh: make(chan struct{}),
|
||
db: db,
|
||
}
|
||
})
|
||
return publishManager
|
||
}
|
||
|
||
// getTaskLogger 获取任务专属日志记录器(同一个文件)
|
||
func (pm *PublishManager) getTaskLogger(requestID string) (*log.Logger, *os.File, error) {
|
||
// 确定日志目录
|
||
logsDir := pm.Conf.Sys.LogsDir
|
||
if logsDir == "" {
|
||
logsDir = "./logs"
|
||
}
|
||
|
||
// 按日期创建子目录
|
||
dateDir := time.Now().Format("2006-01-02")
|
||
taskLogDir := filepath.Join(logsDir, "tasks", dateDir)
|
||
if err := os.MkdirAll(taskLogDir, 0755); err != nil {
|
||
return nil, nil, fmt.Errorf("创建日志目录失败: %v", err)
|
||
}
|
||
|
||
// 创建以requestId命名的日志文件
|
||
logPath := filepath.Join(taskLogDir, fmt.Sprintf("%s.log", requestID))
|
||
|
||
logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
|
||
if err != nil {
|
||
return nil, nil, fmt.Errorf("创建日志文件失败: %v", err)
|
||
}
|
||
|
||
// 创建写入器:同时写入文件和标准输出
|
||
multiWriter := io.MultiWriter(logFile, os.Stdout)
|
||
|
||
// 创建专用的logger
|
||
taskLogger := log.New(multiWriter, "", log.LstdFlags|log.Lmicroseconds)
|
||
|
||
// 写入任务开始分隔线
|
||
taskLogger.Printf(strings.Repeat("=", 80))
|
||
taskLogger.Printf("任务开始 | RequestID: %s | 时间: %s", requestID, time.Now().Format("2006-01-02 15:04:05.000"))
|
||
taskLogger.Printf(strings.Repeat("=", 80))
|
||
|
||
return taskLogger, logFile, nil
|
||
}
|
||
|
||
func (pm *PublishManager) Start(tokenID int) bool {
|
||
pm.mu.Lock()
|
||
defer pm.mu.Unlock()
|
||
|
||
if pm.AutoStatus {
|
||
return false
|
||
}
|
||
|
||
pm.TokenID = tokenID
|
||
pm.AutoStatus = true
|
||
pm.stopCh = make(chan struct{})
|
||
|
||
go pm.autoPublishLoop()
|
||
return true
|
||
}
|
||
|
||
func (pm *PublishManager) Stop() bool {
|
||
pm.mu.Lock()
|
||
defer pm.mu.Unlock()
|
||
|
||
if !pm.AutoStatus {
|
||
return false
|
||
}
|
||
|
||
pm.AutoStatus = false
|
||
close(pm.stopCh)
|
||
return true
|
||
}
|
||
|
||
func (pm *PublishManager) autoPublishLoop() {
|
||
log.Println("自动发布服务已启动")
|
||
|
||
for {
|
||
select {
|
||
case <-pm.stopCh:
|
||
log.Println("自动发布服务已停止")
|
||
return
|
||
default:
|
||
pm.batchPublish()
|
||
time.Sleep(30 * time.Second)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (pm *PublishManager) batchPublish() {
|
||
if !pm.AutoStatus {
|
||
return
|
||
}
|
||
|
||
publishData := pm.getPendingPublish()
|
||
if publishData == nil {
|
||
return
|
||
}
|
||
|
||
// 使用 defer recover 防止 panic 导致整个循环崩溃
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
log.Printf("批处理发布发生 panic: %v", r)
|
||
}
|
||
}()
|
||
|
||
pm.processSingleTask(publishData)
|
||
}
|
||
|
||
func (pm *PublishManager) getPendingPublish() map[string]interface{} {
|
||
currentTime := time.Now().Format("2006-01-02 15:04:05")
|
||
|
||
sql := `
|
||
SELECT p.*, pl.*
|
||
FROM publish p
|
||
INNER JOIN plat pl ON p.plat_index = pl.index AND pl.status = 1
|
||
WHERE p.token_id = ? AND p.status = 1 AND p.publish_time <= ?
|
||
ORDER BY p.publish_time DESC
|
||
LIMIT 1
|
||
`
|
||
|
||
result, err := pm.db.GetOne(sql, pm.TokenID, currentTime)
|
||
if err != nil {
|
||
log.Printf("查询待发布任务失败: token_id=%d, error=%v", pm.TokenID, err)
|
||
return nil
|
||
}
|
||
if result == nil {
|
||
log.Printf("没有待发布任务: token_id=%d, current_time=%s", pm.TokenID, currentTime)
|
||
return nil
|
||
}
|
||
|
||
requestID := pkg.GetString(result, "request_id")
|
||
log.Printf("获取到待发布任务: token_id=%d, request_id=%s", pm.TokenID, requestID)
|
||
return result
|
||
}
|
||
|
||
func (pm *PublishManager) GetTaskByRequestID(requestID string) (map[string]interface{}, error) {
|
||
sql := `
|
||
SELECT p.*, pl.*
|
||
FROM publish p
|
||
INNER JOIN plat pl ON p.plat_index COLLATE utf8mb4_unicode_ci = pl.index AND pl.status = 1
|
||
WHERE p.request_id = ?
|
||
`
|
||
return pm.db.GetOne(sql, requestID)
|
||
}
|
||
|
||
func (pm *PublishManager) processSingleTask(publishData map[string]interface{}) (result map[string]interface{}) {
|
||
requestID := pkg.GetString(publishData, "request_id")
|
||
|
||
// 获取任务专属日志(同一个文件)
|
||
taskLogger, logFile, err := pm.getTaskLogger(requestID)
|
||
if err != nil {
|
||
log.Printf("[任务 %s] 创建日志文件失败: %v,使用全局日志", requestID, err)
|
||
taskLogger = log.Default()
|
||
}
|
||
if logFile != nil {
|
||
defer logFile.Close()
|
||
}
|
||
|
||
// 全局defer用于捕获panic并记录到同一个日志文件
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
errMsg := fmt.Sprintf("任务执行发生panic: %v", r)
|
||
taskLogger.Printf("❌ CRITICAL: %s", errMsg)
|
||
taskLogger.Printf(strings.Repeat("=", 80))
|
||
taskLogger.Printf("任务异常结束 | RequestID: %s | 时间: %s", requestID, time.Now().Format("2006-01-02 15:04:05.000"))
|
||
taskLogger.Printf(strings.Repeat("=", 80))
|
||
result = map[string]interface{}{
|
||
"success": false,
|
||
"message": errMsg,
|
||
"request_id": requestID,
|
||
}
|
||
}
|
||
}()
|
||
|
||
taskLogger.Printf("[任务 %s] 开始处理", requestID)
|
||
|
||
platIndex := pkg.GetString(publishData, "plat_index")
|
||
title := pkg.GetString(publishData, "title")
|
||
tagRaw := pkg.GetString(publishData, "tag")
|
||
userIndex := pkg.GetString(publishData, "user_index")
|
||
url := pkg.GetString(publishData, "url")
|
||
imgURL := pkg.GetString(publishData, "img")
|
||
|
||
taskLogger.Printf("[任务 %s] 任务详情 - 平台:%s,标题:%s,用户:%s", requestID, platIndex, title, userIndex)
|
||
taskLogger.Printf("[任务 %s] 文档URL: %s", requestID, url)
|
||
taskLogger.Printf("[任务 %s] 图片URL: %s", requestID, imgURL)
|
||
|
||
// 更新状态为发布中
|
||
pm.updatePublishStatus(requestID, 2, "")
|
||
taskLogger.Printf("[任务 %s] 状态已更新为发布中", requestID)
|
||
|
||
// 下载文件
|
||
taskLogger.Printf("[任务 %s] 开始下载文档...", requestID)
|
||
docPath, err := pkg.DownloadFile(url, pm.Conf.Sys.DocsDir, requestID+".docx")
|
||
if err != nil {
|
||
errMsg := fmt.Sprintf("下载文档失败: %v", err)
|
||
taskLogger.Printf("[任务 %s] ❌ %s", requestID, errMsg)
|
||
pm.updatePublishStatus(requestID, 3, errMsg)
|
||
return map[string]interface{}{"success": false, "message": errMsg, "request_id": requestID}
|
||
}
|
||
|
||
defer func() {
|
||
if docPath != "" {
|
||
pkg.DeleteFile(docPath)
|
||
taskLogger.Printf("[任务 %s] 已删除文档文件: %s", requestID, docPath)
|
||
}
|
||
}()
|
||
taskLogger.Printf("[任务 %s] ✅ 文档下载成功: %s", requestID, docPath)
|
||
|
||
// 下载图片
|
||
taskLogger.Printf("[任务 %s] 开始下载图片...", requestID)
|
||
imgPath, err := pkg.DownloadImage(imgURL, requestID, pm.Conf.Sys.UploadDir)
|
||
|
||
defer func() {
|
||
if imgPath != "" {
|
||
pkg.DeleteFile(imgPath)
|
||
taskLogger.Printf("[任务 %s] 已删除图片文件: %s", requestID, imgPath)
|
||
}
|
||
}()
|
||
if err != nil {
|
||
errMsg := fmt.Sprintf("下载图片失败: %v", err)
|
||
taskLogger.Printf("[任务 %s] ❌ %s", requestID, errMsg)
|
||
pm.updatePublishStatus(requestID, 3, errMsg)
|
||
return map[string]interface{}{"success": false, "message": errMsg, "request_id": requestID}
|
||
}
|
||
taskLogger.Printf("[任务 %s] ✅ 图片下载成功: %s", requestID, imgPath)
|
||
|
||
// 解析标签
|
||
tags := pkg.ParseTags(tagRaw)
|
||
taskLogger.Printf("[任务 %s] 标签解析完成: %v", requestID, tags)
|
||
|
||
// 获取发布器
|
||
publisherClass := GetPublisherClass(platIndex)
|
||
if publisherClass == nil {
|
||
errMsg := fmt.Sprintf("不支持的平台: %s", platIndex)
|
||
taskLogger.Printf("[任务 %s] ❌ %s", requestID, errMsg)
|
||
pm.updatePublishStatus(requestID, 3, errMsg)
|
||
return map[string]interface{}{"success": false, "message": errMsg, "request_id": requestID}
|
||
}
|
||
|
||
// 提取内容
|
||
taskLogger.Printf("[任务 %s] 开始提取文档内容...", requestID)
|
||
var content string
|
||
if publisherClass.Type == 1 {
|
||
content, err = pkg.ExtractWordContent(docPath, publisherClass.ContentFormat)
|
||
if err != nil {
|
||
errMsg := fmt.Sprintf("提取文档内容失败: %v", err)
|
||
taskLogger.Printf("[任务 %s] ❌ %s", requestID, errMsg)
|
||
pm.updatePublishStatus(requestID, 3, errMsg)
|
||
return map[string]interface{}{"success": false, "message": errMsg, "request_id": requestID}
|
||
}
|
||
}
|
||
|
||
taskLogger.Printf("[任务 %s] ✅ 内容提取成功,长度: %d", requestID, len(content))
|
||
taskLogger.Printf("[任务 %s] 创建发布器...", requestID)
|
||
pub := publisherClass.InitMethod(false, title, content, tags, userIndex, platIndex, requestID, imgPath, docPath, publishData, pm.Conf, taskLogger)
|
||
taskLogger.Printf("[任务 %s] 创建%s发布器", publisherClass.Name, requestID)
|
||
taskLogger.Printf("[任务 %s] 开始执行发布...", requestID)
|
||
success, message := pub.PublishNote()
|
||
|
||
if success {
|
||
taskLogger.Printf("[任务 %s] ✅ 发布成功: %s", requestID, message)
|
||
pm.updatePublishStatus(requestID, 4, message)
|
||
} else {
|
||
taskLogger.Printf("[任务 %s] ❌ 发布失败: %s", requestID, message)
|
||
pm.updatePublishStatus(requestID, 3, message)
|
||
}
|
||
|
||
taskLogger.Printf(strings.Repeat("=", 80))
|
||
taskLogger.Printf("任务结束 | RequestID: %s | 结果: %v | 时间: %s", requestID, success, time.Now().Format("2006-01-02 15:04:05.000"))
|
||
taskLogger.Printf(strings.Repeat("=", 80))
|
||
|
||
return map[string]interface{}{
|
||
"success": success,
|
||
"message": message,
|
||
"request_id": requestID,
|
||
}
|
||
}
|
||
|
||
func (pm *PublishManager) updatePublishStatus(requestID string, status int, message string) {
|
||
if message != "" {
|
||
pm.db.Execute("UPDATE publish SET status = ?, msg = ? WHERE request_id = ?", status, message, requestID)
|
||
} else {
|
||
pm.db.Execute("UPDATE publish SET status = ? WHERE request_id = ?", status, requestID)
|
||
}
|
||
}
|
||
|
||
func (pm *PublishManager) ExecuteOnce(tokenId int32) map[string]interface{} {
|
||
publishData := pm.getPendingPublish()
|
||
if publishData == nil {
|
||
return map[string]interface{}{"success": false, "message": "没有待发布任务"}
|
||
}
|
||
return pm.processSingleTask(publishData)
|
||
}
|
||
|
||
func (pm *PublishManager) RetryTask(requestID string) map[string]interface{} {
|
||
publishData, err := pm.GetTaskByRequestID(requestID)
|
||
if err != nil || publishData == nil {
|
||
return map[string]interface{}{"success": false, "message": "任务不存在"}
|
||
}
|
||
return pm.processSingleTask(publishData)
|
||
}
|
||
|
||
func (pm *PublishManager) GetStatus() map[string]interface{} {
|
||
return map[string]interface{}{
|
||
"auto_status": pm.AutoStatus,
|
||
"max_concurrent": pm.Conf.Sys.MaxConcurrent,
|
||
"task_timeout": pm.Conf.Sys.TaskTimeout,
|
||
}
|
||
}
|
||
|
||
func GetPublisherClass(platIndex string) *publisher.PublisherValue {
|
||
|
||
return publisher.PublisherMap[platIndex]
|
||
}
|