From ba25cdfd12951b4678a310964f23986a36c2a63f Mon Sep 17 00:00:00 2001 From: Saxon Date: Wed, 20 Nov 2019 13:40:07 +1030 Subject: [PATCH] codec/mjpeg/extract.go: wrote Extractor type Wrote extractor type that provides an Extract function to extract JPEG frames from an RTP/MJPEG stream and writes them to a destination. --- codec/mjpeg/extract.go | 288 +++++++++++++++++++++++++++++++++++++++++ codec/mjpeg/jpeg.go | 97 +++++++------- 2 files changed, 337 insertions(+), 48 deletions(-) create mode 100644 codec/mjpeg/extract.go diff --git a/codec/mjpeg/extract.go b/codec/mjpeg/extract.go new file mode 100644 index 00000000..d0f2e791 --- /dev/null +++ b/codec/mjpeg/extract.go @@ -0,0 +1,288 @@ +/* +DESCRIPTION + extract.go provides an Extractor to get JPEG from RTP. + +AUTHOR + Saxon Nelson-Milton + +LICENSE + Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package mjpeg + +import ( + "bytes" + "fmt" + "io" + "time" + "errors" + + "bitbucket.org/ausocean/av/protocol/rtp" +) + +// Buffer sizes. +const ( + maxRTPSize = 1500 // Max ethernet transmission unit in bytes. +) + +var ( + errNoQTable = errors.New("no quantization table") + errReservedQ = errors.New("q value is reserved") +) + +var defaultQuantisers = []byte{ + // Luma table. + 16, 11, 12, 14, 12, 10, 16, 14, + 13, 14, 18, 17, 16, 19, 24, 40, + 26, 24, 22, 22, 24, 49, 35, 37, + 29, 40, 58, 51, 61, 60, 57, 51, + 56, 55, 64, 72, 92, 78, 64, 68, + 87, 69, 55, 56, 80, 109, 81, 87, + 95, 98, 103, 104, 103, 62, 77, 113, + 121, 112, 100, 120, 92, 101, 103, 99, + + /* chroma table */ + 17, 18, 18, 24, 21, 24, 47, 26, + 26, 47, 99, 66, 56, 66, 99, 99, + 99, 99, 99, 99, 99, 99, 99, 99, + 99, 99, 99, 99, 99, 99, 99, 99, + 99, 99, 99, 99, 99, 99, 99, 99, + 99, 99, 99, 99, 99, 99, 99, 99, + 99, 99, 99, 99, 99, 99, 99, 99, + 99, 99, 99, 99, 99, 99, 99, 99, + +} + +// Extractor is an Extractor for extracting JPEG from an RTP stream. +type Extractor struct { + buf *bytes.Buffer // Holds the current JPEG image. + dst io.Writer // The destination we'll be writing extracted NALUs to. +} + +// NewExtractor returns a new Extractor. +func NewExtractor() *Extractor { return &Extractor{} } + +// Extract will continously read RTP packets from src containing JPEG (in RTP +// payload format) and extract the JPEG images, sending them to dst. This +// function expects that each read from src will provide a single RTP packet. +func (e *Extractor) Extract(dst io.Writer, src io.Reader, delay time.Duration) error { + buf := make([]byte, maxRTPSize) + + var ( + qTables [128][128]byte + qTablesLen [128]byte + ) + + for { + n, err := src.Read(buf) + switch err { + case nil: // Do nothing. + case io.EOF: + return nil + default: + return fmt.Errorf("source read error: %v\n", err) + } + + // Get payload from RTP packet. + p, err := rtp.Payload(buf[:n]) + if err != nil { + return fmt.Errorf("could not get RTP payload, failed with err: %v\n", err) + } + + b := newByteStream(p) + _ = b.get8() // Ignore type-specific flag + + var ( + off = b.get24() // Fragment offset. + t = b.get8() // Type. + q = b.get8() // Quantization value. + width = b.get8() // Picture width. + height = b.get8() // Picture height. + dri int // Restart interval. + ) + + if t&0x40 != 0 { + dri = b.get16() + _ = b.get16() // Ignore restart count. + t &= ^0x40 + } + + if t > 1 { + panic("unimplemented RTP/JPEG type") + } + + // Parse quantization table if our offset is 0. + if off == 0 { + var qTable []byte + var qLen int + + if q > 127 { + _ = b.get8() // Ignore first byte (reserved for future use). + prec := b.get8() // The size of coefficients. + qLen = b.get16() // The length of the quantization table. + + if prec != 0 { + panic("unsupported precision") + } + + if qLen > 0 { + qTable = b.getBuf(qLen) + + if q < 255 { + if qTablesLen[q-128] == 0 && qLen <= 128 { + copy(qTables[q-128][:],qTable) + qTablesLen[q-128] = byte(qLen) + } + } + } else { + if q == 255 { + return errNoQTable + } + + if qTablesLen[q-128] == 0 { + return fmt.Errorf("no quantization tables known for q %d yet",q) + } + + qTable = qTables[q-128][:] + qLen = int(qTablesLen[q-128]) + } + } else { // q <= 127 + if q == 0 || q > 99 { + return errReservedQ + } + qTable = defaultQTable(q) + qLen = len(qTable) + } + + e.buf.Reset() + + err = writeHeader(e.buf, t, width, height, qLen / 64, dri, qTable) + if err != nil { + return fmt.Errorf("could not write JPEG header: %w",err) + } + } + + if e.buf.Len() == 0 { + // Must have missed start of frame? So ignore and wait for start. + continue + } + + // TODO: check that timestamp is consistent + // This will need expansion to RTP package to create Timestamp parsing func. + + // TODO: could also check offset with how many bytes we currently have + // to determine if there are missing frames. + + // Write frame data + err = b.writeTo(e.buf,b.remaining()) + if err != nil { + return fmt.Errorf("could not write remaining frame data to output buffer: %w",err) + } + + m, err := rtp.Marker(buf[:n]) + if err != nil { + return fmt.Errorf("could not read RTP marker: %w",err) + } + + if m { + _,err = e.buf.Write([]byte{0xff,codeEOI}) + if err != nil { + return fmt.Errorf("could not write EOI marker: %w",err) + } + + _,err = e.buf.WriteTo(dst) + if err != nil { + return fmt.Errorf("could not write JPEG to dst: %w",err) + } + } + } +} + +type byteStream struct { + bytes []byte + i int +} + +func newByteStream(b []byte) *byteStream { return &byteStream{bytes: b} } + +func (b *byteStream) get24() int { + v := int(b.bytes[b.i])<<16 | int(b.bytes[b.i+1])<<8 | int(b.bytes[b.i+2]) + b.i += 3 + return v +} + +func (b *byteStream) get8() int { + v := int(b.bytes[b.i]) + b.i++ + return v +} + +func (b *byteStream) get16() int { + v := int(b.bytes[b.i])<<8 | int(b.bytes[b.i+1]) + b.i += 2 + return v +} + +func (b *byteStream) getBuf(n int) []byte { + v := b.bytes[b.i:b.i+n] + b.i += n + return v +} + +func (b *byteStream) remaining() int { + return len(b.bytes) - b.i +} + +func (b *byteStream) writeTo(w io.Writer, n int) error { + _n,err := w.Write(b.bytes[b.i:n]) + b.i += _n + if err != nil { + return err + } + return nil +} + +func defaultQTable(q int) []byte { + f := clip(q,q,99) + const tabLen = 128 + tab := make([]byte,tabLen) + + if q < 50 { + q = 5000 / f + } else { + q = 200 - f*2 + } + + for i := 0; i < tabLen; i++ { + v := (int(defaultQuantisers[i])*q + 50) / 100 + v = clip(v,1,255) + tab[i] = byte(v) + } + return tab +} + +func clip(v, min, max int) int { + if v < min { + return min + } + + if v > max { + return max + } + + return v +} diff --git a/codec/mjpeg/jpeg.go b/codec/mjpeg/jpeg.go index be4c7604..72daf6f6 100644 --- a/codec/mjpeg/jpeg.go +++ b/codec/mjpeg/jpeg.go @@ -39,6 +39,7 @@ const ( codeSOS = 0xda // Start of scan. codeAPP0 = 0xe0 // TODO: find out what this is. codeSOF0 = 0xc0 // Baseline + codeEOI = 0xd9 // End of image. ) var ( @@ -96,42 +97,37 @@ var ( } ) -// writeMarker writes an JPEG marker with code to w. -func writeMarker(w io.Writer, code byte) error { - _, err := w.Write([]byte{0xff, code}) - if err != nil { - return err - } - return nil +type multiError []error + +func (me multiError) Error() string { + return fmt.Sprintf("%v", []error(me)) } -// writeHuffman write a JPEG huffman table to w. -func writeHuffman(w io.Writer, class, id int, bits, values []byte) error { - _, err := w.Write([]byte{byte(class<<4 | id)}) - if err != nil { - return fmt.Errorf("could not write class and id: %w", err) - } +func (me multiError) add(e error) { + me = append(me, e) +} - var n int - for i := 1; i <= 16; i++ { - n += int(bits[i]) - } +type putter struct { + idx int +} - _, err = w.Write(bits[1:17]) - if err != nil { - return fmt.Errorf("could not write first lot of huffman bytes: %w", err) - } +func (p *putter) put16(b []byte, v uint16) { + binary.BigEndian.PutUint16(b[p.idx:], v) + p.idx += 2 +} - _, err = w.Write(values[0:n]) - if err != nil { - return fmt.Errorf("could not write second lot of huffman bytes: %w", err) - } +func (p *putter) put8(b []byte, v uint8) { + b[p.idx] = byte(v) + p.idx++ +} - return nil +func (p *putter) putBuf(dst, src []byte, l int) { + copy(dst[p.idx:], src) + p.idx++ } // writeHeader writes a JPEG header to the writer w. -func writeHeader(w io.Writer, size, _type, width, height, nbqTab, dri int, qtable []byte) error { +func writeHeader(w io.Writer, _type, width, height, nbqTab, dri int, qtable []byte) error { width <<= 3 height <<= 3 @@ -282,31 +278,36 @@ func writeHeader(w io.Writer, size, _type, width, height, nbqTab, dri int, qtabl return nil } -type multiError []error - -func (me multiError) Error() string { - return fmt.Sprintf("%v", []error(me)) +// writeMarker writes an JPEG marker with code to w. +func writeMarker(w io.Writer, code byte) error { + _, err := w.Write([]byte{0xff, code}) + if err != nil { + return err + } + return nil } -func (me multiError) add(e error) { - me = append(me, e) -} +// writeHuffman write a JPEG huffman table to w. +func writeHuffman(w io.Writer, class, id int, bits, values []byte) error { + _, err := w.Write([]byte{byte(class<<4 | id)}) + if err != nil { + return fmt.Errorf("could not write class and id: %w", err) + } -type putter struct { - idx int -} + var n int + for i := 1; i <= 16; i++ { + n += int(bits[i]) + } -func (p *putter) put16(b []byte, v uint16) { - binary.BigEndian.PutUint16(b[p.idx:], v) - p.idx += 2 -} + _, err = w.Write(bits[1:17]) + if err != nil { + return fmt.Errorf("could not write first lot of huffman bytes: %w", err) + } -func (p *putter) put8(b []byte, v uint8) { - b[p.idx] = byte(v) - p.idx++ -} + _, err = w.Write(values[0:n]) + if err != nil { + return fmt.Errorf("could not write second lot of huffman bytes: %w", err) + } -func (p *putter) putBuf(dst, src []byte, l int) { - copy(dst[p.idx:], src) - p.idx++ + return nil }