importer.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. package importer
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "regexp"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "git.wecise.com/wecise/cgimport/graph"
  14. "git.wecise.com/wecise/cgimport/odbc"
  15. "git.wecise.com/wecise/cgimport/reader"
  16. "git.wecise.com/wecise/cgimport/schema"
  17. "github.com/wecisecode/util/filewalker"
  18. "github.com/wecisecode/util/merrs"
  19. "github.com/wecisecode/util/rc"
  20. )
// Package-level shortcuts to the shared odbc configuration and logger.
var mcfg = odbc.Config
var logger = odbc.Logger
// Importer drives a bulk import of graph data files from a directory
// into the backing store through an ODBCImporter, with resumable
// per-file progress tracking.
type Importer struct {
	datapath         string                 // root directory holding the data files
	parallel         int                    // max number of files imported concurrently
	rebuild          bool                   // drop and recreate existing classes before importing
	reload           bool                   // ignore saved progress and import from scratch
	importstatus     *CGIStatus             // per-file progress, persisted so runs can resume
	fileimportrc     *rc.RoutinesController // bounds concurrent file imports
	odbcqueryrc      *rc.RoutinesController // bounds concurrent odbc insert calls
	odbcimporter     *ODBCImporter          // backend writer
	starttime        time.Time              // logical start, shifted back by previously accumulated use time
	currentstarttime time.Time              // start of the current run only
}
  35. func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration, filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
  36. concurlimt := mcfg.GetInt("odbc.concurrent.limit", 100)
  37. importer := &Importer{
  38. datapath: datapath,
  39. parallel: parallel,
  40. rebuild: rebuild,
  41. reload: reload,
  42. importstatus: NewCGIStatus(),
  43. fileimportrc: rc.NewRoutinesController("", parallel),
  44. odbcqueryrc: rc.NewRoutinesControllerLimit("", concurlimt, concurlimt*2),
  45. odbcimporter: NewODBCImporter(),
  46. }
  47. return importer.Import()
  48. }
  49. func (importer *Importer) Import() (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration, filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
  50. if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload && !importer.rebuild {
  51. // reload
  52. err = importer.importstatus.Load()
  53. if err != nil {
  54. return
  55. }
  56. err = importer.odbcimporter.init()
  57. if err != nil {
  58. return
  59. }
  60. }
  61. if importer.rebuild {
  62. // rebuild
  63. // 清除已有类
  64. err = importer.odbcimporter.rebuild()
  65. if err != nil {
  66. return
  67. }
  68. }
  69. // 建类
  70. err = importer.odbcimporter.ReviseClassStruct()
  71. if err != nil {
  72. return
  73. }
  74. //
  75. logger.Info("graph data import start")
  76. totalusetime = importer.importstatus.TotalUseTime
  77. importer.starttime = time.Now().Add(-totalusetime)
  78. importer.currentstarttime = time.Now()
  79. reedgefile := regexp.MustCompile("(?i).*edge.*.csv")
  80. efc, elc, erc, etc, ut, e := importer.ImportEdgeFiles(reedgefile, false)
  81. if e != nil {
  82. err = e
  83. return
  84. }
  85. afc, alc, arc, atc, ut, e := importer.ImportNonEdgeFiles(reedgefile, true)
  86. if e != nil {
  87. err = e
  88. return
  89. }
  90. totalfilescount = int64(len(importer.importstatus.ImportStatus)) + efc
  91. for _, v := range importer.importstatus.ImportStatus {
  92. totallinecount += v.LinesCount
  93. totalrecordscount += v.RecordsCount
  94. totalretrycount += v.RetryCount
  95. }
  96. totallinecount += elc
  97. totalrecordscount += erc
  98. totalretrycount += etc
  99. filescount = afc + efc
  100. linescount = alc + elc
  101. recordscount = arc + erc
  102. retrycount = atc + etc
  103. usetime = ut
  104. totalusetime = importer.importstatus.TotalUseTime
  105. importer.importstatus.WaitSaveDone()
  106. importer.alldone()
  107. return
  108. }
  109. func (importer *Importer) ImportEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount, retrycount int64, usetime time.Duration, err error) {
  110. return importer.ImportFiles(func(basedir string, fpath string) FWOP {
  111. if !reedgefile.MatchString(filepath.Base(fpath)) {
  112. // 忽略非EDGE文件
  113. return FWOP_IGNORE
  114. }
  115. return FWOP_CONTINUE
  116. }, logstatus)
  117. }
  118. func (importer *Importer) ImportNonEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount, retrycount int64, usetime time.Duration, err error) {
  119. return importer.ImportFiles(func(basedir string, fpath string) FWOP {
  120. if reedgefile.MatchString(filepath.Base(fpath)) {
  121. // 忽略EDGE文件
  122. return FWOP_IGNORE
  123. }
  124. return FWOP_CONTINUE
  125. }, logstatus)
  126. }
// FWOP (file-walk operation) tells ImportFiles what to do with a file
// encountered during the directory walk.
type FWOP int

const (
	FWOP_IGNORE   FWOP = iota + 1 // skip this file, keep walking
	FWOP_BREAK                    // stop the walk entirely
	FWOP_CONTINUE                 // import this file
)
// ImportFiles walks the top level of importer.datapath and imports every
// file the fwop callback selects, running file imports concurrently via
// fileimportrc. The returned counts are deltas for this run; usetime is
// the elapsed time since currentstarttime. When logstatus is true the
// per-file progress is recorded in importstatus for later resumption.
func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP, logstatus bool) (filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
	// Walk the data directory.
	var wg sync.WaitGroup
	fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
	if e != nil {
		err = e
		return
	}
	e = fw.List(func(basedir string, fpath string) bool {
		if err != nil {
			// An earlier error occurred; stop the walk.
			return false
		}
		if strings.Contains(fpath, string(filepath.Separator)) {
			// Ignore subdirectories. fw.List is ordered with directories
			// after plain files, so the first subdirectory entry means the
			// walk can end.
			return false
		}
		switch fwop(basedir, fpath) {
		case FWOP_IGNORE:
			// Skip this file and move on to the next one.
			return true
		case FWOP_BREAK:
			// Stop the walk.
			return false
		case FWOP_CONTINUE:
		default:
		}
		// Import this file.
		filename := filepath.Join(basedir, fpath)
		filescount++
		wg.Add(1)
		// Process files concurrently.
		// NOTE(review): ConcurCall's error return is ignored here, and the
		// closure below writes err/usetime without synchronization while
		// the walk callback reads err — looks like a data race; confirm
		// and guard if needed.
		importer.fileimportrc.ConcurCall(1,
			func() {
				defer wg.Done()
				// Look up any saved progress for this file.
				importer.importstatus.mutex.RLock()
				importstatus := importer.importstatus.ImportStatus[filename]
				importer.importstatus.mutex.RUnlock()
				linefrom, blockfrom := int64(0), int64(0)
				totalretrycount := int64(0)
				if importstatus != nil {
					linefrom, blockfrom, totalretrycount = importstatus.LinesCount, importstatus.RecordsCount, importstatus.RetryCount
				}
				if linefrom == 0 {
					logger.Info("import", "file", filename)
				} else {
					logger.Info("import", "file", filename, "from line", linefrom)
				}
				lines, records, retries, e := importer.ImportFile(filename, linefrom, blockfrom, totalretrycount, logstatus)
				if e != nil {
					err = e
					return
				}
				// Accumulate the deltas contributed by this file.
				atomic.AddInt64(&linescount, lines-linefrom)
				atomic.AddInt64(&recordscount, records-blockfrom)
				atomic.AddInt64(&retrycount, retries-totalretrycount)
				usetime = time.Since(importer.currentstarttime)
				importer.importstatus.mutex.Lock()
				if logstatus {
					importer.importstatus.ImportStatus[filename] = &ImportStatus{
						LinesCount:   lines,
						RecordsCount: records,
						RetryCount:   retries,
					}
				}
				importer.importstatus.TotalUseTime = time.Since(importer.starttime)
				importer.importstatus.mutex.Unlock()
				importer.importstatus.Save()
				if retries > 0 {
					logger.Info("file", filename, "total imported", lines, "lines", records, "records", retries, "retry times")
				} else {
					logger.Info("file", filename, "total imported", lines, "lines", records, "records")
				}
			},
		)
		return true
	})
	wg.Wait()
	if e != nil {
		// Map a missing data directory to a friendlier message.
		if os.IsNotExist(e) {
			err = merrs.NewError(`directory "`+importer.datapath+`" not exist specified by "datapath"`, e)
		} else {
			err = merrs.NewError(e)
		}
		return
	}
	return
}
  221. func (importer *Importer) ImportFile(filepath string, linefrom, blockfrom, totalretrycount int64, logstatus bool) (linecount, blockcount, retrycount int64, err error) {
  222. f, e := os.Open(filepath)
  223. if e != nil {
  224. return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
  225. }
  226. defer f.Close()
  227. return importer.importReader(filepath, f, linefrom, blockfrom, totalretrycount, logstatus)
  228. }
// importReader reads record blocks from buf (the contents of filename),
// infers the schema file type from the file-name convention, and imports
// each block concurrently through odbcqueryrc. linefrom, blockfrom and
// totalretrycount seed the counters when resuming a partially imported
// file; logstatus controls whether per-file progress is persisted.
func (importer *Importer) importReader(filename string, buf io.Reader, linefrom, blockfrom, totalretrycount int64, logstatus bool) (linecount, blockcount, retrycount int64, err error) {
	// Map the file name to its schema file type by naming convention.
	var filetype schema.FileType
	switch {
	case strings.Contains(filename, "_L1_"):
		filetype = schema.FT_LEVEL1
	case strings.Contains(filename, "_L2_"):
		filetype = schema.FT_LEVEL2
	case strings.Contains(filename, "_L3_"):
		filetype = schema.FT_LEVEL3
	case strings.Contains(filename, "_L4_"):
		filetype = schema.FT_LEVEL4
	case strings.Contains(filename, "_L5_"):
		filetype = schema.FT_LEVEL5
	case strings.Contains(filename, "_L6_"):
		filetype = schema.FT_LEVEL6
	case strings.Contains(filename, "_L7_"):
		filetype = schema.FT_LEVEL7
	case strings.Contains(filename, "_L8_"):
		filetype = schema.FT_LEVEL8
	case strings.Contains(filename, "MASTER"):
		filetype = schema.FT_MASTER
	case strings.Contains(filename, "EDGE"):
		filetype = schema.FT_EDGE
	default:
		err = merrs.NewError("filename does not conform to the agreed format " + filename)
		return
	}
	br, e := reader.NewBlockReader(filename, filetype, buf)
	if e != nil {
		return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
	}
	lastlogtime := time.Now()
	skiplines := int(linefrom) // lines already imported by a previous run
	blockcount = blockfrom
	doinglines := []int64{} // line numbers of blocks currently in flight
	retrycount = totalretrycount
	var wg sync.WaitGroup
	defer importer.done()
	defer wg.Wait()
	for {
		if err != nil {
			// A concurrent import routine reported a failure; stop reading.
			return
		}
		block, line, linenumber, e := br.ReadBlock(skiplines)
		linecount = int64(linenumber)
		if e != nil {
			return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
		if block == nil {
			// End of input.
			return
		}
		blockcount++
		wg.Add(1)
		doingline := linecount
		doingblock := blockcount
		if logstatus {
			doinglines = append(doinglines, doingline)
		}
		// NOTE(review): the closure writes err and mutates doinglines; the
		// else branch below touches doinglines without holding the status
		// mutex, so concurrent routines appear to race — confirm and guard.
		e = importer.odbcqueryrc.ConcurCall(1, func() {
			defer wg.Done()
			if err != nil {
				return
			}
			rc, e := importer.importRecord(block, line, filename, filetype, int(doingline))
			atomic.AddInt64(&retrycount, int64(rc))
			if e != nil {
				err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(doingline)}, {"line": line}})
				return
			}
			if logstatus {
				if doingline == doinglines[0] {
					// Oldest in-flight block finished: persist progress up to it.
					importer.importstatus.mutex.Lock()
					importer.importstatus.ImportStatus[filename] = &ImportStatus{
						LinesCount:   doingline,
						RecordsCount: doingblock,
						RetryCount:   retrycount,
					}
					importer.importstatus.TotalUseTime = time.Since(importer.starttime)
					importer.importstatus.Save()
					doinglines = doinglines[1:]
					// Throttle progress logging to at most once every 5s.
					if time.Since(lastlogtime) > 5*time.Second {
						if retrycount > 0 {
							logger.Info("file", filename, "imported", doingline, "lines", doingblock, "records", retrycount, "retry times")
						} else {
							logger.Info("file", filename, "imported", doingline, "lines", doingblock, "records")
						}
						lastlogtime = time.Now()
					}
					importer.importstatus.mutex.Unlock()
				} else {
					// Not the oldest: just remove this line from the in-flight list.
					for i, l := range doinglines {
						if l == doingline {
							doinglines = append(doinglines[:i], doinglines[i+1:]...)
						}
					}
				}
			}
		})
		if e != nil {
			return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
	}
}
  332. func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype schema.FileType, linecount int) (retrycount int, err error) {
  333. if odbc.LogDebug {
  334. bs, e := json.MarshalIndent(record, "", " ")
  335. if e != nil {
  336. return 0, merrs.NewError(e)
  337. }
  338. logger.Trace(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
  339. }
  340. var classaliasname string
  341. switch filetype {
  342. case schema.FT_EDGE:
  343. graph.CacheEdgeInfo(record)
  344. default:
  345. classaliasname = string(filetype)
  346. retrycount, err = importer.odbcimporter.InsertData(classaliasname, record)
  347. if err != nil {
  348. err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  349. return
  350. }
  351. }
  352. return
  353. }
// alldone signals the backend importer that the entire import finished.
func (importer *Importer) alldone() {
	importer.odbcimporter.alldone()
}
// done signals the backend importer that one file import finished.
func (importer *Importer) done() {
	importer.odbcimporter.done()
}
  360. func Check() {
  361. client := odbc.ODBClient
  362. if client == nil {
  363. return
  364. }
  365. {
  366. // mql := "select id,uniqueid,tags,contain,day,vtime from level1 where uniqueid='E2E:OTR0002L'"
  367. mql := "select * from level1 where uniqueid='E2E:OTR0002L'"
  368. r, e := client.Query(mql).Do()
  369. if e != nil {
  370. panic(merrs.NewError(e))
  371. }
  372. bs, _ := json.MarshalIndent(r.Data, "", " ")
  373. fmt.Println(string(bs))
  374. }
  375. {
  376. mql := `match ("level1:E2E:OTR0002L")-[*]->(),("level1:E2E:OTR0002L")<-[*]-() in "level1","level2"`
  377. r, e := client.Query(mql).Do()
  378. if e != nil {
  379. panic(merrs.NewError(e))
  380. }
  381. bs, _ := json.MarshalIndent(r.Data, "", " ")
  382. fmt.Println(string(bs))
  383. }
  384. }