client.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. package goredis
  2. import (
  3. "container/list"
  4. "io"
  5. "net"
  6. "strings"
  7. "sync"
  8. "time"
  9. )
  10. type PoolConn struct {
  11. *Conn
  12. c *Client
  13. }
  14. func (c *PoolConn) Close() {
  15. if c.Conn.isClosed() {
  16. return
  17. }
  18. c.c.put(c.Conn)
  19. }
  20. // force close inner connection and not put it into pool
  21. func (c *PoolConn) Finalize() {
  22. c.Conn.Close()
  23. }
  24. type Client struct {
  25. sync.Mutex
  26. addr string
  27. maxIdleConns int
  28. readBufferSize int
  29. writeBufferSize int
  30. password string
  31. conns *list.List
  32. quit chan struct{}
  33. wg sync.WaitGroup
  34. }
  35. func getProto(addr string) string {
  36. if strings.Contains(addr, "/") {
  37. return "unix"
  38. } else {
  39. return "tcp"
  40. }
  41. }
  42. func NewClient(addr string, password string) *Client {
  43. c := new(Client)
  44. c.addr = addr
  45. c.maxIdleConns = 4
  46. c.readBufferSize = 1024
  47. c.writeBufferSize = 1024
  48. c.password = password
  49. c.conns = list.New()
  50. c.quit = make(chan struct{})
  51. c.wg.Add(1)
  52. go c.onCheck()
  53. return c
  54. }
  55. func (c *Client) SetPassword(pass string) {
  56. c.password = pass
  57. }
  58. func (c *Client) SetReadBufferSize(s int) {
  59. c.readBufferSize = s
  60. }
  61. func (c *Client) SetWriteBufferSize(s int) {
  62. c.writeBufferSize = s
  63. }
  64. func (c *Client) SetMaxIdleConns(n int) {
  65. c.maxIdleConns = n
  66. }
  67. func (c *Client) Do(cmd string, args ...interface{}) (interface{}, error) {
  68. var co *Conn
  69. var err error
  70. var r interface{}
  71. for i := 0; i < 2; i++ {
  72. co, err = c.get()
  73. if err != nil {
  74. return nil, err
  75. }
  76. r, err = co.Do(cmd, args...)
  77. if err != nil {
  78. if e, ok := err.(*net.OpError); ok && strings.Contains(e.Error(), "use of closed network connection") {
  79. //send to a closed connection, try again
  80. continue
  81. }
  82. c.put(co)
  83. return nil, err
  84. }
  85. c.put(co)
  86. return r, nil
  87. }
  88. return nil, err
  89. }
  90. func (c *Client) Monitor(respChan chan interface{}, stopChan chan struct{}) error {
  91. var co *Conn
  92. var err error
  93. co, err = c.get()
  94. if err != nil {
  95. return err
  96. }
  97. if err := co.Send("MONITOR"); err != nil {
  98. return err
  99. }
  100. go func() {
  101. defer func() {
  102. c.put(co)
  103. }()
  104. for {
  105. resp, err := co.Receive()
  106. if err != nil {
  107. if e, ok := err.(*net.OpError); ok && strings.Contains(e.Error(), "use of closed network connection") || err == io.EOF {
  108. //the server may has closed the connection
  109. stopChan <- struct{}{}
  110. return
  111. }
  112. respChan <- err
  113. }
  114. respChan <- resp
  115. }
  116. }()
  117. return nil
  118. }
  119. func (c *Client) Close() {
  120. c.Lock()
  121. defer c.Unlock()
  122. close(c.quit)
  123. c.wg.Wait()
  124. for c.conns.Len() > 0 {
  125. e := c.conns.Front()
  126. co := e.Value.(*Conn)
  127. c.conns.Remove(e)
  128. co.Close()
  129. }
  130. }
  131. func (c *Client) Get() (*PoolConn, error) {
  132. co, err := c.get()
  133. if err != nil {
  134. return nil, err
  135. }
  136. return &PoolConn{co, c}, err
  137. }
  138. func (c *Client) get() (co *Conn, err error) {
  139. c.Lock()
  140. if c.conns.Len() == 0 {
  141. c.Unlock()
  142. co, err = c.newConn(c.addr, c.password)
  143. } else {
  144. e := c.conns.Front()
  145. co = e.Value.(*Conn)
  146. c.conns.Remove(e)
  147. c.Unlock()
  148. }
  149. return
  150. }
  151. func (c *Client) put(conn *Conn) {
  152. c.Lock()
  153. defer c.Unlock()
  154. for c.conns.Len() >= c.maxIdleConns {
  155. // remove back
  156. e := c.conns.Back()
  157. co := e.Value.(*Conn)
  158. c.conns.Remove(e)
  159. co.Close()
  160. }
  161. c.conns.PushFront(conn)
  162. }
  163. func (c *Client) getIdle() *Conn {
  164. c.Lock()
  165. defer c.Unlock()
  166. if c.conns.Len() == 0 {
  167. return nil
  168. } else {
  169. e := c.conns.Back()
  170. co := e.Value.(*Conn)
  171. c.conns.Remove(e)
  172. return co
  173. }
  174. }
  175. func (c *Client) checkIdle() {
  176. co := c.getIdle()
  177. if co == nil {
  178. return
  179. }
  180. _, err := co.Do("PING")
  181. if err != nil {
  182. co.Close()
  183. } else {
  184. c.put(co)
  185. }
  186. }
  187. func (c *Client) onCheck() {
  188. t := time.NewTicker(3 * time.Second)
  189. defer func() {
  190. t.Stop()
  191. c.wg.Done()
  192. }()
  193. for {
  194. select {
  195. case <-t.C:
  196. c.checkIdle()
  197. case <-c.quit:
  198. return
  199. }
  200. }
  201. }