|
@@ -17,8 +17,10 @@ import (
|
|
"git.wecise.com/wecise/cgimport/odbc"
|
|
"git.wecise.com/wecise/cgimport/odbc"
|
|
"git.wecise.com/wecise/cgimport/reader"
|
|
"git.wecise.com/wecise/cgimport/reader"
|
|
"git.wecise.com/wecise/cgimport/schema"
|
|
"git.wecise.com/wecise/cgimport/schema"
|
|
|
|
+ "github.com/wecisecode/util/cmap"
|
|
"github.com/wecisecode/util/filewalker"
|
|
"github.com/wecisecode/util/filewalker"
|
|
"github.com/wecisecode/util/merrs"
|
|
"github.com/wecisecode/util/merrs"
|
|
|
|
+ "github.com/wecisecode/util/pattern"
|
|
"github.com/wecisecode/util/rc"
|
|
"github.com/wecisecode/util/rc"
|
|
)
|
|
)
|
|
|
|
|
|
@@ -230,34 +232,46 @@ func (importer *Importer) ImportFile(filepath string, linefrom, blockfrom, total
|
|
return importer.importReader(filepath, f, linefrom, blockfrom, totalretrycount, logstatus)
|
|
return importer.importReader(filepath, f, linefrom, blockfrom, totalretrycount, logstatus)
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+var fileclassmapping = cmap.NewSingle[string, *regexp.Regexp]()
|
|
|
|
+
|
|
|
|
+func init() {
|
|
|
|
+ fileclassmapping.Set("EDGE", regexp.MustCompile(pattern.Contain2RegexpString(`EDGE`)))
|
|
|
|
+ fileclassmapping.Set("master", regexp.MustCompile(pattern.Contain2RegexpString(`MASTER`)))
|
|
|
|
+ fileclassmapping.Set("level1", regexp.MustCompile(pattern.Contain2RegexpString(`_L1_`)))
|
|
|
|
+ fileclassmapping.Set("level2", regexp.MustCompile(pattern.Contain2RegexpString(`_L2_`)))
|
|
|
|
+ fileclassmapping.Set("level3", regexp.MustCompile(pattern.Contain2RegexpString(`_L3_`)))
|
|
|
|
+ fileclassmapping.Set("level4", regexp.MustCompile(pattern.Contain2RegexpString(`_L4_`)))
|
|
|
|
+ fileclassmapping.Set("level5", regexp.MustCompile(pattern.Contain2RegexpString(`_L5_`)))
|
|
|
|
+ fileclassmapping.Set("level6", regexp.MustCompile(pattern.Contain2RegexpString(`_L6_`)))
|
|
|
|
+ fileclassmapping.Set("level7", regexp.MustCompile(pattern.Contain2RegexpString(`_L7_`)))
|
|
|
|
+ fileclassmapping.Set("level8", regexp.MustCompile(pattern.Contain2RegexpString(`_L8_`)))
|
|
|
|
+ keys := mcfg.Keys()
|
|
|
|
+ for _, key := range keys {
|
|
|
|
+ if strings.HasPrefix(key, "mapping.class.") {
|
|
|
|
+ classname := key[len("mapping.class."):]
|
|
|
|
+ filepatterns := mcfg.GetStrings(key)
|
|
|
|
+ for _, fp := range filepatterns {
|
|
|
|
+ fp = pattern.Wildcard2RegexpString(fp)
|
|
|
|
+ fileclassmapping.Set(classname, regexp.MustCompile(fp))
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
func (importer *Importer) importReader(filename string, buf io.Reader, linefrom, blockfrom, totalretrycount int64, logstatus bool) (linecount, blockcount, retrycount int64, err error) {
|
|
func (importer *Importer) importReader(filename string, buf io.Reader, linefrom, blockfrom, totalretrycount int64, logstatus bool) (linecount, blockcount, retrycount int64, err error) {
|
|
- var filetype schema.FileType
|
|
|
|
- switch {
|
|
|
|
- case strings.Contains(filename, "_L1_"):
|
|
|
|
- filetype = schema.FT_LEVEL1
|
|
|
|
- case strings.Contains(filename, "_L2_"):
|
|
|
|
- filetype = schema.FT_LEVEL2
|
|
|
|
- case strings.Contains(filename, "_L3_"):
|
|
|
|
- filetype = schema.FT_LEVEL3
|
|
|
|
- case strings.Contains(filename, "_L4_"):
|
|
|
|
- filetype = schema.FT_LEVEL4
|
|
|
|
- case strings.Contains(filename, "_L5_"):
|
|
|
|
- filetype = schema.FT_LEVEL5
|
|
|
|
- case strings.Contains(filename, "_L6_"):
|
|
|
|
- filetype = schema.FT_LEVEL6
|
|
|
|
- case strings.Contains(filename, "_L7_"):
|
|
|
|
- filetype = schema.FT_LEVEL7
|
|
|
|
- case strings.Contains(filename, "_L8_"):
|
|
|
|
- filetype = schema.FT_LEVEL8
|
|
|
|
- case strings.Contains(filename, "MASTER"):
|
|
|
|
- filetype = schema.FT_MASTER
|
|
|
|
- case strings.Contains(filename, "EDGE"):
|
|
|
|
- filetype = schema.FT_EDGE
|
|
|
|
- default:
|
|
|
|
- err = merrs.NewError("filename does not conform to the agreed format " + filename)
|
|
|
|
|
|
+ classname := ""
|
|
|
|
+ fileclassmapping.Fetch(func(key string, v *regexp.Regexp) bool {
|
|
|
|
+ if v.MatchString(filename) {
|
|
|
|
+ classname = key
|
|
|
|
+ return false
|
|
|
|
+ }
|
|
|
|
+ return true
|
|
|
|
+ })
|
|
|
|
+ if classname == "" {
|
|
|
|
+ err = merrs.NewError("cannot mapping to class, filename:" + filename)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
- br, e := reader.NewBlockReader(filename, filetype, buf)
|
|
|
|
|
|
+ br, e := reader.NewBlockReader(filename, classname, buf)
|
|
if e != nil {
|
|
if e != nil {
|
|
return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
|
|
return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
|
|
}
|
|
}
|
|
@@ -309,7 +323,7 @@ func (importer *Importer) importReader(filename string, buf io.Reader, linefrom,
|
|
// "RQ:", importer.fileimportrc.QueueCount(),
|
|
// "RQ:", importer.fileimportrc.QueueCount(),
|
|
// "WQ:", importer.odbcqueryrc.QueueCount(),
|
|
// "WQ:", importer.odbcqueryrc.QueueCount(),
|
|
// "maxresponsetime:", maxresponsetime)
|
|
// "maxresponsetime:", maxresponsetime)
|
|
- rc, e := importer.importRecord(block, line, filename, filetype, int(doingline))
|
|
|
|
|
|
+ rc, e := importer.importRecord(block, line, filename, classname, int(doingline))
|
|
atomic.AddInt64(&retrycount, int64(rc))
|
|
atomic.AddInt64(&retrycount, int64(rc))
|
|
if e != nil {
|
|
if e != nil {
|
|
err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(doingline)}, {"line": line}})
|
|
err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(doingline)}, {"line": line}})
|
|
@@ -362,7 +376,7 @@ func (importer *Importer) importReader(filename string, buf io.Reader, linefrom,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype schema.FileType, linecount int) (retrycount int, err error) {
|
|
|
|
|
|
+func (importer *Importer) importRecord(record map[string]any, line string, filename string, classaliasname string, linecount int) (retrycount int, err error) {
|
|
if odbc.LogDebug {
|
|
if odbc.LogDebug {
|
|
bs, e := json.MarshalIndent(record, "", " ")
|
|
bs, e := json.MarshalIndent(record, "", " ")
|
|
if e != nil {
|
|
if e != nil {
|
|
@@ -370,12 +384,10 @@ func (importer *Importer) importRecord(record map[string]any, line string, filen
|
|
}
|
|
}
|
|
logger.Trace(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
|
|
logger.Trace(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
|
|
}
|
|
}
|
|
- var classaliasname string
|
|
|
|
- switch filetype {
|
|
|
|
- case schema.FT_EDGE:
|
|
|
|
|
|
+ switch classaliasname {
|
|
|
|
+ case schema.EDGE:
|
|
graph.CacheEdgeInfo(record)
|
|
graph.CacheEdgeInfo(record)
|
|
default:
|
|
default:
|
|
- classaliasname = string(filetype)
|
|
|
|
retrycount, err = importer.odbcimporter.InsertData(classaliasname, record)
|
|
retrycount, err = importer.odbcimporter.InsertData(classaliasname, record)
|
|
if err != nil {
|
|
if err != nil {
|
|
err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
|
|
err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
|