Merged in audio-mts-encoder (pull request #182)

Upgrade MTS encoder to transfer audio data

Approved-by: kortschak <dan@kortschak.io>
This commit is contained in:
Trek Hopton 2019-04-11 06:41:18 +00:00
commit 4738bae0d1
7 changed files with 209 additions and 38 deletions

139
container/mts/audio_test.go Normal file
View File

@ -0,0 +1,139 @@
/*
NAME
audio_test.go
AUTHOR
Trek Hopton <trek@ausocean.org>
LICENSE
audio_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License in gpl.txt.
If not, see http://www.gnu.org/licenses.
*/
package mts
import (
"bytes"
"io/ioutil"
"testing"
"github.com/Comcast/gots/packet"
"github.com/Comcast/gots/pes"
"bitbucket.org/ausocean/av/container/mts/meta"
)
// TestEncodePcm tests the mpegts encoder's ability to encode pcm audio data.
// It reads and encodes input pcm data into mpegts, then decodes the mpegts and compares the result to the input pcm.
func TestEncodePcm(t *testing.T) {
Meta = meta.New()
var buf bytes.Buffer
sampleRate := 48000
sampleSize := 2
blockSize := 16000
writeFreq := float64(sampleRate*sampleSize) / float64(blockSize)
e := NewEncoder(&buf, writeFreq, Audio)
inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm"
inPcm, err := ioutil.ReadFile(inPath)
if err != nil {
t.Errorf("unable to read file: %v", err)
}
// Break pcm into blocks and encode to mts and get the resulting bytes.
for i := 0; i < len(inPcm); i += blockSize {
if len(inPcm)-i < blockSize {
block := inPcm[i:]
_, err = e.Write(block)
if err != nil {
t.Errorf("unable to write block: %v", err)
}
} else {
block := inPcm[i : i+blockSize]
_, err = e.Write(block)
if err != nil {
t.Errorf("unable to write block: %v", err)
}
}
}
clip := buf.Bytes()
// Get the first MTS packet to check
var pkt packet.Packet
pesPacket := make([]byte, 0, blockSize)
got := make([]byte, 0, len(inPcm))
i := 0
if i+PacketSize <= len(clip) {
copy(pkt[:], clip[i:i+PacketSize])
}
// Loop through MTS packets until all the audio data from PES packets has been retrieved
for i+PacketSize <= len(clip) {
// Check MTS packet
if !(pkt.PID() == audioPid) {
i += PacketSize
if i+PacketSize <= len(clip) {
copy(pkt[:], clip[i:i+PacketSize])
}
continue
}
if !pkt.PayloadUnitStartIndicator() {
i += PacketSize
if i+PacketSize <= len(clip) {
copy(pkt[:], clip[i:i+PacketSize])
}
} else {
// Copy the first MTS payload
payload, err := pkt.Payload()
if err != nil {
t.Errorf("unable to get MTS payload: %v", err)
}
pesPacket = append(pesPacket, payload...)
i += PacketSize
if i+PacketSize <= len(clip) {
copy(pkt[:], clip[i:i+PacketSize])
}
// Copy the rest of the MTS payloads that are part of the same PES packet
for (!pkt.PayloadUnitStartIndicator()) && i+PacketSize <= len(clip) {
payload, err = pkt.Payload()
if err != nil {
t.Errorf("unable to get MTS payload: %v", err)
}
pesPacket = append(pesPacket, payload...)
i += PacketSize
if i+PacketSize <= len(clip) {
copy(pkt[:], clip[i:i+PacketSize])
}
}
}
// Get the audio data from the current PES packet
pesHeader, err := pes.NewPESHeader(pesPacket)
if err != nil {
t.Errorf("unable to read PES packet: %v", err)
}
got = append(got, pesHeader.Data()...)
pesPacket = pesPacket[:0]
}
// Compare data from MTS with original data.
if !bytes.Equal(got, inPcm) {
t.Error("data decoded from mts did not match input data")
}
}

View File

@ -2,9 +2,6 @@
NAME NAME
encoder.go encoder.go
DESCRIPTION
See Readme.md
AUTHOR AUTHOR
Dan Kortschak <dan@ausocean.org> Dan Kortschak <dan@ausocean.org>
Saxon Nelson-Milton <saxon@ausocean.org> Saxon Nelson-Milton <saxon@ausocean.org>
@ -29,6 +26,7 @@ LICENSE
package mts package mts
import ( import (
"fmt"
"io" "io"
"time" "time"
@ -105,7 +103,16 @@ const (
patPid = 0 patPid = 0
pmtPid = 4096 pmtPid = 4096
videoPid = 256 videoPid = 256
streamID = 0xe0 // First video stream ID. audioPid = 210
videoStreamID = 0xe0 // First video stream ID.
audioStreamID = 0xc0 // First audio stream ID.
)
// Video and Audio constants are used to communicate which media type will be encoded when creating a
// new encoder with NewEncoder.
const (
Video = iota
Audio
) )
// Time related constants. // Time related constants.
@ -124,7 +131,7 @@ type Encoder struct {
clock time.Duration clock time.Duration
lastTime time.Time lastTime time.Time
frameInterval time.Duration writePeriod time.Duration
ptsOffset time.Duration ptsOffset time.Duration
tsSpace [PacketSize]byte tsSpace [PacketSize]byte
pesSpace [pes.MaxPesSize]byte pesSpace [pes.MaxPesSize]byte
@ -134,26 +141,43 @@ type Encoder struct {
timeBasedPsi bool timeBasedPsi bool
pktCount int pktCount int
psiSendCount int psiSendCount int
mediaPid int
streamID byte
psiLastTime time.Time psiLastTime time.Time
} }
// NewEncoder returns an Encoder with the specified frame rate. // NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream
func NewEncoder(dst io.Writer, fps float64) *Encoder { // calls write for every frame, the rate will be the frame rate of the video.
func NewEncoder(dst io.Writer, rate float64, mediaType int) *Encoder {
var mPid int
var sid byte
switch mediaType {
case Audio:
mPid = audioPid
sid = audioStreamID
case Video:
mPid = videoPid
sid = videoStreamID
}
return &Encoder{ return &Encoder{
dst: dst, dst: dst,
frameInterval: time.Duration(float64(time.Second) / fps), writePeriod: time.Duration(float64(time.Second) / rate),
ptsOffset: ptsOffset, ptsOffset: ptsOffset,
timeBasedPsi: true, timeBasedPsi: true,
pktCount: 8, pktCount: 8,
mediaPid: mPid,
streamID: sid,
continuity: map[int]byte{ continuity: map[int]byte{
patPid: 0, patPid: 0,
pmtPid: 0, pmtPid: 0,
videoPid: 0, mPid: 0,
}, },
} }
} }
@ -180,7 +204,10 @@ func (e *Encoder) TimeBasedPsi(b bool, sendCount int) {
// Write implements io.Writer. Write takes raw h264 and encodes into mpegts, // Write implements io.Writer. Write takes raw h264 and encodes into mpegts,
// then sending it to the encoder's io.Writer destination. // then sending it to the encoder's io.Writer destination.
func (e *Encoder) Write(nalu []byte) (int, error) { func (e *Encoder) Write(data []byte) (int, error) {
if len(data) > pes.MaxPesSize {
return 0, fmt.Errorf("data size too large (Max is %v): %v", pes.MaxPesSize, len(data))
}
now := time.Now() now := time.Now()
if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || (!e.timeBasedPsi && (e.pktCount >= e.psiSendCount)) { if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || (!e.timeBasedPsi && (e.pktCount >= e.psiSendCount)) {
e.pktCount = 0 e.pktCount = 0
@ -193,21 +220,22 @@ func (e *Encoder) Write(nalu []byte) (int, error) {
// Prepare PES data. // Prepare PES data.
pesPkt := pes.Packet{ pesPkt := pes.Packet{
StreamID: streamID, StreamID: e.streamID,
PDI: hasPTS, PDI: hasPTS,
PTS: e.pts(), PTS: e.pts(),
Data: nalu, Data: data,
HeaderLength: 5, HeaderLength: 5,
} }
buf := pesPkt.Bytes(e.pesSpace[:pes.MaxPesSize]) buf := pesPkt.Bytes(e.pesSpace[:pes.MaxPesSize])
pusi := true pusi := true
for len(buf) != 0 { for len(buf) != 0 {
pkt := Packet{ pkt := Packet{
PUSI: pusi, PUSI: pusi,
PID: videoPid, PID: uint16(e.mediaPid),
RAI: pusi, RAI: pusi,
CC: e.ccFor(videoPid), CC: e.ccFor(e.mediaPid),
AFC: hasAdaptationField | hasPayload, AFC: hasAdaptationField | hasPayload,
PCRF: pusi, PCRF: pusi,
} }
@ -222,14 +250,14 @@ func (e *Encoder) Write(nalu []byte) (int, error) {
} }
_, err := e.dst.Write(pkt.Bytes(e.tsSpace[:PacketSize])) _, err := e.dst.Write(pkt.Bytes(e.tsSpace[:PacketSize]))
if err != nil { if err != nil {
return len(nalu), err return len(data), err
} }
e.pktCount++ e.pktCount++
} }
e.tick() e.tick()
return len(nalu), nil return len(data), nil
} }
// writePSI creates mpegts with pat and pmt tables - with pmt table having updated // writePSI creates mpegts with pat and pmt tables - with pmt table having updated
@ -271,7 +299,7 @@ func (e *Encoder) writePSI() error {
// tick advances the clock one frame interval. // tick advances the clock one frame interval.
func (e *Encoder) tick() { func (e *Encoder) tick() {
e.clock += e.frameInterval e.clock += e.writePeriod
} }
// pts retuns the current presentation timestamp. // pts retuns the current presentation timestamp.

View File

@ -49,7 +49,7 @@ func TestMetaEncode1(t *testing.T) {
Meta = meta.New() Meta = meta.New()
var b []byte var b []byte
buf := bytes.NewBuffer(b) buf := bytes.NewBuffer(b)
e := NewEncoder(buf, fps) e := NewEncoder(buf, fps, Video)
Meta.Add("ts", "12345678") Meta.Add("ts", "12345678")
if err := e.writePSI(); err != nil { if err := e.writePSI(); err != nil {
t.Errorf(errUnexpectedErr, err.Error()) t.Errorf(errUnexpectedErr, err.Error())
@ -78,7 +78,7 @@ func TestMetaEncode2(t *testing.T) {
Meta = meta.New() Meta = meta.New()
var b []byte var b []byte
buf := bytes.NewBuffer(b) buf := bytes.NewBuffer(b)
e := NewEncoder(buf, fps) e := NewEncoder(buf, fps, Video)
Meta.Add("ts", "12345678") Meta.Add("ts", "12345678")
Meta.Add("loc", "1234,4321,1234") Meta.Add("loc", "1234,4321,1234")
if err := e.writePSI(); err != nil { if err := e.writePSI(); err != nil {

View File

@ -74,7 +74,7 @@ const (
) )
/* /*
The below data struct encapsulates the fields of an MPEG-TS packet. Below is Packet encapsulates the fields of an MPEG-TS packet. Below is
the formatting of an MPEG-TS packet for reference! the formatting of an MPEG-TS packet for reference!
MPEG-TS Packet Formatting MPEG-TS Packet Formatting
@ -196,7 +196,11 @@ func FindPid(d []byte, pid uint16) (pkt []byte, i int, err error) {
func (p *Packet) FillPayload(data []byte) int { func (p *Packet) FillPayload(data []byte) int {
currentPktLen := 6 + asInt(p.PCRF)*6 + asInt(p.OPCRF)*6 + currentPktLen := 6 + asInt(p.PCRF)*6 + asInt(p.OPCRF)*6 +
asInt(p.SPF)*1 + asInt(p.TPDF)*1 + len(p.TPD) asInt(p.SPF)*1 + asInt(p.TPDF)*1 + len(p.TPD)
if len(data) > PayloadSize-currentPktLen {
p.Payload = make([]byte, PayloadSize-currentPktLen) p.Payload = make([]byte, PayloadSize-currentPktLen)
} else {
p.Payload = make([]byte, len(data))
}
return copy(p.Payload, data) return copy(p.Payload, data)
} }
@ -214,7 +218,7 @@ func asByte(b bool) byte {
return 0 return 0
} }
// ToByteSlice interprets the fields of the ts packet instance and outputs a // Bytes interprets the fields of the ts packet instance and outputs a
// corresponding byte slice // corresponding byte slice
func (p *Packet) Bytes(buf []byte) []byte { func (p *Packet) Bytes(buf []byte) []byte {
if buf == nil || cap(buf) != PacketSize { if buf == nil || cap(buf) != PacketSize {

View File

@ -26,7 +26,7 @@ LICENSE
package pes package pes
const MaxPesSize = 10000 const MaxPesSize = 64 * 1 << 10
/* /*
The below data struct encapsulates the fields of an PES packet. Below is The below data struct encapsulates the fields of an PES packet. Below is

View File

@ -259,7 +259,7 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.W
} }
func newMtsEncoder(dst io.Writer, fps int) (io.Writer, error) { func newMtsEncoder(dst io.Writer, fps int) (io.Writer, error) {
e := mts.NewEncoder(dst, float64(fps)) e := mts.NewEncoder(dst, float64(fps), mts.Video)
return e, nil return e, nil
} }

View File

@ -111,7 +111,7 @@ func TestMtsSenderSegment(t *testing.T) {
tstDst := &destination{} tstDst := &destination{}
loadSender := newMtsSender(tstDst, log) loadSender := newMtsSender(tstDst, log)
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
encoder := mts.NewEncoder((*buffer)(rb), 25) encoder := mts.NewEncoder((*buffer)(rb), 25, mts.Video)
// Turn time based PSI writing off for encoder. // Turn time based PSI writing off for encoder.
const psiSendCount = 10 const psiSendCount = 10
@ -195,7 +195,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
tstDst := &destination{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity} tstDst := &destination{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity}
loadSender := newMtsSender(tstDst, log) loadSender := newMtsSender(tstDst, log)
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
encoder := mts.NewEncoder((*buffer)(rb), 25) encoder := mts.NewEncoder((*buffer)(rb), 25, mts.Video)
// Turn time based PSI writing off for encoder. // Turn time based PSI writing off for encoder.
const psiSendCount = 10 const psiSendCount = 10