Merged in av-logging-update (pull request #57)

av-logging-update

Approved-by: kortschak <dan@kortschak.io>
This commit is contained in:
Saxon Milton 2018-09-22 05:40:50 +00:00
commit be1bf3ef77
6 changed files with 219 additions and 177 deletions

View File

@ -30,8 +30,6 @@ package main
import (
"flag"
"fmt"
"log"
"os"
"runtime/pprof"
"strconv"
@ -47,7 +45,7 @@ const (
progName = "revid-cli"
// Logging is set to INFO level.
loggerVerbosity = 3
defaultLogVerbosity = smartlogger.Info
)
// Indexes for configFlags
@ -73,6 +71,7 @@ const (
intraRefreshPeriodPtr
verticalFlipPtr
horizontalFlipPtr
logPathPtr
noOfConfigFlags
)
@ -82,21 +81,29 @@ const (
netSendRetryTime = 5 * time.Second
defaultRunDuration = 24 * time.Hour
revidStopTime = 5 * time.Second
defaultLogPath = "/var/log/netsender/"
pkg = "revid-cli:"
)
// canProfile is set to false with revid-cli is built with "-tags profile".
var canProfile = true
// The logger that will be used throughout
var logger *smartlogger.Logger
// Globals
var (
rv *revid.Revid
config revid.Config
rv *revid.Revid
config revid.Config
configFlags = make([](*string), noOfConfigFlags)
)
func main() {
cpuprofile := flag.String("cpuprofile", "", "write cpu profile to `file`")
flagNames := [noOfConfigFlags]struct{ name, description string }{
// Flags
var (
cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`")
useNetsender = flag.Bool("NetSender", false, "Are we checking vars through netsender?")
runDurationPtr = flag.Duration("runDuration", defaultRunDuration, "How long do you want revid to run for?")
flagNames = [noOfConfigFlags]struct{ name, description string }{
{"Input", "The input type: Raspivid, File"},
{"InputCodec", "The codec of the input: H264, Mjpeg"},
{"Output", "The output type: Http, Rtmp, File"},
@ -104,7 +111,7 @@ func main() {
// NOTE: we add rtp here when we have this functionality
{"Packetization", "The method of data packetisation: Flv, Mpegts, None"},
{"QuantizationMode", "Whether quantization if on or off (variable bitrate): On, Off"},
{"Verbosity", "Verbosity: On, Off"},
{"Verbosity", "Verbosity: Info, Warning, Error, Fatal"},
{"FramesPerClip", "Number of frames per clip sent"},
{"RtmpUrl", "Url of rtmp endpoint"},
{"Bitrate", "Bitrate of recorded video"},
@ -119,33 +126,59 @@ func main() {
{"IntraRefreshPeriod", "The IntraRefreshPeriod i.e. how many keyframes we send"},
{"VerticalFlip", "Flip video vertically: Yes, No"},
{"HorizontalFlip", "Flip video horizontally: Yes, No"},
{"LogPath", "Path for logging files (default is /var/log/netsender/)"},
}
)
func main() {
handleFlags()
if !*useNetsender {
// run revid for the specified duration
startRevid(nil)
time.Sleep(*runDurationPtr)
stopRevid()
return
}
// Create the configFlags based on the flagNames array
configFlags := make([](*string), noOfConfigFlags)
err := run()
if err != nil {
logger.Log(smartlogger.Fatal, pkg+"failed to run revid", "error", err.Error())
os.Exit(1)
}
}
// Handle flags interprets and validates command line flags and sets revid
// config etc accordingly
func handleFlags() {
// Create the configFlags based on the flagNames struct array
for i, f := range &flagNames {
configFlags[i] = flag.String(f.name, "", f.description)
}
// Do we want a netsender session
useNetsender := flag.Bool("NetSender", false, "Are we checking vars through netsender?")
// User might also want to define how long revid runs for
runDurationPtr := flag.Duration("runDuration", defaultRunDuration, "How long do you want revid to run for?")
flag.Parse()
logPath := defaultLogPath
if *configFlags[logPathPtr] != "" {
logPath = *configFlags[logPathPtr]
}
logger = smartlogger.New(defaultLogVerbosity, logPath)
config.Logger = logger
if *cpuprofile != "" {
if canProfile {
f, err := os.Create(*cpuprofile)
if err != nil {
log.Fatal("could not create CPU profile: ", err)
logger.Log(smartlogger.Fatal, pkg+"could not create CPU profile", "error", err.Error())
}
if err := pprof.StartCPUProfile(f); err != nil {
log.Fatal("could not start CPU profile: ", err)
logger.Log(smartlogger.Fatal, pkg+"could not start CPU profile", "error", err.Error())
}
defer pprof.StopCPUProfile()
} else {
fmt.Fprintln(os.Stderr, "Ignoring cpuprofile flag - http/pprof built in.")
logger.Log(smartlogger.Warning, pkg+"ignoring cpuprofile flag - http/pprof built in.")
}
}
@ -156,7 +189,7 @@ func main() {
config.Input = revid.File
case "":
default:
fmt.Println("Bad input argument!")
logger.Log(smartlogger.Error, pkg+"bad input argument")
}
switch *configFlags[inputCodecPtr] {
@ -164,7 +197,7 @@ func main() {
config.InputCodec = revid.H264Codec
case "":
default:
fmt.Println("Bad input codec argument!")
logger.Log(smartlogger.Error, pkg+"bad input codec argument")
}
switch *configFlags[outputPtr] {
@ -178,7 +211,7 @@ func main() {
config.Output = revid.FfmpegRtmp
case "":
default:
fmt.Println("Bad output argument!")
logger.Log(smartlogger.Error, pkg+"bad output argument")
}
switch *configFlags[rtmpMethodPtr] {
@ -188,7 +221,7 @@ func main() {
config.RtmpMethod = revid.LibRtmp
case "":
default:
fmt.Println("Bad rtmp method argument!")
logger.Log(smartlogger.Error, pkg+"bad rtmp method argument")
}
switch *configFlags[packetizationPtr] {
@ -200,7 +233,7 @@ func main() {
config.Packetization = revid.Flv
case "":
default:
fmt.Println("Bad packetization argument!")
logger.Log(smartlogger.Error, pkg+"bad packetization argument")
}
switch *configFlags[quantizationModePtr] {
@ -210,7 +243,7 @@ func main() {
config.QuantizationMode = revid.QuantizationOff
case "":
default:
fmt.Println("Bad quantization mode argument!")
logger.Log(smartlogger.Error, pkg+"bad quantization mode argument")
}
switch *configFlags[verbosityPtr] {
@ -220,7 +253,7 @@ func main() {
config.Verbosity = revid.Yes
case "":
default:
fmt.Println("Bad verbosity argument!")
logger.Log(smartlogger.Error, pkg+"bad verbosity argument")
}
switch *configFlags[horizontalFlipPtr] {
@ -231,7 +264,7 @@ func main() {
case "":
config.HorizontalFlip = revid.No
default:
fmt.Println("Bad horizontal flip option!")
logger.Log(smartlogger.Error, pkg+"bad horizontal flip option")
}
switch *configFlags[verticalFlipPtr] {
@ -242,7 +275,7 @@ func main() {
case "":
config.VerticalFlip = revid.No
default:
fmt.Println("Bad vertical flip option!")
logger.Log(smartlogger.Error, pkg+"bad vertical flip option")
}
fpc, err := strconv.Atoi(*configFlags[framesPerClipPtr])
@ -260,32 +293,16 @@ func main() {
config.Quantization = *configFlags[quantizationPtr]
config.Timeout = *configFlags[timeoutPtr]
config.IntraRefreshPeriod = *configFlags[intraRefreshPeriodPtr]
if !*useNetsender {
// instantiate our own logger
config.Logger = smartlogger.New(loggerVerbosity, smartlogger.File, "/var/log/netsender/")
// run revid for the specified duration
startRevid(nil)
time.Sleep(*runDurationPtr)
stopRevid()
return
}
err = run()
if err != nil {
config.Logger.Log(progName, "Error", err.Error()) // TODO(kortschak): Make this "Fatal" when that exists.
os.Exit(1)
}
}
// initialize then run the main NetSender client
func run() error {
// initialize NetSender and use NetSender's logger
//config.Logger = netsender.Logger()
config.Logger.Log(progName, "Info", "Running in NetSender mode")
logger.Log(smartlogger.Info, pkg+"running in NetSender mode")
var ns netsender.Sender
err := ns.Init(nil, nil, nil, nil)
err := ns.Init(logger, nil, nil, nil)
if err != nil {
return err
}
@ -304,7 +321,7 @@ func run() error {
for {
if err := send(&ns); err != nil {
config.Logger.Log(progName, "Warning", err.Error())
logger.Log(smartlogger.Error, pkg+"polling failed", "error", err.Error())
time.Sleep(netSendRetryTime)
continue
}
@ -313,14 +330,14 @@ func run() error {
// vars changed
vars, err := ns.Vars()
if err != nil {
config.Logger.Log(progName, "Warning", err.Error())
logger.Log(smartlogger.Error, pkg+"netSender failed to get vars", "error", err.Error())
time.Sleep(netSendRetryTime)
continue
}
vs = ns.VarSum()
if vars["mode"] == "Paused" {
if !paused {
config.Logger.Log(progName, "Info", "Pausing revid")
logger.Log(smartlogger.Info, pkg+"pausing revid")
stopRevid()
paused = true
}
@ -399,7 +416,7 @@ func updateRevid(ns *netsender.Sender, vars map[string]string, stop bool) error
case "FfmpegRtmp":
config.Output = revid.FfmpegRtmp
default:
rv.Log(revid.Warning, "Invalid Output param: "+value)
logger.Log(smartlogger.Warning, pkg+"invalid Output param", "value", value)
continue
}
case "FramesPerClip":
@ -407,7 +424,7 @@ func updateRevid(ns *netsender.Sender, vars map[string]string, stop bool) error
if fpc > 0 && err == nil {
config.FramesPerClip = fpc
} else {
rv.Log(revid.Warning, "Invalid FramesPerClip param: "+value)
logger.Log(smartlogger.Warning, pkg+"invalid FramesPerClip param", "value", value)
}
case "RtmpUrl":
config.RtmpUrl = value
@ -416,7 +433,7 @@ func updateRevid(ns *netsender.Sender, vars map[string]string, stop bool) error
if asInt > 0 && err == nil {
config.Bitrate = value
} else {
rv.Log(revid.Warning, "Invalid Bitrate param: "+value)
logger.Log(smartlogger.Warning, pkg+"invalid Bitrate param", "value", value)
}
case "OutputFileName":
config.OutputFileName = value
@ -427,21 +444,21 @@ func updateRevid(ns *netsender.Sender, vars map[string]string, stop bool) error
if asInt > 0 && err == nil {
config.Height = value
} else {
rv.Log(revid.Warning, "Invalid Height param: "+value)
logger.Log(smartlogger.Warning, pkg+"invalid Height param", "value", value)
}
case "Width":
asInt, err := strconv.Atoi(value)
if asInt > 0 && err == nil {
config.Width = value
} else {
rv.Log(revid.Warning, "Invalid Width param: "+value)
logger.Log(smartlogger.Warning, pkg+"invalid Width param", "value", value)
}
case "FrameRate":
asInt, err := strconv.Atoi(value)
if asInt > 0 && err == nil {
config.FrameRate = value
} else {
rv.Log(revid.Warning, "Invalid FrameRate param: "+value)
logger.Log(smartlogger.Warning, pkg+"invalid FrameRate param", "value", value)
}
case "HttpAddress":
config.HttpAddress = value
@ -450,7 +467,7 @@ func updateRevid(ns *netsender.Sender, vars map[string]string, stop bool) error
if asInt > 0 && err == nil {
config.Quantization = value
} else {
rv.Log(revid.Warning, "Invalid Quantization param: "+value)
logger.Log(smartlogger.Warning, pkg+"invalid Quantization param", "value", value)
}
case "Timeout":
asInt, err := strconv.Atoi(value)
@ -469,7 +486,7 @@ func updateRevid(ns *netsender.Sender, vars map[string]string, stop bool) error
case "No":
config.HorizontalFlip = revid.No
default:
rv.Log(revid.Warning, "Invalid HorizontalFlip param: "+value)
logger.Log(smartlogger.Warning, pkg+"invalid HorizontalFlip param", "value", value)
}
case "VerticalFlip":
switch value {
@ -478,7 +495,7 @@ func updateRevid(ns *netsender.Sender, vars map[string]string, stop bool) error
case "No":
config.VerticalFlip = revid.No
default:
rv.Log(revid.Warning, "Invalid VerticalFlip param: "+value)
logger.Log(smartlogger.Warning, pkg+"invalid VerticalFlip param", "value", value)
}
default:
}

View File

@ -33,12 +33,14 @@ import (
"time"
"bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/utils/smartlogger"
)
const (
inputFile = "../../../../test/test-data/av/input/betterInput.h264"
frameRate = "25"
runDuration = 120 * time.Second
logPath = "/var/log"
)
// Test h264 inputfile to flv format into rtmp using librtmp c wrapper
@ -59,10 +61,11 @@ func main() {
RtmpMethod: revid.LibRtmp,
RtmpUrl: *rtmpUrlPtr,
Packetization: revid.Flv,
Logger: smartlogger.New(smartlogger.Info, logPath),
}
revidInst, err := revid.New(config, nil)
if err != nil {
log.Printf("Should not of have got an error!: %v\n", err.Error())
config.Logger.Log(smartlogger.Error, "Should not have got an error!: ", err.Error())
return
}
revidInst.Start()

View File

@ -28,10 +28,10 @@ LICENSE
package main
import (
"log"
"time"
"bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/utils/smartlogger"
)
const (
@ -39,6 +39,7 @@ const (
outputFile = "output.ts"
frameRate = "25"
runDuration = 120 * time.Second
logPath = "/var/log"
)
// Test h264 inputfile to flv format into rtmp using librtmp c wrapper
@ -51,10 +52,11 @@ func main() {
Output: revid.File,
OutputFileName: outputFile,
Packetization: revid.Mpegts,
Logger: smartlogger.New(smartlogger.Info, logPath),
}
revidInst, err := revid.New(config, nil)
if err != nil {
log.Printf("Should not of have got an error!: %v\n", err.Error())
config.Logger.Log(smartlogger.Error, "Should not have got an error!:", err.Error())
return
}
revidInst.Start()

View File

@ -43,7 +43,7 @@ type Config struct {
RtmpMethod uint8
Packetization uint8
QuantizationMode uint8
Verbosity uint8
LogLevel int8
HorizontalFlip uint8
VerticalFlip uint8
FramesPerClip int
@ -58,7 +58,7 @@ type Config struct {
Quantization string
Timeout string
IntraRefreshPeriod string
Logger smartlogger.LogInstance
Logger Logger
}
// Enums for config struct
@ -99,39 +99,45 @@ const (
defaultFramesPerClip = 1
defaultVerticalFlip = No
defaultHorizontalFlip = No
httpFramesPerClip = 7
defaultInputCodec = H264
defaultVerbosity = No
)
// Validate checks for any errors in the config fields and defaults settings
// if particular parameters have not been defined.
func (c *Config) Validate(r *Revid) error {
switch c.Verbosity {
switch c.LogLevel {
case Yes:
case No:
case NothingDefined:
c.Verbosity = Yes
r.Log(Warning, "No verbosity mode defined, defaulting to no Verbosity!")
c.LogLevel = defaultVerbosity
c.Logger.Log(smartlogger.Warning, pkg+"no LogLevel mode defined, defaulting",
"LogLevel", defaultVerbosity)
default:
return errors.New("Bad Verbosity defined in config!")
return errors.New("bad LogLevel defined in config")
}
switch c.QuantizationMode {
case QuantizationOn:
case QuantizationOff:
case NothingDefined:
r.Log(Warning, "No quantization mode defined, defaulting to QuantizationOff!")
c.Logger.Log(smartlogger.Warning, pkg+"no quantization mode defined, defaulting",
"quantizationMode", QuantizationOff)
c.QuantizationMode = QuantizationOff
default:
return errors.New("Bad QuantizationMode defined in config!")
return errors.New("bad QuantizationMode defined in config")
}
switch c.Input {
case Raspivid:
case File:
case NothingDefined:
r.Log(Warning, "No input type defined, defaulting to raspivid!")
c.Logger.Log(smartlogger.Warning, pkg+"no input type defined, defaulting", "input",
defaultInput)
c.Input = defaultInput
default:
return errors.New("Bad input type defined in config!")
return errors.New("bad input type defined in config")
}
switch c.InputCodec {
@ -139,54 +145,59 @@ func (c *Config) Validate(r *Revid) error {
if c.Bitrate != "" && c.Quantization != "" {
bitrate, err := strconv.Atoi(c.Bitrate)
if err != nil {
return errors.New("Something is wrong with bitrate in conig!")
return errors.New("bitrate not an integer")
}
quantization, err := strconv.Atoi(c.Quantization)
if err != nil {
return errors.New("Something is wrong with quantization in config!")
return errors.New("quantization not an integer")
}
if (bitrate > 0 && quantization > 0) || (bitrate == 0 && quantization == 0) {
return errors.New("Bad bitrate and quantization combination for H264 input!")
return errors.New("bad bitrate and quantization combination for H264 input")
}
}
case Mjpeg:
if c.Quantization != "" {
quantization, err := strconv.Atoi(c.Quantization)
if err != nil {
return errors.New("Something is wrong with quantization in config!")
return errors.New("quantization not an integer")
}
if quantization > 0 || c.Bitrate == "" {
return errors.New("Bad bitrate or quantization for mjpeg input!")
return errors.New("bad bitrate or quantization for mjpeg input")
}
}
case NothingDefined:
r.Log(Warning, "No input codec defined, defaulting to h264!")
c.InputCodec = H264
r.Log(Warning, "Defaulting bitrate to 0 and quantization to 35!")
c.Logger.Log(smartlogger.Warning, pkg+"no input codec defined, defaulting",
"inputCodec", defaultInputCodec)
c.InputCodec = defaultInputCodec
c.Logger.Log(smartlogger.Warning, pkg+"defaulting quantization", "quantization",
defaultQuantization)
c.Quantization = defaultQuantization
default:
return errors.New("Bad input codec defined in config!")
return errors.New("bad input codec defined in config")
}
switch c.Output {
case File:
case Rtmp, FfmpegRtmp:
if c.RtmpUrl == "" {
r.Log(Info, "No RTMP URL: falling back to HTTP")
c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP")
c.Output = Http
break
}
r.Log(Info, "Defaulting frames per clip to 1 for rtmp output!")
c.FramesPerClip = 1
c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for rtmp out",
"framesPerClip", defaultFramesPerClip)
c.FramesPerClip = defaultFramesPerClip
case NothingDefined:
r.Log(Warning, "No output defined, defaulting to httpOut!")
c.Logger.Log(smartlogger.Warning, pkg+"no output defined, defaulting", "output",
defaultOutput)
c.Output = defaultOutput
fallthrough
case Http:
r.Log(Info, "Defaulting frames per clip to 7 for http output!")
c.FramesPerClip = 7
c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for http out",
"framesPerClip", httpFramesPerClip)
c.FramesPerClip = httpFramesPerClip
default:
return errors.New("Bad output type defined in config!")
return errors.New("bad output type defined in config")
}
switch c.Packetization {
@ -194,97 +205,107 @@ func (c *Config) Validate(r *Revid) error {
case Mpegts:
case Flv:
case NothingDefined:
r.Log(Warning, "No packetization option defined, defaulting to none!")
c.Packetization = Flv
c.Logger.Log(smartlogger.Warning, pkg+"no packetization option defined, defaulting",
"packetization", defaultPacketization)
c.Packetization = defaultPacketization
default:
return errors.New("Bad packetization option defined in config!")
return errors.New("bad packetization option defined in config")
}
switch c.HorizontalFlip {
case Yes:
case No:
case NothingDefined:
r.Log(Warning, "No horizontal flip option defined, defaulting to not flipped!")
c.Logger.Log(smartlogger.Warning, pkg+"no horizontal flip option defined, defaulting",
"horizontalFlip", defaultHorizontalFlip)
c.HorizontalFlip = defaultHorizontalFlip
default:
return errors.New("Bad horizontal flip option defined in config!")
return errors.New("bad horizontal flip option defined in config")
}
switch c.VerticalFlip {
case Yes:
case No:
case NothingDefined:
r.Log(Warning, "No vertical flip option defined, defaulting to not flipped!")
c.Logger.Log(smartlogger.Warning, pkg+"no vertical flip option defined, defaulting",
"verticalFlip", defaultVerticalFlip)
c.VerticalFlip = defaultVerticalFlip
default:
return errors.New("Bad vertical flip option defined in config!")
return errors.New("bad vertical flip option defined in config")
}
if c.FramesPerClip < 1 {
r.Log(Warning, "No FramesPerClip defined defined, defaulting to 1!")
c.Logger.Log(smartlogger.Warning, pkg+"no FramesPerClip defined, defaulting",
"framesPerClip", defaultFramesPerClip)
c.FramesPerClip = defaultFramesPerClip
}
if c.Width == "" {
r.Log(Warning, "No width defined, defaulting to 1280!")
c.Logger.Log(smartlogger.Warning, pkg+"no width defined, defaulting", "width",
defaultWidth)
c.Width = defaultWidth
} else {
if integer, err := strconv.Atoi(c.Width); integer < 0 || err != nil {
return errors.New("Bad width defined in config!")
return errors.New("width not unsigned integer")
}
}
if c.Height == "" {
r.Log(Warning, "No height defined, defaulting to 720!")
c.Logger.Log(smartlogger.Warning, pkg+"no height defined, defaulting", "height",
defaultHeight)
c.Height = defaultHeight
} else {
if integer, err := strconv.Atoi(c.Height); integer < 0 || err != nil {
return errors.New("Bad height defined in config!")
return errors.New("height not unsigned integer")
}
}
if c.FrameRate == "" {
r.Log(Warning, "No frame rate defined, defaulting to 25!")
c.Logger.Log(smartlogger.Warning, pkg+"no frame rate defined, defaulting", "fps",
defaultFrameRate)
c.FrameRate = defaultFrameRate
} else {
if integer, err := strconv.Atoi(c.FrameRate); integer < 0 || err != nil {
return errors.New("Bad frame rate defined in config!")
return errors.New("frame rate not unsigned integer")
}
}
if c.Bitrate == "" {
r.Log(Warning, "No bitrate defined, defaulting!")
c.Logger.Log(smartlogger.Warning, pkg+"no bitrate defined, defaulting", "bitrate",
defaultBitrate)
c.Bitrate = defaultBitrate
} else {
if integer, err := strconv.Atoi(c.Bitrate); integer < 0 || err != nil {
return errors.New("Bad bitrate defined in config!")
return errors.New("bitrate not unsigned integer")
}
}
if c.Timeout == "" {
r.Log(Warning, "No timeout defined, defaulting to 0!")
c.Logger.Log(smartlogger.Warning, pkg+"no timeout defined, defaulting", "timeout", defaultTimeout)
c.Timeout = defaultTimeout
} else {
if integer, err := strconv.Atoi(c.Timeout); integer < 0 || err != nil {
return errors.New("Bad timeout defined in config!")
return errors.New("timeout not unsigned integer")
}
}
if c.IntraRefreshPeriod == "" {
r.Log(Warning, "No intra refresh defined, defaulting to 100!")
c.Logger.Log(smartlogger.Warning, pkg+"no intra refresh defined, defaulting", "intraRefresh",
defaultIntraRefreshPeriod)
c.IntraRefreshPeriod = defaultIntraRefreshPeriod
} else {
if integer, err := strconv.Atoi(c.IntraRefreshPeriod); integer < 0 || err != nil {
return errors.New("Bad intra refresh defined in config!")
return errors.New("intra refresh not unsigned integer")
}
}
if c.Quantization == "" {
r.Log(Warning, "No quantization defined, defaulting to 35!")
c.Logger.Log(smartlogger.Warning, pkg+"no quantization defined, defaulting", "quantization",
defaultQuantization)
c.Quantization = defaultQuantization
} else {
if integer, err := strconv.Atoi(c.Quantization); integer < 0 || integer > 51 || err != nil {
return errors.New("Bad quantization defined in config!")
return errors.New("quantisation not unsigned integer or is over threshold")
}
}
return nil

View File

@ -31,11 +31,12 @@ package revid
import (
"errors"
"fmt"
_ "fmt"
"io"
"os"
"os/exec"
"strconv"
"strings"
"time"
"bitbucket.org/ausocean/av/rtmp"
@ -45,6 +46,7 @@ import (
"bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/utils/smartlogger"
)
// Misc constants
@ -71,6 +73,7 @@ const (
raspividNoOfTries = 3
sendingWaitTime = 5 * time.Millisecond
runContinuously = "0" // -t arg to raspivid
pkg = "revid:"
)
// Log Types
@ -82,6 +85,11 @@ const (
Detail = "Detail"
)
type Logger interface {
SetLevel(int8)
Log(level int8, message string, params ...interface{})
}
// Revid provides methods to control a revid session; providing methods
// to start, stop and change the state of an instance using the Config struct.
type Revid struct {
@ -139,14 +147,14 @@ func (r *Revid) reset(config Config) error {
r.config.Logger = config.Logger
err := config.Validate(r)
if err != nil {
return errors.New("Config struct is bad!: " + err.Error())
return errors.New("Config struct is bad: " + err.Error())
}
r.config = config
if r.destination != nil {
err = r.destination.close()
if err != nil {
r.Log(Error, err.Error())
r.config.Logger.Log(smartlogger.Error, pkg+"could not close destination", "error", err.Error())
}
}
switch r.config.Output {
@ -163,13 +171,13 @@ func (r *Revid) reset(config Config) error {
}
r.destination = s
case Rtmp:
s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimout, rtmpConnectionMaxTries, r.Log)
s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimout, rtmpConnectionMaxTries, r.config.Logger.Log)
if err != nil {
return err
}
r.destination = s
case Http:
r.destination = newHttpSender(r.ns, r.Log)
r.destination = newHttpSender(r.ns, r.config.Logger.Log)
}
switch r.config.Input {
@ -180,10 +188,10 @@ func (r *Revid) reset(config Config) error {
}
switch r.config.InputCodec {
case H264:
r.Log(Info, "using H264 lexer")
r.config.Logger.Log(smartlogger.Info, pkg+"using H264 lexer")
r.lexTo = lex.H264
case Mjpeg:
r.Log(Info, "using MJPEG lexer")
r.config.Logger.Log(smartlogger.Info, pkg+"using MJPEG lexer")
r.lexTo = lex.MJPEG
}
@ -209,11 +217,11 @@ func (r *Revid) reset(config Config) error {
r.getFrame = r.getFrameNoPacketization
return nil
case Mpegts:
r.Log(Info, "Using MPEGTS packetisation")
r.config.Logger.Log(smartlogger.Info, pkg+"using MPEGTS packetisation")
frameRate, _ := strconv.ParseFloat(r.config.FrameRate, 64)
r.encoder = mts.NewEncoder(frameRate)
case Flv:
r.Log(Info, "Using FLV packetisation")
r.config.Logger.Log(smartlogger.Info, pkg+"using FLV packetisation")
frameRate, _ := strconv.Atoi(r.config.FrameRate)
r.encoder = flv.NewEncoder(true, true, frameRate)
}
@ -224,20 +232,6 @@ func (r *Revid) reset(config Config) error {
return nil
}
// Log takes a logtype and message and tries to send this information to the
// logger provided in the revid config - if there is one, otherwise the message
// is sent to stdout
func (r *Revid) Log(logType, m string) {
if r.config.Verbosity != Yes {
return
}
if r.config.Logger != nil {
r.config.Logger.Log("revid", logType, m)
return
}
fmt.Println(logType + ": " + m)
}
// IsRunning returns whether the receiver is running.
func (r *Revid) IsRunning() bool {
return r.isRunning
@ -247,31 +241,31 @@ func (r *Revid) IsRunning() bool {
// and packetising (if theres packetization) to a defined output.
func (r *Revid) Start() {
if r.isRunning {
r.Log(Warning, "Revid.Start() called but revid already running!")
r.config.Logger.Log(smartlogger.Warning, pkg+"revid.Start() called but revid already running")
return
}
r.Log(Info, "Starting Revid")
r.Log(Debug, "Setting up output")
r.config.Logger.Log(smartlogger.Info, pkg+"starting Revid")
r.config.Logger.Log(smartlogger.Debug, pkg+"setting up output")
r.isRunning = true
r.Log(Info, "Starting output routine")
r.config.Logger.Log(smartlogger.Info, pkg+"starting output routine")
go r.outputClips()
r.Log(Info, "Starting clip packing routine")
r.config.Logger.Log(smartlogger.Info, pkg+"starting clip packing routine")
go r.packClips()
r.Log(Info, "Setting up input and receiving content")
r.config.Logger.Log(smartlogger.Info, pkg+"setting up input and receiving content")
go r.setupInput()
}
// Stop halts any processing of video data from a camera or file
func (r *Revid) Stop() {
if !r.isRunning {
r.Log(Warning, "Revid.Stop() called but revid not running!")
r.config.Logger.Log(smartlogger.Warning, pkg+"revid.Stop() called but revid not running")
return
}
r.Log(Info, "Stopping revid!")
r.config.Logger.Log(smartlogger.Info, pkg+"stopping revid")
r.isRunning = false
r.Log(Info, "Killing input proccess!")
r.config.Logger.Log(smartlogger.Info, pkg+"killing input proccess")
// If a cmd process is running, we kill!
if r.cmd != nil && r.cmd.Process != nil {
r.cmd.Process.Kill()
@ -305,16 +299,17 @@ func (r *Revid) packClips() {
case frame := <-r.encoder.Stream():
lenOfFrame := len(frame)
if lenOfFrame > ringBufferElementSize {
r.Log(Warning, fmt.Sprintf("Frame was too big: %v bytes, getting another one!", lenOfFrame))
r.config.Logger.Log(smartlogger.Warning, pkg+"frame was too big", "frame size", lenOfFrame)
frame = r.getFrame()
lenOfFrame = len(frame)
}
_, err := r.ringBuffer.Write(frame)
if err != nil {
if err == ring.ErrDropped {
r.Log(Warning, fmt.Sprintf("dropped %d byte frame", len(frame)))
r.config.Logger.Log(smartlogger.Warning, pkg+"dropped frame", "frame size", len(frame))
} else {
r.Log(Error, err.Error())
r.config.Logger.Log(smartlogger.Error, pkg+"unexpected ringbuffer write error",
"error", err.Error())
}
}
packetCount++
@ -357,51 +352,51 @@ func (r *Revid) outputClips() {
}
bytes += chunk.Len()
r.Log(Detail, "About to send")
r.config.Logger.Log(smartlogger.Debug, pkg+"about to send")
err = r.destination.load(chunk)
if err != nil {
r.Log(Error, "failed to load clip")
r.config.Logger.Log(smartlogger.Error, pkg+"failed to load clip")
}
err = r.destination.send()
if err == nil {
r.Log(Detail, "sent clip")
r.config.Logger.Log(smartlogger.Debug, pkg+"sent clip")
}
if r.isRunning && err != nil && chunk.Len() > 11 {
r.Log(Debug, "Send failed! Trying again")
r.config.Logger.Log(smartlogger.Debug, pkg+"send failed, trying again")
// Try and send again
err = r.destination.send()
r.Log(Error, err.Error())
r.config.Logger.Log(smartlogger.Error, pkg+"destination send error", "error", err.Error())
// if there's still an error we try and reconnect, unless we're stopping
for r.isRunning && err != nil {
r.Log(Debug, "Send failed a again! Trying to reconnect...")
r.config.Logger.Log(smartlogger.Debug, pkg+"send failed a again, trying to reconnect...")
time.Sleep(time.Duration(sendFailedDelay) * time.Millisecond)
r.Log(Error, err.Error())
r.config.Logger.Log(smartlogger.Error, pkg+"send failed with error", "error", err.Error())
if rs, ok := r.destination.(restarter); ok {
r.Log(Debug, fmt.Sprintf("restarting %T session", rs))
r.config.Logger.Log(smartlogger.Debug, pkg+"restarting session", "session", rs)
err = rs.restart()
if err != nil {
// TODO(kortschak): Make this "Fatal" when that exists.
r.Log(Error, "failed to restart rtmp session")
r.config.Logger.Log(smartlogger.Error, pkg+"failed to restart rtmp session", "error", err.Error())
r.isRunning = false
return
}
r.Log(Info, "restarted rtmp session")
r.config.Logger.Log(smartlogger.Info, pkg+"restarted rtmp session")
}
r.Log(Debug, "Trying to send again with new connection...")
r.config.Logger.Log(smartlogger.Debug, pkg+"trying to send again with new connection")
err = r.destination.send()
if err != nil {
r.Log(Error, err.Error())
r.config.Logger.Log(smartlogger.Error, pkg+"send failed with error", "error", err.Error())
}
}
}
r.destination.release()
r.Log(Detail, "Done reading that clip from ringbuffer...")
r.config.Logger.Log(smartlogger.Debug, pkg+"done reading that clip from ringbuffer")
// Log some information regarding bitrate and ring buffer size if it's time
now = time.Now()
@ -409,23 +404,23 @@ func (r *Revid) outputClips() {
if deltaTime > bitrateTime {
// FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate.
r.bitrate = int(float64(bytes*8) / float64(deltaTime/time.Second))
r.Log(Debug, fmt.Sprintf("Bitrate: %v bits/s\n", r.bitrate))
r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.Len()))
r.config.Logger.Log(smartlogger.Debug, pkg+"bitrate (bits/s)", "bitrate", r.bitrate)
r.config.Logger.Log(smartlogger.Debug, pkg+"ring buffer size", "value", r.ringBuffer.Len())
prevTime = now
bytes = 0
}
}
r.Log(Info, "Not outputting clips anymore!")
r.config.Logger.Log(smartlogger.Info, pkg+"not outputting clips anymore")
err := r.destination.close()
if err != nil {
r.Log(Error, "failed to close destination")
r.config.Logger.Log(smartlogger.Error, pkg+"failed to close destination", "error", err.Error())
}
}
// startRaspivid sets up things for input from raspivid i.e. starts
// a raspivid process and pipes it's data output.
func (r *Revid) startRaspivid() error {
r.Log(Info, "Starting raspivid!")
r.config.Logger.Log(smartlogger.Info, pkg+"starting raspivid")
switch r.config.InputCodec {
case H264:
args := []string{
@ -440,6 +435,7 @@ func (r *Revid) startRaspivid() error {
"-ih",
"-g", r.config.IntraRefreshPeriod,
}
if r.config.QuantizationMode == QuantizationOn {
args = append(args, "-qp", r.config.Quantization)
}
@ -449,11 +445,13 @@ func (r *Revid) startRaspivid() error {
if r.config.VerticalFlip == Yes {
args = append(args, "-vf")
}
r.Log(Info, fmt.Sprintf("Starting raspivid with args: %v", args))
r.cmd = exec.Command("raspivid", args...)
// Log all the args and create []string
argsStr := strings.Join(args, " ")
r.config.Logger.Log(smartlogger.Info, pkg+"raspivid args", "raspividArgs", argsStr)
r.cmd = exec.Command("raspivid", argsStr)
case Mjpeg:
// FIXME(saxon): do above in this case too
r.cmd = exec.Command("raspivid",
"-cd", "MJPEG",
"-o", "-",
@ -472,9 +470,9 @@ func (r *Revid) startRaspivid() error {
}
r.inputReader = stdout
go func() {
r.Log(Info, "Reading camera data!")
r.config.Logger.Log(smartlogger.Info, pkg+"reading camera data")
r.lexTo(r.encoder, r.inputReader, 0)
r.Log(Info, "Not trying to read from camera anymore!")
r.config.Logger.Log(smartlogger.Info, pkg+"not trying to read from camera anymore")
}()
return nil
}
@ -489,7 +487,7 @@ func (r *Revid) setupInputForFile() error {
f, err := os.Open(r.config.InputFileName)
if err != nil {
r.Log(Error, err.Error())
r.config.Logger.Log(smartlogger.Error, err.Error())
r.Stop()
return err
}

View File

@ -36,6 +36,7 @@ import (
"bitbucket.org/ausocean/av/rtmp"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/utils/smartlogger"
)
// loadSender is a destination to send a *ring.Chunk to.
@ -102,12 +103,12 @@ func (s *fileSender) close() error {
type httpSender struct {
client *netsender.Sender
log func(lvl, msg string)
log func(lvl int8, msg string, args ...interface{})
chunk *ring.Chunk
}
func newHttpSender(ns *netsender.Sender, log func(lvl, msg string)) *httpSender {
func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender {
return &httpSender{
client: ns,
log: log,
@ -214,14 +215,14 @@ type rtmpSender struct {
url string
timeout uint
retries int
log func(lvl, msg string)
log func(lvl int8, msg string, args ...interface{})
chunk *ring.Chunk
}
var _ restarter = (*rtmpSender)(nil)
func newRtmpSender(url string, timeout uint, retries int, log func(lvl, msg string)) (*rtmpSender, error) {
func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) {
var sess *rtmp.Session
var err error
for n := 0; n < retries; n++ {
@ -230,10 +231,10 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl, msg stri
if err == nil {
break
}
log(Error, err.Error())
log(smartlogger.Error, err.Error())
sess.Close()
if n < retries-1 {
log(Info, "retry rtmp connection")
log(smartlogger.Info, pkg+"retry rtmp connection")
}
}
if err != nil {
@ -276,10 +277,10 @@ func (s *rtmpSender) restart() error {
if err == nil {
break
}
s.log(Error, err.Error())
s.log(smartlogger.Error, err.Error())
s.sess.Close()
if n < s.retries-1 {
s.log(Info, "retry rtmp connection")
s.log(smartlogger.Info, pkg+"retry rtmp connection")
}
}
return err