|
@@ -0,0 +1,678 @@
|
|
|
+package chord
|
|
|
+
|
|
|
+import (
|
|
|
+ "crypto/sha1"
|
|
|
+ "fmt"
|
|
|
+ "hash"
|
|
|
+ "math/big"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+ "trial/achord/models"
|
|
|
+
|
|
|
+ "golang.org/x/net/context"
|
|
|
+ "google.golang.org/grpc"
|
|
|
+)
|
|
|
+
|
|
|
+func DefaultConfig() *Config {
|
|
|
+ n := &Config{
|
|
|
+ Hash: sha1.New,
|
|
|
+ DialOpts: make([]grpc.DialOption, 0, 5),
|
|
|
+ }
|
|
|
+ // n.HashSize = n.Hash().Size()
|
|
|
+ n.HashSize = n.Hash().Size() * 8
|
|
|
+
|
|
|
+ n.DialOpts = append(n.DialOpts,
|
|
|
+ grpc.WithBlock(),
|
|
|
+ grpc.WithTimeout(5*time.Second),
|
|
|
+ grpc.FailOnNonTempDialError(true),
|
|
|
+ grpc.WithInsecure(),
|
|
|
+ )
|
|
|
+ return n
|
|
|
+}
|
|
|
+
|
|
|
+type Config struct {
|
|
|
+ Id string
|
|
|
+ Addr string
|
|
|
+
|
|
|
+ ServerOpts []grpc.ServerOption
|
|
|
+ DialOpts []grpc.DialOption
|
|
|
+
|
|
|
+ Hash func() hash.Hash // Hash function to use
|
|
|
+ HashSize int
|
|
|
+
|
|
|
+ StabilizeMin time.Duration // Minimum stabilization time
|
|
|
+ StabilizeMax time.Duration // Maximum stabilization time
|
|
|
+
|
|
|
+ Timeout time.Duration
|
|
|
+ MaxIdle time.Duration
|
|
|
+}
|
|
|
+
|
|
|
+func (c *Config) Validate() error {
|
|
|
+ // hashsize shouldnt be less than hash func size
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func NewInode(id string, addr string) *models.Node {
|
|
|
+ h := sha1.New()
|
|
|
+ if _, err := h.Write([]byte(id)); err != nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ val := h.Sum(nil)
|
|
|
+
|
|
|
+ return &models.Node{
|
|
|
+ Id: val,
|
|
|
+ Addr: addr,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+NewNode creates a new Chord node. Returns error if node already
|
|
|
+exists in the chord ring
|
|
|
+*/
|
|
|
+func NewNode(cnf *Config, joinNode *models.Node) (*Node, error) {
|
|
|
+ if err := cnf.Validate(); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ node := &Node{
|
|
|
+ Node: new(models.Node),
|
|
|
+ shutdownCh: make(chan struct{}),
|
|
|
+ cnf: cnf,
|
|
|
+ storage: NewMapStore(cnf.Hash),
|
|
|
+ }
|
|
|
+
|
|
|
+ var nID string
|
|
|
+ if cnf.Id != "" {
|
|
|
+ nID = cnf.Id
|
|
|
+ } else {
|
|
|
+ nID = cnf.Addr
|
|
|
+ }
|
|
|
+ id, err := node.hashKey(nID)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ aInt := (&big.Int{}).SetBytes(id)
|
|
|
+
|
|
|
+ fmt.Printf("new node id %d, \n", aInt)
|
|
|
+
|
|
|
+ node.Node.Id = id
|
|
|
+ node.Node.Addr = cnf.Addr
|
|
|
+
|
|
|
+ // Populate finger table
|
|
|
+ node.fingerTable = newFingerTable(node.Node, cnf.HashSize)
|
|
|
+
|
|
|
+ // Start RPC server
|
|
|
+ transport, err := NewGrpcTransport(cnf)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ node.transport = transport
|
|
|
+
|
|
|
+ models.RegisterChordServer(transport.server, node)
|
|
|
+
|
|
|
+ node.transport.Start()
|
|
|
+
|
|
|
+ if err := node.join(joinNode); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ // Peridoically stabilize the node.
|
|
|
+ go func() {
|
|
|
+ ticker := time.NewTicker(1 * time.Second)
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-ticker.C:
|
|
|
+ node.stabilize()
|
|
|
+ case <-node.shutdownCh:
|
|
|
+ ticker.Stop()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ // Peridoically fix finger tables.
|
|
|
+ go func() {
|
|
|
+ next := 0
|
|
|
+ ticker := time.NewTicker(100 * time.Millisecond)
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-ticker.C:
|
|
|
+ next = node.fixFinger(next)
|
|
|
+ case <-node.shutdownCh:
|
|
|
+ ticker.Stop()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ // Peridoically checkes whether predecessor has failed.
|
|
|
+
|
|
|
+ go func() {
|
|
|
+ ticker := time.NewTicker(10 * time.Second)
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-ticker.C:
|
|
|
+ node.checkPredecessor()
|
|
|
+ case <-node.shutdownCh:
|
|
|
+ ticker.Stop()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ return node, nil
|
|
|
+}
|
|
|
+
|
|
|
+type Node struct {
|
|
|
+ *models.Node
|
|
|
+
|
|
|
+ cnf *Config
|
|
|
+
|
|
|
+ predecessor *models.Node
|
|
|
+ predMtx sync.RWMutex
|
|
|
+
|
|
|
+ successor *models.Node
|
|
|
+ succMtx sync.RWMutex
|
|
|
+
|
|
|
+ shutdownCh chan struct{}
|
|
|
+
|
|
|
+ fingerTable fingerTable
|
|
|
+ ftMtx sync.RWMutex
|
|
|
+
|
|
|
+ storage Storage
|
|
|
+ stMtx sync.RWMutex
|
|
|
+
|
|
|
+ transport Transport
|
|
|
+ tsMtx sync.RWMutex
|
|
|
+
|
|
|
+ lastStablized time.Time
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) hashKey(key string) ([]byte, error) {
|
|
|
+ h := n.cnf.Hash()
|
|
|
+ if _, err := h.Write([]byte(key)); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ val := h.Sum(nil)
|
|
|
+ return val, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) join(joinNode *models.Node) error {
|
|
|
+ // First check if node already present in the circle
|
|
|
+ // Join this node to the same chord ring as parent
|
|
|
+ var foo *models.Node
|
|
|
+ // // Ask if our id already exists on the ring.
|
|
|
+ if joinNode != nil {
|
|
|
+ remoteNode, err := n.findSuccessorRPC(joinNode, n.Id)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ if isEqual(remoteNode.Id, n.Id) {
|
|
|
+ return ERR_NODE_EXISTS
|
|
|
+ }
|
|
|
+ foo = joinNode
|
|
|
+ } else {
|
|
|
+ foo = n.Node
|
|
|
+ }
|
|
|
+
|
|
|
+ succ, err := n.findSuccessorRPC(foo, n.Id)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ n.succMtx.Lock()
|
|
|
+ n.successor = succ
|
|
|
+ n.succMtx.Unlock()
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+ Public storage implementation
|
|
|
+*/
|
|
|
+
|
|
|
+func (n *Node) Find(key string) (*models.Node, error) {
|
|
|
+ return n.locate(key)
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) Get(key string) ([]byte, error) {
|
|
|
+ return n.get(key)
|
|
|
+}
|
|
|
+func (n *Node) Set(key, value string) error {
|
|
|
+ return n.set(key, value)
|
|
|
+}
|
|
|
+func (n *Node) Delete(key string) error {
|
|
|
+ return n.delete(key)
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+Finds the node for the key
|
|
|
+*/
|
|
|
+func (n *Node) locate(key string) (*models.Node, error) {
|
|
|
+ id, err := n.hashKey(key)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ succ, err := n.findSuccessor(id)
|
|
|
+ return succ, err
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) get(key string) ([]byte, error) {
|
|
|
+ node, err := n.locate(key)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ val, err := n.getKeyRPC(node, key)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ return val.Value, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) set(key, value string) error {
|
|
|
+ node, err := n.locate(key)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ err = n.setKeyRPC(node, key, value)
|
|
|
+ return err
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) delete(key string) error {
|
|
|
+ node, err := n.locate(key)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ err = n.deleteKeyRPC(node, key)
|
|
|
+ return err
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) transferKeys(pred, succ *models.Node) {
|
|
|
+
|
|
|
+ keys, err := n.requestKeys(pred, succ)
|
|
|
+ if len(keys) > 0 {
|
|
|
+ fmt.Println("transfering: ", keys, err)
|
|
|
+ }
|
|
|
+ delKeyList := make([]string, 0, 10)
|
|
|
+ // store the keys in current node
|
|
|
+ for _, item := range keys {
|
|
|
+ if item == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ n.storage.Set(item.Key, item.Value)
|
|
|
+ delKeyList = append(delKeyList, item.Key)
|
|
|
+ }
|
|
|
+ // delete the keys from the successor node, as current node
|
|
|
+ // is responsible for the keys
|
|
|
+ if len(delKeyList) > 0 {
|
|
|
+ n.deleteKeys(succ, delKeyList)
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) moveKeysFromLocal(pred, succ *models.Node) {
|
|
|
+
|
|
|
+ keys, err := n.storage.Between(pred.Id, succ.Id)
|
|
|
+ if len(keys) > 0 {
|
|
|
+ fmt.Println("transfering: ", keys, succ, err)
|
|
|
+ }
|
|
|
+ delKeyList := make([]string, 0, 10)
|
|
|
+ // store the keys in current node
|
|
|
+ for _, item := range keys {
|
|
|
+ if item == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ err := n.setKeyRPC(succ, item.Key, item.Value)
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("error transfering key: ", item.Key, succ.Addr)
|
|
|
+ }
|
|
|
+ delKeyList = append(delKeyList, item.Key)
|
|
|
+ }
|
|
|
+ // delete the keys from the successor node, as current node
|
|
|
+ // is responsible for the keys
|
|
|
+ if len(delKeyList) > 0 {
|
|
|
+ n.deleteKeys(succ, delKeyList)
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) deleteKeys(node *models.Node, keys []string) error {
|
|
|
+ return n.deleteKeysRPC(node, keys)
|
|
|
+}
|
|
|
+
|
|
|
+// When a new node joins, it requests keys from it's successor
|
|
|
+func (n *Node) requestKeys(pred, succ *models.Node) ([]*models.KV, error) {
|
|
|
+
|
|
|
+ if isEqual(n.Id, succ.Id) {
|
|
|
+ return nil, nil
|
|
|
+ }
|
|
|
+ return n.requestKeysRPC(
|
|
|
+ succ, pred.Id, n.Id,
|
|
|
+ )
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+Fig 5 implementation for find_succesor
|
|
|
+First check if key present in local table, if not
|
|
|
+then look for how to travel in the ring
|
|
|
+*/
|
|
|
+func (n *Node) findSuccessor(id []byte) (*models.Node, error) {
|
|
|
+ // Check if lock is needed throughout the process
|
|
|
+ n.succMtx.RLock()
|
|
|
+ defer n.succMtx.RUnlock()
|
|
|
+ curr := n.Node
|
|
|
+ succ := n.successor
|
|
|
+
|
|
|
+ if succ == nil {
|
|
|
+ return curr, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ var err error
|
|
|
+
|
|
|
+ if betweenRightIncl(id, curr.Id, succ.Id) {
|
|
|
+ return succ, nil
|
|
|
+ } else {
|
|
|
+ pred := n.closestPrecedingNode(id)
|
|
|
+ /*
|
|
|
+ NOT SURE ABOUT THIS, RECHECK from paper!!!
|
|
|
+ if preceeding node and current node are the same,
|
|
|
+ store the key on this node
|
|
|
+ */
|
|
|
+
|
|
|
+ if isEqual(pred.Id, n.Id) {
|
|
|
+ succ, err = n.getSuccessorRPC(pred)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ if succ == nil {
|
|
|
+ // not able to wrap around, current node is the successor
|
|
|
+ return pred, nil
|
|
|
+ }
|
|
|
+ return succ, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ succ, err := n.findSuccessorRPC(pred, id)
|
|
|
+ // fmt.Println("successor to closest node ", succ, err)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ if succ == nil {
|
|
|
+ // not able to wrap around, current node is the successor
|
|
|
+ return curr, nil
|
|
|
+ }
|
|
|
+ return succ, nil
|
|
|
+
|
|
|
+ }
|
|
|
+ return nil, nil
|
|
|
+}
|
|
|
+
|
|
|
+// Fig 5 implementation for closest_preceding_node
|
|
|
+func (n *Node) closestPrecedingNode(id []byte) *models.Node {
|
|
|
+ n.predMtx.RLock()
|
|
|
+ defer n.predMtx.RUnlock()
|
|
|
+
|
|
|
+ curr := n.Node
|
|
|
+
|
|
|
+ m := len(n.fingerTable) - 1
|
|
|
+ for i := m; i >= 0; i-- {
|
|
|
+ f := n.fingerTable[i]
|
|
|
+ if f == nil || f.Node == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if between(f.Id, curr.Id, id) {
|
|
|
+ return f.Node
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return curr
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+ Periodic functions implementation
|
|
|
+*/
|
|
|
+
|
|
|
+func (n *Node) stabilize() {
|
|
|
+
|
|
|
+ n.succMtx.RLock()
|
|
|
+ succ := n.successor
|
|
|
+ if succ == nil {
|
|
|
+ n.succMtx.RUnlock()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ n.succMtx.RUnlock()
|
|
|
+
|
|
|
+ x, err := n.getPredecessorRPC(succ)
|
|
|
+ if err != nil || x == nil {
|
|
|
+ fmt.Println("error getting predecessor, ", err, x)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if x.Id != nil && between(x.Id, n.Id, succ.Id) {
|
|
|
+ n.succMtx.Lock()
|
|
|
+ n.successor = x
|
|
|
+ n.succMtx.Unlock()
|
|
|
+ }
|
|
|
+ n.notifyRPC(succ, n.Node)
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) checkPredecessor() {
|
|
|
+ // implement using rpc func
|
|
|
+ n.predMtx.RLock()
|
|
|
+ pred := n.predecessor
|
|
|
+ n.predMtx.RUnlock()
|
|
|
+
|
|
|
+ if pred != nil {
|
|
|
+ err := n.transport.CheckPredecessor(pred)
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("predecessor failed!", err)
|
|
|
+ n.predMtx.Lock()
|
|
|
+ n.predecessor = nil
|
|
|
+ n.predMtx.Unlock()
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+ RPC callers implementation
|
|
|
+*/
|
|
|
+
|
|
|
+// getSuccessorRPC the successor ID of a remote node.
|
|
|
+func (n *Node) getSuccessorRPC(node *models.Node) (*models.Node, error) {
|
|
|
+ return n.transport.GetSuccessor(node)
|
|
|
+}
|
|
|
+
|
|
|
+// setSuccessorRPC sets the successor of a given node.
|
|
|
+func (n *Node) setSuccessorRPC(node *models.Node, succ *models.Node) error {
|
|
|
+ return n.transport.SetSuccessor(node, succ)
|
|
|
+}
|
|
|
+
|
|
|
+// findSuccessorRPC finds the successor node of a given ID in the entire ring.
|
|
|
+func (n *Node) findSuccessorRPC(node *models.Node, id []byte) (*models.Node, error) {
|
|
|
+ return n.transport.FindSuccessor(node, id)
|
|
|
+}
|
|
|
+
|
|
|
+// getSuccessorRPC the successor ID of a remote node.
|
|
|
+func (n *Node) getPredecessorRPC(node *models.Node) (*models.Node, error) {
|
|
|
+ return n.transport.GetPredecessor(node)
|
|
|
+}
|
|
|
+
|
|
|
+// setPredecessorRPC sets the predecessor of a given node.
|
|
|
+func (n *Node) setPredecessorRPC(node *models.Node, pred *models.Node) error {
|
|
|
+ return n.transport.SetPredecessor(node, pred)
|
|
|
+}
|
|
|
+
|
|
|
+// notifyRPC notifies a remote node that pred is its predecessor.
|
|
|
+func (n *Node) notifyRPC(node, pred *models.Node) error {
|
|
|
+ return n.transport.Notify(node, pred)
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) getKeyRPC(node *models.Node, key string) (*models.GetResponse, error) {
|
|
|
+ return n.transport.GetKey(node, key)
|
|
|
+}
|
|
|
+func (n *Node) setKeyRPC(node *models.Node, key, value string) error {
|
|
|
+ return n.transport.SetKey(node, key, value)
|
|
|
+}
|
|
|
+func (n *Node) deleteKeyRPC(node *models.Node, key string) error {
|
|
|
+ return n.transport.DeleteKey(node, key)
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) requestKeysRPC(
|
|
|
+ node *models.Node, from []byte, to []byte,
|
|
|
+) ([]*models.KV, error) {
|
|
|
+ return n.transport.RequestKeys(node, from, to)
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) deleteKeysRPC(
|
|
|
+ node *models.Node, keys []string,
|
|
|
+) error {
|
|
|
+ return n.transport.DeleteKeys(node, keys)
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+ RPC interface implementation
|
|
|
+*/
|
|
|
+
|
|
|
+// GetSuccessor gets the successor on the node..
|
|
|
+func (n *Node) GetSuccessor(ctx context.Context, r *models.ER) (*models.Node, error) {
|
|
|
+ n.succMtx.RLock()
|
|
|
+ succ := n.successor
|
|
|
+ n.succMtx.RUnlock()
|
|
|
+ if succ == nil {
|
|
|
+ return emptyNode, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ return succ, nil
|
|
|
+}
|
|
|
+
|
|
|
+// SetSuccessor sets the successor on the node..
|
|
|
+func (n *Node) SetSuccessor(ctx context.Context, succ *models.Node) (*models.ER, error) {
|
|
|
+ n.succMtx.Lock()
|
|
|
+ n.successor = succ
|
|
|
+ n.succMtx.Unlock()
|
|
|
+ return emptyRequest, nil
|
|
|
+}
|
|
|
+
|
|
|
+// SetPredecessor sets the predecessor on the node..
|
|
|
+func (n *Node) SetPredecessor(ctx context.Context, pred *models.Node) (*models.ER, error) {
|
|
|
+ n.predMtx.Lock()
|
|
|
+ n.predecessor = pred
|
|
|
+ n.predMtx.Unlock()
|
|
|
+ return emptyRequest, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) FindSuccessor(ctx context.Context, id *models.ID) (*models.Node, error) {
|
|
|
+ succ, err := n.findSuccessor(id.Id)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ if succ == nil {
|
|
|
+ return nil, ERR_NO_SUCCESSOR
|
|
|
+ }
|
|
|
+
|
|
|
+ return succ, nil
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) CheckPredecessor(ctx context.Context, id *models.ID) (*models.ER, error) {
|
|
|
+ return emptyRequest, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) GetPredecessor(ctx context.Context, r *models.ER) (*models.Node, error) {
|
|
|
+ n.predMtx.RLock()
|
|
|
+ pred := n.predecessor
|
|
|
+ n.predMtx.RUnlock()
|
|
|
+ if pred == nil {
|
|
|
+ return emptyNode, nil
|
|
|
+ }
|
|
|
+ return pred, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) Notify(ctx context.Context, node *models.Node) (*models.ER, error) {
|
|
|
+ n.predMtx.Lock()
|
|
|
+ defer n.predMtx.Unlock()
|
|
|
+ var prevPredNode *models.Node
|
|
|
+
|
|
|
+ pred := n.predecessor
|
|
|
+ if pred == nil || between(node.Id, pred.Id, n.Id) {
|
|
|
+ // fmt.Println("setting predecessor", n.Id, node.Id)
|
|
|
+ if n.predecessor != nil {
|
|
|
+ prevPredNode = n.predecessor
|
|
|
+ }
|
|
|
+ n.predecessor = node
|
|
|
+
|
|
|
+ // transfer keys from parent node
|
|
|
+ if prevPredNode != nil {
|
|
|
+ if between(n.predecessor.Id, prevPredNode.Id, n.Id) {
|
|
|
+ n.transferKeys(prevPredNode, n.predecessor)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ return emptyRequest, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) XGet(ctx context.Context, req *models.GetRequest) (*models.GetResponse, error) {
|
|
|
+ n.stMtx.RLock()
|
|
|
+ defer n.stMtx.RUnlock()
|
|
|
+ val, err := n.storage.Get(req.Key)
|
|
|
+ if err != nil {
|
|
|
+ return emptyGetResponse, err
|
|
|
+ }
|
|
|
+ return &models.GetResponse{Value: val}, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) XSet(ctx context.Context, req *models.SetRequest) (*models.SetResponse, error) {
|
|
|
+ n.stMtx.Lock()
|
|
|
+ defer n.stMtx.Unlock()
|
|
|
+ fmt.Println("setting key on ", n.Node.Addr, req.Key, req.Value)
|
|
|
+ err := n.storage.Set(req.Key, req.Value)
|
|
|
+ return emptySetResponse, err
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) XDelete(ctx context.Context, req *models.DeleteRequest) (*models.DeleteResponse, error) {
|
|
|
+ n.stMtx.Lock()
|
|
|
+ defer n.stMtx.Unlock()
|
|
|
+ err := n.storage.Delete(req.Key)
|
|
|
+ return emptyDeleteResponse, err
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) XRequestKeys(ctx context.Context, req *models.RequestKeysRequest) (*models.RequestKeysResponse, error) {
|
|
|
+ n.stMtx.RLock()
|
|
|
+ defer n.stMtx.RUnlock()
|
|
|
+ val, err := n.storage.Between(req.From, req.To)
|
|
|
+ if err != nil {
|
|
|
+ return emptyRequestKeysResponse, err
|
|
|
+ }
|
|
|
+ return &models.RequestKeysResponse{Values: val}, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) XMultiDelete(ctx context.Context, req *models.MultiDeleteRequest) (*models.DeleteResponse, error) {
|
|
|
+ n.stMtx.Lock()
|
|
|
+ defer n.stMtx.Unlock()
|
|
|
+ err := n.storage.MDelete(req.Keys...)
|
|
|
+ return emptyDeleteResponse, err
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) Stop() {
|
|
|
+ close(n.shutdownCh)
|
|
|
+
|
|
|
+ // Notify successor to change its predecessor pointer to our predecessor.
|
|
|
+ // Do nothing if we are our own successor (i.e. we are the only node in the
|
|
|
+ // ring).
|
|
|
+ n.succMtx.RLock()
|
|
|
+ succ := n.successor
|
|
|
+ n.succMtx.RUnlock()
|
|
|
+
|
|
|
+ n.predMtx.RLock()
|
|
|
+ pred := n.predecessor
|
|
|
+ n.predMtx.RUnlock()
|
|
|
+
|
|
|
+ if n.Node.Addr != succ.Addr && pred != nil {
|
|
|
+ n.moveKeysFromLocal(pred, succ)
|
|
|
+ predErr := n.setPredecessorRPC(succ, pred)
|
|
|
+ succErr := n.setSuccessorRPC(pred, succ)
|
|
|
+ fmt.Println("stop errors: ", predErr, succErr)
|
|
|
+ }
|
|
|
+
|
|
|
+ n.transport.Stop()
|
|
|
+}
|