encoding: restructure packages handling stream encoding

This commit is contained in:
Dan Kortschak 2018-08-19 20:29:22 +09:30
parent 9e28fd45fd
commit c0f9f7bf7b
17 changed files with 126 additions and 635 deletions

View File

@ -1,6 +1,6 @@
/* /*
NAME NAME
generator.go encoding.go
DESCRIPTION DESCRIPTION
See Readme.md See Readme.md
@ -9,7 +9,7 @@ AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org> Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE LICENSE
generator.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) encoding.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the under the terms of the GNU General Public License as published by the
@ -25,9 +25,9 @@ LICENSE
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/ */
package generator package encoding
type Generator interface { type Encoder interface {
InputChan() chan []byte InputChan() chan []byte
OutputChan() <-chan []byte OutputChan() <-chan []byte
Start() Start()

View File

@ -6,6 +6,7 @@ DESCRIPTION
See Readme.md See Readme.md
AUTHOR AUTHOR
Dan Kortschak <dan@ausocean.org>
Saxon Nelson-Milton <saxon@ausocean.org> Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE LICENSE
@ -24,13 +25,9 @@ LICENSE
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/ */
package generator package flv
import ( import "time"
"time"
"bitbucket.org/ausocean/av/flv"
)
const ( const (
inputChanLength = 500 inputChanLength = 500
@ -61,7 +58,7 @@ type flvGenerator struct {
audio bool audio bool
video bool video bool
lastTagSize int lastTagSize int
header flv.Header header Header
startTime time.Time startTime time.Time
firstTag bool firstTag bool
isGenerating bool isGenerating bool
@ -105,7 +102,7 @@ func (g *flvGenerator) Stop() {
// GenHeader generates the flv header and sends it down the output chan for use // GenHeader generates the flv header and sends it down the output chan for use
// This will generally be called once at the start of file writing/transmission. // This will generally be called once at the start of file writing/transmission.
func (g *flvGenerator) GenHeader() { func (g *flvGenerator) GenHeader() {
header := flv.Header{ header := Header{
HasAudio: g.audio, HasAudio: g.audio,
HasVideo: g.video, HasVideo: g.video,
} }
@ -227,23 +224,23 @@ func (g *flvGenerator) generate() {
// Do we have video to send off? // Do we have video to send off?
if g.video { if g.video {
if isKeyFrame(frame) { if isKeyFrame(frame) {
frameType = flv.KeyFrameType frameType = KeyFrameType
} else { } else {
frameType = flv.InterFrameType frameType = InterFrameType
} }
if isSequenceHeader(frame) { if isSequenceHeader(frame) {
packetType = flv.SequenceHeader packetType = SequenceHeader
} else { } else {
packetType = flv.AVCNALU packetType = AVCNALU
} }
tag := flv.VideoTag{ tag := VideoTag{
TagType: uint8(flv.VideoTagType), TagType: uint8(VideoTagType),
DataSize: uint32(len(frame)) + flv.DataHeaderLength, DataSize: uint32(len(frame)) + DataHeaderLength,
Timestamp: timeStamp, Timestamp: timeStamp,
TimestampExtended: flv.NoTimestampExtension, TimestampExtended: NoTimestampExtension,
FrameType: frameType, FrameType: frameType,
Codec: flv.H264, Codec: H264,
PacketType: packetType, PacketType: packetType,
CompositionTime: 0, CompositionTime: 0,
Data: frame, Data: frame,
@ -255,12 +252,12 @@ func (g *flvGenerator) generate() {
if g.audio { if g.audio {
// Not sure why but we need two audio tags for dummy silent audio // Not sure why but we need two audio tags for dummy silent audio
// TODO: create constants or SoundSize and SoundType parameters // TODO: create constants or SoundSize and SoundType parameters
tag := flv.AudioTag{ tag := AudioTag{
TagType: uint8(flv.AudioTagType), TagType: uint8(AudioTagType),
DataSize: 7, DataSize: 7,
Timestamp: timeStamp, Timestamp: timeStamp,
TimestampExtended: flv.NoTimestampExtension, TimestampExtended: NoTimestampExtension,
SoundFormat: flv.AACAudioFormat, SoundFormat: AACAudioFormat,
SoundRate: 3, SoundRate: 3,
SoundSize: true, SoundSize: true,
SoundType: true, SoundType: true,
@ -269,12 +266,12 @@ func (g *flvGenerator) generate() {
} }
g.outputChan <- tag.Bytes() g.outputChan <- tag.Bytes()
tag = flv.AudioTag{ tag = AudioTag{
TagType: uint8(flv.AudioTagType), TagType: uint8(AudioTagType),
DataSize: 21, DataSize: 21,
Timestamp: timeStamp, Timestamp: timeStamp,
TimestampExtended: flv.NoTimestampExtension, TimestampExtended: NoTimestampExtension,
SoundFormat: flv.AACAudioFormat, SoundFormat: AACAudioFormat,
SoundRate: 3, SoundRate: 3,
SoundSize: true, SoundSize: true,
SoundType: true, SoundType: true,

View File

@ -26,7 +26,7 @@ LICENSE
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/ */
package generator package mts
import ( import (
"encoding/binary" "encoding/binary"
@ -34,8 +34,7 @@ import (
"math/bits" "math/bits"
"time" "time"
"bitbucket.org/ausocean/av/mpegts" "bitbucket.org/ausocean/av/encoding/mts/pes"
"bitbucket.org/ausocean/av/pes"
) )
const psiPacketSize = 184 const psiPacketSize = 184
@ -221,7 +220,7 @@ func (g *tsGenerator) generate() {
nalu := <-g.nalInputChan nalu := <-g.nalInputChan
// Write PAT // Write PAT
patPkt := mpegts.Packet{ patPkt := Packet{
PUSI: true, PUSI: true,
PID: patPid, PID: patPid,
CC: g.ccFor(patPid), CC: g.ccFor(patPid),
@ -231,7 +230,7 @@ func (g *tsGenerator) generate() {
g.outputChan <- patPkt.Bytes() g.outputChan <- patPkt.Bytes()
// Write PMT. // Write PMT.
pmtPkt := mpegts.Packet{ pmtPkt := Packet{
PUSI: true, PUSI: true,
PID: pmtPid, PID: pmtPid,
CC: g.ccFor(pmtPid), CC: g.ccFor(pmtPid),
@ -252,7 +251,7 @@ func (g *tsGenerator) generate() {
pusi := true pusi := true
for len(buf) != 0 { for len(buf) != 0 {
pkt := mpegts.Packet{ pkt := Packet{
PUSI: pusi, PUSI: pusi,
PID: videoPid, PID: videoPid,
RAI: pusi, RAI: pusi,

View File

@ -1,6 +1,6 @@
/* /*
NAME NAME
MpegTs.go - provides a data structure intended to encapsulate the properties mpegts.go - provides a data structure intended to encapsulate the properties
of an MpegTs packet and also functions to allow manipulation of these packets. of an MpegTs packet and also functions to allow manipulation of these packets.
DESCRIPTION DESCRIPTION
@ -10,7 +10,7 @@ AUTHOR
Saxon A. Nelson-Milton <saxon.milton@gmail.com> Saxon A. Nelson-Milton <saxon.milton@gmail.com>
LICENSE LICENSE
MpegTs.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) mpegts.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the under the terms of the GNU General Public License as published by the
@ -26,7 +26,7 @@ LICENSE
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/ */
package mpegts package mts
const ( const (
mpegTsSize = 188 mpegTsSize = 188

View File

@ -0,0 +1,74 @@
/*
NAME
mpegts_test.go
DESCRIPTION
See Readme.md
AUTHOR
Dan Kortschak <dan@ausocean.org>
Saxon Nelson-Milton <saxon.milton@gmail.com>
LICENSE
mpegts_test.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package pes
import (
"reflect"
"testing"
)
const (
dataLength = 3 // bytes
)
func TestPesToByteSlice(t *testing.T) {
pkt := Packet{
StreamID: 0xE0, // StreamID
PDI: byte(2),
PTS: 100000,
HeaderLength: byte(10),
Stuff: []byte{0xFF, 0xFF},
Data: []byte{0xEA, 0x4B, 0x12},
}
got := pkt.Bytes()
want := []byte{
0x00, // packet start code prefix byte 1
0x00, // packet start code prefix byte 2
0x01, // packet start code prefix byte 3
0xE0, // stream ID
0x00, // PES Packet length byte 1
0x00, // PES packet length byte 2
0x80, // Marker bits,ScramblingControl, Priority, DAI, Copyright, Original
0x80, // PDI, ESCR, ESRate, DSMTrickMode, ACI, CRC, Ext
10, // header length
0x21, // PCR byte 1
0x00, // pcr byte 2
0x07, // pcr byte 3
0x0D, // pcr byte 4
0x41, // pcr byte 5
0xFF, // Stuffing byte 1
0xFF, // stuffing byte 3
0xEA, // data byte 1
0x4B, // data byte 2
0x12, // data byte 3
}
if !reflect.DeepEqual(got, want) {
t.Errorf("unexpected packet encoding:\ngot: %#v\nwant:%#v", got, want)
}
}

View File

@ -1,33 +0,0 @@
/*
NAME
RtpToTsConverter.go - provides utilities for the conversion of Rtp packets
to equivalent MpegTs packets.
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon.milton@gmail.com>
LICENSE
RtpToTsConverter.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package itut
func StartCode1() []byte { return []byte{0x00, 0x00, 0x01} }
func StartCode2() []byte { return []byte{0x00, 0x00, 0x00, 0x01} }
func AUD() []byte { return []byte{0x09, 0xF0} }

View File

@ -1,69 +0,0 @@
/*
NAME
MpegTs.go - provides a data structure intended to encapsulate the properties
of an MpegTs packet.
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon.milton@gmail.com>
LICENSE
MpegTs.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package mpegts
import "testing"
// Just ensure that we can create a byte slice with a mpegts packet correctly
func TestMpegTsToByteSlice(t *testing.T) {
payload := []byte{0x56, 0xA2, 0x78, 0x89, 0x67}
pcr := 100000 // => 100000
stuffing := make([]byte, 171)
for i := range stuffing {
stuffing[i] = 0xFF
}
tsPkt := MpegTsPacket{
PUSI: true,
PID: uint16(256),
AFC: byte(3),
AFL: 7 + 171,
CC: byte(6),
PCRF: true,
PCR: uint64(pcr),
Stuff: stuffing,
Payload: payload,
}
expectedOutput := []byte{0x47, 0x41, 0x00, 0x36, byte(178), 0x10}
for i := 40; i >= 0; i -= 8 {
expectedOutput = append(expectedOutput, byte(pcr>>uint(i)))
}
for i := 0; i < 171; i++ {
expectedOutput = append(expectedOutput, 0xFF)
}
expectedOutput = append(expectedOutput, payload...)
tsPktAsByteSlice, err := tsPkt.ToByteSlice()
if err != nil {
t.Errorf("Should not have got error!: %v", err)
}
for i := 0; i < 188; i++ {
if tsPktAsByteSlice[i] != expectedOutput[i] {
t.Errorf("Conversion to byte slice bad! Byte: %v Wanted: %v Got: %v", i, expectedOutput[i], tsPktAsByteSlice[i])
}
}
}

View File

@ -1,139 +0,0 @@
/*
NAME
h264.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
h264.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package parser
import (
"time"
"bitbucket.org/ausocean/av/itut"
)
const (
inputChanSize = 100000
outputBufferSize = 10000
)
// H264 provides properties and methods to allow for the parsing of a
// h264 stream - i.e. to allow extraction of the individual access units
type H264 struct {
inputBuffer []byte
isParsing bool
parserOutputChanRef chan []byte
userOutputChanRef chan []byte
inputChan chan byte
delay uint
}
// NewH264Parser returns an instance of the H264 struct
func NewH264Parser() (p *H264) {
p = new(H264)
p.isParsing = true
p.inputChan = make(chan byte, inputChanSize)
p.delay = 0
return
}
// Stop simply sets the isParsing flag to false to indicate to the parser that
// we don't want to interpret incoming data anymore - this will also make the
// parser jump out of the parse func
func (p *H264) Stop() {
p.isParsing = false
}
// Start starts the parse func as a goroutine so that incoming data is interpreted
func (p *H264) Start() {
p.isParsing = true
go p.parse()
}
// SetDelay sets a delay inbetween each buffer output. Useful if we're parsing
// a file but want to replicate the speed of incoming video frames from a
// camera
func (p *H264) SetDelay(delay uint) {
p.delay = delay
}
// InputChan returns a handle to the input channel of the parser
func (p *H264) InputChan() chan byte {
return p.inputChan
}
// OutputChan returns a handle to the output chan of the parser
func (p *H264) OutputChan() <-chan []byte {
return p.userOutputChanRef
}
// SetOutputChan sets the parser output chan to the passed output chan. This is
// useful if we want the parser output to go directly to a generator of some sort
// for packetization.
func (p *H264) SetOutputChan(o chan []byte) {
p.parserOutputChanRef = o
p.userOutputChanRef = o
}
// parse interprets an incoming h264 stream and extracts individual frames
// aka access units
func (p *H264) parse() {
outputBuffer := make([]byte, 0, outputBufferSize)
searchingForEnd := false
for p.isParsing {
var aByte uint8
if p.isParsing {
aByte = <-p.inputChan
} else {
return
}
outputBuffer = append(outputBuffer, aByte)
for i := 1; aByte == 0x00 && i != 4; i++ {
if p.isParsing {
aByte = <-p.inputChan
} else {
return
}
outputBuffer = append(outputBuffer, aByte)
if (aByte == 0x01 && i == 2) || (aByte == 0x01 && i == 3) {
if searchingForEnd {
output := append(append(itut.StartCode1(), itut.AUD()...), outputBuffer[:len(outputBuffer)-(i+1)]...)
time.Sleep(time.Duration(p.delay) * time.Millisecond)
p.parserOutputChanRef <- output
outputBuffer = outputBuffer[len(outputBuffer)-1-i:]
searchingForEnd = false
}
if p.isParsing {
aByte = <-p.inputChan
} else {
return
}
outputBuffer = append(outputBuffer, aByte)
if nalType := aByte & 0x1F; nalType == 1 || nalType == 5 || nalType == 8 || nalType == 7 {
searchingForEnd = true
}
}
}
}
}

View File

@ -1,87 +0,0 @@
/*
NAME
mjpeg.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
mjpeg.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package parser
const frameStartCode = 0xD8
type MJPEG struct {
inputBuffer []byte
isParsing bool
parserOutputChanRef chan []byte
userOutputChanRef chan []byte
inputChan chan byte
delay uint
}
func NewMJPEGParser(inputChanLen int) (p *MJPEG) {
p = new(MJPEG)
p.isParsing = true
p.inputChan = make(chan byte, inputChanLen)
return
}
func (p *MJPEG) Stop() {
p.isParsing = false
}
func (p *MJPEG) Start() {
go p.parse()
}
func (p *MJPEG) SetDelay(delay uint) {
p.delay = delay
}
func (p *MJPEG) InputChan() chan byte {
return p.inputChan
}
func (p *MJPEG) OutputChan() <-chan []byte {
return p.userOutputChanRef
}
func (p *MJPEG) SetOutputChan(o chan []byte) {
p.parserOutputChanRef = o
p.userOutputChanRef = o
}
func (p *MJPEG) parse() {
var outputBuffer []byte
for p.isParsing {
aByte := <-p.inputChan
outputBuffer = append(outputBuffer, aByte)
if aByte == 0xFF && len(outputBuffer) != 0 {
aByte := <-p.inputChan
outputBuffer = append(outputBuffer, aByte)
if aByte == frameStartCode {
p.parserOutputChanRef <- outputBuffer[:len(outputBuffer)-2]
outputBuffer = outputBuffer[len(outputBuffer)-2:]
}
}
}
}

View File

@ -1,50 +0,0 @@
/*
NAME
parser.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon.milton@gmail.com>
LICENSE
parser.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licensesx).
*/
package parser
import (
"log"
"sync"
)
// h264 consts
const acceptedLength = 1000
var (
Info *log.Logger
mutex *sync.Mutex
)
type Parser interface {
Stop()
Start()
InputChan() chan byte
OutputChan() <-chan []byte
SetOutputChan(achan chan []byte)
SetDelay(delay uint)
}

View File

@ -1,127 +0,0 @@
/*
NAME
parser_test.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
parser_test.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package parser
import (
"fmt"
"io/ioutil"
"log"
"os"
"strconv"
"testing"
)
const (
mjpegInputFileName = "testInput/testInput.avi"
h264fName = "../../test/test-data/av/input/betterInput.h264"
nOfFrames = 4
outChanSize = 100
)
func TestH264Parser(t *testing.T) {
log.SetOutput(os.Stderr)
log.Println("Opening input file!")
inputFile, err := os.Open(h264fName)
if err != nil {
t.Errorf("Should not have got error opening file!")
return
}
log.Println("Reading data from file!")
data, err := ioutil.ReadAll(inputFile)
if err != nil {
t.Errorf("Should not have got read error!")
return
}
// Create 'parser' and start it up
log.Println("Creating parser!")
parser := NewH264Parser()
parser.SetOutputChan(make(chan []byte, outChanSize))
parser.Start()
for i, n := 0, 0; n <= nOfFrames; i++ {
select {
case parser.InputChan() <- data[i]:
case frame := <-parser.OutputChan():
path := fmt.Sprintf("testOutput/" + strconv.Itoa(n) + "h264_frame")
out, err := os.Create(path)
if err != nil {
t.Errorf("Unexpected error creating %q: %v", path, err)
return
}
out.Write(frame)
out.Close()
n++
default:
}
}
}
/*
func TestMJPEGParser(t *testing.T) {
log.Println("Opening input file!")
// Open the input file
inputFile, err := os.Open(testInputFileName)
if err != nil {
t.Errorf("Should not have got error opening file!")
}
log.Println("Getting file stats!")
stats, err := inputFile.Stat()
if err != nil {
t.Errorf("Could not get input file stats!")
return
}
log.Println("Creating space for file data!")
data := make([]byte, stats.Size())
_, err = inputFile.Read(data)
if err != nil {
t.Errorf("Should not have got read error!")
return
}
log.Println("Creating parser!")
parser := NewMJPEGParser(len(data) + 1)
parser.SetOutputChan(make(chan []byte, 10000))
parser.Start()
log.Printf("len(data): %v\n", len(data))
for i := range data {
parser.GetInputChan() <- data[i]
}
log.Println("Writing jpegs to files!")
for i := 0; len(parser.GetOutputChan()) > 0; i++ {
// Open a new output file
out, err := os.Create("testOutput/image" + strconv.Itoa(i) + ".jpeg")
if err != nil {
t.Errorf("Should not have got error creating output file!")
return
}
out.Write(<-parser.GetOutputChan())
out.Close()
}
}
*/

View File

@ -1,76 +0,0 @@
/*
NAME
MpegTs.go - provides a data structure intended to encapsulate the properties
of an MpegTs packet.
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon.milton@gmail.com>
LICENSE
MpegTs.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package pes
import (
"testing"
)
const (
dataLength = 3 // bytes
)
func TestPesToByteSlice(t *testing.T) {
pesPkt := PESPacket{
StreamID: 0xE0, // StreamID
PDI: byte(2),
PTS: 100000,
HeaderLength: byte(10),
Stuff: []byte{0xFF, 0xFF},
Data: []byte{0xEA, 0x4B, 0x12},
}
pesExpectedOutput := []byte{
0x00, // packet start code prefix byte 1
0x00, // packet start code prefix byte 2
0x01, // packet start code prefix byte 3
0xE0, // stream ID
0x00, // PES Packet length byte 1
0x00, // PES packet length byte 2
0x80, // Marker bits,ScramblingControl, Priority, DAI, Copyright, Original
0x80, // PDI, ESCR, ESRate, DSMTrickMode, ACI, CRC, Ext
byte(10), // header length
0x21, // PCR byte 1
0x00, // pcr byte 2
0x07, // pcr byte 3
0x0D, // pcr byte 4
0x41, // pcr byte 5
0xFF, // Stuffing byte 1
0xFF, // stuffing byte 3
0xEA, // data byte 1
0x4B, // data byte 2
0x12, // data byte 3
}
pesPktAsByteSlice := pesPkt.ToByteSlice()
for ii := range pesPktAsByteSlice {
if pesPktAsByteSlice[ii] != pesExpectedOutput[ii] {
t.Errorf("Conversion to byte slice bad! Byte: %v Wanted: %v Got: %v",
ii, pesExpectedOutput[ii], pesPktAsByteSlice[ii])
}
}
}

32
revid/revid.go Executable file → Normal file
View File

@ -38,7 +38,9 @@ import (
"strconv" "strconv"
"time" "time"
"bitbucket.org/ausocean/av/generator" "bitbucket.org/ausocean/av/encoding"
"bitbucket.org/ausocean/av/encoding/flv"
"bitbucket.org/ausocean/av/encoding/mts"
"bitbucket.org/ausocean/av/parse" "bitbucket.org/ausocean/av/parse"
"bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/av/rtmp"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
@ -88,7 +90,7 @@ type Revid struct {
ringBuffer *ring.Buffer ringBuffer *ring.Buffer
config Config config Config
isRunning bool isRunning bool
generator generator.Generator encoder encoding.Encoder
parse func(dst io.Writer, src io.Reader, delay time.Duration) error parse func(dst io.Writer, src io.Reader, delay time.Duration) error
cmd *exec.Cmd cmd *exec.Cmd
inputReader io.ReadCloser inputReader io.ReadCloser
@ -209,11 +211,11 @@ func (r *Revid) reset(config Config) error {
case Mpegts: case Mpegts:
r.Log(Info, "Using MPEGTS packetisation") r.Log(Info, "Using MPEGTS packetisation")
frameRate, _ := strconv.Atoi(r.config.FrameRate) frameRate, _ := strconv.Atoi(r.config.FrameRate)
r.generator = generator.NewTsGenerator(float64(frameRate)) r.encoder = mts.NewTsGenerator(float64(frameRate))
case Flv: case Flv:
r.Log(Info, "Using FLV packetisation") r.Log(Info, "Using FLV packetisation")
frameRate, _ := strconv.Atoi(r.config.FrameRate) frameRate, _ := strconv.Atoi(r.config.FrameRate)
r.generator = generator.NewFlvGenerator(true, true, frameRate) r.encoder = flv.NewFlvGenerator(true, true, frameRate)
} }
// We have packetization of some sort, so we want to send data to Generator // We have packetization of some sort, so we want to send data to Generator
// to perform packetization // to perform packetization
@ -255,8 +257,8 @@ func (r *Revid) Start() {
go r.outputClips() go r.outputClips()
r.Log(Info, "Starting clip packing routine") r.Log(Info, "Starting clip packing routine")
go r.packClips() go r.packClips()
r.Log(Info, "Starting packetisation generator") r.Log(Info, "Starting packetisation encoder")
r.generator.Start() r.encoder.Start()
r.Log(Info, "Setting up input and receiving content") r.Log(Info, "Setting up input and receiving content")
go r.setupInput() go r.setupInput()
} }
@ -271,9 +273,9 @@ func (r *Revid) Stop() {
r.Log(Info, "Stopping revid!") r.Log(Info, "Stopping revid!")
r.isRunning = false r.isRunning = false
r.Log(Info, "Stopping generator!") r.Log(Info, "Stopping encoder!")
if r.generator != nil { if r.encoder != nil {
r.generator.Stop() r.encoder.Stop()
} }
r.Log(Info, "Killing input proccess!") r.Log(Info, "Killing input proccess!")
@ -284,15 +286,15 @@ func (r *Revid) Stop() {
} }
// getFrameNoPacketization gets a frame directly from the revid output chan // getFrameNoPacketization gets a frame directly from the revid output chan
// as we don't need to go through the generator with no packetization settings // as we don't need to go through the encoder with no packetization settings
func (r *Revid) getFrameNoPacketization() []byte { func (r *Revid) getFrameNoPacketization() []byte {
return <-r.outputChan return <-r.outputChan
} }
// getFramePacketization gets a frame from the generators output chan - the // getFramePacketization gets a frame from the generators output chan - the
// the generator being an mpegts or flv generator depending on the config // the encoder being an mpegts or flv encoder depending on the config
func (r *Revid) getFramePacketization() []byte { func (r *Revid) getFramePacketization() []byte {
return <-r.generator.OutputChan() return <-r.encoder.OutputChan()
} }
// packClips takes data segments; whether that be tsPackets or mjpeg frames and // packClips takes data segments; whether that be tsPackets or mjpeg frames and
@ -304,7 +306,7 @@ func (r *Revid) packClips() {
select { select {
// TODO: This is temporary, need to work out how to make this work // TODO: This is temporary, need to work out how to make this work
// for cases when there is not packetisation. // for cases when there is not packetisation.
case frame := <-r.generator.OutputChan(): case frame := <-r.encoder.OutputChan():
lenOfFrame := len(frame) lenOfFrame := len(frame)
if lenOfFrame > ringBufferElementSize { if lenOfFrame > ringBufferElementSize {
r.Log(Warning, fmt.Sprintf("Frame was too big: %v bytes, getting another one!", lenOfFrame)) r.Log(Warning, fmt.Sprintf("Frame was too big: %v bytes, getting another one!", lenOfFrame))
@ -475,7 +477,7 @@ func (r *Revid) startRaspivid() error {
r.inputReader = stdout r.inputReader = stdout
go func() { go func() {
r.Log(Info, "Reading camera data!") r.Log(Info, "Reading camera data!")
r.parse(chunkWriter(r.generator.InputChan()), r.inputReader, 0) r.parse(chunkWriter(r.encoder.InputChan()), r.inputReader, 0)
r.Log(Info, "Not trying to read from camera anymore!") r.Log(Info, "Not trying to read from camera anymore!")
}() }()
return nil return nil
@ -498,7 +500,7 @@ func (r *Revid) setupInputForFile() error {
defer f.Close() defer f.Close()
// TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop.
return r.parse(chunkWriter(r.generator.InputChan()), f, delay) return r.parse(chunkWriter(r.encoder.InputChan()), f, delay)
} }
// chunkWriter is a shim between the new function-based approach // chunkWriter is a shim between the new function-based approach