Compare commits

...

3 Commits

Author SHA1 Message Date
renzhiyuan f168ead65d 11 2025-05-27 18:30:05 +08:00
renzhiyuan 1d029e0234 Merge branch 'master' of https://gitea.cdlsxd.cn/self-tools/import_csv_datacenter 2025-05-27 16:41:45 +08:00
renzhiyuan a7bd12a344 11 2025-05-27 16:40:54 +08:00
5 changed files with 180 additions and 61 deletions

51
csv.go
View File

@ -10,11 +10,22 @@ import (
"strings"
)
func getCsvInfo(dir string) (dataMap []map[string]string, title []string) {
func getCsvInfo(dir string) (dataMap []map[string]string, title []string, files []string) {
fp := fmt.Sprintf("%s/%s/", dir, "csvfile")
if _, err := os.Stat(fp); os.IsNotExist(err) {
// 文件夹不存在,创建它
err := os.Mkdir(fp, 0755) // 0755 是常见的目录权限
if err != nil {
fatal("无法创建目录 %s: %v", fp, err)
}
}
entries, err := os.ReadDir(fp)
if err != nil {
panic("当前目录下未找到csvfile文件夹请自行创建")
fatal("获取文件目录失败 %s: %v", fp, err)
}
if len(entries) == 0 {
warning("请将需导入文件放入%s目录下", fp)
exit()
}
var csvFiles []string
for _, entry := range entries {
@ -25,18 +36,46 @@ func getCsvInfo(dir string) (dataMap []map[string]string, title []string) {
for {
prompt := promptui.Select{
Label: "请选择需要导入的csv文件: ",
Items: csvFiles,
Size: len(csvFiles),
Items: append([]string{"全选", "导入非`已导入_`开头的文件"}, csvFiles...),
Size: len(csvFiles) + 2,
}
_, f, err := prompt.Run()
if err != nil {
warning("Prompt failed %v\n", err)
continue
}
fp = fmt.Sprintf("%s/%s", fp, f)
f = strings.TrimSpace(f)
switch f {
case "全选":
for _, s := range csvFiles {
files = append(files, fmt.Sprintf("%s%s", fp, s))
}
case "导入非`已导入_`开头的文件":
for _, s := range csvFiles {
if !strings.HasPrefix(s, "已导入_") {
files = append(files, fmt.Sprintf("%s%s", fp, s))
}
}
default:
files = append(files, fmt.Sprintf("%s%s", fp, f))
}
break
}
return ReadCsvData(fp)
if len(files) == 0 {
warning("未找到符合条件的文件")
finish(dir)
}
for _, v := range files {
data, fileTitle := ReadCsvData(v)
if title == nil {
title = fileTitle
}
dataMap = append(dataMap, data...)
}
return dataMap, title, files
}
func ReadCsvData(fp string) (dataMap []map[string]string, title []string) {

155
func.go
View File

@ -2,6 +2,7 @@ package main
import (
"fmt"
"strings"
"sync"
"sync/atomic"
@ -22,13 +23,15 @@ func cutData(data []map[string]string, cut int) chan []map[string]string {
func saveData(dataChan chan []map[string]string, set *Set, title []string, dir string) {
var (
wg sync.WaitGroup
current int64
wg sync.WaitGroup
current int64
errCount atomic.Int32
)
lent := len(dataChan)
wg.Add(lent)
for v := range dataChan {
v := v
go func() {
defer wg.Done()
defer func() {
@ -44,53 +47,23 @@ func saveData(dataChan chan []map[string]string, set *Set, title []string, dir s
}
}()
var (
err error
query strings.Builder
)
switch set.Op {
case Add:
query.WriteString(fmt.Sprintf("INSERT INTO %s (`%s`) VALUES ", c.Table, strings.Join(title, "`,`")))
is_start := true
for _, item := range v {
var dataRaw strings.Builder
if !is_start {
dataRaw.WriteString(",")
} else {
is_start = false
}
dataRaw.WriteString(" (")
for _, t := range title {
if strings.Contains(item[t], "'") {
item[t] = strings.ReplaceAll(item[t], "'", "`")
}
if strings.Contains(item[t], `"`) {
item[t] = strings.ReplaceAll(item[t], `"`, "`")
}
dataRaw.WriteString(fmt.Sprintf("'%s',", item[t]))
}
str := dataRaw.String()
str = str[:len(str)-1]
query.WriteString(str + ")")
}
query.WriteString(";")
raw := query.String()
result := db.Exec(raw)
if result.Error != nil {
warning(fmt.Sprintf("failed to insert user: %v", result.Error))
return
}
if err != nil {
warning("数据保存失败:%v", err)
return
}
add(title, v, &errCount)
case OverWrite:
table, err := db.DB()
if err != nil {
warning(err.Error())
}
defer table.Close()
for _, item := range v {
var dataRaw strings.Builder
var (
dataRaw strings.Builder
judge strings.Builder
)
is_start := true
dataRaw.WriteString(fmt.Sprintf("SELECT %s From %s WHERE ", strings.Join(title, "`,`"), c.Table))
dataRaw.WriteString(fmt.Sprintf("SELECT %s From %s WHERE ", fmt.Sprintf("`%s`", strings.Join(set.OverWriteJudColumns, "`,`")), c.Table))
for _, t := range set.OverWriteJudColumns {
if strings.Contains(item[t], "'") {
item[t] = strings.ReplaceAll(item[t], "'", "`")
@ -98,18 +71,102 @@ func saveData(dataChan chan []map[string]string, set *Set, title []string, dir s
if strings.Contains(item[t], `"`) {
item[t] = strings.ReplaceAll(item[t], `"`, "`")
}
if !is_start {
dataRaw.WriteString(fmt.Sprintf("'%s',", item[t]))
if is_start {
judge.WriteString(fmt.Sprintf("%s=%s", t, item[t]))
is_start = false
continue
}
dataRaw.WriteString(fmt.Sprintf("'%s',", item[t]))
judge.WriteString(fmt.Sprintf(" AND %s=%s", t, item[t]))
}
dataRaw.WriteString(judge.String())
dataRaw.WriteString(" LIMIT 1")
raw := dataRaw.String()
result, _err := table.Query(raw)
defer result.Close()
if _err != nil {
warning(fmt.Sprintf(" 查询失败: %v", _err))
return
}
var adds []map[string]string
if result.Next() {
var (
updateRaw strings.Builder
updateStartLock = false
)
updateRaw.WriteString(fmt.Sprintf("Update %s Set ", c.Table))
for key, value := range item {
if !updateStartLock {
updateRaw.WriteString(fmt.Sprintf(" %s=%s", key, value))
}
updateRaw.WriteString(fmt.Sprintf(",%s=%s", key, value))
}
updateRaw.WriteString(judge.String())
update := updateRaw.String()
result := db.Exec(update)
if result.Error != nil {
errCount.Add(1)
warning(fmt.Sprintf("failed to insert user: %v", result.Error))
return
}
} else {
adds = append(adds, item)
}
if len(adds) > 0 {
add(title, adds, &errCount)
}
}
default:
query.WriteString(";")
}
}()
}
wg.Wait()
success("导入完毕.....")
success("导入完毕.....失败:%d", errCount.Load())
finish(dir)
}
func add(title []string, v []map[string]string, errCount *atomic.Int32) {
var (
err error
query strings.Builder
)
query.WriteString(fmt.Sprintf("INSERT INTO %s (`%s`) VALUES ", c.Table, strings.Join(title, "`,`")))
is_start := true
for _, item := range v {
var dataRaw strings.Builder
if !is_start {
dataRaw.WriteString(",")
} else {
is_start = false
}
dataRaw.WriteString(" (")
for _, t := range title {
if strings.Contains(item[t], "'") {
item[t] = strings.ReplaceAll(item[t], "'", "`")
}
if strings.Contains(item[t], `"`) {
item[t] = strings.ReplaceAll(item[t], `"`, "`")
}
dataRaw.WriteString(fmt.Sprintf("'%s',", item[t]))
}
str := dataRaw.String()
str = str[:len(str)-1]
query.WriteString(str + ")")
}
query.WriteString(";")
raw := query.String()
result := db.Exec(raw)
if result.Error != nil {
errCount.Add(1)
warning(fmt.Sprintf("failed to insert user: %v", result.Error))
return
}
if err != nil {
errCount.Add(1)
warning("数据保存失败:%v", err)
return
}
}

View File

@ -1 +0,0 @@
s1QM2t7N1NQtPksWJpIjU5swReLzUGryqueba8bh/Yj0YQB0zMpxxT/kwjE09zq2ZBcC4TvUr4SHL6WbcT3OT991BATzhZj5s1LJ9oo1ZWFObSZUHhGYiUJ+y92ynZYGMP2kHMrrfNgHkGVpjvcUo8mIbDIJAW6m9XUSYBwdYUZoecXe35g8F71l11ipDdfMDWmOH3DZzy9tzBM=

30
main.go
View File

@ -7,6 +7,7 @@ import (
"gorm.io/gorm"
"os"
"strings"
"time"
)
var (
@ -36,6 +37,8 @@ var (
db *gorm.DB
)
var Cut int = 1
func main() {
log("正在读取配置。。。")
dir, err := os.Getwd()
@ -54,10 +57,11 @@ func main() {
}
func do(dir string) {
csvData, title := getCsvInfo(dir)
csvData, title, files := getCsvInfo(dir)
set := setOp(title)
cutChannel := cutData(csvData, 100)
cutChannel := cutData(csvData, Cut)
saveData(cutChannel, set, title, dir)
overFiles(files)
finish(dir)
select {}
}
@ -141,9 +145,29 @@ func cConf(fp string) *Conf {
}
func exit() {
os.Exit(1)
waitTime := 3
warning("程序即将再%d秒后退出", waitTime)
for {
if waitTime == 0 {
os.Exit(1)
}
time.Sleep(1 * time.Second)
warning("%d", waitTime)
waitTime--
}
}
func finish(dir string) {
do(dir)
}
func overFiles(files []string) {
for _, v := range files {
err := os.Rename(v, fmt.Sprintf("已导入_%s", v))
if err != nil {
warning("重命名文件时出错: %v\n", err)
exit()
}
}
}

4
op.go
View File

@ -12,7 +12,7 @@ const (
)
var opIndex = map[string]opType{
"新增": Add,
"新增": Add,
"覆盖(有则改,无则增)": OverWrite,
}
@ -26,7 +26,7 @@ func setOp(title []string) *Set {
for {
prompt := promptui.Select{
Label: "请选择需要执行的操作: ",
Items: []string{"新增"}, //, "覆盖(有则改,无则增)"
Items: []string{"新增"}, // "覆盖(有则改,无则增)"
}
_, r, err := prompt.Run()
if err != nil {