package main import ( "bytes" "encoding/base64" "flag" "fmt" "io" "math" "math/rand" "os" "path" "path/filepath" "sync/atomic" "time" "git.wecise.com/wecise/common/cassandra" "git.wecise.com/wecise/common/dfs" "git.wecise.com/wecise/common/etcd" "git.wecise.com/wecise/common/matrix/logger" "gopkg.in/src-d/go-billy.v4" ) // 一个持续并发文件读写验证的程序 // 在指定目录下写入一些随机内容的文本文件 // 写入完成后读取并验证写入文件内容的正确性 // 验证正确后在指定的延时时间后将文件删除 // 文件尺寸可以在指定的范围内随机生成 // 所有可指定变量以命令行参数形式传入,并设有默认值 // 每秒在控制台输出验证成功的文件计数信息 // 验证内容不一致时保留文件不删除,提示验证失败的文件路径,程序结束。 func init() { logger.SetFormat("yyyy-MM-dd HH:mm:ss [pid] [level] msg", "\n") logger.SetRollingFile("", "dfstest.log", -1, 1*1024*1024, math.MaxInt64, 1) } var mdfs billy.Filesystem func initDFS(keyspace string) error { cassandraOption := cassandra.Option{Keyspace: keyspace, DisableInitialHostLookup: os.Getenv("VERSION") == "dev"} ccli, err := cassandra.GetClientByOption(cassandraOption) if err != nil { return fmt.Errorf("cassandra.GetClient Error: %v", err) } ecli, err := etcd.Get() if err != nil { return fmt.Errorf("etcd.Get Error: %v", err) } mdfs = dfs.New(keyspace, ccli.Session, ecli) return nil } // func writefile(filePath string, content []byte) (err error) { // dir := path.Dir(filePath) // f, e := os.Stat(dir) // if os.IsNotExist(e) || !f.IsDir() { // if err = os.MkdirAll(dir, os.ModePerm); err != nil { // return // } // } // // 写入文件 // err = ioutil.WriteFile(filePath, content, 0777) // return // } // func readfile(filePath string) ([]byte, error) { // return ioutil.ReadFile(filePath) // } // func deletefile(filePath string) error { // return os.Remove(filePath) // } func writefile(filePath string, content []byte) (err error) { // defer func() { // x := recover() // if x != nil { // if e, ok := x.(error); ok { // err = e // } else { // err = fmt.Errorf("writefile error, %v", x) // } // } // }() dir := path.Dir(filePath) fi, e := mdfs.Stat(dir) if os.IsNotExist(e) || !fi.IsDir() { if err := mdfs.MkdirAll(dir, os.ModePerm); err != nil { return err } } // 写入文件 f, err := mdfs.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.ModePerm) if err != nil { return err } _, err = f.Write(content) if err1 := f.Close(); err1 != nil && err == nil { err = err1 } return err } func readfile(filePath string) (bs []byte, err error) { // defer func() { // x := recover() // if x != nil { // if e, ok := x.(error); ok { // err = e // } else { // err = fmt.Errorf("readfile error, %v", x) // } // } // }() var size int if info, err := mdfs.Stat(filePath); err == nil { size64 := info.Size() if int64(int(size64)) == size64 { size = int(size64) } } f, err := mdfs.Open(filePath) if err != nil { return nil, err } defer f.Close() size++ // one byte for final read at EOF // If a file claims a small size, read at least 512 bytes. // In particular, files in Linux's /proc claim size 0 but // then do not work right if read in small pieces, // so an initial read of 1 byte would not work correctly. if size < 512 { size = 512 } data := make([]byte, 0, size) for { if len(data) >= cap(data) { d := append(data[:cap(data)], 0) data = d[:len(data)] } n, err := f.Read(data[len(data):cap(data)]) data = data[:len(data)+n] if err != nil { if err == io.EOF { err = nil } return data, err } } } func deletefile(filePath string) (err error) { // defer func() { // x := recover() // if x != nil { // if e, ok := x.(error); ok { // err = e // } else { // err = fmt.Errorf("deletefile error, %v", x) // } // } // }() return mdfs.Remove(filePath) } func main() { // 设置命令行参数 keyspace := flag.String("keyspace", "ootest", "keyspace") rootPath := flag.String("root-path", "/opt/dfstest", "文件目录") delay := flag.Duration("delay", 30*time.Second, "删除延迟时间") concurLimit := flag.Int("concur-limit", 1, "最大并发数") fileSizeMin := flag.Int("file-size-min", 1024, "最小文件尺寸") fileSizeMax := flag.Int("file-size-max", 10240, "最大文件尺寸") flag.Parse() // mutex := sync.Mutex{} // errorcount := map[string]int{} // 初始化随机数生成器 rand.Seed(time.Now().UnixNano()) if err := initDFS(*keyspace); err != nil { logger.Error(err) return } type fileinfo struct { filepath string filesize int checksum uint64 content string } checkChan := make(chan *fileinfo, *concurLimit) logger.Info("开始验证") // 并发写入文件 go func() { writeChan := make(chan struct{}, *concurLimit) for i := 0; ; i++ { // 并发控制 writeChan <- struct{}{} go func(i int) { defer func() { <-writeChan }() // 随机生成文件名和文件内容 dirName := fmt.Sprintf("%02d", rand.Intn(100)) fileName := fmt.Sprintf("%03d", rand.Intn(1000)) + ".txt" filePath := filepath.Join(*rootPath, dirName, fileName) fileBinarySize := (rand.Intn(*fileSizeMax-*fileSizeMin) + *fileSizeMin) / 4 * 3 // 二进制长度 // 随机生成文件名和文件内容 bs := make([]byte, fileBinarySize) rand.Read(bs) content := base64.RawURLEncoding.EncodeToString(bs) go func() { // 写入文件 if err := writefile(filePath, []byte(content)); err != nil { logger.Error("写入文件", filePath, "失败:", err) return } }() // 写入完成 checkChan <- &fileinfo{ filepath: filePath, filesize: len(content), content: content, } }(i) } }() var successCount int32 var failedCount int32 // 并发验证和删除文件 go func() { for fi := range checkChan { go func(fi *fileinfo) { // 读取文件 var content []byte var err error var t = time.Now() for (err == nil || len(content) == 0) && time.Since(t) < 60*time.Second { content, err = readfile(fi.filepath) if err != nil { if !os.IsNotExist(err) { logger.Error("读取文件", fi.filepath, "失败:", err) return } time.Sleep(10 * time.Millisecond) } } // 验证文件内容是否正确 expectedSize := fi.filesize if len(content) != expectedSize || !bytes.Equal(content, []byte(fi.content)) { // logger.Error("验证失败", fi.filepath, fmt.Sprintf("文件内容与期望值不一致,写入内容为:\n%s\n读出内容为:\n%s", fi.content, string(content))) // os.Exit(1) atomic.AddInt32(&failedCount, 1) return } // 验证成功,统计计数 atomic.AddInt32(&successCount, 1) // 延迟删除文件 time.AfterFunc(*delay, func() { if err := deletefile(fi.filepath); err != nil { logger.Error("删除文件", fi.filepath, "失败:", err) } }) }(fi) } }() t := time.NewTicker(2 * time.Second) for { select { case <-t.C: // 输出计数信息 logger.Info("读写一致验证成功数:", successCount, "失败数", failedCount) } } }