av/revid/RevidInstance.go

653 lines
17 KiB
Go

/*
NAME
revid - a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols.
DESCRIPTION
See Readme.md
AUTHORS
Alan Noble <anoble@gmail.com>
Saxon A. Nelson-Milton <saxon.milton@gmail.com>
LICENSE
revid is Copyright (C) 2017 Alan Noble.
It is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
// revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols.
package revid
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"os/exec"
"strconv"
"time"
//"bitbucket.org/ausocean/av/parser"
//"bitbucket.org/ausocean/av/tsgenerator"
"../parser"
"../tsgenerator"
//"bitbucket.org/ausocean/av/ringbuffer"
//"bitbucket.org/ausocean/utils/smartLogger"
"../../utils/smartLogger"
"../ringbuffer"
)
// Miscellaneous sizing and timing constants for the revid pipeline.
const (
	// Clip timing and MPEG-TS sizing.
	clipDuration    = 1                   // s
	mp2tPacketSize  = 188                 // size of one MPEG-TS packet, in bytes
	mp2tMaxPackets  = clipDuration * 2016 // 2016 is the first multiple of both 7 and 8 above 2000
	packetsPerFrame = 7
	maxClipSize     = 100000

	// Buffering.
	ringBufferSize        = 100 / clipDuration // number of ring buffer elements
	ringBufferElementSize = 10000000           // size of each ring buffer element
	h264BufferSize        = 1000000
	mjpegParserInChanLen  = 100000 // length handed to the MJPEG parser's constructor

	// Output and reporting.
	httpTimeOut = 5  // s, timeout of each HTTP post
	bitrateTime = 60 // s, interval between bitrate log messages

	// Location of the ffmpeg binary launched for RTMP output.
	ffmpegPath = "/home/saxon/bin/ffmpeg"
)
// Log types accepted as the logType argument of Log, listed from least to
// most severe.
const (
	Debug   = "Debug"
	Info    = "Info"
	Warning = "Warning"
	Error   = "Error"
)
// Config provides parameters relevant to a revid instance. A new config must
// be passed to the constructor.
//
// The uint8 fields are selectors that take the enum constants declared in
// this package; leaving one at its zero value (NothingDefined) makes
// ChangeState pick a default. The numeric settings are carried as decimal
// strings; those handed to raspivid are passed through verbatim as command
// line arguments, and an empty string makes ChangeState apply the
// corresponding default.
type Config struct {
// Input selects the video source: Raspivid, Rtp or File (default Raspivid).
Input uint8
// InputCodec selects the codec of the input: H264 or Mjpeg (default H264).
InputCodec uint8
// Output selects the destination: HttpOut, File or Rtmp (default HttpOut).
Output uint8
// RtmpEncodingMethod is examined when Output is Rtmp: Revid or Ffmpeg
// (default Ffmpeg).
RtmpEncodingMethod uint8
// FramesPerClip is the clip-packing threshold; 0 is replaced by 1.
FramesPerClip uint
// RtmpUrl is the RTMP destination given to ffmpeg; required when Output is Rtmp.
RtmpUrl string
// Bitrate is passed to raspivid as -b for H264 input.
Bitrate string
// OutputFileName is the file created when Output is File.
OutputFileName string
// InputFileName is the file read when Input is File.
InputFileName string
// Height is passed to raspivid as -h for H264 input (default "720").
Height string
// Width is passed to raspivid as -w for H264 input (default "1280").
Width string
// FrameRate is passed to raspivid as -fps and to ffmpeg as -r (default "25").
FrameRate string
// HttpAddress is the URL prefix for HTTP output; the clip length in bytes is
// appended to it to form the URL of each post.
HttpAddress string
// Quantization is passed to raspivid as -qp for H264 input; it must lie in
// 0..51 (default "35").
Quantization string
// Timeout is passed to raspivid as -t (default "0").
Timeout string
// Packetization selects how parser output is packed: None or Mpegts
// (default None).
Packetization uint8
// IntraRefreshPeriod is passed to raspivid as -g for H264 input (default "100").
IntraRefreshPeriod string
// Logger receives log messages; when nil, messages are printed to stdout.
Logger smartLogger.LogInstance
}
// Values for the uint8 selector fields of Config. All selectors share this
// single numbering; the trailing comments give each value and the Config
// field that ChangeState accepts it for.
const (
	NothingDefined = iota // 0: selector left unset
	Raspivid              // 1: Input
	Rtp                   // 2: Input
	H264Codec             // 3
	File                  // 4: Input, Output
	HttpOut               // 5: Output
	H264                  // 6: InputCodec
	Mjpeg                 // 7: InputCodec
	None                  // 8: Packetization
	Mpegts                // 9: Packetization
	Rtmp                  // 10: Output
	Ffmpeg                // 11: RtmpEncodingMethod
	Revid                 // 12: RtmpEncodingMethod
)
// Defaults that ChangeState substitutes for Config settings left empty. They
// are decimal strings because that is how Config carries these settings.
const (
	// Picture geometry and rate.
	defaultWidth     = "1280"
	defaultHeight    = "720"
	defaultFrameRate = "25"

	// Encoder settings.
	defaultBitrate            = "0"
	defaultQuantization       = "35"
	defaultIntraRefreshPeriod = "100"

	// Capture duration.
	defaultTimeout = "0"
)
// RevidInst is the control surface of a revid session: the session can be
// started and stopped, queried, reconfigured with a Config, and asked to emit
// log messages.
type RevidInst interface {
	// Lifecycle.
	Start()
	Stop()
	IsRunning() bool

	// Configuration.
	ChangeState(newconfig Config) error
	GetConfigRef() *Config

	// Logging; logType is one of Error, Warning, Info or Debug.
	Log(logType, m string)
}
// The revidInst struct provides fields to describe the state of a RevidInst.
type revidInst struct {
// ffmpegPath and tempDir are not referenced by the methods in this file
// section; Start launches ffmpeg via the package-level ffmpegPath constant,
// not this field.
ffmpegPath string
tempDir string
// ringBuffer holds the clips written by packClips and read by outputClips.
ringBuffer ringbuffer.RingBuffer
// config is the active configuration, installed by ChangeState.
config Config
// isRunning is set by Start and cleared by Stop; the reader and output
// loops poll it.
isRunning bool
// outputFile is created by NewRevidInstance when the output is File.
outputFile *os.File
// inputFile is the file most recently opened by readFile.
inputFile *os.File
// generator is created by NewRevidInstance when packetization is Mpegts.
generator tsgenerator.TsGenerator
// parser is the H264 or MJPEG parser chosen from the input codec.
parser parser.Parser
// cmd is the raspivid process started by Start.
cmd *exec.Cmd
// ffmpegCmd is the ffmpeg process started by Start for Rtmp output.
ffmpegCmd *exec.Cmd
// inputReader buffers raspivid's stdout for readCamera.
inputReader *bufio.Reader
// ffmpegStdin is the pipe through which clips are written to ffmpeg.
ffmpegStdin io.WriteCloser
// outputChan carries parser output to packClips when packetization is None.
outputChan chan []byte
}
// NewRevidInstance builds a revidInst from config. The config is validated
// (and defaulted) by ChangeState; the output file is created when the output
// is File; the parser matching the input codec is created, wired either
// straight to the output channel or through a TS generator, and started; and
// the clip-packing goroutine is launched.
//
// It returns the new instance, or nil and an error if the config is rejected
// or the output file cannot be created.
func NewRevidInstance(config Config) (r *revidInst, err error) {
	r = &revidInst{
		ringBuffer: ringbuffer.NewRingBuffer(ringBufferSize, ringBufferElementSize),
	}
	if err = r.ChangeState(config); err != nil {
		return nil, err
	}

	if r.config.Output == File {
		if r.outputFile, err = os.Create(r.config.OutputFileName); err != nil {
			return nil, err
		}
	}

	// Pick the parser that understands the input codec.
	switch r.config.InputCodec {
	case Mjpeg:
		r.parser = parser.NewMJPEGParser(mjpegParserInChanLen)
	case H264:
		r.Log(Info, "Using H264 parser!")
		r.parser = parser.NewH264Parser()
	}

	// Route parser output either directly to packClips or via the TS generator.
	r.outputChan = make(chan []byte, 10000)
	switch r.config.Packetization {
	case Mpegts:
		fps, _ := strconv.Atoi(r.config.FrameRate)
		r.generator = tsgenerator.NewTsGenerator(uint(fps))
		r.parser.SetOutputChan(r.generator.GetNalInputChan())
		r.generator.Start()
	case None:
		r.parser.SetOutputChan(r.outputChan)
	}

	r.parser.Start()
	go r.packClips()

	r.Log(Info, "New revid instance created! config is:")
	r.Log(Info, fmt.Sprintf("%v", r.config))
	return r, nil
}
// GetConfigRef exposes the revidInst's live Config by address, so writes made
// through the returned pointer change the instance's configuration directly.
func (r *revidInst) GetConfigRef() *Config {
	cfg := &r.config
	return cfg
}
// ChangeState validates config, fills in defaults for settings left unset
// and, if everything checks out, installs the result as the revidInst's
// current configuration.
//
// The Logger from config is adopted immediately, even when validation later
// fails, so that the warnings logged while defaulting reach it. On error the
// rest of the current configuration is left untouched.
//
// It returns an error describing the first invalid setting found, or nil.
func (r *revidInst) ChangeState(config Config) error {
	r.config.Logger = config.Logger

	switch config.Input {
	case Rtp, Raspivid, File:
	case NothingDefined:
		r.Log(Warning, "No input type defined, defaulting to raspivid!")
		config.Input = Raspivid
	default:
		return errors.New("Bad input type defined in config!")
	}

	switch config.InputCodec {
	case H264:
		if config.Bitrate != "" && config.Quantization != "" {
			// Exactly one of bitrate and quantization must be non-zero. Values
			// that fail to parse are left for the numeric validation further
			// down, which reports them with a precise message.
			bitrate, bitrateErr := strconv.Atoi(config.Bitrate)
			quantization, quantErr := strconv.Atoi(config.Quantization)
			if bitrateErr == nil && quantErr == nil &&
				((bitrate > 0 && quantization > 0) || (bitrate == 0 && quantization == 0)) {
				return errors.New("Bad bitrate and quantization combination for H264 input!")
			}
		}
	case Mjpeg:
		if config.Quantization != "" {
			// The parse error is not reported here: a malformed value that
			// gets past this check still fails the numeric validation below.
			quantization, _ := strconv.Atoi(config.Quantization)
			if quantization > 0 || config.Bitrate == "" {
				return errors.New("Bad bitrate or quantization for mjpeg input!")
			}
		}
	case NothingDefined:
		r.Log(Warning, "No input codec defined, defaulting to h264!")
		config.InputCodec = H264
		r.Log(Warning, "Defaulting bitrate to 0 and quantization to 35!")
		config.Bitrate = defaultBitrate
		config.Quantization = defaultQuantization
	default:
		return errors.New("Bad input codec defined in config!")
	}

	switch config.Output {
	case HttpOut, File:
	case Rtmp:
		switch config.RtmpEncodingMethod {
		case Revid, Ffmpeg:
		case NothingDefined:
			r.Log(Warning, "No RTMP encoding method defined, defautling to ffmpeg!")
			config.RtmpEncodingMethod = Ffmpeg
		default:
			// Unknown methods used to be accepted silently.
			return errors.New("Bad RTMP encoding method defined in config!")
		}
		if config.RtmpUrl == "" {
			return errors.New("Bad RTMP URL")
		}
	case NothingDefined:
		r.Log(Warning, "No output defined, defaulting to httpOut!")
		config.Output = HttpOut
	default:
		return errors.New("Bad output type defined in config!")
	}

	switch config.Packetization {
	case None, Mpegts:
	case NothingDefined:
		r.Log(Warning, "No packetization option defined, defaulting to none!")
		config.Packetization = None
	default:
		return errors.New("Bad packetization option defined in config!")
	}

	// FramesPerClip is unsigned, so "unset" (zero) is the only case to handle.
	if config.FramesPerClip == 0 {
		r.Log(Warning, "No frames per clip define, defaulting to 1!")
		config.FramesPerClip = 1
	}

	// Settings carried as decimal strings. An empty setting takes its default;
	// anything else must parse as a non-negative integer no greater than max.
	numericFields := []struct {
		value    *string
		fallback string
		max      int    // inclusive upper bound; negative means unbounded
		unset    string // warning logged when the default is applied
		bad      string // error text when the value is invalid
	}{
		{&config.Width, defaultWidth, -1, "No width defined, defaulting to 1280!", "Bad width defined in config!"},
		{&config.Height, defaultHeight, -1, "No height defined, defaulting to 720!", "Bad height defined in config!"},
		{&config.FrameRate, defaultFrameRate, -1, "No frame rate defined, defaulting to 25!", "Bad frame rate defined in config!"},
		{&config.Timeout, defaultTimeout, -1, "No timeout defined, defaulting to 0!", "Bad timeout defined in config!"},
		{&config.IntraRefreshPeriod, defaultIntraRefreshPeriod, -1, "No intra refresh defined, defaulting to 100!", "Bad intra refresh defined in config!"},
		{&config.Quantization, defaultQuantization, 51, "No quantization defined, defaulting to 35!", "Bad quantization defined in config!"},
		// Bitrate is handed to raspivid verbatim, so it must never be left
		// empty or non-numeric.
		{&config.Bitrate, defaultBitrate, -1, "No bitrate defined, defaulting to 0!", "Bad bitrate defined in config!"},
	}
	for _, f := range numericFields {
		if *f.value == "" {
			r.Log(Warning, f.unset)
			*f.value = f.fallback
			continue
		}
		n, err := strconv.Atoi(*f.value)
		if err != nil || n < 0 || (f.max >= 0 && n > f.max) {
			return errors.New(f.bad)
		}
	}

	r.config = config
	return nil
}
// Log forwards message m, tagged with logType, to the Logger held in the
// revidInst's config. When no Logger is configured the message is printed to
// stdout as "<logType>: <m>" instead.
func (r *revidInst) Log(logType, m string) {
	if logger := r.config.Logger; logger != nil {
		logger.Log(logType, m)
		return
	}
	fmt.Println(logType + ": " + m)
}
// IsRunning reports whether the revidInst is currently running, i.e. Start
// has succeeded and Stop has not been called since.
func (r *revidInst) IsRunning() bool {
	running := r.isRunning
	return running
}
// Start begins processing video from the configured input and delivering it
// to the configured output. Calling Start on a running instance only logs a
// warning.
//
// For Rtmp output an ffmpeg process is launched to receive the stream on its
// stdin; for Raspivid input a raspivid process is launched and its stdout is
// read. If any of this setup fails the error is logged, whatever was already
// launched is torn down, and the instance stays stopped.
func (r *revidInst) Start() {
	if r.isRunning {
		r.Log(Warning, "revidInst.Start() called but revid already running!")
		return
	}
	r.Log(Info, "Starting Revid!")

	// Configure output
	if r.config.Output == Rtmp {
		r.ffmpegCmd = exec.Command(ffmpegPath,
			"-f", "h264",
			"-r", r.config.FrameRate,
			"-i", "-",
			"-f", "lavfi",
			"-i", "aevalsrc=0",
			"-fflags", "nobuffer",
			"-vcodec", "copy",
			"-acodec", "aac",
			"-map", "0:0",
			"-map", "1:0",
			"-strict", "experimental",
			"-f", "flv",
			r.config.RtmpUrl,
		)
		var err error
		r.ffmpegStdin, err = r.ffmpegCmd.StdinPipe()
		if err != nil {
			r.Log(Error, err.Error())
			return
		}
		if err = r.ffmpegCmd.Start(); err != nil {
			r.Log(Error, err.Error())
			return
		}
	}

	// abortOutput tears down the ffmpeg process launched above when input
	// setup fails, so that a failed Start does not leave it running. Errors
	// are ignored: this is best-effort cleanup on an already failing path.
	abortOutput := func() {
		if r.config.Output != Rtmp {
			return
		}
		r.ffmpegStdin.Close()
		r.ffmpegCmd.Process.Kill()
		r.ffmpegCmd.Wait()
	}

	// Configure input
	if r.config.Input == Raspivid {
		r.Log(Info, "Starting raspivid!")
		switch r.config.InputCodec {
		case H264:
			r.cmd = exec.Command("raspivid",
				"-cd", "H264",
				"-o", "-",
				"-n",
				"-t", r.config.Timeout,
				"-b", r.config.Bitrate,
				"-qp", r.config.Quantization,
				"-w", r.config.Width,
				"-h", r.config.Height,
				"-fps", r.config.FrameRate,
				"-ih",
				"-g", r.config.IntraRefreshPeriod,
			)
		case Mjpeg:
			r.cmd = exec.Command("raspivid",
				"-cd", "MJPEG",
				"-o", "-",
				"-n",
				"-t", r.config.Timeout,
				"-fps", r.config.FrameRate,
			)
		default:
			// Without a command there is nothing to start; bail out rather
			// than dereference a nil (or stale) r.cmd below.
			r.Log(Error, "Bad input codec defined in config!")
			abortOutput()
			return
		}
		stdout, err := r.cmd.StdoutPipe()
		if err != nil {
			r.Log(Error, err.Error())
			abortOutput()
			return
		}
		if err = r.cmd.Start(); err != nil {
			r.Log(Error, err.Error())
			abortOutput()
			return
		}
		r.inputReader = bufio.NewReader(stdout)
	}

	// The worker loops poll isRunning, so it must be true before any of them
	// is launched; otherwise a worker scheduled early would exit immediately.
	r.isRunning = true
	switch r.config.Input {
	case Raspivid:
		go r.readCamera()
	case File:
		go r.readFile()
	}
	go r.outputClips()
}
// readCamera feeds the parser, one byte at a time, with data read from the
// camera process while the revidInst is running.
//
// On end of input it logs an error and waits five seconds before trying
// again; other read errors are logged and the read is retried. Nothing is
// sent to the parser for a failed read.
func (r *revidInst) readCamera() {
	r.Log(Info, "Reading camera data!")
	for r.isRunning {
		b, err := r.inputReader.ReadByte()
		if err != nil {
			if !r.isRunning {
				// The read failed because we are shutting down; there is no
				// byte to forward and nothing worth reporting.
				break
			}
			if err == io.EOF {
				r.Log(Error, "No data from camera!")
				time.Sleep(5 * time.Second)
				continue
			}
			r.Log(Error, err.Error())
			continue
		}
		r.parser.GetInputChan() <- b
	}
	r.Log(Info, "Out of reading routine!")
}
// readFile repeatedly feeds the whole of the configured input file to the
// parser, one byte at a time, starting a new pass each time the parser's
// input channel has drained. It returns once the revidInst is no longer
// running.
//
// If the file cannot be opened, inspected or read, the error is logged, the
// revidInst is stopped and readFile returns.
func (r *revidInst) readFile() {
	for {
		if len(r.parser.GetInputChan()) > 0 {
			// The previous pass has not been consumed yet. Wait briefly
			// instead of spinning on the channel length.
			if !r.isRunning {
				return
			}
			time.Sleep(time.Millisecond)
			continue
		}

		f, err := os.Open(r.config.InputFileName)
		if err != nil {
			r.Log(Error, err.Error())
			r.Stop()
			return
		}
		r.inputFile = f
		stats, err := f.Stat()
		if err != nil {
			f.Close()
			r.Log(Error, "Could not get input file stats!")
			r.Stop()
			return
		}
		data := make([]byte, stats.Size())
		// ReadFull keeps reading until data is full; a single Read is allowed
		// to return fewer bytes than requested.
		_, err = io.ReadFull(f, data)
		f.Close()
		if err != nil {
			r.Log(Error, err.Error())
			r.Stop()
			return
		}

		for _, b := range data {
			r.parser.GetInputChan() <- b
		}

		// The flag is tested after a pass rather than before it, so a
		// goroutine that gets scheduled just before the flag is raised still
		// does its work.
		if !r.isRunning {
			return
		}
	}
}
// Stop halts processing: it clears the running flag polled by the worker
// loops, kills the camera process if one was started, and closes ffmpeg's
// stdin if Rtmp output was set up. Calling Stop on an instance that is not
// running only logs a warning.
func (r *revidInst) Stop() {
	if !r.isRunning {
		r.Log(Warning, "revidInst.Stop() called but revid not running!")
		return
	}
	r.Log(Info, "Stopping revid!")
	r.isRunning = false

	// Process is nil when the command was created but never started.
	if r.cmd != nil && r.cmd.Process != nil {
		if err := r.cmd.Process.Kill(); err != nil {
			r.Log(Warning, err.Error())
		}
	}

	// Closing its stdin lets ffmpeg see end of input and exit rather than
	// lingering after the stop. The field is deliberately not set to nil: the
	// output loop may still be about to write to it, and a write to the
	// closed pipe merely returns an error.
	if r.ffmpegStdin != nil {
		if err := r.ffmpegStdin.Close(); err != nil {
			r.Log(Warning, err.Error())
		}
	}
}
// packClips gathers data segments, either frames from the parser
// (packetization None) or MPEG-TS packets from the generator (packetization
// Mpegts), into clips held in the ring buffer. A clip is handed over for
// output once it contains FramesPerClip segments, or earlier if the next
// segment would not fit in the clip's buffer.
//
// When no ring buffer element is available the pending segments are
// discarded and the loop waits a second before trying again. packClips never
// returns.
func (r *revidInst) packClips() {
	clipSize := 0
	// Unsigned to match Config.FramesPerClip, which it is compared against.
	var packetCount uint
	for {
		clip, err := r.ringBuffer.Get()
		if err != nil {
			r.Log(Error, err.Error())
			switch r.config.Packetization {
			case None:
				r.Log(Warning, "Clearing mjpeg chan!")
				for len(r.outputChan) > 0 {
					<-(r.outputChan)
				}
			case Mpegts:
				r.Log(Warning, "Clearing TS chan!")
				for len(r.generator.GetTsOutputChan()) > 0 {
					<-(r.generator.GetTsOutputChan())
				}
			}
			time.Sleep(1 * time.Second)
			continue
		}

		for {
			// fits is cleared when a segment would overrun the clip's buffer;
			// slicing past the capacity would panic.
			fits := true
			switch r.config.Packetization {
			case None:
				frame := <-r.outputChan
				upperBound := clipSize + len(frame)
				if fits = upperBound <= cap(clip); fits {
					copy(clip[clipSize:upperBound], frame)
					packetCount++
					clipSize = upperBound
				}
			case Mpegts:
				tsPacket := <-(r.generator.GetTsOutputChan())
				tsByteSlice, err := tsPacket.ToByteSlice()
				if err != nil {
					// Skip the packet: there are no valid bytes to pack.
					r.Log(Error, err.Error())
					continue
				}
				upperBound := clipSize + mp2tPacketSize
				if fits = upperBound <= cap(clip); fits {
					copy(clip[clipSize:upperBound], tsByteSlice)
					packetCount++
					clipSize = upperBound
				}
			}
			if !fits {
				r.Log(Warning, "Segment does not fit in clip buffer, dropping it!")
			}

			// Send once the clip holds FramesPerClip segments, or when it is
			// full. A segment too large for an empty clip is simply dropped.
			if packetCount >= r.config.FramesPerClip || (!fits && clipSize > 0) {
				if err := r.ringBuffer.DoneWriting(clipSize); err != nil {
					r.Log(Error, err.Error())
					r.Log(Warning, "Dropping clip!")
				}
				clipSize = 0
				packetCount = 0
				break
			}
		}
	}
}
// outputClips takes the clips produced by packClips from the ring buffer and
// delivers them to the output selected in the revidInst config, for as long
// as the revidInst is running.
//
// File output writes each clip to the output file, HTTP output posts it
// (retrying until the post succeeds or the instance is stopped) and RTMP
// output writes it to ffmpeg's stdin. Roughly every bitrateTime seconds the
// bitrate of the HTTP output and the ring buffer occupancy are logged.
func (r *revidInst) outputClips() {
	prevTime := time.Now()
	sentBytes := 0
	delay := 0
	for r.isRunning {
		// Here we slow things down as much as we can to decrease cpu usage
		switch {
		case r.ringBuffer.GetNoOfElements() < 2:
			delay++
			time.Sleep(time.Duration(delay) * time.Millisecond)
		case delay > 10:
			delay -= 10
		}

		// If the ringbuffer has something we can read and send off
		clip, err := r.ringBuffer.Read()
		if err != nil {
			continue
		}
		r.Log(Debug, fmt.Sprintf("Delay is: %v\n", delay))
		r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements()))

		// Output clip to the output specified in the configuration struct
		switch r.config.Output {
		case File:
			if _, err := r.outputFile.Write(clip); err != nil {
				r.Log(Error, err.Error())
			}
		case HttpOut:
			sentBytes += len(clip)
			// Keep posting until it works, but give up once stopped so that
			// an unreachable server cannot keep this goroutine alive forever.
			for r.isRunning {
				err := r.sendClipToHTTP(clip, r.config.HttpAddress)
				if err == nil {
					break
				}
				r.Log(Error, err.Error())
				r.Log(Warning, "Post failed trying again!")
			}
		case Rtmp:
			r.Log(Debug, "Outputting!")
			if _, err := r.ffmpegStdin.Write(clip); err != nil {
				r.Log(Error, err.Error())
			}
		default:
			r.Log(Error, "No output defined!")
		}
		if err := r.ringBuffer.DoneReading(); err != nil {
			r.Log(Error, err.Error())
		}

		// Log some information regarding bitrate and ring buffer size if it's time
		now := time.Now()
		deltaTime := now.Sub(prevTime)
		if deltaTime > time.Duration(bitrateTime)*time.Second {
			r.Log(Info, fmt.Sprintf("Bitrate: %v bits/s\n", int64(float64(sentBytes*8)/deltaTime.Seconds())))
			r.Log(Info, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements()))
			prevTime = now
			sentBytes = 0
		}
	}
}
// sendClipToHTTP posts clip, with content type video/mp2t, to the URL formed
// by appending the clip's length in bytes to output. The request is subject
// to a timeout of httpTimeOut seconds.
//
// It returns an error only if the post itself fails. A response with a
// non-2xx status is logged as a warning but still counts as delivered, and a
// failure to read the response body is logged without being returned.
func (r *revidInst) sendClipToHTTP(clip []byte, output string) error {
	client := http.Client{
		Timeout: httpTimeOut * time.Second,
	}
	url := output + strconv.Itoa(len(clip))
	r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip)))
	resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer
	if err != nil {
		return fmt.Errorf("Error posting to %s: %s", output, err)
	}
	defer resp.Body.Close()

	// Surface rejections by the server, which would otherwise leave no trace.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		r.Log(Warning, fmt.Sprintf("Post to %s returned status %s", url, resp.Status))
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		r.Log(Error, err.Error())
		return nil
	}
	r.Log(Debug, fmt.Sprintf("%s\n", body))
	return nil
}