state.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package main
  2. import (
  3. "bytes"
  4. "sync"
  5. "encoding/gob"
  6. "github.com/weaveworks/mesh"
  7. )
  8. // state is an implementation of a G-counter.
  9. type state struct {
  10. mtx sync.RWMutex
  11. set map[mesh.PeerName]int
  12. self mesh.PeerName
  13. }
  14. // state implements GossipData.
  15. var _ mesh.GossipData = &state{}
  16. // Construct an empty state object, ready to receive updates.
  17. // This is suitable to use at program start.
  18. // Other peers will populate us with data.
  19. func newState(self mesh.PeerName) *state {
  20. return &state{
  21. set: map[mesh.PeerName]int{},
  22. self: self,
  23. }
  24. }
  25. func (st *state) get() (result int) {
  26. st.mtx.RLock()
  27. defer st.mtx.RUnlock()
  28. for _, v := range st.set {
  29. result += v
  30. }
  31. return result
  32. }
  33. func (st *state) incr() (complete *state) {
  34. st.mtx.Lock()
  35. defer st.mtx.Unlock()
  36. st.set[st.self]++
  37. return &state{
  38. set: st.set,
  39. }
  40. }
  41. func (st *state) copy() *state {
  42. st.mtx.RLock()
  43. defer st.mtx.RUnlock()
  44. return &state{
  45. set: st.set,
  46. }
  47. }
  48. // Encode serializes our complete state to a slice of byte-slices.
  49. // In this simple example, we use a single gob-encoded
  50. // buffer: see https://golang.org/pkg/encoding/gob/
  51. func (st *state) Encode() [][]byte {
  52. st.mtx.RLock()
  53. defer st.mtx.RUnlock()
  54. var buf bytes.Buffer
  55. if err := gob.NewEncoder(&buf).Encode(st.set); err != nil {
  56. panic(err)
  57. }
  58. return [][]byte{buf.Bytes()}
  59. }
  60. // Merge merges the other GossipData into this one,
  61. // and returns our resulting, complete state.
  62. func (st *state) Merge(other mesh.GossipData) (complete mesh.GossipData) {
  63. return st.mergeComplete(other.(*state).copy().set)
  64. }
  65. // Merge the set into our state, abiding increment-only semantics.
  66. // Return a non-nil mesh.GossipData representation of the received set.
  67. func (st *state) mergeReceived(set map[mesh.PeerName]int) (received mesh.GossipData) {
  68. st.mtx.Lock()
  69. defer st.mtx.Unlock()
  70. for peer, v := range set {
  71. if v <= st.set[peer] {
  72. delete(set, peer) // optimization: make the forwarded data smaller
  73. continue
  74. }
  75. st.set[peer] = v
  76. }
  77. return &state{
  78. set: set, // all remaining elements were novel to us
  79. }
  80. }
  81. // Merge the set into our state, abiding increment-only semantics.
  82. // Return any key/values that have been mutated, or nil if nothing changed.
  83. func (st *state) mergeDelta(set map[mesh.PeerName]int) (delta mesh.GossipData) {
  84. st.mtx.Lock()
  85. defer st.mtx.Unlock()
  86. for peer, v := range set {
  87. if v <= st.set[peer] {
  88. delete(set, peer) // requirement: it's not part of a delta
  89. continue
  90. }
  91. st.set[peer] = v
  92. }
  93. if len(set) <= 0 {
  94. return nil // per OnGossip requirements
  95. }
  96. return &state{
  97. set: set, // all remaining elements were novel to us
  98. }
  99. }
  100. // Merge the set into our state, abiding increment-only semantics.
  101. // Return our resulting, complete state.
  102. func (st *state) mergeComplete(set map[mesh.PeerName]int) (complete mesh.GossipData) {
  103. st.mtx.Lock()
  104. defer st.mtx.Unlock()
  105. for peer, v := range set {
  106. if v > st.set[peer] {
  107. st.set[peer] = v
  108. }
  109. }
  110. return &state{
  111. set: st.set, // n.b. can't .copy() due to lock contention
  112. }
  113. }