diff --git a/Readme.md b/Readme.md index 3220064d..95602640 100644 --- a/Readme.md +++ b/Readme.md @@ -2,6 +2,10 @@ av is a collection of tools and packages written in Go for audio-video processing. +# Authors +Alan Noble +Saxon A. Nelson-Milton + # Description * revid: a tool for re-muxing and re-directing video streams. diff --git a/camstreamer/camstreamer.go b/camstreamer/camstreamer.go new file mode 100644 index 00000000..8a5fda82 --- /dev/null +++ b/camstreamer/camstreamer.go @@ -0,0 +1,169 @@ +package camstreamer + +import ( + "errors" +) + +type CamStreamer struct { + RtspUrl string + RtpUrl string + RtpPort uint16 + RtcpPort uint16 +} + +func (cs *CamStreamer)Connect()(session *RtpSession, err error){ + var res string + sess := rtsp.NewSession() + + if res, err = sess.Options(sc.RtspUrl); err != nil { + return + } + res, err = sess.Describe(rtspUrl) + if err != nil { + log.Fatalln(err) + } + p, err := rtsp.ParseSdp(&io.LimitedReader{R: res.Body, N: res.ContentLength}) + if err != nil { + } + log.Printf("%+v", p) + res, err = sess.Setup(rtpUrl, fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort)) + if err != nil { + } + log.Println(res) + res, err = sess.Play(rtspUrl, res.Header.Get("Session")) + if err != nil { + } + log.Println(res) + // create udp connection for rtp stuff + rtpLaddr, err := net.ResolveUDPAddr("udp", "192.168.0.109:17300") + if err != nil { + t.Errorf("Local rtp addr not set! %v\n", err) + } + rtpAddr, err := net.ResolveUDPAddr("udp", "192.168.0.50:17300") + if err != nil { + t.Errorf("Resolving rtp address didn't work! %v\n", err) + } + rtpConn, err := net.DialUDP("udp", rtpLaddr, rtpAddr) + if err != nil { + t.Errorf("Conncection not established! %v\n", err) + } + // Create udp connection for rtcp stuff + rtcpLaddr, err := net.ResolveUDPAddr("udp", "192.168.0.109:17319") + if err != nil { + t.Errorf("Local RTCP address not resolved! %v\n", err) + } + rtcpAddr, err := net.ResolveUDPAddr("udp", "192.168.0.50:17301") + if err != nil { + t.Errorf("Remote RTCP address not resolved! %v\n", err) + } + rtcpConn, err := net.DialUDP("udp", rtcpLaddr, rtcpAddr) + if err != nil { + t.Errorf("Connection not established! %v\n", err) + } + // let's create a session that will store useful stuff from the connections + rtpSession := NewSession(rtpConn, rtcpConn) +} + +/******************************************************* +Testing stuff related to connection i.e. rtsp, rtp, rtcp +********************************************************/ +const ( + rtpPort = 17300 + rtcpPort = 17319 + rtspUrl = "rtsp://192.168.0.50:8554/CH002.sdp" + rtpUrl = "rtsp://192.168.0.50:8554/CH002.sdp/track1" +) + +/* Let's see if we can connect to an rtsp device then read an rtp stream, +and then convert the rtp packets to mpegts packets and output. */ +func TestRTSP(t *testing.T) { + sess := rtsp.NewSession() + res, err := sess.Options(rtspUrl) + if err != nil { + t.Errorf("Shouldn't have got error: %v\n", err) + } + + res, err = sess.Describe(rtspUrl) + if err != nil { + log.Fatalln(err) + t.Errorf("Shouldn't have got error: %v\n", err) + } + p, err := rtsp.ParseSdp(&io.LimitedReader{R: res.Body, N: res.ContentLength}) + if err != nil { + t.Errorf("Shouldn't have got error: %v\n", err) + } + log.Printf("%+v", p) + res, err = sess.Setup(rtpUrl, fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort)) + if err != nil { + t.Errorf("Shouldn't have got error: %v\n", err) + } + log.Println(res) + res, err = sess.Play(rtspUrl, res.Header.Get("Session")) + if err != nil { + t.Errorf("Shouldn't have got error: %v\n", err) + } + log.Println(res) +} + +func TestRTP(t *testing.T) { + sess := rtsp.NewSession() + res, err := sess.Options(rtspUrl) + if err != nil { + t.Errorf("Shouldn't have got error: %v\n", err) + } + res, err = sess.Describe(rtspUrl) + if err != nil { + log.Fatalln(err) + t.Errorf("Shouldn't have got error: %v\n", err) + } + p, err := rtsp.ParseSdp(&io.LimitedReader{R: res.Body, N: res.ContentLength}) + if err != nil { + t.Errorf("Shouldn't have got error: %v\n", err) + } + log.Printf("%+v", p) + res, err = sess.Setup(rtpUrl, fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort)) + if err != nil { + t.Errorf("Shouldn't have got error: %v\n", err) + } + log.Println(res) + res, err = sess.Play(rtspUrl, res.Header.Get("Session")) + if err != nil { + t.Errorf("Shouldn't have got error: %v\n", err) + } + log.Println(res) + // create udp connection for rtp stuff + rtpLaddr, err := net.ResolveUDPAddr("udp", "192.168.0.109:17300") + if err != nil { + t.Errorf("Local rtp addr not set! %v\n", err) + } + rtpAddr, err := net.ResolveUDPAddr("udp", "192.168.0.50:17300") + if err != nil { + t.Errorf("Resolving rtp address didn't work! %v\n", err) + } + rtpConn, err := net.DialUDP("udp", rtpLaddr, rtpAddr) + if err != nil { + t.Errorf("Conncection not established! %v\n", err) + } + // Create udp connection for rtcp stuff + rtcpLaddr, err := net.ResolveUDPAddr("udp", "192.168.0.109:17319") + if err != nil { + t.Errorf("Local RTCP address not resolved! %v\n", err) + } + rtcpAddr, err := net.ResolveUDPAddr("udp", "192.168.0.50:17301") + if err != nil { + t.Errorf("Remote RTCP address not resolved! %v\n", err) + } + rtcpConn, err := net.DialUDP("udp", rtcpLaddr, rtcpAddr) + if err != nil { + t.Errorf("Connection not established! %v\n", err) + } + // let's create a session that will store useful stuff from the connections + rtpSession := NewSession(rtpConn, rtcpConn) + time.Sleep(2 * time.Second) + select { + default: + t.Errorf("Should have got rtpPacket!") + case rtpPacket := <-rtpSession.RtpChan: + fmt.Printf("RTP packet: %v\n", rtpPacket) + } +} diff --git a/effslice/EffSlice.go b/effslice/EffSlice.go new file mode 100644 index 00000000..c6b7c4e7 --- /dev/null +++ b/effslice/EffSlice.go @@ -0,0 +1,28 @@ +package efficientbuffer + +type dataBlock struct { + address []byte // Address of the data block (slice) + lowerBound int // Lower bound of the data we're interested in + upperBound int // Upper bound of the data we're interested in + startIndex int // Index in our EffSlice +} + +type EffSlice struct { + data map[int](*dataChunk) +} + +func (s *EffSlice)GetElement(index int) byte { +} + +func (s *EffSlice)AsByteSlice() []byte { + +} + +func (s *EffSlice)Append(data *EffSlice){ +} + +func (s *EffSlice)Append(data []byte){ +} + +func (s *EffSlice)Len(){ +} diff --git a/h264/H264Writer.go b/h264/H264Writer.go new file mode 100644 index 00000000..4c2bf8f7 --- /dev/null +++ b/h264/H264Writer.go @@ -0,0 +1,146 @@ +/* +NAME + RtpToTsConverter.go - provides utilities for the conversion of Rtp packets + to equivalent MpegTs packets. + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton + +LICENSE + RtpToTsConverter.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package h264 + +import ( + _"fmt" + "os" + + "bitbucket.org/ausocean/av/mpegts" + "bitbucket.org/ausocean/av/rtp" + "bitbucket.org/ausocean/av/tools" + "bitbucket.org/ausocean/av/itut" +) + +type RtpToH264Converter interface { + Convert() +} + +type rtpToH264Converter struct { + TsChan <-chan *mpegts.MpegTsPacket + tsChan chan<- *mpegts.MpegTsPacket + InputChan chan<- rtp.RtpPacket + inputChan <-chan rtp.RtpPacket + currentTsPacket *mpegts.MpegTsPacket + payloadByteChan chan byte + currentCC byte +} + +//func parseH264File() +func NewRtpToH264Converter() (c *rtpToH264Converter) { + c = new(rtpToH264Converter) + tsChan := make(chan *mpegts.MpegTsPacket,100) + c.TsChan = tsChan + c.tsChan = tsChan + inputChan := make(chan rtp.RtpPacket,100) + c.InputChan = inputChan + c.inputChan = inputChan + c.currentCC = 0 + return +} + +func (c* rtpToH264Converter) Convert() { + file,_ := os.Create("video") + var rtpBuffer [](*rtp.RtpPacket) + for { + select { + default: + case rtpPacket := <-c.inputChan: + rtpBuffer = append(rtpBuffer,&rtpPacket) + if len(rtpBuffer) > 2 { + // if there's something weird going on with sequence numbers then sort + if rtpPacket.SequenceNumber < rtpBuffer[len(rtpBuffer)-2].SequenceNumber { + for i := 1; i < len(rtpBuffer); i++ { + for j := i; j > 0 && rtpBuffer[j].SequenceNumber < rtpBuffer[j - 1].SequenceNumber; j-- { + temp := rtpBuffer[j] + rtpBuffer[j] = rtpBuffer[j-1] + rtpBuffer[j-1] = temp + } + } + } + } + if len(rtpBuffer) > 200 { + // Discard everything before a type 7 + for tools.GetOctectType(rtpBuffer[0]) != 7 { + rtpBuffer = rtpBuffer[1:] + } + // get sps + sps := make([]byte,len(rtpBuffer[0].Payload)) + copy(sps[:],rtpBuffer[0].Payload[:]) + rtpBuffer = rtpBuffer[1:] + // get pps + pps := make([]byte,len(rtpBuffer[0].Payload)) + copy(pps[:],rtpBuffer[0].Payload[:]) + rtpBuffer = rtpBuffer[1:] + // get sei + sei := make([]byte, len(rtpBuffer[0].Payload)) + copy(sei[:],rtpBuffer[0].Payload[:]) + rtpBuffer = rtpBuffer[1:] + // while we haven't reached the next sps in the buffer + for tools.GetOctectType(rtpBuffer[0]) != 7 { + switch(tools.GetOctectType(rtpBuffer[0])){ + case 28: + if tools.GetStartBit(rtpBuffer[0]) == 1{ + var buffer []byte + buffer = append(buffer, append(itut.StartCode1(),itut.AUD()...)...) + buffer = append(buffer, append(itut.StartCode1(),sps...)...) + buffer = append(buffer, append(itut.StartCode1(),pps...)...) + buffer = append(buffer, append(itut.StartCode1(),sei...)...) + buffer = append(buffer, itut.StartCode1()...) + buffer = append(buffer, rtpBuffer[0].Payload[0] & 0xE0 | rtpBuffer[0].Payload[1] & 0x1F ) + buffer = append(buffer, rtpBuffer[0].Payload[2:]...) + rtpBuffer = rtpBuffer[1:] + for { + buffer = append(buffer, rtpBuffer[0].Payload[2:]...) + if tools.GetEndBit(rtpBuffer[0]) == 1 { + rtpBuffer = rtpBuffer[1:] + file.Write(buffer) + break + } + rtpBuffer = rtpBuffer[1:] + } + } + case 1: + var buffer []byte + buffer = append(buffer, append(itut.StartCode1(), itut.AUD()...)...) + buffer = append(buffer, append(itut.StartCode1(), sps...)...) + buffer = append(buffer, append(itut.StartCode1(),pps...)...) + buffer = append(buffer, append(itut.StartCode1(),sei...)...) + buffer = append(buffer, itut.StartCode1()...) + buffer = append(buffer, rtpBuffer[0].Payload[0] & 0xE0 | rtpBuffer[0].Payload[1] & 0x1F ) + buffer = append(buffer, rtpBuffer[0].Payload[2:]...) + rtpBuffer = rtpBuffer[1:] + file.Write(buffer) + default: + } + } + } + } + } +} diff --git a/h264/h264Parser.go b/h264/h264Parser.go new file mode 100644 index 00000000..09d9bf53 --- /dev/null +++ b/h264/h264Parser.go @@ -0,0 +1,89 @@ +/* +NAME + RtpToTsConverter.go - provides utilities for the conversion of Rtp packets + to equivalent MpegTs packets. + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton + +LICENSE + RtpToTsConverter.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package h264 + +import ( + "bitbucket.org/ausocean/av/itut" + "log" + "sync" + _"fmt" + _"time" +) + +const ( + acceptedLength = 1000 +) + +var ( + Info *log.Logger + mutex *sync.Mutex +) + +type H264Parser struct { + inputBuffer []byte + isParsing bool + OutputChan chan<- []byte + InputByteChan chan byte +} + +func (p* H264Parser)Stop(){ + p.isParsing = false +} + +func (p *H264Parser)Start(){ + go p.parse() +} + +func (p *H264Parser)parse() { + p.isParsing = true + outputBuffer := make([]byte, 0, 10000) + searchingForEnd := false + p.InputByteChan = make(chan byte, 10000) + for p.isParsing { + aByte := <-p.InputByteChan + outputBuffer = append(outputBuffer, aByte) + for i:=1; aByte == 0x00 && i != 4; i++ { + aByte = <-p.InputByteChan + outputBuffer = append(outputBuffer, aByte) + if ( aByte == 0x01 && i == 2 ) || ( aByte == 0x01 && i == 3 ) { + if searchingForEnd { + output := append(append(itut.StartCode1(),itut.AUD()...),outputBuffer[:len(outputBuffer)-(i+1)]...) + p.OutputChan<-output + outputBuffer = outputBuffer[len(outputBuffer)-1-i:] + searchingForEnd = false + } + aByte = <-p.InputByteChan + outputBuffer = append(outputBuffer, aByte) + if nalType := aByte & 0x1F; nalType == 1 || nalType == 5 { + searchingForEnd = true + } + } + } + } +} diff --git a/itut/standards.go b/itut/standards.go new file mode 100644 index 00000000..96f66b2a --- /dev/null +++ b/itut/standards.go @@ -0,0 +1,33 @@ +/* +NAME + RtpToTsConverter.go - provides utilities for the conversion of Rtp packets + to equivalent MpegTs packets. + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton + +LICENSE + RtpToTsConverter.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package itut + +func StartCode1() []byte { return []byte{0x00, 0x00, 0x01} } +func StartCode2() []byte { return []byte{0x00, 0x00, 0x00, 0x01} } +func AUD() []byte { return []byte{0x09, 0xF0} } diff --git a/mpegts/MpegTs.go b/mpegts/MpegTs.go new file mode 100644 index 00000000..b66fb209 --- /dev/null +++ b/mpegts/MpegTs.go @@ -0,0 +1,196 @@ +/* +NAME + MpegTs.go - provides a data structure intended to encapsulate the properties + of an MpegTs packet and also functions to allow manipulation of these packets. + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon A. Nelson-Milton + +LICENSE + MpegTs.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package mpegts + +import ( + "bitbucket.org/ausocean/av/tools" + "errors" + //"fmt" +) + +const ( + maxMpegTsSize = 188 + mpegtsPayloadSize = 176 +) + +/* +The below data struct encapsulates the fields of an MPEG-TS packet. Below is +the formatting of an MPEG-TS packet for reference! + + MPEG-TS Packet Formatting +============================================================================ +| octet no | bit 0 | bit 1 | bit 2 | bit 3 | bit 4 | bit 5 | bit 6 | bit 7 | +============================================================================ +| octet 0 | sync byte (0x47) | +---------------------------------------------------------------------------- +| octet 1 | TEI | PUSI | Prior | PID | +---------------------------------------------------------------------------- +| octet 2 | PID cont. | +---------------------------------------------------------------------------- +| octet 3 | TSC | AFC | CC | +---------------------------------------------------------------------------- +| octet 4 | AFL | +---------------------------------------------------------------------------- +| octet 5 | DI | RAI | ESPI | PCRF | OPCRF | SPF | TPDF | AFEF | +---------------------------------------------------------------------------- +| optional | PCR (48 bits => 6 bytes) | +---------------------------------------------------------------------------- +| - | PCR cont. | +---------------------------------------------------------------------------- +| - | PCR cont. | +---------------------------------------------------------------------------- +| - | PCR cont. | +---------------------------------------------------------------------------- +| - | PCR cont. | +---------------------------------------------------------------------------- +| - | PCR cont. | +---------------------------------------------------------------------------- +| optional | OPCR (48 bits => 6 bytes) | +---------------------------------------------------------------------------- +| - | OPCR cont. | +---------------------------------------------------------------------------- +| - | OPCR cont. | +---------------------------------------------------------------------------- +| - | OPCR cont. | +---------------------------------------------------------------------------- +| - | OPCR cont. | +---------------------------------------------------------------------------- +| - | OPCR cont. | +---------------------------------------------------------------------------- +| optional | SC | +---------------------------------------------------------------------------- +| optional | TPDL | +---------------------------------------------------------------------------- +| optional | TPD (variable length) | +---------------------------------------------------------------------------- +| - | ... | +---------------------------------------------------------------------------- +| optional | Extension (variable length) | +---------------------------------------------------------------------------- +| - | ... | +---------------------------------------------------------------------------- +| optional | Stuffing (variable length) | +---------------------------------------------------------------------------- +| - | ... | +---------------------------------------------------------------------------- +| optional | Payload (variable length) | +---------------------------------------------------------------------------- +| - | ... | +---------------------------------------------------------------------------- +*/ +type MpegTsPacket struct { + TEI bool // Transport Error Indicator + PUSI bool // Payload Unit Start Indicator + Priority bool // Tranposrt priority indicator + PID uint16 // Packet identifier + TSC byte // Transport Scrambling Control + AFC byte // Adaption Field Control + CC byte // Continuity Counter + DI bool // Discontinouty indicator + RAI bool // random access indicator + ESPI bool // Elementary stream priority indicator + PCRF bool // PCR flag + OPCRF bool // OPCR flag + SPF bool // Splicing point flag + TPDF bool // Transport private data flag + AFEF bool // Adaptation field extension flag + PCR uint64 // Program clock reference + OPCR uint64 // Original program clock reference + SC byte // Splice countdown + TPDL byte // Tranposrt private data length + TPD []byte // Private data + Ext []byte // Adaptation field extension + Payload []byte // Mpeg ts payload +} + +// TODO: make payload private considering we now have FillPayload method + +func (p *MpegTsPacket) FillPayload(channel chan byte){ + p.Payload = make([]byte,0,mpegtsPayloadSize) + currentPktLength := 6 + int(tools.BoolToByte(p.PCRF))*6+int(tools.BoolToByte(p.OPCRF))*6+ + int(tools.BoolToByte(p.SPF))*1+int(tools.BoolToByte(p.TPDF))*1+len(p.TPD) + for (currentPktLength+len(p.Payload)) < 188 { + //fmt.Printf("len(channel): %v\n", len(channel)) + select { + case nextByte := <-channel: + p.Payload = append(p.Payload,nextByte) + default: + return + } + } +} + +func (p *MpegTsPacket) ToByteSlice() (output []byte, err error) { + stuffingLength := 182-len(p.Payload)-len(p.TPD)-int(tools.BoolToByte(p.PCRF))*6- + int(tools.BoolToByte(p.OPCRF))*6 - int(tools.BoolToByte(p.SPF)) + var stuffing []byte + if stuffingLength > 0 { + stuffing = make([]byte,stuffingLength) + } + for i := range stuffing { + stuffing[i] = 0xFF + } + afl := 1+int(tools.BoolToByte(p.PCRF))*6+int(tools.BoolToByte(p.OPCRF))* + 6+int(tools.BoolToByte(p.SPF))*1+int(tools.BoolToByte(p.TPDF))*1+len(p.TPD)+len(stuffing) + output = make([]byte,0,maxMpegTsSize) + output = append(output, []byte{ + 0x47, + (tools.BoolToByte(p.TEI)<<7 | tools.BoolToByte(p.PUSI)<<6 | tools.BoolToByte(p.Priority)<<5 | + byte((p.PID&0xFF00)>>8)), + byte(p.PID & 0x00FF), + (p.TSC<<6 | p.AFC<<4 | p.CC),}...) + + if p.AFC == 3 || p.AFC == 2 { + output = append(output, []byte{ + byte(afl), (tools.BoolToByte(p.DI)<<7 | tools.BoolToByte(p.RAI)<<6 | tools.BoolToByte(p.ESPI)<<5 | + tools.BoolToByte(p.PCRF)<<4 | tools.BoolToByte(p.OPCRF)<<3 | tools.BoolToByte(p.SPF)<<2 | + tools.BoolToByte(p.TPDF)<<1 | tools.BoolToByte(p.AFEF)), + }...) + for i := 40; p.PCRF && i >= 0; i-=8 { + output = append(output, byte((p.PCR<<15)>>uint(i))) + } + for i := 40; p.OPCRF && i >= 0; i-=8 { + output = append(output, byte(p.OPCR>>uint(i))) + } + if p.SPF { + output = append(output, p.SC) + } + if p.TPDF { + output = append(output, append([]byte{p.TPDL}, p.TPD...)...) + } + output = append(output, p.Ext...) + output = append(output, stuffing...) + } + output = append(output, p.Payload...) + + if len(output) != 188 { + err = errors.New("Length of MPEG-TS packet is not 188! Something is wrong!") + } + return +} diff --git a/mpegts/mpegts_test.go b/mpegts/mpegts_test.go new file mode 100644 index 00000000..6b4a211d --- /dev/null +++ b/mpegts/mpegts_test.go @@ -0,0 +1,72 @@ +/* +NAME + MpegTs.go - provides a data structure intended to encapsulate the properties + of an MpegTs packet. + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton + +LICENSE + MpegTs.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package mpegts + +import ( + "testing" + _"fmt" +) + +// Just ensure that we can create a byte slice with a mpegts packet correctly +func TestMpegTsToByteSlice(t *testing.T){ + payload := []byte{0x56,0xA2,0x78,0x89,0x67} + pcr := 100000 // => 100000 + stuffing := make([]byte,171) + for i := range stuffing { + stuffing[i] = 0xFF + } + tsPkt := MpegTsPacket{ + PUSI: true, + PID: uint16(256), + AFC: byte(3), + AFL: 7+171, + CC: byte(6), + PCRF: true, + PCR: uint64(pcr), + Stuff: stuffing, + Payload: payload, + } + expectedOutput := []byte{ 0x47, 0x41, 0x00, 0x36, byte(178),0x10} + for i := 40; i >= 0; i-= 8 { + expectedOutput = append(expectedOutput,byte(pcr>>uint(i))) + } + for i := 0; i < 171; i++ { + expectedOutput = append(expectedOutput, 0xFF) + } + expectedOutput = append(expectedOutput,payload...) + tsPktAsByteSlice, err := tsPkt.ToByteSlice() + if err != nil { + t.Errorf("Should not have got error!") + } + for i := 0; i < 188; i++ { + if tsPktAsByteSlice[i] != expectedOutput[i] { + t.Errorf("Conversion to byte slice bad! Byte: %v Wanted: %v Got: %v", i, expectedOutput[i], tsPktAsByteSlice[i]) + } + } +} diff --git a/mpegts/psi/pat.go b/mpegts/psi/pat.go new file mode 100644 index 00000000..9d40fa3b --- /dev/null +++ b/mpegts/psi/pat.go @@ -0,0 +1,51 @@ +/* +NAME + revid - a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. + +DESCRIPTION + See Readme.md + +AUTHOR + Alan Noble + +LICENSE + revid is Copyright (C) 2017 Alan Noble. + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package psi + +type PAT struct { + PF byte // POint field + PFB []byte // pointer filler bytes + TableID byte // Table ID + SSI bool // Sectiopn syntax indicator (1 for PAT, PMT, CAT) + PB bool // Private bit (0 for PAT, PMT, CAT) + SL uint16 // Section length + TIE uint16 // Table ID extension + Version byte // Version number + CNI bool // Current/next indicator + Section byte // Section number + LSN byte // Last section number + DT byte // Descriptor tag + DL byte // Descriptor length + Program uint16 // Program number + PMPID uint16 // Program map PID + CRC32 uint32 // Checksum of table +} + +type (p *PAT)ToByteSlice()(output []byte){ + +} diff --git a/mpegts/psi/pmt.go b/mpegts/psi/pmt.go new file mode 100644 index 00000000..ee5e89ee --- /dev/null +++ b/mpegts/psi/pmt.go @@ -0,0 +1,47 @@ +/* +NAME + revid - a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. + +DESCRIPTION + See Readme.md + +AUTHOR + Alan Noble + +LICENSE + revid is Copyright (C) 2017 Alan Noble. + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package psi + +type PMT struct { + PF byte // POint field + PFB []byte // pointer filler bytes + TableID byte // Table ID + SSI bool // Sectiopn syntax indicator (1 for PAT, PMT, CAT) + PB bool // Private bit (0 for PAT, PMT, CAT) + SL uint16 // Section length + TIE uint16 // Table ID extension + Version byte // Version number + CNI bool // Current/next indicator + Section byte // Section number + LSN byte // Last section number + +} + +func (p* PMT)ToByteSlice()(output []byte){ + +} diff --git a/nal/NalAccessUnit.go b/nal/NalAccessUnit.go new file mode 100644 index 00000000..0de160fb --- /dev/null +++ b/nal/NalAccessUnit.go @@ -0,0 +1,50 @@ +/* +NAME + PES.go - +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton + +LICENSE + PES.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package nal + +type NalAccessUnit struct { + SPS []byte + PPS []byte + SEI [][]byte + Data [][]byte +} + +func (u *NalAccessUnit) AsAnnexB() (output []byte) { + startCode := []byte{ 0x00,0x00,0x01} + AUD := []byte{0x09, 0xF0} + format := [][]byte{startCode, AUD, startCode, u.SPS, startCode, u.PPS } + for i := range format { + output = append(output,format[i]...) + } + for i := range u.SEI { + output = append(output, append(startCode,u.SEI[i]...)...) + } + for i := range u.Data { + output = append(output, append(startCode,u.Data[i]...)...) + } + return +} diff --git a/nal/NalUnit.go b/nal/NalUnit.go new file mode 100644 index 00000000..5af02fe4 --- /dev/null +++ b/nal/NalUnit.go @@ -0,0 +1,101 @@ +/* +NAME + PES.go - +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton + +LICENSE + PES.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package nal + +type NALUnit interface { + ToByteSlice() []byte + GetType() byte +} + +type NALSpsPps struct { + Data []byte +} + +type NALFragment struct { + ThreeNUBs byte + FragmentType byte + Start bool + End bool + Reserved bool + FiveNUBs byte + Data []byte +} + + +func GetNalType(unit []byte) byte { + return unit[0] & 0x1F +} + +/* +First byte: [ 3 NAL UNIT BITS | 5 FRAGMENT TYPE BITS] +Second byte: [ START BIT | END BIT | RESERVED BIT | 5 NAL UNIT BITS] +Other bytes: [... VIDEO FRAGMENT DATA...] +*/ +func ParseNALFragment(unit []byte) (u *NALFragment) { + u = new(NALFragment) + u.ThreeNUBs = (unit[0] & 0xE0) >> 5 + u.FragmentType = unit[0] & 0x1F + u.Start = (unit[1] & 0x80) != 0 + u.End = (unit[1] & 0x40) != 0 + u.Reserved = (unit[1] & 0x20) != 0 + u.FiveNUBs = unit[1] & 0x1F + u.Data = make([]byte,len(unit[2:])) + copy(u.Data[:],unit[2:]) + return +} + +func ParseNALSpsPps(unit []byte)(u *NALSpsPps){ + u = new(NALSpsPps) + u.Data = make([]byte,len(unit)) + copy(u.Data[:],unit[:]) + return +} + +func (u *NALFragment) ToByteSlice() (output []byte) { + output = make([]byte, 2+len(u.Data)) + output[0] = ( u.ThreeNUBs << 5 ) | u.FragmentType + output[1] = boolToByte( u.Start ) << 7 | + boolToByte( u.End ) << 6 | + boolToByte( u.Reserved ) << 5 | + u.FiveNUBs + copy(output[2:],u.Data) + return +} + +func (u *NALFragment) GetType() byte { + return GetNalType(u.ToByteSlice()) +} + +func (u *NALSpsPps) GetType() byte { + return GetNalType(u.ToByteSlice()) +} + +func (u *NALSpsPps) ToByteSlice() (output []byte){ + output = make([]byte,len(u.Data)) + output = u.Data + return +} diff --git a/nal/nal_test.go b/nal/nal_test.go new file mode 100644 index 00000000..6f13ea07 --- /dev/null +++ b/nal/nal_test.go @@ -0,0 +1,118 @@ +/* +NAME + MpegTs.go - provides a data structure intended to encapsulate the properties + of an MpegTs packet. + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton + +LICENSE + MpegTs.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package nal + +import ( + "testing" +) + +var parseInput = []byte{ + 0x6C, // 3NalUnitBits = 101(5), Fragment type = 1100 (type = 12 ) + 0x94, // starbit = 1, endbit = 0, Reservedbit = 0, 5NalUnitBits = 10100 (20) + 0x8E, // 10001110 random frame byte + 0x26, // 00100110 random frame byte + 0xD0, // 11010000 random frame byte +} + +var expectedParsing = []interface{}{ + byte(3), + byte(12), + bool(true), + bool(false), + bool(false), + byte(20), + []byte{0x8E, 0x26, 0xD0}, +} + +const ( + nalTestType = 12 +) + +func TestNalFragmentParsing(t *testing.T) { + nalUnit := ParseNALFragment(parseInput) + value := reflect.ValueOf(*nalUnit) + length := value.NumField() + fields := make([]interface{}, length) + for ii := 0; ii < length; ii++ { + fields[ii] = value.Field(ii).Interface() + } + for ii := range fields { + if !reflect.DeepEqual(fields[ii], expectedParsing[ii]) { + t.Errorf("Bad Parsing! Field: %v wanted: %v got: %v\n", ii, expectedParsing[ii], + fields[ii]) + } + } +} + +func TestNalFragmentToByteSlice(t *testing.T) { + nalUnit := ParseNALFragment(parseInput) + output := nalUnit.ToByteSlice() + for ii := range output { + if output[ii] != parseInput[ii] { + t.Errorf("Bad conversion to byte slice at %vth byte! wanted: %v got: %v", + parseInput[ii], output[ii]) + } + } +} + +func TestNalFragmentType(t *testing.T) { + nalUnit := ParseNALFragment(parseInput) + nalType := nalUnit.GetType() + if nalType != nalTestType { + t.Errorf("Returned wrong type!") + } +} + +func TestNalSpsPpsParsing(t *testing.T) { + nalSpsPps := ParseNALSpsPps(parseInput) + for ii := range parseInput { + if nalSpsPps.Data[ii] != parseInput[ii] { + t.Errorf("Bad Parsing! Byte: %v wanted: %v got: %v\n", ii, parseInput[ii], + nalSpsPps.Data[ii]) + } + } +} + +func TestNalSpsPpsToByteSlice(t *testing.T) { + nalSpsPps := ParseNALSpsPps(parseInput) + nalSpsPpsByteSlice := nalSpsPps.ToByteSlice() + for ii := range parseInput { + if nalSpsPpsByteSlice[ii] != parseInput[ii] { + t.Errorf("Bad conversion to byte slice! Byte: %v wanted: %v got: %v\n", ii, + parseInput[ii], nalSpsPpsByteSlice[ii]) + } + } +} + +func TestNalSpsPpsType(t *testing.T) { + nalSpsPps := ParseNALSpsPps(parseInput) + if nalSpsPps.GetType() != nalTestType { + t.Errorf("Returned wrong type!") + } +} diff --git a/pes/Pes.go b/pes/Pes.go new file mode 100644 index 00000000..0c526960 --- /dev/null +++ b/pes/Pes.go @@ -0,0 +1,127 @@ +/* +NAME + PES.go - +DESCRIPTION + See Readme.md + +AUTHOR + Saxon A. Nelson-Milton + +LICENSE + PES.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package pes + +import ( + "bitbucket.org/ausocean/av/tools" +) + +const ( + maxPesSize = 10000 +) + +/* +The below data struct encapsulates the fields of an PES packet. Below is +the formatting of a PES packet for reference! + + PES Packet Formatting +============================================================================ +| octet no | bit 0 | bit 1 | bit 2 | bit 3 | bit 4 | bit 5 | bit 6 | bit 7 | +============================================================================ +| octet 0 | 0x00 | +---------------------------------------------------------------------------- +| octet 1 | 0x00 | +---------------------------------------------------------------------------- +| octet 2 | 0x01 | +---------------------------------------------------------------------------- +| octet 3 | Stream ID (0xE0 for video) | +---------------------------------------------------------------------------- +| octet 4 | PES Packet Length (no of bytes in packet after this field) | +---------------------------------------------------------------------------- +| octet 5 | PES Length cont. | +---------------------------------------------------------------------------- +| octet 6 | 0x2 | SC | Prior | DAI | Copyr | Copy | +---------------------------------------------------------------------------- +| octet 7 | PDI | ESCRF | ESRF | DSMTMF| ACIF | CRCF | EF | +---------------------------------------------------------------------------- +| octet 8 | PES Header Length | +---------------------------------------------------------------------------- +| optional | optional fields (determined by flags above) (variable Length) | +---------------------------------------------------------------------------- +| - | ... | +---------------------------------------------------------------------------- +| optional | stuffing bytes (varible length) | +---------------------------------------------------------------------------- +| - | ... | +---------------------------------------------------------------------------- +| Optional | Data (variable length) | +---------------------------------------------------------------------------- +| - | ... | +---------------------------------------------------------------------------- +*/ +// TODO: add DSMTM, ACI, CRC, Ext fields +type PESPacket struct { + StreamID byte // Type of stream + Length uint16 // Pes packet length in bytes after this field + SC byte // Scrambling control + Priority bool // Priority Indicator + DAI bool // Data alginment indicator + Copyright bool // Copyright indicator + Original bool // Original data indicator + PDI byte // PTS DTS indicator + ESCRF bool // Elementary stream clock reference flag + ESRF bool // Elementary stream rate reference flag + DSMTMF bool // Dsm trick mode flag + ACIF bool // Additional copy info flag + CRCF bool // Not sure + EF bool // Extension flag + HeaderLength byte // Pes header length + PTS uint64 // Presentation time stamp + DTS uint64 // Decoding timestamp + ESCR uint64 // Elementary stream clock reference + ESR uint32 // Elementary stream rate reference + Stuff []byte // Stuffing bytes + Data []byte // Pes packet data +} + +func (p *PESPacket) ToByteSlice() (output []byte) { + output = make([]byte, 0, maxPesSize) + output = append(output, []byte{ + 0x00, 0x00, 0x01, + p.StreamID, + byte((p.Length & 0xFF00) >> 8), + byte(p.Length & 0x00FF), + (0x2<<6 | p.SC<<4 | tools.BoolToByte(p.Priority)<<3 | tools.BoolToByte(p.DAI)<<2 | + tools.BoolToByte(p.Copyright)<<1 | tools.BoolToByte(p.Original)), + (p.PDI<<6 | tools.BoolToByte(p.ESCRF)<<5 | tools.BoolToByte(p.ESRF)<<4 | tools.BoolToByte(p.DSMTMF)<<3 | + tools.BoolToByte(p.ACIF)<<2 | tools.BoolToByte(p.CRCF)<<1 | tools.BoolToByte(p.EF)), + p.HeaderLength, + }...) + if p.PDI == byte(2) { + pts := 0x2100010001 | (p.PTS&0x1C0000000)<<3 | (p.PTS&0x3FFF8000)<<2 | + (p.PTS&0x7FFF)<<1 + output = append(output, []byte{ + byte((pts & 0xFF00000000) >> 32), + byte((pts & 0x00FF000000) >> 24), + byte((pts & 0x0000FF0000) >> 16), + byte((pts & 0x000000FF00) >> 8), + byte(pts & 0x00000000FF), + }...) + } + output = append(output, append(p.Stuff, p.Data...)...) + return +} diff --git a/pes/pes_test.go b/pes/pes_test.go new file mode 100644 index 00000000..5f47f16a --- /dev/null +++ b/pes/pes_test.go @@ -0,0 +1,76 @@ +/* +NAME + MpegTs.go - provides a data structure intended to encapsulate the properties + of an MpegTs packet. + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton + +LICENSE + MpegTs.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package pes + +import ( + "testing" +) + +const ( + dataLength = 3 // bytes +) + +func TestPesToByteSlice(t *testing.T) { + pesPkt := PESPacket{ + StreamID: 0xE0, // StreamID + PDI: byte(2), + PTS: 100000, + HeaderLength: byte(10), + Stuff: []byte{0xFF,0xFF,}, + Data: []byte{ 0xEA, 0x4B, 0x12, }, + } + pesExpectedOutput := []byte{ + 0x00, // packet start code prefix byte 1 + 0x00, // packet start code prefix byte 2 + 0x01, // packet start code prefix byte 3 + 0xE0, // stream ID + 0x00, // PES Packet length byte 1 + 0x00, // PES packet length byte 2 + 0x80, // Marker bits,ScramblingControl, Priority, DAI, Copyright, Original + 0x80, // PDI, ESCR, ESRate, DSMTrickMode, ACI, CRC, Ext + byte(10), // header length + 0x21, // PCR byte 1 + 0x00, // pcr byte 2 + 0x07, // pcr byte 3 + 0x0D, // pcr byte 4 + 0x41, // pcr byte 5 + 0xFF, // Stuffing byte 1 + 0xFF, // stuffing byte 3 + 0xEA, // data byte 1 + 0x4B, // data byte 2 + 0x12, // data byte 3 + } + pesPktAsByteSlice := pesPkt.ToByteSlice() + for ii := range pesPktAsByteSlice { + if pesPktAsByteSlice[ii] != pesExpectedOutput[ii] { + t.Errorf("Conversion to byte slice bad! Byte: %v Wanted: %v Got: %v", + ii, pesExpectedOutput[ii], pesPktAsByteSlice[ii]) + } + } +} diff --git a/revid/RevidInstance.go b/revid/RevidInstance.go new file mode 100644 index 00000000..6db415c3 --- /dev/null +++ b/revid/RevidInstance.go @@ -0,0 +1,346 @@ +/* +NAME + revid - a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. + +DESCRIPTION + See Readme.md + +AUTHORS + Alan Noble + Saxon A. Nelson-Milton + +LICENSE + revid is Copyright (C) 2017 Alan Noble. + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +// revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. +package revid + +import ( + "bufio" + "bytes" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "os" + "os/exec" + "strconv" + "time" + + "bitbucket.org/ausocean/av/h264" + "bitbucket.org/ausocean/av/tsgenerator" + + "bitbucket.org/ausocean/av/ringbuffer" + "bitbucket.org/ausocean/utils/smartLogger" +) + +// defaults and networking consts +const ( + clipDuration = 1 // s + mp2tPacketSize = 188 // MPEG-TS packet size + mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000 + udpPackets = 7 // # of UDP packets per ethernet frame (8 is the max) + rtpPackets = 7 // # of RTP packets per ethernet frame (7 is the max) + rtpHeaderSize = 12 + rtpSSRC = 1 // any value will do + bufferSize = 100 / clipDuration + httpTimeOut = 5 // s + packetsPerFrame = 7 + h264BufferSize = 500000 + bitrateTime = 60 +) + +// Log Types +const ( + Error = "Error" + Warning = "Warning" + Info = "Info" + Debug = "Debug" +) + +// Config enums +const ( + Raspivid = 0 + Rtp = 1 + H264Codec = 2 + File = 4 + HttpOut = 5 +) + +type Config struct { + Input uint8 + InputCmd string + Output uint8 + OutputFileName string + InputFileName string + Height string + Width string + Bitrate string + FrameRate string + HttpAddress string + Quantization string + Logger smartLogger.LogInstance +} + +type RevidInst interface { + Start() + Stop() + ChangeState(newconfig Config) error + GetConfigRef() *Config + Log(logType, m string) + IsRunning() bool +} + +type revidInst struct { + dumpPCRBase uint64 + conn net.Conn + ffmpegPath string + tempDir string + ringBuffer ringbuffer.RingBuffer + config Config + isRunning bool + outputFile *os.File + inputFile *os.File + generator tsgenerator.TsGenerator + h264Parser h264.H264Parser + cmd *exec.Cmd + inputReader *bufio.Reader +} + +func NewRevidInstance(config Config) (r *revidInst, err error) { + r = new(revidInst) + r.ringBuffer = ringbuffer.NewRingBuffer(bufferSize, mp2tPacketSize*mp2tMaxPackets) + r.dumpPCRBase = 0 + r.ChangeState(config) + switch r.config.Output { + case File: + r.outputFile, err = os.Create(r.config.OutputFileName) + if err != nil { + return nil, err + } + } + switch r.config.Input { + case File: + r.inputFile, err = os.Open(r.config.InputFileName) + if err != nil { + return nil, err + } + } + r.generator = tsgenerator.NewTsGenerator(25) + r.generator.Start() + r.h264Parser = h264.H264Parser{OutputChan: r.generator.GetNalInputChan()} + r.h264Parser.Start() + go r.input() + r.Log(Info, "New revid instance created! config is:") + r.Log(Info, fmt.Sprintf("%v", r.config)) + return +} + +func (r *revidInst) GetConfigRef() *Config { + return &r.config +} + +func (r *revidInst) ChangeState(newconfig Config) error { + // TODO: check that the config is legit + r.config = newconfig + return nil +} + +func (r *revidInst) Log(logType, m string) { + r.config.Logger.Log(logType, m) +} + +func (r *revidInst) IsRunning() bool { + return r.isRunning +} + +func (r *revidInst) Start() { + if r.isRunning { + r.Log(Warning, "revidInst.Start() called but revid already running!") + return + } + r.Log(Info, "Starting Revid!") + var h264Data []byte + switch r.config.Input { + case Raspivid: + r.Log(Info, "Starting raspivid!") + r.cmd = exec.Command("raspivid", "-o", "-", "-n", "-t", "0", "-b", + r.config.Bitrate, "-qp", r.config.Quantization, "-w", r.config.Width, "-h", r.config.Height, "-fps", r.config.FrameRate, "-ih", "-g", "100") + stdout, _ := r.cmd.StdoutPipe() + err := r.cmd.Start() + r.inputReader = bufio.NewReader(stdout) + if err != nil { + r.Log(Error, err.Error()) + return + } + r.isRunning = true + go func() { + r.Log(Info, "Reading camera data!") + for r.isRunning { + h264Data = make([]byte, 1) + _, err := io.ReadFull(r.inputReader, h264Data) + switch { + case err != nil && err.Error() == "EOF" && r.isRunning: + r.Log(Error, "No data from camera!") + time.Sleep(5 * time.Second) + case err != nil && r.isRunning: + r.Log(Error, err.Error()) + default: + r.h264Parser.InputByteChan <- h264Data[0] + } + } + r.Log(Info, "Out of reading routine!") + }() + case File: + stats, err := r.inputFile.Stat() + if err != nil { + r.Log(Error, "Could not get input file stats!") + r.Stop() + return + } + h264Data = make([]byte, stats.Size()) + _, err = r.inputFile.Read(h264Data) + if err != nil { + r.Log(Error, err.Error()) + r.Stop() + return + } + for i := range h264Data { + r.h264Parser.InputByteChan <- h264Data[i] + } + } + go r.output() +} + +func (r *revidInst) Stop() { + if r.isRunning { + r.Log(Info, "Stopping revid!") + r.isRunning = false + r.cmd.Process.Kill() + } else { + r.Log(Warning, "revidInst.Stop() called but revid not running!") + } +} + +func (r *revidInst) input() { + clipSize := 0 + packetCount := 0 + now := time.Now() + prevTime := now + for { + if clip, err := r.ringBuffer.Get(); err != nil { + r.Log(Error, err.Error()) + r.Log(Warning, "Clearing TS chan!") + for len(r.generator.GetTsOutputChan()) > 0 { + <-(r.generator.GetTsOutputChan()) + } + time.Sleep(1 * time.Second) + } else { + for { + tsPacket := <-(r.generator.GetTsOutputChan()) + tsByteSlice, err := tsPacket.ToByteSlice() + if err != nil { + r.Log(Error, err.Error()) + } + upperBound := clipSize + mp2tPacketSize + copy(clip[clipSize:upperBound], tsByteSlice) + packetCount++ + clipSize += mp2tPacketSize + // send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame + now = time.Now() + if (packetCount == mp2tMaxPackets) || + (now.Sub(prevTime) > clipDuration*time.Second && packetCount%packetsPerFrame == 0) { + if err := r.ringBuffer.DoneWriting(clipSize); err != nil { + r.Log(Error, err.Error()) + r.Log(Warning, "Dropping clip!") + } + clipSize = 0 + packetCount = 0 + prevTime = now + break + } + } + } + } +} + +func (r *revidInst) output() { + now := time.Now() + prevTime := now + bytes := 0 + delay := 0 + for r.isRunning { + switch { + case r.ringBuffer.GetNoOfElements() < 2: + delay++ + time.Sleep(time.Duration(delay) * time.Millisecond) + case delay > 10: + delay -= 10 + } + if clip, err := r.ringBuffer.Read(); err == nil { + r.Log(Debug, fmt.Sprintf("Delay is: %v\n", delay)) + r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements())) + switch r.config.Output { + case File: + r.outputFile.Write(clip) + case HttpOut: + bytes += len(clip) + for err := r.sendClipToHTTP(clip, r.config.HttpAddress); err != nil; { + r.Log(Error, err.Error()) + r.Log(Warning, "Post failed trying again!") + err = r.sendClipToHTTP(clip, r.config.HttpAddress) + } + default: + r.Log(Error, "No output defined!") + } + if err := r.ringBuffer.DoneReading(); err != nil { + r.Log(Error, err.Error()) + } + now = time.Now() + deltaTime := now.Sub(prevTime) + if deltaTime > time.Duration(bitrateTime)*time.Second { + r.Log(Info, fmt.Sprintf("Bitrate: %v bits/s\n", int64(float64(bytes*8)/float64(deltaTime/1e9)))) + r.Log(Info, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements())) + prevTime = now + bytes = 0 + } + } + } +} + +// sendClipToHTPP posts a video clip via HTTP, using a new TCP connection each time. +func (r *revidInst) sendClipToHTTP(clip []byte, output string) error { + timeout := time.Duration(httpTimeOut * time.Second) + client := http.Client{ + Timeout: timeout, + } + url := output + strconv.Itoa(len(clip)) + r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip))) + resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer + if err != nil { + return fmt.Errorf("Error posting to %s: %s", output, err) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err == nil { + r.Log(Debug, fmt.Sprintf("%s\n", body)) + } else { + r.Log(Error, err.Error()) + } + return nil +} diff --git a/revid/out.mp4 b/revid/out.mp4 new file mode 100644 index 00000000..3c774417 Binary files /dev/null and b/revid/out.mp4 differ diff --git a/revid/out.ts b/revid/out.ts new file mode 100644 index 00000000..f05b9430 Binary files /dev/null and b/revid/out.ts differ diff --git a/revid/output/saxonOut.ts b/revid/output/saxonOut.ts new file mode 100644 index 00000000..e29f9faf Binary files /dev/null and b/revid/output/saxonOut.ts differ diff --git a/revid/revid b/revid/revid new file mode 100755 index 00000000..ec424fa5 Binary files /dev/null and b/revid/revid differ diff --git a/revid/revid_test.go b/revid/revid_test.go new file mode 100644 index 00000000..cd5d8d6f --- /dev/null +++ b/revid/revid_test.go @@ -0,0 +1,83 @@ +/* +NAME + MpegTs.go - provides a data structure intended to encapsulate the properties + of an MpegTs packet. + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton + +LICENSE + MpegTs.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package revid + +import ( + "testing" + "time" +) + +/* + * Testing with file input + * + */ + /* +func TestFileInput(t *testing.T){ + config := Config{ + Input: file, + InputFileName: "testInput.h264", + Output: file, + OutputFileName: "output/TestFileAsInput.ts", + } + revidInst, err := NewRevidInstance(config) + if err != nil { + t.Errorf("Should not have got error!") + } + revidInst.Start() + time.Sleep(100*time.Second) + revidInst.Stop() +} +* */ + + +/* + Testing use with raspivid +*/ +func TestRaspividInput(t *testing.T){ + config := Config{ + Input: Raspivid, + Output: File, + OutputFileName: "output/TestRaspividOutput.ts", + Width: "1280", + Height: "720", + Bitrate: "1000000", + FrameRate: "25", + } + revidInst, err := NewRevidInstance(config) + if err != nil { + t.Errorf("Should not have got an error!") + } + revidInst.Start() + time.Sleep(100*time.Second) + revidInst.Stop() +} + + + + diff --git a/revid/test.bash b/revid/test.bash deleted file mode 100644 index 293b4e74..00000000 --- a/revid/test.bash +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/bash -echo Running Revid with input: rtsp://184.72.239.149/vod/mp4:BigBuckBunny_175k.mov -echo and output: rtp://0.0.0.0:1234 -revid -i rtsp://184.72.239.149/vod/mp4:BigBuckBunny_175k.mov -m r -o rtp://0.0.0.0:1234 diff --git a/revid/test.bat b/revid/test.bat deleted file mode 100644 index 59f5c573..00000000 --- a/revid/test.bat +++ /dev/null @@ -1,4 +0,0 @@ -@echo off -echo Running Revid with input: rtsp://184.72.239.149/vod/mp4:BigBuckBunny_175k.mov -echo and output: rtp://0.0.0.0:1234 -revid -i rtsp://184.72.239.149/vod/mp4:BigBuckBunny_175k.mov -m r -o rtp://0.0.0.0:1234 diff --git a/revid/testInput.h264 b/revid/testInput.h264 new file mode 100644 index 00000000..52372371 Binary files /dev/null and b/revid/testInput.h264 differ diff --git a/ringbuffer/RingBuffer.go b/ringbuffer/RingBuffer.go index 197790f3..21f1b07a 100644 --- a/ringbuffer/RingBuffer.go +++ b/ringbuffer/RingBuffer.go @@ -45,6 +45,11 @@ type RingBuffer interface { DoneReading() error IsReadable() bool IsWritable() bool + GetNoOfElements() int +} + +func (rb *ringBuffer)GetNoOfElements() int { + return rb.noOfElements } // ringBuffer implements the RingBuffer interface @@ -95,13 +100,15 @@ func (rb *ringBuffer) Get() ([]byte, error) { if !rb.IsWritable() { return nil, errors.New("Buffer full!") } - if rb.currentlyWriting { - return nil, errors.New("Second call to Get! Call DoneWriting first!") - } - rb.currentlyWriting = true - nextlast := rb.last + 1 - if nextlast == rb.size { - nextlast = 0 + var nextlast int + if !rb.currentlyWriting { + rb.currentlyWriting = true + nextlast = rb.last + 1 + if nextlast == rb.size { + nextlast = 0 + } + } else { + nextlast = rb.last } return rb.dataMemory[nextlast], nil } diff --git a/rtp/Rtp.go b/rtp/Rtp.go new file mode 100644 index 00000000..78ea67e1 --- /dev/null +++ b/rtp/Rtp.go @@ -0,0 +1,160 @@ +/* +Copyright (c) 2015, T. Jameson Little +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this +list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, +this list of conditions and the following disclaimer in the documentation and/or +other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its contributors +may be used to endorse or promote products derived from this software without +specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, +INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE +OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED +OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +package rtp + +import ( + "net" + _"fmt" +) + +const ( + RTP_VERSION = 2 +) + +const ( + hasRtpPadding = 1 << 5 + hasRtpExt = 1 << 4 + hasMarker = 1 << 7 +) + +type RtpPacket struct { + Version byte + Padding bool + Ext bool + CC byte + Marker bool + PayloadType byte + SequenceNumber uint + Timestamp uint + SyncSource uint + CSRC []uint + ExtHeader uint + ExtData []byte + Payload []byte +} + +type Session struct { + Rtp net.PacketConn + Rtcp net.PacketConn + RtpChan <-chan RtpPacket + RtcpChan <-chan []byte + rtpChan chan<- RtpPacket + rtcpChan chan<- []byte +} + +func NewSession(rtp, rtcp net.PacketConn) *Session { + rtpChan := make(chan RtpPacket, 10) + rtcpChan := make(chan []byte, 10) + s := &Session{ + Rtp: rtp, + Rtcp: rtcp, + RtpChan: rtpChan, + RtcpChan: rtcpChan, + rtpChan: rtpChan, + rtcpChan: rtcpChan, + } + go s.HandleRtpConn(rtp) + go s.HandleRtcpConn(rtcp) + return s +} + +func toUint(arr []byte) (ret uint) { + for i, b := range arr { + ret |= uint(b) << (8 * uint(len(arr)-i-1)) + } + return ret +} + +func (s *Session) HandleRtpConn(conn net.PacketConn) { + buf := make([]byte, 4096) + for { + n, _, err := conn.ReadFrom(buf) + if err != nil { + panic(err) + } + cpy := make([]byte, n) + copy(cpy, buf) + go s.handleRtp(cpy) + } +} + +func (s *Session) HandleRtcpConn(conn net.PacketConn) { + buf := make([]byte, 4096) + for { + n, _, err := conn.ReadFrom(buf) + if err != nil { + panic(err) + } + cpy := make([]byte, n) + copy(cpy, buf) + go s.handleRtcp(cpy) + } +} + +func (s *Session) handleRtp(buf []byte) { + packet := RtpPacket{ + Version: (buf[0] & 0xC0) >> 6, + Padding: buf[0]&hasRtpPadding != 0, + Ext: buf[0]&hasRtpExt != 0, + CC: buf[0] & 0x0F, + Marker: buf[1]&hasMarker != 0, + PayloadType: buf[1] & 0x7F, + SequenceNumber: toUint(buf[2:4]), + Timestamp: toUint(buf[4:8]), + SyncSource: toUint(buf[8:12]), + CSRC: make([]uint, buf[0]&0x0F), + } + if packet.Version != RTP_VERSION { + panic("Unsupported version") + } + i := 12 + for j := range packet.CSRC { + packet.CSRC[j] = toUint(buf[i : i+4]) + i += 4 + } + if packet.Ext { + packet.ExtHeader = toUint(buf[i : i+2]) + length := toUint(buf[i+2 : i+4]) + i += 4 + if length > 0 { + packet.ExtData = buf[i : i+int(length)*4] + i += int(length) * 4 + } + } + packet.Payload = buf[i:] + + s.rtpChan <- packet +} + +func (s *Session) handleRtcp(buf []byte) { + // TODO: implement rtcp +} diff --git a/tools/helpers.go b/tools/helpers.go new file mode 100644 index 00000000..66986d3e --- /dev/null +++ b/tools/helpers.go @@ -0,0 +1,54 @@ +/* +NAME + MpegTs.go - provides a data structure intended to encapsulate the properties + of an MpegTs packet. + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton + +LICENSE + MpegTs.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package tools + +import ( + _"os" + _"fmt" + "bitbucket.org/ausocean/av/rtp" +) + +func BoolToByte(in bool) (out byte) { + if in { + out = 1 + } + return +} + +func GetOctectType(p *rtp.RtpPacket) byte { + return p.Payload[0] & 0x1F +} + +func GetStartBit(p *rtp.RtpPacket) byte { + return (p.Payload[1] & 0x80) >> 7 +} + +func GetEndBit(p *rtp.RtpPacket) byte { + return (p.Payload[1] & 0x40) >> 6 +} diff --git a/tools/tools_test.go b/tools/tools_test.go new file mode 100644 index 00000000..e1baa6e0 --- /dev/null +++ b/tools/tools_test.go @@ -0,0 +1,83 @@ +/* +NAME + MpegTs.go - provides a data structure intended to encapsulate the properties + of an MpegTs packet. + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton + +LICENSE + MpegTs.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package tools + +import ( + "testing" +) + +func TestH264Parsing(t *testing.T){ + // Using file + /* + file, err := os.Open(fileName) + if err != nil { + panic("Could not open file!") + return + } + stats, err := file.Stat() + if err != nil { + panic("Could not get file stats!") + } + buffer := make([]byte, stats.Size()) + _, err = file.Read(buffer) + if err != nil { + panic("Could not read file!") + } + */ + // straight from buffer + someData := []byte{ + 0,0,1,7,59,100,45,82,93,0,0,1,8,23,78,65,0,0,1,6,45,34,23,3,2,0,0,1,5,3,4,5, + 56,76,4,234,78,65,34,34,43,0,0,1,7,67,10,45,8,93,0,0,1,8,23,7,5,0,0,1,6, + 4,34,2,3,2,0,0,1,1,3,4,5,5,76,4,234,78,65,34,34,43,45, + } + nalAccess1 := []byte{ + 0,0,1,9,240,0,0,1,7,59,100,45,82,93,0,0,1,8,23,78,65,0,0,1,6,45,34,23,3,2,0,0,1,5,3,4,5, + 56,76,4,234,78,65,34,34,43, + } + nalAccess2 := []byte{ + 0,0,1,9,240,0,0,1,7,67,10,45,8,93,0,0,1,8,23,7,5,0,0,1,6, + 4,34,2,3,2,0,0,1,1,3,4,5,5,76,4,234,78,65,34,34,43,45, + } + aChannel := make(chan []byte, 10) + var nalAccessChan chan<- []byte + nalAccessChan = aChannel + go ParseH264Buffer(someData,nalAccessChan) + anAccessUnit := <-aChannel + for i := range anAccessUnit { + if anAccessUnit[i] != nalAccess1[i] { + t.Errorf("Should have been equal!") + } + } + anAccessUnit = <-aChannel + for i := range anAccessUnit { + if anAccessUnit[i] != nalAccess2[i] { + t.Errorf("Should have been equal!") + } + } +} diff --git a/tsgenerator/TsGenerator.go b/tsgenerator/TsGenerator.go new file mode 100644 index 00000000..43af5f8d --- /dev/null +++ b/tsgenerator/TsGenerator.go @@ -0,0 +1,268 @@ +/* +NAME + RtpToTsConverter.go - provides utilities for the conversion of Rtp packets + to equivalent MpegTs packets. + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton + +LICENSE + RtpToTsConverter.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package tsgenerator + +import ( + _"fmt" + _"os" + "bitbucket.org/ausocean/av/mpegts" + "bitbucket.org/ausocean/av/pes" + "bitbucket.org/ausocean/av/tools" + "bitbucket.org/ausocean/av/rtp" +) + +var ( + PatTable = []byte{0, 0, 176, 13, 0, 1, 193, 0, 0, 0, 1, 240, 0, 42, 177, 4, 178, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,} + + PmtTable = []byte{0, 2, 176, 18, 0, 1, 193, 0, 0, 0xE1, 0x00, 0xF0, 0, 0x1B, 0xE1, 0, 0xF0, 0, 0x15, 0xBD, 0x4D, 0x56, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,} +) + +const ( + SdtPid = 17 + PatPid = 0 + PmtPid = 4096 + VideoPid = 256 +) + +type TsGenerator interface { + generate() + GetNalInputChan() chan<- []byte + GetTsOutputChan() <-chan *mpegts.MpegTsPacket + Start() + genPts()(pts uint64) + genPcr()(pts uint64) +} + +type tsGenerator struct { + TsChan <-chan *mpegts.MpegTsPacket + tsChan chan<- *mpegts.MpegTsPacket + InputChan chan<- rtp.RtpPacket + inputChan <-chan rtp.RtpPacket + NalInputChan chan<- []byte + nalInputChan <-chan []byte + currentTsPacket *mpegts.MpegTsPacket + payloadByteChan chan byte + currentCC byte + currentPtsTime float64 + currentPcrTime float64 + fps uint + pesPktChan chan []byte + ccMap map[int]int +} + +func (g *tsGenerator)GetNalInputChan() chan<- []byte { + return g.NalInputChan +} + +func (g *tsGenerator)GetTsOutputChan() <-chan *mpegts.MpegTsPacket { + return g.TsChan +} + +func NewTsGenerator(fps uint) (g *tsGenerator) { + g = new(tsGenerator) + tsChan := make(chan *mpegts.MpegTsPacket, 100) + g.TsChan = tsChan + g.tsChan = tsChan + inputChan := make(chan rtp.RtpPacket, 100) + g.InputChan = inputChan + g.inputChan = inputChan + nalInputChan := make(chan []byte, 10000) + g.NalInputChan = nalInputChan + g.nalInputChan = nalInputChan + g.currentCC = 0 + g.fps = fps + g.currentPcrTime = .0 + g.currentPtsTime = .7 + g.pesPktChan = make(chan []byte, 1000) + g.payloadByteChan = make(chan byte, 100000) + g.ccMap = make(map[int]int, 4) + g.ccMap[SdtPid] = 0 + g.ccMap[PatPid] = 0 + g.ccMap[PmtPid] = 0 + g.ccMap[VideoPid] = 0 + return +} + +func (g *tsGenerator) genPts()(pts uint64){ + pts = uint64(g.currentPtsTime * float64(90000)) + g.currentPtsTime += 1.0/float64(g.fps) + return +} + +func (g *tsGenerator) genPcr()(pcr uint64){ + pcr = uint64(g.currentPcrTime * float64(90000)) + g.currentPcrTime += 1.0/float64(g.fps) + return +} + +func (g *tsGenerator) Start(){ + go g.generate() +} + +func (g *tsGenerator) generate() { + var rtpBuffer [](*rtp.RtpPacket) + for { + select { + case rtpPacket := <-g.inputChan: + rtpBuffer = append(rtpBuffer, &rtpPacket) + if len(rtpBuffer) > 2 { + // if there's something weird going on with sequence numbers then + // insertion sort + if rtpPacket.SequenceNumber < rtpBuffer[len(rtpBuffer)-2].SequenceNumber { + for i := 1; i < len(rtpBuffer); i++ { + for j := i; j > 0 && rtpBuffer[j].SequenceNumber < rtpBuffer[j-1].SequenceNumber; j-- { + temp := rtpBuffer[j] + rtpBuffer[j] = rtpBuffer[j-1] + rtpBuffer[j-1] = temp + } + } + } + } + + if len(rtpBuffer) > 200 { + for tools.GetOctectType(rtpBuffer[0]) != 7 { + rtpBuffer = rtpBuffer[1:] + } + sps := make([]byte, len(rtpBuffer[0].Payload)) + copy(sps[:], rtpBuffer[0].Payload[:]) + rtpBuffer = rtpBuffer[1:] + pps := make([]byte, len(rtpBuffer[0].Payload)) + copy(pps[:], rtpBuffer[0].Payload[:]) + rtpBuffer = rtpBuffer[1:] + sei := make([]byte, len(rtpBuffer[0].Payload)) + copy(sei[:], rtpBuffer[0].Payload[:]) + rtpBuffer = rtpBuffer[1:] + for tools.GetOctectType(rtpBuffer[0]) != 7 { + switch tools.GetOctectType(rtpBuffer[0]) { + case 28: + if tools.GetStartBit(rtpBuffer[0]) == 1 { + var buffer []byte + buffer = append(buffer, []byte{0x00, 0x00, 0x01}...) + buffer = append(buffer, []byte{0x09, 0x10}...) + buffer = append(buffer, []byte{0x00, 0x00, 0x01}...) + buffer = append(buffer, sps...) + buffer = append(buffer, []byte{0x00, 0x00, 0x01}...) + buffer = append(buffer, pps...) + buffer = append(buffer, []byte{0x00, 0x00, 0x01}...) + buffer = append(buffer, sei...) + buffer = append(buffer, []byte{0x00, 0x00, 0x01}...) + buffer = append(buffer, rtpBuffer[0].Payload[0]&0xE0|rtpBuffer[0].Payload[1]&0x1F) + buffer = append(buffer, rtpBuffer[0].Payload[2:]...) + rtpBuffer = rtpBuffer[1:] + for { + buffer = append(buffer, rtpBuffer[0].Payload[2:]...) + if tools.GetEndBit(rtpBuffer[0]) == 1 { + rtpBuffer = rtpBuffer[1:] + g.NalInputChan <- buffer + break + } + rtpBuffer = rtpBuffer[1:] + } + } + case 1: + var buffer []byte + buffer = append(buffer, []byte{0x00, 0x00, 0x01}...) + buffer = append(buffer, []byte{0x09, 0x10}...) + buffer = append(buffer, []byte{0x00, 0x00, 0x01}...) + buffer = append(buffer, sps...) + buffer = append(buffer, []byte{0x00, 0x00, 0x01}...) + buffer = append(buffer, pps...) + buffer = append(buffer, []byte{0x00, 0x00, 0x01}...) + buffer = append(buffer, sei...) + buffer = append(buffer, []byte{0x00, 0x00, 0x01}...) + buffer = append(buffer, rtpBuffer[0].Payload[0]&0xE0|rtpBuffer[0].Payload[1]&0x1F) + buffer = append(buffer, rtpBuffer[0].Payload[2:]...) + rtpBuffer = rtpBuffer[1:] + g.NalInputChan <- buffer + default: + } + } + } + case nalUnit := <-g.nalInputChan: + pesPkt := pes.PESPacket{ + StreamID: 0xE0, + PDI: byte(2), + PTS: g.genPts(), + Data: nalUnit, + HeaderLength: 5, + } + g.pesPktChan <- pesPkt.ToByteSlice() + case pesPkt := <-g.pesPktChan: + for ii := range pesPkt { + g.payloadByteChan <- pesPkt[ii] + } + pusi := true + for len(g.payloadByteChan) > 0 { + pkt := mpegts.MpegTsPacket{ + PUSI: pusi, + PID: VideoPid, + RAI: pusi, + CC: byte(g.getCC(VideoPid)), + AFC: byte(3), + PCRF: pusi, + } + pkt.FillPayload(g.payloadByteChan) + + if pusi { + // Create pat table and send off + patPkt := mpegts.MpegTsPacket{ + PUSI: pusi, + PID: PatPid, + CC: byte(g.getCC(PatPid)), + AFC: 1, + Payload: PatTable, + } + g.tsChan <- &patPkt + + // Create pmt table and send off + pmtPkt := mpegts.MpegTsPacket{ + PUSI: pusi, + PID: PmtPid, + CC: byte(g.getCC(PmtPid)), + AFC: 1, + Payload: PmtTable, + } + g.tsChan <- &pmtPkt + pkt.PCR = g.genPcr() + pusi = false + } + + g.tsChan <- &pkt + } + } + } +} + +func (g *tsGenerator) getCC(pid int) int { + temp := g.ccMap[pid] + if g.ccMap[pid]++; g.ccMap[pid] > 15 { + g.ccMap[pid] = 0 + } + return temp +} diff --git a/tsgenerator/tsgenerator_test.go b/tsgenerator/tsgenerator_test.go new file mode 100644 index 00000000..4a536409 --- /dev/null +++ b/tsgenerator/tsgenerator_test.go @@ -0,0 +1,29 @@ +/* +NAME + RtpToTsConverter.go - provides utilities for the conversion of Rtp packets + to equivalent MpegTs packets. + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton + +LICENSE + RtpToTsConverter.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package tsgenerator