diff --git a/generator/mpegts_generator.go b/generator/mpegts_generator.go index 6917516b..0c4a1811 100644 --- a/generator/mpegts_generator.go +++ b/generator/mpegts_generator.go @@ -6,10 +6,11 @@ DESCRIPTION See Readme.md AUTHOR + Dan Kortschak Saxon Nelson-Milton LICENSE - mpegts_generator.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + mpegts_generator.go is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the @@ -22,11 +23,17 @@ LICENSE for more details. You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ + package generator import ( + "encoding/binary" + "hash/crc32" + "math/bits" + "time" + "bitbucket.org/ausocean/av/mpegts" "bitbucket.org/ausocean/av/pes" ) @@ -35,51 +42,156 @@ const psiPacketSize = 184 // TODO: really need to finish the at and pmt stuff - this is too hacky var ( - patTable = []byte{0, 0, 176, 13, 0, 1, 193, 0, 0, 0, 1, 240, 0, 42, 177, 4, 178} - pmtTable = []byte{0, 2, 176, 18, 0, 1, 193, 0, 0, 0xE1, 0x00, 0xF0, 0, 0x1B, 0xE1, 0, 0xF0, 0, 0x15, 0xBD, 0x4D, 0x56} + patTable = []byte{ + 0x00, // pointer + + // ---- section included in data sent to CRC32 during check + // table header + 0x00, // table id + 0xb0, // section syntax indicator:1|private bit:1|reserved:2|section length:2|more bytes...:2 + 0x0d, // more bytes... + + // syntax section + 0x00, 0x01, // table id extension + 0xc1, // reserved bits:3|version:5|use now:1 + 0x00, // section number + 0x00, // last section number + // table data + 0x00, 0x01, // Program number + 0xf0, 0x00, // reserved:3|program map PID:13 + + // 0x2a, 0xb1, 0x04, 0xb2, // CRC + // ---- + } + pmtTable = []byte{ + 0x00, // pointer + + // ---- section included in data sent to CRC32 during check + // table header + 0x02, // table id + 0xb0, // section syntax indicator:1|private bit:1|reserved:2|section length:2|more bytes...:2 + 0x12, // more bytes... + + // syntax section + 0x00, 0x01, // table id extension + 0xc1, // reserved bits:3|version:5|use now:1 + 0x00, // section number + 0x00, // last section number + // table data + 0xe1, 0x00, // reserved:3|PCR PID:13 + 0xf0, 0x00, // reserved:4|unused:2|program info length:10 + // No program descriptors since program info length is 0. + // elementary stream info data + 0x1b, // stream type + 0xe1, 0x00, // reserved:3|elementary PID:13 + 0xf0, 0x00, // reserved:4|unused:2|ES info length:10 + // No elementary stream descriptors since ES info length is 0. + + // 0x15, 0xbd, 0x4d, 0x56, // CRC + // ---- + } ) -// genPatAndPmt generates the rest of the pat and pmt tables i.e. fills them -// with 0xFFs - because it looks ugly to hardcode above. This is called through -// NewMpegtsgenerator -func genPatAndPmt() { - for len(patTable) < psiPacketSize { - patTable = append(patTable, 255) +func init() { + // Generate IEEE polynomial table + // for the big-endian algorithm. + crcTable := crc32_MakeTable(bits.Reverse32(crc32.IEEE)) + + patTable = completePSI(patTable, crcTable) + pmtTable = completePSI(pmtTable, crcTable) +} + +func completePSI(psi []byte, tab *crc32.Table) []byte { + var buf [4]byte + crc := crc32_Update(0xffffffff, tab, psi[1:]) + binary.BigEndian.PutUint32(buf[:], crc) + dst := make([]byte, len(psi), psiPacketSize) + copy(dst, psi) + dst = append(dst, buf[:]...) + for len(dst) < cap(dst) { + dst = append(dst, 0xff) } - for len(pmtTable) < psiPacketSize { - pmtTable = append(pmtTable, 255) + return dst +} + +func crc32_MakeTable(poly uint32) *crc32.Table { + var t crc32.Table + for i := range t { + crc := uint32(i) << 24 + for j := 0; j < 8; j++ { + if crc&0x80000000 != 0 { + crc = (crc << 1) ^ poly + } else { + crc <<= 1 + } + } + t[i] = crc } + return &t +} + +func crc32_Update(crc uint32, tab *crc32.Table, p []byte) uint32 { + for _, v := range p { + crc = tab[byte(crc>>24)^v] ^ (crc << 8) + } + return crc } const ( - SdtPid = 17 - PatPid = 0 - pmtPid = 4096 - videoPid = 256 - streamID = 0xE0 - outputChanSize = 100 - inputChanSize = 10000 - pesPktChanSize = 1000 - payloadByteChanSize = 200000 - ptsOffset = .7 - maxCC = 15 + sdtPid = 17 + patPid = 0 + pmtPid = 4096 + videoPid = 256 + streamID = 0xe0 // First video stream ID. +) + +// Time related constants. +const ( + // ptsOffset is the offset added to the clock to determine + // the current presentation timestamp, + ptsOffset = 700 * time.Millisecond + + // pcrFreq is the base Program Clock Reference frequency. + pcrFreq = 90000 // Hz ) // tsGenerator encapsulates properties of an mpegts generator. type tsGenerator struct { - outputChan chan []byte - nalInputChan chan []byte - currentTsPacket *mpegts.MpegTsPacket - payloadByteChan chan byte - currentCC byte - currentPtsTime float64 - currentPcrTime float64 - fps uint - pesPktChan chan []byte - ccMap map[int]int - isGenerating bool + outputChan chan []byte + nalInputChan chan []byte + + clock time.Duration + frameInterval time.Duration + ptsOffset time.Duration + + continuity map[int]byte } +// NewTsGenerator returns an instance of the tsGenerator struct +func NewTsGenerator(fps float64) (g *tsGenerator) { + return &tsGenerator{ + outputChan: make(chan []byte, 1), + nalInputChan: make(chan []byte, 1), + + frameInterval: time.Duration(float64(time.Second) / fps), + ptsOffset: ptsOffset, + + continuity: map[int]byte{ + patPid: 0, + pmtPid: 0, + videoPid: 0, + }, + } +} + +// Start is called when we would like generation to begin, i.e. we would like +// the generator to start taking input data and creating mpegts packets +func (g *tsGenerator) Start() { + go g.generate() +} + +func (g *tsGenerator) Stop() {} + // InputChan returns a handle to the nalInputChan (inputChan) so that nal units // can be passed to the generator and processed func (g *tsGenerator) InputChan() chan []byte { @@ -92,117 +204,98 @@ func (g *tsGenerator) OutputChan() <-chan []byte { return g.outputChan } -// NewTsGenerator returns an instance of the tsGenerator struct -func NewTsGenerator(fps uint) (g *tsGenerator) { - g = new(tsGenerator) - g.outputChan = make(chan []byte, outputChanSize) - g.nalInputChan = make(chan []byte, inputChanSize) - g.currentCC = 0 - g.fps = fps - g.currentPcrTime = 0.0 - g.currentPtsTime = ptsOffset - g.pesPktChan = make(chan []byte, pesPktChanSize) - g.payloadByteChan = make(chan byte, payloadByteChanSize) - g.ccMap = make(map[int]int, 4) - g.ccMap[SdtPid] = 0 - g.ccMap[PatPid] = 0 - g.ccMap[pmtPid] = 0 - g.ccMap[videoPid] = 0 - genPatAndPmt() - g.isGenerating = false - return -} +const ( + hasPayload = 0x1 + hasAdaptationField = 0x2 +) -// getPts retuns the next presentation timestamp for the tsGenerator t. -func (g *tsGenerator) genPts() (pts uint64) { - pts = uint64(g.currentPtsTime * float64(90000)) - g.currentPtsTime += 1.0 / float64(g.fps) - return -} - -// genPcr returns the next program clock reference for the tsGenerator g -func (g *tsGenerator) genPcr() (pcr uint64) { - pcr = uint64(g.currentPcrTime * float64(90000)) - g.currentPcrTime += 1.0 / float64(g.fps) - return -} - -// Start is called when we would like generation to begin, i.e. we would like -// the generator to start taking input data and creating mpegts packets -func (g *tsGenerator) Start() { - g.isGenerating = true - go g.generate() -} - -func (g *tsGenerator) Stop() { - g.isGenerating = false -} - -// getCC returns the next continuity counter for a particular pid -func (g *tsGenerator) getCC(pid int) int { - temp := g.ccMap[pid] - if g.ccMap[pid]++; g.ccMap[pid] > maxCC { - g.ccMap[pid] = 0 - } - return temp -} +const ( + hasDTS = 0x1 + hasPTS = 0x2 +) // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel func (g *tsGenerator) generate() { for { - select { - case nalUnit := <-g.nalInputChan: - pesPkt := pes.PESPacket{ - StreamID: streamID, - PDI: byte(2), - PTS: g.genPts(), - Data: nalUnit, - HeaderLength: 5, - } - pesPktSlice := pesPkt.ToByteSlice() + nalu := <-g.nalInputChan - for ii := range pesPktSlice { - g.payloadByteChan <- pesPktSlice[ii] - } - pusi := true - for len(g.payloadByteChan) > 0 { - pkt := mpegts.MpegTsPacket{ - PUSI: pusi, - PID: videoPid, - RAI: pusi, - CC: byte(g.getCC(videoPid)), - AFC: byte(3), - PCRF: pusi, - } - pkt.FillPayload(g.payloadByteChan) - - // TODO: create consts for AFC parameters - if pusi { - // Create pat table - patPkt := mpegts.MpegTsPacket{ - PUSI: pusi, - PID: PatPid, - CC: byte(g.getCC(PatPid)), - AFC: 1, - Payload: patTable, - } - g.outputChan <- patPkt.ToByteSlice() - // Create pmt table - pmtPkt := mpegts.MpegTsPacket{ - PUSI: pusi, - PID: pmtPid, - CC: byte(g.getCC(pmtPid)), - AFC: 1, - Payload: pmtTable, - } - g.outputChan <- pmtPkt.ToByteSlice() - // If pusi then we need to gen a pcr - pkt.PCR = g.genPcr() - pusi = false - } - g.outputChan <- pkt.ToByteSlice() - } + // Write PAT + patPkt := mpegts.Packet{ + PUSI: true, + PID: patPid, + CC: g.ccFor(patPid), + AFC: hasPayload, + Payload: patTable, } + g.outputChan <- patPkt.Bytes() + + // Write PMT. + pmtPkt := mpegts.Packet{ + PUSI: true, + PID: pmtPid, + CC: g.ccFor(pmtPid), + AFC: hasPayload, + Payload: pmtTable, + } + g.outputChan <- pmtPkt.Bytes() + + // Prepare PES data. + pesPkt := pes.Packet{ + StreamID: streamID, + PDI: hasPTS, + PTS: g.pts(), + Data: nalu, + HeaderLength: 5, + } + buf := pesPkt.Bytes() + + pusi := true + for len(buf) != 0 { + pkt := mpegts.Packet{ + PUSI: pusi, + PID: videoPid, + RAI: pusi, + CC: g.ccFor(videoPid), + AFC: hasAdaptationField | hasPayload, + PCRF: pusi, + } + n := pkt.FillPayload(buf) + buf = buf[n:] + + if pusi { + // If the packet has a Payload Unit Start Indicator + // flag set then we need to write a PCR. + pkt.PCR = g.pcr() + pusi = false + } + + g.outputChan <- pkt.Bytes() + } + + g.tick() } } + +// tick advances the clock one frame interval. +func (g *tsGenerator) tick() { + g.clock += g.frameInterval +} + +// pts retuns the current presentation timestamp. +func (g *tsGenerator) pts() uint64 { + return uint64((g.clock + g.ptsOffset).Seconds() * pcrFreq) +} + +// pcr returns the current program clock reference. +func (g *tsGenerator) pcr() uint64 { + return uint64(g.clock.Seconds() * pcrFreq) +} + +// ccFor returns the next continuity counter for pid. +func (g *tsGenerator) ccFor(pid int) byte { + cc := g.continuity[pid] + const continuityCounterMask = 0xf + g.continuity[pid] = (cc + 1) & continuityCounterMask + return cc +} diff --git a/mpegts/MpegTs.go b/mpegts/MpegTs.go index 0012ddd5..4b4a8a02 100644 --- a/mpegts/MpegTs.go +++ b/mpegts/MpegTs.go @@ -28,8 +28,6 @@ LICENSE package mpegts -import "bitbucket.org/ausocean/av/tools" - const ( mpegTsSize = 188 mpegtsPayloadSize = 176 @@ -100,7 +98,7 @@ the formatting of an MPEG-TS packet for reference! | - | ... | ---------------------------------------------------------------------------- */ -type MpegTsPacket struct { +type Packet struct { TEI bool // Transport Error Indicator PUSI bool // Payload Unit Start Indicator Priority bool // Tranposrt priority indicator @@ -127,27 +125,32 @@ type MpegTsPacket struct { // FillPayload takes a channel and fills the packets Payload field until the // channel is empty or we've the packet reaches capacity -func (p *MpegTsPacket) FillPayload(channel chan byte) { - p.Payload = make([]byte, 0, mpegtsPayloadSize) - currentPktLength := 6 + int(toByte(p.PCRF))*6 + int(toByte(p.OPCRF))*6 + - int(toByte(p.SPF))*1 + int(toByte(p.TPDF))*1 + len(p.TPD) - // While we're within the mpegts packet size and we still have data we can use - for (currentPktLength+len(p.Payload)) < mpegTsSize && len(channel) > 0 { - p.Payload = append(p.Payload, <-channel) - } +func (p *Packet) FillPayload(data []byte) int { + currentPktLength := 6 + asInt(p.PCRF)*6 + asInt(p.OPCRF)*6 + + asInt(p.SPF)*1 + asInt(p.TPDF)*1 + len(p.TPD) + p.Payload = make([]byte, mpegtsPayloadSize-currentPktLength) + return copy(p.Payload, data) } -// toByte is a simple wrapper function for tools.BoolToByte which takes a bool -// and returns an equivalent byte -func toByte(b bool) byte { - return tools.BoolToByte(b) +func asInt(b bool) int { + if b { + return 1 + } + return 0 +} + +func asByte(b bool) byte { + if b { + return 1 + } + return 0 } // ToByteSlice interprets the fields of the ts packet instance and outputs a // corresponding byte slice -func (p *MpegTsPacket) ToByteSlice() (output []byte) { - stuffingLength := 182 - len(p.Payload) - len(p.TPD) - int(toByte(p.PCRF))*6 - - int(toByte(p.OPCRF))*6 - int(toByte(p.SPF)) +func (p *Packet) Bytes() []byte { + stuffingLength := 182 - len(p.Payload) - len(p.TPD) - asInt(p.PCRF)*6 - + asInt(p.OPCRF)*6 - asInt(p.SPF) var stuffing []byte if stuffingLength > 0 { stuffing = make([]byte, stuffingLength) @@ -155,36 +158,36 @@ func (p *MpegTsPacket) ToByteSlice() (output []byte) { for i := range stuffing { stuffing[i] = 0xFF } - afl := 1 + int(toByte(p.PCRF))*6 + int(toByte(p.OPCRF))*6 + int(toByte(p.SPF)) + int(toByte(p.TPDF)) + len(p.TPD) + len(stuffing) - output = make([]byte, 0, mpegTsSize) - output = append(output, []byte{ + afl := 1 + asInt(p.PCRF)*6 + asInt(p.OPCRF)*6 + asInt(p.SPF) + asInt(p.TPDF) + len(p.TPD) + len(stuffing) + buf := make([]byte, 0, mpegTsSize) + buf = append(buf, []byte{ 0x47, - (toByte(p.TEI)<<7 | toByte(p.PUSI)<<6 | toByte(p.Priority)<<5 | byte((p.PID&0xFF00)>>8)), + (asByte(p.TEI)<<7 | asByte(p.PUSI)<<6 | asByte(p.Priority)<<5 | byte((p.PID&0xFF00)>>8)), byte(p.PID & 0x00FF), (p.TSC<<6 | p.AFC<<4 | p.CC), }...) if p.AFC == 3 || p.AFC == 2 { - output = append(output, []byte{ - byte(afl), (toByte(p.DI)<<7 | toByte(p.RAI)<<6 | toByte(p.ESPI)<<5 | - toByte(p.PCRF)<<4 | toByte(p.OPCRF)<<3 | toByte(p.SPF)<<2 | - toByte(p.TPDF)<<1 | toByte(p.AFEF)), + buf = append(buf, []byte{ + byte(afl), (asByte(p.DI)<<7 | asByte(p.RAI)<<6 | asByte(p.ESPI)<<5 | + asByte(p.PCRF)<<4 | asByte(p.OPCRF)<<3 | asByte(p.SPF)<<2 | + asByte(p.TPDF)<<1 | asByte(p.AFEF)), }...) for i := 40; p.PCRF && i >= 0; i -= 8 { - output = append(output, byte((p.PCR<<15)>>uint(i))) + buf = append(buf, byte((p.PCR<<15)>>uint(i))) } for i := 40; p.OPCRF && i >= 0; i -= 8 { - output = append(output, byte(p.OPCR>>uint(i))) + buf = append(buf, byte(p.OPCR>>uint(i))) } if p.SPF { - output = append(output, p.SC) + buf = append(buf, p.SC) } if p.TPDF { - output = append(output, append([]byte{p.TPDL}, p.TPD...)...) + buf = append(buf, append([]byte{p.TPDL}, p.TPD...)...) } - output = append(output, p.Ext...) - output = append(output, stuffing...) + buf = append(buf, p.Ext...) + buf = append(buf, stuffing...) } - output = append(output, p.Payload...) - return + buf = append(buf, p.Payload...) + return buf } diff --git a/pes/pes.go b/pes/pes.go index 82cccafe..01285981 100644 --- a/pes/pes.go +++ b/pes/pes.go @@ -70,7 +70,7 @@ the formatting of a PES packet for reference! ---------------------------------------------------------------------------- */ // TODO: add DSMTM, ACI, CRC, Ext fields -type PESPacket struct { +type Packet struct { StreamID byte // Type of stream Length uint16 // Pes packet length in bytes after this field SC byte // Scrambling control @@ -94,9 +94,9 @@ type PESPacket struct { Data []byte // Pes packet data } -func (p *PESPacket) ToByteSlice() (output []byte) { - output = make([]byte, 0, maxPesSize) - output = append(output, []byte{ +func (p *Packet) Bytes() []byte { + buf := make([]byte, 0, maxPesSize) + buf = append(buf, []byte{ 0x00, 0x00, 0x01, p.StreamID, byte((p.Length & 0xFF00) >> 8), @@ -110,7 +110,7 @@ func (p *PESPacket) ToByteSlice() (output []byte) { if p.PDI == byte(2) { pts := 0x2100010001 | (p.PTS&0x1C0000000)<<3 | (p.PTS&0x3FFF8000)<<2 | (p.PTS&0x7FFF)<<1 - output = append(output, []byte{ + buf = append(buf, []byte{ byte((pts & 0xFF00000000) >> 32), byte((pts & 0x00FF000000) >> 24), byte((pts & 0x0000FF0000) >> 16), @@ -118,6 +118,6 @@ func (p *PESPacket) ToByteSlice() (output []byte) { byte(pts & 0x00000000FF), }...) } - output = append(output, append(p.Stuff, p.Data...)...) - return + buf = append(buf, append(p.Stuff, p.Data...)...) + return buf } diff --git a/revid/revid.go b/revid/revid.go index 839a358b..40a916d2 100755 --- a/revid/revid.go +++ b/revid/revid.go @@ -209,7 +209,7 @@ func (r *Revid) reset(config Config) error { case Mpegts: r.Log(Info, "Using MPEGTS packetisation") frameRate, _ := strconv.Atoi(r.config.FrameRate) - r.generator = generator.NewTsGenerator(uint(frameRate)) + r.generator = generator.NewTsGenerator(float64(frameRate)) case Flv: r.Log(Info, "Using FLV packetisation") frameRate, _ := strconv.Atoi(r.config.FrameRate)