123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- package importer
- import (
- "encoding/json"
- "fmt"
- "io"
- "os"
- "path/filepath"
- "sync"
- "sync/atomic"
- "time"
- "git.wecise.com/wecise/cgimport/odbc"
- "git.wecise.com/wecise/cgimport/reader"
- "git.wecise.com/wecise/util/filewalker"
- "git.wecise.com/wecise/util/merrs"
- "git.wecise.com/wecise/util/rc"
- )
- var mcfg = odbc.Config
- var logger = odbc.Logger
- type Importer struct {
- datapath string
- parallel int
- importrc *rc.RoutinesController
- odbcimporter *ODBCImporter
- }
// ImportStatus is the per-file entry stored in CGIStatus.ImportStatus.
type ImportStatus struct {
	// RecordsCount is the count returned by ImportFile for the file.
	RecordsCount int64
}
- type CGIStatus struct {
- mutex sync.RWMutex
- TotalUseTime time.Duration
- ImportStatus map[string]*ImportStatus
- rc *rc.RoutinesController
- lasterror error
- lastsavetime time.Time
- waitdone chan any
- }
- var cgistatusfile = mcfg.GetString("cgi.statusfile", "/opt/matrix/var/cgimport.status.txt")
- func NewCGIStatus() *CGIStatus {
- return &CGIStatus{
- ImportStatus: map[string]*ImportStatus{},
- rc: rc.NewRoutinesController("", 1),
- waitdone: make(chan any, 1),
- }
- }
- func (cgistatus *CGIStatus) Load() error {
- cgistatusbs, e := os.ReadFile(cgistatusfile)
- if e != nil && !os.IsNotExist(e) {
- return e
- }
- if len(cgistatusbs) > 0 {
- e = json.Unmarshal(cgistatusbs, &cgistatus)
- if e != nil {
- logger.Warn(e)
- }
- }
- return nil
- }
- func (cgistatus *CGIStatus) WaitSaveDone() {
- cgistatus.waitdone <- 1
- cgistatus.rc.WaitDone()
- }
- func (cgistatus *CGIStatus) Save() (err error) {
- cgistatus.rc.CallLast2Only(func() {
- if !cgistatus.lastsavetime.Equal(time.Time{}) {
- interval := time.Since(cgistatus.lastsavetime)
- if interval < 1*time.Second {
- t := time.NewTimer(1*time.Second - interval)
- select {
- case <-t.C:
- case v := <-cgistatus.waitdone:
- cgistatus.waitdone <- v
- }
- }
- }
- cgistatus.mutex.RLock()
- cgistatusbs, e := json.MarshalIndent(cgistatus, "", " ")
- cgistatus.mutex.RUnlock()
- if e != nil {
- cgistatus.lasterror = e
- return
- }
- e = os.MkdirAll(filepath.Dir(cgistatusfile), os.ModePerm)
- if e != nil {
- cgistatus.lasterror = e
- return
- }
- e = os.WriteFile(cgistatusfile, cgistatusbs, os.ModePerm)
- if e != nil {
- cgistatus.lasterror = e
- return
- }
- cgistatus.lastsavetime = time.Now()
- // fmt.Println(cgistatus.lastsavetime)
- })
- return cgistatus.lasterror
- }
- func ImportDir(datapath string, parallel int) (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
- importer := &Importer{
- datapath: datapath,
- parallel: parallel,
- importrc: rc.NewRoutinesController("", 1000),
- odbcimporter: NewODBCImporter(),
- }
- return importer.Import()
- }
- func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
- var cgirc = rc.NewRoutinesController("", importer.parallel)
- var wg sync.WaitGroup
- fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
- if e != nil {
- err = e
- return
- }
- cgistatus := NewCGIStatus()
- reload := mcfg.GetString("reload")
- if reload == "" {
- e = cgistatus.Load()
- if e != nil {
- err = e
- return
- }
- }
- totalfilescount = int64(len(cgistatus.ImportStatus))
- for _, v := range cgistatus.ImportStatus {
- totalrecordscount += v.RecordsCount
- }
- totalusetime = cgistatus.TotalUseTime
- st := time.Now().Add(-totalusetime)
- cst := time.Now()
- // 遍历文件目录
- e = fw.List(func(basedir string, fpath string) bool {
- if err != nil {
- return false
- }
- filename := filepath.Join(basedir, fpath)
- wg.Add(1)
- cgirc.ConcurCall(1,
- func() {
- defer wg.Done()
- cgistatus.mutex.RLock()
- importstatus := cgistatus.ImportStatus[filename]
- cgistatus.mutex.RUnlock()
- if importstatus != nil {
- return
- }
- records, e := importer.ImportFile(filename)
- if e != nil {
- err = e
- return
- }
- atomic.AddInt64(&filescount, 1)
- atomic.AddInt64(&recordscount, records)
- atomic.AddInt64(&totalfilescount, 1)
- atomic.AddInt64(&totalrecordscount, records)
- usetime = time.Since(cst)
- totalusetime = time.Since(st)
- cgistatus.mutex.Lock()
- cgistatus.ImportStatus[filename] = &ImportStatus{RecordsCount: records}
- cgistatus.TotalUseTime = totalusetime
- cgistatus.mutex.Unlock()
- cgistatus.Save()
- },
- )
- return true
- })
- wg.Wait()
- if e != nil {
- err = e
- return
- }
- cgistatus.WaitSaveDone()
- return
- }
- func (importer *Importer) ImportFile(filepath string) (blockcount int64, err error) {
- f, e := os.Open(filepath)
- if e != nil {
- return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
- }
- defer f.Close()
- return importer.importReader(filepath, f)
- }
- func (importer *Importer) importReader(filename string, buf io.Reader) (blockcount int64, err error) {
- br, e := reader.NewBlockReader(filename, buf)
- if e != nil {
- return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
- }
- var wg sync.WaitGroup
- defer wg.Wait()
- for {
- if err != nil {
- break
- }
- block, linecount, e := br.ReadBlock()
- if e != nil {
- return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})
- }
- if block == nil {
- return
- }
- wg.Add(1)
- e = importer.importrc.ConcurCall(1, func() {
- defer wg.Done()
- e = importer.importRecord(block, filename, linecount)
- if e != nil {
- err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})
- return
- }
- atomic.AddInt64(&blockcount, 1)
- })
- if e != nil {
- return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})
- }
- }
- return
- }
- func (importer *Importer) importRecord(record map[string]any, filename string, linecount int) (err error) {
- bs, e := json.MarshalIndent(record, "", " ")
- if e != nil {
- return e
- }
- logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
- e = importer.odbcimporter.ReviseClassStruct(record)
- if e != nil {
- return e
- }
- e = importer.odbcimporter.InsertData(record)
- if e != nil {
- return e
- }
- return
- }
|