diff --git a/revid/Revid.go b/revid/Revid.go index 9a44c916..a43b46de 100644 --- a/revid/Revid.go +++ b/revid/Revid.go @@ -355,7 +355,7 @@ func (r *revid) packClips() { _, err := r.ringBuffer.Write(frame) if err != nil { r.Log(Error, err.Error()) - if err == ring.ErrTimeout { + if err == ring.ErrDropped { r.Log(Warning, fmt.Sprintf("dropped %d byte frame", len(frame))) } } diff --git a/ring/ring.go b/ring/ring.go index b2d8b68c..265360ad 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -38,6 +38,8 @@ import ( var ( ErrTimeout = errors.New("ring: buffer cycle timeout") + ErrDropped = errors.New("ring: dropped old write") + ErrStall = errors.New("ring: unable to dump old write") ErrTooLong = errors.New("ring: write to long for buffer element") ) @@ -76,17 +78,26 @@ func (b *Buffer) Len() int { // Write writes the bytes in b to the next current or next available element of the ring buffer // it returns the number of bytes written and any error. -// If no element can be gained within the timeout, ErrTimeout is returned and if the len(p) is -// greater than the buffer's element size, ErrTooLong is returned. +// If no element can be gained within the timeout or stolen from the queue, ErrStall is returned +// and if the len(p) is greater than the buffer's element size, ErrTooLong is returned. If a +// write was successful but a previous write was dropped, ErrDropped is returned. // // Write is safe to use concurrently with Read, but may not be used concurrently with another // write operation. func (b *Buffer) Write(p []byte) (int, error) { + var dropped bool if b.head == nil { timer := time.NewTimer(b.timeout) select { case <-timer.C: - return 0, ErrTimeout + select { + case b.head = <-b.full: + b.head.Reset() + dropped = true + default: + // This should never happen. + return 0, ErrStall + } case b.head = <-b.empty: timer.Stop() } @@ -104,6 +115,9 @@ func (b *Buffer) Write(p []byte) (int, error) { b.full <- b.head b.head = nil } + if dropped && err == nil { + err = ErrDropped + } return n, err } diff --git a/ring/ring_test.go b/ring/ring_test.go index 86f110da..b607351b 100644 --- a/ring/ring_test.go +++ b/ring/ring_test.go @@ -110,19 +110,19 @@ func TestRoundTrip(t *testing.T) { go func() { defer wg.Done() for _, c := range test.data { - var timeouts int + var dropped int for _, f := range c { time.Sleep(test.writeDelay) // Simulate slow data capture. _, err := b.Write([]byte(f)) switch err { case nil: - timeouts = 0 - case ErrTimeout: - if timeouts > maxTimeouts { - t.Errorf("too many write timeouts for %q", test.name) + dropped = 0 + case ErrDropped: + if dropped > maxTimeouts { + t.Errorf("too many write drops for %q", test.name) return } - timeouts++ + dropped++ default: t.Errorf("unexpected write error for %q: %v", test.name, err) return