From 4e7e779de7b5cb29798010aca0e7e58947fafb71 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 16 Nov 2018 19:35:19 +1030 Subject: [PATCH 01/37] rtp: created rtp packet structure, wrote byte function (interprets packet structure and creates equivalent byte slice and started writing test utilities --- stream/rtp/encoder.go | 28 ++++++++++++++ stream/rtp/rtp.go | 87 ++++++++++++++++++++++++++++++++++++++++++ stream/rtp/rtp_test.go | 43 +++++++++++++++++++++ 3 files changed, 158 insertions(+) create mode 100644 stream/rtp/encoder.go create mode 100644 stream/rtp/rtp.go create mode 100644 stream/rtp/rtp_test.go diff --git a/stream/rtp/encoder.go b/stream/rtp/encoder.go new file mode 100644 index 00000000..a5270a13 --- /dev/null +++ b/stream/rtp/encoder.go @@ -0,0 +1,28 @@ +/* +NAME + encoder.go + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton (saxon@ausocean.org) + +LICENSE + encoder.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package rtp diff --git a/stream/rtp/rtp.go b/stream/rtp/rtp.go new file mode 100644 index 00000000..1ee1422f --- /dev/null +++ b/stream/rtp/rtp.go @@ -0,0 +1,87 @@ +/* +NAME + rtp.go - provides a data structure intended to encapsulate the properties + of an rtp packet and also functions to allow manipulation of these packets. + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon A. Nelson-Milton + +LICENSE + rtp.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +// See https://tools.ietf.org/html/rfc6184 for rtp packet format + +package rtp + +const ( + rtpVer = 2 + defaultHeadSize = 3 * 4 // bytes +) + +type Pkt struct { + V byte + P byte + X byte + CC byte + M byte + PT byte + SN int16 + TS int32 + SSRC int32 + Payload []byte + Padding int +} + +func (p *Pkt) Bytes() []byte { + if p.V == 0 { + p.V = rtpVer + } + + if p.P != 0 && p.Padding == 0 { + panic("Padding bit set to something other than 1, but there is no padding size defined!") + } else if p.P == 0 && p.Padding != 0 { + panic("Padding bit is set to zero, but it's indicated that there is padding!") + } + + if p.CC != 0 { + panic("CC has been set to something other than 0 - this is not supported yet!") + } + + buf := make([]byte, defaultHeadSize, defaultHeadSize+len(p.Payload)+p.Padding) + // First 4 bytes + buf[0] |= p.V<<6 | p.P<<5 | p.CC + buf[1] |= p.M<<7 | p.PT + buf[2] |= byte(p.SN >> 8) + buf[3] |= byte(p.SN) + // Second lot of 4 bytes + buf[4] |= byte(p.TS >> 24) + buf[5] |= byte(p.TS >> 16) + buf[6] |= byte(p.TS >> 8) + buf[7] |= byte(p.TS) + // Third lot of 4 bytes + buf[8] |= byte(p.SSRC >> 24) + buf[9] |= byte(p.SSRC >> 16) + buf[10] |= byte(p.SSRC >> 8) + buf[11] |= byte(p.SSRC) + // Add payload and padding + buf = append(buf, p.Payload...) + buf = append(buf, append(make([]byte, p.Padding-1, p.Padding), byte(p.Padding))...) + return buf +} diff --git a/stream/rtp/rtp_test.go b/stream/rtp/rtp_test.go new file mode 100644 index 00000000..04213c75 --- /dev/null +++ b/stream/rtp/rtp_test.go @@ -0,0 +1,43 @@ +/* +NAME + rtp_test.go + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton (saxon@ausocean.org) + +LICENSE + rtp_test.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package rtp + +var rtpTests = []struct { + pkt Pkt + byteOutput []byte +}{ + { + pkt: Pkt{}, + }, + { + name: "null short", + input: []byte{0x00, 0x00, 0x01, 0x0}, + delay: 0, + want: [][]byte{{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x0}}, + }, +} From 9f329d49b6fc07c4942279b2f20f4165505e6d20 Mon Sep 17 00:00:00 2001 From: saxon Date: Sat, 17 Nov 2018 16:52:57 +1030 Subject: [PATCH 02/37] rtp: writing test function inside rtp_test.go --- stream/rtp/rtp.go | 30 +++++++++++++++++----------- stream/rtp/rtp_test.go | 45 +++++++++++++++++++++++++++++++++--------- 2 files changed, 55 insertions(+), 20 deletions(-) diff --git a/stream/rtp/rtp.go b/stream/rtp/rtp.go index 1ee1422f..164f9951 100644 --- a/stream/rtp/rtp.go +++ b/stream/rtp/rtp.go @@ -36,17 +36,17 @@ const ( ) type Pkt struct { - V byte - P byte - X byte - CC byte - M byte - PT byte - SN int16 - TS int32 - SSRC int32 - Payload []byte - Padding int + V byte // Version (currently 2) + P byte // Padding indicator (0 => padding, 1 => padding) + X byte // Extension header indicator + CC byte // CSRC count + M byte // Marker bit + PT byte // Packet type + SN int16 // Synch number + TS int32 // Timestamp + SSRC int32 // Synchronisation source identifier + Payload []byte // H264 Payload data + Padding int // No of bytes of padding } func (p *Pkt) Bytes() []byte { @@ -64,6 +64,14 @@ func (p *Pkt) Bytes() []byte { panic("CC has been set to something other than 0 - this is not supported yet!") } + if p.X != 0 { + panic("rtp: X (extension header indicator) not 0, but extensiion headers not currently supported!") + } + + if p.CC != 0 { + panic("rtp: CC (CSRC count) not 0, but CSRC headers not yet supported!") + } + buf := make([]byte, defaultHeadSize, defaultHeadSize+len(p.Payload)+p.Padding) // First 4 bytes buf[0] |= p.V<<6 | p.P<<5 | p.CC diff --git a/stream/rtp/rtp_test.go b/stream/rtp/rtp_test.go index 04213c75..aa84d92f 100644 --- a/stream/rtp/rtp_test.go +++ b/stream/rtp/rtp_test.go @@ -27,17 +27,44 @@ LICENSE package rtp +import ( + "reflect" + "testing" +) + var rtpTests = []struct { - pkt Pkt - byteOutput []byte + num int + pkt Pkt + want []byte }{ { - pkt: Pkt{}, - }, - { - name: "null short", - input: []byte{0x00, 0x00, 0x01, 0x0}, - delay: 0, - want: [][]byte{{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x0}}, + num: 1, + pkt: Pkt{ + V: 2, + P: 0, + X: 0, + CC: 0, + M: 0, + PT: 6, + SN: 167, + TS: 160, + SSRC: 10, + Payload: []byte{0x00, 0x01, 0x07, 0xf0, 0x56, 0x37, 0x0a, 0x0f}, + Padding: 0, + }, + want: []byte{ + 0x80, 0x06, 0x00, 0xa7, + 0x00, 0x00, 0x00, 0xa0, + 0x00, 0x00, 0x00, 0x0a, + 0x00, 0x01, 0x07, 0xf0, + 0x56, 0x37, 0x0a, 0x0f, + }, }, } + +func TestRtpPktToByteSlice(t *testing.T) { + got := rtpTests.pkt + if !reflect.DeepEqual(got, tests.Want) { + t.Errorf("unexpected error for %q: got:%v want:%v", test.num, got, tests.Want) + } +} From a6cbfee22bb38f4c72614fbbd088c526c1600ca2 Mon Sep 17 00:00:00 2001 From: saxon Date: Sat, 17 Nov 2018 17:04:26 +1030 Subject: [PATCH 03/37] rtp: finished testing function, works, but first test failing --- stream/rtp/rtp_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/stream/rtp/rtp_test.go b/stream/rtp/rtp_test.go index aa84d92f..3844865e 100644 --- a/stream/rtp/rtp_test.go +++ b/stream/rtp/rtp_test.go @@ -63,8 +63,10 @@ var rtpTests = []struct { } func TestRtpPktToByteSlice(t *testing.T) { - got := rtpTests.pkt - if !reflect.DeepEqual(got, tests.Want) { - t.Errorf("unexpected error for %q: got:%v want:%v", test.num, got, tests.Want) + for _, test := range rtpTests { + got := test.pkt + if !reflect.DeepEqual(got, test.want) { + t.Errorf("unexpected error for %q: got:%v want:%v", test.num, got, test.want) + } } } From 1a15889522b1b0a378a29ec64f8c5211cb872ed2 Mon Sep 17 00:00:00 2001 From: saxon Date: Sat, 17 Nov 2018 17:17:54 +1030 Subject: [PATCH 04/37] rtp: fixed bug by actually checking to see if there is padding before adding padding size to end buf - which would mean there's actually padding --- stream/rtp/rtp.go | 6 +++++- stream/rtp/rtp_test.go | 4 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/stream/rtp/rtp.go b/stream/rtp/rtp.go index 164f9951..d7fca6b2 100644 --- a/stream/rtp/rtp.go +++ b/stream/rtp/rtp.go @@ -73,6 +73,7 @@ func (p *Pkt) Bytes() []byte { } buf := make([]byte, defaultHeadSize, defaultHeadSize+len(p.Payload)+p.Padding) + // First 4 bytes buf[0] |= p.V<<6 | p.P<<5 | p.CC buf[1] |= p.M<<7 | p.PT @@ -88,8 +89,11 @@ func (p *Pkt) Bytes() []byte { buf[9] |= byte(p.SSRC >> 16) buf[10] |= byte(p.SSRC >> 8) buf[11] |= byte(p.SSRC) + // Add payload and padding buf = append(buf, p.Payload...) - buf = append(buf, append(make([]byte, p.Padding-1, p.Padding), byte(p.Padding))...) + if p.Padding != 0 { + buf = append(buf, append(make([]byte, p.Padding-1, p.Padding), byte(p.Padding))...) + } return buf } diff --git a/stream/rtp/rtp_test.go b/stream/rtp/rtp_test.go index 3844865e..1fe199f2 100644 --- a/stream/rtp/rtp_test.go +++ b/stream/rtp/rtp_test.go @@ -64,9 +64,9 @@ var rtpTests = []struct { func TestRtpPktToByteSlice(t *testing.T) { for _, test := range rtpTests { - got := test.pkt + got := test.pkt.Bytes() if !reflect.DeepEqual(got, test.want) { - t.Errorf("unexpected error for %q: got:%v want:%v", test.num, got, test.want) + t.Errorf("unexpected error for test %v: got:%v want:%v", test.num, got, test.want) } } } From ddf25e1fbe5d1ec185d59b09b1a0cec4268590ba Mon Sep 17 00:00:00 2001 From: saxon Date: Sat, 17 Nov 2018 17:43:04 +1030 Subject: [PATCH 05/37] rtp: started writing encoder for rtp. Needto work out what the packet type part of the header should be --- stream/rtp/encoder.go | 75 ++++++++++++++++++++++++++++++++++++++++++ stream/rtp/rtp.go | 10 ++---- stream/rtp/rtp_test.go | 4 ++- 3 files changed, 81 insertions(+), 8 deletions(-) diff --git a/stream/rtp/encoder.go b/stream/rtp/encoder.go index a5270a13..ada69395 100644 --- a/stream/rtp/encoder.go +++ b/stream/rtp/encoder.go @@ -26,3 +26,78 @@ LICENSE */ package rtp + +import ( + "io" + "time" +) + +// Time related constants. +const ( + rtpVer = 2 + yes = 1 + no = 0 + ccCount = 0 + // pcrFreq is the base Program Clock Reference frequency. + timestampFreq = 90000 // Hz +) + +type Encoder struct { + dst io.Writer + + clock time.Duration + frameInterval time.Duration +} + +// NewEncoder returns an Encoder with the specified frame rate. +func NewEncoder(dst io.Writer, fps float64) *Encoder { + return &Encoder{ + dst: dst, + frameInterval: time.Duration(float64(time.Second) / fps), + } +} + +func (e *Encoder) Encode(nalu []byte) error { + pkt := Pkt{ + Pkt{ + V: rtpVer, // version + P: no, // padding + X: no, // header extension + CC: ccCount, + M: no, // NOTE: need to check if this works (decoders should ignore this) + PT: 6, + SN: 167, + TS: 160, + SSRC: 10, + Payload: []byte{0x00, 0x01, 0x07, 0xf0, 0x56, 0x37, 0x0a, 0x0f}, + Padding: 0, + }, + } + // Write rtp packet to + _, err := e.dst.Write(pkt.Bytes()) + if err != nil { + return err + } + + e.tick() + + return nil +} + +// tick advances the clock one frame interval. +func (e *Encoder) tick() { + e.clock += e.frameInterval +} + +// TODO: alter and make this work for rtp +func (e *Encoder) pts() uint64 { + return uint64((e.clock + e.ptsOffset).Seconds() * pcrFreq) +} + +// TODO: alter and apply this to rtp for sequence number +func (e *Encoder) ccFor(pid int) byte { + cc := e.continuity[pid] + const continuityCounterMask = 0xf + e.continuity[pid] = (cc + 1) & continuityCounterMask + return cc +} diff --git a/stream/rtp/rtp.go b/stream/rtp/rtp.go index d7fca6b2..bdf81e93 100644 --- a/stream/rtp/rtp.go +++ b/stream/rtp/rtp.go @@ -31,8 +31,7 @@ LICENSE package rtp const ( - rtpVer = 2 - defaultHeadSize = 3 * 4 // bytes + rtpVer = 2 ) type Pkt struct { @@ -72,25 +71,22 @@ func (p *Pkt) Bytes() []byte { panic("rtp: CC (CSRC count) not 0, but CSRC headers not yet supported!") } - buf := make([]byte, defaultHeadSize, defaultHeadSize+len(p.Payload)+p.Padding) + const headSize = 3 * 4 // bytes + buf := make([]byte, headSize, headSize+len(p.Payload)+p.Padding) - // First 4 bytes buf[0] |= p.V<<6 | p.P<<5 | p.CC buf[1] |= p.M<<7 | p.PT buf[2] |= byte(p.SN >> 8) buf[3] |= byte(p.SN) - // Second lot of 4 bytes buf[4] |= byte(p.TS >> 24) buf[5] |= byte(p.TS >> 16) buf[6] |= byte(p.TS >> 8) buf[7] |= byte(p.TS) - // Third lot of 4 bytes buf[8] |= byte(p.SSRC >> 24) buf[9] |= byte(p.SSRC >> 16) buf[10] |= byte(p.SSRC >> 8) buf[11] |= byte(p.SSRC) - // Add payload and padding buf = append(buf, p.Payload...) if p.Padding != 0 { buf = append(buf, append(make([]byte, p.Padding-1, p.Padding), byte(p.Padding))...) diff --git a/stream/rtp/rtp_test.go b/stream/rtp/rtp_test.go index 1fe199f2..3d034c32 100644 --- a/stream/rtp/rtp_test.go +++ b/stream/rtp/rtp_test.go @@ -32,6 +32,7 @@ import ( "testing" ) +// TODO (saxon): add more tests var rtpTests = []struct { num int pkt Pkt @@ -66,7 +67,8 @@ func TestRtpPktToByteSlice(t *testing.T) { for _, test := range rtpTests { got := test.pkt.Bytes() if !reflect.DeepEqual(got, test.want) { - t.Errorf("unexpected error for test %v: got:%v want:%v", test.num, got, test.want) + t.Errorf("unexpected error for test %v: got:%v want:%v", test.num, got, + test.want) } } } From 5cc35a77a5d447a2015dccec5f9968feedfdbcb0 Mon Sep 17 00:00:00 2001 From: saxon Date: Sat, 17 Nov 2018 23:17:08 +1030 Subject: [PATCH 06/37] rtp: finished encoder file - wrote encode function, wrote timestamp function and sequence number function - need to test --- stream/rtp/encoder.go | 65 +++++++++++++++++++++---------------------- stream/rtp/rtp.go | 6 ++-- 2 files changed, 35 insertions(+), 36 deletions(-) diff --git a/stream/rtp/encoder.go b/stream/rtp/encoder.go index ada69395..315f66e8 100644 --- a/stream/rtp/encoder.go +++ b/stream/rtp/encoder.go @@ -29,51 +29,52 @@ package rtp import ( "io" + "math/rand" "time" ) -// Time related constants. const ( - rtpVer = 2 - yes = 1 - no = 0 - ccCount = 0 - // pcrFreq is the base Program Clock Reference frequency. - timestampFreq = 90000 // Hz + yes = 1 + no = 0 + defaultPktType = 1 + timestampFreq = 90000 // Hz ) type Encoder struct { - dst io.Writer - + dst io.Writer + ssrc uint32 + seqNo uint16 clock time.Duration frameInterval time.Duration } -// NewEncoder returns an Encoder with the specified frame rate. +// NewEncoder returns a new Encoder type given an io.Writer - the destination +// after encoding and the desired fps func NewEncoder(dst io.Writer, fps float64) *Encoder { return &Encoder{ dst: dst, + ssrc: rand.Uint32(), frameInterval: time.Duration(float64(time.Second) / fps), } } +// Encode takes a nalu unit and encodes it into an rtp packet and +// writes to the io.Writer given in NewEncoder func (e *Encoder) Encode(nalu []byte) error { pkt := Pkt{ - Pkt{ - V: rtpVer, // version - P: no, // padding - X: no, // header extension - CC: ccCount, - M: no, // NOTE: need to check if this works (decoders should ignore this) - PT: 6, - SN: 167, - TS: 160, - SSRC: 10, - Payload: []byte{0x00, 0x01, 0x07, 0xf0, 0x56, 0x37, 0x0a, 0x0f}, - Padding: 0, - }, + V: rtpVer, // version + P: no, // padding + X: no, // header extension + CC: no, // CSRC count + M: no, // NOTE: need to check if this works (decoders should ignore this) + PT: defaultPktType, // NOTE: 1-23 according to rtp-h264 specs (don't think we need this) + SN: e.nxtSeqNo(), // sequence number + TS: e.nxtTimestamp(), // timestamp + SSRC: e.ssrc, // source identifier + Payload: nalu, + Padding: no, } - // Write rtp packet to + _, err := e.dst.Write(pkt.Bytes()) if err != nil { return err @@ -89,15 +90,13 @@ func (e *Encoder) tick() { e.clock += e.frameInterval } -// TODO: alter and make this work for rtp -func (e *Encoder) pts() uint64 { - return uint64((e.clock + e.ptsOffset).Seconds() * pcrFreq) +// nxtTimestamp gets the next timestamp +func (e *Encoder) nxtTimestamp() uint32 { + return uint32(e.clock.Seconds() * timestampFreq) } -// TODO: alter and apply this to rtp for sequence number -func (e *Encoder) ccFor(pid int) byte { - cc := e.continuity[pid] - const continuityCounterMask = 0xf - e.continuity[pid] = (cc + 1) & continuityCounterMask - return cc +// nxtSeqNo gets the next rtp packet sequence number +func (e *Encoder) nxtSeqNo() uint16 { + e.seqNo += 1 + return e.seqNo - 1 } diff --git a/stream/rtp/rtp.go b/stream/rtp/rtp.go index bdf81e93..e4f36218 100644 --- a/stream/rtp/rtp.go +++ b/stream/rtp/rtp.go @@ -41,9 +41,9 @@ type Pkt struct { CC byte // CSRC count M byte // Marker bit PT byte // Packet type - SN int16 // Synch number - TS int32 // Timestamp - SSRC int32 // Synchronisation source identifier + SN uint16 // Synch number + TS uint32 // Timestamp + SSRC uint32 // Synchronisation source identifier Payload []byte // H264 Payload data Padding int // No of bytes of padding } From d291744f6395e96154d26f763a42472b54689d44 Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 18 Nov 2018 12:32:11 +1030 Subject: [PATCH 07/37] revid: wrote loadsender implementation for udp sending for the rtp - need to test --- revid/senders.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/revid/senders.go b/revid/senders.go index 38f7856a..e313993b 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -30,6 +30,7 @@ package revid import ( "io" + "net" "os" "os/exec" @@ -289,3 +290,35 @@ func (s *rtmpSender) restart() error { func (s *rtmpSender) close() error { return s.sess.Close() } + +// rtpSender implements loadSender for a native udp destination. +type udpSender struct { + conn net.Conn + log func(lvl int8, msg string, args ...interface{}) + chunk *ring.Chunk +} + +func newUdpSender(addr string, log func(lvl int8, msg string, + args ...interface{})) (*udpSender, error) { + + conn, err := net.Dial("udp", addr) + if err != nil { + return nil, err + } + return &udpSender{ + conn: conn, + log: log, + }, nil +} + +func (s *udpSender) load(c *ring.Chunk) error { + s.chunk = c + return nil +} + +func (s *udpSender) send() error { + _, err := s.chunk.WriteTo(s.conn) + return err +} + +func (s *udpSender) close() error { return nil } From 3515958a9906599369493571d1d539527e0f58a7 Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 18 Nov 2018 13:22:48 +1030 Subject: [PATCH 08/37] rtp: added rtp and udp options to the revid config --- revid/config.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/revid/config.go b/revid/config.go index ea00f392..7550464d 100644 --- a/revid/config.go +++ b/revid/config.go @@ -58,6 +58,7 @@ type Config struct { Quantization string Timeout string IntraRefreshPeriod string + UdpAddress string Logger Logger } @@ -81,6 +82,8 @@ const ( No Rtmp FfmpegRtmp + Udp + Rtp ) // Default config settings From f82c26ee628af8d51ec6b460893220ea8332f6d0 Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 18 Nov 2018 13:23:14 +1030 Subject: [PATCH 09/37] rtp: added udp output and rtp packetisation options to revid-cli --- cmd/revid-cli/main.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 3f11f042..0ade1a26 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -72,6 +72,7 @@ const ( verticalFlipPtr horizontalFlipPtr logPathPtr + udpAddrPtr noOfConfigFlags ) @@ -83,6 +84,7 @@ const ( revidStopTime = 5 * time.Second defaultLogPath = "/var/log/netsender/" pkg = "revid-cli:" + defaultUdpAddr = "localhost:6970" ) // canProfile is set to false with revid-cli is built with "-tags profile". @@ -106,10 +108,10 @@ var ( flagNames = [noOfConfigFlags]struct{ name, description string }{ {"Input", "The input type: Raspivid, File"}, {"InputCodec", "The codec of the input: H264, Mjpeg"}, - {"Output", "The output type: Http, Rtmp, File"}, + {"Output", "The output type: Http, Rtmp, File, Udp"}, {"RtmpMethod", "The method used to send over rtmp: Ffmpeg, Librtmp"}, // NOTE: we add rtp here when we have this functionality - {"Packetization", "The method of data packetisation: Flv, Mpegts, None"}, + {"Packetization", "The method of data packetisation: Flv, Mpegts, Rtp, None"}, {"QuantizationMode", "Whether quantization if on or off (variable bitrate): On, Off"}, {"Verbosity", "Verbosity: Info, Warning, Error, Fatal"}, {"FramesPerClip", "Number of frames per clip sent"}, @@ -127,6 +129,7 @@ var ( {"VerticalFlip", "Flip video vertically: Yes, No"}, {"HorizontalFlip", "Flip video horizontally: Yes, No"}, {"LogPath", "Path for logging files (default is /var/log/netsender/)"}, + {"UdpAddr", "UDP destination address: : (port is generally 6970-6999)"}, } ) @@ -209,6 +212,8 @@ func handleFlags() { config.Output = revid.Rtmp case "FfmpegRtmp": config.Output = revid.FfmpegRtmp + case "Udp": + config.Output = revid.Udp case "": default: logger.Log(smartlogger.Error, pkg+"bad output argument") @@ -231,6 +236,8 @@ func handleFlags() { config.Packetization = revid.Mpegts case "Flv": config.Packetization = revid.Flv + case "Rtp": + config.Packetization = revid.Rtp case "": default: logger.Log(smartlogger.Error, pkg+"bad packetization argument") @@ -293,6 +300,7 @@ func handleFlags() { config.Quantization = *configFlags[quantizationPtr] config.Timeout = *configFlags[timeoutPtr] config.IntraRefreshPeriod = *configFlags[intraRefreshPeriodPtr] + config.UdpAddress = *configFlags[udpAddrPtr] } // initialize then run the main NetSender client From c0e6ba2a5b8523a875bf0c9604e8cf4dae78bd9b Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 18 Nov 2018 13:56:29 +1030 Subject: [PATCH 10/37] rtp: checking for udp output and rtp packetisation in revid such that the right sender and encoder is selected --- cmd/revid-cli/main.go | 1 - revid/config.go | 1 + revid/revid.go | 11 +++++++++++ revid/senders.go | 2 ++ 4 files changed, 14 insertions(+), 1 deletion(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 0ade1a26..2423898a 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -84,7 +84,6 @@ const ( revidStopTime = 5 * time.Second defaultLogPath = "/var/log/netsender/" pkg = "revid-cli:" - defaultUdpAddr = "localhost:6970" ) // canProfile is set to false with revid-cli is built with "-tags profile". diff --git a/revid/config.go b/revid/config.go index 7550464d..d0cfa9a4 100644 --- a/revid/config.go +++ b/revid/config.go @@ -105,6 +105,7 @@ const ( httpFramesPerClip = 7 defaultInputCodec = H264 defaultVerbosity = No + defaultUdpAddr = "localhost:6970" ) // Validate checks for any errors in the config fields and defaults settings diff --git a/revid/revid.go b/revid/revid.go index 1e532662..8f18ec32 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -43,6 +43,7 @@ import ( "bitbucket.org/ausocean/av/stream/flv" "bitbucket.org/ausocean/av/stream/lex" "bitbucket.org/ausocean/av/stream/mts" + "bitbucket.org/ausocean/av/stream/rtp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/utils/smartlogger" @@ -226,6 +227,12 @@ func (r *Revid) reset(config Config) error { r.destination = s case Http: r.destination = newHttpSender(r.ns, r.config.Logger.Log) + case Udp: + s, err := newUdpSender(defaultUdpAddr, r.config.Logger.Log) + if err != nil { + return err + } + r.destination = s } switch r.config.Input { @@ -271,6 +278,10 @@ func (r *Revid) reset(config Config) error { if err != nil { return err } + case Rtp: + r.config.Logger.Log(smartlogger.Info, pkg+"using RTP packetisation") + frameRate, _ := strconv.Atoi(r.config.FrameRate) + r.encoder = rtp.NewEncoder(&r.packer, frameRate) } return nil diff --git a/revid/senders.go b/revid/senders.go index e313993b..8a7466ab 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -321,4 +321,6 @@ func (s *udpSender) send() error { return err } +func (s *udpSender) release() {} + func (s *udpSender) close() error { return nil } From fce0937810642f8eab4f0f9fdd89b7565f386e2d Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 18 Nov 2018 18:13:38 +1030 Subject: [PATCH 11/37] rtp: wrote some code so that config validated udp and rtp stuff as well --- cmd/revid-cli/main.go | 4 ++-- revid/config.go | 4 ++++ revid/revid.go | 1 - revid/senders.go | 5 ++++- stream/rtp/encoder.go | 4 ++-- 5 files changed, 12 insertions(+), 6 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 2423898a..b776e6aa 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -195,8 +195,8 @@ func handleFlags() { } switch *configFlags[inputCodecPtr] { - case "H264Codec": - config.InputCodec = revid.H264Codec + case "H264": + config.InputCodec = revid.H264 case "": default: logger.Log(smartlogger.Error, pkg+"bad input codec argument") diff --git a/revid/config.go b/revid/config.go index d0cfa9a4..93d46c33 100644 --- a/revid/config.go +++ b/revid/config.go @@ -136,6 +136,7 @@ func (c *Config) Validate(r *Revid) error { switch c.Input { case Raspivid: case File: + c.Logger.Log(smartlogger.Info, pkg+"Using file input", "input") case NothingDefined: c.Logger.Log(smartlogger.Warning, pkg+"no input type defined, defaulting", "input", defaultInput) @@ -159,6 +160,7 @@ func (c *Config) Validate(r *Revid) error { return errors.New("bad bitrate and quantization combination for H264 input") } } + c.Logger.Log(smartlogger.Info, pkg+"Reading h264 codec", "inputCodec") case Mjpeg: if c.Quantization != "" { quantization, err := strconv.Atoi(c.Quantization) @@ -182,6 +184,7 @@ func (c *Config) Validate(r *Revid) error { switch c.Output { case File: + case Udp: case Rtmp, FfmpegRtmp: if c.RtmpUrl == "" { c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP") @@ -208,6 +211,7 @@ func (c *Config) Validate(r *Revid) error { case None: case Mpegts: case Flv: + case Rtp: case NothingDefined: c.Logger.Log(smartlogger.Warning, pkg+"no packetization option defined, defaulting", "packetization", defaultPacketization) diff --git a/revid/revid.go b/revid/revid.go index 8f18ec32..11b7482e 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -487,7 +487,6 @@ func (r *Revid) setupInputForFile() error { return err } defer f.Close() - // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. return r.lexTo(r.encoder, f, delay) } diff --git a/revid/senders.go b/revid/senders.go index 8a7466ab..940a9c14 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -321,6 +321,9 @@ func (s *udpSender) send() error { return err } -func (s *udpSender) release() {} +func (s *udpSender) release() { + s.chunk.Close() + s.chunk = nil +} func (s *udpSender) close() error { return nil } diff --git a/stream/rtp/encoder.go b/stream/rtp/encoder.go index 315f66e8..68384fd4 100644 --- a/stream/rtp/encoder.go +++ b/stream/rtp/encoder.go @@ -50,11 +50,11 @@ type Encoder struct { // NewEncoder returns a new Encoder type given an io.Writer - the destination // after encoding and the desired fps -func NewEncoder(dst io.Writer, fps float64) *Encoder { +func NewEncoder(dst io.Writer, fps int) *Encoder { return &Encoder{ dst: dst, ssrc: rand.Uint32(), - frameInterval: time.Duration(float64(time.Second) / fps), + frameInterval: time.Duration(float64(time.Second) / float64(fps)), } } From 42097ddef7550ed913df58d651d48c54ef05430f Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 18 Nov 2018 19:34:02 +1030 Subject: [PATCH 12/37] rtp: added print messages for debugging purposes rtp: added print messages for debugging purposes rtp: removed binaries --- revid/senders.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/revid/senders.go b/revid/senders.go index 940a9c14..fac4b70a 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -29,6 +29,7 @@ LICENSE package revid import ( + "fmt" "io" "net" "os" @@ -317,6 +318,7 @@ func (s *udpSender) load(c *ring.Chunk) error { } func (s *udpSender) send() error { + fmt.Println(len(s.chunk.Bytes())) _, err := s.chunk.WriteTo(s.conn) return err } From 16614df9f51ddc108d4d1fbef4f744858da399dc Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 21 Nov 2018 00:09:42 +1030 Subject: [PATCH 13/37] rtp: started using mpegts encoder inside rtp encoder so that fragmentation i.e. smaller rtp packets is easier. Streams fine. --- revid/revid.go | 1 - revid/senders.go | 2 - stream/mts/encoder.go | 54 ++++++++++++++------------- stream/rtp/encoder.go | 85 +++++++++++++++++++++++++++++++++---------- 4 files changed, 95 insertions(+), 47 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 11b7482e..e481bf51 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -389,7 +389,6 @@ loop: } } } - r.destination.release() r.config.Logger.Log(smartlogger.Debug, pkg+"done reading that clip from ring buffer") diff --git a/revid/senders.go b/revid/senders.go index fac4b70a..940a9c14 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -29,7 +29,6 @@ LICENSE package revid import ( - "fmt" "io" "net" "os" @@ -318,7 +317,6 @@ func (s *udpSender) load(c *ring.Chunk) error { } func (s *udpSender) send() error { - fmt.Println(len(s.chunk.Bytes())) _, err := s.chunk.WriteTo(s.conn) return err } diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 3100c96e..8d557123 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -163,6 +163,8 @@ type Encoder struct { frameInterval time.Duration ptsOffset time.Duration + psiCount uint + continuity map[int]byte } @@ -195,32 +197,35 @@ const ( // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel func (e *Encoder) Encode(nalu []byte) error { - // Write PAT - patPkt := Packet{ - PUSI: true, - PID: patPid, - CC: e.ccFor(patPid), - AFC: hasPayload, - Payload: patTable, - } - _, err := e.dst.Write(patPkt.Bytes()) - if err != nil { - return err - } + if e.psiCount == 0 { + // Write PAT + patPkt := Packet{ + PUSI: true, + PID: patPid, + CC: e.ccFor(patPid), + AFC: hasPayload, + Payload: patTable, + } + _, err := e.dst.Write(patPkt.Bytes()) + if err != nil { + return err + } - // Write PMT. - pmtPkt := Packet{ - PUSI: true, - PID: pmtPid, - CC: e.ccFor(pmtPid), - AFC: hasPayload, - Payload: pmtTable, + // Write PMT. + pmtPkt := Packet{ + PUSI: true, + PID: pmtPid, + CC: e.ccFor(pmtPid), + AFC: hasPayload, + Payload: pmtTable, + } + _, err = e.dst.Write(pmtPkt.Bytes()) + if err != nil { + return err + } + e.psiCount = 100 } - _, err = e.dst.Write(pmtPkt.Bytes()) - if err != nil { - return err - } - + e.psiCount -= 1 // Prepare PES data. pesPkt := pes.Packet{ StreamID: streamID, @@ -250,7 +255,6 @@ func (e *Encoder) Encode(nalu []byte) error { pkt.PCR = e.pcr() pusi = false } - _, err := e.dst.Write(pkt.Bytes()) if err != nil { return err diff --git a/stream/rtp/encoder.go b/stream/rtp/encoder.go index 68384fd4..e082edeb 100644 --- a/stream/rtp/encoder.go +++ b/stream/rtp/encoder.go @@ -28,6 +28,7 @@ LICENSE package rtp import ( + "bitbucket.org/ausocean/av/stream/mts" "io" "math/rand" "time" @@ -36,51 +37,97 @@ import ( const ( yes = 1 no = 0 - defaultPktType = 1 + defaultPktType = 33 timestampFreq = 90000 // Hz + mtsSize = 188 + bufferSize = 1000 ) +type Queue struct { + buf [bufferSize][mtsSize]byte + right uint + left uint + len uint +} + +func (q *Queue) Write(frame []byte) (int, error) { + copy(q.buf[q.right][:], frame) + q.right += 1 + q.len += 1 + return 188, nil +} + +func (q *Queue) Read() []byte { + q.left += 1 + q.len -= 1 + return q.buf[q.left-1][:] +} + +func (q *Queue) Len() uint { + return q.len +} + +func (q *Queue) Reset() { + q.left = 0 + q.right = 0 + q.len = 0 +} + type Encoder struct { dst io.Writer ssrc uint32 seqNo uint16 clock time.Duration frameInterval time.Duration + fps int + mtsEncoder *mts.Encoder + queue *Queue } // NewEncoder returns a new Encoder type given an io.Writer - the destination // after encoding and the desired fps func NewEncoder(dst io.Writer, fps int) *Encoder { + q := &Queue{} return &Encoder{ dst: dst, ssrc: rand.Uint32(), frameInterval: time.Duration(float64(time.Second) / float64(fps)), + fps: fps, + mtsEncoder: mts.NewEncoder(q, float64(fps)), + queue: q, } } // Encode takes a nalu unit and encodes it into an rtp packet and // writes to the io.Writer given in NewEncoder func (e *Encoder) Encode(nalu []byte) error { - pkt := Pkt{ - V: rtpVer, // version - P: no, // padding - X: no, // header extension - CC: no, // CSRC count - M: no, // NOTE: need to check if this works (decoders should ignore this) - PT: defaultPktType, // NOTE: 1-23 according to rtp-h264 specs (don't think we need this) - SN: e.nxtSeqNo(), // sequence number - TS: e.nxtTimestamp(), // timestamp - SSRC: e.ssrc, // source identifier - Payload: nalu, - Padding: no, - } + e.mtsEncoder.Encode(nalu) + for e.queue.Len() > 0 { + var payload []byte + for i := 0; i < 7 && e.queue.Len() > 0; i++ { + payload = append(payload, e.queue.Read()...) + } + pkt := Pkt{ + V: rtpVer, // version + P: no, // padding + X: no, // header extension + CC: no, // CSRC count + M: no, // NOTE: need to check if this works (decoders should ignore this) + PT: defaultPktType, // 33 for mpegts + SN: e.nxtSeqNo(), // sequence number + TS: e.nxtTimestamp(), // timestamp + SSRC: e.ssrc, // source identifier + Payload: payload, + Padding: no, + } + _, err := e.dst.Write(pkt.Bytes()) + if err != nil { + return err + } - _, err := e.dst.Write(pkt.Bytes()) - if err != nil { - return err + e.tick() } - - e.tick() + e.queue.Reset() return nil } From 7a1c628731bfda55c8bd6edb13230d218580450d Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 21 Nov 2018 00:51:13 +1030 Subject: [PATCH 14/37] rtp: cleaned up queue structure in rtp/encoder.go - made safer, error checking etc --- stream/rtp/encoder.go | 48 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 10 deletions(-) diff --git a/stream/rtp/encoder.go b/stream/rtp/encoder.go index e082edeb..becb9c6e 100644 --- a/stream/rtp/encoder.go +++ b/stream/rtp/encoder.go @@ -29,6 +29,7 @@ package rtp import ( "bitbucket.org/ausocean/av/stream/mts" + "errors" "io" "math/rand" "time" @@ -41,26 +42,52 @@ const ( timestampFreq = 90000 // Hz mtsSize = 188 bufferSize = 1000 + maxMtsPerRtp = 7 ) type Queue struct { - buf [bufferSize][mtsSize]byte + buf [][]byte right uint left uint len uint } -func (q *Queue) Write(frame []byte) (int, error) { - copy(q.buf[q.right][:], frame) - q.right += 1 - q.len += 1 - return 188, nil +func NewQueue(maxElements, elementSize uint) (*Queue, error) { + if maxElements <= 0 { + return nil, errors.New("Max elements of queue must be more than 0") + } + if elementSize <= 0 { + return nil, errors.New("Element size in queue must be more than 0") + } + buf := make([][]byte, maxElements) + for i := range buf { + buf[i] = make([]byte, elementSize) + } + return &Queue{ + buf: buf, + right: 0, + left: 0, + len: 0, + }, nil } -func (q *Queue) Read() []byte { +func (q *Queue) Write(frame []byte) (int, error) { + if q.right > uint(len(q.buf)) { + return 0, errors.New("Queue is full, cannot perform write.") + } + copy(q.buf[q.right], frame) + q.right += 1 + q.len += 1 + return len(frame), nil +} + +func (q *Queue) Read() ([]byte, error) { + if q.left >= q.right { + return nil, errors.New("Nothing to read from queue.") + } q.left += 1 q.len -= 1 - return q.buf[q.left-1][:] + return q.buf[q.left-1], nil } func (q *Queue) Len() uint { @@ -104,8 +131,9 @@ func (e *Encoder) Encode(nalu []byte) error { e.mtsEncoder.Encode(nalu) for e.queue.Len() > 0 { var payload []byte - for i := 0; i < 7 && e.queue.Len() > 0; i++ { - payload = append(payload, e.queue.Read()...) + for i := 0; i < maxMtsPerRtp && e.queue.Len() > 0; i++ { + data, _ := e.queue.Read() + payload = append(payload, data...) } pkt := Pkt{ V: rtpVer, // version From 2ff7e6318d3b853c193ca30bb27caf8114651f9e Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 21 Nov 2018 00:51:57 +1030 Subject: [PATCH 15/37] mts: made psi counter max packet interval const --- stream/mts/encoder.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 8d557123..9dc48dec 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -38,7 +38,10 @@ import ( "bitbucket.org/ausocean/av/stream/mts/pes" ) -const psiPacketSize = 184 +const ( + psiPacketSize = 184 + psiSendCount = 100 +) // TODO: really need to finish the at and pmt stuff - this is too hacky var ( @@ -197,7 +200,7 @@ const ( // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel func (e *Encoder) Encode(nalu []byte) error { - if e.psiCount == 0 { + if e.psiCount <= 0 { // Write PAT patPkt := Packet{ PUSI: true, @@ -223,7 +226,7 @@ func (e *Encoder) Encode(nalu []byte) error { if err != nil { return err } - e.psiCount = 100 + e.psiCount = psiSendCount } e.psiCount -= 1 // Prepare PES data. From 8be3c4de23c74e6d0a5c26269c7ec6fd7d288b7d Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 21 Nov 2018 01:07:38 +1030 Subject: [PATCH 16/37] revid-cli: allow setting of udp address through revid-cli options --- revid/config.go | 4 ++++ revid/revid.go | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/revid/config.go b/revid/config.go index 93d46c33..52a3eaaa 100644 --- a/revid/config.go +++ b/revid/config.go @@ -316,5 +316,9 @@ func (c *Config) Validate(r *Revid) error { return errors.New("quantisation not unsigned integer or is over threshold") } } + + if c.UdpAddress == "" { + c.UdpAddress = defaultUdpAddr + } return nil } diff --git a/revid/revid.go b/revid/revid.go index e481bf51..3aa66919 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -228,7 +228,7 @@ func (r *Revid) reset(config Config) error { case Http: r.destination = newHttpSender(r.ns, r.config.Logger.Log) case Udp: - s, err := newUdpSender(defaultUdpAddr, r.config.Logger.Log) + s, err := newUdpSender(r.config.UdpAddress, r.config.Logger.Log) if err != nil { return err } From 842fea8bcee7933c315dfee950d80460b1b0d2be Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 21 Nov 2018 01:17:48 +1030 Subject: [PATCH 17/37] rtp: fixed bug in Queue creation --- stream/rtp/encoder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stream/rtp/encoder.go b/stream/rtp/encoder.go index becb9c6e..527d8c6f 100644 --- a/stream/rtp/encoder.go +++ b/stream/rtp/encoder.go @@ -114,7 +114,7 @@ type Encoder struct { // NewEncoder returns a new Encoder type given an io.Writer - the destination // after encoding and the desired fps func NewEncoder(dst io.Writer, fps int) *Encoder { - q := &Queue{} + q, _ := NewQueue(bufferSize, mtsSize) return &Encoder{ dst: dst, ssrc: rand.Uint32(), From fc9b0e6ed72869d42745eb2b0b39de44704c5e17 Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 21 Nov 2018 12:34:00 +1030 Subject: [PATCH 18/37] mts: wrote func to wrap PSI packet creation --- stream/mts/encoder.go | 54 +++++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 9dc48dec..2a970af9 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -201,32 +201,10 @@ const ( // sending them to the output channel func (e *Encoder) Encode(nalu []byte) error { if e.psiCount <= 0 { - // Write PAT - patPkt := Packet{ - PUSI: true, - PID: patPid, - CC: e.ccFor(patPid), - AFC: hasPayload, - Payload: patTable, - } - _, err := e.dst.Write(patPkt.Bytes()) + err := e.writePSI() if err != nil { return err } - - // Write PMT. - pmtPkt := Packet{ - PUSI: true, - PID: pmtPid, - CC: e.ccFor(pmtPid), - AFC: hasPayload, - Payload: pmtTable, - } - _, err = e.dst.Write(pmtPkt.Bytes()) - if err != nil { - return err - } - e.psiCount = psiSendCount } e.psiCount -= 1 // Prepare PES data. @@ -269,6 +247,36 @@ func (e *Encoder) Encode(nalu []byte) error { return nil } +func (e *Encoder) writePSI() error { + // Write PAT + patPkt := Packet{ + PUSI: true, + PID: patPid, + CC: e.ccFor(patPid), + AFC: hasPayload, + Payload: patTable, + } + _, err := e.dst.Write(patPkt.Bytes()) + if err != nil { + return err + } + + // Write PMT. + pmtPkt := Packet{ + PUSI: true, + PID: pmtPid, + CC: e.ccFor(pmtPid), + AFC: hasPayload, + Payload: pmtTable, + } + _, err = e.dst.Write(pmtPkt.Bytes()) + if err != nil { + return err + } + e.psiCount = psiSendCount + return nil +} + // tick advances the clock one frame interval. func (e *Encoder) tick() { e.clock += e.frameInterval From 589ec1525911cb921cf99db660ed72a772792537 Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 21 Nov 2018 12:46:01 +1030 Subject: [PATCH 19/37] revid-cli: made -Output=Rtp to mean rtp over udp --- cmd/revid-cli/main.go | 8 +++----- revid/config.go | 10 +++++----- revid/revid.go | 4 ++-- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index b776e6aa..d31ebd0c 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -72,7 +72,7 @@ const ( verticalFlipPtr horizontalFlipPtr logPathPtr - udpAddrPtr + rtpAddrPtr noOfConfigFlags ) @@ -128,7 +128,7 @@ var ( {"VerticalFlip", "Flip video vertically: Yes, No"}, {"HorizontalFlip", "Flip video horizontally: Yes, No"}, {"LogPath", "Path for logging files (default is /var/log/netsender/)"}, - {"UdpAddr", "UDP destination address: : (port is generally 6970-6999)"}, + {"RtpAddr", "Rtp destination address: : (port is generally 6970-6999)"}, } ) @@ -235,8 +235,6 @@ func handleFlags() { config.Packetization = revid.Mpegts case "Flv": config.Packetization = revid.Flv - case "Rtp": - config.Packetization = revid.Rtp case "": default: logger.Log(smartlogger.Error, pkg+"bad packetization argument") @@ -299,7 +297,7 @@ func handleFlags() { config.Quantization = *configFlags[quantizationPtr] config.Timeout = *configFlags[timeoutPtr] config.IntraRefreshPeriod = *configFlags[intraRefreshPeriodPtr] - config.UdpAddress = *configFlags[udpAddrPtr] + config.RtpAddress = *configFlags[RtpAddrPtr] } // initialize then run the main NetSender client diff --git a/revid/config.go b/revid/config.go index 52a3eaaa..2c960f33 100644 --- a/revid/config.go +++ b/revid/config.go @@ -58,7 +58,7 @@ type Config struct { Quantization string Timeout string IntraRefreshPeriod string - UdpAddress string + RtpAddress string Logger Logger } @@ -105,7 +105,7 @@ const ( httpFramesPerClip = 7 defaultInputCodec = H264 defaultVerbosity = No - defaultUdpAddr = "localhost:6970" + defaultRtpAddr = "localhost:6970" ) // Validate checks for any errors in the config fields and defaults settings @@ -184,7 +184,7 @@ func (c *Config) Validate(r *Revid) error { switch c.Output { case File: - case Udp: + case Rtp: case Rtmp, FfmpegRtmp: if c.RtmpUrl == "" { c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP") @@ -317,8 +317,8 @@ func (c *Config) Validate(r *Revid) error { } } - if c.UdpAddress == "" { - c.UdpAddress = defaultUdpAddr + if c.RtpAddress == "" { + c.RtpAddress = defaultRtpAddr } return nil } diff --git a/revid/revid.go b/revid/revid.go index 3aa66919..58022230 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -227,8 +227,8 @@ func (r *Revid) reset(config Config) error { r.destination = s case Http: r.destination = newHttpSender(r.ns, r.config.Logger.Log) - case Udp: - s, err := newUdpSender(r.config.UdpAddress, r.config.Logger.Log) + case Rtp: + s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) if err != nil { return err } From f53c8663115c923fc9f800e4a664dd45eb13eb07 Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 21 Nov 2018 12:52:28 +1030 Subject: [PATCH 20/37] revid-cli: using MpegtsRtp to representation packetization under mpegtsrtp packetization --- revid/config.go | 1 + revid/revid.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/revid/config.go b/revid/config.go index 2c960f33..241df8b4 100644 --- a/revid/config.go +++ b/revid/config.go @@ -83,6 +83,7 @@ const ( Rtmp FfmpegRtmp Udp + MpegtsRtp Rtp ) diff --git a/revid/revid.go b/revid/revid.go index 58022230..15088494 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -278,7 +278,7 @@ func (r *Revid) reset(config Config) error { if err != nil { return err } - case Rtp: + case MpegtsRtp: r.config.Logger.Log(smartlogger.Info, pkg+"using RTP packetisation") frameRate, _ := strconv.Atoi(r.config.FrameRate) r.encoder = rtp.NewEncoder(&r.packer, frameRate) From b09a422baa5877ba5e903d70fea1b39071241295 Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 21 Nov 2018 12:56:03 +1030 Subject: [PATCH 21/37] rtp: use go imports --- stream/rtp/encoder.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/stream/rtp/encoder.go b/stream/rtp/encoder.go index 527d8c6f..d2983437 100644 --- a/stream/rtp/encoder.go +++ b/stream/rtp/encoder.go @@ -28,11 +28,12 @@ LICENSE package rtp import ( - "bitbucket.org/ausocean/av/stream/mts" "errors" "io" "math/rand" "time" + + "bitbucket.org/ausocean/av/stream/mts" ) const ( From f4d44e0c79034aa74c8ec17bc29f1d437a4124a1 Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 21 Nov 2018 12:57:58 +1030 Subject: [PATCH 22/37] rtp: using = instead of |= for setting of rtp packet fields --- stream/rtp/rtp.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/stream/rtp/rtp.go b/stream/rtp/rtp.go index e4f36218..b9234b0f 100644 --- a/stream/rtp/rtp.go +++ b/stream/rtp/rtp.go @@ -74,18 +74,18 @@ func (p *Pkt) Bytes() []byte { const headSize = 3 * 4 // bytes buf := make([]byte, headSize, headSize+len(p.Payload)+p.Padding) - buf[0] |= p.V<<6 | p.P<<5 | p.CC - buf[1] |= p.M<<7 | p.PT - buf[2] |= byte(p.SN >> 8) - buf[3] |= byte(p.SN) - buf[4] |= byte(p.TS >> 24) - buf[5] |= byte(p.TS >> 16) - buf[6] |= byte(p.TS >> 8) - buf[7] |= byte(p.TS) - buf[8] |= byte(p.SSRC >> 24) - buf[9] |= byte(p.SSRC >> 16) - buf[10] |= byte(p.SSRC >> 8) - buf[11] |= byte(p.SSRC) + buf[0] = p.V<<6 | p.P<<5 | p.CC + buf[1] = p.M<<7 | p.PT + buf[2] = byte(p.SN >> 8) + buf[3] = byte(p.SN) + buf[4] = byte(p.TS >> 24) + buf[5] = byte(p.TS >> 16) + buf[6] = byte(p.TS >> 8) + buf[7] = byte(p.TS) + buf[8] = byte(p.SSRC >> 24) + buf[9] = byte(p.SSRC >> 16) + buf[10] = byte(p.SSRC >> 8) + buf[11] = byte(p.SSRC) buf = append(buf, p.Payload...) if p.Padding != 0 { From 6f1515cc467e99feb52e069315f1a86d97a49898 Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 21 Nov 2018 13:00:48 +1030 Subject: [PATCH 23/37] rtp: simplified addition of padding to rtp packet --- stream/rtp/rtp.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/stream/rtp/rtp.go b/stream/rtp/rtp.go index b9234b0f..fa62d9fb 100644 --- a/stream/rtp/rtp.go +++ b/stream/rtp/rtp.go @@ -45,7 +45,7 @@ type Pkt struct { TS uint32 // Timestamp SSRC uint32 // Synchronisation source identifier Payload []byte // H264 Payload data - Padding int // No of bytes of padding + Padding byte // No of bytes of padding } func (p *Pkt) Bytes() []byte { @@ -72,7 +72,7 @@ func (p *Pkt) Bytes() []byte { } const headSize = 3 * 4 // bytes - buf := make([]byte, headSize, headSize+len(p.Payload)+p.Padding) + buf := make([]byte, headSize, headSize+len(p.Payload)+int(p.Padding)) buf[0] = p.V<<6 | p.P<<5 | p.CC buf[1] = p.M<<7 | p.PT @@ -89,7 +89,8 @@ func (p *Pkt) Bytes() []byte { buf = append(buf, p.Payload...) if p.Padding != 0 { - buf = append(buf, append(make([]byte, p.Padding-1, p.Padding), byte(p.Padding))...) + buf = buf[:cap(buf)] + buf[len(buf)-1] = byte(p.Padding) } return buf } From 338bc53e24528c291d1fe9104f6b60e24b176ce3 Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 21 Nov 2018 13:07:01 +1030 Subject: [PATCH 24/37] rtp: added standards information regarding padding --- stream/rtp/rtp.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/stream/rtp/rtp.go b/stream/rtp/rtp.go index fa62d9fb..3d474647 100644 --- a/stream/rtp/rtp.go +++ b/stream/rtp/rtp.go @@ -26,8 +26,10 @@ LICENSE along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). */ -// See https://tools.ietf.org/html/rfc6184 for rtp packet format - +/* +See https://tools.ietf.org/html/rfc6184 and https://tools.ietf.org/html/rfc3550 +for rtp-h264 and rtp standards. +*/ package rtp const ( @@ -88,6 +90,9 @@ func (p *Pkt) Bytes() []byte { buf[11] = byte(p.SSRC) buf = append(buf, p.Payload...) + // see https://tools.ietf.org/html/rfc3550 section 5.1 (padding). At end of + // rtp packet, padding may exist, with the last octet being the length of the + // padding including itself. if p.Padding != 0 { buf = buf[:cap(buf)] buf[len(buf)-1] = byte(p.Padding) From 4c01514b41a8d342b8d1aed51b6cf5ddff16c113 Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 21 Nov 2018 13:44:18 +1030 Subject: [PATCH 25/37] revid: commiting unstaged stuff --- cmd/revid-cli/main.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index d31ebd0c..e069956c 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -235,6 +235,8 @@ func handleFlags() { config.Packetization = revid.Mpegts case "Flv": config.Packetization = revid.Flv + case "MpegtsRtp": + config.Packetization = revid.MpegtsRtp case "": default: logger.Log(smartlogger.Error, pkg+"bad packetization argument") @@ -297,7 +299,7 @@ func handleFlags() { config.Quantization = *configFlags[quantizationPtr] config.Timeout = *configFlags[timeoutPtr] config.IntraRefreshPeriod = *configFlags[intraRefreshPeriodPtr] - config.RtpAddress = *configFlags[RtpAddrPtr] + config.RtpAddress = *configFlags[rtpAddrPtr] } // initialize then run the main NetSender client From 315ba6bd767587f5f77ea66389f97c35f14d1570 Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 21 Nov 2018 18:14:36 +1030 Subject: [PATCH 26/37] revid: back to working state --- cmd/revid-cli/main.go | 9 +++++++-- revid/config.go | 13 ------------- revid/revid.go | 2 ++ 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index e069956c..47dd0226 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -30,6 +30,7 @@ package main import ( "flag" + "fmt" "os" "runtime/pprof" "strconv" @@ -107,10 +108,10 @@ var ( flagNames = [noOfConfigFlags]struct{ name, description string }{ {"Input", "The input type: Raspivid, File"}, {"InputCodec", "The codec of the input: H264, Mjpeg"}, - {"Output", "The output type: Http, Rtmp, File, Udp"}, + {"Output", "The output type: Http, Rtmp, File, Udp, Rtp"}, {"RtmpMethod", "The method used to send over rtmp: Ffmpeg, Librtmp"}, // NOTE: we add rtp here when we have this functionality - {"Packetization", "The method of data packetisation: Flv, Mpegts, Rtp, None"}, + {"Packetization", "The method of data packetisation: Flv, Mpegts, None"}, {"QuantizationMode", "Whether quantization if on or off (variable bitrate): On, Off"}, {"Verbosity", "Verbosity: Info, Warning, Error, Fatal"}, {"FramesPerClip", "Number of frames per clip sent"}, @@ -213,6 +214,10 @@ func handleFlags() { config.Output = revid.FfmpegRtmp case "Udp": config.Output = revid.Udp + case "Rtp": + fmt.Println("rtp output") + config.Output = revid.Rtp + config.Packetization = revid.MpegtsRtp case "": default: logger.Log(smartlogger.Error, pkg+"bad output argument") diff --git a/revid/config.go b/revid/config.go index 241df8b4..eccbf29a 100644 --- a/revid/config.go +++ b/revid/config.go @@ -208,19 +208,6 @@ func (c *Config) Validate(r *Revid) error { return errors.New("bad output type defined in config") } - switch c.Packetization { - case None: - case Mpegts: - case Flv: - case Rtp: - case NothingDefined: - c.Logger.Log(smartlogger.Warning, pkg+"no packetization option defined, defaulting", - "packetization", defaultPacketization) - c.Packetization = defaultPacketization - default: - return errors.New("bad packetization option defined in config") - } - switch c.HorizontalFlip { case Yes: case No: diff --git a/revid/revid.go b/revid/revid.go index 15088494..4b1ed4f8 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -228,6 +228,7 @@ func (r *Revid) reset(config Config) error { case Http: r.destination = newHttpSender(r.ns, r.config.Logger.Log) case Rtp: + fmt.Println("here") s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) if err != nil { return err @@ -279,6 +280,7 @@ func (r *Revid) reset(config Config) error { return err } case MpegtsRtp: + fmt.Println("here") r.config.Logger.Log(smartlogger.Info, pkg+"using RTP packetisation") frameRate, _ := strconv.Atoi(r.config.FrameRate) r.encoder = rtp.NewEncoder(&r.packer, frameRate) From cedc63a0b59e4b46a38fe877dfa034352629efbd Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 21 Nov 2018 18:16:10 +1030 Subject: [PATCH 27/37] revid: got rid of debug prints --- cmd/revid-cli/main.go | 2 -- revid/revid.go | 2 -- 2 files changed, 4 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 47dd0226..58c094f6 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -30,7 +30,6 @@ package main import ( "flag" - "fmt" "os" "runtime/pprof" "strconv" @@ -215,7 +214,6 @@ func handleFlags() { case "Udp": config.Output = revid.Udp case "Rtp": - fmt.Println("rtp output") config.Output = revid.Rtp config.Packetization = revid.MpegtsRtp case "": diff --git a/revid/revid.go b/revid/revid.go index 4b1ed4f8..15088494 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -228,7 +228,6 @@ func (r *Revid) reset(config Config) error { case Http: r.destination = newHttpSender(r.ns, r.config.Logger.Log) case Rtp: - fmt.Println("here") s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) if err != nil { return err @@ -280,7 +279,6 @@ func (r *Revid) reset(config Config) error { return err } case MpegtsRtp: - fmt.Println("here") r.config.Logger.Log(smartlogger.Info, pkg+"using RTP packetisation") frameRate, _ := strconv.Atoi(r.config.FrameRate) r.encoder = rtp.NewEncoder(&r.packer, frameRate) From 1cebc821d1bb720a45b0a14c74c0137f74dda2b4 Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 21 Nov 2018 20:23:02 +1030 Subject: [PATCH 28/37] revid and rtp: simplified relationship between mts encoder and rtp encoder and revmoed queue --- revid/revid.go | 10 ++-- stream/rtp/encoder.go | 123 +++++++++++------------------------------- 2 files changed, 37 insertions(+), 96 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 15088494..8e76c009 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -111,9 +111,10 @@ type Revid struct { cmd *exec.Cmd // lexTo, encoder and packer handle transcoding the input stream. - lexTo func(dst stream.Encoder, src io.Reader, delay time.Duration) error - encoder stream.Encoder - packer packer + lexTo func(dst stream.Encoder, src io.Reader, delay time.Duration) error + encoder stream.Encoder + encoder2 *rtp.Encoder + packer packer // buffer handles passing frames from the transcoder // to the target destination. buffer *ring.Buffer @@ -281,7 +282,8 @@ func (r *Revid) reset(config Config) error { case MpegtsRtp: r.config.Logger.Log(smartlogger.Info, pkg+"using RTP packetisation") frameRate, _ := strconv.Atoi(r.config.FrameRate) - r.encoder = rtp.NewEncoder(&r.packer, frameRate) + r.encoder2 = rtp.NewEncoder(&r.packer, frameRate) + r.encoder = mts.NewEncoder(r.encoder2, float64(frameRate)) } return nil diff --git a/stream/rtp/encoder.go b/stream/rtp/encoder.go index d2983437..134259ae 100644 --- a/stream/rtp/encoder.go +++ b/stream/rtp/encoder.go @@ -28,12 +28,9 @@ LICENSE package rtp import ( - "errors" "io" "math/rand" "time" - - "bitbucket.org/ausocean/av/stream/mts" ) const ( @@ -44,63 +41,10 @@ const ( mtsSize = 188 bufferSize = 1000 maxMtsPerRtp = 7 + bufferCap = 7 * 188 + sendLength = 7 * 188 ) -type Queue struct { - buf [][]byte - right uint - left uint - len uint -} - -func NewQueue(maxElements, elementSize uint) (*Queue, error) { - if maxElements <= 0 { - return nil, errors.New("Max elements of queue must be more than 0") - } - if elementSize <= 0 { - return nil, errors.New("Element size in queue must be more than 0") - } - buf := make([][]byte, maxElements) - for i := range buf { - buf[i] = make([]byte, elementSize) - } - return &Queue{ - buf: buf, - right: 0, - left: 0, - len: 0, - }, nil -} - -func (q *Queue) Write(frame []byte) (int, error) { - if q.right > uint(len(q.buf)) { - return 0, errors.New("Queue is full, cannot perform write.") - } - copy(q.buf[q.right], frame) - q.right += 1 - q.len += 1 - return len(frame), nil -} - -func (q *Queue) Read() ([]byte, error) { - if q.left >= q.right { - return nil, errors.New("Nothing to read from queue.") - } - q.left += 1 - q.len -= 1 - return q.buf[q.left-1], nil -} - -func (q *Queue) Len() uint { - return q.len -} - -func (q *Queue) Reset() { - q.left = 0 - q.right = 0 - q.len = 0 -} - type Encoder struct { dst io.Writer ssrc uint32 @@ -108,56 +52,51 @@ type Encoder struct { clock time.Duration frameInterval time.Duration fps int - mtsEncoder *mts.Encoder - queue *Queue + buffer []byte } // NewEncoder returns a new Encoder type given an io.Writer - the destination // after encoding and the desired fps func NewEncoder(dst io.Writer, fps int) *Encoder { - q, _ := NewQueue(bufferSize, mtsSize) return &Encoder{ dst: dst, ssrc: rand.Uint32(), frameInterval: time.Duration(float64(time.Second) / float64(fps)), fps: fps, - mtsEncoder: mts.NewEncoder(q, float64(fps)), - queue: q, + buffer: make([]byte, 0, bufferCap), } } +func (e *Encoder) Write(data []byte) (int, error) { + e.buffer = append(e.buffer, data...) + for len(e.buffer) >= sendLength { + e.Encode(e.buffer) + e.buffer = e.buffer[:0] + } + return len(data), nil +} + // Encode takes a nalu unit and encodes it into an rtp packet and // writes to the io.Writer given in NewEncoder -func (e *Encoder) Encode(nalu []byte) error { - e.mtsEncoder.Encode(nalu) - for e.queue.Len() > 0 { - var payload []byte - for i := 0; i < maxMtsPerRtp && e.queue.Len() > 0; i++ { - data, _ := e.queue.Read() - payload = append(payload, data...) - } - pkt := Pkt{ - V: rtpVer, // version - P: no, // padding - X: no, // header extension - CC: no, // CSRC count - M: no, // NOTE: need to check if this works (decoders should ignore this) - PT: defaultPktType, // 33 for mpegts - SN: e.nxtSeqNo(), // sequence number - TS: e.nxtTimestamp(), // timestamp - SSRC: e.ssrc, // source identifier - Payload: payload, - Padding: no, - } - _, err := e.dst.Write(pkt.Bytes()) - if err != nil { - return err - } - - e.tick() +func (e *Encoder) Encode(payload []byte) error { + pkt := Pkt{ + V: rtpVer, // version + P: no, // padding + X: no, // header extension + CC: no, // CSRC count + M: no, // NOTE: need to check if this works (decoders should ignore this) + PT: defaultPktType, // 33 for mpegts + SN: e.nxtSeqNo(), // sequence number + TS: e.nxtTimestamp(), // timestamp + SSRC: e.ssrc, // source identifier + Payload: payload, + Padding: no, } - e.queue.Reset() - + _, err := e.dst.Write(pkt.Bytes()) + if err != nil { + return err + } + e.tick() return nil } From efe2333683d6b2decc5d49035d147438aaf901da Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 21 Nov 2018 20:28:40 +1030 Subject: [PATCH 29/37] rtp: added some commenting --- stream/rtp/encoder.go | 8 +++++--- stream/rtp/rtp.go | 2 ++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/stream/rtp/encoder.go b/stream/rtp/encoder.go index 134259ae..6ce2d1b4 100644 --- a/stream/rtp/encoder.go +++ b/stream/rtp/encoder.go @@ -40,11 +40,11 @@ const ( timestampFreq = 90000 // Hz mtsSize = 188 bufferSize = 1000 - maxMtsPerRtp = 7 - bufferCap = 7 * 188 sendLength = 7 * 188 ) +// Encoder implements io writer and provides functionality to wrap data into +// rtp packets type Encoder struct { dst io.Writer ssrc uint32 @@ -63,10 +63,12 @@ func NewEncoder(dst io.Writer, fps int) *Encoder { ssrc: rand.Uint32(), frameInterval: time.Duration(float64(time.Second) / float64(fps)), fps: fps, - buffer: make([]byte, 0, bufferCap), + buffer: make([]byte, 0, sendLength), } } +// Write provides an interface between a prior encoder and this rtp encoder, +// so that multiple layers of packetization can occur. func (e *Encoder) Write(data []byte) (int, error) { e.buffer = append(e.buffer, data...) for len(e.buffer) >= sendLength { diff --git a/stream/rtp/rtp.go b/stream/rtp/rtp.go index 3d474647..98bc254e 100644 --- a/stream/rtp/rtp.go +++ b/stream/rtp/rtp.go @@ -36,6 +36,7 @@ const ( rtpVer = 2 ) +// Pkt provides fields consistent with RFC3550 definition of an rtp packet type Pkt struct { V byte // Version (currently 2) P byte // Padding indicator (0 => padding, 1 => padding) @@ -50,6 +51,7 @@ type Pkt struct { Padding byte // No of bytes of padding } +// Bytes provides a byte slice of the packet func (p *Pkt) Bytes() []byte { if p.V == 0 { p.V = rtpVer From 92294aed709b422bdec79cba0bb819d6cc453fd8 Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 21 Nov 2018 20:43:17 +1030 Subject: [PATCH 30/37] rtp: assert that udp with no packetization works --- revid/config.go | 1 + revid/revid.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/revid/config.go b/revid/config.go index eccbf29a..e87eb53d 100644 --- a/revid/config.go +++ b/revid/config.go @@ -186,6 +186,7 @@ func (c *Config) Validate(r *Revid) error { switch c.Output { case File: case Rtp: + case Udp: case Rtmp, FfmpegRtmp: if c.RtmpUrl == "" { c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP") diff --git a/revid/revid.go b/revid/revid.go index 8e76c009..7cc1a0d5 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -228,7 +228,7 @@ func (r *Revid) reset(config Config) error { r.destination = s case Http: r.destination = newHttpSender(r.ns, r.config.Logger.Log) - case Rtp: + case Rtp, Udp: s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) if err != nil { return err From 4f4bebae2576161d36cf209faf157dabe843141b Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 21 Nov 2018 21:34:28 +1030 Subject: [PATCH 31/37] revid: removed revid's overwnship of rtp encoder --- revid/revid.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 7cc1a0d5..c3f6ee53 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -111,10 +111,9 @@ type Revid struct { cmd *exec.Cmd // lexTo, encoder and packer handle transcoding the input stream. - lexTo func(dst stream.Encoder, src io.Reader, delay time.Duration) error - encoder stream.Encoder - encoder2 *rtp.Encoder - packer packer + lexTo func(dst stream.Encoder, src io.Reader, delay time.Duration) error + encoder stream.Encoder + packer packer // buffer handles passing frames from the transcoder // to the target destination. buffer *ring.Buffer @@ -282,8 +281,7 @@ func (r *Revid) reset(config Config) error { case MpegtsRtp: r.config.Logger.Log(smartlogger.Info, pkg+"using RTP packetisation") frameRate, _ := strconv.Atoi(r.config.FrameRate) - r.encoder2 = rtp.NewEncoder(&r.packer, frameRate) - r.encoder = mts.NewEncoder(r.encoder2, float64(frameRate)) + r.encoder = mts.NewEncoder(rtp.NewEncoder(&r.packer, frameRate), float64(frameRate)) } return nil From a231d57f06b7d7a4dff2e0f7c519a36cfcbe3416 Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 22 Nov 2018 10:21:25 +1030 Subject: [PATCH 32/37] revid: usage of nil error in logging message --- revid/revid.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index c3f6ee53..60a1d367 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -359,10 +359,12 @@ loop: } if err != nil && chunk.Len() > 11 { - r.config.Logger.Log(smartlogger.Debug, pkg+"send failed, trying again") + r.config.Logger.Log(smartlogger.Error, pkg+"first send failed", "error", err.Error()) // Try and send again err = r.destination.send() - r.config.Logger.Log(smartlogger.Error, pkg+"destination send error", "error", err.Error()) + if err != nil { + r.config.Logger.Log(smartlogger.Error, pkg+"destination send error", "error", err.Error()) + } // if there's still an error we try and reconnect, unless we're stopping for err != nil { From e32cf9c1b70b9f2e1a5093ac1dc64b983ac10349 Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 22 Nov 2018 10:41:33 +1030 Subject: [PATCH 33/37] mts: updated todo message in mpegts.go regarding plans for psi --- stream/mts/encoder.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 2a970af9..0cab8165 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -43,7 +43,8 @@ const ( psiSendCount = 100 ) -// TODO: really need to finish the at and pmt stuff - this is too hacky +// TODO: Finish off mts/psi so that we can create pat and pmt tables instead +// of hardcoding. var ( patTable = []byte{ 0x00, // pointer From 103bd2b91c30996732cc0462e686c935785c4fe8 Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 22 Nov 2018 19:32:31 +1030 Subject: [PATCH 34/37] rtp: removed exclamation marks in panics --- stream/rtp/rtp.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/stream/rtp/rtp.go b/stream/rtp/rtp.go index 98bc254e..45898a32 100644 --- a/stream/rtp/rtp.go +++ b/stream/rtp/rtp.go @@ -58,21 +58,21 @@ func (p *Pkt) Bytes() []byte { } if p.P != 0 && p.Padding == 0 { - panic("Padding bit set to something other than 1, but there is no padding size defined!") + panic("Padding bit set to something other than 1, but there is no padding size defined.") } else if p.P == 0 && p.Padding != 0 { - panic("Padding bit is set to zero, but it's indicated that there is padding!") + panic("Padding bit is set to zero, but it's indicated that there is padding.") } if p.CC != 0 { - panic("CC has been set to something other than 0 - this is not supported yet!") + panic("CC has been set to something other than 0 - this is not supported yet.") } if p.X != 0 { - panic("rtp: X (extension header indicator) not 0, but extensiion headers not currently supported!") + panic("rtp: X (extension header indicator) not 0, but extensiion headers not currently supported.") } if p.CC != 0 { - panic("rtp: CC (CSRC count) not 0, but CSRC headers not yet supported!") + panic("rtp: CC (CSRC count) not 0, but CSRC headers not yet supported.") } const headSize = 3 * 4 // bytes From 5eb7225da90e288ccf86bef1e3e06bbe6a84cfe8 Mon Sep 17 00:00:00 2001 From: saxon Date: Sat, 24 Nov 2018 12:14:44 +1030 Subject: [PATCH 35/37] revid & rtp: fixed up some logging, line breaks, and simplified some decrements/increments --- revid/config.go | 2 -- revid/revid.go | 2 ++ revid/senders.go | 3 +-- stream/mts/encoder.go | 2 +- stream/rtp/encoder.go | 2 +- 5 files changed, 5 insertions(+), 6 deletions(-) diff --git a/revid/config.go b/revid/config.go index e87eb53d..1ff69e1d 100644 --- a/revid/config.go +++ b/revid/config.go @@ -137,7 +137,6 @@ func (c *Config) Validate(r *Revid) error { switch c.Input { case Raspivid: case File: - c.Logger.Log(smartlogger.Info, pkg+"Using file input", "input") case NothingDefined: c.Logger.Log(smartlogger.Warning, pkg+"no input type defined, defaulting", "input", defaultInput) @@ -161,7 +160,6 @@ func (c *Config) Validate(r *Revid) error { return errors.New("bad bitrate and quantization combination for H264 input") } } - c.Logger.Log(smartlogger.Info, pkg+"Reading h264 codec", "inputCodec") case Mjpeg: if c.Quantization != "" { quantization, err := strconv.Atoi(c.Quantization) diff --git a/revid/revid.go b/revid/revid.go index 60a1d367..fde3efb7 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -391,6 +391,7 @@ loop: } } } + r.destination.release() r.config.Logger.Log(smartlogger.Debug, pkg+"done reading that clip from ring buffer") @@ -488,6 +489,7 @@ func (r *Revid) setupInputForFile() error { return err } defer f.Close() + // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. return r.lexTo(r.encoder, f, delay) } diff --git a/revid/senders.go b/revid/senders.go index 940a9c14..9bf37e05 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -298,8 +298,7 @@ type udpSender struct { chunk *ring.Chunk } -func newUdpSender(addr string, log func(lvl int8, msg string, - args ...interface{})) (*udpSender, error) { +func newUdpSender(addr string, log func(lvl int8, msg string, args ...interface{})) (*udpSender, error) { conn, err := net.Dial("udp", addr) if err != nil { diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 0cab8165..8c6bce27 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -207,7 +207,7 @@ func (e *Encoder) Encode(nalu []byte) error { return err } } - e.psiCount -= 1 + e.psiCount-- // Prepare PES data. pesPkt := pes.Packet{ StreamID: streamID, diff --git a/stream/rtp/encoder.go b/stream/rtp/encoder.go index 6ce2d1b4..eb6e404e 100644 --- a/stream/rtp/encoder.go +++ b/stream/rtp/encoder.go @@ -114,6 +114,6 @@ func (e *Encoder) nxtTimestamp() uint32 { // nxtSeqNo gets the next rtp packet sequence number func (e *Encoder) nxtSeqNo() uint16 { - e.seqNo += 1 + e.seqNo++ return e.seqNo - 1 } From 0531b9542b3650458ed83de9f6f99ddcd76a1766 Mon Sep 17 00:00:00 2001 From: saxon Date: Sat, 24 Nov 2018 12:22:17 +1030 Subject: [PATCH 36/37] rtp: client only needs to specify padding length and then padding indicator is set based on this --- stream/rtp/encoder.go | 1 - stream/rtp/rtp.go | 11 +++++------ stream/rtp/rtp_test.go | 2 +- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/stream/rtp/encoder.go b/stream/rtp/encoder.go index eb6e404e..c8750e95 100644 --- a/stream/rtp/encoder.go +++ b/stream/rtp/encoder.go @@ -83,7 +83,6 @@ func (e *Encoder) Write(data []byte) (int, error) { func (e *Encoder) Encode(payload []byte) error { pkt := Pkt{ V: rtpVer, // version - P: no, // padding X: no, // header extension CC: no, // CSRC count M: no, // NOTE: need to check if this works (decoders should ignore this) diff --git a/stream/rtp/rtp.go b/stream/rtp/rtp.go index 45898a32..4e393679 100644 --- a/stream/rtp/rtp.go +++ b/stream/rtp/rtp.go @@ -37,9 +37,10 @@ const ( ) // Pkt provides fields consistent with RFC3550 definition of an rtp packet +// The padding indicator does not need to be set manually, only the padding length type Pkt struct { V byte // Version (currently 2) - P byte // Padding indicator (0 => padding, 1 => padding) + p byte // Padding indicator (0 => padding, 1 => padding) X byte // Extension header indicator CC byte // CSRC count M byte // Marker bit @@ -57,10 +58,8 @@ func (p *Pkt) Bytes() []byte { p.V = rtpVer } - if p.P != 0 && p.Padding == 0 { - panic("Padding bit set to something other than 1, but there is no padding size defined.") - } else if p.P == 0 && p.Padding != 0 { - panic("Padding bit is set to zero, but it's indicated that there is padding.") + if p.Padding > 0 { + p.p = 1 } if p.CC != 0 { @@ -78,7 +77,7 @@ func (p *Pkt) Bytes() []byte { const headSize = 3 * 4 // bytes buf := make([]byte, headSize, headSize+len(p.Payload)+int(p.Padding)) - buf[0] = p.V<<6 | p.P<<5 | p.CC + buf[0] = p.V<<6 | p.p<<5 | p.CC buf[1] = p.M<<7 | p.PT buf[2] = byte(p.SN >> 8) buf[3] = byte(p.SN) diff --git a/stream/rtp/rtp_test.go b/stream/rtp/rtp_test.go index 3d034c32..85b33590 100644 --- a/stream/rtp/rtp_test.go +++ b/stream/rtp/rtp_test.go @@ -42,7 +42,7 @@ var rtpTests = []struct { num: 1, pkt: Pkt{ V: 2, - P: 0, + p: 0, X: 0, CC: 0, M: 0, From 4ce4525a1ca1adeed6ce07fc649de093232ac49a Mon Sep 17 00:00:00 2001 From: saxon Date: Sat, 24 Nov 2018 15:20:26 +1030 Subject: [PATCH 37/37] revid: removed blank line under newUdpSender --- revid/senders.go | 1 - 1 file changed, 1 deletion(-) diff --git a/revid/senders.go b/revid/senders.go index 9bf37e05..cc0b869f 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -299,7 +299,6 @@ type udpSender struct { } func newUdpSender(addr string, log func(lvl int8, msg string, args ...interface{})) (*udpSender, error) { - conn, err := net.Dial("udp", addr) if err != nil { return nil, err