mirror of https://bitbucket.org/ausocean/av.git
ring: add tests/benchmark for WriterTo path and fix SIGSEGV error
Also add a call to ioutil.ReadAll to mimic the use in revid. There is not a great deal of difference on the RPi3 in terms of throughput, though the allocations obviously increase. ``` $ go test -run ^$ -bench . -benchmem -benchtime 30s goos: linux goarch: arm BenchmarkRoundTrip-4 2000 20184818 ns/op 1.49 MB/s 104646 B/op 4 allocs/op BenchmarkRoundTripWriterTo-4 2000 20175948 ns/op 1.49 MB/s 175 B/op 3 allocs/op PASS ```
This commit is contained in:
parent
4e1ae308ff
commit
b8d804e4ec
|
@ -147,6 +147,8 @@ func (b *Buffer) Close() error {
|
||||||
// Next gets the next element from the queue ready for reading, returning ErrTimeout if no
|
// Next gets the next element from the queue ready for reading, returning ErrTimeout if no
|
||||||
// element is available within the timeout. If the Buffer has been closed Next returns io.EOF.
|
// element is available within the timeout. If the Buffer has been closed Next returns io.EOF.
|
||||||
//
|
//
|
||||||
|
// Is it the responsibility of the caller to close the returned Chunk.
|
||||||
|
//
|
||||||
// Next is safe to use concurrently with write operations, but may not be used concurrently with
|
// Next is safe to use concurrently with write operations, but may not be used concurrently with
|
||||||
// another Read call or Next call. A goroutine calling Next must not call Flush or Close.
|
// another Read call or Next call. A goroutine calling Next must not call Flush or Close.
|
||||||
func (b *Buffer) Next(timeout time.Duration) (*Chunk, error) {
|
func (b *Buffer) Next(timeout time.Duration) (*Chunk, error) {
|
||||||
|
@ -266,8 +268,9 @@ func (b *Chunk) Close() error {
|
||||||
panic("ring: invalid use of ring buffer chunk")
|
panic("ring: invalid use of ring buffer chunk")
|
||||||
}
|
}
|
||||||
b.reset()
|
b.reset()
|
||||||
b.owner = nil
|
|
||||||
b.owner.tail = nil
|
b.owner.tail = nil
|
||||||
b.owner.empty <- b
|
empty := b.owner.empty
|
||||||
|
b.owner = nil
|
||||||
|
empty <- b
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,9 @@ LICENSE
|
||||||
package ring
|
package ring
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -206,6 +208,112 @@ func TestRoundTrip(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRoundTripWriterTo(t *testing.T) {
|
||||||
|
const maxTimeouts = 100
|
||||||
|
for _, test := range roundTripTests {
|
||||||
|
b := NewBuffer(test.len, test.size, test.timeout)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(2)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for _, c := range test.data {
|
||||||
|
var dropped int
|
||||||
|
for _, f := range c {
|
||||||
|
time.Sleep(test.writeDelay) // Simulate slow data capture.
|
||||||
|
_, err := b.Write([]byte(f))
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
dropped = 0
|
||||||
|
case ErrDropped:
|
||||||
|
if dropped > maxTimeouts {
|
||||||
|
t.Errorf("too many write drops for %q", test.name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
dropped++
|
||||||
|
default:
|
||||||
|
t.Errorf("unexpected write error for %q: %v", test.name, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
b.Flush()
|
||||||
|
}
|
||||||
|
b.Close()
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
defer wg.Done()
|
||||||
|
var got []string
|
||||||
|
var timeouts int
|
||||||
|
elements:
|
||||||
|
for {
|
||||||
|
chunk, err := b.Next(test.nextTimeout)
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
timeouts = 0
|
||||||
|
case ErrTimeout:
|
||||||
|
if timeouts > maxTimeouts {
|
||||||
|
t.Errorf("too many timeouts for %q", test.name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
timeouts++
|
||||||
|
continue
|
||||||
|
case io.EOF:
|
||||||
|
break elements
|
||||||
|
default:
|
||||||
|
t.Errorf("unexpected read error for %q: %v", test.name, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := chunk.WriteTo(&buf)
|
||||||
|
if n != 0 {
|
||||||
|
time.Sleep(test.readDelay) // Simulate slow data processing.
|
||||||
|
got = append(got, buf.String())
|
||||||
|
buf.Reset()
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected writeto error for %q: %v", test.name, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = chunk.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected close error for %q: %v", test.name, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var want []string
|
||||||
|
for _, c := range test.data {
|
||||||
|
want = append(want, strings.Join(c, ""))
|
||||||
|
}
|
||||||
|
if test.readDelay == 0 {
|
||||||
|
if !reflect.DeepEqual(got, want) {
|
||||||
|
t.Errorf("unexpected round-trip result for %q:\ngot: %#v\nwant:%#v", test.name, got, want)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// We may have dropped writes in this case.
|
||||||
|
// So just check that we can consume every
|
||||||
|
// received element with reference to what
|
||||||
|
// was sent.
|
||||||
|
// TODO(kortschak): Check that the number of
|
||||||
|
// missing elements matches the number of
|
||||||
|
// dropped writes.
|
||||||
|
var sidx, ridx int
|
||||||
|
var recd string
|
||||||
|
for ridx, recd = range got {
|
||||||
|
for ; sidx < len(want); sidx++ {
|
||||||
|
if recd == want[sidx] {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ridx != len(got)-1 {
|
||||||
|
t.Errorf("unexpected round-trip result for %q (unexplained element received):\ngot: %#v\nwant:%#v", test.name, got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkRoundTrip(b *testing.B) {
|
func BenchmarkRoundTrip(b *testing.B) {
|
||||||
const (
|
const (
|
||||||
maxTimeouts = 100
|
maxTimeouts = 100
|
||||||
|
@ -224,11 +332,6 @@ func BenchmarkRoundTrip(b *testing.B) {
|
||||||
// amortised cost.
|
// amortised cost.
|
||||||
rb := NewBuffer(len, size, timeout)
|
rb := NewBuffer(len, size, timeout)
|
||||||
|
|
||||||
// This is hoisted here to ensure the allocation
|
|
||||||
// is not counted since this is outside the control
|
|
||||||
// of the ring buffer.
|
|
||||||
buf := make([]byte, size+1)
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -252,20 +355,106 @@ func BenchmarkRoundTrip(b *testing.B) {
|
||||||
b.Errorf("unexpected read error: %v", err)
|
b.Errorf("unexpected read error: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
reads:
|
|
||||||
for {
|
_, err = ioutil.ReadAll(rb)
|
||||||
n, err := rb.Read(buf)
|
time.Sleep(readDelay) // Simulate slow data processing.
|
||||||
if n != 0 {
|
if err != nil {
|
||||||
time.Sleep(readDelay) // Simulate slow data processing.
|
b.Errorf("unexpected read error: %v", err)
|
||||||
}
|
return
|
||||||
switch err {
|
}
|
||||||
case nil:
|
}
|
||||||
case io.EOF:
|
}()
|
||||||
break reads
|
|
||||||
default:
|
data := make([]byte, frameLen)
|
||||||
b.Errorf("unexpected read error: %v", err)
|
|
||||||
return
|
b.ResetTimer()
|
||||||
}
|
b.SetBytes(frameLen)
|
||||||
|
|
||||||
|
var dropped int
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
time.Sleep(writeDelay) // Simulate slow data capture.
|
||||||
|
_, err := rb.Write(data)
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
dropped = 0
|
||||||
|
case ErrDropped:
|
||||||
|
if dropped > maxTimeouts {
|
||||||
|
b.Error("too many write drops")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
dropped++
|
||||||
|
default:
|
||||||
|
b.Errorf("unexpected write error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rb.Close()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkRoundTripWriterTo(b *testing.B) {
|
||||||
|
const (
|
||||||
|
maxTimeouts = 100
|
||||||
|
|
||||||
|
len = 50
|
||||||
|
size = 150e3
|
||||||
|
timeout = 10 * time.Millisecond
|
||||||
|
|
||||||
|
frameLen = 30e3
|
||||||
|
|
||||||
|
writeDelay = 20 * time.Millisecond
|
||||||
|
readDelay = 50 * time.Millisecond
|
||||||
|
)
|
||||||
|
|
||||||
|
// Allocated prior to timer reset since it is an
|
||||||
|
// amortised cost.
|
||||||
|
rb := NewBuffer(len, size, timeout)
|
||||||
|
|
||||||
|
// This is hoisted here to ensure the allocation
|
||||||
|
// is not counted since this is outside the control
|
||||||
|
// of the ring buffer.
|
||||||
|
buf := bytes.NewBuffer(make([]byte, 0, size+1))
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
var timeouts int
|
||||||
|
elements:
|
||||||
|
for {
|
||||||
|
chunk, err := rb.Next(timeout)
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
timeouts = 0
|
||||||
|
case ErrTimeout:
|
||||||
|
if timeouts > maxTimeouts {
|
||||||
|
b.Error("too many timeouts")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
timeouts++
|
||||||
|
continue
|
||||||
|
case io.EOF:
|
||||||
|
break elements
|
||||||
|
default:
|
||||||
|
b.Errorf("unexpected read error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := chunk.WriteTo(buf)
|
||||||
|
if n != 0 {
|
||||||
|
time.Sleep(readDelay) // Simulate slow data processing.
|
||||||
|
buf.Reset()
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
b.Errorf("unexpected writeto error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = chunk.Close()
|
||||||
|
if err != nil {
|
||||||
|
b.Errorf("unexpected close error: %v", err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
Loading…
Reference in New Issue