package odbcmql import ( "encoding/json" "fmt" "math" "regexp" "strings" "time" "git.wecise.com/wecise/odb-go/odb" "git.wecise.com/wecise/odb-go/odb/mql/parser" "git.wecise.com/wecise/odb-go/odb/mql/qstru" "gitee.com/wecisecode/util/cast" "gitee.com/wecisecode/util/mfmt" "gitee.com/wecisecode/util/set/strset" "gitee.com/wecisecode/util/sortedmap" ) type Action struct { ID int Name string Args []interface{} SourceFromIndex int SourceToIndex int SourceText string // } type OnErrorProc int const ( OnErrorHalt OnErrorProc = iota // halt, fail, exit OnErrorIgnore // continue, ignore OnErrorBreak // break, 中断循环 OnErrorPass // expect, pass, need, sure ) type StaticActions struct { Skip bool RetryLimit int Scope string Qmeta odb.QueryMeta Timeout *time.Duration LoopCount *int LoopFrom *int LoopStep *int ParallelCount *int ForkName *string WaitName *string SleepTime *time.Duration Variables map[string]interface{} BeforeRunActions []*Action } type DynamicActions struct { SubscribeArgs []string UnsubscribeArgs []string Params string } type OnErrorOption struct { regex map[OnErrorProc]*regexp.Regexp noerrorinfo bool } type ActionExprs struct { *sortedmap.LinkedMap staticActions *StaticActions dynamicActions *DynamicActions followThroughActions []*Action onerrorOption *OnErrorOption } func getActionExprs(mqlstr string, staticactions map[int]*Action) (actionexprs *ActionExprs, err error) { actionexprs = &ActionExprs{ LinkedMap: sortedmap.NewLinkedMap(), onerrorOption: &OnErrorOption{ regex: map[OnErrorProc]*regexp.Regexp{}, }, } { // /** ... **/ 之间的内容 qlsindex := reactioneexprs.FindStringSubmatchIndex(mqlstr) if len(qlsindex) < 4 { // --{ ... } 之间的内容 qlsindex = reactioneexprs_2.FindStringSubmatchIndex(mqlstr) if len(qlsindex) < 4 { // --{ ... } 之间的内容 qlsindex = reactioneexprs_3.FindStringSubmatchIndex(mqlstr) if len(qlsindex) < 4 { return } } } isactionexpr := qlsindex[2] ieactionexpr := qlsindex[3] if isactionexpr < 0 || ieactionexpr < 0 { return } sactionexpr := mqlstr[isactionexpr:ieactionexpr] runestart := len([]rune(mqlstr[:isactionexpr])) // for i, q := range qls { // if i == len(qls)-1 { // break // } // if i > 0 { // isactionexpr += 2 // } // isactionexpr += len(q) // } // ckfs := strings.TrimSpace(qls[len(qls)-1]) // if len(ckfs) > 2 && ckfs[0] == '{' && ckfs[len(ckfs)-1] == '}' { // sactionexpr = ckfs[1 : len(ckfs)-1] // } if sactionexpr != "" { stmts, mp, e := parser.ParseMQL(sactionexpr) if e != nil { // logger.Error(e) return nil, e } ab := &ActionBuilder{ runestart: runestart, actionid: 0, staticactions: staticactions, actionexprs: actionexprs, } for _, stmt := range stmts { fields := stmt.(*qstru.SelectStatement).Fields for _, sf := range fields { sp := mp.SourcePosition()[sf] if len(sp) != 1 { panic("len(sp) != 1,一个字段有且只有一个确定位置") } source := mp.Source(sp[0]) ab.NewAction(sf.Expr, sp[0], source) } } } } return } type ActionBuilder struct { runestart int actionid int staticactions map[int]*Action actionexprs *ActionExprs } func (ab *ActionBuilder) NewAction(sfe *qstru.Expr, sp parser.SourcePosition, source string) *Action { ab.actionid++ if act := ab.staticactions[ab.actionid]; act != nil { ab.actionexprs.Put(act.Name, act) return act } action := fmt.Sprint(sfe.FuncName) fargs := sfe.FuncArgs sargs := []interface{}{} for _, farg := range fargs { if farg == nil { sargs = append(sargs, nil) } else if farg.Const != nil { sargs = append(sargs, farg.Const) } else if farg.VarName != "" { sargs = append(sargs, farg) } else if farg.FuncName != nil && farg.FuncArgs != nil { sargs = append(sargs, ab.NewAction(farg, parser.SourcePosition{}, "")) } else { sargs = append(sargs, nil) } } act := &Action{ ID: ab.actionid, Name: action, Args: sargs, SourceFromIndex: sp.Begin + ab.runestart, SourceToIndex: sp.End + ab.runestart, SourceText: source, } ab.actionexprs.Put(act.Name, act) // 记录在mql中的位置,避免固定数值设定被动态改变 if STATIC_ACTION_NAMES.Has(action) { ab.staticactions[ab.actionid] = act } if FOLLOWTHROUGH_ACTION_NAMES.Has(act.Name) { ab.actionexprs.followThroughActions = append(ab.actionexprs.followThroughActions, act) } if ONERROR_ACTION_NAMES.Has(action) { switch action { case "noerrinfo": ab.actionexprs.onerrorOption.noerrorinfo = true case "onerror": if len(act.Args) > 0 { oep := OnErrorHalt switch cast.ToString(act.Args[0]) { case "break": oep = OnErrorBreak case "continue", "ignore": oep = OnErrorIgnore case "pass", "expect", "need", "must", "sure": oep = OnErrorPass case "fail", "halt", "exit": default: } msgs := []string{} for _, arg := range act.Args[1:] { sarg := cast.ToString(arg) if sarg != "" { msgs = append(msgs, sarg) } } msg := strings.Join(msgs, "|") msg = regexp.MustCompile("([^\\|\\*0-9A-Z_a-z\u0100-\uffff])").ReplaceAllString(msg, "\\$1") msg = strings.ReplaceAll(msg, "*", ".*") var regx *regexp.Regexp if msg == "" { regx = matchall } else { msg = "(?:" + strings.ReplaceAll(msg, "|", ")|(?:") + ")" regx = ab.actionexprs.onerrorOption.regex[oep] if regx != nil && regx != matchall { msg += regx.String() + "|" + msg } regx = regexp.MustCompile(msg) } ab.actionexprs.onerrorOption.regex[oep] = regx } } } return act } var ONERROR_ACTION_NAMES = strset.New("onerror", "noerrinfo") var STATIC_ACTION_NAMES = strset.New("skip", "retry", "scope", "beforerun", "set", "timeout", "qmeta", "loop", "loopfrom", "loopstep", "parallel", "fork", "wait", "sleep") var DYNAMIC_ACTION_NAMES = strset.New("subscribe", "unsubscribe", "params") var FOLLOWTHROUGH_ACTION_NAMES = strset.New("schema", "metainfo", "output", "outputcount", "count", "match", "matchcount", "equal") var time_layouts = []string{"2006-01-02 15:04:05", "2006-01-02", "2006-01-02 15:04:05.000", "2006-01-02 15:04:05.000000", "2006-01-02 15:04:05.000000000", "2006-01-02 15:04"} func ToStaticActions(actionexprs *ActionExprs) (staticActions *StaticActions, err error) { staticActions = actionexprs.staticActions if staticActions == nil { staticActions = &StaticActions{} actionexprs.staticActions = staticActions // skip 动作与位置顺序无关 skip := actionexprs.Has("skip") actionexprs.Delete("skip") staticActions.Skip = skip if skip { // 跳过本条语句,不需要后续处理了 return } // retry 动作与位置顺序无关 retry_v, has_retry := actionexprs.Get("retry") actionexprs.Delete("retry") if has_retry { retry_args := retry_v.(*Action).Args retry_limit := 0 if len(retry_args) > 0 { retry_limit = cast.ToInt(retry_args[0]) if retry_limit < 0 { retry_limit = 0 } } staticActions.RetryLimit = retry_limit } // scope 动作与位置顺序无关 op_scope_v, has_scope := actionexprs.Get("scope") actionexprs.Delete("scope") if has_scope { op_scope_args := op_scope_v.(*Action).Args op_scope := "mql" if len(op_scope_args) > 0 { op_scope = cast.ToString(op_scope_args[0]) if op_scope != "file" && op_scope != "dir" && op_scope != "top" { op_scope = "mql" } } staticActions.Scope = op_scope } // beforerun 动作与位置顺序无关 op_beforerun_v, has_beforerun := actionexprs.Get("beforerun") actionexprs.Delete("beforerun") if has_beforerun { op_beforerun_args := op_beforerun_v.(*Action).Args for _, f := range op_beforerun_args { switch sf := f.(type) { case *Action: staticActions.BeforeRunActions = append(staticActions.BeforeRunActions, sf) } } } // set 动作与位置顺序无关 op_set_v, has_set := actionexprs.Get("set") actionexprs.Delete("set") if has_set { op_set_args := op_set_v.(*Action).Args vars := map[string]interface{}{} for i := 0; i < len(op_set_args)-1; i += 2 { k := cast.ToString(op_set_args[i]) v := op_set_args[i+1] switch av := v.(type) { case *qstru.Expr: ak := av.VarName if ak == "now" { v = time.Now() } case string: for _, tf := range time_layouts { t, e := time.Parse(tf, av) if e == nil { v = t break } } } vars[k] = v } staticActions.Variables = vars } // timeout 动作与位置顺序无关 op_timeout_v, has_timeout := actionexprs.Get("timeout") actionexprs.Delete("timeout") if has_timeout { op_timeout_args := op_timeout_v.(*Action).Args var op_timeout time.Duration if len(op_timeout_args) > 0 { op_timeout = mfmt.ParseDuration(strings.TrimSpace(cast.ToString(op_timeout_args[0]))) } staticActions.Timeout = &op_timeout } // qmeta 动作与位置顺序无关 op_qmeta_v, has_qmeta := actionexprs.Get("qmeta") actionexprs.Delete("qmeta") if has_qmeta { op_qmeta_args := op_qmeta_v.(*Action).Args var op_qmeta odb.QueryMeta if len(op_qmeta_args) > 0 { qm := odb.QueryMeta{} switch qmeta_arg := op_qmeta_args[0].(type) { case map[string]any: bs, e := json.Marshal(qmeta_arg) if e != nil { return nil, e } e = json.Unmarshal(bs, &qm) if e != nil { return nil, e } default: s := strings.TrimSpace(cast.ToString(op_qmeta_args[0])) e := json.Unmarshal([]byte(s), &qm) if e != nil { return nil, e } } op_qmeta = qm } staticActions.Qmeta = op_qmeta } // loop 动作与位置顺序无关 op_loop_v, has_loop := actionexprs.Get("loop") actionexprs.Delete("loop") if has_loop { op_loop_args := op_loop_v.(*Action).Args if len(op_loop_args) > 0 { loopcount := cast.ToInt(op_loop_args[0]) if loopcount == 0 { loopcount = 1 } else if loopcount < 0 { loopcount = math.MaxInt } staticActions.LoopCount = &loopcount } } // loopfrom 动作与位置顺序无关 op_loopfrom_v, has_loopfrom := actionexprs.Get("loopfrom") actionexprs.Delete("loopfrom") if has_loopfrom { op_loopfrom_args := op_loopfrom_v.(*Action).Args if len(op_loopfrom_args) > 0 { loopfrom := cast.ToInt(op_loopfrom_args[0]) staticActions.LoopFrom = &loopfrom } } // loopstep 动作与位置顺序无关 op_loopstep_v, has_loopstep := actionexprs.Get("loopstep") actionexprs.Delete("loopstep") if has_loopstep { op_loopstep_args := op_loopstep_v.(*Action).Args if len(op_loopstep_args) > 0 { loopstep := cast.ToInt(op_loopstep_args[0]) staticActions.LoopStep = &loopstep } } // parallel 动作与位置顺序无关 op_parallel_v, has_parallel := actionexprs.Get("parallel") actionexprs.Delete("parallel") if has_parallel { op_parallel_args := op_parallel_v.(*Action).Args parallelcount := 0 if len(op_parallel_args) > 0 { parallelcount = cast.ToInt(op_parallel_args[0]) } staticActions.ParallelCount = ¶llelcount } // fork 动作与位置顺序无关 op_fork_v, has_fork := actionexprs.Get("fork") actionexprs.Delete("fork") if has_fork { op_fork_args := op_fork_v.(*Action).Args fork_name := "" if len(op_fork_args) > 0 { fork_name = cast.ToString(op_fork_args[0]) } staticActions.ForkName = &fork_name } // wait 动作与位置顺序无关 op_wait_v, has_wait := actionexprs.Get("wait") actionexprs.Delete("wait") if has_wait { op_wait_args := op_wait_v.(*Action).Args wait_name := "" if len(op_wait_args) > 0 { wait_name = cast.ToString(op_wait_args[0]) } staticActions.WaitName = &wait_name } // sleep 动作与位置顺序无关 op_sleep_v, has_sleep := actionexprs.Get("sleep") actionexprs.Delete("sleep") if has_sleep { op_sleep_args := op_sleep_v.(*Action).Args var op_sleep_time time.Duration if len(op_sleep_args) > 0 { op_sleep_time = mfmt.ParseDuration(strings.TrimSpace(cast.ToString(op_sleep_args[0]))) } staticActions.SleepTime = &op_sleep_time } } return } func (actionexprs *ActionExprs) DynamicActions() (dynamicActions *DynamicActions) { dynamicActions = actionexprs.dynamicActions if actionexprs.dynamicActions == nil { dynamicActions = &DynamicActions{} actionexprs.dynamicActions = dynamicActions // subscribe 动作与位置顺序无关 op_subscribe_args := []string{} op_subscribe_v, has_subscribe := actionexprs.Get("subscribe") actionexprs.Delete("subscribe") if has_subscribe { args := op_subscribe_v.(*Action).Args for _, argv := range args { switch arg := argv.(type) { case map[string]interface{}: bs, e := json.Marshal(arg) if e == nil { op_subscribe_args = append(op_subscribe_args, string(bs)) } default: op_subscribe_args = append(op_subscribe_args, cast.ToString(arg)) } } if len(op_subscribe_args) > 0 { subscribe(op_subscribe_args...) } } dynamicActions.SubscribeArgs = op_subscribe_args // unsubscribe 动作与位置顺序无关 op_unsubscribe_args := []string{} op_unsubscribe_v, has_unsubscribe := actionexprs.Get("unsubscribe") actionexprs.Delete("unsubscribe") if has_unsubscribe { op_unsubscribe_args = cast.ToStringSlice(op_unsubscribe_v.(*Action).Args) if len(op_unsubscribe_args) > 0 { unsubscribe(op_unsubscribe_args...) } } dynamicActions.UnsubscribeArgs = op_unsubscribe_args // params 动作与位置顺序无关 op_params_str := "" op_params_v, has_params := actionexprs.Get("params") actionexprs.Delete("params") if has_params { op_params_args := op_params_v.(*Action).Args bs, e := json.Marshal(op_params_args) if e == nil { op_params_str = string(bs) } else { op_params_str = strings.TrimSpace(strings.Join(cast.ToStringSlice(op_params_args), ",")) if len(op_params_str) == 0 || op_params_str[0] != '[' || op_params_str[len(op_params_str)-1] != ']' { op_params_str = "[" + op_params_str + "]" } } } dynamicActions.Params = op_params_str } return } // 其它后续动作 func (actionexprs *ActionExprs) FollowThroughActions() []*Action { if actionexprs.followThroughActions == nil { actionexprs.Fetch(func(key, value interface{}) bool { name := key.(string) action := value.(*Action) if FOLLOWTHROUGH_ACTION_NAMES.Has(name) { actionexprs.followThroughActions = append(actionexprs.followThroughActions, action) } return true }) } return actionexprs.followThroughActions }