libf 2 年之前
父節點
當前提交
ffe3007ab8
共有 6 個文件被更改,包括 922 次插入100 次删除
  1. 383 1
      ping/hosts.last.txt
  2. 383 1
      ping/hosts.txt
  3. 12 18
      ping/ping.go
  4. 52 34
      ping/probing/icmp/packetconn.go
  5. 27 15
      ping/probing/mpconn.go
  6. 65 31
      ping/probing/ping.go

File diff suppressed because it is too large
+ 383 - 1
ping/hosts.last.txt


File diff suppressed because it is too large
+ 383 - 1
ping/hosts.txt


+ 12 - 18
ping/ping.go

@@ -147,9 +147,11 @@ func main() {
 				}
 				sort.Strings(ks)
 				for i, k := range ks {
-					v := input.statinfo[k]
-					ip := input.ipaddrs[k]
-					s += fmt.Sprintf("%-3d %-20s %15s : %-12s [%12s ~ %-12s] loss %d/%d\n", i, k, ip, v.AvgRtt, v.MinRtt, v.MaxRtt, v.LossCount, v.Count)
+					if i > len(ks)-200 {
+						v := input.statinfo[k]
+						ip := input.ipaddrs[k]
+						s += fmt.Sprintf("%-3d %-20s %15s : %-12s [%12s ~ %-12s] loss %d/%d\n", i, k, ip, v.AvgRtt, v.MinRtt, v.MaxRtt, v.LossCount, v.Count)
+					}
 				}
 				input.statinfomutex.Unlock()
 				logger.Info("统计信息更新:", fmt.Sprint("\n", s))
@@ -157,6 +159,7 @@ func main() {
 					"平均每秒", (int64(pingcount+1) * int64(time.Second) / int64(time.Since(starttime))), "次",
 					"最近一秒", (pingcount - int32(last_count)), "次",
 					"最大缓冲", icmp.MaxReceiveBufferUsed(),
+					"并发失控", problem,
 				)
 				last_count = int(pingcount)
 			}
@@ -248,6 +251,7 @@ func (input *Input) Run() (err error) {
 }
 
 var pingcount int32
+var problem int32
 var starttime = time.Now()
 var printpingcountmutex sync.Mutex
 var printpingcounttime = time.Now()
@@ -304,16 +308,12 @@ func (input *Input) send(t *task, workerNum int) error {
 		var (
 			consecutiveFailures int
 		)
-		var recvMutex sync.Mutex
-		var recvList = map[int]int{}
+		var recvList = make([]int32, pinger.Count)
 
 		pinger.OnSend = func(pkt *probing.Packet) {
-			recvMutex.Lock()
-			if recvList[pkt.Seq] != 0 {
-				println("并发控制有问题")
+			if atomic.AddInt32(&recvList[pkt.Seq], 1) != 1 {
+				atomic.AddInt32(&problem, 1)
 			}
-			recvList[pkt.Seq]++
-			recvMutex.Unlock()
 			input.statinfomutex.Lock()
 			si := input.statinfo[t.Server]
 			if si == nil {
@@ -326,9 +326,7 @@ func (input *Input) send(t *task, workerNum int) error {
 		}
 		pingMsg := fmt.Sprintf("\nWoker %d PING %s (%s):\n", workerNum, pinger.Addr(), pinger.IPAddr())
 		pinger.OnRecv = func(pkt *probing.Packet) {
-			recvMutex.Lock()
-			recvList[pkt.Seq]++
-			recvMutex.Unlock()
+			atomic.AddInt32(&recvList[pkt.Seq], 1)
 			s := fmt.Sprintf("%s %d bytes from %s: icmp_seq=%d time=%v\n",
 				time.Now().Format("15:04:05.000"), pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt)
 			pingMsg += s
@@ -337,9 +335,7 @@ func (input *Input) send(t *task, workerNum int) error {
 			atomic.AddInt32(&pingcount, 1)
 		}
 		pinger.OnDuplicateRecv = func(pkt *probing.Packet) {
-			recvMutex.Lock()
-			recvList[pkt.Seq]++
-			recvMutex.Unlock()
+			atomic.AddInt32(&recvList[pkt.Seq], 1)
 			s := fmt.Sprintf("%s %d bytes from %s: icmp_seq=%d time=%v (DUP!)\n",
 				time.Now().Format("15:04:05.000"), pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt)
 			pingMsg += s
@@ -396,7 +392,6 @@ func (input *Input) send(t *task, workerNum int) error {
 		m["responseTime"] = time.Now().UnixNano() / int64(time.Millisecond)
 
 		var failCount, totalFailCount int
-		recvMutex.Lock()
 		for i := range recvList {
 			if recvList[i] < 2 {
 				failCount++
@@ -415,7 +410,6 @@ func (input *Input) send(t *task, workerNum int) error {
 		if totalFailCount == len(recvList) {
 			message = "ICMP echo failed"
 		}
-		recvMutex.Unlock()
 	}
 
 	// Special

+ 52 - 34
ping/probing/icmp/packetconn.go

@@ -38,19 +38,6 @@ var (
 	ipv6Proto = map[string]string{"icmp": "ip6:ipv6-icmp", "udp": "udp6"}
 )
 
-func listen(ipv4 bool, protocol string, source string) (conn PacketConn, err error) {
-	if ipv4 {
-		var c icmpv4Conn
-		c.c, err = icmp.ListenPacket(ipv4Proto[protocol], source)
-		conn = &c
-	} else {
-		var c icmpV6Conn
-		c.c, err = icmp.ListenPacket(ipv6Proto[protocol], source)
-		conn = &c
-	}
-	return
-}
-
 // Packet represents a received and processed ICMP echo packet.
 type Packet struct {
 	// IPAddr is the address of the host being pinged.
@@ -69,6 +56,8 @@ type Packet struct {
 	SendTime time.Time
 	// Rtt is the round-trip time it took to ping.
 	Rtt time.Duration
+	//
+	TimeoutTimer *time.Timer
 }
 
 type recvPkt struct {
@@ -86,7 +75,7 @@ type MPacketConn struct {
 	Backlog      int
 	TTL          int
 	OnRecvPacket func(pkt *Packet)
-	OnRecvError  func(error)
+	OnError      func(error)
 	mutex        sync.Mutex
 	conn         PacketConn
 	done         chan interface{}
@@ -101,16 +90,11 @@ func (mp *MPacketConn) Listen() error {
 		return nil
 	}
 
-	conn, err := listen(mp.IPV4, mp.Protocol, mp.Source)
+	conn, err := mp.listen()
 	if err != nil {
 		return err
 	}
 
-	conn.SetTTL(mp.TTL)
-	if err := conn.SetFlagTTL(); err != nil {
-		return err
-	}
-
 	mp.done = make(chan interface{})
 	mp.recvbuf = make(chan *recvPkt, mp.Backlog)
 	mp.conn = conn
@@ -120,6 +104,27 @@ func (mp *MPacketConn) Listen() error {
 	return nil
 }
 
+func (mp *MPacketConn) listen() (conn PacketConn, err error) {
+	if mp.IPV4 {
+		var c icmpv4Conn
+		c.c, err = icmp.ListenPacket(ipv4Proto[mp.Protocol], mp.Source)
+		conn = &c
+	} else {
+		var c icmpV6Conn
+		c.c, err = icmp.ListenPacket(ipv6Proto[mp.Protocol], mp.Source)
+		conn = &c
+	}
+	if err != nil {
+		return nil, err
+	}
+	conn.SetTTL(mp.TTL)
+	if err := conn.SetFlagTTL(); err != nil {
+		conn.Close()
+		return nil, err
+	}
+	return conn, nil
+}
+
 func (mp *MPacketConn) Close() error {
 	mp.mutex.Lock()
 	defer mp.mutex.Unlock()
@@ -133,7 +138,13 @@ func (mp *MPacketConn) Close() error {
 	if open {
 		close(mp.done)
 	}
-	return mp.conn.Close()
+
+	if mp.conn != nil {
+		mp.conn.Close()
+		mp.conn = nil
+	}
+
+	return nil
 }
 
 func (mp *MPacketConn) recvICMP() {
@@ -143,10 +154,14 @@ func (mp *MPacketConn) recvICMP() {
 		case <-mp.done:
 			return
 		default:
+			conn := mp.conn
+			if conn == nil {
+				return
+			}
 			var n, ttl int
 			var addr net.Addr
 			var err error
-			n, ttl, addr, err = mp.conn.ReadFrom(bytes)
+			n, ttl, addr, err = conn.ReadFrom(bytes)
 			if err != nil {
 				if neterr, ok := err.(*net.OpError); ok {
 					if neterr.Timeout() {
@@ -154,10 +169,10 @@ func (mp *MPacketConn) recvICMP() {
 						continue
 					}
 				}
-				if mp.OnRecvError != nil {
-					mp.OnRecvError(err)
+				if mp.OnError != nil {
+					mp.OnError(err)
 				} else {
-					fmt.Println(err)
+					fmt.Println("ReadFrom Error:", err)
 				}
 			}
 			bs := make([]byte, n)
@@ -172,7 +187,8 @@ func (mp *MPacketConn) recvICMP() {
 }
 
 func (mp *MPacketConn) SendPacket(pkt *Packet) error {
-	if mp.conn == nil {
+	conn := mp.conn
+	if conn == nil {
 		return ENOLISTENER
 	}
 	var dst net.Addr = pkt.IPAddr
@@ -185,17 +201,18 @@ func (mp *MPacketConn) SendPacket(pkt *Packet) error {
 			return nil
 		default:
 		}
-		msgBytes, err := pkt.BuildEchoRequestMessage(mp.conn.ICMPRequestType())
+		msgBytes, err := pkt.BuildEchoRequestMessage(conn.ICMPRequestType())
 		if err != nil {
 			return err
 		}
-		if _, err := mp.conn.WriteTo(msgBytes, dst); err != nil {
+		if _, err := conn.WriteTo(msgBytes, dst); err != nil {
 			if neterr, ok := err.(*net.OpError); ok {
 				if neterr.Err == syscall.ENOBUFS {
-					if mp.OnRecvError != nil {
-						mp.OnRecvError(neterr.Err)
+					if mp.OnError != nil {
+						mp.OnError(neterr.Err)
 					} else {
-						fmt.Println("缓存不够,发送失败,重发")
+						// 运行时默认忽略缓存不够错误
+						// fmt.Println("缓存不够,发送失败,重发")
 					}
 					continue
 				}
@@ -220,10 +237,11 @@ func (mp *MPacketConn) processRecvPacket() {
 		}
 		err := mp.processPacket(pkt)
 		if err != nil {
-			if mp.OnRecvError != nil {
-				mp.OnRecvError(err)
+			if mp.OnError != nil {
+				mp.OnError(err)
 			} else {
-				fmt.Println(err)
+				// 运行时默认忽略接收数据格式不符错误
+				// fmt.Println(err)
 			}
 		}
 	}

+ 27 - 15
ping/probing/mpconn.go

@@ -39,17 +39,17 @@ type mpingconn struct {
 }
 
 type mpinfo struct {
-	host        string
-	ipaddr      *net.IPAddr
-	id          int
-	lastseq     int
-	size        int
-	timeout     time.Duration
-	seqpkt      map[int]*icmp.Packet
-	OnSend      func(*icmp.Packet)
-	OnRecv      func(*icmp.Packet)
-	OnRecvDup   func(*icmp.Packet)
-	OnRecvError func(error)
+	host          string
+	ipaddr        *net.IPAddr
+	id            int
+	lastseq       int
+	size          int
+	timeout       time.Duration
+	seqpkt        map[int]*icmp.Packet
+	OnSend        func(outpkt *icmp.Packet)
+	OnRecv        func(inpkt *icmp.Packet)
+	OnRecvDup     func(inpkt *icmp.Packet)
+	OnRecvTimeout func(outpkt *icmp.Packet)
 }
 
 var pingid int32
@@ -84,7 +84,7 @@ func newMPConn(ipv4 bool, protocol string) *mpingconn {
 		pingidinfo: make(map[int]*mpinfo),
 	}
 	mpconn.MPacketConn.OnRecvPacket = mpconn.OnRecvPacket
-	mpconn.MPacketConn.OnRecvError = mpconn.OnRecvError
+	mpconn.MPacketConn.OnError = mpconn.OnError
 	return mpconn
 }
 
@@ -117,9 +117,12 @@ func (p *mpingconn) OnRecvPacket(recvpkt *icmp.Packet) {
 	if !bytes.Equal(p.uuid[:], recvpkt.UUID[:]) {
 		return
 	}
-	_, inflight := pinfo.seqpkt[recvpkt.Seq]
+	outpkt, inflight := pinfo.seqpkt[recvpkt.Seq]
 	if inflight {
 		// remove it from the list of sequences we're waiting for so we don't get duplicates.
+		if outpkt.TimeoutTimer != nil {
+			outpkt.TimeoutTimer.Stop()
+		}
 		delete(pinfo.seqpkt, recvpkt.Seq)
 		if pinfo.OnRecv != nil {
 			go pinfo.OnRecv(recvpkt)
@@ -131,7 +134,7 @@ func (p *mpingconn) OnRecvPacket(recvpkt *icmp.Packet) {
 	}
 }
 
-func (p *mpingconn) OnRecvError(err error) {
+func (p *mpingconn) OnError(err error) {
 	logger.Error(err)
 }
 
@@ -159,7 +162,16 @@ func (p *mpingconn) Ping(pinfo *mpinfo) error {
 	if pinfo.OnSend != nil {
 		pinfo.OnSend(outpkt)
 	}
-
+	if pinfo.OnRecvTimeout != nil {
+		outpkt.TimeoutTimer = time.AfterFunc(pinfo.timeout, func() {
+			p.mutex.Lock()
+			outpkt := pinfo.seqpkt[seq]
+			p.mutex.Unlock()
+			if outpkt != nil {
+				pinfo.OnRecvTimeout(outpkt)
+			}
+		})
+	}
 	// fmt.Println("sent", outpkt)
 
 	return nil

+ 65 - 31
ping/probing/ping.go

@@ -53,7 +53,6 @@ package probing
 
 import (
 	"errors"
-	"fmt"
 	"math"
 	"math/rand"
 	"net"
@@ -62,24 +61,24 @@ import (
 	"time"
 	"trial/ping/probing/icmp"
 
-	"github.com/google/uuid"
 	"golang.org/x/sync/errgroup"
 )
 
-var receive_buffer_count = mcfg.GetInt("ping.recv.buf.count", 10)
+var receive_buffer_count = mcfg.GetInt("ping.recv.buf.count", 500)
 var ping_ttl = mcfg.GetInt("ping.ttl", 64)
-var ping_interval = mcfg.GetDuration("ping.interval", 1000*time.Millisecond)
-var concurlimit_ping = mcfg.GetInt("concurlimit.ping", 100)
-var concurchan_ping = make(chan struct{}, concurlimit_ping)
-var lastpingtimemutex sync.Mutex
-var lastpingtime = map[string]time.Time{}
 
-var ETIMEDOUT error = fmt.Errorf("timeout")
+// var ping_interval = mcfg.GetDuration("ping.interval", 1000*time.Millisecond)
+// var concurlimit_ping = mcfg.GetInt("concurlimit.ping", 100)
+// var concurchan_ping = make(chan struct{}, concurlimit_ping)
+// var lastpingtimemutex sync.Mutex
+// var lastpingtime = map[string]time.Time{}
 
-const (
-	timeSliceLength = 8
-	trackerLength   = len(uuid.UUID{})
-)
+// var ETIMEDOUT error = fmt.Errorf("timeout")
+
+// const (
+// 	timeSliceLength = 8
+// 	trackerLength   = len(uuid.UUID{})
+// )
 
 // New returns a new Pinger struct pointer.
 func New(addr string) *Pinger {
@@ -88,7 +87,7 @@ func New(addr string) *Pinger {
 		Count:      -1,
 		Interval:   time.Second,
 		RecordRtts: true,
-		Size:       timeSliceLength + trackerLength,
+		Size:       64, // timeSliceLength + trackerLength,
 		Timeout:    time.Duration(math.MaxInt64),
 
 		addr:     addr,
@@ -247,7 +246,8 @@ func (p *Pinger) updateStatistics(pkt *Packet) {
 
 	p.PacketsRecv++
 	if p.RecordRtts {
-		p.rtts = append(p.rtts, pkt.Rtt)
+		// p.rtts = append(p.rtts, pkt.Rtt)
+		p.rtts[pkt.Seq] = pkt.Rtt
 	}
 
 	if p.PacketsRecv == 1 || pkt.Rtt < p.minRtt {
@@ -425,7 +425,20 @@ func (p *Pinger) Run() (err error) {
 	return
 }
 
+func (p *Pinger) init() *mpinfo {
+	pinfo := getPingInfo(p.ipaddr)
+	pinfo.host = p.addr
+	pinfo.size = p.Size
+	pinfo.timeout = p.Timeout
+	p.sequence_base = pinfo.lastseq
+	if p.RecordRtts {
+		p.rtts = make([]time.Duration, p.Count)
+	}
+	return pinfo
+}
+
 func (p *Pinger) run() (time.Time, error) {
+	pinfo := p.init()
 	defer p.finish()
 
 	err := MPConn(p.ipv4, p.protocol).Listen()
@@ -442,15 +455,14 @@ func (p *Pinger) run() (time.Time, error) {
 
 	g.Go(func() (err error) {
 		defer p.Stop()
-		last_send_time, err = p.runLoop()
+		last_send_time, err = p.runLoop(pinfo)
 		return err
 	})
 
 	return last_send_time, g.Wait()
 }
 
-func (p *Pinger) runLoop() (time.Time, error) {
-
+func (p *Pinger) runLoop(pinfo *mpinfo) (time.Time, error) {
 	timeout := time.NewTimer(p.Timeout)
 	interval := time.NewTimer(0)
 	timeout.Stop()
@@ -461,22 +473,19 @@ func (p *Pinger) runLoop() (time.Time, error) {
 	received := make(chan interface{}, 1)
 
 	last_send_time := time.Now()
-	pinfo := getPingInfo(p.ipaddr)
-	pinfo.host = p.addr
-	pinfo.size = p.Size
-	pinfo.timeout = p.Timeout
 	pinfo.OnSend = func(pkt *icmp.Packet) {
 		last_send_time = pkt.SendTime
-		if p.PacketsSent == 0 {
-			p.sequence_base = pkt.Seq
+		seq := pkt.Seq - p.sequence_base
+		if seq < 0 || seq >= p.Count {
+			return
 		}
 		p.PacketsSent++
 		if p.OnSend != nil {
 			p.OnSend(&Packet{
 				Host:   p.addr,
 				IPAddr: pkt.IPAddr,
-				ID:     pkt.ID,
-				Seq:    pkt.Seq - p.sequence_base,
+				ID:     p.id,
+				Seq:    seq,
 				Nbytes: pkt.Nbytes,
 				TTL:    pkt.TTL,
 				Rtt:    pkt.Rtt,
@@ -484,11 +493,15 @@ func (p *Pinger) runLoop() (time.Time, error) {
 		}
 	}
 	pinfo.OnRecv = func(pkt *icmp.Packet) {
+		seq := pkt.Seq - p.sequence_base
+		if seq < 0 || seq >= p.Count {
+			return
+		}
 		inpkt := &Packet{
 			Host:   p.addr,
 			IPAddr: pkt.IPAddr,
-			ID:     pkt.ID,
-			Seq:    pkt.Seq - p.sequence_base,
+			ID:     p.id,
+			Seq:    seq,
 			Nbytes: pkt.Nbytes,
 			TTL:    pkt.TTL,
 			Rtt:    pkt.Rtt,
@@ -500,11 +513,15 @@ func (p *Pinger) runLoop() (time.Time, error) {
 		received <- nil
 	}
 	pinfo.OnRecvDup = func(pkt *icmp.Packet) {
+		seq := pkt.Seq - p.sequence_base
+		if seq < 0 || seq >= p.Count {
+			return
+		}
 		inpkt := &Packet{
 			Host:   p.addr,
 			IPAddr: pkt.IPAddr,
-			ID:     pkt.ID,
-			Seq:    pkt.Seq - p.sequence_base,
+			ID:     p.id,
+			Seq:    seq,
 			Nbytes: pkt.Nbytes,
 			TTL:    pkt.TTL,
 			Rtt:    pkt.Rtt,
@@ -512,7 +529,24 @@ func (p *Pinger) runLoop() (time.Time, error) {
 		if p.OnDuplicateRecv != nil {
 			p.OnDuplicateRecv(inpkt)
 		}
-		p.updateStatistics(inpkt)
+	}
+	pinfo.OnRecvTimeout = func(pkt *icmp.Packet) {
+		seq := pkt.Seq - p.sequence_base
+		if seq < 0 || seq >= p.Count {
+			return
+		}
+		outpkt := &Packet{
+			Host:   p.addr,
+			IPAddr: pkt.IPAddr,
+			ID:     p.id,
+			Seq:    seq,
+			Nbytes: pkt.Nbytes,
+			TTL:    pkt.TTL,
+			Rtt:    pkt.Rtt,
+		}
+		if p.OnTimeout != nil {
+			p.OnTimeout(outpkt)
+		}
 	}
 	for {
 		select {