resp.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. package goredis
  2. import (
  3. "bufio"
  4. "bytes"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "strconv"
  9. )
  10. type Error string
  11. func (err Error) Error() string { return string(err) }
// Pre-boxed replies returned by RespReader.Parse for the "+OK" and "+PONG"
// simple strings, so those frequent responses need no per-call allocation.
var (
	okReply   interface{} = "OK"
	pongReply interface{} = "PONG"
)
// RespReader decodes RESP messages from a buffered reader.
type RespReader struct {
	// br is the source every parse method reads from.
	br *bufio.Reader
}
  19. func NewRespReader(br *bufio.Reader) *RespReader {
  20. r := &RespReader{br}
  21. return r
  22. }
  23. // Parse RESP
  24. func (resp *RespReader) Parse() (interface{}, error) {
  25. line, err := readLine(resp.br)
  26. if err != nil {
  27. return nil, err
  28. }
  29. if len(line) == 0 {
  30. return nil, errors.New("short resp line")
  31. }
  32. switch line[0] {
  33. case '+':
  34. switch {
  35. case len(line) == 3 && line[1] == 'O' && line[2] == 'K':
  36. // Avoid allocation for frequent "+OK" response.
  37. return okReply, nil
  38. case len(line) == 5 && line[1] == 'P' && line[2] == 'O' && line[3] == 'N' && line[4] == 'G':
  39. // Avoid allocation in PING command benchmarks :)
  40. return pongReply, nil
  41. default:
  42. return string(line[1:]), nil
  43. }
  44. case '-':
  45. return Error(string(line[1:])), nil
  46. case ':':
  47. n, err := parseInt(line[1:])
  48. return n, err
  49. case '$':
  50. n, err := parseLen(line[1:])
  51. if n < 0 || err != nil {
  52. return nil, err
  53. }
  54. p := make([]byte, n)
  55. _, err = io.ReadFull(resp.br, p)
  56. if err != nil {
  57. return nil, err
  58. }
  59. if line, err := readLine(resp.br); err != nil {
  60. return nil, err
  61. } else if len(line) != 0 {
  62. return nil, errors.New("bad bulk string format")
  63. }
  64. return p, nil
  65. case '*':
  66. n, err := parseLen(line[1:])
  67. if n < 0 || err != nil {
  68. return nil, err
  69. }
  70. r := make([]interface{}, n)
  71. for i := range r {
  72. r[i], err = resp.Parse()
  73. if err != nil {
  74. return nil, err
  75. }
  76. }
  77. return r, nil
  78. }
  79. return nil, errors.New("unexpected response line")
  80. }
  81. // Parse client -> server command request, must be array of bulk strings
  82. func (resp *RespReader) ParseRequest() ([][]byte, error) {
  83. line, err := readLine(resp.br)
  84. if err != nil {
  85. return nil, err
  86. }
  87. if len(line) == 0 {
  88. return nil, errors.New("short resp line")
  89. }
  90. switch line[0] {
  91. case '*':
  92. n, err := parseLen(line[1:])
  93. if n < 0 || err != nil {
  94. return nil, err
  95. }
  96. r := make([][]byte, n)
  97. for i := range r {
  98. r[i], err = parseBulk(resp.br)
  99. if err != nil {
  100. return nil, err
  101. }
  102. }
  103. return r, nil
  104. default:
  105. return nil, fmt.Errorf("not invalid array of bulk string type, but %c", line[0])
  106. }
  107. }
  108. // Parse bulk string and write it with writer w
  109. func (resp *RespReader) ParseBulkTo(w io.Writer) error {
  110. line, err := readLine(resp.br)
  111. if err != nil {
  112. return err
  113. }
  114. if len(line) == 0 {
  115. return errors.New("ledis: short response line")
  116. }
  117. switch line[0] {
  118. case '-':
  119. return Error(string(line[1:]))
  120. case '$':
  121. n, err := parseLen(line[1:])
  122. if n < 0 || err != nil {
  123. return err
  124. }
  125. var nn int64
  126. if nn, err = io.CopyN(w, resp.br, int64(n)); err != nil {
  127. return err
  128. } else if nn != int64(n) {
  129. return io.ErrShortWrite
  130. }
  131. if line, err := readLine(resp.br); err != nil {
  132. return err
  133. } else if len(line) != 0 {
  134. return errors.New("bad bulk string format")
  135. }
  136. return nil
  137. default:
  138. return fmt.Errorf("not invalid bulk string type, but %c", line[0])
  139. }
  140. }
  141. func readLine(br *bufio.Reader) ([]byte, error) {
  142. p, err := br.ReadSlice('\n')
  143. if err == bufio.ErrBufferFull {
  144. return nil, errors.New("long resp line")
  145. }
  146. if err != nil {
  147. return nil, err
  148. }
  149. i := len(p) - 2
  150. if i < 0 || p[i] != '\r' {
  151. return nil, errors.New("bad resp line terminator")
  152. }
  153. return p[:i], nil
  154. }
  155. // parseLen parses bulk string and array lengths.
  156. func parseLen(p []byte) (int, error) {
  157. if len(p) == 0 {
  158. return -1, errors.New("malformed length")
  159. }
  160. if p[0] == '-' && len(p) == 2 && p[1] == '1' {
  161. // handle $-1 and $-1 null replies.
  162. return -1, nil
  163. }
  164. var n int
  165. for _, b := range p {
  166. n *= 10
  167. if b < '0' || b > '9' {
  168. return -1, errors.New("illegal bytes in length")
  169. }
  170. n += int(b - '0')
  171. }
  172. return n, nil
  173. }
  174. // parseInt parses an integer reply.
  175. func parseInt(p []byte) (int64, error) {
  176. if len(p) == 0 {
  177. return 0, errors.New("malformed integer")
  178. }
  179. var negate bool
  180. if p[0] == '-' {
  181. negate = true
  182. p = p[1:]
  183. if len(p) == 0 {
  184. return 0, errors.New("malformed integer")
  185. }
  186. }
  187. var n int64
  188. for _, b := range p {
  189. n *= 10
  190. if b < '0' || b > '9' {
  191. return 0, errors.New("illegal bytes in length")
  192. }
  193. n += int64(b - '0')
  194. }
  195. if negate {
  196. n = -n
  197. }
  198. return n, nil
  199. }
  200. func parseBulk(br *bufio.Reader) ([]byte, error) {
  201. line, err := readLine(br)
  202. if err != nil {
  203. return nil, err
  204. }
  205. if len(line) == 0 {
  206. return nil, errors.New("short resp line")
  207. }
  208. switch line[0] {
  209. case '$':
  210. n, err := parseLen(line[1:])
  211. if n < 0 || err != nil {
  212. return nil, err
  213. }
  214. p := make([]byte, n)
  215. _, err = io.ReadFull(br, p)
  216. if err != nil {
  217. return nil, err
  218. }
  219. if line, err := readLine(br); err != nil {
  220. return nil, err
  221. } else if len(line) != 0 {
  222. return nil, errors.New("bad bulk string format")
  223. }
  224. return p, nil
  225. default:
  226. return nil, fmt.Errorf("not invalid bulk string type, but %c", line[0])
  227. }
  228. }
var (
	// intBuffer holds pre-rendered decimal text for small integers; it is
	// filled by init and indexed by value in writeInteger.
	intBuffer [][]byte
	// respTerm is the CRLF written at the end of every RESP line.
	respTerm = []byte("\r\n")
	// nullBulk is the length field written for a nil bulk string ("$-1").
	nullBulk = []byte("-1")
	// nullArray is the length field written for a nil array ("*-1").
	nullArray = []byte("-1")
)
  235. func init() {
  236. cnt := 10000
  237. intBuffer = make([][]byte, cnt)
  238. for i := 0; i < cnt; i++ {
  239. intBuffer[i] = []byte(strconv.Itoa(i))
  240. }
  241. }
// RespWriter encodes RESP messages onto a buffered writer. Call Flush to push
// buffered output to the underlying writer.
type RespWriter struct {
	// bw receives all encoded output.
	bw *bufio.Writer
	// Scratch space for formatting integers and floats.
	numScratch [40]byte
}
  247. func NewRespWriter(bw *bufio.Writer) *RespWriter {
  248. r := &RespWriter{bw: bw}
  249. return r
  250. }
  251. func (resp *RespWriter) Flush() error {
  252. return resp.bw.Flush()
  253. }
  254. func (resp *RespWriter) writeTerm() error {
  255. _, err := resp.bw.Write(respTerm)
  256. return err
  257. }
  258. func (resp *RespWriter) writeInteger(n int64) error {
  259. var err error
  260. if n < int64(len(intBuffer)) {
  261. _, err = resp.bw.Write(intBuffer[n])
  262. } else {
  263. _, err = resp.bw.Write(strconv.AppendInt(nil, n, 10))
  264. }
  265. return err
  266. }
  267. func (resp *RespWriter) WriteInteger(n int64) error {
  268. resp.bw.WriteByte(':')
  269. resp.writeInteger(n)
  270. return resp.writeTerm()
  271. }
  272. func (resp *RespWriter) FlushInteger(n int64) error {
  273. resp.WriteInteger(n)
  274. return resp.Flush()
  275. }
  276. func (resp *RespWriter) WriteString(s string) error {
  277. resp.bw.WriteByte('+')
  278. resp.bw.WriteString(s)
  279. return resp.writeTerm()
  280. }
  281. func (resp *RespWriter) FlushString(s string) error {
  282. resp.WriteString(s)
  283. return resp.Flush()
  284. }
  285. func (resp *RespWriter) WriteError(e error) error {
  286. resp.bw.WriteByte('-')
  287. if e != nil {
  288. resp.bw.WriteString(e.Error())
  289. } else {
  290. resp.bw.WriteString("error is nil, invalid")
  291. }
  292. return resp.writeTerm()
  293. }
  294. func (resp *RespWriter) FlushError(e error) error {
  295. resp.WriteError(e)
  296. return resp.Flush()
  297. }
  298. func (resp *RespWriter) WriteBulk(b []byte) error {
  299. resp.bw.WriteByte('$')
  300. if b == nil {
  301. resp.bw.Write(nullBulk)
  302. } else {
  303. resp.writeInteger(int64(len(b)))
  304. resp.writeTerm()
  305. resp.bw.Write(b)
  306. }
  307. return resp.writeTerm()
  308. }
  309. func (resp *RespWriter) FlushBulk(b []byte) error {
  310. resp.WriteBulk(b)
  311. return resp.Flush()
  312. }
  313. func (resp *RespWriter) WriteArray(ay []interface{}) error {
  314. resp.bw.WriteByte('*')
  315. if ay == nil {
  316. resp.bw.Write(nullArray)
  317. return resp.writeTerm()
  318. } else {
  319. resp.writeInteger(int64(len(ay)))
  320. resp.writeTerm()
  321. var err error
  322. for i := 0; i < len(ay); i++ {
  323. if err != nil {
  324. return err
  325. }
  326. switch v := ay[i].(type) {
  327. case []interface{}:
  328. err = resp.WriteArray(v)
  329. case []byte:
  330. err = resp.WriteBulk(v)
  331. case nil:
  332. err = resp.WriteBulk(nil)
  333. case int64:
  334. err = resp.WriteInteger(v)
  335. case string:
  336. err = resp.WriteString(v)
  337. case error:
  338. err = resp.WriteError(v)
  339. default:
  340. err = fmt.Errorf("invalid array type %T %v", ay[i], v)
  341. }
  342. }
  343. return err
  344. }
  345. }
  346. func (resp *RespWriter) FlushArray(ay []interface{}) error {
  347. resp.WriteArray(ay)
  348. return resp.Flush()
  349. }
  350. func (resp *RespWriter) writeBulkString(s string) error {
  351. resp.bw.WriteByte('$')
  352. resp.writeInteger(int64(len(s)))
  353. resp.writeTerm()
  354. resp.bw.WriteString(s)
  355. return resp.writeTerm()
  356. }
  357. func (resp *RespWriter) writeBulkInt64(n int64) error {
  358. return resp.WriteBulk(strconv.AppendInt(resp.numScratch[:0], n, 10))
  359. }
  360. func (resp *RespWriter) writeBulkFloat64(n float64) error {
  361. return resp.WriteBulk(strconv.AppendFloat(resp.numScratch[:0], n, 'g', -1, 64))
  362. }
  363. // RESP command is array of bulk string
  364. func (resp *RespWriter) WriteCommand(cmd string, args ...interface{}) error {
  365. resp.bw.WriteByte('*')
  366. resp.writeInteger(int64(1 + len(args)))
  367. resp.writeTerm()
  368. err := resp.writeBulkString(cmd)
  369. for _, arg := range args {
  370. if err != nil {
  371. break
  372. }
  373. switch arg := arg.(type) {
  374. case string:
  375. err = resp.writeBulkString(arg)
  376. case []byte:
  377. err = resp.WriteBulk(arg)
  378. case int:
  379. err = resp.writeBulkInt64(int64(arg))
  380. case int64:
  381. err = resp.writeBulkInt64(arg)
  382. case float64:
  383. err = resp.writeBulkFloat64(arg)
  384. case bool:
  385. if arg {
  386. err = resp.writeBulkString("1")
  387. } else {
  388. err = resp.writeBulkString("0")
  389. }
  390. case nil:
  391. err = resp.writeBulkString("")
  392. default:
  393. var buf bytes.Buffer
  394. fmt.Fprint(&buf, arg)
  395. err = resp.WriteBulk(buf.Bytes())
  396. }
  397. }
  398. if err != nil {
  399. return err
  400. }
  401. return resp.Flush()
  402. }