/*
NAME
  Revid.go

DESCRIPTION
  See Readme.md

AUTHORS
  Saxon A. Nelson-Milton
  Alan Noble

LICENSE
  revid is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)

  It is free software: you can redistribute it and/or modify it
  under the terms of the GNU General Public License as published by the
  Free Software Foundation, either version 3 of the License, or (at your
  option) any later version.

  It is distributed in the hope that it will be useful, but WITHOUT
  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
  for more details.

  You should have received a copy of the GNU General Public License
  along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/

// Package revid is a testbed for re-muxing and re-directing video streams as
// MPEG-TS over various protocols.
package revid

import (
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"os/exec"
	"strconv"
	"sync"
	"time"

	"bitbucket.org/ausocean/av/generator"
	"bitbucket.org/ausocean/av/parser"
	"bitbucket.org/ausocean/av/ringbuffer"
	"bitbucket.org/ausocean/av/rtmp"
	/* "../generator" "../parser" "../ringbuffer" "../rtmp" */
)

// Misc constants
const (
	clipDuration            = 1                   // s
	mp2tPacketSize          = 188                 // MPEG-TS packet size
	mp2tMaxPackets          = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000
	ringBufferSize          = 1000                // number of elements requested from ringbuffer.NewRingBuffer
	ringBufferElementSize   = 150000              // size in bytes of one ring buffer element, i.e. the largest clip
	httpTimeOut             = 5                   // s; timeout of the HTTP client used to post clips
	packetsPerFrame         = 7
	h264BufferSize          = 1000000
	bitrateTime             = 10     // s; minimum period between bitrate log messages
	mjpegParserInChanLen    = 100000 // value passed to parser.NewMJPEGParser
	ffmpegPath              = "/usr/local/bin/ffmpeg"
	rtmpConnectionTimout    = 10    // passed to rtmp.NewRTMPSession; presumably seconds - TODO confirm
	outputChanSize          = 10000 // capacity of the revid output channel
	cameraRetryPeriod       = 5 * time.Second // pause after a failed camera read
	sendFailedDelay         = 5   // s; pause between send retries once maxSendFailedErrorCount is exceeded
	maxSendFailedErrorCount = 500 // consecutive send failures tolerated before retries are slowed down
	clipSizeThreshold       = 11  // clips shorter than this many bytes are not re-sent after a failure
	rtmpConnectionMaxTries  = 5   // number of reconnection attempts when setting up native RTMP output
)

// Log Types
const (
	Error   = "Error"
	Warning = "Warning"
	Info    = "Info"
	Debug   = "Debug"
)

// Revid provides methods to control a revid session; providing methods
// to start, stop and change the state of an instance using the Config
struct. type Revid interface { Start() Stop() changeState(newconfig Config) error GetConfigRef() *Config Log(logType, m string) IsRunning() bool } // The revid struct provides fields to describe the state of a Revid. type revid struct { ffmpegPath string tempDir string ringBuffer ringbuffer.RingBuffer config Config isRunning bool outputFile *os.File inputFile *os.File generator generator.Generator parser parser.Parser cmd *exec.Cmd ffmpegCmd *exec.Cmd inputReader *bufio.Reader ffmpegStdin io.WriteCloser outputChan chan []byte setupInput func() error setupOutput func() error getFrame func() []byte sendClip func(clip []byte) error rtmpInst rtmp.RTMPSession mutex sync.Mutex currentBitrate int64 } // NewRevid returns a pointer to a new revid with the desired // configuration, and/or an error if construction of the new instant was not // successful. func NewRevid(config Config) (r *revid, err error) { r = new(revid) r.mutex = sync.Mutex{} r.ringBuffer = ringbuffer.NewRingBuffer(ringBufferSize, ringBufferElementSize) err = r.changeState(config) if err != nil { r = nil return } r.outputChan = make(chan []byte, outputChanSize) r.parser.Start() go r.packClips() return } // Returns the currently saved bitrate from the most recent bitrate check // check bitrate output delay in consts for this period func (r *revid) GetBitrate() int64 { return r.currentBitrate } // GetConfigRef returns a pointer to the revidInst's Config struct object func (r *revid) GetConfigRef() *Config { return &r.config } // changeState swaps the current config of a revid with the passed // configuration; checking validity and returning errors if not valid. 
func (r *revid) changeState(config Config) error { r.config.Logger = config.Logger err := config.Validate(r) if err != nil { return errors.New("Config struct is bad!: " + err.Error()) } r.config = config switch r.config.Output { case File: r.sendClip = r.sendClipToFile r.setupOutput = r.setupOutputForFile case FfmpegRtmp: r.setupOutput = r.setupOutputForFfmpegRtmp r.sendClip = r.sendClipToFfmpegRtmp case NativeRtmp: r.setupOutput = r.setupOutputForLibRtmp r.sendClip = r.sendClipToLibRtmp case Http: r.sendClip = r.sendClipToHTTP } switch r.config.Input { case Raspivid: r.setupInput = r.setupInputForRaspivid case File: r.setupInput = r.setupInputForFile } switch r.config.InputCodec { case H264: r.Log(Info, "Using H264 parser!") r.parser = parser.NewH264Parser() case Mjpeg: r.Log(Info, "Using MJPEG parser!") r.parser = parser.NewMJPEGParser(mjpegParserInChanLen) } switch r.config.Packetization { case None: // no packetisation - Revid output chan grabs raw data straight from parser r.parser.SetOutputChan(r.outputChan) r.getFrame = r.getFrameNoPacketization goto noPacketizationSetup case Mpegts: r.Log(Info, "Using MPEGTS packetisation!") frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate) r.generator = generator.NewTsGenerator(uint(frameRateAsInt)) case Flv: r.Log(Info, "Using FLV packetisation!") frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate) r.generator = generator.NewFlvGenerator(true, true, uint(frameRateAsInt)) } // We have packetization of some sort, so we want to send data to Generator // to perform packetization r.getFrame = r.getFramePacketization r.parser.SetOutputChan(r.generator.GetInputChan()) r.generator.Start() noPacketizationSetup: return nil } // ChangeConfig changes the current configuration of the revid instance. 
func (r *revid) ChangeConfig(config Config) (err error) { r.Stop() r, err = NewRevid(config) if err != nil { return } r.Start() return } // Log takes a logtype and message and tries to send this information to the // logger provided in the revid config - if there is one, otherwise the message // is sent to stdout func (r *revid) Log(logType, m string) { if r.config.Verbosity == Yes { if r.config.Logger != nil { r.config.Logger.Log(logType, m) return } fmt.Println(logType + ": " + m) } } // IsRunning returns true if the revid is currently running and false otherwise func (r *revid) IsRunning() bool { return r.isRunning } // Start invokes a revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. func (r *revid) Start() { r.mutex.Lock() defer r.mutex.Unlock() if r.isRunning { r.Log(Warning, "revid.Start() called but revid already running!") return } r.Log(Info, "Starting Revid!") if r.setupOutput != nil { err := r.setupOutput() if err != nil { r.Log(Error, err.Error()) return } } go r.setupInput() go r.outputClips() r.isRunning = true } // Stop halts any processing of video data from a camera or file func (r *revid) Stop() { r.mutex.Lock() defer r.mutex.Unlock() if !r.isRunning { r.Log(Warning, "revid.Stop() called but revid not running!") return } r.Log(Info, "Stopping revid!") r.isRunning = false // If a cmd process is running, we kill! 
if r.cmd != nil && r.cmd.Process != nil { r.cmd.Process.Kill() } r.rtmpInst.EndSession() } // getFrameNoPacketization gets a frame directly from the revid output chan // as we don't need to go through the generator with no packetization settings func (r *revid) getFrameNoPacketization() []byte { return <-r.outputChan } // getFramePacketization gets a frame from the generators output chan - the // the generator being an mpegts or flv generator depending on the config func (r *revid) getFramePacketization() []byte { return <-(r.generator.GetOutputChan()) } // flushDataPacketization removes data from the revid inst's coutput chan func (r *revid) flushData() { for len(r.outputChan) > 0 { <-(r.outputChan) } } // packClips takes data segments; whether that be tsPackets or mjpeg frames and // packs them into clips consisting of the amount frames specified in the config func (r *revid) packClips() { clipSize := 0 packetCount := 0 for { // Get some memory from the ring buffer for out clip var clip []byte var err error if clip, err = r.ringBuffer.Get(); err != nil { r.Log(Error, err.Error()) r.Log(Warning, "Clearing output chan!") // Keep clearing output chan until out buffer has some space //r.rtmpInst.EndSession() for clip, err = r.ringBuffer.Get(); err != nil; { r.flushData() clip, err = r.ringBuffer.Get() } r.Log(Debug, "Finally got mem from ringbuffer!") } for { frame := r.getFrame() lenOfFrame := len(frame) if lenOfFrame > ringBufferElementSize { r.Log(Warning,fmt.Sprintf("Frame was too big: %v bytes, getting another one!", lenOfFrame)) frame = r.getFrame() lenOfFrame = len(frame) } upperBound := clipSize + lenOfFrame copy(clip[clipSize:upperBound], frame) packetCount++ clipSize += lenOfFrame fpcAsInt,err := strconv.Atoi(r.config.FramesPerClip) if err != nil { r.Log(Error, "Frames per clip not quite right! 
Defaulting to 1!") r.config.FramesPerClip = "1" } if packetCount >= fpcAsInt { if err := r.ringBuffer.DoneWriting(clipSize); err != nil { r.Log(Error, err.Error()) r.Log(Warning, "Dropping clip!") } clipSize = 0 packetCount = 0 break } } } } // outputClips takes the clips produced in the packClips method and outputs them // to the desired output defined in the revid config func (r *revid) outputClips() { now := time.Now() prevTime := now bytes := 0 delay := 0 for r.isRunning { // Here we slow things down as much as we can to decrease cpu usage switch { case r.ringBuffer.GetNoOfElements() < 2: delay++ time.Sleep(time.Duration(delay) * time.Millisecond) case delay > 0: delay-- } // If the ringbuffer has something we can read and send off if clip, err := r.ringBuffer.Read(); err == nil { bytes += len(clip) errorCount := 0 r.Log(Debug,"About to send!") err2 := r.sendClip(clip) r.Log(Debug,"Finished send!") for ; err2 != nil; errorCount++ { r.Log(Warning, "Send failed trying again!") // If the clip size is not bigger than the threshold then we classify // it as junk and we don't try to send it off again. 
if len(clip) >= clipSizeThreshold { err2 = r.sendClip(clip) if err2 == nil { break } } else { break } // So that we don't fill up the log, once we reach the maxSendFailedErrorCount // we will add some delay to slow things down until we have a connection again if errorCount > maxSendFailedErrorCount { time.Sleep(time.Duration(sendFailedDelay) * time.Second) } r.Log(Error, err2.Error()) if r.config.Output == NativeRtmp && errorCount > 10 { r.rtmpInst.EndSession() r.rtmpInst = rtmp.NewRTMPSession(r.config.RtmpUrl, rtmpConnectionTimout) err = r.rtmpInst.StartSession() } } // let the ringbuffer know that we're done with the memory we grabbed when // we call ringBuffer.Get() if err := r.ringBuffer.DoneReading(); err != nil { r.Log(Error, err.Error()) } // Log some information regarding bitrate and ring buffer size if it's time now = time.Now() deltaTime := now.Sub(prevTime) if deltaTime > time.Duration(bitrateTime)*time.Second { r.currentBitrate = int64(float64(bytes*8)/float64(deltaTime/1e9)) r.Log(Info, fmt.Sprintf("Bitrate: %v bits/s\n", r.currentBitrate )) r.Log(Info, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements())) prevTime = now bytes = 0 } } else { //r.Log(Error, err.Error()) } } r.Log(Info, "Not outputting clips anymore!") } // senClipToFile writes the passed clip to a file func (r *revid) sendClipToFile(clip []byte) error { _, err := r.outputFile.Write(clip) if err != nil { return err } return nil } // sendClipToHTTP takes a clip and an output url and posts through http. 
func (r *revid) sendClipToHTTP(clip []byte) error { timeout := time.Duration(httpTimeOut * time.Second) client := http.Client{Timeout: timeout} url := r.config.HttpAddress + strconv.Itoa(len(clip)) r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip))) resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) if err != nil { return fmt.Errorf("Error posting to %s: %s", url, err) } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err == nil { r.Log(Debug, fmt.Sprintf("%s\n", body)) } else { r.Log(Error, err.Error()) } return nil } // sendClipToFfmpegRtmp sends the clip over the current rtmp connection using // an ffmpeg process. func (r *revid) sendClipToFfmpegRtmp(clip []byte) (err error) { _, err = r.ffmpegStdin.Write(clip) return } // sendClipToLibRtmp send the clip over the current rtmp connection using the // c based librtmp library func (r *revid) sendClipToLibRtmp(clip []byte) (err error) { err = r.rtmpInst.WriteFrame(clip, uint(len(clip))) return } // setupOutputForFfmpegRtmp sets up output to rtmp using an ffmpeg process func (r *revid) setupOutputForFfmpegRtmp() error { r.ffmpegCmd = exec.Command(ffmpegPath, "-f", "h264", "-r", r.config.FrameRate, "-i", "-", "-f", "lavfi", "-i", "aevalsrc=0", "-fflags", "nobuffer", "-vcodec", "copy", "-acodec", "aac", "-map", "0:0", "-map", "1:0", "-strict", "experimental", "-f", "flv", r.config.RtmpUrl, ) var err error r.ffmpegStdin, err = r.ffmpegCmd.StdinPipe() if err != nil { r.Log(Error, err.Error()) r.Stop() return err } err = r.ffmpegCmd.Start() if err != nil { r.Log(Error, err.Error()) r.Stop() return err } return nil } // setupOutputForLibRtmp sets up rtmp output using the wrapper for the c based // librtmp library - makes connection and starts comms etc. 
func (r *revid) setupOutputForLibRtmp() error { r.rtmpInst = rtmp.NewRTMPSession(r.config.RtmpUrl, rtmpConnectionTimout) err := r.rtmpInst.StartSession() for noOfTries := 0; err != nil && noOfTries < rtmpConnectionMaxTries; noOfTries++ { r.rtmpInst.EndSession() r.Log(Error, err.Error()) r.Log(Info, "Trying to establish rtmp connection again!") r.rtmpInst = rtmp.NewRTMPSession(r.config.RtmpUrl, rtmpConnectionTimout) err = r.rtmpInst.StartSession() } if err != nil { return err } return err } // setupOutputForFile sets up an output file to output data to func (r *revid) setupOutputForFile() (err error) { r.outputFile, err = os.Create(r.config.OutputFileName) return } // setupInputForRaspivid sets up things for input from raspivid i.e. starts // a raspivid process and pipes it's data output. func (r *revid) setupInputForRaspivid() error { r.Log(Info, "Starting raspivid!") switch r.config.InputCodec { case H264: var quantizationFlag string var quantizationValue string var verticalFlipFlag string var horizontalFlipFlag string switch r.config.QuantizationMode { case QuantizationOn: quantizationFlag = "-qp" quantizationValue = r.config.Quantization case QuantizationOff: quantizationFlag = "" quantizationValue = "" } switch r.config.HorizontalFlip { case Yes: horizontalFlipFlag = "-hf" case No: horizontalFlipFlag = "" } switch r.config.VerticalFlip { case Yes: verticalFlipFlag = "-vf" case No: verticalFlipFlag = "" } r.cmd = exec.Command("raspivid", "-cd", "H264", "-o", "-", "-n", "-t", r.config.Timeout, "-b", "0", quantizationFlag, quantizationValue, "-w", r.config.Width, "-h", r.config.Height, "-fps", r.config.FrameRate, "-ih", "-g", r.config.IntraRefreshPeriod, verticalFlipFlag, horizontalFlipFlag, ) case Mjpeg: r.cmd = exec.Command("raspivid", "-cd", "MJPEG", "-o", "-", "-n", "-t", r.config.Timeout, "-fps", r.config.FrameRate, ) } stdout, _ := r.cmd.StdoutPipe() err := r.cmd.Start() r.inputReader = bufio.NewReader(stdout) if err != nil { r.Log(Error, err.Error()) return 
err } go r.readCamera() return nil } // setupInputForFile sets things up for getting input from a file func (r *revid) setupInputForFile() error { fps, _ := strconv.Atoi(r.config.FrameRate) r.parser.SetDelay(uint(float64(1000) / float64(fps))) r.readFile() return nil } // testRtmp is useful to check robustness of connections. Intended to be run as // goroutine. After every 'delayTime' the rtmp connection is ended and then // restarted func (r *revid) testRtmp(delayTime uint) { for { time.Sleep(time.Duration(delayTime) * time.Millisecond) r.rtmpInst.EndSession() r.rtmpInst.StartSession() } } // readCamera reads data from the defined camera while the revid is running. // TODO: use ringbuffer here instead of allocating mem every time! func (r *revid) readCamera() { r.Log(Info, "Reading camera data!") for r.isRunning { data := make([]byte, 1) _, err := io.ReadFull(r.inputReader, data) switch { // We know this means we're getting nothing from the cam case ( err != nil && err.Error() == "EOF" && r.isRunning ) || ( err != nil && r.isRunning): r.Log(Error, "No data from camera!") time.Sleep(cameraRetryPeriod) default: r.parser.GetInputChan() <- data[0] } } r.Log(Info, "Not trying to read from camera anymore!") } // readFile reads data from the defined file while the revid is running. func (r *revid) readFile() error { var err error r.inputFile, err = os.Open(r.config.InputFileName) if err != nil { r.Log(Error, err.Error()) r.Stop() return err } stats, err := r.inputFile.Stat() if err != nil { r.Log(Error, "Could not get input file stats!") r.Stop() return err } data := make([]byte, stats.Size()) _, err = r.inputFile.Read(data) if err != nil { r.Log(Error, err.Error()) r.Stop() return err } for i := range data { r.parser.GetInputChan() <- data[i] } r.inputFile.Close() return nil }