You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

228 lines
6.1 KiB

  1. // Copyright 2012, Google Inc. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package pools provides functionality to manage and reuse resources
  5. // like connections.
  6. package pools
  7. import (
  8. "fmt"
  9. "time"
  10. "github.com/ngaut/sync2"
  11. )
  12. var (
  13. CLOSED_ERR = fmt.Errorf("ResourcePool is closed")
  14. )
  15. // Factory is a function that can be used to create a resource.
  16. type Factory func() (Resource, error)
  17. // Every resource needs to suport the Resource interface.
  18. // Thread synchronization between Close() and IsClosed()
  19. // is the responsibility the caller.
  20. type Resource interface {
  21. Close()
  22. }
  23. // ResourcePool allows you to use a pool of resources.
  24. type ResourcePool struct {
  25. resources chan resourceWrapper
  26. factory Factory
  27. capacity sync2.AtomicInt64
  28. idleTimeout sync2.AtomicDuration
  29. // stats
  30. waitCount sync2.AtomicInt64
  31. waitTime sync2.AtomicDuration
  32. }
  33. type resourceWrapper struct {
  34. resource Resource
  35. timeUsed time.Time
  36. }
  37. // NewResourcePool creates a new ResourcePool pool.
  38. // capacity is the initial capacity of the pool.
  39. // maxCap is the maximum capacity.
  40. // If a resource is unused beyond idleTimeout, it's discarded.
  41. // An idleTimeout of 0 means that there is no timeout.
  42. func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration) *ResourcePool {
  43. if capacity <= 0 || maxCap <= 0 || capacity > maxCap {
  44. panic(fmt.Errorf("Invalid/out of range capacity"))
  45. }
  46. rp := &ResourcePool{
  47. resources: make(chan resourceWrapper, maxCap),
  48. factory: factory,
  49. capacity: sync2.AtomicInt64(capacity),
  50. idleTimeout: sync2.AtomicDuration(idleTimeout),
  51. }
  52. for i := 0; i < capacity; i++ {
  53. rp.resources <- resourceWrapper{}
  54. }
  55. return rp
  56. }
  57. // Close empties the pool calling Close on all its resources.
  58. // You can call Close while there are outstanding resources.
  59. // It waits for all resources to be returned (Put).
  60. // After a Close, Get and TryGet are not allowed.
  61. func (rp *ResourcePool) Close() {
  62. rp.SetCapacity(0)
  63. }
  64. func (rp *ResourcePool) IsClosed() (closed bool) {
  65. return rp.capacity.Get() == 0
  66. }
  67. // Get will return the next available resource. If capacity
  68. // has not been reached, it will create a new one using the factory. Otherwise,
  69. // it will indefinitely wait till the next resource becomes available.
  70. func (rp *ResourcePool) Get() (resource Resource, err error) {
  71. return rp.get(true)
  72. }
  73. // TryGet will return the next available resource. If none is available, and capacity
  74. // has not been reached, it will create a new one using the factory. Otherwise,
  75. // it will return nil with no error.
  76. func (rp *ResourcePool) TryGet() (resource Resource, err error) {
  77. return rp.get(false)
  78. }
  79. func (rp *ResourcePool) get(wait bool) (resource Resource, err error) {
  80. // Fetch
  81. var wrapper resourceWrapper
  82. var ok bool
  83. select {
  84. case wrapper, ok = <-rp.resources:
  85. default:
  86. if !wait {
  87. return nil, nil
  88. }
  89. startTime := time.Now()
  90. wrapper, ok = <-rp.resources
  91. rp.recordWait(startTime)
  92. }
  93. if !ok {
  94. return nil, CLOSED_ERR
  95. }
  96. // Unwrap
  97. timeout := rp.idleTimeout.Get()
  98. if wrapper.resource != nil && timeout > 0 && wrapper.timeUsed.Add(timeout).Sub(time.Now()) < 0 {
  99. wrapper.resource.Close()
  100. wrapper.resource = nil
  101. }
  102. if wrapper.resource == nil {
  103. wrapper.resource, err = rp.factory()
  104. if err != nil {
  105. rp.resources <- resourceWrapper{}
  106. }
  107. }
  108. return wrapper.resource, err
  109. }
  110. // Put will return a resource to the pool. For every successful Get,
  111. // a corresponding Put is required. If you no longer need a resource,
  112. // you will need to call Put(nil) instead of returning the closed resource.
  113. // The will eventually cause a new resource to be created in its place.
  114. func (rp *ResourcePool) Put(resource Resource) {
  115. var wrapper resourceWrapper
  116. if resource != nil {
  117. wrapper = resourceWrapper{resource, time.Now()}
  118. }
  119. select {
  120. case rp.resources <- wrapper:
  121. default:
  122. panic(fmt.Errorf("Attempt to Put into a full ResourcePool"))
  123. }
  124. }
  125. // SetCapacity changes the capacity of the pool.
  126. // You can use it to shrink or expand, but not beyond
  127. // the max capacity. If the change requires the pool
  128. // to be shrunk, SetCapacity waits till the necessary
  129. // number of resources are returned to the pool.
  130. // A SetCapacity of 0 is equivalent to closing the ResourcePool.
  131. func (rp *ResourcePool) SetCapacity(capacity int) error {
  132. if capacity < 0 || capacity > cap(rp.resources) {
  133. return fmt.Errorf("capacity %d is out of range", capacity)
  134. }
  135. // Atomically swap new capacity with old, but only
  136. // if old capacity is non-zero.
  137. var oldcap int
  138. for {
  139. oldcap = int(rp.capacity.Get())
  140. if oldcap == 0 {
  141. return CLOSED_ERR
  142. }
  143. if oldcap == capacity {
  144. return nil
  145. }
  146. if rp.capacity.CompareAndSwap(int64(oldcap), int64(capacity)) {
  147. break
  148. }
  149. }
  150. if capacity < oldcap {
  151. for i := 0; i < oldcap-capacity; i++ {
  152. wrapper := <-rp.resources
  153. if wrapper.resource != nil {
  154. wrapper.resource.Close()
  155. }
  156. }
  157. } else {
  158. for i := 0; i < capacity-oldcap; i++ {
  159. rp.resources <- resourceWrapper{}
  160. }
  161. }
  162. if capacity == 0 {
  163. close(rp.resources)
  164. }
  165. return nil
  166. }
  167. func (rp *ResourcePool) recordWait(start time.Time) {
  168. rp.waitCount.Add(1)
  169. rp.waitTime.Add(time.Now().Sub(start))
  170. }
  171. func (rp *ResourcePool) SetIdleTimeout(idleTimeout time.Duration) {
  172. rp.idleTimeout.Set(idleTimeout)
  173. }
  174. func (rp *ResourcePool) StatsJSON() string {
  175. c, a, mx, wc, wt, it := rp.Stats()
  176. return fmt.Sprintf(`{"Capacity": %v, "Available": %v, "MaxCapacity": %v, "WaitCount": %v, "WaitTime": %v, "IdleTimeout": %v}`, c, a, mx, wc, int64(wt), int64(it))
  177. }
  178. func (rp *ResourcePool) Stats() (capacity, available, maxCap, waitCount int64, waitTime, idleTimeout time.Duration) {
  179. return rp.Capacity(), rp.Available(), rp.MaxCap(), rp.WaitCount(), rp.WaitTime(), rp.IdleTimeout()
  180. }
  181. func (rp *ResourcePool) Capacity() int64 {
  182. return rp.capacity.Get()
  183. }
  184. func (rp *ResourcePool) Available() int64 {
  185. return int64(len(rp.resources))
  186. }
  187. func (rp *ResourcePool) MaxCap() int64 {
  188. return int64(cap(rp.resources))
  189. }
  190. func (rp *ResourcePool) WaitCount() int64 {
  191. return rp.waitCount.Get()
  192. }
  193. func (rp *ResourcePool) WaitTime() time.Duration {
  194. return rp.waitTime.Get()
  195. }
  196. func (rp *ResourcePool) IdleTimeout() time.Duration {
  197. return rp.idleTimeout.Get()
  198. }