mts+psi: fixed conflicts with master

This commit is contained in:
saxon 2019-01-12 17:03:55 +10:30
commit 662d71bf54
22 changed files with 1933 additions and 2162 deletions

View File

@ -119,11 +119,13 @@ func handleFlags() revid.Config {
verticalFlipPtr = flag.Bool("VerticalFlip", false, "Flip video vertically: Yes, No") verticalFlipPtr = flag.Bool("VerticalFlip", false, "Flip video vertically: Yes, No")
horizontalFlipPtr = flag.Bool("HorizontalFlip", false, "Flip video horizontally: Yes, No") horizontalFlipPtr = flag.Bool("HorizontalFlip", false, "Flip video horizontally: Yes, No")
rtpAddrPtr = flag.String("RtpAddr", "", "Rtp destination address: <IP>:<port> (port is generally 6970-6999)") rtpAddrPtr = flag.String("RtpAddr", "", "Rtp destination address: <IP>:<port> (port is generally 6970-6999)")
logPathPtr = flag.String("LogPath", defaultLogPath, "The log path")
configFilePtr = flag.String("ConfigFile", "", "NetSender config file")
) )
flag.Parse() flag.Parse()
log = logger.New(defaultLogVerbosity, &smartlogger.New("/var/log/netsender").LogRoller) log = logger.New(defaultLogVerbosity, &smartlogger.New(*logPathPtr).LogRoller)
cfg.Logger = log cfg.Logger = log
@ -207,13 +209,12 @@ func handleFlags() revid.Config {
} }
switch *packetizationPtr { switch *packetizationPtr {
case "None": case "", "None":
cfg.Packetization = revid.None cfg.Packetization = revid.None
case "Mpegts": case "Mpegts":
cfg.Packetization = revid.Mpegts cfg.Packetization = revid.Mpegts
case "Flv": case "Flv":
cfg.Packetization = revid.Flv cfg.Packetization = revid.Flv
case "":
default: default:
log.Log(logger.Error, pkg+"bad packetization argument") log.Log(logger.Error, pkg+"bad packetization argument")
} }
@ -229,6 +230,10 @@ func handleFlags() revid.Config {
log.Log(logger.Error, pkg+"bad verbosity argument") log.Log(logger.Error, pkg+"bad verbosity argument")
} }
if *configFilePtr != "" {
netsender.ConfigFile = *configFilePtr
}
cfg.Quantize = *quantizePtr cfg.Quantize = *quantizePtr
cfg.FlipHorizontal = *horizontalFlipPtr cfg.FlipHorizontal = *horizontalFlipPtr
cfg.FlipVertical = *verticalFlipPtr cfg.FlipVertical = *verticalFlipPtr

View File

@ -125,14 +125,11 @@ type Revid struct {
isRunning bool isRunning bool
} }
var now = time.Now()
var prevTime = now
// packer takes data segments and packs them into clips // packer takes data segments and packs them into clips
// of the number frames specified in the owners config. // of the number frames specified in the owners config.
type packer struct { type packer struct {
owner *Revid owner *Revid
lastTime time.Time
packetCount uint packetCount uint
} }
@ -156,11 +153,11 @@ func (p *packer) Write(frame []byte) (int, error) {
return n, err return n, err
} }
p.packetCount++ p.packetCount++
now = time.Now() now := time.Now()
if (p.owner.config.Output1 != Rtmp && now.Sub(prevTime) > clipDuration && p.packetCount%7 == 0) || p.owner.config.Output1 == Rtmp { if (p.owner.config.Output1 != Rtmp && now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) || p.owner.config.Output1 == Rtmp {
p.owner.buffer.Flush() p.owner.buffer.Flush()
p.packetCount = 0 p.packetCount = 0
prevTime = now p.lastTime = now
} }
return len(frame), nil return len(frame), nil
} }
@ -294,7 +291,7 @@ func (r *Revid) reset(config Config) error {
r.config.Logger.Log(logger.Info, pkg+"using FLV packetisation") r.config.Logger.Log(logger.Info, pkg+"using FLV packetisation")
r.encoder, err = flv.NewEncoder(&r.packer, true, true, int(r.config.FrameRate)) r.encoder, err = flv.NewEncoder(&r.packer, true, true, int(r.config.FrameRate))
if err != nil { if err != nil {
return err r.config.Logger.Log(logger.Fatal, pkg+"failed to open FLV encoder", err.Error())
} }
} }
@ -490,7 +487,6 @@ func (r *Revid) startRaspivid() error {
// setupInputForFile sets things up for getting input from a file // setupInputForFile sets things up for getting input from a file
func (r *Revid) setupInputForFile() error { func (r *Revid) setupInputForFile() error {
delay := time.Second / time.Duration(r.config.FrameRate) delay := time.Second / time.Duration(r.config.FrameRate)
f, err := os.Open(r.config.InputFileName) f, err := os.Open(r.config.InputFileName)
if err != nil { if err != nil {
r.config.Logger.Log(logger.Error, err.Error()) r.config.Logger.Log(logger.Error, err.Error())

View File

@ -144,14 +144,16 @@ func (s *httpSender) send() error {
break break
} }
} }
if !send {
return nil
}
var err error var err error
var reply string var reply string
if send {
reply, _, err = s.client.Send(netsender.RequestRecv, pins) reply, _, err = s.client.Send(netsender.RequestRecv, pins)
if err != nil { if err != nil {
return err return err
} }
}
return s.extractMeta(reply) return s.extractMeta(reply)
} }
@ -169,7 +171,7 @@ func (s *httpSender) extractMeta(r string) error {
s.log(logger.Warning, pkg+"No timestamp in reply") s.log(logger.Warning, pkg+"No timestamp in reply")
} else { } else {
s.log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) s.log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t))
mts.SetTimeStamp(uint64(t)) mts.MetaData.SetTimeStamp(uint64(t))
} }
// Extract location from reply // Extract location from reply
@ -178,7 +180,7 @@ func (s *httpSender) extractMeta(r string) error {
s.log(logger.Warning, pkg+"No location in reply") s.log(logger.Warning, pkg+"No location in reply")
} else { } else {
s.log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) s.log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g))
mts.SetLocation(g) mts.MetaData.SetLocation(g)
} }
return nil return nil
@ -264,7 +266,7 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg
var sess *rtmp.Session var sess *rtmp.Session
var err error var err error
for n := 0; n < retries; n++ { for n := 0; n < retries; n++ {
sess = rtmp.NewSession(url, timeout) sess = rtmp.NewSession(url, timeout, log)
err = sess.Open() err = sess.Open()
if err == nil { if err == nil {
break break
@ -310,7 +312,7 @@ func (s *rtmpSender) restart() error {
return err return err
} }
for n := 0; n < s.retries; n++ { for n := 0; n < s.retries; n++ {
s.sess = rtmp.NewSession(s.url, s.timeout) s.sess = rtmp.NewSession(s.url, s.timeout, s.log)
err = s.sess.Open() err = s.sess.Open()
if err == nil { if err == nil {
break break

View File

@ -9,7 +9,7 @@ AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org> Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE LICENSE
amf_headers.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) amf_headers.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the under the terms of the GNU General Public License as published by the

515
rtmp/packet.go Normal file
View File

@ -0,0 +1,515 @@
/*
NAME
packet.go
DESCRIPTION
RTMP packet functionality.
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
packet.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
Derived from librtmp under the GNU Lesser General Public License 2.1
Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org
Copyright (C) 2008-2009 Andrej Stepanchuk
Copyright (C) 2009-2010 Howard Chu
*/
package rtmp
import (
"encoding/binary"
"io"
)
// Packet types.
const (
packetTypeChunkSize = 0x01
packetTypeBytesReadReport = 0x03
packetTypeControl = 0x04
packetTypeServerBW = 0x05
packetTypeClientBW = 0x06
packetTypeAudio = 0x08
packetTypeVideo = 0x09
packetTypeFlexStreamSend = 0x0F // not implemented
packetTypeFlexSharedObject = 0x10 // not implemented
packetTypeFlexMessage = 0x11 // not implemented
packetTypeInfo = 0x12
packetTypeInvoke = 0x14
packetTypeFlashVideo = 0x16 // not implemented
)
// Header sizes.
const (
headerSizeLarge = 0
headerSizeMedium = 1
headerSizeSmall = 2
headerSizeMinimum = 3
headerSizeAuto = 4
)
// Special channels.
const (
chanBytesRead = 0x02
chanControl = 0x03
chanSource = 0x04
)
// headerSizes defines header sizes for header types 0, 1, 2 and 3 respectively:
// 0: full header (12 bytes)
// 1: header without message ID (8 bytes)
// 2: basic header + timestamp (4 byes)
// 3: basic header (chunk type and stream ID) (1 byte)
var headerSizes = [...]int{12, 8, 4, 1}
// packet defines an RTMP packet.
type packet struct {
headerType uint8
packetType uint8
channel int32
hasAbsTimestamp bool
timestamp uint32
info int32
bodySize uint32
bytesRead uint32
chunk *chunk
header []byte
body []byte
}
// chunk defines an RTMP packet chunk.
type chunk struct {
headerSize int32
data []byte
header [fullHeaderSize]byte
}
// read reads a packet.
func (pkt *packet) read(s *Session) error {
var hbuf [fullHeaderSize]byte
header := hbuf[:]
_, err := s.read(header[:1])
if err != nil {
s.log(DebugLevel, pkg+"failed to read packet header 1st byte", "error", err.Error())
if err == io.EOF {
s.log(WarnLevel, pkg+"EOF error; connection likely terminated")
}
return err
}
pkt.headerType = (header[0] & 0xc0) >> 6
pkt.channel = int32(header[0] & 0x3f)
header = header[1:]
switch {
case pkt.channel == 0:
_, err = s.read(header[:1])
if err != nil {
s.log(DebugLevel, pkg+"failed to read packet header 2nd byte", "error", err.Error())
return err
}
header = header[1:]
pkt.channel = int32(header[0]) + 64
case pkt.channel == 1:
_, err = s.read(header[:2])
if err != nil {
s.log(DebugLevel, pkg+"failed to read packet header 3rd byte", "error", err.Error())
return err
}
header = header[2:]
pkt.channel = int32(binary.BigEndian.Uint16(header[:2])) + 64
}
if pkt.channel >= s.channelsAllocatedIn {
n := pkt.channel + 10
timestamp := append(s.channelTimestamp, make([]int32, 10)...)
var pkts []*packet
if s.channelsIn == nil {
pkts = make([]*packet, n)
} else {
pkts = append(s.channelsIn[:pkt.channel:pkt.channel], make([]*packet, 10)...)
}
s.channelTimestamp = timestamp
s.channelsIn = pkts
for i := int(s.channelsAllocatedIn); i < len(s.channelTimestamp); i++ {
s.channelTimestamp[i] = 0
}
for i := int(s.channelsAllocatedIn); i < int(n); i++ {
s.channelsIn[i] = nil
}
s.channelsAllocatedIn = n
}
size := headerSizes[pkt.headerType]
switch {
case size == fullHeaderSize:
pkt.hasAbsTimestamp = true
case size < fullHeaderSize:
if s.channelsIn[pkt.channel] != nil {
*pkt = *(s.channelsIn[pkt.channel])
}
}
size--
if size > 0 {
_, err = s.read(header[:size])
if err != nil {
s.log(DebugLevel, pkg+"failed to read packet header", "error", err.Error())
return err
}
}
hSize := len(hbuf) - len(header) + size
if size >= 3 {
pkt.timestamp = C_AMF_DecodeInt24(header[:3])
if size >= 6 {
pkt.bodySize = C_AMF_DecodeInt24(header[3:6])
pkt.bytesRead = 0
if size > 6 {
pkt.packetType = header[6]
if size == 11 {
pkt.info = decodeInt32LE(header[7:11])
}
}
}
}
extendedTimestamp := pkt.timestamp == 0xffffff
if extendedTimestamp {
_, err = s.read(header[size : size+4])
if err != nil {
s.log(DebugLevel, pkg+"failed to read extended timestamp", "error", err.Error())
return err
}
// TODO: port this
pkt.timestamp = C_AMF_DecodeInt32(header[size : size+4])
hSize += 4
}
if pkt.bodySize > 0 && pkt.body == nil {
pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6)
}
toRead := int32(pkt.bodySize - pkt.bytesRead)
chunkSize := s.inChunkSize
if toRead < chunkSize {
chunkSize = toRead
}
if pkt.chunk != nil {
panic("non-nil chunk")
pkt.chunk.headerSize = int32(hSize)
copy(pkt.chunk.header[:], hbuf[:hSize])
pkt.chunk.data = pkt.body[pkt.bytesRead : pkt.bytesRead+uint32(chunkSize)]
}
_, err = s.read(pkt.body[pkt.bytesRead:][:chunkSize])
if err != nil {
s.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error())
return err
}
pkt.bytesRead += uint32(chunkSize)
// keep the packet as ref for other packets on this channel
if s.channelsIn[pkt.channel] == nil {
s.channelsIn[pkt.channel] = &packet{}
}
*(s.channelsIn[pkt.channel]) = *pkt
if extendedTimestamp {
s.channelsIn[pkt.channel].timestamp = 0xffffff
}
if pkt.bytesRead != pkt.bodySize {
panic("readPacket: bytesRead != bodySize")
}
if !pkt.hasAbsTimestamp {
// timestamps seem to always be relative
pkt.timestamp += uint32(s.channelTimestamp[pkt.channel])
}
s.channelTimestamp[pkt.channel] = int32(pkt.timestamp)
s.channelsIn[pkt.channel].body = nil
s.channelsIn[pkt.channel].bytesRead = 0
s.channelsIn[pkt.channel].hasAbsTimestamp = false
return nil
}
// resize adjusts the packet's storage to accommodate a body of the given size and header type.
func (pkt *packet) resize(size uint32, ht uint8) {
buf := make([]byte, fullHeaderSize+size)
pkt.header = buf
pkt.body = buf[fullHeaderSize:]
if ht != headerSizeAuto {
pkt.headerType = ht
return
}
switch pkt.packetType {
case packetTypeVideo, packetTypeAudio:
if pkt.timestamp == 0 {
pkt.headerType = headerSizeLarge
} else {
pkt.headerType = headerSizeMedium
}
case packetTypeInfo:
pkt.headerType = headerSizeLarge
pkt.bodySize += 16
default:
pkt.headerType = headerSizeMedium
}
}
// write sends a packet.
// When queue is true, we expect a response to this request and cache the method on s.methodCalls.
func (pkt *packet) write(s *Session, queue bool) error {
if pkt.body == nil {
return errInvalidBody
}
if pkt.channel >= s.channelsAllocatedOut {
s.log(DebugLevel, pkg+"growing channelsOut", "channel", pkt.channel)
n := int(pkt.channel + 10)
var pkts []*packet
if s.channelsOut == nil {
pkts = make([]*packet, n)
} else {
pkts = append(s.channelsOut[:pkt.channel:pkt.channel], make([]*packet, 10)...)
}
s.channelsOut = pkts
for i := int(s.channelsAllocatedOut); i < n; i++ {
s.channelsOut[i] = nil
}
s.channelsAllocatedOut = int32(n)
}
prevPkt := s.channelsOut[pkt.channel]
var last int
if prevPkt != nil && pkt.headerType != headerSizeLarge {
// compress a bit by using the prev packet's attributes
if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == headerSizeMedium {
pkt.headerType = headerSizeSmall
}
if prevPkt.timestamp == pkt.timestamp && pkt.headerType == headerSizeSmall {
pkt.headerType = headerSizeMinimum
}
last = int(prevPkt.timestamp)
}
if pkt.headerType > 3 {
s.log(WarnLevel, pkg+"unexpected header type", "type", pkt.headerType)
return errInvalidHeader
}
// The complete packet starts from headerSize _before_ the start the body.
// origIdx is the original offset, which will be 0 for a full (12-byte) header or 11 for a minimum (1-byte) header.
headBytes := pkt.header
hSize := headerSizes[pkt.headerType]
origIdx := fullHeaderSize - hSize
// adjust 1 or 2 bytes for the channel
cSize := 0
switch {
case pkt.channel > 319:
cSize = 2
case pkt.channel > 63:
cSize = 1
}
if cSize != 0 {
origIdx -= cSize
hSize += cSize
}
// adjust 4 bytes for the timestamp
var ts uint32
if prevPkt != nil {
ts = uint32(int(pkt.timestamp) - last)
}
if ts >= 0xffffff {
origIdx -= 4
hSize += 4
s.log(DebugLevel, pkg+"larger timestamp than 24 bits", "timestamp", ts)
}
headerIdx := origIdx
c := pkt.headerType << 6
switch cSize {
case 0:
c |= byte(pkt.channel)
case 1:
// Do nothing.
case 2:
c |= 1
}
headBytes[headerIdx] = c
headerIdx++
if cSize != 0 {
tmp := pkt.channel - 64
headBytes[headerIdx] = byte(tmp & 0xff)
headerIdx++
if cSize == 2 {
headBytes[headerIdx] = byte(tmp >> 8)
headerIdx++
}
}
if headerSizes[pkt.headerType] > 1 {
res := ts
if ts > 0xffffff {
res = 0xffffff
}
C_AMF_EncodeInt24(headBytes[headerIdx:], int32(res))
headerIdx += 3 // 24bits
}
if headerSizes[pkt.headerType] > 4 {
C_AMF_EncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize))
headerIdx += 3 // 24bits
headBytes[headerIdx] = pkt.packetType
headerIdx++
}
if headerSizes[pkt.headerType] > 8 {
binary.LittleEndian.PutUint32(headBytes[headerIdx:headerIdx+4], uint32(pkt.info))
headerIdx += 4 // 32bits
}
if ts >= 0xffffff {
C_AMF_EncodeInt32(headBytes[headerIdx:], int32(ts))
headerIdx += 4 // 32bits
}
size := int(pkt.bodySize)
chunkSize := int(s.outChunkSize)
if s.deferred == nil {
// Defer sending small audio packets (at most once).
if pkt.packetType == packetTypeAudio && size < chunkSize {
s.deferred = headBytes[origIdx:][:size+hSize]
s.log(DebugLevel, pkg+"deferred sending packet", "size", size, "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr())
return nil
}
} else {
// Send previously deferrd packet if combining it with the next one would exceed the chunk size.
if len(s.deferred)+size+hSize > chunkSize {
s.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(s.deferred))
_, err := s.write(s.deferred)
if err != nil {
return err
}
s.deferred = nil
}
}
// TODO(kortschak): Rewrite this horrific peice of premature optimisation.
// NB: RTMP wants packets in chunks which are 128 bytes by default, but the server may request a different size.
s.log(DebugLevel, pkg+"sending packet", "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr(), "size", size)
for size+hSize != 0 {
if chunkSize > size {
chunkSize = size
}
bytes := headBytes[origIdx:][:chunkSize+hSize]
if s.deferred != nil {
// Prepend the previously deferred packet and write it with the current one.
s.log(DebugLevel, pkg+"combining deferred packet", "size", len(s.deferred))
bytes = append(s.deferred, bytes...)
}
_, err := s.write(bytes)
if err != nil {
return err
}
s.deferred = nil
size -= chunkSize
origIdx += chunkSize + hSize
hSize = 0
if size > 0 {
origIdx -= 1 + cSize
hSize = 1 + cSize
if ts >= 0xffffff {
origIdx -= 4
hSize += 4
}
headBytes[origIdx] = 0xc0 | c
if cSize != 0 {
tmp := int(pkt.channel) - 64
headBytes[origIdx+1] = byte(tmp)
if cSize == 2 {
headBytes[origIdx+2] = byte(tmp >> 8)
}
}
if ts >= 0xffffff {
extendedTimestamp := headBytes[origIdx+1+cSize:]
C_AMF_EncodeInt32(extendedTimestamp[:4], int32(ts))
}
}
}
// We invoked a remote method
if pkt.packetType == packetTypeInvoke {
buf := pkt.body[1:]
meth := C_AMF_DecodeString(buf)
s.log(DebugLevel, pkg+"invoking method "+meth)
// keep it in call queue till result arrives
if queue {
buf = buf[3+len(meth):]
txn := int32(C_AMF_DecodeNumber(buf[:8]))
s.methodCalls = append(s.methodCalls, method{name: meth, num: txn})
}
}
if s.channelsOut[pkt.channel] == nil {
s.channelsOut[pkt.channel] = &packet{}
}
*(s.channelsOut[pkt.channel]) = *pkt
return nil
}
func decodeInt32LE(data []byte) int32 {
return int32(data[3])<<24 | int32(data[2])<<16 | int32(data[1])<<8 | int32(data[0])
}
func encodeInt32LE(dst []byte, v int32) int32 {
binary.LittleEndian.PutUint32(dst, uint32(v))
return 4
}

View File

@ -8,9 +8,10 @@ DESCRIPTION
AUTHOR AUTHOR
Dan Kortschak <dan@ausocean.org> Dan Kortschak <dan@ausocean.org>
Saxon Nelson-Milton <saxon@ausocean.org> Saxon Nelson-Milton <saxon@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE LICENSE
parseurl.go is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) parseurl.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the under the terms of the GNU General Public License as published by the
@ -33,53 +34,50 @@ LICENSE
package rtmp package rtmp
import ( import (
"log"
"net/url" "net/url"
"path" "path"
"strconv" "strconv"
"strings" "strings"
) )
// int RTMP_ParseURL(const char *url, int *protocol, AVal *host, unsigned int *port, AVal *playpath, AVal *app); // parseURL parses an RTMP URL (ok, technically it is lexing).
// parseurl.c +33 //
func C_RTMP_ParseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, ok bool) { func parseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) {
u, err := url.Parse(addr) u, err := url.Parse(addr)
if err != nil { if err != nil {
log.Printf("failed to parse addr: %v", err) return protocol, host, port, app, playpath, err
return protocol, host, port, app, playpath, false
} }
switch u.Scheme { switch u.Scheme {
case "rtmp": case "rtmp":
protocol = RTMP_PROTOCOL_RTMP protocol = protoRTMP
case "rtmpt": case "rtmpt":
protocol = RTMP_PROTOCOL_RTMPT protocol = protoRTMPT
case "rtmps": case "rtmps":
protocol = RTMP_PROTOCOL_RTMPS protocol = protoRTMPS
case "rtmpe": case "rtmpe":
protocol = RTMP_PROTOCOL_RTMPE protocol = protoRTMPE
case "rtmfp": case "rtmfp":
protocol = RTMP_PROTOCOL_RTMFP protocol = protoRTMFP
case "rtmpte": case "rtmpte":
protocol = RTMP_PROTOCOL_RTMPTE protocol = protoRTMPTE
case "rtmpts": case "rtmpts":
protocol = RTMP_PROTOCOL_RTMPTS protocol = protoRTMPTS
default: default:
log.Printf("unknown scheme: %q", u.Scheme) return protocol, host, port, app, playpath, errUnknownScheme
return protocol, host, port, app, playpath, false
} }
host = u.Host host = u.Host
if p := u.Port(); p != "" { if p := u.Port(); p != "" {
pi, err := strconv.Atoi(p) pi, err := strconv.Atoi(p)
if err != nil { if err != nil {
return protocol, host, port, app, playpath, false return protocol, host, port, app, playpath, err
} }
port = uint16(pi) port = uint16(pi)
} }
if !path.IsAbs(u.Path) { if !path.IsAbs(u.Path) {
return protocol, host, port, app, playpath, true return protocol, host, port, app, playpath, nil
} }
elems := strings.SplitN(u.Path[1:], "/", 3) elems := strings.SplitN(u.Path[1:], "/", 3)
app = elems[0] app = elems[0]
@ -99,5 +97,5 @@ func C_RTMP_ParseURL(addr string) (protocol int32, host string, port uint16, app
} }
} }
return protocol, host, port, app, playpath, true return protocol, host, port, app, playpath, nil
} }

File diff suppressed because it is too large Load Diff

View File

@ -1,221 +0,0 @@
/*
NAME
rtmp_headers.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
rtmp_headers.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
Derived from librtmp under the GNU Lesser General Public License 2.1
Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org
Copyright (C) 2008-2009 Andrej Stepanchuk
Copyright (C) 2009-2010 Howard Chu
*/
package rtmp
import "net"
const (
RTMPT_OPEN = iota
RTMPT_SEND
RTMPT_IDLE
RTMPT_CLOSE
)
const (
RTMP_PACKET_TYPE_CHUNK_SIZE = 0x01
RTMP_PACKET_TYPE_BYTES_READ_REPORT = 0x03
RTMP_PACKET_TYPE_CONTROL = 0x04
RTMP_PACKET_TYPE_SERVER_BW = 0x05
RTMP_PACKET_TYPE_CLIENT_BW = 0x06
RTMP_PACKET_TYPE_AUDIO = 0x08
RTMP_PACKET_TYPE_VIDEO = 0x09
RTMP_PACKET_TYPE_FLEX_STREAM_SEND = 0x0F
RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10
RTMP_PACKET_TYPE_FLEX_MESSAGE = 0x11
RTMP_PACKET_TYPE_INFO = 0x12
RTMP_PACKET_TYPE_INVOKE = 0x14
RTMP_PACKET_TYPE_FLASH_VIDEO = 0x16
)
const (
RTMP_PACKET_SIZE_LARGE = 0
RTMP_PACKET_SIZE_MEDIUM = 1
RTMP_PACKET_SIZE_SMALL = 2
RTMP_PACKET_SIZE_MINIMUM = 3
)
const (
RTMP_READ_HEADER = 0x01
RTMP_READ_RESUME = 0x02
RTMP_READ_NO_IGNORE = 0x04
RTMP_READ_GOTKF = 0x08
RTMP_READ_GOTFLVK = 0x10
RTMP_READ_SEEKING = 0x20
RTMP_READ_COMPLETE = -3
RTMP_READ_ERROR = -2
RTMP_READ_EOF = -1
RTMP_READ_IGNORE = 0
)
const (
RTMP_LF_AUTH = 0x0001 /* using auth param */
RTMP_LF_LIVE = 0x0002 /* stream is live */
RTMP_LF_SWFV = 0x0004 /* do SWF verification */
RTMP_LF_PLST = 0x0008 /* send playlist before play */
RTMP_LF_BUFX = 0x0010 /* toggle stream on BufferEmpty msg */
RTMP_LF_FTCU = 0x0020 /* free tcUrl on close */
RTMP_LF_FAPU = 0x0040 /* free app on close */
)
const (
RTMP_FEATURE_HTTP = 0x01
RTMP_FEATURE_ENC = 0x02
RTMP_FEATURE_SSL = 0x04
RTMP_FEATURE_MFP = 0x08 /* not yet supported */
RTMP_FEATURE_WRITE = 0x10 /* publish, not play */
RTMP_FEATURE_HTTP2 = 0x20 /* server-side rtmpt */
)
const (
RTMP_PROTOCOL_RTMP = 0
RTMP_PROTOCOL_RTMPE = RTMP_FEATURE_ENC
RTMP_PROTOCOL_RTMPT = RTMP_FEATURE_HTTP
RTMP_PROTOCOL_RTMPS = RTMP_FEATURE_SSL
RTMP_PROTOCOL_RTMPTE = (RTMP_FEATURE_HTTP | RTMP_FEATURE_ENC)
RTMP_PROTOCOL_RTMPTS = (RTMP_FEATURE_HTTP | RTMP_FEATURE_SSL)
RTMP_PROTOCOL_RTMFP = RTMP_FEATURE_MFP
)
const (
RTMP_DEFAULT_CHUNKSIZE = 128
RTMP_BUFFER_CACHE_SIZE = (16 * 1024)
RTMP_SIG_SIZE = 1536
RTMP_LARGE_HEADER_SIZE = 12
RTMP_MAX_HEADER_SIZE = 18
)
// typedef struct RTMPChunk
// rtmp.h +105
type C_RTMPChunk struct {
c_headerSize int32
c_chunk []byte
c_header [RTMP_MAX_HEADER_SIZE]byte
}
// typedef struct RTMPPacket
// rtmp.h +113
type C_RTMPPacket struct {
m_headerType uint8
m_packetType uint8
m_hasAbsTimestamp bool
m_nChannel int32
m_nTimeStamp uint32
m_nInfoField2 int32
m_nBodySize uint32
m_nBytesRead uint32
m_chunk *C_RTMPChunk
m_header []byte
m_body []byte
}
// typedef struct RTMPSockBuf
// rtmp.h +127
type C_RTMPSockBuf struct {
conn *net.TCPConn
sb_size int
sb_start int
sb_buf [RTMP_BUFFER_CACHE_SIZE]byte // port const
sb_timedout bool
}
// RTMPPacket_IsReady(a)
// rtmp.h +142
func C_RTMPPacket_IsReady(p *C_RTMPPacket) bool {
return p.m_nBytesRead == p.m_nBodySize
}
// typedef struct RTMP_LNK
// rtmp.h +144
type C_RTMP_LNK struct {
hostname string
sockshost string
playpath0 string
playpath string
tcUrl string
swfUrl string
pageUrl string
app string
auth string
flashVer string
token string
extras C_AMFObject
seekTime int32
lFlags int32
swfAge int32
protocol int32
timeout int32
socksport uint16
port uint16
}
// typedef struct RTMPMethod
// rtmp.h +231
type C_RTMP_METHOD struct {
name string
num int32
}
// typedef struct RTMP
// rtmp.h +237
type C_RTMP struct {
m_inChunkSize int32
m_outChunkSize int32
m_nBWCheckCounter int32
m_nBytesIn int32
m_nBytesInSent int32
m_nBufferMS int32
m_stream_id int32
m_mediaChannel int32
m_pausing int32
m_nServerBW int32
m_nClientBW int32
m_nClientBW2 uint8
m_bPlaying bool
m_bSendEncoding bool
m_bSendCounter bool
m_numInvokes int32
m_methodCalls []C_RTMP_METHOD
m_channelsAllocatedIn int32
m_channelsAllocatedOut int32
m_vecChannelsIn []*C_RTMPPacket
m_vecChannelsOut []*C_RTMPPacket
m_channelTimestamp []int32
m_fAudioCodecs float64
m_fVideoCodecs float64
m_fEncoding float64
m_fDuration float64
m_msgCounter int32
m_resplen int32
m_unackd int32
m_write C_RTMPPacket
m_sb C_RTMPSockBuf
Link C_RTMP_LNK
}

View File

@ -1,7 +0,0 @@
package rtmp
// #define SET_RCVTIMEO(tv,s) int tv = s*1000
// rtmp_sys.h +43
func SET_RCVTIMEO(tv *int32, s int32) {
*tv = s * 1000
}

238
rtmp/rtmp_test.go Normal file
View File

@ -0,0 +1,238 @@
/*
NAME
rtmp_test.go
DESCRIPTION
RTMP tests
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
rtmp_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package rtmp
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"testing"
"time"
"bitbucket.org/ausocean/av/stream/flv"
"bitbucket.org/ausocean/av/stream/lex"
)
const (
rtmpProtocol = "rtmp"
testHost = "a.rtmp.youtube.com"
testApp = "live2"
testBaseURL = rtmpProtocol + "://" + testHost + "/" + testApp + "/"
testTimeout = 30
testDataDir = "../../test/test-data/av/input"
)
// testVerbosity controls the amount of output.
// NB: This is not the log level, which is DebugLevel.
// 0: suppress logging completely
// 1: log messages only
// 2: log messages with errors, if any
var testVerbosity = 1
// testKey is the YouTube RTMP key required for YouTube streaming (RTMP_TEST_KEY env var).
// NB: don't share your key with others.
var testKey string
// testFile is the test video file (RTMP_TEST_FILE env var).
// betterInput.h264 is a good one to use.
var testFile string
// testLog is a bare bones logger that logs to stdout, and exits upon either an error or fatal error.
func testLog(level int8, msg string, params ...interface{}) {
logLevels := [...]string{"Debug", "Info", "Warn", "Error", "", "", "Fatal"}
if testVerbosity == 0 {
return
}
if level < -1 || level > 5 {
panic("Invalid log level")
}
switch testVerbosity {
case 0:
// silence is golden
case 1:
fmt.Printf("%s: %s\n", logLevels[level+1], msg)
case 2:
// extract the first param if it is one we care about, otherwise just print the message
if len(params) >= 2 {
switch params[0].(string) {
case "error":
fmt.Printf("%s: %s, error=%v\n", logLevels[level+1], msg, params[1].(string))
case "size":
fmt.Printf("%s: %s, size=%d\n", logLevels[level+1], msg, params[1].(int))
default:
fmt.Printf("%s: %s\n", logLevels[level+1], msg)
}
} else {
fmt.Printf("%s: %s\n", logLevels[level+1], msg)
}
}
if level >= 4 {
// Error or Fatal
buf := make([]byte, 1<<16)
size := runtime.Stack(buf, true)
fmt.Printf("%s\n", string(buf[:size]))
os.Exit(1)
}
}
// TestKey tests that the RTMP_TEST_KEY environment variable is present
func TestKey(t *testing.T) {
testLog(0, "TestKey")
testKey = os.Getenv("RTMP_TEST_KEY")
if testKey == "" {
msg := "RTMP_TEST_KEY environment variable not defined"
testLog(0, msg)
t.Skip(msg)
}
testLog(0, "Testing against URL "+testBaseURL+testKey)
}
// TestSetupURL tests URL parsing.
func TestSetupURL(t *testing.T) {
testLog(0, "TestSetupURL")
// test with just the base URL
s := NewSession(testBaseURL, testTimeout, testLog)
if s.url != testBaseURL && s.link.timeout != testTimeout {
t.Errorf("NewSession failed")
}
err := setupURL(s)
if err != nil {
t.Errorf("setupURL(testBaseURL) failed with error: %v", err)
}
// test the parts are as expected
if rtmpProtocolStrings[s.link.protocol] != rtmpProtocol {
t.Errorf("setupURL returned wrong protocol: %v", s.link.protocol)
}
if s.link.host != testHost {
t.Errorf("setupURL returned wrong host: %v", s.link.host)
}
if s.link.app != testApp {
t.Errorf("setupURL returned wrong app: %v", s.link.app)
}
}
// TestOpenClose tests opening an closing an RTMP connection.
func TestOpenClose(t *testing.T) {
testLog(0, "TestOpenClose")
if testKey == "" {
t.Skip("Skipping TestOpenClose since no RTMP_TEST_KEY")
}
s := NewSession(testBaseURL+testKey, testTimeout, testLog)
err := s.Open()
if err != nil {
t.Errorf("Open failed with error: %v", err)
return
}
err = s.Close()
if err != nil {
t.Errorf("Close failed with error: %v", err)
return
}
}
// TestFromFrame tests streaming from a single H.264 frame which is repeated.
func TestFromFrame(t *testing.T) {
testLog(0, "TestFromFrame")
if testKey == "" {
t.Skip("Skipping TestFromFrame since no RTMP_TEST_KEY")
}
s := NewSession(testBaseURL+testKey, testTimeout, testLog)
err := s.Open()
if err != nil {
t.Errorf("Session.Open failed with error: %v", err)
}
b, err := ioutil.ReadFile(filepath.Join(testDataDir, "AusOcean_logo_1080p.h264"))
if err != nil {
t.Errorf("ReadFile failed with error: %v", err)
}
// Pass RTMP session, true for audio, true for video, and 25 FPS
// ToDo: fix this. Although we can encode the file and YouTube
// doesn't complain, YouTube doesn't play it (even when we
// send 1 minute's worth).
flvEncoder, err := flv.NewEncoder(s, true, true, 25)
if err != nil {
t.Fatalf("failed to create encoder: %v", err)
}
for i := 0; i < 25; i++ {
err := flvEncoder.Encode(b)
if err != nil {
t.Errorf("Encoding failed with error: %v", err)
}
time.Sleep(time.Millisecond / 25) // rate limit to 1/25s
}
err = s.Close()
if err != nil {
t.Errorf("Session.Close failed with error: %v", err)
}
}
// TestFromFile tests streaming from an video file comprising raw H.264.
// The test file is supplied via the RTMP_TEST_FILE environment variable.
func TestFromFile(t *testing.T) {
testLog(0, "TestFromFile")
testFile := os.Getenv("RTMP_TEST_FILE")
if testFile == "" {
t.Skip("Skipping TestFromFile since no RTMP_TEST_FILE")
}
if testKey == "" {
t.Skip("Skipping TestFromFile since no RTMP_TEST_KEY")
}
s := NewSession(testBaseURL+testKey, testTimeout, testLog)
err := s.Open()
if err != nil {
t.Errorf("Session.Open failed with error: %v", err)
}
f, err := os.Open(testFile)
if err != nil {
t.Errorf("Open failed with error: %v", err)
}
defer f.Close()
// Pass RTMP session, true for audio, true for video, and 25 FPS
flvEncoder, err := flv.NewEncoder(s, true, true, 25)
if err != nil {
t.Fatalf("failed to create encoder: %v", err)
}
err = lex.H264(flvEncoder, f, time.Second/time.Duration(25))
if err != nil {
t.Errorf("Lexing and encoding failed with error: %v", err)
}
err = s.Close()
if err != nil {
t.Errorf("Session.Close failed with error: %v", err)
}
}

View File

@ -3,14 +3,15 @@ NAME
session.go session.go
DESCRIPTION DESCRIPTION
See Readme.md RTMP session functionality.
AUTHORS AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org> Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org> Dan Kortschak <dan@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE LICENSE
session.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) session.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the under the terms of the GNU General Public License as published by the
@ -33,65 +34,216 @@ LICENSE
package rtmp package rtmp
import ( import (
"errors" "io"
"net"
"time"
) )
// session provides parameters required for an rtmp communication session. // Session holds the state for an RTMP session.
type Session struct { type Session struct {
rtmp *C_RTMP
url string url string
timeout uint inChunkSize int32
outChunkSize int32
checkCounter int32
nBytesIn int32
nBytesInSent int32
streamID int32
serverBW int32
clientBW int32
clientBW2 uint8
isPlaying bool
sendEncoding bool
numInvokes int32
methodCalls []method
channelsAllocatedIn int32
channelsAllocatedOut int32
channelsIn []*packet
channelsOut []*packet
channelTimestamp []int32
audioCodecs float64
videoCodecs float64
encoding float64
deferred []byte
link link
log Log
} }
// NewSession returns a new session. // link represents RTMP URL and connection information.
func NewSession(url string, connectTimeout uint) *Session { type link struct {
host string
playpath string
tcUrl string
swfUrl string
pageUrl string
app string
auth string
flashVer string
token string
extras C_AMFObject
flags int32
swfAge int32
protocol int32
timeout uint
port uint16
conn *net.TCPConn
}
// method represents an RTMP method.
type method struct {
name string
num int32
}
// Log defines the RTMP logging function.
type Log func(level int8, message string, params ...interface{})
// Log levels used by Log.
const (
DebugLevel int8 = -1
InfoLevel int8 = 0
WarnLevel int8 = 1
ErrorLevel int8 = 2
FatalLevel int8 = 5
)
// NewSession returns a new Session.
func NewSession(url string, timeout uint, log Log) *Session {
return &Session{ return &Session{
url: url, url: url,
timeout: connectTimeout, inChunkSize: 128,
outChunkSize: 128,
clientBW: 2500000,
clientBW2: 2,
serverBW: 2500000,
audioCodecs: 3191.0,
videoCodecs: 252.0,
log: log,
link: link{
timeout: timeout,
swfAge: 30,
},
} }
} }
// Open establishes an rtmp connection with the url passed into the // Open establishes an rtmp connection with the url passed into the constructor.
// constructor
func (s *Session) Open() error { func (s *Session) Open() error {
if s.rtmp != nil { s.log(DebugLevel, pkg+"Session.Open")
return errors.New("rtmp: attempt to start already running session") if s.isConnected() {
return errConnected
} }
var err error err := setupURL(s)
s.rtmp, err = startSession(s.rtmp, s.url, uint32(s.timeout)) if err != nil {
if s.rtmp == nil { return err
}
s.enableWrite()
err = connect(s)
if err != nil {
s.Close()
return err
}
err = connectStream(s)
if err != nil {
s.Close()
return err return err
} }
return nil return nil
} }
// Close terminates the rtmp connection // Close terminates the rtmp connection.
// NB: Close is idempotent and the session value is cleared completely.
func (s *Session) Close() error { func (s *Session) Close() error {
if s.rtmp == nil { s.log(DebugLevel, pkg+"Session.Close")
return Err(3) if !s.isConnected() {
return errNotConnected
} }
ret := endSession(s.rtmp) if s.streamID > 0 {
s.rtmp = nil if s.link.protocol&featureWrite != 0 {
if ret != 0 { sendFCUnpublish(s)
return Err(ret)
} }
sendDeleteStream(s, float64(s.streamID))
}
s.link.conn.Close()
*s = Session{}
return nil return nil
} }
// Write writes a frame (flv tag) to the rtmp connection // Write writes a frame (flv tag) to the rtmp connection.
func (s *Session) Write(data []byte) (int, error) { func (s *Session) Write(data []byte) (int, error) {
if s.rtmp == nil { if !s.isConnected() {
return 0, Err(3) return 0, errNotConnected
}
if len(data) < minDataSize {
return 0, errTinyPacket
}
if data[0] == packetTypeInfo || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') {
return 0, errUnimplemented
} }
if C_RTMP_IsConnected(s.rtmp) == 0 { pkt := packet{
//if C.RTMP_IsConnected(s.rtmp) == 0 { packetType: data[0],
return 0, Err(1) bodySize: C_AMF_DecodeInt24(data[1:4]),
timestamp: C_AMF_DecodeInt24(data[4:7]) | uint32(data[7])<<24,
channel: chanSource,
info: s.streamID,
} }
if C_RTMP_Write(s.rtmp, data) == 0 { pkt.resize(pkt.bodySize, headerSizeAuto)
//if C.RTMP_Write(s.rtmp, (*byte)(unsafe.Pointer(&data[0])), int32(len(data))) == 0 { copy(pkt.body, data[minDataSize:minDataSize+pkt.bodySize])
return 0, Err(2) err := pkt.write(s, false)
if err != nil {
return 0, err
} }
return len(data), nil return len(data), nil
} }
// I/O functions
// read from an RTMP connection. Sends a bytes received message if the
// number of bytes received (nBytesIn) is greater than the number sent
// (nBytesInSent) by 10% of the bandwidth.
func (s *Session) read(buf []byte) (int, error) {
err := s.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout)))
if err != nil {
return 0, err
}
n, err := io.ReadFull(s.link.conn, buf)
if err != nil {
s.log(DebugLevel, pkg+"read failed", "error", err.Error())
return 0, err
}
s.nBytesIn += int32(n)
if s.nBytesIn > (s.nBytesInSent + s.clientBW/10) {
err := sendBytesReceived(s)
if err != nil {
return n, err // NB: we still read n bytes, even though send bytes failed
}
}
return n, nil
}
// write to an RTMP connection.
func (s *Session) write(buf []byte) (int, error) {
//ToDo: consider using a different timeout for writes than for reads
err := s.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout)))
if err != nil {
return 0, err
}
n, err := s.link.conn.Write(buf)
if err != nil {
s.log(WarnLevel, pkg+"write failed", "error", err.Error())
return 0, err
}
return n, nil
}
// isConnected returns true if the RTMP connection is up.
func (s *Session) isConnected() bool {
return s.link.conn != nil
}
// enableWrite enables the current session for writing.
func (s *Session) enableWrite() {
s.link.protocol |= featureWrite
}

View File

@ -1,151 +0,0 @@
/*
NAME
rtmp.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
LICENSE
rtmp.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
Derived from librtmp under the GNU Lesser General Public License 2.1
Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org
Copyright (C) 2008-2009 Andrej Stepanchuk
Copyright (C) 2009-2010 Howard Chu
*/
package rtmp
import (
"fmt"
"log"
"net"
"golang.org/x/sys/unix"
)
// int RTMP_Connect(RTMP *r, RTMPPacket* cp);
// rtmp.c +1032
func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) (ok bool) {
if r.Link.hostname == "" {
return false
}
var hostname string
if r.Link.socksport != 0 {
hostname = fmt.Sprintf("%s:%d", r.Link.sockshost, r.Link.socksport)
} else {
hostname = fmt.Sprintf("%s:%d", r.Link.hostname, r.Link.port)
}
addr, err := net.ResolveTCPAddr("tcp4", hostname)
if err != nil {
return false
}
r.m_sb.conn, err = net.DialTCP("tcp4", nil, addr)
if err != nil {
return false
}
if r.Link.socksport != 0 {
if !C_SocksNegotiate(r, addr) {
return false
}
}
f, err := r.m_sb.conn.File()
if err != nil {
log.Printf("failed to get fd to set timeout: %v", err)
return false
}
tv := setTimeval(int(r.Link.timeout))
err = unix.SetsockoptTimeval(int(f.Fd()), unix.SOL_SOCKET, unix.SO_RCVTIMEO, &tv)
if err != nil {
log.Printf("failed to set timeout: %v", err)
}
r.m_bSendCounter = true
return C_RTMP_Connect1(r, cp)
}
// int SocksNegotiate(RTMP* r);
// rtmp.c +1062
func C_SocksNegotiate(r *C_RTMP, addr *net.TCPAddr) (ok bool) {
ip := addr.IP.To4()
packet := []byte{
0x4, // SOCKS version
0x1, // Establish a TCP/IP stream connection
byte(r.Link.port >> 8), byte(r.Link.port),
ip[0], ip[1], ip[2], ip[3],
0x0, // Empty user ID string
}
C_WriteN(r, packet)
if C_ReadN(r, packet[:8]) != 8 {
return false
}
if packet[0] == 0x0 && packet[1] == 0x5a {
return true
}
// TODO: use new logger here
log.Println("C_SocksNegotitate: SOCKS returned error code!")
return false
}
// int RTMPSockBuf_Fill(RTMPSockBuf* sb);
// rtmp.c +4253
func C_RTMPSockBuf_Fill(sb *C_RTMPSockBuf) int {
if sb.sb_size == 0 {
sb.sb_start = 0
}
n, err := sb.conn.Read(sb.sb_buf[sb.sb_start+sb.sb_size:])
if err != nil {
return 0
}
sb.sb_size += n
return n
}
// int RTMPSockBuf_Send(RTMPSockBuf* sb, const char* buf, int len);
// rtmp.c +4297
// TODO replace send with golang net connection send
func C_RTMPSockBuf_Send(sb *C_RTMPSockBuf, buf []byte) int32 {
n, err := sb.conn.Write(buf)
if err != nil {
return -1
}
return int32(n)
}
// int
// RTMPSockBuf_Close(RTMPSockBuf *sb)
// rtmp.c +4369
func C_RTMPSockBuf_Close(sb *C_RTMPSockBuf) int32 {
if sb.conn != nil {
err := sb.conn.Close()
sb.conn = nil
if err == nil {
return 1
}
}
return 0
}

View File

@ -1,7 +0,0 @@
package rtmp
import "golang.org/x/sys/unix"
func setTimeval(sec int) unix.Timeval {
return unix.Timeval{Sec: int64(sec)}
}

View File

@ -1,7 +0,0 @@
package rtmp
import "golang.org/x/sys/unix"
func setTimeval(sec int) unix.Timeval {
return unix.Timeval{Sec: int32(sec)}
}

View File

@ -60,7 +60,6 @@ type Encoder struct {
fps int fps int
audio bool audio bool
video bool video bool
lastTagSize int
header Header header Header
start time.Time start time.Time
} }
@ -73,12 +72,11 @@ func NewEncoder(dst io.Writer, audio, video bool, fps int) (*Encoder, error) {
audio: audio, audio: audio,
video: video, video: video,
} }
// TODO(kortschak): Do this lazily. _, err := dst.Write(e.HeaderBytes())
_, err := e.dst.Write(e.HeaderBytes())
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &e, nil return &e, err
} }
// HeaderBytes returns the a // HeaderBytes returns the a
@ -194,6 +192,17 @@ func (s *frameScanner) readByte() (b byte, ok bool) {
func (e *Encoder) Encode(frame []byte) error { func (e *Encoder) Encode(frame []byte) error {
var frameType byte var frameType byte
var packetType byte var packetType byte
if e.start.IsZero() {
// This is the first frame, so write the PreviousTagSize0.
//
// See https://download.macromedia.com/f4v/video_file_format_spec_v10_1.pdf
// section E.3.
var zero [4]byte
_, err := e.dst.Write(zero[:])
if err != nil {
return err
}
}
timeStamp := e.getNextTimestamp() timeStamp := e.getNextTimestamp()
// Do we have video to send off? // Do we have video to send off?
if e.video { if e.video {

View File

@ -79,6 +79,8 @@ type Header struct {
} }
func (h *Header) Bytes() []byte { func (h *Header) Bytes() []byte {
// See https://download.macromedia.com/f4v/video_file_format_spec_v10_1.pdf
// section E.2.
const headerLength = 9 const headerLength = 9
b := [headerLength]byte{ b := [headerLength]byte{
0: 'F', 1: 'L', 2: 'V', 3: version, 0: 'F', 1: 'L', 2: 'V', 3: version,

View File

@ -30,34 +30,145 @@ package mts
import ( import (
"io" "io"
"sync"
"time" "time"
"bitbucket.org/ausocean/av/stream/mts/pes" "bitbucket.org/ausocean/av/stream/mts/pes"
"bitbucket.org/ausocean/av/stream/mts/psi" "bitbucket.org/ausocean/av/stream/mts/psi"
) )
// Some common manifestations of PSI
var (
// standardPat is a minimal PAT.
standardPat = psi.PSI{
Pf: 0x00,
Tid: 0x00,
Ssi: true,
Pb: false,
Sl: 0x0d,
Tss: &psi.TSS{
Tide: 0x01,
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &psi.PAT{
Pn: 0x01,
Pmpid: 0x1000,
},
},
}
// standardPmt is a minimal PMT, without descriptors for time and location.
standardPmt = psi.PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: 0x12,
Tss: &psi.TSS{
Tide: 0x01,
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &psi.PMT{
Pcrpid: 0x0100,
Pil: 0,
Essd: &psi.ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
}
// standardPmtTimeLocation is a standard PMT with time and location
// descriptors, but time and location fields zeroed out.
standardPmtTimeLocation = psi.PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: 0x3e,
Tss: &psi.TSS{
Tide: 0x01,
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &psi.PMT{
Pcrpid: 0x0100,
Pil: psi.PmtTimeLocationPil,
Pd: []psi.Desc{
{
Dt: psi.TimeDescTag,
Dl: psi.TimeDataSize,
Dd: make([]byte, psi.TimeDataSize),
},
{
Dt: psi.LocationDescTag,
Dl: psi.LocationDataSize,
Dd: make([]byte, psi.LocationDataSize),
},
},
Essd: &psi.ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
}
)
const ( const (
psiSndCnt = 7 psiSndCnt = 7
) )
type MetaData struct { // timeLocation holds time and location data
type timeLocation struct {
mu sync.RWMutex
time uint64 time uint64
location string location string
} }
var metaData = MetaData{time: 0, location: ""} // SetTimeStamp sets the time field of a TimeLocation.
func (tl *timeLocation) SetTimeStamp(t uint64) {
func SetTimeStamp(t uint64) { tl.mu.Lock()
metaData.time = t tl.time = t
tl.mu.Unlock()
} }
func SetLocation(g string) { // GetTimeStamp returns the location of a TimeLocation.
metaData.location = g func (tl *timeLocation) TimeStamp() uint64 {
tl.mu.RLock()
t := tl.time
tl.mu.RUnlock()
return t
} }
// SetLocation sets the location of a TimeLocation.
func (tl *timeLocation) SetLocation(l string) {
tl.mu.Lock()
tl.location = l
tl.mu.Unlock()
}
// GetLocation returns the location of a TimeLocation.
func (tl *timeLocation) Location() string {
tl.mu.RLock()
l := tl.location
tl.mu.RUnlock()
return l
}
// MetData will hold time and location data which may be set externally if
// this data is available. It is then inserted into mpegts packets outputted.
var MetaData timeLocation
var ( var (
patTable = psi.StdPat.Bytes() patTable = standardPat.Bytes()
pmtTable = psi.StdPmtTimeLocation.Bytes() pmtTable = standardPmtTimeLocation.Bytes()
) )
const ( const (
@ -71,7 +182,7 @@ const (
// Time related constants. // Time related constants.
const ( const (
// ptsOffset is the offset added to the clock to determine // ptsOffset is the offset added to the clock to determine
// the current presentation timestamp, // the current presentation timestamp.
ptsOffset = 700 * time.Millisecond ptsOffset = 700 * time.Millisecond
// pcrFreq is the base Program Clock Reference frequency. // pcrFreq is the base Program Clock Reference frequency.
@ -120,9 +231,8 @@ const (
) )
// generate handles the incoming data and generates equivalent mpegts packets - // generate handles the incoming data and generates equivalent mpegts packets -
// sending them to the output channel // sending them to the output channel.
func (e *Encoder) Encode(nalu []byte) error { func (e *Encoder) Encode(nalu []byte) error {
// Prepare PES data. // Prepare PES data.
pesPkt := pes.Packet{ pesPkt := pes.Packet{
StreamID: streamID, StreamID: streamID,
@ -173,7 +283,7 @@ func (e *Encoder) Encode(nalu []byte) error {
// writePSI creates mpegts with pat and pmt tables - with pmt table having updated // writePSI creates mpegts with pat and pmt tables - with pmt table having updated
// location and time data. // location and time data.
func (e *Encoder) writePSI() error { func (e *Encoder) writePSI() error {
// Write PAT // Write PAT.
patPkt := Packet{ patPkt := Packet{
PUSI: true, PUSI: true,
PID: patPid, PID: patPid,
@ -186,17 +296,17 @@ func (e *Encoder) writePSI() error {
return err return err
} }
// Update pmt table time and location // Update pmt table time and location.
err = psi.UpdateTime(pmtTable, metaData.time) err = psi.UpdateTime(pmtTable, MetaData.TimeStamp())
if err != nil { if err != nil {
return err return err
} }
err = psi.UpdateLocation(pmtTable, metaData.location) err = psi.UpdateLocation(pmtTable, MetaData.Location())
if err != nil { if err != nil {
return nil return nil
} }
// Create mts packet from pmt table // Create mts packet from pmt table.
pmtPkt := Packet{ pmtPkt := Packet{
PUSI: true, PUSI: true,
PID: pmtPid, PID: pmtPid,

71
stream/mts/psi/crc.go Normal file
View File

@ -0,0 +1,71 @@
/*
NAME
crc.go
DESCRIPTION
See Readme.md
AUTHOR
Dan Kortschak <dan@ausocean.org>
Saxon Milton <saxon@ausocean.org>
LICENSE
crc.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package psi
import (
"encoding/binary"
"hash/crc32"
"math/bits"
)
// addCrc appends a crc table to a given psi table in bytes
func addCrc(out []byte) []byte {
t := make([]byte, len(out)+4)
copy(t, out)
updateCrc(t)
return t
}
// updateCrc updates the crc of bytes slice, writing the checksum into the last four bytes.
func updateCrc(b []byte) {
crc32 := crc32_Update(0xffffffff, crc32_MakeTable(bits.Reverse32(crc32.IEEE)), b[1:len(b)-4])
binary.BigEndian.PutUint32(b[len(b)-4:], crc32)
}
func crc32_MakeTable(poly uint32) *crc32.Table {
var t crc32.Table
for i := range t {
crc := uint32(i) << 24
for j := 0; j < 8; j++ {
if crc&0x80000000 != 0 {
crc = (crc << 1) ^ poly
} else {
crc <<= 1
}
}
t[i] = crc
}
return &t
}
func crc32_Update(crc uint32, tab *crc32.Table, p []byte) uint32 {
for _, v := range p {
crc = tab[byte(crc>>24)^v] ^ (crc << 8)
}
return crc
}

View File

@ -1,15 +1,15 @@
/* /*
NAME NAME
op.go helpers.go
DESCRIPTION DESCRIPTION
op.go provides functionality for editing and reading bytes slices helpers.go provides functionality for editing and reading bytes slices
directly in order to insert/read timestamp and location data in psi. directly in order to insert/read timestamp and location data in psi.
AUTHOR AUTHOR
Saxon Milton <saxon@ausocean.org> Saxon Milton <saxon@ausocean.org>
LICENSE LICENSE
op.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean) helpers.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the under the terms of the GNU General Public License as published by the
@ -31,49 +31,40 @@ import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"errors" "errors"
"hash/crc32"
"math/bits"
) )
// TimeBytes takes a timestamp as an uint64 and converts to an 8 byte slice - // TimeBytes takes a timestamp as an uint64 and converts to an 8 byte slice -
// allows for updating of timestamp in pmt time descriptor. // allows for updating of timestamp in pmt time descriptor.
func TimeBytes(t uint64) []byte { func TimeBytes(t uint64) []byte {
var s [timeDataSize]byte var s [TimeDataSize]byte
binary.BigEndian.PutUint64(s[:], t) binary.BigEndian.PutUint64(s[:], t)
return s[:] return s[:]
} }
// HasTime takes a psi as a byte slice and checks to see if it has a time descriptor // HasTime takes a psi as a byte slice and checks to see if it has a time descriptor
// - if so return nil, otherwise return error // - if so return nil, otherwise return error
func HasTime(p []byte) error { func HasTime(p []byte) bool {
if p[timeTagIndx] != timeDescTag { return p[TimeTagIndx] == TimeDescTag
return errors.New("PSI does not contain a time descriptor, cannot update")
}
return nil
} }
// HasLocation takes a psi as a byte slice and checks to see if it has a location descriptor // HasLocation takes a psi as a byte slice and checks to see if it has a location descriptor
// - if so return nil, otherwise return error // - if so return nil, otherwise return error
func HasLocation(p []byte) error { func HasLocation(p []byte) bool {
if p[locationTagIndx] != locationDescTag { return p[LocationTagIndx] == LocationDescTag
return errors.New("PSI does not contain a location descriptor, cannot update")
}
return nil
} }
// UpdateTime takes the byte slice representation of a psi-pmt as well as a time // UpdateTime takes the byte slice representation of a psi-pmt as well as a time
// as an integer and attempts to update the time descriptor in the pmt with the // as an integer and attempts to update the time descriptor in the pmt with the
// given time if the time descriptor exists, otherwise an error is returned // given time if the time descriptor exists, otherwise an error is returned
func UpdateTime(dst []byte, t uint64) error { func UpdateTime(dst []byte, t uint64) error {
err := HasTime(dst) if !HasTime(dst) {
if err != nil { return errors.New("pmt does not have time descriptor, cannot update")
return err
} }
ts := TimeBytes(uint64(t)) ts := TimeBytes(uint64(t))
for i := range dst[timeDataIndx : timeDataIndx+timeDataSize] { for i := range dst[TimeDataIndx : TimeDataIndx+TimeDataSize] {
dst[i+timeDataIndx] = ts[i] dst[i+TimeDataIndx] = ts[i]
} }
dst = updateCrc(dst) updateCrc(dst)
return nil return nil
} }
@ -88,11 +79,10 @@ func SyntaxSecLenFrom(p []byte) (l uint8) {
// timestamp, returning as a uint64 if it exists, otherwise returning 0 and nil // timestamp, returning as a uint64 if it exists, otherwise returning 0 and nil
// if it does not exist // if it does not exist
func TimeFrom(p []byte) (t uint64, err error) { func TimeFrom(p []byte) (t uint64, err error) {
err = HasTime(p) if !HasTime(p) {
if err != nil { return 0, errors.New("pmt does not have a time descriptor")
return 0, err
} }
t = binary.BigEndian.Uint64(p[timeDataIndx : timeDataIndx+timeDataSize]) t = binary.BigEndian.Uint64(p[TimeDataIndx : TimeDataIndx+TimeDataSize])
return t, nil return t, nil
} }
@ -100,11 +90,10 @@ func TimeFrom(p []byte) (t uint64, err error) {
// timestamp, returning as a uint64 if it exists, otherwise returning 0 and nil // timestamp, returning as a uint64 if it exists, otherwise returning 0 and nil
// if it does not exist // if it does not exist
func LocationFrom(p []byte) (g string, err error) { func LocationFrom(p []byte) (g string, err error) {
err = HasLocation(p) if !HasLocation(p) {
if err != nil { return "", errors.New("pmt does not have location descriptor")
return "", err
} }
gBytes := p[locationDataIndx : locationDataIndx+locationDataSize] gBytes := p[LocationDataIndx : LocationDataIndx+LocationDataSize]
gBytes = bytes.Trim(gBytes, "\x00") gBytes = bytes.Trim(gBytes, "\x00")
g = string(gBytes) g = string(gBytes)
return g, nil return g, nil
@ -113,7 +102,7 @@ func LocationFrom(p []byte) (g string, err error) {
// LocationStrBytes take a string of location data and converts to a 32 byte slice - // LocationStrBytes take a string of location data and converts to a 32 byte slice -
// easy update of slice representation of a pmt with location descriptor // easy update of slice representation of a pmt with location descriptor
func LocationStrBytes(s string) []byte { func LocationStrBytes(s string) []byte {
var b [locationDataSize]byte var b [LocationDataSize]byte
copy(b[:], s) copy(b[:], s)
return b[:] return b[:]
} }
@ -122,34 +111,22 @@ func LocationStrBytes(s string) []byte {
// descriptor and attempts to update the location data value with the passed string. // descriptor and attempts to update the location data value with the passed string.
// If the psi does not contain a location descriptor, and error is returned. // If the psi does not contain a location descriptor, and error is returned.
func UpdateLocation(d []byte, s string) error { func UpdateLocation(d []byte, s string) error {
err := HasLocation(d) if !HasLocation(d) {
if err != nil { return errors.New("pmt does not location descriptor, cannot update")
return err
} }
gb := LocationStrBytes(s) gb := LocationStrBytes(s)
for i := range d[locationDataIndx : locationDataIndx+locationDataSize] { copy(d[LocationDataIndx:LocationDataIndx+LocationDataSize], gb)
d[i+locationDataIndx] = gb[i] updateCrc(d)
}
d = updateCrc(d)
return nil return nil
} }
// addCrc appends a crc table to a given psi table in bytes func trimTo(d []byte, t byte) []byte {
func addCrc(out []byte) []byte { for i, b := range d {
out = append(out, make([]byte, 4)...) if b == t {
out = updateCrc(out) return d[:i]
return out
} }
}
// updateCrc updates the crc of psi bytes slice that may have been modified return d
func updateCrc(out []byte) []byte {
crcIndx := 4 + SyntaxSecLenFrom(out)
crc32 := crc32_Update(0xffffffff, crc32_MakeTable(bits.Reverse32(crc32.IEEE)), out[1:crcIndx])
out[crcIndx] = byte(crc32 >> 24)
out[crcIndx+1] = byte(crc32 >> 16)
out[crcIndx+2] = byte(crc32 >> 8)
out[crcIndx+3] = byte(crc32)
return out
} }
// addPadding adds an appropriate amount of padding to a pat or pmt table for // addPadding adds an appropriate amount of padding to a pat or pmt table for

View File

@ -26,10 +26,6 @@ LICENSE
package psi package psi
import (
"hash/crc32"
)
// Misc consts // Misc consts
const ( const (
PktSize = 184 PktSize = 184
@ -47,24 +43,24 @@ const (
// Table Type IDs // Table Type IDs
const ( const (
PATTableID = 0x00 patID = 0x00
PMTTableID = 0x02 pmtID = 0x02
) )
// Consts relating to time description // Consts relating to time description
const ( const (
timeDescTag = 234 TimeDescTag = 234
timeTagIndx = 13 TimeTagIndx = 13
timeDataIndx = 15 TimeDataIndx = 15
timeDataSize = 8 // bytes, because time is stored in uint64 TimeDataSize = 8 // bytes, because time is stored in uint64
) )
// Consts relating to location description // Consts relating to location description
const ( const (
locationDescTag = 235 LocationDescTag = 235
locationTagIndx = 23 LocationTagIndx = 23
locationDataIndx = 25 LocationDataIndx = 25
locationDataSize = 32 // bytes LocationDataSize = 32 // bytes
) )
// Other misc consts // Other misc consts
@ -92,21 +88,21 @@ type TSS struct {
Cni bool // Current/next indicator Cni bool // Current/next indicator
Sn byte // Section number Sn byte // Section number
Lsn byte // Last section number Lsn byte // Last section number
Sd SD // Specific data PAT/PMT Sd SpecificData // Specific data PAT/PMT
} }
// Specific Data, (could be PAT or PMT) // Specific Data, (could be PAT or PMT)
type SD interface { type SpecificData interface {
Bytes() []byte Bytes() []byte
} }
// Program association table, implements SD // Program association table, implements SpecificData
type PAT struct { type PAT struct {
Pn uint16 // Program Number Pn uint16 // Program Number
Pmpid uint16 // Program map PID Pmpid uint16 // Program map PID
} }
// Program mapping table, implements SD // Program mapping table, implements SpecificData
type PMT struct { type PMT struct {
Pcrpid uint16 // Program clock reference pid Pcrpid uint16 // Program clock reference pid
Pil uint16 // Program info length Pil uint16 // Program info length
@ -129,109 +125,6 @@ type Desc struct {
Dd []byte // Descriptor data Dd []byte // Descriptor data
} }
// ReadPSI creates a PSI data structure from a given byte slice that represents a PSI
func ReadPSI(data []byte) *PSI {
psi := PSI{}
pos := 0
psi.Pf = data[pos]
if psi.Pf != 0 {
panic("No support for pointer filler bytes")
}
psi.Tid = data[pos]
pos++
psi.Ssi = byteToBool(data[pos] & 0x80)
psi.Pb = byteToBool(data[pos] & 0x40)
psi.Sl = uint16(data[pos]&0x03)<<8 | uint16(data[pos+1])
pos += 2
psi.Tss = readTSS(data[pos:], &psi)
return &psi
}
// ReadTSS creates a TSS data structure from a given byte slice that represents a TSS
func readTSS(data []byte, p *PSI) *TSS {
tss := TSS{}
pos := 0
tss.Tide = uint16(data[pos])<<8 | uint16(data[pos+1])
pos += 2
tss.V = (data[pos] & 0x3e) >> 1
tss.Cni = byteToBool(data[pos] & 0x01)
pos++
tss.Sn = data[pos]
pos++
tss.Lsn = data[pos]
pos++
switch p.Tid {
case PATTableID:
tss.Sd = readPAT(data[pos:], &tss)
case PMTTableID:
tss.Sd = readPMT(data[pos:], &tss)
default:
panic("Can't yet deal with tables that are not PAT or PMT")
}
return &tss
}
// readPAT creates a pat struct based on a bytes slice representing a pat
func readPAT(data []byte, p *TSS) *PAT {
pat := PAT{}
pos := 0
pat.Pn = uint16(data[pos])<<8 | uint16(data[pos+1])
pos += 2
pat.Pmpid = uint16(data[pos]&0x1f)<<8 | uint16(data[pos+1])
return &pat
}
// readPMT creates a pmt struct based on a bytes slice that represents a pmt
func readPMT(data []byte, p *TSS) *PAT {
pmt := PMT{}
pos := 0
pmt.Pcrpid = uint16(data[pos]&0x1f)<<8 | uint16(data[pos+1])
pos += 2
pmt.Pil = uint16(data[pos]&0x03)<<8 | uint16(data[pos+1])
pos += 2
if pmt.Pil != 0 {
pmt.Pd = readDescs(data[pos:], int(pmt.Pil))
}
pos += int(pmt.Pil)
// TODO Read ES stuff
pmt.Essd = readEssd(data[pos:])
return nil
}
// readDescs reads provides a slice of Descs given a byte slice that represents Descs
// and the no of bytes that the descs accumilate
func readDescs(data []byte, descLen int) (o []Desc) {
pos := 0
o = make([]Desc, 1)
o[0].Dt = data[pos]
pos++
o[0].Dl = data[pos]
pos++
o[0].Dd = make([]byte, o[0].Dl)
for i := 0; i < int(o[0].Dl); i++ {
o[0].Dd[i] = data[pos]
pos++
}
if 2+len(o[0].Dd) != descLen {
panic("No support for reading more than one descriptor")
}
return
}
// readEESD creates an ESSD struct based on a bytes slice that represents ESSD
func readEssd(data []byte) *ESSD {
essd := ESSD{}
pos := 0
essd.St = data[pos]
pos++
essd.Epid = uint16(data[pos]&0x1f)<<8 | uint16(data[pos+1])
pos += 2
essd.Esil = uint16(data[pos]&0x03)<<8 | uint16(data[pos+1])
pos += 2
essd.Esd = readDescs(data[pos:], int(essd.Esil))
return &essd
}
// Bytes outputs a byte slice representation of the PSI // Bytes outputs a byte slice representation of the PSI
func (p *PSI) Bytes() []byte { func (p *PSI) Bytes() []byte {
out := make([]byte, 4) out := make([]byte, 4)
@ -253,7 +146,7 @@ func (t *TSS) Bytes() []byte {
out := make([]byte, TSSDefLen) out := make([]byte, TSSDefLen)
out[0] = byte(t.Tide >> 8) out[0] = byte(t.Tide >> 8)
out[1] = byte(t.Tide) out[1] = byte(t.Tide)
out[2] = 0xc0 | (0x3e & (t.V << 1)) | (0x01 & boolToByte(t.Cni)) out[2] = 0xc0 | (0x3e & (t.V << 1)) | (0x01 & asByte(t.Cni))
out[3] = t.Sn out[3] = t.Sn
out[4] = t.Lsn out[4] = t.Lsn
out = append(out, t.Sd.Bytes()...) out = append(out, t.Sd.Bytes()...)
@ -307,39 +200,9 @@ func (e *ESSD) Bytes() []byte {
return out return out
} }
func boolToByte(b bool) (o byte) { func asByte(b bool) byte {
if b { if b {
o = 0x01 return 0x01
} }
return return 0x00
}
func byteToBool(b byte) (o bool) {
if b != 0 {
o = true
}
return
}
func crc32_MakeTable(poly uint32) *crc32.Table {
var t crc32.Table
for i := range t {
crc := uint32(i) << 24
for j := 0; j < 8; j++ {
if crc&0x80000000 != 0 {
crc = (crc << 1) ^ poly
} else {
crc <<= 1
}
}
t[i] = crc
}
return &t
}
func crc32_Update(crc uint32, tab *crc32.Table, p []byte) uint32 {
for _, v := range p {
crc = tab[byte(crc>>24)^v] ^ (crc << 8)
}
return crc
} }

View File

@ -31,10 +31,94 @@ import (
"testing" "testing"
) )
// Some common manifestations of PSI
var (
// standardPat is a minimal PAT.
standardPat = PSI{
Pf: 0x00,
Tid: 0x00,
Ssi: true,
Pb: false,
Sl: 0x0d,
Tss: &TSS{
Tide: 0x01,
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &PAT{
Pn: 0x01,
Pmpid: 0x1000,
},
},
}
// standardPmt is a minimal PMT, without time and location descriptors.
standardPmt = PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: 0x12,
Tss: &TSS{
Tide: 0x01,
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &PMT{
Pcrpid: 0x0100, // wrong
Pil: 0,
Essd: &ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
}
// standardPmtTimeLocation is a standard PMT with time and location
// descriptors, but time and location fields zeroed out.
standardPmtTimeLocation = PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: 0x3e,
Tss: &TSS{
Tide: 0x01,
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &PMT{
Pcrpid: 0x0100,
Pil: PmtTimeLocationPil,
Pd: []Desc{
{
Dt: TimeDescTag,
Dl: TimeDataSize,
Dd: make([]byte, TimeDataSize),
},
{
Dt: LocationDescTag,
Dl: LocationDataSize,
Dd: make([]byte, LocationDataSize),
},
},
Essd: &ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
}
)
// Times as ints for testing // Times as ints for testing
const ( const (
tstTime1 = 1235367435 // 0x49A2360B tstTime1 = 1235367435 // 0x49a2360b
tstTime2 = 1735357535 // 0x676F745F tstTime2 = 1735357535 // 0x676f745f
) )
// GPS string for testing // GPS string for testing
@ -59,12 +143,12 @@ var (
// Parts to construct bytes of pmt with time and bytes // Parts to construct bytes of pmt with time and bytes
var ( var (
pmtTimeLocationBytesPart1 = []byte{ pmtTimeLocationBytesPart1 = []byte{
0x00, 0x02, 0xb0, 0x3e, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x0a, 0x00, 0x02, 0xb0, 0x12, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x0a,
byte(timeDescTag), // Descriptor tag for timestamp TimeDescTag, // Descriptor tag for timestamp
byte(timeDataSize), // Length of bytes to follow TimeDataSize, // Length of bytes to follow
0x00, 0x00, 0x00, 0x00, 0x67, 0x6F, 0x74, 0x5F, // Timestamp data 0x00, 0x00, 0x00, 0x00, 0x67, 0x6f, 0x74, 0x5f, // Timestamp data
byte(locationDescTag), // Descriptor tag for location LocationDescTag, // Descriptor tag for location
byte(locationDataSize), // Length of bytes to follow LocationDataSize, // Length of bytes to follow
} }
pmtTimeLocationBytesPart2 = []byte{ pmtTimeLocationBytesPart2 = []byte{
0x1b, 0xe1, 0x00, 0xf0, 0x00, 0x1b, 0xe1, 0x00, 0xf0, 0x00,
@ -74,19 +158,19 @@ var (
var ( var (
// Bytes representing pmt with tstTime1 // Bytes representing pmt with tstTime1
pmtTimeBytes1 = []byte{ pmtTimeBytes1 = []byte{
0x00, 0x02, 0xb0, 0x1c, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x0a, 0x00, 0x02, 0xb0, 0x12, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x0a,
byte(timeDescTag), // Descriptor tag TimeDescTag, // Descriptor tag
byte(timeDataSize), // Length of bytes to follow TimeDataSize, // Length of bytes to follow
0x00, 0x00, 0x00, 0x00, 0x49, 0xA2, 0x36, 0x0B, // timestamp 0x00, 0x00, 0x00, 0x00, 0x49, 0xa2, 0x36, 0x0b, // timestamp
0x1b, 0xe1, 0x00, 0xf0, 0x00, 0x1b, 0xe1, 0x00, 0xf0, 0x00,
} }
// Bytes representing pmt with tstTime 2 // Bytes representing pmt with tstTime 2
pmtTimeBytes2 = []byte{ pmtTimeBytes2 = []byte{
0x00, 0x02, 0xb0, 0x1c, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x0a, 0x00, 0x02, 0xb0, 0x12, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x0a,
byte(timeDescTag), // Descriptor tag TimeDescTag, // Descriptor tag
byte(timeDataSize), // Length of bytes to follow TimeDataSize, // Length of bytes to follow
0x00, 0x00, 0x00, 0x00, 0x67, 0x6F, 0x74, 0x5F, // timestamp 0x00, 0x00, 0x00, 0x00, 0x67, 0x6f, 0x74, 0x5f, // timestamp
0x1b, 0xe1, 0x00, 0xf0, 0x00, 0x1b, 0xe1, 0x00, 0xf0, 0x00,
} }
@ -106,15 +190,15 @@ var bytesTests = []struct {
// Pat test // Pat test
{ {
name: "pat Bytes()", name: "pat Bytes()",
input: StdPat, input: standardPat,
want: StdPatBytes, want: StandardPatBytes,
}, },
// Pmt test data no descriptor // Pmt test data no descriptor
{ {
name: "pmt to Bytes() without descriptors", name: "pmt to Bytes() without descriptors",
input: StdPmt, input: standardPmt,
want: StdPmtBytes, want: StandardPmtBytes,
}, },
// Pmt with time descriptor // Pmt with time descriptor
@ -124,9 +208,9 @@ var bytesTests = []struct {
Pf: 0x00, Pf: 0x00,
Tid: 0x02, Tid: 0x02,
Ssi: true, Ssi: true,
Sl: uint16(0x1c), Sl: 0x12,
Tss: &TSS{ Tss: &TSS{
Tide: uint16(0x01), Tide: 0x01,
V: 0, V: 0,
Cni: true, Cni: true,
Sn: 0, Sn: 0,
@ -135,9 +219,9 @@ var bytesTests = []struct {
Pcrpid: 0x0100, // wrong Pcrpid: 0x0100, // wrong
Pil: 10, Pil: 10,
Pd: []Desc{ Pd: []Desc{
Desc{ {
Dt: byte(timeDescTag), Dt: TimeDescTag,
Dl: byte(timeDataSize), Dl: TimeDataSize,
Dd: TimeBytes(tstTime1), Dd: TimeBytes(tstTime1),
}, },
}, },
@ -159,9 +243,9 @@ var bytesTests = []struct {
Pf: 0x00, Pf: 0x00,
Tid: 0x02, Tid: 0x02,
Ssi: true, Ssi: true,
Sl: uint16(0x3e), Sl: 0x12,
Tss: &TSS{ Tss: &TSS{
Tide: uint16(0x01), Tide: 0x01,
V: 0, V: 0,
Cni: true, Cni: true,
Sn: 0, Sn: 0,
@ -170,14 +254,14 @@ var bytesTests = []struct {
Pcrpid: 0x0100, // wrong Pcrpid: 0x0100, // wrong
Pil: 10, Pil: 10,
Pd: []Desc{ Pd: []Desc{
Desc{ {
Dt: byte(timeDescTag), Dt: TimeDescTag,
Dl: byte(timeDataSize), Dl: TimeDataSize,
Dd: TimeBytes(tstTime2), Dd: TimeBytes(tstTime2),
}, },
Desc{ {
Dt: byte(locationDescTag), Dt: LocationDescTag,
Dl: byte(locationDataSize), Dl: LocationDataSize,
Dd: LocationStrBytes(locationTstStr1), Dd: LocationStrBytes(locationTstStr1),
}, },
}, },
@ -241,7 +325,7 @@ func TestTimeGet(t *testing.T) {
// TestLocationGet checks that we can correctly get location data from a pmt table // TestLocationGet checks that we can correctly get location data from a pmt table
func TestLocationGet(t *testing.T) { func TestLocationGet(t *testing.T) {
pb := StdPmtTimeLocation.Bytes() pb := standardPmtTimeLocation.Bytes()
err := UpdateLocation(pb, locationTstStr1) err := UpdateLocation(pb, locationTstStr1)
if err != nil { if err != nil {
t.Errorf("Error for TestLocationGet UpdateLocation(pb, locationTstStr1): %v", err) t.Errorf("Error for TestLocationGet UpdateLocation(pb, locationTstStr1): %v", err)
@ -270,6 +354,15 @@ func TestLocationUpdate(t *testing.T) {
} }
} }
func TestTrim(t *testing.T) {
test := []byte{0xa3, 0x01, 0x03, 0x00, 0xde}
want := []byte{0xa3, 0x01, 0x03}
got := trimTo(test, 0x00)
if !bytes.Equal(got, want) {
t.Errorf(errCmp, "TestTrim", want, got)
}
}
// buildPmtTimeLocationBytes is a helper function to help construct the byte slices // buildPmtTimeLocationBytes is a helper function to help construct the byte slices
// for pmts with time and location, as the location data field is 32 bytes, i.e. quite large // for pmts with time and location, as the location data field is 32 bytes, i.e. quite large
// to type out // to type out

View File

@ -27,95 +27,12 @@ LICENSE
package psi package psi
const ( const (
pmtTimeLocationPil = 44 PmtTimeLocationPil = 44
)
// Some common manifestations of PSI
var (
// PSI struct to represent basic pat
StdPat = PSI{
Pf: 0x00,
Tid: 0x00,
Ssi: true,
Pb: false,
Sl: uint16(0x0d),
Tss: &TSS{
Tide: uint16(0x01),
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &PAT{
Pn: uint16(0x01),
Pmpid: uint16(0x1000),
},
},
}
// PSI struct to represent basic pmt without descriptors for time and location
StdPmt = PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: uint16(0x12),
Tss: &TSS{
Tide: uint16(0x01),
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &PMT{
Pcrpid: 0x0100, // wrong
Pil: 0,
Essd: &ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
}
// Std pmt with time and location descriptors, time and location fields are zeroed out
StdPmtTimeLocation = PSI{
Pf: 0x00,
Tid: 0x02,
Ssi: true,
Sl: uint16(0x3e),
Tss: &TSS{
Tide: uint16(0x01),
V: 0,
Cni: true,
Sn: 0,
Lsn: 0,
Sd: &PMT{
Pcrpid: 0x0100,
Pil: pmtTimeLocationPil,
Pd: []Desc{
Desc{
Dt: byte(timeDescTag),
Dl: byte(timeDataSize),
Dd: make([]byte, timeDataSize),
},
Desc{
Dt: byte(locationDescTag),
Dl: byte(locationDataSize),
Dd: make([]byte, locationDataSize),
},
},
Essd: &ESSD{
St: 0x1b,
Epid: 0x0100,
Esil: 0x00,
},
},
},
}
) )
// Std PSI in bytes form // Std PSI in bytes form
var ( var (
StdPatBytes = []byte{ StandardPatBytes = []byte{
0x00, // pointer 0x00, // pointer
// ---- section included in data sent to CRC32 during check // ---- section included in data sent to CRC32 during check
@ -136,7 +53,7 @@ var (
// 0x2a, 0xb1, 0x04, 0xb2, // CRC // 0x2a, 0xb1, 0x04, 0xb2, // CRC
// ---- // ----
} }
StdPmtBytes = []byte{ StandardPmtBytes = []byte{
0x00, // pointer 0x00, // pointer
// ---- section included in data sent to CRC32 during check // ---- section included in data sent to CRC32 during check