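// Package importer imports CSV graph data into the store: it walks a data
// directory, maps each file to a class by filename pattern, and loads the
// records through ODBCImporter, keeping resumable per-file progress in a
// persisted CGIStatus.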
package importer

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"regexp"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.wecise.com/wecise/cgimport/graph"
	"git.wecise.com/wecise/cgimport/odbc"
	"git.wecise.com/wecise/cgimport/reader"
	"git.wecise.com/wecise/cgimport/schema"
	"github.com/wecisecode/util/cmap"
	"github.com/wecisecode/util/filewalker"
	"github.com/wecisecode/util/merrs"
	"github.com/wecisecode/util/pattern"
	"github.com/wecisecode/util/rc"
)

var mcfg = odbc.Config
var logger = odbc.Logger
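
// Importer drives one import run: it walks datapath, imports the files it
// finds with up to parallel concurrent file readers, and tracks resumable
// progress in importstatus. rebuild and reload control whether previously
// imported data and previously saved progress are discarded.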
type Importer struct {
	datapath         string
	parallel         int
	rebuild          bool
	reload           bool
	importstatus     *CGIStatus
	fileimportrc     *rc.RoutinesController
	odbcqueryrc      *rc.RoutinesController
	odbcimporter     *ODBCImporter
	starttime        time.Time
	currentstarttime time.Time
	lastlogtime      cmap.ConcurrentMap[string, time.Time]
}
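
// ImportDir imports all data files under datapath with the given parallelism.
// The ODBC query concurrency is taken from the "odbc.concurrent.limit"
// configuration key and defaults to runtime.GOMAXPROCS(0).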
func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration, filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
	concurlimit := mcfg.GetInt("odbc.concurrent.limit", runtime.GOMAXPROCS(0))
	importer := &Importer{
		datapath:     datapath,
		parallel:     parallel,
		rebuild:      rebuild,
		reload:       reload,
		importstatus: NewCGIStatus(),
		fileimportrc: rc.NewRoutinesController("", parallel),
		odbcqueryrc:  rc.NewRoutinesControllerLimit("", concurlimit, concurlimit*5),
		odbcimporter: NewODBCImporter(),
		lastlogtime:  cmap.NewSingle[string, time.Time](),
	}
	return importer.Import()
}
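
// Import runs one full import pass: it restores saved progress unless a
// rebuild or reload was requested, imports edge files first (their records
// are cached via graph.CacheEdgeInfo), then imports all remaining files.
// It returns both cumulative totals, including previous interrupted runs,
// and the counts for the current run alone.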
func (importer *Importer) Import() (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration, filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
	if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload && !importer.rebuild {
		// resume from the saved progress of a previous interrupted run
		err = importer.importstatus.Load()
		if err != nil {
			return
		}
	}
	err = importer.odbcimporter.init(importer.rebuild)
	if err != nil {
		return
	}
	logger.Info("graph data import start")
	totalusetime = importer.importstatus.TotalUseTime
	importer.starttime = time.Now().Add(-totalusetime)
	importer.currentstarttime = time.Now()
	reedgefile := regexp.MustCompile(`(?i).*edge.*\.csv`)
	efc, elc, erc, etc, ut, e := importer.ImportEdgeFiles(reedgefile, false)
	if e != nil {
		err = e
		return
	}
	afc, alc, arc, atc, ut, e := importer.ImportNonEdgeFiles(reedgefile, true)
	if e != nil {
		err = e
		return
	}
	totalfilescount = int64(len(importer.importstatus.ImportStatus)) + efc
	for _, v := range importer.importstatus.ImportStatus {
		totallinecount += v.LinesCount
		totalrecordscount += v.RecordsCount
		totalretrycount += v.RetryCount
	}
	totallinecount += elc
	totalrecordscount += erc
	totalretrycount += etc
	filescount = afc + efc
	linescount = alc + elc
	recordscount = arc + erc
	retrycount = atc + etc
	usetime = ut
	totalusetime = importer.importstatus.TotalUseTime
	importer.importstatus.WaitSaveDone()
	importer.alldone()
	return
}
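
// ImportEdgeFiles imports only the files whose name matches reedgefile.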
func (importer *Importer) ImportEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount, retrycount int64, usetime time.Duration, err error) {
	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
		if !reedgefile.MatchString(filepath.Base(fpath)) {
			// ignore non-edge files
			return FWOP_IGNORE
		}
		return FWOP_CONTINUE
	}, logstatus)
}
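
// ImportNonEdgeFiles imports every file except those whose name matches reedgefile.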
func (importer *Importer) ImportNonEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount, retrycount int64, usetime time.Duration, err error) {
	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
		if reedgefile.MatchString(filepath.Base(fpath)) {
			// ignore edge files
			return FWOP_IGNORE
		}
		return FWOP_CONTINUE
	}, logstatus)
}
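
// FWOP tells ImportFiles what to do with a file encountered during the walk:
// skip it (FWOP_IGNORE), stop walking (FWOP_BREAK), or import it (FWOP_CONTINUE).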
type FWOP int

const (
	FWOP_IGNORE FWOP = iota + 1
	FWOP_BREAK
	FWOP_CONTINUE
)
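
// ImportFiles walks the top level of datapath and imports every file the fwop
// callback does not filter out. Files are processed concurrently through
// fileimportrc; per-file progress is recorded in importstatus when logstatus
// is set, so an interrupted run can resume from the saved line counts.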
func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP, logstatus bool) (filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
	// walk the data directory
	var wg sync.WaitGroup
	fw, e := filewalker.NewFileWalker([]string{importer.datapath}, `^[^\.].*`)
	if e != nil {
		err = e
		return
	}
	e = fw.List(func(basedir string, fpath string) bool {
		if err != nil {
			// an earlier file failed, stop walking
			return false
		}
		if strings.Contains(fpath, string(filepath.Separator)) {
			// ignore subdirectories; fw.List is ordered with directories after
			// plain files, so the walk can stop at the first subdirectory entry
			return false
		}
		switch fwop(basedir, fpath) {
		case FWOP_IGNORE:
			// skip this file, continue with the next one
			return true
		case FWOP_BREAK:
			// stop walking
			return false
		case FWOP_CONTINUE:
		default:
		}
		// import this file
		filename := filepath.Join(basedir, fpath)
		filescount++
		wg.Add(1)
		// process files concurrently
		importer.fileimportrc.ConcurCall(1,
			func() {
				defer wg.Done()
				importer.importstatus.mutex.RLock()
				importstatus := importer.importstatus.ImportStatus[filename]
				importer.importstatus.mutex.RUnlock()
				linefrom, blockfrom := int64(0), int64(0)
				totalretrycount := int64(0)
				if importstatus != nil {
					linefrom, blockfrom, totalretrycount = importstatus.LinesCount, importstatus.RecordsCount, importstatus.RetryCount
				}
				if linefrom == 0 {
					logger.Info("import", "file", filename)
				} else {
					logger.Info("import", "file", filename, "from line", linefrom)
				}
				lines, records, retries, e := importer.ImportFile(filename, linefrom, blockfrom, totalretrycount, logstatus)
				if e != nil {
					err = e
					return
				}
				atomic.AddInt64(&linescount, lines-linefrom)
				atomic.AddInt64(&recordscount, records-blockfrom)
				atomic.AddInt64(&retrycount, retries-totalretrycount)
				usetime = time.Since(importer.currentstarttime)
				importer.importstatus.mutex.Lock()
				if logstatus {
					importer.importstatus.ImportStatus[filename] = &ImportStatus{
						LinesCount:   lines,
						RecordsCount: records,
						RetryCount:   retries,
					}
				}
				importer.importstatus.TotalUseTime = time.Since(importer.starttime)
				importer.importstatus.mutex.Unlock()
				importer.importstatus.Save()
				if retries > 0 {
					logger.Info("file", filename, "total imported", lines, "lines", records, "records", retries, "retry times")
				} else {
					logger.Info("file", filename, "total imported", lines, "lines", records, "records")
				}
			},
		)
		return true
	})
	wg.Wait()
	if e != nil {
		if os.IsNotExist(e) {
			err = merrs.NewError(`directory "`+importer.datapath+`" specified by "datapath" does not exist`, e)
		} else {
			err = merrs.NewError(e)
		}
		return
	}
	return
}
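
// ImportFile opens a single data file and imports it starting from the given
// line and block offsets, which allows resuming a partially imported file.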
func (importer *Importer) ImportFile(filename string, linefrom, blockfrom, totalretrycount int64, logstatus bool) (linecount, blockcount, retrycount int64, err error) {
	f, e := os.Open(filename)
	if e != nil {
		return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
	}
	defer f.Close()
	return importer.importReader(filename, f, linefrom, blockfrom, totalretrycount, logstatus)
}
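
// fileclassmapping maps a class name to the filename pattern that selects it.
// Built-in patterns cover EDGE, master and level1..level8 files; additional
// mappings can be configured with "mapping.class.<classname>" keys holding
// wildcard patterns.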
var fileclassmapping = cmap.NewSingle[string, *regexp.Regexp]()

func init() {
	fileclassmapping.Set("EDGE", regexp.MustCompile(pattern.Contain2RegexpString(`EDGE`)))
	fileclassmapping.Set("master", regexp.MustCompile(pattern.Contain2RegexpString(`MASTER`)))
	fileclassmapping.Set("level1", regexp.MustCompile(pattern.Contain2RegexpString(`_L1_`)))
	fileclassmapping.Set("level2", regexp.MustCompile(pattern.Contain2RegexpString(`_L2_`)))
	fileclassmapping.Set("level3", regexp.MustCompile(pattern.Contain2RegexpString(`_L3_`)))
	fileclassmapping.Set("level4", regexp.MustCompile(pattern.Contain2RegexpString(`_L4_`)))
	fileclassmapping.Set("level5", regexp.MustCompile(pattern.Contain2RegexpString(`_L5_`)))
	fileclassmapping.Set("level6", regexp.MustCompile(pattern.Contain2RegexpString(`_L6_`)))
	fileclassmapping.Set("level7", regexp.MustCompile(pattern.Contain2RegexpString(`_L7_`)))
	fileclassmapping.Set("level8", regexp.MustCompile(pattern.Contain2RegexpString(`_L8_`)))
	keys := mcfg.Keys()
	for _, key := range keys {
		if strings.HasPrefix(key, "mapping.class.") {
			classname := key[len("mapping.class."):]
			filepatterns := mcfg.GetStrings(key)
			// join all configured wildcard patterns for a class into one
			// regexp; setting them one at a time would keep only the last
			res := make([]string, 0, len(filepatterns))
			for _, fp := range filepatterns {
				res = append(res, pattern.Wildcard2RegexpString(fp))
			}
			if len(res) > 0 {
				fileclassmapping.Set(classname, regexp.MustCompile(strings.Join(res, "|")))
			}
		}
	}
}
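
// importReader determines the target class from the filename, then reads the
// input block by block and imports each block concurrently through
// odbcqueryrc. Progress is saved whenever the oldest in-flight block
// completes, so the saved line count never runs ahead of unfinished work.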
func (importer *Importer) importReader(filename string, buf io.Reader, linefrom, blockfrom, totalretrycount int64, logstatus bool) (linecount, blockcount, retrycount int64, err error) {
	classname := ""
	fileclassmapping.Fetch(func(key string, v *regexp.Regexp) bool {
		if v.MatchString(filename) {
			classname = key
			return false
		}
		return true
	})
	if classname == "" {
		err = merrs.NewError("cannot map filename to a class: " + filename)
		return
	}
	br, e := reader.NewBlockReader(filename, classname, buf)
	if e != nil {
		return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
	}
	skiplines := int(linefrom)
	blockcount = blockfrom
	doinglines := []int64{}
	donelines := linefrom
	doneblocks := blockfrom
	savedlines := linefrom
	savedblocks := blockfrom
	retrycount = totalretrycount
	linecount = linefrom
	// maxresponsetime := time.Duration(0)
	var wg sync.WaitGroup
	defer importer.done()
	defer wg.Wait()
	for {
		if err != nil {
			return
		}
		lastlinecount := linecount
		block, line, linenumber, e := br.ReadBlock(skiplines)
		linecount = int64(linenumber)
		if e != nil {
			return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
		if block == nil {
			return
		}
		blockcount++
		wg.Add(1)
		doingline := linecount
		doingblock := blockcount
		if logstatus {
			doinglines = append(doinglines, doingline)
		}
		e = importer.odbcqueryrc.ConcurCall(1, func() {
			defer wg.Done()
			if err != nil {
				return
			}
			// st := time.Now()
			// defer func() {
			// 	ut := time.Since(st)
			// 	if ut > maxresponsetime {
			// 		maxresponsetime = ut
			// 	}
			// }()
			// logger.Info("G:", runtime.NumGoroutine(),
			// 	"RC:", importer.fileimportrc.ConcurCount(),
			// 	"WC:", importer.odbcqueryrc.ConcurCount(),
			// 	"RQ:", importer.fileimportrc.QueueCount(),
			// 	"WQ:", importer.odbcqueryrc.QueueCount(),
			// 	"maxresponsetime:", maxresponsetime)
			rtc, e := importer.importRecord(block, line, filename, classname, int(doingline))
			atomic.AddInt64(&retrycount, int64(rtc))
			if e != nil {
				err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(doingline)}, {"line": line}})
				return
			}
			atomic.AddInt64(&donelines, doingline-lastlinecount)
			atomic.AddInt64(&doneblocks, 1)
			if logstatus {
				// importstatus.mutex also guards doinglines, which is shared
				// by all concurrently running callbacks
				importer.importstatus.mutex.Lock()
				readinglines := doinglines[len(doinglines)-1]
				if doingline == doinglines[0] {
					// the oldest in-flight block is done, progress can be saved
					importer.importstatus.ImportStatus[filename] = &ImportStatus{
						LinesCount:   doingline,
						RecordsCount: doingblock,
						RetryCount:   retrycount,
					}
					importer.importstatus.TotalUseTime = time.Since(importer.starttime)
					importer.importstatus.Save()
					savedlines = doingline
					savedblocks = doingblock
					doinglines = doinglines[1:]
				} else {
					for i, l := range doinglines {
						if l == doingline {
							doinglines = append(doinglines[:i], doinglines[i+1:]...)
							break
						}
					}
				}
				importer.importstatus.mutex.Unlock()
				importer.logInfo(filename, readinglines, doinglines, donelines, doneblocks, savedlines, savedblocks, retrycount)
			}
		})
		if e != nil {
			return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
	}
}
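
// logInfo reports per-file progress, throttled to at most one log line every
// five seconds per file.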
func (importer *Importer) logInfo(filename string, readinglines int64, doinglines []int64, donelines, doneblocks, savedlines, savedblocks, retrycount int64) {
	if time.Since(importer.lastlogtime.GetIFPresent(filename)) > 5*time.Second {
		if odbc.LogDebug {
			if retrycount > 0 {
				logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records,", "saved", savedlines, "lines", savedblocks, "records", retrycount, "retry times")
			} else {
				logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records,", "saved", savedlines, "lines", savedblocks, "records")
			}
		} else {
			if retrycount > 0 {
				logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records", retrycount, "retry times")
			} else {
				logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records")
			}
		}
		importer.lastlogtime.Set(filename, time.Now())
	}
}
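
// importRecord imports one parsed record: EDGE records are cached in the
// graph package, every other class is inserted through ODBCImporter, which
// reports how many retries were needed.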
func (importer *Importer) importRecord(record map[string]any, line string, filename string, classaliasname string, linecount int) (retrycount int, err error) {
	if odbc.LogDebug {
		bs, e := json.MarshalIndent(record, "", " ")
		if e != nil {
			return 0, merrs.NewError(e)
		}
		logger.Trace(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
	}
	switch classaliasname {
	case schema.EDGE:
		graph.CacheEdgeInfo(record)
	default:
		retrycount, err = importer.odbcimporter.InsertData(classaliasname, record)
		if err != nil {
			err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
			return
		}
	}
	return
}
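
// alldone signals the ODBC importer that the whole import run has finished.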
func (importer *Importer) alldone() {
	importer.odbcimporter.alldone()
}
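
// done signals the ODBC importer that one file reader has finished.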
func (importer *Importer) done() {
	importer.odbcimporter.done()
}
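
// Check runs two sanity queries against the store after an import: a plain
// select on level1 and a match traversal across level1 and level2, printing
// both results as indented JSON.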
func Check() {
	client := odbc.ODBClient
	if client == nil {
		return
	}
	{
		// mql := "select id,uniqueid,tags,contain,day,vtime from level1 where uniqueid='E2E:OTR0002L'"
		mql := "select * from level1 where uniqueid='E2E:OTR0002L'"
		r, e := client.Query(mql).Do()
		if e != nil {
			panic(merrs.NewError(e))
		}
		bs, _ := json.MarshalIndent(r.Data, "", " ")
		fmt.Println(string(bs))
	}
	{
		mql := `match ("level1:E2E:OTR0002L")-[*]->(),("level1:E2E:OTR0002L")<-[*]-() with namespace="m3cnet", fields="uniqueid,distname" in "level1","level2"`
		r, e := client.Query(mql).Do()
		if e != nil {
			panic(merrs.NewError(e))
		}
		bs, _ := json.MarshalIndent(r.Data, "", " ")
		fmt.Println(string(bs))
	}
}