protoocl/rtp: added client.go file

Added client.go which contains a struct to describe an RTP client. It provides a method, Start, which will
invoke a recv routine to start receiving packets and process them using an op function passed on the Client's
creation. Client implements io.Reader, so that the client may be read from.
This commit is contained in:
Saxon 2019-04-19 18:17:06 +09:30
parent 6ee286e988
commit ccc08bfad1
1 changed files with 124 additions and 0 deletions

124
protocol/rtp/client.go Normal file
View File

@ -0,0 +1,124 @@
/*
NAME
client.go
DESCRIPTION
client.go provides an RTP client that will receive RTP on a UDP connection
AUTHOR
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
rtp.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package rtp
import (
"io"
"net"
"sync"
"bitbucket.org/ausocean/utils/ring"
)
const (
chanSize = 10
)
type Client struct {
conn *net.UDPConn
wg sync.WaitGroup
done chan struct{}
ring *ring.Buffer
op func([]byte) ([]byte, error)
ErrChan chan error
}
func NewClient(addr string, op func([]byte) ([]byte, error)) (*Client, error) {
c := &Client{
done: make(chan struct{}, 10),
ring: ring.NewBuffer(10, 4096, 0),
}
a, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, err
}
c.conn, err = net.ListenUDP("udp", a)
if err != nil {
return nil, err
}
return c, nil
}
func (c *Client) Start() {
c.wg.Add(1)
go c.recv()
}
func (c *Client) recv() {
defer c.wg.Done()
buf := make([]byte, 4096)
for {
select {
case <-c.done:
return
default:
n, _, err := c.conn.ReadFromUDP(buf)
if err != nil {
c.ErrChan <- err
continue
}
var _buf []byte
switch c.op {
case nil:
_buf = buf[:n]
default:
_buf, err = c.op(buf[:n])
if err != nil {
c.ErrChan <- err
continue
}
}
_, err = c.ring.Write(_buf)
c.ring.Flush()
if err != nil {
c.ErrChan <- err
continue
}
}
}
}
func (c *Client) Stop() {
close(c.done)
c.conn.Close()
c.wg.Wait()
}
func (c *Client) Read(p []byte) (int, error) {
chunk, err := c.ring.Next(0)
if err != nil {
return 0, io.EOF
}
n := copy(p, chunk.Bytes())
chunk.Close()
chunk = nil
return n, nil
}