mql_run_dir.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package odbcmql
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "sync/atomic"
  7. "testing"
  8. "time"
  9. "gitee.com/wecisecode/util/pqc"
  10. "github.com/stretchr/testify/assert"
  11. )
  12. func (mt *MQLTest) RunAllDirs(t *testing.T, ctx context.Context, global *GlobalVars, topvars *CurrentVars) bool {
  13. logger.Info("开始第", topvars.loop_i, "次执行")
  14. st := time.Now()
  15. // 读取文件列表
  16. listdirs := []string{}
  17. dirfiles := map[string][]string{}
  18. err := mt.fw.List(func(basedir, filename string) bool {
  19. if dirfiles[basedir] == nil {
  20. listdirs = append(listdirs, basedir)
  21. }
  22. dirfiles[basedir] = append(dirfiles[basedir], filename)
  23. return true
  24. })
  25. if !assert.Nil(t, err, err) {
  26. return false
  27. }
  28. // 执行所有目录
  29. for _, basedir := range listdirs {
  30. // 循环执行目录下所有文件
  31. mt.scopevars.Lock()
  32. if mt.scopevars.dir[basedir] == nil {
  33. mt.scopevars.dir[basedir] = &Variables{
  34. vars: map[string]interface{}{},
  35. loop_count: 1,
  36. loop_from: 1,
  37. loop_step: 1}
  38. }
  39. mt.scopevars.Unlock()
  40. var wg sync.WaitGroup
  41. st := time.Now()
  42. loop_i := 0
  43. parallel_queue := pqc.NewQueue[any](0)
  44. mqlcount := int32(0)
  45. for {
  46. mt.scopevars.Lock()
  47. ok := loop_i < mt.scopevars.dir[basedir].loop_count
  48. mt.scopevars.Unlock()
  49. if !ok {
  50. break
  51. }
  52. loop_i++
  53. //
  54. ch_parallel_count := make(chan int)
  55. dirvars := &CurrentVars{
  56. loop_i: loop_i,
  57. ch_parallel_count: ch_parallel_count,
  58. }
  59. //
  60. ok_chan := make(chan bool, 1)
  61. parallel_chan := make(chan bool, 1)
  62. parallel := false
  63. parallelcount := 0
  64. done := false
  65. wg.Add(1)
  66. go func(basedir string) {
  67. defer func() {
  68. atomic.AddInt32(&mqlcount, dirvars.mqlcount)
  69. wg.Done()
  70. }()
  71. ch_ok := make(chan bool)
  72. go func() {
  73. for {
  74. select {
  75. case <-ch_parallel_count:
  76. if !done && !parallel {
  77. parallel = true
  78. // 加入并发控制队列
  79. if parallelcount > 0 {
  80. if parallelcount > parallel_queue.Size() {
  81. parallel_queue.Growth(parallelcount)
  82. }
  83. parallel_queue.Push(1)
  84. }
  85. parallel_chan <- true
  86. }
  87. case ok := <-ch_ok:
  88. ok_chan <- ok
  89. if parallel {
  90. if parallelcount > 0 {
  91. // 从并发控制队列中移除
  92. parallel_queue.Pop()
  93. }
  94. } else {
  95. done = true
  96. }
  97. // 一个目录执行完成
  98. return
  99. }
  100. }
  101. }()
  102. // logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次执行开始")
  103. files := dirfiles[basedir]
  104. ch_ok <- mt.RunDirFiles(t, ctx,
  105. global,
  106. topvars,
  107. dirvars,
  108. basedir, files)
  109. // logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次执行结束")
  110. }(basedir)
  111. success := true
  112. select {
  113. case success = <-ok_chan: // 非并发,等待完成
  114. logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次顺序执行完成")
  115. case <-parallel_chan: // 并发,执行继续下一次
  116. logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次并发执行继续")
  117. }
  118. if !success {
  119. return false
  120. }
  121. }
  122. wg.Wait()
  123. mt.scopevars.RLock()
  124. loop_count := mt.scopevars.dir[basedir].loop_count
  125. mt.scopevars.RUnlock()
  126. if loop_count > 1 {
  127. ut := time.Since(st)
  128. logger.Info(fmt.Sprint("dir ", basedir, " loop ", loop_count, " times, run ", mqlcount, " mqls, usetime ", ut))
  129. }
  130. }
  131. logger.Info("完成第", topvars.loop_i, "次执行,共", topvars.mqlcount, "次 MQL 请求,耗时", time.Since(st))
  132. return true
  133. }
  134. func (mt *MQLTest) RunDirFiles(t *testing.T, ctx context.Context,
  135. global *GlobalVars,
  136. topvars *CurrentVars,
  137. dirvars *CurrentVars,
  138. basedir string, files []string) bool {
  139. for _, filename := range files {
  140. ok := mt.RunFile(t, ctx, global, topvars, dirvars, basedir, filename)
  141. if !ok {
  142. return false
  143. }
  144. }
  145. return true
  146. }