Merged in add-rtp-output (pull request #70)

Add rtp output

Approved-by: Alan Noble <anoble@gmail.com>
Approved-by: kortschak <dan@kortschak.io>
This commit is contained in:
Saxon Milton 2018-11-24 05:01:58 +00:00
commit 219cafc032
8 changed files with 413 additions and 45 deletions

View File

@ -72,6 +72,7 @@ const (
verticalFlipPtr
horizontalFlipPtr
logPathPtr
rtpAddrPtr
noOfConfigFlags
)
@ -106,7 +107,7 @@ var (
flagNames = [noOfConfigFlags]struct{ name, description string }{
{"Input", "The input type: Raspivid, File"},
{"InputCodec", "The codec of the input: H264, Mjpeg"},
{"Output", "The output type: Http, Rtmp, File"},
{"Output", "The output type: Http, Rtmp, File, Udp, Rtp"},
{"RtmpMethod", "The method used to send over rtmp: Ffmpeg, Librtmp"},
// NOTE: we add rtp here when we have this functionality
{"Packetization", "The method of data packetisation: Flv, Mpegts, None"},
@ -127,6 +128,7 @@ var (
{"VerticalFlip", "Flip video vertically: Yes, No"},
{"HorizontalFlip", "Flip video horizontally: Yes, No"},
{"LogPath", "Path for logging files (default is /var/log/netsender/)"},
{"RtpAddr", "Rtp destination address: <IP>:<port> (port is generally 6970-6999)"},
}
)
@ -193,8 +195,8 @@ func handleFlags() {
}
switch *configFlags[inputCodecPtr] {
case "H264Codec":
config.InputCodec = revid.H264Codec
case "H264":
config.InputCodec = revid.H264
case "":
default:
logger.Log(smartlogger.Error, pkg+"bad input codec argument")
@ -209,6 +211,11 @@ func handleFlags() {
config.Output = revid.Rtmp
case "FfmpegRtmp":
config.Output = revid.FfmpegRtmp
case "Udp":
config.Output = revid.Udp
case "Rtp":
config.Output = revid.Rtp
config.Packetization = revid.MpegtsRtp
case "":
default:
logger.Log(smartlogger.Error, pkg+"bad output argument")
@ -231,6 +238,8 @@ func handleFlags() {
config.Packetization = revid.Mpegts
case "Flv":
config.Packetization = revid.Flv
case "MpegtsRtp":
config.Packetization = revid.MpegtsRtp
case "":
default:
logger.Log(smartlogger.Error, pkg+"bad packetization argument")
@ -293,6 +302,7 @@ func handleFlags() {
config.Quantization = *configFlags[quantizationPtr]
config.Timeout = *configFlags[timeoutPtr]
config.IntraRefreshPeriod = *configFlags[intraRefreshPeriodPtr]
config.RtpAddress = *configFlags[rtpAddrPtr]
}
// initialize then run the main NetSender client

View File

@ -58,6 +58,7 @@ type Config struct {
Quantization string
Timeout string
IntraRefreshPeriod string
RtpAddress string
Logger Logger
}
@ -81,6 +82,9 @@ const (
No
Rtmp
FfmpegRtmp
Udp
MpegtsRtp
Rtp
)
// Default config settings
@ -102,6 +106,7 @@ const (
httpFramesPerClip = 7
defaultInputCodec = H264
defaultVerbosity = No
defaultRtpAddr = "localhost:6970"
)
// Validate checks for any errors in the config fields and defaults settings
@ -178,6 +183,8 @@ func (c *Config) Validate(r *Revid) error {
switch c.Output {
case File:
case Rtp:
case Udp:
case Rtmp, FfmpegRtmp:
if c.RtmpUrl == "" {
c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP")
@ -200,18 +207,6 @@ func (c *Config) Validate(r *Revid) error {
return errors.New("bad output type defined in config")
}
switch c.Packetization {
case None:
case Mpegts:
case Flv:
case NothingDefined:
c.Logger.Log(smartlogger.Warning, pkg+"no packetization option defined, defaulting",
"packetization", defaultPacketization)
c.Packetization = defaultPacketization
default:
return errors.New("bad packetization option defined in config")
}
switch c.HorizontalFlip {
case Yes:
case No:
@ -308,5 +303,9 @@ func (c *Config) Validate(r *Revid) error {
return errors.New("quantisation not unsigned integer or is over threshold")
}
}
if c.RtpAddress == "" {
c.RtpAddress = defaultRtpAddr
}
return nil
}

View File

@ -43,6 +43,7 @@ import (
"bitbucket.org/ausocean/av/stream/flv"
"bitbucket.org/ausocean/av/stream/lex"
"bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/av/stream/rtp"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/ring"
"bitbucket.org/ausocean/utils/smartlogger"
@ -226,6 +227,12 @@ func (r *Revid) reset(config Config) error {
r.destination = s
case Http:
r.destination = newHttpSender(r.ns, r.config.Logger.Log)
case Rtp, Udp:
s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log)
if err != nil {
return err
}
r.destination = s
}
switch r.config.Input {
@ -271,6 +278,10 @@ func (r *Revid) reset(config Config) error {
if err != nil {
return err
}
case MpegtsRtp:
r.config.Logger.Log(smartlogger.Info, pkg+"using RTP packetisation")
frameRate, _ := strconv.Atoi(r.config.FrameRate)
r.encoder = mts.NewEncoder(rtp.NewEncoder(&r.packer, frameRate), float64(frameRate))
}
return nil
@ -348,10 +359,12 @@ loop:
}
if err != nil && chunk.Len() > 11 {
r.config.Logger.Log(smartlogger.Debug, pkg+"send failed, trying again")
r.config.Logger.Log(smartlogger.Error, pkg+"first send failed", "error", err.Error())
// Try and send again
err = r.destination.send()
if err != nil {
r.config.Logger.Log(smartlogger.Error, pkg+"destination send error", "error", err.Error())
}
// if there's still an error we try and reconnect, unless we're stopping
for err != nil {

View File

@ -30,6 +30,7 @@ package revid
import (
"io"
"net"
"os"
"os/exec"
@ -289,3 +290,38 @@ func (s *rtmpSender) restart() error {
func (s *rtmpSender) close() error {
return s.sess.Close()
}
// rtpSender implements loadSender for a native udp destination.
type udpSender struct {
conn net.Conn
log func(lvl int8, msg string, args ...interface{})
chunk *ring.Chunk
}
func newUdpSender(addr string, log func(lvl int8, msg string, args ...interface{})) (*udpSender, error) {
conn, err := net.Dial("udp", addr)
if err != nil {
return nil, err
}
return &udpSender{
conn: conn,
log: log,
}, nil
}
func (s *udpSender) load(c *ring.Chunk) error {
s.chunk = c
return nil
}
func (s *udpSender) send() error {
_, err := s.chunk.WriteTo(s.conn)
return err
}
func (s *udpSender) release() {
s.chunk.Close()
s.chunk = nil
}
func (s *udpSender) close() error { return nil }

View File

@ -38,9 +38,13 @@ import (
"bitbucket.org/ausocean/av/stream/mts/pes"
)
const psiPacketSize = 184
const (
psiPacketSize = 184
psiSendCount = 100
)
// TODO: really need to finish the at and pmt stuff - this is too hacky
// TODO: Finish off mts/psi so that we can create pat and pmt tables instead
// of hardcoding.
var (
patTable = []byte{
0x00, // pointer
@ -163,6 +167,8 @@ type Encoder struct {
frameInterval time.Duration
ptsOffset time.Duration
psiCount uint
continuity map[int]byte
}
@ -195,32 +201,13 @@ const (
// generate handles the incoming data and generates equivalent mpegts packets -
// sending them to the output channel
func (e *Encoder) Encode(nalu []byte) error {
// Write PAT
patPkt := Packet{
PUSI: true,
PID: patPid,
CC: e.ccFor(patPid),
AFC: hasPayload,
Payload: patTable,
}
_, err := e.dst.Write(patPkt.Bytes())
if e.psiCount <= 0 {
err := e.writePSI()
if err != nil {
return err
}
// Write PMT.
pmtPkt := Packet{
PUSI: true,
PID: pmtPid,
CC: e.ccFor(pmtPid),
AFC: hasPayload,
Payload: pmtTable,
}
_, err = e.dst.Write(pmtPkt.Bytes())
if err != nil {
return err
}
e.psiCount--
// Prepare PES data.
pesPkt := pes.Packet{
StreamID: streamID,
@ -250,7 +237,6 @@ func (e *Encoder) Encode(nalu []byte) error {
pkt.PCR = e.pcr()
pusi = false
}
_, err := e.dst.Write(pkt.Bytes())
if err != nil {
return err
@ -262,6 +248,36 @@ func (e *Encoder) Encode(nalu []byte) error {
return nil
}
func (e *Encoder) writePSI() error {
// Write PAT
patPkt := Packet{
PUSI: true,
PID: patPid,
CC: e.ccFor(patPid),
AFC: hasPayload,
Payload: patTable,
}
_, err := e.dst.Write(patPkt.Bytes())
if err != nil {
return err
}
// Write PMT.
pmtPkt := Packet{
PUSI: true,
PID: pmtPid,
CC: e.ccFor(pmtPid),
AFC: hasPayload,
Payload: pmtTable,
}
_, err = e.dst.Write(pmtPkt.Bytes())
if err != nil {
return err
}
e.psiCount = psiSendCount
return nil
}
// tick advances the clock one frame interval.
func (e *Encoder) tick() {
e.clock += e.frameInterval

118
stream/rtp/encoder.go Normal file
View File

@ -0,0 +1,118 @@
/*
NAME
encoder.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton (saxon@ausocean.org)
LICENSE
encoder.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package rtp
import (
"io"
"math/rand"
"time"
)
const (
yes = 1
no = 0
defaultPktType = 33
timestampFreq = 90000 // Hz
mtsSize = 188
bufferSize = 1000
sendLength = 7 * 188
)
// Encoder implements io writer and provides functionality to wrap data into
// rtp packets
type Encoder struct {
dst io.Writer
ssrc uint32
seqNo uint16
clock time.Duration
frameInterval time.Duration
fps int
buffer []byte
}
// NewEncoder returns a new Encoder type given an io.Writer - the destination
// after encoding and the desired fps
func NewEncoder(dst io.Writer, fps int) *Encoder {
return &Encoder{
dst: dst,
ssrc: rand.Uint32(),
frameInterval: time.Duration(float64(time.Second) / float64(fps)),
fps: fps,
buffer: make([]byte, 0, sendLength),
}
}
// Write provides an interface between a prior encoder and this rtp encoder,
// so that multiple layers of packetization can occur.
func (e *Encoder) Write(data []byte) (int, error) {
e.buffer = append(e.buffer, data...)
for len(e.buffer) >= sendLength {
e.Encode(e.buffer)
e.buffer = e.buffer[:0]
}
return len(data), nil
}
// Encode takes a nalu unit and encodes it into an rtp packet and
// writes to the io.Writer given in NewEncoder
func (e *Encoder) Encode(payload []byte) error {
pkt := Pkt{
V: rtpVer, // version
X: no, // header extension
CC: no, // CSRC count
M: no, // NOTE: need to check if this works (decoders should ignore this)
PT: defaultPktType, // 33 for mpegts
SN: e.nxtSeqNo(), // sequence number
TS: e.nxtTimestamp(), // timestamp
SSRC: e.ssrc, // source identifier
Payload: payload,
Padding: no,
}
_, err := e.dst.Write(pkt.Bytes())
if err != nil {
return err
}
e.tick()
return nil
}
// tick advances the clock one frame interval.
func (e *Encoder) tick() {
e.clock += e.frameInterval
}
// nxtTimestamp gets the next timestamp
func (e *Encoder) nxtTimestamp() uint32 {
return uint32(e.clock.Seconds() * timestampFreq)
}
// nxtSeqNo gets the next rtp packet sequence number
func (e *Encoder) nxtSeqNo() uint16 {
e.seqNo++
return e.seqNo - 1
}

102
stream/rtp/rtp.go Normal file
View File

@ -0,0 +1,102 @@
/*
NAME
rtp.go - provides a data structure intended to encapsulate the properties
of an rtp packet and also functions to allow manipulation of these packets.
DESCRIPTION
See Readme.md
AUTHOR
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
rtp.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
/*
See https://tools.ietf.org/html/rfc6184 and https://tools.ietf.org/html/rfc3550
for rtp-h264 and rtp standards.
*/
package rtp
const (
rtpVer = 2
)
// Pkt provides fields consistent with RFC3550 definition of an rtp packet
// The padding indicator does not need to be set manually, only the padding length
type Pkt struct {
V byte // Version (currently 2)
p byte // Padding indicator (0 => padding, 1 => padding)
X byte // Extension header indicator
CC byte // CSRC count
M byte // Marker bit
PT byte // Packet type
SN uint16 // Synch number
TS uint32 // Timestamp
SSRC uint32 // Synchronisation source identifier
Payload []byte // H264 Payload data
Padding byte // No of bytes of padding
}
// Bytes provides a byte slice of the packet
func (p *Pkt) Bytes() []byte {
if p.V == 0 {
p.V = rtpVer
}
if p.Padding > 0 {
p.p = 1
}
if p.CC != 0 {
panic("CC has been set to something other than 0 - this is not supported yet.")
}
if p.X != 0 {
panic("rtp: X (extension header indicator) not 0, but extensiion headers not currently supported.")
}
if p.CC != 0 {
panic("rtp: CC (CSRC count) not 0, but CSRC headers not yet supported.")
}
const headSize = 3 * 4 // bytes
buf := make([]byte, headSize, headSize+len(p.Payload)+int(p.Padding))
buf[0] = p.V<<6 | p.p<<5 | p.CC
buf[1] = p.M<<7 | p.PT
buf[2] = byte(p.SN >> 8)
buf[3] = byte(p.SN)
buf[4] = byte(p.TS >> 24)
buf[5] = byte(p.TS >> 16)
buf[6] = byte(p.TS >> 8)
buf[7] = byte(p.TS)
buf[8] = byte(p.SSRC >> 24)
buf[9] = byte(p.SSRC >> 16)
buf[10] = byte(p.SSRC >> 8)
buf[11] = byte(p.SSRC)
buf = append(buf, p.Payload...)
// see https://tools.ietf.org/html/rfc3550 section 5.1 (padding). At end of
// rtp packet, padding may exist, with the last octet being the length of the
// padding including itself.
if p.Padding != 0 {
buf = buf[:cap(buf)]
buf[len(buf)-1] = byte(p.Padding)
}
return buf
}

74
stream/rtp/rtp_test.go Normal file
View File

@ -0,0 +1,74 @@
/*
NAME
rtp_test.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton (saxon@ausocean.org)
LICENSE
rtp_test.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package rtp
import (
"reflect"
"testing"
)
// TODO (saxon): add more tests
var rtpTests = []struct {
num int
pkt Pkt
want []byte
}{
{
num: 1,
pkt: Pkt{
V: 2,
p: 0,
X: 0,
CC: 0,
M: 0,
PT: 6,
SN: 167,
TS: 160,
SSRC: 10,
Payload: []byte{0x00, 0x01, 0x07, 0xf0, 0x56, 0x37, 0x0a, 0x0f},
Padding: 0,
},
want: []byte{
0x80, 0x06, 0x00, 0xa7,
0x00, 0x00, 0x00, 0xa0,
0x00, 0x00, 0x00, 0x0a,
0x00, 0x01, 0x07, 0xf0,
0x56, 0x37, 0x0a, 0x0f,
},
},
}
func TestRtpPktToByteSlice(t *testing.T) {
for _, test := range rtpTests {
got := test.pkt.Bytes()
if !reflect.DeepEqual(got, test.want) {
t.Errorf("unexpected error for test %v: got:%v want:%v", test.num, got,
test.want)
}
}
}