mirror of https://bitbucket.org/ausocean/av.git
revid/cmd: merged in master and again removed cmds
This commit is contained in:
commit
734cf8cca9
|
@ -50,6 +50,13 @@ const (
|
||||||
defaultLogVerbosity = logger.Debug
|
defaultLogVerbosity = logger.Debug
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Revid modes
|
||||||
|
const (
|
||||||
|
normal = "Normal"
|
||||||
|
paused = "Paused"
|
||||||
|
burst = "Burst"
|
||||||
|
)
|
||||||
|
|
||||||
// Other misc consts
|
// Other misc consts
|
||||||
const (
|
const (
|
||||||
netSendRetryTime = 5 * time.Second
|
netSendRetryTime = 5 * time.Second
|
||||||
|
@ -72,23 +79,22 @@ func main() {
|
||||||
cfg := handleFlags()
|
cfg := handleFlags()
|
||||||
|
|
||||||
if !*useNetsender {
|
if !*useNetsender {
|
||||||
// run revid for the specified duration
|
rv, err := revid.New(cfg, nil)
|
||||||
rv, _, err := startRevid(nil, cfg)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
cfg.Logger.Log(logger.Fatal, pkg+"failed to initialiase revid", "error", err.Error())
|
||||||
|
}
|
||||||
|
if err = rv.Start(); err != nil {
|
||||||
cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", "error", err.Error())
|
cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", "error", err.Error())
|
||||||
}
|
}
|
||||||
time.Sleep(*runDurationPtr)
|
time.Sleep(*runDurationPtr)
|
||||||
err = stopRevid(rv)
|
if err = rv.Stop(); err != nil {
|
||||||
if err != nil {
|
|
||||||
cfg.Logger.Log(logger.Error, pkg+"failed to stop revid before program termination", "error", err.Error())
|
cfg.Logger.Log(logger.Error, pkg+"failed to stop revid before program termination", "error", err.Error())
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err := run(nil, cfg)
|
if err := run(cfg); err != nil {
|
||||||
if err != nil {
|
|
||||||
log.Log(logger.Fatal, pkg+"failed to run revid", "error", err.Error())
|
log.Log(logger.Fatal, pkg+"failed to run revid", "error", err.Error())
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -102,8 +108,6 @@ func handleFlags() revid.Config {
|
||||||
|
|
||||||
inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam")
|
inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam")
|
||||||
inputCodecPtr = flag.String("InputCodec", "", "The codec of the input: H264, Mjpeg")
|
inputCodecPtr = flag.String("InputCodec", "", "The codec of the input: H264, Mjpeg")
|
||||||
output1Ptr = flag.String("Output1", "", "The first output type: Http, Rtmp, File, Udp, Rtp")
|
|
||||||
output2Ptr = flag.String("Output2", "", "The second output type: Http, Rtmp, File, Udp, Rtp")
|
|
||||||
rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp")
|
rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp")
|
||||||
packetizationPtr = flag.String("Packetization", "", "The method of data packetisation: Flv, Mpegts, None")
|
packetizationPtr = flag.String("Packetization", "", "The method of data packetisation: Flv, Mpegts, None")
|
||||||
quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)")
|
quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)")
|
||||||
|
@ -126,6 +130,9 @@ func handleFlags() revid.Config {
|
||||||
configFilePtr = flag.String("ConfigFile", "", "NetSender config file")
|
configFilePtr = flag.String("ConfigFile", "", "NetSender config file")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var outputs flagStrings
|
||||||
|
flag.Var(&outputs, "Output", "output type: Http, Rtmp, File, Udp, Rtp (may be used more than once)")
|
||||||
|
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
log = logger.New(defaultLogVerbosity, &smartlogger.New(*logPathPtr).LogRoller)
|
log = logger.New(defaultLogVerbosity, &smartlogger.New(*logPathPtr).LogRoller)
|
||||||
|
@ -167,40 +174,24 @@ func handleFlags() revid.Config {
|
||||||
log.Log(logger.Error, pkg+"bad input codec argument")
|
log.Log(logger.Error, pkg+"bad input codec argument")
|
||||||
}
|
}
|
||||||
|
|
||||||
switch *output1Ptr {
|
for _, o := range outputs {
|
||||||
case "File":
|
switch o {
|
||||||
cfg.Output1 = revid.File
|
case "File":
|
||||||
case "Http":
|
cfg.Outputs = append(cfg.Outputs, revid.File)
|
||||||
cfg.Output1 = revid.Http
|
case "Http":
|
||||||
case "Rtmp":
|
cfg.Outputs = append(cfg.Outputs, revid.Http)
|
||||||
cfg.Output1 = revid.Rtmp
|
case "Rtmp":
|
||||||
case "FfmpegRtmp":
|
cfg.Outputs = append(cfg.Outputs, revid.Rtmp)
|
||||||
cfg.Output1 = revid.FfmpegRtmp
|
case "FfmpegRtmp":
|
||||||
case "Udp":
|
cfg.Outputs = append(cfg.Outputs, revid.FfmpegRtmp)
|
||||||
cfg.Output1 = revid.Udp
|
case "Udp":
|
||||||
case "Rtp":
|
cfg.Outputs = append(cfg.Outputs, revid.Udp)
|
||||||
cfg.Output1 = revid.Rtp
|
case "Rtp":
|
||||||
case "":
|
cfg.Outputs = append(cfg.Outputs, revid.Rtp)
|
||||||
default:
|
case "":
|
||||||
log.Log(logger.Error, pkg+"bad output 1 argument")
|
default:
|
||||||
}
|
log.Log(logger.Error, pkg+"bad output argument", "arg", o)
|
||||||
|
}
|
||||||
switch *output2Ptr {
|
|
||||||
case "File":
|
|
||||||
cfg.Output2 = revid.File
|
|
||||||
case "Http":
|
|
||||||
cfg.Output2 = revid.Http
|
|
||||||
case "Rtmp":
|
|
||||||
cfg.Output2 = revid.Rtmp
|
|
||||||
case "FfmpegRtmp":
|
|
||||||
cfg.Output2 = revid.FfmpegRtmp
|
|
||||||
case "Udp":
|
|
||||||
cfg.Output2 = revid.Udp
|
|
||||||
case "Rtp":
|
|
||||||
cfg.Output2 = revid.Rtp
|
|
||||||
case "":
|
|
||||||
default:
|
|
||||||
log.Log(logger.Error, pkg+"bad output 2 argument")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
switch *rtmpMethodPtr {
|
switch *rtmpMethodPtr {
|
||||||
|
@ -259,66 +250,97 @@ func handleFlags() revid.Config {
|
||||||
}
|
}
|
||||||
|
|
||||||
// initialize then run the main NetSender client
|
// initialize then run the main NetSender client
|
||||||
func run(rv *revid.Revid, cfg revid.Config) error {
|
func run(cfg revid.Config) error {
|
||||||
// initialize NetSender and use NetSender's logger
|
|
||||||
//config.Logger = netsender.Logger()
|
|
||||||
log.Log(logger.Info, pkg+"running in NetSender mode")
|
log.Log(logger.Info, pkg+"running in NetSender mode")
|
||||||
|
|
||||||
|
var vars map[string]string
|
||||||
|
|
||||||
|
// initialize NetSender and use NetSender's logger
|
||||||
var ns netsender.Sender
|
var ns netsender.Sender
|
||||||
err := ns.Init(log, nil, nil, nil)
|
err := ns.Init(log, nil, nil, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
vars, _ := ns.Vars()
|
|
||||||
|
vars, _ = ns.Vars()
|
||||||
vs := ns.VarSum()
|
vs := ns.VarSum()
|
||||||
paused := false
|
|
||||||
if vars["mode"] == "Paused" {
|
rv, err := revid.New(cfg, &ns)
|
||||||
paused = true
|
if err != nil {
|
||||||
|
log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error())
|
||||||
}
|
}
|
||||||
if !paused {
|
|
||||||
rv, cfg, err = updateRevid(&ns, rv, cfg, vars, false)
|
// Update revid to get latest config settings from netreceiver.
|
||||||
|
err = rv.Update(vars)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// If mode on netreceiver isn't paused then we can start revid.
|
||||||
|
if ns.Mode() != paused && ns.Mode() != burst {
|
||||||
|
err = rv.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ns.Mode() == burst {
|
||||||
|
ns.SetMode(paused, &vs)
|
||||||
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if err := send(&ns, rv); err != nil {
|
// TODO(saxon): replace this call with call to ns.Run().
|
||||||
log.Log(logger.Error, pkg+"polling failed", "error", err.Error())
|
err = send(&ns, rv)
|
||||||
|
if err != nil {
|
||||||
|
log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error())
|
||||||
time.Sleep(netSendRetryTime)
|
time.Sleep(netSendRetryTime)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if vs != ns.VarSum() {
|
// If var sum hasn't change we continue
|
||||||
// vars changed
|
if vs == ns.VarSum() {
|
||||||
vars, err := ns.Vars()
|
goto sleep
|
||||||
if err != nil {
|
}
|
||||||
log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error())
|
|
||||||
time.Sleep(netSendRetryTime)
|
vars, err = ns.Vars()
|
||||||
continue
|
if err != nil {
|
||||||
}
|
log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error())
|
||||||
vs = ns.VarSum()
|
time.Sleep(netSendRetryTime)
|
||||||
if vars["mode"] == "Paused" {
|
continue
|
||||||
if !paused {
|
}
|
||||||
log.Log(logger.Info, pkg+"pausing revid")
|
vs = ns.VarSum()
|
||||||
err = stopRevid(rv)
|
|
||||||
if err != nil {
|
err = rv.Update(vars)
|
||||||
log.Log(logger.Error, pkg+"failed to stop revide", "error", err.Error())
|
if err != nil {
|
||||||
continue
|
return err
|
||||||
}
|
}
|
||||||
paused = true
|
|
||||||
}
|
switch ns.Mode() {
|
||||||
} else {
|
case paused:
|
||||||
rv, cfg, err = updateRevid(&ns, rv, cfg, vars, !paused)
|
case normal:
|
||||||
if err != nil {
|
err = rv.Start()
|
||||||
return err
|
if err != nil {
|
||||||
}
|
return err
|
||||||
if paused {
|
}
|
||||||
paused = false
|
case burst:
|
||||||
}
|
log.Log(logger.Info, pkg+"Starting burst...")
|
||||||
}
|
err = rv.Start()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
time.Sleep(time.Duration(rv.Config().BurstPeriod) * time.Second)
|
||||||
|
log.Log(logger.Info, pkg+"Stopping burst...")
|
||||||
|
err = rv.Stop()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
ns.SetMode(paused, &vs)
|
||||||
|
}
|
||||||
|
sleep:
|
||||||
|
sleepTime, err := strconv.Atoi(ns.Param("mp"))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
sleepTime, _ := strconv.Atoi(ns.Param("mp"))
|
|
||||||
time.Sleep(time.Duration(sleepTime) * time.Second)
|
time.Sleep(time.Duration(sleepTime) * time.Second)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -346,131 +368,27 @@ func send(ns *netsender.Sender, rv *revid.Revid) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// wrappers for stopping and starting revid
|
// flagStrings implements an appending string set flag.
|
||||||
func startRevid(ns *netsender.Sender, cfg revid.Config) (*revid.Revid, revid.Config, error) {
|
type flagStrings []string
|
||||||
rv, err := revid.New(cfg, ns)
|
|
||||||
if err != nil {
|
func (v *flagStrings) String() string {
|
||||||
return nil, cfg, err
|
if *v != nil {
|
||||||
|
return strings.Join(*v, ",")
|
||||||
}
|
}
|
||||||
err = rv.Start()
|
return ""
|
||||||
return rv, cfg, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func stopRevid(rv *revid.Revid) error {
|
func (v *flagStrings) Set(s string) error {
|
||||||
err := rv.Stop()
|
if s == "" {
|
||||||
if err != nil {
|
return nil
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
for _, e := range *v {
|
||||||
// FIXME(kortschak): Is this waiting on completion of work?
|
if e == s {
|
||||||
// Use a wait group and Wait method if it is.
|
return nil
|
||||||
time.Sleep(revidStopTime)
|
}
|
||||||
|
}
|
||||||
|
*v = append(*v, s)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars map[string]string, stop bool) (*revid.Revid, revid.Config, error) {
|
func (v *flagStrings) Get() interface{} { return *v }
|
||||||
if stop {
|
|
||||||
err := stopRevid(rv)
|
|
||||||
if err != nil {
|
|
||||||
return nil, cfg, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//look through the vars and update revid where needed
|
|
||||||
for key, value := range vars {
|
|
||||||
switch key {
|
|
||||||
case "Output":
|
|
||||||
switch value {
|
|
||||||
case "File":
|
|
||||||
cfg.Output1 = revid.File
|
|
||||||
case "Http":
|
|
||||||
cfg.Output1 = revid.Http
|
|
||||||
case "Rtmp":
|
|
||||||
cfg.Output1 = revid.Rtmp
|
|
||||||
case "FfmpegRtmp":
|
|
||||||
cfg.Output1 = revid.FfmpegRtmp
|
|
||||||
default:
|
|
||||||
log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
case "FramesPerClip":
|
|
||||||
f, err := strconv.ParseUint(value, 10, 0)
|
|
||||||
if err != nil {
|
|
||||||
log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
cfg.FramesPerClip = uint(f)
|
|
||||||
case "RtmpUrl":
|
|
||||||
cfg.RtmpUrl = value
|
|
||||||
case "Bitrate":
|
|
||||||
r, err := strconv.ParseUint(value, 10, 0)
|
|
||||||
if err != nil {
|
|
||||||
log.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
cfg.Bitrate = uint(r)
|
|
||||||
case "OutputFileName":
|
|
||||||
cfg.OutputFileName = value
|
|
||||||
case "InputFileName":
|
|
||||||
cfg.InputFileName = value
|
|
||||||
case "Height":
|
|
||||||
h, err := strconv.ParseUint(value, 10, 0)
|
|
||||||
if err != nil {
|
|
||||||
log.Log(logger.Warning, pkg+"invalid height param", "value", value)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
cfg.Height = uint(h)
|
|
||||||
case "Width":
|
|
||||||
w, err := strconv.ParseUint(value, 10, 0)
|
|
||||||
if err != nil {
|
|
||||||
log.Log(logger.Warning, pkg+"invalid width param", "value", value)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
cfg.Width = uint(w)
|
|
||||||
case "FrameRate":
|
|
||||||
r, err := strconv.ParseUint(value, 10, 0)
|
|
||||||
if err != nil {
|
|
||||||
log.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
cfg.FrameRate = uint(r)
|
|
||||||
case "HttpAddress":
|
|
||||||
cfg.HttpAddress = value
|
|
||||||
case "Quantization":
|
|
||||||
q, err := strconv.ParseUint(value, 10, 0)
|
|
||||||
if err != nil {
|
|
||||||
log.Log(logger.Warning, pkg+"invalid quantization param", "value", value)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
cfg.Quantization = uint(q)
|
|
||||||
case "IntraRefreshPeriod":
|
|
||||||
p, err := strconv.ParseUint(value, 10, 0)
|
|
||||||
if err != nil {
|
|
||||||
log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
cfg.IntraRefreshPeriod = uint(p)
|
|
||||||
case "HorizontalFlip":
|
|
||||||
switch strings.ToLower(value) {
|
|
||||||
case "true":
|
|
||||||
cfg.FlipHorizontal = true
|
|
||||||
case "false":
|
|
||||||
cfg.FlipHorizontal = false
|
|
||||||
default:
|
|
||||||
log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value)
|
|
||||||
}
|
|
||||||
case "VerticalFlip":
|
|
||||||
switch strings.ToLower(value) {
|
|
||||||
case "true":
|
|
||||||
cfg.FlipVertical = true
|
|
||||||
case "false":
|
|
||||||
cfg.FlipVertical = false
|
|
||||||
default:
|
|
||||||
log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value)
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return startRevid(ns, cfg)
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,144 @@
|
||||||
|
/*
|
||||||
|
NAME
|
||||||
|
decode.go
|
||||||
|
|
||||||
|
DESCRIPTION
|
||||||
|
decode.go provides functionality for the decoding of FLAC compressed audio
|
||||||
|
|
||||||
|
AUTHOR
|
||||||
|
Saxon Nelson-Milton <saxon@ausocean.org>
|
||||||
|
|
||||||
|
LICENSE
|
||||||
|
decode.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
|
||||||
|
|
||||||
|
It is free software: you can redistribute it and/or modify them
|
||||||
|
under the terms of the GNU General Public License as published by the
|
||||||
|
Free Software Foundation, either version 3 of the License, or (at your
|
||||||
|
option) any later version.
|
||||||
|
|
||||||
|
It is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||||
|
for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License
|
||||||
|
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
|
||||||
|
*/
|
||||||
|
package flac
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/go-audio/audio"
|
||||||
|
"github.com/go-audio/wav"
|
||||||
|
"github.com/mewkiz/flac"
|
||||||
|
)
|
||||||
|
|
||||||
|
const wavFormat = 1
|
||||||
|
|
||||||
|
// writeSeeker implements a memory based io.WriteSeeker.
|
||||||
|
type writeSeeker struct {
|
||||||
|
buf []byte
|
||||||
|
pos int
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bytes returns the bytes contained in the writeSeekers buffer.
|
||||||
|
func (ws *writeSeeker) Bytes() []byte {
|
||||||
|
return ws.buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write writes len(p) bytes from p to the writeSeeker's buf and returns the number
|
||||||
|
// of bytes written. If less than len(p) bytes are written, an error is returned.
|
||||||
|
func (ws *writeSeeker) Write(p []byte) (n int, err error) {
|
||||||
|
minCap := ws.pos + len(p)
|
||||||
|
if minCap > cap(ws.buf) { // Make sure buf has enough capacity:
|
||||||
|
buf2 := make([]byte, len(ws.buf), minCap+len(p)) // add some extra
|
||||||
|
copy(buf2, ws.buf)
|
||||||
|
ws.buf = buf2
|
||||||
|
}
|
||||||
|
if minCap > len(ws.buf) {
|
||||||
|
ws.buf = ws.buf[:minCap]
|
||||||
|
}
|
||||||
|
copy(ws.buf[ws.pos:], p)
|
||||||
|
ws.pos += len(p)
|
||||||
|
return len(p), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Seek sets the offset for the next Read or Write to offset, interpreted according
|
||||||
|
// to whence: SeekStart means relative to the start of the file, SeekCurrent means
|
||||||
|
// relative to the current offset, and SeekEnd means relative to the end. Seek returns
|
||||||
|
// the new offset relative to the start of the file and an error, if any.
|
||||||
|
func (ws *writeSeeker) Seek(offset int64, whence int) (int64, error) {
|
||||||
|
newPos, offs := 0, int(offset)
|
||||||
|
switch whence {
|
||||||
|
case io.SeekStart:
|
||||||
|
newPos = offs
|
||||||
|
case io.SeekCurrent:
|
||||||
|
newPos = ws.pos + offs
|
||||||
|
case io.SeekEnd:
|
||||||
|
newPos = len(ws.buf) + offs
|
||||||
|
}
|
||||||
|
if newPos < 0 {
|
||||||
|
return 0, errors.New("negative result pos")
|
||||||
|
}
|
||||||
|
ws.pos = newPos
|
||||||
|
return int64(newPos), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decode takes buf, a slice of FLAC, and decodes to WAV. If complete decoding
|
||||||
|
// fails, an error is returned.
|
||||||
|
func Decode(buf []byte) ([]byte, error) {
|
||||||
|
|
||||||
|
// Lex the FLAC into a stream to hold audio and it's properties.
|
||||||
|
r := bytes.NewReader(buf)
|
||||||
|
stream, err := flac.Parse(r)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.New("Could not parse FLAC")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create WAV encoder and pass writeSeeker that will store output WAV.
|
||||||
|
ws := &writeSeeker{}
|
||||||
|
sr := int(stream.Info.SampleRate)
|
||||||
|
bps := int(stream.Info.BitsPerSample)
|
||||||
|
nc := int(stream.Info.NChannels)
|
||||||
|
enc := wav.NewEncoder(ws, sr, bps, nc, wavFormat)
|
||||||
|
defer enc.Close()
|
||||||
|
|
||||||
|
// Decode FLAC into frames of samples
|
||||||
|
intBuf := &audio.IntBuffer{
|
||||||
|
Format: &audio.Format{NumChannels: nc, SampleRate: sr},
|
||||||
|
SourceBitDepth: bps,
|
||||||
|
}
|
||||||
|
return decodeFrames(stream, intBuf, enc, ws)
|
||||||
|
}
|
||||||
|
|
||||||
|
// decodeFrames parses frames from the stream and encodes them into WAV until
|
||||||
|
// the end of the stream is reached. The bytes from writeSeeker buffer are then
|
||||||
|
// returned. If any errors occur during encodeing, nil bytes and the error is returned.
|
||||||
|
func decodeFrames(s *flac.Stream, intBuf *audio.IntBuffer, e *wav.Encoder, ws *writeSeeker) ([]byte, error) {
|
||||||
|
var data []int
|
||||||
|
for {
|
||||||
|
frame, err := s.ParseNext()
|
||||||
|
|
||||||
|
// If we've reached the end of the stream then we can output the writeSeeker's buffer.
|
||||||
|
if err == io.EOF {
|
||||||
|
return ws.Bytes(), nil
|
||||||
|
} else if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Encode WAV audio samples.
|
||||||
|
data = data[:0]
|
||||||
|
for i := 0; i < frame.Subframes[0].NSamples; i++ {
|
||||||
|
for _, subframe := range frame.Subframes {
|
||||||
|
data = append(data, int(subframe.Samples[i]))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
intBuf.Data = data
|
||||||
|
if err := e.Write(intBuf); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,121 @@
|
||||||
|
/*
|
||||||
|
NAME
|
||||||
|
flac_test.go
|
||||||
|
|
||||||
|
DESCRIPTION
|
||||||
|
flac_test.go provides utilities to test FLAC audio decoding
|
||||||
|
|
||||||
|
AUTHOR
|
||||||
|
Saxon Nelson-Milton <saxon@ausocean.org>
|
||||||
|
|
||||||
|
LICENSE
|
||||||
|
flac_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
|
||||||
|
|
||||||
|
It is free software: you can redistribute it and/or modify them
|
||||||
|
under the terms of the GNU General Public License as published by the
|
||||||
|
Free Software Foundation, either version 3 of the License, or (at your
|
||||||
|
option) any later version.
|
||||||
|
|
||||||
|
It is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||||
|
for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License
|
||||||
|
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
|
||||||
|
*/
|
||||||
|
package flac
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
testFile = "../../../test/test-data/av/input/robot.flac"
|
||||||
|
outFile = "testOut.wav"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestWriteSeekerWrite checks that basic writing to the ws works as expected.
|
||||||
|
func TestWriteSeekerWrite(t *testing.T) {
|
||||||
|
ws := &writeSeeker{}
|
||||||
|
|
||||||
|
const tstStr1 = "hello"
|
||||||
|
ws.Write([]byte(tstStr1))
|
||||||
|
got := string(ws.buf)
|
||||||
|
if got != tstStr1 {
|
||||||
|
t.Errorf("Write failed, got: %v, want: %v", got, tstStr1)
|
||||||
|
}
|
||||||
|
|
||||||
|
const tstStr2 = " world"
|
||||||
|
const want = "hello world"
|
||||||
|
ws.Write([]byte(tstStr2))
|
||||||
|
got = string(ws.buf)
|
||||||
|
if got != want {
|
||||||
|
t.Errorf("Second write failed, got: %v, want: %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestWriteSeekerSeek checks that writing and seeking works as expected, i.e. we
|
||||||
|
// can write, then seek to a knew place in the buf, and write again, either replacing
|
||||||
|
// bytes, or appending bytes.
|
||||||
|
func TestWriteSeekerSeek(t *testing.T) {
|
||||||
|
ws := &writeSeeker{}
|
||||||
|
|
||||||
|
const tstStr1 = "hello"
|
||||||
|
want1 := tstStr1
|
||||||
|
ws.Write([]byte(tstStr1))
|
||||||
|
got := string(ws.buf)
|
||||||
|
if got != tstStr1 {
|
||||||
|
t.Errorf("Unexpected output, got: %v, want: %v", got, want1)
|
||||||
|
}
|
||||||
|
|
||||||
|
const tstStr2 = " world"
|
||||||
|
const want2 = tstStr1 + tstStr2
|
||||||
|
ws.Write([]byte(tstStr2))
|
||||||
|
got = string(ws.buf)
|
||||||
|
if got != want2 {
|
||||||
|
t.Errorf("Unexpected output, got: %v, want: %v", got, want2)
|
||||||
|
}
|
||||||
|
|
||||||
|
const tstStr3 = "k!"
|
||||||
|
const want3 = "hello work!"
|
||||||
|
ws.Seek(-2, io.SeekEnd)
|
||||||
|
ws.Write([]byte(tstStr3))
|
||||||
|
got = string(ws.buf)
|
||||||
|
if got != want3 {
|
||||||
|
t.Errorf("Unexpected output, got: %v, want: %v", got, want3)
|
||||||
|
}
|
||||||
|
|
||||||
|
const tstStr4 = "gopher"
|
||||||
|
const want4 = "hello gopher"
|
||||||
|
ws.Seek(6, io.SeekStart)
|
||||||
|
ws.Write([]byte(tstStr4))
|
||||||
|
got = string(ws.buf)
|
||||||
|
if got != want4 {
|
||||||
|
t.Errorf("Unexpected output, got: %v, want: %v", got, want4)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestDecodeFlac checks that we can load a flac file and decode to wav, writing
|
||||||
|
// to a wav file.
|
||||||
|
func TestDecodeFlac(t *testing.T) {
|
||||||
|
b, err := ioutil.ReadFile(testFile)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Could not read test file, failed with err: %v", err.Error())
|
||||||
|
}
|
||||||
|
out, err := Decode(b)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Could not decode, failed with err: %v", err.Error())
|
||||||
|
}
|
||||||
|
f, err := os.Create(outFile)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Could not create output file, failed with err: %v", err.Error())
|
||||||
|
}
|
||||||
|
_, err = f.Write(out)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Could not write to output file, failed with err: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
|
@ -40,8 +40,7 @@ type Config struct {
|
||||||
|
|
||||||
Input uint8
|
Input uint8
|
||||||
InputCodec uint8
|
InputCodec uint8
|
||||||
Output1 uint8
|
Outputs []uint8
|
||||||
Output2 uint8
|
|
||||||
RtmpMethod uint8
|
RtmpMethod uint8
|
||||||
Packetization uint8
|
Packetization uint8
|
||||||
|
|
||||||
|
@ -69,6 +68,7 @@ type Config struct {
|
||||||
RtpAddress string
|
RtpAddress string
|
||||||
Logger Logger
|
Logger Logger
|
||||||
SendRetry bool
|
SendRetry bool
|
||||||
|
BurstPeriod uint
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enums for config struct
|
// Enums for config struct
|
||||||
|
@ -115,6 +115,7 @@ const (
|
||||||
defaultInputCodec = H264
|
defaultInputCodec = H264
|
||||||
defaultVerbosity = No // FIXME(kortschak): This makes no sense whatsoever. No is currently 15.
|
defaultVerbosity = No // FIXME(kortschak): This makes no sense whatsoever. No is currently 15.
|
||||||
defaultRtpAddr = "localhost:6970"
|
defaultRtpAddr = "localhost:6970"
|
||||||
|
defaultBurstPeriod = 10 // Seconds
|
||||||
)
|
)
|
||||||
|
|
||||||
// Validate checks for any errors in the config fields and defaults settings
|
// Validate checks for any errors in the config fields and defaults settings
|
||||||
|
@ -172,45 +173,38 @@ func (c *Config) Validate(r *Revid) error {
|
||||||
return errors.New("bad input codec defined in config")
|
return errors.New("bad input codec defined in config")
|
||||||
}
|
}
|
||||||
|
|
||||||
switch c.Output1 {
|
for i, o := range c.Outputs {
|
||||||
case File:
|
switch o {
|
||||||
case Udp:
|
case File:
|
||||||
case Rtmp, FfmpegRtmp:
|
case Udp:
|
||||||
if c.RtmpUrl == "" {
|
case Rtmp, FfmpegRtmp:
|
||||||
c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP")
|
if c.RtmpUrl == "" {
|
||||||
c.Output1 = Http
|
c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP")
|
||||||
break
|
c.Outputs[i] = Http
|
||||||
|
// FIXME(kortschak): Does this want the same line as below?
|
||||||
|
// c.FramesPerClip = httpFramesPerClip
|
||||||
|
break
|
||||||
|
}
|
||||||
|
c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out",
|
||||||
|
"framesPerClip", defaultFramesPerClip)
|
||||||
|
c.FramesPerClip = defaultFramesPerClip
|
||||||
|
case NothingDefined:
|
||||||
|
c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output",
|
||||||
|
defaultOutput)
|
||||||
|
c.Outputs[i] = defaultOutput
|
||||||
|
fallthrough
|
||||||
|
case Http, Rtp:
|
||||||
|
c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out",
|
||||||
|
"framesPerClip", httpFramesPerClip)
|
||||||
|
c.FramesPerClip = httpFramesPerClip
|
||||||
|
default:
|
||||||
|
return errors.New("bad output type defined in config")
|
||||||
}
|
}
|
||||||
c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out",
|
|
||||||
"framesPerClip", defaultFramesPerClip)
|
|
||||||
c.FramesPerClip = defaultFramesPerClip
|
|
||||||
case NothingDefined:
|
|
||||||
c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output",
|
|
||||||
defaultOutput)
|
|
||||||
c.Output1 = defaultOutput
|
|
||||||
fallthrough
|
|
||||||
case Http, Rtp:
|
|
||||||
c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out",
|
|
||||||
"framesPerClip", httpFramesPerClip)
|
|
||||||
c.FramesPerClip = httpFramesPerClip
|
|
||||||
default:
|
|
||||||
return errors.New("bad output type defined in config")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
switch c.Output2 {
|
if c.BurstPeriod == 0 {
|
||||||
case File:
|
c.Logger.Log(logger.Warning, pkg+"no burst period defined, defaulting", "burstPeriod", defaultBurstPeriod)
|
||||||
case Rtp:
|
c.BurstPeriod = defaultBurstPeriod
|
||||||
case Udp:
|
|
||||||
case Rtmp, FfmpegRtmp:
|
|
||||||
if c.RtmpUrl == "" {
|
|
||||||
c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP")
|
|
||||||
c.Output2 = Http
|
|
||||||
break
|
|
||||||
}
|
|
||||||
case NothingDefined:
|
|
||||||
case Http:
|
|
||||||
default:
|
|
||||||
return errors.New("bad output2 type defined in config")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.FramesPerClip < 1 {
|
if c.FramesPerClip < 1 {
|
||||||
|
|
238
revid/revid.go
238
revid/revid.go
|
@ -37,6 +37,7 @@ import (
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"bitbucket.org/ausocean/av/stream"
|
"bitbucket.org/ausocean/av/stream"
|
||||||
|
@ -118,8 +119,12 @@ type Revid struct {
|
||||||
// bitrate hold the last send bitrate calculation result.
|
// bitrate hold the last send bitrate calculation result.
|
||||||
bitrate int
|
bitrate int
|
||||||
|
|
||||||
// isRunning is a loaded and cocked foot-gun.
|
mu sync.Mutex
|
||||||
isRunning bool
|
isRunning bool
|
||||||
|
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
|
err chan error
|
||||||
}
|
}
|
||||||
|
|
||||||
// packer takes data segments and packs them into clips
|
// packer takes data segments and packs them into clips
|
||||||
|
@ -157,8 +162,15 @@ func (p *packer) Write(frame []byte) (int, error) {
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
p.packetCount++
|
p.packetCount++
|
||||||
|
var hasRtmp bool
|
||||||
|
for _, d := range p.owner.config.Outputs {
|
||||||
|
if d == Rtmp {
|
||||||
|
hasRtmp = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
if (p.owner.config.Output1 != Rtmp && now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) || p.owner.config.Output1 == Rtmp {
|
if hasRtmp || (now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) {
|
||||||
p.owner.buffer.Flush()
|
p.owner.buffer.Flush()
|
||||||
p.packetCount = 0
|
p.packetCount = 0
|
||||||
p.lastTime = now
|
p.lastTime = now
|
||||||
|
@ -169,16 +181,35 @@ func (p *packer) Write(frame []byte) (int, error) {
|
||||||
// New returns a pointer to a new Revid with the desired configuration, and/or
|
// New returns a pointer to a new Revid with the desired configuration, and/or
|
||||||
// an error if construction of the new instance was not successful.
|
// an error if construction of the new instance was not successful.
|
||||||
func New(c Config, ns *netsender.Sender) (*Revid, error) {
|
func New(c Config, ns *netsender.Sender) (*Revid, error) {
|
||||||
r := Revid{ns: ns}
|
r := Revid{ns: ns, err: make(chan error)}
|
||||||
r.buffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)
|
r.buffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)
|
||||||
r.packer.owner = &r
|
r.packer.owner = &r
|
||||||
err := r.reset(c)
|
err := r.reset(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
go r.handleErrors()
|
||||||
return &r, nil
|
return &r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(Saxon): put more thought into error severity.
|
||||||
|
func (r *Revid) handleErrors() {
|
||||||
|
for {
|
||||||
|
err := <-r.err
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error())
|
||||||
|
err = r.Stop()
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error())
|
||||||
|
}
|
||||||
|
err = r.Start()
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Bitrate returns the result of the most recent bitrate check.
|
// Bitrate returns the result of the most recent bitrate check.
|
||||||
func (r *Revid) Bitrate() int {
|
func (r *Revid) Bitrate() int {
|
||||||
return r.bitrate
|
return r.bitrate
|
||||||
|
@ -203,40 +234,35 @@ func (r *Revid) reset(config Config) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
n := 1
|
r.destination = r.destination[:0]
|
||||||
if r.config.Output2 != 0 && r.config.Output2 != Rtp {
|
for _, typ := range r.config.Outputs {
|
||||||
n = 2
|
switch typ {
|
||||||
}
|
|
||||||
r.destination = make([]loadSender, n)
|
|
||||||
|
|
||||||
for outNo, outType := range []uint8{r.config.Output1, r.config.Output2} {
|
|
||||||
switch outType {
|
|
||||||
case File:
|
case File:
|
||||||
s, err := newFileSender(config.OutputFileName)
|
s, err := newFileSender(config.OutputFileName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
r.destination[outNo] = s
|
r.destination = append(r.destination, s)
|
||||||
case FfmpegRtmp:
|
case FfmpegRtmp:
|
||||||
s, err := newFfmpegSender(config.RtmpUrl, fmt.Sprint(r.config.FrameRate))
|
s, err := newFfmpegSender(config.RtmpUrl, fmt.Sprint(r.config.FrameRate))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
r.destination[outNo] = s
|
r.destination = append(r.destination, s)
|
||||||
case Rtmp:
|
case Rtmp:
|
||||||
s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log)
|
s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
r.destination[outNo] = s
|
r.destination = append(r.destination, s)
|
||||||
case Http:
|
case Http:
|
||||||
r.destination[outNo] = newHttpSender(r.ns, r.config.Logger.Log)
|
r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log))
|
||||||
case Udp:
|
case Udp:
|
||||||
s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log)
|
s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
r.destination[outNo] = s
|
r.destination = append(r.destination, s)
|
||||||
case Rtp:
|
case Rtp:
|
||||||
r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
|
r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -293,21 +319,40 @@ func (r *Revid) reset(config Config) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsRunning returns whether the receiver is running.
|
// IsRunning returns true if revid is running.
|
||||||
func (r *Revid) IsRunning() bool {
|
func (r *Revid) IsRunning() bool {
|
||||||
return r.isRunning
|
r.mu.Lock()
|
||||||
|
ret := r.isRunning
|
||||||
|
r.mu.Unlock()
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Revid) Config() Config {
|
||||||
|
r.mu.Lock()
|
||||||
|
cfg := r.config
|
||||||
|
r.mu.Unlock()
|
||||||
|
return cfg
|
||||||
|
}
|
||||||
|
|
||||||
|
// setIsRunning sets r.isRunning using b.
|
||||||
|
func (r *Revid) setIsRunning(b bool) {
|
||||||
|
r.mu.Lock()
|
||||||
|
r.isRunning = b
|
||||||
|
r.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start invokes a Revid to start processing video from a defined input
|
// Start invokes a Revid to start processing video from a defined input
|
||||||
// and packetising (if theres packetization) to a defined output.
|
// and packetising (if theres packetization) to a defined output.
|
||||||
func (r *Revid) Start() error {
|
func (r *Revid) Start() error {
|
||||||
if r.isRunning {
|
if r.IsRunning() {
|
||||||
return errors.New(pkg + "start called but revid is already running")
|
return errors.New(pkg + "start called but revid is already running")
|
||||||
}
|
}
|
||||||
r.config.Logger.Log(logger.Info, pkg+"starting Revid")
|
r.config.Logger.Log(logger.Info, pkg+"starting Revid")
|
||||||
|
// TODO: this doesn't need to be here
|
||||||
r.config.Logger.Log(logger.Debug, pkg+"setting up output")
|
r.config.Logger.Log(logger.Debug, pkg+"setting up output")
|
||||||
r.isRunning = true
|
r.setIsRunning(true)
|
||||||
r.config.Logger.Log(logger.Info, pkg+"starting output routine")
|
r.config.Logger.Log(logger.Info, pkg+"starting output routine")
|
||||||
|
r.wg.Add(1)
|
||||||
go r.outputClips()
|
go r.outputClips()
|
||||||
r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content")
|
r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content")
|
||||||
err := r.setupInput()
|
err := r.setupInput()
|
||||||
|
@ -316,28 +361,145 @@ func (r *Revid) Start() error {
|
||||||
|
|
||||||
// Stop halts any processing of video data from a camera or file
|
// Stop halts any processing of video data from a camera or file
|
||||||
func (r *Revid) Stop() error {
|
func (r *Revid) Stop() error {
|
||||||
if !r.isRunning {
|
if !r.IsRunning() {
|
||||||
return errors.New(pkg + "stop called but revid is already stopped")
|
return errors.New(pkg + "stop called but revid is already stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
r.config.Logger.Log(logger.Info, pkg+"stopping revid")
|
r.config.Logger.Log(logger.Info, pkg+"stopping revid")
|
||||||
r.isRunning = false
|
r.setIsRunning(false)
|
||||||
|
|
||||||
r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
|
r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
|
||||||
// If a cmd process is running, we kill!
|
// If a cmd process is running, we kill!
|
||||||
if r.cmd != nil && r.cmd.Process != nil {
|
if r.cmd != nil && r.cmd.Process != nil {
|
||||||
r.cmd.Process.Kill()
|
r.cmd.Process.Kill()
|
||||||
}
|
}
|
||||||
|
r.wg.Wait()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Revid) Update(vars map[string]string) error {
|
||||||
|
if r.IsRunning() {
|
||||||
|
if err := r.Stop(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//look through the vars and update revid where needed
|
||||||
|
for key, value := range vars {
|
||||||
|
switch key {
|
||||||
|
case "Output":
|
||||||
|
// FIXME(kortschak): There can be only one!
|
||||||
|
// How do we specify outputs after the first?
|
||||||
|
//
|
||||||
|
// Maybe we shouldn't be doing this!
|
||||||
|
switch value {
|
||||||
|
case "File":
|
||||||
|
r.config.Outputs[0] = File
|
||||||
|
case "Http":
|
||||||
|
r.config.Outputs[0] = Http
|
||||||
|
case "Rtmp":
|
||||||
|
r.config.Outputs[0] = Rtmp
|
||||||
|
case "FfmpegRtmp":
|
||||||
|
r.config.Outputs[0] = FfmpegRtmp
|
||||||
|
default:
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid Output1 param", "value", value)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
case "FramesPerClip":
|
||||||
|
f, err := strconv.ParseUint(value, 10, 0)
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
r.config.FramesPerClip = uint(f)
|
||||||
|
case "RtmpUrl":
|
||||||
|
r.config.RtmpUrl = value
|
||||||
|
case "Bitrate":
|
||||||
|
v, err := strconv.ParseUint(value, 10, 0)
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
r.config.Bitrate = uint(v)
|
||||||
|
case "OutputFileName":
|
||||||
|
r.config.OutputFileName = value
|
||||||
|
case "InputFileName":
|
||||||
|
r.config.InputFileName = value
|
||||||
|
case "Height":
|
||||||
|
h, err := strconv.ParseUint(value, 10, 0)
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid height param", "value", value)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
r.config.Height = uint(h)
|
||||||
|
case "Width":
|
||||||
|
w, err := strconv.ParseUint(value, 10, 0)
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid width param", "value", value)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
r.config.Width = uint(w)
|
||||||
|
case "FrameRate":
|
||||||
|
v, err := strconv.ParseUint(value, 10, 0)
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
r.config.FrameRate = uint(v)
|
||||||
|
case "HttpAddress":
|
||||||
|
r.config.HttpAddress = value
|
||||||
|
case "Quantization":
|
||||||
|
q, err := strconv.ParseUint(value, 10, 0)
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid quantization param", "value", value)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
r.config.Quantization = uint(q)
|
||||||
|
case "IntraRefreshPeriod":
|
||||||
|
p, err := strconv.ParseUint(value, 10, 0)
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
r.config.IntraRefreshPeriod = uint(p)
|
||||||
|
case "HorizontalFlip":
|
||||||
|
switch strings.ToLower(value) {
|
||||||
|
case "true":
|
||||||
|
r.config.FlipHorizontal = true
|
||||||
|
case "false":
|
||||||
|
r.config.FlipHorizontal = false
|
||||||
|
default:
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value)
|
||||||
|
}
|
||||||
|
case "VerticalFlip":
|
||||||
|
switch strings.ToLower(value) {
|
||||||
|
case "true":
|
||||||
|
r.config.FlipVertical = true
|
||||||
|
case "false":
|
||||||
|
r.config.FlipVertical = false
|
||||||
|
default:
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value)
|
||||||
|
}
|
||||||
|
case "BurstPeriod":
|
||||||
|
v, err := strconv.ParseUint(value, 10, 0)
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Warning, pkg+"invalid BurstPeriod param", "value", value)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
r.config.BurstPeriod = uint(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// outputClips takes the clips produced in the packClips method and outputs them
|
// outputClips takes the clips produced in the packClips method and outputs them
|
||||||
// to the desired output defined in the revid config
|
// to the desired output defined in the revid config
|
||||||
func (r *Revid) outputClips() {
|
func (r *Revid) outputClips() {
|
||||||
|
defer r.wg.Done()
|
||||||
lastTime := time.Now()
|
lastTime := time.Now()
|
||||||
var count int
|
var count int
|
||||||
loop:
|
loop:
|
||||||
for r.isRunning {
|
for r.IsRunning() {
|
||||||
// If the ring buffer has something we can read and send off
|
// If the ring buffer has something we can read and send off
|
||||||
chunk, err := r.buffer.Next(readTimeout)
|
chunk, err := r.buffer.Next(readTimeout)
|
||||||
switch err {
|
switch err {
|
||||||
|
@ -381,7 +543,7 @@ loop:
|
||||||
err = rs.restart()
|
err = rs.restart()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error())
|
r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error())
|
||||||
r.isRunning = false
|
r.setIsRunning(false)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -472,11 +634,9 @@ func (r *Revid) startRaspivid() error {
|
||||||
r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error())
|
r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
r.config.Logger.Log(logger.Info, pkg+"reading camera data")
|
r.wg.Add(1)
|
||||||
delay := time.Second / time.Duration(r.config.FrameRate)
|
go r.processFrom(stdout, 0)
|
||||||
err = r.lexTo(r.encoder, stdout, delay)
|
return nil
|
||||||
r.config.Logger.Log(logger.Info, pkg+"finished reading camera data")
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Revid) startV4L() error {
|
func (r *Revid) startV4L() error {
|
||||||
|
@ -512,7 +672,6 @@ func (r *Revid) startV4L() error {
|
||||||
r.config.Logger.Log(logger.Info, pkg+"ffmpeg args", "args", strings.Join(args, " "))
|
r.config.Logger.Log(logger.Info, pkg+"ffmpeg args", "args", strings.Join(args, " "))
|
||||||
r.cmd = exec.Command("ffmpeg", args...)
|
r.cmd = exec.Command("ffmpeg", args...)
|
||||||
|
|
||||||
delay := time.Second / time.Duration(r.config.FrameRate)
|
|
||||||
stdout, err := r.cmd.StdoutPipe()
|
stdout, err := r.cmd.StdoutPipe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -524,15 +683,13 @@ func (r *Revid) startV4L() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
r.config.Logger.Log(logger.Info, pkg+"reading camera data")
|
r.wg.Add(1)
|
||||||
err = r.lexTo(r.encoder, stdout, delay)
|
go r.processFrom(stdout, time.Duration(0))
|
||||||
r.config.Logger.Log(logger.Info, pkg+"finished reading camera data")
|
return nil
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// setupInputForFile sets things up for getting input from a file
|
// setupInputForFile sets things up for getting input from a file
|
||||||
func (r *Revid) setupInputForFile() error {
|
func (r *Revid) setupInputForFile() error {
|
||||||
delay := time.Second / time.Duration(r.config.FrameRate)
|
|
||||||
f, err := os.Open(r.config.InputFileName)
|
f, err := os.Open(r.config.InputFileName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.config.Logger.Log(logger.Error, err.Error())
|
r.config.Logger.Log(logger.Error, err.Error())
|
||||||
|
@ -542,5 +699,14 @@ func (r *Revid) setupInputForFile() error {
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
|
||||||
// TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop.
|
// TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop.
|
||||||
return r.lexTo(r.encoder, f, delay)
|
r.wg.Add(1)
|
||||||
|
go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Revid) processFrom(read io.Reader, delay time.Duration) {
|
||||||
|
r.config.Logger.Log(logger.Info, pkg+"reading input data")
|
||||||
|
r.err <- r.lexTo(r.encoder, read, delay)
|
||||||
|
r.config.Logger.Log(logger.Info, pkg+"finished reading input data")
|
||||||
|
r.wg.Done()
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,7 +51,7 @@ import (
|
||||||
const (
|
const (
|
||||||
typeNumber = 0x00
|
typeNumber = 0x00
|
||||||
typeBoolean = 0x01
|
typeBoolean = 0x01
|
||||||
typeString = 0x02
|
TypeString = 0x02
|
||||||
TypeObject = 0x03
|
TypeObject = 0x03
|
||||||
typeMovieClip = 0x04
|
typeMovieClip = 0x04
|
||||||
TypeNull = 0x05
|
TypeNull = 0x05
|
||||||
|
@ -93,7 +93,7 @@ type Property struct {
|
||||||
var (
|
var (
|
||||||
ErrShortBuffer = errors.New("amf: short buffer") // The supplied buffer was too short.
|
ErrShortBuffer = errors.New("amf: short buffer") // The supplied buffer was too short.
|
||||||
ErrInvalidType = errors.New("amf: invalid type") // An invalid type was supplied to the encoder.
|
ErrInvalidType = errors.New("amf: invalid type") // An invalid type was supplied to the encoder.
|
||||||
ErrUnexpectedType = errors.New("amf: unexpected end") // An unexpected type was encountered while decoding.
|
ErrUnexpectedType = errors.New("amf: unexpected type") // An unexpected type was encountered while decoding.
|
||||||
ErrPropertyNotFound = errors.New("amf: property not found") // The requested property was not found.
|
ErrPropertyNotFound = errors.New("amf: property not found") // The requested property was not found.
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -160,6 +160,7 @@ func EncodeInt32(buf []byte, val uint32) ([]byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// EncodeString encodes a string.
|
// EncodeString encodes a string.
|
||||||
|
// Strings less than 65536 in length are encoded as TypeString, while longer strings are ecodeded as typeLongString.
|
||||||
func EncodeString(buf []byte, val string) ([]byte, error) {
|
func EncodeString(buf []byte, val string) ([]byte, error) {
|
||||||
const typeSize = 1
|
const typeSize = 1
|
||||||
if len(val) < 65536 && len(val)+typeSize+binary.Size(int16(0)) > len(buf) {
|
if len(val) < 65536 && len(val)+typeSize+binary.Size(int16(0)) > len(buf) {
|
||||||
|
@ -171,7 +172,7 @@ func EncodeString(buf []byte, val string) ([]byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(val) < 65536 {
|
if len(val) < 65536 {
|
||||||
buf[0] = typeString
|
buf[0] = TypeString
|
||||||
buf = buf[1:]
|
buf = buf[1:]
|
||||||
binary.BigEndian.PutUint16(buf[:2], uint16(len(val)))
|
binary.BigEndian.PutUint16(buf[:2], uint16(len(val)))
|
||||||
buf = buf[2:]
|
buf = buf[2:]
|
||||||
|
@ -263,7 +264,7 @@ func EncodeProperty(prop *Property, buf []byte) ([]byte, error) {
|
||||||
return EncodeNumber(buf, prop.Number)
|
return EncodeNumber(buf, prop.Number)
|
||||||
case typeBoolean:
|
case typeBoolean:
|
||||||
return EncodeBoolean(buf, prop.Number != 0)
|
return EncodeBoolean(buf, prop.Number != 0)
|
||||||
case typeString:
|
case TypeString:
|
||||||
return EncodeString(buf, prop.String)
|
return EncodeString(buf, prop.String)
|
||||||
case TypeNull:
|
case TypeNull:
|
||||||
if len(buf) < 2 {
|
if len(buf) < 2 {
|
||||||
|
@ -320,7 +321,7 @@ func DecodeProperty(prop *Property, buf []byte, decodeName bool) (int, error) {
|
||||||
prop.Number = float64(buf[0])
|
prop.Number = float64(buf[0])
|
||||||
buf = buf[1:]
|
buf = buf[1:]
|
||||||
|
|
||||||
case typeString:
|
case TypeString:
|
||||||
n := DecodeInt16(buf[:2])
|
n := DecodeInt16(buf[:2])
|
||||||
if len(buf) < int(n+2) {
|
if len(buf) < int(n+2) {
|
||||||
return 0, ErrShortBuffer
|
return 0, ErrShortBuffer
|
||||||
|
@ -354,7 +355,6 @@ func DecodeProperty(prop *Property, buf []byte, decodeName bool) (int, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Encode encodes an Object into its AMF representation.
|
// Encode encodes an Object into its AMF representation.
|
||||||
// This is the top-level encoding function and is typically the only function callers will need to use.
|
|
||||||
func Encode(obj *Object, buf []byte) ([]byte, error) {
|
func Encode(obj *Object, buf []byte) ([]byte, error) {
|
||||||
if len(buf) < 5 {
|
if len(buf) < 5 {
|
||||||
return nil, ErrShortBuffer
|
return nil, ErrShortBuffer
|
||||||
|
@ -481,7 +481,7 @@ func (obj *Object) NumberProperty(name string, idx int) (float64, error) {
|
||||||
|
|
||||||
// StringProperty is a wrapper for Property that returns a String property's value, if any.
|
// StringProperty is a wrapper for Property that returns a String property's value, if any.
|
||||||
func (obj *Object) StringProperty(name string, idx int) (string, error) {
|
func (obj *Object) StringProperty(name string, idx int) (string, error) {
|
||||||
prop, err := obj.Property(name, idx, typeString)
|
prop, err := obj.Property(name, idx, TypeString)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,7 +58,7 @@ func TestSanity(t *testing.T) {
|
||||||
// TestStrings tests string encoding and decoding.
|
// TestStrings tests string encoding and decoding.
|
||||||
func TestStrings(t *testing.T) {
|
func TestStrings(t *testing.T) {
|
||||||
// Short string encoding is as follows:
|
// Short string encoding is as follows:
|
||||||
// enc[0] = data type (typeString)
|
// enc[0] = data type (TypeString)
|
||||||
// end[1:3] = size
|
// end[1:3] = size
|
||||||
// enc[3:] = data
|
// enc[3:] = data
|
||||||
for _, s := range testStrings {
|
for _, s := range testStrings {
|
||||||
|
@ -67,8 +67,8 @@ func TestStrings(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("EncodeString failed")
|
t.Errorf("EncodeString failed")
|
||||||
}
|
}
|
||||||
if buf[0] != typeString {
|
if buf[0] != TypeString {
|
||||||
t.Errorf("Expected typeString, got %v", buf[0])
|
t.Errorf("Expected TypeString, got %v", buf[0])
|
||||||
}
|
}
|
||||||
ds := DecodeString(buf[1:])
|
ds := DecodeString(buf[1:])
|
||||||
if s != ds {
|
if s != ds {
|
||||||
|
@ -76,7 +76,7 @@ func TestStrings(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Long string encoding is as follows:
|
// Long string encoding is as follows:
|
||||||
// enc[0] = data type (typeString)
|
// enc[0] = data type (TypeString)
|
||||||
// end[1:5] = size
|
// end[1:5] = size
|
||||||
// enc[5:] = data
|
// enc[5:] = data
|
||||||
s := string(make([]byte, 65536))
|
s := string(make([]byte, 65536))
|
||||||
|
@ -148,7 +148,7 @@ func TestProperties(t *testing.T) {
|
||||||
// Encode/decode string properties.
|
// Encode/decode string properties.
|
||||||
enc = buf[:]
|
enc = buf[:]
|
||||||
for i := range testStrings {
|
for i := range testStrings {
|
||||||
enc, err = EncodeProperty(&Property{Type: typeString, String: testStrings[i]}, enc)
|
enc, err = EncodeProperty(&Property{Type: TypeString, String: testStrings[i]}, enc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("EncodeProperty of string failed")
|
t.Errorf("EncodeProperty of string failed")
|
||||||
}
|
}
|
||||||
|
@ -235,7 +235,7 @@ func TestObject(t *testing.T) {
|
||||||
// Construct a more complicated object that includes a nested object.
|
// Construct a more complicated object that includes a nested object.
|
||||||
var obj2 Object
|
var obj2 Object
|
||||||
for i := range testStrings {
|
for i := range testStrings {
|
||||||
obj2.Properties = append(obj2.Properties, Property{Type: typeString, String: testStrings[i]})
|
obj2.Properties = append(obj2.Properties, Property{Type: TypeString, String: testStrings[i]})
|
||||||
obj2.Properties = append(obj2.Properties, Property{Type: typeNumber, Number: float64(testNumbers[i])})
|
obj2.Properties = append(obj2.Properties, Property{Type: typeNumber, Number: float64(testNumbers[i])})
|
||||||
}
|
}
|
||||||
obj2.Properties = append(obj2.Properties, Property{Type: TypeObject, Object: obj1})
|
obj2.Properties = append(obj2.Properties, Property{Type: TypeObject, Object: obj1})
|
||||||
|
|
16
rtmp/conn.go
16
rtmp/conn.go
|
@ -76,7 +76,7 @@ type link struct {
|
||||||
protocol int32
|
protocol int32
|
||||||
timeout uint
|
timeout uint
|
||||||
port uint16
|
port uint16
|
||||||
conn *net.TCPConn
|
conn net.Conn
|
||||||
}
|
}
|
||||||
|
|
||||||
// method represents an RTMP method.
|
// method represents an RTMP method.
|
||||||
|
@ -121,20 +121,6 @@ func Dial(url string, timeout uint, log Log) (*Conn, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if c.link.app == "" {
|
|
||||||
return nil, errInvalidURL
|
|
||||||
}
|
|
||||||
if c.link.port == 0 {
|
|
||||||
switch {
|
|
||||||
case (c.link.protocol & featureSSL) != 0:
|
|
||||||
c.link.port = 433
|
|
||||||
c.log(FatalLevel, pkg+"SSL not supported")
|
|
||||||
case (c.link.protocol & featureHTTP) != 0:
|
|
||||||
c.link.port = 80
|
|
||||||
default:
|
|
||||||
c.link.port = 1935
|
|
||||||
}
|
|
||||||
}
|
|
||||||
c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app
|
c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app
|
||||||
c.link.protocol |= featureWrite
|
c.link.protocol |= featureWrite
|
||||||
|
|
||||||
|
|
|
@ -81,7 +81,7 @@ const (
|
||||||
// 3: basic header (chunk type and stream ID) (1 byte)
|
// 3: basic header (chunk type and stream ID) (1 byte)
|
||||||
var headerSizes = [...]int{12, 8, 4, 1}
|
var headerSizes = [...]int{12, 8, 4, 1}
|
||||||
|
|
||||||
// packet defines an RTMP packet.
|
// packet represents an RTMP packet.
|
||||||
type packet struct {
|
type packet struct {
|
||||||
headerType uint8
|
headerType uint8
|
||||||
packetType uint8
|
packetType uint8
|
||||||
|
@ -90,7 +90,6 @@ type packet struct {
|
||||||
timestamp uint32
|
timestamp uint32
|
||||||
streamID uint32
|
streamID uint32
|
||||||
bodySize uint32
|
bodySize uint32
|
||||||
bytesRead uint32
|
|
||||||
buf []byte
|
buf []byte
|
||||||
body []byte
|
body []byte
|
||||||
}
|
}
|
||||||
|
@ -179,7 +178,6 @@ func (pkt *packet) readFrom(c *Conn) error {
|
||||||
pkt.timestamp = amf.DecodeInt24(header[:3])
|
pkt.timestamp = amf.DecodeInt24(header[:3])
|
||||||
if size >= 6 {
|
if size >= 6 {
|
||||||
pkt.bodySize = amf.DecodeInt24(header[3:6])
|
pkt.bodySize = amf.DecodeInt24(header[3:6])
|
||||||
pkt.bytesRead = 0
|
|
||||||
|
|
||||||
if size > 6 {
|
if size > 6 {
|
||||||
pkt.packetType = header[6]
|
pkt.packetType = header[6]
|
||||||
|
@ -201,25 +199,18 @@ func (pkt *packet) readFrom(c *Conn) error {
|
||||||
hSize += 4
|
hSize += 4
|
||||||
}
|
}
|
||||||
|
|
||||||
if pkt.bodySize > 0 && pkt.body == nil {
|
pkt.resize(pkt.bodySize, pkt.headerType)
|
||||||
pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6)
|
|
||||||
|
if pkt.bodySize > c.inChunkSize {
|
||||||
|
c.log(WarnLevel, pkg+"reading large packet", "size", int(pkt.bodySize))
|
||||||
}
|
}
|
||||||
|
|
||||||
toRead := pkt.bodySize - pkt.bytesRead
|
_, err = c.read(pkt.body[:pkt.bodySize])
|
||||||
chunkSize := c.inChunkSize
|
|
||||||
|
|
||||||
if toRead < chunkSize {
|
|
||||||
chunkSize = toRead
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = c.read(pkt.body[pkt.bytesRead:][:chunkSize])
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error())
|
c.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
pkt.bytesRead += uint32(chunkSize)
|
|
||||||
|
|
||||||
// Keep the packet as a reference for other packets on this channel.
|
// Keep the packet as a reference for other packets on this channel.
|
||||||
if c.channelsIn[pkt.channel] == nil {
|
if c.channelsIn[pkt.channel] == nil {
|
||||||
c.channelsIn[pkt.channel] = &packet{}
|
c.channelsIn[pkt.channel] = &packet{}
|
||||||
|
@ -237,15 +228,16 @@ func (pkt *packet) readFrom(c *Conn) error {
|
||||||
c.channelTimestamp[pkt.channel] = int32(pkt.timestamp)
|
c.channelTimestamp[pkt.channel] = int32(pkt.timestamp)
|
||||||
|
|
||||||
c.channelsIn[pkt.channel].body = nil
|
c.channelsIn[pkt.channel].body = nil
|
||||||
c.channelsIn[pkt.channel].bytesRead = 0
|
|
||||||
c.channelsIn[pkt.channel].hasAbsTimestamp = false
|
c.channelsIn[pkt.channel].hasAbsTimestamp = false
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// resize adjusts the packet's storage to accommodate a body of the given size and header type.
|
// resize adjusts the packet's storage (if necessary) to accommodate a body of the given size and header type.
|
||||||
// When headerSizeAuto is specified, the header type is computed based on packet type.
|
// When headerSizeAuto is specified, the header type is computed based on packet type.
|
||||||
func (pkt *packet) resize(size uint32, ht uint8) {
|
func (pkt *packet) resize(size uint32, ht uint8) {
|
||||||
pkt.buf = make([]byte, fullHeaderSize+size)
|
if cap(pkt.buf) < fullHeaderSize+int(size) {
|
||||||
|
pkt.buf = make([]byte, fullHeaderSize+size)
|
||||||
|
}
|
||||||
pkt.body = pkt.buf[fullHeaderSize:]
|
pkt.body = pkt.buf[fullHeaderSize:]
|
||||||
if ht != headerSizeAuto {
|
if ht != headerSizeAuto {
|
||||||
pkt.headerType = ht
|
pkt.headerType = ht
|
||||||
|
@ -407,7 +399,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Send previously deferrd packet if combining it with the next one would exceed the chunk size.
|
// Send previously deferred packet if combining it with the next one would exceed the chunk size.
|
||||||
if len(c.deferred)+size+hSize > chunkSize {
|
if len(c.deferred)+size+hSize > chunkSize {
|
||||||
c.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(c.deferred))
|
c.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(c.deferred))
|
||||||
_, err := c.write(c.deferred)
|
_, err := c.write(c.deferred)
|
||||||
|
@ -419,7 +411,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(kortschak): Rewrite this horrific peice of premature optimisation.
|
// TODO(kortschak): Rewrite this horrific peice of premature optimisation.
|
||||||
c.log(DebugLevel, pkg+"sending packet", "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr(), "size", size)
|
c.log(DebugLevel, pkg+"sending packet", "size", size, "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr())
|
||||||
for size+hSize != 0 {
|
for size+hSize != 0 {
|
||||||
if chunkSize > size {
|
if chunkSize > size {
|
||||||
chunkSize = size
|
chunkSize = size
|
||||||
|
|
|
@ -41,7 +41,6 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// parseURL parses an RTMP URL (ok, technically it is lexing).
|
// parseURL parses an RTMP URL (ok, technically it is lexing).
|
||||||
//
|
|
||||||
func parseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) {
|
func parseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) {
|
||||||
u, err := url.Parse(addr)
|
u, err := url.Parse(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -81,6 +80,9 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp
|
||||||
}
|
}
|
||||||
elems := strings.SplitN(u.Path[1:], "/", 3)
|
elems := strings.SplitN(u.Path[1:], "/", 3)
|
||||||
app = elems[0]
|
app = elems[0]
|
||||||
|
if app == "" {
|
||||||
|
return protocol, host, port, app, playpath, errInvalidURL
|
||||||
|
}
|
||||||
playpath = elems[1]
|
playpath = elems[1]
|
||||||
if len(elems) == 3 && len(elems[2]) != 0 {
|
if len(elems) == 3 && len(elems[2]) != 0 {
|
||||||
playpath = path.Join(elems[1:]...)
|
playpath = path.Join(elems[1:]...)
|
||||||
|
@ -97,5 +99,15 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case port != 0:
|
||||||
|
case (protocol & featureSSL) != 0:
|
||||||
|
return protocol, host, port, app, playpath, errUnimplemented // port = 433
|
||||||
|
case (protocol & featureHTTP) != 0:
|
||||||
|
port = 80
|
||||||
|
default:
|
||||||
|
port = 1935
|
||||||
|
}
|
||||||
|
|
||||||
return protocol, host, port, app, playpath, nil
|
return protocol, host, port, app, playpath, nil
|
||||||
}
|
}
|
||||||
|
|
41
rtmp/rtmp.go
41
rtmp/rtmp.go
|
@ -174,6 +174,13 @@ func connect(c *Conn) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
c.log(DebugLevel, pkg+"connected")
|
c.log(DebugLevel, pkg+"connected")
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
c.link.conn.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
err = handshake(c)
|
err = handshake(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log(WarnLevel, pkg+"handshake failed", "error", err.Error())
|
c.log(WarnLevel, pkg+"handshake failed", "error", err.Error())
|
||||||
|
@ -185,12 +192,14 @@ func connect(c *Conn) error {
|
||||||
c.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error())
|
c.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
c.log(DebugLevel, pkg+"negotiating")
|
c.log(DebugLevel, pkg+"negotiating")
|
||||||
|
var buf [256]byte
|
||||||
for !c.isPlaying {
|
for !c.isPlaying {
|
||||||
pkt := packet{}
|
pkt := packet{buf: buf[:]}
|
||||||
err = pkt.readFrom(c)
|
err = pkt.readFrom(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
break
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
switch pkt.packetType {
|
switch pkt.packetType {
|
||||||
|
@ -199,14 +208,10 @@ func connect(c *Conn) error {
|
||||||
default:
|
default:
|
||||||
err = handlePacket(c, &pkt)
|
err = handlePacket(c, &pkt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
break
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !c.isPlaying {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -276,26 +281,18 @@ func sendConnectPacket(c *Conn) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
enc[0] = amf.TypeObject
|
// required link info
|
||||||
enc = enc[1:]
|
info := amf.Object{Properties: []amf.Property{
|
||||||
enc, err = amf.EncodeNamedString(enc, avApp, c.link.app)
|
amf.Property{Type: amf.TypeString, Name: avApp, String: c.link.app},
|
||||||
if err != nil {
|
amf.Property{Type: amf.TypeString, Name: avType, String: avNonprivate},
|
||||||
return err
|
amf.Property{Type: amf.TypeString, Name: avTcUrl, String: c.link.url}},
|
||||||
}
|
}
|
||||||
enc, err = amf.EncodeNamedString(enc, avType, avNonprivate)
|
enc, err = amf.Encode(&info, enc)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
enc, err = amf.EncodeNamedString(enc, avTcUrl, c.link.url)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
enc, err = amf.EncodeInt24(enc, amf.TypeObjectEnd)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// add auth string, if any
|
// optional link auth info
|
||||||
if c.link.auth != "" {
|
if c.link.auth != "" {
|
||||||
enc, err = amf.EncodeBoolean(enc, c.link.flags&linkAuth != 0)
|
enc, err = amf.EncodeBoolean(enc, c.link.flags&linkAuth != 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -243,8 +243,9 @@ func TestFromFile(t *testing.T) {
|
||||||
}
|
}
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
|
||||||
|
rs := &rtmpSender{conn: c}
|
||||||
// Pass RTMP session, true for audio, true for video, and 25 FPS
|
// Pass RTMP session, true for audio, true for video, and 25 FPS
|
||||||
flvEncoder, err := flv.NewEncoder(c, true, true, 25)
|
flvEncoder, err := flv.NewEncoder(rs, true, true, 25)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to create encoder: %v", err)
|
t.Fatalf("failed to create encoder: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -122,7 +122,7 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
psiSndCnt = 7
|
psiInterval = 1 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// timeLocation holds time and location data
|
// timeLocation holds time and location data
|
||||||
|
@ -199,9 +199,9 @@ type Encoder struct {
|
||||||
tsSpace [PacketSize]byte
|
tsSpace [PacketSize]byte
|
||||||
pesSpace [pes.MaxPesSize]byte
|
pesSpace [pes.MaxPesSize]byte
|
||||||
|
|
||||||
psiCount int
|
|
||||||
|
|
||||||
continuity map[int]byte
|
continuity map[int]byte
|
||||||
|
|
||||||
|
psiLastTime time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEncoder returns an Encoder with the specified frame rate.
|
// NewEncoder returns an Encoder with the specified frame rate.
|
||||||
|
@ -233,6 +233,15 @@ const (
|
||||||
// generate handles the incoming data and generates equivalent mpegts packets -
|
// generate handles the incoming data and generates equivalent mpegts packets -
|
||||||
// sending them to the output channel.
|
// sending them to the output channel.
|
||||||
func (e *Encoder) Encode(nalu []byte) error {
|
func (e *Encoder) Encode(nalu []byte) error {
|
||||||
|
now := time.Now()
|
||||||
|
if now.Sub(e.psiLastTime) > psiInterval {
|
||||||
|
err := e.writePSI()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
e.psiLastTime = now
|
||||||
|
}
|
||||||
|
|
||||||
// Prepare PES data.
|
// Prepare PES data.
|
||||||
pesPkt := pes.Packet{
|
pesPkt := pes.Packet{
|
||||||
StreamID: streamID,
|
StreamID: streamID,
|
||||||
|
@ -262,13 +271,6 @@ func (e *Encoder) Encode(nalu []byte) error {
|
||||||
pkt.PCR = e.pcr()
|
pkt.PCR = e.pcr()
|
||||||
pusi = false
|
pusi = false
|
||||||
}
|
}
|
||||||
if e.psiCount <= 0 {
|
|
||||||
err := e.writePSI()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
e.psiCount--
|
|
||||||
_, err := e.dst.Write(pkt.Bytes(e.tsSpace[:PacketSize]))
|
_, err := e.dst.Write(pkt.Bytes(e.tsSpace[:PacketSize]))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -318,7 +320,6 @@ func (e *Encoder) writePSI() error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
e.psiCount = psiSndCnt
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -128,19 +128,19 @@ type Packet struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindPMT will take a clip of mpegts and try to find a PMT table - if one
|
// FindPMT will take a clip of mpegts and try to find a PMT table - if one
|
||||||
// is found, then it is returned, otherwise nil and an error is returned.
|
// is found, then it is returned along with its index, otherwise nil, -1 and an error is returned.
|
||||||
func FindPMT(d []byte) (p []byte, err error) {
|
func FindPMT(d []byte) (p []byte, i int, err error) {
|
||||||
if len(d) < PacketSize {
|
if len(d) < PacketSize {
|
||||||
return nil, errors.New("Mmpegts data not of valid length")
|
return nil, -1, errors.New("Mmpegts data not of valid length")
|
||||||
}
|
}
|
||||||
for i := 0; i < len(d); i += PacketSize {
|
for i = 0; i < len(d); i += PacketSize {
|
||||||
pid := (uint16(d[i+1]&0x1f) << 8) | uint16(d[i+2])
|
pid := (uint16(d[i+1]&0x1f) << 8) | uint16(d[i+2])
|
||||||
if pid == pmtPid {
|
if pid == pmtPid {
|
||||||
p = d[i+4 : i+PacketSize]
|
p = d[i+4 : i+PacketSize]
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, errors.New("Could not find pmt table in mpegts data")
|
return nil, -1, errors.New("Could not find pmt table in mpegts data")
|
||||||
}
|
}
|
||||||
|
|
||||||
// FillPayload takes a channel and fills the packets Payload field until the
|
// FillPayload takes a channel and fills the packets Payload field until the
|
||||||
|
|
Loading…
Reference in New Issue