geoGo/internal/manager/publish_manager.go

353 lines
10 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package manager
import (
"fmt"
"geo/internal/config"
"geo/internal/publisher"
"geo/pkg"
"io"
"log"
"os"
"path/filepath"
"strings"
"sync"
"time"
"geo/utils"
)
type PublishManager struct {
AutoStatus bool
Conf *config.Config
TokenID int
running bool
mu sync.Mutex
stopCh chan struct{}
currentPublisher interface{}
db *utils.Db
}
var publishManager *PublishManager
var once sync.Once
func GetPublishManager(config *config.Config, db *utils.Db) *PublishManager {
once.Do(func() {
publishManager = &PublishManager{
AutoStatus: false,
Conf: config,
stopCh: make(chan struct{}),
db: db,
}
})
return publishManager
}
// getTaskLogger 获取任务专属日志记录器(同一个文件)
func (pm *PublishManager) getTaskLogger(requestID string) (*log.Logger, *os.File, error) {
// 确定日志目录
logsDir := pm.Conf.Sys.LogsDir
if logsDir == "" {
logsDir = "./logs"
}
// 按日期创建子目录
dateDir := time.Now().Format("2006-01-02")
taskLogDir := filepath.Join(logsDir, "tasks", dateDir)
if err := os.MkdirAll(taskLogDir, 0755); err != nil {
return nil, nil, fmt.Errorf("创建日志目录失败: %v", err)
}
// 创建以requestId命名的日志文件
logPath := filepath.Join(taskLogDir, fmt.Sprintf("%s.log", requestID))
logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return nil, nil, fmt.Errorf("创建日志文件失败: %v", err)
}
// 创建写入器:同时写入文件和标准输出
multiWriter := io.MultiWriter(logFile, os.Stdout)
// 创建专用的logger
taskLogger := log.New(multiWriter, "", log.LstdFlags|log.Lmicroseconds)
// 写入任务开始分隔线
taskLogger.Printf(strings.Repeat("=", 80))
taskLogger.Printf("任务开始 | RequestID: %s | 时间: %s", requestID, time.Now().Format("2006-01-02 15:04:05.000"))
taskLogger.Printf(strings.Repeat("=", 80))
return taskLogger, logFile, nil
}
func (pm *PublishManager) Start(tokenID int) bool {
pm.mu.Lock()
defer pm.mu.Unlock()
if pm.AutoStatus {
return false
}
pm.TokenID = tokenID
pm.AutoStatus = true
pm.stopCh = make(chan struct{})
go pm.autoPublishLoop()
return true
}
func (pm *PublishManager) Stop() bool {
pm.mu.Lock()
defer pm.mu.Unlock()
if !pm.AutoStatus {
return false
}
pm.AutoStatus = false
close(pm.stopCh)
return true
}
func (pm *PublishManager) autoPublishLoop() {
log.Println("自动发布服务已启动")
for {
select {
case <-pm.stopCh:
log.Println("自动发布服务已停止")
return
default:
pm.batchPublish()
time.Sleep(30 * time.Second)
}
}
}
func (pm *PublishManager) batchPublish() {
if !pm.AutoStatus {
return
}
publishData := pm.getPendingPublish()
if publishData == nil {
return
}
// 使用 defer recover 防止 panic 导致整个循环崩溃
defer func() {
if r := recover(); r != nil {
log.Printf("批处理发布发生 panic: %v", r)
}
}()
pm.processSingleTask(publishData)
}
func (pm *PublishManager) getPendingPublish() map[string]interface{} {
currentTime := time.Now().Format("2006-01-02 15:04:05")
sql := `
SELECT p.*, pl.*
FROM publish p
INNER JOIN plat pl ON p.plat_index = pl.index AND pl.status = 1
WHERE p.token_id = ? AND p.status = 1 AND p.publish_time <= ?
ORDER BY p.publish_time DESC
LIMIT 1
`
result, err := pm.db.GetOne(sql, pm.TokenID, currentTime)
if err != nil {
log.Printf("查询待发布任务失败: token_id=%d, error=%v", pm.TokenID, err)
return nil
}
if result == nil {
log.Printf("没有待发布任务: token_id=%d, current_time=%s", pm.TokenID, currentTime)
return nil
}
requestID := pkg.GetString(result, "request_id")
log.Printf("获取到待发布任务: token_id=%d, request_id=%s", pm.TokenID, requestID)
return result
}
func (pm *PublishManager) GetTaskByRequestID(requestID string) (map[string]interface{}, error) {
sql := `
SELECT p.*, pl.*
FROM publish p
INNER JOIN plat pl ON p.plat_index COLLATE utf8mb4_unicode_ci = pl.index AND pl.status = 1
WHERE p.request_id = ?
`
return pm.db.GetOne(sql, requestID)
}
func (pm *PublishManager) processSingleTask(publishData map[string]interface{}) (result map[string]interface{}) {
requestID := pkg.GetString(publishData, "request_id")
// 获取任务专属日志(同一个文件)
taskLogger, logFile, err := pm.getTaskLogger(requestID)
if err != nil {
log.Printf("[任务 %s] 创建日志文件失败: %v使用全局日志", requestID, err)
taskLogger = log.Default()
}
if logFile != nil {
defer logFile.Close()
}
// 全局defer用于捕获panic并记录到同一个日志文件
defer func() {
if r := recover(); r != nil {
errMsg := fmt.Sprintf("任务执行发生panic: %v", r)
taskLogger.Printf("❌ CRITICAL: %s", errMsg)
taskLogger.Printf(strings.Repeat("=", 80))
taskLogger.Printf("任务异常结束 | RequestID: %s | 时间: %s", requestID, time.Now().Format("2006-01-02 15:04:05.000"))
taskLogger.Printf(strings.Repeat("=", 80))
result = map[string]interface{}{
"success": false,
"message": errMsg,
"request_id": requestID,
}
}
}()
taskLogger.Printf("[任务 %s] 开始处理", requestID)
platIndex := pkg.GetString(publishData, "plat_index")
title := pkg.GetString(publishData, "title")
tagRaw := pkg.GetString(publishData, "tag")
userIndex := pkg.GetString(publishData, "user_index")
url := pkg.GetString(publishData, "url")
imgURL := pkg.GetString(publishData, "img")
taskLogger.Printf("[任务 %s] 任务详情 - 平台:%s标题%s用户%s", requestID, platIndex, title, userIndex)
taskLogger.Printf("[任务 %s] 文档URL: %s", requestID, url)
taskLogger.Printf("[任务 %s] 图片URL: %s", requestID, imgURL)
// 更新状态为发布中
pm.updatePublishStatus(requestID, 2, "")
taskLogger.Printf("[任务 %s] 状态已更新为发布中", requestID)
// 下载文件
taskLogger.Printf("[任务 %s] 开始下载文档...", requestID)
docPath, err := pkg.DownloadFile(url, pm.Conf.Sys.DocsDir, requestID+".docx")
if err != nil {
errMsg := fmt.Sprintf("下载文档失败: %v", err)
taskLogger.Printf("[任务 %s] ❌ %s", requestID, errMsg)
pm.updatePublishStatus(requestID, 3, errMsg)
return map[string]interface{}{"success": false, "message": errMsg, "request_id": requestID}
}
defer func() {
if docPath != "" {
pkg.DeleteFile(docPath)
taskLogger.Printf("[任务 %s] 已删除文档文件: %s", requestID, docPath)
}
}()
taskLogger.Printf("[任务 %s] ✅ 文档下载成功: %s", requestID, docPath)
// 下载图片
taskLogger.Printf("[任务 %s] 开始下载图片...", requestID)
imgPath, err := pkg.DownloadImage(imgURL, requestID, pm.Conf.Sys.UploadDir)
defer func() {
if imgPath != "" {
pkg.DeleteFile(imgPath)
taskLogger.Printf("[任务 %s] 已删除图片文件: %s", requestID, imgPath)
}
}()
if err != nil {
errMsg := fmt.Sprintf("下载图片失败: %v", err)
taskLogger.Printf("[任务 %s] ❌ %s", requestID, errMsg)
pm.updatePublishStatus(requestID, 3, errMsg)
return map[string]interface{}{"success": false, "message": errMsg, "request_id": requestID}
}
taskLogger.Printf("[任务 %s] ✅ 图片下载成功: %s", requestID, imgPath)
// 解析标签
tags := pkg.ParseTags(tagRaw)
taskLogger.Printf("[任务 %s] 标签解析完成: %v", requestID, tags)
// 获取发布器
publisherClass := GetPublisherClass(platIndex)
if publisherClass == nil {
errMsg := fmt.Sprintf("不支持的平台: %s", platIndex)
taskLogger.Printf("[任务 %s] ❌ %s", requestID, errMsg)
pm.updatePublishStatus(requestID, 3, errMsg)
return map[string]interface{}{"success": false, "message": errMsg, "request_id": requestID}
}
// 提取内容
taskLogger.Printf("[任务 %s] 开始提取文档内容...", requestID)
var content string
if publisherClass.Type == 1 {
content, err = pkg.ExtractWordContent(docPath, publisherClass.ContentFormat)
if err != nil {
errMsg := fmt.Sprintf("提取文档内容失败: %v", err)
taskLogger.Printf("[任务 %s] ❌ %s", requestID, errMsg)
pm.updatePublishStatus(requestID, 3, errMsg)
return map[string]interface{}{"success": false, "message": errMsg, "request_id": requestID}
}
}
taskLogger.Printf("[任务 %s] ✅ 内容提取成功,长度: %d", requestID, len(content))
taskLogger.Printf("[任务 %s] 创建发布器...", requestID)
pub := publisherClass.InitMethod(false, title, content, tags, userIndex, platIndex, requestID, imgPath, docPath, publishData, pm.Conf, taskLogger)
taskLogger.Printf("[任务 %s] 创建%s发布器", publisherClass.Name, requestID)
taskLogger.Printf("[任务 %s] 开始执行发布...", requestID)
success, message := pub.PublishNote()
if success {
taskLogger.Printf("[任务 %s] ✅ 发布成功: %s", requestID, message)
pm.updatePublishStatus(requestID, 4, message)
} else {
taskLogger.Printf("[任务 %s] ❌ 发布失败: %s", requestID, message)
pm.updatePublishStatus(requestID, 3, message)
}
taskLogger.Printf(strings.Repeat("=", 80))
taskLogger.Printf("任务结束 | RequestID: %s | 结果: %v | 时间: %s", requestID, success, time.Now().Format("2006-01-02 15:04:05.000"))
taskLogger.Printf(strings.Repeat("=", 80))
return map[string]interface{}{
"success": success,
"message": message,
"request_id": requestID,
}
}
func (pm *PublishManager) updatePublishStatus(requestID string, status int, message string) {
if message != "" {
pm.db.Execute("UPDATE publish SET status = ?, msg = ? WHERE request_id = ?", status, message, requestID)
} else {
pm.db.Execute("UPDATE publish SET status = ? WHERE request_id = ?", status, requestID)
}
}
func (pm *PublishManager) ExecuteOnce(tokenId int32) map[string]interface{} {
publishData := pm.getPendingPublish()
if publishData == nil {
return map[string]interface{}{"success": false, "message": "没有待发布任务"}
}
return pm.processSingleTask(publishData)
}
func (pm *PublishManager) RetryTask(requestID string) map[string]interface{} {
publishData, err := pm.GetTaskByRequestID(requestID)
if err != nil || publishData == nil {
return map[string]interface{}{"success": false, "message": "任务不存在"}
}
return pm.processSingleTask(publishData)
}
func (pm *PublishManager) GetStatus() map[string]interface{} {
return map[string]interface{}{
"auto_status": pm.AutoStatus,
"max_concurrent": pm.Conf.Sys.MaxConcurrent,
"task_timeout": pm.Conf.Sys.TaskTimeout,
}
}
func GetPublisherClass(platIndex string) *publisher.PublisherValue {
return publisher.PublisherMap[platIndex]
}