importer.go

package importer

import (
    "encoding/json"
    "fmt"
    "io"
    "os"
    "path/filepath"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "git.wecise.com/wecise/cgimport/odbc"
    "git.wecise.com/wecise/cgimport/reader"
    "git.wecise.com/wecise/cgimport/schema"
    "git.wecise.com/wecise/util/filewalker"
    "git.wecise.com/wecise/util/merrs"
    "git.wecise.com/wecise/util/rc"
)

var mcfg = odbc.Config
var logger = odbc.Logger
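
// Importer drives one import run: it walks datapath, parses the data files
// found there and writes their records into the store through ODBCImporter,
// reading up to parallel files concurrently.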
type Importer struct {
    datapath     string
    parallel     int
    reload       bool
    importrc     *rc.RoutinesController
    odbcimporter *ODBCImporter
}
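
// ImportDir imports every data file found under datapath, reading at most
// parallel files concurrently; reload drops and re-creates the existing
// classes before importing. The total* results include progress restored from
// a previous run, while filescount, recordscount and usetime cover this run only.
//
// A minimal call, with illustrative arguments:
//
//    totalFiles, totalRecords, totalUse, files, records, use, err := ImportDir("/data/cgi", 4, false)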
func ImportDir(datapath string, parallel int, reload bool) (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
    importer := &Importer{
        datapath:     datapath,
        parallel:     parallel,
        reload:       reload,
        importrc:     rc.NewRoutinesController("", 100),
        odbcimporter: NewODBCImporter(),
    }
    return importer.Import()
}
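
// Import walks datapath, skips files already recorded in the saved CGIStatus
// (unless a reload was requested, in which case the existing classes are
// dropped and re-created), imports the remaining files concurrently and
// persists progress after each file.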
func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
    var cgirc = rc.NewRoutinesController("", importer.parallel)
    var wg sync.WaitGroup
    fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
    if e != nil {
        err = e
        return
    }
    cgistatus := NewCGIStatus()
    if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload {
        // continue: resume from the previously saved import status
        err = cgistatus.Load()
        if err != nil {
            return
        }
    } else {
        // reload: clear the existing classes first
        err = importer.odbcimporter.reload()
        if err != nil {
            return
        }
    }
    // create the classes
    err = importer.odbcimporter.ReviseClassStruct()
    if err != nil {
        return
    }
    totalfilescount = int64(len(cgistatus.ImportStatus))
    for _, v := range cgistatus.ImportStatus {
        totalrecordscount += v.RecordsCount
    }
    totalusetime = cgistatus.TotalUseTime
    st := time.Now().Add(-totalusetime)
    cst := time.Now()
    // walk the data directory
    e = fw.List(func(basedir string, fpath string) bool {
        if err != nil {
            return false
        }
        filename := filepath.Join(basedir, fpath)
        wg.Add(1)
        cgirc.ConcurCall(1,
            func() {
                defer wg.Done()
                // skip files already imported in a previous run
                cgistatus.mutex.RLock()
                importstatus := cgistatus.ImportStatus[filename]
                cgistatus.mutex.RUnlock()
                if importstatus != nil {
                    return
                }
                records, e := importer.ImportFile(filename)
                if e != nil {
                    err = e
                    return
                }
                atomic.AddInt64(&filescount, 1)
                atomic.AddInt64(&recordscount, records)
                atomic.AddInt64(&totalfilescount, 1)
                atomic.AddInt64(&totalrecordscount, records)
                usetime = time.Since(cst)
                totalusetime = time.Since(st)
                cgistatus.mutex.Lock()
                cgistatus.ImportStatus[filename] = &ImportStatus{RecordsCount: records}
                cgistatus.TotalUseTime = totalusetime
                cgistatus.mutex.Unlock()
                cgistatus.Save()
            },
        )
        return true
    })
    wg.Wait()
    if e != nil {
        if os.IsNotExist(e) {
            err = merrs.NewError(`the directory "`+importer.datapath+`" specified by "datapath" does not exist`, e)
        } else {
            err = merrs.NewError(e)
        }
        return
    }
    cgistatus.WaitSaveDone()
    importer.alldone()
    return
}
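
// ImportFile opens a single data file and imports its contents, returning the
// number of record blocks imported from it.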
func (importer *Importer) ImportFile(fpath string) (blockcount int64, err error) {
    f, e := os.Open(fpath)
    if e != nil {
        return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": fpath}})
    }
    defer f.Close()
    return importer.importReader(fpath, f)
}
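
// importReader derives the schema.FileType from the file name, splits the
// stream into record blocks and imports the blocks concurrently through importrc.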
func (importer *Importer) importReader(filename string, buf io.Reader) (blockcount int64, err error) {
    // the file type is encoded in the file name
    var filetype schema.FileType
    switch {
    case strings.Contains(filename, "_L1_"):
        filetype = schema.FT_LEVEL1
    case strings.Contains(filename, "_L2_"):
        filetype = schema.FT_LEVEL2
    case strings.Contains(filename, "_L3_"):
        filetype = schema.FT_LEVEL3
    case strings.Contains(filename, "_L4_"):
        filetype = schema.FT_LEVEL4
    case strings.Contains(filename, "_L5_"):
        filetype = schema.FT_LEVEL5
    case strings.Contains(filename, "_L6_"):
        filetype = schema.FT_LEVEL6
    case strings.Contains(filename, "_L7_"):
        filetype = schema.FT_LEVEL7
    case strings.Contains(filename, "_L8_"):
        filetype = schema.FT_LEVEL8
    case strings.Contains(filename, "MASTER"):
        filetype = schema.FT_MASTER
    case strings.Contains(filename, "EDGE"):
        filetype = schema.FT_EDGE
    default:
        err = merrs.NewError("filename does not conform to the agreed naming convention: " + filename)
        return
    }
    br, e := reader.NewBlockReader(filename, filetype, buf)
    if e != nil {
        return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
    }
    var wg sync.WaitGroup
    defer importer.done()
    defer wg.Wait()
    for {
        if err != nil {
            break
        }
        block, line, linecount, e := br.ReadBlock()
        if e != nil {
            return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
        }
        if block == nil {
            // end of file
            return
        }
        wg.Add(1)
        e = importer.importrc.ConcurCall(1, func() {
            defer wg.Done()
            // use a call-local error value so we do not race with the outer e
            if e := importer.importRecord(block, line, filename, filetype, linecount); e != nil {
                err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
                return
            }
            atomic.AddInt64(&blockcount, 1)
        })
        if e != nil {
            return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
        }
    }
    return
}
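
// importRecord writes a single parsed record: edge records go through
// InsertEdge, every other file type is inserted as data of the class named
// after its file type.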
func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype schema.FileType, linecount int) (err error) {
    if odbc.LogDebug {
        bs, e := json.MarshalIndent(record, "", " ")
        if e != nil {
            return merrs.NewError(e)
        }
        logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
    }
    var classname string
    switch filetype {
    case schema.FT_EDGE:
        err = importer.odbcimporter.InsertEdge(record)
        if err != nil {
            err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
            return
        }
    default:
        classname = string(filetype)
        err = importer.odbcimporter.InsertData(classname, record)
        if err != nil {
            err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
            return
        }
    }
    return
}

// alldone notifies the backend that the whole import run has finished.
func (importer *Importer) alldone() {
    importer.odbcimporter.alldone()
}

// done notifies the backend that one input file has been fully processed.
func (importer *Importer) done() {
    importer.odbcimporter.done()
}
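
// Check runs a sample query against the level1 class and prints the result as
// JSON, as a quick sanity check that imported data is queryable.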
func Check() {
    client := odbc.ODBClient
    if client == nil {
        return
    }
    {
        mql := "select id,uniqueid,tags,contain,day,vtime from level1 where uniqueid='E2E:OTR0002L'"
        r, e := client.Query(mql).Do()
        if e != nil {
            panic(merrs.NewError(e))
        }
        bs, _ := json.MarshalIndent(r.Data, "", " ")
        fmt.Println(string(bs))
    }
}