| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- package odbcmql
- import (
- "bufio"
- "context"
- "fmt"
- "os"
- "path/filepath"
- "strings"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "gitee.com/wecisecode/util/pqc"
- "gitee.com/wecisecode/util/spliter"
- "github.com/stretchr/testify/assert"
- )
- func (mt *MQLTest) RunFile(t *testing.T, ctx context.Context,
- global *GlobalVars,
- topvars *CurrentVars,
- dirvars *CurrentVars,
- basedir, filename string) bool {
- // 读取文件内容
- ffpath := filepath.Join(basedir, filename)
- bs, err := os.ReadFile(ffpath)
- if !assert.Nil(t, err, err) {
- return false
- }
- // mql语句切分
- mqgr := NewMQLGroupRequest()
- var multilines *MQLRequest
- mqs := spliter.NewMQLSpliter(bufio.NewReader(strings.NewReader(string(bs))))
- for {
- mql, fromline, toline, fromchar, tochar, hasnext, _ := mqs.NextMQL()
- if !hasnext {
- break
- }
- // 去掉注释
- clean_mql := strings.Join(spliter.MQLSplitClean(mql), ";")
- if multilines != nil {
- // 多条语句处理
- if strings.TrimSpace(clean_mql) == "multilines end" {
- // 保留语句中的注释
- tm := strings.TrimSpace(strings.Replace(mql, "multilines end", "", 1))
- if len(tm) > 0 {
- if multilines.OriginQueryString != "" {
- multilines.OriginQueryString += ";"
- }
- multilines.OriginQueryString += tm
- }
- multilines.Toline = toline
- multilines.Tochar = tochar
- // 加入请求组,并初始化,提取动作信息
- e := mqgr.Append(multilines)
- if !assert.Nil(t, e, e) {
- return false
- }
- multilines = nil
- } else {
- if multilines.OriginQueryString != "" {
- multilines.OriginQueryString += ";"
- }
- multilines.OriginQueryString += mql
- multilines.Toline = toline
- multilines.Tochar = tochar
- }
- } else if strings.TrimSpace(clean_mql) == "multilines begin" {
- multilines = &MQLRequest{FilePath: ffpath}
- // 保留语句中的注释
- multilines.OriginQueryString = strings.TrimSpace(strings.Replace(mql, "multilines begin", "", 1))
- multilines.Fromline = fromline
- multilines.Fromchar = fromchar
- multilines.Toline = toline
- multilines.Tochar = tochar
- } else {
- // 单条语句
- mqr := &MQLRequest{OriginQueryString: mql, FilePath: ffpath, Fromline: fromline, Toline: toline, Fromchar: fromchar, Tochar: tochar}
- // 加入请求组,并初始化,提取动作信息
- e := mqgr.Append(mqr)
- if !assert.Nil(t, e, e) {
- return false
- }
- }
- }
- // mqls := spliter.MQLSplit(string(bs))
- mt.scopevars.Lock()
- if mt.scopevars.file[ffpath] == nil {
- mt.scopevars.file[ffpath] = &Variables{
- vars: map[string]interface{}{},
- loop_count: 1,
- loop_from: 1,
- loop_step: 1}
- }
- mt.scopevars.Unlock()
- var wg sync.WaitGroup
- st := time.Now()
- loop_i := 0
- parallel_queue := pqc.NewQueue[any](0)
- mqlcount := int32(0)
- for {
- mt.scopevars.Lock()
- ok := loop_i < mt.scopevars.file[ffpath].loop_count
- mt.scopevars.Unlock()
- if !ok {
- break
- }
- loop_i++
- //
- ch_parallel_count := make(chan int)
- filevars := &CurrentVars{
- loop_i: loop_i,
- ch_parallel_count: ch_parallel_count,
- }
- //
- ok_chan := make(chan bool, 1)
- parallel_chan := make(chan bool, 1)
- parallel := false
- parallelcount := 0
- done := false
- wg.Add(1)
- go func() {
- defer func() {
- atomic.AddInt32(&mqlcount, filevars.mqlcount)
- wg.Done()
- }()
- ch_ok := make(chan bool)
- go func() {
- for {
- select {
- case <-ch_parallel_count:
- if !done && !parallel {
- parallel = true
- // 加入并发控制队列
- if parallelcount > 0 {
- if parallelcount > parallel_queue.Size() {
- parallel_queue.Growth(parallelcount)
- }
- parallel_queue.Push(1)
- }
- parallel_chan <- true
- }
- case ok := <-ch_ok:
- ok_chan <- ok
- if parallel {
- if parallelcount > 0 {
- // 从并发控制队列中移除
- parallel_queue.Pop()
- }
- } else {
- done = true
- }
- return
- }
- }
- }()
- // logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次执行开始")
- ch_ok <- mt.RunMQLGroup(t, ctx,
- global,
- topvars,
- dirvars,
- filevars,
- basedir, ffpath, mqgr)
- // logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次执行结束")
- }()
- success := true
- select {
- case success = <-ok_chan: // 非并发,等待完成
- logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次顺序执行完成")
- case <-parallel_chan: // 并发,执行继续下一次
- logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次并发执行继续")
- }
- if !success {
- return false
- }
- }
- wg.Wait()
- mt.scopevars.RLock()
- loop_count := mt.scopevars.file[ffpath].loop_count
- mt.scopevars.RUnlock()
- if loop_count > 1 {
- ut := time.Since(st)
- sn := fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i)
- logger.Info(fmt.Sprint("file ", ffpath+"/"+sn, " loop ", loop_count, " times, run ", mqlcount, " mqls, usetime ", ut))
- }
- return true
- }
- func (mt *MQLTest) RunMQLGroup(t *testing.T, ctx context.Context,
- global *GlobalVars,
- topvars *CurrentVars,
- dirvars *CurrentVars,
- filevars *CurrentVars,
- basedir, ffpath string, mqgr *MQLGroupRequest) (pass bool) {
- pass = true
- var wg sync.WaitGroup
- for _, mqs := range mqgr.mqrs {
- wg.Add(1)
- go func(mqs []*MQLRequest) {
- defer wg.Done()
- if len(mqs) == 0 {
- return
- }
- if mqs[0].StaticActions.ForkName != nil {
- forkname := *mqs[0].StaticActions.ForkName
- global.Lock()
- wg := global.wg_wait_fork_routine[forkname]
- if wg == nil {
- wg = &sync.WaitGroup{}
- global.wg_wait_fork_routine[forkname] = wg
- }
- global.Unlock()
- wg.Add(1)
- defer wg.Done()
- }
- ok := mt.RunMQLs(t, ctx, global, topvars, dirvars, filevars, basedir, ffpath, mqs)
- if !ok {
- pass = false
- }
- }(mqs)
- }
- wg.Wait()
- return
- }
- func (mt *MQLTest) RunMQLs(t *testing.T, ctx context.Context,
- global *GlobalVars,
- topvars *CurrentVars,
- dirvars *CurrentVars,
- filevars *CurrentVars,
- basedir, ffpath string, mqrs []*MQLRequest) bool {
- for _, mqr := range mqrs {
- mqlkey := mqr.Key
- mqlstr := mqr.OriginQueryString
- if mqlstr == "" {
- continue
- }
- staticactions := mqr.StaticActions
- // 设置执行过程中的控制参数
- mt.InitScopeVars(basedir, ffpath, mqlkey, staticactions)
- ch_test_run_one_mql_result := make(chan bool)
- go func() {
- mqrinst := fmt.Sprint(mqlkey, "/", topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i)
- global.Lock()
- mqrdone := global.ch_wait_mql_done[mqrinst]
- if mqrdone == nil {
- mqrdone = make(chan bool)
- global.ch_wait_mql_done[mqrinst] = mqrdone
- }
- var waitmqrdone chan bool
- if mqr.WaitMQLRequest != nil {
- waitmqrkey := mqr.WaitMQLRequest.Key
- waitmqrinst := fmt.Sprint(waitmqrkey, "/", topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i)
- waitmqrdone = global.ch_wait_mql_done[waitmqrinst]
- if waitmqrdone == nil {
- waitmqrdone = make(chan bool)
- global.ch_wait_mql_done[waitmqrinst] = waitmqrdone
- }
- }
- global.Unlock()
- var ret bool
- defer func() { mqrdone <- ret }()
- if waitmqrdone != nil {
- v := <-waitmqrdone
- waitmqrdone <- v
- if !v {
- // 依赖MQR失败
- ch_test_run_one_mql_result <- false
- return
- }
- }
- ret = mt.RunMQR(t, ctx, global, topvars, dirvars, filevars, basedir, ffpath, mqr)
- ch_test_run_one_mql_result <- ret
- }()
- ret := <-ch_test_run_one_mql_result
- if !ret {
- return ret
- }
- // continue
- }
- return true
- }
|