av/revid/RevidInstance.go

686 lines
18 KiB
Go

/*
NAME
revid - a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols.
DESCRIPTION
See Readme.md
AUTHORS
Alan Noble <anoble@gmail.com>
Saxon A. Nelson-Milton <saxon.milton@gmail.com>
LICENSE
revid is Copyright (C) 2017 Alan Noble.
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
// revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols.
package revid
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"os/exec"
"strconv"
"time"
//"bitbucket.org/ausocean/av/parser"
//"bitbucket.org/ausocean/av/tsgenerator"
"../parser"
"../tsgenerator"
//"bitbucket.org/ausocean/av/ringbuffer"
//"bitbucket.org/ausocean/utils/smartLogger"
"../../utils/smartLogger"
"../ringbuffer"
)
// Misc constants
const (
clipDuration = 1 // s
mp2tPacketSize = 188 // MPEG-TS packet size
mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000
ringBufferSize = 100 / clipDuration
ringBufferElementSize = 10000000
maxClipSize = 100000
httpTimeOut = 5 // s
packetsPerFrame = 7
h264BufferSize = 1000000
bitrateTime = 60
mjpegParserInChanLen = 100000
ffmpegPath = "/home/saxon/bin/ffmpeg"
)
// Log Types
const (
Error = "Error"
Warning = "Warning"
Info = "Info"
Debug = "Debug"
)
// Config provides parameters relevant to a revid instance. A new config must
// be passed to the constructor.
type Config struct {
Input uint8
InputCodec uint8
Output uint8
RtmpEncodingMethod uint8
FramesPerClip uint
RtmpUrl string
Bitrate string
OutputFileName string
InputFileName string
Height string
Width string
FrameRate string
HttpAddress string
Quantization string
Timeout string
Packetization uint8
IntraRefreshPeriod string
Logger smartLogger.LogInstance
}
// Enums for config struct
const (
NothingDefined = 0
Raspivid = 1
Rtp = 2
H264Codec = 3
File = 4
HttpOut = 5
H264 = 6
Mjpeg = 7
None = 8
Mpegts = 9
Rtmp = 10
Ffmpeg = 11
Revid = 12
Flv = 13
)
// Default config settings
const (
defaultFrameRate = "25"
defaultWidth = "1280"
defaultHeight = "720"
defaultIntraRefreshPeriod = "100"
defaultTimeout = "0"
defaultQuantization = "35"
defaultBitrate = "0"
)
// RevidInst provides methods to control a revidInst session; providing methods
// to start, stop and change the state of an instance using the Config struct.
type RevidInst interface {
Start()
Stop()
ChangeState(newconfig Config) error
GetConfigRef() *Config
Log(logType, m string)
IsRunning() bool
}
// The revidInst struct provides fields to describe the state of a RevidInst.
type revidInst struct {
ffmpegPath string
tempDir string
ringBuffer ringbuffer.RingBuffer
config Config
isRunning bool
outputFile *os.File
inputFile *os.File
generator generator.TsGenerator
parser parser.Parser
cmd *exec.Cmd
ffmpegCmd *exec.Cmd
inputReader *bufio.Reader
ffmpegStdin io.WriteCloser
outputChan chan []byte
configureOutput func()
getFrame func()[]byte
flushData func()
}
// NewRevidInstance returns a pointer to a new revidInst with the desired
// configuration, and/or an error if construction of the new instant was not
// successful.
func NewRevidInstance(config Config) (r *revidInst, err error) {
r = new(revidInst)
r.ringBuffer = ringbuffer.NewRingBuffer(ringBufferSize, ringBufferElementSize)
err = r.ChangeState(config)
if err != nil {
return nil, err
}
r.outputChan = make(chan []byte, 10000)
r.parser.Start()
go r.packClips()
r.Log(Info, "New revid instance created! config is:")
r.Log(Info, fmt.Sprintf("%v", r.config))
return
}
// GetConfigRef returns a pointer to the revidInst's Config struct object
func (r *revidInst) GetConfigRef() *Config {
return &r.config
}
// ChangeState swaps the current config of a revidInst with the passed
// configuration; checking validity and returning errors if not valid.
func (r *revidInst) ChangeState(config Config) error {
r.config.Logger = config.Logger
err := checkConfig(config)
if err != nil {
return errors.New("Config struct is bad!: " + err.Error())
}
switch r.config.Output {
case File:
r.outputFile, err = os.Create(r.config.OutputFileName)
if err != nil {
return nil, err
}
configureOutput = configureOutputForFile
case Rtmp:
configureOutput = configureOutputForRtmp
}
switch r.config.Input {
case Raspivid:
configureInput = configureInputForRaspivid
case File:
configureInput = configureInputForFile
}
switch r.config.InputCodec {
case H264:
r.Log(Info, "Using H264 parser!")
r.parser = parser.NewH264Parser()
case Mjpeg:
r.Log(Info, "Using MJPEG parser!")
r.parser = parser.NewMJPEGParser(mjpegParserInChanLen)
}
switch r.config.Packetization {
case None:
r.parser.SetOutputChan(r.outputChan)
getFrame = getFrameForNoPacketization
case Mpegts:
frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate)
r.generator = tsgenerator.NewTsGenerator(uint(frameRateAsInt))
r.parser.SetOutputChan(r.generator.GetNalInputChan())
r.generator.Start()
getFrame = getFrameForMpegtsPacketization
}
r.config = config
return nil
}
// Log takes a logtype and message and tries to send this information to the
// logger provided in the revidInst config - if there is one, otherwise the message
// is sent to stdout
func (r *revidInst) Log(logType, m string) {
if r.config.Logger != nil {
r.config.Logger.Log(logType, m)
} else {
fmt.Println(logType + ": " + m)
}
}
// IsRunning returns true if the revidInst is currently running and false otherwise
func (r *revidInst) IsRunning() bool {
return r.isRunning
}
// Start invokes a revidInst to start processing video from a defined input
// and packetising to a defined output.
func (r *revidInst) Start() {
if r.isRunning {
r.Log(Warning, "revidInst.Start() called but revid already running!")
return
}
r.Log(Info, "Starting Revid!")
r.setupOutput()
r.setupInput()
go r.outputClips()
r.isRunning = true
}
// Stop halts any processing of video data from a camera or file
func (r *revidInst) Stop() {
if r.isRunning {
r.Log(Info, "Stopping revid!")
r.isRunning = false
if r.cmd != nil {
r.cmd.Process.Kill()
}
} else {
r.Log(Warning, "revidInst.Stop() called but revid not running!")
}
}
// Start invokes a revidInst to start processing video from a defined input
// and packetising to a defined output.
func (r *revidInst) getFrameNoPacketization() []byte {
return <-r.outputChan
}
// Start invokes a revidInst to start processing video from a defined input
// and packetising to a defined output.
func (r *revidInst) getFrameWithPacketization() []byte {
return <-(r.generator.GetOutputChan())
}
// Start invokes a revidInst to start processing video from a defined input
// and packetising to a defined output.
func (r *revidInst) flushDataNoPacketisation(){
for len(r.outputChan) > 0 {
<-(r.outputChan)
}
}
// Start invokes a revidInst to start processing video from a defined input
func (r *revidInst) flushDataMpegtsPacketisation(){
for len(r.generator.GetOutputChan()) > 0 {
<-(r.generator.GetTsOutputChan())
}
}
// packClips takes data segments; whether that be tsPackets or jpeg frames and
// packs them into clips 1s long.
func (r *revidInst) packClips() {
clipSize := 0
packetCount := 0
now := time.Now()
prevTime := now
for {
if clip, err := r.ringBuffer.Get(); err != nil {
r.Log(Error, err.Error())
r.Log(Warning, "Clearing output chan!")
r.flushOutputData()
} else {
for {
frame := r.getFrame()
lenOfFrame := len(frame)
upperBound := clipSize + lenOfFrame
copy(clip[clipSize:upperBound], frame)
packetCount++
clipSize += lenOfFrame
now = time.Now()
if packetCount >= r.config.FramesPerClip {
if err := r.ringBuffer.DoneWriting(clipSize); err != nil {
r.Log(Error, err.Error())
r.Log(Warning, "Dropping clip!")
}
clipSize = 0
packetCount = 0
prevTime = now
break
}
}
}
}
}
// outputClips takes the clips produced in the packClips method and outputs them
// to the desired output defined in the revidInst config
func (r *revidInst) outputClips() {
now := time.Now()
prevTime := now
bytes := 0
delay := 0
for r.isRunning {
// Here we slow things down as much as we can to decrease cpu usage
switch {
case r.ringBuffer.GetNoOfElements() < 2:
delay++
time.Sleep(time.Duration(delay) * time.Millisecond)
case delay > 10:
delay -= 10
}
// If the ringbuffer has something we can read and send off
if clip, err := r.ringBuffer.Read(); err == nil {
r.Log(Debug, fmt.Sprintf("Delay is: %v\n", delay))
r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements()))
// Output clip to the output specified in the configuration struct
bytes += len(clip)
for err := r.sendClip(clip); err != nil; {
r.Log(Error, err.Error())
r.Log(Warning, "Send failed trying again!")
err = r.sendClip(clip)
}
if err := r.ringBuffer.DoneReading(); err != nil {
r.Log(Error, err.Error())
}
// Log some information regarding bitrate and ring buffer size if it's time
now = time.Now()
deltaTime := now.Sub(prevTime)
if deltaTime > time.Duration(bitrateTime)*time.Second {
r.Log(Info, fmt.Sprintf("Bitrate: %v bits/s\n", int64(float64(bytes*8)/float64(deltaTime/1e9))))
r.Log(Info, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements()))
prevTime = now
bytes = 0
}
}
}
}
// Start invokes a revidInst to start processing video from a defined input
// and packetising to a defined output.
func (r *revidInst) sendClipToFile(clip []byte) error {
err := r.outputFile.Write(clip)
if err != nil {
return err
}
return nil
}
// sendClipToHTTP takes a clip and an output url and posts through http.
func (r *revidInst) sendClipToHTTP(clip []byte) error {
timeout := time.Duration(httpTimeOut * time.Second)
client := http.Client{
Timeout: timeout,
}
r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip)))
resp, err := client.Post(r.config.HttpAddress + strconv.Itoa(len(clip)), "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer
if err != nil {
return fmt.Errorf("Error posting to %s: %s", output, err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err == nil {
r.Log(Debug, fmt.Sprintf("%s\n", body))
} else {
r.Log(Error, err.Error())
}
return nil
}
// Start invokes a revidInst to start processing video from a defined input
// and packetising to a defined output.
func (r *revidInst) sendClipToRtmp(clip []byte) error {
fmt.Println("Outputting!")
_, err := r.ffmpegStdin.Write(clip)
if err != nil {
return err
}
return nil
}
// checkConfig checks for any errors in the config files and defaults settings
// if particular parameters have not been defined.
func (r *revidInst) checkConfig(config Config) error {
switch config.Input {
case Rtp:
case Raspivid:
case File:
case NothingDefined:
r.Log(Warning, "No input type defined, defaulting to raspivid!")
config.Input = Raspivid
default:
return errors.New("Bad input type defined in config!")
}
switch config.InputCodec {
case H264:
if config.Bitrate != "" && config.Quantization != "" {
bitrate, _ := strconv.Atoi(config.Bitrate)
quantization, _ := strconv.Atoi(config.Quantization)
if (bitrate > 0 && quantization > 0) || (bitrate == 0 && quantization == 0) {
return errors.New("Bad bitrate and quantization combination for H264 input!")
}
}
case Mjpeg:
if config.Quantization != "" {
quantization, _ := strconv.Atoi(config.Quantization)
if quantization > 0 || config.Bitrate == "" {
return errors.New("Bad bitrate or quantization for mjpeg input!")
}
}
case NothingDefined:
r.Log(Warning, "No input codec defined, defaulting to h264!")
config.InputCodec = H264
r.Log(Warning, "Defaulting bitrate to 0 and quantization to 35!")
config.Bitrate = defaultBitrate
config.Quantization = defaultQuantization
default:
return errors.New("Bad input codec defined in config!")
}
switch config.Output {
case HttpOut:
case File:
case Rtmp:
switch config.RtmpEncodingMethod {
case Revid:
case Ffmpeg:
case NothingDefined:
r.Log(Warning, "No RTMP encoding method defined, defautling to ffmpeg!")
config.RtmpEncodingMethod = Ffmpeg
}
if config.RtmpUrl == "" {
return errors.New("Bad RTMP URL")
}
case NothingDefined:
r.Log(Warning, "No output defined, defaulting to httpOut!")
config.Output = HttpOut
default:
return errors.New("Bad output type defined in config!")
}
switch config.Packetization {
case None:
case Mpegts:
case NothingDefined:
r.Log(Warning, "No packetization option defined, defaulting to none!")
config.Packetization = None
default:
return errors.New("Bad packetization option defined in config!")
}
switch {
case config.FramesPerClip > 0:
case config.FramesPerClip == 0:
r.Log(Warning, "No frames per clip defined, defaulting to 1!")
config.FramesPerClip = 1
case config.FramesPerClip < 0:
return errors.New("Bad frames per clip given!")
}
if config.Width == "" {
r.Log(Warning, "No width defined, defaulting to 1280!")
config.Width = defaultWidth
} else {
if integer, err := strconv.Atoi(config.Width); integer < 0 || err != nil {
return errors.New("Bad width defined in config!")
}
}
if config.Height == "" {
r.Log(Warning, "No height defined, defaulting to 720!")
config.Height = defaultHeight
} else {
if integer, err := strconv.Atoi(config.Height); integer < 0 || err != nil {
return errors.New("Bad height defined in config!")
}
}
if config.FrameRate == "" {
r.Log(Warning, "No frame rate defined, defaulting to 25!")
config.FrameRate = defaultFrameRate
} else {
if integer, err := strconv.Atoi(config.FrameRate); integer < 0 || err != nil {
return errors.New("Bad frame rate defined in config!")
}
}
if config.Timeout == "" {
r.Log(Warning, "No timeout defined, defaulting to 0!")
config.Timeout = defaultTimeout
} else {
if integer, err := strconv.Atoi(config.Timeout); integer < 0 || err != nil {
return errors.New("Bad timeout defined in config!")
}
}
if config.IntraRefreshPeriod == "" {
r.Log(Warning, "No intra refresh defined, defaulting to 100!")
config.IntraRefreshPeriod = defaultIntraRefreshPeriod
} else {
if integer, err := strconv.Atoi(config.IntraRefreshPeriod); integer < 0 || err != nil {
return errors.New("Bad intra refresh defined in config!")
}
}
if config.Quantization == "" {
r.Log(Warning, "No quantization defined, defaulting to 35!")
config.Quantization = defaultQuantization
} else {
if integer, err := strconv.Atoi(config.Quantization); integer < 0 || integer > 51 || err != nil {
return errors.New("Bad quantization defined in config!")
}
}
}
// Start invokes a revidInst to start processing video from a defined input
// and packetising to a defined output.
func (r *revidInst) setupOutputForRtmp(){
r.ffmpegCmd = exec.Command(ffmpegPath,
"-f", "h264",
"-r", r.config.FrameRate,
"-i", "-",
"-f", "lavfi",
"-i", "aevalsrc=0",
"-fflags", "nobuffer",
"-vcodec", "copy",
"-acodec", "aac",
"-map", "0:0",
"-map", "1:0",
"-strict", "experimental",
"-f", "flv",
r.config.RtmpUrl,
)
var err error
r.ffmpegStdin, err = r.ffmpegCmd.StdinPipe()
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return
}
err = r.ffmpegCmd.Start()
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return
}
}
// Start invokes a revidInst to start processing video from a defined input
// and packetising to a defined output.
func (r *revidInst) setupInputForRaspivid(){
r.Log(Info, "Starting raspivid!")
switch r.config.InputCodec {
case H264:
r.cmd = exec.Command("raspivid",
"-cd", "H264",
"-o", "-",
"-n",
"-t", r.config.Timeout,
"-b", r.config.Bitrate,
"-qp", r.config.Quantization,
"-w", r.config.Width,
"-h", r.config.Height,
"-fps", r.config.FrameRate,
"-ih",
"-g", r.config.IntraRefreshPeriod,
)
case Mjpeg:
r.cmd = exec.Command("raspivid",
"-cd", "MJPEG",
"-o", "-",
"-n",
"-t", r.config.Timeout,
"-fps", r.config.FrameRate,
)
}
stdout, _ := r.cmd.StdoutPipe()
err := r.cmd.Start()
r.inputReader = bufio.NewReader(stdout)
if err != nil {
r.Log(Error, err.Error())
return
}
go r.readCamera()
}
// Start invokes a revidInst to start processing video from a defined input
func (r *revidInst) setupInputForFile(){
go r.readFile()
}
// readCamera reads data from the defined camera while the revidInst is running.
func (r *revidInst) readCamera() {
r.Log(Info, "Reading camera data!")
for r.isRunning {
data := make([]byte, 1)
_, err := io.ReadFull(r.inputReader, data)
switch {
case err != nil && err.Error() == "EOF" && r.isRunning:
r.Log(Error, "No data from camera!")
time.Sleep(5 * time.Second)
case err != nil && r.isRunning:
r.Log(Error, err.Error())
default:
r.parser.GetInputChan() <- data[0]
}
}
r.Log(Info, "Out of reading routine!")
}
// readFile reads data from the defined file while the revidInst is running.
func (r *revidInst) readFile() {
for {
if len(r.parser.GetInputChan()) == 0 {
var err error
r.inputFile, err = os.Open(r.config.InputFileName)
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return
}
stats, err := r.inputFile.Stat()
if err != nil {
r.Log(Error, "Could not get input file stats!")
r.Stop()
return
}
data := make([]byte, stats.Size())
_, err = r.inputFile.Read(data)
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return
}
for i := range data {
r.parser.GetInputChan() <- data[i]
}
r.inputFile.Close()
}
}
}