importer.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. package importer
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "git.wecise.com/wecise/cgimport/odbc"
  12. "git.wecise.com/wecise/cgimport/reader"
  13. "git.wecise.com/wecise/util/filewalker"
  14. "git.wecise.com/wecise/util/merrs"
  15. "git.wecise.com/wecise/util/rc"
  16. )
  17. var mcfg = odbc.Config
  18. var logger = odbc.Logger
  19. type Importer struct {
  20. datapath string
  21. parallel int
  22. importrc *rc.RoutinesController
  23. odbcimporter *ODBCImporter
  24. }
  25. type ImportStatus struct {
  26. RecordsCount int64
  27. }
  28. type CGIStatus struct {
  29. mutex sync.RWMutex
  30. TotalUseTime time.Duration
  31. ImportStatus map[string]*ImportStatus
  32. rc *rc.RoutinesController
  33. lasterror error
  34. lastsavetime time.Time
  35. waitdone chan any
  36. }
  37. var cgistatusfile = mcfg.GetString("cgi.statusfile", "/opt/matrix/var/cgimport.status.txt")
  38. func NewCGIStatus() *CGIStatus {
  39. return &CGIStatus{
  40. ImportStatus: map[string]*ImportStatus{},
  41. rc: rc.NewRoutinesController("", 1),
  42. waitdone: make(chan any, 1),
  43. }
  44. }
  45. func (cgistatus *CGIStatus) Load() error {
  46. cgistatusbs, e := os.ReadFile(cgistatusfile)
  47. if e != nil && !os.IsNotExist(e) {
  48. return e
  49. }
  50. if len(cgistatusbs) > 0 {
  51. e = json.Unmarshal(cgistatusbs, &cgistatus)
  52. if e != nil {
  53. logger.Warn(e)
  54. }
  55. }
  56. return nil
  57. }
  58. func (cgistatus *CGIStatus) WaitSaveDone() {
  59. cgistatus.waitdone <- 1
  60. cgistatus.rc.WaitDone()
  61. }
  62. func (cgistatus *CGIStatus) Save() (err error) {
  63. cgistatus.rc.CallLast2Only(func() {
  64. if !cgistatus.lastsavetime.Equal(time.Time{}) {
  65. interval := time.Since(cgistatus.lastsavetime)
  66. if interval < 1*time.Second {
  67. t := time.NewTimer(1*time.Second - interval)
  68. select {
  69. case <-t.C:
  70. case v := <-cgistatus.waitdone:
  71. cgistatus.waitdone <- v
  72. }
  73. }
  74. }
  75. cgistatus.mutex.RLock()
  76. cgistatusbs, e := json.MarshalIndent(cgistatus, "", " ")
  77. cgistatus.mutex.RUnlock()
  78. if e != nil {
  79. cgistatus.lasterror = e
  80. return
  81. }
  82. e = os.MkdirAll(filepath.Dir(cgistatusfile), os.ModePerm)
  83. if e != nil {
  84. cgistatus.lasterror = e
  85. return
  86. }
  87. e = os.WriteFile(cgistatusfile, cgistatusbs, os.ModePerm)
  88. if e != nil {
  89. cgistatus.lasterror = e
  90. return
  91. }
  92. cgistatus.lastsavetime = time.Now()
  93. // fmt.Println(cgistatus.lastsavetime)
  94. })
  95. return cgistatus.lasterror
  96. }
  97. func ImportDir(datapath string, parallel int) (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
  98. importer := &Importer{
  99. datapath: datapath,
  100. parallel: parallel,
  101. importrc: rc.NewRoutinesController("", 1000),
  102. odbcimporter: NewODBCImporter(),
  103. }
  104. return importer.Import()
  105. }
  106. func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
  107. var cgirc = rc.NewRoutinesController("", importer.parallel)
  108. var wg sync.WaitGroup
  109. fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
  110. if e != nil {
  111. err = e
  112. return
  113. }
  114. cgistatus := NewCGIStatus()
  115. reload := mcfg.GetString("reload")
  116. if reload == "" {
  117. e = cgistatus.Load()
  118. if e != nil {
  119. err = e
  120. return
  121. }
  122. }
  123. totalfilescount = int64(len(cgistatus.ImportStatus))
  124. for _, v := range cgistatus.ImportStatus {
  125. totalrecordscount += v.RecordsCount
  126. }
  127. totalusetime = cgistatus.TotalUseTime
  128. st := time.Now().Add(-totalusetime)
  129. cst := time.Now()
  130. // 遍历文件目录
  131. e = fw.List(func(basedir string, fpath string) bool {
  132. if err != nil {
  133. return false
  134. }
  135. filename := filepath.Join(basedir, fpath)
  136. wg.Add(1)
  137. cgirc.ConcurCall(1,
  138. func() {
  139. defer wg.Done()
  140. cgistatus.mutex.RLock()
  141. importstatus := cgistatus.ImportStatus[filename]
  142. cgistatus.mutex.RUnlock()
  143. if importstatus != nil {
  144. return
  145. }
  146. records, e := importer.ImportFile(filename)
  147. if e != nil {
  148. err = e
  149. return
  150. }
  151. atomic.AddInt64(&filescount, 1)
  152. atomic.AddInt64(&recordscount, records)
  153. atomic.AddInt64(&totalfilescount, 1)
  154. atomic.AddInt64(&totalrecordscount, records)
  155. usetime = time.Since(cst)
  156. totalusetime = time.Since(st)
  157. cgistatus.mutex.Lock()
  158. cgistatus.ImportStatus[filename] = &ImportStatus{RecordsCount: records}
  159. cgistatus.TotalUseTime = totalusetime
  160. cgistatus.mutex.Unlock()
  161. cgistatus.Save()
  162. },
  163. )
  164. return true
  165. })
  166. wg.Wait()
  167. if e != nil {
  168. err = e
  169. return
  170. }
  171. cgistatus.WaitSaveDone()
  172. return
  173. }
  174. func (importer *Importer) ImportFile(filepath string) (blockcount int64, err error) {
  175. f, e := os.Open(filepath)
  176. if e != nil {
  177. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
  178. }
  179. defer f.Close()
  180. return importer.importReader(filepath, f)
  181. }
  182. func (importer *Importer) importReader(filename string, buf io.Reader) (blockcount int64, err error) {
  183. br, e := reader.NewBlockReader(filename, buf)
  184. if e != nil {
  185. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
  186. }
  187. var wg sync.WaitGroup
  188. defer wg.Wait()
  189. for {
  190. if err != nil {
  191. break
  192. }
  193. block, linecount, e := br.ReadBlock()
  194. if e != nil {
  195. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})
  196. }
  197. if block == nil {
  198. return
  199. }
  200. wg.Add(1)
  201. e = importer.importrc.ConcurCall(1, func() {
  202. defer wg.Done()
  203. e = importer.importRecord(block, filename, linecount)
  204. if e != nil {
  205. err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})
  206. return
  207. }
  208. atomic.AddInt64(&blockcount, 1)
  209. })
  210. if e != nil {
  211. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})
  212. }
  213. }
  214. return
  215. }
  216. func (importer *Importer) importRecord(record map[string]any, filename string, linecount int) (err error) {
  217. bs, e := json.MarshalIndent(record, "", " ")
  218. if e != nil {
  219. return e
  220. }
  221. logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
  222. e = importer.odbcimporter.ReviseClassStruct(record)
  223. if e != nil {
  224. return e
  225. }
  226. e = importer.odbcimporter.InsertData(record)
  227. if e != nil {
  228. return e
  229. }
  230. return
  231. }