74 lines
1.6 KiB
Go
74 lines
1.6 KiB
Go
package coroutine
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"gitea.cdlsxd.cn/self-tools/l-export-async/util"
|
||
"time"
|
||
)
|
||
|
||
// Server 管理协程
|
||
// 提供给kratos当server使用,主要实现其start和stop,交给kratos管理其生命周期
|
||
type Server struct {
|
||
// 关闭时最长等待时长
|
||
timeout time.Duration
|
||
}
|
||
|
||
type ServerOption func(s *Server)
|
||
|
||
func WithServerTimeout(timeout time.Duration) ServerOption {
|
||
return func(s *Server) {
|
||
s.timeout = timeout
|
||
}
|
||
}
|
||
|
||
// NewServer 创建一个协程管理器
|
||
// log 日志
|
||
// timeout 关闭时最长等待时长
|
||
func NewServer(log util.Logger, opts ...ServerOption) *Server {
|
||
globalLogger = log
|
||
s := &Server{
|
||
timeout: 24 * time.Hour, //默认最多等待24个小时
|
||
}
|
||
for _, opt := range opts {
|
||
opt(s)
|
||
}
|
||
return s
|
||
}
|
||
|
||
func (s *Server) Start(ctx context.Context) error {
|
||
return nil
|
||
}
|
||
|
||
func (s *Server) Stop(ctx context.Context) error {
|
||
myCtx, cancel := context.WithTimeout(context.Background(), s.timeout)
|
||
defer cancel()
|
||
|
||
// 每1s检查下业务是否都处理完成
|
||
ticker := time.NewTicker(1 * time.Second)
|
||
defer ticker.Stop()
|
||
|
||
for {
|
||
tasks := getTasks()
|
||
if len(tasks) == 0 {
|
||
fmt.Println("协程任务已全部优雅退出")
|
||
return nil
|
||
}
|
||
processStatusMsg := ""
|
||
for _, t := range tasks {
|
||
if processStatusMsg != "" {
|
||
processStatusMsg += "、"
|
||
}
|
||
processStatusMsg += t.name
|
||
}
|
||
fmt.Printf("等待协程任务退出,当前活动:%d 个:%s \n", len(tasks), processStatusMsg)
|
||
select {
|
||
case <-myCtx.Done():
|
||
//超时退出
|
||
err := fmt.Errorf("等待协程任务超时,即将退出")
|
||
return err
|
||
case <-ticker.C:
|
||
}
|
||
}
|
||
}
|