// importer.go
  1. package importer
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "regexp"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "git.wecise.com/wecise/cgimport/graph"
  14. "git.wecise.com/wecise/cgimport/odbc"
  15. "git.wecise.com/wecise/cgimport/reader"
  16. "git.wecise.com/wecise/cgimport/schema"
  17. "git.wecise.com/wecise/util/filewalker"
  18. "git.wecise.com/wecise/util/merrs"
  19. "git.wecise.com/wecise/util/rc"
  20. )
  21. var mcfg = odbc.Config
  22. var logger = odbc.Logger
// Importer drives a full data-import pass: it walks a data directory,
// reads record blocks from each file, and feeds them to an ODBCImporter,
// tracking progress in a CGIStatus so an interrupted run can resume.
type Importer struct {
	datapath         string                 // root directory containing the data files
	parallel         int                    // file-level import concurrency
	reload           bool                   // true: drop existing classes and re-import from scratch
	importstatus     *CGIStatus             // persisted per-file progress (records imported, total time)
	fileimportrc     *rc.RoutinesController // bounds concurrent file imports
	odbcqueryrc      *rc.RoutinesController // bounds concurrent ODBC insert calls
	odbcimporter     *ODBCImporter          // backend that creates classes and inserts records
	starttime        time.Time              // logical start, shifted back by previously accumulated use time
	currentstarttime time.Time              // wall-clock start of the current run
}
  34. func ImportDir(datapath string, parallel int, reload bool) (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
  35. importer := &Importer{
  36. datapath: datapath,
  37. parallel: parallel,
  38. reload: reload,
  39. importstatus: NewCGIStatus(),
  40. fileimportrc: rc.NewRoutinesController("", parallel),
  41. odbcqueryrc: rc.NewRoutinesController("", mcfg.GetInt("odbc.concurrent.limit", parallel*5)),
  42. odbcimporter: NewODBCImporter(),
  43. }
  44. return importer.Import()
  45. }
  46. func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
  47. if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload {
  48. err = importer.importstatus.Load()
  49. if err != nil {
  50. return
  51. }
  52. err = importer.odbcimporter.init()
  53. if err != nil {
  54. return
  55. }
  56. } else {
  57. // reload
  58. // 清除已有类
  59. err = importer.odbcimporter.reload()
  60. if err != nil {
  61. return
  62. }
  63. }
  64. // 建类
  65. err = importer.odbcimporter.ReviseClassStruct()
  66. if err != nil {
  67. return
  68. }
  69. totalfilescount = int64(len(importer.importstatus.ImportStatus))
  70. for _, v := range importer.importstatus.ImportStatus {
  71. totalrecordscount += v.RecordsCount
  72. }
  73. totalusetime = importer.importstatus.TotalUseTime
  74. importer.starttime = time.Now().Add(-totalusetime)
  75. importer.currentstarttime = time.Now()
  76. reedgefile := regexp.MustCompile("(?i).*edge.*.csv")
  77. fc, rc, ut, e := importer.ImportEdgeFiles(reedgefile)
  78. if e != nil {
  79. err = e
  80. return
  81. }
  82. totalfilescount += fc
  83. totalrecordscount += rc
  84. filescount += fc
  85. recordscount += rc
  86. usetime += ut
  87. totalusetime = importer.importstatus.TotalUseTime
  88. fc, rc, ut, e = importer.ImportNonEdgeFiles(reedgefile)
  89. if e != nil {
  90. err = e
  91. return
  92. }
  93. totalfilescount += fc
  94. totalrecordscount += rc
  95. filescount += fc
  96. recordscount += rc
  97. usetime += ut
  98. totalusetime = importer.importstatus.TotalUseTime
  99. importer.importstatus.WaitSaveDone()
  100. importer.alldone()
  101. return
  102. }
  103. func (importer *Importer) ImportEdgeFiles(reedgefile *regexp.Regexp) (filescount, recordscount int64, usetime time.Duration, err error) {
  104. return importer.ImportFiles(func(basedir string, fpath string) FWOP {
  105. if !reedgefile.MatchString(filepath.Base(fpath)) {
  106. // 忽略非EDGE文件
  107. return FWOP_IGNORE
  108. }
  109. return FWOP_CONTINUE
  110. })
  111. }
  112. func (importer *Importer) ImportNonEdgeFiles(reedgefile *regexp.Regexp) (filescount, recordscount int64, usetime time.Duration, err error) {
  113. return importer.ImportFiles(func(basedir string, fpath string) FWOP {
  114. if reedgefile.MatchString(filepath.Base(fpath)) {
  115. // 忽略EDGE文件
  116. return FWOP_IGNORE
  117. }
  118. return FWOP_CONTINUE
  119. })
  120. }
// FWOP is the operation a file-walk filter returns for each visited file.
type FWOP int

const (
	FWOP_IGNORE   FWOP = iota + 1 // skip this file, keep walking
	FWOP_BREAK                    // stop walking entirely
	FWOP_CONTINUE                 // import this file
)
  127. func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP) (filescount, recordscount int64, usetime time.Duration, err error) {
  128. // 遍历文件目录
  129. var wg sync.WaitGroup
  130. fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
  131. if e != nil {
  132. err = e
  133. return
  134. }
  135. e = fw.List(func(basedir string, fpath string) bool {
  136. if err != nil {
  137. // 前方发生错误,结束遍历
  138. return false
  139. }
  140. if strings.Contains(fpath, string(filepath.Separator)) {
  141. // 忽略子目录,fw.List有序,目录排在文件后面,遇到子目录即可结束遍历
  142. return false
  143. }
  144. switch fwop(basedir, fpath) {
  145. case FWOP_IGNORE:
  146. // 忽略当前文件,继续处理下一文件
  147. return true
  148. case FWOP_BREAK:
  149. // 结束遍历
  150. return false
  151. case FWOP_CONTINUE:
  152. default:
  153. }
  154. // 继续处理当前文件
  155. filename := filepath.Join(basedir, fpath)
  156. wg.Add(1)
  157. // 并发处理
  158. importer.fileimportrc.ConcurCall(1,
  159. func() {
  160. defer wg.Done()
  161. importer.importstatus.mutex.RLock()
  162. importstatus := importer.importstatus.ImportStatus[filename]
  163. importer.importstatus.mutex.RUnlock()
  164. importedrecordscount := int64(0)
  165. if importstatus != nil {
  166. importedrecordscount = importstatus.RecordsCount
  167. return
  168. }
  169. records, e := importer.ImportFile(filename, importedrecordscount)
  170. if e != nil {
  171. err = e
  172. return
  173. }
  174. atomic.AddInt64(&filescount, 1)
  175. atomic.AddInt64(&recordscount, records)
  176. usetime = time.Since(importer.currentstarttime)
  177. importer.importstatus.mutex.Lock()
  178. importer.importstatus.ImportStatus[filename] = &ImportStatus{RecordsCount: importedrecordscount + records}
  179. importer.importstatus.TotalUseTime = time.Since(importer.starttime)
  180. importer.importstatus.mutex.Unlock()
  181. importer.importstatus.Save()
  182. },
  183. )
  184. return true
  185. })
  186. wg.Wait()
  187. if e != nil {
  188. if os.IsNotExist(e) {
  189. err = merrs.NewError(`directory "`+importer.datapath+`" not exist specified by "datapath"`, e)
  190. } else {
  191. err = merrs.NewError(e)
  192. }
  193. return
  194. }
  195. return
  196. }
  197. func (importer *Importer) ImportFile(filepath string, skiprecordscount int64) (blockcount int64, err error) {
  198. f, e := os.Open(filepath)
  199. if e != nil {
  200. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
  201. }
  202. defer f.Close()
  203. return importer.importReader(filepath, f, skiprecordscount)
  204. }
  205. func (importer *Importer) importReader(filename string, buf io.Reader, skiprecordscount int64) (blockcount int64, err error) {
  206. var filetype schema.FileType
  207. switch {
  208. case strings.Contains(filename, "_L1_"):
  209. filetype = schema.FT_LEVEL1
  210. case strings.Contains(filename, "_L2_"):
  211. filetype = schema.FT_LEVEL2
  212. case strings.Contains(filename, "_L3_"):
  213. filetype = schema.FT_LEVEL3
  214. case strings.Contains(filename, "_L4_"):
  215. filetype = schema.FT_LEVEL4
  216. case strings.Contains(filename, "_L5_"):
  217. filetype = schema.FT_LEVEL5
  218. case strings.Contains(filename, "_L6_"):
  219. filetype = schema.FT_LEVEL6
  220. case strings.Contains(filename, "_L7_"):
  221. filetype = schema.FT_LEVEL7
  222. case strings.Contains(filename, "_L8_"):
  223. filetype = schema.FT_LEVEL8
  224. case strings.Contains(filename, "MASTER"):
  225. filetype = schema.FT_MASTER
  226. case strings.Contains(filename, "EDGE"):
  227. filetype = schema.FT_EDGE
  228. default:
  229. err = merrs.NewError("filename does not conform to the agreed format " + filename)
  230. return
  231. }
  232. br, e := reader.NewBlockReader(filename, filetype, buf)
  233. if e != nil {
  234. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
  235. }
  236. var wg sync.WaitGroup
  237. defer importer.done()
  238. defer wg.Wait()
  239. n := int64(0)
  240. for {
  241. if err != nil {
  242. break
  243. }
  244. block, line, linecount, e := br.ReadBlock()
  245. if e != nil {
  246. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  247. }
  248. if block == nil {
  249. return
  250. }
  251. n++
  252. if n <= skiprecordscount {
  253. continue
  254. }
  255. wg.Add(1)
  256. e = importer.odbcqueryrc.ConcurCall(1, func() {
  257. defer wg.Done()
  258. e = importer.importRecord(block, line, filename, filetype, linecount)
  259. if e != nil {
  260. err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  261. return
  262. }
  263. atomic.AddInt64(&blockcount, 1)
  264. })
  265. if e != nil {
  266. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  267. }
  268. }
  269. return
  270. }
  271. func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype schema.FileType, linecount int) (err error) {
  272. if odbc.LogDebug {
  273. bs, e := json.MarshalIndent(record, "", " ")
  274. if e != nil {
  275. return merrs.NewError(e)
  276. }
  277. logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
  278. }
  279. var classaliasname string
  280. switch filetype {
  281. case schema.FT_EDGE:
  282. graph.CacheEdgeInfo(record)
  283. default:
  284. classaliasname = string(filetype)
  285. err = importer.odbcimporter.InsertData(classaliasname, record)
  286. if err != nil {
  287. err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  288. return
  289. }
  290. }
  291. return
  292. }
// alldone tells the underlying ODBC importer that the entire import pass
// has finished.
func (importer *Importer) alldone() {
	importer.odbcimporter.alldone()
}
// done tells the underlying ODBC importer that one file/reader import has
// finished (invoked via defer in importReader).
func (importer *Importer) done() {
	importer.odbcimporter.done()
}
  299. func Check() {
  300. client := odbc.ODBClient
  301. if client == nil {
  302. return
  303. }
  304. {
  305. // mql := "select id,uniqueid,tags,contain,day,vtime from level1 where uniqueid='E2E:OTR0002L'"
  306. mql := "select * from level1 where uniqueid='E2E:OTR0002L'"
  307. r, e := client.Query(mql).Do()
  308. if e != nil {
  309. panic(merrs.NewError(e))
  310. }
  311. bs, _ := json.MarshalIndent(r.Data, "", " ")
  312. fmt.Println(string(bs))
  313. }
  314. {
  315. mql := `match ("level1:E2E:OTR0002L")-[*]->(),("level1:E2E:OTR0002L")<-[*]-() in "level1","level2"`
  316. r, e := client.Query(mql).Do()
  317. if e != nil {
  318. panic(merrs.NewError(e))
  319. }
  320. bs, _ := json.MarshalIndent(r.Data, "", " ")
  321. fmt.Println(string(bs))
  322. }
  323. }