mirror of https://bitbucket.org/ausocean/av.git
trying new way to deal with send errors
This commit is contained in:
parent
d03e70e8dc
commit
76d0de2ffb
|
@ -383,6 +383,7 @@ func (r *revid) outputClips() {
|
|||
prevTime := now
|
||||
bytes := 0
|
||||
delay := 0
|
||||
errorCount := 0
|
||||
for r.isRunning {
|
||||
// Here we slow things down as much as we can to decrease cpu usage
|
||||
switch {
|
||||
|
@ -401,23 +402,25 @@ func (r *revid) outputClips() {
|
|||
err2 := r.sendClip(clip)
|
||||
r.Log(Debug, "Finished send!")
|
||||
|
||||
for ; err2 != nil; errorCount++ {
|
||||
r.Log(Warning, "Send failed trying again!")
|
||||
// If the clip size is not bigger than the threshold then we classify
|
||||
// it as junk and we don't try to send it off again.
|
||||
if err2 != nil && len(clip) > 11 {
|
||||
// Try and send again
|
||||
err2 = r.sendClip(clip)
|
||||
if err2 == nil {
|
||||
break
|
||||
}
|
||||
|
||||
// So that we don't fill up the log, once we reach the maxSendFailedErrorCount
|
||||
// we will add some delay to slow things down until we have a connection again
|
||||
time.Sleep(time.Duration(sendFailedDelay) * time.Second)
|
||||
// if there's still an error we try and reconnect
|
||||
for err2 != nil {
|
||||
time.Sleep(time.Duration(5)*time.Millisecond)
|
||||
r.Log(Error, err2.Error())
|
||||
if r.config.Output == NativeRtmp {
|
||||
r.rtmpInst.EndSession()
|
||||
}
|
||||
if r.ringBuffer.Full() {
|
||||
r.flushData()
|
||||
err = r.rtmpInst.StartSession()
|
||||
}
|
||||
if r.config.Output == NativeRtmp {
|
||||
r.rtmpInst.StartSession()
|
||||
}
|
||||
// and if the ring buffer is full then we flush the incoming data
|
||||
err2 = r.sendClip(clip)
|
||||
}
|
||||
}
|
||||
// let the ringbuffer know that we're done with the memory we grabbed when
|
||||
|
|
|
@ -46,6 +46,7 @@ type RingBuffer interface {
|
|||
IsReadable() bool
|
||||
IsWritable() bool
|
||||
GetNoOfElements() int
|
||||
Full() bool
|
||||
}
|
||||
|
||||
func (rb *ringBuffer) GetNoOfElements() int {
|
||||
|
@ -63,6 +64,7 @@ type ringBuffer struct {
|
|||
currentlyWriting bool
|
||||
currentlyReading bool
|
||||
mutex sync.Mutex
|
||||
full bool
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -86,6 +88,7 @@ func NewRingBuffer(bufferSize int, elementSize int) (rb *ringBuffer) {
|
|||
rb.currentlyWriting = false
|
||||
rb.currentlyReading = false
|
||||
rb.mutex = sync.Mutex{}
|
||||
rb.full = false
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -98,8 +101,10 @@ func (rb *ringBuffer) Get() ([]byte, error) {
|
|||
rb.mutex.Lock()
|
||||
defer rb.mutex.Unlock()
|
||||
if !rb.IsWritable() {
|
||||
rb.full = true
|
||||
return nil, errors.New("Buffer full!")
|
||||
}
|
||||
rb.full = false
|
||||
var nextlast int
|
||||
if !rb.currentlyWriting {
|
||||
rb.currentlyWriting = true
|
||||
|
@ -177,6 +182,10 @@ func (rb *ringBuffer) DoneReading() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (rb *ringBuffer) Full() bool {
|
||||
return rb.full
|
||||
}
|
||||
|
||||
/*
|
||||
IsReadable returns true if it is possible to read from the buffer, i.e. if
|
||||
it is not empty.
|
||||
|
|
Loading…
Reference in New Issue