|
@@ -53,6 +53,7 @@ package probing
|
|
|
|
|
|
import (
|
|
|
"errors"
|
|
|
+ "fmt"
|
|
|
"math"
|
|
|
"math/rand"
|
|
|
"net"
|
|
@@ -72,13 +73,11 @@ var receive_buffer_count = mcfg.GetInt("ping.recv.buf.count", 100)
|
|
|
var ttl = mcfg.GetInt("ping.ttl", 64)
|
|
|
var ping_interval = mcfg.GetDuration("ping.interval", 1000*time.Millisecond)
|
|
|
var concurlimit_ping = mcfg.GetInt("concurlimit.ping", 100)
|
|
|
-var concurlimit_send = mcfg.GetInt("concurlimit.send", 10)
|
|
|
var concurchan_ping = make(chan struct{}, concurlimit_ping)
|
|
|
-var concurchan_send = make(chan struct{}, concurlimit_send)
|
|
|
var lastpingtimemutex sync.Mutex
|
|
|
var lastpingtime = map[string]time.Time{}
|
|
|
-var lastsendtimemutex sync.Mutex
|
|
|
-var lastsendtime = map[string]time.Time{}
|
|
|
+
|
|
|
+var ETIMEDOUT error = fmt.Errorf("timeout")
|
|
|
|
|
|
const (
|
|
|
timeSliceLength = 8
|
|
@@ -389,10 +388,38 @@ func (p *Pinger) ID() int {
|
|
|
return p.id
|
|
|
}
|
|
|
|
|
|
+var pingipmtx sync.Mutex
|
|
|
+var pingips = map[string]chan interface{}{}
|
|
|
+var pingads = map[string]chan interface{}{}
|
|
|
+
|
|
|
// Run runs the pinger. This is a blocking function that will exit when it's
|
|
|
// done. If Count or Interval are not specified, it will run continuously until
|
|
|
// it is interrupted.
|
|
|
-func (p *Pinger) Run() error {
|
|
|
+func (p *Pinger) Run() (err error) {
|
|
|
+ // 同一地址,只能有一个实例运行,排队等待
|
|
|
+ pingipmtx.Lock()
|
|
|
+ pingipchan := pingips[p.ipaddr.String()]
|
|
|
+ if pingipchan == nil {
|
|
|
+ pingipchan = make(chan interface{}, 1)
|
|
|
+ pingips[p.ipaddr.String()] = pingipchan
|
|
|
+ }
|
|
|
+ pingadchan := pingads[p.addr]
|
|
|
+ if pingadchan == nil {
|
|
|
+ pingadchan = make(chan interface{}, 1)
|
|
|
+ pingads[p.addr] = pingadchan
|
|
|
+ }
|
|
|
+ pingipmtx.Unlock()
|
|
|
+ pingipchan <- 1
|
|
|
+ pingadchan <- 1
|
|
|
+ var last_send_time time.Time
|
|
|
+ defer func() {
|
|
|
+ d := p.Interval - time.Since(last_send_time)
|
|
|
+ if d > 0 {
|
|
|
+ time.Sleep(d) // 两次运行之间至少间隔
|
|
|
+ }
|
|
|
+ <-pingipchan
|
|
|
+ <-pingadchan
|
|
|
+ }()
|
|
|
// sleeptime := time.Duration(0)
|
|
|
// for sleeptime >= 0 {
|
|
|
// time.Sleep(sleeptime)
|
|
@@ -415,15 +442,16 @@ func (p *Pinger) Run() error {
|
|
|
// defer func() {
|
|
|
// <-concurchan_ping
|
|
|
// }()
|
|
|
- return p.run()
|
|
|
+ last_send_time, err = p.run()
|
|
|
+ return
|
|
|
}
|
|
|
|
|
|
-func (p *Pinger) run() error {
|
|
|
+func (p *Pinger) run() (time.Time, error) {
|
|
|
defer p.finish()
|
|
|
|
|
|
err := MPConn(p.ipv4, p.protocol).Listen()
|
|
|
if err != nil {
|
|
|
- return err
|
|
|
+ return time.Time{}, err
|
|
|
}
|
|
|
|
|
|
if handler := p.OnSetup; handler != nil {
|
|
@@ -431,16 +459,18 @@ func (p *Pinger) run() error {
|
|
|
}
|
|
|
|
|
|
var g errgroup.Group
|
|
|
+ var last_send_time time.Time
|
|
|
|
|
|
- g.Go(func() error {
|
|
|
+ g.Go(func() (err error) {
|
|
|
defer p.Stop()
|
|
|
- return p.runLoop()
|
|
|
+ last_send_time, err = p.runLoop()
|
|
|
+ return err
|
|
|
})
|
|
|
|
|
|
- return g.Wait()
|
|
|
+ return last_send_time, g.Wait()
|
|
|
}
|
|
|
|
|
|
-func (p *Pinger) runLoop() error {
|
|
|
+func (p *Pinger) runLoop() (time.Time, error) {
|
|
|
|
|
|
timeout := time.NewTimer(p.Timeout)
|
|
|
interval := time.NewTimer(0)
|
|
@@ -451,54 +481,13 @@ func (p *Pinger) runLoop() error {
|
|
|
}()
|
|
|
received := make(chan interface{}, 1)
|
|
|
|
|
|
- for {
|
|
|
- select {
|
|
|
- case <-p.done:
|
|
|
- return nil
|
|
|
-
|
|
|
- case <-timeout.C:
|
|
|
- return nil
|
|
|
-
|
|
|
- case <-interval.C:
|
|
|
- if err := p.ping(received); err != nil {
|
|
|
- logger.Errorf("sending packet: %s", err)
|
|
|
- }
|
|
|
- if p.Count > 0 && p.PacketsSent >= p.Count {
|
|
|
- timeout.Reset(p.Timeout)
|
|
|
- } else {
|
|
|
- interval.Reset(p.Interval)
|
|
|
- }
|
|
|
- case <-received:
|
|
|
- if p.Count > 0 && p.PacketsRecv >= p.Count {
|
|
|
- return nil
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func (p *Pinger) ping(received chan interface{}) error {
|
|
|
-
|
|
|
- sleeptime := time.Duration(0)
|
|
|
- for sleeptime >= 0 {
|
|
|
- time.Sleep(sleeptime)
|
|
|
- lastpingtimemutex.Lock()
|
|
|
- alastpingtime := lastpingtime[p.ipaddr.String()]
|
|
|
- sleeptime = ping_interval - time.Since(alastpingtime)
|
|
|
- if sleeptime <= 0 {
|
|
|
- lastpingtime[p.ipaddr.String()] = time.Now()
|
|
|
- } else {
|
|
|
- // logger.Error(fmt.Sprint("ping", p.addr, "[", p.ipaddr.String(), "]", "同一地址至少间隔一秒"))
|
|
|
- }
|
|
|
- lastpingtimemutex.Unlock()
|
|
|
- }
|
|
|
- defer func() {
|
|
|
- lastpingtimemutex.Lock()
|
|
|
- lastpingtime[p.ipaddr.String()] = time.Now()
|
|
|
- lastpingtimemutex.Unlock()
|
|
|
- }()
|
|
|
-
|
|
|
- pinfo := newPingInfo(p.addr, p.ipaddr, p.Size, p.Timeout)
|
|
|
+ last_send_time := time.Now()
|
|
|
+ pinfo := getPingInfo(p.ipaddr)
|
|
|
+ pinfo.host = p.addr
|
|
|
+ pinfo.size = p.Size
|
|
|
+ pinfo.timeout = p.Timeout
|
|
|
pinfo.OnSend = func(pkt *icmp.Packet) {
|
|
|
+ last_send_time = pkt.SendTime
|
|
|
if p.PacketsSent == 0 {
|
|
|
p.sequence_base = pkt.Seq
|
|
|
}
|
|
@@ -507,7 +496,7 @@ func (p *Pinger) ping(received chan interface{}) error {
|
|
|
p.OnSend(&Packet{
|
|
|
Rtt: pkt.Rtt,
|
|
|
IPAddr: pkt.IPAddr,
|
|
|
- Host: pinfo.host,
|
|
|
+ Host: p.addr,
|
|
|
Nbytes: pkt.Nbytes,
|
|
|
Seq: pkt.Seq - p.sequence_base,
|
|
|
TTL: pkt.TTL,
|
|
@@ -519,7 +508,7 @@ func (p *Pinger) ping(received chan interface{}) error {
|
|
|
inpkt := &Packet{
|
|
|
Rtt: pkt.Rtt,
|
|
|
IPAddr: pkt.IPAddr,
|
|
|
- Host: pinfo.host,
|
|
|
+ Host: p.addr,
|
|
|
Nbytes: pkt.Nbytes,
|
|
|
Seq: pkt.Seq - p.sequence_base,
|
|
|
TTL: pkt.TTL,
|
|
@@ -535,7 +524,7 @@ func (p *Pinger) ping(received chan interface{}) error {
|
|
|
inpkt := &Packet{
|
|
|
Rtt: pkt.Rtt,
|
|
|
IPAddr: pkt.IPAddr,
|
|
|
- Host: pinfo.host,
|
|
|
+ Host: p.addr,
|
|
|
Nbytes: pkt.Nbytes,
|
|
|
Seq: pkt.Seq - p.sequence_base,
|
|
|
TTL: pkt.TTL,
|
|
@@ -546,9 +535,55 @@ func (p *Pinger) ping(received chan interface{}) error {
|
|
|
}
|
|
|
p.updateStatistics(inpkt)
|
|
|
}
|
|
|
- return MPConn(p.ipv4, p.protocol).Ping(pinfo)
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-p.done:
|
|
|
+ return last_send_time, nil
|
|
|
+
|
|
|
+ case <-timeout.C:
|
|
|
+ return last_send_time, nil
|
|
|
+
|
|
|
+ case <-interval.C:
|
|
|
+ if p.Count > 0 && p.PacketsSent >= p.Count {
|
|
|
+ timeout.Reset(p.Timeout)
|
|
|
+ } else {
|
|
|
+ if err := MPConn(p.ipv4, p.protocol).Ping(pinfo); err != nil {
|
|
|
+ logger.Errorf("sending packet: %s", err)
|
|
|
+ }
|
|
|
+ interval.Reset(p.Interval)
|
|
|
+ }
|
|
|
+ case <-received:
|
|
|
+ if p.Count > 0 && p.PacketsRecv >= p.Count {
|
|
|
+ return last_send_time, nil
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+// func (p *Pinger) ping(pinfo *mpinfo, received chan interface{}) error {
|
|
|
+
|
|
|
+// sleeptime := time.Duration(0)
|
|
|
+// for sleeptime >= 0 {
|
|
|
+// time.Sleep(sleeptime)
|
|
|
+// lastpingtimemutex.Lock()
|
|
|
+// alastpingtime := lastpingtime[p.ipaddr.String()]
|
|
|
+// sleeptime = ping_interval - time.Since(alastpingtime)
|
|
|
+// if sleeptime <= 0 {
|
|
|
+// lastpingtime[p.ipaddr.String()] = time.Now()
|
|
|
+// } else {
|
|
|
+// // logger.Error(fmt.Sprint("ping", p.addr, "[", p.ipaddr.String(), "]", "同一地址至少间隔一秒"))
|
|
|
+// }
|
|
|
+// lastpingtimemutex.Unlock()
|
|
|
+// }
|
|
|
+// defer func() {
|
|
|
+// lastpingtimemutex.Lock()
|
|
|
+// lastpingtime[p.ipaddr.String()] = time.Now()
|
|
|
+// lastpingtimemutex.Unlock()
|
|
|
+// }()
|
|
|
+
|
|
|
+// return MPConn(p.ipv4, p.protocol).Ping(pinfo)
|
|
|
+// }
|
|
|
+
|
|
|
func (p *Pinger) Stop() {
|
|
|
p.lock.Lock()
|
|
|
defer p.lock.Unlock()
|