| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515 |
- package odbcmql
- import (
- "encoding/json"
- "fmt"
- "math"
- "regexp"
- "strings"
- "time"
- "git.wecise.com/wecise/odb-go/odb"
- "git.wecise.com/wecise/odb-go/odb/mql/parser"
- "git.wecise.com/wecise/odb-go/odb/mql/qstru"
- "gitee.com/wecisecode/util/cast"
- "gitee.com/wecisecode/util/mfmt"
- "gitee.com/wecisecode/util/set/strset"
- "gitee.com/wecisecode/util/sortedmap"
- )
// Action is a single parsed action expression: a function-style call with its
// arguments, plus where its text sits in the MQL source.
type Action struct {
	// ID is the sequence number handed out by ActionBuilder.
	ID int
	// Name is the action's function name.
	Name string
	// Args holds the call arguments as collected by ActionBuilder.NewAction:
	// constants, variable expressions, nested *Action values, or nil.
	Args []any
	// SourceFromIndex and SourceToIndex delimit the action inside the full MQL
	// string (parser position shifted by the offset of the action section).
	SourceFromIndex int
	SourceToIndex   int
	// SourceText is the action's source text as reported by the parser.
	SourceText string
}
// OnErrorProc identifies what to do when a statement reports an error.
type OnErrorProc int

// Error-handling procedures selectable through the "onerror" action. The
// keywords listed for each value are the ones recognised in the action's
// first argument.
const (
	// OnErrorHalt stops execution (keywords: halt, fail, exit).
	OnErrorHalt OnErrorProc = iota
	// OnErrorIgnore carries on past the error (keywords: continue, ignore).
	OnErrorIgnore
	// OnErrorBreak breaks out of the current loop (keyword: break).
	OnErrorBreak
	// OnErrorPass treats the error as the expected outcome
	// (keywords: expect, pass, need, sure).
	OnErrorPass
)
- type StaticActions struct {
- Skip bool
- RetryLimit int
- Scope string
- Qmeta odb.QueryMeta
- Timeout *time.Duration
- LoopCount *int
- LoopFrom *int
- LoopStep *int
- ParallelCount *int
- ForkName *string
- WaitName *string
- SleepTime *time.Duration
- Variables map[string]interface{}
- BeforeRunActions []*Action
- }
// DynamicActions is the decoded form of the "subscribe", "unsubscribe" and
// "params" actions of one MQL statement.
type DynamicActions struct {
	// SubscribeArgs are the "subscribe" arguments rendered as strings.
	SubscribeArgs []string
	// UnsubscribeArgs are the "unsubscribe" arguments rendered as strings.
	UnsubscribeArgs []string
	// Params is the "params" argument list encoded as text (JSON when the
	// arguments can be marshalled).
	Params string
}
- type OnErrorOption struct {
- regex map[OnErrorProc]*regexp.Regexp
- noerrorinfo bool
- }
- type ActionExprs struct {
- *sortedmap.LinkedMap
- staticActions *StaticActions
- dynamicActions *DynamicActions
- followThroughActions []*Action
- onerrorOption *OnErrorOption
- }
- func getActionExprs(mqlstr string, staticactions map[int]*Action) (actionexprs *ActionExprs, err error) {
- actionexprs = &ActionExprs{
- LinkedMap: sortedmap.NewLinkedMap(),
- onerrorOption: &OnErrorOption{
- regex: map[OnErrorProc]*regexp.Regexp{},
- },
- }
- {
- // /** ... **/ 之间的内容
- qlsindex := reactioneexprs.FindStringSubmatchIndex(mqlstr)
- if len(qlsindex) < 4 {
- // --{ ... } 之间的内容
- qlsindex = reactioneexprs_2.FindStringSubmatchIndex(mqlstr)
- if len(qlsindex) < 4 {
- // --{ ... } 之间的内容
- qlsindex = reactioneexprs_3.FindStringSubmatchIndex(mqlstr)
- if len(qlsindex) < 4 {
- return
- }
- }
- }
- isactionexpr := qlsindex[2]
- ieactionexpr := qlsindex[3]
- if isactionexpr < 0 || ieactionexpr < 0 {
- return
- }
- sactionexpr := mqlstr[isactionexpr:ieactionexpr]
- runestart := len([]rune(mqlstr[:isactionexpr]))
- // for i, q := range qls {
- // if i == len(qls)-1 {
- // break
- // }
- // if i > 0 {
- // isactionexpr += 2
- // }
- // isactionexpr += len(q)
- // }
- // ckfs := strings.TrimSpace(qls[len(qls)-1])
- // if len(ckfs) > 2 && ckfs[0] == '{' && ckfs[len(ckfs)-1] == '}' {
- // sactionexpr = ckfs[1 : len(ckfs)-1]
- // }
- if sactionexpr != "" {
- stmts, mp, e := parser.ParseMQL(sactionexpr)
- if e != nil {
- // logger.Error(e)
- return nil, e
- }
- ab := &ActionBuilder{
- runestart: runestart,
- actionid: 0,
- staticactions: staticactions,
- actionexprs: actionexprs,
- }
- for _, stmt := range stmts {
- fields := stmt.(*qstru.SelectStatement).Fields
- for _, sf := range fields {
- sp := mp.SourcePosition()[sf]
- if len(sp) != 1 {
- panic("len(sp) != 1,一个字段有且只有一个确定位置")
- }
- source := mp.Source(sp[0])
- ab.NewAction(sf.Expr, sp[0], source)
- }
- }
- }
- }
- return
- }
- type ActionBuilder struct {
- runestart int
- actionid int
- staticactions map[int]*Action
- actionexprs *ActionExprs
- }
- func (ab *ActionBuilder) NewAction(sfe *qstru.Expr, sp parser.SourcePosition, source string) *Action {
- ab.actionid++
- if act := ab.staticactions[ab.actionid]; act != nil {
- ab.actionexprs.Put(act.Name, act)
- return act
- }
- action := fmt.Sprint(sfe.FuncName)
- fargs := sfe.FuncArgs
- sargs := []interface{}{}
- for _, farg := range fargs {
- if farg == nil {
- sargs = append(sargs, nil)
- } else if farg.Const != nil {
- sargs = append(sargs, farg.Const)
- } else if farg.VarName != "" {
- sargs = append(sargs, farg)
- } else if farg.FuncName != nil && farg.FuncArgs != nil {
- sargs = append(sargs, ab.NewAction(farg, parser.SourcePosition{}, ""))
- } else {
- sargs = append(sargs, nil)
- }
- }
- act := &Action{
- ID: ab.actionid,
- Name: action,
- Args: sargs,
- SourceFromIndex: sp.Begin + ab.runestart,
- SourceToIndex: sp.End + ab.runestart,
- SourceText: source,
- }
- ab.actionexprs.Put(act.Name, act)
- // 记录在mql中的位置,避免固定数值设定被动态改变
- if STATIC_ACTION_NAMES.Has(action) {
- ab.staticactions[ab.actionid] = act
- }
- if FOLLOWTHROUGH_ACTION_NAMES.Has(act.Name) {
- ab.actionexprs.followThroughActions = append(ab.actionexprs.followThroughActions, act)
- }
- if ONERROR_ACTION_NAMES.Has(action) {
- switch action {
- case "noerrinfo":
- ab.actionexprs.onerrorOption.noerrorinfo = true
- case "onerror":
- if len(act.Args) > 0 {
- oep := OnErrorHalt
- switch cast.ToString(act.Args[0]) {
- case "break":
- oep = OnErrorBreak
- case "continue", "ignore":
- oep = OnErrorIgnore
- case "pass", "expect", "need", "must", "sure":
- oep = OnErrorPass
- case "fail", "halt", "exit":
- default:
- }
- msgs := []string{}
- for _, arg := range act.Args[1:] {
- sarg := cast.ToString(arg)
- if sarg != "" {
- msgs = append(msgs, sarg)
- }
- }
- msg := strings.Join(msgs, "|")
- msg = regexp.MustCompile("([^\\|\\*0-9A-Z_a-z\u0100-\uffff])").ReplaceAllString(msg, "\\$1")
- msg = strings.ReplaceAll(msg, "*", ".*")
- var regx *regexp.Regexp
- if msg == "" {
- regx = matchall
- } else {
- msg = "(?:" + strings.ReplaceAll(msg, "|", ")|(?:") + ")"
- regx = ab.actionexprs.onerrorOption.regex[oep]
- if regx != nil && regx != matchall {
- msg += regx.String() + "|" + msg
- }
- regx = regexp.MustCompile(msg)
- }
- ab.actionexprs.onerrorOption.regex[oep] = regx
- }
- }
- }
- return act
- }
- var ONERROR_ACTION_NAMES = strset.New("onerror", "noerrinfo")
- var STATIC_ACTION_NAMES = strset.New("skip", "retry", "scope", "beforerun", "set", "timeout", "qmeta", "loop", "loopfrom", "loopstep", "parallel", "fork", "wait", "sleep")
- var DYNAMIC_ACTION_NAMES = strset.New("subscribe", "unsubscribe", "params")
- var FOLLOWTHROUGH_ACTION_NAMES = strset.New("schema", "metainfo", "output", "outputcount", "count", "match", "matchcount", "equal")
- var time_layouts = []string{"2006-01-02 15:04:05", "2006-01-02", "2006-01-02 15:04:05.000", "2006-01-02 15:04:05.000000", "2006-01-02 15:04:05.000000000", "2006-01-02 15:04"}
- func ToStaticActions(actionexprs *ActionExprs) (staticActions *StaticActions, err error) {
- staticActions = actionexprs.staticActions
- if staticActions == nil {
- staticActions = &StaticActions{}
- actionexprs.staticActions = staticActions
- // skip 动作与位置顺序无关
- skip := actionexprs.Has("skip")
- actionexprs.Delete("skip")
- staticActions.Skip = skip
- if skip {
- // 跳过本条语句,不需要后续处理了
- return
- }
- // retry 动作与位置顺序无关
- retry_v, has_retry := actionexprs.Get("retry")
- actionexprs.Delete("retry")
- if has_retry {
- retry_args := retry_v.(*Action).Args
- retry_limit := 0
- if len(retry_args) > 0 {
- retry_limit = cast.ToInt(retry_args[0])
- if retry_limit < 0 {
- retry_limit = 0
- }
- }
- staticActions.RetryLimit = retry_limit
- }
- // scope 动作与位置顺序无关
- op_scope_v, has_scope := actionexprs.Get("scope")
- actionexprs.Delete("scope")
- if has_scope {
- op_scope_args := op_scope_v.(*Action).Args
- op_scope := "mql"
- if len(op_scope_args) > 0 {
- op_scope = cast.ToString(op_scope_args[0])
- if op_scope != "file" && op_scope != "dir" && op_scope != "top" {
- op_scope = "mql"
- }
- }
- staticActions.Scope = op_scope
- }
- // beforerun 动作与位置顺序无关
- op_beforerun_v, has_beforerun := actionexprs.Get("beforerun")
- actionexprs.Delete("beforerun")
- if has_beforerun {
- op_beforerun_args := op_beforerun_v.(*Action).Args
- for _, f := range op_beforerun_args {
- switch sf := f.(type) {
- case *Action:
- staticActions.BeforeRunActions = append(staticActions.BeforeRunActions, sf)
- }
- }
- }
- // set 动作与位置顺序无关
- op_set_v, has_set := actionexprs.Get("set")
- actionexprs.Delete("set")
- if has_set {
- op_set_args := op_set_v.(*Action).Args
- vars := map[string]interface{}{}
- for i := 0; i < len(op_set_args)-1; i += 2 {
- k := cast.ToString(op_set_args[i])
- v := op_set_args[i+1]
- switch av := v.(type) {
- case *qstru.Expr:
- ak := av.VarName
- if ak == "now" {
- v = time.Now()
- }
- case string:
- for _, tf := range time_layouts {
- t, e := time.Parse(tf, av)
- if e == nil {
- v = t
- break
- }
- }
- }
- vars[k] = v
- }
- staticActions.Variables = vars
- }
- // timeout 动作与位置顺序无关
- op_timeout_v, has_timeout := actionexprs.Get("timeout")
- actionexprs.Delete("timeout")
- if has_timeout {
- op_timeout_args := op_timeout_v.(*Action).Args
- var op_timeout time.Duration
- if len(op_timeout_args) > 0 {
- op_timeout = mfmt.ParseDuration(strings.TrimSpace(cast.ToString(op_timeout_args[0])))
- }
- staticActions.Timeout = &op_timeout
- }
- // qmeta 动作与位置顺序无关
- op_qmeta_v, has_qmeta := actionexprs.Get("qmeta")
- actionexprs.Delete("qmeta")
- if has_qmeta {
- op_qmeta_args := op_qmeta_v.(*Action).Args
- var op_qmeta odb.QueryMeta
- if len(op_qmeta_args) > 0 {
- qm := odb.QueryMeta{}
- switch qmeta_arg := op_qmeta_args[0].(type) {
- case map[string]any:
- bs, e := json.Marshal(qmeta_arg)
- if e != nil {
- return nil, e
- }
- e = json.Unmarshal(bs, &qm)
- if e != nil {
- return nil, e
- }
- default:
- s := strings.TrimSpace(cast.ToString(op_qmeta_args[0]))
- e := json.Unmarshal([]byte(s), &qm)
- if e != nil {
- return nil, e
- }
- }
- op_qmeta = qm
- }
- staticActions.Qmeta = op_qmeta
- }
- // loop 动作与位置顺序无关
- op_loop_v, has_loop := actionexprs.Get("loop")
- actionexprs.Delete("loop")
- if has_loop {
- op_loop_args := op_loop_v.(*Action).Args
- if len(op_loop_args) > 0 {
- loopcount := cast.ToInt(op_loop_args[0])
- if loopcount == 0 {
- loopcount = 1
- } else if loopcount < 0 {
- loopcount = math.MaxInt
- }
- staticActions.LoopCount = &loopcount
- }
- }
- // loopfrom 动作与位置顺序无关
- op_loopfrom_v, has_loopfrom := actionexprs.Get("loopfrom")
- actionexprs.Delete("loopfrom")
- if has_loopfrom {
- op_loopfrom_args := op_loopfrom_v.(*Action).Args
- if len(op_loopfrom_args) > 0 {
- loopfrom := cast.ToInt(op_loopfrom_args[0])
- staticActions.LoopFrom = &loopfrom
- }
- }
- // loopstep 动作与位置顺序无关
- op_loopstep_v, has_loopstep := actionexprs.Get("loopstep")
- actionexprs.Delete("loopstep")
- if has_loopstep {
- op_loopstep_args := op_loopstep_v.(*Action).Args
- if len(op_loopstep_args) > 0 {
- loopstep := cast.ToInt(op_loopstep_args[0])
- staticActions.LoopStep = &loopstep
- }
- }
- // parallel 动作与位置顺序无关
- op_parallel_v, has_parallel := actionexprs.Get("parallel")
- actionexprs.Delete("parallel")
- if has_parallel {
- op_parallel_args := op_parallel_v.(*Action).Args
- parallelcount := 0
- if len(op_parallel_args) > 0 {
- parallelcount = cast.ToInt(op_parallel_args[0])
- }
- staticActions.ParallelCount = ¶llelcount
- }
- // fork 动作与位置顺序无关
- op_fork_v, has_fork := actionexprs.Get("fork")
- actionexprs.Delete("fork")
- if has_fork {
- op_fork_args := op_fork_v.(*Action).Args
- fork_name := ""
- if len(op_fork_args) > 0 {
- fork_name = cast.ToString(op_fork_args[0])
- }
- staticActions.ForkName = &fork_name
- }
- // wait 动作与位置顺序无关
- op_wait_v, has_wait := actionexprs.Get("wait")
- actionexprs.Delete("wait")
- if has_wait {
- op_wait_args := op_wait_v.(*Action).Args
- wait_name := ""
- if len(op_wait_args) > 0 {
- wait_name = cast.ToString(op_wait_args[0])
- }
- staticActions.WaitName = &wait_name
- }
- // sleep 动作与位置顺序无关
- op_sleep_v, has_sleep := actionexprs.Get("sleep")
- actionexprs.Delete("sleep")
- if has_sleep {
- op_sleep_args := op_sleep_v.(*Action).Args
- var op_sleep_time time.Duration
- if len(op_sleep_args) > 0 {
- op_sleep_time = mfmt.ParseDuration(strings.TrimSpace(cast.ToString(op_sleep_args[0])))
- }
- staticActions.SleepTime = &op_sleep_time
- }
- }
- return
- }
- func (actionexprs *ActionExprs) DynamicActions() (dynamicActions *DynamicActions) {
- dynamicActions = actionexprs.dynamicActions
- if actionexprs.dynamicActions == nil {
- dynamicActions = &DynamicActions{}
- actionexprs.dynamicActions = dynamicActions
- // subscribe 动作与位置顺序无关
- op_subscribe_args := []string{}
- op_subscribe_v, has_subscribe := actionexprs.Get("subscribe")
- actionexprs.Delete("subscribe")
- if has_subscribe {
- args := op_subscribe_v.(*Action).Args
- for _, argv := range args {
- switch arg := argv.(type) {
- case map[string]interface{}:
- bs, e := json.Marshal(arg)
- if e == nil {
- op_subscribe_args = append(op_subscribe_args, string(bs))
- }
- default:
- op_subscribe_args = append(op_subscribe_args, cast.ToString(arg))
- }
- }
- if len(op_subscribe_args) > 0 {
- subscribe(op_subscribe_args...)
- }
- }
- dynamicActions.SubscribeArgs = op_subscribe_args
- // unsubscribe 动作与位置顺序无关
- op_unsubscribe_args := []string{}
- op_unsubscribe_v, has_unsubscribe := actionexprs.Get("unsubscribe")
- actionexprs.Delete("unsubscribe")
- if has_unsubscribe {
- op_unsubscribe_args = cast.ToStringSlice(op_unsubscribe_v.(*Action).Args)
- if len(op_unsubscribe_args) > 0 {
- unsubscribe(op_unsubscribe_args...)
- }
- }
- dynamicActions.UnsubscribeArgs = op_unsubscribe_args
- // params 动作与位置顺序无关
- op_params_str := ""
- op_params_v, has_params := actionexprs.Get("params")
- actionexprs.Delete("params")
- if has_params {
- op_params_args := op_params_v.(*Action).Args
- bs, e := json.Marshal(op_params_args)
- if e == nil {
- op_params_str = string(bs)
- } else {
- op_params_str = strings.TrimSpace(strings.Join(cast.ToStringSlice(op_params_args), ","))
- if len(op_params_str) == 0 || op_params_str[0] != '[' || op_params_str[len(op_params_str)-1] != ']' {
- op_params_str = "[" + op_params_str + "]"
- }
- }
- }
- dynamicActions.Params = op_params_str
- }
- return
- }
- // 其它后续动作
- func (actionexprs *ActionExprs) FollowThroughActions() []*Action {
- if actionexprs.followThroughActions == nil {
- actionexprs.Fetch(func(key, value interface{}) bool {
- name := key.(string)
- action := value.(*Action)
- if FOLLOWTHROUGH_ACTION_NAMES.Has(name) {
- actionexprs.followThroughActions = append(actionexprs.followThroughActions, action)
- }
- return true
- })
- }
- return actionexprs.followThroughActions
- }
|