package redis

import (
	"context"
	"crypto/tls"
	"fmt"
	"math"
	"math/rand"
	"net"
	"runtime"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/go-redis/redis/internal"
	"github.com/go-redis/redis/internal/hashtag"
	"github.com/go-redis/redis/internal/pool"
	"github.com/go-redis/redis/internal/proto"
)

var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")

// ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient.
type ClusterOptions struct {
	// A seed list of host:port addresses of cluster nodes.
	Addrs []string

	// The maximum number of retries before giving up. A command is retried
	// on network errors and MOVED/ASK redirects.
	// Default is 8 retries.
	MaxRedirects int

	// Enables read-only commands on slave nodes.
	ReadOnly bool
	// Allows routing read-only commands to the closest master or slave node.
	// It automatically enables ReadOnly.
	RouteByLatency bool
	// Allows routing read-only commands to a random master or slave node.
	// It automatically enables ReadOnly.
	RouteRandomly bool

	// Optional function that returns cluster slots information.
	// It is useful for manually creating a cluster of standalone Redis servers
	// and load-balancing read/write operations between the master and slaves.
	// It can use a service like ZooKeeper to maintain configuration information
	// and Cluster.ReloadState to manually trigger state reloading.
	// An illustrative sketch follows this struct.
	ClusterSlots func() ([]ClusterSlot, error)

	// Optional hook that is called when a new node is created.
	OnNewNode func(*Client)

	// The following options are copied from the Options struct.

	OnConnect func(*Conn) error

	Password string

	MaxRetries      int
	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration

	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration

	// PoolSize applies per cluster node and not for the whole cluster.
	PoolSize           int
	MinIdleConns       int
	MaxConnAge         time.Duration
	PoolTimeout        time.Duration
	IdleTimeout        time.Duration
	IdleCheckFrequency time.Duration

	TLSConfig *tls.Config
}
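
// exampleClusterSlotsOptions is a hypothetical usage sketch, not part of the
// library API. It shows one way the ClusterSlots hook described above could
// expose two standalone Redis servers as a single slot range; the addresses
// are placeholders, and the ClusterSlot/ClusterNode literals assume the types
// returned by the CLUSTER SLOTS command in this package.
func exampleClusterSlotsOptions() *ClusterOptions {
	return &ClusterOptions{
		// Cover the full slot range with one master and one slave.
		ClusterSlots: func() ([]ClusterSlot, error) {
			slots := []ClusterSlot{{
				Start: 0,
				End:   16383,
				Nodes: []ClusterNode{
					{Addr: ":6379"}, // master
					{Addr: ":6380"}, // slave
				},
			}}
			return slots, nil
		},
		// Spread read-only commands across the master and the slave.
		RouteRandomly: true,
	}
}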

func (opt *ClusterOptions) init() {
	if opt.MaxRedirects == -1 {
		opt.MaxRedirects = 0
	} else if opt.MaxRedirects == 0 {
		opt.MaxRedirects = 8
	}

	if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil {
		opt.ReadOnly = true
	}

	if opt.PoolSize == 0 {
		opt.PoolSize = 5 * runtime.NumCPU()
	}

	switch opt.ReadTimeout {
	case -1:
		opt.ReadTimeout = 0
	case 0:
		opt.ReadTimeout = 3 * time.Second
	}
	switch opt.WriteTimeout {
	case -1:
		opt.WriteTimeout = 0
	case 0:
		opt.WriteTimeout = opt.ReadTimeout
	}

	switch opt.MinRetryBackoff {
	case -1:
		opt.MinRetryBackoff = 0
	case 0:
		opt.MinRetryBackoff = 8 * time.Millisecond
	}
	switch opt.MaxRetryBackoff {
	case -1:
		opt.MaxRetryBackoff = 0
	case 0:
		opt.MaxRetryBackoff = 512 * time.Millisecond
	}
}

func (opt *ClusterOptions) clientOptions() *Options {
	const disableIdleCheck = -1

	return &Options{
		OnConnect: opt.OnConnect,

		MaxRetries:      opt.MaxRetries,
		MinRetryBackoff: opt.MinRetryBackoff,
		MaxRetryBackoff: opt.MaxRetryBackoff,
		Password:        opt.Password,
		readOnly:        opt.ReadOnly,

		DialTimeout:  opt.DialTimeout,
		ReadTimeout:  opt.ReadTimeout,
		WriteTimeout: opt.WriteTimeout,

		PoolSize:           opt.PoolSize,
		MinIdleConns:       opt.MinIdleConns,
		MaxConnAge:         opt.MaxConnAge,
		PoolTimeout:        opt.PoolTimeout,
		IdleTimeout:        opt.IdleTimeout,
		IdleCheckFrequency: disableIdleCheck,

		TLSConfig: opt.TLSConfig,
	}
}

//------------------------------------------------------------------------------

type clusterNode struct {
	Client *Client

	latency    uint32 // atomic
	generation uint32 // atomic
	loading    uint32 // atomic
}

func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
	opt := clOpt.clientOptions()
	opt.Addr = addr
	node := clusterNode{
		Client: NewClient(opt),
	}

	node.latency = math.MaxUint32
	if clOpt.RouteByLatency {
		go node.updateLatency()
	}

	if clOpt.OnNewNode != nil {
		clOpt.OnNewNode(node.Client)
	}

	return &node
}

func (n *clusterNode) String() string {
	return n.Client.String()
}

func (n *clusterNode) Close() error {
	return n.Client.Close()
}

func (n *clusterNode) updateLatency() {
	const probes = 10

	var latency uint32
	for i := 0; i < probes; i++ {
		start := time.Now()
		n.Client.Ping()
		probe := uint32(time.Since(start) / time.Microsecond)
		latency = (latency + probe) / 2
	}
	atomic.StoreUint32(&n.latency, latency)
}

func (n *clusterNode) Latency() time.Duration {
	latency := atomic.LoadUint32(&n.latency)
	return time.Duration(latency) * time.Microsecond
}

func (n *clusterNode) MarkAsLoading() {
	atomic.StoreUint32(&n.loading, uint32(time.Now().Unix()))
}

func (n *clusterNode) Loading() bool {
	const minute = int64(time.Minute / time.Second)

	loading := atomic.LoadUint32(&n.loading)
	if loading == 0 {
		return false
	}
	if time.Now().Unix()-int64(loading) < minute {
		return true
	}
	atomic.StoreUint32(&n.loading, 0)
	return false
}

func (n *clusterNode) Generation() uint32 {
	return atomic.LoadUint32(&n.generation)
}

func (n *clusterNode) SetGeneration(gen uint32) {
	for {
		v := atomic.LoadUint32(&n.generation)
		if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) {
			break
		}
	}
}

//------------------------------------------------------------------------------

type clusterNodes struct {
	opt *ClusterOptions

	mu           sync.RWMutex
	allAddrs     []string
	allNodes     map[string]*clusterNode
	clusterAddrs []string
	closed       bool

	_generation uint32 // atomic
}

func newClusterNodes(opt *ClusterOptions) *clusterNodes {
	return &clusterNodes{
		opt: opt,

		allAddrs: opt.Addrs,
		allNodes: make(map[string]*clusterNode),
	}
}

func (c *clusterNodes) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil
	}
	c.closed = true

	var firstErr error
	for _, node := range c.allNodes {
		if err := node.Client.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	c.allNodes = nil
	c.clusterAddrs = nil

	return firstErr
}

func (c *clusterNodes) Addrs() ([]string, error) {
	var addrs []string
	c.mu.RLock()
	closed := c.closed
	if !closed {
		if len(c.clusterAddrs) > 0 {
			addrs = c.clusterAddrs
		} else {
			addrs = c.allAddrs
		}
	}
	c.mu.RUnlock()

	if closed {
		return nil, pool.ErrClosed
	}
	if len(addrs) == 0 {
		return nil, errClusterNoNodes
	}
	return addrs, nil
}

func (c *clusterNodes) NextGeneration() uint32 {
	return atomic.AddUint32(&c._generation, 1)
}

// GC removes unused nodes.
func (c *clusterNodes) GC(generation uint32) {
	var collected []*clusterNode
	c.mu.Lock()
	for addr, node := range c.allNodes {
		if node.Generation() >= generation {
			continue
		}

		c.clusterAddrs = remove(c.clusterAddrs, addr)
		delete(c.allNodes, addr)
		collected = append(collected, node)
	}
	c.mu.Unlock()

	for _, node := range collected {
		_ = node.Client.Close()
	}
}

func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
	var node *clusterNode
	var err error
	c.mu.RLock()
	if c.closed {
		err = pool.ErrClosed
	} else {
		node = c.allNodes[addr]
	}
	c.mu.RUnlock()
	return node, err
}

func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
	node, err := c.Get(addr)
	if err != nil {
		return nil, err
	}
	if node != nil {
		return node, nil
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	node, ok := c.allNodes[addr]
	if ok {
		return node, err
	}

	node = newClusterNode(c.opt, addr)

	c.allAddrs = appendIfNotExists(c.allAddrs, addr)
	c.clusterAddrs = append(c.clusterAddrs, addr)
	c.allNodes[addr] = node

	return node, err
}

func (c *clusterNodes) All() ([]*clusterNode, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	cp := make([]*clusterNode, 0, len(c.allNodes))
	for _, node := range c.allNodes {
		cp = append(cp, node)
	}
	return cp, nil
}

func (c *clusterNodes) Random() (*clusterNode, error) {
	addrs, err := c.Addrs()
	if err != nil {
		return nil, err
	}

	n := rand.Intn(len(addrs))
	return c.GetOrCreate(addrs[n])
}

//------------------------------------------------------------------------------

type clusterSlot struct {
	start, end int
	nodes      []*clusterNode
}

type clusterSlotSlice []*clusterSlot

func (p clusterSlotSlice) Len() int {
	return len(p)
}

func (p clusterSlotSlice) Less(i, j int) bool {
	return p[i].start < p[j].start
}

func (p clusterSlotSlice) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
}

type clusterState struct {
	nodes   *clusterNodes
	Masters []*clusterNode
	Slaves  []*clusterNode

	slots []*clusterSlot

	generation uint32
	createdAt  time.Time
}

func newClusterState(
	nodes *clusterNodes, slots []ClusterSlot, origin string,
) (*clusterState, error) {
	c := clusterState{
		nodes: nodes,

		slots: make([]*clusterSlot, 0, len(slots)),

		generation: nodes.NextGeneration(),
		createdAt:  time.Now(),
	}

	originHost, _, _ := net.SplitHostPort(origin)
	isLoopbackOrigin := isLoopback(originHost)

	for _, slot := range slots {
		var nodes []*clusterNode
		for i, slotNode := range slot.Nodes {
			addr := slotNode.Addr
			if !isLoopbackOrigin {
				addr = replaceLoopbackHost(addr, originHost)
			}

			node, err := c.nodes.GetOrCreate(addr)
			if err != nil {
				return nil, err
			}

			node.SetGeneration(c.generation)
			nodes = append(nodes, node)

			if i == 0 {
				c.Masters = appendUniqueNode(c.Masters, node)
			} else {
				c.Slaves = appendUniqueNode(c.Slaves, node)
			}
		}

		c.slots = append(c.slots, &clusterSlot{
			start: slot.Start,
			end:   slot.End,
			nodes: nodes,
		})
	}

	sort.Sort(clusterSlotSlice(c.slots))

	time.AfterFunc(time.Minute, func() {
		nodes.GC(c.generation)
	})

	return &c, nil
}

func replaceLoopbackHost(nodeAddr, originHost string) string {
	nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
	if err != nil {
		return nodeAddr
	}

	nodeIP := net.ParseIP(nodeHost)
	if nodeIP == nil {
		return nodeAddr
	}

	if !nodeIP.IsLoopback() {
		return nodeAddr
	}

	// Use the origin host (which is not a loopback address) together with the node port.
	return net.JoinHostPort(originHost, nodePort)
}

func isLoopback(host string) bool {
	ip := net.ParseIP(host)
	if ip == nil {
		return true
	}
	return ip.IsLoopback()
}

func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) > 0 {
		return nodes[0], nil
	}
	return c.nodes.Random()
}

func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	switch len(nodes) {
	case 0:
		return c.nodes.Random()
	case 1:
		return nodes[0], nil
	case 2:
		if slave := nodes[1]; !slave.Loading() {
			return slave, nil
		}
		return nodes[0], nil
	default:
		var slave *clusterNode
		for i := 0; i < 10; i++ {
			n := rand.Intn(len(nodes)-1) + 1
			slave = nodes[n]
			if !slave.Loading() {
				return slave, nil
			}
		}

		// All slaves are loading - use master.
		return nodes[0], nil
	}
}

func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
	const threshold = time.Millisecond

	nodes := c.slotNodes(slot)
	if len(nodes) == 0 {
		return c.nodes.Random()
	}

	var node *clusterNode
	for _, n := range nodes {
		if n.Loading() {
			continue
		}
		if node == nil || node.Latency()-n.Latency() > threshold {
			node = n
		}
	}
	return node, nil
}

func (c *clusterState) slotRandomNode(slot int) *clusterNode {
	nodes := c.slotNodes(slot)
	n := rand.Intn(len(nodes))
	return nodes[n]
}

func (c *clusterState) slotNodes(slot int) []*clusterNode {
	i := sort.Search(len(c.slots), func(i int) bool {
		return c.slots[i].end >= slot
	})
	if i >= len(c.slots) {
		return nil
	}
	x := c.slots[i]
	if slot >= x.start && slot <= x.end {
		return x.nodes
	}
	return nil
}

//------------------------------------------------------------------------------

type clusterStateHolder struct {
	load func() (*clusterState, error)

	state     atomic.Value
	reloading uint32 // atomic
}

func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder {
	return &clusterStateHolder{
		load: fn,
	}
}

func (c *clusterStateHolder) Reload() (*clusterState, error) {
	state, err := c.load()
	if err != nil {
		return nil, err
	}
	c.state.Store(state)
	return state, nil
}

func (c *clusterStateHolder) LazyReload() {
	if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
		return
	}
	go func() {
		defer atomic.StoreUint32(&c.reloading, 0)

		_, err := c.Reload()
		if err != nil {
			return
		}
		time.Sleep(100 * time.Millisecond)
	}()
}

func (c *clusterStateHolder) Get() (*clusterState, error) {
	v := c.state.Load()
	if v != nil {
		state := v.(*clusterState)
		if time.Since(state.createdAt) > time.Minute {
			c.LazyReload()
		}
		return state, nil
	}
	return c.Reload()
}

func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) {
	state, err := c.Reload()
	if err == nil {
		return state, nil
	}
	return c.Get()
}

//------------------------------------------------------------------------------

// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct {
	cmdable

	ctx context.Context

	opt           *ClusterOptions
	nodes         *clusterNodes
	state         *clusterStateHolder
	cmdsInfoCache *cmdsInfoCache

	process           func(Cmder) error
	processPipeline   func([]Cmder) error
	processTxPipeline func([]Cmder) error
}

// NewClusterClient returns a Redis Cluster client as described in
// http://redis.io/topics/cluster-spec.
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
	opt.init()

	c := &ClusterClient{
		opt:   opt,
		nodes: newClusterNodes(opt),
	}
	c.state = newClusterStateHolder(c.loadState)
	c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)

	c.process = c.defaultProcess
	c.processPipeline = c.defaultProcessPipeline
	c.processTxPipeline = c.defaultProcessTxPipeline

	c.init()

	if opt.IdleCheckFrequency > 0 {
		go c.reaper(opt.IdleCheckFrequency)
	}

	return c
}
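
// exampleNewClusterClient is a hypothetical usage sketch, not part of the
// library API. It shows a minimal ClusterClient built from a seed list; the
// addresses and the key are placeholders.
func exampleNewClusterClient() error {
	client := NewClusterClient(&ClusterOptions{
		Addrs:    []string{":7000", ":7001", ":7002"},
		ReadOnly: true, // allow read-only commands on slave nodes
	})
	defer client.Close()

	// Any regular command goes through Process and is routed by key slot.
	return client.Set("example-key", "value", time.Hour).Err()
}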

func (c *ClusterClient) init() {
	c.cmdable.setProcessor(c.Process)
}

// ReloadState reloads cluster state. If available, it calls the ClusterSlots
// func to get cluster slots information.
func (c *ClusterClient) ReloadState() error {
	_, err := c.state.Reload()
	return err
}

func (c *ClusterClient) Context() context.Context {
	if c.ctx != nil {
		return c.ctx
	}
	return context.Background()
}

func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient {
	if ctx == nil {
		panic("nil context")
	}
	c2 := c.copy()
	c2.ctx = ctx
	return c2
}

func (c *ClusterClient) copy() *ClusterClient {
	cp := *c
	cp.init()
	return &cp
}

// Options returns read-only Options that were used to create the client.
func (c *ClusterClient) Options() *ClusterOptions {
	return c.opt
}

func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}

func (c *ClusterClient) cmdsInfo() (map[string]*CommandInfo, error) {
	addrs, err := c.nodes.Addrs()
	if err != nil {
		return nil, err
	}

	var firstErr error
	for _, addr := range addrs {
		node, err := c.nodes.Get(addr)
		if err != nil {
			return nil, err
		}
		if node == nil {
			continue
		}

		info, err := node.Client.Command().Result()
		if err == nil {
			return info, nil
		}
		if firstErr == nil {
			firstErr = err
		}
	}
	return nil, firstErr
}

func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
	cmdsInfo, err := c.cmdsInfoCache.Get()
	if err != nil {
		return nil
	}

	info := cmdsInfo[name]
	if info == nil {
		internal.Logf("info for cmd=%s not found", name)
	}
	return info
}

func cmdSlot(cmd Cmder, pos int) int {
	if pos == 0 {
		return hashtag.RandomSlot()
	}
	firstKey := cmd.stringArg(pos)
	return hashtag.Slot(firstKey)
}

func (c *ClusterClient) cmdSlot(cmd Cmder) int {
	args := cmd.Args()
	if args[0] == "cluster" && args[1] == "getkeysinslot" {
		return args[2].(int)
	}

	cmdInfo := c.cmdInfo(cmd.Name())
	return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
}

func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
	state, err := c.state.Get()
	if err != nil {
		return 0, nil, err
	}

	cmdInfo := c.cmdInfo(cmd.Name())
	slot := c.cmdSlot(cmd)

	if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
		if c.opt.RouteByLatency {
			node, err := state.slotClosestNode(slot)
			return slot, node, err
		}

		if c.opt.RouteRandomly {
			node := state.slotRandomNode(slot)
			return slot, node, nil
		}

		node, err := state.slotSlaveNode(slot)
		return slot, node, err
	}

	node, err := state.slotMasterNode(slot)
	return slot, node, err
}

func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) {
	state, err := c.state.Get()
	if err != nil {
		return nil, err
	}

	nodes := state.slotNodes(slot)
	if len(nodes) > 0 {
		return nodes[0], nil
	}
	return c.nodes.Random()
}

func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
	if len(keys) == 0 {
		return fmt.Errorf("redis: Watch requires at least one key")
	}

	slot := hashtag.Slot(keys[0])
	for _, key := range keys[1:] {
		if hashtag.Slot(key) != slot {
			err := fmt.Errorf("redis: Watch requires all keys to be in the same slot")
			return err
		}
	}

	node, err := c.slotMasterNode(slot)
	if err != nil {
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			time.Sleep(c.retryBackoff(attempt))
		}

		err = node.Client.Watch(fn, keys...)
		if err == nil {
			break
		}
		if err != Nil {
			c.state.LazyReload()
		}

		moved, ask, addr := internal.IsMovedError(err)
		if moved || ask {
			node, err = c.nodes.GetOrCreate(addr)
			if err != nil {
				return err
			}
			continue
		}

		if err == pool.ErrClosed || internal.IsReadOnlyError(err) {
			node, err = c.slotMasterNode(slot)
			if err != nil {
				return err
			}
			continue
		}

		if internal.IsRetryableError(err, true) {
			continue
		}

		return err
	}

	return err
}
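
// exampleWatchIncr is a hypothetical usage sketch, not part of the library
// API. It shows the optimistic-locking pattern Watch is intended for: read a
// value, then update it in a transaction that aborts if the watched key
// changed. The key name is a placeholder; all watched keys must hash to the
// same slot.
func exampleWatchIncr(client *ClusterClient, key string) error {
	return client.Watch(func(tx *Tx) error {
		n, err := tx.Get(key).Int64()
		if err != nil && err != Nil {
			return err
		}

		_, err = tx.Pipelined(func(pipe Pipeliner) error {
			pipe.Set(key, n+1, 0)
			return nil
		})
		return err
	}, key)
}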

// Close closes the cluster client, releasing any open resources.
//
// It is rare to Close a ClusterClient, as the ClusterClient is meant
// to be long-lived and shared between many goroutines.
func (c *ClusterClient) Close() error {
	return c.nodes.Close()
}

// Do creates a Cmd from the args and processes the cmd.
func (c *ClusterClient) Do(args ...interface{}) *Cmd {
	cmd := NewCmd(args...)
	c.Process(cmd)
	return cmd
}

func (c *ClusterClient) WrapProcess(
	fn func(oldProcess func(Cmder) error) func(Cmder) error,
) {
	c.process = fn(c.process)
}

func (c *ClusterClient) Process(cmd Cmder) error {
	return c.process(cmd)
}

func (c *ClusterClient) defaultProcess(cmd Cmder) error {
	var node *clusterNode
	var ask bool
	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			time.Sleep(c.retryBackoff(attempt))
		}

		if node == nil {
			var err error
			_, node, err = c.cmdSlotAndNode(cmd)
			if err != nil {
				cmd.setErr(err)
				break
			}
		}

		var err error
		if ask {
			pipe := node.Client.Pipeline()
			_ = pipe.Process(NewCmd("ASKING"))
			_ = pipe.Process(cmd)
			_, err = pipe.Exec()
			_ = pipe.Close()
			ask = false
		} else {
			err = node.Client.Process(cmd)
		}

		// If there is no error, we are done.
		if err == nil {
			break
		}
		if err != Nil {
			c.state.LazyReload()
		}

		// If a slave is loading, pick another node.
		if c.opt.ReadOnly && internal.IsLoadingError(err) {
			node.MarkAsLoading()
			node = nil
			continue
		}

		var moved bool
		var addr string
		moved, ask, addr = internal.IsMovedError(err)
		if moved || ask {
			node, err = c.nodes.GetOrCreate(addr)
			if err != nil {
				break
			}
			continue
		}

		if err == pool.ErrClosed || internal.IsReadOnlyError(err) {
			node = nil
			continue
		}

		if internal.IsRetryableError(err, true) {
			// First, retry the same node.
			if attempt == 0 {
				continue
			}

			// After that, try a random node.
			node, err = c.nodes.Random()
			if err != nil {
				break
			}
			continue
		}

		break
	}

	return cmd.Err()
}

// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
	state, err := c.state.ReloadOrGet()
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)
	for _, master := range state.Masters {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(master)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}
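
// exampleForEachMaster is a hypothetical usage sketch, not part of the
// library API. It shows a typical use of ForEachMaster: flushing every
// master's database, e.g. between test runs.
func exampleForEachMaster(client *ClusterClient) error {
	return client.ForEachMaster(func(master *Client) error {
		return master.FlushDB().Err()
	})
}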

// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
	state, err := c.state.ReloadOrGet()
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)
	for _, slave := range state.Slaves {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(slave)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// ForEachNode concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
	state, err := c.state.ReloadOrGet()
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)
	worker := func(node *clusterNode) {
		defer wg.Done()
		err := fn(node.Client)
		if err != nil {
			select {
			case errCh <- err:
			default:
			}
		}
	}

	for _, node := range state.Masters {
		wg.Add(1)
		go worker(node)
	}
	for _, node := range state.Slaves {
		wg.Add(1)
		go worker(node)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// PoolStats returns accumulated connection pool stats.
func (c *ClusterClient) PoolStats() *PoolStats {
	var acc PoolStats

	state, _ := c.state.Get()
	if state == nil {
		return &acc
	}

	for _, node := range state.Masters {
		s := node.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
		acc.StaleConns += s.StaleConns
	}

	for _, node := range state.Slaves {
		s := node.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
		acc.StaleConns += s.StaleConns
	}

	return &acc
}

func (c *ClusterClient) loadState() (*clusterState, error) {
	if c.opt.ClusterSlots != nil {
		slots, err := c.opt.ClusterSlots()
		if err != nil {
			return nil, err
		}
		return newClusterState(c.nodes, slots, "")
	}

	addrs, err := c.nodes.Addrs()
	if err != nil {
		return nil, err
	}

	var firstErr error
	for _, addr := range addrs {
		node, err := c.nodes.GetOrCreate(addr)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		slots, err := node.Client.ClusterSlots().Result()
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		return newClusterState(c.nodes, slots, node.Client.opt.Addr)
	}
	return nil, firstErr
}

// reaper closes idle connections to the cluster.
func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
	ticker := time.NewTicker(idleCheckFrequency)
	defer ticker.Stop()

	for range ticker.C {
		nodes, err := c.nodes.All()
		if err != nil {
			break
		}

		for _, node := range nodes {
			_, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
			if err != nil {
				internal.Logf("ReapStaleConns failed: %s", err)
			}
		}
	}
}

func (c *ClusterClient) Pipeline() Pipeliner {
	pipe := Pipeline{
		exec: c.processPipeline,
	}
	pipe.statefulCmdable.setProcessor(pipe.Process)
	return &pipe
}

func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	return c.Pipeline().Pipelined(fn)
}

func (c *ClusterClient) WrapProcessPipeline(
	fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
) {
	c.processPipeline = fn(c.processPipeline)
}

func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
	cmdsMap := newCmdsMap()
	err := c.mapCmdsByNode(cmds, cmdsMap)
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			time.Sleep(c.retryBackoff(attempt))
		}

		failedCmds := newCmdsMap()
		var wg sync.WaitGroup

		for node, cmds := range cmdsMap.m {
			wg.Add(1)
			go func(node *clusterNode, cmds []Cmder) {
				defer wg.Done()

				cn, err := node.Client.getConn()
				if err != nil {
					if err == pool.ErrClosed {
						c.mapCmdsByNode(cmds, failedCmds)
					} else {
						setCmdsErr(cmds, err)
					}
					return
				}

				err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
				node.Client.releaseConnStrict(cn, err)
			}(node, cmds)
		}

		wg.Wait()
		if len(failedCmds.m) == 0 {
			break
		}
		cmdsMap = failedCmds
	}

	return cmdsFirstErr(cmds)
}

type cmdsMap struct {
	mu sync.Mutex
	m  map[*clusterNode][]Cmder
}

func newCmdsMap() *cmdsMap {
	return &cmdsMap{
		m: make(map[*clusterNode][]Cmder),
	}
}

func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error {
	state, err := c.state.Get()
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	cmdsAreReadOnly := c.cmdsAreReadOnly(cmds)
	for _, cmd := range cmds {
		var node *clusterNode
		var err error
		if cmdsAreReadOnly {
			_, node, err = c.cmdSlotAndNode(cmd)
		} else {
			slot := c.cmdSlot(cmd)
			node, err = state.slotMasterNode(slot)
		}
		if err != nil {
			return err
		}
		cmdsMap.mu.Lock()
		cmdsMap.m[node] = append(cmdsMap.m[node], cmd)
		cmdsMap.mu.Unlock()
	}
	return nil
}

func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
	for _, cmd := range cmds {
		cmdInfo := c.cmdInfo(cmd.Name())
		if cmdInfo == nil || !cmdInfo.ReadOnly {
			return false
		}
	}
	return true
}

func (c *ClusterClient) pipelineProcessCmds(
	node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
) error {
	err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
		return writeCmd(wr, cmds...)
	})
	if err != nil {
		setCmdsErr(cmds, err)
		failedCmds.mu.Lock()
		failedCmds.m[node] = cmds
		failedCmds.mu.Unlock()
		return err
	}

	err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
		return c.pipelineReadCmds(node, rd, cmds, failedCmds)
	})
	return err
}

func (c *ClusterClient) pipelineReadCmds(
	node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
) error {
	var firstErr error
	for _, cmd := range cmds {
		err := cmd.readReply(rd)
		if err == nil {
			continue
		}
		if c.checkMovedErr(cmd, err, failedCmds) {
			continue
		}

		if internal.IsRedisError(err) {
			continue
		}

		failedCmds.mu.Lock()
		failedCmds.m[node] = append(failedCmds.m[node], cmd)
		failedCmds.mu.Unlock()
		if firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func (c *ClusterClient) checkMovedErr(
	cmd Cmder, err error, failedCmds *cmdsMap,
) bool {
	moved, ask, addr := internal.IsMovedError(err)

	if moved {
		c.state.LazyReload()

		node, err := c.nodes.GetOrCreate(addr)
		if err != nil {
			return false
		}

		failedCmds.mu.Lock()
		failedCmds.m[node] = append(failedCmds.m[node], cmd)
		failedCmds.mu.Unlock()
		return true
	}

	if ask {
		node, err := c.nodes.GetOrCreate(addr)
		if err != nil {
			return false
		}

		failedCmds.mu.Lock()
		failedCmds.m[node] = append(failedCmds.m[node], NewCmd("ASKING"), cmd)
		failedCmds.mu.Unlock()
		return true
	}

	return false
}

// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *ClusterClient) TxPipeline() Pipeliner {
	pipe := Pipeline{
		exec: c.processTxPipeline,
	}
	pipe.statefulCmdable.setProcessor(pipe.Process)
	return &pipe
}

func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	return c.TxPipeline().Pipelined(fn)
}

func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
	state, err := c.state.Get()
	if err != nil {
		return err
	}

	cmdsMap := c.mapCmdsBySlot(cmds)
	for slot, cmds := range cmdsMap {
		node, err := state.slotMasterNode(slot)
		if err != nil {
			setCmdsErr(cmds, err)
			continue
		}
		cmdsMap := map[*clusterNode][]Cmder{node: cmds}

		for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
			if attempt > 0 {
				time.Sleep(c.retryBackoff(attempt))
			}

			failedCmds := newCmdsMap()
			var wg sync.WaitGroup

			for node, cmds := range cmdsMap {
				wg.Add(1)
				go func(node *clusterNode, cmds []Cmder) {
					defer wg.Done()

					cn, err := node.Client.getConn()
					if err != nil {
						if err == pool.ErrClosed {
							c.mapCmdsByNode(cmds, failedCmds)
						} else {
							setCmdsErr(cmds, err)
						}
						return
					}

					err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
					node.Client.releaseConnStrict(cn, err)
				}(node, cmds)
			}

			wg.Wait()
			if len(failedCmds.m) == 0 {
				break
			}
			cmdsMap = failedCmds.m
		}
	}

	return cmdsFirstErr(cmds)
}

func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
	cmdsMap := make(map[int][]Cmder)
	for _, cmd := range cmds {
		slot := c.cmdSlot(cmd)
		cmdsMap[slot] = append(cmdsMap[slot], cmd)
	}
	return cmdsMap
}

func (c *ClusterClient) txPipelineProcessCmds(
	node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
) error {
	err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
		return txPipelineWriteMulti(wr, cmds)
	})
	if err != nil {
		setCmdsErr(cmds, err)
		failedCmds.mu.Lock()
		failedCmds.m[node] = cmds
		failedCmds.mu.Unlock()
		return err
	}

	err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
		err := c.txPipelineReadQueued(rd, cmds, failedCmds)
		if err != nil {
			setCmdsErr(cmds, err)
			return err
		}
		return pipelineReadCmds(rd, cmds)
	})
	return err
}

func (c *ClusterClient) txPipelineReadQueued(
	rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
) error {
	// Parse queued replies.
	var statusCmd StatusCmd
	if err := statusCmd.readReply(rd); err != nil {
		return err
	}

	for _, cmd := range cmds {
		err := statusCmd.readReply(rd)
		if err == nil {
			continue
		}

		if c.checkMovedErr(cmd, err, failedCmds) || internal.IsRedisError(err) {
			continue
		}

		return err
	}

	// Parse number of replies.
	line, err := rd.ReadLine()
	if err != nil {
		if err == Nil {
			err = TxFailedErr
		}
		return err
	}

	switch line[0] {
	case proto.ErrorReply:
		err := proto.ParseErrorReply(line)
		for _, cmd := range cmds {
			if !c.checkMovedErr(cmd, err, failedCmds) {
				break
			}
		}
		return err
	case proto.ArrayReply:
		// ok
	default:
		err := fmt.Errorf("redis: expected '*', but got line %q", line)
		return err
	}

	return nil
}

func (c *ClusterClient) pubSub() *PubSub {
	var node *clusterNode
	pubsub := &PubSub{
		opt: c.opt.clientOptions(),

		newConn: func(channels []string) (*pool.Conn, error) {
			if node != nil {
				panic("node != nil")
			}

			slot := hashtag.Slot(channels[0])

			var err error
			node, err = c.slotMasterNode(slot)
			if err != nil {
				return nil, err
			}

			cn, err := node.Client.newConn()
			if err != nil {
				return nil, err
			}

			return cn, nil
		},
		closeConn: func(cn *pool.Conn) error {
			err := node.Client.connPool.CloseConn(cn)
			node = nil
			return err
		},
	}
	pubsub.init()
	return pubsub
}

// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create an empty subscription.
func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.Subscribe(channels...)
	}
	return pubsub
}
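
// examplePubSub is a hypothetical usage sketch, not part of the library API.
// It subscribes to a placeholder channel and waits up to a second for one
// message; the channel name is made up.
func examplePubSub(client *ClusterClient) (interface{}, error) {
	pubsub := client.Subscribe("example-channel")
	defer pubsub.Close()

	// Messages arrive over the connection owned by the slot's master node,
	// as set up by pubSub above.
	return pubsub.ReceiveTimeout(time.Second)
}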

// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create an empty subscription.
func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.PSubscribe(channels...)
	}
	return pubsub
}

func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
	for _, n := range nodes {
		if n == node {
			return nodes
		}
	}
	return append(nodes, node)
}

func appendIfNotExists(ss []string, es ...string) []string {
loop:
	for _, e := range es {
		for _, s := range ss {
			if s == e {
				continue loop
			}
		}
		ss = append(ss, e)
	}
	return ss
}

func remove(ss []string, es ...string) []string {
	if len(es) == 0 {
		return ss[:0]
	}
	for _, e := range es {
		for i, s := range ss {
			if s == e {
				ss = append(ss[:i], ss[i+1:]...)
				break
			}
		}
	}
	return ss
}