You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

143 lines
3.4 KiB

  1. package couchbase
  2. import (
  3. "github.com/couchbase/gomemcached/client"
  4. "github.com/couchbase/goutils/logging"
  5. "sync"
  6. "time"
  7. )
  // Reconnect back-off bounds used by TapFeed.run: the delay starts at
  // initialRetryInterval, doubles after every wait, and is capped at
  // maximumRetryInterval.
  const (
  	initialRetryInterval = 1 * time.Second
  	maximumRetryInterval = 30 * time.Second
  )
  10. // A TapFeed streams mutation events from a bucket.
  11. //
  12. // Events from the bucket can be read from the channel 'C'. Remember
  13. // to call Close() on it when you're done, unless its channel has
  14. // closed itself already.
  15. type TapFeed struct {
  16. C <-chan memcached.TapEvent
  17. bucket *Bucket
  18. args *memcached.TapArguments
  19. nodeFeeds []*memcached.TapFeed // The TAP feeds of the individual nodes
  20. output chan memcached.TapEvent // Same as C but writeably-typed
  21. wg sync.WaitGroup
  22. quit chan bool
  23. }
  24. // StartTapFeed creates and starts a new Tap feed
  25. func (b *Bucket) StartTapFeed(args *memcached.TapArguments) (*TapFeed, error) {
  26. if args == nil {
  27. defaultArgs := memcached.DefaultTapArguments()
  28. args = &defaultArgs
  29. }
  30. feed := &TapFeed{
  31. bucket: b,
  32. args: args,
  33. output: make(chan memcached.TapEvent, 10),
  34. quit: make(chan bool),
  35. }
  36. go feed.run()
  37. feed.C = feed.output
  38. return feed, nil
  39. }
  40. // Goroutine that runs the feed
  41. func (feed *TapFeed) run() {
  42. retryInterval := initialRetryInterval
  43. bucketOK := true
  44. for {
  45. // Connect to the TAP feed of each server node:
  46. if bucketOK {
  47. killSwitch, err := feed.connectToNodes()
  48. if err == nil {
  49. // Run until one of the sub-feeds fails:
  50. select {
  51. case <-killSwitch:
  52. case <-feed.quit:
  53. return
  54. }
  55. feed.closeNodeFeeds()
  56. retryInterval = initialRetryInterval
  57. }
  58. }
  59. // On error, try to refresh the bucket in case the list of nodes changed:
  60. logging.Infof("go-couchbase: TAP connection lost; reconnecting to bucket %q in %v",
  61. feed.bucket.Name, retryInterval)
  62. err := feed.bucket.Refresh()
  63. bucketOK = err == nil
  64. select {
  65. case <-time.After(retryInterval):
  66. case <-feed.quit:
  67. return
  68. }
  69. if retryInterval *= 2; retryInterval > maximumRetryInterval {
  70. retryInterval = maximumRetryInterval
  71. }
  72. }
  73. }
  74. func (feed *TapFeed) connectToNodes() (killSwitch chan bool, err error) {
  75. killSwitch = make(chan bool)
  76. for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) {
  77. var singleFeed *memcached.TapFeed
  78. singleFeed, err = serverConn.StartTapFeed(feed.args)
  79. if err != nil {
  80. logging.Errorf("go-couchbase: Error connecting to tap feed of %s: %v", serverConn.host, err)
  81. feed.closeNodeFeeds()
  82. return
  83. }
  84. feed.nodeFeeds = append(feed.nodeFeeds, singleFeed)
  85. go feed.forwardTapEvents(singleFeed, killSwitch, serverConn.host)
  86. feed.wg.Add(1)
  87. }
  88. return
  89. }
  90. // Goroutine that forwards Tap events from a single node's feed to the aggregate feed.
  91. func (feed *TapFeed) forwardTapEvents(singleFeed *memcached.TapFeed, killSwitch chan bool, host string) {
  92. defer feed.wg.Done()
  93. for {
  94. select {
  95. case event, ok := <-singleFeed.C:
  96. if !ok {
  97. if singleFeed.Error != nil {
  98. logging.Errorf("go-couchbase: Tap feed from %s failed: %v", host, singleFeed.Error)
  99. }
  100. killSwitch <- true
  101. return
  102. }
  103. feed.output <- event
  104. case <-feed.quit:
  105. return
  106. }
  107. }
  108. }
  109. func (feed *TapFeed) closeNodeFeeds() {
  110. for _, f := range feed.nodeFeeds {
  111. f.Close()
  112. }
  113. feed.nodeFeeds = nil
  114. }
  115. // Close a Tap feed.
  116. func (feed *TapFeed) Close() error {
  117. select {
  118. case <-feed.quit:
  119. return nil
  120. default:
  121. }
  122. feed.closeNodeFeeds()
  123. close(feed.quit)
  124. feed.wg.Wait()
  125. close(feed.output)
  126. return nil
  127. }