mql_run_top.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package odbcmql
  2. import (
  3. "context"
  4. "sync"
  5. "sync/atomic"
  6. "testing"
  7. "time"
  8. "gitee.com/wecisecode/util/filewalker"
  9. "gitee.com/wecisecode/util/pqc"
  10. )
  11. func (mt *MQLTest) Run(t *testing.T, fw *filewalker.FileWalker) (retok bool) {
  12. mt.t = t
  13. mt.fw = fw
  14. global := &GlobalVars{
  15. CurrentVars: &CurrentVars{},
  16. wg_wait_fork_routine: make(map[string]*sync.WaitGroup),
  17. ch_wait_mql_done: make(map[string]chan bool),
  18. }
  19. toploop := config.GetInt("loop")
  20. if toploop <= 0 {
  21. toploop = 1
  22. }
  23. mt.scopevars = &ScopeVars{
  24. top: &Variables{
  25. vars: map[string]interface{}{},
  26. loop_count: toploop,
  27. loop_from: 1,
  28. loop_step: 1},
  29. dir: map[string]*Variables{},
  30. file: map[string]*Variables{},
  31. mql: map[string]*Variables{}}
  32. ctx, cancel := context.WithCancel(context.Background())
  33. defer cancel()
  34. st := time.Now()
  35. loop_i := 0
  36. parallel_queue := pqc.NewQueue[any](0)
  37. mqlcount := int32(0)
  38. defer func() {
  39. mt.scopevars.RLock()
  40. loop_count := mt.scopevars.top.loop_count
  41. mt.scopevars.RUnlock()
  42. ut := time.Since(st)
  43. aut := time.Duration(0)
  44. if mqlcount > 0 {
  45. aut = global.totalusetime / time.Duration(mqlcount)
  46. }
  47. logger.Info("完成 ", loop_count, " 次执行,共", mqlcount, "次 MQL 请求,耗时", ut, "单条响应时间", global.minusetime, "~", global.maxusetime, "/", aut, "平均每秒吞吐量", (int64(mqlcount)*int64(time.Second))/int64(ut))
  48. }()
  49. var wg sync.WaitGroup
  50. for {
  51. mt.scopevars.Lock()
  52. ok := loop_i < mt.scopevars.top.loop_count
  53. mt.scopevars.Unlock()
  54. if !ok {
  55. break
  56. }
  57. loop_i++
  58. //
  59. ch_parallel_count := make(chan int)
  60. topvars := &CurrentVars{
  61. loop_i: loop_i,
  62. ch_parallel_count: ch_parallel_count,
  63. }
  64. //
  65. ok_chan := make(chan bool, 1)
  66. parallel_chan := make(chan bool, 1)
  67. parallel := false
  68. parallelcount := 0
  69. done := false
  70. wg.Add(1)
  71. go func() {
  72. defer func() {
  73. atomic.AddInt32(&mqlcount, topvars.mqlcount)
  74. wg.Done()
  75. }()
  76. ch_ok := make(chan bool)
  77. go func() {
  78. for {
  79. select {
  80. case <-ch_parallel_count:
  81. if !done && !parallel {
  82. parallel = true
  83. // 加入并发控制队列
  84. if parallelcount > 0 {
  85. if parallelcount > parallel_queue.Size() {
  86. parallel_queue.Growth(parallelcount)
  87. }
  88. parallel_queue.Push(1)
  89. }
  90. parallel_chan <- true
  91. }
  92. case ok := <-ch_ok:
  93. ok_chan <- ok
  94. if parallel {
  95. if parallelcount > 0 {
  96. // 从并发控制队列中移除
  97. parallel_queue.Pop()
  98. }
  99. } else {
  100. done = true
  101. }
  102. // 整个测试完成一次
  103. // 只保留 top 级变量,清除变量
  104. mt.scopevars.Lock()
  105. mt.scopevars.dir = map[string]*Variables{}
  106. mt.scopevars.file = map[string]*Variables{}
  107. mt.scopevars.mql = map[string]*Variables{}
  108. mt.scopevars.Unlock()
  109. return
  110. }
  111. }
  112. }()
  113. ok := mt.RunAllDirs(t, ctx,
  114. global,
  115. topvars)
  116. ch_ok <- ok
  117. if !ok {
  118. cancel() // 并发测试,有一个线程出错,就全停
  119. return
  120. }
  121. }()
  122. success := true
  123. select {
  124. case success = <-ok_chan: // 非并发,等待完成
  125. case <-parallel_chan: // 并发,执行继续下一次
  126. logger.Info("第", topvars.loop_i, "次并发执行继续")
  127. }
  128. if !success {
  129. return false // 失败,不等,直接返回
  130. }
  131. }
  132. wg.Wait()
  133. return
  134. }