|
- /*
- This package is used to provide an implementation of the
- Chord network protocol.
- */
- package chord
- import (
- "crypto/sha1"
- "fmt"
- "hash"
- "time"
- )
- // Implements the methods needed for a Chord ring
- type Transport interface {
- // Gets a list of the Vnodes on the box
- ListVnodes(string) ([]*Vnode, error)
- // Ping a Vnode, check for liveness
- Ping(*Vnode) (bool, error)
- // Request a nodes predecessor
- GetPredecessor(*Vnode) (*Vnode, error)
- // Notify our successor of ourselves
- Notify(target, self *Vnode) ([]*Vnode, error)
- // Find a successor
- FindSuccessors(*Vnode, int, []byte) ([]*Vnode, error)
- // Clears a predecessor if it matches a given vnode. Used to leave.
- ClearPredecessor(target, self *Vnode) error
- // Instructs a node to skip a given successor. Used to leave.
- SkipSuccessor(target, self *Vnode) error
- // Register for an RPC callbacks
- Register(*Vnode, VnodeRPC)
- }
- // These are the methods to invoke on the registered Vnodes
- type VnodeRPC interface {
- GetPredecessor() (*Vnode, error)
- Notify(*Vnode) ([]*Vnode, error)
- FindSuccessors(int, []byte) ([]*Vnode, error)
- ClearPredecessor(*Vnode) error
- SkipSuccessor(*Vnode) error
- }
- // Delegate to notify on ring events
- type Delegate interface {
- NewPredecessor(local, remoteNew, remotePrev *Vnode)
- Leaving(local, pred, succ *Vnode)
- PredecessorLeaving(local, remote *Vnode)
- SuccessorLeaving(local, remote *Vnode)
- Shutdown()
- }
- // Configuration for Chord nodes
- type Config struct {
- Hostname string // Local host name
- NumVnodes int // Number of Vnodes per physical node
- HashFunc func() hash.Hash // Hash function to use
- StabilizeMin time.Duration // Minimum stabilization time
- StabilizeMax time.Duration // Maximum stabilization time
- NumSuccessors int // Number of Successors to maintain
- Delegate Delegate // Invoked to handle ring events
- HashBits int // Bit size of the hash function
- }
// Vnode identifies a virtual node, whether local or remote.
type Vnode struct {
	// Id is the virtual ID of the node.
	Id []byte

	// Host is the identifier of the host the node lives on.
	Host string
}
- // Represents a local Vnode
- type LocalVnode struct {
- Vnode
- Ring *Ring
- Successors []*Vnode
- Finger []*Vnode
- LastFinger int
- Predecessor *Vnode
- Stabilized time.Time
- Timer *time.Timer
- }
- // Stores the state required for a Chord ring
- type Ring struct {
- Config *Config
- Transport Transport
- Vnodes []*LocalVnode
- delegateCh chan func()
- ChanShutdown chan bool
- }
- // Returns the default Ring configuration
- func DefaultConfig(hostname string) *Config {
- return &Config{
- hostname,
- 8, // 8 Vnodes
- sha1.New, // SHA1
- time.Duration(15 * time.Second),
- time.Duration(45 * time.Second),
- 8, // 8 Successors
- nil, // No delegate
- 160, // 160bit hash function
- }
- }
- // Creates a new Chord ring given the config and transport
- func Create(conf *Config, trans Transport) (*Ring, error) {
- // Initialize the hash bits
- conf.HashBits = conf.HashFunc().Size() * 8
- // Create and initialize a ring
- ring := &Ring{}
- ring.Init(conf, trans)
- ring.SetLocalSuccessors()
- ring.Schedule()
- return ring, nil
- }
- // Joins an existing Chord ring
- func Join(conf *Config, trans Transport, existing string) (*Ring, error) {
- // Initialize the hash bits
- conf.HashBits = conf.HashFunc().Size() * 8
- // Request a list of Vnodes from the remote host
- hosts, err := trans.ListVnodes(existing)
- if err != nil {
- return nil, err
- }
- if hosts == nil || len(hosts) == 0 {
- return nil, fmt.Errorf("Remote host has no Vnodes!")
- }
- // Create a ring
- ring := &Ring{}
- ring.Init(conf, trans)
- // Acquire a live successor for each Vnode
- for _, vn := range ring.Vnodes {
- // Get the nearest remote vnode
- nearest := NearestVnodeToKey(hosts, vn.Id)
- // Query for a list of Successors to this Vnode
- succs, err := trans.FindSuccessors(nearest, conf.NumSuccessors, vn.Id)
- if err != nil {
- return nil, fmt.Errorf("Failed to find successor for Vnodes! Got %s", err)
- }
- if succs == nil || len(succs) == 0 {
- return nil, fmt.Errorf("Failed to find successor for Vnodes! Got no Vnodes!")
- }
- // Assign the Successors
- for idx, s := range succs {
- vn.Successors[idx] = s
- }
- }
- // Start delegate handler
- if ring.Config.Delegate != nil {
- go ring.DelegateHandler()
- }
- // Do a fast stabilization, will schedule regular execution
- for _, vn := range ring.Vnodes {
- vn.Stabilize()
- }
- return ring, nil
- }
- // Leaves a given Chord ring and shuts down the local Vnodes
- func (r *Ring) Leave() error {
- // Shutdown the Vnodes first to avoid further stabilization runs
- r.StopVnodes()
- // Instruct each vnode to leave
- var err error
- for _, vn := range r.Vnodes {
- err = MergeErrors(err, vn.Leave())
- }
- // Wait for the delegate callbacks to complete
- r.StopDelegate()
- return err
- }
- // Shutdown shuts down the local processes in a given Chord ring
- // Blocks until all the Vnodes terminate.
- func (r *Ring) Shutdown() {
- r.StopVnodes()
- r.StopDelegate()
- }
- // Does a key lookup for up to N Successors of a key
- func (r *Ring) Lookup(n int, key []byte) ([]*Vnode, error) {
- // Ensure that n is sane
- if n > r.Config.NumSuccessors {
- return nil, fmt.Errorf("Cannot ask for more Successors than NumSuccessors!")
- }
- // Hash the key
- h := r.Config.HashFunc()
- h.Write(key)
- key_hash := h.Sum(nil)
- // Find the nearest local vnode
- nearest := r.NearestVnode(key_hash)
- // Use the nearest node for the lookup
- Successors, err := nearest.FindSuccessors(n, key_hash)
- if err != nil {
- return nil, err
- }
- // Trim the nil Successors
- for Successors[len(Successors)-1] == nil {
- Successors = Successors[:len(Successors)-1]
- }
- return Successors, nil
- }
|