From fa4713f8fd3643c9fefaec0c83191a70a33d34e8 Mon Sep 17 00:00:00 2001 From: Saxon Nelson-Milton Date: Wed, 3 Feb 2021 12:17:23 +1030 Subject: [PATCH] device/raspistill & revid & container/mts: fixed issues with raspistill test implementations and made slight logging improvements in mts encoder and revid. --- container/mts/encoder.go | 8 +++-- device/raspistill/imp_release.go | 7 ++-- device/raspistill/imp_testing.go | 56 +++++++++++++++++++++++--------- device/raspistill/raspistill.go | 2 +- revid/revid_test.go | 10 +++--- revid/senders.go | 7 +++- 6 files changed, 61 insertions(+), 29 deletions(-) diff --git a/container/mts/encoder.go b/container/mts/encoder.go index 6c4bb151..e78bd8b8 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -36,8 +36,8 @@ import ( "bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/av/container/mts/pes" "bitbucket.org/ausocean/av/container/mts/psi" - "bitbucket.org/ausocean/utils/realtime" "bitbucket.org/ausocean/utils/logger" + "bitbucket.org/ausocean/utils/realtime" ) // These three constants are used to select between the three different @@ -110,7 +110,6 @@ var Meta *meta.Data // This will help us obtain a realtime for timestamp meta encoding. var RealTime = realtime.NewRealTime() - // Encoder encapsulates properties of an MPEG-TS generator. type Encoder struct { dst io.WriteCloser @@ -246,7 +245,10 @@ func (e *Encoder) Write(data []byte) (int, error) { pkt.PCR = pcr pusi = false } - _, err := e.dst.Write(pkt.Bytes(e.tsSpace[:PacketSize])) + + b := pkt.Bytes(e.tsSpace[:PacketSize]) + e.log.Debug("writing MTS packet to destination", "size", len(b), "pusi", pusi, "PID", pkt.PID, "PTS", pts, "PCR", pkt.PCR) + _, err := e.dst.Write(b) if err != nil { return len(data), err } diff --git a/device/raspistill/imp_release.go b/device/raspistill/imp_release.go index 58658e7d..aeeadd01 100644 --- a/device/raspistill/imp_release.go +++ b/device/raspistill/imp_release.go @@ -30,10 +30,10 @@ LICENSE package raspistill import ( + "bufio" "errors" "fmt" "io" - "bufio" "os/exec" "strings" @@ -98,7 +98,6 @@ func (r *Raspistill) start() error { return fmt.Errorf("could not pipe command error: %w", err) } - go func() { errScnr := bufio.NewScanner(stderr) for { @@ -110,13 +109,13 @@ func (r *Raspistill) start() error { } if errScnr.Scan() { - r.log.Error("error line from raspistill stderr","error",errScnr.Text()) + r.log.Error("error line from raspistill stderr", "error", errScnr.Text()) continue } err := errScnr.Err() if err != nil { - r.log.Error("error from stderr scan","error",err) + r.log.Error("error from stderr scan", "error", err) } } }() diff --git a/device/raspistill/imp_testing.go b/device/raspistill/imp_testing.go index 94027e7b..213e6c2b 100644 --- a/device/raspistill/imp_testing.go +++ b/device/raspistill/imp_testing.go @@ -31,7 +31,7 @@ package raspistill import ( "io" "io/ioutil" - "path/filepath" + "os" "strconv" "sync" "time" @@ -41,10 +41,11 @@ import ( const ( // TODO(Saxon): find nImages programmatically ? - nImages = 6 - - imgPath = "../../../test/test-data/av/input/jpeg/" - jpgExt = ".jpg" + nImages = 6 + relImgPath = "/bitbucket.org/ausocean/test/test-data/av/input/jpeg/" + jpgExt = ".jpg" + gopathEnvName = "GOPATH" + readWaitDelay = 1 * time.Second ) type raspistill struct { @@ -69,10 +70,11 @@ func new(l config.Logger) raspistill { // We expect the 6 images test images to be named 0.jpg through to 5.jpg. r.log.Debug("loading test JPEG images") for i, _ := range r.images { - absPath, err := filepath.Abs(imgPath) - path := absPath + "/" + strconv.Itoa(i) + jpgExt - r.images[i], err = ioutil.ReadFile(path) + imgDir := os.Getenv(gopathEnvName) + relImgPath + path := imgDir + strconv.Itoa(i) + jpgExt + var err error + r.images[i], err = ioutil.ReadFile(path) if err != nil { r.log.Fatal("error loading test image", "imageNum", i, "error", err) } @@ -85,14 +87,17 @@ func new(l config.Logger) raspistill { // calls on Raspistill.read will return an error. func (r *Raspistill) stop() error { r.log.Debug("stopping test raspistill") - r.setRunning(false) + if r.running() { + r.setRunning(false) + close(r.term) + } return nil } // start creates and starts the timelapse and duration tickers and sets // isRunning flag to true indicating that raspistill is capturing. func (r *Raspistill) start() error { - r.log.Debug("starting test raspistill") + r.log.Debug("starting test implementation raspistill", "duration", r.cfg.TimelapseDuration.String(), "interval", r.cfg.TimelapseInterval.String()) r.durTicker = time.NewTicker(r.cfg.TimelapseDuration) r.intvlTicker = time.NewTicker(r.cfg.TimelapseInterval) @@ -130,13 +135,13 @@ func (r *Raspistill) capture() { r.intvlTicker.Reset(r.cfg.TimelapseInterval) case <-r.term: + r.log.Debug("got termination signal") r.setRunning(false) return case t := <-r.durTicker.C: r.log.Debug("got duration tick, timelapse over", "tick", t) - r.buf = nil - return + close(r.term) } } } @@ -147,20 +152,39 @@ func (r *Raspistill) capture() { func (r *Raspistill) read(p []byte) (int, error) { r.log.Debug("reading from test raspistill") if !r.running() { - return 0, errNotStarted + return 0, io.EOF + } + + // Waits until there's something in the buffer for reading, unless there's a + // termination signal, in which case return io.EOF. + for { + select { + case <-r.term: + return 0, io.EOF + default: + } + + if r.bufLen() != 0 { + break + } + time.Sleep(readWaitDelay) } r.mu.Lock() defer r.mu.Unlock() - if r.buf == nil { - return 0, io.EOF - } n := copy(p, r.buf) r.buf = r.buf[n:] return n, nil } +func (r *Raspistill) bufLen() int { + r.mu.Lock() + l := len(r.buf) + r.mu.Unlock() + return l +} + func (r *Raspistill) setRunning(s bool) { r.mu.Lock() r.isRunning = s diff --git a/device/raspistill/raspistill.go b/device/raspistill/raspistill.go index 67213565..5c74553a 100644 --- a/device/raspistill/raspistill.go +++ b/device/raspistill/raspistill.go @@ -43,7 +43,7 @@ const pkg = "raspistill: " // Config field validation bounds. const ( - minTimelapseDuration = 30 * time.Second // s + minTimelapseDuration = 10 * time.Second // s maxTimelapseDuration = 86400 * time.Second // s = 24 hours minTimelapseInterval = 1 * time.Second // s maxTimelapseInterval = 86400 * time.Second // s = 24 hours diff --git a/revid/revid_test.go b/revid/revid_test.go index b653ff01..194e27c8 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -50,6 +50,7 @@ func TestRaspistill(t *testing.T) { // Configuration parameters. const ( timelapseInterval = "4" + timelapseDuration = "25" rbStartElementSize = "1000000" input = "Raspistill" codec = "JPEG" @@ -68,12 +69,12 @@ func TestRaspistill(t *testing.T) { // First remove out dir (if exists) to clear contents, then create dir. err := os.RemoveAll(outDir) if err != nil { - t.Fatalf("could not remove any prior out directory: %v",err) + t.Fatalf("could not remove any prior out directory: %v", err) } os.Mkdir(outDir, os.ModePerm) if err != nil { - t.Fatalf("could not create new out directory: %v",err) + t.Fatalf("could not create new out directory: %v", err) } rv, err := New(config.Config{Logger: (*testLogger)(t)}, nil) @@ -88,6 +89,7 @@ func TestRaspistill(t *testing.T) { config.KeyOutput: output, config.KeyOutputPath: outputPath, config.KeyTimelapseInterval: timelapseInterval, + config.KeyTimelapseDuration: timelapseDuration, config.KeyLogging: logging, config.KeyRBStartElementSize: rbStartElementSize, }, @@ -150,8 +152,8 @@ func TestRaspistill(t *testing.T) { } // Clean up out directory. - err := os.RemoveAll(outDir) + err = os.RemoveAll(outDir) if err != nil { - t.Fatalf("could not clean up out directory: %v",err) + t.Fatalf("could not clean up out directory: %v", err) } } diff --git a/revid/senders.go b/revid/senders.go index 79dfa73b..619e08b9 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -217,6 +217,7 @@ type mtsSender struct { // newMtsSender returns a new mtsSender. func newMTSSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *ring.Buffer, clipDur time.Duration) *mtsSender { + log(logger.Debug, "setting up mtsSender", "clip duration", int(clipDur)) s := &mtsSender{ dst: dst, repairer: mts.NewDiscontinuityRepairer(), @@ -283,6 +284,7 @@ func (s *mtsSender) Write(d []byte) (int, error) { } if s.next != nil { + s.log(logger.Debug, "appending packet to clip") s.buf = append(s.buf, s.next...) } bytes := make([]byte, len(d)) @@ -290,7 +292,10 @@ func (s *mtsSender) Write(d []byte) (int, error) { s.next = bytes p, _ := mts.PID(bytes) s.curPid = int(p) - if time.Now().Sub(s.prev) >= s.clipDur && s.curPid == mts.PatPid && len(s.buf) > 0 { + curDur := time.Now().Sub(s.prev) + s.log(logger.Debug, "checking send conditions", "curDuration", int(curDur), "sendDur", int(s.clipDur), "curPID", s.curPid, "len", len(s.buf)) + if curDur >= s.clipDur && s.curPid == mts.PatPid && len(s.buf) > 0 { + s.log(logger.Debug, "writing clip to ring buffer for sending", "size", len(s.buf)) s.prev = time.Now() n, err := s.ring.Write(s.buf) if err == nil {