diff --git a/codec/lex/lex.go b/codec/h264/lex.go similarity index 78% rename from codec/lex/lex.go rename to codec/h264/lex.go index da0dd1b6..3976cef1 100644 --- a/codec/lex/lex.go +++ b/codec/h264/lex.go @@ -3,7 +3,7 @@ NAME lex.go DESCRIPTION - See Readme.md + lex.go provides a lexer to lex h264 bytestream into access units. AUTHOR Dan Kortschak @@ -25,13 +25,11 @@ LICENSE along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ -// Package lex provides lexers for video encodings. -package lex +// lex.go provides a lexer to lex h264 bytestream into access units. + +package h264 import ( - "bufio" - "bytes" - "fmt" "io" "time" ) @@ -44,11 +42,11 @@ func init() { var h264Prefix = [...]byte{0x00, 0x00, 0x01, 0x09, 0xf0} -// H264 lexes H.264 NAL units read from src into separate writes to dst with +// Lex lexes H.264 NAL units read from src into separate writes to dst with // successive writes being performed not earlier than the specified delay. // NAL units are split after type 1 (Coded slice of a non-IDR picture), 5 // (Coded slice of a IDR picture) and 8 (Picture parameter set). -func H264(dst io.Writer, src io.Reader, delay time.Duration) error { +func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { var tick <-chan time.Time if delay == 0 { tick = noDelay @@ -200,48 +198,3 @@ func (c *scanner) reload() error { c.off = 0 return nil } - -// MJPEG parses MJPEG frames read from src into separate writes to dst with -// successive writes being performed not earlier than the specified delay. -func MJPEG(dst io.Writer, src io.Reader, delay time.Duration) error { - var tick <-chan time.Time - if delay == 0 { - tick = noDelay - } else { - ticker := time.NewTicker(delay) - defer ticker.Stop() - tick = ticker.C - } - - r := bufio.NewReader(src) - for { - buf := make([]byte, 2, 4<<10) - n, err := r.Read(buf) - if n < 2 { - return nil - } - if err != nil { - return err - } - if !bytes.Equal(buf, []byte{0xff, 0xd8}) { - return fmt.Errorf("parser: not MJPEG frame start: %#v", buf) - } - var last byte - for { - b, err := r.ReadByte() - if err != nil { - return err - } - buf = append(buf, b) - if last == 0xff && b == 0xd9 { - break - } - last = b - } - <-tick - _, err = dst.Write(buf) - if err != nil { - return err - } - } -} diff --git a/codec/lex/lex_test.go b/codec/h264/lex_test.go similarity index 79% rename from codec/lex/lex_test.go rename to codec/h264/lex_test.go index a107b253..d584ce18 100644 --- a/codec/lex/lex_test.go +++ b/codec/h264/lex_test.go @@ -3,7 +3,7 @@ NAME lex_test.go DESCRIPTION - See Readme.md + lex_test.go provides tests for the lexer in lex.go. AUTHOR Dan Kortschak @@ -25,7 +25,9 @@ LICENSE along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ -package lex +// lex_test.go provides tests for the lexer in lex.go. + +package h264 import ( "bytes" @@ -207,7 +209,7 @@ var h264Tests = []struct { func TestH264(t *testing.T) { for _, test := range h264Tests { var buf chunkEncoder - err := H264(&buf, bytes.NewReader(test.input), test.delay) + err := Lex(&buf, bytes.NewReader(test.input), test.delay) if fmt.Sprint(err) != fmt.Sprint(test.err) { t.Errorf("unexpected error for %q: got:%v want:%v", test.name, err, test.err) } @@ -221,87 +223,6 @@ func TestH264(t *testing.T) { } } */ - -var mjpegTests = []struct { - name string - input []byte - delay time.Duration - want [][]byte - err error -}{ - { - name: "empty", - }, - { - name: "null", - input: []byte{0xff, 0xd8, 0xff, 0xd9}, - delay: 0, - want: [][]byte{{0xff, 0xd8, 0xff, 0xd9}}, - }, - { - name: "null delayed", - input: []byte{0xff, 0xd8, 0xff, 0xd9}, - delay: time.Millisecond, - want: [][]byte{{0xff, 0xd8, 0xff, 0xd9}}, - }, - { - name: "full", - input: []byte{ - 0xff, 0xd8, 'f', 'u', 'l', 'l', 0xff, 0xd9, - 0xff, 0xd8, 'f', 'r', 'a', 'm', 'e', 0xff, 0xd9, - 0xff, 0xd8, 'w', 'i', 't', 'h', 0xff, 0xd9, - 0xff, 0xd8, 'l', 'e', 'n', 'g', 't', 'h', 0xff, 0xd9, - 0xff, 0xd8, 's', 'p', 'r', 'e', 'a', 'd', 0xff, 0xd9, - }, - delay: 0, - want: [][]byte{ - {0xff, 0xd8, 'f', 'u', 'l', 'l', 0xff, 0xd9}, - {0xff, 0xd8, 'f', 'r', 'a', 'm', 'e', 0xff, 0xd9}, - {0xff, 0xd8, 'w', 'i', 't', 'h', 0xff, 0xd9}, - {0xff, 0xd8, 'l', 'e', 'n', 'g', 't', 'h', 0xff, 0xd9}, - {0xff, 0xd8, 's', 'p', 'r', 'e', 'a', 'd', 0xff, 0xd9}, - }, - }, - { - name: "full delayed", - input: []byte{ - 0xff, 0xd8, 'f', 'u', 'l', 'l', 0xff, 0xd9, - 0xff, 0xd8, 'f', 'r', 'a', 'm', 'e', 0xff, 0xd9, - 0xff, 0xd8, 'w', 'i', 't', 'h', 0xff, 0xd9, - 0xff, 0xd8, 'l', 'e', 'n', 'g', 't', 'h', 0xff, 0xd9, - 0xff, 0xd8, 's', 'p', 'r', 'e', 'a', 'd', 0xff, 0xd9, - }, - delay: time.Millisecond, - want: [][]byte{ - {0xff, 0xd8, 'f', 'u', 'l', 'l', 0xff, 0xd9}, - {0xff, 0xd8, 'f', 'r', 'a', 'm', 'e', 0xff, 0xd9}, - {0xff, 0xd8, 'w', 'i', 't', 'h', 0xff, 0xd9}, - {0xff, 0xd8, 'l', 'e', 'n', 'g', 't', 'h', 0xff, 0xd9}, - {0xff, 0xd8, 's', 'p', 'r', 'e', 'a', 'd', 0xff, 0xd9}, - }, - }, -} - -// FIXME this needs to be adapted -/* -func TestMJEG(t *testing.T) { - for _, test := range mjpegTests { - var buf chunkEncoder - err := MJPEG(&buf, bytes.NewReader(test.input), test.delay) - if fmt.Sprint(err) != fmt.Sprint(test.err) { - t.Errorf("unexpected error for %q: got:%v want:%v", test.name, err, test.err) - } - if err != nil { - continue - } - got := [][]byte(buf) - if !reflect.DeepEqual(got, test.want) { - t.Errorf("unexpected result for %q:\ngot :%#v\nwant:%#v", test.name, got, test.want) - } - } -} -*/ - type chunkEncoder [][]byte func (e *chunkEncoder) Encode(b []byte) error { diff --git a/codec/mjpeg/lex.go b/codec/mjpeg/lex.go new file mode 100644 index 00000000..09ee2513 --- /dev/null +++ b/codec/mjpeg/lex.go @@ -0,0 +1,156 @@ +/* +NAME + lex.go + +DESCRIPTION + lex.go provides a lexer to extract separate JPEG images from a MJPEG stream. + +AUTHOR + Dan Kortschak + +LICENSE + lex.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +// lex.go provides a lexer to extract separate JPEG images from a MJPEG stream. + +package mjpeg + +import ( + "bufio" + "bytes" + "fmt" + "io" + "time" +) + +var noDelay = make(chan time.Time) + +func init() { + close(noDelay) +} + +// Lex parses MJPEG frames read from src into separate writes to dst with +// successive writes being performed not earlier than the specified delay. +func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { + var tick <-chan time.Time + if delay == 0 { + tick = noDelay + } else { + ticker := time.NewTicker(delay) + defer ticker.Stop() + tick = ticker.C + } + + r := bufio.NewReader(src) + for { + buf := make([]byte, 2, 4<<10) + n, err := r.Read(buf) + if n < 2 { + return nil + } + if err != nil { + return err + } + if !bytes.Equal(buf, []byte{0xff, 0xd8}) { + return fmt.Errorf("parser: not MJPEG frame start: %#v", buf) + } + var last byte + for { + b, err := r.ReadByte() + if err != nil { + return err + } + buf = append(buf, b) + if last == 0xff && b == 0xd9 { + break + } + last = b + } + <-tick + _, err = dst.Write(buf) + if err != nil { + return err + } + } +} + +// scanner is a byte scanner. +type scanner struct { + buf []byte + off int + + // r is the source of data for the scanner. + r io.Reader +} + +// newScanner returns a scanner initialised with an io.Reader and a read buffer. +func newScanner(r io.Reader, buf []byte) *scanner { + return &scanner{r: r, buf: buf[:0]} +} + +// scanUntilZeroInto scans the scanner's underlying io.Reader until a zero byte +// has been read, appending all read bytes to dst. The resulting appended data, +// the last read byte and whether the last read byte was zero are returned. +func (c *scanner) scanUntilZeroInto(dst []byte) (res []byte, b byte, err error) { +outer: + for { + var i int + for i, b = range c.buf[c.off:] { + if b != 0x0 { + continue + } + dst = append(dst, c.buf[c.off:c.off+i+1]...) + c.off += i + 1 + break outer + } + dst = append(dst, c.buf[c.off:]...) + err = c.reload() + if err != nil { + break + } + } + return dst, b, err +} + +// readByte is an unexported ReadByte. +func (c *scanner) readByte() (byte, error) { + if c.off >= len(c.buf) { + err := c.reload() + if err != nil { + return 0, err + } + } + b := c.buf[c.off] + c.off++ + return b, nil +} + +// reload re-fills the scanner's buffer. +func (c *scanner) reload() error { + n, err := c.r.Read(c.buf[:cap(c.buf)]) + c.buf = c.buf[:n] + if err != nil { + if err != io.EOF { + return err + } + if n == 0 { + return io.EOF + } + } + c.off = 0 + return nil +} diff --git a/codec/mjpeg/lex_test.go b/codec/mjpeg/lex_test.go new file mode 100644 index 00000000..59b4bd25 --- /dev/null +++ b/codec/mjpeg/lex_test.go @@ -0,0 +1,165 @@ +/* +NAME + lex_test.go + +DESCRIPTION + lex_test.go provides testing for the lexer in lex.go. + +AUTHOR + Dan Kortschak + +LICENSE + lex_test.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +// lex_test.go provides testing for the lexer in lex.go. + +package mjpeg + +import ( + "bytes" + "reflect" + "testing" + "time" +) + +var mjpegTests = []struct { + name string + input []byte + delay time.Duration + want [][]byte + err error +}{ + { + name: "empty", + }, + { + name: "null", + input: []byte{0xff, 0xd8, 0xff, 0xd9}, + delay: 0, + want: [][]byte{{0xff, 0xd8, 0xff, 0xd9}}, + }, + { + name: "null delayed", + input: []byte{0xff, 0xd8, 0xff, 0xd9}, + delay: time.Millisecond, + want: [][]byte{{0xff, 0xd8, 0xff, 0xd9}}, + }, + { + name: "full", + input: []byte{ + 0xff, 0xd8, 'f', 'u', 'l', 'l', 0xff, 0xd9, + 0xff, 0xd8, 'f', 'r', 'a', 'm', 'e', 0xff, 0xd9, + 0xff, 0xd8, 'w', 'i', 't', 'h', 0xff, 0xd9, + 0xff, 0xd8, 'l', 'e', 'n', 'g', 't', 'h', 0xff, 0xd9, + 0xff, 0xd8, 's', 'p', 'r', 'e', 'a', 'd', 0xff, 0xd9, + }, + delay: 0, + want: [][]byte{ + {0xff, 0xd8, 'f', 'u', 'l', 'l', 0xff, 0xd9}, + {0xff, 0xd8, 'f', 'r', 'a', 'm', 'e', 0xff, 0xd9}, + {0xff, 0xd8, 'w', 'i', 't', 'h', 0xff, 0xd9}, + {0xff, 0xd8, 'l', 'e', 'n', 'g', 't', 'h', 0xff, 0xd9}, + {0xff, 0xd8, 's', 'p', 'r', 'e', 'a', 'd', 0xff, 0xd9}, + }, + }, + { + name: "full delayed", + input: []byte{ + 0xff, 0xd8, 'f', 'u', 'l', 'l', 0xff, 0xd9, + 0xff, 0xd8, 'f', 'r', 'a', 'm', 'e', 0xff, 0xd9, + 0xff, 0xd8, 'w', 'i', 't', 'h', 0xff, 0xd9, + 0xff, 0xd8, 'l', 'e', 'n', 'g', 't', 'h', 0xff, 0xd9, + 0xff, 0xd8, 's', 'p', 'r', 'e', 'a', 'd', 0xff, 0xd9, + }, + delay: time.Millisecond, + want: [][]byte{ + {0xff, 0xd8, 'f', 'u', 'l', 'l', 0xff, 0xd9}, + {0xff, 0xd8, 'f', 'r', 'a', 'm', 'e', 0xff, 0xd9}, + {0xff, 0xd8, 'w', 'i', 't', 'h', 0xff, 0xd9}, + {0xff, 0xd8, 'l', 'e', 'n', 'g', 't', 'h', 0xff, 0xd9}, + {0xff, 0xd8, 's', 'p', 'r', 'e', 'a', 'd', 0xff, 0xd9}, + }, + }, +} + +// FIXME this needs to be adapted +/* +func Lex(t *testing.T) { + for _, test := range mjpegTests { + var buf chunkEncoder + err := MJPEG(&buf, bytes.NewReader(test.input), test.delay) + if fmt.Sprint(err) != fmt.Sprint(test.err) { + t.Errorf("unexpected error for %q: got:%v want:%v", test.name, err, test.err) + } + if err != nil { + continue + } + got := [][]byte(buf) + if !reflect.DeepEqual(got, test.want) { + t.Errorf("unexpected result for %q:\ngot :%#v\nwant:%#v", test.name, got, test.want) + } + } +} +*/ + +type chunkEncoder [][]byte + +func (e *chunkEncoder) Encode(b []byte) error { + *e = append(*e, b) + return nil +} + +func (*chunkEncoder) Stream() <-chan []byte { panic("INVALID USE") } + +func TestScannerReadByte(t *testing.T) { + data := []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.") + + for _, size := range []int{1, 2, 8, 1 << 10} { + r := newScanner(bytes.NewReader(data), make([]byte, size)) + var got []byte + for { + b, err := r.readByte() + if err != nil { + break + } + got = append(got, b) + } + if !bytes.Equal(got, data) { + t.Errorf("unexpected result for buffer size %d:\ngot :%q\nwant:%q", size, got, data) + } + } +} + +func TestScannerScanUntilZero(t *testing.T) { + data := []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit,\x00 sed do eiusmod tempor incididunt ut \x00labore et dolore magna aliqua.") + + for _, size := range []int{1, 2, 8, 1 << 10} { + r := newScanner(bytes.NewReader(data), make([]byte, size)) + var got [][]byte + for { + buf, _, err := r.scanUntilZeroInto(nil) + got = append(got, buf) + if err != nil { + break + } + } + want := bytes.SplitAfter(data, []byte{0}) + if !reflect.DeepEqual(got, want) { + t.Errorf("unexpected result for buffer zie %d:\ngot :%q\nwant:%q", size, got, want) + } + } +} diff --git a/protocol/rtmp/rtmp_test.go b/protocol/rtmp/rtmp_test.go index 1cf056cb..e1e79796 100644 --- a/protocol/rtmp/rtmp_test.go +++ b/protocol/rtmp/rtmp_test.go @@ -38,7 +38,7 @@ import ( "testing" "time" - "bitbucket.org/ausocean/av/codec/lex" + "bitbucket.org/ausocean/av/codec/h264" "bitbucket.org/ausocean/av/container/flv" ) @@ -199,7 +199,7 @@ func TestFromFrame(t *testing.T) { if err != nil { t.Errorf("Failed to create flv encoder with error: %v", err) } - err = lex.H264(flvEncoder, bytes.NewReader(videoData), time.Second/time.Duration(frameRate)) + err = h264.Lex(flvEncoder, bytes.NewReader(videoData), time.Second/time.Duration(frameRate)) if err != nil { t.Errorf("Lexing failed with error: %v", err) } @@ -251,7 +251,7 @@ func TestFromFile(t *testing.T) { if err != nil { t.Fatalf("failed to create encoder: %v", err) } - err = lex.H264(flvEncoder, f, time.Second/time.Duration(25)) + err = h264.Lex(flvEncoder, f, time.Second/time.Duration(25)) if err != nil { t.Errorf("Lexing and encoding failed with error: %v", err) } diff --git a/revid/revid.go b/revid/revid.go index 1a3326ae..88a33b7e 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -33,6 +33,7 @@ import ( "errors" "fmt" "io" + "net" "os" "os/exec" "strconv" @@ -40,8 +41,8 @@ import ( "sync" "time" + "bitbucket.org/ausocean/av/codec/h264" "bitbucket.org/ausocean/av/codec/h265" - "bitbucket.org/ausocean/av/codec/lex" "bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/protocol/rtcp" @@ -279,13 +280,13 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int) switch r.config.Input { case Raspivid: r.setupInput = r.startRaspivid - r.lexTo = lex.H264 + r.lexTo = h264.Lex case V4L: r.setupInput = r.startV4L - r.lexTo = lex.H264 + r.lexTo = h264.Lex case File: r.setupInput = r.setupInputForFile - r.lexTo = lex.H264 + r.lexTo = h264.Lex case RTSPCamera: r.setupInput = r.startRTSPCamera r.lexTo = h265.NewLexer(false).Lex @@ -629,39 +630,23 @@ func (r *Revid) startRTSPCamera() (func() error, error) { if err != nil { return nil, err } - r.config.Logger.Log(logger.Info, pkg+"RTSP server OPTIONS response", "response", resp.String()) + r.config.Logger.Log(logger.Info, pkg+"RTSP OPTIONS response", "response", resp.String()) resp, err = rtspClt.Describe() if err != nil { return nil, err } - r.config.Logger.Log(logger.Info, pkg+"RTSP server DESCRIBE response", "response", resp.String()) + r.config.Logger.Log(logger.Info, pkg+"RTSP DESCRIBE response", "response", resp.String()) resp, err = rtspClt.Setup("track1", fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort)) if err != nil { return nil, err } - transport := resp.Header.Get("Transport") - parts := strings.Split(transport, ";") - var serverPorts string - for _, part := range parts { - if strings.Contains(part, "server_port") { - serverPorts = part - break - } + r.config.Logger.Log(logger.Info, pkg+"RTSP SETUP response", "response", resp.String()) + rtpCltAddr, rtcpCltAddr, rtcpSvrAddr, err := formAddrs(local, remote, *resp) + if err != nil { + return nil, err } - var serverRTCPPort int - if serverPorts == "" { - r.config.Logger.Log(logger.Warning, pkg+"RTSP could not get server ports, defaulting") - serverRTCPPort = defaultServerRTCPPort - } else { - serverRTCPPort, err = strconv.Atoi(strings.Split(serverPorts, "-")[1]) - if err != nil { - serverRTCPPort = defaultServerRTCPPort - } - } - - r.config.Logger.Log(logger.Info, pkg+"RTSP server SETUP response", "response", resp.String()) resp, err = rtspClt.Play() if err != nil { @@ -669,19 +654,17 @@ func (r *Revid) startRTSPCamera() (func() error, error) { } r.config.Logger.Log(logger.Info, pkg+"RTSP server PLAY response", "response", resp.String()) - rtpClt, err := rtp.NewClient(strings.Split(local.String(), ":")[0] + ":" + strconv.Itoa(rtpPort)) + rtpClt, err := rtp.NewClient(rtpCltAddr) if err != nil { return nil, err } - // TODO(saxon): use rtcp client to maintain rtp stream. - rtcpCltAddr := strings.Split(local.String(), ":")[0] + ":" + strconv.Itoa(rtcpPort) - rtcpSvrAddr := strings.Split(remote.String(), ":")[0] + ":" + strconv.Itoa(serverRTCPPort) rtcpClt, err := rtcp.NewClient(rtcpCltAddr, rtcpSvrAddr, rtpClt, r.config.Logger.Log) if err != nil { return nil, err } + // Check errors from RTCP client until it has stopped running. go func() { for { select { @@ -694,10 +677,13 @@ func (r *Revid) startRTSPCamera() (func() error, error) { } }() + // Start the RTCP client. rtcpClt.Start() + // Start reading data from the RTP client. r.wg.Add(1) go r.processFrom(rtpClt, time.Second/time.Duration(r.config.FrameRate)) + return func() error { rtspClt.Close() rtcpClt.Stop() @@ -705,6 +691,35 @@ func (r *Revid) startRTSPCamera() (func() error, error) { }, nil } +// formAddrs is a helper function to form the addresses for the RTP client, +// RTCP client, and the RTSP server's RTCP addr using the local, remote addresses +// of the RTSP conn, and the SETUP method response. +func formAddrs(local, remote *net.TCPAddr, setupResp rtsp.Response) (rtpCltAddr, rtcpCltAddr, rtcpSvrAddr string, err error) { + svrRTCPPort, err := parseSvrRTCPPort(setupResp) + if err != nil { + return "", "", "", err + } + rtpCltAddr = strings.Split(local.String(), ":")[0] + ":" + strconv.Itoa(rtpPort) + rtcpCltAddr = strings.Split(local.String(), ":")[0] + ":" + strconv.Itoa(rtcpPort) + rtcpSvrAddr = strings.Split(remote.String(), ":")[0] + ":" + strconv.Itoa(svrRTCPPort) + return +} + +// parseServerRTCPPort is a helper function to get the RTSP server's RTCP port. +func parseSvrRTCPPort(resp rtsp.Response) (int, error) { + transport := resp.Header.Get("Transport") + for _, p := range strings.Split(transport, ";") { + if strings.Contains(p, "server_port") { + port, err := strconv.Atoi(strings.Split(p, "-")[1]) + if err != nil { + return 0, err + } + return port, nil + } + } + return 0, errors.New("SETUP response did not provide RTCP port") +} + func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") r.err <- r.lexTo(r.encoders, read, delay)