revid: made destination for revid a []loadSender and change code such that iterate through this when performing actions relating to destinations like write, send and close. Also created new sender call rtp sender that has a inherent rtp encoder

This commit is contained in:
saxon 2018-11-25 23:10:38 +10:30
parent a4ded5337d
commit 5920d1c8d7
3 changed files with 137 additions and 50 deletions

View File

@ -39,7 +39,8 @@ import (
type Config struct { type Config struct {
Input uint8 Input uint8
InputCodec uint8 InputCodec uint8
Output uint8 Output1 uint8
Output2 uint8
RtmpMethod uint8 RtmpMethod uint8
Packetization uint8 Packetization uint8
QuantizationMode uint8 QuantizationMode uint8
@ -181,14 +182,14 @@ func (c *Config) Validate(r *Revid) error {
return errors.New("bad input codec defined in config") return errors.New("bad input codec defined in config")
} }
switch c.Output { switch c.Output1 {
case File: case File:
case Rtp: case Rtp:
case Udp: case Udp:
case Rtmp, FfmpegRtmp: case Rtmp, FfmpegRtmp:
if c.RtmpUrl == "" { if c.RtmpUrl == "" {
c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP") c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP")
c.Output = Http c.Output1 = Http
break break
} }
c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for rtmp out", c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for rtmp out",
@ -197,7 +198,7 @@ func (c *Config) Validate(r *Revid) error {
case NothingDefined: case NothingDefined:
c.Logger.Log(smartlogger.Warning, pkg+"no output defined, defaulting", "output", c.Logger.Log(smartlogger.Warning, pkg+"no output defined, defaulting", "output",
defaultOutput) defaultOutput)
c.Output = defaultOutput c.Output1 = defaultOutput
fallthrough fallthrough
case Http: case Http:
c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for http out", c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for http out",
@ -207,6 +208,22 @@ func (c *Config) Validate(r *Revid) error {
return errors.New("bad output type defined in config") return errors.New("bad output type defined in config")
} }
switch c.Output2 {
case File:
case Rtp:
case Udp:
case Rtmp, FfmpegRtmp:
if c.RtmpUrl == "" {
c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP")
c.Output2 = Http
break
}
case NothingDefined:
case Http:
default:
return errors.New("bad output2 type defined in config")
}
switch c.HorizontalFlip { switch c.HorizontalFlip {
case Yes: case Yes:
case No: case No:

View File

@ -200,7 +200,7 @@ func (r *Revid) reset(config Config) error {
} }
r.config = config r.config = config
for i, destination := range r.destination { for _, destination := range r.destination {
if destination != nil { if destination != nil {
err = destination.close() err = destination.close()
if err != nil { if err != nil {
@ -209,33 +209,68 @@ func (r *Revid) reset(config Config) error {
} }
} }
switch r.config.Output { if r.config.Output2 != 0 {
r.destination = make([]loadSender, 2)
} else {
r.destination = make([]loadSender, 1)
}
switch r.config.Output1 {
case File: case File:
s, err := newFileSender(config.OutputFileName) s, err := newFileSender(config.OutputFileName)
if err != nil { if err != nil {
return err return err
} }
r.destination = s r.destination[0] = s
case FfmpegRtmp: case FfmpegRtmp:
s, err := newFfmpegSender(config.RtmpUrl, r.config.FrameRate) s, err := newFfmpegSender(config.RtmpUrl, r.config.FrameRate)
if err != nil { if err != nil {
return err return err
} }
r.destination = s r.destination[0] = s
case Rtmp: case Rtmp:
s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimout, rtmpConnectionMaxTries, r.config.Logger.Log) s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimout, rtmpConnectionMaxTries, r.config.Logger.Log)
if err != nil { if err != nil {
return err return err
} }
r.destination = s r.destination[0] = s
case Http: case Http:
r.destination = newHttpSender(r.ns, r.config.Logger.Log) r.destination[0] = newHttpSender(r.ns, r.config.Logger.Log)
case Rtp, Udp: case Rtp, Udp:
s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log)
if err != nil { if err != nil {
return err return err
} }
r.destination = s r.destination[0] = s
}
switch r.config.Output2 {
case File:
s, err := newFileSender(config.OutputFileName)
if err != nil {
return err
}
r.destination[1] = s
case FfmpegRtmp:
s, err := newFfmpegSender(config.RtmpUrl, r.config.FrameRate)
if err != nil {
return err
}
r.destination[1] = s
case Rtmp:
s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimout, rtmpConnectionMaxTries, r.config.Logger.Log)
if err != nil {
return err
}
r.destination[1] = s
case Http:
r.destination[1] = newHttpSender(r.ns, r.config.Logger.Log)
case Rtp, Udp:
s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log)
if err != nil {
return err
}
r.destination[1] = s
} }
switch r.config.Input { switch r.config.Input {
@ -352,50 +387,46 @@ loop:
count += chunk.Len() count += chunk.Len()
r.config.Logger.Log(smartlogger.Debug, pkg+"about to send") r.config.Logger.Log(smartlogger.Debug, pkg+"about to send")
err = r.destination.load(chunk) for i, destination := range r.destination {
if err != nil { err = destination.load(chunk)
r.config.Logger.Log(smartlogger.Error, pkg+"failed to load clip")
}
err = r.destination.send()
if err == nil {
r.config.Logger.Log(smartlogger.Debug, pkg+"sent clip")
}
if err != nil && chunk.Len() > 11 {
r.config.Logger.Log(smartlogger.Error, pkg+"first send failed", "error", err.Error())
// Try and send again
err = r.destination.send()
if err != nil { if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"destination send error", "error", err.Error()) r.config.Logger.Log(smartlogger.Error, pkg+"failed to load clip to output"+strconv.Itoa(i))
} }
}
// if there's still an error we try and reconnect, unless we're stopping for i, dest := range r.destination {
for err != nil { err = dest.send()
r.config.Logger.Log(smartlogger.Debug, pkg+"send failed again, trying to reconnect...") if err == nil {
time.Sleep(sendFailedDelay) r.config.Logger.Log(smartlogger.Debug, pkg+"sent clip to output"+strconv.Itoa(i))
r.config.Logger.Log(smartlogger.Error, pkg+"send failed with error", "error", err.Error()) } else {
r.config.Logger.Log(smartlogger.Error, pkg+"send to output"+strconv.Itoa(i)+
if rs, ok := r.destination.(restarter); ok { "failed, trying again", "error", err.Error())
r.config.Logger.Log(smartlogger.Debug, pkg+"restarting session", "session", rs) err = dest.send()
err = rs.restart() if err != nil && chunk.Len() > 11 {
if err != nil { r.config.Logger.Log(smartlogger.Error, pkg+"second send attempted failed, restarting connection", "error", err.Error())
// TODO(kortschak): Make this "Fatal" when that exists. for err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"failed to restart rtmp session", "error", err.Error()) time.Sleep(sendFailedDelay)
r.isRunning = false if rs, ok := dest.(restarter); ok {
return r.config.Logger.Log(smartlogger.Debug, pkg+"restarting session", "session", rs)
err = rs.restart()
for err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"failed to restart rtmp session, trying again", "error", err.Error())
err = rs.restart()
}
r.config.Logger.Log(smartlogger.Info, pkg+"restarted rtmp session")
}
err = dest.send()
if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"send failed again, with error", "error", err.Error())
}
} }
r.config.Logger.Log(smartlogger.Info, pkg+"restarted rtmp session")
}
r.config.Logger.Log(smartlogger.Debug, pkg+"trying to send again with new connection")
err = r.destination.send()
if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"send failed with error", "error", err.Error())
} }
} }
} }
r.destination.release() for _, dest := range r.destination {
dest.release()
}
r.config.Logger.Log(smartlogger.Debug, pkg+"done reading that clip from ring buffer") r.config.Logger.Log(smartlogger.Debug, pkg+"done reading that clip from ring buffer")
@ -412,9 +443,11 @@ loop:
} }
} }
r.config.Logger.Log(smartlogger.Info, pkg+"not outputting clips anymore") r.config.Logger.Log(smartlogger.Info, pkg+"not outputting clips anymore")
err := r.destination.close() for i, dest := range r.destination {
if err != nil { err := dest.close()
r.config.Logger.Log(smartlogger.Error, pkg+"failed to close destination", "error", err.Error()) if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"failed to close output"+strconv.Itoa(i)+" destination", "error", err.Error())
}
} }
} }

View File

@ -35,6 +35,7 @@ import (
"os/exec" "os/exec"
"bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/av/rtmp"
"bitbucket.org/ausocean/av/stream/rtp"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/utils/smartlogger" "bitbucket.org/ausocean/utils/smartlogger"
@ -325,3 +326,39 @@ func (s *udpSender) release() {
} }
func (s *udpSender) close() error { return nil } func (s *udpSender) close() error { return nil }
// rtpSender implements loadSender for a native udp destination.
type rtpSender struct {
log func(lvl int8, msg string, args ...interface{})
chunk *ring.Chunk
encoder *rtp.Encoder
}
func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps int) (*rtpSender, error) {
conn, err := net.Dial("udp", addr)
if err != nil {
return nil, err
}
s := &rtpSender{
log: log,
encoder: rtp.NewEncoder(conn, fps),
}
return s, nil
}
func (s *rtpSender) load(c *ring.Chunk) error {
s.chunk = c
return nil
}
func (s *rtpSender) send() error {
_, err := s.chunk.WriteTo(s.encoder)
return err
}
func (s *rtpSender) release() {
s.chunk.Close()
s.chunk = nil
}
func (s *rtpSender) close() error { return nil }