package odbcmql import ( "context" "fmt" "sync" "sync/atomic" "testing" "time" "gitee.com/wecisecode/util/pqc" "github.com/stretchr/testify/assert" ) func (mt *MQLTest) RunAllDirs(t *testing.T, ctx context.Context, global *GlobalVars, topvars *CurrentVars) bool { logger.Info("开始第", topvars.loop_i, "次执行") st := time.Now() // 读取文件列表 listdirs := []string{} dirfiles := map[string][]string{} err := mt.fw.List(func(basedir, filename string) bool { if dirfiles[basedir] == nil { listdirs = append(listdirs, basedir) } dirfiles[basedir] = append(dirfiles[basedir], filename) return true }) if !assert.Nil(t, err, err) { return false } // 执行所有目录 for _, basedir := range listdirs { // 循环执行目录下所有文件 mt.scopevars.Lock() if mt.scopevars.dir[basedir] == nil { mt.scopevars.dir[basedir] = &Variables{ vars: map[string]interface{}{}, loop_count: 1, loop_from: 1, loop_step: 1} } mt.scopevars.Unlock() var wg sync.WaitGroup st := time.Now() loop_i := 0 parallel_queue := pqc.NewQueue[any](0) mqlcount := int32(0) for { mt.scopevars.Lock() ok := loop_i < mt.scopevars.dir[basedir].loop_count mt.scopevars.Unlock() if !ok { break } loop_i++ // ch_parallel_count := make(chan int) dirvars := &CurrentVars{ loop_i: loop_i, ch_parallel_count: ch_parallel_count, } // ok_chan := make(chan bool, 1) parallel_chan := make(chan bool, 1) parallel := false parallelcount := 0 done := false wg.Add(1) go func(basedir string) { defer func() { atomic.AddInt32(&mqlcount, dirvars.mqlcount) wg.Done() }() ch_ok := make(chan bool) go func() { for { select { case <-ch_parallel_count: if !done && !parallel { parallel = true // 加入并发控制队列 if parallelcount > 0 { if parallelcount > parallel_queue.Size() { parallel_queue.Growth(parallelcount) } parallel_queue.Push(1) } parallel_chan <- true } case ok := <-ch_ok: ok_chan <- ok if parallel { if parallelcount > 0 { // 从并发控制队列中移除 parallel_queue.Pop() } } else { done = true } // 一个目录执行完成 return } } }() // logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次执行开始") files := dirfiles[basedir] ch_ok <- mt.RunDirFiles(t, ctx, global, topvars, dirvars, basedir, files) // logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次执行结束") }(basedir) success := true select { case success = <-ok_chan: // 非并发,等待完成 logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次顺序执行完成") case <-parallel_chan: // 并发,执行继续下一次 logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次并发执行继续") } if !success { return false } } wg.Wait() mt.scopevars.RLock() loop_count := mt.scopevars.dir[basedir].loop_count mt.scopevars.RUnlock() if loop_count > 1 { ut := time.Since(st) logger.Info(fmt.Sprint("dir ", basedir, " loop ", loop_count, " times, run ", mqlcount, " mqls, usetime ", ut)) } } logger.Info("完成第", topvars.loop_i, "次执行,共", topvars.mqlcount, "次 MQL 请求,耗时", time.Since(st)) return true } func (mt *MQLTest) RunDirFiles(t *testing.T, ctx context.Context, global *GlobalVars, topvars *CurrentVars, dirvars *CurrentVars, basedir string, files []string) bool { for _, filename := range files { ok := mt.RunFile(t, ctx, global, topvars, dirvars, basedir, filename) if !ok { return false } } return true }