revid: revid output check and destination setting cleaner. Also changed destination release to single chunk.Close() so that chunks aren't closed more than once

This commit is contained in:
saxon 2018-11-25 23:45:38 +10:30
parent 218dcfb8b2
commit 98eb2c64e8
2 changed files with 54 additions and 71 deletions

View File

@ -43,7 +43,6 @@ import (
"bitbucket.org/ausocean/av/stream/flv" "bitbucket.org/ausocean/av/stream/flv"
"bitbucket.org/ausocean/av/stream/lex" "bitbucket.org/ausocean/av/stream/lex"
"bitbucket.org/ausocean/av/stream/mts" "bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/av/stream/rtp"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/utils/smartlogger" "bitbucket.org/ausocean/utils/smartlogger"
@ -75,6 +74,11 @@ const (
pkg = "revid:" pkg = "revid:"
) )
const (
out1 = 0
out2 = 1
)
// Log Types // Log Types
const ( const (
Error = "Error" Error = "Error"
@ -209,68 +213,50 @@ func (r *Revid) reset(config Config) error {
} }
} }
n := 1
if r.config.Output2 != 0 { if r.config.Output2 != 0 {
r.destination = make([]loadSender, 2) n = 2
} else {
r.destination = make([]loadSender, 1)
} }
r.destination = make([]loadSender, n)
switch r.config.Output1 { for outNo, outType := range []uint8{r.config.Output1, r.config.Output2} {
case File: switch outType {
s, err := newFileSender(config.OutputFileName) case File:
if err != nil { s, err := newFileSender(config.OutputFileName)
return err if err != nil {
return err
}
r.destination[outNo] = s
case FfmpegRtmp:
s, err := newFfmpegSender(config.RtmpUrl, r.config.FrameRate)
if err != nil {
return err
}
r.destination[outNo] = s
case Rtmp:
s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimout, rtmpConnectionMaxTries, r.config.Logger.Log)
if err != nil {
return err
}
r.destination[outNo] = s
case Http:
r.destination[outNo] = newHttpSender(r.ns, r.config.Logger.Log)
case Udp:
s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log)
if err != nil {
return err
}
r.destination[outNo] = s
case Rtp:
// TODO: framerate in config should probably be an int, make conversions early
// when setting config fields in revid-cli
fps, _ := strconv.Atoi(r.config.FrameRate)
s, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, fps)
if err != nil {
return err
}
r.destination[outNo] = s
} }
r.destination[0] = s
case FfmpegRtmp:
s, err := newFfmpegSender(config.RtmpUrl, r.config.FrameRate)
if err != nil {
return err
}
r.destination[0] = s
case Rtmp:
s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimout, rtmpConnectionMaxTries, r.config.Logger.Log)
if err != nil {
return err
}
r.destination[0] = s
case Http:
r.destination[0] = newHttpSender(r.ns, r.config.Logger.Log)
case Rtp, Udp:
s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log)
if err != nil {
return err
}
r.destination[0] = s
}
switch r.config.Output2 {
case File:
s, err := newFileSender(config.OutputFileName)
if err != nil {
return err
}
r.destination[1] = s
case FfmpegRtmp:
s, err := newFfmpegSender(config.RtmpUrl, r.config.FrameRate)
if err != nil {
return err
}
r.destination[1] = s
case Rtmp:
s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimout, rtmpConnectionMaxTries, r.config.Logger.Log)
if err != nil {
return err
}
r.destination[1] = s
case Http:
r.destination[1] = newHttpSender(r.ns, r.config.Logger.Log)
case Rtp, Udp:
s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log)
if err != nil {
return err
}
r.destination[1] = s
} }
switch r.config.Input { switch r.config.Input {
@ -316,10 +302,6 @@ func (r *Revid) reset(config Config) error {
if err != nil { if err != nil {
return err return err
} }
case MpegtsRtp:
r.config.Logger.Log(smartlogger.Info, pkg+"using RTP packetisation")
frameRate, _ := strconv.Atoi(r.config.FrameRate)
r.encoder = mts.NewEncoder(rtp.NewEncoder(&r.packer, frameRate), float64(frameRate))
} }
return nil return nil
@ -397,9 +379,9 @@ loop:
for i, dest := range r.destination { for i, dest := range r.destination {
err = dest.send() err = dest.send()
if err == nil { if err == nil {
r.config.Logger.Log(smartlogger.Debug, pkg+"sent clip to output"+strconv.Itoa(i)) r.config.Logger.Log(smartlogger.Debug, pkg+"sent clip to output "+strconv.Itoa(i))
} else { } else {
r.config.Logger.Log(smartlogger.Error, pkg+"send to output"+strconv.Itoa(i)+ r.config.Logger.Log(smartlogger.Error, pkg+"send to output "+strconv.Itoa(i)+
"failed, trying again", "error", err.Error()) "failed, trying again", "error", err.Error())
err = dest.send() err = dest.send()
if err != nil && chunk.Len() > 11 { if err != nil && chunk.Len() > 11 {
@ -413,8 +395,10 @@ loop:
r.config.Logger.Log(smartlogger.Error, pkg+"failed to restart rtmp session, trying again", "error", err.Error()) r.config.Logger.Log(smartlogger.Error, pkg+"failed to restart rtmp session, trying again", "error", err.Error())
err = rs.restart() err = rs.restart()
} }
r.config.Logger.Log(smartlogger.Info, pkg+"restarted rtmp session") r.config.Logger.Log(smartlogger.Info, pkg+"restarted rtmp session")
} }
err = dest.send() err = dest.send()
if err != nil { if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"send failed again, with error", "error", err.Error()) r.config.Logger.Log(smartlogger.Error, pkg+"send failed again, with error", "error", err.Error())
@ -424,10 +408,8 @@ loop:
} }
} }
for _, dest := range r.destination { // Release the chunk back to the ring buffer for use
dest.release() chunk.Close()
}
r.config.Logger.Log(smartlogger.Debug, pkg+"done reading that clip from ring buffer") r.config.Logger.Log(smartlogger.Debug, pkg+"done reading that clip from ring buffer")
// Log some information regarding bitrate and ring buffer size if it's time // Log some information regarding bitrate and ring buffer size if it's time

View File

@ -292,7 +292,7 @@ func (s *rtmpSender) close() error {
return s.sess.Close() return s.sess.Close()
} }
// rtpSender implements loadSender for a native udp destination. // udpSender implements loadSender for a native udp destination.
type udpSender struct { type udpSender struct {
conn net.Conn conn net.Conn
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
@ -327,7 +327,8 @@ func (s *udpSender) release() {
func (s *udpSender) close() error { return nil } func (s *udpSender) close() error { return nil }
// rtpSender implements loadSender for a native udp destination. // TODO: Write restart func for rtpSender
// rtpSender implements loadSender for a native udp destination with rtp packetization.
type rtpSender struct { type rtpSender struct {
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
chunk *ring.Chunk chunk *ring.Chunk