libf 4 months ago
parent
commit
5398b83db2
11 changed files with 423 additions and 195 deletions
  1. go.mod (+1 -1)
  2. go.sum (+2 -2)
  3. importer/classinfo.go (+230 -0)
  4. importer/filetype.go (+16 -0)
  5. importer/importer.go (+59 -20)
  6. importer/odbcimporter.go (+104 -164)
  7. odbc/cfg.go (+1 -1)
  8. odbc/odbclient.go (+1 -1)
  9. reader/blockreader.go (+1 -1)
  10. reader/csvreader.go (+1 -2)
  11. reader/txtreader.go (+7 -3)

+ 1 - 1
go.mod

@@ -4,7 +4,7 @@ go 1.20
 
 require (
 	git.wecise.com/wecise/odb-go v0.0.0-20250123142240-7c4d1df627b4
-	git.wecise.com/wecise/util v0.0.0-20250204110134-ab7523fa652e
+	git.wecise.com/wecise/util v0.0.0-20250205071151-1b959fbc3562
 	github.com/scylladb/go-set v1.0.3-0.20200225121959-cc7b2070d91e
 	github.com/spf13/cast v1.7.0
 )

+ 2 - 2
go.sum

@@ -1,7 +1,7 @@
 git.wecise.com/wecise/odb-go v0.0.0-20250123142240-7c4d1df627b4 h1:VUr6whmUPURppVSqLeDnHa5yk2n05FjrsonJTzd6Pt0=
 git.wecise.com/wecise/odb-go v0.0.0-20250123142240-7c4d1df627b4/go.mod h1:ZgFysxr5kKxDUrFcwCWJs5ntFsQtK0Q77o6ZU4bYQnM=
-git.wecise.com/wecise/util v0.0.0-20250204110134-ab7523fa652e h1:rQK0btS9RtVP6pAky3oaJ1yk42W8wn/nUHeSlDEPbOE=
-git.wecise.com/wecise/util v0.0.0-20250204110134-ab7523fa652e/go.mod h1:2YXWE9m5mNgAu40zpYrL3woGz6S8CoHAW/CJeWXaIko=
+git.wecise.com/wecise/util v0.0.0-20250205071151-1b959fbc3562 h1:uWx9k1XcW5x0Bq96BLb26itJx/CK9cesQpCqUkMLk7o=
+git.wecise.com/wecise/util v0.0.0-20250205071151-1b959fbc3562/go.mod h1:2YXWE9m5mNgAu40zpYrL3woGz6S8CoHAW/CJeWXaIko=
 github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw=
 github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0=
 github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=

+ 230 - 0
importer/classinfo.go

@@ -0,0 +1,230 @@
+package importer
+
+import (
+	"strings"
+
+	"git.wecise.com/wecise/util/cmap"
+)
+
+type fieldinfo struct {
+	fieldname string
+	fieldtype string
+	keyidx    int    // primary key order value; 0 means not a primary key
+	datakey   string // corresponding key name in the source data
+}
+
+type classinfo struct {
+	classname          string
+	aliasname          string
+	classfullname      string
+	fieldinfos         map[string]*fieldinfo
+	datakey_fieldinfos map[string]*fieldinfo
+	keyfields          []string
+	fieldslist         []string
+	insertmql          string
+	createmql          string
+}
+
+var classinfos = cmap.New[string, *classinfo]()
+var classnames = []string{}
+
+func init() {
+	newclassinfo("m3cnet", "m3cnet", "/", nil,
+		[2]string{"ttl", "366 day"},
+		[2]string{"autosearch", "true"},
+		[2]string{"version", "false"},
+		[2]string{"namespace", "'m3cnet'"},
+	)
+	newclassinfo("master", "master", "m3cnet",
+		[]*fieldinfo{
+			{fieldname: "uniqueid", fieldtype: "varchar", keyidx: 1, datakey: "UNIQUEID"},
+			{fieldname: "name", fieldtype: "varchar", datakey: "NAME"},
+			{fieldname: "entitytypes", fieldtype: "set<varchar>", datakey: "ENTITYTYPES"},
+			{fieldname: "basename", fieldtype: "varchar", datakey: "BASENAME"},
+			{fieldname: "entitytypesarr", fieldtype: "varchar", datakey: "ENTITYTYPESARR"},
+			{fieldname: "originid", fieldtype: "varchar", datakey: "ID"},
+			{fieldname: "tags", fieldtype: "set<varchar>", datakey: "TAGS"},
+			{fieldname: "changetime", fieldtype: "timestamp", datakey: "CHANGETIME"},
+			{fieldname: "emsname", fieldtype: "varchar", datakey: "EMSNAME"},
+			{fieldname: "sysid", fieldtype: "varchar", datakey: "SYSID"},
+			{fieldname: "site", fieldtype: "varchar", datakey: "SITE"},
+			{fieldname: "vendor", fieldtype: "varchar", datakey: "VENDOR"},
+			{fieldname: "ci_table", fieldtype: "varchar", datakey: "CI_TABLE"},
+			{fieldname: "ci_status", fieldtype: "varchar", datakey: "CI_STATUS"},
+			{fieldname: "rel_status", fieldtype: "varchar", datakey: "REL_STATUS"},
+			{fieldname: "stage", fieldtype: "varchar", datakey: "STAGE"},
+			{fieldname: "extraattr", fieldtype: "varchar", datakey: "EXTRAATTR"},
+			{fieldname: "entityid", fieldtype: "varchar", datakey: "ENTITYID"},
+			{fieldname: "asmchangetime", fieldtype: "int", datakey: "ASMCHANGETIME"},
+			{fieldname: "cmdbmapping", fieldtype: "varchar", datakey: "CMDBMAPPING"},
+			{fieldname: "ipaddress", fieldtype: "varchar", datakey: "IPADDRESS"},
+			{fieldname: "distname", fieldtype: "varchar", datakey: "DISTNAME"},
+			{fieldname: "site_source", fieldtype: "varchar", datakey: "SITE_SOURCE"},
+			{fieldname: "lastupdated", fieldtype: "timestamp", datakey: "LASTUPDATED"},
+		},
+		[2]string{"partition", "name"},
+	)
+	newclassinfo("minfo", "minfo", "m3cnet",
+		[]*fieldinfo{
+			{fieldname: "uniqueid", fieldtype: "varchar", keyidx: 1, datakey: "uniqueId"},
+			{fieldname: "distname", fieldtype: "varchar", datakey: "distName"},
+			{fieldname: "name", fieldtype: "varchar", datakey: "name"},
+			{fieldname: "entitytypes", fieldtype: "set<varchar>", datakey: "entityTypes"},
+			{fieldname: "extraattr", fieldtype: "varchar", datakey: ""},
+			{fieldname: "tags", fieldtype: "set<varchar>", datakey: "tags"},
+		},
+	)
+	newclassinfo("level1", "level1", "minfo", nil,
+		[2]string{"partition", "name"},
+	)
+	newclassinfo("level2", "level2", "minfo", nil,
+		[2]string{"partition", "name"},
+	)
+	newclassinfo("level3", "level3", "minfo", nil,
+		[2]string{"partition", "name"},
+	)
+	newclassinfo("level4", "level4", "minfo", nil,
+		[2]string{"partition", "name"},
+	)
+	newclassinfo("level5", "level5", "minfo", nil,
+		[2]string{"partition", "name"},
+	)
+	newclassinfo("level6", "level6", "minfo", nil,
+		[2]string{"partition", "name"},
+	)
+	newclassinfo("level7", "level7", "minfo", nil,
+		[2]string{"partition", "name"},
+	)
+	newclassinfo("level8", "level8", "minfo", nil,
+		[2]string{"partition", "name"},
+	)
+}
+
+func newclassinfo(aliasname, classname, baseclassname string, fieldinfoslist []*fieldinfo, withoptions ...[2]string) (ci *classinfo) {
+	defer func() {
+		classnames = append(classnames, classname)
+		classinfos.Set(classname, ci)
+	}()
+	fieldinfos := map[string]*fieldinfo{}
+	datakey_fieldinfos := map[string]*fieldinfo{}
+	fieldslist := []string{}
+	keyfields := []string{}
+	createmql := `create class if not exists ` + classname + `:` + baseclassname + `(`
+	classfullname := ""
+	bci := classinfos.GetIFPresent(baseclassname)
+	if bci != nil {
+		classfullname = bci.classfullname + "/" + classname
+		for fn, fi := range bci.fieldinfos {
+			fieldinfos[fn] = fi
+			datakey_fieldinfos[fi.datakey] = fi
+		}
+		fieldslist = append(fieldslist, bci.fieldslist...)
+		keyfields = append(keyfields, bci.keyfields...)
+	} else {
+		if baseclassname != "/" && baseclassname != "" {
+			panic("baseclassname not defined " + baseclassname)
+		}
+		classfullname = "/" + classname
+	}
+	keyfield_defines := []string{}
+	if len(fieldinfoslist) > 0 {
+		field_defines := []string{}
+		for _, fi := range fieldinfoslist {
+			field_defines = append(field_defines, fi.fieldname+` `+fi.fieldtype+` "`+fi.datakey+`"`)
+			fieldslist = append(fieldslist, fi.fieldname)
+			if fi.keyidx > 0 {
+				for len(keyfield_defines) < fi.keyidx {
+					keyfield_defines = append(keyfield_defines, "")
+				}
+				keyfield_defines[fi.keyidx-1] = fi.fieldname
+			}
+			fieldinfos[fi.fieldname] = fi
+			datakey_fieldinfos[fi.datakey] = fi
+		}
+		createmql += strings.Join(field_defines, ",")
+	}
+	if len(keyfield_defines) > 0 {
+		createmql += ", keys(" + strings.Join(keyfield_defines, ",") + ")"
+		keyfields = append(keyfields, keyfield_defines...)
+	}
+	createmql += `)with alias='` + aliasname + `'`
+	for _, withoption := range withoptions {
+		createmql += " and " + withoption[0] + "=" + withoption[1]
+	}
+
+	var insertmql string
+	if len(fieldslist) > 0 {
+		insertmql = `insert into ` + classname + "(" + strings.Join(fieldslist, ",") + ")values(" + strings.Repeat(",?", len(fieldslist))[1:] + ")"
+	}
+	ci = &classinfo{
+		classname:          classname,
+		aliasname:          aliasname,
+		classfullname:      classfullname,
+		fieldinfos:         fieldinfos,
+		datakey_fieldinfos: datakey_fieldinfos,
+		keyfields:          keyfields,
+		fieldslist:         fieldslist,
+		insertmql:          insertmql,
+		createmql:          createmql,
+	}
+	return
+}
+
+var _ = `
+create class if not exists m3cnet : / ( )with ttl=366 day , autosearch=true , version=false , alias='m3cnet' , namespace='m3cnet' ;
+
+create class if not exists master : m3cnet (
+	uniqueid			varchar			"UNIQUEID",
+	name				varchar			"NAME",
+	entitytypes			varchar			"ENTITYTYPES",
+	basename			varchar			"BASENAME",
+	entitytypesarr		varchar			"ENTITYTYPESARR",
+	originid			varchar			"ID",
+	tags				set<varchar>	"TAGS",
+	changetime			timestamp		"CHANGETIME",
+	emsname				varchar			"EMSNAME",
+	sysid				varchar			"SYSID",
+	site				varchar			"SITE",	
+	vendor				varchar			"VENDOR",
+	ci_table			varchar			"CI_TABLE",
+	ci_status			varchar			"CI_STATUS",
+	rel_status			varchar			"REL_STATUS",
+	stage				varchar			"STAGE",
+	extraattr			map<varchar,varchar>		"EXTRAATTR",
+	entityid			varchar			"ENTITYID",
+	asmchangetime		timestamp		"ASMCHANGETIME",
+	cmdbmapping			varchar			"CMDBMAPPING",
+	ipaddress			varchar			"IPADDRESS",
+	distname			varchar			"DISTNAME",
+	site_source			varchar			"SITE_SOURCE",
+	lastupdated			timestamp		"LASTUPDATED",
+
+	keys( uniqueid ) 
+) with partition=entitytypes , alias='master' ;
+
+create class if not exists minfo : m3cnet (
+	uniqueid			varchar,
+	distName			varchar,
+	name				varchar,
+	entityTypes			varchar,
+	extraattr			map<varchar,varchar>,
+	tags				set<varchar>,
+	
+	keys( uniqueid ) 
+) with alias='minfo' ;
+
+create class if not exists level1 : minfo () with partition=entitytypes , alias='level1' ;
+create class if not exists level2 : minfo () with partition=entitytypes , alias='level2' ;
+create class if not exists level3 : minfo () with partition=entitytypes , alias='level3' ;
+create class if not exists level4 : minfo () with partition=entitytypes , alias='level4' ;
+create class if not exists level5 : minfo () with partition=entitytypes , alias='level5' ;
+create class if not exists level6 : minfo () with partition=entitytypes , alias='level6' ;
+create class if not exists level7 : minfo () with partition=entitytypes , alias='level7' ;
+create class if not exists level8 : minfo () with partition=entitytypes , alias='level8' ;
+`
+
+var createedgemqls = []string{
+	`create edge type m3cnet.contain`,
+	`create edge type m3cnet.depend`,
+	`create edge type m3cnet.topology`,
+}
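
Taken together, newclassinfo pre-builds one create-class statement and one parameterized insert statement per class, inheriting the field list of the base class. Below is a minimal, standard-library-only sketch of that string assembly; the buildMQL helper and the sample field set are illustrative and not part of this commit.

package main

import (
	"fmt"
	"strings"
)

type field struct {
	name, typ, datakey string
	keyidx             int
}

// buildMQL concatenates a "create class" and an "insert into" statement
// the same way newclassinfo does above.
func buildMQL(class, base, alias string, fields []field) (createmql, insertmql string) {
	defs, names, keys := []string{}, []string{}, []string{}
	for _, f := range fields {
		defs = append(defs, f.name+" "+f.typ+` "`+f.datakey+`"`)
		names = append(names, f.name)
		if f.keyidx > 0 {
			keys = append(keys, f.name)
		}
	}
	createmql = "create class if not exists " + class + ":" + base + "(" + strings.Join(defs, ",")
	if len(keys) > 0 {
		createmql += ", keys(" + strings.Join(keys, ",") + ")"
	}
	createmql += ")with alias='" + alias + "'"
	insertmql = "insert into " + class + "(" + strings.Join(names, ",") +
		")values(" + strings.Repeat(",?", len(names))[1:] + ")"
	return
}

func main() {
	createmql, insertmql := buildMQL("minfo", "m3cnet", "minfo", []field{
		{name: "uniqueid", typ: "varchar", datakey: "uniqueId", keyidx: 1},
		{name: "name", typ: "varchar", datakey: "name"},
	})
	fmt.Println(createmql) // create class if not exists minfo:m3cnet(...keys(uniqueid))with alias='minfo'
	fmt.Println(insertmql) // insert into minfo(uniqueid,name)values(?,?)
}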

+ 16 - 0
importer/filetype.go

@@ -0,0 +1,16 @@
+package importer
+
+type FileType string
+
+const (
+	FT_LEVEL1 FileType = "level1"
+	FT_LEVEL2 FileType = "level2"
+	FT_LEVEL3 FileType = "level3"
+	FT_LEVEL4 FileType = "level4"
+	FT_LEVEL5 FileType = "level5"
+	FT_LEVEL6 FileType = "level6"
+	FT_LEVEL7 FileType = "level7"
+	FT_LEVEL8 FileType = "level8"
+	FT_MASTER FileType = "master"
+	FT_EDGE   FileType = "edge"
+)

+ 59 - 20
importer/importer.go

@@ -6,6 +6,7 @@ import (
 	"io"
 	"os"
 	"path/filepath"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -127,18 +128,23 @@ func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, to
 	cgistatus := NewCGIStatus()
 	reload := mcfg.GetString("reload")
 	if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && reload == "" {
-		e = cgistatus.Load()
-		if e != nil {
-			err = e
+		err = cgistatus.Load()
+		if err != nil {
 			return
 		}
 	} else {
-		e = importer.odbcimporter.reload()
-		if e != nil {
-			err = e
+		// reload
+		// drop any previously created classes
+		err = importer.odbcimporter.reload()
+		if err != nil {
 			return
 		}
 	}
+	// create the classes
+	err = importer.odbcimporter.ReviseClassStruct()
+	if err != nil {
+		return
+	}
 	totalfilescount = int64(len(cgistatus.ImportStatus))
 	for _, v := range cgistatus.ImportStatus {
 		totalrecordscount += v.RecordsCount
@@ -206,6 +212,32 @@ func (importer *Importer) importReader(filename string, buf io.Reader) (blockcou
 	if e != nil {
 		return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
 	}
+	var filetype FileType
+	switch {
+	case strings.Contains(filename, "_L1_"):
+		filetype = FT_LEVEL1
+	case strings.Contains(filename, "_L2_"):
+		filetype = FT_LEVEL2
+	case strings.Contains(filename, "_L3_"):
+		filetype = FT_LEVEL3
+	case strings.Contains(filename, "_L4_"):
+		filetype = FT_LEVEL4
+	case strings.Contains(filename, "_L5_"):
+		filetype = FT_LEVEL5
+	case strings.Contains(filename, "_L6_"):
+		filetype = FT_LEVEL6
+	case strings.Contains(filename, "_L7_"):
+		filetype = FT_LEVEL7
+	case strings.Contains(filename, "_L8_"):
+		filetype = FT_LEVEL8
+	case strings.Contains(filename, "MASTER"):
+		filetype = FT_MASTER
+	case strings.Contains(filename, "EDGE"):
+		filetype = FT_EDGE
+	default:
+		err = merrs.NewError("filename does not conform to the agreed format " + filename)
+		return
+	}
 	var wg sync.WaitGroup
 	defer importer.done()
 	defer wg.Wait()
@@ -213,9 +245,9 @@ func (importer *Importer) importReader(filename string, buf io.Reader) (blockcou
 		if err != nil {
 			break
 		}
-		block, linecount, e := br.ReadBlock()
+		block, line, linecount, e := br.ReadBlock()
 		if e != nil {
-			return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})
+			return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
 		}
 		if block == nil {
 			return
@@ -223,36 +255,43 @@ func (importer *Importer) importReader(filename string, buf io.Reader) (blockcou
 		wg.Add(1)
 		e = importer.importrc.ConcurCall(1, func() {
 			defer wg.Done()
-			e = importer.importRecord(block, filename, linecount)
+			e = importer.importRecord(block, line, filename, filetype, linecount)
 			if e != nil {
-				err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})
+				err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
 				return
 			}
 			atomic.AddInt64(&blockcount, 1)
 		})
 		if e != nil {
-			return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})
+			return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
 		}
 	}
 	return
 }
 
-func (importer *Importer) importRecord(record map[string]any, filename string, linecount int) (err error) {
+func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype FileType, linecount int) (err error) {
 	if odbc.LogDebug {
 		bs, e := json.MarshalIndent(record, "", "  ")
 		if e != nil {
-			return e
+			return merrs.NewError(e)
 		}
 		logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
 	}
 	var classname string
-	classname, e := importer.odbcimporter.ReviseClassStruct(record)
-	if e != nil {
-		return e
-	}
-	e = importer.odbcimporter.InsertData(classname, record)
-	if e != nil {
-		return e
+	switch filetype {
+	case FT_EDGE:
+		err = importer.odbcimporter.InsertEdge(record)
+		if err != nil {
+			err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
+			return
+		}
+	default:
+		classname = string(filetype)
+		err = importer.odbcimporter.InsertData(classname, record)
+		if err != nil {
+			err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
+			return
+		}
 	}
 	return
 }
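
The filename-based dispatch added above picks the target class once per file, before any block is read. A self-contained sketch of that detection follows; the marker strings come from the diff, while the detect helper and the sample file names are illustrative only.

package main

import (
	"fmt"
	"strings"
)

// detect mirrors the strings.Contains dispatch in importReader:
// _L1_.._L8_ map to the level classes, MASTER and EDGE are special cases.
func detect(filename string) (string, error) {
	for _, marker := range []string{"_L1_", "_L2_", "_L3_", "_L4_", "_L5_", "_L6_", "_L7_", "_L8_"} {
		if strings.Contains(filename, marker) {
			return "level" + marker[2:3], nil // "_L3_" -> "level3"
		}
	}
	if strings.Contains(filename, "MASTER") {
		return "master", nil
	}
	if strings.Contains(filename, "EDGE") {
		return "edge", nil
	}
	return "", fmt.Errorf("filename does not conform to the agreed format %s", filename)
}

func main() {
	for _, name := range []string{"CMDB_L3_20250205.txt", "CMDB_MASTER_20250205.csv", "notes.txt"} {
		filetype, err := detect(name)
		fmt.Println(name, "->", filetype, err)
	}
}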

+ 104 - 164
importer/odbcimporter.go

@@ -2,6 +2,7 @@ package importer
 
 import (
 	"encoding/json"
+	"fmt"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -9,39 +10,27 @@ import (
 
 	"git.wecise.com/wecise/cgimport/odbc"
 	"git.wecise.com/wecise/odb-go/odb"
+	"git.wecise.com/wecise/util/cast"
 	"git.wecise.com/wecise/util/cmap"
 	"git.wecise.com/wecise/util/merrs"
 )
 
-type fieldinfo struct {
-	fieldname string
-	fieldtype string
-	keyidx    int    // primary key order value; 0 means not a primary key
-	datakey   string // corresponding key name in the source data
-}
-
-type classinfo struct {
-	classname     string
-	nickname      string
-	fieldinfos    map[string]*fieldinfo
-	keyfields     []string
-	fieldslist    []string
-	insertmql     string
+type classdatainfo struct {
+	*classinfo
 	insertcount   int64
 	lastlogtime   time.Time
 	lastlogicount int64
 	mutex         sync.Mutex
 }
 
+var classdatainfos = cmap.New[string, *classdatainfo]()
+
 type ODBCImporter struct {
-	client     odb.Client
-	classinfos cmap.ConcurrentMap[string, *classinfo]
+	client odb.Client
 }
 
 func NewODBCImporter() *ODBCImporter {
-	odbci := &ODBCImporter{
-		classinfos: cmap.New[string, *classinfo](),
-	}
+	odbci := &ODBCImporter{}
 	if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
 		odbci.client = odbc.ODBC()
 	}
@@ -49,194 +38,145 @@ func NewODBCImporter() *ODBCImporter {
 }
 
 // revise class definitions to match the data
-func (odbci *ODBCImporter) ReviseClassStruct(data map[string]any) (classname string, err error) {
-	switch {
-	case data["uniqueId"] != nil:
-		classname = "/cgitest/x10/x1002"
-	case data["UNIQUEID"] != nil:
-		classname = "/cgitest/x10/x1001"
-	case data["FROMUNIQUEID"] != nil:
-		classname = "/cgitest/x10/x1003"
-	default:
-		bs, e := json.MarshalIndent(data, "", "  ")
-		if e != nil {
-			err = e
+func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
+	for _, classname := range classnames {
+		ci := classinfos.GetIFPresent(classname)
+		if ci == nil {
+			return merrs.NewError("classinfo not found " + classname)
+		}
+		_, err = classdatainfos.GetWithNew(classname, func() (cdi *classdatainfo, err error) {
+			logger.Info("create class " + ci.classname)
+			if odbci.client != nil {
+				_, err = odbci.client.Query(ci.createmql).Do()
+				if err != nil {
+					return
+				}
+			}
+			cdi = &classdatainfo{classinfo: ci}
 			return
+		})
+		if err != nil {
+			return
+		}
+	}
+	if odbci.client != nil {
+		for _, createedgemql := range createedgemqls {
+			_, e := odbci.client.Query(createedgemql).Do()
+			if e != nil && !strings.Contains(e.Error(), "already exist") {
+				err = e
+				return
+			}
 		}
-		err = merrs.NewError("no mapping classname", merrs.SSMaps{{"data": string(bs)}})
 	}
+	return
+}
 
-	err = odbci.createClass("cgitest", "/cgitest", nil)
-	if err != nil {
-		return
-	}
-	err = odbci.createClass("x10", "/cgitest/x10", nil)
-	if err != nil {
-		return
-	}
-	err = odbci.createClass("x1001", "/cgitest/x10/x1001", []*fieldinfo{
-		{fieldname: "uniqueid", datakey: "UNIQUEID", fieldtype: "varchar", keyidx: 1},
-		{fieldname: "distname", datakey: "BASENAME", fieldtype: "varchar"},
-	})
-	if err != nil {
-		return
-	}
-	err = odbci.createClass("x1002", "/cgitest/x10/x1002", []*fieldinfo{
-		{fieldname: "uniqueid", datakey: "uniqueId", fieldtype: "varchar", keyidx: 1},
-		{fieldname: "distname", datakey: "distName", fieldtype: "varchar"},
-	})
-	if err != nil {
-		return
-	}
-	err = odbci.createClass("x1003", "/cgitest/x10/x1003", []*fieldinfo{
-		{fieldname: "fromuniqueid", datakey: "FROMUNIQUEID", fieldtype: "varchar", keyidx: 1},
-		{fieldname: "touniqueid", datakey: "TOUNIQUEID", fieldtype: "varchar", keyidx: 2},
-	})
-	if err != nil {
-		return
-	}
+func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
 
 	return
 }
 
 // insert data
 func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) {
-	ci := odbci.classinfos.GetIFPresent(classname)
-	if ci == nil {
+	cdi := classdatainfos.GetIFPresent(classname)
+	if cdi == nil {
 		return merrs.NewError("class not defined " + classname)
 	}
-	if ci.insertmql == "" {
+	if cdi.insertmql == "" {
 		return merrs.NewError("class no fields to insert " + classname)
 	}
 	values := []any{}
-	for _, fn := range ci.fieldslist {
-		fi := ci.fieldinfos[fn]
+	for _, fn := range cdi.fieldslist {
+		fi := cdi.fieldinfos[fn]
+		if fi.datakey == "" {
+			td := map[string]any{}
+			for k, v := range data {
+				if cdi.datakey_fieldinfos[k] == nil {
+					td[k] = v
+				}
+			}
+			tdbs, e := json.Marshal(td)
+			if e != nil {
+				return merrs.NewError(e)
+			}
+			values = append(values, string(tdbs))
+			continue
+		}
 		v := data[fi.datakey]
-
+		switch fi.fieldtype {
+		case "set<varchar>":
+			v = cast.ToStringSlice(v)
+		case "timestamp":
+			tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
+			if e != nil {
+				return merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
+			}
+			v = tv.Format("2006-01-02 15:04:05.000000")
+		}
 		values = append(values, v)
 	}
 	if odbci.client != nil {
-		_, err = odbci.client.Query(ci.insertmql, values...).Do()
+		_, err = odbci.client.Query(cdi.insertmql, values...).Do()
 		if err != nil {
+			err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.insertmql}, {"values": fmt.Sprint(values)}})
+			logger.Error(err)
 			return
 		}
 	}
-	atomic.AddInt64(&ci.insertcount, 1)
-	ci.mutex.Lock()
-	if time.Since(ci.lastlogtime) > 5*time.Second && ci.lastlogicount != ci.insertcount {
-		ci.lastlogtime = time.Now()
-		ci.lastlogicount = ci.insertcount
-		logger.Info("class", ci.classname, "import", ci.insertcount, "records")
+	atomic.AddInt64(&cdi.insertcount, 1)
+	cdi.mutex.Lock()
+	if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
+		cdi.lastlogtime = time.Now()
+		cdi.lastlogicount = cdi.insertcount
+		logger.Info("class", cdi.classname, "import", cdi.insertcount, "records")
 	}
-	ci.mutex.Unlock()
+	cdi.mutex.Unlock()
 	return
 }
 
 func (odbci *ODBCImporter) done() {
-	odbci.classinfos.Fetch(func(cn string, ci *classinfo) bool {
-		ci.mutex.Lock()
-		if ci.lastlogicount != ci.insertcount {
-			ci.lastlogtime = time.Now()
-			ci.lastlogicount = ci.insertcount
-			logger.Info("class", ci.classname, "import", ci.insertcount, "records")
+	classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
+		cdi.mutex.Lock()
+		if cdi.lastlogicount != cdi.insertcount {
+			cdi.lastlogtime = time.Now()
+			cdi.lastlogicount = cdi.insertcount
+			logger.Info("class", cdi.classname, "import", cdi.insertcount, "records")
 		}
-		ci.mutex.Unlock()
+		cdi.mutex.Unlock()
 		return true
 	})
 }
 
 func (odbci *ODBCImporter) alldone() {
-	odbci.classinfos.Fetch(func(cn string, ci *classinfo) bool {
-		ci.mutex.Lock()
-		if ci.insertcount != 0 {
-			ci.lastlogtime = time.Now()
-			ci.lastlogicount = ci.insertcount
-			logger.Info("class", ci.classname, "import", ci.insertcount, "records")
+	classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
+		cdi.mutex.Lock()
+		if cdi.insertcount != 0 {
+			cdi.lastlogtime = time.Now()
+			cdi.lastlogicount = cdi.insertcount
+			logger.Info("class", cdi.classname, "import", cdi.insertcount, "records")
 		}
-		ci.mutex.Unlock()
+		cdi.mutex.Unlock()
 		return true
 	})
 }
 
 func (odbci *ODBCImporter) reload() error {
 	if odbci.client != nil {
-		_, e := odbci.client.Query(`delete from "/cgitest/x10/x1001" with version`).Do()
-		_ = e
-		_, e = odbci.client.Query(`delete from "/cgitest/x10/x1002" with version`).Do()
-		_ = e
-		_, e = odbci.client.Query(`delete from "/cgitest/x10/x1003" with version`).Do()
-		_ = e
-		_, e = odbci.client.Query(`drop class if exists "/cgitest/x10/x1001"`).Do()
-		if e != nil {
-			return e
-		}
-		_, e = odbci.client.Query(`drop class if exists "/cgitest/x10/x1002"`).Do()
-		if e != nil {
-			return e
-		}
-		_, e = odbci.client.Query(`drop class if exists "/cgitest/x10/x1003"`).Do()
-		if e != nil {
-			return e
-		}
-		_, e = odbci.client.Query(`drop class if exists "/cgitest/x10"`).Do()
-		if e != nil {
-			return e
-		}
-	}
-	return nil
-}
-
-// create a new class
-func (odbci *ODBCImporter) createClass(nickname, classname string, fieldinfoslist []*fieldinfo) (err error) {
-	_, err = odbci.classinfos.GetWithNew(classname, func() (ci *classinfo, err error) {
-		logger.Info("create class " + classname)
-		fieldinfos := map[string]*fieldinfo{}
-		fieldslist := []string{}
-		keyfields := []string{}
-		mql := `create class if not exists ` + classname + `(`
-		if len(fieldinfoslist) > 0 {
-			field_defines := []string{}
-			for _, fi := range fieldinfoslist {
-				field_defines = append(field_defines, fi.fieldname+" "+fi.fieldtype)
-				fieldslist = append(fieldslist, fi.fieldname)
-				if fi.keyidx > 0 {
-					for len(keyfields) < fi.keyidx {
-						keyfields = append(keyfields, "")
+		for i := len(classnames) - 1; i >= 0; i-- {
+			classname := classnames[i]
+			ci := classinfos.GetIFPresent(classname)
+			if ci == nil {
+				continue
+			}
+			for retry := 2; retry >= 0; retry-- {
+				_, e := odbci.client.Query(`delete from "` + ci.classfullname + `" with version`).Do()
+				_ = e
+				_, e = odbci.client.Query(`drop class if exists "` + ci.classfullname + `"`).Do()
+				if e != nil {
+					if retry > 0 {
+						continue
 					}
-					keyfields[fi.keyidx-1] = fi.fieldname
+					return e
 				}
-				fieldinfos[fi.fieldname] = fi
 			}
-			mql += strings.Join(field_defines, ",")
-		}
-		if len(keyfields) > 0 {
-			mql += ", keys(" + strings.Join(keyfields, ",") + ")"
 		}
-		mql += `)with namespace="cgitest" and key=manu and nickname='` + nickname + `'`
-		if odbci.client != nil {
-			_, err = odbci.client.Query(mql).Do()
-			if err != nil {
-				return
-			}
-		}
-		var insertmql string
-		if len(fieldslist) > 0 {
-			insertmql = `insert into ` + classname + "(" + strings.Join(fieldslist, ",") + ")values(" + strings.Repeat(",?", len(fieldslist))[1:] + ")"
-		}
-		ci = &classinfo{
-			classname:  classname,
-			nickname:   nickname,
-			fieldinfos: fieldinfos,
-			keyfields:  keyfields,
-			fieldslist: fieldslist,
-			insertmql:  insertmql,
-		}
-		return
-	})
-	return
-}
-
-// alter class definition
-func (odbci *ODBCImporter) alterClass() (err error) {
-	return
+	}
+	return nil
 }
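
InsertData now coerces set<varchar> and timestamp fields before binding; the timestamp path parses the source layout 2006-01-02-15.04.05.000000 and re-emits 2006-01-02 15:04:05.000000. A standard-library equivalent of that conversion is sketched below, with time.Parse standing in for the util/cast helper purely for illustration.

package main

import (
	"fmt"
	"time"
)

func main() {
	const srcLayout = "2006-01-02-15.04.05.000000" // layout the importer expects for incoming timestamp values
	const dstLayout = "2006-01-02 15:04:05.000000" // layout bound into the insert MQL

	tv, err := time.Parse(srcLayout, "2025-02-05-07.11.51.123456")
	if err != nil {
		fmt.Println("can't parse datetime value:", err)
		return
	}
	fmt.Println(tv.Format(dstLayout)) // 2025-02-05 07:11:51.123456
}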

+ 1 - 1
odbc/cfg.go

@@ -43,4 +43,4 @@ const (
 	DP_INSERTDATA
 )
 
-var DevPhase = DP_READFILE | DP_PARSESTRUCT
+var DevPhase = DP_READFILE | DP_PARSESTRUCT | DP_CREATECLASS | DP_INSERTDATA
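
DevPhase is a bit mask; adding DP_CREATECLASS and DP_INSERTDATA here is what enables the client calls guarded by odbc.DevPhase& checks in the importer. A minimal sketch of that flag pattern follows; the constant names mirror odbc/cfg.go, but the 1 << iota values are an assumption since only the tail of the const block is visible in this hunk.

package main

import "fmt"

type DevPhaseFlag int

// names mirror odbc/cfg.go; the values are assumed, not taken from the diff
const (
	DP_READFILE DevPhaseFlag = 1 << iota
	DP_PARSESTRUCT
	DP_CREATECLASS
	DP_INSERTDATA
)

var DevPhase = DP_READFILE | DP_PARSESTRUCT | DP_CREATECLASS | DP_INSERTDATA

func main() {
	// the ODBC client is only constructed and used when these bits are set
	fmt.Println("create class enabled:", DevPhase&DP_CREATECLASS != 0)
	fmt.Println("insert data enabled:", DevPhase&DP_INSERTDATA != 0)
}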

+ 1 - 1
odbc/odbclient.go

@@ -19,7 +19,7 @@ var default_keyspace = `oktest`
 var default_odbpaths = `127.0.0.1:11001`
 
 var default_config = &odb.Config{
-	Hosts:    []string{"127.0.0.1:11001,47.92.151.165:11001"},
+	Hosts:    []string{"127.0.0.1:11001"}, // 47.92.151.165:11001
 	Keyspace: "oktest",
 	User:     fmt.Sprint("测试客户端"),
 	Pass:     "********",

+ 1 - 1
reader/blockreader.go

@@ -8,7 +8,7 @@ import (
 )
 
 type BlockReader interface {
-	ReadBlock() (block map[string]any, linecount int, err error)
+	ReadBlock() (block map[string]any, line string, linecount int, err error)
 }
 
 func NewBlockReader(filename string, reader io.Reader) (BlockReader, error) {

+ 1 - 2
reader/csvreader.go

@@ -21,8 +21,7 @@ func NewCSVBlockReader(filename string, reader io.Reader) *CSVBlockReader {
 	}
 }
 
-func (br *CSVBlockReader) ReadBlock() (block map[string]any, linecount int, err error) {
-	var line string
+func (br *CSVBlockReader) ReadBlock() (block map[string]any, line string, linecount int, err error) {
 	eof := false
 	for {
 		line, linecount, eof, err = br.ReadLine()

+ 7 - 3
reader/txtreader.go

@@ -8,7 +8,8 @@ import (
 
 type TXTBlockReader struct {
 	*LineReader
-	nextline string
+	firstline string
+	nextline  string
 }
 
 func NewTXTBlockReader(filename string, reader io.Reader) *TXTBlockReader {
@@ -19,9 +20,9 @@ func NewTXTBlockReader(filename string, reader io.Reader) *TXTBlockReader {
 
 var regrecord = regexp.MustCompile(`^(?:[\.\/a-zA-Z0-9_]*:)?V:(\{.*)`)
 
-func (br *TXTBlockReader) ReadBlock() (block map[string]any, linecount int, err error) {
+func (br *TXTBlockReader) ReadBlock() (block map[string]any, line string, linecount int, err error) {
 	eof := false
-	line := br.nextline
+	line = br.nextline
 	for {
 		for {
 			br.nextline, linecount, eof, err = br.ReadLine()
@@ -41,6 +42,9 @@ func (br *TXTBlockReader) ReadBlock() (block map[string]any, linecount int, err
 		}
 		linecount--
 		if !regrecord.MatchString(line) {
+			if linecount == 1 {
+				br.firstline = line
+			}
 			line = br.nextline
 			continue
 		}
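
regrecord only accepts lines whose JSON payload follows an optional path prefix and a V: tag; non-matching lines are skipped, and the change above now also remembers the first such line in firstline. A small runnable check of what the expression captures; the sample input lines are made up.

package main

import (
	"fmt"
	"regexp"
)

// same pattern as txtreader.go
var regrecord = regexp.MustCompile(`^(?:[\.\/a-zA-Z0-9_]*:)?V:(\{.*)`)

func main() {
	for _, line := range []string{
		`/data/cgi_L1.txt:V:{"uniqueId":"a-1","name":"n1"}`, // record with a path prefix
		`V:{"uniqueId":"a-2"}`,                              // bare record
		`HEADER uniqueId name`,                              // no match: would be skipped
	} {
		if m := regrecord.FindStringSubmatch(line); m != nil {
			fmt.Println("JSON payload:", m[1])
		} else {
			fmt.Println("skipped:", line)
		}
	}
}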