codec/mjpeg/extract.go: wrote Extractor type

Wrote extractor type that provides an Extract function to extract JPEG frames from an RTP/MJPEG stream and writes them to a destination.
This commit is contained in:
Saxon 2019-11-20 13:40:07 +10:30
parent c2ce700cbd
commit ba25cdfd12
2 changed files with 337 additions and 48 deletions

288
codec/mjpeg/extract.go Normal file
View File

@ -0,0 +1,288 @@
/*
DESCRIPTION
extract.go provides an Extractor to get JPEG from RTP.
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package mjpeg
import (
"bytes"
"fmt"
"io"
"time"
"errors"
"bitbucket.org/ausocean/av/protocol/rtp"
)
// Buffer sizes.
const (
maxRTPSize = 1500 // Max ethernet transmission unit in bytes.
)
var (
errNoQTable = errors.New("no quantization table")
errReservedQ = errors.New("q value is reserved")
)
var defaultQuantisers = []byte{
// Luma table.
16, 11, 12, 14, 12, 10, 16, 14,
13, 14, 18, 17, 16, 19, 24, 40,
26, 24, 22, 22, 24, 49, 35, 37,
29, 40, 58, 51, 61, 60, 57, 51,
56, 55, 64, 72, 92, 78, 64, 68,
87, 69, 55, 56, 80, 109, 81, 87,
95, 98, 103, 104, 103, 62, 77, 113,
121, 112, 100, 120, 92, 101, 103, 99,
/* chroma table */
17, 18, 18, 24, 21, 24, 47, 26,
26, 47, 99, 66, 56, 66, 99, 99,
99, 99, 99, 99, 99, 99, 99, 99,
99, 99, 99, 99, 99, 99, 99, 99,
99, 99, 99, 99, 99, 99, 99, 99,
99, 99, 99, 99, 99, 99, 99, 99,
99, 99, 99, 99, 99, 99, 99, 99,
99, 99, 99, 99, 99, 99, 99, 99,
}
// Extractor is an Extractor for extracting JPEG from an RTP stream.
type Extractor struct {
buf *bytes.Buffer // Holds the current JPEG image.
dst io.Writer // The destination we'll be writing extracted NALUs to.
}
// NewExtractor returns a new Extractor.
func NewExtractor() *Extractor { return &Extractor{} }
// Extract will continously read RTP packets from src containing JPEG (in RTP
// payload format) and extract the JPEG images, sending them to dst. This
// function expects that each read from src will provide a single RTP packet.
func (e *Extractor) Extract(dst io.Writer, src io.Reader, delay time.Duration) error {
buf := make([]byte, maxRTPSize)
var (
qTables [128][128]byte
qTablesLen [128]byte
)
for {
n, err := src.Read(buf)
switch err {
case nil: // Do nothing.
case io.EOF:
return nil
default:
return fmt.Errorf("source read error: %v\n", err)
}
// Get payload from RTP packet.
p, err := rtp.Payload(buf[:n])
if err != nil {
return fmt.Errorf("could not get RTP payload, failed with err: %v\n", err)
}
b := newByteStream(p)
_ = b.get8() // Ignore type-specific flag
var (
off = b.get24() // Fragment offset.
t = b.get8() // Type.
q = b.get8() // Quantization value.
width = b.get8() // Picture width.
height = b.get8() // Picture height.
dri int // Restart interval.
)
if t&0x40 != 0 {
dri = b.get16()
_ = b.get16() // Ignore restart count.
t &= ^0x40
}
if t > 1 {
panic("unimplemented RTP/JPEG type")
}
// Parse quantization table if our offset is 0.
if off == 0 {
var qTable []byte
var qLen int
if q > 127 {
_ = b.get8() // Ignore first byte (reserved for future use).
prec := b.get8() // The size of coefficients.
qLen = b.get16() // The length of the quantization table.
if prec != 0 {
panic("unsupported precision")
}
if qLen > 0 {
qTable = b.getBuf(qLen)
if q < 255 {
if qTablesLen[q-128] == 0 && qLen <= 128 {
copy(qTables[q-128][:],qTable)
qTablesLen[q-128] = byte(qLen)
}
}
} else {
if q == 255 {
return errNoQTable
}
if qTablesLen[q-128] == 0 {
return fmt.Errorf("no quantization tables known for q %d yet",q)
}
qTable = qTables[q-128][:]
qLen = int(qTablesLen[q-128])
}
} else { // q <= 127
if q == 0 || q > 99 {
return errReservedQ
}
qTable = defaultQTable(q)
qLen = len(qTable)
}
e.buf.Reset()
err = writeHeader(e.buf, t, width, height, qLen / 64, dri, qTable)
if err != nil {
return fmt.Errorf("could not write JPEG header: %w",err)
}
}
if e.buf.Len() == 0 {
// Must have missed start of frame? So ignore and wait for start.
continue
}
// TODO: check that timestamp is consistent
// This will need expansion to RTP package to create Timestamp parsing func.
// TODO: could also check offset with how many bytes we currently have
// to determine if there are missing frames.
// Write frame data
err = b.writeTo(e.buf,b.remaining())
if err != nil {
return fmt.Errorf("could not write remaining frame data to output buffer: %w",err)
}
m, err := rtp.Marker(buf[:n])
if err != nil {
return fmt.Errorf("could not read RTP marker: %w",err)
}
if m {
_,err = e.buf.Write([]byte{0xff,codeEOI})
if err != nil {
return fmt.Errorf("could not write EOI marker: %w",err)
}
_,err = e.buf.WriteTo(dst)
if err != nil {
return fmt.Errorf("could not write JPEG to dst: %w",err)
}
}
}
}
type byteStream struct {
bytes []byte
i int
}
func newByteStream(b []byte) *byteStream { return &byteStream{bytes: b} }
func (b *byteStream) get24() int {
v := int(b.bytes[b.i])<<16 | int(b.bytes[b.i+1])<<8 | int(b.bytes[b.i+2])
b.i += 3
return v
}
func (b *byteStream) get8() int {
v := int(b.bytes[b.i])
b.i++
return v
}
func (b *byteStream) get16() int {
v := int(b.bytes[b.i])<<8 | int(b.bytes[b.i+1])
b.i += 2
return v
}
func (b *byteStream) getBuf(n int) []byte {
v := b.bytes[b.i:b.i+n]
b.i += n
return v
}
func (b *byteStream) remaining() int {
return len(b.bytes) - b.i
}
func (b *byteStream) writeTo(w io.Writer, n int) error {
_n,err := w.Write(b.bytes[b.i:n])
b.i += _n
if err != nil {
return err
}
return nil
}
func defaultQTable(q int) []byte {
f := clip(q,q,99)
const tabLen = 128
tab := make([]byte,tabLen)
if q < 50 {
q = 5000 / f
} else {
q = 200 - f*2
}
for i := 0; i < tabLen; i++ {
v := (int(defaultQuantisers[i])*q + 50) / 100
v = clip(v,1,255)
tab[i] = byte(v)
}
return tab
}
func clip(v, min, max int) int {
if v < min {
return min
}
if v > max {
return max
}
return v
}

View File

@ -39,6 +39,7 @@ const (
codeSOS = 0xda // Start of scan. codeSOS = 0xda // Start of scan.
codeAPP0 = 0xe0 // TODO: find out what this is. codeAPP0 = 0xe0 // TODO: find out what this is.
codeSOF0 = 0xc0 // Baseline codeSOF0 = 0xc0 // Baseline
codeEOI = 0xd9 // End of image.
) )
var ( var (
@ -96,42 +97,37 @@ var (
} }
) )
// writeMarker writes an JPEG marker with code to w. type multiError []error
func writeMarker(w io.Writer, code byte) error {
_, err := w.Write([]byte{0xff, code}) func (me multiError) Error() string {
if err != nil { return fmt.Sprintf("%v", []error(me))
return err
}
return nil
} }
// writeHuffman write a JPEG huffman table to w. func (me multiError) add(e error) {
func writeHuffman(w io.Writer, class, id int, bits, values []byte) error { me = append(me, e)
_, err := w.Write([]byte{byte(class<<4 | id)}) }
if err != nil {
return fmt.Errorf("could not write class and id: %w", err)
}
var n int type putter struct {
for i := 1; i <= 16; i++ { idx int
n += int(bits[i]) }
}
_, err = w.Write(bits[1:17]) func (p *putter) put16(b []byte, v uint16) {
if err != nil { binary.BigEndian.PutUint16(b[p.idx:], v)
return fmt.Errorf("could not write first lot of huffman bytes: %w", err) p.idx += 2
} }
_, err = w.Write(values[0:n]) func (p *putter) put8(b []byte, v uint8) {
if err != nil { b[p.idx] = byte(v)
return fmt.Errorf("could not write second lot of huffman bytes: %w", err) p.idx++
} }
return nil func (p *putter) putBuf(dst, src []byte, l int) {
copy(dst[p.idx:], src)
p.idx++
} }
// writeHeader writes a JPEG header to the writer w. // writeHeader writes a JPEG header to the writer w.
func writeHeader(w io.Writer, size, _type, width, height, nbqTab, dri int, qtable []byte) error { func writeHeader(w io.Writer, _type, width, height, nbqTab, dri int, qtable []byte) error {
width <<= 3 width <<= 3
height <<= 3 height <<= 3
@ -282,31 +278,36 @@ func writeHeader(w io.Writer, size, _type, width, height, nbqTab, dri int, qtabl
return nil return nil
} }
type multiError []error // writeMarker writes an JPEG marker with code to w.
func writeMarker(w io.Writer, code byte) error {
func (me multiError) Error() string { _, err := w.Write([]byte{0xff, code})
return fmt.Sprintf("%v", []error(me)) if err != nil {
return err
}
return nil
} }
func (me multiError) add(e error) { // writeHuffman write a JPEG huffman table to w.
me = append(me, e) func writeHuffman(w io.Writer, class, id int, bits, values []byte) error {
} _, err := w.Write([]byte{byte(class<<4 | id)})
if err != nil {
return fmt.Errorf("could not write class and id: %w", err)
}
type putter struct { var n int
idx int for i := 1; i <= 16; i++ {
} n += int(bits[i])
}
func (p *putter) put16(b []byte, v uint16) { _, err = w.Write(bits[1:17])
binary.BigEndian.PutUint16(b[p.idx:], v) if err != nil {
p.idx += 2 return fmt.Errorf("could not write first lot of huffman bytes: %w", err)
} }
func (p *putter) put8(b []byte, v uint8) { _, err = w.Write(values[0:n])
b[p.idx] = byte(v) if err != nil {
p.idx++ return fmt.Errorf("could not write second lot of huffman bytes: %w", err)
} }
func (p *putter) putBuf(dst, src []byte, l int) { return nil
copy(dst[p.idx:], src)
p.idx++
} }