cmd/rv/probe.go improved logging and moved trim function to codec/h264/parse.go

This commit is contained in:
Russell Stanley 2022-02-23 10:26:47 +10:30
parent d4fbad12b7
commit cb59034a2e
3 changed files with 54 additions and 65 deletions

View File

@ -145,7 +145,11 @@ func main() {
rv *revid.Revid
p *turbidityProbe
)
p, err := NewTurbidityProbe(*log, 60*time.Second)
if err != nil {
log.Log(logger.Error, "could not create new turbidity probe", "error", err.Error())
}
log.Log(logger.Debug, "initialising netsender client")
ns, err := netsender.New(log, nil, readPin(p, rv, log), nil, createVarMap())

View File

@ -31,22 +31,19 @@ package main
import (
"bytes"
"errors"
"os"
"time"
"bitbucket.org/ausocean/av/codec/h264"
"bitbucket.org/ausocean/av/turbidity"
"bitbucket.org/ausocean/utils/logger"
"gocv.io/x/gocv"
"gonum.org/v1/gonum/stat"
)
var errNotEnoughBytes = errors.New("not enough bytes to read")
// Misc constants.
const (
maxImages = 1 // Max number of images read when evaluating turbidity.
turbidityTimeout = 50 * time.Second
maxImages = 1 // Max number of images read when evaluating turbidity.
)
// Turbidity sensor constants.
@ -66,11 +63,6 @@ type turbidityProbe struct {
buffer *bytes.Buffer
}
type frameScanner struct {
off int
buf []byte
}
// NewTurbidityProbe returns a new turbidity probe.
func NewTurbidityProbe(log logger.Logger, delay time.Duration) (*turbidityProbe, error) {
tp := new(turbidityProbe)
@ -93,10 +85,12 @@ func NewTurbidityProbe(log logger.Logger, delay time.Duration) (*turbidityProbe,
// Write, reads input h264 frames in the form of a byte stream and writes the the sharpness and contrast
// scores of a video to the the turbidity probe.
func (tp *turbidityProbe) Write(p []byte) (int, error) {
out := len(p)
// The first entry in the buffer must be a keyframe to speed up decoding.
if len(tp.buffer.Bytes()) == 0 {
video, err := trim(p)
video, err := h264.Trim(p)
if err != nil {
tp.log.Log(logger.Warning, "trim error", "error", err.Error())
tp.log.Log(logger.Debug, "no key frame detected", "error", err.Error())
return 0, nil
}
tp.buffer.Write(video)
@ -104,20 +98,19 @@ func (tp *turbidityProbe) Write(p []byte) (int, error) {
} else if len(tp.buffer.Bytes()) < 100000 {
tp.buffer.Write(p)
tp.log.Log(logger.Debug, "write to video buffer complete", "size(bytes)", len(tp.buffer.Bytes()))
} else {
out = 0
}
select {
case <-tp.ticker.C:
tp.buffer.Reset()
/*
tp.log.Log(logger.Debug, "beginning turbidity calculation")
startTime := time.Now()
tp.turbidityCalculation()
tp.log.Log(logger.Debug, "finished turbidity calculation", "total duration (sec)", time.Since(startTime).Seconds())
*/
tp.log.Log(logger.Debug, "beginning turbidity calculation")
startTime := time.Now()
tp.turbidityCalculation()
tp.log.Log(logger.Debug, "finished turbidity calculation", "total duration (sec)", time.Since(startTime).Seconds())
default:
}
return len(p), nil
return out, nil
}
func (tp *turbidityProbe) Close() error {
@ -134,26 +127,26 @@ func (tp *turbidityProbe) turbidityCalculation() {
// TODO: Error handling.
return
}
startTime := time.Now()
_, err = file.Write(tp.buffer.Bytes()[:])
if err != nil {
tp.log.Error("failed to write to temporary file", "error", err.Error())
// TODO: Error handling.
return
}
tp.log.Log(logger.Debug, "writing to temp file", "total duration (sec)", time.Since(startTime).Seconds())
tp.log.Log(logger.Debug, "buffer", "size(bytes)", len(tp.buffer.Bytes()))
tp.log.Log(logger.Debug, "write to file success", "buffer size(bytes)", len(tp.buffer.Bytes()))
tp.buffer.Reset()
startTime = time.Now()
// Read the file and store each frame.
// Open the video file.
startTime := time.Now()
vc, err := gocv.VideoCaptureFile(file.Name())
if err != nil {
tp.log.Error("failed to open video file", "error", err.Error())
// TODO: Error handling.
return
}
tp.log.Log(logger.Debug, ".h264 decoded", "total duration (sec)", time.Since(startTime).Seconds())
tp.log.Log(logger.Debug, "video capture open", "total duration (sec)", time.Since(startTime).Seconds())
// Store each frame untill max.
startTime = time.Now()
for vc.Read(&img) && len(imgs) < maxImages {
imgs = append(imgs, img.Clone())
@ -163,6 +156,7 @@ func (tp *turbidityProbe) turbidityCalculation() {
return
}
tp.log.Log(logger.Debug, "read time", "total duration (sec)", time.Since(startTime).Seconds())
// Process video data to get saturation and contrast scores.
startTime = time.Now()
res, err := tp.ts.Evaluate(imgs)
@ -175,7 +169,6 @@ func (tp *turbidityProbe) turbidityCalculation() {
}
tp.log.Log(logger.Debug, "evaluate complete", "total duration (sec)", time.Since(startTime).Seconds())
// Cleanup
err = os.Remove(file.Name())
if err != nil {
tp.log.Error("could not remove file", "error", err.Error())
@ -185,41 +178,3 @@ func (tp *turbidityProbe) turbidityCalculation() {
tp.log.Error("could not close video capture", "error", err.Error())
}
}
func trim(n []byte) ([]byte, error) {
sc := frameScanner{buf: n}
for {
b, ok := sc.readByte()
if !ok {
return nil, errNotEnoughBytes
}
for i := 1; b == 0x00 && i != 4; i++ {
b, ok = sc.readByte()
if !ok {
return nil, errNotEnoughBytes
}
if b != 0x01 || (i != 2 && i != 3) {
continue
}
b, ok = sc.readByte()
if !ok {
return nil, errNotEnoughBytes
}
nalType := int(b & 0x1f)
if nalType == 7 {
sc.off = sc.off - 4
return sc.buf[sc.off:], nil
}
}
}
}
func (s *frameScanner) readByte() (b byte, ok bool) {
if s.off >= len(s.buf) {
return 0, false
}
b = s.buf[s.off]
s.off++
return b, true
}

View File

@ -78,3 +78,33 @@ func (s *frameScanner) readByte() (b byte, ok bool) {
s.off++
return b, true
}
// Trim, will trim down a given byte stream of video data so that a key frame appears first.
func Trim(n []byte) ([]byte, error) {
sc := frameScanner{buf: n}
for {
b, ok := sc.readByte()
if !ok {
return nil, errNotEnoughBytes
}
for i := 1; b == 0x00 && i != 4; i++ {
b, ok = sc.readByte()
if !ok {
return nil, errNotEnoughBytes
}
if b != 0x01 || (i != 2 && i != 3) {
continue
}
b, ok = sc.readByte()
if !ok {
return nil, errNotEnoughBytes
}
nalType := int(b & 0x1f)
if nalType == 7 {
sc.off = sc.off - 4
return sc.buf[sc.off:], nil
}
}
}
}