From d17880714e8c5ea1a7a855d5ad9ce2fa11e3aa73 Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 6 May 2020 20:10:55 +0930 Subject: [PATCH] container/mts: make rate and media type encoder parameters option functions --- container/mts/discontinuity.go | 2 +- container/mts/encoder.go | 131 +++++++++++---------------------- container/mts/encoder_test.go | 16 ++-- container/mts/mpegts.go | 3 - container/mts/mpegts_test.go | 8 +- container/mts/options.go | 108 +++++++++++++++++++++++++++ container/mts/payload_test.go | 2 +- revid/revid.go | 4 +- revid/senders_test.go | 10 +-- 9 files changed, 172 insertions(+), 112 deletions(-) create mode 100644 container/mts/options.go diff --git a/container/mts/discontinuity.go b/container/mts/discontinuity.go index e127ff94..1ddc1ecc 100644 --- a/container/mts/discontinuity.go +++ b/container/mts/discontinuity.go @@ -45,7 +45,7 @@ func NewDiscontinuityRepairer() *DiscontinuityRepairer { expCC: map[int]int{ PatPid: 16, PmtPid: 16, - VideoPid: 16, + PIDVideo: 16, }, } } diff --git a/container/mts/encoder.go b/container/mts/encoder.go index 8ca677f6..95b99a22 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -3,8 +3,8 @@ NAME encoder.go AUTHOR - Dan Kortschak Saxon Nelson-Milton + Dan Kortschak LICENSE encoder.go is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) @@ -17,7 +17,7 @@ LICENSE It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. + for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. @@ -41,18 +41,18 @@ import ( // Stream IDs as per ITU-T Rec. H.222.0 / ISO/IEC 13818-1 [1], tables 2-22 and 2-34. const ( - H264ID = 27 - H265ID = 36 - MJPEGID = 28 - audioStreamID = 0xc0 // ADPCM audio stream ID. + streamIDH264 = 27 + streamIDH265 = 36 + streamIDMJPEG = 28 + streamIDAudio = 0xc0 // ADPCM audio stream ID. ) // These three constants are used to select between the three different // methods of when the PSI is sent. const ( - pktBased = iota - timeBased - nalBased + psiMethodPacket = iota // PSI is inserted after a certain number of packets. + psiMethodTime // PSI is inserted after a certain amount of time. + psiMethodNAL // PSI is inserted before each "key frame" of media. ) // Constants used to communicate which media codec will be packetized. @@ -65,8 +65,8 @@ const ( // The program IDs we assign to different types of media. const ( - VideoPid = 256 - AudioPid = 210 + PIDVideo = 256 + PIDAudio = 210 ) // Time-related constants. @@ -98,6 +98,14 @@ const ( hasPTS = 0x2 ) +// Default encoder configuration parameters. +const ( + defaultRate = 25 // FPS + defaultPSIMethod = psiMethodNAL + defaultStreamID = streamIDH264 + defaultMediaPID = PIDVideo +) + // Some common manifestations of PSI. var ( // StandardPAT is a minimal PAT. @@ -177,7 +185,7 @@ type Encoder struct { psiTime time.Duration psiSetTime time.Duration startTime time.Time - mediaPid uint16 + mediaPID uint16 streamID byte // log is a function that will be used through the encoder code for logging. @@ -186,54 +194,16 @@ type Encoder struct { // NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream // calls write for every frame, the rate will be the frame rate of the video. -func NewEncoder(dst io.WriteCloser, rate float64, mediaType int, log logger, options ...func(*Encoder) error) (*Encoder, error) { - var mPID uint16 - var sID byte - psiM := timeBased - switch mediaType { - case EncodeAudio: - mPID = AudioPid - sID = audioStreamID - psiM = pktBased - log.Debug("configured for audio packetisation") - case EncodeH265: - mPID = VideoPid - sID = H265ID - psiM = nalBased - log.Debug("configured for h.265 packetisation") - case EncodeH264: - mPID = VideoPid - sID = H264ID - psiM = nalBased - log.Debug("configured for h.264 packetisation") - case EncodeMJPEG: - mPID = VideoPid - sID = MJPEGID - psiM = timeBased - log.Debug("configured for MJPEG packetisation") - } - - pmt := BasePMT - pmt.Tss.Sd = &psi.PMT{ - Pcrpid: 0x0100, - Pil: 0, - Essd: &psi.ESSD{ - St: byte(sID), - Epid: mPID, - Esil: 0x00, - }, - } - pmtTable = pmt.Bytes() - +func NewEncoder(dst io.WriteCloser, log logger, options ...func(*Encoder) error) (*Encoder, error) { e := &Encoder{ dst: dst, - writePeriod: time.Duration(float64(time.Second) / rate), + writePeriod: time.Duration(float64(time.Second) / defaultRate), ptsOffset: ptsOffset, - psiMethod: psiM, + psiMethod: defaultPSIMethod, pktCount: 8, - mediaPid: mPID, - streamID: sID, - continuity: map[uint16]byte{PatPid: 0, PmtPid: 0, mPID: 0}, + mediaPID: defaultMediaPID, + streamID: defaultStreamID, + continuity: map[uint16]byte{PatPid: 0, PmtPid: 0, defaultMediaPID: 0}, log: log, } @@ -245,41 +215,26 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int, log logger, opt } log.Debug("encoder options applied") + pmt := BasePMT + pmt.Tss.Sd = &psi.PMT{ + Pcrpid: 0x0100, + Pil: 0, + Essd: &psi.ESSD{ + St: byte(e.streamID), + Epid: e.mediaPID, + Esil: 0x00, + }, + } + pmtTable = pmt.Bytes() + return e, nil } -// PacketBasedPSI is an option that can be passed to NewEncoder to select -// packet based PSI writing, i.e. PSI are written to the destination every -// sendCount packets. -func PacketBasedPSI(sendCount int) func(*Encoder) error { - return func(e *Encoder) error { - e.psiMethod = pktBased - e.psiSendCount = sendCount - e.pktCount = e.psiSendCount - e.log.Debug("configured for packet based PSI insertion", "count", sendCount) - return nil - } -} - -// TimeBasedPSI is another option that can be passed to NewEncoder to select -// time based PSI writing, i.e. PSI are written to the destination every dur (duration) -// (defualt is 2 seconds). -func TimeBasedPSI(dur time.Duration) func(*Encoder) error { - return func(e *Encoder) error { - e.psiMethod = timeBased - e.psiTime = 0 - e.psiSetTime = dur - e.startTime = time.Now() - e.log.Debug("configured for time based PSI insertion") - return nil - } -} - // Write implements io.Writer. Write takes raw video or audio data and encodes into MPEG-TS, // then sending it to the encoder's io.Writer destination. func (e *Encoder) Write(data []byte) (int, error) { switch e.psiMethod { - case pktBased: + case psiMethodPacket: e.log.Debug("checking packet no. conditions for PSI write", "count", e.pktCount, "PSI count", e.psiSendCount) if e.pktCount >= e.psiSendCount { e.pktCount = 0 @@ -288,7 +243,7 @@ func (e *Encoder) Write(data []byte) (int, error) { return 0, err } } - case nalBased: + case psiMethodNAL: nalType, err := h264.NALType(data) if err != nil { return 0, fmt.Errorf("could not get type from NAL unit, failed with error: %w", err) @@ -300,7 +255,7 @@ func (e *Encoder) Write(data []byte) (int, error) { return 0, err } } - case timeBased: + case psiMethodTime: dur := time.Now().Sub(e.startTime) e.log.Debug("checking time conditions for PSI write") if dur >= e.psiTime { @@ -331,9 +286,9 @@ func (e *Encoder) Write(data []byte) (int, error) { for len(buf) != 0 { pkt := Packet{ PUSI: pusi, - PID: uint16(e.mediaPid), + PID: uint16(e.mediaPID), RAI: pusi, - CC: e.ccFor(e.mediaPid), + CC: e.ccFor(e.mediaPID), AFC: hasAdaptationField | hasPayload, PCRF: pusi, } diff --git a/container/mts/encoder_test.go b/container/mts/encoder_test.go index 4cc6c7d9..2f9b1717 100644 --- a/container/mts/encoder_test.go +++ b/container/mts/encoder_test.go @@ -127,7 +127,7 @@ func TestEncodeVideo(t *testing.T) { // Create the dst and write the test data to encoder. dst := &destination{} - e, err := NewEncoder(nopCloser{dst}, 25, EncodeH264, (*testLogger)(t), PacketBasedPSI(psiSendCount)) + e, err := NewEncoder(nopCloser{dst}, (*testLogger)(t), PacketBasedPSI(psiSendCount), Rate(25), MediaType(EncodeH264)) if err != nil { t.Fatalf("could not create MTS encoder, failed with error: %v", err) } @@ -144,7 +144,7 @@ func TestEncodeVideo(t *testing.T) { var _p packet.Packet copy(_p[:], p) pid := packet.Pid(&_p) - if pid == VideoPid { + if pid == PIDVideo { // Get mts header, excluding PCR. gotHeader := p[0:6] wantHeader := expectedHeaders[expectedIdx] @@ -161,7 +161,7 @@ func TestEncodeVideo(t *testing.T) { var _p packet.Packet copy(_p[:], p) pid := packet.Pid(&_p) - if pid == VideoPid { + if pid == PIDVideo { payload, err := packet.Payload(&_p) if err != nil { t.Fatalf("could not get payload from mts packet, failed with err: %v\n", err) @@ -191,7 +191,7 @@ func TestEncodePcm(t *testing.T) { sampleSize := 2 blockSize := 16000 writeFreq := float64(sampleRate*sampleSize) / float64(blockSize) - e, err := NewEncoder(nopCloser{&buf}, writeFreq, EncodeAudio, (*testLogger)(t)) + e, err := NewEncoder(nopCloser{&buf}, (*testLogger)(t), PacketBasedPSI(10), Rate(int(writeFreq)), MediaType(EncodeAudio)) if err != nil { t.Fatalf("could not create MTS encoder, failed with error: %v", err) } @@ -233,7 +233,7 @@ func TestEncodePcm(t *testing.T) { for i+PacketSize <= len(clip) { // Check MTS packet - if pkt.PID() != AudioPid { + if pkt.PID() != PIDAudio { i += PacketSize if i+PacketSize <= len(clip) { copy(pkt[:], clip[i:i+PacketSize]) @@ -295,7 +295,7 @@ const fps = 25 func TestMetaEncode1(t *testing.T) { Meta = meta.New() var buf bytes.Buffer - e, err := NewEncoder(nopCloser{&buf}, fps, EncodeH264, (*testLogger)(t)) + e, err := NewEncoder(nopCloser{&buf}, (*testLogger)(t)) if err != nil { t.Fatalf("could not create encoder, failed with error: %v", err) } @@ -327,7 +327,7 @@ func TestMetaEncode1(t *testing.T) { func TestMetaEncode2(t *testing.T) { Meta = meta.New() var buf bytes.Buffer - e, err := NewEncoder(nopCloser{&buf}, fps, EncodeH264, (*testLogger)(t)) + e, err := NewEncoder(nopCloser{&buf}, (*testLogger)(t)) if err != nil { t.Fatalf("could not create MTS encoder, failed with error: %v", err) } @@ -359,7 +359,7 @@ func TestMetaEncode2(t *testing.T) { func TestExtractMeta(t *testing.T) { Meta = meta.New() var buf bytes.Buffer - e, err := NewEncoder(nopCloser{&buf}, fps, EncodeH264, (*testLogger)(t)) + e, err := NewEncoder(nopCloser{&buf}, (*testLogger)(t)) if err != nil { t.Fatalf("could not create MTS encoder, failed with error: %v", err) } diff --git a/container/mts/mpegts.go b/container/mts/mpegts.go index 8352a984..b89a7882 100644 --- a/container/mts/mpegts.go +++ b/container/mts/mpegts.go @@ -49,9 +49,6 @@ const ( PmtPid = 4096 ) -// StreamID is the id of the first stream. -const StreamID = 0xe0 - // HeadSize is the size of an MPEG-TS packet header. const HeadSize = 4 diff --git a/container/mts/mpegts_test.go b/container/mts/mpegts_test.go index 1459c758..688d9135 100644 --- a/container/mts/mpegts_test.go +++ b/container/mts/mpegts_test.go @@ -85,7 +85,7 @@ func TestGetPTSRange1(t *testing.T) { curTime += interval } - got, err := GetPTSRange(clip.Bytes(), VideoPid) + got, err := GetPTSRange(clip.Bytes(), PIDVideo) if err != nil { t.Fatalf("did not expect error getting PTS range: %v", err) } @@ -132,7 +132,7 @@ func writePSI(b *bytes.Buffer) error { func writeFrame(b *bytes.Buffer, frame []byte, pts uint64) error { // Prepare PES data. pesPkt := pes.Packet{ - StreamID: H264ID, + StreamID: streamIDH264, PDI: hasPTS, PTS: pts, Data: frame, @@ -145,7 +145,7 @@ func writeFrame(b *bytes.Buffer, frame []byte, pts uint64) error { for len(buf) != 0 { pkt := Packet{ PUSI: pusi, - PID: VideoPid, + PID: PIDVideo, RAI: pusi, CC: 0, AFC: hasAdaptationField | hasPayload, @@ -208,7 +208,7 @@ func TestGetPTSRange2(t *testing.T) { clip.Reset() for j := 0; j < nPackets; j++ { pesPkt := pes.Packet{ - StreamID: H264ID, + StreamID: streamIDH264, PDI: hasPTS, PTS: test.pts[j], Data: []byte{}, diff --git a/container/mts/options.go b/container/mts/options.go new file mode 100644 index 00000000..eee206cf --- /dev/null +++ b/container/mts/options.go @@ -0,0 +1,108 @@ +/* +DESCRIPTION + options.go provides option functions that can be provided to the MTS encoders + constructor NewEncoder for encoder configuration. These options include media + type, PSI insertion strategy and intended access unit rate. + +AUTHOR + Saxon Nelson-Milton + +LICENSE + Copyright (C) 2017-2020 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package mts + +import ( + "errors" + "time" +) + +var ( + ErrUnsupportedMedia = errors.New("unsupported media type") + ErrInvalidRate = errors.New("invalid access unit rate") +) + +// PacketBasedPSI is an option that can be passed to NewEncoder to select +// packet based PSI writing, i.e. PSI are written to the destination every +// sendCount packets. +func PacketBasedPSI(sendCount int) func(*Encoder) error { + return func(e *Encoder) error { + e.psiMethod = psiMethodPacket + e.psiSendCount = sendCount + e.pktCount = e.psiSendCount + e.log.Debug("configured for packet based PSI insertion", "count", sendCount) + return nil + } +} + +// TimeBasedPSI is another option that can be passed to NewEncoder to select +// time based PSI writing, i.e. PSI are written to the destination every dur +// (duration). The defualt is 2 seconds. +func TimeBasedPSI(dur time.Duration) func(*Encoder) error { + return func(e *Encoder) error { + e.psiMethod = psiMethodTime + e.psiTime = 0 + e.psiSetTime = dur + e.startTime = time.Now() + e.log.Debug("configured for time based PSI insertion") + return nil + } +} + +// MediaType is an option that can be passed to NewEncoder. It is used to +// specifiy the media type/codec of the data we are packetising using the +// encoder. Currently supported options are EncodeH264, EncodeH265, EncodeMJPEG +// and EncodeAudio. +func MediaType(mt int) func(*Encoder) error { + return func(e *Encoder) error { + switch mt { + case EncodeAudio: + e.mediaPID = PIDAudio + e.streamID = streamIDAudio + e.log.Debug("configured for audio packetisation") + case EncodeH265: + e.mediaPID = PIDVideo + e.streamID = streamIDH265 + e.log.Debug("configured for h.265 packetisation") + case EncodeH264: + e.mediaPID = PIDVideo + e.streamID = streamIDH264 + e.log.Debug("configured for h.264 packetisation") + case EncodeMJPEG: + e.mediaPID = PIDVideo + e.streamID = streamIDMJPEG + e.log.Debug("configured for MJPEG packetisation") + default: + return ErrUnsupportedMedia + } + e.continuity = map[uint16]byte{PatPid: 0, PmtPid: 0, e.mediaPID: 0} + return nil + } +} + +// Rate is an option that can be passed to NewEncoder. It is used to specifiy +// the rate at which the access units should be played in playback. This will +// be used to create timestamps and counts such as PTS and PCR. +func Rate(r int) func(*Encoder) error { + return func(e *Encoder) error { + if r < 1 || r > 60 { + return ErrInvalidRate + } + e.writePeriod = time.Duration(float64(time.Second) / float64(r)) + return nil + } +} diff --git a/container/mts/payload_test.go b/container/mts/payload_test.go index a114c28e..35259fe4 100644 --- a/container/mts/payload_test.go +++ b/container/mts/payload_test.go @@ -94,7 +94,7 @@ func TestExtract(t *testing.T) { want.frames = append(want.frames, Frame{ Media: frame, PTS: nextPTS, - ID: H264ID, + ID: streamIDH264, Meta: metaMap, }) } diff --git a/revid/revid.go b/revid/revid.go index b4a2f6c5..b15391d9 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -221,8 +221,8 @@ func (r *Revid) reset(c config.Config) error { default: panic("unknown input type") } - - return mts.NewEncoder(dst, float64(fps), st, &encLog{r.cfg.Logger}, encOptions...) + encOptions = append(encOptions, mts.MediaType(st), mts.Rate(int(fps))) + return mts.NewEncoder(dst, &encLog{r.cfg.Logger}, encOptions...) }, func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { return flv.NewEncoder(dst, true, true, fps) diff --git a/revid/senders_test.go b/revid/senders_test.go index c30225e8..0a075b21 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -145,7 +145,7 @@ func TestMTSSenderSegment(t *testing.T) { sender := newMTSSender(dst, (*testLogger)(t).log, ring.NewBuffer(nElements, rbStartingElementSize, 0), 0) const psiSendCount = 10 - encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount)) + encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264)) if err != nil { t.Fatalf("could not create MTS encoder, failed with error: %v", err) } @@ -187,7 +187,7 @@ func TestMTSSenderSegment(t *testing.T) { t.Log("checking clip data") for i := 0; i < len(clip); i += mts.PacketSize { copy(pkt[:], clip[i:i+mts.PacketSize]) - if pkt.PID() == mts.VideoPid { + if pkt.PID() == mts.PIDVideo { t.Log("got video PID") payload, err := pkt.Payload() if err != nil { @@ -226,7 +226,7 @@ func TestMtsSenderFailedSend(t *testing.T) { sender := newMTSSender(dst, (*testLogger)(t).log, ring.NewBuffer(nElements, rbStartingElementSize, 0), 0) const psiSendCount = 10 - encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount)) + encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264)) if err != nil { t.Fatalf("could not create MTS encoder, failed with error: %v", err) } @@ -268,7 +268,7 @@ func TestMtsSenderFailedSend(t *testing.T) { t.Log("checking clip data") for i := 0; i < len(clip); i += mts.PacketSize { copy(pkt[:], clip[i:i+mts.PacketSize]) - if pkt.PID() == mts.VideoPid { + if pkt.PID() == mts.PIDVideo { t.Log("got video PID") payload, err := pkt.Payload() if err != nil { @@ -307,7 +307,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) { sender := newMTSSender(dst, (*testLogger)(t).log, ring.NewBuffer(1, rbStartingElementSize, 0), 0) const psiSendCount = 10 - encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount)) + encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264)) if err != nil { t.Fatalf("could not create MTS encoder, failed with error: %v", err) }