importer.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. package importer
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "regexp"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "git.wecise.com/wecise/cgimport/graph"
  14. "git.wecise.com/wecise/cgimport/odbc"
  15. "git.wecise.com/wecise/cgimport/reader"
  16. "git.wecise.com/wecise/cgimport/schema"
  17. "github.com/wecisecode/util/filewalker"
  18. "github.com/wecisecode/util/merrs"
  19. "github.com/wecisecode/util/rc"
  20. )
// mcfg is the shared process configuration handle from the odbc package.
var mcfg = odbc.Config

// logger is the shared logger instance from the odbc package.
var logger = odbc.Logger
// Importer drives a bulk import of data files from a directory into the
// ODBC-backed store, with bounded concurrency and resumable, persisted
// per-file progress.
type Importer struct {
	datapath string // root directory containing the data files to import
	parallel int    // maximum number of files imported concurrently
	rebuild  bool   // when true, drop existing classes before importing
	reload   bool   // when true, ignore previously saved import progress

	importstatus *CGIStatus             // persisted per-file progress, used to resume interrupted runs
	fileimportrc *rc.RoutinesController // bounds concurrent file imports (size: parallel)
	odbcqueryrc  *rc.RoutinesController // bounds concurrent ODBC insert calls
	odbcimporter *ODBCImporter          // backend that writes records to the store

	starttime        time.Time // logical overall start, shifted back by previously accumulated use time
	currentstarttime time.Time // start time of the current run
}
  35. func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
  36. concurlimt := mcfg.GetInt("odbc.concurrent.limit", 100)
  37. importer := &Importer{
  38. datapath: datapath,
  39. parallel: parallel,
  40. rebuild: rebuild,
  41. reload: reload,
  42. importstatus: NewCGIStatus(),
  43. fileimportrc: rc.NewRoutinesController("", parallel),
  44. odbcqueryrc: rc.NewRoutinesControllerLimit("", concurlimt, concurlimt*2),
  45. odbcimporter: NewODBCImporter(),
  46. }
  47. return importer.Import()
  48. }
  49. func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
  50. if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload {
  51. err = importer.importstatus.Load()
  52. if err != nil {
  53. return
  54. }
  55. err = importer.odbcimporter.init()
  56. if err != nil {
  57. return
  58. }
  59. }
  60. if importer.rebuild {
  61. // reload
  62. // 清除已有类
  63. err = importer.odbcimporter.reload()
  64. if err != nil {
  65. return
  66. }
  67. }
  68. // 建类
  69. err = importer.odbcimporter.ReviseClassStruct()
  70. if err != nil {
  71. return
  72. }
  73. totalfilescount = int64(len(importer.importstatus.ImportStatus))
  74. for _, v := range importer.importstatus.ImportStatus {
  75. totalrecordscount += v.RecordsCount
  76. }
  77. totalusetime = importer.importstatus.TotalUseTime
  78. importer.starttime = time.Now().Add(-totalusetime)
  79. importer.currentstarttime = time.Now()
  80. reedgefile := regexp.MustCompile("(?i).*edge.*.csv")
  81. fc, rc, ut, e := importer.ImportEdgeFiles(reedgefile)
  82. if e != nil {
  83. err = e
  84. return
  85. }
  86. totalfilescount += fc
  87. totalrecordscount += rc
  88. filescount += fc
  89. recordscount += rc
  90. usetime += ut
  91. totalusetime = importer.importstatus.TotalUseTime
  92. fc, rc, ut, e = importer.ImportNonEdgeFiles(reedgefile)
  93. if e != nil {
  94. err = e
  95. return
  96. }
  97. totalfilescount += fc
  98. totalrecordscount += rc
  99. filescount += fc
  100. recordscount += rc
  101. usetime += ut
  102. totalusetime = importer.importstatus.TotalUseTime
  103. importer.importstatus.WaitSaveDone()
  104. importer.alldone()
  105. return
  106. }
  107. func (importer *Importer) ImportEdgeFiles(reedgefile *regexp.Regexp) (filescount, recordscount int64, usetime time.Duration, err error) {
  108. return importer.ImportFiles(func(basedir string, fpath string) FWOP {
  109. if !reedgefile.MatchString(filepath.Base(fpath)) {
  110. // 忽略非EDGE文件
  111. return FWOP_IGNORE
  112. }
  113. return FWOP_CONTINUE
  114. })
  115. }
  116. func (importer *Importer) ImportNonEdgeFiles(reedgefile *regexp.Regexp) (filescount, recordscount int64, usetime time.Duration, err error) {
  117. return importer.ImportFiles(func(basedir string, fpath string) FWOP {
  118. if reedgefile.MatchString(filepath.Base(fpath)) {
  119. // 忽略EDGE文件
  120. return FWOP_IGNORE
  121. }
  122. return FWOP_CONTINUE
  123. })
  124. }
// FWOP is a file-walk operation code returned by the filter callback passed
// to ImportFiles; it controls how each encountered file is handled.
type FWOP int

const (
	FWOP_IGNORE   FWOP = iota + 1 // skip the current file, continue with the next
	FWOP_BREAK                    // stop walking entirely
	FWOP_CONTINUE                 // process the current file
)
  131. func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP) (filescount, recordscount int64, usetime time.Duration, err error) {
  132. // 遍历文件目录
  133. var wg sync.WaitGroup
  134. fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
  135. if e != nil {
  136. err = e
  137. return
  138. }
  139. e = fw.List(func(basedir string, fpath string) bool {
  140. if err != nil {
  141. // 前方发生错误,结束遍历
  142. return false
  143. }
  144. if strings.Contains(fpath, string(filepath.Separator)) {
  145. // 忽略子目录,fw.List有序,目录排在文件后面,遇到子目录即可结束遍历
  146. return false
  147. }
  148. switch fwop(basedir, fpath) {
  149. case FWOP_IGNORE:
  150. // 忽略当前文件,继续处理下一文件
  151. return true
  152. case FWOP_BREAK:
  153. // 结束遍历
  154. return false
  155. case FWOP_CONTINUE:
  156. default:
  157. }
  158. // 继续处理当前文件
  159. filename := filepath.Join(basedir, fpath)
  160. wg.Add(1)
  161. // 并发处理
  162. importer.fileimportrc.ConcurCall(1,
  163. func() {
  164. defer wg.Done()
  165. logger.Info("import", "file", filename)
  166. importer.importstatus.mutex.RLock()
  167. importstatus := importer.importstatus.ImportStatus[filename]
  168. importer.importstatus.mutex.RUnlock()
  169. importedrecordscount := int64(0)
  170. if importstatus != nil {
  171. importedrecordscount = importstatus.RecordsCount
  172. return
  173. }
  174. records, e := importer.ImportFile(filename, importedrecordscount)
  175. if e != nil {
  176. err = e
  177. return
  178. }
  179. atomic.AddInt64(&filescount, 1)
  180. atomic.AddInt64(&recordscount, records)
  181. usetime = time.Since(importer.currentstarttime)
  182. importer.importstatus.mutex.Lock()
  183. importer.importstatus.ImportStatus[filename] = &ImportStatus{RecordsCount: importedrecordscount + records}
  184. importer.importstatus.TotalUseTime = time.Since(importer.starttime)
  185. importer.importstatus.mutex.Unlock()
  186. importer.importstatus.Save()
  187. logger.Info("file", filename, "imported", records, "records")
  188. },
  189. )
  190. return true
  191. })
  192. wg.Wait()
  193. if e != nil {
  194. if os.IsNotExist(e) {
  195. err = merrs.NewError(`directory "`+importer.datapath+`" not exist specified by "datapath"`, e)
  196. } else {
  197. err = merrs.NewError(e)
  198. }
  199. return
  200. }
  201. return
  202. }
  203. func (importer *Importer) ImportFile(filepath string, skiprecordscount int64) (blockcount int64, err error) {
  204. f, e := os.Open(filepath)
  205. if e != nil {
  206. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
  207. }
  208. defer f.Close()
  209. return importer.importReader(filepath, f, skiprecordscount)
  210. }
  211. func (importer *Importer) importReader(filename string, buf io.Reader, skiprecordscount int64) (blockcount int64, err error) {
  212. var filetype schema.FileType
  213. switch {
  214. case strings.Contains(filename, "_L1_"):
  215. filetype = schema.FT_LEVEL1
  216. case strings.Contains(filename, "_L2_"):
  217. filetype = schema.FT_LEVEL2
  218. case strings.Contains(filename, "_L3_"):
  219. filetype = schema.FT_LEVEL3
  220. case strings.Contains(filename, "_L4_"):
  221. filetype = schema.FT_LEVEL4
  222. case strings.Contains(filename, "_L5_"):
  223. filetype = schema.FT_LEVEL5
  224. case strings.Contains(filename, "_L6_"):
  225. filetype = schema.FT_LEVEL6
  226. case strings.Contains(filename, "_L7_"):
  227. filetype = schema.FT_LEVEL7
  228. case strings.Contains(filename, "_L8_"):
  229. filetype = schema.FT_LEVEL8
  230. case strings.Contains(filename, "MASTER"):
  231. filetype = schema.FT_MASTER
  232. case strings.Contains(filename, "EDGE"):
  233. filetype = schema.FT_EDGE
  234. default:
  235. err = merrs.NewError("filename does not conform to the agreed format " + filename)
  236. return
  237. }
  238. br, e := reader.NewBlockReader(filename, filetype, buf)
  239. if e != nil {
  240. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
  241. }
  242. var wg sync.WaitGroup
  243. defer importer.done()
  244. defer wg.Wait()
  245. n := int64(0)
  246. for {
  247. if err != nil {
  248. break
  249. }
  250. block, line, linecount, e := br.ReadBlock()
  251. if e != nil {
  252. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  253. }
  254. if block == nil {
  255. return
  256. }
  257. n++
  258. if n <= skiprecordscount {
  259. continue
  260. }
  261. wg.Add(1)
  262. e = importer.odbcqueryrc.ConcurCall(1, func() {
  263. defer wg.Done()
  264. e = importer.importRecord(block, line, filename, filetype, linecount)
  265. if e != nil {
  266. err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  267. return
  268. }
  269. atomic.AddInt64(&blockcount, 1)
  270. })
  271. if e != nil {
  272. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  273. }
  274. }
  275. return
  276. }
  277. func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype schema.FileType, linecount int) (err error) {
  278. if odbc.LogDebug {
  279. bs, e := json.MarshalIndent(record, "", " ")
  280. if e != nil {
  281. return merrs.NewError(e)
  282. }
  283. logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
  284. }
  285. var classaliasname string
  286. switch filetype {
  287. case schema.FT_EDGE:
  288. graph.CacheEdgeInfo(record)
  289. default:
  290. classaliasname = string(filetype)
  291. err = importer.odbcimporter.InsertData(classaliasname, record)
  292. if err != nil {
  293. err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  294. return
  295. }
  296. }
  297. return
  298. }
// alldone delegates the end-of-import signal to the backend importer.
func (importer *Importer) alldone() {
	importer.odbcimporter.alldone()
}
// done delegates the end-of-stream signal (one input fully read) to the
// backend importer.
func (importer *Importer) done() {
	importer.odbcimporter.done()
}
  305. func Check() {
  306. client := odbc.ODBClient
  307. if client == nil {
  308. return
  309. }
  310. {
  311. // mql := "select id,uniqueid,tags,contain,day,vtime from level1 where uniqueid='E2E:OTR0002L'"
  312. mql := "select * from level1 where uniqueid='E2E:OTR0002L'"
  313. r, e := client.Query(mql).Do()
  314. if e != nil {
  315. panic(merrs.NewError(e))
  316. }
  317. bs, _ := json.MarshalIndent(r.Data, "", " ")
  318. fmt.Println(string(bs))
  319. }
  320. {
  321. mql := `match ("level1:E2E:OTR0002L")-[*]->(),("level1:E2E:OTR0002L")<-[*]-() in "level1","level2"`
  322. r, e := client.Query(mql).Do()
  323. if e != nil {
  324. panic(merrs.NewError(e))
  325. }
  326. bs, _ := json.MarshalIndent(r.Data, "", " ")
  327. fmt.Println(string(bs))
  328. }
  329. }