diff --git a/revid/Revid.go b/revid/Revid.go index 3616e994..15b692f0 100644 --- a/revid/Revid.go +++ b/revid/Revid.go @@ -383,6 +383,7 @@ func (r *revid) outputClips() { prevTime := now bytes := 0 delay := 0 + errorCount := 0 for r.isRunning { // Here we slow things down as much as we can to decrease cpu usage switch { @@ -401,23 +402,25 @@ func (r *revid) outputClips() { err2 := r.sendClip(clip) r.Log(Debug, "Finished send!") - for ; err2 != nil; errorCount++ { - r.Log(Warning, "Send failed trying again!") - // If the clip size is not bigger than the threshold then we classify - // it as junk and we don't try to send it off again. + if err2 != nil && len(clip) > 11 { + // Try and send again err2 = r.sendClip(clip) - if err2 == nil { - break - } - - // So that we don't fill up the log, once we reach the maxSendFailedErrorCount - // we will add some delay to slow things down until we have a connection again - time.Sleep(time.Duration(sendFailedDelay) * time.Second) - r.Log(Error, err2.Error()) - if r.config.Output == NativeRtmp { - r.rtmpInst.EndSession() - r.flushData() - err = r.rtmpInst.StartSession() + + // if there's still an error we try and reconnect + for err2 != nil { + time.Sleep(time.Duration(5)*time.Millisecond) + r.Log(Error, err2.Error()) + if r.config.Output == NativeRtmp { + r.rtmpInst.EndSession() + } + if r.ringBuffer.Full() { + r.flushData() + } + if r.config.Output == NativeRtmp { + r.rtmpInst.StartSession() + } + // and if the ring buffer is full then we flush the incoming data + err2 = r.sendClip(clip) } } // let the ringbuffer know that we're done with the memory we grabbed when diff --git a/ringbuffer/RingBuffer.go b/ringbuffer/RingBuffer.go index 5430dbd2..7a077aff 100644 --- a/ringbuffer/RingBuffer.go +++ b/ringbuffer/RingBuffer.go @@ -46,6 +46,7 @@ type RingBuffer interface { IsReadable() bool IsWritable() bool GetNoOfElements() int + Full() bool } func (rb *ringBuffer) GetNoOfElements() int { @@ -63,6 +64,7 @@ type ringBuffer struct { currentlyWriting bool currentlyReading bool mutex sync.Mutex + full bool } /* @@ -86,6 +88,7 @@ func NewRingBuffer(bufferSize int, elementSize int) (rb *ringBuffer) { rb.currentlyWriting = false rb.currentlyReading = false rb.mutex = sync.Mutex{} + rb.full = false return } @@ -98,8 +101,10 @@ func (rb *ringBuffer) Get() ([]byte, error) { rb.mutex.Lock() defer rb.mutex.Unlock() if !rb.IsWritable() { + rb.full = true return nil, errors.New("Buffer full!") } + rb.full = false var nextlast int if !rb.currentlyWriting { rb.currentlyWriting = true @@ -177,6 +182,10 @@ func (rb *ringBuffer) DoneReading() error { return nil } +func (rb *ringBuffer) Full() bool { + return rb.full +} + /* IsReadable returns true if it is possible to read from the buffer, i.e. if it is not empty.