Updating branch with master

Merge branch 'master' into improve-ts-encoder-performance
This commit is contained in:
saxon 2019-01-03 12:43:58 +10:30
commit d037b21753
6 changed files with 226 additions and 325 deletions

View File

@ -33,11 +33,13 @@ import (
"os" "os"
"runtime/pprof" "runtime/pprof"
"strconv" "strconv"
"strings"
"time" "time"
"bitbucket.org/ausocean/av/revid" "bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/smartlogger" "bitbucket.org/ausocean/iot/pi/smartlogger"
"bitbucket.org/ausocean/utils/logger"
) )
const ( const (
@ -45,7 +47,7 @@ const (
progName = "revid-cli" progName = "revid-cli"
// Logging is set to INFO level. // Logging is set to INFO level.
defaultLogVerbosity = smartlogger.Debug defaultLogVerbosity = logger.Debug
) )
// Other misc consts // Other misc consts
@ -61,7 +63,7 @@ const (
var canProfile = true var canProfile = true
// The logger that will be used throughout // The logger that will be used throughout
var logger *smartlogger.Logger var log *logger.Logger
func main() { func main() {
useNetsender := flag.Bool("NetSender", false, "Are we checking vars through netsender?") useNetsender := flag.Bool("NetSender", false, "Are we checking vars through netsender?")
@ -73,7 +75,7 @@ func main() {
// run revid for the specified duration // run revid for the specified duration
rv, _, err := startRevid(nil, cfg) rv, _, err := startRevid(nil, cfg)
if err != nil { if err != nil {
cfg.Logger.Log(smartlogger.Fatal, pkg+"failed to start revid", err.Error()) cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", err.Error())
} }
time.Sleep(*runDurationPtr) time.Sleep(*runDurationPtr)
stopRevid(rv) stopRevid(rv)
@ -82,7 +84,7 @@ func main() {
err := run(nil, cfg) err := run(nil, cfg)
if err != nil { if err != nil {
logger.Log(smartlogger.Fatal, pkg+"failed to run revid", "error", err.Error()) log.Log(logger.Fatal, pkg+"failed to run revid", "error", err.Error())
os.Exit(1) os.Exit(1)
} }
} }
@ -101,44 +103,42 @@ func handleFlags() revid.Config {
output2Ptr = flag.String("Output2", "", "The second output type: Http, Rtmp, File, Udp, Rtp") output2Ptr = flag.String("Output2", "", "The second output type: Http, Rtmp, File, Udp, Rtp")
rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp") rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp")
packetizationPtr = flag.String("Packetization", "", "The method of data packetisation: Flv, Mpegts, None") packetizationPtr = flag.String("Packetization", "", "The method of data packetisation: Flv, Mpegts, None")
quantizationModePtr = flag.String("QuantizationMode", "", "Whether quantization if on or off (variable bitrate): On, Off") quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)")
verbosityPtr = flag.String("Verbosity", "", "Verbosity: Info, Warning, Error, Fatal") verbosityPtr = flag.String("Verbosity", "", "Verbosity: Info, Warning, Error, Fatal")
framesPerClipPtr = flag.String("FramesPerClip", "", "Number of frames per clip sent") framesPerClipPtr = flag.Uint("FramesPerClip", 0, "Number of frames per clip sent")
rtmpUrlPtr = flag.String("RtmpUrl", "", "Url of rtmp endpoint") rtmpUrlPtr = flag.String("RtmpUrl", "", "Url of rtmp endpoint")
bitratePtr = flag.String("Bitrate", "", "Bitrate of recorded video") bitratePtr = flag.Uint("Bitrate", 0, "Bitrate of recorded video")
outputFileNamePtr = flag.String("OutputFileName", "", "The directory of the output file") outputFileNamePtr = flag.String("OutputFileName", "", "The directory of the output file")
inputFileNamePtr = flag.String("InputFileName", "", "The directory of the input file") inputFileNamePtr = flag.String("InputFileName", "", "The directory of the input file")
heightPtr = flag.String("Height", "", "Height in pixels") heightPtr = flag.Uint("Height", 0, "Height in pixels")
widthPtr = flag.String("Width", "", "Width in pixels") widthPtr = flag.Uint("Width", 0, "Width in pixels")
frameRatePtr = flag.String("FrameRate", "", "Frame rate of captured video") frameRatePtr = flag.Uint("FrameRate", 0, "Frame rate of captured video")
httpAddressPtr = flag.String("HttpAddress", "", "Destination address of http posts") httpAddressPtr = flag.String("HttpAddress", "", "Destination address of http posts")
quantizationPtr = flag.String("Quantization", "", "Desired quantization value: 0-40") quantizationPtr = flag.Uint("Quantization", 0, "Desired quantization value: 0-40")
timeoutPtr = flag.String("Timeout", "", "Http timeout in seconds") intraRefreshPeriodPtr = flag.Uint("IntraRefreshPeriod", 0, "The IntraRefreshPeriod i.e. how many keyframes we send")
intraRefreshPeriodPtr = flag.String("IntraRefreshPeriod", "", "The IntraRefreshPeriod i.e. how many keyframes we send") verticalFlipPtr = flag.Bool("VerticalFlip", false, "Flip video vertically: Yes, No")
verticalFlipPtr = flag.String("VerticalFlip", "", "Flip video vertically: Yes, No") horizontalFlipPtr = flag.Bool("HorizontalFlip", false, "Flip video horizontally: Yes, No")
horizontalFlipPtr = flag.String("HorizontalFlip", "", "Flip video horizontally: Yes, No")
logPathPtr = flag.String("LogPath", defaultLogPath, "Path for logging files (default is /var/log/netsender/)")
rtpAddrPtr = flag.String("RtpAddr", "", "Rtp destination address: <IP>:<port> (port is generally 6970-6999)") rtpAddrPtr = flag.String("RtpAddr", "", "Rtp destination address: <IP>:<port> (port is generally 6970-6999)")
) )
flag.Parse() flag.Parse()
logger = smartlogger.New(defaultLogVerbosity, *logPathPtr) log = logger.New(defaultLogVerbosity, &smartlogger.New("/var/log/netsender").LogRoller)
cfg.Logger = logger cfg.Logger = log
if *cpuprofile != "" { if *cpuprofile != "" {
if canProfile { if canProfile {
f, err := os.Create(*cpuprofile) f, err := os.Create(*cpuprofile)
if err != nil { if err != nil {
logger.Log(smartlogger.Fatal, pkg+"could not create CPU profile", "error", err.Error()) log.Log(logger.Fatal, pkg+"could not create CPU profile", "error", err.Error())
} }
if err := pprof.StartCPUProfile(f); err != nil { if err := pprof.StartCPUProfile(f); err != nil {
logger.Log(smartlogger.Fatal, pkg+"could not start CPU profile", "error", err.Error()) log.Log(logger.Fatal, pkg+"could not start CPU profile", "error", err.Error())
} }
defer pprof.StopCPUProfile() defer pprof.StopCPUProfile()
} else { } else {
logger.Log(smartlogger.Warning, pkg+"ignoring cpuprofile flag - http/pprof built in.") log.Log(logger.Warning, pkg+"ignoring cpuprofile flag - http/pprof built in.")
} }
} }
@ -149,7 +149,7 @@ func handleFlags() revid.Config {
cfg.Input = revid.File cfg.Input = revid.File
case "": case "":
default: default:
logger.Log(smartlogger.Error, pkg+"bad input argument") log.Log(logger.Error, pkg+"bad input argument")
} }
switch *inputCodecPtr { switch *inputCodecPtr {
@ -157,7 +157,7 @@ func handleFlags() revid.Config {
cfg.InputCodec = revid.H264 cfg.InputCodec = revid.H264
case "": case "":
default: default:
logger.Log(smartlogger.Error, pkg+"bad input codec argument") log.Log(logger.Error, pkg+"bad input codec argument")
} }
switch *output1Ptr { switch *output1Ptr {
@ -175,7 +175,7 @@ func handleFlags() revid.Config {
cfg.Output1 = revid.Rtp cfg.Output1 = revid.Rtp
case "": case "":
default: default:
logger.Log(smartlogger.Error, pkg+"bad output 1 argument") log.Log(logger.Error, pkg+"bad output 1 argument")
} }
switch *output2Ptr { switch *output2Ptr {
@ -193,7 +193,7 @@ func handleFlags() revid.Config {
cfg.Output2 = revid.Rtp cfg.Output2 = revid.Rtp
case "": case "":
default: default:
logger.Log(smartlogger.Error, pkg+"bad output 2 argument") log.Log(logger.Error, pkg+"bad output 2 argument")
} }
switch *rtmpMethodPtr { switch *rtmpMethodPtr {
@ -203,7 +203,7 @@ func handleFlags() revid.Config {
cfg.RtmpMethod = revid.LibRtmp cfg.RtmpMethod = revid.LibRtmp
case "": case "":
default: default:
logger.Log(smartlogger.Error, pkg+"bad rtmp method argument") log.Log(logger.Error, pkg+"bad rtmp method argument")
} }
switch *packetizationPtr { switch *packetizationPtr {
@ -215,56 +215,24 @@ func handleFlags() revid.Config {
cfg.Packetization = revid.Flv cfg.Packetization = revid.Flv
case "": case "":
default: default:
logger.Log(smartlogger.Error, pkg+"bad packetization argument") log.Log(logger.Error, pkg+"bad packetization argument")
}
switch *quantizationModePtr {
case "QuantizationOn":
cfg.QuantizationMode = revid.QuantizationOn
case "QuantizationOff":
cfg.QuantizationMode = revid.QuantizationOff
case "":
default:
logger.Log(smartlogger.Error, pkg+"bad quantization mode argument")
} }
switch *verbosityPtr { switch *verbosityPtr {
case "No": case "No":
cfg.LogLevel = smartlogger.Fatal cfg.LogLevel = logger.Fatal
case "Debug": case "Debug":
cfg.LogLevel = smartlogger.Debug cfg.LogLevel = logger.Debug
//logger.SetLevel(smartlogger.Debug) //logger.SetLevel(logger.Debug)
case "": case "":
default: default:
logger.Log(smartlogger.Error, pkg+"bad verbosity argument") log.Log(logger.Error, pkg+"bad verbosity argument")
} }
switch *horizontalFlipPtr { cfg.Quantize = *quantizePtr
case "No": cfg.FlipHorizontal = *horizontalFlipPtr
cfg.FlipHorizontal = false cfg.FlipVertical = *verticalFlipPtr
case "Yes": cfg.FramesPerClip = *framesPerClipPtr
cfg.FlipHorizontal = true
case "":
cfg.FlipHorizontal = false
default:
logger.Log(smartlogger.Error, pkg+"bad horizontal flip option")
}
switch *verticalFlipPtr {
case "No":
cfg.FlipVertical = false
case "Yes":
cfg.FlipVertical = true
case "":
cfg.FlipVertical = false
default:
logger.Log(smartlogger.Error, pkg+"bad vertical flip option")
}
fpc, err := strconv.Atoi(*framesPerClipPtr)
if err == nil && fpc > 0 {
cfg.FramesPerClip = fpc
}
cfg.RtmpUrl = *rtmpUrlPtr cfg.RtmpUrl = *rtmpUrlPtr
cfg.Bitrate = *bitratePtr cfg.Bitrate = *bitratePtr
cfg.OutputFileName = *outputFileNamePtr cfg.OutputFileName = *outputFileNamePtr
@ -274,7 +242,6 @@ func handleFlags() revid.Config {
cfg.FrameRate = *frameRatePtr cfg.FrameRate = *frameRatePtr
cfg.HttpAddress = *httpAddressPtr cfg.HttpAddress = *httpAddressPtr
cfg.Quantization = *quantizationPtr cfg.Quantization = *quantizationPtr
cfg.Timeout = *timeoutPtr
cfg.IntraRefreshPeriod = *intraRefreshPeriodPtr cfg.IntraRefreshPeriod = *intraRefreshPeriodPtr
cfg.RtpAddress = *rtpAddrPtr cfg.RtpAddress = *rtpAddrPtr
@ -285,10 +252,10 @@ func handleFlags() revid.Config {
func run(rv *revid.Revid, cfg revid.Config) error { func run(rv *revid.Revid, cfg revid.Config) error {
// initialize NetSender and use NetSender's logger // initialize NetSender and use NetSender's logger
//config.Logger = netsender.Logger() //config.Logger = netsender.Logger()
logger.Log(smartlogger.Info, pkg+"running in NetSender mode") log.Log(logger.Info, pkg+"running in NetSender mode")
var ns netsender.Sender var ns netsender.Sender
err := ns.Init(logger, nil, nil, nil) err := ns.Init(log, nil, nil, nil)
if err != nil { if err != nil {
return err return err
} }
@ -307,7 +274,7 @@ func run(rv *revid.Revid, cfg revid.Config) error {
for { for {
if err := send(&ns, rv); err != nil { if err := send(&ns, rv); err != nil {
logger.Log(smartlogger.Error, pkg+"polling failed", "error", err.Error()) log.Log(logger.Error, pkg+"polling failed", "error", err.Error())
time.Sleep(netSendRetryTime) time.Sleep(netSendRetryTime)
continue continue
} }
@ -316,14 +283,14 @@ func run(rv *revid.Revid, cfg revid.Config) error {
// vars changed // vars changed
vars, err := ns.Vars() vars, err := ns.Vars()
if err != nil { if err != nil {
logger.Log(smartlogger.Error, pkg+"netSender failed to get vars", "error", err.Error()) log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error())
time.Sleep(netSendRetryTime) time.Sleep(netSendRetryTime)
continue continue
} }
vs = ns.VarSum() vs = ns.VarSum()
if vars["mode"] == "Paused" { if vars["mode"] == "Paused" {
if !paused { if !paused {
logger.Log(smartlogger.Info, pkg+"pausing revid") log.Log(logger.Info, pkg+"pausing revid")
stopRevid(rv) stopRevid(rv)
paused = true paused = true
} }
@ -402,86 +369,83 @@ func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars m
case "FfmpegRtmp": case "FfmpegRtmp":
cfg.Output1 = revid.FfmpegRtmp cfg.Output1 = revid.FfmpegRtmp
default: default:
logger.Log(smartlogger.Warning, pkg+"invalid Output1 param", "value", value) log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value)
continue continue
} }
case "FramesPerClip": case "FramesPerClip":
fpc, err := strconv.Atoi(value) f, err := strconv.ParseUint(value, 10, 0)
if fpc > 0 && err == nil { if err != nil {
cfg.FramesPerClip = fpc log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value)
} else { break
logger.Log(smartlogger.Warning, pkg+"invalid FramesPerClip param", "value", value)
} }
cfg.FramesPerClip = uint(f)
case "RtmpUrl": case "RtmpUrl":
cfg.RtmpUrl = value cfg.RtmpUrl = value
case "Bitrate": case "Bitrate":
asInt, err := strconv.Atoi(value) r, err := strconv.ParseUint(value, 10, 0)
if asInt > 0 && err == nil { if err != nil {
cfg.Bitrate = value log.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
} else { break
logger.Log(smartlogger.Warning, pkg+"invalid Bitrate param", "value", value)
} }
cfg.Bitrate = uint(r)
case "OutputFileName": case "OutputFileName":
cfg.OutputFileName = value cfg.OutputFileName = value
case "InputFileName": case "InputFileName":
cfg.InputFileName = value cfg.InputFileName = value
case "Height": case "Height":
asInt, err := strconv.Atoi(value) h, err := strconv.ParseUint(value, 10, 0)
if asInt > 0 && err == nil { if err != nil {
cfg.Height = value log.Log(logger.Warning, pkg+"invalid height param", "value", value)
} else { break
logger.Log(smartlogger.Warning, pkg+"invalid Height param", "value", value)
} }
cfg.Height = uint(h)
case "Width": case "Width":
asInt, err := strconv.Atoi(value) w, err := strconv.ParseUint(value, 10, 0)
if asInt > 0 && err == nil { if err != nil {
cfg.Width = value log.Log(logger.Warning, pkg+"invalid width param", "value", value)
} else { break
logger.Log(smartlogger.Warning, pkg+"invalid Width param", "value", value)
} }
cfg.Width = uint(w)
case "FrameRate": case "FrameRate":
asInt, err := strconv.Atoi(value) r, err := strconv.ParseUint(value, 10, 0)
if asInt > 0 && err == nil { if err != nil {
cfg.FrameRate = value log.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
} else { break
logger.Log(smartlogger.Warning, pkg+"invalid FrameRate param", "value", value)
} }
cfg.FrameRate = uint(r)
case "HttpAddress": case "HttpAddress":
cfg.HttpAddress = value cfg.HttpAddress = value
case "Quantization": case "Quantization":
asInt, err := strconv.Atoi(value) q, err := strconv.ParseUint(value, 10, 0)
if asInt > 0 && err == nil { if err != nil {
cfg.Quantization = value log.Log(logger.Warning, pkg+"invalid quantization param", "value", value)
} else { break
logger.Log(smartlogger.Warning, pkg+"invalid Quantization param", "value", value)
}
case "Timeout":
asInt, err := strconv.Atoi(value)
if asInt > 0 && err == nil {
cfg.Timeout = value
} }
cfg.Quantization = uint(q)
case "IntraRefreshPeriod": case "IntraRefreshPeriod":
asInt, err := strconv.Atoi(value) p, err := strconv.ParseUint(value, 10, 0)
if asInt > 0 && err == nil { if err != nil {
cfg.IntraRefreshPeriod = value log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value)
break
} }
cfg.IntraRefreshPeriod = uint(p)
case "HorizontalFlip": case "HorizontalFlip":
switch value { switch strings.ToLower(value) {
case "Yes": case "true":
cfg.FlipHorizontal = true cfg.FlipHorizontal = true
case "No": case "false":
cfg.FlipHorizontal = false cfg.FlipHorizontal = false
default: default:
logger.Log(smartlogger.Warning, pkg+"invalid HorizontalFlip param", "value", value) log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value)
} }
case "VerticalFlip": case "VerticalFlip":
switch value { switch strings.ToLower(value) {
case "Yes": case "true":
cfg.FlipVertical = true cfg.FlipVertical = true
case "No": case "false":
cfg.FlipVertical = false cfg.FlipVertical = false
default: default:
logger.Log(smartlogger.Warning, pkg+"invalid VerticalFlip param", "value", value) log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value)
} }
default: default:
} }

View File

@ -33,7 +33,8 @@ import (
"time" "time"
"bitbucket.org/ausocean/av/revid" "bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/utils/smartlogger" "bitbucket.org/ausocean/iot/pi/smartlogger"
"bitbucket.org/ausocean/utils/logger"
) )
const ( const (
@ -61,11 +62,11 @@ func main() {
RtmpMethod: revid.LibRtmp, RtmpMethod: revid.LibRtmp,
RtmpUrl: *rtmpUrlPtr, RtmpUrl: *rtmpUrlPtr,
Packetization: revid.Flv, Packetization: revid.Flv,
Logger: smartlogger.New(smartlogger.Info, logPath), Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller),
} }
revidInst, err := revid.New(config, nil) revidInst, err := revid.New(config, nil)
if err != nil { if err != nil {
config.Logger.Log(smartlogger.Error, "Should not have got an error!: ", err.Error()) config.Logger.Log(logger.Error, "Should not have got an error!: ", err.Error())
return return
} }
revidInst.Start() revidInst.Start()

View File

@ -31,7 +31,8 @@ import (
"time" "time"
"bitbucket.org/ausocean/av/revid" "bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/utils/smartlogger" "bitbucket.org/ausocean/iot/pi/smartlogger"
"bitbucket.org/ausocean/utils/logger"
) )
const ( const (
@ -52,11 +53,11 @@ func main() {
Output1: revid.File, Output1: revid.File,
OutputFileName: outputFile, OutputFileName: outputFile,
Packetization: revid.Mpegts, Packetization: revid.Mpegts,
Logger: smartlogger.New(smartlogger.Info, logPath), Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller),
} }
revidInst, err := revid.New(config, nil) revidInst, err := revid.New(config, nil)
if err != nil { if err != nil {
config.Logger.Log(smartlogger.Error, "Should not have got an error!:", err.Error()) config.Logger.Log(logger.Error, "Should not have got an error!:", err.Error())
return return
} }
revidInst.Start() revidInst.Start()

View File

@ -29,40 +29,43 @@ package revid
import ( import (
"errors" "errors"
"strconv"
"bitbucket.org/ausocean/utils/smartlogger" "bitbucket.org/ausocean/utils/logger"
) )
// Config provides parameters relevant to a revid instance. A new config must // Config provides parameters relevant to a revid instance. A new config must
// be passed to the constructor. // be passed to the constructor.
type Config struct { type Config struct {
LogLevel int8
Input uint8 Input uint8
InputCodec uint8 InputCodec uint8
Output1 uint8 Output1 uint8
Output2 uint8 Output2 uint8
RtmpMethod uint8 RtmpMethod uint8
Packetization uint8 Packetization uint8
QuantizationMode uint8
LogLevel int8 // Quantize specifies whether the input to
// revid will have constant or variable
// bitrate.
Quantize bool
// FlipHorizonatla and FlipVertical specify // FlipHorizonatla and FlipVertical specify
// whether video frames should be flipped. // whether video frames should be flipped.
FlipHorizontal bool FlipHorizontal bool
FlipVertical bool FlipVertical bool
FramesPerClip int FramesPerClip uint
RtmpUrl string RtmpUrl string
Bitrate string Bitrate uint
OutputFileName string OutputFileName string
InputFileName string InputFileName string
Height string Height uint
Width string Width uint
FrameRate string FrameRate uint
HttpAddress string HttpAddress string
Quantization string Quantization uint
Timeout string IntraRefreshPeriod uint
IntraRefreshPeriod string
RtpAddress string RtpAddress string
Logger Logger Logger Logger
SendRetry bool SendRetry bool
@ -98,20 +101,18 @@ const (
defaultInput = Raspivid defaultInput = Raspivid
defaultOutput = Http defaultOutput = Http
defaultPacketization = Flv defaultPacketization = Flv
defaultFrameRate = "25" defaultFrameRate = 25
defaultWidth = "1280" defaultWidth = 1280
defaultHeight = "720" defaultHeight = 720
defaultIntraRefreshPeriod = "100" defaultIntraRefreshPeriod = 100
defaultTimeout = "0" defaultTimeout = 0
defaultQuantization = "40" defaultQuantization = 40
defaultBitrate = "400000" defaultBitrate = 400000
defaultQuantizationMode = QuantizationOff defaultQuantizationMode = QuantizationOff
defaultFramesPerClip = 1 defaultFramesPerClip = 1
defaultVerticalFlip = No
defaultHorizontalFlip = No
httpFramesPerClip = 560 httpFramesPerClip = 560
defaultInputCodec = H264 defaultInputCodec = H264
defaultVerbosity = No defaultVerbosity = No // FIXME(kortschak): This makes no sense whatsoever. No is currently 15.
defaultRtpAddr = "localhost:6970" defaultRtpAddr = "localhost:6970"
) )
@ -123,28 +124,17 @@ func (c *Config) Validate(r *Revid) error {
case No: case No:
case NothingDefined: case NothingDefined:
c.LogLevel = defaultVerbosity c.LogLevel = defaultVerbosity
c.Logger.Log(smartlogger.Warning, pkg+"no LogLevel mode defined, defaulting", c.Logger.Log(logger.Warning, pkg+"no LogLevel mode defined, defaulting",
"LogLevel", defaultVerbosity) "LogLevel", defaultVerbosity)
default: default:
return errors.New("bad LogLevel defined in config") return errors.New("bad LogLevel defined in config")
} }
switch c.QuantizationMode {
case QuantizationOn:
case QuantizationOff:
case NothingDefined:
c.Logger.Log(smartlogger.Warning, pkg+"no quantization mode defined, defaulting",
"quantizationMode", QuantizationOff)
c.QuantizationMode = QuantizationOff
default:
return errors.New("bad QuantizationMode defined in config")
}
switch c.Input { switch c.Input {
case Raspivid: case Raspivid:
case File: case File:
case NothingDefined: case NothingDefined:
c.Logger.Log(smartlogger.Warning, pkg+"no input type defined, defaulting", "input", c.Logger.Log(logger.Warning, pkg+"no input type defined, defaulting", "input",
defaultInput) defaultInput)
c.Input = defaultInput c.Input = defaultInput
default: default:
@ -153,36 +143,31 @@ func (c *Config) Validate(r *Revid) error {
switch c.InputCodec { switch c.InputCodec {
case H264: case H264:
if c.Bitrate != "" && c.Quantization != "" { // FIXME(kortschak): This is not really what we want.
bitrate, err := strconv.Atoi(c.Bitrate) // Configuration really needs to be rethought here.
if err != nil { if c.Quantize && c.Quantization == 0 {
return errors.New("bitrate not an integer") c.Quantization = defaultQuantization
} else {
c.Bitrate = defaultBitrate
} }
quantization, err := strconv.Atoi(c.Quantization)
if err != nil { if (c.Bitrate > 0 && c.Quantization > 0) || (c.Bitrate == 0 && c.Quantization == 0) {
return errors.New("quantization not an integer")
}
if (bitrate > 0 && quantization > 0) || (bitrate == 0 && quantization == 0) {
return errors.New("bad bitrate and quantization combination for H264 input") return errors.New("bad bitrate and quantization combination for H264 input")
} }
}
case Mjpeg: case Mjpeg:
if c.Quantization != "" { if c.Quantization > 0 || c.Bitrate == 0 {
quantization, err := strconv.Atoi(c.Quantization)
if err != nil {
return errors.New("quantization not an integer")
}
if quantization > 0 || c.Bitrate == "" {
return errors.New("bad bitrate or quantization for mjpeg input") return errors.New("bad bitrate or quantization for mjpeg input")
} }
}
case NothingDefined: case NothingDefined:
c.Logger.Log(smartlogger.Warning, pkg+"no input codec defined, defaulting", c.Logger.Log(logger.Warning, pkg+"no input codec defined, defaulting",
"inputCodec", defaultInputCodec) "inputCodec", defaultInputCodec)
c.InputCodec = defaultInputCodec c.InputCodec = defaultInputCodec
c.Logger.Log(smartlogger.Warning, pkg+"defaulting quantization", "quantization", c.Logger.Log(logger.Warning, pkg+"defaulting quantization", "quantization",
defaultQuantization) defaultQuantization)
c.Quantization = defaultQuantization c.Quantization = defaultQuantization
default: default:
return errors.New("bad input codec defined in config") return errors.New("bad input codec defined in config")
} }
@ -192,20 +177,20 @@ func (c *Config) Validate(r *Revid) error {
case Udp: case Udp:
case Rtmp, FfmpegRtmp: case Rtmp, FfmpegRtmp:
if c.RtmpUrl == "" { if c.RtmpUrl == "" {
c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP") c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP")
c.Output1 = Http c.Output1 = Http
break break
} }
c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for rtmp out", c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out",
"framesPerClip", defaultFramesPerClip) "framesPerClip", defaultFramesPerClip)
c.FramesPerClip = defaultFramesPerClip c.FramesPerClip = defaultFramesPerClip
case NothingDefined: case NothingDefined:
c.Logger.Log(smartlogger.Warning, pkg+"no output defined, defaulting", "output", c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output",
defaultOutput) defaultOutput)
c.Output1 = defaultOutput c.Output1 = defaultOutput
fallthrough fallthrough
case Http, Rtp: case Http, Rtp:
c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for http out", c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out",
"framesPerClip", httpFramesPerClip) "framesPerClip", httpFramesPerClip)
c.FramesPerClip = httpFramesPerClip c.FramesPerClip = httpFramesPerClip
default: default:
@ -218,7 +203,7 @@ func (c *Config) Validate(r *Revid) error {
case Udp: case Udp:
case Rtmp, FfmpegRtmp: case Rtmp, FfmpegRtmp:
if c.RtmpUrl == "" { if c.RtmpUrl == "" {
c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP") c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP")
c.Output2 = Http c.Output2 = Http
break break
} }
@ -229,78 +214,41 @@ func (c *Config) Validate(r *Revid) error {
} }
if c.FramesPerClip < 1 { if c.FramesPerClip < 1 {
c.Logger.Log(smartlogger.Warning, pkg+"no FramesPerClip defined, defaulting", c.Logger.Log(logger.Warning, pkg+"no FramesPerClip defined, defaulting",
"framesPerClip", defaultFramesPerClip) "framesPerClip", defaultFramesPerClip)
c.FramesPerClip = defaultFramesPerClip c.FramesPerClip = defaultFramesPerClip
} }
if c.Width == "" { if c.Width == 0 {
c.Logger.Log(smartlogger.Warning, pkg+"no width defined, defaulting", "width", c.Logger.Log(logger.Warning, pkg+"no width defined, defaulting", "width", defaultWidth)
defaultWidth)
c.Width = defaultWidth c.Width = defaultWidth
} else {
if integer, err := strconv.Atoi(c.Width); integer < 0 || err != nil {
return errors.New("width not unsigned integer")
}
} }
if c.Height == "" { if c.Height == 0 {
c.Logger.Log(smartlogger.Warning, pkg+"no height defined, defaulting", "height", c.Logger.Log(logger.Warning, pkg+"no height defined, defaulting", "height", defaultHeight)
defaultHeight)
c.Height = defaultHeight c.Height = defaultHeight
} else {
if integer, err := strconv.Atoi(c.Height); integer < 0 || err != nil {
return errors.New("height not unsigned integer")
}
} }
if c.FrameRate == "" { if c.FrameRate == 0 {
c.Logger.Log(smartlogger.Warning, pkg+"no frame rate defined, defaulting", "fps", c.Logger.Log(logger.Warning, pkg+"no frame rate defined, defaulting", "fps", defaultFrameRate)
defaultFrameRate)
c.FrameRate = defaultFrameRate c.FrameRate = defaultFrameRate
} else {
if integer, err := strconv.Atoi(c.FrameRate); integer < 0 || err != nil {
return errors.New("frame rate not unsigned integer")
}
} }
if c.Bitrate == "" { if c.Bitrate == 0 {
c.Logger.Log(smartlogger.Warning, pkg+"no bitrate defined, defaulting", "bitrate", c.Logger.Log(logger.Warning, pkg+"no bitrate defined, defaulting", "bitrate", defaultBitrate)
defaultBitrate)
c.Bitrate = defaultBitrate c.Bitrate = defaultBitrate
} else {
if integer, err := strconv.Atoi(c.Bitrate); integer < 0 || err != nil {
return errors.New("bitrate not unsigned integer")
}
} }
if c.Timeout == "" { if c.IntraRefreshPeriod == 0 {
c.Logger.Log(smartlogger.Warning, pkg+"no timeout defined, defaulting", "timeout", defaultTimeout) c.Logger.Log(logger.Warning, pkg+"no intra refresh defined, defaulting", "intraRefresh", defaultIntraRefreshPeriod)
c.Timeout = defaultTimeout
} else {
if integer, err := strconv.Atoi(c.Timeout); integer < 0 || err != nil {
return errors.New("timeout not unsigned integer")
}
}
if c.IntraRefreshPeriod == "" {
c.Logger.Log(smartlogger.Warning, pkg+"no intra refresh defined, defaulting", "intraRefresh",
defaultIntraRefreshPeriod)
c.IntraRefreshPeriod = defaultIntraRefreshPeriod c.IntraRefreshPeriod = defaultIntraRefreshPeriod
} else {
if integer, err := strconv.Atoi(c.IntraRefreshPeriod); integer < 0 || err != nil {
return errors.New("intra refresh not unsigned integer")
}
} }
if c.Quantization == "" { if c.Quantization == 0 {
c.Logger.Log(smartlogger.Warning, pkg+"no quantization defined, defaulting", "quantization", c.Logger.Log(logger.Warning, pkg+"no quantization defined, defaulting", "quantization", defaultQuantization)
defaultQuantization)
c.Quantization = defaultQuantization c.Quantization = defaultQuantization
} else { } else if c.Quantization > 51 {
if integer, err := strconv.Atoi(c.Quantization); integer < 0 || integer > 51 || err != nil { return errors.New("quantisation is over threshold")
return errors.New("quantisation not unsigned integer or is over threshold")
}
} }
if c.RtpAddress == "" { if c.RtpAddress == "" {

View File

@ -44,8 +44,8 @@ import (
"bitbucket.org/ausocean/av/stream/lex" "bitbucket.org/ausocean/av/stream/lex"
"bitbucket.org/ausocean/av/stream/mts" "bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/utils/smartlogger"
) )
// Misc constants // Misc constants
@ -133,7 +133,7 @@ var prevTime = now
type packer struct { type packer struct {
owner *Revid owner *Revid
packetCount int packetCount uint
} }
// Write implements the io.Writer interface. // Write implements the io.Writer interface.
@ -143,21 +143,21 @@ type packer struct {
// write may include a dropped frame. // write may include a dropped frame.
func (p *packer) Write(frame []byte) (int, error) { func (p *packer) Write(frame []byte) (int, error) {
if len(frame) > ringBufferElementSize { if len(frame) > ringBufferElementSize {
p.owner.config.Logger.Log(smartlogger.Warning, pkg+"frame was too big", "frame size", len(frame)) p.owner.config.Logger.Log(logger.Warning, pkg+"frame was too big", "frame size", len(frame))
return len(frame), nil return len(frame), nil
} }
n, err := p.owner.buffer.Write(frame) n, err := p.owner.buffer.Write(frame)
if err != nil { if err != nil {
if err == ring.ErrDropped { if err == ring.ErrDropped {
p.owner.config.Logger.Log(smartlogger.Warning, pkg+"dropped frame", "frame size", len(frame)) p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame))
return len(frame), nil return len(frame), nil
} }
p.owner.config.Logger.Log(smartlogger.Error, pkg+"unexpected ring buffer write error", "error", err.Error()) p.owner.config.Logger.Log(logger.Error, pkg+"unexpected ring buffer write error", "error", err.Error())
return n, err return n, err
} }
p.packetCount++ p.packetCount++
now = time.Now() now = time.Now()
if now.Sub(prevTime) > clipDuration && p.packetCount%7 == 0 { if (p.owner.config.Output1 != Rtmp && now.Sub(prevTime) > clipDuration && p.packetCount%7 == 0) || p.owner.config.Output1 == Rtmp {
p.owner.buffer.Flush() p.owner.buffer.Flush()
p.packetCount = 0 p.packetCount = 0
prevTime = now prevTime = now
@ -227,7 +227,7 @@ func (r *Revid) reset(config Config) error {
} }
r.destination[outNo] = s r.destination[outNo] = s
case FfmpegRtmp: case FfmpegRtmp:
s, err := newFfmpegSender(config.RtmpUrl, r.config.FrameRate) s, err := newFfmpegSender(config.RtmpUrl, fmt.Sprint(r.config.FrameRate))
if err != nil { if err != nil {
return err return err
} }
@ -247,10 +247,7 @@ func (r *Revid) reset(config Config) error {
} }
r.destination[outNo] = s r.destination[outNo] = s
case Rtp: case Rtp:
// TODO: framerate in config should probably be an int, make conversions early s, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
// when setting config fields in revid-cli
fps, _ := strconv.Atoi(r.config.FrameRate)
s, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, fps)
if err != nil { if err != nil {
return err return err
} }
@ -266,10 +263,10 @@ func (r *Revid) reset(config Config) error {
} }
switch r.config.InputCodec { switch r.config.InputCodec {
case H264: case H264:
r.config.Logger.Log(smartlogger.Info, pkg+"using H264 lexer") r.config.Logger.Log(logger.Info, pkg+"using H264 lexer")
r.lexTo = lex.H264 r.lexTo = lex.H264
case Mjpeg: case Mjpeg:
r.config.Logger.Log(smartlogger.Info, pkg+"using MJPEG lexer") r.config.Logger.Log(logger.Info, pkg+"using MJPEG lexer")
r.lexTo = lex.MJPEG r.lexTo = lex.MJPEG
} }
@ -291,13 +288,11 @@ func (r *Revid) reset(config Config) error {
} }
r.encoder = stream.NopEncoder(&r.packer) r.encoder = stream.NopEncoder(&r.packer)
case Mpegts: case Mpegts:
r.config.Logger.Log(smartlogger.Info, pkg+"using MPEGTS packetisation") r.config.Logger.Log(logger.Info, pkg+"using MPEGTS packetisation")
frameRate, _ := strconv.ParseFloat(r.config.FrameRate, 64) r.encoder = mts.NewEncoder(&r.packer, float64(r.config.FrameRate))
r.encoder = mts.NewEncoder(&r.packer, frameRate)
case Flv: case Flv:
r.config.Logger.Log(smartlogger.Info, pkg+"using FLV packetisation") r.config.Logger.Log(logger.Info, pkg+"using FLV packetisation")
frameRate, _ := strconv.Atoi(r.config.FrameRate) r.encoder, err = flv.NewEncoder(&r.packer, true, true, int(r.config.FrameRate))
r.encoder, err = flv.NewEncoder(&r.packer, true, true, frameRate)
if err != nil { if err != nil {
return err return err
} }
@ -315,29 +310,29 @@ func (r *Revid) IsRunning() bool {
// and packetising (if theres packetization) to a defined output. // and packetising (if theres packetization) to a defined output.
func (r *Revid) Start() { func (r *Revid) Start() {
if r.isRunning { if r.isRunning {
r.config.Logger.Log(smartlogger.Warning, pkg+"revid.Start() called but revid already running") r.config.Logger.Log(logger.Warning, pkg+"revid.Start() called but revid already running")
return return
} }
r.config.Logger.Log(smartlogger.Info, pkg+"starting Revid") r.config.Logger.Log(logger.Info, pkg+"starting Revid")
r.config.Logger.Log(smartlogger.Debug, pkg+"setting up output") r.config.Logger.Log(logger.Debug, pkg+"setting up output")
r.isRunning = true r.isRunning = true
r.config.Logger.Log(smartlogger.Info, pkg+"starting output routine") r.config.Logger.Log(logger.Info, pkg+"starting output routine")
go r.outputClips() go r.outputClips()
r.config.Logger.Log(smartlogger.Info, pkg+"setting up input and receiving content") r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content")
go r.setupInput() go r.setupInput()
} }
// Stop halts any processing of video data from a camera or file // Stop halts any processing of video data from a camera or file
func (r *Revid) Stop() { func (r *Revid) Stop() {
if !r.isRunning { if !r.isRunning {
r.config.Logger.Log(smartlogger.Warning, pkg+"revid.Stop() called but revid not running") r.config.Logger.Log(logger.Warning, pkg+"revid.Stop() called but revid not running")
return return
} }
r.config.Logger.Log(smartlogger.Info, pkg+"stopping revid") r.config.Logger.Log(logger.Info, pkg+"stopping revid")
r.isRunning = false r.isRunning = false
r.config.Logger.Log(smartlogger.Info, pkg+"killing input proccess") r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
// If a cmd process is running, we kill! // If a cmd process is running, we kill!
if r.cmd != nil && r.cmd.Process != nil { if r.cmd != nil && r.cmd.Process != nil {
r.cmd.Process.Kill() r.cmd.Process.Kill()
@ -357,53 +352,53 @@ loop:
case nil: case nil:
// Do nothing. // Do nothing.
case ring.ErrTimeout: case ring.ErrTimeout:
r.config.Logger.Log(smartlogger.Warning, pkg+"ring buffer read timeout") r.config.Logger.Log(logger.Warning, pkg+"ring buffer read timeout")
continue continue
default: default:
r.config.Logger.Log(smartlogger.Error, pkg+"unexpected error", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"unexpected error", "error", err.Error())
fallthrough fallthrough
case io.EOF: case io.EOF:
break loop break loop
} }
count += chunk.Len() count += chunk.Len()
r.config.Logger.Log(smartlogger.Debug, pkg+"about to send") r.config.Logger.Log(logger.Debug, pkg+"about to send")
for i, dest := range r.destination { for i, dest := range r.destination {
err = dest.load(chunk) err = dest.load(chunk)
if err != nil { if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"failed to load clip to output"+strconv.Itoa(i)) r.config.Logger.Log(logger.Error, pkg+"failed to load clip to output"+strconv.Itoa(i))
} }
} }
for i, dest := range r.destination { for i, dest := range r.destination {
err = dest.send() err = dest.send()
if err == nil { if err == nil {
r.config.Logger.Log(smartlogger.Debug, pkg+"sent clip to output "+strconv.Itoa(i)) r.config.Logger.Log(logger.Debug, pkg+"sent clip to output "+strconv.Itoa(i))
} else if r.config.SendRetry == false { } else if r.config.SendRetry == false {
r.config.Logger.Log(smartlogger.Warning, pkg+"send to output "+strconv.Itoa(i)+"failed", "error", err.Error()) r.config.Logger.Log(logger.Warning, pkg+"send to output "+strconv.Itoa(i)+"failed", "error", err.Error())
} else { } else {
r.config.Logger.Log(smartlogger.Error, pkg+"send to output "+strconv.Itoa(i)+ r.config.Logger.Log(logger.Error, pkg+"send to output "+strconv.Itoa(i)+
"failed, trying again", "error", err.Error()) "failed, trying again", "error", err.Error())
err = dest.send() err = dest.send()
if err != nil && chunk.Len() > 11 { if err != nil && chunk.Len() > 11 {
r.config.Logger.Log(smartlogger.Error, pkg+"second send attempted failed, restarting connection", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"second send attempted failed, restarting connection", "error", err.Error())
for err != nil { for err != nil {
time.Sleep(sendFailedDelay) time.Sleep(sendFailedDelay)
if rs, ok := dest.(restarter); ok { if rs, ok := dest.(restarter); ok {
r.config.Logger.Log(smartlogger.Debug, pkg+"restarting session", "session", rs) r.config.Logger.Log(logger.Debug, pkg+"restarting session", "session", rs)
err = rs.restart() err = rs.restart()
if err != nil { if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"failed to restart rtmp session", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error())
r.isRunning = false r.isRunning = false
return return
} }
r.config.Logger.Log(smartlogger.Info, pkg+"restarted rtmp session") r.config.Logger.Log(logger.Info, pkg+"restarted rtmp session")
} }
err = dest.send() err = dest.send()
if err != nil { if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"send failed again, with error", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"send failed again, with error", "error", err.Error())
} }
} }
} }
@ -414,7 +409,7 @@ loop:
for _, dest := range r.destination { for _, dest := range r.destination {
dest.release() dest.release()
} }
r.config.Logger.Log(smartlogger.Debug, pkg+"done reading that clip from ring buffer") r.config.Logger.Log(logger.Debug, pkg+"done reading that clip from ring buffer")
// Log some information regarding bitrate and ring buffer size if it's time // Log some information regarding bitrate and ring buffer size if it's time
now := time.Now() now := time.Now()
@ -422,17 +417,17 @@ loop:
if deltaTime > bitrateTime { if deltaTime > bitrateTime {
// FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate. // FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate.
r.bitrate = int(float64(count*8) / float64(deltaTime/time.Second)) r.bitrate = int(float64(count*8) / float64(deltaTime/time.Second))
r.config.Logger.Log(smartlogger.Debug, pkg+"bitrate (bits/s)", "bitrate", r.bitrate) r.config.Logger.Log(logger.Debug, pkg+"bitrate (bits/s)", "bitrate", r.bitrate)
r.config.Logger.Log(smartlogger.Debug, pkg+"ring buffer size", "value", r.buffer.Len()) r.config.Logger.Log(logger.Debug, pkg+"ring buffer size", "value", r.buffer.Len())
lastTime = now lastTime = now
count = 0 count = 0
} }
} }
r.config.Logger.Log(smartlogger.Info, pkg+"not outputting clips anymore") r.config.Logger.Log(logger.Info, pkg+"not outputting clips anymore")
for i, dest := range r.destination { for i, dest := range r.destination {
err := dest.close() err := dest.close()
if err != nil { if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"failed to close output"+strconv.Itoa(i)+" destination", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"failed to close output"+strconv.Itoa(i)+" destination", "error", err.Error())
} }
} }
} }
@ -440,17 +435,17 @@ loop:
// startRaspivid sets up things for input from raspivid i.e. starts // startRaspivid sets up things for input from raspivid i.e. starts
// a raspivid process and pipes it's data output. // a raspivid process and pipes it's data output.
func (r *Revid) startRaspivid() error { func (r *Revid) startRaspivid() error {
r.config.Logger.Log(smartlogger.Info, pkg+"starting raspivid") r.config.Logger.Log(logger.Info, pkg+"starting raspivid")
const disabled = "0" const disabled = "0"
args := []string{ args := []string{
"--output", "-", "--output", "-",
"--nopreview", "--nopreview",
"--timeout", disabled, "--timeout", disabled,
"--width", r.config.Width, "--width", fmt.Sprint(r.config.Width),
"--height", r.config.Height, "--height", fmt.Sprint(r.config.Height),
"--bitrate", r.config.Bitrate, "--bitrate", fmt.Sprint(r.config.Bitrate),
"--framerate", r.config.FrameRate, "--framerate", fmt.Sprint(r.config.FrameRate),
} }
if r.config.FlipHorizontal { if r.config.FlipHorizontal {
args = append(args, "--hflip") args = append(args, "--hflip")
@ -465,48 +460,40 @@ func (r *Revid) startRaspivid() error {
args = append(args, args = append(args,
"--codec", "H264", "--codec", "H264",
"--inline", "--inline",
"--intra", r.config.IntraRefreshPeriod, "--intra", fmt.Sprint(r.config.IntraRefreshPeriod),
) )
if r.config.QuantizationMode == QuantizationOn { if r.config.Quantize {
args = append(args, "-qp", r.config.Quantization) args = append(args, "-qp", fmt.Sprint(r.config.Quantization))
} }
case Mjpeg: case Mjpeg:
args = append(args, "--codec", "MJPEG") args = append(args, "--codec", "MJPEG")
} }
r.config.Logger.Log(smartlogger.Info, pkg+"raspivid args", "raspividArgs", strings.Join(args, " ")) r.config.Logger.Log(logger.Info, pkg+"raspivid args", "raspividArgs", strings.Join(args, " "))
r.cmd = exec.Command("raspivid", args...) r.cmd = exec.Command("raspivid", args...)
d, err := strconv.Atoi(r.config.FrameRate)
if err != nil {
return err
}
delay := time.Second / time.Duration(d)
stdout, err := r.cmd.StdoutPipe() stdout, err := r.cmd.StdoutPipe()
if err != nil { if err != nil {
return err return err
} }
err = r.cmd.Start() err = r.cmd.Start()
if err != nil { if err != nil {
return err r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error())
} }
r.config.Logger.Log(smartlogger.Info, pkg+"reading camera data") r.config.Logger.Log(logger.Info, pkg+"reading camera data")
delay := time.Second / time.Duration(r.config.FrameRate)
err = r.lexTo(r.encoder, stdout, delay) err = r.lexTo(r.encoder, stdout, delay)
r.config.Logger.Log(smartlogger.Info, pkg+"finished reading camera data") r.config.Logger.Log(logger.Info, pkg+"finished reading camera data")
return err return err
} }
// setupInputForFile sets things up for getting input from a file // setupInputForFile sets things up for getting input from a file
func (r *Revid) setupInputForFile() error { func (r *Revid) setupInputForFile() error {
fps, err := strconv.Atoi(r.config.FrameRate) delay := time.Second / time.Duration(r.config.FrameRate)
if err != nil {
return err
}
delay := time.Second / time.Duration(fps)
f, err := os.Open(r.config.InputFileName) f, err := os.Open(r.config.InputFileName)
if err != nil { if err != nil {
r.config.Logger.Log(smartlogger.Error, err.Error()) r.config.Logger.Log(logger.Error, err.Error())
r.Stop() r.Stop()
return err return err
} }

View File

@ -39,8 +39,8 @@ import (
"bitbucket.org/ausocean/av/stream/mts" "bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/av/stream/rtp" "bitbucket.org/ausocean/av/stream/rtp"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/utils/smartlogger"
) )
// loadSender is a destination to send a *ring.Chunk to. // loadSender is a destination to send a *ring.Chunk to.
@ -166,18 +166,18 @@ func (s *httpSender) extractMeta(r string) error {
// Extract time from reply // Extract time from reply
t, err := dec.Int("ts") t, err := dec.Int("ts")
if err != nil { if err != nil {
s.log(smartlogger.Warning, pkg+"No timestamp in reply") s.log(logger.Warning, pkg+"No timestamp in reply")
} else { } else {
s.log(smartlogger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) s.log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t))
mts.SetTimeStamp(uint64(t)) mts.SetTimeStamp(uint64(t))
} }
// Extract location from reply // Extract location from reply
g, err := dec.String("ll") g, err := dec.String("ll")
if err != nil { if err != nil {
s.log(smartlogger.Warning, pkg+"No location in reply") s.log(logger.Warning, pkg+"No location in reply")
} else { } else {
s.log(smartlogger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) s.log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g))
mts.SetLocation(g) mts.SetLocation(g)
} }
@ -269,10 +269,10 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg
if err == nil { if err == nil {
break break
} }
log(smartlogger.Error, err.Error()) log(logger.Error, err.Error())
sess.Close() sess.Close()
if n < retries-1 { if n < retries-1 {
log(smartlogger.Info, pkg+"retry rtmp connection") log(logger.Info, pkg+"retry rtmp connection")
} }
} }
if err != nil { if err != nil {
@ -315,10 +315,10 @@ func (s *rtmpSender) restart() error {
if err == nil { if err == nil {
break break
} }
s.log(smartlogger.Error, err.Error()) s.log(logger.Error, err.Error())
s.sess.Close() s.sess.Close()
if n < s.retries-1 { if n < s.retries-1 {
s.log(smartlogger.Info, pkg+"retry rtmp connection") s.log(logger.Info, pkg+"retry rtmp connection")
} }
} }
return err return err
@ -371,14 +371,14 @@ type rtpSender struct {
encoder *rtp.Encoder encoder *rtp.Encoder
} }
func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps int) (*rtpSender, error) { func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) {
conn, err := net.Dial("udp", addr) conn, err := net.Dial("udp", addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
s := &rtpSender{ s := &rtpSender{
log: log, log: log,
encoder: rtp.NewEncoder(conn, fps), encoder: rtp.NewEncoder(conn, int(fps)),
} }
return s, nil return s, nil
} }