importer.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. package importer
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "strings"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "git.wecise.com/wecise/cgimport/odbc"
  13. "git.wecise.com/wecise/cgimport/reader"
  14. "git.wecise.com/wecise/util/filewalker"
  15. "git.wecise.com/wecise/util/merrs"
  16. "git.wecise.com/wecise/util/rc"
  17. )
  18. var mcfg = odbc.Config
  19. var logger = odbc.Logger
// Importer imports the data files found under datapath through an
// ODBCImporter.
type Importer struct {
	datapath     string                 // root path handed to the file walker in Import
	parallel     int                    // limit passed to the per-file routines controller created in Import
	importrc     *rc.RoutinesController // controller through which importReader schedules importRecord calls
	odbcimporter *ODBCImporter          // receives the InsertData / InsertEdge calls and the done / alldone notifications
}
// ImportStatus is the per-file progress entry stored in CGIStatus.ImportStatus.
type ImportStatus struct {
	RecordsCount int64 // record count returned by ImportFile for the file
}
// CGIStatus is the import progress that is persisted to cgistatusfile as JSON.
// Only the exported fields (TotalUseTime, ImportStatus) are serialized;
// encoding/json ignores the unexported bookkeeping fields.
type CGIStatus struct {
	mutex        sync.RWMutex             // held by Import and Save while ImportStatus / TotalUseTime are read or written
	TotalUseTime time.Duration            // accumulated import time, including time restored by Load
	ImportStatus map[string]*ImportStatus // keyed by the file name built in Import (basedir joined with the walked path)
	rc           *rc.RoutinesController   // controller through which Save schedules the write
	lasterror    error                    // set by Save when marshaling, mkdir or the file write fails; never cleared
	lastsavetime time.Time                // time of the last successful write; Save uses it to space writes 1s apart
	waitdone     chan any                 // receives a value from WaitSaveDone so a throttled Save stops waiting
}
// cgistatusfile is the path of the persisted status file, read from the
// "cgi.statusfile" configuration key with
// "/opt/matrix/var/cgimport.status.txt" as the default.
var cgistatusfile = mcfg.GetString("cgi.statusfile", "/opt/matrix/var/cgimport.status.txt")
  39. func NewCGIStatus() *CGIStatus {
  40. return &CGIStatus{
  41. ImportStatus: map[string]*ImportStatus{},
  42. rc: rc.NewRoutinesController("", 1),
  43. waitdone: make(chan any, 1),
  44. }
  45. }
  46. func (cgistatus *CGIStatus) Load() error {
  47. cgistatusbs, e := os.ReadFile(cgistatusfile)
  48. if e != nil && !os.IsNotExist(e) {
  49. return e
  50. }
  51. if len(cgistatusbs) > 0 {
  52. e = json.Unmarshal(cgistatusbs, &cgistatus)
  53. if e != nil {
  54. logger.Warn(e)
  55. }
  56. }
  57. return nil
  58. }
// WaitSaveDone tells a Save that is currently throttled to stop waiting and
// then waits on the save routines controller.
func (cgistatus *CGIStatus) WaitSaveDone() {
	// waitdone has a single buffer slot (see NewCGIStatus), so this send blocks
	// if a previous signal is still sitting in the channel.
	cgistatus.waitdone <- 1
	cgistatus.rc.WaitDone()
}
  63. func (cgistatus *CGIStatus) Save() (err error) {
  64. cgistatus.rc.CallLast2Only(func() {
  65. if !cgistatus.lastsavetime.Equal(time.Time{}) {
  66. interval := time.Since(cgistatus.lastsavetime)
  67. if interval < 1*time.Second {
  68. t := time.NewTimer(1*time.Second - interval)
  69. select {
  70. case <-t.C:
  71. case v := <-cgistatus.waitdone:
  72. cgistatus.waitdone <- v
  73. }
  74. }
  75. }
  76. cgistatus.mutex.RLock()
  77. cgistatusbs, e := json.MarshalIndent(cgistatus, "", " ")
  78. cgistatus.mutex.RUnlock()
  79. if e != nil {
  80. cgistatus.lasterror = e
  81. return
  82. }
  83. e = os.MkdirAll(filepath.Dir(cgistatusfile), os.ModePerm)
  84. if e != nil {
  85. cgistatus.lasterror = e
  86. return
  87. }
  88. e = os.WriteFile(cgistatusfile, cgistatusbs, os.ModePerm)
  89. if e != nil {
  90. cgistatus.lasterror = e
  91. return
  92. }
  93. cgistatus.lastsavetime = time.Now()
  94. // fmt.Println(cgistatus.lastsavetime)
  95. })
  96. return cgistatus.lasterror
  97. }
  98. func ImportDir(datapath string, parallel int) (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
  99. importer := &Importer{
  100. datapath: datapath,
  101. parallel: parallel,
  102. importrc: rc.NewRoutinesController("", 100),
  103. odbcimporter: NewODBCImporter(),
  104. }
  105. return importer.Import()
  106. }
  107. func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
  108. var cgirc = rc.NewRoutinesController("", importer.parallel)
  109. var wg sync.WaitGroup
  110. fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
  111. if e != nil {
  112. err = e
  113. return
  114. }
  115. cgistatus := NewCGIStatus()
  116. reload := mcfg.GetString("reload")
  117. if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && reload == "" {
  118. err = cgistatus.Load()
  119. if err != nil {
  120. return
  121. }
  122. } else {
  123. // reload
  124. // 清除已有类
  125. err = importer.odbcimporter.reload()
  126. if err != nil {
  127. return
  128. }
  129. }
  130. // 建类
  131. err = importer.odbcimporter.ReviseClassStruct()
  132. if err != nil {
  133. return
  134. }
  135. totalfilescount = int64(len(cgistatus.ImportStatus))
  136. for _, v := range cgistatus.ImportStatus {
  137. totalrecordscount += v.RecordsCount
  138. }
  139. totalusetime = cgistatus.TotalUseTime
  140. st := time.Now().Add(-totalusetime)
  141. cst := time.Now()
  142. // 遍历文件目录
  143. e = fw.List(func(basedir string, fpath string) bool {
  144. if err != nil {
  145. return false
  146. }
  147. filename := filepath.Join(basedir, fpath)
  148. wg.Add(1)
  149. cgirc.ConcurCall(1,
  150. func() {
  151. defer wg.Done()
  152. cgistatus.mutex.RLock()
  153. importstatus := cgistatus.ImportStatus[filename]
  154. cgistatus.mutex.RUnlock()
  155. if importstatus != nil {
  156. return
  157. }
  158. records, e := importer.ImportFile(filename)
  159. if e != nil {
  160. err = e
  161. return
  162. }
  163. atomic.AddInt64(&filescount, 1)
  164. atomic.AddInt64(&recordscount, records)
  165. atomic.AddInt64(&totalfilescount, 1)
  166. atomic.AddInt64(&totalrecordscount, records)
  167. usetime = time.Since(cst)
  168. totalusetime = time.Since(st)
  169. cgistatus.mutex.Lock()
  170. cgistatus.ImportStatus[filename] = &ImportStatus{RecordsCount: records}
  171. cgistatus.TotalUseTime = totalusetime
  172. cgistatus.mutex.Unlock()
  173. cgistatus.Save()
  174. },
  175. )
  176. return true
  177. })
  178. wg.Wait()
  179. if e != nil {
  180. if os.IsNotExist(e) {
  181. err = merrs.NewError(`directory "`+importer.datapath+`" not exist specified by "datapath"`, e)
  182. } else {
  183. err = merrs.NewError(e)
  184. }
  185. return
  186. }
  187. cgistatus.WaitSaveDone()
  188. importer.alldone()
  189. return
  190. }
  191. func (importer *Importer) ImportFile(filepath string) (blockcount int64, err error) {
  192. f, e := os.Open(filepath)
  193. if e != nil {
  194. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
  195. }
  196. defer f.Close()
  197. return importer.importReader(filepath, f)
  198. }
  199. func (importer *Importer) importReader(filename string, buf io.Reader) (blockcount int64, err error) {
  200. br, e := reader.NewBlockReader(filename, buf)
  201. if e != nil {
  202. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
  203. }
  204. var filetype FileType
  205. switch {
  206. case strings.Contains(filename, "_L1_"):
  207. filetype = FT_LEVEL1
  208. case strings.Contains(filename, "_L2_"):
  209. filetype = FT_LEVEL2
  210. case strings.Contains(filename, "_L3_"):
  211. filetype = FT_LEVEL3
  212. case strings.Contains(filename, "_L4_"):
  213. filetype = FT_LEVEL4
  214. case strings.Contains(filename, "_L5_"):
  215. filetype = FT_LEVEL5
  216. case strings.Contains(filename, "_L6_"):
  217. filetype = FT_LEVEL6
  218. case strings.Contains(filename, "_L7_"):
  219. filetype = FT_LEVEL7
  220. case strings.Contains(filename, "_L8_"):
  221. filetype = FT_LEVEL8
  222. case strings.Contains(filename, "MASTER"):
  223. filetype = FT_MASTER
  224. case strings.Contains(filename, "EDGE"):
  225. filetype = FT_EDGE
  226. default:
  227. err = merrs.NewError("filename does not conform to the agreed format " + filename)
  228. return
  229. }
  230. var wg sync.WaitGroup
  231. defer importer.done()
  232. defer wg.Wait()
  233. for {
  234. if err != nil {
  235. break
  236. }
  237. block, line, linecount, e := br.ReadBlock()
  238. if e != nil {
  239. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  240. }
  241. if block == nil {
  242. return
  243. }
  244. wg.Add(1)
  245. e = importer.importrc.ConcurCall(1, func() {
  246. defer wg.Done()
  247. e = importer.importRecord(block, line, filename, filetype, linecount)
  248. if e != nil {
  249. err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  250. return
  251. }
  252. atomic.AddInt64(&blockcount, 1)
  253. })
  254. if e != nil {
  255. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  256. }
  257. }
  258. return
  259. }
  260. func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype FileType, linecount int) (err error) {
  261. if odbc.LogDebug {
  262. bs, e := json.MarshalIndent(record, "", " ")
  263. if e != nil {
  264. return merrs.NewError(e)
  265. }
  266. logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
  267. }
  268. var classname string
  269. switch filetype {
  270. case FT_EDGE:
  271. err = importer.odbcimporter.InsertEdge(record)
  272. if err != nil {
  273. err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  274. return
  275. }
  276. default:
  277. classname = string(filetype)
  278. err = importer.odbcimporter.InsertData(classname, record)
  279. if err != nil {
  280. err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
  281. return
  282. }
  283. }
  284. return
  285. }
// alldone forwards to the ODBC importer's alldone; Import calls it once after
// the directory walk has finished and pending status saves have been waited for.
func (importer *Importer) alldone() {
	importer.odbcimporter.alldone()
}
// done forwards to the ODBC importer's done; importReader defers it so it runs
// when the processing of one file ends.
func (importer *Importer) done() {
	importer.odbcimporter.done()
}
  292. func Check() {
  293. client := odbc.ODBClient
  294. if client == nil {
  295. return
  296. }
  297. {
  298. mql := "select * from /m3cnet/master"
  299. r, e := client.Query(mql).Do()
  300. if e != nil {
  301. panic(merrs.NewError(e))
  302. }
  303. bs, _ := json.MarshalIndent(r.Data, "", " ")
  304. fmt.Println(string(bs))
  305. }
  306. }