revid: fixed conflict

This commit is contained in:
Saxon 2019-03-02 17:46:20 +10:30
commit c07946fa76
5 changed files with 63 additions and 55 deletions

View File

@ -96,9 +96,7 @@ func main() {
cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", "error", err.Error()) cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", "error", err.Error())
} }
time.Sleep(*runDurationPtr) time.Sleep(*runDurationPtr)
if err = rv.Stop(); err != nil { rv.Stop()
cfg.Logger.Log(logger.Error, pkg+"failed to stop revid before program termination", "error", err.Error())
}
return return
} }
@ -120,12 +118,12 @@ func handleFlags() revid.Config {
rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp") rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp")
packetizationPtr = flag.String("Packetization", "", "The method of data packetisation: Flv, Mpegts, None") packetizationPtr = flag.String("Packetization", "", "The method of data packetisation: Flv, Mpegts, None")
quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)") quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)")
verbosityPtr = flag.String("Verbosity", "", "Verbosity: Info, Warning, Error, Fatal") verbosityPtr = flag.String("Verbosity", "Info", "Verbosity: Info, Warning, Error, Fatal")
framesPerClipPtr = flag.Uint("FramesPerClip", 0, "Number of frames per clip sent") framesPerClipPtr = flag.Uint("FramesPerClip", 0, "Number of frames per clip sent")
rtmpUrlPtr = flag.String("RtmpUrl", "", "Url of rtmp endpoint") rtmpUrlPtr = flag.String("RtmpUrl", "", "Url of rtmp endpoint")
bitratePtr = flag.Uint("Bitrate", 0, "Bitrate of recorded video") bitratePtr = flag.Uint("Bitrate", 0, "Bitrate of recorded video")
outputFileNamePtr = flag.String("OutputFileName", "", "The directory of the output file") outputPathPtr = flag.String("OutputPath", "", "The directory of the output file")
inputFileNamePtr = flag.String("InputFileName", "", "The directory of the input file") inputFilePtr = flag.String("InputPath", "", "The directory of the input file")
heightPtr = flag.Uint("Height", 0, "Height in pixels") heightPtr = flag.Uint("Height", 0, "Height in pixels")
widthPtr = flag.Uint("Width", 0, "Width in pixels") widthPtr = flag.Uint("Width", 0, "Width in pixels")
frameRatePtr = flag.Uint("FrameRate", 0, "Frame rate of captured video") frameRatePtr = flag.Uint("FrameRate", 0, "Frame rate of captured video")
@ -145,7 +143,22 @@ func handleFlags() revid.Config {
flag.Parse() flag.Parse()
log = logger.New(defaultLogVerbosity, &smartlogger.New(*logPathPtr).LogRoller) switch *verbosityPtr {
case "Debug":
cfg.LogLevel = logger.Debug
case "Info":
cfg.LogLevel = logger.Info
case "Warning":
cfg.LogLevel = logger.Warning
case "Error":
cfg.LogLevel = logger.Error
case "Fatal":
cfg.LogLevel = logger.Fatal
default:
cfg.LogLevel = defaultLogVerbosity
}
log = logger.New(cfg.LogLevel, &smartlogger.New(*logPathPtr).LogRoller)
cfg.Logger = log cfg.Logger = log
@ -229,17 +242,6 @@ func handleFlags() revid.Config {
log.Log(logger.Error, pkg+"bad packetization argument") log.Log(logger.Error, pkg+"bad packetization argument")
} }
switch *verbosityPtr {
case "No":
cfg.LogLevel = logger.Fatal
case "Debug":
cfg.LogLevel = logger.Debug
//logger.SetLevel(logger.Debug)
case "":
default:
log.Log(logger.Error, pkg+"bad verbosity argument")
}
if *configFilePtr != "" { if *configFilePtr != "" {
netsender.ConfigFile = *configFilePtr netsender.ConfigFile = *configFilePtr
} }
@ -250,8 +252,8 @@ func handleFlags() revid.Config {
cfg.FramesPerClip = *framesPerClipPtr cfg.FramesPerClip = *framesPerClipPtr
cfg.RtmpUrl = *rtmpUrlPtr cfg.RtmpUrl = *rtmpUrlPtr
cfg.Bitrate = *bitratePtr cfg.Bitrate = *bitratePtr
cfg.OutputFileName = *outputFileNamePtr cfg.OutputPath = *outputPathPtr
cfg.InputFileName = *inputFileNamePtr cfg.InputPath = *inputFilePtr
cfg.Height = *heightPtr cfg.Height = *heightPtr
cfg.Width = *widthPtr cfg.Width = *widthPtr
cfg.FrameRate = *frameRatePtr cfg.FrameRate = *frameRatePtr
@ -356,10 +358,7 @@ func run(cfg revid.Config) error {
} }
time.Sleep(time.Duration(rv.Config().BurstPeriod) * time.Second) time.Sleep(time.Duration(rv.Config().BurstPeriod) * time.Second)
log.Log(logger.Info, pkg+"Stopping burst...") log.Log(logger.Info, pkg+"Stopping burst...")
err = rv.Stop() rv.Stop()
if err != nil {
return err
}
ns.SetMode(paused, &vs) ns.SetMode(paused, &vs)
} }
sleep: sleep:

View File

@ -57,8 +57,8 @@ type Config struct {
FramesPerClip uint FramesPerClip uint
RtmpUrl string RtmpUrl string
Bitrate uint Bitrate uint
OutputFileName string OutputPath string
InputFileName string InputPath string
Height uint Height uint
Width uint Width uint
FrameRate uint FrameRate uint

View File

@ -199,13 +199,10 @@ func (r *Revid) handleErrors() {
err := <-r.err err := <-r.err
if err != nil { if err != nil {
r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error())
err = r.Stop() r.Stop()
if err != nil {
r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error())
}
err = r.Start() err = r.Start()
if err != nil { if err != nil {
r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"failed to restart revid", "error", err.Error())
} }
} }
} }
@ -239,7 +236,7 @@ func (r *Revid) reset(config Config) error {
for _, typ := range r.config.Outputs { for _, typ := range r.config.Outputs {
switch typ { switch typ {
case File: case File:
s, err := newFileSender(config.OutputFileName) s, err := newFileSender(config.OutputPath)
if err != nil { if err != nil {
return err return err
} }
@ -346,7 +343,8 @@ func (r *Revid) setIsRunning(b bool) {
// and packetising (if theres packetization) to a defined output. // and packetising (if theres packetization) to a defined output.
func (r *Revid) Start() error { func (r *Revid) Start() error {
if r.IsRunning() { if r.IsRunning() {
return errors.New(pkg + "start called but revid is already running") r.config.Logger.Log(logger.Warning, pkg+"start called, but revid already running")
return nil
} }
r.config.Logger.Log(logger.Info, pkg+"starting Revid") r.config.Logger.Log(logger.Info, pkg+"starting Revid")
// TODO: this doesn't need to be here // TODO: this doesn't need to be here
@ -361,9 +359,10 @@ func (r *Revid) Start() error {
} }
// Stop halts any processing of video data from a camera or file // Stop halts any processing of video data from a camera or file
func (r *Revid) Stop() error { func (r *Revid) Stop() {
if !r.IsRunning() { if !r.IsRunning() {
return errors.New(pkg + "stop called but revid is already stopped") r.config.Logger.Log(logger.Warning, pkg+"stop called but revid isn't running")
return
} }
r.config.Logger.Log(logger.Info, pkg+"stopping revid") r.config.Logger.Log(logger.Info, pkg+"stopping revid")
@ -375,14 +374,11 @@ func (r *Revid) Stop() error {
r.cmd.Process.Kill() r.cmd.Process.Kill()
} }
r.wg.Wait() r.wg.Wait()
return nil
} }
func (r *Revid) Update(vars map[string]string) error { func (r *Revid) Update(vars map[string]string) error {
if r.IsRunning() { if r.IsRunning() {
if err := r.Stop(); err != nil { r.Stop()
return err
}
} }
//look through the vars and update revid where needed //look through the vars and update revid where needed
for key, value := range vars { for key, value := range vars {
@ -433,10 +429,10 @@ func (r *Revid) Update(vars map[string]string) error {
break break
} }
r.config.Bitrate = uint(v) r.config.Bitrate = uint(v)
case "OutputFileName": case "OutputPath":
r.config.OutputFileName = value r.config.OutputPath = value
case "InputFileName": case "InputPath":
r.config.InputFileName = value r.config.InputPath = value
case "Height": case "Height":
h, err := strconv.ParseUint(value, 10, 0) h, err := strconv.ParseUint(value, 10, 0)
if err != nil { if err != nil {
@ -501,7 +497,7 @@ func (r *Revid) Update(vars map[string]string) error {
r.config.BurstPeriod = uint(v) r.config.BurstPeriod = uint(v)
} }
} }
r.config.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprint("%+v", r.config)) r.config.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.config))
return r.reset(r.config) return r.reset(r.config)
} }
@ -656,13 +652,13 @@ func (r *Revid) startV4L() error {
const defaultVideo = "/dev/video0" const defaultVideo = "/dev/video0"
r.config.Logger.Log(logger.Info, pkg+"starting webcam") r.config.Logger.Log(logger.Info, pkg+"starting webcam")
if r.config.InputFileName == "" { if r.config.InputPath == "" {
r.config.Logger.Log(logger.Info, pkg+"using default video device", "device", defaultVideo) r.config.Logger.Log(logger.Info, pkg+"using default video device", "device", defaultVideo)
r.config.InputFileName = defaultVideo r.config.InputPath = defaultVideo
} }
args := []string{ args := []string{
"-i", r.config.InputFileName, "-i", r.config.InputPath,
"-f", "h264", "-f", "h264",
"-r", fmt.Sprint(r.config.FrameRate), "-r", fmt.Sprint(r.config.FrameRate),
} }
@ -703,7 +699,7 @@ func (r *Revid) startV4L() error {
// setupInputForFile sets things up for getting input from a file // setupInputForFile sets things up for getting input from a file
func (r *Revid) setupInputForFile() error { func (r *Revid) setupInputForFile() error {
f, err := os.Open(r.config.InputFileName) f, err := os.Open(r.config.InputPath)
if err != nil { if err != nil {
r.config.Logger.Log(logger.Error, err.Error()) r.config.Logger.Log(logger.Error, err.Error())
r.Stop() r.Stop()

View File

@ -134,10 +134,10 @@ func Dial(url string, timeout uint, log Log) (*Conn, error) {
// Close terminates the RTMP connection. // Close terminates the RTMP connection.
// NB: Close is idempotent and the connection value is cleared completely. // NB: Close is idempotent and the connection value is cleared completely.
func (c *Conn) Close() error { func (c *Conn) Close() error {
c.log(DebugLevel, pkg+"Conn.Close")
if !c.isConnected() { if !c.isConnected() {
return errNotConnected return errNotConnected
} }
c.log(DebugLevel, pkg+"Conn.Close")
if c.streamID > 0 { if c.streamID > 0 {
if c.link.protocol&featureWrite != 0 { if c.link.protocol&featureWrite != 0 {
sendFCUnpublish(c) sendFCUnpublish(c)

View File

@ -30,6 +30,7 @@ package mts
import ( import (
"errors" "errors"
"fmt"
) )
// General mpegts packet properties. // General mpegts packet properties.
@ -157,20 +158,32 @@ type Packet struct {
Payload []byte // Mpeg ts Payload Payload []byte // Mpeg ts Payload
} }
// FindPMT will take a clip of mpegts and try to find a PMT table - if one // FindPmt will take a clip of mpegts and try to find a PMT table - if one
// is found, then it is returned along with its index, otherwise nil, -1 and an error is returned. // is found, then it is returned along with its index, otherwise nil, -1 and an error is returned.
func FindPMT(d []byte) (p []byte, i int, err error) { func FindPmt(d []byte) ([]byte, int, error) {
return FindPid(d, PmtPid)
}
// FindPat will take a clip of mpegts and try to find a PAT table - if one
// is found, then it is returned along with its index, otherwise nil, -1 and an error is returned.
func FindPat(d []byte) ([]byte, int, error) {
return FindPid(d, PatPid)
}
// FindPid will take a clip of mpegts and try to find a packet with given PID - if one
// is found, then it is returned along with its index, otherwise nil, -1 and an error is returned.
func FindPid(d []byte, pid uint16) (pkt []byte, i int, err error) {
if len(d) < PacketSize { if len(d) < PacketSize {
return nil, -1, errors.New("Mmpegts data not of valid length") return nil, -1, errors.New("Mmpegts data not of valid length")
} }
for i = 0; i < len(d); i += PacketSize { for i = 0; i < len(d); i += PacketSize {
pid := (uint16(d[i+1]&0x1f) << 8) | uint16(d[i+2]) p := (uint16(d[i+1]&0x1f) << 8) | uint16(d[i+2])
if pid == pmtPid { if p == pid {
p = d[i+4 : i+PacketSize] pkt = d[i+4 : i+PacketSize]
return return
} }
} }
return nil, -1, errors.New("Could not find pmt table in mpegts data") return nil, -1, fmt.Errorf("could not find packet with pid: %d", pid)
} }
// FillPayload takes a channel and fills the packets Payload field until the // FillPayload takes a channel and fills the packets Payload field until the