diff --git a/protocol/rtcp/client.go b/protocol/rtcp/client.go new file mode 100644 index 00000000..6717540f --- /dev/null +++ b/protocol/rtcp/client.go @@ -0,0 +1,207 @@ +package rtcp + +import ( + "encoding/binary" + "errors" + "fmt" + "math" + "net" + "sync" + "time" +) + +const ( + senderSSRC = 3605043418 + defaultClientName = "client" + delayUnit = 1.0 / 65536.0 +) + +// client is an rtcp client that will hadle receiving SenderReports from a server +// and sending out ReceiverReports. +type client struct { + ErrChan chan error + cAddr *net.UDPAddr + sAddr *net.UDPAddr + name string + sourceSSRC uint32 + mu sync.Mutex + sequence uint32 + senderTs [64]byte + interval time.Duration + receiveTime time.Time +} + +// NewClient returns a pointer to a new client. +func NewClient(clientAddress, serverAddress, name string, sendInterval time.Duration, rtpSSRC uint32) (*client, error) { + if name == "" { + name = defaultClientName + } + + c := &client{ + name: name, + ErrChan: make(chan error), + interval: sendInterval, + sourceSSRC: rtpSSRC, + } + + var err error + c.cAddr, err = net.ResolveUDPAddr("udp", clientAddress) + if err != nil { + return nil, errors.New(fmt.Sprintf("can't resolve client address, failed with error: %v", err)) + } + + c.sAddr, err = net.ResolveUDPAddr("udp", serverAddress) + if err != nil { + return nil, errors.New(fmt.Sprintf("can't resolve server address, failed with error: %v", err)) + } + + return c, nil +} + +// Start starts the listen and send routines. This will start the process of +// receiving and parsing SenderReports, and the process of sending receiver +// reports to the server. +func (c *client) Start() { + go c.listen() + go c.send() +} + +// Listen reads from the UDP connection and parses SenderReports. +func (c *client) listen() { + conn, err := net.ListenUDP("udp", c.cAddr) + if err != nil { + c.ErrChan <- err + } + buf := make([]byte, 4096) + for { + n, _, _ := conn.ReadFromUDP(buf) + c.parse(buf[:n]) + } +} + +// send write sender reports to the server. +func (c *client) send() { + conn, err := net.DialUDP("udp", c.cAddr, c.sAddr) + if err != nil { + c.ErrChan <- err + } + for { + time.Sleep(c.interval) + report := ReceiverReport{ + Header: Header{ + Version: 2, + Padding: false, + ReportCount: 1, + Type: typeReceiverReport, + }, + SenderSSRC: senderSSRC, + Blocks: []ReportBlock{ + ReportBlock{ + SSRC: c.sourceSSRC, + FractionLost: 0, + PacketsLost: math.MaxUint32, + HighestSequence: c.highestSequence(), + Jitter: c.jitter(), + LSR: c.lastSenderTs(), + DLSR: c.delay(), + }, + }, + Extensions: nil, + } + + description := SourceDescription{ + Header: Header{ + Version: 2, + Padding: false, + ReportCount: 1, + Type: typeSourceDescription, + }, + Chunks: []Chunk{ + Chunk{ + SSRC: senderSSRC, + Items: []SDESItem{ + SDESItem{ + Type: typeCName, + Text: []byte(c.name), + }, + }, + }, + }, + } + + reportBytes := report.Bytes() + reportLen := len(reportBytes) + descriptionBytes := description.Bytes() + totalLength := reportLen + len(descriptionBytes) + bytes := make([]byte, totalLength) + copy(bytes, reportBytes) + copy(bytes[reportLen:], descriptionBytes) + + _, err := conn.Write(bytes) + if err != nil { + c.ErrChan <- err + } + } +} + +// parse will read important statistics from sender reports. +func (c *client) parse(buf []byte) { + c.received() + msw, err := TimestampMSW(buf) + if err != nil { + c.ErrChan <- errors.New(fmt.Sprintf("could not get timestamp msw from sender report, failed with error: %v", err)) + } + lsw, err := TimestampLSW(buf) + if err != nil { + c.ErrChan <- errors.New(fmt.Sprintf("could not get timestamp lsw from sender report, failed with error: %v", err)) + } + c.setSenderTs(msw, lsw) +} + +func (c *client) UpdateSequence(s uint32) { + c.mu.Lock() + c.sequence = s + c.mu.Unlock() +} + +func (c *client) highestSequence() uint32 { + var s uint32 + c.mu.Lock() + s = c.sequence + c.mu.Unlock() + return s +} + +func (c *client) jitter() uint32 { + return 0 +} + +func (c *client) setSenderTs(msw, lsw uint32) { + c.mu.Lock() + binary.BigEndian.PutUint32(c.senderTs[:], msw) + binary.BigEndian.PutUint32(c.senderTs[4:], lsw) + c.mu.Unlock() +} + +func (c *client) lastSenderTs() uint32 { + var ts uint32 + c.mu.Lock() + ts = binary.BigEndian.Uint32(c.senderTs[2:]) + c.mu.Unlock() + return ts +} + +func (c *client) delay() uint32 { + var receiveTime time.Time + c.mu.Lock() + receiveTime = c.receiveTime + c.mu.Unlock() + now := time.Now() + return uint32(now.Sub(receiveTime).Seconds() / delayUnit) +} + +func (c *client) received() { + c.mu.Lock() + c.receiveTime = time.Now() + c.mu.Unlock() +}