importer.go

package importer

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"regexp"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.wecise.com/wecise/cgimport/graph"
	"git.wecise.com/wecise/cgimport/odbc"
	"git.wecise.com/wecise/cgimport/reader"
	"git.wecise.com/wecise/cgimport/schema"
	"github.com/wecisecode/util/filewalker"
	"github.com/wecisecode/util/merrs"
	"github.com/wecisecode/util/rc"
)

var mcfg = odbc.Config
var logger = odbc.Logger
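
// Importer drives one graph-data import run: it walks datapath, schedules
// per-file imports and ODBC writes on routine controllers, and tracks
// resumable progress in a CGIStatus.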
type Importer struct {
	datapath         string
	parallel         int
	rebuild          bool
	reload           bool
	importstatus     *CGIStatus
	fileimportrc     *rc.RoutinesController
	odbcqueryrc      *rc.RoutinesController
	odbcimporter     *ODBCImporter
	starttime        time.Time
	currentstarttime time.Time
}
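
// ImportDir imports all data files under datapath with the given file-level
// parallelism. rebuild drops and recreates existing classes first; reload
// ignores any saved progress and starts over.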
func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration, filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
	concurlimit := mcfg.GetInt("odbc.concurrent.limit", runtime.GOMAXPROCS(0))
	importer := &Importer{
		datapath:     datapath,
		parallel:     parallel,
		rebuild:      rebuild,
		reload:       reload,
		importstatus: NewCGIStatus(),
		fileimportrc: rc.NewRoutinesController("", parallel),
		odbcqueryrc:  rc.NewRoutinesControllerLimit("", concurlimit, concurlimit*5),
		odbcimporter: NewODBCImporter(),
	}
	return importer.Import()
}
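
// Import runs the import: it restores saved progress unless rebuilding or
// reloading, revises the class structure, imports edge files before all
// other files, and aggregates per-file counters into run totals.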
func (importer *Importer) Import() (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration, filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
	if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload && !importer.rebuild {
		// resume from saved progress
		err = importer.importstatus.Load()
		if err != nil {
			return
		}
		err = importer.odbcimporter.init()
		if err != nil {
			return
		}
	}
	if importer.rebuild {
		// rebuild: clear existing classes
		err = importer.odbcimporter.rebuild()
		if err != nil {
			return
		}
	}
	// create classes
	err = importer.odbcimporter.ReviseClassStruct()
	if err != nil {
		return
	}
	//
	logger.Info("graph data import start")
	totalusetime = importer.importstatus.TotalUseTime
	importer.starttime = time.Now().Add(-totalusetime)
	importer.currentstarttime = time.Now()
	reedgefile := regexp.MustCompile(`(?i).*edge.*\.csv`)
	efc, elc, erc, etc, ut, e := importer.ImportEdgeFiles(reedgefile, false)
	if e != nil {
		err = e
		return
	}
	afc, alc, arc, atc, ut, e := importer.ImportNonEdgeFiles(reedgefile, true)
	if e != nil {
		err = e
		return
	}
	totalfilescount = int64(len(importer.importstatus.ImportStatus)) + efc
	for _, v := range importer.importstatus.ImportStatus {
		totallinecount += v.LinesCount
		totalrecordscount += v.RecordsCount
		totalretrycount += v.RetryCount
	}
	totallinecount += elc
	totalrecordscount += erc
	totalretrycount += etc
	filescount = afc + efc
	linescount = alc + elc
	recordscount = arc + erc
	retrycount = atc + etc
	usetime = ut
	totalusetime = importer.importstatus.TotalUseTime
	importer.importstatus.WaitSaveDone()
	importer.alldone()
	return
}
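
// ImportEdgeFiles imports only the files whose names match reedgefile.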
func (importer *Importer) ImportEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount, retrycount int64, usetime time.Duration, err error) {
	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
		if !reedgefile.MatchString(filepath.Base(fpath)) {
			// ignore non-edge files
			return FWOP_IGNORE
		}
		return FWOP_CONTINUE
	}, logstatus)
}
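
// ImportNonEdgeFiles imports every file except those matching reedgefile.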
func (importer *Importer) ImportNonEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount, retrycount int64, usetime time.Duration, err error) {
	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
		if reedgefile.MatchString(filepath.Base(fpath)) {
			// ignore edge files
			return FWOP_IGNORE
		}
		return FWOP_CONTINUE
	}, logstatus)
}
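
// FWOP tells ImportFiles what to do with the file currently being walked.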
type FWOP int

const (
	FWOP_IGNORE FWOP = iota + 1
	FWOP_BREAK
	FWOP_CONTINUE
)
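
// ImportFiles walks the top level of datapath and imports each file for
// which fwop returns FWOP_CONTINUE, running the per-file imports
// concurrently on fileimportrc and accumulating the counters atomically.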
func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP, logstatus bool) (filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
	// walk the data directory
	var wg sync.WaitGroup
	fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
	if e != nil {
		err = e
		return
	}
	e = fw.List(func(basedir string, fpath string) bool {
		if err != nil {
			// an earlier file failed; stop walking
			return false
		}
		if strings.Contains(fpath, string(filepath.Separator)) {
			// ignore subdirectories; fw.List is ordered with directories after
			// files, so the walk can stop at the first subdirectory
			return false
		}
		switch fwop(basedir, fpath) {
		case FWOP_IGNORE:
			// skip this file and continue with the next one
			return true
		case FWOP_BREAK:
			// stop walking
			return false
		case FWOP_CONTINUE:
		default:
		}
		// import the current file
		filename := filepath.Join(basedir, fpath)
		filescount++
		wg.Add(1)
		// process files concurrently
		importer.fileimportrc.ConcurCall(1,
			func() {
				defer wg.Done()
				importer.importstatus.mutex.RLock()
				importstatus := importer.importstatus.ImportStatus[filename]
				importer.importstatus.mutex.RUnlock()
				linefrom, blockfrom := int64(0), int64(0)
				totalretrycount := int64(0)
				if importstatus != nil {
					linefrom, blockfrom, totalretrycount = importstatus.LinesCount, importstatus.RecordsCount, importstatus.RetryCount
				}
				if linefrom == 0 {
					logger.Info("import", "file", filename)
				} else {
					logger.Info("import", "file", filename, "from line", linefrom)
				}
				lines, records, retries, e := importer.ImportFile(filename, linefrom, blockfrom, totalretrycount, logstatus)
				if e != nil {
					err = e
					return
				}
				atomic.AddInt64(&linescount, lines-linefrom)
				atomic.AddInt64(&recordscount, records-blockfrom)
				atomic.AddInt64(&retrycount, retries-totalretrycount)
				usetime = time.Since(importer.currentstarttime)
				importer.importstatus.mutex.Lock()
				if logstatus {
					importer.importstatus.ImportStatus[filename] = &ImportStatus{
						LinesCount:   lines,
						RecordsCount: records,
						RetryCount:   retries,
					}
				}
				importer.importstatus.TotalUseTime = time.Since(importer.starttime)
				importer.importstatus.mutex.Unlock()
				importer.importstatus.Save()
				if retries > 0 {
					logger.Info("file", filename, "total imported", lines, "lines", records, "records", retries, "retry times")
				} else {
					logger.Info("file", filename, "total imported", lines, "lines", records, "records")
				}
			},
		)
		return true
	})
	wg.Wait()
	if e != nil {
		if os.IsNotExist(e) {
			err = merrs.NewError(`directory "`+importer.datapath+`" specified by "datapath" does not exist`, e)
		} else {
			err = merrs.NewError(e)
		}
		return
	}
	return
}
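
// ImportFile opens the named file and imports it from the given resume
// position (line, block, and retry counters carried over from a previous run).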
func (importer *Importer) ImportFile(fpath string, linefrom, blockfrom, totalretrycount int64, logstatus bool) (linecount, blockcount, retrycount int64, err error) {
	f, e := os.Open(fpath)
	if e != nil {
		return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": fpath}})
	}
	defer f.Close()
	return importer.importReader(fpath, f, linefrom, blockfrom, totalretrycount, logstatus)
}
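
// importReader determines the schema file type from the filename, then reads
// the stream block by block and imports each block concurrently on
// odbcqueryrc, periodically persisting progress when logstatus is set.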
func (importer *Importer) importReader(filename string, buf io.Reader, linefrom, blockfrom, totalretrycount int64, logstatus bool) (linecount, blockcount, retrycount int64, err error) {
	var filetype schema.FileType
	switch {
	case strings.Contains(filename, "_L1_"):
		filetype = schema.FT_LEVEL1
	case strings.Contains(filename, "_L2_"):
		filetype = schema.FT_LEVEL2
	case strings.Contains(filename, "_L3_"):
		filetype = schema.FT_LEVEL3
	case strings.Contains(filename, "_L4_"):
		filetype = schema.FT_LEVEL4
	case strings.Contains(filename, "_L5_"):
		filetype = schema.FT_LEVEL5
	case strings.Contains(filename, "_L6_"):
		filetype = schema.FT_LEVEL6
	case strings.Contains(filename, "_L7_"):
		filetype = schema.FT_LEVEL7
	case strings.Contains(filename, "_L8_"):
		filetype = schema.FT_LEVEL8
	case strings.Contains(filename, "MASTER"):
		filetype = schema.FT_MASTER
	case strings.Contains(filename, "EDGE"):
		filetype = schema.FT_EDGE
	default:
		err = merrs.NewError("filename does not conform to the agreed format: " + filename)
		return
	}
	br, e := reader.NewBlockReader(filename, filetype, buf)
	if e != nil {
		return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
	}
	lastlogtime := time.Now()
	skiplines := int(linefrom)
	blockcount = blockfrom
	doinglines := []int64{}
	retrycount = totalretrycount
	// maxresponsetime := time.Duration(0)
	var wg sync.WaitGroup
	defer importer.done()
	defer wg.Wait()
	for {
		if err != nil {
			return
		}
		block, line, linenumber, e := br.ReadBlock(skiplines)
		linecount = int64(linenumber)
		if e != nil {
			return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
		if block == nil {
			return
		}
		blockcount++
		wg.Add(1)
		doingline := linecount
		doingblock := blockcount
		if logstatus {
			doinglines = append(doinglines, doingline)
		}
		e = importer.odbcqueryrc.ConcurCall(1, func() {
			defer wg.Done()
			if err != nil {
				return
			}
			// st := time.Now()
			// defer func() {
			// 	ut := time.Since(st)
			// 	if ut > maxresponsetime {
			// 		maxresponsetime = ut
			// 	}
			// }()
			// logger.Info("G:", runtime.NumGoroutine(),
			// 	"RC:", importer.fileimportrc.ConcurCount(),
			// 	"WC:", importer.odbcqueryrc.ConcurCount(),
			// 	"RQ:", importer.fileimportrc.QueueCount(),
			// 	"WQ:", importer.odbcqueryrc.QueueCount(),
			// 	"maxresponsetime:", maxresponsetime)
			rc, e := importer.importRecord(block, line, filename, filetype, int(doingline))
			atomic.AddInt64(&retrycount, int64(rc))
			if e != nil {
				err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(doingline)}, {"line": line}})
				return
			}
			if logstatus {
				if doingline == doinglines[0] {
					importer.importstatus.mutex.Lock()
					importer.importstatus.ImportStatus[filename] = &ImportStatus{
						LinesCount:   doingline,
						RecordsCount: doingblock,
						RetryCount:   retrycount,
					}
					importer.importstatus.TotalUseTime = time.Since(importer.starttime)
					importer.importstatus.Save()
					doinglines = doinglines[1:]
					if time.Since(lastlogtime) > 5*time.Second {
						if retrycount > 0 {
							logger.Info("file", filename, "imported", doingline, "lines", doingblock, "records", retrycount, "retry times")
						} else {
							logger.Info("file", filename, "imported", doingline, "lines", doingblock, "records")
						}
						lastlogtime = time.Now()
					}
					importer.importstatus.mutex.Unlock()
				} else {
					// a later block finished first; drop its line from the pending list
					for i, l := range doinglines {
						if l == doingline {
							doinglines = append(doinglines[:i], doinglines[i+1:]...)
							break
						}
					}
				}
			}
		})
		if e != nil {
			return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
	}
}
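
// importRecord imports a single parsed record: edge records are cached as
// edge info, and every other file type is inserted as data of the class
// named by its file type.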
func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype schema.FileType, linecount int) (retrycount int, err error) {
	if odbc.LogDebug {
		bs, e := json.MarshalIndent(record, "", " ")
		if e != nil {
			return 0, merrs.NewError(e)
		}
		logger.Trace(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
	}
	var classaliasname string
	switch filetype {
	case schema.FT_EDGE:
		graph.CacheEdgeInfo(record)
	default:
		classaliasname = string(filetype)
		retrycount, err = importer.odbcimporter.InsertData(classaliasname, record)
		if err != nil {
			err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
			return
		}
	}
	return
}
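
// alldone signals the ODBC importer that the whole run has finished.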
func (importer *Importer) alldone() {
	importer.odbcimporter.alldone()
}

// done signals the ODBC importer that one file has finished.
func (importer *Importer) done() {
	importer.odbcimporter.done()
}
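
// Check runs two spot-check queries against the imported data: a point query
// for one level1 node and a match traversal of its edges in both directions.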
func Check() {
	client := odbc.ODBClient
	if client == nil {
		return
	}
	{
		// mql := "select id,uniqueid,tags,contain,day,vtime from level1 where uniqueid='E2E:OTR0002L'"
		mql := "select * from level1 where uniqueid='E2E:OTR0002L'"
		r, e := client.Query(mql).Do()
		if e != nil {
			panic(merrs.NewError(e))
		}
		bs, _ := json.MarshalIndent(r.Data, "", " ")
		fmt.Println(string(bs))
	}
	{
		mql := `match ("level1:E2E:OTR0002L")-[*]->(),("level1:E2E:OTR0002L")<-[*]-() in "level1","level2"`
		r, e := client.Query(mql).Do()
		if e != nil {
			panic(merrs.NewError(e))
		}
		bs, _ := json.MarshalIndent(r.Data, "", " ")
		fmt.Println(string(bs))
	}
}