Compare commits
3 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
ae41655c5d | |
|
|
9cf8777a16 | |
|
|
b262190ca1 |
|
|
@ -109,7 +109,7 @@ func Upload(host, filePath, system, business, fieldFormName string) (*UploadResp
|
||||||
_ = retry.Retry(func() error {
|
_ = retry.Retry(func() error {
|
||||||
err = requestHttp()
|
err = requestHttp()
|
||||||
return err
|
return err
|
||||||
}, retry.RetryTimes(5), retry.RetryDuration(time.Second*3))
|
}, retry.RetryTimes(5))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
11
entity.go
11
entity.go
|
|
@ -23,10 +23,11 @@ type Uploader struct {
|
||||||
|
|
||||||
type ExportOption func(*ExportAsync)
|
type ExportOption func(*ExportAsync)
|
||||||
|
|
||||||
// WithCustomUploader 自定义上传配置
|
// WithCustomOssUploader 自定义上传配置
|
||||||
func WithCustomUploader(sys string, business string, fieldFormName string) ExportOption {
|
func WithCustomOssUploader(host string, sys string, business string, fieldFormName string) ExportOption {
|
||||||
return func(b *ExportAsync) {
|
return func(b *ExportAsync) {
|
||||||
b.uploader = &Uploader{
|
b.uploader = &Uploader{
|
||||||
|
Host: host,
|
||||||
System: sys,
|
System: sys,
|
||||||
Business: business,
|
Business: business,
|
||||||
FieldFormName: fieldFormName,
|
FieldFormName: fieldFormName,
|
||||||
|
|
@ -146,6 +147,12 @@ func WithTimeRange(fetcher TimeRangeDataFetcher, startTime time.Time, timeRange
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithCustomCacheDir(dir string) ExportOption {
|
||||||
|
return func(e *ExportAsync) {
|
||||||
|
e.cacheDir = dir
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type Task struct {
|
type Task struct {
|
||||||
Id string `json:"id"`
|
Id string `json:"id"`
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
|
|
|
||||||
|
|
@ -11,11 +11,6 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"gitea.cdlsxd.cn/self-tools/l-export-async/attachment"
|
|
||||||
"gitea.cdlsxd.cn/self-tools/l-export-async/coroutine"
|
|
||||||
"github.com/google/uuid"
|
|
||||||
"github.com/redis/go-redis/v9"
|
|
||||||
"golang.org/x/sync/errgroup"
|
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
@ -23,24 +18,26 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"gitea.cdlsxd.cn/self-tools/l-export-async/attachment"
|
||||||
|
"gitea.cdlsxd.cn/self-tools/l-export-async/coroutine"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
var exportAsyncPool = &sync.Pool{
|
var exportAsyncPool = &sync.Pool{
|
||||||
New: func() interface{} {
|
New: func() interface{} {
|
||||||
return &ExportAsync{
|
return &ExportAsync{
|
||||||
extension: DefaultExtension,
|
extension: DefaultExtension,
|
||||||
sheetName: "Sheet1",
|
sheetName: DefaultSheetName,
|
||||||
batchSize: DefaultBatch,
|
batchSize: DefaultBatch,
|
||||||
maxRowPerFile: DefaultMaxRowPerFile,
|
maxRowPerFile: DefaultMaxRowPerFile,
|
||||||
csvToExcelBatch: DefaultCsvToExcelBatch,
|
csvToExcelBatch: DefaultCsvToExcelBatch,
|
||||||
uploader: &Uploader{
|
|
||||||
FieldFormName: "file",
|
|
||||||
System: "crmApi",
|
|
||||||
Business: "download",
|
|
||||||
},
|
|
||||||
task: &Task{},
|
task: &Task{},
|
||||||
workerNum: DefaultWorkNum, //runtime.NumCPU() * 2,
|
workerNum: DefaultWorkNum, //runtime.NumCPU() * 2,
|
||||||
logTool: NewLogPrint(nil),
|
logTool: NewLogPrint(nil),
|
||||||
|
cacheDir: DefaultCacheDir,
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
@ -51,15 +48,11 @@ var (
|
||||||
DefaultMaxRowPerFile = 100000 ////每个Xlsx的行数,默认10000行->WithMaxRowPerFile
|
DefaultMaxRowPerFile = 100000 ////每个Xlsx的行数,默认10000行->WithMaxRowPerFile
|
||||||
DefaultCsvToExcelBatch = 1000 ////csv转excel的批量写入缓冲区大小,逐行写入设置为1000->WithCustomBufferSize
|
DefaultCsvToExcelBatch = 1000 ////csv转excel的批量写入缓冲区大小,逐行写入设置为1000->WithCustomBufferSize
|
||||||
DefaultWorkNum = 1 // 并发协程数(务必大于1),默认runtime.NumCPU() * 2->WithCustomWorkNum
|
DefaultWorkNum = 1 // 并发协程数(务必大于1),默认runtime.NumCPU() * 2->WithCustomWorkNum
|
||||||
ProcessLimit = 1000 //全局并行导出任务上限
|
|
||||||
DefaultUploader = &Uploader{
|
|
||||||
FieldFormName: "file",
|
|
||||||
System: "crmApi",
|
|
||||||
Business: "download",
|
|
||||||
}
|
|
||||||
DefaultExtension = ".xlsx"
|
DefaultExtension = ".xlsx"
|
||||||
DefaultSheetName = "Sheet1"
|
DefaultSheetName = "Sheet1"
|
||||||
//SameTaskProcessLimit = 1 //单任务并行导出上限,必须小于ProcessLimit
|
DefaultCacheDir = "/tmp"
|
||||||
|
ProcessLimit = 0 //全局并行导出任务上限
|
||||||
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ExportAsync 异步导出任务配置->默认配置往上看
|
// ExportAsync 异步导出任务配置->默认配置往上看
|
||||||
|
|
@ -87,7 +80,8 @@ type ExportAsync struct {
|
||||||
//2.当workerNum>1时,因为线程的乱序(虽然可以通过通道解决,但是就没有线程存在的意义了),会导致查询很多空值出来
|
//2.当workerNum>1时,因为线程的乱序(虽然可以通过通道解决,但是就没有线程存在的意义了),会导致查询很多空值出来
|
||||||
//比如:如果dataProvider走的mysql,那么会执行很多空查询,增加数据库压力
|
//比如:如果dataProvider走的mysql,那么会执行很多空查询,增加数据库压力
|
||||||
dataCount int
|
dataCount int
|
||||||
|
//导出文件存储位置->默认/tmp ->WithCustomCacheDir
|
||||||
|
cacheDir string
|
||||||
//日志输出->WithLogPrint
|
//日志输出->WithLogPrint
|
||||||
logTool LogTool
|
logTool LogTool
|
||||||
// Excel 表头
|
// Excel 表头
|
||||||
|
|
@ -115,7 +109,6 @@ type ExportAsync struct {
|
||||||
func NewExportAsync(
|
func NewExportAsync(
|
||||||
fileName string,
|
fileName string,
|
||||||
header []string,
|
header []string,
|
||||||
domain string,
|
|
||||||
TaskSaveTool TaskSaveTool,
|
TaskSaveTool TaskSaveTool,
|
||||||
args ...ExportOption,
|
args ...ExportOption,
|
||||||
) *ExportAsync {
|
) *ExportAsync {
|
||||||
|
|
@ -123,7 +116,6 @@ func NewExportAsync(
|
||||||
exporter.fileName = fileName
|
exporter.fileName = fileName
|
||||||
exporter.header = header
|
exporter.header = header
|
||||||
exporter.taskSaveTool = TaskSaveTool
|
exporter.taskSaveTool = TaskSaveTool
|
||||||
exporter.uploader.Host = domain
|
|
||||||
exporter.task.Name = fileName
|
exporter.task.Name = fileName
|
||||||
for _, arg := range args {
|
for _, arg := range args {
|
||||||
arg(exporter)
|
arg(exporter)
|
||||||
|
|
@ -131,16 +123,16 @@ func NewExportAsync(
|
||||||
return exporter
|
return exporter
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ExportAsync) Run(ctx context.Context) (string, error) {
|
func (e *ExportAsync) Run(ctx context.Context) (task_id string, cacheDir string, err error) {
|
||||||
|
|
||||||
//新建任务
|
//新建任务
|
||||||
if e.pageStrategy == nil {
|
if e.pageStrategy == nil {
|
||||||
return "", fmt.Errorf("未设置导出方式,导出方式具体参考PageStrategy")
|
return "", "", fmt.Errorf("未设置导出方式,导出方式具体参考PageStrategy")
|
||||||
}
|
}
|
||||||
|
|
||||||
tempDir, err := e.createTask(ctx)
|
tempDir, err := e.createTask(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("创建任务失败: %v", err)
|
return "", "", fmt.Errorf("创建任务失败: %v", err)
|
||||||
}
|
}
|
||||||
coroutine.Run(fmt.Sprintf("创建任务导出任务:%s", e.fileName), func() {
|
coroutine.Run(fmt.Sprintf("创建任务导出任务:%s", e.fileName), func() {
|
||||||
// 执行导出任务
|
// 执行导出任务
|
||||||
|
|
@ -151,9 +143,11 @@ func (e *ExportAsync) Run(ctx context.Context) (string, error) {
|
||||||
e.logTool.Errorf("导出panic:\n任务:%s,错误原因:%s", e.task.Id, _err)
|
e.logTool.Errorf("导出panic:\n任务:%s,错误原因:%s", e.task.Id, _err)
|
||||||
}
|
}
|
||||||
e.release()
|
e.release()
|
||||||
|
if e.uploader != nil {
|
||||||
if e.uploader.Host != "" {
|
if e.uploader.Host != "" {
|
||||||
os.RemoveAll(tempDir)
|
os.RemoveAll(tempDir)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
}()
|
}()
|
||||||
|
|
@ -166,8 +160,8 @@ func (e *ExportAsync) Run(ctx context.Context) (string, error) {
|
||||||
e.logTool.Infof("异步导出任务:%s,导出完成,总计导出%d条数据,下载地址:%s", e.task.Id, e.task.RowCount, source)
|
e.logTool.Infof("异步导出任务:%s,导出完成,总计导出%d条数据,下载地址:%s", e.task.Id, e.task.RowCount, source)
|
||||||
|
|
||||||
})
|
})
|
||||||
|
cacheDir, err = e.getBaseCacheDir(ctx)
|
||||||
return e.task.Id, nil
|
return e.task.Id, cacheDir, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// 添加配置项
|
// 添加配置项
|
||||||
|
|
@ -198,7 +192,7 @@ func (e *ExportAsync) export(ctx context.Context, tempDir string) (source string
|
||||||
if err = e.folderToZip(excelsDir, source); err != nil {
|
if err = e.folderToZip(excelsDir, source); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if e.uploader != nil {
|
||||||
if len(e.uploader.Host) > 0 {
|
if len(e.uploader.Host) > 0 {
|
||||||
e.logTool.Infof("异步导出任务:%s,开始上传", e.task.Id)
|
e.logTool.Infof("异步导出任务:%s,开始上传", e.task.Id)
|
||||||
source, err = e.upload(source)
|
source, err = e.upload(source)
|
||||||
|
|
@ -206,6 +200,8 @@ func (e *ExportAsync) export(ctx context.Context, tempDir string) (source string
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
e.task.Source = source
|
e.task.Source = source
|
||||||
e.processAdd(ctx, ATT.int())
|
e.processAdd(ctx, ATT.int())
|
||||||
return
|
return
|
||||||
|
|
@ -610,7 +606,7 @@ func (e *ExportAsync) createTask(ctx context.Context) (tempDir string, err error
|
||||||
return "", fmt.Errorf("初始化任务失败: %w", err)
|
return "", fmt.Errorf("初始化任务失败: %w", err)
|
||||||
}
|
}
|
||||||
e.task.Id = uid.String()
|
e.task.Id = uid.String()
|
||||||
tempDir, err = e.createDefaultDir(ctx)
|
tempDir, err = e.getCacheDir(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
||||||
return "", fmt.Errorf("初始化默认文件夹失败: %w", err)
|
return "", fmt.Errorf("初始化默认文件夹失败: %w", err)
|
||||||
|
|
@ -635,6 +631,9 @@ func (e *ExportAsync) getTaskId(ctx context.Context) (uid uuid.UUID, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ExportAsync) CheckAndIncrementTaskCount(ctx context.Context, key string, limit int, limitType string) error {
|
func (e *ExportAsync) CheckAndIncrementTaskCount(ctx context.Context, key string, limit int, limitType string) error {
|
||||||
|
if limit == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
count, err := e.getAndParseTaskCount(ctx, key)
|
count, err := e.getAndParseTaskCount(ctx, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("获取%s数量失败: %w", limitType, err)
|
return fmt.Errorf("获取%s数量失败: %w", limitType, err)
|
||||||
|
|
@ -693,12 +692,9 @@ func (e *ExportAsync) processAdd(ctx context.Context, addNum int32) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ExportAsync) createDefaultDir(ctx context.Context) (string, error) {
|
func (e *ExportAsync) getCacheDir(ctx context.Context) (string, error) {
|
||||||
// 创建临时目录
|
// 创建临时目录
|
||||||
tempDir, err := os.MkdirTemp("", e.task.Id)
|
tempDir, err := e.getBaseCacheDir(ctx)
|
||||||
if err != nil {
|
|
||||||
return "", fmt.Errorf("创建临时目录失败: %v", err)
|
|
||||||
}
|
|
||||||
//csv
|
//csv
|
||||||
if err = os.Mkdir(e.dirCsv(tempDir), 0755); err != nil {
|
if err = os.Mkdir(e.dirCsv(tempDir), 0755); err != nil {
|
||||||
return "", fmt.Errorf("创建csv目录失败: %v", err)
|
return "", fmt.Errorf("创建csv目录失败: %v", err)
|
||||||
|
|
@ -714,6 +710,15 @@ func (e *ExportAsync) createDefaultDir(ctx context.Context) (string, error) {
|
||||||
return tempDir, nil
|
return tempDir, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *ExportAsync) getBaseCacheDir(ctx context.Context) (string, error) {
|
||||||
|
tempDir := filepath.Join(e.cacheDir, e.task.Id)
|
||||||
|
err := os.MkdirAll(tempDir, 0755) // 使用标准的权限模式
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("创建临时目录失败: %v", err)
|
||||||
|
}
|
||||||
|
return tempDir, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (e *ExportAsync) dirCsv(tempDir string) string {
|
func (e *ExportAsync) dirCsv(tempDir string) string {
|
||||||
return tempDir + "/csv/"
|
return tempDir + "/csv/"
|
||||||
}
|
}
|
||||||
|
|
@ -766,7 +771,7 @@ func (e *ExportAsync) release() {
|
||||||
e.csvToExcelBatch = DefaultCsvToExcelBatch
|
e.csvToExcelBatch = DefaultCsvToExcelBatch
|
||||||
e.task = &Task{}
|
e.task = &Task{}
|
||||||
e.workerNum = DefaultWorkNum
|
e.workerNum = DefaultWorkNum
|
||||||
e.uploader = DefaultUploader
|
e.cacheDir = DefaultCacheDir
|
||||||
e.logTool = NewLogPrint(nil)
|
e.logTool = NewLogPrint(nil)
|
||||||
e.sheetName = DefaultSheetName
|
e.sheetName = DefaultSheetName
|
||||||
exportAsyncPool.Put(e)
|
exportAsyncPool.Put(e)
|
||||||
|
|
|
||||||
|
|
@ -2,18 +2,11 @@ package l_export_async
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"finance/internal/data"
|
|
||||||
"finance/internal/initialize"
|
|
||||||
"finance/internal/pkg"
|
|
||||||
"finance/internal/pkg/helper/attachment"
|
|
||||||
log2 "finance/internal/pkg/log"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
attachmentsdk "codeup.aliyun.com/5f9118049cffa29cfdd3be1c/attachment-sdk"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Test_Merge(t *testing.T) {
|
func Test_Merge(t *testing.T) {
|
||||||
|
|
@ -37,24 +30,24 @@ func Test_Tsk(t *testing.T) {
|
||||||
t.Log(NewTask(NewRedisTaskStore(dat().Rdb)).GetTaskInfo(context.Background(), taskId))
|
t.Log(NewTask(NewRedisTaskStore(dat().Rdb)).GetTaskInfo(context.Background(), taskId))
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_Upload(t *testing.T) {
|
//func Test_Upload(t *testing.T) {
|
||||||
task_id := "c0abe2b3-dede-11f0-9178-00155d5ef0f92369485728"
|
// task_id := "c0abe2b3-dede-11f0-9178-00155d5ef0f92369485728"
|
||||||
filename := "供应商结算交易流水20251222100502"
|
// filename := "供应商结算交易流水20251222100502"
|
||||||
|
//
|
||||||
file := "/tmp/" + task_id + "/zip/" + filename + ".zip"
|
// file := "/tmp/" + task_id + "/zip/" + filename + ".zip"
|
||||||
//file := "/tmp/a03f82a8-deda-11f0-9c0b-00155d5ef0f9796292421/zip/供应商结算交易流水20251222100502.zip"
|
// //file := "/tmp/a03f82a8-deda-11f0-9c0b-00155d5ef0f9796292421/zip/供应商结算交易流水20251222100502.zip"
|
||||||
|
//
|
||||||
host := "http://192.168.6.194:8004"
|
// host := "http://192.168.6.194:8004"
|
||||||
sys := "crmApi"
|
// sys := "crmApi"
|
||||||
business := "download"
|
// business := "download"
|
||||||
fieldFormName := "file"
|
// fieldFormName := "file"
|
||||||
resp, err := attachment.Upload(host, file, sys, business, fieldFormName)
|
// resp, err := attachment.Upload(host, file, sys, business, fieldFormName)
|
||||||
if err != nil {
|
// if err != nil {
|
||||||
t.Log(err)
|
// t.Log(err)
|
||||||
}
|
// }
|
||||||
url := attachmentsdk.GeneratePreviewPrivateUrl(host, "", resp.Url, "", strings.TrimSuffix("供应商结算交易流水20251222100502", ".zip"), time.Now().Unix()+300)
|
// url := attachmentsdk.GeneratePreviewPrivateUrl(host, "", resp.Url, "", strings.TrimSuffix("供应商结算交易流水20251222100502", ".zip"), time.Now().Unix()+300)
|
||||||
t.Log(url, err)
|
// t.Log(url, err)
|
||||||
}
|
//}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
Name = "test"
|
Name = "test"
|
||||||
|
|
|
||||||
2
go.mod
2
go.mod
|
|
@ -3,7 +3,7 @@ module gitea.cdlsxd.cn/self-tools/l-export-async
|
||||||
go 1.21.10
|
go 1.21.10
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/duke-git/lancet/v2 v2.2.8
|
github.com/duke-git/lancet/v2 v2.3.8
|
||||||
github.com/google/uuid v1.3.0
|
github.com/google/uuid v1.3.0
|
||||||
github.com/pkg/errors v0.9.1
|
github.com/pkg/errors v0.9.1
|
||||||
github.com/redis/go-redis/v9 v9.17.2
|
github.com/redis/go-redis/v9 v9.17.2
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue