package excel_import import ( "context" "fmt" "github.com/bytedance/sonic" "github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/transport/http" "github.com/xuri/excelize/v2" "io" "math" urlnet "net/url" "os" "strconv" "strings" "sync" "sync/atomic" "time" ) type ImportExcel struct { Ctx context.Context basePath string task *Task errRowsExporter *CreateExcel rowCount int importRowsCount int32 RowPicCells []string //图片所在单元格 PicSaveHandle func(pic *Pic) (url string, err error) //存储图片文件 DeleteOssHandle func(ObjectName []string) (err error) //删除OSS图片 GetFileObject func(fileObjectUrl string) (body io.ReadCloser, err error) Rows []map[string]string TrimFiled []string //需要进行去掉空格操作的字段 RegexFiledMap map[string]*Regex JobName string //任务类型,区分不同业务 TaskId string //自动生成 SpeedMod bool //是否开启加速 SliceLen uint //加速模式下,并行切片长度,默认为100 Header []string //头部标题对标 HandleFunc func(excel *ImportExcel, rows []map[string]string) FileObjectUrl string DownloadUrl string } type checkOption struct { TrimCheck bool RegexCheck bool } func NewImportExcel(jobName string, opts ...Option) *ImportExcel { Import := &ImportExcel{ JobName: jobName, errRowsExporter: &CreateExcel{}, //错误行导出 task: &Task{}, //任务 } for _, opt := range opts { opt(Import) // 应用选项 } return Import } func (i *ImportExcel) Init(ctx http.Context, handleFunc func(excel *ImportExcel, row []map[string]string)) (*ImportExcel, error) { i.Ctx = ctx basePath, err := importLogPath(i.JobName) i.basePath = basePath if err != nil { return nil, err } err = CheckDir(i.taskFile()) if err != nil { return nil, err } err = CheckDir(i.importLogFile()) if err != nil { return nil, err } err = CheckDir(i.errExcelPath()) if err != nil { return nil, err } i.HandleFunc = handleFunc i.TaskId = i.createTaskId() err = i.GetRows(ctx.Request()) if err != nil { return nil, err } i.errRowsExporter.FileName = fmt.Sprintf("%s.xlsx", i.TaskId) i.errRowsExporter.Path = i.errExcelPath() return i, nil } func (i *ImportExcel) Run() (err error) { //创建任务 i.task = &Task{ TaskId: i.TaskId, Ctime: time.Now().Format(time.DateTime), Status: TaskStatusInit, } err = i.updateTask() if err != nil { return err } i.rowCount = len(i.Rows) - 1 err = i.filedCheck() if err != nil { return err } if i.SpeedMod { if i.SliceLen == 0 { i.SliceLen = 100 } //从第二行开始读取 for j := 1; j <= len(i.Rows[1:]); j += int(i.SliceLen) { if j+int(i.SliceLen) > len(i.Rows) { i.SliceLen = uint(len(i.Rows) - j) } //****HandleFunc需要创建子上下文来防止ctx被取消导致上下文丢失**** ChildCtx,cancel:=context.WithCancel(context.BackGround()) defer cancel() go i.HandleFunc(i, i.Rows[j:j+int(i.SliceLen)]) } } else { i.HandleFunc(i, i.Rows[1:]) } i.task.Status = TaskStatusFinish i.updateTask() return nil } func (i *ImportExcel) filedCheck() (err error) { var ( checkOptions checkOption errMsg []string ) if len(i.TrimFiled) >= 0 { checkOptions.TrimCheck = true } if len(i.RegexFiledMap) >= 0 { checkOptions.RegexCheck = true } //去掉空格 trimKeysMap := make(map[string]struct{}) for _, key := range i.TrimFiled { trimKeysMap[key] = struct{}{} } trimValue := func(value string) string { return strings.TrimSpace(value) } for line, dataMap := range i.Rows[1:] { for key, value := range dataMap { //去掉空格 if checkOptions.TrimCheck { if _, exists := trimKeysMap[key]; exists { value = trimValue(value) } } //正则匹配 if checkOptions.RegexCheck { if _, exists := i.RegexFiledMap[key]; exists { match := RegexMatch(i.RegexFiledMap[key].Rule, value) if match == i.RegexFiledMap[key].MatchBool { errMsg = append(errMsg, fmt.Sprintf("第%d行,字段:%s,值:%s,格式错误:%s", line+2, key, value, i.RegexFiledMap[key].Desc)) } } } dataMap[key] = value } } if len(errMsg) > 0 { return fmt.Errorf(strings.Join(errMsg, "\n")) } return } func (i *ImportExcel) GetRows(request *http.Request) (err error) { err = i.getRowsFromHttp(request) if err != nil { return err } if i.Rows == nil || len(i.Rows) == 0 { return fmt.Errorf("未获取到导入数据或导入数据为空") } return nil } func (i *ImportExcel) GetTaskInfo(taskId string) (*TaskResp, error) { var ( info *Task ) i.TaskId = taskId basePath, err := importLogPath(i.JobName) i.basePath = basePath if err != nil { return nil, err } taskInfo, _ := os.ReadFile(i.taskFile()) _ = sonic.Unmarshal(taskInfo, &info) if info == nil { // 兼容,提供默认值,类似:{"task_id":"1734576235708222000","process":100,"ctime":"2024-12-19 10:44:03","ftime":"2024-12-19 10:44:03","status":2} info = &Task{ TaskId: taskId, Process: 0, Status: TaskStatusInit, Ctime: time.Now().Format(time.DateTime), Ftime: time.Now().Format(time.DateTime), } } if info.Process >= 95 { info.Process = 100 } return &TaskResp{ Task: info, ImportErrLog: i.importLog(), }, nil } func (i *ImportExcel) importLog() []*ImportLog { var logs = []*ImportLog{} _, err := os.Stat(i.importLogFile()) if err != nil { return logs } taskLog, err := os.ReadFile(i.importLogFile()) if err != nil { return logs } logList := strings.Split(string(taskLog), "\n") for _, v := range logList { if v == "" { continue } var log *ImportLog err = sonic.Unmarshal([]byte(v), &log) if err != nil { return logs } logs = append(logs, log) } return logs } func (i *ImportExcel) GetExcel(taskId string) string { i.TaskId = taskId basePath, _ := importLogPath(i.JobName) i.basePath = basePath return i.errExcelFile() } func (i *ImportExcel) DownloadExcel(ctx http.Context) (err error) { taskId := ctx.Query().Get("task_id") addr := i.GetExcel(taskId) _, exist := os.Stat(addr) if exist != nil { return fmt.Errorf("文件不存在") } file, err := os.Open(addr) if err != nil { return err } defer file.Close() payload, err := io.ReadAll(file) if err != nil { return err } // 设置HTTP响应头 // 打开为预览 //ctx.Response().Header().Set("Content-Type", "image/png") // 打开为下载 ctx.Response().Header().Set("Content-Type", "application/octet-stream") ctx.Response().Header().Set("Content-Disposition", "attachment; filename="+taskId+".xlsx") // 将结果写入 _, err = ctx.Response().Write(payload) return } func (i *ImportExcel) DownloadExcelTemp(ctx http.Context) (err error) { file, err := os.Open(i.DownloadUrl) if err != nil { return err } defer file.Close() payload, err := io.ReadAll(file) if err != nil { return err } // 设置HTTP响应头 // 打开为预览 //ctx.Response().Header().Set("Content-Type", "image/png") // 打开为下载 ctx.Response().Header().Set("Content-Type", "application/octet-stream") ctx.Response().Header().Set("Content-Disposition", "attachment; filename="+i.JobName+".xlsx") // 将结果写入 _, err = ctx.Response().Write(payload) return } func (i *ImportExcel) TaskInfo(ctx http.Context) (err error) { taskId := ctx.Query().Get("task_id") importExcelInfo, err := i.GetTaskInfo(taskId) if err != nil { return err } response := make(map[string]interface{}, 1) response["data"] = importExcelInfo return ctx.Result(200, response) } func (i *ImportExcel) TaskHis(ctx http.Context) (err error) { var ( data []map[string]interface{} res ResPage ) page := ctx.Query().Get("page") num := ctx.Query().Get("limit") path := i.taskPath() entries := SortFileWithStatus(path) count := len(entries) pageInt, _ := strconv.ParseInt(page, 10, 64) numInt, _ := strconv.ParseInt(num, 10, 64) begin := (pageInt - 1) * numInt entEnd := begin + numInt if count < int(entEnd) { entEnd = int64(count) } entries = entries[begin:entEnd] for _, entry := range entries { if entry.FileInfo == nil { break } info := make(map[string]interface{}) if entry.IsDir() { continue } info["task_id"] = entry.Name() file := fmt.Sprintf("%s/%s", path, entry.Name()) bytes, _ := os.ReadFile(file) _ = sonic.Unmarshal(bytes, &info) fileOs, _ := os.Stat(file) info["update_time"] = fileOs.ModTime().Format(time.DateTime) data = append(data, info) } res = ResPage{ Page: int(pageInt), Limit: int(numInt), Total: count, Data: data, LastPage: int(math.Ceil(float64(count) / float64(numInt))), } response := make(map[string]interface{}, 1) response["data"] = res return ctx.Result(200, response) } func (i *ImportExcel) getRowsFromHttp(r *http.Request) (err error) { // 定义一个file文件 var file io.ReadCloser // 获取上传的文件 // 获取post请求的参数 fileObjectUrl := r.FormValue("fileObjectUrl") if i.GetFileObject != nil && fileObjectUrl != "" { // 判断fileObjectUrl 是否为excel相关得格式 if !IsExcelFormat(fileObjectUrl) { return fmt.Errorf("文件格式错误, 不是excel") } file, err = i.GetFileObject(fileObjectUrl) if err != nil { return err } // 记录excel地址,用于删除文件 i.FileObjectUrl = fileObjectUrl } else { file, _, err = r.FormFile("file") speed_mode := r.PostForm.Get("speed_mode") if strings.EqualFold(speed_mode, "true") { i.SpeedMod = true } if err != nil { return fmt.Errorf("未找到导入文件: %w", err) } } defer file.Close() // 解析Excel文件 f, err := excelize.OpenReader(file) if err != nil { return fmt.Errorf("解析excel文件失败: %w", err) } defer f.Close() // 获取第一个工作表的名称 sheetNames := f.GetSheetList() if len(sheetNames) == 0 { return fmt.Errorf("无效的excel,未获取到对应的sheet") } rows, err := f.GetRows(sheetNames[0]) if err != nil { return fmt.Errorf("excel内未找到数据: %w", err) } if i.PicSaveHandle != nil { i.RowPicCells, err = f.GetPictureCells(sheetNames[0]) if err != nil { return fmt.Errorf("excel内图片获取失败: %w", err) } // 遍历所有的图片 var picList []*Pic for _, cell := range i.RowPicCells { if cell == "N1" { continue } pics, _err := f.GetPictures(sheetNames[0], cell) if _err != nil { return fmt.Errorf("excel内未找到图片: %w", _err) } if len(pics) > 0 { for _, v := range pics { if v.File != nil { picList = append(picList, &Pic{ PicInfos: v, Cell: cell, }) } } } } fmt.Printf("开始上传OSS,时间:%s \n", time.Now().Format(time.DateTime)) if len(picList) > 0 { var ( cellUrlMap = make(map[string][]string) wg = sync.WaitGroup{} mu sync.Mutex ) wg.Add(len(picList)) for _, pic := range picList { go func() { defer wg.Done() defer func() { if r := recover(); r != nil { fmt.Printf("图片上传失败,cell:%s,url:%s,err:%s", pic.Cell, pic.Url, r) } }() url, _err := i.PicSaveHandle(pic) mu.Lock() if _err != nil { log.Error(fmt.Sprintf("图片上传失败,cell:%s,url:%s,err:%s", pic.Cell, pic.Url, _err.Error())) } cellUrlMap[pic.Cell] = append(cellUrlMap[pic.Cell], url) mu.Unlock() }() } wg.Wait() fmt.Printf("结束上传OSS,时间:%s \n", time.Now().Format(time.DateTime)) if len(cellUrlMap) > 0 { for cell, cellPic := range cellUrlMap { err = f.SetCellValue(sheetNames[0], cell, strings.Join(cellPic, ",")) if err != nil { continue } } } } fmt.Printf("os--暂无图片,时间:%s \n", time.Now().Format(time.DateTime)) //从新获取一次 rows, err = f.GetRows(sheetNames[0]) if err != nil { return fmt.Errorf("excel内未找到数据: %w", err) } } // 获取所有行 if len(rows) == 0 { return fmt.Errorf("无效的excel,未获取到对应的记录数据") } if len(i.Header) > 0 || i.Header != nil { i.Rows = ExchangeRows(rows, i.Header) } return err } func (i *ImportExcel) getRowsFromFile(filePath string) (err error) { return err } func (i *ImportExcel) updateTask() error { file, err := os.OpenFile(i.taskFile(), os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0766) if err != nil { return err } defer file.Close() taskInfo, err := sonic.Marshal(i.task) if err != nil { return err } jsonInfo := string(taskInfo) _, err = file.WriteString(jsonInfo) if err != nil { return err } return nil } func (i *ImportExcel) AddErr(failReason string, key int, row map[string]string) { _ = i.updateImportLog(&ImportLog{ FailReason: failReason, Line: key, }) var interfaces []interface{} rowSort := ExchangeRowWithMap(row, i.Header) for _, v := range rowSort { interfaces = append(interfaces, v) } interfaces = append(interfaces, failReason) i.errRowsExporter.Rows = append(i.errRowsExporter.Rows, interfaces) i.errRowsExporter.ErrFileUrls = append(i.errRowsExporter.ErrFileUrls, row["货品图片"]) i.Next() return } func (i *ImportExcel) Next() (err error) { atomic.AddInt32(&i.importRowsCount, 1) i.task.Process = int(i.importRowsCount * 100 / int32(i.rowCount)) i.updateTask() if int(i.importRowsCount) == i.rowCount { i.task.Process = 100 i.task.Status = TaskStatusCreateFailExcel err = i.updateTask() if err != nil { return } if len(i.errRowsExporter.Rows) > 0 { headerInterfaces := make([]interface{}, len(i.Header)+1) for index, header := range i.Header { headerInterfaces[index] = header } headerInterfaces[len(headerInterfaces)-1] = "失败原因" i.errRowsExporter.Header = headerInterfaces err = i.errRowsExporter.Init() if err != nil { return } for _, v := range i.errRowsExporter.Rows { _ = i.errRowsExporter.Write(v) } i.errRowsExporter.Save() } // 删除已经上传到OSS的图片 if len(i.errRowsExporter.ErrFileUrls) > 0 { var errFileObjectName []string for _, url := range i.errRowsExporter.ErrFileUrls { if url == "" { continue } ossUrl := strings.Split(url, ",") for _, v := range ossUrl { parsedURL, err := urlnet.Parse(v) if err != nil { fmt.Printf("解析URL失败: %v\n", err) continue } // 去掉path前面得/ parsedURL.Path = strings.TrimPrefix(parsedURL.Path, "/") errFileObjectName = append(errFileObjectName, parsedURL.Path) } } // 批量删除OSS文件 if len(errFileObjectName) > 0 { if i.DeleteOssHandle != nil { err = i.DeleteOssHandle(errFileObjectName) if err != nil { return } } } } if i.FileObjectUrl != "" { // 批量删除OSS文件 if i.DeleteOssHandle != nil { err = i.DeleteOssHandle([]string{i.FileObjectUrl}) if err != nil { return } } } } i.task.Status = TaskStatusRunning i.task.Ftime = time.Now().Format(time.DateTime) i.updateTask() return nil } func (i *ImportExcel) CreateFailExcel() { i.importRowsCount++ } func (i *ImportExcel) updateImportLog(importLog *ImportLog) error { file, err := os.OpenFile(i.importLogFile(), os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0766) if err != nil { return err } defer file.Close() logInfo, err := sonic.Marshal(importLog) if err != nil { return err } jsonInfo := string(logInfo) _, err = file.WriteString(jsonInfo + "\n") if err != nil { return err } return nil } func (i *ImportExcel) taskFile() string { return i.basePath + "/task/" + i.TaskId } func (i *ImportExcel) taskPath() string { if i.basePath == "" { basePath, _ := importLogPath(i.JobName) i.basePath = basePath } return i.basePath + "/task/" } func (i *ImportExcel) importLogFile() string { return i.basePath + "/import/" + i.TaskId } func (i *ImportExcel) errExcelPath() string { return i.basePath + "/excel/" } func (i *ImportExcel) errExcelFile() string { return i.errExcelPath() + i.TaskId + ".xlsx" } func (i *ImportExcel) createTaskId() string { return fmt.Sprintf("%d", time.Now().UnixNano()) }