cgf.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package cgf
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "sync"
  9. "sync/atomic"
  10. "git.wecise.com/wecise/cgimport/cgf/reader"
  11. "git.wecise.com/wecise/cgimport/odbc"
  12. "git.wecise.com/wecise/util/filewalker"
  13. "git.wecise.com/wecise/util/merrs"
  14. "git.wecise.com/wecise/util/rc"
  15. )
  16. var mcfg = odbc.Config
  17. var logger = odbc.Logger
  18. func ImportDir(datapath string, parallel int) (filescount, recordscount int64, err error) {
  19. // 遍历文件目录
  20. var cgirc = rc.NewRoutinesController("", parallel)
  21. var wg sync.WaitGroup
  22. fw, e := filewalker.NewFileWalker([]string{datapath}, ".*")
  23. if e != nil {
  24. err = e
  25. return
  26. }
  27. e = fw.List(func(basedir string, fpath string) bool {
  28. if err != nil {
  29. return false
  30. }
  31. filename := filepath.Join(basedir, fpath)
  32. wg.Add(1)
  33. cgirc.ConcurCall(1,
  34. func() {
  35. defer wg.Done()
  36. records, e := ImportFile(filename)
  37. if e != nil {
  38. err = e
  39. return
  40. }
  41. atomic.AddInt64(&filescount, 1)
  42. atomic.AddInt64(&recordscount, int64(records))
  43. },
  44. )
  45. return true
  46. })
  47. wg.Wait()
  48. if e != nil {
  49. err = e
  50. return
  51. }
  52. return
  53. }
  54. func ImportFile(filepath string) (blockcount int, err error) {
  55. f, e := os.Open(filepath)
  56. if e != nil {
  57. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
  58. }
  59. defer f.Close()
  60. return importReader(filepath, f)
  61. }
  62. var parserc = rc.NewRoutinesController("", 1000)
  63. func importReader(filename string, buf io.Reader) (blockcount int, err error) {
  64. br, e := reader.NewBlockReader(filename, buf)
  65. if e != nil {
  66. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
  67. }
  68. var wg sync.WaitGroup
  69. defer wg.Wait()
  70. for {
  71. if err != nil {
  72. break
  73. }
  74. block, linecount, e := br.ReadBlock()
  75. if e != nil {
  76. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})
  77. }
  78. if block == nil {
  79. return
  80. }
  81. wg.Add(1)
  82. e = parserc.ConcurCall(1, func() {
  83. defer wg.Done()
  84. e = importBlock(block, filename, linecount)
  85. if e != nil {
  86. err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})
  87. return
  88. }
  89. blockcount++
  90. })
  91. if e != nil {
  92. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})
  93. }
  94. }
  95. return
  96. }
  97. func importBlock(block map[string]any, filename string, linecount int) (err error) {
  98. bs, e := json.MarshalIndent(block, "", " ")
  99. if e != nil {
  100. return e
  101. }
  102. logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
  103. return
  104. }