From a4497c0ea4ff561fe314939246b493307ebd988e Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Tue, 29 May 2018 07:49:11 +0930 Subject: [PATCH] ring: new package replacement for ringbuffer --- ring/README.md | 24 ++++++ ring/ring.go | 172 ++++++++++++++++++++++++++++++++++++++ ring/ring_test.go | 207 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 403 insertions(+) create mode 100644 ring/README.md create mode 100644 ring/ring.go create mode 100644 ring/ring_test.go diff --git a/ring/README.md b/ring/README.md new file mode 100644 index 00000000..3738922b --- /dev/null +++ b/ring/README.md @@ -0,0 +1,24 @@ +# Readme + +Package ring provides a ring buffer of io.ReadWriters. + +# Author + +Dan Kortschak + +# License + +ring is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + +It is free software: you can redistribute it and/or modify them +under the terms of the GNU General Public License as published by the +Free Software Foundation, either version 3 of the License, or (at your +option) any later version. + +It is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +or more details. + +You should have received a copy of the GNU General Public License +along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses/). diff --git a/ring/ring.go b/ring/ring.go new file mode 100644 index 00000000..b2d8b68c --- /dev/null +++ b/ring/ring.go @@ -0,0 +1,172 @@ +/* +NAME + ring.go - a structure that encapsulates a Buffer datastructure with conccurency + functionality + +DESCRIPTION + See Readme.md + +AUTHOR + Dan Kortschak + +LICENSE + ring.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +// Package ring provides a ring buffer of io.ReadWriters. +package ring + +import ( + "bytes" + "errors" + "io" + "time" +) + +var ( + ErrTimeout = errors.New("ring: buffer cycle timeout") + ErrTooLong = errors.New("ring: write to long for buffer element") +) + +// Buffer implements a ring buffer. +// +// The buffer has a writable head and a readable tail with a queue from the head +// to the tail. Concurrent read a write operations are safe. +type Buffer struct { + head, tail *bytes.Buffer + full, empty chan *bytes.Buffer + timeout time.Duration +} + +// NewBuffer returns a Buffer with len elements of the given size. The timeout +// parameter specifies how long a write operation will wait before failing with +// a temporary timeout error. +func NewBuffer(len, size int, timeout time.Duration) *Buffer { + if len <= 0 || size <= 0 { + return nil + } + b := Buffer{ + full: make(chan *bytes.Buffer, len), + empty: make(chan *bytes.Buffer, len), + timeout: timeout, + } + for i := 0; i < len; i++ { + b.empty <- bytes.NewBuffer(make([]byte, 0, size)) + } + return &b +} + +// Len returns the number of full buffer elements. +func (b *Buffer) Len() int { + return len(b.full) +} + +// Write writes the bytes in b to the next current or next available element of the ring buffer +// it returns the number of bytes written and any error. +// If no element can be gained within the timeout, ErrTimeout is returned and if the len(p) is +// greater than the buffer's element size, ErrTooLong is returned. +// +// Write is safe to use concurrently with Read, but may not be used concurrently with another +// write operation. +func (b *Buffer) Write(p []byte) (int, error) { + if b.head == nil { + timer := time.NewTimer(b.timeout) + select { + case <-timer.C: + return 0, ErrTimeout + case b.head = <-b.empty: + timer.Stop() + } + } + if len(p) > b.head.Cap() { + return 0, ErrTooLong + } + if len(p) > b.head.Cap()-b.head.Len() { + b.full <- b.head + b.head = nil + return b.Write(p) + } + n, err := b.head.Write(p) + if b.head.Cap()-b.head.Len() == 0 { + b.full <- b.head + b.head = nil + } + return n, err +} + +// Flush puts the currently writing element of the buffer into the queue for reading. Flush +// is idempotent. +// +// Flush is safe to use concurrently with Read, but may not be used concurrently with another +// another write operation. +func (b *Buffer) Flush() { + if b.head == nil { + return + } + b.full <- b.head + b.head = nil +} + +// Close closes the buffer. The buffer may not be written to after a call to close, but can +// be drained by calls to Read. +// +// Flush is safe to use concurrently with Read, but may not be used concurrently with another +// another write operation. +func (b *Buffer) Close() error { + b.Flush() + close(b.full) + return nil +} + +// Next gets the next element from the queue ready for reading, returning ErrTimeout if no +// element is available within the timeout. If the Buffer has been closed Next returns io.EOF. +// +// Next is safe to use concurrently with write operations, but may not be used concurrently with +// another Read call or Next call. A goroutine calling Next must not call Flush or Close. +func (b *Buffer) Next(timeout time.Duration) error { + if b.tail == nil { + timer := time.NewTimer(timeout) + var ok bool + select { + case <-timer.C: + return ErrTimeout + case b.tail, ok = <-b.full: + timer.Stop() + if !ok { + return io.EOF + } + } + } + return nil +} + +// Read reads bytes from the current tail of the ring buffer into p and returns the number of +// bytes read and any error. +// +// Read is safe to use concurrently with write operations, but may not be used concurrently with +// another Read call or Next call. A goroutine calling Read must not call Flush or Close. +func (b *Buffer) Read(p []byte) (int, error) { + if b.tail == nil { + return 0, io.EOF + } + n, err := b.tail.Read(p) + if b.tail.Len() == 0 { + b.tail.Reset() + b.empty <- b.tail + b.tail = nil + } + return n, err +} diff --git a/ring/ring_test.go b/ring/ring_test.go new file mode 100644 index 00000000..86f110da --- /dev/null +++ b/ring/ring_test.go @@ -0,0 +1,207 @@ +/* +NAME + ring_test.go - a test suite adopting the golang testing library to test functionality of the + RingBuffer structure + +DESCRIPTION + See README.md + +AUTHOR + Dan Kortschak + +LICENSE + ring_test.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package ring + +import ( + "io" + "reflect" + "strings" + "sync" + "testing" + "time" +) + +var roundTripTests = []struct { + name string + + len int + size int + timeout time.Duration + nextTimeout time.Duration + + data [][]string + readDelay time.Duration + writeDelay time.Duration +}{ + { + name: "happy", + len: 2, size: 50, + timeout: 100 * time.Millisecond, + nextTimeout: 100 * time.Millisecond, + + data: [][]string{ + {"frame1", "frame2", "frame3", "frame4"}, + {"frame5", "frame6"}, + {"frame5", "frame6", "frame7"}, + {"frame8", "frame9", "frame10"}, + {"frame11"}, + {"frame12", "frame13"}, + {"frame14", "frame15", "frame16", "frame17"}, + }, + }, + { + name: "slow write", + len: 2, size: 50, + timeout: 100 * time.Millisecond, + nextTimeout: 100 * time.Millisecond, + + data: [][]string{ + {"frame1", "frame2", "frame3", "frame4"}, + {"frame5", "frame6"}, + {"frame5", "frame6", "frame7"}, + {"frame8", "frame9", "frame10"}, + {"frame11"}, + {"frame12", "frame13"}, + {"frame14", "frame15", "frame16", "frame17"}, + }, + writeDelay: 500 * time.Millisecond, + }, + { + name: "slow read", + len: 2, size: 50, + timeout: 100 * time.Millisecond, + nextTimeout: 100 * time.Millisecond, + + data: [][]string{ + {"frame1", "frame2", "frame3", "frame4"}, + {"frame5", "frame6"}, + {"frame5", "frame6", "frame7"}, + {"frame8", "frame9", "frame10"}, + {"frame11"}, + {"frame12", "frame13"}, + {"frame14", "frame15", "frame16", "frame17"}, + }, + readDelay: 500 * time.Millisecond, + }, +} + +func TestRoundTrip(t *testing.T) { + const maxTimeouts = 100 + for _, test := range roundTripTests { + b := NewBuffer(test.len, test.size, test.timeout) + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + for _, c := range test.data { + var timeouts int + for _, f := range c { + time.Sleep(test.writeDelay) // Simulate slow data capture. + _, err := b.Write([]byte(f)) + switch err { + case nil: + timeouts = 0 + case ErrTimeout: + if timeouts > maxTimeouts { + t.Errorf("too many write timeouts for %q", test.name) + return + } + timeouts++ + default: + t.Errorf("unexpected write error for %q: %v", test.name, err) + return + } + } + b.Flush() + } + b.Close() + }() + go func() { + buf := make([]byte, 1<<10) + defer wg.Done() + var got []string + var timeouts int + elements: + for { + err := b.Next(test.nextTimeout) + switch err { + case nil: + timeouts = 0 + case ErrTimeout: + if timeouts > maxTimeouts { + t.Errorf("too many timeouts for %q", test.name) + return + } + timeouts++ + case io.EOF: + break elements + default: + t.Errorf("unexpected read error for %q: %v", test.name, err) + return + } + reads: + for { + n, err := b.Read(buf) + if n != 0 { + time.Sleep(test.readDelay) // Simulate slow data processing. + got = append(got, string(buf[:n])) + } + switch err { + case nil: + case io.EOF: + break reads + default: + t.Errorf("unexpected read error for %q: %v", test.name, err) + return + } + } + } + var want []string + for _, c := range test.data { + want = append(want, strings.Join(c, "")) + } + if test.readDelay == 0 { + if !reflect.DeepEqual(got, want) { + t.Errorf("unexpected round-trip result for %q:\ngot: %#v\nwant:%#v", test.name, got, want) + } + } else { + // We may have dropped writes in this case. + // So just check that we can consume every + // received element with reference to what + // was sent. + // TODO(kortschak): Check that the number of + // missing elements matches the number of + // dropped writes. + var sidx, ridx int + var recd string + for ridx, recd = range got { + for ; sidx < len(want); sidx++ { + if recd == want[sidx] { + break + } + } + } + if ridx != len(got)-1 { + t.Errorf("unexpected round-trip result for %q (unexplained element received):\ngot: %#v\nwant:%#v", test.name, got, want) + } + } + }() + wg.Wait() + } +}