diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 58c094f6..016ed656 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -52,7 +52,8 @@ const ( const ( inputPtr = iota inputCodecPtr - outputPtr + output1Ptr + output2Ptr rtmpMethodPtr packetizationPtr quantizationModePtr @@ -107,9 +108,11 @@ var ( flagNames = [noOfConfigFlags]struct{ name, description string }{ {"Input", "The input type: Raspivid, File"}, {"InputCodec", "The codec of the input: H264, Mjpeg"}, - {"Output", "The output type: Http, Rtmp, File, Udp, Rtp"}, + // TODO: We should make this a comma-separated list that can have an arbitrary + //number of targets. + {"Output1", "The first output type: Http, Rtmp, File, Udp, Rtp"}, + {"Output2", "The second output type: Http, Rtmp, File, Udp, Rtp"}, {"RtmpMethod", "The method used to send over rtmp: Ffmpeg, Librtmp"}, - // NOTE: we add rtp here when we have this functionality {"Packetization", "The method of data packetisation: Flv, Mpegts, None"}, {"QuantizationMode", "Whether quantization if on or off (variable bitrate): On, Off"}, {"Verbosity", "Verbosity: Info, Warning, Error, Fatal"}, @@ -202,23 +205,40 @@ func handleFlags() { logger.Log(smartlogger.Error, pkg+"bad input codec argument") } - switch *configFlags[outputPtr] { + switch *configFlags[output1Ptr] { case "File": - config.Output = revid.File + config.Output1 = revid.File case "Http": - config.Output = revid.Http + config.Output1 = revid.Http case "Rtmp": - config.Output = revid.Rtmp + config.Output1 = revid.Rtmp case "FfmpegRtmp": - config.Output = revid.FfmpegRtmp + config.Output1 = revid.FfmpegRtmp case "Udp": - config.Output = revid.Udp + config.Output1 = revid.Udp case "Rtp": - config.Output = revid.Rtp - config.Packetization = revid.MpegtsRtp + config.Output1 = revid.Rtp case "": default: - logger.Log(smartlogger.Error, pkg+"bad output argument") + logger.Log(smartlogger.Error, pkg+"bad output 1 argument") + } + + switch *configFlags[output2Ptr] { + case "File": + config.Output2 = revid.File + case "Http": + config.Output2 = revid.Http + case "Rtmp": + config.Output2 = revid.Rtmp + case "FfmpegRtmp": + config.Output2 = revid.FfmpegRtmp + case "Udp": + config.Output2 = revid.Udp + case "Rtp": + config.Output2 = revid.Rtp + case "": + default: + logger.Log(smartlogger.Error, pkg+"bad output 2 argument") } switch *configFlags[rtmpMethodPtr] { @@ -238,8 +258,6 @@ func handleFlags() { config.Packetization = revid.Mpegts case "Flv": config.Packetization = revid.Flv - case "MpegtsRtp": - config.Packetization = revid.MpegtsRtp case "": default: logger.Log(smartlogger.Error, pkg+"bad packetization argument") @@ -418,15 +436,15 @@ func updateRevid(ns *netsender.Sender, vars map[string]string, stop bool) error case "Output": switch value { case "File": - config.Output = revid.File + config.Output1 = revid.File case "Http": - config.Output = revid.Http + config.Output1 = revid.Http case "Rtmp": - config.Output = revid.Rtmp + config.Output1 = revid.Rtmp case "FfmpegRtmp": - config.Output = revid.FfmpegRtmp + config.Output1 = revid.FfmpegRtmp default: - logger.Log(smartlogger.Warning, pkg+"invalid Output param", "value", value) + logger.Log(smartlogger.Warning, pkg+"invalid Output1 param", "value", value) continue } case "FramesPerClip": diff --git a/revid/cmd/h264-file-to-flv-rtmp/main.go b/revid/cmd/h264-file-to-flv-rtmp/main.go index 70250df1..907e414b 100644 --- a/revid/cmd/h264-file-to-flv-rtmp/main.go +++ b/revid/cmd/h264-file-to-flv-rtmp/main.go @@ -57,7 +57,7 @@ func main() { Input: revid.File, InputFileName: inputFile, InputCodec: revid.H264, - Output: revid.Rtmp, + Output1: revid.Rtmp, RtmpMethod: revid.LibRtmp, RtmpUrl: *rtmpUrlPtr, Packetization: revid.Flv, diff --git a/revid/cmd/h264-file-to-mpgets-file/main.go b/revid/cmd/h264-file-to-mpgets-file/main.go index a1238da6..5530f6c4 100644 --- a/revid/cmd/h264-file-to-mpgets-file/main.go +++ b/revid/cmd/h264-file-to-mpgets-file/main.go @@ -49,7 +49,7 @@ func main() { Input: revid.File, InputFileName: inputFile, InputCodec: revid.H264, - Output: revid.File, + Output1: revid.File, OutputFileName: outputFile, Packetization: revid.Mpegts, Logger: smartlogger.New(smartlogger.Info, logPath), diff --git a/revid/config.go b/revid/config.go index 1ff69e1d..a44a2bbe 100644 --- a/revid/config.go +++ b/revid/config.go @@ -39,7 +39,8 @@ import ( type Config struct { Input uint8 InputCodec uint8 - Output uint8 + Output1 uint8 + Output2 uint8 RtmpMethod uint8 Packetization uint8 QuantizationMode uint8 @@ -181,14 +182,14 @@ func (c *Config) Validate(r *Revid) error { return errors.New("bad input codec defined in config") } - switch c.Output { + switch c.Output1 { case File: case Rtp: case Udp: case Rtmp, FfmpegRtmp: if c.RtmpUrl == "" { c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP") - c.Output = Http + c.Output1 = Http break } c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for rtmp out", @@ -197,7 +198,7 @@ func (c *Config) Validate(r *Revid) error { case NothingDefined: c.Logger.Log(smartlogger.Warning, pkg+"no output defined, defaulting", "output", defaultOutput) - c.Output = defaultOutput + c.Output1 = defaultOutput fallthrough case Http: c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for http out", @@ -207,6 +208,22 @@ func (c *Config) Validate(r *Revid) error { return errors.New("bad output type defined in config") } + switch c.Output2 { + case File: + case Rtp: + case Udp: + case Rtmp, FfmpegRtmp: + if c.RtmpUrl == "" { + c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP") + c.Output2 = Http + break + } + case NothingDefined: + case Http: + default: + return errors.New("bad output2 type defined in config") + } + switch c.HorizontalFlip { case Yes: case No: diff --git a/revid/revid.go b/revid/revid.go index fde3efb7..058ccd67 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -43,7 +43,6 @@ import ( "bitbucket.org/ausocean/av/stream/flv" "bitbucket.org/ausocean/av/stream/lex" "bitbucket.org/ausocean/av/stream/mts" - "bitbucket.org/ausocean/av/stream/rtp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/utils/smartlogger" @@ -118,7 +117,7 @@ type Revid struct { // to the target destination. buffer *ring.Buffer // destination is the target endpoint. - destination loadSender + destination []loadSender // bitrate hold the last send bitrate calculation result. bitrate int @@ -200,39 +199,59 @@ func (r *Revid) reset(config Config) error { } r.config = config - if r.destination != nil { - err = r.destination.close() - if err != nil { - r.config.Logger.Log(smartlogger.Error, pkg+"could not close destination", "error", err.Error()) + for _, dest := range r.destination { + if dest != nil { + err = dest.close() + if err != nil { + return err + } } } - switch r.config.Output { - case File: - s, err := newFileSender(config.OutputFileName) - if err != nil { - return err + + n := 1 + if r.config.Output2 != 0 { + n = 2 + } + r.destination = make([]loadSender, n) + + for outNo, outType := range []uint8{r.config.Output1, r.config.Output2} { + switch outType { + case File: + s, err := newFileSender(config.OutputFileName) + if err != nil { + return err + } + r.destination[outNo] = s + case FfmpegRtmp: + s, err := newFfmpegSender(config.RtmpUrl, r.config.FrameRate) + if err != nil { + return err + } + r.destination[outNo] = s + case Rtmp: + s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimout, rtmpConnectionMaxTries, r.config.Logger.Log) + if err != nil { + return err + } + r.destination[outNo] = s + case Http: + r.destination[outNo] = newHttpSender(r.ns, r.config.Logger.Log) + case Udp: + s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) + if err != nil { + return err + } + r.destination[outNo] = s + case Rtp: + // TODO: framerate in config should probably be an int, make conversions early + // when setting config fields in revid-cli + fps, _ := strconv.Atoi(r.config.FrameRate) + s, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, fps) + if err != nil { + return err + } + r.destination[outNo] = s } - r.destination = s - case FfmpegRtmp: - s, err := newFfmpegSender(config.RtmpUrl, r.config.FrameRate) - if err != nil { - return err - } - r.destination = s - case Rtmp: - s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimout, rtmpConnectionMaxTries, r.config.Logger.Log) - if err != nil { - return err - } - r.destination = s - case Http: - r.destination = newHttpSender(r.ns, r.config.Logger.Log) - case Rtp, Udp: - s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) - if err != nil { - return err - } - r.destination = s } switch r.config.Input { @@ -278,10 +297,6 @@ func (r *Revid) reset(config Config) error { if err != nil { return err } - case MpegtsRtp: - r.config.Logger.Log(smartlogger.Info, pkg+"using RTP packetisation") - frameRate, _ := strconv.Atoi(r.config.FrameRate) - r.encoder = mts.NewEncoder(rtp.NewEncoder(&r.packer, frameRate), float64(frameRate)) } return nil @@ -349,51 +364,50 @@ loop: count += chunk.Len() r.config.Logger.Log(smartlogger.Debug, pkg+"about to send") - err = r.destination.load(chunk) - if err != nil { - r.config.Logger.Log(smartlogger.Error, pkg+"failed to load clip") - } - err = r.destination.send() - if err == nil { - r.config.Logger.Log(smartlogger.Debug, pkg+"sent clip") - } - - if err != nil && chunk.Len() > 11 { - r.config.Logger.Log(smartlogger.Error, pkg+"first send failed", "error", err.Error()) - // Try and send again - err = r.destination.send() + for i, dest := range r.destination { + err = dest.load(chunk) if err != nil { - r.config.Logger.Log(smartlogger.Error, pkg+"destination send error", "error", err.Error()) + r.config.Logger.Log(smartlogger.Error, pkg+"failed to load clip to output"+strconv.Itoa(i)) } + } - // if there's still an error we try and reconnect, unless we're stopping - for err != nil { - r.config.Logger.Log(smartlogger.Debug, pkg+"send failed again, trying to reconnect...") - time.Sleep(sendFailedDelay) - r.config.Logger.Log(smartlogger.Error, pkg+"send failed with error", "error", err.Error()) + for i, dest := range r.destination { + err = dest.send() + if err == nil { + r.config.Logger.Log(smartlogger.Debug, pkg+"sent clip to output "+strconv.Itoa(i)) + } else { + r.config.Logger.Log(smartlogger.Error, pkg+"send to output "+strconv.Itoa(i)+ + "failed, trying again", "error", err.Error()) + err = dest.send() + if err != nil && chunk.Len() > 11 { + r.config.Logger.Log(smartlogger.Error, pkg+"second send attempted failed, restarting connection", "error", err.Error()) + for err != nil { + time.Sleep(sendFailedDelay) + if rs, ok := dest.(restarter); ok { + r.config.Logger.Log(smartlogger.Debug, pkg+"restarting session", "session", rs) + err = rs.restart() + if err != nil { + r.config.Logger.Log(smartlogger.Error, pkg+"failed to restart rtmp session", "error", err.Error()) + r.isRunning = false + return + } - if rs, ok := r.destination.(restarter); ok { - r.config.Logger.Log(smartlogger.Debug, pkg+"restarting session", "session", rs) - err = rs.restart() - if err != nil { - // TODO(kortschak): Make this "Fatal" when that exists. - r.config.Logger.Log(smartlogger.Error, pkg+"failed to restart rtmp session", "error", err.Error()) - r.isRunning = false - return + r.config.Logger.Log(smartlogger.Info, pkg+"restarted rtmp session") + } + + err = dest.send() + if err != nil { + r.config.Logger.Log(smartlogger.Error, pkg+"send failed again, with error", "error", err.Error()) + } } - r.config.Logger.Log(smartlogger.Info, pkg+"restarted rtmp session") - } - - r.config.Logger.Log(smartlogger.Debug, pkg+"trying to send again with new connection") - err = r.destination.send() - if err != nil { - r.config.Logger.Log(smartlogger.Error, pkg+"send failed with error", "error", err.Error()) } } } - r.destination.release() - + // Release the chunk back to the ring buffer for use + for _, dest := range r.destination { + dest.release() + } r.config.Logger.Log(smartlogger.Debug, pkg+"done reading that clip from ring buffer") // Log some information regarding bitrate and ring buffer size if it's time @@ -409,9 +423,11 @@ loop: } } r.config.Logger.Log(smartlogger.Info, pkg+"not outputting clips anymore") - err := r.destination.close() - if err != nil { - r.config.Logger.Log(smartlogger.Error, pkg+"failed to close destination", "error", err.Error()) + for i, dest := range r.destination { + err := dest.close() + if err != nil { + r.config.Logger.Log(smartlogger.Error, pkg+"failed to close output"+strconv.Itoa(i)+" destination", "error", err.Error()) + } } } diff --git a/revid/senders.go b/revid/senders.go index cc0b869f..fa72af3e 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -35,6 +35,7 @@ import ( "os/exec" "bitbucket.org/ausocean/av/rtmp" + "bitbucket.org/ausocean/av/stream/rtp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/utils/smartlogger" @@ -145,15 +146,16 @@ func (s *httpSender) send() error { if send { _, _, err = s.client.Send(netsender.RequestPoll, pins) } + return err +} + +func (s *httpSender) release() { // We will not retry, so release // the chunk and clear it now. s.chunk.Close() s.chunk = nil - return err } -func (s *httpSender) release() {} - func (s *httpSender) close() error { return nil } // ffmpegSender implements loadSender for an FFMPEG RTMP destination. @@ -291,7 +293,7 @@ func (s *rtmpSender) close() error { return s.sess.Close() } -// rtpSender implements loadSender for a native udp destination. +// udpSender implements loadSender for a native udp destination. type udpSender struct { conn net.Conn log func(lvl int8, msg string, args ...interface{}) @@ -325,3 +327,40 @@ func (s *udpSender) release() { } func (s *udpSender) close() error { return nil } + +// TODO: Write restart func for rtpSender +// rtpSender implements loadSender for a native udp destination with rtp packetization. +type rtpSender struct { + log func(lvl int8, msg string, args ...interface{}) + chunk *ring.Chunk + encoder *rtp.Encoder +} + +func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps int) (*rtpSender, error) { + conn, err := net.Dial("udp", addr) + if err != nil { + return nil, err + } + s := &rtpSender{ + log: log, + encoder: rtp.NewEncoder(conn, fps), + } + return s, nil +} + +func (s *rtpSender) load(c *ring.Chunk) error { + s.chunk = c + return nil +} + +func (s *rtpSender) send() error { + _, err := s.chunk.WriteTo(s.encoder) + return err +} + +func (s *rtpSender) release() { + s.chunk.Close() + s.chunk = nil +} + +func (s *rtpSender) close() error { return nil }