diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index fc90dbcc..d6523ca1 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -105,7 +105,8 @@ func handleFlags() revid.Config { var ( cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`") - inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam") + inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam, RTSP") + rtspURLPtr = flag.String("RTSPURL", "", "The URL for an RTSP server.") inputCodecPtr = flag.String("InputCodec", "", "The codec of the input: H264, Mjpeg") rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp") quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)") @@ -180,6 +181,8 @@ func handleFlags() revid.Config { cfg.Input = revid.V4L case "File": cfg.Input = revid.File + case "RTSP": + cfg.Input = revid.RTSP case "": default: log.Log(logger.Error, pkg+"bad input argument") @@ -223,6 +226,7 @@ func handleFlags() revid.Config { netsender.ConfigFile = *configFilePtr } + cfg.RTSPURL = *rtspURLPtr cfg.Quantize = *quantizePtr cfg.Rotation = *rotationPtr cfg.FlipHorizontal = *horizontalFlipPtr diff --git a/codec/h265/lex.go b/codec/h265/lex.go index fcf2e59d..ebe34013 100644 --- a/codec/h265/lex.go +++ b/codec/h265/lex.go @@ -63,7 +63,8 @@ type Lexer struct { func NewLexer(donl bool) *Lexer { return &Lexer{ donl: donl, - buf: bytes.NewBuffer(make([]byte, 0, maxAUSize))} + buf: bytes.NewBuffer(make([]byte, 0, maxAUSize)), + } } // Lex continually reads RTP packets from the io.Reader src and lexes into @@ -86,7 +87,6 @@ func (l *Lexer) Lex(dst io.Writer, src io.Reader, delay time.Duration) error { if err != nil { return fmt.Errorf("could not get rtp payload, failed with err: %v\n", err) } - nalType := (payload[0] >> 1) & 0x3f // If not currently fragmented then we ignore current write. @@ -151,9 +151,20 @@ func (l *Lexer) handleFragmentation(d []byte) { start := d[2]&0x80 != 0 end := d[2]&0x40 != 0 - d = d[3:] - if l.donl { - d = d[2:] + b1 := (d[0] & 0x81) | ((d[2] & 0x3f) << 1) + b2 := d[1] + if start { + d = d[1:] + if l.donl { + d = d[2:] + } + d[0] = b1 + d[1] = b2 + } else { + d = d[3:] + if l.donl { + d = d[2:] + } } switch { diff --git a/codec/h265/lex_test.go b/codec/h265/lex_test.go index 637a1860..1a409e4c 100644 --- a/codec/h265/lex_test.go +++ b/codec/h265/lex_test.go @@ -129,7 +129,7 @@ func TestLex(t *testing.T) { 0x01, 0x02, 0x03, 0x04, // NAL data. // NAL 2 0x00, 0x00, 0x00, 0x01, // Start code. - 0x01, 0x02, 0x03, // FU payload. + 0x00, 0x00, 0x01, 0x02, 0x03, // FU payload. 0x04, 0x05, 0x06, // FU payload. 0x07, 0x08, 0x09, // FU payload. // NAL 3 @@ -216,7 +216,7 @@ func TestLex(t *testing.T) { 0x01, 0x02, 0x03, 0x04, // NAL data. // NAL 2 0x00, 0x00, 0x00, 0x01, // Start code. - 0x01, 0x02, 0x03, // FU payload. + 0x00, 0x00, 0x01, 0x02, 0x03, // FU payload. 0x04, 0x05, 0x06, // FU payload. 0x07, 0x08, 0x09, // FU payload. // NAL 3 diff --git a/container/mts/encoder.go b/container/mts/encoder.go index 3208276b..a1598dd8 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -55,30 +55,6 @@ var ( }, }, } - - // standardPmt is a minimal PMT, without descriptors for time and location. - standardPmt = psi.PSI{ - Pf: 0x00, - Tid: 0x02, - Ssi: true, - Sl: 0x12, - Tss: &psi.TSS{ - Tide: 0x01, - V: 0, - Cni: true, - Sn: 0, - Lsn: 0, - Sd: &psi.PMT{ - Pcrpid: 0x0100, - Pil: 0, - Essd: &psi.ESSD{ - St: 0x1b, - Epid: 0x0100, - Esil: 0x00, - }, - }, - }, - } ) const ( @@ -94,7 +70,7 @@ var Meta *meta.Data var ( patTable = standardPat.Bytes() - pmtTable = standardPmt.Bytes() + pmtTable []byte ) const ( @@ -103,15 +79,16 @@ const ( pmtPid = 4096 videoPid = 256 audioPid = 210 - videoStreamID = 0xe0 // First video stream ID. + H264ID = 27 + H265ID = 36 audioStreamID = 0xc0 // First audio stream ID. ) -// Video and Audio constants are used to communicate which media type will be encoded when creating a -// new encoder with NewEncoder. +// Constants used to communicate which media codec will be packetized. const ( - Video = iota - Audio + EncodeH264 = iota + EncodeH265 + EncodeAudio ) // Time-related constants. @@ -155,14 +132,41 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { var mPid int var sid byte switch mediaType { - case Audio: + case EncodeAudio: mPid = audioPid sid = audioStreamID - case Video: + case EncodeH265: mPid = videoPid - sid = videoStreamID + sid = H265ID + case EncodeH264: + mPid = videoPid + sid = H264ID } + // standardPmt is a minimal PMT, without descriptors for metadata. + pmtTable = (&psi.PSI{ + Pf: 0x00, + Tid: 0x02, + Ssi: true, + Sl: 0x12, + Tss: &psi.TSS{ + Tide: 0x01, + V: 0, + Cni: true, + Sn: 0, + Lsn: 0, + Sd: &psi.PMT{ + Pcrpid: 0x0100, + Pil: 0, + Essd: &psi.ESSD{ + St: byte(sid), + Epid: 0x0100, + Esil: 0x00, + }, + }, + }, + }).Bytes() + return &Encoder{ dst: dst, diff --git a/container/mts/encoder_test.go b/container/mts/encoder_test.go index 8436d241..24fb823d 100644 --- a/container/mts/encoder_test.go +++ b/container/mts/encoder_test.go @@ -99,7 +99,7 @@ func TestEncodeVideo(t *testing.T) { // Create the dst and write the test data to encoder. dst := &destination{} - _, err := NewEncoder(nopCloser{dst}, 25, Video).Write(data) + _, err := NewEncoder(nopCloser{dst}, 25, EncodeH264).Write(data) if err != nil { t.Fatalf("could not write data to encoder, failed with err: %v\n", err) } @@ -158,7 +158,7 @@ func TestEncodePcm(t *testing.T) { sampleSize := 2 blockSize := 16000 writeFreq := float64(sampleRate*sampleSize) / float64(blockSize) - e := NewEncoder(nopCloser{&buf}, writeFreq, Audio) + e := NewEncoder(nopCloser{&buf}, writeFreq, EncodeAudio) inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm" inPcm, err := ioutil.ReadFile(inPath) diff --git a/container/mts/metaEncode_test.go b/container/mts/metaEncode_test.go index 939de5b7..83660777 100644 --- a/container/mts/metaEncode_test.go +++ b/container/mts/metaEncode_test.go @@ -48,7 +48,7 @@ const fps = 25 func TestMetaEncode1(t *testing.T) { Meta = meta.New() var buf bytes.Buffer - e := NewEncoder(nopCloser{&buf}, fps, Video) + e := NewEncoder(nopCloser{&buf}, fps, EncodeH264) Meta.Add("ts", "12345678") if err := e.writePSI(); err != nil { t.Errorf(errUnexpectedErr, err.Error()) @@ -76,7 +76,7 @@ func TestMetaEncode1(t *testing.T) { func TestMetaEncode2(t *testing.T) { Meta = meta.New() var buf bytes.Buffer - e := NewEncoder(nopCloser{&buf}, fps, Video) + e := NewEncoder(nopCloser{&buf}, fps, EncodeH264) Meta.Add("ts", "12345678") Meta.Add("loc", "1234,4321,1234") if err := e.writePSI(); err != nil { diff --git a/container/mts/mpegts_test.go b/container/mts/mpegts_test.go index 650fecab..4c90cc0e 100644 --- a/container/mts/mpegts_test.go +++ b/container/mts/mpegts_test.go @@ -126,7 +126,7 @@ func writePSI(b *bytes.Buffer) error { func writeFrame(b *bytes.Buffer, frame []byte, pts uint64) error { // Prepare PES data. pesPkt := pes.Packet{ - StreamID: videoStreamID, + StreamID: H264ID, PDI: hasPTS, PTS: pts, Data: frame, diff --git a/protocol/rtcp/client.go b/protocol/rtcp/client.go index 7d6c995c..4ac5b694 100644 --- a/protocol/rtcp/client.go +++ b/protocol/rtcp/client.go @@ -37,16 +37,18 @@ import ( "sync" "time" + "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/utils/logger" ) const ( - senderSSRC = 1 // Any non-zero value will do. - defaultClientName = "Client" - delayUnit = 1.0 / 65536.0 - pkg = "rtcp: " - rtcpVer = 2 - receiverBufSize = 200 + clientSSRC = 1 // Any non-zero value will do. + defaultClientName = "Client" + defaultSendInterval = 2 * time.Second + delayUnit = 1.0 / 65536.0 + pkg = "rtcp: " + rtcpVer = 2 + receiverBufSize = 200 ) // Log describes a function signature required by the RTCP for the purpose of @@ -70,23 +72,18 @@ type Client struct { wg sync.WaitGroup // This is used to wait for send and recv routines to stop when Client is stopped. quit chan struct{} // Channel used to communicate quit signal to send and recv routines. log Log // Used to log any messages. - - err chan error // Client will send any errors through this chan. Can be accessed by Err(). + rtpClt *rtp.Client + err chan error // Client will send any errors through this chan. Can be accessed by Err(). } // NewClient returns a pointer to a new Client. -func NewClient(clientAddress, serverAddress, name string, sendInterval time.Duration, rtpSSRC uint32, l Log) (*Client, error) { - if name == "" { - name = defaultClientName - } - +func NewClient(clientAddress, serverAddress string, rtpClt *rtp.Client, l Log) (*Client, error) { c := &Client{ - name: name, - err: make(chan error), - quit: make(chan struct{}), - interval: sendInterval, - sourceSSRC: rtpSSRC, - log: l, + name: defaultClientName, + quit: make(chan struct{}), + interval: defaultSendInterval, + rtpClt: rtpClt, + log: l, } var err error @@ -107,11 +104,23 @@ func NewClient(clientAddress, serverAddress, name string, sendInterval time.Dura return c, nil } +// SetSendInterval sets a custom receiver report send interval (default is 5 seconds.) +func (c *Client) SetSendInterval(d time.Duration) { + c.interval = d +} + +// SetName sets a custom client name for use in receiver report source description. +// Default is "Client". +func (c *Client) SetName(name string) { + c.name = name +} + // Start starts the listen and send routines. This will start the process of // receiving and parsing sender reports, and the process of sending receiver // reports to the server. func (c *Client) Start() { c.log(logger.Debug, pkg+"Client is starting") + c.err = make(chan error) c.wg.Add(2) go c.recv() go c.send() @@ -124,6 +133,7 @@ func (c *Client) Stop() { close(c.quit) c.conn.Close() c.wg.Wait() + close(c.err) } // Err provides read access to the Client err channel. This must be checked @@ -171,13 +181,13 @@ func (c *Client) send() { ReportCount: 1, Type: typeReceiverReport, }, - SenderSSRC: senderSSRC, + SenderSSRC: clientSSRC, Blocks: []ReportBlock{ ReportBlock{ - SourceIdentifier: c.sourceSSRC, + SourceIdentifier: c.rtpClt.SSRC(), FractionLost: 0, PacketsLost: math.MaxUint32, - HighestSequence: c.sequence(), + HighestSequence: uint32((c.rtpClt.Cycles() << 16) | c.rtpClt.Sequence()), Jitter: c.jitter(), SenderReportTs: c.lastSenderTs(), SenderReportDelay: c.delay(), @@ -195,7 +205,7 @@ func (c *Client) send() { }, Chunks: []Chunk{ Chunk{ - SSRC: senderSSRC, + SSRC: clientSSRC, Items: []SDESItem{ SDESItem{ Type: typeCName, @@ -238,22 +248,6 @@ func (c *Client) parse(buf []byte) { c.setSenderTs(t) } -// SetSequence will allow updating of the highest sequence number received -// through an RTP stream. -func (c *Client) SetSequence(s uint32) { - c.mu.Lock() - c.seq = s - c.mu.Unlock() -} - -// sequence will return the highest sequence number received through RTP. -func (c *Client) sequence() uint32 { - c.mu.Lock() - s := c.seq - c.mu.Unlock() - return s -} - // jitter returns the interarrival jitter as described by RTCP specifications: // https://tools.ietf.org/html/rfc3550 // TODO(saxon): complete this. diff --git a/protocol/rtcp/client_test.go b/protocol/rtcp/client_test.go index 64a4d685..6c95c75d 100644 --- a/protocol/rtcp/client_test.go +++ b/protocol/rtcp/client_test.go @@ -37,6 +37,7 @@ import ( "testing" "time" + "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/utils/logger" ) @@ -148,12 +149,15 @@ func (dl *dummyLogger) log(lvl int8, msg string, args ...interface{}) { // respond with sender reports. func TestReceiveAndSend(t *testing.T) { const clientAddr, serverAddr = "localhost:8000", "localhost:8001" + rtpClt, err := rtp.NewClient("localhost:8002") + if err != nil { + t.Fatalf("unexpected error when creating RTP client: %v", err) + } + c, err := NewClient( clientAddr, serverAddr, - "testClient", - 10*time.Millisecond, - 12345, + rtpClt, (*dummyLogger)(t).log, ) if err != nil { @@ -162,14 +166,14 @@ func TestReceiveAndSend(t *testing.T) { go func() { for { - select { - case err := <-c.Err(): + err, ok := <-c.Err() + if ok { const errConnClosed = "use of closed network connection" if !strings.Contains(err.Error(), errConnClosed) { t.Fatalf("error received from client error chan: %v\n", err) } - - default: + } else { + return } } }() @@ -197,8 +201,6 @@ func TestReceiveAndSend(t *testing.T) { n, _, _ := conn.ReadFromUDP(buf) t.Logf("SERVER: receiver report received: \n%v\n", buf[:n]) - c.SetSequence(uint32(i)) - now := time.Now().Second() var time [8]byte binary.BigEndian.PutUint64(time[:], uint64(now)) diff --git a/protocol/rtcp/parse.go b/protocol/rtcp/parse.go index 2f756f4b..2007a26d 100644 --- a/protocol/rtcp/parse.go +++ b/protocol/rtcp/parse.go @@ -38,9 +38,9 @@ type Timestamp struct { Fraction uint32 } -// Timestamp gets the timestamp from a receiver report and returns it as the most -// significant word, and the least significant word. If the given bytes do not -// represent a valid receiver report, an error is returned. +// ParseTimestamp gets the timestamp from a receiver report and returns it as +// a Timestamp as defined above. If the given bytes do not represent a valid +// receiver report, an error is returned. func ParseTimestamp(buf []byte) (Timestamp, error) { if len(buf) < 4 { return Timestamp{}, errors.New("bad RTCP packet, not of sufficient length") diff --git a/protocol/rtp/client.go b/protocol/rtp/client.go index 3ab856b9..e8418b0d 100644 --- a/protocol/rtp/client.go +++ b/protocol/rtp/client.go @@ -29,12 +29,17 @@ package rtp import ( "net" + "sync" ) // Client describes an RTP client that can receive an RTP stream and implements // io.Reader. type Client struct { - r *PacketReader + r *PacketReader + ssrc uint32 + mu sync.Mutex + sequence uint16 + cycles uint16 } // NewClient returns a pointer to a new Client. @@ -53,13 +58,52 @@ func NewClient(addr string) (*Client, error) { if err != nil { return nil, err } - return c, nil } +// SSRC returns the identified for the source from which the RTP packets being +// received are coming from. +func (c *Client) SSRC() uint32 { + return c.ssrc +} + // Read implements io.Reader. func (c *Client) Read(p []byte) (int, error) { - return c.r.Read(p) + n, err := c.r.Read(p) + if err != nil { + return n, err + } + if c.ssrc == 0 { + c.ssrc, _ = SSRC(p[:n]) + } + s, _ := Sequence(p[:n]) + c.setSequence(s) + return n, err +} + +// setSequence sets the most recently received sequence number, and updates the +// cycles count if the sequence number has rolled over. +func (c *Client) setSequence(s uint16) { + c.mu.Lock() + if s < c.sequence { + c.cycles++ + } + c.sequence = s + c.mu.Unlock() +} + +// Sequence returns the most recent RTP packet sequence number received. +func (c *Client) Sequence() uint16 { + c.mu.Lock() + defer c.mu.Unlock() + return c.sequence +} + +// Cycles returns the number of RTP sequence number cycles that have been received. +func (c *Client) Cycles() uint16 { + c.mu.Lock() + defer c.mu.Unlock() + return c.cycles } // PacketReader provides an io.Reader interface to an underlying UDP PacketConn. diff --git a/protocol/rtp/parse.go b/protocol/rtp/parse.go index fcc05907..16e64c5d 100644 --- a/protocol/rtp/parse.go +++ b/protocol/rtp/parse.go @@ -50,11 +50,9 @@ func Marker(d []byte) (bool, error) { // Payload returns the payload from an RTP packet provided the version is // compatible, otherwise an error is returned. func Payload(d []byte) ([]byte, error) { - if len(d) < defaultHeadSize { - panic("invalid RTP packet length") - } - if version(d) != rtpVer { - return nil, errors.New(badVer) + err := checkPacket(d) + if err != nil { + return nil, err } extLen := 0 if hasExt(d) { @@ -64,6 +62,38 @@ func Payload(d []byte) ([]byte, error) { return d[payloadIdx:], nil } +// SSRC returns the source identifier from an RTP packet. An error is return if +// the packet is not valid. +func SSRC(d []byte) (uint32, error) { + err := checkPacket(d) + if err != nil { + return 0, err + } + return binary.BigEndian.Uint32(d[8:]), nil +} + +// Sequence returns the sequence number of an RTP packet. An error is returned +// if the packet is not valid. +func Sequence(d []byte) (uint16, error) { + err := checkPacket(d) + if err != nil { + return 0, err + } + return binary.BigEndian.Uint16(d[2:]), nil +} + +// checkPacket checks the validity of the packet, firstly by checking size and +// then also checking that version is compatible with these utilities. +func checkPacket(d []byte) error { + if len(d) < defaultHeadSize { + return errors.New("invalid RTP packet length") + } + if version(d) != rtpVer { + return errors.New(badVer) + } + return nil +} + // hasExt returns true if an extension is present in the RTP packet. func hasExt(d []byte) bool { return (d[0] & 0x10 >> 4) == 1 diff --git a/protocol/rtsp/client.go b/protocol/rtsp/client.go index 26e539db..f6c9d0eb 100644 --- a/protocol/rtsp/client.go +++ b/protocol/rtsp/client.go @@ -43,20 +43,27 @@ type Client struct { sessionID string } -// NewClient returns a pointer to a new Client. The address addr will be parsed and -// a connection to the RTSP server will be made. -func NewClient(addr string) (*Client, error) { - c := &Client{addr: addr} - var err error +// NewClient returns a pointer to a new Client and the local address of the +// RTSP connection. The address addr will be parsed and a connection to the +// RTSP server will be made. +func NewClient(addr string) (c *Client, local, remote *net.TCPAddr, err error) { + c = &Client{addr: addr} c.url, err = url.Parse(addr) if err != nil { - return nil, err + return nil, nil,nil, err } c.conn, err = net.Dial("tcp", c.url.Host) if err != nil { - return nil, err + return nil, nil, nil, err } - return c, nil + local = c.conn.LocalAddr().(*net.TCPAddr) + remote = c.conn.RemoteAddr().(*net.TCPAddr) + return +} + +// Close closes the RTSP connection. +func (c *Client) Close() error { + return c.conn.Close() } // Describe forms and sends an RTSP request of method DESCRIBE to the RTSP server. diff --git a/protocol/rtsp/cmd/record/main.go b/protocol/rtsp/cmd/record/main.go deleted file mode 100644 index 536e0068..00000000 --- a/protocol/rtsp/cmd/record/main.go +++ /dev/null @@ -1,78 +0,0 @@ -/* -NAME - record - -DESCRIPTION - record provides a program to connect to an RTSP server, request for an - RTP stream and then save the RTP payload to file. - -AUTHORS - Saxon A. Nelson-Milton <saxon@ausocean.org> - -LICENSE - This is Copyright (C) 2019 the Australian Ocean Lab (AusOcean). - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - in gpl.txt. If not, see http://www.gnu.org/licenses. -*/ - -package main - -import ( - "flag" - "fmt" - "log" - - "bitbucket.org/ausocean/av/protocol/rtsp" -) - -func main() { - rtspServerPtr := flag.String("rtsp-server", "", "The RTSP server we would like to get video from.") - clientPortPtr := flag.Uint("port", 6870, "The port on the client we would like to receive RTP on.") - trackPtr := flag.String("track", "track1", "The track that we would like to receive media from.") - flag.Parse() - - clt, err := rtsp.NewClient(*rtspServerPtr) - if err != nil { - panic(fmt.Sprintf("creating RTSP client failed with error: %v", err)) - } - - resp, err := clt.Options() - if err != nil { - log.Fatalln(err) - } - fmt.Println("Options:") - fmt.Println(resp) - - resp, err = clt.Describe() - if err != nil { - log.Fatalln(err) - } - fmt.Println("Describe:") - fmt.Println(resp) - - resp, err = clt.Setup(*trackPtr, fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", *clientPortPtr, *clientPortPtr+1)) - if err != nil { - log.Fatalln(err) - } - log.Println(resp) - - resp, err = clt.Play() - if err != nil { - log.Fatalln(err) - } - log.Println(resp) - - // TODO(saxon): use RTCP client here to maintain stream. - select {} -} diff --git a/protocol/rtsp/rtsp_test.go b/protocol/rtsp/rtsp_test.go index 104f6b63..4157cfba 100644 --- a/protocol/rtsp/rtsp_test.go +++ b/protocol/rtsp/rtsp_test.go @@ -202,7 +202,7 @@ func TestMethods(t *testing.T) { // Keep trying to connect to server. // TODO: use generalised retry utility when available. for retry := 0; ; retry++ { - clt, err = NewClient(serverAddr) + clt, _, _, err = NewClient(serverAddr) if err == nil { break } diff --git a/revid/config.go b/revid/config.go index 010af3fd..990182c9 100644 --- a/revid/config.go +++ b/revid/config.go @@ -74,6 +74,7 @@ type Config struct { Saturation int Exposure string AutoWhiteBalance string + RTSPURL string } // Possible modes for raspivid --exposure parameter. @@ -130,6 +131,7 @@ const ( Udp MpegtsRtp Rtp + RTSP ) // Default config settings @@ -172,7 +174,7 @@ func (c *Config) Validate(r *Revid) error { } switch c.Input { - case Raspivid, V4L, File: + case Raspivid, V4L, File, RTSP: case NothingDefined: c.Logger.Log(logger.Info, pkg+"no input type defined, defaulting", "input", defaultInput) c.Input = defaultInput diff --git a/revid/revid.go b/revid/revid.go index c5d92c5a..7b88aada 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -33,6 +33,7 @@ import ( "errors" "fmt" "io" + "net" "os" "os/exec" "strconv" @@ -41,9 +42,12 @@ import ( "time" "bitbucket.org/ausocean/av/codec/h264" - "bitbucket.org/ausocean/av/codec/mjpeg" + "bitbucket.org/ausocean/av/codec/h265" "bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/mts" + "bitbucket.org/ausocean/av/protocol/rtcp" + "bitbucket.org/ausocean/av/protocol/rtp" + "bitbucket.org/ausocean/av/protocol/rtsp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/logger" @@ -61,6 +65,12 @@ const ( rtmpConnectionTimeout = 10 ) +const ( + rtpPort = 60000 + rtcpPort = 60001 + defaultServerRTCPPort = 17301 +) + const pkg = "revid:" type Logger interface { @@ -164,7 +174,14 @@ func (r *Revid) reset(config Config) error { err = r.setupPipeline( func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { - e := mts.NewEncoder(dst, float64(fps), mts.Video) + var st int + switch r.config.Input { + case Raspivid, File, V4L: + st = mts.EncodeH264 + case RTSP: + st = mts.EncodeH265 + } + e := mts.NewEncoder(dst, float64(fps), st) return e, nil }, func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { @@ -263,20 +280,17 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int) switch r.config.Input { case Raspivid: r.setupInput = r.startRaspivid + r.lexTo = h264.LexFromBytestream case V4L: r.setupInput = r.startV4L + r.lexTo = h264.LexFromBytestream case File: r.setupInput = r.setupInputForFile + case RTSP: + r.setupInput = r.startRTSPCamera + r.lexTo = h265.NewLexer(false).Lex } - switch r.config.InputCodec { - case H264: - r.config.Logger.Log(logger.Info, pkg+"using H264 lexer") - r.lexTo = h264.LexFromBytestream - case Mjpeg: - r.config.Logger.Log(logger.Info, pkg+"using MJPEG lexer") - r.lexTo = mjpeg.Lex - } return nil } @@ -605,6 +619,108 @@ func (r *Revid) setupInputForFile() (func() error, error) { return func() error { return f.Close() }, nil } +// startRTSPCamera uses RTSP to request an RTP stream from an IP camera. An RTP +// client is created from which RTP packets containing either h264/h265 can read +// by the selected lexer. +func (r *Revid) startRTSPCamera() (func() error, error) { + rtspClt, local, remote, err := rtsp.NewClient(r.config.RTSPURL) + if err != nil { + return nil, err + } + + resp, err := rtspClt.Options() + if err != nil { + return nil, err + } + r.config.Logger.Log(logger.Info, pkg+"RTSP OPTIONS response", "response", resp.String()) + + resp, err = rtspClt.Describe() + if err != nil { + return nil, err + } + r.config.Logger.Log(logger.Info, pkg+"RTSP DESCRIBE response", "response", resp.String()) + + resp, err = rtspClt.Setup("track1", fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort)) + if err != nil { + return nil, err + } + r.config.Logger.Log(logger.Info, pkg+"RTSP SETUP response", "response", resp.String()) + rtpCltAddr, rtcpCltAddr, rtcpSvrAddr, err := formAddrs(local, remote, *resp) + if err != nil { + return nil, err + } + + resp, err = rtspClt.Play() + if err != nil { + return nil, err + } + r.config.Logger.Log(logger.Info, pkg+"RTSP server PLAY response", "response", resp.String()) + + rtpClt, err := rtp.NewClient(rtpCltAddr) + if err != nil { + return nil, err + } + + rtcpClt, err := rtcp.NewClient(rtcpCltAddr, rtcpSvrAddr, rtpClt, r.config.Logger.Log) + if err != nil { + return nil, err + } + + // Check errors from RTCP client until it has stopped running. + go func() { + for { + err, ok := <-rtcpClt.Err() + if ok { + r.config.Logger.Log(logger.Warning, pkg+"RTCP error", "error", err.Error()) + } else { + return + } + } + }() + + // Start the RTCP client. + rtcpClt.Start() + + // Start reading data from the RTP client. + r.wg.Add(1) + go r.processFrom(rtpClt, time.Second/time.Duration(r.config.FrameRate)) + + return func() error { + rtspClt.Close() + rtcpClt.Stop() + return nil + }, nil +} + +// formAddrs is a helper function to form the addresses for the RTP client, +// RTCP client, and the RTSP server's RTCP addr using the local, remote addresses +// of the RTSP conn, and the SETUP method response. +func formAddrs(local, remote *net.TCPAddr, setupResp rtsp.Response) (rtpCltAddr, rtcpCltAddr, rtcpSvrAddr string, err error) { + svrRTCPPort, err := parseSvrRTCPPort(setupResp) + if err != nil { + return "", "", "", err + } + rtpCltAddr = strings.Split(local.String(), ":")[0] + ":" + strconv.Itoa(rtpPort) + rtcpCltAddr = strings.Split(local.String(), ":")[0] + ":" + strconv.Itoa(rtcpPort) + rtcpSvrAddr = strings.Split(remote.String(), ":")[0] + ":" + strconv.Itoa(svrRTCPPort) + return +} + +// parseServerRTCPPort is a helper function to get the RTSP server's RTCP port. +func parseSvrRTCPPort(resp rtsp.Response) (int, error) { + transport := resp.Header.Get("Transport") + for _, p := range strings.Split(transport, ";") { + if strings.Contains(p, "server_port") { + port, err := strconv.Atoi(strings.Split(p, "-")[1]) + if err != nil { + return 0, err + } + return port, nil + } + } + return 0, errors.New("SETUP response did not provide RTCP port") +} + func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") r.err <- r.lexTo(r.encoders, read, delay) diff --git a/revid/senders_test.go b/revid/senders_test.go index 80293759..b7f67959 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -134,7 +134,7 @@ func TestMtsSenderSegment(t *testing.T) { const numberOfClips = 11 dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0) - encoder := mts.NewEncoder(sender, 25, mts.Video) + encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) // Turn time based PSI writing off for encoder. const psiSendCount = 10 @@ -212,7 +212,7 @@ func TestMtsSenderFailedSend(t *testing.T) { const clipToFailAt = 3 dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} sender := newMtsSender(dst, (*dummyLogger)(t).log, rbSize, rbElementSize, 0) - encoder := mts.NewEncoder(sender, 25, mts.Video) + encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) // Turn time based PSI writing off for encoder and send PSI every 10 packets. const psiSendCount = 10 @@ -292,7 +292,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) { const clipToDelay = 3 dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, rbElementSize, 0) - encoder := mts.NewEncoder(sender, 25, mts.Video) + encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) // Turn time based PSI writing off for encoder. const psiSendCount = 10