Merged in revid-rtsp-camera (pull request #200)

revid: RTSP Camera Input

Approved-by: Alan Noble <anoble@gmail.com>
This commit is contained in:
Saxon Milton 2019-05-27 05:16:59 +00:00
commit 4b74ea3291
18 changed files with 343 additions and 206 deletions

View File

@ -105,7 +105,8 @@ func handleFlags() revid.Config {
var ( var (
cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`") cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`")
inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam") inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam, RTSP")
rtspURLPtr = flag.String("RTSPURL", "", "The URL for an RTSP server.")
inputCodecPtr = flag.String("InputCodec", "", "The codec of the input: H264, Mjpeg") inputCodecPtr = flag.String("InputCodec", "", "The codec of the input: H264, Mjpeg")
rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp") rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp")
quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)") quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)")
@ -180,6 +181,8 @@ func handleFlags() revid.Config {
cfg.Input = revid.V4L cfg.Input = revid.V4L
case "File": case "File":
cfg.Input = revid.File cfg.Input = revid.File
case "RTSP":
cfg.Input = revid.RTSP
case "": case "":
default: default:
log.Log(logger.Error, pkg+"bad input argument") log.Log(logger.Error, pkg+"bad input argument")
@ -223,6 +226,7 @@ func handleFlags() revid.Config {
netsender.ConfigFile = *configFilePtr netsender.ConfigFile = *configFilePtr
} }
cfg.RTSPURL = *rtspURLPtr
cfg.Quantize = *quantizePtr cfg.Quantize = *quantizePtr
cfg.Rotation = *rotationPtr cfg.Rotation = *rotationPtr
cfg.FlipHorizontal = *horizontalFlipPtr cfg.FlipHorizontal = *horizontalFlipPtr

View File

@ -63,7 +63,8 @@ type Lexer struct {
func NewLexer(donl bool) *Lexer { func NewLexer(donl bool) *Lexer {
return &Lexer{ return &Lexer{
donl: donl, donl: donl,
buf: bytes.NewBuffer(make([]byte, 0, maxAUSize))} buf: bytes.NewBuffer(make([]byte, 0, maxAUSize)),
}
} }
// Lex continually reads RTP packets from the io.Reader src and lexes into // Lex continually reads RTP packets from the io.Reader src and lexes into
@ -86,7 +87,6 @@ func (l *Lexer) Lex(dst io.Writer, src io.Reader, delay time.Duration) error {
if err != nil { if err != nil {
return fmt.Errorf("could not get rtp payload, failed with err: %v\n", err) return fmt.Errorf("could not get rtp payload, failed with err: %v\n", err)
} }
nalType := (payload[0] >> 1) & 0x3f nalType := (payload[0] >> 1) & 0x3f
// If not currently fragmented then we ignore current write. // If not currently fragmented then we ignore current write.
@ -151,10 +151,21 @@ func (l *Lexer) handleFragmentation(d []byte) {
start := d[2]&0x80 != 0 start := d[2]&0x80 != 0
end := d[2]&0x40 != 0 end := d[2]&0x40 != 0
b1 := (d[0] & 0x81) | ((d[2] & 0x3f) << 1)
b2 := d[1]
if start {
d = d[1:]
if l.donl {
d = d[2:]
}
d[0] = b1
d[1] = b2
} else {
d = d[3:] d = d[3:]
if l.donl { if l.donl {
d = d[2:] d = d[2:]
} }
}
switch { switch {
case start && !end: case start && !end:

View File

@ -129,7 +129,7 @@ func TestLex(t *testing.T) {
0x01, 0x02, 0x03, 0x04, // NAL data. 0x01, 0x02, 0x03, 0x04, // NAL data.
// NAL 2 // NAL 2
0x00, 0x00, 0x00, 0x01, // Start code. 0x00, 0x00, 0x00, 0x01, // Start code.
0x01, 0x02, 0x03, // FU payload. 0x00, 0x00, 0x01, 0x02, 0x03, // FU payload.
0x04, 0x05, 0x06, // FU payload. 0x04, 0x05, 0x06, // FU payload.
0x07, 0x08, 0x09, // FU payload. 0x07, 0x08, 0x09, // FU payload.
// NAL 3 // NAL 3
@ -216,7 +216,7 @@ func TestLex(t *testing.T) {
0x01, 0x02, 0x03, 0x04, // NAL data. 0x01, 0x02, 0x03, 0x04, // NAL data.
// NAL 2 // NAL 2
0x00, 0x00, 0x00, 0x01, // Start code. 0x00, 0x00, 0x00, 0x01, // Start code.
0x01, 0x02, 0x03, // FU payload. 0x00, 0x00, 0x01, 0x02, 0x03, // FU payload.
0x04, 0x05, 0x06, // FU payload. 0x04, 0x05, 0x06, // FU payload.
0x07, 0x08, 0x09, // FU payload. 0x07, 0x08, 0x09, // FU payload.
// NAL 3 // NAL 3

View File

@ -55,30 +55,6 @@ var (
}, },
}, },
} }
// standardPmt is a minimal PMT, without descriptors for time and location.
standardPmt = psi.PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: 0x12,
Tss: &psi.TSS{
Tide: 0x01,
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &psi.PMT{
Pcrpid: 0x0100,
Pil: 0,
Essd: &psi.ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
}
) )
const ( const (
@ -94,7 +70,7 @@ var Meta *meta.Data
var ( var (
patTable = standardPat.Bytes() patTable = standardPat.Bytes()
pmtTable = standardPmt.Bytes() pmtTable []byte
) )
const ( const (
@ -103,15 +79,16 @@ const (
pmtPid = 4096 pmtPid = 4096
videoPid = 256 videoPid = 256
audioPid = 210 audioPid = 210
videoStreamID = 0xe0 // First video stream ID. H264ID = 27
H265ID = 36
audioStreamID = 0xc0 // First audio stream ID. audioStreamID = 0xc0 // First audio stream ID.
) )
// Video and Audio constants are used to communicate which media type will be encoded when creating a // Constants used to communicate which media codec will be packetized.
// new encoder with NewEncoder.
const ( const (
Video = iota EncodeH264 = iota
Audio EncodeH265
EncodeAudio
) )
// Time-related constants. // Time-related constants.
@ -155,14 +132,41 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder {
var mPid int var mPid int
var sid byte var sid byte
switch mediaType { switch mediaType {
case Audio: case EncodeAudio:
mPid = audioPid mPid = audioPid
sid = audioStreamID sid = audioStreamID
case Video: case EncodeH265:
mPid = videoPid mPid = videoPid
sid = videoStreamID sid = H265ID
case EncodeH264:
mPid = videoPid
sid = H264ID
} }
// standardPmt is a minimal PMT, without descriptors for metadata.
pmtTable = (&psi.PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: 0x12,
Tss: &psi.TSS{
Tide: 0x01,
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &psi.PMT{
Pcrpid: 0x0100,
Pil: 0,
Essd: &psi.ESSD{
St: byte(sid),
Epid: 0x0100,
Esil: 0x00,
},
},
},
}).Bytes()
return &Encoder{ return &Encoder{
dst: dst, dst: dst,

View File

@ -99,7 +99,7 @@ func TestEncodeVideo(t *testing.T) {
// Create the dst and write the test data to encoder. // Create the dst and write the test data to encoder.
dst := &destination{} dst := &destination{}
_, err := NewEncoder(nopCloser{dst}, 25, Video).Write(data) _, err := NewEncoder(nopCloser{dst}, 25, EncodeH264).Write(data)
if err != nil { if err != nil {
t.Fatalf("could not write data to encoder, failed with err: %v\n", err) t.Fatalf("could not write data to encoder, failed with err: %v\n", err)
} }
@ -158,7 +158,7 @@ func TestEncodePcm(t *testing.T) {
sampleSize := 2 sampleSize := 2
blockSize := 16000 blockSize := 16000
writeFreq := float64(sampleRate*sampleSize) / float64(blockSize) writeFreq := float64(sampleRate*sampleSize) / float64(blockSize)
e := NewEncoder(nopCloser{&buf}, writeFreq, Audio) e := NewEncoder(nopCloser{&buf}, writeFreq, EncodeAudio)
inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm" inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm"
inPcm, err := ioutil.ReadFile(inPath) inPcm, err := ioutil.ReadFile(inPath)

View File

@ -48,7 +48,7 @@ const fps = 25
func TestMetaEncode1(t *testing.T) { func TestMetaEncode1(t *testing.T) {
Meta = meta.New() Meta = meta.New()
var buf bytes.Buffer var buf bytes.Buffer
e := NewEncoder(nopCloser{&buf}, fps, Video) e := NewEncoder(nopCloser{&buf}, fps, EncodeH264)
Meta.Add("ts", "12345678") Meta.Add("ts", "12345678")
if err := e.writePSI(); err != nil { if err := e.writePSI(); err != nil {
t.Errorf(errUnexpectedErr, err.Error()) t.Errorf(errUnexpectedErr, err.Error())
@ -76,7 +76,7 @@ func TestMetaEncode1(t *testing.T) {
func TestMetaEncode2(t *testing.T) { func TestMetaEncode2(t *testing.T) {
Meta = meta.New() Meta = meta.New()
var buf bytes.Buffer var buf bytes.Buffer
e := NewEncoder(nopCloser{&buf}, fps, Video) e := NewEncoder(nopCloser{&buf}, fps, EncodeH264)
Meta.Add("ts", "12345678") Meta.Add("ts", "12345678")
Meta.Add("loc", "1234,4321,1234") Meta.Add("loc", "1234,4321,1234")
if err := e.writePSI(); err != nil { if err := e.writePSI(); err != nil {

View File

@ -126,7 +126,7 @@ func writePSI(b *bytes.Buffer) error {
func writeFrame(b *bytes.Buffer, frame []byte, pts uint64) error { func writeFrame(b *bytes.Buffer, frame []byte, pts uint64) error {
// Prepare PES data. // Prepare PES data.
pesPkt := pes.Packet{ pesPkt := pes.Packet{
StreamID: videoStreamID, StreamID: H264ID,
PDI: hasPTS, PDI: hasPTS,
PTS: pts, PTS: pts,
Data: frame, Data: frame,

View File

@ -37,12 +37,14 @@ import (
"sync" "sync"
"time" "time"
"bitbucket.org/ausocean/av/protocol/rtp"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
) )
const ( const (
senderSSRC = 1 // Any non-zero value will do. clientSSRC = 1 // Any non-zero value will do.
defaultClientName = "Client" defaultClientName = "Client"
defaultSendInterval = 2 * time.Second
delayUnit = 1.0 / 65536.0 delayUnit = 1.0 / 65536.0
pkg = "rtcp: " pkg = "rtcp: "
rtcpVer = 2 rtcpVer = 2
@ -70,22 +72,17 @@ type Client struct {
wg sync.WaitGroup // This is used to wait for send and recv routines to stop when Client is stopped. wg sync.WaitGroup // This is used to wait for send and recv routines to stop when Client is stopped.
quit chan struct{} // Channel used to communicate quit signal to send and recv routines. quit chan struct{} // Channel used to communicate quit signal to send and recv routines.
log Log // Used to log any messages. log Log // Used to log any messages.
rtpClt *rtp.Client
err chan error // Client will send any errors through this chan. Can be accessed by Err(). err chan error // Client will send any errors through this chan. Can be accessed by Err().
} }
// NewClient returns a pointer to a new Client. // NewClient returns a pointer to a new Client.
func NewClient(clientAddress, serverAddress, name string, sendInterval time.Duration, rtpSSRC uint32, l Log) (*Client, error) { func NewClient(clientAddress, serverAddress string, rtpClt *rtp.Client, l Log) (*Client, error) {
if name == "" {
name = defaultClientName
}
c := &Client{ c := &Client{
name: name, name: defaultClientName,
err: make(chan error),
quit: make(chan struct{}), quit: make(chan struct{}),
interval: sendInterval, interval: defaultSendInterval,
sourceSSRC: rtpSSRC, rtpClt: rtpClt,
log: l, log: l,
} }
@ -107,11 +104,23 @@ func NewClient(clientAddress, serverAddress, name string, sendInterval time.Dura
return c, nil return c, nil
} }
// SetSendInterval sets a custom receiver report send interval (default is 5 seconds.)
func (c *Client) SetSendInterval(d time.Duration) {
c.interval = d
}
// SetName sets a custom client name for use in receiver report source description.
// Default is "Client".
func (c *Client) SetName(name string) {
c.name = name
}
// Start starts the listen and send routines. This will start the process of // Start starts the listen and send routines. This will start the process of
// receiving and parsing sender reports, and the process of sending receiver // receiving and parsing sender reports, and the process of sending receiver
// reports to the server. // reports to the server.
func (c *Client) Start() { func (c *Client) Start() {
c.log(logger.Debug, pkg+"Client is starting") c.log(logger.Debug, pkg+"Client is starting")
c.err = make(chan error)
c.wg.Add(2) c.wg.Add(2)
go c.recv() go c.recv()
go c.send() go c.send()
@ -124,6 +133,7 @@ func (c *Client) Stop() {
close(c.quit) close(c.quit)
c.conn.Close() c.conn.Close()
c.wg.Wait() c.wg.Wait()
close(c.err)
} }
// Err provides read access to the Client err channel. This must be checked // Err provides read access to the Client err channel. This must be checked
@ -171,13 +181,13 @@ func (c *Client) send() {
ReportCount: 1, ReportCount: 1,
Type: typeReceiverReport, Type: typeReceiverReport,
}, },
SenderSSRC: senderSSRC, SenderSSRC: clientSSRC,
Blocks: []ReportBlock{ Blocks: []ReportBlock{
ReportBlock{ ReportBlock{
SourceIdentifier: c.sourceSSRC, SourceIdentifier: c.rtpClt.SSRC(),
FractionLost: 0, FractionLost: 0,
PacketsLost: math.MaxUint32, PacketsLost: math.MaxUint32,
HighestSequence: c.sequence(), HighestSequence: uint32((c.rtpClt.Cycles() << 16) | c.rtpClt.Sequence()),
Jitter: c.jitter(), Jitter: c.jitter(),
SenderReportTs: c.lastSenderTs(), SenderReportTs: c.lastSenderTs(),
SenderReportDelay: c.delay(), SenderReportDelay: c.delay(),
@ -195,7 +205,7 @@ func (c *Client) send() {
}, },
Chunks: []Chunk{ Chunks: []Chunk{
Chunk{ Chunk{
SSRC: senderSSRC, SSRC: clientSSRC,
Items: []SDESItem{ Items: []SDESItem{
SDESItem{ SDESItem{
Type: typeCName, Type: typeCName,
@ -238,22 +248,6 @@ func (c *Client) parse(buf []byte) {
c.setSenderTs(t) c.setSenderTs(t)
} }
// SetSequence will allow updating of the highest sequence number received
// through an RTP stream.
func (c *Client) SetSequence(s uint32) {
c.mu.Lock()
c.seq = s
c.mu.Unlock()
}
// sequence will return the highest sequence number received through RTP.
func (c *Client) sequence() uint32 {
c.mu.Lock()
s := c.seq
c.mu.Unlock()
return s
}
// jitter returns the interarrival jitter as described by RTCP specifications: // jitter returns the interarrival jitter as described by RTCP specifications:
// https://tools.ietf.org/html/rfc3550 // https://tools.ietf.org/html/rfc3550
// TODO(saxon): complete this. // TODO(saxon): complete this.

View File

@ -37,6 +37,7 @@ import (
"testing" "testing"
"time" "time"
"bitbucket.org/ausocean/av/protocol/rtp"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
) )
@ -148,12 +149,15 @@ func (dl *dummyLogger) log(lvl int8, msg string, args ...interface{}) {
// respond with sender reports. // respond with sender reports.
func TestReceiveAndSend(t *testing.T) { func TestReceiveAndSend(t *testing.T) {
const clientAddr, serverAddr = "localhost:8000", "localhost:8001" const clientAddr, serverAddr = "localhost:8000", "localhost:8001"
rtpClt, err := rtp.NewClient("localhost:8002")
if err != nil {
t.Fatalf("unexpected error when creating RTP client: %v", err)
}
c, err := NewClient( c, err := NewClient(
clientAddr, clientAddr,
serverAddr, serverAddr,
"testClient", rtpClt,
10*time.Millisecond,
12345,
(*dummyLogger)(t).log, (*dummyLogger)(t).log,
) )
if err != nil { if err != nil {
@ -162,14 +166,14 @@ func TestReceiveAndSend(t *testing.T) {
go func() { go func() {
for { for {
select { err, ok := <-c.Err()
case err := <-c.Err(): if ok {
const errConnClosed = "use of closed network connection" const errConnClosed = "use of closed network connection"
if !strings.Contains(err.Error(), errConnClosed) { if !strings.Contains(err.Error(), errConnClosed) {
t.Fatalf("error received from client error chan: %v\n", err) t.Fatalf("error received from client error chan: %v\n", err)
} }
} else {
default: return
} }
} }
}() }()
@ -197,8 +201,6 @@ func TestReceiveAndSend(t *testing.T) {
n, _, _ := conn.ReadFromUDP(buf) n, _, _ := conn.ReadFromUDP(buf)
t.Logf("SERVER: receiver report received: \n%v\n", buf[:n]) t.Logf("SERVER: receiver report received: \n%v\n", buf[:n])
c.SetSequence(uint32(i))
now := time.Now().Second() now := time.Now().Second()
var time [8]byte var time [8]byte
binary.BigEndian.PutUint64(time[:], uint64(now)) binary.BigEndian.PutUint64(time[:], uint64(now))

View File

@ -38,9 +38,9 @@ type Timestamp struct {
Fraction uint32 Fraction uint32
} }
// Timestamp gets the timestamp from a receiver report and returns it as the most // ParseTimestamp gets the timestamp from a receiver report and returns it as
// significant word, and the least significant word. If the given bytes do not // a Timestamp as defined above. If the given bytes do not represent a valid
// represent a valid receiver report, an error is returned. // receiver report, an error is returned.
func ParseTimestamp(buf []byte) (Timestamp, error) { func ParseTimestamp(buf []byte) (Timestamp, error) {
if len(buf) < 4 { if len(buf) < 4 {
return Timestamp{}, errors.New("bad RTCP packet, not of sufficient length") return Timestamp{}, errors.New("bad RTCP packet, not of sufficient length")

View File

@ -29,12 +29,17 @@ package rtp
import ( import (
"net" "net"
"sync"
) )
// Client describes an RTP client that can receive an RTP stream and implements // Client describes an RTP client that can receive an RTP stream and implements
// io.Reader. // io.Reader.
type Client struct { type Client struct {
r *PacketReader r *PacketReader
ssrc uint32
mu sync.Mutex
sequence uint16
cycles uint16
} }
// NewClient returns a pointer to a new Client. // NewClient returns a pointer to a new Client.
@ -53,13 +58,52 @@ func NewClient(addr string) (*Client, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return c, nil return c, nil
} }
// SSRC returns the identified for the source from which the RTP packets being
// received are coming from.
func (c *Client) SSRC() uint32 {
return c.ssrc
}
// Read implements io.Reader. // Read implements io.Reader.
func (c *Client) Read(p []byte) (int, error) { func (c *Client) Read(p []byte) (int, error) {
return c.r.Read(p) n, err := c.r.Read(p)
if err != nil {
return n, err
}
if c.ssrc == 0 {
c.ssrc, _ = SSRC(p[:n])
}
s, _ := Sequence(p[:n])
c.setSequence(s)
return n, err
}
// setSequence sets the most recently received sequence number, and updates the
// cycles count if the sequence number has rolled over.
func (c *Client) setSequence(s uint16) {
c.mu.Lock()
if s < c.sequence {
c.cycles++
}
c.sequence = s
c.mu.Unlock()
}
// Sequence returns the most recent RTP packet sequence number received.
func (c *Client) Sequence() uint16 {
c.mu.Lock()
defer c.mu.Unlock()
return c.sequence
}
// Cycles returns the number of RTP sequence number cycles that have been received.
func (c *Client) Cycles() uint16 {
c.mu.Lock()
defer c.mu.Unlock()
return c.cycles
} }
// PacketReader provides an io.Reader interface to an underlying UDP PacketConn. // PacketReader provides an io.Reader interface to an underlying UDP PacketConn.

View File

@ -50,11 +50,9 @@ func Marker(d []byte) (bool, error) {
// Payload returns the payload from an RTP packet provided the version is // Payload returns the payload from an RTP packet provided the version is
// compatible, otherwise an error is returned. // compatible, otherwise an error is returned.
func Payload(d []byte) ([]byte, error) { func Payload(d []byte) ([]byte, error) {
if len(d) < defaultHeadSize { err := checkPacket(d)
panic("invalid RTP packet length") if err != nil {
} return nil, err
if version(d) != rtpVer {
return nil, errors.New(badVer)
} }
extLen := 0 extLen := 0
if hasExt(d) { if hasExt(d) {
@ -64,6 +62,38 @@ func Payload(d []byte) ([]byte, error) {
return d[payloadIdx:], nil return d[payloadIdx:], nil
} }
// SSRC returns the source identifier from an RTP packet. An error is return if
// the packet is not valid.
func SSRC(d []byte) (uint32, error) {
err := checkPacket(d)
if err != nil {
return 0, err
}
return binary.BigEndian.Uint32(d[8:]), nil
}
// Sequence returns the sequence number of an RTP packet. An error is returned
// if the packet is not valid.
func Sequence(d []byte) (uint16, error) {
err := checkPacket(d)
if err != nil {
return 0, err
}
return binary.BigEndian.Uint16(d[2:]), nil
}
// checkPacket checks the validity of the packet, firstly by checking size and
// then also checking that version is compatible with these utilities.
func checkPacket(d []byte) error {
if len(d) < defaultHeadSize {
return errors.New("invalid RTP packet length")
}
if version(d) != rtpVer {
return errors.New(badVer)
}
return nil
}
// hasExt returns true if an extension is present in the RTP packet. // hasExt returns true if an extension is present in the RTP packet.
func hasExt(d []byte) bool { func hasExt(d []byte) bool {
return (d[0] & 0x10 >> 4) == 1 return (d[0] & 0x10 >> 4) == 1

View File

@ -43,20 +43,27 @@ type Client struct {
sessionID string sessionID string
} }
// NewClient returns a pointer to a new Client. The address addr will be parsed and // NewClient returns a pointer to a new Client and the local address of the
// a connection to the RTSP server will be made. // RTSP connection. The address addr will be parsed and a connection to the
func NewClient(addr string) (*Client, error) { // RTSP server will be made.
c := &Client{addr: addr} func NewClient(addr string) (c *Client, local, remote *net.TCPAddr, err error) {
var err error c = &Client{addr: addr}
c.url, err = url.Parse(addr) c.url, err = url.Parse(addr)
if err != nil { if err != nil {
return nil, err return nil, nil,nil, err
} }
c.conn, err = net.Dial("tcp", c.url.Host) c.conn, err = net.Dial("tcp", c.url.Host)
if err != nil { if err != nil {
return nil, err return nil, nil, nil, err
} }
return c, nil local = c.conn.LocalAddr().(*net.TCPAddr)
remote = c.conn.RemoteAddr().(*net.TCPAddr)
return
}
// Close closes the RTSP connection.
func (c *Client) Close() error {
return c.conn.Close()
} }
// Describe forms and sends an RTSP request of method DESCRIBE to the RTSP server. // Describe forms and sends an RTSP request of method DESCRIBE to the RTSP server.

View File

@ -1,78 +0,0 @@
/*
NAME
record
DESCRIPTION
record provides a program to connect to an RTSP server, request for an
RTP stream and then save the RTP payload to file.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
This is Copyright (C) 2019 the Australian Ocean Lab (AusOcean).
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package main
import (
"flag"
"fmt"
"log"
"bitbucket.org/ausocean/av/protocol/rtsp"
)
func main() {
rtspServerPtr := flag.String("rtsp-server", "", "The RTSP server we would like to get video from.")
clientPortPtr := flag.Uint("port", 6870, "The port on the client we would like to receive RTP on.")
trackPtr := flag.String("track", "track1", "The track that we would like to receive media from.")
flag.Parse()
clt, err := rtsp.NewClient(*rtspServerPtr)
if err != nil {
panic(fmt.Sprintf("creating RTSP client failed with error: %v", err))
}
resp, err := clt.Options()
if err != nil {
log.Fatalln(err)
}
fmt.Println("Options:")
fmt.Println(resp)
resp, err = clt.Describe()
if err != nil {
log.Fatalln(err)
}
fmt.Println("Describe:")
fmt.Println(resp)
resp, err = clt.Setup(*trackPtr, fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", *clientPortPtr, *clientPortPtr+1))
if err != nil {
log.Fatalln(err)
}
log.Println(resp)
resp, err = clt.Play()
if err != nil {
log.Fatalln(err)
}
log.Println(resp)
// TODO(saxon): use RTCP client here to maintain stream.
select {}
}

View File

@ -202,7 +202,7 @@ func TestMethods(t *testing.T) {
// Keep trying to connect to server. // Keep trying to connect to server.
// TODO: use generalised retry utility when available. // TODO: use generalised retry utility when available.
for retry := 0; ; retry++ { for retry := 0; ; retry++ {
clt, err = NewClient(serverAddr) clt, _, _, err = NewClient(serverAddr)
if err == nil { if err == nil {
break break
} }

View File

@ -74,6 +74,7 @@ type Config struct {
Saturation int Saturation int
Exposure string Exposure string
AutoWhiteBalance string AutoWhiteBalance string
RTSPURL string
} }
// Possible modes for raspivid --exposure parameter. // Possible modes for raspivid --exposure parameter.
@ -130,6 +131,7 @@ const (
Udp Udp
MpegtsRtp MpegtsRtp
Rtp Rtp
RTSP
) )
// Default config settings // Default config settings
@ -172,7 +174,7 @@ func (c *Config) Validate(r *Revid) error {
} }
switch c.Input { switch c.Input {
case Raspivid, V4L, File: case Raspivid, V4L, File, RTSP:
case NothingDefined: case NothingDefined:
c.Logger.Log(logger.Info, pkg+"no input type defined, defaulting", "input", defaultInput) c.Logger.Log(logger.Info, pkg+"no input type defined, defaulting", "input", defaultInput)
c.Input = defaultInput c.Input = defaultInput

View File

@ -33,6 +33,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"net"
"os" "os"
"os/exec" "os/exec"
"strconv" "strconv"
@ -41,9 +42,12 @@ import (
"time" "time"
"bitbucket.org/ausocean/av/codec/h264" "bitbucket.org/ausocean/av/codec/h264"
"bitbucket.org/ausocean/av/codec/mjpeg" "bitbucket.org/ausocean/av/codec/h265"
"bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/flv"
"bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/protocol/rtcp"
"bitbucket.org/ausocean/av/protocol/rtp"
"bitbucket.org/ausocean/av/protocol/rtsp"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/ioext"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
@ -61,6 +65,12 @@ const (
rtmpConnectionTimeout = 10 rtmpConnectionTimeout = 10
) )
const (
rtpPort = 60000
rtcpPort = 60001
defaultServerRTCPPort = 17301
)
const pkg = "revid:" const pkg = "revid:"
type Logger interface { type Logger interface {
@ -164,7 +174,14 @@ func (r *Revid) reset(config Config) error {
err = r.setupPipeline( err = r.setupPipeline(
func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { func(dst io.WriteCloser, fps int) (io.WriteCloser, error) {
e := mts.NewEncoder(dst, float64(fps), mts.Video) var st int
switch r.config.Input {
case Raspivid, File, V4L:
st = mts.EncodeH264
case RTSP:
st = mts.EncodeH265
}
e := mts.NewEncoder(dst, float64(fps), st)
return e, nil return e, nil
}, },
func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { func(dst io.WriteCloser, fps int) (io.WriteCloser, error) {
@ -263,20 +280,18 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int)
switch r.config.Input { switch r.config.Input {
case Raspivid: case Raspivid:
r.setupInput = r.startRaspivid r.setupInput = r.startRaspivid
r.lexTo = h264.Lex
case V4L: case V4L:
r.setupInput = r.startV4L r.setupInput = r.startV4L
r.lexTo = h264.Lex
case File: case File:
r.setupInput = r.setupInputForFile r.setupInput = r.setupInputForFile
r.lexTo = h264.Lex
case RTSP:
r.setupInput = r.startRTSPCamera
r.lexTo = h265.NewLexer(false).Lex
} }
switch r.config.InputCodec {
case H264:
r.config.Logger.Log(logger.Info, pkg+"using H264 lexer")
r.lexTo = h264.Lex
case Mjpeg:
r.config.Logger.Log(logger.Info, pkg+"using MJPEG lexer")
r.lexTo = mjpeg.Lex
}
return nil return nil
} }
@ -605,6 +620,108 @@ func (r *Revid) setupInputForFile() (func() error, error) {
return func() error { return f.Close() }, nil return func() error { return f.Close() }, nil
} }
// startRTSPCamera uses RTSP to request an RTP stream from an IP camera. An RTP
// client is created from which RTP packets containing either h264/h265 can read
// by the selected lexer.
func (r *Revid) startRTSPCamera() (func() error, error) {
rtspClt, local, remote, err := rtsp.NewClient(r.config.RTSPURL)
if err != nil {
return nil, err
}
resp, err := rtspClt.Options()
if err != nil {
return nil, err
}
r.config.Logger.Log(logger.Info, pkg+"RTSP OPTIONS response", "response", resp.String())
resp, err = rtspClt.Describe()
if err != nil {
return nil, err
}
r.config.Logger.Log(logger.Info, pkg+"RTSP DESCRIBE response", "response", resp.String())
resp, err = rtspClt.Setup("track1", fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort))
if err != nil {
return nil, err
}
r.config.Logger.Log(logger.Info, pkg+"RTSP SETUP response", "response", resp.String())
rtpCltAddr, rtcpCltAddr, rtcpSvrAddr, err := formAddrs(local, remote, *resp)
if err != nil {
return nil, err
}
resp, err = rtspClt.Play()
if err != nil {
return nil, err
}
r.config.Logger.Log(logger.Info, pkg+"RTSP server PLAY response", "response", resp.String())
rtpClt, err := rtp.NewClient(rtpCltAddr)
if err != nil {
return nil, err
}
rtcpClt, err := rtcp.NewClient(rtcpCltAddr, rtcpSvrAddr, rtpClt, r.config.Logger.Log)
if err != nil {
return nil, err
}
// Check errors from RTCP client until it has stopped running.
go func() {
for {
err, ok := <-rtcpClt.Err()
if ok {
r.config.Logger.Log(logger.Warning, pkg+"RTCP error", "error", err.Error())
} else {
return
}
}
}()
// Start the RTCP client.
rtcpClt.Start()
// Start reading data from the RTP client.
r.wg.Add(1)
go r.processFrom(rtpClt, time.Second/time.Duration(r.config.FrameRate))
return func() error {
rtspClt.Close()
rtcpClt.Stop()
return nil
}, nil
}
// formAddrs is a helper function to form the addresses for the RTP client,
// RTCP client, and the RTSP server's RTCP addr using the local, remote addresses
// of the RTSP conn, and the SETUP method response.
func formAddrs(local, remote *net.TCPAddr, setupResp rtsp.Response) (rtpCltAddr, rtcpCltAddr, rtcpSvrAddr string, err error) {
svrRTCPPort, err := parseSvrRTCPPort(setupResp)
if err != nil {
return "", "", "", err
}
rtpCltAddr = strings.Split(local.String(), ":")[0] + ":" + strconv.Itoa(rtpPort)
rtcpCltAddr = strings.Split(local.String(), ":")[0] + ":" + strconv.Itoa(rtcpPort)
rtcpSvrAddr = strings.Split(remote.String(), ":")[0] + ":" + strconv.Itoa(svrRTCPPort)
return
}
// parseServerRTCPPort is a helper function to get the RTSP server's RTCP port.
func parseSvrRTCPPort(resp rtsp.Response) (int, error) {
transport := resp.Header.Get("Transport")
for _, p := range strings.Split(transport, ";") {
if strings.Contains(p, "server_port") {
port, err := strconv.Atoi(strings.Split(p, "-")[1])
if err != nil {
return 0, err
}
return port, nil
}
}
return 0, errors.New("SETUP response did not provide RTCP port")
}
func (r *Revid) processFrom(read io.Reader, delay time.Duration) { func (r *Revid) processFrom(read io.Reader, delay time.Duration) {
r.config.Logger.Log(logger.Info, pkg+"reading input data") r.config.Logger.Log(logger.Info, pkg+"reading input data")
r.err <- r.lexTo(r.encoders, read, delay) r.err <- r.lexTo(r.encoders, read, delay)

View File

@ -134,7 +134,7 @@ func TestMtsSenderSegment(t *testing.T) {
const numberOfClips = 11 const numberOfClips = 11
dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0) sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0)
encoder := mts.NewEncoder(sender, 25, mts.Video) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264)
// Turn time based PSI writing off for encoder. // Turn time based PSI writing off for encoder.
const psiSendCount = 10 const psiSendCount = 10
@ -212,7 +212,7 @@ func TestMtsSenderFailedSend(t *testing.T) {
const clipToFailAt = 3 const clipToFailAt = 3
dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0) sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0)
encoder := mts.NewEncoder(sender, 25, mts.Video) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264)
// Turn time based PSI writing off for encoder and send PSI every 10 packets. // Turn time based PSI writing off for encoder and send PSI every 10 packets.
const psiSendCount = 10 const psiSendCount = 10
@ -292,7 +292,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
const clipToDelay = 3 const clipToDelay = 3
dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, rbElementSize, 0) sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, rbElementSize, 0)
encoder := mts.NewEncoder(sender, 25, mts.Video) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264)
// Turn time based PSI writing off for encoder. // Turn time based PSI writing off for encoder.
const psiSendCount = 10 const psiSendCount = 10