You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

120 lines
3.1 KiB

  1. // Package ctxio provides io.Reader and io.Writer wrappers that
  2. // respect context.Contexts. Use these at the interface between
  3. // your context code and your io.
  4. //
  5. // WARNING: read the code. see how writes and reads will continue
  6. // until you cancel the io. Maybe this package should provide
  7. // versions of io.ReadCloser and io.WriteCloser that automatically
  8. // call .Close when the context expires. But for now -- since in my
  9. // use cases I have long-lived connections with ephemeral io wrappers
  10. // -- this has yet to be a need.
  11. package ctxio
  12. import (
  13. "io"
  14. context "golang.org/x/net/context"
  15. )
  16. type ioret struct {
  17. n int
  18. err error
  19. }
  20. type Writer interface {
  21. io.Writer
  22. }
  23. type ctxWriter struct {
  24. w io.Writer
  25. ctx context.Context
  26. }
  27. // NewWriter wraps a writer to make it respect given Context.
  28. // If there is a blocking write, the returned Writer will return
  29. // whenever the context is cancelled (the return values are n=0
  30. // and err=ctx.Err().)
  31. //
  32. // Note well: this wrapper DOES NOT ACTUALLY cancel the underlying
  33. // write-- there is no way to do that with the standard go io
  34. // interface. So the read and write _will_ happen or hang. So, use
  35. // this sparingly, make sure to cancel the read or write as necesary
  36. // (e.g. closing a connection whose context is up, etc.)
  37. //
  38. // Furthermore, in order to protect your memory from being read
  39. // _after_ you've cancelled the context, this io.Writer will
  40. // first make a **copy** of the buffer.
  41. func NewWriter(ctx context.Context, w io.Writer) *ctxWriter {
  42. if ctx == nil {
  43. ctx = context.Background()
  44. }
  45. return &ctxWriter{ctx: ctx, w: w}
  46. }
  47. func (w *ctxWriter) Write(buf []byte) (int, error) {
  48. buf2 := make([]byte, len(buf))
  49. copy(buf2, buf)
  50. c := make(chan ioret, 1)
  51. go func() {
  52. n, err := w.w.Write(buf2)
  53. c <- ioret{n, err}
  54. close(c)
  55. }()
  56. select {
  57. case r := <-c:
  58. return r.n, r.err
  59. case <-w.ctx.Done():
  60. return 0, w.ctx.Err()
  61. }
  62. }
  63. type Reader interface {
  64. io.Reader
  65. }
  66. type ctxReader struct {
  67. r io.Reader
  68. ctx context.Context
  69. }
  70. // NewReader wraps a reader to make it respect given Context.
  71. // If there is a blocking read, the returned Reader will return
  72. // whenever the context is cancelled (the return values are n=0
  73. // and err=ctx.Err().)
  74. //
  75. // Note well: this wrapper DOES NOT ACTUALLY cancel the underlying
  76. // write-- there is no way to do that with the standard go io
  77. // interface. So the read and write _will_ happen or hang. So, use
  78. // this sparingly, make sure to cancel the read or write as necesary
  79. // (e.g. closing a connection whose context is up, etc.)
  80. //
  81. // Furthermore, in order to protect your memory from being read
  82. // _before_ you've cancelled the context, this io.Reader will
  83. // allocate a buffer of the same size, and **copy** into the client's
  84. // if the read succeeds in time.
  85. func NewReader(ctx context.Context, r io.Reader) *ctxReader {
  86. return &ctxReader{ctx: ctx, r: r}
  87. }
  88. func (r *ctxReader) Read(buf []byte) (int, error) {
  89. buf2 := make([]byte, len(buf))
  90. c := make(chan ioret, 1)
  91. go func() {
  92. n, err := r.r.Read(buf2)
  93. c <- ioret{n, err}
  94. close(c)
  95. }()
  96. select {
  97. case ret := <-c:
  98. copy(buf, buf2)
  99. return ret.n, ret.err
  100. case <-r.ctx.Done():
  101. return 0, r.ctx.Err()
  102. }
  103. }