package main

import (
	"fmt"
	"strings"
	"sync"
	"sync/atomic"
)

// maxInsertParams caps the number of bound parameters placed in one INSERT
// statement; larger batches are split into several statements.
// NOTE(review): 30000 is chosen to stay below the placeholder limits of MySQL
// (65535) and recent SQLite builds (32766) — confirm against the target DB.
const maxInsertParams = 30000

// quoteReplacer rewrites single and double quotes to backticks, exactly as the
// importer has always done before storing a value.
// NOTE(review): with bound parameters this rewrite is no longer needed for
// safety; it is kept only so that stored values and overwrite lookups keep
// matching rows written by earlier imports.
var quoteReplacer = strings.NewReplacer("'", "`", `"`, "`")

// cutData splits data into consecutive batches of at most cut rows and returns
// them pre-loaded in a buffered channel. The channel is NOT closed; consumers
// are expected to read len(ch) batches (see saveData).
//
// A non-positive cut yields a single batch holding every row instead of
// dividing by zero or looping forever. Empty data yields an empty channel.
func cutData(data []map[string]string, cut int) chan []map[string]string {
	if cut <= 0 {
		cut = len(data)
		if cut == 0 {
			cut = 1
		}
	}

	// len/cut+1 is always >= the number of batches, so the sends never block.
	batchChan := make(chan []map[string]string, len(data)/cut+1)
	for start := 0; start < len(data); start += cut {
		end := start + cut
		if end > len(data) {
			end = len(data)
		}
		batchChan <- data[start:end]
	}
	return batchChan
}

// saveData drains every batch currently buffered in dataChan, processes each
// one in its own goroutine according to set.Op (Add inserts the batch,
// OverWrite updates existing rows and inserts missing ones), waits for all of
// them, then reports the failure count and calls finish(dir).
//
// The number of batches is taken from len(dataChan) on entry, so the producer
// must have finished filling the channel before saveData is called. The
// channel is closed before saveData returns.
func saveData(dataChan chan []map[string]string, set *Set, title []string, dir string) {
	var (
		wg       sync.WaitGroup
		finished atomic.Int64
		errCount atomic.Int32
	)

	total := len(dataChan)
	wg.Add(total)
	// Receive exactly the buffered batches instead of ranging until close:
	// an empty channel then simply skips the loop rather than blocking forever.
	for i := 0; i < total; i++ {
		batch := <-dataChan
		go func() {
			defer wg.Done()
			defer func() {
				if r := recover(); r != nil {
					// A batch that panicked did not finish; count it as failed.
					errCount.Add(1)
					warning(fmt.Sprintf("协程弹出: %v", r))
				}
			}()
			defer func() {
				// Use the value returned by Add so the progress number is
				// read atomically.
				log("当前进度:%d/%d", finished.Add(1), total)
			}()

			switch set.Op {
			case Add:
				add(title, batch, &errCount)
			case OverWrite:
				overwriteBatch(set, title, batch, &errCount)
			}
		}()
	}
	wg.Wait()

	// Close once, from the only goroutine that owns the channel here.
	if dataChan != nil {
		close(dataChan)
	}

	success("导入完毕.....失败:%d", errCount.Load())
	finish(dir)
}

// overwriteBatch handles one batch in OverWrite mode. For every row it looks
// for an existing record whose set.OverWriteJudColumns match; if one exists it
// is updated with the row's values, otherwise the row is inserted via add.
// Each failed lookup or update increments errCount and processing continues
// with the next row.
func overwriteBatch(set *Set, title []string, batch []map[string]string, errCount *atomic.Int32) {
	keyCols := set.OverWriteJudColumns
	if len(keyCols) == 0 {
		// Without key columns the WHERE clause would be empty and an UPDATE
		// would touch every row of the table.
		errCount.Add(1)
		warning("overwrite requires at least one judge column")
		return
	}

	table, err := db.DB()
	if err != nil {
		errCount.Add(1)
		warning(err.Error())
		return
	}
	// NOTE(review): table is deliberately not closed here. db looks like a
	// *gorm.DB, whose DB() returns the shared connection pool; closing it in
	// one worker would break every other worker and later queries. Confirm.

	conds := make([]string, len(keyCols))
	for i, col := range keyCols {
		conds[i] = "`" + col + "`=?"
	}
	where := strings.Join(conds, " AND ")
	selectSQL := fmt.Sprintf("SELECT `%s` From %s WHERE %s LIMIT 1",
		strings.Join(keyCols, "`,`"), c.Table, where)

	// rowExists runs the lookup in its own function so the result set is
	// closed as soon as the answer is known, not when the whole batch ends.
	rowExists := func(keyArgs []any) (bool, error) {
		rows, err := table.Query(selectSQL, keyArgs...)
		if err != nil {
			return false, err
		}
		defer rows.Close()
		if rows.Next() {
			return true, nil
		}
		return false, rows.Err()
	}

	for _, item := range batch {
		keyArgs := make([]any, len(keyCols))
		for i, col := range keyCols {
			keyArgs[i] = quoteReplacer.Replace(item[col])
		}

		found, err := rowExists(keyArgs)
		if err != nil {
			errCount.Add(1)
			warning(fmt.Sprintf(" 查询失败: %v", err))
			continue
		}
		if !found {
			// Insert immediately (not batched) so that a later row of this
			// batch carrying the same key sees it and updates instead.
			add(title, []map[string]string{item}, errCount)
			continue
		}
		if len(item) == 0 {
			continue // nothing to set
		}

		assignments := make([]string, 0, len(item))
		args := make([]any, 0, len(item)+len(keyArgs))
		for col, value := range item {
			assignments = append(assignments, "`"+col+"`=?")
			args = append(args, quoteReplacer.Replace(value))
		}
		args = append(args, keyArgs...)

		updateSQL := fmt.Sprintf("Update %s Set %s WHERE %s",
			c.Table, strings.Join(assignments, ","), where)
		if result := db.Exec(updateSQL, args...); result.Error != nil {
			errCount.Add(1)
			warning(fmt.Sprintf("failed to update row: %v", result.Error))
		}
	}
}

// add inserts the rows in v into c.Table, writing the columns named in title
// (a column missing from a row is stored as the empty string). Values are sent
// as bound parameters and the caller's maps are not modified. Batches needing
// more than maxInsertParams parameters are split into several INSERTs; every
// failed statement increments errCount.
func add(title []string, v []map[string]string, errCount *atomic.Int32) {
	if len(v) == 0 {
		return // nothing to insert
	}
	if len(title) == 0 {
		errCount.Add(1)
		warning("no columns to insert")
		return
	}

	prefix := fmt.Sprintf("INSERT INTO %s (`%s`) VALUES ", c.Table, strings.Join(title, "`,`"))
	rowTuple := "(" + strings.TrimSuffix(strings.Repeat("?,", len(title)), ",") + ")"

	rowsPerStmt := maxInsertParams / len(title)
	if rowsPerStmt < 1 {
		rowsPerStmt = 1
	}

	for start := 0; start < len(v); start += rowsPerStmt {
		end := start + rowsPerStmt
		if end > len(v) {
			end = len(v)
		}
		chunk := v[start:end]

		var query strings.Builder
		query.WriteString(prefix)
		args := make([]any, 0, len(chunk)*len(title))
		for i, item := range chunk {
			if i > 0 {
				query.WriteString(",")
			}
			query.WriteString(rowTuple)
			for _, col := range title {
				args = append(args, quoteReplacer.Replace(item[col]))
			}
		}
		query.WriteString(";")

		if result := db.Exec(query.String(), args...); result.Error != nil {
			errCount.Add(1)
			warning(fmt.Sprintf("failed to insert user: %v", result.Error))
		}
	}
}