transport.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. package chord
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "trial/achord/models"
  10. "golang.org/x/net/context"
  11. "google.golang.org/grpc"
  12. )
  13. var (
  14. emptyNode = &models.Node{}
  15. emptyRequest = &models.ER{}
  16. emptyGetResponse = &models.GetResponse{}
  17. emptySetResponse = &models.SetResponse{}
  18. emptyDeleteResponse = &models.DeleteResponse{}
  19. emptyRequestKeysResponse = &models.RequestKeysResponse{}
  20. )
  21. func Dial(addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
  22. return grpc.Dial(addr, opts...)
  23. }
  24. /*
  25. Transport enables a node to talk to the other nodes in
  26. the ring
  27. */
  28. type Transport interface {
  29. Start() error
  30. Stop() error
  31. //RPC
  32. GetSuccessor(*models.Node) (*models.Node, error)
  33. FindSuccessor(*models.Node, []byte) (*models.Node, error)
  34. GetPredecessor(*models.Node) (*models.Node, error)
  35. Notify(*models.Node, *models.Node) error
  36. CheckPredecessor(*models.Node) error
  37. SetPredecessor(*models.Node, *models.Node) error
  38. SetSuccessor(*models.Node, *models.Node) error
  39. //Storage
  40. GetKey(*models.Node, string) (*models.GetResponse, error)
  41. SetKey(*models.Node, string, string) error
  42. DeleteKey(*models.Node, string) error
  43. RequestKeys(*models.Node, []byte, []byte) ([]*models.KV, error)
  44. DeleteKeys(*models.Node, []string) error
  45. }
  46. type GrpcTransport struct {
  47. config *Config
  48. timeout time.Duration
  49. maxIdle time.Duration
  50. sock *net.TCPListener
  51. pool map[string]*grpcConn
  52. poolMtx sync.RWMutex
  53. server *grpc.Server
  54. shutdown int32
  55. }
  56. // func NewGrpcTransport(config *Config) (models.ChordClient, error) {
  57. func NewGrpcTransport(config *Config) (*GrpcTransport, error) {
  58. addr := config.Addr
  59. // Try to start the listener
  60. listener, err := net.Listen("tcp", addr)
  61. if err != nil {
  62. return nil, err
  63. }
  64. pool := make(map[string]*grpcConn)
  65. // Setup the transport
  66. grp := &GrpcTransport{
  67. sock: listener.(*net.TCPListener),
  68. timeout: config.Timeout,
  69. maxIdle: config.MaxIdle,
  70. pool: pool,
  71. config: config,
  72. }
  73. grp.server = grpc.NewServer(config.ServerOpts...)
  74. // Done
  75. return grp, nil
  76. }
  77. type grpcConn struct {
  78. addr string
  79. client models.ChordClient
  80. conn *grpc.ClientConn
  81. lastActive time.Time
  82. }
  83. func (g *grpcConn) Close() {
  84. g.conn.Close()
  85. }
  86. func (g *GrpcTransport) registerNode(node *Node) {
  87. models.RegisterChordServer(g.server, node)
  88. }
  89. func (g *GrpcTransport) GetServer() *grpc.Server {
  90. return g.server
  91. }
  92. // Gets an outbound connection to a host
  93. func (g *GrpcTransport) getConn(
  94. addr string,
  95. ) (models.ChordClient, error) {
  96. g.poolMtx.RLock()
  97. if atomic.LoadInt32(&g.shutdown) == 1 {
  98. g.poolMtx.Unlock()
  99. return nil, fmt.Errorf("TCP transport is shutdown")
  100. }
  101. cc, ok := g.pool[addr]
  102. g.poolMtx.RUnlock()
  103. if ok {
  104. return cc.client, nil
  105. }
  106. var conn *grpc.ClientConn
  107. var err error
  108. conn, err = Dial(addr, g.config.DialOpts...)
  109. if err != nil {
  110. return nil, err
  111. }
  112. client := models.NewChordClient(conn)
  113. cc = &grpcConn{addr, client, conn, time.Now()}
  114. g.poolMtx.Lock()
  115. if g.pool == nil {
  116. g.poolMtx.Unlock()
  117. return nil, errors.New("must instantiate node before using")
  118. }
  119. g.pool[addr] = cc
  120. g.poolMtx.Unlock()
  121. return client, nil
  122. }
  123. func (g *GrpcTransport) Start() error {
  124. // Start RPC server
  125. go g.listen()
  126. // Reap old connections
  127. go g.reapOld()
  128. return nil
  129. }
  130. // Returns an outbound TCP connection to the pool
  131. func (g *GrpcTransport) returnConn(o *grpcConn) {
  132. // Update the last asctive time
  133. o.lastActive = time.Now()
  134. // Push back into the pool
  135. g.poolMtx.Lock()
  136. defer g.poolMtx.Unlock()
  137. if atomic.LoadInt32(&g.shutdown) == 1 {
  138. o.conn.Close()
  139. return
  140. }
  141. g.pool[o.addr] = o
  142. }
  143. // Shutdown the TCP transport
  144. func (g *GrpcTransport) Stop() error {
  145. atomic.StoreInt32(&g.shutdown, 1)
  146. // Close all the connections
  147. g.poolMtx.Lock()
  148. g.server.Stop()
  149. for _, conn := range g.pool {
  150. conn.Close()
  151. }
  152. g.pool = nil
  153. g.poolMtx.Unlock()
  154. return nil
  155. }
  156. // Closes old outbound connections
  157. func (g *GrpcTransport) reapOld() {
  158. ticker := time.NewTicker(60 * time.Second)
  159. for {
  160. if atomic.LoadInt32(&g.shutdown) == 1 {
  161. return
  162. }
  163. select {
  164. case <-ticker.C:
  165. g.reap()
  166. }
  167. }
  168. }
  169. func (g *GrpcTransport) reap() {
  170. g.poolMtx.Lock()
  171. defer g.poolMtx.Unlock()
  172. for host, conn := range g.pool {
  173. if time.Since(conn.lastActive) > g.maxIdle {
  174. conn.Close()
  175. delete(g.pool, host)
  176. }
  177. }
  178. }
  179. // Listens for inbound connections
  180. func (g *GrpcTransport) listen() {
  181. g.server.Serve(g.sock)
  182. }
  183. // GetSuccessor the successor ID of a remote node.
  184. func (g *GrpcTransport) GetSuccessor(node *models.Node) (*models.Node, error) {
  185. client, err := g.getConn(node.Addr)
  186. if err != nil {
  187. return nil, err
  188. }
  189. ctx, cancel := context.WithTimeout(context.Background(), g.timeout)
  190. defer cancel()
  191. return client.GetSuccessor(ctx, emptyRequest)
  192. }
  193. // FindSuccessor the successor ID of a remote node.
  194. func (g *GrpcTransport) FindSuccessor(node *models.Node, id []byte) (*models.Node, error) {
  195. // fmt.Println("yo", node.Id, id)
  196. client, err := g.getConn(node.Addr)
  197. if err != nil {
  198. return nil, err
  199. }
  200. ctx, cancel := context.WithTimeout(context.Background(), g.timeout)
  201. defer cancel()
  202. return client.FindSuccessor(ctx, &models.ID{Id: id})
  203. }
  204. // GetPredecessor the successor ID of a remote node.
  205. func (g *GrpcTransport) GetPredecessor(node *models.Node) (*models.Node, error) {
  206. client, err := g.getConn(node.Addr)
  207. if err != nil {
  208. return nil, err
  209. }
  210. ctx, cancel := context.WithTimeout(context.Background(), g.timeout)
  211. defer cancel()
  212. return client.GetPredecessor(ctx, emptyRequest)
  213. }
  214. func (g *GrpcTransport) SetPredecessor(node *models.Node, pred *models.Node) error {
  215. client, err := g.getConn(node.Addr)
  216. if err != nil {
  217. return err
  218. }
  219. ctx, cancel := context.WithTimeout(context.Background(), g.timeout)
  220. defer cancel()
  221. _, err = client.SetPredecessor(ctx, pred)
  222. return err
  223. }
  224. func (g *GrpcTransport) SetSuccessor(node *models.Node, succ *models.Node) error {
  225. client, err := g.getConn(node.Addr)
  226. if err != nil {
  227. return err
  228. }
  229. ctx, cancel := context.WithTimeout(context.Background(), g.timeout)
  230. defer cancel()
  231. _, err = client.SetSuccessor(ctx, succ)
  232. return err
  233. }
  234. func (g *GrpcTransport) Notify(node, pred *models.Node) error {
  235. client, err := g.getConn(node.Addr)
  236. if err != nil {
  237. return err
  238. }
  239. ctx, cancel := context.WithTimeout(context.Background(), g.timeout)
  240. defer cancel()
  241. _, err = client.Notify(ctx, pred)
  242. return err
  243. }
  244. func (g *GrpcTransport) CheckPredecessor(node *models.Node) error {
  245. client, err := g.getConn(node.Addr)
  246. if err != nil {
  247. return err
  248. }
  249. ctx, cancel := context.WithTimeout(context.Background(), g.timeout)
  250. defer cancel()
  251. _, err = client.CheckPredecessor(ctx, &models.ID{Id: node.Id})
  252. return err
  253. }
  254. func (g *GrpcTransport) GetKey(node *models.Node, key string) (*models.GetResponse, error) {
  255. client, err := g.getConn(node.Addr)
  256. if err != nil {
  257. return nil, err
  258. }
  259. ctx, cancel := context.WithTimeout(context.Background(), g.timeout)
  260. defer cancel()
  261. return client.XGet(ctx, &models.GetRequest{Key: key})
  262. }
  263. func (g *GrpcTransport) SetKey(node *models.Node, key, value string) error {
  264. client, err := g.getConn(node.Addr)
  265. if err != nil {
  266. return err
  267. }
  268. ctx, cancel := context.WithTimeout(context.Background(), g.timeout)
  269. defer cancel()
  270. _, err = client.XSet(ctx, &models.SetRequest{Key: key, Value: value})
  271. return err
  272. }
  273. func (g *GrpcTransport) DeleteKey(node *models.Node, key string) error {
  274. client, err := g.getConn(node.Addr)
  275. if err != nil {
  276. return err
  277. }
  278. ctx, cancel := context.WithTimeout(context.Background(), g.timeout)
  279. defer cancel()
  280. _, err = client.XDelete(ctx, &models.DeleteRequest{Key: key})
  281. return err
  282. }
  283. func (g *GrpcTransport) RequestKeys(node *models.Node, from, to []byte) ([]*models.KV, error) {
  284. client, err := g.getConn(node.Addr)
  285. if err != nil {
  286. return nil, err
  287. }
  288. ctx, cancel := context.WithTimeout(context.Background(), g.timeout)
  289. defer cancel()
  290. val, err := client.XRequestKeys(
  291. ctx, &models.RequestKeysRequest{From: from, To: to},
  292. )
  293. if err != nil {
  294. return nil, err
  295. }
  296. return val.Values, nil
  297. }
  298. func (g *GrpcTransport) DeleteKeys(node *models.Node, keys []string) error {
  299. client, err := g.getConn(node.Addr)
  300. if err != nil {
  301. return err
  302. }
  303. ctx, cancel := context.WithTimeout(context.Background(), g.timeout)
  304. defer cancel()
  305. _, err = client.XMultiDelete(
  306. ctx, &models.MultiDeleteRequest{Keys: keys},
  307. )
  308. return err
  309. }