package main import ( "encoding/json" "fmt" "math/rand" "os" "runtime" "sort" "strings" "sync" "sync/atomic" "time" "trial/ping/probing" "trial/ping/probing/icmp" "trial/ping/utils" "git.wecise.com/wecise/common/logger" "git.wecise.com/wecise/common/matrix/cfg" "git.wecise.com/wecise/common/matrix/util" "github.com/scylladb/go-set/strset" ) type task struct { Server string `json:"server"` Timeout int `json:"timeout"` NumberOfPings int `json:"numberofpings"` PacketInterval int `json:"packetinterval"` PacketSize int `json:"packetsize"` TypeOfService int `json:"typeofservice"` // Not used Retries int `json:"retries"` // Not used Poll int `json:"poll"` FailureRetests int `json:"failureretests"` // Not used RetestInterval int `json:"retestinterval"` // Not used HostNameLookupPreference string `json:"hostnamelookuppreference"` // Not used Description string `json:"description"` // Not used Rule string `json:"rule"` PerfRule string `json:"perfrule"` TaskTime int64 `json:"tasktime"` Indicator string `json:"indicator"` } type InputConfig struct { Poolsize int `toml:"poolsize"` Domain string `toml:"domain"` StatInterval int `toml:"stat_interval"` } type StatInfo struct { MinRtt time.Duration MaxRtt time.Duration AvgRtt time.Duration LossCount int Count int } type Input struct { *InputConfig ips []string statinfomutex sync.Mutex statinfo map[string]*StatInfo ipaddrs map[string]string subSize int workChan chan *task stopChan chan bool ds *utils.DataStat } var mcfg = cfg.MConfig() var allipsmutex sync.Mutex var allips = strset.New() /* poolsize=1000 同时ping的不同目标地址个数 count=5 每轮ping的次数,每轮次产生一次统计结果,并切换目标地址 interval=1000 两次ping之间的间隔,单位毫秒 size=64 每次ping发送字节数,48~8192 timeout=2 ping超时,单位秒 detect=1ms 自动发现可ping通地址,并追加到 hosts.txt 文件,设置发现间隔时间需指定单位,如 ms,毫秒,us,微妙等,默认 0,不进行发现处理 */ func main() { input := &Input{} input.statinfo = map[string]*StatInfo{} input.ipaddrs = map[string]string{} inputcfg := &InputConfig{ Poolsize: 1000, StatInterval: 600, } inputcfg.Poolsize = mcfg.GetInt("poolsize", inputcfg.Poolsize) if detect_interval := mcfg.GetDuration("detect", 0); detect_interval != 0 { go detect(detect_interval) } fips := func() []string { xips := mcfg.GetStrings("ip|ping.ip", "") allipsmutex.Lock() for _, aips := range xips { allips.Add(strings.Split(aips, ",")...) } bs, _ := util.ReadFile("./hosts.txt") if len(bs) > 0 { xips := strings.Split(string(bs), "\n") for _, aips := range xips { allips.Add(strings.Split(aips, ",")...) } } if len(detectips) > 0 { util.WriteFile("./hosts.txt", []byte(string(bs)+"\n"+strings.Join(detectips, ",")), true) detectips = detectips[:0] } sips := allips.List() allipsmutex.Unlock() return sips } go func() { t := time.NewTicker(5 * time.Second) for { <-t.C input.ips = fips() } }() input.ips = fips() mcfg.OnChanged(func(cfg cfg.Configure) { input.ips = fips() }) input.Init(inputcfg) go input.Run() go func() { last_count := 0 t := time.NewTicker(1 * time.Second) for { select { case <-t.C: s := "" input.statinfomutex.Lock() ks := []string{} for k := range input.statinfo { ks = append(ks, k) } sort.Strings(ks) for i, k := range ks { v := input.statinfo[k] ip := input.ipaddrs[k] s += fmt.Sprintf("%-3d %-20s %15s : %-12s [%12s ~ %-12s] loss %d/%d\n", i, k, ip, v.AvgRtt, v.MinRtt, v.MaxRtt, v.LossCount, v.Count) } input.statinfomutex.Unlock() logger.Info("统计信息更新:", fmt.Sprint("\n", s)) logger.Info(util.FormatDuration(time.Since(starttime)), "已经完成", pingcount, "次Ping操作,", "平均每秒", (int64(pingcount+1) * int64(time.Second) / int64(time.Since(starttime))), "次", "最近一秒", (pingcount - int32(last_count)), "次", "最大缓冲", icmp.MaxReceiveBufferUsed(), ) last_count = int(pingcount) } } }() n := 0 for { if n >= len(input.ips) { n = 0 input.ips = append(input.ips, detectips...) } ip := input.ips[n] n++ // ip = strings.TrimSpace(ip) if ip != "" { input.workChan <- &task{ Server: ip, Timeout: mcfg.GetInt("timeout", 2), NumberOfPings: mcfg.GetInt("count", 5), // 至少为2,小于2会导致不能正常结束 PacketInterval: mcfg.GetInt("interval", 1000), PacketSize: mcfg.GetInt("size", 64), } } // time.Sleep(1 * time.Millisecond) } } func (input *Input) Init(config interface{}) error { input.InputConfig = config.(*InputConfig) input.workChan = make(chan *task, input.Poolsize) input.stopChan = make(chan bool) input.subSize = runtime.GOMAXPROCS(0) return nil } func (input *Input) Run() (err error) { input.ds = utils.NewDataStat("service", "", input.StatInterval, input.Poolsize*2) defer input.ds.Close() go input.ds.Start() defer close(input.workChan) var wg sync.WaitGroup // Start worker // count := int32(0) // lpcmutex := sync.Mutex{} // lastprintcount := time.Now() for i := 0; i < input.Poolsize; i++ { go func(n int) { for t := range input.workChan { wg.Add(1) // Run task if e := input.send(t, n); e != nil { logger.Error("Send error:", e) input.ds.Send(1) } else { input.ds.Send(0) } wg.Done() // x := atomic.AddInt32(&count, 1) // lpcmutex.Lock() // printmsg := time.Since(lastprintcount) >= 1*time.Second // if printmsg { // lastprintcount = time.Now() // } // lpcmutex.Unlock() // if printmsg { // logger.Info("已经完成", x, "个目标的Ping操作") // } } }(i) } <-input.stopChan wg.Wait() return nil } var pingcount int32 var starttime = time.Now() var printpingcountmutex sync.Mutex var printpingcounttime = time.Now() func (input *Input) send(t *task, workerNum int) error { hostname, _ := os.Hostname() var ( status = "success" message = "Pings Complete" m = map[string]interface{}{ "service": "icmp", "monitorHost": hostname, "host": t.Server, "pollInterval": t.Poll, "taskTime": t.TaskTime, "indicator": t.Indicator, } ) pinger, err := probing.NewPinger(t.Server) if err != nil { status = "failure" message = err.Error() sips := strset.New(input.ips...) sips.Remove(t.Server) input.ips = sips.List() logger.Error(err) } else { /* https://stackoverflow.com/questions/41423637/go-ping-library-for-unprivileged-icmp-ping-in-golang/41425527#41425527 This library attempts to send an "unprivileged" ping via UDP. On linux, this must be enabled by setting sudo sysctl -w net.ipv4.ping_group_range="0 2147483647" If you do not wish to do this, you can set pinger.SetPrivileged(true) and use setcap to allow your binary using go-ping to bind to raw sockets (or just run as super-user): setcap cap_net_raw=+ep /bin/goping-binary getcap /bin/goping-binary to validate */ pinger.SetPrivileged(false) if t.NumberOfPings > 0 { pinger.Count = t.NumberOfPings } else { pinger.Count = 1 } if t.PacketInterval > 0 { pinger.Interval = time.Millisecond * time.Duration(t.PacketInterval) } if t.PacketSize > 0 { pinger.Size = t.PacketSize } if t.Timeout > 0 { pinger.Timeout = time.Second * time.Duration(t.Timeout) } else { pinger.Timeout = time.Second * time.Duration(pinger.Count) } var ( consecutiveFailures int ) var recvMutex sync.Mutex var recvList = map[int]int{} pinger.OnSend = func(pkt *probing.Packet) { recvMutex.Lock() if recvList[pkt.Seq] != 0 { println("并发控制有问题") } recvList[pkt.Seq]++ recvMutex.Unlock() input.statinfomutex.Lock() si := input.statinfo[t.Server] if si == nil { si = &StatInfo{} input.statinfo[t.Server] = si } input.ipaddrs[t.Server] = pkt.IPAddr.String() si.Count++ input.statinfomutex.Unlock() } pingMsg := fmt.Sprintf("\nWoker %d PING %s (%s):\n", workerNum, pinger.Addr(), pinger.IPAddr()) pinger.OnRecv = func(pkt *probing.Packet) { recvMutex.Lock() recvList[pkt.Seq]++ recvMutex.Unlock() s := fmt.Sprintf("%s %d bytes from %s: icmp_seq=%d time=%v\n", time.Now().Format("15:04:05.000"), pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt) pingMsg += s // fmt.Print(s) // printpingcount atomic.AddInt32(&pingcount, 1) } pinger.OnDuplicateRecv = func(pkt *probing.Packet) { recvMutex.Lock() recvList[pkt.Seq]++ recvMutex.Unlock() s := fmt.Sprintf("%s %d bytes from %s: icmp_seq=%d time=%v (DUP!)\n", time.Now().Format("15:04:05.000"), pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt) pingMsg += s // fmt.Print(s) } pinger.OnFinish = func(stats *probing.Statistics) { m["numberPackets"] = stats.PacketsSent m["averageRTT"] = stats.AvgRtt.Milliseconds() if stats.PacketsSent == 0 { m["respondPercent"] = 1 } else { m["respondPercent"] = stats.PacketsRecv / stats.PacketsSent * 100 } //m["failureRetests"] = ? s := fmt.Sprintf("--- %s ping statistics ---\n", stats.Addr) s += fmt.Sprintf("%d packets transmitted, %d packets received, %v%% packet loss\n", stats.PacketsSent, stats.PacketsRecv, stats.PacketLoss) s += fmt.Sprintf("round-trip min/avg/max/stddev = %v/%v/%v/%v\n", stats.MinRtt, stats.AvgRtt, stats.MaxRtt, stats.StdDevRtt) pingMsg += s // fmt.Print(s) input.statinfomutex.Lock() si := input.statinfo[t.Server] if si == nil { si = &StatInfo{ MinRtt: stats.MinRtt, MaxRtt: stats.MaxRtt, AvgRtt: stats.AvgRtt, LossCount: 0, Count: 0, } input.statinfo[t.Server] = si input.ipaddrs[t.Server] = stats.IPAddr.String() } if stats.MinRtt > 0 && (si.MinRtt == 0 || stats.MinRtt < si.MinRtt) { si.MinRtt = stats.MinRtt } if stats.MaxRtt > si.MaxRtt { si.MaxRtt = stats.MaxRtt } si.AvgRtt = stats.AvgRtt si.LossCount += stats.PacketsSent - stats.PacketsRecv input.statinfomutex.Unlock() } m["requestTime"] = time.Now().UnixNano() / int64(time.Millisecond) if err = pinger.Run(); err != nil { status = "failure" message = err.Error() logger.Errorf("Ping error: %v", err) } else { } m["responseTime"] = time.Now().UnixNano() / int64(time.Millisecond) var failCount, totalFailCount int recvMutex.Lock() for i := range recvList { if recvList[i] < 2 { failCount++ message = "Packet loss" totalFailCount++ } else { failCount = 0 } if failCount > consecutiveFailures { consecutiveFailures = failCount } pingMsg += fmt.Sprintf("icmp_seq:%d %t\n", i, (recvList[i] >= 2)) } // logger.Debug(pingMsg) m["consecutiveFailures"] = consecutiveFailures if totalFailCount == len(recvList) { message = "ICMP echo failed" } recvMutex.Unlock() } // Special m["returnStatus"] = status m["message"] = message b, err := json.Marshal(m) if err != nil { logger.Fatal(err) } protocolMsg := map[string]interface{}{ "protocol": true, "output": b, "attr": map[string]string{"previous_key": input.Domain + "_" + t.Server}, } b, err = json.Marshal(protocolMsg) if err != nil { logger.Fatal(err) } return nil } var detectips = []string{} func detect(detect_interval time.Duration) { t := time.NewTicker(detect_interval) for { <-t.C randip := fmt.Sprintf("%d.%d.%d.%d", 1+rand.Intn(254), 1+rand.Intn(254), 1+rand.Intn(254), 1+rand.Intn(254)) allipsmutex.Lock() has := allips.Has(randip) allipsmutex.Unlock() if !has { if detectip(randip) { allipsmutex.Lock() detectips = append(detectips, randip) allipsmutex.Unlock() } } } } func detectip(addr string) (ok bool) { pinger, err := probing.NewPinger(addr) if err != nil { return false } pinger.SetPrivileged(false) pinger.Count = 1 pinger.Interval = 200 * time.Millisecond pinger.Size = 32 pinger.Timeout = 1 * time.Second pinger.OnRecv = func(pkt *probing.Packet) { ok = true } pinger.Run() return }