/*
NAME
  revid - a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols.

DESCRIPTION
  See Readme.md

AUTHORS
  Alan Noble
  Saxon A. Nelson-Milton

LICENSE
  revid is Copyright (C) 2017 Alan Noble.

  It is free software: you can redistribute it and/or modify it
  under the terms of the GNU General Public License as published by the
  Free Software Foundation, either version 3 of the License, or (at your
  option) any later version.

  It is distributed in the hope that it will be useful, but WITHOUT
  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
  for more details.

  You should have received a copy of the GNU General Public License
  along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/

// Package revid is a testbed for re-muxing and re-directing video streams as
// MPEG-TS over various protocols.
package revid

import (
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"os/exec"
	"strconv"
	"time"

	"bitbucket.org/ausocean/av/parser"
	"bitbucket.org/ausocean/av/tsgenerator"
	//"../parser"
	//"../tsgenerator"
	"bitbucket.org/ausocean/av/ringbuffer"
	"bitbucket.org/ausocean/utils/smartLogger"
	//"../../utils/smartLogger"
	//"../ringbuffer"
)

// Misc constants
const (
	// clipDuration is the length of one clip in seconds. packClips hands a
	// clip to the ring buffer once more than this much time has elapsed.
	clipDuration = 1

	// mp2tPacketSize is the MPEG-TS packet size in bytes; packClips advances
	// the clip write position by this amount for every TS packet.
	mp2tPacketSize = 188

	// mp2tMaxPackets is 2016 per second of clip: the first multiple of both
	// 7 and 8 greater than 2000.
	// NOTE(review): not referenced in the visible part of this file.
	mp2tMaxPackets = 2016 * clipDuration

	// ringBufferSize and ringBufferElementSize are the two arguments that
	// NewRevidInstance passes to ringbuffer.NewRingBuffer.
	// NOTE(review): presumably element count and bytes per element - confirm
	// against the ringbuffer package.
	ringBufferSize = 100 / clipDuration

	ringBufferElementSize = 1000000

	// httpTimeOut is the HTTP client timeout, in seconds, used when posting
	// clips.
	httpTimeOut = 5

	// NOTE(review): packetsPerFrame and h264BufferSize are not referenced in
	// the visible part of this file.
	packetsPerFrame = 7

	h264BufferSize = 1000000

	// bitrateTime is the minimum interval, in seconds, between the bitrate
	// reports logged by outputClips.
	bitrateTime = 60

	// mjpegParserInChanLen is the argument passed to parser.NewMJPEGParser.
	// NOTE(review): presumably the parser's input channel capacity - confirm.
	mjpegParserInChanLen = 100000
)

// Log Types. These are the values passed as logType to revidInst.Log.
const (
	Error   = "Error"
	Warning = "Warning"
	Info    = "Info"
	Debug   = "Debug"
)

// Config provides parameters relevant to a revid instance. A new config must
// be passed to the constructor.
type Config struct {
	// Input selects the video source: Raspivid, Rtp or File. NothingDefined
	// is replaced by Raspivid in ChangeState.
	Input uint8

	// InputCodec selects the parser: H264 or Mjpeg. NothingDefined is
	// replaced by H264 in ChangeState.
	InputCodec uint8

	// Output selects the destination: HttpOut, File or Rtmp. NothingDefined
	// is replaced by HttpOut in ChangeState.
	Output uint8

	// RtmpEncodingMethod is checked by ChangeState only when Output is Rtmp;
	// the recognised values are Revid and Ffmpeg.
	RtmpEncodingMethod uint8

	// RtmpURL is not read anywhere in the visible part of this file.
	// NOTE(review): presumably the RTMP destination - confirm.
	RtmpURL string

	// Bitrate is a decimal string passed to raspivid as "-b" for H264 input.
	Bitrate string

	// OutputFileName is the file created by NewRevidInstance when Output is
	// File.
	OutputFileName string

	// InputFileName is the file opened by NewRevidInstance when Input is File.
	InputFileName string

	// Height is a decimal string passed to raspivid as "-h" for H264 input.
	// An empty value defaults to defaultHeight.
	Height string

	// Width is a decimal string passed to raspivid as "-w" for H264 input.
	// An empty value defaults to defaultWidth.
	Width string

	// FrameRate is a decimal string passed to raspivid as "-fps" and, for
	// Mpegts packetization, to the TS generator. An empty value defaults to
	// defaultFrameRate.
	FrameRate string

	// HttpAddress is the URL prefix used for HttpOut; the clip length in
	// bytes is appended to it to form the URL that is posted to.
	HttpAddress string

	// Quantization is a decimal string passed to raspivid as "-qp" for H264
	// input. ChangeState rejects values outside 0..51; an empty value
	// defaults to defaultQuantization.
	Quantization string

	// Timeout is a decimal string passed to raspivid as "-t". An empty value
	// defaults to defaultTimeout.
	Timeout string

	// Packetization selects how parser output is packed into clips: None or
	// Mpegts. NothingDefined is replaced by None in ChangeState.
	Packetization uint8

	// IntraRefreshPeriod is a decimal string passed to raspivid as "-g" for
	// H264 input. An empty value defaults to defaultIntraRefreshPeriod.
	IntraRefreshPeriod string

	// Logger receives all log messages. When it is nil, Log prints to stdout
	// instead.
	Logger smartLogger.LogInstance
}

// Enums for config struct. All Config enum fields share this one value space.
const (
	// NothingDefined is the zero value of every enum field; ChangeState
	// replaces it with that field's default.
	NothingDefined = 0

	// Raspivid is an Input value: video is read from a raspivid process.
	Raspivid = 1

	// Rtp is an Input value accepted by ChangeState.
	Rtp = 2

	// H264Codec is not referenced in the visible part of this file.
	H264Codec = 3

	// File is accepted both as an Input and as an Output value.
	File = 4

	// HttpOut is an Output value: clips are posted over HTTP.
	HttpOut = 5

	// H264 is an InputCodec value.
	H264 = 6

	// Mjpeg is an InputCodec value.
	Mjpeg = 7

	// None is a Packetization value: parser output is packed into clips
	// without further packetization.
	None = 8

	// Mpegts is a Packetization value: parser output is passed through the
	// TS generator first.
	Mpegts = 9

	// Rtmp is an Output value.
	Rtmp = 10

	// Ffmpeg is an RtmpEncodingMethod value.
	Ffmpeg = 11

	// Revid is an RtmpEncodingMethod value.
	Revid = 12
)

// Default config settings, applied by ChangeState when the corresponding
// Config field is empty.
const (
	defaultFrameRate          = "25"
	defaultWidth              = "1280"
	defaultHeight             = "720"
	defaultIntraRefreshPeriod = "100"
	defaultTimeout            = "0"
	defaultQuantization       = "35"
	defaultBitrate            = "0"
)

// RevidInst provides methods to control a revidInst session; providing methods
// to start, stop and change the state of an instance using the Config struct.
type RevidInst interface {
	// Start begins reading from the configured input and writing clips to
	// the configured output.
	Start()

	// Stop halts processing started by Start.
	Stop()

	// ChangeState validates newconfig and, if it is valid, makes it the
	// current configuration.
	ChangeState(newconfig Config) error

	// GetConfigRef returns a pointer to the current configuration.
	GetConfigRef() *Config

	// Log sends message m with the given log type to the configured logger,
	// or to stdout when there is none.
	Log(logType, m string)

	// IsRunning reports whether the instance is currently running.
	IsRunning() bool
}

// The revidInst struct provides fields to describe the state of a RevidInst.
type revidInst struct {
	// NOTE(review): ffmpegPath and tempDir are not used in the visible part
	// of this file.
	ffmpegPath string
	tempDir    string

	// ringBuffer holds clips between packClips (writer) and outputClips
	// (reader).
	ringBuffer ringbuffer.RingBuffer

	// config is the current, validated configuration.
	config Config

	// isRunning is set by Start and cleared by Stop; the reading and output
	// loops run while it is true.
	isRunning bool

	// outputFile is the destination when Output is File.
	outputFile *os.File

	// inputFile is the source when Input is File.
	inputFile *os.File

	// generator is the TS generator; it is only created for Mpegts
	// packetization.
	generator tsgenerator.TsGenerator

	// parser is the H264 or MJPEG parser selected by InputCodec.
	parser parser.Parser

	// cmd is the raspivid process started by Start.
	cmd *exec.Cmd

	// inputReader buffers the stdout of cmd.
	inputReader *bufio.Reader

	// mjpegOutputChan receives the parser's output when Packetization is None.
	mjpegOutputChan chan []byte
}

// NewRevidInstance returns a pointer to a new revidInst with the desired
// configuration, and/or an error if construction of the new instance was not
// successful.
func NewRevidInstance(config Config) (r *revidInst, err error) { r = new(revidInst) r.ringBuffer = ringbuffer.NewRingBuffer(ringBufferSize, ringBufferElementSize) err = r.ChangeState(config) if err != nil { return nil, err } switch r.config.Output { case File: r.outputFile, err = os.Create(r.config.OutputFileName) if err != nil { return nil, err } } switch r.config.Input { case File: r.inputFile, err = os.Open(r.config.InputFileName) if err != nil { return nil, err } } switch r.config.InputCodec { case H264: r.Log(Info, "Using H264 parser!") r.parser = parser.NewH264Parser() case Mjpeg: r.parser = parser.NewMJPEGParser(mjpegParserInChanLen) } r.mjpegOutputChan = make(chan []byte, 10000) switch r.config.Packetization { case None: r.parser.SetOutputChan(r.mjpegOutputChan) case Mpegts: frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate) r.generator = tsgenerator.NewTsGenerator(uint(frameRateAsInt)) r.parser.SetOutputChan(r.generator.GetNalInputChan()) r.generator.Start() } r.parser.Start() go r.packClips() r.Log(Info, "New revid instance created! config is:") r.Log(Info, fmt.Sprintf("%v", r.config)) return } // GetConfigRef returns a pointer to the revidInst's Config struct object func (r *revidInst) GetConfigRef() *Config { return &r.config } // ChangeState swaps the current config of a revidInst with the passed // configuration; checking validity and returning errors if not valid. 
func (r *revidInst) ChangeState(config Config) error { r.config.Logger = config.Logger switch config.Input { case Rtp: case Raspivid: case File: case NothingDefined: r.Log(Warning, "No input type defined, defaulting to raspivid!") config.Input = Raspivid default: return errors.New("Bad input type defined in config!") } switch config.InputCodec { case H264: if config.Bitrate != "" && config.Quantization != "" { bitrate, _ := strconv.Atoi(config.Bitrate) quantization, _ := strconv.Atoi(config.Quantization) if (bitrate > 0 && quantization > 0) || (bitrate == 0 && quantization == 0) { return errors.New("Bad bitrate and quantization combination for H264 input!") } } case Mjpeg: if config.Quantization != "" { quantization, _ := strconv.Atoi(config.Quantization) if quantization > 0 || config.Bitrate == "" { return errors.New("Bad bitrate or quantization for mjpeg input!") } } case NothingDefined: r.Log(Warning, "No input codec defined, defaulting to h264!") config.InputCodec = H264 r.Log(Warning, "Defaulting bitrate to 0 and quantization to 35!") config.Bitrate = defaultBitrate config.Quantization = defaultQuantization default: return errors.New("Bad input codec defined in config!") } switch config.Output { case HttpOut: case File: case Rtmp: switch config.RtmpEncodingMethod { case Revid: case Ffmpeg: case NothingDefine: r.Log(Warning, "No RTMP encoding method defined, defautling to ffmpeg!") config.RtmpEncodingMethod = Ffmpeg } case NothingDefined: r.Log(Warning, "No output defined, defaulting to httpOut!") config.Output = HttpOut default: return errors.New("Bad output type defined in config!") } switch config.Packetization { case None: case Mpegts: case NothingDefined: r.Log(Warning, "No packetization option defined, defaulting to none!") config.Packetization = None default: return errors.New("Bad packetization option defined in config!") } if config.Width == "" { r.Log(Warning, "No width defined, defaulting to 1280!") config.Width = defaultWidth } else { if integer, 
err := strconv.Atoi(config.Width); integer < 0 || err != nil { return errors.New("Bad width defined in config!") } } if config.Height == "" { r.Log(Warning, "No height defined, defaulting to 720!") config.Height = defaultHeight } else { if integer, err := strconv.Atoi(config.Height); integer < 0 || err != nil { return errors.New("Bad height defined in config!") } } if config.FrameRate == "" { r.Log(Warning, "No frame rate defined, defaulting to 25!") config.FrameRate = defaultFrameRate } else { if integer, err := strconv.Atoi(config.FrameRate); integer < 0 || err != nil { return errors.New("Bad frame rate defined in config!") } } if config.Timeout == "" { r.Log(Warning, "No timeout defined, defaulting to 0!") config.Timeout = defaultTimeout } else { if integer, err := strconv.Atoi(config.Timeout); integer < 0 || err != nil { return errors.New("Bad timeout defined in config!") } } if config.IntraRefreshPeriod == "" { r.Log(Warning, "No intra refresh defined, defaulting to 100!") config.IntraRefreshPeriod = defaultIntraRefreshPeriod } else { if integer, err := strconv.Atoi(config.IntraRefreshPeriod); integer < 0 || err != nil { return errors.New("Bad intra refresh defined in config!") } } if config.Quantization == "" { r.Log(Warning, "No quantization defined, defaulting to 35!") config.Quantization = defaultQuantization } else { if integer, err := strconv.Atoi(config.Quantization); integer < 0 || integer > 51 || err != nil { return errors.New("Bad quantization defined in config!") } } r.config = config return nil } // Log takes a logtype and message and tries to send this information to the // logger provided in the revidInst config - if there is one, otherwise the message // is sent to stdout func (r *revidInst) Log(logType, m string) { if r.config.Logger != nil { r.config.Logger.Log(logType, m) } else { fmt.Println(logType + ": " + m) } } // IsRunning returns true if the revidInst is currently running and false otherwise func (r *revidInst) IsRunning() bool { 
return r.isRunning } // Start invokes a revidInst to start processing video from a defined input // and packetising to a defined output. func (r *revidInst) Start() { if r.isRunning { r.Log(Warning, "revidInst.Start() called but revid already running!") return } r.Log(Info, "Starting Revid!") switch r.config.Input { case Raspivid: r.Log(Info, "Starting raspivid!") switch r.config.InputCodec { case H264: r.cmd = exec.Command("raspivid", "-cd", "H264", "-o", "-", "-n", "-t", r.config.Timeout, "-b", r.config.Bitrate, "-qp", r.config.Quantization, "-w", r.config.Width, "-h", r.config.Height, "-fps", r.config.FrameRate, "-ih", "-g", r.config.IntraRefreshPeriod, ) case Mjpeg: r.cmd = exec.Command("raspivid", "-cd", "MJPEG", "-o", "-", "-n", "-t", r.config.Timeout, "-fps", r.config.FrameRate, ) } stdout, _ := r.cmd.StdoutPipe() err := r.cmd.Start() r.inputReader = bufio.NewReader(stdout) if err != nil { r.Log(Error, err.Error()) return } r.isRunning = true case File: stats, err := r.inputFile.Stat() if err != nil { r.Log(Error, "Could not get input file stats!") r.Stop() return } data := make([]byte, stats.Size()) _, err = r.inputFile.Read(data) if err != nil { r.Log(Error, err.Error()) r.Stop() return } for i := range data { r.parser.GetInputChan() <- data[i] } } go r.readCamera() go r.outputClips() } // readCamera reads data from the defined camera while the revidInst is running. 
func (r *revidInst) readCamera() { r.Log(Info, "Reading camera data!") for r.isRunning { data := make([]byte, 1) _, err := io.ReadFull(r.inputReader, data) switch { case err != nil && err.Error() == "EOF" && r.isRunning: r.Log(Error, "No data from camera!") time.Sleep(5 * time.Second) case err != nil && r.isRunning: r.Log(Error, err.Error()) default: r.parser.GetInputChan() <- data[0] } } r.Log(Info, "Out of reading routine!") } // Stop halts any processing of video data from a camera func (r *revidInst) Stop() { if r.isRunning { r.Log(Info, "Stopping revid!") r.isRunning = false if r.cmd != nil { r.cmd.Process.Kill() } } else { r.Log(Warning, "revidInst.Stop() called but revid not running!") } } // packClips takes data segments; whether that be tsPackets or jpeg frames and // packs them into clips 1s long. func (r *revidInst) packClips() { clipSize := 0 packetCount := 0 now := time.Now() prevTime := now for { if clip, err := r.ringBuffer.Get(); err != nil { r.Log(Error, err.Error()) switch r.config.Packetization { case None: r.Log(Warning, "Clearing mjpeg chan!") for len(r.mjpegOutputChan) > 0 { <-(r.mjpegOutputChan) } case Mpegts: r.Log(Warning, "Clearing TS chan!") for len(r.generator.GetTsOutputChan()) > 0 { <-(r.generator.GetTsOutputChan()) } } time.Sleep(1 * time.Second) } else { for { switch r.config.Packetization { case None: frame := <-r.mjpegOutputChan upperBound := clipSize + len(frame) copy(clip[clipSize:upperBound], frame) packetCount++ clipSize += len(frame) case Mpegts: tsPacket := <-(r.generator.GetTsOutputChan()) tsByteSlice, err := tsPacket.ToByteSlice() if err != nil { r.Log(Error, err.Error()) } upperBound := clipSize + mp2tPacketSize copy(clip[clipSize:upperBound], tsByteSlice) packetCount++ clipSize += mp2tPacketSize } // send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame now = time.Now() if now.Sub(prevTime) > clipDuration*time.Second && len(clip) > 0 { if err := r.ringBuffer.DoneWriting(clipSize); err != 
nil { r.Log(Error, err.Error()) r.Log(Warning, "Dropping clip!") } clipSize = 0 packetCount = 0 prevTime = now break } } } } } // outputClips takes the clips produced in the packClips method and outputs them // to the desired output defined in the revidInst config func (r *revidInst) outputClips() { now := time.Now() prevTime := now bytes := 0 delay := 0 for r.isRunning { switch { case r.ringBuffer.GetNoOfElements() < 2: delay++ time.Sleep(time.Duration(delay) * time.Millisecond) case delay > 10: delay -= 10 } if clip, err := r.ringBuffer.Read(); err == nil { r.Log(Debug, fmt.Sprintf("Delay is: %v\n", delay)) r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements())) switch r.config.Output { case File: fmt.Println("") r.outputFile.Write(clip) case HttpOut: bytes += len(clip) for err := r.sendClipToHTTP(clip, r.config.HttpAddress); err != nil; { r.Log(Error, err.Error()) r.Log(Warning, "Post failed trying again!") err = r.sendClipToHTTP(clip, r.config.HttpAddress) } default: r.Log(Error, "No output defined!") } if err := r.ringBuffer.DoneReading(); err != nil { r.Log(Error, err.Error()) } now = time.Now() deltaTime := now.Sub(prevTime) if deltaTime > time.Duration(bitrateTime)*time.Second { r.Log(Info, fmt.Sprintf("Bitrate: %v bits/s\n", int64(float64(bytes*8)/float64(deltaTime/1e9)))) r.Log(Info, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements())) prevTime = now bytes = 0 } } } } // sendClipToHTTP takes a clip and an output url and posts through http. 
func (r *revidInst) sendClipToHTTP(clip []byte, output string) error { timeout := time.Duration(httpTimeOut * time.Second) client := http.Client{ Timeout: timeout, } url := output + strconv.Itoa(len(clip)) r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip))) resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer if err != nil { return fmt.Errorf("Error posting to %s: %s", output, err) } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err == nil { r.Log(Debug, fmt.Sprintf("%s\n", body)) } else { r.Log(Error, err.Error()) } return nil }