From c46a8d8f08059fddca2edb17611361d29ea9bd42 Mon Sep 17 00:00:00 2001 From: Unknown Date: Mon, 12 Feb 2018 18:28:29 +1030 Subject: [PATCH] Fixed errors and bugs and it seems like flv packetization is working --- flv/FLV.go | 65 +++++++------ generator/FLVGenerator.go | 56 +++++++---- generator/Generator.go | 16 +--- generator/MPEGTSGenerator.go | 45 +++++---- generator/tsgenerator_test.go | 2 +- parser/H264Parser.go | 6 ++ parser/MJPEGParser.go | 5 + parser/Parser.go | 1 - revid/Config.go | 24 ++++- revid/RevidInstance.go | 169 ++++++++++++++++++---------------- revid/revid_test.go | 40 ++++---- ringbuffer/RingBuffer.go | 4 +- 12 files changed, 250 insertions(+), 183 deletions(-) diff --git a/flv/FLV.go b/flv/FLV.go index fef99ce3..4e40b7ec 100644 --- a/flv/FLV.go +++ b/flv/FLV.go @@ -2,6 +2,7 @@ package flv import ( "../tools" + "fmt" ) const ( @@ -12,7 +13,11 @@ const ( ) const ( - videoTagType = 9 + VideoTagType = 9 + KeyFrameType = 1 + H264 = 7 + AVCNALU = 1 + DataHeaderLength = 5 ) type Header struct { @@ -20,13 +25,14 @@ type Header struct { VideoFlag bool } -func (h *Header) toByteSlice() []byte { +func (h *Header) ToByteSlice() (output []byte) { output = make([]byte, 0, headerLength) output = append(output, []byte{0x46, 0x4C, 0x56, version, - 0x00 | tools.boolToByte(h.audioFlag)<<3 | tools.boolToByte(h.videoFlag), - 0x00, 0x00, 0x00, byte(72), + 0x00 | tools.BoolToByte(h.AudioFlag)<<2 | tools.BoolToByte(h.VideoFlag), + 0x00, 0x00, 0x00, byte(9), }...) + fmt.Println(output) return } @@ -36,41 +42,48 @@ type VideoTag struct { DataSize uint32 Timestamp uint32 TimestampExtended uint32 + FrameType byte + Codec byte + PacketType byte + CompositionTime uint32 Data []byte } -func (t *VideoTag) toByteSlice() (output []byte) { +func (t *VideoTag) ToByteSlice() (output []byte) { output = make([]byte, 0, maxVideoTagSize) output = append(output, []byte{ - byte(t.prevTagSize >> 24), - byte(t.prevTagSize >> 16), - byte(t.prevTagSize >> 8), - byte(t.prevTagSize), - byte(t.tageType), - byte(t.dataSize >> 16), - byte(t.dataSize >> 8), - byte(t.dataSize), - byte(t.timeStamp >> 16), - byte(t.timeStamp >> 8), - byte(t.timeStamp), - byte(t.timestampExtended), + byte(t.PrevTagSize >> 24), + byte(t.PrevTagSize >> 16), + byte(t.PrevTagSize >> 8), + byte(t.PrevTagSize), + byte(t.TagType), + byte(t.DataSize >> 16), + byte(t.DataSize >> 8), + byte(t.DataSize), + byte(t.Timestamp >> 16), + byte(t.Timestamp >> 8), + byte(t.Timestamp), + byte(t.TimestampExtended), 0x00, 0x00, 0x00, + byte(t.FrameType << 4) | byte(t.Codec), + t.PacketType, + byte(t.CompositionTime >> 16),byte(t.CompositionTime >> 8),byte(t.CompositionTime), }...) - output = append(output, data...) + output = append(output, t.Data...) return } type AudioTag struct { - soundFormat uint8 - soundRate uint8 - soundSize uint8 - soundType uint8 - data []byte + SoundFormat uint8 + SoundRate uint8 + SoundSize uint8 + SoundType uint8 + Data []byte } -func (t *AudioTage) toByteSlice() (output []byte) { +func (t *AudioTag) ToByteSlice() (output []byte) { output = make([]byte, 0, maxAudioTagSize) - output = append(output, byte(soundFormat<<4)|byte(soundRate<<2)|byte(soundSize<<1)|byte(soundType)) - output = append(output, data...) + output = append(output, byte(t.SoundFormat<<4)|byte(t.SoundRate<<2)|byte(t.SoundSize<<1)|byte(t.SoundType)) + output = append(output, t.Data...) return } diff --git a/generator/FLVGenerator.go b/generator/FLVGenerator.go index cea6ad36..ea367ecd 100644 --- a/generator/FLVGenerator.go +++ b/generator/FLVGenerator.go @@ -1,10 +1,23 @@ package generator +import ( + "../flv" +) + +const ( + inputChanLength = 1000 + outputChanLength = 1000 +) + type flvGenerator struct { fps uint inputChan chan []byte outputChan chan []byte - header Header + audioFlag bool + videoFlag bool + lastTagSize int + currentTimestamp uint32 + header flv.Header } func (g *flvGenerator)GetInputChan() chan []byte { @@ -15,10 +28,15 @@ func (g *flvGenerator)GetOutputChan() chan []byte { return g.outputChan } -func NewFlvGenerator() (g *flvGenerator) { +func NewFlvGenerator(audio bool, video bool, fps uint) (g *flvGenerator) { g = new(flvGenerator) - g.timestamp = 0 + g.fps = fps + g.audioFlag = audio + g.videoFlag = video + g.currentTimestamp = 0 g.lastTagSize = 0 + g.inputChan = make(chan []byte, inputChanLength) + g.outputChan = make(chan []byte, outputChanLength) return } @@ -28,36 +46,42 @@ func (g *flvGenerator) Start(){ func (g *flvGenerator) GenHeader(){ header := flv.Header{ - AudioFlag: true, - VideoFlag: true, + AudioFlag: g.audioFlag, + VideoFlag: g.videoFlag, } - g.outputChan <- header.toByteSlice() + g.outputChan <- header.ToByteSlice() } func (g *flvGenerator) getNextTimestamp() (timestamp uint32){ timestamp = g.currentTimestamp - g.currentTimeStamp += 100*time.Millisecond() / g.fps + g.currentTimestamp += uint32(1000) / uint32(g.fps) return } func (g *flvGenerator) ResetTimestamp() { - g.timestamp = 0 + g.currentTimestamp = 0 } -func (g *tsGenerator) generate() { +func (g *flvGenerator) generate() { g.GenHeader() for { select { - case videoFrame := <-g.inputChan - tag := VideoTage{ - PrevTagSize: g.lastTagSize, - TagType: flv.videoTagType, - DataSize: len(videoFrame), + case videoFrame := <-g.inputChan: + tag := flv.VideoTag{ + PrevTagSize: uint32(g.lastTagSize), + TagType: uint8(flv.VideoTagType), + DataSize: uint32(len(videoFrame)) + flv.DataHeaderLength, Timestamp: g.getNextTimestamp(), TimestampExtended: 0, - Data: videoFrame + FrameType: flv.KeyFrameType, + Codec: flv.H264, + PacketType: flv.AVCNALU, + CompositionTime: 0, + Data: videoFrame, } - g.outputChan<-tag.toByteSlice() + tagAsByteSlice := tag.ToByteSlice() + g.lastTagSize = len(tagAsByteSlice) + g.outputChan<-tagAsByteSlice } } } diff --git a/generator/Generator.go b/generator/Generator.go index aa965de0..36ca255f 100644 --- a/generator/Generator.go +++ b/generator/Generator.go @@ -28,22 +28,8 @@ LICENSE package generator -import ( - _"fmt" - _"os" - //"bitbucket.org/ausocean/av/mpegts" - //"bitbucket.org/ausocean/av/pes" - //"bitbucket.org/ausocean/av/tools" - //"bitbucket.org/ausocean/av/rtp" - "../mpegts" - "../pes" - "../tools" - "../rtp" -) - type Generator interface { GetInputChan() chan []byte - GetOutputChan() <-chan *mpegts.MpegTsPacket + GetOutputChan() chan []byte Start() - Stop() } diff --git a/generator/MPEGTSGenerator.go b/generator/MPEGTSGenerator.go index c3dea21f..981b3e4c 100644 --- a/generator/MPEGTSGenerator.go +++ b/generator/MPEGTSGenerator.go @@ -1,5 +1,18 @@ package generator +import ( + _"fmt" + _"os" + //"bitbucket.org/ausocean/av/mpegts" + //"bitbucket.org/ausocean/av/pes" + //"bitbucket.org/ausocean/av/tools" + //"bitbucket.org/ausocean/av/rtp" + "../mpegts" + "../pes" + "../tools" + "../rtp" +) + var ( PatTable = []byte{0, 0, 176, 13, 0, 1, 193, 0, 0, 0, 1, 240, 0, 42, 177, 4, 178, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,} @@ -14,10 +27,8 @@ const ( ) type tsGenerator struct { - TsChan <-chan *mpegts.MpegTsPacket - tsChan chan<- *mpegts.MpegTsPacket - InputChan chan<- rtp.RtpPacket - inputChan <-chan rtp.RtpPacket + rtpInputChan chan rtp.RtpPacket + outputChan chan []byte nalInputChan chan []byte currentTsPacket *mpegts.MpegTsPacket payloadByteChan chan byte @@ -33,18 +44,14 @@ func (g *tsGenerator)GetInputChan() chan []byte { return g.nalInputChan } -func (g *tsGenerator)GetOutputChan() <-chan *mpegts.MpegTsPacket { - return g.TsChan +func (g *tsGenerator)GetOutputChan() chan []byte { + return g.outputChan } func NewTsGenerator(fps uint) (g *tsGenerator) { g = new(tsGenerator) - tsChan := make(chan *mpegts.MpegTsPacket, 100) - g.TsChan = tsChan - g.tsChan = tsChan - inputChan := make(chan rtp.RtpPacket, 100) - g.InputChan = inputChan - g.inputChan = inputChan + g.outputChan = make(chan []byte, 100) + g.rtpInputChan = make(chan rtp.RtpPacket, 100) g.nalInputChan = make(chan []byte, 10000) g.currentCC = 0 g.fps = fps @@ -80,7 +87,7 @@ func (g *tsGenerator) generate() { var rtpBuffer [](*rtp.RtpPacket) for { select { - case rtpPacket := <-g.inputChan: + case rtpPacket := <-g.rtpInputChan: rtpBuffer = append(rtpBuffer, &rtpPacket) if len(rtpBuffer) > 2 { // if there's something weird going on with sequence numbers then @@ -189,7 +196,9 @@ func (g *tsGenerator) generate() { AFC: 1, Payload: PatTable, } - g.tsChan <- &patPkt + + patPktAsByteSlice, _ := patPkt.ToByteSlice() + g.outputChan <- patPktAsByteSlice // Create pmt table and send off pmtPkt := mpegts.MpegTsPacket{ @@ -199,12 +208,14 @@ func (g *tsGenerator) generate() { AFC: 1, Payload: PmtTable, } - g.tsChan <- &pmtPkt + pmtPktAsByteSlice, _ := pmtPkt.ToByteSlice() + g.outputChan <- pmtPktAsByteSlice + pkt.PCR = g.genPcr() pusi = false } - - g.tsChan <- &pkt + pktAsBytelice, _ := pkt.ToByteSlice() + g.outputChan<-pktAsBytelice } } } diff --git a/generator/tsgenerator_test.go b/generator/tsgenerator_test.go index 4a536409..7fd0d500 100644 --- a/generator/tsgenerator_test.go +++ b/generator/tsgenerator_test.go @@ -26,4 +26,4 @@ LICENSE along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). */ -package tsgenerator +package generator diff --git a/parser/H264Parser.go b/parser/H264Parser.go index d95069e3..430ecf74 100644 --- a/parser/H264Parser.go +++ b/parser/H264Parser.go @@ -1,5 +1,11 @@ package parser +import ( + //"bitbucket.org/ausocean/av/itut" + "../itut" + _"fmt" +) + type h264Parser struct { inputBuffer []byte isParsing bool diff --git a/parser/MJPEGParser.go b/parser/MJPEGParser.go index e4500163..715c20f3 100644 --- a/parser/MJPEGParser.go +++ b/parser/MJPEGParser.go @@ -1,5 +1,10 @@ package parser +import ( + //"bitbucket.org/ausocean/av/itut" + _"fmt" +) + type mjpegParser struct { inputBuffer []byte isParsing bool diff --git a/parser/Parser.go b/parser/Parser.go index 003934fc..8495b60c 100644 --- a/parser/Parser.go +++ b/parser/Parser.go @@ -29,7 +29,6 @@ package parser import ( //"bitbucket.org/ausocean/av/itut" - "../itut" "log" "sync" _"fmt" diff --git a/revid/Config.go b/revid/Config.go index 3e7d0821..ac3b5d9b 100644 --- a/revid/Config.go +++ b/revid/Config.go @@ -1,5 +1,17 @@ package revid +import ( + "errors" + "strconv" + + //"bitbucket.org/ausocean/av/parser" + //"bitbucket.org/ausocean/av/tsgenerator" + + //"bitbucket.org/ausocean/av/ringbuffer" + //"bitbucket.org/ausocean/utils/smartLogger" + "../../utils/smartLogger" +) + // Config provides parameters relevant to a revid instance. A new config must // be passed to the constructor. type Config struct { @@ -7,7 +19,7 @@ type Config struct { InputCodec uint8 Output uint8 RtmpEncodingMethod uint8 - FramesPerClip uint + FramesPerClip int RtmpUrl string Bitrate string OutputFileName string @@ -30,7 +42,7 @@ const ( Rtp = 2 H264Codec = 3 File = 4 - HttpOut = 5 + Http = 5 H264 = 6 Mjpeg = 7 None = 8 @@ -38,7 +50,7 @@ const ( Rtmp = 10 Ffmpeg = 11 Revid = 12 - Flv = 13 + Flv = 13 ) // Default config settings @@ -93,7 +105,7 @@ func (config *Config) Validate(r *revidInst) error { } switch config.Output { - case HttpOut: + case Http: case File: case Rtmp: switch config.RtmpEncodingMethod { @@ -108,7 +120,7 @@ func (config *Config) Validate(r *revidInst) error { } case NothingDefined: r.Log(Warning, "No output defined, defaulting to httpOut!") - config.Output = HttpOut + config.Output = Http default: return errors.New("Bad output type defined in config!") } @@ -116,6 +128,7 @@ func (config *Config) Validate(r *revidInst) error { switch config.Packetization { case None: case Mpegts: + case Flv: case NothingDefined: r.Log(Warning, "No packetization option defined, defaulting to none!") config.Packetization = None @@ -185,4 +198,5 @@ func (config *Config) Validate(r *revidInst) error { return errors.New("Bad quantization defined in config!") } } + return nil } diff --git a/revid/RevidInstance.go b/revid/RevidInstance.go index 2eab9c3f..17d30633 100644 --- a/revid/RevidInstance.go +++ b/revid/RevidInstance.go @@ -44,12 +44,11 @@ import ( //"bitbucket.org/ausocean/av/parser" //"bitbucket.org/ausocean/av/tsgenerator" + "../generator" "../parser" - "../tsgenerator" //"bitbucket.org/ausocean/av/ringbuffer" //"bitbucket.org/ausocean/utils/smartLogger" - "../../utils/smartLogger" "../ringbuffer" ) @@ -58,9 +57,9 @@ const ( clipDuration = 1 // s mp2tPacketSize = 188 // MPEG-TS packet size mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000 - ringBufferSize = 100 / clipDuration + ringBufferSize = 1000 / clipDuration ringBufferElementSize = 10000000 - maxClipSize = 100000 + maxClipSize = 100000 httpTimeOut = 5 // s packetsPerFrame = 7 h264BufferSize = 1000000 @@ -90,23 +89,25 @@ type RevidInst interface { // The revidInst struct provides fields to describe the state of a RevidInst. type revidInst struct { - ffmpegPath string - tempDir string - ringBuffer ringbuffer.RingBuffer - config Config - isRunning bool - outputFile *os.File - inputFile *os.File - generator generator.TsGenerator - parser parser.Parser - cmd *exec.Cmd - ffmpegCmd *exec.Cmd - inputReader *bufio.Reader - ffmpegStdin io.WriteCloser - outputChan chan []byte - configureOutput func() - getFrame func()[]byte - flushData func() + ffmpegPath string + tempDir string + ringBuffer ringbuffer.RingBuffer + config Config + isRunning bool + outputFile *os.File + inputFile *os.File + generator generator.Generator + parser parser.Parser + cmd *exec.Cmd + ffmpegCmd *exec.Cmd + inputReader *bufio.Reader + ffmpegStdin io.WriteCloser + outputChan chan []byte + setupInput func() error + setupOutput func() error + getFrame func() []byte + flushData func() + sendClip func(clip []byte) error } // NewRevidInstance returns a pointer to a new revidInst with the desired @@ -140,47 +141,48 @@ func (r *revidInst) ChangeState(config Config) error { if err != nil { return errors.New("Config struct is bad!: " + err.Error()) } + r.config = config switch r.config.Output { case File: - r.outputFile, err = os.Create(r.config.OutputFileName) - if err != nil { - return nil, err - } - configureOutput = configureOutputForFile + r.sendClip = r.sendClipToFile + r.setupOutput = r.setupOutputForFile case Rtmp: - configureOutput = configureOutputForRtmp + r.setupOutput = r.setupOutputForRtmp + r.sendClip = r.sendClipToRtmp + case Http: + r.sendClip = r.sendClipToHTTP } switch r.config.Input { case Raspivid: - configureInput = configureInputForRaspivid + r.setupInput = r.setupInputForRaspivid case File: - configureInput = configureInputForFile + r.setupInput = r.setupInputForFile } switch r.config.InputCodec { case H264: r.Log(Info, "Using H264 parser!") r.parser = parser.NewH264Parser() + fmt.Println("here") case Mjpeg: r.Log(Info, "Using MJPEG parser!") r.parser = parser.NewMJPEGParser(mjpegParserInChanLen) } if r.config.Packetization == None { r.parser.SetOutputChan(r.outputChan) - getFrame = getFrameForNoPacketization + r.getFrame = r.getFrameNoPacketization } else { switch r.config.Packetization { case Mpegts: frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate) - r.generator = tsgenerator.NewTsGenerator(uint(frameRateAsInt)) + r.generator = generator.NewTsGenerator(uint(frameRateAsInt)) case Flv: frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate) - r.generator = flvGenerator.NewFlvGenerator(uint(frameRateAsInt)) + r.generator = generator.NewFlvGenerator(false, true, uint(frameRateAsInt)) } - getFrame = getFrameForPacketization + r.getFrame = r.getFramePacketization r.parser.SetOutputChan(r.generator.GetInputChan()) r.generator.Start() } - r.config = config return nil } @@ -230,27 +232,27 @@ func (r *revidInst) Stop() { // Start invokes a revidInst to start processing video from a defined input // and packetising to a defined output. func (r *revidInst) getFrameNoPacketization() []byte { - return <-r.outputChan + return <-r.outputChan } // Start invokes a revidInst to start processing video from a defined input // and packetising to a defined output. -func (r *revidInst) getFrameWithPacketization() []byte { +func (r *revidInst) getFramePacketization() []byte { return <-(r.generator.GetOutputChan()) } // Start invokes a revidInst to start processing video from a defined input // and packetising to a defined output. -func (r *revidInst) flushDataNoPacketisation(){ +func (r *revidInst) flushDataNoPacketisation() { for len(r.outputChan) > 0 { <-(r.outputChan) } } // Start invokes a revidInst to start processing video from a defined input -func (r *revidInst) flushDataMpegtsPacketisation(){ +func (r *revidInst) flushDataMpegtsPacketisation() { for len(r.generator.GetOutputChan()) > 0 { - <-(r.generator.GetTsOutputChan()) + <-(r.generator.GetOutputChan()) } } @@ -259,13 +261,11 @@ func (r *revidInst) flushDataMpegtsPacketisation(){ func (r *revidInst) packClips() { clipSize := 0 packetCount := 0 - now := time.Now() - prevTime := now for { if clip, err := r.ringBuffer.Get(); err != nil { r.Log(Error, err.Error()) r.Log(Warning, "Clearing output chan!") - r.flushOutputData() + r.flushData() } else { for { frame := r.getFrame() @@ -274,7 +274,6 @@ func (r *revidInst) packClips() { copy(clip[clipSize:upperBound], frame) packetCount++ clipSize += lenOfFrame - now = time.Now() if packetCount >= r.config.FramesPerClip { if err := r.ringBuffer.DoneWriting(clipSize); err != nil { r.Log(Error, err.Error()) @@ -282,7 +281,6 @@ func (r *revidInst) packClips() { } clipSize = 0 packetCount = 0 - prevTime = now break } } @@ -329,6 +327,9 @@ func (r *revidInst) outputClips() { prevTime = now bytes = 0 } + } else { + r.Log(Debug, err.Error()) + time.Sleep(1*time.Second) } } } @@ -336,7 +337,7 @@ func (r *revidInst) outputClips() { // Start invokes a revidInst to start processing video from a defined input // and packetising to a defined output. func (r *revidInst) sendClipToFile(clip []byte) error { - err := r.outputFile.Write(clip) + _,err := r.outputFile.Write(clip) if err != nil { return err } @@ -349,10 +350,11 @@ func (r *revidInst) sendClipToHTTP(clip []byte) error { client := http.Client{ Timeout: timeout, } + url := r.config.HttpAddress+strconv.Itoa(len(clip)) r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip))) - resp, err := client.Post(r.config.HttpAddress + strconv.Itoa(len(clip)), "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer + resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer if err != nil { - return fmt.Errorf("Error posting to %s: %s", output, err) + return fmt.Errorf("Error posting to %s: %s", url, err) } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) @@ -377,7 +379,7 @@ func (r *revidInst) sendClipToRtmp(clip []byte) error { // Start invokes a revidInst to start processing video from a defined input // and packetising to a defined output. -func (r *revidInst) setupOutputForRtmp(){ +func (r *revidInst) setupOutputForRtmp() error { r.ffmpegCmd = exec.Command(ffmpegPath, "-f", "h264", "-r", r.config.FrameRate, @@ -398,19 +400,25 @@ func (r *revidInst) setupOutputForRtmp(){ if err != nil { r.Log(Error, err.Error()) r.Stop() - return + return err } err = r.ffmpegCmd.Start() if err != nil { r.Log(Error, err.Error()) r.Stop() - return + return err } + return nil +} + +func (r *revidInst) setupOutputForFile() (err error) { + r.outputFile, err = os.Create(r.config.OutputFileName) + return } // Start invokes a revidInst to start processing video from a defined input // and packetising to a defined output. -func (r *revidInst) setupInputForRaspivid(){ +func (r *revidInst) setupInputForRaspivid() error { r.Log(Info, "Starting raspivid!") switch r.config.InputCodec { case H264: @@ -441,14 +449,16 @@ func (r *revidInst) setupInputForRaspivid(){ r.inputReader = bufio.NewReader(stdout) if err != nil { r.Log(Error, err.Error()) - return + return err } go r.readCamera() + return nil } // Start invokes a revidInst to start processing video from a defined input -func (r *revidInst) setupInputForFile(){ +func (r *revidInst) setupInputForFile() error { go r.readFile() + return nil } // readCamera reads data from the defined camera while the revidInst is running. @@ -471,33 +481,32 @@ func (r *revidInst) readCamera() { } // readFile reads data from the defined file while the revidInst is running. -func (r *revidInst) readFile() { - for { - if len(r.parser.GetInputChan()) == 0 { - var err error - r.inputFile, err = os.Open(r.config.InputFileName) - if err != nil { - r.Log(Error, err.Error()) - r.Stop() - return - } - stats, err := r.inputFile.Stat() - if err != nil { - r.Log(Error, "Could not get input file stats!") - r.Stop() - return - } - data := make([]byte, stats.Size()) - _, err = r.inputFile.Read(data) - if err != nil { - r.Log(Error, err.Error()) - r.Stop() - return - } - for i := range data { - r.parser.GetInputChan() <- data[i] - } - r.inputFile.Close() +func (r *revidInst) readFile() error { + if len(r.parser.GetInputChan()) == 0 { + var err error + r.inputFile, err = os.Open(r.config.InputFileName) + if err != nil { + r.Log(Error, err.Error()) + r.Stop() + return err } + stats, err := r.inputFile.Stat() + if err != nil { + r.Log(Error, "Could not get input file stats!") + r.Stop() + return err + } + data := make([]byte, stats.Size()) + _, err = r.inputFile.Read(data) + if err != nil { + r.Log(Error, err.Error()) + r.Stop() + return err + } + for i := range data { + r.parser.GetInputChan() <- data[i] + } + r.inputFile.Close() } + return nil } diff --git a/revid/revid_test.go b/revid/revid_test.go index 756a580a..91daa4c0 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -29,8 +29,8 @@ LICENSE package revid import ( - "testing" - "time" + "testing" + "time" ) /* @@ -123,22 +123,22 @@ func TestRtmpOutput(t *testing.T){ */ // Test h264 inputfile to flv output files -func TestFlvOutputFile(t *testing.T){ - config := Config{ - Input: File, - InputFileName: "testInput.h264", - InputCodec: H264, - Output: File, - OutputFileName: "testOutput.flv", - Packetization: Flv, - FrameRate: "25", - } - revidInst, err := NewRevidInstance(config) - if err != nil { - t.Errorf("Should not of have got an error!: %v\n", err.Error()) - return - } - revidInst.Start() - time.Sleep(5*time.Second) - revidInst.Stop() +func TestFlvOutputFile(t *testing.T) { + config := Config{ + Input: File, + InputFileName: "testInput.h264", + InputCodec: H264, + Output: File, + OutputFileName: "testOutput.flv", + Packetization: Flv, + FrameRate: "25", + } + revidInst, err := NewRevidInstance(config) + if err != nil { + t.Errorf("Should not of have got an error!: %v\n", err.Error()) + return + } + revidInst.Start() + time.Sleep(5 * time.Second) + revidInst.Stop() } diff --git a/ringbuffer/RingBuffer.go b/ringbuffer/RingBuffer.go index 21f1b07a..72f0218d 100644 --- a/ringbuffer/RingBuffer.go +++ b/ringbuffer/RingBuffer.go @@ -45,7 +45,7 @@ type RingBuffer interface { DoneReading() error IsReadable() bool IsWritable() bool - GetNoOfElements() int + GetNoOfElements() int } func (rb *ringBuffer)GetNoOfElements() int { @@ -100,7 +100,7 @@ func (rb *ringBuffer) Get() ([]byte, error) { if !rb.IsWritable() { return nil, errors.New("Buffer full!") } - var nextlast int + var nextlast int if !rb.currentlyWriting { rb.currentlyWriting = true nextlast = rb.last + 1