From 5920d1c8d7c43cf2f3912e723585406efb44fc5d Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 25 Nov 2018 23:10:38 +1030 Subject: [PATCH] revid: made destination for revid a []loadSender and change code such that iterate through this when performing actions relating to destinations like write, send and close. Also created new sender call rtp sender that has a inherent rtp encoder --- revid/config.go | 25 ++++++++-- revid/revid.go | 125 ++++++++++++++++++++++++++++++----------------- revid/senders.go | 37 ++++++++++++++ 3 files changed, 137 insertions(+), 50 deletions(-) diff --git a/revid/config.go b/revid/config.go index 1ff69e1d..a44a2bbe 100644 --- a/revid/config.go +++ b/revid/config.go @@ -39,7 +39,8 @@ import ( type Config struct { Input uint8 InputCodec uint8 - Output uint8 + Output1 uint8 + Output2 uint8 RtmpMethod uint8 Packetization uint8 QuantizationMode uint8 @@ -181,14 +182,14 @@ func (c *Config) Validate(r *Revid) error { return errors.New("bad input codec defined in config") } - switch c.Output { + switch c.Output1 { case File: case Rtp: case Udp: case Rtmp, FfmpegRtmp: if c.RtmpUrl == "" { c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP") - c.Output = Http + c.Output1 = Http break } c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for rtmp out", @@ -197,7 +198,7 @@ func (c *Config) Validate(r *Revid) error { case NothingDefined: c.Logger.Log(smartlogger.Warning, pkg+"no output defined, defaulting", "output", defaultOutput) - c.Output = defaultOutput + c.Output1 = defaultOutput fallthrough case Http: c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for http out", @@ -207,6 +208,22 @@ func (c *Config) Validate(r *Revid) error { return errors.New("bad output type defined in config") } + switch c.Output2 { + case File: + case Rtp: + case Udp: + case Rtmp, FfmpegRtmp: + if c.RtmpUrl == "" { + c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP") + c.Output2 = Http + break + } + case NothingDefined: + case Http: + default: + return errors.New("bad output2 type defined in config") + } + switch c.HorizontalFlip { case Yes: case No: diff --git a/revid/revid.go b/revid/revid.go index f555e655..ce29abbc 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -200,7 +200,7 @@ func (r *Revid) reset(config Config) error { } r.config = config - for i, destination := range r.destination { + for _, destination := range r.destination { if destination != nil { err = destination.close() if err != nil { @@ -209,33 +209,68 @@ func (r *Revid) reset(config Config) error { } } - switch r.config.Output { + if r.config.Output2 != 0 { + r.destination = make([]loadSender, 2) + } else { + r.destination = make([]loadSender, 1) + } + + switch r.config.Output1 { case File: s, err := newFileSender(config.OutputFileName) if err != nil { return err } - r.destination = s + r.destination[0] = s case FfmpegRtmp: s, err := newFfmpegSender(config.RtmpUrl, r.config.FrameRate) if err != nil { return err } - r.destination = s + r.destination[0] = s case Rtmp: s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimout, rtmpConnectionMaxTries, r.config.Logger.Log) if err != nil { return err } - r.destination = s + r.destination[0] = s case Http: - r.destination = newHttpSender(r.ns, r.config.Logger.Log) + r.destination[0] = newHttpSender(r.ns, r.config.Logger.Log) case Rtp, Udp: s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) if err != nil { return err } - r.destination = s + r.destination[0] = s + } + + switch r.config.Output2 { + case File: + s, err := newFileSender(config.OutputFileName) + if err != nil { + return err + } + r.destination[1] = s + case FfmpegRtmp: + s, err := newFfmpegSender(config.RtmpUrl, r.config.FrameRate) + if err != nil { + return err + } + r.destination[1] = s + case Rtmp: + s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimout, rtmpConnectionMaxTries, r.config.Logger.Log) + if err != nil { + return err + } + r.destination[1] = s + case Http: + r.destination[1] = newHttpSender(r.ns, r.config.Logger.Log) + case Rtp, Udp: + s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) + if err != nil { + return err + } + r.destination[1] = s } switch r.config.Input { @@ -352,50 +387,46 @@ loop: count += chunk.Len() r.config.Logger.Log(smartlogger.Debug, pkg+"about to send") - err = r.destination.load(chunk) - if err != nil { - r.config.Logger.Log(smartlogger.Error, pkg+"failed to load clip") - } - err = r.destination.send() - if err == nil { - r.config.Logger.Log(smartlogger.Debug, pkg+"sent clip") - } - - if err != nil && chunk.Len() > 11 { - r.config.Logger.Log(smartlogger.Error, pkg+"first send failed", "error", err.Error()) - // Try and send again - err = r.destination.send() + for i, destination := range r.destination { + err = destination.load(chunk) if err != nil { - r.config.Logger.Log(smartlogger.Error, pkg+"destination send error", "error", err.Error()) + r.config.Logger.Log(smartlogger.Error, pkg+"failed to load clip to output"+strconv.Itoa(i)) } + } - // if there's still an error we try and reconnect, unless we're stopping - for err != nil { - r.config.Logger.Log(smartlogger.Debug, pkg+"send failed again, trying to reconnect...") - time.Sleep(sendFailedDelay) - r.config.Logger.Log(smartlogger.Error, pkg+"send failed with error", "error", err.Error()) - - if rs, ok := r.destination.(restarter); ok { - r.config.Logger.Log(smartlogger.Debug, pkg+"restarting session", "session", rs) - err = rs.restart() - if err != nil { - // TODO(kortschak): Make this "Fatal" when that exists. - r.config.Logger.Log(smartlogger.Error, pkg+"failed to restart rtmp session", "error", err.Error()) - r.isRunning = false - return + for i, dest := range r.destination { + err = dest.send() + if err == nil { + r.config.Logger.Log(smartlogger.Debug, pkg+"sent clip to output"+strconv.Itoa(i)) + } else { + r.config.Logger.Log(smartlogger.Error, pkg+"send to output"+strconv.Itoa(i)+ + "failed, trying again", "error", err.Error()) + err = dest.send() + if err != nil && chunk.Len() > 11 { + r.config.Logger.Log(smartlogger.Error, pkg+"second send attempted failed, restarting connection", "error", err.Error()) + for err != nil { + time.Sleep(sendFailedDelay) + if rs, ok := dest.(restarter); ok { + r.config.Logger.Log(smartlogger.Debug, pkg+"restarting session", "session", rs) + err = rs.restart() + for err != nil { + r.config.Logger.Log(smartlogger.Error, pkg+"failed to restart rtmp session, trying again", "error", err.Error()) + err = rs.restart() + } + r.config.Logger.Log(smartlogger.Info, pkg+"restarted rtmp session") + } + err = dest.send() + if err != nil { + r.config.Logger.Log(smartlogger.Error, pkg+"send failed again, with error", "error", err.Error()) + } } - r.config.Logger.Log(smartlogger.Info, pkg+"restarted rtmp session") - } - - r.config.Logger.Log(smartlogger.Debug, pkg+"trying to send again with new connection") - err = r.destination.send() - if err != nil { - r.config.Logger.Log(smartlogger.Error, pkg+"send failed with error", "error", err.Error()) } } } - r.destination.release() + for _, dest := range r.destination { + dest.release() + } r.config.Logger.Log(smartlogger.Debug, pkg+"done reading that clip from ring buffer") @@ -412,9 +443,11 @@ loop: } } r.config.Logger.Log(smartlogger.Info, pkg+"not outputting clips anymore") - err := r.destination.close() - if err != nil { - r.config.Logger.Log(smartlogger.Error, pkg+"failed to close destination", "error", err.Error()) + for i, dest := range r.destination { + err := dest.close() + if err != nil { + r.config.Logger.Log(smartlogger.Error, pkg+"failed to close output"+strconv.Itoa(i)+" destination", "error", err.Error()) + } } } diff --git a/revid/senders.go b/revid/senders.go index cc0b869f..eb149445 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -35,6 +35,7 @@ import ( "os/exec" "bitbucket.org/ausocean/av/rtmp" + "bitbucket.org/ausocean/av/stream/rtp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/utils/smartlogger" @@ -325,3 +326,39 @@ func (s *udpSender) release() { } func (s *udpSender) close() error { return nil } + +// rtpSender implements loadSender for a native udp destination. +type rtpSender struct { + log func(lvl int8, msg string, args ...interface{}) + chunk *ring.Chunk + encoder *rtp.Encoder +} + +func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps int) (*rtpSender, error) { + conn, err := net.Dial("udp", addr) + if err != nil { + return nil, err + } + s := &rtpSender{ + log: log, + encoder: rtp.NewEncoder(conn, fps), + } + return s, nil +} + +func (s *rtpSender) load(c *ring.Chunk) error { + s.chunk = c + return nil +} + +func (s *rtpSender) send() error { + _, err := s.chunk.WriteTo(s.encoder) + return err +} + +func (s *rtpSender) release() { + s.chunk.Close() + s.chunk = nil +} + +func (s *rtpSender) close() error { return nil }