From c0f9f7bf7b0dbdb85adf341ad3f02b2e9f039d58 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sun, 19 Aug 2018 20:29:22 +0930 Subject: [PATCH 1/5] encoding: restructure packages handling stream encoding --- .../generator.go => encoding/encoding.go | 8 +- .../flv/encoder.go | 49 +++--- {flv => encoding/flv}/flv.go | 0 .../mts/encoder.go | 11 +- mpegts/MpegTs.go => encoding/mts/mpegts.go | 6 +- {pes => encoding/mts/pes}/pes.go | 0 encoding/mts/pes/pes_test.go | 74 ++++++++++ {mpegts => encoding/mts}/psi/pat.go | 0 {mpegts => encoding/mts}/psi/pmt.go | 0 itut/standards.go | 33 ----- mpegts/mpegts_test.go | 69 --------- parser/h264.go | 139 ------------------ parser/mjpeg.go | 87 ----------- parser/parser.go | 50 ------- parser/parser_test.go | 127 ---------------- pes/pes_test.go | 76 ---------- revid/revid.go | 32 ++-- 17 files changed, 126 insertions(+), 635 deletions(-) rename generator/generator.go => encoding/encoding.go (85%) rename generator/flv_generator.go => encoding/flv/encoder.go (87%) rename {flv => encoding/flv}/flv.go (100%) rename generator/mpegts_generator.go => encoding/mts/encoder.go (97%) rename mpegts/MpegTs.go => encoding/mts/mpegts.go (98%) rename {pes => encoding/mts/pes}/pes.go (100%) create mode 100644 encoding/mts/pes/pes_test.go rename {mpegts => encoding/mts}/psi/pat.go (100%) rename {mpegts => encoding/mts}/psi/pmt.go (100%) delete mode 100644 itut/standards.go delete mode 100644 mpegts/mpegts_test.go delete mode 100644 parser/h264.go delete mode 100644 parser/mjpeg.go delete mode 100644 parser/parser.go delete mode 100644 parser/parser_test.go delete mode 100644 pes/pes_test.go mode change 100755 => 100644 revid/revid.go diff --git a/generator/generator.go b/encoding/encoding.go similarity index 85% rename from generator/generator.go rename to encoding/encoding.go index 50bace43..10233373 100644 --- a/generator/generator.go +++ b/encoding/encoding.go @@ -1,6 +1,6 @@ /* NAME - generator.go + encoding.go DESCRIPTION See Readme.md @@ -9,7 +9,7 @@ AUTHOR Saxon Nelson-Milton LICENSE - generator.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + encoding.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the @@ -25,9 +25,9 @@ LICENSE along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ -package generator +package encoding -type Generator interface { +type Encoder interface { InputChan() chan []byte OutputChan() <-chan []byte Start() diff --git a/generator/flv_generator.go b/encoding/flv/encoder.go similarity index 87% rename from generator/flv_generator.go rename to encoding/flv/encoder.go index 296e8849..bbcfb73e 100644 --- a/generator/flv_generator.go +++ b/encoding/flv/encoder.go @@ -6,6 +6,7 @@ DESCRIPTION See Readme.md AUTHOR + Dan Kortschak Saxon Nelson-Milton LICENSE @@ -22,15 +23,11 @@ LICENSE for more details. You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ -package generator +package flv -import ( - "time" - - "bitbucket.org/ausocean/av/flv" -) +import "time" const ( inputChanLength = 500 @@ -61,7 +58,7 @@ type flvGenerator struct { audio bool video bool lastTagSize int - header flv.Header + header Header startTime time.Time firstTag bool isGenerating bool @@ -105,7 +102,7 @@ func (g *flvGenerator) Stop() { // GenHeader generates the flv header and sends it down the output chan for use // This will generally be called once at the start of file writing/transmission. func (g *flvGenerator) GenHeader() { - header := flv.Header{ + header := Header{ HasAudio: g.audio, HasVideo: g.video, } @@ -227,23 +224,23 @@ func (g *flvGenerator) generate() { // Do we have video to send off? if g.video { if isKeyFrame(frame) { - frameType = flv.KeyFrameType + frameType = KeyFrameType } else { - frameType = flv.InterFrameType + frameType = InterFrameType } if isSequenceHeader(frame) { - packetType = flv.SequenceHeader + packetType = SequenceHeader } else { - packetType = flv.AVCNALU + packetType = AVCNALU } - tag := flv.VideoTag{ - TagType: uint8(flv.VideoTagType), - DataSize: uint32(len(frame)) + flv.DataHeaderLength, + tag := VideoTag{ + TagType: uint8(VideoTagType), + DataSize: uint32(len(frame)) + DataHeaderLength, Timestamp: timeStamp, - TimestampExtended: flv.NoTimestampExtension, + TimestampExtended: NoTimestampExtension, FrameType: frameType, - Codec: flv.H264, + Codec: H264, PacketType: packetType, CompositionTime: 0, Data: frame, @@ -255,12 +252,12 @@ func (g *flvGenerator) generate() { if g.audio { // Not sure why but we need two audio tags for dummy silent audio // TODO: create constants or SoundSize and SoundType parameters - tag := flv.AudioTag{ - TagType: uint8(flv.AudioTagType), + tag := AudioTag{ + TagType: uint8(AudioTagType), DataSize: 7, Timestamp: timeStamp, - TimestampExtended: flv.NoTimestampExtension, - SoundFormat: flv.AACAudioFormat, + TimestampExtended: NoTimestampExtension, + SoundFormat: AACAudioFormat, SoundRate: 3, SoundSize: true, SoundType: true, @@ -269,12 +266,12 @@ func (g *flvGenerator) generate() { } g.outputChan <- tag.Bytes() - tag = flv.AudioTag{ - TagType: uint8(flv.AudioTagType), + tag = AudioTag{ + TagType: uint8(AudioTagType), DataSize: 21, Timestamp: timeStamp, - TimestampExtended: flv.NoTimestampExtension, - SoundFormat: flv.AACAudioFormat, + TimestampExtended: NoTimestampExtension, + SoundFormat: AACAudioFormat, SoundRate: 3, SoundSize: true, SoundType: true, diff --git a/flv/flv.go b/encoding/flv/flv.go similarity index 100% rename from flv/flv.go rename to encoding/flv/flv.go diff --git a/generator/mpegts_generator.go b/encoding/mts/encoder.go similarity index 97% rename from generator/mpegts_generator.go rename to encoding/mts/encoder.go index 0c4a1811..d6395aea 100644 --- a/generator/mpegts_generator.go +++ b/encoding/mts/encoder.go @@ -26,7 +26,7 @@ LICENSE along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ -package generator +package mts import ( "encoding/binary" @@ -34,8 +34,7 @@ import ( "math/bits" "time" - "bitbucket.org/ausocean/av/mpegts" - "bitbucket.org/ausocean/av/pes" + "bitbucket.org/ausocean/av/encoding/mts/pes" ) const psiPacketSize = 184 @@ -221,7 +220,7 @@ func (g *tsGenerator) generate() { nalu := <-g.nalInputChan // Write PAT - patPkt := mpegts.Packet{ + patPkt := Packet{ PUSI: true, PID: patPid, CC: g.ccFor(patPid), @@ -231,7 +230,7 @@ func (g *tsGenerator) generate() { g.outputChan <- patPkt.Bytes() // Write PMT. - pmtPkt := mpegts.Packet{ + pmtPkt := Packet{ PUSI: true, PID: pmtPid, CC: g.ccFor(pmtPid), @@ -252,7 +251,7 @@ func (g *tsGenerator) generate() { pusi := true for len(buf) != 0 { - pkt := mpegts.Packet{ + pkt := Packet{ PUSI: pusi, PID: videoPid, RAI: pusi, diff --git a/mpegts/MpegTs.go b/encoding/mts/mpegts.go similarity index 98% rename from mpegts/MpegTs.go rename to encoding/mts/mpegts.go index 4b4a8a02..eb1dce6f 100644 --- a/mpegts/MpegTs.go +++ b/encoding/mts/mpegts.go @@ -1,6 +1,6 @@ /* NAME - MpegTs.go - provides a data structure intended to encapsulate the properties + mpegts.go - provides a data structure intended to encapsulate the properties of an MpegTs packet and also functions to allow manipulation of these packets. DESCRIPTION @@ -10,7 +10,7 @@ AUTHOR Saxon A. Nelson-Milton LICENSE - MpegTs.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + mpegts.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the @@ -26,7 +26,7 @@ LICENSE along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). */ -package mpegts +package mts const ( mpegTsSize = 188 diff --git a/pes/pes.go b/encoding/mts/pes/pes.go similarity index 100% rename from pes/pes.go rename to encoding/mts/pes/pes.go diff --git a/encoding/mts/pes/pes_test.go b/encoding/mts/pes/pes_test.go new file mode 100644 index 00000000..ee436253 --- /dev/null +++ b/encoding/mts/pes/pes_test.go @@ -0,0 +1,74 @@ +/* +NAME + mpegts_test.go + +DESCRIPTION + See Readme.md + +AUTHOR + Dan Kortschak + Saxon Nelson-Milton + +LICENSE + mpegts_test.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package pes + +import ( + "reflect" + "testing" +) + +const ( + dataLength = 3 // bytes +) + +func TestPesToByteSlice(t *testing.T) { + pkt := Packet{ + StreamID: 0xE0, // StreamID + PDI: byte(2), + PTS: 100000, + HeaderLength: byte(10), + Stuff: []byte{0xFF, 0xFF}, + Data: []byte{0xEA, 0x4B, 0x12}, + } + got := pkt.Bytes() + want := []byte{ + 0x00, // packet start code prefix byte 1 + 0x00, // packet start code prefix byte 2 + 0x01, // packet start code prefix byte 3 + 0xE0, // stream ID + 0x00, // PES Packet length byte 1 + 0x00, // PES packet length byte 2 + 0x80, // Marker bits,ScramblingControl, Priority, DAI, Copyright, Original + 0x80, // PDI, ESCR, ESRate, DSMTrickMode, ACI, CRC, Ext + 10, // header length + 0x21, // PCR byte 1 + 0x00, // pcr byte 2 + 0x07, // pcr byte 3 + 0x0D, // pcr byte 4 + 0x41, // pcr byte 5 + 0xFF, // Stuffing byte 1 + 0xFF, // stuffing byte 3 + 0xEA, // data byte 1 + 0x4B, // data byte 2 + 0x12, // data byte 3 + } + if !reflect.DeepEqual(got, want) { + t.Errorf("unexpected packet encoding:\ngot: %#v\nwant:%#v", got, want) + } +} diff --git a/mpegts/psi/pat.go b/encoding/mts/psi/pat.go similarity index 100% rename from mpegts/psi/pat.go rename to encoding/mts/psi/pat.go diff --git a/mpegts/psi/pmt.go b/encoding/mts/psi/pmt.go similarity index 100% rename from mpegts/psi/pmt.go rename to encoding/mts/psi/pmt.go diff --git a/itut/standards.go b/itut/standards.go deleted file mode 100644 index 30880d5a..00000000 --- a/itut/standards.go +++ /dev/null @@ -1,33 +0,0 @@ -/* -NAME - RtpToTsConverter.go - provides utilities for the conversion of Rtp packets - to equivalent MpegTs packets. - -DESCRIPTION - See Readme.md - -AUTHOR - Saxon Nelson-Milton - -LICENSE - RtpToTsConverter.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). -*/ - -package itut - -func StartCode1() []byte { return []byte{0x00, 0x00, 0x01} } -func StartCode2() []byte { return []byte{0x00, 0x00, 0x00, 0x01} } -func AUD() []byte { return []byte{0x09, 0xF0} } diff --git a/mpegts/mpegts_test.go b/mpegts/mpegts_test.go deleted file mode 100644 index 15600d19..00000000 --- a/mpegts/mpegts_test.go +++ /dev/null @@ -1,69 +0,0 @@ -/* -NAME - MpegTs.go - provides a data structure intended to encapsulate the properties - of an MpegTs packet. - -DESCRIPTION - See Readme.md - -AUTHOR - Saxon Nelson-Milton - -LICENSE - MpegTs.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). -*/ - -package mpegts - -import "testing" - -// Just ensure that we can create a byte slice with a mpegts packet correctly -func TestMpegTsToByteSlice(t *testing.T) { - payload := []byte{0x56, 0xA2, 0x78, 0x89, 0x67} - pcr := 100000 // => 100000 - stuffing := make([]byte, 171) - for i := range stuffing { - stuffing[i] = 0xFF - } - tsPkt := MpegTsPacket{ - PUSI: true, - PID: uint16(256), - AFC: byte(3), - AFL: 7 + 171, - CC: byte(6), - PCRF: true, - PCR: uint64(pcr), - Stuff: stuffing, - Payload: payload, - } - expectedOutput := []byte{0x47, 0x41, 0x00, 0x36, byte(178), 0x10} - for i := 40; i >= 0; i -= 8 { - expectedOutput = append(expectedOutput, byte(pcr>>uint(i))) - } - for i := 0; i < 171; i++ { - expectedOutput = append(expectedOutput, 0xFF) - } - expectedOutput = append(expectedOutput, payload...) - tsPktAsByteSlice, err := tsPkt.ToByteSlice() - if err != nil { - t.Errorf("Should not have got error!: %v", err) - } - for i := 0; i < 188; i++ { - if tsPktAsByteSlice[i] != expectedOutput[i] { - t.Errorf("Conversion to byte slice bad! Byte: %v Wanted: %v Got: %v", i, expectedOutput[i], tsPktAsByteSlice[i]) - } - } -} diff --git a/parser/h264.go b/parser/h264.go deleted file mode 100644 index 86ecd030..00000000 --- a/parser/h264.go +++ /dev/null @@ -1,139 +0,0 @@ -/* -NAME - h264.go - -DESCRIPTION - See Readme.md - -AUTHOR - Saxon Nelson-Milton - -LICENSE - h264.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. -*/ -package parser - -import ( - "time" - - "bitbucket.org/ausocean/av/itut" -) - -const ( - inputChanSize = 100000 - outputBufferSize = 10000 -) - -// H264 provides properties and methods to allow for the parsing of a -// h264 stream - i.e. to allow extraction of the individual access units -type H264 struct { - inputBuffer []byte - isParsing bool - parserOutputChanRef chan []byte - userOutputChanRef chan []byte - inputChan chan byte - delay uint -} - -// NewH264Parser returns an instance of the H264 struct -func NewH264Parser() (p *H264) { - p = new(H264) - p.isParsing = true - p.inputChan = make(chan byte, inputChanSize) - p.delay = 0 - return -} - -// Stop simply sets the isParsing flag to false to indicate to the parser that -// we don't want to interpret incoming data anymore - this will also make the -// parser jump out of the parse func -func (p *H264) Stop() { - p.isParsing = false -} - -// Start starts the parse func as a goroutine so that incoming data is interpreted -func (p *H264) Start() { - p.isParsing = true - go p.parse() -} - -// SetDelay sets a delay inbetween each buffer output. Useful if we're parsing -// a file but want to replicate the speed of incoming video frames from a -// camera -func (p *H264) SetDelay(delay uint) { - p.delay = delay -} - -// InputChan returns a handle to the input channel of the parser -func (p *H264) InputChan() chan byte { - return p.inputChan -} - -// OutputChan returns a handle to the output chan of the parser -func (p *H264) OutputChan() <-chan []byte { - return p.userOutputChanRef -} - -// SetOutputChan sets the parser output chan to the passed output chan. This is -// useful if we want the parser output to go directly to a generator of some sort -// for packetization. -func (p *H264) SetOutputChan(o chan []byte) { - p.parserOutputChanRef = o - p.userOutputChanRef = o -} - -// parse interprets an incoming h264 stream and extracts individual frames -// aka access units -func (p *H264) parse() { - outputBuffer := make([]byte, 0, outputBufferSize) - searchingForEnd := false - for p.isParsing { - var aByte uint8 - if p.isParsing { - aByte = <-p.inputChan - } else { - return - } - - outputBuffer = append(outputBuffer, aByte) - for i := 1; aByte == 0x00 && i != 4; i++ { - if p.isParsing { - aByte = <-p.inputChan - } else { - return - } - outputBuffer = append(outputBuffer, aByte) - if (aByte == 0x01 && i == 2) || (aByte == 0x01 && i == 3) { - if searchingForEnd { - output := append(append(itut.StartCode1(), itut.AUD()...), outputBuffer[:len(outputBuffer)-(i+1)]...) - time.Sleep(time.Duration(p.delay) * time.Millisecond) - p.parserOutputChanRef <- output - outputBuffer = outputBuffer[len(outputBuffer)-1-i:] - searchingForEnd = false - } - if p.isParsing { - aByte = <-p.inputChan - } else { - return - } - outputBuffer = append(outputBuffer, aByte) - if nalType := aByte & 0x1F; nalType == 1 || nalType == 5 || nalType == 8 || nalType == 7 { - searchingForEnd = true - } - } - } - } -} diff --git a/parser/mjpeg.go b/parser/mjpeg.go deleted file mode 100644 index 4271601a..00000000 --- a/parser/mjpeg.go +++ /dev/null @@ -1,87 +0,0 @@ -/* -NAME - mjpeg.go - -DESCRIPTION - See Readme.md - -AUTHOR - Saxon Nelson-Milton - -LICENSE - mjpeg.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. -*/ - -package parser - -const frameStartCode = 0xD8 - -type MJPEG struct { - inputBuffer []byte - isParsing bool - parserOutputChanRef chan []byte - userOutputChanRef chan []byte - inputChan chan byte - delay uint -} - -func NewMJPEGParser(inputChanLen int) (p *MJPEG) { - p = new(MJPEG) - p.isParsing = true - p.inputChan = make(chan byte, inputChanLen) - return -} - -func (p *MJPEG) Stop() { - p.isParsing = false -} - -func (p *MJPEG) Start() { - go p.parse() -} - -func (p *MJPEG) SetDelay(delay uint) { - p.delay = delay -} - -func (p *MJPEG) InputChan() chan byte { - return p.inputChan -} - -func (p *MJPEG) OutputChan() <-chan []byte { - return p.userOutputChanRef -} - -func (p *MJPEG) SetOutputChan(o chan []byte) { - p.parserOutputChanRef = o - p.userOutputChanRef = o -} - -func (p *MJPEG) parse() { - var outputBuffer []byte - for p.isParsing { - aByte := <-p.inputChan - outputBuffer = append(outputBuffer, aByte) - if aByte == 0xFF && len(outputBuffer) != 0 { - aByte := <-p.inputChan - outputBuffer = append(outputBuffer, aByte) - if aByte == frameStartCode { - p.parserOutputChanRef <- outputBuffer[:len(outputBuffer)-2] - outputBuffer = outputBuffer[len(outputBuffer)-2:] - } - } - } -} diff --git a/parser/parser.go b/parser/parser.go deleted file mode 100644 index 157b27c1..00000000 --- a/parser/parser.go +++ /dev/null @@ -1,50 +0,0 @@ -/* -NAME - parser.go - -DESCRIPTION - See Readme.md - -AUTHOR - Saxon Nelson-Milton - -LICENSE - parser.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see http://www.gnu.org/licensesx). -*/ - -package parser - -import ( - "log" - "sync" -) - -// h264 consts -const acceptedLength = 1000 - -var ( - Info *log.Logger - mutex *sync.Mutex -) - -type Parser interface { - Stop() - Start() - InputChan() chan byte - OutputChan() <-chan []byte - SetOutputChan(achan chan []byte) - SetDelay(delay uint) -} diff --git a/parser/parser_test.go b/parser/parser_test.go deleted file mode 100644 index 066dbf11..00000000 --- a/parser/parser_test.go +++ /dev/null @@ -1,127 +0,0 @@ -/* -NAME - parser_test.go - -DESCRIPTION - See Readme.md - -AUTHOR - Saxon Nelson-Milton - -LICENSE - parser_test.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. -*/ - -package parser - -import ( - "fmt" - "io/ioutil" - "log" - "os" - "strconv" - "testing" -) - -const ( - mjpegInputFileName = "testInput/testInput.avi" - h264fName = "../../test/test-data/av/input/betterInput.h264" - nOfFrames = 4 - outChanSize = 100 -) - -func TestH264Parser(t *testing.T) { - log.SetOutput(os.Stderr) - log.Println("Opening input file!") - inputFile, err := os.Open(h264fName) - if err != nil { - t.Errorf("Should not have got error opening file!") - return - } - - log.Println("Reading data from file!") - data, err := ioutil.ReadAll(inputFile) - if err != nil { - t.Errorf("Should not have got read error!") - return - } - - // Create 'parser' and start it up - log.Println("Creating parser!") - parser := NewH264Parser() - parser.SetOutputChan(make(chan []byte, outChanSize)) - parser.Start() - - for i, n := 0, 0; n <= nOfFrames; i++ { - select { - case parser.InputChan() <- data[i]: - case frame := <-parser.OutputChan(): - path := fmt.Sprintf("testOutput/" + strconv.Itoa(n) + "h264_frame") - out, err := os.Create(path) - if err != nil { - t.Errorf("Unexpected error creating %q: %v", path, err) - return - } - out.Write(frame) - out.Close() - n++ - default: - } - } -} - -/* -func TestMJPEGParser(t *testing.T) { - log.Println("Opening input file!") - // Open the input file - inputFile, err := os.Open(testInputFileName) - if err != nil { - t.Errorf("Should not have got error opening file!") - } - log.Println("Getting file stats!") - stats, err := inputFile.Stat() - if err != nil { - t.Errorf("Could not get input file stats!") - return - } - log.Println("Creating space for file data!") - data := make([]byte, stats.Size()) - _, err = inputFile.Read(data) - if err != nil { - t.Errorf("Should not have got read error!") - return - } - log.Println("Creating parser!") - parser := NewMJPEGParser(len(data) + 1) - parser.SetOutputChan(make(chan []byte, 10000)) - parser.Start() - log.Printf("len(data): %v\n", len(data)) - for i := range data { - parser.GetInputChan() <- data[i] - } - log.Println("Writing jpegs to files!") - for i := 0; len(parser.GetOutputChan()) > 0; i++ { - // Open a new output file - out, err := os.Create("testOutput/image" + strconv.Itoa(i) + ".jpeg") - if err != nil { - t.Errorf("Should not have got error creating output file!") - return - } - out.Write(<-parser.GetOutputChan()) - out.Close() - } -} -*/ diff --git a/pes/pes_test.go b/pes/pes_test.go deleted file mode 100644 index 172e11c2..00000000 --- a/pes/pes_test.go +++ /dev/null @@ -1,76 +0,0 @@ -/* -NAME - MpegTs.go - provides a data structure intended to encapsulate the properties - of an MpegTs packet. - -DESCRIPTION - See Readme.md - -AUTHOR - Saxon Nelson-Milton - -LICENSE - MpegTs.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. -*/ - -package pes - -import ( - "testing" -) - -const ( - dataLength = 3 // bytes -) - -func TestPesToByteSlice(t *testing.T) { - pesPkt := PESPacket{ - StreamID: 0xE0, // StreamID - PDI: byte(2), - PTS: 100000, - HeaderLength: byte(10), - Stuff: []byte{0xFF, 0xFF}, - Data: []byte{0xEA, 0x4B, 0x12}, - } - pesExpectedOutput := []byte{ - 0x00, // packet start code prefix byte 1 - 0x00, // packet start code prefix byte 2 - 0x01, // packet start code prefix byte 3 - 0xE0, // stream ID - 0x00, // PES Packet length byte 1 - 0x00, // PES packet length byte 2 - 0x80, // Marker bits,ScramblingControl, Priority, DAI, Copyright, Original - 0x80, // PDI, ESCR, ESRate, DSMTrickMode, ACI, CRC, Ext - byte(10), // header length - 0x21, // PCR byte 1 - 0x00, // pcr byte 2 - 0x07, // pcr byte 3 - 0x0D, // pcr byte 4 - 0x41, // pcr byte 5 - 0xFF, // Stuffing byte 1 - 0xFF, // stuffing byte 3 - 0xEA, // data byte 1 - 0x4B, // data byte 2 - 0x12, // data byte 3 - } - pesPktAsByteSlice := pesPkt.ToByteSlice() - for ii := range pesPktAsByteSlice { - if pesPktAsByteSlice[ii] != pesExpectedOutput[ii] { - t.Errorf("Conversion to byte slice bad! Byte: %v Wanted: %v Got: %v", - ii, pesExpectedOutput[ii], pesPktAsByteSlice[ii]) - } - } -} diff --git a/revid/revid.go b/revid/revid.go old mode 100755 new mode 100644 index 40a916d2..663bea98 --- a/revid/revid.go +++ b/revid/revid.go @@ -38,7 +38,9 @@ import ( "strconv" "time" - "bitbucket.org/ausocean/av/generator" + "bitbucket.org/ausocean/av/encoding" + "bitbucket.org/ausocean/av/encoding/flv" + "bitbucket.org/ausocean/av/encoding/mts" "bitbucket.org/ausocean/av/parse" "bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/iot/pi/netsender" @@ -88,7 +90,7 @@ type Revid struct { ringBuffer *ring.Buffer config Config isRunning bool - generator generator.Generator + encoder encoding.Encoder parse func(dst io.Writer, src io.Reader, delay time.Duration) error cmd *exec.Cmd inputReader io.ReadCloser @@ -209,11 +211,11 @@ func (r *Revid) reset(config Config) error { case Mpegts: r.Log(Info, "Using MPEGTS packetisation") frameRate, _ := strconv.Atoi(r.config.FrameRate) - r.generator = generator.NewTsGenerator(float64(frameRate)) + r.encoder = mts.NewTsGenerator(float64(frameRate)) case Flv: r.Log(Info, "Using FLV packetisation") frameRate, _ := strconv.Atoi(r.config.FrameRate) - r.generator = generator.NewFlvGenerator(true, true, frameRate) + r.encoder = flv.NewFlvGenerator(true, true, frameRate) } // We have packetization of some sort, so we want to send data to Generator // to perform packetization @@ -255,8 +257,8 @@ func (r *Revid) Start() { go r.outputClips() r.Log(Info, "Starting clip packing routine") go r.packClips() - r.Log(Info, "Starting packetisation generator") - r.generator.Start() + r.Log(Info, "Starting packetisation encoder") + r.encoder.Start() r.Log(Info, "Setting up input and receiving content") go r.setupInput() } @@ -271,9 +273,9 @@ func (r *Revid) Stop() { r.Log(Info, "Stopping revid!") r.isRunning = false - r.Log(Info, "Stopping generator!") - if r.generator != nil { - r.generator.Stop() + r.Log(Info, "Stopping encoder!") + if r.encoder != nil { + r.encoder.Stop() } r.Log(Info, "Killing input proccess!") @@ -284,15 +286,15 @@ func (r *Revid) Stop() { } // getFrameNoPacketization gets a frame directly from the revid output chan -// as we don't need to go through the generator with no packetization settings +// as we don't need to go through the encoder with no packetization settings func (r *Revid) getFrameNoPacketization() []byte { return <-r.outputChan } // getFramePacketization gets a frame from the generators output chan - the -// the generator being an mpegts or flv generator depending on the config +// the encoder being an mpegts or flv encoder depending on the config func (r *Revid) getFramePacketization() []byte { - return <-r.generator.OutputChan() + return <-r.encoder.OutputChan() } // packClips takes data segments; whether that be tsPackets or mjpeg frames and @@ -304,7 +306,7 @@ func (r *Revid) packClips() { select { // TODO: This is temporary, need to work out how to make this work // for cases when there is not packetisation. - case frame := <-r.generator.OutputChan(): + case frame := <-r.encoder.OutputChan(): lenOfFrame := len(frame) if lenOfFrame > ringBufferElementSize { r.Log(Warning, fmt.Sprintf("Frame was too big: %v bytes, getting another one!", lenOfFrame)) @@ -475,7 +477,7 @@ func (r *Revid) startRaspivid() error { r.inputReader = stdout go func() { r.Log(Info, "Reading camera data!") - r.parse(chunkWriter(r.generator.InputChan()), r.inputReader, 0) + r.parse(chunkWriter(r.encoder.InputChan()), r.inputReader, 0) r.Log(Info, "Not trying to read from camera anymore!") }() return nil @@ -498,7 +500,7 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. - return r.parse(chunkWriter(r.generator.InputChan()), f, delay) + return r.parse(chunkWriter(r.encoder.InputChan()), f, delay) } // chunkWriter is a shim between the new function-based approach From 5d8eca850cd795eeb4c22f32bf86ac539e579a56 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sun, 19 Aug 2018 20:39:57 +0930 Subject: [PATCH 2/5] mts: rename encoder type --- encoding/mts/encoder.go | 68 ++++++++++++++++++++--------------------- revid/revid.go | 4 +-- 2 files changed, 36 insertions(+), 36 deletions(-) diff --git a/encoding/mts/encoder.go b/encoding/mts/encoder.go index d6395aea..6512dbd8 100644 --- a/encoding/mts/encoder.go +++ b/encoding/mts/encoder.go @@ -1,6 +1,6 @@ /* NAME - mpegts_generator.go + encoder.go DESCRIPTION See Readme.md @@ -10,7 +10,7 @@ AUTHOR Saxon Nelson-Milton LICENSE - mpegts_generator.go is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) + encoder.go is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the @@ -154,8 +154,8 @@ const ( pcrFreq = 90000 // Hz ) -// tsGenerator encapsulates properties of an mpegts generator. -type tsGenerator struct { +// Encoder encapsulates properties of an mpegts generator. +type Encoder struct { outputChan chan []byte nalInputChan chan []byte @@ -166,9 +166,9 @@ type tsGenerator struct { continuity map[int]byte } -// NewTsGenerator returns an instance of the tsGenerator struct -func NewTsGenerator(fps float64) (g *tsGenerator) { - return &tsGenerator{ +// NewEncoder returns an Encoder with the specified frame rate. +func NewEncoder(fps float64) *Encoder { + return &Encoder{ outputChan: make(chan []byte, 1), nalInputChan: make(chan []byte, 1), @@ -185,22 +185,22 @@ func NewTsGenerator(fps float64) (g *tsGenerator) { // Start is called when we would like generation to begin, i.e. we would like // the generator to start taking input data and creating mpegts packets -func (g *tsGenerator) Start() { - go g.generate() +func (e *Encoder) Start() { + go e.generate() } -func (g *tsGenerator) Stop() {} +func (e *Encoder) Stop() {} // InputChan returns a handle to the nalInputChan (inputChan) so that nal units // can be passed to the generator and processed -func (g *tsGenerator) InputChan() chan []byte { - return g.nalInputChan +func (e *Encoder) InputChan() chan []byte { + return e.nalInputChan } // OutputChan returns a handle to the generator output chan where the mpegts // packets will show up once ready to go -func (g *tsGenerator) OutputChan() <-chan []byte { - return g.outputChan +func (e *Encoder) OutputChan() <-chan []byte { + return e.outputChan } const ( @@ -215,35 +215,35 @@ const ( // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel -func (g *tsGenerator) generate() { +func (e *Encoder) generate() { for { - nalu := <-g.nalInputChan + nalu := <-e.nalInputChan // Write PAT patPkt := Packet{ PUSI: true, PID: patPid, - CC: g.ccFor(patPid), + CC: e.ccFor(patPid), AFC: hasPayload, Payload: patTable, } - g.outputChan <- patPkt.Bytes() + e.outputChan <- patPkt.Bytes() // Write PMT. pmtPkt := Packet{ PUSI: true, PID: pmtPid, - CC: g.ccFor(pmtPid), + CC: e.ccFor(pmtPid), AFC: hasPayload, Payload: pmtTable, } - g.outputChan <- pmtPkt.Bytes() + e.outputChan <- pmtPkt.Bytes() // Prepare PES data. pesPkt := pes.Packet{ StreamID: streamID, PDI: hasPTS, - PTS: g.pts(), + PTS: e.pts(), Data: nalu, HeaderLength: 5, } @@ -255,7 +255,7 @@ func (g *tsGenerator) generate() { PUSI: pusi, PID: videoPid, RAI: pusi, - CC: g.ccFor(videoPid), + CC: e.ccFor(videoPid), AFC: hasAdaptationField | hasPayload, PCRF: pusi, } @@ -265,36 +265,36 @@ func (g *tsGenerator) generate() { if pusi { // If the packet has a Payload Unit Start Indicator // flag set then we need to write a PCR. - pkt.PCR = g.pcr() + pkt.PCR = e.pcr() pusi = false } - g.outputChan <- pkt.Bytes() + e.outputChan <- pkt.Bytes() } - g.tick() + e.tick() } } // tick advances the clock one frame interval. -func (g *tsGenerator) tick() { - g.clock += g.frameInterval +func (e *Encoder) tick() { + e.clock += e.frameInterval } // pts retuns the current presentation timestamp. -func (g *tsGenerator) pts() uint64 { - return uint64((g.clock + g.ptsOffset).Seconds() * pcrFreq) +func (e *Encoder) pts() uint64 { + return uint64((e.clock + e.ptsOffset).Seconds() * pcrFreq) } // pcr returns the current program clock reference. -func (g *tsGenerator) pcr() uint64 { - return uint64(g.clock.Seconds() * pcrFreq) +func (e *Encoder) pcr() uint64 { + return uint64(e.clock.Seconds() * pcrFreq) } // ccFor returns the next continuity counter for pid. -func (g *tsGenerator) ccFor(pid int) byte { - cc := g.continuity[pid] +func (e *Encoder) ccFor(pid int) byte { + cc := e.continuity[pid] const continuityCounterMask = 0xf - g.continuity[pid] = (cc + 1) & continuityCounterMask + e.continuity[pid] = (cc + 1) & continuityCounterMask return cc } diff --git a/revid/revid.go b/revid/revid.go index 663bea98..3f9d2ff2 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -210,8 +210,8 @@ func (r *Revid) reset(config Config) error { return nil case Mpegts: r.Log(Info, "Using MPEGTS packetisation") - frameRate, _ := strconv.Atoi(r.config.FrameRate) - r.encoder = mts.NewTsGenerator(float64(frameRate)) + frameRate, _ := strconv.ParseFloat(r.config.FrameRate, 64) + r.encoder = mts.NewEncoder(frameRate) case Flv: r.Log(Info, "Using FLV packetisation") frameRate, _ := strconv.Atoi(r.config.FrameRate) From 437c7756dfaefc9f13a1d9a69c1e9bf44392dee7 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sun, 19 Aug 2018 21:05:04 +0930 Subject: [PATCH 3/5] flv: rename encoder type --- encoding/flv/encoder.go | 66 ++++++++++++++++++++--------------------- revid/revid.go | 2 +- 2 files changed, 34 insertions(+), 34 deletions(-) diff --git a/encoding/flv/encoder.go b/encoding/flv/encoder.go index bbcfb73e..45f3a269 100644 --- a/encoding/flv/encoder.go +++ b/encoding/flv/encoder.go @@ -49,9 +49,9 @@ var ( } ) -// flvGenerator provides properties required for the generation of flv video +// Encoder provides properties required for the generation of flv video // from raw video data -type flvGenerator struct { +type Encoder struct { fps int inputChan chan []byte outputChan chan []byte @@ -66,19 +66,19 @@ type flvGenerator struct { // InputChan returns the input channel to the generator. This is where the // raw data frames are entered into the generator -func (g *flvGenerator) InputChan() chan []byte { - return g.inputChan +func (e *Encoder) InputChan() chan []byte { + return e.inputChan } // OutputChan retuns the output chan of the generator - this is where the // flv packets (more specifically tags) are outputted. -func (g *flvGenerator) OutputChan() <-chan []byte { - return g.outputChan +func (e *Encoder) OutputChan() <-chan []byte { + return e.outputChan } -// NewFlvGenerator retuns an instance of the flvGenerator struct -func NewFlvGenerator(audio, video bool, fps int) *flvGenerator { - return &flvGenerator{ +// NewEncoder retuns a new FLV encoder. +func NewEncoder(audio, video bool, fps int) *Encoder { + return &Encoder{ fps: fps, audio: audio, video: video, @@ -90,33 +90,33 @@ func NewFlvGenerator(audio, video bool, fps int) *flvGenerator { // Start begins the generation routine - i.e. if raw data is given to the input // channel flv tags will be produced and available from the output channel. -func (g *flvGenerator) Start() { - g.isGenerating = true - go g.generate() +func (e *Encoder) Start() { + e.isGenerating = true + go e.generate() } -func (g *flvGenerator) Stop() { - g.isGenerating = false +func (e *Encoder) Stop() { + e.isGenerating = false } // GenHeader generates the flv header and sends it down the output chan for use // This will generally be called once at the start of file writing/transmission. -func (g *flvGenerator) GenHeader() { +func (e *Encoder) GenHeader() { header := Header{ - HasAudio: g.audio, - HasVideo: g.video, + HasAudio: e.audio, + HasVideo: e.video, } - g.outputChan <- header.Bytes() + e.outputChan <- header.Bytes() } // getNextTimestamp generates and returns the next timestamp based on current time -func (g *flvGenerator) getNextTimestamp() (timestamp uint32) { - if g.firstTag { - g.startTime = time.Now() - g.firstTag = false +func (e *Encoder) getNextTimestamp() (timestamp uint32) { + if e.firstTag { + e.startTime = time.Now() + e.firstTag = false return 0 } - return uint32(time.Now().Sub(g.startTime).Seconds() * float64(1000)) + return uint32(time.Now().Sub(e.startTime).Seconds() * float64(1000)) } // http://www.itu.int/rec/dologin_pub.asp?lang=e&id=T-REC-H.264-200305-S!!PDF-E&type=items @@ -211,18 +211,18 @@ func (s *frameScanner) readByte() (b byte, ok bool) { // generate takes in raw video data from the input chan and packetises it into // flv tags, which are then passed to the output channel. -func (g *flvGenerator) generate() { - g.GenHeader() +func (e *Encoder) generate() { + e.GenHeader() var frameType byte var packetType byte - for g.isGenerating { + for e.isGenerating { select { default: time.Sleep(time.Duration(5) * time.Millisecond) - case frame := <-g.inputChan: - timeStamp := g.getNextTimestamp() + case frame := <-e.inputChan: + timeStamp := e.getNextTimestamp() // Do we have video to send off? - if g.video { + if e.video { if isKeyFrame(frame) { frameType = KeyFrameType } else { @@ -246,10 +246,10 @@ func (g *flvGenerator) generate() { Data: frame, PrevTagSize: uint32(videoHeaderSize + len(frame)), } - g.outputChan <- tag.Bytes() + e.outputChan <- tag.Bytes() } // Do we even have some audio to send off ? - if g.audio { + if e.audio { // Not sure why but we need two audio tags for dummy silent audio // TODO: create constants or SoundSize and SoundType parameters tag := AudioTag{ @@ -264,7 +264,7 @@ func (g *flvGenerator) generate() { Data: dummyAudioTag1Data, PrevTagSize: uint32(audioSize), } - g.outputChan <- tag.Bytes() + e.outputChan <- tag.Bytes() tag = AudioTag{ TagType: uint8(AudioTagType), @@ -278,7 +278,7 @@ func (g *flvGenerator) generate() { Data: dummyAudioTag2Data, PrevTagSize: uint32(22), } - g.outputChan <- tag.Bytes() + e.outputChan <- tag.Bytes() } } } diff --git a/revid/revid.go b/revid/revid.go index 3f9d2ff2..0a1a3557 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -215,7 +215,7 @@ func (r *Revid) reset(config Config) error { case Flv: r.Log(Info, "Using FLV packetisation") frameRate, _ := strconv.Atoi(r.config.FrameRate) - r.encoder = flv.NewFlvGenerator(true, true, frameRate) + r.encoder = flv.NewEncoder(true, true, frameRate) } // We have packetization of some sort, so we want to send data to Generator // to perform packetization From 7e1d7f77b8683dc0f5c3c5a943bdfd14057fb8b0 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sun, 19 Aug 2018 21:28:20 +0930 Subject: [PATCH 4/5] revid,encoding/{flv,mts}: remove in-bound chans for packet encoders --- encoding/encoding.go | 6 +- encoding/flv/encoder.go | 181 +++++++++++++++++----------------------- encoding/mts/encoder.go | 141 ++++++++++++++----------------- revid/revid.go | 29 +++---- 4 files changed, 156 insertions(+), 201 deletions(-) diff --git a/encoding/encoding.go b/encoding/encoding.go index 10233373..746141c2 100644 --- a/encoding/encoding.go +++ b/encoding/encoding.go @@ -28,8 +28,6 @@ LICENSE package encoding type Encoder interface { - InputChan() chan []byte - OutputChan() <-chan []byte - Start() - Stop() + Encode([]byte) error + Stream() <-chan []byte } diff --git a/encoding/flv/encoder.go b/encoding/flv/encoder.go index 45f3a269..3a545677 100644 --- a/encoding/flv/encoder.go +++ b/encoding/flv/encoder.go @@ -53,8 +53,7 @@ var ( // from raw video data type Encoder struct { fps int - inputChan chan []byte - outputChan chan []byte + stream chan []byte audio bool video bool lastTagSize int @@ -64,49 +63,31 @@ type Encoder struct { isGenerating bool } -// InputChan returns the input channel to the generator. This is where the -// raw data frames are entered into the generator -func (e *Encoder) InputChan() chan []byte { - return e.inputChan -} - -// OutputChan retuns the output chan of the generator - this is where the -// flv packets (more specifically tags) are outputted. -func (e *Encoder) OutputChan() <-chan []byte { - return e.outputChan -} - // NewEncoder retuns a new FLV encoder. func NewEncoder(audio, video bool, fps int) *Encoder { - return &Encoder{ - fps: fps, - audio: audio, - video: video, - inputChan: make(chan []byte, inputChanLength), - outputChan: make(chan []byte, outputChanLength), - firstTag: true, + e := Encoder{ + fps: fps, + audio: audio, + video: video, + stream: make(chan []byte, outputChanLength), + firstTag: true, } + e.stream <- e.HeaderBytes() + return &e } -// Start begins the generation routine - i.e. if raw data is given to the input -// channel flv tags will be produced and available from the output channel. -func (e *Encoder) Start() { - e.isGenerating = true - go e.generate() -} - -func (e *Encoder) Stop() { - e.isGenerating = false -} - -// GenHeader generates the flv header and sends it down the output chan for use -// This will generally be called once at the start of file writing/transmission. -func (e *Encoder) GenHeader() { +// HeaderBytes returns the a +func (e *Encoder) HeaderBytes() []byte { header := Header{ HasAudio: e.audio, HasVideo: e.video, } - e.outputChan <- header.Bytes() + return header.Bytes() +} + +// Stream returns a channel of streaming packets. +func (e *Encoder) Stream() <-chan []byte { + return e.stream } // getNextTimestamp generates and returns the next timestamp based on current time @@ -211,75 +192,69 @@ func (s *frameScanner) readByte() (b byte, ok bool) { // generate takes in raw video data from the input chan and packetises it into // flv tags, which are then passed to the output channel. -func (e *Encoder) generate() { - e.GenHeader() +func (e *Encoder) Encode(frame []byte) error { var frameType byte var packetType byte - for e.isGenerating { - select { - default: - time.Sleep(time.Duration(5) * time.Millisecond) - case frame := <-e.inputChan: - timeStamp := e.getNextTimestamp() - // Do we have video to send off? - if e.video { - if isKeyFrame(frame) { - frameType = KeyFrameType - } else { - frameType = InterFrameType - } - if isSequenceHeader(frame) { - packetType = SequenceHeader - } else { - packetType = AVCNALU - } - - tag := VideoTag{ - TagType: uint8(VideoTagType), - DataSize: uint32(len(frame)) + DataHeaderLength, - Timestamp: timeStamp, - TimestampExtended: NoTimestampExtension, - FrameType: frameType, - Codec: H264, - PacketType: packetType, - CompositionTime: 0, - Data: frame, - PrevTagSize: uint32(videoHeaderSize + len(frame)), - } - e.outputChan <- tag.Bytes() - } - // Do we even have some audio to send off ? - if e.audio { - // Not sure why but we need two audio tags for dummy silent audio - // TODO: create constants or SoundSize and SoundType parameters - tag := AudioTag{ - TagType: uint8(AudioTagType), - DataSize: 7, - Timestamp: timeStamp, - TimestampExtended: NoTimestampExtension, - SoundFormat: AACAudioFormat, - SoundRate: 3, - SoundSize: true, - SoundType: true, - Data: dummyAudioTag1Data, - PrevTagSize: uint32(audioSize), - } - e.outputChan <- tag.Bytes() - - tag = AudioTag{ - TagType: uint8(AudioTagType), - DataSize: 21, - Timestamp: timeStamp, - TimestampExtended: NoTimestampExtension, - SoundFormat: AACAudioFormat, - SoundRate: 3, - SoundSize: true, - SoundType: true, - Data: dummyAudioTag2Data, - PrevTagSize: uint32(22), - } - e.outputChan <- tag.Bytes() - } + timeStamp := e.getNextTimestamp() + // Do we have video to send off? + if e.video { + if isKeyFrame(frame) { + frameType = KeyFrameType + } else { + frameType = InterFrameType } + if isSequenceHeader(frame) { + packetType = SequenceHeader + } else { + packetType = AVCNALU + } + + tag := VideoTag{ + TagType: uint8(VideoTagType), + DataSize: uint32(len(frame)) + DataHeaderLength, + Timestamp: timeStamp, + TimestampExtended: NoTimestampExtension, + FrameType: frameType, + Codec: H264, + PacketType: packetType, + CompositionTime: 0, + Data: frame, + PrevTagSize: uint32(videoHeaderSize + len(frame)), + } + e.stream <- tag.Bytes() } + // Do we even have some audio to send off ? + if e.audio { + // Not sure why but we need two audio tags for dummy silent audio + // TODO: create constants or SoundSize and SoundType parameters + tag := AudioTag{ + TagType: uint8(AudioTagType), + DataSize: 7, + Timestamp: timeStamp, + TimestampExtended: NoTimestampExtension, + SoundFormat: AACAudioFormat, + SoundRate: 3, + SoundSize: true, + SoundType: true, + Data: dummyAudioTag1Data, + PrevTagSize: uint32(audioSize), + } + e.stream <- tag.Bytes() + + tag = AudioTag{ + TagType: uint8(AudioTagType), + DataSize: 21, + Timestamp: timeStamp, + TimestampExtended: NoTimestampExtension, + SoundFormat: AACAudioFormat, + SoundRate: 3, + SoundSize: true, + SoundType: true, + Data: dummyAudioTag2Data, + PrevTagSize: uint32(22), + } + e.stream <- tag.Bytes() + } + + return nil } diff --git a/encoding/mts/encoder.go b/encoding/mts/encoder.go index 6512dbd8..334b20da 100644 --- a/encoding/mts/encoder.go +++ b/encoding/mts/encoder.go @@ -156,8 +156,7 @@ const ( // Encoder encapsulates properties of an mpegts generator. type Encoder struct { - outputChan chan []byte - nalInputChan chan []byte + stream chan []byte clock time.Duration frameInterval time.Duration @@ -169,8 +168,7 @@ type Encoder struct { // NewEncoder returns an Encoder with the specified frame rate. func NewEncoder(fps float64) *Encoder { return &Encoder{ - outputChan: make(chan []byte, 1), - nalInputChan: make(chan []byte, 1), + stream: make(chan []byte, 1), frameInterval: time.Duration(float64(time.Second) / fps), ptsOffset: ptsOffset, @@ -183,24 +181,9 @@ func NewEncoder(fps float64) *Encoder { } } -// Start is called when we would like generation to begin, i.e. we would like -// the generator to start taking input data and creating mpegts packets -func (e *Encoder) Start() { - go e.generate() -} - -func (e *Encoder) Stop() {} - -// InputChan returns a handle to the nalInputChan (inputChan) so that nal units -// can be passed to the generator and processed -func (e *Encoder) InputChan() chan []byte { - return e.nalInputChan -} - -// OutputChan returns a handle to the generator output chan where the mpegts -// packets will show up once ready to go -func (e *Encoder) OutputChan() <-chan []byte { - return e.outputChan +// Stream returns a channel of streaming packets. +func (e *Encoder) Stream() <-chan []byte { + return e.stream } const ( @@ -215,65 +198,63 @@ const ( // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel -func (e *Encoder) generate() { - for { - nalu := <-e.nalInputChan - - // Write PAT - patPkt := Packet{ - PUSI: true, - PID: patPid, - CC: e.ccFor(patPid), - AFC: hasPayload, - Payload: patTable, - } - e.outputChan <- patPkt.Bytes() - - // Write PMT. - pmtPkt := Packet{ - PUSI: true, - PID: pmtPid, - CC: e.ccFor(pmtPid), - AFC: hasPayload, - Payload: pmtTable, - } - e.outputChan <- pmtPkt.Bytes() - - // Prepare PES data. - pesPkt := pes.Packet{ - StreamID: streamID, - PDI: hasPTS, - PTS: e.pts(), - Data: nalu, - HeaderLength: 5, - } - buf := pesPkt.Bytes() - - pusi := true - for len(buf) != 0 { - pkt := Packet{ - PUSI: pusi, - PID: videoPid, - RAI: pusi, - CC: e.ccFor(videoPid), - AFC: hasAdaptationField | hasPayload, - PCRF: pusi, - } - n := pkt.FillPayload(buf) - buf = buf[n:] - - if pusi { - // If the packet has a Payload Unit Start Indicator - // flag set then we need to write a PCR. - pkt.PCR = e.pcr() - pusi = false - } - - e.outputChan <- pkt.Bytes() - } - - e.tick() +func (e *Encoder) Encode(nalu []byte) error { + // Write PAT + patPkt := Packet{ + PUSI: true, + PID: patPid, + CC: e.ccFor(patPid), + AFC: hasPayload, + Payload: patTable, } + e.stream <- patPkt.Bytes() + + // Write PMT. + pmtPkt := Packet{ + PUSI: true, + PID: pmtPid, + CC: e.ccFor(pmtPid), + AFC: hasPayload, + Payload: pmtTable, + } + e.stream <- pmtPkt.Bytes() + + // Prepare PES data. + pesPkt := pes.Packet{ + StreamID: streamID, + PDI: hasPTS, + PTS: e.pts(), + Data: nalu, + HeaderLength: 5, + } + buf := pesPkt.Bytes() + + pusi := true + for len(buf) != 0 { + pkt := Packet{ + PUSI: pusi, + PID: videoPid, + RAI: pusi, + CC: e.ccFor(videoPid), + AFC: hasAdaptationField | hasPayload, + PCRF: pusi, + } + n := pkt.FillPayload(buf) + buf = buf[n:] + + if pusi { + // If the packet has a Payload Unit Start Indicator + // flag set then we need to write a PCR. + pkt.PCR = e.pcr() + pusi = false + } + + e.stream <- pkt.Bytes() + } + + e.tick() + + return nil } // tick advances the clock one frame interval. diff --git a/revid/revid.go b/revid/revid.go index 0a1a3557..514a324a 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -257,8 +257,6 @@ func (r *Revid) Start() { go r.outputClips() r.Log(Info, "Starting clip packing routine") go r.packClips() - r.Log(Info, "Starting packetisation encoder") - r.encoder.Start() r.Log(Info, "Setting up input and receiving content") go r.setupInput() } @@ -273,11 +271,6 @@ func (r *Revid) Stop() { r.Log(Info, "Stopping revid!") r.isRunning = false - r.Log(Info, "Stopping encoder!") - if r.encoder != nil { - r.encoder.Stop() - } - r.Log(Info, "Killing input proccess!") // If a cmd process is running, we kill! if r.cmd != nil && r.cmd.Process != nil { @@ -294,9 +287,12 @@ func (r *Revid) getFrameNoPacketization() []byte { // getFramePacketization gets a frame from the generators output chan - the // the encoder being an mpegts or flv encoder depending on the config func (r *Revid) getFramePacketization() []byte { - return <-r.encoder.OutputChan() + return <-r.encoder.Stream() } +// TODO(kortschak): Factor this out to an io.Writer type and remove the Stream chans. +// Also add a no-op encoder that handles non-packeted data. +// // packClips takes data segments; whether that be tsPackets or mjpeg frames and // packs them into clips consisting of the amount frames specified in the config func (r *Revid) packClips() { @@ -306,7 +302,7 @@ func (r *Revid) packClips() { select { // TODO: This is temporary, need to work out how to make this work // for cases when there is not packetisation. - case frame := <-r.encoder.OutputChan(): + case frame := <-r.encoder.Stream(): lenOfFrame := len(frame) if lenOfFrame > ringBufferElementSize { r.Log(Warning, fmt.Sprintf("Frame was too big: %v bytes, getting another one!", lenOfFrame)) @@ -477,7 +473,7 @@ func (r *Revid) startRaspivid() error { r.inputReader = stdout go func() { r.Log(Info, "Reading camera data!") - r.parse(chunkWriter(r.encoder.InputChan()), r.inputReader, 0) + r.parse(chunkWriter{r.encoder}, r.inputReader, 0) r.Log(Info, "Not trying to read from camera anymore!") }() return nil @@ -500,14 +496,19 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. - return r.parse(chunkWriter(r.encoder.InputChan()), f, delay) + return r.parse(chunkWriter{r.encoder}, f, delay) } +// TODO(kortschak): Remove this type and revise the signature of +// the parsers to accept an encoding.Encoder. +// // chunkWriter is a shim between the new function-based approach // and the old flow-based approach. -type chunkWriter chan []byte +type chunkWriter struct { + encoding.Encoder +} func (w chunkWriter) Write(b []byte) (int, error) { - w <- b - return len(b), nil + err := w.Encoder.Encode(b) + return len(b), err } From 05ba19e4aa96053f4a4d68632f552edfc787d0c2 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 20 Aug 2018 07:49:02 +0930 Subject: [PATCH 5/5] encoding->stream: rename packages --- revid/revid.go | 12 ++++++------ {encoding => stream}/encoding.go | 2 +- {encoding => stream}/flv/encoder.go | 0 {encoding => stream}/flv/flv.go | 0 {encoding => stream}/mts/encoder.go | 2 +- {encoding => stream}/mts/mpegts.go | 0 {encoding => stream}/mts/pes/pes.go | 0 {encoding => stream}/mts/pes/pes_test.go | 0 {encoding => stream}/mts/psi/pat.go | 0 {encoding => stream}/mts/psi/pmt.go | 0 10 files changed, 8 insertions(+), 8 deletions(-) rename {encoding => stream}/encoding.go (98%) rename {encoding => stream}/flv/encoder.go (100%) rename {encoding => stream}/flv/flv.go (100%) rename {encoding => stream}/mts/encoder.go (99%) rename {encoding => stream}/mts/mpegts.go (100%) rename {encoding => stream}/mts/pes/pes.go (100%) rename {encoding => stream}/mts/pes/pes_test.go (100%) rename {encoding => stream}/mts/psi/pat.go (100%) rename {encoding => stream}/mts/psi/pmt.go (100%) diff --git a/revid/revid.go b/revid/revid.go index 514a324a..c66c7062 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -38,11 +38,11 @@ import ( "strconv" "time" - "bitbucket.org/ausocean/av/encoding" - "bitbucket.org/ausocean/av/encoding/flv" - "bitbucket.org/ausocean/av/encoding/mts" "bitbucket.org/ausocean/av/parse" "bitbucket.org/ausocean/av/rtmp" + "bitbucket.org/ausocean/av/stream" + "bitbucket.org/ausocean/av/stream/flv" + "bitbucket.org/ausocean/av/stream/mts" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/ring" ) @@ -90,7 +90,7 @@ type Revid struct { ringBuffer *ring.Buffer config Config isRunning bool - encoder encoding.Encoder + encoder stream.Encoder parse func(dst io.Writer, src io.Reader, delay time.Duration) error cmd *exec.Cmd inputReader io.ReadCloser @@ -500,12 +500,12 @@ func (r *Revid) setupInputForFile() error { } // TODO(kortschak): Remove this type and revise the signature of -// the parsers to accept an encoding.Encoder. +// the parsers to accept an stream.Encoder. // // chunkWriter is a shim between the new function-based approach // and the old flow-based approach. type chunkWriter struct { - encoding.Encoder + stream.Encoder } func (w chunkWriter) Write(b []byte) (int, error) { diff --git a/encoding/encoding.go b/stream/encoding.go similarity index 98% rename from encoding/encoding.go rename to stream/encoding.go index 746141c2..cafcd914 100644 --- a/encoding/encoding.go +++ b/stream/encoding.go @@ -25,7 +25,7 @@ LICENSE along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ -package encoding +package stream type Encoder interface { Encode([]byte) error diff --git a/encoding/flv/encoder.go b/stream/flv/encoder.go similarity index 100% rename from encoding/flv/encoder.go rename to stream/flv/encoder.go diff --git a/encoding/flv/flv.go b/stream/flv/flv.go similarity index 100% rename from encoding/flv/flv.go rename to stream/flv/flv.go diff --git a/encoding/mts/encoder.go b/stream/mts/encoder.go similarity index 99% rename from encoding/mts/encoder.go rename to stream/mts/encoder.go index 334b20da..05720139 100644 --- a/encoding/mts/encoder.go +++ b/stream/mts/encoder.go @@ -34,7 +34,7 @@ import ( "math/bits" "time" - "bitbucket.org/ausocean/av/encoding/mts/pes" + "bitbucket.org/ausocean/av/stream/mts/pes" ) const psiPacketSize = 184 diff --git a/encoding/mts/mpegts.go b/stream/mts/mpegts.go similarity index 100% rename from encoding/mts/mpegts.go rename to stream/mts/mpegts.go diff --git a/encoding/mts/pes/pes.go b/stream/mts/pes/pes.go similarity index 100% rename from encoding/mts/pes/pes.go rename to stream/mts/pes/pes.go diff --git a/encoding/mts/pes/pes_test.go b/stream/mts/pes/pes_test.go similarity index 100% rename from encoding/mts/pes/pes_test.go rename to stream/mts/pes/pes_test.go diff --git a/encoding/mts/psi/pat.go b/stream/mts/psi/pat.go similarity index 100% rename from encoding/mts/psi/pat.go rename to stream/mts/psi/pat.go diff --git a/encoding/mts/psi/pmt.go b/stream/mts/psi/pmt.go similarity index 100% rename from encoding/mts/psi/pmt.go rename to stream/mts/psi/pmt.go