libf 4 months ago
Parent
Commit
bdce9228ee
10 changed files with 227 additions and 171 deletions
  1. importer/importer.go        +23 -21
  2. importer/odbcimporter.go    +27 -26
  3. main.go                     +19 -1
  4. odbc/cfg.go                 +4 -0
  5. odbc/odbclient.go           +39 -32
  6. reader/blockreader.go       +6 -5
  7. reader/csvreader.go         +23 -4
  8. reader/txtreader.go         +5 -1
  9. importer/classinfo.go       +80 -80
  10. importer/filetype.go       +1 -1

+ 23 - 21
importer/importer.go

@@ -13,6 +13,7 @@ import (
 
 	"git.wecise.com/wecise/cgimport/odbc"
 	"git.wecise.com/wecise/cgimport/reader"
+	"git.wecise.com/wecise/cgimport/schema"
 	"git.wecise.com/wecise/util/filewalker"
 	"git.wecise.com/wecise/util/merrs"
 	"git.wecise.com/wecise/util/rc"
@@ -24,6 +25,7 @@ var logger = odbc.Logger
 type Importer struct {
 	datapath     string
 	parallel     int
+	reload       bool
 	importrc     *rc.RoutinesController
 	odbcimporter *ODBCImporter
 }
@@ -107,10 +109,11 @@ func (cgistatus *CGIStatus) Save() (err error) {
 	return cgistatus.lasterror
 }
 
-func ImportDir(datapath string, parallel int) (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
+func ImportDir(datapath string, parallel int, reload bool) (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
 	importer := &Importer{
 		datapath:     datapath,
 		parallel:     parallel,
+		reload:       reload,
 		importrc:     rc.NewRoutinesController("", 100),
 		odbcimporter: NewODBCImporter(),
 	}
@@ -126,8 +129,7 @@ func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, to
 		return
 	}
 	cgistatus := NewCGIStatus()
-	reload := mcfg.GetString("reload")
-	if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && reload == "" {
+	if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload {
 		err = cgistatus.Load()
 		if err != nil {
 			return
@@ -212,36 +214,36 @@ func (importer *Importer) ImportFile(filepath string) (blockcount int64, err err
 }
 
 func (importer *Importer) importReader(filename string, buf io.Reader) (blockcount int64, err error) {
-	br, e := reader.NewBlockReader(filename, buf)
-	if e != nil {
-		return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
-	}
-	var filetype FileType
+	var filetype schema.FileType
 	switch {
 	case strings.Contains(filename, "_L1_"):
-		filetype = FT_LEVEL1
+		filetype = schema.FT_LEVEL1
 	case strings.Contains(filename, "_L2_"):
-		filetype = FT_LEVEL2
+		filetype = schema.FT_LEVEL2
 	case strings.Contains(filename, "_L3_"):
-		filetype = FT_LEVEL3
+		filetype = schema.FT_LEVEL3
 	case strings.Contains(filename, "_L4_"):
-		filetype = FT_LEVEL4
+		filetype = schema.FT_LEVEL4
 	case strings.Contains(filename, "_L5_"):
-		filetype = FT_LEVEL5
+		filetype = schema.FT_LEVEL5
 	case strings.Contains(filename, "_L6_"):
-		filetype = FT_LEVEL6
+		filetype = schema.FT_LEVEL6
 	case strings.Contains(filename, "_L7_"):
-		filetype = FT_LEVEL7
+		filetype = schema.FT_LEVEL7
 	case strings.Contains(filename, "_L8_"):
-		filetype = FT_LEVEL8
+		filetype = schema.FT_LEVEL8
 	case strings.Contains(filename, "MASTER"):
-		filetype = FT_MASTER
+		filetype = schema.FT_MASTER
 	case strings.Contains(filename, "EDGE"):
-		filetype = FT_EDGE
+		filetype = schema.FT_EDGE
 	default:
 		err = merrs.NewError("filename does not conform to the agreed format " + filename)
 		return
 	}
+	br, e := reader.NewBlockReader(filename, filetype, buf)
+	if e != nil {
+		return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
+	}
 	var wg sync.WaitGroup
 	defer importer.done()
 	defer wg.Wait()
@@ -273,7 +275,7 @@ func (importer *Importer) importReader(filename string, buf io.Reader) (blockcou
 	return
 }
 
-func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype FileType, linecount int) (err error) {
+func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype schema.FileType, linecount int) (err error) {
 	if odbc.LogDebug {
 		bs, e := json.MarshalIndent(record, "", "  ")
 		if e != nil {
@@ -283,7 +285,7 @@ func (importer *Importer) importRecord(record map[string]any, line string, filen
 	}
 	var classname string
 	switch filetype {
-	case FT_EDGE:
+	case schema.FT_EDGE:
 		err = importer.odbcimporter.InsertEdge(record)
 		if err != nil {
 			err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
@@ -314,7 +316,7 @@ func Check() {
 		return
 	}
 	{
-		mql := "select * from /m3cnet/master"
+		mql := "select id,uniqueid,tags,contain,day,vtime from /m3cnet/master where uniqueid='E2E:OTR0002L'"
 		r, e := client.Query(mql).Do()
 		if e != nil {
 			panic(merrs.NewError(e))

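Note that the filetype detection now runs before the block reader is constructed, so the reader can be specialized per type. A minimal standalone sketch of the same first-match-wins filename mapping; the FileType string values below are placeholders for the real schema.FT_* constants, and the sample filename is made up:

package main

import (
	"fmt"
	"strings"
)

// FileType stands in for schema.FileType in this self-contained sketch.
type FileType string

// markers lists the filename substrings in the same precedence order as the
// switch in importReader; the first match wins.
var markers = []struct {
	substr   string
	filetype FileType
}{
	{"_L1_", "level1"}, {"_L2_", "level2"}, {"_L3_", "level3"},
	{"_L4_", "level4"}, {"_L5_", "level5"}, {"_L6_", "level6"},
	{"_L7_", "level7"}, {"_L8_", "level8"},
	{"MASTER", "master"}, {"EDGE", "edge"},
}

func detectFileType(filename string) (FileType, error) {
	for _, m := range markers {
		if strings.Contains(filename, m.substr) {
			return m.filetype, nil
		}
	}
	return "", fmt.Errorf("filename does not conform to the agreed format %s", filename)
}

func main() {
	ft, err := detectFileType("CG_L3_20240101.csv") // hypothetical filename
	fmt.Println(ft, err)                            // level3 <nil>
}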
+ 27 - 26
importer/odbcimporter.go

@@ -11,6 +11,7 @@ import (
 	"time"
 
 	"git.wecise.com/wecise/cgimport/odbc"
+	"git.wecise.com/wecise/cgimport/schema"
 	"git.wecise.com/wecise/odb-go/odb"
 	"git.wecise.com/wecise/util/cast"
 	"git.wecise.com/wecise/util/cmap"
@@ -18,7 +19,7 @@ import (
 )
 
 type classdatainfo struct {
-	*classinfo
+	*schema.ClassInfo
 	insertcount   int64
 	lastlogtime   time.Time
 	lastlogicount int64
@@ -41,31 +42,31 @@ func NewODBCImporter() *ODBCImporter {
 
 // Revise class definitions to match the data
 func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
-	for _, classname := range classnames {
-		ci := classinfos.GetIFPresent(classname)
+	for _, classname := range schema.ClassNames {
+		ci := schema.ClassInfos.GetIFPresent(classname)
 		if ci == nil {
 			return merrs.NewError("classinfo not found " + classname)
 		}
 		_, err = classdatainfos.GetWithNew(classname, func() (cdi *classdatainfo, err error) {
 			if odbci.client != nil {
-				_, err = odbci.client.Query("select class,id from " + ci.classname + " limit 1").Do()
+				_, err = odbci.client.Query("select class,id from " + ci.Classname + " limit 1").Do()
 				if err != nil {
 					if !strings.Contains(err.Error(), "not find") {
 						return nil, err
 					}
-					logger.Info("create class " + ci.classname)
-					_, err = odbci.client.Query(ci.createmql).Do()
+					logger.Info("create class " + ci.Classname)
+					_, err = odbci.client.Query(ci.Createmql).Do()
 					if err != nil {
 						return
 					}
 				}
 			}
-			cdi = &classdatainfo{classinfo: ci}
+			cdi = &classdatainfo{ClassInfo: ci}
 			return
 		})
 	}
 	if odbci.client != nil {
-		for _, createedgemql := range createedgemqls {
+		for _, createedgemql := range schema.CreateEdgeMqls {
 			_, e := odbci.client.Query(createedgemql).Do()
 			if e != nil && !strings.Contains(e.Error(), "already exist") {
 				err = e
@@ -110,7 +111,7 @@ func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
 
 func (odbci *ODBCImporter) insertEdge(edgetype, fromuid, touid string, extraattr map[string]string, data map[string]any) (err error) {
 
-	edgetype = relations[edgetype]
+	edgetype = schema.Relations[edgetype]
 	if edgetype == "" {
 		databs, _ := json.MarshalIndent(data, "", "  ")
 		return merrs.NewError("not found valid edgetype in data ", merrs.SSMap{"data": string(databs)})
@@ -195,10 +196,10 @@ func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (er
 	if cdi == nil {
 		return merrs.NewError("class not defined " + classname)
 	}
-	if cdi.insertmql == "" {
+	if cdi.Insertmql == "" {
 		return merrs.NewError("class no fields to insert " + classname)
 	}
-	oid, suid, e := object_id(cdi.aliasname, data)
+	oid, suid, e := object_id(cdi.Aliasname, data)
 	if e != nil {
 		return e
 	}
@@ -245,7 +246,7 @@ func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (er
 	}
 
 	values := []any{}
-	for _, fn := range cdi.fieldslist {
+	for _, fn := range cdi.Fieldslist {
 		if fn == "id" {
 			values = append(values, oid)
 			continue
@@ -262,11 +263,11 @@ func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (er
 			values = append(values, topology)
 			continue
 		}
-		fi := cdi.fieldinfos[fn]
-		if fi.datakey == "" {
+		fi := cdi.Fieldinfos[fn]
+		if fi.Datakey == "" {
 			td := map[string]any{}
 			for k, v := range data {
-				if cdi.datakey_fieldinfos[k] == nil {
+				if cdi.DatakeyFieldinfos[k] == nil {
 					td[k] = v
 				}
 			}
@@ -277,8 +278,8 @@ func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (er
 			values = append(values, string(tdbs))
 			continue
 		}
-		v := data[fi.datakey]
-		switch fi.fieldtype {
+		v := data[fi.Datakey]
+		switch fi.Fieldtype {
 		case "set<varchar>":
 			v = cast.ToStringSlice(v)
 		case "timestamp":
@@ -294,9 +295,9 @@ func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (er
 		values = append(values, v)
 	}
 	if odbci.client != nil {
-		_, err = odbci.client.Query(cdi.insertmql, values...).Do()
+		_, err = odbci.client.Query(cdi.Insertmql, values...).Do()
 		if err != nil {
-			err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.insertmql}, {"values": fmt.Sprint(values)}})
+			err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}})
 			logger.Error(err)
 			return
 		}
@@ -307,7 +308,7 @@ func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (er
 	if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
 		cdi.lastlogtime = time.Now()
 		cdi.lastlogicount = cdi.insertcount
-		logger.Info("class", cdi.classname, "import", cdi.insertcount, "records")
+		logger.Info("class", cdi.Classname, "import", cdi.insertcount, "records")
 	}
 	cdi.mutex.Unlock()
 	return
@@ -319,7 +320,7 @@ func (odbci *ODBCImporter) done() {
 		if cdi.lastlogicount != cdi.insertcount {
 			cdi.lastlogtime = time.Now()
 			cdi.lastlogicount = cdi.insertcount
-			logger.Info("class", cdi.classname, "import", cdi.insertcount, "records")
+			logger.Info("class", cdi.Classname, "import", cdi.insertcount, "records")
 		}
 		cdi.mutex.Unlock()
 		return true
@@ -332,7 +333,7 @@ func (odbci *ODBCImporter) alldone() {
 		if cdi.insertcount != 0 {
 			cdi.lastlogtime = time.Now()
 			cdi.lastlogicount = cdi.insertcount
-			logger.Info("class", cdi.classname, "import", cdi.insertcount, "records")
+			logger.Info("class", cdi.Classname, "import", cdi.insertcount, "records")
 		}
 		cdi.mutex.Unlock()
 		return true
@@ -341,13 +342,13 @@ func (odbci *ODBCImporter) alldone() {
 
 func (odbci *ODBCImporter) reload() error {
 	if odbci.client != nil {
-		for i := len(classnames) - 1; i >= 0; i-- {
-			classname := classnames[i]
-			ci := classinfos.GetIFPresent(classname)
+		for i := len(schema.ClassNames) - 1; i >= 0; i-- {
+			classname := schema.ClassNames[i]
+			ci := schema.ClassInfos.GetIFPresent(classname)
 			if ci == nil {
 				continue
 			}
-			e := odbci.dropclass(ci.classfullname)
+			e := odbci.dropclass(ci.Classfullname)
 			if e != nil {
 				return e
 			}

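The reload path walks schema.ClassNames in reverse, so derived classes are dropped before the base classes they extend. A self-contained sketch of just that ordering, using class names from this commit:

package main

import "fmt"

func main() {
	// Base classes are declared first, so iterating in reverse drops the
	// most-derived classes before anything they depend on.
	classnames := []string{"m3cnet", "master", "minfo", "level1"} // declaration order
	for i := len(classnames) - 1; i >= 0; i-- {
		fmt.Println("drop class", classnames[i]) // level1 first, m3cnet last
	}
}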
+ 19 - 1
main.go

@@ -2,6 +2,7 @@ package main
 
 import (
 	"fmt"
+	"os"
 
 	"git.wecise.com/wecise/cgimport/importer"
 	"git.wecise.com/wecise/cgimport/odbc"
@@ -16,14 +17,31 @@ import (
 var mcfg = odbc.Config
 var logger = odbc.Logger
 
+func init() {
+	odbc.CommandArgsInfo = func() string {
+		return `
+datapath=data    # path to the data files
+reload=true    # re-import all data; when omitted, files that were already imported are skipped`
+	}
+}
 func main() {
+	if mcfg.GetString("help") != "" {
+		fmt.Println(odbc.Usage())
+		os.Exit(0)
+	}
 	// Configuration parameters
 	// Data directory
 	datapath := mcfg.GetString("datapath", "data")
 	// Degree of parallelism
 	parallel := mcfg.GetInt("parallel", 10)
+	//
+	reload := mcfg.GetBool("reload") || mcfg.GetString("reload") == "reload"
+	//
+	logger.Info("datapath:   ", datapath)
+	logger.Info("parallel:   ", parallel)
+	logger.Info("reload:     ", reload)
 	// Import
-	totalfilescount, totalrecordscount, totalusetime, filescount, recordscount, usetime, e := importer.ImportDir(datapath, parallel)
+	totalfilescount, totalrecordscount, totalusetime, filescount, recordscount, usetime, e := importer.ImportDir(datapath, parallel, reload)
 	if e != nil {
 		panic(e)
 	}

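The reload option is now accepted in two spellings: reload=true and the bare value reload. A simplified sketch of the equivalent check; parseReload and its map argument are hypothetical stand-ins for the two mcfg lookups in main:

package main

import "fmt"

// parseReload mirrors mcfg.GetBool("reload") || mcfg.GetString("reload") == "reload"
// over a plain key/value map (GetBool's full boolean parsing is elided here).
func parseReload(args map[string]string) bool {
	v := args["reload"]
	return v == "true" || v == "reload"
}

func main() {
	fmt.Println(parseReload(map[string]string{"reload": "true"}))   // true
	fmt.Println(parseReload(map[string]string{"reload": "reload"})) // true
	fmt.Println(parseReload(map[string]string{}))                   // false
}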
+ 4 - 0
odbc/cfg.go

@@ -35,6 +35,10 @@ var Logger = ulog.New().WithConfig(Config, "log")
 var LogFile = Logger.FileOutPath()
 var LogDebug = Logger.FileOutLevel() <= ulog.DEBUG
 
+func init() {
+	Logger.SetFormat("yyyy-MM-dd HH:mm:ss.SSSSSS [pid] [level] msg", "\n")
+}
+
 const (
 	DP_READFILE = 1 << iota
 	DP_PROCESSCONTINUE

+ 39 - 32
odbc/odbclient.go

@@ -13,7 +13,6 @@ import (
 var ODBClient odb.Client
 var ODBError error
 var Debug bool
-var Usage string
 
 var default_keyspace = `oktest`
 var default_odbpaths = `127.0.0.1:11001`
@@ -27,6 +26,43 @@ var default_config = &odb.Config{
 	Debug:    false,
 }
 
+var CommandArgsInfo = func() string { return "" }
+
+func Usage() string {
+	return `
+Command-line arguments:` + CommandArgsInfo() + `
+odbpath=` + default_odbpaths + `    # odbserver address; defaults to the ODBPATH environment variable or the ETCD-related configuration
+keyspace=` + default_keyspace + `    # keyspace; defaults to the KEYSPACE environment variable
+debug=true    # enable debug mode for more verbose output
+	
+Required environment variables:
+ODBPATH=` + default_odbpaths + `    # odbserver address; defaults to the odbc.odbpath configuration entry
+KEYSPACE=` + default_keyspace + `    # keyspace; defaults to the odbc.keyspace configuration entry
+ETCDPATH=127.0.0.1:2379     # must be set
+ETCDUSER=
+ETCDPASS=
+CASSANDRAPATH=127.0.0.1
+CASSANDRALOOKUP=false
+NATSPATH=nats://user:N5JbKeT1C3uOUh317OVXjg==@127.0.0.1:4222
+
+LANG=zh_CN.utf8
+LC_ALL=zh_CN.utf8
+
+Runtime parameters can also be configured in ` + DefaultAppName + `.conf:
+[odbc]
+;odbserver address
+odbpath=` + default_odbpaths + `
+;keyspace
+keyspace=` + default_keyspace + `
+`
+}
+
+func LogConfigInfo() {
+	Logger.Info("odbpath:    ", ODBClient.Config().Hosts, ODBClient.Config().Port)
+	Logger.Info("keyspace:   ", ODBClient.Config().Keyspace)
+	Logger.Info("debug:      ", Debug)
+}
+
 func config_merge(a *odb.Config, b *odb.Config) *odb.Config {
 	if len(b.Hosts) > 0 {
 		a.Hosts = b.Hosts
@@ -62,33 +98,6 @@ func ODBC(odbcfgs ...*odb.Config) odb.Client {
 	}
 	default_keyspace = odbcfg.Keyspace
 	default_odbpaths = strings.Join(odbcfg.Hosts, ",")
-	Usage = `
-Command-line arguments:
-odbpath=` + default_odbpaths + ` #odbserver address; defaults to the ODBPATH environment variable or the ETCD-related configuration
-keyspace=` + default_keyspace + ` #keyspace; defaults to the KEYSPACE environment variable
-debug=true #enable debug mode for more verbose output
-	
-Required environment variables:
-ODBPATH=` + default_odbpaths + ` #odbserver address; defaults to the odbc.odbpath configuration entry
-KEYSPACE=` + default_keyspace + ` #keyspace; defaults to the odbc.keyspace configuration entry
-ETCDPATH=127.0.0.1:2379 #must be set
-ETCDUSER=
-ETCDPASS=
-CASSANDRAPATH=127.0.0.1
-CASSANDRALOOKUP=false
-NATSPATH=nats://user:N5JbKeT1C3uOUh317OVXjg==@127.0.0.1:4222
-
-LANG=zh_CN.utf8
-LC_ALL=zh_CN.utf8
-
-Runtime parameters can also be configured in ` + DefaultAppName + `.conf:
-[odbc]
-;odbserver address
-odbpath=` + default_odbpaths + `
-;keyspace
-keyspace=` + default_keyspace + `
-`
-	Logger.SetFormat("yyyy-MM-dd HH:mm:ss.SSSSSS [pid] [level] msg", "\n")
 	odbpaths := strset.New(strings.Split(ucfg.CommandArgs.GetString("odbpath",
 		strings.Join(ucfg.Environs.GetStrings("ODBPATH",
 			Config.GetStrings("odbc.odbpath", default_odbpaths)...), ",")), ",")...).List()
@@ -101,15 +110,13 @@ keyspace=` + default_keyspace + `
 		Debug:    Debug,
 	}))
 	if ODBError != nil {
-		fmt.Print(Usage)
+		fmt.Print(Usage())
 		if strings.Contains(ODBError.Error(), "error: EOF") {
 			println("\n!!!should add your ip to odbserver whitelist!!!\n")
 			os.Exit(1)
 		}
 		panic(ODBError)
 	}
-	Logger.Info("odbpath :", ODBClient.Config().Hosts, ODBClient.Config().Port)
-	Logger.Info("keyspace:", ODBClient.Config().Keyspace)
-	Logger.Info("debug:   ", Debug)
+	LogConfigInfo()
 	return ODBClient
 }

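Turning the Usage variable into a function means the help text is composed when it is printed, after main's init has installed its CommandArgsInfo hook, rather than being frozen at connect time. A minimal sketch of the same late-binding pattern, with simplified placeholder help text:

package main

import "fmt"

// CommandArgsInfo is a hook the application can override to contribute its
// own command-line help; it defaults to an empty contribution.
var CommandArgsInfo = func() string { return "" }

// Usage composes the help text at call time, picking up whatever hook has
// been installed since program start.
func Usage() string { return "Command-line arguments:" + CommandArgsInfo() }

func main() {
	CommandArgsInfo = func() string { return "\ndatapath=data    # path to the data files" }
	fmt.Println(Usage())
}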
+ 6 - 5
reader/blockreader.go

@@ -4,6 +4,7 @@ import (
 	"io"
 	"path/filepath"
 
+	"git.wecise.com/wecise/cgimport/schema"
 	"git.wecise.com/wecise/util/merrs"
 )
 
@@ -11,13 +12,13 @@ type BlockReader interface {
 	ReadBlock() (block map[string]any, line string, linecount int, err error)
 }
 
-func NewBlockReader(filename string, reader io.Reader) (BlockReader, error) {
-	filetype := filepath.Ext(filename)
-	switch filetype {
+func NewBlockReader(filename string, filetype schema.FileType, reader io.Reader) (BlockReader, error) {
+	fileext := filepath.Ext(filename)
+	switch fileext {
 	case ".csv":
-		return NewCSVBlockReader(filename, reader), nil
+		return NewCSVBlockReader(filename, filetype, reader), nil
 	case ".txt":
-		return NewTXTBlockReader(filename, reader), nil
+		return NewTXTBlockReader(filename, filetype, reader), nil
 	}
 	return nil, merrs.UnsupportedError.NewError("unsupported data format " + fileext)
 }

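A hypothetical call site for the new three-argument constructor: the file extension still selects the concrete reader, while filetype is threaded through for schema-aware parsing. The path below is made up:

package main

import (
	"os"

	"git.wecise.com/wecise/cgimport/reader"
	"git.wecise.com/wecise/cgimport/schema"
)

func main() {
	f, err := os.Open("data/CG_MASTER_20240101.csv") // hypothetical data file
	if err != nil {
		panic(err)
	}
	defer f.Close()
	// The .csv extension selects the CSV reader; FT_MASTER tells it which
	// class schema to consult when coercing field values.
	br, err := reader.NewBlockReader(f.Name(), schema.FT_MASTER, f)
	if err != nil {
		panic(err)
	}
	block, line, linecount, err := br.ReadBlock()
	_, _, _, _ = block, line, linecount, err
}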
+ 23 - 4
reader/csvreader.go

@@ -6,22 +6,27 @@ import (
 	"strconv"
 	"strings"
 
+	"git.wecise.com/wecise/cgimport/schema"
 	"git.wecise.com/wecise/util/merrs"
 	"github.com/spf13/cast"
 )
 
 type CSVBlockReader struct {
 	*LineReader
-	csvkeys []string
+	filetype schema.FileType
+	csvkeys  []string
 }
 
-func NewCSVBlockReader(filename string, reader io.Reader) *CSVBlockReader {
+func NewCSVBlockReader(filename string, filetype schema.FileType, reader io.Reader) *CSVBlockReader {
 	return &CSVBlockReader{
 		LineReader: NewLineReader(filename, reader),
+		filetype:   filetype,
 	}
 }
 
 func (br *CSVBlockReader) ReadBlock() (block map[string]any, line string, linecount int, err error) {
+	classname := string(br.filetype)
+	ci := schema.ClassInfos.GetIFPresent(classname)
 	eof := false
 	for {
 		line, linecount, eof, err = br.ReadLine()
@@ -67,8 +72,22 @@ func (br *CSVBlockReader) ReadBlock() (block map[string]any, line string, lineco
 				}
 				s, e := strconv.Unquote(v)
 				if e == nil {
-					block[k] = s
-					continue
+					v = s
+				}
+			}
+			if ci != nil {
+				fi := ci.DatakeyFieldinfos[k]
+				if fi != nil {
+					switch fi.Fieldtype {
+					case "set<varchar>":
+						s := v
+						if strings.HasPrefix(s, "[") && strings.HasSuffix(s, "]") {
+							s = s[1 : len(s)-1]
+						}
+						ss := cast.ToStringSlice(s)
+						block[k] = ss
+						continue
+					}
 				}
 			}
 			block[k] = v

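The new branch converts a bracketed set&lt;varchar&gt; cell into a string slice instead of storing it verbatim. A standalone sketch of that conversion; note that cast.ToStringSlice splits a plain string on whitespace, so this assumes space-separated members:

package main

import (
	"fmt"
	"strings"

	"github.com/spf13/cast"
)

// toStringSet mirrors the set<varchar> branch in ReadBlock: strip the
// surrounding brackets, then let cast.ToStringSlice split the remainder.
func toStringSet(v string) []string {
	if strings.HasPrefix(v, "[") && strings.HasSuffix(v, "]") {
		v = v[1 : len(v)-1]
	}
	return cast.ToStringSlice(v)
}

func main() {
	fmt.Println(toStringSet("[ROUTER SWITCH]")) // []string{"ROUTER", "SWITCH"}
}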
+ 5 - 1
reader/txtreader.go

@@ -4,17 +4,21 @@ import (
 	"encoding/json"
 	"io"
 	"regexp"
+
+	"git.wecise.com/wecise/cgimport/schema"
 )
 
 type TXTBlockReader struct {
 	*LineReader
+	filetype  schema.FileType
 	firstline string
 	nextline  string
 }
 
-func NewTXTBlockReader(filename string, reader io.Reader) *TXTBlockReader {
+func NewTXTBlockReader(filename string, filetype schema.FileType, reader io.Reader) *TXTBlockReader {
 	return &TXTBlockReader{
 		LineReader: NewLineReader(filename, reader),
+		filetype:   filetype,
 	}
 }
 

+ 80 - 80
importer/classinfo.go

@@ -1,4 +1,4 @@
-package importer
+package schema
 
 import (
 	"strings"
@@ -6,27 +6,27 @@ import (
 	"git.wecise.com/wecise/util/cmap"
 )
 
-type fieldinfo struct {
-	fieldname string
-	fieldtype string
-	keyidx    int    // primary-key order; 0 means not a key field
-	datakey   string // corresponding key name in the source data
+type FieldInfo struct {
+	Fieldname string
+	Fieldtype string
+	Keyidx    int    // primary-key order; 0 means not a key field
+	Datakey   string // corresponding key name in the source data
 }
 
-type classinfo struct {
-	classname          string
-	aliasname          string
-	classfullname      string
-	fieldinfos         map[string]*fieldinfo
-	datakey_fieldinfos map[string]*fieldinfo
-	keyfields          []string
-	fieldslist         []string
-	insertmql          string
-	createmql          string
+type ClassInfo struct {
+	Classname         string
+	Aliasname         string
+	Classfullname     string
+	Fieldinfos        map[string]*FieldInfo
+	DatakeyFieldinfos map[string]*FieldInfo
+	Keyfields         []string
+	Fieldslist        []string
+	Insertmql         string
+	Createmql         string
 }
 
-var classinfos = cmap.New[string, *classinfo]()
-var classnames = []string{}
+var ClassInfos = cmap.New[string, *ClassInfo]()
+var ClassNames = []string{}
 
 func init() {
 	newclassinfo("m3cnet", "m3cnet", "/", nil,
@@ -36,43 +36,43 @@ func init() {
 		[2]string{"namespace", "'m3cnet'"},
 	)
 	newclassinfo("master", "master", "m3cnet",
-		[]*fieldinfo{
-			{fieldname: "uniqueid", fieldtype: "varchar", keyidx: 1, datakey: "UNIQUEID"},
-			{fieldname: "name", fieldtype: "varchar", datakey: "NAME"},
-			{fieldname: "entitytypes", fieldtype: "set<varchar>", datakey: "ENTITYTYPES"},
-			{fieldname: "basename", fieldtype: "varchar", datakey: "BASENAME"},
-			{fieldname: "entitytypesarr", fieldtype: "varchar", datakey: "ENTITYTYPESARR"},
-			{fieldname: "originid", fieldtype: "varchar", datakey: "ID"},
-			{fieldname: "tags", fieldtype: "set<varchar>", datakey: "TAGS"},
-			{fieldname: "changetime", fieldtype: "timestamp", datakey: "CHANGETIME"},
-			{fieldname: "emsname", fieldtype: "varchar", datakey: "EMSNAME"},
-			{fieldname: "sysid", fieldtype: "varchar", datakey: "SYSID"},
-			{fieldname: "site", fieldtype: "varchar", datakey: "SITE"},
-			{fieldname: "vendor", fieldtype: "varchar", datakey: "VENDOR"},
-			{fieldname: "ci_table", fieldtype: "varchar", datakey: "CI_TABLE"},
-			{fieldname: "ci_status", fieldtype: "varchar", datakey: "CI_STATUS"},
-			{fieldname: "rel_status", fieldtype: "varchar", datakey: "REL_STATUS"},
-			{fieldname: "stage", fieldtype: "varchar", datakey: "STAGE"},
-			{fieldname: "extraattr", fieldtype: "varchar", datakey: "EXTRAATTR"},
-			{fieldname: "entityid", fieldtype: "varchar", datakey: "ENTITYID"},
-			{fieldname: "asmchangetime", fieldtype: "int", datakey: "ASMCHANGETIME"},
-			{fieldname: "cmdbmapping", fieldtype: "varchar", datakey: "CMDBMAPPING"},
-			{fieldname: "ipaddress", fieldtype: "varchar", datakey: "IPADDRESS"},
-			{fieldname: "distname", fieldtype: "varchar", datakey: "DISTNAME"},
-			{fieldname: "site_source", fieldtype: "varchar", datakey: "SITE_SOURCE"},
-			{fieldname: "lastupdated", fieldtype: "timestamp", datakey: "LASTUPDATED"},
+		[]*FieldInfo{
+			{Fieldname: "uniqueid", Fieldtype: "varchar", Keyidx: 1, Datakey: "UNIQUEID"},
+			{Fieldname: "name", Fieldtype: "varchar", Datakey: "NAME"},
+			{Fieldname: "entitytypes", Fieldtype: "set<varchar>", Datakey: "ENTITYTYPES"},
+			{Fieldname: "basename", Fieldtype: "varchar", Datakey: "BASENAME"},
+			{Fieldname: "entitytypesarr", Fieldtype: "varchar", Datakey: "ENTITYTYPESARR"},
+			{Fieldname: "originid", Fieldtype: "varchar", Datakey: "ID"},
+			{Fieldname: "tags", Fieldtype: "set<varchar>", Datakey: "TAGS"},
+			{Fieldname: "changetime", Fieldtype: "timestamp", Datakey: "CHANGETIME"},
+			{Fieldname: "emsname", Fieldtype: "varchar", Datakey: "EMSNAME"},
+			{Fieldname: "sysid", Fieldtype: "varchar", Datakey: "SYSID"},
+			{Fieldname: "site", Fieldtype: "varchar", Datakey: "SITE"},
+			{Fieldname: "vendor", Fieldtype: "varchar", Datakey: "VENDOR"},
+			{Fieldname: "ci_table", Fieldtype: "varchar", Datakey: "CI_TABLE"},
+			{Fieldname: "ci_status", Fieldtype: "varchar", Datakey: "CI_STATUS"},
+			{Fieldname: "rel_status", Fieldtype: "varchar", Datakey: "REL_STATUS"},
+			{Fieldname: "stage", Fieldtype: "varchar", Datakey: "STAGE"},
+			{Fieldname: "extraattr", Fieldtype: "varchar", Datakey: "EXTRAATTR"},
+			{Fieldname: "entityid", Fieldtype: "varchar", Datakey: "ENTITYID"},
+			{Fieldname: "asmchangetime", Fieldtype: "int", Datakey: "ASMCHANGETIME"},
+			{Fieldname: "cmdbmapping", Fieldtype: "varchar", Datakey: "CMDBMAPPING"},
+			{Fieldname: "ipaddress", Fieldtype: "varchar", Datakey: "IPADDRESS"},
+			{Fieldname: "distname", Fieldtype: "varchar", Datakey: "DISTNAME"},
+			{Fieldname: "site_source", Fieldtype: "varchar", Datakey: "SITE_SOURCE"},
+			{Fieldname: "lastupdated", Fieldtype: "timestamp", Datakey: "LASTUPDATED"},
 		},
 		[2]string{"partition", "name"},
 		[2]string{"key", "manu"},
 	)
 	newclassinfo("minfo", "minfo", "m3cnet",
-		[]*fieldinfo{
-			{fieldname: "uniqueid", fieldtype: "varchar", keyidx: 1, datakey: "uniqueId"},
-			{fieldname: "distname", fieldtype: "varchar", datakey: "distName"},
-			{fieldname: "name", fieldtype: "varchar", datakey: "name"},
-			{fieldname: "entitytypes", fieldtype: "set<varchar>", datakey: "entityTypes"},
-			{fieldname: "extraattr", fieldtype: "varchar", datakey: ""},
-			{fieldname: "tags", fieldtype: "set<varchar>", datakey: "tags"},
+		[]*FieldInfo{
+			{Fieldname: "uniqueid", Fieldtype: "varchar", Keyidx: 1, Datakey: "uniqueId"},
+			{Fieldname: "distname", Fieldtype: "varchar", Datakey: "distName"},
+			{Fieldname: "name", Fieldtype: "varchar", Datakey: "name"},
+			{Fieldname: "entitytypes", Fieldtype: "set<varchar>", Datakey: "entityTypes"},
+			{Fieldname: "extraattr", Fieldtype: "varchar", Datakey: ""},
+			{Fieldname: "tags", Fieldtype: "set<varchar>", Datakey: "tags"},
 		},
 		[2]string{"key", "manu"},
 	)
@@ -110,26 +110,26 @@ func init() {
 	)
 }
 
-func newclassinfo(aliasname, classname, baseclassname string, fieldinfoslist []*fieldinfo, withoptions ...[2]string) (ci *classinfo) {
+func newclassinfo(aliasname, classname, baseclassname string, fieldinfoslist []*FieldInfo, withoptions ...[2]string) (ci *ClassInfo) {
 	defer func() {
-		classnames = append(classnames, classname)
-		classinfos.Set(classname, ci)
+		ClassNames = append(ClassNames, classname)
+		ClassInfos.Set(classname, ci)
 	}()
-	fieldinfos := map[string]*fieldinfo{}
-	datakey_fieldinfos := map[string]*fieldinfo{}
+	fieldinfos := map[string]*FieldInfo{}
+	datakey_fieldinfos := map[string]*FieldInfo{}
 	fieldslist := []string{}
 	keyfields := []string{}
 	createmql := `create class if not exists ` + classname + `:` + baseclassname + `(`
 	classfullname := ""
-	bci := classinfos.GetIFPresent(baseclassname)
+	bci := ClassInfos.GetIFPresent(baseclassname)
 	if bci != nil {
-		classfullname = bci.classfullname + "/" + classname
-		for fn, fi := range bci.fieldinfos {
+		classfullname = bci.Classfullname + "/" + classname
+		for fn, fi := range bci.Fieldinfos {
 			fieldinfos[fn] = fi
-			datakey_fieldinfos[fi.datakey] = fi
+			datakey_fieldinfos[fi.Datakey] = fi
 		}
-		fieldslist = append(fieldslist, bci.fieldslist...)
-		keyfields = append(keyfields, bci.keyfields...)
+		fieldslist = append(fieldslist, bci.Fieldslist...)
+		keyfields = append(keyfields, bci.Keyfields...)
 	} else {
 		if baseclassname != "/" && baseclassname != "" {
 			panic("baseclassname not defined " + baseclassname)
@@ -140,16 +140,16 @@ func newclassinfo(aliasname, classname, baseclassname string, fieldinfoslist []*
 	if len(fieldinfoslist) > 0 {
 		field_defines := []string{}
 		for _, fi := range fieldinfoslist {
-			field_defines = append(field_defines, fi.fieldname+` `+fi.fieldtype+`"`+fi.datakey+`"`)
-			fieldslist = append(fieldslist, fi.fieldname)
-			if fi.keyidx > 0 {
-				for len(keyfield_defines) < fi.keyidx {
+			field_defines = append(field_defines, fi.Fieldname+` `+fi.Fieldtype+`"`+fi.Datakey+`"`)
+			fieldslist = append(fieldslist, fi.Fieldname)
+			if fi.Keyidx > 0 {
+				for len(keyfield_defines) < fi.Keyidx {
 					keyfield_defines = append(keyfield_defines, "")
 				}
-				keyfield_defines[fi.keyidx-1] = fi.fieldname
+				keyfield_defines[fi.Keyidx-1] = fi.Fieldname
 			}
-			fieldinfos[fi.fieldname] = fi
-			datakey_fieldinfos[fi.datakey] = fi
+			fieldinfos[fi.Fieldname] = fi
+			datakey_fieldinfos[fi.Datakey] = fi
 		}
 		createmql += strings.Join(field_defines, ",")
 	}
@@ -172,16 +172,16 @@ func newclassinfo(aliasname, classname, baseclassname string, fieldinfoslist []*
 	if len(fieldslist) > 0 {
 		insertmql = `insert into ` + classname + "(" + strings.Join(fieldslist, ",") + ")values(" + strings.Repeat(",?", len(fieldslist))[1:] + ")"
 	}
-	ci = &classinfo{
-		classname:          classname,
-		aliasname:          aliasname,
-		classfullname:      classfullname,
-		fieldinfos:         fieldinfos,
-		datakey_fieldinfos: datakey_fieldinfos,
-		keyfields:          keyfields,
-		fieldslist:         fieldslist,
-		insertmql:          insertmql,
-		createmql:          createmql,
+	ci = &ClassInfo{
+		Classname:         classname,
+		Aliasname:         aliasname,
+		Classfullname:     classfullname,
+		Fieldinfos:        fieldinfos,
+		DatakeyFieldinfos: datakey_fieldinfos,
+		Keyfields:         keyfields,
+		Fieldslist:        fieldslist,
+		Insertmql:         insertmql,
+		Createmql:         createmql,
 	}
 	return
 }
@@ -239,7 +239,7 @@ create class if not exists level7 : minfo () with partition=entitytypes , alias=
 create class if not exists level8 : minfo () with partition=entitytypes , alias='level8' ;
 `
 
-var relations = map[string]string{
+var Relations = map[string]string{
 	"contains": "contain",
 	"contain":  "contain",
 	"dependon": "depend",
@@ -247,7 +247,7 @@ var relations = map[string]string{
 	"topology": "topology",
 }
 
-var createedgemqls = []string{
+var CreateEdgeMqls = []string{
 	`create edge type m3cnet.contain`,
 	`create edge type m3cnet.depend`,
 	`create edge type m3cnet.topology`,

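For reference, Insertmql is assembled from Fieldslist with one positional placeholder per field. A standalone sketch of that construction, using a hypothetical class name:

package main

import (
	"fmt"
	"strings"
)

// buildInsertMQL reproduces the insert-statement construction in
// newclassinfo: comma-joined field names, one "?" placeholder each.
func buildInsertMQL(classname string, fields []string) string {
	return `insert into ` + classname + "(" + strings.Join(fields, ",") +
		")values(" + strings.Repeat(",?", len(fields))[1:] + ")"
}

func main() {
	fmt.Println(buildInsertMQL("demo", []string{"id", "uniqueid", "name"}))
	// insert into demo(id,uniqueid,name)values(?,?,?)
}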
+ 1 - 1
importer/filetype.go

@@ -1,4 +1,4 @@
-package importer
+package schema
 
 type FileType string