From bc7962b2406bbf3cf254d92c7fd8d1ca0e6bb920 Mon Sep 17 00:00:00 2001 From: Unknown Date: Mon, 11 Dec 2017 14:23:06 +1030 Subject: [PATCH] Getting started with repacketisation --- revid/repacketTest.go | 125 +++++++++++++++++++++++++++++++ rtp/rtp.go | 166 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 291 insertions(+) create mode 100644 revid/repacketTest.go create mode 100644 rtp/rtp.go diff --git a/revid/repacketTest.go b/revid/repacketTest.go new file mode 100644 index 00000000..f79de478 --- /dev/null +++ b/revid/repacketTest.go @@ -0,0 +1,125 @@ +package main + +import ( + "bytes" + "flag" + "fmt" + "io" + "log" + "net" + "time" + + "../rtp" + + "github.com/beatgammit/rtsp" +) + +func init() { + flag.Parse() +} + + +const sampleRequest = `OPTIONS rtsp://example.com/media.mp4 RTSP/1.0 +CSeq: 1 +Require: implicit-play +Proxy-Require: gzipped-messages + +` + +const sampleResponse = `RTSP/1.0 200 OK +CSeq: 1 +Public: DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE + +` + +func main() { + if len(flag.Args()) >= 1 { + rtspUrl := flag.Args()[0] + rtpUrl := flag.Args()[1] + + sess := rtsp.NewSession() + res, err := sess.Options(rtspUrl) + if err != nil { + log.Fatalln(err) + } + fmt.Println("Options:") + fmt.Println(res) + + res, err = sess.Describe(rtspUrl) + if err != nil { + log.Fatalln(err) + } + fmt.Println("Describe:") + fmt.Println(res) + + p, err := rtsp.ParseSdp(&io.LimitedReader{R: res.Body, N: res.ContentLength}) + if err != nil { + log.Fatalln(err) + } + log.Printf("%+v", p) + + fmt.Println("Setting up!") + rtpPort, rtcpPort := 17300, 17319 + res, err = sess.Setup(rtpUrl, fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort)) + if err != nil { + log.Fatalln(err) + } + log.Println(res) + + fmt.Println("Playing !") + res, err = sess.Play(rtspUrl, res.Header.Get("Session")) + if err != nil { + log.Fatalln(err) + } + log.Println(res) + + // create udp connection for rtp stuff + rtpLaddr, err := net.ResolveUDPAddr("udp","192.168.0.109:17300") + if err != nil {fmt.Println("Local rtp addr not set!")} + rtpAddr, err := net.ResolveUDPAddr("udp","192.168.0.50:17300") + if err != nil { fmt.Println( "Resolving rtp address didn't work!")} + rtpConn, err := net.DialUDP("udp",rtpLaddr,rtpAddr) + if err != nil {fmt.Println("Rtp dial didn't work!")} + + + // Create udp connection for rtcp stuff + rtcpLaddr, err := net.ResolveUDPAddr("udp","192.168.0.109:17319") + if err != nil {fmt.Println("Local ")} + rtcpAddr, err := net.ResolveUDPAddr("udp","192.168.0.50:17301") + if err != nil {fmt.Println("resolving rtcp address didn't work!")} + rtcpConn, err := net.DialUDP("udp",rtcpLaddr, rtcpAddr) + if err != nil {fmt.Println("Rtcp dial didnt't work!")} + + + // let's create a session that will store useful stuff from the connections + rtpSession := rtp.New(rtpConn,rtcpConn) + + // Loop here until we get something in the channels + for{ + fmt.Printf("Length of rtpChan: %v\n", len(rtpSession.RtpChan)) + time.Sleep(1*time.Second ) + select{ + case rtpPacket := <-rtpSession.RtpChan: + fmt.Println(rtpPacket) + default: + } + + } + + + } else { + r, err := rtsp.ReadRequest(bytes.NewBufferString(sampleRequest)) + if err != nil { + fmt.Println(err) + } else { + fmt.Println(r) + } + + res, err := rtsp.ReadResponse(bytes.NewBufferString(sampleResponse)) + if err != nil { + fmt.Println(err) + } else { + fmt.Println(res) + } + } +} diff --git a/rtp/rtp.go b/rtp/rtp.go new file mode 100644 index 00000000..c7d8ce8d --- /dev/null +++ b/rtp/rtp.go @@ -0,0 +1,166 @@ +/* +Copyright (c) 2015, T. Jameson Little +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this +list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, +this list of conditions and the following disclaimer in the documentation and/or +other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its contributors +may be used to endorse or promote products derived from this software without +specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, +INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE +OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED +OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +package rtp + +import ( + "net" + "fmt" +) + +const ( + RTP_VERSION = 2 +) + +const ( + hasRtpPadding = 1 << 5 + hasRtpExt = 1 << 4 +) + +type RtpPacket struct { + Version byte + Padding bool + Ext bool + Marker bool + PayloadType byte + SequenceNumber uint + Timestamp uint + SyncSource uint + CSRC []uint + ExtHeader uint + ExtData []byte + Payload []byte +} + +type Session struct { + Rtp net.PacketConn + Rtcp net.PacketConn + RtpChan <-chan RtpPacket + RtcpChan <-chan []byte + rtpChan chan<- RtpPacket + rtcpChan chan<- []byte +} + +func New(rtp, rtcp net.PacketConn) *Session { + rtpChan := make(chan RtpPacket, 10) + rtcpChan := make(chan []byte, 10) + s := &Session{ + Rtp: rtp, + Rtcp: rtcp, + RtpChan: rtpChan, + RtcpChan: rtcpChan, + rtpChan: rtpChan, + rtcpChan: rtcpChan, + } + go s.HandleRtpConn(rtp) + go s.HandleRtcpConn(rtcp) + return s +} + +func toUint(arr []byte) (ret uint) { + for i, b := range arr { + ret |= uint(b) << (8 * uint(len(arr)-i-1)) + } + return ret +} + +func (s *Session) HandleRtpConn(conn net.PacketConn) { + buf := make([]byte, 4096) + for { + n, _, err := conn.ReadFrom(buf) + if err != nil { + panic(err) + } + + cpy := make([]byte, n) + copy(cpy, buf) + go s.handleRtp(cpy) + } +} + +func (s *Session) HandleRtcpConn(conn net.PacketConn) { + buf := make([]byte, 4096) + for { + n, _, err := conn.ReadFrom(buf) + if err != nil { + panic(err) + } + cpy := make([]byte, n) + copy(cpy, buf) + go s.handleRtcp(cpy) + } +} + +func (s *Session) handleRtp(buf []byte) { + fmt.Println(buf) + packet := RtpPacket{ + Version: (buf[0] & 0xC0)>>6, + Padding: buf[0]&hasRtpPadding != 0, + Ext: buf[0]&hasRtpExt != 0, + Marker: buf[1]&1 != 0, + PayloadType: buf[1] >> 1, + SequenceNumber: toUint(buf[2:4]), + Timestamp: toUint(buf[4:8]), + SyncSource: toUint(buf[8:12]), + CSRC: make([]uint, buf[0]>>4), + } + + if packet.Version != RTP_VERSION { + fmt.Printf("Packet version: %v", packet.Version) + panic("Unsupported version") + } + + + i := 12 + + for j := range packet.CSRC { + packet.CSRC[j] = toUint(buf[i : i+4]) + i += 4 + } + + if packet.Ext { + packet.ExtHeader = toUint(buf[i : i+2]) + length := toUint(buf[i+2 : i+4]) + i += 4 + if length > 0 { + packet.ExtData = buf[i : i+int(length)*4] + i += int(length) * 4 + } + } + + packet.Payload = buf[i:] + + s.rtpChan <- packet +} + +func (s *Session) handleRtcp(buf []byte) { + // TODO: implement rtcp +}