container/mts: make rate and media type encoder parameters option functions

This commit is contained in:
Saxon 2020-05-06 20:10:55 +09:30
parent 77459d16da
commit d17880714e
9 changed files with 172 additions and 112 deletions

View File

@ -45,7 +45,7 @@ func NewDiscontinuityRepairer() *DiscontinuityRepairer {
expCC: map[int]int{
PatPid: 16,
PmtPid: 16,
VideoPid: 16,
PIDVideo: 16,
},
}
}

View File

@ -3,8 +3,8 @@ NAME
encoder.go
AUTHOR
Dan Kortschak <dan@ausocean.org>
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
LICENSE
encoder.go is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean)
@ -17,7 +17,7 @@ LICENSE
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
@ -41,18 +41,18 @@ import (
// Stream IDs as per ITU-T Rec. H.222.0 / ISO/IEC 13818-1 [1], tables 2-22 and 2-34.
const (
H264ID = 27
H265ID = 36
MJPEGID = 28
audioStreamID = 0xc0 // ADPCM audio stream ID.
streamIDH264 = 27
streamIDH265 = 36
streamIDMJPEG = 28
streamIDAudio = 0xc0 // ADPCM audio stream ID.
)
// These three constants are used to select between the three different
// methods of when the PSI is sent.
const (
pktBased = iota
timeBased
nalBased
psiMethodPacket = iota // PSI is inserted after a certain number of packets.
psiMethodTime // PSI is inserted after a certain amount of time.
psiMethodNAL // PSI is inserted before each "key frame" of media.
)
// Constants used to communicate which media codec will be packetized.
@ -65,8 +65,8 @@ const (
// The program IDs we assign to different types of media.
const (
VideoPid = 256
AudioPid = 210
PIDVideo = 256
PIDAudio = 210
)
// Time-related constants.
@ -98,6 +98,14 @@ const (
hasPTS = 0x2
)
// Default encoder configuration parameters.
const (
defaultRate = 25 // FPS
defaultPSIMethod = psiMethodNAL
defaultStreamID = streamIDH264
defaultMediaPID = PIDVideo
)
// Some common manifestations of PSI.
var (
// StandardPAT is a minimal PAT.
@ -177,7 +185,7 @@ type Encoder struct {
psiTime time.Duration
psiSetTime time.Duration
startTime time.Time
mediaPid uint16
mediaPID uint16
streamID byte
// log is a function that will be used through the encoder code for logging.
@ -186,54 +194,16 @@ type Encoder struct {
// NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream
// calls write for every frame, the rate will be the frame rate of the video.
func NewEncoder(dst io.WriteCloser, rate float64, mediaType int, log logger, options ...func(*Encoder) error) (*Encoder, error) {
var mPID uint16
var sID byte
psiM := timeBased
switch mediaType {
case EncodeAudio:
mPID = AudioPid
sID = audioStreamID
psiM = pktBased
log.Debug("configured for audio packetisation")
case EncodeH265:
mPID = VideoPid
sID = H265ID
psiM = nalBased
log.Debug("configured for h.265 packetisation")
case EncodeH264:
mPID = VideoPid
sID = H264ID
psiM = nalBased
log.Debug("configured for h.264 packetisation")
case EncodeMJPEG:
mPID = VideoPid
sID = MJPEGID
psiM = timeBased
log.Debug("configured for MJPEG packetisation")
}
pmt := BasePMT
pmt.Tss.Sd = &psi.PMT{
Pcrpid: 0x0100,
Pil: 0,
Essd: &psi.ESSD{
St: byte(sID),
Epid: mPID,
Esil: 0x00,
},
}
pmtTable = pmt.Bytes()
func NewEncoder(dst io.WriteCloser, log logger, options ...func(*Encoder) error) (*Encoder, error) {
e := &Encoder{
dst: dst,
writePeriod: time.Duration(float64(time.Second) / rate),
writePeriod: time.Duration(float64(time.Second) / defaultRate),
ptsOffset: ptsOffset,
psiMethod: psiM,
psiMethod: defaultPSIMethod,
pktCount: 8,
mediaPid: mPID,
streamID: sID,
continuity: map[uint16]byte{PatPid: 0, PmtPid: 0, mPID: 0},
mediaPID: defaultMediaPID,
streamID: defaultStreamID,
continuity: map[uint16]byte{PatPid: 0, PmtPid: 0, defaultMediaPID: 0},
log: log,
}
@ -245,41 +215,26 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int, log logger, opt
}
log.Debug("encoder options applied")
pmt := BasePMT
pmt.Tss.Sd = &psi.PMT{
Pcrpid: 0x0100,
Pil: 0,
Essd: &psi.ESSD{
St: byte(e.streamID),
Epid: e.mediaPID,
Esil: 0x00,
},
}
pmtTable = pmt.Bytes()
return e, nil
}
// PacketBasedPSI is an option that can be passed to NewEncoder to select
// packet based PSI writing, i.e. PSI are written to the destination every
// sendCount packets.
func PacketBasedPSI(sendCount int) func(*Encoder) error {
return func(e *Encoder) error {
e.psiMethod = pktBased
e.psiSendCount = sendCount
e.pktCount = e.psiSendCount
e.log.Debug("configured for packet based PSI insertion", "count", sendCount)
return nil
}
}
// TimeBasedPSI is another option that can be passed to NewEncoder to select
// time based PSI writing, i.e. PSI are written to the destination every dur (duration)
// (defualt is 2 seconds).
func TimeBasedPSI(dur time.Duration) func(*Encoder) error {
return func(e *Encoder) error {
e.psiMethod = timeBased
e.psiTime = 0
e.psiSetTime = dur
e.startTime = time.Now()
e.log.Debug("configured for time based PSI insertion")
return nil
}
}
// Write implements io.Writer. Write takes raw video or audio data and encodes into MPEG-TS,
// then sending it to the encoder's io.Writer destination.
func (e *Encoder) Write(data []byte) (int, error) {
switch e.psiMethod {
case pktBased:
case psiMethodPacket:
e.log.Debug("checking packet no. conditions for PSI write", "count", e.pktCount, "PSI count", e.psiSendCount)
if e.pktCount >= e.psiSendCount {
e.pktCount = 0
@ -288,7 +243,7 @@ func (e *Encoder) Write(data []byte) (int, error) {
return 0, err
}
}
case nalBased:
case psiMethodNAL:
nalType, err := h264.NALType(data)
if err != nil {
return 0, fmt.Errorf("could not get type from NAL unit, failed with error: %w", err)
@ -300,7 +255,7 @@ func (e *Encoder) Write(data []byte) (int, error) {
return 0, err
}
}
case timeBased:
case psiMethodTime:
dur := time.Now().Sub(e.startTime)
e.log.Debug("checking time conditions for PSI write")
if dur >= e.psiTime {
@ -331,9 +286,9 @@ func (e *Encoder) Write(data []byte) (int, error) {
for len(buf) != 0 {
pkt := Packet{
PUSI: pusi,
PID: uint16(e.mediaPid),
PID: uint16(e.mediaPID),
RAI: pusi,
CC: e.ccFor(e.mediaPid),
CC: e.ccFor(e.mediaPID),
AFC: hasAdaptationField | hasPayload,
PCRF: pusi,
}

View File

@ -127,7 +127,7 @@ func TestEncodeVideo(t *testing.T) {
// Create the dst and write the test data to encoder.
dst := &destination{}
e, err := NewEncoder(nopCloser{dst}, 25, EncodeH264, (*testLogger)(t), PacketBasedPSI(psiSendCount))
e, err := NewEncoder(nopCloser{dst}, (*testLogger)(t), PacketBasedPSI(psiSendCount), Rate(25), MediaType(EncodeH264))
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
@ -144,7 +144,7 @@ func TestEncodeVideo(t *testing.T) {
var _p packet.Packet
copy(_p[:], p)
pid := packet.Pid(&_p)
if pid == VideoPid {
if pid == PIDVideo {
// Get mts header, excluding PCR.
gotHeader := p[0:6]
wantHeader := expectedHeaders[expectedIdx]
@ -161,7 +161,7 @@ func TestEncodeVideo(t *testing.T) {
var _p packet.Packet
copy(_p[:], p)
pid := packet.Pid(&_p)
if pid == VideoPid {
if pid == PIDVideo {
payload, err := packet.Payload(&_p)
if err != nil {
t.Fatalf("could not get payload from mts packet, failed with err: %v\n", err)
@ -191,7 +191,7 @@ func TestEncodePcm(t *testing.T) {
sampleSize := 2
blockSize := 16000
writeFreq := float64(sampleRate*sampleSize) / float64(blockSize)
e, err := NewEncoder(nopCloser{&buf}, writeFreq, EncodeAudio, (*testLogger)(t))
e, err := NewEncoder(nopCloser{&buf}, (*testLogger)(t), PacketBasedPSI(10), Rate(int(writeFreq)), MediaType(EncodeAudio))
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
@ -233,7 +233,7 @@ func TestEncodePcm(t *testing.T) {
for i+PacketSize <= len(clip) {
// Check MTS packet
if pkt.PID() != AudioPid {
if pkt.PID() != PIDAudio {
i += PacketSize
if i+PacketSize <= len(clip) {
copy(pkt[:], clip[i:i+PacketSize])
@ -295,7 +295,7 @@ const fps = 25
func TestMetaEncode1(t *testing.T) {
Meta = meta.New()
var buf bytes.Buffer
e, err := NewEncoder(nopCloser{&buf}, fps, EncodeH264, (*testLogger)(t))
e, err := NewEncoder(nopCloser{&buf}, (*testLogger)(t))
if err != nil {
t.Fatalf("could not create encoder, failed with error: %v", err)
}
@ -327,7 +327,7 @@ func TestMetaEncode1(t *testing.T) {
func TestMetaEncode2(t *testing.T) {
Meta = meta.New()
var buf bytes.Buffer
e, err := NewEncoder(nopCloser{&buf}, fps, EncodeH264, (*testLogger)(t))
e, err := NewEncoder(nopCloser{&buf}, (*testLogger)(t))
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
@ -359,7 +359,7 @@ func TestMetaEncode2(t *testing.T) {
func TestExtractMeta(t *testing.T) {
Meta = meta.New()
var buf bytes.Buffer
e, err := NewEncoder(nopCloser{&buf}, fps, EncodeH264, (*testLogger)(t))
e, err := NewEncoder(nopCloser{&buf}, (*testLogger)(t))
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}

View File

@ -49,9 +49,6 @@ const (
PmtPid = 4096
)
// StreamID is the id of the first stream.
const StreamID = 0xe0
// HeadSize is the size of an MPEG-TS packet header.
const HeadSize = 4

View File

@ -85,7 +85,7 @@ func TestGetPTSRange1(t *testing.T) {
curTime += interval
}
got, err := GetPTSRange(clip.Bytes(), VideoPid)
got, err := GetPTSRange(clip.Bytes(), PIDVideo)
if err != nil {
t.Fatalf("did not expect error getting PTS range: %v", err)
}
@ -132,7 +132,7 @@ func writePSI(b *bytes.Buffer) error {
func writeFrame(b *bytes.Buffer, frame []byte, pts uint64) error {
// Prepare PES data.
pesPkt := pes.Packet{
StreamID: H264ID,
StreamID: streamIDH264,
PDI: hasPTS,
PTS: pts,
Data: frame,
@ -145,7 +145,7 @@ func writeFrame(b *bytes.Buffer, frame []byte, pts uint64) error {
for len(buf) != 0 {
pkt := Packet{
PUSI: pusi,
PID: VideoPid,
PID: PIDVideo,
RAI: pusi,
CC: 0,
AFC: hasAdaptationField | hasPayload,
@ -208,7 +208,7 @@ func TestGetPTSRange2(t *testing.T) {
clip.Reset()
for j := 0; j < nPackets; j++ {
pesPkt := pes.Packet{
StreamID: H264ID,
StreamID: streamIDH264,
PDI: hasPTS,
PTS: test.pts[j],
Data: []byte{},

108
container/mts/options.go Normal file
View File

@ -0,0 +1,108 @@
/*
DESCRIPTION
options.go provides option functions that can be provided to the MTS encoders
constructor NewEncoder for encoder configuration. These options include media
type, PSI insertion strategy and intended access unit rate.
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
Copyright (C) 2017-2020 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package mts
import (
"errors"
"time"
)
var (
ErrUnsupportedMedia = errors.New("unsupported media type")
ErrInvalidRate = errors.New("invalid access unit rate")
)
// PacketBasedPSI is an option that can be passed to NewEncoder to select
// packet based PSI writing, i.e. PSI are written to the destination every
// sendCount packets.
func PacketBasedPSI(sendCount int) func(*Encoder) error {
return func(e *Encoder) error {
e.psiMethod = psiMethodPacket
e.psiSendCount = sendCount
e.pktCount = e.psiSendCount
e.log.Debug("configured for packet based PSI insertion", "count", sendCount)
return nil
}
}
// TimeBasedPSI is another option that can be passed to NewEncoder to select
// time based PSI writing, i.e. PSI are written to the destination every dur
// (duration). The defualt is 2 seconds.
func TimeBasedPSI(dur time.Duration) func(*Encoder) error {
return func(e *Encoder) error {
e.psiMethod = psiMethodTime
e.psiTime = 0
e.psiSetTime = dur
e.startTime = time.Now()
e.log.Debug("configured for time based PSI insertion")
return nil
}
}
// MediaType is an option that can be passed to NewEncoder. It is used to
// specifiy the media type/codec of the data we are packetising using the
// encoder. Currently supported options are EncodeH264, EncodeH265, EncodeMJPEG
// and EncodeAudio.
func MediaType(mt int) func(*Encoder) error {
return func(e *Encoder) error {
switch mt {
case EncodeAudio:
e.mediaPID = PIDAudio
e.streamID = streamIDAudio
e.log.Debug("configured for audio packetisation")
case EncodeH265:
e.mediaPID = PIDVideo
e.streamID = streamIDH265
e.log.Debug("configured for h.265 packetisation")
case EncodeH264:
e.mediaPID = PIDVideo
e.streamID = streamIDH264
e.log.Debug("configured for h.264 packetisation")
case EncodeMJPEG:
e.mediaPID = PIDVideo
e.streamID = streamIDMJPEG
e.log.Debug("configured for MJPEG packetisation")
default:
return ErrUnsupportedMedia
}
e.continuity = map[uint16]byte{PatPid: 0, PmtPid: 0, e.mediaPID: 0}
return nil
}
}
// Rate is an option that can be passed to NewEncoder. It is used to specifiy
// the rate at which the access units should be played in playback. This will
// be used to create timestamps and counts such as PTS and PCR.
func Rate(r int) func(*Encoder) error {
return func(e *Encoder) error {
if r < 1 || r > 60 {
return ErrInvalidRate
}
e.writePeriod = time.Duration(float64(time.Second) / float64(r))
return nil
}
}

View File

@ -94,7 +94,7 @@ func TestExtract(t *testing.T) {
want.frames = append(want.frames, Frame{
Media: frame,
PTS: nextPTS,
ID: H264ID,
ID: streamIDH264,
Meta: metaMap,
})
}

View File

@ -221,8 +221,8 @@ func (r *Revid) reset(c config.Config) error {
default:
panic("unknown input type")
}
return mts.NewEncoder(dst, float64(fps), st, &encLog{r.cfg.Logger}, encOptions...)
encOptions = append(encOptions, mts.MediaType(st), mts.Rate(int(fps)))
return mts.NewEncoder(dst, &encLog{r.cfg.Logger}, encOptions...)
},
func(dst io.WriteCloser, fps int) (io.WriteCloser, error) {
return flv.NewEncoder(dst, true, true, fps)

View File

@ -145,7 +145,7 @@ func TestMTSSenderSegment(t *testing.T) {
sender := newMTSSender(dst, (*testLogger)(t).log, ring.NewBuffer(nElements, rbStartingElementSize, 0), 0)
const psiSendCount = 10
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount))
encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264))
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
@ -187,7 +187,7 @@ func TestMTSSenderSegment(t *testing.T) {
t.Log("checking clip data")
for i := 0; i < len(clip); i += mts.PacketSize {
copy(pkt[:], clip[i:i+mts.PacketSize])
if pkt.PID() == mts.VideoPid {
if pkt.PID() == mts.PIDVideo {
t.Log("got video PID")
payload, err := pkt.Payload()
if err != nil {
@ -226,7 +226,7 @@ func TestMtsSenderFailedSend(t *testing.T) {
sender := newMTSSender(dst, (*testLogger)(t).log, ring.NewBuffer(nElements, rbStartingElementSize, 0), 0)
const psiSendCount = 10
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount))
encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264))
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}
@ -268,7 +268,7 @@ func TestMtsSenderFailedSend(t *testing.T) {
t.Log("checking clip data")
for i := 0; i < len(clip); i += mts.PacketSize {
copy(pkt[:], clip[i:i+mts.PacketSize])
if pkt.PID() == mts.VideoPid {
if pkt.PID() == mts.PIDVideo {
t.Log("got video PID")
payload, err := pkt.Payload()
if err != nil {
@ -307,7 +307,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
sender := newMTSSender(dst, (*testLogger)(t).log, ring.NewBuffer(1, rbStartingElementSize, 0), 0)
const psiSendCount = 10
encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount))
encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264))
if err != nil {
t.Fatalf("could not create MTS encoder, failed with error: %v", err)
}