l_excel_import/import.go

659 lines
16 KiB
Go
Raw Normal View History

2025-02-21 10:52:23 +08:00
package excel_import
import (
"context"
"fmt"
"github.com/bytedance/sonic"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport/http"
"github.com/xuri/excelize/v2"
"io"
"math"
urlnet "net/url"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
// ImportExcel drives one Excel import job. It loads the uploaded workbook into
// Rows, validates the configured fields, hands the data rows to HandleFunc and
// keeps per-task state files under basePath (task/, import/ and excel/).
type ImportExcel struct {
	// Ctx is the request context stored by Init.
	Ctx context.Context
	// basePath is the storage root for this job, resolved via importLogPath(JobName).
	basePath string
	// task is the current task state; updateTask persists it as JSON.
	task *Task
	// errRowsExporter collects rows reported through AddErr and writes them to
	// the failed-rows workbook once every row has been processed.
	errRowsExporter *CreateExcel
	// rowCount is the number of data rows (len(Rows)-1), set by Run.
	rowCount int
	// importRowsCount is the number of rows processed so far (see Next).
	importRowsCount int32
	// RowPicCells lists the worksheet cells that contain pictures.
	RowPicCells []string
	// PicSaveHandle stores one picture (e.g. uploads it) and returns its URL.
	PicSaveHandle func(pic *Pic) (url string, err error)
	// DeleteOssHandle deletes the given OSS objects.
	DeleteOssHandle func(ObjectName []string) (err error)
	// GetFileObject opens the workbook referenced by the "fileObjectUrl" form value.
	GetFileObject func(fileObjectUrl string) (body io.ReadCloser, err error)
	// Rows holds the parsed worksheet; Rows[0] is treated as the header row and
	// skipped when rows are handed to HandleFunc.
	Rows []map[string]string
	// TrimFiled lists the fields whose values get surrounding whitespace trimmed.
	TrimFiled []string
	// RegexFiledMap maps a field name to the regular-expression rule it is validated with.
	RegexFiledMap map[string]*Regex
	// JobName is the task type, distinguishing businesses; it also selects basePath.
	JobName string
	// TaskId identifies the task; Init generates it automatically.
	TaskId string
	// SpeedMod enables parallel handling of row chunks.
	SpeedMod bool
	// SliceLen is the chunk length used in speed mode; 0 means the default of 100.
	SliceLen uint
	// Header lists the column titles that worksheet columns are mapped to.
	Header []string
	// HandleFunc is the business callback that processes a batch of data rows.
	HandleFunc func(excel *ImportExcel, rows []map[string]string)
	// FileObjectUrl records the source workbook's object URL so it can be deleted afterwards.
	FileObjectUrl string
	// DownloadUrl is the local file path of the import template served by
	// DownloadExcelTemp (it is opened with os.Open).
	DownloadUrl string
}
// checkOption selects which per-field checks are run over the imported rows.
type checkOption struct {
	// TrimCheck enables trimming surrounding whitespace from configured fields.
	TrimCheck bool
	// RegexCheck enables regular-expression validation of configured fields.
	RegexCheck bool
}
// NewImportExcel builds an importer for the given job name. It gets an empty
// failed-rows exporter and an empty task; opts are then applied in order and
// may override any field.
func NewImportExcel(jobName string, opts ...Option) *ImportExcel {
	imp := new(ImportExcel)
	imp.JobName = jobName
	imp.errRowsExporter = new(CreateExcel)
	imp.task = new(Task)
	for _, apply := range opts {
		apply(imp)
	}
	return imp
}
// Init prepares the importer for one request. It:
//   - stores ctx;
//   - resolves basePath and makes sure the task, import-log and error-excel
//     directories exist;
//   - registers handleFunc and assigns a fresh TaskId;
//   - reads the uploaded workbook into Rows;
//   - points the failed-rows exporter at <errExcelPath>/<TaskId>.xlsx.
//
// It returns the receiver, or nil and the first error encountered.
func (i *ImportExcel) Init(ctx http.Context, handleFunc func(excel *ImportExcel, row []map[string]string)) (*ImportExcel, error) {
	i.Ctx = ctx
	var err error
	i.basePath, err = importLogPath(i.JobName)
	if err != nil {
		return nil, err
	}
	// TaskId is still unset here, so the task and import paths resolve to
	// their directories.
	for _, dir := range []string{i.taskFile(), i.importLogFile(), i.errExcelPath()} {
		if err = CheckDir(dir); err != nil {
			return nil, err
		}
	}
	i.HandleFunc = handleFunc
	i.TaskId = i.createTaskId()
	if err = i.GetRows(ctx.Request()); err != nil {
		return nil, err
	}
	exporter := i.errRowsExporter
	exporter.FileName = fmt.Sprintf("%s.xlsx", i.TaskId)
	exporter.Path = i.errExcelPath()
	return i, nil
}
// Run creates the task record, validates the configured fields and hands the
// data rows (everything after the header row Rows[0]) to HandleFunc.
//
// Without SpeedMod, HandleFunc is called once, synchronously, with all data
// rows. The task is marked TaskStatusFinish when it returns.
//
// With SpeedMod, the data rows are split into chunks of SliceLen rows (0 means
// 100), and each chunk is handled in its own goroutine. Run returns
// immediately. A background goroutine marks the task TaskStatusFinish once
// every chunk handler has returned. A panicking chunk handler is recovered and
// logged instead of crashing the process.
//
// In speed mode, HandleFunc may outlive the HTTP request. It must therefore
// not rely on the request context; derive a child of context.Background()
// instead.
func (i *ImportExcel) Run() (err error) {
	// Without any rows (e.g. Init was skipped), filedCheck and the slicing
	// below would panic.
	if len(i.Rows) == 0 {
		return fmt.Errorf("未获取到导入数据或导入数据为空")
	}
	// Create the task record.
	i.task = &Task{
		TaskId: i.TaskId,
		Ctime:  time.Now().Format(time.DateTime),
		Status: TaskStatusInit,
	}
	if err = i.updateTask(); err != nil {
		return err
	}
	i.rowCount = len(i.Rows) - 1
	if err = i.filedCheck(); err != nil {
		return err
	}

	data := i.Rows[1:] // skip the header row
	finish := func() {
		i.task.Status = TaskStatusFinish
		// The rows have already been handled, so a failed status write is
		// only logged.
		if updateErr := i.updateTask(); updateErr != nil {
			fmt.Printf("更新导入任务状态失败 task:%s err:%v\n", i.TaskId, updateErr)
		}
	}

	if !i.SpeedMod {
		i.HandleFunc(i, data)
		finish()
		return nil
	}

	if i.SliceLen == 0 {
		i.SliceLen = 100
	}
	size := int(i.SliceLen)
	if size <= 0 || size > len(data) {
		// Guards against uint->int overflow; small imports become one chunk.
		size = len(data)
	}
	var wg sync.WaitGroup
	for start := 0; start < len(data); start += size {
		end := start + size
		if end > len(data) {
			end = len(data)
		}
		wg.Add(1)
		go func(chunk []map[string]string) {
			defer wg.Done()
			defer func() {
				if rec := recover(); rec != nil {
					fmt.Printf("导入任务处理异常 task:%s err:%v\n", i.TaskId, rec)
				}
			}()
			i.HandleFunc(i, chunk)
		}(data[start:end:end]) // capped: an append in one handler cannot overwrite the next chunk
	}
	// Only mark the task finished once every chunk has been handled.
	go func() {
		wg.Wait()
		finish()
	}()
	return nil
}
// filedCheck normalises and validates the data rows (Rows[1:]) in place.
//
// Values of fields listed in TrimFiled have surrounding whitespace removed,
// and the trimmed value is written back into the row. Afterwards, every field
// that has a rule in RegexFiledMap is checked. A value is reported when
// RegexMatch(rule.Rule, value) == rule.MatchBool.
//
// All violations are collected into a single error, one per line, in the form
// "第<row>行,字段:<key>,值:<value>,格式错误:<desc>". <row> is the row's index
// in Rows plus one; this is presumably the spreadsheet row number, assuming
// the header is on row 1.
func (i *ImportExcel) filedCheck() (err error) {
	if len(i.Rows) <= 1 {
		return nil // no data rows
	}
	trimKeys := make(map[string]struct{}, len(i.TrimFiled))
	for _, key := range i.TrimFiled {
		trimKeys[key] = struct{}{}
	}
	var errMsg []string
	for line, dataMap := range i.Rows[1:] {
		for key, value := range dataMap {
			if _, ok := trimKeys[key]; ok {
				value = strings.TrimSpace(value)
				dataMap[key] = value // updating an existing key during range is allowed
			}
			if rule, ok := i.RegexFiledMap[key]; ok && rule != nil {
				if RegexMatch(rule.Rule, value) == rule.MatchBool {
					errMsg = append(errMsg, fmt.Sprintf("第%d行,字段:%s,值:%s,格式错误:%s", line+2, key, value, rule.Desc))
				}
			}
		}
	}
	if len(errMsg) > 0 {
		// Cell values are user input and may contain '%', so they must not be
		// used as a format string.
		return fmt.Errorf("%s", strings.Join(errMsg, "\n"))
	}
	return nil
}
// GetRows reads the import workbook referenced by request into i.Rows. It
// fails when reading fails or when no rows were produced.
func (i *ImportExcel) GetRows(request *http.Request) (err error) {
	if err = i.getRowsFromHttp(request); err != nil {
		return err
	}
	// len of a nil slice is 0, so this also covers i.Rows == nil.
	if len(i.Rows) == 0 {
		return fmt.Errorf("未获取到导入数据或导入数据为空")
	}
	return nil
}
// GetTaskInfo returns the persisted state of task taskId for this job,
// together with its per-row import log.
//
// taskId is joined into file paths and typically comes straight from a query
// parameter. Ids containing '/' or '\' are therefore rejected.
//
// A missing, unreadable or malformed task file is not an error. A placeholder
// task in TaskStatusInit with zero progress is returned instead. A progress of
// 95 or more is reported as 100.
func (i *ImportExcel) GetTaskInfo(taskId string) (*TaskResp, error) {
	if strings.ContainsAny(taskId, `/\`) {
		return nil, fmt.Errorf("无效的任务ID: %q", taskId)
	}
	i.TaskId = taskId
	basePath, err := importLogPath(i.JobName)
	i.basePath = basePath
	if err != nil {
		return nil, err
	}

	var info *Task
	// Best effort: any read/decode failure leaves info nil and falls through
	// to the placeholder below.
	if raw, readErr := os.ReadFile(i.taskFile()); readErr == nil {
		_ = sonic.Unmarshal(raw, &info)
	}
	if info == nil {
		// Compatibility default, shaped like:
		// {"task_id":"1734576235708222000","process":100,"ctime":"2024-12-19 10:44:03","ftime":"2024-12-19 10:44:03","status":2}
		now := time.Now().Format(time.DateTime)
		info = &Task{
			TaskId:  taskId,
			Process: 0,
			Status:  TaskStatusInit,
			Ctime:   now,
			Ftime:   now,
		}
	}
	if info.Process >= 95 {
		info.Process = 100
	}
	return &TaskResp{
		Task:         info,
		ImportErrLog: i.importLog(),
	}, nil
}
// importLog reads this task's import log: one JSON-encoded ImportLog per line,
// as appended by updateImportLog.
//
// A missing or unreadable file yields an empty (non-nil) slice. Lines that
// fail to decode, or that decode to null, are skipped, so one corrupt line
// does not hide the entries after it.
func (i *ImportExcel) importLog() []*ImportLog {
	// Non-nil on purpose; presumably so the JSON response shows [] rather
	// than null.
	logs := []*ImportLog{}
	content, err := os.ReadFile(i.importLogFile())
	if err != nil {
		return logs
	}
	for _, line := range strings.Split(string(content), "\n") {
		if line == "" {
			continue
		}
		var entry *ImportLog
		if decodeErr := sonic.Unmarshal([]byte(line), &entry); decodeErr != nil || entry == nil {
			continue
		}
		logs = append(logs, entry)
	}
	return logs
}
// GetExcel points the importer at task taskId and returns the path of that
// task's failed-rows workbook (<basePath>/excel/<taskId>.xlsx).
// An error from importLogPath is ignored; basePath keeps whatever it returned.
func (i *ImportExcel) GetExcel(taskId string) string {
	i.TaskId = taskId
	i.basePath, _ = importLogPath(i.JobName)
	return i.errExcelFile()
}
// DownloadExcel sends the failed-rows workbook of the task named by the
// "task_id" query parameter as an attachment named "<task_id>.xlsx".
//
// task_id is untrusted and ends up in both a filesystem path and the
// Content-Disposition header. Only ASCII letters, digits, '-', '_' and '.' are
// therefore accepted; generated ids are decimal nanosecond timestamps. A
// missing file yields the error "文件不存在". The file is streamed instead of
// being buffered in memory.
func (i *ImportExcel) DownloadExcel(ctx http.Context) (err error) {
	taskId := ctx.Query().Get("task_id")
	badChar := strings.IndexFunc(taskId, func(r rune) bool {
		switch {
		case r >= '0' && r <= '9', r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r == '-', r == '_', r == '.':
			return false
		}
		return true
	})
	if taskId == "" || badChar >= 0 {
		return fmt.Errorf("无效的任务ID: %q", taskId)
	}

	file, err := os.Open(i.GetExcel(taskId))
	if err != nil {
		if os.IsNotExist(err) {
			return fmt.Errorf("文件不存在")
		}
		return err
	}
	defer file.Close()
	stat, err := file.Stat()
	if err != nil {
		return err
	}
	if stat.IsDir() {
		return fmt.Errorf("文件不存在")
	}

	// Force a download rather than an in-browser preview.
	header := ctx.Response().Header()
	header.Set("Content-Type", "application/octet-stream")
	header.Set("Content-Disposition", "attachment; filename="+taskId+".xlsx")
	header.Set("Content-Length", strconv.FormatInt(stat.Size(), 10))
	_, err = io.Copy(ctx.Response(), file)
	return err
}
// DownloadExcelTemp sends the import template stored at the local path
// DownloadUrl as an attachment named "<JobName>.xlsx".
func (i *ImportExcel) DownloadExcelTemp(ctx http.Context) (err error) {
	payload, err := os.ReadFile(i.DownloadUrl)
	if err != nil {
		return err
	}
	// octet-stream + attachment forces a download instead of a preview.
	w := ctx.Response()
	w.Header().Set("Content-Type", "application/octet-stream")
	w.Header().Set("Content-Disposition", "attachment; filename="+i.JobName+".xlsx")
	_, err = w.Write(payload)
	return err
}
// TaskInfo is the HTTP handler that reports the task named by the "task_id"
// query parameter. It responds with status 200 and body {"data": <TaskResp>}.
func (i *ImportExcel) TaskInfo(ctx http.Context) (err error) {
	info, err := i.GetTaskInfo(ctx.Query().Get("task_id"))
	if err != nil {
		return err
	}
	return ctx.Result(200, map[string]interface{}{"data": info})
}
// TaskHis is the HTTP handler that returns one page of this job's task
// history.
//
// The query parameters "page" (1-based) and "limit" (page size) select the
// page. Missing, non-numeric or non-positive values fall back to page 1 and a
// page size of 10. Pages past the end yield an empty Data list.
//
// Each entry is the decoded task file, plus "task_id" (the file name) and,
// when the file can be stat'ed, "update_time" (its modification time).
func (i *ImportExcel) TaskHis(ctx http.Context) (err error) {
	const defaultLimit = 10
	path := i.taskPath()
	entries := SortFileWithStatus(path)
	total := len(entries)
	page, limit, begin, end := taskHisPageBounds(ctx.Query().Get("page"), ctx.Query().Get("limit"), total, defaultLimit)

	var data []map[string]interface{}
	for _, entry := range entries[begin:end] {
		// NOTE(review): a nil FileInfo appears to mark the end of usable
		// entries from SortFileWithStatus — confirm.
		if entry.FileInfo == nil {
			break
		}
		if entry.IsDir() {
			continue
		}
		file := fmt.Sprintf("%s/%s", path, entry.Name())
		info := map[string]interface{}{"task_id": entry.Name()}
		// Best effort: an unreadable or malformed task file is still listed
		// by its id.
		raw, _ := os.ReadFile(file)
		_ = sonic.Unmarshal(raw, &info)
		// The file may vanish between listing and stat; skip the timestamp
		// instead of dereferencing a nil FileInfo.
		if st, statErr := os.Stat(file); statErr == nil {
			info["update_time"] = st.ModTime().Format(time.DateTime)
		}
		data = append(data, info)
	}

	res := ResPage{
		Page:     page,
		Limit:    limit,
		Total:    total,
		Data:     data,
		LastPage: int(math.Ceil(float64(total) / float64(limit))),
	}
	return ctx.Result(200, map[string]interface{}{"data": res})
}

// taskHisPageBounds parses the page/limit query strings for a list of total
// items. It returns the effective page and limit and the half-open slice
// bounds [begin, end), always within [0, total].
//
// Invalid or non-positive values fall back to page 1 and defaultLimit. The
// bounds are computed so that huge inputs cannot overflow.
func taskHisPageBounds(pageStr, limitStr string, total, defaultLimit int) (page, limit, begin, end int) {
	page, _ = strconv.Atoi(pageStr)
	limit, _ = strconv.Atoi(limitStr)
	if page < 1 {
		page = 1
	}
	if limit < 1 {
		limit = defaultLimit
	}
	if page-1 > total/limit {
		begin = total
	} else {
		begin = (page - 1) * limit
		if begin > total {
			begin = total
		}
	}
	if limit > total-begin {
		end = total
	} else {
		end = begin + limit
	}
	return page, limit, begin, end
}
// getRowsFromHttp loads the import workbook referenced by request r and
// stores the rows of its first worksheet in i.Rows.
//
// If PicSaveHandle is set, embedded pictures are first uploaded through it,
// and each picture cell is overwritten with the comma-joined URLs before the
// rows are read. Rows are converted with ExchangeRows only when Header is
// non-nil; otherwise i.Rows is left unchanged.
func (i *ImportExcel) getRowsFromHttp(r *http.Request) (err error) {
	file, err := i.openImportSource(r)
	if err != nil {
		return err
	}
	defer file.Close()

	f, err := excelize.OpenReader(file)
	if err != nil {
		return fmt.Errorf("解析excel文件失败: %w", err)
	}
	defer f.Close()

	// Only the first worksheet is imported.
	sheetNames := f.GetSheetList()
	if len(sheetNames) == 0 {
		return fmt.Errorf("无效的excel未获取到对应的sheet")
	}
	sheet := sheetNames[0]
	rows, err := f.GetRows(sheet)
	if err != nil {
		return fmt.Errorf("excel内未找到数据: %w", err)
	}
	if i.PicSaveHandle != nil {
		if err = i.uploadSheetPictures(f, sheet); err != nil {
			return err
		}
		// Re-read so the picture cells now contain the uploaded URLs.
		if rows, err = f.GetRows(sheet); err != nil {
			return fmt.Errorf("excel内未找到数据: %w", err)
		}
	}
	if len(rows) == 0 {
		return fmt.Errorf("无效的excel未获取到对应的记录数据")
	}
	if i.Header != nil {
		i.Rows = ExchangeRows(rows, i.Header)
	}
	return nil
}

// openImportSource returns the workbook stream for r.
//
// When GetFileObject is set and the form value "fileObjectUrl" is non-empty,
// that URL must name an Excel file. It is opened through GetFileObject and
// remembered in FileObjectUrl so it can be deleted later. Otherwise the
// multipart file field "file" is used.
//
// In both cases, a "speed_mode" form value equal to "true" (case-insensitive)
// enables SpeedMod.
func (i *ImportExcel) openImportSource(r *http.Request) (io.ReadCloser, error) {
	// FormValue parses the (multipart) form, so PostForm is populated below.
	fileObjectUrl := r.FormValue("fileObjectUrl")
	if strings.EqualFold(r.PostForm.Get("speed_mode"), "true") {
		i.SpeedMod = true
	}
	if i.GetFileObject != nil && fileObjectUrl != "" {
		if !IsExcelFormat(fileObjectUrl) {
			return nil, fmt.Errorf("文件格式错误, 不是excel")
		}
		body, err := i.GetFileObject(fileObjectUrl)
		if err != nil {
			return nil, err
		}
		i.FileObjectUrl = fileObjectUrl
		return body, nil
	}
	upload, _, err := r.FormFile("file")
	if err != nil {
		return nil, fmt.Errorf("未找到导入文件: %w", err)
	}
	return upload, nil
}

// uploadSheetPictures uploads every picture of the given sheet through
// PicSaveHandle and writes the comma-joined URLs back into the pictures'
// cells. At most 16 uploads run concurrently.
//
// A failed or panicking upload is logged and leaves no URL behind. A failure
// to list or read pictures is returned as an error.
func (i *ImportExcel) uploadSheetPictures(f *excelize.File, sheet string) (err error) {
	const maxConcurrentUploads = 16

	i.RowPicCells, err = f.GetPictureCells(sheet)
	if err != nil {
		return fmt.Errorf("excel内图片获取失败: %w", err)
	}
	var picList []*Pic
	for _, cell := range i.RowPicCells {
		// NOTE(review): N1 is skipped unconditionally; presumably a picture
		// in the header row — confirm.
		if cell == "N1" {
			continue
		}
		pics, picErr := f.GetPictures(sheet, cell)
		if picErr != nil {
			return fmt.Errorf("excel内未找到图片: %w", picErr)
		}
		for _, v := range pics {
			if v.File != nil {
				picList = append(picList, &Pic{PicInfos: v, Cell: cell})
			}
		}
	}
	if len(picList) == 0 {
		fmt.Printf("os--暂无图片,时间:%s \n", time.Now().Format(time.DateTime))
		return nil
	}

	fmt.Printf("开始上传OSS时间%s \n", time.Now().Format(time.DateTime))
	var (
		cellUrlMap = make(map[string][]string)
		mu         sync.Mutex
		wg         sync.WaitGroup
		sem        = make(chan struct{}, maxConcurrentUploads)
	)
	for _, pic := range picList {
		wg.Add(1)
		sem <- struct{}{}
		// pic is passed as an argument so the goroutine does not share the
		// loop variable on Go versions before 1.22.
		go func(pic *Pic) {
			defer wg.Done()
			defer func() { <-sem }()
			defer func() {
				if rec := recover(); rec != nil {
					fmt.Printf("图片上传失败cell:%s,url:%s,err:%v\n", pic.Cell, pic.Url, rec)
				}
			}()
			url, uploadErr := i.PicSaveHandle(pic)
			if uploadErr != nil {
				log.Error(fmt.Sprintf("图片上传失败cell:%s,url:%s,err:%s", pic.Cell, pic.Url, uploadErr.Error()))
				return
			}
			mu.Lock()
			cellUrlMap[pic.Cell] = append(cellUrlMap[pic.Cell], url)
			mu.Unlock()
		}(pic)
	}
	wg.Wait()
	fmt.Printf("结束上传OSS时间%s \n", time.Now().Format(time.DateTime))

	for cell, urls := range cellUrlMap {
		if setErr := f.SetCellValue(sheet, cell, strings.Join(urls, ",")); setErr != nil {
			fmt.Printf("写入图片地址失败cell:%s,err:%v\n", cell, setErr)
		}
	}
	return nil
}
// getRowsFromFile is a placeholder for loading rows from a local workbook. It
// is not implemented: filePath is ignored, Rows is left untouched and nil is
// always returned.
func (i *ImportExcel) getRowsFromFile(filePath string) (err error) {
	// TODO: implement reading from filePath.
	return nil
}
// updateTask persists i.task as JSON to the task file (<basePath>/task/<TaskId>).
//
// The task is marshalled first, so an encoding error leaves the existing file
// untouched. The JSON is then written to a temporary file in basePath and
// renamed over the task file. Concurrent readers (GetTaskInfo, TaskHis) and
// concurrent writers (speed mode) therefore only ever see a complete JSON
// document, never a truncated or interleaved one.
//
// The temporary file is created in basePath, the parent of the task
// directory. This keeps it out of TaskHis listings, and it normally lies on
// the same filesystem, so the rename is atomic.
func (i *ImportExcel) updateTask() error {
	payload, err := sonic.Marshal(i.task)
	if err != nil {
		return err
	}
	tmp, err := os.CreateTemp(i.basePath, ".task-*.tmp")
	if err != nil {
		return err
	}
	tmpName := tmp.Name()
	// CreateTemp uses mode 0600; widen to the usual data-file mode. This is
	// best effort, since some filesystems reject chmod.
	_ = tmp.Chmod(0644)
	if _, err = tmp.Write(payload); err != nil {
		_ = tmp.Close()
		_ = os.Remove(tmpName)
		return err
	}
	if err = tmp.Close(); err != nil {
		_ = os.Remove(tmpName)
		return err
	}
	if err = os.Rename(tmpName, i.taskFile()); err != nil {
		_ = os.Remove(tmpName)
		return err
	}
	return nil
}
// AddErr records a failed row and then counts it as processed via Next.
//
// It does three things:
//   - appends {FailReason, Line: key} to the task's import log (best effort);
//   - queues the row for the failed-rows workbook, with its cells ordered by
//     Header plus a trailing fail reason;
//   - remembers the row's "货品图片" (product picture) value, so that Next can
//     delete the pictures uploaded for it.
func (i *ImportExcel) AddErr(failReason string, key int, row map[string]string) {
	_ = i.updateImportLog(&ImportLog{Line: key, FailReason: failReason})

	ordered := ExchangeRowWithMap(row, i.Header)
	cells := make([]interface{}, 0, len(ordered)+1)
	for _, cell := range ordered {
		cells = append(cells, cell)
	}
	cells = append(cells, failReason)

	exporter := i.errRowsExporter
	exporter.Rows = append(exporter.Rows, cells)
	exporter.ErrFileUrls = append(exporter.ErrFileUrls, row["货品图片"])
	i.Next()
}
// Next marks one more data row as processed and records the task's progress.
// Row handlers call it directly, and AddErr calls it for failed rows.
//
// When the processed count reaches rowCount, the import is finalized (see
// finishImportRows). Finalization errors are returned. In every other case
// the task is left in TaskStatusRunning with a fresh Ftime; Run marks it
// finished once HandleFunc has returned. Intermediate progress writes are
// best effort, and their errors are not reported.
func (i *ImportExcel) Next() (err error) {
	// Use the value returned by AddInt32 instead of re-reading the field.
	// Otherwise two concurrent callers could both observe the final count and
	// finalize twice.
	done := int(atomic.AddInt32(&i.importRowsCount, 1))
	if i.rowCount > 0 { // avoid division by zero when there are no data rows
		progress := done * 100 / i.rowCount
		if progress > 100 {
			progress = 100
		}
		i.task.Process = progress
	}
	if done == i.rowCount {
		if err = i.finishImportRows(); err != nil {
			return err
		}
	}
	i.task.Status = TaskStatusRunning
	i.task.Ftime = time.Now().Format(time.DateTime)
	_ = i.updateTask()
	return nil
}

// finishImportRows runs once, after the last data row. It:
//   - stores 100% progress with TaskStatusCreateFailExcel;
//   - writes the rows collected by AddErr to the failed-rows workbook, using
//     Header plus a "失败原因" column as the header;
//   - cleans up OSS objects.
func (i *ImportExcel) finishImportRows() error {
	i.task.Process = 100
	i.task.Status = TaskStatusCreateFailExcel
	if err := i.updateTask(); err != nil {
		return err
	}
	if exporter := i.errRowsExporter; len(exporter.Rows) > 0 {
		header := make([]interface{}, len(i.Header)+1)
		for idx, title := range i.Header {
			header[idx] = title
		}
		header[len(header)-1] = "失败原因"
		exporter.Header = header
		if err := exporter.Init(); err != nil {
			return err
		}
		for _, row := range exporter.Rows {
			// Per-row write errors are ignored so one bad row does not drop the rest.
			_ = exporter.Write(row)
		}
		exporter.Save()
	}
	return i.deleteImportOssObjects()
}

// deleteImportOssObjects uses DeleteOssHandle to delete two sets of objects:
//   - the pictures uploaded for failed rows (the comma-separated URLs in
//     ErrFileUrls);
//   - the source workbook referenced by FileObjectUrl.
//
// Both deletions are attempted even if the first fails, and the first error
// is returned. URLs are reduced to object names (the URL path without the
// leading "/") in the same way for both sets.
func (i *ImportExcel) deleteImportOssObjects() error {
	if i.DeleteOssHandle == nil {
		return nil
	}
	var names []string
	for _, joined := range i.errRowsExporter.ErrFileUrls {
		for _, raw := range strings.Split(joined, ",") {
			if raw == "" {
				continue
			}
			name, parseErr := ossObjectName(raw)
			if parseErr != nil {
				fmt.Printf("解析URL失败: %v\n", parseErr)
				continue
			}
			if name != "" {
				names = append(names, name)
			}
		}
	}
	var firstErr error
	if len(names) > 0 {
		firstErr = i.DeleteOssHandle(names)
	}
	if i.FileObjectUrl != "" {
		name, parseErr := ossObjectName(i.FileObjectUrl)
		if parseErr != nil || name == "" {
			name = i.FileObjectUrl // not parseable as a URL: pass it through unchanged
		}
		if delErr := i.DeleteOssHandle([]string{name}); delErr != nil && firstErr == nil {
			firstErr = delErr
		}
	}
	return firstErr
}

// ossObjectName converts an OSS URL, or a bare object key, into the object
// name passed to DeleteOssHandle: the URL path without its leading "/".
func ossObjectName(raw string) (string, error) {
	u, err := urlnet.Parse(raw)
	if err != nil {
		return "", err
	}
	return strings.TrimPrefix(u.Path, "/"), nil
}
// CreateFailExcel increments the processed-row counter by one. Unlike Next, it
// neither updates progress nor triggers finalization.
//
// The increment is atomic because Next updates the same counter with
// sync/atomic, possibly from concurrent goroutines in speed mode.
func (i *ImportExcel) CreateFailExcel() {
	atomic.AddInt32(&i.importRowsCount, 1)
}
// updateImportLog appends importLog, JSON-encoded and followed by "\n", to the
// task's import log file, creating the file if needed. This is the
// one-object-per-line format that importLog parses.
func (i *ImportExcel) updateImportLog(importLog *ImportLog) error {
	f, err := os.OpenFile(i.importLogFile(), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0766)
	if err != nil {
		return err
	}
	defer f.Close()
	line, err := sonic.Marshal(importLog)
	if err != nil {
		return err
	}
	if _, err = f.Write(append(line, '\n')); err != nil {
		return err
	}
	return nil
}
// taskFile returns the path of this task's JSON state file: <basePath>/task/<TaskId>.
func (i *ImportExcel) taskFile() string {
	return strings.Join([]string{i.basePath, "task", i.TaskId}, "/")
}
// taskPath returns the directory holding this job's task files
// (<basePath>/task/). If basePath is still empty, it is resolved first via
// importLogPath; that call's error is ignored.
func (i *ImportExcel) taskPath() string {
	if i.basePath != "" {
		return i.basePath + "/task/"
	}
	i.basePath, _ = importLogPath(i.JobName)
	return i.basePath + "/task/"
}
// importLogFile returns the path of this task's per-row import log:
// <basePath>/import/<TaskId>.
func (i *ImportExcel) importLogFile() string {
	return strings.Join([]string{i.basePath, "import", i.TaskId}, "/")
}
// errExcelPath returns the directory holding failed-rows workbooks: <basePath>/excel/.
func (i *ImportExcel) errExcelPath() string {
	return fmt.Sprintf("%s/excel/", i.basePath)
}
// errExcelFile returns the path of this task's failed-rows workbook:
// <basePath>/excel/<TaskId>.xlsx.
func (i *ImportExcel) errExcelFile() string {
	return fmt.Sprintf("%s%s.xlsx", i.errExcelPath(), i.TaskId)
}
// createTaskId returns a new task id: the current Unix time in nanoseconds,
// written in decimal.
func (i *ImportExcel) createTaskId() string {
	return strconv.FormatInt(time.Now().UnixNano(), 10)
}