Merged in multiple-outputs (pull request #71)

Multiple Outputs

Approved-by: kortschak <dan@kortschak.io>
This commit is contained in:
Saxon Milton 2018-12-03 23:46:17 +00:00
commit f1586641a0
6 changed files with 194 additions and 104 deletions

View File

@ -52,7 +52,8 @@ const (
const ( const (
inputPtr = iota inputPtr = iota
inputCodecPtr inputCodecPtr
outputPtr output1Ptr
output2Ptr
rtmpMethodPtr rtmpMethodPtr
packetizationPtr packetizationPtr
quantizationModePtr quantizationModePtr
@ -107,9 +108,11 @@ var (
flagNames = [noOfConfigFlags]struct{ name, description string }{ flagNames = [noOfConfigFlags]struct{ name, description string }{
{"Input", "The input type: Raspivid, File"}, {"Input", "The input type: Raspivid, File"},
{"InputCodec", "The codec of the input: H264, Mjpeg"}, {"InputCodec", "The codec of the input: H264, Mjpeg"},
{"Output", "The output type: Http, Rtmp, File, Udp, Rtp"}, // TODO: We should make this a comma-separated list that can have an arbitrary
//number of targets.
{"Output1", "The first output type: Http, Rtmp, File, Udp, Rtp"},
{"Output2", "The second output type: Http, Rtmp, File, Udp, Rtp"},
{"RtmpMethod", "The method used to send over rtmp: Ffmpeg, Librtmp"}, {"RtmpMethod", "The method used to send over rtmp: Ffmpeg, Librtmp"},
// NOTE: we add rtp here when we have this functionality
{"Packetization", "The method of data packetisation: Flv, Mpegts, None"}, {"Packetization", "The method of data packetisation: Flv, Mpegts, None"},
{"QuantizationMode", "Whether quantization if on or off (variable bitrate): On, Off"}, {"QuantizationMode", "Whether quantization if on or off (variable bitrate): On, Off"},
{"Verbosity", "Verbosity: Info, Warning, Error, Fatal"}, {"Verbosity", "Verbosity: Info, Warning, Error, Fatal"},
@ -202,23 +205,40 @@ func handleFlags() {
logger.Log(smartlogger.Error, pkg+"bad input codec argument") logger.Log(smartlogger.Error, pkg+"bad input codec argument")
} }
switch *configFlags[outputPtr] { switch *configFlags[output1Ptr] {
case "File": case "File":
config.Output = revid.File config.Output1 = revid.File
case "Http": case "Http":
config.Output = revid.Http config.Output1 = revid.Http
case "Rtmp": case "Rtmp":
config.Output = revid.Rtmp config.Output1 = revid.Rtmp
case "FfmpegRtmp": case "FfmpegRtmp":
config.Output = revid.FfmpegRtmp config.Output1 = revid.FfmpegRtmp
case "Udp": case "Udp":
config.Output = revid.Udp config.Output1 = revid.Udp
case "Rtp": case "Rtp":
config.Output = revid.Rtp config.Output1 = revid.Rtp
config.Packetization = revid.MpegtsRtp
case "": case "":
default: default:
logger.Log(smartlogger.Error, pkg+"bad output argument") logger.Log(smartlogger.Error, pkg+"bad output 1 argument")
}
switch *configFlags[output2Ptr] {
case "File":
config.Output2 = revid.File
case "Http":
config.Output2 = revid.Http
case "Rtmp":
config.Output2 = revid.Rtmp
case "FfmpegRtmp":
config.Output2 = revid.FfmpegRtmp
case "Udp":
config.Output2 = revid.Udp
case "Rtp":
config.Output2 = revid.Rtp
case "":
default:
logger.Log(smartlogger.Error, pkg+"bad output 2 argument")
} }
switch *configFlags[rtmpMethodPtr] { switch *configFlags[rtmpMethodPtr] {
@ -238,8 +258,6 @@ func handleFlags() {
config.Packetization = revid.Mpegts config.Packetization = revid.Mpegts
case "Flv": case "Flv":
config.Packetization = revid.Flv config.Packetization = revid.Flv
case "MpegtsRtp":
config.Packetization = revid.MpegtsRtp
case "": case "":
default: default:
logger.Log(smartlogger.Error, pkg+"bad packetization argument") logger.Log(smartlogger.Error, pkg+"bad packetization argument")
@ -418,15 +436,15 @@ func updateRevid(ns *netsender.Sender, vars map[string]string, stop bool) error
case "Output": case "Output":
switch value { switch value {
case "File": case "File":
config.Output = revid.File config.Output1 = revid.File
case "Http": case "Http":
config.Output = revid.Http config.Output1 = revid.Http
case "Rtmp": case "Rtmp":
config.Output = revid.Rtmp config.Output1 = revid.Rtmp
case "FfmpegRtmp": case "FfmpegRtmp":
config.Output = revid.FfmpegRtmp config.Output1 = revid.FfmpegRtmp
default: default:
logger.Log(smartlogger.Warning, pkg+"invalid Output param", "value", value) logger.Log(smartlogger.Warning, pkg+"invalid Output1 param", "value", value)
continue continue
} }
case "FramesPerClip": case "FramesPerClip":

View File

@ -57,7 +57,7 @@ func main() {
Input: revid.File, Input: revid.File,
InputFileName: inputFile, InputFileName: inputFile,
InputCodec: revid.H264, InputCodec: revid.H264,
Output: revid.Rtmp, Output1: revid.Rtmp,
RtmpMethod: revid.LibRtmp, RtmpMethod: revid.LibRtmp,
RtmpUrl: *rtmpUrlPtr, RtmpUrl: *rtmpUrlPtr,
Packetization: revid.Flv, Packetization: revid.Flv,

View File

@ -49,7 +49,7 @@ func main() {
Input: revid.File, Input: revid.File,
InputFileName: inputFile, InputFileName: inputFile,
InputCodec: revid.H264, InputCodec: revid.H264,
Output: revid.File, Output1: revid.File,
OutputFileName: outputFile, OutputFileName: outputFile,
Packetization: revid.Mpegts, Packetization: revid.Mpegts,
Logger: smartlogger.New(smartlogger.Info, logPath), Logger: smartlogger.New(smartlogger.Info, logPath),

View File

@ -39,7 +39,8 @@ import (
type Config struct { type Config struct {
Input uint8 Input uint8
InputCodec uint8 InputCodec uint8
Output uint8 Output1 uint8
Output2 uint8
RtmpMethod uint8 RtmpMethod uint8
Packetization uint8 Packetization uint8
QuantizationMode uint8 QuantizationMode uint8
@ -181,14 +182,14 @@ func (c *Config) Validate(r *Revid) error {
return errors.New("bad input codec defined in config") return errors.New("bad input codec defined in config")
} }
switch c.Output { switch c.Output1 {
case File: case File:
case Rtp: case Rtp:
case Udp: case Udp:
case Rtmp, FfmpegRtmp: case Rtmp, FfmpegRtmp:
if c.RtmpUrl == "" { if c.RtmpUrl == "" {
c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP") c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP")
c.Output = Http c.Output1 = Http
break break
} }
c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for rtmp out", c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for rtmp out",
@ -197,7 +198,7 @@ func (c *Config) Validate(r *Revid) error {
case NothingDefined: case NothingDefined:
c.Logger.Log(smartlogger.Warning, pkg+"no output defined, defaulting", "output", c.Logger.Log(smartlogger.Warning, pkg+"no output defined, defaulting", "output",
defaultOutput) defaultOutput)
c.Output = defaultOutput c.Output1 = defaultOutput
fallthrough fallthrough
case Http: case Http:
c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for http out", c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for http out",
@ -207,6 +208,22 @@ func (c *Config) Validate(r *Revid) error {
return errors.New("bad output type defined in config") return errors.New("bad output type defined in config")
} }
switch c.Output2 {
case File:
case Rtp:
case Udp:
case Rtmp, FfmpegRtmp:
if c.RtmpUrl == "" {
c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP")
c.Output2 = Http
break
}
case NothingDefined:
case Http:
default:
return errors.New("bad output2 type defined in config")
}
switch c.HorizontalFlip { switch c.HorizontalFlip {
case Yes: case Yes:
case No: case No:

View File

@ -43,7 +43,6 @@ import (
"bitbucket.org/ausocean/av/stream/flv" "bitbucket.org/ausocean/av/stream/flv"
"bitbucket.org/ausocean/av/stream/lex" "bitbucket.org/ausocean/av/stream/lex"
"bitbucket.org/ausocean/av/stream/mts" "bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/av/stream/rtp"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/utils/smartlogger" "bitbucket.org/ausocean/utils/smartlogger"
@ -118,7 +117,7 @@ type Revid struct {
// to the target destination. // to the target destination.
buffer *ring.Buffer buffer *ring.Buffer
// destination is the target endpoint. // destination is the target endpoint.
destination loadSender destination []loadSender
// bitrate hold the last send bitrate calculation result. // bitrate hold the last send bitrate calculation result.
bitrate int bitrate int
@ -200,39 +199,59 @@ func (r *Revid) reset(config Config) error {
} }
r.config = config r.config = config
if r.destination != nil { for _, dest := range r.destination {
err = r.destination.close() if dest != nil {
err = dest.close()
if err != nil { if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"could not close destination", "error", err.Error()) return err
} }
} }
switch r.config.Output { }
n := 1
if r.config.Output2 != 0 {
n = 2
}
r.destination = make([]loadSender, n)
for outNo, outType := range []uint8{r.config.Output1, r.config.Output2} {
switch outType {
case File: case File:
s, err := newFileSender(config.OutputFileName) s, err := newFileSender(config.OutputFileName)
if err != nil { if err != nil {
return err return err
} }
r.destination = s r.destination[outNo] = s
case FfmpegRtmp: case FfmpegRtmp:
s, err := newFfmpegSender(config.RtmpUrl, r.config.FrameRate) s, err := newFfmpegSender(config.RtmpUrl, r.config.FrameRate)
if err != nil { if err != nil {
return err return err
} }
r.destination = s r.destination[outNo] = s
case Rtmp: case Rtmp:
s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimout, rtmpConnectionMaxTries, r.config.Logger.Log) s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimout, rtmpConnectionMaxTries, r.config.Logger.Log)
if err != nil { if err != nil {
return err return err
} }
r.destination = s r.destination[outNo] = s
case Http: case Http:
r.destination = newHttpSender(r.ns, r.config.Logger.Log) r.destination[outNo] = newHttpSender(r.ns, r.config.Logger.Log)
case Rtp, Udp: case Udp:
s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log)
if err != nil { if err != nil {
return err return err
} }
r.destination = s r.destination[outNo] = s
case Rtp:
// TODO: framerate in config should probably be an int, make conversions early
// when setting config fields in revid-cli
fps, _ := strconv.Atoi(r.config.FrameRate)
s, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, fps)
if err != nil {
return err
}
r.destination[outNo] = s
}
} }
switch r.config.Input { switch r.config.Input {
@ -278,10 +297,6 @@ func (r *Revid) reset(config Config) error {
if err != nil { if err != nil {
return err return err
} }
case MpegtsRtp:
r.config.Logger.Log(smartlogger.Info, pkg+"using RTP packetisation")
frameRate, _ := strconv.Atoi(r.config.FrameRate)
r.encoder = mts.NewEncoder(rtp.NewEncoder(&r.packer, frameRate), float64(frameRate))
} }
return nil return nil
@ -349,51 +364,50 @@ loop:
count += chunk.Len() count += chunk.Len()
r.config.Logger.Log(smartlogger.Debug, pkg+"about to send") r.config.Logger.Log(smartlogger.Debug, pkg+"about to send")
err = r.destination.load(chunk) for i, dest := range r.destination {
err = dest.load(chunk)
if err != nil { if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"failed to load clip") r.config.Logger.Log(smartlogger.Error, pkg+"failed to load clip to output"+strconv.Itoa(i))
} }
err = r.destination.send() }
for i, dest := range r.destination {
err = dest.send()
if err == nil { if err == nil {
r.config.Logger.Log(smartlogger.Debug, pkg+"sent clip") r.config.Logger.Log(smartlogger.Debug, pkg+"sent clip to output "+strconv.Itoa(i))
} } else {
r.config.Logger.Log(smartlogger.Error, pkg+"send to output "+strconv.Itoa(i)+
"failed, trying again", "error", err.Error())
err = dest.send()
if err != nil && chunk.Len() > 11 { if err != nil && chunk.Len() > 11 {
r.config.Logger.Log(smartlogger.Error, pkg+"first send failed", "error", err.Error()) r.config.Logger.Log(smartlogger.Error, pkg+"second send attempted failed, restarting connection", "error", err.Error())
// Try and send again
err = r.destination.send()
if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"destination send error", "error", err.Error())
}
// if there's still an error we try and reconnect, unless we're stopping
for err != nil { for err != nil {
r.config.Logger.Log(smartlogger.Debug, pkg+"send failed again, trying to reconnect...")
time.Sleep(sendFailedDelay) time.Sleep(sendFailedDelay)
r.config.Logger.Log(smartlogger.Error, pkg+"send failed with error", "error", err.Error()) if rs, ok := dest.(restarter); ok {
if rs, ok := r.destination.(restarter); ok {
r.config.Logger.Log(smartlogger.Debug, pkg+"restarting session", "session", rs) r.config.Logger.Log(smartlogger.Debug, pkg+"restarting session", "session", rs)
err = rs.restart() err = rs.restart()
if err != nil { if err != nil {
// TODO(kortschak): Make this "Fatal" when that exists.
r.config.Logger.Log(smartlogger.Error, pkg+"failed to restart rtmp session", "error", err.Error()) r.config.Logger.Log(smartlogger.Error, pkg+"failed to restart rtmp session", "error", err.Error())
r.isRunning = false r.isRunning = false
return return
} }
r.config.Logger.Log(smartlogger.Info, pkg+"restarted rtmp session") r.config.Logger.Log(smartlogger.Info, pkg+"restarted rtmp session")
} }
r.config.Logger.Log(smartlogger.Debug, pkg+"trying to send again with new connection") err = dest.send()
err = r.destination.send()
if err != nil { if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"send failed with error", "error", err.Error()) r.config.Logger.Log(smartlogger.Error, pkg+"send failed again, with error", "error", err.Error())
}
}
} }
} }
} }
r.destination.release() // Release the chunk back to the ring buffer for use
for _, dest := range r.destination {
dest.release()
}
r.config.Logger.Log(smartlogger.Debug, pkg+"done reading that clip from ring buffer") r.config.Logger.Log(smartlogger.Debug, pkg+"done reading that clip from ring buffer")
// Log some information regarding bitrate and ring buffer size if it's time // Log some information regarding bitrate and ring buffer size if it's time
@ -409,9 +423,11 @@ loop:
} }
} }
r.config.Logger.Log(smartlogger.Info, pkg+"not outputting clips anymore") r.config.Logger.Log(smartlogger.Info, pkg+"not outputting clips anymore")
err := r.destination.close() for i, dest := range r.destination {
err := dest.close()
if err != nil { if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"failed to close destination", "error", err.Error()) r.config.Logger.Log(smartlogger.Error, pkg+"failed to close output"+strconv.Itoa(i)+" destination", "error", err.Error())
}
} }
} }

View File

@ -35,6 +35,7 @@ import (
"os/exec" "os/exec"
"bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/av/rtmp"
"bitbucket.org/ausocean/av/stream/rtp"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/utils/smartlogger" "bitbucket.org/ausocean/utils/smartlogger"
@ -145,15 +146,16 @@ func (s *httpSender) send() error {
if send { if send {
_, _, err = s.client.Send(netsender.RequestPoll, pins) _, _, err = s.client.Send(netsender.RequestPoll, pins)
} }
return err
}
func (s *httpSender) release() {
// We will not retry, so release // We will not retry, so release
// the chunk and clear it now. // the chunk and clear it now.
s.chunk.Close() s.chunk.Close()
s.chunk = nil s.chunk = nil
return err
} }
func (s *httpSender) release() {}
func (s *httpSender) close() error { return nil } func (s *httpSender) close() error { return nil }
// ffmpegSender implements loadSender for an FFMPEG RTMP destination. // ffmpegSender implements loadSender for an FFMPEG RTMP destination.
@ -291,7 +293,7 @@ func (s *rtmpSender) close() error {
return s.sess.Close() return s.sess.Close()
} }
// rtpSender implements loadSender for a native udp destination. // udpSender implements loadSender for a native udp destination.
type udpSender struct { type udpSender struct {
conn net.Conn conn net.Conn
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
@ -325,3 +327,40 @@ func (s *udpSender) release() {
} }
func (s *udpSender) close() error { return nil } func (s *udpSender) close() error { return nil }
// TODO: Write restart func for rtpSender
// rtpSender implements loadSender for a native udp destination with rtp packetization.
type rtpSender struct {
log func(lvl int8, msg string, args ...interface{})
chunk *ring.Chunk
encoder *rtp.Encoder
}
func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps int) (*rtpSender, error) {
conn, err := net.Dial("udp", addr)
if err != nil {
return nil, err
}
s := &rtpSender{
log: log,
encoder: rtp.NewEncoder(conn, fps),
}
return s, nil
}
func (s *rtpSender) load(c *ring.Chunk) error {
s.chunk = c
return nil
}
func (s *rtpSender) send() error {
_, err := s.chunk.WriteTo(s.encoder)
return err
}
func (s *rtpSender) release() {
s.chunk.Close()
s.chunk = nil
}
func (s *rtpSender) close() error { return nil }