From 1a19412223f001300b0f8469a829836c5c44d9a3 Mon Sep 17 00:00:00 2001 From: Saxon Date: Tue, 16 Apr 2019 12:33:58 +0930 Subject: [PATCH] protocol/rtcp: finished client_test.go improved usability or client Finished writing the client_test.go file and through the process fixed some bugs in the client. Also increased usability by providing a Stop() method so that the send and recv routines, and also the connection can be terminated. Also created a sender report struct in rtcp.go - this helped with testing. --- protocol/rtcp/client.go | 154 ++++++++++++++++++++++------------- protocol/rtcp/client_test.go | 124 ++++++++++++++++++++++++---- protocol/rtcp/parse.go | 5 +- protocol/rtcp/rtcp.go | 2 +- 4 files changed, 212 insertions(+), 73 deletions(-) diff --git a/protocol/rtcp/client.go b/protocol/rtcp/client.go index 654f56ae..f2bef73e 100644 --- a/protocol/rtcp/client.go +++ b/protocol/rtcp/client.go @@ -8,14 +8,19 @@ import ( "net" "sync" "time" + + "bitbucket.org/ausocean/utils/logger" ) const ( senderSSRC = 3605043418 defaultClientName = "client" delayUnit = 1.0 / 65536.0 + pkg = "rtcp: " ) +type log func(lvl int8, msg string, args ...interface{}) + // client is an rtcp client that will hadle receiving SenderReports from a server // and sending out ReceiverReports. type client struct { @@ -30,32 +35,44 @@ type client struct { interval time.Duration receiveTime time.Time buf [200]byte + conn *net.UDPConn + wg sync.WaitGroup + quitSend chan struct{} + quitRecv chan struct{} + log } // NewClient returns a pointer to a new client. -func NewClient(clientAddress, serverAddress, name string, sendInterval time.Duration, rtpSSRC uint32) (*client, error) { +func NewClient(clientAddress, serverAddress, name string, sendInterval time.Duration, rtpSSRC uint32, l log) (*client, error) { if name == "" { name = defaultClientName } c := &client{ name: name, - ErrChan: make(chan error), + ErrChan: make(chan error, 2), + quitSend: make(chan struct{}), + quitRecv: make(chan struct{}), interval: sendInterval, sourceSSRC: rtpSSRC, + log: l, } var err error c.cAddr, err = net.ResolveUDPAddr("udp", clientAddress) if err != nil { - return nil, errors.New(fmt.Sprintf("can't resolve client address, failed with error: %v", err)) + return nil, errors.New(fmt.Sprintf("can't resolve client address, failed with error: %v\n", err)) } c.sAddr, err = net.ResolveUDPAddr("udp", serverAddress) if err != nil { - return nil, errors.New(fmt.Sprintf("can't resolve server address, failed with error: %v", err)) + return nil, errors.New(fmt.Sprintf("can't resolve server address, failed with error: %v\n", err)) } + c.conn, err = net.DialUDP("udp", c.cAddr, c.sAddr) + if err != nil { + return nil, errors.New(fmt.Sprintf("can't dial, failed with error: %v\n", err)) + } return c, nil } @@ -63,77 +80,102 @@ func NewClient(clientAddress, serverAddress, name string, sendInterval time.Dura // receiving and parsing sender reports, and the process of sending receiver // reports to the server. func (c *client) Start() { - go c.listen() + c.log(logger.Debug, pkg+"client is starting") + c.wg.Add(1) + go c.recv() + c.wg.Add(1) go c.send() } +// Stop sends a quit signal to the send and receive routines and closes the +// udp connection. It will wait until both routines have returned. +func (c *client) Stop() { + c.log(logger.Debug, pkg+"client is stopping") + close(c.quitSend) + close(c.quitRecv) + c.conn.Close() + c.wg.Wait() +} + // listen reads from the UDP connection and parses SenderReports. -func (c *client) listen() { - conn, err := net.ListenUDP("udp", c.cAddr) - if err != nil { - c.ErrChan <- err - } +func (c *client) recv() { + defer c.wg.Done() + c.log(logger.Debug, pkg+"client is receiving") buf := make([]byte, 4096) for { - n, _, _ := conn.ReadFromUDP(buf) - c.parse(buf[:n]) + select { + case <-c.quitRecv: + return + default: + n, _, err := c.conn.ReadFromUDP(buf) + if err != nil { + c.ErrChan <- err + continue + } + c.log(logger.Debug, pkg+"sender report received", "report", buf[:n]) + c.parse(buf[:n]) + } } } // send writes receiver reports to the server. func (c *client) send() { - conn, err := net.DialUDP("udp", c.cAddr, c.sAddr) - if err != nil { - c.ErrChan <- err - } + defer c.wg.Done() + c.log(logger.Debug, pkg+"client is sending") for { - time.Sleep(c.interval) + select { + case <-c.quitSend: + return + default: + time.Sleep(c.interval) - report := ReceiverReport{ - Header: Header{ - Version: 2, - Padding: false, - ReportCount: 1, - Type: typeReceiverReport, - }, - SenderSSRC: senderSSRC, - Blocks: []ReportBlock{ - ReportBlock{ - SSRC: c.sourceSSRC, - FractionLost: 0, - PacketsLost: math.MaxUint32, - HighestSequence: c.highestSequence(), - Jitter: c.jitter(), - LSR: c.lastSenderTs(), - DLSR: c.delay(), + report := ReceiverReport{ + Header: Header{ + Version: 2, + Padding: false, + ReportCount: 1, + Type: typeReceiverReport, }, - }, - Extensions: nil, - } + SenderSSRC: senderSSRC, + Blocks: []ReportBlock{ + ReportBlock{ + SSRC: c.sourceSSRC, + FractionLost: 0, + PacketsLost: math.MaxUint32, + HighestSequence: c.highestSequence(), + Jitter: c.jitter(), + LSR: c.lastSenderTs(), + DLSR: c.delay(), + }, + }, + Extensions: nil, + } - description := SourceDescription{ - Header: Header{ - Version: 2, - Padding: false, - ReportCount: 1, - Type: typeSourceDescription, - }, - Chunks: []Chunk{ - Chunk{ - SSRC: senderSSRC, - Items: []SDESItem{ - SDESItem{ - Type: typeCName, - Text: []byte(c.name), + description := SourceDescription{ + Header: Header{ + Version: 2, + Padding: false, + ReportCount: 1, + Type: typeSourceDescription, + }, + Chunks: []Chunk{ + Chunk{ + SSRC: senderSSRC, + Items: []SDESItem{ + SDESItem{ + Type: typeCName, + Text: []byte(c.name), + }, }, }, }, - }, - } + } - _, err := conn.Write(c.formPayload(&report, &description)) - if err != nil { - c.ErrChan <- err + c.log(logger.Debug, pkg+"sending receiver report") + _, err := c.conn.Write(c.formPayload(&report, &description)) + if err != nil { + c.ErrChan <- err + } } } } diff --git a/protocol/rtcp/client_test.go b/protocol/rtcp/client_test.go index cc10d4b9..1cce8c28 100644 --- a/protocol/rtcp/client_test.go +++ b/protocol/rtcp/client_test.go @@ -2,11 +2,18 @@ package rtcp import ( "bytes" + "encoding/binary" "fmt" "math" + "net" + "strings" "testing" + "time" + + "bitbucket.org/ausocean/utils/logger" ) +// TestFromPayload checks that formPayload is working as expected. func TestFormPayload(t *testing.T) { expect := []byte{ 0x81, 0xc9, 0x00, 0x07, @@ -80,29 +87,116 @@ func TestFormPayload(t *testing.T) { } } -/* -func TestReceiveAndSend(t *testing.T) { - quit := make(chan struct{}) - go testServer(quit) +// dummyLogger will allow logging to be done by the testing pkg. +type dummyLogger testing.T + +func (dl *dummyLogger) log(lvl int8, msg string, args ...interface{}) { + var l string + switch lvl { + case logger.Warning: + l = "warning" + case logger.Debug: + l = "debug" + case logger.Info: + l = "info" + case logger.Error: + l = "error" + case logger.Fatal: + l = "fatal" + } + msg = l + ": " + msg + for i := 0; i < len(args); i++ { + msg += " %v" + } + if len(args) == 0 { + dl.Log(msg + "\n") + return + } + dl.Logf(msg+"\n", args) } -func testServer(quit chan struct{}, t *testing.T) { - const testServerAddr = "localhost:8000" - sAddr, err := net.ResolveUDPAddr("udp", testServerAddr) +// TestReceiveAndSend tests basic RTCP client behaviour with a basic RTCP server. +// The RTCP client will send through receiver reports, and the RTCP server will +// respond with sender reports. +func TestReceiveAndSend(t *testing.T) { + const clientAddr, serverAddr = "localhost:8000", "localhost:8001" + c, err := NewClient( + clientAddr, + serverAddr, + "testClient", + 10*time.Millisecond, + 12345, + (*dummyLogger)(t).log, + ) + if err != nil { + t.Fatalf("unexpected error when creating client: %v\n", err) + } + + go func() { + for { + select { + case err := <-c.ErrChan: + const errConnClosed = "use of closed network connection" + if !strings.Contains(err.Error(), errConnClosed) { + t.Fatalf("error received from client error chan: %v\n", err) + } + + default: + } + } + }() + + c.Start() + + sAddr, err := net.ResolveUDPAddr("udp", serverAddr) if err != nil { t.Fatalf("could not resolve test server address, failed with error: %v", err) } - conn, err := net.DialUDP("udp", nil, sAddr) + cAddr, err := net.ResolveUDPAddr("udp", clientAddr) if err != nil { - t.Fatalf("could not dial, failed with error: %v", err) + t.Fatalf("could not resolve client address, failed with error: %v", err) } - select { - case <-quit: - return - default: + conn, err := net.DialUDP("udp", sAddr, cAddr) + if err != nil { + t.Fatalf("could not dial, failed with error: %v\n", err) } + + buf := make([]byte, 4096) + for i := 0; i < 5; i++ { + t.Log("SERVER: waiting for receiver report\n") + n, _, _ := conn.ReadFromUDP(buf) + t.Logf("SERVER: receiver report received: \n%v\n", buf[:n]) + + c.UpdateSequence(uint32(i)) + + now := time.Now().Second() + var time [8]byte + binary.BigEndian.PutUint64(time[:], uint64(now)) + msw := binary.BigEndian.Uint32(time[:]) + lsw := binary.BigEndian.Uint32(time[4:]) + + report := SenderReport{ + Header: Header{ + Version: 2, + Padding: false, + ReportCount: 0, + Type: typeSenderReport, + }, + SSRC: 1234567, + TimestampMSW: msw, + TimestampLSW: lsw, + RTPTimestamp: 0, + PacketCount: 0, + OctetCount: 0, + } + r := report.Bytes() + t.Logf("SERVER: sending sender report: \n%v\n", r) + _, err := conn.Write(r) + if err != nil { + t.Errorf("did not expect error: %v\n", err) + } + } + c.Stop() } - -*/ diff --git a/protocol/rtcp/parse.go b/protocol/rtcp/parse.go index 1a7b6541..4bf5e566 100644 --- a/protocol/rtcp/parse.go +++ b/protocol/rtcp/parse.go @@ -9,12 +9,15 @@ import ( // significant word, and the least significant word. If the given bytes do not // represent a valid receiver report, an error is returned. func Timestamp(buf []byte) (msw, lsw uint32, err error) { + if len(buf) < 4 { + return 0, 0, errors.New("bad RTCP packet, not of sufficient length") + } if (buf[0] & 0xc0 >> 6) != 2 { return 0, 0, errors.New("incompatible RTCP version") } if buf[1] != typeSenderReport { - return 0, 0, errors.New("rtcp packet is not of sender report type") + return 0, 0, errors.New("RTCP packet is not of sender report type") } msw = binary.BigEndian.Uint32(buf[8:]) diff --git a/protocol/rtcp/rtcp.go b/protocol/rtcp/rtcp.go index bc9291a8..7c76b587 100644 --- a/protocol/rtcp/rtcp.go +++ b/protocol/rtcp/rtcp.go @@ -149,7 +149,7 @@ func (r *SenderReport) Bytes() []byte { r.PacketCount, r.OctetCount, } { - binary.BigEndian.PutUint32(buf[i+1:], w) + binary.BigEndian.PutUint32(buf[i+4:], w) } return buf }