package rtcp import ( "encoding/binary" "errors" "fmt" "math" "net" "sync" "time" ) const ( senderSSRC = 3605043418 defaultClientName = "client" delayUnit = 1.0 / 65536.0 ) // client is an rtcp client that will hadle receiving SenderReports from a server // and sending out ReceiverReports. type client struct { ErrChan chan error cAddr *net.UDPAddr sAddr *net.UDPAddr name string sourceSSRC uint32 mu sync.Mutex sequence uint32 senderTs [64]byte interval time.Duration receiveTime time.Time } // NewClient returns a pointer to a new client. func NewClient(clientAddress, serverAddress, name string, sendInterval time.Duration, rtpSSRC uint32) (*client, error) { if name == "" { name = defaultClientName } c := &client{ name: name, ErrChan: make(chan error), interval: sendInterval, sourceSSRC: rtpSSRC, } var err error c.cAddr, err = net.ResolveUDPAddr("udp", clientAddress) if err != nil { return nil, errors.New(fmt.Sprintf("can't resolve client address, failed with error: %v", err)) } c.sAddr, err = net.ResolveUDPAddr("udp", serverAddress) if err != nil { return nil, errors.New(fmt.Sprintf("can't resolve server address, failed with error: %v", err)) } return c, nil } // Start starts the listen and send routines. This will start the process of // receiving and parsing SenderReports, and the process of sending receiver // reports to the server. func (c *client) Start() { go c.listen() go c.send() } // Listen reads from the UDP connection and parses SenderReports. func (c *client) listen() { conn, err := net.ListenUDP("udp", c.cAddr) if err != nil { c.ErrChan <- err } buf := make([]byte, 4096) for { n, _, _ := conn.ReadFromUDP(buf) c.parse(buf[:n]) } } // send write sender reports to the server. func (c *client) send() { conn, err := net.DialUDP("udp", c.cAddr, c.sAddr) if err != nil { c.ErrChan <- err } for { time.Sleep(c.interval) report := ReceiverReport{ Header: Header{ Version: 2, Padding: false, ReportCount: 1, Type: typeReceiverReport, }, SenderSSRC: senderSSRC, Blocks: []ReportBlock{ ReportBlock{ SSRC: c.sourceSSRC, FractionLost: 0, PacketsLost: math.MaxUint32, HighestSequence: c.highestSequence(), Jitter: c.jitter(), LSR: c.lastSenderTs(), DLSR: c.delay(), }, }, Extensions: nil, } description := SourceDescription{ Header: Header{ Version: 2, Padding: false, ReportCount: 1, Type: typeSourceDescription, }, Chunks: []Chunk{ Chunk{ SSRC: senderSSRC, Items: []SDESItem{ SDESItem{ Type: typeCName, Text: []byte(c.name), }, }, }, }, } reportBytes := report.Bytes() reportLen := len(reportBytes) descriptionBytes := description.Bytes() totalLength := reportLen + len(descriptionBytes) bytes := make([]byte, totalLength) copy(bytes, reportBytes) copy(bytes[reportLen:], descriptionBytes) _, err := conn.Write(bytes) if err != nil { c.ErrChan <- err } } } // parse will read important statistics from sender reports. func (c *client) parse(buf []byte) { c.received() msw, err := TimestampMSW(buf) if err != nil { c.ErrChan <- errors.New(fmt.Sprintf("could not get timestamp msw from sender report, failed with error: %v", err)) } lsw, err := TimestampLSW(buf) if err != nil { c.ErrChan <- errors.New(fmt.Sprintf("could not get timestamp lsw from sender report, failed with error: %v", err)) } c.setSenderTs(msw, lsw) } func (c *client) UpdateSequence(s uint32) { c.mu.Lock() c.sequence = s c.mu.Unlock() } func (c *client) highestSequence() uint32 { var s uint32 c.mu.Lock() s = c.sequence c.mu.Unlock() return s } func (c *client) jitter() uint32 { return 0 } func (c *client) setSenderTs(msw, lsw uint32) { c.mu.Lock() binary.BigEndian.PutUint32(c.senderTs[:], msw) binary.BigEndian.PutUint32(c.senderTs[4:], lsw) c.mu.Unlock() } func (c *client) lastSenderTs() uint32 { var ts uint32 c.mu.Lock() ts = binary.BigEndian.Uint32(c.senderTs[2:]) c.mu.Unlock() return ts } func (c *client) delay() uint32 { var receiveTime time.Time c.mu.Lock() receiveTime = c.receiveTime c.mu.Unlock() now := time.Now() return uint32(now.Sub(receiveTime).Seconds() / delayUnit) } func (c *client) received() { c.mu.Lock() c.receiveTime = time.Now() c.mu.Unlock() }