Merged in psi-improvements (pull request #79)

Psi improvements

Approved-by: Alan Noble <anoble@gmail.com>
This commit is contained in:
Saxon Milton 2018-12-17 11:13:30 +00:00
commit 081007b091
13 changed files with 1051 additions and 219 deletions

View File

@ -45,7 +45,7 @@ const (
progName = "revid-cli" progName = "revid-cli"
// Logging is set to INFO level. // Logging is set to INFO level.
defaultLogVerbosity = smartlogger.Info defaultLogVerbosity = smartlogger.Debug
) )
// Other misc consts // Other misc consts
@ -231,8 +231,9 @@ func handleFlags() revid.Config {
switch *verbosityPtr { switch *verbosityPtr {
case "No": case "No":
cfg.LogLevel = smartlogger.Fatal cfg.LogLevel = smartlogger.Fatal
case "Yes": case "Debug":
cfg.LogLevel = smartlogger.Debug cfg.LogLevel = smartlogger.Debug
//logger.SetLevel(smartlogger.Debug)
case "": case "":
default: default:
logger.Log(smartlogger.Error, pkg+"bad verbosity argument") logger.Log(smartlogger.Error, pkg+"bad verbosity argument")

View File

@ -65,6 +65,7 @@ type Config struct {
IntraRefreshPeriod string IntraRefreshPeriod string
RtpAddress string RtpAddress string
Logger Logger Logger Logger
SendRetry bool
} }
// Enums for config struct // Enums for config struct
@ -103,7 +104,7 @@ const (
defaultIntraRefreshPeriod = "100" defaultIntraRefreshPeriod = "100"
defaultTimeout = "0" defaultTimeout = "0"
defaultQuantization = "40" defaultQuantization = "40"
defaultBitrate = "500000" defaultBitrate = "400000"
defaultQuantizationMode = QuantizationOff defaultQuantizationMode = QuantizationOff
defaultFramesPerClip = 1 defaultFramesPerClip = 1
defaultVerticalFlip = No defaultVerticalFlip = No
@ -188,7 +189,6 @@ func (c *Config) Validate(r *Revid) error {
switch c.Output1 { switch c.Output1 {
case File: case File:
case Rtp:
case Udp: case Udp:
case Rtmp, FfmpegRtmp: case Rtmp, FfmpegRtmp:
if c.RtmpUrl == "" { if c.RtmpUrl == "" {
@ -204,7 +204,7 @@ func (c *Config) Validate(r *Revid) error {
defaultOutput) defaultOutput)
c.Output1 = defaultOutput c.Output1 = defaultOutput
fallthrough fallthrough
case Http: case Http, Rtp:
c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for http out", c.Logger.Log(smartlogger.Info, pkg+"defaulting frames per clip for http out",
"framesPerClip", httpFramesPerClip) "framesPerClip", httpFramesPerClip)
c.FramesPerClip = httpFramesPerClip c.FramesPerClip = httpFramesPerClip

View File

@ -125,6 +125,9 @@ type Revid struct {
isRunning bool isRunning bool
} }
var now = time.Now()
var prevTime = now
// packer takes data segments and packs them into clips // packer takes data segments and packs them into clips
// of the number frames specified in the owners config. // of the number frames specified in the owners config.
type packer struct { type packer struct {
@ -153,9 +156,11 @@ func (p *packer) Write(frame []byte) (int, error) {
return n, err return n, err
} }
p.packetCount++ p.packetCount++
if p.packetCount >= p.owner.config.FramesPerClip { now = time.Now()
if now.Sub(prevTime) > clipDuration && p.packetCount%7 == 0 {
p.owner.buffer.Flush() p.owner.buffer.Flush()
p.packetCount = 0 p.packetCount = 0
prevTime = now
} }
return len(frame), nil return len(frame), nil
} }
@ -374,6 +379,8 @@ loop:
err = dest.send() err = dest.send()
if err == nil { if err == nil {
r.config.Logger.Log(smartlogger.Debug, pkg+"sent clip to output "+strconv.Itoa(i)) r.config.Logger.Log(smartlogger.Debug, pkg+"sent clip to output "+strconv.Itoa(i))
} else if r.config.SendRetry == false {
r.config.Logger.Log(smartlogger.Warning, pkg+"send to output "+strconv.Itoa(i)+"failed", "error", err.Error())
} else { } else {
r.config.Logger.Log(smartlogger.Error, pkg+"send to output "+strconv.Itoa(i)+ r.config.Logger.Log(smartlogger.Error, pkg+"send to output "+strconv.Itoa(i)+
"failed, trying again", "error", err.Error()) "failed, trying again", "error", err.Error())

View File

@ -29,12 +29,14 @@ LICENSE
package revid package revid
import ( import (
"fmt"
"io" "io"
"net" "net"
"os" "os"
"os/exec" "os/exec"
"bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/av/rtmp"
"bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/av/stream/rtp" "bitbucket.org/ausocean/av/stream/rtp"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/utils/ring"
@ -143,11 +145,44 @@ func (s *httpSender) send() error {
} }
} }
var err error var err error
var reply string
if send { if send {
_, _, err = s.client.Send(netsender.RequestRecv, pins) reply, _, err = s.client.Send(netsender.RequestRecv, pins)
} if err != nil {
return err return err
} }
}
return s.extractMeta(reply)
}
// extractMeta looks at a reply at extracts any time or location data - then used
// to update time and location information in the mpegts encoder.
func (s *httpSender) extractMeta(r string) error {
dec, err := netsender.NewJSONDecoder(r)
if err != nil {
return nil
}
// Extract time from reply
t, err := dec.Int("ts")
if err != nil {
s.log(smartlogger.Warning, pkg+"No timestamp in reply")
} else {
s.log(smartlogger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t))
mts.SetTimeStamp(uint64(t))
}
// Extract location from reply
g, err := dec.String("ll")
if err != nil {
s.log(smartlogger.Warning, pkg+"No location in reply")
} else {
s.log(smartlogger.Debug, fmt.Sprintf("%v got location: %v", pkg, g))
mts.SetLocation(g)
}
return nil
}
func (s *httpSender) release() { func (s *httpSender) release() {
// We will not retry, so release // We will not retry, so release

View File

@ -29,118 +29,38 @@ LICENSE
package mts package mts
import ( import (
"encoding/binary"
"hash/crc32"
"io" "io"
"math/bits"
"time" "time"
"bitbucket.org/ausocean/av/stream/mts/pes" "bitbucket.org/ausocean/av/stream/mts/pes"
"bitbucket.org/ausocean/av/stream/mts/psi"
) )
const ( const (
psiPacketSize = 184 psiPacketSize = 184
psiSendCount = 100 psiSendCount = 7
) )
// TODO: Finish off mts/psi so that we can create pat and pmt tables instead type MetaData struct {
// of hardcoding. time uint64
location string
}
var metaData = MetaData{time: 0, location: ""}
func SetTimeStamp(t uint64) {
metaData.time = t
}
func SetLocation(g string) {
metaData.location = g
}
var ( var (
patTable = []byte{ patTable = psi.StdPat.Bytes()
0x00, // pointer pmtTable = psi.StdPmtTimeLocation.Bytes()
// ---- section included in data sent to CRC32 during check
// table header
0x00, // table id
0xb0, // section syntax indicator:1|private bit:1|reserved:2|section length:2|more bytes...:2
0x0d, // more bytes...
// syntax section
0x00, 0x01, // table id extension
0xc1, // reserved bits:3|version:5|use now:1
0x00, // section number
0x00, // last section number
// table data
0x00, 0x01, // Program number
0xf0, 0x00, // reserved:3|program map PID:13
// 0x2a, 0xb1, 0x04, 0xb2, // CRC
// ----
}
pmtTable = []byte{
0x00, // pointer
// ---- section included in data sent to CRC32 during check
// table header
0x02, // table id
0xb0, // section syntax indicator:1|private bit:1|reserved:2|section length:2|more bytes...:2
0x12, // more bytes...
// syntax section
0x00, 0x01, // table id extension
0xc1, // reserved bits:3|version:5|use now:1
0x00, // section number
0x00, // last section number
// table data
0xe1, 0x00, // reserved:3|PCR PID:13
0xf0, 0x00, // reserved:4|unused:2|program info length:10
// No program descriptors since program info length is 0.
// elementary stream info data
0x1b, // stream type
0xe1, 0x00, // reserved:3|elementary PID:13
0xf0, 0x00, // reserved:4|unused:2|ES info length:10
// No elementary stream descriptors since ES info length is 0.
// 0x15, 0xbd, 0x4d, 0x56, // CRC
// ----
}
) )
func init() {
// Generate IEEE polynomial table
// for the big-endian algorithm.
crcTable := crc32_MakeTable(bits.Reverse32(crc32.IEEE))
patTable = completePSI(patTable, crcTable)
pmtTable = completePSI(pmtTable, crcTable)
}
func completePSI(psi []byte, tab *crc32.Table) []byte {
var buf [4]byte
crc := crc32_Update(0xffffffff, tab, psi[1:])
binary.BigEndian.PutUint32(buf[:], crc)
dst := make([]byte, len(psi), psiPacketSize)
copy(dst, psi)
dst = append(dst, buf[:]...)
for len(dst) < cap(dst) {
dst = append(dst, 0xff)
}
return dst
}
func crc32_MakeTable(poly uint32) *crc32.Table {
var t crc32.Table
for i := range t {
crc := uint32(i) << 24
for j := 0; j < 8; j++ {
if crc&0x80000000 != 0 {
crc = (crc << 1) ^ poly
} else {
crc <<= 1
}
}
t[i] = crc
}
return &t
}
func crc32_Update(crc uint32, tab *crc32.Table, p []byte) uint32 {
for _, v := range p {
crc = tab[byte(crc>>24)^v] ^ (crc << 8)
}
return crc
}
const ( const (
sdtPid = 17 sdtPid = 17
patPid = 0 patPid = 0
@ -167,7 +87,7 @@ type Encoder struct {
frameInterval time.Duration frameInterval time.Duration
ptsOffset time.Duration ptsOffset time.Duration
psiCount uint psiCount int
continuity map[int]byte continuity map[int]byte
} }
@ -201,13 +121,7 @@ const (
// generate handles the incoming data and generates equivalent mpegts packets - // generate handles the incoming data and generates equivalent mpegts packets -
// sending them to the output channel // sending them to the output channel
func (e *Encoder) Encode(nalu []byte) error { func (e *Encoder) Encode(nalu []byte) error {
if e.psiCount <= 0 {
err := e.writePSI()
if err != nil {
return err
}
}
e.psiCount--
// Prepare PES data. // Prepare PES data.
pesPkt := pes.Packet{ pesPkt := pes.Packet{
StreamID: streamID, StreamID: streamID,
@ -237,6 +151,13 @@ func (e *Encoder) Encode(nalu []byte) error {
pkt.PCR = e.pcr() pkt.PCR = e.pcr()
pusi = false pusi = false
} }
if e.psiCount <= 0 {
err := e.writePSI()
if err != nil {
return err
}
}
e.psiCount--
_, err := e.dst.Write(pkt.Bytes()) _, err := e.dst.Write(pkt.Bytes())
if err != nil { if err != nil {
return err return err
@ -248,6 +169,8 @@ func (e *Encoder) Encode(nalu []byte) error {
return nil return nil
} }
// writePSI creates mpegts with pat and pmt tables - with pmt table having updated
// location and time data.
func (e *Encoder) writePSI() error { func (e *Encoder) writePSI() error {
// Write PAT // Write PAT
patPkt := Packet{ patPkt := Packet{
@ -255,20 +178,30 @@ func (e *Encoder) writePSI() error {
PID: patPid, PID: patPid,
CC: e.ccFor(patPid), CC: e.ccFor(patPid),
AFC: hasPayload, AFC: hasPayload,
Payload: patTable, Payload: addPadding(patTable),
} }
_, err := e.dst.Write(patPkt.Bytes()) _, err := e.dst.Write(patPkt.Bytes())
if err != nil { if err != nil {
return err return err
} }
// Write PMT. // Update pmt table time and location
err = psi.UpdateTime(pmtTable, metaData.time)
if err != nil {
return err
}
err = psi.UpdateLocation(pmtTable, metaData.location)
if err != nil {
return nil
}
// Create mts packet from pmt table
pmtPkt := Packet{ pmtPkt := Packet{
PUSI: true, PUSI: true,
PID: pmtPid, PID: pmtPid,
CC: e.ccFor(pmtPid), CC: e.ccFor(pmtPid),
AFC: hasPayload, AFC: hasPayload,
Payload: pmtTable, Payload: addPadding(pmtTable),
} }
_, err = e.dst.Write(pmtPkt.Bytes()) _, err = e.dst.Write(pmtPkt.Bytes())
if err != nil { if err != nil {
@ -278,6 +211,15 @@ func (e *Encoder) writePSI() error {
return nil return nil
} }
// addPadding adds an appropriate amount of padding to a pat or pmt table for
// addition to an mpegts packet
func addPadding(d []byte) []byte {
for len(d) < psiPacketSize {
d = append(d, 0xff)
}
return d
}
// tick advances the clock one frame interval. // tick advances the clock one frame interval.
func (e *Encoder) tick() { func (e *Encoder) tick() {
e.clock += e.frameInterval e.clock += e.frameInterval

View File

@ -28,6 +28,10 @@ LICENSE
package mts package mts
import (
"errors"
)
const ( const (
mpegTsSize = 188 mpegTsSize = 188
mpegtsPayloadSize = 176 mpegtsPayloadSize = 176
@ -123,6 +127,22 @@ type Packet struct {
Payload []byte // Mpeg ts Payload Payload []byte // Mpeg ts Payload
} }
// FindPMT will take a clip of mpegts and try to find a PMT table - if one
// is found, then it is returned, otherwise nil and an error is returned.
func FindPMT(d []byte) (p []byte, err error) {
if len(d) < mpegTsSize {
return nil, errors.New("Mmpegts data not of valid length")
}
for i := 0; i < len(d); i += mpegTsSize {
pid := (uint16(d[i+1]&0x1f) << 8) | uint16(d[i+2])
if pid == pmtPid {
p = d[i+4 : i+mpegTsSize]
return
}
}
return nil, errors.New("Could not find pmt table in mpegts data")
}
// FillPayload takes a channel and fills the packets Payload field until the // FillPayload takes a channel and fills the packets Payload field until the
// channel is empty or we've the packet reaches capacity // channel is empty or we've the packet reaches capacity
func (p *Packet) FillPayload(data []byte) int { func (p *Packet) FillPayload(data []byte) int {

145
stream/mts/psi/op.go Normal file
View File

@ -0,0 +1,145 @@
/*
NAME
op.go
DESCRIPTION
op.go provides functionality for editing and reading bytes slices
directly in order to insert/read timestamp and location data in psi.
AUTHOR
Saxon Milton <saxon@ausocean.org>
LICENSE
op.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package psi
import (
"bytes"
"encoding/binary"
"errors"
"hash/crc32"
"math/bits"
)
// TimeBytes takes a timestamp as an uint64 and converts to an 8 byte slice -
// allows for updating of timestamp in pmt time descriptor.
func TimeBytes(t uint64) []byte {
var s [timeDataSize]byte
binary.BigEndian.PutUint64(s[:], t)
return s[:]
}
// HasTime takes a psi as a byte slice and checks to see if it has a time descriptor
// - if so return nil, otherwise return error
func HasTime(p []byte) error {
if p[timeTagIndx] != timeDescTag {
return errors.New("PSI does not contain a time descriptor, cannot update")
}
return nil
}
// HasLocation takes a psi as a byte slice and checks to see if it has a location descriptor
// - if so return nil, otherwise return error
func HasLocation(p []byte) error {
if p[locationTagIndx] != locationDescTag {
return errors.New("PSI does not contain a location descriptor, cannot update")
}
return nil
}
// UpdateTime takes the byte slice representation of a psi-pmt as well as a time
// as an integer and attempts to update the time descriptor in the pmt with the
// given time if the time descriptor exists, otherwise an error is returned
func UpdateTime(dst []byte, t uint64) error {
err := HasTime(dst)
if err != nil {
return err
}
ts := TimeBytes(uint64(t))
for i := range dst[timeDataIndx : timeDataIndx+timeDataSize] {
dst[i+timeDataIndx] = ts[i]
}
dst = updateCrc(dst)
return nil
}
// TimeFrom takes a byte slice representation of a psi-pmt and extracts it's
// timestamp, returning as a uint64 if it exists, otherwise returning 0 and nil
// if it does not exist
func TimeFrom(p []byte) (t uint64, err error) {
err = HasTime(p)
if err != nil {
return 0, err
}
t = binary.BigEndian.Uint64(p[timeDataIndx : timeDataIndx+timeDataSize])
return t, nil
}
// LocationFrom takes a byte slice representation of a psi-pmt and extracts it's
// timestamp, returning as a uint64 if it exists, otherwise returning 0 and nil
// if it does not exist
func LocationFrom(p []byte) (g string, err error) {
err = HasLocation(p)
if err != nil {
return "", err
}
gBytes := p[locationDataIndx : locationDataIndx+locationDataSize]
gBytes = bytes.Trim(gBytes, "\x00")
g = string(gBytes)
return g, nil
}
// LocationStrBytes take a string of location data and converts to a 32 byte slice -
// easy update of slice representation of a pmt with location descriptor
func LocationStrBytes(s string) []byte {
var b [locationDataSize]byte
copy(b[:], s)
return b[:]
}
// UpdateLocation takes a byte slice representation of a psi-pmt containing a location
// descriptor and attempts to update the location data value with the passed string.
// If the psi does not contain a location descriptor, and error is returned.
func UpdateLocation(d []byte, s string) error {
err := HasLocation(d)
if err != nil {
return err
}
gb := LocationStrBytes(s)
for i := range d[locationDataIndx : locationDataIndx+locationDataSize] {
d[i+locationDataIndx] = gb[i]
}
d = updateCrc(d)
return nil
}
// addCrc appends a crc table to a given psi table in bytes
func addCrc(out []byte) []byte {
out = append(out, make([]byte, 4)...)
out = updateCrc(out)
return out
}
// updateCrc updates the crc of psi bytes slice that may have been modified
func updateCrc(out []byte) []byte {
crc32 := crc32_Update(0xffffffff, crc32_MakeTable(bits.Reverse32(crc32.IEEE)), out[1:len(out)-4])
out[len(out)-4] = byte(crc32 >> 24)
out[len(out)-3] = byte(crc32 >> 16)
out[len(out)-2] = byte(crc32 >> 8)
out[len(out)-1] = byte(crc32)
return out
}

View File

@ -1,51 +0,0 @@
/*
NAME
revid - a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols.
DESCRIPTION
See Readme.md
AUTHOR
Alan Noble <anoble@gmail.com>
LICENSE
revid is Copyright (C) 2017 Alan Noble.
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package psi
type PAT struct {
PF byte // Point field
PFB []byte // pointer filler bytes
TableID byte // Table ID
SSI bool // Sectiopn syntax indicator (1 for PAT, PMT, CAT)
PB bool // Private bit (0 for PAT, PMT, CAT)
SL uint16 // Section length
TIE uint16 // Table ID extension
Version byte // Version number
CNI bool // Current/next indicator
Section byte // Section number
LSN byte // Last section number
DT byte // Descriptor tag
DL byte // Descriptor length
Program uint16 // Program number
PMPID uint16 // Program map PID
CRC32 uint32 // Checksum of table
}
func (p *PAT) ToByteSlice() (output []byte) {
return
}

View File

@ -1,47 +0,0 @@
/*
NAME
revid - a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols.
DESCRIPTION
See Readme.md
AUTHOR
Alan Noble <anoble@gmail.com>
LICENSE
revid is Copyright (C) 2017 Alan Noble.
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package psi
type PMT struct {
PF byte // Point field
PFB []byte // pointer filler bytes
TableID byte // Table ID
SSI bool // Sectiopn syntax indicator (1 for PAT, PMT, CAT)
PB bool // Private bit (0 for PAT, PMT, CAT)
SL uint16 // Section length
TIE uint16 // Table ID extension
Version byte // Version number
CNI bool // Current/next indicator
Section byte // Section number
LSN byte // Last section number
}
func (p *PMT) ToByteSlice() (output []byte) {
return
}

333
stream/mts/psi/psi.go Normal file
View File

@ -0,0 +1,333 @@
/*
NAME
psi.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Milton <saxon@ausocean.org>
LICENSE
psi.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package psi
import (
"hash/crc32"
)
// Lengths of section definitions
const (
ESSDDefLen = 5
DescDefLen = 2
PMTDefLen = 4
PATLen = 4
TSSDefLen = 5
PSIDefLen = 3
)
// Table Type IDs
const (
PATTableID = 0x00
PMTTableID = 0x02
)
// Consts relating to time description
const (
timeDescTag = 234
timeTagIndx = 13
timeDataIndx = 15
timeDataSize = 8 // bytes, because time is stored in uint64
)
// Consts relating to location description
const (
locationDescTag = 235
locationTagIndx = 23
locationDataIndx = 25
locationDataSize = 32 // bytes
)
// Program specific information
type PSI struct {
Pf byte // Point field
Pfb []byte // Pointer filler bytes
Tid byte // Table ID
Ssi bool // Section syntax indicator (1 for PAT, PMT, CAT)
Pb bool // Private bit (0 for PAT, PMT, CAT)
Sl uint16 // Section length
Tss *TSS // Table syntax section (length defined by SL) if length 0 then nil
Crc uint32 // crc32 of entire table excluding pointer field, pointer filler bytes and the trailing CRC32
}
// Table syntax section
type TSS struct {
Tide uint16 // Table ID extension
V byte // Version number
Cni bool // Current/next indicator
Sn byte // Section number
Lsn byte // Last section number
Sd SD // Specific data PAT/PMT
}
// Specific Data, (could be PAT or PMT)
type SD interface {
Bytes() []byte
}
// Program association table, implements SD
type PAT struct {
Pn uint16 // Program Number
Pmpid uint16 // Program map PID
}
// Program mapping table, implements SD
type PMT struct {
Pcrpid uint16 // Program clock reference pid
Pil uint16 // Program info length
Pd []Desc // Program descriptors
Essd *ESSD // Elementary stream specific data
}
// Elementary stream specific data
type ESSD struct {
St byte // Stream type
Epid uint16 // Elementary pid
Esil uint16 // Elementary stream
Esd []Desc // Elementary stream desriptors
}
// Descriptor
type Desc struct {
Dt byte // Descriptor tag
Dl byte // Descriptor length
Dd []byte // Descriptor data
}
// ReadPSI creates a PSI data structure from a given byte slice that represents a PSI
func ReadPSI(data []byte) *PSI {
psi := PSI{}
pos := 0
psi.Pf = data[pos]
if psi.Pf != 0 {
panic("No support for pointer filler bytes")
}
psi.Tid = data[pos]
pos++
psi.Ssi = byteToBool(data[pos] & 0x80)
psi.Pb = byteToBool(data[pos] & 0x40)
psi.Sl = uint16(data[pos]&0x03)<<8 | uint16(data[pos+1])
pos += 2
psi.Tss = readTSS(data[pos:], &psi)
return &psi
}
// ReadTSS creates a TSS data structure from a given byte slice that represents a TSS
func readTSS(data []byte, p *PSI) *TSS {
tss := TSS{}
pos := 0
tss.Tide = uint16(data[pos])<<8 | uint16(data[pos+1])
pos += 2
tss.V = (data[pos] & 0x3e) >> 1
tss.Cni = byteToBool(data[pos] & 0x01)
pos++
tss.Sn = data[pos]
pos++
tss.Lsn = data[pos]
pos++
switch p.Tid {
case PATTableID:
tss.Sd = readPAT(data[pos:], &tss)
case PMTTableID:
tss.Sd = readPMT(data[pos:], &tss)
default:
panic("Can't yet deal with tables that are not PAT or PMT")
}
return &tss
}
// readPAT creates a pat struct based on a bytes slice representing a pat
func readPAT(data []byte, p *TSS) *PAT {
pat := PAT{}
pos := 0
pat.Pn = uint16(data[pos])<<8 | uint16(data[pos+1])
pos += 2
pat.Pmpid = uint16(data[pos]&0x1f)<<8 | uint16(data[pos+1])
return &pat
}
// readPMT creates a pmt struct based on a bytes slice that represents a pmt
func readPMT(data []byte, p *TSS) *PAT {
pmt := PMT{}
pos := 0
pmt.Pcrpid = uint16(data[pos]&0x1f)<<8 | uint16(data[pos+1])
pos += 2
pmt.Pil = uint16(data[pos]&0x03)<<8 | uint16(data[pos+1])
pos += 2
if pmt.Pil != 0 {
pmt.Pd = readDescs(data[pos:], int(pmt.Pil))
}
pos += int(pmt.Pil)
// TODO Read ES stuff
pmt.Essd = readEssd(data[pos:])
return nil
}
// readDescs reads provides a slice of Descs given a byte slice that represents Descs
// and the no of bytes that the descs accumilate
func readDescs(data []byte, descLen int) (o []Desc) {
pos := 0
o = make([]Desc, 1)
o[0].Dt = data[pos]
pos++
o[0].Dl = data[pos]
pos++
o[0].Dd = make([]byte, o[0].Dl)
for i := 0; i < int(o[0].Dl); i++ {
o[0].Dd[i] = data[pos]
pos++
}
if 2+len(o[0].Dd) != descLen {
panic("No support for reading more than one descriptor")
}
return
}
// readEESD creates an ESSD struct based on a bytes slice that represents ESSD
func readEssd(data []byte) *ESSD {
essd := ESSD{}
pos := 0
essd.St = data[pos]
pos++
essd.Epid = uint16(data[pos]&0x1f)<<8 | uint16(data[pos+1])
pos += 2
essd.Esil = uint16(data[pos]&0x03)<<8 | uint16(data[pos+1])
pos += 2
essd.Esd = readDescs(data[pos:], int(essd.Esil))
return &essd
}
// Bytes outputs a byte slice representation of the PSI
func (p *PSI) Bytes() []byte {
out := make([]byte, 4)
out[0] = p.Pf
if p.Pf != 0 {
panic("No support for pointer filler bytes")
}
out[1] = p.Tid
out[2] = 0x80 | 0x30 | (0x03 & byte(p.Sl>>8))
out[3] = byte(p.Sl)
out = append(out, p.Tss.Bytes()...)
out = addCrc(out)
return out
}
// Bytes outputs a byte slice representation of the TSS
func (t *TSS) Bytes() []byte {
out := make([]byte, TSSDefLen)
out[0] = byte(t.Tide >> 8)
out[1] = byte(t.Tide)
out[2] = 0xc0 | (0x3e & (t.V << 1)) | (0x01 & boolToByte(t.Cni))
out[3] = t.Sn
out[4] = t.Lsn
out = append(out, t.Sd.Bytes()...)
return out
}
// Bytes outputs a byte slice representation of the PAT
func (p *PAT) Bytes() []byte {
out := make([]byte, PATLen)
out[0] = byte(p.Pn >> 8)
out[1] = byte(p.Pn)
out[2] = 0xe0 | (0x1f & byte(p.Pmpid>>8))
out[3] = byte(p.Pmpid)
return out
}
// Bytes outputs a byte slice representation of the PMT
func (p *PMT) Bytes() []byte {
out := make([]byte, PMTDefLen)
out[0] = 0xe0 | (0x1f & byte(p.Pcrpid>>8)) // byte 10
out[1] = byte(p.Pcrpid)
out[2] = 0xf0 | (0x03 & byte(p.Pil>>8))
out[3] = byte(p.Pil)
for _, d := range p.Pd {
out = append(out, d.Bytes()...)
}
out = append(out, p.Essd.Bytes()...)
return out
}
// Bytes outputs a byte slice representation of the Desc
func (d *Desc) Bytes() []byte {
out := make([]byte, DescDefLen)
out[0] = d.Dt
out[1] = d.Dl
out = append(out, d.Dd...)
return out
}
// Bytes outputs a byte slice representation of the ESSD
func (e *ESSD) Bytes() []byte {
out := make([]byte, ESSDDefLen)
out[0] = e.St
out[1] = 0xe0 | (0x1f & byte(e.Epid>>8))
out[2] = byte(e.Epid)
out[3] = 0xf0 | (0x03 & byte(e.Esil>>8))
out[4] = byte(e.Esil)
for _, d := range e.Esd {
out = append(out, d.Bytes()...)
}
return out
}
func boolToByte(b bool) (o byte) {
if b {
o = 0x01
}
return
}
func byteToBool(b byte) (o bool) {
if b != 0 {
o = true
}
return
}
func crc32_MakeTable(poly uint32) *crc32.Table {
var t crc32.Table
for i := range t {
crc := uint32(i) << 24
for j := 0; j < 8; j++ {
if crc&0x80000000 != 0 {
crc = (crc << 1) ^ poly
} else {
crc <<= 1
}
}
t[i] = crc
}
return &t
}
func crc32_Update(crc uint32, tab *crc32.Table, p []byte) uint32 {
for _, v := range p {
crc = tab[byte(crc>>24)^v] ^ (crc << 8)
}
return crc
}

281
stream/mts/psi/psi_test.go Normal file
View File

@ -0,0 +1,281 @@
/*
NAME
psi_test.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Milton <saxon@ausocean.org>
LICENSE
psi_test.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package psi
import (
"bytes"
"testing"
)
// Times as ints for testing
const (
tstTime1 = 1235367435 // 0x49A2360B
tstTime2 = 1735357535 // 0x676F745F
)
// GPS string for testing
// TODO: make these realistic
const (
locationTstStr1 = "$GPGGA,123519,4807.038,N,01131.0"
locationTstStr2 = "$GPGGA,183710,4902.048,N,02171.0"
)
// err message
const (
errCmp = "Incorrect output, for: %v wanted: %v, got: %v"
)
// Test time to bytes test Data
var (
timeSlice = []byte{
0x00, 0x00, 0x00, 0x00, 0x49, 0xA2, 0x36, 0x0B,
}
)
// Parts to construct bytes of pmt with time and bytes
var (
pmtTimeLocationBytesPart1 = []byte{
0x00, 0x02, 0xb0, 0x12, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x0a,
byte(timeDescTag), // Descriptor tag for timestamp
byte(timeDataSize), // Length of bytes to follow
0x00, 0x00, 0x00, 0x00, 0x67, 0x6F, 0x74, 0x5F, // Timestamp data
byte(locationDescTag), // Descriptor tag for location
byte(locationDataSize), // Length of bytes to follow
}
pmtTimeLocationBytesPart2 = []byte{
0x1b, 0xe1, 0x00, 0xf0, 0x00,
}
)
var (
// Bytes representing pmt with tstTime1
pmtTimeBytes1 = []byte{
0x00, 0x02, 0xb0, 0x12, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x0a,
byte(timeDescTag), // Descriptor tag
byte(timeDataSize), // Length of bytes to follow
0x00, 0x00, 0x00, 0x00, 0x49, 0xA2, 0x36, 0x0B, // timestamp
0x1b, 0xe1, 0x00, 0xf0, 0x00,
}
// Bytes representing pmt with tstTime 2
pmtTimeBytes2 = []byte{
0x00, 0x02, 0xb0, 0x12, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x0a,
byte(timeDescTag), // Descriptor tag
byte(timeDataSize), // Length of bytes to follow
0x00, 0x00, 0x00, 0x00, 0x67, 0x6F, 0x74, 0x5F, // timestamp
0x1b, 0xe1, 0x00, 0xf0, 0x00,
}
// Bytes representing pmt with time1 and location1
pmtTimeLocationBytes1 = buildPmtTimeLocationBytes(locationTstStr1)
// bytes representing pmt with with time1 and location 2
pmtTimeLocationBytes2 = buildPmtTimeLocationBytes(locationTstStr2)
)
// bytesTests contains data for testing the Bytes() funcs for the PSI data struct
var bytesTests = []struct {
name string
input PSI
want []byte
}{
// Pat test
{
name: "pat Bytes()",
input: StdPat,
want: StdPatBytes,
},
// Pmt test data no descriptor
{
name: "pmt to Bytes() without descriptors",
input: StdPmt,
want: StdPmtBytes,
},
// Pmt with time descriptor
{
name: "pmt to Bytes() with time descriptor",
input: PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: uint16(0x12),
Tss: &TSS{
Tide: uint16(0x01),
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &PMT{
Pcrpid: 0x0100, // wrong
Pil: 10,
Pd: []Desc{
Desc{
Dt: byte(timeDescTag),
Dl: byte(timeDataSize),
Dd: TimeBytes(tstTime1),
},
},
Essd: &ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
},
want: pmtTimeBytes1,
},
// Pmt with time and location
{
name: "pmt Bytes() with time and location",
input: PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: uint16(0x12),
Tss: &TSS{
Tide: uint16(0x01),
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &PMT{
Pcrpid: 0x0100, // wrong
Pil: 10,
Pd: []Desc{
Desc{
Dt: byte(timeDescTag),
Dl: byte(timeDataSize),
Dd: TimeBytes(tstTime2),
},
Desc{
Dt: byte(locationDescTag),
Dl: byte(locationDataSize),
Dd: LocationStrBytes(locationTstStr1),
},
},
Essd: &ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
},
want: buildPmtTimeLocationBytes(locationTstStr1),
},
}
// TestBytes ensures that the Bytes() funcs are working correctly to take PSI
// structs and convert them to byte slices
func TestBytes(t *testing.T) {
for _, test := range bytesTests {
got := test.input.Bytes()
// Remove crc32s
got = got[:len(got)-4]
if !bytes.Equal(got, test.want) {
t.Errorf("unexpected error for test %v: got:%v want:%v", test.name, got,
test.want)
}
}
}
// TestTimestampToBytes is a quick sanity check of the int64 time to []byte func
func TestTimestampToBytes(t *testing.T) {
tb := TimeBytes(tstTime1)
if !bytes.Equal(timeSlice, tb) {
t.Errorf(errCmp, "testTimeStampToBytes", timeSlice, tb)
}
}
// TestTimeUpdate checks to see if we can correctly update the timstamp in pmt
func TestTimeUpdate(t *testing.T) {
cpy := make([]byte, len(pmtTimeBytes1))
copy(cpy, pmtTimeBytes1)
cpy = addCrc(cpy)
err := UpdateTime(cpy, tstTime2)
cpy = cpy[:len(cpy)-4]
if err != nil {
t.Errorf("Update time returned err: %v", err)
}
if !bytes.Equal(pmtTimeBytes2, cpy) {
t.Errorf(errCmp, "TestTimeUpdate", pmtTimeBytes2, cpy)
}
}
// TestTimeGet tsts to see if we can correctly get the timestamp from a pmt
func TestTimeGet(t *testing.T) {
s, err := TimeFrom(pmtTimeBytes1)
if err != nil {
t.Errorf("Getting timestamp failed with err: %v", err)
}
if s != tstTime1 {
t.Errorf(errCmp, "TestTimeGet", tstTime1, s)
}
}
// TestLocationGet checks that we can correctly get location data from a pmt table
func TestLocationGet(t *testing.T) {
pb := StdPmtTimeLocation.Bytes()
err := UpdateLocation(pb, locationTstStr1)
if err != nil {
t.Errorf("Error for TestLocationGet UpdateLocation(pb, locationTstStr1): %v", err)
}
g, err := LocationFrom(pb)
if err != nil {
t.Errorf("Error for TestLocationGet LocationOf(pb): %v", err)
}
if g != locationTstStr1 {
t.Errorf(errCmp, "TestLocationGet", locationTstStr1, g)
}
}
// TestLocationUpdate checks to see if we can update the location string in a pmt correctly
func TestLocationUpdate(t *testing.T) {
cpy := make([]byte, len(pmtTimeLocationBytes1))
copy(cpy, pmtTimeLocationBytes1)
cpy = addCrc(cpy)
err := UpdateLocation(cpy, locationTstStr2)
cpy = cpy[:len(cpy)-4]
if err != nil {
t.Errorf("Update time returned err: %v", err)
}
if !bytes.Equal(pmtTimeLocationBytes2, cpy) {
t.Errorf(errCmp, "TestLocationUpdate", pmtTimeLocationBytes2, cpy)
}
}
// buildPmtTimeLocationBytes is a helper function to help construct the byte slices
// for pmts with time and location, as the location data field is 32 bytes, i.e. quite large
// to type out
func buildPmtTimeLocationBytes(tstStr string) []byte {
return append(append(append(make([]byte, 0), pmtTimeLocationBytesPart1...),
LocationStrBytes(tstStr)...), pmtTimeLocationBytesPart2...)
}

166
stream/mts/psi/std.go Normal file
View File

@ -0,0 +1,166 @@
/*
NAME
std.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Milton <saxon@ausocean.org>
LICENSE
std.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package psi
const (
pmtTimeLocationPil = 44
)
// Some common manifestations of PSI
var (
// PSI struct to represent basic pat
StdPat = PSI{
Pf: 0x00,
Tid: 0x00,
Ssi: true,
Pb: false,
Sl: uint16(0x0d),
Tss: &TSS{
Tide: uint16(0x01),
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &PAT{
Pn: uint16(0x01),
Pmpid: uint16(0x1000),
},
},
}
// PSI struct to represent basic pmt without descriptors for time and location
StdPmt = PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: uint16(0x12),
Tss: &TSS{
Tide: uint16(0x01),
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &PMT{
Pcrpid: 0x0100, // wrong
Pil: 0,
Essd: &ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
}
// Std pmt with time and location descriptors, time and location fields are zeroed out
StdPmtTimeLocation = PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: uint16(0x3e),
Tss: &TSS{
Tide: uint16(0x01),
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &PMT{
Pcrpid: 0x0100,
Pil: pmtTimeLocationPil,
Pd: []Desc{
Desc{
Dt: byte(timeDescTag),
Dl: byte(timeDataSize),
Dd: make([]byte, timeDataSize),
},
Desc{
Dt: byte(locationDescTag),
Dl: byte(locationDataSize),
Dd: make([]byte, locationDataSize),
},
},
Essd: &ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
}
)
// Std PSI in bytes form
var (
StdPatBytes = []byte{
0x00, // pointer
// ---- section included in data sent to CRC32 during check
// table header
0x00, // table id
0xb0, // section syntax indicator:1|private bit:1|reserved:2|section length:2|more bytes...:2
0x0d, // more bytes...
// syntax section
0x00, 0x01, // table id extension
0xc1, // reserved bits:2|version:5|use now:1 1100 0001
0x00, // section number
0x00, // last section number
// table data
0x00, 0x01, // Program number
0xf0, 0x00, // reserved:3|program map PID:13
// 0x2a, 0xb1, 0x04, 0xb2, // CRC
// ----
}
StdPmtBytes = []byte{
0x00, // pointer
// ---- section included in data sent to CRC32 during check
// table header
0x02, // table id
0xb0, // section syntax indicator:1|private bit:1|reserved:2|section length:2|more bytes...:2
0x12, // more bytes...
// syntax section
0x00, 0x01, // table id extension
0xc1, // reserved bits:3|version:5|use now:1
0x00, // section number
0x00, // last section number
// table data
0xe1, 0x00, // reserved:3|PCR PID:13
0xf0, 0x00, // reserved:4|unused:2|program info length:10
// No program descriptors since program info length is 0.
// elementary stream info data
0x1b, // stream type
0xe1, 0x00, // reserved:3|elementary PID:13
0xf0, 0x00, // reserved:4|unused:2|ES info length:10
// No elementary stream descriptors since ES info length is 0.
// 0x15, 0xbd, 0x4d, 0x56, // CRC
// ----
}
)

View File

@ -63,7 +63,7 @@ func NewEncoder(dst io.Writer, fps int) *Encoder {
ssrc: rand.Uint32(), ssrc: rand.Uint32(),
frameInterval: time.Duration(float64(time.Second) / float64(fps)), frameInterval: time.Duration(float64(time.Second) / float64(fps)),
fps: fps, fps: fps,
buffer: make([]byte, 0, sendLength), buffer: make([]byte, 0),
} }
} }
@ -72,8 +72,8 @@ func NewEncoder(dst io.Writer, fps int) *Encoder {
func (e *Encoder) Write(data []byte) (int, error) { func (e *Encoder) Write(data []byte) (int, error) {
e.buffer = append(e.buffer, data...) e.buffer = append(e.buffer, data...)
for len(e.buffer) >= sendLength { for len(e.buffer) >= sendLength {
e.Encode(e.buffer) e.Encode(e.buffer[:sendLength])
e.buffer = e.buffer[:0] e.buffer = e.buffer[sendLength:]
} }
return len(data), nil return len(data), nil
} }