diff --git a/protocol/rtcp/client.go b/protocol/rtcp/client.go index 796e4489..715b1089 100644 --- a/protocol/rtcp/client.go +++ b/protocol/rtcp/client.go @@ -1,9 +1,9 @@ /* NAME - client.go + Client.go DESCRIPTION - client.go provides an implemntation of a basic RTCP client that will send + Client.go provides an implemntation of a basic RTCP Client that will send receiver reports, and receive sender reports to parse relevant statistics. AUTHORS @@ -41,44 +41,45 @@ import ( ) const ( - senderSSRC = 3605043418 - defaultClientName = "client" + senderSSRC = 1 // Any non-zero value will do. + defaultClientName = "Client" delayUnit = 1.0 / 65536.0 pkg = "rtcp: " rtcpVer = 2 + receiverBufSize = 200 ) type log func(lvl int8, msg string, args ...interface{}) -// client is an RTCP client that will hadle receiving SenderReports from a server +// Client is an RTCP Client that will handle receiving SenderReports from a server // and sending out ReceiverReports. -type client struct { - ErrChan chan error +type Client struct { + ErrChan chan error // Client will send any errors through this chan. - cAddr *net.UDPAddr - sAddr *net.UDPAddr - name string - sourceSSRC uint32 - mu sync.Mutex - sequence uint32 - senderTs [64]byte - interval time.Duration - receiveTime time.Time - buf [200]byte - conn *net.UDPConn - wg sync.WaitGroup - quitSend chan struct{} - quitRecv chan struct{} - log + cAddr *net.UDPAddr // Address of client. + sAddr *net.UDPAddr // Address of RTSP server. + name string // Name of the client for source description purposes. + sourceSSRC uint32 // Source identifier of this client. + mu sync.Mutex // Will be used to change parameters during operation safely. + sequence uint32 // Last RTP sequence number. + senderTs [8]byte // The timestamp of the last sender report. + interval time.Duration // Interval between sender report and receiver report. + receiveTime time.Time // Time last sender report was received. + buf [receiverBufSize]byte // Buf used to store the receiver report and source descriptions. + conn *net.UDPConn // The UDP connection used for receiving and sending RTSP packets. + wg sync.WaitGroup // This is used to wait for send and recv routines to stop when Client is stopped. + quitSend chan struct{} // Channel used to communicate quit signal to send routine. + quitRecv chan struct{} // Channel used to communicate quit signal to recv routine. + log // Used to log any messages. } -// NewClient returns a pointer to a new client. -func NewClient(clientAddress, serverAddress, name string, sendInterval time.Duration, rtpSSRC uint32, l log) (*client, error) { +// NewClient returns a pointer to a new Client. +func NewClient(clientAddress, serverAddress, name string, sendInterval time.Duration, rtpSSRC uint32, l log) (*Client, error) { if name == "" { name = defaultClientName } - c := &client{ + c := &Client{ name: name, ErrChan: make(chan error, 2), quitSend: make(chan struct{}), @@ -91,7 +92,7 @@ func NewClient(clientAddress, serverAddress, name string, sendInterval time.Dura var err error c.cAddr, err = net.ResolveUDPAddr("udp", clientAddress) if err != nil { - return nil, errors.New(fmt.Sprintf("can't resolve client address, failed with error: %v\n", err)) + return nil, errors.New(fmt.Sprintf("can't resolve Client address, failed with error: %v\n", err)) } c.sAddr, err = net.ResolveUDPAddr("udp", serverAddress) @@ -109,18 +110,17 @@ func NewClient(clientAddress, serverAddress, name string, sendInterval time.Dura // Start starts the listen and send routines. This will start the process of // receiving and parsing sender reports, and the process of sending receiver // reports to the server. -func (c *client) Start() { - c.log(logger.Debug, pkg+"client is starting") - c.wg.Add(1) +func (c *Client) Start() { + c.log(logger.Debug, pkg+"Client is starting") + c.wg.Add(2) go c.recv() - c.wg.Add(1) go c.send() } // Stop sends a quit signal to the send and receive routines and closes the // UDP connection. It will wait until both routines have returned. -func (c *client) Stop() { - c.log(logger.Debug, pkg+"client is stopping") +func (c *Client) Stop() { + c.log(logger.Debug, pkg+"Client is stopping") close(c.quitSend) close(c.quitRecv) c.conn.Close() @@ -128,9 +128,9 @@ func (c *client) Stop() { } // recv reads from the UDP connection and parses SenderReports. -func (c *client) recv() { +func (c *Client) recv() { defer c.wg.Done() - c.log(logger.Debug, pkg+"client is receiving") + c.log(logger.Debug, pkg+"Client is receiving") buf := make([]byte, 4096) for { select { @@ -149,9 +149,9 @@ func (c *client) recv() { } // send writes receiver reports to the server. -func (c *client) send() { +func (c *Client) send() { defer c.wg.Done() - c.log(logger.Debug, pkg+"client is sending") + c.log(logger.Debug, pkg+"Client is sending") for { select { case <-c.quitSend: @@ -211,38 +211,45 @@ func (c *client) send() { } // formPayload takes a pointer to a ReceiverReport and a pointer to a -// Source Description and calls Bytes on both, writing to the underlying client +// Source Description and calls Bytes on both, writing to the underlying Client // buf. A slice to the combined writtem memory is returned. -func (c *client) formPayload(r *ReceiverReport, d *SourceDescription) []byte { +func (c *Client) formPayload(r *ReceiverReport, d *SourceDescription) []byte { rl := len(r.Bytes(c.buf[:])) dl := len(d.Bytes(c.buf[rl:])) t := rl + dl if t > cap(c.buf) { - panic("client buf not big enough") + panic("Client buf not big enough") } return c.buf[:t] } // parse will read important statistics from sender reports. -func (c *client) parse(buf []byte) { - c.received() +func (c *Client) parse(buf []byte) { + c.markReceivedTime() msw, lsw, err := Timestamp(buf) if err != nil { c.ErrChan <- errors.New(fmt.Sprintf("could not get timestamp from sender report, failed with error: %v", err)) } - c.setSenderTs(msw, lsw) + c.setSenderTs( + struct { + msw uint32 + lsw uint32 + }{ + msw, + lsw, + }) } -// UpdateSequence will allow updating of the highest sequence number received +// SetSequence will allow updating of the highest sequence number received // through an RTP stream. -func (c *client) UpdateSequence(s uint32) { +func (c *Client) SetSequence(s uint32) { c.mu.Lock() c.sequence = s c.mu.Unlock() } // highestSequence will return the highest sequence number received through RTP. -func (c *client) highestSequence() uint32 { +func (c *Client) highestSequence() uint32 { c.mu.Lock() s := c.sequence c.mu.Unlock() @@ -251,20 +258,20 @@ func (c *client) highestSequence() uint32 { // jitter returns the interarrival jitter as described by RTCP specifications: // https://tools.ietf.org/html/rfc3550 -func (c *client) jitter() uint32 { +func (c *Client) jitter() uint32 { return 0 } // setSenderTs allows us to safely set the current sender report timestamp. -func (c *client) setSenderTs(msw, lsw uint32) { +func (c *Client) setSenderTs(t struct{ msw, lsw uint32 }) { c.mu.Lock() - binary.BigEndian.PutUint32(c.senderTs[:], msw) - binary.BigEndian.PutUint32(c.senderTs[4:], lsw) + binary.BigEndian.PutUint32(c.senderTs[:], t.msw) + binary.BigEndian.PutUint32(c.senderTs[4:], t.lsw) c.mu.Unlock() } // lastSenderTs returns the timestamp of the most recent sender report. -func (c *client) lastSenderTs() uint32 { +func (c *Client) lastSenderTs() uint32 { c.mu.Lock() t := binary.BigEndian.Uint32(c.senderTs[2:]) c.mu.Unlock() @@ -273,7 +280,7 @@ func (c *client) lastSenderTs() uint32 { // delay returns the duration between the receive time of the last sender report // and now. This is called when forming a receiver report. -func (c *client) delay() uint32 { +func (c *Client) delay() uint32 { c.mu.Lock() t := c.receiveTime c.mu.Unlock() @@ -281,7 +288,7 @@ func (c *client) delay() uint32 { } // received is called when a sender report is received to mark the receive time. -func (c *client) received() { +func (c *Client) markReceivedTime() { c.mu.Lock() c.receiveTime = time.Now() c.mu.Unlock() diff --git a/protocol/rtcp/client_test.go b/protocol/rtcp/client_test.go index a05822fe..62491bf9 100644 --- a/protocol/rtcp/client_test.go +++ b/protocol/rtcp/client_test.go @@ -101,7 +101,7 @@ func TestFormPayload(t *testing.T) { }, } - c := &client{} + c := &Client{} p := c.formPayload(&report, &description) if !bytes.Equal(p, expect) { @@ -197,7 +197,7 @@ func TestReceiveAndSend(t *testing.T) { n, _, _ := conn.ReadFromUDP(buf) t.Logf("SERVER: receiver report received: \n%v\n", buf[:n]) - c.UpdateSequence(uint32(i)) + c.SetSequence(uint32(i)) now := time.Now().Second() var time [8]byte diff --git a/protocol/rtcp/rtcp.go b/protocol/rtcp/rtcp.go index 1fd3cf3f..109e129e 100644 --- a/protocol/rtcp/rtcp.go +++ b/protocol/rtcp/rtcp.go @@ -138,7 +138,7 @@ func (d *SourceDescription) Bytes(buf []byte) []byte { // bodyLen calculates the body length of a source description packet in bytes. func (d *SourceDescription) bodyLen() int { - l := 0 + var l int for _, c := range d.Chunks { l += c.len() }