libf 2 tháng trước cách đây
mục cha
commit
470fd75753
6 tập tin đã thay đổi với 99 bổ sung25 xóa
  1. 2 2
      go.mod
  2. 4 4
      go.sum
  3. 17 2
      importer/importer.go
  4. 69 17
      importer/odbcimporter.go
  5. 2 0
      odbc/cfg.go
  6. 5 0
      reader/txtreader.go

+ 2 - 2
go.mod

@@ -3,11 +3,11 @@ module git.wecise.com/wecise/cgimport
 go 1.22.0
 
 require (
-	git.wecise.com/wecise/odb-go v0.0.0-20250216043918-3dfdeaa665d0
+	git.wecise.com/wecise/odb-go v0.0.0-20250217080642-2f166f4b3fa8
 	github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da
 	github.com/scylladb/go-set v1.0.2
 	github.com/spf13/cast v1.7.1
-	github.com/wecisecode/util v0.0.6
+	github.com/wecisecode/util v0.0.7
 	modernc.org/sqlite v1.35.0
 )
 

+ 4 - 4
go.sum

@@ -1,5 +1,5 @@
-git.wecise.com/wecise/odb-go v0.0.0-20250216043918-3dfdeaa665d0 h1:xzJ80y712w31P8jWmEll0NoFtG9ddpFkUVR54seCnbM=
-git.wecise.com/wecise/odb-go v0.0.0-20250216043918-3dfdeaa665d0/go.mod h1:SKS8LuZC0/lkoIjObhBeKZrKfUcy5AaSjbkdtY0bh3c=
+git.wecise.com/wecise/odb-go v0.0.0-20250217080642-2f166f4b3fa8 h1:bDjQoXi3xwJtlSB2Pqc+RmZj8erwOyqMgC6rd+kl5xw=
+git.wecise.com/wecise/odb-go v0.0.0-20250217080642-2f166f4b3fa8/go.mod h1:JHsHBAI3LBOQZrAjM0nFv3BPGg/brbFXEi4O1DqqaeA=
 github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw=
 github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0=
 github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
@@ -81,8 +81,8 @@ github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IU
 github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
 github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
 github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
-github.com/wecisecode/util v0.0.6 h1:W4/D32N9XIE3bNeXYqmBrb+P1CEJdEDCd+tmLbUMoa8=
-github.com/wecisecode/util v0.0.6/go.mod h1:3uYUTKKfdmgudEAYCam/38H1LtHTxnD7ly9kGf0Tzp0=
+github.com/wecisecode/util v0.0.7 h1:MMS/Le3TnOHwK38HpiPhSw50UCVx9yxzbsTRnzbKn0E=
+github.com/wecisecode/util v0.0.7/go.mod h1:3uYUTKKfdmgudEAYCam/38H1LtHTxnD7ly9kGf0Tzp0=
 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 go.etcd.io/etcd/api/v3 v3.5.18 h1:Q4oDAKnmwqTo5lafvB+afbgCDF7E35E4EYV2g+FNGhs=

+ 17 - 2
importer/importer.go

@@ -7,6 +7,7 @@ import (
 	"os"
 	"path/filepath"
 	"regexp"
+	"runtime"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -38,7 +39,7 @@ type Importer struct {
 }
 
 func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration, filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
-	concurlimt := mcfg.GetInt("odbc.concurrent.limit", 100)
+	concurlimt := mcfg.GetInt("odbc.concurrent.limit", runtime.GOMAXPROCS(0))
 	importer := &Importer{
 		datapath:     datapath,
 		parallel:     parallel,
@@ -46,7 +47,7 @@ func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilesc
 		reload:       reload,
 		importstatus: NewCGIStatus(),
 		fileimportrc: rc.NewRoutinesController("", parallel),
-		odbcqueryrc:  rc.NewRoutinesControllerLimit("", concurlimt, concurlimt*2),
+		odbcqueryrc:  rc.NewRoutinesControllerLimit("", concurlimt, concurlimt*5),
 		odbcimporter: NewODBCImporter(),
 	}
 	return importer.Import()
@@ -278,6 +279,7 @@ func (importer *Importer) importReader(filename string, buf io.Reader, linefrom,
 	blockcount = blockfrom
 	doinglines := []int64{}
 	retrycount = totalretrycount
+	// maxresponsetime := time.Duration(0)
 	var wg sync.WaitGroup
 	defer importer.done()
 	defer wg.Wait()
@@ -305,6 +307,19 @@ func (importer *Importer) importReader(filename string, buf io.Reader, linefrom,
 			if err != nil {
 				return
 			}
+			// st := time.Now()
+			// defer func() {
+			// 	ut := time.Since(st)
+			// 	if ut > maxresponsetime {
+			// 		maxresponsetime = ut
+			// 	}
+			// }()
+			// logger.Info("G:", runtime.NumGoroutine(),
+			// 	"RC:", importer.fileimportrc.ConcurCount(),
+			// 	"WC:", importer.odbcqueryrc.ConcurCount(),
+			// 	"RQ:", importer.fileimportrc.QueueCount(),
+			// 	"WQ:", importer.odbcqueryrc.QueueCount(),
+			// 	"maxresponsetime:", maxresponsetime)
 			rc, e := importer.importRecord(block, line, filename, filetype, int(doingline))
 			atomic.AddInt64(&retrycount, int64(rc))
 			if e != nil {

+ 69 - 17
importer/odbcimporter.go

@@ -3,6 +3,8 @@ package importer
 import (
 	"encoding/json"
 	"fmt"
+	"regexp"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -151,7 +153,7 @@ func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string,
 			level1data = entiredata
 		}
 		// 重新插入完整的 level1
-		retrycount, e := odbci.insertData("level1", "", "", level1data)
+		retrycount, _, e := odbci.insertData("level1", "", "", level1data)
 		if e != nil {
 			return retrycount, e
 		}
@@ -175,7 +177,7 @@ func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string,
 			data = entiredata
 		}
 		// 插入 level1 数据
-		retrycount, e := odbci.insertData("level1", "", "", data)
+		retrycount, _, e := odbci.insertData("level1", "", "", data)
 		if e != nil {
 			return retrycount, e
 		}
@@ -320,7 +322,7 @@ func (odbci *ODBCImporter) InsertData(classaliasname string, data map[string]any
 	} else {
 		data["depend"] = referencedata(classaliasname, data)
 	}
-	rc, e := odbci.insertData(classaliasname, oid, suid, data)
+	rc, _, e := odbci.insertData(classaliasname, oid, suid, data)
 	retrycount += rc
 	if e != nil {
 		return retrycount, e
@@ -382,13 +384,13 @@ func referencedata(classaliasname string, data map[string]any) (depend map[strin
 	return
 }
 
-func (odbci *ODBCImporter) insertData(classaliasname string, oid, suid string, data map[string]any) (retrycount int, err error) {
+func (odbci *ODBCImporter) insertData(classaliasname string, oid, suid string, data map[string]any) (retrycount int, responsetime time.Duration, err error) {
 	cdi := classdatainfos.GetIFPresent(classaliasname)
 	if cdi == nil {
-		return retrycount, merrs.NewError("class not defined " + classaliasname)
+		return retrycount, 0, merrs.NewError("class not defined " + classaliasname)
 	}
 	if cdi.Insertmql == "" {
-		return retrycount, merrs.NewError("class no fields to insert " + classaliasname)
+		return retrycount, 0, merrs.NewError("class no fields to insert " + classaliasname)
 	}
 	values := []any{}
 	for _, fn := range cdi.Fieldslist {
@@ -400,7 +402,7 @@ func (odbci *ODBCImporter) insertData(classaliasname string, oid, suid string, d
 		// 合并扩展字段
 		if strset.New(fi.Datakey...).Has("*") {
 			if fi.Fieldtype != "map<varchar,varchar>" {
-				return retrycount, merrs.NewError("fi.Fieldtype=" + fi.Fieldtype + " != map<varchar,varchar>")
+				return retrycount, 0, merrs.NewError("fi.Fieldtype=" + fi.Fieldtype + " != map<varchar,varchar>")
 			}
 			td := map[string]any{}
 			for k, v := range data {
@@ -422,7 +424,7 @@ func (odbci *ODBCImporter) insertData(classaliasname string, oid, suid string, d
 				case "timestamp":
 					tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
 					if e != nil {
-						return retrycount, merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
+						return retrycount, 0, merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
 					}
 					v = tv.Format("2006-01-02 15:04:05.000000")
 				}
@@ -439,14 +441,14 @@ func (odbci *ODBCImporter) insertData(classaliasname string, oid, suid string, d
 			mql := "select id,uniqueid from " + classaliasname + " where id=?"
 			r, e := odbci.client.Query(mql, oid).Do()
 			if e != nil {
-				return retrycount, e
+				return retrycount, 0, e
 			}
 			if r != nil && len(r.Data) != 0 {
 				logger.Debug(classaliasname, "exists id:", oid, ", uniqueid:", r.Data[0]["uniqueid"], ", new uniqueid:", suid)
 			}
 		}
 		// logger.Info(values...)
-		retrycount, err = odbci.insertDo(cdi.Insertmql, values...)
+		retrycount, responsetime, err = odbci.insertDo(cdi.Insertmql, values...)
 		if err != nil {
 			databs, _ := json.MarshalIndent(data, "", "  ")
 			err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}, {"data": string(databs)}})
@@ -465,28 +467,78 @@ func (odbci *ODBCImporter) insertData(classaliasname string, oid, suid string, d
 	return
 }
 
-var insertretrylimitcount = 0
+type ODBCRetryConfig struct {
+	retry    int
+	contains string
+}
+
+var reodbcretry = regexp.MustCompile(`(?s)^\s*(-?[0-9]+)\s*(?:,\s*(.*)\s*)?$`)
+var odbcretry = ""
+var odbcretryconfig []*ODBCRetryConfig
 
 func init() {
 	mcfg.OnChange(func() {
-		insertretrylimitcount = mcfg.GetInt("odbc.insert.retry", 3)
+		_odbcretry := mcfg.GetStrings("odbc.retry", "-1, timed out")
+		if strings.Join(_odbcretry, "|") != odbcretry {
+			odbcretryconfig = RetryConfig(_odbcretry...)
+			odbcretry = strings.Join(_odbcretry, "|")
+		}
 	})
 }
 
-func (odbci *ODBCImporter) insertDo(insertmql string, values ...any) (trycount int, err error) {
+func RetryConfig(retryconfig ...string) (orcs []*ODBCRetryConfig) {
+	defaultorc := &ODBCRetryConfig{
+		retry:    0,
+		contains: "",
+	}
+	for _, retrycfg := range retryconfig {
+		ss := reodbcretry.FindStringSubmatch(retrycfg)
+		if len(ss) > 1 {
+			orc := &ODBCRetryConfig{
+				retry:    cast.ToInt(ss[0]),
+				contains: ss[1],
+			}
+			if orc.contains == "" {
+				defaultorc = orc
+			} else {
+				orcs = append(orcs, orc)
+			}
+		} else {
+			panic("odbc.retry config format error")
+		}
+	}
+	orcs = append(orcs, defaultorc)
+	return
+}
+
+func (odbci *ODBCImporter) insertDo(insertmql string, values ...any) (trycount int, responsetime time.Duration, err error) {
 	for {
+		st := time.Now()
 		_, e := odbci.client.Query(insertmql, values...).Do()
 		if e != nil {
+			maxtrycount := 0
+			for _, orc := range odbcretryconfig {
+				if orc.contains != "" {
+					if strings.Contains(e.Error(), orc.contains) {
+						maxtrycount = orc.retry
+						break
+					}
+				} else {
+					maxtrycount = orc.retry
+					break
+				}
+			}
 			trycount++
 			e = merrs.New(e, merrs.Map{"trycount": trycount})
-			if trycount <= insertretrylimitcount {
-				logger.Debug(e)
+			if maxtrycount < 0 || trycount <= maxtrycount {
+				logger.Debug(merrs.New(e, merrs.Map{"retrycount": trycount}))
 				time.Sleep(time.Duration(trycount) * time.Second)
 				continue
 			}
-			return trycount, e
+			return trycount, 0, e
 		}
-		return trycount, nil
+		responsetime = time.Since(st)
+		return trycount, responsetime, nil
 	}
 }
 

+ 2 - 0
odbc/cfg.go

@@ -49,3 +49,5 @@ const (
 )
 
 var DevPhase = DP_READFILE | DP_PROCESSCONTINUE | DP_PARSESTRUCT | DP_CREATECLASS | DP_INSERTDATA
+
+var mcfg = Config

+ 5 - 0
reader/txtreader.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io"
 	"regexp"
+	"strings"
 
 	"git.wecise.com/wecise/cgimport/schema"
 )
@@ -64,6 +65,10 @@ func (br *TXTBlockReader) ReadBlock(skiplines int) (block map[string]any, line s
 			continue
 		}
 		line = regrecordstart.ReplaceAllString(line, "$1")
+		line = strings.ReplaceAll(line, `\`, `\\`)
+		line = strings.ReplaceAll(line, "\t", `\t`)
+		line = strings.ReplaceAll(line, "\r", `\r`)
+		line = strings.ReplaceAll(line, "\n", `\n`)
 		block = map[string]any{}
 		err = json.Unmarshal([]byte(line), &block)
 		if err != nil {