| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288 |
- package odbcmql
- import (
- "context"
- "encoding/json"
- "fmt"
- "math/rand"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "gitee.com/wecisecode/util/cast"
- "gitee.com/wecisecode/util/pqc"
- "github.com/stretchr/testify/assert"
- )
- func (mt *MQLTest) RunMQR(t *testing.T, ctx context.Context,
- global *GlobalVars,
- topvars *CurrentVars,
- dirvars *CurrentVars,
- filevars *CurrentVars,
- basedir, ffpath string, mqr *MQLRequest) bool {
- mqlkey := mqr.Key
- staticactionexprs := mqr.StaticActionExprs
- staticactions := mqr.StaticActions
- return t.Run(mqlkey, func(t *testing.T) {
- var wg sync.WaitGroup
- st := time.Now()
- loop_i := 0
- parallel_queue := pqc.NewQueue[any](0)
- mqlcount := int32(0)
- for {
- mt.scopevars.Lock()
- ok := loop_i < mt.scopevars.mql[mqlkey].loop_count
- mt.scopevars.Unlock()
- if !ok {
- break
- }
- loop_i++
- //
- ch_parallel_count := make(chan int)
- mqlvars := &CurrentVars{
- loop_i: loop_i,
- ch_parallel_count: ch_parallel_count,
- }
- // 运行实例
- mqlsn := fmt.Sprint(mqlkey, "/", topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i, ".", mqlvars.loop_i)
- mt.scopevars.Lock()
- if mt.scopevars.mql[mqlsn] == nil {
- mt.scopevars.mql[mqlsn] = &Variables{
- vars: map[string]interface{}{},
- loop_count: 1,
- loop_from: 1,
- loop_step: 1}
- }
- mt.scopevars.Unlock()
- //
- mt.scopevars.Lock()
- top_loopi := mt.scopevars.top.loop_from + (topvars.loop_i-1)*mt.scopevars.top.loop_step
- mt.scopevars.top.vars["topi"] = top_loopi
- dir_loopi := mt.scopevars.dir[basedir].loop_from + (dirvars.loop_i-1)*mt.scopevars.dir[basedir].loop_step
- mt.scopevars.dir[basedir].vars["diri"] = dir_loopi
- mt.scopevars.dir[basedir].vars["topi"] = top_loopi
- file_loopi := mt.scopevars.file[ffpath].loop_from + (filevars.loop_i-1)*mt.scopevars.file[ffpath].loop_step
- mt.scopevars.file[ffpath].vars["filei"] = file_loopi
- mt.scopevars.file[ffpath].vars["diri"] = dir_loopi
- mt.scopevars.file[ffpath].vars["topi"] = top_loopi
- mql_loopi := mt.scopevars.mql[mqlkey].loop_from + (mqlvars.loop_i-1)*mt.scopevars.mql[mqlkey].loop_step
- mt.scopevars.mql[mqlkey].vars["mqli"] = mql_loopi
- mt.scopevars.mql[mqlkey].vars["filei"] = file_loopi
- mt.scopevars.mql[mqlkey].vars["diri"] = dir_loopi
- mt.scopevars.mql[mqlkey].vars["topi"] = top_loopi
- //
- mt.scopevars.mql[mqlsn].vars["mqli"] = mql_loopi
- mt.scopevars.mql[mqlsn].vars["filei"] = file_loopi
- mt.scopevars.mql[mqlsn].vars["diri"] = dir_loopi
- mt.scopevars.mql[mqlsn].vars["topi"] = top_loopi
- mt.scopevars.Unlock()
- mt.BeforeRunAction(basedir, ffpath, mqlkey, mqlsn, staticactions)
- formatedmqlstr := mqr.FormatedQueryString
- mql := mt.ReplaceLoopSN(formatedmqlstr,
- global, topvars, dirvars, filevars, mqlvars,
- top_loopi, dir_loopi, file_loopi, mql_loopi,
- basedir, ffpath, mqlkey, mqlsn,
- )
- mqlreplace := rereplace_nbytes.FindAllStringSubmatch(mql, -1)
- for _, mqr := range mqlreplace {
- if len(mqr) == 2 {
- bs := make([]byte, cast.ToInt(mqr[1]))
- for i := 0; i < len(bs); i++ {
- bs[i] = byte(32 + rand.Intn(91))
- }
- s := string(bs)
- s = strings.ReplaceAll(s, ")", string([]byte{123}))
- s = strings.ReplaceAll(s, "'", string([]byte{124}))
- s = strings.ReplaceAll(s, "\"", string([]byte{125}))
- s = strings.ReplaceAll(s, "\\", string([]byte{126}))
- mql = strings.Replace(mql, mqr[0], s, 1)
- }
- }
- for i, sat := range staticactionexprs {
- mql = strings.Replace(mql, "["+strconv.Itoa(i)+"]", sat.SourceText, 1)
- }
- mql = strings.ReplaceAll(mql, "[[]", "[")
- mqri := &MQLRequestInstance{
- MQLRequest: mqr,
- PreparedQueryString: mql,
- }
- //
- ok_chan := make(chan error, 1)
- parallel_chan := make(chan bool, 1)
- parallel := false
- parallelcount := 0
- done := false
- wg.Add(1)
- go func(mqlvars *CurrentVars) {
- defer func() {
- atomic.AddInt32(&mqlcount, mqlvars.mqlcount)
- wg.Done()
- }()
- ch_ok := make(chan error)
- go func() {
- for {
- select {
- case parallelcount = <-ch_parallel_count:
- if !done && !parallel {
- parallel = true
- // 加入并发控制队列
- if parallelcount > 0 {
- if parallelcount > parallel_queue.Size() {
- parallel_queue.Growth(parallelcount)
- }
- parallel_queue.Push(1)
- }
- parallel_chan <- true
- }
- case ok := <-ch_ok:
- ok_chan <- ok
- if parallel {
- if parallelcount > 0 {
- // 从并发控制队列中移除
- parallel_queue.Pop()
- }
- } else {
- done = true
- }
- return
- }
- }
- }()
- nstaticactionexprs := map[int]*Action{}
- for i, sat := range staticactionexprs {
- nstaticactionexprs[i] = sat
- }
- // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次执行开始")
- ch_ok <- mt.RunMQL(t, ctx,
- global,
- topvars,
- dirvars,
- filevars,
- mqlvars,
- basedir, ffpath, mqlkey, mqlsn, mqri, staticactions, nstaticactionexprs)
- // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次执行结束")
- }(mqlvars)
- var err error
- select {
- case err = <-ok_chan: // 非并发,等待完成
- // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次顺序执行完成")
- case <-parallel_chan: // 并发,执行继续下一次
- // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次并发执行继续")
- }
- if err != nil {
- // logger.Error(err)
- break
- }
- }
- wg.Wait()
- mt.scopevars.RLock()
- loop_count := mt.scopevars.mql[mqlkey].loop_count
- mt.scopevars.RUnlock()
- if loop_count > 1 {
- ut := time.Since(st)
- sn := fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i)
- as := ""
- if topvars.sleeptime > 0 {
- as = fmt.Sprint(", sleep ", topvars.sleeptime)
- }
- logger.Info(fmt.Sprint("mql ", mqlkey+"/"+sn, " loop ", loop_i, " times, usetime ", ut, as))
- }
- })
- }
- // 执行一条命令
- func (mt *MQLTest) RunMQL(t *testing.T, ctx context.Context,
- global *GlobalVars,
- topvars *CurrentVars,
- dirvars *CurrentVars,
- filevars *CurrentVars,
- mqlvars *CurrentVars,
- basedir, ffpath, mqlkey, mqlsn string, mqri *MQLRequestInstance, staticactions *StaticActions, staticactionexprs map[int]*Action) error {
- mqlstr := mqri.PreparedQueryString
- if staticactions.WaitName != nil {
- global.Lock()
- wgs := []*sync.WaitGroup{}
- if *staticactions.WaitName == "" {
- for _, wg := range global.wg_wait_fork_routine {
- wgs = append(wgs, wg)
- }
- } else {
- wg := global.wg_wait_fork_routine[*staticactions.WaitName]
- if wg != nil {
- wgs = append(wgs, wg)
- }
- }
- global.Unlock()
- for _, wg := range wgs {
- wg.Wait()
- }
- }
- if staticactions.ParallelCount != nil {
- mt.scopevars.Lock()
- switch staticactions.Scope {
- case "top":
- topvars.ch_parallel_count <- *staticactions.ParallelCount
- case "dir":
- dirvars.ch_parallel_count <- *staticactions.ParallelCount
- case "file":
- filevars.ch_parallel_count <- *staticactions.ParallelCount
- default:
- mqlvars.ch_parallel_count <- *staticactions.ParallelCount
- }
- mt.scopevars.Unlock()
- }
- // 重新获取修正后的动作
- actionexprs, e := getActionExprs(mqlstr, staticactionexprs)
- if !assert.Nil(t, e, e) {
- return e
- }
- dynamicactions := actionexprs.DynamicActions()
- if len(dynamicactions.SubscribeArgs) > 0 {
- subscribe(dynamicactions.SubscribeArgs...)
- }
- if len(dynamicactions.UnsubscribeArgs) > 0 {
- unsubscribe(dynamicactions.UnsubscribeArgs...)
- }
- values := []interface{}{}
- if len(dynamicactions.Params) > 0 {
- err := json.Unmarshal([]byte(dynamicactions.Params), &values)
- if err != nil {
- assert.Nil(t, fmt.Sprint("params参数只支持JSON Array,", dynamicactions.Params, mqlstr), err)
- return err
- }
- }
- x := atomic.AddInt32(&global.mqlcount, 1)
- atomic.AddInt32(&topvars.mqlcount, 1)
- atomic.AddInt32(&dirvars.mqlcount, 1)
- atomic.AddInt32(&filevars.mqlcount, 1)
- atomic.AddInt32(&mqlvars.mqlcount, 1)
- err := mt.RunMQLTryDo(t, ctx,
- global,
- topvars,
- dirvars,
- filevars,
- mqlvars,
- basedir, ffpath, mqlkey, mqlsn+"("+strconv.Itoa(int(x))+")", mqri, values, staticactions, actionexprs)
- if err != nil {
- // 执行过程有错,停止继续执行
- return err
- }
- if staticactions.SleepTime != nil {
- global.sleeptime += *staticactions.SleepTime
- topvars.sleeptime += *staticactions.SleepTime
- dirvars.sleeptime += *staticactions.SleepTime
- filevars.sleeptime += *staticactions.SleepTime
- mqlvars.sleeptime += *staticactions.SleepTime
- time.Sleep(*staticactions.SleepTime)
- }
- return nil
- }
|