You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

383 lines
9.2 KiB

  1. // The `fwd` package provides a buffered reader
  2. // and writer. Each has methods that help improve
  3. // the encoding/decoding performance of some binary
  4. // protocols.
  5. //
  6. // The `fwd.Writer` and `fwd.Reader` types provide similar
  7. // functionality to their counterparts in `bufio`, plus
  8. // a few extra utility methods that simplify read-ahead
  9. // and write-ahead. I wrote this package to improve serialization
  10. // performance for http://github.com/tinylib/msgp,
  11. // where it provided about a 2x speedup over `bufio` for certain
  12. // workloads. However, care must be taken to understand the semantics of the
  13. // extra methods provided by this package, as they allow
  14. // the user to access and manipulate the buffer memory
  15. // directly.
  16. //
  17. // The extra methods for `fwd.Reader` are `Peek`, `Skip`
  18. // and `Next`. `(*fwd.Reader).Peek`, unlike `(*bufio.Reader).Peek`,
  19. // will re-allocate the read buffer in order to accommodate arbitrarily
  20. // large read-ahead. `(*fwd.Reader).Skip` skips the next `n` bytes
  21. // in the stream, and uses the `io.Seeker` interface if the underlying
  22. // stream implements it. `(*fwd.Reader).Next` returns a slice pointing
  23. // to the next `n` bytes in the read buffer (like `Peek`), but also
  24. // increments the read position. This allows users to process streams
  25. // in arbitrary block sizes without having to manage appropriately-sized
  26. // slices. Additionally, obviating the need to copy the data from the
  27. // buffer to another location in memory can improve performance dramatically
  28. // in CPU-bound applications.
  29. //
  30. // `fwd.Writer` only has one extra method, which is `(*fwd.Writer).Next`, which
  31. // returns a slice pointing to the next `n` bytes of the writer, and increments
  32. // the write position by the length of the returned slice. This allows users
  33. // to write directly to the end of the buffer.
  34. //
  35. package fwd
  36. import "io"
const (
	// DefaultReaderSize is the default size of the read buffer,
	// in bytes. NewReader passes it to NewReaderSize.
	DefaultReaderSize = 2048

	// minReaderSize is the minimum read buffer size; the value
	// is taken straight from bufio. NewReaderSize never allocates
	// a buffer with a smaller capacity than this.
	minReaderSize = 16
)
  43. // NewReader returns a new *Reader that reads from 'r'
  44. func NewReader(r io.Reader) *Reader {
  45. return NewReaderSize(r, DefaultReaderSize)
  46. }
  47. // NewReaderSize returns a new *Reader that
  48. // reads from 'r' and has a buffer size 'n'
  49. func NewReaderSize(r io.Reader, n int) *Reader {
  50. rd := &Reader{
  51. r: r,
  52. data: make([]byte, 0, max(minReaderSize, n)),
  53. }
  54. if s, ok := r.(io.Seeker); ok {
  55. rd.rs = s
  56. }
  57. return rd
  58. }
// Reader is a buffered look-ahead reader. It wraps an io.Reader
// and keeps unread bytes in an internal buffer that Peek and Next
// expose directly to the caller.
type Reader struct {
	r io.Reader // underlying reader

	// data[n:len(data)] is buffered data; data[len(data):cap(data)] is free buffer space
	data  []byte // buffer; its capacity is the buffer size
	n     int    // read offset into data
	state error  // last read error; handed out (and cleared) by err() and noEOF()

	// if the reader passed to NewReader was
	// also an io.Seeker, this is non-nil
	rs io.Seeker
}
  70. // Reset resets the underlying reader
  71. // and the read buffer.
  72. func (r *Reader) Reset(rd io.Reader) {
  73. r.r = rd
  74. r.data = r.data[0:0]
  75. r.n = 0
  76. r.state = nil
  77. if s, ok := rd.(io.Seeker); ok {
  78. r.rs = s
  79. } else {
  80. r.rs = nil
  81. }
  82. }
  83. // more() does one read on the underlying reader
  84. func (r *Reader) more() {
  85. // move data backwards so that
  86. // the read offset is 0; this way
  87. // we can supply the maximum number of
  88. // bytes to the reader
  89. if r.n != 0 {
  90. if r.n < len(r.data) {
  91. r.data = r.data[:copy(r.data[0:], r.data[r.n:])]
  92. } else {
  93. r.data = r.data[:0]
  94. }
  95. r.n = 0
  96. }
  97. var a int
  98. a, r.state = r.r.Read(r.data[len(r.data):cap(r.data)])
  99. if a == 0 && r.state == nil {
  100. r.state = io.ErrNoProgress
  101. return
  102. } else if a > 0 && r.state == io.EOF {
  103. // discard the io.EOF if we read more than 0 bytes.
  104. // the next call to Read should return io.EOF again.
  105. r.state = nil
  106. }
  107. r.data = r.data[:len(r.data)+a]
  108. }
  109. // pop error
  110. func (r *Reader) err() (e error) {
  111. e, r.state = r.state, nil
  112. return
  113. }
  114. // pop error; EOF -> io.ErrUnexpectedEOF
  115. func (r *Reader) noEOF() (e error) {
  116. e, r.state = r.state, nil
  117. if e == io.EOF {
  118. e = io.ErrUnexpectedEOF
  119. }
  120. return
  121. }
  122. // buffered bytes
  123. func (r *Reader) buffered() int { return len(r.data) - r.n }
  124. // Buffered returns the number of bytes currently in the buffer
  125. func (r *Reader) Buffered() int { return len(r.data) - r.n }
  126. // BufferSize returns the total size of the buffer
  127. func (r *Reader) BufferSize() int { return cap(r.data) }
  128. // Peek returns the next 'n' buffered bytes,
  129. // reading from the underlying reader if necessary.
  130. // It will only return a slice shorter than 'n' bytes
  131. // if it also returns an error. Peek does not advance
  132. // the reader. EOF errors are *not* returned as
  133. // io.ErrUnexpectedEOF.
  134. func (r *Reader) Peek(n int) ([]byte, error) {
  135. // in the degenerate case,
  136. // we may need to realloc
  137. // (the caller asked for more
  138. // bytes than the size of the buffer)
  139. if cap(r.data) < n {
  140. old := r.data[r.n:]
  141. r.data = make([]byte, n+r.buffered())
  142. r.data = r.data[:copy(r.data, old)]
  143. r.n = 0
  144. }
  145. // keep filling until
  146. // we hit an error or
  147. // read enough bytes
  148. for r.buffered() < n && r.state == nil {
  149. r.more()
  150. }
  151. // we must have hit an error
  152. if r.buffered() < n {
  153. return r.data[r.n:], r.err()
  154. }
  155. return r.data[r.n : r.n+n], nil
  156. }
  157. // Skip moves the reader forward 'n' bytes.
  158. // Returns the number of bytes skipped and any
  159. // errors encountered. It is analogous to Seek(n, 1).
  160. // If the underlying reader implements io.Seeker, then
  161. // that method will be used to skip forward.
  162. //
  163. // If the reader encounters
  164. // an EOF before skipping 'n' bytes, it
  165. // returns io.ErrUnexpectedEOF. If the
  166. // underlying reader implements io.Seeker, then
  167. // those rules apply instead. (Many implementations
  168. // will not return `io.EOF` until the next call
  169. // to Read.)
  170. func (r *Reader) Skip(n int) (int, error) {
  171. // fast path
  172. if r.buffered() >= n {
  173. r.n += n
  174. return n, nil
  175. }
  176. // use seeker implementation
  177. // if we can
  178. if r.rs != nil {
  179. return r.skipSeek(n)
  180. }
  181. // loop on filling
  182. // and then erasing
  183. o := n
  184. for r.buffered() < n && r.state == nil {
  185. r.more()
  186. // we can skip forward
  187. // up to r.buffered() bytes
  188. step := min(r.buffered(), n)
  189. r.n += step
  190. n -= step
  191. }
  192. // at this point, n should be
  193. // 0 if everything went smoothly
  194. return o - n, r.noEOF()
  195. }
  196. // Next returns the next 'n' bytes in the stream.
  197. // Unlike Peek, Next advances the reader position.
  198. // The returned bytes point to the same
  199. // data as the buffer, so the slice is
  200. // only valid until the next reader method call.
  201. // An EOF is considered an unexpected error.
  202. // If an the returned slice is less than the
  203. // length asked for, an error will be returned,
  204. // and the reader position will not be incremented.
  205. func (r *Reader) Next(n int) ([]byte, error) {
  206. // in case the buffer is too small
  207. if cap(r.data) < n {
  208. old := r.data[r.n:]
  209. r.data = make([]byte, n+r.buffered())
  210. r.data = r.data[:copy(r.data, old)]
  211. r.n = 0
  212. }
  213. // fill at least 'n' bytes
  214. for r.buffered() < n && r.state == nil {
  215. r.more()
  216. }
  217. if r.buffered() < n {
  218. return r.data[r.n:], r.noEOF()
  219. }
  220. out := r.data[r.n : r.n+n]
  221. r.n += n
  222. return out, nil
  223. }
  224. // skipSeek uses the io.Seeker to seek forward.
  225. // only call this function when n > r.buffered()
  226. func (r *Reader) skipSeek(n int) (int, error) {
  227. o := r.buffered()
  228. // first, clear buffer
  229. n -= o
  230. r.n = 0
  231. r.data = r.data[:0]
  232. // then seek forward remaning bytes
  233. i, err := r.rs.Seek(int64(n), 1)
  234. return int(i) + o, err
  235. }
  236. // Read implements `io.Reader`
  237. func (r *Reader) Read(b []byte) (int, error) {
  238. // if we have data in the buffer, just
  239. // return that.
  240. if r.buffered() != 0 {
  241. x := copy(b, r.data[r.n:])
  242. r.n += x
  243. return x, nil
  244. }
  245. var n int
  246. // we have no buffered data; determine
  247. // whether or not to buffer or call
  248. // the underlying reader directly
  249. if len(b) >= cap(r.data) {
  250. n, r.state = r.r.Read(b)
  251. } else {
  252. r.more()
  253. n = copy(b, r.data)
  254. r.n = n
  255. }
  256. if n == 0 {
  257. return 0, r.err()
  258. }
  259. return n, nil
  260. }
  261. // ReadFull attempts to read len(b) bytes into
  262. // 'b'. It returns the number of bytes read into
  263. // 'b', and an error if it does not return len(b).
  264. // EOF is considered an unexpected error.
  265. func (r *Reader) ReadFull(b []byte) (int, error) {
  266. var n int // read into b
  267. var nn int // scratch
  268. l := len(b)
  269. // either read buffered data,
  270. // or read directly for the underlying
  271. // buffer, or fetch more buffered data.
  272. for n < l && r.state == nil {
  273. if r.buffered() != 0 {
  274. nn = copy(b[n:], r.data[r.n:])
  275. n += nn
  276. r.n += nn
  277. } else if l-n > cap(r.data) {
  278. nn, r.state = r.r.Read(b[n:])
  279. n += nn
  280. } else {
  281. r.more()
  282. }
  283. }
  284. if n < l {
  285. return n, r.noEOF()
  286. }
  287. return n, nil
  288. }
  289. // ReadByte implements `io.ByteReader`
  290. func (r *Reader) ReadByte() (byte, error) {
  291. for r.buffered() < 1 && r.state == nil {
  292. r.more()
  293. }
  294. if r.buffered() < 1 {
  295. return 0, r.err()
  296. }
  297. b := r.data[r.n]
  298. r.n++
  299. return b, nil
  300. }
  301. // WriteTo implements `io.WriterTo`
  302. func (r *Reader) WriteTo(w io.Writer) (int64, error) {
  303. var (
  304. i int64
  305. ii int
  306. err error
  307. )
  308. // first, clear buffer
  309. if r.buffered() > 0 {
  310. ii, err = w.Write(r.data[r.n:])
  311. i += int64(ii)
  312. if err != nil {
  313. return i, err
  314. }
  315. r.data = r.data[0:0]
  316. r.n = 0
  317. }
  318. for r.state == nil {
  319. // here we just do
  320. // 1:1 reads and writes
  321. r.more()
  322. if r.buffered() > 0 {
  323. ii, err = w.Write(r.data)
  324. i += int64(ii)
  325. if err != nil {
  326. return i, err
  327. }
  328. r.data = r.data[0:0]
  329. r.n = 0
  330. }
  331. }
  332. if r.state != io.EOF {
  333. return i, r.err()
  334. }
  335. return i, nil
  336. }
  337. func min(a int, b int) int {
  338. if a < b {
  339. return a
  340. }
  341. return b
  342. }
  343. func max(a int, b int) int {
  344. if a < b {
  345. return b
  346. }
  347. return a
  348. }