revid: use loadSender for clip sending

This commit is contained in:
Dan Kortschak 2018-06-09 14:08:48 +09:30
parent 3881cb9712
commit 1105ee3ea9
2 changed files with 341 additions and 177 deletions

View File

@ -31,17 +31,13 @@ package revid
import ( import (
"bufio" "bufio"
"bytes"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"net/http"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"strconv" "strconv"
"sync"
"time" "time"
"bitbucket.org/ausocean/av/generator" "bitbucket.org/ausocean/av/generator"
@ -92,22 +88,18 @@ type Revid struct {
ringBuffer *ring.Buffer ringBuffer *ring.Buffer
config Config config Config
isRunning bool isRunning bool
outputFile *os.File
inputFile *os.File inputFile *os.File
generator generator.Generator generator generator.Generator
parser parser.Parser parser parser.Parser
cmd *exec.Cmd cmd *exec.Cmd
ffmpegCmd *exec.Cmd
inputReader *bufio.Reader inputReader *bufio.Reader
ffmpegStdin io.WriteCloser ffmpegStdin io.WriteCloser
outputChan chan []byte outputChan chan []byte
setupInput func() error setupInput func() error
setupOutput func() error setupOutput func() error
getFrame func() []byte getFrame func() []byte
sendClip func(*ring.Chunk) error destination loadSender
rtmpInst rtmp.Session rtmpInst rtmp.Session
mutex sync.Mutex
sendMutex sync.Mutex
currentBitrate int64 currentBitrate int64
} }
@ -146,18 +138,33 @@ func (r *Revid) reset(config Config) error {
} }
r.config = config r.config = config
if r.destination != nil {
err = r.destination.close()
if err != nil {
r.Log(Error, err.Error())
}
}
switch r.config.Output { switch r.config.Output {
case File: case File:
r.setupOutput = r.setupOutputForFile s, err := newFileSender(config.OutputFileName)
r.sendClip = r.sendClipToFile if err != nil {
return err
}
r.destination = s
case FfmpegRtmp: case FfmpegRtmp:
r.setupOutput = r.setupOutputForFfmpegRtmp s, err := newFfmpegSender(config.RtmpUrl, r.config.FrameRate)
r.sendClip = r.sendClipToFfmpegRtmp if err != nil {
return err
}
r.destination = s
case NativeRtmp: case NativeRtmp:
r.setupOutput = r.setupOutputForLibRtmp s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimout, rtmpConnectionMaxTries, r.Log)
r.sendClip = r.sendClipToLibRtmp if err != nil {
return err
}
r.destination = s
case Http: case Http:
r.sendClip = r.sendClipToHTTP r.destination = newHttpSender(config.RtmpUrl, httpTimeout, r.Log)
} }
switch r.config.Input { switch r.config.Input {
@ -235,21 +242,12 @@ func (r *Revid) IsRunning() bool {
// Start invokes a Revid to start processing video from a defined input // Start invokes a Revid to start processing video from a defined input
// and packetising (if theres packetization) to a defined output. // and packetising (if theres packetization) to a defined output.
func (r *Revid) Start() { func (r *Revid) Start() {
r.mutex.Lock()
defer r.mutex.Unlock()
if r.isRunning { if r.isRunning {
r.Log(Warning, "Revid.Start() called but revid already running!") r.Log(Warning, "Revid.Start() called but revid already running!")
return return
} }
r.Log(Info, "Starting Revid!") r.Log(Info, "Starting Revid!")
r.Log(Debug, "Setting up output!") r.Log(Debug, "Setting up output!")
if r.setupOutput != nil {
err := r.setupOutput()
if err != nil {
r.Log(Error, err.Error())
return
}
}
r.isRunning = true r.isRunning = true
r.Log(Info, "Starting output routine!") r.Log(Info, "Starting output routine!")
go r.outputClips() go r.outputClips()
@ -265,19 +263,12 @@ func (r *Revid) Start() {
// Stop halts any processing of video data from a camera or file // Stop halts any processing of video data from a camera or file
func (r *Revid) Stop() { func (r *Revid) Stop() {
r.mutex.Lock()
defer r.mutex.Unlock()
if !r.isRunning { if !r.isRunning {
r.Log(Warning, "Revid.Stop() called but revid not running!") r.Log(Warning, "Revid.Stop() called but revid not running!")
return return
} }
r.Log(Info, "Stopping revid!") r.Log(Info, "Stopping revid!")
// wait for sending to finish
r.sendMutex.Lock()
defer r.sendMutex.Unlock()
r.rtmpInst.Close()
r.isRunning = false r.isRunning = false
r.Log(Info, "Stopping generator!") r.Log(Info, "Stopping generator!")
@ -394,15 +385,20 @@ func (r *Revid) outputClips() {
bytes += chunk.Len() bytes += chunk.Len()
r.Log(Detail, "About to send") r.Log(Detail, "About to send")
err = r.sendClip(chunk) err = r.destination.load(chunk)
if err != nil {
r.Log(Error, "failed to load clip")
}
err = r.destination.send()
if err == nil { if err == nil {
r.Log(Detail, "Sent clip") r.Log(Detail, "sent clip")
} }
if r.isRunning && err != nil && chunk.Len() > 11 { if r.isRunning && err != nil && chunk.Len() > 11 {
r.Log(Debug, "Send failed! Trying again") r.Log(Debug, "Send failed! Trying again")
// Try and send again // Try and send again
err = r.sendClip(chunk) err = r.destination.send()
r.Log(Error, err.Error())
// if there's still an error we try and reconnect, unless we're stopping // if there's still an error we try and reconnect, unless we're stopping
for r.isRunning && err != nil { for r.isRunning && err != nil {
@ -410,22 +406,26 @@ func (r *Revid) outputClips() {
time.Sleep(time.Duration(sendFailedDelay) * time.Millisecond) time.Sleep(time.Duration(sendFailedDelay) * time.Millisecond)
r.Log(Error, err.Error()) r.Log(Error, err.Error())
if r.config.Output == NativeRtmp { if rs, ok := r.destination.(restarter); ok {
r.Log(Debug, "Ending current rtmp session...") r.Log(Debug, fmt.Sprintf("restarting %T session", rs))
r.rtmpInst.Close() err = rs.restart()
} if err != nil {
// TODO(kortschak): Make this "Fatal" when that exists.
if r.config.Output == NativeRtmp { r.Log(Error, "failed to restart rtmp session")
r.Log(Info, "Restarting rtmp session...") r.isRunning = false
r.rtmpInst.StartSession() return
}
r.Log(Info, "restarted rtmp session")
} }
r.Log(Debug, "Trying to send again with new connection...") r.Log(Debug, "Trying to send again with new connection...")
r.sendClip(chunk) // TODO(kortschak): Log these errors? err = r.destination.send()
r.Log(Error, err.Error())
} }
} }
chunk.Close() // ring.Chunk is an io.Closer, but Close alwats returns nil. r.destination.release()
r.Log(Detail, "Done reading that clip from ringbuffer...") r.Log(Detail, "Done reading that clip from ringbuffer...")
// Log some information regarding bitrate and ring buffer size if it's time // Log some information regarding bitrate and ring buffer size if it's time
@ -440,129 +440,10 @@ func (r *Revid) outputClips() {
} }
} }
r.Log(Info, "Not outputting clips anymore!") r.Log(Info, "Not outputting clips anymore!")
} err := r.destination.close()
// senClipToFile writes the passed clip to a file
func (r *Revid) sendClipToFile(clip *ring.Chunk) error {
r.sendMutex.Lock()
_, err := clip.WriteTo(r.outputFile)
r.sendMutex.Unlock()
return err
}
// sendClipToHTTP takes a clip and an output url and posts through http.
func (r *Revid) sendClipToHTTP(clip *ring.Chunk) error {
defer r.sendMutex.Unlock()
r.sendMutex.Lock()
client := http.Client{Timeout: httpTimeout}
url := r.config.HttpAddress + strconv.Itoa(clip.Len())
// FIXME(kortschak): This is necessary because Post takes
// an io.Reader as a parameter and closes it if it is an
// io.Closer (which *ring.Chunk is), ... and because we
// use a method value for dispatching the sendClip work.
// So to save work in this case, sendClip should be made
// a proper method with a behaviour switch based on a
// Revid field so that we can prepare these bytes only
// once for each clip (reusing a buffer field? or tt
// might be work using a sync.Pool for the bodies).
post := bytes.NewBuffer(make([]byte, 0, clip.Len()))
_, err := clip.WriteTo(post)
if err != nil { if err != nil {
return fmt.Errorf("Error buffering: %v", err) r.Log(Error, "failed to close destination")
} }
r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, clip.Len()))
resp, err := client.Post(url, "video/mp2t", post)
if err != nil {
return fmt.Errorf("Error posting to %s: %s", url, err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err == nil {
r.Log(Debug, fmt.Sprintf("%s\n", body))
} else {
r.Log(Error, err.Error())
}
return nil
}
// sendClipToFfmpegRtmp sends the clip over the current rtmp connection using
// an ffmpeg process.
func (r *Revid) sendClipToFfmpegRtmp(clip *ring.Chunk) error {
r.sendMutex.Lock()
_, err := clip.WriteTo(r.ffmpegStdin)
r.sendMutex.Unlock()
return err
}
// sendClipToLibRtmp send the clip over the current rtmp connection using the
// c based librtmp library
func (r *Revid) sendClipToLibRtmp(clip *ring.Chunk) error {
r.sendMutex.Lock()
_, err := clip.WriteTo(r.rtmpInst)
r.sendMutex.Unlock()
return err
}
// setupOutputForFfmpegRtmp sets up output to rtmp using an ffmpeg process
func (r *Revid) setupOutputForFfmpegRtmp() error {
r.ffmpegCmd = exec.Command(ffmpegPath,
"-f", "h264",
"-r", r.config.FrameRate,
"-i", "-",
"-f", "lavfi",
"-i", "aevalsrc=0",
"-fflags", "nobuffer",
"-vcodec", "copy",
"-acodec", "aac",
"-map", "0:0",
"-map", "1:0",
"-strict", "experimental",
"-f", "flv",
r.config.RtmpUrl,
)
var err error
r.ffmpegStdin, err = r.ffmpegCmd.StdinPipe()
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return err
}
err = r.ffmpegCmd.Start()
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return err
}
return nil
}
// setupOutputForLibRtmp sets up rtmp output using the wrapper for the c based
// librtmp library - makes connection and starts comms etc.
func (r *Revid) setupOutputForLibRtmp() error {
r.rtmpInst = rtmp.NewSession(r.config.RtmpUrl, rtmpConnectionTimout)
err := r.rtmpInst.StartSession()
for noOfTries := 0; err != nil && noOfTries < rtmpConnectionMaxTries; noOfTries++ {
r.rtmpInst.Close()
r.Log(Error, err.Error())
r.Log(Info, "Trying to establish rtmp connection again!")
r.rtmpInst = rtmp.NewSession(r.config.RtmpUrl, rtmpConnectionTimout)
err = r.rtmpInst.StartSession()
}
if err != nil {
return err
}
return err
}
// setupOutputForFile sets up an output file to output data to
func (r *Revid) setupOutputForFile() (err error) {
r.outputFile, err = os.Create(r.config.OutputFileName)
return
} }
// setupInputForRaspivid sets up things for input from raspivid i.e. starts // setupInputForRaspivid sets up things for input from raspivid i.e. starts
@ -634,17 +515,6 @@ func (r *Revid) setupInputForFile() error {
return nil return nil
} }
// testRtmp is useful to check robustness of connections. Intended to be run as
// goroutine. After every 'delayTime' the rtmp connection is ended and then
// restarted
func (r *Revid) testRtmp(delayTime uint) {
for {
time.Sleep(time.Duration(delayTime) * time.Millisecond)
r.rtmpInst.Close()
r.rtmpInst.StartSession()
}
}
// readCamera reads data from the defined camera while the Revid is running. // readCamera reads data from the defined camera while the Revid is running.
// TODO: use ringbuffer here instead of allocating mem every time! // TODO: use ringbuffer here instead of allocating mem every time!
func (r *Revid) readCamera() { func (r *Revid) readCamera() {

294
revid/senders.go Normal file
View File

@ -0,0 +1,294 @@
/*
NAME
senders.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package revid
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"os/exec"
"strconv"
"time"
"bitbucket.org/ausocean/av/rtmp"
"bitbucket.org/ausocean/utils/ring"
)
// loadSender is a destination to send a *ring.Chunk to.
// When a loadSender has finished using the *ring.Chunk
// it must be Closed.
type loadSender interface {
// load assigns the *ring.Chunk to the loadSender.
// The load call may fail, but must not mutate the
// the chunk.
load(*ring.Chunk) error
// send performs a destination-specific send
// operation. It must not mutate the chunk.
send() error
// release releases the *ring.Chunk.
release()
// close cleans up after use of the loadSender.
close() error
}
// restart is an optional interface for loadSenders that
// can restart their connection.
type restarter interface {
restart() error
}
// fileSender implements loadSender for a local file destination.
type fileSender struct {
file *os.File
chunk *ring.Chunk
}
func newFileSender(path string) (*fileSender, error) {
f, err := os.Create(path)
if err != nil {
return nil, err
}
return &fileSender{file: f}, nil
}
func (s *fileSender) load(c *ring.Chunk) error {
s.chunk = c
return nil
}
func (s *fileSender) send() error {
_, err := s.chunk.WriteTo(s.file)
return err
}
func (s *fileSender) release() {
s.chunk.Close()
s.chunk = nil
}
func (s *fileSender) close() error {
return s.file.Close()
}
// httpSender implements loadSender for an HTTP destination.
type httpSender struct {
client http.Client
url string
log func(lvl, msg string)
buf []byte
chunk *ring.Chunk
}
func newHttpSender(url string, timeout time.Duration, log func(lvl, msg string)) *httpSender {
return &httpSender{
client: http.Client{Timeout: timeout},
url: url,
log: log,
}
}
func (s *httpSender) load(c *ring.Chunk) error {
buf := bytes.NewBuffer(s.buf[:0])
_, err := s.chunk.WriteTo(buf)
s.buf = buf.Bytes()
if err != nil {
return fmt.Errorf("fileSender: %v", err)
}
return nil
}
func (s *httpSender) send() error {
url := s.url + strconv.Itoa(len(s.buf))
s.log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(s.buf)))
resp, err := s.client.Post(url, "video/mp2t", bytes.NewReader(s.buf))
if err != nil {
return fmt.Errorf("Error posting to %s: %s", url, err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err == nil {
s.log(Debug, fmt.Sprintf("%s\n", body))
} else {
s.log(Error, err.Error())
}
return err
}
func (s *httpSender) release() {
s.chunk.Close()
s.chunk = nil
}
func (s *httpSender) close() error { return nil }
// ffmpegSender implements loadSender for an FFMPEG RTMP destination.
type ffmpegSender struct {
ffmpeg io.WriteCloser
chunk *ring.Chunk
}
func newFfmpegSender(url, framerate string) (*ffmpegSender, error) {
cmd := exec.Command(ffmpegPath,
"-f", "h264",
"-r", framerate,
"-i", "-",
"-f", "lavfi",
"-i", "aevalsrc=0",
"-fflags", "nobuffer",
"-vcodec", "copy",
"-acodec", "aac",
"-map", "0:0",
"-map", "1:0",
"-strict", "experimental",
"-f", "flv",
url,
)
w, err := cmd.StdinPipe()
if err != nil {
return nil, err
}
err = cmd.Start()
if err != nil {
return nil, err
}
return &ffmpegSender{ffmpeg: w}, nil
}
func (s *ffmpegSender) load(c *ring.Chunk) error {
s.chunk = c
return nil
}
func (s *ffmpegSender) send() error {
_, err := s.chunk.WriteTo(s.ffmpeg)
return err
}
func (s *ffmpegSender) release() {
s.chunk.Close()
s.chunk = nil
}
func (s *ffmpegSender) close() error {
return s.ffmpeg.Close()
}
// rtmpSender implements loadSender for a native RTMP destination.
type rtmpSender struct {
sess rtmp.Session
url string
timeout uint
retries int
log func(lvl, msg string)
chunk *ring.Chunk
}
var _ restarter = (*rtmpSender)(nil)
func newRtmpSender(url string, timeout uint, retries int, log func(lvl, msg string)) (*rtmpSender, error) {
var sess rtmp.Session
var err error
for n := 0; n < retries; n++ {
sess = rtmp.NewSession(url, timeout)
err = sess.StartSession()
if err == nil {
break
}
log(Error, err.Error())
sess.Close()
if n < retries-1 {
log(Info, "retry rtmp connection")
}
}
if err != nil {
return nil, err
}
s := &rtmpSender{
sess: sess,
url: url,
timeout: timeout,
retries: retries,
log: log,
}
return s, nil
}
func (s *rtmpSender) load(c *ring.Chunk) error {
s.chunk = c
return nil
}
func (s *rtmpSender) send() error {
_, err := s.chunk.WriteTo(s.sess)
return err
}
func (s *rtmpSender) release() {
s.chunk.Close()
s.chunk = nil
}
func (s *rtmpSender) restart() error {
err := s.sess.Close()
if err != nil {
return err
}
for n := 0; n < s.retries; n++ {
s.sess = rtmp.NewSession(s.url, s.timeout)
err = s.sess.StartSession()
if err == nil {
break
}
s.log(Error, err.Error())
s.sess.Close()
if n < s.retries-1 {
s.log(Info, "retry rtmp connection")
}
}
return err
}
func (s *rtmpSender) close() error {
return s.sess.Close()
}