| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 | /*This package is used to provide an implementation of theChord network protocol.*/package chordimport (	"crypto/sha1"	"fmt"	"hash"	"time")// Implements the methods needed for a Chord ringtype Transport interface {	// Gets a list of the Vnodes on the box	ListVnodes(string) ([]*Vnode, error)	// Ping a Vnode, check for liveness	Ping(*Vnode) (bool, error)	// Request a nodes predecessor	GetPredecessor(*Vnode) (*Vnode, error)	// Notify our successor of ourselves	Notify(target, self *Vnode) ([]*Vnode, error)	// Find a successor	FindSuccessors(*Vnode, int, []byte) ([]*Vnode, error)	// Clears a predecessor if it matches a given vnode. Used to leave.	ClearPredecessor(target, self *Vnode) error	// Instructs a node to skip a given successor. Used to leave.	SkipSuccessor(target, self *Vnode) error	// Register for an RPC callbacks	Register(*Vnode, VnodeRPC)}// These are the methods to invoke on the registered Vnodestype VnodeRPC interface {	GetPredecessor() (*Vnode, error)	Notify(*Vnode) ([]*Vnode, error)	FindSuccessors(int, []byte) ([]*Vnode, error)	ClearPredecessor(*Vnode) error	SkipSuccessor(*Vnode) error}// Delegate to notify on ring eventstype Delegate interface {	NewPredecessor(local, remoteNew, remotePrev *Vnode)	Leaving(local, pred, succ *Vnode)	PredecessorLeaving(local, remote *Vnode)	SuccessorLeaving(local, remote *Vnode)	Shutdown()}// Configuration for Chord nodestype Config struct {	Hostname      string           // Local host name	NumVnodes     int              // Number of Vnodes per physical node	HashFunc      func() hash.Hash // Hash function to use	StabilizeMin  time.Duration    // Minimum stabilization time	StabilizeMax  time.Duration    // Maximum stabilization time	NumSuccessors int              // Number of Successors to maintain	Delegate      Delegate         // Invoked to handle ring events	HashBits      int              // Bit size of the hash function}// Represents an Vnode, local or remotetype Vnode struct {	Id   []byte // Virtual ID	Host string // Host identifier}// Represents a local Vnodetype LocalVnode struct {	Vnode	Ring        *Ring	Successors  []*Vnode	Finger      []*Vnode	LastFinger  int	Predecessor *Vnode	Stabilized  time.Time	Timer       *time.Timer}// Stores the state required for a Chord ringtype Ring struct {	Config       *Config	Transport    Transport	Vnodes       []*LocalVnode	delegateCh   chan func()	ChanShutdown chan bool}// Returns the default Ring configurationfunc DefaultConfig(hostname string) *Config {	return &Config{		hostname,		8,        // 8 Vnodes		sha1.New, // SHA1		time.Duration(15 * time.Second),		time.Duration(45 * time.Second),		8,   // 8 Successors		nil, // No delegate		160, // 160bit hash function	}}// Creates a new Chord ring given the config and transportfunc Create(conf *Config, trans Transport) (*Ring, error) {	// Initialize the hash bits	conf.HashBits = conf.HashFunc().Size() * 8	// Create and initialize a ring	ring := &Ring{}	ring.Init(conf, trans)	ring.SetLocalSuccessors()	ring.Schedule()	return ring, nil}// Joins an existing Chord ringfunc Join(conf *Config, trans Transport, existing string) (*Ring, error) {	// Initialize the hash bits	conf.HashBits = conf.HashFunc().Size() * 8	// Request a list of Vnodes from the remote host	hosts, err := trans.ListVnodes(existing)	if err != nil {		return nil, err	}	if hosts == nil || len(hosts) == 0 {		return nil, fmt.Errorf("Remote host has no Vnodes!")	}	// Create a ring	ring := &Ring{}	ring.Init(conf, trans)	// Acquire a live successor for each Vnode	for _, vn := range ring.Vnodes {		// Get the nearest remote vnode		nearest := NearestVnodeToKey(hosts, vn.Id)		// Query for a list of Successors to this Vnode		succs, err := trans.FindSuccessors(nearest, conf.NumSuccessors, vn.Id)		if err != nil {			return nil, fmt.Errorf("Failed to find successor for Vnodes! Got %s", err)		}		if succs == nil || len(succs) == 0 {			return nil, fmt.Errorf("Failed to find successor for Vnodes! Got no Vnodes!")		}		// Assign the Successors		for idx, s := range succs {			vn.Successors[idx] = s		}	}	// Start delegate handler	if ring.Config.Delegate != nil {		go ring.DelegateHandler()	}	// Do a fast stabilization, will schedule regular execution	for _, vn := range ring.Vnodes {		vn.Stabilize()	}	return ring, nil}// Leaves a given Chord ring and shuts down the local Vnodesfunc (r *Ring) Leave() error {	// Shutdown the Vnodes first to avoid further stabilization runs	r.StopVnodes()	// Instruct each vnode to leave	var err error	for _, vn := range r.Vnodes {		err = MergeErrors(err, vn.Leave())	}	// Wait for the delegate callbacks to complete	r.StopDelegate()	return err}// Shutdown shuts down the local processes in a given Chord ring// Blocks until all the Vnodes terminate.func (r *Ring) Shutdown() {	r.StopVnodes()	r.StopDelegate()}// Does a key lookup for up to N Successors of a keyfunc (r *Ring) Lookup(n int, key []byte) ([]*Vnode, error) {	// Ensure that n is sane	if n > r.Config.NumSuccessors {		return nil, fmt.Errorf("Cannot ask for more Successors than NumSuccessors!")	}	// Hash the key	h := r.Config.HashFunc()	h.Write(key)	key_hash := h.Sum(nil)	// Find the nearest local vnode	nearest := r.NearestVnode(key_hash)	// Use the nearest node for the lookup	Successors, err := nearest.FindSuccessors(n, key_hash)	if err != nil {		return nil, err	}	// Trim the nil Successors	for Successors[len(Successors)-1] == nil {		Successors = Successors[:len(Successors)-1]	}	return Successors, nil}
 |