revid: using RTCP client to maintain RTP stream from RTSP server

Now adopting an RTCP client so that the RTP stream from the RTSP server can be maintained past 1 minute.
This change involved some refactor.
The rtcp.NewClient signature has been simplified. There is now a default send interval and name for use
in the source description in the receiver reports. These can be customised if required with the new
SetSendInterval and SetName funcs. The rtcp.NewClient signature now takes an rtp.Client, so that it
can get information from the RTP stream, like most recent sequence number. As a result of this requirement
the rtp package parse file has been extended with some functions for parsing out the sequence number and
ssrc from RTP packets and the RTP client provides getters for these things.
This commit is contained in:
Saxon 2019-05-20 18:14:23 +09:30
parent 0567a81757
commit bc6a0ae55e
9 changed files with 194 additions and 63 deletions

View File

@ -106,6 +106,7 @@ func handleFlags() revid.Config {
cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`") cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`")
inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam, RTSPCamera") inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam, RTSPCamera")
rtcpServerAddrPtr = flag.String("RTCPServerAddr", "", "The RTCP server we are communicating with")
rtspURLPtr = flag.String("RTSPURL", "", "The URL for an RTSP server.") rtspURLPtr = flag.String("RTSPURL", "", "The URL for an RTSP server.")
rtpRecvAddrPtr = flag.String("RTPRecvAddr", "", "The RTP address we would like to receive RTP from.") rtpRecvAddrPtr = flag.String("RTPRecvAddr", "", "The RTP address we would like to receive RTP from.")
rtcpAddrPtr = flag.String("RTCPAddr", "", "The address for RTCP communication.") rtcpAddrPtr = flag.String("RTCPAddr", "", "The address for RTCP communication.")
@ -228,6 +229,7 @@ func handleFlags() revid.Config {
netsender.ConfigFile = *configFilePtr netsender.ConfigFile = *configFilePtr
} }
cfg.RTCPServerAddr = *rtcpServerAddrPtr
cfg.RTSPURL = *rtspURLPtr cfg.RTSPURL = *rtspURLPtr
cfg.RTPRecvAddr = *rtpRecvAddrPtr cfg.RTPRecvAddr = *rtpRecvAddrPtr
cfg.RTCPAddr = *rtcpAddrPtr cfg.RTCPAddr = *rtcpAddrPtr

View File

@ -37,16 +37,18 @@ import (
"sync" "sync"
"time" "time"
"bitbucket.org/ausocean/av/protocol/rtp"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
) )
const ( const (
senderSSRC = 1 // Any non-zero value will do. clientSSRC = 1 // Any non-zero value will do.
defaultClientName = "Client" defaultClientName = "Client"
delayUnit = 1.0 / 65536.0 defaultSendInterval = 2 * time.Second
pkg = "rtcp: " delayUnit = 1.0 / 65536.0
rtcpVer = 2 pkg = "rtcp: "
receiverBufSize = 200 rtcpVer = 2
receiverBufSize = 200
) )
// Log describes a function signature required by the RTCP for the purpose of // Log describes a function signature required by the RTCP for the purpose of
@ -70,23 +72,19 @@ type Client struct {
wg sync.WaitGroup // This is used to wait for send and recv routines to stop when Client is stopped. wg sync.WaitGroup // This is used to wait for send and recv routines to stop when Client is stopped.
quit chan struct{} // Channel used to communicate quit signal to send and recv routines. quit chan struct{} // Channel used to communicate quit signal to send and recv routines.
log Log // Used to log any messages. log Log // Used to log any messages.
rtpClt *rtp.Client
err chan error // Client will send any errors through this chan. Can be accessed by Err(). err chan error // Client will send any errors through this chan. Can be accessed by Err().
} }
// NewClient returns a pointer to a new Client. // NewClient returns a pointer to a new Client.
func NewClient(clientAddress, serverAddress, name string, sendInterval time.Duration, rtpSSRC uint32, l Log) (*Client, error) { func NewClient(clientAddress, serverAddress string, rtpClt *rtp.Client, l Log) (*Client, error) {
if name == "" {
name = defaultClientName
}
c := &Client{ c := &Client{
name: name, name: defaultClientName,
err: make(chan error), err: make(chan error),
quit: make(chan struct{}), quit: make(chan struct{}),
interval: sendInterval, interval: defaultSendInterval,
sourceSSRC: rtpSSRC, rtpClt: rtpClt,
log: l, log: l,
} }
var err error var err error
@ -107,6 +105,17 @@ func NewClient(clientAddress, serverAddress, name string, sendInterval time.Dura
return c, nil return c, nil
} }
// SetSendInterval sets a custom receiver report send interval (default is 5 seconds.)
func (c *Client) SetSendInterval(d time.Duration) {
c.interval = d
}
// SetName sets a custom client name for use in receiver report source description.
// Default is Client".
func (c *Client) SetName(name string) {
c.name = name
}
// Start starts the listen and send routines. This will start the process of // Start starts the listen and send routines. This will start the process of
// receiving and parsing sender reports, and the process of sending receiver // receiving and parsing sender reports, and the process of sending receiver
// reports to the server. // reports to the server.
@ -126,6 +135,13 @@ func (c *Client) Stop() {
c.wg.Wait() c.wg.Wait()
} }
// Done gives access to the clients quit chan, which is closed when the RTCP
// client is stopped. Therefore, Done may be used to check when the RTCP client
// has stopped running - ideal for use in a routine checking Client.Err().
func (c *Client) Done() <-chan struct{} {
return c.quit
}
// Err provides read access to the Client err channel. This must be checked // Err provides read access to the Client err channel. This must be checked
// otherwise the client will block if an error encountered. // otherwise the client will block if an error encountered.
func (c *Client) Err() <-chan error { func (c *Client) Err() <-chan error {
@ -171,13 +187,13 @@ func (c *Client) send() {
ReportCount: 1, ReportCount: 1,
Type: typeReceiverReport, Type: typeReceiverReport,
}, },
SenderSSRC: senderSSRC, SenderSSRC: clientSSRC,
Blocks: []ReportBlock{ Blocks: []ReportBlock{
ReportBlock{ ReportBlock{
SourceIdentifier: c.sourceSSRC, SourceIdentifier: c.rtpClt.SSRC(),
FractionLost: 0, FractionLost: 0,
PacketsLost: math.MaxUint32, PacketsLost: math.MaxUint32,
HighestSequence: c.sequence(), HighestSequence: uint32((c.rtpClt.Cycles() << 16) | c.rtpClt.Sequence()),
Jitter: c.jitter(), Jitter: c.jitter(),
SenderReportTs: c.lastSenderTs(), SenderReportTs: c.lastSenderTs(),
SenderReportDelay: c.delay(), SenderReportDelay: c.delay(),
@ -195,7 +211,7 @@ func (c *Client) send() {
}, },
Chunks: []Chunk{ Chunks: []Chunk{
Chunk{ Chunk{
SSRC: senderSSRC, SSRC: clientSSRC,
Items: []SDESItem{ Items: []SDESItem{
SDESItem{ SDESItem{
Type: typeCName, Type: typeCName,
@ -238,22 +254,6 @@ func (c *Client) parse(buf []byte) {
c.setSenderTs(t) c.setSenderTs(t)
} }
// SetSequence will allow updating of the highest sequence number received
// through an RTP stream.
func (c *Client) SetSequence(s uint32) {
c.mu.Lock()
c.seq = s
c.mu.Unlock()
}
// sequence will return the highest sequence number received through RTP.
func (c *Client) sequence() uint32 {
c.mu.Lock()
s := c.seq
c.mu.Unlock()
return s
}
// jitter returns the interarrival jitter as described by RTCP specifications: // jitter returns the interarrival jitter as described by RTCP specifications:
// https://tools.ietf.org/html/rfc3550 // https://tools.ietf.org/html/rfc3550
// TODO(saxon): complete this. // TODO(saxon): complete this.

View File

@ -37,6 +37,7 @@ import (
"testing" "testing"
"time" "time"
"bitbucket.org/ausocean/av/protocol/rtp"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
) )
@ -148,12 +149,15 @@ func (dl *dummyLogger) log(lvl int8, msg string, args ...interface{}) {
// respond with sender reports. // respond with sender reports.
func TestReceiveAndSend(t *testing.T) { func TestReceiveAndSend(t *testing.T) {
const clientAddr, serverAddr = "localhost:8000", "localhost:8001" const clientAddr, serverAddr = "localhost:8000", "localhost:8001"
rtpClt, err := rtp.NewClient("localhost:8002")
if err != nil {
t.Fatalf("unexpected error when creating RTP client: %v", err)
}
c, err := NewClient( c, err := NewClient(
clientAddr, clientAddr,
serverAddr, serverAddr,
"testClient", rtpClt,
10*time.Millisecond,
12345,
(*dummyLogger)(t).log, (*dummyLogger)(t).log,
) )
if err != nil { if err != nil {
@ -197,8 +201,6 @@ func TestReceiveAndSend(t *testing.T) {
n, _, _ := conn.ReadFromUDP(buf) n, _, _ := conn.ReadFromUDP(buf)
t.Logf("SERVER: receiver report received: \n%v\n", buf[:n]) t.Logf("SERVER: receiver report received: \n%v\n", buf[:n])
c.SetSequence(uint32(i))
now := time.Now().Second() now := time.Now().Second()
var time [8]byte var time [8]byte
binary.BigEndian.PutUint64(time[:], uint64(now)) binary.BigEndian.PutUint64(time[:], uint64(now))

View File

@ -42,15 +42,9 @@ type Timestamp struct {
// significant word, and the least significant word. If the given bytes do not // significant word, and the least significant word. If the given bytes do not
// represent a valid receiver report, an error is returned. // represent a valid receiver report, an error is returned.
func ParseTimestamp(buf []byte) (Timestamp, error) { func ParseTimestamp(buf []byte) (Timestamp, error) {
if len(buf) < 4 { err := checkPacket(buf)
return Timestamp{}, errors.New("bad RTCP packet, not of sufficient length") if err != nil {
} return Timestamp{}, err
if (buf[0]&0xc0)>>6 != rtcpVer {
return Timestamp{}, errors.New("incompatible RTCP version")
}
if buf[1] != typeSenderReport {
return Timestamp{}, errors.New("RTCP packet is not of sender report type")
} }
return Timestamp{ return Timestamp{
@ -58,3 +52,25 @@ func ParseTimestamp(buf []byte) (Timestamp, error) {
Fraction: binary.BigEndian.Uint32(buf[12:]), Fraction: binary.BigEndian.Uint32(buf[12:]),
}, nil }, nil
} }
func ParseSSRC(buf []byte) (uint32, error) {
err := checkPacket(buf)
if err != nil {
return 0, err
}
return binary.BigEndian.Uint32(buf[4:]), nil
}
func checkPacket(buf []byte) error {
if len(buf) < 4 {
return errors.New("bad RTCP packet, not of sufficient length")
}
if (buf[0]&0xc0)>>6 != rtcpVer {
return errors.New("incompatible RTCP version")
}
if buf[1] != typeSenderReport {
return errors.New("RTCP packet is not of sender report type")
}
return nil
}

View File

@ -28,13 +28,19 @@ LICENSE
package rtp package rtp
import ( import (
"fmt"
"net" "net"
"sync"
) )
// Client describes an RTP client that can receive an RTP stream and implements // Client describes an RTP client that can receive an RTP stream and implements
// io.Reader. // io.Reader.
type Client struct { type Client struct {
r *PacketReader r *PacketReader
ssrc uint32
mu sync.Mutex
sequence uint16
cycles uint16
} }
// NewClient returns a pointer to a new Client. // NewClient returns a pointer to a new Client.
@ -54,14 +60,61 @@ func NewClient(addr string) (*Client, error) {
return nil, err return nil, err
} }
buf := make([]byte, 4096)
n, err := c.Read(buf)
if err != nil {
return nil, err
}
c.ssrc, err = SSRC(buf[:n])
if err != nil {
return nil, fmt.Errorf("could not parse SSRC from RTP packet, failed with error: %v", err)
}
return c, nil return c, nil
} }
// SSRC returns the identified for the source from which the RTP packets being
// received are coming from.
func (c *Client) SSRC() uint32 {
return c.ssrc
}
// Read implements io.Reader. // Read implements io.Reader.
func (c *Client) Read(p []byte) (int, error) { func (c *Client) Read(p []byte) (int, error) {
n, err := c.r.Read(p)
if err != nil {
return n, err
}
s, _ := Sequence(p[:n])
c.setSequence(s)
return c.r.Read(p) return c.r.Read(p)
} }
// setSequence sets the most recently received sequence number, and updates the
// cycles count if the sequence number has rolled over.
func (c *Client) setSequence(s uint16) {
c.mu.Lock()
if s < c.sequence {
c.cycles++
}
c.sequence = s
c.mu.Unlock()
}
// Sequence returns the most recent RTP packet sequence number received.
func (c *Client) Sequence() uint16 {
c.mu.Lock()
defer c.mu.Unlock()
return c.sequence
}
// Cycles returns the number of RTP sequence number cycles that have been received.
func (c *Client) Cycles() uint16 {
c.mu.Lock()
defer c.mu.Unlock()
return c.cycles
}
// PacketReader provides an io.Reader interface to an underlying UDP PacketConn. // PacketReader provides an io.Reader interface to an underlying UDP PacketConn.
type PacketReader struct { type PacketReader struct {
net.PacketConn net.PacketConn

View File

@ -50,11 +50,9 @@ func Marker(d []byte) (bool, error) {
// Payload returns the payload from an RTP packet provided the version is // Payload returns the payload from an RTP packet provided the version is
// compatible, otherwise an error is returned. // compatible, otherwise an error is returned.
func Payload(d []byte) ([]byte, error) { func Payload(d []byte) ([]byte, error) {
if len(d) < defaultHeadSize { err := checkPacket(d)
panic("invalid RTP packet length") if err != nil {
} return nil, err
if version(d) != rtpVer {
return nil, errors.New(badVer)
} }
extLen := 0 extLen := 0
if hasExt(d) { if hasExt(d) {
@ -64,6 +62,38 @@ func Payload(d []byte) ([]byte, error) {
return d[payloadIdx:], nil return d[payloadIdx:], nil
} }
// SSRC returns the source identifier from an RTP packet. An error is return if
// the packet is not valid.
func SSRC(d []byte) (uint32, error) {
err := checkPacket(d)
if err != nil {
return 0, err
}
return binary.BigEndian.Uint32(d[8:]), nil
}
// Sequence returns the sequence number of an RTP packet. An error is returned
// if the packet is not valid.
func Sequence(d []byte) (uint16, error) {
err := checkPacket(d)
if err != nil {
return 0, err
}
return binary.BigEndian.Uint16(d[2:]), nil
}
// checkPacket checks the validity of the packet, firstly by checking size and
// then also checking that version is compatible with these utilities.
func checkPacket(d []byte) error {
if len(d) < defaultHeadSize {
return errors.New("invalid RTP packet length")
}
if version(d) != rtpVer {
return errors.New(badVer)
}
return nil
}
// hasExt returns true if an extension is present in the RTP packet. // hasExt returns true if an extension is present in the RTP packet.
func hasExt(d []byte) bool { func hasExt(d []byte) bool {
return (d[0] & 0x10 >> 4) == 1 return (d[0] & 0x10 >> 4) == 1

View File

@ -59,6 +59,11 @@ func NewClient(addr string) (*Client, error) {
return c, nil return c, nil
} }
// Close closes the RTSP connection.
func (c *Client) Close() error {
return c.conn.Close()
}
// Describe forms and sends an RTSP request of method DESCRIBE to the RTSP server. // Describe forms and sends an RTSP request of method DESCRIBE to the RTSP server.
func (c *Client) Describe() (*Response, error) { func (c *Client) Describe() (*Response, error) {
req, err := NewRequest("DESCRIBE", c.nextCSeq(), c.url, nil) req, err := NewRequest("DESCRIBE", c.nextCSeq(), c.url, nil)

View File

@ -77,6 +77,7 @@ type Config struct {
RTSPURL string RTSPURL string
RTPRecvAddr string RTPRecvAddr string
RTCPAddr string RTCPAddr string
RTCPServerAddr string
} }
// Possible modes for raspivid --exposure parameter. // Possible modes for raspivid --exposure parameter.

View File

@ -44,6 +44,7 @@ import (
"bitbucket.org/ausocean/av/codec/lex" "bitbucket.org/ausocean/av/codec/lex"
"bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/flv"
"bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/protocol/rtcp"
"bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/av/protocol/rtp"
"bitbucket.org/ausocean/av/protocol/rtsp" "bitbucket.org/ausocean/av/protocol/rtsp"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
@ -614,7 +615,6 @@ func (r *Revid) setupInputForFile() (func() error, error) {
func (r *Revid) startRTSPCamera() (func() error, error) { func (r *Revid) startRTSPCamera() (func() error, error) {
rtspClt, err := rtsp.NewClient(r.config.RTSPURL) rtspClt, err := rtsp.NewClient(r.config.RTSPURL)
fmt.Printf("RTSPURL: %v\n", r.config.RTSPURL)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -654,16 +654,38 @@ func (r *Revid) startRTSPCamera() (func() error, error) {
} }
r.config.Logger.Log(logger.Info, pkg+"RTSP server PLAY response", "response", resp.String()) r.config.Logger.Log(logger.Info, pkg+"RTSP server PLAY response", "response", resp.String())
// TODO(saxon): use rtcp client to maintain rtp stream.
rtpClt, err := rtp.NewClient(r.config.RTPRecvAddr) rtpClt, err := rtp.NewClient(r.config.RTPRecvAddr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// TODO(saxon): use rtcp client to maintain rtp stream.
rtcpClt, err := rtcp.NewClient(r.config.RTCPAddr, r.config.RTCPServerAddr, rtpClt, r.config.Logger.Log)
if err != nil {
return nil, err
}
go func() {
for {
select {
case <-rtcpClt.Done():
return
case err := <-rtcpClt.Err():
r.config.Logger.Log(logger.Warning, pkg+"RTCP error", "error", err.Error())
default:
}
}
}()
rtcpClt.Start()
r.wg.Add(1) r.wg.Add(1)
go r.processFrom(rtpClt, time.Second/time.Duration(r.config.FrameRate)) go r.processFrom(rtpClt, time.Second/time.Duration(r.config.FrameRate))
return func() error { return nil }, nil return func() error {
rtspClt.Close()
rtcpClt.Stop()
return nil
}, nil
} }
func (r *Revid) processFrom(read io.Reader, delay time.Duration) { func (r *Revid) processFrom(read io.Reader, delay time.Duration) {