revid: added support for MJPEG packetisation

Changes included adding support for variable InputCodec that may be set via netreceiver/vidgrind to set to H264/MJPEG. Also setting revid's lexTo
function to mjpeg.Lex in the case of an MJPEG InputCodec. Added options parameter to mts.NewEncoder function so that we can change options of the
encoder, namely whether it bases PSI interval on NAL type, or number of frames - in the case of MJPEG we based PSI interval on number of frames.
This commit is contained in:
Saxon 2019-09-24 07:42:26 +09:30
parent f8f78947f9
commit 191be04b11
6 changed files with 130 additions and 68 deletions

View File

@ -39,11 +39,11 @@ import (
"bitbucket.org/ausocean/utils/realtime" "bitbucket.org/ausocean/utils/realtime"
) )
// Media type values. // Stream IDs as per ITU-T Rec. H.222.0 / ISO/IEC 13818-1 [1], tables 2-22 and 2-34.
// TODO: reference relevant specifications.
const ( const (
H264ID = 27 H264ID = 27
H265ID = 36 H265ID = 36
MJPEGID = 27 // TODO: find an available value for this.
audioStreamID = 0xc0 // ADPCM audio stream ID. audioStreamID = 0xc0 // ADPCM audio stream ID.
) )
@ -51,9 +51,16 @@ const (
const ( const (
EncodeH264 = iota EncodeH264 = iota
EncodeH265 EncodeH265
EncodeMJPEG
EncodeAudio EncodeAudio
) )
// The program IDs we assign to different types of media.
const (
VideoPid = 256
AudioPid = 210
)
// Time-related constants. // Time-related constants.
const ( const (
// ptsOffset is the offset added to the clock to determine // ptsOffset is the offset added to the clock to determine
@ -73,6 +80,16 @@ const (
// If we are not using NAL based PSI intervals then we will send PSI every 7 packets. // If we are not using NAL based PSI intervals then we will send PSI every 7 packets.
const psiSendCount = 7 const psiSendCount = 7
const (
hasPayload = 0x1
hasAdaptationField = 0x2
)
const (
hasDTS = 0x1
hasPTS = 0x2
)
// Some common manifestations of PSI. // Some common manifestations of PSI.
var ( var (
// StandardPAT is a minimal PAT. // StandardPAT is a minimal PAT.
@ -111,7 +128,7 @@ var (
} }
) )
// Meta allows addition of metadata to encoded mts from outside of this pkg. // Meta allows addition of metadata to encoded mts from outsIDe of this pkg.
// See meta pkg for usage. // See meta pkg for usage.
// //
// TODO: make this not global. // TODO: make this not global.
@ -147,21 +164,24 @@ type Encoder struct {
// NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream // NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream
// calls write for every frame, the rate will be the frame rate of the video. // calls write for every frame, the rate will be the frame rate of the video.
func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { func NewEncoder(dst io.WriteCloser, rate float64, mediaType int, options ...func(*Encoder) error) (*Encoder, error) {
var mPid uint16 var mPID uint16
var sid byte var sID byte
nbp := true nbp := true
switch mediaType { switch mediaType {
case EncodeAudio: case EncodeAudio:
mPid = AudioPid mPID = AudioPid
sid = audioStreamID sID = audioStreamID
nbp = false nbp = false
case EncodeH265: case EncodeH265:
mPid = VideoPid mPID = VideoPid
sid = H265ID sID = H265ID
case EncodeH264: case EncodeH264:
mPid = VideoPid mPID = VideoPid
sid = H264ID sID = H264ID
case EncodeMJPEG:
mPID = VideoPid
sID = MJPEGID
} }
pmt := BasePMT pmt := BasePMT
@ -169,14 +189,14 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder {
Pcrpid: 0x0100, Pcrpid: 0x0100,
Pil: 0, Pil: 0,
Essd: &psi.ESSD{ Essd: &psi.ESSD{
St: byte(sid), St: byte(sID),
Epid: mPid, Epid: mPID,
Esil: 0x00, Esil: 0x00,
}, },
} }
pmtTable = pmt.Bytes() pmtTable = pmt.Bytes()
return &Encoder{ e := &Encoder{
dst: dst, dst: dst,
writePeriod: time.Duration(float64(time.Second) / rate), writePeriod: time.Duration(float64(time.Second) / rate),
@ -186,31 +206,35 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder {
pktCount: 8, pktCount: 8,
mediaPid: mPid, mediaPid: mPID,
streamID: sid, streamID: sID,
continuity: map[uint16]byte{ continuity: map[uint16]byte{
PatPid: 0, PatPid: 0,
PmtPid: 0, PmtPid: 0,
mPid: 0, mPID: 0,
}, },
} }
for _, option := range options {
err := option(e)
if err != nil {
return nil, fmt.Errorf("option failed with error: %v", err)
}
}
return e, nil
} }
const ( // PacketBasedPSI is an option that can be passed to NewEncoder to select
hasPayload = 0x1 // packet based PSI writing, i.e. PSI are written to the destination every
hasAdaptationField = 0x2 // sendCount packets.
) func PacketBasedPSI(sendCount int) func(*Encoder) error {
return func(e *Encoder) error {
const ( e.nalBasedPSI = false
hasDTS = 0x1
hasPTS = 0x2
)
func (e *Encoder) NALBasedPSI(b bool, sendCount int) {
e.nalBasedPSI = b
e.psiSendCount = sendCount e.psiSendCount = sendCount
e.pktCount = e.psiSendCount e.pktCount = e.psiSendCount
return nil
}
} }
// Write implements io.Writer. Write takes raw video or audio data and encodes into MPEG-TS, // Write implements io.Writer. Write takes raw video or audio data and encodes into MPEG-TS,

View File

@ -3,8 +3,8 @@ NAME
encoder_test.go encoder_test.go
AUTHOR AUTHOR
Trek Hopton <trek@ausocean.org>
Saxon A. Nelson-Milton <saxon@ausocean.org> Saxon A. Nelson-Milton <saxon@ausocean.org>
Trek Hopton <trek@ausocean.org>
LICENSE LICENSE
encoder_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) encoder_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
@ -101,12 +101,14 @@ func TestEncodeVideo(t *testing.T) {
// Create the dst and write the test data to encoder. // Create the dst and write the test data to encoder.
dst := &destination{} dst := &destination{}
e := NewEncoder(nopCloser{dst}, 25, EncodeH264) e, err := NewEncoder(nopCloser{dst}, 25, EncodeH264, PacketBasedPSI(psiSendCount))
e.NALBasedPSI(false, psiSendCount)
_, err := e.Write(data)
if err != nil { if err != nil {
t.Fatalf("could not write data to encoder, failed with err: %v\n", err) t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
_, err = e.Write(data)
if err != nil {
t.Fatalf("could not write data to encoder, failed with error: %v\n", err)
} }
// Check headers. // Check headers.
@ -163,7 +165,10 @@ func TestEncodePcm(t *testing.T) {
sampleSize := 2 sampleSize := 2
blockSize := 16000 blockSize := 16000
writeFreq := float64(sampleRate*sampleSize) / float64(blockSize) writeFreq := float64(sampleRate*sampleSize) / float64(blockSize)
e := NewEncoder(nopCloser{&buf}, writeFreq, EncodeAudio) e, err := NewEncoder(nopCloser{&buf}, writeFreq, EncodeAudio)
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm" inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm"
inPcm, err := ioutil.ReadFile(inPath) inPcm, err := ioutil.ReadFile(inPath)
@ -264,7 +269,11 @@ const fps = 25
func TestMetaEncode1(t *testing.T) { func TestMetaEncode1(t *testing.T) {
Meta = meta.New() Meta = meta.New()
var buf bytes.Buffer var buf bytes.Buffer
e := NewEncoder(nopCloser{&buf}, fps, EncodeH264) e, err := NewEncoder(nopCloser{&buf}, fps, EncodeH264)
if err != nil {
t.Fatalf("could not create encoder, failed with error: %v", err)
}
Meta.Add("ts", "12345678") Meta.Add("ts", "12345678")
if err := e.writePSI(); err != nil { if err := e.writePSI(); err != nil {
t.Errorf("unexpected error: %v\n", err.Error()) t.Errorf("unexpected error: %v\n", err.Error())
@ -292,7 +301,11 @@ func TestMetaEncode1(t *testing.T) {
func TestMetaEncode2(t *testing.T) { func TestMetaEncode2(t *testing.T) {
Meta = meta.New() Meta = meta.New()
var buf bytes.Buffer var buf bytes.Buffer
e := NewEncoder(nopCloser{&buf}, fps, EncodeH264) e, err := NewEncoder(nopCloser{&buf}, fps, EncodeH264)
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
Meta.Add("ts", "12345678") Meta.Add("ts", "12345678")
Meta.Add("loc", "1234,4321,1234") Meta.Add("loc", "1234,4321,1234")
if err := e.writePSI(); err != nil { if err := e.writePSI(); err != nil {
@ -320,7 +333,11 @@ func TestMetaEncode2(t *testing.T) {
func TestExtractMeta(t *testing.T) { func TestExtractMeta(t *testing.T) {
Meta = meta.New() Meta = meta.New()
var buf bytes.Buffer var buf bytes.Buffer
e := NewEncoder(nopCloser{&buf}, fps, EncodeH264) e, err := NewEncoder(nopCloser{&buf}, fps, EncodeH264)
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
Meta.Add("ts", "12345678") Meta.Add("ts", "12345678")
Meta.Add("loc", "1234,4321,1234") Meta.Add("loc", "1234,4321,1234")
if err := e.writePSI(); err != nil { if err := e.writePSI(); err != nil {

View File

@ -42,13 +42,11 @@ import (
const PacketSize = 188 const PacketSize = 188
// Program ID for various types of ts packets. // Standard program IDs for program specific information MPEG-TS packets.
const ( const (
SdtPid = 17 SdtPid = 17
PatPid = 0 PatPid = 0
PmtPid = 4096 PmtPid = 4096
VideoPid = 256
AudioPid = 210
) )
// StreamID is the id of the first stream. // StreamID is the id of the first stream.

View File

@ -286,12 +286,7 @@ func (c *Config) Validate() error {
} }
switch c.InputCodec { switch c.InputCodec {
case codecutil.H264: case codecutil.H264, codecutil.MJPEG, codecutil.PCM, codecutil.ADPCM:
case codecutil.MJPEG:
if c.Quantization > 0 || c.Bitrate == 0 {
return errors.New("bad bitrate or quantization for mjpeg input")
}
case codecutil.PCM, codecutil.ADPCM:
default: default:
switch c.Input { switch c.Input {
case Audio: case Audio:

View File

@ -43,6 +43,7 @@ import (
"bitbucket.org/ausocean/av/codec/codecutil" "bitbucket.org/ausocean/av/codec/codecutil"
"bitbucket.org/ausocean/av/codec/h264" "bitbucket.org/ausocean/av/codec/h264"
"bitbucket.org/ausocean/av/codec/h265" "bitbucket.org/ausocean/av/codec/h265"
"bitbucket.org/ausocean/av/codec/mjpeg"
"bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/flv"
"bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/protocol/rtcp" "bitbucket.org/ausocean/av/protocol/rtcp"
@ -121,7 +122,7 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) {
r := Revid{ns: ns, err: make(chan error)} r := Revid{ns: ns, err: make(chan error)}
err := r.setConfig(c) err := r.setConfig(c)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not set config, failed with error: %v",err) return nil, fmt.Errorf("could not set config, failed with error: %v", err)
} }
r.config.Logger.SetLevel(c.LogLevel) r.config.Logger.SetLevel(c.LogLevel)
go r.handleErrors() go r.handleErrors()
@ -171,16 +172,26 @@ func (r *Revid) reset(config Config) error {
err = r.setupPipeline( err = r.setupPipeline(
func(dst io.WriteCloser, fps float64) (io.WriteCloser, error) { func(dst io.WriteCloser, fps float64) (io.WriteCloser, error) {
var st int var st int
var encOptions []func(*mts.Encoder) error
switch r.config.Input { switch r.config.Input {
case Raspivid, File, V4L: case Raspivid:
switch r.config.InputCodec {
case H264:
st = mts.EncodeH264
case MJPEG:
st = mts.EncodeMJPEG
encOptions = append(encOptions, mts.PacketBasedPSI(int(r.config.MinFrames)))
}
case File, V4L:
st = mts.EncodeH264 st = mts.EncodeH264
case RTSP: case RTSP:
st = mts.EncodeH265 st = mts.EncodeH265
case Audio: case Audio:
st = mts.EncodeAudio st = mts.EncodeAudio
} }
e := mts.NewEncoder(dst, float64(fps), st)
return e, nil return mts.NewEncoder(dst, float64(fps), st, encOptions...)
}, },
func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { func(dst io.WriteCloser, fps int) (io.WriteCloser, error) {
return flv.NewEncoder(dst, true, true, fps) return flv.NewEncoder(dst, true, true, fps)
@ -290,7 +301,12 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
switch r.config.Input { switch r.config.Input {
case Raspivid: case Raspivid:
r.setupInput = r.startRaspivid r.setupInput = r.startRaspivid
switch r.config.InputCodec {
case H264:
r.lexTo = h264.Lex r.lexTo = h264.Lex
case MJPEG:
r.lexTo = mjpeg.Lex
}
case V4L: case V4L:
r.setupInput = r.startV4L r.setupInput = r.startV4L
r.lexTo = h264.Lex r.lexTo = h264.Lex
@ -390,6 +406,15 @@ func (r *Revid) Update(vars map[string]string) error {
r.config.Exposure = value r.config.Exposure = value
case "AutoWhiteBalance": case "AutoWhiteBalance":
r.config.AutoWhiteBalance = value r.config.AutoWhiteBalance = value
case "InputCodec":
switch value {
case "H264":
r.config.InputCodec = H264
case "MJPEG":
r.config.InputCodec = MJPEG
default:
r.config.Logger.Log(logger.Warning, pkg+"invalid InputCodec variable value", "value", value)
}
case "Output": case "Output":
outputs := strings.Split(value, ",") outputs := strings.Split(value, ",")
r.config.Outputs = make([]uint8, len(outputs)) r.config.Outputs = make([]uint8, len(outputs))

View File

@ -135,11 +135,12 @@ func TestMtsSenderSegment(t *testing.T) {
const numberOfClips = 11 const numberOfClips = 11
dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0) sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0)
encoder := mts.NewEncoder(sender, 25, mts.EncodeH264)
// Turn time based PSI writing off for encoder.
const psiSendCount = 10 const psiSendCount = 10
encoder.NALBasedPSI(false, psiSendCount) encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
// Write the packets to the encoder, which will in turn write to the mtsSender. // Write the packets to the encoder, which will in turn write to the mtsSender.
// Payload will just be packet number. // Payload will just be packet number.
@ -213,11 +214,12 @@ func TestMtsSenderFailedSend(t *testing.T) {
const clipToFailAt = 3 const clipToFailAt = 3
dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0) sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0)
encoder := mts.NewEncoder(sender, 25, mts.EncodeH264)
// Turn time based PSI writing off for encoder and send PSI every 10 packets.
const psiSendCount = 10 const psiSendCount = 10
encoder.NALBasedPSI(false, psiSendCount) encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
// Write the packets to the encoder, which will in turn write to the mtsSender. // Write the packets to the encoder, which will in turn write to the mtsSender.
// Payload will just be packet number. // Payload will just be packet number.
@ -293,11 +295,12 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
const clipToDelay = 3 const clipToDelay = 3
dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(1, defaultMTSRBElementSize, 0), 0) sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(1, defaultMTSRBElementSize, 0), 0)
encoder := mts.NewEncoder(sender, 25, mts.EncodeH264)
// Turn time based PSI writing off for encoder.
const psiSendCount = 10 const psiSendCount = 10
encoder.NALBasedPSI(false, psiSendCount) encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264)
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
// Write the packets to the encoder, which will in turn write to the mtsSender. // Write the packets to the encoder, which will in turn write to the mtsSender.
// Payload will just be packet number. // Payload will just be packet number.