importer.go

package importer

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"regexp"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.wecise.com/wecise/cgimport/graph"
	"git.wecise.com/wecise/cgimport/odbc"
	"git.wecise.com/wecise/cgimport/reader"
	"git.wecise.com/wecise/cgimport/schema"
	"github.com/wecisecode/util/filewalker"
	"github.com/wecisecode/util/merrs"
	"github.com/wecisecode/util/rc"
)

var mcfg = odbc.Config
var logger = odbc.Logger
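
// Importer drives a graph data import run: it walks the data directory,
// imports files concurrently through routines controllers, and persists
// progress in a CGIStatus so that an interrupted run can be resumed.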
type Importer struct {
	datapath         string
	parallel         int
	rebuild          bool
	reload           bool
	importstatus     *CGIStatus
	fileimportrc     *rc.RoutinesController
	odbcqueryrc      *rc.RoutinesController
	odbcimporter     *ODBCImporter
	starttime        time.Time
	currentstarttime time.Time
}
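
// ImportDir imports every data file under datapath with the given file-level
// parallelism. The ODBC concurrency limit is taken from the
// "odbc.concurrent.limit" config option, defaulting to GOMAXPROCS.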
func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration, filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
	concurlimit := mcfg.GetInt("odbc.concurrent.limit", runtime.GOMAXPROCS(0))
	importer := &Importer{
		datapath:     datapath,
		parallel:     parallel,
		rebuild:      rebuild,
		reload:       reload,
		importstatus: NewCGIStatus(),
		fileimportrc: rc.NewRoutinesController("", parallel),
		odbcqueryrc:  rc.NewRoutinesControllerLimit("", concurlimit, concurlimit*5),
		odbcimporter: NewODBCImporter(),
	}
	return importer.Import()
}
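
// Import loads the saved import status (unless rebuild or reload is
// requested), initializes the ODBC importer, imports edge files first and
// all remaining files after, and returns both cumulative totals and the
// counts for the current session.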
func (importer *Importer) Import() (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration, filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
	if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload && !importer.rebuild {
		// resume from the previously saved import status
		err = importer.importstatus.Load()
		if err != nil {
			return
		}
	}
	err = importer.odbcimporter.init(importer.rebuild)
	if err != nil {
		return
	}
	//
	logger.Info("graph data import start")
	totalusetime = importer.importstatus.TotalUseTime
	importer.starttime = time.Now().Add(-totalusetime)
	importer.currentstarttime = time.Now()
	reedgefile := regexp.MustCompile(`(?i).*edge.*\.csv`)
	efc, elc, erc, etc, ut, e := importer.ImportEdgeFiles(reedgefile, false)
	if e != nil {
		err = e
		return
	}
	afc, alc, arc, atc, ut, e := importer.ImportNonEdgeFiles(reedgefile, true)
	if e != nil {
		err = e
		return
	}
	totalfilescount = int64(len(importer.importstatus.ImportStatus)) + efc
	for _, v := range importer.importstatus.ImportStatus {
		totallinecount += v.LinesCount
		totalrecordscount += v.RecordsCount
		totalretrycount += v.RetryCount
	}
	totallinecount += elc
	totalrecordscount += erc
	totalretrycount += etc
	filescount = afc + efc
	linescount = alc + elc
	recordscount = arc + erc
	retrycount = atc + etc
	usetime = ut
	totalusetime = importer.importstatus.TotalUseTime
	importer.importstatus.WaitSaveDone()
	importer.alldone()
	return
}
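
// ImportEdgeFiles imports only the files whose base name matches the edge
// file pattern.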
func (importer *Importer) ImportEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount, retrycount int64, usetime time.Duration, err error) {
	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
		if !reedgefile.MatchString(filepath.Base(fpath)) {
			// skip non-edge files
			return FWOP_IGNORE
		}
		return FWOP_CONTINUE
	}, logstatus)
}
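
// ImportNonEdgeFiles imports every file whose base name does not match the
// edge file pattern.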
func (importer *Importer) ImportNonEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount, retrycount int64, usetime time.Duration, err error) {
	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
		if reedgefile.MatchString(filepath.Base(fpath)) {
			// skip edge files
			return FWOP_IGNORE
		}
		return FWOP_CONTINUE
	}, logstatus)
}
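
// FWOP tells ImportFiles how to treat the file the walker just produced.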
type FWOP int

const (
	FWOP_IGNORE FWOP = iota + 1 // skip this file and keep walking
	FWOP_BREAK                  // stop walking altogether
	FWOP_CONTINUE               // import this file
)
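
// ImportFiles walks the top level of the data directory, asks fwop whether
// each file should be imported, and imports the selected files concurrently.
// With logstatus set, per-file progress is recorded in the resumable status.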
func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP, logstatus bool) (filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
	// walk the data directory
	var wg sync.WaitGroup
	fw, e := filewalker.NewFileWalker([]string{importer.datapath}, `^[^\.].*`)
	if e != nil {
		err = e
		return
	}
	e = fw.List(func(basedir string, fpath string) bool {
		if err != nil {
			// an earlier file failed, stop walking
			return false
		}
		if strings.Contains(fpath, string(filepath.Separator)) {
			// skip subdirectories; fw.List is ordered with directories after
			// files, so the first subdirectory entry ends the walk
			return false
		}
		switch fwop(basedir, fpath) {
		case FWOP_IGNORE:
			// skip this file, continue with the next one
			return true
		case FWOP_BREAK:
			// stop walking
			return false
		case FWOP_CONTINUE:
		default:
		}
		// import this file
		filename := filepath.Join(basedir, fpath)
		filescount++
		wg.Add(1)
		// process files concurrently
		importer.fileimportrc.ConcurCall(1,
			func() {
				defer wg.Done()
				importer.importstatus.mutex.RLock()
				importstatus := importer.importstatus.ImportStatus[filename]
				importer.importstatus.mutex.RUnlock()
				linefrom, blockfrom := int64(0), int64(0)
				totalretrycount := int64(0)
				if importstatus != nil {
					linefrom, blockfrom, totalretrycount = importstatus.LinesCount, importstatus.RecordsCount, importstatus.RetryCount
				}
				if linefrom == 0 {
					logger.Info("import", "file", filename)
				} else {
					logger.Info("import", "file", filename, "from line", linefrom)
				}
				lines, records, retries, e := importer.ImportFile(filename, linefrom, blockfrom, totalretrycount, logstatus)
				if e != nil {
					err = e
					return
				}
				atomic.AddInt64(&linescount, lines-linefrom)
				atomic.AddInt64(&recordscount, records-blockfrom)
				atomic.AddInt64(&retrycount, retries-totalretrycount)
				usetime = time.Since(importer.currentstarttime)
				importer.importstatus.mutex.Lock()
				if logstatus {
					importer.importstatus.ImportStatus[filename] = &ImportStatus{
						LinesCount:   lines,
						RecordsCount: records,
						RetryCount:   retries,
					}
				}
				importer.importstatus.TotalUseTime = time.Since(importer.starttime)
				importer.importstatus.mutex.Unlock()
				importer.importstatus.Save()
				if retries > 0 {
					logger.Info("file", filename, "total imported", lines, "lines", records, "records", retries, "retry times")
				} else {
					logger.Info("file", filename, "total imported", lines, "lines", records, "records")
				}
			},
		)
		return true
	})
	wg.Wait()
	if e != nil {
		if os.IsNotExist(e) {
			err = merrs.NewError(`directory "`+importer.datapath+`" specified by "datapath" does not exist`, e)
		} else {
			err = merrs.NewError(e)
		}
		return
	}
	return
}
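
// ImportFile opens a single data file and imports it, resuming from the
// line/block/retry counts recorded in the saved import status.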
func (importer *Importer) ImportFile(filename string, linefrom, blockfrom, totalretrycount int64, logstatus bool) (linecount, blockcount, retrycount int64, err error) {
	f, e := os.Open(filename)
	if e != nil {
		return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
	}
	defer f.Close()
	return importer.importReader(filename, f, linefrom, blockfrom, totalretrycount, logstatus)
}
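
// importReader derives the schema file type from the file name, then reads
// the input block by block and dispatches each block to the ODBC query pool,
// periodically saving progress and logging throughput.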
func (importer *Importer) importReader(filename string, buf io.Reader, linefrom, blockfrom, totalretrycount int64, logstatus bool) (linecount, blockcount, retrycount int64, err error) {
	var filetype schema.FileType
	switch {
	case strings.Contains(filename, "_L1_"):
		filetype = schema.FT_LEVEL1
	case strings.Contains(filename, "_L2_"):
		filetype = schema.FT_LEVEL2
	case strings.Contains(filename, "_L3_"):
		filetype = schema.FT_LEVEL3
	case strings.Contains(filename, "_L4_"):
		filetype = schema.FT_LEVEL4
	case strings.Contains(filename, "_L5_"):
		filetype = schema.FT_LEVEL5
	case strings.Contains(filename, "_L6_"):
		filetype = schema.FT_LEVEL6
	case strings.Contains(filename, "_L7_"):
		filetype = schema.FT_LEVEL7
	case strings.Contains(filename, "_L8_"):
		filetype = schema.FT_LEVEL8
	case strings.Contains(filename, "MASTER"):
		filetype = schema.FT_MASTER
	case strings.Contains(filename, "EDGE"):
		filetype = schema.FT_EDGE
	default:
		err = merrs.NewError("filename does not conform to the agreed naming format: " + filename)
		return
	}
	br, e := reader.NewBlockReader(filename, filetype, buf)
	if e != nil {
		return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
	}
	lastlogtime := time.Now()
	skiplines := int(linefrom)
	blockcount = blockfrom
	doinglines := []int64{}
	donelines := linefrom
	doneblocks := blockfrom
	retrycount = totalretrycount
	// maxresponsetime := time.Duration(0)
	var wg sync.WaitGroup
	defer importer.done()
	defer wg.Wait()
	for {
		if err != nil {
			return
		}
		block, line, linenumber, e := br.ReadBlock(skiplines)
		linecount = int64(linenumber)
		if e != nil {
			return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
		if block == nil {
			return
		}
		blockcount++
		wg.Add(1)
		doingline := linecount
		doingblock := blockcount
		if logstatus {
			doinglines = append(doinglines, doingline)
		}
		e = importer.odbcqueryrc.ConcurCall(1, func() {
			defer wg.Done()
			if err != nil {
				return
			}
			// st := time.Now()
			// defer func() {
			// 	ut := time.Since(st)
			// 	if ut > maxresponsetime {
			// 		maxresponsetime = ut
			// 	}
			// }()
			// logger.Info("G:", runtime.NumGoroutine(),
			// 	"RC:", importer.fileimportrc.ConcurCount(),
			// 	"WC:", importer.odbcqueryrc.ConcurCount(),
			// 	"RQ:", importer.fileimportrc.QueueCount(),
			// 	"WQ:", importer.odbcqueryrc.QueueCount(),
			// 	"maxresponsetime:", maxresponsetime)
			rtc, e := importer.importRecord(block, line, filename, filetype, int(doingline))
			atomic.AddInt64(&retrycount, int64(rtc))
			if e != nil {
				err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(doingline)}, {"line": line}})
				return
			}
			atomic.AddInt64(&doneblocks, 1)
			if logstatus {
				readinglines := doinglines[len(doinglines)-1]
				if doingline == doinglines[0] {
					importer.importstatus.mutex.Lock()
					importer.importstatus.ImportStatus[filename] = &ImportStatus{
						LinesCount:   doingline,
						RecordsCount: doingblock,
						RetryCount:   retrycount,
					}
					donelines = doingline
					importer.importstatus.TotalUseTime = time.Since(importer.starttime)
					importer.importstatus.Save()
					doinglines = doinglines[1:]
					if time.Since(lastlogtime) > 5*time.Second {
						if retrycount > 0 {
							logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records", retrycount, "retry times")
						} else {
							logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records")
						}
						lastlogtime = time.Now()
					}
					importer.importstatus.mutex.Unlock()
				} else {
					for i, l := range doinglines {
						if l == doingline {
							doinglines = append(doinglines[:i], doinglines[i+1:]...)
							break
						}
					}
					if time.Since(lastlogtime) > 5*time.Second {
						if retrycount > 0 {
							logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records", retrycount, "retry times")
						} else {
							logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records")
						}
						lastlogtime = time.Now()
					}
				}
			}
		})
		if e != nil {
			return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
	}
}
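
// importRecord imports one parsed record: edge records only populate the
// edge-info cache, all other types are inserted via the ODBC importer under
// their class alias name.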
func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype schema.FileType, linecount int) (retrycount int, err error) {
	if odbc.LogDebug {
		bs, e := json.MarshalIndent(record, "", " ")
		if e != nil {
			return 0, merrs.NewError(e)
		}
		logger.Trace(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
	}
	var classaliasname string
	switch filetype {
	case schema.FT_EDGE:
		graph.CacheEdgeInfo(record)
	default:
		classaliasname = string(filetype)
		retrycount, err = importer.odbcimporter.InsertData(classaliasname, record)
		if err != nil {
			err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
			return
		}
	}
	return
}

func (importer *Importer) alldone() {
	importer.odbcimporter.alldone()
}

func (importer *Importer) done() {
	importer.odbcimporter.done()
}
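
// Check runs two sanity queries against the imported data: a point lookup of
// a known level1 vertex and a bidirectional edge traversal from that vertex.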
func Check() {
	client := odbc.ODBClient
	if client == nil {
		return
	}
	{
		// mql := "select id,uniqueid,tags,contain,day,vtime from level1 where uniqueid='E2E:OTR0002L'"
		mql := "select * from level1 where uniqueid='E2E:OTR0002L'"
		r, e := client.Query(mql).Do()
		if e != nil {
			panic(merrs.NewError(e))
		}
		bs, _ := json.MarshalIndent(r.Data, "", " ")
		fmt.Println(string(bs))
	}
	{
		mql := `match ("level1:E2E:OTR0002L")-[*]->(),("level1:E2E:OTR0002L")<-[*]-() with namespace="m3cnet", fields="uniqueid,distname" in "level1","level2"`
		r, e := client.Query(mql).Do()
		if e != nil {
			panic(merrs.NewError(e))
		}
		bs, _ := json.MarshalIndent(r.Data, "", " ")
		fmt.Println(string(bs))
	}
}
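
// Usage sketch (illustrative only; the real wiring lives in the cgimport
// command, and the path and parallelism below are assumptions):
//
//	_, _, _, _, _, fc, lc, rcnt, rt, ut, err := importer.ImportDir("/data/cmdb", 8, false, false)
//	if err != nil {
//		logger.Info("import failed:", err)
//		return
//	}
//	logger.Info("imported", fc, "files,", lc, "lines,", rcnt, "records,", rt, "retries in", ut)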