revid: fixed conflict with master regarding fatal log when raspivid doesn't start successfully

This commit is contained in:
saxon 2019-01-13 14:36:49 +10:30
commit efbc2a1a77
28 changed files with 2210 additions and 2511 deletions

View File

@ -33,11 +33,13 @@ import (
"os"
"runtime/pprof"
"strconv"
"strings"
"time"
"bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/smartlogger"
"bitbucket.org/ausocean/iot/pi/smartlogger"
"bitbucket.org/ausocean/utils/logger"
)
const (
@ -45,7 +47,7 @@ const (
progName = "revid-cli"
// Logging is set to INFO level.
defaultLogVerbosity = smartlogger.Debug
defaultLogVerbosity = logger.Debug
)
// Other misc consts
@ -61,7 +63,7 @@ const (
var canProfile = true
// The logger that will be used throughout
var logger *smartlogger.Logger
var log *logger.Logger
func main() {
useNetsender := flag.Bool("NetSender", false, "Are we checking vars through netsender?")
@ -73,7 +75,7 @@ func main() {
// run revid for the specified duration
rv, _, err := startRevid(nil, cfg)
if err != nil {
cfg.Logger.Log(smartlogger.Fatal, pkg+"failed to start revid", err.Error())
cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", err.Error())
}
time.Sleep(*runDurationPtr)
stopRevid(rv)
@ -82,7 +84,7 @@ func main() {
err := run(nil, cfg)
if err != nil {
logger.Log(smartlogger.Fatal, pkg+"failed to run revid", "error", err.Error())
log.Log(logger.Fatal, pkg+"failed to run revid", "error", err.Error())
os.Exit(1)
}
}
@ -101,44 +103,44 @@ func handleFlags() revid.Config {
output2Ptr = flag.String("Output2", "", "The second output type: Http, Rtmp, File, Udp, Rtp")
rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp")
packetizationPtr = flag.String("Packetization", "", "The method of data packetisation: Flv, Mpegts, None")
quantizationModePtr = flag.String("QuantizationMode", "", "Whether quantization if on or off (variable bitrate): On, Off")
quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)")
verbosityPtr = flag.String("Verbosity", "", "Verbosity: Info, Warning, Error, Fatal")
framesPerClipPtr = flag.String("FramesPerClip", "", "Number of frames per clip sent")
framesPerClipPtr = flag.Uint("FramesPerClip", 0, "Number of frames per clip sent")
rtmpUrlPtr = flag.String("RtmpUrl", "", "Url of rtmp endpoint")
bitratePtr = flag.String("Bitrate", "", "Bitrate of recorded video")
bitratePtr = flag.Uint("Bitrate", 0, "Bitrate of recorded video")
outputFileNamePtr = flag.String("OutputFileName", "", "The directory of the output file")
inputFileNamePtr = flag.String("InputFileName", "", "The directory of the input file")
heightPtr = flag.String("Height", "", "Height in pixels")
widthPtr = flag.String("Width", "", "Width in pixels")
frameRatePtr = flag.String("FrameRate", "", "Frame rate of captured video")
heightPtr = flag.Uint("Height", 0, "Height in pixels")
widthPtr = flag.Uint("Width", 0, "Width in pixels")
frameRatePtr = flag.Uint("FrameRate", 0, "Frame rate of captured video")
httpAddressPtr = flag.String("HttpAddress", "", "Destination address of http posts")
quantizationPtr = flag.String("Quantization", "", "Desired quantization value: 0-40")
timeoutPtr = flag.String("Timeout", "", "Http timeout in seconds")
intraRefreshPeriodPtr = flag.String("IntraRefreshPeriod", "", "The IntraRefreshPeriod i.e. how many keyframes we send")
verticalFlipPtr = flag.String("VerticalFlip", "", "Flip video vertically: Yes, No")
horizontalFlipPtr = flag.String("HorizontalFlip", "", "Flip video horizontally: Yes, No")
logPathPtr = flag.String("LogPath", defaultLogPath, "Path for logging files (default is /var/log/netsender/)")
quantizationPtr = flag.Uint("Quantization", 0, "Desired quantization value: 0-40")
intraRefreshPeriodPtr = flag.Uint("IntraRefreshPeriod", 0, "The IntraRefreshPeriod i.e. how many keyframes we send")
verticalFlipPtr = flag.Bool("VerticalFlip", false, "Flip video vertically: Yes, No")
horizontalFlipPtr = flag.Bool("HorizontalFlip", false, "Flip video horizontally: Yes, No")
rtpAddrPtr = flag.String("RtpAddr", "", "Rtp destination address: <IP>:<port> (port is generally 6970-6999)")
logPathPtr = flag.String("LogPath", defaultLogPath, "The log path")
configFilePtr = flag.String("ConfigFile", "", "NetSender config file")
)
flag.Parse()
logger = smartlogger.New(defaultLogVerbosity, *logPathPtr)
log = logger.New(defaultLogVerbosity, &smartlogger.New(*logPathPtr).LogRoller)
cfg.Logger = logger
cfg.Logger = log
if *cpuprofile != "" {
if canProfile {
f, err := os.Create(*cpuprofile)
if err != nil {
logger.Log(smartlogger.Fatal, pkg+"could not create CPU profile", "error", err.Error())
log.Log(logger.Fatal, pkg+"could not create CPU profile", "error", err.Error())
}
if err := pprof.StartCPUProfile(f); err != nil {
logger.Log(smartlogger.Fatal, pkg+"could not start CPU profile", "error", err.Error())
log.Log(logger.Fatal, pkg+"could not start CPU profile", "error", err.Error())
}
defer pprof.StopCPUProfile()
} else {
logger.Log(smartlogger.Warning, pkg+"ignoring cpuprofile flag - http/pprof built in.")
log.Log(logger.Warning, pkg+"ignoring cpuprofile flag - http/pprof built in.")
}
}
@ -149,7 +151,7 @@ func handleFlags() revid.Config {
cfg.Input = revid.File
case "":
default:
logger.Log(smartlogger.Error, pkg+"bad input argument")
log.Log(logger.Error, pkg+"bad input argument")
}
switch *inputCodecPtr {
@ -157,7 +159,7 @@ func handleFlags() revid.Config {
cfg.InputCodec = revid.H264
case "":
default:
logger.Log(smartlogger.Error, pkg+"bad input codec argument")
log.Log(logger.Error, pkg+"bad input codec argument")
}
switch *output1Ptr {
@ -175,7 +177,7 @@ func handleFlags() revid.Config {
cfg.Output1 = revid.Rtp
case "":
default:
logger.Log(smartlogger.Error, pkg+"bad output 1 argument")
log.Log(logger.Error, pkg+"bad output 1 argument")
}
switch *output2Ptr {
@ -193,7 +195,7 @@ func handleFlags() revid.Config {
cfg.Output2 = revid.Rtp
case "":
default:
logger.Log(smartlogger.Error, pkg+"bad output 2 argument")
log.Log(logger.Error, pkg+"bad output 2 argument")
}
switch *rtmpMethodPtr {
@ -203,68 +205,39 @@ func handleFlags() revid.Config {
cfg.RtmpMethod = revid.LibRtmp
case "":
default:
logger.Log(smartlogger.Error, pkg+"bad rtmp method argument")
log.Log(logger.Error, pkg+"bad rtmp method argument")
}
switch *packetizationPtr {
case "None":
case "", "None":
cfg.Packetization = revid.None
case "Mpegts":
cfg.Packetization = revid.Mpegts
case "Flv":
cfg.Packetization = revid.Flv
case "":
default:
logger.Log(smartlogger.Error, pkg+"bad packetization argument")
}
switch *quantizationModePtr {
case "QuantizationOn":
cfg.QuantizationMode = revid.QuantizationOn
case "QuantizationOff":
cfg.QuantizationMode = revid.QuantizationOff
case "":
default:
logger.Log(smartlogger.Error, pkg+"bad quantization mode argument")
log.Log(logger.Error, pkg+"bad packetization argument")
}
switch *verbosityPtr {
case "No":
cfg.LogLevel = smartlogger.Fatal
cfg.LogLevel = logger.Fatal
case "Debug":
cfg.LogLevel = smartlogger.Debug
//logger.SetLevel(smartlogger.Debug)
cfg.LogLevel = logger.Debug
//logger.SetLevel(logger.Debug)
case "":
default:
logger.Log(smartlogger.Error, pkg+"bad verbosity argument")
log.Log(logger.Error, pkg+"bad verbosity argument")
}
switch *horizontalFlipPtr {
case "No":
cfg.FlipHorizontal = false
case "Yes":
cfg.FlipHorizontal = true
case "":
cfg.FlipHorizontal = false
default:
logger.Log(smartlogger.Error, pkg+"bad horizontal flip option")
if *configFilePtr != "" {
netsender.ConfigFile = *configFilePtr
}
switch *verticalFlipPtr {
case "No":
cfg.FlipVertical = false
case "Yes":
cfg.FlipVertical = true
case "":
cfg.FlipVertical = false
default:
logger.Log(smartlogger.Error, pkg+"bad vertical flip option")
}
fpc, err := strconv.Atoi(*framesPerClipPtr)
if err == nil && fpc > 0 {
cfg.FramesPerClip = fpc
}
cfg.Quantize = *quantizePtr
cfg.FlipHorizontal = *horizontalFlipPtr
cfg.FlipVertical = *verticalFlipPtr
cfg.FramesPerClip = *framesPerClipPtr
cfg.RtmpUrl = *rtmpUrlPtr
cfg.Bitrate = *bitratePtr
cfg.OutputFileName = *outputFileNamePtr
@ -274,7 +247,6 @@ func handleFlags() revid.Config {
cfg.FrameRate = *frameRatePtr
cfg.HttpAddress = *httpAddressPtr
cfg.Quantization = *quantizationPtr
cfg.Timeout = *timeoutPtr
cfg.IntraRefreshPeriod = *intraRefreshPeriodPtr
cfg.RtpAddress = *rtpAddrPtr
@ -285,10 +257,10 @@ func handleFlags() revid.Config {
func run(rv *revid.Revid, cfg revid.Config) error {
// initialize NetSender and use NetSender's logger
//config.Logger = netsender.Logger()
logger.Log(smartlogger.Info, pkg+"running in NetSender mode")
log.Log(logger.Info, pkg+"running in NetSender mode")
var ns netsender.Sender
err := ns.Init(logger, nil, nil, nil)
err := ns.Init(log, nil, nil, nil)
if err != nil {
return err
}
@ -307,7 +279,7 @@ func run(rv *revid.Revid, cfg revid.Config) error {
for {
if err := send(&ns, rv); err != nil {
logger.Log(smartlogger.Error, pkg+"polling failed", "error", err.Error())
log.Log(logger.Error, pkg+"polling failed", "error", err.Error())
time.Sleep(netSendRetryTime)
continue
}
@ -316,14 +288,14 @@ func run(rv *revid.Revid, cfg revid.Config) error {
// vars changed
vars, err := ns.Vars()
if err != nil {
logger.Log(smartlogger.Error, pkg+"netSender failed to get vars", "error", err.Error())
log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error())
time.Sleep(netSendRetryTime)
continue
}
vs = ns.VarSum()
if vars["mode"] == "Paused" {
if !paused {
logger.Log(smartlogger.Info, pkg+"pausing revid")
log.Log(logger.Info, pkg+"pausing revid")
stopRevid(rv)
paused = true
}
@ -402,86 +374,83 @@ func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars m
case "FfmpegRtmp":
cfg.Output1 = revid.FfmpegRtmp
default:
logger.Log(smartlogger.Warning, pkg+"invalid Output1 param", "value", value)
log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value)
continue
}
case "FramesPerClip":
fpc, err := strconv.Atoi(value)
if fpc > 0 && err == nil {
cfg.FramesPerClip = fpc
} else {
logger.Log(smartlogger.Warning, pkg+"invalid FramesPerClip param", "value", value)
f, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value)
break
}
cfg.FramesPerClip = uint(f)
case "RtmpUrl":
cfg.RtmpUrl = value
case "Bitrate":
asInt, err := strconv.Atoi(value)
if asInt > 0 && err == nil {
cfg.Bitrate = value
} else {
logger.Log(smartlogger.Warning, pkg+"invalid Bitrate param", "value", value)
r, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
break
}
cfg.Bitrate = uint(r)
case "OutputFileName":
cfg.OutputFileName = value
case "InputFileName":
cfg.InputFileName = value
case "Height":
asInt, err := strconv.Atoi(value)
if asInt > 0 && err == nil {
cfg.Height = value
} else {
logger.Log(smartlogger.Warning, pkg+"invalid Height param", "value", value)
h, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid height param", "value", value)
break
}
cfg.Height = uint(h)
case "Width":
asInt, err := strconv.Atoi(value)
if asInt > 0 && err == nil {
cfg.Width = value
} else {
logger.Log(smartlogger.Warning, pkg+"invalid Width param", "value", value)
w, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid width param", "value", value)
break
}
cfg.Width = uint(w)
case "FrameRate":
asInt, err := strconv.Atoi(value)
if asInt > 0 && err == nil {
cfg.FrameRate = value
} else {
logger.Log(smartlogger.Warning, pkg+"invalid FrameRate param", "value", value)
r, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
break
}
cfg.FrameRate = uint(r)
case "HttpAddress":
cfg.HttpAddress = value
case "Quantization":
asInt, err := strconv.Atoi(value)
if asInt > 0 && err == nil {
cfg.Quantization = value
} else {
logger.Log(smartlogger.Warning, pkg+"invalid Quantization param", "value", value)
}
case "Timeout":
asInt, err := strconv.Atoi(value)
if asInt > 0 && err == nil {
cfg.Timeout = value
q, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid quantization param", "value", value)
break
}
cfg.Quantization = uint(q)
case "IntraRefreshPeriod":
asInt, err := strconv.Atoi(value)
if asInt > 0 && err == nil {
cfg.IntraRefreshPeriod = value
p, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value)
break
}
cfg.IntraRefreshPeriod = uint(p)
case "HorizontalFlip":
switch value {
case "Yes":
switch strings.ToLower(value) {
case "true":
cfg.FlipHorizontal = true
case "No":
case "false":
cfg.FlipHorizontal = false
default:
logger.Log(smartlogger.Warning, pkg+"invalid HorizontalFlip param", "value", value)
log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value)
}
case "VerticalFlip":
switch value {
case "Yes":
switch strings.ToLower(value) {
case "true":
cfg.FlipVertical = true
case "No":
case "false":
cfg.FlipVertical = false
default:
logger.Log(smartlogger.Warning, pkg+"invalid VerticalFlip param", "value", value)
log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value)
}
default:
}

View File

@ -33,7 +33,8 @@ import (
"time"
"bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/utils/smartlogger"
"bitbucket.org/ausocean/iot/pi/smartlogger"
"bitbucket.org/ausocean/utils/logger"
)
const (
@ -61,11 +62,11 @@ func main() {
RtmpMethod: revid.LibRtmp,
RtmpUrl: *rtmpUrlPtr,
Packetization: revid.Flv,
Logger: smartlogger.New(smartlogger.Info, logPath),
Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller),
}
revidInst, err := revid.New(config, nil)
if err != nil {
config.Logger.Log(smartlogger.Error, "Should not have got an error!: ", err.Error())
config.Logger.Log(logger.Error, "Should not have got an error!: ", err.Error())
return
}
revidInst.Start()

View File

@ -31,7 +31,8 @@ import (
"time"
"bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/utils/smartlogger"
"bitbucket.org/ausocean/iot/pi/smartlogger"
"bitbucket.org/ausocean/utils/logger"
)
const (
@ -52,11 +53,11 @@ func main() {
Output1: revid.File,
OutputFileName: outputFile,
Packetization: revid.Mpegts,
Logger: smartlogger.New(smartlogger.Info, logPath),
Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller),
}
revidInst, err := revid.New(config, nil)
if err != nil {
config.Logger.Log(smartlogger.Error, "Should not have got an error!:", err.Error())
config.Logger.Log(logger.Error, "Should not have got an error!:", err.Error())
return
}
revidInst.Start()

View File

@ -29,40 +29,43 @@ package revid
import (
"errors"
"strconv"
"bitbucket.org/ausocean/utils/smartlogger"
"bitbucket.org/ausocean/utils/logger"
)
// Config provides parameters relevant to a revid instance. A new config must
// be passed to the constructor.
type Config struct {
LogLevel int8
Input uint8
InputCodec uint8
Output1 uint8
Output2 uint8
RtmpMethod uint8
Packetization uint8
QuantizationMode uint8
LogLevel int8
// Quantize specifies whether the input to
// revid will have constant or variable
// bitrate.
Quantize bool
// FlipHorizonatla and FlipVertical specify
// whether video frames should be flipped.
FlipHorizontal bool
FlipVertical bool
FramesPerClip int
FramesPerClip uint
RtmpUrl string
Bitrate string
Bitrate uint
OutputFileName string
InputFileName string
Height string
Width string
FrameRate string
Height uint
Width uint
FrameRate uint
HttpAddress string
Quantization string
Timeout string
IntraRefreshPeriod string
Quantization uint
IntraRefreshPeriod uint
RtpAddress string
Logger Logger
SendRetry bool
@ -98,20 +101,18 @@ const (
defaultInput = Raspivid
defaultOutput = Http
defaultPacketization = Flv
defaultFrameRate = "25"
defaultWidth = "1280"
defaultHeight = "720"
defaultIntraRefreshPeriod = "100"
defaultTimeout = "0"
defaultQuantization = "40"
defaultBitrate = "400000"
defaultFrameRate = 25
defaultWidth = 1280
defaultHeight = 720
defaultIntraRefreshPeriod = 100
defaultTimeout = 0
defaultQuantization = 40
defaultBitrate = 400000
defaultQuantizationMode = QuantizationOff
defaultFramesPerClip = 1
defaultVerticalFlip = No
defaultHorizontalFlip = No
httpFramesPerClip = 560
defaultInputCodec = H264
defaultVerbosity = No
defaultVerbosity = No // FIXME(kortschak): This makes no sense whatsoever. No is currently 15.
defaultRtpAddr = "localhost:6970"
)
@ -123,28 +124,17 @@ func (c *Config) Validate(r *Revid) error {
case No:
case NothingDefined:
c.LogLevel = defaultVerbosity
c.Logger.Log(smartlogger.Warning, pkg+"no LogLevel mode defined, defaulting",
c.Logger.Log(logger.Warning, pkg+"no LogLevel mode defined, defaulting",
"LogLevel", defaultVerbosity)
default:
return errors.New("bad LogLevel defined in config")
}
switch c.QuantizationMode {
case QuantizationOn:
case QuantizationOff:
case NothingDefined:
c.Logger.Log(smartlogger.Warning, pkg+"no quantization mode defined, defaulting",
"quantizationMode", QuantizationOff)
c.QuantizationMode = QuantizationOff
default:
return errors.New("bad QuantizationMode defined in config")
}
switch c.Input {
case Raspivid:
case File:
case NothingDefined:
c.Logger.Log(smartlogger.Warning, pkg+"no input type defined, defaulting", "input",
c.Logger.Log(logger.Warning, pkg+"no input type defined, defaulting", "input",
defaultInput)
c.Input = defaultInput
default:
@ -153,36 +143,31 @@ func (c *Config) Validate(r *Revid) error {
switch c.InputCodec {
case H264:
if c.Bitrate != "" && c.Quantization != "" {
bitrate, err := strconv.Atoi(c.Bitrate)
if err != nil {
return errors.New("bitrate not an integer")
// FIXME(kortschak): This is not really what we want.
// Configuration really needs to be rethought here.
if c.Quantize && c.Quantization == 0 {
c.Quantization = defaultQuantization
} else {
c.Bitrate = defaultBitrate
}
quantization, err := strconv.Atoi(c.Quantization)
if err != nil {
return errors.New("quantization not an integer")
}
if (bitrate > 0 && quantization > 0) || (bitrate == 0 && quantization == 0) {
if (c.Bitrate > 0 && c.Quantization > 0) || (c.Bitrate == 0 && c.Quantization == 0) {
return errors.New("bad bitrate and quantization combination for H264 input")
}
}
case Mjpeg:
if c.Quantization != "" {
quantization, err := strconv.Atoi(c.Quantization)
if err != nil {
return errors.New("quantization not an integer")
}
if quantization > 0 || c.Bitrate == "" {
if c.Quantization > 0 || c.Bitrate == 0 {
return errors.New("bad bitrate or quantization for mjpeg input")
}
}
case NothingDefined:
c.Logger.Log(smartlogger.Warning, pkg+"no input codec defined, defaulting",
c.Logger.Log(logger.Warning, pkg+"no input codec defined, defaulting",
"inputCodec", defaultInputCodec)
c.InputCodec = defaultInputCodec
c.Logger.Log(smartlogger.Warning, pkg+"defaulting quantization", "quantization",
c.Logger.Log(logger.Warning, pkg+"defaulting quantization", "quantization",
defaultQuantization)
c.Quantization = defaultQuantization
default:
return errors.New("bad input codec defined in config")
}
@ -192,20 +177,20 @@ func (c *Config) Validate(r *Revid) error {
case Udp:
case Rtmp, FfmpegRtmp:
if c.RtmpUrl == "" {
c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP")
c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP")
c.Output1 = Http
break
}
c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for rtmp out",
c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out",
"framesPerClip", defaultFramesPerClip)
c.FramesPerClip = defaultFramesPerClip
case NothingDefined:
c.Logger.Log(smartlogger.Warning, pkg+"no output defined, defaulting", "output",
c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output",
defaultOutput)
c.Output1 = defaultOutput
fallthrough
case Http, Rtp:
c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for http out",
c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out",
"framesPerClip", httpFramesPerClip)
c.FramesPerClip = httpFramesPerClip
default:
@ -218,7 +203,7 @@ func (c *Config) Validate(r *Revid) error {
case Udp:
case Rtmp, FfmpegRtmp:
if c.RtmpUrl == "" {
c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP")
c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP")
c.Output2 = Http
break
}
@ -229,78 +214,41 @@ func (c *Config) Validate(r *Revid) error {
}
if c.FramesPerClip < 1 {
c.Logger.Log(smartlogger.Warning, pkg+"no FramesPerClip defined, defaulting",
c.Logger.Log(logger.Warning, pkg+"no FramesPerClip defined, defaulting",
"framesPerClip", defaultFramesPerClip)
c.FramesPerClip = defaultFramesPerClip
}
if c.Width == "" {
c.Logger.Log(smartlogger.Warning, pkg+"no width defined, defaulting", "width",
defaultWidth)
if c.Width == 0 {
c.Logger.Log(logger.Warning, pkg+"no width defined, defaulting", "width", defaultWidth)
c.Width = defaultWidth
} else {
if integer, err := strconv.Atoi(c.Width); integer < 0 || err != nil {
return errors.New("width not unsigned integer")
}
}
if c.Height == "" {
c.Logger.Log(smartlogger.Warning, pkg+"no height defined, defaulting", "height",
defaultHeight)
if c.Height == 0 {
c.Logger.Log(logger.Warning, pkg+"no height defined, defaulting", "height", defaultHeight)
c.Height = defaultHeight
} else {
if integer, err := strconv.Atoi(c.Height); integer < 0 || err != nil {
return errors.New("height not unsigned integer")
}
}
if c.FrameRate == "" {
c.Logger.Log(smartlogger.Warning, pkg+"no frame rate defined, defaulting", "fps",
defaultFrameRate)
if c.FrameRate == 0 {
c.Logger.Log(logger.Warning, pkg+"no frame rate defined, defaulting", "fps", defaultFrameRate)
c.FrameRate = defaultFrameRate
} else {
if integer, err := strconv.Atoi(c.FrameRate); integer < 0 || err != nil {
return errors.New("frame rate not unsigned integer")
}
}
if c.Bitrate == "" {
c.Logger.Log(smartlogger.Warning, pkg+"no bitrate defined, defaulting", "bitrate",
defaultBitrate)
if c.Bitrate == 0 {
c.Logger.Log(logger.Warning, pkg+"no bitrate defined, defaulting", "bitrate", defaultBitrate)
c.Bitrate = defaultBitrate
} else {
if integer, err := strconv.Atoi(c.Bitrate); integer < 0 || err != nil {
return errors.New("bitrate not unsigned integer")
}
}
if c.Timeout == "" {
c.Logger.Log(smartlogger.Warning, pkg+"no timeout defined, defaulting", "timeout", defaultTimeout)
c.Timeout = defaultTimeout
} else {
if integer, err := strconv.Atoi(c.Timeout); integer < 0 || err != nil {
return errors.New("timeout not unsigned integer")
}
}
if c.IntraRefreshPeriod == "" {
c.Logger.Log(smartlogger.Warning, pkg+"no intra refresh defined, defaulting", "intraRefresh",
defaultIntraRefreshPeriod)
if c.IntraRefreshPeriod == 0 {
c.Logger.Log(logger.Warning, pkg+"no intra refresh defined, defaulting", "intraRefresh", defaultIntraRefreshPeriod)
c.IntraRefreshPeriod = defaultIntraRefreshPeriod
} else {
if integer, err := strconv.Atoi(c.IntraRefreshPeriod); integer < 0 || err != nil {
return errors.New("intra refresh not unsigned integer")
}
}
if c.Quantization == "" {
c.Logger.Log(smartlogger.Warning, pkg+"no quantization defined, defaulting", "quantization",
defaultQuantization)
if c.Quantization == 0 {
c.Logger.Log(logger.Warning, pkg+"no quantization defined, defaulting", "quantization", defaultQuantization)
c.Quantization = defaultQuantization
} else {
if integer, err := strconv.Atoi(c.Quantization); integer < 0 || integer > 51 || err != nil {
return errors.New("quantisation not unsigned integer or is over threshold")
}
} else if c.Quantization > 51 {
return errors.New("quantisation is over threshold")
}
if c.RtpAddress == "" {

View File

@ -44,8 +44,8 @@ import (
"bitbucket.org/ausocean/av/stream/lex"
"bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/utils/smartlogger"
)
// Misc constants
@ -125,15 +125,12 @@ type Revid struct {
isRunning bool
}
var now = time.Now()
var prevTime = now
// packer takes data segments and packs them into clips
// of the number frames specified in the owners config.
type packer struct {
owner *Revid
packetCount int
lastTime time.Time
packetCount uint
}
// Write implements the io.Writer interface.
@ -143,24 +140,24 @@ type packer struct {
// write may include a dropped frame.
func (p *packer) Write(frame []byte) (int, error) {
if len(frame) > ringBufferElementSize {
p.owner.config.Logger.Log(smartlogger.Warning, pkg+"frame was too big", "frame size", len(frame))
p.owner.config.Logger.Log(logger.Warning, pkg+"frame was too big", "frame size", len(frame))
return len(frame), nil
}
n, err := p.owner.buffer.Write(frame)
if err != nil {
if err == ring.ErrDropped {
p.owner.config.Logger.Log(smartlogger.Warning, pkg+"dropped frame", "frame size", len(frame))
p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame))
return len(frame), nil
}
p.owner.config.Logger.Log(smartlogger.Error, pkg+"unexpected ring buffer write error", "error", err.Error())
p.owner.config.Logger.Log(logger.Error, pkg+"unexpected ring buffer write error", "error", err.Error())
return n, err
}
p.packetCount++
now = time.Now()
if now.Sub(prevTime) > clipDuration && p.packetCount%7 == 0 {
now := time.Now()
if (p.owner.config.Output1 != Rtmp && now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) || p.owner.config.Output1 == Rtmp {
p.owner.buffer.Flush()
p.packetCount = 0
prevTime = now
p.lastTime = now
}
return len(frame), nil
}
@ -227,7 +224,7 @@ func (r *Revid) reset(config Config) error {
}
r.destination[outNo] = s
case FfmpegRtmp:
s, err := newFfmpegSender(config.RtmpUrl, r.config.FrameRate)
s, err := newFfmpegSender(config.RtmpUrl, fmt.Sprint(r.config.FrameRate))
if err != nil {
return err
}
@ -247,10 +244,7 @@ func (r *Revid) reset(config Config) error {
}
r.destination[outNo] = s
case Rtp:
// TODO: framerate in config should probably be an int, make conversions early
// when setting config fields in revid-cli
fps, _ := strconv.Atoi(r.config.FrameRate)
s, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, fps)
s, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
if err != nil {
return err
}
@ -266,10 +260,10 @@ func (r *Revid) reset(config Config) error {
}
switch r.config.InputCodec {
case H264:
r.config.Logger.Log(smartlogger.Info, pkg+"using H264 lexer")
r.config.Logger.Log(logger.Info, pkg+"using H264 lexer")
r.lexTo = lex.H264
case Mjpeg:
r.config.Logger.Log(smartlogger.Info, pkg+"using MJPEG lexer")
r.config.Logger.Log(logger.Info, pkg+"using MJPEG lexer")
r.lexTo = lex.MJPEG
}
@ -291,15 +285,13 @@ func (r *Revid) reset(config Config) error {
}
r.encoder = stream.NopEncoder(&r.packer)
case Mpegts:
r.config.Logger.Log(smartlogger.Info, pkg+"using MPEGTS packetisation")
frameRate, _ := strconv.ParseFloat(r.config.FrameRate, 64)
r.encoder = mts.NewEncoder(&r.packer, frameRate)
r.config.Logger.Log(logger.Info, pkg+"using MPEGTS packetisation")
r.encoder = mts.NewEncoder(&r.packer, float64(r.config.FrameRate))
case Flv:
r.config.Logger.Log(smartlogger.Info, pkg+"using FLV packetisation")
frameRate, _ := strconv.Atoi(r.config.FrameRate)
r.encoder, err = flv.NewEncoder(&r.packer, true, true, frameRate)
r.config.Logger.Log(logger.Info, pkg+"using FLV packetisation")
r.encoder, err = flv.NewEncoder(&r.packer, true, true, int(r.config.FrameRate))
if err != nil {
return err
r.config.Logger.Log(logger.Fatal, pkg+"failed to open FLV encoder", err.Error())
}
}
@ -315,29 +307,29 @@ func (r *Revid) IsRunning() bool {
// and packetising (if theres packetization) to a defined output.
func (r *Revid) Start() {
if r.isRunning {
r.config.Logger.Log(smartlogger.Warning, pkg+"revid.Start() called but revid already running")
r.config.Logger.Log(logger.Warning, pkg+"revid.Start() called but revid already running")
return
}
r.config.Logger.Log(smartlogger.Info, pkg+"starting Revid")
r.config.Logger.Log(smartlogger.Debug, pkg+"setting up output")
r.config.Logger.Log(logger.Info, pkg+"starting Revid")
r.config.Logger.Log(logger.Debug, pkg+"setting up output")
r.isRunning = true
r.config.Logger.Log(smartlogger.Info, pkg+"starting output routine")
r.config.Logger.Log(logger.Info, pkg+"starting output routine")
go r.outputClips()
r.config.Logger.Log(smartlogger.Info, pkg+"setting up input and receiving content")
r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content")
go r.setupInput()
}
// Stop halts any processing of video data from a camera or file
func (r *Revid) Stop() {
if !r.isRunning {
r.config.Logger.Log(smartlogger.Warning, pkg+"revid.Stop() called but revid not running")
r.config.Logger.Log(logger.Warning, pkg+"revid.Stop() called but revid not running")
return
}
r.config.Logger.Log(smartlogger.Info, pkg+"stopping revid")
r.config.Logger.Log(logger.Info, pkg+"stopping revid")
r.isRunning = false
r.config.Logger.Log(smartlogger.Info, pkg+"killing input proccess")
r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
// If a cmd process is running, we kill!
if r.cmd != nil && r.cmd.Process != nil {
r.cmd.Process.Kill()
@ -357,53 +349,53 @@ loop:
case nil:
// Do nothing.
case ring.ErrTimeout:
r.config.Logger.Log(smartlogger.Warning, pkg+"ring buffer read timeout")
r.config.Logger.Log(logger.Warning, pkg+"ring buffer read timeout")
continue
default:
r.config.Logger.Log(smartlogger.Error, pkg+"unexpected error", "error", err.Error())
r.config.Logger.Log(logger.Error, pkg+"unexpected error", "error", err.Error())
fallthrough
case io.EOF:
break loop
}
count += chunk.Len()
r.config.Logger.Log(smartlogger.Debug, pkg+"about to send")
r.config.Logger.Log(logger.Debug, pkg+"about to send")
for i, dest := range r.destination {
err = dest.load(chunk)
if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"failed to load clip to output"+strconv.Itoa(i))
r.config.Logger.Log(logger.Error, pkg+"failed to load clip to output"+strconv.Itoa(i))
}
}
for i, dest := range r.destination {
err = dest.send()
if err == nil {
r.config.Logger.Log(smartlogger.Debug, pkg+"sent clip to output "+strconv.Itoa(i))
r.config.Logger.Log(logger.Debug, pkg+"sent clip to output "+strconv.Itoa(i))
} else if r.config.SendRetry == false {
r.config.Logger.Log(smartlogger.Warning, pkg+"send to output "+strconv.Itoa(i)+"failed", "error", err.Error())
r.config.Logger.Log(logger.Warning, pkg+"send to output "+strconv.Itoa(i)+"failed", "error", err.Error())
} else {
r.config.Logger.Log(smartlogger.Error, pkg+"send to output "+strconv.Itoa(i)+
r.config.Logger.Log(logger.Error, pkg+"send to output "+strconv.Itoa(i)+
"failed, trying again", "error", err.Error())
err = dest.send()
if err != nil && chunk.Len() > 11 {
r.config.Logger.Log(smartlogger.Error, pkg+"second send attempted failed, restarting connection", "error", err.Error())
r.config.Logger.Log(logger.Error, pkg+"second send attempted failed, restarting connection", "error", err.Error())
for err != nil {
time.Sleep(sendFailedDelay)
if rs, ok := dest.(restarter); ok {
r.config.Logger.Log(smartlogger.Debug, pkg+"restarting session", "session", rs)
r.config.Logger.Log(logger.Debug, pkg+"restarting session", "session", rs)
err = rs.restart()
if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"failed to restart rtmp session", "error", err.Error())
r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error())
r.isRunning = false
return
}
r.config.Logger.Log(smartlogger.Info, pkg+"restarted rtmp session")
r.config.Logger.Log(logger.Info, pkg+"restarted rtmp session")
}
err = dest.send()
if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"send failed again, with error", "error", err.Error())
r.config.Logger.Log(logger.Error, pkg+"send failed again, with error", "error", err.Error())
}
}
}
@ -414,7 +406,7 @@ loop:
for _, dest := range r.destination {
dest.release()
}
r.config.Logger.Log(smartlogger.Debug, pkg+"done reading that clip from ring buffer")
r.config.Logger.Log(logger.Debug, pkg+"done reading that clip from ring buffer")
// Log some information regarding bitrate and ring buffer size if it's time
now := time.Now()
@ -422,17 +414,17 @@ loop:
if deltaTime > bitrateTime {
// FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate.
r.bitrate = int(float64(count*8) / float64(deltaTime/time.Second))
r.config.Logger.Log(smartlogger.Debug, pkg+"bitrate (bits/s)", "bitrate", r.bitrate)
r.config.Logger.Log(smartlogger.Debug, pkg+"ring buffer size", "value", r.buffer.Len())
r.config.Logger.Log(logger.Debug, pkg+"bitrate (bits/s)", "bitrate", r.bitrate)
r.config.Logger.Log(logger.Debug, pkg+"ring buffer size", "value", r.buffer.Len())
lastTime = now
count = 0
}
}
r.config.Logger.Log(smartlogger.Info, pkg+"not outputting clips anymore")
r.config.Logger.Log(logger.Info, pkg+"not outputting clips anymore")
for i, dest := range r.destination {
err := dest.close()
if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"failed to close output"+strconv.Itoa(i)+" destination", "error", err.Error())
r.config.Logger.Log(logger.Error, pkg+"failed to close output"+strconv.Itoa(i)+" destination", "error", err.Error())
}
}
}
@ -440,17 +432,17 @@ loop:
// startRaspivid sets up things for input from raspivid i.e. starts
// a raspivid process and pipes it's data output.
func (r *Revid) startRaspivid() error {
r.config.Logger.Log(smartlogger.Info, pkg+"starting raspivid")
r.config.Logger.Log(logger.Info, pkg+"starting raspivid")
const disabled = "0"
args := []string{
"--output", "-",
"--nopreview",
"--timeout", disabled,
"--width", r.config.Width,
"--height", r.config.Height,
"--bitrate", r.config.Bitrate,
"--framerate", r.config.FrameRate,
"--width", fmt.Sprint(r.config.Width),
"--height", fmt.Sprint(r.config.Height),
"--bitrate", fmt.Sprint(r.config.Bitrate),
"--framerate", fmt.Sprint(r.config.FrameRate),
}
if r.config.FlipHorizontal {
args = append(args, "--hflip")
@ -465,48 +457,39 @@ func (r *Revid) startRaspivid() error {
args = append(args,
"--codec", "H264",
"--inline",
"--intra", r.config.IntraRefreshPeriod,
"--intra", fmt.Sprint(r.config.IntraRefreshPeriod),
)
if r.config.QuantizationMode == QuantizationOn {
args = append(args, "-qp", r.config.Quantization)
if r.config.Quantize {
args = append(args, "-qp", fmt.Sprint(r.config.Quantization))
}
case Mjpeg:
args = append(args, "--codec", "MJPEG")
}
r.config.Logger.Log(smartlogger.Info, pkg+"raspivid args", "raspividArgs", strings.Join(args, " "))
r.config.Logger.Log(logger.Info, pkg+"raspivid args", "raspividArgs", strings.Join(args, " "))
r.cmd = exec.Command("raspivid", args...)
d, err := strconv.Atoi(r.config.FrameRate)
if err != nil {
return err
}
delay := time.Second / time.Duration(d)
stdout, err := r.cmd.StdoutPipe()
if err != nil {
return err
}
err = r.cmd.Start()
if err != nil {
return err
r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error())
}
r.config.Logger.Log(smartlogger.Info, pkg+"reading camera data")
r.config.Logger.Log(logger.Info, pkg+"reading camera data")
delay := time.Second / time.Duration(r.config.FrameRate)
err = r.lexTo(r.encoder, stdout, delay)
r.config.Logger.Log(smartlogger.Info, pkg+"finished reading camera data")
r.config.Logger.Log(logger.Info, pkg+"finished reading camera data")
return err
}
// setupInputForFile sets things up for getting input from a file
func (r *Revid) setupInputForFile() error {
fps, err := strconv.Atoi(r.config.FrameRate)
if err != nil {
return err
}
delay := time.Second / time.Duration(fps)
delay := time.Second / time.Duration(r.config.FrameRate)
f, err := os.Open(r.config.InputFileName)
if err != nil {
r.config.Logger.Log(smartlogger.Error, err.Error())
r.config.Logger.Log(logger.Error, err.Error())
r.Stop()
return err
}

View File

@ -39,8 +39,8 @@ import (
"bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/av/stream/rtp"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/utils/smartlogger"
)
// loadSender is a destination to send a *ring.Chunk to.
@ -144,14 +144,16 @@ func (s *httpSender) send() error {
break
}
}
if !send {
return nil
}
var err error
var reply string
if send {
reply, _, err = s.client.Send(netsender.RequestRecv, pins)
if err != nil {
return err
}
}
return s.extractMeta(reply)
}
@ -166,19 +168,19 @@ func (s *httpSender) extractMeta(r string) error {
// Extract time from reply
t, err := dec.Int("ts")
if err != nil {
s.log(smartlogger.Warning, pkg+"No timestamp in reply")
s.log(logger.Warning, pkg+"No timestamp in reply")
} else {
s.log(smartlogger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t))
mts.SetTimeStamp(uint64(t))
s.log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t))
mts.MetaData.SetTimeStamp(uint64(t))
}
// Extract location from reply
g, err := dec.String("ll")
if err != nil {
s.log(smartlogger.Warning, pkg+"No location in reply")
s.log(logger.Warning, pkg+"No location in reply")
} else {
s.log(smartlogger.Debug, fmt.Sprintf("%v got location: %v", pkg, g))
mts.SetLocation(g)
s.log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g))
mts.MetaData.SetLocation(g)
}
return nil
@ -264,15 +266,15 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg
var sess *rtmp.Session
var err error
for n := 0; n < retries; n++ {
sess = rtmp.NewSession(url, timeout)
sess = rtmp.NewSession(url, timeout, log)
err = sess.Open()
if err == nil {
break
}
log(smartlogger.Error, err.Error())
log(logger.Error, err.Error())
sess.Close()
if n < retries-1 {
log(smartlogger.Info, pkg+"retry rtmp connection")
log(logger.Info, pkg+"retry rtmp connection")
}
}
if err != nil {
@ -310,15 +312,15 @@ func (s *rtmpSender) restart() error {
return err
}
for n := 0; n < s.retries; n++ {
s.sess = rtmp.NewSession(s.url, s.timeout)
s.sess = rtmp.NewSession(s.url, s.timeout, s.log)
err = s.sess.Open()
if err == nil {
break
}
s.log(smartlogger.Error, err.Error())
s.log(logger.Error, err.Error())
s.sess.Close()
if n < s.retries-1 {
s.log(smartlogger.Info, pkg+"retry rtmp connection")
s.log(logger.Info, pkg+"retry rtmp connection")
}
}
return err
@ -371,14 +373,14 @@ type rtpSender struct {
encoder *rtp.Encoder
}
func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps int) (*rtpSender, error) {
func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) {
conn, err := net.Dial("udp", addr)
if err != nil {
return nil, err
}
s := &rtpSender{
log: log,
encoder: rtp.NewEncoder(conn, fps),
encoder: rtp.NewEncoder(conn, int(fps)),
}
return s, nil
}

View File

@ -9,7 +9,7 @@ AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
amf_headers.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
amf_headers.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the

515
rtmp/packet.go Normal file
View File

@ -0,0 +1,515 @@
/*
NAME
packet.go
DESCRIPTION
RTMP packet functionality.
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
packet.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
Derived from librtmp under the GNU Lesser General Public License 2.1
Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org
Copyright (C) 2008-2009 Andrej Stepanchuk
Copyright (C) 2009-2010 Howard Chu
*/
package rtmp
import (
"encoding/binary"
"io"
)
// Packet types.
const (
packetTypeChunkSize = 0x01
packetTypeBytesReadReport = 0x03
packetTypeControl = 0x04
packetTypeServerBW = 0x05
packetTypeClientBW = 0x06
packetTypeAudio = 0x08
packetTypeVideo = 0x09
packetTypeFlexStreamSend = 0x0F // not implemented
packetTypeFlexSharedObject = 0x10 // not implemented
packetTypeFlexMessage = 0x11 // not implemented
packetTypeInfo = 0x12
packetTypeInvoke = 0x14
packetTypeFlashVideo = 0x16 // not implemented
)
// Header sizes.
const (
headerSizeLarge = 0
headerSizeMedium = 1
headerSizeSmall = 2
headerSizeMinimum = 3
headerSizeAuto = 4
)
// Special channels.
const (
chanBytesRead = 0x02
chanControl = 0x03
chanSource = 0x04
)
// headerSizes defines header sizes for header types 0, 1, 2 and 3 respectively:
// 0: full header (12 bytes)
// 1: header without message ID (8 bytes)
// 2: basic header + timestamp (4 byes)
// 3: basic header (chunk type and stream ID) (1 byte)
var headerSizes = [...]int{12, 8, 4, 1}
// packet defines an RTMP packet.
type packet struct {
headerType uint8
packetType uint8
channel int32
hasAbsTimestamp bool
timestamp uint32
info int32
bodySize uint32
bytesRead uint32
chunk *chunk
header []byte
body []byte
}
// chunk defines an RTMP packet chunk.
type chunk struct {
headerSize int32
data []byte
header [fullHeaderSize]byte
}
// read reads a packet.
func (pkt *packet) read(s *Session) error {
var hbuf [fullHeaderSize]byte
header := hbuf[:]
_, err := s.read(header[:1])
if err != nil {
s.log(DebugLevel, pkg+"failed to read packet header 1st byte", "error", err.Error())
if err == io.EOF {
s.log(WarnLevel, pkg+"EOF error; connection likely terminated")
}
return err
}
pkt.headerType = (header[0] & 0xc0) >> 6
pkt.channel = int32(header[0] & 0x3f)
header = header[1:]
switch {
case pkt.channel == 0:
_, err = s.read(header[:1])
if err != nil {
s.log(DebugLevel, pkg+"failed to read packet header 2nd byte", "error", err.Error())
return err
}
header = header[1:]
pkt.channel = int32(header[0]) + 64
case pkt.channel == 1:
_, err = s.read(header[:2])
if err != nil {
s.log(DebugLevel, pkg+"failed to read packet header 3rd byte", "error", err.Error())
return err
}
header = header[2:]
pkt.channel = int32(binary.BigEndian.Uint16(header[:2])) + 64
}
if pkt.channel >= s.channelsAllocatedIn {
n := pkt.channel + 10
timestamp := append(s.channelTimestamp, make([]int32, 10)...)
var pkts []*packet
if s.channelsIn == nil {
pkts = make([]*packet, n)
} else {
pkts = append(s.channelsIn[:pkt.channel:pkt.channel], make([]*packet, 10)...)
}
s.channelTimestamp = timestamp
s.channelsIn = pkts
for i := int(s.channelsAllocatedIn); i < len(s.channelTimestamp); i++ {
s.channelTimestamp[i] = 0
}
for i := int(s.channelsAllocatedIn); i < int(n); i++ {
s.channelsIn[i] = nil
}
s.channelsAllocatedIn = n
}
size := headerSizes[pkt.headerType]
switch {
case size == fullHeaderSize:
pkt.hasAbsTimestamp = true
case size < fullHeaderSize:
if s.channelsIn[pkt.channel] != nil {
*pkt = *(s.channelsIn[pkt.channel])
}
}
size--
if size > 0 {
_, err = s.read(header[:size])
if err != nil {
s.log(DebugLevel, pkg+"failed to read packet header", "error", err.Error())
return err
}
}
hSize := len(hbuf) - len(header) + size
if size >= 3 {
pkt.timestamp = C_AMF_DecodeInt24(header[:3])
if size >= 6 {
pkt.bodySize = C_AMF_DecodeInt24(header[3:6])
pkt.bytesRead = 0
if size > 6 {
pkt.packetType = header[6]
if size == 11 {
pkt.info = decodeInt32LE(header[7:11])
}
}
}
}
extendedTimestamp := pkt.timestamp == 0xffffff
if extendedTimestamp {
_, err = s.read(header[size : size+4])
if err != nil {
s.log(DebugLevel, pkg+"failed to read extended timestamp", "error", err.Error())
return err
}
// TODO: port this
pkt.timestamp = C_AMF_DecodeInt32(header[size : size+4])
hSize += 4
}
if pkt.bodySize > 0 && pkt.body == nil {
pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6)
}
toRead := int32(pkt.bodySize - pkt.bytesRead)
chunkSize := s.inChunkSize
if toRead < chunkSize {
chunkSize = toRead
}
if pkt.chunk != nil {
panic("non-nil chunk")
pkt.chunk.headerSize = int32(hSize)
copy(pkt.chunk.header[:], hbuf[:hSize])
pkt.chunk.data = pkt.body[pkt.bytesRead : pkt.bytesRead+uint32(chunkSize)]
}
_, err = s.read(pkt.body[pkt.bytesRead:][:chunkSize])
if err != nil {
s.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error())
return err
}
pkt.bytesRead += uint32(chunkSize)
// keep the packet as ref for other packets on this channel
if s.channelsIn[pkt.channel] == nil {
s.channelsIn[pkt.channel] = &packet{}
}
*(s.channelsIn[pkt.channel]) = *pkt
if extendedTimestamp {
s.channelsIn[pkt.channel].timestamp = 0xffffff
}
if pkt.bytesRead != pkt.bodySize {
panic("readPacket: bytesRead != bodySize")
}
if !pkt.hasAbsTimestamp {
// timestamps seem to always be relative
pkt.timestamp += uint32(s.channelTimestamp[pkt.channel])
}
s.channelTimestamp[pkt.channel] = int32(pkt.timestamp)
s.channelsIn[pkt.channel].body = nil
s.channelsIn[pkt.channel].bytesRead = 0
s.channelsIn[pkt.channel].hasAbsTimestamp = false
return nil
}
// resize adjusts the packet's storage to accommodate a body of the given size and header type.
func (pkt *packet) resize(size uint32, ht uint8) {
buf := make([]byte, fullHeaderSize+size)
pkt.header = buf
pkt.body = buf[fullHeaderSize:]
if ht != headerSizeAuto {
pkt.headerType = ht
return
}
switch pkt.packetType {
case packetTypeVideo, packetTypeAudio:
if pkt.timestamp == 0 {
pkt.headerType = headerSizeLarge
} else {
pkt.headerType = headerSizeMedium
}
case packetTypeInfo:
pkt.headerType = headerSizeLarge
pkt.bodySize += 16
default:
pkt.headerType = headerSizeMedium
}
}
// write sends a packet.
// When queue is true, we expect a response to this request and cache the method on s.methodCalls.
func (pkt *packet) write(s *Session, queue bool) error {
if pkt.body == nil {
return errInvalidBody
}
if pkt.channel >= s.channelsAllocatedOut {
s.log(DebugLevel, pkg+"growing channelsOut", "channel", pkt.channel)
n := int(pkt.channel + 10)
var pkts []*packet
if s.channelsOut == nil {
pkts = make([]*packet, n)
} else {
pkts = append(s.channelsOut[:pkt.channel:pkt.channel], make([]*packet, 10)...)
}
s.channelsOut = pkts
for i := int(s.channelsAllocatedOut); i < n; i++ {
s.channelsOut[i] = nil
}
s.channelsAllocatedOut = int32(n)
}
prevPkt := s.channelsOut[pkt.channel]
var last int
if prevPkt != nil && pkt.headerType != headerSizeLarge {
// compress a bit by using the prev packet's attributes
if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == headerSizeMedium {
pkt.headerType = headerSizeSmall
}
if prevPkt.timestamp == pkt.timestamp && pkt.headerType == headerSizeSmall {
pkt.headerType = headerSizeMinimum
}
last = int(prevPkt.timestamp)
}
if pkt.headerType > 3 {
s.log(WarnLevel, pkg+"unexpected header type", "type", pkt.headerType)
return errInvalidHeader
}
// The complete packet starts from headerSize _before_ the start the body.
// origIdx is the original offset, which will be 0 for a full (12-byte) header or 11 for a minimum (1-byte) header.
headBytes := pkt.header
hSize := headerSizes[pkt.headerType]
origIdx := fullHeaderSize - hSize
// adjust 1 or 2 bytes for the channel
cSize := 0
switch {
case pkt.channel > 319:
cSize = 2
case pkt.channel > 63:
cSize = 1
}
if cSize != 0 {
origIdx -= cSize
hSize += cSize
}
// adjust 4 bytes for the timestamp
var ts uint32
if prevPkt != nil {
ts = uint32(int(pkt.timestamp) - last)
}
if ts >= 0xffffff {
origIdx -= 4
hSize += 4
s.log(DebugLevel, pkg+"larger timestamp than 24 bits", "timestamp", ts)
}
headerIdx := origIdx
c := pkt.headerType << 6
switch cSize {
case 0:
c |= byte(pkt.channel)
case 1:
// Do nothing.
case 2:
c |= 1
}
headBytes[headerIdx] = c
headerIdx++
if cSize != 0 {
tmp := pkt.channel - 64
headBytes[headerIdx] = byte(tmp & 0xff)
headerIdx++
if cSize == 2 {
headBytes[headerIdx] = byte(tmp >> 8)
headerIdx++
}
}
if headerSizes[pkt.headerType] > 1 {
res := ts
if ts > 0xffffff {
res = 0xffffff
}
C_AMF_EncodeInt24(headBytes[headerIdx:], int32(res))
headerIdx += 3 // 24bits
}
if headerSizes[pkt.headerType] > 4 {
C_AMF_EncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize))
headerIdx += 3 // 24bits
headBytes[headerIdx] = pkt.packetType
headerIdx++
}
if headerSizes[pkt.headerType] > 8 {
binary.LittleEndian.PutUint32(headBytes[headerIdx:headerIdx+4], uint32(pkt.info))
headerIdx += 4 // 32bits
}
if ts >= 0xffffff {
C_AMF_EncodeInt32(headBytes[headerIdx:], int32(ts))
headerIdx += 4 // 32bits
}
size := int(pkt.bodySize)
chunkSize := int(s.outChunkSize)
if s.deferred == nil {
// Defer sending small audio packets (at most once).
if pkt.packetType == packetTypeAudio && size < chunkSize {
s.deferred = headBytes[origIdx:][:size+hSize]
s.log(DebugLevel, pkg+"deferred sending packet", "size", size, "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr())
return nil
}
} else {
// Send previously deferrd packet if combining it with the next one would exceed the chunk size.
if len(s.deferred)+size+hSize > chunkSize {
s.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(s.deferred))
_, err := s.write(s.deferred)
if err != nil {
return err
}
s.deferred = nil
}
}
// TODO(kortschak): Rewrite this horrific peice of premature optimisation.
// NB: RTMP wants packets in chunks which are 128 bytes by default, but the server may request a different size.
s.log(DebugLevel, pkg+"sending packet", "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr(), "size", size)
for size+hSize != 0 {
if chunkSize > size {
chunkSize = size
}
bytes := headBytes[origIdx:][:chunkSize+hSize]
if s.deferred != nil {
// Prepend the previously deferred packet and write it with the current one.
s.log(DebugLevel, pkg+"combining deferred packet", "size", len(s.deferred))
bytes = append(s.deferred, bytes...)
}
_, err := s.write(bytes)
if err != nil {
return err
}
s.deferred = nil
size -= chunkSize
origIdx += chunkSize + hSize
hSize = 0
if size > 0 {
origIdx -= 1 + cSize
hSize = 1 + cSize
if ts >= 0xffffff {
origIdx -= 4
hSize += 4
}
headBytes[origIdx] = 0xc0 | c
if cSize != 0 {
tmp := int(pkt.channel) - 64
headBytes[origIdx+1] = byte(tmp)
if cSize == 2 {
headBytes[origIdx+2] = byte(tmp >> 8)
}
}
if ts >= 0xffffff {
extendedTimestamp := headBytes[origIdx+1+cSize:]
C_AMF_EncodeInt32(extendedTimestamp[:4], int32(ts))
}
}
}
// We invoked a remote method
if pkt.packetType == packetTypeInvoke {
buf := pkt.body[1:]
meth := C_AMF_DecodeString(buf)
s.log(DebugLevel, pkg+"invoking method "+meth)
// keep it in call queue till result arrives
if queue {
buf = buf[3+len(meth):]
txn := int32(C_AMF_DecodeNumber(buf[:8]))
s.methodCalls = append(s.methodCalls, method{name: meth, num: txn})
}
}
if s.channelsOut[pkt.channel] == nil {
s.channelsOut[pkt.channel] = &packet{}
}
*(s.channelsOut[pkt.channel]) = *pkt
return nil
}
func decodeInt32LE(data []byte) int32 {
return int32(data[3])<<24 | int32(data[2])<<16 | int32(data[1])<<8 | int32(data[0])
}
func encodeInt32LE(dst []byte, v int32) int32 {
binary.LittleEndian.PutUint32(dst, uint32(v))
return 4
}

View File

@ -8,9 +8,10 @@ DESCRIPTION
AUTHOR
Dan Kortschak <dan@ausocean.org>
Saxon Nelson-Milton <saxon@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
parseurl.go is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean)
parseurl.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
@ -33,53 +34,50 @@ LICENSE
package rtmp
import (
"log"
"net/url"
"path"
"strconv"
"strings"
)
// int RTMP_ParseURL(const char *url, int *protocol, AVal *host, unsigned int *port, AVal *playpath, AVal *app);
// parseurl.c +33
func C_RTMP_ParseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, ok bool) {
// parseURL parses an RTMP URL (ok, technically it is lexing).
//
func parseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) {
u, err := url.Parse(addr)
if err != nil {
log.Printf("failed to parse addr: %v", err)
return protocol, host, port, app, playpath, false
return protocol, host, port, app, playpath, err
}
switch u.Scheme {
case "rtmp":
protocol = RTMP_PROTOCOL_RTMP
protocol = protoRTMP
case "rtmpt":
protocol = RTMP_PROTOCOL_RTMPT
protocol = protoRTMPT
case "rtmps":
protocol = RTMP_PROTOCOL_RTMPS
protocol = protoRTMPS
case "rtmpe":
protocol = RTMP_PROTOCOL_RTMPE
protocol = protoRTMPE
case "rtmfp":
protocol = RTMP_PROTOCOL_RTMFP
protocol = protoRTMFP
case "rtmpte":
protocol = RTMP_PROTOCOL_RTMPTE
protocol = protoRTMPTE
case "rtmpts":
protocol = RTMP_PROTOCOL_RTMPTS
protocol = protoRTMPTS
default:
log.Printf("unknown scheme: %q", u.Scheme)
return protocol, host, port, app, playpath, false
return protocol, host, port, app, playpath, errUnknownScheme
}
host = u.Host
if p := u.Port(); p != "" {
pi, err := strconv.Atoi(p)
if err != nil {
return protocol, host, port, app, playpath, false
return protocol, host, port, app, playpath, err
}
port = uint16(pi)
}
if !path.IsAbs(u.Path) {
return protocol, host, port, app, playpath, true
return protocol, host, port, app, playpath, nil
}
elems := strings.SplitN(u.Path[1:], "/", 3)
app = elems[0]
@ -99,5 +97,5 @@ func C_RTMP_ParseURL(addr string) (protocol int32, host string, port uint16, app
}
}
return protocol, host, port, app, playpath, true
return protocol, host, port, app, playpath, nil
}

File diff suppressed because it is too large Load Diff

View File

@ -1,221 +0,0 @@
/*
NAME
rtmp_headers.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
rtmp_headers.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
Derived from librtmp under the GNU Lesser General Public License 2.1
Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org
Copyright (C) 2008-2009 Andrej Stepanchuk
Copyright (C) 2009-2010 Howard Chu
*/
package rtmp
import "net"
const (
RTMPT_OPEN = iota
RTMPT_SEND
RTMPT_IDLE
RTMPT_CLOSE
)
const (
RTMP_PACKET_TYPE_CHUNK_SIZE = 0x01
RTMP_PACKET_TYPE_BYTES_READ_REPORT = 0x03
RTMP_PACKET_TYPE_CONTROL = 0x04
RTMP_PACKET_TYPE_SERVER_BW = 0x05
RTMP_PACKET_TYPE_CLIENT_BW = 0x06
RTMP_PACKET_TYPE_AUDIO = 0x08
RTMP_PACKET_TYPE_VIDEO = 0x09
RTMP_PACKET_TYPE_FLEX_STREAM_SEND = 0x0F
RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10
RTMP_PACKET_TYPE_FLEX_MESSAGE = 0x11
RTMP_PACKET_TYPE_INFO = 0x12
RTMP_PACKET_TYPE_INVOKE = 0x14
RTMP_PACKET_TYPE_FLASH_VIDEO = 0x16
)
const (
RTMP_PACKET_SIZE_LARGE = 0
RTMP_PACKET_SIZE_MEDIUM = 1
RTMP_PACKET_SIZE_SMALL = 2
RTMP_PACKET_SIZE_MINIMUM = 3
)
const (
RTMP_READ_HEADER = 0x01
RTMP_READ_RESUME = 0x02
RTMP_READ_NO_IGNORE = 0x04
RTMP_READ_GOTKF = 0x08
RTMP_READ_GOTFLVK = 0x10
RTMP_READ_SEEKING = 0x20
RTMP_READ_COMPLETE = -3
RTMP_READ_ERROR = -2
RTMP_READ_EOF = -1
RTMP_READ_IGNORE = 0
)
const (
RTMP_LF_AUTH = 0x0001 /* using auth param */
RTMP_LF_LIVE = 0x0002 /* stream is live */
RTMP_LF_SWFV = 0x0004 /* do SWF verification */
RTMP_LF_PLST = 0x0008 /* send playlist before play */
RTMP_LF_BUFX = 0x0010 /* toggle stream on BufferEmpty msg */
RTMP_LF_FTCU = 0x0020 /* free tcUrl on close */
RTMP_LF_FAPU = 0x0040 /* free app on close */
)
const (
RTMP_FEATURE_HTTP = 0x01
RTMP_FEATURE_ENC = 0x02
RTMP_FEATURE_SSL = 0x04
RTMP_FEATURE_MFP = 0x08 /* not yet supported */
RTMP_FEATURE_WRITE = 0x10 /* publish, not play */
RTMP_FEATURE_HTTP2 = 0x20 /* server-side rtmpt */
)
const (
RTMP_PROTOCOL_RTMP = 0
RTMP_PROTOCOL_RTMPE = RTMP_FEATURE_ENC
RTMP_PROTOCOL_RTMPT = RTMP_FEATURE_HTTP
RTMP_PROTOCOL_RTMPS = RTMP_FEATURE_SSL
RTMP_PROTOCOL_RTMPTE = (RTMP_FEATURE_HTTP | RTMP_FEATURE_ENC)
RTMP_PROTOCOL_RTMPTS = (RTMP_FEATURE_HTTP | RTMP_FEATURE_SSL)
RTMP_PROTOCOL_RTMFP = RTMP_FEATURE_MFP
)
const (
RTMP_DEFAULT_CHUNKSIZE = 128
RTMP_BUFFER_CACHE_SIZE = (16 * 1024)
RTMP_SIG_SIZE = 1536
RTMP_LARGE_HEADER_SIZE = 12
RTMP_MAX_HEADER_SIZE = 18
)
// typedef struct RTMPChunk
// rtmp.h +105
type C_RTMPChunk struct {
c_headerSize int32
c_chunk []byte
c_header [RTMP_MAX_HEADER_SIZE]byte
}
// typedef struct RTMPPacket
// rtmp.h +113
type C_RTMPPacket struct {
m_headerType uint8
m_packetType uint8
m_hasAbsTimestamp bool
m_nChannel int32
m_nTimeStamp uint32
m_nInfoField2 int32
m_nBodySize uint32
m_nBytesRead uint32
m_chunk *C_RTMPChunk
m_header []byte
m_body []byte
}
// typedef struct RTMPSockBuf
// rtmp.h +127
type C_RTMPSockBuf struct {
conn *net.TCPConn
sb_size int
sb_start int
sb_buf [RTMP_BUFFER_CACHE_SIZE]byte // port const
sb_timedout bool
}
// RTMPPacket_IsReady(a)
// rtmp.h +142
func C_RTMPPacket_IsReady(p *C_RTMPPacket) bool {
return p.m_nBytesRead == p.m_nBodySize
}
// typedef struct RTMP_LNK
// rtmp.h +144
type C_RTMP_LNK struct {
hostname string
sockshost string
playpath0 string
playpath string
tcUrl string
swfUrl string
pageUrl string
app string
auth string
flashVer string
token string
extras C_AMFObject
seekTime int32
lFlags int32
swfAge int32
protocol int32
timeout int32
socksport uint16
port uint16
}
// typedef struct RTMPMethod
// rtmp.h +231
type C_RTMP_METHOD struct {
name string
num int32
}
// typedef struct RTMP
// rtmp.h +237
type C_RTMP struct {
m_inChunkSize int32
m_outChunkSize int32
m_nBWCheckCounter int32
m_nBytesIn int32
m_nBytesInSent int32
m_nBufferMS int32
m_stream_id int32
m_mediaChannel int32
m_pausing int32
m_nServerBW int32
m_nClientBW int32
m_nClientBW2 uint8
m_bPlaying bool
m_bSendEncoding bool
m_bSendCounter bool
m_numInvokes int32
m_methodCalls []C_RTMP_METHOD
m_channelsAllocatedIn int32
m_channelsAllocatedOut int32
m_vecChannelsIn []*C_RTMPPacket
m_vecChannelsOut []*C_RTMPPacket
m_channelTimestamp []int32
m_fAudioCodecs float64
m_fVideoCodecs float64
m_fEncoding float64
m_fDuration float64
m_msgCounter int32
m_resplen int32
m_unackd int32
m_write C_RTMPPacket
m_sb C_RTMPSockBuf
Link C_RTMP_LNK
}

View File

@ -1,7 +0,0 @@
package rtmp
// #define SET_RCVTIMEO(tv,s) int tv = s*1000
// rtmp_sys.h +43
func SET_RCVTIMEO(tv *int32, s int32) {
*tv = s * 1000
}

238
rtmp/rtmp_test.go Normal file
View File

@ -0,0 +1,238 @@
/*
NAME
rtmp_test.go
DESCRIPTION
RTMP tests
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
rtmp_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package rtmp
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"testing"
"time"
"bitbucket.org/ausocean/av/stream/flv"
"bitbucket.org/ausocean/av/stream/lex"
)
const (
rtmpProtocol = "rtmp"
testHost = "a.rtmp.youtube.com"
testApp = "live2"
testBaseURL = rtmpProtocol + "://" + testHost + "/" + testApp + "/"
testTimeout = 30
testDataDir = "../../test/test-data/av/input"
)
// testVerbosity controls the amount of output.
// NB: This is not the log level, which is DebugLevel.
// 0: suppress logging completely
// 1: log messages only
// 2: log messages with errors, if any
var testVerbosity = 1
// testKey is the YouTube RTMP key required for YouTube streaming (RTMP_TEST_KEY env var).
// NB: don't share your key with others.
var testKey string
// testFile is the test video file (RTMP_TEST_FILE env var).
// betterInput.h264 is a good one to use.
var testFile string
// testLog is a bare bones logger that logs to stdout, and exits upon either an error or fatal error.
func testLog(level int8, msg string, params ...interface{}) {
logLevels := [...]string{"Debug", "Info", "Warn", "Error", "", "", "Fatal"}
if testVerbosity == 0 {
return
}
if level < -1 || level > 5 {
panic("Invalid log level")
}
switch testVerbosity {
case 0:
// silence is golden
case 1:
fmt.Printf("%s: %s\n", logLevels[level+1], msg)
case 2:
// extract the first param if it is one we care about, otherwise just print the message
if len(params) >= 2 {
switch params[0].(string) {
case "error":
fmt.Printf("%s: %s, error=%v\n", logLevels[level+1], msg, params[1].(string))
case "size":
fmt.Printf("%s: %s, size=%d\n", logLevels[level+1], msg, params[1].(int))
default:
fmt.Printf("%s: %s\n", logLevels[level+1], msg)
}
} else {
fmt.Printf("%s: %s\n", logLevels[level+1], msg)
}
}
if level >= 4 {
// Error or Fatal
buf := make([]byte, 1<<16)
size := runtime.Stack(buf, true)
fmt.Printf("%s\n", string(buf[:size]))
os.Exit(1)
}
}
// TestKey tests that the RTMP_TEST_KEY environment variable is present
func TestKey(t *testing.T) {
testLog(0, "TestKey")
testKey = os.Getenv("RTMP_TEST_KEY")
if testKey == "" {
msg := "RTMP_TEST_KEY environment variable not defined"
testLog(0, msg)
t.Skip(msg)
}
testLog(0, "Testing against URL "+testBaseURL+testKey)
}
// TestSetupURL tests URL parsing.
func TestSetupURL(t *testing.T) {
testLog(0, "TestSetupURL")
// test with just the base URL
s := NewSession(testBaseURL, testTimeout, testLog)
if s.url != testBaseURL && s.link.timeout != testTimeout {
t.Errorf("NewSession failed")
}
err := setupURL(s)
if err != nil {
t.Errorf("setupURL(testBaseURL) failed with error: %v", err)
}
// test the parts are as expected
if rtmpProtocolStrings[s.link.protocol] != rtmpProtocol {
t.Errorf("setupURL returned wrong protocol: %v", s.link.protocol)
}
if s.link.host != testHost {
t.Errorf("setupURL returned wrong host: %v", s.link.host)
}
if s.link.app != testApp {
t.Errorf("setupURL returned wrong app: %v", s.link.app)
}
}
// TestOpenClose tests opening an closing an RTMP connection.
func TestOpenClose(t *testing.T) {
testLog(0, "TestOpenClose")
if testKey == "" {
t.Skip("Skipping TestOpenClose since no RTMP_TEST_KEY")
}
s := NewSession(testBaseURL+testKey, testTimeout, testLog)
err := s.Open()
if err != nil {
t.Errorf("Open failed with error: %v", err)
return
}
err = s.Close()
if err != nil {
t.Errorf("Close failed with error: %v", err)
return
}
}
// TestFromFrame tests streaming from a single H.264 frame which is repeated.
func TestFromFrame(t *testing.T) {
testLog(0, "TestFromFrame")
if testKey == "" {
t.Skip("Skipping TestFromFrame since no RTMP_TEST_KEY")
}
s := NewSession(testBaseURL+testKey, testTimeout, testLog)
err := s.Open()
if err != nil {
t.Errorf("Session.Open failed with error: %v", err)
}
b, err := ioutil.ReadFile(filepath.Join(testDataDir, "AusOcean_logo_1080p.h264"))
if err != nil {
t.Errorf("ReadFile failed with error: %v", err)
}
// Pass RTMP session, true for audio, true for video, and 25 FPS
// ToDo: fix this. Although we can encode the file and YouTube
// doesn't complain, YouTube doesn't play it (even when we
// send 1 minute's worth).
flvEncoder, err := flv.NewEncoder(s, true, true, 25)
if err != nil {
t.Fatalf("failed to create encoder: %v", err)
}
for i := 0; i < 25; i++ {
err := flvEncoder.Encode(b)
if err != nil {
t.Errorf("Encoding failed with error: %v", err)
}
time.Sleep(time.Millisecond / 25) // rate limit to 1/25s
}
err = s.Close()
if err != nil {
t.Errorf("Session.Close failed with error: %v", err)
}
}
// TestFromFile tests streaming from an video file comprising raw H.264.
// The test file is supplied via the RTMP_TEST_FILE environment variable.
func TestFromFile(t *testing.T) {
testLog(0, "TestFromFile")
testFile := os.Getenv("RTMP_TEST_FILE")
if testFile == "" {
t.Skip("Skipping TestFromFile since no RTMP_TEST_FILE")
}
if testKey == "" {
t.Skip("Skipping TestFromFile since no RTMP_TEST_KEY")
}
s := NewSession(testBaseURL+testKey, testTimeout, testLog)
err := s.Open()
if err != nil {
t.Errorf("Session.Open failed with error: %v", err)
}
f, err := os.Open(testFile)
if err != nil {
t.Errorf("Open failed with error: %v", err)
}
defer f.Close()
// Pass RTMP session, true for audio, true for video, and 25 FPS
flvEncoder, err := flv.NewEncoder(s, true, true, 25)
if err != nil {
t.Fatalf("failed to create encoder: %v", err)
}
err = lex.H264(flvEncoder, f, time.Second/time.Duration(25))
if err != nil {
t.Errorf("Lexing and encoding failed with error: %v", err)
}
err = s.Close()
if err != nil {
t.Errorf("Session.Close failed with error: %v", err)
}
}

View File

@ -3,14 +3,15 @@ NAME
session.go
DESCRIPTION
See Readme.md
RTMP session functionality.
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
session.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
session.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
@ -33,65 +34,216 @@ LICENSE
package rtmp
import (
"errors"
"io"
"net"
"time"
)
// session provides parameters required for an rtmp communication session.
// Session holds the state for an RTMP session.
type Session struct {
rtmp *C_RTMP
url string
timeout uint
inChunkSize int32
outChunkSize int32
checkCounter int32
nBytesIn int32
nBytesInSent int32
streamID int32
serverBW int32
clientBW int32
clientBW2 uint8
isPlaying bool
sendEncoding bool
numInvokes int32
methodCalls []method
channelsAllocatedIn int32
channelsAllocatedOut int32
channelsIn []*packet
channelsOut []*packet
channelTimestamp []int32
audioCodecs float64
videoCodecs float64
encoding float64
deferred []byte
link link
log Log
}
// NewSession returns a new session.
func NewSession(url string, connectTimeout uint) *Session {
// link represents RTMP URL and connection information.
type link struct {
host string
playpath string
tcUrl string
swfUrl string
pageUrl string
app string
auth string
flashVer string
token string
extras C_AMFObject
flags int32
swfAge int32
protocol int32
timeout uint
port uint16
conn *net.TCPConn
}
// method represents an RTMP method.
type method struct {
name string
num int32
}
// Log defines the RTMP logging function.
type Log func(level int8, message string, params ...interface{})
// Log levels used by Log.
const (
DebugLevel int8 = -1
InfoLevel int8 = 0
WarnLevel int8 = 1
ErrorLevel int8 = 2
FatalLevel int8 = 5
)
// NewSession returns a new Session.
func NewSession(url string, timeout uint, log Log) *Session {
return &Session{
url: url,
timeout: connectTimeout,
inChunkSize: 128,
outChunkSize: 128,
clientBW: 2500000,
clientBW2: 2,
serverBW: 2500000,
audioCodecs: 3191.0,
videoCodecs: 252.0,
log: log,
link: link{
timeout: timeout,
swfAge: 30,
},
}
}
// Open establishes an rtmp connection with the url passed into the
// constructor
// Open establishes an rtmp connection with the url passed into the constructor.
func (s *Session) Open() error {
if s.rtmp != nil {
return errors.New("rtmp: attempt to start already running session")
s.log(DebugLevel, pkg+"Session.Open")
if s.isConnected() {
return errConnected
}
var err error
s.rtmp, err = startSession(s.rtmp, s.url, uint32(s.timeout))
if s.rtmp == nil {
err := setupURL(s)
if err != nil {
return err
}
s.enableWrite()
err = connect(s)
if err != nil {
s.Close()
return err
}
err = connectStream(s)
if err != nil {
s.Close()
return err
}
return nil
}
// Close terminates the rtmp connection
// Close terminates the rtmp connection.
// NB: Close is idempotent and the session value is cleared completely.
func (s *Session) Close() error {
if s.rtmp == nil {
return Err(3)
s.log(DebugLevel, pkg+"Session.Close")
if !s.isConnected() {
return errNotConnected
}
ret := endSession(s.rtmp)
s.rtmp = nil
if ret != 0 {
return Err(ret)
if s.streamID > 0 {
if s.link.protocol&featureWrite != 0 {
sendFCUnpublish(s)
}
sendDeleteStream(s, float64(s.streamID))
}
s.link.conn.Close()
*s = Session{}
return nil
}
// Write writes a frame (flv tag) to the rtmp connection
// Write writes a frame (flv tag) to the rtmp connection.
func (s *Session) Write(data []byte) (int, error) {
if s.rtmp == nil {
return 0, Err(3)
if !s.isConnected() {
return 0, errNotConnected
}
if len(data) < minDataSize {
return 0, errTinyPacket
}
if data[0] == packetTypeInfo || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') {
return 0, errUnimplemented
}
if C_RTMP_IsConnected(s.rtmp) == 0 {
//if C.RTMP_IsConnected(s.rtmp) == 0 {
return 0, Err(1)
pkt := packet{
packetType: data[0],
bodySize: C_AMF_DecodeInt24(data[1:4]),
timestamp: C_AMF_DecodeInt24(data[4:7]) | uint32(data[7])<<24,
channel: chanSource,
info: s.streamID,
}
if C_RTMP_Write(s.rtmp, data) == 0 {
//if C.RTMP_Write(s.rtmp, (*byte)(unsafe.Pointer(&data[0])), int32(len(data))) == 0 {
return 0, Err(2)
pkt.resize(pkt.bodySize, headerSizeAuto)
copy(pkt.body, data[minDataSize:minDataSize+pkt.bodySize])
err := pkt.write(s, false)
if err != nil {
return 0, err
}
return len(data), nil
}
// I/O functions
// read from an RTMP connection. Sends a bytes received message if the
// number of bytes received (nBytesIn) is greater than the number sent
// (nBytesInSent) by 10% of the bandwidth.
func (s *Session) read(buf []byte) (int, error) {
err := s.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout)))
if err != nil {
return 0, err
}
n, err := io.ReadFull(s.link.conn, buf)
if err != nil {
s.log(DebugLevel, pkg+"read failed", "error", err.Error())
return 0, err
}
s.nBytesIn += int32(n)
if s.nBytesIn > (s.nBytesInSent + s.clientBW/10) {
err := sendBytesReceived(s)
if err != nil {
return n, err // NB: we still read n bytes, even though send bytes failed
}
}
return n, nil
}
// write to an RTMP connection.
func (s *Session) write(buf []byte) (int, error) {
//ToDo: consider using a different timeout for writes than for reads
err := s.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout)))
if err != nil {
return 0, err
}
n, err := s.link.conn.Write(buf)
if err != nil {
s.log(WarnLevel, pkg+"write failed", "error", err.Error())
return 0, err
}
return n, nil
}
// isConnected returns true if the RTMP connection is up.
func (s *Session) isConnected() bool {
return s.link.conn != nil
}
// enableWrite enables the current session for writing.
func (s *Session) enableWrite() {
s.link.protocol |= featureWrite
}

View File

@ -1,151 +0,0 @@
/*
NAME
rtmp.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
LICENSE
rtmp.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
Derived from librtmp under the GNU Lesser General Public License 2.1
Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org
Copyright (C) 2008-2009 Andrej Stepanchuk
Copyright (C) 2009-2010 Howard Chu
*/
package rtmp
import (
"fmt"
"log"
"net"
"golang.org/x/sys/unix"
)
// int RTMP_Connect(RTMP *r, RTMPPacket* cp);
// rtmp.c +1032
func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) (ok bool) {
if r.Link.hostname == "" {
return false
}
var hostname string
if r.Link.socksport != 0 {
hostname = fmt.Sprintf("%s:%d", r.Link.sockshost, r.Link.socksport)
} else {
hostname = fmt.Sprintf("%s:%d", r.Link.hostname, r.Link.port)
}
addr, err := net.ResolveTCPAddr("tcp4", hostname)
if err != nil {
return false
}
r.m_sb.conn, err = net.DialTCP("tcp4", nil, addr)
if err != nil {
return false
}
if r.Link.socksport != 0 {
if !C_SocksNegotiate(r, addr) {
return false
}
}
f, err := r.m_sb.conn.File()
if err != nil {
log.Printf("failed to get fd to set timeout: %v", err)
return false
}
tv := setTimeval(int(r.Link.timeout))
err = unix.SetsockoptTimeval(int(f.Fd()), unix.SOL_SOCKET, unix.SO_RCVTIMEO, &tv)
if err != nil {
log.Printf("failed to set timeout: %v", err)
}
r.m_bSendCounter = true
return C_RTMP_Connect1(r, cp)
}
// int SocksNegotiate(RTMP* r);
// rtmp.c +1062
func C_SocksNegotiate(r *C_RTMP, addr *net.TCPAddr) (ok bool) {
ip := addr.IP.To4()
packet := []byte{
0x4, // SOCKS version
0x1, // Establish a TCP/IP stream connection
byte(r.Link.port >> 8), byte(r.Link.port),
ip[0], ip[1], ip[2], ip[3],
0x0, // Empty user ID string
}
C_WriteN(r, packet)
if C_ReadN(r, packet[:8]) != 8 {
return false
}
if packet[0] == 0x0 && packet[1] == 0x5a {
return true
}
// TODO: use new logger here
log.Println("C_SocksNegotitate: SOCKS returned error code!")
return false
}
// int RTMPSockBuf_Fill(RTMPSockBuf* sb);
// rtmp.c +4253
func C_RTMPSockBuf_Fill(sb *C_RTMPSockBuf) int {
if sb.sb_size == 0 {
sb.sb_start = 0
}
n, err := sb.conn.Read(sb.sb_buf[sb.sb_start+sb.sb_size:])
if err != nil {
return 0
}
sb.sb_size += n
return n
}
// int RTMPSockBuf_Send(RTMPSockBuf* sb, const char* buf, int len);
// rtmp.c +4297
// TODO replace send with golang net connection send
func C_RTMPSockBuf_Send(sb *C_RTMPSockBuf, buf []byte) int32 {
n, err := sb.conn.Write(buf)
if err != nil {
return -1
}
return int32(n)
}
// int
// RTMPSockBuf_Close(RTMPSockBuf *sb)
// rtmp.c +4369
func C_RTMPSockBuf_Close(sb *C_RTMPSockBuf) int32 {
if sb.conn != nil {
err := sb.conn.Close()
sb.conn = nil
if err == nil {
return 1
}
}
return 0
}

View File

@ -1,7 +0,0 @@
package rtmp
import "golang.org/x/sys/unix"
func setTimeval(sec int) unix.Timeval {
return unix.Timeval{Sec: int64(sec)}
}

View File

@ -1,7 +0,0 @@
package rtmp
import "golang.org/x/sys/unix"
func setTimeval(sec int) unix.Timeval {
return unix.Timeval{Sec: int32(sec)}
}

View File

@ -60,7 +60,6 @@ type Encoder struct {
fps int
audio bool
video bool
lastTagSize int
header Header
start time.Time
}
@ -73,12 +72,11 @@ func NewEncoder(dst io.Writer, audio, video bool, fps int) (*Encoder, error) {
audio: audio,
video: video,
}
// TODO(kortschak): Do this lazily.
_, err := e.dst.Write(e.HeaderBytes())
_, err := dst.Write(e.HeaderBytes())
if err != nil {
return nil, err
}
return &e, nil
return &e, err
}
// HeaderBytes returns the a
@ -194,6 +192,17 @@ func (s *frameScanner) readByte() (b byte, ok bool) {
func (e *Encoder) Encode(frame []byte) error {
var frameType byte
var packetType byte
if e.start.IsZero() {
// This is the first frame, so write the PreviousTagSize0.
//
// See https://download.macromedia.com/f4v/video_file_format_spec_v10_1.pdf
// section E.3.
var zero [4]byte
_, err := e.dst.Write(zero[:])
if err != nil {
return err
}
}
timeStamp := e.getNextTimestamp()
// Do we have video to send off?
if e.video {

View File

@ -79,6 +79,8 @@ type Header struct {
}
func (h *Header) Bytes() []byte {
// See https://download.macromedia.com/f4v/video_file_format_spec_v10_1.pdf
// section E.2.
const headerLength = 9
b := [headerLength]byte{
0: 'F', 1: 'L', 2: 'V', 3: version,

View File

@ -30,35 +30,145 @@ package mts
import (
"io"
"sync"
"time"
"bitbucket.org/ausocean/av/stream/mts/pes"
"bitbucket.org/ausocean/av/stream/mts/psi"
)
const (
psiPacketSize = 184
psiSendCount = 7
// Some common manifestations of PSI
var (
// standardPat is a minimal PAT.
standardPat = psi.PSI{
Pf: 0x00,
Tid: 0x00,
Ssi: true,
Pb: false,
Sl: 0x0d,
Tss: &psi.TSS{
Tide: 0x01,
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &psi.PAT{
Pn: 0x01,
Pmpid: 0x1000,
},
},
}
// standardPmt is a minimal PMT, without descriptors for time and location.
standardPmt = psi.PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: 0x12,
Tss: &psi.TSS{
Tide: 0x01,
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &psi.PMT{
Pcrpid: 0x0100,
Pil: 0,
Essd: &psi.ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
}
// standardPmtTimeLocation is a standard PMT with time and location
// descriptors, but time and location fields zeroed out.
standardPmtTimeLocation = psi.PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: 0x3e,
Tss: &psi.TSS{
Tide: 0x01,
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &psi.PMT{
Pcrpid: 0x0100,
Pil: psi.PmtTimeLocationPil,
Pd: []psi.Desc{
{
Dt: psi.TimeDescTag,
Dl: psi.TimeDataSize,
Dd: make([]byte, psi.TimeDataSize),
},
{
Dt: psi.LocationDescTag,
Dl: psi.LocationDataSize,
Dd: make([]byte, psi.LocationDataSize),
},
},
Essd: &psi.ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
}
)
type MetaData struct {
const (
psiSndCnt = 7
)
// timeLocation holds time and location data
type timeLocation struct {
mu sync.RWMutex
time uint64
location string
}
var metaData = MetaData{time: 0, location: ""}
func SetTimeStamp(t uint64) {
metaData.time = t
// SetTimeStamp sets the time field of a TimeLocation.
func (tl *timeLocation) SetTimeStamp(t uint64) {
tl.mu.Lock()
tl.time = t
tl.mu.Unlock()
}
func SetLocation(g string) {
metaData.location = g
// GetTimeStamp returns the location of a TimeLocation.
func (tl *timeLocation) TimeStamp() uint64 {
tl.mu.RLock()
t := tl.time
tl.mu.RUnlock()
return t
}
// SetLocation sets the location of a TimeLocation.
func (tl *timeLocation) SetLocation(l string) {
tl.mu.Lock()
tl.location = l
tl.mu.Unlock()
}
// GetLocation returns the location of a TimeLocation.
func (tl *timeLocation) Location() string {
tl.mu.RLock()
l := tl.location
tl.mu.RUnlock()
return l
}
// MetData will hold time and location data which may be set externally if
// this data is available. It is then inserted into mpegts packets outputted.
var MetaData timeLocation
var (
patTable = psi.StdPat.Bytes()
pmtTable = psi.StdPmtTimeLocation.Bytes()
patTable = standardPat.Bytes()
pmtTable = standardPmtTimeLocation.Bytes()
)
const (
@ -72,7 +182,7 @@ const (
// Time related constants.
const (
// ptsOffset is the offset added to the clock to determine
// the current presentation timestamp,
// the current presentation timestamp.
ptsOffset = 700 * time.Millisecond
// pcrFreq is the base Program Clock Reference frequency.
@ -86,6 +196,8 @@ type Encoder struct {
clock time.Duration
frameInterval time.Duration
ptsOffset time.Duration
tsSpace [PacketSize]byte
pesSpace [pes.MaxPesSize]byte
psiCount int
@ -119,9 +231,8 @@ const (
)
// generate handles the incoming data and generates equivalent mpegts packets -
// sending them to the output channel
// sending them to the output channel.
func (e *Encoder) Encode(nalu []byte) error {
// Prepare PES data.
pesPkt := pes.Packet{
StreamID: streamID,
@ -130,7 +241,7 @@ func (e *Encoder) Encode(nalu []byte) error {
Data: nalu,
HeaderLength: 5,
}
buf := pesPkt.Bytes()
buf := pesPkt.Bytes(e.pesSpace[:pes.MaxPesSize])
pusi := true
for len(buf) != 0 {
@ -158,7 +269,7 @@ func (e *Encoder) Encode(nalu []byte) error {
}
}
e.psiCount--
_, err := e.dst.Write(pkt.Bytes())
_, err := e.dst.Write(pkt.Bytes(e.tsSpace[:PacketSize]))
if err != nil {
return err
}
@ -172,54 +283,45 @@ func (e *Encoder) Encode(nalu []byte) error {
// writePSI creates mpegts with pat and pmt tables - with pmt table having updated
// location and time data.
func (e *Encoder) writePSI() error {
// Write PAT
// Write PAT.
patPkt := Packet{
PUSI: true,
PID: patPid,
CC: e.ccFor(patPid),
AFC: hasPayload,
Payload: addPadding(patTable),
Payload: patTable,
}
_, err := e.dst.Write(patPkt.Bytes())
_, err := e.dst.Write(patPkt.Bytes(e.tsSpace[:PacketSize]))
if err != nil {
return err
}
// Update pmt table time and location
err = psi.UpdateTime(pmtTable, metaData.time)
// Update pmt table time and location.
err = psi.UpdateTime(pmtTable, MetaData.TimeStamp())
if err != nil {
return err
}
err = psi.UpdateLocation(pmtTable, metaData.location)
err = psi.UpdateLocation(pmtTable, MetaData.Location())
if err != nil {
return nil
}
// Create mts packet from pmt table
// Create mts packet from pmt table.
pmtPkt := Packet{
PUSI: true,
PID: pmtPid,
CC: e.ccFor(pmtPid),
AFC: hasPayload,
Payload: addPadding(pmtTable),
Payload: pmtTable,
}
_, err = e.dst.Write(pmtPkt.Bytes())
_, err = e.dst.Write(pmtPkt.Bytes(e.tsSpace[:PacketSize]))
if err != nil {
return err
}
e.psiCount = psiSendCount
e.psiCount = psiSndCnt
return nil
}
// addPadding adds an appropriate amount of padding to a pat or pmt table for
// addition to an mpegts packet
func addPadding(d []byte) []byte {
for len(d) < psiPacketSize {
d = append(d, 0xff)
}
return d
}
// tick advances the clock one frame interval.
func (e *Encoder) tick() {
e.clock += e.frameInterval

View File

@ -33,8 +33,8 @@ import (
)
const (
mpegTsSize = 188
mpegtsPayloadSize = 176
PacketSize = 188
PayloadSize = 176
)
/*
@ -130,13 +130,13 @@ type Packet struct {
// FindPMT will take a clip of mpegts and try to find a PMT table - if one
// is found, then it is returned, otherwise nil and an error is returned.
func FindPMT(d []byte) (p []byte, err error) {
if len(d) < mpegTsSize {
if len(d) < PacketSize {
return nil, errors.New("Mmpegts data not of valid length")
}
for i := 0; i < len(d); i += mpegTsSize {
for i := 0; i < len(d); i += PacketSize {
pid := (uint16(d[i+1]&0x1f) << 8) | uint16(d[i+2])
if pid == pmtPid {
p = d[i+4 : i+mpegTsSize]
p = d[i+4 : i+PacketSize]
return
}
}
@ -146,9 +146,9 @@ func FindPMT(d []byte) (p []byte, err error) {
// FillPayload takes a channel and fills the packets Payload field until the
// channel is empty or we've the packet reaches capacity
func (p *Packet) FillPayload(data []byte) int {
currentPktLength := 6 + asInt(p.PCRF)*6 + asInt(p.OPCRF)*6 +
currentPktLen := 6 + asInt(p.PCRF)*6 + asInt(p.OPCRF)*6 +
asInt(p.SPF)*1 + asInt(p.TPDF)*1 + len(p.TPD)
p.Payload = make([]byte, mpegtsPayloadSize-currentPktLength)
p.Payload = make([]byte, PayloadSize-currentPktLen)
return copy(p.Payload, data)
}
@ -168,7 +168,11 @@ func asByte(b bool) byte {
// ToByteSlice interprets the fields of the ts packet instance and outputs a
// corresponding byte slice
func (p *Packet) Bytes() []byte {
func (p *Packet) Bytes(buf []byte) []byte {
if buf == nil || cap(buf) != PacketSize {
buf = make([]byte, 0, PacketSize)
}
buf = buf[:0]
stuffingLength := 182 - len(p.Payload) - len(p.TPD) - asInt(p.PCRF)*6 -
asInt(p.OPCRF)*6 - asInt(p.SPF)
var stuffing []byte
@ -179,7 +183,6 @@ func (p *Packet) Bytes() []byte {
stuffing[i] = 0xFF
}
afl := 1 + asInt(p.PCRF)*6 + asInt(p.OPCRF)*6 + asInt(p.SPF) + asInt(p.TPDF) + len(p.TPD) + len(stuffing)
buf := make([]byte, 0, mpegTsSize)
buf = append(buf, []byte{
0x47,
(asByte(p.TEI)<<7 | asByte(p.PUSI)<<6 | asByte(p.Priority)<<5 | byte((p.PID&0xFF00)>>8)),

View File

@ -26,7 +26,7 @@ LICENSE
package pes
const maxPesSize = 10000
const MaxPesSize = 10000
/*
The below data struct encapsulates the fields of an PES packet. Below is
@ -92,8 +92,11 @@ type Packet struct {
Data []byte // Pes packet data
}
func (p *Packet) Bytes() []byte {
buf := make([]byte, 0, maxPesSize)
func (p *Packet) Bytes(buf []byte) []byte {
if buf == nil || cap(buf) != MaxPesSize {
buf = make([]byte, 0, MaxPesSize)
}
buf = buf[:0]
buf = append(buf, []byte{
0x00, 0x00, 0x01,
p.StreamID,

View File

@ -46,7 +46,7 @@ func TestPesToByteSlice(t *testing.T) {
Stuff: []byte{0xFF, 0xFF},
Data: []byte{0xEA, 0x4B, 0x12},
}
got := pkt.Bytes()
got := pkt.Bytes(nil)
want := []byte{
0x00, // packet start code prefix byte 1
0x00, // packet start code prefix byte 2

71
stream/mts/psi/crc.go Normal file
View File

@ -0,0 +1,71 @@
/*
NAME
crc.go
DESCRIPTION
See Readme.md
AUTHOR
Dan Kortschak <dan@ausocean.org>
Saxon Milton <saxon@ausocean.org>
LICENSE
crc.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package psi
import (
"encoding/binary"
"hash/crc32"
"math/bits"
)
// addCrc appends a crc table to a given psi table in bytes
func addCrc(out []byte) []byte {
t := make([]byte, len(out)+4)
copy(t, out)
updateCrc(t)
return t
}
// updateCrc updates the crc of bytes slice, writing the checksum into the last four bytes.
func updateCrc(b []byte) {
crc32 := crc32_Update(0xffffffff, crc32_MakeTable(bits.Reverse32(crc32.IEEE)), b[1:len(b)-4])
binary.BigEndian.PutUint32(b[len(b)-4:], crc32)
}
func crc32_MakeTable(poly uint32) *crc32.Table {
var t crc32.Table
for i := range t {
crc := uint32(i) << 24
for j := 0; j < 8; j++ {
if crc&0x80000000 != 0 {
crc = (crc << 1) ^ poly
} else {
crc <<= 1
}
}
t[i] = crc
}
return &t
}
func crc32_Update(crc uint32, tab *crc32.Table, p []byte) uint32 {
for _, v := range p {
crc = tab[byte(crc>>24)^v] ^ (crc << 8)
}
return crc
}

View File

@ -1,15 +1,15 @@
/*
NAME
op.go
helpers.go
DESCRIPTION
op.go provides functionality for editing and reading bytes slices
helpers.go provides functionality for editing and reading bytes slices
directly in order to insert/read timestamp and location data in psi.
AUTHOR
Saxon Milton <saxon@ausocean.org>
LICENSE
op.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean)
helpers.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
@ -31,61 +31,58 @@ import (
"bytes"
"encoding/binary"
"errors"
"hash/crc32"
"math/bits"
)
// TimeBytes takes a timestamp as an uint64 and converts to an 8 byte slice -
// allows for updating of timestamp in pmt time descriptor.
func TimeBytes(t uint64) []byte {
var s [timeDataSize]byte
var s [TimeDataSize]byte
binary.BigEndian.PutUint64(s[:], t)
return s[:]
}
// HasTime takes a psi as a byte slice and checks to see if it has a time descriptor
// - if so return nil, otherwise return error
func HasTime(p []byte) error {
if p[timeTagIndx] != timeDescTag {
return errors.New("PSI does not contain a time descriptor, cannot update")
}
return nil
func HasTime(p []byte) bool {
return p[TimeTagIndx] == TimeDescTag
}
// HasLocation takes a psi as a byte slice and checks to see if it has a location descriptor
// - if so return nil, otherwise return error
func HasLocation(p []byte) error {
if p[locationTagIndx] != locationDescTag {
return errors.New("PSI does not contain a location descriptor, cannot update")
}
return nil
func HasLocation(p []byte) bool {
return p[LocationTagIndx] == LocationDescTag
}
// UpdateTime takes the byte slice representation of a psi-pmt as well as a time
// as an integer and attempts to update the time descriptor in the pmt with the
// given time if the time descriptor exists, otherwise an error is returned
func UpdateTime(dst []byte, t uint64) error {
err := HasTime(dst)
if err != nil {
return err
if !HasTime(dst) {
return errors.New("pmt does not have time descriptor, cannot update")
}
ts := TimeBytes(uint64(t))
for i := range dst[timeDataIndx : timeDataIndx+timeDataSize] {
dst[i+timeDataIndx] = ts[i]
for i := range dst[TimeDataIndx : TimeDataIndx+TimeDataSize] {
dst[i+TimeDataIndx] = ts[i]
}
dst = updateCrc(dst)
updateCrc(dst)
return nil
}
// SyntaxSecLenFrom takes a byte slice representation of a psi and extracts
// it's syntax section length
func SyntaxSecLenFrom(p []byte) (l uint8) {
l = uint8(p[syntaxSecLenIndx]) - crcSize
return
}
// TimeFrom takes a byte slice representation of a psi-pmt and extracts it's
// timestamp, returning as a uint64 if it exists, otherwise returning 0 and nil
// if it does not exist
func TimeFrom(p []byte) (t uint64, err error) {
err = HasTime(p)
if err != nil {
return 0, err
if !HasTime(p) {
return 0, errors.New("pmt does not have a time descriptor")
}
t = binary.BigEndian.Uint64(p[timeDataIndx : timeDataIndx+timeDataSize])
t = binary.BigEndian.Uint64(p[TimeDataIndx : TimeDataIndx+TimeDataSize])
return t, nil
}
@ -93,11 +90,10 @@ func TimeFrom(p []byte) (t uint64, err error) {
// timestamp, returning as a uint64 if it exists, otherwise returning 0 and nil
// if it does not exist
func LocationFrom(p []byte) (g string, err error) {
err = HasLocation(p)
if err != nil {
return "", err
if !HasLocation(p) {
return "", errors.New("pmt does not have location descriptor")
}
gBytes := p[locationDataIndx : locationDataIndx+locationDataSize]
gBytes := p[LocationDataIndx : LocationDataIndx+LocationDataSize]
gBytes = bytes.Trim(gBytes, "\x00")
g = string(gBytes)
return g, nil
@ -106,7 +102,7 @@ func LocationFrom(p []byte) (g string, err error) {
// LocationStrBytes take a string of location data and converts to a 32 byte slice -
// easy update of slice representation of a pmt with location descriptor
func LocationStrBytes(s string) []byte {
var b [locationDataSize]byte
var b [LocationDataSize]byte
copy(b[:], s)
return b[:]
}
@ -115,31 +111,32 @@ func LocationStrBytes(s string) []byte {
// descriptor and attempts to update the location data value with the passed string.
// If the psi does not contain a location descriptor, and error is returned.
func UpdateLocation(d []byte, s string) error {
err := HasLocation(d)
if err != nil {
return err
if !HasLocation(d) {
return errors.New("pmt does not location descriptor, cannot update")
}
gb := LocationStrBytes(s)
for i := range d[locationDataIndx : locationDataIndx+locationDataSize] {
d[i+locationDataIndx] = gb[i]
}
d = updateCrc(d)
copy(d[LocationDataIndx:LocationDataIndx+LocationDataSize], gb)
updateCrc(d)
return nil
}
// addCrc appends a crc table to a given psi table in bytes
func addCrc(out []byte) []byte {
out = append(out, make([]byte, 4)...)
out = updateCrc(out)
return out
func trimTo(d []byte, t byte) []byte {
for i, b := range d {
if b == t {
return d[:i]
}
}
return d
}
// updateCrc updates the crc of psi bytes slice that may have been modified
func updateCrc(out []byte) []byte {
crc32 := crc32_Update(0xffffffff, crc32_MakeTable(bits.Reverse32(crc32.IEEE)), out[1:len(out)-4])
out[len(out)-4] = byte(crc32 >> 24)
out[len(out)-3] = byte(crc32 >> 16)
out[len(out)-2] = byte(crc32 >> 8)
out[len(out)-1] = byte(crc32)
return out
// addPadding adds an appropriate amount of padding to a pat or pmt table for
// addition to an mpegts packet
func addPadding(d []byte) []byte {
t := make([]byte, PacketSize)
copy(t, d)
padding := t[len(d):]
for i := range padding {
padding[i] = 0xff
}
return d
}

View File

@ -26,8 +26,8 @@ LICENSE
package psi
import (
"hash/crc32"
const (
PacketSize = 184 // packet size of a psi.
)
// Lengths of section definitions
@ -42,24 +42,30 @@ const (
// Table Type IDs
const (
PATTableID = 0x00
PMTTableID = 0x02
patID = 0x00
pmtID = 0x02
)
// Consts relating to time description
const (
timeDescTag = 234
timeTagIndx = 13
timeDataIndx = 15
timeDataSize = 8 // bytes, because time is stored in uint64
TimeDescTag = 234
TimeTagIndx = 13
TimeDataIndx = 15
TimeDataSize = 8 // bytes, because time is stored in uint64
)
// Consts relating to location description
const (
locationDescTag = 235
locationTagIndx = 23
locationDataIndx = 25
locationDataSize = 32 // bytes
LocationDescTag = 235
LocationTagIndx = 23
LocationDataIndx = 25
LocationDataSize = 32 // bytes
)
// Other misc consts
const (
syntaxSecLenIndx = 3
crcSize = 4
)
// Program specific information
@ -81,21 +87,21 @@ type TSS struct {
Cni bool // Current/next indicator
Sn byte // Section number
Lsn byte // Last section number
Sd SD // Specific data PAT/PMT
Sd SpecificData // Specific data PAT/PMT
}
// Specific Data, (could be PAT or PMT)
type SD interface {
type SpecificData interface {
Bytes() []byte
}
// Program association table, implements SD
// Program association table, implements SpecificData
type PAT struct {
Pn uint16 // Program Number
Pmpid uint16 // Program map PID
}
// Program mapping table, implements SD
// Program mapping table, implements SpecificData
type PMT struct {
Pcrpid uint16 // Program clock reference pid
Pil uint16 // Program info length
@ -118,109 +124,6 @@ type Desc struct {
Dd []byte // Descriptor data
}
// ReadPSI creates a PSI data structure from a given byte slice that represents a PSI
func ReadPSI(data []byte) *PSI {
psi := PSI{}
pos := 0
psi.Pf = data[pos]
if psi.Pf != 0 {
panic("No support for pointer filler bytes")
}
psi.Tid = data[pos]
pos++
psi.Ssi = byteToBool(data[pos] & 0x80)
psi.Pb = byteToBool(data[pos] & 0x40)
psi.Sl = uint16(data[pos]&0x03)<<8 | uint16(data[pos+1])
pos += 2
psi.Tss = readTSS(data[pos:], &psi)
return &psi
}
// ReadTSS creates a TSS data structure from a given byte slice that represents a TSS
func readTSS(data []byte, p *PSI) *TSS {
tss := TSS{}
pos := 0
tss.Tide = uint16(data[pos])<<8 | uint16(data[pos+1])
pos += 2
tss.V = (data[pos] & 0x3e) >> 1
tss.Cni = byteToBool(data[pos] & 0x01)
pos++
tss.Sn = data[pos]
pos++
tss.Lsn = data[pos]
pos++
switch p.Tid {
case PATTableID:
tss.Sd = readPAT(data[pos:], &tss)
case PMTTableID:
tss.Sd = readPMT(data[pos:], &tss)
default:
panic("Can't yet deal with tables that are not PAT or PMT")
}
return &tss
}
// readPAT creates a pat struct based on a bytes slice representing a pat
func readPAT(data []byte, p *TSS) *PAT {
pat := PAT{}
pos := 0
pat.Pn = uint16(data[pos])<<8 | uint16(data[pos+1])
pos += 2
pat.Pmpid = uint16(data[pos]&0x1f)<<8 | uint16(data[pos+1])
return &pat
}
// readPMT creates a pmt struct based on a bytes slice that represents a pmt
func readPMT(data []byte, p *TSS) *PAT {
pmt := PMT{}
pos := 0
pmt.Pcrpid = uint16(data[pos]&0x1f)<<8 | uint16(data[pos+1])
pos += 2
pmt.Pil = uint16(data[pos]&0x03)<<8 | uint16(data[pos+1])
pos += 2
if pmt.Pil != 0 {
pmt.Pd = readDescs(data[pos:], int(pmt.Pil))
}
pos += int(pmt.Pil)
// TODO Read ES stuff
pmt.Essd = readEssd(data[pos:])
return nil
}
// readDescs reads provides a slice of Descs given a byte slice that represents Descs
// and the no of bytes that the descs accumilate
func readDescs(data []byte, descLen int) (o []Desc) {
pos := 0
o = make([]Desc, 1)
o[0].Dt = data[pos]
pos++
o[0].Dl = data[pos]
pos++
o[0].Dd = make([]byte, o[0].Dl)
for i := 0; i < int(o[0].Dl); i++ {
o[0].Dd[i] = data[pos]
pos++
}
if 2+len(o[0].Dd) != descLen {
panic("No support for reading more than one descriptor")
}
return
}
// readEESD creates an ESSD struct based on a bytes slice that represents ESSD
func readEssd(data []byte) *ESSD {
essd := ESSD{}
pos := 0
essd.St = data[pos]
pos++
essd.Epid = uint16(data[pos]&0x1f)<<8 | uint16(data[pos+1])
pos += 2
essd.Esil = uint16(data[pos]&0x03)<<8 | uint16(data[pos+1])
pos += 2
essd.Esd = readDescs(data[pos:], int(essd.Esil))
return &essd
}
// Bytes outputs a byte slice representation of the PSI
func (p *PSI) Bytes() []byte {
out := make([]byte, 4)
@ -233,6 +136,7 @@ func (p *PSI) Bytes() []byte {
out[3] = byte(p.Sl)
out = append(out, p.Tss.Bytes()...)
out = addCrc(out)
out = addPadding(out)
return out
}
@ -241,7 +145,7 @@ func (t *TSS) Bytes() []byte {
out := make([]byte, TSSDefLen)
out[0] = byte(t.Tide >> 8)
out[1] = byte(t.Tide)
out[2] = 0xc0 | (0x3e & (t.V << 1)) | (0x01 & boolToByte(t.Cni))
out[2] = 0xc0 | (0x3e & (t.V << 1)) | (0x01 & asByte(t.Cni))
out[3] = t.Sn
out[4] = t.Lsn
out = append(out, t.Sd.Bytes()...)
@ -295,39 +199,9 @@ func (e *ESSD) Bytes() []byte {
return out
}
func boolToByte(b bool) (o byte) {
func asByte(b bool) byte {
if b {
o = 0x01
return 0x01
}
return
}
func byteToBool(b byte) (o bool) {
if b != 0 {
o = true
}
return
}
func crc32_MakeTable(poly uint32) *crc32.Table {
var t crc32.Table
for i := range t {
crc := uint32(i) << 24
for j := 0; j < 8; j++ {
if crc&0x80000000 != 0 {
crc = (crc << 1) ^ poly
} else {
crc <<= 1
}
}
t[i] = crc
}
return &t
}
func crc32_Update(crc uint32, tab *crc32.Table, p []byte) uint32 {
for _, v := range p {
crc = tab[byte(crc>>24)^v] ^ (crc << 8)
}
return crc
return 0x00
}

View File

@ -31,10 +31,94 @@ import (
"testing"
)
// Some common manifestations of PSI
var (
// standardPat is a minimal PAT.
standardPat = PSI{
Pf: 0x00,
Tid: 0x00,
Ssi: true,
Pb: false,
Sl: 0x0d,
Tss: &TSS{
Tide: 0x01,
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &PAT{
Pn: 0x01,
Pmpid: 0x1000,
},
},
}
// standardPmt is a minimal PMT, without time and location descriptors.
standardPmt = PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: 0x12,
Tss: &TSS{
Tide: 0x01,
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &PMT{
Pcrpid: 0x0100, // wrong
Pil: 0,
Essd: &ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
}
// standardPmtTimeLocation is a standard PMT with time and location
// descriptors, but time and location fields zeroed out.
standardPmtTimeLocation = PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: 0x3e,
Tss: &TSS{
Tide: 0x01,
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &PMT{
Pcrpid: 0x0100,
Pil: PmtTimeLocationPil,
Pd: []Desc{
{
Dt: TimeDescTag,
Dl: TimeDataSize,
Dd: make([]byte, TimeDataSize),
},
{
Dt: LocationDescTag,
Dl: LocationDataSize,
Dd: make([]byte, LocationDataSize),
},
},
Essd: &ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
}
)
// Times as ints for testing
const (
tstTime1 = 1235367435 // 0x49A2360B
tstTime2 = 1735357535 // 0x676F745F
tstTime1 = 1235367435 // 0x49a2360b
tstTime2 = 1735357535 // 0x676f745f
)
// GPS string for testing
@ -46,7 +130,7 @@ const (
// err message
const (
errCmp = "Incorrect output, for: %v wanted: %v, got: %v"
errCmp = "Incorrect output, for: %v \nwant: %v, \ngot: %v"
)
// Test time to bytes test Data
@ -60,11 +144,11 @@ var (
var (
pmtTimeLocationBytesPart1 = []byte{
0x00, 0x02, 0xb0, 0x12, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x0a,
byte(timeDescTag), // Descriptor tag for timestamp
byte(timeDataSize), // Length of bytes to follow
0x00, 0x00, 0x00, 0x00, 0x67, 0x6F, 0x74, 0x5F, // Timestamp data
byte(locationDescTag), // Descriptor tag for location
byte(locationDataSize), // Length of bytes to follow
TimeDescTag, // Descriptor tag for timestamp
TimeDataSize, // Length of bytes to follow
0x00, 0x00, 0x00, 0x00, 0x67, 0x6f, 0x74, 0x5f, // Timestamp data
LocationDescTag, // Descriptor tag for location
LocationDataSize, // Length of bytes to follow
}
pmtTimeLocationBytesPart2 = []byte{
0x1b, 0xe1, 0x00, 0xf0, 0x00,
@ -75,18 +159,18 @@ var (
// Bytes representing pmt with tstTime1
pmtTimeBytes1 = []byte{
0x00, 0x02, 0xb0, 0x12, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x0a,
byte(timeDescTag), // Descriptor tag
byte(timeDataSize), // Length of bytes to follow
0x00, 0x00, 0x00, 0x00, 0x49, 0xA2, 0x36, 0x0B, // timestamp
TimeDescTag, // Descriptor tag
TimeDataSize, // Length of bytes to follow
0x00, 0x00, 0x00, 0x00, 0x49, 0xa2, 0x36, 0x0b, // timestamp
0x1b, 0xe1, 0x00, 0xf0, 0x00,
}
// Bytes representing pmt with tstTime 2
pmtTimeBytes2 = []byte{
0x00, 0x02, 0xb0, 0x12, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x0a,
byte(timeDescTag), // Descriptor tag
byte(timeDataSize), // Length of bytes to follow
0x00, 0x00, 0x00, 0x00, 0x67, 0x6F, 0x74, 0x5F, // timestamp
TimeDescTag, // Descriptor tag
TimeDataSize, // Length of bytes to follow
0x00, 0x00, 0x00, 0x00, 0x67, 0x6f, 0x74, 0x5f, // timestamp
0x1b, 0xe1, 0x00, 0xf0, 0x00,
}
@ -106,15 +190,15 @@ var bytesTests = []struct {
// Pat test
{
name: "pat Bytes()",
input: StdPat,
want: StdPatBytes,
input: standardPat,
want: StandardPatBytes,
},
// Pmt test data no descriptor
{
name: "pmt to Bytes() without descriptors",
input: StdPmt,
want: StdPmtBytes,
input: standardPmt,
want: StandardPmtBytes,
},
// Pmt with time descriptor
@ -124,9 +208,9 @@ var bytesTests = []struct {
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: uint16(0x12),
Sl: 0x12,
Tss: &TSS{
Tide: uint16(0x01),
Tide: 0x01,
V: 0,
Cni: true,
Sn: 0,
@ -135,9 +219,9 @@ var bytesTests = []struct {
Pcrpid: 0x0100, // wrong
Pil: 10,
Pd: []Desc{
Desc{
Dt: byte(timeDescTag),
Dl: byte(timeDataSize),
{
Dt: TimeDescTag,
Dl: TimeDataSize,
Dd: TimeBytes(tstTime1),
},
},
@ -159,9 +243,9 @@ var bytesTests = []struct {
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: uint16(0x12),
Sl: 0x12,
Tss: &TSS{
Tide: uint16(0x01),
Tide: 0x01,
V: 0,
Cni: true,
Sn: 0,
@ -170,14 +254,14 @@ var bytesTests = []struct {
Pcrpid: 0x0100, // wrong
Pil: 10,
Pd: []Desc{
Desc{
Dt: byte(timeDescTag),
Dl: byte(timeDataSize),
{
Dt: TimeDescTag,
Dl: TimeDataSize,
Dd: TimeBytes(tstTime2),
},
Desc{
Dt: byte(locationDescTag),
Dl: byte(locationDataSize),
{
Dt: LocationDescTag,
Dl: LocationDataSize,
Dd: LocationStrBytes(locationTstStr1),
},
},
@ -198,9 +282,7 @@ var bytesTests = []struct {
func TestBytes(t *testing.T) {
for _, test := range bytesTests {
got := test.input.Bytes()
// Remove crc32s
got = got[:len(got)-4]
if !bytes.Equal(got, test.want) {
if !bytes.Equal(got, addPadding(addCrc(test.want))) {
t.Errorf("unexpected error for test %v: got:%v want:%v", test.name, got,
test.want)
}
@ -243,7 +325,7 @@ func TestTimeGet(t *testing.T) {
// TestLocationGet checks that we can correctly get location data from a pmt table
func TestLocationGet(t *testing.T) {
pb := StdPmtTimeLocation.Bytes()
pb := standardPmtTimeLocation.Bytes()
err := UpdateLocation(pb, locationTstStr1)
if err != nil {
t.Errorf("Error for TestLocationGet UpdateLocation(pb, locationTstStr1): %v", err)
@ -272,6 +354,15 @@ func TestLocationUpdate(t *testing.T) {
}
}
func TestTrim(t *testing.T) {
test := []byte{0xa3, 0x01, 0x03, 0x00, 0xde}
want := []byte{0xa3, 0x01, 0x03}
got := trimTo(test, 0x00)
if !bytes.Equal(got, want) {
t.Errorf(errCmp, "TestTrim", want, got)
}
}
// buildPmtTimeLocationBytes is a helper function to help construct the byte slices
// for pmts with time and location, as the location data field is 32 bytes, i.e. quite large
// to type out

View File

@ -27,95 +27,12 @@ LICENSE
package psi
const (
pmtTimeLocationPil = 44
)
// Some common manifestations of PSI
var (
// PSI struct to represent basic pat
StdPat = PSI{
Pf: 0x00,
Tid: 0x00,
Ssi: true,
Pb: false,
Sl: uint16(0x0d),
Tss: &TSS{
Tide: uint16(0x01),
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &PAT{
Pn: uint16(0x01),
Pmpid: uint16(0x1000),
},
},
}
// PSI struct to represent basic pmt without descriptors for time and location
StdPmt = PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: uint16(0x12),
Tss: &TSS{
Tide: uint16(0x01),
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &PMT{
Pcrpid: 0x0100, // wrong
Pil: 0,
Essd: &ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
}
// Std pmt with time and location descriptors, time and location fields are zeroed out
StdPmtTimeLocation = PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: uint16(0x3e),
Tss: &TSS{
Tide: uint16(0x01),
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &PMT{
Pcrpid: 0x0100,
Pil: pmtTimeLocationPil,
Pd: []Desc{
Desc{
Dt: byte(timeDescTag),
Dl: byte(timeDataSize),
Dd: make([]byte, timeDataSize),
},
Desc{
Dt: byte(locationDescTag),
Dl: byte(locationDataSize),
Dd: make([]byte, locationDataSize),
},
},
Essd: &ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
}
PmtTimeLocationPil = 44
)
// Std PSI in bytes form
var (
StdPatBytes = []byte{
StandardPatBytes = []byte{
0x00, // pointer
// ---- section included in data sent to CRC32 during check
@ -136,7 +53,7 @@ var (
// 0x2a, 0xb1, 0x04, 0xb2, // CRC
// ----
}
StdPmtBytes = []byte{
StandardPmtBytes = []byte{
0x00, // pointer
// ---- section included in data sent to CRC32 during check