package importer

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"regexp"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.wecise.com/wecise/cgimport/graph"
	"git.wecise.com/wecise/cgimport/odbc"
	"git.wecise.com/wecise/cgimport/reader"
	"git.wecise.com/wecise/cgimport/schema"
	"git.wecise.com/wecise/util/filewalker"
	"git.wecise.com/wecise/util/merrs"
	"git.wecise.com/wecise/util/rc"
)

var mcfg = odbc.Config
var logger = odbc.Logger

// Importer drives a full data import: it walks a data directory, parses the
// files it finds, and writes their records through an ODBCImporter, tracking
// progress in a CGIStatus so an interrupted run can be resumed.
type Importer struct {
	datapath         string
	parallel         int
	reload           bool
	importstatus     *CGIStatus
	fileimportrc     *rc.RoutinesController
	odbcqueryrc      *rc.RoutinesController
	odbcimporter     *ODBCImporter
	starttime        time.Time
	currentstarttime time.Time
}

// ImportDir imports all files under datapath with the given parallelism.
// If reload is true, existing classes are dropped and the import starts over;
// otherwise a previous run is resumed from the saved import status.
func ImportDir(datapath string, parallel int, reload bool) (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
	importer := &Importer{
		datapath:     datapath,
		parallel:     parallel,
		reload:       reload,
		importstatus: NewCGIStatus(),
		fileimportrc: rc.NewRoutinesController("", parallel),
		odbcqueryrc:  rc.NewRoutinesController("", parallel*10),
		odbcimporter: NewODBCImporter(),
	}
	return importer.Import()
}

// Import runs the import: it loads or resets the saved status, revises the
// class structure, imports edge files first and all remaining files second,
// and accumulates file, record, and timing totals.
func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
	if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload {
		// Resume from the previously saved status
		err = importer.importstatus.Load()
		if err != nil {
			return
		}
	} else {
		// Reload: clear existing classes
		err = importer.odbcimporter.reload()
		if err != nil {
			return
		}
	}
	// Create or revise the class structure
	err = importer.odbcimporter.ReviseClassStruct()
	if err != nil {
		return
	}
	totalfilescount = int64(len(importer.importstatus.ImportStatus))
	for _, v := range importer.importstatus.ImportStatus {
		totalrecordscount += v.RecordsCount
	}
	totalusetime = importer.importstatus.TotalUseTime
	importer.starttime = time.Now().Add(-totalusetime)
	importer.currentstarttime = time.Now()
	// Edge files are recognized by name: anything containing "edge"
	// (case-insensitive) and ending in a literal ".csv"
	reedgefile := regexp.MustCompile(`(?i).*edge.*\.csv$`)
	fc, rcnt, ut, e := importer.ImportEdgeFiles(reedgefile)
	if e != nil {
		err = e
		return
	}
	totalfilescount += fc
	totalrecordscount += rcnt
	filescount += fc
	recordscount += rcnt
	usetime += ut
	totalusetime = importer.importstatus.TotalUseTime
	fc, rcnt, ut, e = importer.ImportNonEdgeFiles(reedgefile)
	if e != nil {
		err = e
		return
	}
	totalfilescount += fc
	totalrecordscount += rcnt
	filescount += fc
	recordscount += rcnt
	usetime += ut
	totalusetime = importer.importstatus.TotalUseTime
	importer.importstatus.WaitSaveDone()
	importer.alldone()
	return
}
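// Usage sketch (illustrative, not part of the original source): a caller
// might drive a full import like this, where the "./data" path and the
// parallelism of 8 are placeholder assumptions:
//
//	tfc, trc, tut, fc, rcnt, ut, err := importer.ImportDir("./data", 8, false)
//	if err != nil {
//		// handle the error; tfc/trc/tut still include totals resumed
//		// from a previous run's saved status
//	}
//	_, _, _, _, _, _ = tfc, trc, tut, fc, rcnt, ut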
func (importer *Importer) ImportEdgeFiles(reedgefile *regexp.Regexp) (filescount, recordscount int64, usetime time.Duration, err error) {
	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
		if !reedgefile.MatchString(filepath.Base(fpath)) {
			// Ignore non-edge files
			return FWOP_IGNORE
		}
		return FWOP_CONTINUE
	})
}

func (importer *Importer) ImportNonEdgeFiles(reedgefile *regexp.Regexp) (filescount, recordscount int64, usetime time.Duration, err error) {
	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
		if reedgefile.MatchString(filepath.Base(fpath)) {
			// Ignore edge files
			return FWOP_IGNORE
		}
		return FWOP_CONTINUE
	})
}

// FWOP tells ImportFiles what to do with a file during the directory walk.
type FWOP int

const (
	FWOP_IGNORE FWOP = iota + 1
	FWOP_BREAK
	FWOP_CONTINUE
)

// ImportFiles walks the data directory and concurrently imports every file
// the fwop callback selects. Files that already have a saved import status
// are skipped, so an interrupted run does not import them twice.
func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP) (filescount, recordscount int64, usetime time.Duration, err error) {
	var wg sync.WaitGroup
	var errmutex sync.Mutex // guards err and usetime, which worker goroutines write
	fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
	if e != nil {
		err = e
		return
	}
	e = fw.List(func(basedir string, fpath string) bool {
		errmutex.Lock()
		failed := err != nil
		errmutex.Unlock()
		if failed {
			// An earlier file failed; stop the walk
			return false
		}
		if strings.Contains(fpath, string(filepath.Separator)) {
			// Skip subdirectories. fw.List is ordered with directories after
			// files, so the walk can stop at the first subdirectory entry.
			return false
		}
		switch fwop(basedir, fpath) {
		case FWOP_IGNORE:
			// Skip this file and continue with the next one
			return true
		case FWOP_BREAK:
			// Stop the walk
			return false
		case FWOP_CONTINUE:
		default:
		}
		// Import this file on the file-import routine pool
		filename := filepath.Join(basedir, fpath)
		wg.Add(1)
		importer.fileimportrc.ConcurCall(1,
			func() {
				defer wg.Done()
				importer.importstatus.mutex.RLock()
				importstatus := importer.importstatus.ImportStatus[filename]
				importer.importstatus.mutex.RUnlock()
				if importstatus != nil {
					// A status entry exists, so this file was already
					// imported in full; skip it
					return
				}
				importedrecordscount := int64(0)
				records, e := importer.ImportFile(filename, importedrecordscount)
				if e != nil {
					errmutex.Lock()
					err = e
					errmutex.Unlock()
					return
				}
				atomic.AddInt64(&filescount, 1)
				atomic.AddInt64(&recordscount, records)
				errmutex.Lock()
				usetime = time.Since(importer.currentstarttime)
				errmutex.Unlock()
				importer.importstatus.mutex.Lock()
				importer.importstatus.ImportStatus[filename] = &ImportStatus{RecordsCount: importedrecordscount + records}
				importer.importstatus.TotalUseTime = time.Since(importer.starttime)
				importer.importstatus.mutex.Unlock()
				importer.importstatus.Save()
			},
		)
		return true
	})
	wg.Wait()
	if e != nil {
		if os.IsNotExist(e) {
			err = merrs.NewError(`directory "`+importer.datapath+`" specified by "datapath" does not exist`, e)
		} else {
			err = merrs.NewError(e)
		}
		return
	}
	return
}
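// Filter sketch (illustrative assumption, not from the original source):
// other selections can be expressed through the FWOP callback. For example,
// given an *Importer value imp, a hypothetical prefix filter could import
// only files whose base name starts with "LEVEL":
//
//	imp.ImportFiles(func(basedir string, fpath string) FWOP {
//		if !strings.HasPrefix(filepath.Base(fpath), "LEVEL") {
//			return FWOP_IGNORE
//		}
//		return FWOP_CONTINUE
//	})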
// ImportFile opens one data file and imports its records, skipping the first
// skiprecordscount records.
func (importer *Importer) ImportFile(fpath string, skiprecordscount int64) (blockcount int64, err error) {
	f, e := os.Open(fpath)
	if e != nil {
		return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": fpath}})
	}
	defer f.Close()
	return importer.importReader(fpath, f, skiprecordscount)
}

func (importer *Importer) importReader(filename string, buf io.Reader, skiprecordscount int64) (blockcount int64, err error) {
	// The file type is encoded in the filename: "_L1_".."_L8_" mark level
	// files, "MASTER" master files, and "EDGE" edge files.
	var filetype schema.FileType
	switch {
	case strings.Contains(filename, "_L1_"):
		filetype = schema.FT_LEVEL1
	case strings.Contains(filename, "_L2_"):
		filetype = schema.FT_LEVEL2
	case strings.Contains(filename, "_L3_"):
		filetype = schema.FT_LEVEL3
	case strings.Contains(filename, "_L4_"):
		filetype = schema.FT_LEVEL4
	case strings.Contains(filename, "_L5_"):
		filetype = schema.FT_LEVEL5
	case strings.Contains(filename, "_L6_"):
		filetype = schema.FT_LEVEL6
	case strings.Contains(filename, "_L7_"):
		filetype = schema.FT_LEVEL7
	case strings.Contains(filename, "_L8_"):
		filetype = schema.FT_LEVEL8
	case strings.Contains(filename, "MASTER"):
		filetype = schema.FT_MASTER
	case strings.Contains(filename, "EDGE"):
		filetype = schema.FT_EDGE
	default:
		err = merrs.NewError("filename does not conform to the agreed format: " + filename)
		return
	}
	br, e := reader.NewBlockReader(filename, filetype, buf)
	if e != nil {
		return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
	}
	var wg sync.WaitGroup
	var errmutex sync.Mutex // guards err, which worker goroutines write
	defer importer.done()
	defer wg.Wait()
	n := int64(0)
	for {
		errmutex.Lock()
		failed := err != nil
		errmutex.Unlock()
		if failed {
			break
		}
		block, line, linecount, e := br.ReadBlock()
		if e != nil {
			return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
		if block == nil {
			// End of input
			return
		}
		n++
		if n <= skiprecordscount {
			// Record was imported by a previous run; skip it
			continue
		}
		wg.Add(1)
		// Give the worker its own error variable; the enclosing e is reused
		// every iteration and must not be written from another goroutine.
		ce := importer.odbcqueryrc.ConcurCall(1, func() {
			defer wg.Done()
			ie := importer.importRecord(block, line, filename, filetype, linecount)
			if ie != nil {
				errmutex.Lock()
				err = merrs.NewError(ie, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
				errmutex.Unlock()
				return
			}
			atomic.AddInt64(&blockcount, 1)
		})
		if ce != nil {
			return blockcount, merrs.NewError(ce, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
	}
	return
}

func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype schema.FileType, linecount int) (err error) {
	if odbc.LogDebug {
		bs, e := json.MarshalIndent(record, "", "  ")
		if e != nil {
			return merrs.NewError(e)
		}
		logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
	}
	var classname string
	switch filetype {
	case schema.FT_EDGE:
		// Edges are cached for later writing instead of being inserted directly:
		// err = importer.odbcimporter.InsertEdge(record)
		// if err != nil {
		// 	err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		// 	return
		// }
		graph.CacheEdgeInfo(record)
	default:
		classname = string(filetype)
		err = importer.odbcimporter.InsertData(classname, record)
		if err != nil {
			err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
			return
		}
	}
	return
}

func (importer *Importer) alldone() {
	importer.odbcimporter.alldone()
}

func (importer *Importer) done() {
	importer.odbcimporter.done()
}

// Check runs a sample query against level1 and prints the result, as a quick
// sanity check that data was imported.
func Check() {
	client := odbc.ODBClient
	if client == nil {
		return
	}
	{
		mql := "select id,uniqueid,tags,contain,day,vtime from level1 where uniqueid='E2E:OTR0002L'"
		r, e := client.Query(mql).Do()
		if e != nil {
			panic(merrs.NewError(e))
		}
		bs, _ := json.MarshalIndent(r.Data, "", "  ")
		fmt.Println(string(bs))
	}
}
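// Test sketch (assumption, not from the original source): importReader can be
// exercised without a file on disk by feeding it an in-memory reader; the name
// "demo_L1_.csv" only needs to contain the "_L1_" marker for the type switch
// to select FT_LEVEL1. The block content itself is elided here:
//
//	n, err := imp.importReader("demo_L1_.csv", strings.NewReader("..."), 0)
//	_ = n   // records imported
//	_ = err // parse or insert failure, if any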