From be76998c7df6fabc35a451fae102f205a0dd4232 Mon Sep 17 00:00:00 2001 From: Saxon Date: Sun, 21 Apr 2019 22:40:08 +0930 Subject: [PATCH] protocol/rtp: wrote TestReceiveNoOp Wrote test TestReceiveNoOP to check that client works correctly when we give no operation to perform on RTP packets before storing in the client ringBuffer, which calling io.Reader implementation Read will get packets form. --- protocol/rtp/client.go | 46 ++++++++++----- protocol/rtp/client_test.go | 109 +++++++++++++++++++++++++++++++++++- 2 files changed, 139 insertions(+), 16 deletions(-) diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index b6d80956..00682308 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -33,13 +33,16 @@ import ( "io" "net" "sync" + "time" + "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" ) // Misc consts. const ( chanSize = 10 + pkg = "rtp: " ) // RingBuffer consts. @@ -48,6 +51,8 @@ const ( ringBufferElementSize = 4096 ) +type log func(lvl int8, msg string, args ...interface{}) + // Client describes an RTP client that can receive an RTP stream and implements // io.Reader. type Client struct { @@ -57,6 +62,8 @@ type Client struct { ring *ring.Buffer // Processed data from RTP packets will be stored here. op func([]byte) ([]byte, error) // The operation to perform on received RTP packets before storing. ErrChan chan error // Errors encountered during recv will be sent to this chan. + rt time.Duration // Read timeout used when reading from the ringbuffer. + log } // NewClient returns a pointer to a new Client. @@ -65,12 +72,17 @@ type Client struct { // RTP at. op is a function, if non nil, that will be used to perform an // operation on each received RTP packet. For example, the op func may parse // out the RTP payload. The result of the operation is then what is stored -// in the ringBuffer for reading. -func NewClient(addr string, op func([]byte) ([]byte, error)) (*Client, error) { +// in the ringBuffer for reading. l is a logging function defined by the +// signuture of the log type defined above. rt is the read timeout used when +// reading from the client ringbuffer. +func NewClient(addr string, op func([]byte) ([]byte, error), l log, rt time.Duration) (*Client, error) { c := &Client{ - done: make(chan struct{}, chanSize), - ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0), - op: op, + done: make(chan struct{}, chanSize), + ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0), + op: op, + log: l, + ErrChan: make(chan error), + rt: rt, } a, err := net.ResolveUDPAddr("udp", addr) @@ -88,10 +100,20 @@ func NewClient(addr string, op func([]byte) ([]byte, error)) (*Client, error) { // Start will start the recv routine. func (c *Client) Start() { + c.log(logger.Info, pkg+"starting client") c.wg.Add(1) go c.recv() } +// Stop will send a done signal to the receive routine, and also close the +// connection. +func (c *Client) Stop() { + c.log(logger.Info, pkg+"stopping client") + c.conn.Close() + close(c.done) + c.wg.Wait() +} + // recv will read RTP packets from the UDP connection, perform the operation // on them given at creation of the Client and then store the result in the // ringBuffer for Reading. @@ -101,13 +123,16 @@ func (c *Client) recv() { for { select { case <-c.done: + c.log(logger.Debug, pkg+"done signal received") return default: + c.log(logger.Debug, pkg+"waiting for packet") n, _, err := c.conn.ReadFromUDP(buf) if err != nil { c.ErrChan <- err continue } + c.log(logger.Debug, pkg+"packet received", "packet", buf[:n]) var _buf []byte switch c.op { @@ -131,19 +156,12 @@ func (c *Client) recv() { } } -// Stop will send a done signal to the receive routine, and also close the -// connection. -func (c *Client) Stop() { - close(c.done) - c.conn.Close() - c.wg.Wait() -} - // Read implements io.Reader. // // Read will get the next chunk from the ringBuffer and copy the bytes to p. func (c *Client) Read(p []byte) (int, error) { - chunk, err := c.ring.Next(0) + c.log(logger.Debug, pkg+"user reading packet") + chunk, err := c.ring.Next(c.rt) if err != nil { return 0, io.EOF } diff --git a/protocol/rtp/client_test.go b/protocol/rtp/client_test.go index 754f578a..d3f3fcb6 100644 --- a/protocol/rtp/client_test.go +++ b/protocol/rtp/client_test.go @@ -28,8 +28,113 @@ LICENSE package rtp -import "testing" +import ( + "bytes" + "io" + "net" + "testing" + "time" -func TestReceive(t *testing.T) { + "bitbucket.org/ausocean/utils/logger" +) + +// dummyLogger will allow logging to be done by the testing pkg. +type dummyLogger testing.T + +func (dl *dummyLogger) log(lvl int8, msg string, args ...interface{}) { + var l string + switch lvl { + case logger.Warning: + l = "warning" + case logger.Debug: + l = "debug" + case logger.Info: + l = "info" + case logger.Error: + l = "error" + case logger.Fatal: + l = "fatal" + } + msg = l + ": " + msg + for i := 0; i < len(args); i++ { + msg += " %v" + } + if len(args) == 0 { + dl.Log(msg + "\n") + return + } + dl.Logf(msg+"\n", args...) +} + +// TestReceiveNoOp will check that we can successfully use a Client to receive +// RTP using no operation. +func TestReceiveNoOp(t *testing.T) { + const clientAddr = "localhost:8000" + const packetsToSend = 20 + + // Create new client; note that op function set to nil, i.e. we don't want to + // perform any operation on packet before storing in ringbuffer. + c, err := NewClient(clientAddr, nil, (*dummyLogger)(t).log, 1*time.Millisecond) + if err != nil { + t.Fatalf("could not create client, failed with error: %v", err) + } + c.Start() + + // Log any errors from client. + go func() { + for { + err := <-c.ErrChan + t.Logf("unexpected error from client: %v", err) + } + }() + + // Start the RTP 'server'. + go func() { + cAddr, err := net.ResolveUDPAddr("udp", clientAddr) + if err != nil { + t.Fatalf("could not resolve server address, failed with err: %v", err) + } + + conn, err := net.DialUDP("udp", nil, cAddr) + if err != nil { + t.Fatalf("could not dial udp, failed with err: %v", err) + } + + // Send packets to the client. Packet payload will just be the packet number. + for i := 0; i < packetsToSend; i++ { + p := (&Pkt{V: rtpVer, Payload: []byte{byte(i)}}).Bytes(nil) + _, err := conn.Write(p) + if err != nil { + t.Errorf("could not write packet to conn, failed with err: %v", err) + } + } + }() + + // Read packets using the client and check them with expected. + var packetsReceived int + buf := make([]byte, 4096) + for { + if packetsReceived == packetsToSend { + break + } + n, err := c.Read(buf) + switch err { + case nil: + case io.EOF: + continue + default: + t.Fatalf("unexpected error: %v", err) + } + expect := (&Pkt{V: rtpVer, Payload: []byte{byte(packetsReceived)}}).Bytes(nil) + got := buf[:n] + if !bytes.Equal(got, expect) { + t.Errorf("did not get expected result. \nGot: %v\n Want: %v\n", got, expect) + } + packetsReceived++ + } + c.Stop() +} + +func TestReceiveOp(t *testing.T) { }