protocol/rtp: Client.ErrChan => Client.err and wrote accessor function Client.Err() to access this chan as only receive

This commit is contained in:
Saxon 2019-04-30 02:37:18 +09:30
parent a0c324a813
commit 49a401681d
2 changed files with 21 additions and 17 deletions

View File

@ -56,13 +56,13 @@ type log func(lvl int8, msg string, args ...interface{})
// Client describes an RTP client that can receive an RTP stream and implements // Client describes an RTP client that can receive an RTP stream and implements
// io.Reader. // io.Reader.
type Client struct { type Client struct {
conn *net.UDPConn // The UDP connection RTP packets will be read from. conn *net.UDPConn // The UDP connection RTP packets will be read from.
wg sync.WaitGroup // Used to wait for recv routine to finish. wg sync.WaitGroup // Used to wait for recv routine to finish.
done chan struct{} // Used to terminate the recv routine. done chan struct{} // Used to terminate the recv routine.
ring *ring.Buffer // Processed data from RTP packets will be stored here. ring *ring.Buffer // Processed data from RTP packets will be stored here.
op func([]byte) ([]byte, error) // The operation to perform on received RTP packets before storing. op func([]byte) ([]byte, error) // The operation to perform on received RTP packets before storing.
ErrChan chan error // Errors encountered during recv will be sent to this chan. err chan error // Errors encountered during recv will be sent to this chan.
rt time.Duration // Read timeout used when reading from the ringbuffer. rt time.Duration // Read timeout used when reading from the ringbuffer.
log log
} }
@ -77,12 +77,12 @@ type Client struct {
// reading from the client ringbuffer. // reading from the client ringbuffer.
func NewClient(addr string, op func([]byte) ([]byte, error), l log, rt time.Duration) (*Client, error) { func NewClient(addr string, op func([]byte) ([]byte, error), l log, rt time.Duration) (*Client, error) {
c := &Client{ c := &Client{
done: make(chan struct{}, chanSize), done: make(chan struct{}, chanSize),
ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0), ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0),
op: op, op: op,
log: l, log: l,
ErrChan: make(chan error), err: make(chan error),
rt: rt, rt: rt,
} }
a, err := net.ResolveUDPAddr("udp", addr) a, err := net.ResolveUDPAddr("udp", addr)
@ -114,6 +114,10 @@ func (c *Client) Stop() {
c.wg.Wait() c.wg.Wait()
} }
func (c *Client) Err() <-chan error {
return c.err
}
// recv will read RTP packets from the UDP connection, perform the operation // recv will read RTP packets from the UDP connection, perform the operation
// on them given at creation of the Client and then store the result in the // on them given at creation of the Client and then store the result in the
// ringBuffer for Reading. // ringBuffer for Reading.
@ -129,7 +133,7 @@ func (c *Client) recv() {
c.log(logger.Debug, pkg+"waiting for packet") c.log(logger.Debug, pkg+"waiting for packet")
n, _, err := c.conn.ReadFromUDP(buf) n, _, err := c.conn.ReadFromUDP(buf)
if err != nil { if err != nil {
c.ErrChan <- err c.err <- err
continue continue
} }
c.log(logger.Debug, pkg+"packet received", "packet", buf[:n]) c.log(logger.Debug, pkg+"packet received", "packet", buf[:n])
@ -141,7 +145,7 @@ func (c *Client) recv() {
default: default:
_buf, err = c.op(buf[:n]) _buf, err = c.op(buf[:n])
if err != nil { if err != nil {
c.ErrChan <- err c.err <- err
continue continue
} }
} }
@ -149,7 +153,7 @@ func (c *Client) recv() {
_, err = c.ring.Write(_buf) _, err = c.ring.Write(_buf)
c.ring.Flush() c.ring.Flush()
if err != nil { if err != nil {
c.ErrChan <- err c.err <- err
continue continue
} }
} }

View File

@ -86,7 +86,7 @@ func TestReceive(t *testing.T) {
// Log any errors from client. // Log any errors from client.
go func() { go func() {
for { for {
err := <-c.ErrChan err := <-c.Err()
t.Logf("unexpected error from client: %v", err) t.Logf("unexpected error from client: %v", err)
} }
}() }()