| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- package datasync
- import (
- "encoding/json"
- "os"
- "path/filepath"
- "strings"
- "sync"
- "time"
- "gitee.com/wecisecode/util/mio"
- "gitee.com/wecisecode/util/rc"
- )
// DoneCount is the progress record kept for one sync unit (one key of
// CountStatus.DoneCount). The exported fields are written to the status file
// as JSON and are omitted when they hold their zero value; the unexported
// fields are runtime-only state.
type DoneCount struct {
	// Bucket* identify what this record refers to.
	// NOTE(review): presumably class / field / object id of a bucket; confirm with the producer.
	BucketClass string `json:",omitempty"`
	BucketField string `json:",omitempty"`
	BucketObjID string `json:",omitempty"`

	// RecordsCount is the number of records counted for this unit.
	RecordsCount int64 `json:",omitempty"`

	// FromVtime and LastSyncVtime are logged by Save as the "vtime" range
	// (from ~ last sync); LastDataVtime is logged as "lastvtime".
	// NOTE(review): exact vtime format is not visible here.
	FromVtime     string `json:",omitempty"`
	LastDataVtime string `json:",omitempty"`
	LastSyncVtime string `json:",omitempty"`

	// RetryCount is a retry counter; it is not modified in this file section.
	RetryCount int64 `json:",omitempty"`

	// lastlogtime is when Save last emitted a progress log line for this record.
	lastlogtime time.Time `json:"-"`

	// isrunning is a one-slot channel that is created holding a single false
	// value. NOTE(review): presumably a run-state token for code outside
	// this view; confirm.
	isrunning chan bool `json:"-"`
}
- type CountStatus struct {
- TotalUseTime time.Duration `json:",omitempty"`
- DoneCount map[string]*DoneCount `json:",omitempty"`
- mutex sync.RWMutex `json:"-"`
- }
- type SyncStatus struct {
- filepath string
- //
- starttime time.Time
- countstatus *CountStatus
- //
- rc *rc.RoutinesController
- lasterror error
- lastsavetime time.Time
- waitdone chan any
- }
- func NewSyncStatus(key string) *SyncStatus {
- datadir := mcfg.GetString("datasync.data.dir", "/opt/matrix/var/datasync")
- statusfile := mcfg.GetString("datasync.statusfile", key+".status.txt")
- if !strings.HasPrefix(statusfile, "/") {
- statusfile = filepath.Join(datadir, statusfile)
- }
- return &SyncStatus{
- filepath: statusfile,
- countstatus: &CountStatus{
- DoneCount: map[string]*DoneCount{},
- },
- rc: rc.NewRoutinesController("", 1),
- waitdone: make(chan any, 1),
- }
- }
- func (syncstatus *SyncStatus) Resume() {
- syncstatus.countstatus.mutex.Lock()
- defer syncstatus.countstatus.mutex.Unlock()
- syncstatus.starttime = time.Now().Add(-syncstatus.countstatus.TotalUseTime)
- }
- func (syncstatus *SyncStatus) TotalUseTime() time.Duration {
- syncstatus.countstatus.mutex.RLock()
- defer syncstatus.countstatus.mutex.RUnlock()
- return syncstatus.countstatus.TotalUseTime
- }
- func (syncstatus *SyncStatus) TotalChunks() int {
- syncstatus.countstatus.mutex.RLock()
- defer syncstatus.countstatus.mutex.RUnlock()
- return len(syncstatus.countstatus.DoneCount)
- }
- func (syncstatus *SyncStatus) TotalRecords() int {
- syncstatus.countstatus.mutex.RLock()
- defer syncstatus.countstatus.mutex.RUnlock()
- n := 0
- for _, v := range syncstatus.countstatus.DoneCount {
- n += int(v.RecordsCount)
- }
- return n
- }
- func (syncstatus *SyncStatus) DoneCountCopy(filter func(k string, v *DoneCount) bool) map[string]*DoneCount {
- syncstatus.countstatus.mutex.RLock()
- defer syncstatus.countstatus.mutex.RUnlock()
- m := map[string]*DoneCount{}
- for k, v := range syncstatus.countstatus.DoneCount {
- if filter(k, v) {
- m[k] = v
- }
- }
- return m
- }
- func (syncstatus *SyncStatus) DoneCount(key string) *DoneCount {
- syncstatus.countstatus.mutex.Lock()
- defer syncstatus.countstatus.mutex.Unlock()
- dc := syncstatus.countstatus.DoneCount[key]
- if dc == nil {
- dc = &DoneCount{}
- syncstatus.countstatus.DoneCount[key] = dc
- }
- if dc.isrunning == nil {
- dc.isrunning = make(chan bool, 1)
- dc.isrunning <- false
- }
- return dc
- }
- func (syncstatus *SyncStatus) RemoveDoneCount(key string) {
- syncstatus.countstatus.mutex.Lock()
- defer syncstatus.countstatus.mutex.Unlock()
- delete(syncstatus.countstatus.DoneCount, key)
- }
- func (syncstatus *SyncStatus) Load() error {
- syncstatus.countstatus.mutex.Lock()
- defer syncstatus.countstatus.mutex.Unlock()
- logger.Info("load progress from", syncstatus.filepath)
- syncstatusbs, e := mio.ReadFile(syncstatus.filepath)
- if e != nil && !os.IsNotExist(e) {
- return e
- }
- if len(syncstatusbs) > 0 {
- e = json.Unmarshal(syncstatusbs, syncstatus.countstatus)
- if e != nil {
- logger.Warn(e)
- }
- }
- for _, dc := range syncstatus.countstatus.DoneCount {
- dc.isrunning = make(chan bool, 1)
- dc.isrunning <- false
- }
- return nil
- }
- func (syncstatus *SyncStatus) WaitSaveDone() {
- syncstatus.waitdone <- 1
- syncstatus.rc.WaitDone()
- <-syncstatus.waitdone
- }
- func (syncstatus *SyncStatus) Save(key string, dc *DoneCount) (err error) {
- syncstatus.countstatus.mutex.Lock()
- syncstatus.countstatus.DoneCount[key] = dc
- syncstatus.countstatus.TotalUseTime = time.Since(syncstatus.starttime)
- outputloginfo := false
- if time.Since(dc.lastlogtime) > 5*time.Second {
- dc.lastlogtime = time.Now()
- outputloginfo = true
- }
- syncstatus.countstatus.mutex.Unlock()
- if outputloginfo {
- logger.Info("sync data:", key, "vtime:", dc.FromVtime, "~", dc.LastSyncVtime, "records:", dc.RecordsCount, "lastvtime:", dc.LastDataVtime)
- }
- syncstatus.rc.CallLast2Only(func() {
- if !syncstatus.lastsavetime.Equal(time.Time{}) {
- interval := 10 * time.Second
- realinterval := time.Since(syncstatus.lastsavetime)
- if realinterval < interval {
- t := time.NewTimer(interval - realinterval)
- select {
- case <-t.C:
- case v := <-syncstatus.waitdone:
- syncstatus.waitdone <- v
- }
- }
- }
- syncstatus.countstatus.mutex.RLock()
- syncstatusbs, e := json.MarshalIndent(syncstatus.countstatus, "", " ")
- syncstatus.countstatus.mutex.RUnlock()
- if e != nil {
- syncstatus.lasterror = e
- return
- }
- e = mio.WriteFile(syncstatus.filepath, syncstatusbs, true)
- if e != nil {
- syncstatus.lasterror = e
- return
- }
- syncstatus.lastsavetime = time.Now()
- // fmt.Println(syncstatus.lastsavetime)
- })
- return syncstatus.lasterror
- }
|