You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

120 lines
2.2 KiB

  1. package redis
  2. import (
  3. "sync"
  4. "github.com/go-redis/redis/internal/pool"
  5. )
  6. type pipelineExecer func([]Cmder) error
  7. type Pipeliner interface {
  8. StatefulCmdable
  9. Do(args ...interface{}) *Cmd
  10. Process(cmd Cmder) error
  11. Close() error
  12. Discard() error
  13. Exec() ([]Cmder, error)
  14. }
  15. var _ Pipeliner = (*Pipeline)(nil)
  16. // Pipeline implements pipelining as described in
  17. // http://redis.io/topics/pipelining. It's safe for concurrent use
  18. // by multiple goroutines.
  19. type Pipeline struct {
  20. statefulCmdable
  21. exec pipelineExecer
  22. mu sync.Mutex
  23. cmds []Cmder
  24. closed bool
  25. }
  26. func (c *Pipeline) Do(args ...interface{}) *Cmd {
  27. cmd := NewCmd(args...)
  28. _ = c.Process(cmd)
  29. return cmd
  30. }
  31. // Process queues the cmd for later execution.
  32. func (c *Pipeline) Process(cmd Cmder) error {
  33. c.mu.Lock()
  34. c.cmds = append(c.cmds, cmd)
  35. c.mu.Unlock()
  36. return nil
  37. }
  38. // Close closes the pipeline, releasing any open resources.
  39. func (c *Pipeline) Close() error {
  40. c.mu.Lock()
  41. c.discard()
  42. c.closed = true
  43. c.mu.Unlock()
  44. return nil
  45. }
  46. // Discard resets the pipeline and discards queued commands.
  47. func (c *Pipeline) Discard() error {
  48. c.mu.Lock()
  49. err := c.discard()
  50. c.mu.Unlock()
  51. return err
  52. }
  53. func (c *Pipeline) discard() error {
  54. if c.closed {
  55. return pool.ErrClosed
  56. }
  57. c.cmds = c.cmds[:0]
  58. return nil
  59. }
  60. // Exec executes all previously queued commands using one
  61. // client-server roundtrip.
  62. //
  63. // Exec always returns list of commands and error of the first failed
  64. // command if any.
  65. func (c *Pipeline) Exec() ([]Cmder, error) {
  66. c.mu.Lock()
  67. defer c.mu.Unlock()
  68. if c.closed {
  69. return nil, pool.ErrClosed
  70. }
  71. if len(c.cmds) == 0 {
  72. return nil, nil
  73. }
  74. cmds := c.cmds
  75. c.cmds = nil
  76. return cmds, c.exec(cmds)
  77. }
  78. func (c *Pipeline) pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  79. if err := fn(c); err != nil {
  80. return nil, err
  81. }
  82. cmds, err := c.Exec()
  83. _ = c.Close()
  84. return cmds, err
  85. }
  86. func (c *Pipeline) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  87. return c.pipelined(fn)
  88. }
  89. func (c *Pipeline) Pipeline() Pipeliner {
  90. return c
  91. }
  92. func (c *Pipeline) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  93. return c.pipelined(fn)
  94. }
  95. func (c *Pipeline) TxPipeline() Pipeliner {
  96. return c
  97. }