Updating branch with master

Merge branch 'master' into rtp-ringbuff-bypass
This commit is contained in:
saxon 2019-01-03 11:57:48 +10:30
commit 12c8c604e0
6 changed files with 109 additions and 107 deletions

View File

@ -38,7 +38,8 @@ import (
"bitbucket.org/ausocean/av/revid" "bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/smartlogger" "bitbucket.org/ausocean/iot/pi/smartlogger"
"bitbucket.org/ausocean/utils/logger"
) )
const ( const (
@ -46,7 +47,7 @@ const (
progName = "revid-cli" progName = "revid-cli"
// Logging is set to INFO level. // Logging is set to INFO level.
defaultLogVerbosity = smartlogger.Debug defaultLogVerbosity = logger.Debug
) )
// Other misc consts // Other misc consts
@ -62,7 +63,7 @@ const (
var canProfile = true var canProfile = true
// The logger that will be used throughout // The logger that will be used throughout
var logger *smartlogger.Logger var log *logger.Logger
func main() { func main() {
useNetsender := flag.Bool("NetSender", false, "Are we checking vars through netsender?") useNetsender := flag.Bool("NetSender", false, "Are we checking vars through netsender?")
@ -74,7 +75,7 @@ func main() {
// run revid for the specified duration // run revid for the specified duration
rv, _, err := startRevid(nil, cfg) rv, _, err := startRevid(nil, cfg)
if err != nil { if err != nil {
cfg.Logger.Log(smartlogger.Fatal, pkg+"failed to start revid", err.Error()) cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", err.Error())
} }
time.Sleep(*runDurationPtr) time.Sleep(*runDurationPtr)
stopRevid(rv) stopRevid(rv)
@ -83,7 +84,7 @@ func main() {
err := run(nil, cfg) err := run(nil, cfg)
if err != nil { if err != nil {
logger.Log(smartlogger.Fatal, pkg+"failed to run revid", "error", err.Error()) log.Log(logger.Fatal, pkg+"failed to run revid", "error", err.Error())
os.Exit(1) os.Exit(1)
} }
} }
@ -117,28 +118,27 @@ func handleFlags() revid.Config {
intraRefreshPeriodPtr = flag.Uint("IntraRefreshPeriod", 0, "The IntraRefreshPeriod i.e. how many keyframes we send") intraRefreshPeriodPtr = flag.Uint("IntraRefreshPeriod", 0, "The IntraRefreshPeriod i.e. how many keyframes we send")
verticalFlipPtr = flag.Bool("VerticalFlip", false, "Flip video vertically: Yes, No") verticalFlipPtr = flag.Bool("VerticalFlip", false, "Flip video vertically: Yes, No")
horizontalFlipPtr = flag.Bool("HorizontalFlip", false, "Flip video horizontally: Yes, No") horizontalFlipPtr = flag.Bool("HorizontalFlip", false, "Flip video horizontally: Yes, No")
logPathPtr = flag.String("LogPath", defaultLogPath, "Path for logging files (default is /var/log/netsender/)")
rtpAddrPtr = flag.String("RtpAddr", "", "Rtp destination address: <IP>:<port> (port is generally 6970-6999)") rtpAddrPtr = flag.String("RtpAddr", "", "Rtp destination address: <IP>:<port> (port is generally 6970-6999)")
) )
flag.Parse() flag.Parse()
logger = smartlogger.New(defaultLogVerbosity, *logPathPtr) log = logger.New(defaultLogVerbosity, &smartlogger.New("/var/log/netsender").LogRoller)
cfg.Logger = logger cfg.Logger = log
if *cpuprofile != "" { if *cpuprofile != "" {
if canProfile { if canProfile {
f, err := os.Create(*cpuprofile) f, err := os.Create(*cpuprofile)
if err != nil { if err != nil {
logger.Log(smartlogger.Fatal, pkg+"could not create CPU profile", "error", err.Error()) log.Log(logger.Fatal, pkg+"could not create CPU profile", "error", err.Error())
} }
if err := pprof.StartCPUProfile(f); err != nil { if err := pprof.StartCPUProfile(f); err != nil {
logger.Log(smartlogger.Fatal, pkg+"could not start CPU profile", "error", err.Error()) log.Log(logger.Fatal, pkg+"could not start CPU profile", "error", err.Error())
} }
defer pprof.StopCPUProfile() defer pprof.StopCPUProfile()
} else { } else {
logger.Log(smartlogger.Warning, pkg+"ignoring cpuprofile flag - http/pprof built in.") log.Log(logger.Warning, pkg+"ignoring cpuprofile flag - http/pprof built in.")
} }
} }
@ -149,7 +149,7 @@ func handleFlags() revid.Config {
cfg.Input = revid.File cfg.Input = revid.File
case "": case "":
default: default:
logger.Log(smartlogger.Error, pkg+"bad input argument") log.Log(logger.Error, pkg+"bad input argument")
} }
switch *inputCodecPtr { switch *inputCodecPtr {
@ -157,7 +157,7 @@ func handleFlags() revid.Config {
cfg.InputCodec = revid.H264 cfg.InputCodec = revid.H264
case "": case "":
default: default:
logger.Log(smartlogger.Error, pkg+"bad input codec argument") log.Log(logger.Error, pkg+"bad input codec argument")
} }
switch *output1Ptr { switch *output1Ptr {
@ -175,7 +175,7 @@ func handleFlags() revid.Config {
cfg.Output1 = revid.Rtp cfg.Output1 = revid.Rtp
case "": case "":
default: default:
logger.Log(smartlogger.Error, pkg+"bad output 1 argument") log.Log(logger.Error, pkg+"bad output 1 argument")
} }
switch *output2Ptr { switch *output2Ptr {
@ -193,7 +193,7 @@ func handleFlags() revid.Config {
cfg.Output2 = revid.Rtp cfg.Output2 = revid.Rtp
case "": case "":
default: default:
logger.Log(smartlogger.Error, pkg+"bad output 2 argument") log.Log(logger.Error, pkg+"bad output 2 argument")
} }
switch *rtmpMethodPtr { switch *rtmpMethodPtr {
@ -203,7 +203,7 @@ func handleFlags() revid.Config {
cfg.RtmpMethod = revid.LibRtmp cfg.RtmpMethod = revid.LibRtmp
case "": case "":
default: default:
logger.Log(smartlogger.Error, pkg+"bad rtmp method argument") log.Log(logger.Error, pkg+"bad rtmp method argument")
} }
switch *packetizationPtr { switch *packetizationPtr {
@ -215,18 +215,18 @@ func handleFlags() revid.Config {
cfg.Packetization = revid.Flv cfg.Packetization = revid.Flv
case "": case "":
default: default:
logger.Log(smartlogger.Error, pkg+"bad packetization argument") log.Log(logger.Error, pkg+"bad packetization argument")
} }
switch *verbosityPtr { switch *verbosityPtr {
case "No": case "No":
cfg.LogLevel = smartlogger.Fatal cfg.LogLevel = logger.Fatal
case "Debug": case "Debug":
cfg.LogLevel = smartlogger.Debug cfg.LogLevel = logger.Debug
//logger.SetLevel(smartlogger.Debug) //logger.SetLevel(logger.Debug)
case "": case "":
default: default:
logger.Log(smartlogger.Error, pkg+"bad verbosity argument") log.Log(logger.Error, pkg+"bad verbosity argument")
} }
cfg.Quantize = *quantizePtr cfg.Quantize = *quantizePtr
@ -252,10 +252,10 @@ func handleFlags() revid.Config {
func run(rv *revid.Revid, cfg revid.Config) error { func run(rv *revid.Revid, cfg revid.Config) error {
// initialize NetSender and use NetSender's logger // initialize NetSender and use NetSender's logger
//config.Logger = netsender.Logger() //config.Logger = netsender.Logger()
logger.Log(smartlogger.Info, pkg+"running in NetSender mode") log.Log(logger.Info, pkg+"running in NetSender mode")
var ns netsender.Sender var ns netsender.Sender
err := ns.Init(logger, nil, nil, nil) err := ns.Init(log, nil, nil, nil)
if err != nil { if err != nil {
return err return err
} }
@ -274,7 +274,7 @@ func run(rv *revid.Revid, cfg revid.Config) error {
for { for {
if err := send(&ns, rv); err != nil { if err := send(&ns, rv); err != nil {
logger.Log(smartlogger.Error, pkg+"polling failed", "error", err.Error()) log.Log(logger.Error, pkg+"polling failed", "error", err.Error())
time.Sleep(netSendRetryTime) time.Sleep(netSendRetryTime)
continue continue
} }
@ -283,14 +283,14 @@ func run(rv *revid.Revid, cfg revid.Config) error {
// vars changed // vars changed
vars, err := ns.Vars() vars, err := ns.Vars()
if err != nil { if err != nil {
logger.Log(smartlogger.Error, pkg+"netSender failed to get vars", "error", err.Error()) log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error())
time.Sleep(netSendRetryTime) time.Sleep(netSendRetryTime)
continue continue
} }
vs = ns.VarSum() vs = ns.VarSum()
if vars["mode"] == "Paused" { if vars["mode"] == "Paused" {
if !paused { if !paused {
logger.Log(smartlogger.Info, pkg+"pausing revid") log.Log(logger.Info, pkg+"pausing revid")
stopRevid(rv) stopRevid(rv)
paused = true paused = true
} }
@ -369,13 +369,13 @@ func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars m
case "FfmpegRtmp": case "FfmpegRtmp":
cfg.Output1 = revid.FfmpegRtmp cfg.Output1 = revid.FfmpegRtmp
default: default:
logger.Log(smartlogger.Warning, pkg+"invalid Output1 param", "value", value) log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value)
continue continue
} }
case "FramesPerClip": case "FramesPerClip":
f, err := strconv.ParseUint(value, 10, 0) f, err := strconv.ParseUint(value, 10, 0)
if err != nil { if err != nil {
logger.Log(smartlogger.Warning, pkg+"invalid framesperclip param", "value", value) log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value)
break break
} }
cfg.FramesPerClip = uint(f) cfg.FramesPerClip = uint(f)
@ -384,7 +384,7 @@ func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars m
case "Bitrate": case "Bitrate":
r, err := strconv.ParseUint(value, 10, 0) r, err := strconv.ParseUint(value, 10, 0)
if err != nil { if err != nil {
logger.Log(smartlogger.Warning, pkg+"invalid framerate param", "value", value) log.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
break break
} }
cfg.Bitrate = uint(r) cfg.Bitrate = uint(r)
@ -395,21 +395,21 @@ func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars m
case "Height": case "Height":
h, err := strconv.ParseUint(value, 10, 0) h, err := strconv.ParseUint(value, 10, 0)
if err != nil { if err != nil {
logger.Log(smartlogger.Warning, pkg+"invalid height param", "value", value) log.Log(logger.Warning, pkg+"invalid height param", "value", value)
break break
} }
cfg.Height = uint(h) cfg.Height = uint(h)
case "Width": case "Width":
w, err := strconv.ParseUint(value, 10, 0) w, err := strconv.ParseUint(value, 10, 0)
if err != nil { if err != nil {
logger.Log(smartlogger.Warning, pkg+"invalid width param", "value", value) log.Log(logger.Warning, pkg+"invalid width param", "value", value)
break break
} }
cfg.Width = uint(w) cfg.Width = uint(w)
case "FrameRate": case "FrameRate":
r, err := strconv.ParseUint(value, 10, 0) r, err := strconv.ParseUint(value, 10, 0)
if err != nil { if err != nil {
logger.Log(smartlogger.Warning, pkg+"invalid framerate param", "value", value) log.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
break break
} }
cfg.FrameRate = uint(r) cfg.FrameRate = uint(r)
@ -418,14 +418,14 @@ func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars m
case "Quantization": case "Quantization":
q, err := strconv.ParseUint(value, 10, 0) q, err := strconv.ParseUint(value, 10, 0)
if err != nil { if err != nil {
logger.Log(smartlogger.Warning, pkg+"invalid quantization param", "value", value) log.Log(logger.Warning, pkg+"invalid quantization param", "value", value)
break break
} }
cfg.Quantization = uint(q) cfg.Quantization = uint(q)
case "IntraRefreshPeriod": case "IntraRefreshPeriod":
p, err := strconv.ParseUint(value, 10, 0) p, err := strconv.ParseUint(value, 10, 0)
if err != nil { if err != nil {
logger.Log(smartlogger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value)
break break
} }
cfg.IntraRefreshPeriod = uint(p) cfg.IntraRefreshPeriod = uint(p)
@ -436,7 +436,7 @@ func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars m
case "false": case "false":
cfg.FlipHorizontal = false cfg.FlipHorizontal = false
default: default:
logger.Log(smartlogger.Warning, pkg+"invalid HorizontalFlip param", "value", value) log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value)
} }
case "VerticalFlip": case "VerticalFlip":
switch strings.ToLower(value) { switch strings.ToLower(value) {
@ -445,7 +445,7 @@ func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars m
case "false": case "false":
cfg.FlipVertical = false cfg.FlipVertical = false
default: default:
logger.Log(smartlogger.Warning, pkg+"invalid VerticalFlip param", "value", value) log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value)
} }
default: default:
} }

View File

@ -33,7 +33,8 @@ import (
"time" "time"
"bitbucket.org/ausocean/av/revid" "bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/utils/smartlogger" "bitbucket.org/ausocean/iot/pi/smartlogger"
"bitbucket.org/ausocean/utils/logger"
) )
const ( const (
@ -61,11 +62,11 @@ func main() {
RtmpMethod: revid.LibRtmp, RtmpMethod: revid.LibRtmp,
RtmpUrl: *rtmpUrlPtr, RtmpUrl: *rtmpUrlPtr,
Packetization: revid.Flv, Packetization: revid.Flv,
Logger: smartlogger.New(smartlogger.Info, logPath), Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller),
} }
revidInst, err := revid.New(config, nil) revidInst, err := revid.New(config, nil)
if err != nil { if err != nil {
config.Logger.Log(smartlogger.Error, "Should not have got an error!: ", err.Error()) config.Logger.Log(logger.Error, "Should not have got an error!: ", err.Error())
return return
} }
revidInst.Start() revidInst.Start()

View File

@ -31,7 +31,8 @@ import (
"time" "time"
"bitbucket.org/ausocean/av/revid" "bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/utils/smartlogger" "bitbucket.org/ausocean/iot/pi/smartlogger"
"bitbucket.org/ausocean/utils/logger"
) )
const ( const (
@ -52,11 +53,11 @@ func main() {
Output1: revid.File, Output1: revid.File,
OutputFileName: outputFile, OutputFileName: outputFile,
Packetization: revid.Mpegts, Packetization: revid.Mpegts,
Logger: smartlogger.New(smartlogger.Info, logPath), Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller),
} }
revidInst, err := revid.New(config, nil) revidInst, err := revid.New(config, nil)
if err != nil { if err != nil {
config.Logger.Log(smartlogger.Error, "Should not have got an error!:", err.Error()) config.Logger.Log(logger.Error, "Should not have got an error!:", err.Error())
return return
} }
revidInst.Start() revidInst.Start()

View File

@ -30,7 +30,7 @@ package revid
import ( import (
"errors" "errors"
"bitbucket.org/ausocean/utils/smartlogger" "bitbucket.org/ausocean/utils/logger"
) )
// Config provides parameters relevant to a revid instance. A new config must // Config provides parameters relevant to a revid instance. A new config must
@ -124,7 +124,7 @@ func (c *Config) Validate(r *Revid) error {
case No: case No:
case NothingDefined: case NothingDefined:
c.LogLevel = defaultVerbosity c.LogLevel = defaultVerbosity
c.Logger.Log(smartlogger.Warning, pkg+"no LogLevel mode defined, defaulting", c.Logger.Log(logger.Warning, pkg+"no LogLevel mode defined, defaulting",
"LogLevel", defaultVerbosity) "LogLevel", defaultVerbosity)
default: default:
return errors.New("bad LogLevel defined in config") return errors.New("bad LogLevel defined in config")
@ -134,7 +134,7 @@ func (c *Config) Validate(r *Revid) error {
case Raspivid: case Raspivid:
case File: case File:
case NothingDefined: case NothingDefined:
c.Logger.Log(smartlogger.Warning, pkg+"no input type defined, defaulting", "input", c.Logger.Log(logger.Warning, pkg+"no input type defined, defaulting", "input",
defaultInput) defaultInput)
c.Input = defaultInput c.Input = defaultInput
default: default:
@ -161,10 +161,10 @@ func (c *Config) Validate(r *Revid) error {
} }
case NothingDefined: case NothingDefined:
c.Logger.Log(smartlogger.Warning, pkg+"no input codec defined, defaulting", c.Logger.Log(logger.Warning, pkg+"no input codec defined, defaulting",
"inputCodec", defaultInputCodec) "inputCodec", defaultInputCodec)
c.InputCodec = defaultInputCodec c.InputCodec = defaultInputCodec
c.Logger.Log(smartlogger.Warning, pkg+"defaulting quantization", "quantization", c.Logger.Log(logger.Warning, pkg+"defaulting quantization", "quantization",
defaultQuantization) defaultQuantization)
c.Quantization = defaultQuantization c.Quantization = defaultQuantization
@ -177,20 +177,20 @@ func (c *Config) Validate(r *Revid) error {
case Udp: case Udp:
case Rtmp, FfmpegRtmp: case Rtmp, FfmpegRtmp:
if c.RtmpUrl == "" { if c.RtmpUrl == "" {
c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP") c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP")
c.Output1 = Http c.Output1 = Http
break break
} }
c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for rtmp out", c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out",
"framesPerClip", defaultFramesPerClip) "framesPerClip", defaultFramesPerClip)
c.FramesPerClip = defaultFramesPerClip c.FramesPerClip = defaultFramesPerClip
case NothingDefined: case NothingDefined:
c.Logger.Log(smartlogger.Warning, pkg+"no output defined, defaulting", "output", c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output",
defaultOutput) defaultOutput)
c.Output1 = defaultOutput c.Output1 = defaultOutput
fallthrough fallthrough
case Http, Rtp: case Http, Rtp:
c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for http out", c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out",
"framesPerClip", httpFramesPerClip) "framesPerClip", httpFramesPerClip)
c.FramesPerClip = httpFramesPerClip c.FramesPerClip = httpFramesPerClip
default: default:
@ -203,7 +203,7 @@ func (c *Config) Validate(r *Revid) error {
case Udp: case Udp:
case Rtmp, FfmpegRtmp: case Rtmp, FfmpegRtmp:
if c.RtmpUrl == "" { if c.RtmpUrl == "" {
c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP") c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP")
c.Output2 = Http c.Output2 = Http
break break
} }
@ -214,38 +214,38 @@ func (c *Config) Validate(r *Revid) error {
} }
if c.FramesPerClip < 1 { if c.FramesPerClip < 1 {
c.Logger.Log(smartlogger.Warning, pkg+"no FramesPerClip defined, defaulting", c.Logger.Log(logger.Warning, pkg+"no FramesPerClip defined, defaulting",
"framesPerClip", defaultFramesPerClip) "framesPerClip", defaultFramesPerClip)
c.FramesPerClip = defaultFramesPerClip c.FramesPerClip = defaultFramesPerClip
} }
if c.Width == 0 { if c.Width == 0 {
c.Logger.Log(smartlogger.Warning, pkg+"no width defined, defaulting", "width", defaultWidth) c.Logger.Log(logger.Warning, pkg+"no width defined, defaulting", "width", defaultWidth)
c.Width = defaultWidth c.Width = defaultWidth
} }
if c.Height == 0 { if c.Height == 0 {
c.Logger.Log(smartlogger.Warning, pkg+"no height defined, defaulting", "height", defaultHeight) c.Logger.Log(logger.Warning, pkg+"no height defined, defaulting", "height", defaultHeight)
c.Height = defaultHeight c.Height = defaultHeight
} }
if c.FrameRate == 0 { if c.FrameRate == 0 {
c.Logger.Log(smartlogger.Warning, pkg+"no frame rate defined, defaulting", "fps", defaultFrameRate) c.Logger.Log(logger.Warning, pkg+"no frame rate defined, defaulting", "fps", defaultFrameRate)
c.FrameRate = defaultFrameRate c.FrameRate = defaultFrameRate
} }
if c.Bitrate == 0 { if c.Bitrate == 0 {
c.Logger.Log(smartlogger.Warning, pkg+"no bitrate defined, defaulting", "bitrate", defaultBitrate) c.Logger.Log(logger.Warning, pkg+"no bitrate defined, defaulting", "bitrate", defaultBitrate)
c.Bitrate = defaultBitrate c.Bitrate = defaultBitrate
} }
if c.IntraRefreshPeriod == 0 { if c.IntraRefreshPeriod == 0 {
c.Logger.Log(smartlogger.Warning, pkg+"no intra refresh defined, defaulting", "intraRefresh", defaultIntraRefreshPeriod) c.Logger.Log(logger.Warning, pkg+"no intra refresh defined, defaulting", "intraRefresh", defaultIntraRefreshPeriod)
c.IntraRefreshPeriod = defaultIntraRefreshPeriod c.IntraRefreshPeriod = defaultIntraRefreshPeriod
} }
if c.Quantization == 0 { if c.Quantization == 0 {
c.Logger.Log(smartlogger.Warning, pkg+"no quantization defined, defaulting", "quantization", defaultQuantization) c.Logger.Log(logger.Warning, pkg+"no quantization defined, defaulting", "quantization", defaultQuantization)
c.Quantization = defaultQuantization c.Quantization = defaultQuantization
} else if c.Quantization > 51 { } else if c.Quantization > 51 {
return errors.New("quantisation is over threshold") return errors.New("quantisation is over threshold")

View File

@ -44,8 +44,8 @@ import (
"bitbucket.org/ausocean/av/stream/lex" "bitbucket.org/ausocean/av/stream/lex"
"bitbucket.org/ausocean/av/stream/mts" "bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/utils/smartlogger"
) )
// Misc constants // Misc constants
@ -147,7 +147,7 @@ type packer struct {
// write may include a dropped frame. // write may include a dropped frame.
func (p *packer) Write(frame []byte) (int, error) { func (p *packer) Write(frame []byte) (int, error) {
if len(frame) > ringBufferElementSize { if len(frame) > ringBufferElementSize {
p.owner.config.Logger.Log(smartlogger.Warning, pkg+"frame was too big", "frame size", len(frame)) p.owner.config.Logger.Log(logger.Warning, pkg+"frame was too big", "frame size", len(frame))
return len(frame), nil return len(frame), nil
} }
n, err := p.owner.buffer.Write(frame) n, err := p.owner.buffer.Write(frame)
@ -160,10 +160,10 @@ func (p *packer) Write(frame []byte) (int, error) {
} }
if err != nil { if err != nil {
if err == ring.ErrDropped { if err == ring.ErrDropped {
p.owner.config.Logger.Log(smartlogger.Warning, pkg+"dropped frame", "frame size", len(frame)) p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame))
return len(frame), nil return len(frame), nil
} }
p.owner.config.Logger.Log(smartlogger.Error, pkg+"unexpected ring buffer write error", "error", err.Error()) p.owner.config.Logger.Log(logger.Error, pkg+"unexpected ring buffer write error", "error", err.Error())
return n, err return n, err
} }
p.packetCount++ p.packetCount++
@ -273,10 +273,10 @@ func (r *Revid) reset(config Config) error {
} }
switch r.config.InputCodec { switch r.config.InputCodec {
case H264: case H264:
r.config.Logger.Log(smartlogger.Info, pkg+"using H264 lexer") r.config.Logger.Log(logger.Info, pkg+"using H264 lexer")
r.lexTo = lex.H264 r.lexTo = lex.H264
case Mjpeg: case Mjpeg:
r.config.Logger.Log(smartlogger.Info, pkg+"using MJPEG lexer") r.config.Logger.Log(logger.Info, pkg+"using MJPEG lexer")
r.lexTo = lex.MJPEG r.lexTo = lex.MJPEG
} }
@ -298,10 +298,10 @@ func (r *Revid) reset(config Config) error {
} }
r.encoder = stream.NopEncoder(&r.packer) r.encoder = stream.NopEncoder(&r.packer)
case Mpegts: case Mpegts:
r.config.Logger.Log(smartlogger.Info, pkg+"using MPEGTS packetisation") r.config.Logger.Log(logger.Info, pkg+"using MPEGTS packetisation")
r.encoder = mts.NewEncoder(&r.packer, float64(r.config.FrameRate)) r.encoder = mts.NewEncoder(&r.packer, float64(r.config.FrameRate))
case Flv: case Flv:
r.config.Logger.Log(smartlogger.Info, pkg+"using FLV packetisation") r.config.Logger.Log(logger.Info, pkg+"using FLV packetisation")
r.encoder, err = flv.NewEncoder(&r.packer, true, true, int(r.config.FrameRate)) r.encoder, err = flv.NewEncoder(&r.packer, true, true, int(r.config.FrameRate))
if err != nil { if err != nil {
return err return err
@ -320,29 +320,29 @@ func (r *Revid) IsRunning() bool {
// and packetising (if theres packetization) to a defined output. // and packetising (if theres packetization) to a defined output.
func (r *Revid) Start() { func (r *Revid) Start() {
if r.isRunning { if r.isRunning {
r.config.Logger.Log(smartlogger.Warning, pkg+"revid.Start() called but revid already running") r.config.Logger.Log(logger.Warning, pkg+"revid.Start() called but revid already running")
return return
} }
r.config.Logger.Log(smartlogger.Info, pkg+"starting Revid") r.config.Logger.Log(logger.Info, pkg+"starting Revid")
r.config.Logger.Log(smartlogger.Debug, pkg+"setting up output") r.config.Logger.Log(logger.Debug, pkg+"setting up output")
r.isRunning = true r.isRunning = true
r.config.Logger.Log(smartlogger.Info, pkg+"starting output routine") r.config.Logger.Log(logger.Info, pkg+"starting output routine")
go r.outputClips() go r.outputClips()
r.config.Logger.Log(smartlogger.Info, pkg+"setting up input and receiving content") r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content")
go r.setupInput() go r.setupInput()
} }
// Stop halts any processing of video data from a camera or file // Stop halts any processing of video data from a camera or file
func (r *Revid) Stop() { func (r *Revid) Stop() {
if !r.isRunning { if !r.isRunning {
r.config.Logger.Log(smartlogger.Warning, pkg+"revid.Stop() called but revid not running") r.config.Logger.Log(logger.Warning, pkg+"revid.Stop() called but revid not running")
return return
} }
r.config.Logger.Log(smartlogger.Info, pkg+"stopping revid") r.config.Logger.Log(logger.Info, pkg+"stopping revid")
r.isRunning = false r.isRunning = false
r.config.Logger.Log(smartlogger.Info, pkg+"killing input proccess") r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
// If a cmd process is running, we kill! // If a cmd process is running, we kill!
if r.cmd != nil && r.cmd.Process != nil { if r.cmd != nil && r.cmd.Process != nil {
r.cmd.Process.Kill() r.cmd.Process.Kill()
@ -362,53 +362,53 @@ loop:
case nil: case nil:
// Do nothing. // Do nothing.
case ring.ErrTimeout: case ring.ErrTimeout:
r.config.Logger.Log(smartlogger.Warning, pkg+"ring buffer read timeout") r.config.Logger.Log(logger.Warning, pkg+"ring buffer read timeout")
continue continue
default: default:
r.config.Logger.Log(smartlogger.Error, pkg+"unexpected error", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"unexpected error", "error", err.Error())
fallthrough fallthrough
case io.EOF: case io.EOF:
break loop break loop
} }
count += chunk.Len() count += chunk.Len()
r.config.Logger.Log(smartlogger.Debug, pkg+"about to send") r.config.Logger.Log(logger.Debug, pkg+"about to send")
for i, dest := range r.destination { for i, dest := range r.destination {
err = dest.load(chunk) err = dest.load(chunk)
if err != nil { if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"failed to load clip to output"+strconv.Itoa(i)) r.config.Logger.Log(logger.Error, pkg+"failed to load clip to output"+strconv.Itoa(i))
} }
} }
for i, dest := range r.destination { for i, dest := range r.destination {
err = dest.send() err = dest.send()
if err == nil { if err == nil {
r.config.Logger.Log(smartlogger.Debug, pkg+"sent clip to output "+strconv.Itoa(i)) r.config.Logger.Log(logger.Debug, pkg+"sent clip to output "+strconv.Itoa(i))
} else if r.config.SendRetry == false { } else if r.config.SendRetry == false {
r.config.Logger.Log(smartlogger.Warning, pkg+"send to output "+strconv.Itoa(i)+"failed", "error", err.Error()) r.config.Logger.Log(logger.Warning, pkg+"send to output "+strconv.Itoa(i)+"failed", "error", err.Error())
} else { } else {
r.config.Logger.Log(smartlogger.Error, pkg+"send to output "+strconv.Itoa(i)+ r.config.Logger.Log(logger.Error, pkg+"send to output "+strconv.Itoa(i)+
"failed, trying again", "error", err.Error()) "failed, trying again", "error", err.Error())
err = dest.send() err = dest.send()
if err != nil && chunk.Len() > 11 { if err != nil && chunk.Len() > 11 {
r.config.Logger.Log(smartlogger.Error, pkg+"second send attempted failed, restarting connection", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"second send attempted failed, restarting connection", "error", err.Error())
for err != nil { for err != nil {
time.Sleep(sendFailedDelay) time.Sleep(sendFailedDelay)
if rs, ok := dest.(restarter); ok { if rs, ok := dest.(restarter); ok {
r.config.Logger.Log(smartlogger.Debug, pkg+"restarting session", "session", rs) r.config.Logger.Log(logger.Debug, pkg+"restarting session", "session", rs)
err = rs.restart() err = rs.restart()
if err != nil { if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"failed to restart rtmp session", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error())
r.isRunning = false r.isRunning = false
return return
} }
r.config.Logger.Log(smartlogger.Info, pkg+"restarted rtmp session") r.config.Logger.Log(logger.Info, pkg+"restarted rtmp session")
} }
err = dest.send() err = dest.send()
if err != nil { if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"send failed again, with error", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"send failed again, with error", "error", err.Error())
} }
} }
} }
@ -419,7 +419,7 @@ loop:
for _, dest := range r.destination { for _, dest := range r.destination {
dest.release() dest.release()
} }
r.config.Logger.Log(smartlogger.Debug, pkg+"done reading that clip from ring buffer") r.config.Logger.Log(logger.Debug, pkg+"done reading that clip from ring buffer")
// Log some information regarding bitrate and ring buffer size if it's time // Log some information regarding bitrate and ring buffer size if it's time
now := time.Now() now := time.Now()
@ -427,17 +427,17 @@ loop:
if deltaTime > bitrateTime { if deltaTime > bitrateTime {
// FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate. // FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate.
r.bitrate = int(float64(count*8) / float64(deltaTime/time.Second)) r.bitrate = int(float64(count*8) / float64(deltaTime/time.Second))
r.config.Logger.Log(smartlogger.Debug, pkg+"bitrate (bits/s)", "bitrate", r.bitrate) r.config.Logger.Log(logger.Debug, pkg+"bitrate (bits/s)", "bitrate", r.bitrate)
r.config.Logger.Log(smartlogger.Debug, pkg+"ring buffer size", "value", r.buffer.Len()) r.config.Logger.Log(logger.Debug, pkg+"ring buffer size", "value", r.buffer.Len())
lastTime = now lastTime = now
count = 0 count = 0
} }
} }
r.config.Logger.Log(smartlogger.Info, pkg+"not outputting clips anymore") r.config.Logger.Log(logger.Info, pkg+"not outputting clips anymore")
for i, dest := range r.destination { for i, dest := range r.destination {
err := dest.close() err := dest.close()
if err != nil { if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"failed to close output"+strconv.Itoa(i)+" destination", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"failed to close output"+strconv.Itoa(i)+" destination", "error", err.Error())
} }
} }
} }
@ -445,7 +445,7 @@ loop:
// startRaspivid sets up things for input from raspivid i.e. starts // startRaspivid sets up things for input from raspivid i.e. starts
// a raspivid process and pipes it's data output. // a raspivid process and pipes it's data output.
func (r *Revid) startRaspivid() error { func (r *Revid) startRaspivid() error {
r.config.Logger.Log(smartlogger.Info, pkg+"starting raspivid") r.config.Logger.Log(logger.Info, pkg+"starting raspivid")
const disabled = "0" const disabled = "0"
args := []string{ args := []string{
@ -478,7 +478,7 @@ func (r *Revid) startRaspivid() error {
case Mjpeg: case Mjpeg:
args = append(args, "--codec", "MJPEG") args = append(args, "--codec", "MJPEG")
} }
r.config.Logger.Log(smartlogger.Info, pkg+"raspivid args", "raspividArgs", strings.Join(args, " ")) r.config.Logger.Log(logger.Info, pkg+"raspivid args", "raspividArgs", strings.Join(args, " "))
r.cmd = exec.Command("raspivid", args...) r.cmd = exec.Command("raspivid", args...)
stdout, err := r.cmd.StdoutPipe() stdout, err := r.cmd.StdoutPipe()
@ -487,13 +487,13 @@ func (r *Revid) startRaspivid() error {
} }
err = r.cmd.Start() err = r.cmd.Start()
if err != nil { if err != nil {
r.config.Logger.Log(smartlogger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error())
} }
r.config.Logger.Log(smartlogger.Info, pkg+"reading camera data") r.config.Logger.Log(logger.Info, pkg+"reading camera data")
delay := time.Second / time.Duration(r.config.FrameRate) delay := time.Second / time.Duration(r.config.FrameRate)
err = r.lexTo(r.encoder, stdout, delay) err = r.lexTo(r.encoder, stdout, delay)
r.config.Logger.Log(smartlogger.Info, pkg+"finished reading camera data") r.config.Logger.Log(logger.Info, pkg+"finished reading camera data")
return err return err
} }
@ -503,7 +503,7 @@ func (r *Revid) setupInputForFile() error {
f, err := os.Open(r.config.InputFileName) f, err := os.Open(r.config.InputFileName)
if err != nil { if err != nil {
r.config.Logger.Log(smartlogger.Error, err.Error()) r.config.Logger.Log(logger.Error, err.Error())
r.Stop() r.Stop()
return err return err
} }

View File

@ -40,8 +40,8 @@ import (
"bitbucket.org/ausocean/av/stream/mts" "bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/av/stream/rtp" "bitbucket.org/ausocean/av/stream/rtp"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/utils/smartlogger"
) )
// loadSender is a destination to send a *ring.Chunk to. // loadSender is a destination to send a *ring.Chunk to.
@ -167,18 +167,18 @@ func (s *httpSender) extractMeta(r string) error {
// Extract time from reply // Extract time from reply
t, err := dec.Int("ts") t, err := dec.Int("ts")
if err != nil { if err != nil {
s.log(smartlogger.Warning, pkg+"No timestamp in reply") s.log(logger.Warning, pkg+"No timestamp in reply")
} else { } else {
s.log(smartlogger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) s.log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t))
mts.SetTimeStamp(uint64(t)) mts.SetTimeStamp(uint64(t))
} }
// Extract location from reply // Extract location from reply
g, err := dec.String("ll") g, err := dec.String("ll")
if err != nil { if err != nil {
s.log(smartlogger.Warning, pkg+"No location in reply") s.log(logger.Warning, pkg+"No location in reply")
} else { } else {
s.log(smartlogger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) s.log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g))
mts.SetLocation(g) mts.SetLocation(g)
} }
@ -270,10 +270,10 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg
if err == nil { if err == nil {
break break
} }
log(smartlogger.Error, err.Error()) log(logger.Error, err.Error())
sess.Close() sess.Close()
if n < retries-1 { if n < retries-1 {
log(smartlogger.Info, pkg+"retry rtmp connection") log(logger.Info, pkg+"retry rtmp connection")
} }
} }
if err != nil { if err != nil {
@ -316,10 +316,10 @@ func (s *rtmpSender) restart() error {
if err == nil { if err == nil {
break break
} }
s.log(smartlogger.Error, err.Error()) s.log(logger.Error, err.Error())
s.sess.Close() s.sess.Close()
if n < s.retries-1 { if n < s.retries-1 {
s.log(smartlogger.Info, pkg+"retry rtmp connection") s.log(logger.Info, pkg+"retry rtmp connection")
} }
} }
return err return err