ping.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "math/rand"
  6. "os"
  7. "runtime"
  8. "sort"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "trial/ping/probing"
  14. "trial/ping/utils"
  15. "git.wecise.com/wecise/common/logger"
  16. "git.wecise.com/wecise/common/matrix/cfg"
  17. "git.wecise.com/wecise/common/matrix/util"
  18. "github.com/scylladb/go-set/strset"
  19. )
  // task describes a single ping job that is queued on Input.workChan and
  // executed by a worker in Input.send. The JSON tags define its wire names.
  type task struct {
  	// Server is the target host; it is passed to probing.NewPinger.
  	Server string `json:"server"`
  	// Timeout is the overall ping timeout in seconds. When it is not positive,
  	// send falls back to one second per packet.
  	Timeout int `json:"timeout"`
  	// NumberOfPings is the number of echo requests; send uses 1 when it is not positive.
  	NumberOfPings int `json:"numberofpings"`
  	// PacketInterval is the delay between packets in milliseconds.
  	PacketInterval int `json:"packetinterval"`
  	// PacketSize is the packet size handed to the pinger when positive.
  	PacketSize int `json:"packetsize"`
  	// TypeOfService is not used.
  	TypeOfService int `json:"typeofservice"`
  	// Retries is not used.
  	Retries int `json:"retries"`
  	// Poll is reported as "pollInterval" in the result record.
  	Poll int `json:"poll"`
  	// FailureRetests is not used.
  	FailureRetests int `json:"failureretests"`
  	// RetestInterval is not used.
  	RetestInterval int `json:"retestinterval"`
  	// HostNameLookupPreference is not used.
  	HostNameLookupPreference string `json:"hostnamelookuppreference"`
  	// Description is not used.
  	Description string `json:"description"`
  	// Rule is carried with the task; nothing visible in this file reads it.
  	Rule string `json:"rule"`
  	// PerfRule is carried with the task; nothing visible in this file reads it.
  	PerfRule string `json:"perfrule"`
  	// TaskTime is reported as "taskTime" in the result record.
  	TaskTime int64 `json:"tasktime"`
  	// Indicator is reported as "indicator" in the result record.
  	Indicator string `json:"indicator"`
  }
  // InputConfig holds the tunables of an Input. The TOML tags give the
  // configuration key of each field.
  type InputConfig struct {
  	// Poolsize is both the number of worker goroutines started by Run and
  	// the buffer size of the work channel created by Init.
  	Poolsize int `toml:"poolsize"`
  	// Domain prefixes the "previous_key" attribute of every result message.
  	Domain string `toml:"domain"`
  	// StatInterval is forwarded to utils.NewDataStat by Run.
  	StatInterval int `toml:"stat_interval"`
  }
  // StatInfo accumulates the ping results of one target across all runs.
  type StatInfo struct {
  	// MinRtt is the smallest non-zero round-trip time seen so far.
  	MinRtt time.Duration
  	// MaxRtt is the largest round-trip time seen so far.
  	MaxRtt time.Duration
  	// AvgRtt is the average round-trip time of the most recent run.
  	AvgRtt time.Duration
  	// LossCount is the running total of packets sent but not answered.
  	LossCount int
  	// Count is the running total of packets sent.
  	Count int
  }
  50. type Input struct {
  51. *InputConfig
  52. ips []string
  53. statinfomutex sync.Mutex
  54. statinfo map[string]*StatInfo
  55. ipaddrs map[string]string
  56. subSize int
  57. workChan chan *task
  58. stopChan chan bool
  59. ds *utils.DataStat
  60. }
  61. var mcfg = cfg.MConfig()
  62. var allipsmutex sync.Mutex
  63. var allips = strset.New()
  64. func main() {
  65. input := &Input{}
  66. input.statinfo = map[string]*StatInfo{}
  67. input.ipaddrs = map[string]string{}
  68. inputcfg := &InputConfig{
  69. Poolsize: 1000,
  70. StatInterval: 600,
  71. }
  72. inputcfg.Poolsize = mcfg.GetInt("poolsize", inputcfg.Poolsize)
  73. if detect_interval := mcfg.GetDuration("detect", 0); detect_interval != 0 {
  74. go detect(detect_interval)
  75. }
  76. fips := func() []string {
  77. xips := mcfg.GetStrings("ip|ping.ip", "")
  78. allipsmutex.Lock()
  79. for _, aips := range xips {
  80. allips.Add(strings.Split(aips, ",")...)
  81. }
  82. bs, _ := util.ReadFile("./hosts.txt")
  83. if len(bs) > 0 {
  84. xips := strings.Split(string(bs), "\n")
  85. for _, aips := range xips {
  86. allips.Add(strings.Split(aips, ",")...)
  87. }
  88. }
  89. if len(detectips) > 0 {
  90. util.WriteFile("./hosts.txt", []byte(string(bs)+"\n"+strings.Join(detectips, ",")), true)
  91. detectips = detectips[:0]
  92. }
  93. sips := allips.List()
  94. allipsmutex.Unlock()
  95. return sips
  96. }
  97. go func() {
  98. t := time.NewTicker(5 * time.Second)
  99. for {
  100. <-t.C
  101. input.ips = fips()
  102. }
  103. }()
  104. input.ips = fips()
  105. mcfg.OnChanged(func(cfg cfg.Configure) {
  106. input.ips = fips()
  107. })
  108. input.Init(inputcfg)
  109. go input.Run()
  110. go func() {
  111. last_count := 0
  112. t := time.NewTicker(1 * time.Second)
  113. for {
  114. select {
  115. case <-t.C:
  116. s := ""
  117. input.statinfomutex.Lock()
  118. ks := []string{}
  119. for k := range input.statinfo {
  120. ks = append(ks, k)
  121. }
  122. sort.Strings(ks)
  123. for i, k := range ks {
  124. v := input.statinfo[k]
  125. ip := input.ipaddrs[k]
  126. s += fmt.Sprintf("%-3d %-20s %15s : %-12s [%12s ~ %-12s] loss %d/%d\n", i, k, ip, v.AvgRtt, v.MinRtt, v.MaxRtt, v.LossCount, v.Count)
  127. }
  128. input.statinfomutex.Unlock()
  129. logger.Info("统计信息更新:", fmt.Sprint("\n", s))
  130. logger.Info(util.FormatDuration(time.Since(starttime)), "已经完成", pingcount, "次Ping操作,",
  131. "平均每秒", (int64(pingcount+1) * int64(time.Second) / int64(time.Since(starttime))), "次",
  132. "最近一秒", (pingcount - int32(last_count)), "次",
  133. )
  134. last_count = int(pingcount)
  135. }
  136. }
  137. }()
  138. n := 0
  139. for {
  140. if n >= len(input.ips) {
  141. n = 0
  142. input.ips = append(input.ips, detectips...)
  143. }
  144. ip := input.ips[n]
  145. n++
  146. //
  147. ip = strings.TrimSpace(ip)
  148. if ip != "" {
  149. input.workChan <- &task{
  150. Server: ip,
  151. Timeout: mcfg.GetInt("timeout", 2),
  152. NumberOfPings: mcfg.GetInt("count", 5), // 至少为2,小于2会导致不能正常结束
  153. PacketInterval: mcfg.GetInt("interval", 1000),
  154. PacketSize: mcfg.GetInt("size", 32),
  155. }
  156. }
  157. // time.Sleep(1 * time.Millisecond)
  158. }
  159. }
  160. func (input *Input) Init(config interface{}) error {
  161. input.InputConfig = config.(*InputConfig)
  162. input.workChan = make(chan *task, input.Poolsize)
  163. input.stopChan = make(chan bool)
  164. input.subSize = runtime.GOMAXPROCS(0)
  165. return nil
  166. }
  167. func (input *Input) Run() (err error) {
  168. input.ds = utils.NewDataStat("service", "", input.StatInterval, input.Poolsize*2)
  169. defer input.ds.Close()
  170. go input.ds.Start()
  171. defer close(input.workChan)
  172. var wg sync.WaitGroup
  173. // Start worker
  174. // count := int32(0)
  175. // lpcmutex := sync.Mutex{}
  176. // lastprintcount := time.Now()
  177. for i := 0; i < input.Poolsize; i++ {
  178. go func(n int) {
  179. for t := range input.workChan {
  180. wg.Add(1)
  181. // Run task
  182. if e := input.send(t, n); e != nil {
  183. logger.Error("Send error:", e)
  184. input.ds.Send(1)
  185. } else {
  186. input.ds.Send(0)
  187. }
  188. wg.Done()
  189. // x := atomic.AddInt32(&count, 1)
  190. // lpcmutex.Lock()
  191. // printmsg := time.Since(lastprintcount) >= 1*time.Second
  192. // if printmsg {
  193. // lastprintcount = time.Now()
  194. // }
  195. // lpcmutex.Unlock()
  196. // if printmsg {
  197. // logger.Info("已经完成", x, "个目标的Ping操作")
  198. // }
  199. }
  200. }(i)
  201. }
  202. <-input.stopChan
  203. wg.Wait()
  204. return nil
  205. }
  // Process-wide ping counters and timestamps.
  var (
  	// pingcount is the number of echo replies received so far; workers
  	// update it with atomic.AddInt32.
  	pingcount int32
  	// starttime is when the process initialised its package variables.
  	starttime = time.Now()
  	// printpingcountmutex and printpingcounttime are not referenced by any
  	// code visible in this file.
  	printpingcountmutex sync.Mutex
  	printpingcounttime  = time.Now()
  )
  210. func (input *Input) send(t *task, workerNum int) error {
  211. hostname, _ := os.Hostname()
  212. var (
  213. status = "success"
  214. message = "Pings Complete"
  215. m = map[string]interface{}{
  216. "service": "icmp",
  217. "monitorHost": hostname,
  218. "host": t.Server,
  219. "pollInterval": t.Poll,
  220. "taskTime": t.TaskTime,
  221. "indicator": t.Indicator,
  222. }
  223. )
  224. pinger, err := probing.NewPinger(t.Server)
  225. if err != nil {
  226. status = "failure"
  227. message = err.Error()
  228. sips := strset.New(input.ips...)
  229. sips.Remove(t.Server)
  230. input.ips = sips.List()
  231. logger.Error(err)
  232. } else {
  233. /*
  234. https://stackoverflow.com/questions/41423637/go-ping-library-for-unprivileged-icmp-ping-in-golang/41425527#41425527
  235. This library attempts to send an "unprivileged" ping via UDP. On linux, this must be enabled by setting
  236. sudo sysctl -w net.ipv4.ping_group_range="0 2147483647"
  237. If you do not wish to do this, you can set pinger.SetPrivileged(true) and use setcap to allow your binary using go-ping to bind to raw sockets (or just run as super-user):
  238. setcap cap_net_raw=+ep /bin/goping-binary
  239. getcap /bin/goping-binary to validate
  240. */
  241. pinger.SetPrivileged(false)
  242. pinger.TTL = 64
  243. if t.NumberOfPings > 0 {
  244. pinger.Count = t.NumberOfPings
  245. } else {
  246. pinger.Count = 1
  247. }
  248. if t.PacketInterval > 0 {
  249. pinger.Interval = time.Millisecond * time.Duration(t.PacketInterval)
  250. }
  251. if t.PacketSize > 0 {
  252. pinger.Size = t.PacketSize
  253. }
  254. if t.Timeout > 0 {
  255. pinger.Timeout = time.Second * time.Duration(t.Timeout)
  256. } else {
  257. pinger.Timeout = time.Second * time.Duration(pinger.Count)
  258. }
  259. var (
  260. consecutiveFailures int
  261. )
  262. var recvMutex sync.Mutex
  263. var recvList = map[int]int{}
  264. pinger.OnSend = func(pkt *probing.Packet) {
  265. recvMutex.Lock()
  266. if recvList[pkt.Seq] != 0 {
  267. println("并发控制有问题")
  268. }
  269. recvList[pkt.Seq]++
  270. recvMutex.Unlock()
  271. input.statinfomutex.Lock()
  272. si := input.statinfo[t.Server]
  273. if si == nil {
  274. si = &StatInfo{}
  275. input.statinfo[t.Server] = si
  276. }
  277. input.ipaddrs[t.Server] = pkt.IPAddr.String()
  278. si.Count++
  279. input.statinfomutex.Unlock()
  280. }
  281. pingMsg := fmt.Sprintf("\nWoker %d PING %s (%s):\n", workerNum, pinger.Addr(), pinger.IPAddr())
  282. pinger.OnRecv = func(pkt *probing.Packet) {
  283. recvMutex.Lock()
  284. recvList[pkt.Seq]++
  285. recvMutex.Unlock()
  286. s := fmt.Sprintf("%s %d bytes from %s: icmp_seq=%d time=%v\n",
  287. time.Now().Format("15:04:05.000"), pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt)
  288. pingMsg += s
  289. // fmt.Print(s)
  290. // printpingcount
  291. atomic.AddInt32(&pingcount, 1)
  292. }
  293. pinger.OnDuplicateRecv = func(pkt *probing.Packet) {
  294. recvMutex.Lock()
  295. recvList[pkt.Seq]++
  296. recvMutex.Unlock()
  297. s := fmt.Sprintf("%s %d bytes from %s: icmp_seq=%d time=%v (DUP!)\n",
  298. time.Now().Format("15:04:05.000"), pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt)
  299. pingMsg += s
  300. // fmt.Print(s)
  301. }
  302. pinger.OnFinish = func(stats *probing.Statistics) {
  303. m["numberPackets"] = stats.PacketsSent
  304. m["averageRTT"] = stats.AvgRtt.Milliseconds()
  305. if stats.PacketsSent == 0 {
  306. m["respondPercent"] = 1
  307. } else {
  308. m["respondPercent"] = stats.PacketsRecv / stats.PacketsSent * 100
  309. }
  310. //m["failureRetests"] = ?
  311. s := fmt.Sprintf("--- %s ping statistics ---\n", stats.Addr)
  312. s += fmt.Sprintf("%d packets transmitted, %d packets received, %v%% packet loss\n",
  313. stats.PacketsSent, stats.PacketsRecv, stats.PacketLoss)
  314. s += fmt.Sprintf("round-trip min/avg/max/stddev = %v/%v/%v/%v\n",
  315. stats.MinRtt, stats.AvgRtt, stats.MaxRtt, stats.StdDevRtt)
  316. pingMsg += s
  317. // fmt.Print(s)
  318. input.statinfomutex.Lock()
  319. si := input.statinfo[t.Server]
  320. if si == nil {
  321. si = &StatInfo{
  322. MinRtt: stats.MinRtt,
  323. MaxRtt: stats.MaxRtt,
  324. AvgRtt: stats.AvgRtt,
  325. LossCount: 0,
  326. Count: 0,
  327. }
  328. input.statinfo[t.Server] = si
  329. input.ipaddrs[t.Server] = stats.IPAddr.String()
  330. }
  331. if stats.MinRtt > 0 && (si.MinRtt == 0 || stats.MinRtt < si.MinRtt) {
  332. si.MinRtt = stats.MinRtt
  333. }
  334. if stats.MaxRtt > si.MaxRtt {
  335. si.MaxRtt = stats.MaxRtt
  336. }
  337. si.AvgRtt = stats.AvgRtt
  338. si.LossCount += stats.PacketsSent - stats.PacketsRecv
  339. input.statinfomutex.Unlock()
  340. }
  341. m["requestTime"] = time.Now().UnixNano() / int64(time.Millisecond)
  342. if err = pinger.Run(); err != nil {
  343. status = "failure"
  344. message = err.Error()
  345. logger.Errorf("Ping error: %v", err)
  346. } else {
  347. }
  348. m["responseTime"] = time.Now().UnixNano() / int64(time.Millisecond)
  349. var failCount, totalFailCount int
  350. recvMutex.Lock()
  351. for i := range recvList {
  352. if recvList[i] < 2 {
  353. failCount++
  354. message = "Packet loss"
  355. totalFailCount++
  356. } else {
  357. failCount = 0
  358. }
  359. if failCount > consecutiveFailures {
  360. consecutiveFailures = failCount
  361. }
  362. pingMsg += fmt.Sprintf("icmp_seq:%d %t\n", i, (recvList[i] >= 2))
  363. }
  364. // logger.Debug(pingMsg)
  365. m["consecutiveFailures"] = consecutiveFailures
  366. if totalFailCount == len(recvList) {
  367. message = "ICMP echo failed"
  368. }
  369. recvMutex.Unlock()
  370. }
  371. // Special
  372. m["returnStatus"] = status
  373. m["message"] = message
  374. b, err := json.Marshal(m)
  375. if err != nil {
  376. logger.Fatal(err)
  377. }
  378. protocolMsg := map[string]interface{}{
  379. "protocol": true,
  380. "output": b,
  381. "attr": map[string]string{"previous_key": input.Domain + "_" + t.Server},
  382. }
  383. b, err = json.Marshal(protocolMsg)
  384. if err != nil {
  385. logger.Fatal(err)
  386. }
  387. return nil
  388. }
  // detectips buffers addresses found by detect until they are flushed to
  // ./hosts.txt; writers append to it while holding allipsmutex.
  var detectips = make([]string, 0)
  390. func detect(detect_interval time.Duration) {
  391. t := time.NewTicker(detect_interval)
  392. for {
  393. <-t.C
  394. randip := fmt.Sprintf("%d.%d.%d.%d", 1+rand.Intn(254), 1+rand.Intn(254), 1+rand.Intn(254), 1+rand.Intn(254))
  395. allipsmutex.Lock()
  396. has := allips.Has(randip)
  397. allipsmutex.Unlock()
  398. if !has {
  399. if detectip(randip) {
  400. allipsmutex.Lock()
  401. detectips = append(detectips, randip)
  402. allipsmutex.Unlock()
  403. }
  404. }
  405. }
  406. }
  407. func detectip(addr string) (ok bool) {
  408. pinger, err := probing.NewPinger(addr)
  409. if err != nil {
  410. return false
  411. }
  412. pinger.SetPrivileged(false)
  413. pinger.TTL = 64
  414. pinger.Count = 1
  415. pinger.Interval = 200 * time.Millisecond
  416. pinger.Size = 32
  417. pinger.Timeout = 1 * time.Second
  418. pinger.OnRecv = func(pkt *probing.Packet) {
  419. ok = true
  420. }
  421. pinger.Run()
  422. return
  423. }