Merged in mts-chunking (pull request #146)

Mts chunking

Approved-by: kortschak <dan@kortschak.io>
This commit is contained in:
Saxon Milton 2019-03-05 06:15:03 +00:00
commit 5280f037ca
7 changed files with 614 additions and 62 deletions

260
revid/mtsSender_test.go Normal file
View File

@ -0,0 +1,260 @@
/*
NAME
mtsSender_test.go
DESCRIPTION
mtsSender_test.go contains tests that validate the functionalilty of the
mtsSender under senders.go. Tests include checks that the mtsSender is
segmenting sends correctly, and also that it can correct discontinuities.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
mtsSender_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package revid
import (
"errors"
"fmt"
"testing"
"time"
"github.com/Comcast/gots/packet"
"github.com/Comcast/gots/pes"
"bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/av/stream/mts/meta"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
)
// Ring buffer sizes and read/write timeouts.
const (
rbSize = 100
rbElementSize = 150000
wTimeout = 10 * time.Millisecond
rTimeout = 10 * time.Millisecond
)
// sender simulates sending of video data, creating discontinuities if
// testDiscontinuities is set to true.
type sender struct {
buf [][]byte
testDiscontinuities bool
discontinuityAt int
currentPkt int
}
// send takes d and neglects if testDiscontinuities is true, returning an error,
// otherwise d is appended to senders buf.
func (ts *sender) send(d []byte) error {
if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt {
ts.currentPkt++
return errors.New("could not send")
}
cpy := make([]byte, len(d))
copy(cpy, d)
ts.buf = append(ts.buf, cpy)
ts.currentPkt++
return nil
}
// log implements the required logging func for some of the structs in use
// within tests.
func log(lvl int8, msg string, args ...interface{}) {
var l string
switch lvl {
case logger.Warning:
l = "warning"
case logger.Debug:
l = "debug"
case logger.Info:
l = "info"
case logger.Error:
l = "error"
case logger.Fatal:
l = "fatal"
}
msg = l + ": " + msg
for i := 0; i < len(args); i++ {
msg += " %v"
}
fmt.Printf(msg, args)
}
// buffer implements io.Writer and handles the writing of data to a
// ring buffer used in tests.
type buffer ring.Buffer
// Write implements the io.Writer interface.
func (b *buffer) Write(d []byte) (int, error) {
r := (*ring.Buffer)(b)
n, err := r.Write(d)
r.Flush()
return n, err
}
// TestSegment ensures that the mtsSender correctly segments data into clips
// based on positioning of PSI in the mtsEncoder's output stream.
func TestSegment(t *testing.T) {
mts.Meta = meta.New()
// Create ringBuffer, sender, loadsender and the MPEGTS encoder.
tstSender := &sender{}
loadSender := newMtsSender(tstSender, log)
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
encoder := mts.NewEncoder((*buffer)(rb), 25)
// Turn time based PSI writing off for encoder.
const psiSendCount = 10
encoder.TimeBasedPsi(false, psiSendCount)
const noOfPacketsToWrite = 100
for i := 0; i < noOfPacketsToWrite; i++ {
// Insert a payload so that we check that the segmentation works correctly
// in this regard. Packet number will be used.
encoder.Encode([]byte{byte(i)})
rb.Flush()
for {
next, err := rb.Next(rTimeout)
if err != nil {
break
}
err = loadSender.load(next)
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
err = loadSender.send()
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
loadSender.release()
}
}
result := tstSender.buf
expectData := 0
for clipNo, clip := range result {
t.Logf("Checking clip: %v\n", clipNo)
// Check that the clip is of expected length.
clipLen := len(clip)
if clipLen != psiSendCount*mts.PacketSize {
t.Fatalf("Clip %v is not correct length. Got: %v Want: %v\n Clip: %v\n", clipNo, clipLen, psiSendCount*mts.PacketSize, clip)
}
// Also check that the first packet is a PAT.
firstPkt := clip[:mts.PacketSize]
var pkt packet.Packet
copy(pkt[:], firstPkt)
pid := pkt.PID()
if pid != mts.PatPid {
t.Fatalf("First packet of clip %v is not pat, but rather: %v\n", clipNo, pid)
}
// Check that the clip data is okay.
for i := 0; i < len(clip); i += mts.PacketSize {
copy(pkt[:], clip[i:i+mts.PacketSize])
if pkt.PID() == mts.VideoPid {
payload, err := pkt.Payload()
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
// Parse PES from the MTS payload.
pes, err := pes.NewPESHeader(payload)
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
// Get the data from the PES packet and convert to an int.
data := int8(pes.Data()[0])
// Calc expected data in the PES and then check.
if data != int8(expectData) {
t.Errorf("Did not get expected pkt data. ClipNo: %v, pktNoInClip: %v, Got: %v, want: %v\n", clipNo, i/mts.PacketSize, data, expectData)
}
expectData++
}
}
}
}
func TestSendFailDiscontinuity(t *testing.T) {
mts.Meta = meta.New()
// Create ringBuffer sender, loadSender and the MPEGTS encoder.
const clipWithDiscontinuity = 3
tstSender := &sender{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity}
loadSender := newMtsSender(tstSender, log)
rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout)
encoder := mts.NewEncoder((*buffer)(rb), 25)
// Turn time based PSI writing off for encoder.
const psiSendCount = 10
encoder.TimeBasedPsi(false, psiSendCount)
const noOfPacketsToWrite = 100
for i := 0; i < noOfPacketsToWrite; i++ {
// Our payload will just be packet number.
encoder.Encode([]byte{byte(i)})
rb.Flush()
for {
next, err := rb.Next(rTimeout)
if err != nil {
break
}
err = loadSender.load(next)
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
loadSender.send()
loadSender.release()
}
}
result := tstSender.buf
// First check that we have less clips as expected.
expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1
gotLen := len(result)
if gotLen != expectedLen {
t.Errorf("We don't have one less clip as we should. Got: %v, want: %v\n", gotLen, expectedLen)
}
// Now check that the discontinuity indicator is set at the discontinuityClip PAT.
disconClip := result[clipWithDiscontinuity]
firstPkt := disconClip[:mts.PacketSize]
var pkt packet.Packet
copy(pkt[:], firstPkt)
discon, err := (*packet.AdaptationField)(&pkt).Discontinuity()
if err != nil {
t.Fatalf("Unexpected err: %v\n", err)
}
if !discon {
t.Fatalf("Did not get discontinuity indicator for PAT")
}
}

View File

@ -114,10 +114,6 @@ type Revid struct {
// destination is the target endpoint. // destination is the target endpoint.
destination []loadSender destination []loadSender
// rtpSender is an unbuffered sender.
// It is used to isolate RTP from ring buffer-induced delays.
rtpSender *rtpSender
// bitrate hold the last send bitrate calculation result. // bitrate hold the last send bitrate calculation result.
bitrate int bitrate int
@ -143,40 +139,22 @@ type packer struct {
// are deemed to be successful, although a successful // are deemed to be successful, although a successful
// write may include a dropped frame. // write may include a dropped frame.
func (p *packer) Write(frame []byte) (int, error) { func (p *packer) Write(frame []byte) (int, error) {
if len(p.owner.destination) != 0 { if len(p.owner.destination) == 0 {
n, err := p.owner.buffer.Write(frame) panic("must have at least 1 destination")
if err != nil {
if err == ring.ErrDropped {
p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame))
return len(frame), nil
}
p.owner.config.Logger.Log(logger.Error, pkg+"unexpected ring buffer write error", "error", err.Error())
return n, err
}
} }
// If we have an rtp sender bypass ringbuffer and give straight to sender n, err := p.owner.buffer.Write(frame)
if p.owner.rtpSender != nil { if err != nil {
err := p.owner.rtpSender.send(frame) if err == ring.ErrDropped {
if err != nil { p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame))
p.owner.config.Logger.Log(logger.Error, pkg+"rtp send failed with error", "error", err.Error()) return len(frame), nil
} }
p.owner.config.Logger.Log(logger.Error, pkg+"unexpected ring buffer write error", "error", err.Error())
return n, err
} }
p.packetCount++ p.owner.buffer.Flush()
var hasRtmp bool
for _, d := range p.owner.config.Outputs {
if d == Rtmp {
hasRtmp = true
break
}
}
now := time.Now()
if hasRtmp || (now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) {
p.owner.buffer.Flush()
p.packetCount = 0
p.lastTime = now
}
return len(frame), nil return len(frame), nil
} }
@ -232,7 +210,7 @@ func (r *Revid) reset(config Config) error {
r.buffer = ring.NewBuffer(mtsRbSize, mtsRbElementSize, writeTimeout) r.buffer = ring.NewBuffer(mtsRbSize, mtsRbElementSize, writeTimeout)
} }
r.destination = r.destination[:0] r.destination = make([]loadSender, 0, len(r.config.Outputs))
for _, typ := range r.config.Outputs { for _, typ := range r.config.Outputs {
switch typ { switch typ {
case File: case File:
@ -254,7 +232,12 @@ func (r *Revid) reset(config Config) error {
} }
r.destination = append(r.destination, s) r.destination = append(r.destination, s)
case Http: case Http:
r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log)) switch r.Config().Packetization {
case Mpegts:
r.destination = append(r.destination, newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), nil))
default:
r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log))
}
case Udp: case Udp:
s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log)
if err != nil { if err != nil {
@ -262,10 +245,11 @@ func (r *Revid) reset(config Config) error {
} }
r.destination = append(r.destination, s) r.destination = append(r.destination, s)
case Rtp: case Rtp:
r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) s, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
if err != nil { if err != nil {
return err return err
} }
r.destination = append(r.destination, s)
} }
} }

View File

@ -29,7 +29,6 @@ LICENSE
package revid package revid
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"net" "net"
@ -37,6 +36,8 @@ import (
"os/exec" "os/exec"
"strconv" "strconv"
"github.com/Comcast/gots/packet"
"bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/av/rtmp"
"bitbucket.org/ausocean/av/stream/mts" "bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/av/stream/rtp" "bitbucket.org/ausocean/av/stream/rtp"
@ -45,6 +46,33 @@ import (
"bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/utils/ring"
) )
// Sender is intended to provided functionality for the sending of a byte slice
// to an implemented destination.
type Sender interface {
// send takes the bytes slice d and sends to a particular destination as
// implemented.
send(d []byte) error
}
// minimalHttpSender implements Sender for posting HTTP to netreceiver or vidgrind.
type minimalHttpSender struct {
client *netsender.Sender
log func(lvl int8, msg string, args ...interface{})
}
// newMinimalHttpSender returns a pointer to a new minimalHttpSender.
func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *minimalHttpSender {
return &minimalHttpSender{
client: ns,
log: log,
}
}
// send takes a bytes slice d and sends to http using s' http client.
func (s *minimalHttpSender) send(d []byte) error {
return httpSend(d, s.client, s.log)
}
// loadSender is a destination to send a *ring.Chunk to. // loadSender is a destination to send a *ring.Chunk to.
// When a loadSender has finished using the *ring.Chunk // When a loadSender has finished using the *ring.Chunk
// it must be Closed. // it must be Closed.
@ -105,6 +133,86 @@ func (s *fileSender) close() error {
return s.file.Close() return s.file.Close()
} }
// mtsSender implemented loadSender and provides sending capability specifically
// for use with MPEGTS packetization. It handles the construction of appropriately
// lengthed clips based on PSI. It also fixes accounts for discontinuities by
// setting the discontinuity indicator for the first packet of a clip.
type mtsSender struct {
sender Sender
buf []byte
next []byte
pkt packet.Packet
failed bool
discarded bool
repairer *mts.DiscontinuityRepairer
chunk *ring.Chunk
curPid int
}
// newMtsSender returns a new mtsSender.
func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender {
return &mtsSender{
sender: s,
repairer: mts.NewDiscontinuityRepairer(),
}
}
// load takes a *ring.Chunk and assigns to s.next, also grabbing it's pid and
// assigning to s.curPid. s.next if exists is also appended to the sender buf.
func (s *mtsSender) load(c *ring.Chunk) error {
if s.next != nil {
s.buf = append(s.buf, s.next...)
}
s.chunk = c
bytes := s.chunk.Bytes()
s.next = bytes
copy(s.pkt[:], bytes)
s.curPid = s.pkt.PID()
return nil
}
// send checks the currently loaded paackets PID; if it is a PAT then what is in
// the mtsSenders buffer is fixed and sent.
func (ms *mtsSender) send() error {
if ms.curPid == mts.PatPid && len(ms.buf) > 0 {
err := ms.fixAndSend()
if err != nil {
return err
}
ms.buf = ms.buf[:0]
}
return nil
}
// fixAndSend checks for discontinuities in the senders buffer and then sends.
// If a discontinuity is found the PAT packet at the start of the clip has it's
// discontintuity indicator set to true.
func (ms *mtsSender) fixAndSend() error {
err := ms.repairer.Repair(ms.buf)
if err == nil {
err = ms.sender.send(ms.buf)
if err == nil {
return nil
}
}
ms.failed = true
ms.repairer.Failed()
return err
}
func (s *mtsSender) close() error { return nil }
// release will set the s.fail flag to false and clear the buffer if
// the previous send was a fail. The currently loaded chunk is also closed.
func (s *mtsSender) release() {
if s.failed {
s.buf = s.buf[:0]
s.failed = false
}
s.chunk.Close()
s.chunk = nil
}
// httpSender implements loadSender for posting HTTP to NetReceiver // httpSender implements loadSender for posting HTTP to NetReceiver
type httpSender struct { type httpSender struct {
client *netsender.Sender client *netsender.Sender
@ -133,15 +241,19 @@ func (s *httpSender) send() error {
// if the chunk has been cleared. // if the chunk has been cleared.
return nil return nil
} }
return httpSend(s.chunk.Bytes(), s.client, s.log)
}
func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error {
// Only send if "V0" is configured as an input. // Only send if "V0" is configured as an input.
send := false send := false
ip := s.client.Param("ip") ip := client.Param("ip")
pins := netsender.MakePins(ip, "V") pins := netsender.MakePins(ip, "V")
for i, pin := range pins { for i, pin := range pins {
if pin.Name == "V0" { if pin.Name == "V0" {
send = true send = true
pins[i].Value = s.chunk.Len() pins[i].Value = len(d)
pins[i].Data = s.chunk.Bytes() pins[i].Data = d
pins[i].MimeType = "video/mp2t" pins[i].MimeType = "video/mp2t"
break break
} }
@ -152,17 +264,16 @@ func (s *httpSender) send() error {
} }
var err error var err error
var reply string var reply string
reply, _, err = s.client.Send(netsender.RequestRecv, pins) reply, _, err = client.Send(netsender.RequestRecv, pins)
if err != nil { if err != nil {
return err return err
} }
return extractMeta(reply, log)
return s.extractMeta(reply)
} }
// extractMeta looks at a reply at extracts any time or location data - then used // extractMeta looks at a reply at extracts any time or location data - then used
// to update time and location information in the mpegts encoder. // to update time and location information in the mpegts encoder.
func (s *httpSender) extractMeta(r string) error { func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) error {
dec, err := netsender.NewJSONDecoder(r) dec, err := netsender.NewJSONDecoder(r)
if err != nil { if err != nil {
return nil return nil
@ -170,18 +281,18 @@ func (s *httpSender) extractMeta(r string) error {
// Extract time from reply // Extract time from reply
t, err := dec.Int("ts") t, err := dec.Int("ts")
if err != nil { if err != nil {
s.log(logger.Warning, pkg+"No timestamp in reply") log(logger.Warning, pkg+"No timestamp in reply")
} else { } else {
s.log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t))
mts.Meta.Add("ts", strconv.Itoa(t)) mts.Meta.Add("ts", strconv.Itoa(t))
} }
// Extract location from reply // Extract location from reply
g, err := dec.String("ll") g, err := dec.String("ll")
if err != nil { if err != nil {
s.log(logger.Warning, pkg+"No location in reply") log(logger.Warning, pkg+"No location in reply")
} else { } else {
s.log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g))
mts.Meta.Add("loc", g) mts.Meta.Add("loc", g)
} }
@ -372,6 +483,7 @@ func (s *udpSender) close() error { return nil }
type rtpSender struct { type rtpSender struct {
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
encoder *rtp.Encoder encoder *rtp.Encoder
chunk *ring.Chunk
} }
func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) { func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) {
@ -386,12 +498,19 @@ func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{
return s, nil return s, nil
} }
func (s *rtpSender) send(d []byte) error { func (s *rtpSender) load(c *ring.Chunk) error {
var err error s.chunk = c
if d != nil { return nil
_, err = s.encoder.Write(d) }
} else {
err = errors.New("no data to send provided") func (s *rtpSender) close() error { return nil }
}
func (s *rtpSender) release() {
s.chunk.Close()
s.chunk = nil
}
func (s *rtpSender) send() error {
_, err := s.chunk.WriteTo(s.encoder)
return err return err
} }

109
stream/mts/discontinuity.go Normal file
View File

@ -0,0 +1,109 @@
/*
NAME
discontinuity.go
DESCRIPTION
discontinuity.go provides functionality for detecting discontinuities in
mpegts and accounting for using the discontinuity indicator in the adaptation
field.
AUTHOR
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
discontinuity.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package mts
import (
"github.com/Comcast/gots/packet"
)
// discontinuityRepairer provides function to detect discontinuities in mpegts
// and set the discontinuity indicator as appropriate.
type DiscontinuityRepairer struct {
expCC map[int]int
}
// NewDiscontinuityRepairer returns a pointer to a new discontinuityRepairer.
func NewDiscontinuityRepairer() *DiscontinuityRepairer {
return &DiscontinuityRepairer{
expCC: map[int]int{
PatPid: 16,
PmtPid: 16,
VideoPid: 16,
},
}
}
// Failed is to be called in the case of a failed send. This will decrement the
// expectedCC so that it aligns with the failed chunks cc.
func (dr *DiscontinuityRepairer) Failed() {
dr.decExpectedCC(PatPid)
}
// Repair takes a clip of mpegts and checks that the first packet, which should
// be a PAT, contains a cc that is expected, otherwise the discontinuity indicator
// is set to true.
func (dr *DiscontinuityRepairer) Repair(d []byte) error {
var pkt packet.Packet
copy(pkt[:], d[:PacketSize])
pid := pkt.PID()
if pid != PatPid {
panic("Clip to repair must have PAT first")
}
cc := pkt.ContinuityCounter()
expect, _ := dr.ExpectedCC(pid)
if cc != int(expect) {
if packet.ContainsAdaptationField(&pkt) {
(*packet.AdaptationField)(&pkt).SetDiscontinuity(true)
} else {
err := addAdaptationField(&pkt, DiscontinuityIndicator(true))
if err != nil {
return err
}
}
dr.SetExpectedCC(pid, cc)
copy(d[:PacketSize], pkt[:])
}
dr.IncExpectedCC(pid)
return nil
}
// expectedCC returns the expected cc. If the cc hasn't been used yet, then 16
// and false is returned.
func (dr *DiscontinuityRepairer) ExpectedCC(pid int) (int, bool) {
if dr.expCC[pid] == 16 {
return 16, false
}
return dr.expCC[pid], true
}
// incExpectedCC increments the expected cc.
func (dr *DiscontinuityRepairer) IncExpectedCC(pid int) {
dr.expCC[pid] = (dr.expCC[pid] + 1) & 0xf
}
// decExpectedCC decrements the expected cc.
func (dr *DiscontinuityRepairer) decExpectedCC(pid int) {
dr.expCC[pid] = (dr.expCC[pid] - 1) & 0xf
}
// setExpectedCC sets the expected cc.
func (dr *DiscontinuityRepairer) SetExpectedCC(pid, cc int) {
dr.expCC[pid] = cc
}

View File

@ -85,7 +85,8 @@ var (
) )
const ( const (
psiInterval = 1 * time.Second psiInterval = 1 * time.Second
psiSendCount = 7
) )
// Meta allows addition of metadata to encoded mts from outside of this pkg. // Meta allows addition of metadata to encoded mts from outside of this pkg.
@ -130,6 +131,10 @@ type Encoder struct {
continuity map[int]byte continuity map[int]byte
timeBasedPsi bool
pktCount int
psiSendCount int
psiLastTime time.Time psiLastTime time.Time
} }
@ -141,6 +146,10 @@ func NewEncoder(dst io.Writer, fps float64) *Encoder {
frameInterval: time.Duration(float64(time.Second) / fps), frameInterval: time.Duration(float64(time.Second) / fps),
ptsOffset: ptsOffset, ptsOffset: ptsOffset,
timeBasedPsi: true,
pktCount: 8,
continuity: map[int]byte{ continuity: map[int]byte{
patPid: 0, patPid: 0,
pmtPid: 0, pmtPid: 0,
@ -159,11 +168,22 @@ const (
hasPTS = 0x2 hasPTS = 0x2
) )
// TimeBasedPsi allows for the setting of the PSI writing method, therefore, if
// PSI is written based on some time duration, or based on a packet count.
// If b is true, then time based PSI is used, otherwise the PSI is written
// every sendCount.
func (e *Encoder) TimeBasedPsi(b bool, sendCount int) {
e.timeBasedPsi = b
e.psiSendCount = sendCount
e.pktCount = e.psiSendCount
}
// generate handles the incoming data and generates equivalent mpegts packets - // generate handles the incoming data and generates equivalent mpegts packets -
// sending them to the output channel. // sending them to the output channel.
func (e *Encoder) Encode(nalu []byte) error { func (e *Encoder) Encode(nalu []byte) error {
now := time.Now() now := time.Now()
if now.Sub(e.psiLastTime) > psiInterval { if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || (!e.timeBasedPsi && (e.pktCount >= e.psiSendCount)) {
e.pktCount = 0
err := e.writePSI() err := e.writePSI()
if err != nil { if err != nil {
return err return err
@ -204,6 +224,7 @@ func (e *Encoder) Encode(nalu []byte) error {
if err != nil { if err != nil {
return err return err
} }
e.pktCount++
} }
e.tick() e.tick()
@ -226,6 +247,7 @@ func (e *Encoder) writePSI() error {
if err != nil { if err != nil {
return err return err
} }
e.pktCount++
pmtTable, err = updateMeta(pmtTable) pmtTable, err = updateMeta(pmtTable)
if err != nil { if err != nil {
return err return err
@ -243,6 +265,7 @@ func (e *Encoder) writePSI() error {
if err != nil { if err != nil {
return err return err
} }
e.pktCount++
return nil return nil
} }

View File

@ -144,6 +144,9 @@ func (m *Data) Delete(key string) {
// Encode takes the meta data map and encodes into a byte slice with header // Encode takes the meta data map and encodes into a byte slice with header
// describing the version, length of data and data in TSV format. // describing the version, length of data and data in TSV format.
func (m *Data) Encode() []byte { func (m *Data) Encode() []byte {
if m.enc == nil {
panic("Meta has not been initialized yet")
}
m.enc = m.enc[:headSize] m.enc = m.enc[:headSize]
// Iterate over map and append entries, only adding tab if we're not on the // Iterate over map and append entries, only adding tab if we're not on the

View File

@ -31,6 +31,8 @@ package mts
import ( import (
"errors" "errors"
"fmt" "fmt"
"github.com/Comcast/gots/packet"
) )
// General mpegts packet properties. // General mpegts packet properties.
@ -55,11 +57,14 @@ const HeadSize = 4
// Consts relating to adaptation field. // Consts relating to adaptation field.
const ( const (
AdaptationIdx = 4 // Index to the adaptation field (index of AFL). AdaptationIdx = 4 // Index to the adaptation field (index of AFL).
AdaptationControlIdx = 3 // Index to octet with adaptation field control. AdaptationControlIdx = 3 // Index to octet with adaptation field control.
AdaptationFieldsIdx = AdaptationIdx + 1 // Adaptation field index is the index of the adaptation fields. AdaptationFieldsIdx = AdaptationIdx + 1 // Adaptation field index is the index of the adaptation fields.
DefaultAdaptationSize = 2 // Default size of the adaptation field. DefaultAdaptationSize = 2 // Default size of the adaptation field.
AdaptationControlMask = 0x30 // Mask for the adaptation field control in octet 3. AdaptationControlMask = 0x30 // Mask for the adaptation field control in octet 3.
DefaultAdaptationBodySize = 1 // Default size of the adaptation field body.
DiscontinuityIndicatorMask = 0x80 // Mask for the discontinuity indicator at the discontinuity indicator idk.
DiscontinuityIndicatorIdx = AdaptationIdx + 1 // The index at which the discontinuity indicator is found in an MTS packet.
) )
// TODO: make this better - currently doesn't make sense. // TODO: make this better - currently doesn't make sense.
@ -257,3 +262,52 @@ func (p *Packet) Bytes(buf []byte) []byte {
buf = append(buf, p.Payload...) buf = append(buf, p.Payload...)
return buf return buf
} }
type Option func(p *packet.Packet)
// addAdaptationField adds an adaptation field to p, and applys the passed options to this field.
// TODO: this will probably break if we already have adaptation field.
func addAdaptationField(p *packet.Packet, options ...Option) error {
if packet.ContainsAdaptationField((*packet.Packet)(p)) {
return errors.New("Adaptation field is already present in packet")
}
// Create space for adaptation field.
copy(p[HeadSize+DefaultAdaptationSize:], p[HeadSize:len(p)-DefaultAdaptationSize])
// TODO: seperate into own function
// Update adaptation field control.
p[AdaptationControlIdx] &= 0xff ^ AdaptationControlMask
p[AdaptationControlIdx] |= AdaptationControlMask
// Default the adaptationfield.
resetAdaptation(p)
// Apply and options that have bee passed.
for _, option := range options {
option(p)
}
return nil
}
// resetAdaptation sets fields in ps adaptation field to 0 if the adaptation field
// exists, otherwise an error is returned.
func resetAdaptation(p *packet.Packet) error {
if !packet.ContainsAdaptationField((*packet.Packet)(p)) {
return errors.New("No adaptation field in this packet")
}
p[AdaptationIdx] = DefaultAdaptationBodySize
p[AdaptationIdx+1] = 0x00
return nil
}
// DiscontinuityIndicator returns and Option that will set p's discontinuity
// indicator according to f.
func DiscontinuityIndicator(f bool) Option {
return func(p *packet.Packet) {
set := byte(DiscontinuityIndicatorMask)
if !f {
set = 0x00
}
p[DiscontinuityIndicatorIdx] &= 0xff ^ DiscontinuityIndicatorMask
p[DiscontinuityIndicatorIdx] |= DiscontinuityIndicatorMask & set
}
}