123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- package odbc_test
- import (
- "fmt"
- "regexp"
- "strconv"
- "sync/atomic"
- "testing"
- "time"
- ccfg "git.wecise.com/wecise/common/matrix/cfg"
- clog "git.wecise.com/wecise/common/matrix/logger"
- "git.wecise.com/wecise/odb-go/odb"
- "git.wecise.com/wecise/odbtest/odbc"
- )
- var cfg = ccfg.MConfig()
- var log = clog.New().WithConfig(cfg, "log")
- func TestConcurrent(t *testing.T) {
- var n, reqcount, okcount, confirmcount, errcount int32
- odb.QueryDoAsync = true
- log.Info("QueryDoAsync =", odb.QueryDoAsync)
- log.Info("开始测试连接到", odbc.Config().Hosts)
- client, err := odbc.NewClient()
- if err != nil {
- log.Error(fmt.Sprint("connection error:", err, atomic.AddInt32(&errcount, 1)))
- return
- }
- defer func() {
- _ = client.Close()
- log.Info("connection closed")
- }()
- classname := "/oktest/alert_status"
- N := cfg.GetInt("N", 0)
- if N != 0 {
- classname += "_" + strconv.Itoa(N)
- }
- log.Info("开启事件订阅")
- odbc.SubscribeDataChangeTest(client)
- log.Info("开始创建相关类", classname)
- cns := regexp.MustCompile("^/([^/]+)/([^/]+)").FindAllStringSubmatch(classname, -1)
- rootclass := "/" + cns[0][1]
- aliasname := cns[0][2] + "_" + cns[0][1]
- _, err = client.Query(`
- create class if not exists ` + rootclass + `();
- create class if not exists ` + classname + ` (
- acknowledged int "acknowledged",
- msg varchar "msg",
- agent varchar "AAgent",
- alertgroup varchar "alertgroup",
- alertkey varchar "alertkey",
- bsm_classname varchar "bsm_classname",
- bsm_identity varchar "bsm_identity",
- bsm_subidentity varchar "bsm_subidentity",
- businessimpact int "businessimpact",
- causetype int "causetype",
- ciid varchar "ciid",
- cmdbenrichstatus int "cmdbenrichstatus",
- corrscore int "corrscore",
- customer varchar "customer",
- domaingroup varchar "domaingroup",
- domainsubgroup varchar "domainsubgroup",
- duration int "duration",
- emailstatus int "emailstatus",
- eventid varchar "eventid",
- eventtype enum {
- "10":["主机告警"],
- "11":["数据库告警"],
- "12":["中间件类告警"],
- "13":["应用类告警"],
- "14":["硬件类告警"],
- "99":["其他类告警"]} "事件类型",
- expiretime int "expiretime",
- extendedattr varchar "extendedattr",
- firstoccurrence timestamp "firstoccurrence",
- flash int "flash",
- grade int "grade",
- identifier varchar "identifier",
- internallast timestamp "internallast",
- lastoccurrence timestamp "lastoccurrence",
- localnodealias varchar "localnodealias",
- localobjrelate int "localobjrelate",
- localpriobj varchar "localpriobj",
- localrootobj varchar "localrootobj",
- localsecobj varchar "localsecobj",
- localtertobj varchar "localtertobj",
- location varchar "location",
- logticket int "logticket",
- maintenance enum {
- "0":["Unknown", "未知"],
- "1":["true", "维护期内"],
- "2":["false", "维护期外"]} "级别",
- manager varchar "manager",
- market varchar "market",
- netype varchar "netype",
- nmoscausetype int "nmoscausetype",
- nmosdomainname varchar "nmosdomainname",
- nmosentityid int "nmosentityid",
- nmoseventmap varchar "nmoseventmap",
- nmosmanagedstatus int "nmosmanagedstatus",
- nmosobjinst int "nmosobjinst",
- nmosserial varchar "nmosserial",
- node varchar "node",
- nodealias varchar "nodealias",
- nodename varchar "nodename",
- objectfullname varchar "objectfullname",
- oldrow int "oldrow",
- omni_class varchar "class",
- omni_serial int "serial",
- originalseverity int "originalseverity",
- ownergid int "ownergid",
- owneruid int "owneruid",
- physicalcard varchar "physicalcard",
- physicalport int "physicalport",
- physicalslot int "physicalslot",
- platform varchar "platform",
- poll int "poll",
- probesubsecondid int "probesubsecondid",
- processreq int "processreq",
- region varchar "region",
- remotenodealias varchar "remotenodealias",
- remoteobjrelate int "remoteobjrelate",
- remotepriobj varchar "remotepriobj",
- remoterootobj varchar "remoterootobj",
- remotesecobj varchar "remotesecobj",
- remotetertobj varchar "remotetertobj",
- servername varchar "servername",
- serverserial int "serverserial",
- service varchar "service",
- servicenowerrorcode int "servicenowerrorcode",
- servicenowstate int "servicenowstate",
- servicenowsysid varchar "servicenowsysid",
- severity enum {
- "0":["Unknown", "未知"],
- "1":["Trace", "追踪"],
- "2":["Debug", "调试"],
- "3":["Info", "消息"],
- "4":["Warn", "警告"],
- "5":["Error", "错误"],
- "6":["Fatal", "致命错误"]} "级别",
- siteid varchar "siteid",
- statechange timestamp "statechange",
- summary varchar "summary",
- suppressescl int "suppressescl",
- tally int "tally",
- tasklist int "tasklist",
- ticketstatus varchar "ticketstatus",
- ttnumber varchar "ttnumber",
- ttstate int "ttstate",
- ttuser varchar "ttuser",
- type enum {
- "0":["Type not set"],
- "1":["problem", "问题"],
- "2":["resolution", "恢复"],
- "7":["ISM new alarm", "问题"],
- "8":["ISM old alarm", "恢复"]} "类型",
- url varchar "url",
- x733corrnotif varchar "x733corrnotif",
- x733eventtype int "x733eventtype",
- x733probablecause int "x733probablecause",
- x733specificprob varchar "x733specificprob",
-
- keys( identifier ) ,
- index( acknowledged,alertgroup,alertkey,bsm_classname,bsm_identity,bsm_subidentity,businessimpact,causetype,ciid,cmdbenrichstatus,corrscore,customer,domaingroup,domainsubgroup,emailstatus,eventid,eventtype,expiretime,extendedattr,firstoccurrence,flash,grade,identifier,internallast,lastoccurrence,localnodealias,localobjrelate,localpriobj,localrootobj,localsecobj,localtertobj,location,logticket,manager,market,netype,nmoscausetype,nmosdomainname,nmosentityid,nmoseventmap,nmosmanagedstatus,nmosobjinst,nmosserial,node,nodealias,nodename,objectfullname,oldrow,omni_serial,originalseverity,ownergid,owneruid,physicalcard,physicalport,physicalslot,platform,poll,probesubsecondid,processreq,region,remotenodealias,remoteobjrelate,remotepriobj,remoterootobj,remotesecobj,remotetertobj,servername,serverserial,service,servicenowerrorcode,servicenowstate,servicenowsysid,severity,siteid,statechange,summary,suppressescl,tally,tasklist,ticketstatus,ttnumber,ttstate,ttuser,type,url,x733corrnotif,x733eventtype,x733probablecause,x733specificprob)
- )with ttl=366 day , autosearch=true , version=true , key=manu, alias='` + aliasname + `' , nickname='` + aliasname + `' ;
- `).Do()
- if err != nil {
- log.Error(fmt.Sprint("query count error:", err, atomic.AddInt32(&errcount, 1)))
- }
- log.Info("检查历史数据")
- result, err := client.Query("select count(id) as count FROM " + classname + " LIMIT -1").Do()
- if err != nil {
- log.Error(fmt.Sprint("query count error:", err, atomic.AddInt32(&errcount, 1)))
- } else {
- log.Info(fmt.Sprintf("query count return: %#v", result.Data[0]["count"]))
- }
- log.Info("清除历史数据")
- _, err = client.Query("delete FROM " + classname + "").Do()
- if err != nil {
- log.Error(fmt.Sprint("delete error:", err, atomic.AddInt32(&errcount, 1)))
- } else {
- log.Info(fmt.Sprintf("delete ok"))
- }
- st := time.Now()
- go func() {
- t := time.NewTicker(1 * time.Second)
- for {
- select {
- case <-t.C:
- log.Info(time.Since(st), "插入数据", "req:", atomic.AddInt32(&reqcount, 0), "ok:", atomic.AddInt32(&okcount, 0), "confirm:", atomic.AddInt32(&confirmcount, 0), "err:", atomic.AddInt32(&errcount, 0))
- }
- }
- }()
- for i := 0; i < client.Config().PoolSize*1000; i++ {
- go func(i int) {
- for {
- identifier := fmt.Sprint("222.129.134.178.1.3.6.1.4.1.2011.5.25.219.2.5.665.25.219.2.5.6.", i, ".", atomic.AddInt32(&n, 1))
- mql := `insert into ` + classname + ` (identifier, severity, serverserial, domaingroup, lastoccurrence, maintenance, type, agent, firstoccurrence, omni_class, nodealias, alertgroup, node, manager, _tokens, originalseverity, summary) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict update firstoccurrence=firstoccurrence, servicenowstate=servicenowstate, emailstatus=emailstatus, tally=tally+1`
- params := []interface{}{identifier, 5, 111, "domaingroup", 1622249715000, 1, 2, "huawei mttrapd", 1622211715023, "300", "222.129.134.178", "huawei_power_func", "222.129.134.178", "MTTrapd Probe", map[string]interface{}{"_raw": "{\"1\":\"SNMP Trap Test.see more:http://www.micmiu.com\",\"1_hex\":\"53 4e 4d 50 20 54 72 61 70 20 54 65 73 74 2e 73 65 65 20 6d 6f 72 65 3a 68 74 74 70 3a 2f 2f 77 77 77 2e 6d 69 63 6d 69 75 2e 63 6f 6d\",\"1_raw\":\"SNMP Trap Test.see more:http://www.micmiu.com\",\"1_text\":\"SNMP Trap Test.see more:http://www.micmiu.com\",\"2\":\"1000\",\"2_hex\":\"1000\",\"2_raw\":\"1000\",\"2_text\":\"1000\",\"3\":\".1.3.6.1.4.1.2011.5.25.219.2.5.6\",\"3_hex\":\".1.3.6.1.4.1.2011.5.25.219.2.5.6\",\"3_raw\":\".1.3.6.1.4.1.2011.5.25.219.2.5.6\",\"3_text\":\".1.3.6.1.4.1.2011.5.25.219.2.5.6\",\"4\":\"SNMPTEST\",\"4_hex\":\"53 4e 4d 50 54 45 53 54\",\"4_raw\":\"SNMPTEST\",\"4_text\":\"SNMPTEST\",\"5\":10,\"5_hex\":10,\"5_raw\":10,\"5_text\":10,\"6\":\"Sat May 29 08:55:14 CST 2021\",\"6_hex\":\"53 61 74 20 4d 61 79 20 32 39 20 30 38 3a 35 35 3a 31 34 20 43 53 54 20 32 30 32 31\",\"6_raw\":\"Sat May 29 08:55:14 CST 2021\",\"6_text\":\"Sat May 29 08:55:14 CST 2021\",\"IPaddress\":\"222.129.134.178\",\"Node\":\"222.129.134.178\",\"OID1\":\".1.3.6.1.2.1.1.1.1\",\"OID2\":\".1.3.6.1.2.1.1.3.0\",\"OID3\":\".1.3.6.1.6.3.1.1.4.1.0\",\"OID4\":\".1.3.6.1.4.1.2011.2.15.2.1.2.1.1.1.1\",\"OID5\":\".1.3.6.1.4.1.2011.2.15.2.1.2.1.1.1.2\",\"OID6\":\".1.3.6.1.4.1.2011.2.15.2.1.2.1.1.1.3\",\"PeerAddress\":\"222.129.134.178\",\"PeerIPaddress\":\"222.129.134.178\",\"Protocol\":\"udp\",\"ReceivedPort\":9162,\"ReceivedTime\":0,\"ReqId\":176301626,\"SNMP_Version\":\"2c\",\"UpTime\":\"1000\",\"Uptime\":\"8:16:40.00\",\"community\":\"public\",\"contextEngineID\":\"\",\"enterprise\":\"\",\"generic-trap\":0,\"notify\":\".1.3.6.1.4.1.2011.5.25.219.2.5.6\",\"securityEngineID\":\"1234\",\"securityLevel\":\"authPriv\",\"securityName\":\"user\",\"specific-trap\":0}"}, 5, "设备:222.129.134.178电源整体功能恢复!"}
- atomic.AddInt32(&reqcount, 1)
- result, err := client.Query(mql, params...).Do()
- if err != nil {
- log.Error(fmt.Sprint("connection", i, err, atomic.AddInt32(&errcount, 1)))
- time.Sleep(1 * time.Second)
- } else {
- atomic.AddInt32(&okcount, 1)
- result, err = client.Query("select id,tally FROM "+classname+" where identifier=?", identifier).Do()
- if err != nil {
- log.Error(fmt.Sprint("connection", i, err, atomic.AddInt32(&errcount, 1)))
- time.Sleep(1 * time.Second)
- } else {
- atomic.AddInt32(&confirmcount, 1)
- log.Debug(fmt.Sprint("connection", i, result))
- // time.Sleep(1 * time.Second)
- }
- }
- // time.Sleep(3 * time.Second)
- }
- }(i)
- }
- time.Sleep(3600 * time.Second)
- log.Info("测试结束")
- }
|