importer.go
package importer

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"regexp"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.wecise.com/wecise/cgimport/graph"
	"git.wecise.com/wecise/cgimport/odbc"
	"git.wecise.com/wecise/cgimport/reader"
	"git.wecise.com/wecise/cgimport/schema"
	"github.com/wecisecode/util/cmap"
	"github.com/wecisecode/util/filewalker"
	"github.com/wecisecode/util/merrs"
	"github.com/wecisecode/util/pattern"
	"github.com/wecisecode/util/rc"
)
var mcfg = odbc.Config
var logger = odbc.Logger

// Importer drives one full data import: it walks the data directory, reads
// files concurrently, and feeds parsed records to the ODBC importer.
type Importer struct {
	datapath         string
	parallel         int
	rebuild          bool
	reload           bool
	importstatus     *CGIStatus
	fileimportrc     *rc.RoutinesController
	odbcqueryrc      *rc.RoutinesController
	odbcimporter     *ODBCImporter
	starttime        time.Time
	currentstarttime time.Time
}
// ImportDir imports all data files under datapath, processing up to parallel
// files concurrently. When rebuild or reload is set, any previously saved
// import status is ignored; otherwise the import resumes from it.
func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration, filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
	concurlimit := mcfg.GetInt("odbc.concurrent.limit", runtime.GOMAXPROCS(0))
	importer := &Importer{
		datapath:     datapath,
		parallel:     parallel,
		rebuild:      rebuild,
		reload:       reload,
		importstatus: NewCGIStatus(),
		fileimportrc: rc.NewRoutinesController("", parallel),
		odbcqueryrc:  rc.NewRoutinesControllerLimit("", concurlimit, concurlimit*5),
		odbcimporter: NewODBCImporter(),
	}
	return importer.Import()
}
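
// A minimal caller sketch (the directory path and parallelism of 4 are
// illustrative, and counters not needed by the caller are discarded):
//
//	_, _, _, _, _, fc, lc, rcnt, rt, ut, err := importer.ImportDir("/data/cgimport", 4, false, false)
//	if err != nil {
//		panic(err)
//	}
//	fmt.Println(fc, "files,", lc, "lines,", rcnt, "records,", rt, "retries in", ut)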
// Import runs the import: EDGE files first, then all other files, and
// aggregates the per-file counters into the returned totals.
func (importer *Importer) Import() (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration, filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
	if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload && !importer.rebuild {
		// resume from the previously saved import status
		err = importer.importstatus.Load()
		if err != nil {
			return
		}
	}
	err = importer.odbcimporter.init(importer.rebuild)
	if err != nil {
		return
	}
	//
	logger.Info("graph data import start")
	totalusetime = importer.importstatus.TotalUseTime
	importer.starttime = time.Now().Add(-totalusetime)
	importer.currentstarttime = time.Now()
	// matches file names like xxx_EDGE_xxx.csv (case-insensitive)
	reedgefile := regexp.MustCompile(`(?i).*edge.*\.csv`)
	efc, elc, erc, etc, ut, e := importer.ImportEdgeFiles(reedgefile, false)
	if e != nil {
		err = e
		return
	}
	afc, alc, arc, atc, ut, e := importer.ImportNonEdgeFiles(reedgefile, true)
	if e != nil {
		err = e
		return
	}
	// edge files are imported with logstatus=false, so their counters are not
	// in importstatus and must be added on top of the recorded totals
	totalfilescount = int64(len(importer.importstatus.ImportStatus)) + efc
	for _, v := range importer.importstatus.ImportStatus {
		totallinecount += v.LinesCount
		totalrecordscount += v.RecordsCount
		totalretrycount += v.RetryCount
	}
	totallinecount += elc
	totalrecordscount += erc
	totalretrycount += etc
	filescount = afc + efc
	linescount = alc + elc
	recordscount = arc + erc
	retrycount = atc + etc
	usetime = ut
	totalusetime = importer.importstatus.TotalUseTime
	importer.importstatus.WaitSaveDone()
	importer.alldone()
	return
}
// ImportEdgeFiles imports only the files whose names match reedgefile.
func (importer *Importer) ImportEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount, retrycount int64, usetime time.Duration, err error) {
	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
		if !reedgefile.MatchString(filepath.Base(fpath)) {
			// ignore non-EDGE files
			return FWOP_IGNORE
		}
		return FWOP_CONTINUE
	}, logstatus)
}

// ImportNonEdgeFiles imports every file except those matching reedgefile.
func (importer *Importer) ImportNonEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount, retrycount int64, usetime time.Duration, err error) {
	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
		if reedgefile.MatchString(filepath.Base(fpath)) {
			// ignore EDGE files
			return FWOP_IGNORE
		}
		return FWOP_CONTINUE
	}, logstatus)
}
// FWOP tells ImportFiles what to do with a file encountered during the
// directory walk.
type FWOP int

const (
	FWOP_IGNORE FWOP = iota + 1
	FWOP_BREAK
	FWOP_CONTINUE
)
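
// As an illustration, a filter that imports only level1 files could be passed
// to ImportFiles below (the "_L1_" substring check is an assumption drawn
// from the built-in fileclassmapping patterns):
//
//	importer.ImportFiles(func(basedir, fpath string) FWOP {
//		if strings.Contains(filepath.Base(fpath), "_L1_") {
//			return FWOP_CONTINUE
//		}
//		return FWOP_IGNORE
//	}, true)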
// ImportFiles walks the data directory and imports every file that fwop
// accepts, running the file imports concurrently through fileimportrc.
func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP, logstatus bool) (filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
	// walk the data directory
	var wg sync.WaitGroup
	fw, e := filewalker.NewFileWalker([]string{importer.datapath}, `^[^\.].*`)
	if e != nil {
		err = e
		return
	}
	e = fw.List(func(basedir string, fpath string) bool {
		if err != nil {
			// an earlier error occurred, stop the walk
			return false
		}
		if strings.Contains(fpath, string(filepath.Separator)) {
			// skip subdirectories; fw.List is ordered with directories after
			// files, so the walk can stop at the first subdirectory
			return false
		}
		switch fwop(basedir, fpath) {
		case FWOP_IGNORE:
			// skip this file, continue with the next one
			return true
		case FWOP_BREAK:
			// stop the walk
			return false
		case FWOP_CONTINUE:
		default:
		}
		// import this file
		filename := filepath.Join(basedir, fpath)
		filescount++
		wg.Add(1)
		// process files concurrently
		importer.fileimportrc.ConcurCall(1,
			func() {
				defer wg.Done()
				importer.importstatus.mutex.RLock()
				importstatus := importer.importstatus.ImportStatus[filename]
				importer.importstatus.mutex.RUnlock()
				linefrom, blockfrom := int64(0), int64(0)
				totalretrycount := int64(0)
				if importstatus != nil {
					linefrom, blockfrom, totalretrycount = importstatus.LinesCount, importstatus.RecordsCount, importstatus.RetryCount
				}
				if linefrom == 0 {
					logger.Info("import", "file", filename)
				} else {
					logger.Info("import", "file", filename, "from line", linefrom)
				}
				lines, records, retries, e := importer.ImportFile(filename, linefrom, blockfrom, totalretrycount, logstatus)
				if e != nil {
					err = e
					return
				}
				atomic.AddInt64(&linescount, lines-linefrom)
				atomic.AddInt64(&recordscount, records-blockfrom)
				atomic.AddInt64(&retrycount, retries-totalretrycount)
				usetime = time.Since(importer.currentstarttime)
				importer.importstatus.mutex.Lock()
				if logstatus {
					importer.importstatus.ImportStatus[filename] = &ImportStatus{
						LinesCount:   lines,
						RecordsCount: records,
						RetryCount:   retries,
					}
				}
				importer.importstatus.TotalUseTime = time.Since(importer.starttime)
				importer.importstatus.mutex.Unlock()
				importer.importstatus.Save()
				if retries > 0 {
					logger.Info("file", filename, "total imported", lines, "lines", records, "records", retries, "retry times")
				} else {
					logger.Info("file", filename, "total imported", lines, "lines", records, "records")
				}
			},
		)
		return true
	})
	wg.Wait()
	if e != nil {
		if os.IsNotExist(e) {
			err = merrs.NewError(`directory "`+importer.datapath+`" specified by "datapath" does not exist`, e)
		} else {
			err = merrs.NewError(e)
		}
		return
	}
	return
}
// ImportFile opens one data file and imports it from the given resume point.
func (importer *Importer) ImportFile(fpath string, linefrom, blockfrom, totalretrycount int64, logstatus bool) (linecount, blockcount, retrycount int64, err error) {
	f, e := os.Open(fpath)
	if e != nil {
		return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": fpath}})
	}
	defer f.Close()
	return importer.importReader(fpath, f, linefrom, blockfrom, totalretrycount, logstatus)
}
// fileclassmapping maps a class name to the filename pattern that selects it.
var fileclassmapping = cmap.NewSingle[string, *regexp.Regexp]()

func init() {
	// built-in filename patterns
	fileclassmapping.Set("EDGE", regexp.MustCompile(pattern.Contain2RegexpString(`EDGE`)))
	fileclassmapping.Set("master", regexp.MustCompile(pattern.Contain2RegexpString(`MASTER`)))
	fileclassmapping.Set("level1", regexp.MustCompile(pattern.Contain2RegexpString(`_L1_`)))
	fileclassmapping.Set("level2", regexp.MustCompile(pattern.Contain2RegexpString(`_L2_`)))
	fileclassmapping.Set("level3", regexp.MustCompile(pattern.Contain2RegexpString(`_L3_`)))
	fileclassmapping.Set("level4", regexp.MustCompile(pattern.Contain2RegexpString(`_L4_`)))
	fileclassmapping.Set("level5", regexp.MustCompile(pattern.Contain2RegexpString(`_L5_`)))
	fileclassmapping.Set("level6", regexp.MustCompile(pattern.Contain2RegexpString(`_L6_`)))
	fileclassmapping.Set("level7", regexp.MustCompile(pattern.Contain2RegexpString(`_L7_`)))
	fileclassmapping.Set("level8", regexp.MustCompile(pattern.Contain2RegexpString(`_L8_`)))
	// user-configured patterns: mapping.class.<classname> = wildcard patterns
	keys := mcfg.Keys()
	for _, key := range keys {
		if strings.HasPrefix(key, "mapping.class.") {
			classname := key[len("mapping.class."):]
			filepatterns := mcfg.GetStrings(key)
			for _, fp := range filepatterns {
				fp = pattern.Wildcard2RegexpString(fp)
				fileclassmapping.Set(classname, regexp.MustCompile(fp))
			}
		}
	}
}
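
// For reference, a user-supplied mapping entry is expected to look roughly
// like the sketch below (the key name and wildcard are illustrative; only the
// "mapping.class." prefix and the wildcard-to-regexp conversion come from the
// code above):
//
//	mapping.class.mynodes = ["*MYNODES*.csv"]
//
// Each wildcard is compiled to a regexp via pattern.Wildcard2RegexpString and
// routes any matching file name to class "mynodes".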
// importReader maps the file to a class, then reads and imports it block by
// block, saving resume status as blocks complete.
func (importer *Importer) importReader(filename string, buf io.Reader, linefrom, blockfrom, totalretrycount int64, logstatus bool) (linecount, blockcount, retrycount int64, err error) {
	// resolve the class name from the file name
	classname := ""
	fileclassmapping.Fetch(func(key string, v *regexp.Regexp) bool {
		if v.MatchString(filename) {
			classname = key
			return false
		}
		return true
	})
	if classname == "" {
		err = merrs.NewError("cannot map filename to a class, filename: " + filename)
		return
	}
	br, e := reader.NewBlockReader(filename, classname, buf)
	if e != nil {
		return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
	}
	lastlogtime := time.Now()
	skiplines := int(linefrom)
	blockcount = blockfrom
	doinglines := []int64{}
	donelines := linefrom
	doneblocks := blockfrom
	retrycount = totalretrycount
	// maxresponsetime := time.Duration(0)
	var wg sync.WaitGroup
	defer importer.done()
	defer wg.Wait()
	for {
		if err != nil {
			return
		}
		block, line, linenumber, e := br.ReadBlock(skiplines)
		linecount = int64(linenumber)
		if e != nil {
			return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
		if block == nil {
			// end of file
			return
		}
		blockcount++
		wg.Add(1)
		doingline := linecount
		doingblock := blockcount
		if logstatus {
			doinglines = append(doinglines, doingline)
		}
		e = importer.odbcqueryrc.ConcurCall(1, func() {
			defer wg.Done()
			if err != nil {
				return
			}
			// st := time.Now()
			// defer func() {
			// 	ut := time.Since(st)
			// 	if ut > maxresponsetime {
			// 		maxresponsetime = ut
			// 	}
			// }()
			// logger.Info("G:", runtime.NumGoroutine(),
			// 	"RC:", importer.fileimportrc.ConcurCount(),
			// 	"WC:", importer.odbcqueryrc.ConcurCount(),
			// 	"RQ:", importer.fileimportrc.QueueCount(),
			// 	"WQ:", importer.odbcqueryrc.QueueCount(),
			// 	"maxresponsetime:", maxresponsetime)
			rcnt, e := importer.importRecord(block, line, filename, classname, int(doingline))
			atomic.AddInt64(&retrycount, int64(rcnt))
			if e != nil {
				err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(doingline)}, {"line": line}})
				return
			}
			atomic.AddInt64(&doneblocks, 1)
			if logstatus {
				readinglines := doinglines[len(doinglines)-1]
				if doingline == doinglines[0] {
					// this block is the oldest one in flight: advance and
					// persist the resume status
					importer.importstatus.mutex.Lock()
					importer.importstatus.ImportStatus[filename] = &ImportStatus{
						LinesCount:   doingline,
						RecordsCount: doingblock,
						RetryCount:   retrycount,
					}
					donelines = doingline
					importer.importstatus.TotalUseTime = time.Since(importer.starttime)
					importer.importstatus.Save()
					doinglines = doinglines[1:]
					if time.Since(lastlogtime) > 5*time.Second {
						if retrycount > 0 {
							logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records", retrycount, "retry times")
						} else {
							logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records")
						}
						lastlogtime = time.Now()
					}
					importer.importstatus.mutex.Unlock()
				} else {
					// an older block is still in flight: just drop this line
					// from the in-flight list
					for i, l := range doinglines {
						if l == doingline {
							doinglines = append(doinglines[:i], doinglines[i+1:]...)
							break
						}
					}
					if time.Since(lastlogtime) > 5*time.Second {
						if retrycount > 0 {
							logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records", retrycount, "retry times")
						} else {
							logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records")
						}
						lastlogtime = time.Now()
					}
				}
			}
		})
		if e != nil {
			return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
	}
}
// importRecord dispatches one parsed record: EDGE records are cached in the
// graph package, everything else is inserted through the ODBC importer.
func (importer *Importer) importRecord(record map[string]any, line string, filename string, classaliasname string, linecount int) (retrycount int, err error) {
	if odbc.LogDebug {
		bs, e := json.MarshalIndent(record, "", " ")
		if e != nil {
			return 0, merrs.NewError(e)
		}
		logger.Trace(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
	}
	switch classaliasname {
	case schema.EDGE:
		graph.CacheEdgeInfo(record)
	default:
		retrycount, err = importer.odbcimporter.InsertData(classaliasname, record)
		if err != nil {
			err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
			return
		}
	}
	return
}
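
// A sketch of the dispatch for a single record (the map contents and file
// name are illustrative, not a schema this package defines):
//
//	record := map[string]any{"uniqueid": "E2E:OTR0002L"}
//	retries, err := importer.importRecord(record, rawline, "m3cnet_L1_sample.csv", "level1", 42)
//	// classaliasname "level1" is not schema.EDGE, so the record goes through
//	// odbcimporter.InsertData; an EDGE record would instead be cached via
//	// graph.CacheEdgeInfo and never inserted directly.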
// alldone signals the ODBC importer that the whole import has finished.
func (importer *Importer) alldone() {
	importer.odbcimporter.alldone()
}

// done signals the ODBC importer that the current file import is complete.
func (importer *Importer) done() {
	importer.odbcimporter.done()
}
// Check runs two sanity queries against the imported data and prints the
// results: a direct lookup on level1 and a graph traversal across levels.
func Check() {
	client := odbc.ODBClient
	if client == nil {
		return
	}
	{
		// mql := "select id,uniqueid,tags,contain,day,vtime from level1 where uniqueid='E2E:OTR0002L'"
		mql := "select * from level1 where uniqueid='E2E:OTR0002L'"
		r, e := client.Query(mql).Do()
		if e != nil {
			panic(merrs.NewError(e))
		}
		bs, _ := json.MarshalIndent(r.Data, "", " ")
		fmt.Println(string(bs))
	}
	{
		mql := `match ("level1:E2E:OTR0002L")-[*]->(),("level1:E2E:OTR0002L")<-[*]-() with namespace="m3cnet", fields="uniqueid,distname" in "level1","level2"`
		r, e := client.Query(mql).Do()
		if e != nil {
			panic(merrs.NewError(e))
		}
		bs, _ := json.MarshalIndent(r.Data, "", " ")
		fmt.Println(string(bs))
	}
}