diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 3f11f042..58c094f6 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -72,6 +72,7 @@ const ( verticalFlipPtr horizontalFlipPtr logPathPtr + rtpAddrPtr noOfConfigFlags ) @@ -106,7 +107,7 @@ var ( flagNames = [noOfConfigFlags]struct{ name, description string }{ {"Input", "The input type: Raspivid, File"}, {"InputCodec", "The codec of the input: H264, Mjpeg"}, - {"Output", "The output type: Http, Rtmp, File"}, + {"Output", "The output type: Http, Rtmp, File, Udp, Rtp"}, {"RtmpMethod", "The method used to send over rtmp: Ffmpeg, Librtmp"}, // NOTE: we add rtp here when we have this functionality {"Packetization", "The method of data packetisation: Flv, Mpegts, None"}, @@ -127,6 +128,7 @@ var ( {"VerticalFlip", "Flip video vertically: Yes, No"}, {"HorizontalFlip", "Flip video horizontally: Yes, No"}, {"LogPath", "Path for logging files (default is /var/log/netsender/)"}, + {"RtpAddr", "Rtp destination address: : (port is generally 6970-6999)"}, } ) @@ -193,8 +195,8 @@ func handleFlags() { } switch *configFlags[inputCodecPtr] { - case "H264Codec": - config.InputCodec = revid.H264Codec + case "H264": + config.InputCodec = revid.H264 case "": default: logger.Log(smartlogger.Error, pkg+"bad input codec argument") @@ -209,6 +211,11 @@ func handleFlags() { config.Output = revid.Rtmp case "FfmpegRtmp": config.Output = revid.FfmpegRtmp + case "Udp": + config.Output = revid.Udp + case "Rtp": + config.Output = revid.Rtp + config.Packetization = revid.MpegtsRtp case "": default: logger.Log(smartlogger.Error, pkg+"bad output argument") @@ -231,6 +238,8 @@ func handleFlags() { config.Packetization = revid.Mpegts case "Flv": config.Packetization = revid.Flv + case "MpegtsRtp": + config.Packetization = revid.MpegtsRtp case "": default: logger.Log(smartlogger.Error, pkg+"bad packetization argument") @@ -293,6 +302,7 @@ func handleFlags() { config.Quantization = *configFlags[quantizationPtr] config.Timeout = *configFlags[timeoutPtr] config.IntraRefreshPeriod = *configFlags[intraRefreshPeriodPtr] + config.RtpAddress = *configFlags[rtpAddrPtr] } // initialize then run the main NetSender client diff --git a/revid/config.go b/revid/config.go index ea00f392..1ff69e1d 100644 --- a/revid/config.go +++ b/revid/config.go @@ -58,6 +58,7 @@ type Config struct { Quantization string Timeout string IntraRefreshPeriod string + RtpAddress string Logger Logger } @@ -81,6 +82,9 @@ const ( No Rtmp FfmpegRtmp + Udp + MpegtsRtp + Rtp ) // Default config settings @@ -102,6 +106,7 @@ const ( httpFramesPerClip = 7 defaultInputCodec = H264 defaultVerbosity = No + defaultRtpAddr = "localhost:6970" ) // Validate checks for any errors in the config fields and defaults settings @@ -178,6 +183,8 @@ func (c *Config) Validate(r *Revid) error { switch c.Output { case File: + case Rtp: + case Udp: case Rtmp, FfmpegRtmp: if c.RtmpUrl == "" { c.Logger.Log(smartlogger.Info, pkg+"no RTMP URL: falling back to HTTP") @@ -200,18 +207,6 @@ func (c *Config) Validate(r *Revid) error { return errors.New("bad output type defined in config") } - switch c.Packetization { - case None: - case Mpegts: - case Flv: - case NothingDefined: - c.Logger.Log(smartlogger.Warning, pkg+"no packetization option defined, defaulting", - "packetization", defaultPacketization) - c.Packetization = defaultPacketization - default: - return errors.New("bad packetization option defined in config") - } - switch c.HorizontalFlip { case Yes: case No: @@ -308,5 +303,9 @@ func (c *Config) Validate(r *Revid) error { return errors.New("quantisation not unsigned integer or is over threshold") } } + + if c.RtpAddress == "" { + c.RtpAddress = defaultRtpAddr + } return nil } diff --git a/revid/revid.go b/revid/revid.go index 1e532662..fde3efb7 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -43,6 +43,7 @@ import ( "bitbucket.org/ausocean/av/stream/flv" "bitbucket.org/ausocean/av/stream/lex" "bitbucket.org/ausocean/av/stream/mts" + "bitbucket.org/ausocean/av/stream/rtp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/utils/smartlogger" @@ -226,6 +227,12 @@ func (r *Revid) reset(config Config) error { r.destination = s case Http: r.destination = newHttpSender(r.ns, r.config.Logger.Log) + case Rtp, Udp: + s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) + if err != nil { + return err + } + r.destination = s } switch r.config.Input { @@ -271,6 +278,10 @@ func (r *Revid) reset(config Config) error { if err != nil { return err } + case MpegtsRtp: + r.config.Logger.Log(smartlogger.Info, pkg+"using RTP packetisation") + frameRate, _ := strconv.Atoi(r.config.FrameRate) + r.encoder = mts.NewEncoder(rtp.NewEncoder(&r.packer, frameRate), float64(frameRate)) } return nil @@ -348,10 +359,12 @@ loop: } if err != nil && chunk.Len() > 11 { - r.config.Logger.Log(smartlogger.Debug, pkg+"send failed, trying again") + r.config.Logger.Log(smartlogger.Error, pkg+"first send failed", "error", err.Error()) // Try and send again err = r.destination.send() - r.config.Logger.Log(smartlogger.Error, pkg+"destination send error", "error", err.Error()) + if err != nil { + r.config.Logger.Log(smartlogger.Error, pkg+"destination send error", "error", err.Error()) + } // if there's still an error we try and reconnect, unless we're stopping for err != nil { diff --git a/revid/senders.go b/revid/senders.go index 38f7856a..cc0b869f 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -30,6 +30,7 @@ package revid import ( "io" + "net" "os" "os/exec" @@ -289,3 +290,38 @@ func (s *rtmpSender) restart() error { func (s *rtmpSender) close() error { return s.sess.Close() } + +// rtpSender implements loadSender for a native udp destination. +type udpSender struct { + conn net.Conn + log func(lvl int8, msg string, args ...interface{}) + chunk *ring.Chunk +} + +func newUdpSender(addr string, log func(lvl int8, msg string, args ...interface{})) (*udpSender, error) { + conn, err := net.Dial("udp", addr) + if err != nil { + return nil, err + } + return &udpSender{ + conn: conn, + log: log, + }, nil +} + +func (s *udpSender) load(c *ring.Chunk) error { + s.chunk = c + return nil +} + +func (s *udpSender) send() error { + _, err := s.chunk.WriteTo(s.conn) + return err +} + +func (s *udpSender) release() { + s.chunk.Close() + s.chunk = nil +} + +func (s *udpSender) close() error { return nil } diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 3100c96e..8c6bce27 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -38,9 +38,13 @@ import ( "bitbucket.org/ausocean/av/stream/mts/pes" ) -const psiPacketSize = 184 +const ( + psiPacketSize = 184 + psiSendCount = 100 +) -// TODO: really need to finish the at and pmt stuff - this is too hacky +// TODO: Finish off mts/psi so that we can create pat and pmt tables instead +// of hardcoding. var ( patTable = []byte{ 0x00, // pointer @@ -163,6 +167,8 @@ type Encoder struct { frameInterval time.Duration ptsOffset time.Duration + psiCount uint + continuity map[int]byte } @@ -195,32 +201,13 @@ const ( // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel func (e *Encoder) Encode(nalu []byte) error { - // Write PAT - patPkt := Packet{ - PUSI: true, - PID: patPid, - CC: e.ccFor(patPid), - AFC: hasPayload, - Payload: patTable, + if e.psiCount <= 0 { + err := e.writePSI() + if err != nil { + return err + } } - _, err := e.dst.Write(patPkt.Bytes()) - if err != nil { - return err - } - - // Write PMT. - pmtPkt := Packet{ - PUSI: true, - PID: pmtPid, - CC: e.ccFor(pmtPid), - AFC: hasPayload, - Payload: pmtTable, - } - _, err = e.dst.Write(pmtPkt.Bytes()) - if err != nil { - return err - } - + e.psiCount-- // Prepare PES data. pesPkt := pes.Packet{ StreamID: streamID, @@ -250,7 +237,6 @@ func (e *Encoder) Encode(nalu []byte) error { pkt.PCR = e.pcr() pusi = false } - _, err := e.dst.Write(pkt.Bytes()) if err != nil { return err @@ -262,6 +248,36 @@ func (e *Encoder) Encode(nalu []byte) error { return nil } +func (e *Encoder) writePSI() error { + // Write PAT + patPkt := Packet{ + PUSI: true, + PID: patPid, + CC: e.ccFor(patPid), + AFC: hasPayload, + Payload: patTable, + } + _, err := e.dst.Write(patPkt.Bytes()) + if err != nil { + return err + } + + // Write PMT. + pmtPkt := Packet{ + PUSI: true, + PID: pmtPid, + CC: e.ccFor(pmtPid), + AFC: hasPayload, + Payload: pmtTable, + } + _, err = e.dst.Write(pmtPkt.Bytes()) + if err != nil { + return err + } + e.psiCount = psiSendCount + return nil +} + // tick advances the clock one frame interval. func (e *Encoder) tick() { e.clock += e.frameInterval diff --git a/stream/rtp/encoder.go b/stream/rtp/encoder.go new file mode 100644 index 00000000..c8750e95 --- /dev/null +++ b/stream/rtp/encoder.go @@ -0,0 +1,118 @@ +/* +NAME + encoder.go + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton (saxon@ausocean.org) + +LICENSE + encoder.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package rtp + +import ( + "io" + "math/rand" + "time" +) + +const ( + yes = 1 + no = 0 + defaultPktType = 33 + timestampFreq = 90000 // Hz + mtsSize = 188 + bufferSize = 1000 + sendLength = 7 * 188 +) + +// Encoder implements io writer and provides functionality to wrap data into +// rtp packets +type Encoder struct { + dst io.Writer + ssrc uint32 + seqNo uint16 + clock time.Duration + frameInterval time.Duration + fps int + buffer []byte +} + +// NewEncoder returns a new Encoder type given an io.Writer - the destination +// after encoding and the desired fps +func NewEncoder(dst io.Writer, fps int) *Encoder { + return &Encoder{ + dst: dst, + ssrc: rand.Uint32(), + frameInterval: time.Duration(float64(time.Second) / float64(fps)), + fps: fps, + buffer: make([]byte, 0, sendLength), + } +} + +// Write provides an interface between a prior encoder and this rtp encoder, +// so that multiple layers of packetization can occur. +func (e *Encoder) Write(data []byte) (int, error) { + e.buffer = append(e.buffer, data...) + for len(e.buffer) >= sendLength { + e.Encode(e.buffer) + e.buffer = e.buffer[:0] + } + return len(data), nil +} + +// Encode takes a nalu unit and encodes it into an rtp packet and +// writes to the io.Writer given in NewEncoder +func (e *Encoder) Encode(payload []byte) error { + pkt := Pkt{ + V: rtpVer, // version + X: no, // header extension + CC: no, // CSRC count + M: no, // NOTE: need to check if this works (decoders should ignore this) + PT: defaultPktType, // 33 for mpegts + SN: e.nxtSeqNo(), // sequence number + TS: e.nxtTimestamp(), // timestamp + SSRC: e.ssrc, // source identifier + Payload: payload, + Padding: no, + } + _, err := e.dst.Write(pkt.Bytes()) + if err != nil { + return err + } + e.tick() + return nil +} + +// tick advances the clock one frame interval. +func (e *Encoder) tick() { + e.clock += e.frameInterval +} + +// nxtTimestamp gets the next timestamp +func (e *Encoder) nxtTimestamp() uint32 { + return uint32(e.clock.Seconds() * timestampFreq) +} + +// nxtSeqNo gets the next rtp packet sequence number +func (e *Encoder) nxtSeqNo() uint16 { + e.seqNo++ + return e.seqNo - 1 +} diff --git a/stream/rtp/rtp.go b/stream/rtp/rtp.go new file mode 100644 index 00000000..4e393679 --- /dev/null +++ b/stream/rtp/rtp.go @@ -0,0 +1,102 @@ +/* +NAME + rtp.go - provides a data structure intended to encapsulate the properties + of an rtp packet and also functions to allow manipulation of these packets. + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon A. Nelson-Milton + +LICENSE + rtp.go is Copyright (C) 2018 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +/* +See https://tools.ietf.org/html/rfc6184 and https://tools.ietf.org/html/rfc3550 +for rtp-h264 and rtp standards. +*/ +package rtp + +const ( + rtpVer = 2 +) + +// Pkt provides fields consistent with RFC3550 definition of an rtp packet +// The padding indicator does not need to be set manually, only the padding length +type Pkt struct { + V byte // Version (currently 2) + p byte // Padding indicator (0 => padding, 1 => padding) + X byte // Extension header indicator + CC byte // CSRC count + M byte // Marker bit + PT byte // Packet type + SN uint16 // Synch number + TS uint32 // Timestamp + SSRC uint32 // Synchronisation source identifier + Payload []byte // H264 Payload data + Padding byte // No of bytes of padding +} + +// Bytes provides a byte slice of the packet +func (p *Pkt) Bytes() []byte { + if p.V == 0 { + p.V = rtpVer + } + + if p.Padding > 0 { + p.p = 1 + } + + if p.CC != 0 { + panic("CC has been set to something other than 0 - this is not supported yet.") + } + + if p.X != 0 { + panic("rtp: X (extension header indicator) not 0, but extensiion headers not currently supported.") + } + + if p.CC != 0 { + panic("rtp: CC (CSRC count) not 0, but CSRC headers not yet supported.") + } + + const headSize = 3 * 4 // bytes + buf := make([]byte, headSize, headSize+len(p.Payload)+int(p.Padding)) + + buf[0] = p.V<<6 | p.p<<5 | p.CC + buf[1] = p.M<<7 | p.PT + buf[2] = byte(p.SN >> 8) + buf[3] = byte(p.SN) + buf[4] = byte(p.TS >> 24) + buf[5] = byte(p.TS >> 16) + buf[6] = byte(p.TS >> 8) + buf[7] = byte(p.TS) + buf[8] = byte(p.SSRC >> 24) + buf[9] = byte(p.SSRC >> 16) + buf[10] = byte(p.SSRC >> 8) + buf[11] = byte(p.SSRC) + + buf = append(buf, p.Payload...) + // see https://tools.ietf.org/html/rfc3550 section 5.1 (padding). At end of + // rtp packet, padding may exist, with the last octet being the length of the + // padding including itself. + if p.Padding != 0 { + buf = buf[:cap(buf)] + buf[len(buf)-1] = byte(p.Padding) + } + return buf +} diff --git a/stream/rtp/rtp_test.go b/stream/rtp/rtp_test.go new file mode 100644 index 00000000..85b33590 --- /dev/null +++ b/stream/rtp/rtp_test.go @@ -0,0 +1,74 @@ +/* +NAME + rtp_test.go + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton (saxon@ausocean.org) + +LICENSE + rtp_test.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package rtp + +import ( + "reflect" + "testing" +) + +// TODO (saxon): add more tests +var rtpTests = []struct { + num int + pkt Pkt + want []byte +}{ + { + num: 1, + pkt: Pkt{ + V: 2, + p: 0, + X: 0, + CC: 0, + M: 0, + PT: 6, + SN: 167, + TS: 160, + SSRC: 10, + Payload: []byte{0x00, 0x01, 0x07, 0xf0, 0x56, 0x37, 0x0a, 0x0f}, + Padding: 0, + }, + want: []byte{ + 0x80, 0x06, 0x00, 0xa7, + 0x00, 0x00, 0x00, 0xa0, + 0x00, 0x00, 0x00, 0x0a, + 0x00, 0x01, 0x07, 0xf0, + 0x56, 0x37, 0x0a, 0x0f, + }, + }, +} + +func TestRtpPktToByteSlice(t *testing.T) { + for _, test := range rtpTests { + got := test.pkt.Bytes() + if !reflect.DeepEqual(got, test.want) { + t.Errorf("unexpected error for test %v: got:%v want:%v", test.num, got, + test.want) + } + } +}