Compare commits

...

3 Commits

5 changed files with 82 additions and 77 deletions

View File

@ -109,7 +109,7 @@ func Upload(host, filePath, system, business, fieldFormName string) (*UploadResp
_ = retry.Retry(func() error { _ = retry.Retry(func() error {
err = requestHttp() err = requestHttp()
return err return err
}, retry.RetryTimes(5), retry.RetryDuration(time.Second*3)) }, retry.RetryTimes(5))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -23,10 +23,11 @@ type Uploader struct {
type ExportOption func(*ExportAsync) type ExportOption func(*ExportAsync)
// WithCustomUploader 自定义上传配置 // WithCustomOssUploader 自定义上传配置
func WithCustomUploader(sys string, business string, fieldFormName string) ExportOption { func WithCustomOssUploader(host string, sys string, business string, fieldFormName string) ExportOption {
return func(b *ExportAsync) { return func(b *ExportAsync) {
b.uploader = &Uploader{ b.uploader = &Uploader{
Host: host,
System: sys, System: sys,
Business: business, Business: business,
FieldFormName: fieldFormName, FieldFormName: fieldFormName,
@ -146,6 +147,12 @@ func WithTimeRange(fetcher TimeRangeDataFetcher, startTime time.Time, timeRange
} }
} }
func WithCustomCacheDir(dir string) ExportOption {
return func(e *ExportAsync) {
e.cacheDir = dir
}
}
type Task struct { type Task struct {
Id string `json:"id"` Id string `json:"id"`
Name string `json:"name"` Name string `json:"name"`

View File

@ -11,11 +11,6 @@ import (
"strconv" "strconv"
"strings" "strings"
"gitea.cdlsxd.cn/self-tools/l-export-async/attachment"
"gitea.cdlsxd.cn/self-tools/l-export-async/coroutine"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
@ -23,24 +18,26 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"gitea.cdlsxd.cn/self-tools/l-export-async/attachment"
"gitea.cdlsxd.cn/self-tools/l-export-async/coroutine"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
) )
var exportAsyncPool = &sync.Pool{ var exportAsyncPool = &sync.Pool{
New: func() interface{} { New: func() interface{} {
return &ExportAsync{ return &ExportAsync{
extension: DefaultExtension, extension: DefaultExtension,
sheetName: "Sheet1", sheetName: DefaultSheetName,
batchSize: DefaultBatch, batchSize: DefaultBatch,
maxRowPerFile: DefaultMaxRowPerFile, maxRowPerFile: DefaultMaxRowPerFile,
csvToExcelBatch: DefaultCsvToExcelBatch, csvToExcelBatch: DefaultCsvToExcelBatch,
uploader: &Uploader{ task: &Task{},
FieldFormName: "file", workerNum: DefaultWorkNum, //runtime.NumCPU() * 2,
System: "crmApi", logTool: NewLogPrint(nil),
Business: "download", cacheDir: DefaultCacheDir,
},
task: &Task{},
workerNum: DefaultWorkNum, //runtime.NumCPU() * 2,
logTool: NewLogPrint(nil),
} }
}, },
} }
@ -51,15 +48,11 @@ var (
DefaultMaxRowPerFile = 100000 ////每个Xlsx的行数默认10000行->WithMaxRowPerFile DefaultMaxRowPerFile = 100000 ////每个Xlsx的行数默认10000行->WithMaxRowPerFile
DefaultCsvToExcelBatch = 1000 ////csv转excel的批量写入缓冲区大小,逐行写入设置为1000->WithCustomBufferSize DefaultCsvToExcelBatch = 1000 ////csv转excel的批量写入缓冲区大小,逐行写入设置为1000->WithCustomBufferSize
DefaultWorkNum = 1 // 并发协程数务必大于1,默认runtime.NumCPU() * 2->WithCustomWorkNum DefaultWorkNum = 1 // 并发协程数务必大于1,默认runtime.NumCPU() * 2->WithCustomWorkNum
ProcessLimit = 1000 //全局并行导出任务上限 DefaultExtension = ".xlsx"
DefaultUploader = &Uploader{ DefaultSheetName = "Sheet1"
FieldFormName: "file", DefaultCacheDir = "/tmp"
System: "crmApi", ProcessLimit = 0 //全局并行导出任务上限
Business: "download",
}
DefaultExtension = ".xlsx"
DefaultSheetName = "Sheet1"
//SameTaskProcessLimit = 1 //单任务并行导出上限,必须小于ProcessLimit
) )
// ExportAsync 异步导出任务配置->默认配置往上看 // ExportAsync 异步导出任务配置->默认配置往上看
@ -87,7 +80,8 @@ type ExportAsync struct {
//2.当workerNum>1时因为线程的乱序虽然可以通过通道解决但是就没有线程存在的意义了会导致查询很多空值出来 //2.当workerNum>1时因为线程的乱序虽然可以通过通道解决但是就没有线程存在的意义了会导致查询很多空值出来
//比如如果dataProvider走的mysql,那么会执行很多空查询,增加数据库压力 //比如如果dataProvider走的mysql,那么会执行很多空查询,增加数据库压力
dataCount int dataCount int
//导出文件存储位置->默认/tmp ->WithCustomCacheDir
cacheDir string
//日志输出->WithLogPrint //日志输出->WithLogPrint
logTool LogTool logTool LogTool
// Excel 表头 // Excel 表头
@ -115,7 +109,6 @@ type ExportAsync struct {
func NewExportAsync( func NewExportAsync(
fileName string, fileName string,
header []string, header []string,
domain string,
TaskSaveTool TaskSaveTool, TaskSaveTool TaskSaveTool,
args ...ExportOption, args ...ExportOption,
) *ExportAsync { ) *ExportAsync {
@ -123,7 +116,6 @@ func NewExportAsync(
exporter.fileName = fileName exporter.fileName = fileName
exporter.header = header exporter.header = header
exporter.taskSaveTool = TaskSaveTool exporter.taskSaveTool = TaskSaveTool
exporter.uploader.Host = domain
exporter.task.Name = fileName exporter.task.Name = fileName
for _, arg := range args { for _, arg := range args {
arg(exporter) arg(exporter)
@ -131,16 +123,16 @@ func NewExportAsync(
return exporter return exporter
} }
func (e *ExportAsync) Run(ctx context.Context) (string, error) { func (e *ExportAsync) Run(ctx context.Context) (task_id string, cacheDir string, err error) {
//新建任务 //新建任务
if e.pageStrategy == nil { if e.pageStrategy == nil {
return "", fmt.Errorf("未设置导出方式导出方式具体参考PageStrategy") return "", "", fmt.Errorf("未设置导出方式导出方式具体参考PageStrategy")
} }
tempDir, err := e.createTask(ctx) tempDir, err := e.createTask(ctx)
if err != nil { if err != nil {
return "", fmt.Errorf("创建任务失败: %v", err) return "", "", fmt.Errorf("创建任务失败: %v", err)
} }
coroutine.Run(fmt.Sprintf("创建任务导出任务:%s", e.fileName), func() { coroutine.Run(fmt.Sprintf("创建任务导出任务:%s", e.fileName), func() {
// 执行导出任务 // 执行导出任务
@ -151,8 +143,10 @@ func (e *ExportAsync) Run(ctx context.Context) (string, error) {
e.logTool.Errorf("导出panic\n任务%s,错误原因:%s", e.task.Id, _err) e.logTool.Errorf("导出panic\n任务%s,错误原因:%s", e.task.Id, _err)
} }
e.release() e.release()
if e.uploader.Host != "" { if e.uploader != nil {
os.RemoveAll(tempDir) if e.uploader.Host != "" {
os.RemoveAll(tempDir)
}
} }
cancel() cancel()
@ -166,8 +160,8 @@ func (e *ExportAsync) Run(ctx context.Context) (string, error) {
e.logTool.Infof("异步导出任务:%s,导出完成,总计导出%d条数据,下载地址:%s", e.task.Id, e.task.RowCount, source) e.logTool.Infof("异步导出任务:%s,导出完成,总计导出%d条数据,下载地址:%s", e.task.Id, e.task.RowCount, source)
}) })
cacheDir, err = e.getBaseCacheDir(ctx)
return e.task.Id, nil return e.task.Id, cacheDir, err
} }
// 添加配置项 // 添加配置项
@ -198,14 +192,16 @@ func (e *ExportAsync) export(ctx context.Context, tempDir string) (source string
if err = e.folderToZip(excelsDir, source); err != nil { if err = e.folderToZip(excelsDir, source); err != nil {
return return
} }
if e.uploader != nil {
if len(e.uploader.Host) > 0 { if len(e.uploader.Host) > 0 {
e.logTool.Infof("异步导出任务:%s,开始上传", e.task.Id) e.logTool.Infof("异步导出任务:%s,开始上传", e.task.Id)
source, err = e.upload(source) source, err = e.upload(source)
if err != nil { if err != nil {
return return
}
} }
} }
e.task.Source = source e.task.Source = source
e.processAdd(ctx, ATT.int()) e.processAdd(ctx, ATT.int())
return return
@ -610,7 +606,7 @@ func (e *ExportAsync) createTask(ctx context.Context) (tempDir string, err error
return "", fmt.Errorf("初始化任务失败: %w", err) return "", fmt.Errorf("初始化任务失败: %w", err)
} }
e.task.Id = uid.String() e.task.Id = uid.String()
tempDir, err = e.createDefaultDir(ctx) tempDir, err = e.getCacheDir(ctx)
if err != nil { if err != nil {
return "", fmt.Errorf("初始化默认文件夹失败: %w", err) return "", fmt.Errorf("初始化默认文件夹失败: %w", err)
@ -635,6 +631,9 @@ func (e *ExportAsync) getTaskId(ctx context.Context) (uid uuid.UUID, err error)
} }
func (e *ExportAsync) CheckAndIncrementTaskCount(ctx context.Context, key string, limit int, limitType string) error { func (e *ExportAsync) CheckAndIncrementTaskCount(ctx context.Context, key string, limit int, limitType string) error {
if limit == 0 {
return nil
}
count, err := e.getAndParseTaskCount(ctx, key) count, err := e.getAndParseTaskCount(ctx, key)
if err != nil { if err != nil {
return fmt.Errorf("获取%s数量失败: %w", limitType, err) return fmt.Errorf("获取%s数量失败: %w", limitType, err)
@ -693,12 +692,9 @@ func (e *ExportAsync) processAdd(ctx context.Context, addNum int32) {
return return
} }
func (e *ExportAsync) createDefaultDir(ctx context.Context) (string, error) { func (e *ExportAsync) getCacheDir(ctx context.Context) (string, error) {
// 创建临时目录 // 创建临时目录
tempDir, err := os.MkdirTemp("", e.task.Id) tempDir, err := e.getBaseCacheDir(ctx)
if err != nil {
return "", fmt.Errorf("创建临时目录失败: %v", err)
}
//csv //csv
if err = os.Mkdir(e.dirCsv(tempDir), 0755); err != nil { if err = os.Mkdir(e.dirCsv(tempDir), 0755); err != nil {
return "", fmt.Errorf("创建csv目录失败: %v", err) return "", fmt.Errorf("创建csv目录失败: %v", err)
@ -714,6 +710,15 @@ func (e *ExportAsync) createDefaultDir(ctx context.Context) (string, error) {
return tempDir, nil return tempDir, nil
} }
func (e *ExportAsync) getBaseCacheDir(ctx context.Context) (string, error) {
tempDir := filepath.Join(e.cacheDir, e.task.Id)
err := os.MkdirAll(tempDir, 0755) // 使用标准的权限模式
if err != nil {
return "", fmt.Errorf("创建临时目录失败: %v", err)
}
return tempDir, nil
}
func (e *ExportAsync) dirCsv(tempDir string) string { func (e *ExportAsync) dirCsv(tempDir string) string {
return tempDir + "/csv/" return tempDir + "/csv/"
} }
@ -766,7 +771,7 @@ func (e *ExportAsync) release() {
e.csvToExcelBatch = DefaultCsvToExcelBatch e.csvToExcelBatch = DefaultCsvToExcelBatch
e.task = &Task{} e.task = &Task{}
e.workerNum = DefaultWorkNum e.workerNum = DefaultWorkNum
e.uploader = DefaultUploader e.cacheDir = DefaultCacheDir
e.logTool = NewLogPrint(nil) e.logTool = NewLogPrint(nil)
e.sheetName = DefaultSheetName e.sheetName = DefaultSheetName
exportAsyncPool.Put(e) exportAsyncPool.Put(e)

View File

@ -2,18 +2,11 @@ package l_export_async
import ( import (
"context" "context"
"finance/internal/data"
"finance/internal/initialize"
"finance/internal/pkg"
"finance/internal/pkg/helper/attachment"
log2 "finance/internal/pkg/log"
"fmt" "fmt"
"os" "os"
"strings"
"testing"
"time"
attachmentsdk "codeup.aliyun.com/5f9118049cffa29cfdd3be1c/attachment-sdk" "testing"
) )
func Test_Merge(t *testing.T) { func Test_Merge(t *testing.T) {
@ -37,24 +30,24 @@ func Test_Tsk(t *testing.T) {
t.Log(NewTask(NewRedisTaskStore(dat().Rdb)).GetTaskInfo(context.Background(), taskId)) t.Log(NewTask(NewRedisTaskStore(dat().Rdb)).GetTaskInfo(context.Background(), taskId))
} }
func Test_Upload(t *testing.T) { //func Test_Upload(t *testing.T) {
task_id := "c0abe2b3-dede-11f0-9178-00155d5ef0f92369485728" // task_id := "c0abe2b3-dede-11f0-9178-00155d5ef0f92369485728"
filename := "供应商结算交易流水20251222100502" // filename := "供应商结算交易流水20251222100502"
//
file := "/tmp/" + task_id + "/zip/" + filename + ".zip" // file := "/tmp/" + task_id + "/zip/" + filename + ".zip"
//file := "/tmp/a03f82a8-deda-11f0-9c0b-00155d5ef0f9796292421/zip/供应商结算交易流水20251222100502.zip" // //file := "/tmp/a03f82a8-deda-11f0-9c0b-00155d5ef0f9796292421/zip/供应商结算交易流水20251222100502.zip"
//
host := "http://192.168.6.194:8004" // host := "http://192.168.6.194:8004"
sys := "crmApi" // sys := "crmApi"
business := "download" // business := "download"
fieldFormName := "file" // fieldFormName := "file"
resp, err := attachment.Upload(host, file, sys, business, fieldFormName) // resp, err := attachment.Upload(host, file, sys, business, fieldFormName)
if err != nil { // if err != nil {
t.Log(err) // t.Log(err)
} // }
url := attachmentsdk.GeneratePreviewPrivateUrl(host, "", resp.Url, "", strings.TrimSuffix("供应商结算交易流水20251222100502", ".zip"), time.Now().Unix()+300) // url := attachmentsdk.GeneratePreviewPrivateUrl(host, "", resp.Url, "", strings.TrimSuffix("供应商结算交易流水20251222100502", ".zip"), time.Now().Unix()+300)
t.Log(url, err) // t.Log(url, err)
} //}
var ( var (
Name = "test" Name = "test"

2
go.mod
View File

@ -3,7 +3,7 @@ module gitea.cdlsxd.cn/self-tools/l-export-async
go 1.21.10 go 1.21.10
require ( require (
github.com/duke-git/lancet/v2 v2.2.8 github.com/duke-git/lancet/v2 v2.3.8
github.com/google/uuid v1.3.0 github.com/google/uuid v1.3.0
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/redis/go-redis/v9 v9.17.2 github.com/redis/go-redis/v9 v9.17.2