diff --git a/revid/revid.go b/revid/revid.go index 11b7482e..e481bf51 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -389,7 +389,6 @@ loop: } } } - r.destination.release() r.config.Logger.Log(smartlogger.Debug, pkg+"done reading that clip from ring buffer") diff --git a/revid/senders.go b/revid/senders.go index fac4b70a..940a9c14 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -29,7 +29,6 @@ LICENSE package revid import ( - "fmt" "io" "net" "os" @@ -318,7 +317,6 @@ func (s *udpSender) load(c *ring.Chunk) error { } func (s *udpSender) send() error { - fmt.Println(len(s.chunk.Bytes())) _, err := s.chunk.WriteTo(s.conn) return err } diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 3100c96e..8d557123 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -163,6 +163,8 @@ type Encoder struct { frameInterval time.Duration ptsOffset time.Duration + psiCount uint + continuity map[int]byte } @@ -195,32 +197,35 @@ const ( // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel func (e *Encoder) Encode(nalu []byte) error { - // Write PAT - patPkt := Packet{ - PUSI: true, - PID: patPid, - CC: e.ccFor(patPid), - AFC: hasPayload, - Payload: patTable, - } - _, err := e.dst.Write(patPkt.Bytes()) - if err != nil { - return err - } + if e.psiCount == 0 { + // Write PAT + patPkt := Packet{ + PUSI: true, + PID: patPid, + CC: e.ccFor(patPid), + AFC: hasPayload, + Payload: patTable, + } + _, err := e.dst.Write(patPkt.Bytes()) + if err != nil { + return err + } - // Write PMT. - pmtPkt := Packet{ - PUSI: true, - PID: pmtPid, - CC: e.ccFor(pmtPid), - AFC: hasPayload, - Payload: pmtTable, + // Write PMT. + pmtPkt := Packet{ + PUSI: true, + PID: pmtPid, + CC: e.ccFor(pmtPid), + AFC: hasPayload, + Payload: pmtTable, + } + _, err = e.dst.Write(pmtPkt.Bytes()) + if err != nil { + return err + } + e.psiCount = 100 } - _, err = e.dst.Write(pmtPkt.Bytes()) - if err != nil { - return err - } - + e.psiCount -= 1 // Prepare PES data. pesPkt := pes.Packet{ StreamID: streamID, @@ -250,7 +255,6 @@ func (e *Encoder) Encode(nalu []byte) error { pkt.PCR = e.pcr() pusi = false } - _, err := e.dst.Write(pkt.Bytes()) if err != nil { return err diff --git a/stream/rtp/encoder.go b/stream/rtp/encoder.go index 68384fd4..e082edeb 100644 --- a/stream/rtp/encoder.go +++ b/stream/rtp/encoder.go @@ -28,6 +28,7 @@ LICENSE package rtp import ( + "bitbucket.org/ausocean/av/stream/mts" "io" "math/rand" "time" @@ -36,51 +37,97 @@ import ( const ( yes = 1 no = 0 - defaultPktType = 1 + defaultPktType = 33 timestampFreq = 90000 // Hz + mtsSize = 188 + bufferSize = 1000 ) +type Queue struct { + buf [bufferSize][mtsSize]byte + right uint + left uint + len uint +} + +func (q *Queue) Write(frame []byte) (int, error) { + copy(q.buf[q.right][:], frame) + q.right += 1 + q.len += 1 + return 188, nil +} + +func (q *Queue) Read() []byte { + q.left += 1 + q.len -= 1 + return q.buf[q.left-1][:] +} + +func (q *Queue) Len() uint { + return q.len +} + +func (q *Queue) Reset() { + q.left = 0 + q.right = 0 + q.len = 0 +} + type Encoder struct { dst io.Writer ssrc uint32 seqNo uint16 clock time.Duration frameInterval time.Duration + fps int + mtsEncoder *mts.Encoder + queue *Queue } // NewEncoder returns a new Encoder type given an io.Writer - the destination // after encoding and the desired fps func NewEncoder(dst io.Writer, fps int) *Encoder { + q := &Queue{} return &Encoder{ dst: dst, ssrc: rand.Uint32(), frameInterval: time.Duration(float64(time.Second) / float64(fps)), + fps: fps, + mtsEncoder: mts.NewEncoder(q, float64(fps)), + queue: q, } } // Encode takes a nalu unit and encodes it into an rtp packet and // writes to the io.Writer given in NewEncoder func (e *Encoder) Encode(nalu []byte) error { - pkt := Pkt{ - V: rtpVer, // version - P: no, // padding - X: no, // header extension - CC: no, // CSRC count - M: no, // NOTE: need to check if this works (decoders should ignore this) - PT: defaultPktType, // NOTE: 1-23 according to rtp-h264 specs (don't think we need this) - SN: e.nxtSeqNo(), // sequence number - TS: e.nxtTimestamp(), // timestamp - SSRC: e.ssrc, // source identifier - Payload: nalu, - Padding: no, - } + e.mtsEncoder.Encode(nalu) + for e.queue.Len() > 0 { + var payload []byte + for i := 0; i < 7 && e.queue.Len() > 0; i++ { + payload = append(payload, e.queue.Read()...) + } + pkt := Pkt{ + V: rtpVer, // version + P: no, // padding + X: no, // header extension + CC: no, // CSRC count + M: no, // NOTE: need to check if this works (decoders should ignore this) + PT: defaultPktType, // 33 for mpegts + SN: e.nxtSeqNo(), // sequence number + TS: e.nxtTimestamp(), // timestamp + SSRC: e.ssrc, // source identifier + Payload: payload, + Padding: no, + } + _, err := e.dst.Write(pkt.Bytes()) + if err != nil { + return err + } - _, err := e.dst.Write(pkt.Bytes()) - if err != nil { - return err + e.tick() } - - e.tick() + e.queue.Reset() return nil }