diff --git a/generator/mpegts_generator.go b/generator/mpegts_generator.go index 6917516b..88d129fb 100644 --- a/generator/mpegts_generator.go +++ b/generator/mpegts_generator.go @@ -6,6 +6,7 @@ DESCRIPTION See Readme.md AUTHOR + Dan Kortschak Saxon Nelson-Milton LICENSE @@ -22,8 +23,9 @@ LICENSE for more details. You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ + package generator import ( @@ -39,47 +41,60 @@ var ( pmtTable = []byte{0, 2, 176, 18, 0, 1, 193, 0, 0, 0xE1, 0x00, 0xF0, 0, 0x1B, 0xE1, 0, 0xF0, 0, 0x15, 0xBD, 0x4D, 0x56} ) -// genPatAndPmt generates the rest of the pat and pmt tables i.e. fills them -// with 0xFFs - because it looks ugly to hardcode above. This is called through -// NewMpegtsgenerator -func genPatAndPmt() { +func init() { for len(patTable) < psiPacketSize { - patTable = append(patTable, 255) + patTable = append(patTable, 0xff) } for len(pmtTable) < psiPacketSize { - pmtTable = append(pmtTable, 255) + pmtTable = append(pmtTable, 0xff) } } const ( - SdtPid = 17 - PatPid = 0 - pmtPid = 4096 - videoPid = 256 - streamID = 0xE0 - outputChanSize = 100 - inputChanSize = 10000 - pesPktChanSize = 1000 - payloadByteChanSize = 200000 - ptsOffset = .7 - maxCC = 15 + SdtPid = 17 + PatPid = 0 + pmtPid = 4096 + videoPid = 256 + streamID = 0xe0 + ptsOffset = 0.7 + maxCC = 0xf ) // tsGenerator encapsulates properties of an mpegts generator. type tsGenerator struct { - outputChan chan []byte - nalInputChan chan []byte - currentTsPacket *mpegts.MpegTsPacket - payloadByteChan chan byte - currentCC byte - currentPtsTime float64 - currentPcrTime float64 - fps uint - pesPktChan chan []byte - ccMap map[int]int - isGenerating bool + outputChan chan []byte + nalInputChan chan []byte + currentPtsTime float64 + currentPcrTime float64 + fps float64 + continuity map[int]byte + isGenerating bool } +// NewTsGenerator returns an instance of the tsGenerator struct +func NewTsGenerator(fps float64) (g *tsGenerator) { + return &tsGenerator{ + outputChan: make(chan []byte, 1), + nalInputChan: make(chan []byte, 1), + fps: fps, + currentPtsTime: ptsOffset, + continuity: map[int]byte{ + SdtPid: 0, // FIXME(kortschak): This is not used. + PatPid: 0, + pmtPid: 0, + videoPid: 0, + }, + } +} + +// Start is called when we would like generation to begin, i.e. we would like +// the generator to start taking input data and creating mpegts packets +func (g *tsGenerator) Start() { + go g.generate() +} + +func (g *tsGenerator) Stop() {} + // InputChan returns a handle to the nalInputChan (inputChan) so that nal units // can be passed to the generator and processed func (g *tsGenerator) InputChan() chan []byte { @@ -92,117 +107,79 @@ func (g *tsGenerator) OutputChan() <-chan []byte { return g.outputChan } -// NewTsGenerator returns an instance of the tsGenerator struct -func NewTsGenerator(fps uint) (g *tsGenerator) { - g = new(tsGenerator) - g.outputChan = make(chan []byte, outputChanSize) - g.nalInputChan = make(chan []byte, inputChanSize) - g.currentCC = 0 - g.fps = fps - g.currentPcrTime = 0.0 - g.currentPtsTime = ptsOffset - g.pesPktChan = make(chan []byte, pesPktChanSize) - g.payloadByteChan = make(chan byte, payloadByteChanSize) - g.ccMap = make(map[int]int, 4) - g.ccMap[SdtPid] = 0 - g.ccMap[PatPid] = 0 - g.ccMap[pmtPid] = 0 - g.ccMap[videoPid] = 0 - genPatAndPmt() - g.isGenerating = false - return -} - -// getPts retuns the next presentation timestamp for the tsGenerator t. -func (g *tsGenerator) genPts() (pts uint64) { - pts = uint64(g.currentPtsTime * float64(90000)) - g.currentPtsTime += 1.0 / float64(g.fps) - return -} - -// genPcr returns the next program clock reference for the tsGenerator g -func (g *tsGenerator) genPcr() (pcr uint64) { - pcr = uint64(g.currentPcrTime * float64(90000)) - g.currentPcrTime += 1.0 / float64(g.fps) - return -} - -// Start is called when we would like generation to begin, i.e. we would like -// the generator to start taking input data and creating mpegts packets -func (g *tsGenerator) Start() { - g.isGenerating = true - go g.generate() -} - -func (g *tsGenerator) Stop() { - g.isGenerating = false -} - -// getCC returns the next continuity counter for a particular pid -func (g *tsGenerator) getCC(pid int) int { - temp := g.ccMap[pid] - if g.ccMap[pid]++; g.ccMap[pid] > maxCC { - g.ccMap[pid] = 0 - } - return temp -} - // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel func (g *tsGenerator) generate() { for { - select { - case nalUnit := <-g.nalInputChan: - pesPkt := pes.PESPacket{ - StreamID: streamID, - PDI: byte(2), - PTS: g.genPts(), - Data: nalUnit, - HeaderLength: 5, - } - pesPktSlice := pesPkt.ToByteSlice() + nalu := <-g.nalInputChan + pesPkt := pes.Packet{ + StreamID: streamID, + PDI: 2, + PTS: g.pts(), + Data: nalu, + HeaderLength: 5, + } + buf := pesPkt.Bytes() - for ii := range pesPktSlice { - g.payloadByteChan <- pesPktSlice[ii] + pusi := true + for len(buf) != 0 { + pkt := mpegts.Packet{ + PUSI: pusi, + PID: videoPid, + RAI: pusi, + CC: g.ccFor(videoPid), + AFC: 3, + PCRF: pusi, } - pusi := true - for len(g.payloadByteChan) > 0 { - pkt := mpegts.MpegTsPacket{ - PUSI: pusi, - PID: videoPid, - RAI: pusi, - CC: byte(g.getCC(videoPid)), - AFC: byte(3), - PCRF: pusi, - } - pkt.FillPayload(g.payloadByteChan) + n := pkt.FillPayload(buf) + buf = buf[n:] - // TODO: create consts for AFC parameters - if pusi { - // Create pat table - patPkt := mpegts.MpegTsPacket{ - PUSI: pusi, - PID: PatPid, - CC: byte(g.getCC(PatPid)), - AFC: 1, - Payload: patTable, - } - g.outputChan <- patPkt.ToByteSlice() - // Create pmt table - pmtPkt := mpegts.MpegTsPacket{ - PUSI: pusi, - PID: pmtPid, - CC: byte(g.getCC(pmtPid)), - AFC: 1, - Payload: pmtTable, - } - g.outputChan <- pmtPkt.ToByteSlice() - // If pusi then we need to gen a pcr - pkt.PCR = g.genPcr() - pusi = false + // TODO: create consts for AFC parameters + if pusi { + // Create pat table + patPkt := mpegts.Packet{ + PUSI: pusi, + PID: PatPid, + CC: g.ccFor(PatPid), + AFC: 1, + Payload: patTable, } - g.outputChan <- pkt.ToByteSlice() + g.outputChan <- patPkt.Bytes() + // Create pmt table + pmtPkt := mpegts.Packet{ + PUSI: pusi, + PID: pmtPid, + CC: g.ccFor(pmtPid), + AFC: 1, + Payload: pmtTable, + } + g.outputChan <- pmtPkt.Bytes() + // If pusi then we need to gen a pcr + pkt.PCR = g.pcr() + pusi = false } + g.outputChan <- pkt.Bytes() } } } + +// pts retuns the next presentation timestamp. +func (g *tsGenerator) pts() uint64 { + pts := uint64(g.currentPtsTime * 90000) // FIXME(kortschak): Name this magic number. + g.currentPtsTime += 1 / g.fps + return pts +} + +// pcr returns the next program clock reference. +func (g *tsGenerator) pcr() uint64 { + pcr := uint64(g.currentPcrTime * 90000) // FIXME(kortschak): Name this magic number. + g.currentPcrTime += 1 / g.fps + return pcr +} + +// ccFor returns the next continuity counter for pid. +func (g *tsGenerator) ccFor(pid int) byte { + cc := g.continuity[pid] + g.continuity[pid] = (cc + 1) & maxCC + return cc +} diff --git a/mpegts/MpegTs.go b/mpegts/MpegTs.go index 0012ddd5..4b4a8a02 100644 --- a/mpegts/MpegTs.go +++ b/mpegts/MpegTs.go @@ -28,8 +28,6 @@ LICENSE package mpegts -import "bitbucket.org/ausocean/av/tools" - const ( mpegTsSize = 188 mpegtsPayloadSize = 176 @@ -100,7 +98,7 @@ the formatting of an MPEG-TS packet for reference! | - | ... | ---------------------------------------------------------------------------- */ -type MpegTsPacket struct { +type Packet struct { TEI bool // Transport Error Indicator PUSI bool // Payload Unit Start Indicator Priority bool // Tranposrt priority indicator @@ -127,27 +125,32 @@ type MpegTsPacket struct { // FillPayload takes a channel and fills the packets Payload field until the // channel is empty or we've the packet reaches capacity -func (p *MpegTsPacket) FillPayload(channel chan byte) { - p.Payload = make([]byte, 0, mpegtsPayloadSize) - currentPktLength := 6 + int(toByte(p.PCRF))*6 + int(toByte(p.OPCRF))*6 + - int(toByte(p.SPF))*1 + int(toByte(p.TPDF))*1 + len(p.TPD) - // While we're within the mpegts packet size and we still have data we can use - for (currentPktLength+len(p.Payload)) < mpegTsSize && len(channel) > 0 { - p.Payload = append(p.Payload, <-channel) - } +func (p *Packet) FillPayload(data []byte) int { + currentPktLength := 6 + asInt(p.PCRF)*6 + asInt(p.OPCRF)*6 + + asInt(p.SPF)*1 + asInt(p.TPDF)*1 + len(p.TPD) + p.Payload = make([]byte, mpegtsPayloadSize-currentPktLength) + return copy(p.Payload, data) } -// toByte is a simple wrapper function for tools.BoolToByte which takes a bool -// and returns an equivalent byte -func toByte(b bool) byte { - return tools.BoolToByte(b) +func asInt(b bool) int { + if b { + return 1 + } + return 0 +} + +func asByte(b bool) byte { + if b { + return 1 + } + return 0 } // ToByteSlice interprets the fields of the ts packet instance and outputs a // corresponding byte slice -func (p *MpegTsPacket) ToByteSlice() (output []byte) { - stuffingLength := 182 - len(p.Payload) - len(p.TPD) - int(toByte(p.PCRF))*6 - - int(toByte(p.OPCRF))*6 - int(toByte(p.SPF)) +func (p *Packet) Bytes() []byte { + stuffingLength := 182 - len(p.Payload) - len(p.TPD) - asInt(p.PCRF)*6 - + asInt(p.OPCRF)*6 - asInt(p.SPF) var stuffing []byte if stuffingLength > 0 { stuffing = make([]byte, stuffingLength) @@ -155,36 +158,36 @@ func (p *MpegTsPacket) ToByteSlice() (output []byte) { for i := range stuffing { stuffing[i] = 0xFF } - afl := 1 + int(toByte(p.PCRF))*6 + int(toByte(p.OPCRF))*6 + int(toByte(p.SPF)) + int(toByte(p.TPDF)) + len(p.TPD) + len(stuffing) - output = make([]byte, 0, mpegTsSize) - output = append(output, []byte{ + afl := 1 + asInt(p.PCRF)*6 + asInt(p.OPCRF)*6 + asInt(p.SPF) + asInt(p.TPDF) + len(p.TPD) + len(stuffing) + buf := make([]byte, 0, mpegTsSize) + buf = append(buf, []byte{ 0x47, - (toByte(p.TEI)<<7 | toByte(p.PUSI)<<6 | toByte(p.Priority)<<5 | byte((p.PID&0xFF00)>>8)), + (asByte(p.TEI)<<7 | asByte(p.PUSI)<<6 | asByte(p.Priority)<<5 | byte((p.PID&0xFF00)>>8)), byte(p.PID & 0x00FF), (p.TSC<<6 | p.AFC<<4 | p.CC), }...) if p.AFC == 3 || p.AFC == 2 { - output = append(output, []byte{ - byte(afl), (toByte(p.DI)<<7 | toByte(p.RAI)<<6 | toByte(p.ESPI)<<5 | - toByte(p.PCRF)<<4 | toByte(p.OPCRF)<<3 | toByte(p.SPF)<<2 | - toByte(p.TPDF)<<1 | toByte(p.AFEF)), + buf = append(buf, []byte{ + byte(afl), (asByte(p.DI)<<7 | asByte(p.RAI)<<6 | asByte(p.ESPI)<<5 | + asByte(p.PCRF)<<4 | asByte(p.OPCRF)<<3 | asByte(p.SPF)<<2 | + asByte(p.TPDF)<<1 | asByte(p.AFEF)), }...) for i := 40; p.PCRF && i >= 0; i -= 8 { - output = append(output, byte((p.PCR<<15)>>uint(i))) + buf = append(buf, byte((p.PCR<<15)>>uint(i))) } for i := 40; p.OPCRF && i >= 0; i -= 8 { - output = append(output, byte(p.OPCR>>uint(i))) + buf = append(buf, byte(p.OPCR>>uint(i))) } if p.SPF { - output = append(output, p.SC) + buf = append(buf, p.SC) } if p.TPDF { - output = append(output, append([]byte{p.TPDL}, p.TPD...)...) + buf = append(buf, append([]byte{p.TPDL}, p.TPD...)...) } - output = append(output, p.Ext...) - output = append(output, stuffing...) + buf = append(buf, p.Ext...) + buf = append(buf, stuffing...) } - output = append(output, p.Payload...) - return + buf = append(buf, p.Payload...) + return buf } diff --git a/pes/pes.go b/pes/pes.go index 82cccafe..01285981 100644 --- a/pes/pes.go +++ b/pes/pes.go @@ -70,7 +70,7 @@ the formatting of a PES packet for reference! ---------------------------------------------------------------------------- */ // TODO: add DSMTM, ACI, CRC, Ext fields -type PESPacket struct { +type Packet struct { StreamID byte // Type of stream Length uint16 // Pes packet length in bytes after this field SC byte // Scrambling control @@ -94,9 +94,9 @@ type PESPacket struct { Data []byte // Pes packet data } -func (p *PESPacket) ToByteSlice() (output []byte) { - output = make([]byte, 0, maxPesSize) - output = append(output, []byte{ +func (p *Packet) Bytes() []byte { + buf := make([]byte, 0, maxPesSize) + buf = append(buf, []byte{ 0x00, 0x00, 0x01, p.StreamID, byte((p.Length & 0xFF00) >> 8), @@ -110,7 +110,7 @@ func (p *PESPacket) ToByteSlice() (output []byte) { if p.PDI == byte(2) { pts := 0x2100010001 | (p.PTS&0x1C0000000)<<3 | (p.PTS&0x3FFF8000)<<2 | (p.PTS&0x7FFF)<<1 - output = append(output, []byte{ + buf = append(buf, []byte{ byte((pts & 0xFF00000000) >> 32), byte((pts & 0x00FF000000) >> 24), byte((pts & 0x0000FF0000) >> 16), @@ -118,6 +118,6 @@ func (p *PESPacket) ToByteSlice() (output []byte) { byte(pts & 0x00000000FF), }...) } - output = append(output, append(p.Stuff, p.Data...)...) - return + buf = append(buf, append(p.Stuff, p.Data...)...) + return buf } diff --git a/revid/revid.go b/revid/revid.go index 839a358b..40a916d2 100755 --- a/revid/revid.go +++ b/revid/revid.go @@ -209,7 +209,7 @@ func (r *Revid) reset(config Config) error { case Mpegts: r.Log(Info, "Using MPEGTS packetisation") frameRate, _ := strconv.Atoi(r.config.FrameRate) - r.generator = generator.NewTsGenerator(uint(frameRate)) + r.generator = generator.NewTsGenerator(float64(frameRate)) case Flv: r.Log(Info, "Using FLV packetisation") frameRate, _ := strconv.Atoi(r.config.FrameRate)