123 lines
2.8 KiB
Go
123 lines
2.8 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
// cutData splits data into consecutive batches of at most cut rows and
// returns them pre-loaded in a buffered channel, in their original order.
//
// The channel's buffer holds every batch, so len() of the returned channel is
// the number of batches. The channel is returned open: this function does not
// close it.
//
// A cut that is not positive, or that exceeds len(data), yields a single batch
// holding all of data instead of panicking. Empty data yields an empty
// channel. Batches are sub-slices of data and share its backing array.
func cutData(data []map[string]string, cut int) chan []map[string]string {
	total := len(data)

	// Clamp the batch size: guards against division by zero, negative slice
	// bounds, and integer overflow in the ceiling division below.
	if cut <= 0 || cut > total {
		cut = total
	}

	batches := 0
	if total > 0 {
		batches = (total + cut - 1) / cut // ceil(total / cut)
	}

	batchChan := make(chan []map[string]string, batches)
	for start := 0; start < total; start += cut {
		end := start + cut
		if end > total {
			end = total
		}
		batchChan <- data[start:end]
	}
	return batchChan
}
|
|
|
|
func saveData(dataChan chan []map[string]string, set *Set, title []string, dir string) {
|
|
var (
|
|
wg sync.WaitGroup
|
|
current int64
|
|
)
|
|
|
|
lent := len(dataChan)
|
|
wg.Add(lent)
|
|
for v := range dataChan {
|
|
go func() {
|
|
defer wg.Done()
|
|
defer func() {
|
|
if e := recover(); e != nil {
|
|
warning(fmt.Sprintf("协程弹出"))
|
|
}
|
|
}()
|
|
defer func() {
|
|
atomic.AddInt64(¤t, 1)
|
|
log("当前进度:%d/%d", current, lent)
|
|
if current == int64(lent) {
|
|
close(dataChan)
|
|
}
|
|
}()
|
|
|
|
var (
|
|
err error
|
|
query strings.Builder
|
|
)
|
|
|
|
switch set.Op {
|
|
case Add:
|
|
query.WriteString(fmt.Sprintf("INSERT INTO %s (`%s`) VALUES ", c.Table, strings.Join(title, "`,`")))
|
|
is_start := true
|
|
for _, item := range v {
|
|
var dataRaw strings.Builder
|
|
if !is_start {
|
|
dataRaw.WriteString(",")
|
|
} else {
|
|
is_start = false
|
|
}
|
|
dataRaw.WriteString(" (")
|
|
for _, t := range title {
|
|
if strings.Contains(item[t], "'") {
|
|
item[t] = strings.ReplaceAll(item[t], "'", "`")
|
|
}
|
|
if strings.Contains(item[t], `"`) {
|
|
item[t] = strings.ReplaceAll(item[t], `"`, "`")
|
|
}
|
|
dataRaw.WriteString(fmt.Sprintf("'%s',", item[t]))
|
|
}
|
|
str := dataRaw.String()
|
|
str = str[:len(str)-1]
|
|
query.WriteString(str + ")")
|
|
}
|
|
query.WriteString(";")
|
|
raw := query.String()
|
|
result := db.Exec(raw)
|
|
if result.Error != nil {
|
|
warning(fmt.Sprintf("failed to insert user: %v", result.Error))
|
|
return
|
|
}
|
|
if err != nil {
|
|
warning("数据保存失败:%v", err)
|
|
return
|
|
}
|
|
|
|
case OverWrite:
|
|
for _, item := range v {
|
|
var dataRaw strings.Builder
|
|
is_start := true
|
|
dataRaw.WriteString(fmt.Sprintf("SELECT %s From %s WHERE ", strings.Join(set.OverWriteJudColumns, "`,`"), c.Table))
|
|
for _, t := range set.OverWriteJudColumns {
|
|
if strings.Contains(item[t], "'") {
|
|
item[t] = strings.ReplaceAll(item[t], "'", "`")
|
|
}
|
|
if strings.Contains(item[t], `"`) {
|
|
item[t] = strings.ReplaceAll(item[t], `"`, "`")
|
|
}
|
|
if !is_start {
|
|
dataRaw.WriteString(fmt.Sprintf("'%s'=%s", t, item[t]))
|
|
}
|
|
dataRaw.WriteString(fmt.Sprintf("AND '%s'=%s", item[t]))
|
|
}
|
|
raw := dataRaw.String()
|
|
result := db.Exec(raw)
|
|
if result.Error != nil {
|
|
warning(fmt.Sprintf(" 查询失败: %v", result.Error))
|
|
return
|
|
}
|
|
|
|
}
|
|
default:
|
|
query.WriteString(";")
|
|
}
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
success("导入完毕.....")
|
|
finish(dir)
|
|
}
|