diff --git a/parser/Parser.go b/parser/Parser.go index 91b0f0ac..636197c8 100644 --- a/parser/Parser.go +++ b/parser/Parser.go @@ -49,15 +49,16 @@ var ( type Parser interface { Stop() Start() - GetInputChan() - GetOutputChan() + GetInputChan() chan byte + GetOutputChan() chan []byte + SetOutputChan(achan chan []byte) } type h264Parser struct { inputBuffer []byte isParsing bool - parserOutputChanRef chan<- []byte - userOutputChanRef <-chan []byte + parserOutputChanRef chan []byte + userOutputChanRef chan []byte inputChan chan byte } @@ -80,7 +81,7 @@ func (p *h264Parser)GetInputChan() chan byte { return p.inputChan } -func (p *h264Parser)GetOutputChan() <-chan []byte { +func (p *h264Parser)GetOutputChan() chan []byte { return p.userOutputChanRef } @@ -119,8 +120,8 @@ func (p *h264Parser)parse() { type mjpegParser struct { inputBuffer []byte isParsing bool - parserOutputChanRef chan<- []byte - userOutputChanRef <-chan []byte + parserOutputChanRef chan []byte + userOutputChanRef chan []byte inputChan chan byte } @@ -143,7 +144,7 @@ func (p *mjpegParser)GetInputChan() chan byte { return p.inputChan } -func (p *mjpegParser)GetOutputChan() <-chan []byte { +func (p *mjpegParser)GetOutputChan() chan []byte { return p.userOutputChanRef } diff --git a/pes/Pes.go b/pes/Pes.go index 0c526960..c558d015 100644 --- a/pes/Pes.go +++ b/pes/Pes.go @@ -27,11 +27,12 @@ LICENSE package pes import ( - "bitbucket.org/ausocean/av/tools" + //"bitbucket.org/ausocean/av/tools" + "../tools" ) const ( - maxPesSize = 10000 + maxPesSize = 10000 ) /* diff --git a/revid/RevidInstance.go b/revid/RevidInstance.go index e3767d7f..55a5c570 100644 --- a/revid/RevidInstance.go +++ b/revid/RevidInstance.go @@ -41,12 +41,17 @@ import ( "os/exec" "strconv" "time" + "errors" - "bitbucket.org/ausocean/av/h264" - "bitbucket.org/ausocean/av/tsgenerator" + //"bitbucket.org/ausocean/av/h264" + //"bitbucket.org/ausocean/av/tsgenerator" + "../parser" + "../tsgenerator" - "bitbucket.org/ausocean/av/ringbuffer" - "bitbucket.org/ausocean/utils/smartLogger" + //"bitbucket.org/ausocean/av/ringbuffer" + //"bitbucket.org/ausocean/utils/smartLogger" + "../ringbuffer" + "../../utils/smartLogger" ) // defaults and networking consts @@ -63,7 +68,7 @@ const ( packetsPerFrame = 7 h264BufferSize = 500000 bitrateTime = 60 - defualtFrameRate = 25 + mjpegParserInChanLen= 10000 ) // Log Types @@ -76,28 +81,42 @@ const ( // Config enums const ( - Raspivid = 0 - Rtp = 1 - H264Codec = 2 - File = 4 - HttpOut = 5 + raspivid = 1 + rtp = 2 + h264Codec = 3 + file = 4 + httpOut = 5 + h264 = 6 + mjpeg = 7 + none = 8 + mpegts = 9 ) type Config struct { Input uint8 - InputCmd string + InputCodec uint8 Output uint8 OutputFileName string InputFileName string Height string Width string - Bitrate string FrameRate string HttpAddress string Quantization string + Timeout string + Packetization uint8 + IntraRefreshPeriod string Logger smartLogger.LogInstance } +// Default config settings +const ( + defaultFrameRate = "25" + defaultWidth = "1280" + defaultHeight = "720" + defaultIntraRefreshPeriod = "100" +) + type RevidInst interface { Start() Stop() @@ -118,9 +137,10 @@ type revidInst struct { outputFile *os.File inputFile *os.File generator tsgenerator.TsGenerator - h264Parser h264.H264Parser + parser parser.Parser cmd *exec.Cmd inputReader *bufio.Reader + mjpegOutputChan chan []byte } func NewRevidInstance(config Config) (r *revidInst, err error) { @@ -129,25 +149,36 @@ func NewRevidInstance(config Config) (r *revidInst, err error) { r.dumpPCRBase = 0 r.ChangeState(config) switch r.config.Output { - case File: + case file: r.outputFile, err = os.Create(r.config.OutputFileName) if err != nil { return nil, err } } switch r.config.Input { - case File: + case file: r.inputFile, err = os.Open(r.config.InputFileName) if err != nil { return nil, err } } - r.generator = tsgenerator.NewTsGenerator(defaultFrameRate) - r.parser = parser.NewH264Parser() - r.parser.SetOutputChan(r.generator.GetNalInputChan()) - r.generator.Start() + switch r.config.InputCodec { + case h264: + r.parser = parser.NewH264Parser() + case mjpeg: + r.parser = parser.NewMJPEGParser(mjpegParserInChanLen) + } + switch r.config.Packetization { + case none: + r.parser.SetOutputChan(r.mjpegOutputChan) + case mpegts: + frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate) + r.generator = tsgenerator.NewTsGenerator(uint(frameRateAsInt)) + r.parser.SetOutputChan(r.generator.GetNalInputChan()) + r.generator.Start() + } r.parser.Start() - go r.input() + go r.packClips() r.Log(Info, "New revid instance created! config is:") r.Log(Info, fmt.Sprintf("%v", r.config)) return @@ -157,14 +188,67 @@ func (r *revidInst) GetConfigRef() *Config { return &r.config } -func (r *revidInst) ChangeState(newconfig Config) error { - // TODO: check that the config is legit - r.config = newconfig +func (r *revidInst) ChangeState(config Config) error { + switch config.Input { + case rtp: + case raspivid: + case file: + default: + return errors.New("Bad input type defined in config!") + } + switch config.InputCodec { + case h264: + case mjpeg: + default: + return errors.New("Bad input format defined in config!") + } + switch config.Output { + case httpOut: + case file: + default: + return errors.New("Bad output type defined in config!") + } + if integer, err := strconv.Atoi(config.Width); integer < 0 || err != nil { + return errors.New("Bad width defined in config!") + } + if integer, _ := strconv.Atoi(config.Width); integer == 0 { + config.Width = defaultWidth + } + if integer, err := strconv.Atoi(config.Height); integer < 0 || err != nil { + return errors.New("Bad height defined in config!") + } + if integer, _ := strconv.Atoi(config.Height); integer == 0 { + config.Height = defaultHeight + } + if integer, err := strconv.Atoi(config.FrameRate); integer < 0 || err != nil { + return errors.New("Bad FrameRate defined in config!") + } + if integer, _ := strconv.Atoi(config.FrameRate); integer == 0 { + config.FrameRate = defaultFrameRate + } + if integer, err := strconv.Atoi(config.Timeout); integer < 0 || err != nil { + return errors.New("Bad timeout define in config!") + } + if integer, err := strconv.Atoi(config.IntraRefreshPeriod); integer < 0 || err != nil { + return errors.New("Bad intra refresh period defined in config!") + } + if integer, _ := strconv.Atoi(config.IntraRefreshPeriod); integer == 0 { + config.IntraRefreshPeriod = defaultIntraRefreshPeriod + } + if integer, err := strconv.Atoi(config.Quantization); integer <= 0 || integer > 51 || err != nil { + return errors.New("Bad quantization level defined in config!") + } + r.config = config return nil } -func (r *revidInst) Log(logType, m string) { - r.config.Logger.Log(logType, m) +func (r *revidInst) Log(logType, m string) error { + if r.config.Logger != nil { + r.config.Logger.Log(logType, m) + return nil + } else { + return errors.New("No logger was defined in config for this instance!") + } } func (r *revidInst) IsRunning() bool { @@ -177,12 +261,29 @@ func (r *revidInst) Start() { return } r.Log(Info, "Starting Revid!") - var data []byte switch r.config.Input { - case Raspivid: + case raspivid: r.Log(Info, "Starting raspivid!") - r.cmd = exec.Command("raspivid", "-o", "-", "-n", "-t", "0", "-b", - r.config.Bitrate, "-qp", r.config.Quantization, "-w", r.config.Width, "-h", r.config.Height, "-fps", r.config.FrameRate, "-ih", "-g", "100") + var codec string + switch r.config.InputCodec{ + case h264: + codec = "H264" + case mjpeg: + codec = "MJPEG" + } + r.cmd = exec.Command("raspivid", + "-cd", codec, + "-o", "-", + "-n", + "-t", r.config.Timeout, + "-b", "0", + "-qp", r.config.Quantization, + "-w", r.config.Width, + "-h", r.config.Height, + "-fps", r.config.FrameRate, + "-ih", + "-g", r.config.IntraRefreshPeriod, + ) stdout, _ := r.cmd.StdoutPipe() err := r.cmd.Start() r.inputReader = bufio.NewReader(stdout) @@ -191,42 +292,44 @@ func (r *revidInst) Start() { return } r.isRunning = true - go func() { - r.Log(Info, "Reading camera data!") - for r.isRunning { - data = make([]byte, 1) - _, err := io.ReadFull(r.inputReader, data) - switch { - case err != nil && err.Error() == "EOF" && r.isRunning: - r.Log(Error, "No data from camera!") - time.Sleep(5 * time.Second) - case err != nil && r.isRunning: - r.Log(Error, err.Error()) - default: - r.parser.GetInput() <-h264Data[0] - } - } - r.Log(Info, "Out of reading routine!") - }() - case File: + case file: stats, err := r.inputFile.Stat() if err != nil { r.Log(Error, "Could not get input file stats!") r.Stop() return } - h264Data = make([]byte, stats.Size()) - _, err = r.inputFile.Read(h264Data) + data := make([]byte, stats.Size()) + _, err = r.inputFile.Read(data) if err != nil { r.Log(Error, err.Error()) r.Stop() return } - for i := range h264Data { - r.h264Parser.InputByteChan <- h264Data[i] + for i := range data { + r.parser.GetInputChan() <- data[i] } } - go r.output() + go r.readCamera() + go r.outputClips() +} + +func (r *revidInst)readCamera() { + r.Log(Info, "Reading camera data!") + for r.isRunning { + data := make([]byte, 1) + _, err := io.ReadFull(r.inputReader, data) + switch { + case err != nil && err.Error() == "EOF" && r.isRunning: + r.Log(Error, "No data from camera!") + time.Sleep(5 * time.Second) + case err != nil && r.isRunning: + r.Log(Error, err.Error()) + default: + r.parser.GetInputChan()<-data[0] + } + } + r.Log(Info, "Out of reading routine!") } func (r *revidInst) Stop() { @@ -239,7 +342,7 @@ func (r *revidInst) Stop() { } } -func (r *revidInst) input() { +func (r *revidInst) packClips() { clipSize := 0 packetCount := 0 now := time.Now() @@ -247,26 +350,42 @@ func (r *revidInst) input() { for { if clip, err := r.ringBuffer.Get(); err != nil { r.Log(Error, err.Error()) - r.Log(Warning, "Clearing TS chan!") - for len(r.generator.GetTsOutputChan()) > 0 { - <-(r.generator.GetTsOutputChan()) + switch r.config.Packetization { + case none: + r.Log(Warning, "Clearing mjpeg chan!") + for len(r.mjpegOutputChan) > 0 { + <-(r.mjpegOutputChan) + } + case mpegts: + r.Log(Warning, "Clearing TS chan!") + for len(r.generator.GetTsOutputChan()) > 0 { + <-(r.generator.GetTsOutputChan()) + } } time.Sleep(1 * time.Second) } else { for { - tsPacket := <-(r.generator.GetTsOutputChan()) - tsByteSlice, err := tsPacket.ToByteSlice() - if err != nil { - r.Log(Error, err.Error()) + switch r.config.Packetization { + case none: + frame := <-r.mjpegOutputChan + upperBound := clipSize + len(frame) + copy(clip[clipSize:upperBound], frame) + packetCount ++ + clipSize += len(frame) + case mpegts: + tsPacket := <-(r.generator.GetTsOutputChan()) + tsByteSlice, err := tsPacket.ToByteSlice() + if err != nil { + r.Log(Error, err.Error()) + } + upperBound := clipSize + mp2tPacketSize + copy(clip[clipSize:upperBound], tsByteSlice) + packetCount++ + clipSize += mp2tPacketSize } - upperBound := clipSize + mp2tPacketSize - copy(clip[clipSize:upperBound], tsByteSlice) - packetCount++ - clipSize += mp2tPacketSize // send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame now = time.Now() - if (packetCount == mp2tMaxPackets) || - (now.Sub(prevTime) > clipDuration*time.Second && packetCount%packetsPerFrame == 0) { + if now.Sub(prevTime) > clipDuration*time.Second && len(clip) > 0 { if err := r.ringBuffer.DoneWriting(clipSize); err != nil { r.Log(Error, err.Error()) r.Log(Warning, "Dropping clip!") @@ -281,7 +400,7 @@ func (r *revidInst) input() { } } -func (r *revidInst) output() { +func (r *revidInst) outputClips() { now := time.Now() prevTime := now bytes := 0 @@ -298,9 +417,9 @@ func (r *revidInst) output() { r.Log(Debug, fmt.Sprintf("Delay is: %v\n", delay)) r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements())) switch r.config.Output { - case File: + case file: r.outputFile.Write(clip) - case HttpOut: + case httpOut: bytes += len(clip) for err := r.sendClipToHTTP(clip, r.config.HttpAddress); err != nil; { r.Log(Error, err.Error()) diff --git a/tsgenerator/TsGenerator.go b/tsgenerator/TsGenerator.go index 43af5f8d..381d110e 100644 --- a/tsgenerator/TsGenerator.go +++ b/tsgenerator/TsGenerator.go @@ -31,10 +31,14 @@ package tsgenerator import ( _"fmt" _"os" - "bitbucket.org/ausocean/av/mpegts" - "bitbucket.org/ausocean/av/pes" - "bitbucket.org/ausocean/av/tools" - "bitbucket.org/ausocean/av/rtp" + //"bitbucket.org/ausocean/av/mpegts" + //"bitbucket.org/ausocean/av/pes" + //"bitbucket.org/ausocean/av/tools" + //"bitbucket.org/ausocean/av/rtp" + "../mpegts" + "../pes" + "../tools" + "../rtp" ) var ( @@ -52,7 +56,7 @@ const ( type TsGenerator interface { generate() - GetNalInputChan() chan<- []byte + GetNalInputChan() chan []byte GetTsOutputChan() <-chan *mpegts.MpegTsPacket Start() genPts()(pts uint64) @@ -64,8 +68,7 @@ type tsGenerator struct { tsChan chan<- *mpegts.MpegTsPacket InputChan chan<- rtp.RtpPacket inputChan <-chan rtp.RtpPacket - NalInputChan chan<- []byte - nalInputChan <-chan []byte + nalInputChan chan []byte currentTsPacket *mpegts.MpegTsPacket payloadByteChan chan byte currentCC byte @@ -76,8 +79,8 @@ type tsGenerator struct { ccMap map[int]int } -func (g *tsGenerator)GetNalInputChan() chan<- []byte { - return g.NalInputChan +func (g *tsGenerator)GetNalInputChan() chan []byte { + return g.nalInputChan } func (g *tsGenerator)GetTsOutputChan() <-chan *mpegts.MpegTsPacket { @@ -92,9 +95,7 @@ func NewTsGenerator(fps uint) (g *tsGenerator) { inputChan := make(chan rtp.RtpPacket, 100) g.InputChan = inputChan g.inputChan = inputChan - nalInputChan := make(chan []byte, 10000) - g.NalInputChan = nalInputChan - g.nalInputChan = nalInputChan + g.nalInputChan = make(chan []byte, 10000) g.currentCC = 0 g.fps = fps g.currentPcrTime = .0 @@ -179,7 +180,7 @@ func (g *tsGenerator) generate() { buffer = append(buffer, rtpBuffer[0].Payload[2:]...) if tools.GetEndBit(rtpBuffer[0]) == 1 { rtpBuffer = rtpBuffer[1:] - g.NalInputChan <- buffer + g.nalInputChan <- buffer break } rtpBuffer = rtpBuffer[1:] @@ -199,7 +200,7 @@ func (g *tsGenerator) generate() { buffer = append(buffer, rtpBuffer[0].Payload[0]&0xE0|rtpBuffer[0].Payload[1]&0x1F) buffer = append(buffer, rtpBuffer[0].Payload[2:]...) rtpBuffer = rtpBuffer[1:] - g.NalInputChan <- buffer + g.nalInputChan <- buffer default: } }