فهرست منبع

import continue

libf 5 ماه پیش
والد
کامیت
1ec821077e
4فایلهای تغییر یافته به همراه125 افزوده شده و 16 حذف شده
  1. 1 1
      go.mod
  2. 2 2
      go.sum
  3. 118 7
      importer/importer.go
  4. 4 6
      main.go

+ 1 - 1
go.mod

@@ -4,7 +4,7 @@ go 1.20
 
 require (
 	git.wecise.com/wecise/odb-go v0.0.0-20250123142240-7c4d1df627b4
-	git.wecise.com/wecise/util v0.0.0-20250204084114-10f3a56ec32b
+	git.wecise.com/wecise/util v0.0.0-20250204110134-ab7523fa652e
 	github.com/scylladb/go-set v1.0.3-0.20200225121959-cc7b2070d91e
 	github.com/spf13/cast v1.7.0
 )

+ 2 - 2
go.sum

@@ -1,7 +1,7 @@
 git.wecise.com/wecise/odb-go v0.0.0-20250123142240-7c4d1df627b4 h1:VUr6whmUPURppVSqLeDnHa5yk2n05FjrsonJTzd6Pt0=
 git.wecise.com/wecise/odb-go v0.0.0-20250123142240-7c4d1df627b4/go.mod h1:ZgFysxr5kKxDUrFcwCWJs5ntFsQtK0Q77o6ZU4bYQnM=
-git.wecise.com/wecise/util v0.0.0-20250204084114-10f3a56ec32b h1:GQ5IfKNb+XBziRCd0PmkBlteXf1upyp4sNhpAZS3UZE=
-git.wecise.com/wecise/util v0.0.0-20250204084114-10f3a56ec32b/go.mod h1:2YXWE9m5mNgAu40zpYrL3woGz6S8CoHAW/CJeWXaIko=
+git.wecise.com/wecise/util v0.0.0-20250204110134-ab7523fa652e h1:rQK0btS9RtVP6pAky3oaJ1yk42W8wn/nUHeSlDEPbOE=
+git.wecise.com/wecise/util v0.0.0-20250204110134-ab7523fa652e/go.mod h1:2YXWE9m5mNgAu40zpYrL3woGz6S8CoHAW/CJeWXaIko=
 github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw=
 github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0=
 github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=

+ 118 - 7
importer/importer.go

@@ -8,6 +8,7 @@ import (
 	"path/filepath"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"git.wecise.com/wecise/cgimport/odbc"
 	"git.wecise.com/wecise/cgimport/reader"
@@ -26,7 +27,86 @@ type Importer struct {
 	odbcimporter *ODBCImporter
 }
 
-func ImportDir(datapath string, parallel int) (filescount, recordscount int64, err error) {
+type ImportStatus struct {
+	RecordsCount int64
+}
+
+type CGIStatus struct {
+	mutex        sync.RWMutex
+	UseTime      time.Duration
+	ImportStatus map[string]*ImportStatus
+	rc           *rc.RoutinesController
+	lasterror    error
+	lastsavetime time.Time
+	waitdone     chan any
+}
+
+var cgistatusfile = mcfg.GetString("cgi.statusfile", "/opt/matrix/var/cgimport.status.txt")
+
+func NewCGIStatus() *CGIStatus {
+	return &CGIStatus{
+		ImportStatus: map[string]*ImportStatus{},
+		rc:           rc.NewRoutinesController("", 1),
+		waitdone:     make(chan any, 1),
+	}
+}
+
+func (cgistatus *CGIStatus) Load() error {
+	cgistatusbs, e := os.ReadFile(cgistatusfile)
+	if e != nil && !os.IsNotExist(e) {
+		return e
+	}
+	if len(cgistatusbs) > 0 {
+		e = json.Unmarshal(cgistatusbs, &cgistatus)
+		if e != nil {
+			logger.Warn(e)
+		}
+	}
+	return nil
+}
+
+func (cgistatus *CGIStatus) WaitSaveDone() {
+	cgistatus.waitdone <- 1
+	cgistatus.rc.WaitDone()
+}
+
+func (cgistatus *CGIStatus) Save() (err error) {
+	cgistatus.rc.CallLast2Only(func() {
+		if !cgistatus.lastsavetime.Equal(time.Time{}) {
+			interval := time.Since(cgistatus.lastsavetime)
+			if interval < 1*time.Second {
+				t := time.NewTimer(1*time.Second - interval)
+				select {
+				case <-t.C:
+				case v := <-cgistatus.waitdone:
+					cgistatus.waitdone <- v
+				}
+			}
+		}
+		cgistatus.mutex.RLock()
+		cgistatusbs, e := json.MarshalIndent(cgistatus, "", "  ")
+		cgistatus.mutex.RUnlock()
+		if e != nil {
+			cgistatus.lasterror = e
+			return
+		}
+		e = os.MkdirAll(filepath.Dir(cgistatusfile), os.ModePerm)
+		if e != nil {
+			cgistatus.lasterror = e
+			return
+		}
+		e = os.WriteFile(cgistatusfile, cgistatusbs, os.ModePerm)
+		if e != nil {
+			cgistatus.lasterror = e
+			return
+		}
+		cgistatus.lastsavetime = time.Now()
+		// fmt.Println(cgistatus.lastsavetime)
+	})
+	return cgistatus.lasterror
+}
+
+func ImportDir(datapath string, parallel int) (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
 	importer := &Importer{
 		datapath:     datapath,
 		parallel:     parallel,
@@ -36,8 +116,7 @@ func ImportDir(datapath string, parallel int) (filescount, recordscount int64, e
 	return importer.Import()
 }
 
-func (importer *Importer) Import() (filescount, recordscount int64, err error) {
-	// 遍历文件目录
+func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
 	var cgirc = rc.NewRoutinesController("", importer.parallel)
 	var wg sync.WaitGroup
 	fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
@@ -45,6 +124,22 @@ func (importer *Importer) Import() (filescount, recordscount int64, err error) {
 		err = e
 		return
 	}
+	cgistatus := NewCGIStatus()
+	reload := mcfg.GetString("reload")
+	if reload == "" {
+		e = cgistatus.Load()
+		if e != nil {
+			err = e
+			return
+		}
+	}
+	totalfilescount = int64(len(cgistatus.ImportStatus))
+	for _, v := range cgistatus.ImportStatus {
+		totalrecordscount += v.RecordsCount
+	}
+	st := time.Now().Add(-cgistatus.UseTime)
+	cst := time.Now()
+	// 遍历文件目录
 	e = fw.List(func(basedir string, fpath string) bool {
 		if err != nil {
 			return false
@@ -54,13 +149,28 @@ func (importer *Importer) Import() (filescount, recordscount int64, err error) {
 		cgirc.ConcurCall(1,
 			func() {
 				defer wg.Done()
+				cgistatus.mutex.RLock()
+				importstatus := cgistatus.ImportStatus[filename]
+				cgistatus.mutex.RUnlock()
+				if importstatus != nil {
+					return
+				}
 				records, e := importer.ImportFile(filename)
 				if e != nil {
 					err = e
 					return
 				}
 				atomic.AddInt64(&filescount, 1)
-				atomic.AddInt64(&recordscount, int64(records))
+				atomic.AddInt64(&recordscount, records)
+				atomic.AddInt64(&totalfilescount, 1)
+				atomic.AddInt64(&totalrecordscount, records)
+				usetime = time.Since(cst)
+				totalusetime = time.Since(st)
+				cgistatus.mutex.Lock()
+				cgistatus.ImportStatus[filename] = &ImportStatus{RecordsCount: records}
+				cgistatus.UseTime = totalusetime
+				cgistatus.mutex.Unlock()
+				cgistatus.Save()
 			},
 		)
 		return true
@@ -70,10 +180,11 @@ func (importer *Importer) Import() (filescount, recordscount int64, err error) {
 		err = e
 		return
 	}
+	cgistatus.WaitSaveDone()
 	return
 }
 
-func (importer *Importer) ImportFile(filepath string) (blockcount int, err error) {
+func (importer *Importer) ImportFile(filepath string) (blockcount int64, err error) {
 	f, e := os.Open(filepath)
 	if e != nil {
 		return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
@@ -82,7 +193,7 @@ func (importer *Importer) ImportFile(filepath string) (blockcount int, err error
 	return importer.importReader(filepath, f)
 }
 
-func (importer *Importer) importReader(filename string, buf io.Reader) (blockcount int, err error) {
+func (importer *Importer) importReader(filename string, buf io.Reader) (blockcount int64, err error) {
 	br, e := reader.NewBlockReader(filename, buf)
 	if e != nil {
 		return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
@@ -108,7 +219,7 @@ func (importer *Importer) importReader(filename string, buf io.Reader) (blockcou
 				err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})
 				return
 			}
-			blockcount++
+			atomic.AddInt64(&blockcount, 1)
 		})
 		if e != nil {
 			return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})

+ 4 - 6
main.go

@@ -2,7 +2,6 @@ package main
 
 import (
 	"fmt"
-	"time"
 
 	"git.wecise.com/wecise/cgimport/importer"
 	"git.wecise.com/wecise/cgimport/odbc"
@@ -18,23 +17,22 @@ var mcfg = odbc.Config
 var logger = odbc.Logger
 
 func main() {
-	// 开始计时
-	st := time.Now()
 	// 配置参数
 	// 文件目录
 	datapath := mcfg.GetString("datapath", "data")
 	// 并发数
 	parallel := mcfg.GetInt("parallel", 10)
 	// 导入
-	filescount, recordscount, e := importer.ImportDir(datapath, parallel)
+	totalfilescount, totalrecordscount, totalusetime, filescount, recordscount, usetime, e := importer.ImportDir(datapath, parallel)
 	if e != nil {
 		panic(e)
 	}
-	if filescount == 0 {
+	if totalfilescount == 0 {
 		fmt.Println(`not found data files in "` + datapath + `"`)
 		return
 	}
 	// 输出统计信息
-	fmt.Println("total import", filescount, "files", recordscount, "records", "in", time.Since(st))
+	fmt.Println("import", filescount, "files", recordscount, "records", "in", usetime)
+	fmt.Println("total import", totalfilescount, "files", totalrecordscount, "records", "in", totalusetime)
 	fmt.Println("access", odbc.LogFile, "for detail information")
 }