Merged in mjpeg-packetisation (pull request #257)

revid: added support for MJPEG packetisation

Approved-by: Alan Noble <anoble@gmail.com>
This commit is contained in:
Saxon Milton 2019-09-25 01:28:53 +00:00
commit fc5edb9adc
6 changed files with 129 additions and 67 deletions

View File

@ -39,11 +39,11 @@ import (
"bitbucket.org/ausocean/utils/realtime" "bitbucket.org/ausocean/utils/realtime"
) )
// Media type values. // Stream IDs as per ITU-T Rec. H.222.0 / ISO/IEC 13818-1 [1], tables 2-22 and 2-34.
// TODO: reference relevant specifications.
const ( const (
H264ID = 27 H264ID = 27
H265ID = 36 H265ID = 36
MJPEGID = 28
audioStreamID = 0xc0 // ADPCM audio stream ID. audioStreamID = 0xc0 // ADPCM audio stream ID.
) )
@ -51,9 +51,16 @@ const (
const ( const (
EncodeH264 = iota EncodeH264 = iota
EncodeH265 EncodeH265
EncodeMJPEG
EncodeAudio EncodeAudio
) )
// The program IDs we assign to different types of media.
const (
VideoPid = 256
AudioPid = 210
)
// Time-related constants. // Time-related constants.
const ( const (
// ptsOffset is the offset added to the clock to determine // ptsOffset is the offset added to the clock to determine
@ -73,6 +80,16 @@ const (
// If we are not using NAL based PSI intervals then we will send PSI every 7 packets. // If we are not using NAL based PSI intervals then we will send PSI every 7 packets.
const psiSendCount = 7 const psiSendCount = 7
const (
hasPayload = 0x1
hasAdaptationField = 0x2
)
const (
hasDTS = 0x1
hasPTS = 0x2
)
// Some common manifestations of PSI. // Some common manifestations of PSI.
var ( var (
// StandardPAT is a minimal PAT. // StandardPAT is a minimal PAT.
@ -147,21 +164,24 @@ type Encoder struct {
// NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream // NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream
// calls write for every frame, the rate will be the frame rate of the video. // calls write for every frame, the rate will be the frame rate of the video.
func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { func NewEncoder(dst io.WriteCloser, rate float64, mediaType int, options ...func(*Encoder) error) (*Encoder, error) {
var mPid uint16 var mPID uint16
var sid byte var sID byte
nbp := true nbp := true
switch mediaType { switch mediaType {
case EncodeAudio: case EncodeAudio:
mPid = AudioPid mPID = AudioPid
sid = audioStreamID sID = audioStreamID
nbp = false nbp = false
case EncodeH265: case EncodeH265:
mPid = VideoPid mPID = VideoPid
sid = H265ID sID = H265ID
case EncodeH264: case EncodeH264:
mPid = VideoPid mPID = VideoPid
sid = H264ID sID = H264ID
case EncodeMJPEG:
mPID = VideoPid
sID = MJPEGID
} }
pmt := BasePMT pmt := BasePMT
@ -169,14 +189,14 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder {
Pcrpid: 0x0100, Pcrpid: 0x0100,
Pil: 0, Pil: 0,
Essd: &psi.ESSD{ Essd: &psi.ESSD{
St: byte(sid), St: byte(sID),
Epid: mPid, Epid: mPID,
Esil: 0x00, Esil: 0x00,
}, },
} }
pmtTable = pmt.Bytes() pmtTable = pmt.Bytes()
return &Encoder{ e := &Encoder{
dst: dst, dst: dst,
writePeriod: time.Duration(float64(time.Second) / rate), writePeriod: time.Duration(float64(time.Second) / rate),
@ -186,31 +206,35 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder {
pktCount: 8, pktCount: 8,
mediaPid: mPid, mediaPid: mPID,
streamID: sid, streamID: sID,
continuity: map[uint16]byte{ continuity: map[uint16]byte{
PatPid: 0, PatPid: 0,
PmtPid: 0, PmtPid: 0,
mPid: 0, mPID: 0,
}, },
} }
for _, option := range options {
err := option(e)
if err != nil {
return nil, fmt.Errorf("option failed with error: %v", err)
}
}
return e, nil
} }
const ( // PacketBasedPSI is an option that can be passed to NewEncoder to select
hasPayload = 0x1 // packet based PSI writing, i.e. PSI are written to the destination every
hasAdaptationField = 0x2 // sendCount packets.
) func PacketBasedPSI(sendCount int) func(*Encoder) error {
return func(e *Encoder) error {
const ( e.nalBasedPSI = false
hasDTS = 0x1 e.psiSendCount = sendCount
hasPTS = 0x2 e.pktCount = e.psiSendCount
) return nil
}
func (e *Encoder) NALBasedPSI(b bool, sendCount int) {
e.nalBasedPSI = b
e.psiSendCount = sendCount
e.pktCount = e.psiSendCount
} }
// Write implements io.Writer. Write takes raw video or audio data and encodes into MPEG-TS, // Write implements io.Writer. Write takes raw video or audio data and encodes into MPEG-TS,

View File

@ -3,8 +3,8 @@ NAME
encoder_test.go encoder_test.go
AUTHOR AUTHOR
Trek Hopton <trek@ausocean.org>
Saxon A. Nelson-Milton <saxon@ausocean.org> Saxon A. Nelson-Milton <saxon@ausocean.org>
Trek Hopton <trek@ausocean.org>
LICENSE LICENSE
encoder_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) encoder_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
@ -101,12 +101,14 @@ func TestEncodeVideo(t *testing.T) {
// Create the dst and write the test data to encoder. // Create the dst and write the test data to encoder.
dst := &destination{} dst := &destination{}
e := NewEncoder(nopCloser{dst}, 25, EncodeH264) e, err := NewEncoder(nopCloser{dst}, 25, EncodeH264, PacketBasedPSI(psiSendCount))
e.NALBasedPSI(false, psiSendCount)
_, err := e.Write(data)
if err != nil { if err != nil {
t.Fatalf("could not write data to encoder, failed with err: %v\n", err) t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
_, err = e.Write(data)
if err != nil {
t.Fatalf("could not write data to encoder, failed with error: %v\n", err)
} }
// Check headers. // Check headers.
@ -163,7 +165,10 @@ func TestEncodePcm(t *testing.T) {
sampleSize := 2 sampleSize := 2
blockSize := 16000 blockSize := 16000
writeFreq := float64(sampleRate*sampleSize) / float64(blockSize) writeFreq := float64(sampleRate*sampleSize) / float64(blockSize)
e := NewEncoder(nopCloser{&buf}, writeFreq, EncodeAudio) e, err := NewEncoder(nopCloser{&buf}, writeFreq, EncodeAudio)
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm" inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm"
inPcm, err := ioutil.ReadFile(inPath) inPcm, err := ioutil.ReadFile(inPath)
@ -264,7 +269,11 @@ const fps = 25
func TestMetaEncode1(t *testing.T) { func TestMetaEncode1(t *testing.T) {
Meta = meta.New() Meta = meta.New()
var buf bytes.Buffer var buf bytes.Buffer
e := NewEncoder(nopCloser{&buf}, fps, EncodeH264) e, err := NewEncoder(nopCloser{&buf}, fps, EncodeH264)
if err != nil {
t.Fatalf("could not create encoder, failed with error: %v", err)
}
Meta.Add("ts", "12345678") Meta.Add("ts", "12345678")
if err := e.writePSI(); err != nil { if err := e.writePSI(); err != nil {
t.Errorf("unexpected error: %v\n", err.Error()) t.Errorf("unexpected error: %v\n", err.Error())
@ -292,7 +301,11 @@ func TestMetaEncode1(t *testing.T) {
func TestMetaEncode2(t *testing.T) { func TestMetaEncode2(t *testing.T) {
Meta = meta.New() Meta = meta.New()
var buf bytes.Buffer var buf bytes.Buffer
e := NewEncoder(nopCloser{&buf}, fps, EncodeH264) e, err := NewEncoder(nopCloser{&buf}, fps, EncodeH264)
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
Meta.Add("ts", "12345678") Meta.Add("ts", "12345678")
Meta.Add("loc", "1234,4321,1234") Meta.Add("loc", "1234,4321,1234")
if err := e.writePSI(); err != nil { if err := e.writePSI(); err != nil {
@ -320,7 +333,11 @@ func TestMetaEncode2(t *testing.T) {
func TestExtractMeta(t *testing.T) { func TestExtractMeta(t *testing.T) {
Meta = meta.New() Meta = meta.New()
var buf bytes.Buffer var buf bytes.Buffer
e := NewEncoder(nopCloser{&buf}, fps, EncodeH264) e, err := NewEncoder(nopCloser{&buf}, fps, EncodeH264)
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
Meta.Add("ts", "12345678") Meta.Add("ts", "12345678")
Meta.Add("loc", "1234,4321,1234") Meta.Add("loc", "1234,4321,1234")
if err := e.writePSI(); err != nil { if err := e.writePSI(); err != nil {

View File

@ -42,13 +42,11 @@ import (
const PacketSize = 188 const PacketSize = 188
// Program ID for various types of ts packets. // Standard program IDs for program specific information MPEG-TS packets.
const ( const (
SdtPid = 17 SdtPid = 17
PatPid = 0 PatPid = 0
PmtPid = 4096 PmtPid = 4096
VideoPid = 256
AudioPid = 210
) )
// StreamID is the id of the first stream. // StreamID is the id of the first stream.

View File

@ -286,12 +286,7 @@ func (c *Config) Validate() error {
} }
switch c.InputCodec { switch c.InputCodec {
case codecutil.H264: case codecutil.H264, codecutil.MJPEG, codecutil.PCM, codecutil.ADPCM:
case codecutil.MJPEG:
if c.Quantization > 0 || c.Bitrate == 0 {
return errors.New("bad bitrate or quantization for mjpeg input")
}
case codecutil.PCM, codecutil.ADPCM:
default: default:
switch c.Input { switch c.Input {
case Audio: case Audio:

View File

@ -43,6 +43,7 @@ import (
"bitbucket.org/ausocean/av/codec/codecutil" "bitbucket.org/ausocean/av/codec/codecutil"
"bitbucket.org/ausocean/av/codec/h264" "bitbucket.org/ausocean/av/codec/h264"
"bitbucket.org/ausocean/av/codec/h265" "bitbucket.org/ausocean/av/codec/h265"
"bitbucket.org/ausocean/av/codec/mjpeg"
"bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/flv"
"bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/protocol/rtcp" "bitbucket.org/ausocean/av/protocol/rtcp"
@ -121,7 +122,7 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) {
r := Revid{ns: ns, err: make(chan error)} r := Revid{ns: ns, err: make(chan error)}
err := r.setConfig(c) err := r.setConfig(c)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not set config, failed with error: %v",err) return nil, fmt.Errorf("could not set config, failed with error: %v", err)
} }
r.config.Logger.SetLevel(c.LogLevel) r.config.Logger.SetLevel(c.LogLevel)
go r.handleErrors() go r.handleErrors()
@ -171,16 +172,26 @@ func (r *Revid) reset(config Config) error {
err = r.setupPipeline( err = r.setupPipeline(
func(dst io.WriteCloser, fps float64) (io.WriteCloser, error) { func(dst io.WriteCloser, fps float64) (io.WriteCloser, error) {
var st int var st int
var encOptions []func(*mts.Encoder) error
switch r.config.Input { switch r.config.Input {
case Raspivid, File, V4L: case Raspivid:
switch r.config.InputCodec {
case codecutil.H264:
st = mts.EncodeH264
case codecutil.MJPEG:
st = mts.EncodeMJPEG
encOptions = append(encOptions, mts.PacketBasedPSI(int(r.config.MinFrames)))
}
case File, V4L:
st = mts.EncodeH264 st = mts.EncodeH264
case RTSP: case RTSP:
st = mts.EncodeH265 st = mts.EncodeH265
case Audio: case Audio:
st = mts.EncodeAudio st = mts.EncodeAudio
} }
e := mts.NewEncoder(dst, float64(fps), st)
return e, nil return mts.NewEncoder(dst, float64(fps), st, encOptions...)
}, },
func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { func(dst io.WriteCloser, fps int) (io.WriteCloser, error) {
return flv.NewEncoder(dst, true, true, fps) return flv.NewEncoder(dst, true, true, fps)
@ -290,7 +301,12 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
switch r.config.Input { switch r.config.Input {
case Raspivid: case Raspivid:
r.setupInput = r.startRaspivid r.setupInput = r.startRaspivid
r.lexTo = h264.Lex switch r.config.InputCodec {
case codecutil.H264:
r.lexTo = h264.Lex
case codecutil.MJPEG:
r.lexTo = mjpeg.Lex
}
case V4L: case V4L:
r.setupInput = r.startV4L r.setupInput = r.startV4L
r.lexTo = h264.Lex r.lexTo = h264.Lex
@ -390,6 +406,15 @@ func (r *Revid) Update(vars map[string]string) error {
r.config.Exposure = value r.config.Exposure = value
case "AutoWhiteBalance": case "AutoWhiteBalance":
r.config.AutoWhiteBalance = value r.config.AutoWhiteBalance = value
case "InputCodec":
switch value {
case "H264":
r.config.InputCodec = codecutil.H264
case "MJPEG":
r.config.InputCodec = codecutil.MJPEG
default:
r.config.Logger.Log(logger.Warning, pkg+"invalid InputCodec variable value", "value", value)
}
case "Output": case "Output":
outputs := strings.Split(value, ",") outputs := strings.Split(value, ",")
r.config.Outputs = make([]uint8, len(outputs)) r.config.Outputs = make([]uint8, len(outputs))

View File

@ -135,11 +135,12 @@ func TestMtsSenderSegment(t *testing.T) {
const numberOfClips = 11 const numberOfClips = 11
dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0) sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0)
encoder := mts.NewEncoder(sender, 25, mts.EncodeH264)
// Turn time based PSI writing off for encoder.
const psiSendCount = 10 const psiSendCount = 10
encoder.NALBasedPSI(false, psiSendCount) encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
// Write the packets to the encoder, which will in turn write to the mtsSender. // Write the packets to the encoder, which will in turn write to the mtsSender.
// Payload will just be packet number. // Payload will just be packet number.
@ -213,11 +214,12 @@ func TestMtsSenderFailedSend(t *testing.T) {
const clipToFailAt = 3 const clipToFailAt = 3
dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0) sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0)
encoder := mts.NewEncoder(sender, 25, mts.EncodeH264)
// Turn time based PSI writing off for encoder and send PSI every 10 packets.
const psiSendCount = 10 const psiSendCount = 10
encoder.NALBasedPSI(false, psiSendCount) encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount))
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
// Write the packets to the encoder, which will in turn write to the mtsSender. // Write the packets to the encoder, which will in turn write to the mtsSender.
// Payload will just be packet number. // Payload will just be packet number.
@ -293,11 +295,12 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
const clipToDelay = 3 const clipToDelay = 3
dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(1, defaultMTSRBElementSize, 0), 0) sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(1, defaultMTSRBElementSize, 0), 0)
encoder := mts.NewEncoder(sender, 25, mts.EncodeH264)
// Turn time based PSI writing off for encoder.
const psiSendCount = 10 const psiSendCount = 10
encoder.NALBasedPSI(false, psiSendCount) encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264)
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
// Write the packets to the encoder, which will in turn write to the mtsSender. // Write the packets to the encoder, which will in turn write to the mtsSender.
// Payload will just be packet number. // Payload will just be packet number.