// importer.go

package importer

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"regexp"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.wecise.com/wecise/cgimport/graph"
	"git.wecise.com/wecise/cgimport/odbc"
	"git.wecise.com/wecise/cgimport/reader"
	"git.wecise.com/wecise/cgimport/schema"
	"git.wecise.com/wecise/util/filewalker"
	"git.wecise.com/wecise/util/rc"
	"github.com/wecisecode/util/merrs"
)

var mcfg = odbc.Config
var logger = odbc.Logger
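
// Importer drives one import run: it walks the data directory, imports edge
// files before all other files, and records per-file progress in a CGIStatus
// so that an interrupted run can be resumed.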
type Importer struct {
	datapath         string
	parallel         int
	reload           bool
	importstatus     *CGIStatus
	fileimportrc     *rc.RoutinesController
	odbcqueryrc      *rc.RoutinesController
	odbcimporter     *ODBCImporter
	starttime        time.Time
	currentstarttime time.Time
}
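
// ImportDir imports every data file under datapath, reading up to parallel
// files concurrently. With reload set, existing classes are cleared and the
// import starts over; otherwise it resumes from the saved import status.
// A hypothetical call (the path and counts are illustrative only):
//
//	tfc, trc, tut, fc, rc, ut, err := importer.ImportDir("/data/cmdb", 8, false)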
func ImportDir(datapath string, parallel int, reload bool) (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
	concurlimit := mcfg.GetInt("odbc.concurrent.limit", 50)
	importer := &Importer{
		datapath:     datapath,
		parallel:     parallel,
		reload:       reload,
		importstatus: NewCGIStatus(),
		fileimportrc: rc.NewRoutinesController("", parallel),
		odbcqueryrc:  rc.NewRoutinesControllerLimit("", concurlimit, concurlimit*2),
		odbcimporter: NewODBCImporter(),
	}
	return importer.Import()
}
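
// Import runs the full sequence: load or reset progress, revise the class
// structures, import edge files first, then the remaining files. The total*
// return values accumulate over all runs so far; filescount, recordscount and
// usetime cover only the current run.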
func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
	if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload {
		err = importer.importstatus.Load()
		if err != nil {
			return
		}
		err = importer.odbcimporter.init()
		if err != nil {
			return
		}
	} else {
		// reload: clear the existing classes
		err = importer.odbcimporter.reload()
		if err != nil {
			return
		}
	}
	// create the classes
	err = importer.odbcimporter.ReviseClassStruct()
	if err != nil {
		return
	}
	totalfilescount = int64(len(importer.importstatus.ImportStatus))
	for _, v := range importer.importstatus.ImportStatus {
		totalrecordscount += v.RecordsCount
	}
	totalusetime = importer.importstatus.TotalUseTime
	importer.starttime = time.Now().Add(-totalusetime)
	importer.currentstarttime = time.Now()
	reedgefile := regexp.MustCompile(`(?i).*edge.*\.csv`)
	fc, rcount, ut, e := importer.ImportEdgeFiles(reedgefile)
	if e != nil {
		err = e
		return
	}
	totalfilescount += fc
	totalrecordscount += rcount
	filescount += fc
	recordscount += rcount
	usetime += ut
	totalusetime = importer.importstatus.TotalUseTime
	fc, rcount, ut, e = importer.ImportNonEdgeFiles(reedgefile)
	if e != nil {
		err = e
		return
	}
	totalfilescount += fc
	totalrecordscount += rcount
	filescount += fc
	recordscount += rcount
	usetime += ut
	totalusetime = importer.importstatus.TotalUseTime
	importer.importstatus.WaitSaveDone()
	importer.alldone()
	return
}
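
// ImportEdgeFiles imports only the files whose base name matches reedgefile,
// i.e. the edge CSV files.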
func (importer *Importer) ImportEdgeFiles(reedgefile *regexp.Regexp) (filescount, recordscount int64, usetime time.Duration, err error) {
	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
		if !reedgefile.MatchString(filepath.Base(fpath)) {
			// ignore non-edge files
			return FWOP_IGNORE
		}
		return FWOP_CONTINUE
	})
}
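
// ImportNonEdgeFiles imports every file whose base name does not match
// reedgefile, i.e. all non-edge files.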
func (importer *Importer) ImportNonEdgeFiles(reedgefile *regexp.Regexp) (filescount, recordscount int64, usetime time.Duration, err error) {
	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
		if reedgefile.MatchString(filepath.Base(fpath)) {
			// ignore edge files
			return FWOP_IGNORE
		}
		return FWOP_CONTINUE
	})
}
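
// FWOP is the file-walk operation returned by the ImportFiles callback to
// decide how each encountered file is handled.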
type FWOP int

const (
	FWOP_IGNORE FWOP = iota + 1
	FWOP_BREAK
	FWOP_CONTINUE
)
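
// ImportFiles walks the top level of the data directory and, with bounded
// concurrency, imports every file that the fwop callback does not filter out.
// Files already recorded in the import status are skipped.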
func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP) (filescount, recordscount int64, usetime time.Duration, err error) {
	// walk the data directory
	var wg sync.WaitGroup
	fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
	if e != nil {
		err = e
		return
	}
	e = fw.List(func(basedir string, fpath string) bool {
		if err != nil {
			// an earlier error occurred, stop the walk
			return false
		}
		if strings.Contains(fpath, string(filepath.Separator)) {
			// ignore subdirectories; fw.List is ordered with directories after
			// files, so the walk can stop at the first subdirectory
			return false
		}
		switch fwop(basedir, fpath) {
		case FWOP_IGNORE:
			// skip this file, continue with the next one
			return true
		case FWOP_BREAK:
			// stop the walk
			return false
		case FWOP_CONTINUE:
		default:
		}
		// process the current file
		filename := filepath.Join(basedir, fpath)
		wg.Add(1)
		// import files concurrently
		importer.fileimportrc.ConcurCall(1,
			func() {
				defer wg.Done()
				logger.Info("import", "file", filename)
				importer.importstatus.mutex.RLock()
				importstatus := importer.importstatus.ImportStatus[filename]
				importer.importstatus.mutex.RUnlock()
				importedrecordscount := int64(0)
				if importstatus != nil {
					// already imported in a previous run, skip it
					importedrecordscount = importstatus.RecordsCount
					return
				}
				records, e := importer.ImportFile(filename, importedrecordscount)
				if e != nil {
					err = e
					return
				}
				atomic.AddInt64(&filescount, 1)
				atomic.AddInt64(&recordscount, records)
				usetime = time.Since(importer.currentstarttime)
				importer.importstatus.mutex.Lock()
				importer.importstatus.ImportStatus[filename] = &ImportStatus{RecordsCount: importedrecordscount + records}
				importer.importstatus.TotalUseTime = time.Since(importer.starttime)
				importer.importstatus.mutex.Unlock()
				importer.importstatus.Save()
				logger.Info("file", filename, "imported", records, "records")
			},
		)
		return true
	})
	wg.Wait()
	if e != nil {
		if os.IsNotExist(e) {
			err = merrs.NewError(`directory "`+importer.datapath+`" specified by "datapath" does not exist`, e)
		} else {
			err = merrs.NewError(e)
		}
		return
	}
	return
}
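
// ImportFile opens a single data file and imports it, skipping the first
// skiprecordscount records already handled by a previous run.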
func (importer *Importer) ImportFile(fpath string, skiprecordscount int64) (blockcount int64, err error) {
	f, e := os.Open(fpath)
	if e != nil {
		return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": fpath}})
	}
	defer f.Close()
	return importer.importReader(fpath, f, skiprecordscount)
}
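
// importReader derives the schema file type from the file name (_L1_ through
// _L8_, MASTER, or EDGE), then reads the stream block by block and imports
// the blocks concurrently through the ODBC query controller.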
func (importer *Importer) importReader(filename string, buf io.Reader, skiprecordscount int64) (blockcount int64, err error) {
	var filetype schema.FileType
	switch {
	case strings.Contains(filename, "_L1_"):
		filetype = schema.FT_LEVEL1
	case strings.Contains(filename, "_L2_"):
		filetype = schema.FT_LEVEL2
	case strings.Contains(filename, "_L3_"):
		filetype = schema.FT_LEVEL3
	case strings.Contains(filename, "_L4_"):
		filetype = schema.FT_LEVEL4
	case strings.Contains(filename, "_L5_"):
		filetype = schema.FT_LEVEL5
	case strings.Contains(filename, "_L6_"):
		filetype = schema.FT_LEVEL6
	case strings.Contains(filename, "_L7_"):
		filetype = schema.FT_LEVEL7
	case strings.Contains(filename, "_L8_"):
		filetype = schema.FT_LEVEL8
	case strings.Contains(filename, "MASTER"):
		filetype = schema.FT_MASTER
	case strings.Contains(filename, "EDGE"):
		filetype = schema.FT_EDGE
	default:
		err = merrs.NewError("filename does not conform to the agreed format: " + filename)
		return
	}
	br, e := reader.NewBlockReader(filename, filetype, buf)
	if e != nil {
		return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
	}
	var wg sync.WaitGroup
	defer importer.done()
	defer wg.Wait()
	n := int64(0)
	for {
		if err != nil {
			break
		}
		block, line, linecount, e := br.ReadBlock()
		if e != nil {
			return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
		if block == nil {
			return
		}
		n++
		if n <= skiprecordscount {
			continue
		}
		wg.Add(1)
		e = importer.odbcqueryrc.ConcurCall(1, func() {
			defer wg.Done()
			// keep the record error local so it does not race with the loop's e
			if ie := importer.importRecord(block, line, filename, filetype, linecount); ie != nil {
				err = merrs.NewError(ie, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
				return
			}
			atomic.AddInt64(&blockcount, 1)
		})
		if e != nil {
			return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
	}
	return
}
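
// importRecord dispatches one parsed record: edge records are cached as edge
// info, every other record is inserted into the class named by its file type.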
func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype schema.FileType, linecount int) (err error) {
	if odbc.LogDebug {
		bs, e := json.MarshalIndent(record, "", " ")
		if e != nil {
			return merrs.NewError(e)
		}
		logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
	}
	var classaliasname string
	switch filetype {
	case schema.FT_EDGE:
		graph.CacheEdgeInfo(record)
	default:
		classaliasname = string(filetype)
		err = importer.odbcimporter.InsertData(classaliasname, record)
		if err != nil {
			err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
			return
		}
	}
	return
}
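
// alldone tells the ODBC importer that the whole import run has finished.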
func (importer *Importer) alldone() {
	importer.odbcimporter.alldone()
}
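
// done tells the ODBC importer that one input stream has been fully read.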
func (importer *Importer) done() {
	importer.odbcimporter.done()
}
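
// Check runs two sanity queries against the ODB client, a point lookup and a
// graph match on the same sample id, and prints the raw results.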
func Check() {
	client := odbc.ODBClient
	if client == nil {
		return
	}
	{
		// mql := "select id,uniqueid,tags,contain,day,vtime from level1 where uniqueid='E2E:OTR0002L'"
		mql := "select * from level1 where uniqueid='E2E:OTR0002L'"
		r, e := client.Query(mql).Do()
		if e != nil {
			panic(merrs.NewError(e))
		}
		bs, _ := json.MarshalIndent(r.Data, "", " ")
		fmt.Println(string(bs))
	}
	{
		mql := `match ("level1:E2E:OTR0002L")-[*]->(),("level1:E2E:OTR0002L")<-[*]-() in "level1","level2"`
		r, e := client.Query(mql).Do()
		if e != nil {
			panic(merrs.NewError(e))
		}
		bs, _ := json.MarshalIndent(r.Data, "", " ")
		fmt.Println(string(bs))
	}
}