123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
- package main
- import (
- "bytes"
- "encoding/base64"
- "flag"
- "fmt"
- "io"
- "math"
- "math/rand"
- "os"
- "path"
- "path/filepath"
- "sync/atomic"
- "time"
- "git.wecise.com/wecise/common/cassandra"
- "git.wecise.com/wecise/common/dfs"
- "git.wecise.com/wecise/common/etcd"
- "github.com/wecisecode/util/logger"
- "gopkg.in/src-d/go-billy.v4"
- )
- // 一个持续并发文件读写验证的程序
- // 在指定目录下写入一些随机内容的文本文件
- // 写入完成后读取并验证写入文件内容的正确性
- // 验证正确后在指定的延时时间后将文件删除
- // 文件尺寸可以在指定的范围内随机生成
- // 所有可指定变量以命令行参数形式传入,并设有默认值
- // 每秒在控制台输出验证成功的文件计数信息
- // 验证内容不一致时保留文件不删除,提示验证失败的文件路径,程序结束。
- func init() {
- logger.SetFormat("yyyy-MM-dd HH:mm:ss [pid] [level] msg", "\n")
- logger.SetRollingFile("", "dfstest.log", -1, 1*1024*1024, math.MaxInt64, 1)
- }
- var mdfs billy.Filesystem
- func initDFS(keyspace string) error {
- cassandraOption := cassandra.Option{Keyspace: keyspace, DisableInitialHostLookup: os.Getenv("VERSION") == "dev"}
- ccli, err := cassandra.NewClientByOption(cassandraOption)
- if err != nil {
- return fmt.Errorf("cassandra.NewClient Error: %v", err)
- }
- ecli, err := etcd.Get()
- if err != nil {
- return fmt.Errorf("etcd.Get Error: %v", err)
- }
- mdfs = dfs.New(keyspace, ccli.Session, ecli)
- return nil
- }
- // func writefile(filePath string, content []byte) (err error) {
- // dir := path.Dir(filePath)
- // f, e := os.Stat(dir)
- // if os.IsNotExist(e) || !f.IsDir() {
- // if err = os.MkdirAll(dir, os.ModePerm); err != nil {
- // return
- // }
- // }
- // // 写入文件
- // err = ioutil.WriteFile(filePath, content, 0777)
- // return
- // }
- // func readfile(filePath string) ([]byte, error) {
- // return ioutil.ReadFile(filePath)
- // }
- // func deletefile(filePath string) error {
- // return os.Remove(filePath)
- // }
- func writefile(filePath string, content []byte) (err error) {
- // defer func() {
- // x := recover()
- // if x != nil {
- // if e, ok := x.(error); ok {
- // err = e
- // } else {
- // err = fmt.Errorf("writefile error, %v", x)
- // }
- // }
- // }()
- dir := path.Dir(filePath)
- fi, e := mdfs.Stat(dir)
- if os.IsNotExist(e) || !fi.IsDir() {
- if err := mdfs.MkdirAll(dir, os.ModePerm); err != nil {
- return err
- }
- }
- // 写入文件
- f, err := mdfs.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.ModePerm)
- if err != nil {
- return err
- }
- _, err = f.Write(content)
- if err1 := f.Close(); err1 != nil && err == nil {
- err = err1
- }
- return err
- }
- func readfile(filePath string) (bs []byte, err error) {
- // defer func() {
- // x := recover()
- // if x != nil {
- // if e, ok := x.(error); ok {
- // err = e
- // } else {
- // err = fmt.Errorf("readfile error, %v", x)
- // }
- // }
- // }()
- var size int
- if info, err := mdfs.Stat(filePath); err == nil {
- size64 := info.Size()
- if int64(int(size64)) == size64 {
- size = int(size64)
- }
- }
- f, err := mdfs.Open(filePath)
- if err != nil {
- return nil, err
- }
- defer f.Close()
- size++ // one byte for final read at EOF
- // If a file claims a small size, read at least 512 bytes.
- // In particular, files in Linux's /proc claim size 0 but
- // then do not work right if read in small pieces,
- // so an initial read of 1 byte would not work correctly.
- if size < 512 {
- size = 512
- }
- data := make([]byte, 0, size)
- for {
- if len(data) >= cap(data) {
- d := append(data[:cap(data)], 0)
- data = d[:len(data)]
- }
- n, err := f.Read(data[len(data):cap(data)])
- data = data[:len(data)+n]
- if err != nil {
- if err == io.EOF {
- err = nil
- }
- return data, err
- }
- }
- }
- func deletefile(filePath string) (err error) {
- // defer func() {
- // x := recover()
- // if x != nil {
- // if e, ok := x.(error); ok {
- // err = e
- // } else {
- // err = fmt.Errorf("deletefile error, %v", x)
- // }
- // }
- // }()
- return mdfs.Remove(filePath)
- }
- func main() {
- // 设置命令行参数
- keyspace := flag.String("keyspace", "ootest", "keyspace")
- rootPath := flag.String("root-path", "/opt/dfstest", "文件目录")
- delay := flag.Duration("delay", 30*time.Second, "删除延迟时间")
- concurLimit := flag.Int("concur-limit", 1, "最大并发数")
- fileSizeMin := flag.Int("file-size-min", 1024, "最小文件尺寸")
- fileSizeMax := flag.Int("file-size-max", 10240, "最大文件尺寸")
- flag.Parse()
- // mutex := sync.Mutex{}
- // errorcount := map[string]int{}
- // 初始化随机数生成器
- rand.Seed(time.Now().UnixNano())
- if err := initDFS(*keyspace); err != nil {
- logger.Error(err)
- return
- }
- type fileinfo struct {
- filepath string
- filesize int
- checksum uint64
- content string
- }
- checkChan := make(chan *fileinfo, *concurLimit)
- logger.Info("开始验证")
- // 并发写入文件
- go func() {
- writeChan := make(chan struct{}, *concurLimit)
- for i := 0; ; i++ {
- // 并发控制
- writeChan <- struct{}{}
- go func(i int) {
- defer func() {
- <-writeChan
- }()
- // 随机生成文件名和文件内容
- dirName := fmt.Sprintf("%02d", rand.Intn(100))
- fileName := fmt.Sprintf("%03d", rand.Intn(1000)) + ".txt"
- filePath := filepath.Join(*rootPath, dirName, fileName)
- fileBinarySize := (rand.Intn(*fileSizeMax-*fileSizeMin) + *fileSizeMin) / 4 * 3 // 二进制长度
- // 随机生成文件名和文件内容
- bs := make([]byte, fileBinarySize)
- rand.Read(bs)
- content := base64.RawURLEncoding.EncodeToString(bs)
- go func() {
- // 写入文件
- if err := writefile(filePath, []byte(content)); err != nil {
- logger.Error("写入文件", filePath, "失败:", err)
- return
- }
- }()
- // 写入完成
- checkChan <- &fileinfo{
- filepath: filePath,
- filesize: len(content),
- content: content,
- }
- }(i)
- }
- }()
- var successCount int32
- var failedCount int32
- // 并发验证和删除文件
- go func() {
- for fi := range checkChan {
- go func(fi *fileinfo) {
- // 读取文件
- var content []byte
- var err error
- var t = time.Now()
- for (err == nil || len(content) == 0) && time.Since(t) < 60*time.Second {
- content, err = readfile(fi.filepath)
- if err != nil {
- if !os.IsNotExist(err) {
- logger.Error("读取文件", fi.filepath, "失败:", err)
- return
- }
- time.Sleep(10 * time.Millisecond)
- }
- }
- // 验证文件内容是否正确
- expectedSize := fi.filesize
- if len(content) != expectedSize || !bytes.Equal(content, []byte(fi.content)) {
- // logger.Error("验证失败", fi.filepath, fmt.Sprintf("文件内容与期望值不一致,写入内容为:\n%s\n读出内容为:\n%s", fi.content, string(content)))
- // os.Exit(1)
- atomic.AddInt32(&failedCount, 1)
- return
- }
- // 验证成功,统计计数
- atomic.AddInt32(&successCount, 1)
- // 延迟删除文件
- time.AfterFunc(*delay, func() {
- if err := deletefile(fi.filepath); err != nil {
- logger.Error("删除文件", fi.filepath, "失败:", err)
- }
- })
- }(fi)
- }
- }()
- t := time.NewTicker(2 * time.Second)
- for {
- select {
- case <-t.C:
- // 输出计数信息
- logger.Info("读写一致验证成功数:", successCount, "失败数", failedCount)
- }
- }
- }
|