Working on cleaning up code and comments

This commit is contained in:
Saxon Milton 2018-02-28 01:40:38 +10:30
parent 7b50a4e95c
commit 92a2b033c9
8 changed files with 178 additions and 279 deletions

View File

@ -1,72 +0,0 @@
/*
NAME
BitrateCalculator.go - is a simple struct with methods to allow for easy
calculation of bitrate.
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon.milton@gmail.com>
LICENSE
BitrateCalculator.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package bitrate
import (
"fmt"
"time"
)
// BitrateCalculator provides fields and methods to allow calculation of bitrate
type BitrateCalculator struct {
outputDelay int // sec
now time.Time
prev time.Time
startedBefore bool
elapsedTime time.Duration
lastDisplayTime time.Time
}
// Place this at the start of the code segment that you would like to time
func (bc *BitrateCalculator) Start(outputDelay int) {
if outputDelay >= 0 {
bc.outputDelay = outputDelay
} else {
bc.outputDelay = 0
}
bc.prev = time.Now()
if !bc.startedBefore {
bc.startedBefore = true
bc.elapsedTime = time.Duration(0)
bc.lastDisplayTime = time.Now()
}
}
// Place this at the end of the code segment that you would like to time
func (bc *BitrateCalculator) Stop(noOfKB float64) (bitrate int64) {
bc.now = time.Now()
deltaTime := bc.now.Sub(bc.prev)
if bc.now.Sub(bc.lastDisplayTime) > time.Duration(bc.outputDelay)*time.Second {
bitrate = int64(noOfKB / float64(deltaTime/1e9))
fmt.Printf("Bitrate: %d kbps\n", bitrate)
bc.elapsedTime = time.Duration(0)
bc.lastDisplayTime = bc.now
}
return
}

View File

@ -1,77 +0,0 @@
/*
NAME
BitrateCalculator_test.go - is a file that may be used to test the
BitrateCalculator.go file using the golang testing utilities
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon.milton@gmail.com>
LICENSE
BitrateCalculator_test.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package bitrate
import (
"testing"
"time"
)
// Some consts used over duration of testing
const (
bitrateDelay1 = 0 // s
bitrateDelay2 = 5 // s
amountOfData = 100000.0
testTime = 2500.0 // ms
)
// This will be the BitrateCalculator object we use over testing
var bitrateCalc BitrateCalculator
// Simple test to check that the calculator can calc bitrate over a given
// duration of time
func Test1(t *testing.T) {
bitrateCalc = BitrateCalculator{}
bitrateCalc.Start(bitrateDelay1)
time.Sleep(testTime * time.Millisecond)
currentBitrate := int64(bitrateCalc.Stop(amountOfData))
actualBitrate := int64(amountOfData / ((testTime * time.Millisecond) / 1e9))
if currentBitrate != actualBitrate {
t.Errorf("Bitrate is wrong! Calculated: %v Actual %v", currentBitrate, actualBitrate)
}
}
// Now let's check that the output delay feature works
func Test2(t *testing.T) {
bitrateCalc = BitrateCalculator{}
var currentBitrate int64
for i := 0; i < 2; i++ {
bitrateCalc.Start(bitrateDelay2)
time.Sleep(testTime * time.Millisecond)
currentBitrate = int64(bitrateCalc.Stop(amountOfData))
if i == 0 && currentBitrate != 0 {
t.Errorf("The bitrate calc did not delay outputting!")
}
time.Sleep(6000 * time.Millisecond)
}
actualBitrate := int64(amountOfData / ((testTime * time.Millisecond) / 1e9))
if currentBitrate != actualBitrate {
t.Errorf("Bitrate is wrong! Calculated: %v Actual %v", currentBitrate, actualBitrate)
}
}

View File

@ -1,28 +0,0 @@
package efficientbuffer
type dataBlock struct {
address []byte // Address of the data block (slice)
lowerBound int // Lower bound of the data we're interested in
upperBound int // Upper bound of the data we're interested in
startIndex int // Index in our EffSlice
}
type EffSlice struct {
data map[int](*dataChunk)
}
func (s *EffSlice)GetElement(index int) byte {
}
func (s *EffSlice)AsByteSlice() []byte {
}
func (s *EffSlice)Append(data *EffSlice){
}
func (s *EffSlice)Append(data []byte){
}
func (s *EffSlice)Len(){
}

View File

@ -1,3 +1,30 @@
/*
NAME
FLV.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
FLV.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package flv package flv
import ( import (
@ -13,29 +40,36 @@ const (
) )
const ( const (
VideoTagType = 9 VideoTagType = 9
AudioTagType = 8 AudioTagType = 8
KeyFrameType = 1 KeyFrameType = 1
InterFrameType = 2 InterFrameType = 2
H264 = 7 H264 = 7
AVCNALU = 1 AVCNALU = 1
SequenceHeader = 0 SequenceHeader = 0
DataHeaderLength = 5 DataHeaderLength = 5
NoTimestampExtension = 0 NoTimestampExtension = 0
AACAudioFormat = 10 AACAudioFormat = 10
PCMAudioFormat = 0 PCMAudioFormat = 0
) )
var flvHeaderCode = []byte{0x46, 0x4C, 0x56}
type Header struct { type Header struct {
AudioFlag bool AudioFlag bool
VideoFlag bool VideoFlag bool
} }
func btb(b bool) byte {
return tools.BoolToByte(b)
}
func (h *Header) ToByteSlice() (output []byte) { func (h *Header) ToByteSlice() (output []byte) {
output = make([]byte, 0, headerLength) output = make([]byte, 0, headerLength)
output = append(output, []byte{0x46, 0x4C, 0x56, output = append(output, flvheaderCode...)
output = append(output, []byte {
version, version,
0x00 | tools.BoolToByte(h.AudioFlag)<<2 | tools.BoolToByte(h.VideoFlag), 0x00 | btb(h.AudioFlag)<<2 | btb(h.VideoFlag),
0x00, 0x00, 0x00, byte(9), 0x00, 0x00, 0x00, byte(9),
}...) }...)
fmt.Println(output) fmt.Println(output)
@ -47,10 +81,10 @@ type VideoTag struct {
DataSize uint32 DataSize uint32
Timestamp uint32 Timestamp uint32
TimestampExtended uint32 TimestampExtended uint32
FrameType byte FrameType byte
Codec byte Codec byte
PacketType byte PacketType byte
CompositionTime uint32 CompositionTime uint32
Data []byte Data []byte
PrevTagSize uint32 PrevTagSize uint32
} }
@ -66,10 +100,14 @@ func (t *VideoTag) ToByteSlice() (output []byte) {
byte(t.Timestamp >> 8), byte(t.Timestamp >> 8),
byte(t.Timestamp), byte(t.Timestamp),
byte(t.TimestampExtended), byte(t.TimestampExtended),
0x00, 0x00, 0x00, 0x00,
0x00 | byte(t.FrameType << 4) | byte(t.Codec), 0x00,
0x00,
0x00 | byte(t.FrameType<<4) | byte(t.Codec),
t.PacketType, t.PacketType,
byte(t.CompositionTime >> 16),byte(t.CompositionTime >> 8),byte(t.CompositionTime), byte(t.CompositionTime >> 16),
byte(t.CompositionTime >> 8),
byte(t.CompositionTime),
}...) }...)
output = append(output, t.Data...) output = append(output, t.Data...)
output = append(output, []byte{ output = append(output, []byte{
@ -86,11 +124,11 @@ type AudioTag struct {
DataSize uint32 DataSize uint32
Timestamp uint32 Timestamp uint32
TimestampExtended uint32 TimestampExtended uint32
SoundFormat uint8 SoundFormat uint8
SoundRate uint8 SoundRate uint8
SoundSize bool SoundSize bool
SoundType bool SoundType bool
Data []byte Data []byte
PrevTagSize uint32 PrevTagSize uint32
} }
@ -105,8 +143,10 @@ func (t *AudioTag) ToByteSlice() (output []byte) {
byte(t.Timestamp >> 8), byte(t.Timestamp >> 8),
byte(t.Timestamp), byte(t.Timestamp),
byte(t.TimestampExtended), byte(t.TimestampExtended),
0x00, 0x00, 0x00, 0x00,
byte(t.SoundFormat << 4) | byte(t.SoundRate<<2) | byte(tools.BoolToByte(t.SoundSize)<<1) | byte(tools.BoolToByte(t.SoundType)), 0x00,
0x00,
byte(t.SoundFormat<<4) | byte(t.SoundRate<<2) | btb(t.SoundSize)<<1 | btb(t.SoundType),
}...) }...)
output = append(output, t.Data...) output = append(output, t.Data...)
output = append(output, []byte{ output = append(output, []byte{

View File

@ -1,3 +1,30 @@
/*
NAME
Config.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
Config.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package revid package revid
import ( import (
@ -19,9 +46,10 @@ type Config struct {
InputCodec uint8 InputCodec uint8
Output uint8 Output uint8
RtmpEncodingMethod uint8 RtmpEncodingMethod uint8
RtmpMethod uint8
Packetization uint8
FramesPerClip int FramesPerClip int
RtmpUrl string RtmpUrl string
RtmpMethod uint8
Bitrate string Bitrate string
OutputFileName string OutputFileName string
InputFileName string InputFileName string
@ -31,7 +59,6 @@ type Config struct {
HttpAddress string HttpAddress string
Quantization string Quantization string
Timeout string Timeout string
Packetization uint8
IntraRefreshPeriod string IntraRefreshPeriod string
Logger smartLogger.LogInstance Logger smartLogger.LogInstance
} }
@ -43,7 +70,7 @@ const (
Rtp = 2 Rtp = 2
H264Codec = 3 H264Codec = 3
File = 4 File = 4
Http = 5 Http = 5
H264 = 6 H264 = 6
Mjpeg = 7 Mjpeg = 7
None = 8 None = 8
@ -52,7 +79,7 @@ const (
Ffmpeg = 11 Ffmpeg = 11
Revid = 12 Revid = 12
Flv = 13 Flv = 13
LibRtmp = 14 LibRtmp = 14
) )
// Default config settings // Default config settings

View File

@ -1,16 +1,16 @@
/* /*
NAME NAME
revid - a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. RevidInstance.go
DESCRIPTION DESCRIPTION
See Readme.md See Readme.md
AUTHORS AUTHORS
Alan Noble <anoble@gmail.com> Saxon A. Nelson-Milton <saxon@ausocean.org>
Saxon A. Nelson-Milton <saxon.milton@gmail.com> Alan Noble <alan@ausocean.org>
LICENSE LICENSE
revid is Copyright (C) 2017 Alan Noble. revid is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the under the terms of the GNU General Public License as published by the
@ -67,7 +67,8 @@ const (
bitrateTime = 60 bitrateTime = 60
mjpegParserInChanLen = 100000 mjpegParserInChanLen = 100000
ffmpegPath = "/home/saxon/bin/ffmpeg" ffmpegPath = "/home/saxon/bin/ffmpeg"
rtmpConnectionTimout = 10 rtmpConnectionTimout = 10
outputChanSize = 10000
) )
// Log Types // Log Types
@ -109,8 +110,8 @@ type revidInst struct {
setupOutput func() error setupOutput func() error
getFrame func() []byte getFrame func() []byte
flushData func() flushData func()
sendClip func(clip []byte) error sendClip func(clip []byte) error
rtmpInst rtmp.RTMPSession rtmpInst rtmp.RTMPSession
} }
// NewRevidInstance returns a pointer to a new revidInst with the desired // NewRevidInstance returns a pointer to a new revidInst with the desired
@ -123,7 +124,7 @@ func NewRevidInstance(config Config) (r *revidInst, err error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
r.outputChan = make(chan []byte, 10000) r.outputChan = make(chan []byte, outputchanSize)
r.parser.Start() r.parser.Start()
go r.packClips() go r.packClips()
r.Log(Info, "New revid instance created! config is:") r.Log(Info, "New revid instance created! config is:")
@ -145,6 +146,7 @@ func (r *revidInst) ChangeState(config Config) error {
return errors.New("Config struct is bad!: " + err.Error()) return errors.New("Config struct is bad!: " + err.Error())
} }
r.config = config r.config = config
switch r.config.Output { switch r.config.Output {
case File: case File:
r.sendClip = r.sendClipToFile r.sendClip = r.sendClipToFile
@ -161,6 +163,7 @@ func (r *revidInst) ChangeState(config Config) error {
case Http: case Http:
r.sendClip = r.sendClipToHTTP r.sendClip = r.sendClipToHTTP
} }
switch r.config.Input { switch r.config.Input {
case Raspivid: case Raspivid:
r.setupInput = r.setupInputForRaspivid r.setupInput = r.setupInputForRaspivid
@ -176,24 +179,29 @@ func (r *revidInst) ChangeState(config Config) error {
r.Log(Info, "Using MJPEG parser!") r.Log(Info, "Using MJPEG parser!")
r.parser = parser.NewMJPEGParser(mjpegParserInChanLen) r.parser = parser.NewMJPEGParser(mjpegParserInChanLen)
} }
if r.config.Packetization == None {
switch r.config.Packetization {
case None:
// no packetisation - Revid output chan grabs raw data straight from parser
r.parser.SetOutputChan(r.outputChan) r.parser.SetOutputChan(r.outputChan)
r.getFrame = r.getFrameNoPacketization r.getFrame = r.getFrameNoPacketization
} else { goto noPacketizationSetup
switch r.config.Packetization { case Mpegts:
case Mpegts: r.Log(Info, "Using MPEGTS packetisation!")
r.Log(Info, "Using MPEGTS packetisation!") frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate)
frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate) r.generator = generator.NewTsGenerator(uint(frameRateAsInt))
r.generator = generator.NewTsGenerator(uint(frameRateAsInt)) case Flv:
case Flv: r.Log(Info, "Using FLV packetisation!")
r.Log(Info, "Using FLV packetisation!") frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate)
frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate) r.generator = generator.NewFlvGenerator(true, true, uint(frameRateAsInt))
r.generator = generator.NewFlvGenerator(true, true, uint(frameRateAsInt))
}
r.getFrame = r.getFramePacketization
r.parser.SetOutputChan(r.generator.GetInputChan())
r.generator.Start()
} }
// We have packetization of some sort, so we want to send data to Generator
// to perform packetization
r.getFrame = r.getFramePacketization
r.parser.SetOutputChan(r.generator.GetInputChan())
r.generator.Start()
noPacketizationSetup:
return nil return nil
} }
@ -214,7 +222,7 @@ func (r *revidInst) IsRunning() bool {
} }
// Start invokes a revidInst to start processing video from a defined input // Start invokes a revidInst to start processing video from a defined input
// and packetising to a defined output. // and packetising (if theres packetization) to a defined output.
func (r *revidInst) Start() { func (r *revidInst) Start() {
if r.isRunning { if r.isRunning {
r.Log(Warning, "revidInst.Start() called but revid already running!") r.Log(Warning, "revidInst.Start() called but revid already running!")
@ -236,6 +244,7 @@ func (r *revidInst) Stop() {
if r.isRunning { if r.isRunning {
r.Log(Info, "Stopping revid!") r.Log(Info, "Stopping revid!")
r.isRunning = false r.isRunning = false
// If a cmd process is running, we kill!
if r.cmd != nil { if r.cmd != nil {
r.cmd.Process.Kill() r.cmd.Process.Kill()
} }
@ -244,39 +253,32 @@ func (r *revidInst) Stop() {
} }
} }
// Start invokes a revidInst to start processing video from a defined input // getFrameNoPacketization gets a frame directly from the revid output chan
// and packetising to a defined output. // as we don't need to go through the generator with no packetization settings
func (r *revidInst) getFrameNoPacketization() []byte { func (r *revidInst) getFrameNoPacketization() []byte {
return <-r.outputChan return <-r.outputChan
} }
// Start invokes a revidInst to start processing video from a defined input // getFramePacketization gets a frame from the generators output chan - the
// and packetising to a defined output. // the generator being an mpegts or flv generator depending on the config
func (r *revidInst) getFramePacketization() []byte { func (r *revidInst) getFramePacketization() []byte {
return <-(r.generator.GetOutputChan()) return <-(r.generator.GetOutputChan())
} }
// Start invokes a revidInst to start processing video from a defined input // flushDataPacketization removes data from the revid inst's coutput chan
// and packetising to a defined output. func (r *revidInst) flushData() {
func (r *revidInst) flushDataNoPacketisation() {
for len(r.outputChan) > 0 { for len(r.outputChan) > 0 {
<-(r.outputChan) <-(r.outputChan)
} }
} }
// Start invokes a revidInst to start processing video from a defined input // packClips takes data segments; whether that be tsPackets or mjpeg frames and
func (r *revidInst) flushDataMpegtsPacketisation() { // packs them into clips consisting of the amount frames specified in the config
for len(r.generator.GetOutputChan()) > 0 {
<-(r.generator.GetOutputChan())
}
}
// packClips takes data segments; whether that be tsPackets or jpeg frames and
// packs them into clips 1s long.
func (r *revidInst) packClips() { func (r *revidInst) packClips() {
clipSize := 0 clipSize := 0
packetCount := 0 packetCount := 0
for { for {
// Get some memory from the ring buffer for out clip
if clip, err := r.ringBuffer.Get(); err != nil { if clip, err := r.ringBuffer.Get(); err != nil {
r.Log(Error, err.Error()) r.Log(Error, err.Error())
r.Log(Warning, "Clearing output chan!") r.Log(Warning, "Clearing output chan!")
@ -334,7 +336,8 @@ func (r *revidInst) outputClips() {
break break
} }
} }
// let the ringbuffer know that we're done with the memory we grabbed when
// we call ringBuffer.Get()
if err := r.ringBuffer.DoneReading(); err != nil { if err := r.ringBuffer.DoneReading(); err != nil {
r.Log(Error, err.Error()) r.Log(Error, err.Error())
} }
@ -353,8 +356,7 @@ func (r *revidInst) outputClips() {
} }
} }
// Start invokes a revidInst to start processing video from a defined input // senClipToFile writes the passed clip to a file
// and packetising to a defined output.
func (r *revidInst) sendClipToFile(clip []byte) error { func (r *revidInst) sendClipToFile(clip []byte) error {
_,err := r.outputFile.Write(clip) _,err := r.outputFile.Write(clip)
if err != nil { if err != nil {
@ -366,9 +368,7 @@ func (r *revidInst) sendClipToFile(clip []byte) error {
// sendClipToHTTP takes a clip and an output url and posts through http. // sendClipToHTTP takes a clip and an output url and posts through http.
func (r *revidInst) sendClipToHTTP(clip []byte) error { func (r *revidInst) sendClipToHTTP(clip []byte) error {
timeout := time.Duration(httpTimeOut * time.Second) timeout := time.Duration(httpTimeOut * time.Second)
client := http.Client{ client := http.Client{Timeout: timeout}
Timeout: timeout,
}
url := r.config.HttpAddress+strconv.Itoa(len(clip)) url := r.config.HttpAddress+strconv.Itoa(len(clip))
r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip))) r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip)))
resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer
@ -385,23 +385,21 @@ func (r *revidInst) sendClipToHTTP(clip []byte) error {
return nil return nil
} }
// Start invokes a revidInst to start processing video from a defined input // sendClipToFfmpegRtmp sends the clip over the current rtmp connection using
// and packetising to a defined output. // an ffmpeg process.
func (r *revidInst) sendClipToFfmpegRtmp(clip []byte) error { func (r *revidInst) sendClipToFfmpegRtmp(clip []byte) (err error) {
_, err := r.ffmpegStdin.Write(clip) _, err = r.ffmpegStdin.Write(clip)
if err != nil { return
return err
}
return nil
} }
// sendClipToLibRtmp send the clip over the current rtmp connection using the
// c based librtmp library
func (r *revidInst) sendClipToLibRtmp(clip []byte) (err error) { func (r *revidInst) sendClipToLibRtmp(clip []byte) (err error) {
err = r.rtmpInst.WriteFrame(clip,uint(len(clip))) err = r.rtmpInst.WriteFrame(clip,uint(len(clip)))
return return
} }
// Start invokes a revidInst to start processing video from a defined input // setupOutputForFfmpegRtmp sets up output to rtmp using an ffmpeg process
// and packetising to a defined output.
func (r *revidInst) setupOutputForFfmpegRtmp() error { func (r *revidInst) setupOutputForFfmpegRtmp() error {
r.ffmpegCmd = exec.Command(ffmpegPath, r.ffmpegCmd = exec.Command(ffmpegPath,
"-f", "h264", "-f", "h264",
@ -434,20 +432,23 @@ func (r *revidInst) setupOutputForFfmpegRtmp() error {
return nil return nil
} }
// setupOutputForLibRtmp sets up rtmp output using the wrapper for the c based
// librtmp library - makes connection and starts comms etc.
func (r *revidInst) setupOutputForLibRtmp() (err error) { func (r *revidInst) setupOutputForLibRtmp() (err error) {
r.rtmpInst = rtmp.NewRTMPSession(r.config.RtmpUrl, rtmpConnectionTimout) r.rtmpInst = rtmp.NewRTMPSession(r.config.RtmpUrl, rtmpConnectionTimout)
err = r.rtmpInst.StartSession() err = r.rtmpInst.StartSession()
//go r.testRtmp(5000) // go r.testRtmp(5000)
return return
} }
// setupOutputForFile sets up an output file to output data to
func (r *revidInst) setupOutputForFile() (err error) { func (r *revidInst) setupOutputForFile() (err error) {
r.outputFile, err = os.Create(r.config.OutputFileName) r.outputFile, err = os.Create(r.config.OutputFileName)
return return
} }
// Start invokes a revidInst to start processing video from a defined input // setupInputForRaspivid sets up things for input from raspivid i.e. starts
// and packetising to a defined output. // a raspivid process and pipes it's data output.
func (r *revidInst) setupInputForRaspivid() error { func (r *revidInst) setupInputForRaspivid() error {
r.Log(Info, "Starting raspivid!") r.Log(Info, "Starting raspivid!")
switch r.config.InputCodec { switch r.config.InputCodec {
@ -485,7 +486,7 @@ func (r *revidInst) setupInputForRaspivid() error {
return nil return nil
} }
// Start invokes a revidInst to start processing video from a defined input // setupInputForFile sets things up for getting input from a file
func (r *revidInst) setupInputForFile() error { func (r *revidInst) setupInputForFile() error {
fps,_ := strconv.Atoi(r.config.FrameRate) fps,_ := strconv.Atoi(r.config.FrameRate)
r.parser.SetDelay( uint( float64(1000) / float64(fps) ) ) r.parser.SetDelay( uint( float64(1000) / float64(fps) ) )
@ -493,31 +494,40 @@ func (r *revidInst) setupInputForFile() error {
return nil return nil
} }
// testRtmp is useful to check robustness of connections. Intended to be run as
// goroutine. After every 'delayTime' the rtmp connection is ended and then
// restarted
func (r *revidInst)testRtmp(delayTime uint){ func (r *revidInst)testRtmp(delayTime uint){
for { for {
time.Sleep(time.Duration(delayTime)*time.Millisecond) time.Sleep(time.Duration(delayTime)*time.Millisecond)
r.rtmpInst.EndSession() r.rtmpInst.EndSession()
r.rtmpInst.StartSession() r.rtmpInst.StartSession()
} }
} }
// readCamera reads data from the defined camera while the revidInst is running. // readCamera reads data from the defined camera while the revidInst is running.
// TODO: use ringbuffer here instead of allocating mem every time!
func (r *revidInst) readCamera() { func (r *revidInst) readCamera() {
r.Log(Info, "Reading camera data!") r.Log(Info, "Reading camera data!")
for r.isRunning { for r.isRunning {
data := make([]byte, 1) data := make([]byte, 1)
_, err := io.ReadFull(r.inputReader, data) _, err := io.ReadFull(r.inputReader, data)
switch { switch {
// We know this means we're getting nothing from the cam
case err != nil && err.Error() == "EOF" && r.isRunning: case err != nil && err.Error() == "EOF" && r.isRunning:
r.Log(Error, "No data from camera!") r.Log(Error, "No data from camera!")
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
// We don't know what this one is, so let's just output the error to the log
case err != nil && r.isRunning: case err != nil && r.isRunning:
r.Log(Error, err.Error()) r.Log(Error, err.Error())
// Everything is fine! send the data to the parser to split it up into
// frames/access units
default: default:
r.parser.GetInputChan() <- data[0] r.parser.GetInputChan() <- data[0]
} }
} }
r.Log(Info, "Out of reading routine!") r.Log(Info, "Not trying to read from camera anymore!")
} }
// readFile reads data from the defined file while the revidInst is running. // readFile reads data from the defined file while the revidInst is running.

View File

@ -1,16 +1,15 @@
/* /*
NAME NAME
MpegTs.go - provides a data structure intended to encapsulate the properties revid_test.go
of an MpegTs packet.
DESCRIPTION DESCRIPTION
See Readme.md See Readme.md
AUTHOR AUTHOR
Saxon Nelson-Milton <saxon.milton@gmail.com> Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE LICENSE
MpegTs.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) revid_test.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the under the terms of the GNU General Public License as published by the
@ -33,7 +32,7 @@ import (
"time" "time"
) )
/*
// Test revidInst with a file input // Test revidInst with a file input
func TestFileInput(t *testing.T){ func TestFileInput(t *testing.T){
config := Config{ config := Config{
@ -94,9 +93,9 @@ func TestRaspividMJPEGInput(t *testing.T){
time.Sleep(20*time.Second) time.Sleep(20*time.Second)
revidInst.Stop() revidInst.Stop()
} }
*/
/*
// Test revidInst with rtmp output // Test revidInst with rtmp output
func TestRtmpOutput(t *testing.T){ func TestRtmpOutput(t *testing.T){
config := Config{ config := Config{
@ -120,9 +119,9 @@ func TestRtmpOutput(t *testing.T){
time.Sleep(120*time.Second) time.Sleep(120*time.Second)
revidInst.Stop() revidInst.Stop()
} }
*/
/*
// Test h264 inputfile to flv output files // Test h264 inputfile to flv output files
func TestFlvOutputFile(t *testing.T) { func TestFlvOutputFile(t *testing.T) {
config := Config{ config := Config{
@ -143,7 +142,7 @@ func TestFlvOutputFile(t *testing.T) {
time.Sleep(30 * time.Second) time.Sleep(30 * time.Second)
revidInst.Stop() revidInst.Stop()
} }
*/
// Test h264 inputfile to flv format into rtmp using librtmp c wrapper // Test h264 inputfile to flv format into rtmp using librtmp c wrapper
func TestRtmpOutputUsingLibRtmp(t *testing.T){ func TestRtmpOutputUsingLibRtmp(t *testing.T){