revid: renamed transcode to processFrom. Using mutex for isRunning flag. Created setIsRunning func to set state of isRunning.

This commit is contained in:
saxon 2019-02-01 09:47:31 +10:30
parent 051263c144
commit d26aa8643a
1 changed files with 24 additions and 11 deletions

View File

@ -37,6 +37,7 @@ import (
"os/exec" "os/exec"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"bitbucket.org/ausocean/av/stream" "bitbucket.org/ausocean/av/stream"
@ -121,6 +122,8 @@ type Revid struct {
// isRunning is a loaded and cocked foot-gun. // isRunning is a loaded and cocked foot-gun.
isRunning bool isRunning bool
mu sync.Mutex
err chan error err chan error
} }
@ -189,6 +192,7 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) {
return &r, nil return &r, nil
} }
// TODO: put more thought into error severity
func (r *Revid) handleErrors() { func (r *Revid) handleErrors() {
for { for {
err := <-r.err err := <-r.err
@ -317,18 +321,21 @@ func (r *Revid) reset(config Config) error {
// IsRunning returns whether the receiver is running. // IsRunning returns whether the receiver is running.
func (r *Revid) IsRunning() bool { func (r *Revid) IsRunning() bool {
return r.isRunning r.mu.Lock()
ret := r.isRunning
r.mu.Unlock()
return ret
} }
// Start invokes a Revid to start processing video from a defined input // Start invokes a Revid to start processing video from a defined input
// and packetising (if theres packetization) to a defined output. // and packetising (if theres packetization) to a defined output.
func (r *Revid) Start() error { func (r *Revid) Start() error {
if r.isRunning { if r.IsRunning() {
return errors.New(pkg + "start called but revid is already running") return errors.New(pkg + "start called but revid is already running")
} }
r.config.Logger.Log(logger.Info, pkg+"starting Revid") r.config.Logger.Log(logger.Info, pkg+"starting Revid")
r.config.Logger.Log(logger.Debug, pkg+"setting up output") r.config.Logger.Log(logger.Debug, pkg+"setting up output")
r.isRunning = true r.setIsRunning(true)
r.config.Logger.Log(logger.Info, pkg+"starting output routine") r.config.Logger.Log(logger.Info, pkg+"starting output routine")
go r.outputClips() go r.outputClips()
r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content")
@ -338,12 +345,12 @@ func (r *Revid) Start() error {
// Stop halts any processing of video data from a camera or file // Stop halts any processing of video data from a camera or file
func (r *Revid) Stop() error { func (r *Revid) Stop() error {
if !r.isRunning { if !r.IsRunning() {
return errors.New(pkg + "stop called but revid is already stopped") return errors.New(pkg + "stop called but revid is already stopped")
} }
r.config.Logger.Log(logger.Info, pkg+"stopping revid") r.config.Logger.Log(logger.Info, pkg+"stopping revid")
r.isRunning = false r.setIsRunning(false)
r.config.Logger.Log(logger.Info, pkg+"killing input proccess") r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
// If a cmd process is running, we kill! // If a cmd process is running, we kill!
@ -359,7 +366,7 @@ func (r *Revid) outputClips() {
lastTime := time.Now() lastTime := time.Now()
var count int var count int
loop: loop:
for r.isRunning { for r.IsRunning() {
// If the ring buffer has something we can read and send off // If the ring buffer has something we can read and send off
chunk, err := r.buffer.Next(readTimeout) chunk, err := r.buffer.Next(readTimeout)
switch err { switch err {
@ -403,7 +410,7 @@ loop:
err = rs.restart() err = rs.restart()
if err != nil { if err != nil {
r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error())
r.isRunning = false r.setIsRunning(false)
return return
} }
@ -494,10 +501,16 @@ func (r *Revid) startRaspivid() error {
r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error())
} }
go r.transcode(stdout, time.Duration(0)) go r.processFrom(stdout, time.Duration(0))
return nil return nil
} }
func (r *Revid) setIsRunning(b bool) {
r.mu.Lock()
r.isRunning = b
r.mu.Unlock()
}
func (r *Revid) startV4L() error { func (r *Revid) startV4L() error {
const defaultVideo = "/dev/video0" const defaultVideo = "/dev/video0"
@ -542,7 +555,7 @@ func (r *Revid) startV4L() error {
return err return err
} }
go r.transcode(stdout, time.Duration(0)) go r.processFrom(stdout, time.Duration(0))
return nil return nil
} }
@ -557,11 +570,11 @@ func (r *Revid) setupInputForFile() error {
defer f.Close() defer f.Close()
// TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop.
go r.transcode(f, time.Second/time.Duration(r.config.FrameRate)) go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate))
return nil return nil
} }
func (r *Revid) transcode(read io.Reader, delay time.Duration) { func (r *Revid) processFrom(read io.Reader, delay time.Duration) {
r.config.Logger.Log(logger.Info, pkg+"reading input data") r.config.Logger.Log(logger.Info, pkg+"reading input data")
r.err <- r.lexTo(r.encoder, read, delay) r.err <- r.lexTo(r.encoder, read, delay)
r.config.Logger.Log(logger.Info, pkg+"finished reading input data") r.config.Logger.Log(logger.Info, pkg+"finished reading input data")