From 191be04b11306ada4c7b772e189c87add3933a68 Mon Sep 17 00:00:00 2001 From: Saxon Date: Tue, 24 Sep 2019 07:42:26 +0930 Subject: [PATCH] revid: added support for MJPEG packetisation Changes included adding support for variable InputCodec that may be set via netreceiver/vidgrind to set to H264/MJPEG. Also setting revid's lexTo function to mjpeg.Lex in the case of an MJPEG InputCodec. Added options parameter to mts.NewEncoder function so that we can change options of the encoder, namely whether it bases PSI interval on NAL type, or number of frames - in the case of MJPEG we based PSI interval on number of frames. --- container/mts/encoder.go | 88 ++++++++++++++++++++++------------- container/mts/encoder_test.go | 37 +++++++++++---- container/mts/mpegts.go | 10 ++-- revid/config.go | 7 +-- revid/revid.go | 35 ++++++++++++-- revid/senders_test.go | 21 +++++---- 6 files changed, 130 insertions(+), 68 deletions(-) diff --git a/container/mts/encoder.go b/container/mts/encoder.go index a903500b..d5fcde53 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -39,11 +39,11 @@ import ( "bitbucket.org/ausocean/utils/realtime" ) -// Media type values. -// TODO: reference relevant specifications. +// Stream IDs as per ITU-T Rec. H.222.0 / ISO/IEC 13818-1 [1], tables 2-22 and 2-34. const ( H264ID = 27 H265ID = 36 + MJPEGID = 27 // TODO: find an available value for this. audioStreamID = 0xc0 // ADPCM audio stream ID. ) @@ -51,9 +51,16 @@ const ( const ( EncodeH264 = iota EncodeH265 + EncodeMJPEG EncodeAudio ) +// The program IDs we assign to different types of media. +const ( + VideoPid = 256 + AudioPid = 210 +) + // Time-related constants. const ( // ptsOffset is the offset added to the clock to determine @@ -73,6 +80,16 @@ const ( // If we are not using NAL based PSI intervals then we will send PSI every 7 packets. const psiSendCount = 7 +const ( + hasPayload = 0x1 + hasAdaptationField = 0x2 +) + +const ( + hasDTS = 0x1 + hasPTS = 0x2 +) + // Some common manifestations of PSI. var ( // StandardPAT is a minimal PAT. @@ -111,7 +128,7 @@ var ( } ) -// Meta allows addition of metadata to encoded mts from outside of this pkg. +// Meta allows addition of metadata to encoded mts from outsIDe of this pkg. // See meta pkg for usage. // // TODO: make this not global. @@ -147,21 +164,24 @@ type Encoder struct { // NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream // calls write for every frame, the rate will be the frame rate of the video. -func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { - var mPid uint16 - var sid byte +func NewEncoder(dst io.WriteCloser, rate float64, mediaType int, options ...func(*Encoder) error) (*Encoder, error) { + var mPID uint16 + var sID byte nbp := true switch mediaType { case EncodeAudio: - mPid = AudioPid - sid = audioStreamID + mPID = AudioPid + sID = audioStreamID nbp = false case EncodeH265: - mPid = VideoPid - sid = H265ID + mPID = VideoPid + sID = H265ID case EncodeH264: - mPid = VideoPid - sid = H264ID + mPID = VideoPid + sID = H264ID + case EncodeMJPEG: + mPID = VideoPid + sID = MJPEGID } pmt := BasePMT @@ -169,14 +189,14 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { Pcrpid: 0x0100, Pil: 0, Essd: &psi.ESSD{ - St: byte(sid), - Epid: mPid, + St: byte(sID), + Epid: mPID, Esil: 0x00, }, } pmtTable = pmt.Bytes() - return &Encoder{ + e := &Encoder{ dst: dst, writePeriod: time.Duration(float64(time.Second) / rate), @@ -186,31 +206,35 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { pktCount: 8, - mediaPid: mPid, - streamID: sid, + mediaPid: mPID, + streamID: sID, continuity: map[uint16]byte{ PatPid: 0, PmtPid: 0, - mPid: 0, + mPID: 0, }, } + + for _, option := range options { + err := option(e) + if err != nil { + return nil, fmt.Errorf("option failed with error: %v", err) + } + } + return e, nil } -const ( - hasPayload = 0x1 - hasAdaptationField = 0x2 -) - -const ( - hasDTS = 0x1 - hasPTS = 0x2 -) - -func (e *Encoder) NALBasedPSI(b bool, sendCount int) { - e.nalBasedPSI = b - e.psiSendCount = sendCount - e.pktCount = e.psiSendCount +// PacketBasedPSI is an option that can be passed to NewEncoder to select +// packet based PSI writing, i.e. PSI are written to the destination every +// sendCount packets. +func PacketBasedPSI(sendCount int) func(*Encoder) error { + return func(e *Encoder) error { + e.nalBasedPSI = false + e.psiSendCount = sendCount + e.pktCount = e.psiSendCount + return nil + } } // Write implements io.Writer. Write takes raw video or audio data and encodes into MPEG-TS, diff --git a/container/mts/encoder_test.go b/container/mts/encoder_test.go index 6b46703e..e154dcc5 100644 --- a/container/mts/encoder_test.go +++ b/container/mts/encoder_test.go @@ -3,8 +3,8 @@ NAME encoder_test.go AUTHOR - Trek Hopton Saxon A. Nelson-Milton + Trek Hopton LICENSE encoder_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) @@ -101,12 +101,14 @@ func TestEncodeVideo(t *testing.T) { // Create the dst and write the test data to encoder. dst := &destination{} - e := NewEncoder(nopCloser{dst}, 25, EncodeH264) - e.NALBasedPSI(false, psiSendCount) - - _, err := e.Write(data) + e, err := NewEncoder(nopCloser{dst}, 25, EncodeH264, PacketBasedPSI(psiSendCount)) if err != nil { - t.Fatalf("could not write data to encoder, failed with err: %v\n", err) + t.Fatalf("could not create MTS encoder, failed with error: %v", err) + } + + _, err = e.Write(data) + if err != nil { + t.Fatalf("could not write data to encoder, failed with error: %v\n", err) } // Check headers. @@ -163,7 +165,10 @@ func TestEncodePcm(t *testing.T) { sampleSize := 2 blockSize := 16000 writeFreq := float64(sampleRate*sampleSize) / float64(blockSize) - e := NewEncoder(nopCloser{&buf}, writeFreq, EncodeAudio) + e, err := NewEncoder(nopCloser{&buf}, writeFreq, EncodeAudio) + if err != nil { + t.Fatalf("could not create MTS encoder, failed with error: %v", err) + } inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm" inPcm, err := ioutil.ReadFile(inPath) @@ -264,7 +269,11 @@ const fps = 25 func TestMetaEncode1(t *testing.T) { Meta = meta.New() var buf bytes.Buffer - e := NewEncoder(nopCloser{&buf}, fps, EncodeH264) + e, err := NewEncoder(nopCloser{&buf}, fps, EncodeH264) + if err != nil { + t.Fatalf("could not create encoder, failed with error: %v", err) + } + Meta.Add("ts", "12345678") if err := e.writePSI(); err != nil { t.Errorf("unexpected error: %v\n", err.Error()) @@ -292,7 +301,11 @@ func TestMetaEncode1(t *testing.T) { func TestMetaEncode2(t *testing.T) { Meta = meta.New() var buf bytes.Buffer - e := NewEncoder(nopCloser{&buf}, fps, EncodeH264) + e, err := NewEncoder(nopCloser{&buf}, fps, EncodeH264) + if err != nil { + t.Fatalf("could not create MTS encoder, failed with error: %v", err) + } + Meta.Add("ts", "12345678") Meta.Add("loc", "1234,4321,1234") if err := e.writePSI(); err != nil { @@ -320,7 +333,11 @@ func TestMetaEncode2(t *testing.T) { func TestExtractMeta(t *testing.T) { Meta = meta.New() var buf bytes.Buffer - e := NewEncoder(nopCloser{&buf}, fps, EncodeH264) + e, err := NewEncoder(nopCloser{&buf}, fps, EncodeH264) + if err != nil { + t.Fatalf("could not create MTS encoder, failed with error: %v", err) + } + Meta.Add("ts", "12345678") Meta.Add("loc", "1234,4321,1234") if err := e.writePSI(); err != nil { diff --git a/container/mts/mpegts.go b/container/mts/mpegts.go index fe31b5ce..8352a984 100644 --- a/container/mts/mpegts.go +++ b/container/mts/mpegts.go @@ -42,13 +42,11 @@ import ( const PacketSize = 188 -// Program ID for various types of ts packets. +// Standard program IDs for program specific information MPEG-TS packets. const ( - SdtPid = 17 - PatPid = 0 - PmtPid = 4096 - VideoPid = 256 - AudioPid = 210 + SdtPid = 17 + PatPid = 0 + PmtPid = 4096 ) // StreamID is the id of the first stream. diff --git a/revid/config.go b/revid/config.go index 87fb9139..21f509b4 100644 --- a/revid/config.go +++ b/revid/config.go @@ -286,12 +286,7 @@ func (c *Config) Validate() error { } switch c.InputCodec { - case codecutil.H264: - case codecutil.MJPEG: - if c.Quantization > 0 || c.Bitrate == 0 { - return errors.New("bad bitrate or quantization for mjpeg input") - } - case codecutil.PCM, codecutil.ADPCM: + case codecutil.H264, codecutil.MJPEG, codecutil.PCM, codecutil.ADPCM: default: switch c.Input { case Audio: diff --git a/revid/revid.go b/revid/revid.go index 0f357ebd..60801ef5 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -43,6 +43,7 @@ import ( "bitbucket.org/ausocean/av/codec/codecutil" "bitbucket.org/ausocean/av/codec/h264" "bitbucket.org/ausocean/av/codec/h265" + "bitbucket.org/ausocean/av/codec/mjpeg" "bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/protocol/rtcp" @@ -121,7 +122,7 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { r := Revid{ns: ns, err: make(chan error)} err := r.setConfig(c) if err != nil { - return nil, fmt.Errorf("could not set config, failed with error: %v",err) + return nil, fmt.Errorf("could not set config, failed with error: %v", err) } r.config.Logger.SetLevel(c.LogLevel) go r.handleErrors() @@ -171,16 +172,26 @@ func (r *Revid) reset(config Config) error { err = r.setupPipeline( func(dst io.WriteCloser, fps float64) (io.WriteCloser, error) { var st int + var encOptions []func(*mts.Encoder) error + switch r.config.Input { - case Raspivid, File, V4L: + case Raspivid: + switch r.config.InputCodec { + case H264: + st = mts.EncodeH264 + case MJPEG: + st = mts.EncodeMJPEG + encOptions = append(encOptions, mts.PacketBasedPSI(int(r.config.MinFrames))) + } + case File, V4L: st = mts.EncodeH264 case RTSP: st = mts.EncodeH265 case Audio: st = mts.EncodeAudio } - e := mts.NewEncoder(dst, float64(fps), st) - return e, nil + + return mts.NewEncoder(dst, float64(fps), st, encOptions...) }, func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { return flv.NewEncoder(dst, true, true, fps) @@ -290,7 +301,12 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. switch r.config.Input { case Raspivid: r.setupInput = r.startRaspivid - r.lexTo = h264.Lex + switch r.config.InputCodec { + case H264: + r.lexTo = h264.Lex + case MJPEG: + r.lexTo = mjpeg.Lex + } case V4L: r.setupInput = r.startV4L r.lexTo = h264.Lex @@ -390,6 +406,15 @@ func (r *Revid) Update(vars map[string]string) error { r.config.Exposure = value case "AutoWhiteBalance": r.config.AutoWhiteBalance = value + case "InputCodec": + switch value { + case "H264": + r.config.InputCodec = H264 + case "MJPEG": + r.config.InputCodec = MJPEG + default: + r.config.Logger.Log(logger.Warning, pkg+"invalid InputCodec variable value", "value", value) + } case "Output": outputs := strings.Split(value, ",") r.config.Outputs = make([]uint8, len(outputs)) diff --git a/revid/senders_test.go b/revid/senders_test.go index 3fe2f291..e6a39c7a 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -135,11 +135,12 @@ func TestMtsSenderSegment(t *testing.T) { const numberOfClips = 11 dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0) - encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) - // Turn time based PSI writing off for encoder. const psiSendCount = 10 - encoder.NALBasedPSI(false, psiSendCount) + encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount)) + if err != nil { + t.Fatalf("could not create MTS encoder, failed with error: %v", err) + } // Write the packets to the encoder, which will in turn write to the mtsSender. // Payload will just be packet number. @@ -213,11 +214,12 @@ func TestMtsSenderFailedSend(t *testing.T) { const clipToFailAt = 3 dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0) - encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) - // Turn time based PSI writing off for encoder and send PSI every 10 packets. const psiSendCount = 10 - encoder.NALBasedPSI(false, psiSendCount) + encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount)) + if err != nil { + t.Fatalf("could not create MTS encoder, failed with error: %v", err) + } // Write the packets to the encoder, which will in turn write to the mtsSender. // Payload will just be packet number. @@ -293,11 +295,12 @@ func TestMtsSenderDiscontinuity(t *testing.T) { const clipToDelay = 3 dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(1, defaultMTSRBElementSize, 0), 0) - encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) - // Turn time based PSI writing off for encoder. const psiSendCount = 10 - encoder.NALBasedPSI(false, psiSendCount) + encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264) + if err != nil { + t.Fatalf("could not create MTS encoder, failed with error: %v", err) + } // Write the packets to the encoder, which will in turn write to the mtsSender. // Payload will just be packet number.