/* NAME encoder.go DESCRIPTION See Readme.md AUTHOR Saxon Nelson-Milton (saxon@ausocean.org) LICENSE encoder.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ package rtp import ( "errors" "io" "math/rand" "time" "bitbucket.org/ausocean/av/stream/mts" ) const ( yes = 1 no = 0 defaultPktType = 33 timestampFreq = 90000 // Hz mtsSize = 188 bufferSize = 1000 maxMtsPerRtp = 7 ) type Queue struct { buf [][]byte right uint left uint len uint } func NewQueue(maxElements, elementSize uint) (*Queue, error) { if maxElements <= 0 { return nil, errors.New("Max elements of queue must be more than 0") } if elementSize <= 0 { return nil, errors.New("Element size in queue must be more than 0") } buf := make([][]byte, maxElements) for i := range buf { buf[i] = make([]byte, elementSize) } return &Queue{ buf: buf, right: 0, left: 0, len: 0, }, nil } func (q *Queue) Write(frame []byte) (int, error) { if q.right > uint(len(q.buf)) { return 0, errors.New("Queue is full, cannot perform write.") } copy(q.buf[q.right], frame) q.right += 1 q.len += 1 return len(frame), nil } func (q *Queue) Read() ([]byte, error) { if q.left >= q.right { return nil, errors.New("Nothing to read from queue.") } q.left += 1 q.len -= 1 return q.buf[q.left-1], nil } func (q *Queue) Len() uint { return q.len } func (q *Queue) Reset() { q.left = 0 q.right = 0 q.len = 0 } type Encoder struct { dst io.Writer ssrc uint32 seqNo uint16 clock time.Duration frameInterval time.Duration fps int mtsEncoder *mts.Encoder queue *Queue } // NewEncoder returns a new Encoder type given an io.Writer - the destination // after encoding and the desired fps func NewEncoder(dst io.Writer, fps int) *Encoder { q, _ := NewQueue(bufferSize, mtsSize) return &Encoder{ dst: dst, ssrc: rand.Uint32(), frameInterval: time.Duration(float64(time.Second) / float64(fps)), fps: fps, mtsEncoder: mts.NewEncoder(q, float64(fps)), queue: q, } } // Encode takes a nalu unit and encodes it into an rtp packet and // writes to the io.Writer given in NewEncoder func (e *Encoder) Encode(nalu []byte) error { e.mtsEncoder.Encode(nalu) for e.queue.Len() > 0 { var payload []byte for i := 0; i < maxMtsPerRtp && e.queue.Len() > 0; i++ { data, _ := e.queue.Read() payload = append(payload, data...) } pkt := Pkt{ V: rtpVer, // version P: no, // padding X: no, // header extension CC: no, // CSRC count M: no, // NOTE: need to check if this works (decoders should ignore this) PT: defaultPktType, // 33 for mpegts SN: e.nxtSeqNo(), // sequence number TS: e.nxtTimestamp(), // timestamp SSRC: e.ssrc, // source identifier Payload: payload, Padding: no, } _, err := e.dst.Write(pkt.Bytes()) if err != nil { return err } e.tick() } e.queue.Reset() return nil } // tick advances the clock one frame interval. func (e *Encoder) tick() { e.clock += e.frameInterval } // nxtTimestamp gets the next timestamp func (e *Encoder) nxtTimestamp() uint32 { return uint32(e.clock.Seconds() * timestampFreq) } // nxtSeqNo gets the next rtp packet sequence number func (e *Encoder) nxtSeqNo() uint16 { e.seqNo += 1 return e.seqNo - 1 }