diff --git a/stream/rtp/encoder.go b/stream/rtp/encoder.go index e082edeb..becb9c6e 100644 --- a/stream/rtp/encoder.go +++ b/stream/rtp/encoder.go @@ -29,6 +29,7 @@ package rtp import ( "bitbucket.org/ausocean/av/stream/mts" + "errors" "io" "math/rand" "time" @@ -41,26 +42,52 @@ const ( timestampFreq = 90000 // Hz mtsSize = 188 bufferSize = 1000 + maxMtsPerRtp = 7 ) type Queue struct { - buf [bufferSize][mtsSize]byte + buf [][]byte right uint left uint len uint } -func (q *Queue) Write(frame []byte) (int, error) { - copy(q.buf[q.right][:], frame) - q.right += 1 - q.len += 1 - return 188, nil +func NewQueue(maxElements, elementSize uint) (*Queue, error) { + if maxElements <= 0 { + return nil, errors.New("Max elements of queue must be more than 0") + } + if elementSize <= 0 { + return nil, errors.New("Element size in queue must be more than 0") + } + buf := make([][]byte, maxElements) + for i := range buf { + buf[i] = make([]byte, elementSize) + } + return &Queue{ + buf: buf, + right: 0, + left: 0, + len: 0, + }, nil } -func (q *Queue) Read() []byte { +func (q *Queue) Write(frame []byte) (int, error) { + if q.right > uint(len(q.buf)) { + return 0, errors.New("Queue is full, cannot perform write.") + } + copy(q.buf[q.right], frame) + q.right += 1 + q.len += 1 + return len(frame), nil +} + +func (q *Queue) Read() ([]byte, error) { + if q.left >= q.right { + return nil, errors.New("Nothing to read from queue.") + } q.left += 1 q.len -= 1 - return q.buf[q.left-1][:] + return q.buf[q.left-1], nil } func (q *Queue) Len() uint { @@ -104,8 +131,9 @@ func (e *Encoder) Encode(nalu []byte) error { e.mtsEncoder.Encode(nalu) for e.queue.Len() > 0 { var payload []byte - for i := 0; i < 7 && e.queue.Len() > 0; i++ { - payload = append(payload, e.queue.Read()...) + for i := 0; i < maxMtsPerRtp && e.queue.Len() > 0; i++ { + data, _ := e.queue.Read() + payload = append(payload, data...) } pkt := Pkt{ V: rtpVer, // version