cgistatus.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package importer
  2. import (
  3. "encoding/json"
  4. "os"
  5. "sync"
  6. "time"
  7. "git.wecise.com/wecise/cgimport/odbc"
  8. "github.com/wecisecode/util/mio"
  9. "github.com/wecisecode/util/rc"
  10. )
  11. type ImportStatus struct {
  12. LinesCount int64
  13. RecordsCount int64
  14. RetryCount int64
  15. }
  16. type CGIStatus struct {
  17. filepath string
  18. //
  19. TotalUseTime time.Duration
  20. ImportStatus map[string]*ImportStatus
  21. //
  22. mutex sync.RWMutex
  23. rc *rc.RoutinesController
  24. lasterror error
  25. lastsavetime time.Time
  26. waitdone chan any
  27. }
  28. func NewCGIStatus() *CGIStatus {
  29. return &CGIStatus{
  30. filepath: mcfg.GetString("cgi.statusfile", "/opt/matrix/var/cgimport/"+odbc.Keyspace()+".status.txt"),
  31. ImportStatus: map[string]*ImportStatus{},
  32. rc: rc.NewRoutinesController("", 1),
  33. waitdone: make(chan any, 1),
  34. }
  35. }
  36. func (cgistatus *CGIStatus) Load() error {
  37. logger.Info("load progress from", cgistatus.filepath)
  38. cgistatusbs, e := mio.ReadFile(cgistatus.filepath)
  39. if e != nil && !os.IsNotExist(e) {
  40. return e
  41. }
  42. if len(cgistatusbs) > 0 {
  43. e = json.Unmarshal(cgistatusbs, &cgistatus)
  44. if e != nil {
  45. logger.Warn(e)
  46. }
  47. }
  48. return nil
  49. }
  50. func (cgistatus *CGIStatus) WaitSaveDone() {
  51. cgistatus.waitdone <- 1
  52. cgistatus.rc.WaitDone()
  53. }
  54. func (cgistatus *CGIStatus) Save() (err error) {
  55. cgistatus.rc.CallLast2Only(func() {
  56. if !cgistatus.lastsavetime.Equal(time.Time{}) {
  57. interval := 10 * time.Second
  58. realinterval := time.Since(cgistatus.lastsavetime)
  59. if realinterval < interval {
  60. t := time.NewTimer(interval - realinterval)
  61. select {
  62. case <-t.C:
  63. case v := <-cgistatus.waitdone:
  64. cgistatus.waitdone <- v
  65. }
  66. }
  67. }
  68. cgistatus.mutex.RLock()
  69. cgistatusbs, e := json.MarshalIndent(cgistatus, "", " ")
  70. cgistatus.mutex.RUnlock()
  71. if e != nil {
  72. cgistatus.lasterror = e
  73. return
  74. }
  75. e = mio.WriteFile(cgistatus.filepath, cgistatusbs, true)
  76. if e != nil {
  77. cgistatus.lasterror = e
  78. return
  79. }
  80. cgistatus.lastsavetime = time.Now()
  81. // fmt.Println(cgistatus.lastsavetime)
  82. })
  83. return cgistatus.lasterror
  84. }