| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- package odbcmql
- import (
- "context"
- "fmt"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "gitee.com/wecisecode/util/pqc"
- "github.com/stretchr/testify/assert"
- )
- func (mt *MQLTest) RunAllDirs(t *testing.T, ctx context.Context, global *GlobalVars, topvars *CurrentVars) bool {
- logger.Info("开始第", topvars.loop_i, "次执行")
- st := time.Now()
- // 读取文件列表
- listdirs := []string{}
- dirfiles := map[string][]string{}
- err := mt.fw.List(func(basedir, filename string) bool {
- if dirfiles[basedir] == nil {
- listdirs = append(listdirs, basedir)
- }
- dirfiles[basedir] = append(dirfiles[basedir], filename)
- return true
- })
- if !assert.Nil(t, err, err) {
- return false
- }
- // 执行所有目录
- for _, basedir := range listdirs {
- // 循环执行目录下所有文件
- mt.scopevars.Lock()
- if mt.scopevars.dir[basedir] == nil {
- mt.scopevars.dir[basedir] = &Variables{
- vars: map[string]interface{}{},
- loop_count: 1,
- loop_from: 1,
- loop_step: 1}
- }
- mt.scopevars.Unlock()
- var wg sync.WaitGroup
- st := time.Now()
- loop_i := 0
- parallel_queue := pqc.NewQueue[any](0)
- mqlcount := int32(0)
- for {
- mt.scopevars.Lock()
- ok := loop_i < mt.scopevars.dir[basedir].loop_count
- mt.scopevars.Unlock()
- if !ok {
- break
- }
- loop_i++
- //
- ch_parallel_count := make(chan int)
- dirvars := &CurrentVars{
- loop_i: loop_i,
- ch_parallel_count: ch_parallel_count,
- }
- //
- ok_chan := make(chan bool, 1)
- parallel_chan := make(chan bool, 1)
- parallel := false
- parallelcount := 0
- done := false
- wg.Add(1)
- go func(basedir string) {
- defer func() {
- atomic.AddInt32(&mqlcount, dirvars.mqlcount)
- wg.Done()
- }()
- ch_ok := make(chan bool)
- go func() {
- for {
- select {
- case <-ch_parallel_count:
- if !done && !parallel {
- parallel = true
- // 加入并发控制队列
- if parallelcount > 0 {
- if parallelcount > parallel_queue.Size() {
- parallel_queue.Growth(parallelcount)
- }
- parallel_queue.Push(1)
- }
- parallel_chan <- true
- }
- case ok := <-ch_ok:
- ok_chan <- ok
- if parallel {
- if parallelcount > 0 {
- // 从并发控制队列中移除
- parallel_queue.Pop()
- }
- } else {
- done = true
- }
- // 一个目录执行完成
- return
- }
- }
- }()
- // logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次执行开始")
- files := dirfiles[basedir]
- ch_ok <- mt.RunDirFiles(t, ctx,
- global,
- topvars,
- dirvars,
- basedir, files)
- // logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次执行结束")
- }(basedir)
- success := true
- select {
- case success = <-ok_chan: // 非并发,等待完成
- logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次顺序执行完成")
- case <-parallel_chan: // 并发,执行继续下一次
- logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次并发执行继续")
- }
- if !success {
- return false
- }
- }
- wg.Wait()
- mt.scopevars.RLock()
- loop_count := mt.scopevars.dir[basedir].loop_count
- mt.scopevars.RUnlock()
- if loop_count > 1 {
- ut := time.Since(st)
- logger.Info(fmt.Sprint("dir ", basedir, " loop ", loop_count, " times, run ", mqlcount, " mqls, usetime ", ut))
- }
- }
- logger.Info("完成第", topvars.loop_i, "次执行,共", topvars.mqlcount, "次 MQL 请求,耗时", time.Since(st))
- return true
- }
- func (mt *MQLTest) RunDirFiles(t *testing.T, ctx context.Context,
- global *GlobalVars,
- topvars *CurrentVars,
- dirvars *CurrentVars,
- basedir string, files []string) bool {
- for _, filename := range files {
- ok := mt.RunFile(t, ctx, global, topvars, dirvars, basedir, filename)
- if !ok {
- return false
- }
- }
- return true
- }
|