cgf.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package cgf
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "sync"
  9. "sync/atomic"
  10. "git.wecise.com/wecise/cgimport/cgf/reader"
  11. "git.wecise.com/wecise/util/filewalker"
  12. "git.wecise.com/wecise/util/merrs"
  13. "git.wecise.com/wecise/util/rc"
  14. )
  15. func ImportDir(datapath string, parallel int) (filescount, recordscount int64, err error) {
  16. // 遍历文件目录
  17. var cgirc = rc.NewRoutinesController("", parallel)
  18. var wg sync.WaitGroup
  19. fw, e := filewalker.NewFileWalker([]string{datapath}, ".*")
  20. if e != nil {
  21. err = e
  22. return
  23. }
  24. e = fw.List(func(basedir string, fpath string) bool {
  25. if err != nil {
  26. return false
  27. }
  28. filename := filepath.Join(basedir, fpath)
  29. wg.Add(1)
  30. cgirc.ConcurCall(1,
  31. func() {
  32. defer wg.Done()
  33. records, e := ImportFile(filename)
  34. if e != nil {
  35. err = e
  36. return
  37. }
  38. atomic.AddInt64(&filescount, 1)
  39. atomic.AddInt64(&recordscount, int64(records))
  40. },
  41. )
  42. return true
  43. })
  44. wg.Wait()
  45. if e != nil {
  46. err = e
  47. return
  48. }
  49. return
  50. }
  51. func ImportFile(filepath string) (blockcount int, err error) {
  52. f, e := os.Open(filepath)
  53. if e != nil {
  54. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
  55. }
  56. defer f.Close()
  57. return importReader(filepath, f)
  58. }
  59. func importReader(filename string, buf io.Reader) (blockcount int, err error) {
  60. br, e := reader.NewBlockReader(filename, buf)
  61. if e != nil {
  62. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
  63. }
  64. for {
  65. block, linecount, e := br.ReadBlock()
  66. if e != nil {
  67. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})
  68. }
  69. if block == nil {
  70. return
  71. }
  72. e = importBlock(block, filename, linecount)
  73. if e != nil {
  74. return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})
  75. }
  76. blockcount++
  77. }
  78. }
  // importBlock encodes block as JSON indented with one space and prints it to
  // standard output on a line of the form "import <filename>[<linecount>]:<json>".
  //
  // It returns the error from json.MarshalIndent if the block cannot be encoded,
  // in which case nothing is printed; otherwise it returns nil.
  func importBlock(block map[string]any, filename string, linecount int) (err error) {
  	encoded, marshalErr := json.MarshalIndent(block, "", " ")
  	if marshalErr != nil {
  		err = marshalErr
  		return err
  	}
  	fmt.Printf("import %s[%d]:%s\n", filename, linecount, encoded)
  	return nil
  }