protocol/rtp/client.go: removed buffering in client.

Removed buffering in rtp client. This simplified things alot e.g. the recv routine has been removed, and therefore anything that was there to help with handling of the routine is also gone
like the Start() and Stop() methods as well as signalling channels and waitgroups. The client is now just effectively a wrapper for a udp conn.
This commit is contained in:
Saxon 2019-05-08 16:54:02 +09:30
parent 7e96f5999c
commit c48e681c41
2 changed files with 7 additions and 104 deletions

View File

@ -30,53 +30,21 @@ LICENSE
package rtp package rtp
import ( import (
"io"
"net" "net"
"sync"
"time"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
) )
const (
chanSize = 10
pkg = "rtp: "
)
// RingBuffer consts.
const (
ringBufferSize = 100
ringBufferElementSize = 4096
)
type log func(lvl int8, msg string, args ...interface{})
// Client describes an RTP client that can receive an RTP stream and implements // Client describes an RTP client that can receive an RTP stream and implements
// io.Reader. // io.Reader.
type Client struct { type Client struct {
conn *net.UDPConn // The UDP connection RTP packets will be read from. conn *net.UDPConn
wg sync.WaitGroup // Used to wait for recv routine to finish.
done chan struct{} // Used to terminate the recv routine.
ring *ring.Buffer // Processed data from RTP packets will be stored here.
err chan error // Errors encountered during recv will be sent to this chan.
rt time.Duration // Read timeout used when reading from the ringbuffer.
log
} }
// NewClient returns a pointer to a new Client. // NewClient returns a pointer to a new Client.
// //
// addr is the address of form <ip>:<port> that we expect to receive // addr is the address of form <ip>:<port> that we expect to receive
// RTP at. l is a logging function defined by the signuture of the log type // RTP at.
// defined above. rt is the read timeout used when reading from the client ringbuffer. func NewClient(addr string) (*Client, error) {
func NewClient(addr string, l log, rt time.Duration) (*Client, error) { c := &Client{}
c := &Client{
done: make(chan struct{}, chanSize),
ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0),
log: l,
err: make(chan error),
rt: rt,
}
a, err := net.ResolveUDPAddr("udp", addr) a, err := net.ResolveUDPAddr("udp", addr)
if err != nil { if err != nil {
@ -91,67 +59,7 @@ func NewClient(addr string, l log, rt time.Duration) (*Client, error) {
return c, nil return c, nil
} }
// Start will start the recv routine. // Read implements io.Reader. This wraps the Read for the connection.
func (c *Client) Start() {
c.log(logger.Info, pkg+"starting client")
c.wg.Add(1)
go c.recv()
}
// Stop will send a done signal to the receive routine, and also close the
// connection.
func (c *Client) Stop() {
c.log(logger.Info, pkg+"stopping client")
c.conn.Close()
close(c.done)
c.wg.Wait()
}
// Err returns the client err channel as receive only.
func (c *Client) Err() <-chan error {
return c.err
}
// recv will read RTP packets from the UDP connection, perform the operation
// on them given at creation of the Client and then store the result in the
// ringBuffer for Reading.
func (c *Client) recv() {
defer c.wg.Done()
buf := make([]byte, ringBufferElementSize)
for {
select {
case <-c.done:
c.log(logger.Debug, pkg+"done signal received")
return
default:
c.log(logger.Debug, pkg+"waiting for packet")
n, _, err := c.conn.ReadFromUDP(buf)
if err != nil {
c.err <- err
continue
}
c.log(logger.Debug, pkg+"packet received", "packet", buf[:n])
_, err = c.ring.Write(buf[:n])
c.ring.Flush()
if err != nil {
c.err <- err
}
}
}
}
// Read implements io.Reader.
//
// Read will get the next chunk from the ringBuffer and copy the bytes to p.
func (c *Client) Read(p []byte) (int, error) { func (c *Client) Read(p []byte) (int, error) {
c.log(logger.Debug, pkg+"user reading data") return c.conn.Read(p)
chunk, err := c.ring.Next(c.rt)
if err != nil {
return 0, io.EOF
}
n := copy(p, chunk.Bytes())
chunk.Close()
chunk = nil
return n, nil
} }

View File

@ -34,7 +34,6 @@ import (
"io" "io"
"net" "net"
"testing" "testing"
"time"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
) )
@ -85,11 +84,10 @@ func TestReceive(t *testing.T) {
go func() { go func() {
// Create and start the client. // Create and start the client.
var err error var err error
c, err = NewClient(clientAddr, (*dummyLogger)(t).log, 1*time.Millisecond) c, err = NewClient(clientAddr)
if err != nil { if err != nil {
testErr <- fmt.Errorf("could not create client, failed with error: %v\n", err) testErr <- fmt.Errorf("could not create client, failed with error: %v\n", err)
} }
c.Start()
close(clientReady) close(clientReady)
// Read packets using the client and check them with expected. // Read packets using the client and check them with expected.
@ -115,7 +113,6 @@ func TestReceive(t *testing.T) {
} }
packetsReceived++ packetsReceived++
} }
c.Stop()
close(done) close(done)
}() }()
@ -146,8 +143,6 @@ func TestReceive(t *testing.T) {
loop: loop:
for { for {
select { select {
case err := <-c.Err():
t.Log(err)
case err := <-testErr: case err := <-testErr:
t.Fatal(err) t.Fatal(err)
case err := <-serverErr: case err := <-serverErr: