/* This package is used to provide an implementation of the Chord network protocol. */ package chord import ( "crypto/sha1" "fmt" "hash" "time" ) // Implements the methods needed for a Chord ring type Transport interface { // Gets a list of the Vnodes on the box ListVnodes(string) ([]*Vnode, error) // Ping a Vnode, check for liveness Ping(*Vnode) (bool, error) // Request a nodes predecessor GetPredecessor(*Vnode) (*Vnode, error) // Notify our successor of ourselves Notify(target, self *Vnode) ([]*Vnode, error) // Find a successor FindSuccessors(*Vnode, int, []byte) ([]*Vnode, error) // Clears a predecessor if it matches a given vnode. Used to leave. ClearPredecessor(target, self *Vnode) error // Instructs a node to skip a given successor. Used to leave. SkipSuccessor(target, self *Vnode) error // Register for an RPC callbacks Register(*Vnode, VnodeRPC) } // These are the methods to invoke on the registered Vnodes type VnodeRPC interface { GetPredecessor() (*Vnode, error) Notify(*Vnode) ([]*Vnode, error) FindSuccessors(int, []byte) ([]*Vnode, error) ClearPredecessor(*Vnode) error SkipSuccessor(*Vnode) error } // Delegate to notify on ring events type Delegate interface { NewPredecessor(local, remoteNew, remotePrev *Vnode) Leaving(local, pred, succ *Vnode) PredecessorLeaving(local, remote *Vnode) SuccessorLeaving(local, remote *Vnode) Shutdown() } // Configuration for Chord nodes type Config struct { Hostname string // Local host name NumVnodes int // Number of Vnodes per physical node HashFunc func() hash.Hash // Hash function to use StabilizeMin time.Duration // Minimum stabilization time StabilizeMax time.Duration // Maximum stabilization time NumSuccessors int // Number of Successors to maintain Delegate Delegate // Invoked to handle ring events HashBits int // Bit size of the hash function } // Represents an Vnode, local or remote type Vnode struct { Id []byte // Virtual ID Host string // Host identifier } // Represents a local Vnode type LocalVnode struct { Vnode Ring *Ring Successors []*Vnode Finger []*Vnode LastFinger int Predecessor *Vnode Stabilized time.Time Timer *time.Timer } // Stores the state required for a Chord ring type Ring struct { Config *Config Transport Transport Vnodes []*LocalVnode delegateCh chan func() ChanShutdown chan bool } // Returns the default Ring configuration func DefaultConfig(hostname string) *Config { return &Config{ hostname, 8, // 8 Vnodes sha1.New, // SHA1 time.Duration(15 * time.Second), time.Duration(45 * time.Second), 8, // 8 Successors nil, // No delegate 160, // 160bit hash function } } // Creates a new Chord ring given the config and transport func Create(conf *Config, trans Transport) (*Ring, error) { // Initialize the hash bits conf.HashBits = conf.HashFunc().Size() * 8 // Create and initialize a ring ring := &Ring{} ring.Init(conf, trans) ring.SetLocalSuccessors() ring.Schedule() return ring, nil } // Joins an existing Chord ring func Join(conf *Config, trans Transport, existing string) (*Ring, error) { // Initialize the hash bits conf.HashBits = conf.HashFunc().Size() * 8 // Request a list of Vnodes from the remote host hosts, err := trans.ListVnodes(existing) if err != nil { return nil, err } if hosts == nil || len(hosts) == 0 { return nil, fmt.Errorf("Remote host has no Vnodes!") } // Create a ring ring := &Ring{} ring.Init(conf, trans) // Acquire a live successor for each Vnode for _, vn := range ring.Vnodes { // Get the nearest remote vnode nearest := NearestVnodeToKey(hosts, vn.Id) // Query for a list of Successors to this Vnode succs, err := trans.FindSuccessors(nearest, conf.NumSuccessors, vn.Id) if err != nil { return nil, fmt.Errorf("Failed to find successor for Vnodes! Got %s", err) } if succs == nil || len(succs) == 0 { return nil, fmt.Errorf("Failed to find successor for Vnodes! Got no Vnodes!") } // Assign the Successors for idx, s := range succs { vn.Successors[idx] = s } } // Start delegate handler if ring.Config.Delegate != nil { go ring.DelegateHandler() } // Do a fast stabilization, will schedule regular execution for _, vn := range ring.Vnodes { vn.Stabilize() } return ring, nil } // Leaves a given Chord ring and shuts down the local Vnodes func (r *Ring) Leave() error { // Shutdown the Vnodes first to avoid further stabilization runs r.StopVnodes() // Instruct each vnode to leave var err error for _, vn := range r.Vnodes { err = MergeErrors(err, vn.Leave()) } // Wait for the delegate callbacks to complete r.StopDelegate() return err } // Shutdown shuts down the local processes in a given Chord ring // Blocks until all the Vnodes terminate. func (r *Ring) Shutdown() { r.StopVnodes() r.StopDelegate() } // Does a key lookup for up to N Successors of a key func (r *Ring) Lookup(n int, key []byte) ([]*Vnode, error) { // Ensure that n is sane if n > r.Config.NumSuccessors { return nil, fmt.Errorf("Cannot ask for more Successors than NumSuccessors!") } // Hash the key h := r.Config.HashFunc() h.Write(key) key_hash := h.Sum(nil) // Find the nearest local vnode nearest := r.NearestVnode(key_hash) // Use the nearest node for the lookup Successors, err := nearest.FindSuccessors(n, key_hash) if err != nil { return nil, err } // Trim the nil Successors for Successors[len(Successors)-1] == nil { Successors = Successors[:len(Successors)-1] } return Successors, nil }