|
- package chord
- import (
- "crypto/sha1"
- "fmt"
- "hash"
- "math/big"
- "sync"
- "time"
- "trial/achord/models"
- "golang.org/x/net/context"
- "google.golang.org/grpc"
- )
- func DefaultConfig() *Config {
- n := &Config{
- Hash: sha1.New,
- DialOpts: make([]grpc.DialOption, 0, 5),
- }
- // n.HashSize = n.Hash().Size()
- n.HashSize = n.Hash().Size() * 8
- n.DialOpts = append(n.DialOpts,
- grpc.WithBlock(),
- grpc.WithTimeout(5*time.Second),
- grpc.FailOnNonTempDialError(true),
- grpc.WithInsecure(),
- )
- return n
- }
- type Config struct {
- Id string
- Addr string
- ServerOpts []grpc.ServerOption
- DialOpts []grpc.DialOption
- Hash func() hash.Hash // Hash function to use
- HashSize int
- StabilizeMin time.Duration // Minimum stabilization time
- StabilizeMax time.Duration // Maximum stabilization time
- Timeout time.Duration
- MaxIdle time.Duration
- }
- func (c *Config) Validate() error {
- // hashsize shouldnt be less than hash func size
- return nil
- }
- func NewInode(id string, addr string) *models.Node {
- h := sha1.New()
- if _, err := h.Write([]byte(id)); err != nil {
- return nil
- }
- val := h.Sum(nil)
- return &models.Node{
- Id: val,
- Addr: addr,
- }
- }
- /*
- NewNode creates a new Chord node. Returns error if node already
- exists in the chord ring
- */
- func NewNode(cnf *Config, joinNode *models.Node) (*Node, error) {
- if err := cnf.Validate(); err != nil {
- return nil, err
- }
- node := &Node{
- Node: new(models.Node),
- shutdownCh: make(chan struct{}),
- cnf: cnf,
- storage: NewMapStore(cnf.Hash),
- }
- var nID string
- if cnf.Id != "" {
- nID = cnf.Id
- } else {
- nID = cnf.Addr
- }
- id, err := node.hashKey(nID)
- if err != nil {
- return nil, err
- }
- aInt := (&big.Int{}).SetBytes(id)
- fmt.Printf("new node id %d, \n", aInt)
- node.Node.Id = id
- node.Node.Addr = cnf.Addr
- // Populate finger table
- node.fingerTable = newFingerTable(node.Node, cnf.HashSize)
- // Start RPC server
- transport, err := NewGrpcTransport(cnf)
- if err != nil {
- return nil, err
- }
- node.transport = transport
- models.RegisterChordServer(transport.server, node)
- node.transport.Start()
- if err := node.join(joinNode); err != nil {
- return nil, err
- }
- // Peridoically stabilize the node.
- go func() {
- ticker := time.NewTicker(1 * time.Second)
- for {
- select {
- case <-ticker.C:
- node.stabilize()
- case <-node.shutdownCh:
- ticker.Stop()
- return
- }
- }
- }()
- // Peridoically fix finger tables.
- go func() {
- next := 0
- ticker := time.NewTicker(100 * time.Millisecond)
- for {
- select {
- case <-ticker.C:
- next = node.fixFinger(next)
- case <-node.shutdownCh:
- ticker.Stop()
- return
- }
- }
- }()
- // Peridoically checkes whether predecessor has failed.
- go func() {
- ticker := time.NewTicker(10 * time.Second)
- for {
- select {
- case <-ticker.C:
- node.checkPredecessor()
- case <-node.shutdownCh:
- ticker.Stop()
- return
- }
- }
- }()
- return node, nil
- }
- type Node struct {
- *models.Node
- cnf *Config
- predecessor *models.Node
- predMtx sync.RWMutex
- successor *models.Node
- succMtx sync.RWMutex
- shutdownCh chan struct{}
- fingerTable fingerTable
- ftMtx sync.RWMutex
- storage Storage
- stMtx sync.RWMutex
- transport Transport
- tsMtx sync.RWMutex
- lastStablized time.Time
- }
- func (n *Node) hashKey(key string) ([]byte, error) {
- h := n.cnf.Hash()
- if _, err := h.Write([]byte(key)); err != nil {
- return nil, err
- }
- val := h.Sum(nil)
- return val, nil
- }
- func (n *Node) join(joinNode *models.Node) error {
- // First check if node already present in the circle
- // Join this node to the same chord ring as parent
- var foo *models.Node
- // // Ask if our id already exists on the ring.
- if joinNode != nil {
- remoteNode, err := n.findSuccessorRPC(joinNode, n.Id)
- if err != nil {
- return err
- }
- if isEqual(remoteNode.Id, n.Id) {
- return ERR_NODE_EXISTS
- }
- foo = joinNode
- } else {
- foo = n.Node
- }
- succ, err := n.findSuccessorRPC(foo, n.Id)
- if err != nil {
- return err
- }
- n.succMtx.Lock()
- n.successor = succ
- n.succMtx.Unlock()
- return nil
- }
- /*
- Public storage implementation
- */
- func (n *Node) Find(key string) (*models.Node, error) {
- return n.locate(key)
- }
- func (n *Node) Get(key string) ([]byte, error) {
- return n.get(key)
- }
- func (n *Node) Set(key, value string) error {
- return n.set(key, value)
- }
- func (n *Node) Delete(key string) error {
- return n.delete(key)
- }
- /*
- Finds the node for the key
- */
- func (n *Node) locate(key string) (*models.Node, error) {
- id, err := n.hashKey(key)
- if err != nil {
- return nil, err
- }
- succ, err := n.findSuccessor(id)
- return succ, err
- }
- func (n *Node) get(key string) ([]byte, error) {
- node, err := n.locate(key)
- if err != nil {
- return nil, err
- }
- val, err := n.getKeyRPC(node, key)
- if err != nil {
- return nil, err
- }
- return val.Value, nil
- }
- func (n *Node) set(key, value string) error {
- node, err := n.locate(key)
- if err != nil {
- return err
- }
- err = n.setKeyRPC(node, key, value)
- return err
- }
- func (n *Node) delete(key string) error {
- node, err := n.locate(key)
- if err != nil {
- return err
- }
- err = n.deleteKeyRPC(node, key)
- return err
- }
- func (n *Node) transferKeys(pred, succ *models.Node) {
- keys, err := n.requestKeys(pred, succ)
- if len(keys) > 0 {
- fmt.Println("transfering: ", keys, err)
- }
- delKeyList := make([]string, 0, 10)
- // store the keys in current node
- for _, item := range keys {
- if item == nil {
- continue
- }
- n.storage.Set(item.Key, item.Value)
- delKeyList = append(delKeyList, item.Key)
- }
- // delete the keys from the successor node, as current node
- // is responsible for the keys
- if len(delKeyList) > 0 {
- n.deleteKeys(succ, delKeyList)
- }
- }
- func (n *Node) moveKeysFromLocal(pred, succ *models.Node) {
- keys, err := n.storage.Between(pred.Id, succ.Id)
- if len(keys) > 0 {
- fmt.Println("transfering: ", keys, succ, err)
- }
- delKeyList := make([]string, 0, 10)
- // store the keys in current node
- for _, item := range keys {
- if item == nil {
- continue
- }
- err := n.setKeyRPC(succ, item.Key, item.Value)
- if err != nil {
- fmt.Println("error transfering key: ", item.Key, succ.Addr)
- }
- delKeyList = append(delKeyList, item.Key)
- }
- // delete the keys from the successor node, as current node
- // is responsible for the keys
- if len(delKeyList) > 0 {
- n.deleteKeys(succ, delKeyList)
- }
- }
- func (n *Node) deleteKeys(node *models.Node, keys []string) error {
- return n.deleteKeysRPC(node, keys)
- }
- // When a new node joins, it requests keys from it's successor
- func (n *Node) requestKeys(pred, succ *models.Node) ([]*models.KV, error) {
- if isEqual(n.Id, succ.Id) {
- return nil, nil
- }
- return n.requestKeysRPC(
- succ, pred.Id, n.Id,
- )
- }
- /*
- Fig 5 implementation for find_succesor
- First check if key present in local table, if not
- then look for how to travel in the ring
- */
- func (n *Node) findSuccessor(id []byte) (*models.Node, error) {
- // Check if lock is needed throughout the process
- n.succMtx.RLock()
- defer n.succMtx.RUnlock()
- curr := n.Node
- succ := n.successor
- if succ == nil {
- return curr, nil
- }
- var err error
- if betweenRightIncl(id, curr.Id, succ.Id) {
- return succ, nil
- } else {
- pred := n.closestPrecedingNode(id)
- /*
- NOT SURE ABOUT THIS, RECHECK from paper!!!
- if preceeding node and current node are the same,
- store the key on this node
- */
- if isEqual(pred.Id, n.Id) {
- succ, err = n.getSuccessorRPC(pred)
- if err != nil {
- return nil, err
- }
- if succ == nil {
- // not able to wrap around, current node is the successor
- return pred, nil
- }
- return succ, nil
- }
- succ, err := n.findSuccessorRPC(pred, id)
- // fmt.Println("successor to closest node ", succ, err)
- if err != nil {
- return nil, err
- }
- if succ == nil {
- // not able to wrap around, current node is the successor
- return curr, nil
- }
- return succ, nil
- }
- return nil, nil
- }
- // Fig 5 implementation for closest_preceding_node
- func (n *Node) closestPrecedingNode(id []byte) *models.Node {
- n.predMtx.RLock()
- defer n.predMtx.RUnlock()
- curr := n.Node
- m := len(n.fingerTable) - 1
- for i := m; i >= 0; i-- {
- f := n.fingerTable[i]
- if f == nil || f.Node == nil {
- continue
- }
- if between(f.Id, curr.Id, id) {
- return f.Node
- }
- }
- return curr
- }
- /*
- Periodic functions implementation
- */
- func (n *Node) stabilize() {
- n.succMtx.RLock()
- succ := n.successor
- if succ == nil {
- n.succMtx.RUnlock()
- return
- }
- n.succMtx.RUnlock()
- x, err := n.getPredecessorRPC(succ)
- if err != nil || x == nil {
- fmt.Println("error getting predecessor, ", err, x)
- return
- }
- if x.Id != nil && between(x.Id, n.Id, succ.Id) {
- n.succMtx.Lock()
- n.successor = x
- n.succMtx.Unlock()
- }
- n.notifyRPC(succ, n.Node)
- }
- func (n *Node) checkPredecessor() {
- // implement using rpc func
- n.predMtx.RLock()
- pred := n.predecessor
- n.predMtx.RUnlock()
- if pred != nil {
- err := n.transport.CheckPredecessor(pred)
- if err != nil {
- fmt.Println("predecessor failed!", err)
- n.predMtx.Lock()
- n.predecessor = nil
- n.predMtx.Unlock()
- }
- }
- }
- /*
- RPC callers implementation
- */
- // getSuccessorRPC the successor ID of a remote node.
- func (n *Node) getSuccessorRPC(node *models.Node) (*models.Node, error) {
- return n.transport.GetSuccessor(node)
- }
- // setSuccessorRPC sets the successor of a given node.
- func (n *Node) setSuccessorRPC(node *models.Node, succ *models.Node) error {
- return n.transport.SetSuccessor(node, succ)
- }
- // findSuccessorRPC finds the successor node of a given ID in the entire ring.
- func (n *Node) findSuccessorRPC(node *models.Node, id []byte) (*models.Node, error) {
- return n.transport.FindSuccessor(node, id)
- }
- // getSuccessorRPC the successor ID of a remote node.
- func (n *Node) getPredecessorRPC(node *models.Node) (*models.Node, error) {
- return n.transport.GetPredecessor(node)
- }
- // setPredecessorRPC sets the predecessor of a given node.
- func (n *Node) setPredecessorRPC(node *models.Node, pred *models.Node) error {
- return n.transport.SetPredecessor(node, pred)
- }
- // notifyRPC notifies a remote node that pred is its predecessor.
- func (n *Node) notifyRPC(node, pred *models.Node) error {
- return n.transport.Notify(node, pred)
- }
- func (n *Node) getKeyRPC(node *models.Node, key string) (*models.GetResponse, error) {
- return n.transport.GetKey(node, key)
- }
- func (n *Node) setKeyRPC(node *models.Node, key, value string) error {
- return n.transport.SetKey(node, key, value)
- }
- func (n *Node) deleteKeyRPC(node *models.Node, key string) error {
- return n.transport.DeleteKey(node, key)
- }
- func (n *Node) requestKeysRPC(
- node *models.Node, from []byte, to []byte,
- ) ([]*models.KV, error) {
- return n.transport.RequestKeys(node, from, to)
- }
- func (n *Node) deleteKeysRPC(
- node *models.Node, keys []string,
- ) error {
- return n.transport.DeleteKeys(node, keys)
- }
- /*
- RPC interface implementation
- */
- // GetSuccessor gets the successor on the node..
- func (n *Node) GetSuccessor(ctx context.Context, r *models.ER) (*models.Node, error) {
- n.succMtx.RLock()
- succ := n.successor
- n.succMtx.RUnlock()
- if succ == nil {
- return emptyNode, nil
- }
- return succ, nil
- }
- // SetSuccessor sets the successor on the node..
- func (n *Node) SetSuccessor(ctx context.Context, succ *models.Node) (*models.ER, error) {
- n.succMtx.Lock()
- n.successor = succ
- n.succMtx.Unlock()
- return emptyRequest, nil
- }
- // SetPredecessor sets the predecessor on the node..
- func (n *Node) SetPredecessor(ctx context.Context, pred *models.Node) (*models.ER, error) {
- n.predMtx.Lock()
- n.predecessor = pred
- n.predMtx.Unlock()
- return emptyRequest, nil
- }
- func (n *Node) FindSuccessor(ctx context.Context, id *models.ID) (*models.Node, error) {
- succ, err := n.findSuccessor(id.Id)
- if err != nil {
- return nil, err
- }
- if succ == nil {
- return nil, ERR_NO_SUCCESSOR
- }
- return succ, nil
- }
- func (n *Node) CheckPredecessor(ctx context.Context, id *models.ID) (*models.ER, error) {
- return emptyRequest, nil
- }
- func (n *Node) GetPredecessor(ctx context.Context, r *models.ER) (*models.Node, error) {
- n.predMtx.RLock()
- pred := n.predecessor
- n.predMtx.RUnlock()
- if pred == nil {
- return emptyNode, nil
- }
- return pred, nil
- }
- func (n *Node) Notify(ctx context.Context, node *models.Node) (*models.ER, error) {
- n.predMtx.Lock()
- defer n.predMtx.Unlock()
- var prevPredNode *models.Node
- pred := n.predecessor
- if pred == nil || between(node.Id, pred.Id, n.Id) {
- // fmt.Println("setting predecessor", n.Id, node.Id)
- if n.predecessor != nil {
- prevPredNode = n.predecessor
- }
- n.predecessor = node
- // transfer keys from parent node
- if prevPredNode != nil {
- if between(n.predecessor.Id, prevPredNode.Id, n.Id) {
- n.transferKeys(prevPredNode, n.predecessor)
- }
- }
- }
- return emptyRequest, nil
- }
- func (n *Node) XGet(ctx context.Context, req *models.GetRequest) (*models.GetResponse, error) {
- n.stMtx.RLock()
- defer n.stMtx.RUnlock()
- val, err := n.storage.Get(req.Key)
- if err != nil {
- return emptyGetResponse, err
- }
- return &models.GetResponse{Value: val}, nil
- }
- func (n *Node) XSet(ctx context.Context, req *models.SetRequest) (*models.SetResponse, error) {
- n.stMtx.Lock()
- defer n.stMtx.Unlock()
- fmt.Println("setting key on ", n.Node.Addr, req.Key, req.Value)
- err := n.storage.Set(req.Key, req.Value)
- return emptySetResponse, err
- }
- func (n *Node) XDelete(ctx context.Context, req *models.DeleteRequest) (*models.DeleteResponse, error) {
- n.stMtx.Lock()
- defer n.stMtx.Unlock()
- err := n.storage.Delete(req.Key)
- return emptyDeleteResponse, err
- }
- func (n *Node) XRequestKeys(ctx context.Context, req *models.RequestKeysRequest) (*models.RequestKeysResponse, error) {
- n.stMtx.RLock()
- defer n.stMtx.RUnlock()
- val, err := n.storage.Between(req.From, req.To)
- if err != nil {
- return emptyRequestKeysResponse, err
- }
- return &models.RequestKeysResponse{Values: val}, nil
- }
- func (n *Node) XMultiDelete(ctx context.Context, req *models.MultiDeleteRequest) (*models.DeleteResponse, error) {
- n.stMtx.Lock()
- defer n.stMtx.Unlock()
- err := n.storage.MDelete(req.Keys...)
- return emptyDeleteResponse, err
- }
- func (n *Node) Stop() {
- close(n.shutdownCh)
- // Notify successor to change its predecessor pointer to our predecessor.
- // Do nothing if we are our own successor (i.e. we are the only node in the
- // ring).
- n.succMtx.RLock()
- succ := n.successor
- n.succMtx.RUnlock()
- n.predMtx.RLock()
- pred := n.predecessor
- n.predMtx.RUnlock()
- if n.Node.Addr != succ.Addr && pred != nil {
- n.moveKeysFromLocal(pred, succ)
- predErr := n.setPredecessorRPC(succ, pred)
- succErr := n.setSuccessorRPC(pred, succ)
- fmt.Println("stop errors: ", predErr, succErr)
- }
- n.transport.Stop()
- }
|