ring: reverse time discounting

This commit is contained in:
Dan Kortschak 2018-05-31 07:47:14 +09:30
parent 04df5c2aa2
commit 2791939f34
3 changed files with 24 additions and 10 deletions

View File

@ -355,7 +355,7 @@ func (r *revid) packClips() {
_, err := r.ringBuffer.Write(frame) _, err := r.ringBuffer.Write(frame)
if err != nil { if err != nil {
r.Log(Error, err.Error()) r.Log(Error, err.Error())
if err == ring.ErrTimeout { if err == ring.ErrDropped {
r.Log(Warning, fmt.Sprintf("dropped %d byte frame", len(frame))) r.Log(Warning, fmt.Sprintf("dropped %d byte frame", len(frame)))
} }
} }

View File

@ -38,6 +38,8 @@ import (
var ( var (
ErrTimeout = errors.New("ring: buffer cycle timeout") ErrTimeout = errors.New("ring: buffer cycle timeout")
ErrDropped = errors.New("ring: dropped old write")
ErrStall = errors.New("ring: unable to dump old write")
ErrTooLong = errors.New("ring: write to long for buffer element") ErrTooLong = errors.New("ring: write to long for buffer element")
) )
@ -76,17 +78,26 @@ func (b *Buffer) Len() int {
// Write writes the bytes in b to the next current or next available element of the ring buffer // Write writes the bytes in b to the next current or next available element of the ring buffer
// it returns the number of bytes written and any error. // it returns the number of bytes written and any error.
// If no element can be gained within the timeout, ErrTimeout is returned and if the len(p) is // If no element can be gained within the timeout or stolen from the queue, ErrStall is returned
// greater than the buffer's element size, ErrTooLong is returned. // and if the len(p) is greater than the buffer's element size, ErrTooLong is returned. If a
// write was successful but a previous write was dropped, ErrDropped is returned.
// //
// Write is safe to use concurrently with Read, but may not be used concurrently with another // Write is safe to use concurrently with Read, but may not be used concurrently with another
// write operation. // write operation.
func (b *Buffer) Write(p []byte) (int, error) { func (b *Buffer) Write(p []byte) (int, error) {
var dropped bool
if b.head == nil { if b.head == nil {
timer := time.NewTimer(b.timeout) timer := time.NewTimer(b.timeout)
select { select {
case <-timer.C: case <-timer.C:
return 0, ErrTimeout select {
case b.head = <-b.full:
b.head.Reset()
dropped = true
default:
// This should never happen.
return 0, ErrStall
}
case b.head = <-b.empty: case b.head = <-b.empty:
timer.Stop() timer.Stop()
} }
@ -104,6 +115,9 @@ func (b *Buffer) Write(p []byte) (int, error) {
b.full <- b.head b.full <- b.head
b.head = nil b.head = nil
} }
if dropped && err == nil {
err = ErrDropped
}
return n, err return n, err
} }

View File

@ -110,19 +110,19 @@ func TestRoundTrip(t *testing.T) {
go func() { go func() {
defer wg.Done() defer wg.Done()
for _, c := range test.data { for _, c := range test.data {
var timeouts int var dropped int
for _, f := range c { for _, f := range c {
time.Sleep(test.writeDelay) // Simulate slow data capture. time.Sleep(test.writeDelay) // Simulate slow data capture.
_, err := b.Write([]byte(f)) _, err := b.Write([]byte(f))
switch err { switch err {
case nil: case nil:
timeouts = 0 dropped = 0
case ErrTimeout: case ErrDropped:
if timeouts > maxTimeouts { if dropped > maxTimeouts {
t.Errorf("too many write timeouts for %q", test.name) t.Errorf("too many write drops for %q", test.name)
return return
} }
timeouts++ dropped++
default: default:
t.Errorf("unexpected write error for %q: %v", test.name, err) t.Errorf("unexpected write error for %q: %v", test.name, err)
return return