commit ff5f831b612990bda7d84a87e61acb36f8fa082d Author: renzhiyuan <465386466@qq.com> Date: Mon Dec 22 19:42:36 2025 +0800 添加异步导出功能,支持大文件处理和进度追踪 diff --git a/README.md b/README.md new file mode 100644 index 0000000..0a07ee1 --- /dev/null +++ b/README.md @@ -0,0 +1,24 @@ +## 安装 + +```bash +$ go get gitea.cdlsxd.cn/self-tools/l_ai_excel_header_match +``` + + +## 使用 +```go +func TestAddress(t *testing.T) { + res, err := ExcelMatch(context.Background(), a, b, "", "") + t.Log(res, err) +} + +var ( +a = []string{ +"条码", "分类名称", "货品名称", "货品编号", "商品货号", "品牌", "单位", "规格参数", "货品说明", "保质期", "保质期单位", "链接", "货品图片", "电商销售价格", "销售价", "供应商报价", "税率", "默认供应商", "默认存放仓库", "第三方商品编码", "备注", "长", "宽", "高", "重量", "SPU编码", "SPU名称", +} + +b = []string{ +"商品名称(手工输入)", "品牌(单选)", "商品型号(手工输入)", "商品条码/ISBN/ISSN(手工输入)", "条形码资质1(手工输入)", "条形码资质2(手工输入)", "条形码资质3(手工输入)", "产地(国家)(单选)", "产地(省份)(单选)", "产地(市)(单选)", "长度(手工输入,单位:毫米)", "宽度(手工输入,单位:毫米)", "高度(手工输入,单位:毫米)", "体积(手工输入,单位:立方厘米)", "毛重(手工输入,单位:千克)", "厂家包装含量(手工输入)", "商品税率(单选)(单选)", "采购单位", "供应商商品编码(手工输入)", "商品详情(手工输入)", "发货清单1(手工输入)", "发货清单2(手工输入)", "发货清单3(手工输入)", "发货清单4(手工输入)", "发货清单5(手工输入)", "商品标题(手工输入)", "商品卖点(手工输入)", "促销常规卖点(手工输入)", "促销常规卖点生效时间", "促销常规卖点失效时间", "促销高级卖点(手工输入)", "促销高级卖点生效时间", "促销高级卖点失效时间", "活动关联文案(手工输入)", "电脑端链接(手工输入)", "移动端链接(手工输入)", "活动链接生效时间", "活动链接失效时间", "商品图片1(手工输入)", "商品图片2(手工输入)", "商品图片3(手工输入)", "商品图片4(手工输入)", "商品图片5(手工输入)", "生产者(制造商)名称(手工输入)", "生产商(制造商)地址(手工输入)", "执行标准(手工输入)", "类别(单选)", "茶具类型(单选)", "国产/进口(单选)", "茶具材质(单选)", "上市时间(月)(手工输入)", "茶盘材质(多选)(可选项为:石质,电木,树脂,陶瓷,竹质,木质,其它)", "工艺(单选)", "风格(单选)", "适用人数(单选)", "功能(单选)", "容量(手工输入,单位:毫升)", +} +) +``` diff --git a/entity.go b/entity.go new file mode 100644 index 0000000..acfc9a5 --- /dev/null +++ b/entity.go @@ -0,0 +1,116 @@ +package l_export_async + +import "context" + +// DataProviderFn 定义数据提供函数的类型别名 +type DataProviderFn func(ctx context.Context, pageNum, limit int) ([][]interface{}, error) + +// Uploader 这里主要是为了调用attachment.Upload方法 +type Uploader struct { + Host string + 
System string + Business string + FieldFormName string +} + +type ExportOption func(*ExportAsync) + +// WithCustomUploader 自定义上传配置 +func WithCustomUploader(sys string, business string, fieldFormName string) ExportOption { + return func(b *ExportAsync) { + b.uploader = &Uploader{ + System: sys, + Business: business, + FieldFormName: fieldFormName, + } + } +} + +// WithCustomBatchSize 每一批次导出数量,数据库每页行数,默认10000行 +func WithCustomBatchSize(batchSize int) ExportOption { + return func(b *ExportAsync) { + b.batchSize = batchSize + } +} + +// WithCustomBufferSize csv转excel的批量写入缓冲区大小,逐行写入设置为0 +func WithCustomBufferSize(bufferSize int) ExportOption { + return func(b *ExportAsync) { + b.csvToExcelBatch = bufferSize + } +} + +// WithCustomSuffixFileName 自定后缀 +func WithCustomSuffixFileName(suffix string) ExportOption { + return func(b *ExportAsync) { + b.fileName = b.fileName + "_" + suffix + } +} + +// WithCustomWorkNum 自定义协程数 +func WithCustomWorkNum(num int) ExportOption { + return func(b *ExportAsync) { + b.workerNum = num + } +} + +// WithCustomExtension 自定义扩展名 +func WithCustomExtension(extension string) ExportOption { + return func(b *ExportAsync) { + b.extension = extension + } +} + +// WithCustomSheetName 自定义注脚 +func WithCustomSheetName(sheetName string) ExportOption { + return func(b *ExportAsync) { + b.sheetName = sheetName + } +} + +// WithProcess 需要进行进度统计 +func WithProcess(dataCount int) ExportOption { + return func(b *ExportAsync) { + b.dataCount = dataCount + } +} + +// WithLogPrint 日志输出组件 +func WithLogPrint(logTool LogTool) ExportOption { + return func(b *ExportAsync) { + b.logTool = NewLogPrint(logTool) + } +} + +// WithMaxRowPerFile 每个Xlsx的行数,默认10000行 +func WithMaxRowPerFile(maxRowPerFile int) ExportOption { + return func(b *ExportAsync) { + b.maxRowPerFile = maxRowPerFile + } +} + +type Task struct { + Id string `json:"id"` + Name string `json:"name"` + Process int32 `json:"process"` //需除10000,上限是100 //初始化10,生成csv60,合并数据20,上传数据10,ProcessScore + Err string 
`json:"err"` + Source string `json:"source"` + RowCount int64 `json:"row_count"` +} + +type ProcessScore int32 + +const ( + INIT ProcessScore = 100000 + CSV ProcessScore = 600000 + XLSX ProcessScore = 200000 + ATT ProcessScore = 100000 +) + +func (p ProcessScore) int() int32 { + return int32(p) +} + +func (p ProcessScore) float64() float64 { + return float64(p) +} diff --git a/export_async.go b/export_async.go new file mode 100644 index 0000000..c970b63 --- /dev/null +++ b/export_async.go @@ -0,0 +1,464 @@ +package l_export_async + +import ( + "archive/zip" + "context" + "encoding/csv" + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + "sort" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/google/uuid" + "golang.org/x/sync/errgroup" +) + +var exportAsyncPool = &sync.Pool{ + New: func() interface{} { + return &ExportAsync{ + extension: ".xlsx", + sheetName: "Sheet1", + batchSize: 10000, + maxRowPerFile: 10000, + csvToExcelBatch: 1000, + uploader: &Uploader{ + FieldFormName: "file", + System: "crmApi", + Business: "download", + }, + task: &Task{}, + workerNum: 1, //runtime.NumCPU() * 2, + logTool: NewLogPrint(nil), + } + }, +} + +// ExportAsync 异步导出任务配置->默认配置往上看 +type ExportAsync struct { + //// 导出文件名(不含扩展名) + fileName string + + // 文件扩展名(如 .xlsx),默认.xlsx->WithCustomExtension + extension string + + //xlsx注脚,默认Sheet1->WithCustomSheetName + sheetName string //sheet名称 + + //每一批次导出数量,数据库每页行数,默认10000行->WithCustomBatchSize + batchSize int + + //每个Xlsx的行数,默认10000行->WithMaxRowPerFile + maxRowPerFile int + + //csv转excel的批量写入缓冲区大小,逐行写入设置为1000->WithCustomBufferSize + csvToExcelBatch int + + //!!!导出数总量,这个参数非常重要,可以通过WithProcess设置 + //1.计算进度使用 + //2.当workerNum>1时,因为线程的乱序(虽然可以通过通道解决,但是就没有线程存在的意义了),会导致查询很多空值出来 + //比如:如果dataProvider走的mysql,那么会执行很多空查询,增加数据库压力 + dataCount int + + //日志输出->WithLogPrint + logTool LogTool + // Excel 表头 + header []string + + // 上传配置->WithCustomUploader + uploader *Uploader + + // 数据提供函数 + dataProvider DataProviderFn + + // 任务状态存储(如 
Redis); + taskSaveTool TaskSaveTool + + // 并发协程数(务必大于1),默认runtime.NumCPU() * 2->WithCustomWorkNum + workerNum int + + //任务状态 + task *Task +} + +func NewExportAsync( + fileName string, + header []string, + domain string, + dataProvider DataProviderFn, + TaskSaveTool TaskSaveTool, + args ...ExportOption, +) *ExportAsync { + exporter := exportAsyncPool.Get().(*ExportAsync) + exporter.fileName = fileName + exporter.header = header + exporter.dataProvider = dataProvider + exporter.taskSaveTool = TaskSaveTool + exporter.uploader.Host = domain + exporter.task.Name = fileName + for _, arg := range args { + arg(exporter) + } + + return exporter +} + +func (e *ExportAsync) Run(ctx context.Context) (string, error) { + //新建任务 + tempDir, err := e.createTask(ctx) + if err != nil { + return "", fmt.Errorf("创建任务失败: %v", err) + } + + go func() { + // 执行导出任务 + subCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + source, err := e.export(subCtx, tempDir) + if err != nil { + e.logTool.Errorf("导出错误:\n任务:%s,错误原因:%s", e.task.Id, err.Error()) + e.task.Err = err.Error() + _ = e.updateTask(subCtx) + } + e.logTool.Infof("异步导出任务:%s,导出完成,总计导出%d条数据,下载地址:%s", e.task.Id, e.task.RowCount, source) + os.RemoveAll(tempDir) + }() + return e.task.Id, nil +} +func (e *ExportAsync) export(ctx context.Context, tempDir string) (source string, err error) { + + e.processAdd(ctx, INIT.int()) + + e.logTool.Infof("异步导出任务:%s,开始导出到csv", e.task.Id) + csvFiles, err := e.exportToCsv(ctx, tempDir) + e.task.Process = CSV.int() + INIT.int() + e.processAdd(ctx, 0) + // 合并csv文件 + e.logTool.Infof("任务:%s,开始合并到xlsx", e.task.Id) + excelsDir, err := e.mergeCSVsToExcelFiles(csvFiles, tempDir) + if err != nil { + return + } + e.processAdd(ctx, XLSX.int()) + + // 打包 + e.logTool.Infof("异步导出任务:%s,开始打包xlsx", e.task.Id) + source = e.zipFile(tempDir) + if err = e.folderToZip(excelsDir, source); err != nil { + return + } + + if len(e.uploader.Host) > 0 { + e.logTool.Infof("异步导出任务:%s,开始上传", e.task.Id) + source, 
err = e.upload(source) + if err != nil { + return + } + } + e.task.Source = source + e.processAdd(ctx, ATT.int()) + return +} + +func (e *ExportAsync) exportToCsv(ctx context.Context, tempDir string) (csvFiles []string, err error) { + var ( + perPageProcess int32 + pageLimit int = -1 + csvFilesMap sync.Map + ) + // 计算每页进度 + if e.dataCount > 0 { + pageLimit = (e.dataCount + e.batchSize - 1) / e.batchSize + perPageProcess = CSV.int() / int32(pageLimit) //6 + } + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(e.workerNum) // 限制并发数 + + // 使用原子计数器生成页面编号 + var pageNum int64 = 1 + stopProcessing := false + + for i := 0; i < e.workerNum; i++ { + g.Go(func() error { + for { + if stopProcessing { + return nil + } + + // 原子获取下一页 + page := int(atomic.AddInt64(&pageNum, 1) - 1) + if pageLimit != -1 && page > pageLimit { + return nil + } + // 获取数据 + rows, err := e.dataProvider(ctx, page, e.batchSize) + if err != nil { + e.logTool.Errorf("异步导出任务:%s,第%d页查询失败:%s", e.task.Id, page, err.Error()) + return fmt.Errorf("第%d页查询失败: %w", page, err) + } + + // 检查是否是最后一页 + if len(rows) == 0 || len(rows) < e.batchSize { + stopProcessing = true + } + + // 如果没有数据,结束处理 + if len(rows) == 0 { + return nil + } + + // 生成文件名 + fileName := filepath.Join(tempDir, fmt.Sprintf("/csv/%d.csv", page)) + // 原子增加行数 + atomic.AddInt64(&e.task.RowCount, int64(len(rows))) + + // 保存数据到临时文件 + if err := e.savePageToCSV(rows, fileName); err != nil { + e.logTool.Errorf("任务:%s,导出到csv错误:%s", e.task.Id, err.Error()) + return fmt.Errorf("保存第%d页失败: %w", page, err) + } + // 存储文件名 + csvFilesMap.Store(page, fileName) + // 更新进度 + e.processAdd(ctx, perPageProcess) + } + }) + } + + // 等待所有goroutine完成 + if err := g.Wait(); err != nil { + return nil, err + } + // 将csv文件名称进行排序 + csvFiles = getSortedValues(&csvFilesMap) + return csvFiles, nil +} + +func (e *ExportAsync) upload(file string) (string, error) { + resp, err := Upload(e.uploader.Host, file, e.uploader.System, e.uploader.Business, e.uploader.FieldFormName) + + if err != 
nil { + return "", err + } + return GeneratePreviewPrivateUrl(e.uploader.Host, "", resp.Url, "", strings.TrimSuffix(e.fileName, ".zip"), time.Now().Unix()+300), nil +} + +func (e *ExportAsync) folderToZip(excelsDir, zipFilePath string) error { + // 创建文件 + zipFile, err := os.Create(zipFilePath) + if err != nil { + return err + } + defer zipFile.Close() + + // 创建zip writer + archive := zip.NewWriter(zipFile) + defer archive.Close() + + // 遍历文件夹 + err = filepath.Walk(excelsDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + // 忽略文件夹自身 + if info.IsDir() { + return nil + } + + // 打开文件 + file, err := os.Open(path) + if err != nil { + return err + } + defer file.Close() + + // 创建zip文件条目 + header, err := zip.FileInfoHeader(info) + if err != nil { + return err + } + + // 更改工作目录到zip路径 + header.Name = filepath.ToSlash(path[len(excelsDir):]) + + // 创建zip文件条目 + writer, err := archive.CreateHeader(header) + if err != nil { + return err + } + + // 将文件内容写入zip文件条目 + _, err = io.Copy(writer, file) + return err + }) + if err != nil { + return err + } + + return nil +} + +func (e *ExportAsync) zipFile(tempDir string) string { + + return e.dirZip(tempDir) + e.fileName + ".zip" +} + +// mergeCSVsToExcelFiles 将多个CSV文件合并为多个Excel文件(流式处理) +func (e *ExportAsync) mergeCSVsToExcelFiles(csvFiles []string, tempDir string) (outputDir string, err error) { + outputDir = e.dirXlsx(tempDir) + m := NewMerge( + Reader{Files: csvFiles, Index: len(csvFiles) - 1}, + Writer{File: outputDir + e.fileName + e.extension, Limit: e.maxRowPerFile, BufferSize: e.csvToExcelBatch}, + e.logTool, + ) + if err = m.Merge(); err != nil { + return + } + return +} + +func (e *ExportAsync) release() { + // 清空敏感或动态数据 + e.fileName = "" + e.header = nil + e.dataProvider = nil + e.taskSaveTool = nil + exportAsyncPool.Put(e) +} + +func getSortedValues(sm *sync.Map) []string { + // 1. 预分配切片(假设已知大致数量) + items := make([]struct { + key int + value string + }, 0, 16) // 初始容量可调整 + + // 2. 
收集数据(单次遍历) + sm.Range(func(key, value interface{}) bool { + items = append(items, struct { + key int + value string + }{key.(int), value.(string)}) + return true + }) + + // 3. 排序 + sort.Slice(items, func(i, j int) bool { + return items[i].key < items[j].key + }) + + // 4. 提取值 + sortedValues := make([]string, len(items)) + for i, item := range items { + sortedValues[i] = item.value + } + + return sortedValues +} + +func (e *ExportAsync) createTask(ctx context.Context) (tempDir string, err error) { + uid, err := uuid.NewUUID() + if err != nil { + err = fmt.Errorf("UUid创建失败: %w", err) + return + } + e.task.Id = uid.String() + tempDir, err = e.createDefaultDir(ctx) + if err != nil { + err = fmt.Errorf("初始化默认文件夹失败: %w", err) + return + } + + err = e.updateTask(ctx) + return tempDir, nil +} + +func (e *ExportAsync) updateTask(ctx context.Context) (err error) { + taskByte, err := json.Marshal(e.task) + if err != nil { + return + } + err = e.taskSaveTool.Set(ctx, e.task.Id, string(taskByte), 0).Err() + if err != nil { + err = fmt.Errorf("更新任务失败: %w", err) + return + } + return +} + +func (e *ExportAsync) processAdd(ctx context.Context, addNum int32) { + + atomic.AddInt32(&e.task.Process, addNum) + e.logTool.Infof("异步导出任务:%s,当前进度:%d", e.task.Id, e.task.Process) + _ = e.updateTask(ctx) + return +} + +func (e *ExportAsync) createDefaultDir(ctx context.Context) (string, error) { + // 创建临时目录 + tempDir, err := os.MkdirTemp("", e.task.Id) + if err != nil { + return "", fmt.Errorf("创建临时目录失败: %v", err) + } + //csv + if err = os.Mkdir(e.dirCsv(tempDir), 0755); err != nil { + return "", fmt.Errorf("创建csv目录失败: %v", err) + } + //xlsx + if err = os.Mkdir(e.dirXlsx(tempDir), 0755); err != nil { + return "", fmt.Errorf("创建xlsx目录失败: %v", err) + } + //zip + if err = os.Mkdir(e.dirZip(tempDir), 0755); err != nil { + return "", fmt.Errorf("创建zip目录失败: %v", err) + } + return tempDir, nil +} + +func (e *ExportAsync) dirCsv(tempDir string) string { + return tempDir + "/csv/" +} + +func (e 
*ExportAsync) dirXlsx(tempDir string) string { + return tempDir + "/xlsx/" +} + +func (e *ExportAsync) dirZip(tempDir string) string { + return tempDir + "/zip/" +} + +// savePageToCSV 将单页数据保存为CSV文件 +func (e *ExportAsync) savePageToCSV(data [][]interface{}, filename string) error { + file, err := os.Create(filename) + if err != nil { + return fmt.Errorf("创建CSV文件失败: %v", err) + } + defer file.Close() + + writer := csv.NewWriter(file) + defer writer.Flush() + + // 写入表头(如果尚未写入) + if err := writer.Write(e.header); err != nil { + return fmt.Errorf("写入CSV表头失败: %v", err) + } + + // 写入数据行 + for _, row := range data { + csvRow := make([]string, 0, len(e.header)) + for _, val := range row { + csvRow = append(csvRow, fmt.Sprintf("%v", val)) + } + if err := writer.Write(csvRow); err != nil { + return fmt.Errorf("写入CSV行失败: %v", err) + } + } + + return nil +} diff --git a/export_async_test.go b/export_async_test.go new file mode 100644 index 0000000..dfdd061 --- /dev/null +++ b/export_async_test.go @@ -0,0 +1,38 @@ +package l_export_async + +import ( + "fmt" + "os" + "testing" +) + +func Test_Merge(t *testing.T) { + path := "/tmp/f63b2047-dd7b-11f0-b1a8-00155d5ef0f92960656377/csv/" + outputDir := "/tmp/f63b2047-dd7b-11f0-b1a8-00155d5ef0f92960656377/xlsx/aaa.xlsx" + csvFiles, err := listFiles(path) + if err != nil { + panic(err) + } + m := NewMerge( + Reader{Files: csvFiles, Index: len(csvFiles) - 1}, + Writer{File: outputDir, Limit: 30000, BufferSize: 3000}, + NewLogPrint(nil), + ) + err = m.Merge() + t.Log(err) +} + +func listFiles(dirPath string) ([]string, error) { + entries, err := os.ReadDir(dirPath) + if err != nil { + return nil, fmt.Errorf("failed to read directory: %w", err) + } + + var files []string + for _, entry := range entries { + if !entry.IsDir() { // 只添加文件(排除子目录) + files = append(files, dirPath+entry.Name()) + } + } + return files, nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..3083e4d --- /dev/null +++ b/go.mod @@ -0,0 +1,25 @@ +module 
gitea.cdlsxd.cn/self-tools/l-export-async + +go 1.21.10 + +require ( + github.com/duke-git/lancet/v2 v2.2.8 + github.com/google/uuid v1.3.0 + github.com/pkg/errors v0.9.1 + github.com/redis/go-redis/v9 v9.17.2 + github.com/xuri/excelize/v2 v2.9.0 + golang.org/x/sync v0.8.0 +) + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect + github.com/richardlehane/mscfb v1.0.4 // indirect + github.com/richardlehane/msoleps v1.0.4 // indirect + github.com/xuri/efp v0.0.0-20240408161823-9ad904a10d6d // indirect + github.com/xuri/nfp v0.0.0-20240318013403-ab9948c2c4a7 // indirect + golang.org/x/crypto v0.28.0 // indirect + golang.org/x/net v0.30.0 // indirect + golang.org/x/text v0.19.0 // indirect +) diff --git a/log.go b/log.go new file mode 100644 index 0000000..8cf198d --- /dev/null +++ b/log.go @@ -0,0 +1,31 @@ +package l_export_async + +import "fmt" + +type LogTool interface { + Errorf(format string, a ...interface{}) + Infof(format string, a ...interface{}) +} +type LogPrint struct { + tool LogTool +} + +func NewLogPrint(tool LogTool) LogTool { + return &LogPrint{tool: tool} +} + +func (r *LogPrint) Errorf(format string, a ...interface{}) { + if r.tool != nil { + r.tool.Errorf(format, a...) + return + } + fmt.Printf(format, a...) +} + +func (r *LogPrint) Infof(format string, a ...interface{}) { + if r.tool != nil { + r.tool.Infof(format, a...) + return + } + fmt.Printf(format, a...) 
+} diff --git a/merge.go b/merge.go new file mode 100644 index 0000000..13c0ca9 --- /dev/null +++ b/merge.go @@ -0,0 +1,273 @@ +package l_export_async + +import ( + "encoding/csv" + "fmt" + "github.com/xuri/excelize/v2" + "io" + "math" + "os" + "regexp" +) + +type ( + Reader struct { + Files []string + Index int + } + Writer struct { + File string + Limit int + BufferSize int // 缓冲区大小 + } + Merge struct { + reader Reader + writer Writer + log LogTool + buffer [][]interface{} // 批量写入缓冲区 + file *excelize.File + sw *excelize.StreamWriter + titles []interface{} + fileIndex int + total int + rowIndex int // 当前已写入的行数(从0开始) + } +) + +func NewMerge(r Reader, w Writer, log LogTool) *Merge { + m := &Merge{ + reader: r, + writer: w, + log: log, + } + m.open() + return m +} + +func (m *Merge) Merge() error { + defer func() { + if err := m.Save(); err != nil { + m.log.Errorf(err.Error()) + } + }() + + for i := 0; i <= m.reader.Index; i++ { + csvOpen, err := os.Open(m.reader.Files[i]) + if err != nil { + if os.IsNotExist(err) { + continue + } + return fmt.Errorf("打开读取文件%s失败:%w", m.reader.Files[i], err) + } + csvReader := csv.NewReader(csvOpen) + frist := true + for { + record, err := csvReader.Read() + if err == io.EOF { + break + } else if err != nil { + return fmt.Errorf("读取文件%s错误:%w", m.reader.Files[i], err) + } + + row := transform(record) + + // 不是第一个文件时,跳过第一条数据 + if frist { + frist = false + if i == 0 { + m.WriteTitle(row) + } + continue + } + if m.writer.BufferSize > 0 { + m.WriteBatch(row) + } else { + m.Write(row) + } + } + csvOpen.Close() + } + + // 确保所有缓冲区数据都被写入 + if m.writer.BufferSize > 0 { + if err := m.flush(); err != nil { + return err + } + } + + return nil +} + +func (m *Merge) WriteTitle(titles []interface{}) error { + if titles != nil { + m.titles = titles + } + if m.titles != nil { + // 标题写入第一行 + cell, err := excelize.CoordinatesToCellName(1, 1) + if err != nil { + return err + } + err = m.sw.SetRow(cell, m.titles) + if err != nil { + return err + } + // 
标题写入后,行索引为1 + m.rowIndex = 1 + } + return nil +} + +func (m *Merge) WriteBatch(values []interface{}) error { + m.buffer = append(m.buffer, values) // 存入缓冲区 + + // 如果缓冲区已满,执行批量写入 + if len(m.buffer) >= m.writer.BufferSize { + if err := m.flush(); err != nil { + return err + } + } + + return nil +} + +func (m *Merge) flush() error { + if len(m.buffer) == 0 { + return nil + } + + // 计算起始行(从当前行索引+1开始,因为Excel行号从1开始) + startRow := m.rowIndex + 1 + + // 批量写入 + for i, row := range m.buffer { + cell, err := excelize.CoordinatesToCellName(1, startRow+i) + if err != nil { + return err + } + if err := m.sw.SetRow(cell, row); err != nil { + return err + } + } + + // 更新行索引 + m.rowIndex += len(m.buffer) + m.total += len(m.buffer) + + // 清空缓冲区 + m.buffer = m.buffer[:0] + + // 检查是否达到限制 + if m.rowIndex >= m.writer.GetLimit() { + if err := m.reset(); err != nil { + return err + } + } + + return nil +} + +func (m *Merge) Write(values []interface{}) error { + // 当前行索引+1作为Excel行号 + cell, err := excelize.CoordinatesToCellName(1, m.rowIndex+1) + if err != nil { + return err + } + + err = m.sw.SetRow(cell, values) + if err != nil { + return err + } + + // 更新计数 + m.rowIndex++ + m.total++ + + // 检查是否达到限制 + if m.rowIndex >= m.writer.GetLimit() { + if err := m.reset(); err != nil { + return err + } + } + + return nil +} + +func (m *Merge) reset() (err error) { + // 先保存当前文件 + if err := m.Save(); err != nil { + return err + } + + // 重置状态并打开新文件 + m.fileIndex++ + m.rowIndex = 0 + m.total = 0 + return m.open() +} + +func (m *Merge) open() (err error) { + m.file = excelize.NewFile() + m.sw, err = m.file.NewStreamWriter("Sheet1") + if err != nil { + return err + } + + // 如果已有标题,写入标题 + if m.titles != nil { + return m.WriteTitle(nil) + } + + return nil +} + +func (m *Merge) Save() error { + // 确保刷新缓冲区 + if m.writer.BufferSize > 0 { + if err := m.flush(); err != nil { + return err + } + } + + // 忽略只有标题的文件 + if m.titles != nil && m.rowIndex <= 1 { + return nil + } + + if err := m.sw.Flush(); err != nil { 
+ return err + } + fileName := m.writer.GetFileName(m.fileIndex) + err := m.file.SaveAs(fileName) + return err +} + +// GetFileName 获取文件名 +func (w *Writer) GetFileName(fileIndex int) string { + if fileIndex == 0 { + return w.File + } + + extRegex := regexp.MustCompile(`(\.[^.]+)$`) + matches := extRegex.FindStringSubmatch(w.File) + if len(matches) > 0 { + ext := matches[1] + baseName := w.File[:len(w.File)-len(ext)] + return fmt.Sprintf("%s_%d%s", baseName, fileIndex, ext) + } + + return fmt.Sprintf("%s_%d", w.File, fileIndex) +} + +func (w *Writer) GetLimit() int { + // excel 单表最大100w行数据 + return int(math.Min(float64(w.Limit), 1000000)) +} + +func transform(record []string) []interface{} { + result := make([]interface{}, len(record)) + for i2, s := range record { + result[i2] = s + } + return result +} diff --git a/redis.go b/redis.go new file mode 100644 index 0000000..a65b4aa --- /dev/null +++ b/redis.go @@ -0,0 +1,29 @@ +package l_export_async + +import ( + "context" + "time" + + "github.com/redis/go-redis/v9" +) + +type redisTaskStore struct { + client *redis.Client +} + +func (r *redisTaskStore) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) TaskErr { + return r.client.Set(ctx, key, value, expiration) +} + +func (r *redisTaskStore) Del(ctx context.Context, keys ...string) TaskErr { + //实际运行中并没有去执行这个,因为不确定是否真的需要删除,如果需要可以自行写入 + return r.client.Del(ctx, keys...) 
+} + +func (r *redisTaskStore) Get(ctx context.Context, key string) TaskGet { + return r.client.Get(ctx, key) +} + +func NewRedisTaskStore(client *redis.Client) TaskSaveTool { + return &redisTaskStore{client: client} +} diff --git a/task.go b/task.go new file mode 100644 index 0000000..4f565ed --- /dev/null +++ b/task.go @@ -0,0 +1,35 @@ +package l_export_async + +import ( + "context" + "time" +) + +type TaskInfo struct { + taskSaveTool TaskSaveTool // 任务状态存储(如 Redis) +} + +type TaskErr interface { + Err() error +} + +type TaskGet interface { + TaskErr + Val() string +} + +type TaskSaveTool interface { + Set(ctx context.Context, key string, value interface{}, expiration time.Duration) TaskErr + Del(ctx context.Context, keys ...string) TaskErr + Get(ctx context.Context, key string) TaskGet +} + +func NewTask(taskSaveTool TaskSaveTool) *TaskInfo { + return &TaskInfo{ + taskSaveTool: taskSaveTool, + } +} + +func (t *TaskInfo) GetTaskInfo(ctx context.Context, taskId string) TaskGet { + return t.taskSaveTool.Get(ctx, taskId) +} diff --git a/uploader.go b/uploader.go new file mode 100644 index 0000000..0c39959 --- /dev/null +++ b/uploader.go @@ -0,0 +1,142 @@ +package l_export_async + +import ( + "bytes" + "crypto/md5" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "mime/multipart" + "net/http" + "net/url" + "os" + "strconv" + "time" + + "github.com/duke-git/lancet/v2/retry" + "github.com/pkg/errors" +) + +const ( + TokenSalt = "LanSeXiongDi!@#&*(" + UrlPreview = "v1/attachment/preview" +) + +type UploadResp struct { + Url string `json:"url"` + PreviewUrl string `json:"previewUrl"` +} + +// Upload 上传文件 +// 返回值:oss地址,预览地址,错误 +func Upload(host, filePath, system, business, fieldFormName string) (*UploadResp, error) { + f, err := os.Open(filePath) + if err != nil { + return nil, errors.WithMessage(err, "打开待上传文件失败") + } + defer f.Close() + body := &bytes.Buffer{} + writer := multipart.NewWriter(body) + formData := map[string]string{ + "system": system, + "business": 
business,
	}
	for k, v := range formData {
		err = writer.WriteField(k, v)
		if err != nil {
			return nil, errors.WithMessage(err, "构建form-data失败")
		}
	}
	// Create the form-data part and stream the file into it; io.Copy works in
	// bounded chunks, so large files are never held in memory at once.
	part, err := writer.CreateFormFile(fieldFormName, filePath)
	if err != nil {
		return nil, errors.WithMessage(err, "创建文件流失败")
	}
	_, err = io.Copy(part, f)
	if err != nil {
		return nil, errors.WithMessage(err, "复制文件流失败")
	}
	err = writer.Close()
	if err != nil {
		return nil, errors.WithMessage(err, "close writer失败")
	}

	httpClient := http.Client{
		Timeout: time.Minute * 10,
	}
	uploadURL := fmt.Sprintf("%s/v1/attachment/upload", host)
	// BUGFIX: the request body was consumed by the first attempt, so retries
	// used to re-send an empty body. Capture the raw bytes and build a fresh
	// request per attempt.
	bodyBytes := body.Bytes()
	contentType := writer.FormDataContentType()

	uploadResp := &UploadResp{}
	requestHttp := func() error {
		req, requestErr := http.NewRequest("POST", uploadURL, bytes.NewReader(bodyBytes))
		if requestErr != nil {
			return errors.WithMessage(requestErr, "new request 失败")
		}
		req.Header.Set("Content-Type", contentType)

		respHttp, requestErr := httpClient.Do(req)
		if requestErr != nil {
			// BUGFIX: previously wrapped the outer `err` (nil here), which
			// silently swallowed transport failures and made retry report success.
			return errors.WithMessage(requestErr, "上传响应失败")
		}
		defer respHttp.Body.Close()

		respBody, requestErr := io.ReadAll(respHttp.Body)
		if requestErr != nil {
			return errors.WithMessage(requestErr, "读取响应体失败")
		}
		if respHttp.StatusCode != http.StatusOK {
			respMap := make(map[string]string)
			_ = json.Unmarshal(respBody, &respMap)
			// BUGFIX: the condition was inverted — the server-supplied message
			// was only returned when it was empty.
			if respMap["message"] != "" {
				return errors.Errorf("响应错误:%s", respMap["message"])
			}
			return errors.Errorf("上传响应状态异常,响应码:%d,响应体:%s", respHttp.StatusCode, string(respBody))
		}

		requestErr = json.Unmarshal(respBody, &uploadResp)
		if requestErr != nil {
			// Status is known to be 200 here; the old 200-recheck was dead code.
			return errors.WithMessage(requestErr, "json decode响应值失败")
		}
		return nil
	}

	_ = retry.Retry(func() error {
		err = requestHttp()
		return err
	}, retry.RetryTimes(5), retry.RetryDuration(time.Second*3))
	if err != nil {
		return nil, err
	}

	return uploadResp, nil
}

// GeneratePreviewPrivateUrl builds a signed, time-limited preview URL for an
// uploaded attachment.
func GeneratePreviewPrivateUrl(domain, param, attachmentUrl, water, fileName string, expireAt int64) string {
	token := Signature(attachmentUrl, water, expireAt)
	params := url.Values{}
	params.Add("url", attachmentUrl)
	params.Add("water", water)
	params.Add("token", token)
	params.Add("param", param)
	params.Add("fileName", fileName)
	params.Add("expireAt", strconv.FormatInt(expireAt, 10))
	return fmt.Sprintf("%s/%s?%s", domain, UrlPreview, params.Encode())
}

// Signature signs an attachment URL with the shared salt and expiry timestamp.
// NOTE(review): MD5 over a hard-coded salt is weak for URL signing; consider
// HMAC-SHA256 with a configurable secret (requires a coordinated server change).
func Signature(attachmentUrl, water string, expireAt int64) string {
	s := fmt.Sprintf("%s,%s,%s,%d", TokenSalt, attachmentUrl, water, expireAt)
	return MD5Sign(s)
}

// MD5Sign returns the lowercase hex MD5 digest of s.
func MD5Sign(s string) string {
	sum := md5.Sum([]byte(s))
	return hex.EncodeToString(sum[:])
}