You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

203 lines
5.3 KiB

  1. // Copyright (c) 2017 Couchbase, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package zap
  15. import (
  16. "bytes"
  17. "encoding/binary"
  18. "io"
  19. "math"
  20. )
  21. const termNotEncoded = math.MaxUint64
  22. type chunkedIntCoder struct {
  23. final []byte
  24. chunkSize uint64
  25. chunkBuf bytes.Buffer
  26. chunkLens []uint64
  27. currChunk uint64
  28. buf []byte
  29. }
  30. // newChunkedIntCoder returns a new chunk int coder which packs data into
  31. // chunks based on the provided chunkSize and supports up to the specified
  32. // maxDocNum
  33. func newChunkedIntCoder(chunkSize uint64, maxDocNum uint64) *chunkedIntCoder {
  34. total := maxDocNum/chunkSize + 1
  35. rv := &chunkedIntCoder{
  36. chunkSize: chunkSize,
  37. chunkLens: make([]uint64, total),
  38. final: make([]byte, 0, 64),
  39. }
  40. return rv
  41. }
  42. // Reset lets you reuse this chunked int coder. buffers are reset and reused
  43. // from previous use. you cannot change the chunk size or max doc num.
  44. func (c *chunkedIntCoder) Reset() {
  45. c.final = c.final[:0]
  46. c.chunkBuf.Reset()
  47. c.currChunk = 0
  48. for i := range c.chunkLens {
  49. c.chunkLens[i] = 0
  50. }
  51. }
  52. // SetChunkSize changes the chunk size. It is only valid to do so
  53. // with a new chunkedIntCoder, or immediately after calling Reset()
  54. func (c *chunkedIntCoder) SetChunkSize(chunkSize uint64, maxDocNum uint64) {
  55. total := int(maxDocNum/chunkSize + 1)
  56. c.chunkSize = chunkSize
  57. if cap(c.chunkLens) < total {
  58. c.chunkLens = make([]uint64, total)
  59. } else {
  60. c.chunkLens = c.chunkLens[:total]
  61. }
  62. }
  63. // Add encodes the provided integers into the correct chunk for the provided
  64. // doc num. You MUST call Add() with increasing docNums.
  65. func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error {
  66. chunk := docNum / c.chunkSize
  67. if chunk != c.currChunk {
  68. // starting a new chunk
  69. c.Close()
  70. c.chunkBuf.Reset()
  71. c.currChunk = chunk
  72. }
  73. if len(c.buf) < binary.MaxVarintLen64 {
  74. c.buf = make([]byte, binary.MaxVarintLen64)
  75. }
  76. for _, val := range vals {
  77. wb := binary.PutUvarint(c.buf, val)
  78. _, err := c.chunkBuf.Write(c.buf[:wb])
  79. if err != nil {
  80. return err
  81. }
  82. }
  83. return nil
  84. }
  85. func (c *chunkedIntCoder) AddBytes(docNum uint64, buf []byte) error {
  86. chunk := docNum / c.chunkSize
  87. if chunk != c.currChunk {
  88. // starting a new chunk
  89. c.Close()
  90. c.chunkBuf.Reset()
  91. c.currChunk = chunk
  92. }
  93. _, err := c.chunkBuf.Write(buf)
  94. return err
  95. }
  96. // Close indicates you are done calling Add() this allows the final chunk
  97. // to be encoded.
  98. func (c *chunkedIntCoder) Close() {
  99. encodingBytes := c.chunkBuf.Bytes()
  100. c.chunkLens[c.currChunk] = uint64(len(encodingBytes))
  101. c.final = append(c.final, encodingBytes...)
  102. c.currChunk = uint64(cap(c.chunkLens)) // sentinel to detect double close
  103. }
  104. // Write commits all the encoded chunked integers to the provided writer.
  105. func (c *chunkedIntCoder) Write(w io.Writer) (int, error) {
  106. bufNeeded := binary.MaxVarintLen64 * (1 + len(c.chunkLens))
  107. if len(c.buf) < bufNeeded {
  108. c.buf = make([]byte, bufNeeded)
  109. }
  110. buf := c.buf
  111. // convert the chunk lengths into chunk offsets
  112. chunkOffsets := modifyLengthsToEndOffsets(c.chunkLens)
  113. // write out the number of chunks & each chunk offsets
  114. n := binary.PutUvarint(buf, uint64(len(chunkOffsets)))
  115. for _, chunkOffset := range chunkOffsets {
  116. n += binary.PutUvarint(buf[n:], chunkOffset)
  117. }
  118. tw, err := w.Write(buf[:n])
  119. if err != nil {
  120. return tw, err
  121. }
  122. // write out the data
  123. nw, err := w.Write(c.final)
  124. tw += nw
  125. if err != nil {
  126. return tw, err
  127. }
  128. return tw, nil
  129. }
  130. // writeAt commits all the encoded chunked integers to the provided writer
  131. // and returns the starting offset, total bytes written and an error
  132. func (c *chunkedIntCoder) writeAt(w io.Writer) (uint64, int, error) {
  133. startOffset := uint64(termNotEncoded)
  134. if len(c.final) <= 0 {
  135. return startOffset, 0, nil
  136. }
  137. if chw := w.(*CountHashWriter); chw != nil {
  138. startOffset = uint64(chw.Count())
  139. }
  140. tw, err := c.Write(w)
  141. return startOffset, tw, err
  142. }
  143. func (c *chunkedIntCoder) FinalSize() int {
  144. return len(c.final)
  145. }
  146. // modifyLengthsToEndOffsets converts the chunk length array
  147. // to a chunk offset array. The readChunkBoundary
  148. // will figure out the start and end of every chunk from
  149. // these offsets. Starting offset of i'th index is stored
  150. // in i-1'th position except for 0'th index and ending offset
  151. // is stored at i'th index position.
  152. // For 0'th element, starting position is always zero.
  153. // eg:
  154. // Lens -> 5 5 5 5 => 5 10 15 20
  155. // Lens -> 0 5 0 5 => 0 5 5 10
  156. // Lens -> 0 0 0 5 => 0 0 0 5
  157. // Lens -> 5 0 0 0 => 5 5 5 5
  158. // Lens -> 0 5 0 0 => 0 5 5 5
  159. // Lens -> 0 0 5 0 => 0 0 5 5
  160. func modifyLengthsToEndOffsets(lengths []uint64) []uint64 {
  161. var runningOffset uint64
  162. var index, i int
  163. for i = 1; i <= len(lengths); i++ {
  164. runningOffset += lengths[i-1]
  165. lengths[index] = runningOffset
  166. index++
  167. }
  168. return lengths
  169. }
  170. func readChunkBoundary(chunk int, offsets []uint64) (uint64, uint64) {
  171. var start uint64
  172. if chunk > 0 {
  173. start = offsets[chunk-1]
  174. }
  175. return start, offsets[chunk]
  176. }