protocol/rtsp: added beatgammits pkg and also my stream/main.go program

Most of this is unmodified except for the stream/main.go program. I am going to use this as a test that simply streams from an RTSP server to vlc.
This commit is contained in:
Saxon 2019-04-16 22:47:13 +09:30
parent 6ee286e988
commit 4a97a626a0
7 changed files with 743 additions and 0 deletions

12
protocol/rtsp/LICENSE.BSD Normal file
View File

@ -0,0 +1,12 @@
Copyright (c) 2015, T. Jameson Little <t.jameson.little@gmail.com>
All rights reserved.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

11
protocol/rtsp/README.md Normal file
View File

@ -0,0 +1,11 @@
rtsp
====
`rtsp` implements RTSP in Go. The development focus is for video streaming from security cameras, but the library is developed such that it should be useful for any type of stream.
Currently, `rtp` and `rtcp` are implemented as sub-packages, but this will likely change once the library matures.
License
=======
`rtsp` is licensed under the BSD 3-clause license. See LICENSE.BSD for details.

View File

@ -0,0 +1,54 @@
package main
import (
"flag"
"fmt"
"io"
"log"
"bitbucket.org/ausocean/av/protocol/rtsp"
)
func main() {
rtspServerPtr := flag.String("rtsp-server", "", "The RTSP server we would like to get video from")
clientPortPtr := flag.Uint("port", 6870, "The port on the client we would like to receive RTP on")
trackPtr := flag.String("track", "track1", "The track that we would like to receive media from")
flag.Parse()
sess := rtsp.NewSession()
res, err := sess.Options(*rtspServerPtr)
if err != nil {
log.Fatalln(err)
}
fmt.Println("Options:")
fmt.Println(res)
res, err = sess.Describe(*rtspServerPtr)
if err != nil {
log.Fatalln(err)
}
fmt.Println("Describe:")
fmt.Println(res)
p, err := rtsp.ParseSdp(&io.LimitedReader{R: res.Body, N: res.ContentLength})
if err != nil {
log.Fatalln(err)
}
log.Printf("%+v", p)
res, err = sess.Setup(*rtspServerPtr+"/"+*trackPtr, fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", *clientPortPtr, *clientPortPtr+1))
if err != nil {
log.Fatalln(err)
}
log.Println(res)
res, err = sess.Play(*rtspServerPtr, res.Header.Get("Session"))
if err != nil {
log.Fatalln(err)
}
log.Println(res)
// Send back rtcp
for {
}
}

150
protocol/rtsp/rtp/rtp.go Normal file
View File

@ -0,0 +1,150 @@
package rtp
import (
"fmt"
"net"
)
const (
RTP_VERSION = 2
)
const (
hasRtpPadding = 1 << 2
hasRtpExt = 1 << 3
)
// Packet as per https://tools.ietf.org/html/rfc1889#section-5.1
//
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// |V=2|P|X| CC |M| PT | sequence number |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | timestamp |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | synchronization source (SSRC) identifier |
// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// | contributing source (CSRC) identifiers |
// | .... |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
type RtpPacket struct {
Version byte
Padding bool
Ext bool
Marker bool
PayloadType byte
SequenceNumber uint
Timestamp uint
SyncSource uint
CSRC []uint
ExtHeader uint
ExtData []byte
Payload []byte
}
type Session struct {
Rtp net.PacketConn
Rtcp net.PacketConn
RtpChan <-chan RtpPacket
RtcpChan <-chan []byte
rtpChan chan<- RtpPacket
rtcpChan chan<- []byte
}
func New(rtp, rtcp net.PacketConn) *Session {
rtpChan := make(chan RtpPacket, 10)
rtcpChan := make(chan []byte, 10)
s := &Session{
Rtp: rtp,
Rtcp: rtcp,
RtpChan: rtpChan,
RtcpChan: rtcpChan,
rtpChan: rtpChan,
rtcpChan: rtcpChan,
}
go s.HandleRtpConn(rtp)
go s.HandleRtcpConn(rtcp)
return s
}
func toUint(arr []byte) (ret uint) {
for i, b := range arr {
ret |= uint(b) << (8 * uint(len(arr)-i-1))
}
return ret
}
func (s *Session) HandleRtpConn(conn net.PacketConn) {
buf := make([]byte, 4096)
for {
n, _, err := conn.ReadFrom(buf)
if err != nil {
panic(err)
}
cpy := make([]byte, n)
copy(cpy, buf)
go s.handleRtp(cpy)
}
}
func (s *Session) HandleRtcpConn(conn net.PacketConn) {
buf := make([]byte, 4096)
for {
n, _, err := conn.ReadFrom(buf)
if err != nil {
panic(err)
}
cpy := make([]byte, n)
copy(cpy, buf)
go s.handleRtcp(cpy)
}
}
func (s *Session) handleRtp(buf []byte) {
packet := RtpPacket{
Version: buf[0] & 0x03,
Padding: buf[0]&hasRtpPadding != 0,
Ext: buf[0]&hasRtpExt != 0,
CSRC: make([]uint, buf[0]>>4),
Marker: buf[1]&1 != 0,
PayloadType: buf[1] >> 1,
SequenceNumber: toUint(buf[2:4]),
Timestamp: toUint(buf[4:8]),
SyncSource: toUint(buf[8:12]),
}
if packet.Version != RTP_VERSION {
fmt.Printf("version: %v\n", packet.Version)
}
i := 12
for j := range packet.CSRC {
packet.CSRC[j] = toUint(buf[i : i+4])
i += 4
}
if packet.Ext {
packet.ExtHeader = toUint(buf[i : i+2])
length := toUint(buf[i+2 : i+4])
i += 4
if length > 0 {
packet.ExtData = buf[i : i+int(length)*4]
i += int(length) * 4
}
}
packet.Payload = buf[i:]
s.rtpChan <- packet
}
func (s *Session) handleRtcp(buf []byte) {
// TODO: implement rtcp
}

View File

@ -0,0 +1,21 @@
package rtp
import (
"testing"
)
func TestToUint(t *testing.T) {
tests := []struct {
arr []byte
exp uint
}{
{[]byte{1, 2}, 0x102},
{[]byte{3, 2, 1, 0}, 0x3020100},
}
for _, tst := range tests {
val := toUint(tst.arr)
if val != tst.exp {
t.Errorf("%d != %d for % x", val, tst.exp, tst.arr)
}
}
}

422
protocol/rtsp/rtsp.go Normal file
View File

@ -0,0 +1,422 @@
package rtsp
import (
"bufio"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"strconv"
"strings"
)
const (
// Client to server for presentation and stream objects; recommended
DESCRIBE = "DESCRIBE"
// Bidirectional for client and stream objects; optional
ANNOUNCE = "ANNOUNCE"
// Bidirectional for client and stream objects; optional
GET_PARAMETER = "GET_PARAMETER"
// Bidirectional for client and stream objects; required for Client to server, optional for server to client
OPTIONS = "OPTIONS"
// Client to server for presentation and stream objects; recommended
PAUSE = "PAUSE"
// Client to server for presentation and stream objects; required
PLAY = "PLAY"
// Client to server for presentation and stream objects; optional
RECORD = "RECORD"
// Server to client for presentation and stream objects; optional
REDIRECT = "REDIRECT"
// Client to server for stream objects; required
SETUP = "SETUP"
// Bidirectional for presentation and stream objects; optional
SET_PARAMETER = "SET_PARAMETER"
// Client to server for presentation and stream objects; required
TEARDOWN = "TEARDOWN"
)
const (
// all requests
Continue = 100
// all requests
OK = 200
// RECORD
Created = 201
// RECORD
LowOnStorageSpace = 250
// all requests
MultipleChoices = 300
// all requests
MovedPermanently = 301
// all requests
MovedTemporarily = 302
// all requests
SeeOther = 303
// all requests
UseProxy = 305
// all requests
BadRequest = 400
// all requests
Unauthorized = 401
// all requests
PaymentRequired = 402
// all requests
Forbidden = 403
// all requests
NotFound = 404
// all requests
MethodNotAllowed = 405
// all requests
NotAcceptable = 406
// all requests
ProxyAuthenticationRequired = 407
// all requests
RequestTimeout = 408
// all requests
Gone = 410
// all requests
LengthRequired = 411
// DESCRIBE, SETUP
PreconditionFailed = 412
// all requests
RequestEntityTooLarge = 413
// all requests
RequestURITooLong = 414
// all requests
UnsupportedMediaType = 415
// SETUP
Invalidparameter = 451
// SETUP
IllegalConferenceIdentifier = 452
// SETUP
NotEnoughBandwidth = 453
// all requests
SessionNotFound = 454
// all requests
MethodNotValidInThisState = 455
// all requests
HeaderFieldNotValid = 456
// PLAY
InvalidRange = 457
// SET_PARAMETER
ParameterIsReadOnly = 458
// all requests
AggregateOperationNotAllowed = 459
// all requests
OnlyAggregateOperationAllowed = 460
// all requests
UnsupportedTransport = 461
// all requests
DestinationUnreachable = 462
// all requests
InternalServerError = 500
// all requests
NotImplemented = 501
// all requests
BadGateway = 502
// all requests
ServiceUnavailable = 503
// all requests
GatewayTimeout = 504
// all requests
RTSPVersionNotSupported = 505
// all requests
OptionNotsupport = 551
)
type ResponseWriter interface {
http.ResponseWriter
}
type Request struct {
Method string
URL *url.URL
Proto string
ProtoMajor int
ProtoMinor int
Header http.Header
ContentLength int
Body io.ReadCloser
}
func (r Request) String() string {
s := fmt.Sprintf("%s %s %s/%d.%d\r\n", r.Method, r.URL, r.Proto, r.ProtoMajor, r.ProtoMinor)
for k, v := range r.Header {
for _, v := range v {
s += fmt.Sprintf("%s: %s\r\n", k, v)
}
}
s += "\r\n"
if r.Body != nil {
str, _ := ioutil.ReadAll(r.Body)
s += string(str)
}
return s
}
func NewRequest(method, urlStr, cSeq string, body io.ReadCloser) (*Request, error) {
u, err := url.Parse(urlStr)
if err != nil {
return nil, err
}
req := &Request{
Method: method,
URL: u,
Proto: "RTSP",
ProtoMajor: 1,
ProtoMinor: 0,
Header: map[string][]string{"CSeq": []string{cSeq}},
Body: body,
}
return req, nil
}
type Session struct {
cSeq int
conn net.Conn
session string
}
func NewSession() *Session {
return &Session{}
}
func (s *Session) nextCSeq() string {
s.cSeq++
return strconv.Itoa(s.cSeq)
}
func (s *Session) Describe(urlStr string) (*Response, error) {
req, err := NewRequest(DESCRIBE, urlStr, s.nextCSeq(), nil)
if err != nil {
panic(err)
}
req.Header.Add("Accept", "application/sdp")
if s.conn == nil {
s.conn, err = net.Dial("tcp", req.URL.Host)
if err != nil {
return nil, err
}
}
_, err = io.WriteString(s.conn, req.String())
if err != nil {
return nil, err
}
return ReadResponse(s.conn)
}
func (s *Session) Options(urlStr string) (*Response, error) {
req, err := NewRequest(OPTIONS, urlStr, s.nextCSeq(), nil)
if err != nil {
panic(err)
}
if s.conn == nil {
s.conn, err = net.Dial("tcp", req.URL.Host)
if err != nil {
return nil, err
}
}
_, err = io.WriteString(s.conn, req.String())
if err != nil {
return nil, err
}
return ReadResponse(s.conn)
}
func (s *Session) Setup(urlStr, transport string) (*Response, error) {
req, err := NewRequest(SETUP, urlStr, s.nextCSeq(), nil)
if err != nil {
panic(err)
}
req.Header.Add("Transport", transport)
if s.conn == nil {
s.conn, err = net.Dial("tcp", req.URL.Host)
if err != nil {
return nil, err
}
}
_, err = io.WriteString(s.conn, req.String())
if err != nil {
return nil, err
}
resp, err := ReadResponse(s.conn)
s.session = resp.Header.Get("Session")
return resp, err
}
func (s *Session) Play(urlStr, sessionId string) (*Response, error) {
req, err := NewRequest(PLAY, urlStr+"/", s.nextCSeq(), nil)
if err != nil {
panic(err)
}
req.Header.Add("Session", sessionId)
if s.conn == nil {
s.conn, err = net.Dial("tcp", req.URL.Host)
if err != nil {
return nil, err
}
}
_, err = io.WriteString(s.conn, req.String())
if err != nil {
return nil, err
}
return ReadResponse(s.conn)
}
type closer struct {
*bufio.Reader
r io.Reader
}
func (c closer) Close() error {
if c.Reader == nil {
return nil
}
defer func() {
c.Reader = nil
c.r = nil
}()
if r, ok := c.r.(io.ReadCloser); ok {
return r.Close()
}
return nil
}
func ParseRTSPVersion(s string) (proto string, major int, minor int, err error) {
parts := strings.SplitN(s, "/", 2)
proto = parts[0]
parts = strings.SplitN(parts[1], ".", 2)
if major, err = strconv.Atoi(parts[0]); err != nil {
return
}
if minor, err = strconv.Atoi(parts[0]); err != nil {
return
}
return
}
// super simple RTSP parser; would be nice if net/http would allow more general parsing
func ReadRequest(r io.Reader) (req *Request, err error) {
req = new(Request)
req.Header = make(map[string][]string)
b := bufio.NewReader(r)
var s string
// TODO: allow CR, LF, or CRLF
if s, err = b.ReadString('\n'); err != nil {
return
}
parts := strings.SplitN(s, " ", 3)
req.Method = parts[0]
if req.URL, err = url.Parse(parts[1]); err != nil {
return
}
req.Proto, req.ProtoMajor, req.ProtoMinor, err = ParseRTSPVersion(parts[2])
if err != nil {
return
}
// read headers
for {
if s, err = b.ReadString('\n'); err != nil {
return
} else if s = strings.TrimRight(s, "\r\n"); s == "" {
break
}
parts := strings.SplitN(s, ":", 2)
req.Header.Add(strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]))
}
req.ContentLength, _ = strconv.Atoi(req.Header.Get("Content-Length"))
fmt.Println("Content Length:", req.ContentLength)
req.Body = closer{b, r}
return
}
type Response struct {
Proto string
ProtoMajor int
ProtoMinor int
StatusCode int
Status string
ContentLength int64
Header http.Header
Body io.ReadCloser
}
func (res Response) String() string {
s := fmt.Sprintf("%s/%d.%d %d %s\n", res.Proto, res.ProtoMajor, res.ProtoMinor, res.StatusCode, res.Status)
for k, v := range res.Header {
for _, v := range v {
s += fmt.Sprintf("%s: %s\n", k, v)
}
}
return s
}
func ReadResponse(r io.Reader) (res *Response, err error) {
res = new(Response)
res.Header = make(map[string][]string)
b := bufio.NewReader(r)
var s string
// TODO: allow CR, LF, or CRLF
if s, err = b.ReadString('\n'); err != nil {
return
}
parts := strings.SplitN(s, " ", 3)
res.Proto, res.ProtoMajor, res.ProtoMinor, err = ParseRTSPVersion(parts[0])
if err != nil {
return
}
if res.StatusCode, err = strconv.Atoi(parts[1]); err != nil {
return
}
res.Status = strings.TrimSpace(parts[2])
// read headers
for {
if s, err = b.ReadString('\n'); err != nil {
return
} else if s = strings.TrimRight(s, "\r\n"); s == "" {
break
}
parts := strings.SplitN(s, ":", 2)
res.Header.Add(strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]))
}
res.ContentLength, _ = strconv.ParseInt(res.Header.Get("Content-Length"), 10, 64)
res.Body = closer{b, r}
return
}

73
protocol/rtsp/sdp.go Normal file
View File

@ -0,0 +1,73 @@
package rtsp
import (
"bufio"
"errors"
"io"
"strconv"
"strings"
)
type SessionSection struct {
Version int
Originator string
SessionName string
SessionInformation string
URI string
Email string
Phone string
ConnectionInformation string
BandwidthInformation string
}
func ParseSdp(r io.Reader) (SessionSection, error) {
var packet SessionSection
s := bufio.NewScanner(r)
for s.Scan() {
parts := strings.SplitN(s.Text(), "=", 2)
if len(parts) == 2 {
if len(parts[0]) != 1 {
return packet, errors.New("SDP only allows 1-character variables")
}
switch parts[0] {
// version
case "v":
ver, err := strconv.Atoi(parts[1])
if err != nil {
return packet, err
}
packet.Version = ver
// owner/creator and session identifier
case "o":
// o=<username> <session id> <version> <network type> <address type> <address>
// TODO: parse this
packet.Originator = parts[1]
// session name
case "s":
packet.SessionName = parts[1]
// session information
case "i":
packet.SessionInformation = parts[1]
// URI of description
case "u":
packet.URI = parts[1]
// email address
case "e":
packet.Email = parts[1]
// phone number
case "p":
packet.Phone = parts[1]
// connection information - not required if included in all media
case "c":
// TODO: parse this
packet.ConnectionInformation = parts[1]
// bandwidth information
case "b":
// TODO: parse this
packet.BandwidthInformation = parts[1]
}
}
}
return packet, nil
}