package chord

import (
	"encoding/binary"
	"fmt"
	"log"
	"time"
)

// String returns the vnode's ID formatted with %x (hexadecimal).
func (vn *Vnode) String() string {
	return fmt.Sprintf("%x", vn.Id)
}

// Init prepares a local vnode for use. It derives the ID from idx, records the
// configured hostname, allocates the successor list (NumSuccessors slots) and
// the finger table (HashBits slots), and registers the vnode with the ring's
// transport.
//
// idx is converted to uint16 before hashing, so only its low 16 bits
// contribute to the ID.
func (vn *LocalVnode) Init(idx int) {
	// Generate an ID
	vn.GenId(uint16(idx))

	// Set our host
	vn.Host = vn.Ring.Config.Hostname

	// Initialize all state
	vn.Successors = make([]*Vnode, vn.Ring.Config.NumSuccessors)
	vn.Finger = make([]*Vnode, vn.Ring.Config.HashBits)

	// Register with the RPC mechanism
	vn.Ring.Transport.Register(&vn.Vnode, vn)
}

// Schedule arms the maintenance timer so that Stabilize runs once, after the
// delay returned by RandStabilize for the ring's configuration.
func (vn *LocalVnode) Schedule() {
	vn.Timer = time.AfterFunc(RandStabilize(vn.Ring.Config), vn.Stabilize)
}

// GenId sets the vnode's ID to the digest of the configured hostname followed
// by idx encoded as a big-endian uint16, using the configured hash function.
func (vn *LocalVnode) GenId(idx uint16) {
	conf := vn.Ring.Config
	hash := conf.HashFunc()

	// NOTE(review): write errors are ignored here; this assumes HashFunc
	// returns a hash.Hash, whose Write is documented never to fail — confirm.
	hash.Write([]byte(conf.Hostname))
	binary.Write(hash, binary.BigEndian, idx)

	// Use the hash as the ID
	vn.Id = hash.Sum(nil)
}

// Stabilize runs one maintenance round: it checks for a new successor,
// notifies the successor, repairs the finger table and checks the predecessor,
// logging (not returning) any error from those steps. Unless the ring's
// shutdown channel is set, the next round is scheduled when this one returns.
func (vn *LocalVnode) Stabilize() {
	// Clear the timer
	vn.Timer = nil

	// When the shutdown channel is set, signal on it and stop without
	// scheduling another round.
	if vn.Ring.ChanShutdown != nil {
		vn.Ring.ChanShutdown <- true
		return
	}

	// Setup the next stabilize timer, whatever the outcome of this round.
	defer vn.Schedule()

	if err := vn.CheckNewSuccessor(); err != nil {
		log.Printf("[ERR] Error checking for new successor: %s", err)
	}
	if err := vn.NotifySuccessor(); err != nil {
		log.Printf("[ERR] Error notifying successor: %s", err)
	}
	if err := vn.FixFingerTable(); err != nil {
		log.Printf("[ERR] Error fixing finger table: %s", err)
	}
	if err := vn.CheckPredecessor(); err != nil {
		log.Printf("[ERR] Error checking predecessor: %s", err)
	}

	// Set the last stabilized time
	vn.Stabilized = time.Now()
}

// CheckNewSuccessor asks the first successor for its predecessor and, if that
// node passes the Between(vn.Id, succ.Id, candidate.Id) test and answers a
// ping, inserts it at the head of the successor list.
//
// If the first successor cannot be queried and more than one successor is
// known, unresponsive successors are dropped from the head of the list and the
// query is retried against the first live one. An error is returned when the
// query fails and no dead successor could be dropped, when every known
// successor is dead, or when pinging the candidate fails.
//
// It panics if the first successor slot is nil.
func (vn *LocalVnode) CheckNewSuccessor() error {
	trans := vn.Ring.Transport

	for {
		succ := vn.Successors[0]
		if succ == nil {
			panic("Node has no successor!")
		}

		// Ask our successor for its predecessor
		maybeSuc, err := trans.GetPredecessor(succ)
		if err != nil {
			// Without a successor list to fall back on there is nothing to try.
			known := vn.KnownSuccessors()
			if known <= 1 {
				return err
			}

			// Drop dead successors from the head until a live one is found.
			dropped := 0
			for {
				// Only liveness matters here; a ping error is deliberately
				// ignored and the reported liveness is used as-is.
				if alive, _ := trans.Ping(vn.Successors[0]); alive {
					break
				}

				// Don't eliminate the last successor we know of
				if dropped+1 == known {
					return fmt.Errorf("All known Successors dead!")
				}

				// Advance the Successors list past the dead one
				copy(vn.Successors[0:], vn.Successors[1:])
				vn.Successors[known-1-dropped] = nil
				dropped++
			}

			// The head answers pings but the query still failed and nothing
			// was removed: retrying immediately would spin without making
			// progress, so report the error and let the next round retry.
			if dropped == 0 {
				return err
			}

			// A new head is in place; each retry shortens the list, so this
			// loop is bounded by the number of known successors.
			continue
		}

		// Check if we should replace our successor
		if maybeSuc != nil && Between(vn.Id, succ.Id, maybeSuc.Id) {
			// Check if new successor is alive before switching
			alive, err := trans.Ping(maybeSuc)
			if !alive || err != nil {
				return err
			}

			// Shift the list right by one (dropping the last entry) and put
			// the new successor first.
			copy(vn.Successors[1:], vn.Successors[0:len(vn.Successors)-1])
			vn.Successors[0] = maybeSuc
		}
		return nil
	}
}

// GetPredecessor is the RPC handler that returns this vnode's current
// predecessor, which may be nil. The error is always nil.
func (vn *LocalVnode) GetPredecessor() (*Vnode, error) {
	return vn.Predecessor, nil
}

// NotifySuccessor notifies the first successor about this vnode and copies the
// successor list it returns into slots 1.. of the local list. Copying stops at
// the first nil entry or at an entry whose ID string equals our own. It
// returns the transport error if the notification fails.
func (vn *LocalVnode) NotifySuccessor() error {
	// Notify successor
	succ := vn.Successors[0]
	succList, err := vn.Ring.Transport.Notify(succ, &vn.Vnode)
	if err != nil {
		return err
	}

	// Slot 0 stays reserved for succ, so keep at most NumSuccessors-1 entries.
	maxSucc := vn.Ring.Config.NumSuccessors
	if len(succList) > maxSucc-1 {
		succList = succList[:maxSucc-1]
	}

	// Update local Successors list
	self := vn.String()
	for idx, s := range succList {
		// Stop at the end of the known entries, and ensure we don't set
		// ourselves as a successor!
		if s == nil || s.String() == self {
			break
		}
		vn.Successors[idx+1] = s
	}
	return nil
}

// Notify is the RPC handler invoked when another vnode announces itself as a
// possible predecessor. If there is no predecessor yet, or the
// Between(vn.Predecessor.Id, vn.Id, maybe_pred.Id) test passes, the delegate's
// NewPredecessor callback is invoked through the ring and maybe_pred becomes
// the predecessor. It always returns this vnode's successor list (the slice
// itself, not a copy) and a nil error.
func (vn *LocalVnode) Notify(maybe_pred *Vnode) ([]*Vnode, error) {
	// Check if we should update our predecessor
	if vn.Predecessor == nil || Between(vn.Predecessor.Id, vn.Id, maybe_pred.Id) {
		// Inform the delegate
		conf := vn.Ring.Config
		old := vn.Predecessor
		vn.Ring.InvokeDelegate(func() {
			conf.Delegate.NewPredecessor(&vn.Vnode, maybe_pred, old)
		})

		vn.Predecessor = maybe_pred
	}

	// Return our Successors list
	return vn.Successors, nil
}

// FixFingerTable repairs the finger entry at index LastFinger by looking up
// the successor of the corresponding offset, reuses that node for every
// following entry whose offset passes the BetweenRightIncl test against it,
// and then advances LastFinger (wrapping to 0) to the next entry to repair.
//
// It returns the lookup error, if any; an empty lookup result without an error
// leaves the table untouched and returns nil.
func (vn *LocalVnode) FixFingerTable() error {
	// Determine the offset
	hb := vn.Ring.Config.HashBits
	offset := PowerOffset(vn.Id, vn.LastFinger, hb)

	// Find the successor
	nodes, err := vn.FindSuccessors(1, offset)
	if err != nil || len(nodes) == 0 {
		return err
	}
	node := nodes[0]

	// Update the finger table
	vn.Finger[vn.LastFinger] = node

	// Try to skip as many finger entries as possible: while the node also
	// covers the next offset, fill that entry without another lookup.
	for next := vn.LastFinger + 1; next < hb; next++ {
		if !BetweenRightIncl(vn.Id, node.Id, PowerOffset(vn.Id, next, hb)) {
			break
		}
		vn.Finger[next] = node
		vn.LastFinger = next
	}

	// Increment to the index to repair
	if vn.LastFinger+1 == hb {
		vn.LastFinger = 0
	} else {
		vn.LastFinger++
	}
	return nil
}

// CheckPredecessor pings the current predecessor, if there is one, and clears
// it when the ping reports it as not alive. A ping error is returned as-is and
// leaves the predecessor unchanged.
func (vn *LocalVnode) CheckPredecessor() error {
	if vn.Predecessor == nil {
		return nil
	}

	alive, err := vn.Ring.Transport.Ping(vn.Predecessor)
	if err != nil {
		return err
	}

	// Predecessor is dead
	if !alive {
		vn.Predecessor = nil
	}
	return nil
}

// FindSuccessors finds the next n successors of key. n must be <=
// NumSuccessors.
//
// If key passes the BetweenRightIncl test against our own ID and our first
// successor's ID, the first n local successor slots are returned. Otherwise
// the nodes produced by ClosestPreceedingVnodeIterator are asked in turn and
// the first successful answer is returned, with failures logged. As a last
// resort the non-immediate local successors are checked. An error is returned
// when none of these yields a result.
func (vn *LocalVnode) FindSuccessors(n int, key []byte) ([]*Vnode, error) {
	// Check if we are the immediate predecessor
	if BetweenRightIncl(vn.Id, vn.Successors[0].Id, key) {
		return vn.Successors[:n], nil
	}

	// Try the closest preceeding nodes
	cp := ClosestPreceedingVnodeIterator{}
	cp.Init(vn, key)
	for closest := cp.Next(); closest != nil; closest = cp.Next() {
		// Try that node, return on success
		res, err := vn.Ring.Transport.FindSuccessors(closest, n, key)
		if err == nil {
			return res, nil
		}
		log.Printf("[ERR] Failed to contact %s. Got %s", closest.String(), err)
	}

	// Determine how many Successors we know of
	known := vn.KnownSuccessors()

	// Check if the ID is between us and any non-immediate Successors
	for i := 1; i <= known-n; i++ {
		if BetweenRightIncl(vn.Id, vn.Successors[i].Id, key) {
			remain := vn.Successors[i:]
			if len(remain) > n {
				remain = remain[:n]
			}
			return remain, nil
		}
	}

	// Checked all closer nodes and our Successors!
	return nil, fmt.Errorf("Exhausted all preceeding nodes!")
}

// Leave announces that this vnode is leaving the ring. It invokes the
// delegate's Leaving callback through the ring, asks the predecessor (if any)
// to skip us as its successor, and asks the first successor to clear us as its
// predecessor. Errors from the two notifications are combined with
// MergeErrors.
func (vn *LocalVnode) Leave() error {
	// Inform the delegate we are leaving
	conf := vn.Ring.Config
	pred := vn.Predecessor
	succ := vn.Successors[0]
	vn.Ring.InvokeDelegate(func() {
		conf.Delegate.Leaving(&vn.Vnode, pred, succ)
	})

	// Notify predecessor to advance to their next successor
	var err error
	trans := vn.Ring.Transport
	if vn.Predecessor != nil {
		err = trans.SkipSuccessor(vn.Predecessor, &vn.Vnode)
	}

	// Notify successor to clear old predecessor
	return MergeErrors(err, trans.ClearPredecessor(vn.Successors[0], &vn.Vnode))
}

// ClearPredecessor is called when node p is leaving. If p's ID string matches
// our current predecessor's, the delegate's PredecessorLeaving callback is
// invoked through the ring and the predecessor is cleared. It always returns
// nil.
func (vn *LocalVnode) ClearPredecessor(p *Vnode) error {
	if vn.Predecessor != nil && vn.Predecessor.String() == p.String() {
		// Inform the delegate
		conf := vn.Ring.Config
		old := vn.Predecessor
		vn.Ring.InvokeDelegate(func() {
			conf.Delegate.PredecessorLeaving(&vn.Vnode, old)
		})

		vn.Predecessor = nil
	}
	return nil
}

// SkipSuccessor is called when node s is leaving. If s's ID string matches our
// first successor's, the delegate's SuccessorLeaving callback is invoked
// through the ring and the successor list is shifted left by one. It does
// nothing when the first successor slot is empty, and always returns nil.
func (vn *LocalVnode) SkipSuccessor(s *Vnode) error {
	// An empty first slot (for example after the only known successor was
	// already skipped) cannot match; calling String on it would panic.
	head := vn.Successors[0]
	if head == nil || head.String() != s.String() {
		return nil
	}

	// Inform the delegate
	conf := vn.Ring.Config
	vn.Ring.InvokeDelegate(func() {
		conf.Delegate.SuccessorLeaving(&vn.Vnode, head)
	})

	// Shift the list left and clear the slot that held the last known entry.
	known := vn.KnownSuccessors()
	copy(vn.Successors[0:], vn.Successors[1:])
	vn.Successors[known-1] = nil
	return nil
}

// KnownSuccessors returns one more than the index of the last non-nil entry in
// the successor list, or 0 when every entry is nil.
func (vn *LocalVnode) KnownSuccessors() int {
	for i := len(vn.Successors) - 1; i >= 0; i-- {
		if vn.Successors[i] != nil {
			return i + 1
		}
	}
	return 0
}