From c48e681c414335033b063d0ef156c4d6988f7e8d Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 8 May 2019 16:54:02 +0930 Subject: [PATCH] protocol/rtp/client.go: removed buffering in client. Removed buffering in rtp client. This simplified things alot e.g. the recv routine has been removed, and therefore anything that was there to help with handling of the routine is also gone like the Start() and Stop() methods as well as signalling channels and waitgroups. The client is now just effectively a wrapper for a udp conn. --- protocol/rtp/client.go | 104 +++--------------------------------- protocol/rtp/client_test.go | 7 +-- 2 files changed, 7 insertions(+), 104 deletions(-) diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index ff4dda24..439e43c6 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -30,53 +30,21 @@ LICENSE package rtp import ( - "io" "net" - "sync" - "time" - - "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/ring" ) -const ( - chanSize = 10 - pkg = "rtp: " -) - -// RingBuffer consts. -const ( - ringBufferSize = 100 - ringBufferElementSize = 4096 -) - -type log func(lvl int8, msg string, args ...interface{}) - // Client describes an RTP client that can receive an RTP stream and implements // io.Reader. type Client struct { - conn *net.UDPConn // The UDP connection RTP packets will be read from. - wg sync.WaitGroup // Used to wait for recv routine to finish. - done chan struct{} // Used to terminate the recv routine. - ring *ring.Buffer // Processed data from RTP packets will be stored here. - err chan error // Errors encountered during recv will be sent to this chan. - rt time.Duration // Read timeout used when reading from the ringbuffer. - log + conn *net.UDPConn } // NewClient returns a pointer to a new Client. // // addr is the address of form : that we expect to receive -// RTP at. l is a logging function defined by the signuture of the log type -// defined above. rt is the read timeout used when reading from the client ringbuffer. -func NewClient(addr string, l log, rt time.Duration) (*Client, error) { - c := &Client{ - done: make(chan struct{}, chanSize), - ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0), - log: l, - err: make(chan error), - rt: rt, - } +// RTP at. +func NewClient(addr string) (*Client, error) { + c := &Client{} a, err := net.ResolveUDPAddr("udp", addr) if err != nil { @@ -91,67 +59,7 @@ func NewClient(addr string, l log, rt time.Duration) (*Client, error) { return c, nil } -// Start will start the recv routine. -func (c *Client) Start() { - c.log(logger.Info, pkg+"starting client") - c.wg.Add(1) - go c.recv() -} - -// Stop will send a done signal to the receive routine, and also close the -// connection. -func (c *Client) Stop() { - c.log(logger.Info, pkg+"stopping client") - c.conn.Close() - close(c.done) - c.wg.Wait() -} - -// Err returns the client err channel as receive only. -func (c *Client) Err() <-chan error { - return c.err -} - -// recv will read RTP packets from the UDP connection, perform the operation -// on them given at creation of the Client and then store the result in the -// ringBuffer for Reading. -func (c *Client) recv() { - defer c.wg.Done() - buf := make([]byte, ringBufferElementSize) - for { - select { - case <-c.done: - c.log(logger.Debug, pkg+"done signal received") - return - default: - c.log(logger.Debug, pkg+"waiting for packet") - n, _, err := c.conn.ReadFromUDP(buf) - if err != nil { - c.err <- err - continue - } - c.log(logger.Debug, pkg+"packet received", "packet", buf[:n]) - - _, err = c.ring.Write(buf[:n]) - c.ring.Flush() - if err != nil { - c.err <- err - } - } - } -} - -// Read implements io.Reader. -// -// Read will get the next chunk from the ringBuffer and copy the bytes to p. +// Read implements io.Reader. This wraps the Read for the connection. func (c *Client) Read(p []byte) (int, error) { - c.log(logger.Debug, pkg+"user reading data") - chunk, err := c.ring.Next(c.rt) - if err != nil { - return 0, io.EOF - } - n := copy(p, chunk.Bytes()) - chunk.Close() - chunk = nil - return n, nil + return c.conn.Read(p) } diff --git a/protocol/rtp/client_test.go b/protocol/rtp/client_test.go index 7137ef4b..55fef74c 100644 --- a/protocol/rtp/client_test.go +++ b/protocol/rtp/client_test.go @@ -34,7 +34,6 @@ import ( "io" "net" "testing" - "time" "bitbucket.org/ausocean/utils/logger" ) @@ -85,11 +84,10 @@ func TestReceive(t *testing.T) { go func() { // Create and start the client. var err error - c, err = NewClient(clientAddr, (*dummyLogger)(t).log, 1*time.Millisecond) + c, err = NewClient(clientAddr) if err != nil { testErr <- fmt.Errorf("could not create client, failed with error: %v\n", err) } - c.Start() close(clientReady) // Read packets using the client and check them with expected. @@ -115,7 +113,6 @@ func TestReceive(t *testing.T) { } packetsReceived++ } - c.Stop() close(done) }() @@ -146,8 +143,6 @@ func TestReceive(t *testing.T) { loop: for { select { - case err := <-c.Err(): - t.Log(err) case err := <-testErr: t.Fatal(err) case err := <-serverErr: