diff --git a/codec/h264/lex.go b/codec/h264/lex.go index 9a9c48ab..176c8b3b 100644 --- a/codec/h264/lex.go +++ b/codec/h264/lex.go @@ -7,6 +7,7 @@ DESCRIPTION AUTHOR Dan Kortschak + Saxon Nelson-Milton LICENSE lex.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) @@ -25,45 +26,15 @@ LICENSE along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ -// lex.go provides a lexer to lex h264 bytestream into access units. - +// Package h264 provides a h264 bytestream lexer and RTP H264 access unit +// extracter. package h264 import ( - "bytes" - "encoding/binary" - "fmt" "io" "time" "bitbucket.org/ausocean/av/codec/codecutil" - "bitbucket.org/ausocean/av/protocol/rtp" -) - -// NAL types (from https://tools.ietf.org/html/rfc6184#page-13) -const ( - // Single nal units bounds. - typeSingleNALULowBound = 1 - typeSingleNALUHighBound = 23 - - // Single-time aggregation packets. - typeSTAPA = 24 - typeSTAPB = 25 - - // Multi-time aggregation packets. - typeMTAP16 = 26 - typeMTAP24 = 27 - - // Fragmentation packets. - typeFUA = 28 - typeFUB = 29 -) - -// Min NAL lengths. -const ( - minSingleNALLen = 1 - minSTAPALen = 4 - minFUALen = 2 ) var noDelay = make(chan time.Time) @@ -74,11 +45,11 @@ func init() { var h264Prefix = [...]byte{0x00, 0x00, 0x01, 0x09, 0xf0} -// LexFromBytestream lexes H.264 NAL units read from src into separate writes +// Lex lexes H.264 NAL units read from src into separate writes // to dst with successive writes being performed not earlier than the specified // delay. NAL units are split after type 1 (Coded slice of a non-IDR picture), 5 // (Coded slice of a IDR picture) and 8 (Picture parameter set). -func LexFromBytestream(dst io.Writer, src io.Reader, delay time.Duration) error { +func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { var tick <-chan time.Time if delay == 0 { tick = noDelay @@ -163,157 +134,3 @@ outer: _, err := dst.Write(buf) return err } - -// Buffer sizes. -const ( - maxAUSize = 100000 // Max access unit size in bytes. - maxRTPSize = 1500 // Max ethernet transmission unit in bytes. -) - -// RTPLexer is a lexer for lexing H264 from RTP packets. -type RTPLexer struct { - buf *bytes.Buffer // Holds the current access unit. - frag bool // Indicates if we're currently dealing with a fragmentation packet. -} - -// NewRTPLexer returns a new RTPLexer. -func NewRTPLexer() *RTPLexer { - return &RTPLexer{ - buf: bytes.NewBuffer(make([]byte, 0, maxAUSize))} -} - -// Lex extracts H264 access units from an RTP stream. This function -// expects that each read from src will provide a single RTP packet. -func (l *RTPLexer) Lex(dst io.Writer, src io.Reader, delay time.Duration) error { - buf := make([]byte, maxRTPSize) - for { - n, err := src.Read(buf) - switch err { - case nil: // Do nothing. - case io.EOF: - return nil - default: - return fmt.Errorf("source read error: %v\n", err) - } - - // Get payload from RTP packet. - payload, err := rtp.Payload(buf[:n]) - if err != nil { - return fmt.Errorf("could not get RTP payload, failed with err: %v\n", err) - } - - nalType := payload[0] & 0x1f - - // If not currently fragmented then we ignore current write. - if l.frag && nalType != typeFUA { - l.buf.Reset() - l.frag = false - continue - } - - if typeSingleNALULowBound <= nalType && nalType <= typeSingleNALUHighBound { - // If len too small, ignore. - if len(payload) < minSingleNALLen { - continue - } - l.writeWithPrefix(payload) - } else { - switch nalType { - case typeSTAPA: - l.handleSTAPA(payload) - case typeFUA: - l.handleFUA(payload) - case typeSTAPB: - panic("STAP-B type unsupported") - case typeMTAP16: - panic("MTAP16 type unsupported") - case typeMTAP24: - panic("MTAP24 type unsupported") - case typeFUB: - panic("FU-B type unsupported") - default: - panic("unsupported type") - } - } - - markerIsSet, err := rtp.Marker(buf[:n]) - if err != nil { - return fmt.Errorf("could not get marker bit, failed with err: %v\n", err) - } - - if markerIsSet { - l.buf.WriteTo(dst) - l.buf.Reset() - } - } - return nil -} - -// handleSTAPA parses NAL units from an aggregation packet and writes -// them to the Lexers buffer buf. -func (l *RTPLexer) handleSTAPA(d []byte) { - // If the length is too small, ignore. - if len(d) < minSTAPALen { - return - } - - for i := 1; i < len(d); { - size := int(binary.BigEndian.Uint16(d[i:])) - - // Skip over NAL unit size. - const sizeOfFieldLen = 2 - i += sizeOfFieldLen - - // Get the NALU. - nalu := d[i : i+size] - i += size - l.writeWithPrefix(nalu) - } -} - -// handleFUA parses NAL units from fragmentation packets and writes -// them to the Lexer's buf. -func (l *RTPLexer) handleFUA(d []byte) { - // If length is too small, ignore. - if len(d) < minFUALen { - return - } - - // Get start and end indiciators from FU header. - const FUHeadIdx = 1 - start := d[FUHeadIdx]&0x80 != 0 - end := d[FUHeadIdx]&0x40 != 0 - - // If start, form new header, skip FU indicator only and set first byte to - // new header. Otherwise, skip over both FU indicator and FU header. - if start { - newHead := (d[0] & 0xe0) | (d[1] & 0x1f) - d = d[1:] - d[0] = newHead - if end { - panic("bad fragmentation packet") - } - l.frag = true - l.writeWithPrefix(d) - } else { - d = d[2:] - if end { - l.frag = false - } - l.writeNoPrefix(d) - } -} - -// write writes a NAL unit to the Lexer's buf in byte stream format using the -// start code. -func (l *RTPLexer) writeWithPrefix(d []byte) { - const prefix = "\x00\x00\x00\x01" - l.buf.Write([]byte(prefix)) - l.buf.Write(d) -} - -// writeNoPrefix writes data to the Lexer's buf. This is used for non start -// fragmentations of a NALU. -func (l *RTPLexer) writeNoPrefix(d []byte) { - l.buf.Write(d) -} diff --git a/codec/h264/lex_test.go b/codec/h264/lex_test.go index 46191fbd..87e6b7d2 100644 --- a/codec/h264/lex_test.go +++ b/codec/h264/lex_test.go @@ -31,8 +31,6 @@ LICENSE package h264 import ( - "io" - "testing" "time" ) @@ -223,143 +221,3 @@ func TestH264(t *testing.T) { } } */ - -// rtpReader provides an io.Reader for reading the test RTP stream. -type rtpReader struct { - packets [][]byte - idx int -} - -// Read implements io.Reader. -func (r *rtpReader) Read(p []byte) (int, error) { - if r.idx == len(r.packets) { - return 0, io.EOF - } - b := r.packets[r.idx] - n := copy(p, b) - if n < len(r.packets[r.idx]) { - r.packets[r.idx] = r.packets[r.idx][n:] - } else { - r.idx++ - } - return n, nil -} - -// destination holds the access units extracted during the lexing process. -type destination [][]byte - -// Write implements io.Writer. -func (d *destination) Write(p []byte) (int, error) { - tmp := make([]byte, len(p)) - copy(tmp, p) - *d = append(*d, tmp) - return len(p), nil -} - -// TestLex checks that the Lexer can correctly extract H264 access units from -// h264 RTP stream in RTP payload format. -func TestRTPLex(t *testing.T) { - const rtpVer = 2 - - tests := []struct { - packets [][]byte - expect [][]byte - }{ - { - packets: [][]byte{ - { // Single NAL unit. - 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // RTP header. - typeSingleNALULowBound, // NAL header. - 0x01, 0x02, 0x03, 0x04, // NAL Data. - }, - { // Fragmentation (start packet). - 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // RTP header. - typeFUA, // FU indicator. - 0x80 | typeSingleNALULowBound, // FU header. - 0x01, 0x02, 0x03, // FU payload. - }, - { // Fragmentation (middle packet) - 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // RTP header. - typeFUA, // NAL indicator. - typeSingleNALULowBound, // FU header. - 0x04, 0x05, 0x06, // FU payload. - }, - { // Fragmentation (end packet) - 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // RTP header. - typeFUA, // NAL indicator. - 0x40 | typeSingleNALULowBound, // FU header. - 0x07, 0x08, 0x09, // FU payload - }, - - { // Aggregation. Make last packet of access unit => marker bit true. - 0x80, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // RTP header. - typeSTAPA, // NAL header. - 0x00, 0x04, // NAL 1 size. - 0x01, 0x02, 0x03, 0x04, // NAL 1 data. - 0x00, 0x04, // NAL 2 size. - 0x01, 0x02, 0x03, 0x04, // NAL 2 data. - }, - // Second access unit. - { // Single NAL unit. - 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // RTP header. - typeSingleNALULowBound, // NAL header. - 0x01, 0x02, 0x03, 0x04, // NAL Data. - }, - { // Single NAL. Make last packet of access unit => marker bit true. - 0x80, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // RTP header. - typeSingleNALULowBound, // NAL header. - 0x01, 0x02, 0x03, 0x04, // NAL data. - }, - }, - expect: [][]byte{ - // First access unit. - { - // NAL 1 - 0x00, 0x00, 0x00, 0x01, // Start code. - typeSingleNALULowBound, // NAL header. - 0x01, 0x02, 0x03, 0x04, // NAL data. - // NAL 2 - 0x00, 0x00, 0x00, 0x01, // Start code. - typeSingleNALULowBound, - 0x01, 0x02, 0x03, // FU payload. - 0x04, 0x05, 0x06, // FU payload. - 0x07, 0x08, 0x09, // FU payload. - // NAL 3 - 0x00, 0x00, 0x00, 0x01, // Start code. - 0x01, 0x02, 0x03, 0x04, // NAL data. - // NAL 4 - 0x00, 0x00, 0x00, 0x01, // Start code. - 0x01, 0x02, 0x03, 0x04, // NAL 2 data - }, - // Second access unit. - { - // NAL 1 - 0x00, 0x00, 0x00, 0x01, // Start code. - typeSingleNALULowBound, // NAL header. - 0x01, 0x02, 0x03, 0x04, // Data. - // NAL 2 - 0x00, 0x00, 0x00, 0x01, // Start code. - typeSingleNALULowBound, // NAL header. - 0x01, 0x02, 0x03, 0x04, // Data. - }, - }, - }, - } - - for testNum, test := range tests { - r := &rtpReader{packets: test.packets} - d := &destination{} - err := NewRTPLexer().Lex(d, r, 0) - if err != nil { - t.Fatalf("error lexing: %v\n", err) - } - - for i, accessUnit := range test.expect { - for j, part := range accessUnit { - if part != [][]byte(*d)[i][j] { - t.Fatalf("did not get expected data for test: %v.\nGot: %v\nWant: %v\n", testNum, d, test.expect) - } - } - } - } -} diff --git a/protocol/rtmp/rtmp_test.go b/protocol/rtmp/rtmp_test.go index c7c38bd4..e1e79796 100644 --- a/protocol/rtmp/rtmp_test.go +++ b/protocol/rtmp/rtmp_test.go @@ -199,7 +199,7 @@ func TestFromFrame(t *testing.T) { if err != nil { t.Errorf("Failed to create flv encoder with error: %v", err) } - err = h264.LexFromBytestream(flvEncoder, bytes.NewReader(videoData), time.Second/time.Duration(frameRate)) + err = h264.Lex(flvEncoder, bytes.NewReader(videoData), time.Second/time.Duration(frameRate)) if err != nil { t.Errorf("Lexing failed with error: %v", err) } @@ -251,7 +251,7 @@ func TestFromFile(t *testing.T) { if err != nil { t.Fatalf("failed to create encoder: %v", err) } - err = h264.LexFromBytestream(flvEncoder, f, time.Second/time.Duration(25)) + err = h264.Lex(flvEncoder, f, time.Second/time.Duration(25)) if err != nil { t.Errorf("Lexing and encoding failed with error: %v", err) } diff --git a/revid/revid.go b/revid/revid.go index 7b88aada..2f0f768e 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -280,10 +280,10 @@ func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.WriteCloser, rate int) switch r.config.Input { case Raspivid: r.setupInput = r.startRaspivid - r.lexTo = h264.LexFromBytestream + r.lexTo = h264.Lex case V4L: r.setupInput = r.startV4L - r.lexTo = h264.LexFromBytestream + r.lexTo = h264.Lex case File: r.setupInput = r.setupInputForFile case RTSP: