diff --git a/codec/h265/lex.go b/codec/h265/lex.go new file mode 100644 index 00000000..7baaf734 --- /dev/null +++ b/codec/h265/lex.go @@ -0,0 +1,157 @@ +/* +NAME + lex.go + +DESCRIPTION + lex.go provides a lexer for taking h265 rtp format and lexing into access units. + +AUTHORS + Saxon A. Nelson-Milton + +LICENSE + Copyright (C) 2019 the Australian Ocean Lab (AusOcean). + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package h265 + +import ( + "encoding/binary" + "fmt" + "io" + "time" + + "bitbucket.org/ausocean/av/protocol/rtp" +) + +// NALU types. +const ( + typeAggregation = 48 + typeFragmentation = 49 + typePACI = 50 +) + +// Buffer sizes. +const ( + maxAUSize = 100000 + maxRTPSize = 4096 +) + +// Lexer is an H265 lexer. +type Lexer struct { + UsingDON bool // Indicates whether DONL and DOND will be used for in RTP stream. + buf [maxAUSize]byte // Holds current access unit. + off int // Holds offset into buf that we're occupying with access unit. + fragmented bool // Indicates if we're currently dealing with a fragmentation unit. +} + +// Lex continually reads RTP packets from the io.Reader src and lexes into +// access units which are written to the io.Writer dst. Lex expects that for +// each read from src, a single RTP packet is received. +func (l *Lexer) Lex(dst io.Writer, src io.Reader, delay time.Duration) error { + buf := make([]byte, maxRTPSize) + for { + n, err := src.Read(buf) + switch err { + case nil: + case io.EOF: + continue + default: + return fmt.Errorf("source read error: %v\n", err) + } + + // Get payload from RTP packet. + payload, err := rtp.Payload(buf[:n]) + if err != nil { + return fmt.Errorf("could not get rtp payload, failed with err: %v\n", err) + } + + nalType := (buf[0] >> 1) & 0x3f + + // If not currently fragmented then we ignore current write + if l.fragmented && nalType != typeFragmentation { + l.off = 0 + continue + } + + switch nalType { + case typeAggregation: + l.handleAggregation(payload) + case typeFragmentation: + l.handleFragmentation(payload) + case typePACI: + l.handlePACI(payload) + default: + l.write(payload) + } + + m, err := rtp.Marker(buf[:n]) + if err != nil { + return fmt.Errorf("could not get marker bit, failed with err: %v\n", err) + } + + if m { + _, err := dst.Write(l.buf[:l.off]) + if err != nil { + // TODO: work out what to do here. + } + l.off = 0 + } + } + return nil +} + +// handleAggregation parses NAL units from an aggregation packet and writing +// them to the Lexers buffer buf. +func (l *Lexer) handleAggregation(d []byte) { + idx := 2 + for idx < len(d) { + if l.UsingDON { + switch idx { + case 2: + idx += 2 + default: + idx += 1 + } + } + size := int(binary.BigEndian.Uint16(d[idx:])) + idx += 2 + nalu := d[idx : idx+size] + idx += size + l.write(nalu) + } +} + +// handleFragmentation parses NAL units from fragmentation packets and writes +// them to the Lexer's buf. +func (l *Lexer) handleFragmentation(d []byte) { + +} + +// handlePACI will handl PACI packets +// +// TODO: complete this +func (l *Lexer) handlePACI(d []byte) { + panic("unsupported nal type") +} + +// write writes a NAL unit to the Lexers buf in byte stream format using the +// start code. +func (l *Lexer) write(d []byte) { + const startCode = "\x00\x00\x00\x01" + copy(l.buf[l.off:], []byte(startCode)) + copy(l.buf[l.off+4:], d) + l.off += len(d) + 4 +} diff --git a/protocol/rtp/parse.go b/protocol/rtp/parse.go index d658aa20..b60af30a 100644 --- a/protocol/rtp/parse.go +++ b/protocol/rtp/parse.go @@ -34,6 +34,19 @@ import ( const badVer = "incompatible RTP version" +// Marker returns the state of the RTP marker bit, and an error if parsing fails. +func Marker(d []byte) (bool, error) { + if len(d) < defaultHeadSize { + panic("invalid RTP packet length") + } + + if version(d) != rtpVer { + return false, errors.New(badVer) + } + + return d[1]&0x80 == 1, nil +} + // Payload returns the payload from an RTP packet provided the version is // compatible, otherwise an error is returned. func Payload(d []byte) ([]byte, error) {