importer.go 8.2 KB


  1. package importer
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "strings"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "git.wecise.com/wecise/cgimport/odbc"
  13. "git.wecise.com/wecise/cgimport/reader"
  14. "git.wecise.com/wecise/util/filewalker"
  15. "git.wecise.com/wecise/util/merrs"
  16. "git.wecise.com/wecise/util/rc"
  17. )
  18. var mcfg = odbc.Config
  19. var logger = odbc.Logger
  20. type Importer struct {
  21. datapath string
  22. parallel int
  23. importrc *rc.RoutinesController
  24. odbcimporter *ODBCImporter
  25. }
  // ImportStatus is the recorded outcome for one imported file. It is
// serialized as part of CGIStatus, so the field name is also its JSON key.
type ImportStatus struct {
	// RecordsCount is the number of blocks imported from the file.
	RecordsCount int64
}
  29. type CGIStatus struct {
  30. mutex sync.RWMutex
  31. TotalUseTime time.Duration
  32. ImportStatus map[string]*ImportStatus
  33. rc *rc.RoutinesController
  34. lasterror error
  35. lastsavetime time.Time
  36. waitdone chan any
  37. }
  38. var cgistatusfile = mcfg.GetString("cgi.statusfile", "/opt/matrix/var/cgimport.status.txt")
  39. func NewCGIStatus() *CGIStatus {
  40. return &CGIStatus{
  41. ImportStatus: map[string]*ImportStatus{},
  42. rc: rc.NewRoutinesController("", 1),
  43. waitdone: make(chan any, 1),
  44. }
  45. }
  46. func (cgistatus *CGIStatus) Load() error {
  47. cgistatusbs, e := os.ReadFile(cgistatusfile)
  48. if e != nil && !os.IsNotExist(e) {
  49. return e
  50. }
  51. if len(cgistatusbs) > 0 {
  52. e = json.Unmarshal(cgistatusbs, &cgistatus)
  53. if e != nil {
  54. logger.Warn(e)
  55. }
  56. }
  57. return nil
  58. }
  59. func (cgistatus *CGIStatus) WaitSaveDone() {
  60. cgistatus.waitdone <- 1
  61. cgistatus.rc.WaitDone()
  62. }
  63. func (cgistatus *CGIStatus) Save() (err error) {
  64. cgistatus.rc.CallLast2Only(func() {
  65. if !cgistatus.lastsavetime.Equal(time.Time{}) {
  66. interval := time.Since(cgistatus.lastsavetime)
  67. if interval < 1*time.Second {
  68. t := time.NewTimer(1*time.Second - interval)
  69. select {
  70. case <-t.C:
  71. case v := <-cgistatus.waitdone:
  72. cgistatus.waitdone <- v
  73. }
  74. }
  75. }
  76. cgistatus.mutex.RLock()
  77. cgistatusbs, e := json.MarshalIndent(cgistatus, "", " ")
  78. cgistatus.mutex.RUnlock()
  79. if e != nil {
  80. cgistatus.lasterror = e
  81. return
  82. }
  83. e = os.MkdirAll(filepath.Dir(cgistatusfile), os.ModePerm)
  84. if e != nil {
  85. cgistatus.lasterror = e
  86. return
  87. }
  88. e = os.WriteFile(cgistatusfile, cgistatusbs, os.ModePerm)
  89. if e != nil {
  90. cgistatus.lasterror = e
  91. return
  92. }
  93. cgistatus.lastsavetime = time.Now()
  94. // fmt.Println(cgistatus.lastsavetime)
  95. })
  96. return cgistatus.lasterror
  97. }
  98. func ImportDir(datapath string, parallel int) (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
  99. importer := &Importer{
  100. datapath: datapath,
  101. parallel: parallel,
  102. importrc: rc.NewRoutinesController("", 100),
  103. odbcimporter: NewODBCImporter(),
  104. }
  105. return importer.Import()
  106. }
  107. func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
  108. var cgirc = rc.NewRoutinesController("", importer.parallel)
  109. var wg sync.WaitGroup
  110. fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
  111. if e != nil {
  112. err = e
  113. return
  114. }
  115. cgistatus := NewCGIStatus()
  116. reload := mcfg.GetString("reload")
  117. if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && reload == "" {
  118. err = cgistatus.Load()
  119. if err != nil {
  120. return
  121. }
  122. } else {
  123. // reload
  124. // 清除已有类
  125. err = importer.odbcimporter.reload()
  126. if err != nil {
  127. return
  128. }
  129. }
  130. // 建类
  131. err = importer.odbcimporter.ReviseClassStruct()
  132. if err != nil {
  133. return
  134. }
  135. totalfilescount = int64(len(cgistatus.ImportStatus))
  136. for _, v := range cgistatus.ImportStatus {
  137. totalrecordscount += v.RecordsCount
  138. }
  139. totalusetime = cgistatus.TotalUseTime
  140. st := time.Now().Add(-totalusetime)
  141. cst := time.Now()
  142. // 遍历文件目录
  143. e = fw.List(func(basedir string, fpath string) bool {
  144. if err != nil {
  145. return false
  146. }
  147. filename := filepath.Join(basedir, fpath)
  148. wg.Add(1)
  149. cgirc.ConcurCall(1,
  150. func() {
  151. defer wg.Done()
  152. cgistatus.mutex.RLock()
  153. importstatus := cgistatus.ImportStatus[filename]
  154. cgistatus.mutex.RUnlock()
  155. if importstatus != nil {
  156. return
  157. }
  158. records, e := importer.ImportFile(filename)
  159. if e != nil {
  160. err = e
  161. return
  162. }
  163. atomic.AddInt64(&filescount, 1)
  164. atomic.AddInt64(&recordscount, records)
  165. atomic.AddInt64(&totalfilescount, 1)
  166. atomic.AddInt64(&totalrecordscount, records)
  167. usetime = time.Since(cst)
  168. totalusetime = time.Since(st)
  169. cgistatus.mutex.Lock()
  170. cgistatus.ImportStatus[filename] = &ImportStatus{RecordsCount: records}
  171. cgistatus.TotalUseTime = totalusetime
  172. cgistatus.mutex.Unlock()
  173. cgistatus.Save()
  174. },
  175. )
  176. return true
  177. })
  178. wg.Wait()
  179. if e != nil {
  180. err = e
  181. return
  182. }
  183. cgistatus.WaitSaveDone()
  184. importer.alldone()
  185. return
  186. }
  187. func (importer *Importer) ImportFile(filepath string) (blockcount int64, err error) {
  188. f, e := os.Open(filepath)
  189. if e != nil {
  190. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
  191. }
  192. defer f.Close()
  193. return importer.importReader(filepath, f)
  194. }
  195. func (importer *Importer) importReader(filename string, buf io.Reader) (blockcount int64, err error) {
  196. br, e := reader.NewBlockReader(filename, buf)
  197. if e != nil {
  198. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
  199. }
  200. var filetype FileType
  201. switch {
  202. case strings.Contains(filename, "_L1_"):
  203. filetype = FT_LEVEL1
  204. case strings.Contains(filename, "_L2_"):
  205. filetype = FT_LEVEL2
  206. case strings.Contains(filename, "_L3_"):
  207. filetype = FT_LEVEL3
  208. case strings.Contains(filename, "_L4_"):
  209. filetype = FT_LEVEL4
  210. case strings.Contains(filename, "_L5_"):
  211. filetype = FT_LEVEL5
  212. case strings.Contains(filename, "_L6_"):
  213. filetype = FT_LEVEL6
  214. case strings.Contains(filename, "_L7_"):
  215. filetype = FT_LEVEL7
  216. case strings.Contains(filename, "_L8_"):
  217. filetype = FT_LEVEL8
  218. case strings.Contains(filename, "MASTER"):
  219. filetype = FT_MASTER
  220. case strings.Contains(filename, "EDGE"):
  221. filetype = FT_EDGE
  222. default:
  223. err = merrs.NewError("filename does not conform to the agreed format " + filename)
  224. return
  225. }
  226. var wg sync.WaitGroup
  227. defer importer.done()
  228. defer wg.Wait()
  229. for {
  230. if err != nil {
  231. break
  232. }
  233. block, line, linecount, e := br.ReadBlock()
  234. if e != nil {
  235. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  236. }
  237. if block == nil {
  238. return
  239. }
  240. wg.Add(1)
  241. e = importer.importrc.ConcurCall(1, func() {
  242. defer wg.Done()
  243. e = importer.importRecord(block, line, filename, filetype, linecount)
  244. if e != nil {
  245. err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  246. return
  247. }
  248. atomic.AddInt64(&blockcount, 1)
  249. })
  250. if e != nil {
  251. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  252. }
  253. }
  254. return
  255. }
  256. func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype FileType, linecount int) (err error) {
  257. if odbc.LogDebug {
  258. bs, e := json.MarshalIndent(record, "", " ")
  259. if e != nil {
  260. return merrs.NewError(e)
  261. }
  262. logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
  263. }
  264. var classname string
  265. switch filetype {
  266. case FT_EDGE:
  267. err = importer.odbcimporter.InsertEdge(record)
  268. if err != nil {
  269. err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  270. return
  271. }
  272. default:
  273. classname = string(filetype)
  274. err = importer.odbcimporter.InsertData(classname, record)
  275. if err != nil {
  276. err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  277. return
  278. }
  279. }
  280. return
  281. }
  282. func (importer *Importer) alldone() {
  283. importer.odbcimporter.alldone()
  284. }
  285. func (importer *Importer) done() {
  286. importer.odbcimporter.done()
  287. }
  288. func Check() {
  289. client := odbc.ODBClient
  290. if client == nil {
  291. return
  292. }
  293. {
  294. time.Sleep(3 * time.Second)
  295. mql := "select * from /m3cnet/master"
  296. r, e := client.Query(mql).Do()
  297. if e != nil {
  298. panic(merrs.NewError(e))
  299. }
  300. bs, _ := json.MarshalIndent(r.Data, "", " ")
  301. fmt.Println(string(bs))
  302. }
  303. // {
  304. // mql := "select * from /m3cnet/minfo/level1"
  305. // r, e := client.Query(mql).Do()
  306. // if e != nil {
  307. // panic(merrs.NewError(e))
  308. // }
  309. // bs, _ := json.MarshalIndent(r.Data, "", " ")
  310. // fmt.Println(string(bs))
  311. // }
  312. }