mirror of https://bitbucket.org/ausocean/av.git
Merged in raspistill-revid (pull request #451)
revid & device/raspistill & container/mts: integrating device/raspistill functionality into revid Approved-by: Trek Hopton
This commit is contained in:
commit
2905990cc6
|
@ -37,6 +37,7 @@ import (
|
||||||
"bitbucket.org/ausocean/av/container/mts/pes"
|
"bitbucket.org/ausocean/av/container/mts/pes"
|
||||||
"bitbucket.org/ausocean/av/container/mts/psi"
|
"bitbucket.org/ausocean/av/container/mts/psi"
|
||||||
"bitbucket.org/ausocean/utils/realtime"
|
"bitbucket.org/ausocean/utils/realtime"
|
||||||
|
"bitbucket.org/ausocean/utils/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
// These three constants are used to select between the three different
|
// These three constants are used to select between the three different
|
||||||
|
@ -109,13 +110,6 @@ var Meta *meta.Data
|
||||||
// This will help us obtain a realtime for timestamp meta encoding.
|
// This will help us obtain a realtime for timestamp meta encoding.
|
||||||
var RealTime = realtime.NewRealTime()
|
var RealTime = realtime.NewRealTime()
|
||||||
|
|
||||||
type logger interface {
|
|
||||||
Debug(string, ...interface{})
|
|
||||||
Info(string, ...interface{})
|
|
||||||
Warning(string, ...interface{})
|
|
||||||
Error(string, ...interface{})
|
|
||||||
Fatal(string, ...interface{})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Encoder encapsulates properties of an MPEG-TS generator.
|
// Encoder encapsulates properties of an MPEG-TS generator.
|
||||||
type Encoder struct {
|
type Encoder struct {
|
||||||
|
@ -143,12 +137,12 @@ type Encoder struct {
|
||||||
patBytes, pmtBytes []byte
|
patBytes, pmtBytes []byte
|
||||||
|
|
||||||
// log is a function that will be used through the encoder code for logging.
|
// log is a function that will be used through the encoder code for logging.
|
||||||
log logger
|
log logger.LoggerIF
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream
|
// NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream
|
||||||
// calls write for every frame, the rate will be the frame rate of the video.
|
// calls write for every frame, the rate will be the frame rate of the video.
|
||||||
func NewEncoder(dst io.WriteCloser, log logger, options ...func(*Encoder) error) (*Encoder, error) {
|
func NewEncoder(dst io.WriteCloser, log logger.LoggerIF, options ...func(*Encoder) error) (*Encoder, error) {
|
||||||
e := &Encoder{
|
e := &Encoder{
|
||||||
dst: dst,
|
dst: dst,
|
||||||
writePeriod: time.Duration(float64(time.Second) / defaultRate),
|
writePeriod: time.Duration(float64(time.Second) / defaultRate),
|
||||||
|
@ -181,6 +175,7 @@ func NewEncoder(dst io.WriteCloser, log logger, options ...func(*Encoder) error)
|
||||||
// Write implements io.Writer. Write takes raw video or audio data and encodes into MPEG-TS,
|
// Write implements io.Writer. Write takes raw video or audio data and encodes into MPEG-TS,
|
||||||
// then sending it to the encoder's io.Writer destination.
|
// then sending it to the encoder's io.Writer destination.
|
||||||
func (e *Encoder) Write(data []byte) (int, error) {
|
func (e *Encoder) Write(data []byte) (int, error) {
|
||||||
|
e.log.Debug("writing data", "len(data)", len(data))
|
||||||
switch e.psiMethod {
|
switch e.psiMethod {
|
||||||
case psiMethodPacket:
|
case psiMethodPacket:
|
||||||
e.log.Debug("checking packet no. conditions for PSI write", "count", e.pktCount, "PSI count", e.psiSendCount)
|
e.log.Debug("checking packet no. conditions for PSI write", "count", e.pktCount, "PSI count", e.psiSendCount)
|
||||||
|
@ -329,7 +324,7 @@ func (e *Encoder) ccFor(pid uint16) byte {
|
||||||
|
|
||||||
// updateMeta adds/updates a metaData descriptor in the given psi bytes using data
|
// updateMeta adds/updates a metaData descriptor in the given psi bytes using data
|
||||||
// contained in the global Meta struct.
|
// contained in the global Meta struct.
|
||||||
func updateMeta(b []byte, log logger) ([]byte, error) {
|
func updateMeta(b []byte, log logger.LoggerIF) ([]byte, error) {
|
||||||
p := psi.PSIBytes(b)
|
p := psi.PSIBytes(b)
|
||||||
if RealTime.IsSet() {
|
if RealTime.IsSet() {
|
||||||
t := strconv.Itoa(int(RealTime.Get().Unix()))
|
t := strconv.Itoa(int(RealTime.Get().Unix()))
|
||||||
|
|
|
@ -37,6 +37,7 @@ import (
|
||||||
|
|
||||||
"bitbucket.org/ausocean/av/container/mts/meta"
|
"bitbucket.org/ausocean/av/container/mts/meta"
|
||||||
"bitbucket.org/ausocean/av/container/mts/psi"
|
"bitbucket.org/ausocean/av/container/mts/psi"
|
||||||
|
"bitbucket.org/ausocean/utils/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
type nopCloser struct{ io.Writer }
|
type nopCloser struct{ io.Writer }
|
||||||
|
@ -54,32 +55,6 @@ func (d *destination) Write(p []byte) (int, error) {
|
||||||
return len(p), nil
|
return len(p), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// testLogger will allow logging to be done by the testing pkg.
|
|
||||||
type testLogger testing.T
|
|
||||||
|
|
||||||
func (tl *testLogger) Debug(msg string, args ...interface{}) { tl.log("debug", msg, args...) }
|
|
||||||
func (tl *testLogger) Info(msg string, args ...interface{}) { tl.log("info", msg, args...) }
|
|
||||||
func (tl *testLogger) Warning(msg string, args ...interface{}) { tl.log("warning", msg, args...) }
|
|
||||||
func (tl *testLogger) Error(msg string, args ...interface{}) { tl.log("error", msg, args...) }
|
|
||||||
func (tl *testLogger) Fatal(msg string, args ...interface{}) { tl.log("fatal", msg, args...) }
|
|
||||||
|
|
||||||
func (tl *testLogger) log(lvl string, msg string, args ...interface{}) {
|
|
||||||
switch lvl {
|
|
||||||
case "debug", "info", "warning", "error", "fatal":
|
|
||||||
default:
|
|
||||||
panic("invalid log level")
|
|
||||||
}
|
|
||||||
msg = lvl + ": " + msg
|
|
||||||
for i := 0; i < len(args); i++ {
|
|
||||||
msg += " %v"
|
|
||||||
}
|
|
||||||
if len(args) == 0 {
|
|
||||||
tl.Log(msg + "\n")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
tl.Logf(msg+"\n", args)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestEncodeVideo checks that we can correctly encode some dummy data into a
|
// TestEncodeVideo checks that we can correctly encode some dummy data into a
|
||||||
// valid MPEG-TS stream. This checks for correct MPEG-TS headers and also that the
|
// valid MPEG-TS stream. This checks for correct MPEG-TS headers and also that the
|
||||||
// original data is stored correctly and is retreivable.
|
// original data is stored correctly and is retreivable.
|
||||||
|
@ -127,7 +102,7 @@ func TestEncodeVideo(t *testing.T) {
|
||||||
|
|
||||||
// Create the dst and write the test data to encoder.
|
// Create the dst and write the test data to encoder.
|
||||||
dst := &destination{}
|
dst := &destination{}
|
||||||
e, err := NewEncoder(nopCloser{dst}, (*testLogger)(t), PacketBasedPSI(psiSendCount), Rate(25), MediaType(EncodeH264))
|
e, err := NewEncoder(nopCloser{dst}, (*logger.TestLogger)(t), PacketBasedPSI(psiSendCount), Rate(25), MediaType(EncodeH264))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
|
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -191,7 +166,7 @@ func TestEncodePcm(t *testing.T) {
|
||||||
sampleSize := 2
|
sampleSize := 2
|
||||||
blockSize := 16000
|
blockSize := 16000
|
||||||
writeFreq := float64(sampleRate*sampleSize) / float64(blockSize)
|
writeFreq := float64(sampleRate*sampleSize) / float64(blockSize)
|
||||||
e, err := NewEncoder(nopCloser{&buf}, (*testLogger)(t), PacketBasedPSI(10), Rate(writeFreq), MediaType(EncodePCM))
|
e, err := NewEncoder(nopCloser{&buf}, (*logger.TestLogger)(t), PacketBasedPSI(10), Rate(writeFreq), MediaType(EncodePCM))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
|
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -295,7 +270,7 @@ const fps = 25
|
||||||
func TestMetaEncode1(t *testing.T) {
|
func TestMetaEncode1(t *testing.T) {
|
||||||
Meta = meta.New()
|
Meta = meta.New()
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
e, err := NewEncoder(nopCloser{&buf}, (*testLogger)(t))
|
e, err := NewEncoder(nopCloser{&buf}, (*logger.TestLogger)(t))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("could not create encoder, failed with error: %v", err)
|
t.Fatalf("could not create encoder, failed with error: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -327,7 +302,7 @@ func TestMetaEncode1(t *testing.T) {
|
||||||
func TestMetaEncode2(t *testing.T) {
|
func TestMetaEncode2(t *testing.T) {
|
||||||
Meta = meta.New()
|
Meta = meta.New()
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
e, err := NewEncoder(nopCloser{&buf}, (*testLogger)(t))
|
e, err := NewEncoder(nopCloser{&buf}, (*logger.TestLogger)(t))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
|
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -359,7 +334,7 @@ func TestMetaEncode2(t *testing.T) {
|
||||||
func TestExtractMeta(t *testing.T) {
|
func TestExtractMeta(t *testing.T) {
|
||||||
Meta = meta.New()
|
Meta = meta.New()
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
e, err := NewEncoder(nopCloser{&buf}, (*testLogger)(t))
|
e, err := NewEncoder(nopCloser{&buf}, (*logger.TestLogger)(t))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
|
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ package meta
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
@ -274,7 +275,7 @@ func GetAllFromString(s string) (map[string]string, error) {
|
||||||
// Keys and values are seperated by '=', so split and check that len(kv)=2.
|
// Keys and values are seperated by '=', so split and check that len(kv)=2.
|
||||||
kv := strings.Split(entry, "=")
|
kv := strings.Split(entry, "=")
|
||||||
if len(kv) != 2 {
|
if len(kv) != 2 {
|
||||||
return nil, ErrUnexpectedMetaFormat
|
return nil, fmt.Errorf("not enough key-value pairs (kv: %v)", kv)
|
||||||
}
|
}
|
||||||
all[kv[0]] = kv[1]
|
all[kv[0]] = kv[1]
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,6 +42,7 @@ import (
|
||||||
"bitbucket.org/ausocean/av/container/mts/meta"
|
"bitbucket.org/ausocean/av/container/mts/meta"
|
||||||
"bitbucket.org/ausocean/av/container/mts/pes"
|
"bitbucket.org/ausocean/av/container/mts/pes"
|
||||||
"bitbucket.org/ausocean/av/container/mts/psi"
|
"bitbucket.org/ausocean/av/container/mts/psi"
|
||||||
|
"bitbucket.org/ausocean/utils/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestGetPTSRange checks that GetPTSRange can correctly get the first and last
|
// TestGetPTSRange checks that GetPTSRange can correctly get the first and last
|
||||||
|
@ -422,7 +423,13 @@ func TestTrimToMetaRange(t *testing.T) {
|
||||||
// TestSegmentForMeta checks that SegmentForMeta can correctly segment some MTS
|
// TestSegmentForMeta checks that SegmentForMeta can correctly segment some MTS
|
||||||
// data based on a given meta key and value.
|
// data based on a given meta key and value.
|
||||||
func TestSegmentForMeta(t *testing.T) {
|
func TestSegmentForMeta(t *testing.T) {
|
||||||
Meta = meta.New()
|
// Copyright information prefixed to all metadata.
|
||||||
|
const (
|
||||||
|
metaPreambleKey = "copyright"
|
||||||
|
metaPreambleData = "ausocean.org/license/content2019"
|
||||||
|
)
|
||||||
|
|
||||||
|
Meta = meta.NewWith([][2]string{{metaPreambleKey, metaPreambleData}})
|
||||||
|
|
||||||
const (
|
const (
|
||||||
nPSI = 10 // The number of PSI pairs to write.
|
nPSI = 10 // The number of PSI pairs to write.
|
||||||
|
@ -670,7 +677,7 @@ func TestFindPSI(t *testing.T) {
|
||||||
}).Bytes()
|
}).Bytes()
|
||||||
|
|
||||||
Meta.Add(metaKey, test.meta)
|
Meta.Add(metaKey, test.meta)
|
||||||
pmtTable, err = updateMeta(pmtTable, (*testLogger)(t))
|
pmtTable, err = updateMeta(pmtTable, (*logger.TestLogger)(t))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("could not update meta for test %d", i)
|
t.Fatalf("could not update meta for test %d", i)
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ package mts
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
"github.com/Comcast/gots/packet"
|
"github.com/Comcast/gots/packet"
|
||||||
|
@ -72,13 +73,13 @@ func Extract(p []byte) (*Clip, error) {
|
||||||
case PmtPid:
|
case PmtPid:
|
||||||
meta, err = ExtractMeta(pkt[:])
|
meta, err = ExtractMeta(pkt[:])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("could not extract meta data: %w", err)
|
||||||
}
|
}
|
||||||
default: // Must be media.
|
default: // Must be media.
|
||||||
// Get the MPEG-TS payload.
|
// Get the MPEG-TS payload.
|
||||||
payload, err := pkt.Payload()
|
payload, err := pkt.Payload()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("could not extract payload: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If PUSI is true then we know it's the start of a new frame, and we have
|
// If PUSI is true then we know it's the start of a new frame, and we have
|
||||||
|
@ -86,7 +87,7 @@ func Extract(p []byte) (*Clip, error) {
|
||||||
if pkt.PayloadUnitStartIndicator() {
|
if pkt.PayloadUnitStartIndicator() {
|
||||||
_pes, err := pes.NewPESHeader(payload)
|
_pes, err := pes.NewPESHeader(payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("could not parse PES: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extract the PTS and ID, then add a new frame to the clip.
|
// Extract the PTS and ID, then add a new frame to the clip.
|
||||||
|
|
|
@ -38,6 +38,7 @@ import (
|
||||||
"bitbucket.org/ausocean/av/container/mts/meta"
|
"bitbucket.org/ausocean/av/container/mts/meta"
|
||||||
"bitbucket.org/ausocean/av/container/mts/pes"
|
"bitbucket.org/ausocean/av/container/mts/pes"
|
||||||
"bitbucket.org/ausocean/av/container/mts/psi"
|
"bitbucket.org/ausocean/av/container/mts/psi"
|
||||||
|
"bitbucket.org/ausocean/utils/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestExtract checks that we can coorectly extract media, pts, id and meta from
|
// TestExtract checks that we can coorectly extract media, pts, id and meta from
|
||||||
|
@ -154,7 +155,7 @@ func writePSIWithMeta(b *bytes.Buffer, t *testing.T) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the meta in the pmt table.
|
// Update the meta in the pmt table.
|
||||||
pmtBytes, err = updateMeta(pmtBytes, (*testLogger)(t))
|
pmtBytes, err = updateMeta(pmtBytes, (*logger.TestLogger)(t))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ package psi
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/Comcast/gots/psi"
|
"github.com/Comcast/gots/psi"
|
||||||
)
|
)
|
||||||
|
@ -303,6 +304,9 @@ func (p *PSIBytes) AddDescriptor(tag int, data []byte) error {
|
||||||
i, desc := p.HasDescriptor(tag)
|
i, desc := p.HasDescriptor(tag)
|
||||||
if desc == nil {
|
if desc == nil {
|
||||||
err := p.createDescriptor(tag, data)
|
err := p.createDescriptor(tag, data)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not create descriptor: %w", err)
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -43,7 +43,7 @@ const (
|
||||||
// TODO(Saxon): find nImages programmatically ?
|
// TODO(Saxon): find nImages programmatically ?
|
||||||
nImages = 6
|
nImages = 6
|
||||||
|
|
||||||
imgPath = "../../../test/test-data/av/input/jpeg/"
|
imgPath = "../../test/test-data/av/input/jpeg/"
|
||||||
jpgExt = ".jpg"
|
jpgExt = ".jpg"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
{
|
||||||
|
"Input":"Raspistill",
|
||||||
|
"InputCodec":"JPEG",
|
||||||
|
"Output":"Files",
|
||||||
|
"OutputPath":"out/",
|
||||||
|
"TimelapseDuration":"3600",
|
||||||
|
"TimelapseInterval":"10",
|
||||||
|
"logging":"Debug",
|
||||||
|
"Suppress":"false",
|
||||||
|
"RBStartElementSize":"1000000"
|
||||||
|
}
|
|
@ -43,6 +43,7 @@ import (
|
||||||
"bitbucket.org/ausocean/av/device"
|
"bitbucket.org/ausocean/av/device"
|
||||||
"bitbucket.org/ausocean/av/device/file"
|
"bitbucket.org/ausocean/av/device/file"
|
||||||
"bitbucket.org/ausocean/av/device/geovision"
|
"bitbucket.org/ausocean/av/device/geovision"
|
||||||
|
"bitbucket.org/ausocean/av/device/raspistill"
|
||||||
"bitbucket.org/ausocean/av/device/raspivid"
|
"bitbucket.org/ausocean/av/device/raspivid"
|
||||||
"bitbucket.org/ausocean/av/device/webcam"
|
"bitbucket.org/ausocean/av/device/webcam"
|
||||||
"bitbucket.org/ausocean/av/filter"
|
"bitbucket.org/ausocean/av/filter"
|
||||||
|
@ -81,7 +82,7 @@ func (r *Revid) reset(c config.Config) error {
|
||||||
var encOptions []func(*mts.Encoder) error
|
var encOptions []func(*mts.Encoder) error
|
||||||
|
|
||||||
switch r.cfg.Input {
|
switch r.cfg.Input {
|
||||||
case config.InputRaspivid, config.InputFile, config.InputV4L, config.InputRTSP:
|
case config.InputRaspivid, config.InputRaspistill, config.InputFile, config.InputV4L, config.InputRTSP:
|
||||||
switch r.cfg.InputCodec {
|
switch r.cfg.InputCodec {
|
||||||
case codecutil.H265:
|
case codecutil.H265:
|
||||||
if r.cfg.Input != config.InputRTSP {
|
if r.cfg.Input != config.InputRTSP {
|
||||||
|
@ -94,10 +95,14 @@ func (r *Revid) reset(c config.Config) error {
|
||||||
st = mts.EncodeMJPEG
|
st = mts.EncodeMJPEG
|
||||||
encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PSITime)*time.Second))
|
encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PSITime)*time.Second))
|
||||||
r.cfg.CBR = true
|
r.cfg.CBR = true
|
||||||
|
case codecutil.JPEG:
|
||||||
|
st = mts.EncodeJPEG
|
||||||
|
encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PSITime)*time.Second))
|
||||||
|
r.cfg.CBR = true
|
||||||
case codecutil.PCM, codecutil.ADPCM:
|
case codecutil.PCM, codecutil.ADPCM:
|
||||||
return nil, fmt.Errorf("invalid input codec: %v for input: %v", r.cfg.InputCodec, r.cfg.Input)
|
return nil, fmt.Errorf("invalid input codec: %v for input: %v", r.cfg.InputCodec, r.cfg.Input)
|
||||||
default:
|
default:
|
||||||
panic("unknown input codec")
|
panic("unknown input codec for Raspivid, Raspistill, File, V4l or RTSP input")
|
||||||
}
|
}
|
||||||
case config.InputAudio:
|
case config.InputAudio:
|
||||||
switch r.cfg.InputCodec {
|
switch r.cfg.InputCodec {
|
||||||
|
@ -118,7 +123,7 @@ func (r *Revid) reset(c config.Config) error {
|
||||||
panic("unknown input type")
|
panic("unknown input type")
|
||||||
}
|
}
|
||||||
encOptions = append(encOptions, mts.MediaType(st), mts.Rate(rate))
|
encOptions = append(encOptions, mts.MediaType(st), mts.Rate(rate))
|
||||||
return mts.NewEncoder(dst, &encLog{r.cfg.Logger}, encOptions...)
|
return mts.NewEncoder(dst, r.cfg.Logger, encOptions...)
|
||||||
},
|
},
|
||||||
func(dst io.WriteCloser, fps int) (io.WriteCloser, error) {
|
func(dst io.WriteCloser, fps int) (io.WriteCloser, error) {
|
||||||
return flv.NewEncoder(dst, true, true, fps)
|
return flv.NewEncoder(dst, true, true, fps)
|
||||||
|
@ -167,7 +172,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
|
||||||
// Calculate no. of ring buffer elements based on starting element size
|
// Calculate no. of ring buffer elements based on starting element size
|
||||||
// const and config directed max ring buffer size, then create buffer.
|
// const and config directed max ring buffer size, then create buffer.
|
||||||
// This is only used if the selected output uses a ring buffer.
|
// This is only used if the selected output uses a ring buffer.
|
||||||
nElements := r.cfg.RBCapacity / rbStartingElementSize
|
nElements := r.cfg.RBCapacity / r.cfg.RBStartElementSize
|
||||||
writeTimeout := time.Duration(r.cfg.RBWriteTimeout) * time.Second
|
writeTimeout := time.Duration(r.cfg.RBWriteTimeout) * time.Second
|
||||||
|
|
||||||
// We will go through our outputs and create the corresponding senders to add
|
// We will go through our outputs and create the corresponding senders to add
|
||||||
|
@ -178,7 +183,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
|
||||||
switch out {
|
switch out {
|
||||||
case config.OutputHTTP:
|
case config.OutputHTTP:
|
||||||
r.cfg.Logger.Log(logger.Debug, "using HTTP output")
|
r.cfg.Logger.Log(logger.Debug, "using HTTP output")
|
||||||
rb := ring.NewBuffer(rbStartingElementSize, int(nElements), writeTimeout)
|
rb := ring.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout)
|
||||||
hs := newHTTPSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report)
|
hs := newHTTPSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report)
|
||||||
w = newMTSSender(hs, r.cfg.Logger.Log, rb, r.cfg.ClipDuration)
|
w = newMTSSender(hs, r.cfg.Logger.Log, rb, r.cfg.ClipDuration)
|
||||||
mtsSenders = append(mtsSenders, w)
|
mtsSenders = append(mtsSenders, w)
|
||||||
|
@ -192,14 +197,23 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
|
||||||
mtsSenders = append(mtsSenders, w)
|
mtsSenders = append(mtsSenders, w)
|
||||||
case config.OutputFile:
|
case config.OutputFile:
|
||||||
r.cfg.Logger.Log(logger.Debug, "using File output")
|
r.cfg.Logger.Log(logger.Debug, "using File output")
|
||||||
w, err := newFileSender(r.cfg.OutputPath)
|
w, err := newFileSender(r.cfg.Logger, r.cfg.OutputPath, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
mtsSenders = append(mtsSenders, w)
|
mtsSenders = append(mtsSenders, w)
|
||||||
|
case config.OutputFiles:
|
||||||
|
r.cfg.Logger.Log(logger.Debug, "using Files output")
|
||||||
|
rb := ring.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout)
|
||||||
|
fs, err := newFileSender(r.cfg.Logger, r.cfg.OutputPath, true)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
w = newMTSSender(fs, r.cfg.Logger.Log, rb, r.cfg.ClipDuration)
|
||||||
|
mtsSenders = append(mtsSenders, w)
|
||||||
case config.OutputRTMP:
|
case config.OutputRTMP:
|
||||||
r.cfg.Logger.Log(logger.Debug, "using RTMP output")
|
r.cfg.Logger.Log(logger.Debug, "using RTMP output")
|
||||||
rb := ring.NewBuffer(rbStartingElementSize, int(nElements), writeTimeout)
|
rb := ring.NewBuffer(int(r.cfg.RBStartElementSize), int(nElements), writeTimeout)
|
||||||
w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionMaxTries, rb, r.cfg.Logger.Log, r.bitrate.Report)
|
w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionMaxTries, rb, r.cfg.Logger.Log, r.bitrate.Report)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.cfg.Logger.Log(logger.Warning, "rtmp connect error", "error", err.Error())
|
r.cfg.Logger.Log(logger.Warning, "rtmp connect error", "error", err.Error())
|
||||||
|
@ -273,6 +287,11 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
|
||||||
r.input = raspivid.New(r.cfg.Logger)
|
r.input = raspivid.New(r.cfg.Logger)
|
||||||
err = r.setLexer(r.cfg.InputCodec, false)
|
err = r.setLexer(r.cfg.InputCodec, false)
|
||||||
|
|
||||||
|
case config.InputRaspistill:
|
||||||
|
r.cfg.Logger.Log(logger.Debug, "using raspistill input")
|
||||||
|
r.input = raspistill.New(r.cfg.Logger)
|
||||||
|
r.setLexer(r.cfg.InputCodec, false)
|
||||||
|
|
||||||
case config.InputV4L:
|
case config.InputV4L:
|
||||||
r.cfg.Logger.Log(logger.Debug, "using V4L input")
|
r.cfg.Logger.Log(logger.Debug, "using V4L input")
|
||||||
r.input = webcam.New(r.cfg.Logger)
|
r.input = webcam.New(r.cfg.Logger)
|
||||||
|
@ -327,9 +346,11 @@ func (r *Revid) setLexer(c uint8, isRTSP bool) error {
|
||||||
case codecutil.MJPEG, codecutil.JPEG:
|
case codecutil.MJPEG, codecutil.JPEG:
|
||||||
r.cfg.Logger.Log(logger.Debug, "using MJPEG/JPEG codec")
|
r.cfg.Logger.Log(logger.Debug, "using MJPEG/JPEG codec")
|
||||||
r.lexTo = jpeg.Lex
|
r.lexTo = jpeg.Lex
|
||||||
|
jpeg.Log = r.cfg.Logger
|
||||||
if isRTSP {
|
if isRTSP {
|
||||||
r.lexTo = jpeg.NewExtractor().Extract
|
r.lexTo = jpeg.NewExtractor().Extract
|
||||||
}
|
}
|
||||||
|
|
||||||
case codecutil.PCM, codecutil.ADPCM:
|
case codecutil.PCM, codecutil.ADPCM:
|
||||||
return errors.New("invalid codec for this selected input")
|
return errors.New("invalid codec for this selected input")
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -54,14 +54,6 @@ type Logger interface {
|
||||||
Log(level int8, message string, params ...interface{})
|
Log(level int8, message string, params ...interface{})
|
||||||
}
|
}
|
||||||
|
|
||||||
type encLog struct{ Logger }
|
|
||||||
|
|
||||||
func (el *encLog) Debug(msg string, args ...interface{}) { el.Log(logger.Debug, msg, args...) }
|
|
||||||
func (el *encLog) Info(msg string, args ...interface{}) { el.Log(logger.Info, msg, args...) }
|
|
||||||
func (el *encLog) Warning(msg string, args ...interface{}) { el.Log(logger.Warning, msg, args...) }
|
|
||||||
func (el *encLog) Error(msg string, args ...interface{}) { el.Log(logger.Error, msg, args...) }
|
|
||||||
func (el *encLog) Fatal(msg string, args ...interface{}) { el.Log(logger.Fatal, msg, args...) }
|
|
||||||
|
|
||||||
// Revid provides methods to control a revid session; providing methods
|
// Revid provides methods to control a revid session; providing methods
|
||||||
// to start, stop and change the state of an instance using the Config struct.
|
// to start, stop and change the state of an instance using the Config struct.
|
||||||
type Revid struct {
|
type Revid struct {
|
||||||
|
|
|
@ -0,0 +1,157 @@
|
||||||
|
// +build test
|
||||||
|
|
||||||
|
/*
|
||||||
|
DESCRIPTION
|
||||||
|
revid_test.go provides integration testing of the revid API.
|
||||||
|
|
||||||
|
AUTHORS
|
||||||
|
Saxon A. Nelson-Milton <saxon@ausocean.org>
|
||||||
|
|
||||||
|
LICENSE
|
||||||
|
Copyright (C) 2017-2021 the Australian Ocean Lab (AusOcean)
|
||||||
|
|
||||||
|
It is free software: you can redistribute it and/or modify them
|
||||||
|
under the terms of the GNU General Public License as published by the
|
||||||
|
Free Software Foundation, either version 3 of the License, or (at your
|
||||||
|
option) any later version.
|
||||||
|
|
||||||
|
It is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||||
|
for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License
|
||||||
|
in gpl.txt. If not, see http://www.gnu.org/licenses.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package revid
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"bitbucket.org/ausocean/av/container/mts"
|
||||||
|
"bitbucket.org/ausocean/av/container/mts/meta"
|
||||||
|
"bitbucket.org/ausocean/av/revid/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRaspistill(t *testing.T) {
|
||||||
|
// Copyright information prefixed to all metadata.
|
||||||
|
const (
|
||||||
|
metaPreambleKey = "copyright"
|
||||||
|
metaPreambleData = "ausocean.org/license/content2021"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Configuration parameters.
|
||||||
|
const (
|
||||||
|
timelapseInterval = "4"
|
||||||
|
rbStartElementSize = "1000000"
|
||||||
|
input = "Raspistill"
|
||||||
|
codec = "JPEG"
|
||||||
|
output = "Files"
|
||||||
|
outDir = "out"
|
||||||
|
outputPath = outDir + "/"
|
||||||
|
logging = "Debug"
|
||||||
|
testImgDir = "../../../test/test-data/av/input/jpeg/"
|
||||||
|
)
|
||||||
|
|
||||||
|
const runTime = 40 * time.Second
|
||||||
|
|
||||||
|
// Add the copyright metadata.
|
||||||
|
mts.Meta = meta.NewWith([][2]string{{metaPreambleKey, metaPreambleData}})
|
||||||
|
|
||||||
|
// First remove out dir (if exists) to clear contents, then create dir.
|
||||||
|
err := os.RemoveAll(outDir)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("could not remove any prior out directory: %v",err)
|
||||||
|
}
|
||||||
|
|
||||||
|
os.Mkdir(outDir, os.ModePerm)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("could not create new out directory: %v",err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rv, err := New(config.Config{Logger: (*testLogger)(t)}, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("did not expect error from revid.New(): %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = rv.Update(
|
||||||
|
map[string]string{
|
||||||
|
config.KeyInput: input,
|
||||||
|
config.KeyInputCodec: codec,
|
||||||
|
config.KeyOutput: output,
|
||||||
|
config.KeyOutputPath: outputPath,
|
||||||
|
config.KeyTimelapseInterval: timelapseInterval,
|
||||||
|
config.KeyLogging: logging,
|
||||||
|
config.KeyRBStartElementSize: rbStartElementSize,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("did not expect error from rv.Update(): %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = rv.Start()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("did not expect error from rv.Start(): %v", err)
|
||||||
|
}
|
||||||
|
time.Sleep(runTime)
|
||||||
|
rv.Stop()
|
||||||
|
|
||||||
|
// Get output file information.
|
||||||
|
os.Chdir(outDir)
|
||||||
|
var files []string
|
||||||
|
err = filepath.Walk(
|
||||||
|
".",
|
||||||
|
func(path string, info os.FileInfo, err error) error {
|
||||||
|
files = append(files, path)
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("did not expect error from filepath.Walk(): %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(files) == 0 {
|
||||||
|
t.Fatalf("did not expect 0 output files")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load files outputted files and compare each one with corresponding input.
|
||||||
|
for i, n := range files {
|
||||||
|
// Ignore first file (which is prev dir ".").
|
||||||
|
if i == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
mtsBytes, err := ioutil.ReadFile(n)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("could not read output file: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
clip, err := mts.Extract(mtsBytes)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("could not extract clips from MPEG-TS stream: %v", err)
|
||||||
|
}
|
||||||
|
img := clip.Bytes()
|
||||||
|
|
||||||
|
inImg, err := ioutil.ReadFile(testImgDir + strconv.Itoa(i) + ".jpg")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("could not load input test image: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bytes.Equal(img, inImg) {
|
||||||
|
t.Errorf("unexpected image extracted for img: %d", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up out directory.
|
||||||
|
err := os.RemoveAll(outDir)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("could not clean up out directory: %v",err)
|
||||||
|
}
|
||||||
|
}
|
|
@ -42,6 +42,7 @@ import (
|
||||||
"bitbucket.org/ausocean/av/container/mts"
|
"bitbucket.org/ausocean/av/container/mts"
|
||||||
"bitbucket.org/ausocean/av/protocol/rtmp"
|
"bitbucket.org/ausocean/av/protocol/rtmp"
|
||||||
"bitbucket.org/ausocean/av/protocol/rtp"
|
"bitbucket.org/ausocean/av/protocol/rtp"
|
||||||
|
"bitbucket.org/ausocean/av/revid/config"
|
||||||
"bitbucket.org/ausocean/iot/pi/netsender"
|
"bitbucket.org/ausocean/iot/pi/netsender"
|
||||||
"bitbucket.org/ausocean/utils/logger"
|
"bitbucket.org/ausocean/utils/logger"
|
||||||
"bitbucket.org/ausocean/utils/ring"
|
"bitbucket.org/ausocean/utils/ring"
|
||||||
|
@ -158,20 +159,38 @@ func extractMeta(r string, log func(lvl int8, msg string, args ...interface{}))
|
||||||
|
|
||||||
// fileSender implements loadSender for a local file destination.
|
// fileSender implements loadSender for a local file destination.
|
||||||
type fileSender struct {
|
type fileSender struct {
|
||||||
file *os.File
|
file *os.File
|
||||||
data []byte
|
data []byte
|
||||||
|
multiFile bool
|
||||||
|
path string
|
||||||
|
init bool
|
||||||
|
log config.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func newFileSender(path string) (*fileSender, error) {
|
// newFileSender returns a new fileSender. Setting multi true will write a new
|
||||||
f, err := os.Create(path)
|
// file for each write to this sender.
|
||||||
if err != nil {
|
func newFileSender(l config.Logger, path string, multiFile bool) (*fileSender, error) {
|
||||||
return nil, err
|
return &fileSender{
|
||||||
}
|
path: path,
|
||||||
return &fileSender{file: f}, nil
|
log: l,
|
||||||
|
multiFile: multiFile,
|
||||||
|
init: true,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write implements io.Writer.
|
// Write implements io.Writer.
|
||||||
func (s *fileSender) Write(d []byte) (int, error) {
|
func (s *fileSender) Write(d []byte) (int, error) {
|
||||||
|
if s.init || s.multiFile {
|
||||||
|
fileName := s.path + time.Now().String()
|
||||||
|
s.log.Debug("creating new output file", "init", s.init, "multiFile", s.multiFile, "fileName", fileName)
|
||||||
|
f, err := os.Create(fileName)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("could not create file to write media to: %w", err)
|
||||||
|
}
|
||||||
|
s.file = f
|
||||||
|
s.init = false
|
||||||
|
}
|
||||||
|
s.log.Debug("writing output file", "len(d)", len(d))
|
||||||
return s.file.Write(d)
|
return s.file.Write(d)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue