From bc6a0ae55e137d65e5eea8d4464568e2636dcafc Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 20 May 2019 18:14:23 +0930 Subject: [PATCH] revid: using RTCP client to maintain RTP stream from RTSP server Now adopting an RTCP client so that the RTP stream from the RTSP server can be maintained past 1 minute. This change involved some refactor. The rtcp.NewClient signature has been simplified. There is now a default send interval and name for use in the source description in the receiver reports. These can be customised if required with the new SetSendInterval and SetName funcs. The rtcp.NewClient signature now takes an rtp.Client, so that it can get information from the RTP stream, like most recent sequence number. As a result of this requirement the rtp package parse file has been extended with some functions for parsing out the sequence number and ssrc from RTP packets and the RTP client provides getters for these things. --- cmd/revid-cli/main.go | 2 + protocol/rtcp/client.go | 78 ++++++++++++++++++------------------ protocol/rtcp/client_test.go | 12 +++--- protocol/rtcp/parse.go | 34 +++++++++++----- protocol/rtp/client.go | 55 ++++++++++++++++++++++++- protocol/rtp/parse.go | 40 +++++++++++++++--- protocol/rtsp/client.go | 5 +++ revid/config.go | 1 + revid/revid.go | 30 ++++++++++++-- 9 files changed, 194 insertions(+), 63 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index a1961873..6cbcab0a 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -106,6 +106,7 @@ func handleFlags() revid.Config { cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`") inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam, RTSPCamera") + rtcpServerAddrPtr = flag.String("RTCPServerAddr", "", "The RTCP server we are communicating with") rtspURLPtr = flag.String("RTSPURL", "", "The URL for an RTSP server.") rtpRecvAddrPtr = flag.String("RTPRecvAddr", "", "The RTP address we would like to receive RTP from.") rtcpAddrPtr = flag.String("RTCPAddr", "", "The address for RTCP communication.") @@ -228,6 +229,7 @@ func handleFlags() revid.Config { netsender.ConfigFile = *configFilePtr } + cfg.RTCPServerAddr = *rtcpServerAddrPtr cfg.RTSPURL = *rtspURLPtr cfg.RTPRecvAddr = *rtpRecvAddrPtr cfg.RTCPAddr = *rtcpAddrPtr diff --git a/protocol/rtcp/client.go b/protocol/rtcp/client.go index 7d6c995c..80d65511 100644 --- a/protocol/rtcp/client.go +++ b/protocol/rtcp/client.go @@ -37,16 +37,18 @@ import ( "sync" "time" + "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/utils/logger" ) const ( - senderSSRC = 1 // Any non-zero value will do. - defaultClientName = "Client" - delayUnit = 1.0 / 65536.0 - pkg = "rtcp: " - rtcpVer = 2 - receiverBufSize = 200 + clientSSRC = 1 // Any non-zero value will do. + defaultClientName = "Client" + defaultSendInterval = 2 * time.Second + delayUnit = 1.0 / 65536.0 + pkg = "rtcp: " + rtcpVer = 2 + receiverBufSize = 200 ) // Log describes a function signature required by the RTCP for the purpose of @@ -70,23 +72,19 @@ type Client struct { wg sync.WaitGroup // This is used to wait for send and recv routines to stop when Client is stopped. quit chan struct{} // Channel used to communicate quit signal to send and recv routines. log Log // Used to log any messages. - - err chan error // Client will send any errors through this chan. Can be accessed by Err(). + rtpClt *rtp.Client + err chan error // Client will send any errors through this chan. Can be accessed by Err(). } // NewClient returns a pointer to a new Client. -func NewClient(clientAddress, serverAddress, name string, sendInterval time.Duration, rtpSSRC uint32, l Log) (*Client, error) { - if name == "" { - name = defaultClientName - } - +func NewClient(clientAddress, serverAddress string, rtpClt *rtp.Client, l Log) (*Client, error) { c := &Client{ - name: name, - err: make(chan error), - quit: make(chan struct{}), - interval: sendInterval, - sourceSSRC: rtpSSRC, - log: l, + name: defaultClientName, + err: make(chan error), + quit: make(chan struct{}), + interval: defaultSendInterval, + rtpClt: rtpClt, + log: l, } var err error @@ -107,6 +105,17 @@ func NewClient(clientAddress, serverAddress, name string, sendInterval time.Dura return c, nil } +// SetSendInterval sets a custom receiver report send interval (default is 5 seconds.) +func (c *Client) SetSendInterval(d time.Duration) { + c.interval = d +} + +// SetName sets a custom client name for use in receiver report source description. +// Default is Client". +func (c *Client) SetName(name string) { + c.name = name +} + // Start starts the listen and send routines. This will start the process of // receiving and parsing sender reports, and the process of sending receiver // reports to the server. @@ -126,6 +135,13 @@ func (c *Client) Stop() { c.wg.Wait() } +// Done gives access to the clients quit chan, which is closed when the RTCP +// client is stopped. Therefore, Done may be used to check when the RTCP client +// has stopped running - ideal for use in a routine checking Client.Err(). +func (c *Client) Done() <-chan struct{} { + return c.quit +} + // Err provides read access to the Client err channel. This must be checked // otherwise the client will block if an error encountered. func (c *Client) Err() <-chan error { @@ -171,13 +187,13 @@ func (c *Client) send() { ReportCount: 1, Type: typeReceiverReport, }, - SenderSSRC: senderSSRC, + SenderSSRC: clientSSRC, Blocks: []ReportBlock{ ReportBlock{ - SourceIdentifier: c.sourceSSRC, + SourceIdentifier: c.rtpClt.SSRC(), FractionLost: 0, PacketsLost: math.MaxUint32, - HighestSequence: c.sequence(), + HighestSequence: uint32((c.rtpClt.Cycles() << 16) | c.rtpClt.Sequence()), Jitter: c.jitter(), SenderReportTs: c.lastSenderTs(), SenderReportDelay: c.delay(), @@ -195,7 +211,7 @@ func (c *Client) send() { }, Chunks: []Chunk{ Chunk{ - SSRC: senderSSRC, + SSRC: clientSSRC, Items: []SDESItem{ SDESItem{ Type: typeCName, @@ -238,22 +254,6 @@ func (c *Client) parse(buf []byte) { c.setSenderTs(t) } -// SetSequence will allow updating of the highest sequence number received -// through an RTP stream. -func (c *Client) SetSequence(s uint32) { - c.mu.Lock() - c.seq = s - c.mu.Unlock() -} - -// sequence will return the highest sequence number received through RTP. -func (c *Client) sequence() uint32 { - c.mu.Lock() - s := c.seq - c.mu.Unlock() - return s -} - // jitter returns the interarrival jitter as described by RTCP specifications: // https://tools.ietf.org/html/rfc3550 // TODO(saxon): complete this. diff --git a/protocol/rtcp/client_test.go b/protocol/rtcp/client_test.go index 64a4d685..111bcbbf 100644 --- a/protocol/rtcp/client_test.go +++ b/protocol/rtcp/client_test.go @@ -37,6 +37,7 @@ import ( "testing" "time" + "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/utils/logger" ) @@ -148,12 +149,15 @@ func (dl *dummyLogger) log(lvl int8, msg string, args ...interface{}) { // respond with sender reports. func TestReceiveAndSend(t *testing.T) { const clientAddr, serverAddr = "localhost:8000", "localhost:8001" + rtpClt, err := rtp.NewClient("localhost:8002") + if err != nil { + t.Fatalf("unexpected error when creating RTP client: %v", err) + } + c, err := NewClient( clientAddr, serverAddr, - "testClient", - 10*time.Millisecond, - 12345, + rtpClt, (*dummyLogger)(t).log, ) if err != nil { @@ -197,8 +201,6 @@ func TestReceiveAndSend(t *testing.T) { n, _, _ := conn.ReadFromUDP(buf) t.Logf("SERVER: receiver report received: \n%v\n", buf[:n]) - c.SetSequence(uint32(i)) - now := time.Now().Second() var time [8]byte binary.BigEndian.PutUint64(time[:], uint64(now)) diff --git a/protocol/rtcp/parse.go b/protocol/rtcp/parse.go index 2f756f4b..be680fa0 100644 --- a/protocol/rtcp/parse.go +++ b/protocol/rtcp/parse.go @@ -42,15 +42,9 @@ type Timestamp struct { // significant word, and the least significant word. If the given bytes do not // represent a valid receiver report, an error is returned. func ParseTimestamp(buf []byte) (Timestamp, error) { - if len(buf) < 4 { - return Timestamp{}, errors.New("bad RTCP packet, not of sufficient length") - } - if (buf[0]&0xc0)>>6 != rtcpVer { - return Timestamp{}, errors.New("incompatible RTCP version") - } - - if buf[1] != typeSenderReport { - return Timestamp{}, errors.New("RTCP packet is not of sender report type") + err := checkPacket(buf) + if err != nil { + return Timestamp{}, err } return Timestamp{ @@ -58,3 +52,25 @@ func ParseTimestamp(buf []byte) (Timestamp, error) { Fraction: binary.BigEndian.Uint32(buf[12:]), }, nil } + +func ParseSSRC(buf []byte) (uint32, error) { + err := checkPacket(buf) + if err != nil { + return 0, err + } + return binary.BigEndian.Uint32(buf[4:]), nil +} + +func checkPacket(buf []byte) error { + if len(buf) < 4 { + return errors.New("bad RTCP packet, not of sufficient length") + } + if (buf[0]&0xc0)>>6 != rtcpVer { + return errors.New("incompatible RTCP version") + } + + if buf[1] != typeSenderReport { + return errors.New("RTCP packet is not of sender report type") + } + return nil +} diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index 3ab856b9..53b4d98f 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -28,13 +28,19 @@ LICENSE package rtp import ( + "fmt" "net" + "sync" ) // Client describes an RTP client that can receive an RTP stream and implements // io.Reader. type Client struct { - r *PacketReader + r *PacketReader + ssrc uint32 + mu sync.Mutex + sequence uint16 + cycles uint16 } // NewClient returns a pointer to a new Client. @@ -54,14 +60,61 @@ func NewClient(addr string) (*Client, error) { return nil, err } + buf := make([]byte, 4096) + n, err := c.Read(buf) + if err != nil { + return nil, err + } + c.ssrc, err = SSRC(buf[:n]) + if err != nil { + return nil, fmt.Errorf("could not parse SSRC from RTP packet, failed with error: %v", err) + } + return c, nil } +// SSRC returns the identified for the source from which the RTP packets being +// received are coming from. +func (c *Client) SSRC() uint32 { + return c.ssrc +} + // Read implements io.Reader. func (c *Client) Read(p []byte) (int, error) { + n, err := c.r.Read(p) + if err != nil { + return n, err + } + s, _ := Sequence(p[:n]) + c.setSequence(s) return c.r.Read(p) } +// setSequence sets the most recently received sequence number, and updates the +// cycles count if the sequence number has rolled over. +func (c *Client) setSequence(s uint16) { + c.mu.Lock() + if s < c.sequence { + c.cycles++ + } + c.sequence = s + c.mu.Unlock() +} + +// Sequence returns the most recent RTP packet sequence number received. +func (c *Client) Sequence() uint16 { + c.mu.Lock() + defer c.mu.Unlock() + return c.sequence +} + +// Cycles returns the number of RTP sequence number cycles that have been received. +func (c *Client) Cycles() uint16 { + c.mu.Lock() + defer c.mu.Unlock() + return c.cycles +} + // PacketReader provides an io.Reader interface to an underlying UDP PacketConn. type PacketReader struct { net.PacketConn diff --git a/protocol/rtp/parse.go b/protocol/rtp/parse.go index fcc05907..16e64c5d 100644 --- a/protocol/rtp/parse.go +++ b/protocol/rtp/parse.go @@ -50,11 +50,9 @@ func Marker(d []byte) (bool, error) { // Payload returns the payload from an RTP packet provided the version is // compatible, otherwise an error is returned. func Payload(d []byte) ([]byte, error) { - if len(d) < defaultHeadSize { - panic("invalid RTP packet length") - } - if version(d) != rtpVer { - return nil, errors.New(badVer) + err := checkPacket(d) + if err != nil { + return nil, err } extLen := 0 if hasExt(d) { @@ -64,6 +62,38 @@ func Payload(d []byte) ([]byte, error) { return d[payloadIdx:], nil } +// SSRC returns the source identifier from an RTP packet. An error is return if +// the packet is not valid. +func SSRC(d []byte) (uint32, error) { + err := checkPacket(d) + if err != nil { + return 0, err + } + return binary.BigEndian.Uint32(d[8:]), nil +} + +// Sequence returns the sequence number of an RTP packet. An error is returned +// if the packet is not valid. +func Sequence(d []byte) (uint16, error) { + err := checkPacket(d) + if err != nil { + return 0, err + } + return binary.BigEndian.Uint16(d[2:]), nil +} + +// checkPacket checks the validity of the packet, firstly by checking size and +// then also checking that version is compatible with these utilities. +func checkPacket(d []byte) error { + if len(d) < defaultHeadSize { + return errors.New("invalid RTP packet length") + } + if version(d) != rtpVer { + return errors.New(badVer) + } + return nil +} + // hasExt returns true if an extension is present in the RTP packet. func hasExt(d []byte) bool { return (d[0] & 0x10 >> 4) == 1 diff --git a/protocol/rtsp/client.go b/protocol/rtsp/client.go index 26e539db..77689b64 100644 --- a/protocol/rtsp/client.go +++ b/protocol/rtsp/client.go @@ -59,6 +59,11 @@ func NewClient(addr string) (*Client, error) { return c, nil } +// Close closes the RTSP connection. +func (c *Client) Close() error { + return c.conn.Close() +} + // Describe forms and sends an RTSP request of method DESCRIBE to the RTSP server. func (c *Client) Describe() (*Response, error) { req, err := NewRequest("DESCRIBE", c.nextCSeq(), c.url, nil) diff --git a/revid/config.go b/revid/config.go index 784ab02c..441b7c6d 100644 --- a/revid/config.go +++ b/revid/config.go @@ -77,6 +77,7 @@ type Config struct { RTSPURL string RTPRecvAddr string RTCPAddr string + RTCPServerAddr string } // Possible modes for raspivid --exposure parameter. diff --git a/revid/revid.go b/revid/revid.go index 0d1b254e..d5eefaa4 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -44,6 +44,7 @@ import ( "bitbucket.org/ausocean/av/codec/lex" "bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/mts" + "bitbucket.org/ausocean/av/protocol/rtcp" "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/av/protocol/rtsp" "bitbucket.org/ausocean/iot/pi/netsender" @@ -614,7 +615,6 @@ func (r *Revid) setupInputForFile() (func() error, error) { func (r *Revid) startRTSPCamera() (func() error, error) { rtspClt, err := rtsp.NewClient(r.config.RTSPURL) - fmt.Printf("RTSPURL: %v\n", r.config.RTSPURL) if err != nil { return nil, err } @@ -654,16 +654,38 @@ func (r *Revid) startRTSPCamera() (func() error, error) { } r.config.Logger.Log(logger.Info, pkg+"RTSP server PLAY response", "response", resp.String()) - // TODO(saxon): use rtcp client to maintain rtp stream. - rtpClt, err := rtp.NewClient(r.config.RTPRecvAddr) if err != nil { return nil, err } + // TODO(saxon): use rtcp client to maintain rtp stream. + rtcpClt, err := rtcp.NewClient(r.config.RTCPAddr, r.config.RTCPServerAddr, rtpClt, r.config.Logger.Log) + if err != nil { + return nil, err + } + + go func() { + for { + select { + case <-rtcpClt.Done(): + return + case err := <-rtcpClt.Err(): + r.config.Logger.Log(logger.Warning, pkg+"RTCP error", "error", err.Error()) + default: + } + } + }() + + rtcpClt.Start() + r.wg.Add(1) go r.processFrom(rtpClt, time.Second/time.Duration(r.config.FrameRate)) - return func() error { return nil }, nil + return func() error { + rtspClt.Close() + rtcpClt.Stop() + return nil + }, nil } func (r *Revid) processFrom(read io.Reader, delay time.Duration) {