diff --git a/revid/revid.go b/revid/revid.go index 6d52be59..f7313b57 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -37,6 +37,7 @@ import ( "os/exec" "strconv" "strings" + "sync" "time" "bitbucket.org/ausocean/av/stream" @@ -121,6 +122,8 @@ type Revid struct { // isRunning is a loaded and cocked foot-gun. isRunning bool + mu sync.Mutex + err chan error } @@ -189,6 +192,7 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { return &r, nil } +// TODO: put more thought into error severity func (r *Revid) handleErrors() { for { err := <-r.err @@ -317,18 +321,21 @@ func (r *Revid) reset(config Config) error { // IsRunning returns whether the receiver is running. func (r *Revid) IsRunning() bool { - return r.isRunning + r.mu.Lock() + ret := r.isRunning + r.mu.Unlock() + return ret } // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. func (r *Revid) Start() error { - if r.isRunning { + if r.IsRunning() { return errors.New(pkg + "start called but revid is already running") } r.config.Logger.Log(logger.Info, pkg+"starting Revid") r.config.Logger.Log(logger.Debug, pkg+"setting up output") - r.isRunning = true + r.setIsRunning(true) r.config.Logger.Log(logger.Info, pkg+"starting output routine") go r.outputClips() r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") @@ -338,12 +345,12 @@ func (r *Revid) Start() error { // Stop halts any processing of video data from a camera or file func (r *Revid) Stop() error { - if !r.isRunning { + if !r.IsRunning() { return errors.New(pkg + "stop called but revid is already stopped") } r.config.Logger.Log(logger.Info, pkg+"stopping revid") - r.isRunning = false + r.setIsRunning(false) r.config.Logger.Log(logger.Info, pkg+"killing input proccess") // If a cmd process is running, we kill! @@ -359,7 +366,7 @@ func (r *Revid) outputClips() { lastTime := time.Now() var count int loop: - for r.isRunning { + for r.IsRunning() { // If the ring buffer has something we can read and send off chunk, err := r.buffer.Next(readTimeout) switch err { @@ -403,7 +410,7 @@ loop: err = rs.restart() if err != nil { r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error()) - r.isRunning = false + r.setIsRunning(false) return } @@ -494,10 +501,16 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } - go r.transcode(stdout, time.Duration(0)) + go r.processFrom(stdout, time.Duration(0)) return nil } +func (r *Revid) setIsRunning(b bool) { + r.mu.Lock() + r.isRunning = b + r.mu.Unlock() +} + func (r *Revid) startV4L() error { const defaultVideo = "/dev/video0" @@ -542,7 +555,7 @@ func (r *Revid) startV4L() error { return err } - go r.transcode(stdout, time.Duration(0)) + go r.processFrom(stdout, time.Duration(0)) return nil } @@ -557,11 +570,11 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. - go r.transcode(f, time.Second/time.Duration(r.config.FrameRate)) + go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate)) return nil } -func (r *Revid) transcode(read io.Reader, delay time.Duration) { +func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") r.err <- r.lexTo(r.encoder, read, delay) r.config.Logger.Log(logger.Info, pkg+"finished reading input data")