123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463 |
- package main
- import (
- "encoding/json"
- "fmt"
- "math/rand"
- "os"
- "runtime"
- "sort"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "trial/ping/probing"
- "trial/ping/probing/icmp"
- "trial/ping/utils"
- "git.wecise.com/wecise/common/logger"
- "git.wecise.com/wecise/common/matrix/cfg"
- "git.wecise.com/wecise/common/matrix/util"
- "github.com/scylladb/go-set/strset"
- )
// task describes one ping job handed to a worker: the target plus the probing
// parameters. The JSON tags define the wire names of the fields.
type task struct {
	// Server is the host name or address to ping.
	Server string `json:"server"`
	// Timeout is the overall ping timeout in seconds; when it is not positive,
	// send falls back to one second per requested ping.
	Timeout int `json:"timeout"`
	// NumberOfPings is the number of echo requests per round; send uses 1 when
	// it is not positive.
	NumberOfPings int `json:"numberofpings"`
	// PacketInterval is the delay between two echo requests, in milliseconds.
	PacketInterval int `json:"packetinterval"`
	// PacketSize is handed to the pinger as the packet size when positive.
	PacketSize int `json:"packetsize"`
	// TypeOfService is not used.
	TypeOfService int `json:"typeofservice"`
	// Retries is not used.
	Retries int `json:"retries"`
	// Poll is reported verbatim as "pollInterval" in the result record.
	Poll int `json:"poll"`
	// FailureRetests is not used.
	FailureRetests int `json:"failureretests"`
	// RetestInterval is not used.
	RetestInterval int `json:"retestinterval"`
	// HostNameLookupPreference is not used.
	HostNameLookupPreference string `json:"hostnamelookuppreference"`
	// Description is not used.
	Description string `json:"description"`
	// Rule is not read anywhere in this file.
	Rule string `json:"rule"`
	// PerfRule is not read anywhere in this file.
	PerfRule string `json:"perfrule"`
	// TaskTime is reported verbatim as "taskTime" in the result record.
	TaskTime int64 `json:"tasktime"`
	// Indicator is reported verbatim as "indicator" in the result record.
	Indicator string `json:"indicator"`
}
// InputConfig holds the tunable settings of an Input. The struct tags name the
// corresponding TOML keys.
type InputConfig struct {
	// Poolsize is both the capacity of the work queue and the number of
	// worker goroutines started by Run.
	Poolsize int `toml:"poolsize"`
	// Domain prefixes the "previous_key" attribute built in send.
	Domain string `toml:"domain"`
	// StatInterval is forwarded to utils.NewDataStat.
	// NOTE(review): the unit is not visible in this file — confirm.
	StatInterval int `toml:"stat_interval"`
}
// StatInfo accumulates per-target ping results across rounds.
type StatInfo struct {
	// MinRtt is the smallest non-zero round-trip time seen so far.
	MinRtt time.Duration
	// MaxRtt is the largest round-trip time seen so far.
	MaxRtt time.Duration
	// AvgRtt is the average round-trip time of the most recent round only.
	AvgRtt time.Duration
	// LossCount is the running total of packets sent but not received.
	LossCount int
	// Count is the running total of packets sent.
	Count int
}
- type Input struct {
- *InputConfig
- ips []string
- statinfomutex sync.Mutex
- statinfo map[string]*StatInfo
- ipaddrs map[string]string
- subSize int
- workChan chan *task
- stopChan chan bool
- ds *utils.DataStat
- }
- var mcfg = cfg.MConfig()
- var allipsmutex sync.Mutex
- var allips = strset.New()
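/*
Configuration keys read by this program:
poolsize=1000  number of distinct target addresses pinged concurrently
count=5        pings per round; every round produces one statistics result and then switches to the next target
interval=1000  delay between two pings, in milliseconds
size=64        bytes sent per ping, 48~8192
timeout=2      ping timeout, in seconds
detect=1ms     automatically discover reachable addresses and append them to hosts.txt; the discovery interval must carry a unit (e.g. ms for milliseconds, us for microseconds); the default 0 disables discovery
*/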
- func main() {
- input := &Input{}
- input.statinfo = map[string]*StatInfo{}
- input.ipaddrs = map[string]string{}
- inputcfg := &InputConfig{
- Poolsize: 1000,
- StatInterval: 600,
- }
- inputcfg.Poolsize = mcfg.GetInt("poolsize", inputcfg.Poolsize)
- if detect_interval := mcfg.GetDuration("detect", 0); detect_interval != 0 {
- go detect(detect_interval)
- }
- fips := func() []string {
- xips := mcfg.GetStrings("ip|ping.ip", "")
- allipsmutex.Lock()
- for _, aips := range xips {
- allips.Add(strings.Split(aips, ",")...)
- }
- bs, _ := util.ReadFile("./hosts.txt")
- if len(bs) > 0 {
- xips := strings.Split(string(bs), "\n")
- for _, aips := range xips {
- allips.Add(strings.Split(aips, ",")...)
- }
- }
- if len(detectips) > 0 {
- util.WriteFile("./hosts.txt", []byte(string(bs)+"\n"+strings.Join(detectips, ",")), true)
- detectips = detectips[:0]
- }
- sips := allips.List()
- allipsmutex.Unlock()
- return sips
- }
- go func() {
- t := time.NewTicker(5 * time.Second)
- for {
- <-t.C
- input.ips = fips()
- }
- }()
- input.ips = fips()
- mcfg.OnChanged(func(cfg cfg.Configure) {
- input.ips = fips()
- })
- input.Init(inputcfg)
- go input.Run()
- go func() {
- last_count := 0
- t := time.NewTicker(1 * time.Second)
- for {
- select {
- case <-t.C:
- s := ""
- input.statinfomutex.Lock()
- ks := []string{}
- for k := range input.statinfo {
- ks = append(ks, k)
- }
- sort.Strings(ks)
- for i, k := range ks {
- v := input.statinfo[k]
- ip := input.ipaddrs[k]
- s += fmt.Sprintf("%-3d %-20s %15s : %-12s [%12s ~ %-12s] loss %d/%d\n", i, k, ip, v.AvgRtt, v.MinRtt, v.MaxRtt, v.LossCount, v.Count)
- }
- input.statinfomutex.Unlock()
- logger.Info("统计信息更新:", fmt.Sprint("\n", s))
- logger.Info(util.FormatDuration(time.Since(starttime)), "已经完成", pingcount, "次Ping操作,",
- "平均每秒", (int64(pingcount+1) * int64(time.Second) / int64(time.Since(starttime))), "次",
- "最近一秒", (pingcount - int32(last_count)), "次",
- "最大缓冲", icmp.MaxReceiveBufferUsed(),
- )
- last_count = int(pingcount)
- }
- }
- }()
- n := 0
- for {
- if n >= len(input.ips) {
- n = 0
- input.ips = append(input.ips, detectips...)
- }
- ip := input.ips[n]
- n++
- //
- ip = strings.TrimSpace(ip)
- if ip != "" {
- input.workChan <- &task{
- Server: ip,
- Timeout: mcfg.GetInt("timeout", 2),
- NumberOfPings: mcfg.GetInt("count", 5), // 至少为2,小于2会导致不能正常结束
- PacketInterval: mcfg.GetInt("interval", 1000),
- PacketSize: mcfg.GetInt("size", 64),
- }
- }
- // time.Sleep(1 * time.Millisecond)
- }
- }
- func (input *Input) Init(config interface{}) error {
- input.InputConfig = config.(*InputConfig)
- input.workChan = make(chan *task, input.Poolsize)
- input.stopChan = make(chan bool)
- input.subSize = runtime.GOMAXPROCS(0)
- return nil
- }
- func (input *Input) Run() (err error) {
- input.ds = utils.NewDataStat("service", "", input.StatInterval, input.Poolsize*2)
- defer input.ds.Close()
- go input.ds.Start()
- defer close(input.workChan)
- var wg sync.WaitGroup
- // Start worker
- // count := int32(0)
- // lpcmutex := sync.Mutex{}
- // lastprintcount := time.Now()
- for i := 0; i < input.Poolsize; i++ {
- go func(n int) {
- for t := range input.workChan {
- wg.Add(1)
- // Run task
- if e := input.send(t, n); e != nil {
- logger.Error("Send error:", e)
- input.ds.Send(1)
- } else {
- input.ds.Send(0)
- }
- wg.Done()
- // x := atomic.AddInt32(&count, 1)
- // lpcmutex.Lock()
- // printmsg := time.Since(lastprintcount) >= 1*time.Second
- // if printmsg {
- // lastprintcount = time.Now()
- // }
- // lpcmutex.Unlock()
- // if printmsg {
- // logger.Info("已经完成", x, "个目标的Ping操作")
- // }
- }
- }(i)
- }
- <-input.stopChan
- wg.Wait()
- return nil
- }
var (
	// pingcount is the number of echo replies received so far; it is
	// incremented with atomic.AddInt32.
	pingcount int32

	// starttime is the process start, used for throughput figures.
	starttime = time.Now()

	// printpingcountmutex and printpingcounttime are not referenced anywhere
	// else in this file.
	printpingcountmutex sync.Mutex
	printpingcounttime  = time.Now()
)
- func (input *Input) send(t *task, workerNum int) error {
- hostname, _ := os.Hostname()
- var (
- status = "success"
- message = "Pings Complete"
- m = map[string]interface{}{
- "service": "icmp",
- "monitorHost": hostname,
- "host": t.Server,
- "pollInterval": t.Poll,
- "taskTime": t.TaskTime,
- "indicator": t.Indicator,
- }
- )
- pinger, err := probing.NewPinger(t.Server)
- if err != nil {
- status = "failure"
- message = err.Error()
- sips := strset.New(input.ips...)
- sips.Remove(t.Server)
- input.ips = sips.List()
- logger.Error(err)
- } else {
- /*
- https://stackoverflow.com/questions/41423637/go-ping-library-for-unprivileged-icmp-ping-in-golang/41425527#41425527
- This library attempts to send an "unprivileged" ping via UDP. On linux, this must be enabled by setting
- sudo sysctl -w net.ipv4.ping_group_range="0 2147483647"
- If you do not wish to do this, you can set pinger.SetPrivileged(true) and use setcap to allow your binary using go-ping to bind to raw sockets (or just run as super-user):
- setcap cap_net_raw=+ep /bin/goping-binary
- getcap /bin/goping-binary to validate
- */
- pinger.SetPrivileged(false)
- if t.NumberOfPings > 0 {
- pinger.Count = t.NumberOfPings
- } else {
- pinger.Count = 1
- }
- if t.PacketInterval > 0 {
- pinger.Interval = time.Millisecond * time.Duration(t.PacketInterval)
- }
- if t.PacketSize > 0 {
- pinger.Size = t.PacketSize
- }
- if t.Timeout > 0 {
- pinger.Timeout = time.Second * time.Duration(t.Timeout)
- } else {
- pinger.Timeout = time.Second * time.Duration(pinger.Count)
- }
- var (
- consecutiveFailures int
- )
- var recvMutex sync.Mutex
- var recvList = map[int]int{}
- pinger.OnSend = func(pkt *probing.Packet) {
- recvMutex.Lock()
- if recvList[pkt.Seq] != 0 {
- println("并发控制有问题")
- }
- recvList[pkt.Seq]++
- recvMutex.Unlock()
- input.statinfomutex.Lock()
- si := input.statinfo[t.Server]
- if si == nil {
- si = &StatInfo{}
- input.statinfo[t.Server] = si
- }
- input.ipaddrs[t.Server] = pkt.IPAddr.String()
- si.Count++
- input.statinfomutex.Unlock()
- }
- pingMsg := fmt.Sprintf("\nWoker %d PING %s (%s):\n", workerNum, pinger.Addr(), pinger.IPAddr())
- pinger.OnRecv = func(pkt *probing.Packet) {
- recvMutex.Lock()
- recvList[pkt.Seq]++
- recvMutex.Unlock()
- s := fmt.Sprintf("%s %d bytes from %s: icmp_seq=%d time=%v\n",
- time.Now().Format("15:04:05.000"), pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt)
- pingMsg += s
- // fmt.Print(s)
- // printpingcount
- atomic.AddInt32(&pingcount, 1)
- }
- pinger.OnDuplicateRecv = func(pkt *probing.Packet) {
- recvMutex.Lock()
- recvList[pkt.Seq]++
- recvMutex.Unlock()
- s := fmt.Sprintf("%s %d bytes from %s: icmp_seq=%d time=%v (DUP!)\n",
- time.Now().Format("15:04:05.000"), pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt)
- pingMsg += s
- // fmt.Print(s)
- }
- pinger.OnFinish = func(stats *probing.Statistics) {
- m["numberPackets"] = stats.PacketsSent
- m["averageRTT"] = stats.AvgRtt.Milliseconds()
- if stats.PacketsSent == 0 {
- m["respondPercent"] = 1
- } else {
- m["respondPercent"] = stats.PacketsRecv / stats.PacketsSent * 100
- }
- //m["failureRetests"] = ?
- s := fmt.Sprintf("--- %s ping statistics ---\n", stats.Addr)
- s += fmt.Sprintf("%d packets transmitted, %d packets received, %v%% packet loss\n",
- stats.PacketsSent, stats.PacketsRecv, stats.PacketLoss)
- s += fmt.Sprintf("round-trip min/avg/max/stddev = %v/%v/%v/%v\n",
- stats.MinRtt, stats.AvgRtt, stats.MaxRtt, stats.StdDevRtt)
- pingMsg += s
- // fmt.Print(s)
- input.statinfomutex.Lock()
- si := input.statinfo[t.Server]
- if si == nil {
- si = &StatInfo{
- MinRtt: stats.MinRtt,
- MaxRtt: stats.MaxRtt,
- AvgRtt: stats.AvgRtt,
- LossCount: 0,
- Count: 0,
- }
- input.statinfo[t.Server] = si
- input.ipaddrs[t.Server] = stats.IPAddr.String()
- }
- if stats.MinRtt > 0 && (si.MinRtt == 0 || stats.MinRtt < si.MinRtt) {
- si.MinRtt = stats.MinRtt
- }
- if stats.MaxRtt > si.MaxRtt {
- si.MaxRtt = stats.MaxRtt
- }
- si.AvgRtt = stats.AvgRtt
- si.LossCount += stats.PacketsSent - stats.PacketsRecv
- input.statinfomutex.Unlock()
- }
- m["requestTime"] = time.Now().UnixNano() / int64(time.Millisecond)
- if err = pinger.Run(); err != nil {
- status = "failure"
- message = err.Error()
- logger.Errorf("Ping error: %v", err)
- } else {
- }
- m["responseTime"] = time.Now().UnixNano() / int64(time.Millisecond)
- var failCount, totalFailCount int
- recvMutex.Lock()
- for i := range recvList {
- if recvList[i] < 2 {
- failCount++
- message = "Packet loss"
- totalFailCount++
- } else {
- failCount = 0
- }
- if failCount > consecutiveFailures {
- consecutiveFailures = failCount
- }
- pingMsg += fmt.Sprintf("icmp_seq:%d %t\n", i, (recvList[i] >= 2))
- }
- // logger.Debug(pingMsg)
- m["consecutiveFailures"] = consecutiveFailures
- if totalFailCount == len(recvList) {
- message = "ICMP echo failed"
- }
- recvMutex.Unlock()
- }
- // Special
- m["returnStatus"] = status
- m["message"] = message
- b, err := json.Marshal(m)
- if err != nil {
- logger.Fatal(err)
- }
- protocolMsg := map[string]interface{}{
- "protocol": true,
- "output": b,
- "attr": map[string]string{"previous_key": input.Domain + "_" + t.Server},
- }
- b, err = json.Marshal(protocolMsg)
- if err != nil {
- logger.Fatal(err)
- }
- return nil
- }
// detectips collects addresses found reachable by detect until the next
// target refresh writes them to hosts.txt and empties the slice.
var detectips = make([]string, 0)
- func detect(detect_interval time.Duration) {
- t := time.NewTicker(detect_interval)
- for {
- <-t.C
- randip := fmt.Sprintf("%d.%d.%d.%d", 1+rand.Intn(254), 1+rand.Intn(254), 1+rand.Intn(254), 1+rand.Intn(254))
- allipsmutex.Lock()
- has := allips.Has(randip)
- allipsmutex.Unlock()
- if !has {
- if detectip(randip) {
- allipsmutex.Lock()
- detectips = append(detectips, randip)
- allipsmutex.Unlock()
- }
- }
- }
- }
- func detectip(addr string) (ok bool) {
- pinger, err := probing.NewPinger(addr)
- if err != nil {
- return false
- }
- pinger.SetPrivileged(false)
- pinger.Count = 1
- pinger.Interval = 200 * time.Millisecond
- pinger.Size = 32
- pinger.Timeout = 1 * time.Second
- pinger.OnRecv = func(pkt *probing.Packet) {
- ok = true
- }
- pinger.Run()
- return
- }
|