// importer.go — graph data import driver.
  1. package importer
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "regexp"
  9. "runtime"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. "git.wecise.com/wecise/cgimport/graph"
  15. "git.wecise.com/wecise/cgimport/odbc"
  16. "git.wecise.com/wecise/cgimport/reader"
  17. "git.wecise.com/wecise/cgimport/schema"
  18. "github.com/wecisecode/util/filewalker"
  19. "github.com/wecisecode/util/merrs"
  20. "github.com/wecisecode/util/rc"
  21. )
  22. var mcfg = odbc.Config
  23. var logger = odbc.Logger
// Importer drives a graph-data import run: it walks the data directory,
// parses each file, and writes records through an ODBCImporter, tracking
// per-file progress in a persisted CGIStatus so interrupted runs can resume.
type Importer struct {
	datapath         string                 // root directory containing the data files to import
	parallel         int                    // max number of files imported concurrently
	rebuild          bool                   // true: reinitialize target structures before importing
	reload           bool                   // true: ignore saved progress and import from scratch
	importstatus     *CGIStatus             // persisted per-file progress used for resuming
	fileimportrc     *rc.RoutinesController // bounds concurrent file-level import routines
	odbcqueryrc      *rc.RoutinesController // bounds concurrent ODBC insert/query routines
	odbcimporter     *ODBCImporter          // backend writer for parsed records
	starttime        time.Time              // logical start, shifted back by time spent in prior runs
	currentstarttime time.Time              // start of the current run only
}
  36. func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration, filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
  37. concurlimt := mcfg.GetInt("odbc.concurrent.limit", runtime.GOMAXPROCS(0))
  38. importer := &Importer{
  39. datapath: datapath,
  40. parallel: parallel,
  41. rebuild: rebuild,
  42. reload: reload,
  43. importstatus: NewCGIStatus(),
  44. fileimportrc: rc.NewRoutinesController("", parallel),
  45. odbcqueryrc: rc.NewRoutinesControllerLimit("", concurlimt, concurlimt*5),
  46. odbcimporter: NewODBCImporter(),
  47. }
  48. return importer.Import()
  49. }
  50. func (importer *Importer) Import() (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration, filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
  51. if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload && !importer.rebuild {
  52. // reload
  53. err = importer.importstatus.Load()
  54. if err != nil {
  55. return
  56. }
  57. }
  58. err = importer.odbcimporter.init(importer.rebuild)
  59. if err != nil {
  60. return
  61. }
  62. //
  63. logger.Info("graph data import start")
  64. totalusetime = importer.importstatus.TotalUseTime
  65. importer.starttime = time.Now().Add(-totalusetime)
  66. importer.currentstarttime = time.Now()
  67. reedgefile := regexp.MustCompile("(?i).*edge.*.csv")
  68. efc, elc, erc, etc, ut, e := importer.ImportEdgeFiles(reedgefile, false)
  69. if e != nil {
  70. err = e
  71. return
  72. }
  73. afc, alc, arc, atc, ut, e := importer.ImportNonEdgeFiles(reedgefile, true)
  74. if e != nil {
  75. err = e
  76. return
  77. }
  78. totalfilescount = int64(len(importer.importstatus.ImportStatus)) + efc
  79. for _, v := range importer.importstatus.ImportStatus {
  80. totallinecount += v.LinesCount
  81. totalrecordscount += v.RecordsCount
  82. totalretrycount += v.RetryCount
  83. }
  84. totallinecount += elc
  85. totalrecordscount += erc
  86. totalretrycount += etc
  87. filescount = afc + efc
  88. linescount = alc + elc
  89. recordscount = arc + erc
  90. retrycount = atc + etc
  91. usetime = ut
  92. totalusetime = importer.importstatus.TotalUseTime
  93. importer.importstatus.WaitSaveDone()
  94. importer.alldone()
  95. return
  96. }
  97. func (importer *Importer) ImportEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount, retrycount int64, usetime time.Duration, err error) {
  98. return importer.ImportFiles(func(basedir string, fpath string) FWOP {
  99. if !reedgefile.MatchString(filepath.Base(fpath)) {
  100. // 忽略非EDGE文件
  101. return FWOP_IGNORE
  102. }
  103. return FWOP_CONTINUE
  104. }, logstatus)
  105. }
  106. func (importer *Importer) ImportNonEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount, retrycount int64, usetime time.Duration, err error) {
  107. return importer.ImportFiles(func(basedir string, fpath string) FWOP {
  108. if reedgefile.MatchString(filepath.Base(fpath)) {
  109. // 忽略EDGE文件
  110. return FWOP_IGNORE
  111. }
  112. return FWOP_CONTINUE
  113. }, logstatus)
  114. }
// FWOP is a file-walk operation: the decision a file selector returns for
// each file encountered while walking the data directory.
type FWOP int

const (
	FWOP_IGNORE   FWOP = iota + 1 // skip this file, keep walking
	FWOP_BREAK                    // stop the walk entirely
	FWOP_CONTINUE                 // import this file
)
// ImportFiles walks the top level of importer.datapath and imports every file
// the fwop selector does not reject. Files are processed concurrently through
// fileimportrc; line/record/retry deltas are accumulated into the returned
// counters. When logstatus is true, per-file progress is persisted in
// importstatus so an interrupted run can resume where it left off.
func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP, logstatus bool) (filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
	// Walk the data directory.
	var wg sync.WaitGroup
	fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
	if e != nil {
		err = e
		return
	}
	e = fw.List(func(basedir string, fpath string) bool {
		if err != nil {
			// An earlier file failed; end the walk.
			// NOTE(review): err is also written by the worker goroutines below
			// without synchronization, so this read is a data race — confirm
			// whether that is acceptable here.
			return false
		}
		if strings.Contains(fpath, string(filepath.Separator)) {
			// Ignore subdirectories. fw.List is ordered with directories after
			// plain files, so the first subdirectory entry ends the walk.
			return false
		}
		switch fwop(basedir, fpath) {
		case FWOP_IGNORE:
			// Skip this file, continue with the next one.
			return true
		case FWOP_BREAK:
			// End the walk.
			return false
		case FWOP_CONTINUE:
		default:
		}
		// Import the current file.
		filename := filepath.Join(basedir, fpath)
		filescount++
		wg.Add(1)
		// Process concurrently, bounded by fileimportrc.
		// NOTE(review): ConcurCall's error return is discarded here; if it can
		// fail without invoking the callback, wg.Done is never called and
		// wg.Wait below blocks forever — confirm ConcurCall's contract.
		importer.fileimportrc.ConcurCall(1,
			func() {
				defer wg.Done()
				// Look up the resume point recorded for this file, if any.
				importer.importstatus.mutex.RLock()
				importstatus := importer.importstatus.ImportStatus[filename]
				importer.importstatus.mutex.RUnlock()
				linefrom, blockfrom := int64(0), int64(0)
				totalretrycount := int64(0)
				if importstatus != nil {
					linefrom, blockfrom, totalretrycount = importstatus.LinesCount, importstatus.RecordsCount, importstatus.RetryCount
				}
				if linefrom == 0 {
					logger.Info("import", "file", filename)
				} else {
					logger.Info("import", "file", filename, "from line", linefrom)
				}
				lines, records, retries, e := importer.ImportFile(filename, linefrom, blockfrom, totalretrycount, logstatus)
				if e != nil {
					err = e
					return
				}
				// Accumulate only the delta imported by this run.
				atomic.AddInt64(&linescount, lines-linefrom)
				atomic.AddInt64(&recordscount, records-blockfrom)
				atomic.AddInt64(&retrycount, retries-totalretrycount)
				// NOTE(review): unlike the atomic counters above, usetime and
				// err are plain writes from multiple goroutines — data race.
				usetime = time.Since(importer.currentstarttime)
				importer.importstatus.mutex.Lock()
				if logstatus {
					// Record the file as fully imported for future resumes.
					importer.importstatus.ImportStatus[filename] = &ImportStatus{
						LinesCount:   lines,
						RecordsCount: records,
						RetryCount:   retries,
					}
				}
				importer.importstatus.TotalUseTime = time.Since(importer.starttime)
				importer.importstatus.mutex.Unlock()
				importer.importstatus.Save()
				if retries > 0 {
					logger.Info("file", filename, "total imported", lines, "lines", records, "records", retries, "retry times")
				} else {
					logger.Info("file", filename, "total imported", lines, "lines", records, "records")
				}
			},
		)
		return true
	})
	wg.Wait()
	if e != nil {
		if os.IsNotExist(e) {
			err = merrs.NewError(`directory "`+importer.datapath+`" not exist specified by "datapath"`, e)
		} else {
			err = merrs.NewError(e)
		}
		return
	}
	return
}
  209. func (importer *Importer) ImportFile(filepath string, linefrom, blockfrom, totalretrycount int64, logstatus bool) (linecount, blockcount, retrycount int64, err error) {
  210. f, e := os.Open(filepath)
  211. if e != nil {
  212. return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
  213. }
  214. defer f.Close()
  215. return importer.importReader(filepath, f, linefrom, blockfrom, totalretrycount, logstatus)
  216. }
// importReader streams blocks from buf (the contents of filename), derives
// the schema file type from markers in the file name, and imports each block
// concurrently through odbcqueryrc. linefrom/blockfrom/totalretrycount give
// the resume point recorded by a previous run; when logstatus is true,
// progress is persisted after each oldest-in-flight block completes so the
// saved resume point never skips an unfinished block. Returned counts are
// cumulative for the file, not deltas.
func (importer *Importer) importReader(filename string, buf io.Reader, linefrom, blockfrom, totalretrycount int64, logstatus bool) (linecount, blockcount, retrycount int64, err error) {
	// Map file-name markers to the schema file type.
	var filetype schema.FileType
	switch {
	case strings.Contains(filename, "_L1_"):
		filetype = schema.FT_LEVEL1
	case strings.Contains(filename, "_L2_"):
		filetype = schema.FT_LEVEL2
	case strings.Contains(filename, "_L3_"):
		filetype = schema.FT_LEVEL3
	case strings.Contains(filename, "_L4_"):
		filetype = schema.FT_LEVEL4
	case strings.Contains(filename, "_L5_"):
		filetype = schema.FT_LEVEL5
	case strings.Contains(filename, "_L6_"):
		filetype = schema.FT_LEVEL6
	case strings.Contains(filename, "_L7_"):
		filetype = schema.FT_LEVEL7
	case strings.Contains(filename, "_L8_"):
		filetype = schema.FT_LEVEL8
	case strings.Contains(filename, "MASTER"):
		filetype = schema.FT_MASTER
	case strings.Contains(filename, "EDGE"):
		filetype = schema.FT_EDGE
	default:
		err = merrs.NewError("filename does not conform to the agreed format " + filename)
		return
	}
	br, e := reader.NewBlockReader(filename, filetype, buf)
	if e != nil {
		return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
	}
	lastlogtime := time.Now()
	skiplines := int(linefrom) // lines already imported by a previous run
	blockcount = blockfrom     // continue block numbering from the resume point
	doinglines := []int64{}    // line numbers of blocks still in flight, in submit order
	retrycount = totalretrycount
	// maxresponsetime := time.Duration(0)
	var wg sync.WaitGroup
	defer importer.done()
	defer wg.Wait()
	for {
		if err != nil {
			// A worker reported a failure; stop reading further blocks.
			// NOTE(review): err is written by the workers below without
			// synchronization — this read is racy; confirm acceptable.
			return
		}
		block, line, linenumber, e := br.ReadBlock(skiplines)
		linecount = int64(linenumber)
		if e != nil {
			return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
		if block == nil {
			// End of input.
			return
		}
		blockcount++
		wg.Add(1)
		// Snapshot the loop state for the closure below.
		doingline := linecount
		doingblock := blockcount
		if logstatus {
			doinglines = append(doinglines, doingline)
		}
		e = importer.odbcqueryrc.ConcurCall(1, func() {
			defer wg.Done()
			if err != nil {
				// Another worker already failed; skip the remaining work.
				return
			}
			// st := time.Now()
			// defer func() {
			// ut := time.Since(st)
			// if ut > maxresponsetime {
			// maxresponsetime = ut
			// }
			// }()
			// logger.Info("G:", runtime.NumGoroutine(),
			// "RC:", importer.fileimportrc.ConcurCount(),
			// "WC:", importer.odbcqueryrc.ConcurCount(),
			// "RQ:", importer.fileimportrc.QueueCount(),
			// "WQ:", importer.odbcqueryrc.QueueCount(),
			// "maxresponsetime:", maxresponsetime)
			rc, e := importer.importRecord(block, line, filename, filetype, int(doingline))
			atomic.AddInt64(&retrycount, int64(rc))
			if e != nil {
				err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(doingline)}, {"line": line}})
				return
			}
			if logstatus {
				// Persist progress only when this block is the oldest one
				// still in flight, so the saved resume point is safe.
				if doingline == doinglines[0] {
					importer.importstatus.mutex.Lock()
					importer.importstatus.ImportStatus[filename] = &ImportStatus{
						LinesCount:   doingline,
						RecordsCount: doingblock,
						RetryCount:   retrycount,
					}
					importer.importstatus.TotalUseTime = time.Since(importer.starttime)
					importer.importstatus.Save()
					doinglines = doinglines[1:]
					// Throttled progress log, at most once every 5 seconds.
					if time.Since(lastlogtime) > 5*time.Second {
						if retrycount > 0 {
							logger.Info("file", filename, "imported", doingline, "lines", doingblock, "records", retrycount, "retry times")
						} else {
							logger.Info("file", filename, "imported", doingline, "lines", doingblock, "records")
						}
						lastlogtime = time.Now()
					}
					importer.importstatus.mutex.Unlock()
				} else {
					// Not the oldest: just drop our line from the in-flight list.
					// NOTE(review): this branch mutates doinglines without holding
					// the mutex while other workers may be doing the same (and the
					// doinglines[0] read above is also unguarded) — data race.
					for i, l := range doinglines {
						if l == doingline {
							doinglines = append(doinglines[:i], doinglines[i+1:]...)
						}
					}
				}
			}
		})
		if e != nil {
			return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
	}
}
  334. func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype schema.FileType, linecount int) (retrycount int, err error) {
  335. if odbc.LogDebug {
  336. bs, e := json.MarshalIndent(record, "", " ")
  337. if e != nil {
  338. return 0, merrs.NewError(e)
  339. }
  340. logger.Trace(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
  341. }
  342. var classaliasname string
  343. switch filetype {
  344. case schema.FT_EDGE:
  345. graph.CacheEdgeInfo(record)
  346. default:
  347. classaliasname = string(filetype)
  348. retrycount, err = importer.odbcimporter.InsertData(classaliasname, record)
  349. if err != nil {
  350. err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  351. return
  352. }
  353. }
  354. return
  355. }
// alldone signals the backend importer that the entire import run finished.
func (importer *Importer) alldone() {
	importer.odbcimporter.alldone()
}
// done signals the backend importer that one file finished importing.
func (importer *Importer) done() {
	importer.odbcimporter.done()
}
  362. func Check() {
  363. client := odbc.ODBClient
  364. if client == nil {
  365. return
  366. }
  367. {
  368. // mql := "select id,uniqueid,tags,contain,day,vtime from level1 where uniqueid='E2E:OTR0002L'"
  369. mql := "select * from level1 where uniqueid='E2E:OTR0002L'"
  370. r, e := client.Query(mql).Do()
  371. if e != nil {
  372. panic(merrs.NewError(e))
  373. }
  374. bs, _ := json.MarshalIndent(r.Data, "", " ")
  375. fmt.Println(string(bs))
  376. }
  377. {
  378. mql := `match ("level1:E2E:OTR0002L")-[*]->(),("level1:E2E:OTR0002L")<-[*]-() with namespace="m3cnet", fields="uniqueid,distname" in "level1","level2"`
  379. r, e := client.Query(mql).Do()
  380. if e != nil {
  381. panic(merrs.NewError(e))
  382. }
  383. bs, _ := json.MarshalIndent(r.Data, "", " ")
  384. fmt.Println(string(bs))
  385. }
  386. }