You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

188 lines
4.8 KiB

  1. // Copyright (c) 2017 Couchbase, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package vellum
  15. import (
  16. "bytes"
  17. )
  // MergeFunc resolves a key that was observed with more than one value while
  // merging a slice of iterators: it receives every competing value and returns
  // the single value to keep.
  // The values arrive in the same relative order as the iterators in the slice
  // the MergeIterator was created from, which lets an implementation favor one
  // iterator over another.
  type MergeFunc func(vals []uint64) uint64
  24. // MergeIterator implements the Iterator interface by traversing a slice
  25. // of iterators and merging the contents of them. If the same key exists
  26. // in mulitipe underlying iterators, a user-provided MergeFunc will be
  27. // invoked to choose the new value.
  28. type MergeIterator struct {
  29. itrs []Iterator
  30. f MergeFunc
  31. currKs [][]byte
  32. currVs []uint64
  33. lowK []byte
  34. lowV uint64
  35. lowIdxs []int
  36. mergeV []uint64
  37. }
  38. // NewMergeIterator creates a new MergeIterator over the provided slice of
  39. // Iterators and with the specified MergeFunc to resolve duplicate keys.
  40. func NewMergeIterator(itrs []Iterator, f MergeFunc) (*MergeIterator, error) {
  41. rv := &MergeIterator{
  42. itrs: itrs,
  43. f: f,
  44. currKs: make([][]byte, len(itrs)),
  45. currVs: make([]uint64, len(itrs)),
  46. lowIdxs: make([]int, 0, len(itrs)),
  47. mergeV: make([]uint64, 0, len(itrs)),
  48. }
  49. rv.init()
  50. if rv.lowK == nil {
  51. return rv, ErrIteratorDone
  52. }
  53. return rv, nil
  54. }
  55. func (m *MergeIterator) init() {
  56. for i, itr := range m.itrs {
  57. m.currKs[i], m.currVs[i] = itr.Current()
  58. }
  59. m.updateMatches()
  60. }
  61. func (m *MergeIterator) updateMatches() {
  62. if len(m.itrs) < 1 {
  63. return
  64. }
  65. m.lowK = m.currKs[0]
  66. m.lowIdxs = m.lowIdxs[:0]
  67. m.lowIdxs = append(m.lowIdxs, 0)
  68. for i := 1; i < len(m.itrs); i++ {
  69. if m.currKs[i] == nil {
  70. continue
  71. }
  72. cmp := bytes.Compare(m.currKs[i], m.lowK)
  73. if m.lowK == nil || cmp < 0 {
  74. // reached a new low
  75. m.lowK = m.currKs[i]
  76. m.lowIdxs = m.lowIdxs[:0]
  77. m.lowIdxs = append(m.lowIdxs, i)
  78. } else if cmp == 0 {
  79. m.lowIdxs = append(m.lowIdxs, i)
  80. }
  81. }
  82. if len(m.lowIdxs) > 1 {
  83. // merge multiple values
  84. m.mergeV = m.mergeV[:0]
  85. for _, vi := range m.lowIdxs {
  86. m.mergeV = append(m.mergeV, m.currVs[vi])
  87. }
  88. m.lowV = m.f(m.mergeV)
  89. } else if len(m.lowIdxs) == 1 {
  90. m.lowV = m.currVs[m.lowIdxs[0]]
  91. }
  92. }
  93. // Current returns the key and value currently pointed to by this iterator.
  94. // If the iterator is not pointing at a valid value (because Iterator/Next/Seek)
  95. // returned an error previously, it may return nil,0.
  96. func (m *MergeIterator) Current() ([]byte, uint64) {
  97. return m.lowK, m.lowV
  98. }
  99. // Next advances this iterator to the next key/value pair. If there is none,
  100. // then ErrIteratorDone is returned.
  101. func (m *MergeIterator) Next() error {
  102. // move all the current low iterators to next
  103. for _, vi := range m.lowIdxs {
  104. err := m.itrs[vi].Next()
  105. if err != nil && err != ErrIteratorDone {
  106. return err
  107. }
  108. m.currKs[vi], m.currVs[vi] = m.itrs[vi].Current()
  109. }
  110. m.updateMatches()
  111. if m.lowK == nil {
  112. return ErrIteratorDone
  113. }
  114. return nil
  115. }
  116. // Seek advances this iterator to the specified key/value pair. If this key
  117. // is not in the FST, Current() will return the next largest key. If this
  118. // seek operation would go past the last key, then ErrIteratorDone is returned.
  119. func (m *MergeIterator) Seek(key []byte) error {
  120. for i := range m.itrs {
  121. err := m.itrs[i].Seek(key)
  122. if err != nil && err != ErrIteratorDone {
  123. return err
  124. }
  125. }
  126. m.updateMatches()
  127. if m.lowK == nil {
  128. return ErrIteratorDone
  129. }
  130. return nil
  131. }
  132. // Close will attempt to close all the underlying Iterators. If any errors
  133. // are encountered, the first will be returned.
  134. func (m *MergeIterator) Close() error {
  135. var rv error
  136. for i := range m.itrs {
  137. // close all iterators, return first error if any
  138. err := m.itrs[i].Close()
  139. if rv == nil {
  140. rv = err
  141. }
  142. }
  143. return rv
  144. }
  // MergeMin is a MergeFunc that keeps the smallest of the given values.
  // vals must contain at least one element.
  func MergeMin(vals []uint64) uint64 {
  	lowest := vals[0]
  	for i := 1; i < len(vals); i++ {
  		if vals[i] < lowest {
  			lowest = vals[i]
  		}
  	}
  	return lowest
  }
  // MergeMax is a MergeFunc that keeps the largest of the given values.
  // vals must contain at least one element.
  func MergeMax(vals []uint64) uint64 {
  	highest := vals[0]
  	for i := 1; i < len(vals); i++ {
  		if vals[i] > highest {
  			highest = vals[i]
  		}
  	}
  	return highest
  }
  // MergeSum is a MergeFunc that returns the sum of the given values.
  // An empty or nil slice sums to 0. The addition is plain uint64 arithmetic,
  // so a total exceeding the uint64 range wraps around.
  func MergeSum(vals []uint64) uint64 {
  	// Start from the zero value instead of vals[0] so that an empty
  	// input yields 0 rather than an index-out-of-range panic.
  	var total uint64
  	for _, v := range vals {
  		total += v
  	}
  	return total
  }
  172. }