diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index ff4dda24..439e43c6 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -30,53 +30,21 @@ LICENSE package rtp import ( - "io" "net" - "sync" - "time" - - "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/ring" ) -const ( - chanSize = 10 - pkg = "rtp: " -) - -// RingBuffer consts. -const ( - ringBufferSize = 100 - ringBufferElementSize = 4096 -) - -type log func(lvl int8, msg string, args ...interface{}) - // Client describes an RTP client that can receive an RTP stream and implements // io.Reader. type Client struct { - conn *net.UDPConn // The UDP connection RTP packets will be read from. - wg sync.WaitGroup // Used to wait for recv routine to finish. - done chan struct{} // Used to terminate the recv routine. - ring *ring.Buffer // Processed data from RTP packets will be stored here. - err chan error // Errors encountered during recv will be sent to this chan. - rt time.Duration // Read timeout used when reading from the ringbuffer. - log + conn *net.UDPConn } // NewClient returns a pointer to a new Client. // // addr is the address of form : that we expect to receive -// RTP at. l is a logging function defined by the signuture of the log type -// defined above. rt is the read timeout used when reading from the client ringbuffer. -func NewClient(addr string, l log, rt time.Duration) (*Client, error) { - c := &Client{ - done: make(chan struct{}, chanSize), - ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0), - log: l, - err: make(chan error), - rt: rt, - } +// RTP at. +func NewClient(addr string) (*Client, error) { + c := &Client{} a, err := net.ResolveUDPAddr("udp", addr) if err != nil { @@ -91,67 +59,7 @@ func NewClient(addr string, l log, rt time.Duration) (*Client, error) { return c, nil } -// Start will start the recv routine. -func (c *Client) Start() { - c.log(logger.Info, pkg+"starting client") - c.wg.Add(1) - go c.recv() -} - -// Stop will send a done signal to the receive routine, and also close the -// connection. -func (c *Client) Stop() { - c.log(logger.Info, pkg+"stopping client") - c.conn.Close() - close(c.done) - c.wg.Wait() -} - -// Err returns the client err channel as receive only. -func (c *Client) Err() <-chan error { - return c.err -} - -// recv will read RTP packets from the UDP connection, perform the operation -// on them given at creation of the Client and then store the result in the -// ringBuffer for Reading. -func (c *Client) recv() { - defer c.wg.Done() - buf := make([]byte, ringBufferElementSize) - for { - select { - case <-c.done: - c.log(logger.Debug, pkg+"done signal received") - return - default: - c.log(logger.Debug, pkg+"waiting for packet") - n, _, err := c.conn.ReadFromUDP(buf) - if err != nil { - c.err <- err - continue - } - c.log(logger.Debug, pkg+"packet received", "packet", buf[:n]) - - _, err = c.ring.Write(buf[:n]) - c.ring.Flush() - if err != nil { - c.err <- err - } - } - } -} - -// Read implements io.Reader. -// -// Read will get the next chunk from the ringBuffer and copy the bytes to p. +// Read implements io.Reader. This wraps the Read for the connection. func (c *Client) Read(p []byte) (int, error) { - c.log(logger.Debug, pkg+"user reading data") - chunk, err := c.ring.Next(c.rt) - if err != nil { - return 0, io.EOF - } - n := copy(p, chunk.Bytes()) - chunk.Close() - chunk = nil - return n, nil + return c.conn.Read(p) } diff --git a/protocol/rtp/client_test.go b/protocol/rtp/client_test.go index 7137ef4b..55fef74c 100644 --- a/protocol/rtp/client_test.go +++ b/protocol/rtp/client_test.go @@ -34,7 +34,6 @@ import ( "io" "net" "testing" - "time" "bitbucket.org/ausocean/utils/logger" ) @@ -85,11 +84,10 @@ func TestReceive(t *testing.T) { go func() { // Create and start the client. var err error - c, err = NewClient(clientAddr, (*dummyLogger)(t).log, 1*time.Millisecond) + c, err = NewClient(clientAddr) if err != nil { testErr <- fmt.Errorf("could not create client, failed with error: %v\n", err) } - c.Start() close(clientReady) // Read packets using the client and check them with expected. @@ -115,7 +113,6 @@ func TestReceive(t *testing.T) { } packetsReceived++ } - c.Stop() close(done) }() @@ -146,8 +143,6 @@ func TestReceive(t *testing.T) { loop: for { select { - case err := <-c.Err(): - t.Log(err) case err := <-testErr: t.Fatal(err) case err := <-serverErr: