l-export-async/coroutine/server.go

74 lines
1.6 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package coroutine
import (
"context"
"fmt"
"gitea.cdlsxd.cn/self-tools/l-export-async/util"
"time"
)
// Server manages the package's background goroutine tasks.
// It is meant to be handed to kratos as a server: it implements Start and
// Stop so that kratos can manage its lifecycle.
type Server struct {
// timeout is the maximum time Stop waits during shutdown.
timeout time.Duration
}
// ServerOption customizes a Server. NewServer invokes every option it is
// given on the Server it is constructing.
type ServerOption func(s *Server)
// WithServerTimeout returns a ServerOption that sets the Server's shutdown
// timeout, i.e. the longest Stop will wait, to timeout.
func WithServerTimeout(timeout time.Duration) ServerOption {
	setTimeout := func(srv *Server) {
		srv.timeout = timeout
	}
	return setTimeout
}
// NewServer creates a goroutine-task manager.
//
// log is stored in the package-level globalLogger, replacing whatever was
// there before. opts are applied in order on top of the defaults; the default
// shutdown timeout is 24 hours and can be overridden with WithServerTimeout.
func NewServer(log util.Logger, opts ...ServerOption) *Server {
	globalLogger = log

	srv := new(Server)
	srv.timeout = 24 * time.Hour // by default wait at most 24 hours on shutdown
	for i := range opts {
		opts[i](srv)
	}
	return srv
}
// Start does no work and always returns nil; ctx is not used.
func (s *Server) Start(ctx context.Context) error {
return nil
}
// Stop blocks until getTasks reports no remaining tasks or the Server's
// timeout elapses, whichever happens first.
//
// The ctx argument is not consulted: the wait is bounded only by s.timeout,
// measured on a fresh background context. While waiting, the task list is
// re-read once per second and a progress line naming the remaining tasks is
// printed to standard output.
//
// It returns nil once no tasks remain, and a non-nil error if the timeout
// expires first.
func (s *Server) Stop(ctx context.Context) error {
	deadline, cancel := context.WithTimeout(context.Background(), s.timeout)
	defer cancel()

	// Poll once per second to see whether all work has finished.
	poll := time.NewTicker(time.Second)
	defer poll.Stop()

	for {
		pending := getTasks()
		if len(pending) == 0 {
			fmt.Println("协程任务已全部优雅退出")
			return nil
		}

		// Build the "、"-separated list of task names for the progress line.
		// A separator is only emitted once the accumulated text is non-empty.
		names := ""
		for _, task := range pending {
			sep := "、"
			if names == "" {
				sep = ""
			}
			names += sep + task.name
		}
		fmt.Printf("等待协程任务退出,当前活动:%d 个:%s \n", len(pending), names)

		select {
		case <-poll.C:
			// Tick: loop around and check the tasks again.
		case <-deadline.Done():
			// Timed out while tasks were still active.
			return fmt.Errorf("等待协程任务超时,即将退出")
		}
	}
}