libf 3 months ago
parent
commit
2463e63acb
8 changed files with 127 additions and 72 deletions
  1. 2 2
      cgimport.conf
  2. 1 1
      go.mod
  3. 2 2
      go.sum
  4. 6 9
      importer/cgistatus.go
  5. 47 25
      importer/importer.go
  6. 57 25
      importer/odbcimporter.go
  7. 4 4
      main.go
  8. 8 4
      odbc/odbclient.go

+ 2 - 2
cgimport.conf

@@ -1,8 +1,8 @@
 
 
 [cgi]
 [cgi]
 datapath=
 datapath=
-parallel=2
+parallel=20
 
 
 [odbc]
 [odbc]
-concurrent.limit=2
+concurrent.limit=20
 
 

+ 1 - 1
go.mod

@@ -7,7 +7,7 @@ require (
 	github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da
 	github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da
 	github.com/scylladb/go-set v1.0.2
 	github.com/scylladb/go-set v1.0.2
 	github.com/spf13/cast v1.7.1
 	github.com/spf13/cast v1.7.1
-	github.com/wecisecode/util v0.0.3
+	github.com/wecisecode/util v0.0.4
 	modernc.org/sqlite v1.35.0
 	modernc.org/sqlite v1.35.0
 )
 )
 
 

+ 2 - 2
go.sum

@@ -81,8 +81,8 @@ github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IU
 github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
 github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
 github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
 github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
 github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
 github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
-github.com/wecisecode/util v0.0.3 h1:Vozn8IxWTYMjBKcyhi0yTVlUD/E2TCrnu908Hig8Kfk=
-github.com/wecisecode/util v0.0.3/go.mod h1:3uYUTKKfdmgudEAYCam/38H1LtHTxnD7ly9kGf0Tzp0=
+github.com/wecisecode/util v0.0.4 h1:z51nlZdMQynhSmw1l9/oVFGRxPuQY2FsGwUNPbxIEsQ=
+github.com/wecisecode/util v0.0.4/go.mod h1:3uYUTKKfdmgudEAYCam/38H1LtHTxnD7ly9kGf0Tzp0=
 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 go.etcd.io/etcd/api/v3 v3.5.18 h1:Q4oDAKnmwqTo5lafvB+afbgCDF7E35E4EYV2g+FNGhs=
 go.etcd.io/etcd/api/v3 v3.5.18 h1:Q4oDAKnmwqTo5lafvB+afbgCDF7E35E4EYV2g+FNGhs=

+ 6 - 9
importer/cgistatus.go

@@ -3,17 +3,18 @@ package importer
 import (
 import (
 	"encoding/json"
 	"encoding/json"
 	"os"
 	"os"
-	"path/filepath"
 	"sync"
 	"sync"
 	"time"
 	"time"
 
 
 	"git.wecise.com/wecise/cgimport/odbc"
 	"git.wecise.com/wecise/cgimport/odbc"
+	"github.com/wecisecode/util/mio"
 	"github.com/wecisecode/util/rc"
 	"github.com/wecisecode/util/rc"
 )
 )
 
 
 type ImportStatus struct {
 type ImportStatus struct {
 	LinesCount   int64
 	LinesCount   int64
 	RecordsCount int64
 	RecordsCount int64
+	RetryCount   int64
 }
 }
 
 
 type CGIStatus struct {
 type CGIStatus struct {
@@ -31,7 +32,7 @@ type CGIStatus struct {
 
 
 func NewCGIStatus() *CGIStatus {
 func NewCGIStatus() *CGIStatus {
 	return &CGIStatus{
 	return &CGIStatus{
-		filepath:     mcfg.GetString("cgi.statusfile", "/opt/matrix/var/cgimport/"+odbc.Keyspace+".status.txt"),
+		filepath:     mcfg.GetString("cgi.statusfile", "/opt/matrix/var/cgimport/"+odbc.Keyspace()+".status.txt"),
 		ImportStatus: map[string]*ImportStatus{},
 		ImportStatus: map[string]*ImportStatus{},
 		rc:           rc.NewRoutinesController("", 1),
 		rc:           rc.NewRoutinesController("", 1),
 		waitdone:     make(chan any, 1),
 		waitdone:     make(chan any, 1),
@@ -39,7 +40,8 @@ func NewCGIStatus() *CGIStatus {
 }
 }
 
 
 func (cgistatus *CGIStatus) Load() error {
 func (cgistatus *CGIStatus) Load() error {
-	cgistatusbs, e := os.ReadFile(cgistatus.filepath)
+	logger.Info("load progress from", cgistatus.filepath)
+	cgistatusbs, e := mio.ReadFile(cgistatus.filepath)
 	if e != nil && !os.IsNotExist(e) {
 	if e != nil && !os.IsNotExist(e) {
 		return e
 		return e
 	}
 	}
@@ -78,12 +80,7 @@ func (cgistatus *CGIStatus) Save() (err error) {
 			cgistatus.lasterror = e
 			cgistatus.lasterror = e
 			return
 			return
 		}
 		}
-		e = os.MkdirAll(filepath.Dir(cgistatus.filepath), os.ModePerm)
-		if e != nil {
-			cgistatus.lasterror = e
-			return
-		}
-		e = os.WriteFile(cgistatus.filepath, cgistatusbs, os.ModePerm)
+		e = mio.WriteFile(cgistatus.filepath, cgistatusbs, true)
 		if e != nil {
 		if e != nil {
 			cgistatus.lasterror = e
 			cgistatus.lasterror = e
 			return
 			return

+ 47 - 25
importer/importer.go

@@ -37,7 +37,7 @@ type Importer struct {
 	currentstarttime time.Time
 	currentstarttime time.Time
 }
 }
 
 
-func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilescount, totallinecount, totalrecordscount int64, totalusetime time.Duration, filescount, linescount, recordscount int64, usetime time.Duration, err error) {
+func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration, filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
 	concurlimt := mcfg.GetInt("odbc.concurrent.limit", 100)
 	concurlimt := mcfg.GetInt("odbc.concurrent.limit", 100)
 	importer := &Importer{
 	importer := &Importer{
 		datapath:     datapath,
 		datapath:     datapath,
@@ -52,7 +52,7 @@ func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilesc
 	return importer.Import()
 	return importer.Import()
 }
 }
 
 
-func (importer *Importer) Import() (totalfilescount, totallinecount, totalrecordscount int64, totalusetime time.Duration, filescount, linescount, recordscount int64, usetime time.Duration, err error) {
+func (importer *Importer) Import() (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration, filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
 	if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload {
 	if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload {
 		// reload
 		// reload
 		err = importer.importstatus.Load()
 		err = importer.importstatus.Load()
@@ -77,17 +77,20 @@ func (importer *Importer) Import() (totalfilescount, totallinecount, totalrecord
 	if err != nil {
 	if err != nil {
 		return
 		return
 	}
 	}
+	//
+	logger.Info("graph data import start")
+
 	totalusetime = importer.importstatus.TotalUseTime
 	totalusetime = importer.importstatus.TotalUseTime
 	importer.starttime = time.Now().Add(-totalusetime)
 	importer.starttime = time.Now().Add(-totalusetime)
 	importer.currentstarttime = time.Now()
 	importer.currentstarttime = time.Now()
 
 
 	reedgefile := regexp.MustCompile("(?i).*edge.*.csv")
 	reedgefile := regexp.MustCompile("(?i).*edge.*.csv")
-	efc, elc, erc, ut, e := importer.ImportEdgeFiles(reedgefile, false)
+	efc, elc, erc, etc, ut, e := importer.ImportEdgeFiles(reedgefile, false)
 	if e != nil {
 	if e != nil {
 		err = e
 		err = e
 		return
 		return
 	}
 	}
-	afc, alc, arc, ut, e := importer.ImportNonEdgeFiles(reedgefile, true)
+	afc, alc, arc, atc, ut, e := importer.ImportNonEdgeFiles(reedgefile, true)
 	if e != nil {
 	if e != nil {
 		err = e
 		err = e
 		return
 		return
@@ -96,12 +99,15 @@ func (importer *Importer) Import() (totalfilescount, totallinecount, totalrecord
 	for _, v := range importer.importstatus.ImportStatus {
 	for _, v := range importer.importstatus.ImportStatus {
 		totallinecount += v.LinesCount
 		totallinecount += v.LinesCount
 		totalrecordscount += v.RecordsCount
 		totalrecordscount += v.RecordsCount
+		totalretrycount += v.RetryCount
 	}
 	}
 	totallinecount += elc
 	totallinecount += elc
 	totalrecordscount += erc
 	totalrecordscount += erc
+	totalretrycount += etc
 	filescount = afc + efc
 	filescount = afc + efc
 	linescount = alc + elc
 	linescount = alc + elc
 	recordscount = arc + erc
 	recordscount = arc + erc
+	retrycount = atc + etc
 	usetime = ut
 	usetime = ut
 	totalusetime = importer.importstatus.TotalUseTime
 	totalusetime = importer.importstatus.TotalUseTime
 
 
@@ -110,7 +116,7 @@ func (importer *Importer) Import() (totalfilescount, totallinecount, totalrecord
 	return
 	return
 }
 }
 
 
-func (importer *Importer) ImportEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount int64, usetime time.Duration, err error) {
+func (importer *Importer) ImportEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount, retrycount int64, usetime time.Duration, err error) {
 	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
 	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
 		if !reedgefile.MatchString(filepath.Base(fpath)) {
 		if !reedgefile.MatchString(filepath.Base(fpath)) {
 			// 忽略非EDGE文件
 			// 忽略非EDGE文件
@@ -120,7 +126,7 @@ func (importer *Importer) ImportEdgeFiles(reedgefile *regexp.Regexp, logstatus b
 	}, logstatus)
 	}, logstatus)
 }
 }
 
 
-func (importer *Importer) ImportNonEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount int64, usetime time.Duration, err error) {
+func (importer *Importer) ImportNonEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount, retrycount int64, usetime time.Duration, err error) {
 	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
 	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
 		if reedgefile.MatchString(filepath.Base(fpath)) {
 		if reedgefile.MatchString(filepath.Base(fpath)) {
 			// 忽略EDGE文件
 			// 忽略EDGE文件
@@ -138,7 +144,7 @@ const (
 	FWOP_CONTINUE
 	FWOP_CONTINUE
 )
 )
 
 
-func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP, logstatus bool) (filescount, linescount, recordscount int64, usetime time.Duration, err error) {
+func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP, logstatus bool) (filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
 	// 遍历文件目录
 	// 遍历文件目录
 	var wg sync.WaitGroup
 	var wg sync.WaitGroup
 	fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
 	fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
@@ -177,32 +183,40 @@ func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FW
 				importstatus := importer.importstatus.ImportStatus[filename]
 				importstatus := importer.importstatus.ImportStatus[filename]
 				importer.importstatus.mutex.RUnlock()
 				importer.importstatus.mutex.RUnlock()
 				linefrom, blockfrom := int64(0), int64(0)
 				linefrom, blockfrom := int64(0), int64(0)
+				totalretrycount := int64(0)
 				if importstatus != nil {
 				if importstatus != nil {
-					linefrom, blockfrom = importstatus.LinesCount, importstatus.RecordsCount
+					linefrom, blockfrom, totalretrycount = importstatus.LinesCount, importstatus.RecordsCount, importstatus.RetryCount
 				}
 				}
 				if linefrom == 0 {
 				if linefrom == 0 {
 					logger.Info("import", "file", filename)
 					logger.Info("import", "file", filename)
 				} else {
 				} else {
 					logger.Info("import", "file", filename, "from line", linefrom)
 					logger.Info("import", "file", filename, "from line", linefrom)
 				}
 				}
-				lines, records, e := importer.ImportFile(filename, linefrom, blockfrom, logstatus)
+				lines, records, retries, e := importer.ImportFile(filename, linefrom, blockfrom, totalretrycount, logstatus)
 				if e != nil {
 				if e != nil {
 					err = e
 					err = e
 					return
 					return
 				}
 				}
 				atomic.AddInt64(&linescount, lines-linefrom)
 				atomic.AddInt64(&linescount, lines-linefrom)
 				atomic.AddInt64(&recordscount, records-blockfrom)
 				atomic.AddInt64(&recordscount, records-blockfrom)
+				atomic.AddInt64(&retrycount, retries-totalretrycount)
 				usetime = time.Since(importer.currentstarttime)
 				usetime = time.Since(importer.currentstarttime)
 				importer.importstatus.mutex.Lock()
 				importer.importstatus.mutex.Lock()
 				if logstatus {
 				if logstatus {
 					importer.importstatus.ImportStatus[filename] = &ImportStatus{
 					importer.importstatus.ImportStatus[filename] = &ImportStatus{
 						LinesCount:   lines,
 						LinesCount:   lines,
-						RecordsCount: records}
+						RecordsCount: records,
+						RetryCount:   retries,
+					}
 				}
 				}
 				importer.importstatus.TotalUseTime = time.Since(importer.starttime)
 				importer.importstatus.TotalUseTime = time.Since(importer.starttime)
 				importer.importstatus.mutex.Unlock()
 				importer.importstatus.mutex.Unlock()
 				importer.importstatus.Save()
 				importer.importstatus.Save()
-				logger.Info("file", filename, "imported", records, "records")
+				if totalretrycount > 0 {
+					logger.Info("file", filename, "total imported", lines, "lines", records, "records", retries, "retry times")
+				} else {
+					logger.Info("file", filename, "total imported", lines, "lines", records, "records")
+				}
 			},
 			},
 		)
 		)
 		return true
 		return true
@@ -219,16 +233,16 @@ func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FW
 	return
 	return
 }
 }
 
 
-func (importer *Importer) ImportFile(filepath string, linefrom, blockfrom int64, logstatus bool) (linecount, blockcount int64, err error) {
+func (importer *Importer) ImportFile(filepath string, linefrom, blockfrom, totalretrycount int64, logstatus bool) (linecount, blockcount, retrycount int64, err error) {
 	f, e := os.Open(filepath)
 	f, e := os.Open(filepath)
 	if e != nil {
 	if e != nil {
-		return linecount, blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
+		return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
 	}
 	}
 	defer f.Close()
 	defer f.Close()
-	return importer.importReader(filepath, f, linefrom, blockfrom, logstatus)
+	return importer.importReader(filepath, f, linefrom, blockfrom, totalretrycount, logstatus)
 }
 }
 
 
-func (importer *Importer) importReader(filename string, buf io.Reader, linefrom, blockfrom int64, logstatus bool) (linecount, blockcount int64, err error) {
+func (importer *Importer) importReader(filename string, buf io.Reader, linefrom, blockfrom, totalretrycount int64, logstatus bool) (linecount, blockcount, retrycount int64, err error) {
 	var filetype schema.FileType
 	var filetype schema.FileType
 	switch {
 	switch {
 	case strings.Contains(filename, "_L1_"):
 	case strings.Contains(filename, "_L1_"):
@@ -257,7 +271,7 @@ func (importer *Importer) importReader(filename string, buf io.Reader, linefrom,
 	}
 	}
 	br, e := reader.NewBlockReader(filename, filetype, buf)
 	br, e := reader.NewBlockReader(filename, filetype, buf)
 	if e != nil {
 	if e != nil {
-		return linecount, blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
+		return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
 	}
 	}
 	lastlogtime := time.Now()
 	lastlogtime := time.Now()
 	skiplines := int(linefrom)
 	skiplines := int(linefrom)
@@ -268,12 +282,12 @@ func (importer *Importer) importReader(filename string, buf io.Reader, linefrom,
 	defer wg.Wait()
 	defer wg.Wait()
 	for {
 	for {
 		if err != nil {
 		if err != nil {
-			break
+			return
 		}
 		}
 		block, line, linenumber, e := br.ReadBlock(skiplines)
 		block, line, linenumber, e := br.ReadBlock(skiplines)
 		linecount = int64(linenumber)
 		linecount = int64(linenumber)
 		if e != nil {
 		if e != nil {
-			return linecount, blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
+			return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
 		}
 		}
 		if block == nil {
 		if block == nil {
 			return
 			return
@@ -287,7 +301,11 @@ func (importer *Importer) importReader(filename string, buf io.Reader, linefrom,
 		}
 		}
 		e = importer.odbcqueryrc.ConcurCall(1, func() {
 		e = importer.odbcqueryrc.ConcurCall(1, func() {
 			defer wg.Done()
 			defer wg.Done()
-			e = importer.importRecord(block, line, filename, filetype, int(doingline))
+			if err != nil {
+				return
+			}
+			rc, e := importer.importRecord(block, line, filename, filetype, int(doingline))
+			atomic.AddInt64(&retrycount, int64(rc))
 			if e != nil {
 			if e != nil {
 				err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(doingline)}, {"line": line}})
 				err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(doingline)}, {"line": line}})
 				return
 				return
@@ -298,12 +316,17 @@ func (importer *Importer) importReader(filename string, buf io.Reader, linefrom,
 					importer.importstatus.ImportStatus[filename] = &ImportStatus{
 					importer.importstatus.ImportStatus[filename] = &ImportStatus{
 						LinesCount:   doingline,
 						LinesCount:   doingline,
 						RecordsCount: doingblock,
 						RecordsCount: doingblock,
+						RetryCount:   retrycount,
 					}
 					}
 					importer.importstatus.TotalUseTime = time.Since(importer.starttime)
 					importer.importstatus.TotalUseTime = time.Since(importer.starttime)
 					importer.importstatus.Save()
 					importer.importstatus.Save()
 					doinglines = doinglines[1:]
 					doinglines = doinglines[1:]
 					if time.Since(lastlogtime) > 5*time.Second {
 					if time.Since(lastlogtime) > 5*time.Second {
-						logger.Info("file", filename, "imported", doingblock, "records")
+						if retrycount > 0 {
+							logger.Info("file", filename, "imported", doingline, "lines", doingblock, "records", retrycount, "retry times")
+						} else {
+							logger.Info("file", filename, "imported", doingline, "lines", doingblock, "records")
+						}
 						lastlogtime = time.Now()
 						lastlogtime = time.Now()
 					}
 					}
 					importer.importstatus.mutex.Unlock()
 					importer.importstatus.mutex.Unlock()
@@ -317,17 +340,16 @@ func (importer *Importer) importReader(filename string, buf io.Reader, linefrom,
 			}
 			}
 		})
 		})
 		if e != nil {
 		if e != nil {
-			return linecount, blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
+			return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
 		}
 		}
 	}
 	}
-	return
 }
 }
 
 
-func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype schema.FileType, linecount int) (err error) {
+func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype schema.FileType, linecount int) (retrycount int, err error) {
 	if odbc.LogDebug {
 	if odbc.LogDebug {
 		bs, e := json.MarshalIndent(record, "", "  ")
 		bs, e := json.MarshalIndent(record, "", "  ")
 		if e != nil {
 		if e != nil {
-			return merrs.NewError(e)
+			return 0, merrs.NewError(e)
 		}
 		}
 		logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
 		logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
 	}
 	}
@@ -337,7 +359,7 @@ func (importer *Importer) importRecord(record map[string]any, line string, filen
 		graph.CacheEdgeInfo(record)
 		graph.CacheEdgeInfo(record)
 	default:
 	default:
 		classaliasname = string(filetype)
 		classaliasname = string(filetype)
-		err = importer.odbcimporter.InsertData(classaliasname, record)
+		retrycount, err = importer.odbcimporter.InsertData(classaliasname, record)
 		if err != nil {
 		if err != nil {
 			err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
 			err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
 			return
 			return

+ 57 - 25
importer/odbcimporter.go

@@ -113,7 +113,7 @@ var masterlevel1mutex = make([]sync.Mutex, 256)
 var masterdatas = cmap.New[string, map[string]any]()
 var masterdatas = cmap.New[string, map[string]any]()
 var level1datas = cmap.New[string, map[string]any]()
 var level1datas = cmap.New[string, map[string]any]()
 
 
-func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string, data map[string]any) error {
+func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string, data map[string]any) (retrycount int, err error) {
 	hidx := int(farm.Hash32([]byte(suid)) % uint32(len(masterlevel1mutex)))
 	hidx := int(farm.Hash32([]byte(suid)) % uint32(len(masterlevel1mutex)))
 	masterlevel1mutex[hidx].Lock()
 	masterlevel1mutex[hidx].Lock()
 	defer masterlevel1mutex[hidx].Unlock()
 	defer masterlevel1mutex[hidx].Unlock()
@@ -130,7 +130,7 @@ func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string,
 					// master oid -> 重新生成 level1 oid
 					// master oid -> 重新生成 level1 oid
 					oid, _, e := graph.GetNodeId("level1", data)
 					oid, _, e := graph.GetNodeId("level1", data)
 					if e != nil {
 					if e != nil {
-						return e
+						return retrycount, e
 					}
 					}
 					v = oid
 					v = oid
 				}
 				}
@@ -151,9 +151,9 @@ func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string,
 			level1data = entiredata
 			level1data = entiredata
 		}
 		}
 		// 重新插入完整的 level1
 		// 重新插入完整的 level1
-		e := odbci.insertData("level1", "", "", level1data)
+		retrycount, e := odbci.insertData("level1", "", "", level1data)
 		if e != nil {
 		if e != nil {
-			return e
+			return retrycount, e
 		}
 		}
 	case "level1":
 	case "level1":
 		masterdata := masterdatas.GetIFPresent(suid)
 		masterdata := masterdatas.GetIFPresent(suid)
@@ -175,12 +175,12 @@ func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string,
 			data = entiredata
 			data = entiredata
 		}
 		}
 		// 插入 level1 数据
 		// 插入 level1 数据
-		e := odbci.insertData("level1", "", "", data)
+		retrycount, e := odbci.insertData("level1", "", "", data)
 		if e != nil {
 		if e != nil {
-			return e
+			return retrycount, e
 		}
 		}
 	}
 	}
-	return nil
+	return retrycount, nil
 }
 }
 
 
 // func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string, data map[string]any) error {
 // func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string, data map[string]any) error {
@@ -291,16 +291,17 @@ func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string,
 // }
 // }
 
 
 // 插入数据
 // 插入数据
-func (odbci *ODBCImporter) InsertData(classaliasname string, data map[string]any) (err error) {
+func (odbci *ODBCImporter) InsertData(classaliasname string, data map[string]any) (retrycount int, err error) {
 	oid, suid, e := graph.GetNodeId(classaliasname, data)
 	oid, suid, e := graph.GetNodeId(classaliasname, data)
 	if e != nil {
 	if e != nil {
-		return e
+		return 0, e
 	}
 	}
 	data["id"] = oid
 	data["id"] = oid
 	if classaliasname == "master" {
 	if classaliasname == "master" {
-		e := odbci.masterlevel1data(classaliasname, suid, data)
+		rc, e := odbci.masterlevel1data(classaliasname, suid, data)
+		retrycount += rc
 		if e != nil {
 		if e != nil {
-			return e
+			return retrycount, e
 		}
 		}
 	} else if classaliasname == "level1" {
 	} else if classaliasname == "level1" {
 		ei := graph.GetEdgeInfo(oid)
 		ei := graph.GetEdgeInfo(oid)
@@ -309,16 +310,22 @@ func (odbci *ODBCImporter) InsertData(classaliasname string, data map[string]any
 			data["depend"] = ei["depend"]
 			data["depend"] = ei["depend"]
 			data["topology"] = ei["topology"]
 			data["topology"] = ei["topology"]
 		}
 		}
-		e := odbci.masterlevel1data(classaliasname, suid, data)
+		rc, e := odbci.masterlevel1data(classaliasname, suid, data)
+		retrycount += rc
 		if e != nil {
 		if e != nil {
-			return e
+			return retrycount, e
 		}
 		}
 		// 数据已经在 masterlevel1data 中插入完成
 		// 数据已经在 masterlevel1data 中插入完成
 		return
 		return
 	} else {
 	} else {
 		data["depend"] = referencedata(classaliasname, data)
 		data["depend"] = referencedata(classaliasname, data)
 	}
 	}
-	return odbci.insertData(classaliasname, oid, suid, data)
+	rc, e := odbci.insertData(classaliasname, oid, suid, data)
+	retrycount += rc
+	if e != nil {
+		return retrycount, e
+	}
+	return retrycount, nil
 }
 }
 
 
 type InnerData struct {
 type InnerData struct {
@@ -375,13 +382,13 @@ func referencedata(classaliasname string, data map[string]any) (depend map[strin
 	return
 	return
 }
 }
 
 
-func (odbci *ODBCImporter) insertData(classaliasname string, oid, suid string, data map[string]any) (err error) {
+func (odbci *ODBCImporter) insertData(classaliasname string, oid, suid string, data map[string]any) (retrycount int, err error) {
 	cdi := classdatainfos.GetIFPresent(classaliasname)
 	cdi := classdatainfos.GetIFPresent(classaliasname)
 	if cdi == nil {
 	if cdi == nil {
-		return merrs.NewError("class not defined " + classaliasname)
+		return retrycount, merrs.NewError("class not defined " + classaliasname)
 	}
 	}
 	if cdi.Insertmql == "" {
 	if cdi.Insertmql == "" {
-		return merrs.NewError("class no fields to insert " + classaliasname)
+		return retrycount, merrs.NewError("class no fields to insert " + classaliasname)
 	}
 	}
 	values := []any{}
 	values := []any{}
 	for _, fn := range cdi.Fieldslist {
 	for _, fn := range cdi.Fieldslist {
@@ -393,7 +400,7 @@ func (odbci *ODBCImporter) insertData(classaliasname string, oid, suid string, d
 		// 合并扩展字段
 		// 合并扩展字段
 		if strset.New(fi.Datakey...).Has("*") {
 		if strset.New(fi.Datakey...).Has("*") {
 			if fi.Fieldtype != "map<varchar,varchar>" {
 			if fi.Fieldtype != "map<varchar,varchar>" {
-				return merrs.NewError("fi.Fieldtype=" + fi.Fieldtype + " != map<varchar,varchar>")
+				return retrycount, merrs.NewError("fi.Fieldtype=" + fi.Fieldtype + " != map<varchar,varchar>")
 			}
 			}
 			td := map[string]any{}
 			td := map[string]any{}
 			for k, v := range data {
 			for k, v := range data {
@@ -415,7 +422,7 @@ func (odbci *ODBCImporter) insertData(classaliasname string, oid, suid string, d
 				case "timestamp":
 				case "timestamp":
 					tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
 					tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
 					if e != nil {
 					if e != nil {
-						return merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
+						return retrycount, merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
 					}
 					}
 					v = tv.Format("2006-01-02 15:04:05.000000")
 					v = tv.Format("2006-01-02 15:04:05.000000")
 				}
 				}
@@ -432,14 +439,14 @@ func (odbci *ODBCImporter) insertData(classaliasname string, oid, suid string, d
 			mql := "select id,uniqueid from " + classaliasname + " where id=?"
 			mql := "select id,uniqueid from " + classaliasname + " where id=?"
 			r, e := odbci.client.Query(mql, oid).Do()
 			r, e := odbci.client.Query(mql, oid).Do()
 			if e != nil {
 			if e != nil {
-				return e
+				return retrycount, e
 			}
 			}
 			if r != nil && len(r.Data) != 0 {
 			if r != nil && len(r.Data) != 0 {
-				logger.Info(classaliasname, "exists id:", oid, ", uniqueid:", r.Data[0]["uniqueid"], ", new uniqueid:", suid)
+				logger.Debug(classaliasname, "exists id:", oid, ", uniqueid:", r.Data[0]["uniqueid"], ", new uniqueid:", suid)
 			}
 			}
 		}
 		}
 		// logger.Info(values...)
 		// logger.Info(values...)
-		_, err = odbci.client.Query(cdi.Insertmql, values...).Do()
+		retrycount, err = odbci.insertDo(cdi.Insertmql, values...)
 		if err != nil {
 		if err != nil {
 			databs, _ := json.MarshalIndent(data, "", "  ")
 			databs, _ := json.MarshalIndent(data, "", "  ")
 			err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}, {"data": string(databs)}})
 			err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}, {"data": string(databs)}})
@@ -452,19 +459,44 @@ func (odbci *ODBCImporter) insertData(classaliasname string, oid, suid string, d
 	if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
 	if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
 		cdi.lastlogtime = time.Now()
 		cdi.lastlogtime = time.Now()
 		cdi.lastlogicount = cdi.insertcount
 		cdi.lastlogicount = cdi.insertcount
-		logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
+		logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
 	}
 	}
 	cdi.mutex.Unlock()
 	cdi.mutex.Unlock()
 	return
 	return
 }
 }
 
 
+var insertretrylimitcount = 0
+
+func init() {
+	mcfg.OnChange(func() {
+		insertretrylimitcount = mcfg.GetInt("odbc.insert.retry", 3)
+	})
+}
+
+func (odbci *ODBCImporter) insertDo(insertmql string, values ...any) (trycount int, err error) {
+	for {
+		_, e := odbci.client.Query(insertmql, values...).Do()
+		if e != nil {
+			trycount++
+			e = merrs.New(e, merrs.Map{"trycount": trycount})
+			if trycount <= insertretrylimitcount {
+				logger.Debug(e)
+				time.Sleep(time.Duration(trycount) * time.Second)
+				continue
+			}
+			return trycount, e
+		}
+		return trycount, nil
+	}
+}
+
 func (odbci *ODBCImporter) done() {
 func (odbci *ODBCImporter) done() {
 	classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
 	classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
 		cdi.mutex.Lock()
 		cdi.mutex.Lock()
 		if cdi.lastlogicount != cdi.insertcount {
 		if cdi.lastlogicount != cdi.insertcount {
 			cdi.lastlogtime = time.Now()
 			cdi.lastlogtime = time.Now()
 			cdi.lastlogicount = cdi.insertcount
 			cdi.lastlogicount = cdi.insertcount
-			logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
+			logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
 		}
 		}
 		cdi.mutex.Unlock()
 		cdi.mutex.Unlock()
 		return true
 		return true
@@ -477,7 +509,7 @@ func (odbci *ODBCImporter) alldone() {
 		if cdi.insertcount != 0 {
 		if cdi.insertcount != 0 {
 			cdi.lastlogtime = time.Now()
 			cdi.lastlogtime = time.Now()
 			cdi.lastlogicount = cdi.insertcount
 			cdi.lastlogicount = cdi.insertcount
-			logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
+			logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
 		}
 		}
 		cdi.mutex.Unlock()
 		cdi.mutex.Unlock()
 		return true
 		return true

+ 4 - 4
main.go

@@ -45,8 +45,9 @@ func main() {
 	logger.Info("parallel:   ", parallel)
 	logger.Info("parallel:   ", parallel)
 	logger.Info("reload:     ", reload)
 	logger.Info("reload:     ", reload)
 	logger.Info("rebuild:    ", rebuild)
 	logger.Info("rebuild:    ", rebuild)
+	logger.Info("access", odbc.LogFile, "for detail information")
 	// 导入
 	// 导入
-	totalfilescount, totallinecount, totalrecordscount, totalusetime, filescount, linescount, recordscount, usetime, e := importer.ImportDir(datapath, parallel, rebuild, reload)
+	totalfilescount, totallinecount, totalrecordscount, totalretrycount, totalusetime, filescount, linescount, recordscount, retrycount, usetime, e := importer.ImportDir(datapath, parallel, rebuild, reload)
 	if e != nil {
 	if e != nil {
 		logger.Error(e)
 		logger.Error(e)
 		panic(e)
 		panic(e)
@@ -56,9 +57,8 @@ func main() {
 		return
 		return
 	}
 	}
 	// 输出统计信息
 	// 输出统计信息
-	logger.Info("import", filescount, "files", linescount, "lines", recordscount, "records", "in", mfmt.FormatDuration(usetime))
-	logger.Info("total import", totalfilescount, "files", totallinecount, "lines", totalrecordscount, "records", "in", mfmt.FormatDuration(totalusetime))
-	fmt.Println("access", odbc.LogFile, "for detail information")
+	logger.Info("import", filescount, "files", linescount, "lines", recordscount, "records", retrycount, "retry times", "in", mfmt.FormatDuration(usetime))
+	logger.Info("total import", totalfilescount, "files", totallinecount, "lines", totalrecordscount, "records", totalretrycount, "retry times", "in", mfmt.FormatDuration(totalusetime))
 
 
 	// 验证
 	// 验证
 	if odbc.ODBCDebug || odbc.LogDebug {
 	if odbc.ODBCDebug || odbc.LogDebug {

+ 8 - 4
odbc/odbclient.go

@@ -14,12 +14,12 @@ var ODBClient odb.Client
 var ODBError error
 var ODBError error
 
 
 var ODBServerPath string
 var ODBServerPath string
-var Keyspace string
 var ODBCDebug bool
 var ODBCDebug bool
 
 
 var default_keyspace = `oktest`
 var default_keyspace = `oktest`
 var default_odbpaths = `127.0.0.1:11001`
 var default_odbpaths = `127.0.0.1:11001`
 
 
+var keyspace string
 var default_config = &odb.Config{
 var default_config = &odb.Config{
 	Hosts:    []string{"127.0.0.1:11001"}, // 47.92.151.165:11001
 	Hosts:    []string{"127.0.0.1:11001"}, // 47.92.151.165:11001
 	Keyspace: "oktest",
 	Keyspace: "oktest",
@@ -98,17 +98,21 @@ func init() {
 		ucfg.CommandArgs.GetString("odbpath", strings.Join(
 		ucfg.CommandArgs.GetString("odbpath", strings.Join(
 			ucfg.Environs.GetStrings("ODBPATH",
 			ucfg.Environs.GetStrings("ODBPATH",
 				Config.GetStrings("odbc.odbpath", default_odbpaths)...), ",")), ",")...).List()
 				Config.GetStrings("odbc.odbpath", default_odbpaths)...), ",")), ",")...).List()
-	Keyspace = ucfg.CommandArgs.GetString("keyspace",
+	keyspace = ucfg.CommandArgs.GetString("keyspace",
 		ucfg.Environs.GetString("KEYSPACE",
 		ucfg.Environs.GetString("KEYSPACE",
 			Config.GetString("odbc.keyspace", default_keyspace)))
 			Config.GetString("odbc.keyspace", default_keyspace)))
 	ODBCDebug = Config.GetBool("odbc.debug", false) || Config.GetString("odbc.debug") == "odbc.debug"
 	ODBCDebug = Config.GetBool("odbc.debug", false) || Config.GetString("odbc.debug") == "odbc.debug"
 	odbcfg = config_merge(odbcfg, &odb.Config{
 	odbcfg = config_merge(odbcfg, &odb.Config{
-		Keyspace: Keyspace,
+		Keyspace: keyspace,
 		Hosts:    odbpaths,
 		Hosts:    odbpaths,
 		Debug:    ODBCDebug,
 		Debug:    ODBCDebug,
 	})
 	})
 	ODBServerPath = "[" + strings.Join(odbcfg.Hosts, ",") + "]"
 	ODBServerPath = "[" + strings.Join(odbcfg.Hosts, ",") + "]"
-	Keyspace = odbcfg.Keyspace
+	keyspace = odbcfg.Keyspace
+}
+
+func Keyspace() string {
+	return keyspace
 }
 }
 
 
 // 可以通过命令行,环境变量 或 与应用同名的 .conf 配置文件 设置
 // 可以通过命令行,环境变量 或 与应用同名的 .conf 配置文件 设置