libf 5 ماه پیش
والد
کامیت
39b757833b
2فایلهای تغییر یافته به همراه55 افزوده شده و 42 حذف شده
  1. 43 1
      cgf/cgf.go
  2. 12 41
      main.go

+ 43 - 1
cgf/cgf.go

@@ -5,11 +5,53 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"path/filepath"
+	"sync"
+	"sync/atomic"
 
 	"git.wecise.com/wecise/cgimport/cgf/reader"
+	"git.wecise.com/wecise/util/filewalker"
+	"git.wecise.com/wecise/util/rc"
 )
 
-func Import(filepath string) (blockcount int, err error) {
+func ImportDir(datapath string, parallel int) (filescount, recordscount int64, err error) {
+	// 遍历文件目录
+	var cgirc = rc.NewRoutinesController("", parallel)
+	var wg sync.WaitGroup
+	fw, e := filewalker.NewFileWalker([]string{datapath}, ".*")
+	if e != nil {
+		err = e
+		return
+	}
+	e = fw.List(func(basedir string, fpath string) bool {
+		if err != nil {
+			return false
+		}
+		filename := filepath.Join(basedir, fpath)
+		wg.Add(1)
+		cgirc.ConcurCall(1,
+			func() {
+				defer wg.Done()
+				records, e := ImportFile(filename)
+				if e != nil {
+					err = e
+					return
+				}
+				atomic.AddInt64(&filescount, 1)
+				atomic.AddInt64(&recordscount, int64(records))
+			},
+		)
+		return true
+	})
+	wg.Wait()
+	if e != nil {
+		err = e
+		return
+	}
+	return
+}
+
+func ImportFile(filepath string) (blockcount int, err error) {
 	f, e := os.Open(filepath)
 	if e != nil {
 		return blockcount, e

+ 12 - 41
main.go

@@ -2,15 +2,11 @@ package main
 
 import (
 	"fmt"
-	"path/filepath"
-	"sync"
-	"sync/atomic"
+	"os"
 	"time"
 
 	"git.wecise.com/wecise/cgimport/cgf"
 	"git.wecise.com/wecise/util/cfg"
-	"git.wecise.com/wecise/util/filewalker"
-	"git.wecise.com/wecise/util/rc"
 )
 
 // 获取配置信息
@@ -22,49 +18,24 @@ import (
 var mcfg = cfg.MConfig()
 
 func main() {
+	// 开始计时
 	st := time.Now()
-	filescount := int64(0)
-	recordscount := int64(0)
-
+	// 配置参数
 	// 文件目录
-	datapath := mcfg.GetStrings("datapath", "data")
+	datapath := mcfg.GetString("datapath", "data")
 	// 并发数
 	parallel := mcfg.GetInt("parallel", 10)
-
-	// 遍历文件目录
-	var cgirc = rc.NewRoutinesController("", parallel)
-	var wg sync.WaitGroup
-	var err error
-	fw, e := filewalker.NewFileWalker(datapath, ".*")
-	if e != nil {
-		panic(e)
+	// 检查参数,文件目录
+	f, _ := os.Stat(datapath)
+	if f == nil {
+		fmt.Println(`put all data files in directory "` + datapath + `"`)
+		return
 	}
-	e = fw.List(func(basedir string, fpath string) bool {
-		if err != nil {
-			return false
-		}
-		filename := filepath.Join(basedir, fpath)
-		wg.Add(1)
-		cgirc.ConcurCall(1,
-			func() {
-				defer wg.Done()
-				records, e := cgf.Import(filename)
-				if e != nil {
-					err = e
-					return
-				}
-				atomic.AddInt64(&filescount, 1)
-				atomic.AddInt64(&recordscount, int64(records))
-			},
-		)
-		return true
-	})
-	wg.Wait()
+	// 导入
+	filescount, recordscount, e := cgf.ImportDir(datapath, parallel)
 	if e != nil {
 		panic(e)
 	}
-	if err != nil {
-		panic(err)
-	}
+	// 输出统计信息
 	fmt.Println("total import", filescount, "files", recordscount, "records", "in", time.Since(st))
 }