importer.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. package importer
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "regexp"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "git.wecise.com/wecise/cgimport/graph"
  14. "git.wecise.com/wecise/cgimport/odbc"
  15. "git.wecise.com/wecise/cgimport/reader"
  16. "git.wecise.com/wecise/cgimport/schema"
  17. "github.com/wecisecode/util/filewalker"
  18. "github.com/wecisecode/util/merrs"
  19. "github.com/wecisecode/util/rc"
  20. )
  21. var mcfg = odbc.Config
  22. var logger = odbc.Logger
// Importer drives a bulk import of data files from a directory into the
// backend via ODBCImporter, with support for resuming a previous run from
// persisted per-file progress.
type Importer struct {
	datapath string // root directory holding the data files to import
	parallel int    // concurrency passed to the file-import routines controller
	rebuild  bool   // when true, existing classes are dropped (odbcimporter.rebuild) before importing
	reload   bool   // when true, do not resume from saved import status

	importstatus *CGIStatus             // per-file lines/records progress, persisted for resume
	fileimportrc *rc.RoutinesController // bounds concurrent file imports (sized by parallel)
	odbcqueryrc  *rc.RoutinesController // bounds concurrent record inserts (sized by odbc.concurrent.limit)
	odbcimporter *ODBCImporter          // backend that performs schema revision and inserts

	starttime        time.Time // logical overall start, shifted back by previously accumulated use time
	currentstarttime time.Time // start of the current run (basis for usetime)
}
  35. func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilescount, totallinecount, totalrecordscount int64, totalusetime time.Duration, filescount, linescount, recordscount int64, usetime time.Duration, err error) {
  36. concurlimt := mcfg.GetInt("odbc.concurrent.limit", 100)
  37. importer := &Importer{
  38. datapath: datapath,
  39. parallel: parallel,
  40. rebuild: rebuild,
  41. reload: reload,
  42. importstatus: NewCGIStatus(),
  43. fileimportrc: rc.NewRoutinesController("", parallel),
  44. odbcqueryrc: rc.NewRoutinesControllerLimit("", concurlimt, concurlimt*2),
  45. odbcimporter: NewODBCImporter(),
  46. }
  47. return importer.Import()
  48. }
  49. func (importer *Importer) Import() (totalfilescount, totallinecount, totalrecordscount int64, totalusetime time.Duration, filescount, linescount, recordscount int64, usetime time.Duration, err error) {
  50. if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload {
  51. // reload
  52. err = importer.importstatus.Load()
  53. if err != nil {
  54. return
  55. }
  56. err = importer.odbcimporter.init()
  57. if err != nil {
  58. return
  59. }
  60. }
  61. if importer.rebuild {
  62. // rebuild
  63. // 清除已有类
  64. err = importer.odbcimporter.rebuild()
  65. if err != nil {
  66. return
  67. }
  68. }
  69. // 建类
  70. err = importer.odbcimporter.ReviseClassStruct()
  71. if err != nil {
  72. return
  73. }
  74. totalusetime = importer.importstatus.TotalUseTime
  75. importer.starttime = time.Now().Add(-totalusetime)
  76. importer.currentstarttime = time.Now()
  77. reedgefile := regexp.MustCompile("(?i).*edge.*.csv")
  78. efc, elc, erc, ut, e := importer.ImportEdgeFiles(reedgefile, false)
  79. if e != nil {
  80. err = e
  81. return
  82. }
  83. afc, alc, arc, ut, e := importer.ImportNonEdgeFiles(reedgefile, true)
  84. if e != nil {
  85. err = e
  86. return
  87. }
  88. totalfilescount = int64(len(importer.importstatus.ImportStatus)) + efc
  89. for _, v := range importer.importstatus.ImportStatus {
  90. totallinecount += v.LinesCount
  91. totalrecordscount += v.RecordsCount
  92. }
  93. totallinecount += elc
  94. totalrecordscount += erc
  95. filescount = afc + efc
  96. linescount = alc + elc
  97. recordscount = arc + erc
  98. usetime = ut
  99. totalusetime = importer.importstatus.TotalUseTime
  100. importer.importstatus.WaitSaveDone()
  101. importer.alldone()
  102. return
  103. }
  104. func (importer *Importer) ImportEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount int64, usetime time.Duration, err error) {
  105. return importer.ImportFiles(func(basedir string, fpath string) FWOP {
  106. if !reedgefile.MatchString(filepath.Base(fpath)) {
  107. // 忽略非EDGE文件
  108. return FWOP_IGNORE
  109. }
  110. return FWOP_CONTINUE
  111. }, logstatus)
  112. }
  113. func (importer *Importer) ImportNonEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount int64, usetime time.Duration, err error) {
  114. return importer.ImportFiles(func(basedir string, fpath string) FWOP {
  115. if reedgefile.MatchString(filepath.Base(fpath)) {
  116. // 忽略EDGE文件
  117. return FWOP_IGNORE
  118. }
  119. return FWOP_CONTINUE
  120. }, logstatus)
  121. }
// FWOP is the action a file-walk callback tells ImportFiles to take for
// the current file.
type FWOP int

const (
	FWOP_IGNORE   FWOP = iota + 1 // skip this file, keep walking
	FWOP_BREAK                    // stop the walk entirely
	FWOP_CONTINUE                 // import this file
)
// ImportFiles walks the top level of importer.datapath and imports every
// file the fwop callback selects, running file imports concurrently
// through importer.fileimportrc. It returns counters for the files
// processed by this call and the elapsed time of the current run.
func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP, logstatus bool) (filescount, linescount, recordscount int64, usetime time.Duration, err error) {
	// Walk the data directory.
	var wg sync.WaitGroup
	fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
	if e != nil {
		err = e
		return
	}
	e = fw.List(func(basedir string, fpath string) bool {
		if err != nil {
			// An earlier file failed; stop the walk.
			return false
		}
		if strings.Contains(fpath, string(filepath.Separator)) {
			// Skip subdirectories. fw.List is ordered with directories after
			// files, so the first subdirectory entry ends the walk.
			return false
		}
		switch fwop(basedir, fpath) {
		case FWOP_IGNORE:
			// Skip this file, continue with the next one.
			return true
		case FWOP_BREAK:
			// Stop the walk.
			return false
		case FWOP_CONTINUE:
		default:
		}
		// Import the current file.
		filename := filepath.Join(basedir, fpath)
		filescount++
		wg.Add(1)
		// Concurrent per-file import, bounded by fileimportrc.
		// NOTE(review): the error returned by ConcurCall is ignored here; if
		// ConcurCall can fail without invoking the callback, wg.Done is never
		// called and wg.Wait below blocks forever — confirm its semantics.
		importer.fileimportrc.ConcurCall(1,
			func() {
				defer wg.Done()
				// Resume position for this file from a previous run, if any.
				importer.importstatus.mutex.RLock()
				importstatus := importer.importstatus.ImportStatus[filename]
				importer.importstatus.mutex.RUnlock()
				linefrom, blockfrom := int64(0), int64(0)
				if importstatus != nil {
					linefrom, blockfrom = importstatus.LinesCount, importstatus.RecordsCount
				}
				if linefrom == 0 {
					logger.Info("import", "file", filename)
				} else {
					logger.Info("import", "file", filename, "from line", linefrom)
				}
				lines, records, e := importer.ImportFile(filename, linefrom, blockfrom, logstatus)
				if e != nil {
					// NOTE(review): err and usetime are written from multiple
					// goroutines without synchronization (data race); only the
					// count updates below use atomics.
					err = e
					return
				}
				// Only count the lines/records newly processed in this run.
				atomic.AddInt64(&linescount, lines-linefrom)
				atomic.AddInt64(&recordscount, records-blockfrom)
				usetime = time.Since(importer.currentstarttime)
				importer.importstatus.mutex.Lock()
				if logstatus {
					importer.importstatus.ImportStatus[filename] = &ImportStatus{
						LinesCount:   lines,
						RecordsCount: records}
				}
				importer.importstatus.TotalUseTime = time.Since(importer.starttime)
				importer.importstatus.mutex.Unlock()
				importer.importstatus.Save()
				logger.Info("file", filename, "imported", records, "records")
			},
		)
		return true
	})
	wg.Wait()
	if e != nil {
		if os.IsNotExist(e) {
			err = merrs.NewError(`directory "`+importer.datapath+`" not exist specified by "datapath"`, e)
		} else {
			err = merrs.NewError(e)
		}
		return
	}
	return
}
  208. func (importer *Importer) ImportFile(filepath string, linefrom, blockfrom int64, logstatus bool) (linecount, blockcount int64, err error) {
  209. f, e := os.Open(filepath)
  210. if e != nil {
  211. return linecount, blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
  212. }
  213. defer f.Close()
  214. return importer.importReader(filepath, f, linefrom, blockfrom, logstatus)
  215. }
// importReader reads the stream block-by-block and imports each record
// concurrently through importer.odbcqueryrc. The schema file type is
// derived from markers in the file name; linefrom/blockfrom give the
// resume position; when logstatus is true, per-file progress is persisted
// as blocks complete.
func (importer *Importer) importReader(filename string, buf io.Reader, linefrom, blockfrom int64, logstatus bool) (linecount, blockcount int64, err error) {
	// Map file-name markers to the schema file type.
	var filetype schema.FileType
	switch {
	case strings.Contains(filename, "_L1_"):
		filetype = schema.FT_LEVEL1
	case strings.Contains(filename, "_L2_"):
		filetype = schema.FT_LEVEL2
	case strings.Contains(filename, "_L3_"):
		filetype = schema.FT_LEVEL3
	case strings.Contains(filename, "_L4_"):
		filetype = schema.FT_LEVEL4
	case strings.Contains(filename, "_L5_"):
		filetype = schema.FT_LEVEL5
	case strings.Contains(filename, "_L6_"):
		filetype = schema.FT_LEVEL6
	case strings.Contains(filename, "_L7_"):
		filetype = schema.FT_LEVEL7
	case strings.Contains(filename, "_L8_"):
		filetype = schema.FT_LEVEL8
	case strings.Contains(filename, "MASTER"):
		filetype = schema.FT_MASTER
	case strings.Contains(filename, "EDGE"):
		filetype = schema.FT_EDGE
	default:
		err = merrs.NewError("filename does not conform to the agreed format " + filename)
		return
	}
	br, e := reader.NewBlockReader(filename, filetype, buf)
	if e != nil {
		return linecount, blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
	}
	lastlogtime := time.Now()
	skiplines := int(linefrom)
	blockcount = blockfrom
	// doinglines tracks the start lines of in-flight blocks; progress is
	// persisted only when the oldest outstanding block completes, so the
	// saved resume point never skips unfinished work.
	doinglines := []int64{}
	var wg sync.WaitGroup
	defer importer.done()
	defer wg.Wait()
	for {
		if err != nil {
			break
		}
		block, line, linenumber, e := br.ReadBlock(skiplines)
		linecount = int64(linenumber)
		if e != nil {
			return linecount, blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
		if block == nil {
			// End of stream.
			return
		}
		blockcount++
		wg.Add(1)
		doingline := linecount
		doingblock := blockcount
		if logstatus {
			doinglines = append(doinglines, doingline)
		}
		// NOTE(review): `e` is reused both as the ConcurCall result and as
		// the callback's record-import error, and the callback may run
		// concurrently with the check below — possible data race on e/err.
		// doinglines is also mutated by multiple callbacks, and the
		// else-branch removal happens outside the importstatus mutex.
		e = importer.odbcqueryrc.ConcurCall(1, func() {
			defer wg.Done()
			e = importer.importRecord(block, line, filename, filetype, int(doingline))
			if e != nil {
				err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(doingline)}, {"line": line}})
				return
			}
			if logstatus {
				if doingline == doinglines[0] {
					// Oldest in-flight block finished: persist progress.
					importer.importstatus.mutex.Lock()
					importer.importstatus.ImportStatus[filename] = &ImportStatus{
						LinesCount:   doingline,
						RecordsCount: doingblock,
					}
					importer.importstatus.TotalUseTime = time.Since(importer.starttime)
					importer.importstatus.Save()
					doinglines = doinglines[1:]
					// Throttled progress log: at most once every 5 seconds.
					if time.Since(lastlogtime) > 5*time.Second {
						logger.Info("file", filename, "imported", doingblock, "records")
						lastlogtime = time.Now()
					}
					importer.importstatus.mutex.Unlock()
				} else {
					// Not the oldest block: just drop it from the in-flight list.
					for i, l := range doinglines {
						if l == doingline {
							doinglines = append(doinglines[:i], doinglines[i+1:]...)
						}
					}
				}
			}
		})
		if e != nil {
			return linecount, blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
	}
	return
}
  310. func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype schema.FileType, linecount int) (err error) {
  311. if odbc.LogDebug {
  312. bs, e := json.MarshalIndent(record, "", " ")
  313. if e != nil {
  314. return merrs.NewError(e)
  315. }
  316. logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
  317. }
  318. var classaliasname string
  319. switch filetype {
  320. case schema.FT_EDGE:
  321. graph.CacheEdgeInfo(record)
  322. default:
  323. classaliasname = string(filetype)
  324. err = importer.odbcimporter.InsertData(classaliasname, record)
  325. if err != nil {
  326. err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  327. return
  328. }
  329. }
  330. return
  331. }
// alldone forwards the end-of-import notification to the ODBC importer
// (called once after all files have been processed).
func (importer *Importer) alldone() {
	importer.odbcimporter.alldone()
}
// done forwards a per-file completion notification to the ODBC importer
// (deferred by importReader for each imported stream).
func (importer *Importer) done() {
	importer.odbcimporter.done()
}
  338. func Check() {
  339. client := odbc.ODBClient
  340. if client == nil {
  341. return
  342. }
  343. {
  344. // mql := "select id,uniqueid,tags,contain,day,vtime from level1 where uniqueid='E2E:OTR0002L'"
  345. mql := "select * from level1 where uniqueid='E2E:OTR0002L'"
  346. r, e := client.Query(mql).Do()
  347. if e != nil {
  348. panic(merrs.NewError(e))
  349. }
  350. bs, _ := json.MarshalIndent(r.Data, "", " ")
  351. fmt.Println(string(bs))
  352. }
  353. {
  354. mql := `match ("level1:E2E:OTR0002L")-[*]->(),("level1:E2E:OTR0002L")<-[*]-() in "level1","level2"`
  355. r, e := client.Query(mql).Do()
  356. if e != nil {
  357. panic(merrs.NewError(e))
  358. }
  359. bs, _ := json.MarshalIndent(r.Data, "", " ")
  360. fmt.Println(string(bs))
  361. }
  362. }