You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

291 lines
6.4 KiB

  1. package redis
  2. import (
  3. "errors"
  4. "log"
  5. "net"
  6. "strings"
  7. "sync"
  8. "time"
  9. )
  10. //------------------------------------------------------------------------------
  11. type FailoverOptions struct {
  12. MasterName string
  13. SentinelAddrs []string
  14. Password string
  15. DB int64
  16. PoolSize int
  17. DialTimeout time.Duration
  18. ReadTimeout time.Duration
  19. WriteTimeout time.Duration
  20. IdleTimeout time.Duration
  21. }
  22. func (opt *FailoverOptions) getPoolSize() int {
  23. if opt.PoolSize == 0 {
  24. return 10
  25. }
  26. return opt.PoolSize
  27. }
  28. func (opt *FailoverOptions) getDialTimeout() time.Duration {
  29. if opt.DialTimeout == 0 {
  30. return 5 * time.Second
  31. }
  32. return opt.DialTimeout
  33. }
  34. func (opt *FailoverOptions) options() *options {
  35. return &options{
  36. DB: opt.DB,
  37. Password: opt.Password,
  38. DialTimeout: opt.getDialTimeout(),
  39. ReadTimeout: opt.ReadTimeout,
  40. WriteTimeout: opt.WriteTimeout,
  41. PoolSize: opt.getPoolSize(),
  42. IdleTimeout: opt.IdleTimeout,
  43. }
  44. }
  45. func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
  46. opt := failoverOpt.options()
  47. failover := &sentinelFailover{
  48. masterName: failoverOpt.MasterName,
  49. sentinelAddrs: failoverOpt.SentinelAddrs,
  50. opt: opt,
  51. }
  52. return &Client{
  53. baseClient: &baseClient{
  54. opt: opt,
  55. connPool: failover.Pool(),
  56. },
  57. }
  58. }
  59. //------------------------------------------------------------------------------
  60. type sentinelClient struct {
  61. *baseClient
  62. }
  63. func newSentinel(clOpt *Options) *sentinelClient {
  64. opt := clOpt.options()
  65. opt.Password = ""
  66. opt.DB = 0
  67. dialer := func() (net.Conn, error) {
  68. return net.DialTimeout("tcp", clOpt.Addr, opt.DialTimeout)
  69. }
  70. return &sentinelClient{
  71. baseClient: &baseClient{
  72. opt: opt,
  73. connPool: newConnPool(newConnFunc(dialer), opt),
  74. },
  75. }
  76. }
  77. func (c *sentinelClient) PubSub() *PubSub {
  78. return &PubSub{
  79. baseClient: &baseClient{
  80. opt: c.opt,
  81. connPool: newSingleConnPool(c.connPool, false),
  82. },
  83. }
  84. }
  85. func (c *sentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
  86. cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name)
  87. c.Process(cmd)
  88. return cmd
  89. }
  90. func (c *sentinelClient) Sentinels(name string) *SliceCmd {
  91. cmd := NewSliceCmd("SENTINEL", "sentinels", name)
  92. c.Process(cmd)
  93. return cmd
  94. }
  95. type sentinelFailover struct {
  96. masterName string
  97. sentinelAddrs []string
  98. opt *options
  99. pool pool
  100. poolOnce sync.Once
  101. lock sync.RWMutex
  102. _sentinel *sentinelClient
  103. }
  104. func (d *sentinelFailover) dial() (net.Conn, error) {
  105. addr, err := d.MasterAddr()
  106. if err != nil {
  107. return nil, err
  108. }
  109. return net.DialTimeout("tcp", addr, d.opt.DialTimeout)
  110. }
  111. func (d *sentinelFailover) Pool() pool {
  112. d.poolOnce.Do(func() {
  113. d.pool = newConnPool(newConnFunc(d.dial), d.opt)
  114. })
  115. return d.pool
  116. }
  117. func (d *sentinelFailover) MasterAddr() (string, error) {
  118. defer d.lock.Unlock()
  119. d.lock.Lock()
  120. // Try last working sentinel.
  121. if d._sentinel != nil {
  122. addr, err := d._sentinel.GetMasterAddrByName(d.masterName).Result()
  123. if err != nil {
  124. log.Printf("redis-sentinel: GetMasterAddrByName %q failed: %s", d.masterName, err)
  125. d.resetSentinel()
  126. } else {
  127. addr := net.JoinHostPort(addr[0], addr[1])
  128. log.Printf("redis-sentinel: %q addr is %s", d.masterName, addr)
  129. return addr, nil
  130. }
  131. }
  132. for i, sentinelAddr := range d.sentinelAddrs {
  133. sentinel := newSentinel(&Options{
  134. Addr: sentinelAddr,
  135. DB: d.opt.DB,
  136. Password: d.opt.Password,
  137. DialTimeout: d.opt.DialTimeout,
  138. ReadTimeout: d.opt.ReadTimeout,
  139. WriteTimeout: d.opt.WriteTimeout,
  140. PoolSize: d.opt.PoolSize,
  141. IdleTimeout: d.opt.IdleTimeout,
  142. })
  143. masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result()
  144. if err != nil {
  145. log.Printf("redis-sentinel: GetMasterAddrByName %q failed: %s", d.masterName, err)
  146. sentinel.Close()
  147. continue
  148. }
  149. // Push working sentinel to the top.
  150. d.sentinelAddrs[0], d.sentinelAddrs[i] = d.sentinelAddrs[i], d.sentinelAddrs[0]
  151. d.setSentinel(sentinel)
  152. addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
  153. log.Printf("redis-sentinel: %q addr is %s", d.masterName, addr)
  154. return addr, nil
  155. }
  156. return "", errors.New("redis: all sentinels are unreachable")
  157. }
// setSentinel makes sentinel the current working Sentinel.
//
// It first queries sentinel for other Sentinels of the same master, then
// stores it in d._sentinel, and finally starts the listen goroutine. The
// assignment must come before the goroutine is started because listen reads
// d._sentinel. The only caller in this file, MasterAddr, holds d.lock while
// calling this method.
func (d *sentinelFailover) setSentinel(sentinel *sentinelClient) {
	d.discoverSentinels(sentinel)
	d._sentinel = sentinel
	// NOTE(review): nothing in this file stops or waits for this goroutine
	// other than listen returning on its own after an error.
	go d.listen()
}
  163. func (d *sentinelFailover) discoverSentinels(sentinel *sentinelClient) {
  164. sentinels, err := sentinel.Sentinels(d.masterName).Result()
  165. if err != nil {
  166. log.Printf("redis-sentinel: Sentinels %q failed: %s", d.masterName, err)
  167. return
  168. }
  169. for _, sentinel := range sentinels {
  170. vals := sentinel.([]interface{})
  171. for i := 0; i < len(vals); i += 2 {
  172. key := vals[i].(string)
  173. if key == "name" {
  174. sentinelAddr := vals[i+1].(string)
  175. if !contains(d.sentinelAddrs, sentinelAddr) {
  176. log.Printf(
  177. "redis-sentinel: discovered new %q sentinel: %s",
  178. d.masterName, sentinelAddr,
  179. )
  180. d.sentinelAddrs = append(d.sentinelAddrs, sentinelAddr)
  181. }
  182. }
  183. }
  184. }
  185. }
  186. func (d *sentinelFailover) listen() {
  187. var pubsub *PubSub
  188. for {
  189. if pubsub == nil {
  190. pubsub = d._sentinel.PubSub()
  191. if err := pubsub.Subscribe("+switch-master"); err != nil {
  192. log.Printf("redis-sentinel: Subscribe failed: %s", err)
  193. d.lock.Lock()
  194. d.resetSentinel()
  195. d.lock.Unlock()
  196. return
  197. }
  198. }
  199. msgIface, err := pubsub.Receive()
  200. if err != nil {
  201. log.Printf("redis-sentinel: Receive failed: %s", err)
  202. pubsub.Close()
  203. return
  204. }
  205. switch msg := msgIface.(type) {
  206. case *Message:
  207. switch msg.Channel {
  208. case "+switch-master":
  209. parts := strings.Split(msg.Payload, " ")
  210. if parts[0] != d.masterName {
  211. log.Printf("redis-sentinel: ignore new %s addr", parts[0])
  212. continue
  213. }
  214. addr := net.JoinHostPort(parts[3], parts[4])
  215. log.Printf(
  216. "redis-sentinel: new %q addr is %s",
  217. d.masterName, addr,
  218. )
  219. d.pool.Filter(func(cn *conn) bool {
  220. if cn.RemoteAddr().String() != addr {
  221. log.Printf(
  222. "redis-sentinel: closing connection to old master %s",
  223. cn.RemoteAddr(),
  224. )
  225. return false
  226. }
  227. return true
  228. })
  229. default:
  230. log.Printf("redis-sentinel: unsupported message: %s", msg)
  231. }
  232. case *Subscription:
  233. // Ignore.
  234. default:
  235. log.Printf("redis-sentinel: unsupported message: %s", msgIface)
  236. }
  237. }
  238. }
  239. func (d *sentinelFailover) resetSentinel() {
  240. d._sentinel.Close()
  241. d._sentinel = nil
  242. }
  243. func contains(slice []string, str string) bool {
  244. for _, s := range slice {
  245. if s == str {
  246. return true
  247. }
  248. }
  249. return false
  250. }