diff --git a/container/mts/audio_test.go b/container/mts/audio_test.go new file mode 100644 index 00000000..23ba16e6 --- /dev/null +++ b/container/mts/audio_test.go @@ -0,0 +1,139 @@ +/* +NAME + audio_test.go + +AUTHOR + Trek Hopton + +LICENSE + audio_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License in gpl.txt. + If not, see http://www.gnu.org/licenses. +*/ + +package mts + +import ( + "bytes" + "io/ioutil" + "testing" + + "github.com/Comcast/gots/packet" + "github.com/Comcast/gots/pes" + + "bitbucket.org/ausocean/av/container/mts/meta" +) + +// TestEncodePcm tests the mpegts encoder's ability to encode pcm audio data. +// It reads and encodes input pcm data into mpegts, then decodes the mpegts and compares the result to the input pcm. +func TestEncodePcm(t *testing.T) { + Meta = meta.New() + + var buf bytes.Buffer + sampleRate := 48000 + sampleSize := 2 + blockSize := 16000 + writeFreq := float64(sampleRate*sampleSize) / float64(blockSize) + e := NewEncoder(&buf, writeFreq, Audio) + + inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm" + inPcm, err := ioutil.ReadFile(inPath) + if err != nil { + t.Errorf("unable to read file: %v", err) + } + + // Break pcm into blocks and encode to mts and get the resulting bytes. + for i := 0; i < len(inPcm); i += blockSize { + if len(inPcm)-i < blockSize { + block := inPcm[i:] + _, err = e.Write(block) + if err != nil { + t.Errorf("unable to write block: %v", err) + } + } else { + block := inPcm[i : i+blockSize] + _, err = e.Write(block) + if err != nil { + t.Errorf("unable to write block: %v", err) + } + } + } + clip := buf.Bytes() + + // Get the first MTS packet to check + var pkt packet.Packet + pesPacket := make([]byte, 0, blockSize) + got := make([]byte, 0, len(inPcm)) + i := 0 + if i+PacketSize <= len(clip) { + copy(pkt[:], clip[i:i+PacketSize]) + } + + // Loop through MTS packets until all the audio data from PES packets has been retrieved + for i+PacketSize <= len(clip) { + + // Check MTS packet + if !(pkt.PID() == audioPid) { + i += PacketSize + if i+PacketSize <= len(clip) { + copy(pkt[:], clip[i:i+PacketSize]) + } + continue + } + if !pkt.PayloadUnitStartIndicator() { + i += PacketSize + if i+PacketSize <= len(clip) { + copy(pkt[:], clip[i:i+PacketSize]) + } + } else { + // Copy the first MTS payload + payload, err := pkt.Payload() + if err != nil { + t.Errorf("unable to get MTS payload: %v", err) + } + pesPacket = append(pesPacket, payload...) + + i += PacketSize + if i+PacketSize <= len(clip) { + copy(pkt[:], clip[i:i+PacketSize]) + } + + // Copy the rest of the MTS payloads that are part of the same PES packet + for (!pkt.PayloadUnitStartIndicator()) && i+PacketSize <= len(clip) { + payload, err = pkt.Payload() + if err != nil { + t.Errorf("unable to get MTS payload: %v", err) + } + pesPacket = append(pesPacket, payload...) + + i += PacketSize + if i+PacketSize <= len(clip) { + copy(pkt[:], clip[i:i+PacketSize]) + } + } + } + // Get the audio data from the current PES packet + pesHeader, err := pes.NewPESHeader(pesPacket) + if err != nil { + t.Errorf("unable to read PES packet: %v", err) + } + got = append(got, pesHeader.Data()...) + pesPacket = pesPacket[:0] + } + + // Compare data from MTS with original data. + if !bytes.Equal(got, inPcm) { + t.Error("data decoded from mts did not match input data") + } +} diff --git a/container/mts/encoder.go b/container/mts/encoder.go index 92e87051..e72cfebf 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -2,9 +2,6 @@ NAME encoder.go -DESCRIPTION - See Readme.md - AUTHOR Dan Kortschak Saxon Nelson-Milton @@ -29,6 +26,7 @@ LICENSE package mts import ( + "fmt" "io" "time" @@ -101,11 +99,20 @@ var ( ) const ( - sdtPid = 17 - patPid = 0 - pmtPid = 4096 - videoPid = 256 - streamID = 0xe0 // First video stream ID. + sdtPid = 17 + patPid = 0 + pmtPid = 4096 + videoPid = 256 + audioPid = 210 + videoStreamID = 0xe0 // First video stream ID. + audioStreamID = 0xc0 // First audio stream ID. +) + +// Video and Audio constants are used to communicate which media type will be encoded when creating a +// new encoder with NewEncoder. +const ( + Video = iota + Audio ) // Time related constants. @@ -122,38 +129,55 @@ const ( type Encoder struct { dst io.Writer - clock time.Duration - lastTime time.Time - frameInterval time.Duration - ptsOffset time.Duration - tsSpace [PacketSize]byte - pesSpace [pes.MaxPesSize]byte + clock time.Duration + lastTime time.Time + writePeriod time.Duration + ptsOffset time.Duration + tsSpace [PacketSize]byte + pesSpace [pes.MaxPesSize]byte continuity map[int]byte timeBasedPsi bool pktCount int psiSendCount int + mediaPid int + streamID byte psiLastTime time.Time } -// NewEncoder returns an Encoder with the specified frame rate. -func NewEncoder(dst io.Writer, fps float64) *Encoder { +// NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream +// calls write for every frame, the rate will be the frame rate of the video. +func NewEncoder(dst io.Writer, rate float64, mediaType int) *Encoder { + var mPid int + var sid byte + switch mediaType { + case Audio: + mPid = audioPid + sid = audioStreamID + case Video: + mPid = videoPid + sid = videoStreamID + } + return &Encoder{ dst: dst, - frameInterval: time.Duration(float64(time.Second) / fps), - ptsOffset: ptsOffset, + writePeriod: time.Duration(float64(time.Second) / rate), + ptsOffset: ptsOffset, timeBasedPsi: true, pktCount: 8, + mediaPid: mPid, + streamID: sid, + continuity: map[int]byte{ - patPid: 0, - pmtPid: 0, - videoPid: 0, + patPid: 0, + pmtPid: 0, + mPid: 0, }, } } @@ -180,7 +204,10 @@ func (e *Encoder) TimeBasedPsi(b bool, sendCount int) { // Write implements io.Writer. Write takes raw h264 and encodes into mpegts, // then sending it to the encoder's io.Writer destination. -func (e *Encoder) Write(nalu []byte) (int, error) { +func (e *Encoder) Write(data []byte) (int, error) { + if len(data) > pes.MaxPesSize { + return 0, fmt.Errorf("data size too large (Max is %v): %v", pes.MaxPesSize, len(data)) + } now := time.Now() if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || (!e.timeBasedPsi && (e.pktCount >= e.psiSendCount)) { e.pktCount = 0 @@ -193,21 +220,22 @@ func (e *Encoder) Write(nalu []byte) (int, error) { // Prepare PES data. pesPkt := pes.Packet{ - StreamID: streamID, + StreamID: e.streamID, PDI: hasPTS, PTS: e.pts(), - Data: nalu, + Data: data, HeaderLength: 5, } + buf := pesPkt.Bytes(e.pesSpace[:pes.MaxPesSize]) pusi := true for len(buf) != 0 { pkt := Packet{ PUSI: pusi, - PID: videoPid, + PID: uint16(e.mediaPid), RAI: pusi, - CC: e.ccFor(videoPid), + CC: e.ccFor(e.mediaPid), AFC: hasAdaptationField | hasPayload, PCRF: pusi, } @@ -222,14 +250,14 @@ func (e *Encoder) Write(nalu []byte) (int, error) { } _, err := e.dst.Write(pkt.Bytes(e.tsSpace[:PacketSize])) if err != nil { - return len(nalu), err + return len(data), err } e.pktCount++ } e.tick() - return len(nalu), nil + return len(data), nil } // writePSI creates mpegts with pat and pmt tables - with pmt table having updated @@ -271,7 +299,7 @@ func (e *Encoder) writePSI() error { // tick advances the clock one frame interval. func (e *Encoder) tick() { - e.clock += e.frameInterval + e.clock += e.writePeriod } // pts retuns the current presentation timestamp. diff --git a/container/mts/metaEncode_test.go b/container/mts/metaEncode_test.go index 00f806b9..2bf94d64 100644 --- a/container/mts/metaEncode_test.go +++ b/container/mts/metaEncode_test.go @@ -49,7 +49,7 @@ func TestMetaEncode1(t *testing.T) { Meta = meta.New() var b []byte buf := bytes.NewBuffer(b) - e := NewEncoder(buf, fps) + e := NewEncoder(buf, fps, Video) Meta.Add("ts", "12345678") if err := e.writePSI(); err != nil { t.Errorf(errUnexpectedErr, err.Error()) @@ -78,7 +78,7 @@ func TestMetaEncode2(t *testing.T) { Meta = meta.New() var b []byte buf := bytes.NewBuffer(b) - e := NewEncoder(buf, fps) + e := NewEncoder(buf, fps, Video) Meta.Add("ts", "12345678") Meta.Add("loc", "1234,4321,1234") if err := e.writePSI(); err != nil { diff --git a/container/mts/mpegts.go b/container/mts/mpegts.go index 18fe8b5f..ed8cbe02 100644 --- a/container/mts/mpegts.go +++ b/container/mts/mpegts.go @@ -74,7 +74,7 @@ const ( ) /* -The below data struct encapsulates the fields of an MPEG-TS packet. Below is +Packet encapsulates the fields of an MPEG-TS packet. Below is the formatting of an MPEG-TS packet for reference! MPEG-TS Packet Formatting @@ -196,7 +196,11 @@ func FindPid(d []byte, pid uint16) (pkt []byte, i int, err error) { func (p *Packet) FillPayload(data []byte) int { currentPktLen := 6 + asInt(p.PCRF)*6 + asInt(p.OPCRF)*6 + asInt(p.SPF)*1 + asInt(p.TPDF)*1 + len(p.TPD) - p.Payload = make([]byte, PayloadSize-currentPktLen) + if len(data) > PayloadSize-currentPktLen { + p.Payload = make([]byte, PayloadSize-currentPktLen) + } else { + p.Payload = make([]byte, len(data)) + } return copy(p.Payload, data) } @@ -214,7 +218,7 @@ func asByte(b bool) byte { return 0 } -// ToByteSlice interprets the fields of the ts packet instance and outputs a +// Bytes interprets the fields of the ts packet instance and outputs a // corresponding byte slice func (p *Packet) Bytes(buf []byte) []byte { if buf == nil || cap(buf) != PacketSize { diff --git a/container/mts/pes/pes.go b/container/mts/pes/pes.go index 1e085166..b0e40f86 100644 --- a/container/mts/pes/pes.go +++ b/container/mts/pes/pes.go @@ -26,7 +26,7 @@ LICENSE package pes -const MaxPesSize = 10000 +const MaxPesSize = 64 * 1 << 10 /* The below data struct encapsulates the fields of an PES packet. Below is diff --git a/revid/revid.go b/revid/revid.go index 0828d7da..8636dba8 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -259,7 +259,7 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W } func newMtsEncoder(dst io.Writer, fps int) (io.Writer, error) { - e := mts.NewEncoder(dst, float64(fps)) + e := mts.NewEncoder(dst, float64(fps), mts.Video) return e, nil } diff --git a/revid/senders_test.go b/revid/senders_test.go index 95474922..aba06455 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -111,7 +111,7 @@ func TestMtsSenderSegment(t *testing.T) { tstDst := &destination{} loadSender := newMtsSender(tstDst, log) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) - encoder := mts.NewEncoder((*buffer)(rb), 25) + encoder := mts.NewEncoder((*buffer)(rb), 25, mts.Video) // Turn time based PSI writing off for encoder. const psiSendCount = 10 @@ -195,7 +195,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) { tstDst := &destination{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity} loadSender := newMtsSender(tstDst, log) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) - encoder := mts.NewEncoder((*buffer)(rb), 25) + encoder := mts.NewEncoder((*buffer)(rb), 25, mts.Video) // Turn time based PSI writing off for encoder. const psiSendCount = 10