import_csv_datacenter/func.go

173 lines
4.0 KiB
Go

package main
import (
"fmt"
"strings"
"sync"
"sync/atomic"
)
// cutData splits data into consecutive batches of at most cut rows and
// returns them pre-loaded in a buffered channel.
//
// The channel's capacity always covers the number of batches, so filling it
// never blocks. The channel is returned open (it is not closed here);
// consumers are expected to use len() on it to learn how many batches are
// queued. Batches are sub-slices of data and share its backing array.
//
// A non-positive cut is treated as "everything in one batch". Previously
// cut == 0 panicked with a division by zero and cut < 0 panicked while
// creating a channel with a negative capacity.
func cutData(data []map[string]string, cut int) chan []map[string]string {
	if cut <= 0 {
		cut = len(data)
		if cut == 0 {
			// Empty input: any positive size works, no batch is produced.
			cut = 1
		}
	}

	// One slot more than strictly needed when len(data) is a multiple of cut;
	// kept so the channel capacity stays what callers have always seen.
	capacity := len(data)/cut + 1
	batchChan := make(chan []map[string]string, capacity)

	for start := 0; start < len(data); start += cut {
		end := start + cut
		if end > len(data) {
			// Last, shorter batch.
			end = len(data)
		}
		batchChan <- data[start:end]
	}
	return batchChan
}
// saveData imports every batch that is already buffered in dataChan, running
// one goroutine per batch, and blocks until all of them have finished.
//
// The number of batches is taken from len(dataChan) at entry, so the channel
// must be completely filled before the call. Exactly that many batches are
// received; the function does not rely on the channel being closed (so an
// empty channel returns immediately instead of blocking forever) and leaves
// the channel open.
//
// set.Op selects the mode: Add inserts each batch through add, OverWrite
// hands it to overwriteBatch, any other value does nothing. A panic inside a
// worker is recovered and reported through warning. Once all workers are done
// the failure count is reported through success and finish(dir) is called.
func saveData(dataChan chan []map[string]string, set *Set, title []string, dir string) {
	var (
		wg       sync.WaitGroup
		done     atomic.Int64
		errCount atomic.Int32
	)
	lent := len(dataChan)
	wg.Add(lent)
	for i := 0; i < lent; i++ {
		// Declared inside the loop so each worker captures its own batch.
		v := <-dataChan
		go func() {
			defer wg.Done()
			defer func() {
				if e := recover(); e != nil {
					warning(fmt.Sprintf("协程弹出: %v", e))
				}
			}()
			defer func() {
				// Use the value returned by Add: reading the counter
				// separately would race with the other workers.
				log("当前进度:%d/%d", done.Add(1), lent)
			}()
			switch set.Op {
			case Add:
				add(title, v, &errCount)
			case OverWrite:
				overwriteBatch(set, title, v, &errCount)
			}
		}()
	}
	wg.Wait()
	success("导入完毕.....失败:%d", errCount.Load())
	finish(dir)
}

// overwriteBatch upserts one batch: for every row it checks whether a record
// with the same values in set.OverWriteJudColumns already exists in c.Table,
// updates it when it does and inserts the row through add when it does not.
//
// Values are passed as query parameters ("?") instead of being pasted into
// the SQL text. Single and double quotes are still replaced by backticks so
// that lookups and updates see the same values add stores; for the judge
// columns the replacement is written back into the row, as before.
//
// A failed lookup or a failed update is reported through warning and stops
// the rest of the batch; only failed updates (and failures inside add) are
// counted in errCount.
func overwriteBatch(set *Set, title []string, batch []map[string]string, errCount *atomic.Int32) {
	table, err := db.DB()
	if err != nil {
		// Without a handle nothing below can work; previously execution
		// continued and dereferenced the nil handle.
		warning(err.Error())
		return
	}
	// NOTE(review): db is used like a *gorm.DB (DB(), Exec(...).Error). If so,
	// DB() returns the shared connection pool rather than a per-call
	// connection, which is why it is not closed here: closing it from one
	// worker would break every other worker still using db. Confirm.

	normalize := func(s string) string {
		return strings.ReplaceAll(strings.ReplaceAll(s, "'", "`"), `"`, "`")
	}
	columns := fmt.Sprintf("`%s`", strings.Join(set.OverWriteJudColumns, "`,`"))

	for _, item := range batch {
		// WHERE clause identifying the row, shared by the SELECT and the UPDATE.
		conds := make([]string, 0, len(set.OverWriteJudColumns))
		whereArgs := make([]any, 0, len(set.OverWriteJudColumns))
		for _, col := range set.OverWriteJudColumns {
			value := item[col]
			if cleaned := normalize(value); cleaned != value {
				item[col] = cleaned
				value = cleaned
			}
			conds = append(conds, fmt.Sprintf("`%s`=?", col))
			whereArgs = append(whereArgs, value)
		}
		where := strings.Join(conds, " AND ")

		selectSQL := fmt.Sprintf("SELECT %s From %s WHERE %s LIMIT 1", columns, c.Table, where)
		rows, queryErr := table.Query(selectSQL, whereArgs...)
		if queryErr != nil {
			warning(fmt.Sprintf(" 查询失败: %v", queryErr))
			return
		}
		exists := rows.Next()
		iterErr := rows.Err()
		// Close right away instead of deferring inside the loop, so the
		// result set is released before the next statement runs.
		rows.Close()
		if iterErr != nil {
			warning(fmt.Sprintf(" 查询失败: %v", iterErr))
			return
		}

		if !exists {
			add(title, []map[string]string{item}, errCount)
			continue
		}

		assignments := make([]string, 0, len(item))
		args := make([]any, 0, len(item)+len(whereArgs))
		for key, value := range item {
			assignments = append(assignments, fmt.Sprintf("`%s`=?", key))
			args = append(args, normalize(value))
		}
		args = append(args, whereArgs...)
		update := fmt.Sprintf("Update %s Set %s WHERE %s", c.Table, strings.Join(assignments, ","), where)
		if result := db.Exec(update, args...); result.Error != nil {
			errCount.Add(1)
			warning(fmt.Sprintf("failed to insert user: %v", result.Error))
			return
		}
	}
}
// add inserts all rows of v into c.Table with one multi-row INSERT statement.
//
// title lists the columns to write; for every row the value of each title
// column is read from the row map (a missing key yields an empty string).
// Single and double quotes inside a value are replaced by backticks before
// the value is embedded in the statement, and the replacement is written back
// into the row map, so the caller's data is modified.
//
// When the statement fails, errCount is incremented once for the whole batch
// and the error is reported through warning. An empty v is a no-op:
// previously it produced a malformed "INSERT ... VALUES ;" that was counted
// as a failure.
//
// NOTE(review): values are still pasted into the SQL text. Quotes are
// neutralised, but other characters a dialect may treat specially inside
// string literals (for example a backslash) are not. Switching to query
// parameters would fix that, but a large batch may then exceed the
// database's placeholder limit — verify before changing.
func add(title []string, v []map[string]string, errCount *atomic.Int32) {
	if len(v) == 0 {
		return
	}

	var query strings.Builder
	query.WriteString(fmt.Sprintf("INSERT INTO %s (`%s`) VALUES ", c.Table, strings.Join(title, "`,`")))

	// Scratch slice for the quoted values of one row, reused for every row.
	values := make([]string, len(title))
	for i, item := range v {
		if i > 0 {
			query.WriteString(",")
		}
		for j, t := range title {
			if strings.ContainsAny(item[t], `'"`) {
				item[t] = strings.ReplaceAll(strings.ReplaceAll(item[t], "'", "`"), `"`, "`")
			}
			values[j] = "'" + item[t] + "'"
		}
		query.WriteString(" (" + strings.Join(values, ",") + ")")
	}
	query.WriteString(";")

	if result := db.Exec(query.String()); result.Error != nil {
		errCount.Add(1)
		warning(fmt.Sprintf("failed to insert user: %v", result.Error))
	}
}