| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- package main
- import (
- "bytes"
- "sync"
- "encoding/gob"
- "github.com/weaveworks/mesh"
- )
- // state is an implementation of a G-counter.
- type state struct {
- mtx sync.RWMutex
- set map[mesh.PeerName]int
- self mesh.PeerName
- }
- // state implements GossipData.
- var _ mesh.GossipData = &state{}
- // Construct an empty state object, ready to receive updates.
- // This is suitable to use at program start.
- // Other peers will populate us with data.
- func newState(self mesh.PeerName) *state {
- return &state{
- set: map[mesh.PeerName]int{},
- self: self,
- }
- }
- func (st *state) get() (result int) {
- st.mtx.RLock()
- defer st.mtx.RUnlock()
- for _, v := range st.set {
- result += v
- }
- return result
- }
- func (st *state) incr() (complete *state) {
- st.mtx.Lock()
- defer st.mtx.Unlock()
- st.set[st.self]++
- return &state{
- set: st.set,
- }
- }
- func (st *state) copy() *state {
- st.mtx.RLock()
- defer st.mtx.RUnlock()
- return &state{
- set: st.set,
- }
- }
- // Encode serializes our complete state to a slice of byte-slices.
- // In this simple example, we use a single gob-encoded
- // buffer: see https://golang.org/pkg/encoding/gob/
- func (st *state) Encode() [][]byte {
- st.mtx.RLock()
- defer st.mtx.RUnlock()
- var buf bytes.Buffer
- if err := gob.NewEncoder(&buf).Encode(st.set); err != nil {
- panic(err)
- }
- return [][]byte{buf.Bytes()}
- }
- // Merge merges the other GossipData into this one,
- // and returns our resulting, complete state.
- func (st *state) Merge(other mesh.GossipData) (complete mesh.GossipData) {
- return st.mergeComplete(other.(*state).copy().set)
- }
- // Merge the set into our state, abiding increment-only semantics.
- // Return a non-nil mesh.GossipData representation of the received set.
- func (st *state) mergeReceived(set map[mesh.PeerName]int) (received mesh.GossipData) {
- st.mtx.Lock()
- defer st.mtx.Unlock()
- for peer, v := range set {
- if v <= st.set[peer] {
- delete(set, peer) // optimization: make the forwarded data smaller
- continue
- }
- st.set[peer] = v
- }
- return &state{
- set: set, // all remaining elements were novel to us
- }
- }
- // Merge the set into our state, abiding increment-only semantics.
- // Return any key/values that have been mutated, or nil if nothing changed.
- func (st *state) mergeDelta(set map[mesh.PeerName]int) (delta mesh.GossipData) {
- st.mtx.Lock()
- defer st.mtx.Unlock()
- for peer, v := range set {
- if v <= st.set[peer] {
- delete(set, peer) // requirement: it's not part of a delta
- continue
- }
- st.set[peer] = v
- }
- if len(set) <= 0 {
- return nil // per OnGossip requirements
- }
- return &state{
- set: set, // all remaining elements were novel to us
- }
- }
- // Merge the set into our state, abiding increment-only semantics.
- // Return our resulting, complete state.
- func (st *state) mergeComplete(set map[mesh.PeerName]int) (complete mesh.GossipData) {
- st.mtx.Lock()
- defer st.mtx.Unlock()
- for peer, v := range set {
- if v > st.set[peer] {
- st.set[peer] = v
- }
- }
- return &state{
- set: st.set, // n.b. can't .copy() due to lock contention
- }
- }
|