importer.go 8.3 KB


  1. package importer
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "strings"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "git.wecise.com/wecise/cgimport/odbc"
  13. "git.wecise.com/wecise/cgimport/reader"
  14. "git.wecise.com/wecise/cgimport/schema"
  15. "git.wecise.com/wecise/util/filewalker"
  16. "git.wecise.com/wecise/util/merrs"
  17. "git.wecise.com/wecise/util/rc"
  18. )
  19. var mcfg = odbc.Config
  20. var logger = odbc.Logger
  21. type Importer struct {
  22. datapath string
  23. parallel int
  24. reload bool
  25. importrc *rc.RoutinesController
  26. odbcimporter *ODBCImporter
  27. }
  28. type ImportStatus struct {
  29. RecordsCount int64
  30. }
  31. type CGIStatus struct {
  32. mutex sync.RWMutex
  33. TotalUseTime time.Duration
  34. ImportStatus map[string]*ImportStatus
  35. rc *rc.RoutinesController
  36. lasterror error
  37. lastsavetime time.Time
  38. waitdone chan any
  39. }
  40. var cgistatusfile = mcfg.GetString("cgi.statusfile", "/opt/matrix/var/cgimport.status.txt")
  41. func NewCGIStatus() *CGIStatus {
  42. return &CGIStatus{
  43. ImportStatus: map[string]*ImportStatus{},
  44. rc: rc.NewRoutinesController("", 1),
  45. waitdone: make(chan any, 1),
  46. }
  47. }
  48. func (cgistatus *CGIStatus) Load() error {
  49. cgistatusbs, e := os.ReadFile(cgistatusfile)
  50. if e != nil && !os.IsNotExist(e) {
  51. return e
  52. }
  53. if len(cgistatusbs) > 0 {
  54. e = json.Unmarshal(cgistatusbs, &cgistatus)
  55. if e != nil {
  56. logger.Warn(e)
  57. }
  58. }
  59. return nil
  60. }
  61. func (cgistatus *CGIStatus) WaitSaveDone() {
  62. cgistatus.waitdone <- 1
  63. cgistatus.rc.WaitDone()
  64. }
  65. func (cgistatus *CGIStatus) Save() (err error) {
  66. cgistatus.rc.CallLast2Only(func() {
  67. if !cgistatus.lastsavetime.Equal(time.Time{}) {
  68. interval := time.Since(cgistatus.lastsavetime)
  69. if interval < 1*time.Second {
  70. t := time.NewTimer(1*time.Second - interval)
  71. select {
  72. case <-t.C:
  73. case v := <-cgistatus.waitdone:
  74. cgistatus.waitdone <- v
  75. }
  76. }
  77. }
  78. cgistatus.mutex.RLock()
  79. cgistatusbs, e := json.MarshalIndent(cgistatus, "", " ")
  80. cgistatus.mutex.RUnlock()
  81. if e != nil {
  82. cgistatus.lasterror = e
  83. return
  84. }
  85. e = os.MkdirAll(filepath.Dir(cgistatusfile), os.ModePerm)
  86. if e != nil {
  87. cgistatus.lasterror = e
  88. return
  89. }
  90. e = os.WriteFile(cgistatusfile, cgistatusbs, os.ModePerm)
  91. if e != nil {
  92. cgistatus.lasterror = e
  93. return
  94. }
  95. cgistatus.lastsavetime = time.Now()
  96. // fmt.Println(cgistatus.lastsavetime)
  97. })
  98. return cgistatus.lasterror
  99. }
  100. func ImportDir(datapath string, parallel int, reload bool) (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
  101. importer := &Importer{
  102. datapath: datapath,
  103. parallel: parallel,
  104. reload: reload,
  105. importrc: rc.NewRoutinesController("", 100),
  106. odbcimporter: NewODBCImporter(),
  107. }
  108. return importer.Import()
  109. }
  110. func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
  111. var cgirc = rc.NewRoutinesController("", importer.parallel)
  112. var wg sync.WaitGroup
  113. fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
  114. if e != nil {
  115. err = e
  116. return
  117. }
  118. cgistatus := NewCGIStatus()
  119. if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload {
  120. err = cgistatus.Load()
  121. if err != nil {
  122. return
  123. }
  124. } else {
  125. // reload
  126. // 清除已有类
  127. err = importer.odbcimporter.reload()
  128. if err != nil {
  129. return
  130. }
  131. }
  132. // 建类
  133. err = importer.odbcimporter.ReviseClassStruct()
  134. if err != nil {
  135. return
  136. }
  137. totalfilescount = int64(len(cgistatus.ImportStatus))
  138. for _, v := range cgistatus.ImportStatus {
  139. totalrecordscount += v.RecordsCount
  140. }
  141. totalusetime = cgistatus.TotalUseTime
  142. st := time.Now().Add(-totalusetime)
  143. cst := time.Now()
  144. // 遍历文件目录
  145. e = fw.List(func(basedir string, fpath string) bool {
  146. if err != nil {
  147. return false
  148. }
  149. filename := filepath.Join(basedir, fpath)
  150. wg.Add(1)
  151. cgirc.ConcurCall(1,
  152. func() {
  153. defer wg.Done()
  154. cgistatus.mutex.RLock()
  155. importstatus := cgistatus.ImportStatus[filename]
  156. cgistatus.mutex.RUnlock()
  157. if importstatus != nil {
  158. return
  159. }
  160. records, e := importer.ImportFile(filename)
  161. if e != nil {
  162. err = e
  163. return
  164. }
  165. atomic.AddInt64(&filescount, 1)
  166. atomic.AddInt64(&recordscount, records)
  167. atomic.AddInt64(&totalfilescount, 1)
  168. atomic.AddInt64(&totalrecordscount, records)
  169. usetime = time.Since(cst)
  170. totalusetime = time.Since(st)
  171. cgistatus.mutex.Lock()
  172. cgistatus.ImportStatus[filename] = &ImportStatus{RecordsCount: records}
  173. cgistatus.TotalUseTime = totalusetime
  174. cgistatus.mutex.Unlock()
  175. cgistatus.Save()
  176. },
  177. )
  178. return true
  179. })
  180. wg.Wait()
  181. if e != nil {
  182. if os.IsNotExist(e) {
  183. err = merrs.NewError(`directory "`+importer.datapath+`" not exist specified by "datapath"`, e)
  184. } else {
  185. err = merrs.NewError(e)
  186. }
  187. return
  188. }
  189. cgistatus.WaitSaveDone()
  190. importer.alldone()
  191. return
  192. }
  193. func (importer *Importer) ImportFile(filepath string) (blockcount int64, err error) {
  194. f, e := os.Open(filepath)
  195. if e != nil {
  196. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
  197. }
  198. defer f.Close()
  199. return importer.importReader(filepath, f)
  200. }
  201. func (importer *Importer) importReader(filename string, buf io.Reader) (blockcount int64, err error) {
  202. var filetype schema.FileType
  203. switch {
  204. case strings.Contains(filename, "_L1_"):
  205. filetype = schema.FT_LEVEL1
  206. case strings.Contains(filename, "_L2_"):
  207. filetype = schema.FT_LEVEL2
  208. case strings.Contains(filename, "_L3_"):
  209. filetype = schema.FT_LEVEL3
  210. case strings.Contains(filename, "_L4_"):
  211. filetype = schema.FT_LEVEL4
  212. case strings.Contains(filename, "_L5_"):
  213. filetype = schema.FT_LEVEL5
  214. case strings.Contains(filename, "_L6_"):
  215. filetype = schema.FT_LEVEL6
  216. case strings.Contains(filename, "_L7_"):
  217. filetype = schema.FT_LEVEL7
  218. case strings.Contains(filename, "_L8_"):
  219. filetype = schema.FT_LEVEL8
  220. case strings.Contains(filename, "MASTER"):
  221. filetype = schema.FT_MASTER
  222. case strings.Contains(filename, "EDGE"):
  223. filetype = schema.FT_EDGE
  224. default:
  225. err = merrs.NewError("filename does not conform to the agreed format " + filename)
  226. return
  227. }
  228. br, e := reader.NewBlockReader(filename, filetype, buf)
  229. if e != nil {
  230. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
  231. }
  232. var wg sync.WaitGroup
  233. defer importer.done()
  234. defer wg.Wait()
  235. for {
  236. if err != nil {
  237. break
  238. }
  239. block, line, linecount, e := br.ReadBlock()
  240. if e != nil {
  241. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  242. }
  243. if block == nil {
  244. return
  245. }
  246. wg.Add(1)
  247. e = importer.importrc.ConcurCall(1, func() {
  248. defer wg.Done()
  249. e = importer.importRecord(block, line, filename, filetype, linecount)
  250. if e != nil {
  251. err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  252. return
  253. }
  254. atomic.AddInt64(&blockcount, 1)
  255. })
  256. if e != nil {
  257. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  258. }
  259. }
  260. return
  261. }
  262. func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype schema.FileType, linecount int) (err error) {
  263. if odbc.LogDebug {
  264. bs, e := json.MarshalIndent(record, "", " ")
  265. if e != nil {
  266. return merrs.NewError(e)
  267. }
  268. logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
  269. }
  270. var classname string
  271. switch filetype {
  272. case schema.FT_EDGE:
  273. err = importer.odbcimporter.InsertEdge(record)
  274. if err != nil {
  275. err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  276. return
  277. }
  278. default:
  279. classname = string(filetype)
  280. err = importer.odbcimporter.InsertData(classname, record)
  281. if err != nil {
  282. err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  283. return
  284. }
  285. }
  286. return
  287. }
  288. func (importer *Importer) alldone() {
  289. importer.odbcimporter.alldone()
  290. }
  291. func (importer *Importer) done() {
  292. importer.odbcimporter.done()
  293. }
  294. func Check() {
  295. client := odbc.ODBClient
  296. if client == nil {
  297. return
  298. }
  299. {
  300. mql := "select id,uniqueid,tags,contain,day,vtime from /m3cnet/master where uniqueid='E2E:OTR0002L'"
  301. r, e := client.Query(mql).Do()
  302. if e != nil {
  303. panic(merrs.NewError(e))
  304. }
  305. bs, _ := json.MarshalIndent(r.Data, "", " ")
  306. fmt.Println(string(bs))
  307. }
  308. }