From 1059b5e738c68a042ca8bd14f333f10b585dd832 Mon Sep 17 00:00:00 2001 From: Saxon Date: Tue, 14 May 2019 11:47:18 +0930 Subject: [PATCH 01/23] revid: wrote basics of a startRTSPCamera func Currently just connecting to RTSP server, requesting OPTIONS, DESCRIBE, SETUP and PLAY. Also creating RTP client and giving this to process from for the lexer. --- revid/config.go | 3 +++ revid/revid.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/revid/config.go b/revid/config.go index 010af3fd..72400c60 100644 --- a/revid/config.go +++ b/revid/config.go @@ -74,6 +74,9 @@ type Config struct { Saturation int Exposure string AutoWhiteBalance string + RTSPURL string + RTPRecvAddr string + RTCPAddr string } // Possible modes for raspivid --exposure parameter. diff --git a/revid/revid.go b/revid/revid.go index 43c4f982..25453257 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -43,6 +43,8 @@ import ( "bitbucket.org/ausocean/av/codec/lex" "bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/mts" + "bitbucket.org/ausocean/av/protocol/rtp" + "bitbucket.org/ausocean/av/protocol/rtsp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/logger" @@ -604,6 +606,48 @@ func (r *Revid) setupInputForFile() (func() error, error) { return func() error { return f.Close() }, nil } +func (r *Revid) startRTSPCamera() (func() error, error) { + rtspClt, err := rtsp.NewClient(r.config.RTSPURL) + if err != nil { + return nil, err + } + + resp, err := rtspClt.Options() + if err != nil { + return nil, err + } + r.config.Logger.Log(logger.Info, pkg+"RTSP server OPTIONS response", "response", resp.String()) + + resp, err = rtspClt.Describe() + if err != nil { + return nil, err + } + r.config.Logger.Log(logger.Info, pkg+"RTSP server DESCRIBE response", "response", resp.String()) + + transport := fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", strings.Split(r.config.RTPRecvAddr, ":")[1], strings.Split(r.config.RTCPAddr, ":")[1]) + resp, err = rtspClt.Setup("track1", transport) + if err != nil { + return nil, err + } + r.config.Logger.Log(logger.Info, pkg+"RTSP server SETUP response", "response", resp.String()) + + resp, err = rtspClt.Play() + if err != nil { + return nil, err + } + + // TODO(saxon): use rtcp client to maintain rtp stream. + + rtpClt, err := rtp.NewClient(r.config.RTPRecvAddr) + if err != nil { + return nil, err + } + + r.wg.Add(1) + go r.processFrom(rtpClt, time.Second/time.Duration(r.config.FrameRate)) + return func() error { return nil }, nil +} + func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") r.err <- r.lexTo(r.encoders, read, delay) From 25bb49bce7ac3e5eea39706eca494928c7f5c225 Mon Sep 17 00:00:00 2001 From: Saxon Date: Tue, 14 May 2019 11:56:04 +0930 Subject: [PATCH 02/23] revid: using h265 lexer now in the case of RTSPCamera input --- revid/config.go | 3 ++- revid/revid.go | 27 ++++++++++++++++++--------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/revid/config.go b/revid/config.go index 72400c60..784ab02c 100644 --- a/revid/config.go +++ b/revid/config.go @@ -133,6 +133,7 @@ const ( Udp MpegtsRtp Rtp + RTSPCamera ) // Default config settings @@ -175,7 +176,7 @@ func (c *Config) Validate(r *Revid) error { } switch c.Input { - case Raspivid, V4L, File: + case Raspivid, V4L, File, RTSPCamera: case NothingDefined: c.Logger.Log(logger.Info, pkg+"no input type defined, defaulting", "input", defaultInput) c.Input = defaultInput diff --git a/revid/revid.go b/revid/revid.go index 25453257..47b7ce7c 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -40,6 +40,7 @@ import ( "sync" "time" + "bitbucket.org/ausocean/av/codec/h265" "bitbucket.org/ausocean/av/codec/lex" "bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/mts" @@ -264,20 +265,18 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int) switch r.config.Input { case Raspivid: r.setupInput = r.startRaspivid + r.lexTo = lex.H264 case V4L: r.setupInput = r.startV4L + r.lexTo = lex.H264 case File: r.setupInput = r.setupInputForFile + r.lexTo = lex.H264 + case RTSPCamera: + r.setupInput = r.startRTSPCamera + r.lexTo = h265.NewLexer(false).Lex } - switch r.config.InputCodec { - case H264: - r.config.Logger.Log(logger.Info, pkg+"using H264 lexer") - r.lexTo = lex.H264 - case Mjpeg: - r.config.Logger.Log(logger.Info, pkg+"using MJPEG lexer") - r.lexTo = lex.MJPEG - } return nil } @@ -624,7 +623,17 @@ func (r *Revid) startRTSPCamera() (func() error, error) { } r.config.Logger.Log(logger.Info, pkg+"RTSP server DESCRIBE response", "response", resp.String()) - transport := fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", strings.Split(r.config.RTPRecvAddr, ":")[1], strings.Split(r.config.RTCPAddr, ":")[1]) + rtpPort, err := strconv.Atoi(strings.Split(r.config.RTPRecvAddr, ":")[0]) + if err != nil { + return nil, err + } + + rtcpPort, err := strconv.Atoi(strings.Split(r.config.RTCPAddr, ":")[0]) + if err != nil { + return nil, err + } + + transport := fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort) resp, err = rtspClt.Setup("track1", transport) if err != nil { return nil, err From 5a2f15054dd63686fafba6f1f676a4dccd9bb949 Mon Sep 17 00:00:00 2001 From: Saxon Date: Thu, 16 May 2019 12:55:35 +0930 Subject: [PATCH 03/23] cmd/revid-cli: added flags related to RTSP input Added 'RTSPCamera' option to description for 'Input' revid-cli flag. Also added other flags required to set config params for RTSP input, like RTSPURL, RTPRecvAddr and RTCPAddr. --- cmd/revid-cli/main.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index fc90dbcc..e41fc657 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -105,7 +105,10 @@ func handleFlags() revid.Config { var ( cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`") - inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam") + inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam, RTSPCamera") + rtspURLPtr = flag.String("RTSPURL", "", "The URL for an RTSP server.") + rtpRecvAddrPtr = flag.String("RTPRecvAddr", "", "The RTP address we would like to receive RTP from.") + rtcpAddrPtr = flag.String("RTCPAddr", "", "The address for RTCP communication.") inputCodecPtr = flag.String("InputCodec", "", "The codec of the input: H264, Mjpeg") rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp") quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)") @@ -223,6 +226,9 @@ func handleFlags() revid.Config { netsender.ConfigFile = *configFilePtr } + cfg.RTSPURL = *rtspURLPtr + cfg.RTPRecvAddr = *rtpRecvAddrPtr + cfg.RTCPAddr = *rtcpAddrPtr cfg.Quantize = *quantizePtr cfg.Rotation = *rotationPtr cfg.FlipHorizontal = *horizontalFlipPtr From bd56e936a40a4a7f5d1983a31cb237bef4713987 Mon Sep 17 00:00:00 2001 From: Saxon Date: Thu, 16 May 2019 13:05:09 +0930 Subject: [PATCH 04/23] cmd/revid-cli: checking Input flag string and assigning cfg.Input to revid.RTSPCamera if 'RTSPCamera' is entered. --- cmd/revid-cli/main.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index e41fc657..a1961873 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -183,6 +183,8 @@ func handleFlags() revid.Config { cfg.Input = revid.V4L case "File": cfg.Input = revid.File + case "RTSPCamera": + cfg.Input = revid.RTSPCamera case "": default: log.Log(logger.Error, pkg+"bad input argument") From 548b7caa81e8c15188d4a7a717b2cb8a636d7809 Mon Sep 17 00:00:00 2001 From: Saxon Date: Thu, 16 May 2019 13:57:10 +0930 Subject: [PATCH 05/23] revid: fixed rtpPort and rtcpPort parsing and added H264 and H265 IDs Added H264ID and H265ID consts and added logic to select this const for use in encoder based on mediaType param in NewEncoder. Also now declaring PMT in NewEncoder so that we can set streamID correctly based on mediaType. --- container/mts/encoder.go | 65 +++++++++++++++++--------------- container/mts/encoder_test.go | 2 +- container/mts/metaEncode_test.go | 4 +- container/mts/mpegts_test.go | 2 +- revid/revid.go | 15 ++++++-- revid/senders_test.go | 6 +-- 6 files changed, 54 insertions(+), 40 deletions(-) diff --git a/container/mts/encoder.go b/container/mts/encoder.go index 3208276b..a90df386 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -55,30 +55,6 @@ var ( }, }, } - - // standardPmt is a minimal PMT, without descriptors for time and location. - standardPmt = psi.PSI{ - Pf: 0x00, - Tid: 0x02, - Ssi: true, - Sl: 0x12, - Tss: &psi.TSS{ - Tide: 0x01, - V: 0, - Cni: true, - Sn: 0, - Lsn: 0, - Sd: &psi.PMT{ - Pcrpid: 0x0100, - Pil: 0, - Essd: &psi.ESSD{ - St: 0x1b, - Epid: 0x0100, - Esil: 0x00, - }, - }, - }, - } ) const ( @@ -94,7 +70,7 @@ var Meta *meta.Data var ( patTable = standardPat.Bytes() - pmtTable = standardPmt.Bytes() + pmtTable []byte ) const ( @@ -103,14 +79,16 @@ const ( pmtPid = 4096 videoPid = 256 audioPid = 210 - videoStreamID = 0xe0 // First video stream ID. + H264ID = 27 + H265ID = 36 audioStreamID = 0xc0 // First audio stream ID. ) // Video and Audio constants are used to communicate which media type will be encoded when creating a // new encoder with NewEncoder. const ( - Video = iota + H264 = iota + H265 Audio ) @@ -158,11 +136,38 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { case Audio: mPid = audioPid sid = audioStreamID - case Video: + case H265: mPid = videoPid - sid = videoStreamID + sid = H264ID + case H264: + mPid = videoPid + sid = H265ID } + // standardPmt is a minimal PMT, without descriptors for time and location. + pmtTable = (&psi.PSI{ + Pf: 0x00, + Tid: 0x02, + Ssi: true, + Sl: 0x12, + Tss: &psi.TSS{ + Tide: 0x01, + V: 0, + Cni: true, + Sn: 0, + Lsn: 0, + Sd: &psi.PMT{ + Pcrpid: 0x0100, + Pil: 0, + Essd: &psi.ESSD{ + St: byte(sid), + Epid: 0x0100, + Esil: 0x00, + }, + }, + }, + }).Bytes() + return &Encoder{ dst: dst, @@ -219,7 +224,7 @@ func (e *Encoder) Write(data []byte) (int, error) { // Prepare PES data. pesPkt := pes.Packet{ - StreamID: e.streamID, + StreamID: byte(36), PDI: hasPTS, PTS: e.pts(), Data: data, diff --git a/container/mts/encoder_test.go b/container/mts/encoder_test.go index 8436d241..188c615a 100644 --- a/container/mts/encoder_test.go +++ b/container/mts/encoder_test.go @@ -99,7 +99,7 @@ func TestEncodeVideo(t *testing.T) { // Create the dst and write the test data to encoder. dst := &destination{} - _, err := NewEncoder(nopCloser{dst}, 25, Video).Write(data) + _, err := NewEncoder(nopCloser{dst}, 25, H264).Write(data) if err != nil { t.Fatalf("could not write data to encoder, failed with err: %v\n", err) } diff --git a/container/mts/metaEncode_test.go b/container/mts/metaEncode_test.go index 939de5b7..75d5f6e1 100644 --- a/container/mts/metaEncode_test.go +++ b/container/mts/metaEncode_test.go @@ -48,7 +48,7 @@ const fps = 25 func TestMetaEncode1(t *testing.T) { Meta = meta.New() var buf bytes.Buffer - e := NewEncoder(nopCloser{&buf}, fps, Video) + e := NewEncoder(nopCloser{&buf}, fps, H264) Meta.Add("ts", "12345678") if err := e.writePSI(); err != nil { t.Errorf(errUnexpectedErr, err.Error()) @@ -76,7 +76,7 @@ func TestMetaEncode1(t *testing.T) { func TestMetaEncode2(t *testing.T) { Meta = meta.New() var buf bytes.Buffer - e := NewEncoder(nopCloser{&buf}, fps, Video) + e := NewEncoder(nopCloser{&buf}, fps, H264) Meta.Add("ts", "12345678") Meta.Add("loc", "1234,4321,1234") if err := e.writePSI(); err != nil { diff --git a/container/mts/mpegts_test.go b/container/mts/mpegts_test.go index 650fecab..4c90cc0e 100644 --- a/container/mts/mpegts_test.go +++ b/container/mts/mpegts_test.go @@ -126,7 +126,7 @@ func writePSI(b *bytes.Buffer) error { func writeFrame(b *bytes.Buffer, frame []byte, pts uint64) error { // Prepare PES data. pesPkt := pes.Packet{ - StreamID: videoStreamID, + StreamID: H264ID, PDI: hasPTS, PTS: pts, Data: frame, diff --git a/revid/revid.go b/revid/revid.go index 47b7ce7c..0d1b254e 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -166,7 +166,14 @@ func (r *Revid) reset(config Config) error { err = r.setupPipeline( func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { - e := mts.NewEncoder(dst, float64(fps), mts.Video) + var st int + switch r.config.Input { + case Raspivid, File, V4L: + st = mts.H264 + case RTSPCamera: + st = mts.H265 + } + e := mts.NewEncoder(dst, float64(fps), st) return e, nil }, func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { @@ -607,6 +614,7 @@ func (r *Revid) setupInputForFile() (func() error, error) { func (r *Revid) startRTSPCamera() (func() error, error) { rtspClt, err := rtsp.NewClient(r.config.RTSPURL) + fmt.Printf("RTSPURL: %v\n", r.config.RTSPURL) if err != nil { return nil, err } @@ -623,12 +631,12 @@ func (r *Revid) startRTSPCamera() (func() error, error) { } r.config.Logger.Log(logger.Info, pkg+"RTSP server DESCRIBE response", "response", resp.String()) - rtpPort, err := strconv.Atoi(strings.Split(r.config.RTPRecvAddr, ":")[0]) + rtpPort, err := strconv.Atoi(strings.Split(r.config.RTPRecvAddr, ":")[1]) if err != nil { return nil, err } - rtcpPort, err := strconv.Atoi(strings.Split(r.config.RTCPAddr, ":")[0]) + rtcpPort, err := strconv.Atoi(strings.Split(r.config.RTCPAddr, ":")[1]) if err != nil { return nil, err } @@ -644,6 +652,7 @@ func (r *Revid) startRTSPCamera() (func() error, error) { if err != nil { return nil, err } + r.config.Logger.Log(logger.Info, pkg+"RTSP server PLAY response", "response", resp.String()) // TODO(saxon): use rtcp client to maintain rtp stream. diff --git a/revid/senders_test.go b/revid/senders_test.go index 80293759..c350e369 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -134,7 +134,7 @@ func TestMtsSenderSegment(t *testing.T) { const numberOfClips = 11 dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0) - encoder := mts.NewEncoder(sender, 25, mts.Video) + encoder := mts.NewEncoder(sender, 25, mts.H264) // Turn time based PSI writing off for encoder. const psiSendCount = 10 @@ -212,7 +212,7 @@ func TestMtsSenderFailedSend(t *testing.T) { const clipToFailAt = 3 dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0) - encoder := mts.NewEncoder(sender, 25, mts.Video) + encoder := mts.NewEncoder(sender, 25, mts.H264) // Turn time based PSI writing off for encoder and send PSI every 10 packets. const psiSendCount = 10 @@ -292,7 +292,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) { const clipToDelay = 3 dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, rbElementSize, 0) - encoder := mts.NewEncoder(sender, 25, mts.Video) + encoder := mts.NewEncoder(sender, 25, mts.H264) // Turn time based PSI writing off for encoder. const psiSendCount = 10 From 92d4c5f79acb8616fddd231bf7aa27cbdbb84e74 Mon Sep 17 00:00:00 2001 From: Saxon Date: Thu, 16 May 2019 16:41:52 +0930 Subject: [PATCH 06/23] container/mts/encoder.go: generalising stream id logic --- codec/h265/lex.go | 1 - container/mts/encoder.go | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/codec/h265/lex.go b/codec/h265/lex.go index fcf2e59d..d67cf289 100644 --- a/codec/h265/lex.go +++ b/codec/h265/lex.go @@ -86,7 +86,6 @@ func (l *Lexer) Lex(dst io.Writer, src io.Reader, delay time.Duration) error { if err != nil { return fmt.Errorf("could not get rtp payload, failed with err: %v\n", err) } - nalType := (payload[0] >> 1) & 0x3f // If not currently fragmented then we ignore current write. diff --git a/container/mts/encoder.go b/container/mts/encoder.go index a90df386..0302c73a 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -138,10 +138,10 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { sid = audioStreamID case H265: mPid = videoPid - sid = H264ID + sid = H265ID case H264: mPid = videoPid - sid = H265ID + sid = H264ID } // standardPmt is a minimal PMT, without descriptors for time and location. @@ -224,7 +224,7 @@ func (e *Encoder) Write(data []byte) (int, error) { // Prepare PES data. pesPkt := pes.Packet{ - StreamID: byte(36), + StreamID: e.streamID, PDI: hasPTS, PTS: e.pts(), Data: data, From a2d1b09e92e570f0342a95605c09794b2c5651a0 Mon Sep 17 00:00:00 2001 From: Saxon Date: Sun, 19 May 2019 17:21:41 +0930 Subject: [PATCH 07/23] codec/h265: fixed lexer to get nal header into start of fragment Now getting the nal header and type from fu header for the first fragment. We can now lex and create HEVC MTS to RTP - working fine. Need to use RTCP now to continue stream. --- codec/h265/lex.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/codec/h265/lex.go b/codec/h265/lex.go index d67cf289..f5afa5e6 100644 --- a/codec/h265/lex.go +++ b/codec/h265/lex.go @@ -76,6 +76,7 @@ func (l *Lexer) Lex(dst io.Writer, src io.Reader, delay time.Duration) error { switch err { case nil: // Do nothing. case io.EOF: + fmt.Println("done") return nil default: return fmt.Errorf("source read error: %v\n", err) @@ -150,6 +151,11 @@ func (l *Lexer) handleFragmentation(d []byte) { start := d[2]&0x80 != 0 end := d[2]&0x40 != 0 + var head []byte + if start { + head = []byte{(d[0] & 0x81) | ((d[2] & 0x3f) << 1), d[1]} + } + d = d[3:] if l.donl { d = d[2:] @@ -158,7 +164,8 @@ func (l *Lexer) handleFragmentation(d []byte) { switch { case start && !end: l.frag = true - l.writeWithPrefix(d) + _d := append(head, d...) + l.writeWithPrefix(_d) case !start && end: l.frag = false fallthrough From abd41d9f014ee0e4a51e99accaa66a313b9e6f8d Mon Sep 17 00:00:00 2001 From: Saxon Date: Sun, 19 May 2019 20:45:26 +0930 Subject: [PATCH 08/23] codec/h265/lex_test.go: fixed TestLex to account for previous changes --- codec/h265/lex.go | 1 - codec/h265/lex_test.go | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/codec/h265/lex.go b/codec/h265/lex.go index f5afa5e6..84d9222a 100644 --- a/codec/h265/lex.go +++ b/codec/h265/lex.go @@ -76,7 +76,6 @@ func (l *Lexer) Lex(dst io.Writer, src io.Reader, delay time.Duration) error { switch err { case nil: // Do nothing. case io.EOF: - fmt.Println("done") return nil default: return fmt.Errorf("source read error: %v\n", err) diff --git a/codec/h265/lex_test.go b/codec/h265/lex_test.go index 637a1860..1a409e4c 100644 --- a/codec/h265/lex_test.go +++ b/codec/h265/lex_test.go @@ -129,7 +129,7 @@ func TestLex(t *testing.T) { 0x01, 0x02, 0x03, 0x04, // NAL data. // NAL 2 0x00, 0x00, 0x00, 0x01, // Start code. - 0x01, 0x02, 0x03, // FU payload. + 0x00, 0x00, 0x01, 0x02, 0x03, // FU payload. 0x04, 0x05, 0x06, // FU payload. 0x07, 0x08, 0x09, // FU payload. // NAL 3 @@ -216,7 +216,7 @@ func TestLex(t *testing.T) { 0x01, 0x02, 0x03, 0x04, // NAL data. // NAL 2 0x00, 0x00, 0x00, 0x01, // Start code. - 0x01, 0x02, 0x03, // FU payload. + 0x00, 0x00, 0x01, 0x02, 0x03, // FU payload. 0x04, 0x05, 0x06, // FU payload. 0x07, 0x08, 0x09, // FU payload. // NAL 3 From 0567a817570ca413b9471306d1c51253157f5466 Mon Sep 17 00:00:00 2001 From: Saxon Date: Sun, 19 May 2019 21:14:41 +0930 Subject: [PATCH 09/23] codec/h265: removed unnecessary allocation in handle fragmentation --- codec/h265/lex.go | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/codec/h265/lex.go b/codec/h265/lex.go index 84d9222a..6e81b167 100644 --- a/codec/h265/lex.go +++ b/codec/h265/lex.go @@ -63,7 +63,8 @@ type Lexer struct { func NewLexer(donl bool) *Lexer { return &Lexer{ donl: donl, - buf: bytes.NewBuffer(make([]byte, 0, maxAUSize))} + buf: bytes.NewBuffer(make([]byte, 0, maxAUSize)), + } } // Lex continually reads RTP packets from the io.Reader src and lexes into @@ -150,21 +151,25 @@ func (l *Lexer) handleFragmentation(d []byte) { start := d[2]&0x80 != 0 end := d[2]&0x40 != 0 - var head []byte + old := d if start { - head = []byte{(d[0] & 0x81) | ((d[2] & 0x3f) << 1), d[1]} - } - - d = d[3:] - if l.donl { - d = d[2:] + d = d[1:] + if l.donl { + d = d[2:] + } + d[0] = (old[0] & 0x81) | ((old[2] & 0x3f) << 1) + d[1] = old[1] + } else { + d = d[3:] + if l.donl { + d = d[2:] + } } switch { case start && !end: l.frag = true - _d := append(head, d...) - l.writeWithPrefix(_d) + l.writeWithPrefix(d) case !start && end: l.frag = false fallthrough From bc6a0ae55e137d65e5eea8d4464568e2636dcafc Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 20 May 2019 18:14:23 +0930 Subject: [PATCH 10/23] revid: using RTCP client to maintain RTP stream from RTSP server Now adopting an RTCP client so that the RTP stream from the RTSP server can be maintained past 1 minute. This change involved some refactor. The rtcp.NewClient signature has been simplified. There is now a default send interval and name for use in the source description in the receiver reports. These can be customised if required with the new SetSendInterval and SetName funcs. The rtcp.NewClient signature now takes an rtp.Client, so that it can get information from the RTP stream, like most recent sequence number. As a result of this requirement the rtp package parse file has been extended with some functions for parsing out the sequence number and ssrc from RTP packets and the RTP client provides getters for these things. --- cmd/revid-cli/main.go | 2 + protocol/rtcp/client.go | 78 ++++++++++++++++++------------------ protocol/rtcp/client_test.go | 12 +++--- protocol/rtcp/parse.go | 34 +++++++++++----- protocol/rtp/client.go | 55 ++++++++++++++++++++++++- protocol/rtp/parse.go | 40 +++++++++++++++--- protocol/rtsp/client.go | 5 +++ revid/config.go | 1 + revid/revid.go | 30 ++++++++++++-- 9 files changed, 194 insertions(+), 63 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index a1961873..6cbcab0a 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -106,6 +106,7 @@ func handleFlags() revid.Config { cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`") inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam, RTSPCamera") + rtcpServerAddrPtr = flag.String("RTCPServerAddr", "", "The RTCP server we are communicating with") rtspURLPtr = flag.String("RTSPURL", "", "The URL for an RTSP server.") rtpRecvAddrPtr = flag.String("RTPRecvAddr", "", "The RTP address we would like to receive RTP from.") rtcpAddrPtr = flag.String("RTCPAddr", "", "The address for RTCP communication.") @@ -228,6 +229,7 @@ func handleFlags() revid.Config { netsender.ConfigFile = *configFilePtr } + cfg.RTCPServerAddr = *rtcpServerAddrPtr cfg.RTSPURL = *rtspURLPtr cfg.RTPRecvAddr = *rtpRecvAddrPtr cfg.RTCPAddr = *rtcpAddrPtr diff --git a/protocol/rtcp/client.go b/protocol/rtcp/client.go index 7d6c995c..80d65511 100644 --- a/protocol/rtcp/client.go +++ b/protocol/rtcp/client.go @@ -37,16 +37,18 @@ import ( "sync" "time" + "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/utils/logger" ) const ( - senderSSRC = 1 // Any non-zero value will do. - defaultClientName = "Client" - delayUnit = 1.0 / 65536.0 - pkg = "rtcp: " - rtcpVer = 2 - receiverBufSize = 200 + clientSSRC = 1 // Any non-zero value will do. + defaultClientName = "Client" + defaultSendInterval = 2 * time.Second + delayUnit = 1.0 / 65536.0 + pkg = "rtcp: " + rtcpVer = 2 + receiverBufSize = 200 ) // Log describes a function signature required by the RTCP for the purpose of @@ -70,23 +72,19 @@ type Client struct { wg sync.WaitGroup // This is used to wait for send and recv routines to stop when Client is stopped. quit chan struct{} // Channel used to communicate quit signal to send and recv routines. log Log // Used to log any messages. - - err chan error // Client will send any errors through this chan. Can be accessed by Err(). + rtpClt *rtp.Client + err chan error // Client will send any errors through this chan. Can be accessed by Err(). } // NewClient returns a pointer to a new Client. -func NewClient(clientAddress, serverAddress, name string, sendInterval time.Duration, rtpSSRC uint32, l Log) (*Client, error) { - if name == "" { - name = defaultClientName - } - +func NewClient(clientAddress, serverAddress string, rtpClt *rtp.Client, l Log) (*Client, error) { c := &Client{ - name: name, - err: make(chan error), - quit: make(chan struct{}), - interval: sendInterval, - sourceSSRC: rtpSSRC, - log: l, + name: defaultClientName, + err: make(chan error), + quit: make(chan struct{}), + interval: defaultSendInterval, + rtpClt: rtpClt, + log: l, } var err error @@ -107,6 +105,17 @@ func NewClient(clientAddress, serverAddress, name string, sendInterval time.Dura return c, nil } +// SetSendInterval sets a custom receiver report send interval (default is 5 seconds.) +func (c *Client) SetSendInterval(d time.Duration) { + c.interval = d +} + +// SetName sets a custom client name for use in receiver report source description. +// Default is Client". +func (c *Client) SetName(name string) { + c.name = name +} + // Start starts the listen and send routines. This will start the process of // receiving and parsing sender reports, and the process of sending receiver // reports to the server. @@ -126,6 +135,13 @@ func (c *Client) Stop() { c.wg.Wait() } +// Done gives access to the clients quit chan, which is closed when the RTCP +// client is stopped. Therefore, Done may be used to check when the RTCP client +// has stopped running - ideal for use in a routine checking Client.Err(). +func (c *Client) Done() <-chan struct{} { + return c.quit +} + // Err provides read access to the Client err channel. This must be checked // otherwise the client will block if an error encountered. func (c *Client) Err() <-chan error { @@ -171,13 +187,13 @@ func (c *Client) send() { ReportCount: 1, Type: typeReceiverReport, }, - SenderSSRC: senderSSRC, + SenderSSRC: clientSSRC, Blocks: []ReportBlock{ ReportBlock{ - SourceIdentifier: c.sourceSSRC, + SourceIdentifier: c.rtpClt.SSRC(), FractionLost: 0, PacketsLost: math.MaxUint32, - HighestSequence: c.sequence(), + HighestSequence: uint32((c.rtpClt.Cycles() << 16) | c.rtpClt.Sequence()), Jitter: c.jitter(), SenderReportTs: c.lastSenderTs(), SenderReportDelay: c.delay(), @@ -195,7 +211,7 @@ func (c *Client) send() { }, Chunks: []Chunk{ Chunk{ - SSRC: senderSSRC, + SSRC: clientSSRC, Items: []SDESItem{ SDESItem{ Type: typeCName, @@ -238,22 +254,6 @@ func (c *Client) parse(buf []byte) { c.setSenderTs(t) } -// SetSequence will allow updating of the highest sequence number received -// through an RTP stream. -func (c *Client) SetSequence(s uint32) { - c.mu.Lock() - c.seq = s - c.mu.Unlock() -} - -// sequence will return the highest sequence number received through RTP. -func (c *Client) sequence() uint32 { - c.mu.Lock() - s := c.seq - c.mu.Unlock() - return s -} - // jitter returns the interarrival jitter as described by RTCP specifications: // https://tools.ietf.org/html/rfc3550 // TODO(saxon): complete this. diff --git a/protocol/rtcp/client_test.go b/protocol/rtcp/client_test.go index 64a4d685..111bcbbf 100644 --- a/protocol/rtcp/client_test.go +++ b/protocol/rtcp/client_test.go @@ -37,6 +37,7 @@ import ( "testing" "time" + "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/utils/logger" ) @@ -148,12 +149,15 @@ func (dl *dummyLogger) log(lvl int8, msg string, args ...interface{}) { // respond with sender reports. func TestReceiveAndSend(t *testing.T) { const clientAddr, serverAddr = "localhost:8000", "localhost:8001" + rtpClt, err := rtp.NewClient("localhost:8002") + if err != nil { + t.Fatalf("unexpected error when creating RTP client: %v", err) + } + c, err := NewClient( clientAddr, serverAddr, - "testClient", - 10*time.Millisecond, - 12345, + rtpClt, (*dummyLogger)(t).log, ) if err != nil { @@ -197,8 +201,6 @@ func TestReceiveAndSend(t *testing.T) { n, _, _ := conn.ReadFromUDP(buf) t.Logf("SERVER: receiver report received: \n%v\n", buf[:n]) - c.SetSequence(uint32(i)) - now := time.Now().Second() var time [8]byte binary.BigEndian.PutUint64(time[:], uint64(now)) diff --git a/protocol/rtcp/parse.go b/protocol/rtcp/parse.go index 2f756f4b..be680fa0 100644 --- a/protocol/rtcp/parse.go +++ b/protocol/rtcp/parse.go @@ -42,15 +42,9 @@ type Timestamp struct { // significant word, and the least significant word. If the given bytes do not // represent a valid receiver report, an error is returned. func ParseTimestamp(buf []byte) (Timestamp, error) { - if len(buf) < 4 { - return Timestamp{}, errors.New("bad RTCP packet, not of sufficient length") - } - if (buf[0]&0xc0)>>6 != rtcpVer { - return Timestamp{}, errors.New("incompatible RTCP version") - } - - if buf[1] != typeSenderReport { - return Timestamp{}, errors.New("RTCP packet is not of sender report type") + err := checkPacket(buf) + if err != nil { + return Timestamp{}, err } return Timestamp{ @@ -58,3 +52,25 @@ func ParseTimestamp(buf []byte) (Timestamp, error) { Fraction: binary.BigEndian.Uint32(buf[12:]), }, nil } + +func ParseSSRC(buf []byte) (uint32, error) { + err := checkPacket(buf) + if err != nil { + return 0, err + } + return binary.BigEndian.Uint32(buf[4:]), nil +} + +func checkPacket(buf []byte) error { + if len(buf) < 4 { + return errors.New("bad RTCP packet, not of sufficient length") + } + if (buf[0]&0xc0)>>6 != rtcpVer { + return errors.New("incompatible RTCP version") + } + + if buf[1] != typeSenderReport { + return errors.New("RTCP packet is not of sender report type") + } + return nil +} diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index 3ab856b9..53b4d98f 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -28,13 +28,19 @@ LICENSE package rtp import ( + "fmt" "net" + "sync" ) // Client describes an RTP client that can receive an RTP stream and implements // io.Reader. type Client struct { - r *PacketReader + r *PacketReader + ssrc uint32 + mu sync.Mutex + sequence uint16 + cycles uint16 } // NewClient returns a pointer to a new Client. @@ -54,14 +60,61 @@ func NewClient(addr string) (*Client, error) { return nil, err } + buf := make([]byte, 4096) + n, err := c.Read(buf) + if err != nil { + return nil, err + } + c.ssrc, err = SSRC(buf[:n]) + if err != nil { + return nil, fmt.Errorf("could not parse SSRC from RTP packet, failed with error: %v", err) + } + return c, nil } +// SSRC returns the identified for the source from which the RTP packets being +// received are coming from. +func (c *Client) SSRC() uint32 { + return c.ssrc +} + // Read implements io.Reader. func (c *Client) Read(p []byte) (int, error) { + n, err := c.r.Read(p) + if err != nil { + return n, err + } + s, _ := Sequence(p[:n]) + c.setSequence(s) return c.r.Read(p) } +// setSequence sets the most recently received sequence number, and updates the +// cycles count if the sequence number has rolled over. +func (c *Client) setSequence(s uint16) { + c.mu.Lock() + if s < c.sequence { + c.cycles++ + } + c.sequence = s + c.mu.Unlock() +} + +// Sequence returns the most recent RTP packet sequence number received. +func (c *Client) Sequence() uint16 { + c.mu.Lock() + defer c.mu.Unlock() + return c.sequence +} + +// Cycles returns the number of RTP sequence number cycles that have been received. +func (c *Client) Cycles() uint16 { + c.mu.Lock() + defer c.mu.Unlock() + return c.cycles +} + // PacketReader provides an io.Reader interface to an underlying UDP PacketConn. type PacketReader struct { net.PacketConn diff --git a/protocol/rtp/parse.go b/protocol/rtp/parse.go index fcc05907..16e64c5d 100644 --- a/protocol/rtp/parse.go +++ b/protocol/rtp/parse.go @@ -50,11 +50,9 @@ func Marker(d []byte) (bool, error) { // Payload returns the payload from an RTP packet provided the version is // compatible, otherwise an error is returned. func Payload(d []byte) ([]byte, error) { - if len(d) < defaultHeadSize { - panic("invalid RTP packet length") - } - if version(d) != rtpVer { - return nil, errors.New(badVer) + err := checkPacket(d) + if err != nil { + return nil, err } extLen := 0 if hasExt(d) { @@ -64,6 +62,38 @@ func Payload(d []byte) ([]byte, error) { return d[payloadIdx:], nil } +// SSRC returns the source identifier from an RTP packet. An error is return if +// the packet is not valid. +func SSRC(d []byte) (uint32, error) { + err := checkPacket(d) + if err != nil { + return 0, err + } + return binary.BigEndian.Uint32(d[8:]), nil +} + +// Sequence returns the sequence number of an RTP packet. An error is returned +// if the packet is not valid. +func Sequence(d []byte) (uint16, error) { + err := checkPacket(d) + if err != nil { + return 0, err + } + return binary.BigEndian.Uint16(d[2:]), nil +} + +// checkPacket checks the validity of the packet, firstly by checking size and +// then also checking that version is compatible with these utilities. +func checkPacket(d []byte) error { + if len(d) < defaultHeadSize { + return errors.New("invalid RTP packet length") + } + if version(d) != rtpVer { + return errors.New(badVer) + } + return nil +} + // hasExt returns true if an extension is present in the RTP packet. func hasExt(d []byte) bool { return (d[0] & 0x10 >> 4) == 1 diff --git a/protocol/rtsp/client.go b/protocol/rtsp/client.go index 26e539db..77689b64 100644 --- a/protocol/rtsp/client.go +++ b/protocol/rtsp/client.go @@ -59,6 +59,11 @@ func NewClient(addr string) (*Client, error) { return c, nil } +// Close closes the RTSP connection. +func (c *Client) Close() error { + return c.conn.Close() +} + // Describe forms and sends an RTSP request of method DESCRIBE to the RTSP server. func (c *Client) Describe() (*Response, error) { req, err := NewRequest("DESCRIBE", c.nextCSeq(), c.url, nil) diff --git a/revid/config.go b/revid/config.go index 784ab02c..441b7c6d 100644 --- a/revid/config.go +++ b/revid/config.go @@ -77,6 +77,7 @@ type Config struct { RTSPURL string RTPRecvAddr string RTCPAddr string + RTCPServerAddr string } // Possible modes for raspivid --exposure parameter. diff --git a/revid/revid.go b/revid/revid.go index 0d1b254e..d5eefaa4 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -44,6 +44,7 @@ import ( "bitbucket.org/ausocean/av/codec/lex" "bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/mts" + "bitbucket.org/ausocean/av/protocol/rtcp" "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/av/protocol/rtsp" "bitbucket.org/ausocean/iot/pi/netsender" @@ -614,7 +615,6 @@ func (r *Revid) setupInputForFile() (func() error, error) { func (r *Revid) startRTSPCamera() (func() error, error) { rtspClt, err := rtsp.NewClient(r.config.RTSPURL) - fmt.Printf("RTSPURL: %v\n", r.config.RTSPURL) if err != nil { return nil, err } @@ -654,16 +654,38 @@ func (r *Revid) startRTSPCamera() (func() error, error) { } r.config.Logger.Log(logger.Info, pkg+"RTSP server PLAY response", "response", resp.String()) - // TODO(saxon): use rtcp client to maintain rtp stream. - rtpClt, err := rtp.NewClient(r.config.RTPRecvAddr) if err != nil { return nil, err } + // TODO(saxon): use rtcp client to maintain rtp stream. + rtcpClt, err := rtcp.NewClient(r.config.RTCPAddr, r.config.RTCPServerAddr, rtpClt, r.config.Logger.Log) + if err != nil { + return nil, err + } + + go func() { + for { + select { + case <-rtcpClt.Done(): + return + case err := <-rtcpClt.Err(): + r.config.Logger.Log(logger.Warning, pkg+"RTCP error", "error", err.Error()) + default: + } + } + }() + + rtcpClt.Start() + r.wg.Add(1) go r.processFrom(rtpClt, time.Second/time.Duration(r.config.FrameRate)) - return func() error { return nil }, nil + return func() error { + rtspClt.Close() + rtcpClt.Stop() + return nil + }, nil } func (r *Revid) processFrom(read io.Reader, delay time.Duration) { From 3ff726e4398558d3ddf47b38b8399e970401014a Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 20 May 2019 19:45:59 +0930 Subject: [PATCH 11/23] revid & cmd/revid-cli: removed need for command line flags to specify addresses for RTP and RTCP. Removed the command line flags that were used to specifiy local and remote addresses for RTP and RTCP. These are now derived from the initial RTSP connection and also from the RTSP SETUP method reply. --- cmd/revid-cli/main.go | 6 ----- protocol/rtsp/client.go | 18 ++++++++------- protocol/rtsp/rtsp_test.go | 2 +- revid/config.go | 3 --- revid/revid.go | 45 ++++++++++++++++++++++++++------------ 5 files changed, 42 insertions(+), 32 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 6cbcab0a..5c10e51f 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -106,10 +106,7 @@ func handleFlags() revid.Config { cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`") inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam, RTSPCamera") - rtcpServerAddrPtr = flag.String("RTCPServerAddr", "", "The RTCP server we are communicating with") rtspURLPtr = flag.String("RTSPURL", "", "The URL for an RTSP server.") - rtpRecvAddrPtr = flag.String("RTPRecvAddr", "", "The RTP address we would like to receive RTP from.") - rtcpAddrPtr = flag.String("RTCPAddr", "", "The address for RTCP communication.") inputCodecPtr = flag.String("InputCodec", "", "The codec of the input: H264, Mjpeg") rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp") quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)") @@ -229,10 +226,7 @@ func handleFlags() revid.Config { netsender.ConfigFile = *configFilePtr } - cfg.RTCPServerAddr = *rtcpServerAddrPtr cfg.RTSPURL = *rtspURLPtr - cfg.RTPRecvAddr = *rtpRecvAddrPtr - cfg.RTCPAddr = *rtcpAddrPtr cfg.Quantize = *quantizePtr cfg.Rotation = *rotationPtr cfg.FlipHorizontal = *horizontalFlipPtr diff --git a/protocol/rtsp/client.go b/protocol/rtsp/client.go index 77689b64..f6c9d0eb 100644 --- a/protocol/rtsp/client.go +++ b/protocol/rtsp/client.go @@ -43,20 +43,22 @@ type Client struct { sessionID string } -// NewClient returns a pointer to a new Client. The address addr will be parsed and -// a connection to the RTSP server will be made. -func NewClient(addr string) (*Client, error) { - c := &Client{addr: addr} - var err error +// NewClient returns a pointer to a new Client and the local address of the +// RTSP connection. The address addr will be parsed and a connection to the +// RTSP server will be made. +func NewClient(addr string) (c *Client, local, remote *net.TCPAddr, err error) { + c = &Client{addr: addr} c.url, err = url.Parse(addr) if err != nil { - return nil, err + return nil, nil,nil, err } c.conn, err = net.Dial("tcp", c.url.Host) if err != nil { - return nil, err + return nil, nil, nil, err } - return c, nil + local = c.conn.LocalAddr().(*net.TCPAddr) + remote = c.conn.RemoteAddr().(*net.TCPAddr) + return } // Close closes the RTSP connection. diff --git a/protocol/rtsp/rtsp_test.go b/protocol/rtsp/rtsp_test.go index 104f6b63..4157cfba 100644 --- a/protocol/rtsp/rtsp_test.go +++ b/protocol/rtsp/rtsp_test.go @@ -202,7 +202,7 @@ func TestMethods(t *testing.T) { // Keep trying to connect to server. // TODO: use generalised retry utility when available. for retry := 0; ; retry++ { - clt, err = NewClient(serverAddr) + clt, _, _, err = NewClient(serverAddr) if err == nil { break } diff --git a/revid/config.go b/revid/config.go index 441b7c6d..ab68e677 100644 --- a/revid/config.go +++ b/revid/config.go @@ -75,9 +75,6 @@ type Config struct { Exposure string AutoWhiteBalance string RTSPURL string - RTPRecvAddr string - RTCPAddr string - RTCPServerAddr string } // Possible modes for raspivid --exposure parameter. diff --git a/revid/revid.go b/revid/revid.go index d5eefaa4..1a3326ae 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -64,6 +64,12 @@ const ( rtmpConnectionTimeout = 10 ) +const ( + rtpPort = 60000 + rtcpPort = 60001 + defaultServerRTCPPort = 17301 +) + const pkg = "revid:" type Logger interface { @@ -614,7 +620,7 @@ func (r *Revid) setupInputForFile() (func() error, error) { } func (r *Revid) startRTSPCamera() (func() error, error) { - rtspClt, err := rtsp.NewClient(r.config.RTSPURL) + rtspClt, local, remote, err := rtsp.NewClient(r.config.RTSPURL) if err != nil { return nil, err } @@ -631,21 +637,30 @@ func (r *Revid) startRTSPCamera() (func() error, error) { } r.config.Logger.Log(logger.Info, pkg+"RTSP server DESCRIBE response", "response", resp.String()) - rtpPort, err := strconv.Atoi(strings.Split(r.config.RTPRecvAddr, ":")[1]) + resp, err = rtspClt.Setup("track1", fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort)) if err != nil { return nil, err } + transport := resp.Header.Get("Transport") + parts := strings.Split(transport, ";") + var serverPorts string + for _, part := range parts { + if strings.Contains(part, "server_port") { + serverPorts = part + break + } + } + var serverRTCPPort int + if serverPorts == "" { + r.config.Logger.Log(logger.Warning, pkg+"RTSP could not get server ports, defaulting") + serverRTCPPort = defaultServerRTCPPort + } else { + serverRTCPPort, err = strconv.Atoi(strings.Split(serverPorts, "-")[1]) + if err != nil { + serverRTCPPort = defaultServerRTCPPort + } + } - rtcpPort, err := strconv.Atoi(strings.Split(r.config.RTCPAddr, ":")[1]) - if err != nil { - return nil, err - } - - transport := fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort) - resp, err = rtspClt.Setup("track1", transport) - if err != nil { - return nil, err - } r.config.Logger.Log(logger.Info, pkg+"RTSP server SETUP response", "response", resp.String()) resp, err = rtspClt.Play() @@ -654,13 +669,15 @@ func (r *Revid) startRTSPCamera() (func() error, error) { } r.config.Logger.Log(logger.Info, pkg+"RTSP server PLAY response", "response", resp.String()) - rtpClt, err := rtp.NewClient(r.config.RTPRecvAddr) + rtpClt, err := rtp.NewClient(strings.Split(local.String(), ":")[0] + ":" + strconv.Itoa(rtpPort)) if err != nil { return nil, err } // TODO(saxon): use rtcp client to maintain rtp stream. - rtcpClt, err := rtcp.NewClient(r.config.RTCPAddr, r.config.RTCPServerAddr, rtpClt, r.config.Logger.Log) + rtcpCltAddr := strings.Split(local.String(), ":")[0] + ":" + strconv.Itoa(rtcpPort) + rtcpSvrAddr := strings.Split(remote.String(), ":")[0] + ":" + strconv.Itoa(serverRTCPPort) + rtcpClt, err := rtcp.NewClient(rtcpCltAddr, rtcpSvrAddr, rtpClt, r.config.Logger.Log) if err != nil { return nil, err } From 970a445ca4b721ed8e2eae6a31842e11202fd67b Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 20 May 2019 20:49:50 +0930 Subject: [PATCH 12/23] codec/h265/lexer.go & protocol/rtp/client.go: fixed lexer and rtp client. The lexer had a bug which is now fixed, and the RTP client is no longer looking for SSRC in rtp.NewClient (which means we miss a packet). --- codec/h265/lex.go | 7 ++++--- protocol/rtp/client.go | 17 ++++------------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/codec/h265/lex.go b/codec/h265/lex.go index 6e81b167..ebe34013 100644 --- a/codec/h265/lex.go +++ b/codec/h265/lex.go @@ -151,14 +151,15 @@ func (l *Lexer) handleFragmentation(d []byte) { start := d[2]&0x80 != 0 end := d[2]&0x40 != 0 - old := d + b1 := (d[0] & 0x81) | ((d[2] & 0x3f) << 1) + b2 := d[1] if start { d = d[1:] if l.donl { d = d[2:] } - d[0] = (old[0] & 0x81) | ((old[2] & 0x3f) << 1) - d[1] = old[1] + d[0] = b1 + d[1] = b2 } else { d = d[3:] if l.donl { diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index 53b4d98f..e8418b0d 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -28,7 +28,6 @@ LICENSE package rtp import ( - "fmt" "net" "sync" ) @@ -59,17 +58,6 @@ func NewClient(addr string) (*Client, error) { if err != nil { return nil, err } - - buf := make([]byte, 4096) - n, err := c.Read(buf) - if err != nil { - return nil, err - } - c.ssrc, err = SSRC(buf[:n]) - if err != nil { - return nil, fmt.Errorf("could not parse SSRC from RTP packet, failed with error: %v", err) - } - return c, nil } @@ -85,9 +73,12 @@ func (c *Client) Read(p []byte) (int, error) { if err != nil { return n, err } + if c.ssrc == 0 { + c.ssrc, _ = SSRC(p[:n]) + } s, _ := Sequence(p[:n]) c.setSequence(s) - return c.r.Read(p) + return n, err } // setSequence sets the most recently received sequence number, and updates the From 39a573e10be15a5ed67e2dcafa0d05a85ffc77fa Mon Sep 17 00:00:00 2001 From: Saxon Date: Tue, 21 May 2019 01:51:14 +0930 Subject: [PATCH 13/23] protocol/rtsp: removed cmd/record as revid can do what this did --- protocol/rtsp/cmd/record/main.go | 78 -------------------------------- 1 file changed, 78 deletions(-) delete mode 100644 protocol/rtsp/cmd/record/main.go diff --git a/protocol/rtsp/cmd/record/main.go b/protocol/rtsp/cmd/record/main.go deleted file mode 100644 index 536e0068..00000000 --- a/protocol/rtsp/cmd/record/main.go +++ /dev/null @@ -1,78 +0,0 @@ -/* -NAME - record - -DESCRIPTION - record provides a program to connect to an RTSP server, request for an - RTP stream and then save the RTP payload to file. - -AUTHORS - Saxon A. Nelson-Milton - -LICENSE - This is Copyright (C) 2019 the Australian Ocean Lab (AusOcean). - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - in gpl.txt. If not, see http://www.gnu.org/licenses. -*/ - -package main - -import ( - "flag" - "fmt" - "log" - - "bitbucket.org/ausocean/av/protocol/rtsp" -) - -func main() { - rtspServerPtr := flag.String("rtsp-server", "", "The RTSP server we would like to get video from.") - clientPortPtr := flag.Uint("port", 6870, "The port on the client we would like to receive RTP on.") - trackPtr := flag.String("track", "track1", "The track that we would like to receive media from.") - flag.Parse() - - clt, err := rtsp.NewClient(*rtspServerPtr) - if err != nil { - panic(fmt.Sprintf("creating RTSP client failed with error: %v", err)) - } - - resp, err := clt.Options() - if err != nil { - log.Fatalln(err) - } - fmt.Println("Options:") - fmt.Println(resp) - - resp, err = clt.Describe() - if err != nil { - log.Fatalln(err) - } - fmt.Println("Describe:") - fmt.Println(resp) - - resp, err = clt.Setup(*trackPtr, fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", *clientPortPtr, *clientPortPtr+1)) - if err != nil { - log.Fatalln(err) - } - log.Println(resp) - - resp, err = clt.Play() - if err != nil { - log.Fatalln(err) - } - log.Println(resp) - - // TODO(saxon): use RTCP client here to maintain stream. - select {} -} From 88ffdf08b58e11c57a8f94eb417e5a147e09c984 Mon Sep 17 00:00:00 2001 From: Saxon Date: Tue, 21 May 2019 12:18:52 +0930 Subject: [PATCH 14/23] revid: changed the input enum for RTSP input from RTSPCamera to RTSP --- cmd/revid-cli/main.go | 6 +++--- revid/config.go | 4 ++-- revid/revid.go | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 5c10e51f..d6523ca1 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -105,7 +105,7 @@ func handleFlags() revid.Config { var ( cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`") - inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam, RTSPCamera") + inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam, RTSP") rtspURLPtr = flag.String("RTSPURL", "", "The URL for an RTSP server.") inputCodecPtr = flag.String("InputCodec", "", "The codec of the input: H264, Mjpeg") rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp") @@ -181,8 +181,8 @@ func handleFlags() revid.Config { cfg.Input = revid.V4L case "File": cfg.Input = revid.File - case "RTSPCamera": - cfg.Input = revid.RTSPCamera + case "RTSP": + cfg.Input = revid.RTSP case "": default: log.Log(logger.Error, pkg+"bad input argument") diff --git a/revid/config.go b/revid/config.go index ab68e677..990182c9 100644 --- a/revid/config.go +++ b/revid/config.go @@ -131,7 +131,7 @@ const ( Udp MpegtsRtp Rtp - RTSPCamera + RTSP ) // Default config settings @@ -174,7 +174,7 @@ func (c *Config) Validate(r *Revid) error { } switch c.Input { - case Raspivid, V4L, File, RTSPCamera: + case Raspivid, V4L, File, RTSP: case NothingDefined: c.Logger.Log(logger.Info, pkg+"no input type defined, defaulting", "input", defaultInput) c.Input = defaultInput diff --git a/revid/revid.go b/revid/revid.go index 88a33b7e..6d8f219a 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -178,7 +178,7 @@ func (r *Revid) reset(config Config) error { switch r.config.Input { case Raspivid, File, V4L: st = mts.H264 - case RTSPCamera: + case RTSP: st = mts.H265 } e := mts.NewEncoder(dst, float64(fps), st) @@ -287,7 +287,7 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int) case File: r.setupInput = r.setupInputForFile r.lexTo = h264.Lex - case RTSPCamera: + case RTSP: r.setupInput = r.startRTSPCamera r.lexTo = h265.NewLexer(false).Lex } From eeaf806c6efc2517c8f2541650c61d6f86fda32a Mon Sep 17 00:00:00 2001 From: Saxon Date: Tue, 21 May 2019 13:19:09 +0930 Subject: [PATCH 15/23] protocol/rtcp/client.go & revid/revid.go: removed rtcp.Client.Done() and now blocking on rtcp.Client.Err() in revid. It seems unnecessary to have the rtcp.Client.Done() func, considering that we could use the rtcp.Client.err channel itself to determine if the RTCP client has been stopped. We simple wait on a chan receive in revid in the error handling routine, and we check the 'ok' return - if it is false, then the err chan has been closed and we can get out of the error handling loop. This should also reduce CPU usage significantly. --- protocol/rtcp/client.go | 10 ++-------- revid/revid.go | 9 ++++----- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/protocol/rtcp/client.go b/protocol/rtcp/client.go index 80d65511..6accecb8 100644 --- a/protocol/rtcp/client.go +++ b/protocol/rtcp/client.go @@ -80,7 +80,6 @@ type Client struct { func NewClient(clientAddress, serverAddress string, rtpClt *rtp.Client, l Log) (*Client, error) { c := &Client{ name: defaultClientName, - err: make(chan error), quit: make(chan struct{}), interval: defaultSendInterval, rtpClt: rtpClt, @@ -121,6 +120,7 @@ func (c *Client) SetName(name string) { // reports to the server. func (c *Client) Start() { c.log(logger.Debug, pkg+"Client is starting") + c.err = make(chan error) c.wg.Add(2) go c.recv() go c.send() @@ -132,16 +132,10 @@ func (c *Client) Stop() { c.log(logger.Debug, pkg+"Client is stopping") close(c.quit) c.conn.Close() + close(c.err) c.wg.Wait() } -// Done gives access to the clients quit chan, which is closed when the RTCP -// client is stopped. Therefore, Done may be used to check when the RTCP client -// has stopped running - ideal for use in a routine checking Client.Err(). -func (c *Client) Done() <-chan struct{} { - return c.quit -} - // Err provides read access to the Client err channel. This must be checked // otherwise the client will block if an error encountered. func (c *Client) Err() <-chan error { diff --git a/revid/revid.go b/revid/revid.go index 6d8f219a..97d96714 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -667,12 +667,11 @@ func (r *Revid) startRTSPCamera() (func() error, error) { // Check errors from RTCP client until it has stopped running. go func() { for { - select { - case <-rtcpClt.Done(): - return - case err := <-rtcpClt.Err(): + err, ok := <-rtcpClt.Err() + if ok { r.config.Logger.Log(logger.Warning, pkg+"RTCP error", "error", err.Error()) - default: + } else { + return } } }() From d29141cf05204a86c35a6992b3b81c3b9c423909 Mon Sep 17 00:00:00 2001 From: Saxon Date: Tue, 21 May 2019 16:57:17 +0930 Subject: [PATCH 16/23] container/mts: changed consts H264, H265 and Audio to EncodeH264, EncodeH265 and EncodeAudio --- container/mts/encoder.go | 12 ++++++------ container/mts/encoder_test.go | 4 ++-- container/mts/metaEncode_test.go | 4 ++-- revid/revid.go | 4 ++-- revid/senders_test.go | 6 +++--- 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/container/mts/encoder.go b/container/mts/encoder.go index 0302c73a..1092b659 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -87,9 +87,9 @@ const ( // Video and Audio constants are used to communicate which media type will be encoded when creating a // new encoder with NewEncoder. const ( - H264 = iota - H265 - Audio + EncodeH264 = iota + EncodeH265 + EncodeAudio ) // Time-related constants. @@ -133,13 +133,13 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { var mPid int var sid byte switch mediaType { - case Audio: + case EncodeAudio: mPid = audioPid sid = audioStreamID - case H265: + case EncodeH265: mPid = videoPid sid = H265ID - case H264: + case EncodeH264: mPid = videoPid sid = H264ID } diff --git a/container/mts/encoder_test.go b/container/mts/encoder_test.go index 188c615a..24fb823d 100644 --- a/container/mts/encoder_test.go +++ b/container/mts/encoder_test.go @@ -99,7 +99,7 @@ func TestEncodeVideo(t *testing.T) { // Create the dst and write the test data to encoder. dst := &destination{} - _, err := NewEncoder(nopCloser{dst}, 25, H264).Write(data) + _, err := NewEncoder(nopCloser{dst}, 25, EncodeH264).Write(data) if err != nil { t.Fatalf("could not write data to encoder, failed with err: %v\n", err) } @@ -158,7 +158,7 @@ func TestEncodePcm(t *testing.T) { sampleSize := 2 blockSize := 16000 writeFreq := float64(sampleRate*sampleSize) / float64(blockSize) - e := NewEncoder(nopCloser{&buf}, writeFreq, Audio) + e := NewEncoder(nopCloser{&buf}, writeFreq, EncodeAudio) inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm" inPcm, err := ioutil.ReadFile(inPath) diff --git a/container/mts/metaEncode_test.go b/container/mts/metaEncode_test.go index 75d5f6e1..83660777 100644 --- a/container/mts/metaEncode_test.go +++ b/container/mts/metaEncode_test.go @@ -48,7 +48,7 @@ const fps = 25 func TestMetaEncode1(t *testing.T) { Meta = meta.New() var buf bytes.Buffer - e := NewEncoder(nopCloser{&buf}, fps, H264) + e := NewEncoder(nopCloser{&buf}, fps, EncodeH264) Meta.Add("ts", "12345678") if err := e.writePSI(); err != nil { t.Errorf(errUnexpectedErr, err.Error()) @@ -76,7 +76,7 @@ func TestMetaEncode1(t *testing.T) { func TestMetaEncode2(t *testing.T) { Meta = meta.New() var buf bytes.Buffer - e := NewEncoder(nopCloser{&buf}, fps, H264) + e := NewEncoder(nopCloser{&buf}, fps, EncodeH264) Meta.Add("ts", "12345678") Meta.Add("loc", "1234,4321,1234") if err := e.writePSI(); err != nil { diff --git a/revid/revid.go b/revid/revid.go index 97d96714..fa30d5a8 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -177,9 +177,9 @@ func (r *Revid) reset(config Config) error { var st int switch r.config.Input { case Raspivid, File, V4L: - st = mts.H264 + st = mts.EncodeH264 case RTSP: - st = mts.H265 + st = mts.EncodeH265 } e := mts.NewEncoder(dst, float64(fps), st) return e, nil diff --git a/revid/senders_test.go b/revid/senders_test.go index c350e369..b7f67959 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -134,7 +134,7 @@ func TestMtsSenderSegment(t *testing.T) { const numberOfClips = 11 dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0) - encoder := mts.NewEncoder(sender, 25, mts.H264) + encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) // Turn time based PSI writing off for encoder. const psiSendCount = 10 @@ -212,7 +212,7 @@ func TestMtsSenderFailedSend(t *testing.T) { const clipToFailAt = 3 dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0) - encoder := mts.NewEncoder(sender, 25, mts.H264) + encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) // Turn time based PSI writing off for encoder and send PSI every 10 packets. const psiSendCount = 10 @@ -292,7 +292,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) { const clipToDelay = 3 dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, rbElementSize, 0) - encoder := mts.NewEncoder(sender, 25, mts.H264) + encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) // Turn time based PSI writing off for encoder. const psiSendCount = 10 From fbcd1638643d52e14dde984a9a4aeef234c4e718 Mon Sep 17 00:00:00 2001 From: Saxon Date: Tue, 21 May 2019 17:07:28 +0930 Subject: [PATCH 17/23] protocol/rtcp: fixed bug regarding checking of close err channel from client in routines. --- protocol/rtcp/client.go | 2 +- protocol/rtcp/client_test.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/protocol/rtcp/client.go b/protocol/rtcp/client.go index 6accecb8..87194405 100644 --- a/protocol/rtcp/client.go +++ b/protocol/rtcp/client.go @@ -132,8 +132,8 @@ func (c *Client) Stop() { c.log(logger.Debug, pkg+"Client is stopping") close(c.quit) c.conn.Close() - close(c.err) c.wg.Wait() + close(c.err) } // Err provides read access to the Client err channel. This must be checked diff --git a/protocol/rtcp/client_test.go b/protocol/rtcp/client_test.go index 111bcbbf..6c95c75d 100644 --- a/protocol/rtcp/client_test.go +++ b/protocol/rtcp/client_test.go @@ -166,14 +166,14 @@ func TestReceiveAndSend(t *testing.T) { go func() { for { - select { - case err := <-c.Err(): + err, ok := <-c.Err() + if ok { const errConnClosed = "use of closed network connection" if !strings.Contains(err.Error(), errConnClosed) { t.Fatalf("error received from client error chan: %v\n", err) } - - default: + } else { + return } } }() From 85984555a3eeb897b009a2ef8ce2eeb321d6a4c7 Mon Sep 17 00:00:00 2001 From: Saxon Date: Tue, 21 May 2019 17:09:47 +0930 Subject: [PATCH 18/23] container/mts/encoder.go: fixed comment for pmtTable initialisation --- container/mts/encoder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/container/mts/encoder.go b/container/mts/encoder.go index 1092b659..a5594581 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -144,7 +144,7 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { sid = H264ID } - // standardPmt is a minimal PMT, without descriptors for time and location. + // standardPmt is a minimal PMT, without descriptors for metadata. pmtTable = (&psi.PSI{ Pf: 0x00, Tid: 0x02, From ecc0ab866474d87aa66c41cfe90f686261646222 Mon Sep 17 00:00:00 2001 From: Saxon Date: Thu, 23 May 2019 14:05:17 +0930 Subject: [PATCH 19/23] container/mts/encoder.go: updated comment for Encode constants --- container/mts/encoder.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/container/mts/encoder.go b/container/mts/encoder.go index a5594581..a1598dd8 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -84,8 +84,7 @@ const ( audioStreamID = 0xc0 // First audio stream ID. ) -// Video and Audio constants are used to communicate which media type will be encoded when creating a -// new encoder with NewEncoder. +// Constants used to communicate which media codec will be packetized. const ( EncodeH264 = iota EncodeH265 From d7d205a7a990e3f2d7a8da96d8345cbb03f923c8 Mon Sep 17 00:00:00 2001 From: Saxon Date: Thu, 23 May 2019 14:07:19 +0930 Subject: [PATCH 20/23] protocol/rtcp/parse.go: updated comment for ParseTimestamp(...) --- protocol/rtcp/parse.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/protocol/rtcp/parse.go b/protocol/rtcp/parse.go index be680fa0..458be546 100644 --- a/protocol/rtcp/parse.go +++ b/protocol/rtcp/parse.go @@ -38,9 +38,9 @@ type Timestamp struct { Fraction uint32 } -// Timestamp gets the timestamp from a receiver report and returns it as the most -// significant word, and the least significant word. If the given bytes do not -// represent a valid receiver report, an error is returned. +// ParseTimestamp gets the timestamp from a receiver report and returns it as +// a Timestamp as defined above. If the given bytes do not represent a valid +// receiver report, an error is returned. func ParseTimestamp(buf []byte) (Timestamp, error) { err := checkPacket(buf) if err != nil { From 809d904878a3616dd0c35881881181cb0808e560 Mon Sep 17 00:00:00 2001 From: Saxon Date: Thu, 23 May 2019 14:10:02 +0930 Subject: [PATCH 21/23] revid/revid.go: commented startRTSPCamera --- revid/revid.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/revid/revid.go b/revid/revid.go index fa30d5a8..6ba89638 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -620,6 +620,9 @@ func (r *Revid) setupInputForFile() (func() error, error) { return func() error { return f.Close() }, nil } +// startRTSPCamera uses RTSP to request an RTP stream from an IP camera. An RTP +// client is created from which RTP packets containing either h264/h265 can read +// by the selected lexer. func (r *Revid) startRTSPCamera() (func() error, error) { rtspClt, local, remote, err := rtsp.NewClient(r.config.RTSPURL) if err != nil { From ad241abdfd4342c1fd229ae95835c70713325852 Mon Sep 17 00:00:00 2001 From: Saxon Date: Fri, 24 May 2019 10:14:37 +0930 Subject: [PATCH 22/23] protocol/rtcp/client.go: fixed missing double quote in comment for Client.SetName() --- protocol/rtcp/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/rtcp/client.go b/protocol/rtcp/client.go index 87194405..4ac5b694 100644 --- a/protocol/rtcp/client.go +++ b/protocol/rtcp/client.go @@ -110,7 +110,7 @@ func (c *Client) SetSendInterval(d time.Duration) { } // SetName sets a custom client name for use in receiver report source description. -// Default is Client". +// Default is "Client". func (c *Client) SetName(name string) { c.name = name } From 03c45b1bcfac90ac04ef7c373dfd93e3e202d58a Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 27 May 2019 14:31:14 +0930 Subject: [PATCH 23/23] protocol/rtcp/parse.go: removed ParseSSRC and checkPacket functions as not required anymore --- protocol/rtcp/parse.go | 34 +++++++++------------------------- 1 file changed, 9 insertions(+), 25 deletions(-) diff --git a/protocol/rtcp/parse.go b/protocol/rtcp/parse.go index 458be546..2007a26d 100644 --- a/protocol/rtcp/parse.go +++ b/protocol/rtcp/parse.go @@ -42,9 +42,15 @@ type Timestamp struct { // a Timestamp as defined above. If the given bytes do not represent a valid // receiver report, an error is returned. func ParseTimestamp(buf []byte) (Timestamp, error) { - err := checkPacket(buf) - if err != nil { - return Timestamp{}, err + if len(buf) < 4 { + return Timestamp{}, errors.New("bad RTCP packet, not of sufficient length") + } + if (buf[0]&0xc0)>>6 != rtcpVer { + return Timestamp{}, errors.New("incompatible RTCP version") + } + + if buf[1] != typeSenderReport { + return Timestamp{}, errors.New("RTCP packet is not of sender report type") } return Timestamp{ @@ -52,25 +58,3 @@ func ParseTimestamp(buf []byte) (Timestamp, error) { Fraction: binary.BigEndian.Uint32(buf[12:]), }, nil } - -func ParseSSRC(buf []byte) (uint32, error) { - err := checkPacket(buf) - if err != nil { - return 0, err - } - return binary.BigEndian.Uint32(buf[4:]), nil -} - -func checkPacket(buf []byte) error { - if len(buf) < 4 { - return errors.New("bad RTCP packet, not of sufficient length") - } - if (buf[0]&0xc0)>>6 != rtcpVer { - return errors.New("incompatible RTCP version") - } - - if buf[1] != typeSenderReport { - return errors.New("RTCP packet is not of sender report type") - } - return nil -}