l-export-async/merge.go

275 lines
4.9 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package l_export_async
import (
"encoding/csv"
"fmt"
"io"
"math"
"os"
"regexp"
"github.com/xuri/excelize/v2"
)
type (
	// Reader describes the CSV input of a merge.
	Reader struct {
		// Files lists the CSV file paths to merge, in order.
		Files []string

		// Index is the last position in Files to process; Merge reads
		// Files[0] through Files[Index] inclusive.
		Index int
	}

	// Writer describes the XLSX output of a merge.
	Writer struct {
		// File is the output path of the first workbook; later workbooks get
		// a "_N" suffix before the extension (see GetFileName).
		File string

		// Limit is the maximum number of rows, title row included, written to
		// one workbook before rolling over to the next one (see GetLimit for
		// the effective value).
		Limit int

		// BufferSize is the batch size (buffer size). When greater than 0,
		// rows are collected in a buffer and written in batches; otherwise
		// each row is written immediately.
		BufferSize int
	}

	// Merge streams rows from the CSV files of a Reader into one or more XLSX
	// workbooks described by a Writer.
	Merge struct {
		// reader and writer describe the input files and the output workbooks.
		reader Reader
		writer Writer

		// log receives errors that cannot be returned to the caller.
		log LogTool

		// buffer holds rows waiting for a batch write (batch-write buffer,
		// used when writer.BufferSize > 0).
		buffer [][]interface{}

		// file is the workbook currently being written and sw the stream
		// writer for its "Sheet1".
		file *excelize.File
		sw   *excelize.StreamWriter

		// titles is the header row; once known it is written as the first row
		// of every workbook.
		titles []interface{}

		// fileIndex is the index of the current workbook; it selects the output
		// name via writer.GetFileName.
		fileIndex int

		// total counts the data rows written to the current workbook; it is
		// reset to 0 when rolling over to a new workbook.
		total int

		// rowIndex is the number of rows already written to the current sheet,
		// title row included. It starts at 0, so the next row is rowIndex+1.
		rowIndex int
	}
)
// NewMerge builds a Merge that reads the CSV files described by r and writes
// XLSX output as described by w, and opens the first output workbook.
//
// NewMerge has no error result, so a failure to open the workbook is reported
// through log instead of being silently dropped; the Merge is still returned.
func NewMerge(r Reader, w Writer, log LogTool) *Merge {
	m := &Merge{
		reader: r,
		writer: w,
		log:    log,
	}
	if err := m.open(); err != nil {
		m.log.Errorf(err.Error())
	}
	return m
}
// Merge reads Files[0] through Files[Index] (inclusive) of the Reader in order
// and writes their rows into the output workbook(s), rolling over to a new
// workbook whenever the row limit is reached.
//
// Each CSV file's first row is treated as a header. The header of the first
// file that yields any row becomes the title row of every output workbook; the
// headers of all other files are skipped. Files that do not exist are skipped.
//
// The current workbook is saved before Merge returns, even on failure. A save
// error is returned when the merge itself succeeded; otherwise it is only
// logged so that the original error is not masked.
func (m *Merge) Merge() (err error) {
	// NewMerge cannot report a failed open; retry it here so the failure
	// surfaces as an error rather than as a nil stream writer later on.
	if m.sw == nil {
		if openErr := m.open(); openErr != nil {
			return openErr
		}
	}
	defer func() {
		saveErr := m.Save()
		if saveErr == nil {
			return
		}
		if err == nil {
			err = saveErr
			return
		}
		m.log.Errorf(saveErr.Error())
	}()

	// Index is inclusive; clamp it so it can never index past Files.
	last := m.reader.Index
	if last > len(m.reader.Files)-1 {
		last = len(m.reader.Files) - 1
	}
	for i := 0; i <= last; i++ {
		if err := m.mergeFile(m.reader.Files[i]); err != nil {
			return err
		}
	}
	// Make sure every buffered row has been written.
	if m.writer.BufferSize > 0 {
		return m.flush()
	}
	return nil
}

// mergeFile appends the rows of one CSV file to the output. A missing file is
// skipped. The file's first row is its header: it is written as the title if
// no title is known yet, and discarded otherwise.
func (m *Merge) mergeFile(path string) error {
	csvFile, err := os.Open(path)
	if err != nil {
		if os.IsNotExist(err) {
			return nil
		}
		return fmt.Errorf("打开读取文件%s失败%w", path, err)
	}
	// The file is only read, so a Close error carries no useful information.
	defer csvFile.Close()

	csvReader := csv.NewReader(csvFile)
	for first := true; ; first = false {
		record, err := csvReader.Read()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return fmt.Errorf("读取文件%s错误%w", path, err)
		}
		row := transform(record)
		if first {
			if m.titles == nil {
				if err := m.WriteTitle(row); err != nil {
					return err
				}
			}
			continue
		}
		if m.writer.BufferSize > 0 {
			err = m.WriteBatch(row)
		} else {
			err = m.Write(row)
		}
		if err != nil {
			return err
		}
	}
}
// WriteTitle writes the header row into row 1 (cell A1 onward) of the current
// sheet.
//
// A non-nil titles replaces the remembered header. A nil titles re-writes the
// header remembered from an earlier call; open uses this for every new
// workbook. When no header is known, nothing is written. After a successful
// write, rowIndex is 1, so the next row written is row 2.
func (m *Merge) WriteTitle(titles []interface{}) error {
	if titles != nil {
		m.titles = titles
	}
	if m.titles == nil {
		return nil
	}
	topLeft, err := excelize.CoordinatesToCellName(1, 1)
	if err != nil {
		return err
	}
	if err := m.sw.SetRow(topLeft, m.titles); err != nil {
		return err
	}
	m.rowIndex = 1
	return nil
}
// WriteBatch queues one row. Once writer.BufferSize rows are queued, the whole
// batch is written out via flush and flush's error is returned.
func (m *Merge) WriteBatch(values []interface{}) error {
	m.buffer = append(m.buffer, values)
	if len(m.buffer) < m.writer.BufferSize {
		return nil // batch not full yet
	}
	return m.flush()
}
// flush writes all buffered rows to the sheet and empties the buffer.
//
// Rows go through Write one at a time. The per-workbook row limit is therefore
// checked after every row, and a rollover to a new workbook happens as soon as
// writer.GetLimit() rows are reached, even in the middle of a batch. The limit
// is not exceeded by the remainder of the batch.
//
// If a write fails, the rest of the batch is discarded and the error returned.
func (m *Merge) flush() error {
	pending := m.buffer
	// Detach the batch before writing it. A rollover inside Write calls reset
	// and then Save, which flushes again; that nested flush must find the
	// buffer empty instead of writing these rows a second time. Reusing the
	// backing array is safe because nothing appends to m.buffer while the batch
	// is being written.
	m.buffer = pending[:0]
	for _, row := range pending {
		if err := m.Write(row); err != nil {
			return err
		}
	}
	return nil
}
// Write appends one row directly after the last written row of the current
// sheet. It then rolls over to a new workbook (via reset) once the sheet holds
// writer.GetLimit() rows, title row included.
func (m *Merge) Write(values []interface{}) error {
	next := m.rowIndex + 1 // Excel rows are 1-based
	cell, err := excelize.CoordinatesToCellName(1, next)
	if err != nil {
		return err
	}
	if err = m.sw.SetRow(cell, values); err != nil {
		return err
	}
	m.rowIndex, m.total = next, m.total+1
	if m.rowIndex < m.writer.GetLimit() {
		return nil
	}
	return m.reset()
}
// reset rolls over to the next output workbook. It saves the current one,
// advances fileIndex, clears the per-workbook row counters and opens a fresh
// workbook; open re-writes the title row when one is known.
func (m *Merge) reset() error {
	if err := m.Save(); err != nil {
		return err
	}
	m.fileIndex++
	m.rowIndex, m.total = 0, 0
	return m.open()
}
// open starts a new, empty workbook and a stream writer for its "Sheet1", and
// stores both on m. If a title row is already known, it is written as row 1.
func (m *Merge) open() error {
	m.file = excelize.NewFile()
	sw, err := m.file.NewStreamWriter("Sheet1")
	m.sw = sw
	if err != nil {
		return err
	}
	if m.titles == nil {
		return nil
	}
	return m.WriteTitle(nil)
}
// Save finishes the current workbook. It writes any buffered rows, finalizes
// the stream writer and saves the workbook under
// m.writer.GetFileName(m.fileIndex).
//
// A workbook that holds no data rows (nothing at all, or only the title row)
// is not written to disk. In both cases the workbook is afterwards detached
// from m and closed, so excelize can delete the temporary files its stream
// writer may have created. A further Save before a new workbook is opened is a
// no-op.
func (m *Merge) Save() (err error) {
	if m.sw == nil {
		// No open workbook: it was already saved, or opening it failed.
		return nil
	}
	if m.writer.BufferSize > 0 {
		if err := m.flush(); err != nil {
			return err
		}
	}
	// Read file/sw only now: a rollover inside flush may have replaced them.
	file, sw := m.file, m.sw
	m.file, m.sw = nil, nil
	defer func() {
		if closeErr := file.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	// Skip workbooks without data rows (e.g. only a title).
	dataRows := m.rowIndex
	if m.titles != nil {
		dataRows-- // row 1 holds the title
	}
	if dataRows <= 0 {
		return nil
	}
	if err := sw.Flush(); err != nil {
		return err
	}
	return file.SaveAs(m.writer.GetFileName(m.fileIndex))
}
// mergeFileExtPattern matches a trailing file extension: a dot followed by at
// least one character that is neither a dot nor a path separator, anchored at
// the end of the path. Excluding separators keeps a dot in a directory name
// (e.g. "./out/report" or "exports.v2/report") from being taken as an
// extension. Compiled once here rather than on every call.
var mergeFileExtPattern = regexp.MustCompile(`(\.[^./\\]+)$`)

// GetFileName returns the output path for the workbook with the given index.
// Index 0 returns w.File unchanged. Index N > 0 inserts "_N" before the file's
// extension ("out/report.xlsx" -> "out/report_N.xlsx"), or appends "_N" when
// the file has no extension.
func (w *Writer) GetFileName(fileIndex int) string {
	if fileIndex == 0 {
		return w.File
	}
	if matches := mergeFileExtPattern.FindStringSubmatch(w.File); len(matches) > 0 {
		ext := matches[1]
		baseName := w.File[:len(w.File)-len(ext)]
		return fmt.Sprintf("%s_%d%s", baseName, fileIndex, ext)
	}
	return fmt.Sprintf("%s_%d", w.File, fileIndex)
}
// GetLimit returns the maximum number of rows, title row included, that one
// output workbook may hold before Merge rolls over to the next workbook.
//
// The result is capped at 1,000,000 rows, a round figure below Excel's hard
// per-sheet limit of 1,048,576 rows. A non-positive Limit means "not set" and
// also yields the cap. Taken literally, it would force a new workbook after
// every single row.
func (w *Writer) GetLimit() int {
	const maxRows = 1000000
	if w.Limit <= 0 {
		return maxRows
	}
	return int(math.Min(float64(w.Limit), maxRows))
}
// transform converts a CSV record into the []interface{} row form passed to
// StreamWriter.SetRow, keeping every field as a string value. The result is
// never nil; an empty record yields an empty row.
func transform(record []string) []interface{} {
	row := make([]interface{}, 0, len(record))
	for _, field := range record {
		row = append(row, field)
	}
	return row
}