node.go 14 KB


  1. package chord
  2. import (
  3. "crypto/sha1"
  4. "fmt"
  5. "hash"
  6. "math/big"
  7. "sync"
  8. "time"
  9. "trial/achord/models"
  10. "golang.org/x/net/context"
  11. "google.golang.org/grpc"
  12. )
  13. func DefaultConfig() *Config {
  14. n := &Config{
  15. Hash: sha1.New,
  16. DialOpts: make([]grpc.DialOption, 0, 5),
  17. }
  18. // n.HashSize = n.Hash().Size()
  19. n.HashSize = n.Hash().Size() * 8
  20. n.DialOpts = append(n.DialOpts,
  21. grpc.WithBlock(),
  22. grpc.WithTimeout(5*time.Second),
  23. grpc.FailOnNonTempDialError(true),
  24. grpc.WithInsecure(),
  25. )
  26. return n
  27. }
  28. type Config struct {
  29. Id string
  30. Addr string
  31. ServerOpts []grpc.ServerOption
  32. DialOpts []grpc.DialOption
  33. Hash func() hash.Hash // Hash function to use
  34. HashSize int
  35. StabilizeMin time.Duration // Minimum stabilization time
  36. StabilizeMax time.Duration // Maximum stabilization time
  37. Timeout time.Duration
  38. MaxIdle time.Duration
  39. }
  40. func (c *Config) Validate() error {
  41. // hashsize shouldnt be less than hash func size
  42. return nil
  43. }
  44. func NewInode(id string, addr string) *models.Node {
  45. h := sha1.New()
  46. if _, err := h.Write([]byte(id)); err != nil {
  47. return nil
  48. }
  49. val := h.Sum(nil)
  50. return &models.Node{
  51. Id: val,
  52. Addr: addr,
  53. }
  54. }
  55. /*
  56. NewNode creates a new Chord node. Returns error if node already
  57. exists in the chord ring
  58. */
  59. func NewNode(cnf *Config, joinNode *models.Node) (*Node, error) {
  60. if err := cnf.Validate(); err != nil {
  61. return nil, err
  62. }
  63. node := &Node{
  64. Node: new(models.Node),
  65. shutdownCh: make(chan struct{}),
  66. cnf: cnf,
  67. storage: NewMapStore(cnf.Hash),
  68. }
  69. var nID string
  70. if cnf.Id != "" {
  71. nID = cnf.Id
  72. } else {
  73. nID = cnf.Addr
  74. }
  75. id, err := node.hashKey(nID)
  76. if err != nil {
  77. return nil, err
  78. }
  79. aInt := (&big.Int{}).SetBytes(id)
  80. fmt.Printf("new node id %d, \n", aInt)
  81. node.Node.Id = id
  82. node.Node.Addr = cnf.Addr
  83. // Populate finger table
  84. node.fingerTable = newFingerTable(node.Node, cnf.HashSize)
  85. // Start RPC server
  86. transport, err := NewGrpcTransport(cnf)
  87. if err != nil {
  88. return nil, err
  89. }
  90. node.transport = transport
  91. models.RegisterChordServer(transport.server, node)
  92. node.transport.Start()
  93. if err := node.join(joinNode); err != nil {
  94. return nil, err
  95. }
  96. // Peridoically stabilize the node.
  97. go func() {
  98. ticker := time.NewTicker(1 * time.Second)
  99. for {
  100. select {
  101. case <-ticker.C:
  102. node.stabilize()
  103. case <-node.shutdownCh:
  104. ticker.Stop()
  105. return
  106. }
  107. }
  108. }()
  109. // Peridoically fix finger tables.
  110. go func() {
  111. next := 0
  112. ticker := time.NewTicker(100 * time.Millisecond)
  113. for {
  114. select {
  115. case <-ticker.C:
  116. next = node.fixFinger(next)
  117. case <-node.shutdownCh:
  118. ticker.Stop()
  119. return
  120. }
  121. }
  122. }()
  123. // Peridoically checkes whether predecessor has failed.
  124. go func() {
  125. ticker := time.NewTicker(10 * time.Second)
  126. for {
  127. select {
  128. case <-ticker.C:
  129. node.checkPredecessor()
  130. case <-node.shutdownCh:
  131. ticker.Stop()
  132. return
  133. }
  134. }
  135. }()
  136. return node, nil
  137. }
  138. type Node struct {
  139. *models.Node
  140. cnf *Config
  141. predecessor *models.Node
  142. predMtx sync.RWMutex
  143. successor *models.Node
  144. succMtx sync.RWMutex
  145. shutdownCh chan struct{}
  146. fingerTable fingerTable
  147. ftMtx sync.RWMutex
  148. storage Storage
  149. stMtx sync.RWMutex
  150. transport Transport
  151. tsMtx sync.RWMutex
  152. lastStablized time.Time
  153. }
  154. func (n *Node) hashKey(key string) ([]byte, error) {
  155. h := n.cnf.Hash()
  156. if _, err := h.Write([]byte(key)); err != nil {
  157. return nil, err
  158. }
  159. val := h.Sum(nil)
  160. return val, nil
  161. }
  162. func (n *Node) join(joinNode *models.Node) error {
  163. // First check if node already present in the circle
  164. // Join this node to the same chord ring as parent
  165. var foo *models.Node
  166. // // Ask if our id already exists on the ring.
  167. if joinNode != nil {
  168. remoteNode, err := n.findSuccessorRPC(joinNode, n.Id)
  169. if err != nil {
  170. return err
  171. }
  172. if isEqual(remoteNode.Id, n.Id) {
  173. return ERR_NODE_EXISTS
  174. }
  175. foo = joinNode
  176. } else {
  177. foo = n.Node
  178. }
  179. succ, err := n.findSuccessorRPC(foo, n.Id)
  180. if err != nil {
  181. return err
  182. }
  183. n.succMtx.Lock()
  184. n.successor = succ
  185. n.succMtx.Unlock()
  186. return nil
  187. }
  188. /*
  189. Public storage implementation
  190. */
  191. func (n *Node) Find(key string) (*models.Node, error) {
  192. return n.locate(key)
  193. }
  194. func (n *Node) Get(key string) ([]byte, error) {
  195. return n.get(key)
  196. }
  197. func (n *Node) Set(key, value string) error {
  198. return n.set(key, value)
  199. }
  200. func (n *Node) Delete(key string) error {
  201. return n.delete(key)
  202. }
  203. /*
  204. Finds the node for the key
  205. */
  206. func (n *Node) locate(key string) (*models.Node, error) {
  207. id, err := n.hashKey(key)
  208. if err != nil {
  209. return nil, err
  210. }
  211. succ, err := n.findSuccessor(id)
  212. return succ, err
  213. }
  214. func (n *Node) get(key string) ([]byte, error) {
  215. node, err := n.locate(key)
  216. if err != nil {
  217. return nil, err
  218. }
  219. val, err := n.getKeyRPC(node, key)
  220. if err != nil {
  221. return nil, err
  222. }
  223. return val.Value, nil
  224. }
  225. func (n *Node) set(key, value string) error {
  226. node, err := n.locate(key)
  227. if err != nil {
  228. return err
  229. }
  230. err = n.setKeyRPC(node, key, value)
  231. return err
  232. }
  233. func (n *Node) delete(key string) error {
  234. node, err := n.locate(key)
  235. if err != nil {
  236. return err
  237. }
  238. err = n.deleteKeyRPC(node, key)
  239. return err
  240. }
  241. func (n *Node) transferKeys(pred, succ *models.Node) {
  242. keys, err := n.requestKeys(pred, succ)
  243. if len(keys) > 0 {
  244. fmt.Println("transfering: ", keys, err)
  245. }
  246. delKeyList := make([]string, 0, 10)
  247. // store the keys in current node
  248. for _, item := range keys {
  249. if item == nil {
  250. continue
  251. }
  252. n.storage.Set(item.Key, item.Value)
  253. delKeyList = append(delKeyList, item.Key)
  254. }
  255. // delete the keys from the successor node, as current node
  256. // is responsible for the keys
  257. if len(delKeyList) > 0 {
  258. n.deleteKeys(succ, delKeyList)
  259. }
  260. }
  261. func (n *Node) moveKeysFromLocal(pred, succ *models.Node) {
  262. keys, err := n.storage.Between(pred.Id, succ.Id)
  263. if len(keys) > 0 {
  264. fmt.Println("transfering: ", keys, succ, err)
  265. }
  266. delKeyList := make([]string, 0, 10)
  267. // store the keys in current node
  268. for _, item := range keys {
  269. if item == nil {
  270. continue
  271. }
  272. err := n.setKeyRPC(succ, item.Key, item.Value)
  273. if err != nil {
  274. fmt.Println("error transfering key: ", item.Key, succ.Addr)
  275. }
  276. delKeyList = append(delKeyList, item.Key)
  277. }
  278. // delete the keys from the successor node, as current node
  279. // is responsible for the keys
  280. if len(delKeyList) > 0 {
  281. n.deleteKeys(succ, delKeyList)
  282. }
  283. }
  284. func (n *Node) deleteKeys(node *models.Node, keys []string) error {
  285. return n.deleteKeysRPC(node, keys)
  286. }
  287. // When a new node joins, it requests keys from it's successor
  288. func (n *Node) requestKeys(pred, succ *models.Node) ([]*models.KV, error) {
  289. if isEqual(n.Id, succ.Id) {
  290. return nil, nil
  291. }
  292. return n.requestKeysRPC(
  293. succ, pred.Id, n.Id,
  294. )
  295. }
  296. /*
  297. Fig 5 implementation for find_succesor
  298. First check if key present in local table, if not
  299. then look for how to travel in the ring
  300. */
  301. func (n *Node) findSuccessor(id []byte) (*models.Node, error) {
  302. // Check if lock is needed throughout the process
  303. n.succMtx.RLock()
  304. defer n.succMtx.RUnlock()
  305. curr := n.Node
  306. succ := n.successor
  307. if succ == nil {
  308. return curr, nil
  309. }
  310. var err error
  311. if betweenRightIncl(id, curr.Id, succ.Id) {
  312. return succ, nil
  313. } else {
  314. pred := n.closestPrecedingNode(id)
  315. /*
  316. NOT SURE ABOUT THIS, RECHECK from paper!!!
  317. if preceeding node and current node are the same,
  318. store the key on this node
  319. */
  320. if isEqual(pred.Id, n.Id) {
  321. succ, err = n.getSuccessorRPC(pred)
  322. if err != nil {
  323. return nil, err
  324. }
  325. if succ == nil {
  326. // not able to wrap around, current node is the successor
  327. return pred, nil
  328. }
  329. return succ, nil
  330. }
  331. succ, err := n.findSuccessorRPC(pred, id)
  332. // fmt.Println("successor to closest node ", succ, err)
  333. if err != nil {
  334. return nil, err
  335. }
  336. if succ == nil {
  337. // not able to wrap around, current node is the successor
  338. return curr, nil
  339. }
  340. return succ, nil
  341. }
  342. return nil, nil
  343. }
  344. // Fig 5 implementation for closest_preceding_node
  345. func (n *Node) closestPrecedingNode(id []byte) *models.Node {
  346. n.predMtx.RLock()
  347. defer n.predMtx.RUnlock()
  348. curr := n.Node
  349. m := len(n.fingerTable) - 1
  350. for i := m; i >= 0; i-- {
  351. f := n.fingerTable[i]
  352. if f == nil || f.Node == nil {
  353. continue
  354. }
  355. if between(f.Id, curr.Id, id) {
  356. return f.Node
  357. }
  358. }
  359. return curr
  360. }
  361. /*
  362. Periodic functions implementation
  363. */
  364. func (n *Node) stabilize() {
  365. n.succMtx.RLock()
  366. succ := n.successor
  367. if succ == nil {
  368. n.succMtx.RUnlock()
  369. return
  370. }
  371. n.succMtx.RUnlock()
  372. x, err := n.getPredecessorRPC(succ)
  373. if err != nil || x == nil {
  374. fmt.Println("error getting predecessor, ", err, x)
  375. return
  376. }
  377. if x.Id != nil && between(x.Id, n.Id, succ.Id) {
  378. n.succMtx.Lock()
  379. n.successor = x
  380. n.succMtx.Unlock()
  381. }
  382. n.notifyRPC(succ, n.Node)
  383. }
  384. func (n *Node) checkPredecessor() {
  385. // implement using rpc func
  386. n.predMtx.RLock()
  387. pred := n.predecessor
  388. n.predMtx.RUnlock()
  389. if pred != nil {
  390. err := n.transport.CheckPredecessor(pred)
  391. if err != nil {
  392. fmt.Println("predecessor failed!", err)
  393. n.predMtx.Lock()
  394. n.predecessor = nil
  395. n.predMtx.Unlock()
  396. }
  397. }
  398. }
  399. /*
  400. RPC callers implementation
  401. */
  402. // getSuccessorRPC the successor ID of a remote node.
  403. func (n *Node) getSuccessorRPC(node *models.Node) (*models.Node, error) {
  404. return n.transport.GetSuccessor(node)
  405. }
  406. // setSuccessorRPC sets the successor of a given node.
  407. func (n *Node) setSuccessorRPC(node *models.Node, succ *models.Node) error {
  408. return n.transport.SetSuccessor(node, succ)
  409. }
  410. // findSuccessorRPC finds the successor node of a given ID in the entire ring.
  411. func (n *Node) findSuccessorRPC(node *models.Node, id []byte) (*models.Node, error) {
  412. return n.transport.FindSuccessor(node, id)
  413. }
  414. // getSuccessorRPC the successor ID of a remote node.
  415. func (n *Node) getPredecessorRPC(node *models.Node) (*models.Node, error) {
  416. return n.transport.GetPredecessor(node)
  417. }
  418. // setPredecessorRPC sets the predecessor of a given node.
  419. func (n *Node) setPredecessorRPC(node *models.Node, pred *models.Node) error {
  420. return n.transport.SetPredecessor(node, pred)
  421. }
  422. // notifyRPC notifies a remote node that pred is its predecessor.
  423. func (n *Node) notifyRPC(node, pred *models.Node) error {
  424. return n.transport.Notify(node, pred)
  425. }
  426. func (n *Node) getKeyRPC(node *models.Node, key string) (*models.GetResponse, error) {
  427. return n.transport.GetKey(node, key)
  428. }
  429. func (n *Node) setKeyRPC(node *models.Node, key, value string) error {
  430. return n.transport.SetKey(node, key, value)
  431. }
  432. func (n *Node) deleteKeyRPC(node *models.Node, key string) error {
  433. return n.transport.DeleteKey(node, key)
  434. }
  435. func (n *Node) requestKeysRPC(
  436. node *models.Node, from []byte, to []byte,
  437. ) ([]*models.KV, error) {
  438. return n.transport.RequestKeys(node, from, to)
  439. }
  440. func (n *Node) deleteKeysRPC(
  441. node *models.Node, keys []string,
  442. ) error {
  443. return n.transport.DeleteKeys(node, keys)
  444. }
  445. /*
  446. RPC interface implementation
  447. */
  448. // GetSuccessor gets the successor on the node..
  449. func (n *Node) GetSuccessor(ctx context.Context, r *models.ER) (*models.Node, error) {
  450. n.succMtx.RLock()
  451. succ := n.successor
  452. n.succMtx.RUnlock()
  453. if succ == nil {
  454. return emptyNode, nil
  455. }
  456. return succ, nil
  457. }
  458. // SetSuccessor sets the successor on the node..
  459. func (n *Node) SetSuccessor(ctx context.Context, succ *models.Node) (*models.ER, error) {
  460. n.succMtx.Lock()
  461. n.successor = succ
  462. n.succMtx.Unlock()
  463. return emptyRequest, nil
  464. }
  465. // SetPredecessor sets the predecessor on the node..
  466. func (n *Node) SetPredecessor(ctx context.Context, pred *models.Node) (*models.ER, error) {
  467. n.predMtx.Lock()
  468. n.predecessor = pred
  469. n.predMtx.Unlock()
  470. return emptyRequest, nil
  471. }
  472. func (n *Node) FindSuccessor(ctx context.Context, id *models.ID) (*models.Node, error) {
  473. succ, err := n.findSuccessor(id.Id)
  474. if err != nil {
  475. return nil, err
  476. }
  477. if succ == nil {
  478. return nil, ERR_NO_SUCCESSOR
  479. }
  480. return succ, nil
  481. }
  482. func (n *Node) CheckPredecessor(ctx context.Context, id *models.ID) (*models.ER, error) {
  483. return emptyRequest, nil
  484. }
  485. func (n *Node) GetPredecessor(ctx context.Context, r *models.ER) (*models.Node, error) {
  486. n.predMtx.RLock()
  487. pred := n.predecessor
  488. n.predMtx.RUnlock()
  489. if pred == nil {
  490. return emptyNode, nil
  491. }
  492. return pred, nil
  493. }
  494. func (n *Node) Notify(ctx context.Context, node *models.Node) (*models.ER, error) {
  495. n.predMtx.Lock()
  496. defer n.predMtx.Unlock()
  497. var prevPredNode *models.Node
  498. pred := n.predecessor
  499. if pred == nil || between(node.Id, pred.Id, n.Id) {
  500. // fmt.Println("setting predecessor", n.Id, node.Id)
  501. if n.predecessor != nil {
  502. prevPredNode = n.predecessor
  503. }
  504. n.predecessor = node
  505. // transfer keys from parent node
  506. if prevPredNode != nil {
  507. if between(n.predecessor.Id, prevPredNode.Id, n.Id) {
  508. n.transferKeys(prevPredNode, n.predecessor)
  509. }
  510. }
  511. }
  512. return emptyRequest, nil
  513. }
  514. func (n *Node) XGet(ctx context.Context, req *models.GetRequest) (*models.GetResponse, error) {
  515. n.stMtx.RLock()
  516. defer n.stMtx.RUnlock()
  517. val, err := n.storage.Get(req.Key)
  518. if err != nil {
  519. return emptyGetResponse, err
  520. }
  521. return &models.GetResponse{Value: val}, nil
  522. }
  523. func (n *Node) XSet(ctx context.Context, req *models.SetRequest) (*models.SetResponse, error) {
  524. n.stMtx.Lock()
  525. defer n.stMtx.Unlock()
  526. fmt.Println("setting key on ", n.Node.Addr, req.Key, req.Value)
  527. err := n.storage.Set(req.Key, req.Value)
  528. return emptySetResponse, err
  529. }
  530. func (n *Node) XDelete(ctx context.Context, req *models.DeleteRequest) (*models.DeleteResponse, error) {
  531. n.stMtx.Lock()
  532. defer n.stMtx.Unlock()
  533. err := n.storage.Delete(req.Key)
  534. return emptyDeleteResponse, err
  535. }
  536. func (n *Node) XRequestKeys(ctx context.Context, req *models.RequestKeysRequest) (*models.RequestKeysResponse, error) {
  537. n.stMtx.RLock()
  538. defer n.stMtx.RUnlock()
  539. val, err := n.storage.Between(req.From, req.To)
  540. if err != nil {
  541. return emptyRequestKeysResponse, err
  542. }
  543. return &models.RequestKeysResponse{Values: val}, nil
  544. }
  545. func (n *Node) XMultiDelete(ctx context.Context, req *models.MultiDeleteRequest) (*models.DeleteResponse, error) {
  546. n.stMtx.Lock()
  547. defer n.stMtx.Unlock()
  548. err := n.storage.MDelete(req.Keys...)
  549. return emptyDeleteResponse, err
  550. }
  551. func (n *Node) Stop() {
  552. close(n.shutdownCh)
  553. // Notify successor to change its predecessor pointer to our predecessor.
  554. // Do nothing if we are our own successor (i.e. we are the only node in the
  555. // ring).
  556. n.succMtx.RLock()
  557. succ := n.successor
  558. n.succMtx.RUnlock()
  559. n.predMtx.RLock()
  560. pred := n.predecessor
  561. n.predMtx.RUnlock()
  562. if n.Node.Addr != succ.Addr && pred != nil {
  563. n.moveKeysFromLocal(pred, succ)
  564. predErr := n.setPredecessorRPC(succ, pred)
  565. succErr := n.setSuccessorRPC(pred, succ)
  566. fmt.Println("stop errors: ", predErr, succErr)
  567. }
  568. n.transport.Stop()
  569. }