From f168ead65d8e7721b743cc18829d3d0eda5d12e4 Mon Sep 17 00:00:00 2001 From: renzhiyuan <465386466@qq.com> Date: Tue, 27 May 2025 18:30:05 +0800 Subject: [PATCH] 11 --- csv.go | 15 +++-- func.go | 154 ++++++++++++++++++++++++++++++++---------------- import_csv_data | 1 - main.go | 4 +- op.go | 4 +- 5 files changed, 116 insertions(+), 62 deletions(-) delete mode 100644 import_csv_data diff --git a/csv.go b/csv.go index 4f7a2a4..2b28d44 100644 --- a/csv.go +++ b/csv.go @@ -37,26 +37,29 @@ func getCsvInfo(dir string) (dataMap []map[string]string, title []string, files prompt := promptui.Select{ Label: "请选择需要导入的csv文件: ", Items: append([]string{"全选", "导入非`已导入_`开头的文件"}, csvFiles...), - Size: len(csvFiles), + Size: len(csvFiles) + 2, } _, f, err := prompt.Run() if err != nil { warning("Prompt failed %v\n", err) continue } + f = strings.TrimSpace(f) - fp = fmt.Sprintf("%s/%s", fp, f) - switch fp { + switch f { case "全选": - files = csvFiles + for _, s := range csvFiles { + files = append(files, fmt.Sprintf("%s%s", fp, s)) + } + case "导入非`已导入_`开头的文件": for _, s := range csvFiles { if !strings.HasPrefix(s, "已导入_") { - files = append(files, s) + files = append(files, fmt.Sprintf("%s%s", fp, s)) } } default: - files = append(files, fp) + files = append(files, fmt.Sprintf("%s%s", fp, f)) } break } diff --git a/func.go b/func.go index 6ec9ed2..ee39869 100644 --- a/func.go +++ b/func.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "strings" "sync" "sync/atomic" @@ -22,13 +23,15 @@ func cutData(data []map[string]string, cut int) chan []map[string]string { func saveData(dataChan chan []map[string]string, set *Set, title []string, dir string) { var ( - wg sync.WaitGroup - current int64 + wg sync.WaitGroup + current int64 + errCount atomic.Int32 ) lent := len(dataChan) wg.Add(lent) for v := range dataChan { + v := v go func() { defer wg.Done() defer func() { @@ -44,53 +47,23 @@ func saveData(dataChan chan []map[string]string, set *Set, title []string, dir s } }() - var ( - err error - query strings.Builder - ) - switch set.Op { case Add: - query.WriteString(fmt.Sprintf("INSERT INTO %s (`%s`) VALUES ", c.Table, strings.Join(title, "`,`"))) - is_start := true - for _, item := range v { - var dataRaw strings.Builder - if !is_start { - dataRaw.WriteString(",") - } else { - is_start = false - } - dataRaw.WriteString(" (") - for _, t := range title { - if strings.Contains(item[t], "'") { - item[t] = strings.ReplaceAll(item[t], "'", "`") - } - if strings.Contains(item[t], `"`) { - item[t] = strings.ReplaceAll(item[t], `"`, "`") - } - dataRaw.WriteString(fmt.Sprintf("'%s',", item[t])) - } - str := dataRaw.String() - str = str[:len(str)-1] - query.WriteString(str + ")") - } - query.WriteString(";") - raw := query.String() - result := db.Exec(raw) - if result.Error != nil { - warning(fmt.Sprintf("failed to insert user: %v", result.Error)) - return - } - if err != nil { - warning("数据保存失败:%v", err) - return - } - + add(title, v, &errCount) case OverWrite: + table, err := db.DB() + if err != nil { + warning(err.Error()) + } + defer table.Close() for _, item := range v { - var dataRaw strings.Builder + var ( + dataRaw strings.Builder + judge strings.Builder + ) + is_start := true - dataRaw.WriteString(fmt.Sprintf("SELECT %s From %s WHERE ", strings.Join(set.OverWriteJudColumns, "`,`"), c.Table)) + dataRaw.WriteString(fmt.Sprintf("SELECT %s From %s WHERE ", fmt.Sprintf("`%s`", strings.Join(set.OverWriteJudColumns, "`,`")), c.Table)) for _, t := range set.OverWriteJudColumns { if strings.Contains(item[t], "'") { item[t] = strings.ReplaceAll(item[t], "'", "`") @@ -98,25 +71,102 @@ func saveData(dataChan chan []map[string]string, set *Set, title []string, dir s if strings.Contains(item[t], `"`) { item[t] = strings.ReplaceAll(item[t], `"`, "`") } - if !is_start { - dataRaw.WriteString(fmt.Sprintf("'%s'=%s", t, item[t])) + if is_start { + judge.WriteString(fmt.Sprintf("%s=%s", t, item[t])) + is_start = false + continue } - dataRaw.WriteString(fmt.Sprintf("AND '%s'=%s", item[t])) + judge.WriteString(fmt.Sprintf(" AND %s=%s", t, item[t])) } + dataRaw.WriteString(judge.String()) + dataRaw.WriteString(" LIMIT 1") raw := dataRaw.String() - result := db.Exec(raw) - if result.Error != nil { - warning(fmt.Sprintf(" 查询失败: %v", result.Error)) + + result, _err := table.Query(raw) + defer result.Close() + if _err != nil { + warning(fmt.Sprintf(" 查询失败: %v", _err)) return } + var adds []map[string]string + if result.Next() { + var ( + updateRaw strings.Builder + updateStartLock = false + ) + updateRaw.WriteString(fmt.Sprintf("Update %s Set ", c.Table)) + for key, value := range item { + if !updateStartLock { + updateRaw.WriteString(fmt.Sprintf(" %s=%s", key, value)) + } + updateRaw.WriteString(fmt.Sprintf(",%s=%s", key, value)) + } + updateRaw.WriteString(judge.String()) + update := updateRaw.String() + result := db.Exec(update) + if result.Error != nil { + errCount.Add(1) + warning(fmt.Sprintf("failed to insert user: %v", result.Error)) + return + } + } else { + adds = append(adds, item) + } + if len(adds) > 0 { + add(title, adds, &errCount) + } } default: - query.WriteString(";") + } }() } wg.Wait() - success("导入完毕.....") + success("导入完毕.....失败:%d", errCount.Load()) + finish(dir) } + +func add(title []string, v []map[string]string, errCount *atomic.Int32) { + var ( + err error + query strings.Builder + ) + query.WriteString(fmt.Sprintf("INSERT INTO %s (`%s`) VALUES ", c.Table, strings.Join(title, "`,`"))) + is_start := true + for _, item := range v { + var dataRaw strings.Builder + if !is_start { + dataRaw.WriteString(",") + } else { + is_start = false + } + dataRaw.WriteString(" (") + for _, t := range title { + if strings.Contains(item[t], "'") { + item[t] = strings.ReplaceAll(item[t], "'", "`") + } + if strings.Contains(item[t], `"`) { + item[t] = strings.ReplaceAll(item[t], `"`, "`") + } + dataRaw.WriteString(fmt.Sprintf("'%s',", item[t])) + } + str := dataRaw.String() + str = str[:len(str)-1] + query.WriteString(str + ")") + } + query.WriteString(";") + raw := query.String() + result := db.Exec(raw) + if result.Error != nil { + errCount.Add(1) + warning(fmt.Sprintf("failed to insert user: %v", result.Error)) + return + } + if err != nil { + errCount.Add(1) + warning("数据保存失败:%v", err) + return + } +} diff --git a/import_csv_data b/import_csv_data deleted file mode 100644 index ac6f3ce..0000000 --- a/import_csv_data +++ /dev/null @@ -1 +0,0 @@ -s1QM2t7N1NQtPksWJpIjU5swReLzUGryqueba8bh/Yj0YQB0zMpxxT/kwjE09zq2ZBcC4TvUr4SHL6WbcT3OT991BATzhZj5s1LJ9oo1ZWFObSZUHhGYiUJ+y92ynZYGMP2kHMrrfNgHkGVpjvcUo8mIbDIJAW6m9XUSYBwdYUZoecXe35g8F71l11ipDdfMDWmOH3DZzy9tzBM= \ No newline at end of file diff --git a/main.go b/main.go index fbe34bb..3eb2808 100644 --- a/main.go +++ b/main.go @@ -37,6 +37,8 @@ var ( db *gorm.DB ) +var Cut int = 1 + func main() { log("正在读取配置。。。") dir, err := os.Getwd() @@ -57,7 +59,7 @@ func main() { func do(dir string) { csvData, title, files := getCsvInfo(dir) set := setOp(title) - cutChannel := cutData(csvData, 1) + cutChannel := cutData(csvData, Cut) saveData(cutChannel, set, title, dir) overFiles(files) finish(dir) diff --git a/op.go b/op.go index aa9549f..ff8f29b 100644 --- a/op.go +++ b/op.go @@ -12,7 +12,7 @@ const ( ) var opIndex = map[string]opType{ - "新增": Add, + "新增": Add, "覆盖(有则改,无则增)": OverWrite, } @@ -26,7 +26,7 @@ func setOp(title []string) *Set { for { prompt := promptui.Select{ Label: "请选择需要执行的操作: ", - Items: []string{"新增"}, //, "覆盖(有则改,无则增)" + Items: []string{"新增"}, // "覆盖(有则改,无则增)" } _, r, err := prompt.Run() if err != nil {