av/protocol/rtcp/client.go

214 lines
4.9 KiB
Go

package rtcp
import (
"encoding/binary"
"errors"
"fmt"
"math"
"net"
"sync"
"time"
)
const (
senderSSRC = 3605043418
defaultClientName = "client"
delayUnit = 1.0 / 65536.0
)
// client is an rtcp client that will hadle receiving SenderReports from a server
// and sending out ReceiverReports.
type client struct {
ErrChan chan error
cAddr *net.UDPAddr
sAddr *net.UDPAddr
name string
sourceSSRC uint32
mu sync.Mutex
sequence uint32
senderTs [64]byte
interval time.Duration
receiveTime time.Time
}
// NewClient returns a pointer to a new client.
func NewClient(clientAddress, serverAddress, name string, sendInterval time.Duration, rtpSSRC uint32) (*client, error) {
if name == "" {
name = defaultClientName
}
c := &client{
name: name,
ErrChan: make(chan error),
interval: sendInterval,
sourceSSRC: rtpSSRC,
}
var err error
c.cAddr, err = net.ResolveUDPAddr("udp", clientAddress)
if err != nil {
return nil, errors.New(fmt.Sprintf("can't resolve client address, failed with error: %v", err))
}
c.sAddr, err = net.ResolveUDPAddr("udp", serverAddress)
if err != nil {
return nil, errors.New(fmt.Sprintf("can't resolve server address, failed with error: %v", err))
}
return c, nil
}
// Start starts the listen and send routines. This will start the process of
// receiving and parsing sender reports, and the process of sending receiver
// reports to the server.
func (c *client) Start() {
go c.listen()
go c.send()
}
// listen reads from the UDP connection and parses SenderReports.
func (c *client) listen() {
conn, err := net.ListenUDP("udp", c.cAddr)
if err != nil {
c.ErrChan <- err
}
buf := make([]byte, 4096)
for {
n, _, _ := conn.ReadFromUDP(buf)
c.parse(buf[:n])
}
}
// send writes receiver reports to the server.
func (c *client) send() {
conn, err := net.DialUDP("udp", c.cAddr, c.sAddr)
if err != nil {
c.ErrChan <- err
}
for {
time.Sleep(c.interval)
report := ReceiverReport{
Header: Header{
Version: 2,
Padding: false,
ReportCount: 1,
Type: typeReceiverReport,
},
SenderSSRC: senderSSRC,
Blocks: []ReportBlock{
ReportBlock{
SSRC: c.sourceSSRC,
FractionLost: 0,
PacketsLost: math.MaxUint32,
HighestSequence: c.highestSequence(),
Jitter: c.jitter(),
LSR: c.lastSenderTs(),
DLSR: c.delay(),
},
},
Extensions: nil,
}
description := SourceDescription{
Header: Header{
Version: 2,
Padding: false,
ReportCount: 1,
Type: typeSourceDescription,
},
Chunks: []Chunk{
Chunk{
SSRC: senderSSRC,
Items: []SDESItem{
SDESItem{
Type: typeCName,
Text: []byte(c.name),
},
},
},
},
}
reportBytes := report.Bytes()
reportLen := len(reportBytes)
descriptionBytes := description.Bytes()
totalLength := reportLen + len(descriptionBytes)
bytes := make([]byte, totalLength)
copy(bytes, reportBytes)
copy(bytes[reportLen:], descriptionBytes)
_, err := conn.Write(bytes)
if err != nil {
c.ErrChan <- err
}
}
}
// parse will read important statistics from sender reports.
func (c *client) parse(buf []byte) {
c.received()
msw, lsw, err := Timestamp(buf)
if err != nil {
c.ErrChan <- errors.New(fmt.Sprintf("could not get timestamp from sender report, failed with error: %v", err))
}
c.setSenderTs(msw, lsw)
}
// UpdateSequence will allow updating of the highest sequence number received
// through an rtp stream.
func (c *client) UpdateSequence(s uint32) {
c.mu.Lock()
c.sequence = s
c.mu.Unlock()
}
// highestSequence will return the highest sequence number received through rtp.
func (c *client) highestSequence() uint32 {
var s uint32
c.mu.Lock()
s = c.sequence
c.mu.Unlock()
return s
}
// jitter returns the interarrival jitter as described by RTCP specifications:
// https://tools.ietf.org/html/rfc3550
func (c *client) jitter() uint32 {
return 0
}
// setSenderTs allows us to safely set the current sender report timestamp.
func (c *client) setSenderTs(msw, lsw uint32) {
c.mu.Lock()
binary.BigEndian.PutUint32(c.senderTs[:], msw)
binary.BigEndian.PutUint32(c.senderTs[4:], lsw)
c.mu.Unlock()
}
// lastSenderTs returns the timestamp of the most recent sender report.
func (c *client) lastSenderTs() uint32 {
var ts uint32
c.mu.Lock()
ts = binary.BigEndian.Uint32(c.senderTs[2:])
c.mu.Unlock()
return ts
}
// delay returns the duration between the receive time of the last sender report
// and now. This is called when forming a receiver report.
func (c *client) delay() uint32 {
var receiveTime time.Time
c.mu.Lock()
receiveTime = c.receiveTime
c.mu.Unlock()
now := time.Now()
return uint32(now.Sub(receiveTime).Seconds() / delayUnit)
}
// received is called when a sender report is received to mark the receive time.
func (c *client) received() {
c.mu.Lock()
c.receiveTime = time.Now()
c.mu.Unlock()
}