package importer import ( "encoding/json" "os" "sync" "time" "git.wecise.com/wecise/cgimport/odbc" "github.com/wecisecode/util/mio" "github.com/wecisecode/util/rc" ) type ImportStatus struct { LinesCount int64 RecordsCount int64 RetryCount int64 } type CGIStatus struct { filepath string // TotalUseTime time.Duration ImportStatus map[string]*ImportStatus // mutex sync.RWMutex rc *rc.RoutinesController lasterror error lastsavetime time.Time waitdone chan any } func NewCGIStatus() *CGIStatus { return &CGIStatus{ filepath: mcfg.GetString("cgi.statusfile", "/opt/matrix/var/cgimport/"+odbc.Keyspace()+".status.txt"), ImportStatus: map[string]*ImportStatus{}, rc: rc.NewRoutinesController("", 1), waitdone: make(chan any, 1), } } func (cgistatus *CGIStatus) Load() error { logger.Info("load progress from", cgistatus.filepath) cgistatusbs, e := mio.ReadFile(cgistatus.filepath) if e != nil && !os.IsNotExist(e) { return e } if len(cgistatusbs) > 0 { e = json.Unmarshal(cgistatusbs, &cgistatus) if e != nil { logger.Warn(e) } } return nil } func (cgistatus *CGIStatus) WaitSaveDone() { cgistatus.waitdone <- 1 cgistatus.rc.WaitDone() } func (cgistatus *CGIStatus) Save() (err error) { cgistatus.rc.CallLast2Only(func() { if !cgistatus.lastsavetime.Equal(time.Time{}) { interval := 10 * time.Second realinterval := time.Since(cgistatus.lastsavetime) if realinterval < interval { t := time.NewTimer(interval - realinterval) select { case <-t.C: case v := <-cgistatus.waitdone: cgistatus.waitdone <- v } } } cgistatus.mutex.RLock() cgistatusbs, e := json.MarshalIndent(cgistatus, "", " ") cgistatus.mutex.RUnlock() if e != nil { cgistatus.lasterror = e return } e = mio.WriteFile(cgistatus.filepath, cgistatusbs, true) if e != nil { cgistatus.lasterror = e return } cgistatus.lastsavetime = time.Now() // fmt.Println(cgistatus.lastsavetime) }) return cgistatus.lasterror }