package coroutine import ( "context" "fmt" "gitea.cdlsxd.cn/self-tools/l-export-async/util" "time" ) // Server 管理协程 // 提供给kratos当server使用,主要实现其start和stop,交给kratos管理其生命周期 type Server struct { // 关闭时最长等待时长 timeout time.Duration } type ServerOption func(s *Server) func WithServerTimeout(timeout time.Duration) ServerOption { return func(s *Server) { s.timeout = timeout } } // NewServer 创建一个协程管理器 // log 日志 // timeout 关闭时最长等待时长 func NewServer(log util.Logger, opts ...ServerOption) *Server { globalLogger = log s := &Server{ timeout: 24 * time.Hour, //默认最多等待24个小时 } for _, opt := range opts { opt(s) } return s } func (s *Server) Start(ctx context.Context) error { return nil } func (s *Server) Stop(ctx context.Context) error { myCtx, cancel := context.WithTimeout(context.Background(), s.timeout) defer cancel() // 每1s检查下业务是否都处理完成 ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { tasks := getTasks() if len(tasks) == 0 { fmt.Println("协程任务已全部优雅退出") return nil } processStatusMsg := "" for _, t := range tasks { if processStatusMsg != "" { processStatusMsg += "、" } processStatusMsg += t.name } fmt.Printf("等待协程任务退出,当前活动:%d 个:%s \n", len(tasks), processStatusMsg) select { case <-myCtx.Done(): //超时退出 err := fmt.Errorf("等待协程任务超时,即将退出") return err case <-ticker.C: } } }