From b077752462733c5e2e1a901dbbcae5e4cf33f16b Mon Sep 17 00:00:00 2001 From: Saxon Nelson-Milton Date: Wed, 27 Jan 2021 16:37:33 +1030 Subject: [PATCH] revid & device/raspistill & container/mts: integrating device/raspistill functionality into revid --- container/mts/encoder.go | 25 +++--- container/mts/encoder_test.go | 37 ++------ container/mts/meta/meta.go | 3 +- container/mts/mpegts_test.go | 11 ++- container/mts/payload.go | 7 +- container/mts/payload_test.go | 3 +- container/mts/psi/psi.go | 4 + device/raspistill/imp_testing.go | 2 +- exp/rvcl/config.json | 11 +++ revid/pipeline.go | 35 ++++++-- revid/revid.go | 8 -- revid/revid_test.go | 144 +++++++++++++++++++++++++++++++ revid/senders.go | 35 ++++++-- 13 files changed, 253 insertions(+), 72 deletions(-) create mode 100644 exp/rvcl/config.json create mode 100644 revid/revid_test.go diff --git a/container/mts/encoder.go b/container/mts/encoder.go index 8faccbe5..02b5b7e4 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -37,6 +37,16 @@ import ( "bitbucket.org/ausocean/av/container/mts/pes" "bitbucket.org/ausocean/av/container/mts/psi" "bitbucket.org/ausocean/utils/realtime" + "bitbucket.org/ausocean/utils/logger" +) + +// Stream IDs as per ITU-T Rec. H.222.0 / ISO/IEC 13818-1 [1], tables 2-22 and 2-34. +const ( + streamIDH264 = 27 + streamIDH265 = 36 + streamIDMJPEG = 28 + streamIDJPEG = 136 // Privately number. + streamIDAudio = 0xc0 // ADPCM audio stream ID. ) // These three constants are used to select between the three different @@ -53,6 +63,7 @@ const ( EncodeH265 EncodeJPEG EncodeMJPEG + EncodeAudio EncodePCM EncodeADPCM ) @@ -109,13 +120,6 @@ var Meta *meta.Data // This will help us obtain a realtime for timestamp meta encoding. var RealTime = realtime.NewRealTime() -type logger interface { - Debug(string, ...interface{}) - Info(string, ...interface{}) - Warning(string, ...interface{}) - Error(string, ...interface{}) - Fatal(string, ...interface{}) -} // Encoder encapsulates properties of an MPEG-TS generator. type Encoder struct { @@ -143,12 +147,12 @@ type Encoder struct { patBytes, pmtBytes []byte // log is a function that will be used through the encoder code for logging. - log logger + log logger.LoggerIF } // NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream // calls write for every frame, the rate will be the frame rate of the video. -func NewEncoder(dst io.WriteCloser, log logger, options ...func(*Encoder) error) (*Encoder, error) { +func NewEncoder(dst io.WriteCloser, log logger.LoggerIF, options ...func(*Encoder) error) (*Encoder, error) { e := &Encoder{ dst: dst, writePeriod: time.Duration(float64(time.Second) / defaultRate), @@ -181,6 +185,7 @@ func NewEncoder(dst io.WriteCloser, log logger, options ...func(*Encoder) error) // Write implements io.Writer. Write takes raw video or audio data and encodes into MPEG-TS, // then sending it to the encoder's io.Writer destination. func (e *Encoder) Write(data []byte) (int, error) { + e.log.Debug("writing data", "len(data)", len(data)) switch e.psiMethod { case psiMethodPacket: e.log.Debug("checking packet no. conditions for PSI write", "count", e.pktCount, "PSI count", e.psiSendCount) @@ -329,7 +334,7 @@ func (e *Encoder) ccFor(pid uint16) byte { // updateMeta adds/updates a metaData descriptor in the given psi bytes using data // contained in the global Meta struct. -func updateMeta(b []byte, log logger) ([]byte, error) { +func updateMeta(b []byte, log logger.LoggerIF) ([]byte, error) { p := psi.PSIBytes(b) if RealTime.IsSet() { t := strconv.Itoa(int(RealTime.Get().Unix())) diff --git a/container/mts/encoder_test.go b/container/mts/encoder_test.go index f3bd1cc0..2752efed 100644 --- a/container/mts/encoder_test.go +++ b/container/mts/encoder_test.go @@ -37,6 +37,7 @@ import ( "bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/av/container/mts/psi" + "bitbucket.org/ausocean/utils/logger" ) type nopCloser struct{ io.Writer } @@ -54,32 +55,6 @@ func (d *destination) Write(p []byte) (int, error) { return len(p), nil } -// testLogger will allow logging to be done by the testing pkg. -type testLogger testing.T - -func (tl *testLogger) Debug(msg string, args ...interface{}) { tl.log("debug", msg, args...) } -func (tl *testLogger) Info(msg string, args ...interface{}) { tl.log("info", msg, args...) } -func (tl *testLogger) Warning(msg string, args ...interface{}) { tl.log("warning", msg, args...) } -func (tl *testLogger) Error(msg string, args ...interface{}) { tl.log("error", msg, args...) } -func (tl *testLogger) Fatal(msg string, args ...interface{}) { tl.log("fatal", msg, args...) } - -func (tl *testLogger) log(lvl string, msg string, args ...interface{}) { - switch lvl { - case "debug", "info", "warning", "error", "fatal": - default: - panic("invalid log level") - } - msg = lvl + ": " + msg - for i := 0; i < len(args); i++ { - msg += " %v" - } - if len(args) == 0 { - tl.Log(msg + "\n") - return - } - tl.Logf(msg+"\n", args) -} - // TestEncodeVideo checks that we can correctly encode some dummy data into a // valid MPEG-TS stream. This checks for correct MPEG-TS headers and also that the // original data is stored correctly and is retreivable. @@ -127,7 +102,7 @@ func TestEncodeVideo(t *testing.T) { // Create the dst and write the test data to encoder. dst := &destination{} - e, err := NewEncoder(nopCloser{dst}, (*testLogger)(t), PacketBasedPSI(psiSendCount), Rate(25), MediaType(EncodeH264)) + e, err := NewEncoder(nopCloser{dst}, (*logger.TestLogger)(t), PacketBasedPSI(psiSendCount), Rate(25), MediaType(EncodeH264)) if err != nil { t.Fatalf("could not create MTS encoder, failed with error: %v", err) } @@ -191,7 +166,7 @@ func TestEncodePcm(t *testing.T) { sampleSize := 2 blockSize := 16000 writeFreq := float64(sampleRate*sampleSize) / float64(blockSize) - e, err := NewEncoder(nopCloser{&buf}, (*testLogger)(t), PacketBasedPSI(10), Rate(writeFreq), MediaType(EncodePCM)) + e, err := NewEncoder(nopCloser{&buf}, (*logger.TestLogger)(t), PacketBasedPSI(10), Rate(writeFreq), MediaType(EncodePCM)) if err != nil { t.Fatalf("could not create MTS encoder, failed with error: %v", err) } @@ -295,7 +270,7 @@ const fps = 25 func TestMetaEncode1(t *testing.T) { Meta = meta.New() var buf bytes.Buffer - e, err := NewEncoder(nopCloser{&buf}, (*testLogger)(t)) + e, err := NewEncoder(nopCloser{&buf}, (*logger.TestLogger)(t)) if err != nil { t.Fatalf("could not create encoder, failed with error: %v", err) } @@ -327,7 +302,7 @@ func TestMetaEncode1(t *testing.T) { func TestMetaEncode2(t *testing.T) { Meta = meta.New() var buf bytes.Buffer - e, err := NewEncoder(nopCloser{&buf}, (*testLogger)(t)) + e, err := NewEncoder(nopCloser{&buf}, (*logger.TestLogger)(t)) if err != nil { t.Fatalf("could not create MTS encoder, failed with error: %v", err) } @@ -359,7 +334,7 @@ func TestMetaEncode2(t *testing.T) { func TestExtractMeta(t *testing.T) { Meta = meta.New() var buf bytes.Buffer - e, err := NewEncoder(nopCloser{&buf}, (*testLogger)(t)) + e, err := NewEncoder(nopCloser{&buf}, (*logger.TestLogger)(t)) if err != nil { t.Fatalf("could not create MTS encoder, failed with error: %v", err) } diff --git a/container/mts/meta/meta.go b/container/mts/meta/meta.go index 9fca245d..cc1966c7 100644 --- a/container/mts/meta/meta.go +++ b/container/mts/meta/meta.go @@ -33,6 +33,7 @@ package meta import ( "encoding/binary" "errors" + "fmt" "strings" "sync" ) @@ -274,7 +275,7 @@ func GetAllFromString(s string) (map[string]string, error) { // Keys and values are seperated by '=', so split and check that len(kv)=2. kv := strings.Split(entry, "=") if len(kv) != 2 { - return nil, ErrUnexpectedMetaFormat + return nil, fmt.Errorf("not enough key-value pairs (kv: %v)", kv) } all[kv[0]] = kv[1] } diff --git a/container/mts/mpegts_test.go b/container/mts/mpegts_test.go index e1249aae..d9591fd2 100644 --- a/container/mts/mpegts_test.go +++ b/container/mts/mpegts_test.go @@ -42,6 +42,7 @@ import ( "bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/av/container/mts/pes" "bitbucket.org/ausocean/av/container/mts/psi" + "bitbucket.org/ausocean/utils/logger" ) // TestGetPTSRange checks that GetPTSRange can correctly get the first and last @@ -422,7 +423,13 @@ func TestTrimToMetaRange(t *testing.T) { // TestSegmentForMeta checks that SegmentForMeta can correctly segment some MTS // data based on a given meta key and value. func TestSegmentForMeta(t *testing.T) { - Meta = meta.New() + // Copyright information prefixed to all metadata. + const ( + metaPreambleKey = "copyright" + metaPreambleData = "ausocean.org/license/content2019" + ) + + Meta = meta.NewWith([][2]string{{metaPreambleKey, metaPreambleData}}) const ( nPSI = 10 // The number of PSI pairs to write. @@ -670,7 +677,7 @@ func TestFindPSI(t *testing.T) { }).Bytes() Meta.Add(metaKey, test.meta) - pmtTable, err = updateMeta(pmtTable, (*testLogger)(t)) + pmtTable, err = updateMeta(pmtTable, (*logger.TestLogger)(t)) if err != nil { t.Fatalf("could not update meta for test %d", i) } diff --git a/container/mts/payload.go b/container/mts/payload.go index 089eba76..90392a74 100644 --- a/container/mts/payload.go +++ b/container/mts/payload.go @@ -30,6 +30,7 @@ package mts import ( "errors" + "fmt" "sort" "github.com/Comcast/gots/packet" @@ -72,13 +73,13 @@ func Extract(p []byte) (*Clip, error) { case PmtPid: meta, err = ExtractMeta(pkt[:]) if err != nil { - return nil, err + return nil, fmt.Errorf("could not extract meta data: %w", err) } default: // Must be media. // Get the MPEG-TS payload. payload, err := pkt.Payload() if err != nil { - return nil, err + return nil, fmt.Errorf("could not extract payload: %w", err) } // If PUSI is true then we know it's the start of a new frame, and we have @@ -86,7 +87,7 @@ func Extract(p []byte) (*Clip, error) { if pkt.PayloadUnitStartIndicator() { _pes, err := pes.NewPESHeader(payload) if err != nil { - return nil, err + return nil, fmt.Errorf("could not parse PES: %w", err) } // Extract the PTS and ID, then add a new frame to the clip. diff --git a/container/mts/payload_test.go b/container/mts/payload_test.go index f940c68f..4029598f 100644 --- a/container/mts/payload_test.go +++ b/container/mts/payload_test.go @@ -38,6 +38,7 @@ import ( "bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/av/container/mts/pes" "bitbucket.org/ausocean/av/container/mts/psi" + "bitbucket.org/ausocean/utils/logger" ) // TestExtract checks that we can coorectly extract media, pts, id and meta from @@ -154,7 +155,7 @@ func writePSIWithMeta(b *bytes.Buffer, t *testing.T) error { } // Update the meta in the pmt table. - pmtBytes, err = updateMeta(pmtBytes, (*testLogger)(t)) + pmtBytes, err = updateMeta(pmtBytes, (*logger.TestLogger)(t)) if err != nil { return err } diff --git a/container/mts/psi/psi.go b/container/mts/psi/psi.go index edfde7be..0c2604a6 100644 --- a/container/mts/psi/psi.go +++ b/container/mts/psi/psi.go @@ -30,6 +30,7 @@ package psi import ( "errors" + "fmt" "github.com/Comcast/gots/psi" ) @@ -303,6 +304,9 @@ func (p *PSIBytes) AddDescriptor(tag int, data []byte) error { i, desc := p.HasDescriptor(tag) if desc == nil { err := p.createDescriptor(tag, data) + if err != nil { + return fmt.Errorf("could not create descriptor: %w", err) + } return err } diff --git a/device/raspistill/imp_testing.go b/device/raspistill/imp_testing.go index 94027e7b..e8112526 100644 --- a/device/raspistill/imp_testing.go +++ b/device/raspistill/imp_testing.go @@ -43,7 +43,7 @@ const ( // TODO(Saxon): find nImages programmatically ? nImages = 6 - imgPath = "../../../test/test-data/av/input/jpeg/" + imgPath = "../../test/test-data/av/input/jpeg/" jpgExt = ".jpg" ) diff --git a/exp/rvcl/config.json b/exp/rvcl/config.json new file mode 100644 index 00000000..40b7cc25 --- /dev/null +++ b/exp/rvcl/config.json @@ -0,0 +1,11 @@ +{ + "Input":"Raspistill", + "InputCodec":"JPEG", + "Output":"Files", + "OutputPath":"out/", + "TimelapseDuration":"3600", + "TimelapseInterval":"10", + "logging":"Debug", + "Suppress":"false", + "RBStartElementSize":"1000000" +} diff --git a/revid/pipeline.go b/revid/pipeline.go index 59340856..fb639f34 100644 --- a/revid/pipeline.go +++ b/revid/pipeline.go @@ -43,6 +43,7 @@ import ( "bitbucket.org/ausocean/av/device" "bitbucket.org/ausocean/av/device/file" "bitbucket.org/ausocean/av/device/geovision" + "bitbucket.org/ausocean/av/device/raspistill" "bitbucket.org/ausocean/av/device/raspivid" "bitbucket.org/ausocean/av/device/webcam" "bitbucket.org/ausocean/av/filter" @@ -81,7 +82,7 @@ func (r *Revid) reset(c config.Config) error { var encOptions []func(*mts.Encoder) error switch r.cfg.Input { - case config.InputRaspivid, config.InputFile, config.InputV4L, config.InputRTSP: + case config.InputRaspivid, config.InputRaspistill, config.InputFile, config.InputV4L, config.InputRTSP: switch r.cfg.InputCodec { case codecutil.H265: if r.cfg.Input != config.InputRTSP { @@ -94,10 +95,14 @@ func (r *Revid) reset(c config.Config) error { st = mts.EncodeMJPEG encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PSITime)*time.Second)) r.cfg.CBR = true + case codecutil.JPEG: + st = mts.EncodeJPEG + encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PSITime)*time.Second)) + r.cfg.CBR = true case codecutil.PCM, codecutil.ADPCM: return nil, fmt.Errorf("invalid input codec: %v for input: %v", r.cfg.InputCodec, r.cfg.Input) default: - panic("unknown input codec") + panic("unknown input codec for Raspivid, Raspistill, File, V4l or RTSP input") } case config.InputAudio: switch r.cfg.InputCodec { @@ -118,7 +123,7 @@ func (r *Revid) reset(c config.Config) error { panic("unknown input type") } encOptions = append(encOptions, mts.MediaType(st), mts.Rate(rate)) - return mts.NewEncoder(dst, &encLog{r.cfg.Logger}, encOptions...) + return mts.NewEncoder(dst, r.cfg.Logger, encOptions...) }, func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { return flv.NewEncoder(dst, true, true, fps) @@ -167,7 +172,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. // Calculate no. of ring buffer elements based on starting element size // const and config directed max ring buffer size, then create buffer. // This is only used if the selected output uses a ring buffer. - nElements := r.cfg.RBCapacity / rbStartingElementSize + nElements := r.cfg.RBCapacity / r.cfg.RBStartElementSize writeTimeout := time.Duration(r.cfg.RBWriteTimeout) * time.Second // We will go through our outputs and create the corresponding senders to add @@ -178,7 +183,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. switch out { case config.OutputHTTP: r.cfg.Logger.Log(logger.Debug, "using HTTP output") - rb := ring.NewBuffer(rbStartingElementSize, int(nElements), writeTimeout) + rb := ring.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout) hs := newHTTPSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report) w = newMTSSender(hs, r.cfg.Logger.Log, rb, r.cfg.ClipDuration) mtsSenders = append(mtsSenders, w) @@ -192,14 +197,23 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. mtsSenders = append(mtsSenders, w) case config.OutputFile: r.cfg.Logger.Log(logger.Debug, "using File output") - w, err := newFileSender(r.cfg.OutputPath) + w, err := newFileSender(r.cfg.Logger, r.cfg.OutputPath, false) + if err != nil { + return err + } + mtsSenders = append(mtsSenders, w) + case config.OutputFiles: + r.cfg.Logger.Log(logger.Debug, "using Files output") + rb := ring.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout) + fs, err := newFileSender(r.cfg.Logger, r.cfg.OutputPath, true) + w = newMTSSender(fs, r.cfg.Logger.Log, rb, r.cfg.ClipDuration) if err != nil { return err } mtsSenders = append(mtsSenders, w) case config.OutputRTMP: r.cfg.Logger.Log(logger.Debug, "using RTMP output") - rb := ring.NewBuffer(rbStartingElementSize, int(nElements), writeTimeout) + rb := ring.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout) w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionMaxTries, rb, r.cfg.Logger.Log, r.bitrate.Report) if err != nil { r.cfg.Logger.Log(logger.Warning, "rtmp connect error", "error", err.Error()) @@ -273,6 +287,11 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. r.input = raspivid.New(r.cfg.Logger) err = r.setLexer(r.cfg.InputCodec, false) + case config.InputRaspistill: + r.cfg.Logger.Log(logger.Debug, "using raspistill input") + r.input = raspistill.New(r.cfg.Logger) + r.setLexer(r.cfg.InputCodec, false) + case config.InputV4L: r.cfg.Logger.Log(logger.Debug, "using V4L input") r.input = webcam.New(r.cfg.Logger) @@ -327,9 +346,11 @@ func (r *Revid) setLexer(c uint8, isRTSP bool) error { case codecutil.MJPEG, codecutil.JPEG: r.cfg.Logger.Log(logger.Debug, "using MJPEG/JPEG codec") r.lexTo = jpeg.Lex + jpeg.Log = r.cfg.Logger if isRTSP { r.lexTo = jpeg.NewExtractor().Extract } + case codecutil.PCM, codecutil.ADPCM: return errors.New("invalid codec for this selected input") default: diff --git a/revid/revid.go b/revid/revid.go index 5a2d20ec..f6e0cb0a 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -54,14 +54,6 @@ type Logger interface { Log(level int8, message string, params ...interface{}) } -type encLog struct{ Logger } - -func (el *encLog) Debug(msg string, args ...interface{}) { el.Log(logger.Debug, msg, args...) } -func (el *encLog) Info(msg string, args ...interface{}) { el.Log(logger.Info, msg, args...) } -func (el *encLog) Warning(msg string, args ...interface{}) { el.Log(logger.Warning, msg, args...) } -func (el *encLog) Error(msg string, args ...interface{}) { el.Log(logger.Error, msg, args...) } -func (el *encLog) Fatal(msg string, args ...interface{}) { el.Log(logger.Fatal, msg, args...) } - // Revid provides methods to control a revid session; providing methods // to start, stop and change the state of an instance using the Config struct. type Revid struct { diff --git a/revid/revid_test.go b/revid/revid_test.go new file mode 100644 index 00000000..65741726 --- /dev/null +++ b/revid/revid_test.go @@ -0,0 +1,144 @@ +// +build test + +/* +DESCRIPTION + revid_test.go provides integration testing of the revid API. + +AUTHORS + Saxon A. Nelson-Milton + +LICENSE + Copyright (C) 2017-2021 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package revid + +import ( + "bytes" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "testing" + "time" + + "bitbucket.org/ausocean/av/container/mts" + "bitbucket.org/ausocean/av/container/mts/meta" + "bitbucket.org/ausocean/av/revid/config" +) + +func TestRaspistill(t *testing.T) { + // Copyright information prefixed to all metadata. + const ( + metaPreambleKey = "copyright" + metaPreambleData = "ausocean.org/license/content2021" + ) + + // Configuration parameters. + const ( + timelapseInterval = "4" + rbStartElementSize = "1000000" + input = "Raspistill" + codec = "MJPEG" + output = "Files" + outDir = "out" + outputPath = outDir + "/" + logging = "Debug" + testImgDir = "../../../test/test-data/av/input/jpeg/" + ) + + const runTime = 40 * time.Second + + // Add the copyright metadata. + mts.Meta = meta.NewWith([][2]string{{metaPreambleKey, metaPreambleData}}) + + // First remove out dir (if exists) to clear contents, then create dir. + os.RemoveAll(outDir) + os.Mkdir(outDir, os.ModePerm) + + rv, err := New(config.Config{Logger: (*testLogger)(t)}, nil) + if err != nil { + t.Fatalf("did not expect error from revid.New(): %v", err) + } + + err = rv.Update( + map[string]string{ + config.KeyInput: input, + config.KeyInputCodec: codec, + config.KeyOutput: output, + config.KeyOutputPath: outputPath, + config.KeyTimelapseInterval: timelapseInterval, + config.KeyLogging: logging, + config.KeyRBStartElementSize: rbStartElementSize, + }, + ) + if err != nil { + t.Fatalf("did not expect error from rv.Update(): %v", err) + } + + err = rv.Start() + if err != nil { + t.Fatalf("did not expect error from rv.Start(): %v", err) + } + time.Sleep(runTime) + rv.Stop() + + // Get output file information. + os.Chdir(outDir) + var files []string + err = filepath.Walk( + ".", + func(path string, info os.FileInfo, err error) error { + files = append(files, path) + return nil + }, + ) + if err != nil { + t.Fatalf("did not expect error from filepath.Walk(): %v", err) + } + + if len(files) == 0 { + t.Fatalf("did not expect 0 output files") + } + + // Load files outputted files and compare each one with corresponding input. + for i, n := range files { + // Ignore first file (which is prev dir "."). + if i == 0 { + continue + } + + mtsBytes, err := ioutil.ReadFile(n) + if err != nil { + t.Fatalf("could not read output file: %v", err) + } + + clip, err := mts.Extract(mtsBytes) + if err != nil { + t.Fatalf("could not extract clips from MPEG-TS stream: %v", err) + } + img := clip.Bytes() + + inImg, err := ioutil.ReadFile(testImgDir + strconv.Itoa(i) + ".jpg") + if err != nil { + t.Fatalf("could not load input test image: %v", err) + } + + if !bytes.Equal(img, inImg) { + t.Errorf("unexpected image extracted for img: %d", i) + } + } +} diff --git a/revid/senders.go b/revid/senders.go index 82f5cf18..79dfa73b 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -42,6 +42,7 @@ import ( "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/protocol/rtmp" "bitbucket.org/ausocean/av/protocol/rtp" + "bitbucket.org/ausocean/av/revid/config" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" @@ -158,20 +159,38 @@ func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) // fileSender implements loadSender for a local file destination. type fileSender struct { - file *os.File - data []byte + file *os.File + data []byte + multiFile bool + path string + init bool + log config.Logger } -func newFileSender(path string) (*fileSender, error) { - f, err := os.Create(path) - if err != nil { - return nil, err - } - return &fileSender{file: f}, nil +// newFileSender returns a new fileSender. Setting multi true will write a new +// file for each write to this sender. +func newFileSender(l config.Logger, path string, multiFile bool) (*fileSender, error) { + return &fileSender{ + path: path, + log: l, + multiFile: multiFile, + init: true, + }, nil } // Write implements io.Writer. func (s *fileSender) Write(d []byte) (int, error) { + if s.init || s.multiFile { + fileName := s.path + time.Now().String() + s.log.Debug("creating new output file", "init", s.init, "multiFile", s.multiFile, "fileName", fileName) + f, err := os.Create(fileName) + if err != nil { + return 0, fmt.Errorf("could not create file to write media to: %w", err) + } + s.file = f + s.init = false + } + s.log.Debug("writing output file", "len(d)", len(d)) return s.file.Write(d) }