peer.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package main
  2. import (
  3. "log"
  4. "bytes"
  5. "encoding/gob"
  6. "github.com/weaveworks/mesh"
  7. )
  8. // Peer encapsulates state and implements mesh.Gossiper.
  9. // It should be passed to mesh.Router.NewGossip,
  10. // and the resulting Gossip registered in turn,
  11. // before calling mesh.Router.Start.
  12. type peer struct {
  13. st *state
  14. send mesh.Gossip
  15. actions chan<- func()
  16. quit chan struct{}
  17. logger *log.Logger
  18. }
  19. // peer implements mesh.Gossiper.
  20. var _ mesh.Gossiper = &peer{}
  21. // Construct a peer with empty state.
  22. // Be sure to register a channel, later,
  23. // so we can make outbound communication.
  24. func newPeer(self mesh.PeerName, logger *log.Logger) *peer {
  25. actions := make(chan func())
  26. p := &peer{
  27. st: newState(self),
  28. send: nil, // must .register() later
  29. actions: actions,
  30. quit: make(chan struct{}),
  31. logger: logger,
  32. }
  33. go p.loop(actions)
  34. return p
  35. }
  36. func (p *peer) loop(actions <-chan func()) {
  37. for {
  38. select {
  39. case f := <-actions:
  40. f()
  41. case <-p.quit:
  42. return
  43. }
  44. }
  45. }
  46. // register the result of a mesh.Router.NewGossip.
  47. func (p *peer) register(send mesh.Gossip) {
  48. p.actions <- func() { p.send = send }
  49. }
  50. // Return the current value of the counter.
  51. func (p *peer) get() int {
  52. return p.st.get()
  53. }
  54. // Increment the counter by one.
  55. func (p *peer) incr() (result int) {
  56. c := make(chan struct{})
  57. p.actions <- func() {
  58. defer close(c)
  59. st := p.st.incr()
  60. if p.send != nil {
  61. p.send.GossipBroadcast(st)
  62. } else {
  63. p.logger.Printf("no sender configured; not broadcasting update right now")
  64. }
  65. result = st.get()
  66. }
  67. <-c
  68. return result
  69. }
  70. func (p *peer) stop() {
  71. close(p.quit)
  72. }
  73. // Return a copy of our complete state.
  74. func (p *peer) Gossip() (complete mesh.GossipData) {
  75. complete = p.st.copy()
  76. p.logger.Printf("Gossip => complete %v", complete.(*state).set)
  77. return complete
  78. }
  79. // Merge the gossiped data represented by buf into our state.
  80. // Return the state information that was modified.
  81. func (p *peer) OnGossip(buf []byte) (delta mesh.GossipData, err error) {
  82. var set map[mesh.PeerName]int
  83. if err := gob.NewDecoder(bytes.NewReader(buf)).Decode(&set); err != nil {
  84. return nil, err
  85. }
  86. delta = p.st.mergeDelta(set)
  87. if delta == nil {
  88. p.logger.Printf("OnGossip %v => delta %v", set, delta)
  89. } else {
  90. p.logger.Printf("OnGossip %v => delta %v", set, delta.(*state).set)
  91. }
  92. return delta, nil
  93. }
  94. // Merge the gossiped data represented by buf into our state.
  95. // Return the state information that was modified.
  96. func (p *peer) OnGossipBroadcast(src mesh.PeerName, buf []byte) (received mesh.GossipData, err error) {
  97. var set map[mesh.PeerName]int
  98. if err := gob.NewDecoder(bytes.NewReader(buf)).Decode(&set); err != nil {
  99. return nil, err
  100. }
  101. received = p.st.mergeReceived(set)
  102. if received == nil {
  103. p.logger.Printf("OnGossipBroadcast %s %v => delta %v", src, set, received)
  104. } else {
  105. p.logger.Printf("OnGossipBroadcast %s %v => delta %v", src, set, received.(*state).set)
  106. }
  107. return received, nil
  108. }
  109. // Merge the gossiped data represented by buf into our state.
  110. func (p *peer) OnGossipUnicast(src mesh.PeerName, buf []byte) error {
  111. var set map[mesh.PeerName]int
  112. if err := gob.NewDecoder(bytes.NewReader(buf)).Decode(&set); err != nil {
  113. return err
  114. }
  115. complete := p.st.mergeComplete(set)
  116. p.logger.Printf("OnGossipUnicast %s %v => complete %v", src, set, complete)
  117. return nil
  118. }