Compare commits
No commits in common. "master" and "v1.0.04" have entirely different histories.
|
|
@ -109,7 +109,7 @@ func Upload(host, filePath, system, business, fieldFormName string) (*UploadResp
|
||||||
_ = retry.Retry(func() error {
|
_ = retry.Retry(func() error {
|
||||||
err = requestHttp()
|
err = requestHttp()
|
||||||
return err
|
return err
|
||||||
}, retry.RetryTimes(5))
|
}, retry.RetryTimes(5), retry.RetryDuration(time.Second*3))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
11
entity.go
11
entity.go
|
|
@ -23,11 +23,10 @@ type Uploader struct {
|
||||||
|
|
||||||
type ExportOption func(*ExportAsync)
|
type ExportOption func(*ExportAsync)
|
||||||
|
|
||||||
// WithCustomOssUploader 自定义上传配置
|
// WithCustomUploader 自定义上传配置
|
||||||
func WithCustomOssUploader(host string, sys string, business string, fieldFormName string) ExportOption {
|
func WithCustomUploader(sys string, business string, fieldFormName string) ExportOption {
|
||||||
return func(b *ExportAsync) {
|
return func(b *ExportAsync) {
|
||||||
b.uploader = &Uploader{
|
b.uploader = &Uploader{
|
||||||
Host: host,
|
|
||||||
System: sys,
|
System: sys,
|
||||||
Business: business,
|
Business: business,
|
||||||
FieldFormName: fieldFormName,
|
FieldFormName: fieldFormName,
|
||||||
|
|
@ -147,12 +146,6 @@ func WithTimeRange(fetcher TimeRangeDataFetcher, startTime time.Time, timeRange
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithCustomCacheDir(dir string) ExportOption {
|
|
||||||
return func(e *ExportAsync) {
|
|
||||||
e.cacheDir = dir
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type Task struct {
|
type Task struct {
|
||||||
Id string `json:"id"`
|
Id string `json:"id"`
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,11 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"gitea.cdlsxd.cn/self-tools/l-export-async/attachment"
|
||||||
|
"gitea.cdlsxd.cn/self-tools/l-export-async/coroutine"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
@ -18,26 +23,24 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gitea.cdlsxd.cn/self-tools/l-export-async/attachment"
|
|
||||||
"gitea.cdlsxd.cn/self-tools/l-export-async/coroutine"
|
|
||||||
"github.com/google/uuid"
|
|
||||||
"github.com/redis/go-redis/v9"
|
|
||||||
"golang.org/x/sync/errgroup"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var exportAsyncPool = &sync.Pool{
|
var exportAsyncPool = &sync.Pool{
|
||||||
New: func() interface{} {
|
New: func() interface{} {
|
||||||
return &ExportAsync{
|
return &ExportAsync{
|
||||||
extension: DefaultExtension,
|
extension: DefaultExtension,
|
||||||
sheetName: DefaultSheetName,
|
sheetName: "Sheet1",
|
||||||
batchSize: DefaultBatch,
|
batchSize: DefaultBatch,
|
||||||
maxRowPerFile: DefaultMaxRowPerFile,
|
maxRowPerFile: DefaultMaxRowPerFile,
|
||||||
csvToExcelBatch: DefaultCsvToExcelBatch,
|
csvToExcelBatch: DefaultCsvToExcelBatch,
|
||||||
|
uploader: &Uploader{
|
||||||
|
FieldFormName: "file",
|
||||||
|
System: "crmApi",
|
||||||
|
Business: "download",
|
||||||
|
},
|
||||||
task: &Task{},
|
task: &Task{},
|
||||||
workerNum: DefaultWorkNum, //runtime.NumCPU() * 2,
|
workerNum: DefaultWorkNum, //runtime.NumCPU() * 2,
|
||||||
logTool: NewLogPrint(nil),
|
logTool: NewLogPrint(nil),
|
||||||
cacheDir: DefaultCacheDir,
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
@ -48,11 +51,15 @@ var (
|
||||||
DefaultMaxRowPerFile = 100000 ////每个Xlsx的行数,默认10000行->WithMaxRowPerFile
|
DefaultMaxRowPerFile = 100000 ////每个Xlsx的行数,默认10000行->WithMaxRowPerFile
|
||||||
DefaultCsvToExcelBatch = 1000 ////csv转excel的批量写入缓冲区大小,逐行写入设置为1000->WithCustomBufferSize
|
DefaultCsvToExcelBatch = 1000 ////csv转excel的批量写入缓冲区大小,逐行写入设置为1000->WithCustomBufferSize
|
||||||
DefaultWorkNum = 1 // 并发协程数(务必大于1),默认runtime.NumCPU() * 2->WithCustomWorkNum
|
DefaultWorkNum = 1 // 并发协程数(务必大于1),默认runtime.NumCPU() * 2->WithCustomWorkNum
|
||||||
|
ProcessLimit = 1000 //全局并行导出任务上限
|
||||||
|
DefaultUploader = &Uploader{
|
||||||
|
FieldFormName: "file",
|
||||||
|
System: "crmApi",
|
||||||
|
Business: "download",
|
||||||
|
}
|
||||||
DefaultExtension = ".xlsx"
|
DefaultExtension = ".xlsx"
|
||||||
DefaultSheetName = "Sheet1"
|
DefaultSheetName = "Sheet1"
|
||||||
DefaultCacheDir = "/tmp"
|
//SameTaskProcessLimit = 1 //单任务并行导出上限,必须小于ProcessLimit
|
||||||
ProcessLimit = 0 //全局并行导出任务上限
|
|
||||||
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ExportAsync 异步导出任务配置->默认配置往上看
|
// ExportAsync 异步导出任务配置->默认配置往上看
|
||||||
|
|
@ -80,8 +87,7 @@ type ExportAsync struct {
|
||||||
//2.当workerNum>1时,因为线程的乱序(虽然可以通过通道解决,但是就没有线程存在的意义了),会导致查询很多空值出来
|
//2.当workerNum>1时,因为线程的乱序(虽然可以通过通道解决,但是就没有线程存在的意义了),会导致查询很多空值出来
|
||||||
//比如:如果dataProvider走的mysql,那么会执行很多空查询,增加数据库压力
|
//比如:如果dataProvider走的mysql,那么会执行很多空查询,增加数据库压力
|
||||||
dataCount int
|
dataCount int
|
||||||
//导出文件存储位置->默认/tmp ->WithCustomCacheDir
|
|
||||||
cacheDir string
|
|
||||||
//日志输出->WithLogPrint
|
//日志输出->WithLogPrint
|
||||||
logTool LogTool
|
logTool LogTool
|
||||||
// Excel 表头
|
// Excel 表头
|
||||||
|
|
@ -109,6 +115,7 @@ type ExportAsync struct {
|
||||||
func NewExportAsync(
|
func NewExportAsync(
|
||||||
fileName string,
|
fileName string,
|
||||||
header []string,
|
header []string,
|
||||||
|
domain string,
|
||||||
TaskSaveTool TaskSaveTool,
|
TaskSaveTool TaskSaveTool,
|
||||||
args ...ExportOption,
|
args ...ExportOption,
|
||||||
) *ExportAsync {
|
) *ExportAsync {
|
||||||
|
|
@ -116,6 +123,7 @@ func NewExportAsync(
|
||||||
exporter.fileName = fileName
|
exporter.fileName = fileName
|
||||||
exporter.header = header
|
exporter.header = header
|
||||||
exporter.taskSaveTool = TaskSaveTool
|
exporter.taskSaveTool = TaskSaveTool
|
||||||
|
exporter.uploader.Host = domain
|
||||||
exporter.task.Name = fileName
|
exporter.task.Name = fileName
|
||||||
for _, arg := range args {
|
for _, arg := range args {
|
||||||
arg(exporter)
|
arg(exporter)
|
||||||
|
|
@ -123,16 +131,16 @@ func NewExportAsync(
|
||||||
return exporter
|
return exporter
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ExportAsync) Run(ctx context.Context) (task_id string, cacheDir string, err error) {
|
func (e *ExportAsync) Run(ctx context.Context) (string, error) {
|
||||||
|
|
||||||
//新建任务
|
//新建任务
|
||||||
if e.pageStrategy == nil {
|
if e.pageStrategy == nil {
|
||||||
return "", "", fmt.Errorf("未设置导出方式,导出方式具体参考PageStrategy")
|
return "", fmt.Errorf("未设置导出方式,导出方式具体参考PageStrategy")
|
||||||
}
|
}
|
||||||
|
|
||||||
tempDir, err := e.createTask(ctx)
|
tempDir, err := e.createTask(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", "", fmt.Errorf("创建任务失败: %v", err)
|
return "", fmt.Errorf("创建任务失败: %v", err)
|
||||||
}
|
}
|
||||||
coroutine.Run(fmt.Sprintf("创建任务导出任务:%s", e.fileName), func() {
|
coroutine.Run(fmt.Sprintf("创建任务导出任务:%s", e.fileName), func() {
|
||||||
// 执行导出任务
|
// 执行导出任务
|
||||||
|
|
@ -143,11 +151,9 @@ func (e *ExportAsync) Run(ctx context.Context) (task_id string, cacheDir string,
|
||||||
e.logTool.Errorf("导出panic:\n任务:%s,错误原因:%s", e.task.Id, _err)
|
e.logTool.Errorf("导出panic:\n任务:%s,错误原因:%s", e.task.Id, _err)
|
||||||
}
|
}
|
||||||
e.release()
|
e.release()
|
||||||
if e.uploader != nil {
|
|
||||||
if e.uploader.Host != "" {
|
if e.uploader.Host != "" {
|
||||||
os.RemoveAll(tempDir)
|
os.RemoveAll(tempDir)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
}()
|
}()
|
||||||
|
|
@ -160,8 +166,8 @@ func (e *ExportAsync) Run(ctx context.Context) (task_id string, cacheDir string,
|
||||||
e.logTool.Infof("异步导出任务:%s,导出完成,总计导出%d条数据,下载地址:%s", e.task.Id, e.task.RowCount, source)
|
e.logTool.Infof("异步导出任务:%s,导出完成,总计导出%d条数据,下载地址:%s", e.task.Id, e.task.RowCount, source)
|
||||||
|
|
||||||
})
|
})
|
||||||
cacheDir, err = e.getBaseCacheDir(ctx)
|
|
||||||
return e.task.Id, cacheDir, err
|
return e.task.Id, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// 添加配置项
|
// 添加配置项
|
||||||
|
|
@ -192,7 +198,7 @@ func (e *ExportAsync) export(ctx context.Context, tempDir string) (source string
|
||||||
if err = e.folderToZip(excelsDir, source); err != nil {
|
if err = e.folderToZip(excelsDir, source); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if e.uploader != nil {
|
|
||||||
if len(e.uploader.Host) > 0 {
|
if len(e.uploader.Host) > 0 {
|
||||||
e.logTool.Infof("异步导出任务:%s,开始上传", e.task.Id)
|
e.logTool.Infof("异步导出任务:%s,开始上传", e.task.Id)
|
||||||
source, err = e.upload(source)
|
source, err = e.upload(source)
|
||||||
|
|
@ -200,8 +206,6 @@ func (e *ExportAsync) export(ctx context.Context, tempDir string) (source string
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
e.task.Source = source
|
e.task.Source = source
|
||||||
e.processAdd(ctx, ATT.int())
|
e.processAdd(ctx, ATT.int())
|
||||||
return
|
return
|
||||||
|
|
@ -606,7 +610,7 @@ func (e *ExportAsync) createTask(ctx context.Context) (tempDir string, err error
|
||||||
return "", fmt.Errorf("初始化任务失败: %w", err)
|
return "", fmt.Errorf("初始化任务失败: %w", err)
|
||||||
}
|
}
|
||||||
e.task.Id = uid.String()
|
e.task.Id = uid.String()
|
||||||
tempDir, err = e.getCacheDir(ctx)
|
tempDir, err = e.createDefaultDir(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
||||||
return "", fmt.Errorf("初始化默认文件夹失败: %w", err)
|
return "", fmt.Errorf("初始化默认文件夹失败: %w", err)
|
||||||
|
|
@ -631,9 +635,6 @@ func (e *ExportAsync) getTaskId(ctx context.Context) (uid uuid.UUID, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ExportAsync) CheckAndIncrementTaskCount(ctx context.Context, key string, limit int, limitType string) error {
|
func (e *ExportAsync) CheckAndIncrementTaskCount(ctx context.Context, key string, limit int, limitType string) error {
|
||||||
if limit == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
count, err := e.getAndParseTaskCount(ctx, key)
|
count, err := e.getAndParseTaskCount(ctx, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("获取%s数量失败: %w", limitType, err)
|
return fmt.Errorf("获取%s数量失败: %w", limitType, err)
|
||||||
|
|
@ -692,9 +693,12 @@ func (e *ExportAsync) processAdd(ctx context.Context, addNum int32) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ExportAsync) getCacheDir(ctx context.Context) (string, error) {
|
func (e *ExportAsync) createDefaultDir(ctx context.Context) (string, error) {
|
||||||
// 创建临时目录
|
// 创建临时目录
|
||||||
tempDir, err := e.getBaseCacheDir(ctx)
|
tempDir, err := os.MkdirTemp("", e.task.Id)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("创建临时目录失败: %v", err)
|
||||||
|
}
|
||||||
//csv
|
//csv
|
||||||
if err = os.Mkdir(e.dirCsv(tempDir), 0755); err != nil {
|
if err = os.Mkdir(e.dirCsv(tempDir), 0755); err != nil {
|
||||||
return "", fmt.Errorf("创建csv目录失败: %v", err)
|
return "", fmt.Errorf("创建csv目录失败: %v", err)
|
||||||
|
|
@ -710,15 +714,6 @@ func (e *ExportAsync) getCacheDir(ctx context.Context) (string, error) {
|
||||||
return tempDir, nil
|
return tempDir, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ExportAsync) getBaseCacheDir(ctx context.Context) (string, error) {
|
|
||||||
tempDir := filepath.Join(e.cacheDir, e.task.Id)
|
|
||||||
err := os.MkdirAll(tempDir, 0755) // 使用标准的权限模式
|
|
||||||
if err != nil {
|
|
||||||
return "", fmt.Errorf("创建临时目录失败: %v", err)
|
|
||||||
}
|
|
||||||
return tempDir, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *ExportAsync) dirCsv(tempDir string) string {
|
func (e *ExportAsync) dirCsv(tempDir string) string {
|
||||||
return tempDir + "/csv/"
|
return tempDir + "/csv/"
|
||||||
}
|
}
|
||||||
|
|
@ -771,7 +766,7 @@ func (e *ExportAsync) release() {
|
||||||
e.csvToExcelBatch = DefaultCsvToExcelBatch
|
e.csvToExcelBatch = DefaultCsvToExcelBatch
|
||||||
e.task = &Task{}
|
e.task = &Task{}
|
||||||
e.workerNum = DefaultWorkNum
|
e.workerNum = DefaultWorkNum
|
||||||
e.cacheDir = DefaultCacheDir
|
e.uploader = DefaultUploader
|
||||||
e.logTool = NewLogPrint(nil)
|
e.logTool = NewLogPrint(nil)
|
||||||
e.sheetName = DefaultSheetName
|
e.sheetName = DefaultSheetName
|
||||||
exportAsyncPool.Put(e)
|
exportAsyncPool.Put(e)
|
||||||
|
|
|
||||||
|
|
@ -2,11 +2,18 @@ package l_export_async
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"finance/internal/data"
|
||||||
|
"finance/internal/initialize"
|
||||||
|
"finance/internal/pkg"
|
||||||
|
"finance/internal/pkg/helper/attachment"
|
||||||
|
log2 "finance/internal/pkg/log"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
attachmentsdk "codeup.aliyun.com/5f9118049cffa29cfdd3be1c/attachment-sdk"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Test_Merge(t *testing.T) {
|
func Test_Merge(t *testing.T) {
|
||||||
|
|
@ -30,24 +37,24 @@ func Test_Tsk(t *testing.T) {
|
||||||
t.Log(NewTask(NewRedisTaskStore(dat().Rdb)).GetTaskInfo(context.Background(), taskId))
|
t.Log(NewTask(NewRedisTaskStore(dat().Rdb)).GetTaskInfo(context.Background(), taskId))
|
||||||
}
|
}
|
||||||
|
|
||||||
//func Test_Upload(t *testing.T) {
|
func Test_Upload(t *testing.T) {
|
||||||
// task_id := "c0abe2b3-dede-11f0-9178-00155d5ef0f92369485728"
|
task_id := "c0abe2b3-dede-11f0-9178-00155d5ef0f92369485728"
|
||||||
// filename := "供应商结算交易流水20251222100502"
|
filename := "供应商结算交易流水20251222100502"
|
||||||
//
|
|
||||||
// file := "/tmp/" + task_id + "/zip/" + filename + ".zip"
|
file := "/tmp/" + task_id + "/zip/" + filename + ".zip"
|
||||||
// //file := "/tmp/a03f82a8-deda-11f0-9c0b-00155d5ef0f9796292421/zip/供应商结算交易流水20251222100502.zip"
|
//file := "/tmp/a03f82a8-deda-11f0-9c0b-00155d5ef0f9796292421/zip/供应商结算交易流水20251222100502.zip"
|
||||||
//
|
|
||||||
// host := "http://192.168.6.194:8004"
|
host := "http://192.168.6.194:8004"
|
||||||
// sys := "crmApi"
|
sys := "crmApi"
|
||||||
// business := "download"
|
business := "download"
|
||||||
// fieldFormName := "file"
|
fieldFormName := "file"
|
||||||
// resp, err := attachment.Upload(host, file, sys, business, fieldFormName)
|
resp, err := attachment.Upload(host, file, sys, business, fieldFormName)
|
||||||
// if err != nil {
|
if err != nil {
|
||||||
// t.Log(err)
|
t.Log(err)
|
||||||
// }
|
}
|
||||||
// url := attachmentsdk.GeneratePreviewPrivateUrl(host, "", resp.Url, "", strings.TrimSuffix("供应商结算交易流水20251222100502", ".zip"), time.Now().Unix()+300)
|
url := attachmentsdk.GeneratePreviewPrivateUrl(host, "", resp.Url, "", strings.TrimSuffix("供应商结算交易流水20251222100502", ".zip"), time.Now().Unix()+300)
|
||||||
// t.Log(url, err)
|
t.Log(url, err)
|
||||||
//}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
Name = "test"
|
Name = "test"
|
||||||
|
|
|
||||||
2
go.mod
2
go.mod
|
|
@ -3,7 +3,7 @@ module gitea.cdlsxd.cn/self-tools/l-export-async
|
||||||
go 1.21.10
|
go 1.21.10
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/duke-git/lancet/v2 v2.3.8
|
github.com/duke-git/lancet/v2 v2.2.8
|
||||||
github.com/google/uuid v1.3.0
|
github.com/google/uuid v1.3.0
|
||||||
github.com/pkg/errors v0.9.1
|
github.com/pkg/errors v0.9.1
|
||||||
github.com/redis/go-redis/v9 v9.17.2
|
github.com/redis/go-redis/v9 v9.17.2
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue