mirror of https://bitbucket.org/ausocean/av.git
293 lines
8.2 KiB
Go
293 lines
8.2 KiB
Go
/*
|
|
NAME
|
|
client.go
|
|
|
|
DESCRIPTION
|
|
Client.go provides an implemntation of a basic RTCP Client that will send
|
|
receiver reports, and receive sender reports to parse relevant statistics.
|
|
|
|
AUTHORS
|
|
Saxon A. Nelson-Milton <saxon@ausocean.org>
|
|
|
|
LICENSE
|
|
This is Copyright (C) 2019 the Australian Ocean Lab (AusOcean)
|
|
|
|
It is free software: you can redistribute it and/or modify them
|
|
under the terms of the GNU General Public License as published by the
|
|
Free Software Foundation, either version 3 of the License, or (at your
|
|
option) any later version.
|
|
|
|
It is distributed in the hope that it will be useful, but WITHOUT
|
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
|
for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
in gpl.txt. If not, see http://www.gnu.org/licenses.
|
|
*/
|
|
|
|
package rtcp
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
|
|
"bitbucket.org/ausocean/utils/logger"
|
|
)
|
|
|
|
const (
|
|
senderSSRC = 1 // Any non-zero value will do.
|
|
defaultClientName = "Client"
|
|
delayUnit = 1.0 / 65536.0
|
|
pkg = "rtcp: "
|
|
rtcpVer = 2
|
|
receiverBufSize = 200
|
|
)
|
|
|
|
type Log func(lvl int8, msg string, args ...interface{})
|
|
|
|
// Client is an RTCP Client that will handle receiving SenderReports from a server
|
|
// and sending out ReceiverReports.
|
|
type Client struct {
|
|
cAddr *net.UDPAddr // Address of client.
|
|
sAddr *net.UDPAddr // Address of RTSP server.
|
|
name string // Name of the client for source description purposes.
|
|
sourceSSRC uint32 // Source identifier of this client.
|
|
mu sync.Mutex // Will be used to change parameters during operation safely.
|
|
seq uint32 // Last RTP sequence number.
|
|
senderTs [8]byte // The timestamp of the last sender report.
|
|
interval time.Duration // Interval between sender report and receiver report.
|
|
receiveTime time.Time // Time last sender report was received.
|
|
buf [receiverBufSize]byte // Buf used to store the receiver report and source descriptions.
|
|
conn *net.UDPConn // The UDP connection used for receiving and sending RTSP packets.
|
|
wg sync.WaitGroup // This is used to wait for send and recv routines to stop when Client is stopped.
|
|
quit chan struct{} // Channel used to communicate quit signal to send and recv routines.
|
|
log Log // Used to log any messages.
|
|
|
|
err chan error // Client will send any errors through this chan. Can be accessed by Err().
|
|
}
|
|
|
|
// NewClient returns a pointer to a new Client.
|
|
func NewClient(clientAddress, serverAddress, name string, sendInterval time.Duration, rtpSSRC uint32, l Log) (*Client, error) {
|
|
if name == "" {
|
|
name = defaultClientName
|
|
}
|
|
|
|
c := &Client{
|
|
name: name,
|
|
err: make(chan error),
|
|
quit: make(chan struct{}),
|
|
interval: sendInterval,
|
|
sourceSSRC: rtpSSRC,
|
|
log: l,
|
|
}
|
|
|
|
var err error
|
|
c.cAddr, err = net.ResolveUDPAddr("udp", clientAddress)
|
|
if err != nil {
|
|
return nil, errors.New(fmt.Sprintf("can't resolve Client address, failed with error: %v\n", err))
|
|
}
|
|
|
|
c.sAddr, err = net.ResolveUDPAddr("udp", serverAddress)
|
|
if err != nil {
|
|
return nil, errors.New(fmt.Sprintf("can't resolve server address, failed with error: %v\n", err))
|
|
}
|
|
|
|
c.conn, err = net.DialUDP("udp", c.cAddr, c.sAddr)
|
|
if err != nil {
|
|
return nil, errors.New(fmt.Sprintf("can't dial, failed with error: %v\n", err))
|
|
}
|
|
return c, nil
|
|
}
|
|
|
|
// Start starts the listen and send routines. This will start the process of
|
|
// receiving and parsing sender reports, and the process of sending receiver
|
|
// reports to the server.
|
|
func (c *Client) Start() {
|
|
c.log(logger.Debug, pkg+"Client is starting")
|
|
c.wg.Add(2)
|
|
go c.recv()
|
|
go c.send()
|
|
}
|
|
|
|
// Stop sends a quit signal to the send and receive routines and closes the
|
|
// UDP connection. It will wait until both routines have returned.
|
|
func (c *Client) Stop() {
|
|
c.log(logger.Debug, pkg+"Client is stopping")
|
|
close(c.quit)
|
|
c.conn.Close()
|
|
c.wg.Wait()
|
|
}
|
|
|
|
// Err provides read access to the Client err channel. This must be checked
|
|
// otherwise the client will block if an error encountered.
|
|
func (c *Client) Err() <-chan error {
|
|
return c.err
|
|
}
|
|
|
|
// recv reads from the UDP connection and parses SenderReports.
|
|
func (c *Client) recv() {
|
|
defer c.wg.Done()
|
|
c.log(logger.Debug, pkg+"Client is receiving")
|
|
buf := make([]byte, 4096)
|
|
for {
|
|
select {
|
|
case <-c.quit:
|
|
return
|
|
default:
|
|
n, _, err := c.conn.ReadFromUDP(buf)
|
|
if err != nil {
|
|
c.err <- err
|
|
continue
|
|
}
|
|
c.log(logger.Debug, pkg+"sender report received", "report", buf[:n])
|
|
c.parse(buf[:n])
|
|
}
|
|
}
|
|
}
|
|
|
|
// send writes receiver reports to the server.
|
|
func (c *Client) send() {
|
|
defer c.wg.Done()
|
|
c.log(logger.Debug, pkg+"Client is sending")
|
|
for {
|
|
select {
|
|
case <-c.quit:
|
|
return
|
|
default:
|
|
time.Sleep(c.interval)
|
|
|
|
report := ReceiverReport{
|
|
Header: Header{
|
|
Version: rtcpVer,
|
|
Padding: false,
|
|
ReportCount: 1,
|
|
Type: typeReceiverReport,
|
|
},
|
|
SenderSSRC: senderSSRC,
|
|
Blocks: []ReportBlock{
|
|
ReportBlock{
|
|
SourceIdentifier: c.sourceSSRC,
|
|
FractionLost: 0,
|
|
PacketsLost: math.MaxUint32,
|
|
HighestSequence: c.sequence(),
|
|
Jitter: c.jitter(),
|
|
SenderReportTs: c.lastSenderTs(),
|
|
SenderReportDelay: c.delay(),
|
|
},
|
|
},
|
|
Extensions: nil,
|
|
}
|
|
|
|
description := Description{
|
|
Header: Header{
|
|
Version: rtcpVer,
|
|
Padding: false,
|
|
ReportCount: 1,
|
|
Type: typeDescription,
|
|
},
|
|
Chunks: []Chunk{
|
|
Chunk{
|
|
SSRC: senderSSRC,
|
|
Items: []SDESItem{
|
|
SDESItem{
|
|
Type: typeCName,
|
|
Text: []byte(c.name),
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
c.log(logger.Debug, pkg+"sending receiver report")
|
|
_, err := c.conn.Write(c.formPayload(&report, &description))
|
|
if err != nil {
|
|
c.err <- err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// formPayload takes a pointer to a ReceiverReport and a pointer to a
|
|
// Source Description and calls Bytes on both, writing to the underlying Client
|
|
// buf. A slice to the combined writtem memory is returned.
|
|
func (c *Client) formPayload(r *ReceiverReport, d *Description) []byte {
|
|
rl := len(r.Bytes(c.buf[:]))
|
|
dl := len(d.Bytes(c.buf[rl:]))
|
|
t := rl + dl
|
|
if t > cap(c.buf) {
|
|
panic("Client buf not big enough")
|
|
}
|
|
return c.buf[:t]
|
|
}
|
|
|
|
// parse will read important statistics from sender reports.
|
|
func (c *Client) parse(buf []byte) {
|
|
c.markReceivedTime()
|
|
t, err := ParseTimestamp(buf)
|
|
if err != nil {
|
|
c.err <- fmt.Errorf("could not get timestamp from sender report, failed with error: %v", err)
|
|
}
|
|
c.setSenderTs(t)
|
|
}
|
|
|
|
// SetSequence will allow updating of the highest sequence number received
|
|
// through an RTP stream.
|
|
func (c *Client) SetSequence(s uint32) {
|
|
c.mu.Lock()
|
|
c.seq = s
|
|
c.mu.Unlock()
|
|
}
|
|
|
|
// sequence will return the highest sequence number received through RTP.
|
|
func (c *Client) sequence() uint32 {
|
|
c.mu.Lock()
|
|
s := c.seq
|
|
c.mu.Unlock()
|
|
return s
|
|
}
|
|
|
|
// jitter returns the interarrival jitter as described by RTCP specifications:
|
|
// https://tools.ietf.org/html/rfc3550
|
|
// TODO(saxon): complete this.
|
|
func (c *Client) jitter() uint32 {
|
|
return 0
|
|
}
|
|
|
|
// setSenderTs allows us to safely set the current sender report timestamp.
|
|
func (c *Client) setSenderTs(t Timestamp) {
|
|
c.mu.Lock()
|
|
binary.BigEndian.PutUint32(c.senderTs[:], t.Seconds)
|
|
binary.BigEndian.PutUint32(c.senderTs[4:], t.Fraction)
|
|
c.mu.Unlock()
|
|
}
|
|
|
|
// lastSenderTs returns the timestamp of the most recent sender report.
|
|
func (c *Client) lastSenderTs() uint32 {
|
|
c.mu.Lock()
|
|
t := binary.BigEndian.Uint32(c.senderTs[2:])
|
|
c.mu.Unlock()
|
|
return t
|
|
}
|
|
|
|
// delay returns the duration between the receive time of the last sender report
|
|
// and now. This is called when forming a receiver report.
|
|
func (c *Client) delay() uint32 {
|
|
c.mu.Lock()
|
|
t := c.receiveTime
|
|
c.mu.Unlock()
|
|
return uint32(time.Now().Sub(t).Seconds() / delayUnit)
|
|
}
|
|
|
|
// markReceivedTime is called when a sender report is received to mark the receive time.
|
|
func (c *Client) markReceivedTime() {
|
|
c.mu.Lock()
|
|
c.receiveTime = time.Now()
|
|
c.mu.Unlock()
|
|
}
|