package odbcmql import ( "bufio" "context" "fmt" "os" "path/filepath" "strings" "sync" "sync/atomic" "testing" "time" "gitee.com/wecisecode/util/pqc" "gitee.com/wecisecode/util/spliter" "github.com/stretchr/testify/assert" ) func (mt *MQLTest) RunFile(t *testing.T, ctx context.Context, global *GlobalVars, topvars *CurrentVars, dirvars *CurrentVars, basedir, filename string) bool { // 读取文件内容 ffpath := filepath.Join(basedir, filename) bs, err := os.ReadFile(ffpath) if !assert.Nil(t, err, err) { return false } // mql语句切分 mqgr := NewMQLGroupRequest() var multilines *MQLRequest mqs := spliter.NewMQLSpliter(bufio.NewReader(strings.NewReader(string(bs)))) for { mql, fromline, toline, fromchar, tochar, hasnext, _ := mqs.NextMQL() if !hasnext { break } // 去掉注释 clean_mql := strings.Join(spliter.MQLSplitClean(mql), ";") if multilines != nil { // 多条语句处理 if strings.TrimSpace(clean_mql) == "multilines end" { // 保留语句中的注释 tm := strings.TrimSpace(strings.Replace(mql, "multilines end", "", 1)) if len(tm) > 0 { if multilines.OriginQueryString != "" { multilines.OriginQueryString += ";" } multilines.OriginQueryString += tm } multilines.Toline = toline multilines.Tochar = tochar // 加入请求组,并初始化,提取动作信息 e := mqgr.Append(multilines) if !assert.Nil(t, e, e) { return false } multilines = nil } else { if multilines.OriginQueryString != "" { multilines.OriginQueryString += ";" } multilines.OriginQueryString += mql multilines.Toline = toline multilines.Tochar = tochar } } else if strings.TrimSpace(clean_mql) == "multilines begin" { multilines = &MQLRequest{FilePath: ffpath} // 保留语句中的注释 multilines.OriginQueryString = strings.TrimSpace(strings.Replace(mql, "multilines begin", "", 1)) multilines.Fromline = fromline multilines.Fromchar = fromchar multilines.Toline = toline multilines.Tochar = tochar } else { // 单条语句 mqr := &MQLRequest{OriginQueryString: mql, FilePath: ffpath, Fromline: fromline, Toline: toline, Fromchar: fromchar, Tochar: tochar} // 加入请求组,并初始化,提取动作信息 e := mqgr.Append(mqr) if !assert.Nil(t, e, e) { return false } } } // mqls := spliter.MQLSplit(string(bs)) mt.scopevars.Lock() if mt.scopevars.file[ffpath] == nil { mt.scopevars.file[ffpath] = &Variables{ vars: map[string]interface{}{}, loop_count: 1, loop_from: 1, loop_step: 1} } mt.scopevars.Unlock() var wg sync.WaitGroup st := time.Now() loop_i := 0 parallel_queue := pqc.NewQueue[any](0) mqlcount := int32(0) for { mt.scopevars.Lock() ok := loop_i < mt.scopevars.file[ffpath].loop_count mt.scopevars.Unlock() if !ok { break } loop_i++ // ch_parallel_count := make(chan int) filevars := &CurrentVars{ loop_i: loop_i, ch_parallel_count: ch_parallel_count, } // ok_chan := make(chan bool, 1) parallel_chan := make(chan bool, 1) parallel := false parallelcount := 0 done := false wg.Add(1) go func() { defer func() { atomic.AddInt32(&mqlcount, filevars.mqlcount) wg.Done() }() ch_ok := make(chan bool) go func() { for { select { case <-ch_parallel_count: if !done && !parallel { parallel = true // 加入并发控制队列 if parallelcount > 0 { if parallelcount > parallel_queue.Size() { parallel_queue.Growth(parallelcount) } parallel_queue.Push(1) } parallel_chan <- true } case ok := <-ch_ok: ok_chan <- ok if parallel { if parallelcount > 0 { // 从并发控制队列中移除 parallel_queue.Pop() } } else { done = true } return } } }() // logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次执行开始") ch_ok <- mt.RunMQLGroup(t, ctx, global, topvars, dirvars, filevars, basedir, ffpath, mqgr) // logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次执行结束") }() success := true select { case success = <-ok_chan: // 非并发,等待完成 logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次顺序执行完成") case <-parallel_chan: // 并发,执行继续下一次 logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次并发执行继续") } if !success { return false } } wg.Wait() mt.scopevars.RLock() loop_count := mt.scopevars.file[ffpath].loop_count mt.scopevars.RUnlock() if loop_count > 1 { ut := time.Since(st) sn := fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i) logger.Info(fmt.Sprint("file ", ffpath+"/"+sn, " loop ", loop_count, " times, run ", mqlcount, " mqls, usetime ", ut)) } return true } func (mt *MQLTest) RunMQLGroup(t *testing.T, ctx context.Context, global *GlobalVars, topvars *CurrentVars, dirvars *CurrentVars, filevars *CurrentVars, basedir, ffpath string, mqgr *MQLGroupRequest) (pass bool) { pass = true var wg sync.WaitGroup for _, mqs := range mqgr.mqrs { wg.Add(1) go func(mqs []*MQLRequest) { defer wg.Done() if len(mqs) == 0 { return } if mqs[0].StaticActions.ForkName != nil { forkname := *mqs[0].StaticActions.ForkName global.Lock() wg := global.wg_wait_fork_routine[forkname] if wg == nil { wg = &sync.WaitGroup{} global.wg_wait_fork_routine[forkname] = wg } global.Unlock() wg.Add(1) defer wg.Done() } ok := mt.RunMQLs(t, ctx, global, topvars, dirvars, filevars, basedir, ffpath, mqs) if !ok { pass = false } }(mqs) } wg.Wait() return } func (mt *MQLTest) RunMQLs(t *testing.T, ctx context.Context, global *GlobalVars, topvars *CurrentVars, dirvars *CurrentVars, filevars *CurrentVars, basedir, ffpath string, mqrs []*MQLRequest) bool { for _, mqr := range mqrs { mqlkey := mqr.Key mqlstr := mqr.OriginQueryString if mqlstr == "" { continue } staticactions := mqr.StaticActions // 设置执行过程中的控制参数 mt.InitScopeVars(basedir, ffpath, mqlkey, staticactions) ch_test_run_one_mql_result := make(chan bool) go func() { mqrinst := fmt.Sprint(mqlkey, "/", topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i) global.Lock() mqrdone := global.ch_wait_mql_done[mqrinst] if mqrdone == nil { mqrdone = make(chan bool) global.ch_wait_mql_done[mqrinst] = mqrdone } var waitmqrdone chan bool if mqr.WaitMQLRequest != nil { waitmqrkey := mqr.WaitMQLRequest.Key waitmqrinst := fmt.Sprint(waitmqrkey, "/", topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i) waitmqrdone = global.ch_wait_mql_done[waitmqrinst] if waitmqrdone == nil { waitmqrdone = make(chan bool) global.ch_wait_mql_done[waitmqrinst] = waitmqrdone } } global.Unlock() var ret bool defer func() { mqrdone <- ret }() if waitmqrdone != nil { v := <-waitmqrdone waitmqrdone <- v if !v { // 依赖MQR失败 ch_test_run_one_mql_result <- false return } } ret = mt.RunMQR(t, ctx, global, topvars, dirvars, filevars, basedir, ffpath, mqr) ch_test_run_one_mql_result <- ret }() ret := <-ch_test_run_one_mql_result if !ret { return ret } // continue } return true }