protocol/rtp: removed op from Client i.e. what is read from Client are RTP packets.

This commit is contained in:
Saxon 2019-05-01 14:15:39 +09:30
parent d358f70585
commit 80a7d41d8a
2 changed files with 85 additions and 107 deletions

View File

@ -47,7 +47,7 @@ const (
// RingBuffer consts. // RingBuffer consts.
const ( const (
ringBufferSize = 10 ringBufferSize = 100
ringBufferElementSize = 4096 ringBufferElementSize = 4096
) )
@ -56,13 +56,12 @@ type log func(lvl int8, msg string, args ...interface{})
// Client describes an RTP client that can receive an RTP stream and implements // Client describes an RTP client that can receive an RTP stream and implements
// io.Reader. // io.Reader.
type Client struct { type Client struct {
conn *net.UDPConn // The UDP connection RTP packets will be read from. conn *net.UDPConn // The UDP connection RTP packets will be read from.
wg sync.WaitGroup // Used to wait for recv routine to finish. wg sync.WaitGroup // Used to wait for recv routine to finish.
done chan struct{} // Used to terminate the recv routine. done chan struct{} // Used to terminate the recv routine.
ring *ring.Buffer // Processed data from RTP packets will be stored here. ring *ring.Buffer // Processed data from RTP packets will be stored here.
op func([]byte) ([]byte, error) // The operation to perform on received RTP packets before storing. err chan error // Errors encountered during recv will be sent to this chan.
err chan error // Errors encountered during recv will be sent to this chan. rt time.Duration // Read timeout used when reading from the ringbuffer.
rt time.Duration // Read timeout used when reading from the ringbuffer.
log log
} }
@ -75,11 +74,10 @@ type Client struct {
// in the ringBuffer for reading. l is a logging function defined by the // in the ringBuffer for reading. l is a logging function defined by the
// signuture of the log type defined above. rt is the read timeout used when // signuture of the log type defined above. rt is the read timeout used when
// reading from the client ringbuffer. // reading from the client ringbuffer.
func NewClient(addr string, op func([]byte) ([]byte, error), l log, rt time.Duration) (*Client, error) { func NewClient(addr string, l log, rt time.Duration) (*Client, error) {
c := &Client{ c := &Client{
done: make(chan struct{}, chanSize), done: make(chan struct{}, chanSize),
ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0), ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0),
op: op,
log: l, log: l,
err: make(chan error), err: make(chan error),
rt: rt, rt: rt,
@ -139,19 +137,7 @@ func (c *Client) recv() {
} }
c.log(logger.Debug, pkg+"packet received", "packet", buf[:n]) c.log(logger.Debug, pkg+"packet received", "packet", buf[:n])
var _buf []byte _, err = c.ring.Write(buf[:n])
switch c.op {
case nil:
_buf = buf[:n]
default:
_buf, err = c.op(buf[:n])
if err != nil {
c.err <- err
continue
}
}
_, err = c.ring.Write(_buf)
c.ring.Flush() c.ring.Flush()
if err != nil { if err != nil {
c.err <- err c.err <- err

View File

@ -75,94 +75,86 @@ func TestReceive(t *testing.T) {
packetsToSend = 20 packetsToSend = 20
) )
for _, op := range []func([]byte) ([]byte, error){nil, Payload} { testErr := make(chan error)
testErr := make(chan error) serverErr := make(chan error)
serverErr := make(chan error) done := make(chan struct{})
done := make(chan struct{}) clientReady := make(chan struct{})
clientReady := make(chan struct{}) var c *Client
var c *Client
// Start routine to read from client. // Start routine to read from client.
go func() { go func() {
// Create and start the client. // Create and start the client.
var err error var err error
c, err = NewClient(clientAddr, op, (*dummyLogger)(t).log, 1*time.Millisecond) c, err = NewClient(clientAddr, (*dummyLogger)(t).log, 1*time.Millisecond)
if err != nil { if err != nil {
testErr <- fmt.Errorf("could not create client, failed with error: %v\n", err) testErr <- fmt.Errorf("could not create client, failed with error: %v\n", err)
} }
c.Start() c.Start()
close(clientReady) close(clientReady)
// Read packets using the client and check them with expected. // Read packets using the client and check them with expected.
var packetsReceived int var packetsReceived int
buf := make([]byte, 4096) buf := make([]byte, 4096)
for packetsReceived != packetsToSend { for packetsReceived != packetsToSend {
n, err := c.Read(buf) n, err := c.Read(buf)
switch err { switch err {
case nil: case nil:
case io.EOF: case io.EOF:
continue continue
default:
testErr <- fmt.Errorf("unexpected error from c.Read: %v\n", err)
}
// Create expected data and apply operation if there is one.
expect := (&Pkt{V: rtpVer, Payload: []byte{byte(packetsReceived)}}).Bytes(nil)
if op != nil {
expect, err = op(expect)
if err != nil {
testErr <- fmt.Errorf("unexpected error when applying op: %v\n", err)
}
}
// Compare.
got := buf[:n]
if !bytes.Equal(got, expect) {
testErr <- fmt.Errorf("did not get expected result. \nGot: %v\n Want: %v\n", got, expect)
}
packetsReceived++
}
c.Stop()
close(done)
}()
// Start the RTP server.
go func() {
<-clientReady
cAddr, err := net.ResolveUDPAddr("udp", clientAddr)
if err != nil {
serverErr <- fmt.Errorf("could not resolve server address, failed with err: %v\n", err)
}
conn, err := net.DialUDP("udp", nil, cAddr)
if err != nil {
serverErr <- fmt.Errorf("could not dial udp, failed with err: %v\n", err)
}
// Send packets to the client.
for i := 0; i < packetsToSend; i++ {
p := (&Pkt{V: rtpVer, Payload: []byte{byte(i)}}).Bytes(nil)
_, err := conn.Write(p)
if err != nil {
serverErr <- fmt.Errorf("could not write packet to conn, failed with err: %v\n", err)
}
}
}()
<-clientReady
loop:
for {
select {
case err := <-c.Err():
t.Log(err)
case err := <-testErr:
t.Fatal(err)
case err := <-serverErr:
t.Fatal(err)
case <-done:
break loop
default: default:
testErr <- fmt.Errorf("unexpected error from c.Read: %v\n", err)
} }
// Create expected data and apply operation if there is one.
expect := (&Pkt{V: rtpVer, Payload: []byte{byte(packetsReceived)}}).Bytes(nil)
// Compare.
got := buf[:n]
if !bytes.Equal(got, expect) {
testErr <- fmt.Errorf("did not get expected result. \nGot: %v\n Want: %v\n", got, expect)
}
packetsReceived++
}
c.Stop()
close(done)
}()
// Start the RTP server.
go func() {
<-clientReady
cAddr, err := net.ResolveUDPAddr("udp", clientAddr)
if err != nil {
serverErr <- fmt.Errorf("could not resolve server address, failed with err: %v\n", err)
}
conn, err := net.DialUDP("udp", nil, cAddr)
if err != nil {
serverErr <- fmt.Errorf("could not dial udp, failed with err: %v\n", err)
}
// Send packets to the client.
for i := 0; i < packetsToSend; i++ {
p := (&Pkt{V: rtpVer, Payload: []byte{byte(i)}}).Bytes(nil)
_, err := conn.Write(p)
if err != nil {
serverErr <- fmt.Errorf("could not write packet to conn, failed with err: %v\n", err)
}
}
}()
<-clientReady
loop:
for {
select {
case err := <-c.Err():
t.Log(err)
case err := <-testErr:
t.Fatal(err)
case err := <-serverErr:
t.Fatal(err)
case <-done:
break loop
default:
} }
} }
} }