libf 5 месяцев назад
Родитель
Сommit
3d5a78a375
3 измененных файлов с 147 добавлено и 6 удалено
  1. 3 3
      importer/importer.go
  2. 139 3
      importer/odbcimporter.go
  3. 5 0
      odbc/odbclient.go

+ 3 - 3
importer/importer.go

@@ -110,7 +110,7 @@ func ImportDir(datapath string, parallel int) (totalfilescount, totalrecordscoun
 	importer := &Importer{
 		datapath:     datapath,
 		parallel:     parallel,
-		importrc:     rc.NewRoutinesController("", 1000),
+		importrc:     rc.NewRoutinesController("", 100),
 		odbcimporter: NewODBCImporter(),
 	}
 	return importer.Import()
@@ -126,7 +126,7 @@ func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, to
 	}
 	cgistatus := NewCGIStatus()
 	reload := mcfg.GetString("reload")
-	if reload == "" {
+	if false && reload == "" {
 		e = cgistatus.Load()
 		if e != nil {
 			err = e
@@ -239,7 +239,7 @@ func (importer *Importer) importRecord(record map[string]any, filename string, l
 	if e != nil {
 		return e
 	}
-	e = importer.odbcimporter.InsertData(record)
+	e = importer.odbcimporter.InsertData("/cgitest/x10/x1002", record)
 	if e != nil {
 		return e
 	}

+ 139 - 3
importer/odbcimporter.go

@@ -1,24 +1,160 @@
 package importer
 
+import (
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"git.wecise.com/wecise/cgimport/odbc"
+	"git.wecise.com/wecise/odb-go/odb"
+	"git.wecise.com/wecise/util/cmap"
+	"git.wecise.com/wecise/util/merrs"
+)
+
+type fieldinfo struct {
+	fieldname string
+	fieldtype string
+	keyidx    int    // 主键顺序值,0为非主键
+	datakey   string // 对应数据中的键名
+}
+
+type classinfo struct {
+	classname   string
+	nickname    string
+	fieldinfos  map[string]*fieldinfo
+	keyfields   []string
+	fieldslist  []string
+	insertmql   string
+	insertcount int64
+	lastlogtime time.Time
+	mutex       sync.Mutex
+}
+
 type ODBCImporter struct {
+	client     odb.Client
+	classinfos cmap.ConcurrentMap[string, *classinfo]
 }
 
 func NewODBCImporter() *ODBCImporter {
-	return &ODBCImporter{}
+	return &ODBCImporter{
+		client:     odbc.ODBC(),
+		classinfos: cmap.New[string, *classinfo](),
+	}
 }
 
+// 根据数据修正类定义
 func (odbci *ODBCImporter) ReviseClassStruct(data map[string]any) (err error) {
+	e := odbci.createClass("cgitest", "/cgitest", nil)
+	if e != nil {
+		return e
+	}
+	e = odbci.createClass("x10", "/cgitest/x10", []*fieldinfo{
+		{fieldname: "uniqueid", datakey: "uniqueId", fieldtype: "varchar", keyidx: 1},
+	})
+	if e != nil {
+		return e
+	}
+	e = odbci.createClass("x1001", "/cgitest/x10/x1001", []*fieldinfo{
+		{fieldname: "uniqueid", datakey: "uniqueId", fieldtype: "varchar", keyidx: 1},
+		{fieldname: "distname", datakey: "distName", fieldtype: "varchar"},
+	})
+	if e != nil {
+		return e
+	}
+	e = odbci.createClass("x1002", "/cgitest/x10/x1002", []*fieldinfo{
+		{fieldname: "uniqueid", datakey: "uniqueId", fieldtype: "varchar", keyidx: 1},
+		{fieldname: "distname", datakey: "distName", fieldtype: "varchar"},
+	})
+	if e != nil {
+		return e
+	}
 	return
 }
 
-func (odbci *ODBCImporter) InsertData(data map[string]any) (err error) {
+// 插入数据
+func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) {
+	if data["uniqueId"] == nil {
+		// TODO 开发期只导入部分数据
+		return
+	}
+	ci := odbci.classinfos.GetIFPresent(classname)
+	if ci == nil {
+		return merrs.NewError("class not defined " + classname)
+	}
+	if ci.insertmql == "" {
+		return merrs.NewError("class no fields to insert " + classname)
+	}
+	values := []any{}
+	for _, fn := range ci.fieldslist {
+		fi := ci.fieldinfos[fn]
+		v := data[fi.datakey]
+
+		values = append(values, v)
+	}
+	_, err = odbci.client.Query(ci.insertmql, values...).Do()
+	if err != nil {
+		return
+	}
+	atomic.AddInt64(&ci.insertcount, 1)
+	ci.mutex.Lock()
+	if time.Since(ci.lastlogtime) > 5*time.Second {
+		ci.lastlogtime = time.Now()
+		logger.Info("class", ci.classname, "import", ci.insertcount, "records")
+	}
+	ci.mutex.Unlock()
 	return
 }
 
-func (odbci *ODBCImporter) createClass() (err error) {
+// 新建类
+func (odbci *ODBCImporter) createClass(nickname, classname string, fieldinfoslist []*fieldinfo) (err error) {
+	_, err = odbci.classinfos.GetWithNew(classname, func() (ci *classinfo, err error) {
+		logger.Info("create class " + classname)
+		fieldinfos := map[string]*fieldinfo{}
+		fieldslist := []string{}
+		keyfields := []string{}
+		mql := `create class if not exists ` + classname + `(`
+		if len(fieldinfoslist) > 0 {
+			field_defines := []string{}
+			for _, fi := range fieldinfoslist {
+				field_defines = append(field_defines, fi.fieldname+" "+fi.fieldtype)
+				fieldslist = append(fieldslist, fi.fieldname)
+				if fi.keyidx > 0 {
+					for len(keyfields) < fi.keyidx {
+						keyfields = append(keyfields, "")
+					}
+					keyfields[fi.keyidx-1] = fi.fieldname
+				}
+				fieldinfos[fi.fieldname] = fi
+			}
+			mql += strings.Join(field_defines, ",")
+		}
+		if len(keyfields) > 0 {
+			mql += ", keys(" + strings.Join(keyfields, ",") + ")"
+		}
+		mql += `)with namespace="cgitest" and key=manu and nickname='` + nickname + `'`
+		_, err = odbci.client.Query(mql).Do()
+		if err != nil {
+			return
+		}
+		var insertmql string
+		if len(fieldslist) > 0 {
+			insertmql = `insert into ` + classname + "(" + strings.Join(fieldslist, ",") + ")values(" + strings.Repeat(",?", len(fieldslist))[1:] + ")"
+		}
+		ci = &classinfo{
+			classname:  classname,
+			nickname:   nickname,
+			fieldinfos: fieldinfos,
+			keyfields:  keyfields,
+			fieldslist: fieldslist,
+			insertmql:  insertmql,
+		}
+		return
+	})
 	return
 }
 
+// 修改类定义
 func (odbci *ODBCImporter) alterClass() (err error) {
 	return
 }

+ 5 - 0
odbc/odbclient.go

@@ -2,6 +2,7 @@ package odbc
 
 import (
 	"fmt"
+	"os"
 	"path/filepath"
 	"strings"
 
@@ -129,6 +130,10 @@ keyspace=` + default_keyspace + `
 	}))
 	if ODBError != nil {
 		fmt.Print(Usage)
+		if strings.Contains(ODBError.Error(), "error: EOF") {
+			println("\n!!!should add your ip to odbserver whitelist!!!\n")
+			os.Exit(1)
+		}
 		panic(ODBError)
 	}
 	Logger.Info("odbpath :", ODBClient.Config().Hosts, ODBClient.Config().Port)