diff --git a/cmd/rv/main.go b/cmd/rv/main.go index 85a14ebe..2f8c4338 100644 --- a/cmd/rv/main.go +++ b/cmd/rv/main.go @@ -70,7 +70,6 @@ import ( "bitbucket.org/ausocean/av/revid/config" "bitbucket.org/ausocean/iot/pi/netlogger" "bitbucket.org/ausocean/iot/pi/netsender" - "bitbucket.org/ausocean/iot/pi/sds" "bitbucket.org/ausocean/utils/logger" ) @@ -147,10 +146,14 @@ func main() { rv *revid.Revid p *turbidityProbe ) + p, err := NewTurbidityProbe(*log, 60*time.Second) + if err != nil { + log.Log(logger.Fatal, "could not create new turbidity probe", "error", err.Error()) + } log.Log(logger.Debug, "initialising netsender client") - ns, err := netsender.New(log, nil, readPin(p, rv), nil, createVarMap()) + ns, err := netsender.New(log, nil, readPin(p, rv, log), nil, createVarMap()) if err != nil { log.Log(logger.Fatal, pkg+"could not initialise netsender client: "+err.Error()) } @@ -299,7 +302,7 @@ func sleep(ns *netsender.Sender, l *logger.Logger) { // readPin provides a callback function of consistent signature for use by // netsender to retrieve software defined pin values e.g. revid bitrate. -func readPin(p *turbidityProbe, rv *revid.Revid) func(pin *netsender.Pin) error { +func readPin(p *turbidityProbe, rv *revid.Revid, l *logger.Logger) func(pin *netsender.Pin) error { return func(pin *netsender.Pin) error { switch { case pin.Name == bitratePin: @@ -307,16 +310,16 @@ func readPin(p *turbidityProbe, rv *revid.Revid) func(pin *netsender.Pin) error if rv != nil { pin.Value = rv.Bitrate() } - case pin.Name[0] == 'X': - return sds.ReadSystem(pin) case pin.Name == sharpnessPin: pin.Value = -1 if p != nil { + l.Debug("setting sharpness value", "sharpness", p.sharpness*1000) pin.Value = int(p.sharpness * 1000) } case pin.Name == contrastPin: pin.Value = -1 if p != nil { + l.Debug("setting contrast pin", "contrast", p.contrast) pin.Value = int(p.contrast * 100) } default: diff --git a/cmd/rv/probe.go b/cmd/rv/probe.go index 3af817f6..9bda6931 100644 --- a/cmd/rv/probe.go +++ b/cmd/rv/probe.go @@ -30,35 +30,45 @@ LICENSE package main import ( + "bytes" + "errors" + "fmt" "os" "time" "gocv.io/x/gocv" "gonum.org/v1/gonum/stat" + "bitbucket.org/ausocean/av/codec/h264" "bitbucket.org/ausocean/av/turbidity" "bitbucket.org/ausocean/utils/logger" ) +// Misc constants. +const ( + maxImages = 1 // Max number of images read when evaluating turbidity. + bufferLimit = 20000 // 20KB + trimTolerance = 200 // Number of times trim can be called where no keyframe is found. +) + // Turbidity sensor constants. const ( - k1, k2 = 8, 8 // Block size, must be divisible by the size template with no remainder. + k1, k2 = 4, 4 // Block size, must be divisible by the size template with no remainder. filterSize = 3 // Sobel filter size. scale = 1.0 // Amount of scale applied to sobel filter values. alpha = 1.0 // Paramater for contrast equation. ) -// Misc constants. -const ( - maxImages = 10 // Max number of images read when evaluating turbidity. -) - +// turbidityProbe will hold the latest video data and calculate the sharpness and contrast scores. +// These scores will be sent to netreceiver based on the given delay. type turbidityProbe struct { sharpness, contrast float64 delay time.Duration ticker time.Ticker ts *turbidity.TurbiditySensor log logger.Logger + buffer *bytes.Buffer + trimCounter int } // NewTurbidityProbe returns a new turbidity probe. @@ -67,13 +77,14 @@ func NewTurbidityProbe(log logger.Logger, delay time.Duration) (*turbidityProbe, tp.log = log tp.delay = delay tp.ticker = *time.NewTicker(delay) + tp.buffer = bytes.NewBuffer(*new([]byte)) // Create the turbidity sensor. - standard := gocv.IMRead("../../turbidity/images/template.jpg", gocv.IMReadGrayScale) + standard := gocv.IMRead("../../turbidity/images/default.jpg", gocv.IMReadGrayScale) template := gocv.IMRead("../../turbidity/images/template.jpg", gocv.IMReadGrayScale) - ts, err := turbidity.NewTurbiditySensor(template, standard, k1, k2, filterSize, scale, alpha) + ts, err := turbidity.NewTurbiditySensor(template, standard, k1, k2, filterSize, scale, alpha, log) if err != nil { - log.Error("failed create turbidity sensor", "error", err.Error()) + return nil, fmt.Errorf("failed to create turbidity sensor: %w", err) } tp.ts = ts return tp, nil @@ -82,43 +93,46 @@ func NewTurbidityProbe(log logger.Logger, delay time.Duration) (*turbidityProbe, // Write, reads input h264 frames in the form of a byte stream and writes the the sharpness and contrast // scores of a video to the the turbidity probe. func (tp *turbidityProbe) Write(p []byte) (int, error) { - select { - case <-tp.ticker.C: - var imgs []gocv.Mat - img := gocv.NewMat() - - // Write byte array to a temp file. - file, err := os.CreateTemp("temp", "video*.h264") + if tp.buffer.Len() == 0 { + // The first entry in the buffer must be a keyframe to speed up decoding. + video, err := h264.Trim(p) if err != nil { - tp.log.Error("failed to create temp file", "error", err.Error()) - return 0, err - } - defer os.Remove(file.Name()) - n, err := file.Write(p) - if err != nil { - tp.log.Error("failed to write to temporary file", "error", err.Error()) - return n, err + tp.trimCounter++ + if tp.trimCounter >= trimTolerance { + return 0, fmt.Errorf("could not trim h264 within tolerance: %w", err) + } + return len(p), nil + } else { + tp.log.Log(logger.Debug, "trim successful", "keyframe error counter", tp.trimCounter) + tp.trimCounter = 0 } - // Read the file and store each frame. - vc, err := gocv.VideoCaptureFile(file.Name()) + n, err := tp.buffer.Write(video) if err != nil { - tp.log.Error("failed to open video file", "error", err.Error()) - return len(p), err + tp.buffer.Reset() + return 0, fmt.Errorf("could not write trimmed video to buffer: %w", err) } - for vc.Read(&img) && len(imgs) < maxImages { - imgs = append(imgs, img.Clone()) - } - - // Process video data to get saturation and contrast scores. - res, err := tp.ts.Evaluate(imgs) + tp.log.Log(logger.Debug, "video trimmed, write keyframe complete", "size(bytes)", n) + } else if tp.buffer.Len() < bufferLimit { + // Buffer size is limited to speed up decoding. + _, err := tp.buffer.Write(p) if err != nil { - tp.log.Error("evaluate failed", "errror", err.Error()) - return len(p), err + tp.buffer.Reset() + return 0, fmt.Errorf("could not write to buffer: %w", err) + } + } else { + // Buffer is large enough to begin turbidity calculation. + select { + case <-tp.ticker.C: + tp.log.Log(logger.Debug, "beginning turbidity calculation") + startTime := time.Now() + err := tp.turbidityCalculation() + if err != nil { + return 0, fmt.Errorf("could not calculate turbidity: %w", err) + } + tp.log.Log(logger.Debug, "finished turbidity calculation", "total duration (sec)", time.Since(startTime).Seconds()) + default: } - tp.contrast = stat.Mean(res.Contrast, nil) - tp.sharpness = stat.Mean(res.Sharpness, nil) - default: } return len(p), nil } @@ -126,3 +140,71 @@ func (tp *turbidityProbe) Write(p []byte) (int, error) { func (tp *turbidityProbe) Close() error { return nil } + +func (tp *turbidityProbe) turbidityCalculation() error { + var imgs []gocv.Mat + img := gocv.NewMat() + + // Write byte array to a temp file. + file, err := os.CreateTemp("temp", "video*.h264") + if err != nil { + return fmt.Errorf("failed to create temp file: %w", err) + } + tp.log.Log(logger.Debug, "writing to file", "buffer size(bytes)", tp.buffer.Len()) + + _, err = file.Write(tp.buffer.Bytes()) + if err != nil { + return fmt.Errorf("failed to write to temporary file: %w", err) + } + tp.log.Log(logger.Debug, "write to file success", "buffer size(bytes)", tp.buffer.Len()) + tp.buffer.Reset() + + // Open the video file. + startTime := time.Now() + vc, err := gocv.VideoCaptureFile(file.Name()) + if err != nil { + return fmt.Errorf("failed to open video file: %w", err) + } + tp.log.Log(logger.Debug, "video capture open", "total duration (sec)", time.Since(startTime).Seconds()) + + // Store each frame until maximum amount is reached. + startTime = time.Now() + for vc.Read(&img) && len(imgs) < maxImages { + imgs = append(imgs, img.Clone()) + } + if len(imgs) <= 0 { + return errors.New("no frames found") + } + tp.log.Log(logger.Debug, "read time", "total duration (sec)", time.Since(startTime).Seconds()) + + // Process video data to get saturation and contrast scores. + res, err := tp.ts.Evaluate(imgs) + if err != nil { + err_ := cleanUp(file.Name(), vc) + if err_ != nil { + return fmt.Errorf("could not clean up: %v, after evaluation error: %w", err_, err) + } + return fmt.Errorf("evaluation error: %w", err) + } + + tp.contrast = stat.Mean(res.Contrast, nil) + tp.sharpness = stat.Mean(res.Sharpness, nil) + + err = cleanUp(file.Name(), vc) + if err != nil { + return fmt.Errorf("could not clean up: %w", err) + } + return nil +} + +func cleanUp(file string, vc *gocv.VideoCapture) error { + err := os.Remove(file) + if err != nil { + return fmt.Errorf("could not remove temp file: %w", err) + } + err = vc.Close() + if err != nil { + return fmt.Errorf("could not close video capture device: %w", err) + } + return nil +} diff --git a/codec/h264/parse.go b/codec/h264/parse.go index accf68cf..f341517e 100644 --- a/codec/h264/parse.go +++ b/codec/h264/parse.go @@ -78,3 +78,33 @@ func (s *frameScanner) readByte() (b byte, ok bool) { s.off++ return b, true } + +// Trim will trim down a given byte stream of video data so that a key frame appears first. +func Trim(n []byte) ([]byte, error) { + sc := frameScanner{buf: n} + for { + b, ok := sc.readByte() + if !ok { + return nil, errNotEnoughBytes + } + for i := 1; b == 0x00 && i != 4; i++ { + b, ok = sc.readByte() + if !ok { + return nil, errNotEnoughBytes + } + if b != 0x01 || (i != 2 && i != 3) { + continue + } + + b, ok = sc.readByte() + if !ok { + return nil, errNotEnoughBytes + } + nalType := int(b & 0x1f) + if nalType == 7 { + sc.off = sc.off - 4 + return sc.buf[sc.off:], nil + } + } + } +} diff --git a/turbidity/images/default.jpg b/turbidity/images/default.jpg index f3716fba..579fd142 100644 Binary files a/turbidity/images/default.jpg and b/turbidity/images/default.jpg differ diff --git a/turbidity/images/template.jpg b/turbidity/images/template.jpg index d6165d76..3c577d1b 100644 Binary files a/turbidity/images/template.jpg and b/turbidity/images/template.jpg differ diff --git a/turbidity/turbidity.go b/turbidity/turbidity.go index 75eac3d6..8bbc4d6e 100644 --- a/turbidity/turbidity.go +++ b/turbidity/turbidity.go @@ -35,7 +35,9 @@ import ( "fmt" "image" "math" + "time" + "bitbucket.org/ausocean/utils/logger" "gocv.io/x/gocv" ) @@ -46,10 +48,11 @@ type TurbiditySensor struct { standard, standardCorners gocv.Mat k1, k2, sobelFilterSize int scale, alpha float64 + log logger.Logger } // NewTurbiditySensor returns a new TurbiditySensor. -func NewTurbiditySensor(template, standard gocv.Mat, k1, k2, sobelFilterSize int, scale, alpha float64) (*TurbiditySensor, error) { +func NewTurbiditySensor(template, standard gocv.Mat, k1, k2, sobelFilterSize int, scale, alpha float64, log logger.Logger) (*TurbiditySensor, error) { ts := new(TurbiditySensor) templateCorners := gocv.NewMat() standardCorners := gocv.NewMat() @@ -76,6 +79,7 @@ func NewTurbiditySensor(template, standard gocv.Mat, k1, k2, sobelFilterSize int ts.k1, ts.k2, ts.sobelFilterSize = k1, k2, sobelFilterSize ts.alpha, ts.scale = alpha, scale + ts.log = log return ts, nil } @@ -87,16 +91,24 @@ func (ts TurbiditySensor) Evaluate(imgs []gocv.Mat) (*Results, error) { } for i := range imgs { + timer := time.Now() marker, err := ts.transform(imgs[i]) if err != nil { return nil, fmt.Errorf("could not transform image: %d: %w", i, err) } - edge := ts.sobel(marker) + ts.log.Log(logger.Debug, "transform successful", "transform duration (sec)", time.Since(timer).Seconds()) + + timer = time.Now() + edge := ts.sobel(marker) + ts.log.Log(logger.Debug, "sobel filter successful", "sobel duration", time.Since(timer).Seconds()) + + timer = time.Now() sharpScore, contScore, err := ts.EvaluateImage(marker, edge) if err != nil { return result, err } + ts.log.Log(logger.Debug, "sharpness and contrast evaluation successful", "evaluation duration", time.Since(timer).Seconds()) result.Update(sharpScore, contScore, float64(i), i) } return result, nil @@ -180,7 +192,7 @@ func (ts TurbiditySensor) evaluateBlockAMEE(img gocv.Mat, xStart, yStart, xEnd, func (ts TurbiditySensor) transform(img gocv.Mat) (gocv.Mat, error) { out := gocv.NewMat() mask := gocv.NewMat() - imgCorners := gocv.NewMat() + imgCorners := ts.standardCorners const ( ransacThreshold = 3.0 // Maximum allowed reprojection error to treat a point pair as an inlier. maxIter = 2000 // The maximum number of RANSAC iterations. @@ -191,14 +203,12 @@ func (ts TurbiditySensor) transform(img gocv.Mat) (gocv.Mat, error) { return out, errors.New("image is empty, cannot transform") } // Check image for corners, if non can be found corners will be set to default value. - if !gocv.FindChessboardCorners(img, image.Pt(3, 3), &imgCorners, gocv.CalibCBFastCheck) { - imgCorners = ts.standardCorners - } + // if !gocv.FindChessboardCorners(img, image.Pt(3, 3), &imgCorners, gocv.CalibCBFastCheck) {} // Find and apply transformation. H := gocv.FindHomography(imgCorners, &ts.templateCorners, gocv.HomograpyMethodRANSAC, ransacThreshold, &mask, maxIter, confidence) gocv.WarpPerspective(img, &out, H, image.Pt(ts.template.Rows(), ts.template.Cols())) - + gocv.CvtColor(out, &out, gocv.ColorRGBToGray) return out, nil } diff --git a/turbidity/turbidity_test.go b/turbidity/turbidity_test.go index 79c4252a..4c97b65a 100644 --- a/turbidity/turbidity_test.go +++ b/turbidity/turbidity_test.go @@ -30,12 +30,15 @@ package turbidity import ( "fmt" + "io" "testing" + "bitbucket.org/ausocean/utils/logger" "gocv.io/x/gocv" "gonum.org/v1/gonum/stat" "gonum.org/v1/plot" "gonum.org/v1/plot/plotutil" + "gopkg.in/natefinch/lumberjack.v2" ) const ( @@ -44,15 +47,35 @@ const ( increment = 2.5 // Increment of the turbidity level. ) +// Logging configuration. +const ( + logPath = "/var/log/netsender/netsender.log" + logMaxSize = 500 // MB + logMaxBackup = 10 + logMaxAge = 28 // days + logVerbosity = logger.Info + logSuppress = true +) + // TestImages will read a library of test images and calculate the sharpness and contrast scores. // A plot of the results will be generated and stored in the plots directory. func TestImages(t *testing.T) { + const ( k1, k2 = 8, 8 filterSize = 3 scale, alpha = 1.0, 1.0 ) + // Create lumberjack logger. + fileLog := &lumberjack.Logger{ + Filename: logPath, + MaxSize: logMaxSize, + MaxBackups: logMaxBackup, + MaxAge: logMaxAge, + } + log := *logger.New(logVerbosity, io.MultiWriter(fileLog), logSuppress) + template := gocv.IMRead("images/template.jpg", gocv.IMReadGrayScale) standard := gocv.IMRead("images/default.jpg", gocv.IMReadGrayScale) @@ -66,7 +89,7 @@ func TestImages(t *testing.T) { } } - ts, err := NewTurbiditySensor(template, standard, k1, k2, filterSize, scale, alpha) + ts, err := NewTurbiditySensor(template, standard, k1, k2, filterSize, scale, alpha, log) if err != nil { t.Fatalf("could not create turbidity sensor: %v", err) } @@ -88,7 +111,7 @@ func TestImages(t *testing.T) { results.Update(stat.Mean(sample_result.Sharpness, nil), stat.Mean(sample_result.Contrast, nil), float64(i)*increment, i) } - err = plotResults(results.Turbidity, normalize(results.Sharpness), normalize(results.Contrast)) + err = plotResults(results.Turbidity, results.Sharpness, results.Contrast) if err != nil { t.Fatalf("plotting Failed: %v", err) }