|
@@ -1,678 +0,0 @@
|
|
-package chord
|
|
|
|
-
|
|
|
|
-import (
|
|
|
|
- "crypto/sha1"
|
|
|
|
- "fmt"
|
|
|
|
- "hash"
|
|
|
|
- "math/big"
|
|
|
|
- "sync"
|
|
|
|
- "time"
|
|
|
|
- "trial/achord/models"
|
|
|
|
-
|
|
|
|
- "golang.org/x/net/context"
|
|
|
|
- "google.golang.org/grpc"
|
|
|
|
-)
|
|
|
|
-
|
|
|
|
-func DefaultConfig() *Config {
|
|
|
|
- n := &Config{
|
|
|
|
- Hash: sha1.New,
|
|
|
|
- DialOpts: make([]grpc.DialOption, 0, 5),
|
|
|
|
- }
|
|
|
|
- // n.HashSize = n.Hash().Size()
|
|
|
|
- n.HashSize = n.Hash().Size() * 8
|
|
|
|
-
|
|
|
|
- n.DialOpts = append(n.DialOpts,
|
|
|
|
- grpc.WithBlock(),
|
|
|
|
- grpc.WithTimeout(5*time.Second),
|
|
|
|
- grpc.FailOnNonTempDialError(true),
|
|
|
|
- grpc.WithInsecure(),
|
|
|
|
- )
|
|
|
|
- return n
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-type Config struct {
|
|
|
|
- Id string
|
|
|
|
- Addr string
|
|
|
|
-
|
|
|
|
- ServerOpts []grpc.ServerOption
|
|
|
|
- DialOpts []grpc.DialOption
|
|
|
|
-
|
|
|
|
- Hash func() hash.Hash // Hash function to use
|
|
|
|
- HashSize int
|
|
|
|
-
|
|
|
|
- StabilizeMin time.Duration // Minimum stabilization time
|
|
|
|
- StabilizeMax time.Duration // Maximum stabilization time
|
|
|
|
-
|
|
|
|
- Timeout time.Duration
|
|
|
|
- MaxIdle time.Duration
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (c *Config) Validate() error {
|
|
|
|
- // hashsize shouldnt be less than hash func size
|
|
|
|
- return nil
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func NewInode(id string, addr string) *models.Node {
|
|
|
|
- h := sha1.New()
|
|
|
|
- if _, err := h.Write([]byte(id)); err != nil {
|
|
|
|
- return nil
|
|
|
|
- }
|
|
|
|
- val := h.Sum(nil)
|
|
|
|
-
|
|
|
|
- return &models.Node{
|
|
|
|
- Id: val,
|
|
|
|
- Addr: addr,
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-/*
|
|
|
|
-NewNode creates a new Chord node. Returns error if node already
|
|
|
|
-exists in the chord ring
|
|
|
|
-*/
|
|
|
|
-func NewNode(cnf *Config, joinNode *models.Node) (*Node, error) {
|
|
|
|
- if err := cnf.Validate(); err != nil {
|
|
|
|
- return nil, err
|
|
|
|
- }
|
|
|
|
- node := &Node{
|
|
|
|
- Node: new(models.Node),
|
|
|
|
- shutdownCh: make(chan struct{}),
|
|
|
|
- cnf: cnf,
|
|
|
|
- storage: NewMapStore(cnf.Hash),
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- var nID string
|
|
|
|
- if cnf.Id != "" {
|
|
|
|
- nID = cnf.Id
|
|
|
|
- } else {
|
|
|
|
- nID = cnf.Addr
|
|
|
|
- }
|
|
|
|
- id, err := node.hashKey(nID)
|
|
|
|
- if err != nil {
|
|
|
|
- return nil, err
|
|
|
|
- }
|
|
|
|
- aInt := (&big.Int{}).SetBytes(id)
|
|
|
|
-
|
|
|
|
- fmt.Printf("new node id %d, \n", aInt)
|
|
|
|
-
|
|
|
|
- node.Node.Id = id
|
|
|
|
- node.Node.Addr = cnf.Addr
|
|
|
|
-
|
|
|
|
- // Populate finger table
|
|
|
|
- node.fingerTable = newFingerTable(node.Node, cnf.HashSize)
|
|
|
|
-
|
|
|
|
- // Start RPC server
|
|
|
|
- transport, err := NewGrpcTransport(cnf)
|
|
|
|
- if err != nil {
|
|
|
|
- return nil, err
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- node.transport = transport
|
|
|
|
-
|
|
|
|
- models.RegisterChordServer(transport.server, node)
|
|
|
|
-
|
|
|
|
- node.transport.Start()
|
|
|
|
-
|
|
|
|
- if err := node.join(joinNode); err != nil {
|
|
|
|
- return nil, err
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Peridoically stabilize the node.
|
|
|
|
- go func() {
|
|
|
|
- ticker := time.NewTicker(1 * time.Second)
|
|
|
|
- for {
|
|
|
|
- select {
|
|
|
|
- case <-ticker.C:
|
|
|
|
- node.stabilize()
|
|
|
|
- case <-node.shutdownCh:
|
|
|
|
- ticker.Stop()
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }()
|
|
|
|
-
|
|
|
|
- // Peridoically fix finger tables.
|
|
|
|
- go func() {
|
|
|
|
- next := 0
|
|
|
|
- ticker := time.NewTicker(100 * time.Millisecond)
|
|
|
|
- for {
|
|
|
|
- select {
|
|
|
|
- case <-ticker.C:
|
|
|
|
- next = node.fixFinger(next)
|
|
|
|
- case <-node.shutdownCh:
|
|
|
|
- ticker.Stop()
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }()
|
|
|
|
-
|
|
|
|
- // Peridoically checkes whether predecessor has failed.
|
|
|
|
-
|
|
|
|
- go func() {
|
|
|
|
- ticker := time.NewTicker(10 * time.Second)
|
|
|
|
- for {
|
|
|
|
- select {
|
|
|
|
- case <-ticker.C:
|
|
|
|
- node.checkPredecessor()
|
|
|
|
- case <-node.shutdownCh:
|
|
|
|
- ticker.Stop()
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }()
|
|
|
|
-
|
|
|
|
- return node, nil
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-type Node struct {
|
|
|
|
- *models.Node
|
|
|
|
-
|
|
|
|
- cnf *Config
|
|
|
|
-
|
|
|
|
- predecessor *models.Node
|
|
|
|
- predMtx sync.RWMutex
|
|
|
|
-
|
|
|
|
- successor *models.Node
|
|
|
|
- succMtx sync.RWMutex
|
|
|
|
-
|
|
|
|
- shutdownCh chan struct{}
|
|
|
|
-
|
|
|
|
- fingerTable fingerTable
|
|
|
|
- ftMtx sync.RWMutex
|
|
|
|
-
|
|
|
|
- storage Storage
|
|
|
|
- stMtx sync.RWMutex
|
|
|
|
-
|
|
|
|
- transport Transport
|
|
|
|
- tsMtx sync.RWMutex
|
|
|
|
-
|
|
|
|
- lastStablized time.Time
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) hashKey(key string) ([]byte, error) {
|
|
|
|
- h := n.cnf.Hash()
|
|
|
|
- if _, err := h.Write([]byte(key)); err != nil {
|
|
|
|
- return nil, err
|
|
|
|
- }
|
|
|
|
- val := h.Sum(nil)
|
|
|
|
- return val, nil
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) join(joinNode *models.Node) error {
|
|
|
|
- // First check if node already present in the circle
|
|
|
|
- // Join this node to the same chord ring as parent
|
|
|
|
- var foo *models.Node
|
|
|
|
- // // Ask if our id already exists on the ring.
|
|
|
|
- if joinNode != nil {
|
|
|
|
- remoteNode, err := n.findSuccessorRPC(joinNode, n.Id)
|
|
|
|
- if err != nil {
|
|
|
|
- return err
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if isEqual(remoteNode.Id, n.Id) {
|
|
|
|
- return ERR_NODE_EXISTS
|
|
|
|
- }
|
|
|
|
- foo = joinNode
|
|
|
|
- } else {
|
|
|
|
- foo = n.Node
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- succ, err := n.findSuccessorRPC(foo, n.Id)
|
|
|
|
- if err != nil {
|
|
|
|
- return err
|
|
|
|
- }
|
|
|
|
- n.succMtx.Lock()
|
|
|
|
- n.successor = succ
|
|
|
|
- n.succMtx.Unlock()
|
|
|
|
-
|
|
|
|
- return nil
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-/*
|
|
|
|
- Public storage implementation
|
|
|
|
-*/
|
|
|
|
-
|
|
|
|
-func (n *Node) Find(key string) (*models.Node, error) {
|
|
|
|
- return n.locate(key)
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) Get(key string) ([]byte, error) {
|
|
|
|
- return n.get(key)
|
|
|
|
-}
|
|
|
|
-func (n *Node) Set(key, value string) error {
|
|
|
|
- return n.set(key, value)
|
|
|
|
-}
|
|
|
|
-func (n *Node) Delete(key string) error {
|
|
|
|
- return n.delete(key)
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-/*
|
|
|
|
-Finds the node for the key
|
|
|
|
-*/
|
|
|
|
-func (n *Node) locate(key string) (*models.Node, error) {
|
|
|
|
- id, err := n.hashKey(key)
|
|
|
|
- if err != nil {
|
|
|
|
- return nil, err
|
|
|
|
- }
|
|
|
|
- succ, err := n.findSuccessor(id)
|
|
|
|
- return succ, err
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) get(key string) ([]byte, error) {
|
|
|
|
- node, err := n.locate(key)
|
|
|
|
- if err != nil {
|
|
|
|
- return nil, err
|
|
|
|
- }
|
|
|
|
- val, err := n.getKeyRPC(node, key)
|
|
|
|
- if err != nil {
|
|
|
|
- return nil, err
|
|
|
|
- }
|
|
|
|
- return val.Value, nil
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) set(key, value string) error {
|
|
|
|
- node, err := n.locate(key)
|
|
|
|
- if err != nil {
|
|
|
|
- return err
|
|
|
|
- }
|
|
|
|
- err = n.setKeyRPC(node, key, value)
|
|
|
|
- return err
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) delete(key string) error {
|
|
|
|
- node, err := n.locate(key)
|
|
|
|
- if err != nil {
|
|
|
|
- return err
|
|
|
|
- }
|
|
|
|
- err = n.deleteKeyRPC(node, key)
|
|
|
|
- return err
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) transferKeys(pred, succ *models.Node) {
|
|
|
|
-
|
|
|
|
- keys, err := n.requestKeys(pred, succ)
|
|
|
|
- if len(keys) > 0 {
|
|
|
|
- fmt.Println("transfering: ", keys, err)
|
|
|
|
- }
|
|
|
|
- delKeyList := make([]string, 0, 10)
|
|
|
|
- // store the keys in current node
|
|
|
|
- for _, item := range keys {
|
|
|
|
- if item == nil {
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- n.storage.Set(item.Key, item.Value)
|
|
|
|
- delKeyList = append(delKeyList, item.Key)
|
|
|
|
- }
|
|
|
|
- // delete the keys from the successor node, as current node
|
|
|
|
- // is responsible for the keys
|
|
|
|
- if len(delKeyList) > 0 {
|
|
|
|
- n.deleteKeys(succ, delKeyList)
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) moveKeysFromLocal(pred, succ *models.Node) {
|
|
|
|
-
|
|
|
|
- keys, err := n.storage.Between(pred.Id, succ.Id)
|
|
|
|
- if len(keys) > 0 {
|
|
|
|
- fmt.Println("transfering: ", keys, succ, err)
|
|
|
|
- }
|
|
|
|
- delKeyList := make([]string, 0, 10)
|
|
|
|
- // store the keys in current node
|
|
|
|
- for _, item := range keys {
|
|
|
|
- if item == nil {
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- err := n.setKeyRPC(succ, item.Key, item.Value)
|
|
|
|
- if err != nil {
|
|
|
|
- fmt.Println("error transfering key: ", item.Key, succ.Addr)
|
|
|
|
- }
|
|
|
|
- delKeyList = append(delKeyList, item.Key)
|
|
|
|
- }
|
|
|
|
- // delete the keys from the successor node, as current node
|
|
|
|
- // is responsible for the keys
|
|
|
|
- if len(delKeyList) > 0 {
|
|
|
|
- n.deleteKeys(succ, delKeyList)
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) deleteKeys(node *models.Node, keys []string) error {
|
|
|
|
- return n.deleteKeysRPC(node, keys)
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-// When a new node joins, it requests keys from it's successor
|
|
|
|
-func (n *Node) requestKeys(pred, succ *models.Node) ([]*models.KV, error) {
|
|
|
|
-
|
|
|
|
- if isEqual(n.Id, succ.Id) {
|
|
|
|
- return nil, nil
|
|
|
|
- }
|
|
|
|
- return n.requestKeysRPC(
|
|
|
|
- succ, pred.Id, n.Id,
|
|
|
|
- )
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-/*
|
|
|
|
-Fig 5 implementation for find_succesor
|
|
|
|
-First check if key present in local table, if not
|
|
|
|
-then look for how to travel in the ring
|
|
|
|
-*/
|
|
|
|
-func (n *Node) findSuccessor(id []byte) (*models.Node, error) {
|
|
|
|
- // Check if lock is needed throughout the process
|
|
|
|
- n.succMtx.RLock()
|
|
|
|
- defer n.succMtx.RUnlock()
|
|
|
|
- curr := n.Node
|
|
|
|
- succ := n.successor
|
|
|
|
-
|
|
|
|
- if succ == nil {
|
|
|
|
- return curr, nil
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- var err error
|
|
|
|
-
|
|
|
|
- if betweenRightIncl(id, curr.Id, succ.Id) {
|
|
|
|
- return succ, nil
|
|
|
|
- } else {
|
|
|
|
- pred := n.closestPrecedingNode(id)
|
|
|
|
- /*
|
|
|
|
- NOT SURE ABOUT THIS, RECHECK from paper!!!
|
|
|
|
- if preceeding node and current node are the same,
|
|
|
|
- store the key on this node
|
|
|
|
- */
|
|
|
|
-
|
|
|
|
- if isEqual(pred.Id, n.Id) {
|
|
|
|
- succ, err = n.getSuccessorRPC(pred)
|
|
|
|
- if err != nil {
|
|
|
|
- return nil, err
|
|
|
|
- }
|
|
|
|
- if succ == nil {
|
|
|
|
- // not able to wrap around, current node is the successor
|
|
|
|
- return pred, nil
|
|
|
|
- }
|
|
|
|
- return succ, nil
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- succ, err := n.findSuccessorRPC(pred, id)
|
|
|
|
- // fmt.Println("successor to closest node ", succ, err)
|
|
|
|
- if err != nil {
|
|
|
|
- return nil, err
|
|
|
|
- }
|
|
|
|
- if succ == nil {
|
|
|
|
- // not able to wrap around, current node is the successor
|
|
|
|
- return curr, nil
|
|
|
|
- }
|
|
|
|
- return succ, nil
|
|
|
|
-
|
|
|
|
- }
|
|
|
|
- return nil, nil
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-// Fig 5 implementation for closest_preceding_node
|
|
|
|
-func (n *Node) closestPrecedingNode(id []byte) *models.Node {
|
|
|
|
- n.predMtx.RLock()
|
|
|
|
- defer n.predMtx.RUnlock()
|
|
|
|
-
|
|
|
|
- curr := n.Node
|
|
|
|
-
|
|
|
|
- m := len(n.fingerTable) - 1
|
|
|
|
- for i := m; i >= 0; i-- {
|
|
|
|
- f := n.fingerTable[i]
|
|
|
|
- if f == nil || f.Node == nil {
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- if between(f.Id, curr.Id, id) {
|
|
|
|
- return f.Node
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return curr
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-/*
|
|
|
|
- Periodic functions implementation
|
|
|
|
-*/
|
|
|
|
-
|
|
|
|
-func (n *Node) stabilize() {
|
|
|
|
-
|
|
|
|
- n.succMtx.RLock()
|
|
|
|
- succ := n.successor
|
|
|
|
- if succ == nil {
|
|
|
|
- n.succMtx.RUnlock()
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- n.succMtx.RUnlock()
|
|
|
|
-
|
|
|
|
- x, err := n.getPredecessorRPC(succ)
|
|
|
|
- if err != nil || x == nil {
|
|
|
|
- fmt.Println("error getting predecessor, ", err, x)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- if x.Id != nil && between(x.Id, n.Id, succ.Id) {
|
|
|
|
- n.succMtx.Lock()
|
|
|
|
- n.successor = x
|
|
|
|
- n.succMtx.Unlock()
|
|
|
|
- }
|
|
|
|
- n.notifyRPC(succ, n.Node)
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) checkPredecessor() {
|
|
|
|
- // implement using rpc func
|
|
|
|
- n.predMtx.RLock()
|
|
|
|
- pred := n.predecessor
|
|
|
|
- n.predMtx.RUnlock()
|
|
|
|
-
|
|
|
|
- if pred != nil {
|
|
|
|
- err := n.transport.CheckPredecessor(pred)
|
|
|
|
- if err != nil {
|
|
|
|
- fmt.Println("predecessor failed!", err)
|
|
|
|
- n.predMtx.Lock()
|
|
|
|
- n.predecessor = nil
|
|
|
|
- n.predMtx.Unlock()
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-/*
|
|
|
|
- RPC callers implementation
|
|
|
|
-*/
|
|
|
|
-
|
|
|
|
-// getSuccessorRPC the successor ID of a remote node.
|
|
|
|
-func (n *Node) getSuccessorRPC(node *models.Node) (*models.Node, error) {
|
|
|
|
- return n.transport.GetSuccessor(node)
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-// setSuccessorRPC sets the successor of a given node.
|
|
|
|
-func (n *Node) setSuccessorRPC(node *models.Node, succ *models.Node) error {
|
|
|
|
- return n.transport.SetSuccessor(node, succ)
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-// findSuccessorRPC finds the successor node of a given ID in the entire ring.
|
|
|
|
-func (n *Node) findSuccessorRPC(node *models.Node, id []byte) (*models.Node, error) {
|
|
|
|
- return n.transport.FindSuccessor(node, id)
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-// getSuccessorRPC the successor ID of a remote node.
|
|
|
|
-func (n *Node) getPredecessorRPC(node *models.Node) (*models.Node, error) {
|
|
|
|
- return n.transport.GetPredecessor(node)
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-// setPredecessorRPC sets the predecessor of a given node.
|
|
|
|
-func (n *Node) setPredecessorRPC(node *models.Node, pred *models.Node) error {
|
|
|
|
- return n.transport.SetPredecessor(node, pred)
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-// notifyRPC notifies a remote node that pred is its predecessor.
|
|
|
|
-func (n *Node) notifyRPC(node, pred *models.Node) error {
|
|
|
|
- return n.transport.Notify(node, pred)
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) getKeyRPC(node *models.Node, key string) (*models.GetResponse, error) {
|
|
|
|
- return n.transport.GetKey(node, key)
|
|
|
|
-}
|
|
|
|
-func (n *Node) setKeyRPC(node *models.Node, key, value string) error {
|
|
|
|
- return n.transport.SetKey(node, key, value)
|
|
|
|
-}
|
|
|
|
-func (n *Node) deleteKeyRPC(node *models.Node, key string) error {
|
|
|
|
- return n.transport.DeleteKey(node, key)
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) requestKeysRPC(
|
|
|
|
- node *models.Node, from []byte, to []byte,
|
|
|
|
-) ([]*models.KV, error) {
|
|
|
|
- return n.transport.RequestKeys(node, from, to)
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) deleteKeysRPC(
|
|
|
|
- node *models.Node, keys []string,
|
|
|
|
-) error {
|
|
|
|
- return n.transport.DeleteKeys(node, keys)
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-/*
|
|
|
|
- RPC interface implementation
|
|
|
|
-*/
|
|
|
|
-
|
|
|
|
-// GetSuccessor gets the successor on the node..
|
|
|
|
-func (n *Node) GetSuccessor(ctx context.Context, r *models.ER) (*models.Node, error) {
|
|
|
|
- n.succMtx.RLock()
|
|
|
|
- succ := n.successor
|
|
|
|
- n.succMtx.RUnlock()
|
|
|
|
- if succ == nil {
|
|
|
|
- return emptyNode, nil
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- return succ, nil
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-// SetSuccessor sets the successor on the node..
|
|
|
|
-func (n *Node) SetSuccessor(ctx context.Context, succ *models.Node) (*models.ER, error) {
|
|
|
|
- n.succMtx.Lock()
|
|
|
|
- n.successor = succ
|
|
|
|
- n.succMtx.Unlock()
|
|
|
|
- return emptyRequest, nil
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-// SetPredecessor sets the predecessor on the node..
|
|
|
|
-func (n *Node) SetPredecessor(ctx context.Context, pred *models.Node) (*models.ER, error) {
|
|
|
|
- n.predMtx.Lock()
|
|
|
|
- n.predecessor = pred
|
|
|
|
- n.predMtx.Unlock()
|
|
|
|
- return emptyRequest, nil
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) FindSuccessor(ctx context.Context, id *models.ID) (*models.Node, error) {
|
|
|
|
- succ, err := n.findSuccessor(id.Id)
|
|
|
|
- if err != nil {
|
|
|
|
- return nil, err
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if succ == nil {
|
|
|
|
- return nil, ERR_NO_SUCCESSOR
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- return succ, nil
|
|
|
|
-
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) CheckPredecessor(ctx context.Context, id *models.ID) (*models.ER, error) {
|
|
|
|
- return emptyRequest, nil
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) GetPredecessor(ctx context.Context, r *models.ER) (*models.Node, error) {
|
|
|
|
- n.predMtx.RLock()
|
|
|
|
- pred := n.predecessor
|
|
|
|
- n.predMtx.RUnlock()
|
|
|
|
- if pred == nil {
|
|
|
|
- return emptyNode, nil
|
|
|
|
- }
|
|
|
|
- return pred, nil
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) Notify(ctx context.Context, node *models.Node) (*models.ER, error) {
|
|
|
|
- n.predMtx.Lock()
|
|
|
|
- defer n.predMtx.Unlock()
|
|
|
|
- var prevPredNode *models.Node
|
|
|
|
-
|
|
|
|
- pred := n.predecessor
|
|
|
|
- if pred == nil || between(node.Id, pred.Id, n.Id) {
|
|
|
|
- // fmt.Println("setting predecessor", n.Id, node.Id)
|
|
|
|
- if n.predecessor != nil {
|
|
|
|
- prevPredNode = n.predecessor
|
|
|
|
- }
|
|
|
|
- n.predecessor = node
|
|
|
|
-
|
|
|
|
- // transfer keys from parent node
|
|
|
|
- if prevPredNode != nil {
|
|
|
|
- if between(n.predecessor.Id, prevPredNode.Id, n.Id) {
|
|
|
|
- n.transferKeys(prevPredNode, n.predecessor)
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- return emptyRequest, nil
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) XGet(ctx context.Context, req *models.GetRequest) (*models.GetResponse, error) {
|
|
|
|
- n.stMtx.RLock()
|
|
|
|
- defer n.stMtx.RUnlock()
|
|
|
|
- val, err := n.storage.Get(req.Key)
|
|
|
|
- if err != nil {
|
|
|
|
- return emptyGetResponse, err
|
|
|
|
- }
|
|
|
|
- return &models.GetResponse{Value: val}, nil
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) XSet(ctx context.Context, req *models.SetRequest) (*models.SetResponse, error) {
|
|
|
|
- n.stMtx.Lock()
|
|
|
|
- defer n.stMtx.Unlock()
|
|
|
|
- fmt.Println("setting key on ", n.Node.Addr, req.Key, req.Value)
|
|
|
|
- err := n.storage.Set(req.Key, req.Value)
|
|
|
|
- return emptySetResponse, err
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) XDelete(ctx context.Context, req *models.DeleteRequest) (*models.DeleteResponse, error) {
|
|
|
|
- n.stMtx.Lock()
|
|
|
|
- defer n.stMtx.Unlock()
|
|
|
|
- err := n.storage.Delete(req.Key)
|
|
|
|
- return emptyDeleteResponse, err
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) XRequestKeys(ctx context.Context, req *models.RequestKeysRequest) (*models.RequestKeysResponse, error) {
|
|
|
|
- n.stMtx.RLock()
|
|
|
|
- defer n.stMtx.RUnlock()
|
|
|
|
- val, err := n.storage.Between(req.From, req.To)
|
|
|
|
- if err != nil {
|
|
|
|
- return emptyRequestKeysResponse, err
|
|
|
|
- }
|
|
|
|
- return &models.RequestKeysResponse{Values: val}, nil
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) XMultiDelete(ctx context.Context, req *models.MultiDeleteRequest) (*models.DeleteResponse, error) {
|
|
|
|
- n.stMtx.Lock()
|
|
|
|
- defer n.stMtx.Unlock()
|
|
|
|
- err := n.storage.MDelete(req.Keys...)
|
|
|
|
- return emptyDeleteResponse, err
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func (n *Node) Stop() {
|
|
|
|
- close(n.shutdownCh)
|
|
|
|
-
|
|
|
|
- // Notify successor to change its predecessor pointer to our predecessor.
|
|
|
|
- // Do nothing if we are our own successor (i.e. we are the only node in the
|
|
|
|
- // ring).
|
|
|
|
- n.succMtx.RLock()
|
|
|
|
- succ := n.successor
|
|
|
|
- n.succMtx.RUnlock()
|
|
|
|
-
|
|
|
|
- n.predMtx.RLock()
|
|
|
|
- pred := n.predecessor
|
|
|
|
- n.predMtx.RUnlock()
|
|
|
|
-
|
|
|
|
- if n.Node.Addr != succ.Addr && pred != nil {
|
|
|
|
- n.moveKeysFromLocal(pred, succ)
|
|
|
|
- predErr := n.setPredecessorRPC(succ, pred)
|
|
|
|
- succErr := n.setSuccessorRPC(pred, succ)
|
|
|
|
- fmt.Println("stop errors: ", predErr, succErr)
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- n.transport.Stop()
|
|
|
|
-}
|
|
|