/* NAME client.go DESCRIPTION client.go provides an RTP client that will receive RTP on a UDP connection AUTHOR Saxon A. Nelson-Milton LICENSE rtp.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). */ package rtp import ( "io" "net" "sync" "bitbucket.org/ausocean/utils/ring" ) const ( chanSize = 10 ) type Client struct { conn *net.UDPConn wg sync.WaitGroup done chan struct{} ring *ring.Buffer op func([]byte) ([]byte, error) ErrChan chan error } func NewClient(addr string, op func([]byte) ([]byte, error)) (*Client, error) { c := &Client{ done: make(chan struct{}, 10), ring: ring.NewBuffer(10, 4096, 0), op: op, } a, err := net.ResolveUDPAddr("udp", addr) if err != nil { return nil, err } c.conn, err = net.ListenUDP("udp", a) if err != nil { return nil, err } return c, nil } func (c *Client) Start() { c.wg.Add(1) go c.recv() } func (c *Client) recv() { defer c.wg.Done() buf := make([]byte, 4096) for { select { case <-c.done: return default: n, _, err := c.conn.ReadFromUDP(buf) if err != nil { c.ErrChan <- err continue } var _buf []byte switch c.op { case nil: _buf = buf[:n] default: _buf, err = c.op(buf[:n]) if err != nil { c.ErrChan <- err continue } } _, err = c.ring.Write(_buf) c.ring.Flush() if err != nil { c.ErrChan <- err continue } } } } func (c *Client) Stop() { close(c.done) c.conn.Close() c.wg.Wait() } func (c *Client) Read(p []byte) (int, error) { chunk, err := c.ring.Next(0) if err != nil { return 0, io.EOF } n := copy(p, chunk.Bytes()) chunk.Close() chunk = nil return n, nil }