package odbcmql

import (
	"context"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"gitee.com/wecisecode/util/filewalker"
	"gitee.com/wecisecode/util/pqc"
)

// Run drives the top-level test loop.
//
// The number of passes comes from the "loop" config value (values <= 0 are
// treated as 1) and is re-read from mt.scopevars.top.loop_count under lock
// before every pass. Each pass runs mt.RunAllDirs in its own goroutine:
//
//   - If the pass finishes before anything arrives on its ch_parallel_count
//     channel, Run waits for its result before starting the next pass, and
//     returns false immediately when that result is a failure.
//   - If something arrives on ch_parallel_count first, the pass is treated as
//     parallel and Run moves on to the next pass without waiting for it.
//
// Any failing pass cancels the shared context. Once the loop ends, Run waits
// for every outstanding pass and returns true only if none of them reported a
// failure. A summary line with request counts and timings is logged on every
// exit path.
func (mt *MQLTest) Run(t *testing.T, fw *filewalker.FileWalker) (retok bool) {
	mt.t = t
	mt.fw = fw

	global := &GlobalVars{
		CurrentVars:          &CurrentVars{},
		wg_wait_fork_routine: make(map[string]*sync.WaitGroup),
		ch_wait_mql_done:     make(map[string]chan bool),
	}

	toploop := config.GetInt("loop")
	if toploop <= 0 {
		toploop = 1
	}
	mt.scopevars = &ScopeVars{
		top: &Variables{
			vars:       map[string]interface{}{},
			loop_count: toploop,
			loop_from:  1,
			loop_step:  1,
		},
		dir:  map[string]*Variables{},
		file: map[string]*Variables{},
		mql:  map[string]*Variables{},
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	st := time.Now()
	loop_i := 0
	parallel_queue := pqc.NewQueue[any](0)
	mqlcount := int32(0)
	// failed is set to 1 (atomically) by any pass whose RunAllDirs returns false.
	failed := int32(0)

	// Log the run summary on every exit path.
	defer func() {
		mt.scopevars.RLock()
		loop_count := mt.scopevars.top.loop_count
		mt.scopevars.RUnlock()

		ut := time.Since(st)
		// Worker goroutines may still be running on the early-failure path,
		// so the counter must be read atomically.
		total := atomic.LoadInt32(&mqlcount)
		aut := time.Duration(0)
		if total > 0 {
			aut = global.totalusetime / time.Duration(total)
		}
		// Guard the division: a zero elapsed time would otherwise panic.
		throughput := int64(0)
		if ut > 0 {
			throughput = (int64(total) * int64(time.Second)) / int64(ut)
		}
		logger.Info("完成 ", loop_count, " 次执行,共", total, "次 MQL 请求,耗时", ut, "单条响应时间", global.minusetime, "~", global.maxusetime, "/", aut, "平均每秒吞吐量", throughput)
	}()

	var wg sync.WaitGroup
	for {
		// The loop count is re-read each pass because it lives in shared,
		// lock-protected state.
		mt.scopevars.RLock()
		ok := loop_i < mt.scopevars.top.loop_count
		mt.scopevars.RUnlock()
		if !ok {
			break
		}
		loop_i++

		ch_parallel_count := make(chan int)
		topvars := &CurrentVars{
			loop_i:            loop_i,
			ch_parallel_count: ch_parallel_count,
		}

		// Both channels are buffered so the coordinator goroutine never blocks
		// on a result nobody is waiting for any more.
		ok_chan := make(chan bool, 1)
		parallel_chan := make(chan bool, 1)
		parallel := false
		// NOTE(review): parallelcount is never assigned in this function (the
		// value received from ch_parallel_count is discarded), so the queue
		// branches below never execute as written. Confirm whether the
		// received count was meant to be stored here.
		parallelcount := 0
		done := false

		wg.Add(1)
		go func() {
			defer func() {
				atomic.AddInt32(&mqlcount, topvars.mqlcount)
				wg.Done()
			}()

			ch_ok := make(chan bool)
			// Coordinator: turns the first parallel signal into a
			// "continue" notification and forwards the final result.
			go func() {
				for {
					select {
					case <-ch_parallel_count:
						if !done && !parallel {
							parallel = true
							// Enter the concurrency-control queue.
							if parallelcount > 0 {
								if parallelcount > parallel_queue.Size() {
									parallel_queue.Growth(parallelcount)
								}
								parallel_queue.Push(1)
							}
							parallel_chan <- true
						}
					case ok := <-ch_ok:
						ok_chan <- ok
						if parallel {
							if parallelcount > 0 {
								// Leave the concurrency-control queue.
								parallel_queue.Pop()
							}
						} else {
							done = true
						}
						// One full pass is complete: keep only the
						// top-level variables and drop the rest.
						mt.scopevars.Lock()
						mt.scopevars.dir = map[string]*Variables{}
						mt.scopevars.file = map[string]*Variables{}
						mt.scopevars.mql = map[string]*Variables{}
						mt.scopevars.Unlock()
						return
					}
				}
			}()

			ok := mt.RunAllDirs(t, ctx, global, topvars)
			if !ok {
				// Record the failure so Run can report it even when this
				// pass was running in parallel and nobody reads ok_chan.
				atomic.StoreInt32(&failed, 1)
			}
			ch_ok <- ok
			if !ok {
				cancel() // In a concurrent run, one failing pass stops everything.
				return
			}
		}()

		success := true
		select {
		case success = <-ok_chan:
			// Not parallel: wait for the pass to finish.
		case <-parallel_chan:
			// Parallel: move on to the next pass right away.
			logger.Info("第", topvars.loop_i, "次并发执行继续")
		}
		if !success {
			return false // Failed: return immediately without waiting.
		}
	}

	wg.Wait()
	// Previously the unassigned named result made this path always return
	// false; report success only when no pass failed.
	return atomic.LoadInt32(&failed) == 0
}