av/revid/Revid.go

705 lines
19 KiB
Go

/*
NAME
Revid.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
revid is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
// revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols.
package revid
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"os/exec"
"path/filepath"
"strconv"
"sync"
"time"
"bitbucket.org/ausocean/av/generator"
"bitbucket.org/ausocean/av/parser"
"bitbucket.org/ausocean/av/ringbuffer"
"bitbucket.org/ausocean/av/rtmp"
)
// Misc constants
const (
clipDuration = 1 // s
mp2tPacketSize = 188 // MPEG-TS packet size
mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000
ringBufferSize = 500
ringBufferElementSize = 150000
httpTimeOut = 5 // s
packetsPerFrame = 7
bitrateTime = 60 // s
mjpegParserInChanLen = 100000
ffmpegPath = "/usr/local/bin/ffmpeg"
rtmpConnectionTimout = 10
outputChanSize = 1000
cameraRetryPeriod = 5 * time.Second
sendFailedDelay = 5
maxSendFailedErrorCount = 500
clipSizeThreshold = 11
rtmpConnectionMaxTries = 5
raspividNoOfTries = 3
sendingWaitTime = time.Duration(5) * time.Millisecond
)
// Log Types
const (
Error = "Error"
Warning = "Warning"
Info = "Info"
Debug = "Debug"
Detail = "Detail"
)
// Revid provides methods to control a revid session; providing methods
// to start, stop and change the state of an instance using the Config struct.
type Revid interface {
Start()
Stop()
changeState(newconfig Config) error
GetConfigRef() *Config
Log(logType, m string)
IsRunning() bool
GetBitrate() int64
}
// The revid struct provides fields to describe the state of a Revid.
type revid struct {
ffmpegPath string
tempDir string
ringBuffer ringbuffer.RingBuffer
config Config
isRunning bool
outputFile *os.File
inputFile *os.File
generator generator.Generator
parser parser.Parser
cmd *exec.Cmd
ffmpegCmd *exec.Cmd
inputReader *bufio.Reader
ffmpegStdin io.WriteCloser
outputChan chan []byte
setupInput func() error
setupOutput func() error
getFrame func() []byte
sendClip func(clip []byte) error
rtmpInst rtmp.RTMPSession
mutex sync.Mutex
sendMutex sync.Mutex
currentBitrate int64
}
// NewRevid returns a pointer to a new revid with the desired
// configuration, and/or an error if construction of the new instant was not
// successful.
func NewRevid(config Config) (r *revid, err error) {
r = new(revid)
r.mutex = sync.Mutex{}
r.sendMutex = sync.Mutex{}
r.ringBuffer = ringbuffer.NewRingBuffer(ringBufferSize, ringBufferElementSize)
err = r.changeState(config)
if err != nil {
r = nil
return
}
r.outputChan = make(chan []byte, outputChanSize)
r.isRunning = false
return
}
// Returns the currently saved bitrate from the most recent bitrate check
// check bitrate output delay in consts for this period
func (r *revid) GetBitrate() int64 {
return r.currentBitrate
}
// GetConfigRef returns a pointer to the revidInst's Config struct object
func (r *revid) GetConfigRef() *Config {
return &r.config
}
// changeState swaps the current config of a revid with the passed
// configuration; checking validity and returning errors if not valid.
func (r *revid) changeState(config Config) error {
r.config.Logger = config.Logger
err := config.Validate(r)
if err != nil {
return errors.New("Config struct is bad!: " + err.Error())
}
r.config = config
switch r.config.Output {
case File:
r.sendClip = r.sendClipToFile
r.setupOutput = r.setupOutputForFile
case FfmpegRtmp:
r.setupOutput = r.setupOutputForFfmpegRtmp
r.sendClip = r.sendClipToFfmpegRtmp
case NativeRtmp:
r.setupOutput = r.setupOutputForLibRtmp
r.sendClip = r.sendClipToLibRtmp
case Http:
r.sendClip = r.sendClipToHTTP
}
switch r.config.Input {
case Raspivid:
r.setupInput = r.setupInputForRaspivid
case File:
r.setupInput = r.setupInputForFile
}
switch r.config.InputCodec {
case H264:
r.Log(Info, "Using H264 parser!")
r.parser = parser.NewH264Parser()
case Mjpeg:
r.Log(Info, "Using MJPEG parser!")
r.parser = parser.NewMJPEGParser(mjpegParserInChanLen)
}
switch r.config.Packetization {
case None:
// no packetisation - Revid output chan grabs raw data straight from parser
r.parser.SetOutputChan(r.outputChan)
r.getFrame = r.getFrameNoPacketization
goto noPacketizationSetup
case Mpegts:
r.Log(Info, "Using MPEGTS packetisation!")
frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate)
r.generator = generator.NewTsGenerator(uint(frameRateAsInt))
case Flv:
r.Log(Info, "Using FLV packetisation!")
frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate)
r.generator = generator.NewFlvGenerator(true, true, uint(frameRateAsInt))
}
// We have packetization of some sort, so we want to send data to Generator
// to perform packetization
r.getFrame = r.getFramePacketization
r.parser.SetOutputChan(r.generator.GetInputChan())
noPacketizationSetup:
return nil
}
// ChangeConfig changes the current configuration of the revid instance.
func (r *revid) ChangeConfig(config Config) (err error) {
r.Stop()
r, err = NewRevid(config)
if err != nil {
return
}
r.Start()
return
}
// Log takes a logtype and message and tries to send this information to the
// logger provided in the revid config - if there is one, otherwise the message
// is sent to stdout
func (r *revid) Log(logType, m string) {
if r.config.Verbosity == Yes {
if r.config.Logger != nil {
r.config.Logger.Log(logType, m)
return
}
fmt.Println(logType + ": " + m)
}
}
// IsRunning returns true if the revid is currently running and false otherwise
func (r *revid) IsRunning() bool {
return r.isRunning
}
// Start invokes a revid to start processing video from a defined input
// and packetising (if theres packetization) to a defined output.
func (r *revid) Start() {
r.mutex.Lock()
defer r.mutex.Unlock()
if r.isRunning {
r.Log(Warning, "revid.Start() called but revid already running!")
return
}
r.Log(Info, "Starting Revid!")
r.Log(Debug, "Setting up output!")
if r.setupOutput != nil {
err := r.setupOutput()
if err != nil {
r.Log(Error, err.Error())
return
}
}
r.isRunning = true
r.Log(Info, "Starting output routine!")
go r.outputClips()
r.Log(Info, "Starting clip packing routine!")
go r.packClips()
r.Log(Info, "Starting packetisation generator!")
r.generator.Start()
r.Log(Info, "Starting parser!")
r.parser.Start()
r.Log(Info, "Setting up input and receiving content!")
go r.setupInput()
}
// Stop halts any processing of video data from a camera or file
func (r *revid) Stop() {
r.mutex.Lock()
defer r.mutex.Unlock()
if !r.isRunning {
r.Log(Warning, "revid.Stop() called but revid not running!")
return
}
r.Log(Info, "Stopping revid!")
// wait for sending to finish
r.sendMutex.Lock()
defer r.sendMutex.Unlock()
r.rtmpInst.EndSession()
r.isRunning = false
r.Log(Info, "Stopping generator!")
if r.generator != nil {
r.generator.Stop()
}
r.Log(Info, "Stopping parser!")
if r.parser != nil {
r.parser.Stop()
}
r.Log(Info, "Killing input proccess!")
// If a cmd process is running, we kill!
if r.cmd != nil && r.cmd.Process != nil {
r.cmd.Process.Kill()
}
}
// getFrameNoPacketization gets a frame directly from the revid output chan
// as we don't need to go through the generator with no packetization settings
func (r *revid) getFrameNoPacketization() []byte {
return <-r.outputChan
}
// getFramePacketization gets a frame from the generators output chan - the
// the generator being an mpegts or flv generator depending on the config
func (r *revid) getFramePacketization() []byte {
return <-(r.generator.GetOutputChan())
}
// flushDataPacketization removes data from the revid inst's coutput chan
func (r *revid) flushData() {
switch r.config.Packetization {
case Flv:
for {
select {
case <-(r.generator.GetOutputChan()):
default:
goto done
}
}
}
done:
}
// packClips takes data segments; whether that be tsPackets or mjpeg frames and
// packs them into clips consisting of the amount frames specified in the config
func (r *revid) packClips() {
clipSize := 0
packetCount := 0
for r.isRunning {
// Get some memory from the ring buffer for out clip
var clip []byte
var err error
if clip, err = r.ringBuffer.Get(); err != nil && r.isRunning {
r.Log(Error, err.Error())
r.Log(Warning, "Clearing output chan!")
for clip, err = r.ringBuffer.Get(); err != nil && r.isRunning; {
time.Sleep(time.Duration(10) * time.Millisecond)
clip, err = r.ringBuffer.Get()
}
r.Log(Debug, "Finally got mem from ringbuffer!")
}
for r.isRunning {
select {
// TODO: This is temporary, need to work out how to make this work
// for cases when there is not packetisation.
case frame := <-(r.generator.GetOutputChan()):
lenOfFrame := len(frame)
if lenOfFrame > ringBufferElementSize {
r.Log(Warning, fmt.Sprintf("Frame was too big: %v bytes, getting another one!", lenOfFrame))
frame = r.getFrame()
lenOfFrame = len(frame)
}
upperBound := clipSize + lenOfFrame
copy(clip[clipSize:upperBound], frame)
packetCount++
clipSize += lenOfFrame
fpcAsInt, err := strconv.Atoi(r.config.FramesPerClip)
if err != nil {
r.Log(Error, "Frames per clip not quite right! Defaulting to 1!")
r.config.FramesPerClip = "1"
}
if packetCount >= fpcAsInt {
if err := r.ringBuffer.DoneWriting(clipSize); err != nil {
r.Log(Error, err.Error())
r.Log(Warning, "Dropping clip!")
}
clipSize = 0
packetCount = 0
goto finishedWithClip
}
default:
time.Sleep(time.Duration(5) * time.Millisecond)
}
}
finishedWithClip:
}
}
// outputClips takes the clips produced in the packClips method and outputs them
// to the desired output defined in the revid config
func (r *revid) outputClips() {
now := time.Now()
prevTime := now
bytes := 0
delay := 0
for r.isRunning {
// Here we slow things down as much as we can to decrease cpu usage
switch {
case r.ringBuffer.GetNoOfElements() < 2:
delay++
time.Sleep(time.Duration(delay) * time.Millisecond)
case delay > 0:
delay--
}
// If the ringbuffer has something we can read and send off
if clip, err := r.ringBuffer.Read(); err == nil && r.isRunning {
bytes += len(clip)
r.Log(Detail, "About to send")
err2 := r.sendClip(clip)
if err2 == nil {
r.Log(Detail, "Sent clip")
}
if r.isRunning && err2 != nil && len(clip) > 11 {
r.Log(Debug, "Send failed! Trying again")
// Try and send again
err2 = r.sendClip(clip)
// if there's still an error we try and reconnect, unless we're stopping
for r.isRunning && err2 != nil {
r.Log(Debug, "Send failed a again! Trying to reconnect...")
time.Sleep(time.Duration(sendFailedDelay) * time.Millisecond)
r.Log(Error, err2.Error())
if r.config.Output == NativeRtmp {
r.Log(Debug, "Ending current rtmp session...")
r.rtmpInst.EndSession()
}
if r.ringBuffer.Full() {
r.Log(Debug, "Flushing incoming data...")
r.flushData()
}
if r.config.Output == NativeRtmp {
r.Log(Info, "Restarting rtmp session...")
r.rtmpInst.StartSession()
}
r.Log(Debug, "Trying to send again with new connection...")
// and if the ring buffer is full then we flush the incoming data
err2 = r.sendClip(clip)
}
}
r.Log(Detail, "Done reading that clip from ringbuffer...")
// let the ringbuffer know that we're done with the memory we grabbed when
// we call ringBuffer.Get()
if err := r.ringBuffer.DoneReading(); err != nil {
r.Log(Error, err.Error())
}
// Log some information regarding bitrate and ring buffer size if it's time
now = time.Now()
deltaTime := now.Sub(prevTime)
if deltaTime > time.Duration(bitrateTime)*time.Second {
r.currentBitrate = int64(float64(bytes*8) / float64(deltaTime/1e9))
r.Log(Debug, fmt.Sprintf("Bitrate: %v bits/s\n", r.currentBitrate))
r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements()))
prevTime = now
bytes = 0
}
}
}
r.Log(Info, "Not outputting clips anymore!")
}
// senClipToFile writes the passed clip to a file
func (r *revid) sendClipToFile(clip []byte) error {
r.sendMutex.Lock()
defer r.sendMutex.Unlock()
_, err := r.outputFile.Write(clip)
if err != nil {
return err
}
return nil
}
// sendClipToHTTP takes a clip and an output url and posts through http.
func (r *revid) sendClipToHTTP(clip []byte) error {
r.sendMutex.Lock()
defer r.sendMutex.Unlock()
timeout := time.Duration(httpTimeOut * time.Second)
client := http.Client{Timeout: timeout}
url := r.config.HttpAddress + strconv.Itoa(len(clip))
r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip)))
resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip))
if err != nil {
return fmt.Errorf("Error posting to %s: %s", url, err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err == nil {
r.Log(Debug, fmt.Sprintf("%s\n", body))
} else {
r.Log(Error, err.Error())
}
return nil
}
// sendClipToFfmpegRtmp sends the clip over the current rtmp connection using
// an ffmpeg process.
func (r *revid) sendClipToFfmpegRtmp(clip []byte) (err error) {
r.sendMutex.Lock()
defer r.sendMutex.Unlock()
_, err = r.ffmpegStdin.Write(clip)
return
}
// sendClipToLibRtmp send the clip over the current rtmp connection using the
// c based librtmp library
func (r *revid) sendClipToLibRtmp(clip []byte) (err error) {
r.sendMutex.Lock()
defer r.sendMutex.Unlock()
err = r.rtmpInst.WriteFrame(clip, uint(len(clip)))
return
}
// setupOutputForFfmpegRtmp sets up output to rtmp using an ffmpeg process
func (r *revid) setupOutputForFfmpegRtmp() error {
r.ffmpegCmd = exec.Command(ffmpegPath,
"-f", "h264",
"-r", r.config.FrameRate,
"-i", "-",
"-f", "lavfi",
"-i", "aevalsrc=0",
"-fflags", "nobuffer",
"-vcodec", "copy",
"-acodec", "aac",
"-map", "0:0",
"-map", "1:0",
"-strict", "experimental",
"-f", "flv",
r.config.RtmpUrl,
)
var err error
r.ffmpegStdin, err = r.ffmpegCmd.StdinPipe()
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return err
}
err = r.ffmpegCmd.Start()
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return err
}
return nil
}
// setupOutputForLibRtmp sets up rtmp output using the wrapper for the c based
// librtmp library - makes connection and starts comms etc.
func (r *revid) setupOutputForLibRtmp() error {
r.rtmpInst = rtmp.NewRTMPSession(r.config.RtmpUrl, rtmpConnectionTimout)
err := r.rtmpInst.StartSession()
for noOfTries := 0; err != nil && noOfTries < rtmpConnectionMaxTries; noOfTries++ {
r.rtmpInst.EndSession()
r.Log(Error, err.Error())
r.Log(Info, "Trying to establish rtmp connection again!")
r.rtmpInst = rtmp.NewRTMPSession(r.config.RtmpUrl, rtmpConnectionTimout)
err = r.rtmpInst.StartSession()
}
if err != nil {
return err
}
return err
}
// setupOutputForFile sets up an output file to output data to
func (r *revid) setupOutputForFile() (err error) {
r.outputFile, err = os.Create(r.config.OutputFileName)
return
}
// setupInputForRaspivid sets up things for input from raspivid i.e. starts
// a raspivid process and pipes it's data output.
func (r *revid) setupInputForRaspivid() error {
r.Log(Info, "Starting raspivid!")
switch r.config.InputCodec {
case H264:
arguments := []string{"-cd", "H264",
"-o", "-",
"-n",
"-t", r.config.Timeout,
"-b", r.config.Bitrate,
"-w", r.config.Width,
"-h", r.config.Height,
"-fps", r.config.FrameRate,
"-ih",
"-g", r.config.IntraRefreshPeriod,
}
if r.config.QuantizationMode == QuantizationOn {
arguments = append(arguments, "-qp")
arguments = append(arguments, r.config.Quantization)
}
if r.config.HorizontalFlip == Yes {
arguments = append(arguments, "-hf")
}
if r.config.VerticalFlip == Yes {
arguments = append(arguments, "-vf")
}
name := "raspivid"
r.cmd = &exec.Cmd{
Path: name,
Args: append([]string{name}, arguments...),
}
r.Log(Info, fmt.Sprintf("Startin raspivid with args: %v", r.cmd.Args))
if filepath.Base(name) == name {
if lp, err := exec.LookPath(name); err != nil {
r.Log(Error, err.Error())
return err
} else {
r.cmd.Path = lp
}
}
case Mjpeg:
r.cmd = exec.Command("raspivid",
"-cd", "MJPEG",
"-o", "-",
"-n",
"-t", r.config.Timeout,
"-fps", r.config.FrameRate,
)
}
stdout, _ := r.cmd.StdoutPipe()
go r.cmd.Run()
r.inputReader = bufio.NewReader(stdout)
go r.readCamera()
return nil
}
// setupInputForFile sets things up for getting input from a file
func (r *revid) setupInputForFile() error {
fps, _ := strconv.Atoi(r.config.FrameRate)
r.parser.SetDelay(uint(float64(1000) / float64(fps)))
r.readFile()
return nil
}
// testRtmp is useful to check robustness of connections. Intended to be run as
// goroutine. After every 'delayTime' the rtmp connection is ended and then
// restarted
func (r *revid) testRtmp(delayTime uint) {
for {
time.Sleep(time.Duration(delayTime) * time.Millisecond)
r.rtmpInst.EndSession()
r.rtmpInst.StartSession()
}
}
// readCamera reads data from the defined camera while the revid is running.
// TODO: use ringbuffer here instead of allocating mem every time!
func (r *revid) readCamera() {
r.Log(Info, "Reading camera data!")
for r.isRunning {
data := make([]byte, 1)
_, err := io.ReadFull(r.inputReader, data)
switch {
// We know this means we're getting nothing from the cam
case (err != nil && err.Error() == "EOF" && r.isRunning) || (err != nil && r.isRunning):
r.Log(Error, "No data from camera!")
time.Sleep(cameraRetryPeriod)
default:
r.parser.GetInputChan() <- data[0]
}
}
r.Log(Info, "Not trying to read from camera anymore!")
}
// readFile reads data from the defined file while the revid is running.
func (r *revid) readFile() error {
var err error
r.inputFile, err = os.Open(r.config.InputFileName)
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return err
}
stats, err := r.inputFile.Stat()
if err != nil {
r.Log(Error, "Could not get input file stats!")
r.Stop()
return err
}
data := make([]byte, stats.Size())
_, err = r.inputFile.Read(data)
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return err
}
for i := range data {
r.parser.GetInputChan() <- data[i]
}
r.inputFile.Close()
return nil
}