diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index a1961873..6cbcab0a 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -106,6 +106,7 @@ func handleFlags() revid.Config { cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`") inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam, RTSPCamera") + rtcpServerAddrPtr = flag.String("RTCPServerAddr", "", "The RTCP server we are communicating with") rtspURLPtr = flag.String("RTSPURL", "", "The URL for an RTSP server.") rtpRecvAddrPtr = flag.String("RTPRecvAddr", "", "The RTP address we would like to receive RTP from.") rtcpAddrPtr = flag.String("RTCPAddr", "", "The address for RTCP communication.") @@ -228,6 +229,7 @@ func handleFlags() revid.Config { netsender.ConfigFile = *configFilePtr } + cfg.RTCPServerAddr = *rtcpServerAddrPtr cfg.RTSPURL = *rtspURLPtr cfg.RTPRecvAddr = *rtpRecvAddrPtr cfg.RTCPAddr = *rtcpAddrPtr diff --git a/protocol/rtcp/client.go b/protocol/rtcp/client.go index 7d6c995c..80d65511 100644 --- a/protocol/rtcp/client.go +++ b/protocol/rtcp/client.go @@ -37,16 +37,18 @@ import ( "sync" "time" + "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/utils/logger" ) const ( - senderSSRC = 1 // Any non-zero value will do. - defaultClientName = "Client" - delayUnit = 1.0 / 65536.0 - pkg = "rtcp: " - rtcpVer = 2 - receiverBufSize = 200 + clientSSRC = 1 // Any non-zero value will do. + defaultClientName = "Client" + defaultSendInterval = 2 * time.Second + delayUnit = 1.0 / 65536.0 + pkg = "rtcp: " + rtcpVer = 2 + receiverBufSize = 200 ) // Log describes a function signature required by the RTCP for the purpose of @@ -70,23 +72,19 @@ type Client struct { wg sync.WaitGroup // This is used to wait for send and recv routines to stop when Client is stopped. quit chan struct{} // Channel used to communicate quit signal to send and recv routines. log Log // Used to log any messages. - - err chan error // Client will send any errors through this chan. Can be accessed by Err(). + rtpClt *rtp.Client + err chan error // Client will send any errors through this chan. Can be accessed by Err(). } // NewClient returns a pointer to a new Client. -func NewClient(clientAddress, serverAddress, name string, sendInterval time.Duration, rtpSSRC uint32, l Log) (*Client, error) { - if name == "" { - name = defaultClientName - } - +func NewClient(clientAddress, serverAddress string, rtpClt *rtp.Client, l Log) (*Client, error) { c := &Client{ - name: name, - err: make(chan error), - quit: make(chan struct{}), - interval: sendInterval, - sourceSSRC: rtpSSRC, - log: l, + name: defaultClientName, + err: make(chan error), + quit: make(chan struct{}), + interval: defaultSendInterval, + rtpClt: rtpClt, + log: l, } var err error @@ -107,6 +105,17 @@ func NewClient(clientAddress, serverAddress, name string, sendInterval time.Dura return c, nil } +// SetSendInterval sets a custom receiver report send interval (default is 5 seconds.) +func (c *Client) SetSendInterval(d time.Duration) { + c.interval = d +} + +// SetName sets a custom client name for use in receiver report source description. +// Default is Client". +func (c *Client) SetName(name string) { + c.name = name +} + // Start starts the listen and send routines. This will start the process of // receiving and parsing sender reports, and the process of sending receiver // reports to the server. @@ -126,6 +135,13 @@ func (c *Client) Stop() { c.wg.Wait() } +// Done gives access to the clients quit chan, which is closed when the RTCP +// client is stopped. Therefore, Done may be used to check when the RTCP client +// has stopped running - ideal for use in a routine checking Client.Err(). +func (c *Client) Done() <-chan struct{} { + return c.quit +} + // Err provides read access to the Client err channel. This must be checked // otherwise the client will block if an error encountered. func (c *Client) Err() <-chan error { @@ -171,13 +187,13 @@ func (c *Client) send() { ReportCount: 1, Type: typeReceiverReport, }, - SenderSSRC: senderSSRC, + SenderSSRC: clientSSRC, Blocks: []ReportBlock{ ReportBlock{ - SourceIdentifier: c.sourceSSRC, + SourceIdentifier: c.rtpClt.SSRC(), FractionLost: 0, PacketsLost: math.MaxUint32, - HighestSequence: c.sequence(), + HighestSequence: uint32((c.rtpClt.Cycles() << 16) | c.rtpClt.Sequence()), Jitter: c.jitter(), SenderReportTs: c.lastSenderTs(), SenderReportDelay: c.delay(), @@ -195,7 +211,7 @@ func (c *Client) send() { }, Chunks: []Chunk{ Chunk{ - SSRC: senderSSRC, + SSRC: clientSSRC, Items: []SDESItem{ SDESItem{ Type: typeCName, @@ -238,22 +254,6 @@ func (c *Client) parse(buf []byte) { c.setSenderTs(t) } -// SetSequence will allow updating of the highest sequence number received -// through an RTP stream. -func (c *Client) SetSequence(s uint32) { - c.mu.Lock() - c.seq = s - c.mu.Unlock() -} - -// sequence will return the highest sequence number received through RTP. -func (c *Client) sequence() uint32 { - c.mu.Lock() - s := c.seq - c.mu.Unlock() - return s -} - // jitter returns the interarrival jitter as described by RTCP specifications: // https://tools.ietf.org/html/rfc3550 // TODO(saxon): complete this. diff --git a/protocol/rtcp/client_test.go b/protocol/rtcp/client_test.go index 64a4d685..111bcbbf 100644 --- a/protocol/rtcp/client_test.go +++ b/protocol/rtcp/client_test.go @@ -37,6 +37,7 @@ import ( "testing" "time" + "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/utils/logger" ) @@ -148,12 +149,15 @@ func (dl *dummyLogger) log(lvl int8, msg string, args ...interface{}) { // respond with sender reports. func TestReceiveAndSend(t *testing.T) { const clientAddr, serverAddr = "localhost:8000", "localhost:8001" + rtpClt, err := rtp.NewClient("localhost:8002") + if err != nil { + t.Fatalf("unexpected error when creating RTP client: %v", err) + } + c, err := NewClient( clientAddr, serverAddr, - "testClient", - 10*time.Millisecond, - 12345, + rtpClt, (*dummyLogger)(t).log, ) if err != nil { @@ -197,8 +201,6 @@ func TestReceiveAndSend(t *testing.T) { n, _, _ := conn.ReadFromUDP(buf) t.Logf("SERVER: receiver report received: \n%v\n", buf[:n]) - c.SetSequence(uint32(i)) - now := time.Now().Second() var time [8]byte binary.BigEndian.PutUint64(time[:], uint64(now)) diff --git a/protocol/rtcp/parse.go b/protocol/rtcp/parse.go index 2f756f4b..be680fa0 100644 --- a/protocol/rtcp/parse.go +++ b/protocol/rtcp/parse.go @@ -42,15 +42,9 @@ type Timestamp struct { // significant word, and the least significant word. If the given bytes do not // represent a valid receiver report, an error is returned. func ParseTimestamp(buf []byte) (Timestamp, error) { - if len(buf) < 4 { - return Timestamp{}, errors.New("bad RTCP packet, not of sufficient length") - } - if (buf[0]&0xc0)>>6 != rtcpVer { - return Timestamp{}, errors.New("incompatible RTCP version") - } - - if buf[1] != typeSenderReport { - return Timestamp{}, errors.New("RTCP packet is not of sender report type") + err := checkPacket(buf) + if err != nil { + return Timestamp{}, err } return Timestamp{ @@ -58,3 +52,25 @@ func ParseTimestamp(buf []byte) (Timestamp, error) { Fraction: binary.BigEndian.Uint32(buf[12:]), }, nil } + +func ParseSSRC(buf []byte) (uint32, error) { + err := checkPacket(buf) + if err != nil { + return 0, err + } + return binary.BigEndian.Uint32(buf[4:]), nil +} + +func checkPacket(buf []byte) error { + if len(buf) < 4 { + return errors.New("bad RTCP packet, not of sufficient length") + } + if (buf[0]&0xc0)>>6 != rtcpVer { + return errors.New("incompatible RTCP version") + } + + if buf[1] != typeSenderReport { + return errors.New("RTCP packet is not of sender report type") + } + return nil +} diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index 3ab856b9..53b4d98f 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -28,13 +28,19 @@ LICENSE package rtp import ( + "fmt" "net" + "sync" ) // Client describes an RTP client that can receive an RTP stream and implements // io.Reader. type Client struct { - r *PacketReader + r *PacketReader + ssrc uint32 + mu sync.Mutex + sequence uint16 + cycles uint16 } // NewClient returns a pointer to a new Client. @@ -54,14 +60,61 @@ func NewClient(addr string) (*Client, error) { return nil, err } + buf := make([]byte, 4096) + n, err := c.Read(buf) + if err != nil { + return nil, err + } + c.ssrc, err = SSRC(buf[:n]) + if err != nil { + return nil, fmt.Errorf("could not parse SSRC from RTP packet, failed with error: %v", err) + } + return c, nil } +// SSRC returns the identified for the source from which the RTP packets being +// received are coming from. +func (c *Client) SSRC() uint32 { + return c.ssrc +} + // Read implements io.Reader. func (c *Client) Read(p []byte) (int, error) { + n, err := c.r.Read(p) + if err != nil { + return n, err + } + s, _ := Sequence(p[:n]) + c.setSequence(s) return c.r.Read(p) } +// setSequence sets the most recently received sequence number, and updates the +// cycles count if the sequence number has rolled over. +func (c *Client) setSequence(s uint16) { + c.mu.Lock() + if s < c.sequence { + c.cycles++ + } + c.sequence = s + c.mu.Unlock() +} + +// Sequence returns the most recent RTP packet sequence number received. +func (c *Client) Sequence() uint16 { + c.mu.Lock() + defer c.mu.Unlock() + return c.sequence +} + +// Cycles returns the number of RTP sequence number cycles that have been received. +func (c *Client) Cycles() uint16 { + c.mu.Lock() + defer c.mu.Unlock() + return c.cycles +} + // PacketReader provides an io.Reader interface to an underlying UDP PacketConn. type PacketReader struct { net.PacketConn diff --git a/protocol/rtp/parse.go b/protocol/rtp/parse.go index fcc05907..16e64c5d 100644 --- a/protocol/rtp/parse.go +++ b/protocol/rtp/parse.go @@ -50,11 +50,9 @@ func Marker(d []byte) (bool, error) { // Payload returns the payload from an RTP packet provided the version is // compatible, otherwise an error is returned. func Payload(d []byte) ([]byte, error) { - if len(d) < defaultHeadSize { - panic("invalid RTP packet length") - } - if version(d) != rtpVer { - return nil, errors.New(badVer) + err := checkPacket(d) + if err != nil { + return nil, err } extLen := 0 if hasExt(d) { @@ -64,6 +62,38 @@ func Payload(d []byte) ([]byte, error) { return d[payloadIdx:], nil } +// SSRC returns the source identifier from an RTP packet. An error is return if +// the packet is not valid. +func SSRC(d []byte) (uint32, error) { + err := checkPacket(d) + if err != nil { + return 0, err + } + return binary.BigEndian.Uint32(d[8:]), nil +} + +// Sequence returns the sequence number of an RTP packet. An error is returned +// if the packet is not valid. +func Sequence(d []byte) (uint16, error) { + err := checkPacket(d) + if err != nil { + return 0, err + } + return binary.BigEndian.Uint16(d[2:]), nil +} + +// checkPacket checks the validity of the packet, firstly by checking size and +// then also checking that version is compatible with these utilities. +func checkPacket(d []byte) error { + if len(d) < defaultHeadSize { + return errors.New("invalid RTP packet length") + } + if version(d) != rtpVer { + return errors.New(badVer) + } + return nil +} + // hasExt returns true if an extension is present in the RTP packet. func hasExt(d []byte) bool { return (d[0] & 0x10 >> 4) == 1 diff --git a/protocol/rtsp/client.go b/protocol/rtsp/client.go index 26e539db..77689b64 100644 --- a/protocol/rtsp/client.go +++ b/protocol/rtsp/client.go @@ -59,6 +59,11 @@ func NewClient(addr string) (*Client, error) { return c, nil } +// Close closes the RTSP connection. +func (c *Client) Close() error { + return c.conn.Close() +} + // Describe forms and sends an RTSP request of method DESCRIBE to the RTSP server. func (c *Client) Describe() (*Response, error) { req, err := NewRequest("DESCRIBE", c.nextCSeq(), c.url, nil) diff --git a/revid/config.go b/revid/config.go index 784ab02c..441b7c6d 100644 --- a/revid/config.go +++ b/revid/config.go @@ -77,6 +77,7 @@ type Config struct { RTSPURL string RTPRecvAddr string RTCPAddr string + RTCPServerAddr string } // Possible modes for raspivid --exposure parameter. diff --git a/revid/revid.go b/revid/revid.go index 0d1b254e..d5eefaa4 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -44,6 +44,7 @@ import ( "bitbucket.org/ausocean/av/codec/lex" "bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/mts" + "bitbucket.org/ausocean/av/protocol/rtcp" "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/av/protocol/rtsp" "bitbucket.org/ausocean/iot/pi/netsender" @@ -614,7 +615,6 @@ func (r *Revid) setupInputForFile() (func() error, error) { func (r *Revid) startRTSPCamera() (func() error, error) { rtspClt, err := rtsp.NewClient(r.config.RTSPURL) - fmt.Printf("RTSPURL: %v\n", r.config.RTSPURL) if err != nil { return nil, err } @@ -654,16 +654,38 @@ func (r *Revid) startRTSPCamera() (func() error, error) { } r.config.Logger.Log(logger.Info, pkg+"RTSP server PLAY response", "response", resp.String()) - // TODO(saxon): use rtcp client to maintain rtp stream. - rtpClt, err := rtp.NewClient(r.config.RTPRecvAddr) if err != nil { return nil, err } + // TODO(saxon): use rtcp client to maintain rtp stream. + rtcpClt, err := rtcp.NewClient(r.config.RTCPAddr, r.config.RTCPServerAddr, rtpClt, r.config.Logger.Log) + if err != nil { + return nil, err + } + + go func() { + for { + select { + case <-rtcpClt.Done(): + return + case err := <-rtcpClt.Err(): + r.config.Logger.Log(logger.Warning, pkg+"RTCP error", "error", err.Error()) + default: + } + } + }() + + rtcpClt.Start() + r.wg.Add(1) go r.processFrom(rtpClt, time.Second/time.Duration(r.config.FrameRate)) - return func() error { return nil }, nil + return func() error { + rtspClt.Close() + rtcpClt.Stop() + return nil + }, nil } func (r *Revid) processFrom(read io.Reader, delay time.Duration) {