Merge branch 'master' into send-retry-flag

saxon 2019-02-08 18:18:01 +10:30
commit c5d93b53d5
26 changed files with 1990 additions and 626 deletions

View File

@ -37,6 +37,8 @@ import (
"time" "time"
"bitbucket.org/ausocean/av/revid" "bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/av/stream/mts/meta"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/iot/pi/smartlogger" "bitbucket.org/ausocean/iot/pi/smartlogger"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
@ -50,6 +52,13 @@ const (
defaultLogVerbosity = logger.Debug defaultLogVerbosity = logger.Debug
) )
// Revid modes
const (
normal = "Normal"
paused = "Paused"
burst = "Burst"
)
// Other misc consts // Other misc consts
const ( const (
netSendRetryTime = 5 * time.Second netSendRetryTime = 5 * time.Second
@ -65,30 +74,36 @@ var canProfile = true
// The logger that will be used throughout // The logger that will be used throughout
var log *logger.Logger var log *logger.Logger
const (
metaPreambleKey = "copyright"
metaPreambleData = "ausocean.org/license/content2019"
)
func main() { func main() {
mts.Meta = meta.NewWith([][2]string{{metaPreambleKey, metaPreambleData}})
useNetsender := flag.Bool("NetSender", false, "Are we checking vars through netsender?") useNetsender := flag.Bool("NetSender", false, "Are we checking vars through netsender?")
runDurationPtr := flag.Duration("runDuration", defaultRunDuration, "How long do you want revid to run for?") runDurationPtr := flag.Duration("runDuration", defaultRunDuration, "How long do you want revid to run for?")
cfg := handleFlags() cfg := handleFlags()
if !*useNetsender { if !*useNetsender {
// run revid for the specified duration rv, err := revid.New(cfg, nil)
rv, _, err := startRevid(nil, cfg)
if err != nil { if err != nil {
cfg.Logger.Log(logger.Fatal, pkg+"failed to initialiase revid", "error", err.Error())
}
if err = rv.Start(); err != nil {
cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", "error", err.Error()) cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", "error", err.Error())
} }
time.Sleep(*runDurationPtr) time.Sleep(*runDurationPtr)
err = stopRevid(rv) if err = rv.Stop(); err != nil {
if err != nil {
cfg.Logger.Log(logger.Error, pkg+"failed to stop revid before program termination", "error", err.Error()) cfg.Logger.Log(logger.Error, pkg+"failed to stop revid before program termination", "error", err.Error())
} }
return return
} }
err := run(nil, cfg) if err := run(cfg); err != nil {
if err != nil {
log.Log(logger.Fatal, pkg+"failed to run revid", "error", err.Error()) log.Log(logger.Fatal, pkg+"failed to run revid", "error", err.Error())
os.Exit(1)
} }
} }
@ -102,8 +117,6 @@ func handleFlags() revid.Config {
inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam") inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam")
inputCodecPtr = flag.String("InputCodec", "", "The codec of the input: H264, Mjpeg") inputCodecPtr = flag.String("InputCodec", "", "The codec of the input: H264, Mjpeg")
output1Ptr = flag.String("Output1", "", "The first output type: Http, Rtmp, File, Udp, Rtp")
output2Ptr = flag.String("Output2", "", "The second output type: Http, Rtmp, File, Udp, Rtp")
rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp") rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp")
packetizationPtr = flag.String("Packetization", "", "The method of data packetisation: Flv, Mpegts, None") packetizationPtr = flag.String("Packetization", "", "The method of data packetisation: Flv, Mpegts, None")
quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)") quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)")
@ -127,6 +140,9 @@ func handleFlags() revid.Config {
sendRetryPtr = flag.Bool("send-retry", false, "If true, we retry send on a failure, otherwise drop the data.") sendRetryPtr = flag.Bool("send-retry", false, "If true, we retry send on a failure, otherwise drop the data.")
) )
var outputs flagStrings
flag.Var(&outputs, "Output", "output type: Http, Rtmp, File, Udp, Rtp (may be used more than once)")
flag.Parse() flag.Parse()
log = logger.New(defaultLogVerbosity, &smartlogger.New(*logPathPtr).LogRoller) log = logger.New(defaultLogVerbosity, &smartlogger.New(*logPathPtr).LogRoller)
@ -168,40 +184,24 @@ func handleFlags() revid.Config {
log.Log(logger.Error, pkg+"bad input codec argument") log.Log(logger.Error, pkg+"bad input codec argument")
} }
switch *output1Ptr { for _, o := range outputs {
switch o {
case "File": case "File":
cfg.Output1 = revid.File cfg.Outputs = append(cfg.Outputs, revid.File)
case "Http": case "Http":
cfg.Output1 = revid.Http cfg.Outputs = append(cfg.Outputs, revid.Http)
case "Rtmp": case "Rtmp":
cfg.Output1 = revid.Rtmp cfg.Outputs = append(cfg.Outputs, revid.Rtmp)
case "FfmpegRtmp": case "FfmpegRtmp":
cfg.Output1 = revid.FfmpegRtmp cfg.Outputs = append(cfg.Outputs, revid.FfmpegRtmp)
case "Udp": case "Udp":
cfg.Output1 = revid.Udp cfg.Outputs = append(cfg.Outputs, revid.Udp)
case "Rtp": case "Rtp":
cfg.Output1 = revid.Rtp cfg.Outputs = append(cfg.Outputs, revid.Rtp)
case "": case "":
default: default:
log.Log(logger.Error, pkg+"bad output 1 argument") log.Log(logger.Error, pkg+"bad output argument", "arg", o)
} }
switch *output2Ptr {
case "File":
cfg.Output2 = revid.File
case "Http":
cfg.Output2 = revid.Http
case "Rtmp":
cfg.Output2 = revid.Rtmp
case "FfmpegRtmp":
cfg.Output2 = revid.FfmpegRtmp
case "Udp":
cfg.Output2 = revid.Udp
case "Rtp":
cfg.Output2 = revid.Rtp
case "":
default:
log.Log(logger.Error, pkg+"bad output 2 argument")
} }
switch *rtmpMethodPtr { switch *rtmpMethodPtr {
@ -261,66 +261,97 @@ func handleFlags() revid.Config {
} }
// initialize then run the main NetSender client // initialize then run the main NetSender client
func run(rv *revid.Revid, cfg revid.Config) error { func run(cfg revid.Config) error {
// initialize NetSender and use NetSender's logger
//config.Logger = netsender.Logger()
log.Log(logger.Info, pkg+"running in NetSender mode") log.Log(logger.Info, pkg+"running in NetSender mode")
var vars map[string]string
// initialize NetSender and use NetSender's logger
var ns netsender.Sender var ns netsender.Sender
err := ns.Init(log, nil, nil, nil) err := ns.Init(log, nil, nil, nil)
if err != nil { if err != nil {
return err return err
} }
vars, _ := ns.Vars()
vars, _ = ns.Vars()
vs := ns.VarSum() vs := ns.VarSum()
paused := false
if vars["mode"] == "Paused" { rv, err := revid.New(cfg, &ns)
paused = true if err != nil {
log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error())
} }
if !paused {
rv, cfg, err = updateRevid(&ns, rv, cfg, vars, false) // Update revid to get latest config settings from netreceiver.
err = rv.Update(vars)
if err != nil {
return err
}
// If the mode on netreceiver isn't paused or burst then we can start revid.
if ns.Mode() != paused && ns.Mode() != burst {
err = rv.Start()
if err != nil { if err != nil {
return err return err
} }
} }
if ns.Mode() == burst {
ns.SetMode(paused, &vs)
}
for { for {
if err := send(&ns, rv); err != nil { // TODO(saxon): replace this call with call to ns.Run().
log.Log(logger.Error, pkg+"polling failed", "error", err.Error()) err = send(&ns, rv)
if err != nil {
log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error())
time.Sleep(netSendRetryTime) time.Sleep(netSendRetryTime)
continue continue
} }
if vs != ns.VarSum() { // If the var sum hasn't changed we continue
// vars changed if vs == ns.VarSum() {
vars, err := ns.Vars() goto sleep
}
vars, err = ns.Vars()
if err != nil { if err != nil {
log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error()) log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error())
time.Sleep(netSendRetryTime) time.Sleep(netSendRetryTime)
continue continue
} }
vs = ns.VarSum() vs = ns.VarSum()
if vars["mode"] == "Paused" {
if !paused { err = rv.Update(vars)
log.Log(logger.Info, pkg+"pausing revid")
err = stopRevid(rv)
if err != nil {
log.Log(logger.Error, pkg+"failed to stop revide", "error", err.Error())
continue
}
paused = true
}
} else {
rv, cfg, err = updateRevid(&ns, rv, cfg, vars, !paused)
if err != nil { if err != nil {
return err return err
} }
if paused {
paused = false switch ns.Mode() {
case paused:
case normal:
err = rv.Start()
if err != nil {
return err
} }
case burst:
log.Log(logger.Info, pkg+"Starting burst...")
err = rv.Start()
if err != nil {
return err
} }
time.Sleep(time.Duration(rv.Config().BurstPeriod) * time.Second)
log.Log(logger.Info, pkg+"Stopping burst...")
err = rv.Stop()
if err != nil {
return err
}
ns.SetMode(paused, &vs)
}
sleep:
sleepTime, err := strconv.Atoi(ns.Param("mp"))
if err != nil {
return err
} }
sleepTime, _ := strconv.Atoi(ns.Param("mp"))
time.Sleep(time.Duration(sleepTime) * time.Second) time.Sleep(time.Duration(sleepTime) * time.Second)
} }
} }
@ -348,131 +379,27 @@ func send(ns *netsender.Sender, rv *revid.Revid) error {
return nil return nil
} }
// wrappers for stopping and starting revid // flagStrings implements an appending string set flag.
func startRevid(ns *netsender.Sender, cfg revid.Config) (*revid.Revid, revid.Config, error) { type flagStrings []string
rv, err := revid.New(cfg, ns)
if err != nil { func (v *flagStrings) String() string {
return nil, cfg, err if *v != nil {
return strings.Join(*v, ",")
} }
err = rv.Start() return ""
return rv, cfg, err
} }
func stopRevid(rv *revid.Revid) error { func (v *flagStrings) Set(s string) error {
err := rv.Stop() if s == "" {
if err != nil { return nil
return err
} }
for _, e := range *v {
// FIXME(kortschak): Is this waiting on completion of work? if e == s {
// Use a wait group and Wait method if it is. return nil
time.Sleep(revidStopTime) }
}
*v = append(*v, s)
return nil return nil
} }
func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars map[string]string, stop bool) (*revid.Revid, revid.Config, error) { func (v *flagStrings) Get() interface{} { return *v }
if stop {
err := stopRevid(rv)
if err != nil {
return nil, cfg, err
}
}
//look through the vars and update revid where needed
for key, value := range vars {
switch key {
case "Output":
switch value {
case "File":
cfg.Output1 = revid.File
case "Http":
cfg.Output1 = revid.Http
case "Rtmp":
cfg.Output1 = revid.Rtmp
case "FfmpegRtmp":
cfg.Output1 = revid.FfmpegRtmp
default:
log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value)
continue
}
case "FramesPerClip":
f, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value)
break
}
cfg.FramesPerClip = uint(f)
case "RtmpUrl":
cfg.RtmpUrl = value
case "Bitrate":
r, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
break
}
cfg.Bitrate = uint(r)
case "OutputFileName":
cfg.OutputFileName = value
case "InputFileName":
cfg.InputFileName = value
case "Height":
h, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid height param", "value", value)
break
}
cfg.Height = uint(h)
case "Width":
w, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid width param", "value", value)
break
}
cfg.Width = uint(w)
case "FrameRate":
r, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
break
}
cfg.FrameRate = uint(r)
case "HttpAddress":
cfg.HttpAddress = value
case "Quantization":
q, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid quantization param", "value", value)
break
}
cfg.Quantization = uint(q)
case "IntraRefreshPeriod":
p, err := strconv.ParseUint(value, 10, 0)
if err != nil {
log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value)
break
}
cfg.IntraRefreshPeriod = uint(p)
case "HorizontalFlip":
switch strings.ToLower(value) {
case "true":
cfg.FlipHorizontal = true
case "false":
cfg.FlipHorizontal = false
default:
log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value)
}
case "VerticalFlip":
switch strings.ToLower(value) {
case "true":
cfg.FlipVertical = true
case "false":
cfg.FlipVertical = false
default:
log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value)
}
default:
}
}
return startRevid(ns, cfg)
}
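The repeatable -Output flag above is wired up with flag.Var and the flagStrings value type. Below is a minimal, self-contained sketch of the same pattern, assuming nothing from the revid packages (the type and variable names are illustrative only, and the duplicate check is omitted):

package main

import (
	"flag"
	"fmt"
	"strings"
)

// outputs collects repeated -Output values, much like flagStrings above
// (simplified: empty values are ignored, duplicates are not filtered).
type outputs []string

func (o *outputs) String() string { return strings.Join(*o, ",") }

func (o *outputs) Set(s string) error {
	if s != "" {
		*o = append(*o, s)
	}
	return nil
}

func main() {
	var outs outputs
	flag.Var(&outs, "Output", "output type (may be used more than once)")
	flag.Parse()
	fmt.Println(outs)
}

Invoked as "prog -Output Http -Output Rtmp", the slice ends up as [Http Rtmp], which is the shape the Outputs loop in handleFlags consumes.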

View File

@ -0,0 +1,144 @@
/*
NAME
decode.go
DESCRIPTION
decode.go provides functionality for the decoding of FLAC compressed audio
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
decode.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package flac
import (
"bytes"
"errors"
"io"
"github.com/go-audio/audio"
"github.com/go-audio/wav"
"github.com/mewkiz/flac"
)
const wavFormat = 1
// writeSeeker implements a memory based io.WriteSeeker.
type writeSeeker struct {
buf []byte
pos int
}
// Bytes returns the bytes contained in the writeSeeker's buffer.
func (ws *writeSeeker) Bytes() []byte {
return ws.buf
}
// Write writes len(p) bytes from p to the writeSeeker's buf and returns the number
// of bytes written. If less than len(p) bytes are written, an error is returned.
func (ws *writeSeeker) Write(p []byte) (n int, err error) {
minCap := ws.pos + len(p)
if minCap > cap(ws.buf) { // Make sure buf has enough capacity:
buf2 := make([]byte, len(ws.buf), minCap+len(p)) // add some extra
copy(buf2, ws.buf)
ws.buf = buf2
}
if minCap > len(ws.buf) {
ws.buf = ws.buf[:minCap]
}
copy(ws.buf[ws.pos:], p)
ws.pos += len(p)
return len(p), nil
}
// Seek sets the offset for the next Read or Write to offset, interpreted according
// to whence: SeekStart means relative to the start of the file, SeekCurrent means
// relative to the current offset, and SeekEnd means relative to the end. Seek returns
// the new offset relative to the start of the file and an error, if any.
func (ws *writeSeeker) Seek(offset int64, whence int) (int64, error) {
newPos, offs := 0, int(offset)
switch whence {
case io.SeekStart:
newPos = offs
case io.SeekCurrent:
newPos = ws.pos + offs
case io.SeekEnd:
newPos = len(ws.buf) + offs
}
if newPos < 0 {
return 0, errors.New("negative result pos")
}
ws.pos = newPos
return int64(newPos), nil
}
// Decode takes buf, a slice of FLAC-encoded bytes, and decodes it to WAV. If decoding
// fails, an error is returned.
func Decode(buf []byte) ([]byte, error) {
// Parse the FLAC into a stream holding the audio and its properties.
r := bytes.NewReader(buf)
stream, err := flac.Parse(r)
if err != nil {
return nil, errors.New("Could not parse FLAC")
}
// Create WAV encoder and pass writeSeeker that will store output WAV.
ws := &writeSeeker{}
sr := int(stream.Info.SampleRate)
bps := int(stream.Info.BitsPerSample)
nc := int(stream.Info.NChannels)
enc := wav.NewEncoder(ws, sr, bps, nc, wavFormat)
defer enc.Close()
// Decode FLAC into frames of samples
intBuf := &audio.IntBuffer{
Format: &audio.Format{NumChannels: nc, SampleRate: sr},
SourceBitDepth: bps,
}
return decodeFrames(stream, intBuf, enc, ws)
}
// decodeFrames parses frames from the stream and encodes them into WAV until
// the end of the stream is reached. The bytes from the writeSeeker's buffer are then
// returned. If any error occurs during encoding, nil bytes and the error are returned.
func decodeFrames(s *flac.Stream, intBuf *audio.IntBuffer, e *wav.Encoder, ws *writeSeeker) ([]byte, error) {
var data []int
for {
frame, err := s.ParseNext()
// If we've reached the end of the stream then we can output the writeSeeker's buffer.
if err == io.EOF {
return ws.Bytes(), nil
} else if err != nil {
return nil, err
}
// Encode WAV audio samples.
data = data[:0]
for i := 0; i < frame.Subframes[0].NSamples; i++ {
for _, subframe := range frame.Subframes {
data = append(data, int(subframe.Samples[i]))
}
}
intBuf.Data = data
if err := e.Write(intBuf); err != nil {
return nil, err
}
}
}
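decodeFrames flattens the per-channel subframe samples into one interleaved slice before passing the buffer to the WAV encoder. A small standalone sketch of that interleaving order, using made-up sample values in place of the FLAC frame types:

package main

import "fmt"

func main() {
	// Two channels with three samples each, as a frame's subframes hold them.
	subframes := [][]int{
		{1, 2, 3}, // left channel
		{4, 5, 6}, // right channel
	}

	// Interleave sample by sample, as decodeFrames does: L0 R0 L1 R1 L2 R2.
	var data []int
	for i := 0; i < len(subframes[0]); i++ {
		for _, sf := range subframes {
			data = append(data, sf[i])
		}
	}

	fmt.Println(data) // [1 4 2 5 3 6]
}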

View File

@ -0,0 +1,121 @@
/*
NAME
flac_test.go
DESCRIPTION
flac_test.go provides utilities to test FLAC audio decoding
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
flac_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package flac
import (
"io"
"io/ioutil"
"os"
"testing"
)
const (
testFile = "../../../test/test-data/av/input/robot.flac"
outFile = "testOut.wav"
)
// TestWriteSeekerWrite checks that basic writing to the writeSeeker works as expected.
func TestWriteSeekerWrite(t *testing.T) {
ws := &writeSeeker{}
const tstStr1 = "hello"
ws.Write([]byte(tstStr1))
got := string(ws.buf)
if got != tstStr1 {
t.Errorf("Write failed, got: %v, want: %v", got, tstStr1)
}
const tstStr2 = " world"
const want = "hello world"
ws.Write([]byte(tstStr2))
got = string(ws.buf)
if got != want {
t.Errorf("Second write failed, got: %v, want: %v", got, want)
}
}
// TestWriteSeekerSeek checks that writing and seeking work as expected, i.e. we
// can write, then seek to a new place in the buf, and write again, either replacing
// or appending bytes.
func TestWriteSeekerSeek(t *testing.T) {
ws := &writeSeeker{}
const tstStr1 = "hello"
want1 := tstStr1
ws.Write([]byte(tstStr1))
got := string(ws.buf)
if got != tstStr1 {
t.Errorf("Unexpected output, got: %v, want: %v", got, want1)
}
const tstStr2 = " world"
const want2 = tstStr1 + tstStr2
ws.Write([]byte(tstStr2))
got = string(ws.buf)
if got != want2 {
t.Errorf("Unexpected output, got: %v, want: %v", got, want2)
}
const tstStr3 = "k!"
const want3 = "hello work!"
ws.Seek(-2, io.SeekEnd)
ws.Write([]byte(tstStr3))
got = string(ws.buf)
if got != want3 {
t.Errorf("Unexpected output, got: %v, want: %v", got, want3)
}
const tstStr4 = "gopher"
const want4 = "hello gopher"
ws.Seek(6, io.SeekStart)
ws.Write([]byte(tstStr4))
got = string(ws.buf)
if got != want4 {
t.Errorf("Unexpected output, got: %v, want: %v", got, want4)
}
}
// TestDecodeFlac checks that we can load a FLAC file, decode it to WAV, and write
// the result to a WAV file.
func TestDecodeFlac(t *testing.T) {
b, err := ioutil.ReadFile(testFile)
if err != nil {
t.Fatalf("Could not read test file, failed with err: %v", err.Error())
}
out, err := Decode(b)
if err != nil {
t.Errorf("Could not decode, failed with err: %v", err.Error())
}
f, err := os.Create(outFile)
if err != nil {
t.Fatalf("Could not create output file, failed with err: %v", err.Error())
}
_, err = f.Write(out)
if err != nil {
t.Fatalf("Could not write to output file, failed with err: %v", err.Error())
}
}

View File

@ -0,0 +1,265 @@
/*
NAME
ts-repair/main.go
DESCRIPTION
This program attempts to repair mpegts discontinuities using one of two methods
as selected by the mode flag. Setting the mode flag to 0 will result in repair
by shifting all CCs such that they are continuous. Setting the mode flag to 1
will result in repair through setting the discontinuity indicator to true at
packets where a discontinuity exists.
Specify the input file with the in flag, and the output file with the out flag.
AUTHOR
Saxon A. Nelson-Milton <saxon.milton@gmail.com>
LICENSE
main.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package main
import (
"errors"
"flag"
"fmt"
"io"
"os"
"bitbucket.org/ausocean/av/stream/mts"
"github.com/Comcast/gots/packet"
)
const (
PatPid = 0
PmtPid = 4096
VideoPid = 256
HeadSize = 4
DefaultAdaptationSize = 2
AdaptationIdx = 4
AdaptationControlIdx = 3
AdaptationBodyIdx = AdaptationIdx + 1
AdaptationControlMask = 0x30
DefaultAdaptationBodySize = 1
DiscontinuityIndicatorMask = 0x80
DiscontinuityIndicatorIdx = AdaptationIdx + 1
)
// Various errors that we can encounter.
const (
errBadInPath = "No file path provided, or file does not exist"
errCantCreateOut = "Can't create output file"
errCantGetPid = "Can't get pid from packet"
errReadFail = "Read failed"
errWriteFail = "Write to file failed"
errBadMode = "Bad fix mode"
errAdaptationPresent = "Adaptation field is already present in packet"
errNoAdaptationField = "No adaptation field in this packet"
)
// Consts describing flag usage.
const (
inUsage = "The path to the file to be repaired"
outUsage = "Output file path"
modeUsage = "Fix mode: 0 = cc-shift, 1 = di-update"
)
// Repair modes.
const (
ccShift = iota
diUpdate
)
var ccMap = map[int]byte{
PatPid: 16,
PmtPid: 16,
VideoPid: 16,
}
// packetNo will keep track of the ts packet number for reference.
var packetNo int
// Option defines a func that performs an action on a *Packet in order to change one of its settings.
type Option func(p *Packet)
// Packet is a byte array of size PacketSize i.e. 188 bytes. We define this
// to allow us to write receiver funcs for the [PacketSize]byte type.
type Packet [mts.PacketSize]byte
// CC returns the CC of p.
func (p *Packet) CC() byte {
return (*p)[3] & 0x0f
}
// setCC sets the CC of p.
func (p *Packet) setCC(cc byte) {
(*p)[3] = (*p)[3]&0xf0 | cc&0x0f
}
// setDI sets the discontinuity indicator of p.
func (p *Packet) setDI(di bool) {
if di {
p[5] |= 0x80
} else {
p[5] &= 0x7f
}
}
// addAdaptationField adds an adaptation field to p, and applies the passed options to this field.
// TODO: this will probably break if we already have an adaptation field.
func (p *Packet) addAdaptationField(options ...Option) error {
if p.hasAdaptation() {
return errors.New(errAdaptationPresent)
}
// Create space for adaptation field.
copy(p[HeadSize+DefaultAdaptationSize:], p[HeadSize:len(p)-DefaultAdaptationSize])
// TODO: separate into its own function
// Update adaptation field control.
p[AdaptationControlIdx] &= 0xff ^ AdaptationControlMask
p[AdaptationControlIdx] |= AdaptationControlMask
// Default the adaptation field.
p.resetAdaptation()
// Apply any options that have been passed.
for _, option := range options {
option(p)
}
return nil
}
// resetAdaptation sets fields in p's adaptation field to 0 if the adaptation field
// exists, otherwise an error is returned.
func (p *Packet) resetAdaptation() error {
if !p.hasAdaptation() {
return errors.New(errNoAdaptationField)
}
p[AdaptationIdx] = DefaultAdaptationBodySize
p[AdaptationBodyIdx] = 0x00
return nil
}
// hasAdaptation returns true if p has an adaptation field and false otherwise.
func (p *Packet) hasAdaptation() bool {
afc := p[AdaptationControlIdx] & AdaptationControlMask
return afc == 0x20 || afc == 0x30
}
// DiscontinuityIndicator returns an Option that will set p's discontinuity
// indicator according to f.
func DiscontinuityIndicator(f bool) Option {
return func(p *Packet) {
set := byte(DiscontinuityIndicatorMask)
if !f {
set = 0x00
}
p[DiscontinuityIndicatorIdx] &= 0xff ^ DiscontinuityIndicatorMask
p[DiscontinuityIndicatorIdx] |= DiscontinuityIndicatorMask & set
}
}
func main() {
// Deal with input flags
inPtr := flag.String("in", "", inUsage)
outPtr := flag.String("out", "out.ts", outUsage)
modePtr := flag.Int("mode", diUpdate, modeUsage)
flag.Parse()
// Try to open the given input file, otherwise panic - we can't do anything.
inFile, err := os.Open(*inPtr)
if err != nil {
panic(errBadInPath)
}
defer inFile.Close()
// Try to create the output file, otherwise panic - we can't do anything.
outFile, err := os.Create(*outPtr)
if err != nil {
panic(errCantCreateOut)
}
defer outFile.Close()
// Read each packet from the input file reader
var p Packet
for {
// If we get an end of file then return, otherwise we panic - can't do anything else
if _, err := inFile.Read(p[:mts.PacketSize]); err == io.EOF {
return
} else if err != nil {
panic(errReadFail + ": " + err.Error())
}
packetNo++
// Get the pid from the packet
pid, err := packet.Pid((*packet.Packet)(&p))
if err != nil {
panic(errCantGetPid)
}
// Get the cc from the packet and also the expected cc (if it exists)
cc := p.CC()
expect, exists := expectedCC(int(pid))
if !exists {
updateCCMap(int(pid), cc)
} else {
switch *modePtr {
// ccShift mode shifts all CCs so that they are continuous, regardless of whether discontinuities are present
case ccShift:
p.setCC(expect)
// diUpdate mode finds discontinuities and sets the discontinuity indicator to true.
// If we have a pat or pmt then we need to add an adaptation field and then set the DI.
case diUpdate:
if cc != expect {
fmt.Printf("***** Discontinuity found (packetNo: %v pid: %v, cc: %v, expect: %v)\n", packetNo, pid, cc, expect)
if p.hasAdaptation() {
p.setDI(true)
} else {
p.addAdaptationField(DiscontinuityIndicator(true))
}
updateCCMap(int(pid), p.CC())
}
default:
panic(errBadMode)
}
}
// Write this packet to the output file.
if _, err := outFile.Write(p[:]); err != nil {
panic(errWriteFail + ": " + err.Error())
}
}
}
// expectedCC returns the expected cc for the given pid. If the pid hasn't been
// seen yet, then 16 and false are returned.
func expectedCC(pid int) (byte, bool) {
cc := ccMap[pid]
if cc == 16 {
return 16, false
}
ccMap[pid] = (cc + 1) & 0xf
return cc, true
}
// updateCCMap updates the cc for the passed pid.
func updateCCMap(pid int, cc byte) {
ccMap[pid] = (cc + 1) & 0xf
}
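The continuity counter is a 4-bit field, so the expected value wraps from 15 back to 0, and ccMap uses 16 as an out-of-band "not seen yet" marker. A small standalone sketch of the same bookkeeping (the pid and cc values are arbitrary):

package main

import "fmt"

// ccs tracks the next expected continuity counter per pid; 16 marks a pid
// that has not been seen yet, since a real CC only occupies 4 bits (0-15).
var ccs = map[int]byte{256: 16}

func expected(pid int) (byte, bool) {
	cc := ccs[pid]
	if cc == 16 {
		return 16, false
	}
	ccs[pid] = (cc + 1) & 0xf // advance, wrapping 15 -> 0
	return cc, true
}

func main() {
	const pid = 256
	for _, cc := range []byte{14, 15, 0, 2} { // 0 should be followed by 1, not 2
		want, seen := expected(pid)
		if !seen || cc == want {
			ccs[pid] = (cc + 1) & 0xf
			continue
		}
		fmt.Printf("discontinuity: got %d, expected %d\n", cc, want)
		ccs[pid] = (cc + 1) & 0xf
	}
}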

View File

@ -1,75 +0,0 @@
/*
NAME
main.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
main.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package main
import (
"flag"
"log"
"time"
"bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/iot/pi/smartlogger"
"bitbucket.org/ausocean/utils/logger"
)
const (
inputFile = "../../../../test/test-data/av/input/betterInput.h264"
frameRate = "25"
runDuration = 120 * time.Second
logPath = "/var/log"
)
// Test h264 inputfile to flv format into rtmp using librtmp c wrapper
func main() {
// Get the rtmp url from a cmd flag
rtmpUrlPtr := flag.String("rtmpUrl", "", "The rtmp url you would like to stream to.")
flag.Parse()
if *rtmpUrlPtr == "" {
log.Println("No RTMP url passed!")
return
}
config := revid.Config{
Input: revid.File,
InputFileName: inputFile,
InputCodec: revid.H264,
Output1: revid.Rtmp,
RtmpMethod: revid.LibRtmp,
RtmpUrl: *rtmpUrlPtr,
Packetization: revid.Flv,
Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller),
}
revidInst, err := revid.New(config, nil)
if err != nil {
config.Logger.Log(logger.Error, "Should not have got an error!: ", err.Error())
return
}
revidInst.Start()
time.Sleep(runDuration)
revidInst.Stop()
}

View File

@ -1,66 +0,0 @@
/*
NAME
main.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
main.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package main
import (
"time"
"bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/iot/pi/smartlogger"
"bitbucket.org/ausocean/utils/logger"
)
const (
inputFile = "../../../../test/test-data/av/input/betterInput.h264"
outputFile = "output.ts"
frameRate = "25"
runDuration = 120 * time.Second
logPath = "/var/log"
)
// Test h264 inputfile to flv format into rtmp using librtmp c wrapper
func main() {
config := revid.Config{
Input: revid.File,
InputFileName: inputFile,
InputCodec: revid.H264,
Output1: revid.File,
OutputFileName: outputFile,
Packetization: revid.Mpegts,
Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller),
}
revidInst, err := revid.New(config, nil)
if err != nil {
config.Logger.Log(logger.Error, "Should not have got an error!:", err.Error())
return
}
revidInst.Start()
time.Sleep(runDuration)
revidInst.Stop()
}

View File

@ -40,8 +40,7 @@ type Config struct {
Input uint8 Input uint8
InputCodec uint8 InputCodec uint8
Output1 uint8 Outputs []uint8
Output2 uint8
RtmpMethod uint8 RtmpMethod uint8
Packetization uint8 Packetization uint8
@ -69,6 +68,7 @@ type Config struct {
RtpAddress string RtpAddress string
Logger Logger Logger Logger
SendRetry bool SendRetry bool
BurstPeriod uint
} }
// Enums for config struct // Enums for config struct
@ -115,6 +115,7 @@ const (
defaultInputCodec = H264 defaultInputCodec = H264
defaultVerbosity = No // FIXME(kortschak): This makes no sense whatsoever. No is currently 15. defaultVerbosity = No // FIXME(kortschak): This makes no sense whatsoever. No is currently 15.
defaultRtpAddr = "localhost:6970" defaultRtpAddr = "localhost:6970"
defaultBurstPeriod = 10 // Seconds
) )
// Validate checks for any errors in the config fields and defaults settings // Validate checks for any errors in the config fields and defaults settings
@ -172,13 +173,16 @@ func (c *Config) Validate(r *Revid) error {
return errors.New("bad input codec defined in config") return errors.New("bad input codec defined in config")
} }
switch c.Output1 { for i, o := range c.Outputs {
switch o {
case File: case File:
case Udp: case Udp:
case Rtmp, FfmpegRtmp: case Rtmp, FfmpegRtmp:
if c.RtmpUrl == "" { if c.RtmpUrl == "" {
c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP") c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP")
c.Output1 = Http c.Outputs[i] = Http
// FIXME(kortschak): Does this want the same line as below?
// c.FramesPerClip = httpFramesPerClip
break break
} }
c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out",
@ -187,7 +191,7 @@ func (c *Config) Validate(r *Revid) error {
case NothingDefined: case NothingDefined:
c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output", c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output",
defaultOutput) defaultOutput)
c.Output1 = defaultOutput c.Outputs[i] = defaultOutput
fallthrough fallthrough
case Http, Rtp: case Http, Rtp:
c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out",
@ -196,21 +200,11 @@ func (c *Config) Validate(r *Revid) error {
default: default:
return errors.New("bad output type defined in config") return errors.New("bad output type defined in config")
} }
switch c.Output2 {
case File:
case Rtp:
case Udp:
case Rtmp, FfmpegRtmp:
if c.RtmpUrl == "" {
c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP")
c.Output2 = Http
break
} }
case NothingDefined:
case Http: if c.BurstPeriod == 0 {
default: c.Logger.Log(logger.Warning, pkg+"no burst period defined, defaulting", "burstPeriod", defaultBurstPeriod)
return errors.New("bad output2 type defined in config") c.BurstPeriod = defaultBurstPeriod
} }
if c.FramesPerClip < 1 { if c.FramesPerClip < 1 {

View File

@ -37,6 +37,7 @@ import (
"os/exec" "os/exec"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"bitbucket.org/ausocean/av/stream" "bitbucket.org/ausocean/av/stream"
@ -50,7 +51,7 @@ import (
// Ring buffer sizes and read/write timeouts. // Ring buffer sizes and read/write timeouts.
const ( const (
ringBufferSize = 10000 ringBufferSize = 100
ringBufferElementSize = 150000 ringBufferElementSize = 150000
writeTimeout = 10 * time.Millisecond writeTimeout = 10 * time.Millisecond
readTimeout = 10 * time.Millisecond readTimeout = 10 * time.Millisecond
@ -118,8 +119,12 @@ type Revid struct {
// bitrate hold the last send bitrate calculation result. // bitrate hold the last send bitrate calculation result.
bitrate int bitrate int
// isRunning is a loaded and cocked foot-gun. mu sync.Mutex
isRunning bool isRunning bool
wg sync.WaitGroup
err chan error
} }
// packer takes data segments and packs them into clips // packer takes data segments and packs them into clips
@ -157,8 +162,15 @@ func (p *packer) Write(frame []byte) (int, error) {
return n, err return n, err
} }
p.packetCount++ p.packetCount++
var hasRtmp bool
for _, d := range p.owner.config.Outputs {
if d == Rtmp {
hasRtmp = true
break
}
}
now := time.Now() now := time.Now()
if (p.owner.config.Output1 != Rtmp && now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) || p.owner.config.Output1 == Rtmp { if hasRtmp || (now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) {
p.owner.buffer.Flush() p.owner.buffer.Flush()
p.packetCount = 0 p.packetCount = 0
p.lastTime = now p.lastTime = now
@ -169,16 +181,35 @@ func (p *packer) Write(frame []byte) (int, error) {
// New returns a pointer to a new Revid with the desired configuration, and/or // New returns a pointer to a new Revid with the desired configuration, and/or
// an error if construction of the new instance was not successful. // an error if construction of the new instance was not successful.
func New(c Config, ns *netsender.Sender) (*Revid, error) { func New(c Config, ns *netsender.Sender) (*Revid, error) {
r := Revid{ns: ns} r := Revid{ns: ns, err: make(chan error)}
r.buffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout) r.buffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)
r.packer.owner = &r r.packer.owner = &r
err := r.reset(c) err := r.reset(c)
if err != nil { if err != nil {
return nil, err return nil, err
} }
go r.handleErrors()
return &r, nil return &r, nil
} }
// TODO(Saxon): put more thought into error severity.
func (r *Revid) handleErrors() {
for {
err := <-r.err
if err != nil {
r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error())
err = r.Stop()
if err != nil {
r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error())
}
err = r.Start()
if err != nil {
r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error())
}
}
}
}
// Bitrate returns the result of the most recent bitrate check. // Bitrate returns the result of the most recent bitrate check.
func (r *Revid) Bitrate() int { func (r *Revid) Bitrate() int {
return r.bitrate return r.bitrate
@ -203,40 +234,35 @@ func (r *Revid) reset(config Config) error {
} }
} }
n := 1 r.destination = r.destination[:0]
if r.config.Output2 != 0 && r.config.Output2 != Rtp { for _, typ := range r.config.Outputs {
n = 2 switch typ {
}
r.destination = make([]loadSender, n)
for outNo, outType := range []uint8{r.config.Output1, r.config.Output2} {
switch outType {
case File: case File:
s, err := newFileSender(config.OutputFileName) s, err := newFileSender(config.OutputFileName)
if err != nil { if err != nil {
return err return err
} }
r.destination[outNo] = s r.destination = append(r.destination, s)
case FfmpegRtmp: case FfmpegRtmp:
s, err := newFfmpegSender(config.RtmpUrl, fmt.Sprint(r.config.FrameRate)) s, err := newFfmpegSender(config.RtmpUrl, fmt.Sprint(r.config.FrameRate))
if err != nil { if err != nil {
return err return err
} }
r.destination[outNo] = s r.destination = append(r.destination, s)
case Rtmp: case Rtmp:
s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log)
if err != nil { if err != nil {
return err return err
} }
r.destination[outNo] = s r.destination = append(r.destination, s)
case Http: case Http:
r.destination[outNo] = newHttpSender(r.ns, r.config.Logger.Log) r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log))
case Udp: case Udp:
s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log)
if err != nil { if err != nil {
return err return err
} }
r.destination[outNo] = s r.destination = append(r.destination, s)
case Rtp: case Rtp:
r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
if err != nil { if err != nil {
@ -293,21 +319,40 @@ func (r *Revid) reset(config Config) error {
return nil return nil
} }
// IsRunning returns whether the receiver is running. // IsRunning returns true if revid is running.
func (r *Revid) IsRunning() bool { func (r *Revid) IsRunning() bool {
return r.isRunning r.mu.Lock()
ret := r.isRunning
r.mu.Unlock()
return ret
}
func (r *Revid) Config() Config {
r.mu.Lock()
cfg := r.config
r.mu.Unlock()
return cfg
}
// setIsRunning sets r.isRunning using b.
func (r *Revid) setIsRunning(b bool) {
r.mu.Lock()
r.isRunning = b
r.mu.Unlock()
} }
// Start invokes a Revid to start processing video from a defined input // Start invokes a Revid to start processing video from a defined input
// and packetising (if there's packetization) to a defined output. // and packetising (if there's packetization) to a defined output.
func (r *Revid) Start() error { func (r *Revid) Start() error {
if r.isRunning { if r.IsRunning() {
return errors.New(pkg + "start called but revid is already running") return errors.New(pkg + "start called but revid is already running")
} }
r.config.Logger.Log(logger.Info, pkg+"starting Revid") r.config.Logger.Log(logger.Info, pkg+"starting Revid")
// TODO: this doesn't need to be here
r.config.Logger.Log(logger.Debug, pkg+"setting up output") r.config.Logger.Log(logger.Debug, pkg+"setting up output")
r.isRunning = true r.setIsRunning(true)
r.config.Logger.Log(logger.Info, pkg+"starting output routine") r.config.Logger.Log(logger.Info, pkg+"starting output routine")
r.wg.Add(1)
go r.outputClips() go r.outputClips()
r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content")
err := r.setupInput() err := r.setupInput()
@ -316,28 +361,145 @@ func (r *Revid) Start() error {
// Stop halts any processing of video data from a camera or file // Stop halts any processing of video data from a camera or file
func (r *Revid) Stop() error { func (r *Revid) Stop() error {
if !r.isRunning { if !r.IsRunning() {
return errors.New(pkg + "stop called but revid is already stopped") return errors.New(pkg + "stop called but revid is already stopped")
} }
r.config.Logger.Log(logger.Info, pkg+"stopping revid") r.config.Logger.Log(logger.Info, pkg+"stopping revid")
r.isRunning = false r.setIsRunning(false)
r.config.Logger.Log(logger.Info, pkg+"killing input proccess") r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
// If a cmd process is running, we kill! // If a cmd process is running, we kill!
if r.cmd != nil && r.cmd.Process != nil { if r.cmd != nil && r.cmd.Process != nil {
r.cmd.Process.Kill() r.cmd.Process.Kill()
} }
r.wg.Wait()
return nil
}
func (r *Revid) Update(vars map[string]string) error {
if r.IsRunning() {
if err := r.Stop(); err != nil {
return err
}
}
// Look through the vars and update revid where needed.
for key, value := range vars {
switch key {
case "Output":
// FIXME(kortschak): There can be only one!
// How do we specify outputs after the first?
//
// Maybe we shouldn't be doing this!
switch value {
case "File":
r.config.Outputs[0] = File
case "Http":
r.config.Outputs[0] = Http
case "Rtmp":
r.config.Outputs[0] = Rtmp
case "FfmpegRtmp":
r.config.Outputs[0] = FfmpegRtmp
default:
r.config.Logger.Log(logger.Warning, pkg+"invalid Output1 param", "value", value)
continue
}
case "FramesPerClip":
f, err := strconv.ParseUint(value, 10, 0)
if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value)
break
}
r.config.FramesPerClip = uint(f)
case "RtmpUrl":
r.config.RtmpUrl = value
case "Bitrate":
v, err := strconv.ParseUint(value, 10, 0)
if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
break
}
r.config.Bitrate = uint(v)
case "OutputFileName":
r.config.OutputFileName = value
case "InputFileName":
r.config.InputFileName = value
case "Height":
h, err := strconv.ParseUint(value, 10, 0)
if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"invalid height param", "value", value)
break
}
r.config.Height = uint(h)
case "Width":
w, err := strconv.ParseUint(value, 10, 0)
if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"invalid width param", "value", value)
break
}
r.config.Width = uint(w)
case "FrameRate":
v, err := strconv.ParseUint(value, 10, 0)
if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
break
}
r.config.FrameRate = uint(v)
case "HttpAddress":
r.config.HttpAddress = value
case "Quantization":
q, err := strconv.ParseUint(value, 10, 0)
if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"invalid quantization param", "value", value)
break
}
r.config.Quantization = uint(q)
case "IntraRefreshPeriod":
p, err := strconv.ParseUint(value, 10, 0)
if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value)
break
}
r.config.IntraRefreshPeriod = uint(p)
case "HorizontalFlip":
switch strings.ToLower(value) {
case "true":
r.config.FlipHorizontal = true
case "false":
r.config.FlipHorizontal = false
default:
r.config.Logger.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value)
}
case "VerticalFlip":
switch strings.ToLower(value) {
case "true":
r.config.FlipVertical = true
case "false":
r.config.FlipVertical = false
default:
r.config.Logger.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value)
}
case "BurstPeriod":
v, err := strconv.ParseUint(value, 10, 0)
if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"invalid BurstPeriod param", "value", value)
break
}
r.config.BurstPeriod = uint(v)
}
}
return nil return nil
} }
// outputClips takes the clips produced in the packClips method and outputs them // outputClips takes the clips produced in the packClips method and outputs them
// to the desired output defined in the revid config // to the desired output defined in the revid config
func (r *Revid) outputClips() { func (r *Revid) outputClips() {
defer r.wg.Done()
lastTime := time.Now() lastTime := time.Now()
var count int var count int
loop: loop:
for r.isRunning { for r.IsRunning() {
// If the ring buffer has something we can read and send off // If the ring buffer has something we can read and send off
chunk, err := r.buffer.Next(readTimeout) chunk, err := r.buffer.Next(readTimeout)
switch err { switch err {
@ -381,7 +543,7 @@ loop:
err = rs.restart() err = rs.restart()
if err != nil { if err != nil {
r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error())
r.isRunning = false r.setIsRunning(false)
return return
} }
@ -472,11 +634,9 @@ func (r *Revid) startRaspivid() error {
r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error())
} }
r.config.Logger.Log(logger.Info, pkg+"reading camera data") r.wg.Add(1)
delay := time.Second / time.Duration(r.config.FrameRate) go r.processFrom(stdout, 0)
err = r.lexTo(r.encoder, stdout, delay) return nil
r.config.Logger.Log(logger.Info, pkg+"finished reading camera data")
return err
} }
func (r *Revid) startV4L() error { func (r *Revid) startV4L() error {
@ -512,7 +672,6 @@ func (r *Revid) startV4L() error {
r.config.Logger.Log(logger.Info, pkg+"ffmpeg args", "args", strings.Join(args, " ")) r.config.Logger.Log(logger.Info, pkg+"ffmpeg args", "args", strings.Join(args, " "))
r.cmd = exec.Command("ffmpeg", args...) r.cmd = exec.Command("ffmpeg", args...)
delay := time.Second / time.Duration(r.config.FrameRate)
stdout, err := r.cmd.StdoutPipe() stdout, err := r.cmd.StdoutPipe()
if err != nil { if err != nil {
return err return err
@ -524,15 +683,13 @@ func (r *Revid) startV4L() error {
return err return err
} }
r.config.Logger.Log(logger.Info, pkg+"reading camera data") r.wg.Add(1)
err = r.lexTo(r.encoder, stdout, delay) go r.processFrom(stdout, time.Duration(0))
r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") return nil
return err
} }
// setupInputForFile sets things up for getting input from a file // setupInputForFile sets things up for getting input from a file
func (r *Revid) setupInputForFile() error { func (r *Revid) setupInputForFile() error {
delay := time.Second / time.Duration(r.config.FrameRate)
f, err := os.Open(r.config.InputFileName) f, err := os.Open(r.config.InputFileName)
if err != nil { if err != nil {
r.config.Logger.Log(logger.Error, err.Error()) r.config.Logger.Log(logger.Error, err.Error())
@ -542,5 +699,14 @@ func (r *Revid) setupInputForFile() error {
defer f.Close() defer f.Close()
// TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop.
return r.lexTo(r.encoder, f, delay) r.wg.Add(1)
go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate))
return nil
}
func (r *Revid) processFrom(read io.Reader, delay time.Duration) {
r.config.Logger.Log(logger.Info, pkg+"reading input data")
r.err <- r.lexTo(r.encoder, read, delay)
r.config.Logger.Log(logger.Info, pkg+"finished reading input data")
r.wg.Done()
} }
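processFrom feeds the lexer's terminal error into r.err, and handleErrors responds by stopping and restarting revid. A stripped-down sketch of that supervise-and-restart pattern, with placeholder names rather than the revid types (the restart count is bounded only so the example terminates):

package main

import (
	"errors"
	"fmt"
	"time"
)

// worker stands in for an input-reading goroutine: it runs for a while and
// then reports its terminal error on errc, as processFrom does with r.err.
func worker(errc chan<- error) {
	time.Sleep(100 * time.Millisecond) // pretend to read and lex input
	errc <- errors.New("input closed")
}

func main() {
	errc := make(chan error)
	go worker(errc)

	// Supervisor: on an async error, log it and start a replacement worker,
	// mirroring handleErrors' Stop-then-Start sequence.
	for restarts := 0; restarts < 3; restarts++ {
		err := <-errc
		fmt.Println("async error, restarting:", err)
		go worker(errc)
	}
}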

View File

@ -35,6 +35,7 @@ import (
"net" "net"
"os" "os"
"os/exec" "os/exec"
"strconv"
"bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/av/rtmp"
"bitbucket.org/ausocean/av/stream/mts" "bitbucket.org/ausocean/av/stream/mts"
@ -172,7 +173,7 @@ func (s *httpSender) extractMeta(r string) error {
s.log(logger.Warning, pkg+"No timestamp in reply") s.log(logger.Warning, pkg+"No timestamp in reply")
} else { } else {
s.log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) s.log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t))
mts.MetaData.SetTimeStamp(uint64(t)) mts.Meta.Add("ts", strconv.Itoa(t))
} }
// Extract location from reply // Extract location from reply
@ -181,7 +182,7 @@ func (s *httpSender) extractMeta(r string) error {
s.log(logger.Warning, pkg+"No location in reply") s.log(logger.Warning, pkg+"No location in reply")
} else { } else {
s.log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) s.log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g))
mts.MetaData.SetLocation(g) mts.Meta.Add("loc", g)
} }
return nil return nil

View File

@ -51,7 +51,7 @@ import (
const ( const (
typeNumber = 0x00 typeNumber = 0x00
typeBoolean = 0x01 typeBoolean = 0x01
typeString = 0x02 TypeString = 0x02
TypeObject = 0x03 TypeObject = 0x03
typeMovieClip = 0x04 typeMovieClip = 0x04
TypeNull = 0x05 TypeNull = 0x05
@ -93,7 +93,7 @@ type Property struct {
var ( var (
ErrShortBuffer = errors.New("amf: short buffer") // The supplied buffer was too short. ErrShortBuffer = errors.New("amf: short buffer") // The supplied buffer was too short.
ErrInvalidType = errors.New("amf: invalid type") // An invalid type was supplied to the encoder. ErrInvalidType = errors.New("amf: invalid type") // An invalid type was supplied to the encoder.
ErrUnexpectedType = errors.New("amf: unexpected end") // An unexpected type was encountered while decoding. ErrUnexpectedType = errors.New("amf: unexpected type") // An unexpected type was encountered while decoding.
ErrPropertyNotFound = errors.New("amf: property not found") // The requested property was not found. ErrPropertyNotFound = errors.New("amf: property not found") // The requested property was not found.
) )
@ -160,6 +160,7 @@ func EncodeInt32(buf []byte, val uint32) ([]byte, error) {
} }
// EncodeString encodes a string. // EncodeString encodes a string.
// Strings less than 65536 in length are encoded as TypeString, while longer strings are encoded as typeLongString.
func EncodeString(buf []byte, val string) ([]byte, error) { func EncodeString(buf []byte, val string) ([]byte, error) {
const typeSize = 1 const typeSize = 1
if len(val) < 65536 && len(val)+typeSize+binary.Size(int16(0)) > len(buf) { if len(val) < 65536 && len(val)+typeSize+binary.Size(int16(0)) > len(buf) {
@ -171,7 +172,7 @@ func EncodeString(buf []byte, val string) ([]byte, error) {
} }
if len(val) < 65536 { if len(val) < 65536 {
buf[0] = typeString buf[0] = TypeString
buf = buf[1:] buf = buf[1:]
binary.BigEndian.PutUint16(buf[:2], uint16(len(val))) binary.BigEndian.PutUint16(buf[:2], uint16(len(val)))
buf = buf[2:] buf = buf[2:]
@ -263,7 +264,7 @@ func EncodeProperty(prop *Property, buf []byte) ([]byte, error) {
return EncodeNumber(buf, prop.Number) return EncodeNumber(buf, prop.Number)
case typeBoolean: case typeBoolean:
return EncodeBoolean(buf, prop.Number != 0) return EncodeBoolean(buf, prop.Number != 0)
case typeString: case TypeString:
return EncodeString(buf, prop.String) return EncodeString(buf, prop.String)
case TypeNull: case TypeNull:
if len(buf) < 2 { if len(buf) < 2 {
@ -320,7 +321,7 @@ func DecodeProperty(prop *Property, buf []byte, decodeName bool) (int, error) {
prop.Number = float64(buf[0]) prop.Number = float64(buf[0])
buf = buf[1:] buf = buf[1:]
case typeString: case TypeString:
n := DecodeInt16(buf[:2]) n := DecodeInt16(buf[:2])
if len(buf) < int(n+2) { if len(buf) < int(n+2) {
return 0, ErrShortBuffer return 0, ErrShortBuffer
@ -354,7 +355,6 @@ func DecodeProperty(prop *Property, buf []byte, decodeName bool) (int, error) {
} }
// Encode encodes an Object into its AMF representation. // Encode encodes an Object into its AMF representation.
// This is the top-level encoding function and is typically the only function callers will need to use.
func Encode(obj *Object, buf []byte) ([]byte, error) { func Encode(obj *Object, buf []byte) ([]byte, error) {
if len(buf) < 5 { if len(buf) < 5 {
return nil, ErrShortBuffer return nil, ErrShortBuffer
@ -481,7 +481,7 @@ func (obj *Object) NumberProperty(name string, idx int) (float64, error) {
// StringProperty is a wrapper for Property that returns a String property's value, if any. // StringProperty is a wrapper for Property that returns a String property's value, if any.
func (obj *Object) StringProperty(name string, idx int) (string, error) { func (obj *Object) StringProperty(name string, idx int) (string, error) {
prop, err := obj.Property(name, idx, typeString) prop, err := obj.Property(name, idx, TypeString)
if err != nil { if err != nil {
return "", err return "", err
} }
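The newly exported TypeString marker encodes short strings as a one-byte type, a two-byte big-endian length, and then the data, which is the layout the test comments below spell out. A minimal hand-rolled sketch of that layout (not the package's EncodeString):

package main

import (
	"encoding/binary"
	"fmt"
)

const typeString = 0x02 // exported as TypeString in this commit

// encodeShortString lays out a short AMF string: type byte, 2-byte length, data.
func encodeShortString(s string) []byte {
	buf := make([]byte, 3+len(s))
	buf[0] = typeString
	binary.BigEndian.PutUint16(buf[1:3], uint16(len(s)))
	copy(buf[3:], s)
	return buf
}

func main() {
	fmt.Printf("% x\n", encodeShortString("hi")) // 02 00 02 68 69
}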

View File

@ -58,7 +58,7 @@ func TestSanity(t *testing.T) {
// TestStrings tests string encoding and decoding. // TestStrings tests string encoding and decoding.
func TestStrings(t *testing.T) { func TestStrings(t *testing.T) {
// Short string encoding is as follows: // Short string encoding is as follows:
// enc[0] = data type (typeString) // enc[0] = data type (TypeString)
// end[1:3] = size // end[1:3] = size
// enc[3:] = data // enc[3:] = data
for _, s := range testStrings { for _, s := range testStrings {
@ -67,8 +67,8 @@ func TestStrings(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("EncodeString failed") t.Errorf("EncodeString failed")
} }
if buf[0] != typeString { if buf[0] != TypeString {
t.Errorf("Expected typeString, got %v", buf[0]) t.Errorf("Expected TypeString, got %v", buf[0])
} }
ds := DecodeString(buf[1:]) ds := DecodeString(buf[1:])
if s != ds { if s != ds {
@ -76,7 +76,7 @@ func TestStrings(t *testing.T) {
} }
} }
// Long string encoding is as follows: // Long string encoding is as follows:
// enc[0] = data type (typeString) // enc[0] = data type (TypeString)
// end[1:5] = size // end[1:5] = size
// enc[5:] = data // enc[5:] = data
s := string(make([]byte, 65536)) s := string(make([]byte, 65536))
@ -148,7 +148,7 @@ func TestProperties(t *testing.T) {
// Encode/decode string properties. // Encode/decode string properties.
enc = buf[:] enc = buf[:]
for i := range testStrings { for i := range testStrings {
enc, err = EncodeProperty(&Property{Type: typeString, String: testStrings[i]}, enc) enc, err = EncodeProperty(&Property{Type: TypeString, String: testStrings[i]}, enc)
if err != nil { if err != nil {
t.Errorf("EncodeProperty of string failed") t.Errorf("EncodeProperty of string failed")
} }
@ -235,7 +235,7 @@ func TestObject(t *testing.T) {
// Construct a more complicated object that includes a nested object. // Construct a more complicated object that includes a nested object.
var obj2 Object var obj2 Object
for i := range testStrings { for i := range testStrings {
obj2.Properties = append(obj2.Properties, Property{Type: typeString, String: testStrings[i]}) obj2.Properties = append(obj2.Properties, Property{Type: TypeString, String: testStrings[i]})
obj2.Properties = append(obj2.Properties, Property{Type: typeNumber, Number: float64(testNumbers[i])}) obj2.Properties = append(obj2.Properties, Property{Type: typeNumber, Number: float64(testNumbers[i])})
} }
obj2.Properties = append(obj2.Properties, Property{Type: TypeObject, Object: obj1}) obj2.Properties = append(obj2.Properties, Property{Type: TypeObject, Object: obj1})

View File

@ -76,7 +76,7 @@ type link struct {
protocol int32 protocol int32
timeout uint timeout uint
port uint16 port uint16
conn *net.TCPConn conn net.Conn
} }
// method represents an RTMP method. // method represents an RTMP method.
@ -121,20 +121,6 @@ func Dial(url string, timeout uint, log Log) (*Conn, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if c.link.app == "" {
return nil, errInvalidURL
}
if c.link.port == 0 {
switch {
case (c.link.protocol & featureSSL) != 0:
c.link.port = 433
c.log(FatalLevel, pkg+"SSL not supported")
case (c.link.protocol & featureHTTP) != 0:
c.link.port = 80
default:
c.link.port = 1935
}
}
c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app
c.link.protocol |= featureWrite c.link.protocol |= featureWrite

View File

@ -81,7 +81,7 @@ const (
// 3: basic header (chunk type and stream ID) (1 byte) // 3: basic header (chunk type and stream ID) (1 byte)
var headerSizes = [...]int{12, 8, 4, 1} var headerSizes = [...]int{12, 8, 4, 1}
// packet defines an RTMP packet. // packet represents an RTMP packet.
type packet struct { type packet struct {
headerType uint8 headerType uint8
packetType uint8 packetType uint8
@ -90,7 +90,6 @@ type packet struct {
timestamp uint32 timestamp uint32
streamID uint32 streamID uint32
bodySize uint32 bodySize uint32
bytesRead uint32
buf []byte buf []byte
body []byte body []byte
} }
@ -179,7 +178,6 @@ func (pkt *packet) readFrom(c *Conn) error {
pkt.timestamp = amf.DecodeInt24(header[:3]) pkt.timestamp = amf.DecodeInt24(header[:3])
if size >= 6 { if size >= 6 {
pkt.bodySize = amf.DecodeInt24(header[3:6]) pkt.bodySize = amf.DecodeInt24(header[3:6])
pkt.bytesRead = 0
if size > 6 { if size > 6 {
pkt.packetType = header[6] pkt.packetType = header[6]
@ -201,25 +199,18 @@ func (pkt *packet) readFrom(c *Conn) error {
hSize += 4 hSize += 4
} }
if pkt.bodySize > 0 && pkt.body == nil { pkt.resize(pkt.bodySize, pkt.headerType)
pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6)
if pkt.bodySize > c.inChunkSize {
c.log(WarnLevel, pkg+"reading large packet", "size", int(pkt.bodySize))
} }
toRead := pkt.bodySize - pkt.bytesRead _, err = c.read(pkt.body[:pkt.bodySize])
chunkSize := c.inChunkSize
if toRead < chunkSize {
chunkSize = toRead
}
_, err = c.read(pkt.body[pkt.bytesRead:][:chunkSize])
if err != nil { if err != nil {
c.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error()) c.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error())
return err return err
} }
pkt.bytesRead += uint32(chunkSize)
// Keep the packet as a reference for other packets on this channel. // Keep the packet as a reference for other packets on this channel.
if c.channelsIn[pkt.channel] == nil { if c.channelsIn[pkt.channel] == nil {
c.channelsIn[pkt.channel] = &packet{} c.channelsIn[pkt.channel] = &packet{}
@ -237,15 +228,16 @@ func (pkt *packet) readFrom(c *Conn) error {
c.channelTimestamp[pkt.channel] = int32(pkt.timestamp) c.channelTimestamp[pkt.channel] = int32(pkt.timestamp)
c.channelsIn[pkt.channel].body = nil c.channelsIn[pkt.channel].body = nil
c.channelsIn[pkt.channel].bytesRead = 0
c.channelsIn[pkt.channel].hasAbsTimestamp = false c.channelsIn[pkt.channel].hasAbsTimestamp = false
return nil return nil
} }
// resize adjusts the packet's storage to accommodate a body of the given size and header type. // resize adjusts the packet's storage (if necessary) to accommodate a body of the given size and header type.
// When headerSizeAuto is specified, the header type is computed based on packet type. // When headerSizeAuto is specified, the header type is computed based on packet type.
func (pkt *packet) resize(size uint32, ht uint8) { func (pkt *packet) resize(size uint32, ht uint8) {
if cap(pkt.buf) < fullHeaderSize+int(size) {
pkt.buf = make([]byte, fullHeaderSize+size) pkt.buf = make([]byte, fullHeaderSize+size)
}
pkt.body = pkt.buf[fullHeaderSize:] pkt.body = pkt.buf[fullHeaderSize:]
if ht != headerSizeAuto { if ht != headerSizeAuto {
pkt.headerType = ht pkt.headerType = ht
@ -407,7 +399,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error {
return nil return nil
} }
} else { } else {
// Send previously deferrd packet if combining it with the next one would exceed the chunk size. // Send previously deferred packet if combining it with the next one would exceed the chunk size.
if len(c.deferred)+size+hSize > chunkSize { if len(c.deferred)+size+hSize > chunkSize {
c.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(c.deferred)) c.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(c.deferred))
_, err := c.write(c.deferred) _, err := c.write(c.deferred)
@ -419,7 +411,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error {
} }
// TODO(kortschak): Rewrite this horrific peice of premature optimisation. // TODO(kortschak): Rewrite this horrific peice of premature optimisation.
c.log(DebugLevel, pkg+"sending packet", "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr(), "size", size) c.log(DebugLevel, pkg+"sending packet", "size", size, "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr())
for size+hSize != 0 { for size+hSize != 0 {
if chunkSize > size { if chunkSize > size {
chunkSize = size chunkSize = size

View File

@ -41,7 +41,6 @@ import (
) )
// parseURL parses an RTMP URL (ok, technically it is lexing). // parseURL parses an RTMP URL (ok, technically it is lexing).
//
func parseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) { func parseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) {
u, err := url.Parse(addr) u, err := url.Parse(addr)
if err != nil { if err != nil {
@ -81,6 +80,9 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp
} }
elems := strings.SplitN(u.Path[1:], "/", 3) elems := strings.SplitN(u.Path[1:], "/", 3)
app = elems[0] app = elems[0]
if app == "" {
return protocol, host, port, app, playpath, errInvalidURL
}
playpath = elems[1] playpath = elems[1]
if len(elems) == 3 && len(elems[2]) != 0 { if len(elems) == 3 && len(elems[2]) != 0 {
playpath = path.Join(elems[1:]...) playpath = path.Join(elems[1:]...)
@ -97,5 +99,15 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp
} }
} }
switch {
case port != 0:
case (protocol & featureSSL) != 0:
return protocol, host, port, app, playpath, errUnimplemented // port = 433
case (protocol & featureHTTP) != 0:
port = 80
default:
port = 1935
}
return protocol, host, port, app, playpath, nil return protocol, host, port, app, playpath, nil
} }

View File

@ -174,6 +174,13 @@ func connect(c *Conn) error {
return err return err
} }
c.log(DebugLevel, pkg+"connected") c.log(DebugLevel, pkg+"connected")
defer func() {
if err != nil {
c.link.conn.Close()
}
}()
err = handshake(c) err = handshake(c)
if err != nil { if err != nil {
c.log(WarnLevel, pkg+"handshake failed", "error", err.Error()) c.log(WarnLevel, pkg+"handshake failed", "error", err.Error())
@ -185,12 +192,14 @@ func connect(c *Conn) error {
c.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error()) c.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error())
return err return err
} }
c.log(DebugLevel, pkg+"negotiating") c.log(DebugLevel, pkg+"negotiating")
var buf [256]byte
for !c.isPlaying { for !c.isPlaying {
pkt := packet{} pkt := packet{buf: buf[:]}
err = pkt.readFrom(c) err = pkt.readFrom(c)
if err != nil { if err != nil {
break return err
} }
switch pkt.packetType { switch pkt.packetType {
@ -199,14 +208,10 @@ func connect(c *Conn) error {
default: default:
err = handlePacket(c, &pkt) err = handlePacket(c, &pkt)
if err != nil { if err != nil {
break
}
}
}
if !c.isPlaying {
return err return err
} }
}
}
return nil return nil
} }
@ -276,26 +281,18 @@ func sendConnectPacket(c *Conn) error {
return err return err
} }
enc[0] = amf.TypeObject // required link info
enc = enc[1:] info := amf.Object{Properties: []amf.Property{
enc, err = amf.EncodeNamedString(enc, avApp, c.link.app) amf.Property{Type: amf.TypeString, Name: avApp, String: c.link.app},
if err != nil { amf.Property{Type: amf.TypeString, Name: avType, String: avNonprivate},
return err amf.Property{Type: amf.TypeString, Name: avTcUrl, String: c.link.url}},
} }
enc, err = amf.EncodeNamedString(enc, avType, avNonprivate) enc, err = amf.Encode(&info, enc)
if err != nil {
return err
}
enc, err = amf.EncodeNamedString(enc, avTcUrl, c.link.url)
if err != nil {
return err
}
enc, err = amf.EncodeInt24(enc, amf.TypeObjectEnd)
if err != nil { if err != nil {
return err return err
} }
// add auth string, if any // optional link auth info
if c.link.auth != "" { if c.link.auth != "" {
enc, err = amf.EncodeBoolean(enc, c.link.flags&linkAuth != 0) enc, err = amf.EncodeBoolean(enc, c.link.flags&linkAuth != 0)
if err != nil { if err != nil {

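The rewritten sendConnectPacket above builds the required link info declaratively and serialises it with a single amf.Encode call instead of chaining EncodeNamedString calls. A minimal sketch of that pattern in isolation, assuming the amf API shown in this diff (Object, Property, TypeString, Encode) and assuming Encode fills the supplied buffer and returns the remaining slice; the import path, property names and values below are placeholders:

    // Import path assumed; the amf package here is the one used throughout this diff.
    import "bitbucket.org/ausocean/av/rtmp/amf"

    // encodeConnectInfo is a sketch only, not the package's own helper: it builds
    // an AMF object like the one above and returns the encoded bytes.
    func encodeConnectInfo(buf []byte) ([]byte, error) {
        info := amf.Object{Properties: []amf.Property{
            {Type: amf.TypeString, Name: "app", String: "live"},
            {Type: amf.TypeString, Name: "type", String: "nonprivate"},
            {Type: amf.TypeString, Name: "tcUrl", String: "rtmp://media.example.org:1935/live"},
        }}
        // Assumes Encode writes into buf and returns the slice advanced past the object.
        rem, err := amf.Encode(&info, buf)
        if err != nil {
            return nil, err
        }
        return buf[:len(buf)-len(rem)], nil
    }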
View File

@ -243,8 +243,9 @@ func TestFromFile(t *testing.T) {
} }
defer f.Close() defer f.Close()
rs := &rtmpSender{conn: c}
// Pass RTMP session, true for audio, true for video, and 25 FPS // Pass RTMP session, true for audio, true for video, and 25 FPS
flvEncoder, err := flv.NewEncoder(c, true, true, 25) flvEncoder, err := flv.NewEncoder(rs, true, true, 25)
if err != nil { if err != nil {
t.Fatalf("failed to create encoder: %v", err) t.Fatalf("failed to create encoder: %v", err)
} }

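The FLV encoder is now handed an rtmpSender wrapper rather than the Conn itself. The wrapper's definition sits outside this hunk; a hypothetical minimal version, assuming the rtmp Conn exposes a Write([]byte) (int, error) method, is just an io.Writer adapter, which is presumably what lets flv.NewEncoder take it as a destination and gives a natural place to hang retry or logging logic later:

    // rtmpSender is a hypothetical sketch; the real definition lives elsewhere in
    // the test code. It assumes Conn.Write exists with the usual io.Writer shape.
    type rtmpSender struct {
        conn *rtmp.Conn
    }

    // Write forwards to the underlying RTMP connection.
    func (rs *rtmpSender) Write(p []byte) (int, error) {
        return rs.conn.Write(p)
    }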
View File

@ -30,9 +30,9 @@ package mts
import ( import (
"io" "io"
"sync"
"time" "time"
"bitbucket.org/ausocean/av/stream/mts/meta"
"bitbucket.org/ausocean/av/stream/mts/pes" "bitbucket.org/ausocean/av/stream/mts/pes"
"bitbucket.org/ausocean/av/stream/mts/psi" "bitbucket.org/ausocean/av/stream/mts/psi"
) )
@ -82,93 +82,21 @@ var (
}, },
}, },
} }
// standardPmtTimeLocation is a standard PMT with time and location
// descriptors, but time and location fields zeroed out.
standardPmtTimeLocation = psi.PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: 0x3e,
Tss: &psi.TSS{
Tide: 0x01,
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &psi.PMT{
Pcrpid: 0x0100,
Pil: psi.PmtTimeLocationPil,
Pd: []psi.Desc{
{
Dt: psi.TimeDescTag,
Dl: psi.TimeDataSize,
Dd: make([]byte, psi.TimeDataSize),
},
{
Dt: psi.LocationDescTag,
Dl: psi.LocationDataSize,
Dd: make([]byte, psi.LocationDataSize),
},
},
Essd: &psi.ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
}
) )
const ( const (
psiSndCnt = 7 psiInterval = 1 * time.Second
) )
// timeLocation holds time and location data // Meta allows addition of metadata to encoded mts from outside of this pkg.
type timeLocation struct { // See meta pkg for usage.
mu sync.RWMutex //
time uint64 // TODO: make this not global.
location string var Meta *meta.Data
}
// SetTimeStamp sets the time field of a TimeLocation.
func (tl *timeLocation) SetTimeStamp(t uint64) {
tl.mu.Lock()
tl.time = t
tl.mu.Unlock()
}
// GetTimeStamp returns the location of a TimeLocation.
func (tl *timeLocation) TimeStamp() uint64 {
tl.mu.RLock()
t := tl.time
tl.mu.RUnlock()
return t
}
// SetLocation sets the location of a TimeLocation.
func (tl *timeLocation) SetLocation(l string) {
tl.mu.Lock()
tl.location = l
tl.mu.Unlock()
}
// GetLocation returns the location of a TimeLocation.
func (tl *timeLocation) Location() string {
tl.mu.RLock()
l := tl.location
tl.mu.RUnlock()
return l
}
// MetData will hold time and location data which may be set externally if
// this data is available. It is then inserted into mpegts packets outputted.
var MetaData timeLocation
var ( var (
patTable = standardPat.Bytes() patTable = standardPat.Bytes()
pmtTable = standardPmtTimeLocation.Bytes() pmtTable = standardPmt.Bytes()
) )
const ( const (
@ -199,9 +127,9 @@ type Encoder struct {
tsSpace [PacketSize]byte tsSpace [PacketSize]byte
pesSpace [pes.MaxPesSize]byte pesSpace [pes.MaxPesSize]byte
psiCount int
continuity map[int]byte continuity map[int]byte
psiLastTime time.Time
} }
// NewEncoder returns an Encoder with the specified frame rate. // NewEncoder returns an Encoder with the specified frame rate.
@ -233,6 +161,15 @@ const (
// generate handles the incoming data and generates equivalent mpegts packets - // generate handles the incoming data and generates equivalent mpegts packets -
// sending them to the output channel. // sending them to the output channel.
func (e *Encoder) Encode(nalu []byte) error { func (e *Encoder) Encode(nalu []byte) error {
now := time.Now()
if now.Sub(e.psiLastTime) > psiInterval {
err := e.writePSI()
if err != nil {
return err
}
e.psiLastTime = now
}
// Prepare PES data. // Prepare PES data.
pesPkt := pes.Packet{ pesPkt := pes.Packet{
StreamID: streamID, StreamID: streamID,
@ -262,13 +199,6 @@ func (e *Encoder) Encode(nalu []byte) error {
pkt.PCR = e.pcr() pkt.PCR = e.pcr()
pusi = false pusi = false
} }
if e.psiCount <= 0 {
err := e.writePSI()
if err != nil {
return err
}
}
e.psiCount--
_, err := e.dst.Write(pkt.Bytes(e.tsSpace[:PacketSize])) _, err := e.dst.Write(pkt.Bytes(e.tsSpace[:PacketSize]))
if err != nil { if err != nil {
return err return err
@ -286,39 +216,32 @@ func (e *Encoder) writePSI() error {
// Write PAT. // Write PAT.
patPkt := Packet{ patPkt := Packet{
PUSI: true, PUSI: true,
PID: patPid, PID: PatPid,
CC: e.ccFor(patPid), CC: e.ccFor(PatPid),
AFC: hasPayload, AFC: HasPayload,
Payload: patTable, Payload: psi.AddPadding(patTable),
} }
_, err := e.dst.Write(patPkt.Bytes(e.tsSpace[:PacketSize])) _, err := e.dst.Write(patPkt.Bytes(e.tsSpace[:PacketSize]))
if err != nil { if err != nil {
return err return err
} }
pmtTable, err = updateMeta(pmtTable)
// Update pmt table time and location.
err = psi.UpdateTime(pmtTable, MetaData.TimeStamp())
if err != nil { if err != nil {
return err return err
} }
err = psi.UpdateLocation(pmtTable, MetaData.Location())
if err != nil {
return nil
}
// Create mts packet from pmt table. // Create mts packet from pmt table.
pmtPkt := Packet{ pmtPkt := Packet{
PUSI: true, PUSI: true,
PID: pmtPid, PID: PmtPid,
CC: e.ccFor(pmtPid), CC: e.ccFor(PmtPid),
AFC: hasPayload, AFC: HasPayload,
Payload: pmtTable, Payload: psi.AddPadding(pmtTable),
} }
_, err = e.dst.Write(pmtPkt.Bytes(e.tsSpace[:PacketSize])) _, err = e.dst.Write(pmtPkt.Bytes(e.tsSpace[:PacketSize]))
if err != nil { if err != nil {
return err return err
} }
e.psiCount = psiSndCnt
return nil return nil
} }
@ -344,3 +267,11 @@ func (e *Encoder) ccFor(pid int) byte {
e.continuity[pid] = (cc + 1) & continuityCounterMask e.continuity[pid] = (cc + 1) & continuityCounterMask
return cc return cc
} }
// updateMeta adds or updates a metadata descriptor in the given psi bytes using data
// contained in the global Meta struct.
func updateMeta(b []byte) ([]byte, error) {
p := psi.PSIBytes(b)
err := p.AddDescriptor(psi.MetadataTag, Meta.Encode())
return []byte(p), err
}
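Putting the encoder.go changes together, a sketch of how a caller outside the package is expected to use the new Meta global (the location value, frame rate and NAL unit are placeholders; as the TODO above notes, the global itself is a temporary design):

    import (
        "io"

        "bitbucket.org/ausocean/av/stream/mts"
        "bitbucket.org/ausocean/av/stream/mts/meta"
    )

    // encodeWithMeta is a usage sketch: set the package-level metadata store,
    // then encode as usual.
    func encodeWithMeta(dst io.Writer, nalu []byte) error {
        mts.Meta = meta.New()
        mts.Meta.Add("loc", "-34.91,138.60,0") // placeholder location value

        e := mts.NewEncoder(dst, 25) // 25 fps

        // The PSI written on the one second interval carries a metadata
        // descriptor built from Meta.Encode() via updateMeta above.
        return e.Encode(nalu)
    }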

stream/mts/meta/meta.go (new file, 185 lines)
View File

@ -0,0 +1,185 @@
/*
NAME
meta.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
meta.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package meta
import (
"encoding/binary"
"errors"
"strings"
"sync"
)
// headSize is the size of the header at the start of our metadata string,
// which is encoded into the data body of a PMT descriptor.
const headSize = 4
const (
majVer = 1
minVer = 0
)
// Indices of bytes for uint16 metadata length.
const (
dataLenIdx = 2
)
var (
errKeyAbsent = errors.New("Key does not exist in map")
errNoHeader = errors.New("Metadata string does not contain header")
errInvalidHeader = errors.New("Metadata string does not contain valid header")
)
// Data provides functionality for the storage and encoding of metadata
// using a map.
type Data struct {
mu sync.RWMutex
data map[string]string
order []string
enc []byte
}
// New returns a pointer to a new Data.
func New() *Data {
return &Data{
data: make(map[string]string),
enc: []byte{
0x00, // Reserved byte
(majVer << 4) | minVer, // MS and LS versions
0x00, // Data len byte1
0x00, // Data len byte2
},
}
}
// NewWith creates a meta.Data and fills the map with the initial data given. If a
// key is repeated, the latter value overwrites the prior.
func NewWith(data [][2]string) *Data {
m := New()
m.order = make([]string, 0, len(data))
for _, d := range data {
if _, exists := m.data[d[0]]; !exists {
m.order = append(m.order, d[0])
}
m.data[d[0]] = d[1]
}
return m
}
// Add adds metadata with key and val.
func (m *Data) Add(key, val string) {
m.mu.Lock()
defer m.mu.Unlock()
m.data[key] = val
for _, k := range m.order {
if k == key {
return
}
}
m.order = append(m.order, key)
return
}
// All returns a copy of the map containing the metadata.
func (m *Data) All() map[string]string {
m.mu.Lock()
cpy := make(map[string]string)
for k, v := range m.data {
cpy[k] = v
}
m.mu.Unlock()
return cpy
}
// Get returns the meta data for the passed key.
func (m *Data) Get(key string) (val string, ok bool) {
m.mu.Lock()
val, ok = m.data[key]
m.mu.Unlock()
return
}
// Delete removes the metadata entry for the given key, if it exists.
func (m *Data) Delete(key string) {
m.mu.Lock()
defer m.mu.Unlock()
if _, ok := m.data[key]; ok {
delete(m.data, key)
for i, k := range m.order {
if k == key {
copy(m.order[:i], m.order[i+1:])
m.order = m.order[:len(m.order)-1]
break
}
}
return
}
return
}
// Encode encodes the metadata map into a byte slice: a header describing the
// version and the length of the data, followed by the data in TSV format.
func (m *Data) Encode() []byte {
m.enc = m.enc[:headSize]
// Iterate over map and append entries, only adding tab if we're not on the
// last entry.
var entry string
for i, k := range m.order {
v := m.data[k]
entry += k + "=" + v
if i+1 < len(m.data) {
entry += "\t"
}
}
m.enc = append(m.enc, []byte(entry)...)
// Calculate and set data length in encoded meta header.
dataLen := len(m.enc[headSize:])
binary.BigEndian.PutUint16(m.enc[dataLenIdx:dataLenIdx+2], uint16(dataLen))
return m.enc
}
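As a worked check of the layout: encoding a single entry yields the 4-byte header (reserved 0x00, version byte (majVer<<4)|minVer = 0x10, big-endian data length) followed by the TSV body. A sketch, not part of the package:

    import "fmt"

    // ExampleData_Encode shows the encoded bytes for one entry, worked by hand.
    func ExampleData_Encode() {
        m := New()
        m.Add("loc", "a,b,c")
        fmt.Printf("% x\n", m.Encode())
        // Output: 00 10 00 09 6c 6f 63 3d 61 2c 62 2c 63
    }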
// Extract extracts the value for the given key from the metadata d. An error is
// returned if the key is not present, or if the metadata header is missing or
// invalid.
func Extract(key string, d []byte) (string, error) {
if d[0] != 0 {
return "", errNoHeader
} else if d[0] == 0 && binary.BigEndian.Uint16(d[2:headSize]) != uint16(len(d[headSize:])) {
return "", errInvalidHeader
}
d = d[headSize:]
entries := strings.Split(string(d), "\t")
for _, entry := range entries {
kv := strings.Split(entry, "=")
if kv[0] == key {
return kv[1], nil
}
}
return "", errKeyAbsent
}
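A short round-trip sketch of Encode and Extract together, with placeholder values:

    // exampleRoundTrip encodes two entries, then pulls one back out by key.
    func exampleRoundTrip() (string, error) {
        m := NewWith([][2]string{{"loc", "a,b,c"}, {"ts", "12345"}})
        return Extract("ts", m.Encode()) // "12345", nil
    }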

View File

@ -0,0 +1,167 @@
/*
NAME
meta_test.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
meta_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package meta
import (
"bytes"
"encoding/binary"
"reflect"
"testing"
)
const (
tstKey1 = "loc"
tstData1 = "a,b,c"
tstKey2 = "ts"
tstData2 = "12345678"
tstData3 = "d,e,f"
)
// TestAddAndGet ensures that we can add metadata and then successfully get it.
func TestAddAndGet(t *testing.T) {
meta := New()
meta.Add(tstKey1, tstData1)
meta.Add(tstKey2, tstData2)
if data, ok := meta.Get(tstKey1); !ok {
t.Errorf("Could not get data for key: %v\n", tstKey1)
} else if data != tstData1 {
t.Error("Did not get expected data")
}
if data, ok := meta.Get(tstKey2); !ok {
t.Errorf("Could not get data for key: %v", tstKey2)
} else if data != tstData2 {
t.Error("Did not get expected data")
}
}
// TestUpdate checks that we can use Meta.Add to actually update metadata
// if it already exists in the Meta map.
func TestUpdate(t *testing.T) {
meta := New()
meta.Add(tstKey1, tstData1)
meta.Add(tstKey1, tstData3)
if data, ok := meta.Get(tstKey1); !ok {
t.Errorf("Could not get data for key: %v\n", tstKey1)
} else if data != tstData3 {
t.Error(`Data did not correctly update for key "loc"`)
}
}
// TestAll ensures we can get a correct map using Meta.All() after adding some data
func TestAll(t *testing.T) {
meta := New()
tstMap := map[string]string{
tstKey1: tstData1,
tstKey2: tstData2,
}
meta.Add(tstKey1, tstData1)
meta.Add(tstKey2, tstData2)
metaMap := meta.All()
if !reflect.DeepEqual(metaMap, tstMap) {
t.Errorf("Map not correct. Got: %v, want: %v", metaMap, tstMap)
}
}
// TestGetAbsentKey ensures that we get the expected error when we try to get with
// key that does not yet exist in the Meta map.
func TestGetAbsentKey(t *testing.T) {
meta := New()
if _, ok := meta.Get(tstKey1); ok {
t.Error("Get for absent key incorrectly returned'ok'")
}
}
// TestDelete ensures we can remove a data entry in the Meta map.
func TestDelete(t *testing.T) {
meta := New()
meta.Add(tstKey1, tstData1)
meta.Delete(tstKey1)
if _, ok := meta.Get(tstKey1); ok {
t.Error("Get incorrectly returned okay for absent key")
}
}
// TestEncode checks that we're getting the correct byte slice from Meta.Encode().
func TestEncode(t *testing.T) {
meta := New()
meta.Add(tstKey1, tstData1)
meta.Add(tstKey2, tstData2)
dataLen := len(tstKey1+tstData1+tstKey2+tstData2) + 3
header := [4]byte{
0x00,
0x10,
}
binary.BigEndian.PutUint16(header[2:4], uint16(dataLen))
expectedOut := append(header[:], []byte(
tstKey1+"="+tstData1+"\t"+
tstKey2+"="+tstData2)...)
got := meta.Encode()
if !bytes.Equal(expectedOut, got) {
t.Errorf("Did not get expected out. \nGot : %v, \nwant: %v\n", got, expectedOut)
}
}
// TestReadFrom checks that we can correctly obtain a value for a particular key
// from a string of metadata using the Extract func.
func TestReadFrom(t *testing.T) {
tstMeta := append([]byte{0x00, 0x10, 0x00, 0x12}, "loc=a,b,c\tts=12345"...)
tests := []struct {
key string
want string
}{
{
"loc",
"a,b,c",
},
{
"ts",
"12345",
},
}
for _, test := range tests {
got, err := Extract(test.key, []byte(tstMeta))
if err != nil {
t.Errorf("Unexpected err: %v\n", err)
}
if got != test.want {
t.Errorf("Did not get expected out. \nGot : %v, \nwant: %v\n", got, test.want)
}
}
}

View File

@ -0,0 +1,102 @@
/*
NAME
metaEncode_test.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
metaEncode_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package mts
import (
"bytes"
"testing"
"bitbucket.org/ausocean/av/stream/mts/meta"
"bitbucket.org/ausocean/av/stream/mts/psi"
)
const (
errNotExpectedOut = "Unexpected output. \n Got : %v\n, Want: %v\n"
errUnexpectedErr = "Unexpected error: %v\n"
)
const fps = 25
// TestMetaEncode1 checks that we can externally add a single metadata entry to
// the mts global Meta meta.Data struct and then successfully have the mts encoder
// write this to psi.
func TestMetaEncode1(t *testing.T) {
Meta = meta.New()
var b []byte
buf := bytes.NewBuffer(b)
e := NewEncoder(buf, fps)
Meta.Add("ts", "12345678")
if err := e.writePSI(); err != nil {
t.Errorf(errUnexpectedErr, err.Error())
}
out := buf.Bytes()
got := out[PacketSize+4:]
want := []byte{
0x00, 0x02, 0xb0, 0x23, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x11,
psi.MetadataTag, // Descriptor tag
0x0f, // Length of bytes to follow
0x00, 0x10, 0x00, 0x0b, 't', 's', '=', '1', '2', '3', '4', '5', '6', '7', '8', // timestamp
0x1b, 0xe1, 0x00, 0xf0, 0x00,
}
want = psi.AddCrc(want)
want = psi.AddPadding(want)
if !bytes.Equal(got, want) {
t.Errorf(errNotExpectedOut, got, want)
}
}
// TestMetaEncode2 checks that we can externally add two metadata entries to the
// Meta meta.Data global and then have the mts encoder successfully encode this
// into psi.
func TestMetaEncode2(t *testing.T) {
Meta = meta.New()
var b []byte
buf := bytes.NewBuffer(b)
e := NewEncoder(buf, fps)
Meta.Add("ts", "12345678")
Meta.Add("loc", "1234,4321,1234")
if err := e.writePSI(); err != nil {
t.Errorf(errUnexpectedErr, err.Error())
}
out := buf.Bytes()
got := out[PacketSize+4:]
want := []byte{
0x00, 0x02, 0xb0, 0x36, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x24,
psi.MetadataTag, // Descriptor tag
0x22, // Length of bytes to follow
0x00, 0x10, 0x00, 0x1e, 't', 's', '=', '1', '2', '3', '4', '5', '6', '7', '8', '\t', // timestamp
'l', 'o', 'c', '=', '1', '2', '3', '4', ',', '4', '3', '2', '1', ',', '1', '2', '3', '4', // location
0x1b, 0xe1, 0x00, 0xf0, 0x00,
}
want = psi.AddCrc(want)
want = psi.AddPadding(want)
if !bytes.Equal(got, want) {
t.Errorf(errNotExpectedOut, got, want)
}
}

View File

@ -32,11 +32,41 @@ import (
"errors" "errors"
) )
// General mpegts packet properties.
const ( const (
PacketSize = 188 PacketSize = 188
PayloadSize = 176 PayloadSize = 176
) )
// Program ID for various types of ts packets.
const (
SdtPid = 17
PatPid = 0
PmtPid = 4096
VideoPid = 256
)
// StreamID is the id of the first stream.
const StreamID = 0xe0
// HeadSize is the size of an mpegts packet header.
const HeadSize = 4
// Consts relating to adaptation field.
const (
AdaptationIdx = 4 // Index to the adaptation field (index of AFL).
AdaptationControlIdx = 3 // Index to octet with adaptation field control.
AdaptationFieldsIdx = AdaptationIdx + 1 // Adaptation field index is the index of the adaptation fields.
DefaultAdaptationSize = 2 // Default size of the adaptation field.
AdaptationControlMask = 0x30 // Mask for the adaptation field control in octet 3.
)
// TODO: make this better - currently doesn't make sense.
const (
HasPayload = 0x1
HasAdaptationField = 0x2
)
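These two flags mirror the MPEG-TS adaptation_field_control bits: 0x1 payload only, 0x2 adaptation field only, 0x3 both. A minimal sketch of composing them when building a packet with the fields used elsewhere in this file (payload and continuity counter are placeholders):

    // buildVideoPacket is a sketch only; payload is assumed to fit in one packet.
    func buildVideoPacket(payload []byte, cc byte) Packet {
        return Packet{
            PUSI:    true,
            PID:     VideoPid,
            CC:      cc,
            AFC:     HasAdaptationField | HasPayload, // 0x3: adaptation field then payload
            Payload: payload,
        }
    }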
/* /*
The below data struct encapsulates the fields of an MPEG-TS packet. Below is The below data struct encapsulates the fields of an MPEG-TS packet. Below is
the formatting of an MPEG-TS packet for reference! the formatting of an MPEG-TS packet for reference!
@ -128,19 +158,19 @@ type Packet struct {
} }
// FindPMT will take a clip of mpegts and try to find a PMT table - if one // FindPMT will take a clip of mpegts and try to find a PMT table - if one
// is found, then it is returned, otherwise nil and an error is returned. // is found, then it is returned along with its index, otherwise nil, -1 and an error is returned.
func FindPMT(d []byte) (p []byte, err error) { func FindPMT(d []byte) (p []byte, i int, err error) {
if len(d) < PacketSize { if len(d) < PacketSize {
return nil, errors.New("Mmpegts data not of valid length") return nil, -1, errors.New("Mmpegts data not of valid length")
} }
for i := 0; i < len(d); i += PacketSize { for i = 0; i < len(d); i += PacketSize {
pid := (uint16(d[i+1]&0x1f) << 8) | uint16(d[i+2]) pid := (uint16(d[i+1]&0x1f) << 8) | uint16(d[i+2])
if pid == pmtPid { if pid == pmtPid {
p = d[i+4 : i+PacketSize] p = d[i+4 : i+PacketSize]
return return
} }
} }
return nil, errors.New("Could not find pmt table in mpegts data") return nil, -1, errors.New("Could not find pmt table in mpegts data")
} }
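A small usage sketch of the new signature; clip is assumed to hold whole 188-byte packets:

    // locatePMT returns the PMT payload and the byte offset in clip of the packet
    // that carries it, so in-place edits can be written back there.
    func locatePMT(clip []byte) (pmt []byte, offset int, err error) {
        pmt, offset, err = FindPMT(clip)
        if err != nil {
            return nil, -1, err
        }
        return pmt, offset, nil
    }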
// FillPayload takes a channel and fills the packets Payload field until the // FillPayload takes a channel and fills the packets Payload field until the

View File

@ -34,16 +34,16 @@ import (
) )
// addCrc appends a crc table to a given psi table in bytes // addCrc appends a crc table to a given psi table in bytes
func addCrc(out []byte) []byte { func AddCrc(out []byte) []byte {
t := make([]byte, len(out)+4) t := make([]byte, len(out)+4)
copy(t, out) copy(t, out)
updateCrc(t) UpdateCrc(t[1:])
return t return t
} }
// updateCrc updates the crc of bytes slice, writing the checksum into the last four bytes. // updateCrc updates the crc of bytes slice, writing the checksum into the last four bytes.
func updateCrc(b []byte) { func UpdateCrc(b []byte) {
crc32 := crc32_Update(0xffffffff, crc32_MakeTable(bits.Reverse32(crc32.IEEE)), b[1:len(b)-4]) crc32 := crc32_Update(0xffffffff, crc32_MakeTable(bits.Reverse32(crc32.IEEE)), b[:len(b)-4])
binary.BigEndian.PutUint32(b[len(b)-4:], crc32) binary.BigEndian.PutUint32(b[len(b)-4:], crc32)
} }
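Note the changed contract: callers now pass the slice starting after the pointer field, and the checksum covers everything up to the final four CRC bytes. A sketch of the call pattern used elsewhere in this change:

    // refreshCrc recomputes the CRC of a complete PSI p (pointer field first,
    // four CRC bytes last), e.g. after editing it in place.
    func refreshCrc(p []byte) {
        // Checksum is computed over p[1 : len(p)-4] and written to the last 4 bytes.
        UpdateCrc(p[1:])
    }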

View File

@ -0,0 +1,322 @@
/*
NAME
descriptor_test.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
descriptor_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package psi
import (
"bytes"
"testing"
)
const (
errNotExpectedOut = "Did not get expected output: \ngot : %v, \nwant: %v"
errUnexpectedErr = "Unexpected error: %v\n"
)
var (
tstPsi1 = PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: 0x1c,
Tss: &TSS{
Tide: 0x01,
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &PMT{
Pcrpid: 0x0100, // wrong
Pil: 10,
Pd: []Desc{
{
Dt: TimeDescTag,
Dl: TimeDataSize,
Dd: make([]byte, TimeDataSize),
},
},
Essd: &ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
}
tstPsi2 = PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: 0x12,
Tss: &TSS{
Tide: 0x01,
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &PMT{
Pcrpid: 0x0100,
Pil: 0,
Essd: &ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
}
tstPsi3 = PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: 0x3e,
Tss: &TSS{
Tide: 0x01,
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &PMT{
Pcrpid: 0x0100,
Pil: PmtTimeLocationPil,
Pd: []Desc{
{
Dt: TimeDescTag,
Dl: TimeDataSize,
Dd: make([]byte, TimeDataSize),
},
{
Dt: LocationDescTag,
Dl: LocationDataSize,
Dd: make([]byte, LocationDataSize),
},
},
Essd: &ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
}
)
var (
pmtTimeBytesResizedBigger = []byte{
0x00, 0x02, 0xb0, 0x1e, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x0c,
TimeDescTag, // Descriptor tag
0x0a, // Length of bytes to follow
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, // timestamp
0x1b, 0xe1, 0x00, 0xf0, 0x00,
}
pmtTimeBytesResizedSmaller = []byte{
0x00, 0x02, 0xb0, 0x1a, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x08,
TimeDescTag, // Descriptor tag
0x06, // Length of bytes to follow
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, // timestamp
0x1b, 0xe1, 0x00, 0xf0, 0x00,
}
)
// TestHasDescriptorExists checks that PSIBytes.HasDescriptor performs as expected
// when the PSI we're interested in has the descriptor of interest. HasDescriptor
// should return the descriptor bytes.
// TODO: HasDescriptor also returns index of descriptor - we should check this.
func TestHasDescriptorExists(t *testing.T) {
p := PSIBytes(tstPsi3.Bytes())
_, got := p.HasDescriptor(LocationDescTag)
want := []byte{
LocationDescTag,
LocationDataSize,
}
want = append(want, make([]byte, LocationDataSize)...)
if !bytes.Equal(got, want) {
t.Errorf(errNotExpectedOut, got, want)
}
}
// TestHasDescriptorAbsent checks that PSIBytes.HasDescriptor performs as expected
// when the PSI does not have the descriptor of interest. HasDescriptor should
// return a nil slice and a negative index.
// TODO: check index here as well.
func TestHasDescriptorAbsent(t *testing.T) {
p := PSIBytes(tstPsi3.Bytes())
const fakeTag = 236
_, got := p.HasDescriptor(fakeTag)
var want []byte
if !bytes.Equal(got, want) {
t.Errorf(errNotExpectedOut, got, want)
}
}
// TestHasDescriptorNone checks that PSIBytes.HasDescriptor behaves as expected
// when the PSI does not have any descriptors. HasDescriptor should return a nil
// slice.
// TODO: again check index here.
func TestHasDescriptorNone(t *testing.T) {
p := PSIBytes(tstPsi2.Bytes())
_, got := p.HasDescriptor(LocationDescTag)
var want []byte
if !bytes.Equal(got, want) {
t.Errorf(errNotExpectedOut, got, want)
}
}
// TestProgramInfoLen checks that PSIBytes.ProgramInfoLen correctly extracts
// the program info length from a PSI.
func TestProgramInfoLen(t *testing.T) {
p := PSIBytes(tstPsi1.Bytes())
got := p.ProgramInfoLen()
want := 10
if got != want {
t.Errorf(errNotExpectedOut, got, want)
}
}
// TestDescriptors checks that PSIBytes.descriptors correctly returns the descriptors
// from a PSI when descriptors exist.
func TestDescriptors(t *testing.T) {
p := PSIBytes(tstPsi1.Bytes())
got := p.descriptors()
want := []byte{
TimeDescTag,
TimeDataSize,
}
want = append(want, make([]byte, TimeDataSize)...)
if !bytes.Equal(got, want) {
t.Errorf(errNotExpectedOut, got, want)
}
}
// TestDescriptorsNone checks that PSIBytes.descriptors correctly returns nil when
// we try to get descriptors from a psi without any descriptors.
func TestDescriptorsNone(t *testing.T) {
p := PSIBytes(tstPsi2.Bytes())
got := p.descriptors()
var want []byte
if !bytes.Equal(got, want) {
t.Errorf(errNotExpectedOut, got, want)
}
}
// TestCreateDescriptorEmpty checks that PSIBytes.createDescriptor correctly adds
// a descriptor to the descriptors list in a PSI when it has no descriptors already.
func TestCreateDescriptorEmpty(t *testing.T) {
got := PSIBytes(tstPsi2.Bytes())
got.createDescriptor(TimeDescTag, make([]byte, TimeDataSize))
UpdateCrc(got[1:])
want := PSIBytes(tstPsi1.Bytes())
if !bytes.Equal(want, got) {
t.Errorf(errNotExpectedOut, got, want)
}
}
// TestCreateDescriptorNotEmpty checks that PSIBytes.createDescriptor correctly adds
// a descriptor to the descriptors list in a PSI when it already has one with
// a different tag.
func TestCreateDescriptorNotEmpty(t *testing.T) {
got := PSIBytes(tstPsi1.Bytes())
got.createDescriptor(LocationDescTag, make([]byte, LocationDataSize))
UpdateCrc(got[1:])
want := PSIBytes(tstPsi3.Bytes())
if !bytes.Equal(want, got) {
t.Errorf(errNotExpectedOut, got, want)
}
}
// TestAddDescriptorEmpty checks that PSIBytes.AddDescriptor correctly adds a descriptor
// when there are no other descriptors present in the PSI.
func TestAddDescriptorEmpty(t *testing.T) {
got := PSIBytes(tstPsi2.Bytes())
if err := got.AddDescriptor(TimeDescTag, make([]byte, TimeDataSize)); err != nil {
t.Errorf(errUnexpectedErr, err.Error())
}
want := PSIBytes(tstPsi1.Bytes())
if !bytes.Equal(got, want) {
t.Errorf(errNotExpectedOut, got, want)
}
}
// TestAddDescriptorNonEmpty checks that PSIBytes.AddDescriptor correctly adds a
// descriptor when there is already a descriptor of a differing type in a PSI.
func TestAddDescriptorNonEmpty(t *testing.T) {
got := PSIBytes(tstPsi1.Bytes())
if err := got.AddDescriptor(LocationDescTag, make([]byte, LocationDataSize)); err != nil {
t.Errorf(errUnexpectedErr, err.Error())
}
want := PSIBytes(tstPsi3.Bytes())
if !bytes.Equal(got, want) {
t.Errorf(errNotExpectedOut, got, want)
}
}
// TestAddDescriptorUpdateSame checks that PSIBytes.AddDescriptor correctly updates data in a descriptor
// with the same given tag, with data being the same size. AddDescriptor should just copy new data into
// the descriptors data field.
func TestAddDescriptorUpdateSame(t *testing.T) {
newData := [8]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}
want := PSIBytes(tstPsi2.Bytes())
want.createDescriptor(TimeDescTag, newData[:])
got := PSIBytes(tstPsi1.Bytes())
if err := got.AddDescriptor(TimeDescTag, newData[:]); err != nil {
t.Errorf(errUnexpectedErr, err.Error())
}
if !bytes.Equal(got, want) {
t.Errorf(errNotExpectedOut, got, want)
}
}
// TestAddDescriptorUpdateBigger checks that PSIBytes.AddDescriptor correctly resizes descriptor with same given tag
// to a bigger size and copies in new data. AddDescriptor should find descriptor with same tag, increase size of psi,
// shift data to make room for the updated descriptor, and then copy in the new data.
func TestAddDescriptorUpdateBigger(t *testing.T) {
got := PSIBytes(tstPsi1.Bytes())
if err := got.AddDescriptor(TimeDescTag, []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a}); err != nil {
t.Errorf(errUnexpectedErr, err.Error())
}
want := AddCrc(pmtTimeBytesResizedBigger)
if !bytes.Equal(got, want) {
t.Errorf(errNotExpectedOut, got, want)
}
}
// TestAddDescriptorUpdateSmaller checks that PSIBytes.AddDescriptor correctly resizes descriptor with same given tag
// in a psi to a smaller size and copies in new data. AddDescriptor should find the descriptor with the same tag, shift data
// after the descriptor upwards, trim the psi to the new size, and then copy in the new data.
func TestAddDescriptorUpdateSmaller(t *testing.T) {
got := PSIBytes(tstPsi1.Bytes())
if err := got.AddDescriptor(TimeDescTag, []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06}); err != nil {
t.Errorf(errUnexpectedErr, err.Error())
}
want := AddCrc(pmtTimeBytesResizedSmaller)
if !bytes.Equal(got, want) {
t.Errorf(errNotExpectedOut, got, want)
}
}

View File

@ -64,15 +64,14 @@ func UpdateTime(dst []byte, t uint64) error {
for i := range dst[TimeDataIndx : TimeDataIndx+TimeDataSize] { for i := range dst[TimeDataIndx : TimeDataIndx+TimeDataSize] {
dst[i+TimeDataIndx] = ts[i] dst[i+TimeDataIndx] = ts[i]
} }
updateCrc(dst) UpdateCrc(dst[1:])
return nil return nil
} }
// SyntaxSecLenFrom takes a byte slice representation of a psi and extracts // SyntaxSecLenFrom takes a byte slice representation of a psi and extracts
// it's syntax section length // it's syntax section length
func SyntaxSecLenFrom(p []byte) (l uint8) { func SyntaxSecLenFrom(p []byte) int {
l = uint8(p[syntaxSecLenIndx]) - crcSize return int(p[SyntaxSecLenIdx1]&SyntaxSecLenMask1)<<8 | int(p[SyntaxSecLenIdx2])
return
} }
// TimeFrom takes a byte slice representation of a psi-pmt and extracts it's // TimeFrom takes a byte slice representation of a psi-pmt and extracts it's
@ -112,7 +111,7 @@ func UpdateLocation(d []byte, s string) error {
for i := range loc { for i := range loc {
loc[i] = 0 loc[i] = 0
} }
updateCrc(d) UpdateCrc(d[1:])
return nil return nil
} }
@ -127,7 +126,7 @@ func trimTo(d []byte, t byte) []byte {
// addPadding adds an appropriate amount of padding to a pat or pmt table for // addPadding adds an appropriate amount of padding to a pat or pmt table for
// addition to an mpegts packet // addition to an mpegts packet
func addPadding(d []byte) []byte { func AddPadding(d []byte) []byte {
t := make([]byte, PacketSize) t := make([]byte, PacketSize)
copy(t, d) copy(t, d)
padding := t[len(d):] padding := t[len(d):]

View File

@ -26,11 +26,16 @@ LICENSE
package psi package psi
const ( import (
PacketSize = 184 // packet size of a psi. "errors"
"github.com/Comcast/gots/psi"
) )
// Lengths of section definitions // PacketSize of psi (without mpegts header)
const PacketSize = 184
// Lengths of section definitions.
const ( const (
ESSDDefLen = 5 ESSDDefLen = 5
DescDefLen = 2 DescDefLen = 2
@ -40,13 +45,14 @@ const (
PSIDefLen = 3 PSIDefLen = 3
) )
// Table Type IDs // Table Type IDs.
const ( const (
patID = 0x00 patID = 0x00
pmtID = 0x02 pmtID = 0x02
) )
// Consts relating to time description // Consts relating to time description
// TODO: remove this, we don't do metadata like this anymore.
const ( const (
TimeDescTag = 234 TimeDescTag = 234
TimeTagIndx = 13 TimeTagIndx = 13
@ -55,6 +61,7 @@ const (
) )
// Consts relating to location description // Consts relating to location description
// TODO: remove this, we don't do metadata like this anymore.
const ( const (
LocationDescTag = 235 LocationDescTag = 235
LocationTagIndx = 23 LocationTagIndx = 23
@ -62,10 +69,35 @@ const (
LocationDataSize = 32 // bytes LocationDataSize = 32 // bytes
) )
// Other misc consts // crc hash size
const crcSize = 4
// Consts relating to syntax section.
const ( const (
syntaxSecLenIndx = 3 TotalSyntaxSecLen = 180
crcSize = 4 SyntaxSecLenIdx1 = 2
SyntaxSecLenIdx2 = 3
SyntaxSecLenMask1 = 0x03
SectionLenMask1 = 0x03
)
// Consts relating to program info len.
const (
ProgramInfoLenIdx1 = 11
ProgramInfoLenIdx2 = 12
ProgramInfoLenMask1 = 0x03
)
// DescriptorsIdx is the index that the descriptors start at.
const DescriptorsIdx = ProgramInfoLenIdx2 + 1
// MetadataTag is the descriptor tag used for metadata.
const MetadataTag = 0x26
// TODO: get rid of these - not a good idea.
type (
PSIBytes []byte
Descriptor []byte
) )
// Program specific information // Program specific information
@ -135,8 +167,7 @@ func (p *PSI) Bytes() []byte {
out[2] = 0x80 | 0x30 | (0x03 & byte(p.Sl>>8)) out[2] = 0x80 | 0x30 | (0x03 & byte(p.Sl>>8))
out[3] = byte(p.Sl) out[3] = byte(p.Sl)
out = append(out, p.Tss.Bytes()...) out = append(out, p.Tss.Bytes()...)
out = addCrc(out) out = AddCrc(out)
out = addPadding(out)
return out return out
} }
@ -205,3 +236,135 @@ func asByte(b bool) byte {
} }
return 0x00 return 0x00
} }
// AddDescriptor adds or updates a descriptor in a PSI given a descriptor tag
// and data. If the psi is not a pmt, then an error is returned. If a descriptor
// with the given tag is not found in the psi, room is made and a descriptor with
// given tag and data is created. If a descriptor with the tag is found, the
// descriptor is resized as required and the new data is copied in.
func (p *PSIBytes) AddDescriptor(tag int, data []byte) error {
if psi.TableID(*p) != pmtID {
return errors.New("trying to add descriptor, but not pmt")
}
i, desc := p.HasDescriptor(tag)
if desc == nil {
err := p.createDescriptor(tag, data)
return err
}
oldDescLen := desc.len()
oldDataLen := int(desc[1])
newDataLen := len(data)
newDescLen := 2 + newDataLen
delta := newDescLen - oldDescLen
// If the old data length is more than the new data length, we need to shift data
// after the descriptor up, and then trim the psi. If the old data length is less than
// the new, we need to resize the psi and shift data down. If they are the same, do nothing.
switch {
case oldDataLen > newDataLen:
copy((*p)[i+newDescLen:], (*p)[i+oldDescLen:])
*p = (*p)[:len(*p)+delta]
case oldDataLen < newDataLen:
tmp := make([]byte, len(*p)+delta)
copy(tmp, *p)
*p = tmp
copy((*p)[i+newDescLen:], (*p)[i+oldDescLen:])
}
// Copy in new data
(*p)[i+1] = byte(newDataLen)
copy((*p)[i+2:], data)
newProgInfoLen := p.ProgramInfoLen() + delta
p.setProgInfoLen(newProgInfoLen)
newSectionLen := int(psi.SectionLength(*p)) + delta
p.setSectionLen(newSectionLen)
UpdateCrc((*p)[1:])
return nil
}
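This is the entry point that the encoder's updateMeta uses. A sketch of the same pattern from outside the psi package, with placeholder metadata bytes:

    // addMetaDescriptor mirrors mts.updateMeta: wrap a PMT in PSIBytes and add or
    // update its metadata descriptor. Assumes the caller imports
    // "bitbucket.org/ausocean/av/stream/mts/psi".
    func addMetaDescriptor(pmt, metadata []byte) ([]byte, error) {
        p := psi.PSIBytes(pmt)
        err := p.AddDescriptor(psi.MetadataTag, metadata)
        return []byte(p), err
    }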
// HasDescriptor checks if a descriptor of the given tag exists in a PSI. If the descriptor
// of the given tag exists, an index of this descriptor, as well as the Descriptor is returned.
// If the descriptor of the given tag cannot be found, -1 and a nil slice is returned.
//
// TODO: check if pmt, return error if not ?
func (p *PSIBytes) HasDescriptor(tag int) (int, Descriptor) {
descs := p.descriptors()
if descs == nil {
return -1, nil
}
for i := 0; i < len(descs); i += 2 + int(descs[i+1]) {
if int(descs[i]) == tag {
return i + DescriptorsIdx, descs[i : i+2+int(descs[i+1])]
}
}
return -1, nil
}
// createDescriptor creates a descriptor in a psi given a tag and data. It does so
// by resizing the psi, shifting existing data down and copying the new descriptor
// into the new space.
func (p *PSIBytes) createDescriptor(tag int, data []byte) error {
curProgLen := p.ProgramInfoLen()
oldSyntaxSectionLen := SyntaxSecLenFrom(*p)
if TotalSyntaxSecLen-(oldSyntaxSectionLen+2+len(data)) <= 0 {
return errors.New("Not enough space in psi to create descriptor.")
}
dataLen := len(data)
newDescIdx := DescriptorsIdx + curProgLen
newDescLen := dataLen + 2
// Increase size of psi and copy data down to make room for new descriptor.
tmp := make([]byte, len(*p)+newDescLen)
copy(tmp, *p)
*p = tmp
copy((*p)[newDescIdx+newDescLen:], (*p)[newDescIdx:newDescIdx+newDescLen])
// Set the tag, data len and data of the new descriptor.
(*p)[newDescIdx] = byte(tag)
(*p)[newDescIdx+1] = byte(dataLen)
copy((*p)[newDescIdx+2:newDescIdx+2+dataLen], data)
// Set length fields and update the psi crc.
addedLen := dataLen + 2
newProgInfoLen := curProgLen + addedLen
p.setProgInfoLen(newProgInfoLen)
newSyntaxSectionLen := int(oldSyntaxSectionLen) + addedLen
p.setSectionLen(newSyntaxSectionLen)
UpdateCrc((*p)[1:])
return nil
}
// setProgInfoLen sets the program information length in a psi with a pmt.
func (p *PSIBytes) setProgInfoLen(l int) {
(*p)[ProgramInfoLenIdx1] &= 0xff ^ ProgramInfoLenMask1
(*p)[ProgramInfoLenIdx1] |= byte(l>>8) & ProgramInfoLenMask1
(*p)[ProgramInfoLenIdx2] = byte(l)
}
// setSectionLen sets section length in a psi.
func (p *PSIBytes) setSectionLen(l int) {
(*p)[SyntaxSecLenIdx1] &= 0xff ^ SyntaxSecLenMask1
(*p)[SyntaxSecLenIdx1] |= byte(l>>8) & SyntaxSecLenMask1
(*p)[SyntaxSecLenIdx2] = byte(l)
}
// descriptors returns the descriptors in a psi if they exist, otherwise
// a nil slice is returned.
func (p *PSIBytes) descriptors() []byte {
return (*p)[DescriptorsIdx : DescriptorsIdx+p.ProgramInfoLen()]
}
// len returns the length of a descriptor in bytes.
func (d *Descriptor) len() int {
return int(2 + (*d)[1])
}
// ProgramInfoLen returns the program info length of a PSI.
//
// TODO: check if pmt - if not return 0 ? or -1 ?
func (p *PSIBytes) ProgramInfoLen() int {
return int((*p)[ProgramInfoLenIdx1]&ProgramInfoLenMask1)<<8 | int((*p)[ProgramInfoLenIdx2])
}
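For reference, the split length fields decode as in the metaEncode test above: program_info_length bytes 0xf0, 0x11 give (0xf0&0x03)<<8 | 0x11 = 0x11 = 17 (one metadata descriptor of 15 data bytes plus its 2-byte tag and length), and section_length bytes 0xb0, 0x23 give (0xb0&0x03)<<8 | 0x23 = 0x23 = 35.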

View File

@ -282,7 +282,7 @@ var bytesTests = []struct {
func TestBytes(t *testing.T) { func TestBytes(t *testing.T) {
for _, test := range bytesTests { for _, test := range bytesTests {
got := test.input.Bytes() got := test.input.Bytes()
if !bytes.Equal(got, addPadding(addCrc(test.want))) { if !bytes.Equal(got, AddCrc(test.want)) {
t.Errorf("unexpected error for test %v: got:%v want:%v", test.name, got, t.Errorf("unexpected error for test %v: got:%v want:%v", test.name, got,
test.want) test.want)
} }
@ -301,7 +301,7 @@ func TestTimestampToBytes(t *testing.T) {
func TestTimeUpdate(t *testing.T) { func TestTimeUpdate(t *testing.T) {
cpy := make([]byte, len(pmtTimeBytes1)) cpy := make([]byte, len(pmtTimeBytes1))
copy(cpy, pmtTimeBytes1) copy(cpy, pmtTimeBytes1)
cpy = addCrc(cpy) cpy = AddCrc(cpy)
err := UpdateTime(cpy, tstTime2) err := UpdateTime(cpy, tstTime2)
cpy = cpy[:len(cpy)-4] cpy = cpy[:len(cpy)-4]
if err != nil { if err != nil {
@ -343,7 +343,7 @@ func TestLocationGet(t *testing.T) {
func TestLocationUpdate(t *testing.T) { func TestLocationUpdate(t *testing.T) {
cpy := make([]byte, len(pmtWithMetaTst1)) cpy := make([]byte, len(pmtWithMetaTst1))
copy(cpy, pmtWithMetaTst1) copy(cpy, pmtWithMetaTst1)
cpy = addCrc(cpy) cpy = AddCrc(cpy)
err := UpdateLocation(cpy, locationTstStr2) err := UpdateLocation(cpy, locationTstStr2)
cpy = cpy[:len(cpy)-4] cpy = cpy[:len(cpy)-4]
if err != nil { if err != nil {