av/ringbuffer/RingBuffer.go

201 lines
5.0 KiB
Go
Raw Normal View History

/*
NAME
RingBuffer.go - a structure that encapsulates a ring buffer data structure with concurrency
functionality
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon.milton@gmail.com>
LICENSE
RingBuffer.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package ringbuffer
import (
"errors"
"sync"
)
/*
RingBuffer aims to provide functionality of a RingBuffer data structure.
It may be used in concurrent routines.
*/
2017-12-04 08:29:41 +03:00
// RingBuffer describes the operations exposed by a ring buffer whose
// elements are byte slices. Writing is a two-step Get/DoneWriting sequence
// and reading is a two-step Read/DoneReading sequence.
type RingBuffer interface {
	// Get returns the element that the next write should fill.
	Get() ([]byte, error)

	// DoneWriting commits the element obtained from Get, recording that
	// size bytes of it hold valid data.
	DoneWriting(size int) error

	// Read returns the oldest element that has not been read yet.
	Read() ([]byte, error)

	// DoneReading releases the element obtained from Read.
	DoneReading() error

	// IsReadable reports whether there is an element available to read.
	IsReadable() bool

	// IsWritable reports whether there is room for another element.
	IsWritable() bool

	// GetNoOfElements returns how many elements are currently stored.
	GetNoOfElements() int
}
2018-03-14 04:18:03 +03:00
// GetNoOfElements returns the number of elements currently held in the
// buffer, i.e. those committed with DoneWriting and not yet released with
// DoneReading. The count is read while holding the buffer's mutex, like every
// other access to the buffer's state, so it may be called while other
// goroutines are writing to or reading from the buffer.
func (rb *ringBuffer) GetNoOfElements() int {
	rb.mutex.Lock()
	defer rb.mutex.Unlock()
	return rb.noOfElements
}
// ringBuffer implements the RingBuffer interface on top of a fixed set of
// byte slices that are reused as elements are written and read.
type ringBuffer struct {
	// dataMemory holds one byte slice per element of the buffer.
	dataMemory [][]byte
	// sizeMemory[i] is the number of valid bytes in dataMemory[i].
	sizeMemory []int
	// size is the number of elements the buffer can hold.
	size int
	// noOfElements is the number of elements written but not yet read.
	noOfElements int
	// first and last are the indices of the oldest unread element and of the
	// most recently written element; both are -1 until the first write.
	first, last int
	// currentlyWriting and currentlyReading record that a Get or a Read is
	// awaiting its matching DoneWriting or DoneReading.
	currentlyWriting, currentlyReading bool
	// mutex guards all of the fields above.
	mutex sync.Mutex
}
/*
NewRingBuffer returns a pointer to a newly allocated ringBuffer able to hold
bufferSize elements of elementSize bytes each. The memory for every element is
allocated here, up front. The buffer starts out empty, with no write or read
in progress. nil is returned if either argument is zero or negative.
*/
func NewRingBuffer(bufferSize int, elementSize int) (rb *ringBuffer) {
	if bufferSize <= 0 || elementSize <= 0 {
		return nil
	}
	slots := make([][]byte, bufferSize)
	for idx := 0; idx < bufferSize; idx++ {
		slots[idx] = make([]byte, elementSize)
	}
	rb = &ringBuffer{
		dataMemory: slots,
		sizeMemory: make([]int, bufferSize),
		size:       bufferSize,
		// -1 marks "nothing written yet" for both indices.
		first: -1,
		last:  -1,
		mutex: sync.Mutex{},
	}
	return rb
}
/*
Get provides the address of the next empty element in the RingBuffer, into
which the caller may write. The element is not counted as part of the buffer
until DoneWriting is called. If Get is called again before DoneWriting, the
same element is returned again. An error is returned if the buffer is full.
*/
func (rb *ringBuffer) Get() ([]byte, error) {
	rb.mutex.Lock()
	defer rb.mutex.Unlock()
	if !rb.IsWritable() {
		return nil, errors.New("Buffer full!")
	}
	// rb.last is only advanced by DoneWriting, so the element being written is
	// always the one directly after it, whether or not a write is already in
	// progress. Returning rb.dataMemory[rb.last] for a repeated call would hand
	// out the previously committed element, or index -1 on a fresh buffer.
	next := rb.last + 1
	if next == rb.size {
		next = 0
	}
	rb.currentlyWriting = true
	return rb.dataMemory[next], nil
}
/*
DoneWriting lets the RingBuffer know that we have finished writing to the
address received by a call to Get, and how many bytes were written there.
The element then becomes available to Read. An error is returned if there was
no initial call to Get, or if size is negative or larger than the element
itself; in either case the buffer is left unchanged.
*/
func (rb *ringBuffer) DoneWriting(size int) error {
	rb.mutex.Lock()
	defer rb.mutex.Unlock()
	if !rb.currentlyWriting {
		return errors.New("DoneWriting called without initial call to Get!")
	}
	// The element being committed is the one after the last committed element.
	next := rb.last + 1
	if next == rb.size {
		next = 0
	}
	// Read slices the element down to the recorded size, so a size outside the
	// element's bounds would make a later Read panic. Reject it here instead.
	if size < 0 || size > len(rb.dataMemory[next]) {
		return errors.New("DoneWriting called with invalid size!")
	}
	// The very first write also establishes the read position.
	if rb.first == -1 {
		rb.first = 0
	}
	rb.last = next
	rb.sizeMemory[next] = size
	rb.noOfElements++
	rb.currentlyWriting = false
	return nil
}
/*
Read returns the oldest element in the ring buffer that has not been read
yet, as a slice trimmed to the number of bytes recorded for it by DoneWriting.
The element stays in the buffer until DoneReading is called. An error is
returned if the buffer is empty, or if Read has already been called without a
following call to DoneReading; the empty check is made first.
*/
func (rb *ringBuffer) Read() ([]byte, error) {
	rb.mutex.Lock()
	defer rb.mutex.Unlock()
	switch {
	case !rb.IsReadable():
		return nil, errors.New("Buffer is empty, nothging to read!")
	case rb.currentlyReading:
		return nil, errors.New("Second call to Read! Call DoneReading first!")
	}
	rb.currentlyReading = true
	oldest := rb.first
	length := rb.sizeMemory[oldest]
	return rb.dataMemory[oldest][:length], nil
}
/*
DoneReading tells the buffer that the caller has finished with the element
handed out by Read. The read position moves on to the following element,
wrapping to the start after the final one, and the element count drops by one.
An error is returned if there was no preceding call to Read.
*/
func (rb *ringBuffer) DoneReading() error {
	rb.mutex.Lock()
	defer rb.mutex.Unlock()
	if !rb.currentlyReading {
		return errors.New("DoneReading called but no initial call to Read!")
	}
	rb.currentlyReading = false
	rb.noOfElements--
	if following := rb.first + 1; following == rb.size {
		rb.first = 0
	} else {
		rb.first = following
	}
	return nil
}
/*
IsReadable reports whether there is something to read, i.e. whether at least
one element has ever been written and the buffer is not currently empty.
It does not lock the buffer's mutex itself.
*/
func (rb *ringBuffer) IsReadable() bool {
	return rb.first != -1 && rb.noOfElements != 0
}
/*
IsWritable reports whether another element can be written, i.e. whether the
number of stored elements differs from the buffer's capacity.
It does not lock the buffer's mutex itself.
*/
func (rb *ringBuffer) IsWritable() bool {
	return rb.noOfElements != rb.size
}