From b7076fa4daf90d52d0b9466dfa0f9252e28d670a Mon Sep 17 00:00:00 2001 From: Unknown Date: Wed, 10 Jan 2018 14:27:56 +1030 Subject: [PATCH] Finished fixing syntax errors --- camstreamer/camstreamer.go | 104 ++++++++++++++++++++++++++++++++++ h264/H264Writer.go | 62 ++++++++++---------- h264/h264Parser.go | 22 +++---- revid/RevidInstance.go | 106 +++++++++++++++++++++++----------- revid/config.go | 48 ---------------- revid/revid_test.go | 107 +---------------------------------- revid/{tools => }/tsFuncs.go | 0 tools/helpers.go | 1 - tsgenerator/TsGenerator.go | 66 ++++++++++----------- 9 files changed, 253 insertions(+), 263 deletions(-) delete mode 100644 revid/config.go rename revid/{tools => }/tsFuncs.go (100%) diff --git a/camstreamer/camstreamer.go b/camstreamer/camstreamer.go index b0910f41..8a5fda82 100644 --- a/camstreamer/camstreamer.go +++ b/camstreamer/camstreamer.go @@ -63,3 +63,107 @@ func (cs *CamStreamer)Connect()(session *RtpSession, err error){ // let's create a session that will store useful stuff from the connections rtpSession := NewSession(rtpConn, rtcpConn) } + +/******************************************************* +Testing stuff related to connection i.e. rtsp, rtp, rtcp +********************************************************/ +const ( + rtpPort = 17300 + rtcpPort = 17319 + rtspUrl = "rtsp://192.168.0.50:8554/CH002.sdp" + rtpUrl = "rtsp://192.168.0.50:8554/CH002.sdp/track1" +) + +/* Let's see if we can connect to an rtsp device then read an rtp stream, +and then convert the rtp packets to mpegts packets and output. */ +func TestRTSP(t *testing.T) { + sess := rtsp.NewSession() + res, err := sess.Options(rtspUrl) + if err != nil { + t.Errorf("Shouldn't have got error: %v\n", err) + } + + res, err = sess.Describe(rtspUrl) + if err != nil { + log.Fatalln(err) + t.Errorf("Shouldn't have got error: %v\n", err) + } + p, err := rtsp.ParseSdp(&io.LimitedReader{R: res.Body, N: res.ContentLength}) + if err != nil { + t.Errorf("Shouldn't have got error: %v\n", err) + } + log.Printf("%+v", p) + res, err = sess.Setup(rtpUrl, fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort)) + if err != nil { + t.Errorf("Shouldn't have got error: %v\n", err) + } + log.Println(res) + res, err = sess.Play(rtspUrl, res.Header.Get("Session")) + if err != nil { + t.Errorf("Shouldn't have got error: %v\n", err) + } + log.Println(res) +} + +func TestRTP(t *testing.T) { + sess := rtsp.NewSession() + res, err := sess.Options(rtspUrl) + if err != nil { + t.Errorf("Shouldn't have got error: %v\n", err) + } + res, err = sess.Describe(rtspUrl) + if err != nil { + log.Fatalln(err) + t.Errorf("Shouldn't have got error: %v\n", err) + } + p, err := rtsp.ParseSdp(&io.LimitedReader{R: res.Body, N: res.ContentLength}) + if err != nil { + t.Errorf("Shouldn't have got error: %v\n", err) + } + log.Printf("%+v", p) + res, err = sess.Setup(rtpUrl, fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort)) + if err != nil { + t.Errorf("Shouldn't have got error: %v\n", err) + } + log.Println(res) + res, err = sess.Play(rtspUrl, res.Header.Get("Session")) + if err != nil { + t.Errorf("Shouldn't have got error: %v\n", err) + } + log.Println(res) + // create udp connection for rtp stuff + rtpLaddr, err := net.ResolveUDPAddr("udp", "192.168.0.109:17300") + if err != nil { + t.Errorf("Local rtp addr not set! %v\n", err) + } + rtpAddr, err := net.ResolveUDPAddr("udp", "192.168.0.50:17300") + if err != nil { + t.Errorf("Resolving rtp address didn't work! %v\n", err) + } + rtpConn, err := net.DialUDP("udp", rtpLaddr, rtpAddr) + if err != nil { + t.Errorf("Conncection not established! %v\n", err) + } + // Create udp connection for rtcp stuff + rtcpLaddr, err := net.ResolveUDPAddr("udp", "192.168.0.109:17319") + if err != nil { + t.Errorf("Local RTCP address not resolved! %v\n", err) + } + rtcpAddr, err := net.ResolveUDPAddr("udp", "192.168.0.50:17301") + if err != nil { + t.Errorf("Remote RTCP address not resolved! %v\n", err) + } + rtcpConn, err := net.DialUDP("udp", rtcpLaddr, rtcpAddr) + if err != nil { + t.Errorf("Connection not established! %v\n", err) + } + // let's create a session that will store useful stuff from the connections + rtpSession := NewSession(rtpConn, rtcpConn) + time.Sleep(2 * time.Second) + select { + default: + t.Errorf("Should have got rtpPacket!") + case rtpPacket := <-rtpSession.RtpChan: + fmt.Printf("RTP packet: %v\n", rtpPacket) + } +} diff --git a/h264/H264Writer.go b/h264/H264Writer.go index dcc07ec7..3ac308a1 100644 --- a/h264/H264Writer.go +++ b/h264/H264Writer.go @@ -26,11 +26,16 @@ LICENSE along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). */ -package packets +package h264 import ( _"fmt" "os" + + "../mpegts" + "../rtp" + "../tools" + "../itut" ) type RtpToH264Converter interface { @@ -38,23 +43,22 @@ type RtpToH264Converter interface { } type rtpToH264Converter struct { - TsChan <-chan *MpegTsPacket - tsChan chan<- *MpegTsPacket - InputChan chan<- RtpPacket - inputChan <-chan RtpPacket - currentTsPacket *MpegTsPacket + TsChan <-chan *mpegts.MpegTsPacket + tsChan chan<- *mpegts.MpegTsPacket + InputChan chan<- rtp.RtpPacket + inputChan <-chan rtp.RtpPacket + currentTsPacket *mpegts.MpegTsPacket payloadByteChan chan byte currentCC byte } //func parseH264File() - func NewRtpToH264Converter() (c *rtpToH264Converter) { c = new(rtpToH264Converter) - tsChan := make(chan *MpegTsPacket,100) + tsChan := make(chan *mpegts.MpegTsPacket,100) c.TsChan = tsChan c.tsChan = tsChan - inputChan := make(chan RtpPacket,100) + inputChan := make(chan rtp.RtpPacket,100) c.InputChan = inputChan c.inputChan = inputChan c.currentCC = 0 @@ -63,7 +67,7 @@ func NewRtpToH264Converter() (c *rtpToH264Converter) { func (c* rtpToH264Converter) Convert() { file,_ := os.Create("video") - var rtpBuffer [](*RtpPacket) + var rtpBuffer [](*rtp.RtpPacket) for { select { default: @@ -83,7 +87,7 @@ func (c* rtpToH264Converter) Convert() { } if len(rtpBuffer) > 200 { // Discard everything before a type 7 - for GetOctectType(rtpBuffer[0]) != 7 { + for tools.GetOctectType(rtpBuffer[0]) != 7 { rtpBuffer = rtpBuffer[1:] } // get sps @@ -99,26 +103,22 @@ func (c* rtpToH264Converter) Convert() { copy(sei[:],rtpBuffer[0].Payload[:]) rtpBuffer = rtpBuffer[1:] // while we haven't reached the next sps in the buffer - for GetOctectType(rtpBuffer[0]) != 7 { - switch(GetOctectType(rtpBuffer[0])){ + for tools.GetOctectType(rtpBuffer[0]) != 7 { + switch(tools.GetOctectType(rtpBuffer[0])){ case 28: - if GetStartBit(rtpBuffer[0]) == 1{ + if tools.GetStartBit(rtpBuffer[0]) == 1{ var buffer []byte - buffer = append(buffer, []byte{0x00,0x00,0x01}...) - buffer = append(buffer, []byte{0x09,0x10}...) - buffer = append(buffer, []byte{0x00,0x00,0x01}...) - buffer = append(buffer, sps...) - buffer = append(buffer, []byte{0x00,0x00,0x01}...) - buffer = append(buffer, pps...) - buffer = append(buffer, []byte{0x00,0x00,0x01}...) - buffer = append(buffer, sei...) - buffer = append(buffer, []byte{0x00,0x00,0x01}...) + buffer = append(buffer, append(itut.StartCode1(),itut.AUD()...)...) + buffer = append(buffer, append(itut.StartCode1(),sps...)...) + buffer = append(buffer, append(itut.StartCode1(),pps...)...) + buffer = append(buffer, append(itut.StartCode1(),sei...)...) + buffer = append(buffer, itut.StartCode1()...) buffer = append(buffer, rtpBuffer[0].Payload[0] & 0xE0 | rtpBuffer[0].Payload[1] & 0x1F ) buffer = append(buffer, rtpBuffer[0].Payload[2:]...) rtpBuffer = rtpBuffer[1:] for { buffer = append(buffer, rtpBuffer[0].Payload[2:]...) - if getEndBit(rtpBuffer[0]) == 1 { + if tools.GetEndBit(rtpBuffer[0]) == 1 { rtpBuffer = rtpBuffer[1:] file.Write(buffer) break @@ -128,15 +128,11 @@ func (c* rtpToH264Converter) Convert() { } case 1: var buffer []byte - buffer = append(buffer, []byte{0x00,0x00,0x01}...) - buffer = append(buffer, []byte{0x09,0x10}...) - buffer = append(buffer, []byte{0x00,0x00,0x01}...) - buffer = append(buffer, sps...) - buffer = append(buffer, []byte{0x00,0x00,0x01}...) - buffer = append(buffer, pps...) - buffer = append(buffer, []byte{0x00,0x00,0x01}...) - buffer = append(buffer, sei...) - buffer = append(buffer, []byte{0x00,0x00,0x01}...) + buffer = append(buffer, append(itut.StartCode1(), itut.AUD()...)...) + buffer = append(buffer, append(itut.StartCode1(), sps...)...) + buffer = append(buffer, append(itut.StartCode1(),pps...)...) + buffer = append(buffer, append(itut.StartCode1(),sei...)...) + buffer = append(buffer, itut.StartCode1()...) buffer = append(buffer, rtpBuffer[0].Payload[0] & 0xE0 | rtpBuffer[0].Payload[1] & 0x1F ) buffer = append(buffer, rtpBuffer[0].Payload[2:]...) rtpBuffer = rtpBuffer[1:] diff --git a/h264/h264Parser.go b/h264/h264Parser.go index 227da1fe..e7251609 100644 --- a/h264/h264Parser.go +++ b/h264/h264Parser.go @@ -30,25 +30,27 @@ package h264 import ( "../itut" + "reflect" ) -type H264Parser { +type H264Parser struct { inputBuffer []byte isParsing bool + OutputChan chan<- []byte } func (p* H264Parser)SendInputData(someData []byte){ - inputBuffer = append(input, someData) + p.inputBuffer = append(p.inputBuffer, someData...) } func (p* H264Parser)Stop(){ - isParsing = false + p.isParsing = false } -func (p* H264Parser)Parse(outputChan chan<- []byte) { - isParsing = true - buffer = b.InputBuffer - for isParsing { +func (p* H264Parser)Parse() { + p.isParsing = true + buffer := p.inputBuffer + for p.isParsing { for i := 0 ;; i++{ var start bool i, start = func() (int,bool) { @@ -62,13 +64,13 @@ func (p* H264Parser)Parse(outputChan chan<- []byte) { }() if nalType := buffer[i] & 0x1F; start && ( nalType == 1 || nalType == 5) { for ; i < len(buffer) && !(i+3 < len(buffer) && ( reflect.DeepEqual(buffer[i:i+3],itut.StartCode1()) || - reflect.DeepEqual(buffer[i:i+4],startCode2))); i++ {} - outputChan<-append(append(itut.StartCode1(),itut.AUD()),buffer[:i]...) + reflect.DeepEqual(buffer[i:i+4],itut.StartCode2()))); i++ {} + p.OutputChan<-append(append(itut.StartCode1(),itut.AUD()...),buffer[:i]...) buffer = buffer[i:] i=0 } if i >= len(buffer) { - inputBuffer = []byte{} + p.inputBuffer = []byte{} break } } diff --git a/revid/RevidInstance.go b/revid/RevidInstance.go index 6dabe017..31328a66 100644 --- a/revid/RevidInstance.go +++ b/revid/RevidInstance.go @@ -30,34 +30,24 @@ LICENSE package revid import ( - "/tools" "bufio" "bytes" "crypto/md5" - "encoding/binary" "encoding/hex" - "errors" - "flag" "fmt" - "io" "io/ioutil" - "ioutil" "log" - "math/rand" "net" "net/http" "os" "os/exec" - "runtime" "strconv" - "strings" "time" - "bitbucket.org/ausocean/av/ringbuffer" + "../h264" + "../tsgenerator" - "github.com/Comcast/gots/packet" - "github.com/Comcast/gots/packet/adaptationfield" - "github.com/Comcast/gots/psi" + "bitbucket.org/ausocean/av/ringbuffer" ) // defaults and networking consts @@ -76,12 +66,30 @@ const ( motionThreshold = "0.0025" qscale = "3" defaultRaspividCmd = "raspivid -o -" + framesPerSec = 25 + packetsPerFrame = 7 ) +const ( + raspivid = 0 + rtp = 1 + h264Codec = 2 + file = 4 + httpOut = 5 +) + +type Config struct { + Input uint8 + InputCmd string + Output uint8 + OutputFileName string +} + + type RevidInst interface { - Run() + Start() Stop() - ChangeState(newConfig Config) err + ChangeState(newConfig Config) error } type revidInst struct { @@ -95,31 +103,35 @@ type revidInst struct { config Config isRunning bool Error *log.Logger + outputFile *os.File + inputFile *os.File } -func NewRevidInstance(config Config) (r *revid, err error) { - ringBuffer = ringbuffer.NewRingBuffer(bufferSize, mp2tPacketSize*mp2tMaxPackets) - r.Error = log.New(os.Stderr, "ERROR: ", log.Ldat|log.Ltime|log.Lshortfile) +func NewRevidInstance(config Config) (r *revidInst, err error) { + r = new(revidInst) + r.ringBuffer = ringbuffer.NewRingBuffer(bufferSize, mp2tPacketSize*mp2tMaxPackets) + r.Error = log.New(os.Stderr, "ERROR: ", log.Ldate|log.Ltime|log.Lshortfile) r.expectCC = -1 r.dumpCC = -1 r.dumpPCRBase = 0 r.ChangeState(config) - switch r.config.output { + switch r.config.Output { case file: - r.outputFile, err = os.Create(r.outputFileName) + r.outputFile, err = os.Create(r.config.OutputFileName) if err != nil { return nil, err } } + return } -func (r *RevidInst) ChangeState(newConfig Config) error { +func (r *revidInst) ChangeState(newConfig Config) error { // TODO: check that the config is G r.config = newConfig return nil } -func (r *revidInst) Run() { +func (r *revidInst) Start() { r.isRunning = true go r.input() go r.output() @@ -129,15 +141,15 @@ func (r *revidInst) Stop() { r.isRunning = false } -func (r *revid) input() { +func (r *revidInst) input() { generator := tsgenerator.NewTsGenerator(framesPerSec) go generator.Generate() - h264Parser := h264.NewH264Parser(generator.NalInputChan) + h264Parser := h264.H264Parser{OutputChan: generator.NalInputChan} go h264Parser.Parse() - var inputReader *Reader - switch r.config.input { + var inputReader *bufio.Reader + switch r.config.Input { case raspivid: - cmd := exec.Command(raspividCmd) + cmd := exec.Command(r.config.InputCmd) stdout, _ := cmd.StdoutPipe() err := cmd.Start() inputReader = bufio.NewReader(stdout) @@ -152,26 +164,51 @@ func (r *revid) input() { packetCount := 0 now := time.Now() prevTime := now + + startPackets := [][]byte{ + {71,64,17,16,0,66,240,65,0,1,193,0,0,255,1,255,0,1,252,128,48,72,46,1,6,70,70,109,112,101,103,37,115,116,114,101,97,109,101,100,32,98,121,32,116,104,101,32,71,101,111,86,105,115,105,111,110,32,82,116,115,112,32,83,101,114,118,101,114,99,176,214,195,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255}, + {71,64,0,16,0,0,176,13,0,1,193,0,0,0,1,240,0,42,177,4,178,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255}, + /*PMT*/{71,80,0,16, + /*Start of payload*/ + 0,2,176,18,0,1,193,0,0,0xE1,0x00,0xF0,0,0x1B,0xE1,0,0xF0,0,0x15,0xBD,0x4D,0x56,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255}, + } + + donePSI := false + ii:=0 for r.isRunning { h264Data, err := ioutil.ReadAll(inputReader) if err != nil { r.Error.Println(err.Error()) } h264Parser.SendInputData(h264Data) - if clip, err := ringBuffer.Get(); err != nil { + if clip, err := r.ringBuffer.Get(); err != nil { r.Error.Println(err.Error()) return } else { for { upperBound := clipSize + mp2tPacketSize - clip[clipSize:uppderBound] = (<-converter.TsChan).ToByteSlice() + if ii < 3 && !donePSI { + packetByteSlice := startPackets[ii] + copy(clip[clipSize:upperBound],packetByteSlice) + ii++ + } else { + donePSI = true + if err != nil { + fmt.Println(err) + } + byteSlice,err := (<-generator.TsChan).ToByteSlice() + if err != nil { + r.Error.Println(err.Error()) + } + copy(clip[clipSize:upperBound],byteSlice) + } packetCount++ clipSize += mp2tPacketSize // send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame now = time.Now() if (packetCount == mp2tMaxPackets) || (now.Sub(prevTime) > clipDuration*time.Second && packetCount%packetsPerFrame == 0) { - if err := ringBuffer.DoneWriting(clipSize); err != nil { + if err := r.ringBuffer.DoneWriting(clipSize); err != nil { r.Error.Println(err.Error()) return } @@ -185,16 +222,16 @@ func (r *revid) input() { } } -func (r *revidInst) output(output string) { +func (r *revidInst) output() { for r.isRunning { - if clip, err := ringBuffer.Read(); err == nil { - switch r.config.output { + if clip, err := r.ringBuffer.Read(); err == nil { + switch r.config.Output { case file: r.outputFile.Write(clip) default: r.Error.Println("No output?") } - if err := ringBuffer.DoneReading(); err != nil { + if err := r.ringBuffer.DoneReading(); err != nil { r.Error.Println(err.Error()) } } @@ -219,4 +256,5 @@ func sendClipToHTTP(clip []byte, output string, _ net.Conn) error { if err == nil { fmt.Printf("%s\n", body) } + return nil } diff --git a/revid/config.go b/revid/config.go deleted file mode 100644 index 1f7c86d0..00000000 --- a/revid/config.go +++ /dev/null @@ -1,48 +0,0 @@ -/* -NAME - RtpToTsConverter.go - provides utilities for the conversion of Rtp packets - to equivalent MpegTs packets. - -DESCRIPTION - See Readme.md - -AUTHOR - Saxon Nelson-Milton - -LICENSE - RtpToTsConverter.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). -*/ - -package revid - -const ( - // input types - raspivid = 0 - rtp = 1 - // input format - h264 = 2 - rtp = 3 - // output types - file = 4 - http = 5 -) - -type Config struct { - Input uint8 - InputCmd string - Output uint8 - OutPutFileName string -} diff --git a/revid/revid_test.go b/revid/revid_test.go index 7f68cab9..950f5a86 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -31,109 +31,6 @@ package revid import ( "testing" ) -/******************************************************* -Testing stuff related to connection i.e. rtsp, rtp, rtcp -********************************************************/ -const ( - rtpPort = 17300 - rtcpPort = 17319 - rtspUrl = "rtsp://192.168.0.50:8554/CH002.sdp" - rtpUrl = "rtsp://192.168.0.50:8554/CH002.sdp/track1" -) - -/* Let's see if we can connect to an rtsp device then read an rtp stream, -and then convert the rtp packets to mpegts packets and output. */ -func TestRTSP(t *testing.T) { - sess := rtsp.NewSession() - res, err := sess.Options(rtspUrl) - if err != nil { - t.Errorf("Shouldn't have got error: %v\n", err) - } - - res, err = sess.Describe(rtspUrl) - if err != nil { - log.Fatalln(err) - t.Errorf("Shouldn't have got error: %v\n", err) - } - p, err := rtsp.ParseSdp(&io.LimitedReader{R: res.Body, N: res.ContentLength}) - if err != nil { - t.Errorf("Shouldn't have got error: %v\n", err) - } - log.Printf("%+v", p) - res, err = sess.Setup(rtpUrl, fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort)) - if err != nil { - t.Errorf("Shouldn't have got error: %v\n", err) - } - log.Println(res) - res, err = sess.Play(rtspUrl, res.Header.Get("Session")) - if err != nil { - t.Errorf("Shouldn't have got error: %v\n", err) - } - log.Println(res) -} - -func TestRTP(t *testing.T) { - sess := rtsp.NewSession() - res, err := sess.Options(rtspUrl) - if err != nil { - t.Errorf("Shouldn't have got error: %v\n", err) - } - res, err = sess.Describe(rtspUrl) - if err != nil { - log.Fatalln(err) - t.Errorf("Shouldn't have got error: %v\n", err) - } - p, err := rtsp.ParseSdp(&io.LimitedReader{R: res.Body, N: res.ContentLength}) - if err != nil { - t.Errorf("Shouldn't have got error: %v\n", err) - } - log.Printf("%+v", p) - res, err = sess.Setup(rtpUrl, fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort)) - if err != nil { - t.Errorf("Shouldn't have got error: %v\n", err) - } - log.Println(res) - res, err = sess.Play(rtspUrl, res.Header.Get("Session")) - if err != nil { - t.Errorf("Shouldn't have got error: %v\n", err) - } - log.Println(res) - // create udp connection for rtp stuff - rtpLaddr, err := net.ResolveUDPAddr("udp", "192.168.0.109:17300") - if err != nil { - t.Errorf("Local rtp addr not set! %v\n", err) - } - rtpAddr, err := net.ResolveUDPAddr("udp", "192.168.0.50:17300") - if err != nil { - t.Errorf("Resolving rtp address didn't work! %v\n", err) - } - rtpConn, err := net.DialUDP("udp", rtpLaddr, rtpAddr) - if err != nil { - t.Errorf("Conncection not established! %v\n", err) - } - // Create udp connection for rtcp stuff - rtcpLaddr, err := net.ResolveUDPAddr("udp", "192.168.0.109:17319") - if err != nil { - t.Errorf("Local RTCP address not resolved! %v\n", err) - } - rtcpAddr, err := net.ResolveUDPAddr("udp", "192.168.0.50:17301") - if err != nil { - t.Errorf("Remote RTCP address not resolved! %v\n", err) - } - rtcpConn, err := net.DialUDP("udp", rtcpLaddr, rtcpAddr) - if err != nil { - t.Errorf("Connection not established! %v\n", err) - } - // let's create a session that will store useful stuff from the connections - rtpSession := NewSession(rtpConn, rtcpConn) - time.Sleep(2 * time.Second) - select { - default: - t.Errorf("Should have got rtpPacket!") - case rtpPacket := <-rtpSession.RtpChan: - fmt.Printf("RTP packet: %v\n", rtpPacket) - } -} /* Testing use with raspivid @@ -141,13 +38,15 @@ func TestRTP(t *testing.T) { func TestRaspividInput(t *testing.T){ config := Config{ Input: raspivid, + InputCmd: "raspivid -o -" Output: file, + OutputFileName: "output/TestRaspividOutput.ts" } revidInst, err := NewRevidInstance(config) if err != nil { t.Errorf("Should not have got an error!") } - revidInst.Run() + revidInst.Start() time.Sleep(5*time.Second) revidInst.Stop() } diff --git a/revid/tools/tsFuncs.go b/revid/tsFuncs.go similarity index 100% rename from revid/tools/tsFuncs.go rename to revid/tsFuncs.go diff --git a/tools/helpers.go b/tools/helpers.go index fc9cd0a7..ff8160c9 100644 --- a/tools/helpers.go +++ b/tools/helpers.go @@ -31,7 +31,6 @@ package tools import ( _"os" _"fmt" - "reflect" "../rtp" ) diff --git a/tsgenerator/TsGenerator.go b/tsgenerator/TsGenerator.go index 3c92e670..07be9b45 100644 --- a/tsgenerator/TsGenerator.go +++ b/tsgenerator/TsGenerator.go @@ -57,49 +57,49 @@ type tsGenerator struct { isGenerating bool } -func NewTsGenerator(fps uint) (c *tsGenerator) { - c = new(tsCreator) +func NewTsGenerator(fps uint) (g *tsGenerator) { + g = new(tsGenerator) tsChan := make(chan *mpegts.MpegTsPacket, 100) - c.TsChan = tsChan - c.tsChan = tsChan + g.TsChan = tsChan + g.tsChan = tsChan inputChan := make(chan rtp.RtpPacket, 100) - c.InputChan = inputChan - c.inputChan = inputChan + g.InputChan = inputChan + g.inputChan = inputChan nalInputChan := make(chan []byte, 10000) - c.NalInputChan = nalInputChan - c.nalInputChan = nalInputChan - c.currentCC = 0 - c.fps = fps - c.currentPcrTime = .0 - c.currentPtsTime = .7 + g.NalInputChan = nalInputChan + g.nalInputChan = nalInputChan + g.currentCC = 0 + g.fps = fps + g.currentPcrTime = .0 + g.currentPtsTime = .7 return } -func (c* tsgenerator) genPts()(pts uint64){ - pts = uint64(c.currentPtsTime * float64(90000)) - c.currentPtsTime += 1.0/float64(c.fps) +func (g *tsGenerator) genPts()(pts uint64){ + pts = uint64(g.currentPtsTime * float64(90000)) + g.currentPtsTime += 1.0/float64(g.fps) return } -func (c* tsgenerator) genPcr()(pcr uint64){ - pcr = uint64(c.currentPcrTime * float64(90000)) - c.currentPcrTime += 1.0/float64(c.fps) +func (g *tsGenerator) genPcr()(pcr uint64){ + pcr = uint64(g.currentPcrTime * float64(90000)) + g.currentPcrTime += 1.0/float64(g.fps) return } -func (c *tsgenerator) Stop(){ - isGenerating = false +func (g *tsGenerator) Stop(){ + g.isGenerating = false } -func (c *tsgenerator) Generate() { - isGenerating = true +func (g *tsGenerator) Generate() { + g.isGenerating = true pesPktChan := make(chan []byte, 1000) payloadByteChan := make(chan byte, 100000) var rtpBuffer [](*rtp.RtpPacket) - for isGenerating { + for g.isGenerating { select { default: - case rtpPacket := <-c.inputChan: + case rtpPacket := <-g.inputChan: rtpBuffer = append(rtpBuffer, &rtpPacket) if len(rtpBuffer) > 2 { // if there's something weird going on with sequence numbers then @@ -149,7 +149,7 @@ func (c *tsgenerator) Generate() { buffer = append(buffer, rtpBuffer[0].Payload[2:]...) if tools.GetEndBit(rtpBuffer[0]) == 1 { rtpBuffer = rtpBuffer[1:] - c.NalInputChan <- buffer + g.NalInputChan <- buffer break } rtpBuffer = rtpBuffer[1:] @@ -169,16 +169,16 @@ func (c *tsgenerator) Generate() { buffer = append(buffer, rtpBuffer[0].Payload[0]&0xE0|rtpBuffer[0].Payload[1]&0x1F) buffer = append(buffer, rtpBuffer[0].Payload[2:]...) rtpBuffer = rtpBuffer[1:] - c.NalInputChan <- buffer + g.NalInputChan <- buffer default: } } } - case nalUnit := <-c.nalInputChan: + case nalUnit := <-g.nalInputChan: pesPkt := pes.PESPacket{ StreamID: 0xE0, PDI: byte(2), - PTS: c.genPts(), + PTS: g.genPts(), Data: nalUnit, HeaderLength: 5, } @@ -193,19 +193,19 @@ func (c *tsgenerator) Generate() { PUSI: pusi, PID: 256, RAI: pusi, - CC: c.currentCC, + CC: g.currentCC, AFC: byte(3), PCRF: pusi, } pkt.FillPayload(payloadByteChan) if pusi { - pkt.PCR = c.genPcr() + pkt.PCR = g.genPcr() pusi = false } - if c.currentCC++; c.currentCC > 15 { - c.currentCC = 0 + if g.currentCC++; g.currentCC > 15 { + g.currentCC = 0 } - c.tsChan <- &pkt + g.tsChan <- &pkt } } }