Merge branch 'master' into audio-compression

This commit is contained in:
Trek H 2019-02-13 10:04:43 +10:30
commit 790dfaba7e
6 changed files with 106 additions and 103 deletions

View File

@ -102,8 +102,6 @@ func handleFlags() revid.Config {
inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam") inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam")
inputCodecPtr = flag.String("InputCodec", "", "The codec of the input: H264, Mjpeg") inputCodecPtr = flag.String("InputCodec", "", "The codec of the input: H264, Mjpeg")
output1Ptr = flag.String("Output1", "", "The first output type: Http, Rtmp, File, Udp, Rtp")
output2Ptr = flag.String("Output2", "", "The second output type: Http, Rtmp, File, Udp, Rtp")
rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp") rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp")
packetizationPtr = flag.String("Packetization", "", "The method of data packetisation: Flv, Mpegts, None") packetizationPtr = flag.String("Packetization", "", "The method of data packetisation: Flv, Mpegts, None")
quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)") quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)")
@ -126,6 +124,9 @@ func handleFlags() revid.Config {
configFilePtr = flag.String("ConfigFile", "", "NetSender config file") configFilePtr = flag.String("ConfigFile", "", "NetSender config file")
) )
var outputs flagStrings
flag.Var(&outputs, "Output", "output type: Http, Rtmp, File, Udp, Rtp (may be used more than once)")
flag.Parse() flag.Parse()
log = logger.New(defaultLogVerbosity, &smartlogger.New(*logPathPtr).LogRoller) log = logger.New(defaultLogVerbosity, &smartlogger.New(*logPathPtr).LogRoller)
@ -167,40 +168,24 @@ func handleFlags() revid.Config {
log.Log(logger.Error, pkg+"bad input codec argument") log.Log(logger.Error, pkg+"bad input codec argument")
} }
switch *output1Ptr { for _, o := range outputs {
switch o {
case "File": case "File":
cfg.Output1 = revid.File cfg.Outputs = append(cfg.Outputs, revid.File)
case "Http": case "Http":
cfg.Output1 = revid.Http cfg.Outputs = append(cfg.Outputs, revid.Http)
case "Rtmp": case "Rtmp":
cfg.Output1 = revid.Rtmp cfg.Outputs = append(cfg.Outputs, revid.Rtmp)
case "FfmpegRtmp": case "FfmpegRtmp":
cfg.Output1 = revid.FfmpegRtmp cfg.Outputs = append(cfg.Outputs, revid.FfmpegRtmp)
case "Udp": case "Udp":
cfg.Output1 = revid.Udp cfg.Outputs = append(cfg.Outputs, revid.Udp)
case "Rtp": case "Rtp":
cfg.Output1 = revid.Rtp cfg.Outputs = append(cfg.Outputs, revid.Rtp)
case "": case "":
default: default:
log.Log(logger.Error, pkg+"bad output 1 argument") log.Log(logger.Error, pkg+"bad output argument", "arg", o)
} }
switch *output2Ptr {
case "File":
cfg.Output2 = revid.File
case "Http":
cfg.Output2 = revid.Http
case "Rtmp":
cfg.Output2 = revid.Rtmp
case "FfmpegRtmp":
cfg.Output2 = revid.FfmpegRtmp
case "Udp":
cfg.Output2 = revid.Udp
case "Rtp":
cfg.Output2 = revid.Rtp
case "":
default:
log.Log(logger.Error, pkg+"bad output 2 argument")
} }
switch *rtmpMethodPtr { switch *rtmpMethodPtr {
@ -380,15 +365,19 @@ func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars m
for key, value := range vars { for key, value := range vars {
switch key { switch key {
case "Output": case "Output":
// FIXME(kortschak): There can be only one!
// How do we specify outputs after the first?
//
// Maybe we shouldn't be doing this!
switch value { switch value {
case "File": case "File":
cfg.Output1 = revid.File cfg.Outputs[0] = revid.File
case "Http": case "Http":
cfg.Output1 = revid.Http cfg.Outputs[0] = revid.Http
case "Rtmp": case "Rtmp":
cfg.Output1 = revid.Rtmp cfg.Outputs[0] = revid.Rtmp
case "FfmpegRtmp": case "FfmpegRtmp":
cfg.Output1 = revid.FfmpegRtmp cfg.Outputs[0] = revid.FfmpegRtmp
default: default:
log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value)
continue continue
@ -474,3 +463,28 @@ func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars m
return startRevid(ns, cfg) return startRevid(ns, cfg)
} }
// flagStrings implements an appending string set flag.
type flagStrings []string
func (v *flagStrings) String() string {
if *v != nil {
return strings.Join(*v, ",")
}
return ""
}
func (v *flagStrings) Set(s string) error {
if s == "" {
return nil
}
for _, e := range *v {
if e == s {
return nil
}
}
*v = append(*v, s)
return nil
}
func (v *flagStrings) Get() interface{} { return *v }

View File

@ -58,7 +58,7 @@ func main() {
Input: revid.File, Input: revid.File,
InputFileName: inputFile, InputFileName: inputFile,
InputCodec: revid.H264, InputCodec: revid.H264,
Output1: revid.Rtmp, Outputs: []byte{revid.Rtmp},
RtmpMethod: revid.LibRtmp, RtmpMethod: revid.LibRtmp,
RtmpUrl: *rtmpUrlPtr, RtmpUrl: *rtmpUrlPtr,
Packetization: revid.Flv, Packetization: revid.Flv,

View File

@ -50,7 +50,7 @@ func main() {
Input: revid.File, Input: revid.File,
InputFileName: inputFile, InputFileName: inputFile,
InputCodec: revid.H264, InputCodec: revid.H264,
Output1: revid.File, Outputs: []byte{revid.File},
OutputFileName: outputFile, OutputFileName: outputFile,
Packetization: revid.Mpegts, Packetization: revid.Mpegts,
Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller), Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller),

View File

@ -40,8 +40,7 @@ type Config struct {
Input uint8 Input uint8
InputCodec uint8 InputCodec uint8
Output1 uint8 Outputs []uint8
Output2 uint8
RtmpMethod uint8 RtmpMethod uint8
Packetization uint8 Packetization uint8
@ -172,13 +171,16 @@ func (c *Config) Validate(r *Revid) error {
return errors.New("bad input codec defined in config") return errors.New("bad input codec defined in config")
} }
switch c.Output1 { for i, o := range c.Outputs {
switch o {
case File: case File:
case Udp: case Udp:
case Rtmp, FfmpegRtmp: case Rtmp, FfmpegRtmp:
if c.RtmpUrl == "" { if c.RtmpUrl == "" {
c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP") c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP")
c.Output1 = Http c.Outputs[i] = Http
// FIXME(kortschak): Does this want the same line as below?
// c.FramesPerClip = httpFramesPerClip
break break
} }
c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out",
@ -187,7 +189,7 @@ func (c *Config) Validate(r *Revid) error {
case NothingDefined: case NothingDefined:
c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output", c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output",
defaultOutput) defaultOutput)
c.Output1 = defaultOutput c.Outputs[i] = defaultOutput
fallthrough fallthrough
case Http, Rtp: case Http, Rtp:
c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out",
@ -196,21 +198,6 @@ func (c *Config) Validate(r *Revid) error {
default: default:
return errors.New("bad output type defined in config") return errors.New("bad output type defined in config")
} }
switch c.Output2 {
case File:
case Rtp:
case Udp:
case Rtmp, FfmpegRtmp:
if c.RtmpUrl == "" {
c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP")
c.Output2 = Http
break
}
case NothingDefined:
case Http:
default:
return errors.New("bad output2 type defined in config")
} }
if c.FramesPerClip < 1 { if c.FramesPerClip < 1 {

View File

@ -157,8 +157,15 @@ func (p *packer) Write(frame []byte) (int, error) {
return n, err return n, err
} }
p.packetCount++ p.packetCount++
var hasRtmp bool
for _, d := range p.owner.config.Outputs {
if d == Rtmp {
hasRtmp = true
break
}
}
now := time.Now() now := time.Now()
if (p.owner.config.Output1 != Rtmp && now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) || p.owner.config.Output1 == Rtmp { if hasRtmp || (now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) {
p.owner.buffer.Flush() p.owner.buffer.Flush()
p.packetCount = 0 p.packetCount = 0
p.lastTime = now p.lastTime = now
@ -203,40 +210,35 @@ func (r *Revid) reset(config Config) error {
} }
} }
n := 1 r.destination = r.destination[:0]
if r.config.Output2 != 0 && r.config.Output2 != Rtp { for _, typ := range r.config.Outputs {
n = 2 switch typ {
}
r.destination = make([]loadSender, n)
for outNo, outType := range []uint8{r.config.Output1, r.config.Output2} {
switch outType {
case File: case File:
s, err := newFileSender(config.OutputFileName) s, err := newFileSender(config.OutputFileName)
if err != nil { if err != nil {
return err return err
} }
r.destination[outNo] = s r.destination = append(r.destination, s)
case FfmpegRtmp: case FfmpegRtmp:
s, err := newFfmpegSender(config.RtmpUrl, fmt.Sprint(r.config.FrameRate)) s, err := newFfmpegSender(config.RtmpUrl, fmt.Sprint(r.config.FrameRate))
if err != nil { if err != nil {
return err return err
} }
r.destination[outNo] = s r.destination = append(r.destination, s)
case Rtmp: case Rtmp:
s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log)
if err != nil { if err != nil {
return err return err
} }
r.destination[outNo] = s r.destination = append(r.destination, s)
case Http: case Http:
r.destination[outNo] = newHttpSender(r.ns, r.config.Logger.Log) r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log))
case Udp: case Udp:
s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log)
if err != nil { if err != nil {
return err return err
} }
r.destination[outNo] = s r.destination = append(r.destination, s)
case Rtp: case Rtp:
r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
if err != nil { if err != nil {

View File

@ -233,6 +233,12 @@ const (
// generate handles the incoming data and generates equivalent mpegts packets - // generate handles the incoming data and generates equivalent mpegts packets -
// sending them to the output channel. // sending them to the output channel.
func (e *Encoder) Encode(nalu []byte) error { func (e *Encoder) Encode(nalu []byte) error {
if e.psiCount <= 0 {
err := e.writePSI()
if err != nil {
return err
}
}
// Prepare PES data. // Prepare PES data.
pesPkt := pes.Packet{ pesPkt := pes.Packet{
StreamID: streamID, StreamID: streamID,
@ -262,14 +268,8 @@ func (e *Encoder) Encode(nalu []byte) error {
pkt.PCR = e.pcr() pkt.PCR = e.pcr()
pusi = false pusi = false
} }
if e.psiCount <= 0 {
err := e.writePSI()
if err != nil {
return err
}
}
e.psiCount--
_, err := e.dst.Write(pkt.Bytes(e.tsSpace[:PacketSize])) _, err := e.dst.Write(pkt.Bytes(e.tsSpace[:PacketSize]))
e.psiCount--
if err != nil { if err != nil {
return err return err
} }