Merged in revid/loadsender (pull request #28)

revid: use loadSender for clip sending
This commit is contained in:
kortschak 2018-06-09 05:17:08 +00:00
commit b46ed2a954
4 changed files with 430 additions and 281 deletions

View File

@ -95,7 +95,7 @@ const (
// Globals
var (
revidInst revid.Revid
revidInst *revid.Revid
config revid.Config
)
@ -341,7 +341,7 @@ func startRevid() {
func createRevidInstance() {
// Try to create the revid instance with the given config
var err error
for revidInst, err = revid.NewRevid(config); err != nil; {
for revidInst, err = revid.New(config); err != nil; {
// If the config does have a logger, use it to output error, otherwise
// just output to std output
if config.Logger != nil {

View File

@ -104,46 +104,46 @@ const (
// Validate checks for any errors in the config fields and defaults settings
// if particular parameters have not been defined.
func (config *Config) Validate(r *revid) error {
switch config.Verbosity {
func (c *Config) Validate(r *Revid) error {
switch c.Verbosity {
case Yes:
case No:
case NothingDefined:
config.Verbosity = Yes
c.Verbosity = Yes
r.Log(Warning, "No verbosity mode defined, defaulting to no Verbosity!")
default:
return errors.New("Bad Verbosity defined in config!")
}
switch config.QuantizationMode {
switch c.QuantizationMode {
case QuantizationOn:
case QuantizationOff:
case NothingDefined:
r.Log(Warning, "No quantization mode defined, defaulting to QuantizationOff!")
config.QuantizationMode = QuantizationOff
c.QuantizationMode = QuantizationOff
default:
return errors.New("Bad QuantizationMode defined in config!")
}
switch config.Input {
switch c.Input {
case Rtp:
case Raspivid:
case File:
case NothingDefined:
r.Log(Warning, "No input type defined, defaulting to raspivid!")
config.Input = defaultInput
c.Input = defaultInput
default:
return errors.New("Bad input type defined in config!")
}
switch config.InputCodec {
switch c.InputCodec {
case H264:
if config.Bitrate != "" && config.Quantization != "" {
bitrate, err := strconv.Atoi(config.Bitrate)
if c.Bitrate != "" && c.Quantization != "" {
bitrate, err := strconv.Atoi(c.Bitrate)
if err != nil {
return errors.New("Something is wrong with bitrate in conig!")
}
quantization, err := strconv.Atoi(config.Quantization)
quantization, err := strconv.Atoi(c.Quantization)
if err != nil {
return errors.New("Something is wrong with quantization in config!")
}
@ -152,141 +152,141 @@ func (config *Config) Validate(r *revid) error {
}
}
case Mjpeg:
if config.Quantization != "" {
quantization, err := strconv.Atoi(config.Quantization)
if c.Quantization != "" {
quantization, err := strconv.Atoi(c.Quantization)
if err != nil {
return errors.New("Something is wrong with quantization in config!")
}
if quantization > 0 || config.Bitrate == "" {
if quantization > 0 || c.Bitrate == "" {
return errors.New("Bad bitrate or quantization for mjpeg input!")
}
}
case NothingDefined:
r.Log(Warning, "No input codec defined, defaulting to h264!")
config.InputCodec = H264
c.InputCodec = H264
r.Log(Warning, "Defaulting bitrate to 0 and quantization to 35!")
config.Quantization = defaultQuantization
c.Quantization = defaultQuantization
default:
return errors.New("Bad input codec defined in config!")
}
switch config.Output {
switch c.Output {
case Http:
case File:
case NativeRtmp, FfmpegRtmp:
if config.RtmpUrl == "" {
if c.RtmpUrl == "" {
r.Log(Info, "No RTMP URL: falling back to HTTP")
config.Output = Http
c.Output = Http
break
}
r.Log(Info, "Defaulting frames per clip to 1 for rtmp output!")
config.FramesPerClip = "1"
c.FramesPerClip = "1"
case NothingDefined:
r.Log(Warning, "No output defined, defaulting to httpOut!")
config.Output = defaultOutput
c.Output = defaultOutput
default:
return errors.New("Bad output type defined in config!")
}
switch config.Packetization {
switch c.Packetization {
case None:
case Mpegts:
case Flv:
case NothingDefined:
r.Log(Warning, "No packetization option defined, defaulting to none!")
config.Packetization = Flv
c.Packetization = Flv
default:
return errors.New("Bad packetization option defined in config!")
}
switch config.HorizontalFlip {
switch c.HorizontalFlip {
case Yes:
case No:
case NothingDefined:
r.Log(Warning, "No horizontal flip option defined, defaulting to not flipped!")
config.HorizontalFlip = defaultHorizontalFlip
c.HorizontalFlip = defaultHorizontalFlip
default:
return errors.New("Bad horizontal flip option defined in config!")
}
switch config.VerticalFlip {
switch c.VerticalFlip {
case Yes:
case No:
case NothingDefined:
r.Log(Warning, "No vertical flip option defined, defaulting to not flipped!")
config.VerticalFlip = defaultVerticalFlip
c.VerticalFlip = defaultVerticalFlip
default:
return errors.New("Bad vertical flip option defined in config!")
}
if config.FramesPerClip == "" {
if c.FramesPerClip == "" {
r.Log(Warning, "No FramesPerClip defined defined, defaulting to 1!")
config.Width = defaultFramesPerClip
c.Width = defaultFramesPerClip
} else {
if integer, err := strconv.Atoi(config.FramesPerClip); integer <= 0 || err != nil {
if integer, err := strconv.Atoi(c.FramesPerClip); integer <= 0 || err != nil {
return errors.New("Bad width defined in config!")
}
}
if config.Width == "" {
if c.Width == "" {
r.Log(Warning, "No width defined, defaulting to 1280!")
config.Width = defaultWidth
c.Width = defaultWidth
} else {
if integer, err := strconv.Atoi(config.Width); integer < 0 || err != nil {
if integer, err := strconv.Atoi(c.Width); integer < 0 || err != nil {
return errors.New("Bad width defined in config!")
}
}
if config.Height == "" {
if c.Height == "" {
r.Log(Warning, "No height defined, defaulting to 720!")
config.Height = defaultHeight
c.Height = defaultHeight
} else {
if integer, err := strconv.Atoi(config.Height); integer < 0 || err != nil {
if integer, err := strconv.Atoi(c.Height); integer < 0 || err != nil {
return errors.New("Bad height defined in config!")
}
}
if config.FrameRate == "" {
if c.FrameRate == "" {
r.Log(Warning, "No frame rate defined, defaulting to 25!")
config.FrameRate = defaultFrameRate
c.FrameRate = defaultFrameRate
} else {
if integer, err := strconv.Atoi(config.FrameRate); integer < 0 || err != nil {
if integer, err := strconv.Atoi(c.FrameRate); integer < 0 || err != nil {
return errors.New("Bad frame rate defined in config!")
}
}
if config.Bitrate == "" {
if c.Bitrate == "" {
r.Log(Warning, "No bitrate defined, defaulting!")
config.Bitrate = defaultBitrate
c.Bitrate = defaultBitrate
} else {
if integer, err := strconv.Atoi(config.Bitrate); integer < 0 || err != nil {
if integer, err := strconv.Atoi(c.Bitrate); integer < 0 || err != nil {
return errors.New("Bad bitrate defined in config!")
}
}
if config.Timeout == "" {
if c.Timeout == "" {
r.Log(Warning, "No timeout defined, defaulting to 0!")
config.Timeout = defaultTimeout
c.Timeout = defaultTimeout
} else {
if integer, err := strconv.Atoi(config.Timeout); integer < 0 || err != nil {
if integer, err := strconv.Atoi(c.Timeout); integer < 0 || err != nil {
return errors.New("Bad timeout defined in config!")
}
}
if config.IntraRefreshPeriod == "" {
if c.IntraRefreshPeriod == "" {
r.Log(Warning, "No intra refresh defined, defaulting to 100!")
config.IntraRefreshPeriod = defaultIntraRefreshPeriod
c.IntraRefreshPeriod = defaultIntraRefreshPeriod
} else {
if integer, err := strconv.Atoi(config.IntraRefreshPeriod); integer < 0 || err != nil {
if integer, err := strconv.Atoi(c.IntraRefreshPeriod); integer < 0 || err != nil {
return errors.New("Bad intra refresh defined in config!")
}
}
if config.Quantization == "" {
if c.Quantization == "" {
r.Log(Warning, "No quantization defined, defaulting to 35!")
config.Quantization = defaultQuantization
c.Quantization = defaultQuantization
} else {
if integer, err := strconv.Atoi(config.Quantization); integer < 0 || integer > 51 || err != nil {
if integer, err := strconv.Atoi(c.Quantization); integer < 0 || integer > 51 || err != nil {
return errors.New("Bad quantization defined in config!")
}
}

View File

@ -31,17 +31,13 @@ package revid
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"os/exec"
"path/filepath"
"strconv"
"sync"
"time"
"bitbucket.org/ausocean/av/generator"
@ -86,74 +82,55 @@ const (
// Revid provides methods to control a revid session; providing methods
// to start, stop and change the state of an instance using the Config struct.
type Revid interface {
Start()
Stop()
changeState(newconfig Config) error
GetConfigRef() *Config
Log(logType, m string)
IsRunning() bool
GetBitrate() int64
}
// The revid struct provides fields to describe the state of a Revid.
type revid struct {
type Revid struct {
ffmpegPath string
tempDir string
ringBuffer *ring.Buffer
config Config
isRunning bool
outputFile *os.File
inputFile *os.File
generator generator.Generator
parser parser.Parser
cmd *exec.Cmd
ffmpegCmd *exec.Cmd
inputReader *bufio.Reader
ffmpegStdin io.WriteCloser
outputChan chan []byte
setupInput func() error
setupOutput func() error
getFrame func() []byte
sendClip func(*ring.Chunk) error
destination loadSender
rtmpInst rtmp.Session
mutex sync.Mutex
sendMutex sync.Mutex
currentBitrate int64
}
// NewRevid returns a pointer to a new revid with the desired
// NewRevid returns a pointer to a new Revid with the desired
// configuration, and/or an error if construction of the new instant was not
// successful.
func NewRevid(config Config) (r *revid, err error) {
r = new(revid)
r.mutex = sync.Mutex{}
r.sendMutex = sync.Mutex{}
r.ringBuffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)
err = r.changeState(config)
func New(c Config) (*Revid, error) {
var r Revid
err := r.reset(c)
if err != nil {
r = nil
return
return nil, err
}
r.ringBuffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)
r.outputChan = make(chan []byte, outputChanSize)
r.isRunning = false
return
return &r, nil
}
// Returns the currently saved bitrate from the most recent bitrate check
// check bitrate output delay in consts for this period
func (r *revid) GetBitrate() int64 {
func (r *Revid) GetBitrate() int64 {
return r.currentBitrate
}
// GetConfigRef returns a pointer to the revidInst's Config struct object
func (r *revid) GetConfigRef() *Config {
func (r *Revid) GetConfigRef() *Config {
return &r.config
}
// changeState swaps the current config of a revid with the passed
// reset swaps the current config of a Revid with the passed
// configuration; checking validity and returning errors if not valid.
func (r *revid) changeState(config Config) error {
func (r *Revid) reset(config Config) error {
r.config.Logger = config.Logger
err := config.Validate(r)
if err != nil {
@ -161,18 +138,33 @@ func (r *revid) changeState(config Config) error {
}
r.config = config
if r.destination != nil {
err = r.destination.close()
if err != nil {
r.Log(Error, err.Error())
}
}
switch r.config.Output {
case File:
r.setupOutput = r.setupOutputForFile
r.sendClip = r.sendClipToFile
s, err := newFileSender(config.OutputFileName)
if err != nil {
return err
}
r.destination = s
case FfmpegRtmp:
r.setupOutput = r.setupOutputForFfmpegRtmp
r.sendClip = r.sendClipToFfmpegRtmp
s, err := newFfmpegSender(config.RtmpUrl, r.config.FrameRate)
if err != nil {
return err
}
r.destination = s
case NativeRtmp:
r.setupOutput = r.setupOutputForLibRtmp
r.sendClip = r.sendClipToLibRtmp
s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimout, rtmpConnectionMaxTries, r.Log)
if err != nil {
return err
}
r.destination = s
case Http:
r.sendClip = r.sendClipToHTTP
r.destination = newHttpSender(config.RtmpUrl, httpTimeout, r.Log)
}
switch r.config.Input {
@ -214,25 +206,25 @@ noPacketizationSetup:
return nil
}
// ChangeConfig changes the current configuration of the revid instance.
func (r *revid) ChangeConfig(config Config) (err error) {
// ChangeConfig changes the current configuration of the Revid instance.
func (r *Revid) ChangeConfig(c Config) error {
// FIXME(kortschak): This is reimplemented in cmd/revid-cli/main.go.
// The implementation in the command is used and this is not.
// Decide on one or the other.
r.Stop()
r, err = NewRevid(config)
r, err := New(c)
if err != nil {
return
return err
}
r.Start()
return
return err
}
// Log takes a logtype and message and tries to send this information to the
// logger provided in the revid config - if there is one, otherwise the message
// is sent to stdout
func (r *revid) Log(logType, m string) {
func (r *Revid) Log(logType, m string) {
if r.config.Verbosity == Yes {
if r.config.Logger != nil {
r.config.Logger.Log("revid", logType, m)
@ -243,28 +235,19 @@ func (r *revid) Log(logType, m string) {
}
// IsRunning returns true if the revid is currently running and false otherwise
func (r *revid) IsRunning() bool {
func (r *Revid) IsRunning() bool {
return r.isRunning
}
// Start invokes a revid to start processing video from a defined input
// Start invokes a Revid to start processing video from a defined input
// and packetising (if theres packetization) to a defined output.
func (r *revid) Start() {
r.mutex.Lock()
defer r.mutex.Unlock()
func (r *Revid) Start() {
if r.isRunning {
r.Log(Warning, "revid.Start() called but revid already running!")
r.Log(Warning, "Revid.Start() called but revid already running!")
return
}
r.Log(Info, "Starting Revid!")
r.Log(Debug, "Setting up output!")
if r.setupOutput != nil {
err := r.setupOutput()
if err != nil {
r.Log(Error, err.Error())
return
}
}
r.isRunning = true
r.Log(Info, "Starting output routine!")
go r.outputClips()
@ -279,20 +262,13 @@ func (r *revid) Start() {
}
// Stop halts any processing of video data from a camera or file
func (r *revid) Stop() {
r.mutex.Lock()
defer r.mutex.Unlock()
func (r *Revid) Stop() {
if !r.isRunning {
r.Log(Warning, "revid.Stop() called but revid not running!")
r.Log(Warning, "Revid.Stop() called but revid not running!")
return
}
r.Log(Info, "Stopping revid!")
// wait for sending to finish
r.sendMutex.Lock()
defer r.sendMutex.Unlock()
r.rtmpInst.Close()
r.isRunning = false
r.Log(Info, "Stopping generator!")
@ -315,18 +291,18 @@ func (r *revid) Stop() {
// getFrameNoPacketization gets a frame directly from the revid output chan
// as we don't need to go through the generator with no packetization settings
func (r *revid) getFrameNoPacketization() []byte {
func (r *Revid) getFrameNoPacketization() []byte {
return <-r.outputChan
}
// getFramePacketization gets a frame from the generators output chan - the
// the generator being an mpegts or flv generator depending on the config
func (r *revid) getFramePacketization() []byte {
func (r *Revid) getFramePacketization() []byte {
return <-(r.generator.GetOutputChan())
}
// flushDataPacketization removes data from the revid inst's coutput chan
func (r *revid) flushData() {
// flushDataPacketization removes data from the Revid inst's coutput chan
func (r *Revid) flushData() {
switch r.config.Packetization {
case Flv:
for {
@ -342,7 +318,7 @@ done:
// packClips takes data segments; whether that be tsPackets or mjpeg frames and
// packs them into clips consisting of the amount frames specified in the config
func (r *revid) packClips() {
func (r *Revid) packClips() {
clipSize := 0
packetCount := 0
for r.isRunning {
@ -384,7 +360,7 @@ func (r *revid) packClips() {
// outputClips takes the clips produced in the packClips method and outputs them
// to the desired output defined in the revid config
func (r *revid) outputClips() {
func (r *Revid) outputClips() {
now := time.Now()
prevTime := now
bytes := 0
@ -409,15 +385,20 @@ func (r *revid) outputClips() {
bytes += chunk.Len()
r.Log(Detail, "About to send")
err = r.sendClip(chunk)
err = r.destination.load(chunk)
if err != nil {
r.Log(Error, "failed to load clip")
}
err = r.destination.send()
if err == nil {
r.Log(Detail, "Sent clip")
r.Log(Detail, "sent clip")
}
if r.isRunning && err != nil && chunk.Len() > 11 {
r.Log(Debug, "Send failed! Trying again")
// Try and send again
err = r.sendClip(chunk)
err = r.destination.send()
r.Log(Error, err.Error())
// if there's still an error we try and reconnect, unless we're stopping
for r.isRunning && err != nil {
@ -425,22 +406,26 @@ func (r *revid) outputClips() {
time.Sleep(time.Duration(sendFailedDelay) * time.Millisecond)
r.Log(Error, err.Error())
if r.config.Output == NativeRtmp {
r.Log(Debug, "Ending current rtmp session...")
r.rtmpInst.Close()
}
if r.config.Output == NativeRtmp {
r.Log(Info, "Restarting rtmp session...")
r.rtmpInst.StartSession()
if rs, ok := r.destination.(restarter); ok {
r.Log(Debug, fmt.Sprintf("restarting %T session", rs))
err = rs.restart()
if err != nil {
// TODO(kortschak): Make this "Fatal" when that exists.
r.Log(Error, "failed to restart rtmp session")
r.isRunning = false
return
}
r.Log(Info, "restarted rtmp session")
}
r.Log(Debug, "Trying to send again with new connection...")
r.sendClip(chunk) // TODO(kortschak): Log these errors?
err = r.destination.send()
r.Log(Error, err.Error())
}
}
chunk.Close() // ring.Chunk is an io.Closer, but Close alwats returns nil.
r.destination.release()
r.Log(Detail, "Done reading that clip from ringbuffer...")
// Log some information regarding bitrate and ring buffer size if it's time
@ -455,134 +440,15 @@ func (r *revid) outputClips() {
}
}
r.Log(Info, "Not outputting clips anymore!")
}
// senClipToFile writes the passed clip to a file
func (r *revid) sendClipToFile(clip *ring.Chunk) error {
r.sendMutex.Lock()
_, err := clip.WriteTo(r.outputFile)
r.sendMutex.Unlock()
return err
}
// sendClipToHTTP takes a clip and an output url and posts through http.
func (r *revid) sendClipToHTTP(clip *ring.Chunk) error {
defer r.sendMutex.Unlock()
r.sendMutex.Lock()
client := http.Client{Timeout: httpTimeout}
url := r.config.HttpAddress + strconv.Itoa(clip.Len())
// FIXME(kortschak): This is necessary because Post takes
// an io.Reader as a parameter and closes it if it is an
// io.Closer (which *ring.Chunk is), ... and because we
// use a method value for dispatching the sendClip work.
// So to save work in this case, sendClip should be made
// a proper method with a behaviour switch based on a
// revid field so that we can prepare these bytes only
// once for each clip (reusing a buffer field? or tt
// might be work using a sync.Pool for the bodies).
post := bytes.NewBuffer(make([]byte, 0, clip.Len()))
_, err := clip.WriteTo(post)
err := r.destination.close()
if err != nil {
return fmt.Errorf("Error buffering: %v", err)
r.Log(Error, "failed to close destination")
}
r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, clip.Len()))
resp, err := client.Post(url, "video/mp2t", post)
if err != nil {
return fmt.Errorf("Error posting to %s: %s", url, err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err == nil {
r.Log(Debug, fmt.Sprintf("%s\n", body))
} else {
r.Log(Error, err.Error())
}
return nil
}
// sendClipToFfmpegRtmp sends the clip over the current rtmp connection using
// an ffmpeg process.
func (r *revid) sendClipToFfmpegRtmp(clip *ring.Chunk) error {
r.sendMutex.Lock()
_, err := clip.WriteTo(r.ffmpegStdin)
r.sendMutex.Unlock()
return err
}
// sendClipToLibRtmp send the clip over the current rtmp connection using the
// c based librtmp library
func (r *revid) sendClipToLibRtmp(clip *ring.Chunk) error {
r.sendMutex.Lock()
_, err := clip.WriteTo(r.rtmpInst)
r.sendMutex.Unlock()
return err
}
// setupOutputForFfmpegRtmp sets up output to rtmp using an ffmpeg process
func (r *revid) setupOutputForFfmpegRtmp() error {
r.ffmpegCmd = exec.Command(ffmpegPath,
"-f", "h264",
"-r", r.config.FrameRate,
"-i", "-",
"-f", "lavfi",
"-i", "aevalsrc=0",
"-fflags", "nobuffer",
"-vcodec", "copy",
"-acodec", "aac",
"-map", "0:0",
"-map", "1:0",
"-strict", "experimental",
"-f", "flv",
r.config.RtmpUrl,
)
var err error
r.ffmpegStdin, err = r.ffmpegCmd.StdinPipe()
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return err
}
err = r.ffmpegCmd.Start()
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return err
}
return nil
}
// setupOutputForLibRtmp sets up rtmp output using the wrapper for the c based
// librtmp library - makes connection and starts comms etc.
func (r *revid) setupOutputForLibRtmp() error {
r.rtmpInst = rtmp.NewSession(r.config.RtmpUrl, rtmpConnectionTimout)
err := r.rtmpInst.StartSession()
for noOfTries := 0; err != nil && noOfTries < rtmpConnectionMaxTries; noOfTries++ {
r.rtmpInst.Close()
r.Log(Error, err.Error())
r.Log(Info, "Trying to establish rtmp connection again!")
r.rtmpInst = rtmp.NewSession(r.config.RtmpUrl, rtmpConnectionTimout)
err = r.rtmpInst.StartSession()
}
if err != nil {
return err
}
return err
}
// setupOutputForFile sets up an output file to output data to
func (r *revid) setupOutputForFile() (err error) {
r.outputFile, err = os.Create(r.config.OutputFileName)
return
}
// setupInputForRaspivid sets up things for input from raspivid i.e. starts
// a raspivid process and pipes it's data output.
func (r *revid) setupInputForRaspivid() error {
func (r *Revid) setupInputForRaspivid() error {
r.Log(Info, "Starting raspivid!")
switch r.config.InputCodec {
case H264:
@ -642,27 +508,16 @@ func (r *revid) setupInputForRaspivid() error {
}
// setupInputForFile sets things up for getting input from a file
func (r *revid) setupInputForFile() error {
func (r *Revid) setupInputForFile() error {
fps, _ := strconv.Atoi(r.config.FrameRate)
r.parser.SetDelay(uint(float64(1000) / float64(fps)))
r.readFile()
return nil
}
// testRtmp is useful to check robustness of connections. Intended to be run as
// goroutine. After every 'delayTime' the rtmp connection is ended and then
// restarted
func (r *revid) testRtmp(delayTime uint) {
for {
time.Sleep(time.Duration(delayTime) * time.Millisecond)
r.rtmpInst.Close()
r.rtmpInst.StartSession()
}
}
// readCamera reads data from the defined camera while the revid is running.
// readCamera reads data from the defined camera while the Revid is running.
// TODO: use ringbuffer here instead of allocating mem every time!
func (r *revid) readCamera() {
func (r *Revid) readCamera() {
r.Log(Info, "Reading camera data!")
for r.isRunning {
data := make([]byte, 1)
@ -679,8 +534,8 @@ func (r *revid) readCamera() {
r.Log(Info, "Not trying to read from camera anymore!")
}
// readFile reads data from the defined file while the revid is running.
func (r *revid) readFile() error {
// readFile reads data from the defined file while the Revid is running.
func (r *Revid) readFile() error {
var err error
r.inputFile, err = os.Open(r.config.InputFileName)
if err != nil {

294
revid/senders.go Normal file
View File

@ -0,0 +1,294 @@
/*
NAME
senders.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package revid
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"os/exec"
"strconv"
"time"
"bitbucket.org/ausocean/av/rtmp"
"bitbucket.org/ausocean/utils/ring"
)
// loadSender is a destination to send a *ring.Chunk to.
// When a loadSender has finished using the *ring.Chunk
// it must be Closed.
type loadSender interface {
// load assigns the *ring.Chunk to the loadSender.
// The load call may fail, but must not mutate the
// the chunk.
load(*ring.Chunk) error
// send performs a destination-specific send
// operation. It must not mutate the chunk.
send() error
// release releases the *ring.Chunk.
release()
// close cleans up after use of the loadSender.
close() error
}
// restart is an optional interface for loadSenders that
// can restart their connection.
type restarter interface {
restart() error
}
// fileSender implements loadSender for a local file destination.
type fileSender struct {
file *os.File
chunk *ring.Chunk
}
func newFileSender(path string) (*fileSender, error) {
f, err := os.Create(path)
if err != nil {
return nil, err
}
return &fileSender{file: f}, nil
}
func (s *fileSender) load(c *ring.Chunk) error {
s.chunk = c
return nil
}
func (s *fileSender) send() error {
_, err := s.chunk.WriteTo(s.file)
return err
}
func (s *fileSender) release() {
s.chunk.Close()
s.chunk = nil
}
func (s *fileSender) close() error {
return s.file.Close()
}
// httpSender implements loadSender for an HTTP destination.
type httpSender struct {
client http.Client
url string
log func(lvl, msg string)
buf []byte
chunk *ring.Chunk
}
func newHttpSender(url string, timeout time.Duration, log func(lvl, msg string)) *httpSender {
return &httpSender{
client: http.Client{Timeout: timeout},
url: url,
log: log,
}
}
func (s *httpSender) load(c *ring.Chunk) error {
buf := bytes.NewBuffer(s.buf[:0])
_, err := s.chunk.WriteTo(buf)
s.buf = buf.Bytes()
if err != nil {
return fmt.Errorf("fileSender: %v", err)
}
return nil
}
func (s *httpSender) send() error {
url := s.url + strconv.Itoa(len(s.buf))
s.log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(s.buf)))
resp, err := s.client.Post(url, "video/mp2t", bytes.NewReader(s.buf))
if err != nil {
return fmt.Errorf("Error posting to %s: %s", url, err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err == nil {
s.log(Debug, fmt.Sprintf("%s\n", body))
} else {
s.log(Error, err.Error())
}
return err
}
func (s *httpSender) release() {
s.chunk.Close()
s.chunk = nil
}
func (s *httpSender) close() error { return nil }
// ffmpegSender implements loadSender for an FFMPEG RTMP destination.
type ffmpegSender struct {
ffmpeg io.WriteCloser
chunk *ring.Chunk
}
func newFfmpegSender(url, framerate string) (*ffmpegSender, error) {
cmd := exec.Command(ffmpegPath,
"-f", "h264",
"-r", framerate,
"-i", "-",
"-f", "lavfi",
"-i", "aevalsrc=0",
"-fflags", "nobuffer",
"-vcodec", "copy",
"-acodec", "aac",
"-map", "0:0",
"-map", "1:0",
"-strict", "experimental",
"-f", "flv",
url,
)
w, err := cmd.StdinPipe()
if err != nil {
return nil, err
}
err = cmd.Start()
if err != nil {
return nil, err
}
return &ffmpegSender{ffmpeg: w}, nil
}
func (s *ffmpegSender) load(c *ring.Chunk) error {
s.chunk = c
return nil
}
func (s *ffmpegSender) send() error {
_, err := s.chunk.WriteTo(s.ffmpeg)
return err
}
func (s *ffmpegSender) release() {
s.chunk.Close()
s.chunk = nil
}
func (s *ffmpegSender) close() error {
return s.ffmpeg.Close()
}
// rtmpSender implements loadSender for a native RTMP destination.
type rtmpSender struct {
sess rtmp.Session
url string
timeout uint
retries int
log func(lvl, msg string)
chunk *ring.Chunk
}
var _ restarter = (*rtmpSender)(nil)
func newRtmpSender(url string, timeout uint, retries int, log func(lvl, msg string)) (*rtmpSender, error) {
var sess rtmp.Session
var err error
for n := 0; n < retries; n++ {
sess = rtmp.NewSession(url, timeout)
err = sess.StartSession()
if err == nil {
break
}
log(Error, err.Error())
sess.Close()
if n < retries-1 {
log(Info, "retry rtmp connection")
}
}
if err != nil {
return nil, err
}
s := &rtmpSender{
sess: sess,
url: url,
timeout: timeout,
retries: retries,
log: log,
}
return s, nil
}
func (s *rtmpSender) load(c *ring.Chunk) error {
s.chunk = c
return nil
}
func (s *rtmpSender) send() error {
_, err := s.chunk.WriteTo(s.sess)
return err
}
func (s *rtmpSender) release() {
s.chunk.Close()
s.chunk = nil
}
func (s *rtmpSender) restart() error {
err := s.sess.Close()
if err != nil {
return err
}
for n := 0; n < s.retries; n++ {
s.sess = rtmp.NewSession(s.url, s.timeout)
err = s.sess.StartSession()
if err == nil {
break
}
s.log(Error, err.Error())
s.sess.Close()
if n < s.retries-1 {
s.log(Info, "retry rtmp connection")
}
}
return err
}
func (s *rtmpSender) close() error {
return s.sess.Close()
}