generator: clean up MPEG-TS encoding

This deletes or marks unused cruft, removes internal chans and makes
interface chans single item long to make way for removal.
This commit is contained in:
Dan Kortschak 2018-08-18 11:27:36 +09:30
parent 05b9416aa4
commit ddb78a6234
4 changed files with 153 additions and 173 deletions

View File

@ -6,6 +6,7 @@ DESCRIPTION
See Readme.md See Readme.md
AUTHOR AUTHOR
Dan Kortschak <dan@ausocean.org>
Saxon Nelson-Milton <saxon@ausocean.org> Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE LICENSE
@ -24,6 +25,7 @@ LICENSE
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/ */
package generator package generator
import ( import (
@ -39,15 +41,12 @@ var (
pmtTable = []byte{0, 2, 176, 18, 0, 1, 193, 0, 0, 0xE1, 0x00, 0xF0, 0, 0x1B, 0xE1, 0, 0xF0, 0, 0x15, 0xBD, 0x4D, 0x56} pmtTable = []byte{0, 2, 176, 18, 0, 1, 193, 0, 0, 0xE1, 0x00, 0xF0, 0, 0x1B, 0xE1, 0, 0xF0, 0, 0x15, 0xBD, 0x4D, 0x56}
) )
// genPatAndPmt generates the rest of the pat and pmt tables i.e. fills them func init() {
// with 0xFFs - because it looks ugly to hardcode above. This is called through
// NewMpegtsgenerator
func genPatAndPmt() {
for len(patTable) < psiPacketSize { for len(patTable) < psiPacketSize {
patTable = append(patTable, 255) patTable = append(patTable, 0xff)
} }
for len(pmtTable) < psiPacketSize { for len(pmtTable) < psiPacketSize {
pmtTable = append(pmtTable, 255) pmtTable = append(pmtTable, 0xff)
} }
} }
@ -56,30 +55,46 @@ const (
PatPid = 0 PatPid = 0
pmtPid = 4096 pmtPid = 4096
videoPid = 256 videoPid = 256
streamID = 0xE0 streamID = 0xe0
outputChanSize = 100 ptsOffset = 0.7
inputChanSize = 10000 maxCC = 0xf
pesPktChanSize = 1000
payloadByteChanSize = 200000
ptsOffset = .7
maxCC = 15
) )
// tsGenerator encapsulates properties of an mpegts generator. // tsGenerator encapsulates properties of an mpegts generator.
type tsGenerator struct { type tsGenerator struct {
outputChan chan []byte outputChan chan []byte
nalInputChan chan []byte nalInputChan chan []byte
currentTsPacket *mpegts.MpegTsPacket
payloadByteChan chan byte
currentCC byte
currentPtsTime float64 currentPtsTime float64
currentPcrTime float64 currentPcrTime float64
fps uint fps float64
pesPktChan chan []byte continuity map[int]byte
ccMap map[int]int
isGenerating bool isGenerating bool
} }
// NewTsGenerator returns an instance of the tsGenerator struct
func NewTsGenerator(fps float64) (g *tsGenerator) {
return &tsGenerator{
outputChan: make(chan []byte, 1),
nalInputChan: make(chan []byte, 1),
fps: fps,
currentPtsTime: ptsOffset,
continuity: map[int]byte{
SdtPid: 0, // FIXME(kortschak): This is not used.
PatPid: 0,
pmtPid: 0,
videoPid: 0,
},
}
}
// Start is called when we would like generation to begin, i.e. we would like
// the generator to start taking input data and creating mpegts packets
func (g *tsGenerator) Start() {
go g.generate()
}
func (g *tsGenerator) Stop() {}
// InputChan returns a handle to the nalInputChan (inputChan) so that nal units // InputChan returns a handle to the nalInputChan (inputChan) so that nal units
// can be passed to the generator and processed // can be passed to the generator and processed
func (g *tsGenerator) InputChan() chan []byte { func (g *tsGenerator) InputChan() chan []byte {
@ -92,117 +107,79 @@ func (g *tsGenerator) OutputChan() <-chan []byte {
return g.outputChan return g.outputChan
} }
// NewTsGenerator returns an instance of the tsGenerator struct
func NewTsGenerator(fps uint) (g *tsGenerator) {
g = new(tsGenerator)
g.outputChan = make(chan []byte, outputChanSize)
g.nalInputChan = make(chan []byte, inputChanSize)
g.currentCC = 0
g.fps = fps
g.currentPcrTime = 0.0
g.currentPtsTime = ptsOffset
g.pesPktChan = make(chan []byte, pesPktChanSize)
g.payloadByteChan = make(chan byte, payloadByteChanSize)
g.ccMap = make(map[int]int, 4)
g.ccMap[SdtPid] = 0
g.ccMap[PatPid] = 0
g.ccMap[pmtPid] = 0
g.ccMap[videoPid] = 0
genPatAndPmt()
g.isGenerating = false
return
}
// getPts retuns the next presentation timestamp for the tsGenerator t.
func (g *tsGenerator) genPts() (pts uint64) {
pts = uint64(g.currentPtsTime * float64(90000))
g.currentPtsTime += 1.0 / float64(g.fps)
return
}
// genPcr returns the next program clock reference for the tsGenerator g
func (g *tsGenerator) genPcr() (pcr uint64) {
pcr = uint64(g.currentPcrTime * float64(90000))
g.currentPcrTime += 1.0 / float64(g.fps)
return
}
// Start is called when we would like generation to begin, i.e. we would like
// the generator to start taking input data and creating mpegts packets
func (g *tsGenerator) Start() {
g.isGenerating = true
go g.generate()
}
func (g *tsGenerator) Stop() {
g.isGenerating = false
}
// getCC returns the next continuity counter for a particular pid
func (g *tsGenerator) getCC(pid int) int {
temp := g.ccMap[pid]
if g.ccMap[pid]++; g.ccMap[pid] > maxCC {
g.ccMap[pid] = 0
}
return temp
}
// generate handles the incoming data and generates equivalent mpegts packets - // generate handles the incoming data and generates equivalent mpegts packets -
// sending them to the output channel // sending them to the output channel
func (g *tsGenerator) generate() { func (g *tsGenerator) generate() {
for { for {
select { nalu := <-g.nalInputChan
case nalUnit := <-g.nalInputChan: pesPkt := pes.Packet{
pesPkt := pes.PESPacket{
StreamID: streamID, StreamID: streamID,
PDI: byte(2), PDI: 2,
PTS: g.genPts(), PTS: g.pts(),
Data: nalUnit, Data: nalu,
HeaderLength: 5, HeaderLength: 5,
} }
pesPktSlice := pesPkt.ToByteSlice() buf := pesPkt.Bytes()
for ii := range pesPktSlice {
g.payloadByteChan <- pesPktSlice[ii]
}
pusi := true pusi := true
for len(g.payloadByteChan) > 0 { for len(buf) != 0 {
pkt := mpegts.MpegTsPacket{ pkt := mpegts.Packet{
PUSI: pusi, PUSI: pusi,
PID: videoPid, PID: videoPid,
RAI: pusi, RAI: pusi,
CC: byte(g.getCC(videoPid)), CC: g.ccFor(videoPid),
AFC: byte(3), AFC: 3,
PCRF: pusi, PCRF: pusi,
} }
pkt.FillPayload(g.payloadByteChan) n := pkt.FillPayload(buf)
buf = buf[n:]
// TODO: create consts for AFC parameters // TODO: create consts for AFC parameters
if pusi { if pusi {
// Create pat table // Create pat table
patPkt := mpegts.MpegTsPacket{ patPkt := mpegts.Packet{
PUSI: pusi, PUSI: pusi,
PID: PatPid, PID: PatPid,
CC: byte(g.getCC(PatPid)), CC: g.ccFor(PatPid),
AFC: 1, AFC: 1,
Payload: patTable, Payload: patTable,
} }
g.outputChan <- patPkt.ToByteSlice() g.outputChan <- patPkt.Bytes()
// Create pmt table // Create pmt table
pmtPkt := mpegts.MpegTsPacket{ pmtPkt := mpegts.Packet{
PUSI: pusi, PUSI: pusi,
PID: pmtPid, PID: pmtPid,
CC: byte(g.getCC(pmtPid)), CC: g.ccFor(pmtPid),
AFC: 1, AFC: 1,
Payload: pmtTable, Payload: pmtTable,
} }
g.outputChan <- pmtPkt.ToByteSlice() g.outputChan <- pmtPkt.Bytes()
// If pusi then we need to gen a pcr // If pusi then we need to gen a pcr
pkt.PCR = g.genPcr() pkt.PCR = g.pcr()
pusi = false pusi = false
} }
g.outputChan <- pkt.ToByteSlice() g.outputChan <- pkt.Bytes()
} }
} }
} }
// pts retuns the next presentation timestamp.
func (g *tsGenerator) pts() uint64 {
pts := uint64(g.currentPtsTime * 90000) // FIXME(kortschak): Name this magic number.
g.currentPtsTime += 1 / g.fps
return pts
}
// pcr returns the next program clock reference.
func (g *tsGenerator) pcr() uint64 {
pcr := uint64(g.currentPcrTime * 90000) // FIXME(kortschak): Name this magic number.
g.currentPcrTime += 1 / g.fps
return pcr
}
// ccFor returns the next continuity counter for pid.
func (g *tsGenerator) ccFor(pid int) byte {
cc := g.continuity[pid]
g.continuity[pid] = (cc + 1) & maxCC
return cc
} }

View File

@ -28,8 +28,6 @@ LICENSE
package mpegts package mpegts
import "bitbucket.org/ausocean/av/tools"
const ( const (
mpegTsSize = 188 mpegTsSize = 188
mpegtsPayloadSize = 176 mpegtsPayloadSize = 176
@ -100,7 +98,7 @@ the formatting of an MPEG-TS packet for reference!
| - | ... | | - | ... |
---------------------------------------------------------------------------- ----------------------------------------------------------------------------
*/ */
type MpegTsPacket struct { type Packet struct {
TEI bool // Transport Error Indicator TEI bool // Transport Error Indicator
PUSI bool // Payload Unit Start Indicator PUSI bool // Payload Unit Start Indicator
Priority bool // Tranposrt priority indicator Priority bool // Tranposrt priority indicator
@ -127,27 +125,32 @@ type MpegTsPacket struct {
// FillPayload takes a channel and fills the packets Payload field until the // FillPayload takes a channel and fills the packets Payload field until the
// channel is empty or we've the packet reaches capacity // channel is empty or we've the packet reaches capacity
func (p *MpegTsPacket) FillPayload(channel chan byte) { func (p *Packet) FillPayload(data []byte) int {
p.Payload = make([]byte, 0, mpegtsPayloadSize) currentPktLength := 6 + asInt(p.PCRF)*6 + asInt(p.OPCRF)*6 +
currentPktLength := 6 + int(toByte(p.PCRF))*6 + int(toByte(p.OPCRF))*6 + asInt(p.SPF)*1 + asInt(p.TPDF)*1 + len(p.TPD)
int(toByte(p.SPF))*1 + int(toByte(p.TPDF))*1 + len(p.TPD) p.Payload = make([]byte, mpegtsPayloadSize-currentPktLength)
// While we're within the mpegts packet size and we still have data we can use return copy(p.Payload, data)
for (currentPktLength+len(p.Payload)) < mpegTsSize && len(channel) > 0 {
p.Payload = append(p.Payload, <-channel)
}
} }
// toByte is a simple wrapper function for tools.BoolToByte which takes a bool func asInt(b bool) int {
// and returns an equivalent byte if b {
func toByte(b bool) byte { return 1
return tools.BoolToByte(b) }
return 0
}
func asByte(b bool) byte {
if b {
return 1
}
return 0
} }
// ToByteSlice interprets the fields of the ts packet instance and outputs a // ToByteSlice interprets the fields of the ts packet instance and outputs a
// corresponding byte slice // corresponding byte slice
func (p *MpegTsPacket) ToByteSlice() (output []byte) { func (p *Packet) Bytes() []byte {
stuffingLength := 182 - len(p.Payload) - len(p.TPD) - int(toByte(p.PCRF))*6 - stuffingLength := 182 - len(p.Payload) - len(p.TPD) - asInt(p.PCRF)*6 -
int(toByte(p.OPCRF))*6 - int(toByte(p.SPF)) asInt(p.OPCRF)*6 - asInt(p.SPF)
var stuffing []byte var stuffing []byte
if stuffingLength > 0 { if stuffingLength > 0 {
stuffing = make([]byte, stuffingLength) stuffing = make([]byte, stuffingLength)
@ -155,36 +158,36 @@ func (p *MpegTsPacket) ToByteSlice() (output []byte) {
for i := range stuffing { for i := range stuffing {
stuffing[i] = 0xFF stuffing[i] = 0xFF
} }
afl := 1 + int(toByte(p.PCRF))*6 + int(toByte(p.OPCRF))*6 + int(toByte(p.SPF)) + int(toByte(p.TPDF)) + len(p.TPD) + len(stuffing) afl := 1 + asInt(p.PCRF)*6 + asInt(p.OPCRF)*6 + asInt(p.SPF) + asInt(p.TPDF) + len(p.TPD) + len(stuffing)
output = make([]byte, 0, mpegTsSize) buf := make([]byte, 0, mpegTsSize)
output = append(output, []byte{ buf = append(buf, []byte{
0x47, 0x47,
(toByte(p.TEI)<<7 | toByte(p.PUSI)<<6 | toByte(p.Priority)<<5 | byte((p.PID&0xFF00)>>8)), (asByte(p.TEI)<<7 | asByte(p.PUSI)<<6 | asByte(p.Priority)<<5 | byte((p.PID&0xFF00)>>8)),
byte(p.PID & 0x00FF), byte(p.PID & 0x00FF),
(p.TSC<<6 | p.AFC<<4 | p.CC), (p.TSC<<6 | p.AFC<<4 | p.CC),
}...) }...)
if p.AFC == 3 || p.AFC == 2 { if p.AFC == 3 || p.AFC == 2 {
output = append(output, []byte{ buf = append(buf, []byte{
byte(afl), (toByte(p.DI)<<7 | toByte(p.RAI)<<6 | toByte(p.ESPI)<<5 | byte(afl), (asByte(p.DI)<<7 | asByte(p.RAI)<<6 | asByte(p.ESPI)<<5 |
toByte(p.PCRF)<<4 | toByte(p.OPCRF)<<3 | toByte(p.SPF)<<2 | asByte(p.PCRF)<<4 | asByte(p.OPCRF)<<3 | asByte(p.SPF)<<2 |
toByte(p.TPDF)<<1 | toByte(p.AFEF)), asByte(p.TPDF)<<1 | asByte(p.AFEF)),
}...) }...)
for i := 40; p.PCRF && i >= 0; i -= 8 { for i := 40; p.PCRF && i >= 0; i -= 8 {
output = append(output, byte((p.PCR<<15)>>uint(i))) buf = append(buf, byte((p.PCR<<15)>>uint(i)))
} }
for i := 40; p.OPCRF && i >= 0; i -= 8 { for i := 40; p.OPCRF && i >= 0; i -= 8 {
output = append(output, byte(p.OPCR>>uint(i))) buf = append(buf, byte(p.OPCR>>uint(i)))
} }
if p.SPF { if p.SPF {
output = append(output, p.SC) buf = append(buf, p.SC)
} }
if p.TPDF { if p.TPDF {
output = append(output, append([]byte{p.TPDL}, p.TPD...)...) buf = append(buf, append([]byte{p.TPDL}, p.TPD...)...)
} }
output = append(output, p.Ext...) buf = append(buf, p.Ext...)
output = append(output, stuffing...) buf = append(buf, stuffing...)
} }
output = append(output, p.Payload...) buf = append(buf, p.Payload...)
return return buf
} }

View File

@ -70,7 +70,7 @@ the formatting of a PES packet for reference!
---------------------------------------------------------------------------- ----------------------------------------------------------------------------
*/ */
// TODO: add DSMTM, ACI, CRC, Ext fields // TODO: add DSMTM, ACI, CRC, Ext fields
type PESPacket struct { type Packet struct {
StreamID byte // Type of stream StreamID byte // Type of stream
Length uint16 // Pes packet length in bytes after this field Length uint16 // Pes packet length in bytes after this field
SC byte // Scrambling control SC byte // Scrambling control
@ -94,9 +94,9 @@ type PESPacket struct {
Data []byte // Pes packet data Data []byte // Pes packet data
} }
func (p *PESPacket) ToByteSlice() (output []byte) { func (p *Packet) Bytes() []byte {
output = make([]byte, 0, maxPesSize) buf := make([]byte, 0, maxPesSize)
output = append(output, []byte{ buf = append(buf, []byte{
0x00, 0x00, 0x01, 0x00, 0x00, 0x01,
p.StreamID, p.StreamID,
byte((p.Length & 0xFF00) >> 8), byte((p.Length & 0xFF00) >> 8),
@ -110,7 +110,7 @@ func (p *PESPacket) ToByteSlice() (output []byte) {
if p.PDI == byte(2) { if p.PDI == byte(2) {
pts := 0x2100010001 | (p.PTS&0x1C0000000)<<3 | (p.PTS&0x3FFF8000)<<2 | pts := 0x2100010001 | (p.PTS&0x1C0000000)<<3 | (p.PTS&0x3FFF8000)<<2 |
(p.PTS&0x7FFF)<<1 (p.PTS&0x7FFF)<<1
output = append(output, []byte{ buf = append(buf, []byte{
byte((pts & 0xFF00000000) >> 32), byte((pts & 0xFF00000000) >> 32),
byte((pts & 0x00FF000000) >> 24), byte((pts & 0x00FF000000) >> 24),
byte((pts & 0x0000FF0000) >> 16), byte((pts & 0x0000FF0000) >> 16),
@ -118,6 +118,6 @@ func (p *PESPacket) ToByteSlice() (output []byte) {
byte(pts & 0x00000000FF), byte(pts & 0x00000000FF),
}...) }...)
} }
output = append(output, append(p.Stuff, p.Data...)...) buf = append(buf, append(p.Stuff, p.Data...)...)
return return buf
} }

View File

@ -209,7 +209,7 @@ func (r *Revid) reset(config Config) error {
case Mpegts: case Mpegts:
r.Log(Info, "Using MPEGTS packetisation") r.Log(Info, "Using MPEGTS packetisation")
frameRate, _ := strconv.Atoi(r.config.FrameRate) frameRate, _ := strconv.Atoi(r.config.FrameRate)
r.generator = generator.NewTsGenerator(uint(frameRate)) r.generator = generator.NewTsGenerator(float64(frameRate))
case Flv: case Flv:
r.Log(Info, "Using FLV packetisation") r.Log(Info, "Using FLV packetisation")
frameRate, _ := strconv.Atoi(r.config.FrameRate) frameRate, _ := strconv.Atoi(r.config.FrameRate)