You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

184 lines
4.5 KiB

  1. // Copyright 2020 Andrew Thornton. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package levelqueue
  5. import (
  6. "fmt"
  7. "github.com/syndtr/goleveldb/leveldb"
  8. )
  9. const (
  10. uniqueQueuePrefixStr = "unique"
  11. )
  12. // UniqueQueue defines an unique queue struct
  13. type UniqueQueue struct {
  14. q *Queue
  15. set *Set
  16. db *leveldb.DB
  17. closeUnderlyingDB bool
  18. }
  19. // OpenUnique opens an unique queue from the db path or creates a set if it doesn't exist.
  20. // The keys in the queue portion will not be prefixed, and the set keys will be prefixed with "set-"
  21. func OpenUnique(dataDir string) (*UniqueQueue, error) {
  22. db, err := leveldb.OpenFile(dataDir, nil)
  23. if err != nil {
  24. return nil, err
  25. }
  26. return NewUniqueQueue(db, []byte{}, []byte(uniqueQueuePrefixStr), true)
  27. }
  28. // NewUniqueQueue creates a new unique queue from a db.
  29. // The queue keys will be prefixed with queuePrefix and the set keys with setPrefix
  30. // and at close the db will be closed as per closeUnderlyingDB
  31. func NewUniqueQueue(db *leveldb.DB, queuePrefix []byte, setPrefix []byte, closeUnderlyingDB bool) (*UniqueQueue, error) {
  32. internal, err := NewQueue(db, queuePrefix, false)
  33. if err != nil {
  34. return nil, err
  35. }
  36. set, err := NewSet(db, setPrefix, false)
  37. if err != nil {
  38. return nil, err
  39. }
  40. queue := &UniqueQueue{
  41. q: internal,
  42. set: set,
  43. db: db,
  44. closeUnderlyingDB: closeUnderlyingDB,
  45. }
  46. return queue, err
  47. }
  48. // LPush pushes data to the left of the queue
  49. func (queue *UniqueQueue) LPush(data []byte) error {
  50. return queue.LPushFunc(data, nil)
  51. }
  52. // LPushFunc pushes data to the left of the queue and calls the callback if it is added
  53. func (queue *UniqueQueue) LPushFunc(data []byte, fn func() error) error {
  54. added, err := queue.set.Add(data)
  55. if err != nil {
  56. return err
  57. }
  58. if !added {
  59. return ErrAlreadyInQueue
  60. }
  61. if fn != nil {
  62. err = fn()
  63. if err != nil {
  64. _, remErr := queue.set.Remove(data)
  65. if remErr != nil {
  66. return fmt.Errorf("%v & %v", err, remErr)
  67. }
  68. return err
  69. }
  70. }
  71. return queue.q.LPush(data)
  72. }
  73. // RPush pushes data to the right of the queue
  74. func (queue *UniqueQueue) RPush(data []byte) error {
  75. return queue.RPushFunc(data, nil)
  76. }
  77. // RPushFunc pushes data to the right of the queue and calls the callback if is added
  78. func (queue *UniqueQueue) RPushFunc(data []byte, fn func() error) error {
  79. added, err := queue.set.Add(data)
  80. if err != nil {
  81. return err
  82. }
  83. if !added {
  84. return ErrAlreadyInQueue
  85. }
  86. if fn != nil {
  87. err = fn()
  88. if err != nil {
  89. _, remErr := queue.set.Remove(data)
  90. if remErr != nil {
  91. return fmt.Errorf("%v & %v", err, remErr)
  92. }
  93. return err
  94. }
  95. }
  96. return queue.q.RPush(data)
  97. }
  98. // RPop pop data from the right of the queue
  99. func (queue *UniqueQueue) RPop() ([]byte, error) {
  100. popped, err := queue.q.RPop()
  101. if err != nil {
  102. return popped, err
  103. }
  104. _, err = queue.set.Remove(popped)
  105. return popped, err
  106. }
  107. // RHandle receives a user callback function to handle the right element of the queue, if the function returns nil, then delete the element, otherwise keep the element.
  108. func (queue *UniqueQueue) RHandle(h func([]byte) error) error {
  109. return queue.q.RHandle(func(data []byte) error {
  110. err := h(data)
  111. if err != nil {
  112. return err
  113. }
  114. _, err = queue.set.Remove(data)
  115. return err
  116. })
  117. }
  118. // LPop pops data from left of the queue
  119. func (queue *UniqueQueue) LPop() ([]byte, error) {
  120. popped, err := queue.q.LPop()
  121. if err != nil {
  122. return popped, err
  123. }
  124. _, err = queue.set.Remove(popped)
  125. return popped, err
  126. }
  127. // LHandle receives a user callback function to handle the left element of the queue, if the function returns nil, then delete the element, otherwise keep the element.
  128. func (queue *UniqueQueue) LHandle(h func([]byte) error) error {
  129. return queue.q.LHandle(func(data []byte) error {
  130. err := h(data)
  131. if err != nil {
  132. return err
  133. }
  134. _, err = queue.set.Remove(data)
  135. return err
  136. })
  137. }
  138. // Has checks whether the data is already in the queue
  139. func (queue *UniqueQueue) Has(data []byte) (bool, error) {
  140. return queue.set.Has(data)
  141. }
  142. // Len returns the length of the queue
  143. func (queue *UniqueQueue) Len() int64 {
  144. queue.set.lock.Lock()
  145. defer queue.set.lock.Unlock()
  146. return queue.q.Len()
  147. }
  148. // Close closes the queue (and the underlying DB if set to closeUnderlyingDB)
  149. func (queue *UniqueQueue) Close() error {
  150. _ = queue.q.Close()
  151. _ = queue.set.Close()
  152. if !queue.closeUnderlyingDB {
  153. queue.db = nil
  154. return nil
  155. }
  156. err := queue.db.Close()
  157. queue.db = nil
  158. return err
  159. }