From ed4d97f893414b3d1f19edc305e4e82c382eaf73 Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 31 Jan 2019 15:03:50 +1030 Subject: [PATCH 1/8] stream/mts: patch for revid.Start() no exit bug --- revid/revid.go | 51 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 9de472e1..df1e111a 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -120,6 +120,8 @@ type Revid struct { // isRunning is a loaded and cocked foot-gun. isRunning bool + + err chan error } // packer takes data segments and packs them into clips @@ -176,13 +178,29 @@ func (p *packer) Write(frame []byte) (int, error) { // New returns a pointer to a new Revid with the desired configuration, and/or // an error if construction of the new instance was not successful. func New(c Config, ns *netsender.Sender) (*Revid, error) { - r := Revid{ns: ns} + r := Revid{ns: ns, err: make(chan error)} r.buffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout) r.packer.owner = &r err := r.reset(c) if err != nil { return nil, err } + go func() { + for { + err := <-r.err + if err != nil { + r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error()) + err = r.Stop() + if err != nil { + r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error()) + } + r.Start() + if err != nil { + r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error()) + } + } + } + }() return &r, nil } @@ -312,8 +330,8 @@ func (r *Revid) Start() error { r.config.Logger.Log(logger.Info, pkg+"starting output routine") go r.outputClips() r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") - err := r.setupInput() - return err + go r.setupInput() + return nil } // Stop halts any processing of video data from a camera or file @@ -474,11 +492,13 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } - r.config.Logger.Log(logger.Info, pkg+"reading camera data") - delay := time.Second / time.Duration(r.config.FrameRate) - err = r.lexTo(r.encoder, stdout, delay) - r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") - return err + go func() { + r.config.Logger.Log(logger.Info, pkg+"reading camera data") + delay := time.Second / time.Duration(r.config.FrameRate) + r.err <- r.lexTo(r.encoder, stdout, delay) + r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") + }() + return nil } func (r *Revid) startV4L() error { @@ -526,10 +546,12 @@ func (r *Revid) startV4L() error { return err } - r.config.Logger.Log(logger.Info, pkg+"reading camera data") - err = r.lexTo(r.encoder, stdout, delay) - r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") - return err + go func() { + r.config.Logger.Log(logger.Info, pkg+"reading camera data") + r.err <- r.lexTo(r.encoder, stdout, delay) + r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") + }() + return nil } // setupInputForFile sets things up for getting input from a file @@ -544,5 +566,8 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. - return r.lexTo(r.encoder, f, delay) + go func() { + r.err <- r.lexTo(r.encoder, f, delay) + }() + return nil } From d53eafe215f430a480199ec9604b12b0276f83a3 Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 31 Jan 2019 15:15:38 +1030 Subject: [PATCH 2/8] revid/revid.go: not running r.setupInput() as routine - now getting error an returning --- revid/revid.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index df1e111a..febb05c1 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -330,8 +330,8 @@ func (r *Revid) Start() error { r.config.Logger.Log(logger.Info, pkg+"starting output routine") go r.outputClips() r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") - go r.setupInput() - return nil + err := r.setupInput() + return err } // Stop halts any processing of video data from a camera or file From e18b1f51f0d1557d0a5a56cd119bf2f31c9c5467 Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 31 Jan 2019 15:49:44 +1030 Subject: [PATCH 3/8] revid/revid.go: capture error from r.Start() in error handling routine --- revid/revid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index febb05c1..2b146bf4 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -194,7 +194,7 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error()) } - r.Start() + err = r.Start() if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error()) } From 1e307fc37b4b971310466f5f204d3e08c24d539d Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 31 Jan 2019 19:42:20 +1030 Subject: [PATCH 4/8] revid/revid.go: made routines named rather than anonymous --- revid/revid.go | 55 +++++++++++++++++++++++--------------------------- 1 file changed, 25 insertions(+), 30 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 2b146bf4..c8f6d217 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -185,23 +185,25 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { if err != nil { return nil, err } - go func() { - for { - err := <-r.err + go r.handleErrors() + return &r, nil +} + +func (r *Revid) handleErrors() { + for { + err := <-r.err + if err != nil { + r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error()) + err = r.Stop() if err != nil { - r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error()) - err = r.Stop() - if err != nil { - r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error()) - } - err = r.Start() - if err != nil { - r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error()) - } + r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error()) + } + err = r.Start() + if err != nil { + r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error()) } } - }() - return &r, nil + } } // Bitrate returns the result of the most recent bitrate check. @@ -492,12 +494,7 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } - go func() { - r.config.Logger.Log(logger.Info, pkg+"reading camera data") - delay := time.Second / time.Duration(r.config.FrameRate) - r.err <- r.lexTo(r.encoder, stdout, delay) - r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") - }() + go r.lex(stdout, time.Duration(0)) return nil } @@ -534,7 +531,6 @@ func (r *Revid) startV4L() error { r.config.Logger.Log(logger.Info, pkg+"ffmpeg args", "args", strings.Join(args, " ")) r.cmd = exec.Command("ffmpeg", args...) - delay := time.Second / time.Duration(r.config.FrameRate) stdout, err := r.cmd.StdoutPipe() if err != nil { return err @@ -546,17 +542,12 @@ func (r *Revid) startV4L() error { return err } - go func() { - r.config.Logger.Log(logger.Info, pkg+"reading camera data") - r.err <- r.lexTo(r.encoder, stdout, delay) - r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") - }() + go r.lex(stdout, time.Second/time.Duration(r.config.FrameRate)) return nil } // setupInputForFile sets things up for getting input from a file func (r *Revid) setupInputForFile() error { - delay := time.Second / time.Duration(r.config.FrameRate) f, err := os.Open(r.config.InputFileName) if err != nil { r.config.Logger.Log(logger.Error, err.Error()) @@ -566,8 +557,12 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. - go func() { - r.err <- r.lexTo(r.encoder, f, delay) - }() + go r.lex(f, time.Second/time.Duration(r.config.FrameRate)) return nil } + +func (r *Revid) lex(read io.Reader, delay time.Duration) { + r.config.Logger.Log(logger.Info, pkg+"reading input data") + r.err <- r.lexTo(r.encoder, read, delay) + r.config.Logger.Log(logger.Info, pkg+"finished reading input data") +} From 051263c144c195d15bea214327bbf259385fdc3c Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 31 Jan 2019 19:58:25 +1030 Subject: [PATCH 5/8] revid/revid.go: revid.lex to revid.transcode --- revid/revid.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index c8f6d217..6d52be59 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -494,7 +494,7 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } - go r.lex(stdout, time.Duration(0)) + go r.transcode(stdout, time.Duration(0)) return nil } @@ -542,7 +542,7 @@ func (r *Revid) startV4L() error { return err } - go r.lex(stdout, time.Second/time.Duration(r.config.FrameRate)) + go r.transcode(stdout, time.Duration(0)) return nil } @@ -557,11 +557,11 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. - go r.lex(f, time.Second/time.Duration(r.config.FrameRate)) + go r.transcode(f, time.Second/time.Duration(r.config.FrameRate)) return nil } -func (r *Revid) lex(read io.Reader, delay time.Duration) { +func (r *Revid) transcode(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") r.err <- r.lexTo(r.encoder, read, delay) r.config.Logger.Log(logger.Info, pkg+"finished reading input data") From d26aa8643aec5661ceca0ccc3866b1f090691dd7 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 1 Feb 2019 09:47:31 +1030 Subject: [PATCH 6/8] revid: renamed transcode to processFrom. Using mutex for isRunning flag. Created setIsRunning func to set state of isRunning. --- revid/revid.go | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 6d52be59..f7313b57 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -37,6 +37,7 @@ import ( "os/exec" "strconv" "strings" + "sync" "time" "bitbucket.org/ausocean/av/stream" @@ -121,6 +122,8 @@ type Revid struct { // isRunning is a loaded and cocked foot-gun. isRunning bool + mu sync.Mutex + err chan error } @@ -189,6 +192,7 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { return &r, nil } +// TODO: put more thought into error severity func (r *Revid) handleErrors() { for { err := <-r.err @@ -317,18 +321,21 @@ func (r *Revid) reset(config Config) error { // IsRunning returns whether the receiver is running. func (r *Revid) IsRunning() bool { - return r.isRunning + r.mu.Lock() + ret := r.isRunning + r.mu.Unlock() + return ret } // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. func (r *Revid) Start() error { - if r.isRunning { + if r.IsRunning() { return errors.New(pkg + "start called but revid is already running") } r.config.Logger.Log(logger.Info, pkg+"starting Revid") r.config.Logger.Log(logger.Debug, pkg+"setting up output") - r.isRunning = true + r.setIsRunning(true) r.config.Logger.Log(logger.Info, pkg+"starting output routine") go r.outputClips() r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") @@ -338,12 +345,12 @@ func (r *Revid) Start() error { // Stop halts any processing of video data from a camera or file func (r *Revid) Stop() error { - if !r.isRunning { + if !r.IsRunning() { return errors.New(pkg + "stop called but revid is already stopped") } r.config.Logger.Log(logger.Info, pkg+"stopping revid") - r.isRunning = false + r.setIsRunning(false) r.config.Logger.Log(logger.Info, pkg+"killing input proccess") // If a cmd process is running, we kill! @@ -359,7 +366,7 @@ func (r *Revid) outputClips() { lastTime := time.Now() var count int loop: - for r.isRunning { + for r.IsRunning() { // If the ring buffer has something we can read and send off chunk, err := r.buffer.Next(readTimeout) switch err { @@ -403,7 +410,7 @@ loop: err = rs.restart() if err != nil { r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error()) - r.isRunning = false + r.setIsRunning(false) return } @@ -494,10 +501,16 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } - go r.transcode(stdout, time.Duration(0)) + go r.processFrom(stdout, time.Duration(0)) return nil } +func (r *Revid) setIsRunning(b bool) { + r.mu.Lock() + r.isRunning = b + r.mu.Unlock() +} + func (r *Revid) startV4L() error { const defaultVideo = "/dev/video0" @@ -542,7 +555,7 @@ func (r *Revid) startV4L() error { return err } - go r.transcode(stdout, time.Duration(0)) + go r.processFrom(stdout, time.Duration(0)) return nil } @@ -557,11 +570,11 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. - go r.transcode(f, time.Second/time.Duration(r.config.FrameRate)) + go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate)) return nil } -func (r *Revid) transcode(read io.Reader, delay time.Duration) { +func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") r.err <- r.lexTo(r.encoder, read, delay) r.config.Logger.Log(logger.Info, pkg+"finished reading input data") From 9bddf343f589771be3f40e9ef6eebef0cbcd1de0 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 1 Feb 2019 10:07:00 +1030 Subject: [PATCH 7/8] revid/revid.go: moved revid.mu declaration . Updated todo owner --- revid/revid.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index f7313b57..3e658312 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -120,10 +120,9 @@ type Revid struct { bitrate int // isRunning is a loaded and cocked foot-gun. + mu sync.Mutex isRunning bool - mu sync.Mutex - err chan error } @@ -192,7 +191,7 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { return &r, nil } -// TODO: put more thought into error severity +// TODO(Saxon): put more thought into error severity. func (r *Revid) handleErrors() { for { err := <-r.err @@ -319,7 +318,7 @@ func (r *Revid) reset(config Config) error { return nil } -// IsRunning returns whether the receiver is running. +// IsRunning returns true if revid is running. func (r *Revid) IsRunning() bool { r.mu.Lock() ret := r.isRunning @@ -327,6 +326,13 @@ func (r *Revid) IsRunning() bool { return ret } +// setIsRunning sets revid.isRunning using b. +func (r *Revid) setIsRunning(b bool) { + r.mu.Lock() + r.isRunning = b + r.mu.Unlock() +} + // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. func (r *Revid) Start() error { @@ -505,12 +511,6 @@ func (r *Revid) startRaspivid() error { return nil } -func (r *Revid) setIsRunning(b bool) { - r.mu.Lock() - r.isRunning = b - r.mu.Unlock() -} - func (r *Revid) startV4L() error { const defaultVideo = "/dev/video0" From c44d6bbfd3187cd51440186e1b571d9355dc74c7 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 1 Feb 2019 10:08:49 +1030 Subject: [PATCH 8/8] revid/revid.go: not during time.Duration conversion --- revid/revid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index 3e658312..6833ec2b 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -507,7 +507,7 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } - go r.processFrom(stdout, time.Duration(0)) + go r.processFrom(stdout, 0) return nil }