av/ringbuffer/RingBuffer.go

172 lines
4.5 KiB
Go
Raw Normal View History

2017-12-02 08:24:34 +03:00
/*
NAME
RingBuffer.go - a structure that encapsulates a ring buffer data structure with concurrency
functionality
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon.milton@gmail.com>
LICENSE
RingBuffer.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package ringbuffer
import (
"errors"
"sync"
)
/*
ringBuffer is a fixed-capacity ring (circular) buffer of equally sized byte
slots. A writer claims the next free slot with Get and publishes it with
DoneWriting; a reader borrows the oldest published slot with Read and releases
it with DoneReading. At most one write and one read may be outstanding at any
time. Every exported method takes the mutex, so a single buffer may be shared
between goroutines. Because it contains a sync.Mutex, a ringBuffer must only
be handled through a pointer and never copied.
*/
type ringBuffer struct {
	dataMemory       [][]byte   // One pre-allocated slot per element.
	sizeMemory       []int      // Number of valid bytes in the matching dataMemory slot.
	size             int        // Capacity of the buffer, in elements.
	noOfElements     int        // Published elements not yet released by DoneReading.
	first            int        // Index of the oldest published element; -1 before the first DoneWriting.
	last             int        // Index of the newest published element; -1 before the first DoneWriting.
	currentlyWriting bool       // True between a successful Get and its DoneWriting.
	currentlyReading bool       // True between a successful Read and its DoneReading.
	mutex            sync.Mutex // Guards every field above.
}

/*
NewRingBuffer returns the address of a new ring buffer that holds bufferSize
elements of elementSize bytes each. All slot memory is allocated up front.
It returns nil if either argument is not positive.
*/
func NewRingBuffer(bufferSize int, elementSize int) (rb *ringBuffer) {
	if bufferSize <= 0 || elementSize <= 0 {
		return nil
	}
	slots := make([][]byte, bufferSize)
	for i := range slots {
		slots[i] = make([]byte, elementSize)
	}
	// The counters, flags and mutex are left at their zero values, which are
	// exactly the required initial state; only the -1 sentinels need setting.
	return &ringBuffer{
		dataMemory: slots,
		sizeMemory: make([]int, bufferSize),
		size:       bufferSize,
		first:      -1,
		last:       -1,
	}
}

/*
Get provides the next empty element in the ring buffer for the caller to
write into. The element is not visible to readers until DoneWriting is called.
An error is returned if the buffer is full, or if there has already been a
call to Get without an accompanying call to DoneWriting.
*/
func (rb *ringBuffer) Get() ([]byte, error) {
	rb.mutex.Lock()
	defer rb.mutex.Unlock()
	if rb.noOfElements == rb.size {
		return nil, errors.New("Buffer full!")
	}
	if rb.currentlyWriting {
		return nil, errors.New("Second call to Get! Call DoneWriting first!")
	}
	rb.currentlyWriting = true
	return rb.dataMemory[rb.nextLast()], nil
}

/*
DoneWriting lets the ring buffer know that the caller has finished writing to
the element received from Get, and that size bytes of it are valid. The
element then becomes available to Read. An error is returned if there was no
initial call to Get, or if size is negative or larger than the element; in
both cases the buffer is left unchanged, so after a rejected size the pending
write can still be completed with a valid one.
*/
func (rb *ringBuffer) DoneWriting(size int) error {
	rb.mutex.Lock()
	defer rb.mutex.Unlock()
	if !rb.currentlyWriting {
		return errors.New("DoneWriting called without initial call to Get!")
	}
	slot := rb.nextLast()
	// Reject sizes that would make the slice expression in Read panic.
	if size < 0 || size > len(rb.dataMemory[slot]) {
		return errors.New("DoneWriting called with a size outside the element bounds!")
	}
	if rb.first == -1 {
		// First element ever published: the read index starts here.
		rb.first = 0
	}
	rb.last = slot
	rb.sizeMemory[slot] = size
	rb.noOfElements++
	rb.currentlyWriting = false
	return nil
}

/*
Read returns the oldest published element, trimmed to the size that was given
to DoneWriting. The returned slice refers to the buffer's own memory; once
DoneReading is called the element counts as empty and may be handed out again
by Get. An error is returned if the buffer is empty, or if there has been a
second call to Read before a call to DoneReading.
*/
func (rb *ringBuffer) Read() ([]byte, error) {
	rb.mutex.Lock()
	defer rb.mutex.Unlock()
	if !rb.canRead() {
		return nil, errors.New("Buffer is empty, nothing to read!")
	}
	if rb.currentlyReading {
		return nil, errors.New("Second call to Read! Call DoneReading first!")
	}
	rb.currentlyReading = true
	return rb.dataMemory[rb.first][:rb.sizeMemory[rb.first]], nil
}

/*
DoneReading informs the buffer that the caller has finished with the element
provided by Read. The buffer can now consider this element to be empty.
An error is returned if there was no initial call to Read.
*/
func (rb *ringBuffer) DoneReading() error {
	rb.mutex.Lock()
	defer rb.mutex.Unlock()
	if !rb.currentlyReading {
		return errors.New("DoneReading called but no initial call to Read!")
	}
	rb.first = (rb.first + 1) % rb.size
	rb.noOfElements--
	rb.currentlyReading = false
	return nil
}

/*
CanRead returns true if it is possible to read from the buffer, i.e. if
it is not empty. It takes the mutex, so it is safe to call while other
goroutines are using the buffer.
*/
func (rb *ringBuffer) CanRead() bool {
	rb.mutex.Lock()
	defer rb.mutex.Unlock()
	return rb.canRead()
}

// canRead reports whether at least one published element is waiting.
// The caller must hold rb.mutex.
func (rb *ringBuffer) canRead() bool {
	return rb.first != -1 && rb.noOfElements != 0
}

// nextLast returns the index of the slot that follows last, wrapping to 0 at
// the end of the buffer (and yielding 0 while last is still -1).
// The caller must hold rb.mutex.
func (rb *ringBuffer) nextLast() int {
	return (rb.last + 1) % rb.size
}