/* NAME client.go DESCRIPTION client.go provides an RTP client, Client, that will receive RTP on a UDP connection, perform an operation on the packets and then store in a ringBuffer that can be read from Read, an implemntation of io.Reader. AUTHOR Saxon A. Nelson-Milton LICENSE This is Copyright (C) 2019 the Australian Ocean Lab (AusOcean). It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License in gpl.txt. If not, see http://www.gnu.org/licenses. */ package rtp import ( "io" "net" "sync" "time" "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" ) // Misc consts. const ( chanSize = 10 pkg = "rtp: " ) // RingBuffer consts. const ( ringBufferSize = 10 ringBufferElementSize = 4096 ) type log func(lvl int8, msg string, args ...interface{}) // Client describes an RTP client that can receive an RTP stream and implements // io.Reader. type Client struct { conn *net.UDPConn // The UDP connection RTP packets will be read from. wg sync.WaitGroup // Used to wait for recv routine to finish. done chan struct{} // Used to terminate the recv routine. ring *ring.Buffer // Processed data from RTP packets will be stored here. op func([]byte) ([]byte, error) // The operation to perform on received RTP packets before storing. ErrChan chan error // Errors encountered during recv will be sent to this chan. rt time.Duration // Read timeout used when reading from the ringbuffer. log } // NewClient returns a pointer to a new Client. // // addr is the address of form : that we expect to receive // RTP at. op is a function, if non nil, that will be used to perform an // operation on each received RTP packet. For example, the op func may parse // out the RTP payload. The result of the operation is then what is stored // in the ringBuffer for reading. l is a logging function defined by the // signuture of the log type defined above. rt is the read timeout used when // reading from the client ringbuffer. func NewClient(addr string, op func([]byte) ([]byte, error), l log, rt time.Duration) (*Client, error) { c := &Client{ done: make(chan struct{}, chanSize), ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0), op: op, log: l, ErrChan: make(chan error), rt: rt, } a, err := net.ResolveUDPAddr("udp", addr) if err != nil { return nil, err } c.conn, err = net.ListenUDP("udp", a) if err != nil { return nil, err } return c, nil } // Start will start the recv routine. func (c *Client) Start() { c.log(logger.Info, pkg+"starting client") c.wg.Add(1) go c.recv() } // Stop will send a done signal to the receive routine, and also close the // connection. func (c *Client) Stop() { c.log(logger.Info, pkg+"stopping client") c.conn.Close() close(c.done) c.wg.Wait() } // recv will read RTP packets from the UDP connection, perform the operation // on them given at creation of the Client and then store the result in the // ringBuffer for Reading. func (c *Client) recv() { defer c.wg.Done() buf := make([]byte, ringBufferElementSize) for { select { case <-c.done: c.log(logger.Debug, pkg+"done signal received") return default: c.log(logger.Debug, pkg+"waiting for packet") n, _, err := c.conn.ReadFromUDP(buf) if err != nil { c.ErrChan <- err continue } c.log(logger.Debug, pkg+"packet received", "packet", buf[:n]) var _buf []byte switch c.op { case nil: _buf = buf[:n] default: _buf, err = c.op(buf[:n]) if err != nil { c.ErrChan <- err continue } } _, err = c.ring.Write(_buf) c.ring.Flush() if err != nil { c.ErrChan <- err continue } } } } // Read implements io.Reader. // // Read will get the next chunk from the ringBuffer and copy the bytes to p. func (c *Client) Read(p []byte) (int, error) { c.log(logger.Debug, pkg+"user reading data") chunk, err := c.ring.Next(c.rt) if err != nil { return 0, io.EOF } n := copy(p, chunk.Bytes()) chunk.Close() chunk = nil return n, nil }