/* NAME Client.go DESCRIPTION Client.go provides an implemntation of a basic RTCP Client that will send receiver reports, and receive sender reports to parse relevant statistics. AUTHORS Saxon A. Nelson-Milton LICENSE This is Copyright (C) 2019 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License in gpl.txt. If not, see http://www.gnu.org/licenses. */ package rtcp import ( "encoding/binary" "errors" "fmt" "math" "net" "sync" "time" "bitbucket.org/ausocean/utils/logger" ) const ( senderSSRC = 1 // Any non-zero value will do. defaultClientName = "Client" delayUnit = 1.0 / 65536.0 pkg = "rtcp: " rtcpVer = 2 receiverBufSize = 200 ) type log func(lvl int8, msg string, args ...interface{}) // Client is an RTCP Client that will handle receiving SenderReports from a server // and sending out ReceiverReports. type Client struct { ErrChan chan error // Client will send any errors through this chan. cAddr *net.UDPAddr // Address of client. sAddr *net.UDPAddr // Address of RTSP server. name string // Name of the client for source description purposes. sourceSSRC uint32 // Source identifier of this client. mu sync.Mutex // Will be used to change parameters during operation safely. sequence uint32 // Last RTP sequence number. senderTs [8]byte // The timestamp of the last sender report. interval time.Duration // Interval between sender report and receiver report. receiveTime time.Time // Time last sender report was received. buf [receiverBufSize]byte // Buf used to store the receiver report and source descriptions. conn *net.UDPConn // The UDP connection used for receiving and sending RTSP packets. wg sync.WaitGroup // This is used to wait for send and recv routines to stop when Client is stopped. quitSend chan struct{} // Channel used to communicate quit signal to send routine. quitRecv chan struct{} // Channel used to communicate quit signal to recv routine. log // Used to log any messages. } // NewClient returns a pointer to a new Client. func NewClient(clientAddress, serverAddress, name string, sendInterval time.Duration, rtpSSRC uint32, l log) (*Client, error) { if name == "" { name = defaultClientName } c := &Client{ name: name, ErrChan: make(chan error, 2), quitSend: make(chan struct{}), quitRecv: make(chan struct{}), interval: sendInterval, sourceSSRC: rtpSSRC, log: l, } var err error c.cAddr, err = net.ResolveUDPAddr("udp", clientAddress) if err != nil { return nil, errors.New(fmt.Sprintf("can't resolve Client address, failed with error: %v\n", err)) } c.sAddr, err = net.ResolveUDPAddr("udp", serverAddress) if err != nil { return nil, errors.New(fmt.Sprintf("can't resolve server address, failed with error: %v\n", err)) } c.conn, err = net.DialUDP("udp", c.cAddr, c.sAddr) if err != nil { return nil, errors.New(fmt.Sprintf("can't dial, failed with error: %v\n", err)) } return c, nil } // Start starts the listen and send routines. This will start the process of // receiving and parsing sender reports, and the process of sending receiver // reports to the server. func (c *Client) Start() { c.log(logger.Debug, pkg+"Client is starting") c.wg.Add(2) go c.recv() go c.send() } // Stop sends a quit signal to the send and receive routines and closes the // UDP connection. It will wait until both routines have returned. func (c *Client) Stop() { c.log(logger.Debug, pkg+"Client is stopping") close(c.quitSend) close(c.quitRecv) c.conn.Close() c.wg.Wait() } // recv reads from the UDP connection and parses SenderReports. func (c *Client) recv() { defer c.wg.Done() c.log(logger.Debug, pkg+"Client is receiving") buf := make([]byte, 4096) for { select { case <-c.quitRecv: return default: n, _, err := c.conn.ReadFromUDP(buf) if err != nil { c.ErrChan <- err continue } c.log(logger.Debug, pkg+"sender report received", "report", buf[:n]) c.parse(buf[:n]) } } } // send writes receiver reports to the server. func (c *Client) send() { defer c.wg.Done() c.log(logger.Debug, pkg+"Client is sending") for { select { case <-c.quitSend: return default: time.Sleep(c.interval) report := ReceiverReport{ Header: Header{ Version: rtcpVer, Padding: false, ReportCount: 1, Type: typeReceiverReport, }, SenderSSRC: senderSSRC, Blocks: []ReportBlock{ ReportBlock{ SSRC: c.sourceSSRC, FractionLost: 0, PacketsLost: math.MaxUint32, HighestSequence: c.highestSequence(), Jitter: c.jitter(), LSR: c.lastSenderTs(), DLSR: c.delay(), }, }, Extensions: nil, } description := SourceDescription{ Header: Header{ Version: rtcpVer, Padding: false, ReportCount: 1, Type: typeSourceDescription, }, Chunks: []Chunk{ Chunk{ SSRC: senderSSRC, Items: []SDESItem{ SDESItem{ Type: typeCName, Text: []byte(c.name), }, }, }, }, } c.log(logger.Debug, pkg+"sending receiver report") _, err := c.conn.Write(c.formPayload(&report, &description)) if err != nil { c.ErrChan <- err } } } } // formPayload takes a pointer to a ReceiverReport and a pointer to a // Source Description and calls Bytes on both, writing to the underlying Client // buf. A slice to the combined writtem memory is returned. func (c *Client) formPayload(r *ReceiverReport, d *SourceDescription) []byte { rl := len(r.Bytes(c.buf[:])) dl := len(d.Bytes(c.buf[rl:])) t := rl + dl if t > cap(c.buf) { panic("Client buf not big enough") } return c.buf[:t] } // parse will read important statistics from sender reports. func (c *Client) parse(buf []byte) { c.markReceivedTime() msw, lsw, err := Timestamp(buf) if err != nil { c.ErrChan <- errors.New(fmt.Sprintf("could not get timestamp from sender report, failed with error: %v", err)) } c.setSenderTs( struct { msw uint32 lsw uint32 }{ msw, lsw, }) } // SetSequence will allow updating of the highest sequence number received // through an RTP stream. func (c *Client) SetSequence(s uint32) { c.mu.Lock() c.sequence = s c.mu.Unlock() } // highestSequence will return the highest sequence number received through RTP. func (c *Client) highestSequence() uint32 { c.mu.Lock() s := c.sequence c.mu.Unlock() return s } // jitter returns the interarrival jitter as described by RTCP specifications: // https://tools.ietf.org/html/rfc3550 func (c *Client) jitter() uint32 { return 0 } // setSenderTs allows us to safely set the current sender report timestamp. func (c *Client) setSenderTs(t struct{ msw, lsw uint32 }) { c.mu.Lock() binary.BigEndian.PutUint32(c.senderTs[:], t.msw) binary.BigEndian.PutUint32(c.senderTs[4:], t.lsw) c.mu.Unlock() } // lastSenderTs returns the timestamp of the most recent sender report. func (c *Client) lastSenderTs() uint32 { c.mu.Lock() t := binary.BigEndian.Uint32(c.senderTs[2:]) c.mu.Unlock() return t } // delay returns the duration between the receive time of the last sender report // and now. This is called when forming a receiver report. func (c *Client) delay() uint32 { c.mu.Lock() t := c.receiveTime c.mu.Unlock() return uint32(time.Now().Sub(t).Seconds() / delayUnit) } // received is called when a sender report is received to mark the receive time. func (c *Client) markReceivedTime() { c.mu.Lock() c.receiveTime = time.Now() c.mu.Unlock() }