Compare commits
3 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
ae41655c5d | |
|
|
9cf8777a16 | |
|
|
b262190ca1 |
|
|
@ -109,7 +109,7 @@ func Upload(host, filePath, system, business, fieldFormName string) (*UploadResp
|
|||
_ = retry.Retry(func() error {
|
||||
err = requestHttp()
|
||||
return err
|
||||
}, retry.RetryTimes(5), retry.RetryDuration(time.Second*3))
|
||||
}, retry.RetryTimes(5))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
11
entity.go
11
entity.go
|
|
@ -23,10 +23,11 @@ type Uploader struct {
|
|||
|
||||
type ExportOption func(*ExportAsync)
|
||||
|
||||
// WithCustomUploader 自定义上传配置
|
||||
func WithCustomUploader(sys string, business string, fieldFormName string) ExportOption {
|
||||
// WithCustomOssUploader 自定义上传配置
|
||||
func WithCustomOssUploader(host string, sys string, business string, fieldFormName string) ExportOption {
|
||||
return func(b *ExportAsync) {
|
||||
b.uploader = &Uploader{
|
||||
Host: host,
|
||||
System: sys,
|
||||
Business: business,
|
||||
FieldFormName: fieldFormName,
|
||||
|
|
@ -146,6 +147,12 @@ func WithTimeRange(fetcher TimeRangeDataFetcher, startTime time.Time, timeRange
|
|||
}
|
||||
}
|
||||
|
||||
func WithCustomCacheDir(dir string) ExportOption {
|
||||
return func(e *ExportAsync) {
|
||||
e.cacheDir = dir
|
||||
}
|
||||
}
|
||||
|
||||
type Task struct {
|
||||
Id string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
|
|
|
|||
|
|
@ -11,11 +11,6 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
|
||||
"gitea.cdlsxd.cn/self-tools/l-export-async/attachment"
|
||||
"gitea.cdlsxd.cn/self-tools/l-export-async/coroutine"
|
||||
"github.com/google/uuid"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
|
@ -23,24 +18,26 @@ import (
|
|||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"gitea.cdlsxd.cn/self-tools/l-export-async/attachment"
|
||||
"gitea.cdlsxd.cn/self-tools/l-export-async/coroutine"
|
||||
"github.com/google/uuid"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var exportAsyncPool = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
return &ExportAsync{
|
||||
extension: DefaultExtension,
|
||||
sheetName: "Sheet1",
|
||||
sheetName: DefaultSheetName,
|
||||
batchSize: DefaultBatch,
|
||||
maxRowPerFile: DefaultMaxRowPerFile,
|
||||
csvToExcelBatch: DefaultCsvToExcelBatch,
|
||||
uploader: &Uploader{
|
||||
FieldFormName: "file",
|
||||
System: "crmApi",
|
||||
Business: "download",
|
||||
},
|
||||
task: &Task{},
|
||||
workerNum: DefaultWorkNum, //runtime.NumCPU() * 2,
|
||||
logTool: NewLogPrint(nil),
|
||||
cacheDir: DefaultCacheDir,
|
||||
}
|
||||
},
|
||||
}
|
||||
|
|
@ -51,15 +48,11 @@ var (
|
|||
DefaultMaxRowPerFile = 100000 ////每个Xlsx的行数,默认10000行->WithMaxRowPerFile
|
||||
DefaultCsvToExcelBatch = 1000 ////csv转excel的批量写入缓冲区大小,逐行写入设置为1000->WithCustomBufferSize
|
||||
DefaultWorkNum = 1 // 并发协程数(务必大于1),默认runtime.NumCPU() * 2->WithCustomWorkNum
|
||||
ProcessLimit = 1000 //全局并行导出任务上限
|
||||
DefaultUploader = &Uploader{
|
||||
FieldFormName: "file",
|
||||
System: "crmApi",
|
||||
Business: "download",
|
||||
}
|
||||
DefaultExtension = ".xlsx"
|
||||
DefaultSheetName = "Sheet1"
|
||||
//SameTaskProcessLimit = 1 //单任务并行导出上限,必须小于ProcessLimit
|
||||
DefaultCacheDir = "/tmp"
|
||||
ProcessLimit = 0 //全局并行导出任务上限
|
||||
|
||||
)
|
||||
|
||||
// ExportAsync 异步导出任务配置->默认配置往上看
|
||||
|
|
@ -87,7 +80,8 @@ type ExportAsync struct {
|
|||
//2.当workerNum>1时,因为线程的乱序(虽然可以通过通道解决,但是就没有线程存在的意义了),会导致查询很多空值出来
|
||||
//比如:如果dataProvider走的mysql,那么会执行很多空查询,增加数据库压力
|
||||
dataCount int
|
||||
|
||||
//导出文件存储位置->默认/tmp ->WithCustomCacheDir
|
||||
cacheDir string
|
||||
//日志输出->WithLogPrint
|
||||
logTool LogTool
|
||||
// Excel 表头
|
||||
|
|
@ -115,7 +109,6 @@ type ExportAsync struct {
|
|||
func NewExportAsync(
|
||||
fileName string,
|
||||
header []string,
|
||||
domain string,
|
||||
TaskSaveTool TaskSaveTool,
|
||||
args ...ExportOption,
|
||||
) *ExportAsync {
|
||||
|
|
@ -123,7 +116,6 @@ func NewExportAsync(
|
|||
exporter.fileName = fileName
|
||||
exporter.header = header
|
||||
exporter.taskSaveTool = TaskSaveTool
|
||||
exporter.uploader.Host = domain
|
||||
exporter.task.Name = fileName
|
||||
for _, arg := range args {
|
||||
arg(exporter)
|
||||
|
|
@ -131,16 +123,16 @@ func NewExportAsync(
|
|||
return exporter
|
||||
}
|
||||
|
||||
func (e *ExportAsync) Run(ctx context.Context) (string, error) {
|
||||
func (e *ExportAsync) Run(ctx context.Context) (task_id string, cacheDir string, err error) {
|
||||
|
||||
//新建任务
|
||||
if e.pageStrategy == nil {
|
||||
return "", fmt.Errorf("未设置导出方式,导出方式具体参考PageStrategy")
|
||||
return "", "", fmt.Errorf("未设置导出方式,导出方式具体参考PageStrategy")
|
||||
}
|
||||
|
||||
tempDir, err := e.createTask(ctx)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("创建任务失败: %v", err)
|
||||
return "", "", fmt.Errorf("创建任务失败: %v", err)
|
||||
}
|
||||
coroutine.Run(fmt.Sprintf("创建任务导出任务:%s", e.fileName), func() {
|
||||
// 执行导出任务
|
||||
|
|
@ -151,9 +143,11 @@ func (e *ExportAsync) Run(ctx context.Context) (string, error) {
|
|||
e.logTool.Errorf("导出panic:\n任务:%s,错误原因:%s", e.task.Id, _err)
|
||||
}
|
||||
e.release()
|
||||
if e.uploader != nil {
|
||||
if e.uploader.Host != "" {
|
||||
os.RemoveAll(tempDir)
|
||||
}
|
||||
}
|
||||
|
||||
cancel()
|
||||
}()
|
||||
|
|
@ -166,8 +160,8 @@ func (e *ExportAsync) Run(ctx context.Context) (string, error) {
|
|||
e.logTool.Infof("异步导出任务:%s,导出完成,总计导出%d条数据,下载地址:%s", e.task.Id, e.task.RowCount, source)
|
||||
|
||||
})
|
||||
|
||||
return e.task.Id, nil
|
||||
cacheDir, err = e.getBaseCacheDir(ctx)
|
||||
return e.task.Id, cacheDir, err
|
||||
}
|
||||
|
||||
// 添加配置项
|
||||
|
|
@ -198,7 +192,7 @@ func (e *ExportAsync) export(ctx context.Context, tempDir string) (source string
|
|||
if err = e.folderToZip(excelsDir, source); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if e.uploader != nil {
|
||||
if len(e.uploader.Host) > 0 {
|
||||
e.logTool.Infof("异步导出任务:%s,开始上传", e.task.Id)
|
||||
source, err = e.upload(source)
|
||||
|
|
@ -206,6 +200,8 @@ func (e *ExportAsync) export(ctx context.Context, tempDir string) (source string
|
|||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
e.task.Source = source
|
||||
e.processAdd(ctx, ATT.int())
|
||||
return
|
||||
|
|
@ -610,7 +606,7 @@ func (e *ExportAsync) createTask(ctx context.Context) (tempDir string, err error
|
|||
return "", fmt.Errorf("初始化任务失败: %w", err)
|
||||
}
|
||||
e.task.Id = uid.String()
|
||||
tempDir, err = e.createDefaultDir(ctx)
|
||||
tempDir, err = e.getCacheDir(ctx)
|
||||
if err != nil {
|
||||
|
||||
return "", fmt.Errorf("初始化默认文件夹失败: %w", err)
|
||||
|
|
@ -635,6 +631,9 @@ func (e *ExportAsync) getTaskId(ctx context.Context) (uid uuid.UUID, err error)
|
|||
}
|
||||
|
||||
func (e *ExportAsync) CheckAndIncrementTaskCount(ctx context.Context, key string, limit int, limitType string) error {
|
||||
if limit == 0 {
|
||||
return nil
|
||||
}
|
||||
count, err := e.getAndParseTaskCount(ctx, key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取%s数量失败: %w", limitType, err)
|
||||
|
|
@ -693,12 +692,9 @@ func (e *ExportAsync) processAdd(ctx context.Context, addNum int32) {
|
|||
return
|
||||
}
|
||||
|
||||
func (e *ExportAsync) createDefaultDir(ctx context.Context) (string, error) {
|
||||
func (e *ExportAsync) getCacheDir(ctx context.Context) (string, error) {
|
||||
// 创建临时目录
|
||||
tempDir, err := os.MkdirTemp("", e.task.Id)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("创建临时目录失败: %v", err)
|
||||
}
|
||||
tempDir, err := e.getBaseCacheDir(ctx)
|
||||
//csv
|
||||
if err = os.Mkdir(e.dirCsv(tempDir), 0755); err != nil {
|
||||
return "", fmt.Errorf("创建csv目录失败: %v", err)
|
||||
|
|
@ -714,6 +710,15 @@ func (e *ExportAsync) createDefaultDir(ctx context.Context) (string, error) {
|
|||
return tempDir, nil
|
||||
}
|
||||
|
||||
func (e *ExportAsync) getBaseCacheDir(ctx context.Context) (string, error) {
|
||||
tempDir := filepath.Join(e.cacheDir, e.task.Id)
|
||||
err := os.MkdirAll(tempDir, 0755) // 使用标准的权限模式
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("创建临时目录失败: %v", err)
|
||||
}
|
||||
return tempDir, nil
|
||||
}
|
||||
|
||||
func (e *ExportAsync) dirCsv(tempDir string) string {
|
||||
return tempDir + "/csv/"
|
||||
}
|
||||
|
|
@ -766,7 +771,7 @@ func (e *ExportAsync) release() {
|
|||
e.csvToExcelBatch = DefaultCsvToExcelBatch
|
||||
e.task = &Task{}
|
||||
e.workerNum = DefaultWorkNum
|
||||
e.uploader = DefaultUploader
|
||||
e.cacheDir = DefaultCacheDir
|
||||
e.logTool = NewLogPrint(nil)
|
||||
e.sheetName = DefaultSheetName
|
||||
exportAsyncPool.Put(e)
|
||||
|
|
|
|||
|
|
@ -2,18 +2,11 @@ package l_export_async
|
|||
|
||||
import (
|
||||
"context"
|
||||
"finance/internal/data"
|
||||
"finance/internal/initialize"
|
||||
"finance/internal/pkg"
|
||||
"finance/internal/pkg/helper/attachment"
|
||||
log2 "finance/internal/pkg/log"
|
||||
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
attachmentsdk "codeup.aliyun.com/5f9118049cffa29cfdd3be1c/attachment-sdk"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func Test_Merge(t *testing.T) {
|
||||
|
|
@ -37,24 +30,24 @@ func Test_Tsk(t *testing.T) {
|
|||
t.Log(NewTask(NewRedisTaskStore(dat().Rdb)).GetTaskInfo(context.Background(), taskId))
|
||||
}
|
||||
|
||||
func Test_Upload(t *testing.T) {
|
||||
task_id := "c0abe2b3-dede-11f0-9178-00155d5ef0f92369485728"
|
||||
filename := "供应商结算交易流水20251222100502"
|
||||
|
||||
file := "/tmp/" + task_id + "/zip/" + filename + ".zip"
|
||||
//file := "/tmp/a03f82a8-deda-11f0-9c0b-00155d5ef0f9796292421/zip/供应商结算交易流水20251222100502.zip"
|
||||
|
||||
host := "http://192.168.6.194:8004"
|
||||
sys := "crmApi"
|
||||
business := "download"
|
||||
fieldFormName := "file"
|
||||
resp, err := attachment.Upload(host, file, sys, business, fieldFormName)
|
||||
if err != nil {
|
||||
t.Log(err)
|
||||
}
|
||||
url := attachmentsdk.GeneratePreviewPrivateUrl(host, "", resp.Url, "", strings.TrimSuffix("供应商结算交易流水20251222100502", ".zip"), time.Now().Unix()+300)
|
||||
t.Log(url, err)
|
||||
}
|
||||
//func Test_Upload(t *testing.T) {
|
||||
// task_id := "c0abe2b3-dede-11f0-9178-00155d5ef0f92369485728"
|
||||
// filename := "供应商结算交易流水20251222100502"
|
||||
//
|
||||
// file := "/tmp/" + task_id + "/zip/" + filename + ".zip"
|
||||
// //file := "/tmp/a03f82a8-deda-11f0-9c0b-00155d5ef0f9796292421/zip/供应商结算交易流水20251222100502.zip"
|
||||
//
|
||||
// host := "http://192.168.6.194:8004"
|
||||
// sys := "crmApi"
|
||||
// business := "download"
|
||||
// fieldFormName := "file"
|
||||
// resp, err := attachment.Upload(host, file, sys, business, fieldFormName)
|
||||
// if err != nil {
|
||||
// t.Log(err)
|
||||
// }
|
||||
// url := attachmentsdk.GeneratePreviewPrivateUrl(host, "", resp.Url, "", strings.TrimSuffix("供应商结算交易流水20251222100502", ".zip"), time.Now().Unix()+300)
|
||||
// t.Log(url, err)
|
||||
//}
|
||||
|
||||
var (
|
||||
Name = "test"
|
||||
|
|
|
|||
Loading…
Reference in New Issue