diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index b6d80956..00682308 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -33,13 +33,16 @@ import ( "io" "net" "sync" + "time" + "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" ) // Misc consts. const ( chanSize = 10 + pkg = "rtp: " ) // RingBuffer consts. @@ -48,6 +51,8 @@ const ( ringBufferElementSize = 4096 ) +type log func(lvl int8, msg string, args ...interface{}) + // Client describes an RTP client that can receive an RTP stream and implements // io.Reader. type Client struct { @@ -57,6 +62,8 @@ type Client struct { ring *ring.Buffer // Processed data from RTP packets will be stored here. op func([]byte) ([]byte, error) // The operation to perform on received RTP packets before storing. ErrChan chan error // Errors encountered during recv will be sent to this chan. + rt time.Duration // Read timeout used when reading from the ringbuffer. + log } // NewClient returns a pointer to a new Client. @@ -65,12 +72,17 @@ type Client struct { // RTP at. op is a function, if non nil, that will be used to perform an // operation on each received RTP packet. For example, the op func may parse // out the RTP payload. The result of the operation is then what is stored -// in the ringBuffer for reading. -func NewClient(addr string, op func([]byte) ([]byte, error)) (*Client, error) { +// in the ringBuffer for reading. l is a logging function defined by the +// signuture of the log type defined above. rt is the read timeout used when +// reading from the client ringbuffer. +func NewClient(addr string, op func([]byte) ([]byte, error), l log, rt time.Duration) (*Client, error) { c := &Client{ - done: make(chan struct{}, chanSize), - ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0), - op: op, + done: make(chan struct{}, chanSize), + ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0), + op: op, + log: l, + ErrChan: make(chan error), + rt: rt, } a, err := net.ResolveUDPAddr("udp", addr) @@ -88,10 +100,20 @@ func NewClient(addr string, op func([]byte) ([]byte, error)) (*Client, error) { // Start will start the recv routine. func (c *Client) Start() { + c.log(logger.Info, pkg+"starting client") c.wg.Add(1) go c.recv() } +// Stop will send a done signal to the receive routine, and also close the +// connection. +func (c *Client) Stop() { + c.log(logger.Info, pkg+"stopping client") + c.conn.Close() + close(c.done) + c.wg.Wait() +} + // recv will read RTP packets from the UDP connection, perform the operation // on them given at creation of the Client and then store the result in the // ringBuffer for Reading. @@ -101,13 +123,16 @@ func (c *Client) recv() { for { select { case <-c.done: + c.log(logger.Debug, pkg+"done signal received") return default: + c.log(logger.Debug, pkg+"waiting for packet") n, _, err := c.conn.ReadFromUDP(buf) if err != nil { c.ErrChan <- err continue } + c.log(logger.Debug, pkg+"packet received", "packet", buf[:n]) var _buf []byte switch c.op { @@ -131,19 +156,12 @@ func (c *Client) recv() { } } -// Stop will send a done signal to the receive routine, and also close the -// connection. -func (c *Client) Stop() { - close(c.done) - c.conn.Close() - c.wg.Wait() -} - // Read implements io.Reader. // // Read will get the next chunk from the ringBuffer and copy the bytes to p. func (c *Client) Read(p []byte) (int, error) { - chunk, err := c.ring.Next(0) + c.log(logger.Debug, pkg+"user reading packet") + chunk, err := c.ring.Next(c.rt) if err != nil { return 0, io.EOF } diff --git a/protocol/rtp/client_test.go b/protocol/rtp/client_test.go index 754f578a..d3f3fcb6 100644 --- a/protocol/rtp/client_test.go +++ b/protocol/rtp/client_test.go @@ -28,8 +28,113 @@ LICENSE package rtp -import "testing" +import ( + "bytes" + "io" + "net" + "testing" + "time" -func TestReceive(t *testing.T) { + "bitbucket.org/ausocean/utils/logger" +) + +// dummyLogger will allow logging to be done by the testing pkg. +type dummyLogger testing.T + +func (dl *dummyLogger) log(lvl int8, msg string, args ...interface{}) { + var l string + switch lvl { + case logger.Warning: + l = "warning" + case logger.Debug: + l = "debug" + case logger.Info: + l = "info" + case logger.Error: + l = "error" + case logger.Fatal: + l = "fatal" + } + msg = l + ": " + msg + for i := 0; i < len(args); i++ { + msg += " %v" + } + if len(args) == 0 { + dl.Log(msg + "\n") + return + } + dl.Logf(msg+"\n", args...) +} + +// TestReceiveNoOp will check that we can successfully use a Client to receive +// RTP using no operation. +func TestReceiveNoOp(t *testing.T) { + const clientAddr = "localhost:8000" + const packetsToSend = 20 + + // Create new client; note that op function set to nil, i.e. we don't want to + // perform any operation on packet before storing in ringbuffer. + c, err := NewClient(clientAddr, nil, (*dummyLogger)(t).log, 1*time.Millisecond) + if err != nil { + t.Fatalf("could not create client, failed with error: %v", err) + } + c.Start() + + // Log any errors from client. + go func() { + for { + err := <-c.ErrChan + t.Logf("unexpected error from client: %v", err) + } + }() + + // Start the RTP 'server'. + go func() { + cAddr, err := net.ResolveUDPAddr("udp", clientAddr) + if err != nil { + t.Fatalf("could not resolve server address, failed with err: %v", err) + } + + conn, err := net.DialUDP("udp", nil, cAddr) + if err != nil { + t.Fatalf("could not dial udp, failed with err: %v", err) + } + + // Send packets to the client. Packet payload will just be the packet number. + for i := 0; i < packetsToSend; i++ { + p := (&Pkt{V: rtpVer, Payload: []byte{byte(i)}}).Bytes(nil) + _, err := conn.Write(p) + if err != nil { + t.Errorf("could not write packet to conn, failed with err: %v", err) + } + } + }() + + // Read packets using the client and check them with expected. + var packetsReceived int + buf := make([]byte, 4096) + for { + if packetsReceived == packetsToSend { + break + } + n, err := c.Read(buf) + switch err { + case nil: + case io.EOF: + continue + default: + t.Fatalf("unexpected error: %v", err) + } + expect := (&Pkt{V: rtpVer, Payload: []byte{byte(packetsReceived)}}).Bytes(nil) + got := buf[:n] + if !bytes.Equal(got, expect) { + t.Errorf("did not get expected result. \nGot: %v\n Want: %v\n", got, expect) + } + packetsReceived++ + } + c.Stop() +} + +func TestReceiveOp(t *testing.T) { }