diff --git a/cmd/rv/probe.go b/cmd/rv/probe.go index 13ae1081..38f80a20 100644 --- a/cmd/rv/probe.go +++ b/cmd/rv/probe.go @@ -31,8 +31,7 @@ package main import ( "bytes" - "context" - "math" + "errors" "os" "time" @@ -42,6 +41,8 @@ import ( "gonum.org/v1/gonum/stat" ) +var errNotEnoughBytes = errors.New("not enough bytes to read") + // Misc constants. const ( maxImages = 1 // Max number of images read when evaluating turbidity. @@ -65,6 +66,11 @@ type turbidityProbe struct { buffer *bytes.Buffer } +type frameScanner struct { + off int + buf []byte +} + // NewTurbidityProbe returns a new turbidity probe. func NewTurbidityProbe(log logger.Logger, delay time.Duration) (*turbidityProbe, error) { tp := new(turbidityProbe) @@ -87,31 +93,30 @@ func NewTurbidityProbe(log logger.Logger, delay time.Duration) (*turbidityProbe, // Write, reads input h264 frames in the form of a byte stream and writes the the sharpness and contrast // scores of a video to the the turbidity probe. func (tp *turbidityProbe) Write(p []byte) (int, error) { - tp.buffer.Write(p) - ctx, cancel := context.WithTimeout(context.Background(), turbidityTimeout) - - go func() { - select { - case <-tp.ticker.C: - done := make(chan bool) - startTime := time.Now() - - tp.log.Log(logger.Debug, "beginning turbidity calculation") - go tp.turbidityCalculation(ctx, done) - - select { - case <-ctx.Done(): - tp.log.Debug("context deadline exceeded", "limit(sec)", time.Since(startTime).Seconds()) - cancel() - return - case <-done: - tp.log.Log(logger.Debug, "finished turbidity calculation", "total duration (sec)", time.Since(startTime).Seconds()) - return - } - default: - return + if len(tp.buffer.Bytes()) == 0 { + video, err := trim(p) + if err != nil { + tp.log.Log(logger.Warning, "trim error", "error", err.Error()) + return 0, nil } - }() + tp.buffer.Write(video) + tp.log.Log(logger.Debug, "video trimmed, write keyframe complete", "size(bytes)", len(tp.buffer.Bytes())) + } else if len(tp.buffer.Bytes()) < 100000 { + tp.buffer.Write(p) + tp.log.Log(logger.Debug, "write to video buffer complete", "size(bytes)", len(tp.buffer.Bytes())) + } + + select { + case <-tp.ticker.C: + tp.buffer.Reset() + /* + tp.log.Log(logger.Debug, "beginning turbidity calculation") + startTime := time.Now() + tp.turbidityCalculation() + tp.log.Log(logger.Debug, "finished turbidity calculation", "total duration (sec)", time.Since(startTime).Seconds()) + */ + default: + } return len(p), nil } @@ -119,7 +124,7 @@ func (tp *turbidityProbe) Close() error { return nil } -func (tp *turbidityProbe) turbidityCalculation(ctx context.Context, done chan<- bool) { +func (tp *turbidityProbe) turbidityCalculation() { var imgs []gocv.Mat img := gocv.NewMat() // Write byte array to a temp file. @@ -129,10 +134,8 @@ func (tp *turbidityProbe) turbidityCalculation(ctx context.Context, done chan<- // TODO: Error handling. return } - - defer os.Remove(file.Name()) startTime := time.Now() - _, err = file.Write(tp.buffer.Bytes()[:len(tp.buffer.Bytes())/int(math.Pow(2, 2))]) + _, err = file.Write(tp.buffer.Bytes()[:]) if err != nil { tp.log.Error("failed to write to temporary file", "error", err.Error()) // TODO: Error handling. @@ -140,7 +143,6 @@ func (tp *turbidityProbe) turbidityCalculation(ctx context.Context, done chan<- } tp.log.Log(logger.Debug, "writing to temp file", "total duration (sec)", time.Since(startTime).Seconds()) tp.log.Log(logger.Debug, "buffer", "size(bytes)", len(tp.buffer.Bytes())) - tp.buffer.Reset() startTime = time.Now() @@ -152,17 +154,15 @@ func (tp *turbidityProbe) turbidityCalculation(ctx context.Context, done chan<- return } tp.log.Log(logger.Debug, ".h264 decoded", "total duration (sec)", time.Since(startTime).Seconds()) - startTime = time.Now() for vc.Read(&img) && len(imgs) < maxImages { imgs = append(imgs, img.Clone()) } if len(imgs) <= 0 { - tp.log.Log(logger.Warning, "no frames found", "error") + tp.log.Log(logger.Warning, "no frames found") return } tp.log.Log(logger.Debug, "read time", "total duration (sec)", time.Since(startTime).Seconds()) - // Process video data to get saturation and contrast scores. startTime = time.Now() res, err := tp.ts.Evaluate(imgs) @@ -174,6 +174,52 @@ func (tp *turbidityProbe) turbidityCalculation(ctx context.Context, done chan<- tp.sharpness = stat.Mean(res.Sharpness, nil) } tp.log.Log(logger.Debug, "evaluate complete", "total duration (sec)", time.Since(startTime).Seconds()) - done <- true - return + + // Cleanup + err = os.Remove(file.Name()) + if err != nil { + tp.log.Error("could not remove file", "error", err.Error()) + } + err = vc.Close() + if err != nil { + tp.log.Error("could not close video capture", "error", err.Error()) + } +} + +func trim(n []byte) ([]byte, error) { + sc := frameScanner{buf: n} + for { + b, ok := sc.readByte() + if !ok { + return nil, errNotEnoughBytes + } + for i := 1; b == 0x00 && i != 4; i++ { + b, ok = sc.readByte() + if !ok { + return nil, errNotEnoughBytes + } + if b != 0x01 || (i != 2 && i != 3) { + continue + } + + b, ok = sc.readByte() + if !ok { + return nil, errNotEnoughBytes + } + nalType := int(b & 0x1f) + if nalType == 7 { + sc.off = sc.off - 4 + return sc.buf[sc.off:], nil + } + } + } +} + +func (s *frameScanner) readByte() (b byte, ok bool) { + if s.off >= len(s.buf) { + return 0, false + } + b = s.buf[s.off] + s.off++ + return b, true }