package rtcp import ( "encoding/binary" "errors" "fmt" "math" "net" "sync" "time" "bitbucket.org/ausocean/utils/logger" ) const ( senderSSRC = 3605043418 defaultClientName = "client" delayUnit = 1.0 / 65536.0 pkg = "rtcp: " ) type log func(lvl int8, msg string, args ...interface{}) // client is an rtcp client that will hadle receiving SenderReports from a server // and sending out ReceiverReports. type client struct { ErrChan chan error cAddr *net.UDPAddr sAddr *net.UDPAddr name string sourceSSRC uint32 mu sync.Mutex sequence uint32 senderTs [64]byte interval time.Duration receiveTime time.Time buf [200]byte conn *net.UDPConn wg sync.WaitGroup quitSend chan struct{} quitRecv chan struct{} log } // NewClient returns a pointer to a new client. func NewClient(clientAddress, serverAddress, name string, sendInterval time.Duration, rtpSSRC uint32, l log) (*client, error) { if name == "" { name = defaultClientName } c := &client{ name: name, ErrChan: make(chan error, 2), quitSend: make(chan struct{}), quitRecv: make(chan struct{}), interval: sendInterval, sourceSSRC: rtpSSRC, log: l, } var err error c.cAddr, err = net.ResolveUDPAddr("udp", clientAddress) if err != nil { return nil, errors.New(fmt.Sprintf("can't resolve client address, failed with error: %v\n", err)) } c.sAddr, err = net.ResolveUDPAddr("udp", serverAddress) if err != nil { return nil, errors.New(fmt.Sprintf("can't resolve server address, failed with error: %v\n", err)) } c.conn, err = net.DialUDP("udp", c.cAddr, c.sAddr) if err != nil { return nil, errors.New(fmt.Sprintf("can't dial, failed with error: %v\n", err)) } return c, nil } // Start starts the listen and send routines. This will start the process of // receiving and parsing sender reports, and the process of sending receiver // reports to the server. func (c *client) Start() { c.log(logger.Debug, pkg+"client is starting") c.wg.Add(1) go c.recv() c.wg.Add(1) go c.send() } // Stop sends a quit signal to the send and receive routines and closes the // udp connection. It will wait until both routines have returned. func (c *client) Stop() { c.log(logger.Debug, pkg+"client is stopping") close(c.quitSend) close(c.quitRecv) c.conn.Close() c.wg.Wait() } // listen reads from the UDP connection and parses SenderReports. func (c *client) recv() { defer c.wg.Done() c.log(logger.Debug, pkg+"client is receiving") buf := make([]byte, 4096) for { select { case <-c.quitRecv: return default: n, _, err := c.conn.ReadFromUDP(buf) if err != nil { c.ErrChan <- err continue } c.log(logger.Debug, pkg+"sender report received", "report", buf[:n]) c.parse(buf[:n]) } } } // send writes receiver reports to the server. func (c *client) send() { defer c.wg.Done() c.log(logger.Debug, pkg+"client is sending") for { select { case <-c.quitSend: return default: time.Sleep(c.interval) report := ReceiverReport{ Header: Header{ Version: 2, Padding: false, ReportCount: 1, Type: typeReceiverReport, }, SenderSSRC: senderSSRC, Blocks: []ReportBlock{ ReportBlock{ SSRC: c.sourceSSRC, FractionLost: 0, PacketsLost: math.MaxUint32, HighestSequence: c.highestSequence(), Jitter: c.jitter(), LSR: c.lastSenderTs(), DLSR: c.delay(), }, }, Extensions: nil, } description := SourceDescription{ Header: Header{ Version: 2, Padding: false, ReportCount: 1, Type: typeSourceDescription, }, Chunks: []Chunk{ Chunk{ SSRC: senderSSRC, Items: []SDESItem{ SDESItem{ Type: typeCName, Text: []byte(c.name), }, }, }, }, } c.log(logger.Debug, pkg+"sending receiver report") _, err := c.conn.Write(c.formPayload(&report, &description)) if err != nil { c.ErrChan <- err } } } } // formPayload takes a pointer to a ReceiverReport and a pointer to a // Source Description and calls Bytes on both, writing to the underlying client // buf. A slice to the combined writtem memory is returned. func (c *client) formPayload(r *ReceiverReport, d *SourceDescription) []byte { rl := len(r.Bytes(c.buf[:])) dl := len(d.Bytes(c.buf[rl:])) t := rl + dl if t > cap(c.buf) { panic("client buf not big enough") } return c.buf[:t] } // parse will read important statistics from sender reports. func (c *client) parse(buf []byte) { c.received() msw, lsw, err := Timestamp(buf) if err != nil { c.ErrChan <- errors.New(fmt.Sprintf("could not get timestamp from sender report, failed with error: %v", err)) } c.setSenderTs(msw, lsw) } // UpdateSequence will allow updating of the highest sequence number received // through an rtp stream. func (c *client) UpdateSequence(s uint32) { c.mu.Lock() c.sequence = s c.mu.Unlock() } // highestSequence will return the highest sequence number received through rtp. func (c *client) highestSequence() uint32 { var s uint32 c.mu.Lock() s = c.sequence c.mu.Unlock() return s } // jitter returns the interarrival jitter as described by RTCP specifications: // https://tools.ietf.org/html/rfc3550 func (c *client) jitter() uint32 { return 0 } // setSenderTs allows us to safely set the current sender report timestamp. func (c *client) setSenderTs(msw, lsw uint32) { c.mu.Lock() binary.BigEndian.PutUint32(c.senderTs[:], msw) binary.BigEndian.PutUint32(c.senderTs[4:], lsw) c.mu.Unlock() } // lastSenderTs returns the timestamp of the most recent sender report. func (c *client) lastSenderTs() uint32 { var ts uint32 c.mu.Lock() ts = binary.BigEndian.Uint32(c.senderTs[2:]) c.mu.Unlock() return ts } // delay returns the duration between the receive time of the last sender report // and now. This is called when forming a receiver report. func (c *client) delay() uint32 { var receiveTime time.Time c.mu.Lock() receiveTime = c.receiveTime c.mu.Unlock() now := time.Now() return uint32(now.Sub(receiveTime).Seconds() / delayUnit) } // received is called when a sender report is received to mark the receive time. func (c *client) received() { c.mu.Lock() c.receiveTime = time.Now() c.mu.Unlock() }