cgistatus.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package importer
  2. import (
  3. "encoding/json"
  4. "os"
  5. "path/filepath"
  6. "sync"
  7. "time"
  8. "git.wecise.com/wecise/cgimport/odbc"
  9. "github.com/wecisecode/util/rc"
  10. )
  11. type ImportStatus struct {
  12. LinesCount int64
  13. RecordsCount int64
  14. }
  15. type CGIStatus struct {
  16. filepath string
  17. //
  18. TotalUseTime time.Duration
  19. ImportStatus map[string]*ImportStatus
  20. //
  21. mutex sync.RWMutex
  22. rc *rc.RoutinesController
  23. lasterror error
  24. lastsavetime time.Time
  25. waitdone chan any
  26. }
  27. func NewCGIStatus() *CGIStatus {
  28. return &CGIStatus{
  29. filepath: mcfg.GetString("cgi.statusfile", "/opt/matrix/var/cgimport/"+odbc.Keyspace+".status.txt"),
  30. ImportStatus: map[string]*ImportStatus{},
  31. rc: rc.NewRoutinesController("", 1),
  32. waitdone: make(chan any, 1),
  33. }
  34. }
  35. func (cgistatus *CGIStatus) Load() error {
  36. cgistatusbs, e := os.ReadFile(cgistatus.filepath)
  37. if e != nil && !os.IsNotExist(e) {
  38. return e
  39. }
  40. if len(cgistatusbs) > 0 {
  41. e = json.Unmarshal(cgistatusbs, &cgistatus)
  42. if e != nil {
  43. logger.Warn(e)
  44. }
  45. }
  46. return nil
  47. }
  48. func (cgistatus *CGIStatus) WaitSaveDone() {
  49. cgistatus.waitdone <- 1
  50. cgistatus.rc.WaitDone()
  51. }
  52. func (cgistatus *CGIStatus) Save() (err error) {
  53. cgistatus.rc.CallLast2Only(func() {
  54. if !cgistatus.lastsavetime.Equal(time.Time{}) {
  55. interval := 10 * time.Second
  56. realinterval := time.Since(cgistatus.lastsavetime)
  57. if realinterval < interval {
  58. t := time.NewTimer(interval - realinterval)
  59. select {
  60. case <-t.C:
  61. case v := <-cgistatus.waitdone:
  62. cgistatus.waitdone <- v
  63. }
  64. }
  65. }
  66. cgistatus.mutex.RLock()
  67. cgistatusbs, e := json.MarshalIndent(cgistatus, "", " ")
  68. cgistatus.mutex.RUnlock()
  69. if e != nil {
  70. cgistatus.lasterror = e
  71. return
  72. }
  73. e = os.MkdirAll(filepath.Dir(cgistatus.filepath), os.ModePerm)
  74. if e != nil {
  75. cgistatus.lasterror = e
  76. return
  77. }
  78. e = os.WriteFile(cgistatus.filepath, cgistatusbs, os.ModePerm)
  79. if e != nil {
  80. cgistatus.lasterror = e
  81. return
  82. }
  83. cgistatus.lastsavetime = time.Now()
  84. // fmt.Println(cgistatus.lastsavetime)
  85. })
  86. return cgistatus.lasterror
  87. }