revid & device/raspistill & container/mts: integrating device/raspistill functionality into revid

This commit is contained in:
Saxon Nelson-Milton 2021-01-27 16:37:33 +10:30
parent 9db5e7acb3
commit b077752462
13 changed files with 253 additions and 72 deletions

View File

@ -37,6 +37,16 @@ import (
"bitbucket.org/ausocean/av/container/mts/pes"
"bitbucket.org/ausocean/av/container/mts/psi"
"bitbucket.org/ausocean/utils/realtime"
"bitbucket.org/ausocean/utils/logger"
)
// Stream IDs as per ITU-T Rec. H.222.0 / ISO/IEC 13818-1 [1], tables 2-22 and 2-34.
const (
streamIDH264 = 27
streamIDH265 = 36
streamIDMJPEG = 28
streamIDJPEG = 136 // Privately number.
streamIDAudio = 0xc0 // ADPCM audio stream ID.
)
// These three constants are used to select between the three different
@ -53,6 +63,7 @@ const (
EncodeH265
EncodeJPEG
EncodeMJPEG
EncodeAudio
EncodePCM
EncodeADPCM
)
@ -109,13 +120,6 @@ var Meta *meta.Data
// This will help us obtain a realtime for timestamp meta encoding.
var RealTime = realtime.NewRealTime()
type logger interface {
Debug(string, ...interface{})
Info(string, ...interface{})
Warning(string, ...interface{})
Error(string, ...interface{})
Fatal(string, ...interface{})
}
// Encoder encapsulates properties of an MPEG-TS generator.
type Encoder struct {
@ -143,12 +147,12 @@ type Encoder struct {
patBytes, pmtBytes []byte
// log is a function that will be used through the encoder code for logging.
log logger
log logger.LoggerIF
}
// NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream
// calls write for every frame, the rate will be the frame rate of the video.
func NewEncoder(dst io.WriteCloser, log logger, options ...func(*Encoder) error) (*Encoder, error) {
func NewEncoder(dst io.WriteCloser, log logger.LoggerIF, options ...func(*Encoder) error) (*Encoder, error) {
e := &Encoder{
dst: dst,
writePeriod: time.Duration(float64(time.Second) / defaultRate),
@ -181,6 +185,7 @@ func NewEncoder(dst io.WriteCloser, log logger, options ...func(*Encoder) error)
// Write implements io.Writer. Write takes raw video or audio data and encodes into MPEG-TS,
// then sending it to the encoder's io.Writer destination.
func (e *Encoder) Write(data []byte) (int, error) {
e.log.Debug("writing data", "len(data)", len(data))
switch e.psiMethod {
case psiMethodPacket:
e.log.Debug("checking packet no. conditions for PSI write", "count", e.pktCount, "PSI count", e.psiSendCount)
@ -329,7 +334,7 @@ func (e *Encoder) ccFor(pid uint16) byte {
// updateMeta adds/updates a metaData descriptor in the given psi bytes using data
// contained in the global Meta struct.
func updateMeta(b []byte, log logger) ([]byte, error) {
func updateMeta(b []byte, log logger.LoggerIF) ([]byte, error) {
p := psi.PSIBytes(b)
if RealTime.IsSet() {
t := strconv.Itoa(int(RealTime.Get().Unix()))

View File

@ -37,6 +37,7 @@ import (
"bitbucket.org/ausocean/av/container/mts/meta"
"bitbucket.org/ausocean/av/container/mts/psi"
"bitbucket.org/ausocean/utils/logger"
)
type nopCloser struct{ io.Writer }
@ -54,32 +55,6 @@ func (d *destination) Write(p []byte) (int, error) {
return len(p), nil
}
// testLogger will allow logging to be done by the testing pkg.
type testLogger testing.T
func (tl *testLogger) Debug(msg string, args ...interface{}) { tl.log("debug", msg, args...) }
func (tl *testLogger) Info(msg string, args ...interface{}) { tl.log("info", msg, args...) }
func (tl *testLogger) Warning(msg string, args ...interface{}) { tl.log("warning", msg, args...) }
func (tl *testLogger) Error(msg string, args ...interface{}) { tl.log("error", msg, args...) }
func (tl *testLogger) Fatal(msg string, args ...interface{}) { tl.log("fatal", msg, args...) }
func (tl *testLogger) log(lvl string, msg string, args ...interface{}) {
switch lvl {
case "debug", "info", "warning", "error", "fatal":
default:
panic("invalid log level")
}
msg = lvl + ": " + msg
for i := 0; i < len(args); i++ {
msg += " %v"
}
if len(args) == 0 {
tl.Log(msg + "\n")
return
}
tl.Logf(msg+"\n", args)
}
// TestEncodeVideo checks that we can correctly encode some dummy data into a
// valid MPEG-TS stream. This checks for correct MPEG-TS headers and also that the
// original data is stored correctly and is retreivable.
@ -127,7 +102,7 @@ func TestEncodeVideo(t *testing.T) {
// Create the dst and write the test data to encoder.
dst := &destination{}
e, err := NewEncoder(nopCloser{dst}, (*testLogger)(t), PacketBasedPSI(psiSendCount), Rate(25), MediaType(EncodeH264))
e, err := NewEncoder(nopCloser{dst}, (*logger.TestLogger)(t), PacketBasedPSI(psiSendCount), Rate(25), MediaType(EncodeH264))
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
@ -191,7 +166,7 @@ func TestEncodePcm(t *testing.T) {
sampleSize := 2
blockSize := 16000
writeFreq := float64(sampleRate*sampleSize) / float64(blockSize)
e, err := NewEncoder(nopCloser{&buf}, (*testLogger)(t), PacketBasedPSI(10), Rate(writeFreq), MediaType(EncodePCM))
e, err := NewEncoder(nopCloser{&buf}, (*logger.TestLogger)(t), PacketBasedPSI(10), Rate(writeFreq), MediaType(EncodePCM))
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
@ -295,7 +270,7 @@ const fps = 25
func TestMetaEncode1(t *testing.T) {
Meta = meta.New()
var buf bytes.Buffer
e, err := NewEncoder(nopCloser{&buf}, (*testLogger)(t))
e, err := NewEncoder(nopCloser{&buf}, (*logger.TestLogger)(t))
if err != nil {
t.Fatalf("could not create encoder, failed with error: %v", err)
}
@ -327,7 +302,7 @@ func TestMetaEncode1(t *testing.T) {
func TestMetaEncode2(t *testing.T) {
Meta = meta.New()
var buf bytes.Buffer
e, err := NewEncoder(nopCloser{&buf}, (*testLogger)(t))
e, err := NewEncoder(nopCloser{&buf}, (*logger.TestLogger)(t))
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
@ -359,7 +334,7 @@ func TestMetaEncode2(t *testing.T) {
func TestExtractMeta(t *testing.T) {
Meta = meta.New()
var buf bytes.Buffer
e, err := NewEncoder(nopCloser{&buf}, (*testLogger)(t))
e, err := NewEncoder(nopCloser{&buf}, (*logger.TestLogger)(t))
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}

View File

@ -33,6 +33,7 @@ package meta
import (
"encoding/binary"
"errors"
"fmt"
"strings"
"sync"
)
@ -274,7 +275,7 @@ func GetAllFromString(s string) (map[string]string, error) {
// Keys and values are seperated by '=', so split and check that len(kv)=2.
kv := strings.Split(entry, "=")
if len(kv) != 2 {
return nil, ErrUnexpectedMetaFormat
return nil, fmt.Errorf("not enough key-value pairs (kv: %v)", kv)
}
all[kv[0]] = kv[1]
}

View File

@ -42,6 +42,7 @@ import (
"bitbucket.org/ausocean/av/container/mts/meta"
"bitbucket.org/ausocean/av/container/mts/pes"
"bitbucket.org/ausocean/av/container/mts/psi"
"bitbucket.org/ausocean/utils/logger"
)
// TestGetPTSRange checks that GetPTSRange can correctly get the first and last
@ -422,7 +423,13 @@ func TestTrimToMetaRange(t *testing.T) {
// TestSegmentForMeta checks that SegmentForMeta can correctly segment some MTS
// data based on a given meta key and value.
func TestSegmentForMeta(t *testing.T) {
Meta = meta.New()
// Copyright information prefixed to all metadata.
const (
metaPreambleKey = "copyright"
metaPreambleData = "ausocean.org/license/content2019"
)
Meta = meta.NewWith([][2]string{{metaPreambleKey, metaPreambleData}})
const (
nPSI = 10 // The number of PSI pairs to write.
@ -670,7 +677,7 @@ func TestFindPSI(t *testing.T) {
}).Bytes()
Meta.Add(metaKey, test.meta)
pmtTable, err = updateMeta(pmtTable, (*testLogger)(t))
pmtTable, err = updateMeta(pmtTable, (*logger.TestLogger)(t))
if err != nil {
t.Fatalf("could not update meta for test %d", i)
}

View File

@ -30,6 +30,7 @@ package mts
import (
"errors"
"fmt"
"sort"
"github.com/Comcast/gots/packet"
@ -72,13 +73,13 @@ func Extract(p []byte) (*Clip, error) {
case PmtPid:
meta, err = ExtractMeta(pkt[:])
if err != nil {
return nil, err
return nil, fmt.Errorf("could not extract meta data: %w", err)
}
default: // Must be media.
// Get the MPEG-TS payload.
payload, err := pkt.Payload()
if err != nil {
return nil, err
return nil, fmt.Errorf("could not extract payload: %w", err)
}
// If PUSI is true then we know it's the start of a new frame, and we have
@ -86,7 +87,7 @@ func Extract(p []byte) (*Clip, error) {
if pkt.PayloadUnitStartIndicator() {
_pes, err := pes.NewPESHeader(payload)
if err != nil {
return nil, err
return nil, fmt.Errorf("could not parse PES: %w", err)
}
// Extract the PTS and ID, then add a new frame to the clip.

View File

@ -38,6 +38,7 @@ import (
"bitbucket.org/ausocean/av/container/mts/meta"
"bitbucket.org/ausocean/av/container/mts/pes"
"bitbucket.org/ausocean/av/container/mts/psi"
"bitbucket.org/ausocean/utils/logger"
)
// TestExtract checks that we can coorectly extract media, pts, id and meta from
@ -154,7 +155,7 @@ func writePSIWithMeta(b *bytes.Buffer, t *testing.T) error {
}
// Update the meta in the pmt table.
pmtBytes, err = updateMeta(pmtBytes, (*testLogger)(t))
pmtBytes, err = updateMeta(pmtBytes, (*logger.TestLogger)(t))
if err != nil {
return err
}

View File

@ -30,6 +30,7 @@ package psi
import (
"errors"
"fmt"
"github.com/Comcast/gots/psi"
)
@ -303,6 +304,9 @@ func (p *PSIBytes) AddDescriptor(tag int, data []byte) error {
i, desc := p.HasDescriptor(tag)
if desc == nil {
err := p.createDescriptor(tag, data)
if err != nil {
return fmt.Errorf("could not create descriptor: %w", err)
}
return err
}

View File

@ -43,7 +43,7 @@ const (
// TODO(Saxon): find nImages programmatically ?
nImages = 6
imgPath = "../../../test/test-data/av/input/jpeg/"
imgPath = "../../test/test-data/av/input/jpeg/"
jpgExt = ".jpg"
)

11
exp/rvcl/config.json Normal file
View File

@ -0,0 +1,11 @@
{
"Input":"Raspistill",
"InputCodec":"JPEG",
"Output":"Files",
"OutputPath":"out/",
"TimelapseDuration":"3600",
"TimelapseInterval":"10",
"logging":"Debug",
"Suppress":"false",
"RBStartElementSize":"1000000"
}

View File

@ -43,6 +43,7 @@ import (
"bitbucket.org/ausocean/av/device"
"bitbucket.org/ausocean/av/device/file"
"bitbucket.org/ausocean/av/device/geovision"
"bitbucket.org/ausocean/av/device/raspistill"
"bitbucket.org/ausocean/av/device/raspivid"
"bitbucket.org/ausocean/av/device/webcam"
"bitbucket.org/ausocean/av/filter"
@ -81,7 +82,7 @@ func (r *Revid) reset(c config.Config) error {
var encOptions []func(*mts.Encoder) error
switch r.cfg.Input {
case config.InputRaspivid, config.InputFile, config.InputV4L, config.InputRTSP:
case config.InputRaspivid, config.InputRaspistill, config.InputFile, config.InputV4L, config.InputRTSP:
switch r.cfg.InputCodec {
case codecutil.H265:
if r.cfg.Input != config.InputRTSP {
@ -94,10 +95,14 @@ func (r *Revid) reset(c config.Config) error {
st = mts.EncodeMJPEG
encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PSITime)*time.Second))
r.cfg.CBR = true
case codecutil.JPEG:
st = mts.EncodeJPEG
encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PSITime)*time.Second))
r.cfg.CBR = true
case codecutil.PCM, codecutil.ADPCM:
return nil, fmt.Errorf("invalid input codec: %v for input: %v", r.cfg.InputCodec, r.cfg.Input)
default:
panic("unknown input codec")
panic("unknown input codec for Raspivid, Raspistill, File, V4l or RTSP input")
}
case config.InputAudio:
switch r.cfg.InputCodec {
@ -118,7 +123,7 @@ func (r *Revid) reset(c config.Config) error {
panic("unknown input type")
}
encOptions = append(encOptions, mts.MediaType(st), mts.Rate(rate))
return mts.NewEncoder(dst, &encLog{r.cfg.Logger}, encOptions...)
return mts.NewEncoder(dst, r.cfg.Logger, encOptions...)
},
func(dst io.WriteCloser, fps int) (io.WriteCloser, error) {
return flv.NewEncoder(dst, true, true, fps)
@ -167,7 +172,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
// Calculate no. of ring buffer elements based on starting element size
// const and config directed max ring buffer size, then create buffer.
// This is only used if the selected output uses a ring buffer.
nElements := r.cfg.RBCapacity / rbStartingElementSize
nElements := r.cfg.RBCapacity / r.cfg.RBStartElementSize
writeTimeout := time.Duration(r.cfg.RBWriteTimeout) * time.Second
// We will go through our outputs and create the corresponding senders to add
@ -178,7 +183,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
switch out {
case config.OutputHTTP:
r.cfg.Logger.Log(logger.Debug, "using HTTP output")
rb := ring.NewBuffer(rbStartingElementSize, int(nElements), writeTimeout)
rb := ring.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout)
hs := newHTTPSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report)
w = newMTSSender(hs, r.cfg.Logger.Log, rb, r.cfg.ClipDuration)
mtsSenders = append(mtsSenders, w)
@ -192,14 +197,23 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
mtsSenders = append(mtsSenders, w)
case config.OutputFile:
r.cfg.Logger.Log(logger.Debug, "using File output")
w, err := newFileSender(r.cfg.OutputPath)
w, err := newFileSender(r.cfg.Logger, r.cfg.OutputPath, false)
if err != nil {
return err
}
mtsSenders = append(mtsSenders, w)
case config.OutputFiles:
r.cfg.Logger.Log(logger.Debug, "using Files output")
rb := ring.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout)
fs, err := newFileSender(r.cfg.Logger, r.cfg.OutputPath, true)
w = newMTSSender(fs, r.cfg.Logger.Log, rb, r.cfg.ClipDuration)
if err != nil {
return err
}
mtsSenders = append(mtsSenders, w)
case config.OutputRTMP:
r.cfg.Logger.Log(logger.Debug, "using RTMP output")
rb := ring.NewBuffer(rbStartingElementSize, int(nElements), writeTimeout)
rb := ring.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout)
w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionMaxTries, rb, r.cfg.Logger.Log, r.bitrate.Report)
if err != nil {
r.cfg.Logger.Log(logger.Warning, "rtmp connect error", "error", err.Error())
@ -273,6 +287,11 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
r.input = raspivid.New(r.cfg.Logger)
err = r.setLexer(r.cfg.InputCodec, false)
case config.InputRaspistill:
r.cfg.Logger.Log(logger.Debug, "using raspistill input")
r.input = raspistill.New(r.cfg.Logger)
r.setLexer(r.cfg.InputCodec, false)
case config.InputV4L:
r.cfg.Logger.Log(logger.Debug, "using V4L input")
r.input = webcam.New(r.cfg.Logger)
@ -327,9 +346,11 @@ func (r *Revid) setLexer(c uint8, isRTSP bool) error {
case codecutil.MJPEG, codecutil.JPEG:
r.cfg.Logger.Log(logger.Debug, "using MJPEG/JPEG codec")
r.lexTo = jpeg.Lex
jpeg.Log = r.cfg.Logger
if isRTSP {
r.lexTo = jpeg.NewExtractor().Extract
}
case codecutil.PCM, codecutil.ADPCM:
return errors.New("invalid codec for this selected input")
default:

View File

@ -54,14 +54,6 @@ type Logger interface {
Log(level int8, message string, params ...interface{})
}
type encLog struct{ Logger }
func (el *encLog) Debug(msg string, args ...interface{}) { el.Log(logger.Debug, msg, args...) }
func (el *encLog) Info(msg string, args ...interface{}) { el.Log(logger.Info, msg, args...) }
func (el *encLog) Warning(msg string, args ...interface{}) { el.Log(logger.Warning, msg, args...) }
func (el *encLog) Error(msg string, args ...interface{}) { el.Log(logger.Error, msg, args...) }
func (el *encLog) Fatal(msg string, args ...interface{}) { el.Log(logger.Fatal, msg, args...) }
// Revid provides methods to control a revid session; providing methods
// to start, stop and change the state of an instance using the Config struct.
type Revid struct {

144
revid/revid_test.go Normal file
View File

@ -0,0 +1,144 @@
// +build test
/*
DESCRIPTION
revid_test.go provides integration testing of the revid API.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
Copyright (C) 2017-2021 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package revid
import (
"bytes"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"testing"
"time"
"bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/container/mts/meta"
"bitbucket.org/ausocean/av/revid/config"
)
func TestRaspistill(t *testing.T) {
// Copyright information prefixed to all metadata.
const (
metaPreambleKey = "copyright"
metaPreambleData = "ausocean.org/license/content2021"
)
// Configuration parameters.
const (
timelapseInterval = "4"
rbStartElementSize = "1000000"
input = "Raspistill"
codec = "MJPEG"
output = "Files"
outDir = "out"
outputPath = outDir + "/"
logging = "Debug"
testImgDir = "../../../test/test-data/av/input/jpeg/"
)
const runTime = 40 * time.Second
// Add the copyright metadata.
mts.Meta = meta.NewWith([][2]string{{metaPreambleKey, metaPreambleData}})
// First remove out dir (if exists) to clear contents, then create dir.
os.RemoveAll(outDir)
os.Mkdir(outDir, os.ModePerm)
rv, err := New(config.Config{Logger: (*testLogger)(t)}, nil)
if err != nil {
t.Fatalf("did not expect error from revid.New(): %v", err)
}
err = rv.Update(
map[string]string{
config.KeyInput: input,
config.KeyInputCodec: codec,
config.KeyOutput: output,
config.KeyOutputPath: outputPath,
config.KeyTimelapseInterval: timelapseInterval,
config.KeyLogging: logging,
config.KeyRBStartElementSize: rbStartElementSize,
},
)
if err != nil {
t.Fatalf("did not expect error from rv.Update(): %v", err)
}
err = rv.Start()
if err != nil {
t.Fatalf("did not expect error from rv.Start(): %v", err)
}
time.Sleep(runTime)
rv.Stop()
// Get output file information.
os.Chdir(outDir)
var files []string
err = filepath.Walk(
".",
func(path string, info os.FileInfo, err error) error {
files = append(files, path)
return nil
},
)
if err != nil {
t.Fatalf("did not expect error from filepath.Walk(): %v", err)
}
if len(files) == 0 {
t.Fatalf("did not expect 0 output files")
}
// Load files outputted files and compare each one with corresponding input.
for i, n := range files {
// Ignore first file (which is prev dir ".").
if i == 0 {
continue
}
mtsBytes, err := ioutil.ReadFile(n)
if err != nil {
t.Fatalf("could not read output file: %v", err)
}
clip, err := mts.Extract(mtsBytes)
if err != nil {
t.Fatalf("could not extract clips from MPEG-TS stream: %v", err)
}
img := clip.Bytes()
inImg, err := ioutil.ReadFile(testImgDir + strconv.Itoa(i) + ".jpg")
if err != nil {
t.Fatalf("could not load input test image: %v", err)
}
if !bytes.Equal(img, inImg) {
t.Errorf("unexpected image extracted for img: %d", i)
}
}
}

View File

@ -42,6 +42,7 @@ import (
"bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/protocol/rtmp"
"bitbucket.org/ausocean/av/protocol/rtp"
"bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
@ -158,20 +159,38 @@ func extractMeta(r string, log func(lvl int8, msg string, args ...interface{}))
// fileSender implements loadSender for a local file destination.
type fileSender struct {
file *os.File
data []byte
file *os.File
data []byte
multiFile bool
path string
init bool
log config.Logger
}
func newFileSender(path string) (*fileSender, error) {
f, err := os.Create(path)
if err != nil {
return nil, err
}
return &fileSender{file: f}, nil
// newFileSender returns a new fileSender. Setting multi true will write a new
// file for each write to this sender.
func newFileSender(l config.Logger, path string, multiFile bool) (*fileSender, error) {
return &fileSender{
path: path,
log: l,
multiFile: multiFile,
init: true,
}, nil
}
// Write implements io.Writer.
func (s *fileSender) Write(d []byte) (int, error) {
if s.init || s.multiFile {
fileName := s.path + time.Now().String()
s.log.Debug("creating new output file", "init", s.init, "multiFile", s.multiFile, "fileName", fileName)
f, err := os.Create(fileName)
if err != nil {
return 0, fmt.Errorf("could not create file to write media to: %w", err)
}
s.file = f
s.init = false
}
s.log.Debug("writing output file", "len(d)", len(d))
return s.file.Write(d)
}