You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

405 lines
6.6 KiB

  1. package redis
  2. import (
  3. "container/list"
  4. "errors"
  5. "log"
  6. "net"
  7. "sync"
  8. "time"
  9. "gopkg.in/bufio.v1"
  10. )
  11. var (
  12. errClosed = errors.New("redis: client is closed")
  13. errRateLimited = errors.New("redis: you open connections too fast")
  14. )
  15. var (
  16. zeroTime = time.Time{}
  17. )
  18. type pool interface {
  19. Get() (*conn, bool, error)
  20. Put(*conn) error
  21. Remove(*conn) error
  22. Len() int
  23. Size() int
  24. Close() error
  25. Filter(func(*conn) bool)
  26. }
  27. //------------------------------------------------------------------------------
  28. type conn struct {
  29. netcn net.Conn
  30. rd *bufio.Reader
  31. buf []byte
  32. inUse bool
  33. usedAt time.Time
  34. readTimeout time.Duration
  35. writeTimeout time.Duration
  36. elem *list.Element
  37. }
  38. func newConnFunc(dial func() (net.Conn, error)) func() (*conn, error) {
  39. return func() (*conn, error) {
  40. netcn, err := dial()
  41. if err != nil {
  42. return nil, err
  43. }
  44. cn := &conn{
  45. netcn: netcn,
  46. buf: make([]byte, 0, 64),
  47. }
  48. cn.rd = bufio.NewReader(cn)
  49. return cn, nil
  50. }
  51. }
  52. func (cn *conn) Read(b []byte) (int, error) {
  53. if cn.readTimeout != 0 {
  54. cn.netcn.SetReadDeadline(time.Now().Add(cn.readTimeout))
  55. } else {
  56. cn.netcn.SetReadDeadline(zeroTime)
  57. }
  58. return cn.netcn.Read(b)
  59. }
  60. func (cn *conn) Write(b []byte) (int, error) {
  61. if cn.writeTimeout != 0 {
  62. cn.netcn.SetWriteDeadline(time.Now().Add(cn.writeTimeout))
  63. } else {
  64. cn.netcn.SetWriteDeadline(zeroTime)
  65. }
  66. return cn.netcn.Write(b)
  67. }
  68. func (cn *conn) RemoteAddr() net.Addr {
  69. return cn.netcn.RemoteAddr()
  70. }
  71. func (cn *conn) Close() error {
  72. return cn.netcn.Close()
  73. }
  74. //------------------------------------------------------------------------------
  75. type connPool struct {
  76. dial func() (*conn, error)
  77. rl *rateLimiter
  78. opt *options
  79. cond *sync.Cond
  80. conns *list.List
  81. idleNum int
  82. closed bool
  83. }
  84. func newConnPool(dial func() (*conn, error), opt *options) *connPool {
  85. return &connPool{
  86. dial: dial,
  87. rl: newRateLimiter(time.Second, 2*opt.PoolSize),
  88. opt: opt,
  89. cond: sync.NewCond(&sync.Mutex{}),
  90. conns: list.New(),
  91. }
  92. }
  93. func (p *connPool) new() (*conn, error) {
  94. if !p.rl.Check() {
  95. return nil, errRateLimited
  96. }
  97. return p.dial()
  98. }
  99. func (p *connPool) Get() (*conn, bool, error) {
  100. p.cond.L.Lock()
  101. if p.closed {
  102. p.cond.L.Unlock()
  103. return nil, false, errClosed
  104. }
  105. if p.opt.IdleTimeout > 0 {
  106. for el := p.conns.Front(); el != nil; el = el.Next() {
  107. cn := el.Value.(*conn)
  108. if cn.inUse {
  109. break
  110. }
  111. if time.Since(cn.usedAt) > p.opt.IdleTimeout {
  112. if err := p.remove(cn); err != nil {
  113. log.Printf("remove failed: %s", err)
  114. }
  115. }
  116. }
  117. }
  118. for p.conns.Len() >= p.opt.PoolSize && p.idleNum == 0 {
  119. p.cond.Wait()
  120. }
  121. if p.idleNum > 0 {
  122. elem := p.conns.Front()
  123. cn := elem.Value.(*conn)
  124. if cn.inUse {
  125. panic("pool: precondition failed")
  126. }
  127. cn.inUse = true
  128. p.conns.MoveToBack(elem)
  129. p.idleNum--
  130. p.cond.L.Unlock()
  131. return cn, false, nil
  132. }
  133. if p.conns.Len() < p.opt.PoolSize {
  134. cn, err := p.new()
  135. if err != nil {
  136. p.cond.L.Unlock()
  137. return nil, false, err
  138. }
  139. cn.inUse = true
  140. cn.elem = p.conns.PushBack(cn)
  141. p.cond.L.Unlock()
  142. return cn, true, nil
  143. }
  144. panic("not reached")
  145. }
  146. func (p *connPool) Put(cn *conn) error {
  147. if cn.rd.Buffered() != 0 {
  148. b, _ := cn.rd.ReadN(cn.rd.Buffered())
  149. log.Printf("redis: connection has unread data: %q", b)
  150. return p.Remove(cn)
  151. }
  152. if p.opt.IdleTimeout > 0 {
  153. cn.usedAt = time.Now()
  154. }
  155. p.cond.L.Lock()
  156. if p.closed {
  157. p.cond.L.Unlock()
  158. return errClosed
  159. }
  160. cn.inUse = false
  161. p.conns.MoveToFront(cn.elem)
  162. p.idleNum++
  163. p.cond.Signal()
  164. p.cond.L.Unlock()
  165. return nil
  166. }
  167. func (p *connPool) Remove(cn *conn) error {
  168. p.cond.L.Lock()
  169. if p.closed {
  170. // Noop, connection is already closed.
  171. p.cond.L.Unlock()
  172. return nil
  173. }
  174. err := p.remove(cn)
  175. p.cond.Signal()
  176. p.cond.L.Unlock()
  177. return err
  178. }
  179. func (p *connPool) remove(cn *conn) error {
  180. p.conns.Remove(cn.elem)
  181. cn.elem = nil
  182. if !cn.inUse {
  183. p.idleNum--
  184. }
  185. return cn.Close()
  186. }
  187. // Len returns number of idle connections.
  188. func (p *connPool) Len() int {
  189. defer p.cond.L.Unlock()
  190. p.cond.L.Lock()
  191. return p.idleNum
  192. }
  193. // Size returns number of connections in the pool.
  194. func (p *connPool) Size() int {
  195. defer p.cond.L.Unlock()
  196. p.cond.L.Lock()
  197. return p.conns.Len()
  198. }
  199. func (p *connPool) Filter(f func(*conn) bool) {
  200. p.cond.L.Lock()
  201. for el, next := p.conns.Front(), p.conns.Front(); el != nil; el = next {
  202. next = el.Next()
  203. cn := el.Value.(*conn)
  204. if !f(cn) {
  205. p.remove(cn)
  206. }
  207. }
  208. p.cond.L.Unlock()
  209. }
  210. func (p *connPool) Close() error {
  211. defer p.cond.L.Unlock()
  212. p.cond.L.Lock()
  213. if p.closed {
  214. return nil
  215. }
  216. p.closed = true
  217. p.rl.Close()
  218. var retErr error
  219. for {
  220. e := p.conns.Front()
  221. if e == nil {
  222. break
  223. }
  224. if err := p.remove(e.Value.(*conn)); err != nil {
  225. log.Printf("cn.Close failed: %s", err)
  226. retErr = err
  227. }
  228. }
  229. return retErr
  230. }
  231. //------------------------------------------------------------------------------
  232. type singleConnPool struct {
  233. pool pool
  234. cnMtx sync.Mutex
  235. cn *conn
  236. reusable bool
  237. closed bool
  238. }
  239. func newSingleConnPool(pool pool, reusable bool) *singleConnPool {
  240. return &singleConnPool{
  241. pool: pool,
  242. reusable: reusable,
  243. }
  244. }
  245. func (p *singleConnPool) SetConn(cn *conn) {
  246. p.cnMtx.Lock()
  247. p.cn = cn
  248. p.cnMtx.Unlock()
  249. }
  250. func (p *singleConnPool) Get() (*conn, bool, error) {
  251. defer p.cnMtx.Unlock()
  252. p.cnMtx.Lock()
  253. if p.closed {
  254. return nil, false, errClosed
  255. }
  256. if p.cn != nil {
  257. return p.cn, false, nil
  258. }
  259. cn, isNew, err := p.pool.Get()
  260. if err != nil {
  261. return nil, false, err
  262. }
  263. p.cn = cn
  264. return p.cn, isNew, nil
  265. }
  266. func (p *singleConnPool) Put(cn *conn) error {
  267. defer p.cnMtx.Unlock()
  268. p.cnMtx.Lock()
  269. if p.cn != cn {
  270. panic("p.cn != cn")
  271. }
  272. if p.closed {
  273. return errClosed
  274. }
  275. return nil
  276. }
  277. func (p *singleConnPool) put() error {
  278. err := p.pool.Put(p.cn)
  279. p.cn = nil
  280. return err
  281. }
  282. func (p *singleConnPool) Remove(cn *conn) error {
  283. defer p.cnMtx.Unlock()
  284. p.cnMtx.Lock()
  285. if p.cn == nil {
  286. panic("p.cn == nil")
  287. }
  288. if p.cn != cn {
  289. panic("p.cn != cn")
  290. }
  291. if p.closed {
  292. return errClosed
  293. }
  294. return p.remove()
  295. }
  296. func (p *singleConnPool) remove() error {
  297. err := p.pool.Remove(p.cn)
  298. p.cn = nil
  299. return err
  300. }
  301. func (p *singleConnPool) Len() int {
  302. defer p.cnMtx.Unlock()
  303. p.cnMtx.Lock()
  304. if p.cn == nil {
  305. return 0
  306. }
  307. return 1
  308. }
  309. func (p *singleConnPool) Size() int {
  310. defer p.cnMtx.Unlock()
  311. p.cnMtx.Lock()
  312. if p.cn == nil {
  313. return 0
  314. }
  315. return 1
  316. }
  317. func (p *singleConnPool) Filter(f func(*conn) bool) {
  318. p.cnMtx.Lock()
  319. if p.cn != nil {
  320. if !f(p.cn) {
  321. p.remove()
  322. }
  323. }
  324. p.cnMtx.Unlock()
  325. }
  326. func (p *singleConnPool) Close() error {
  327. defer p.cnMtx.Unlock()
  328. p.cnMtx.Lock()
  329. if p.closed {
  330. return nil
  331. }
  332. p.closed = true
  333. var err error
  334. if p.cn != nil {
  335. if p.reusable {
  336. err = p.put()
  337. } else {
  338. err = p.remove()
  339. }
  340. }
  341. return err
  342. }