You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

399 lines
8.6 KiB

  1. package redis
  2. import (
  3. "crypto/tls"
  4. "errors"
  5. "net"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/go-redis/redis/internal"
  10. "github.com/go-redis/redis/internal/pool"
  11. )
  12. //------------------------------------------------------------------------------
  13. // FailoverOptions are used to configure a failover client and should
  14. // be passed to NewFailoverClient.
  15. type FailoverOptions struct {
  16. // The master name.
  17. MasterName string
  18. // A seed list of host:port addresses of sentinel nodes.
  19. SentinelAddrs []string
  20. // Following options are copied from Options struct.
  21. OnConnect func(*Conn) error
  22. Password string
  23. DB int
  24. MaxRetries int
  25. MinRetryBackoff time.Duration
  26. MaxRetryBackoff time.Duration
  27. DialTimeout time.Duration
  28. ReadTimeout time.Duration
  29. WriteTimeout time.Duration
  30. PoolSize int
  31. MinIdleConns int
  32. MaxConnAge time.Duration
  33. PoolTimeout time.Duration
  34. IdleTimeout time.Duration
  35. IdleCheckFrequency time.Duration
  36. TLSConfig *tls.Config
  37. }
  38. func (opt *FailoverOptions) options() *Options {
  39. return &Options{
  40. Addr: "FailoverClient",
  41. OnConnect: opt.OnConnect,
  42. DB: opt.DB,
  43. Password: opt.Password,
  44. MaxRetries: opt.MaxRetries,
  45. DialTimeout: opt.DialTimeout,
  46. ReadTimeout: opt.ReadTimeout,
  47. WriteTimeout: opt.WriteTimeout,
  48. PoolSize: opt.PoolSize,
  49. PoolTimeout: opt.PoolTimeout,
  50. IdleTimeout: opt.IdleTimeout,
  51. IdleCheckFrequency: opt.IdleCheckFrequency,
  52. TLSConfig: opt.TLSConfig,
  53. }
  54. }
  55. // NewFailoverClient returns a Redis client that uses Redis Sentinel
  56. // for automatic failover. It's safe for concurrent use by multiple
  57. // goroutines.
  58. func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
  59. opt := failoverOpt.options()
  60. opt.init()
  61. failover := &sentinelFailover{
  62. masterName: failoverOpt.MasterName,
  63. sentinelAddrs: failoverOpt.SentinelAddrs,
  64. opt: opt,
  65. }
  66. c := Client{
  67. baseClient: baseClient{
  68. opt: opt,
  69. connPool: failover.Pool(),
  70. onClose: func() error {
  71. return failover.Close()
  72. },
  73. },
  74. }
  75. c.baseClient.init()
  76. c.cmdable.setProcessor(c.Process)
  77. return &c
  78. }
  79. //------------------------------------------------------------------------------
  80. type SentinelClient struct {
  81. baseClient
  82. }
  83. func NewSentinelClient(opt *Options) *SentinelClient {
  84. opt.init()
  85. c := &SentinelClient{
  86. baseClient: baseClient{
  87. opt: opt,
  88. connPool: newConnPool(opt),
  89. },
  90. }
  91. c.baseClient.init()
  92. return c
  93. }
  94. func (c *SentinelClient) pubSub() *PubSub {
  95. pubsub := &PubSub{
  96. opt: c.opt,
  97. newConn: func(channels []string) (*pool.Conn, error) {
  98. return c.newConn()
  99. },
  100. closeConn: c.connPool.CloseConn,
  101. }
  102. pubsub.init()
  103. return pubsub
  104. }
  105. // Subscribe subscribes the client to the specified channels.
  106. // Channels can be omitted to create empty subscription.
  107. func (c *SentinelClient) Subscribe(channels ...string) *PubSub {
  108. pubsub := c.pubSub()
  109. if len(channels) > 0 {
  110. _ = pubsub.Subscribe(channels...)
  111. }
  112. return pubsub
  113. }
  114. // PSubscribe subscribes the client to the given patterns.
  115. // Patterns can be omitted to create empty subscription.
  116. func (c *SentinelClient) PSubscribe(channels ...string) *PubSub {
  117. pubsub := c.pubSub()
  118. if len(channels) > 0 {
  119. _ = pubsub.PSubscribe(channels...)
  120. }
  121. return pubsub
  122. }
  123. func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
  124. cmd := NewStringSliceCmd("sentinel", "get-master-addr-by-name", name)
  125. c.Process(cmd)
  126. return cmd
  127. }
  128. func (c *SentinelClient) Sentinels(name string) *SliceCmd {
  129. cmd := NewSliceCmd("sentinel", "sentinels", name)
  130. c.Process(cmd)
  131. return cmd
  132. }
  133. // Failover forces a failover as if the master was not reachable, and without
  134. // asking for agreement to other Sentinels.
  135. func (c *SentinelClient) Failover(name string) *StatusCmd {
  136. cmd := NewStatusCmd("sentinel", "failover", name)
  137. c.Process(cmd)
  138. return cmd
  139. }
  140. // Reset resets all the masters with matching name. The pattern argument is a
  141. // glob-style pattern. The reset process clears any previous state in a master
  142. // (including a failover in progress), and removes every slave and sentinel
  143. // already discovered and associated with the master.
  144. func (c *SentinelClient) Reset(pattern string) *IntCmd {
  145. cmd := NewIntCmd("sentinel", "reset", pattern)
  146. c.Process(cmd)
  147. return cmd
  148. }
  149. type sentinelFailover struct {
  150. sentinelAddrs []string
  151. opt *Options
  152. pool *pool.ConnPool
  153. poolOnce sync.Once
  154. mu sync.RWMutex
  155. masterName string
  156. _masterAddr string
  157. sentinel *SentinelClient
  158. pubsub *PubSub
  159. }
  160. func (c *sentinelFailover) Close() error {
  161. c.mu.Lock()
  162. defer c.mu.Unlock()
  163. if c.sentinel != nil {
  164. return c.closeSentinel()
  165. }
  166. return nil
  167. }
  168. func (c *sentinelFailover) Pool() *pool.ConnPool {
  169. c.poolOnce.Do(func() {
  170. c.opt.Dialer = c.dial
  171. c.pool = newConnPool(c.opt)
  172. })
  173. return c.pool
  174. }
  175. func (c *sentinelFailover) dial() (net.Conn, error) {
  176. addr, err := c.MasterAddr()
  177. if err != nil {
  178. return nil, err
  179. }
  180. return net.DialTimeout("tcp", addr, c.opt.DialTimeout)
  181. }
  182. func (c *sentinelFailover) MasterAddr() (string, error) {
  183. addr, err := c.masterAddr()
  184. if err != nil {
  185. return "", err
  186. }
  187. c.switchMaster(addr)
  188. return addr, nil
  189. }
  190. func (c *sentinelFailover) masterAddr() (string, error) {
  191. addr := c.getMasterAddr()
  192. if addr != "" {
  193. return addr, nil
  194. }
  195. c.mu.Lock()
  196. defer c.mu.Unlock()
  197. for i, sentinelAddr := range c.sentinelAddrs {
  198. sentinel := NewSentinelClient(&Options{
  199. Addr: sentinelAddr,
  200. MaxRetries: c.opt.MaxRetries,
  201. DialTimeout: c.opt.DialTimeout,
  202. ReadTimeout: c.opt.ReadTimeout,
  203. WriteTimeout: c.opt.WriteTimeout,
  204. PoolSize: c.opt.PoolSize,
  205. PoolTimeout: c.opt.PoolTimeout,
  206. IdleTimeout: c.opt.IdleTimeout,
  207. IdleCheckFrequency: c.opt.IdleCheckFrequency,
  208. TLSConfig: c.opt.TLSConfig,
  209. })
  210. masterAddr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
  211. if err != nil {
  212. internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s",
  213. c.masterName, err)
  214. _ = sentinel.Close()
  215. continue
  216. }
  217. // Push working sentinel to the top.
  218. c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
  219. c.setSentinel(sentinel)
  220. addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
  221. return addr, nil
  222. }
  223. return "", errors.New("redis: all sentinels are unreachable")
  224. }
  225. func (c *sentinelFailover) getMasterAddr() string {
  226. c.mu.RLock()
  227. sentinel := c.sentinel
  228. c.mu.RUnlock()
  229. if sentinel == nil {
  230. return ""
  231. }
  232. addr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
  233. if err != nil {
  234. internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
  235. c.masterName, err)
  236. c.mu.Lock()
  237. if c.sentinel == sentinel {
  238. c.closeSentinel()
  239. }
  240. c.mu.Unlock()
  241. return ""
  242. }
  243. return net.JoinHostPort(addr[0], addr[1])
  244. }
  245. func (c *sentinelFailover) switchMaster(addr string) {
  246. c.mu.RLock()
  247. masterAddr := c._masterAddr
  248. c.mu.RUnlock()
  249. if masterAddr == addr {
  250. return
  251. }
  252. c.mu.Lock()
  253. defer c.mu.Unlock()
  254. internal.Logf("sentinel: new master=%q addr=%q",
  255. c.masterName, addr)
  256. _ = c.Pool().Filter(func(cn *pool.Conn) bool {
  257. return cn.RemoteAddr().String() != addr
  258. })
  259. c._masterAddr = addr
  260. }
  261. func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) {
  262. c.discoverSentinels(sentinel)
  263. c.sentinel = sentinel
  264. c.pubsub = sentinel.Subscribe("+switch-master")
  265. go c.listen(c.pubsub)
  266. }
  267. func (c *sentinelFailover) closeSentinel() error {
  268. var firstErr error
  269. err := c.pubsub.Close()
  270. if err != nil && firstErr == err {
  271. firstErr = err
  272. }
  273. c.pubsub = nil
  274. err = c.sentinel.Close()
  275. if err != nil && firstErr == err {
  276. firstErr = err
  277. }
  278. c.sentinel = nil
  279. return firstErr
  280. }
  281. func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
  282. sentinels, err := sentinel.Sentinels(c.masterName).Result()
  283. if err != nil {
  284. internal.Logf("sentinel: Sentinels master=%q failed: %s", c.masterName, err)
  285. return
  286. }
  287. for _, sentinel := range sentinels {
  288. vals := sentinel.([]interface{})
  289. for i := 0; i < len(vals); i += 2 {
  290. key := vals[i].(string)
  291. if key == "name" {
  292. sentinelAddr := vals[i+1].(string)
  293. if !contains(c.sentinelAddrs, sentinelAddr) {
  294. internal.Logf("sentinel: discovered new sentinel=%q for master=%q",
  295. sentinelAddr, c.masterName)
  296. c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
  297. }
  298. }
  299. }
  300. }
  301. }
  302. func (c *sentinelFailover) listen(pubsub *PubSub) {
  303. ch := pubsub.Channel()
  304. for {
  305. msg, ok := <-ch
  306. if !ok {
  307. break
  308. }
  309. switch msg.Channel {
  310. case "+switch-master":
  311. parts := strings.Split(msg.Payload, " ")
  312. if parts[0] != c.masterName {
  313. internal.Logf("sentinel: ignore addr for master=%q", parts[0])
  314. continue
  315. }
  316. addr := net.JoinHostPort(parts[3], parts[4])
  317. c.switchMaster(addr)
  318. }
  319. }
  320. }
  // contains reports whether str is equal to at least one element of slice.
  // A nil or empty slice contains nothing.
  func contains(slice []string, str string) bool {
  	for i := 0; i < len(slice); i++ {
  		if slice[i] == str {
  			return true
  		}
  	}
  	return false
  }