From 80a7d41d8a9ca6f16334887bd6ba80d191388e5a Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 1 May 2019 14:15:39 +0930 Subject: [PATCH] protocol/rtp: removed op from Client i.e. what is read from Client are RTP packets. --- protocol/rtp/client.go | 32 ++------ protocol/rtp/client_test.go | 160 +++++++++++++++++------------------- 2 files changed, 85 insertions(+), 107 deletions(-) diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index 95cbe128..0635177d 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -47,7 +47,7 @@ const ( // RingBuffer consts. const ( - ringBufferSize = 10 + ringBufferSize = 100 ringBufferElementSize = 4096 ) @@ -56,13 +56,12 @@ type log func(lvl int8, msg string, args ...interface{}) // Client describes an RTP client that can receive an RTP stream and implements // io.Reader. type Client struct { - conn *net.UDPConn // The UDP connection RTP packets will be read from. - wg sync.WaitGroup // Used to wait for recv routine to finish. - done chan struct{} // Used to terminate the recv routine. - ring *ring.Buffer // Processed data from RTP packets will be stored here. - op func([]byte) ([]byte, error) // The operation to perform on received RTP packets before storing. - err chan error // Errors encountered during recv will be sent to this chan. - rt time.Duration // Read timeout used when reading from the ringbuffer. + conn *net.UDPConn // The UDP connection RTP packets will be read from. + wg sync.WaitGroup // Used to wait for recv routine to finish. + done chan struct{} // Used to terminate the recv routine. + ring *ring.Buffer // Processed data from RTP packets will be stored here. + err chan error // Errors encountered during recv will be sent to this chan. + rt time.Duration // Read timeout used when reading from the ringbuffer. log } @@ -75,11 +74,10 @@ type Client struct { // in the ringBuffer for reading. l is a logging function defined by the // signuture of the log type defined above. rt is the read timeout used when // reading from the client ringbuffer. -func NewClient(addr string, op func([]byte) ([]byte, error), l log, rt time.Duration) (*Client, error) { +func NewClient(addr string, l log, rt time.Duration) (*Client, error) { c := &Client{ done: make(chan struct{}, chanSize), ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0), - op: op, log: l, err: make(chan error), rt: rt, @@ -139,19 +137,7 @@ func (c *Client) recv() { } c.log(logger.Debug, pkg+"packet received", "packet", buf[:n]) - var _buf []byte - switch c.op { - case nil: - _buf = buf[:n] - default: - _buf, err = c.op(buf[:n]) - if err != nil { - c.err <- err - continue - } - } - - _, err = c.ring.Write(_buf) + _, err = c.ring.Write(buf[:n]) c.ring.Flush() if err != nil { c.err <- err diff --git a/protocol/rtp/client_test.go b/protocol/rtp/client_test.go index 5ed0d2e0..69b8169b 100644 --- a/protocol/rtp/client_test.go +++ b/protocol/rtp/client_test.go @@ -75,94 +75,86 @@ func TestReceive(t *testing.T) { packetsToSend = 20 ) - for _, op := range []func([]byte) ([]byte, error){nil, Payload} { - testErr := make(chan error) - serverErr := make(chan error) - done := make(chan struct{}) - clientReady := make(chan struct{}) - var c *Client + testErr := make(chan error) + serverErr := make(chan error) + done := make(chan struct{}) + clientReady := make(chan struct{}) + var c *Client - // Start routine to read from client. - go func() { - // Create and start the client. - var err error - c, err = NewClient(clientAddr, op, (*dummyLogger)(t).log, 1*time.Millisecond) - if err != nil { - testErr <- fmt.Errorf("could not create client, failed with error: %v\n", err) - } - c.Start() - close(clientReady) + // Start routine to read from client. + go func() { + // Create and start the client. + var err error + c, err = NewClient(clientAddr, (*dummyLogger)(t).log, 1*time.Millisecond) + if err != nil { + testErr <- fmt.Errorf("could not create client, failed with error: %v\n", err) + } + c.Start() + close(clientReady) - // Read packets using the client and check them with expected. - var packetsReceived int - buf := make([]byte, 4096) - for packetsReceived != packetsToSend { - n, err := c.Read(buf) - switch err { - case nil: - case io.EOF: - continue - default: - testErr <- fmt.Errorf("unexpected error from c.Read: %v\n", err) - } - - // Create expected data and apply operation if there is one. - expect := (&Pkt{V: rtpVer, Payload: []byte{byte(packetsReceived)}}).Bytes(nil) - if op != nil { - expect, err = op(expect) - if err != nil { - testErr <- fmt.Errorf("unexpected error when applying op: %v\n", err) - } - } - - // Compare. - got := buf[:n] - if !bytes.Equal(got, expect) { - testErr <- fmt.Errorf("did not get expected result. \nGot: %v\n Want: %v\n", got, expect) - } - packetsReceived++ - } - c.Stop() - close(done) - }() - - // Start the RTP server. - go func() { - <-clientReady - cAddr, err := net.ResolveUDPAddr("udp", clientAddr) - if err != nil { - serverErr <- fmt.Errorf("could not resolve server address, failed with err: %v\n", err) - } - - conn, err := net.DialUDP("udp", nil, cAddr) - if err != nil { - serverErr <- fmt.Errorf("could not dial udp, failed with err: %v\n", err) - } - - // Send packets to the client. - for i := 0; i < packetsToSend; i++ { - p := (&Pkt{V: rtpVer, Payload: []byte{byte(i)}}).Bytes(nil) - _, err := conn.Write(p) - if err != nil { - serverErr <- fmt.Errorf("could not write packet to conn, failed with err: %v\n", err) - } - } - }() - - <-clientReady - loop: - for { - select { - case err := <-c.Err(): - t.Log(err) - case err := <-testErr: - t.Fatal(err) - case err := <-serverErr: - t.Fatal(err) - case <-done: - break loop + // Read packets using the client and check them with expected. + var packetsReceived int + buf := make([]byte, 4096) + for packetsReceived != packetsToSend { + n, err := c.Read(buf) + switch err { + case nil: + case io.EOF: + continue default: + testErr <- fmt.Errorf("unexpected error from c.Read: %v\n", err) } + + // Create expected data and apply operation if there is one. + expect := (&Pkt{V: rtpVer, Payload: []byte{byte(packetsReceived)}}).Bytes(nil) + + // Compare. + got := buf[:n] + if !bytes.Equal(got, expect) { + testErr <- fmt.Errorf("did not get expected result. \nGot: %v\n Want: %v\n", got, expect) + } + packetsReceived++ + } + c.Stop() + close(done) + }() + + // Start the RTP server. + go func() { + <-clientReady + cAddr, err := net.ResolveUDPAddr("udp", clientAddr) + if err != nil { + serverErr <- fmt.Errorf("could not resolve server address, failed with err: %v\n", err) + } + + conn, err := net.DialUDP("udp", nil, cAddr) + if err != nil { + serverErr <- fmt.Errorf("could not dial udp, failed with err: %v\n", err) + } + + // Send packets to the client. + for i := 0; i < packetsToSend; i++ { + p := (&Pkt{V: rtpVer, Payload: []byte{byte(i)}}).Bytes(nil) + _, err := conn.Write(p) + if err != nil { + serverErr <- fmt.Errorf("could not write packet to conn, failed with err: %v\n", err) + } + } + }() + + <-clientReady +loop: + for { + select { + case err := <-c.Err(): + t.Log(err) + case err := <-testErr: + t.Fatal(err) + case err := <-serverErr: + t.Fatal(err) + case <-done: + break loop + default: } } }