From 40079c634686811790e3ff44139826d13d2f74cc Mon Sep 17 00:00:00 2001 From: Unknown Date: Wed, 24 Jan 2018 14:42:22 +1030 Subject: [PATCH] Fixing cosmetic stuff --- h264/h264Parser.go | 7 +- revid/RevidInstance.go | 165 ++++++++++++++++++------------------- tsgenerator/TsGenerator.go | 10 ++- 3 files changed, 92 insertions(+), 90 deletions(-) diff --git a/h264/h264Parser.go b/h264/h264Parser.go index d79f3b21..09d9bf53 100644 --- a/h264/h264Parser.go +++ b/h264/h264Parser.go @@ -56,10 +56,15 @@ func (p* H264Parser)Stop(){ p.isParsing = false } -func (p* H264Parser)Parse() { +func (p *H264Parser)Start(){ + go p.parse() +} + +func (p *H264Parser)parse() { p.isParsing = true outputBuffer := make([]byte, 0, 10000) searchingForEnd := false + p.InputByteChan = make(chan byte, 10000) for p.isParsing { aByte := <-p.InputByteChan outputBuffer = append(outputBuffer, aByte) diff --git a/revid/RevidInstance.go b/revid/RevidInstance.go index e5edd3d2..9d2e28c6 100644 --- a/revid/RevidInstance.go +++ b/revid/RevidInstance.go @@ -51,33 +51,29 @@ import ( // defaults and networking consts const ( - clipDuration = 1 // s - defaultPID = 256 - defaultFrameRate = 25 - mp2tPacketSize = 188 // MPEG-TS packet size - mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000 - udpPackets = 7 // # of UDP packets per ethernet frame (8 is the max) - rtpPackets = 7 // # of RTP packets per ethernet frame (7 is the max) - rtpHeaderSize = 12 - rtpSSRC = 1 // any value will do - bufferSize = 100 / clipDuration - httpTimeOut = 5 // s - motionThreshold = "0.0025" - qscale = "3" - defaultRaspividCmd = "raspivid -o -" - framesPerSec = 25 - packetsPerFrame = 7 - h264BufferSize = 500000 - bitrateTime = 60 + clipDuration = 1 // s + mp2tPacketSize = 188 // MPEG-TS packet size + mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000 + udpPackets = 7 // # of UDP packets per ethernet frame (8 is the max) + rtpPackets = 7 // # of RTP packets per ethernet frame (7 is the max) + rtpHeaderSize = 12 + rtpSSRC = 1 // any value will do + bufferSize = 100 / clipDuration + httpTimeOut = 5 // s + packetsPerFrame = 7 + h264BufferSize = 500000 + bitrateTime = 60 ) +// Log Types const ( - Error = "Error" + Error = "Error" Warning = "Warning" - Info = "Info" - Debug = "Debug" + Info = "Info" + Debug = "Debug" ) +// Config enums const ( Raspivid = 0 Rtp = 1 @@ -86,9 +82,6 @@ const ( HttpOut = 5 ) -var cmd *exec.Cmd -var inputReader *bufio.Reader - type Config struct { Input uint8 InputCmd string @@ -98,10 +91,10 @@ type Config struct { Height string Width string Bitrate string - FrameRate string - HttpAddress string - Quantization string - Logger smartLogger.LogInstance + FrameRate string + HttpAddress string + Quantization string + Logger smartLogger.LogInstance } type RevidInst interface { @@ -125,10 +118,8 @@ type revidInst struct { inputFile *os.File generator tsgenerator.TsGenerator h264Parser h264.H264Parser -} - -func (r *revidInst) GetConfigRef() *Config{ - return &r.config + cmd *exec.Cmd + inputReader *bufio.Reader } func NewRevidInstance(config Config) (r *revidInst, err error) { @@ -150,50 +141,51 @@ func NewRevidInstance(config Config) (r *revidInst, err error) { return nil, err } } - r.generator = tsgenerator.NewTsGenerator(framesPerSec) + r.generator = tsgenerator.NewTsGenerator(r.config.FrameRate) + r.generator.Start() r.h264Parser = h264.H264Parser{OutputChan: r.generator.GetNalInputChan()} - r.h264Parser.InputByteChan = make(chan byte, 10000) - // TODO: Need to create constructor for parser otherwise I'm going to break - // something eventuallyl - go r.h264Parser.Parse() + r.h264Parser.Start() go r.input() - go r.generator.Generate() r.Log(Info, "New revid instance created! config is:") - r.Log(Info, fmt.Sprintf("%v",r.config)) + r.Log(Info, fmt.Sprintf("%v", r.config)) return } +func (r *revidInst) GetConfigRef() *Config { + return &r.config +} + func (r *revidInst) ChangeState(newconfig Config) error { - // TODO: check that the config is G + // TODO: check that the config is legit r.config = newconfig return nil } -func (r *revidInst) Log(logType, m string){ - r.config.Logger.Log(logType,m) +func (r *revidInst) Log(logType, m string) { + r.config.Logger.Log(logType, m) } -func (r *revidInst)IsRunning() bool { +func (r *revidInst) IsRunning() bool { return r.isRunning } func (r *revidInst) Start() { if r.isRunning { - r.Log(Warning,"revidInst.Start() called but revid already running!") + r.Log(Warning, "revidInst.Start() called but revid already running!") return } - r.Log(Info,"Starting Revid!") + r.Log(Info, "Starting Revid!") var h264Data []byte switch r.config.Input { case Raspivid: - r.Log(Info,"Starting raspivid!") + r.Log(Info, "Starting raspivid!") cmd = exec.Command("raspivid", "-o", "-", "-n", "-t", "0", "-b", - r.config.Bitrate,"-qp", r.config.Quantization, "-w", r.config.Width, "-h", r.config.Height, "-fps", r.config.FrameRate, "-ih", "-g", "100") + r.config.Bitrate, "-qp", r.config.Quantization, "-w", r.config.Width, "-h", r.config.Height, "-fps", r.config.FrameRate, "-ih", "-g", "100") stdout, _ := cmd.StdoutPipe() - err := cmd.Start() - inputReader = bufio.NewReader(stdout) + err := r.cmd.Start() + r.inputReader = bufio.NewReader(stdout) if err != nil { - r.Log(Error,err.Error()) + r.Log(Error, err.Error()) return } r.isRunning = true @@ -201,16 +193,14 @@ func (r *revidInst) Start() { r.Log(Info, "Reading camera data!") for r.isRunning { h264Data = make([]byte, 1) - _, err := io.ReadFull(inputReader, h264Data) - if err != nil { - switch{ - case err.Error() == "EOF" && r.isRunning: - r.Log(Error, "No data from camera!") - time.Sleep(5*time.Second) - case r.isRunning: - r.Log(Error, err.Error()) - } - } else { + _, err := io.ReadFull(r.inputReader, h264Data) + switch { + case err != nil && err.Error() == "EOF" && r.isRunning: + r.Log(Error, "No data from camera!") + time.Sleep(5 * time.Second) + case err != nil && r.isRunning: + r.Log(Error, err.Error()) + default: r.h264Parser.InputByteChan <- h264Data[0] } } @@ -220,12 +210,15 @@ func (r *revidInst) Start() { stats, err := r.inputFile.Stat() if err != nil { r.Log(Error, "Could not get input file stats!") + r.Stop() return } h264Data = make([]byte, stats.Size()) _, err = r.inputFile.Read(h264Data) if err != nil { r.Log(Error, err.Error()) + r.Stop() + return } for i := range h264Data { r.h264Parser.InputByteChan <- h264Data[i] @@ -236,11 +229,11 @@ func (r *revidInst) Start() { func (r *revidInst) Stop() { if r.isRunning { - r.Log(Info,"Stopping revid!") + r.Log(Info, "Stopping revid!") r.isRunning = false - cmd.Process.Kill() + r.cmd.Process.Kill() } else { - r.Log(Warning,"revidInst.Stop() called but revid not running!") + r.Log(Warning, "revidInst.Stop() called but revid not running!") } } @@ -251,22 +244,21 @@ func (r *revidInst) input() { prevTime := now for { if clip, err := r.ringBuffer.Get(); err != nil { - r.Log(Error,err.Error()) - r.Log(Warning,"Clearing tsPkt chan!") - tsPktChanLen := len(r.generator.GetTsOutputChan()) - for i := 0; i < tsPktChanLen; i++ { + r.Log(Error, err.Error()) + r.Log(Warning, "Clearing TS chan!") + for len(r.generator.GetTsOutputChan()) > 0 { <-(r.generator.GetTsOutputChan()) } - time.Sleep(1*time.Second) + time.Sleep(1 * time.Second) } else { for { tsPacket := <-(r.generator.GetTsOutputChan()) - byteSlice, err := tsPacket.ToByteSlice() + tsByteSlice, err := tsPacket.ToByteSlice() if err != nil { - r.Log(Error,err.Error()) + r.Log(Error, err.Error()) } upperBound := clipSize + mp2tPacketSize - copy(clip[clipSize:upperBound], byteSlice) + copy(clip[clipSize:upperBound], tsByteSlice) packetCount++ clipSize += mp2tPacketSize // send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame @@ -274,8 +266,8 @@ func (r *revidInst) input() { if (packetCount == mp2tMaxPackets) || (now.Sub(prevTime) > clipDuration*time.Second && packetCount%packetsPerFrame == 0) { if err := r.ringBuffer.DoneWriting(clipSize); err != nil { - r.Log(Error,err.Error()) - r.Log(Warning,"Dropping clip!") + r.Log(Error, err.Error()) + r.Log(Warning, "Dropping clip!") } clipSize = 0 packetCount = 0 @@ -293,36 +285,37 @@ func (r *revidInst) output() { bytes := 0 delay := 0 for r.isRunning { - switch{ + switch { case r.ringBuffer.GetNoOfElements() < 2: delay++ - time.Sleep(time.Duration(delay)*time.Millisecond) + time.Sleep(time.Duration(delay) * time.Millisecond) case delay > 10: delay -= 10 } if clip, err := r.ringBuffer.Read(); err == nil { - r.Log(Debug,fmt.Sprintf("Delay is: %v\n", delay)) + r.Log(Debug, fmt.Sprintf("Delay is: %v\n", delay)) + r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements())) switch r.config.Output { case File: r.outputFile.Write(clip) case HttpOut: bytes += len(clip) for err := r.sendClipToHTTP(clip, r.config.HttpAddress); err != nil; { - r.Log(Error,err.Error()) - r.Log(Warning,"Post failed trying again!") + r.Log(Error, err.Error()) + r.Log(Warning, "Post failed trying again!") err = r.sendClipToHTTP(clip, r.config.HttpAddress) } default: - r.Log(Error,"No output defined!") + r.Log(Error, "No output defined!") } if err := r.ringBuffer.DoneReading(); err != nil { - r.Log(Error,err.Error()) + r.Log(Error, err.Error()) } now = time.Now() deltaTime := now.Sub(prevTime) if deltaTime > time.Duration(bitrateTime)*time.Second { - r.Log(Info,fmt.Sprintf("Bitrate: %v bits/s\n", int64(float64(bytes*8) / float64(deltaTime/1e9)))) - r.Log(Info,fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements())) + r.Log(Info, fmt.Sprintf("Bitrate: %v bits/s\n", int64(float64(bytes*8)/float64(deltaTime/1e9)))) + r.Log(Info, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements())) prevTime = now bytes = 0 } @@ -331,13 +324,13 @@ func (r *revidInst) output() { } // sendClipToHTPP posts a video clip via HTTP, using a new TCP connection each time. -func (r *revidInst)sendClipToHTTP(clip []byte, output string) error { +func (r *revidInst) sendClipToHTTP(clip []byte, output string) error { timeout := time.Duration(httpTimeOut * time.Second) client := http.Client{ Timeout: timeout, } url := output + strconv.Itoa(len(clip)) - r.Log(Debug,fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip))) + r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip))) resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer if err != nil { return fmt.Errorf("Error posting to %s: %s", output, err) @@ -345,9 +338,9 @@ func (r *revidInst)sendClipToHTTP(clip []byte, output string) error { defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err == nil { - r.Log(Debug,fmt.Sprintf("%s\n", body)) + r.Log(Debug, fmt.Sprintf("%s\n", body)) } else { - r.Log(Error,err.Error()) + r.Log(Error, err.Error()) } return nil } diff --git a/tsgenerator/TsGenerator.go b/tsgenerator/TsGenerator.go index 6766303a..bdac8328 100644 --- a/tsgenerator/TsGenerator.go +++ b/tsgenerator/TsGenerator.go @@ -51,7 +51,7 @@ const ( ) type TsGenerator interface { - Generate() + generate() GetNalInputChan() chan<- []byte GetTsOutputChan() <-chan *mpegts.MpegTsPacket } @@ -118,7 +118,11 @@ func (g *tsGenerator) genPcr()(pcr uint64){ return } -func (g *tsGenerator) Generate() { +func (g *tsGenerator) Start(){ + go g.generate() +} + +func (g *tsGenerator) generate() { var rtpBuffer [](*rtp.RtpPacket) for { select { @@ -221,7 +225,7 @@ func (g *tsGenerator) Generate() { PCRF: pusi, } pkt.FillPayload(g.payloadByteChan) - + if pusi { // Create pat table and send off patPkt := mpegts.MpegTsPacket{