diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go new file mode 100644 index 00000000..4140abc1 --- /dev/null +++ b/protocol/rtp/client.go @@ -0,0 +1,124 @@ +/* +NAME + client.go + +DESCRIPTION + client.go provides an RTP client that will receive RTP on a UDP connection + +AUTHOR + Saxon A. Nelson-Milton + +LICENSE + rtp.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package rtp + +import ( + "io" + "net" + "sync" + + "bitbucket.org/ausocean/utils/ring" +) + +const ( + chanSize = 10 +) + +type Client struct { + conn *net.UDPConn + wg sync.WaitGroup + done chan struct{} + ring *ring.Buffer + op func([]byte) ([]byte, error) + ErrChan chan error +} + +func NewClient(addr string, op func([]byte) ([]byte, error)) (*Client, error) { + c := &Client{ + done: make(chan struct{}, 10), + ring: ring.NewBuffer(10, 4096, 0), + } + + a, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + return nil, err + } + + c.conn, err = net.ListenUDP("udp", a) + if err != nil { + return nil, err + } + + return c, nil +} + +func (c *Client) Start() { + c.wg.Add(1) + go c.recv() +} + +func (c *Client) recv() { + defer c.wg.Done() + buf := make([]byte, 4096) + for { + select { + case <-c.done: + return + default: + n, _, err := c.conn.ReadFromUDP(buf) + if err != nil { + c.ErrChan <- err + continue + } + var _buf []byte + switch c.op { + case nil: + _buf = buf[:n] + default: + _buf, err = c.op(buf[:n]) + if err != nil { + c.ErrChan <- err + continue + } + } + _, err = c.ring.Write(_buf) + c.ring.Flush() + if err != nil { + c.ErrChan <- err + continue + } + } + } +} + +func (c *Client) Stop() { + close(c.done) + c.conn.Close() + c.wg.Wait() +} + +func (c *Client) Read(p []byte) (int, error) { + chunk, err := c.ring.Next(0) + if err != nil { + return 0, io.EOF + } + n := copy(p, chunk.Bytes()) + chunk.Close() + chunk = nil + return n, nil +}