/* NAME revid - a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. DESCRIPTION See Readme.md AUTHORS Alan Noble Saxon A. Nelson-Milton LICENSE revid is Copyright (C) 2017 Alan Noble. It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). */ // revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. package revid import ( "bufio" "bytes" "errors" "fmt" "io" "io/ioutil" "net/http" "os" "os/exec" "strconv" "time" "bitbucket.org/ausocean/av/parser" "bitbucket.org/ausocean/av/tsgenerator" //"../parser" //"../tsgenerator" "bitbucket.org/ausocean/av/ringbuffer" "bitbucket.org/ausocean/utils/smartLogger" //"../../utils/smartLogger" //"../ringbuffer" ) // Misc constants const ( clipDuration = 1 // s mp2tPacketSize = 188 // MPEG-TS packet size mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000 ringBufferSize = 100 / clipDuration ringBufferElementSize = 500000 httpTimeOut = 5 // s packetsPerFrame = 7 h264BufferSize = 500000 bitrateTime = 60 mjpegParserInChanLen = 100000 ) // Log Types const ( Error = "Error" Warning = "Warning" Info = "Info" Debug = "Debug" ) // Config provides parameters relevant to a revid instance. A new config must // be passed to the constructor. type Config struct { Input uint8 InputCodec uint8 Output uint8 OutputFileName string InputFileName string Height string Width string FrameRate string HttpAddress string Quantization string Timeout string Packetization uint8 IntraRefreshPeriod string Logger smartLogger.LogInstance } // Enums for config struct const ( NothingDefined = 0 Raspivid = 1 Rtp = 2 H264Codec = 3 File = 4 HttpOut = 5 H264 = 6 Mjpeg = 7 None = 8 Mpegts = 9 ) // Default config settings const ( defaultFrameRate = "25" defaultWidth = "1280" defaultHeight = "720" defaultIntraRefreshPeriod = "100" defaultTimeout = "0" defaultQuantization = "35" ) // RevidInst provides methods to control a revidInst session; providing methods // to start, stop and change the state of an instance using the Config struct. type RevidInst interface { Start() Stop() ChangeState(newconfig Config) error GetConfigRef() *Config Log(logType, m string) IsRunning() bool } // The revidInst struct provides fields to describe the state of a RevidInst. type revidInst struct { ffmpegPath string tempDir string ringBuffer ringbuffer.RingBuffer config Config isRunning bool outputFile *os.File inputFile *os.File generator tsgenerator.TsGenerator parser parser.Parser cmd *exec.Cmd inputReader *bufio.Reader mjpegOutputChan chan []byte } // NewRevidInstance returns a pointer to a new revidInst with the desired // configuration, and/or an error if construction of the new instant was not // successful. func NewRevidInstance(config Config) (r *revidInst, err error) { r = new(revidInst) r.ringBuffer = ringbuffer.NewRingBuffer(ringBufferSize, ringBufferElementSize) err = r.ChangeState(config) if err != nil { return nil, err } switch r.config.Output { case File: r.outputFile, err = os.Create(r.config.OutputFileName) if err != nil { return nil, err } } switch r.config.Input { case File: r.inputFile, err = os.Open(r.config.InputFileName) if err != nil { return nil, err } } switch r.config.InputCodec { case H264: r.Log(Info, "Using H264 parser!") r.parser = parser.NewH264Parser() case Mjpeg: r.parser = parser.NewMJPEGParser(mjpegParserInChanLen) } r.mjpegOutputChan = make(chan []byte, 10000) switch r.config.Packetization { case None: r.parser.SetOutputChan(r.mjpegOutputChan) case Mpegts: frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate) r.generator = tsgenerator.NewTsGenerator(uint(frameRateAsInt)) r.parser.SetOutputChan(r.generator.GetNalInputChan()) r.generator.Start() } r.parser.Start() go r.packClips() r.Log(Info, "New revid instance created! config is:") r.Log(Info, fmt.Sprintf("%v", r.config)) return } // GetConfigRef returns a pointer to the revidInst's Config struct object func (r *revidInst) GetConfigRef() *Config { return &r.config } // ChangeState swaps the current config of a revidInst with the passed // configuration; checking validity and returning errors if not valid. func (r *revidInst) ChangeState(config Config) error { r.config.Logger = config.Logger switch config.Input { case Rtp: case Raspivid: case File: case NothingDefined: r.Log(Warning, "No input type defined, defaulting to raspivid!") config.Input = Raspivid default: return errors.New("Bad input type defined in config!") } switch config.InputCodec { case H264: case Mjpeg: case NothingDefined: r.Log(Warning, "No input codec defined, defaulting to h264!") config.InputCodec = H264 default: return errors.New("Bad input codec defined in config!") } switch config.Output { case HttpOut: case File: case NothingDefined: r.Log(Warning, "No output defined, defaulting to httpOut!") config.Output = HttpOut default: return errors.New("Bad output type defined in config!") } switch config.Packetization { case None: case Mpegts: case NothingDefined: r.Log(Warning, "No packetization option defined, defaulting to none!") config.Packetization = None default: return errors.New("Bad packetization option defined in config!") } if config.Width == "" { r.Log(Warning, "No width defined, defaulting to 1280!") config.Width = defaultWidth } else { if integer, err := strconv.Atoi(config.Width); integer < 0 || err != nil { return errors.New("Bad width defined in config!") } } if config.Height == "" { r.Log(Warning, "No height defined, defaulting to 720!") config.Height = defaultHeight } else { if integer, err := strconv.Atoi(config.Height); integer < 0 || err != nil { return errors.New("Bad height defined in config!") } } if config.FrameRate == "" { r.Log(Warning, "No frame rate defined, defaulting to 25!") config.FrameRate = defaultFrameRate } else { if integer, err := strconv.Atoi(config.FrameRate); integer < 0 || err != nil { return errors.New("Bad frame rate defined in config!") } } if config.Timeout == "" { r.Log(Warning, "No timeout defined, defaulting to 0!") config.Timeout = defaultTimeout } else { if integer, err := strconv.Atoi(config.Timeout); integer < 0 || err != nil { return errors.New("Bad timeout defined in config!") } } if config.IntraRefreshPeriod == "" { r.Log(Warning, "No intra refresh defined, defaulting to 100!") config.IntraRefreshPeriod = defaultIntraRefreshPeriod } else { if integer, err := strconv.Atoi(config.IntraRefreshPeriod); integer < 0 || err != nil { return errors.New("Bad intra refresh defined in config!") } } if config.Quantization == "" { r.Log(Warning, "No quantization defined, defaulting to 35!") config.Quantization = defaultQuantization } else { if integer, err := strconv.Atoi(config.Quantization); integer <= 0 || integer > 51 || err != nil { return errors.New("Bad quantization defined in config!") } } r.config = config return nil } // Log takes a logtype and message and tries to send this information to the // logger provided in the revidInst config - if there is one, otherwise the message // is sent to stdout func (r *revidInst) Log(logType, m string) { if r.config.Logger != nil { r.config.Logger.Log(logType, m) } else { fmt.Println(logType + ": " + m) } } // IsRunning returns true if the revidInst is currently running and false otherwise func (r *revidInst) IsRunning() bool { return r.isRunning } // Start invokes a revidInst to start processing video from a defined input // and packetising to a defined output. func (r *revidInst) Start() { if r.isRunning { r.Log(Warning, "revidInst.Start() called but revid already running!") return } r.Log(Info, "Starting Revid!") switch r.config.Input { case Raspivid: r.Log(Info, "Starting raspivid!") var codec string switch r.config.InputCodec { case H264: codec = "H264" r.cmd = exec.Command("raspivid", "-cd", codec, "-o", "-", "-n", "-t", r.config.Timeout, "-b", "0", "-qp", r.config.Quantization, "-w", r.config.Width, "-h", r.config.Height, "-fps", r.config.FrameRate, "-ih", "-g", r.config.IntraRefreshPeriod, ) case Mjpeg: codec = "MJPEG" r.cmd = exec.Command("raspivid", "-cd", codec, "-o", "-", "-n", "-t", r.config.Timeout, /* "-b", "0", "-qp", r.config.Quantization, "-w", r.config.Width, "-h", r.config.Height, "-fps", r.config.FrameRate, */ ) } stdout, _ := r.cmd.StdoutPipe() err := r.cmd.Start() r.inputReader = bufio.NewReader(stdout) if err != nil { r.Log(Error, err.Error()) return } r.isRunning = true case File: stats, err := r.inputFile.Stat() if err != nil { r.Log(Error, "Could not get input file stats!") r.Stop() return } data := make([]byte, stats.Size()) _, err = r.inputFile.Read(data) if err != nil { r.Log(Error, err.Error()) r.Stop() return } for i := range data { r.parser.GetInputChan() <- data[i] } } go r.readCamera() go r.outputClips() } // readCamera reads data from the defined camera while the revidInst is running. func (r *revidInst) readCamera() { r.Log(Info, "Reading camera data!") for r.isRunning { data := make([]byte, 1) _, err := io.ReadFull(r.inputReader, data) switch { case err != nil && err.Error() == "EOF" && r.isRunning: r.Log(Error, "No data from camera!") time.Sleep(5 * time.Second) case err != nil && r.isRunning: r.Log(Error, err.Error()) default: r.parser.GetInputChan() <- data[0] } } r.Log(Info, "Out of reading routine!") } // Stop halts any processing of video data from a camera func (r *revidInst) Stop() { if r.isRunning { r.Log(Info, "Stopping revid!") r.isRunning = false if r.cmd != nil { r.cmd.Process.Kill() } } else { r.Log(Warning, "revidInst.Stop() called but revid not running!") } } // packClips takes data segments; whether that be tsPackets or jpeg frames and // packs them into clips 1s long. func (r *revidInst) packClips() { clipSize := 0 packetCount := 0 now := time.Now() prevTime := now for { if clip, err := r.ringBuffer.Get(); err != nil { r.Log(Error, err.Error()) switch r.config.Packetization { case None: r.Log(Warning, "Clearing mjpeg chan!") for len(r.mjpegOutputChan) > 0 { <-(r.mjpegOutputChan) } case Mpegts: r.Log(Warning, "Clearing TS chan!") for len(r.generator.GetTsOutputChan()) > 0 { <-(r.generator.GetTsOutputChan()) } } time.Sleep(1 * time.Second) } else { for { switch r.config.Packetization { case None: frame := <-r.mjpegOutputChan upperBound := clipSize + len(frame) fmt.Printf("len(clip): %v\n", len(clip)) fmt.Printf("clipsize: %v\n", clipSize) fmt.Printf("upperBound: %v\n", upperBound) fmt.Printf("len(frame): %v\n", len(frame)) copy(clip[clipSize:upperBound], frame) packetCount++ clipSize += len(frame) case Mpegts: tsPacket := <-(r.generator.GetTsOutputChan()) tsByteSlice, err := tsPacket.ToByteSlice() if err != nil { r.Log(Error, err.Error()) } upperBound := clipSize + mp2tPacketSize copy(clip[clipSize:upperBound], tsByteSlice) packetCount++ clipSize += mp2tPacketSize } // send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame now = time.Now() if now.Sub(prevTime) > clipDuration*time.Second && len(clip) > 0 { if err := r.ringBuffer.DoneWriting(clipSize); err != nil { r.Log(Error, err.Error()) r.Log(Warning, "Dropping clip!") } clipSize = 0 packetCount = 0 prevTime = now break } } } } } // outputClips takes the clips produced in the packClips method and outputs them // to the desired output defined in the revidInst config func (r *revidInst) outputClips() { now := time.Now() prevTime := now bytes := 0 delay := 0 for r.isRunning { switch { case r.ringBuffer.GetNoOfElements() < 2: delay++ time.Sleep(time.Duration(delay) * time.Millisecond) case delay > 10: delay -= 10 } if clip, err := r.ringBuffer.Read(); err == nil { r.Log(Debug, fmt.Sprintf("Delay is: %v\n", delay)) r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements())) switch r.config.Output { case File: fmt.Println("") r.outputFile.Write(clip) case HttpOut: bytes += len(clip) for err := r.sendClipToHTTP(clip, r.config.HttpAddress); err != nil; { r.Log(Error, err.Error()) r.Log(Warning, "Post failed trying again!") err = r.sendClipToHTTP(clip, r.config.HttpAddress) } default: r.Log(Error, "No output defined!") } if err := r.ringBuffer.DoneReading(); err != nil { r.Log(Error, err.Error()) } now = time.Now() deltaTime := now.Sub(prevTime) if deltaTime > time.Duration(bitrateTime)*time.Second { r.Log(Info, fmt.Sprintf("Bitrate: %v bits/s\n", int64(float64(bytes*8)/float64(deltaTime/1e9)))) r.Log(Info, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements())) prevTime = now bytes = 0 } } } } // sendClipToHTTP takes a clip and an output url and posts through http. func (r *revidInst) sendClipToHTTP(clip []byte, output string) error { timeout := time.Duration(httpTimeOut * time.Second) client := http.Client{ Timeout: timeout, } url := output + strconv.Itoa(len(clip)) r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip))) resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer if err != nil { return fmt.Errorf("Error posting to %s: %s", output, err) } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err == nil { r.Log(Debug, fmt.Sprintf("%s\n", body)) } else { r.Log(Error, err.Error()) } return nil }