123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- package chord
- import (
- "encoding/binary"
- "fmt"
- "log"
- "time"
- )
- // Converts the ID to string
- func (vn *Vnode) String() string {
- return fmt.Sprintf("%x", vn.Id)
- }
- // Initializes a local vnode
- func (vn *LocalVnode) Init(idx int) {
- // Generate an ID
- vn.GenId(uint16(idx))
- // Set our host
- vn.Host = vn.Ring.Config.Hostname
- // Initialize all state
- vn.Successors = make([]*Vnode, vn.Ring.Config.NumSuccessors)
- vn.Finger = make([]*Vnode, vn.Ring.Config.HashBits)
- // Register with the RPC mechanism
- vn.Ring.Transport.Register(&vn.Vnode, vn)
- }
- // Schedules the Vnode to do regular maintenence
- func (vn *LocalVnode) Schedule() {
- // Setup our stabilize timer
- vn.Timer = time.AfterFunc(RandStabilize(vn.Ring.Config), vn.Stabilize)
- }
- // Generates an ID for the node
- func (vn *LocalVnode) GenId(idx uint16) {
- // Use the hash funciton
- conf := vn.Ring.Config
- hash := conf.HashFunc()
- hash.Write([]byte(conf.Hostname))
- binary.Write(hash, binary.BigEndian, idx)
- // Use the hash as the ID
- vn.Id = hash.Sum(nil)
- }
- // Called to periodically stabilize the vnode
- func (vn *LocalVnode) Stabilize() {
- // Clear the timer
- vn.Timer = nil
- // Check for shutdown
- if vn.Ring.ChanShutdown != nil {
- vn.Ring.ChanShutdown <- true
- return
- }
- // Setup the next stabilize timer
- defer vn.Schedule()
- // Check for new successor
- if err := vn.CheckNewSuccessor(); err != nil {
- log.Printf("[ERR] Error checking for new successor: %s", err)
- }
- // Notify the successor
- if err := vn.NotifySuccessor(); err != nil {
- log.Printf("[ERR] Error notifying successor: %s", err)
- }
- // Finger table fix up
- if err := vn.FixFingerTable(); err != nil {
- log.Printf("[ERR] Error fixing finger table: %s", err)
- }
- // Check the predecessor
- if err := vn.CheckPredecessor(); err != nil {
- log.Printf("[ERR] Error checking predecessor: %s", err)
- }
- // Set the last stabilized time
- vn.Stabilized = time.Now()
- }
- // Checks for a new successor
- func (vn *LocalVnode) CheckNewSuccessor() error {
- // Ask our successor for it's predecessor
- trans := vn.Ring.Transport
- CHECK_NEW_SUC:
- succ := vn.Successors[0]
- if succ == nil {
- panic("Node has no successor!")
- }
- maybe_suc, err := trans.GetPredecessor(succ)
- if err != nil {
- // Check if we have succ list, try to contact next live succ
- known := vn.KnownSuccessors()
- if known > 1 {
- for i := 0; i < known; i++ {
- if alive, _ := trans.Ping(vn.Successors[0]); !alive {
- // Don't eliminate the last successor we know of
- if i+1 == known {
- return fmt.Errorf("All known Successors dead!")
- }
- // Advance the Successors list past the dead one
- copy(vn.Successors[0:], vn.Successors[1:])
- vn.Successors[known-1-i] = nil
- } else {
- // Found live successor, check for new one
- goto CHECK_NEW_SUC
- }
- }
- }
- return err
- }
- // Check if we should replace our successor
- if maybe_suc != nil && Between(vn.Id, succ.Id, maybe_suc.Id) {
- // Check if new successor is alive before switching
- alive, err := trans.Ping(maybe_suc)
- if alive && err == nil {
- copy(vn.Successors[1:], vn.Successors[0:len(vn.Successors)-1])
- vn.Successors[0] = maybe_suc
- } else {
- return err
- }
- }
- return nil
- }
- // RPC: Invoked to return out predecessor
- func (vn *LocalVnode) GetPredecessor() (*Vnode, error) {
- return vn.Predecessor, nil
- }
- // Notifies our successor of us, updates successor list
- func (vn *LocalVnode) NotifySuccessor() error {
- // Notify successor
- succ := vn.Successors[0]
- succ_list, err := vn.Ring.Transport.Notify(succ, &vn.Vnode)
- if err != nil {
- return err
- }
- // Trim the Successors list if too long
- max_succ := vn.Ring.Config.NumSuccessors
- if len(succ_list) > max_succ-1 {
- succ_list = succ_list[:max_succ-1]
- }
- // Update local Successors list
- for idx, s := range succ_list {
- if s == nil {
- break
- }
- // Ensure we don't set ourselves as a successor!
- if s == nil || s.String() == vn.String() {
- break
- }
- vn.Successors[idx+1] = s
- }
- return nil
- }
- // RPC: Notify is invoked when a Vnode gets notified
- func (vn *LocalVnode) Notify(maybe_pred *Vnode) ([]*Vnode, error) {
- // Check if we should update our predecessor
- if vn.Predecessor == nil || Between(vn.Predecessor.Id, vn.Id, maybe_pred.Id) {
- // Inform the delegate
- conf := vn.Ring.Config
- old := vn.Predecessor
- vn.Ring.InvokeDelegate(func() {
- conf.Delegate.NewPredecessor(&vn.Vnode, maybe_pred, old)
- })
- vn.Predecessor = maybe_pred
- }
- // Return our Successors list
- return vn.Successors, nil
- }
- // Fixes up the finger table
- func (vn *LocalVnode) FixFingerTable() error {
- // Determine the offset
- hb := vn.Ring.Config.HashBits
- offset := PowerOffset(vn.Id, vn.LastFinger, hb)
- // Find the successor
- nodes, err := vn.FindSuccessors(1, offset)
- if nodes == nil || len(nodes) == 0 || err != nil {
- return err
- }
- node := nodes[0]
- // Update the finger table
- vn.Finger[vn.LastFinger] = node
- // Try to skip as many finger entries as possible
- for {
- next := vn.LastFinger + 1
- if next >= hb {
- break
- }
- offset := PowerOffset(vn.Id, next, hb)
- // While the node is the successor, update the finger entries
- if BetweenRightIncl(vn.Id, node.Id, offset) {
- vn.Finger[next] = node
- vn.LastFinger = next
- } else {
- break
- }
- }
- // Increment to the index to repair
- if vn.LastFinger+1 == hb {
- vn.LastFinger = 0
- } else {
- vn.LastFinger++
- }
- return nil
- }
- // Checks the health of our predecessor
- func (vn *LocalVnode) CheckPredecessor() error {
- // Check predecessor
- if vn.Predecessor != nil {
- res, err := vn.Ring.Transport.Ping(vn.Predecessor)
- if err != nil {
- return err
- }
- // Predecessor is dead
- if !res {
- vn.Predecessor = nil
- }
- }
- return nil
- }
- // Finds next N Successors. N must be <= NumSuccessors
- func (vn *LocalVnode) FindSuccessors(n int, key []byte) ([]*Vnode, error) {
- // Check if we are the immediate predecessor
- if BetweenRightIncl(vn.Id, vn.Successors[0].Id, key) {
- return vn.Successors[:n], nil
- }
- // Try the closest preceeding nodes
- cp := ClosestPreceedingVnodeIterator{}
- cp.Init(vn, key)
- for {
- // Get the next closest node
- closest := cp.Next()
- if closest == nil {
- break
- }
- // Try that node, break on success
- res, err := vn.Ring.Transport.FindSuccessors(closest, n, key)
- if err == nil {
- return res, nil
- } else {
- log.Printf("[ERR] Failed to contact %s. Got %s", closest.String(), err)
- }
- }
- // Determine how many Successors we know of
- Successors := vn.KnownSuccessors()
- // Check if the ID is between us and any non-immediate Successors
- for i := 1; i <= Successors-n; i++ {
- if BetweenRightIncl(vn.Id, vn.Successors[i].Id, key) {
- remain := vn.Successors[i:]
- if len(remain) > n {
- remain = remain[:n]
- }
- return remain, nil
- }
- }
- // Checked all closer nodes and our Successors!
- return nil, fmt.Errorf("Exhausted all preceeding nodes!")
- }
- // Instructs the vnode to leave
- func (vn *LocalVnode) Leave() error {
- // Inform the delegate we are leaving
- conf := vn.Ring.Config
- pred := vn.Predecessor
- succ := vn.Successors[0]
- vn.Ring.InvokeDelegate(func() {
- conf.Delegate.Leaving(&vn.Vnode, pred, succ)
- })
- // Notify predecessor to advance to their next successor
- var err error
- trans := vn.Ring.Transport
- if vn.Predecessor != nil {
- err = trans.SkipSuccessor(vn.Predecessor, &vn.Vnode)
- }
- // Notify successor to clear old predecessor
- err = MergeErrors(err, trans.ClearPredecessor(vn.Successors[0], &vn.Vnode))
- return err
- }
- // Used to clear our predecessor when a node is leaving
- func (vn *LocalVnode) ClearPredecessor(p *Vnode) error {
- if vn.Predecessor != nil && vn.Predecessor.String() == p.String() {
- // Inform the delegate
- conf := vn.Ring.Config
- old := vn.Predecessor
- vn.Ring.InvokeDelegate(func() {
- conf.Delegate.PredecessorLeaving(&vn.Vnode, old)
- })
- vn.Predecessor = nil
- }
- return nil
- }
- // Used to skip a successor when a node is leaving
- func (vn *LocalVnode) SkipSuccessor(s *Vnode) error {
- // Skip if we have a match
- if vn.Successors[0].String() == s.String() {
- // Inform the delegate
- conf := vn.Ring.Config
- old := vn.Successors[0]
- vn.Ring.InvokeDelegate(func() {
- conf.Delegate.SuccessorLeaving(&vn.Vnode, old)
- })
- known := vn.KnownSuccessors()
- copy(vn.Successors[0:], vn.Successors[1:])
- vn.Successors[known-1] = nil
- }
- return nil
- }
- // Determine how many Successors we know of
- func (vn *LocalVnode) KnownSuccessors() (Successors int) {
- for i := 0; i < len(vn.Successors); i++ {
- if vn.Successors[i] != nil {
- Successors = i + 1
- }
- }
- return
- }
|