Merged in encoding (pull request #45)

encoding: remove in-bound chans

Approved-by: Alan Noble <anoble@gmail.com>
This commit is contained in:
kortschak 2018-08-23 04:31:38 +00:00
commit 90fac96048
18 changed files with 446 additions and 1000 deletions

View File

@ -1,288 +0,0 @@
/*
NAME
flv_generator.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
flv_generator.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package generator
import (
"time"
"bitbucket.org/ausocean/av/flv"
)
const (
inputChanLength = 500
outputChanLength = 500
audioSize = 18
videoHeaderSize = 16
)
// Data representing silent audio (required for youtube)
var (
dummyAudioTag1Data = []byte{
0x00, 0x12, 0x08, 0x56, 0xe5, 0x00,
}
dummyAudioTag2Data = []byte{
0x01, 0xdc, 0x00, 0x4c, 0x61, 0x76, 0x63, 0x35, 0x38,
0x2e, 0x36, 0x2e, 0x31, 0x30, 0x32, 0x00, 0x02, 0x30,
0x40, 0x0e,
}
)
// flvGenerator provides properties required for the generation of flv video
// from raw video data
type flvGenerator struct {
fps int
inputChan chan []byte
outputChan chan []byte
audio bool
video bool
lastTagSize int
header flv.Header
startTime time.Time
firstTag bool
isGenerating bool
}
// InputChan returns the input channel to the generator. This is where the
// raw data frames are entered into the generator
func (g *flvGenerator) InputChan() chan []byte {
return g.inputChan
}
// OutputChan retuns the output chan of the generator - this is where the
// flv packets (more specifically tags) are outputted.
func (g *flvGenerator) OutputChan() <-chan []byte {
return g.outputChan
}
// NewFlvGenerator retuns an instance of the flvGenerator struct
func NewFlvGenerator(audio, video bool, fps int) *flvGenerator {
return &flvGenerator{
fps: fps,
audio: audio,
video: video,
inputChan: make(chan []byte, inputChanLength),
outputChan: make(chan []byte, outputChanLength),
firstTag: true,
}
}
// Start begins the generation routine - i.e. if raw data is given to the input
// channel flv tags will be produced and available from the output channel.
func (g *flvGenerator) Start() {
g.isGenerating = true
go g.generate()
}
func (g *flvGenerator) Stop() {
g.isGenerating = false
}
// GenHeader generates the flv header and sends it down the output chan for use
// This will generally be called once at the start of file writing/transmission.
func (g *flvGenerator) GenHeader() {
header := flv.Header{
HasAudio: g.audio,
HasVideo: g.video,
}
g.outputChan <- header.Bytes()
}
// getNextTimestamp generates and returns the next timestamp based on current time
func (g *flvGenerator) getNextTimestamp() (timestamp uint32) {
if g.firstTag {
g.startTime = time.Now()
g.firstTag = false
return 0
}
return uint32(time.Now().Sub(g.startTime).Seconds() * float64(1000))
}
// http://www.itu.int/rec/dologin_pub.asp?lang=e&id=T-REC-H.264-200305-S!!PDF-E&type=items
// Table 7-1 NAL unit type codes
const (
nonIdrPic = 1
idrPic = 5
suppEnhInf = 6
seqParamSet = 7
paramSet = 8
)
// isKeyFrame returns true if the passed frame data represents that of a keyframe
// FIXME(kortschak): Clarify and document the logic of this functions.
func isKeyFrame(frame []byte) bool {
sc := frameScanner{buf: frame}
for {
b, ok := sc.readByte()
if !ok {
return false
}
for i := 1; b == 0x00 && i < 4; i++ {
b, ok = sc.readByte()
if !ok {
return false
}
if b != 0x01 || (i != 3 && i != 2) {
continue
}
b, ok = sc.readByte()
if !ok {
return false
}
switch nalTyp := b & 0x1f; nalTyp {
case idrPic, suppEnhInf:
return true
case nonIdrPic:
return false
}
}
}
return false
}
// isSequenceHeader returns true if the passed frame data represents that of a
// a sequence header.
// FIXME(kortschak): Clarify and document the logic of this functions.
func isSequenceHeader(frame []byte) bool {
sc := frameScanner{buf: frame}
for {
b, ok := sc.readByte()
if !ok {
return false
}
for i := 1; b == 0x00 && i != 4; i++ {
b, ok = sc.readByte()
if !ok {
return false
}
if b != 0x01 || (i != 2 && i != 3) {
continue
}
b, ok = sc.readByte()
if !ok {
return false
}
switch nalTyp := b & 0x1f; nalTyp {
case suppEnhInf, seqParamSet, paramSet:
return true
case nonIdrPic, idrPic:
return false
}
}
}
}
type frameScanner struct {
off int
buf []byte
}
func (s *frameScanner) readByte() (b byte, ok bool) {
if s.off >= len(s.buf) {
return 0, false
}
b = s.buf[s.off]
s.off++
return b, true
}
// generate takes in raw video data from the input chan and packetises it into
// flv tags, which are then passed to the output channel.
func (g *flvGenerator) generate() {
g.GenHeader()
var frameType byte
var packetType byte
for g.isGenerating {
select {
default:
time.Sleep(time.Duration(5) * time.Millisecond)
case frame := <-g.inputChan:
timeStamp := g.getNextTimestamp()
// Do we have video to send off?
if g.video {
if isKeyFrame(frame) {
frameType = flv.KeyFrameType
} else {
frameType = flv.InterFrameType
}
if isSequenceHeader(frame) {
packetType = flv.SequenceHeader
} else {
packetType = flv.AVCNALU
}
tag := flv.VideoTag{
TagType: uint8(flv.VideoTagType),
DataSize: uint32(len(frame)) + flv.DataHeaderLength,
Timestamp: timeStamp,
TimestampExtended: flv.NoTimestampExtension,
FrameType: frameType,
Codec: flv.H264,
PacketType: packetType,
CompositionTime: 0,
Data: frame,
PrevTagSize: uint32(videoHeaderSize + len(frame)),
}
g.outputChan <- tag.Bytes()
}
// Do we even have some audio to send off ?
if g.audio {
// Not sure why but we need two audio tags for dummy silent audio
// TODO: create constants or SoundSize and SoundType parameters
tag := flv.AudioTag{
TagType: uint8(flv.AudioTagType),
DataSize: 7,
Timestamp: timeStamp,
TimestampExtended: flv.NoTimestampExtension,
SoundFormat: flv.AACAudioFormat,
SoundRate: 3,
SoundSize: true,
SoundType: true,
Data: dummyAudioTag1Data,
PrevTagSize: uint32(audioSize),
}
g.outputChan <- tag.Bytes()
tag = flv.AudioTag{
TagType: uint8(flv.AudioTagType),
DataSize: 21,
Timestamp: timeStamp,
TimestampExtended: flv.NoTimestampExtension,
SoundFormat: flv.AACAudioFormat,
SoundRate: 3,
SoundSize: true,
SoundType: true,
Data: dummyAudioTag2Data,
PrevTagSize: uint32(22),
}
g.outputChan <- tag.Bytes()
}
}
}
}

View File

@ -1,33 +0,0 @@
/*
NAME
RtpToTsConverter.go - provides utilities for the conversion of Rtp packets
to equivalent MpegTs packets.
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon.milton@gmail.com>
LICENSE
RtpToTsConverter.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package itut
func StartCode1() []byte { return []byte{0x00, 0x00, 0x01} }
func StartCode2() []byte { return []byte{0x00, 0x00, 0x00, 0x01} }
func AUD() []byte { return []byte{0x09, 0xF0} }

View File

@ -1,69 +0,0 @@
/*
NAME
MpegTs.go - provides a data structure intended to encapsulate the properties
of an MpegTs packet.
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon.milton@gmail.com>
LICENSE
MpegTs.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package mpegts
import "testing"
// Just ensure that we can create a byte slice with a mpegts packet correctly
func TestMpegTsToByteSlice(t *testing.T) {
payload := []byte{0x56, 0xA2, 0x78, 0x89, 0x67}
pcr := 100000 // => 100000
stuffing := make([]byte, 171)
for i := range stuffing {
stuffing[i] = 0xFF
}
tsPkt := MpegTsPacket{
PUSI: true,
PID: uint16(256),
AFC: byte(3),
AFL: 7 + 171,
CC: byte(6),
PCRF: true,
PCR: uint64(pcr),
Stuff: stuffing,
Payload: payload,
}
expectedOutput := []byte{0x47, 0x41, 0x00, 0x36, byte(178), 0x10}
for i := 40; i >= 0; i -= 8 {
expectedOutput = append(expectedOutput, byte(pcr>>uint(i)))
}
for i := 0; i < 171; i++ {
expectedOutput = append(expectedOutput, 0xFF)
}
expectedOutput = append(expectedOutput, payload...)
tsPktAsByteSlice, err := tsPkt.ToByteSlice()
if err != nil {
t.Errorf("Should not have got error!: %v", err)
}
for i := 0; i < 188; i++ {
if tsPktAsByteSlice[i] != expectedOutput[i] {
t.Errorf("Conversion to byte slice bad! Byte: %v Wanted: %v Got: %v", i, expectedOutput[i], tsPktAsByteSlice[i])
}
}
}

View File

@ -1,139 +0,0 @@
/*
NAME
h264.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
h264.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package parser
import (
"time"
"bitbucket.org/ausocean/av/itut"
)
const (
inputChanSize = 100000
outputBufferSize = 10000
)
// H264 provides properties and methods to allow for the parsing of a
// h264 stream - i.e. to allow extraction of the individual access units
type H264 struct {
inputBuffer []byte
isParsing bool
parserOutputChanRef chan []byte
userOutputChanRef chan []byte
inputChan chan byte
delay uint
}
// NewH264Parser returns an instance of the H264 struct
func NewH264Parser() (p *H264) {
p = new(H264)
p.isParsing = true
p.inputChan = make(chan byte, inputChanSize)
p.delay = 0
return
}
// Stop simply sets the isParsing flag to false to indicate to the parser that
// we don't want to interpret incoming data anymore - this will also make the
// parser jump out of the parse func
func (p *H264) Stop() {
p.isParsing = false
}
// Start starts the parse func as a goroutine so that incoming data is interpreted
func (p *H264) Start() {
p.isParsing = true
go p.parse()
}
// SetDelay sets a delay inbetween each buffer output. Useful if we're parsing
// a file but want to replicate the speed of incoming video frames from a
// camera
func (p *H264) SetDelay(delay uint) {
p.delay = delay
}
// InputChan returns a handle to the input channel of the parser
func (p *H264) InputChan() chan byte {
return p.inputChan
}
// OutputChan returns a handle to the output chan of the parser
func (p *H264) OutputChan() <-chan []byte {
return p.userOutputChanRef
}
// SetOutputChan sets the parser output chan to the passed output chan. This is
// useful if we want the parser output to go directly to a generator of some sort
// for packetization.
func (p *H264) SetOutputChan(o chan []byte) {
p.parserOutputChanRef = o
p.userOutputChanRef = o
}
// parse interprets an incoming h264 stream and extracts individual frames
// aka access units
func (p *H264) parse() {
outputBuffer := make([]byte, 0, outputBufferSize)
searchingForEnd := false
for p.isParsing {
var aByte uint8
if p.isParsing {
aByte = <-p.inputChan
} else {
return
}
outputBuffer = append(outputBuffer, aByte)
for i := 1; aByte == 0x00 && i != 4; i++ {
if p.isParsing {
aByte = <-p.inputChan
} else {
return
}
outputBuffer = append(outputBuffer, aByte)
if (aByte == 0x01 && i == 2) || (aByte == 0x01 && i == 3) {
if searchingForEnd {
output := append(append(itut.StartCode1(), itut.AUD()...), outputBuffer[:len(outputBuffer)-(i+1)]...)
time.Sleep(time.Duration(p.delay) * time.Millisecond)
p.parserOutputChanRef <- output
outputBuffer = outputBuffer[len(outputBuffer)-1-i:]
searchingForEnd = false
}
if p.isParsing {
aByte = <-p.inputChan
} else {
return
}
outputBuffer = append(outputBuffer, aByte)
if nalType := aByte & 0x1F; nalType == 1 || nalType == 5 || nalType == 8 || nalType == 7 {
searchingForEnd = true
}
}
}
}
}

View File

@ -1,87 +0,0 @@
/*
NAME
mjpeg.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
mjpeg.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package parser
const frameStartCode = 0xD8
type MJPEG struct {
inputBuffer []byte
isParsing bool
parserOutputChanRef chan []byte
userOutputChanRef chan []byte
inputChan chan byte
delay uint
}
func NewMJPEGParser(inputChanLen int) (p *MJPEG) {
p = new(MJPEG)
p.isParsing = true
p.inputChan = make(chan byte, inputChanLen)
return
}
func (p *MJPEG) Stop() {
p.isParsing = false
}
func (p *MJPEG) Start() {
go p.parse()
}
func (p *MJPEG) SetDelay(delay uint) {
p.delay = delay
}
func (p *MJPEG) InputChan() chan byte {
return p.inputChan
}
func (p *MJPEG) OutputChan() <-chan []byte {
return p.userOutputChanRef
}
func (p *MJPEG) SetOutputChan(o chan []byte) {
p.parserOutputChanRef = o
p.userOutputChanRef = o
}
func (p *MJPEG) parse() {
var outputBuffer []byte
for p.isParsing {
aByte := <-p.inputChan
outputBuffer = append(outputBuffer, aByte)
if aByte == 0xFF && len(outputBuffer) != 0 {
aByte := <-p.inputChan
outputBuffer = append(outputBuffer, aByte)
if aByte == frameStartCode {
p.parserOutputChanRef <- outputBuffer[:len(outputBuffer)-2]
outputBuffer = outputBuffer[len(outputBuffer)-2:]
}
}
}
}

View File

@ -1,50 +0,0 @@
/*
NAME
parser.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon.milton@gmail.com>
LICENSE
parser.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licensesx).
*/
package parser
import (
"log"
"sync"
)
// h264 consts
const acceptedLength = 1000
var (
Info *log.Logger
mutex *sync.Mutex
)
type Parser interface {
Stop()
Start()
InputChan() chan byte
OutputChan() <-chan []byte
SetOutputChan(achan chan []byte)
SetDelay(delay uint)
}

View File

@ -1,127 +0,0 @@
/*
NAME
parser_test.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
parser_test.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package parser
import (
"fmt"
"io/ioutil"
"log"
"os"
"strconv"
"testing"
)
const (
mjpegInputFileName = "testInput/testInput.avi"
h264fName = "../../test/test-data/av/input/betterInput.h264"
nOfFrames = 4
outChanSize = 100
)
func TestH264Parser(t *testing.T) {
log.SetOutput(os.Stderr)
log.Println("Opening input file!")
inputFile, err := os.Open(h264fName)
if err != nil {
t.Errorf("Should not have got error opening file!")
return
}
log.Println("Reading data from file!")
data, err := ioutil.ReadAll(inputFile)
if err != nil {
t.Errorf("Should not have got read error!")
return
}
// Create 'parser' and start it up
log.Println("Creating parser!")
parser := NewH264Parser()
parser.SetOutputChan(make(chan []byte, outChanSize))
parser.Start()
for i, n := 0, 0; n <= nOfFrames; i++ {
select {
case parser.InputChan() <- data[i]:
case frame := <-parser.OutputChan():
path := fmt.Sprintf("testOutput/" + strconv.Itoa(n) + "h264_frame")
out, err := os.Create(path)
if err != nil {
t.Errorf("Unexpected error creating %q: %v", path, err)
return
}
out.Write(frame)
out.Close()
n++
default:
}
}
}
/*
func TestMJPEGParser(t *testing.T) {
log.Println("Opening input file!")
// Open the input file
inputFile, err := os.Open(testInputFileName)
if err != nil {
t.Errorf("Should not have got error opening file!")
}
log.Println("Getting file stats!")
stats, err := inputFile.Stat()
if err != nil {
t.Errorf("Could not get input file stats!")
return
}
log.Println("Creating space for file data!")
data := make([]byte, stats.Size())
_, err = inputFile.Read(data)
if err != nil {
t.Errorf("Should not have got read error!")
return
}
log.Println("Creating parser!")
parser := NewMJPEGParser(len(data) + 1)
parser.SetOutputChan(make(chan []byte, 10000))
parser.Start()
log.Printf("len(data): %v\n", len(data))
for i := range data {
parser.GetInputChan() <- data[i]
}
log.Println("Writing jpegs to files!")
for i := 0; len(parser.GetOutputChan()) > 0; i++ {
// Open a new output file
out, err := os.Create("testOutput/image" + strconv.Itoa(i) + ".jpeg")
if err != nil {
t.Errorf("Should not have got error creating output file!")
return
}
out.Write(<-parser.GetOutputChan())
out.Close()
}
}
*/

View File

@ -1,76 +0,0 @@
/*
NAME
MpegTs.go - provides a data structure intended to encapsulate the properties
of an MpegTs packet.
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon.milton@gmail.com>
LICENSE
MpegTs.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package pes
import (
"testing"
)
const (
dataLength = 3 // bytes
)
func TestPesToByteSlice(t *testing.T) {
pesPkt := PESPacket{
StreamID: 0xE0, // StreamID
PDI: byte(2),
PTS: 100000,
HeaderLength: byte(10),
Stuff: []byte{0xFF, 0xFF},
Data: []byte{0xEA, 0x4B, 0x12},
}
pesExpectedOutput := []byte{
0x00, // packet start code prefix byte 1
0x00, // packet start code prefix byte 2
0x01, // packet start code prefix byte 3
0xE0, // stream ID
0x00, // PES Packet length byte 1
0x00, // PES packet length byte 2
0x80, // Marker bits,ScramblingControl, Priority, DAI, Copyright, Original
0x80, // PDI, ESCR, ESRate, DSMTrickMode, ACI, CRC, Ext
byte(10), // header length
0x21, // PCR byte 1
0x00, // pcr byte 2
0x07, // pcr byte 3
0x0D, // pcr byte 4
0x41, // pcr byte 5
0xFF, // Stuffing byte 1
0xFF, // stuffing byte 3
0xEA, // data byte 1
0x4B, // data byte 2
0x12, // data byte 3
}
pesPktAsByteSlice := pesPkt.ToByteSlice()
for ii := range pesPktAsByteSlice {
if pesPktAsByteSlice[ii] != pesExpectedOutput[ii] {
t.Errorf("Conversion to byte slice bad! Byte: %v Wanted: %v Got: %v",
ii, pesExpectedOutput[ii], pesPktAsByteSlice[ii])
}
}
}

45
revid/revid.go Executable file → Normal file
View File

@ -38,9 +38,11 @@ import (
"strconv" "strconv"
"time" "time"
"bitbucket.org/ausocean/av/generator"
"bitbucket.org/ausocean/av/parse" "bitbucket.org/ausocean/av/parse"
"bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/av/rtmp"
"bitbucket.org/ausocean/av/stream"
"bitbucket.org/ausocean/av/stream/flv"
"bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/utils/ring"
) )
@ -88,7 +90,7 @@ type Revid struct {
ringBuffer *ring.Buffer ringBuffer *ring.Buffer
config Config config Config
isRunning bool isRunning bool
generator generator.Generator encoder stream.Encoder
parse func(dst io.Writer, src io.Reader, delay time.Duration) error parse func(dst io.Writer, src io.Reader, delay time.Duration) error
cmd *exec.Cmd cmd *exec.Cmd
inputReader io.ReadCloser inputReader io.ReadCloser
@ -208,12 +210,12 @@ func (r *Revid) reset(config Config) error {
return nil return nil
case Mpegts: case Mpegts:
r.Log(Info, "Using MPEGTS packetisation") r.Log(Info, "Using MPEGTS packetisation")
frameRate, _ := strconv.Atoi(r.config.FrameRate) frameRate, _ := strconv.ParseFloat(r.config.FrameRate, 64)
r.generator = generator.NewTsGenerator(float64(frameRate)) r.encoder = mts.NewEncoder(frameRate)
case Flv: case Flv:
r.Log(Info, "Using FLV packetisation") r.Log(Info, "Using FLV packetisation")
frameRate, _ := strconv.Atoi(r.config.FrameRate) frameRate, _ := strconv.Atoi(r.config.FrameRate)
r.generator = generator.NewFlvGenerator(true, true, frameRate) r.encoder = flv.NewEncoder(true, true, frameRate)
} }
// We have packetization of some sort, so we want to send data to Generator // We have packetization of some sort, so we want to send data to Generator
// to perform packetization // to perform packetization
@ -255,8 +257,6 @@ func (r *Revid) Start() {
go r.outputClips() go r.outputClips()
r.Log(Info, "Starting clip packing routine") r.Log(Info, "Starting clip packing routine")
go r.packClips() go r.packClips()
r.Log(Info, "Starting packetisation generator")
r.generator.Start()
r.Log(Info, "Setting up input and receiving content") r.Log(Info, "Setting up input and receiving content")
go r.setupInput() go r.setupInput()
} }
@ -271,11 +271,6 @@ func (r *Revid) Stop() {
r.Log(Info, "Stopping revid!") r.Log(Info, "Stopping revid!")
r.isRunning = false r.isRunning = false
r.Log(Info, "Stopping generator!")
if r.generator != nil {
r.generator.Stop()
}
r.Log(Info, "Killing input proccess!") r.Log(Info, "Killing input proccess!")
// If a cmd process is running, we kill! // If a cmd process is running, we kill!
if r.cmd != nil && r.cmd.Process != nil { if r.cmd != nil && r.cmd.Process != nil {
@ -284,17 +279,20 @@ func (r *Revid) Stop() {
} }
// getFrameNoPacketization gets a frame directly from the revid output chan // getFrameNoPacketization gets a frame directly from the revid output chan
// as we don't need to go through the generator with no packetization settings // as we don't need to go through the encoder with no packetization settings
func (r *Revid) getFrameNoPacketization() []byte { func (r *Revid) getFrameNoPacketization() []byte {
return <-r.outputChan return <-r.outputChan
} }
// getFramePacketization gets a frame from the generators output chan - the // getFramePacketization gets a frame from the generators output chan - the
// the generator being an mpegts or flv generator depending on the config // the encoder being an mpegts or flv encoder depending on the config
func (r *Revid) getFramePacketization() []byte { func (r *Revid) getFramePacketization() []byte {
return <-r.generator.OutputChan() return <-r.encoder.Stream()
} }
// TODO(kortschak): Factor this out to an io.Writer type and remove the Stream chans.
// Also add a no-op encoder that handles non-packeted data.
//
// packClips takes data segments; whether that be tsPackets or mjpeg frames and // packClips takes data segments; whether that be tsPackets or mjpeg frames and
// packs them into clips consisting of the amount frames specified in the config // packs them into clips consisting of the amount frames specified in the config
func (r *Revid) packClips() { func (r *Revid) packClips() {
@ -304,7 +302,7 @@ func (r *Revid) packClips() {
select { select {
// TODO: This is temporary, need to work out how to make this work // TODO: This is temporary, need to work out how to make this work
// for cases when there is not packetisation. // for cases when there is not packetisation.
case frame := <-r.generator.OutputChan(): case frame := <-r.encoder.Stream():
lenOfFrame := len(frame) lenOfFrame := len(frame)
if lenOfFrame > ringBufferElementSize { if lenOfFrame > ringBufferElementSize {
r.Log(Warning, fmt.Sprintf("Frame was too big: %v bytes, getting another one!", lenOfFrame)) r.Log(Warning, fmt.Sprintf("Frame was too big: %v bytes, getting another one!", lenOfFrame))
@ -475,7 +473,7 @@ func (r *Revid) startRaspivid() error {
r.inputReader = stdout r.inputReader = stdout
go func() { go func() {
r.Log(Info, "Reading camera data!") r.Log(Info, "Reading camera data!")
r.parse(chunkWriter(r.generator.InputChan()), r.inputReader, 0) r.parse(chunkWriter{r.encoder}, r.inputReader, 0)
r.Log(Info, "Not trying to read from camera anymore!") r.Log(Info, "Not trying to read from camera anymore!")
}() }()
return nil return nil
@ -498,14 +496,19 @@ func (r *Revid) setupInputForFile() error {
defer f.Close() defer f.Close()
// TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop.
return r.parse(chunkWriter(r.generator.InputChan()), f, delay) return r.parse(chunkWriter{r.encoder}, f, delay)
} }
// TODO(kortschak): Remove this type and revise the signature of
// the parsers to accept an stream.Encoder.
//
// chunkWriter is a shim between the new function-based approach // chunkWriter is a shim between the new function-based approach
// and the old flow-based approach. // and the old flow-based approach.
type chunkWriter chan []byte type chunkWriter struct {
stream.Encoder
}
func (w chunkWriter) Write(b []byte) (int, error) { func (w chunkWriter) Write(b []byte) (int, error) {
w <- b err := w.Encoder.Encode(b)
return len(b), nil return len(b), err
} }

View File

@ -1,6 +1,6 @@
/* /*
NAME NAME
generator.go encoding.go
DESCRIPTION DESCRIPTION
See Readme.md See Readme.md
@ -9,7 +9,7 @@ AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org> Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE LICENSE
generator.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) encoding.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the under the terms of the GNU General Public License as published by the
@ -25,11 +25,9 @@ LICENSE
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/ */
package generator package stream
type Generator interface { type Encoder interface {
InputChan() chan []byte Encode([]byte) error
OutputChan() <-chan []byte Stream() <-chan []byte
Start()
Stop()
} }

260
stream/flv/encoder.go Normal file
View File

@ -0,0 +1,260 @@
/*
NAME
flv_generator.go
DESCRIPTION
See Readme.md
AUTHOR
Dan Kortschak <dan@ausocean.org>
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
flv_generator.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package flv
import "time"
const (
inputChanLength = 500
outputChanLength = 500
audioSize = 18
videoHeaderSize = 16
)
// Data representing silent audio (required for youtube)
var (
dummyAudioTag1Data = []byte{
0x00, 0x12, 0x08, 0x56, 0xe5, 0x00,
}
dummyAudioTag2Data = []byte{
0x01, 0xdc, 0x00, 0x4c, 0x61, 0x76, 0x63, 0x35, 0x38,
0x2e, 0x36, 0x2e, 0x31, 0x30, 0x32, 0x00, 0x02, 0x30,
0x40, 0x0e,
}
)
// Encoder provides properties required for the generation of flv video
// from raw video data
type Encoder struct {
fps int
stream chan []byte
audio bool
video bool
lastTagSize int
header Header
startTime time.Time
firstTag bool
isGenerating bool
}
// NewEncoder retuns a new FLV encoder.
func NewEncoder(audio, video bool, fps int) *Encoder {
e := Encoder{
fps: fps,
audio: audio,
video: video,
stream: make(chan []byte, outputChanLength),
firstTag: true,
}
e.stream <- e.HeaderBytes()
return &e
}
// HeaderBytes returns the a
func (e *Encoder) HeaderBytes() []byte {
header := Header{
HasAudio: e.audio,
HasVideo: e.video,
}
return header.Bytes()
}
// Stream returns a channel of streaming packets.
func (e *Encoder) Stream() <-chan []byte {
return e.stream
}
// getNextTimestamp generates and returns the next timestamp based on current time
func (e *Encoder) getNextTimestamp() (timestamp uint32) {
if e.firstTag {
e.startTime = time.Now()
e.firstTag = false
return 0
}
return uint32(time.Now().Sub(e.startTime).Seconds() * float64(1000))
}
// http://www.itu.int/rec/dologin_pub.asp?lang=e&id=T-REC-H.264-200305-S!!PDF-E&type=items
// Table 7-1 NAL unit type codes
const (
nonIdrPic = 1
idrPic = 5
suppEnhInf = 6
seqParamSet = 7
paramSet = 8
)
// isKeyFrame returns true if the passed frame data represents that of a keyframe
// FIXME(kortschak): Clarify and document the logic of this functions.
func isKeyFrame(frame []byte) bool {
sc := frameScanner{buf: frame}
for {
b, ok := sc.readByte()
if !ok {
return false
}
for i := 1; b == 0x00 && i < 4; i++ {
b, ok = sc.readByte()
if !ok {
return false
}
if b != 0x01 || (i != 3 && i != 2) {
continue
}
b, ok = sc.readByte()
if !ok {
return false
}
switch nalTyp := b & 0x1f; nalTyp {
case idrPic, suppEnhInf:
return true
case nonIdrPic:
return false
}
}
}
return false
}
// isSequenceHeader returns true if the passed frame data represents that of a
// a sequence header.
// FIXME(kortschak): Clarify and document the logic of this functions.
func isSequenceHeader(frame []byte) bool {
sc := frameScanner{buf: frame}
for {
b, ok := sc.readByte()
if !ok {
return false
}
for i := 1; b == 0x00 && i != 4; i++ {
b, ok = sc.readByte()
if !ok {
return false
}
if b != 0x01 || (i != 2 && i != 3) {
continue
}
b, ok = sc.readByte()
if !ok {
return false
}
switch nalTyp := b & 0x1f; nalTyp {
case suppEnhInf, seqParamSet, paramSet:
return true
case nonIdrPic, idrPic:
return false
}
}
}
}
type frameScanner struct {
off int
buf []byte
}
func (s *frameScanner) readByte() (b byte, ok bool) {
if s.off >= len(s.buf) {
return 0, false
}
b = s.buf[s.off]
s.off++
return b, true
}
// generate takes in raw video data from the input chan and packetises it into
// flv tags, which are then passed to the output channel.
func (e *Encoder) Encode(frame []byte) error {
var frameType byte
var packetType byte
timeStamp := e.getNextTimestamp()
// Do we have video to send off?
if e.video {
if isKeyFrame(frame) {
frameType = KeyFrameType
} else {
frameType = InterFrameType
}
if isSequenceHeader(frame) {
packetType = SequenceHeader
} else {
packetType = AVCNALU
}
tag := VideoTag{
TagType: uint8(VideoTagType),
DataSize: uint32(len(frame)) + DataHeaderLength,
Timestamp: timeStamp,
TimestampExtended: NoTimestampExtension,
FrameType: frameType,
Codec: H264,
PacketType: packetType,
CompositionTime: 0,
Data: frame,
PrevTagSize: uint32(videoHeaderSize + len(frame)),
}
e.stream <- tag.Bytes()
}
// Do we even have some audio to send off ?
if e.audio {
// Not sure why but we need two audio tags for dummy silent audio
// TODO: create constants or SoundSize and SoundType parameters
tag := AudioTag{
TagType: uint8(AudioTagType),
DataSize: 7,
Timestamp: timeStamp,
TimestampExtended: NoTimestampExtension,
SoundFormat: AACAudioFormat,
SoundRate: 3,
SoundSize: true,
SoundType: true,
Data: dummyAudioTag1Data,
PrevTagSize: uint32(audioSize),
}
e.stream <- tag.Bytes()
tag = AudioTag{
TagType: uint8(AudioTagType),
DataSize: 21,
Timestamp: timeStamp,
TimestampExtended: NoTimestampExtension,
SoundFormat: AACAudioFormat,
SoundRate: 3,
SoundSize: true,
SoundType: true,
Data: dummyAudioTag2Data,
PrevTagSize: uint32(22),
}
e.stream <- tag.Bytes()
}
return nil
}

View File

@ -1,6 +1,6 @@
/* /*
NAME NAME
mpegts_generator.go encoder.go
DESCRIPTION DESCRIPTION
See Readme.md See Readme.md
@ -10,7 +10,7 @@ AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org> Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE LICENSE
mpegts_generator.go is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) encoder.go is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the under the terms of the GNU General Public License as published by the
@ -26,7 +26,7 @@ LICENSE
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/ */
package generator package mts
import ( import (
"encoding/binary" "encoding/binary"
@ -34,8 +34,7 @@ import (
"math/bits" "math/bits"
"time" "time"
"bitbucket.org/ausocean/av/mpegts" "bitbucket.org/ausocean/av/stream/mts/pes"
"bitbucket.org/ausocean/av/pes"
) )
const psiPacketSize = 184 const psiPacketSize = 184
@ -155,10 +154,9 @@ const (
pcrFreq = 90000 // Hz pcrFreq = 90000 // Hz
) )
// tsGenerator encapsulates properties of an mpegts generator. // Encoder encapsulates properties of an mpegts generator.
type tsGenerator struct { type Encoder struct {
outputChan chan []byte stream chan []byte
nalInputChan chan []byte
clock time.Duration clock time.Duration
frameInterval time.Duration frameInterval time.Duration
@ -167,11 +165,10 @@ type tsGenerator struct {
continuity map[int]byte continuity map[int]byte
} }
// NewTsGenerator returns an instance of the tsGenerator struct // NewEncoder returns an Encoder with the specified frame rate.
func NewTsGenerator(fps float64) (g *tsGenerator) { func NewEncoder(fps float64) *Encoder {
return &tsGenerator{ return &Encoder{
outputChan: make(chan []byte, 1), stream: make(chan []byte, 1),
nalInputChan: make(chan []byte, 1),
frameInterval: time.Duration(float64(time.Second) / fps), frameInterval: time.Duration(float64(time.Second) / fps),
ptsOffset: ptsOffset, ptsOffset: ptsOffset,
@ -184,24 +181,9 @@ func NewTsGenerator(fps float64) (g *tsGenerator) {
} }
} }
// Start is called when we would like generation to begin, i.e. we would like // Stream returns a channel of streaming packets.
// the generator to start taking input data and creating mpegts packets func (e *Encoder) Stream() <-chan []byte {
func (g *tsGenerator) Start() { return e.stream
go g.generate()
}
func (g *tsGenerator) Stop() {}
// InputChan returns a handle to the nalInputChan (inputChan) so that nal units
// can be passed to the generator and processed
func (g *tsGenerator) InputChan() chan []byte {
return g.nalInputChan
}
// OutputChan returns a handle to the generator output chan where the mpegts
// packets will show up once ready to go
func (g *tsGenerator) OutputChan() <-chan []byte {
return g.outputChan
} }
const ( const (
@ -216,35 +198,32 @@ const (
// generate handles the incoming data and generates equivalent mpegts packets - // generate handles the incoming data and generates equivalent mpegts packets -
// sending them to the output channel // sending them to the output channel
func (g *tsGenerator) generate() { func (e *Encoder) Encode(nalu []byte) error {
for {
nalu := <-g.nalInputChan
// Write PAT // Write PAT
patPkt := mpegts.Packet{ patPkt := Packet{
PUSI: true, PUSI: true,
PID: patPid, PID: patPid,
CC: g.ccFor(patPid), CC: e.ccFor(patPid),
AFC: hasPayload, AFC: hasPayload,
Payload: patTable, Payload: patTable,
} }
g.outputChan <- patPkt.Bytes() e.stream <- patPkt.Bytes()
// Write PMT. // Write PMT.
pmtPkt := mpegts.Packet{ pmtPkt := Packet{
PUSI: true, PUSI: true,
PID: pmtPid, PID: pmtPid,
CC: g.ccFor(pmtPid), CC: e.ccFor(pmtPid),
AFC: hasPayload, AFC: hasPayload,
Payload: pmtTable, Payload: pmtTable,
} }
g.outputChan <- pmtPkt.Bytes() e.stream <- pmtPkt.Bytes()
// Prepare PES data. // Prepare PES data.
pesPkt := pes.Packet{ pesPkt := pes.Packet{
StreamID: streamID, StreamID: streamID,
PDI: hasPTS, PDI: hasPTS,
PTS: g.pts(), PTS: e.pts(),
Data: nalu, Data: nalu,
HeaderLength: 5, HeaderLength: 5,
} }
@ -252,11 +231,11 @@ func (g *tsGenerator) generate() {
pusi := true pusi := true
for len(buf) != 0 { for len(buf) != 0 {
pkt := mpegts.Packet{ pkt := Packet{
PUSI: pusi, PUSI: pusi,
PID: videoPid, PID: videoPid,
RAI: pusi, RAI: pusi,
CC: g.ccFor(videoPid), CC: e.ccFor(videoPid),
AFC: hasAdaptationField | hasPayload, AFC: hasAdaptationField | hasPayload,
PCRF: pusi, PCRF: pusi,
} }
@ -266,36 +245,37 @@ func (g *tsGenerator) generate() {
if pusi { if pusi {
// If the packet has a Payload Unit Start Indicator // If the packet has a Payload Unit Start Indicator
// flag set then we need to write a PCR. // flag set then we need to write a PCR.
pkt.PCR = g.pcr() pkt.PCR = e.pcr()
pusi = false pusi = false
} }
g.outputChan <- pkt.Bytes() e.stream <- pkt.Bytes()
} }
g.tick() e.tick()
}
return nil
} }
// tick advances the clock one frame interval. // tick advances the clock one frame interval.
func (g *tsGenerator) tick() { func (e *Encoder) tick() {
g.clock += g.frameInterval e.clock += e.frameInterval
} }
// pts retuns the current presentation timestamp. // pts retuns the current presentation timestamp.
func (g *tsGenerator) pts() uint64 { func (e *Encoder) pts() uint64 {
return uint64((g.clock + g.ptsOffset).Seconds() * pcrFreq) return uint64((e.clock + e.ptsOffset).Seconds() * pcrFreq)
} }
// pcr returns the current program clock reference. // pcr returns the current program clock reference.
func (g *tsGenerator) pcr() uint64 { func (e *Encoder) pcr() uint64 {
return uint64(g.clock.Seconds() * pcrFreq) return uint64(e.clock.Seconds() * pcrFreq)
} }
// ccFor returns the next continuity counter for pid. // ccFor returns the next continuity counter for pid.
func (g *tsGenerator) ccFor(pid int) byte { func (e *Encoder) ccFor(pid int) byte {
cc := g.continuity[pid] cc := e.continuity[pid]
const continuityCounterMask = 0xf const continuityCounterMask = 0xf
g.continuity[pid] = (cc + 1) & continuityCounterMask e.continuity[pid] = (cc + 1) & continuityCounterMask
return cc return cc
} }

View File

@ -1,6 +1,6 @@
/* /*
NAME NAME
MpegTs.go - provides a data structure intended to encapsulate the properties mpegts.go - provides a data structure intended to encapsulate the properties
of an MpegTs packet and also functions to allow manipulation of these packets. of an MpegTs packet and also functions to allow manipulation of these packets.
DESCRIPTION DESCRIPTION
@ -10,7 +10,7 @@ AUTHOR
Saxon A. Nelson-Milton <saxon.milton@gmail.com> Saxon A. Nelson-Milton <saxon.milton@gmail.com>
LICENSE LICENSE
MpegTs.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) mpegts.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the under the terms of the GNU General Public License as published by the
@ -26,7 +26,7 @@ LICENSE
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/ */
package mpegts package mts
const ( const (
mpegTsSize = 188 mpegTsSize = 188

View File

@ -0,0 +1,74 @@
/*
NAME
mpegts_test.go
DESCRIPTION
See Readme.md
AUTHOR
Dan Kortschak <dan@ausocean.org>
Saxon Nelson-Milton <saxon.milton@gmail.com>
LICENSE
mpegts_test.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package pes
import (
"reflect"
"testing"
)
const (
dataLength = 3 // bytes
)
func TestPesToByteSlice(t *testing.T) {
pkt := Packet{
StreamID: 0xE0, // StreamID
PDI: byte(2),
PTS: 100000,
HeaderLength: byte(10),
Stuff: []byte{0xFF, 0xFF},
Data: []byte{0xEA, 0x4B, 0x12},
}
got := pkt.Bytes()
want := []byte{
0x00, // packet start code prefix byte 1
0x00, // packet start code prefix byte 2
0x01, // packet start code prefix byte 3
0xE0, // stream ID
0x00, // PES Packet length byte 1
0x00, // PES packet length byte 2
0x80, // Marker bits,ScramblingControl, Priority, DAI, Copyright, Original
0x80, // PDI, ESCR, ESRate, DSMTrickMode, ACI, CRC, Ext
10, // header length
0x21, // PCR byte 1
0x00, // pcr byte 2
0x07, // pcr byte 3
0x0D, // pcr byte 4
0x41, // pcr byte 5
0xFF, // Stuffing byte 1
0xFF, // stuffing byte 3
0xEA, // data byte 1
0x4B, // data byte 2
0x12, // data byte 3
}
if !reflect.DeepEqual(got, want) {
t.Errorf("unexpected packet encoding:\ngot: %#v\nwant:%#v", got, want)
}
}