From fd3202f99adc3ce22df9cf2d5269b40a983646df Mon Sep 17 00:00:00 2001 From: Russell Stanley Date: Wed, 16 Feb 2022 12:16:09 +1030 Subject: [PATCH] add concurrency to probe write function --- cmd/rv/probe.go | 46 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/cmd/rv/probe.go b/cmd/rv/probe.go index 4ae0f33a..13ae1081 100644 --- a/cmd/rv/probe.go +++ b/cmd/rv/probe.go @@ -31,6 +31,8 @@ package main import ( "bytes" + "context" + "math" "os" "time" @@ -43,7 +45,7 @@ import ( // Misc constants. const ( maxImages = 1 // Max number of images read when evaluating turbidity. - turbidityTimeout = time.Minute + turbidityTimeout = 50 * time.Second ) // Turbidity sensor constants. @@ -61,7 +63,6 @@ type turbidityProbe struct { ts *turbidity.TurbiditySensor log logger.Logger buffer *bytes.Buffer - // ctx context.Context } // NewTurbidityProbe returns a new turbidity probe. @@ -71,7 +72,6 @@ func NewTurbidityProbe(log logger.Logger, delay time.Duration) (*turbidityProbe, tp.delay = delay tp.ticker = *time.NewTicker(delay) tp.buffer = bytes.NewBuffer(*new([]byte)) - // tp.ctx, _ = context.WithTimeout(context.Background(), turbidityTimeout) // Create the turbidity sensor. standard := gocv.IMRead("../../turbidity/images/default.jpg", gocv.IMReadGrayScale) @@ -88,13 +88,26 @@ func NewTurbidityProbe(log logger.Logger, delay time.Duration) (*turbidityProbe, // scores of a video to the the turbidity probe. func (tp *turbidityProbe) Write(p []byte) (int, error) { tp.buffer.Write(p) + ctx, cancel := context.WithTimeout(context.Background(), turbidityTimeout) + go func() { select { case <-tp.ticker.C: + done := make(chan bool) startTime := time.Now() - tp.turbidityCalculation(p) - tp.log.Log(logger.Debug, "finished turbidity calculation", "total duration (sec)", time.Since(startTime).Seconds()) - return + + tp.log.Log(logger.Debug, "beginning turbidity calculation") + go tp.turbidityCalculation(ctx, done) + + select { + case <-ctx.Done(): + tp.log.Debug("context deadline exceeded", "limit(sec)", time.Since(startTime).Seconds()) + cancel() + return + case <-done: + tp.log.Log(logger.Debug, "finished turbidity calculation", "total duration (sec)", time.Since(startTime).Seconds()) + return + } default: return } @@ -106,7 +119,7 @@ func (tp *turbidityProbe) Close() error { return nil } -func (tp *turbidityProbe) turbidityCalculation(p []byte) { +func (tp *turbidityProbe) turbidityCalculation(ctx context.Context, done chan<- bool) { var imgs []gocv.Mat img := gocv.NewMat() // Write byte array to a temp file. @@ -116,14 +129,21 @@ func (tp *turbidityProbe) turbidityCalculation(p []byte) { // TODO: Error handling. return } + defer os.Remove(file.Name()) - _, err = file.Write(tp.buffer.Bytes()) + startTime := time.Now() + _, err = file.Write(tp.buffer.Bytes()[:len(tp.buffer.Bytes())/int(math.Pow(2, 2))]) if err != nil { tp.log.Error("failed to write to temporary file", "error", err.Error()) // TODO: Error handling. return } + tp.log.Log(logger.Debug, "writing to temp file", "total duration (sec)", time.Since(startTime).Seconds()) + tp.log.Log(logger.Debug, "buffer", "size(bytes)", len(tp.buffer.Bytes())) + tp.buffer.Reset() + + startTime = time.Now() // Read the file and store each frame. vc, err := gocv.VideoCaptureFile(file.Name()) if err != nil { @@ -131,16 +151,20 @@ func (tp *turbidityProbe) turbidityCalculation(p []byte) { // TODO: Error handling. return } + tp.log.Log(logger.Debug, ".h264 decoded", "total duration (sec)", time.Since(startTime).Seconds()) + + startTime = time.Now() for vc.Read(&img) && len(imgs) < maxImages { imgs = append(imgs, img.Clone()) } if len(imgs) <= 0 { - tp.log.Log(logger.Warning, "no frames found", "error", err.Error()) + tp.log.Log(logger.Warning, "no frames found", "error") return } - tp.log.Log(logger.Debug, "found frames", "frames", len(imgs)) + tp.log.Log(logger.Debug, "read time", "total duration (sec)", time.Since(startTime).Seconds()) // Process video data to get saturation and contrast scores. + startTime = time.Now() res, err := tp.ts.Evaluate(imgs) if err != nil { tp.log.Error("evaluate failed", "error", err.Error()) @@ -149,5 +173,7 @@ func (tp *turbidityProbe) turbidityCalculation(p []byte) { tp.contrast = stat.Mean(res.Contrast, nil) tp.sharpness = stat.Mean(res.Sharpness, nil) } + tp.log.Log(logger.Debug, "evaluate complete", "total duration (sec)", time.Since(startTime).Seconds()) + done <- true return }