From 4e1ae308ff631601a4f1219dd954a51516b0cf07 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sun, 3 Jun 2018 22:31:44 +0930 Subject: [PATCH] ring: add multiple rewrite WriterTo to ring buffer API --- ring/ring.go | 123 +++++++++++++++++++++++++++++++++++++++------- ring/ring_test.go | 4 +- 2 files changed, 107 insertions(+), 20 deletions(-) diff --git a/ring/ring.go b/ring/ring.go index 265360ad..75dfcb65 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -30,7 +30,6 @@ LICENSE package ring import ( - "bytes" "errors" "io" "time" @@ -48,8 +47,8 @@ var ( // The buffer has a writable head and a readable tail with a queue from the head // to the tail. Concurrent read a write operations are safe. type Buffer struct { - head, tail *bytes.Buffer - full, empty chan *bytes.Buffer + head, tail *Chunk + full, empty chan *Chunk timeout time.Duration } @@ -61,12 +60,12 @@ func NewBuffer(len, size int, timeout time.Duration) *Buffer { return nil } b := Buffer{ - full: make(chan *bytes.Buffer, len), - empty: make(chan *bytes.Buffer, len), + full: make(chan *Chunk, len), + empty: make(chan *Chunk, len), timeout: timeout, } for i := 0; i < len; i++ { - b.empty <- bytes.NewBuffer(make([]byte, 0, size)) + b.empty <- newChunk(make([]byte, 0, size)) } return &b } @@ -92,7 +91,7 @@ func (b *Buffer) Write(p []byte) (int, error) { case <-timer.C: select { case b.head = <-b.full: - b.head.Reset() + b.head.reset() dropped = true default: // This should never happen. @@ -102,16 +101,16 @@ func (b *Buffer) Write(p []byte) (int, error) { timer.Stop() } } - if len(p) > b.head.Cap() { + if len(p) > b.head.cap() { return 0, ErrTooLong } - if len(p) > b.head.Cap()-b.head.Len() { + if len(p) > b.head.cap()-b.head.len() { b.full <- b.head b.head = nil return b.Write(p) } - n, err := b.head.Write(p) - if b.head.Cap()-b.head.Len() == 0 { + n, err := b.head.write(p) + if b.head.cap()-b.head.len() == 0 { b.full <- b.head b.head = nil } @@ -150,21 +149,22 @@ func (b *Buffer) Close() error { // // Next is safe to use concurrently with write operations, but may not be used concurrently with // another Read call or Next call. A goroutine calling Next must not call Flush or Close. -func (b *Buffer) Next(timeout time.Duration) error { +func (b *Buffer) Next(timeout time.Duration) (*Chunk, error) { if b.tail == nil { timer := time.NewTimer(timeout) var ok bool select { case <-timer.C: - return ErrTimeout + return nil, ErrTimeout case b.tail, ok = <-b.full: timer.Stop() if !ok { - return io.EOF + return nil, io.EOF } } } - return nil + b.tail.owner = b + return b.tail, nil } // Read reads bytes from the current tail of the ring buffer into p and returns the number of @@ -176,11 +176,98 @@ func (b *Buffer) Read(p []byte) (int, error) { if b.tail == nil { return 0, io.EOF } - n, err := b.tail.Read(p) - if b.tail.Len() == 0 { - b.tail.Reset() + n, err := b.tail.read(p) + if b.tail.len() == 0 { + b.tail.reset() b.empty <- b.tail b.tail = nil } return n, err } + +// Chunk is a simplified version of byte buffer without the capacity to grow beyond the +// buffer's original cap, and a modified WriteTo method that allows multiple calls without +// consuming the buffered data. +type Chunk struct { + buf []byte + off int + owner *Buffer +} + +func newChunk(buf []byte) *Chunk { + return &Chunk{buf: buf[:0]} +} + +func (b *Chunk) len() int { + return len(b.buf) - b.off +} + +func (b *Chunk) cap() int { + return cap(b.buf) +} + +func (b *Chunk) reset() { + b.buf = b.buf[:0] + b.off = 0 +} + +func (b *Chunk) write(p []byte) (n int, err error) { + if len(p) > cap(b.buf)-len(b.buf) { + err = ErrTooLong + } + l := len(b.buf) + m := l + len(p) + if m > cap(b.buf) { + m = cap(b.buf) + } + b.buf = b.buf[:m] + n = copy(b.buf[l:], p) + return n, err +} + +func (b *Chunk) read(p []byte) (n int, err error) { + if b.len() <= 0 { + if len(p) == 0 { + return 0, nil + } + return 0, io.EOF + } + n = copy(p, b.buf[b.off:]) + b.off += n + return n, nil +} + +// WriteTo writes data to w until there's no more data to write or when an error occurs. +// The return value n is the number of bytes written. Any error encountered during the +// write is also returned. Repeated called to WriteTo will write the same data until +// the Chunk's Close method is called. +// +// WriteTo will panic if the Chunk has not been obtained through a call to Buffer.Next or +// has been closed. WriteTo must be used in the same goroutine as the call to Next. +func (b *Chunk) WriteTo(w io.Writer) (n int, err error) { + if b.owner == nil || b.owner.tail != b { + panic("ring: invalid use of ring buffer chunk") + } + n, err = w.Write(b.buf) + if n > len(b.buf) { + panic("ring: invalid byte count") + } + if n != len(b.buf) { + return n, io.ErrShortWrite + } + return n, nil +} + +// Close closes the Chunk, reseting its data and releasing it back to the Buffer. A Chunk +// may not be used after it has been closed. Close must be used in the same goroutine as +// the call to Next. +func (b *Chunk) Close() error { + if b.owner == nil || b.owner.tail != b { + panic("ring: invalid use of ring buffer chunk") + } + b.reset() + b.owner = nil + b.owner.tail = nil + b.owner.empty <- b + return nil +} diff --git a/ring/ring_test.go b/ring/ring_test.go index 7a556a30..c28bf475 100644 --- a/ring/ring_test.go +++ b/ring/ring_test.go @@ -139,7 +139,7 @@ func TestRoundTrip(t *testing.T) { var timeouts int elements: for { - err := b.Next(test.nextTimeout) + _, err := b.Next(test.nextTimeout) switch err { case nil: timeouts = 0 @@ -236,7 +236,7 @@ func BenchmarkRoundTrip(b *testing.B) { var timeouts int elements: for { - err := rb.Next(timeout) + _, err := rb.Next(timeout) switch err { case nil: timeouts = 0