stream/mts: fixed conflict

This commit is contained in:
Saxon 2019-02-28 16:47:24 +10:30
commit ee915b58ac
6 changed files with 95 additions and 85 deletions

View File

@ -40,6 +40,7 @@ import (
"bitbucket.org/ausocean/av/stream/mts" "bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/av/stream/mts/meta" "bitbucket.org/ausocean/av/stream/mts/meta"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/iot/pi/sds"
"bitbucket.org/ausocean/iot/pi/smartlogger" "bitbucket.org/ausocean/iot/pi/smartlogger"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
) )
@ -49,7 +50,7 @@ const (
progName = "revid-cli" progName = "revid-cli"
// Logging is set to INFO level. // Logging is set to INFO level.
defaultLogVerbosity = logger.Debug defaultLogVerbosity = logger.Info
) )
// Revid modes // Revid modes
@ -86,7 +87,6 @@ func main() {
runDurationPtr := flag.Duration("runDuration", defaultRunDuration, "How long do you want revid to run for?") runDurationPtr := flag.Duration("runDuration", defaultRunDuration, "How long do you want revid to run for?")
cfg := handleFlags() cfg := handleFlags()
if !*useNetsender { if !*useNetsender {
rv, err := revid.New(cfg, nil) rv, err := revid.New(cfg, nil)
if err != nil { if err != nil {
@ -124,8 +124,8 @@ func handleFlags() revid.Config {
framesPerClipPtr = flag.Uint("FramesPerClip", 0, "Number of frames per clip sent") framesPerClipPtr = flag.Uint("FramesPerClip", 0, "Number of frames per clip sent")
rtmpUrlPtr = flag.String("RtmpUrl", "", "Url of rtmp endpoint") rtmpUrlPtr = flag.String("RtmpUrl", "", "Url of rtmp endpoint")
bitratePtr = flag.Uint("Bitrate", 0, "Bitrate of recorded video") bitratePtr = flag.Uint("Bitrate", 0, "Bitrate of recorded video")
outputFileNamePtr = flag.String("OutputFileName", "", "The directory of the output file") outputPathPtr = flag.String("OutputPath", "", "The directory of the output file")
inputFileNamePtr = flag.String("InputFileName", "", "The directory of the input file") inputFilePtr = flag.String("InputPath", "", "The directory of the input file")
heightPtr = flag.Uint("Height", 0, "Height in pixels") heightPtr = flag.Uint("Height", 0, "Height in pixels")
widthPtr = flag.Uint("Width", 0, "Width in pixels") widthPtr = flag.Uint("Width", 0, "Width in pixels")
frameRatePtr = flag.Uint("FrameRate", 0, "Frame rate of captured video") frameRatePtr = flag.Uint("FrameRate", 0, "Frame rate of captured video")
@ -145,7 +145,22 @@ func handleFlags() revid.Config {
flag.Parse() flag.Parse()
log = logger.New(defaultLogVerbosity, &smartlogger.New(*logPathPtr).LogRoller) switch *verbosityPtr {
case "Debug":
cfg.LogLevel = logger.Debug
case "Info":
cfg.LogLevel = logger.Info
case "Warning":
cfg.LogLevel = logger.Warning
case "Error":
cfg.LogLevel = logger.Error
case "Fatal":
cfg.LogLevel = logger.Fatal
default:
log.Log(logger.Error, pkg+"bad verbosity argument")
}
log = logger.New(cfg.LogLevel, &smartlogger.New(*logPathPtr).LogRoller)
cfg.Logger = log cfg.Logger = log
@ -184,6 +199,10 @@ func handleFlags() revid.Config {
log.Log(logger.Error, pkg+"bad input codec argument") log.Log(logger.Error, pkg+"bad input codec argument")
} }
if len(outputs) == 0 {
cfg.Outputs = make([]uint8, 1)
}
for _, o := range outputs { for _, o := range outputs {
switch o { switch o {
case "File": case "File":
@ -225,17 +244,6 @@ func handleFlags() revid.Config {
log.Log(logger.Error, pkg+"bad packetization argument") log.Log(logger.Error, pkg+"bad packetization argument")
} }
switch *verbosityPtr {
case "No":
cfg.LogLevel = logger.Fatal
case "Debug":
cfg.LogLevel = logger.Debug
//logger.SetLevel(logger.Debug)
case "":
default:
log.Log(logger.Error, pkg+"bad verbosity argument")
}
if *configFilePtr != "" { if *configFilePtr != "" {
netsender.ConfigFile = *configFilePtr netsender.ConfigFile = *configFilePtr
} }
@ -246,8 +254,8 @@ func handleFlags() revid.Config {
cfg.FramesPerClip = *framesPerClipPtr cfg.FramesPerClip = *framesPerClipPtr
cfg.RtmpUrl = *rtmpUrlPtr cfg.RtmpUrl = *rtmpUrlPtr
cfg.Bitrate = *bitratePtr cfg.Bitrate = *bitratePtr
cfg.OutputFileName = *outputFileNamePtr cfg.OutputPath = *outputPathPtr
cfg.InputFileName = *inputFileNamePtr cfg.InputPath = *inputFilePtr
cfg.Height = *heightPtr cfg.Height = *heightPtr
cfg.Width = *widthPtr cfg.Width = *widthPtr
cfg.FrameRate = *frameRatePtr cfg.FrameRate = *frameRatePtr
@ -266,21 +274,33 @@ func run(cfg revid.Config) error {
var vars map[string]string var vars map[string]string
// initialize NetSender and use NetSender's logger var rv *revid.Revid
var ns netsender.Sender
err := ns.Init(log, nil, nil, nil) readPin := func(pin *netsender.Pin) error {
switch {
case pin.Name == "X23":
pin.Value = rv.Bitrate()
case pin.Name[0] == 'X':
return sds.ReadSystem(pin)
default:
pin.Value = -1
}
return nil // Return error only if we want NetSender to generate an error
}
ns, err := netsender.New(log, nil, readPin, nil)
if err != nil { if err != nil {
return err return err
} }
vars, _ = ns.Vars() rv, err = revid.New(cfg, ns)
vs := ns.VarSum()
rv, err := revid.New(cfg, &ns)
if err != nil { if err != nil {
log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error()) log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error())
} }
vars, _ = ns.Vars()
vs := ns.VarSum()
// Update revid to get latest config settings from netreceiver. // Update revid to get latest config settings from netreceiver.
err = rv.Update(vars) err = rv.Update(vars)
if err != nil { if err != nil {
@ -300,8 +320,7 @@ func run(cfg revid.Config) error {
} }
for { for {
// TODO(saxon): replace this call with call to ns.Run(). err = ns.Run()
err = send(&ns, rv)
if err != nil { if err != nil {
log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error()) log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error())
time.Sleep(netSendRetryTime) time.Sleep(netSendRetryTime)
@ -356,29 +375,6 @@ func run(cfg revid.Config) error {
} }
} }
// send implements our main NetSender client and handles NetReceiver configuration
// (as distinct from httpSender which just sends video data).
func send(ns *netsender.Sender, rv *revid.Revid) error {
// populate input values, if any
inputs := netsender.MakePins(ns.Param("ip"), "X")
if rv != nil {
for i, pin := range inputs {
if pin.Name == "X23" {
inputs[i].Value = rv.Bitrate()
}
}
}
_, reconfig, err := ns.Send(netsender.RequestPoll, inputs)
if err != nil {
return err
}
if reconfig {
return ns.Config()
}
return nil
}
// flagStrings implements an appending string set flag. // flagStrings implements an appending string set flag.
type flagStrings []string type flagStrings []string

View File

@ -57,8 +57,8 @@ type Config struct {
FramesPerClip uint FramesPerClip uint
RtmpUrl string RtmpUrl string
Bitrate uint Bitrate uint
OutputFileName string OutputPath string
InputFileName string InputPath string
Height uint Height uint
Width uint Width uint
FrameRate uint FrameRate uint
@ -148,11 +148,9 @@ func (c *Config) Validate(r *Revid) error {
// Configuration really needs to be rethought here. // Configuration really needs to be rethought here.
if c.Quantize && c.Quantization == 0 { if c.Quantize && c.Quantization == 0 {
c.Quantization = defaultQuantization c.Quantization = defaultQuantization
} else {
c.Bitrate = defaultBitrate
} }
if (c.Bitrate > 0 && c.Quantization > 0) || (c.Bitrate == 0 && c.Quantization == 0) { if (c.Bitrate > 0 && c.Quantize) || (c.Bitrate == 0 && !c.Quantize) {
return errors.New("bad bitrate and quantization combination for H264 input") return errors.New("bad bitrate and quantization combination for H264 input")
} }

View File

@ -51,7 +51,7 @@ import (
// Ring buffer sizes and read/write timeouts. // Ring buffer sizes and read/write timeouts.
const ( const (
ringBufferSize = 100 ringBufferSize = 1000
ringBufferElementSize = 150000 ringBufferElementSize = 150000
writeTimeout = 10 * time.Millisecond writeTimeout = 10 * time.Millisecond
readTimeout = 10 * time.Millisecond readTimeout = 10 * time.Millisecond
@ -161,7 +161,6 @@ func (p *packer) Write(frame []byte) (int, error) {
// an error if construction of the new instance was not successful. // an error if construction of the new instance was not successful.
func New(c Config, ns *netsender.Sender) (*Revid, error) { func New(c Config, ns *netsender.Sender) (*Revid, error) {
r := Revid{ns: ns, err: make(chan error)} r := Revid{ns: ns, err: make(chan error)}
r.buffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)
r.packer.owner = &r r.packer.owner = &r
err := r.reset(c) err := r.reset(c)
if err != nil { if err != nil {
@ -204,20 +203,13 @@ func (r *Revid) reset(config Config) error {
} }
r.config = config r.config = config
for _, dest := range r.destination { r.buffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)
if dest != nil {
err = dest.close()
if err != nil {
return err
}
}
}
r.destination = make([]loadSender, 0, len(r.config.Outputs)) r.destination = make([]loadSender, 0, len(r.config.Outputs))
for _, typ := range r.config.Outputs { for _, typ := range r.config.Outputs {
switch typ { switch typ {
case File: case File:
s, err := newFileSender(config.OutputFileName) s, err := newFileSender(config.OutputPath)
if err != nil { if err != nil {
return err return err
} }
@ -372,6 +364,7 @@ func (r *Revid) Update(vars map[string]string) error {
for key, value := range vars { for key, value := range vars {
switch key { switch key {
case "Output": case "Output":
r.config.Outputs = make([]uint8, 1)
// FIXME(kortschak): There can be only one! // FIXME(kortschak): There can be only one!
// How do we specify outputs after the first? // How do we specify outputs after the first?
// //
@ -389,6 +382,17 @@ func (r *Revid) Update(vars map[string]string) error {
r.config.Logger.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) r.config.Logger.Log(logger.Warning, pkg+"invalid Output1 param", "value", value)
continue continue
} }
case "Packetization":
switch value {
case "Mpegts":
r.config.Packetization = Mpegts
case "Flv":
r.config.Packetization = Flv
default:
r.config.Logger.Log(logger.Warning, pkg+"invalid packetization param", "value", value)
continue
}
case "FramesPerClip": case "FramesPerClip":
f, err := strconv.ParseUint(value, 10, 0) f, err := strconv.ParseUint(value, 10, 0)
if err != nil { if err != nil {
@ -405,10 +409,10 @@ func (r *Revid) Update(vars map[string]string) error {
break break
} }
r.config.Bitrate = uint(v) r.config.Bitrate = uint(v)
case "OutputFileName": case "OutputPath":
r.config.OutputFileName = value r.config.OutputPath = value
case "InputFileName": case "InputPath":
r.config.InputFileName = value r.config.InputPath = value
case "Height": case "Height":
h, err := strconv.ParseUint(value, 10, 0) h, err := strconv.ParseUint(value, 10, 0)
if err != nil { if err != nil {
@ -473,8 +477,8 @@ func (r *Revid) Update(vars map[string]string) error {
r.config.BurstPeriod = uint(v) r.config.BurstPeriod = uint(v)
} }
} }
r.config.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.config))
return nil return r.reset(r.config)
} }
// outputClips takes the clips produced in the packClips method and outputs them // outputClips takes the clips produced in the packClips method and outputs them
@ -491,7 +495,7 @@ loop:
case nil: case nil:
// Do nothing. // Do nothing.
case ring.ErrTimeout: case ring.ErrTimeout:
r.config.Logger.Log(logger.Warning, pkg+"ring buffer read timeout") r.config.Logger.Log(logger.Debug, pkg+"ring buffer read timeout")
continue continue
default: default:
r.config.Logger.Log(logger.Error, pkg+"unexpected error", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"unexpected error", "error", err.Error())
@ -628,13 +632,13 @@ func (r *Revid) startV4L() error {
const defaultVideo = "/dev/video0" const defaultVideo = "/dev/video0"
r.config.Logger.Log(logger.Info, pkg+"starting webcam") r.config.Logger.Log(logger.Info, pkg+"starting webcam")
if r.config.InputFileName == "" { if r.config.InputPath == "" {
r.config.Logger.Log(logger.Info, pkg+"using default video device", "device", defaultVideo) r.config.Logger.Log(logger.Info, pkg+"using default video device", "device", defaultVideo)
r.config.InputFileName = defaultVideo r.config.InputPath = defaultVideo
} }
args := []string{ args := []string{
"-i", r.config.InputFileName, "-i", r.config.InputPath,
"-f", "h264", "-f", "h264",
"-r", fmt.Sprint(r.config.FrameRate), "-r", fmt.Sprint(r.config.FrameRate),
} }
@ -675,7 +679,7 @@ func (r *Revid) startV4L() error {
// setupInputForFile sets things up for getting input from a file // setupInputForFile sets things up for getting input from a file
func (r *Revid) setupInputForFile() error { func (r *Revid) setupInputForFile() error {
f, err := os.Open(r.config.InputFileName) f, err := os.Open(r.config.InputPath)
if err != nil { if err != nil {
r.config.Logger.Log(logger.Error, err.Error()) r.config.Logger.Log(logger.Error, err.Error())
r.Stop() r.Stop()

View File

@ -380,7 +380,6 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg
break break
} }
log(logger.Error, err.Error()) log(logger.Error, err.Error())
conn.Close()
if n < retries-1 { if n < retries-1 {
log(logger.Info, pkg+"retry rtmp connection") log(logger.Info, pkg+"retry rtmp connection")
} }

View File

@ -134,10 +134,10 @@ func Dial(url string, timeout uint, log Log) (*Conn, error) {
// Close terminates the RTMP connection. // Close terminates the RTMP connection.
// NB: Close is idempotent and the connection value is cleared completely. // NB: Close is idempotent and the connection value is cleared completely.
func (c *Conn) Close() error { func (c *Conn) Close() error {
c.log(DebugLevel, pkg+"Conn.Close")
if !c.isConnected() { if !c.isConnected() {
return errNotConnected return errNotConnected
} }
c.log(DebugLevel, pkg+"Conn.Close")
if c.streamID > 0 { if c.streamID > 0 {
if c.link.protocol&featureWrite != 0 { if c.link.protocol&featureWrite != 0 {
sendFCUnpublish(c) sendFCUnpublish(c)

View File

@ -30,6 +30,7 @@ package mts
import ( import (
"errors" "errors"
"fmt"
"github.com/Comcast/gots/packet" "github.com/Comcast/gots/packet"
) )
@ -162,20 +163,32 @@ type Packet struct {
Payload []byte // Mpeg ts Payload Payload []byte // Mpeg ts Payload
} }
// FindPMT will take a clip of mpegts and try to find a PMT table - if one // FindPmt will take a clip of mpegts and try to find a PMT table - if one
// is found, then it is returned along with its index, otherwise nil, -1 and an error is returned. // is found, then it is returned along with its index, otherwise nil, -1 and an error is returned.
func FindPMT(d []byte) (p []byte, i int, err error) { func FindPmt(d []byte) ([]byte, int, error) {
return FindPid(d, PmtPid)
}
// FindPat will take a clip of mpegts and try to find a PAT table - if one
// is found, then it is returned along with its index, otherwise nil, -1 and an error is returned.
func FindPat(d []byte) ([]byte, int, error) {
return FindPid(d, PatPid)
}
// FindPid will take a clip of mpegts and try to find a packet with given PID - if one
// is found, then it is returned along with its index, otherwise nil, -1 and an error is returned.
func FindPid(d []byte, pid uint16) (pkt []byte, i int, err error) {
if len(d) < PacketSize { if len(d) < PacketSize {
return nil, -1, errors.New("Mmpegts data not of valid length") return nil, -1, errors.New("Mmpegts data not of valid length")
} }
for i = 0; i < len(d); i += PacketSize { for i = 0; i < len(d); i += PacketSize {
pid := (uint16(d[i+1]&0x1f) << 8) | uint16(d[i+2]) p := (uint16(d[i+1]&0x1f) << 8) | uint16(d[i+2])
if pid == pmtPid { if p == pid {
p = d[i+4 : i+PacketSize] pkt = d[i+4 : i+PacketSize]
return return
} }
} }
return nil, -1, errors.New("Could not find pmt table in mpegts data") return nil, -1, fmt.Errorf("could not find packet with pid: %d", pid)
} }
// FillPayload takes a channel and fills the packets Payload field until the // FillPayload takes a channel and fills the packets Payload field until the