mirror of https://bitbucket.org/ausocean/av.git
Merged in revid-api-redesign-phase1 (pull request #130)
stream/mts: patch for revid.Start() no exit bug Approved-by: Alan Noble <anoble@gmail.com>
This commit is contained in:
commit
ea89e7a5af
|
@ -37,6 +37,7 @@ import (
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"bitbucket.org/ausocean/av/stream"
|
"bitbucket.org/ausocean/av/stream"
|
||||||
|
@ -119,7 +120,10 @@ type Revid struct {
|
||||||
bitrate int
|
bitrate int
|
||||||
|
|
||||||
// isRunning is a loaded and cocked foot-gun.
|
// isRunning is a loaded and cocked foot-gun.
|
||||||
|
mu sync.Mutex
|
||||||
isRunning bool
|
isRunning bool
|
||||||
|
|
||||||
|
err chan error
|
||||||
}
|
}
|
||||||
|
|
||||||
// packer takes data segments and packs them into clips
|
// packer takes data segments and packs them into clips
|
||||||
|
@ -176,16 +180,35 @@ func (p *packer) Write(frame []byte) (int, error) {
|
||||||
// New returns a pointer to a new Revid with the desired configuration, and/or
|
// New returns a pointer to a new Revid with the desired configuration, and/or
|
||||||
// an error if construction of the new instance was not successful.
|
// an error if construction of the new instance was not successful.
|
||||||
func New(c Config, ns *netsender.Sender) (*Revid, error) {
|
func New(c Config, ns *netsender.Sender) (*Revid, error) {
|
||||||
r := Revid{ns: ns}
|
r := Revid{ns: ns, err: make(chan error)}
|
||||||
r.buffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)
|
r.buffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)
|
||||||
r.packer.owner = &r
|
r.packer.owner = &r
|
||||||
err := r.reset(c)
|
err := r.reset(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
go r.handleErrors()
|
||||||
return &r, nil
|
return &r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(Saxon): put more thought into error severity.
|
||||||
|
func (r *Revid) handleErrors() {
|
||||||
|
for {
|
||||||
|
err := <-r.err
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error())
|
||||||
|
err = r.Stop()
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error())
|
||||||
|
}
|
||||||
|
err = r.Start()
|
||||||
|
if err != nil {
|
||||||
|
r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Bitrate returns the result of the most recent bitrate check.
|
// Bitrate returns the result of the most recent bitrate check.
|
||||||
func (r *Revid) Bitrate() int {
|
func (r *Revid) Bitrate() int {
|
||||||
return r.bitrate
|
return r.bitrate
|
||||||
|
@ -295,20 +318,30 @@ func (r *Revid) reset(config Config) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsRunning returns whether the receiver is running.
|
// IsRunning returns true if revid is running.
|
||||||
func (r *Revid) IsRunning() bool {
|
func (r *Revid) IsRunning() bool {
|
||||||
return r.isRunning
|
r.mu.Lock()
|
||||||
|
ret := r.isRunning
|
||||||
|
r.mu.Unlock()
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
|
// setIsRunning sets revid.isRunning using b.
|
||||||
|
func (r *Revid) setIsRunning(b bool) {
|
||||||
|
r.mu.Lock()
|
||||||
|
r.isRunning = b
|
||||||
|
r.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start invokes a Revid to start processing video from a defined input
|
// Start invokes a Revid to start processing video from a defined input
|
||||||
// and packetising (if theres packetization) to a defined output.
|
// and packetising (if theres packetization) to a defined output.
|
||||||
func (r *Revid) Start() error {
|
func (r *Revid) Start() error {
|
||||||
if r.isRunning {
|
if r.IsRunning() {
|
||||||
return errors.New(pkg + "start called but revid is already running")
|
return errors.New(pkg + "start called but revid is already running")
|
||||||
}
|
}
|
||||||
r.config.Logger.Log(logger.Info, pkg+"starting Revid")
|
r.config.Logger.Log(logger.Info, pkg+"starting Revid")
|
||||||
r.config.Logger.Log(logger.Debug, pkg+"setting up output")
|
r.config.Logger.Log(logger.Debug, pkg+"setting up output")
|
||||||
r.isRunning = true
|
r.setIsRunning(true)
|
||||||
r.config.Logger.Log(logger.Info, pkg+"starting output routine")
|
r.config.Logger.Log(logger.Info, pkg+"starting output routine")
|
||||||
go r.outputClips()
|
go r.outputClips()
|
||||||
r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content")
|
r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content")
|
||||||
|
@ -318,12 +351,12 @@ func (r *Revid) Start() error {
|
||||||
|
|
||||||
// Stop halts any processing of video data from a camera or file
|
// Stop halts any processing of video data from a camera or file
|
||||||
func (r *Revid) Stop() error {
|
func (r *Revid) Stop() error {
|
||||||
if !r.isRunning {
|
if !r.IsRunning() {
|
||||||
return errors.New(pkg + "stop called but revid is already stopped")
|
return errors.New(pkg + "stop called but revid is already stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
r.config.Logger.Log(logger.Info, pkg+"stopping revid")
|
r.config.Logger.Log(logger.Info, pkg+"stopping revid")
|
||||||
r.isRunning = false
|
r.setIsRunning(false)
|
||||||
|
|
||||||
r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
|
r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
|
||||||
// If a cmd process is running, we kill!
|
// If a cmd process is running, we kill!
|
||||||
|
@ -339,7 +372,7 @@ func (r *Revid) outputClips() {
|
||||||
lastTime := time.Now()
|
lastTime := time.Now()
|
||||||
var count int
|
var count int
|
||||||
loop:
|
loop:
|
||||||
for r.isRunning {
|
for r.IsRunning() {
|
||||||
// If the ring buffer has something we can read and send off
|
// If the ring buffer has something we can read and send off
|
||||||
chunk, err := r.buffer.Next(readTimeout)
|
chunk, err := r.buffer.Next(readTimeout)
|
||||||
switch err {
|
switch err {
|
||||||
|
@ -383,7 +416,7 @@ loop:
|
||||||
err = rs.restart()
|
err = rs.restart()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error())
|
r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error())
|
||||||
r.isRunning = false
|
r.setIsRunning(false)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -474,11 +507,8 @@ func (r *Revid) startRaspivid() error {
|
||||||
r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error())
|
r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
r.config.Logger.Log(logger.Info, pkg+"reading camera data")
|
go r.processFrom(stdout, 0)
|
||||||
delay := time.Second / time.Duration(r.config.FrameRate)
|
return nil
|
||||||
err = r.lexTo(r.encoder, stdout, delay)
|
|
||||||
r.config.Logger.Log(logger.Info, pkg+"finished reading camera data")
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Revid) startV4L() error {
|
func (r *Revid) startV4L() error {
|
||||||
|
@ -514,7 +544,6 @@ func (r *Revid) startV4L() error {
|
||||||
r.config.Logger.Log(logger.Info, pkg+"ffmpeg args", "args", strings.Join(args, " "))
|
r.config.Logger.Log(logger.Info, pkg+"ffmpeg args", "args", strings.Join(args, " "))
|
||||||
r.cmd = exec.Command("ffmpeg", args...)
|
r.cmd = exec.Command("ffmpeg", args...)
|
||||||
|
|
||||||
delay := time.Second / time.Duration(r.config.FrameRate)
|
|
||||||
stdout, err := r.cmd.StdoutPipe()
|
stdout, err := r.cmd.StdoutPipe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -526,15 +555,12 @@ func (r *Revid) startV4L() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
r.config.Logger.Log(logger.Info, pkg+"reading camera data")
|
go r.processFrom(stdout, time.Duration(0))
|
||||||
err = r.lexTo(r.encoder, stdout, delay)
|
return nil
|
||||||
r.config.Logger.Log(logger.Info, pkg+"finished reading camera data")
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// setupInputForFile sets things up for getting input from a file
|
// setupInputForFile sets things up for getting input from a file
|
||||||
func (r *Revid) setupInputForFile() error {
|
func (r *Revid) setupInputForFile() error {
|
||||||
delay := time.Second / time.Duration(r.config.FrameRate)
|
|
||||||
f, err := os.Open(r.config.InputFileName)
|
f, err := os.Open(r.config.InputFileName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.config.Logger.Log(logger.Error, err.Error())
|
r.config.Logger.Log(logger.Error, err.Error())
|
||||||
|
@ -544,5 +570,12 @@ func (r *Revid) setupInputForFile() error {
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
|
||||||
// TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop.
|
// TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop.
|
||||||
return r.lexTo(r.encoder, f, delay)
|
go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Revid) processFrom(read io.Reader, delay time.Duration) {
|
||||||
|
r.config.Logger.Log(logger.Info, pkg+"reading input data")
|
||||||
|
r.err <- r.lexTo(r.encoder, read, delay)
|
||||||
|
r.config.Logger.Log(logger.Info, pkg+"finished reading input data")
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue