diff --git a/packets/Nal.go b/packets/Nal.go index 690e8ead..add8350a 100644 --- a/packets/Nal.go +++ b/packets/Nal.go @@ -63,14 +63,15 @@ func ParseNALFragment(unit []byte) (u *NALFragment) { u.End = (unit[1] & 0x40) != 0 u.Reserved = (unit[1] & 0x20) != 0 u.FiveNUBs = unit[1] & 0x1F - u.Data = unit[2:] + u.Data = make([]byte,len(unit[2:])) + copy(u.Data[:],unit[2:]) return } func ParseNALSpsPps(unit []byte)(u *NALSpsPps){ u = new(NALSpsPps) u.Data = make([]byte,len(unit)) - u.Data = unit + copy(u.Data[:],unit[:]) return } diff --git a/packets/Rtp.go b/packets/Rtp.go index b4746535..8df41007 100644 --- a/packets/Rtp.go +++ b/packets/Rtp.go @@ -33,7 +33,7 @@ package packets import ( "net" - "os" + "fmt" ) const ( @@ -95,17 +95,22 @@ func toUint(arr []byte) (ret uint) { } func (s *Session) HandleRtpConn(conn net.PacketConn) { - file,_ := os.Create("video") buf := make([]byte, 4096) for { + fmt.Println("handling rtp conn") n, _, err := conn.ReadFrom(buf) + fmt.Printf("n: %v\n",n) + fmt.Println("here1") if err != nil { panic(err) } + fmt.Println("here2") cpy := make([]byte, n) + fmt.Println("here3") copy(cpy, buf) - file.Write(cpy) + fmt.Println("here4") go s.handleRtp(cpy) + fmt.Println("here5") } } @@ -153,7 +158,9 @@ func (s *Session) handleRtp(buf []byte) { } } packet.Payload = buf[i:] + fmt.Println("sending rtp packet") s.rtpChan <- packet + fmt.Println("Sent") } func (s *Session) handleRtcp(buf []byte) { diff --git a/packets/RtpToTsConverter.go b/packets/RtpToTsConverter.go index 0f2845a4..719ad3ed 100644 --- a/packets/RtpToTsConverter.go +++ b/packets/RtpToTsConverter.go @@ -28,6 +28,8 @@ LICENSE package packets +import "fmt" + type RtpToTsConverter interface { Convert() } @@ -35,8 +37,8 @@ type RtpToTsConverter interface { type rtpToTsConverter struct { TsChan <-chan *MpegTsPacket tsChan chan<- *MpegTsPacket - InputChan chan<- *RtpPacket - inputChan <-chan *RtpPacket + InputChan chan<- RtpPacket + inputChan <-chan RtpPacket currentTsPacket *MpegTsPacket payloadByteChan chan byte currentCC byte @@ -47,7 +49,7 @@ func NewRtpToTsConverter() (c *rtpToTsConverter) { tsChan := make(chan *MpegTsPacket,100) c.TsChan = tsChan c.tsChan = tsChan - inputChan := make(chan *RtpPacket,100) + inputChan := make(chan RtpPacket,100) c.InputChan = inputChan c.inputChan = inputChan c.currentCC = 0 @@ -55,27 +57,32 @@ func NewRtpToTsConverter() (c *rtpToTsConverter) { } func (c* rtpToTsConverter) Convert() { - nalUnitChan := make(chan NALUnit, 1000) - pesPktChan := make(chan []byte, 1000) - pesDataChan := make(chan byte, 1000) - payloadByteChan := make(chan byte, 10000) + nalUnitChan := make(chan NALUnit, 10000) + pesPktChan := make(chan []byte, 10000) + pesDataChan := make(chan byte, 15000) + payloadByteChan := make(chan byte, 15000) // Get nal units from incoming rtp for { select { default: case rtpPacket := <-c.inputChan: + fmt.Println("case1") if GetNalType( rtpPacket.Payload ) == 28 { nalUnitChan<-ParseNALFragment(rtpPacket.Payload) } else { nalUnitChan<-ParseNALSpsPps(rtpPacket.Payload) } + fmt.Println("done case1") case nalUnit := <-nalUnitChan: + fmt.Println("case2") nalUnitByteSlice := nalUnit.ToByteSlice() + fmt.Printf("len(nalUnitByteSlice): %v\n", len(nalUnitByteSlice)) for ii := range nalUnitByteSlice { pesDataChan<-nalUnitByteSlice[ii] } if nalFragment, isNALFragment := nalUnit.(*NALFragment); (isNALFragment && nalFragment.End) || !isNALFragment { + fmt.Printf("lenPesDataChan: %v\n",len(pesDataChan)) pesDataChanLen := len(pesDataChan) pesPkt := new(PESPacket) pesPkt.StreamID = 0xE0 @@ -99,11 +106,13 @@ func (c* rtpToTsConverter) Convert() { } pesPktChan<-pesPkt.ToByteSlice() } - + fmt.Println("done case2") case pesPkt := <-pesPktChan: + fmt.Println("case3") for ii:=range pesPkt { payloadByteChan<-pesPkt[ii] } + fmt.Println("done loading") firstPacket:=true for len(payloadByteChan) > 0 { lengthOfByteChan := len(payloadByteChan) @@ -141,6 +150,7 @@ func (c* rtpToTsConverter) Convert() { } c.tsChan<-c.currentTsPacket } + fmt.Println("done case3") } } } diff --git a/packets/packets_test.go b/packets/packets_test.go index d8c231a0..63137f89 100644 --- a/packets/packets_test.go +++ b/packets/packets_test.go @@ -31,15 +31,15 @@ package packets import ( //"bytes" "fmt" - _"io" - _"log" - _"net" + "io" + "log" + "net" "reflect" "testing" "time" "math/rand" - _"github.com/beatgammit/rtsp" + "github.com/beatgammit/rtsp" ) /******************************************************* @@ -54,7 +54,6 @@ const ( /* Let's see if we can connect to an rtsp device then read an rtp stream, and then convert the rtp packets to mpegts packets and output. */ -/* func TestRTSP(t *testing.T) { sess := rtsp.NewSession() res, err := sess.Options(rtspUrl) @@ -146,7 +145,7 @@ func TestRTP(t *testing.T) { fmt.Printf("RTP packet: %v\n", rtpPacket) } } -*/ + /******************************************************* Testing stuff related to the Nal.go file @@ -389,7 +388,7 @@ func TestRtpToTsConverter(t *testing.T){ rtpPacket1.Payload = make([]byte,100) copy(rtpPacket1.Payload[:], nalFragment.ToByteSlice()) fmt.Println(rtpPacket1.Payload) - converter.InputChan<-rtpPacket1 + converter.InputChan<-(*rtpPacket1) // Create second rtp packet rtpPacket2 := new(RtpPacket) rtpPacket2.Version = 2 @@ -417,7 +416,7 @@ func TestRtpToTsConverter(t *testing.T){ } rtpPacket2.Payload = make([]byte,200) copy(rtpPacket2.Payload[:], nalFragment.ToByteSlice()) - converter.InputChan<-rtpPacket2 + converter.InputChan<-(*rtpPacket2) // Create first expected tsPacket afField := make([]byte, 2) diff --git a/revid/revidv2.go b/revid/revidv2.go index a1bae2af..ff8e7b3c 100644 --- a/revid/revidv2.go +++ b/revid/revidv2.go @@ -288,10 +288,14 @@ func input(input string, output string) { converter := packets.NewRtpToTsConverter() go func(){ for{ - converter.InputChan<-<-rtpSession.RtpChan + select { + default: + case aPacket := <-rtpSession.RtpChan: + converter.InputChan<-aPacket + } } - } - go converter.Convert(rtpSession) + }() + go converter.Convert() clipSize := 0 packetCount := 0 now := time.Now() @@ -324,11 +328,11 @@ func input(input string, output string) { ii++ } else { donePSI = true + fmt.Println("getting TS packet") packet := <-converter.TsChan packetByteSlice := packet.ToByteSlice() copy(clip[clipSize:upperBound],packetByteSlice) } - //fmt.Println(clip[clipSize:upperBound]) packetCount++ clipSize += mp2tPacketSize