/*
NAME
  client.go

DESCRIPTION
  client.go provides an RTP client, Client, that will receive RTP on a UDP
  connection, perform an operation on the packets and then store in a ringBuffer
  that can be read from Read, an implementation of io.Reader.

AUTHOR
  Saxon A. Nelson-Milton

LICENSE
  This is Copyright (C) 2019 the Australian Ocean Lab (AusOcean).

  It is free software: you can redistribute it and/or modify them
  under the terms of the GNU General Public License as published by the
  Free Software Foundation, either version 3 of the License, or (at your
  option) any later version.

  It is distributed in the hope that it will be useful, but WITHOUT
  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
  for more details.

  You should have received a copy of the GNU General Public License
  in gpl.txt. If not, see http://www.gnu.org/licenses.
*/

package rtp

import (
	"io"
	"net"
	"sync"
	"time"

	"bitbucket.org/ausocean/utils/logger"
	"bitbucket.org/ausocean/utils/ring"
)

const (
	chanSize = 10     // Capacity given to the Client's done channel.
	pkg      = "rtp: " // Prefix prepended to every log message from this package.
)

// RingBuffer consts. Both values are passed to ring.NewBuffer when a Client
// is created; ringBufferElementSize is also the size of the buffer that each
// UDP packet is read into.
const (
	ringBufferSize        = 100
	ringBufferElementSize = 4096
)

// log is the signature of the logging function used by Client. lvl is the log
// level (the Client passes logger.Debug and logger.Info), msg is the message,
// and args are additional values to log (the Client passes them as a key
// followed by its value).
type log func(lvl int8, msg string, args ...interface{})

// Client describes an RTP client that can receive an RTP stream and implements
// io.Reader.
type Client struct {
	conn *net.UDPConn   // The UDP connection RTP packets will be read from.
	wg   sync.WaitGroup // Used to wait for recv routine to finish.
	done chan struct{}  // Used to terminate the recv routine.
	ring *ring.Buffer   // Processed data from RTP packets will be stored here.
	err  chan error     // Errors encountered during recv will be sent to this chan.
	rt   time.Duration  // Read timeout used when reading from the ringbuffer.
	log                 // Embedded logging function, invoked as c.log(...).
}

// NewClient returns a pointer to a new Client.
//
// addr is the address of form host:port that we expect to receive
// RTP at. op is a function, if non nil, that will be used to perform an
// operation on each received RTP packet.
For example, the op func may parse // out the RTP payload. The result of the operation is then what is stored // in the ringBuffer for reading. l is a logging function defined by the // signuture of the log type defined above. rt is the read timeout used when // reading from the client ringbuffer. func NewClient(addr string, l log, rt time.Duration) (*Client, error) { c := &Client{ done: make(chan struct{}, chanSize), ring: ring.NewBuffer(ringBufferSize, ringBufferElementSize, 0), log: l, err: make(chan error), rt: rt, } a, err := net.ResolveUDPAddr("udp", addr) if err != nil { return nil, err } c.conn, err = net.ListenUDP("udp", a) if err != nil { return nil, err } return c, nil } // Start will start the recv routine. func (c *Client) Start() { c.log(logger.Info, pkg+"starting client") c.wg.Add(1) go c.recv() } // Stop will send a done signal to the receive routine, and also close the // connection. func (c *Client) Stop() { c.log(logger.Info, pkg+"stopping client") c.conn.Close() close(c.done) c.wg.Wait() } // Err returns the client err channel as receive only. func (c *Client) Err() <-chan error { return c.err } // recv will read RTP packets from the UDP connection, perform the operation // on them given at creation of the Client and then store the result in the // ringBuffer for Reading. func (c *Client) recv() { defer c.wg.Done() buf := make([]byte, ringBufferElementSize) for { select { case <-c.done: c.log(logger.Debug, pkg+"done signal received") return default: c.log(logger.Debug, pkg+"waiting for packet") n, _, err := c.conn.ReadFromUDP(buf) if err != nil { c.err <- err continue } c.log(logger.Debug, pkg+"packet received", "packet", buf[:n]) _, err = c.ring.Write(buf[:n]) c.ring.Flush() if err != nil { c.err <- err } } } } // Read implements io.Reader. // // Read will get the next chunk from the ringBuffer and copy the bytes to p. 
func (c *Client) Read(p []byte) (int, error) { c.log(logger.Debug, pkg+"user reading data") chunk, err := c.ring.Next(c.rt) if err != nil { return 0, io.EOF } n := copy(p, chunk.Bytes()) chunk.Close() chunk = nil return n, nil }