package odbcmql import ( "context" "encoding/json" "fmt" "math/rand" "strconv" "strings" "sync" "sync/atomic" "testing" "time" "gitee.com/wecisecode/util/cast" "gitee.com/wecisecode/util/pqc" "github.com/stretchr/testify/assert" ) func (mt *MQLTest) RunMQR(t *testing.T, ctx context.Context, global *GlobalVars, topvars *CurrentVars, dirvars *CurrentVars, filevars *CurrentVars, basedir, ffpath string, mqr *MQLRequest) bool { mqlkey := mqr.Key staticactionexprs := mqr.StaticActionExprs staticactions := mqr.StaticActions return t.Run(mqlkey, func(t *testing.T) { var wg sync.WaitGroup st := time.Now() loop_i := 0 parallel_queue := pqc.NewQueue[any](0) mqlcount := int32(0) for { mt.scopevars.Lock() ok := loop_i < mt.scopevars.mql[mqlkey].loop_count mt.scopevars.Unlock() if !ok { break } loop_i++ // ch_parallel_count := make(chan int) mqlvars := &CurrentVars{ loop_i: loop_i, ch_parallel_count: ch_parallel_count, } // 运行实例 mqlsn := fmt.Sprint(mqlkey, "/", topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i, ".", mqlvars.loop_i) mt.scopevars.Lock() if mt.scopevars.mql[mqlsn] == nil { mt.scopevars.mql[mqlsn] = &Variables{ vars: map[string]interface{}{}, loop_count: 1, loop_from: 1, loop_step: 1} } mt.scopevars.Unlock() // mt.scopevars.Lock() top_loopi := mt.scopevars.top.loop_from + (topvars.loop_i-1)*mt.scopevars.top.loop_step mt.scopevars.top.vars["topi"] = top_loopi dir_loopi := mt.scopevars.dir[basedir].loop_from + (dirvars.loop_i-1)*mt.scopevars.dir[basedir].loop_step mt.scopevars.dir[basedir].vars["diri"] = dir_loopi mt.scopevars.dir[basedir].vars["topi"] = top_loopi file_loopi := mt.scopevars.file[ffpath].loop_from + (filevars.loop_i-1)*mt.scopevars.file[ffpath].loop_step mt.scopevars.file[ffpath].vars["filei"] = file_loopi mt.scopevars.file[ffpath].vars["diri"] = dir_loopi mt.scopevars.file[ffpath].vars["topi"] = top_loopi mql_loopi := mt.scopevars.mql[mqlkey].loop_from + (mqlvars.loop_i-1)*mt.scopevars.mql[mqlkey].loop_step mt.scopevars.mql[mqlkey].vars["mqli"] = mql_loopi mt.scopevars.mql[mqlkey].vars["filei"] = file_loopi mt.scopevars.mql[mqlkey].vars["diri"] = dir_loopi mt.scopevars.mql[mqlkey].vars["topi"] = top_loopi // mt.scopevars.mql[mqlsn].vars["mqli"] = mql_loopi mt.scopevars.mql[mqlsn].vars["filei"] = file_loopi mt.scopevars.mql[mqlsn].vars["diri"] = dir_loopi mt.scopevars.mql[mqlsn].vars["topi"] = top_loopi mt.scopevars.Unlock() mt.BeforeRunAction(basedir, ffpath, mqlkey, mqlsn, staticactions) formatedmqlstr := mqr.FormatedQueryString mql := mt.ReplaceLoopSN(formatedmqlstr, global, topvars, dirvars, filevars, mqlvars, top_loopi, dir_loopi, file_loopi, mql_loopi, basedir, ffpath, mqlkey, mqlsn, ) mqlreplace := rereplace_nbytes.FindAllStringSubmatch(mql, -1) for _, mqr := range mqlreplace { if len(mqr) == 2 { bs := make([]byte, cast.ToInt(mqr[1])) for i := 0; i < len(bs); i++ { bs[i] = byte(32 + rand.Intn(91)) } s := string(bs) s = strings.ReplaceAll(s, ")", string([]byte{123})) s = strings.ReplaceAll(s, "'", string([]byte{124})) s = strings.ReplaceAll(s, "\"", string([]byte{125})) s = strings.ReplaceAll(s, "\\", string([]byte{126})) mql = strings.Replace(mql, mqr[0], s, 1) } } for i, sat := range staticactionexprs { mql = strings.Replace(mql, "["+strconv.Itoa(i)+"]", sat.SourceText, 1) } mql = strings.ReplaceAll(mql, "[[]", "[") mqri := &MQLRequestInstance{ MQLRequest: mqr, PreparedQueryString: mql, } // ok_chan := make(chan error, 1) parallel_chan := make(chan bool, 1) parallel := false parallelcount := 0 done := false wg.Add(1) go func(mqlvars *CurrentVars) { defer func() { atomic.AddInt32(&mqlcount, mqlvars.mqlcount) wg.Done() }() ch_ok := make(chan error) go func() { for { select { case parallelcount = <-ch_parallel_count: if !done && !parallel { parallel = true // 加入并发控制队列 if parallelcount > 0 { if parallelcount > parallel_queue.Size() { parallel_queue.Growth(parallelcount) } parallel_queue.Push(1) } parallel_chan <- true } case ok := <-ch_ok: ok_chan <- ok if parallel { if parallelcount > 0 { // 从并发控制队列中移除 parallel_queue.Pop() } } else { done = true } return } } }() nstaticactionexprs := map[int]*Action{} for i, sat := range staticactionexprs { nstaticactionexprs[i] = sat } // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次执行开始") ch_ok <- mt.RunMQL(t, ctx, global, topvars, dirvars, filevars, mqlvars, basedir, ffpath, mqlkey, mqlsn, mqri, staticactions, nstaticactionexprs) // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次执行结束") }(mqlvars) var err error select { case err = <-ok_chan: // 非并发,等待完成 // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次顺序执行完成") case <-parallel_chan: // 并发,执行继续下一次 // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次并发执行继续") } if err != nil { // logger.Error(err) break } } wg.Wait() mt.scopevars.RLock() loop_count := mt.scopevars.mql[mqlkey].loop_count mt.scopevars.RUnlock() if loop_count > 1 { ut := time.Since(st) sn := fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i) as := "" if topvars.sleeptime > 0 { as = fmt.Sprint(", sleep ", topvars.sleeptime) } logger.Info(fmt.Sprint("mql ", mqlkey+"/"+sn, " loop ", loop_i, " times, usetime ", ut, as)) } }) } // 执行一条命令 func (mt *MQLTest) RunMQL(t *testing.T, ctx context.Context, global *GlobalVars, topvars *CurrentVars, dirvars *CurrentVars, filevars *CurrentVars, mqlvars *CurrentVars, basedir, ffpath, mqlkey, mqlsn string, mqri *MQLRequestInstance, staticactions *StaticActions, staticactionexprs map[int]*Action) error { mqlstr := mqri.PreparedQueryString if staticactions.WaitName != nil { global.Lock() wgs := []*sync.WaitGroup{} if *staticactions.WaitName == "" { for _, wg := range global.wg_wait_fork_routine { wgs = append(wgs, wg) } } else { wg := global.wg_wait_fork_routine[*staticactions.WaitName] if wg != nil { wgs = append(wgs, wg) } } global.Unlock() for _, wg := range wgs { wg.Wait() } } if staticactions.ParallelCount != nil { mt.scopevars.Lock() switch staticactions.Scope { case "top": topvars.ch_parallel_count <- *staticactions.ParallelCount case "dir": dirvars.ch_parallel_count <- *staticactions.ParallelCount case "file": filevars.ch_parallel_count <- *staticactions.ParallelCount default: mqlvars.ch_parallel_count <- *staticactions.ParallelCount } mt.scopevars.Unlock() } // 重新获取修正后的动作 actionexprs, e := getActionExprs(mqlstr, staticactionexprs) if !assert.Nil(t, e, e) { return e } dynamicactions := actionexprs.DynamicActions() if len(dynamicactions.SubscribeArgs) > 0 { subscribe(dynamicactions.SubscribeArgs...) } if len(dynamicactions.UnsubscribeArgs) > 0 { unsubscribe(dynamicactions.UnsubscribeArgs...) } values := []interface{}{} if len(dynamicactions.Params) > 0 { err := json.Unmarshal([]byte(dynamicactions.Params), &values) if err != nil { assert.Nil(t, fmt.Sprint("params参数只支持JSON Array,", dynamicactions.Params, mqlstr), err) return err } } x := atomic.AddInt32(&global.mqlcount, 1) atomic.AddInt32(&topvars.mqlcount, 1) atomic.AddInt32(&dirvars.mqlcount, 1) atomic.AddInt32(&filevars.mqlcount, 1) atomic.AddInt32(&mqlvars.mqlcount, 1) err := mt.RunMQLTryDo(t, ctx, global, topvars, dirvars, filevars, mqlvars, basedir, ffpath, mqlkey, mqlsn+"("+strconv.Itoa(int(x))+")", mqri, values, staticactions, actionexprs) if err != nil { // 执行过程有错,停止继续执行 return err } if staticactions.SleepTime != nil { global.sleeptime += *staticactions.SleepTime topvars.sleeptime += *staticactions.SleepTime dirvars.sleeptime += *staticactions.SleepTime filevars.sleeptime += *staticactions.SleepTime mqlvars.sleeptime += *staticactions.SleepTime time.Sleep(*staticactions.SleepTime) } return nil }