device/raspistill & revid & container/mts: fixed issues with raspistill test implementations and made slight logging improvements in mts encoder and revid.

This commit is contained in:
Saxon Nelson-Milton 2021-02-03 12:17:23 +10:30
parent d096434e1e
commit fa4713f8fd
6 changed files with 61 additions and 29 deletions

View File

@ -36,8 +36,8 @@ import (
"bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/av/container/mts/meta"
"bitbucket.org/ausocean/av/container/mts/pes" "bitbucket.org/ausocean/av/container/mts/pes"
"bitbucket.org/ausocean/av/container/mts/psi" "bitbucket.org/ausocean/av/container/mts/psi"
"bitbucket.org/ausocean/utils/realtime"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/realtime"
) )
// These three constants are used to select between the three different // These three constants are used to select between the three different
@ -110,7 +110,6 @@ var Meta *meta.Data
// This will help us obtain a realtime for timestamp meta encoding. // This will help us obtain a realtime for timestamp meta encoding.
var RealTime = realtime.NewRealTime() var RealTime = realtime.NewRealTime()
// Encoder encapsulates properties of an MPEG-TS generator. // Encoder encapsulates properties of an MPEG-TS generator.
type Encoder struct { type Encoder struct {
dst io.WriteCloser dst io.WriteCloser
@ -246,7 +245,10 @@ func (e *Encoder) Write(data []byte) (int, error) {
pkt.PCR = pcr pkt.PCR = pcr
pusi = false pusi = false
} }
_, err := e.dst.Write(pkt.Bytes(e.tsSpace[:PacketSize]))
b := pkt.Bytes(e.tsSpace[:PacketSize])
e.log.Debug("writing MTS packet to destination", "size", len(b), "pusi", pusi, "PID", pkt.PID, "PTS", pts, "PCR", pkt.PCR)
_, err := e.dst.Write(b)
if err != nil { if err != nil {
return len(data), err return len(data), err
} }

View File

@ -30,10 +30,10 @@ LICENSE
package raspistill package raspistill
import ( import (
"bufio"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"bufio"
"os/exec" "os/exec"
"strings" "strings"
@ -98,7 +98,6 @@ func (r *Raspistill) start() error {
return fmt.Errorf("could not pipe command error: %w", err) return fmt.Errorf("could not pipe command error: %w", err)
} }
go func() { go func() {
errScnr := bufio.NewScanner(stderr) errScnr := bufio.NewScanner(stderr)
for { for {

View File

@ -31,7 +31,7 @@ package raspistill
import ( import (
"io" "io"
"io/ioutil" "io/ioutil"
"path/filepath" "os"
"strconv" "strconv"
"sync" "sync"
"time" "time"
@ -42,9 +42,10 @@ import (
const ( const (
// TODO(Saxon): find nImages programmatically ? // TODO(Saxon): find nImages programmatically ?
nImages = 6 nImages = 6
relImgPath = "/bitbucket.org/ausocean/test/test-data/av/input/jpeg/"
imgPath = "../../../test/test-data/av/input/jpeg/"
jpgExt = ".jpg" jpgExt = ".jpg"
gopathEnvName = "GOPATH"
readWaitDelay = 1 * time.Second
) )
type raspistill struct { type raspistill struct {
@ -69,10 +70,11 @@ func new(l config.Logger) raspistill {
// We expect the 6 images test images to be named 0.jpg through to 5.jpg. // We expect the 6 images test images to be named 0.jpg through to 5.jpg.
r.log.Debug("loading test JPEG images") r.log.Debug("loading test JPEG images")
for i, _ := range r.images { for i, _ := range r.images {
absPath, err := filepath.Abs(imgPath) imgDir := os.Getenv(gopathEnvName) + relImgPath
path := absPath + "/" + strconv.Itoa(i) + jpgExt path := imgDir + strconv.Itoa(i) + jpgExt
r.images[i], err = ioutil.ReadFile(path)
var err error
r.images[i], err = ioutil.ReadFile(path)
if err != nil { if err != nil {
r.log.Fatal("error loading test image", "imageNum", i, "error", err) r.log.Fatal("error loading test image", "imageNum", i, "error", err)
} }
@ -85,14 +87,17 @@ func new(l config.Logger) raspistill {
// calls on Raspistill.read will return an error. // calls on Raspistill.read will return an error.
func (r *Raspistill) stop() error { func (r *Raspistill) stop() error {
r.log.Debug("stopping test raspistill") r.log.Debug("stopping test raspistill")
if r.running() {
r.setRunning(false) r.setRunning(false)
close(r.term)
}
return nil return nil
} }
// start creates and starts the timelapse and duration tickers and sets // start creates and starts the timelapse and duration tickers and sets
// isRunning flag to true indicating that raspistill is capturing. // isRunning flag to true indicating that raspistill is capturing.
func (r *Raspistill) start() error { func (r *Raspistill) start() error {
r.log.Debug("starting test raspistill") r.log.Debug("starting test implementation raspistill", "duration", r.cfg.TimelapseDuration.String(), "interval", r.cfg.TimelapseInterval.String())
r.durTicker = time.NewTicker(r.cfg.TimelapseDuration) r.durTicker = time.NewTicker(r.cfg.TimelapseDuration)
r.intvlTicker = time.NewTicker(r.cfg.TimelapseInterval) r.intvlTicker = time.NewTicker(r.cfg.TimelapseInterval)
@ -130,13 +135,13 @@ func (r *Raspistill) capture() {
r.intvlTicker.Reset(r.cfg.TimelapseInterval) r.intvlTicker.Reset(r.cfg.TimelapseInterval)
case <-r.term: case <-r.term:
r.log.Debug("got termination signal")
r.setRunning(false) r.setRunning(false)
return return
case t := <-r.durTicker.C: case t := <-r.durTicker.C:
r.log.Debug("got duration tick, timelapse over", "tick", t) r.log.Debug("got duration tick, timelapse over", "tick", t)
r.buf = nil close(r.term)
return
} }
} }
} }
@ -147,20 +152,39 @@ func (r *Raspistill) capture() {
func (r *Raspistill) read(p []byte) (int, error) { func (r *Raspistill) read(p []byte) (int, error) {
r.log.Debug("reading from test raspistill") r.log.Debug("reading from test raspistill")
if !r.running() { if !r.running() {
return 0, errNotStarted return 0, io.EOF
}
// Waits until there's something in the buffer for reading, unless there's a
// termination signal, in which case return io.EOF.
for {
select {
case <-r.term:
return 0, io.EOF
default:
}
if r.bufLen() != 0 {
break
}
time.Sleep(readWaitDelay)
} }
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
if r.buf == nil {
return 0, io.EOF
}
n := copy(p, r.buf) n := copy(p, r.buf)
r.buf = r.buf[n:] r.buf = r.buf[n:]
return n, nil return n, nil
} }
func (r *Raspistill) bufLen() int {
r.mu.Lock()
l := len(r.buf)
r.mu.Unlock()
return l
}
func (r *Raspistill) setRunning(s bool) { func (r *Raspistill) setRunning(s bool) {
r.mu.Lock() r.mu.Lock()
r.isRunning = s r.isRunning = s

View File

@ -43,7 +43,7 @@ const pkg = "raspistill: "
// Config field validation bounds. // Config field validation bounds.
const ( const (
minTimelapseDuration = 30 * time.Second // s minTimelapseDuration = 10 * time.Second // s
maxTimelapseDuration = 86400 * time.Second // s = 24 hours maxTimelapseDuration = 86400 * time.Second // s = 24 hours
minTimelapseInterval = 1 * time.Second // s minTimelapseInterval = 1 * time.Second // s
maxTimelapseInterval = 86400 * time.Second // s = 24 hours maxTimelapseInterval = 86400 * time.Second // s = 24 hours

View File

@ -50,6 +50,7 @@ func TestRaspistill(t *testing.T) {
// Configuration parameters. // Configuration parameters.
const ( const (
timelapseInterval = "4" timelapseInterval = "4"
timelapseDuration = "25"
rbStartElementSize = "1000000" rbStartElementSize = "1000000"
input = "Raspistill" input = "Raspistill"
codec = "JPEG" codec = "JPEG"
@ -88,6 +89,7 @@ func TestRaspistill(t *testing.T) {
config.KeyOutput: output, config.KeyOutput: output,
config.KeyOutputPath: outputPath, config.KeyOutputPath: outputPath,
config.KeyTimelapseInterval: timelapseInterval, config.KeyTimelapseInterval: timelapseInterval,
config.KeyTimelapseDuration: timelapseDuration,
config.KeyLogging: logging, config.KeyLogging: logging,
config.KeyRBStartElementSize: rbStartElementSize, config.KeyRBStartElementSize: rbStartElementSize,
}, },
@ -150,7 +152,7 @@ func TestRaspistill(t *testing.T) {
} }
// Clean up out directory. // Clean up out directory.
err := os.RemoveAll(outDir) err = os.RemoveAll(outDir)
if err != nil { if err != nil {
t.Fatalf("could not clean up out directory: %v", err) t.Fatalf("could not clean up out directory: %v", err)
} }

View File

@ -217,6 +217,7 @@ type mtsSender struct {
// newMtsSender returns a new mtsSender. // newMtsSender returns a new mtsSender.
func newMTSSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *ring.Buffer, clipDur time.Duration) *mtsSender { func newMTSSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *ring.Buffer, clipDur time.Duration) *mtsSender {
log(logger.Debug, "setting up mtsSender", "clip duration", int(clipDur))
s := &mtsSender{ s := &mtsSender{
dst: dst, dst: dst,
repairer: mts.NewDiscontinuityRepairer(), repairer: mts.NewDiscontinuityRepairer(),
@ -283,6 +284,7 @@ func (s *mtsSender) Write(d []byte) (int, error) {
} }
if s.next != nil { if s.next != nil {
s.log(logger.Debug, "appending packet to clip")
s.buf = append(s.buf, s.next...) s.buf = append(s.buf, s.next...)
} }
bytes := make([]byte, len(d)) bytes := make([]byte, len(d))
@ -290,7 +292,10 @@ func (s *mtsSender) Write(d []byte) (int, error) {
s.next = bytes s.next = bytes
p, _ := mts.PID(bytes) p, _ := mts.PID(bytes)
s.curPid = int(p) s.curPid = int(p)
if time.Now().Sub(s.prev) >= s.clipDur && s.curPid == mts.PatPid && len(s.buf) > 0 { curDur := time.Now().Sub(s.prev)
s.log(logger.Debug, "checking send conditions", "curDuration", int(curDur), "sendDur", int(s.clipDur), "curPID", s.curPid, "len", len(s.buf))
if curDur >= s.clipDur && s.curPid == mts.PatPid && len(s.buf) > 0 {
s.log(logger.Debug, "writing clip to ring buffer for sending", "size", len(s.buf))
s.prev = time.Now() s.prev = time.Now()
n, err := s.ring.Write(s.buf) n, err := s.ring.Write(s.buf)
if err == nil { if err == nil {