From ccc08bfad105b04cb3363472cbaf90082a2b5860 Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 19 Apr 2019 18:17:06 +0930 Subject: [PATCH 01/18] protoocl/rtp: added client.go file Added client.go which contains a struct to describe an RTP client. It provides a method, Start, which will invoke a recv routine to start receiving packets and process them using an op function passed on the Client's creation. Client implements io.Reader, so that the client may be read from. --- protocol/rtp/client.go | 124 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 protocol/rtp/client.go diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go new file mode 100644 index 00000000..4140abc1 --- /dev/null +++ b/protocol/rtp/client.go @@ -0,0 +1,124 @@ +/* +NAME + client.go + +DESCRIPTION + client.go provides an RTP client that will receive RTP on a UDP connection + +AUTHOR + Saxon A. Nelson-Milton + +LICENSE + rtp.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package rtp + +import ( + "io" + "net" + "sync" + + "bitbucket.org/ausocean/utils/ring" +) + +const ( + chanSize = 10 +) + +type Client struct { + conn *net.UDPConn + wg sync.WaitGroup + done chan struct{} + ring *ring.Buffer + op func([]byte) ([]byte, error) + ErrChan chan error +} + +func NewClient(addr string, op func([]byte) ([]byte, error)) (*Client, error) { + c := &Client{ + done: make(chan struct{}, 10), + ring: ring.NewBuffer(10, 4096, 0), + } + + a, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + return nil, err + } + + c.conn, err = net.ListenUDP("udp", a) + if err != nil { + return nil, err + } + + return c, nil +} + +func (c *Client) Start() { + c.wg.Add(1) + go c.recv() +} + +func (c *Client) recv() { + defer c.wg.Done() + buf := make([]byte, 4096) + for { + select { + case <-c.done: + return + default: + n, _, err := c.conn.ReadFromUDP(buf) + if err != nil { + c.ErrChan <- err + continue + } + var _buf []byte + switch c.op { + case nil: + _buf = buf[:n] + default: + _buf, err = c.op(buf[:n]) + if err != nil { + c.ErrChan <- err + continue + } + } + _, err = c.ring.Write(_buf) + c.ring.Flush() + if err != nil { + c.ErrChan <- err + continue + } + } + } +} + +func (c *Client) Stop() { + close(c.done) + c.conn.Close() + c.wg.Wait() +} + +func (c *Client) Read(p []byte) (int, error) { + chunk, err := c.ring.Next(0) + if err != nil { + return 0, io.EOF + } + n := copy(p, chunk.Bytes()) + chunk.Close() + chunk = nil + return n, nil +} From b302eafa6809b488541b201325eb9e3cd54ab21c Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 19 Apr 2019 18:22:43 +0930 Subject: [PATCH 02/18] protocol/rtp: setting Client's op field in constructor --- protocol/rtp/client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index 4140abc1..22dd3d71 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -52,6 +52,7 @@ func NewClient(addr string, op func([]byte) ([]byte, error)) (*Client, error) { c := &Client{ done: make(chan struct{}, 10), ring: ring.NewBuffer(10, 4096, 0), + op: op, } a, err := net.ResolveUDPAddr("udp", addr) From 190d546c589790c74354c599432998762378c193 Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 19 Apr 2019 18:40:15 +0930 Subject: [PATCH 03/18] protocol/rtp: commented file --- protocol/rtp/client.go | 40 ++++++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index 22dd3d71..dcdf7e08 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -3,13 +3,15 @@ NAME client.go DESCRIPTION - client.go provides an RTP client that will receive RTP on a UDP connection + client.go provides an RTP client, Client, that will receive RTP on a UDP + connection, perform an operation on the packets and then store in a ringBuffer + that can be read from Read, an implemntation of io.Reader. AUTHOR Saxon A. Nelson-Milton LICENSE - rtp.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean) + This is Copyright (C) 2019 the Australian Ocean Lab (AusOcean). It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the @@ -19,10 +21,10 @@ LICENSE It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. + for more details. You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). + in gpl.txt. If not, see http://www.gnu.org/licenses. */ package rtp @@ -39,15 +41,24 @@ const ( chanSize = 10 ) +// Client describes an RTP client that can receive an RTP stream and implements +// io.Reader. type Client struct { - conn *net.UDPConn - wg sync.WaitGroup - done chan struct{} - ring *ring.Buffer - op func([]byte) ([]byte, error) - ErrChan chan error + conn *net.UDPConn // The UDP connection RTP packets will be read from. + wg sync.WaitGroup // Used to wait for recv routine to finish. + done chan struct{} // Used to terminate the recv routine. + ring *ring.Buffer // Processed data from RTP packets will be stored here. + op func([]byte) ([]byte, error) // The operation to perform on received RTP packets before storing. + ErrChan chan error // Errors encountered during recv will be sent to this chan. } +// NewClient returns a pointer to a new Client. +// +// addr is the address of form : that we expect to receive +// RTP at. op is a function, if non nil, that will be used to perform an +// operation on each received RTP packet. For example, the op func may parse +// out the RTP payload. The result of the operation is then what is stored +// in the ringBuffer for reading. func NewClient(addr string, op func([]byte) ([]byte, error)) (*Client, error) { c := &Client{ done: make(chan struct{}, 10), @@ -68,11 +79,15 @@ func NewClient(addr string, op func([]byte) ([]byte, error)) (*Client, error) { return c, nil } +// Start will start the recv routine. func (c *Client) Start() { c.wg.Add(1) go c.recv() } +// recv will read RTP packets from the UDP connection, perform the operation +// on them given at creation of the Client and then store the result in the +// ringBuffer for Reading. func (c *Client) recv() { defer c.wg.Done() buf := make([]byte, 4096) @@ -107,12 +122,17 @@ func (c *Client) recv() { } } +// Stop will send a done signal to the receive routine, and also close the +// connection. func (c *Client) Stop() { close(c.done) c.conn.Close() c.wg.Wait() } +// Read implements io.Reader. +// +// Read will get the next chunk from the ringBuffer and copy the bytes to p. func (c *Client) Read(p []byte) (int, error) { chunk, err := c.ring.Next(0) if err != nil { From 00db293b441d3f378588649e22f481662a181756 Mon Sep 17 00:00:00 2001 From: Saxon Date: Sat, 20 Apr 2019 21:05:28 +0930 Subject: [PATCH 04/18] protocol/rtp: consts for chan and ringbuffer sizes --- protocol/rtp/client.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index dcdf7e08..b6d80956 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -37,16 +37,23 @@ import ( "bitbucket.org/ausocean/utils/ring" ) +// Misc consts. const ( chanSize = 10 ) +// RingBuffer consts. +const ( + ringBufferSize = 10 + ringBufferElementSize = 4096 +) + // Client describes an RTP client that can receive an RTP stream and implements // io.Reader. type Client struct { conn *net.UDPConn // The UDP connection RTP packets will be read from. wg sync.WaitGroup // Used to wait for recv routine to finish. - done chan struct{} // Used to terminate the recv routine. + done chan struct{} // Used to terminate the recv routine. ring *ring.Buffer // Processed data from RTP packets will be stored here. op func([]byte) ([]byte, error) // The operation to perform on received RTP packets before storing. ErrChan chan error // Errors encountered during recv will be sent to this chan. @@ -61,8 +68,8 @@ type Client struct { // in the ringBuffer for reading. func NewClient(addr string, op func([]byte) ([]byte, error)) (*Client, error) { c := &Client{ - done: make(chan struct{}, 10), - ring: ring.NewBuffer(10, 4096, 0), + done: make(chan struct{}, chanSize), + ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0), op: op, } @@ -90,7 +97,7 @@ func (c *Client) Start() { // ringBuffer for Reading. func (c *Client) recv() { defer c.wg.Done() - buf := make([]byte, 4096) + buf := make([]byte, ringBufferElementSize) for { select { case <-c.done: @@ -101,6 +108,7 @@ func (c *Client) recv() { c.ErrChan <- err continue } + var _buf []byte switch c.op { case nil: @@ -112,6 +120,7 @@ func (c *Client) recv() { continue } } + _, err = c.ring.Write(_buf) c.ring.Flush() if err != nil { From f454a3485624e39a89271b9d575887008bbe3198 Mon Sep 17 00:00:00 2001 From: Saxon Date: Sat, 20 Apr 2019 21:10:03 +0930 Subject: [PATCH 05/18] protocol/rtp: added client_test.go for client testing --- protocol/rtp/client_test.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 protocol/rtp/client_test.go diff --git a/protocol/rtp/client_test.go b/protocol/rtp/client_test.go new file mode 100644 index 00000000..754f578a --- /dev/null +++ b/protocol/rtp/client_test.go @@ -0,0 +1,35 @@ +/* +NAME + client_test.go + +DESCRIPTION + client_test.go provides testing utilities to check RTP client functionality + provided in client.go. + +AUTHOR + Saxon A. Nelson-Milton + +LICENSE + This is Copyright (C) 2019 the Australian Ocean Lab (AusOcean). + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package rtp + +import "testing" + +func TestReceive(t *testing.T) { + +} From be76998c7df6fabc35a451fae102f205a0dd4232 Mon Sep 17 00:00:00 2001 From: Saxon Date: Sun, 21 Apr 2019 22:40:08 +0930 Subject: [PATCH 06/18] protocol/rtp: wrote TestReceiveNoOp Wrote test TestReceiveNoOP to check that client works correctly when we give no operation to perform on RTP packets before storing in the client ringBuffer, which calling io.Reader implementation Read will get packets form. --- protocol/rtp/client.go | 46 ++++++++++----- protocol/rtp/client_test.go | 109 +++++++++++++++++++++++++++++++++++- 2 files changed, 139 insertions(+), 16 deletions(-) diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index b6d80956..00682308 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -33,13 +33,16 @@ import ( "io" "net" "sync" + "time" + "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" ) // Misc consts. const ( chanSize = 10 + pkg = "rtp: " ) // RingBuffer consts. @@ -48,6 +51,8 @@ const ( ringBufferElementSize = 4096 ) +type log func(lvl int8, msg string, args ...interface{}) + // Client describes an RTP client that can receive an RTP stream and implements // io.Reader. type Client struct { @@ -57,6 +62,8 @@ type Client struct { ring *ring.Buffer // Processed data from RTP packets will be stored here. op func([]byte) ([]byte, error) // The operation to perform on received RTP packets before storing. ErrChan chan error // Errors encountered during recv will be sent to this chan. + rt time.Duration // Read timeout used when reading from the ringbuffer. + log } // NewClient returns a pointer to a new Client. @@ -65,12 +72,17 @@ type Client struct { // RTP at. op is a function, if non nil, that will be used to perform an // operation on each received RTP packet. For example, the op func may parse // out the RTP payload. The result of the operation is then what is stored -// in the ringBuffer for reading. -func NewClient(addr string, op func([]byte) ([]byte, error)) (*Client, error) { +// in the ringBuffer for reading. l is a logging function defined by the +// signuture of the log type defined above. rt is the read timeout used when +// reading from the client ringbuffer. +func NewClient(addr string, op func([]byte) ([]byte, error), l log, rt time.Duration) (*Client, error) { c := &Client{ - done: make(chan struct{}, chanSize), - ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0), - op: op, + done: make(chan struct{}, chanSize), + ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0), + op: op, + log: l, + ErrChan: make(chan error), + rt: rt, } a, err := net.ResolveUDPAddr("udp", addr) @@ -88,10 +100,20 @@ func NewClient(addr string, op func([]byte) ([]byte, error)) (*Client, error) { // Start will start the recv routine. func (c *Client) Start() { + c.log(logger.Info, pkg+"starting client") c.wg.Add(1) go c.recv() } +// Stop will send a done signal to the receive routine, and also close the +// connection. +func (c *Client) Stop() { + c.log(logger.Info, pkg+"stopping client") + c.conn.Close() + close(c.done) + c.wg.Wait() +} + // recv will read RTP packets from the UDP connection, perform the operation // on them given at creation of the Client and then store the result in the // ringBuffer for Reading. @@ -101,13 +123,16 @@ func (c *Client) recv() { for { select { case <-c.done: + c.log(logger.Debug, pkg+"done signal received") return default: + c.log(logger.Debug, pkg+"waiting for packet") n, _, err := c.conn.ReadFromUDP(buf) if err != nil { c.ErrChan <- err continue } + c.log(logger.Debug, pkg+"packet received", "packet", buf[:n]) var _buf []byte switch c.op { @@ -131,19 +156,12 @@ func (c *Client) recv() { } } -// Stop will send a done signal to the receive routine, and also close the -// connection. -func (c *Client) Stop() { - close(c.done) - c.conn.Close() - c.wg.Wait() -} - // Read implements io.Reader. // // Read will get the next chunk from the ringBuffer and copy the bytes to p. func (c *Client) Read(p []byte) (int, error) { - chunk, err := c.ring.Next(0) + c.log(logger.Debug, pkg+"user reading packet") + chunk, err := c.ring.Next(c.rt) if err != nil { return 0, io.EOF } diff --git a/protocol/rtp/client_test.go b/protocol/rtp/client_test.go index 754f578a..d3f3fcb6 100644 --- a/protocol/rtp/client_test.go +++ b/protocol/rtp/client_test.go @@ -28,8 +28,113 @@ LICENSE package rtp -import "testing" +import ( + "bytes" + "io" + "net" + "testing" + "time" -func TestReceive(t *testing.T) { + "bitbucket.org/ausocean/utils/logger" +) + +// dummyLogger will allow logging to be done by the testing pkg. +type dummyLogger testing.T + +func (dl *dummyLogger) log(lvl int8, msg string, args ...interface{}) { + var l string + switch lvl { + case logger.Warning: + l = "warning" + case logger.Debug: + l = "debug" + case logger.Info: + l = "info" + case logger.Error: + l = "error" + case logger.Fatal: + l = "fatal" + } + msg = l + ": " + msg + for i := 0; i < len(args); i++ { + msg += " %v" + } + if len(args) == 0 { + dl.Log(msg + "\n") + return + } + dl.Logf(msg+"\n", args...) +} + +// TestReceiveNoOp will check that we can successfully use a Client to receive +// RTP using no operation. +func TestReceiveNoOp(t *testing.T) { + const clientAddr = "localhost:8000" + const packetsToSend = 20 + + // Create new client; note that op function set to nil, i.e. we don't want to + // perform any operation on packet before storing in ringbuffer. + c, err := NewClient(clientAddr, nil, (*dummyLogger)(t).log, 1*time.Millisecond) + if err != nil { + t.Fatalf("could not create client, failed with error: %v", err) + } + c.Start() + + // Log any errors from client. + go func() { + for { + err := <-c.ErrChan + t.Logf("unexpected error from client: %v", err) + } + }() + + // Start the RTP 'server'. + go func() { + cAddr, err := net.ResolveUDPAddr("udp", clientAddr) + if err != nil { + t.Fatalf("could not resolve server address, failed with err: %v", err) + } + + conn, err := net.DialUDP("udp", nil, cAddr) + if err != nil { + t.Fatalf("could not dial udp, failed with err: %v", err) + } + + // Send packets to the client. Packet payload will just be the packet number. + for i := 0; i < packetsToSend; i++ { + p := (&Pkt{V: rtpVer, Payload: []byte{byte(i)}}).Bytes(nil) + _, err := conn.Write(p) + if err != nil { + t.Errorf("could not write packet to conn, failed with err: %v", err) + } + } + }() + + // Read packets using the client and check them with expected. + var packetsReceived int + buf := make([]byte, 4096) + for { + if packetsReceived == packetsToSend { + break + } + n, err := c.Read(buf) + switch err { + case nil: + case io.EOF: + continue + default: + t.Fatalf("unexpected error: %v", err) + } + expect := (&Pkt{V: rtpVer, Payload: []byte{byte(packetsReceived)}}).Bytes(nil) + got := buf[:n] + if !bytes.Equal(got, expect) { + t.Errorf("did not get expected result. \nGot: %v\n Want: %v\n", got, expect) + } + packetsReceived++ + } + c.Stop() +} + +func TestReceiveOp(t *testing.T) { } From a0c324a8137a0e3264d2eb90056a26f2b4f57b8d Mon Sep 17 00:00:00 2001 From: Saxon Date: Sun, 21 Apr 2019 23:07:12 +0930 Subject: [PATCH 07/18] protocol/rtp: simplified test Simplified clien_test.go file by testing two different RTP packet operations using loop. We now first test no operation, and then test a rtp.Payload operation, which gets the payload of the packets and stores them in the client ringbuffer for the user of the client to read. --- protocol/rtp/client.go | 2 +- protocol/rtp/client_test.go | 135 +++++++++++++++++++----------------- 2 files changed, 73 insertions(+), 64 deletions(-) diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index 00682308..daab5fee 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -160,7 +160,7 @@ func (c *Client) recv() { // // Read will get the next chunk from the ringBuffer and copy the bytes to p. func (c *Client) Read(p []byte) (int, error) { - c.log(logger.Debug, pkg+"user reading packet") + c.log(logger.Debug, pkg+"user reading data") chunk, err := c.ring.Next(c.rt) if err != nil { return 0, io.EOF diff --git a/protocol/rtp/client_test.go b/protocol/rtp/client_test.go index d3f3fcb6..23753c64 100644 --- a/protocol/rtp/client_test.go +++ b/protocol/rtp/client_test.go @@ -66,75 +66,84 @@ func (dl *dummyLogger) log(lvl int8, msg string, args ...interface{}) { dl.Logf(msg+"\n", args...) } -// TestReceiveNoOp will check that we can successfully use a Client to receive -// RTP using no operation. -func TestReceiveNoOp(t *testing.T) { - const clientAddr = "localhost:8000" - const packetsToSend = 20 +// TestReceive checks that the Client can correctly receive RTP packets and +// perform a specificed operation on the packets before storing in the ringBuffer. +func TestReceive(t *testing.T) { + const ( + clientAddr = "localhost:8000" + packetsToSend = 20 + ) - // Create new client; note that op function set to nil, i.e. we don't want to - // perform any operation on packet before storing in ringbuffer. - c, err := NewClient(clientAddr, nil, (*dummyLogger)(t).log, 1*time.Millisecond) - if err != nil { - t.Fatalf("could not create client, failed with error: %v", err) - } - c.Start() - - // Log any errors from client. - go func() { - for { - err := <-c.ErrChan - t.Logf("unexpected error from client: %v", err) - } - }() - - // Start the RTP 'server'. - go func() { - cAddr, err := net.ResolveUDPAddr("udp", clientAddr) + for i, op := range []func([]byte) ([]byte, error){nil, Payload} { + t.Logf("running op: %v", i) + // Create new client and give current operation we are testing. + c, err := NewClient(clientAddr, op, (*dummyLogger)(t).log, 1*time.Millisecond) if err != nil { - t.Fatalf("could not resolve server address, failed with err: %v", err) + t.Fatalf("could not create client, failed with error: %v", err) } + c.Start() - conn, err := net.DialUDP("udp", nil, cAddr) - if err != nil { - t.Fatalf("could not dial udp, failed with err: %v", err) - } - - // Send packets to the client. Packet payload will just be the packet number. - for i := 0; i < packetsToSend; i++ { - p := (&Pkt{V: rtpVer, Payload: []byte{byte(i)}}).Bytes(nil) - _, err := conn.Write(p) - if err != nil { - t.Errorf("could not write packet to conn, failed with err: %v", err) + // Log any errors from client. + go func() { + for { + err := <-c.ErrChan + t.Logf("unexpected error from client: %v", err) } - } - }() + }() - // Read packets using the client and check them with expected. - var packetsReceived int - buf := make([]byte, 4096) - for { - if packetsReceived == packetsToSend { - break + // Start the RTP 'server'. + go func() { + cAddr, err := net.ResolveUDPAddr("udp", clientAddr) + if err != nil { + t.Fatalf("could not resolve server address, failed with err: %v", err) + } + + conn, err := net.DialUDP("udp", nil, cAddr) + if err != nil { + t.Fatalf("could not dial udp, failed with err: %v", err) + } + + // Send packets to the client. + for i := 0; i < packetsToSend; i++ { + p := (&Pkt{V: rtpVer, Payload: []byte{byte(i)}}).Bytes(nil) + _, err := conn.Write(p) + if err != nil { + t.Errorf("could not write packet to conn, failed with err: %v", err) + } + } + }() + + // Read packets using the client and check them with expected. + var packetsReceived int + buf := make([]byte, 4096) + for { + if packetsReceived == packetsToSend { + break + } + + n, err := c.Read(buf) + switch err { + case nil: + case io.EOF: + continue + default: + t.Fatalf("unexpected error: %v", err) + } + + expect := (&Pkt{V: rtpVer, Payload: []byte{byte(packetsReceived)}}).Bytes(nil) + if op != nil { + expect, err = op(expect) + if err != nil { + t.Fatalf("unexpected error when applying op: %v", err) + } + } + + got := buf[:n] + if !bytes.Equal(got, expect) { + t.Errorf("did not get expected result. \nGot: %v\n Want: %v\n", got, expect) + } + packetsReceived++ } - n, err := c.Read(buf) - switch err { - case nil: - case io.EOF: - continue - default: - t.Fatalf("unexpected error: %v", err) - } - expect := (&Pkt{V: rtpVer, Payload: []byte{byte(packetsReceived)}}).Bytes(nil) - got := buf[:n] - if !bytes.Equal(got, expect) { - t.Errorf("did not get expected result. \nGot: %v\n Want: %v\n", got, expect) - } - packetsReceived++ + c.Stop() } - c.Stop() -} - -func TestReceiveOp(t *testing.T) { - } From 49a401681ddaf4da9e3e0639014849e82fc07a65 Mon Sep 17 00:00:00 2001 From: Saxon Date: Tue, 30 Apr 2019 02:37:18 +0930 Subject: [PATCH 08/18] protocol/rtp: Client.ErrChan => Client.err and wrote accessor function Client.Err() to access this chan as only receive --- protocol/rtp/client.go | 36 ++++++++++++++++++++---------------- protocol/rtp/client_test.go | 2 +- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index daab5fee..fb5e0b13 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -56,13 +56,13 @@ type log func(lvl int8, msg string, args ...interface{}) // Client describes an RTP client that can receive an RTP stream and implements // io.Reader. type Client struct { - conn *net.UDPConn // The UDP connection RTP packets will be read from. - wg sync.WaitGroup // Used to wait for recv routine to finish. - done chan struct{} // Used to terminate the recv routine. - ring *ring.Buffer // Processed data from RTP packets will be stored here. - op func([]byte) ([]byte, error) // The operation to perform on received RTP packets before storing. - ErrChan chan error // Errors encountered during recv will be sent to this chan. - rt time.Duration // Read timeout used when reading from the ringbuffer. + conn *net.UDPConn // The UDP connection RTP packets will be read from. + wg sync.WaitGroup // Used to wait for recv routine to finish. + done chan struct{} // Used to terminate the recv routine. + ring *ring.Buffer // Processed data from RTP packets will be stored here. + op func([]byte) ([]byte, error) // The operation to perform on received RTP packets before storing. + err chan error // Errors encountered during recv will be sent to this chan. + rt time.Duration // Read timeout used when reading from the ringbuffer. log } @@ -77,12 +77,12 @@ type Client struct { // reading from the client ringbuffer. func NewClient(addr string, op func([]byte) ([]byte, error), l log, rt time.Duration) (*Client, error) { c := &Client{ - done: make(chan struct{}, chanSize), - ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0), - op: op, - log: l, - ErrChan: make(chan error), - rt: rt, + done: make(chan struct{}, chanSize), + ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0), + op: op, + log: l, + err: make(chan error), + rt: rt, } a, err := net.ResolveUDPAddr("udp", addr) @@ -114,6 +114,10 @@ func (c *Client) Stop() { c.wg.Wait() } +func (c *Client) Err() <-chan error { + return c.err +} + // recv will read RTP packets from the UDP connection, perform the operation // on them given at creation of the Client and then store the result in the // ringBuffer for Reading. @@ -129,7 +133,7 @@ func (c *Client) recv() { c.log(logger.Debug, pkg+"waiting for packet") n, _, err := c.conn.ReadFromUDP(buf) if err != nil { - c.ErrChan <- err + c.err <- err continue } c.log(logger.Debug, pkg+"packet received", "packet", buf[:n]) @@ -141,7 +145,7 @@ func (c *Client) recv() { default: _buf, err = c.op(buf[:n]) if err != nil { - c.ErrChan <- err + c.err <- err continue } } @@ -149,7 +153,7 @@ func (c *Client) recv() { _, err = c.ring.Write(_buf) c.ring.Flush() if err != nil { - c.ErrChan <- err + c.err <- err continue } } diff --git a/protocol/rtp/client_test.go b/protocol/rtp/client_test.go index 23753c64..1775d2f4 100644 --- a/protocol/rtp/client_test.go +++ b/protocol/rtp/client_test.go @@ -86,7 +86,7 @@ func TestReceive(t *testing.T) { // Log any errors from client. go func() { for { - err := <-c.ErrChan + err := <-c.Err() t.Logf("unexpected error from client: %v", err) } }() From 6694cab9565d70687d350f94000ae05eaca7bd0f Mon Sep 17 00:00:00 2001 From: Saxon Date: Tue, 30 Apr 2019 02:38:38 +0930 Subject: [PATCH 09/18] protocol/rtp: commented Client.Err() --- protocol/rtp/client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index fb5e0b13..95cbe128 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -114,6 +114,7 @@ func (c *Client) Stop() { c.wg.Wait() } +// Err returns the client err channel as receive only. func (c *Client) Err() <-chan error { return c.err } From d358f70585b6596ff8e221781e887f10de5fa5d1 Mon Sep 17 00:00:00 2001 From: Saxon Date: Tue, 30 Apr 2019 09:39:41 +0930 Subject: [PATCH 10/18] protocol/rtp: made client_test.go more robust --- protocol/rtp/client_test.go | 107 +++++++++++++++++++++--------------- 1 file changed, 63 insertions(+), 44 deletions(-) diff --git a/protocol/rtp/client_test.go b/protocol/rtp/client_test.go index 1775d2f4..5ed0d2e0 100644 --- a/protocol/rtp/client_test.go +++ b/protocol/rtp/client_test.go @@ -30,6 +30,7 @@ package rtp import ( "bytes" + "fmt" "io" "net" "testing" @@ -74,33 +75,68 @@ func TestReceive(t *testing.T) { packetsToSend = 20 ) - for i, op := range []func([]byte) ([]byte, error){nil, Payload} { - t.Logf("running op: %v", i) - // Create new client and give current operation we are testing. - c, err := NewClient(clientAddr, op, (*dummyLogger)(t).log, 1*time.Millisecond) - if err != nil { - t.Fatalf("could not create client, failed with error: %v", err) - } - c.Start() + for _, op := range []func([]byte) ([]byte, error){nil, Payload} { + testErr := make(chan error) + serverErr := make(chan error) + done := make(chan struct{}) + clientReady := make(chan struct{}) + var c *Client - // Log any errors from client. + // Start routine to read from client. go func() { - for { - err := <-c.Err() - t.Logf("unexpected error from client: %v", err) + // Create and start the client. + var err error + c, err = NewClient(clientAddr, op, (*dummyLogger)(t).log, 1*time.Millisecond) + if err != nil { + testErr <- fmt.Errorf("could not create client, failed with error: %v\n", err) } + c.Start() + close(clientReady) + + // Read packets using the client and check them with expected. + var packetsReceived int + buf := make([]byte, 4096) + for packetsReceived != packetsToSend { + n, err := c.Read(buf) + switch err { + case nil: + case io.EOF: + continue + default: + testErr <- fmt.Errorf("unexpected error from c.Read: %v\n", err) + } + + // Create expected data and apply operation if there is one. + expect := (&Pkt{V: rtpVer, Payload: []byte{byte(packetsReceived)}}).Bytes(nil) + if op != nil { + expect, err = op(expect) + if err != nil { + testErr <- fmt.Errorf("unexpected error when applying op: %v\n", err) + } + } + + // Compare. + got := buf[:n] + if !bytes.Equal(got, expect) { + testErr <- fmt.Errorf("did not get expected result. \nGot: %v\n Want: %v\n", got, expect) + } + packetsReceived++ + } + c.Stop() + close(done) }() - // Start the RTP 'server'. + // Start the RTP server. go func() { + <-clientReady cAddr, err := net.ResolveUDPAddr("udp", clientAddr) if err != nil { - t.Fatalf("could not resolve server address, failed with err: %v", err) + serverErr <- fmt.Errorf("could not resolve server address, failed with err: %v\n", err) } conn, err := net.DialUDP("udp", nil, cAddr) if err != nil { - t.Fatalf("could not dial udp, failed with err: %v", err) + serverErr <- fmt.Errorf("could not dial udp, failed with err: %v\n", err) } // Send packets to the client. @@ -108,42 +144,25 @@ func TestReceive(t *testing.T) { p := (&Pkt{V: rtpVer, Payload: []byte{byte(i)}}).Bytes(nil) _, err := conn.Write(p) if err != nil { - t.Errorf("could not write packet to conn, failed with err: %v", err) + serverErr <- fmt.Errorf("could not write packet to conn, failed with err: %v\n", err) } } }() - // Read packets using the client and check them with expected. - var packetsReceived int - buf := make([]byte, 4096) + <-clientReady + loop: for { - if packetsReceived == packetsToSend { - break - } - - n, err := c.Read(buf) - switch err { - case nil: - case io.EOF: - continue + select { + case err := <-c.Err(): + t.Log(err) + case err := <-testErr: + t.Fatal(err) + case err := <-serverErr: + t.Fatal(err) + case <-done: + break loop default: - t.Fatalf("unexpected error: %v", err) } - - expect := (&Pkt{V: rtpVer, Payload: []byte{byte(packetsReceived)}}).Bytes(nil) - if op != nil { - expect, err = op(expect) - if err != nil { - t.Fatalf("unexpected error when applying op: %v", err) - } - } - - got := buf[:n] - if !bytes.Equal(got, expect) { - t.Errorf("did not get expected result. \nGot: %v\n Want: %v\n", got, expect) - } - packetsReceived++ } - c.Stop() } } From 80a7d41d8a9ca6f16334887bd6ba80d191388e5a Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 1 May 2019 14:15:39 +0930 Subject: [PATCH 11/18] protocol/rtp: removed op from Client i.e. what is read from Client are RTP packets. --- protocol/rtp/client.go | 32 ++------ protocol/rtp/client_test.go | 160 +++++++++++++++++------------------- 2 files changed, 85 insertions(+), 107 deletions(-) diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index 95cbe128..0635177d 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -47,7 +47,7 @@ const ( // RingBuffer consts. const ( - ringBufferSize = 10 + ringBufferSize = 100 ringBufferElementSize = 4096 ) @@ -56,13 +56,12 @@ type log func(lvl int8, msg string, args ...interface{}) // Client describes an RTP client that can receive an RTP stream and implements // io.Reader. type Client struct { - conn *net.UDPConn // The UDP connection RTP packets will be read from. - wg sync.WaitGroup // Used to wait for recv routine to finish. - done chan struct{} // Used to terminate the recv routine. - ring *ring.Buffer // Processed data from RTP packets will be stored here. - op func([]byte) ([]byte, error) // The operation to perform on received RTP packets before storing. - err chan error // Errors encountered during recv will be sent to this chan. - rt time.Duration // Read timeout used when reading from the ringbuffer. + conn *net.UDPConn // The UDP connection RTP packets will be read from. + wg sync.WaitGroup // Used to wait for recv routine to finish. + done chan struct{} // Used to terminate the recv routine. + ring *ring.Buffer // Processed data from RTP packets will be stored here. + err chan error // Errors encountered during recv will be sent to this chan. + rt time.Duration // Read timeout used when reading from the ringbuffer. log } @@ -75,11 +74,10 @@ type Client struct { // in the ringBuffer for reading. l is a logging function defined by the // signuture of the log type defined above. rt is the read timeout used when // reading from the client ringbuffer. -func NewClient(addr string, op func([]byte) ([]byte, error), l log, rt time.Duration) (*Client, error) { +func NewClient(addr string, l log, rt time.Duration) (*Client, error) { c := &Client{ done: make(chan struct{}, chanSize), ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0), - op: op, log: l, err: make(chan error), rt: rt, @@ -139,19 +137,7 @@ func (c *Client) recv() { } c.log(logger.Debug, pkg+"packet received", "packet", buf[:n]) - var _buf []byte - switch c.op { - case nil: - _buf = buf[:n] - default: - _buf, err = c.op(buf[:n]) - if err != nil { - c.err <- err - continue - } - } - - _, err = c.ring.Write(_buf) + _, err = c.ring.Write(buf[:n]) c.ring.Flush() if err != nil { c.err <- err diff --git a/protocol/rtp/client_test.go b/protocol/rtp/client_test.go index 5ed0d2e0..69b8169b 100644 --- a/protocol/rtp/client_test.go +++ b/protocol/rtp/client_test.go @@ -75,94 +75,86 @@ func TestReceive(t *testing.T) { packetsToSend = 20 ) - for _, op := range []func([]byte) ([]byte, error){nil, Payload} { - testErr := make(chan error) - serverErr := make(chan error) - done := make(chan struct{}) - clientReady := make(chan struct{}) - var c *Client + testErr := make(chan error) + serverErr := make(chan error) + done := make(chan struct{}) + clientReady := make(chan struct{}) + var c *Client - // Start routine to read from client. - go func() { - // Create and start the client. - var err error - c, err = NewClient(clientAddr, op, (*dummyLogger)(t).log, 1*time.Millisecond) - if err != nil { - testErr <- fmt.Errorf("could not create client, failed with error: %v\n", err) - } - c.Start() - close(clientReady) + // Start routine to read from client. + go func() { + // Create and start the client. + var err error + c, err = NewClient(clientAddr, (*dummyLogger)(t).log, 1*time.Millisecond) + if err != nil { + testErr <- fmt.Errorf("could not create client, failed with error: %v\n", err) + } + c.Start() + close(clientReady) - // Read packets using the client and check them with expected. - var packetsReceived int - buf := make([]byte, 4096) - for packetsReceived != packetsToSend { - n, err := c.Read(buf) - switch err { - case nil: - case io.EOF: - continue - default: - testErr <- fmt.Errorf("unexpected error from c.Read: %v\n", err) - } - - // Create expected data and apply operation if there is one. - expect := (&Pkt{V: rtpVer, Payload: []byte{byte(packetsReceived)}}).Bytes(nil) - if op != nil { - expect, err = op(expect) - if err != nil { - testErr <- fmt.Errorf("unexpected error when applying op: %v\n", err) - } - } - - // Compare. - got := buf[:n] - if !bytes.Equal(got, expect) { - testErr <- fmt.Errorf("did not get expected result. \nGot: %v\n Want: %v\n", got, expect) - } - packetsReceived++ - } - c.Stop() - close(done) - }() - - // Start the RTP server. - go func() { - <-clientReady - cAddr, err := net.ResolveUDPAddr("udp", clientAddr) - if err != nil { - serverErr <- fmt.Errorf("could not resolve server address, failed with err: %v\n", err) - } - - conn, err := net.DialUDP("udp", nil, cAddr) - if err != nil { - serverErr <- fmt.Errorf("could not dial udp, failed with err: %v\n", err) - } - - // Send packets to the client. - for i := 0; i < packetsToSend; i++ { - p := (&Pkt{V: rtpVer, Payload: []byte{byte(i)}}).Bytes(nil) - _, err := conn.Write(p) - if err != nil { - serverErr <- fmt.Errorf("could not write packet to conn, failed with err: %v\n", err) - } - } - }() - - <-clientReady - loop: - for { - select { - case err := <-c.Err(): - t.Log(err) - case err := <-testErr: - t.Fatal(err) - case err := <-serverErr: - t.Fatal(err) - case <-done: - break loop + // Read packets using the client and check them with expected. + var packetsReceived int + buf := make([]byte, 4096) + for packetsReceived != packetsToSend { + n, err := c.Read(buf) + switch err { + case nil: + case io.EOF: + continue default: + testErr <- fmt.Errorf("unexpected error from c.Read: %v\n", err) } + + // Create expected data and apply operation if there is one. + expect := (&Pkt{V: rtpVer, Payload: []byte{byte(packetsReceived)}}).Bytes(nil) + + // Compare. + got := buf[:n] + if !bytes.Equal(got, expect) { + testErr <- fmt.Errorf("did not get expected result. \nGot: %v\n Want: %v\n", got, expect) + } + packetsReceived++ + } + c.Stop() + close(done) + }() + + // Start the RTP server. + go func() { + <-clientReady + cAddr, err := net.ResolveUDPAddr("udp", clientAddr) + if err != nil { + serverErr <- fmt.Errorf("could not resolve server address, failed with err: %v\n", err) + } + + conn, err := net.DialUDP("udp", nil, cAddr) + if err != nil { + serverErr <- fmt.Errorf("could not dial udp, failed with err: %v\n", err) + } + + // Send packets to the client. + for i := 0; i < packetsToSend; i++ { + p := (&Pkt{V: rtpVer, Payload: []byte{byte(i)}}).Bytes(nil) + _, err := conn.Write(p) + if err != nil { + serverErr <- fmt.Errorf("could not write packet to conn, failed with err: %v\n", err) + } + } + }() + + <-clientReady +loop: + for { + select { + case err := <-c.Err(): + t.Log(err) + case err := <-testErr: + t.Fatal(err) + case err := <-serverErr: + t.Fatal(err) + case <-done: + break loop + default: } } } From 2f039e0c4b9d3f3d0dc6c0b59620cbc811f760db Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 3 May 2019 20:02:29 +0930 Subject: [PATCH 12/18] protoocl/rtp: removed comment for 'misc constants' --- protocol/rtp/client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index 0635177d..f69c2986 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -39,7 +39,6 @@ import ( "bitbucket.org/ausocean/utils/ring" ) -// Misc consts. const ( chanSize = 10 pkg = "rtp: " From c9aa43394bf3037730053e2fd87b816902c25d3f Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 3 May 2019 20:04:06 +0930 Subject: [PATCH 13/18] protocol/rtp/client.go: got rid of useless continue in recv dst write if error encountered --- protocol/rtp/client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index f69c2986..e8998917 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -140,7 +140,6 @@ func (c *Client) recv() { c.ring.Flush() if err != nil { c.err <- err - continue } } } From aa7553947a9065704d54661f3cf1277751072ae9 Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 8 May 2019 15:45:08 +0930 Subject: [PATCH 14/18] protocol/rtp/rtp.go: renamed Pkt type to Packet. --- protocol/rtp/client_test.go | 4 ++-- protocol/rtp/encoder.go | 2 +- protocol/rtp/parse_test.go | 14 +++++++------- protocol/rtp/rtp.go | 4 ++-- protocol/rtp/rtp_test.go | 10 +++++----- 5 files changed, 17 insertions(+), 17 deletions(-) diff --git a/protocol/rtp/client_test.go b/protocol/rtp/client_test.go index 69b8169b..7137ef4b 100644 --- a/protocol/rtp/client_test.go +++ b/protocol/rtp/client_test.go @@ -106,7 +106,7 @@ func TestReceive(t *testing.T) { } // Create expected data and apply operation if there is one. - expect := (&Pkt{V: rtpVer, Payload: []byte{byte(packetsReceived)}}).Bytes(nil) + expect := (&Packet{V: rtpVer, Payload: []byte{byte(packetsReceived)}}).Bytes(nil) // Compare. got := buf[:n] @@ -134,7 +134,7 @@ func TestReceive(t *testing.T) { // Send packets to the client. for i := 0; i < packetsToSend; i++ { - p := (&Pkt{V: rtpVer, Payload: []byte{byte(i)}}).Bytes(nil) + p := (&Packet{V: rtpVer, Payload: []byte{byte(i)}}).Bytes(nil) _, err := conn.Write(p) if err != nil { serverErr <- fmt.Errorf("could not write packet to conn, failed with err: %v\n", err) diff --git a/protocol/rtp/encoder.go b/protocol/rtp/encoder.go index 587f64c1..d74ea97c 100644 --- a/protocol/rtp/encoder.go +++ b/protocol/rtp/encoder.go @@ -97,7 +97,7 @@ func min(a, b int) int { // Encode takes a nalu unit and encodes it into an rtp packet and // writes to the io.Writer given in NewEncoder func (e *Encoder) Encode(payload []byte) error { - pkt := Pkt{ + pkt := Packet{ V: rtpVer, // version X: false, // header extension CC: 0, // CSRC count diff --git a/protocol/rtp/parse_test.go b/protocol/rtp/parse_test.go index f3468c57..1f046f68 100644 --- a/protocol/rtp/parse_test.go +++ b/protocol/rtp/parse_test.go @@ -35,7 +35,7 @@ import ( // TestVersion checks that we can correctly get the version from an RTP packet. func TestVersion(t *testing.T) { const expect = 1 - got := version((&Pkt{V: expect}).Bytes(nil)) + got := version((&Packet{V: expect}).Bytes(nil)) if got != expect { t.Errorf("unexpected version for RTP packet. Got: %v\n Want: %v\n", got, expect) } @@ -46,7 +46,7 @@ func TestVersion(t *testing.T) { func TestCsrcCount(t *testing.T) { const ver, expect = 2, 2 - pkt := (&Pkt{ + pkt := (&Packet{ V: ver, CC: expect, CSRC: make([][4]byte, expect), @@ -64,7 +64,7 @@ func TestHasExt(t *testing.T) { const ver = 2 // First check for when there is an extension field. - pkt := &Pkt{ + pkt := &Packet{ V: ver, X: true, Extension: ExtensionHeader{ @@ -93,19 +93,19 @@ func TestPayload(t *testing.T) { expect := []byte{0x01, 0x02, 0x03, 0x04, 0x05} testPkts := [][]byte{ - (&Pkt{ + (&Packet{ V: ver, Payload: expect, }).Bytes(nil), - (&Pkt{ + (&Packet{ V: ver, CC: 3, CSRC: make([][4]byte, 3), Payload: expect, }).Bytes(nil), - (&Pkt{ + (&Packet{ V: ver, X: true, Extension: ExtensionHeader{ @@ -115,7 +115,7 @@ func TestPayload(t *testing.T) { Payload: expect, }).Bytes(nil), - (&Pkt{ + (&Packet{ V: ver, CC: 3, CSRC: make([][4]byte, 3), diff --git a/protocol/rtp/rtp.go b/protocol/rtp/rtp.go index 73f6f15b..ba9ab8f5 100644 --- a/protocol/rtp/rtp.go +++ b/protocol/rtp/rtp.go @@ -46,7 +46,7 @@ const ( // Pkt provides fields consistent with RFC3550 definition of an rtp packet // The padding indicator does not need to be set manually, only the padding length -type Pkt struct { +type Packet struct { V uint8 // Version (currently 2). p bool // Padding indicator (0 => padding, 1 => padding). X bool // Extension header indicator. @@ -69,7 +69,7 @@ type ExtensionHeader struct { } // Bytes provides a byte slice of the packet -func (p *Pkt) Bytes(buf []byte) []byte { +func (p *Packet) Bytes(buf []byte) []byte { // Calculate the required length for the RTP packet. headerExtensionLen := 0 if p.X { diff --git a/protocol/rtp/rtp_test.go b/protocol/rtp/rtp_test.go index 2622fb81..438f6035 100644 --- a/protocol/rtp/rtp_test.go +++ b/protocol/rtp/rtp_test.go @@ -35,13 +35,13 @@ import ( // TODO (saxon): add more tests var rtpTests = []struct { num int - pkt Pkt + pkt Packet want []byte }{ // No padding, no CSRC and no extension. { num: 1, - pkt: Pkt{ + pkt: Packet{ V: 2, p: false, X: false, @@ -67,7 +67,7 @@ var rtpTests = []struct { // With padding. { num: 2, - pkt: Pkt{ + pkt: Packet{ V: 2, p: true, X: false, @@ -101,7 +101,7 @@ var rtpTests = []struct { // With padding and CSRC. { num: 3, - pkt: Pkt{ + pkt: Packet{ V: 2, p: true, X: false, @@ -141,7 +141,7 @@ var rtpTests = []struct { // With padding, CSRC and extension. { num: 4, - pkt: Pkt{ + pkt: Packet{ V: 2, p: true, X: true, From 7e96f5999ce62b5fbcd8b1717af304ca36d1bd5f Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 8 May 2019 15:47:11 +0930 Subject: [PATCH 15/18] protocol/rtp/client.go: updated comment for NewClient to remove mention of removed op argument --- protocol/rtp/client.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index e8998917..ff4dda24 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -67,12 +67,8 @@ type Client struct { // NewClient returns a pointer to a new Client. // // addr is the address of form : that we expect to receive -// RTP at. op is a function, if non nil, that will be used to perform an -// operation on each received RTP packet. For example, the op func may parse -// out the RTP payload. The result of the operation is then what is stored -// in the ringBuffer for reading. l is a logging function defined by the -// signuture of the log type defined above. rt is the read timeout used when -// reading from the client ringbuffer. +// RTP at. l is a logging function defined by the signuture of the log type +// defined above. rt is the read timeout used when reading from the client ringbuffer. func NewClient(addr string, l log, rt time.Duration) (*Client, error) { c := &Client{ done: make(chan struct{}, chanSize), From c48e681c414335033b063d0ef156c4d6988f7e8d Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 8 May 2019 16:54:02 +0930 Subject: [PATCH 16/18] protocol/rtp/client.go: removed buffering in client. Removed buffering in rtp client. This simplified things alot e.g. the recv routine has been removed, and therefore anything that was there to help with handling of the routine is also gone like the Start() and Stop() methods as well as signalling channels and waitgroups. The client is now just effectively a wrapper for a udp conn. --- protocol/rtp/client.go | 104 +++--------------------------------- protocol/rtp/client_test.go | 7 +-- 2 files changed, 7 insertions(+), 104 deletions(-) diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index ff4dda24..439e43c6 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -30,53 +30,21 @@ LICENSE package rtp import ( - "io" "net" - "sync" - "time" - - "bitbucket.org/ausocean/utils/logger" - "bitbucket.org/ausocean/utils/ring" ) -const ( - chanSize = 10 - pkg = "rtp: " -) - -// RingBuffer consts. -const ( - ringBufferSize = 100 - ringBufferElementSize = 4096 -) - -type log func(lvl int8, msg string, args ...interface{}) - // Client describes an RTP client that can receive an RTP stream and implements // io.Reader. type Client struct { - conn *net.UDPConn // The UDP connection RTP packets will be read from. - wg sync.WaitGroup // Used to wait for recv routine to finish. - done chan struct{} // Used to terminate the recv routine. - ring *ring.Buffer // Processed data from RTP packets will be stored here. - err chan error // Errors encountered during recv will be sent to this chan. - rt time.Duration // Read timeout used when reading from the ringbuffer. - log + conn *net.UDPConn } // NewClient returns a pointer to a new Client. // // addr is the address of form : that we expect to receive -// RTP at. l is a logging function defined by the signuture of the log type -// defined above. rt is the read timeout used when reading from the client ringbuffer. -func NewClient(addr string, l log, rt time.Duration) (*Client, error) { - c := &Client{ - done: make(chan struct{}, chanSize), - ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0), - log: l, - err: make(chan error), - rt: rt, - } +// RTP at. +func NewClient(addr string) (*Client, error) { + c := &Client{} a, err := net.ResolveUDPAddr("udp", addr) if err != nil { @@ -91,67 +59,7 @@ func NewClient(addr string, l log, rt time.Duration) (*Client, error) { return c, nil } -// Start will start the recv routine. -func (c *Client) Start() { - c.log(logger.Info, pkg+"starting client") - c.wg.Add(1) - go c.recv() -} - -// Stop will send a done signal to the receive routine, and also close the -// connection. -func (c *Client) Stop() { - c.log(logger.Info, pkg+"stopping client") - c.conn.Close() - close(c.done) - c.wg.Wait() -} - -// Err returns the client err channel as receive only. -func (c *Client) Err() <-chan error { - return c.err -} - -// recv will read RTP packets from the UDP connection, perform the operation -// on them given at creation of the Client and then store the result in the -// ringBuffer for Reading. -func (c *Client) recv() { - defer c.wg.Done() - buf := make([]byte, ringBufferElementSize) - for { - select { - case <-c.done: - c.log(logger.Debug, pkg+"done signal received") - return - default: - c.log(logger.Debug, pkg+"waiting for packet") - n, _, err := c.conn.ReadFromUDP(buf) - if err != nil { - c.err <- err - continue - } - c.log(logger.Debug, pkg+"packet received", "packet", buf[:n]) - - _, err = c.ring.Write(buf[:n]) - c.ring.Flush() - if err != nil { - c.err <- err - } - } - } -} - -// Read implements io.Reader. -// -// Read will get the next chunk from the ringBuffer and copy the bytes to p. +// Read implements io.Reader. This wraps the Read for the connection. func (c *Client) Read(p []byte) (int, error) { - c.log(logger.Debug, pkg+"user reading data") - chunk, err := c.ring.Next(c.rt) - if err != nil { - return 0, io.EOF - } - n := copy(p, chunk.Bytes()) - chunk.Close() - chunk = nil - return n, nil + return c.conn.Read(p) } diff --git a/protocol/rtp/client_test.go b/protocol/rtp/client_test.go index 7137ef4b..55fef74c 100644 --- a/protocol/rtp/client_test.go +++ b/protocol/rtp/client_test.go @@ -34,7 +34,6 @@ import ( "io" "net" "testing" - "time" "bitbucket.org/ausocean/utils/logger" ) @@ -85,11 +84,10 @@ func TestReceive(t *testing.T) { go func() { // Create and start the client. var err error - c, err = NewClient(clientAddr, (*dummyLogger)(t).log, 1*time.Millisecond) + c, err = NewClient(clientAddr) if err != nil { testErr <- fmt.Errorf("could not create client, failed with error: %v\n", err) } - c.Start() close(clientReady) // Read packets using the client and check them with expected. @@ -115,7 +113,6 @@ func TestReceive(t *testing.T) { } packetsReceived++ } - c.Stop() close(done) }() @@ -146,8 +143,6 @@ func TestReceive(t *testing.T) { loop: for { select { - case err := <-c.Err(): - t.Log(err) case err := <-testErr: t.Fatal(err) case err := <-serverErr: From 534a0bcecb6d17271b4a1c1e150b8b1eaed04cc8 Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 8 May 2019 16:57:07 +0930 Subject: [PATCH 17/18] protocol/rtp/client.go: updated file description in file header. --- protocol/rtp/client.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index 439e43c6..f47f9baf 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -3,9 +3,7 @@ NAME client.go DESCRIPTION - client.go provides an RTP client, Client, that will receive RTP on a UDP - connection, perform an operation on the packets and then store in a ringBuffer - that can be read from Read, an implemntation of io.Reader. + client.go provides an RTP client. AUTHOR Saxon A. Nelson-Milton From 3692ba772de161aaeb82a3e730249b322152b587 Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 8 May 2019 16:57:58 +0930 Subject: [PATCH 18/18] protocol/rtp/client.go: removed dummyLogger as not required anymore. --- protocol/rtp/client_test.go | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/protocol/rtp/client_test.go b/protocol/rtp/client_test.go index 55fef74c..39810fef 100644 --- a/protocol/rtp/client_test.go +++ b/protocol/rtp/client_test.go @@ -34,38 +34,8 @@ import ( "io" "net" "testing" - - "bitbucket.org/ausocean/utils/logger" ) -// dummyLogger will allow logging to be done by the testing pkg. -type dummyLogger testing.T - -func (dl *dummyLogger) log(lvl int8, msg string, args ...interface{}) { - var l string - switch lvl { - case logger.Warning: - l = "warning" - case logger.Debug: - l = "debug" - case logger.Info: - l = "info" - case logger.Error: - l = "error" - case logger.Fatal: - l = "fatal" - } - msg = l + ": " + msg - for i := 0; i < len(args); i++ { - msg += " %v" - } - if len(args) == 0 { - dl.Log(msg + "\n") - return - } - dl.Logf(msg+"\n", args...) -} - // TestReceive checks that the Client can correctly receive RTP packets and // perform a specificed operation on the packets before storing in the ringBuffer. func TestReceive(t *testing.T) {