syncstatus.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. package datasync
  2. import (
  3. "encoding/json"
  4. "os"
  5. "path/filepath"
  6. "strings"
  7. "sync"
  8. "time"
  9. "gitee.com/wecisecode/util/mio"
  10. "gitee.com/wecisecode/util/rc"
  11. )
  12. type DoneCount struct {
  13. BucketClass string `json:",omitempty"`
  14. BucketField string `json:",omitempty"`
  15. BucketObjID string `json:",omitempty"`
  16. RecordsCount int64 `json:",omitempty"`
  17. FromVtime string `json:",omitempty"`
  18. LastDataVtime string `json:",omitempty"`
  19. LastSyncVtime string `json:",omitempty"`
  20. RetryCount int64 `json:",omitempty"`
  21. lastlogtime time.Time `json:"-"`
  22. isrunning chan bool `json:"-"`
  23. }
  24. type CountStatus struct {
  25. TotalUseTime time.Duration `json:",omitempty"`
  26. DoneCount map[string]*DoneCount `json:",omitempty"`
  27. mutex sync.RWMutex `json:"-"`
  28. }
  29. type SyncStatus struct {
  30. filepath string
  31. //
  32. starttime time.Time
  33. countstatus *CountStatus
  34. //
  35. rc *rc.RoutinesController
  36. lasterror error
  37. lastsavetime time.Time
  38. waitdone chan any
  39. }
  40. func NewSyncStatus(key string) *SyncStatus {
  41. datadir := mcfg.GetString("datasync.data.dir", "/opt/matrix/var/datasync")
  42. statusfile := mcfg.GetString("datasync.statusfile", key+".status.txt")
  43. if !strings.HasPrefix(statusfile, "/") {
  44. statusfile = filepath.Join(datadir, statusfile)
  45. }
  46. return &SyncStatus{
  47. filepath: statusfile,
  48. countstatus: &CountStatus{
  49. DoneCount: map[string]*DoneCount{},
  50. },
  51. rc: rc.NewRoutinesController("", 1),
  52. waitdone: make(chan any, 1),
  53. }
  54. }
  55. func (syncstatus *SyncStatus) Resume() {
  56. syncstatus.countstatus.mutex.Lock()
  57. defer syncstatus.countstatus.mutex.Unlock()
  58. syncstatus.starttime = time.Now().Add(-syncstatus.countstatus.TotalUseTime)
  59. }
  60. func (syncstatus *SyncStatus) TotalUseTime() time.Duration {
  61. syncstatus.countstatus.mutex.RLock()
  62. defer syncstatus.countstatus.mutex.RUnlock()
  63. return syncstatus.countstatus.TotalUseTime
  64. }
  65. func (syncstatus *SyncStatus) TotalChunks() int {
  66. syncstatus.countstatus.mutex.RLock()
  67. defer syncstatus.countstatus.mutex.RUnlock()
  68. return len(syncstatus.countstatus.DoneCount)
  69. }
  70. func (syncstatus *SyncStatus) TotalRecords() int {
  71. syncstatus.countstatus.mutex.RLock()
  72. defer syncstatus.countstatus.mutex.RUnlock()
  73. n := 0
  74. for _, v := range syncstatus.countstatus.DoneCount {
  75. n += int(v.RecordsCount)
  76. }
  77. return n
  78. }
  79. func (syncstatus *SyncStatus) DoneCountCopy(filter func(k string, v *DoneCount) bool) map[string]*DoneCount {
  80. syncstatus.countstatus.mutex.RLock()
  81. defer syncstatus.countstatus.mutex.RUnlock()
  82. m := map[string]*DoneCount{}
  83. for k, v := range syncstatus.countstatus.DoneCount {
  84. if filter(k, v) {
  85. m[k] = v
  86. }
  87. }
  88. return m
  89. }
  90. func (syncstatus *SyncStatus) DoneCount(key string) *DoneCount {
  91. syncstatus.countstatus.mutex.Lock()
  92. defer syncstatus.countstatus.mutex.Unlock()
  93. dc := syncstatus.countstatus.DoneCount[key]
  94. if dc == nil {
  95. dc = &DoneCount{}
  96. syncstatus.countstatus.DoneCount[key] = dc
  97. }
  98. if dc.isrunning == nil {
  99. dc.isrunning = make(chan bool, 1)
  100. dc.isrunning <- false
  101. }
  102. return dc
  103. }
  104. func (syncstatus *SyncStatus) RemoveDoneCount(key string) {
  105. syncstatus.countstatus.mutex.Lock()
  106. defer syncstatus.countstatus.mutex.Unlock()
  107. delete(syncstatus.countstatus.DoneCount, key)
  108. }
  109. func (syncstatus *SyncStatus) Load() error {
  110. syncstatus.countstatus.mutex.Lock()
  111. defer syncstatus.countstatus.mutex.Unlock()
  112. logger.Info("load progress from", syncstatus.filepath)
  113. syncstatusbs, e := mio.ReadFile(syncstatus.filepath)
  114. if e != nil && !os.IsNotExist(e) {
  115. return e
  116. }
  117. if len(syncstatusbs) > 0 {
  118. e = json.Unmarshal(syncstatusbs, syncstatus.countstatus)
  119. if e != nil {
  120. logger.Warn(e)
  121. }
  122. }
  123. for _, dc := range syncstatus.countstatus.DoneCount {
  124. dc.isrunning = make(chan bool, 1)
  125. dc.isrunning <- false
  126. }
  127. return nil
  128. }
  129. func (syncstatus *SyncStatus) WaitSaveDone() {
  130. syncstatus.waitdone <- 1
  131. syncstatus.rc.WaitDone()
  132. <-syncstatus.waitdone
  133. }
  134. func (syncstatus *SyncStatus) Save(key string, dc *DoneCount) (err error) {
  135. syncstatus.countstatus.mutex.Lock()
  136. syncstatus.countstatus.DoneCount[key] = dc
  137. syncstatus.countstatus.TotalUseTime = time.Since(syncstatus.starttime)
  138. outputloginfo := false
  139. if time.Since(dc.lastlogtime) > 5*time.Second {
  140. dc.lastlogtime = time.Now()
  141. outputloginfo = true
  142. }
  143. syncstatus.countstatus.mutex.Unlock()
  144. if outputloginfo {
  145. logger.Info("sync data:", key, "vtime:", dc.FromVtime, "~", dc.LastSyncVtime, "records:", dc.RecordsCount, "lastvtime:", dc.LastDataVtime)
  146. }
  147. syncstatus.rc.CallLast2Only(func() {
  148. if !syncstatus.lastsavetime.Equal(time.Time{}) {
  149. interval := 10 * time.Second
  150. realinterval := time.Since(syncstatus.lastsavetime)
  151. if realinterval < interval {
  152. t := time.NewTimer(interval - realinterval)
  153. select {
  154. case <-t.C:
  155. case v := <-syncstatus.waitdone:
  156. syncstatus.waitdone <- v
  157. }
  158. }
  159. }
  160. syncstatus.countstatus.mutex.RLock()
  161. syncstatusbs, e := json.MarshalIndent(syncstatus.countstatus, "", " ")
  162. syncstatus.countstatus.mutex.RUnlock()
  163. if e != nil {
  164. syncstatus.lasterror = e
  165. return
  166. }
  167. e = mio.WriteFile(syncstatus.filepath, syncstatusbs, true)
  168. if e != nil {
  169. syncstatus.lasterror = e
  170. return
  171. }
  172. syncstatus.lastsavetime = time.Now()
  173. // fmt.Println(syncstatus.lastsavetime)
  174. })
  175. return syncstatus.lasterror
  176. }