diff --git a/packet/MpegTs.go b/packet/MpegTs.go index 18e0d897..ad1204c3 100644 --- a/packet/MpegTs.go +++ b/packet/MpegTs.go @@ -72,8 +72,7 @@ func boolToByte( in bool ) (out uint8){ return } -// bool to int, shift accordigly, int to string, string binary to int -func (p *MpegTsPacket) toByteSlice() (output [188]byte) { +func (p *MpegTsPacket) ToByteSlice() (output [188]byte) { output[0] = p.SyncByte output[1] = ( boolToByte(p.TEI) << (8-teiIndex%8) ) | ( boolToByte(p.PUSI) << (8-pusiIndex%8) ) | @@ -81,12 +80,12 @@ func (p *MpegTsPacket) toByteSlice() (output [188]byte) { byte(( p.PID & 0xFF00 ) >> 8) output[2] = byte(p.PID & 0x00FF) output[3] = ( p.TSC << 6 ) | ( p.AFC << 4 ) | p.CC - afLen := len(p.AF) - for ii := 4; ii-4 < afLen; ii++ { + for ii := 4; ii-4 < len(p.AF); ii++ { output[ii] = p.AF[ii-4] } - for ii := afLen; ii < packetLength; ii++ { - output[ii] = p.Payload[ii-afLen] + headerSize := packetLength-len(p.Payload) + for ii := headerSize; ii < packetLength; ii++ { + output[ii] = p.Payload[ii-headerSize] } return } diff --git a/packet/Rtp.go b/packet/Rtp.go index 2a5d5786..66905e15 100644 --- a/packet/Rtp.go +++ b/packet/Rtp.go @@ -70,7 +70,7 @@ type Session struct { rtcpChan chan<- []byte } -func New(rtp, rtcp net.PacketConn) *Session { +func NewSession(rtp, rtcp net.PacketConn) *Session { rtpChan := make(chan RtpPacket, 10) rtcpChan := make(chan []byte, 10) s := &Session{ diff --git a/packet/RtpToTsConverter.go b/packet/RtpToTsConverter.go index c52110d1..86441ec9 100644 --- a/packet/RtpToTsConverter.go +++ b/packet/RtpToTsConverter.go @@ -28,8 +28,6 @@ LICENSE package packet -import "fmt" - type RtpToTsConverter interface { Convert() } @@ -46,15 +44,15 @@ type rtpToTsConverter struct { func NewRtpToTsConverter() (c *rtpToTsConverter) { c = new(rtpToTsConverter) - tsChan := make(chan *MpegTsPacket) - rtpChan := make(chan RtpPacket) + tsChan := make(chan *MpegTsPacket,100) + rtpChan := make(chan RtpPacket,100) c.TsChan = tsChan c.RtpChan = rtpChan c.tsChan = tsChan c.rtpChan = rtpChan c.currentTsPacket = new(MpegTsPacket) c.currentCC = 0 - c.payloadByteChan = make(chan byte, 100) + c.payloadByteChan = make(chan byte, 10000) return } @@ -63,14 +61,10 @@ func (c* rtpToTsConverter) Convert() { select{ default: case rtpPacket := <-c.rtpChan: - fmt.Println("here4") for ii := range rtpPacket.Payload { c.payloadByteChan<-rtpPacket.Payload[ii] } - // If we have 156 bytes of payload then we can fill up a mpegts packet - fmt.Println("here5") - if len(c.payloadByteChan) > 156 { - fmt.Println("here6") + for len(c.payloadByteChan) > 156 { c.currentTsPacket = new(MpegTsPacket) c.currentTsPacket.SyncByte = 0x47 c.currentTsPacket.TEI = false @@ -78,11 +72,9 @@ func (c* rtpToTsConverter) Convert() { c.currentTsPacket.Priority = false c.currentTsPacket.PID = 4096 c.currentTsPacket.TSC = 0 - c.currentTsPacket.AFC = 1 // no adaptation field - payload only + c.currentTsPacket.AFC = 1 c.currentTsPacket.Payload = make([]byte, 156) - fmt.Println("here7") for ii:=0; ii < 156; ii++ { - fmt.Println("here8") c.currentTsPacket.Payload[ii] = <-c.payloadByteChan } c.tsChan<-c.currentTsPacket diff --git a/packet/packet_test.go b/packet/packet_test.go new file mode 100644 index 00000000..31811b2d --- /dev/null +++ b/packet/packet_test.go @@ -0,0 +1,101 @@ +package packet + +import ( + //"bytes" + "fmt" + "io" + "log" + "net" + "testing" + + "github.com/beatgammit/rtsp" +) + +const ( + rtpPort = 17300 + rtcpPort = 17319 + rtspUrl = "rtsp://192.168.0.50:8554/CH002.sdp" + rtpUrl = "rtsp://192.168.0.50:8554/CH002.sdp/track1" +) + +/* Let's see if we can connect to an rtsp device then read an rtp stream, +and then convert the rtp packets to mpegts packets and output. */ +func TestMain(t *testing.T) { + sess := rtsp.NewSession() + res, err := sess.Options(rtspUrl) + if err != nil { + t.Errorf("Shouldn't have got error: %v\n",err) + } + fmt.Println("Options:") + fmt.Println(res) + + res, err = sess.Describe(rtspUrl) + if err != nil { + log.Fatalln(err) + t.Errorf("Shouldn't have got error: %v\n", err) + } + fmt.Println("Describe:") + fmt.Println(res) + + p, err := rtsp.ParseSdp(&io.LimitedReader{R: res.Body, N: res.ContentLength}) + if err != nil { + t.Errorf("Shouldn't have got error: %v\n", err) + } + log.Printf("%+v", p) + + fmt.Println("Setting up!") + res, err = sess.Setup(rtpUrl, fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort)) + if err != nil { + t.Errorf("Shouldn't have got error: %v\n", err) + } + log.Println(res) + + fmt.Println("Playing !") + res, err = sess.Play(rtspUrl, res.Header.Get("Session")) + if err != nil { + t.Errorf("Shouldn't have got error: %v\n", err) + } + log.Println(res) + + // create udp connection for rtp stuff + rtpLaddr, err := net.ResolveUDPAddr("udp", "192.168.0.109:17300") + if err != nil { + t.Errorf("Local rtp addr not set! %v\n",err) + } + rtpAddr, err := net.ResolveUDPAddr("udp", "192.168.0.50:17300") + if err != nil { + t.Errorf("Resolving rtp address didn't work! %v\n", err) + } + rtpConn, err := net.DialUDP("udp", rtpLaddr, rtpAddr) + if err != nil { + t.Errorf("Conncection not established! %v\n", err) + } + + // Create udp connection for rtcp stuff + rtcpLaddr, err := net.ResolveUDPAddr("udp", "192.168.0.109:17319") + if err != nil { + t.Errorf("Local RTCP address not resolved! %v\n", err) + } + rtcpAddr, err := net.ResolveUDPAddr("udp", "192.168.0.50:17301") + if err != nil { + t.Errorf("Remote RTCP address not resolved! %v\n", err) + } + rtcpConn, err := net.DialUDP("udp", rtcpLaddr, rtcpAddr) + if err != nil { + t.Errorf("Connection not established! %v\n", err) + } + + // let's create a session that will store useful stuff from the connections + rtpSession := NewSession(rtpConn, rtcpConn) + converter := NewRtpToTsConverter() + go converter.Convert() + for ii:=0;ii<10000000;ii++{ + select{ + default: + case rtpPacket := <-rtpSession.RtpChan: + converter.RtpChan<-rtpPacket + case tsPacket:=<-converter.TsChan: + fmt.Println(tsPacket.ToByteSlice()) + } + } +} diff --git a/revid/repacketTest.go b/revid/repacketTest.go index e8e1f0e6..da001877 100644 --- a/revid/repacketTest.go +++ b/revid/repacketTest.go @@ -88,7 +88,7 @@ func main() { } // let's create a session that will store useful stuff from the connections - rtpSession := packet.New(rtpConn, rtcpConn) + rtpSession := packet.NewSession(rtpConn, rtcpConn) converter := packet.NewRtpToTsConverter() go converter.Convert() for { @@ -96,12 +96,9 @@ func main() { default: case rtpPacket := <-rtpSession.RtpChan: converter.RtpChan<-rtpPacket - fmt.Println("here2") case tsPacket:=<-converter.TsChan: - fmt.Println("here3") - fmt.Println(tsPacket) + fmt.Println(tsPacket.ToByteSlice()) } - } } }