libf 4 mesi fa
parent
commit
7790986a94
7 ha cambiato i file con 310 aggiunte e 215 eliminazioni
  1. 93 0
      importer/cgistatus.go
  2. 1 80
      importer/importer.go
  3. 103 64
      importer/odbcimporter.go
  4. 4 2
      main.go
  5. 2 1
      odbc/cfg.go
  6. 14 7
      odbc/odbclient.go
  7. 93 61
      schema/classinfo.go

+ 93 - 0
importer/cgistatus.go

@@ -0,0 +1,93 @@
+package importer
+
+import (
+	"encoding/json"
+	"os"
+	"path/filepath"
+	"sync"
+	"time"
+
+	"git.wecise.com/wecise/cgimport/odbc"
+	"git.wecise.com/wecise/util/rc"
+)
+
+type ImportStatus struct {
+	RecordsCount int64
+}
+
+type CGIStatus struct {
+	filepath string
+	//
+	TotalUseTime time.Duration
+	ImportStatus map[string]*ImportStatus
+	//
+	mutex        sync.RWMutex
+	rc           *rc.RoutinesController
+	lasterror    error
+	lastsavetime time.Time
+	waitdone     chan any
+}
+
+func NewCGIStatus() *CGIStatus {
+	return &CGIStatus{
+		filepath:     mcfg.GetString("cgi.statusfile", "/opt/matrix/var/cgimport/"+odbc.Keyspace+".status.txt"),
+		ImportStatus: map[string]*ImportStatus{},
+		rc:           rc.NewRoutinesController("", 1),
+		waitdone:     make(chan any, 1),
+	}
+}
+
+func (cgistatus *CGIStatus) Load() error {
+	cgistatusbs, e := os.ReadFile(cgistatus.filepath)
+	if e != nil && !os.IsNotExist(e) {
+		return e
+	}
+	if len(cgistatusbs) > 0 {
+		e = json.Unmarshal(cgistatusbs, &cgistatus)
+		if e != nil {
+			logger.Warn(e)
+		}
+	}
+	return nil
+}
+
+func (cgistatus *CGIStatus) WaitSaveDone() {
+	cgistatus.waitdone <- 1
+	cgistatus.rc.WaitDone()
+}
+
+func (cgistatus *CGIStatus) Save() (err error) {
+	cgistatus.rc.CallLast2Only(func() {
+		if !cgistatus.lastsavetime.Equal(time.Time{}) {
+			interval := time.Since(cgistatus.lastsavetime)
+			if interval < 1*time.Second {
+				t := time.NewTimer(1*time.Second - interval)
+				select {
+				case <-t.C:
+				case v := <-cgistatus.waitdone:
+					cgistatus.waitdone <- v
+				}
+			}
+		}
+		cgistatus.mutex.RLock()
+		cgistatusbs, e := json.MarshalIndent(cgistatus, "", "  ")
+		cgistatus.mutex.RUnlock()
+		if e != nil {
+			cgistatus.lasterror = e
+			return
+		}
+		e = os.MkdirAll(filepath.Dir(cgistatus.filepath), os.ModePerm)
+		if e != nil {
+			cgistatus.lasterror = e
+			return
+		}
+		e = os.WriteFile(cgistatus.filepath, cgistatusbs, os.ModePerm)
+		if e != nil {
+			cgistatus.lasterror = e
+			return
+		}
+		cgistatus.lastsavetime = time.Now()
+		// fmt.Println(cgistatus.lastsavetime)
+	})
+	return cgistatus.lasterror
+}

+ 1 - 80
importer/importer.go

@@ -30,85 +30,6 @@ type Importer struct {
 	odbcimporter *ODBCImporter
 }
 
-type ImportStatus struct {
-	RecordsCount int64
-}
-
-type CGIStatus struct {
-	mutex        sync.RWMutex
-	TotalUseTime time.Duration
-	ImportStatus map[string]*ImportStatus
-	rc           *rc.RoutinesController
-	lasterror    error
-	lastsavetime time.Time
-	waitdone     chan any
-}
-
-var cgistatusfile = mcfg.GetString("cgi.statusfile", "/opt/matrix/var/cgimport.status.txt")
-
-func NewCGIStatus() *CGIStatus {
-	return &CGIStatus{
-		ImportStatus: map[string]*ImportStatus{},
-		rc:           rc.NewRoutinesController("", 1),
-		waitdone:     make(chan any, 1),
-	}
-}
-
-func (cgistatus *CGIStatus) Load() error {
-	cgistatusbs, e := os.ReadFile(cgistatusfile)
-	if e != nil && !os.IsNotExist(e) {
-		return e
-	}
-	if len(cgistatusbs) > 0 {
-		e = json.Unmarshal(cgistatusbs, &cgistatus)
-		if e != nil {
-			logger.Warn(e)
-		}
-	}
-	return nil
-}
-
-func (cgistatus *CGIStatus) WaitSaveDone() {
-	cgistatus.waitdone <- 1
-	cgistatus.rc.WaitDone()
-}
-
-func (cgistatus *CGIStatus) Save() (err error) {
-	cgistatus.rc.CallLast2Only(func() {
-		if !cgistatus.lastsavetime.Equal(time.Time{}) {
-			interval := time.Since(cgistatus.lastsavetime)
-			if interval < 1*time.Second {
-				t := time.NewTimer(1*time.Second - interval)
-				select {
-				case <-t.C:
-				case v := <-cgistatus.waitdone:
-					cgistatus.waitdone <- v
-				}
-			}
-		}
-		cgistatus.mutex.RLock()
-		cgistatusbs, e := json.MarshalIndent(cgistatus, "", "  ")
-		cgistatus.mutex.RUnlock()
-		if e != nil {
-			cgistatus.lasterror = e
-			return
-		}
-		e = os.MkdirAll(filepath.Dir(cgistatusfile), os.ModePerm)
-		if e != nil {
-			cgistatus.lasterror = e
-			return
-		}
-		e = os.WriteFile(cgistatusfile, cgistatusbs, os.ModePerm)
-		if e != nil {
-			cgistatus.lasterror = e
-			return
-		}
-		cgistatus.lastsavetime = time.Now()
-		// fmt.Println(cgistatus.lastsavetime)
-	})
-	return cgistatus.lasterror
-}
-
 func ImportDir(datapath string, parallel int, reload bool) (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
 	importer := &Importer{
 		datapath:     datapath,
@@ -316,7 +237,7 @@ func Check() {
 		return
 	}
 	{
-		mql := "select id,uniqueid,tags,contain,day,vtime from /m3cnet/master where uniqueid='E2E:OTR0002L'"
+		mql := "select id,uniqueid,tags,contain,day,vtime from level1 where uniqueid='E2E:OTR0002L'"
 		r, e := client.Query(mql).Do()
 		if e != nil {
 			panic(merrs.NewError(e))

+ 103 - 64
importer/odbcimporter.go

@@ -16,6 +16,7 @@ import (
 	"git.wecise.com/wecise/util/cast"
 	"git.wecise.com/wecise/util/cmap"
 	"git.wecise.com/wecise/util/merrs"
+	"github.com/scylladb/go-set/strset"
 )
 
 type classdatainfo struct {
@@ -26,7 +27,7 @@ type classdatainfo struct {
 	mutex         sync.Mutex
 }
 
-var classdatainfos = cmap.New[string, *classdatainfo]()
+var classdatainfos = cmap.NewSingle[string, *classdatainfo]()
 
 type ODBCImporter struct {
 	client odb.Client
@@ -47,30 +48,33 @@ func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
 		if ci == nil {
 			return merrs.NewError("classinfo not found " + classname)
 		}
-		_, err = classdatainfos.GetWithNew(classname, func() (cdi *classdatainfo, err error) {
+		cdi, e := classdatainfos.GetWithNew(ci.Classaliasname, func() (cdi *classdatainfo, err error) {
 			if odbci.client != nil {
-				_, err = odbci.client.Query("select class,id from " + ci.Classname + " limit 1").Do()
-				if err != nil {
-					if !strings.Contains(err.Error(), "not find") {
-						return nil, err
+				_, e := odbci.client.Query("select class,id from " + ci.Classfullname + " limit 1").Do()
+				if e != nil {
+					if !strings.Contains(e.Error(), "not find") {
+						return nil, e
 					}
 					logger.Info("create class " + ci.Classfullname)
-					_, err = odbci.client.Query(ci.Createmql).Do()
-					if err != nil {
-						return
+					_, e = odbci.client.Query(ci.Createmql).Do()
+					if e != nil {
+						return nil, e
 					}
 				}
 			}
 			cdi = &classdatainfo{ClassInfo: ci}
 			return
 		})
+		if e != nil {
+			return e
+		}
+		classdatainfos.Set(ci.Classfullname, cdi)
 	}
 	if odbci.client != nil {
 		for _, createedgemql := range schema.CreateEdgeMqls {
 			_, e := odbci.client.Query(createedgemql).Do()
 			if e != nil && !strings.Contains(e.Error(), "already exist") {
-				err = e
-				return
+				return e
 			}
 		}
 	}
@@ -110,14 +114,13 @@ func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
 }
 
 func (odbci *ODBCImporter) insertEdge(edgetype, fromuid, touid string, extraattr map[string]string, data map[string]any) (err error) {
-
 	edgetype = schema.Relations[edgetype]
 	if edgetype == "" {
 		databs, _ := json.MarshalIndent(data, "", "  ")
 		return merrs.NewError("not found valid edgetype in data ", merrs.SSMap{"data": string(databs)})
 	}
 	if odbci.client != nil {
-		foid := get_object_id_from_cache("master:" + fromuid)
+		foid := get_object_id_from_cache("level1", fromuid)
 		toid := to_object_id("level1", touid)
 		eabs, _ := json.Marshal(extraattr)
 		// quadmql := `quad "` + foid + `" ` + edgetype + ` + "` + toid + `" ` + string(eabs)
@@ -127,7 +130,7 @@ func (odbci *ODBCImporter) insertEdge(edgetype, fromuid, touid string, extraattr
 		// 	logger.Error(err)
 		// 	return
 		// }
-		updatemql := "update " + "/m3cnet/master" + " set " + " contain=?" + " where id='" + foid + "'"
+		updatemql := "update " + "level1" + " set " + " contain=?" + " where id='" + foid + "'"
 		_, err = odbci.client.Query(updatemql, map[string][]string{
 			"_all": {toid},
 			toid:   {string(eabs)},
@@ -143,8 +146,8 @@ func (odbci *ODBCImporter) insertEdge(edgetype, fromuid, touid string, extraattr
 
 var cm_object_id_cache = cmap.New[string, chan string]()
 
-func object_id_cache(suid string) chan string {
-	choid, _ := cm_object_id_cache.GetWithNew(suid,
+func object_id_cache(classaliasname, suid string) chan string {
+	choid, _ := cm_object_id_cache.GetWithNew(classaliasname+":"+suid,
 		func() (chan string, error) {
 			ch := make(chan string, 2)
 			return ch, nil
@@ -152,8 +155,8 @@ func object_id_cache(suid string) chan string {
 	return choid
 }
 
-func get_object_id_from_cache(suid string) string {
-	choid := object_id_cache(suid)
+func get_object_id_from_cache(classaliasname, suid string) string {
+	choid := object_id_cache(classaliasname, suid)
 	oid := <-choid
 	push_object_id_into_cache(choid, oid)
 	return oid
@@ -199,11 +202,24 @@ func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (er
 	if cdi.Insertmql == "" {
 		return merrs.NewError("class no fields to insert " + classname)
 	}
-	oid, suid, e := object_id(cdi.Aliasname, data)
-	if e != nil {
-		return e
+	innerdata := &InnerData{}
+	innerdata.oid, innerdata.suid, err = object_id(cdi.Classaliasname, data)
+	if err != nil {
+		return
 	}
-	var contain, depend, topology map[string][]string
+	innerdata.depend = referencedata(classname, data)
+	return odbci.insertData(classname, cdi, innerdata, data)
+}
+
+type InnerData struct {
+	oid      string
+	suid     string
+	contain  map[string][]string
+	depend   map[string][]string
+	topology map[string][]string
+}
+
+func referencedata(classname string, data map[string]any) (depend map[string][]string) {
 	refer := data["_references"]
 	switch vv := refer.(type) {
 	case []interface{}:
@@ -217,6 +233,8 @@ func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (er
 						suid := cast.ToString(v)
 						toclassname := "master"
 						switch classname {
+						case "level1":
+							toclassname = "level1"
 						case "level2":
 							toclassname = "level1"
 						case "level3":
@@ -244,27 +262,30 @@ func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (er
 			}
 		}
 	}
+	return
+}
 
+func (odbci *ODBCImporter) insertData(classname string, cdi *classdatainfo, innerdata *InnerData, data map[string]any) (err error) {
 	values := []any{}
 	for _, fn := range cdi.Fieldslist {
-		if fn == "id" {
-			values = append(values, oid)
+		// 内部字段
+		switch fn {
+		case "id":
+			values = append(values, innerdata.oid)
 			continue
-		}
-		if fn == "contain" {
-			values = append(values, contain)
+		case "contain":
+			values = append(values, innerdata.contain)
 			continue
-		}
-		if fn == "depend" {
-			values = append(values, depend)
+		case "depend":
+			values = append(values, innerdata.depend)
 			continue
-		}
-		if fn == "topology" {
-			values = append(values, topology)
+		case "topology":
+			values = append(values, innerdata.topology)
 			continue
 		}
 		fi := cdi.Fieldinfos[fn]
-		if fi.Datakey == "" {
+		// 合并扩展字段
+		if strset.New(fi.Datakey...).Has("*") {
 			td := map[string]any{}
 			for k, v := range data {
 				if cdi.DatakeyFieldinfos[k] == nil {
@@ -278,16 +299,23 @@ func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (er
 			values = append(values, string(tdbs))
 			continue
 		}
-		v := data[fi.Datakey]
-		switch fi.Fieldtype {
-		case "set<varchar>":
-			v = cast.ToStringSlice(v)
-		case "timestamp":
-			tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
-			if e != nil {
-				return merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
+		// 字段类型修正
+		var v any
+		for _, dk := range fi.Datakey {
+			v = data[dk]
+			if v != nil {
+				switch fi.Fieldtype {
+				case "set<varchar>":
+					v = cast.ToStringSlice(v)
+				case "timestamp":
+					tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
+					if e != nil {
+						return merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
+					}
+					v = tv.Format("2006-01-02 15:04:05.000000")
+				}
+				break
 			}
-			v = tv.Format("2006-01-02 15:04:05.000000")
 		}
 		if fn == "tags" {
 			v = append(cast.ToStringSlice(v), classname)
@@ -297,18 +325,19 @@ func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (er
 	if odbci.client != nil {
 		_, err = odbci.client.Query(cdi.Insertmql, values...).Do()
 		if err != nil {
-			err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}})
+			databs, _ := json.MarshalIndent(data, "", "  ")
+			err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}, {"data": string(databs)}})
 			logger.Error(err)
 			return
 		}
-		push_object_id_into_cache(object_id_cache(classname+":"+suid), oid)
+		push_object_id_into_cache(object_id_cache(cdi.Classaliasname, innerdata.suid), innerdata.oid)
 	}
 	atomic.AddInt64(&cdi.insertcount, 1)
 	cdi.mutex.Lock()
 	if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
 		cdi.lastlogtime = time.Now()
 		cdi.lastlogicount = cdi.insertcount
-		logger.Info("class", cdi.Classname, "import", cdi.insertcount, "records")
+		logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
 	}
 	cdi.mutex.Unlock()
 	return
@@ -320,7 +349,7 @@ func (odbci *ODBCImporter) done() {
 		if cdi.lastlogicount != cdi.insertcount {
 			cdi.lastlogtime = time.Now()
 			cdi.lastlogicount = cdi.insertcount
-			logger.Info("class", cdi.Classname, "import", cdi.insertcount, "records")
+			logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
 		}
 		cdi.mutex.Unlock()
 		return true
@@ -333,7 +362,7 @@ func (odbci *ODBCImporter) alldone() {
 		if cdi.insertcount != 0 {
 			cdi.lastlogtime = time.Now()
 			cdi.lastlogicount = cdi.insertcount
-			logger.Info("class", cdi.Classname, "import", cdi.insertcount, "records")
+			logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
 		}
 		cdi.mutex.Unlock()
 		return true
@@ -357,25 +386,35 @@ func (odbci *ODBCImporter) reload() error {
 	return nil
 }
 
-func (odbci *ODBCImporter) dropclass(classname string) error {
-	for retry := 2; retry >= 0; retry-- {
-		_, e := odbci.client.Query(`delete from "` + classname + `" with version`).Do()
-		_ = e
-		_, e = odbci.client.Query(`drop class if exists "` + classname + `"`).Do()
-		if e != nil {
-			matchstr := regexp.MustCompile(`refer by ([^,]+)`).FindStringSubmatch(e.Error())
-			if len(matchstr) >= 2 {
-				e = odbci.dropclass(matchstr[1])
-				if e != nil {
-					return e
+func (odbci *ODBCImporter) dropclass(classnames ...string) error {
+	for _, classname := range classnames {
+		for retry := 2; retry >= 0; retry-- {
+			_, e := odbci.client.Query(`delete from "` + classname + `" with version`).Do()
+			_ = e
+			_, e = odbci.client.Query(`drop class if exists "` + classname + `"`).Do()
+			if e != nil {
+				matchstr := regexp.MustCompile(`refer by ([^,]+)`).FindStringSubmatch(e.Error())
+				if len(matchstr) >= 2 {
+					e = odbci.dropclass(matchstr[1])
+					if e != nil {
+						return e
+					}
+				} else {
+					matchstr := regexp.MustCompile(`has children \[([^\]]+)\]`).FindStringSubmatch(e.Error())
+					if len(matchstr) >= 2 {
+						e = odbci.dropclass(strings.Split(matchstr[1], ",")...)
+						if e != nil {
+							return e
+						}
+					}
 				}
+				if retry > 0 {
+					continue
+				}
+				return e
 			}
-			if retry > 0 {
-				continue
-			}
-			return e
 		}
+		logger.Info("drop class " + classname)
 	}
-	logger.Info("drop class " + classname)
 	return nil
 }

+ 4 - 2
main.go

@@ -31,9 +31,9 @@ func main() {
 	}
 	// 配置参数
 	// 文件目录
-	datapath := mcfg.GetString("datapath", "data")
+	datapath := mcfg.GetString("datapath", mcfg.GetString("cgi.datapath", "data"))
 	// 并发数
-	parallel := mcfg.GetInt("parallel", 10)
+	parallel := mcfg.GetInt("parallel", mcfg.GetInt("cgi.parallel", 10))
 	//
 	reload := mcfg.GetBool("reload") || mcfg.GetString("reload") == "reload"
 	//
@@ -43,6 +43,7 @@ func main() {
 	// 导入
 	totalfilescount, totalrecordscount, totalusetime, filescount, recordscount, usetime, e := importer.ImportDir(datapath, parallel, reload)
 	if e != nil {
+		logger.Error(e)
 		panic(e)
 	}
 	if totalfilescount == 0 {
@@ -56,4 +57,5 @@ func main() {
 
 	// 验证
 	importer.Check()
+	os.Exit(0)
 }

+ 2 - 1
odbc/cfg.go

@@ -9,6 +9,7 @@ import (
 )
 
 var DefaultAppName = "cgimport"
+
 var Config = ucfg.MConfig(&ucfg.CfgOption{
 	Name: "m:default",
 	Type: ucfg.INI_TEXT,
@@ -16,7 +17,7 @@ var Config = ucfg.MConfig(&ucfg.CfgOption{
 [log]
 level=debug
 dir=` + filepath.Join("/", "opt", "matrix", "var", "logs") + `
-file=` + DefaultAppName + ".log" + `
+file=` + filepath.Join(DefaultAppName, "log.log") + `
 console=true
 color=true
 consolelevel=info

+ 14 - 7
odbc/odbclient.go

@@ -12,6 +12,9 @@ import (
 
 var ODBClient odb.Client
 var ODBError error
+
+var ODBServerPath string
+var Keyspace string
 var Debug bool
 
 var default_keyspace = `oktest`
@@ -58,7 +61,7 @@ keyspace=` + default_keyspace + `
 }
 
 func LogConfigInfo() {
-	Logger.Info("odbpath:    ", ODBClient.Config().Hosts, ODBClient.Config().Port)
+	Logger.Info("odbpath:    ", ODBClient.Config().Hosts)
 	Logger.Info("keyspace:   ", ODBClient.Config().Keyspace)
 	Logger.Info("debug:      ", Debug)
 }
@@ -98,14 +101,16 @@ func ODBC(odbcfgs ...*odb.Config) odb.Client {
 	}
 	default_keyspace = odbcfg.Keyspace
 	default_odbpaths = strings.Join(odbcfg.Hosts, ",")
-	odbpaths := strset.New(strings.Split(ucfg.CommandArgs.GetString("odbpath",
-		strings.Join(ucfg.Environs.GetStrings("ODBPATH",
-			Config.GetStrings("odbc.odbpath", default_odbpaths)...), ",")), ",")...).List()
-	keyspace := ucfg.CommandArgs.GetString("keyspace",
-		ucfg.Environs.GetString("KEYSPACE", Config.GetString("odbc.keyspace", default_keyspace)))
+	odbpaths := strset.New(strings.Split(
+		ucfg.CommandArgs.GetString("odbpath", strings.Join(
+			ucfg.Environs.GetStrings("ODBPATH",
+				Config.GetStrings("odbc.odbpath", default_odbpaths)...), ",")), ",")...).List()
+	Keyspace = ucfg.CommandArgs.GetString("keyspace",
+		ucfg.Environs.GetString("KEYSPACE",
+			Config.GetString("odbc.keyspace", default_keyspace)))
 	Debug = ucfg.CommandArgs.GetBool("debug", false)
 	ODBClient, ODBError = odb.NewClient(config_merge(odbcfg, &odb.Config{
-		Keyspace: keyspace,
+		Keyspace: Keyspace,
 		Hosts:    odbpaths,
 		Debug:    Debug,
 	}))
@@ -117,6 +122,8 @@ func ODBC(odbcfgs ...*odb.Config) odb.Client {
 		}
 		panic(ODBError)
 	}
+	ODBServerPath = "[" + strings.Join(ODBClient.Config().Hosts, ",") + "]"
+	Keyspace = ODBClient.Config().Keyspace
 	LogConfigInfo()
 	return ODBClient
 }

+ 93 - 61
schema/classinfo.go

@@ -4,18 +4,18 @@ import (
 	"strings"
 
 	"git.wecise.com/wecise/util/cmap"
+	"github.com/scylladb/go-set/strset"
 )
 
 type FieldInfo struct {
 	Fieldname string
 	Fieldtype string
-	Keyidx    int    // 主键顺序值,0为非主键
-	Datakey   string // 对应数据中的键名
+	Keyidx    int      // 主键顺序值,0为非主键
+	Datakey   []string // 对应数据中的键名
 }
 
 type ClassInfo struct {
-	Classname         string
-	Aliasname         string
+	Classaliasname    string
 	Classfullname     string
 	Fieldinfos        map[string]*FieldInfo
 	DatakeyFieldinfos map[string]*FieldInfo
@@ -25,7 +25,7 @@ type ClassInfo struct {
 	Createmql         string
 }
 
-var ClassInfos = cmap.New[string, *ClassInfo]()
+var ClassInfos = cmap.NewSingle[string, *ClassInfo]()
 var ClassNames = []string{}
 
 func init() {
@@ -37,46 +37,53 @@ func init() {
 	)
 	newclassinfo("master", "master", "m3cnet",
 		[]*FieldInfo{
-			{Fieldname: "uniqueid", Fieldtype: "varchar", Keyidx: 1, Datakey: "UNIQUEID"},
-			{Fieldname: "name", Fieldtype: "varchar", Datakey: "NAME"},
-			{Fieldname: "entitytypes", Fieldtype: "set<varchar>", Datakey: "ENTITYTYPES"},
-			{Fieldname: "basename", Fieldtype: "varchar", Datakey: "BASENAME"},
-			{Fieldname: "entitytypesarr", Fieldtype: "varchar", Datakey: "ENTITYTYPESARR"},
-			{Fieldname: "originid", Fieldtype: "varchar", Datakey: "ID"},
-			{Fieldname: "tags", Fieldtype: "set<varchar>", Datakey: "TAGS"},
-			{Fieldname: "changetime", Fieldtype: "timestamp", Datakey: "CHANGETIME"},
-			{Fieldname: "emsname", Fieldtype: "varchar", Datakey: "EMSNAME"},
-			{Fieldname: "sysid", Fieldtype: "varchar", Datakey: "SYSID"},
-			{Fieldname: "site", Fieldtype: "varchar", Datakey: "SITE"},
-			{Fieldname: "vendor", Fieldtype: "varchar", Datakey: "VENDOR"},
-			{Fieldname: "ci_table", Fieldtype: "varchar", Datakey: "CI_TABLE"},
-			{Fieldname: "ci_status", Fieldtype: "varchar", Datakey: "CI_STATUS"},
-			{Fieldname: "rel_status", Fieldtype: "varchar", Datakey: "REL_STATUS"},
-			{Fieldname: "stage", Fieldtype: "varchar", Datakey: "STAGE"},
-			{Fieldname: "extraattr", Fieldtype: "varchar", Datakey: "EXTRAATTR"},
-			{Fieldname: "entityid", Fieldtype: "varchar", Datakey: "ENTITYID"},
-			{Fieldname: "asmchangetime", Fieldtype: "int", Datakey: "ASMCHANGETIME"},
-			{Fieldname: "cmdbmapping", Fieldtype: "varchar", Datakey: "CMDBMAPPING"},
-			{Fieldname: "ipaddress", Fieldtype: "varchar", Datakey: "IPADDRESS"},
-			{Fieldname: "distname", Fieldtype: "varchar", Datakey: "DISTNAME"},
-			{Fieldname: "site_source", Fieldtype: "varchar", Datakey: "SITE_SOURCE"},
-			{Fieldname: "lastupdated", Fieldtype: "timestamp", Datakey: "LASTUPDATED"},
+			{Fieldname: "uniqueid", Fieldtype: "varchar", Keyidx: 1, Datakey: []string{"UNIQUEID"}},
+			{Fieldname: "name", Fieldtype: "varchar", Datakey: []string{"NAME"}},
+			{Fieldname: "entitytypes", Fieldtype: "set<varchar>", Datakey: []string{"ENTITYTYPES"}},
+			{Fieldname: "basename", Fieldtype: "varchar", Datakey: []string{"BASENAME"}},
+			{Fieldname: "entitytypesarr", Fieldtype: "varchar", Datakey: []string{"ENTITYTYPESARR"}},
+			{Fieldname: "originid", Fieldtype: "varchar", Datakey: []string{"ID"}},
+			{Fieldname: "tags", Fieldtype: "set<varchar>", Datakey: []string{"TAGS"}},
+			{Fieldname: "changetime", Fieldtype: "timestamp", Datakey: []string{"CHANGETIME"}},
+			{Fieldname: "emsname", Fieldtype: "varchar", Datakey: []string{"EMSNAME"}},
+			{Fieldname: "sysid", Fieldtype: "varchar", Datakey: []string{"SYSID"}},
+			{Fieldname: "site", Fieldtype: "varchar", Datakey: []string{"SITE"}},
+			{Fieldname: "vendor", Fieldtype: "varchar", Datakey: []string{"VENDOR"}},
+			{Fieldname: "ci_table", Fieldtype: "varchar", Datakey: []string{"CI_TABLE"}},
+			{Fieldname: "ci_status", Fieldtype: "varchar", Datakey: []string{"CI_STATUS"}},
+			{Fieldname: "rel_status", Fieldtype: "varchar", Datakey: []string{"REL_STATUS"}},
+			{Fieldname: "stage", Fieldtype: "varchar", Datakey: []string{"STAGE"}},
+			{Fieldname: "extraattr", Fieldtype: "varchar", Datakey: []string{"EXTRAATTR"}},
+			{Fieldname: "entityid", Fieldtype: "varchar", Datakey: []string{"ENTITYID"}},
+			{Fieldname: "asmchangetime", Fieldtype: "int", Datakey: []string{"ASMCHANGETIME"}},
+			{Fieldname: "cmdbmapping", Fieldtype: "varchar", Datakey: []string{"CMDBMAPPING"}},
+			{Fieldname: "ipaddress", Fieldtype: "varchar", Datakey: []string{"IPADDRESS"}},
+			{Fieldname: "distname", Fieldtype: "varchar", Datakey: []string{"DISTNAME"}},
+			{Fieldname: "site_source", Fieldtype: "varchar", Datakey: []string{"SITE_SOURCE"}},
+			{Fieldname: "lastupdated", Fieldtype: "timestamp", Datakey: []string{"LASTUPDATED"}},
 		},
-		[2]string{"partition", "name"},
 		[2]string{"key", "manu"},
 	)
 	newclassinfo("minfo", "minfo", "m3cnet",
 		[]*FieldInfo{
-			{Fieldname: "uniqueid", Fieldtype: "varchar", Keyidx: 1, Datakey: "uniqueId"},
-			{Fieldname: "distname", Fieldtype: "varchar", Datakey: "distName"},
-			{Fieldname: "name", Fieldtype: "varchar", Datakey: "name"},
-			{Fieldname: "entitytypes", Fieldtype: "set<varchar>", Datakey: "entityTypes"},
-			{Fieldname: "extraattr", Fieldtype: "varchar", Datakey: ""},
-			{Fieldname: "tags", Fieldtype: "set<varchar>", Datakey: "tags"},
+			{Fieldname: "uniqueid", Fieldtype: "varchar", Keyidx: 1, Datakey: []string{"uniqueId"}},
+			{Fieldname: "distname", Fieldtype: "varchar", Datakey: []string{"distName"}},
+			{Fieldname: "name", Fieldtype: "varchar", Datakey: []string{"name"}},
+			{Fieldname: "entitytypes", Fieldtype: "set<varchar>", Datakey: []string{"entityTypes"}},
+			{Fieldname: "extraattr", Fieldtype: "varchar", Datakey: []string{"*"}},
+			{Fieldname: "tags", Fieldtype: "set<varchar>", Datakey: []string{"tags"}},
 		},
 		[2]string{"key", "manu"},
 	)
-	newclassinfo("level1", "level1", "minfo", nil,
+	newclassinfo("level1", "level1", "master",
+		[]*FieldInfo{
+			{Fieldname: "uniqueid", Fieldtype: "varchar", Keyidx: 1, Datakey: []string{"uniqueId"}},
+			{Fieldname: "distname", Fieldtype: "varchar", Datakey: []string{"distName"}},
+			{Fieldname: "name", Fieldtype: "varchar", Datakey: []string{"name"}},
+			{Fieldname: "entitytypes", Fieldtype: "set<varchar>", Datakey: []string{"entityTypes"}},
+			{Fieldname: "extraattr", Fieldtype: "varchar", Datakey: []string{"*"}},
+			{Fieldname: "tags", Fieldtype: "set<varchar>", Datakey: []string{"tags"}},
+		},
 		[2]string{"partition", "name"},
 		[2]string{"key", "manu"},
 	)
@@ -110,38 +117,50 @@ func init() {
 	)
 }
 
-func newclassinfo(aliasname, classname, baseclassname string, fieldinfoslist []*FieldInfo, withoptions ...[2]string) (ci *ClassInfo) {
-	defer func() {
-		ClassNames = append(ClassNames, classname)
-		ClassInfos.Set(classname, ci)
-	}()
+func newclassinfo(classaliasname, classsimplename, baseclassaliasname string, fieldinfoslist []*FieldInfo, withoptions ...[2]string) (ci *ClassInfo) {
 	fieldinfos := map[string]*FieldInfo{}
 	datakey_fieldinfos := map[string]*FieldInfo{}
 	fieldslist := []string{}
+	fieldsset := strset.New()
 	keyfields := []string{}
-	createmql := `create class if not exists ` + classname + `:` + baseclassname + `(`
+	createmql := `create class if not exists ` + classsimplename + `:` + baseclassaliasname + `(`
 	classfullname := ""
-	bci := ClassInfos.GetIFPresent(baseclassname)
+	bci := ClassInfos.GetIFPresent(baseclassaliasname)
 	if bci != nil {
-		classfullname = bci.Classfullname + "/" + classname
+		classfullname = bci.Classfullname + "/" + classsimplename
 		for fn, fi := range bci.Fieldinfos {
 			fieldinfos[fn] = fi
-			datakey_fieldinfos[fi.Datakey] = fi
+			for _, dk := range fi.Datakey {
+				datakey_fieldinfos[dk] = fi
+			}
 		}
 		fieldslist = append(fieldslist, bci.Fieldslist...)
+		fieldsset.Add(bci.Fieldslist...)
 		keyfields = append(keyfields, bci.Keyfields...)
 	} else {
-		if baseclassname != "/" && baseclassname != "" {
-			panic("baseclassname not defined " + baseclassname)
+		if baseclassaliasname != "/" && baseclassaliasname != "" {
+			panic("baseclassname not defined " + baseclassaliasname)
 		}
-		classfullname = "/" + classname
+		classfullname = "/" + classsimplename
 	}
+	defer func() {
+		ClassNames = append(ClassNames, classaliasname)
+		ClassInfos.Set(classaliasname, ci)
+		ClassInfos.Set(classfullname, ci)
+	}()
 	keyfield_defines := []string{}
 	if len(fieldinfoslist) > 0 {
 		field_defines := []string{}
 		for _, fi := range fieldinfoslist {
-			field_defines = append(field_defines, fi.Fieldname+` `+fi.Fieldtype+`"`+fi.Datakey+`"`)
-			fieldslist = append(fieldslist, fi.Fieldname)
+			field_define := fi.Fieldname + ` ` + fi.Fieldtype
+			if !fieldsset.Has(fi.Fieldname) {
+				fieldslist = append(fieldslist, fi.Fieldname)
+				fieldsset.Add(fi.Fieldname)
+			} else {
+				fi.Datakey = append(fi.Datakey, bci.Fieldinfos[fi.Fieldname].Datakey...)
+			}
+			field_define += `"` + strings.Join(fi.Datakey, ",") + `"`
+			field_defines = append(field_defines, field_define)
 			if fi.Keyidx > 0 {
 				for len(keyfield_defines) < fi.Keyidx {
 					keyfield_defines = append(keyfield_defines, "")
@@ -149,7 +168,9 @@ func newclassinfo(aliasname, classname, baseclassname string, fieldinfoslist []*
 				keyfield_defines[fi.Keyidx-1] = fi.Fieldname
 			}
 			fieldinfos[fi.Fieldname] = fi
-			datakey_fieldinfos[fi.Datakey] = fi
+			for _, dk := range fi.Datakey {
+				datakey_fieldinfos[dk] = fi
+			}
 		}
 		createmql += strings.Join(field_defines, ",")
 	}
@@ -157,24 +178,35 @@ func newclassinfo(aliasname, classname, baseclassname string, fieldinfoslist []*
 		createmql += ", keys(" + strings.Join(keyfield_defines, ",") + ")"
 		keyfields = append(keyfields, keyfield_defines...)
 	}
-	createmql += `)with alias='` + aliasname + `'`
+	createmql += `)with alias='` + classaliasname + `'`
 	for _, withoption := range withoptions {
 		createmql += " and " + withoption[0] + "=" + withoption[1]
 		if withoption[0] == "key" && withoption[1] == "manu" {
-			fieldslist = append([]string{"id"}, fieldslist...)
+			if !fieldsset.Has("id") {
+				fieldslist = append([]string{"id"}, fieldslist...)
+				fieldsset.Add("id")
+			}
 		}
 	}
-	fieldslist = append(fieldslist, "contain")
-	fieldslist = append(fieldslist, "depend")
-	fieldslist = append(fieldslist, "topology")
+	if !fieldsset.Has("contain") {
+		fieldslist = append(fieldslist, "contain")
+		fieldsset.Add("contain")
+	}
+	if !fieldsset.Has("depend") {
+		fieldslist = append(fieldslist, "depend")
+		fieldsset.Add("depend")
+	}
+	if !fieldsset.Has("topology") {
+		fieldslist = append(fieldslist, "topology")
+		fieldsset.Add("topology")
+	}
 
 	var insertmql string
 	if len(fieldslist) > 0 {
-		insertmql = `insert into ` + classname + "(" + strings.Join(fieldslist, ",") + ")values(" + strings.Repeat(",?", len(fieldslist))[1:] + ")"
+		insertmql = `insert into ` + classfullname + "(" + strings.Join(fieldslist, ",") + ")values(" + strings.Repeat(",?", len(fieldslist))[1:] + ")"
 	}
 	ci = &ClassInfo{
-		Classname:         classname,
-		Aliasname:         aliasname,
+		Classaliasname:    classaliasname,
 		Classfullname:     classfullname,
 		Fieldinfos:        fieldinfos,
 		DatakeyFieldinfos: datakey_fieldinfos,
@@ -229,7 +261,7 @@ create class if not exists minfo : m3cnet (
 	keys( uniqueid ) 
 ) with alias='minfo' ;
 
-create class if not exists level1 : minfo () with partition=entitytypes , alias='level1' ;
+create class if not exists level1 : master () with partition=entitytypes , alias='level1' ;
 create class if not exists level2 : minfo () with partition=entitytypes , alias='level2' ;
 create class if not exists level3 : minfo () with partition=entitytypes , alias='level3' ;
 create class if not exists level4 : minfo () with partition=entitytypes , alias='level4' ;