package manager import ( "fmt" "geo/internal/config" "geo/internal/publisher" "geo/pkg" "io" "log" "os" "path/filepath" "strings" "sync" "time" "geo/utils" ) type PublishManager struct { AutoStatus bool Conf *config.Config TokenID int running bool mu sync.Mutex stopCh chan struct{} currentPublisher interface{} db *utils.Db } var publishManager *PublishManager var once sync.Once func GetPublishManager(config *config.Config, db *utils.Db) *PublishManager { once.Do(func() { publishManager = &PublishManager{ AutoStatus: false, Conf: config, stopCh: make(chan struct{}), db: db, } }) return publishManager } // getTaskLogger 获取任务专属日志记录器(同一个文件) func (pm *PublishManager) getTaskLogger(requestID string) (*log.Logger, *os.File, error) { // 确定日志目录 logsDir := pm.Conf.Sys.LogsDir if logsDir == "" { logsDir = "./logs" } // 按日期创建子目录 dateDir := time.Now().Format("2006-01-02") taskLogDir := filepath.Join(logsDir, "tasks", dateDir) if err := os.MkdirAll(taskLogDir, 0755); err != nil { return nil, nil, fmt.Errorf("创建日志目录失败: %v", err) } // 创建以requestId命名的日志文件 logPath := filepath.Join(taskLogDir, fmt.Sprintf("%s.log", requestID)) logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) if err != nil { return nil, nil, fmt.Errorf("创建日志文件失败: %v", err) } // 创建写入器:同时写入文件和标准输出 multiWriter := io.MultiWriter(logFile, os.Stdout) // 创建专用的logger taskLogger := log.New(multiWriter, "", log.LstdFlags|log.Lmicroseconds) // 写入任务开始分隔线 taskLogger.Printf(strings.Repeat("=", 80)) taskLogger.Printf("任务开始 | RequestID: %s | 时间: %s", requestID, time.Now().Format("2006-01-02 15:04:05.000")) taskLogger.Printf(strings.Repeat("=", 80)) return taskLogger, logFile, nil } func (pm *PublishManager) Start(tokenID int) bool { pm.mu.Lock() defer pm.mu.Unlock() if pm.AutoStatus { return false } pm.TokenID = tokenID pm.AutoStatus = true pm.stopCh = make(chan struct{}) go pm.autoPublishLoop() return true } func (pm *PublishManager) Stop() bool { pm.mu.Lock() defer pm.mu.Unlock() if !pm.AutoStatus { return false } pm.AutoStatus = false close(pm.stopCh) return true } func (pm *PublishManager) autoPublishLoop() { log.Println("自动发布服务已启动") for { select { case <-pm.stopCh: log.Println("自动发布服务已停止") return default: pm.batchPublish() time.Sleep(30 * time.Second) } } } func (pm *PublishManager) batchPublish() { if !pm.AutoStatus { return } publishData := pm.getPendingPublish() if publishData == nil { return } // 使用 defer recover 防止 panic 导致整个循环崩溃 defer func() { if r := recover(); r != nil { log.Printf("批处理发布发生 panic: %v", r) } }() pm.processSingleTask(publishData) } func (pm *PublishManager) getPendingPublish() map[string]interface{} { currentTime := time.Now().Format("2006-01-02 15:04:05") sql := ` SELECT p.*, pl.* FROM publish p INNER JOIN plat pl ON p.plat_index = pl.index AND pl.status = 1 WHERE p.token_id = ? AND p.status = 1 AND p.publish_time <= ? ORDER BY p.publish_time DESC LIMIT 1 ` result, err := pm.db.GetOne(sql, pm.TokenID, currentTime) if err != nil { log.Printf("查询待发布任务失败: token_id=%d, error=%v", pm.TokenID, err) return nil } if result == nil { log.Printf("没有待发布任务: token_id=%d, current_time=%s", pm.TokenID, currentTime) return nil } requestID := pkg.GetString(result, "request_id") log.Printf("获取到待发布任务: token_id=%d, request_id=%s", pm.TokenID, requestID) return result } func (pm *PublishManager) GetTaskByRequestID(requestID string) (map[string]interface{}, error) { sql := ` SELECT p.*, pl.* FROM publish p INNER JOIN plat pl ON p.plat_index COLLATE utf8mb4_unicode_ci = pl.index AND pl.status = 1 WHERE p.request_id = ? ` return pm.db.GetOne(sql, requestID) } func (pm *PublishManager) processSingleTask(publishData map[string]interface{}) (result map[string]interface{}) { requestID := pkg.GetString(publishData, "request_id") // 获取任务专属日志(同一个文件) taskLogger, logFile, err := pm.getTaskLogger(requestID) if err != nil { log.Printf("[任务 %s] 创建日志文件失败: %v,使用全局日志", requestID, err) taskLogger = log.Default() } if logFile != nil { defer logFile.Close() } // 全局defer用于捕获panic并记录到同一个日志文件 defer func() { if r := recover(); r != nil { errMsg := fmt.Sprintf("任务执行发生panic: %v", r) taskLogger.Printf("❌ CRITICAL: %s", errMsg) taskLogger.Printf(strings.Repeat("=", 80)) taskLogger.Printf("任务异常结束 | RequestID: %s | 时间: %s", requestID, time.Now().Format("2006-01-02 15:04:05.000")) taskLogger.Printf(strings.Repeat("=", 80)) result = map[string]interface{}{ "success": false, "message": errMsg, "request_id": requestID, } } }() taskLogger.Printf("[任务 %s] 开始处理", requestID) platIndex := pkg.GetString(publishData, "plat_index") title := pkg.GetString(publishData, "title") tagRaw := pkg.GetString(publishData, "tag") userIndex := pkg.GetString(publishData, "user_index") url := pkg.GetString(publishData, "url") imgURL := pkg.GetString(publishData, "img") taskLogger.Printf("[任务 %s] 任务详情 - 平台:%s,标题:%s,用户:%s", requestID, platIndex, title, userIndex) taskLogger.Printf("[任务 %s] 文档URL: %s", requestID, url) taskLogger.Printf("[任务 %s] 图片URL: %s", requestID, imgURL) // 更新状态为发布中 pm.updatePublishStatus(requestID, 2, "") taskLogger.Printf("[任务 %s] 状态已更新为发布中", requestID) // 下载文件 taskLogger.Printf("[任务 %s] 开始下载文档...", requestID) docPath, err := pkg.DownloadFile(url, pm.Conf.Sys.DocsDir, requestID+".docx") if err != nil { errMsg := fmt.Sprintf("下载文档失败: %v", err) taskLogger.Printf("[任务 %s] ❌ %s", requestID, errMsg) pm.updatePublishStatus(requestID, 3, errMsg) return map[string]interface{}{"success": false, "message": errMsg, "request_id": requestID} } defer func() { if docPath != "" { pkg.DeleteFile(docPath) taskLogger.Printf("[任务 %s] 已删除文档文件: %s", requestID, docPath) } }() taskLogger.Printf("[任务 %s] ✅ 文档下载成功: %s", requestID, docPath) // 下载图片 taskLogger.Printf("[任务 %s] 开始下载图片...", requestID) imgPath, err := pkg.DownloadImage(imgURL, requestID, pm.Conf.Sys.UploadDir) defer func() { if imgPath != "" { pkg.DeleteFile(imgPath) taskLogger.Printf("[任务 %s] 已删除图片文件: %s", requestID, imgPath) } }() if err != nil { errMsg := fmt.Sprintf("下载图片失败: %v", err) taskLogger.Printf("[任务 %s] ❌ %s", requestID, errMsg) pm.updatePublishStatus(requestID, 3, errMsg) return map[string]interface{}{"success": false, "message": errMsg, "request_id": requestID} } taskLogger.Printf("[任务 %s] ✅ 图片下载成功: %s", requestID, imgPath) // 解析标签 tags := pkg.ParseTags(tagRaw) taskLogger.Printf("[任务 %s] 标签解析完成: %v", requestID, tags) // 获取发布器 publisherClass := GetPublisherClass(platIndex) if publisherClass == nil { errMsg := fmt.Sprintf("不支持的平台: %s", platIndex) taskLogger.Printf("[任务 %s] ❌ %s", requestID, errMsg) pm.updatePublishStatus(requestID, 3, errMsg) return map[string]interface{}{"success": false, "message": errMsg, "request_id": requestID} } // 提取内容 taskLogger.Printf("[任务 %s] 开始提取文档内容...", requestID) var content string if publisherClass.Type == 1 { content, err = pkg.ExtractWordContent(docPath, publisherClass.ContentFormat) if err != nil { errMsg := fmt.Sprintf("提取文档内容失败: %v", err) taskLogger.Printf("[任务 %s] ❌ %s", requestID, errMsg) pm.updatePublishStatus(requestID, 3, errMsg) return map[string]interface{}{"success": false, "message": errMsg, "request_id": requestID} } } taskLogger.Printf("[任务 %s] ✅ 内容提取成功,长度: %d", requestID, len(content)) taskLogger.Printf("[任务 %s] 创建发布器...", requestID) pub := publisherClass.InitMethod(false, title, content, tags, userIndex, platIndex, requestID, imgPath, docPath, publishData, pm.Conf, taskLogger) taskLogger.Printf("[任务 %s] 创建%s发布器", publisherClass.Name, requestID) taskLogger.Printf("[任务 %s] 开始执行发布...", requestID) success, message := pub.PublishNote() if success { taskLogger.Printf("[任务 %s] ✅ 发布成功: %s", requestID, message) pm.updatePublishStatus(requestID, 4, message) } else { taskLogger.Printf("[任务 %s] ❌ 发布失败: %s", requestID, message) pm.updatePublishStatus(requestID, 3, message) } taskLogger.Printf(strings.Repeat("=", 80)) taskLogger.Printf("任务结束 | RequestID: %s | 结果: %v | 时间: %s", requestID, success, time.Now().Format("2006-01-02 15:04:05.000")) taskLogger.Printf(strings.Repeat("=", 80)) return map[string]interface{}{ "success": success, "message": message, "request_id": requestID, } } func (pm *PublishManager) updatePublishStatus(requestID string, status int, message string) { if message != "" { pm.db.Execute("UPDATE publish SET status = ?, msg = ? WHERE request_id = ?", status, message, requestID) } else { pm.db.Execute("UPDATE publish SET status = ? WHERE request_id = ?", status, requestID) } } func (pm *PublishManager) ExecuteOnce(tokenId int32) map[string]interface{} { publishData := pm.getPendingPublish() if publishData == nil { return map[string]interface{}{"success": false, "message": "没有待发布任务"} } return pm.processSingleTask(publishData) } func (pm *PublishManager) RetryTask(requestID string) map[string]interface{} { publishData, err := pm.GetTaskByRequestID(requestID) if err != nil || publishData == nil { return map[string]interface{}{"success": false, "message": "任务不存在"} } return pm.processSingleTask(publishData) } func (pm *PublishManager) GetStatus() map[string]interface{} { return map[string]interface{}{ "auto_status": pm.AutoStatus, "max_concurrent": pm.Conf.Sys.MaxConcurrent, "task_timeout": pm.Conf.Sys.TaskTimeout, } } func GetPublisherClass(platIndex string) *publisher.PublisherValue { return publisher.PublisherMap[platIndex] }