dfstest.go 7.2 KB


  1. package main
  2. import (
  3. "bytes"
  4. "encoding/base64"
  5. "flag"
  6. "fmt"
  7. "io"
  8. "math"
  9. "math/rand"
  10. "os"
  11. "path"
  12. "path/filepath"
  13. "sync/atomic"
  14. "time"
  15. "git.wecise.com/wecise/common/cassandra"
  16. "git.wecise.com/wecise/common/dfs"
  17. "git.wecise.com/wecise/common/etcd"
  18. "git.wecise.com/wecise/common/matrix/logger"
  19. "gopkg.in/src-d/go-billy.v4"
  20. )
  21. // 一个持续并发文件读写验证的程序
  22. // 在指定目录下写入一些随机内容的文本文件
  23. // 写入完成后读取并验证写入文件内容的正确性
  24. // 验证正确后在指定的延时时间后将文件删除
  25. // 文件尺寸可以在指定的范围内随机生成
  26. // 所有可指定变量以命令行参数形式传入,并设有默认值
  27. // 每秒在控制台输出验证成功的文件计数信息
  28. // 验证内容不一致时保留文件不删除,提示验证失败的文件路径,程序结束。
  29. func init() {
  30. logger.SetFormat("yyyy-MM-dd HH:mm:ss [pid] [level] msg", "\n")
  31. logger.SetRollingFile("", "dfstest.log", -1, 1*1024*1024, math.MaxInt64, 1)
  32. }
  33. var mdfs billy.Filesystem
  34. func initDFS(keyspace string) error {
  35. cassandraOption := cassandra.Option{Keyspace: keyspace, DisableInitialHostLookup: os.Getenv("VERSION") == "dev"}
  36. ccli, err := cassandra.NewClientByOption(cassandraOption)
  37. if err != nil {
  38. return fmt.Errorf("cassandra.NewClient Error: %v", err)
  39. }
  40. ecli, err := etcd.Get()
  41. if err != nil {
  42. return fmt.Errorf("etcd.Get Error: %v", err)
  43. }
  44. mdfs = dfs.New(keyspace, ccli.Session, ecli)
  45. return nil
  46. }
  47. // func writefile(filePath string, content []byte) (err error) {
  48. // dir := path.Dir(filePath)
  49. // f, e := os.Stat(dir)
  50. // if os.IsNotExist(e) || !f.IsDir() {
  51. // if err = os.MkdirAll(dir, os.ModePerm); err != nil {
  52. // return
  53. // }
  54. // }
  55. // // 写入文件
  56. // err = ioutil.WriteFile(filePath, content, 0777)
  57. // return
  58. // }
  59. // func readfile(filePath string) ([]byte, error) {
  60. // return ioutil.ReadFile(filePath)
  61. // }
  62. // func deletefile(filePath string) error {
  63. // return os.Remove(filePath)
  64. // }
  65. func writefile(filePath string, content []byte) (err error) {
  66. // defer func() {
  67. // x := recover()
  68. // if x != nil {
  69. // if e, ok := x.(error); ok {
  70. // err = e
  71. // } else {
  72. // err = fmt.Errorf("writefile error, %v", x)
  73. // }
  74. // }
  75. // }()
  76. dir := path.Dir(filePath)
  77. fi, e := mdfs.Stat(dir)
  78. if os.IsNotExist(e) || !fi.IsDir() {
  79. if err := mdfs.MkdirAll(dir, os.ModePerm); err != nil {
  80. return err
  81. }
  82. }
  83. // 写入文件
  84. f, err := mdfs.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.ModePerm)
  85. if err != nil {
  86. return err
  87. }
  88. _, err = f.Write(content)
  89. if err1 := f.Close(); err1 != nil && err == nil {
  90. err = err1
  91. }
  92. return err
  93. }
  94. func readfile(filePath string) (bs []byte, err error) {
  95. // defer func() {
  96. // x := recover()
  97. // if x != nil {
  98. // if e, ok := x.(error); ok {
  99. // err = e
  100. // } else {
  101. // err = fmt.Errorf("readfile error, %v", x)
  102. // }
  103. // }
  104. // }()
  105. var size int
  106. if info, err := mdfs.Stat(filePath); err == nil {
  107. size64 := info.Size()
  108. if int64(int(size64)) == size64 {
  109. size = int(size64)
  110. }
  111. }
  112. f, err := mdfs.Open(filePath)
  113. if err != nil {
  114. return nil, err
  115. }
  116. defer f.Close()
  117. size++ // one byte for final read at EOF
  118. // If a file claims a small size, read at least 512 bytes.
  119. // In particular, files in Linux's /proc claim size 0 but
  120. // then do not work right if read in small pieces,
  121. // so an initial read of 1 byte would not work correctly.
  122. if size < 512 {
  123. size = 512
  124. }
  125. data := make([]byte, 0, size)
  126. for {
  127. if len(data) >= cap(data) {
  128. d := append(data[:cap(data)], 0)
  129. data = d[:len(data)]
  130. }
  131. n, err := f.Read(data[len(data):cap(data)])
  132. data = data[:len(data)+n]
  133. if err != nil {
  134. if err == io.EOF {
  135. err = nil
  136. }
  137. return data, err
  138. }
  139. }
  140. }
  141. func deletefile(filePath string) (err error) {
  142. // defer func() {
  143. // x := recover()
  144. // if x != nil {
  145. // if e, ok := x.(error); ok {
  146. // err = e
  147. // } else {
  148. // err = fmt.Errorf("deletefile error, %v", x)
  149. // }
  150. // }
  151. // }()
  152. return mdfs.Remove(filePath)
  153. }
  154. func main() {
  155. // 设置命令行参数
  156. keyspace := flag.String("keyspace", "ootest", "keyspace")
  157. rootPath := flag.String("root-path", "/opt/dfstest", "文件目录")
  158. delay := flag.Duration("delay", 30*time.Second, "删除延迟时间")
  159. concurLimit := flag.Int("concur-limit", 1, "最大并发数")
  160. fileSizeMin := flag.Int("file-size-min", 1024, "最小文件尺寸")
  161. fileSizeMax := flag.Int("file-size-max", 10240, "最大文件尺寸")
  162. flag.Parse()
  163. // mutex := sync.Mutex{}
  164. // errorcount := map[string]int{}
  165. // 初始化随机数生成器
  166. rand.Seed(time.Now().UnixNano())
  167. if err := initDFS(*keyspace); err != nil {
  168. logger.Error(err)
  169. return
  170. }
  171. type fileinfo struct {
  172. filepath string
  173. filesize int
  174. checksum uint64
  175. content string
  176. }
  177. checkChan := make(chan *fileinfo, *concurLimit)
  178. logger.Info("开始验证")
  179. // 并发写入文件
  180. go func() {
  181. writeChan := make(chan struct{}, *concurLimit)
  182. for i := 0; ; i++ {
  183. // 并发控制
  184. writeChan <- struct{}{}
  185. go func(i int) {
  186. defer func() {
  187. <-writeChan
  188. }()
  189. // 随机生成文件名和文件内容
  190. dirName := fmt.Sprintf("%02d", rand.Intn(100))
  191. fileName := fmt.Sprintf("%03d", rand.Intn(1000)) + ".txt"
  192. filePath := filepath.Join(*rootPath, dirName, fileName)
  193. fileBinarySize := (rand.Intn(*fileSizeMax-*fileSizeMin) + *fileSizeMin) / 4 * 3 // 二进制长度
  194. // 随机生成文件名和文件内容
  195. bs := make([]byte, fileBinarySize)
  196. rand.Read(bs)
  197. content := base64.RawURLEncoding.EncodeToString(bs)
  198. go func() {
  199. // 写入文件
  200. if err := writefile(filePath, []byte(content)); err != nil {
  201. logger.Error("写入文件", filePath, "失败:", err)
  202. return
  203. }
  204. }()
  205. // 写入完成
  206. checkChan <- &fileinfo{
  207. filepath: filePath,
  208. filesize: len(content),
  209. content: content,
  210. }
  211. }(i)
  212. }
  213. }()
  214. var successCount int32
  215. var failedCount int32
  216. // 并发验证和删除文件
  217. go func() {
  218. for fi := range checkChan {
  219. go func(fi *fileinfo) {
  220. // 读取文件
  221. var content []byte
  222. var err error
  223. var t = time.Now()
  224. for (err == nil || len(content) == 0) && time.Since(t) < 60*time.Second {
  225. content, err = readfile(fi.filepath)
  226. if err != nil {
  227. if !os.IsNotExist(err) {
  228. logger.Error("读取文件", fi.filepath, "失败:", err)
  229. return
  230. }
  231. time.Sleep(10 * time.Millisecond)
  232. }
  233. }
  234. // 验证文件内容是否正确
  235. expectedSize := fi.filesize
  236. if len(content) != expectedSize || !bytes.Equal(content, []byte(fi.content)) {
  237. // logger.Error("验证失败", fi.filepath, fmt.Sprintf("文件内容与期望值不一致,写入内容为:\n%s\n读出内容为:\n%s", fi.content, string(content)))
  238. // os.Exit(1)
  239. atomic.AddInt32(&failedCount, 1)
  240. return
  241. }
  242. // 验证成功,统计计数
  243. atomic.AddInt32(&successCount, 1)
  244. // 延迟删除文件
  245. time.AfterFunc(*delay, func() {
  246. if err := deletefile(fi.filepath); err != nil {
  247. logger.Error("删除文件", fi.filepath, "失败:", err)
  248. }
  249. })
  250. }(fi)
  251. }
  252. }()
  253. t := time.NewTicker(2 * time.Second)
  254. for {
  255. select {
  256. case <-t.C:
  257. // 输出计数信息
  258. logger.Info("读写一致验证成功数:", successCount, "失败数", failedCount)
  259. }
  260. }
  261. }