geoGo/internal/manager/publish_manager.go

298 lines
8.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package manager
import (
"fmt"
"geo/internal/config"
"geo/internal/publisher"
"geo/pkg"
"log"
"sync"
"time"
"geo/utils"
)
type PublishManager struct {
AutoStatus bool
Conf *config.Config
TokenID int
running bool
mu sync.Mutex
stopCh chan struct{}
currentPublisher interface{}
db *utils.Db
}
var publishManager *PublishManager
var once sync.Once
func GetPublishManager(config *config.Config, db *utils.Db) *PublishManager {
once.Do(func() {
publishManager = &PublishManager{
AutoStatus: false,
Conf: config,
stopCh: make(chan struct{}),
db: db,
}
})
return publishManager
}
func (pm *PublishManager) Start(tokenID int) bool {
pm.mu.Lock()
defer pm.mu.Unlock()
if pm.AutoStatus {
return false
}
pm.TokenID = tokenID
pm.AutoStatus = true
pm.stopCh = make(chan struct{})
go pm.autoPublishLoop()
return true
}
func (pm *PublishManager) Stop() bool {
pm.mu.Lock()
defer pm.mu.Unlock()
if !pm.AutoStatus {
return false
}
pm.AutoStatus = false
close(pm.stopCh)
return true
}
func (pm *PublishManager) autoPublishLoop() {
log.Println("自动发布服务已启动")
for {
select {
case <-pm.stopCh:
log.Println("自动发布服务已停止")
return
default:
pm.batchPublish()
time.Sleep(30 * time.Second)
}
}
}
func (pm *PublishManager) batchPublish() {
if !pm.AutoStatus {
return
}
publishData := pm.getPendingPublish()
if publishData == nil {
return
}
// 使用 defer recover 防止 panic 导致整个循环崩溃
defer func() {
if r := recover(); r != nil {
log.Printf("批处理发布发生 panic: %v", r)
}
}()
pm.processSingleTask(publishData)
}
func (pm *PublishManager) getPendingPublish() map[string]interface{} {
currentTime := time.Now().Format("2006-01-02 15:04:05")
sql := `
SELECT p.*, pl.*
FROM publish p
INNER JOIN plat pl ON p.plat_index = pl.index AND pl.status = 1
WHERE p.token_id = ? AND p.status = 1 AND p.publish_time <= ?
ORDER BY p.publish_time DESC
LIMIT 1
`
result, err := pm.db.GetOne(sql, pm.TokenID, currentTime)
if err != nil {
log.Printf("查询待发布任务失败: token_id=%d, error=%v", pm.TokenID, err)
return nil
}
if result == nil {
log.Printf("没有待发布任务: token_id=%d, current_time=%s", pm.TokenID, currentTime)
return nil
}
requestID := getString(result, "request_id")
log.Printf("获取到待发布任务: token_id=%d, request_id=%s", pm.TokenID, requestID)
return result
}
func (pm *PublishManager) GetTaskByRequestID(requestID string) (map[string]interface{}, error) {
sql := `
SELECT p.*, pl.*
FROM publish p
INNER JOIN plat pl ON p.plat_index COLLATE utf8mb4_unicode_ci = pl.index AND pl.status = 1
WHERE p.request_id = ?
`
return pm.db.GetOne(sql, requestID)
}
func (pm *PublishManager) processSingleTask(publishData map[string]interface{}) map[string]interface{} {
requestID := getString(publishData, "request_id")
platIndex := getString(publishData, "plat_index")
title := getString(publishData, "title")
tagRaw := getString(publishData, "tag")
userIndex := getString(publishData, "user_index")
url := getString(publishData, "url")
imgURL := getString(publishData, "img")
log.Printf("[任务 %s] 开始处理,平台:%s标题%s", requestID, platIndex, title)
// 更新状态为发布中
pm.updatePublishStatus(requestID, 2, "")
log.Printf("[任务 %s] 状态已更新为发布中", requestID)
// 下载文件
docPath, err := pkg.DownloadFile(url, "", requestID+".docx")
if err != nil {
errMsg := fmt.Sprintf("下载文档失败: %v", err)
log.Printf("[任务 %s] %s", requestID, errMsg)
pm.updatePublishStatus(requestID, 3, errMsg)
return map[string]interface{}{"success": false, "message": errMsg, "request_id": requestID}
}
log.Printf("[任务 %s] 文档下载成功: %s", requestID, docPath)
// 下载图片
imgPath, err := pkg.DownloadImage(imgURL, requestID, "img")
if err != nil {
errMsg := fmt.Sprintf("下载图片失败: %v", err)
log.Printf("[任务 %s] %s", requestID, errMsg)
pm.updatePublishStatus(requestID, 3, errMsg)
// 图片下载失败,清理已下载的文档
pkg.DeleteFile(docPath)
return map[string]interface{}{"success": false, "message": errMsg, "request_id": requestID}
}
log.Printf("[任务 %s] 图片下载成功: %s", requestID, imgPath)
// 确保清理临时文件
defer func() {
pkg.DeleteFile(docPath)
pkg.DeleteFile(imgPath)
}()
// 解析标签
tags := pkg.ParseTags(tagRaw)
log.Printf("[任务 %s] 标签解析完成: %v", requestID, tags)
// 提取内容
content, err := pkg.ExtractWordContent(docPath, "html")
if err != nil {
errMsg := fmt.Sprintf("提取文档内容失败: %v", err)
log.Printf("[任务 %s] %s", requestID, errMsg)
pm.updatePublishStatus(requestID, 3, errMsg)
return map[string]interface{}{"success": false, "message": errMsg, "request_id": requestID}
}
log.Printf("[任务 %s] 内容提取成功,长度: %d", requestID, len(content))
// 获取发布器
publisherClass := getPublisherClass(platIndex)
if publisherClass == nil {
errMsg := fmt.Sprintf("不支持的平台: %s", platIndex)
log.Printf("[任务 %s] %s", requestID, errMsg)
pm.updatePublishStatus(requestID, 3, errMsg)
return map[string]interface{}{"success": false, "message": errMsg, "request_id": requestID}
}
// 创建并执行发布器
var pub interface{ PublishNote() (bool, string) }
switch platIndex {
case "xhs":
pub = publisher.NewXiaohongshuPublisher(false, title, content, tags, userIndex, platIndex, requestID, imgPath, docPath, publishData, pm.Conf)
log.Printf("[任务 %s] 创建小红书发布器", requestID)
case "bjh":
pub = publisher.NewBaijiahaoPublisher(false, title, content, tags, userIndex, platIndex, requestID, imgPath, docPath, publishData, pm.Conf)
log.Printf("[任务 %s] 创建百家号发布器", requestID)
default:
log.Printf("[任务 %s] 未知平台 %s使用默认小红书发布器", requestID, platIndex)
pub = publisher.NewXiaohongshuPublisher(false, title, content, tags, userIndex, platIndex, requestID, imgPath, docPath, publishData, pm.Conf)
}
log.Printf("[任务 %s] 开始执行发布...", requestID)
success, message := pub.PublishNote()
if success {
log.Printf("[任务 %s] 发布成功: %s", requestID, message)
pm.updatePublishStatus(requestID, 4, message)
} else {
log.Printf("[任务 %s] 发布失败: %s", requestID, message)
pm.updatePublishStatus(requestID, 3, message)
}
return map[string]interface{}{
"success": success,
"message": message,
"request_id": requestID,
}
}
func (pm *PublishManager) updatePublishStatus(requestID string, status int, message string) {
if message != "" {
pm.db.Execute("UPDATE publish SET status = ?, msg = ? WHERE request_id = ?", status, message, requestID)
} else {
pm.db.Execute("UPDATE publish SET status = ? WHERE request_id = ?", status, requestID)
}
}
func (pm *PublishManager) ExecuteOnce(tokenId int32) map[string]interface{} {
publishData := pm.getPendingPublish()
if publishData == nil {
return map[string]interface{}{"success": false, "message": "没有待发布任务"}
}
return pm.processSingleTask(publishData)
}
func (pm *PublishManager) RetryTask(requestID string) map[string]interface{} {
publishData, err := pm.GetTaskByRequestID(requestID)
if err != nil || publishData == nil {
return map[string]interface{}{"success": false, "message": "任务不存在"}
}
return pm.processSingleTask(publishData)
}
func (pm *PublishManager) GetStatus() map[string]interface{} {
return map[string]interface{}{
"auto_status": pm.AutoStatus,
"max_concurrent": pm.Conf.Sys.MaxConcurrent,
"task_timeout": pm.Conf.Sys.TaskTimeout,
}
}
func getPublisherClass(platIndex string) interface{} {
platformMap := map[string]interface{}{
"xhs": struct{}{},
"bjh": struct{}{},
"csdn": struct{}{},
}
return platformMap[platIndex]
}
func getString(m map[string]interface{}, key string) string {
if v, ok := m[key]; ok {
switch v.(type) {
case []uint8:
return string(v.([]uint8))
case string:
return v.(string)
case int64:
return fmt.Sprintf("%d", v)
default:
return fmt.Sprintf("%v", v)
}
}
return ""
}