diff --git a/generator/FLVGenerator.go b/generator/FLVGenerator.go new file mode 100644 index 00000000..8ef79225 --- /dev/null +++ b/generator/FLVGenerator.go @@ -0,0 +1,3 @@ +type FLVGenerator struct { + +} diff --git a/generator/Generator.go b/generator/Generator.go new file mode 100644 index 00000000..aa965de0 --- /dev/null +++ b/generator/Generator.go @@ -0,0 +1,49 @@ +/* +NAME + RtpToTsConverter.go - provides utilities for the conversion of Rtp packets + to equivalent MpegTs packets. + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton + +LICENSE + RtpToTsConverter.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package generator + +import ( + _"fmt" + _"os" + //"bitbucket.org/ausocean/av/mpegts" + //"bitbucket.org/ausocean/av/pes" + //"bitbucket.org/ausocean/av/tools" + //"bitbucket.org/ausocean/av/rtp" + "../mpegts" + "../pes" + "../tools" + "../rtp" +) + +type Generator interface { + GetInputChan() chan []byte + GetOutputChan() <-chan *mpegts.MpegTsPacket + Start() + Stop() +} diff --git a/tsgenerator/TsGenerator.go b/generator/MPEGTSGenerator.go similarity index 85% rename from tsgenerator/TsGenerator.go rename to generator/MPEGTSGenerator.go index 381d110e..aa483a7a 100644 --- a/tsgenerator/TsGenerator.go +++ b/generator/MPEGTSGenerator.go @@ -1,45 +1,4 @@ -/* -NAME - RtpToTsConverter.go - provides utilities for the conversion of Rtp packets - to equivalent MpegTs packets. - -DESCRIPTION - See Readme.md - -AUTHOR - Saxon Nelson-Milton - -LICENSE - RtpToTsConverter.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). -*/ - -package tsgenerator - -import ( - _"fmt" - _"os" - //"bitbucket.org/ausocean/av/mpegts" - //"bitbucket.org/ausocean/av/pes" - //"bitbucket.org/ausocean/av/tools" - //"bitbucket.org/ausocean/av/rtp" - "../mpegts" - "../pes" - "../tools" - "../rtp" -) +package generator var ( PatTable = []byte{0, 0, 176, 13, 0, 1, 193, 0, 0, 0, 1, 240, 0, 42, 177, 4, 178, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,} @@ -54,15 +13,6 @@ const ( VideoPid = 256 ) -type TsGenerator interface { - generate() - GetNalInputChan() chan []byte - GetTsOutputChan() <-chan *mpegts.MpegTsPacket - Start() - genPts()(pts uint64) - genPcr()(pts uint64) -} - type tsGenerator struct { TsChan <-chan *mpegts.MpegTsPacket tsChan chan<- *mpegts.MpegTsPacket diff --git a/tsgenerator/tsgenerator_test.go b/generator/tsgenerator_test.go similarity index 100% rename from tsgenerator/tsgenerator_test.go rename to generator/tsgenerator_test.go diff --git a/parser/H264Parser.go b/parser/H264Parser.go new file mode 100644 index 00000000..d95069e3 --- /dev/null +++ b/parser/H264Parser.go @@ -0,0 +1,63 @@ +package parser + +type h264Parser struct { + inputBuffer []byte + isParsing bool + parserOutputChanRef chan []byte + userOutputChanRef chan []byte + inputChan chan byte +} + +func NewH264Parser() (p *h264Parser) { + p = new(h264Parser) + p.isParsing = true + p.inputChan = make(chan byte, 10000) + return +} + +func (p* h264Parser)Stop(){ + p.isParsing = false +} + +func (p *h264Parser)Start(){ + go p.parse() +} + +func (p *h264Parser)GetInputChan() chan byte { + return p.inputChan +} + +func (p *h264Parser)GetOutputChan() chan []byte { + return p.userOutputChanRef +} + +func (p *h264Parser)SetOutputChan(aChan chan []byte){ + p.parserOutputChanRef = aChan + p.userOutputChanRef = aChan +} + +func (p *h264Parser)parse() { + outputBuffer := make([]byte, 0, 10000) + searchingForEnd := false + for p.isParsing { + aByte := <-p.inputChan + outputBuffer = append(outputBuffer, aByte) + for i:=1; aByte == 0x00 && i != 4; i++ { + aByte = <-p.inputChan + outputBuffer = append(outputBuffer, aByte) + if ( aByte == 0x01 && i == 2 ) || ( aByte == 0x01 && i == 3 ) { + if searchingForEnd { + output := append(append(itut.StartCode1(),itut.AUD()...),outputBuffer[:len(outputBuffer)-(i+1)]...) + p.parserOutputChanRef<-output + outputBuffer = outputBuffer[len(outputBuffer)-1-i:] + searchingForEnd = false + } + aByte = <-p.inputChan + outputBuffer = append(outputBuffer, aByte) + if nalType := aByte & 0x1F; nalType == 1 || nalType == 5 { + searchingForEnd = true + } + } + } + } +} diff --git a/parser/MJPEGParser.go b/parser/MJPEGParser.go new file mode 100644 index 00000000..e4500163 --- /dev/null +++ b/parser/MJPEGParser.go @@ -0,0 +1,53 @@ +package parser + +type mjpegParser struct { + inputBuffer []byte + isParsing bool + parserOutputChanRef chan []byte + userOutputChanRef chan []byte + inputChan chan byte +} + +func NewMJPEGParser(inputChanLen int) (p *mjpegParser){ + p = new(mjpegParser) + p.isParsing = true + p.inputChan = make(chan byte, inputChanLen ) + return +} + +func (p *mjpegParser)Stop(){ + p.isParsing = false +} + +func (p *mjpegParser)Start(){ + go p.parse() +} + +func (p *mjpegParser)GetInputChan() chan byte { + return p.inputChan +} + +func (p *mjpegParser)GetOutputChan() chan []byte { + return p.userOutputChanRef +} + +func (p *mjpegParser)SetOutputChan(aChan chan []byte){ + p.parserOutputChanRef = aChan + p.userOutputChanRef = aChan +} + +func (p *mjpegParser)parse() { + var outputBuffer []byte + for p.isParsing { + aByte := <-p.inputChan + outputBuffer = append(outputBuffer, aByte) + if aByte == 0xFF && len(outputBuffer) != 0 { + aByte := <-p.inputChan + outputBuffer = append(outputBuffer, aByte) + if aByte == 0xD8 { + p.parserOutputChanRef<-outputBuffer[:len(outputBuffer)-2] + outputBuffer = outputBuffer[len(outputBuffer)-2:] + } + } + } +} diff --git a/parser/Parser.go b/parser/Parser.go index 6eb5ae3a..003934fc 100644 --- a/parser/Parser.go +++ b/parser/Parser.go @@ -52,118 +52,3 @@ type Parser interface { GetOutputChan() chan []byte SetOutputChan(achan chan []byte) } - -type h264Parser struct { - inputBuffer []byte - isParsing bool - parserOutputChanRef chan []byte - userOutputChanRef chan []byte - inputChan chan byte -} - -func NewH264Parser() (p *h264Parser) { - p = new(h264Parser) - p.isParsing = true - p.inputChan = make(chan byte, 10000) - return -} - -func (p* h264Parser)Stop(){ - p.isParsing = false -} - -func (p *h264Parser)Start(){ - go p.parse() -} - -func (p *h264Parser)GetInputChan() chan byte { - return p.inputChan -} - -func (p *h264Parser)GetOutputChan() chan []byte { - return p.userOutputChanRef -} - -func (p *h264Parser)SetOutputChan(aChan chan []byte){ - p.parserOutputChanRef = aChan - p.userOutputChanRef = aChan -} - -func (p *h264Parser)parse() { - outputBuffer := make([]byte, 0, 10000) - searchingForEnd := false - for p.isParsing { - aByte := <-p.inputChan - outputBuffer = append(outputBuffer, aByte) - for i:=1; aByte == 0x00 && i != 4; i++ { - aByte = <-p.inputChan - outputBuffer = append(outputBuffer, aByte) - if ( aByte == 0x01 && i == 2 ) || ( aByte == 0x01 && i == 3 ) { - if searchingForEnd { - output := append(append(itut.StartCode1(),itut.AUD()...),outputBuffer[:len(outputBuffer)-(i+1)]...) - p.parserOutputChanRef<-output - outputBuffer = outputBuffer[len(outputBuffer)-1-i:] - searchingForEnd = false - } - aByte = <-p.inputChan - outputBuffer = append(outputBuffer, aByte) - if nalType := aByte & 0x1F; nalType == 1 || nalType == 5 { - searchingForEnd = true - } - } - } - } -} - - -type mjpegParser struct { - inputBuffer []byte - isParsing bool - parserOutputChanRef chan []byte - userOutputChanRef chan []byte - inputChan chan byte -} - -func NewMJPEGParser(inputChanLen int) (p *mjpegParser){ - p = new(mjpegParser) - p.isParsing = true - p.inputChan = make(chan byte, inputChanLen ) - return -} - -func (p *mjpegParser)Stop(){ - p.isParsing = false -} - -func (p *mjpegParser)Start(){ - go p.parse() -} - -func (p *mjpegParser)GetInputChan() chan byte { - return p.inputChan -} - -func (p *mjpegParser)GetOutputChan() chan []byte { - return p.userOutputChanRef -} - -func (p *mjpegParser)SetOutputChan(aChan chan []byte){ - p.parserOutputChanRef = aChan - p.userOutputChanRef = aChan -} - -func (p *mjpegParser)parse() { - var outputBuffer []byte - for p.isParsing { - aByte := <-p.inputChan - outputBuffer = append(outputBuffer, aByte) - if aByte == 0xFF && len(outputBuffer) != 0 { - aByte := <-p.inputChan - outputBuffer = append(outputBuffer, aByte) - if aByte == 0xD8 { - p.parserOutputChanRef<-outputBuffer[:len(outputBuffer)-2] - outputBuffer = outputBuffer[len(outputBuffer)-2:] - } - } - } -} diff --git a/revid/RevidInstance.go b/revid/RevidInstance.go index bdd48b88..6d95339d 100644 --- a/revid/RevidInstance.go +++ b/revid/RevidInstance.go @@ -115,6 +115,7 @@ const ( Rtmp = 10 Ffmpeg = 11 Revid = 12 + Flv = 13 ) // Default config settings @@ -148,13 +149,16 @@ type revidInst struct { isRunning bool outputFile *os.File inputFile *os.File - generator tsgenerator.TsGenerator + generator generator.TsGenerator parser parser.Parser cmd *exec.Cmd ffmpegCmd *exec.Cmd inputReader *bufio.Reader ffmpegStdin io.WriteCloser outputChan chan []byte + configureOutput func() + getFrame func()[]byte + flushData func() } // NewRevidInstance returns a pointer to a new revidInst with the desired @@ -167,30 +171,7 @@ func NewRevidInstance(config Config) (r *revidInst, err error) { if err != nil { return nil, err } - switch r.config.Output { - case File: - r.outputFile, err = os.Create(r.config.OutputFileName) - if err != nil { - return nil, err - } - } - switch r.config.InputCodec { - case H264: - r.Log(Info, "Using H264 parser!") - r.parser = parser.NewH264Parser() - case Mjpeg: - r.parser = parser.NewMJPEGParser(mjpegParserInChanLen) - } r.outputChan = make(chan []byte, 10000) - switch r.config.Packetization { - case None: - r.parser.SetOutputChan(r.outputChan) - case Mpegts: - frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate) - r.generator = tsgenerator.NewTsGenerator(uint(frameRateAsInt)) - r.parser.SetOutputChan(r.generator.GetNalInputChan()) - r.generator.Start() - } r.parser.Start() go r.packClips() r.Log(Info, "New revid instance created! config is:") @@ -207,6 +188,243 @@ func (r *revidInst) GetConfigRef() *Config { // configuration; checking validity and returning errors if not valid. func (r *revidInst) ChangeState(config Config) error { r.config.Logger = config.Logger + err := checkConfig(config) + if err != nil { + return errors.New("Config struct is bad!: " + err.Error()) + } + switch r.config.Output { + case File: + r.outputFile, err = os.Create(r.config.OutputFileName) + if err != nil { + return nil, err + } + configureOutput = configureOutputForFile + case Rtmp: + configureOutput = configureOutputForRtmp + } + switch r.config.Input { + case Raspivid: + configureInput = configureInputForRaspivid + case File: + configureInput = configureInputForFile + } + switch r.config.InputCodec { + case H264: + r.Log(Info, "Using H264 parser!") + r.parser = parser.NewH264Parser() + case Mjpeg: + r.Log(Info, "Using MJPEG parser!") + r.parser = parser.NewMJPEGParser(mjpegParserInChanLen) + } + switch r.config.Packetization { + case None: + r.parser.SetOutputChan(r.outputChan) + getFrame = getFrameForNoPacketization + case Mpegts: + frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate) + r.generator = tsgenerator.NewTsGenerator(uint(frameRateAsInt)) + r.parser.SetOutputChan(r.generator.GetNalInputChan()) + r.generator.Start() + getFrame = getFrameForMpegtsPacketization + } + r.config = config + return nil +} + +// Log takes a logtype and message and tries to send this information to the +// logger provided in the revidInst config - if there is one, otherwise the message +// is sent to stdout +func (r *revidInst) Log(logType, m string) { + if r.config.Logger != nil { + r.config.Logger.Log(logType, m) + } else { + fmt.Println(logType + ": " + m) + } +} + +// IsRunning returns true if the revidInst is currently running and false otherwise +func (r *revidInst) IsRunning() bool { + return r.isRunning +} + +// Start invokes a revidInst to start processing video from a defined input +// and packetising to a defined output. +func (r *revidInst) Start() { + if r.isRunning { + r.Log(Warning, "revidInst.Start() called but revid already running!") + return + } + r.Log(Info, "Starting Revid!") + r.setupOutput() + r.setupInput() + go r.outputClips() + r.isRunning = true +} + +// Stop halts any processing of video data from a camera or file +func (r *revidInst) Stop() { + if r.isRunning { + r.Log(Info, "Stopping revid!") + r.isRunning = false + if r.cmd != nil { + r.cmd.Process.Kill() + } + } else { + r.Log(Warning, "revidInst.Stop() called but revid not running!") + } +} + +// Start invokes a revidInst to start processing video from a defined input +// and packetising to a defined output. +func (r *revidInst) getFrameNoPacketization() []byte { + return <-r.outputChan +} + +// Start invokes a revidInst to start processing video from a defined input +// and packetising to a defined output. +func (r *revidInst) getFrameWithPacketization() []byte { + return <-(r.generator.GetOutputChan()) +} + +// Start invokes a revidInst to start processing video from a defined input +// and packetising to a defined output. +func (r *revidInst) flushDataNoPacketisation(){ + for len(r.outputChan) > 0 { + <-(r.outputChan) + } +} + +// Start invokes a revidInst to start processing video from a defined input +func (r *revidInst) flushDataMpegtsPacketisation(){ + for len(r.generator.GetOutputChan()) > 0 { + <-(r.generator.GetTsOutputChan()) + } +} + +// packClips takes data segments; whether that be tsPackets or jpeg frames and +// packs them into clips 1s long. +func (r *revidInst) packClips() { + clipSize := 0 + packetCount := 0 + now := time.Now() + prevTime := now + for { + if clip, err := r.ringBuffer.Get(); err != nil { + r.Log(Error, err.Error()) + r.Log(Warning, "Clearing output chan!") + r.flushOutputData() + } else { + for { + frame := r.getFrame() + lenOfFrame := len(frame) + upperBound := clipSize + lenOfFrame + copy(clip[clipSize:upperBound], frame) + packetCount++ + clipSize += lenOfFrame + now = time.Now() + if packetCount >= r.config.FramesPerClip { + if err := r.ringBuffer.DoneWriting(clipSize); err != nil { + r.Log(Error, err.Error()) + r.Log(Warning, "Dropping clip!") + } + clipSize = 0 + packetCount = 0 + prevTime = now + break + } + } + } + } +} + +// outputClips takes the clips produced in the packClips method and outputs them +// to the desired output defined in the revidInst config +func (r *revidInst) outputClips() { + now := time.Now() + prevTime := now + bytes := 0 + delay := 0 + for r.isRunning { + // Here we slow things down as much as we can to decrease cpu usage + switch { + case r.ringBuffer.GetNoOfElements() < 2: + delay++ + time.Sleep(time.Duration(delay) * time.Millisecond) + case delay > 10: + delay -= 10 + } + // If the ringbuffer has something we can read and send off + if clip, err := r.ringBuffer.Read(); err == nil { + r.Log(Debug, fmt.Sprintf("Delay is: %v\n", delay)) + r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements())) + // Output clip to the output specified in the configuration struct + bytes += len(clip) + for err := r.sendClip(clip); err != nil; { + r.Log(Error, err.Error()) + r.Log(Warning, "Send failed trying again!") + err = r.sendClip(clip) + } + if err := r.ringBuffer.DoneReading(); err != nil { + r.Log(Error, err.Error()) + } + // Log some information regarding bitrate and ring buffer size if it's time + now = time.Now() + deltaTime := now.Sub(prevTime) + if deltaTime > time.Duration(bitrateTime)*time.Second { + r.Log(Info, fmt.Sprintf("Bitrate: %v bits/s\n", int64(float64(bytes*8)/float64(deltaTime/1e9)))) + r.Log(Info, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements())) + prevTime = now + bytes = 0 + } + } + } +} + +// Start invokes a revidInst to start processing video from a defined input +// and packetising to a defined output. +func (r *revidInst) sendClipToFile(clip []byte) error { + err := r.outputFile.Write(clip) + if err != nil { + return err + } + return nil +} + +// sendClipToHTTP takes a clip and an output url and posts through http. +func (r *revidInst) sendClipToHTTP(clip []byte) error { + timeout := time.Duration(httpTimeOut * time.Second) + client := http.Client{ + Timeout: timeout, + } + r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip))) + resp, err := client.Post(r.config.HttpAddress + strconv.Itoa(len(clip)), "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer + if err != nil { + return fmt.Errorf("Error posting to %s: %s", output, err) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err == nil { + r.Log(Debug, fmt.Sprintf("%s\n", body)) + } else { + r.Log(Error, err.Error()) + } + return nil +} + +// Start invokes a revidInst to start processing video from a defined input +// and packetising to a defined output. +func (r *revidInst) sendClipToRtmp(clip []byte) error { + fmt.Println("Outputting!") + _, err := r.ffmpegStdin.Write(clip) + if err != nil { + return err + } + return nil +} + +// checkConfig checks for any errors in the config files and defaults settings +// if particular parameters have not been defined. +func (r *revidInst) checkConfig(config Config) error { switch config.Input { case Rtp: case Raspivid: @@ -276,7 +494,7 @@ func (r *revidInst) ChangeState(config Config) error { } switch { - case config.FramesPerClip >= 0: + case config.FramesPerClip > 0: case config.FramesPerClip == 0: r.Log(Warning, "No frames per clip defined, defaulting to 1!") config.FramesPerClip = 1 @@ -284,11 +502,6 @@ func (r *revidInst) ChangeState(config Config) error { return errors.New("Bad frames per clip given!") } - if config.FramesPerClip == 0 { - r.Log(Warning, "No frames per clip define, defaulting to 1!") - config.FramesPerClip = 1 - } else {} - if config.Width == "" { r.Log(Warning, "No width defined, defaulting to 1280!") config.Width = defaultWidth @@ -342,108 +555,82 @@ func (r *revidInst) ChangeState(config Config) error { return errors.New("Bad quantization defined in config!") } } - r.config = config - return nil -} - -// Log takes a logtype and message and tries to send this information to the -// logger provided in the revidInst config - if there is one, otherwise the message -// is sent to stdout -func (r *revidInst) Log(logType, m string) { - if r.config.Logger != nil { - r.config.Logger.Log(logType, m) - } else { - fmt.Println(logType + ": " + m) - } -} - -// IsRunning returns true if the revidInst is currently running and false otherwise -func (r *revidInst) IsRunning() bool { - return r.isRunning } // Start invokes a revidInst to start processing video from a defined input // and packetising to a defined output. -func (r *revidInst) Start() { - if r.isRunning { - r.Log(Warning, "revidInst.Start() called but revid already running!") +func (r *revidInst) setupOutputForRtmp(){ + r.ffmpegCmd = exec.Command(ffmpegPath, + "-f", "h264", + "-r", r.config.FrameRate, + "-i", "-", + "-f", "lavfi", + "-i", "aevalsrc=0", + "-fflags", "nobuffer", + "-vcodec", "copy", + "-acodec", "aac", + "-map", "0:0", + "-map", "1:0", + "-strict", "experimental", + "-f", "flv", + r.config.RtmpUrl, + ) + var err error + r.ffmpegStdin, err = r.ffmpegCmd.StdinPipe() + if err != nil { + r.Log(Error, err.Error()) + r.Stop() return } - r.Log(Info, "Starting Revid!") - // Configure output - switch r.config.Output { - case Rtmp: - r.ffmpegCmd = exec.Command(ffmpegPath, - "-f", "h264", - "-r", r.config.FrameRate, - "-i", "-", - "-f", "lavfi", - "-i", "aevalsrc=0", - "-fflags", "nobuffer", - "-vcodec", "copy", - "-acodec", "aac", - "-map", "0:0", - "-map", "1:0", - "-strict", "experimental", - "-f", "flv", - r.config.RtmpUrl, - ) - var err error - r.ffmpegStdin, err = r.ffmpegCmd.StdinPipe() - if err != nil { - r.Log(Error, err.Error()) - r.Stop() - return - } - err = r.ffmpegCmd.Start() - if err != nil { - r.Log(Error, err.Error()) - r.Stop() - return - } + err = r.ffmpegCmd.Start() + if err != nil { + r.Log(Error, err.Error()) + r.Stop() + return } +} - // Configure input - switch r.config.Input { - case Raspivid: - r.Log(Info, "Starting raspivid!") - switch r.config.InputCodec { - case H264: - r.cmd = exec.Command("raspivid", - "-cd", "H264", - "-o", "-", - "-n", - "-t", r.config.Timeout, - "-b", r.config.Bitrate, - "-qp", r.config.Quantization, - "-w", r.config.Width, - "-h", r.config.Height, - "-fps", r.config.FrameRate, - "-ih", - "-g", r.config.IntraRefreshPeriod, - ) - case Mjpeg: - r.cmd = exec.Command("raspivid", - "-cd", "MJPEG", - "-o", "-", - "-n", - "-t", r.config.Timeout, - "-fps", r.config.FrameRate, - ) - } - stdout, _ := r.cmd.StdoutPipe() - err := r.cmd.Start() - r.inputReader = bufio.NewReader(stdout) - if err != nil { - r.Log(Error, err.Error()) - return - } - go r.readCamera() - case File: - go r.readFile() +// Start invokes a revidInst to start processing video from a defined input +// and packetising to a defined output. +func (r *revidInst) setupInputForRaspivid(){ + r.Log(Info, "Starting raspivid!") + switch r.config.InputCodec { + case H264: + r.cmd = exec.Command("raspivid", + "-cd", "H264", + "-o", "-", + "-n", + "-t", r.config.Timeout, + "-b", r.config.Bitrate, + "-qp", r.config.Quantization, + "-w", r.config.Width, + "-h", r.config.Height, + "-fps", r.config.FrameRate, + "-ih", + "-g", r.config.IntraRefreshPeriod, + ) + case Mjpeg: + r.cmd = exec.Command("raspivid", + "-cd", "MJPEG", + "-o", "-", + "-n", + "-t", r.config.Timeout, + "-fps", r.config.FrameRate, + ) } - go r.outputClips() - r.isRunning = true + stdout, _ := r.cmd.StdoutPipe() + err := r.cmd.Start() + r.inputReader = bufio.NewReader(stdout) + if err != nil { + r.Log(Error, err.Error()) + return + } + go r.readCamera() +} + +// Start invokes a revidInst to start processing video from a defined input +func (r *revidInst) setupInputForFile(){ + go r.readFile() } // readCamera reads data from the defined camera while the revidInst is running. @@ -496,157 +683,3 @@ func (r *revidInst) readFile() { } } } - -// Stop halts any processing of video data from a camera -func (r *revidInst) Stop() { - if r.isRunning { - r.Log(Info, "Stopping revid!") - r.isRunning = false - if r.cmd != nil { - r.cmd.Process.Kill() - } - } else { - r.Log(Warning, "revidInst.Stop() called but revid not running!") - } -} - -// packClips takes data segments; whether that be tsPackets or jpeg frames and -// packs them into clips 1s long. -func (r *revidInst) packClips() { - clipSize := 0 - packetCount := 0 - now := time.Now() - prevTime := now - for { - if clip, err := r.ringBuffer.Get(); err != nil { - r.Log(Error, err.Error()) - switch r.config.Packetization { - case None: - r.Log(Warning, "Clearing mjpeg chan!") - for len(r.outputChan) > 0 { - <-(r.outputChan) - } - case Mpegts: - r.Log(Warning, "Clearing TS chan!") - for len(r.generator.GetTsOutputChan()) > 0 { - <-(r.generator.GetTsOutputChan()) - } - } - time.Sleep(1 * time.Second) - } else { - for { - switch r.config.Packetization { - case None: - frame := <-r.outputChan - upperBound := clipSize + len(frame) - copy(clip[clipSize:upperBound], frame) - packetCount++ - clipSize += len(frame) - case Mpegts: - tsPacket := <-(r.generator.GetTsOutputChan()) - tsByteSlice, err := tsPacket.ToByteSlice() - if err != nil { - r.Log(Error, err.Error()) - } - upperBound := clipSize + mp2tPacketSize - copy(clip[clipSize:upperBound], tsByteSlice) - packetCount++ - clipSize += mp2tPacketSize - } - // send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame - now = time.Now() - if packetCount > r.config.FramesPerClip { - if err := r.ringBuffer.DoneWriting(clipSize); err != nil { - r.Log(Error, err.Error()) - r.Log(Warning, "Dropping clip!") - } - clipSize = 0 - packetCount = 0 - prevTime = now - break - } - } - } - } -} - -// outputClips takes the clips produced in the packClips method and outputs them -// to the desired output defined in the revidInst config -func (r *revidInst) outputClips() { - now := time.Now() - prevTime := now - bytes := 0 - delay := 0 - for r.isRunning { - // Here we slow things down as much as we can to decrease cpu usage - switch { - case r.ringBuffer.GetNoOfElements() < 2: - delay++ - time.Sleep(time.Duration(delay) * time.Millisecond) - case delay > 10: - delay -= 10 - } - - // If the ringbuffer has something we can read and send off - if clip, err := r.ringBuffer.Read(); err == nil { - r.Log(Debug, fmt.Sprintf("Delay is: %v\n", delay)) - r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements())) - - // Output clip to the output specified in the configuration struct - switch r.config.Output { - case File: - r.outputFile.Write(clip) - case HttpOut: - bytes += len(clip) - for err := r.sendClipToHTTP(clip, r.config.HttpAddress); err != nil; { - r.Log(Error, err.Error()) - r.Log(Warning, "Post failed trying again!") - err = r.sendClipToHTTP(clip, r.config.HttpAddress) - } - case Rtmp: - fmt.Println("Outputting!") - _, err := r.ffmpegStdin.Write(clip) - if err != nil { - r.Log(Error, err.Error()) - } - default: - r.Log(Error, "No output defined!") - } - if err := r.ringBuffer.DoneReading(); err != nil { - r.Log(Error, err.Error()) - } - - // Log some information regarding bitrate and ring buffer size if it's time - now = time.Now() - deltaTime := now.Sub(prevTime) - if deltaTime > time.Duration(bitrateTime)*time.Second { - r.Log(Info, fmt.Sprintf("Bitrate: %v bits/s\n", int64(float64(bytes*8)/float64(deltaTime/1e9)))) - r.Log(Info, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements())) - prevTime = now - bytes = 0 - } - } - } -} - -// sendClipToHTTP takes a clip and an output url and posts through http. -func (r *revidInst) sendClipToHTTP(clip []byte, output string) error { - timeout := time.Duration(httpTimeOut * time.Second) - client := http.Client{ - Timeout: timeout, - } - url := output + strconv.Itoa(len(clip)) - r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip))) - resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer - if err != nil { - return fmt.Errorf("Error posting to %s: %s", output, err) - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err == nil { - r.Log(Debug, fmt.Sprintf("%s\n", body)) - } else { - r.Log(Error, err.Error()) - } - return nil -} diff --git a/revid/rtmpPublisher.go b/revid/rtmpPublisher.go new file mode 100644 index 00000000..b8b336ce --- /dev/null +++ b/revid/rtmpPublisher.go @@ -0,0 +1,81 @@ +package main + +import ( + "flag" + "fmt" + "os" + "time" + + "github.com/zhangpeihao/goflv" + rtmp "github.com/zhangpeihao/gortmp" + "github.com/zhangpeihao/log" +) + +const ( + programName = "RtmpPublisher" + version = "0.0.1" +) + +var ( + url *string = flag.String("URL", "rtmp://video-center.alivecdn.com/AppName/StreamName?vhost=live.gz-app.com", "The rtmp url to connect.") + streamName *string = flag.String("Stream", "camstream", "Stream name to play.") + flvFileName *string = flag.String("FLV", "./v_4097.flv", "FLV file to publishs.") +) +var obConn rtmp.OutboundConn +var createStreamChan chan rtmp.OutboundStream +var videoDataSize int64 +var audioDataSize int64 +var flvFile *flv.File + +var status uint + +func publish(stream rtmp.OutboundStream) { + startTs := uint32(0) + startAt := time.Now().UnixNano() + preTs := uint32(0) + for status == rtmp.OUTBOUND_CONN_STATUS_CREATE_STREAM_OK { + + fmt.Printf("@@@@@@@@@@@@@@diff1 header(%+v), startTs: %d\n", header, startTs) + if err = stream.PublishData(header.TagType, data, diff1); err != nil { + fmt.Println("PublishData() error:", err) + break + } + } +} + +func main() { + createStreamChan = make(chan rtmp.OutboundStream) + testHandler := &TestOutboundConnHandler{} + fmt.Println("to dial") + fmt.Println("a") + var err error + obConn, err = rtmp.Dial(*url, testHandler, 100) + if err != nil { + fmt.Println("Dial error", err) + os.Exit(-1) + } + fmt.Println("b") + defer obConn.Close() + fmt.Println("to connect") + err = obConn.Connect() + if err != nil { + fmt.Printf("Connect error: %s", err.Error()) + os.Exit(-1) + } + fmt.Println("c") + for { + select { + case stream := <-createStreamChan: + // Publish + stream.Attach(testHandler) + err = stream.Publish(*streamName, "live") + if err != nil { + fmt.Printf("Publish error: %s", err.Error()) + os.Exit(-1) + } + + case <-time.After(1 * time.Second): + fmt.Printf("Audio size: %d bytes; Vedio size: %d bytes\n", audioDataSize, videoDataSize) + } + } +}