mirror of https://bitbucket.org/ausocean/av.git
rtp: started using mpegts encoder inside rtp encoder so that fragmentation i.e. smaller rtp packets is easier. Streams fine.
This commit is contained in:
parent
42097ddef7
commit
16614df9f5
|
@ -389,7 +389,6 @@ loop:
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
r.destination.release()
|
||||
|
||||
r.config.Logger.Log(smartlogger.Debug, pkg+"done reading that clip from ring buffer")
|
||||
|
|
|
@ -29,7 +29,6 @@ LICENSE
|
|||
package revid
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
|
@ -318,7 +317,6 @@ func (s *udpSender) load(c *ring.Chunk) error {
|
|||
}
|
||||
|
||||
func (s *udpSender) send() error {
|
||||
fmt.Println(len(s.chunk.Bytes()))
|
||||
_, err := s.chunk.WriteTo(s.conn)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -163,6 +163,8 @@ type Encoder struct {
|
|||
frameInterval time.Duration
|
||||
ptsOffset time.Duration
|
||||
|
||||
psiCount uint
|
||||
|
||||
continuity map[int]byte
|
||||
}
|
||||
|
||||
|
@ -195,32 +197,35 @@ const (
|
|||
// generate handles the incoming data and generates equivalent mpegts packets -
|
||||
// sending them to the output channel
|
||||
func (e *Encoder) Encode(nalu []byte) error {
|
||||
// Write PAT
|
||||
patPkt := Packet{
|
||||
PUSI: true,
|
||||
PID: patPid,
|
||||
CC: e.ccFor(patPid),
|
||||
AFC: hasPayload,
|
||||
Payload: patTable,
|
||||
}
|
||||
_, err := e.dst.Write(patPkt.Bytes())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if e.psiCount == 0 {
|
||||
// Write PAT
|
||||
patPkt := Packet{
|
||||
PUSI: true,
|
||||
PID: patPid,
|
||||
CC: e.ccFor(patPid),
|
||||
AFC: hasPayload,
|
||||
Payload: patTable,
|
||||
}
|
||||
_, err := e.dst.Write(patPkt.Bytes())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Write PMT.
|
||||
pmtPkt := Packet{
|
||||
PUSI: true,
|
||||
PID: pmtPid,
|
||||
CC: e.ccFor(pmtPid),
|
||||
AFC: hasPayload,
|
||||
Payload: pmtTable,
|
||||
// Write PMT.
|
||||
pmtPkt := Packet{
|
||||
PUSI: true,
|
||||
PID: pmtPid,
|
||||
CC: e.ccFor(pmtPid),
|
||||
AFC: hasPayload,
|
||||
Payload: pmtTable,
|
||||
}
|
||||
_, err = e.dst.Write(pmtPkt.Bytes())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.psiCount = 100
|
||||
}
|
||||
_, err = e.dst.Write(pmtPkt.Bytes())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e.psiCount -= 1
|
||||
// Prepare PES data.
|
||||
pesPkt := pes.Packet{
|
||||
StreamID: streamID,
|
||||
|
@ -250,7 +255,6 @@ func (e *Encoder) Encode(nalu []byte) error {
|
|||
pkt.PCR = e.pcr()
|
||||
pusi = false
|
||||
}
|
||||
|
||||
_, err := e.dst.Write(pkt.Bytes())
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -28,6 +28,7 @@ LICENSE
|
|||
package rtp
|
||||
|
||||
import (
|
||||
"bitbucket.org/ausocean/av/stream/mts"
|
||||
"io"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
@ -36,51 +37,97 @@ import (
|
|||
const (
|
||||
yes = 1
|
||||
no = 0
|
||||
defaultPktType = 1
|
||||
defaultPktType = 33
|
||||
timestampFreq = 90000 // Hz
|
||||
mtsSize = 188
|
||||
bufferSize = 1000
|
||||
)
|
||||
|
||||
type Queue struct {
|
||||
buf [bufferSize][mtsSize]byte
|
||||
right uint
|
||||
left uint
|
||||
len uint
|
||||
}
|
||||
|
||||
func (q *Queue) Write(frame []byte) (int, error) {
|
||||
copy(q.buf[q.right][:], frame)
|
||||
q.right += 1
|
||||
q.len += 1
|
||||
return 188, nil
|
||||
}
|
||||
|
||||
func (q *Queue) Read() []byte {
|
||||
q.left += 1
|
||||
q.len -= 1
|
||||
return q.buf[q.left-1][:]
|
||||
}
|
||||
|
||||
func (q *Queue) Len() uint {
|
||||
return q.len
|
||||
}
|
||||
|
||||
func (q *Queue) Reset() {
|
||||
q.left = 0
|
||||
q.right = 0
|
||||
q.len = 0
|
||||
}
|
||||
|
||||
type Encoder struct {
|
||||
dst io.Writer
|
||||
ssrc uint32
|
||||
seqNo uint16
|
||||
clock time.Duration
|
||||
frameInterval time.Duration
|
||||
fps int
|
||||
mtsEncoder *mts.Encoder
|
||||
queue *Queue
|
||||
}
|
||||
|
||||
// NewEncoder returns a new Encoder type given an io.Writer - the destination
|
||||
// after encoding and the desired fps
|
||||
func NewEncoder(dst io.Writer, fps int) *Encoder {
|
||||
q := &Queue{}
|
||||
return &Encoder{
|
||||
dst: dst,
|
||||
ssrc: rand.Uint32(),
|
||||
frameInterval: time.Duration(float64(time.Second) / float64(fps)),
|
||||
fps: fps,
|
||||
mtsEncoder: mts.NewEncoder(q, float64(fps)),
|
||||
queue: q,
|
||||
}
|
||||
}
|
||||
|
||||
// Encode takes a nalu unit and encodes it into an rtp packet and
|
||||
// writes to the io.Writer given in NewEncoder
|
||||
func (e *Encoder) Encode(nalu []byte) error {
|
||||
pkt := Pkt{
|
||||
V: rtpVer, // version
|
||||
P: no, // padding
|
||||
X: no, // header extension
|
||||
CC: no, // CSRC count
|
||||
M: no, // NOTE: need to check if this works (decoders should ignore this)
|
||||
PT: defaultPktType, // NOTE: 1-23 according to rtp-h264 specs (don't think we need this)
|
||||
SN: e.nxtSeqNo(), // sequence number
|
||||
TS: e.nxtTimestamp(), // timestamp
|
||||
SSRC: e.ssrc, // source identifier
|
||||
Payload: nalu,
|
||||
Padding: no,
|
||||
}
|
||||
e.mtsEncoder.Encode(nalu)
|
||||
for e.queue.Len() > 0 {
|
||||
var payload []byte
|
||||
for i := 0; i < 7 && e.queue.Len() > 0; i++ {
|
||||
payload = append(payload, e.queue.Read()...)
|
||||
}
|
||||
pkt := Pkt{
|
||||
V: rtpVer, // version
|
||||
P: no, // padding
|
||||
X: no, // header extension
|
||||
CC: no, // CSRC count
|
||||
M: no, // NOTE: need to check if this works (decoders should ignore this)
|
||||
PT: defaultPktType, // 33 for mpegts
|
||||
SN: e.nxtSeqNo(), // sequence number
|
||||
TS: e.nxtTimestamp(), // timestamp
|
||||
SSRC: e.ssrc, // source identifier
|
||||
Payload: payload,
|
||||
Padding: no,
|
||||
}
|
||||
_, err := e.dst.Write(pkt.Bytes())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err := e.dst.Write(pkt.Bytes())
|
||||
if err != nil {
|
||||
return err
|
||||
e.tick()
|
||||
}
|
||||
|
||||
e.tick()
|
||||
e.queue.Reset()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue