mirror of https://bitbucket.org/ausocean/av.git
stream/mts: fixing conflicts
This commit is contained in:
commit
8bdfed9960
|
@ -52,6 +52,13 @@ const (
|
||||||
defaultLogVerbosity = logger.Debug
|
defaultLogVerbosity = logger.Debug
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Revid modes
|
||||||
|
const (
|
||||||
|
normal = "Normal"
|
||||||
|
paused = "Paused"
|
||||||
|
burst = "Burst"
|
||||||
|
)
|
||||||
|
|
||||||
// Other misc consts
|
// Other misc consts
|
||||||
const (
|
const (
|
||||||
netSendRetryTime = 5 * time.Second
|
netSendRetryTime = 5 * time.Second
|
||||||
|
@ -81,23 +88,22 @@ func main() {
|
||||||
cfg := handleFlags()
|
cfg := handleFlags()
|
||||||
|
|
||||||
if !*useNetsender {
|
if !*useNetsender {
|
||||||
// run revid for the specified duration
|
rv, err := revid.New(cfg, nil)
|
||||||
rv, _, err := startRevid(nil, cfg)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
cfg.Logger.Log(logger.Fatal, pkg+"failed to initialiase revid", "error", err.Error())
|
||||||
|
}
|
||||||
|
if err = rv.Start(); err != nil {
|
||||||
cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", "error", err.Error())
|
cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", "error", err.Error())
|
||||||
}
|
}
|
||||||
time.Sleep(*runDurationPtr)
|
time.Sleep(*runDurationPtr)
|
||||||
err = stopRevid(rv)
|
if err = rv.Stop(); err != nil {
|
||||||
if err != nil {
|
|
||||||
cfg.Logger.Log(logger.Error, pkg+"failed to stop revid before program termination", "error", err.Error())
|
cfg.Logger.Log(logger.Error, pkg+"failed to stop revid before program termination", "error", err.Error())
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err := run(nil, cfg)
|
if err := run(cfg); err != nil {
|
||||||
if err != nil {
|
|
||||||
log.Log(logger.Fatal, pkg+"failed to run revid", "error", err.Error())
|
log.Log(logger.Fatal, pkg+"failed to run revid", "error", err.Error())
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -253,66 +259,97 @@ func handleFlags() revid.Config {
|
||||||
}
|
}
|
||||||
|
|
||||||
// initialize then run the main NetSender client
|
// initialize then run the main NetSender client
|
||||||
func run(rv *revid.Revid, cfg revid.Config) error {
|
func run(cfg revid.Config) error {
|
||||||
// initialize NetSender and use NetSender's logger
|
|
||||||
//config.Logger = netsender.Logger()
|
|
||||||
log.Log(logger.Info, pkg+"running in NetSender mode")
|
log.Log(logger.Info, pkg+"running in NetSender mode")
|
||||||
|
|
||||||
|
var vars map[string]string
|
||||||
|
|
||||||
|
// initialize NetSender and use NetSender's logger
|
||||||
var ns netsender.Sender
|
var ns netsender.Sender
|
||||||
err := ns.Init(log, nil, nil, nil)
|
err := ns.Init(log, nil, nil, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
vars, _ := ns.Vars()
|
|
||||||
|
vars, _ = ns.Vars()
|
||||||
vs := ns.VarSum()
|
vs := ns.VarSum()
|
||||||
paused := false
|
|
||||||
if vars["mode"] == "Paused" {
|
rv, err := revid.New(cfg, &ns)
|
||||||
paused = true
|
if err != nil {
|
||||||
|
log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error())
|
||||||
}
|
}
|
||||||
if !paused {
|
|
||||||
rv, cfg, err = updateRevid(&ns, rv, cfg, vars, false)
|
// Update revid to get latest config settings from netreceiver.
|
||||||
|
err = rv.Update(vars)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// If mode on netreceiver isn't paused then we can start revid.
|
||||||
|
if ns.Mode() != paused && ns.Mode() != burst {
|
||||||
|
err = rv.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ns.Mode() == burst {
|
||||||
|
ns.SetMode(paused, &vs)
|
||||||
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if err := send(&ns, rv); err != nil {
|
// TODO(saxon): replace this call with call to ns.Run().
|
||||||
log.Log(logger.Error, pkg+"polling failed", "error", err.Error())
|
err = send(&ns, rv)
|
||||||
|
if err != nil {
|
||||||
|
log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error())
|
||||||
time.Sleep(netSendRetryTime)
|
time.Sleep(netSendRetryTime)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if vs != ns.VarSum() {
|
// If var sum hasn't change we continue
|
||||||
// vars changed
|
if vs == ns.VarSum() {
|
||||||
vars, err := ns.Vars()
|
goto sleep
|
||||||
if err != nil {
|
}
|
||||||
log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error())
|
|
||||||
time.Sleep(netSendRetryTime)
|
vars, err = ns.Vars()
|
||||||
continue
|
if err != nil {
|
||||||
}
|
log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error())
|
||||||
vs = ns.VarSum()
|
time.Sleep(netSendRetryTime)
|
||||||
if vars["mode"] == "Paused" {
|
continue
|
||||||
if !paused {
|
}
|
||||||
log.Log(logger.Info, pkg+"pausing revid")
|
vs = ns.VarSum()
|
||||||
err = stopRevid(rv)
|
|
||||||
if err != nil {
|
err = rv.Update(vars)
|
||||||
log.Log(logger.Error, pkg+"failed to stop revide", "error", err.Error())
|
if err != nil {
|
||||||
continue
|
return err
|
||||||
}
|
}
|
||||||
paused = true
|
|
||||||
}
|
switch ns.Mode() {
|
||||||
} else {
|
case paused:
|
||||||
rv, cfg, err = updateRevid(&ns, rv, cfg, vars, !paused)
|
case normal:
|
||||||
if err != nil {
|
err = rv.Start()
|
||||||
return err
|
if err != nil {
|
||||||
}
|
return err
|
||||||
if paused {
|
}
|
||||||
paused = false
|
case burst:
|
||||||
}
|
log.Log(logger.Info, pkg+"Starting burst...")
|
||||||
}
|
err = rv.Start()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
time.Sleep(time.Duration(rv.Config().BurstPeriod) * time.Second)
|
||||||
|
log.Log(logger.Info, pkg+"Stopping burst...")
|
||||||
|
err = rv.Stop()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
ns.SetMode(paused, &vs)
|
||||||
|
}
|
||||||
|
sleep:
|
||||||
|
sleepTime, err := strconv.Atoi(ns.Param("mp"))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
sleepTime, _ := strconv.Atoi(ns.Param("mp"))
|
|
||||||
time.Sleep(time.Duration(sleepTime) * time.Second)
|
time.Sleep(time.Duration(sleepTime) * time.Second)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -340,139 +377,6 @@ func send(ns *netsender.Sender, rv *revid.Revid) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// wrappers for stopping and starting revid
|
|
||||||
func startRevid(ns *netsender.Sender, cfg revid.Config) (*revid.Revid, revid.Config, error) {
|
|
||||||
rv, err := revid.New(cfg, ns)
|
|
||||||
if err != nil {
|
|
||||||
return nil, cfg, err
|
|
||||||
}
|
|
||||||
err = rv.Start()
|
|
||||||
return rv, cfg, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func stopRevid(rv *revid.Revid) error {
|
|
||||||
err := rv.Stop()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIXME(kortschak): Is this waiting on completion of work?
|
|
||||||
// Use a wait group and Wait method if it is.
|
|
||||||
time.Sleep(revidStopTime)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars map[string]string, stop bool) (*revid.Revid, revid.Config, error) {
|
|
||||||
if stop {
|
|
||||||
err := stopRevid(rv)
|
|
||||||
if err != nil {
|
|
||||||
return nil, cfg, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//look through the vars and update revid where needed
|
|
||||||
for key, value := range vars {
|
|
||||||
switch key {
|
|
||||||
case "Output":
|
|
||||||
// FIXME(kortschak): There can be only one!
|
|
||||||
// How do we specify outputs after the first?
|
|
||||||
//
|
|
||||||
// Maybe we shouldn't be doing this!
|
|
||||||
switch value {
|
|
||||||
case "File":
|
|
||||||
cfg.Outputs[0] = revid.File
|
|
||||||
case "Http":
|
|
||||||
cfg.Outputs[0] = revid.Http
|
|
||||||
case "Rtmp":
|
|
||||||
cfg.Outputs[0] = revid.Rtmp
|
|
||||||
case "FfmpegRtmp":
|
|
||||||
cfg.Outputs[0] = revid.FfmpegRtmp
|
|
||||||
default:
|
|
||||||
log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
case "FramesPerClip":
|
|
||||||
f, err := strconv.ParseUint(value, 10, 0)
|
|
||||||
if err != nil {
|
|
||||||
log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
cfg.FramesPerClip = uint(f)
|
|
||||||
case "RtmpUrl":
|
|
||||||
cfg.RtmpUrl = value
|
|
||||||
case "Bitrate":
|
|
||||||
r, err := strconv.ParseUint(value, 10, 0)
|
|
||||||
if err != nil {
|
|
||||||
log.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
cfg.Bitrate = uint(r)
|
|
||||||
case "OutputFileName":
|
|
||||||
cfg.OutputFileName = value
|
|
||||||
case "InputFileName":
|
|
||||||
cfg.InputFileName = value
|
|
||||||
case "Height":
|
|
||||||
h, err := strconv.ParseUint(value, 10, 0)
|
|
||||||
if err != nil {
|
|
||||||
log.Log(logger.Warning, pkg+"invalid height param", "value", value)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
cfg.Height = uint(h)
|
|
||||||
case "Width":
|
|
||||||
w, err := strconv.ParseUint(value, 10, 0)
|
|
||||||
if err != nil {
|
|
||||||
log.Log(logger.Warning, pkg+"invalid width param", "value", value)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
cfg.Width = uint(w)
|
|
||||||
case "FrameRate":
|
|
||||||
r, err := strconv.ParseUint(value, 10, 0)
|
|
||||||
if err != nil {
|
|
||||||
log.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
cfg.FrameRate = uint(r)
|
|
||||||
case "HttpAddress":
|
|
||||||
cfg.HttpAddress = value
|
|
||||||
case "Quantization":
|
|
||||||
q, err := strconv.ParseUint(value, 10, 0)
|
|
||||||
if err != nil {
|
|
||||||
log.Log(logger.Warning, pkg+"invalid quantization param", "value", value)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
cfg.Quantization = uint(q)
|
|
||||||
case "IntraRefreshPeriod":
|
|
||||||
p, err := strconv.ParseUint(value, 10, 0)
|
|
||||||
if err != nil {
|
|
||||||
log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
cfg.IntraRefreshPeriod = uint(p)
|
|
||||||
case "HorizontalFlip":
|
|
||||||
switch strings.ToLower(value) {
|
|
||||||
case "true":
|
|
||||||
cfg.FlipHorizontal = true
|
|
||||||
case "false":
|
|
||||||
cfg.FlipHorizontal = false
|
|
||||||
default:
|
|
||||||
log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value)
|
|
||||||
}
|
|
||||||
case "VerticalFlip":
|
|
||||||
switch strings.ToLower(value) {
|
|
||||||
case "true":
|
|
||||||
cfg.FlipVertical = true
|
|
||||||
case "false":
|
|
||||||
cfg.FlipVertical = false
|
|
||||||
default:
|
|
||||||
log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value)
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return startRevid(ns, cfg)
|
|
||||||
}
|
|
||||||
|
|
||||||
// flagStrings implements an appending string set flag.
|
// flagStrings implements an appending string set flag.
|
||||||
type flagStrings []string
|
type flagStrings []string
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,144 @@
|
||||||
|
/*
|
||||||
|
NAME
|
||||||
|
decode.go
|
||||||
|
|
||||||
|
DESCRIPTION
|
||||||
|
decode.go provides functionality for the decoding of FLAC compressed audio
|
||||||
|
|
||||||
|
AUTHOR
|
||||||
|
Saxon Nelson-Milton <saxon@ausocean.org>
|
||||||
|
|
||||||
|
LICENSE
|
||||||
|
decode.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
|
||||||
|
|
||||||
|
It is free software: you can redistribute it and/or modify them
|
||||||
|
under the terms of the GNU General Public License as published by the
|
||||||
|
Free Software Foundation, either version 3 of the License, or (at your
|
||||||
|
option) any later version.
|
||||||
|
|
||||||
|
It is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||||
|
for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License
|
||||||
|
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
|
||||||
|
*/
|
||||||
|
package flac
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/go-audio/audio"
|
||||||
|
"github.com/go-audio/wav"
|
||||||
|
"github.com/mewkiz/flac"
|
||||||
|
)
|
||||||
|
|
||||||
|
const wavFormat = 1
|
||||||
|
|
||||||
|
// writeSeeker implements a memory based io.WriteSeeker.
|
||||||
|
type writeSeeker struct {
|
||||||
|
buf []byte
|
||||||
|
pos int
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bytes returns the bytes contained in the writeSeekers buffer.
|
||||||
|
func (ws *writeSeeker) Bytes() []byte {
|
||||||
|
return ws.buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write writes len(p) bytes from p to the writeSeeker's buf and returns the number
|
||||||
|
// of bytes written. If less than len(p) bytes are written, an error is returned.
|
||||||
|
func (ws *writeSeeker) Write(p []byte) (n int, err error) {
|
||||||
|
minCap := ws.pos + len(p)
|
||||||
|
if minCap > cap(ws.buf) { // Make sure buf has enough capacity:
|
||||||
|
buf2 := make([]byte, len(ws.buf), minCap+len(p)) // add some extra
|
||||||
|
copy(buf2, ws.buf)
|
||||||
|
ws.buf = buf2
|
||||||
|
}
|
||||||
|
if minCap > len(ws.buf) {
|
||||||
|
ws.buf = ws.buf[:minCap]
|
||||||
|
}
|
||||||
|
copy(ws.buf[ws.pos:], p)
|
||||||
|
ws.pos += len(p)
|
||||||
|
return len(p), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Seek sets the offset for the next Read or Write to offset, interpreted according
|
||||||
|
// to whence: SeekStart means relative to the start of the file, SeekCurrent means
|
||||||
|
// relative to the current offset, and SeekEnd means relative to the end. Seek returns
|
||||||
|
// the new offset relative to the start of the file and an error, if any.
|
||||||
|
func (ws *writeSeeker) Seek(offset int64, whence int) (int64, error) {
|
||||||
|
newPos, offs := 0, int(offset)
|
||||||
|
switch whence {
|
||||||
|
case io.SeekStart:
|
||||||
|
newPos = offs
|
||||||
|
case io.SeekCurrent:
|
||||||
|
newPos = ws.pos + offs
|
||||||
|
case io.SeekEnd:
|
||||||
|
newPos = len(ws.buf) + offs
|
||||||
|
}
|
||||||
|
if newPos < 0 {
|
||||||
|
return 0, errors.New("negative result pos")
|
||||||
|
}
|
||||||
|
ws.pos = newPos
|
||||||
|
return int64(newPos), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decode takes buf, a slice of FLAC, and decodes to WAV. If complete decoding
|
||||||
|
// fails, an error is returned.
|
||||||
|
func Decode(buf []byte) ([]byte, error) {
|
||||||
|
|
||||||
|
// Lex the FLAC into a stream to hold audio and it's properties.
|
||||||
|
r := bytes.NewReader(buf)
|
||||||
|
stream, err := flac.Parse(r)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.New("Could not parse FLAC")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create WAV encoder and pass writeSeeker that will store output WAV.
|
||||||
|
ws := &writeSeeker{}
|
||||||
|
sr := int(stream.Info.SampleRate)
|
||||||
|
bps := int(stream.Info.BitsPerSample)
|
||||||
|
nc := int(stream.Info.NChannels)
|
||||||
|
enc := wav.NewEncoder(ws, sr, bps, nc, wavFormat)
|
||||||
|
defer enc.Close()
|
||||||
|
|
||||||
|
// Decode FLAC into frames of samples
|
||||||
|
intBuf := &audio.IntBuffer{
|
||||||
|
Format: &audio.Format{NumChannels: nc, SampleRate: sr},
|
||||||
|
SourceBitDepth: bps,
|
||||||
|
}
|
||||||
|
return decodeFrames(stream, intBuf, enc, ws)
|
||||||
|
}
|
||||||
|
|
||||||
|
// decodeFrames parses frames from the stream and encodes them into WAV until
|
||||||
|
// the end of the stream is reached. The bytes from writeSeeker buffer are then
|
||||||
|
// returned. If any errors occur during encodeing, nil bytes and the error is returned.
|
||||||
|
func decodeFrames(s *flac.Stream, intBuf *audio.IntBuffer, e *wav.Encoder, ws *writeSeeker) ([]byte, error) {
|
||||||
|
var data []int
|
||||||
|
for {
|
||||||
|
frame, err := s.ParseNext()
|
||||||
|
|
||||||
|
// If we've reached the end of the stream then we can output the writeSeeker's buffer.
|
||||||
|
if err == io.EOF {
|
||||||
|
return ws.Bytes(), nil
|
||||||
|
} else if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Encode WAV audio samples.
|
||||||
|
data = data[:0]
|
||||||
|
for i := 0; i < frame.Subframes[0].NSamples; i++ {
|
||||||
|
for _, subframe := range frame.Subframes {
|
||||||
|
data = append(data, int(subframe.Samples[i]))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
intBuf.Data = data
|
||||||
|
if err := e.Write(intBuf); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,121 @@
|
||||||
|
/*
|
||||||
|
NAME
|
||||||
|
flac_test.go
|
||||||
|
|
||||||
|
DESCRIPTION
|
||||||
|
flac_test.go provides utilities to test FLAC audio decoding
|
||||||
|
|
||||||
|
AUTHOR
|
||||||
|
Saxon Nelson-Milton <saxon@ausocean.org>
|
||||||
|
|
||||||
|
LICENSE
|
||||||
|
flac_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
|
||||||
|
|
||||||
|
It is free software: you can redistribute it and/or modify them
|
||||||
|
under the terms of the GNU General Public License as published by the
|
||||||
|
Free Software Foundation, either version 3 of the License, or (at your
|
||||||
|
option) any later version.
|
||||||
|
|
||||||
|
It is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||||
|
for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License
|
||||||
|
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
|
||||||
|
*/
|
||||||
|
package flac
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
testFile = "../../../test/test-data/av/input/robot.flac"
|
||||||
|
outFile = "testOut.wav"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestWriteSeekerWrite checks that basic writing to the ws works as expected.
|
||||||
|
func TestWriteSeekerWrite(t *testing.T) {
|
||||||
|
ws := &writeSeeker{}
|
||||||
|
|
||||||
|
const tstStr1 = "hello"
|
||||||
|
ws.Write([]byte(tstStr1))
|
||||||
|
got := string(ws.buf)
|
||||||
|
if got != tstStr1 {
|
||||||
|
t.Errorf("Write failed, got: %v, want: %v", got, tstStr1)
|
||||||
|
}
|
||||||
|
|
||||||
|
const tstStr2 = " world"
|
||||||
|
const want = "hello world"
|
||||||
|
ws.Write([]byte(tstStr2))
|
||||||
|
got = string(ws.buf)
|
||||||
|
if got != want {
|
||||||
|
t.Errorf("Second write failed, got: %v, want: %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestWriteSeekerSeek checks that writing and seeking works as expected, i.e. we
|
||||||
|
// can write, then seek to a knew place in the buf, and write again, either replacing
|
||||||
|
// bytes, or appending bytes.
|
||||||
|
func TestWriteSeekerSeek(t *testing.T) {
|
||||||
|
ws := &writeSeeker{}
|
||||||
|
|
||||||
|
const tstStr1 = "hello"
|
||||||
|
want1 := tstStr1
|
||||||
|
ws.Write([]byte(tstStr1))
|
||||||
|
got := string(ws.buf)
|
||||||
|
if got != tstStr1 {
|
||||||
|
t.Errorf("Unexpected output, got: %v, want: %v", got, want1)
|
||||||
|
}
|
||||||
|
|
||||||
|
const tstStr2 = " world"
|
||||||
|
const want2 = tstStr1 + tstStr2
|
||||||
|
ws.Write([]byte(tstStr2))
|
||||||
|
got = string(ws.buf)
|
||||||
|
if got != want2 {
|
||||||
|
t.Errorf("Unexpected output, got: %v, want: %v", got, want2)
|
||||||
|
}
|
||||||
|
|
||||||
|
const tstStr3 = "k!"
|
||||||
|
const want3 = "hello work!"
|
||||||
|
ws.Seek(-2, io.SeekEnd)
|
||||||
|
ws.Write([]byte(tstStr3))
|
||||||
|
got = string(ws.buf)
|
||||||
|
if got != want3 {
|
||||||
|
t.Errorf("Unexpected output, got: %v, want: %v", got, want3)
|
||||||
|
}
|
||||||
|
|
||||||
|
const tstStr4 = "gopher"
|
||||||
|
const want4 = "hello gopher"
|
||||||
|
ws.Seek(6, io.SeekStart)
|
||||||
|
ws.Write([]byte(tstStr4))
|
||||||
|
got = string(ws.buf)
|
||||||
|
if got != want4 {
|
||||||
|
t.Errorf("Unexpected output, got: %v, want: %v", got, want4)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestDecodeFlac checks that we can load a flac file and decode to wav, writing
|
||||||
|
// to a wav file.
|
||||||
|
func TestDecodeFlac(t *testing.T) {
|
||||||
|
b, err := ioutil.ReadFile(testFile)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Could not read test file, failed with err: %v", err.Error())
|
||||||
|
}
|
||||||
|
out, err := Decode(b)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Could not decode, failed with err: %v", err.Error())
|
||||||
|
}
|
||||||
|
f, err := os.Create(outFile)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Could not create output file, failed with err: %v", err.Error())
|
||||||
|
}
|
||||||
|
_, err = f.Write(out)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Could not write to output file, failed with err: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,265 @@
|
||||||
|
/*
|
||||||
|
NAME
|
||||||
|
ts-repair/main.go
|
||||||
|
|
||||||
|
DESCRIPTION
|
||||||
|
This program attempts to repair mpegts discontinuities using one of two methods
|
||||||
|
as selected by the mode flag. Setting the mode flag to 0 will result in repair
|
||||||
|
by shifting all CCs such that they are continuous. Setting the mode flag to 1
|
||||||
|
will result in repair through setting the discontinuity indicator to true at
|
||||||
|
packets where a discontinuity exists.
|
||||||
|
|
||||||
|
Specify the input file with the in flag, and the output file with out flag.
|
||||||
|
|
||||||
|
AUTHOR
|
||||||
|
Saxon A. Nelson-Milton <saxon.milton@gmail.com>
|
||||||
|
|
||||||
|
LICENSE
|
||||||
|
mpegts.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
|
||||||
|
|
||||||
|
It is free software: you can redistribute it and/or modify them
|
||||||
|
under the terms of the GNU General Public License as published by the
|
||||||
|
Free Software Foundation, either version 3 of the License, or (at your
|
||||||
|
option) any later version.
|
||||||
|
|
||||||
|
It is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||||
|
for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License
|
||||||
|
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
|
||||||
|
*/
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"bitbucket.org/ausocean/av/stream/mts"
|
||||||
|
"github.com/Comcast/gots/packet"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
PatPid = 0
|
||||||
|
PmtPid = 4096
|
||||||
|
VideoPid = 256
|
||||||
|
HeadSize = 4
|
||||||
|
DefaultAdaptationSize = 2
|
||||||
|
AdaptationIdx = 4
|
||||||
|
AdaptationControlIdx = 3
|
||||||
|
AdaptationBodyIdx = AdaptationIdx + 1
|
||||||
|
AdaptationControlMask = 0x30
|
||||||
|
DefaultAdaptationBodySize = 1
|
||||||
|
DiscontinuityIndicatorMask = 0x80
|
||||||
|
DiscontinuityIndicatorIdx = AdaptationIdx + 1
|
||||||
|
)
|
||||||
|
|
||||||
|
// Various errors that we can encounter.
|
||||||
|
const (
|
||||||
|
errBadInPath = "No file path provided, or file does not exist"
|
||||||
|
errCantCreateOut = "Can't create output file"
|
||||||
|
errCantGetPid = "Can't get pid from packet"
|
||||||
|
errReadFail = "Read failed"
|
||||||
|
errWriteFail = "Write to file failed"
|
||||||
|
errBadMode = "Bad fix mode"
|
||||||
|
errAdaptationPresent = "Adaptation field is already present in packet"
|
||||||
|
errNoAdaptationField = "No adaptation field in this packet"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Consts describing flag usage.
|
||||||
|
const (
|
||||||
|
inUsage = "The path to the file to be repaired"
|
||||||
|
outUsage = "Output file path"
|
||||||
|
modeUsage = "Fix mode: 0 = cc-shift, 1 = di-update"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Repair modes.
|
||||||
|
const (
|
||||||
|
ccShift = iota
|
||||||
|
diUpdate
|
||||||
|
)
|
||||||
|
|
||||||
|
var ccMap = map[int]byte{
|
||||||
|
PatPid: 16,
|
||||||
|
PmtPid: 16,
|
||||||
|
VideoPid: 16,
|
||||||
|
}
|
||||||
|
|
||||||
|
// packetNo will keep track of the ts packet number for reference.
|
||||||
|
var packetNo int
|
||||||
|
|
||||||
|
// Option defines a func that performs an action on p in order to change a ts option.
|
||||||
|
type Option func(p *Packet)
|
||||||
|
|
||||||
|
// Packet is a byte array of size PacketSize i.e. 188 bytes. We define this
|
||||||
|
// to allow us to write receiver funcs for the [PacketSize]byte type.
|
||||||
|
type Packet [mts.PacketSize]byte
|
||||||
|
|
||||||
|
// CC returns the CC of p.
|
||||||
|
func (p *Packet) CC() byte {
|
||||||
|
return (*p)[3] & 0x0f
|
||||||
|
}
|
||||||
|
|
||||||
|
// setCC sets the CC of p.
|
||||||
|
func (p *Packet) setCC(cc byte) {
|
||||||
|
(*p)[3] |= cc & 0xf
|
||||||
|
}
|
||||||
|
|
||||||
|
// setDI sets the discontinuity counter of p.
|
||||||
|
func (p *Packet) setDI(di bool) {
|
||||||
|
if di {
|
||||||
|
p[5] |= 0x80
|
||||||
|
} else {
|
||||||
|
p[5] &= 0x7f
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// addAdaptationField adds an adaptation field to p, and applys the passed options to this field.
|
||||||
|
// TODO: this will probably break if we already have adaptation field.
|
||||||
|
func (p *Packet) addAdaptationField(options ...Option) error {
|
||||||
|
if p.hasAdaptation() {
|
||||||
|
return errors.New(errAdaptationPresent)
|
||||||
|
}
|
||||||
|
// Create space for adaptation field.
|
||||||
|
copy(p[HeadSize+DefaultAdaptationSize:], p[HeadSize:len(p)-DefaultAdaptationSize])
|
||||||
|
|
||||||
|
// TODO: seperate into own function
|
||||||
|
// Update adaptation field control.
|
||||||
|
p[AdaptationControlIdx] &= 0xff ^ AdaptationControlMask
|
||||||
|
p[AdaptationControlIdx] |= AdaptationControlMask
|
||||||
|
// Default the adaptationfield.
|
||||||
|
p.resetAdaptation()
|
||||||
|
|
||||||
|
// Apply and options that have bee passed.
|
||||||
|
for _, option := range options {
|
||||||
|
option(p)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// resetAdaptation sets fields in ps adaptation field to 0 if the adaptation field
|
||||||
|
// exists, otherwise an error is returned.
|
||||||
|
func (p *Packet) resetAdaptation() error {
|
||||||
|
if !p.hasAdaptation() {
|
||||||
|
return errors.New(errNoAdaptationField)
|
||||||
|
}
|
||||||
|
p[AdaptationIdx] = DefaultAdaptationBodySize
|
||||||
|
p[AdaptationBodyIdx] = 0x00
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// hasAdaptation returns true if p has an adaptation field and false otherwise.
|
||||||
|
func (p *Packet) hasAdaptation() bool {
|
||||||
|
afc := p[AdaptationControlIdx] & AdaptationControlMask
|
||||||
|
if afc == 0x20 || afc == 0x30 {
|
||||||
|
return true
|
||||||
|
} else {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// DiscontinuityIndicator returns and Option that will set p's discontinuity
|
||||||
|
// indicator according to f.
|
||||||
|
func DiscontinuityIndicator(f bool) Option {
|
||||||
|
return func(p *Packet) {
|
||||||
|
set := byte(DiscontinuityIndicatorMask)
|
||||||
|
if !f {
|
||||||
|
set = 0x00
|
||||||
|
}
|
||||||
|
p[DiscontinuityIndicatorIdx] &= 0xff ^ DiscontinuityIndicatorMask
|
||||||
|
p[DiscontinuityIndicatorIdx] |= DiscontinuityIndicatorMask & set
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// Deal with input flags
|
||||||
|
inPtr := flag.String("in", "", inUsage)
|
||||||
|
outPtr := flag.String("out", "out.ts", outUsage)
|
||||||
|
modePtr := flag.Int("mode", diUpdate, modeUsage)
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
// Try and open the given input file, otherwise panic - we can't do anything
|
||||||
|
inFile, err := os.Open(*inPtr)
|
||||||
|
defer inFile.Close()
|
||||||
|
if err != nil {
|
||||||
|
panic(errBadInPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try and create output file, otherwise panic - we can't do anything
|
||||||
|
outFile, err := os.Create(*outPtr)
|
||||||
|
defer outFile.Close()
|
||||||
|
if err != nil {
|
||||||
|
panic(errCantCreateOut)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read each packet from the input file reader
|
||||||
|
var p Packet
|
||||||
|
for {
|
||||||
|
// If we get an end of file then return, otherwise we panic - can't do anything else
|
||||||
|
if _, err := inFile.Read(p[:mts.PacketSize]); err == io.EOF {
|
||||||
|
return
|
||||||
|
} else if err != nil {
|
||||||
|
panic(errReadFail + ": " + err.Error())
|
||||||
|
}
|
||||||
|
packetNo++
|
||||||
|
|
||||||
|
// Get the pid from the packet
|
||||||
|
pid, err := packet.Pid((*packet.Packet)(&p))
|
||||||
|
if err != nil {
|
||||||
|
panic(errCantGetPid)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the cc from the packet and also the expected cc (if exists)
|
||||||
|
cc := p.CC()
|
||||||
|
expect, exists := expectedCC(int(pid))
|
||||||
|
if !exists {
|
||||||
|
updateCCMap(int(pid), cc)
|
||||||
|
} else {
|
||||||
|
switch *modePtr {
|
||||||
|
// ccShift mode shifts all CC regardless of presence of Discontinuities or not
|
||||||
|
case ccShift:
|
||||||
|
p.setCC(expect)
|
||||||
|
// diUpdate mode finds discontinuities and sets the discontinuity indicator to true.
|
||||||
|
// If we have a pat or pmt then we need to add an adaptation field and then set the DI.
|
||||||
|
case diUpdate:
|
||||||
|
if cc != expect {
|
||||||
|
fmt.Printf("***** Discontinuity found (packetNo: %v pid: %v, cc: %v, expect: %v)\n", packetNo, pid, cc, expect)
|
||||||
|
if p.hasAdaptation() {
|
||||||
|
p.setDI(true)
|
||||||
|
} else {
|
||||||
|
p.addAdaptationField(DiscontinuityIndicator(true))
|
||||||
|
}
|
||||||
|
updateCCMap(int(pid), p.CC())
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
panic(errBadMode)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write this packet to the output file.
|
||||||
|
if _, err := outFile.Write(p[:]); err != nil {
|
||||||
|
panic(errWriteFail + ": " + err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// expectedCC returns the expected cc for the given pid. If the cc hasn't been
|
||||||
|
// used yet, then 16 and false is returned.
|
||||||
|
func expectedCC(pid int) (byte, bool) {
|
||||||
|
cc := ccMap[pid]
|
||||||
|
if cc == 16 {
|
||||||
|
return 16, false
|
||||||
|
}
|
||||||
|
ccMap[pid] = (cc + 1) & 0xf
|
||||||
|
return cc, true
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateCCMap updates the cc for the passed pid.
|
||||||
|
func updateCCMap(pid int, cc byte) {
|
||||||
|
ccMap[pid] = (cc + 1) & 0xf
|
||||||
|
}
|
|
@ -1,75 +0,0 @@
|
||||||
/*
|
|
||||||
NAME
|
|
||||||
main.go
|
|
||||||
|
|
||||||
DESCRIPTION
|
|
||||||
See Readme.md
|
|
||||||
|
|
||||||
AUTHOR
|
|
||||||
Saxon Nelson-Milton <saxon@ausocean.org>
|
|
||||||
|
|
||||||
LICENSE
|
|
||||||
main.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
|
|
||||||
|
|
||||||
It is free software: you can redistribute it and/or modify them
|
|
||||||
under the terms of the GNU General Public License as published by the
|
|
||||||
Free Software Foundation, either version 3 of the License, or (at your
|
|
||||||
option) any later version.
|
|
||||||
|
|
||||||
It is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
|
||||||
for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU General Public License
|
|
||||||
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
|
|
||||||
*/
|
|
||||||
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"flag"
|
|
||||||
"log"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"bitbucket.org/ausocean/av/revid"
|
|
||||||
"bitbucket.org/ausocean/iot/pi/smartlogger"
|
|
||||||
"bitbucket.org/ausocean/utils/logger"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
inputFile = "../../../../test/test-data/av/input/betterInput.h264"
|
|
||||||
frameRate = "25"
|
|
||||||
runDuration = 120 * time.Second
|
|
||||||
logPath = "/var/log"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Test h264 inputfile to flv format into rtmp using librtmp c wrapper
|
|
||||||
func main() {
|
|
||||||
// Get the rtmp url from a cmd flag
|
|
||||||
rtmpUrlPtr := flag.String("rtmpUrl", "", "The rtmp url you would like to stream to.")
|
|
||||||
flag.Parse()
|
|
||||||
if *rtmpUrlPtr == "" {
|
|
||||||
log.Println("No RTMP url passed!")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
config := revid.Config{
|
|
||||||
Input: revid.File,
|
|
||||||
InputFileName: inputFile,
|
|
||||||
InputCodec: revid.H264,
|
|
||||||
Outputs: []byte{revid.Rtmp},
|
|
||||||
RtmpMethod: revid.LibRtmp,
|
|
||||||
RtmpUrl: *rtmpUrlPtr,
|
|
||||||
Packetization: revid.Flv,
|
|
||||||
Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller),
|
|
||||||
}
|
|
||||||
revidInst, err := revid.New(config, nil)
|
|
||||||
if err != nil {
|
|
||||||
config.Logger.Log(logger.Error, "Should not have got an error!: ", err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
revidInst.Start()
|
|
||||||
time.Sleep(runDuration)
|
|
||||||
revidInst.Stop()
|
|
||||||
}
|
|
|
@ -1,66 +0,0 @@
|
||||||
/*
|
|
||||||
NAME
|
|
||||||
main.go
|
|
||||||
|
|
||||||
DESCRIPTION
|
|
||||||
See Readme.md
|
|
||||||
|
|
||||||
AUTHOR
|
|
||||||
Saxon Nelson-Milton <saxon@ausocean.org>
|
|
||||||
|
|
||||||
LICENSE
|
|
||||||
main.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
|
|
||||||
|
|
||||||
It is free software: you can redistribute it and/or modify them
|
|
||||||
under the terms of the GNU General Public License as published by the
|
|
||||||
Free Software Foundation, either version 3 of the License, or (at your
|
|
||||||
option) any later version.
|
|
||||||
|
|
||||||
It is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
|
||||||
for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU General Public License
|
|
||||||
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
|
|
||||||
*/
|
|
||||||
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"bitbucket.org/ausocean/av/revid"
|
|
||||||
"bitbucket.org/ausocean/iot/pi/smartlogger"
|
|
||||||
"bitbucket.org/ausocean/utils/logger"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
inputFile = "../../../../test/test-data/av/input/betterInput.h264"
|
|
||||||
outputFile = "output.ts"
|
|
||||||
frameRate = "25"
|
|
||||||
runDuration = 120 * time.Second
|
|
||||||
logPath = "/var/log"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Test h264 inputfile to flv format into rtmp using librtmp c wrapper
|
|
||||||
func main() {
|
|
||||||
|
|
||||||
config := revid.Config{
|
|
||||||
Input: revid.File,
|
|
||||||
InputFileName: inputFile,
|
|
||||||
InputCodec: revid.H264,
|
|
||||||
Outputs: []byte{revid.File},
|
|
||||||
OutputFileName: outputFile,
|
|
||||||
Packetization: revid.Mpegts,
|
|
||||||
Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller),
|
|
||||||
}
|
|
||||||
revidInst, err := revid.New(config, nil)
|
|
||||||
if err != nil {
|
|
||||||
config.Logger.Log(logger.Error, "Should not have got an error!:", err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
revidInst.Start()
|
|
||||||
time.Sleep(runDuration)
|
|
||||||
revidInst.Stop()
|
|
||||||
}
|
|
|
@ -68,6 +68,7 @@ type Config struct {
|
||||||
RtpAddress string
|
RtpAddress string
|
||||||
Logger Logger
|
Logger Logger
|
||||||
SendRetry bool
|
SendRetry bool
|
||||||
|
BurstPeriod uint
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enums for config struct
|
// Enums for config struct
|
||||||
|
@ -114,6 +115,7 @@ const (
|
||||||
defaultInputCodec = H264
|
defaultInputCodec = H264
|
||||||
defaultVerbosity = No // FIXME(kortschak): This makes no sense whatsoever. No is currently 15.
|
defaultVerbosity = No // FIXME(kortschak): This makes no sense whatsoever. No is currently 15.
|
||||||
defaultRtpAddr = "localhost:6970"
|
defaultRtpAddr = "localhost:6970"
|
||||||
|
defaultBurstPeriod = 10 // Seconds
|
||||||
)
|
)
|
||||||
|
|
||||||
// Validate checks for any errors in the config fields and defaults settings
|
// Validate checks for any errors in the config fields and defaults settings
|
||||||
|
@ -200,6 +202,11 @@ func (c *Config) Validate(r *Revid) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c.BurstPeriod == 0 {
|
||||||
|
c.Logger.Log(logger.Warning, pkg+"no burst period defined, defaulting", "burstPeriod", defaultBurstPeriod)
|
||||||
|
c.BurstPeriod = defaultBurstPeriod
|
||||||
|
}
|
||||||
|
|
||||||
if c.FramesPerClip < 1 {
|
if c.FramesPerClip < 1 {
|
||||||
c.Logger.Log(logger.Warning, pkg+"no FramesPerClip defined, defaulting",
|
c.Logger.Log(logger.Warning, pkg+"no FramesPerClip defined, defaulting",
|
||||||
"framesPerClip", defaultFramesPerClip)
|
"framesPerClip", defaultFramesPerClip)
|
||||||
|
|
135
revid/revid.go
135
revid/revid.go
|
@ -119,10 +119,11 @@ type Revid struct {
|
||||||
// bitrate hold the last send bitrate calculation result.
|
// bitrate hold the last send bitrate calculation result.
|
||||||
bitrate int
|
bitrate int
|
||||||
|
|
||||||
// isRunning is a loaded and cocked foot-gun.
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
isRunning bool
|
isRunning bool
|
||||||
|
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
err chan error
|
err chan error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -326,7 +327,14 @@ func (r *Revid) IsRunning() bool {
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
// setIsRunning sets revid.isRunning using b.
|
func (r *Revid) Config() Config {
|
||||||
|
r.mu.Lock()
|
||||||
|
cfg := r.config
|
||||||
|
r.mu.Unlock()
|
||||||
|
return cfg
|
||||||
|
}
|
||||||
|
|
||||||
|
// setIsRunning sets r.isRunning using b.
|
||||||
func (r *Revid) setIsRunning(b bool) {
|
func (r *Revid) setIsRunning(b bool) {
|
||||||
r.mu.Lock()
|
r.mu.Lock()
|
||||||
r.isRunning = b
|
r.isRunning = b
|
||||||
|
@ -340,9 +348,11 @@ func (r *Revid) Start() error {
|
||||||
return errors.New(pkg + "start called but revid is already running")
|
return errors.New(pkg + "start called but revid is already running")
|
||||||
}
|
}
|
||||||
r.config.Logger.Log(logger.Info, pkg+"starting Revid")
|
r.config.Logger.Log(logger.Info, pkg+"starting Revid")
|
||||||
|
// TODO: this doesn't need to be here
|
||||||
r.config.Logger.Log(logger.Debug, pkg+"setting up output")
|
r.config.Logger.Log(logger.Debug, pkg+"setting up output")
|
||||||
r.setIsRunning(true)
|
r.setIsRunning(true)
|
||||||
r.config.Logger.Log(logger.Info, pkg+"starting output routine")
|
r.config.Logger.Log(logger.Info, pkg+"starting output routine")
|
||||||
|
r.wg.Add(1)
|
||||||
go r.outputClips()
|
go r.outputClips()
|
||||||
r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content")
|
r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content")
|
||||||
err := r.setupInput()
|
err := r.setupInput()
|
||||||
|
@ -363,12 +373,129 @@ func (r *Revid) Stop() error {
|
||||||
if r.cmd != nil && r.cmd.Process != nil {
|
if r.cmd != nil && r.cmd.Process != nil {
|
||||||
r.cmd.Process.Kill()
|
r.cmd.Process.Kill()
|
||||||
}
|
}
|
||||||
|
r.wg.Wait()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Revid) Update(vars map[string]string) error {
|
||||||
|
if r.IsRunning() {
|
||||||
|
if err := r.Stop(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//look through the vars and update revid where needed
|
||||||
|
for key, value := range vars {
|
||||||
|
switch key {
|
||||||
|
case "Output":
|
||||||
|
// FIXME(kortschak): There can be only one!
|
||||||
|
// How do we specify outputs after the first?
|
||||||
|
//
|
||||||
|
// Maybe we shouldn't be doing this!
|
||||||
|
switch value {
|
||||||
|
case "File":
|
||||||
|
r.config.Outputs[0] = File
|
||||||
|
case "Http":
|
||||||
|
r.config.Outputs[0] = Http
|
||||||
|
case "Rtmp":
|
||||||
|
r.config.Outputs[0] = Rtmp
|
||||||
|
case "FfmpegRtmp":
|
||||||
|
r.config.Outputs[0] = FfmpegRtmp
|
||||||
|
default:
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid Output1 param", "value", value)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
case "FramesPerClip":
|
||||||
|
f, err := strconv.ParseUint(value, 10, 0)
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
r.config.FramesPerClip = uint(f)
|
||||||
|
case "RtmpUrl":
|
||||||
|
r.config.RtmpUrl = value
|
||||||
|
case "Bitrate":
|
||||||
|
v, err := strconv.ParseUint(value, 10, 0)
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
r.config.Bitrate = uint(v)
|
||||||
|
case "OutputFileName":
|
||||||
|
r.config.OutputFileName = value
|
||||||
|
case "InputFileName":
|
||||||
|
r.config.InputFileName = value
|
||||||
|
case "Height":
|
||||||
|
h, err := strconv.ParseUint(value, 10, 0)
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid height param", "value", value)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
r.config.Height = uint(h)
|
||||||
|
case "Width":
|
||||||
|
w, err := strconv.ParseUint(value, 10, 0)
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid width param", "value", value)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
r.config.Width = uint(w)
|
||||||
|
case "FrameRate":
|
||||||
|
v, err := strconv.ParseUint(value, 10, 0)
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
r.config.FrameRate = uint(v)
|
||||||
|
case "HttpAddress":
|
||||||
|
r.config.HttpAddress = value
|
||||||
|
case "Quantization":
|
||||||
|
q, err := strconv.ParseUint(value, 10, 0)
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid quantization param", "value", value)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
r.config.Quantization = uint(q)
|
||||||
|
case "IntraRefreshPeriod":
|
||||||
|
p, err := strconv.ParseUint(value, 10, 0)
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
r.config.IntraRefreshPeriod = uint(p)
|
||||||
|
case "HorizontalFlip":
|
||||||
|
switch strings.ToLower(value) {
|
||||||
|
case "true":
|
||||||
|
r.config.FlipHorizontal = true
|
||||||
|
case "false":
|
||||||
|
r.config.FlipHorizontal = false
|
||||||
|
default:
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value)
|
||||||
|
}
|
||||||
|
case "VerticalFlip":
|
||||||
|
switch strings.ToLower(value) {
|
||||||
|
case "true":
|
||||||
|
r.config.FlipVertical = true
|
||||||
|
case "false":
|
||||||
|
r.config.FlipVertical = false
|
||||||
|
default:
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value)
|
||||||
|
}
|
||||||
|
case "BurstPeriod":
|
||||||
|
v, err := strconv.ParseUint(value, 10, 0)
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid BurstPeriod param", "value", value)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
r.config.BurstPeriod = uint(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// outputClips takes the clips produced in the packClips method and outputs them
|
// outputClips takes the clips produced in the packClips method and outputs them
|
||||||
// to the desired output defined in the revid config
|
// to the desired output defined in the revid config
|
||||||
func (r *Revid) outputClips() {
|
func (r *Revid) outputClips() {
|
||||||
|
defer r.wg.Done()
|
||||||
lastTime := time.Now()
|
lastTime := time.Now()
|
||||||
var count int
|
var count int
|
||||||
loop:
|
loop:
|
||||||
|
@ -507,6 +634,7 @@ func (r *Revid) startRaspivid() error {
|
||||||
r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error())
|
r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.wg.Add(1)
|
||||||
go r.processFrom(stdout, 0)
|
go r.processFrom(stdout, 0)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -555,6 +683,7 @@ func (r *Revid) startV4L() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.wg.Add(1)
|
||||||
go r.processFrom(stdout, time.Duration(0))
|
go r.processFrom(stdout, time.Duration(0))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -570,6 +699,7 @@ func (r *Revid) setupInputForFile() error {
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
|
||||||
// TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop.
|
// TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop.
|
||||||
|
r.wg.Add(1)
|
||||||
go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate))
|
go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -578,4 +708,5 @@ func (r *Revid) processFrom(read io.Reader, delay time.Duration) {
|
||||||
r.config.Logger.Log(logger.Info, pkg+"reading input data")
|
r.config.Logger.Log(logger.Info, pkg+"reading input data")
|
||||||
r.err <- r.lexTo(r.encoder, read, delay)
|
r.err <- r.lexTo(r.encoder, read, delay)
|
||||||
r.config.Logger.Log(logger.Info, pkg+"finished reading input data")
|
r.config.Logger.Log(logger.Info, pkg+"finished reading input data")
|
||||||
|
r.wg.Done()
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,7 +76,7 @@ type link struct {
|
||||||
protocol int32
|
protocol int32
|
||||||
timeout uint
|
timeout uint
|
||||||
port uint16
|
port uint16
|
||||||
conn *net.TCPConn
|
conn net.Conn
|
||||||
}
|
}
|
||||||
|
|
||||||
// method represents an RTMP method.
|
// method represents an RTMP method.
|
||||||
|
|
|
@ -85,7 +85,7 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
psiSndCnt = 7
|
psiInterval = 1 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// Meta allows addition of metadata to encoded mts from outside of this pkg.
|
// Meta allows addition of metadata to encoded mts from outside of this pkg.
|
||||||
|
@ -99,6 +99,14 @@ var (
|
||||||
pmtTable = standardPmt.Bytes()
|
pmtTable = standardPmt.Bytes()
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
sdtPid = 17
|
||||||
|
patPid = 0
|
||||||
|
pmtPid = 4096
|
||||||
|
videoPid = 256
|
||||||
|
streamID = 0xe0 // First video stream ID.
|
||||||
|
)
|
||||||
|
|
||||||
// Time related constants.
|
// Time related constants.
|
||||||
const (
|
const (
|
||||||
// ptsOffset is the offset added to the clock to determine
|
// ptsOffset is the offset added to the clock to determine
|
||||||
|
@ -119,9 +127,9 @@ type Encoder struct {
|
||||||
tsSpace [PacketSize]byte
|
tsSpace [PacketSize]byte
|
||||||
pesSpace [pes.MaxPesSize]byte
|
pesSpace [pes.MaxPesSize]byte
|
||||||
|
|
||||||
psiCount int
|
|
||||||
|
|
||||||
continuity map[int]byte
|
continuity map[int]byte
|
||||||
|
|
||||||
|
psiLastTime time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEncoder returns an Encoder with the specified frame rate.
|
// NewEncoder returns an Encoder with the specified frame rate.
|
||||||
|
@ -133,13 +141,18 @@ func NewEncoder(dst io.Writer, fps float64) *Encoder {
|
||||||
ptsOffset: ptsOffset,
|
ptsOffset: ptsOffset,
|
||||||
|
|
||||||
continuity: map[int]byte{
|
continuity: map[int]byte{
|
||||||
PatPid: 0,
|
patPid: 0,
|
||||||
PmtPid: 0,
|
pmtPid: 0,
|
||||||
VideoPid: 0,
|
videoPid: 0,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
hasPayload = 0x1
|
||||||
|
hasAdaptationField = 0x2
|
||||||
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
hasDTS = 0x1
|
hasDTS = 0x1
|
||||||
hasPTS = 0x2
|
hasPTS = 0x2
|
||||||
|
@ -148,15 +161,18 @@ const (
|
||||||
// generate handles the incoming data and generates equivalent mpegts packets -
|
// generate handles the incoming data and generates equivalent mpegts packets -
|
||||||
// sending them to the output channel.
|
// sending them to the output channel.
|
||||||
func (e *Encoder) Encode(nalu []byte) error {
|
func (e *Encoder) Encode(nalu []byte) error {
|
||||||
if e.psiCount <= 0 {
|
now := time.Now()
|
||||||
|
if now.Sub(e.psiLastTime) > psiInterval {
|
||||||
err := e.writePSI()
|
err := e.writePSI()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
e.psiLastTime = now
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prepare PES data.
|
// Prepare PES data.
|
||||||
pesPkt := pes.Packet{
|
pesPkt := pes.Packet{
|
||||||
StreamID: StreamID,
|
StreamID: streamID,
|
||||||
PDI: hasPTS,
|
PDI: hasPTS,
|
||||||
PTS: e.pts(),
|
PTS: e.pts(),
|
||||||
Data: nalu,
|
Data: nalu,
|
||||||
|
@ -168,10 +184,10 @@ func (e *Encoder) Encode(nalu []byte) error {
|
||||||
for len(buf) != 0 {
|
for len(buf) != 0 {
|
||||||
pkt := Packet{
|
pkt := Packet{
|
||||||
PUSI: pusi,
|
PUSI: pusi,
|
||||||
PID: VideoPid,
|
PID: videoPid,
|
||||||
RAI: pusi,
|
RAI: pusi,
|
||||||
CC: e.ccFor(VideoPid),
|
CC: e.ccFor(videoPid),
|
||||||
AFC: HasAdaptationField | HasPayload,
|
AFC: hasAdaptationField | hasPayload,
|
||||||
PCRF: pusi,
|
PCRF: pusi,
|
||||||
}
|
}
|
||||||
n := pkt.FillPayload(buf)
|
n := pkt.FillPayload(buf)
|
||||||
|
@ -184,7 +200,6 @@ func (e *Encoder) Encode(nalu []byte) error {
|
||||||
pusi = false
|
pusi = false
|
||||||
}
|
}
|
||||||
_, err := e.dst.Write(pkt.Bytes(e.tsSpace[:PacketSize]))
|
_, err := e.dst.Write(pkt.Bytes(e.tsSpace[:PacketSize]))
|
||||||
e.psiCount--
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -227,7 +242,6 @@ func (e *Encoder) writePSI() error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
e.psiCount = psiSndCnt
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -165,7 +165,7 @@ func FindPMT(d []byte) (p []byte, i int, err error) {
|
||||||
}
|
}
|
||||||
for i = 0; i < len(d); i += PacketSize {
|
for i = 0; i < len(d); i += PacketSize {
|
||||||
pid := (uint16(d[i+1]&0x1f) << 8) | uint16(d[i+2])
|
pid := (uint16(d[i+1]&0x1f) << 8) | uint16(d[i+2])
|
||||||
if pid == PmtPid {
|
if pid == pmtPid {
|
||||||
p = d[i+4 : i+PacketSize]
|
p = d[i+4 : i+PacketSize]
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue