/* NAME revid.go DESCRIPTION See Readme.md AUTHORS Saxon A. Nelson-Milton Alan Noble LICENSE revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ // revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. package revid import ( "bufio" "errors" "fmt" "io" "os" "os/exec" "path/filepath" "strconv" "time" "bitbucket.org/ausocean/av/generator" "bitbucket.org/ausocean/av/parser" "bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/utils/ring" ) // Misc constants const ( clipDuration = 1 * time.Second mp2tPacketSize = 188 // MPEG-TS packet size mp2tMaxPackets = int(clipDuration * 2016 / time.Second) // # first multiple of 7 and 8 greater than 2000 ringBufferSize = 500 ringBufferElementSize = 150000 writeTimeout = 10 * time.Millisecond readTimeout = 10 * time.Millisecond httpTimeout = 5 * time.Second packetsPerFrame = 7 bitrateTime = 1 * time.Minute mjpegParserInChanLen = 100000 ffmpegPath = "/usr/local/bin/ffmpeg" rtmpConnectionTimout = 10 outputChanSize = 1000 cameraRetryPeriod = 5 * time.Second sendFailedDelay = 5 maxSendFailedErrorCount = 500 clipSizeThreshold = 11 rtmpConnectionMaxTries = 5 raspividNoOfTries = 3 sendingWaitTime = 5 * time.Millisecond ) // Log Types const ( Error = "Error" Warning = "Warning" Info = "Info" Debug = "Debug" Detail = "Detail" ) // Revid provides methods to control a revid session; providing methods // to start, stop and change the state of an instance using the Config struct. type Revid struct { ffmpegPath string tempDir string ringBuffer *ring.Buffer config Config isRunning bool inputFile *os.File generator generator.Generator parser parser.Parser cmd *exec.Cmd inputReader *bufio.Reader ffmpegStdin io.WriteCloser outputChan chan []byte setupInput func() error getFrame func() []byte destination loadSender rtmpInst rtmp.Session bitrate int } // NewRevid returns a pointer to a new Revid with the desired // configuration, and/or an error if construction of the new instant was not // successful. func New(c Config) (*Revid, error) { var r Revid err := r.reset(c) if err != nil { return nil, err } r.ringBuffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout) r.outputChan = make(chan []byte, outputChanSize) return &r, nil } // Bitrate returns the result of the most recent bitrate check. func (r *Revid) Bitrate() int { return r.bitrate } // Config returns the Revid's config. func (r *Revid) Config() *Config { // FIXME(kortschak): This is a massive footgun and should not exist. // Since the config's fields are accessed in running goroutines, any // mutation is a data race. With bad luck a data race is possible by // reading the returned value since it is possible for the running // Ravid to mutate the config it holds. return &r.config } // reset swaps the current config of a Revid with the passed // configuration; checking validity and returning errors if not valid. func (r *Revid) reset(config Config) error { r.config.Logger = config.Logger err := config.Validate(r) if err != nil { return errors.New("Config struct is bad!: " + err.Error()) } r.config = config if r.destination != nil { err = r.destination.close() if err != nil { r.Log(Error, err.Error()) } } switch r.config.Output { case File: s, err := newFileSender(config.OutputFileName) if err != nil { return err } r.destination = s case FfmpegRtmp: s, err := newFfmpegSender(config.RtmpUrl, r.config.FrameRate) if err != nil { return err } r.destination = s case NativeRtmp: s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimout, rtmpConnectionMaxTries, r.Log) if err != nil { return err } r.destination = s case Http: r.destination = newHttpSender(config.RtmpUrl, httpTimeout, r.Log) } switch r.config.Input { case Raspivid: r.setupInput = r.setupInputForRaspivid case File: r.setupInput = r.setupInputForFile } switch r.config.InputCodec { case H264: r.Log(Info, "Using H264 parser!") r.parser = parser.NewH264Parser() case Mjpeg: r.Log(Info, "Using MJPEG parser!") r.parser = parser.NewMJPEGParser(mjpegParserInChanLen) } switch r.config.Packetization { case None: // no packetisation - Revid output chan grabs raw data straight from parser r.parser.SetOutputChan(r.outputChan) r.getFrame = r.getFrameNoPacketization return nil case Mpegts: r.Log(Info, "Using MPEGTS packetisation!") frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate) r.generator = generator.NewTsGenerator(uint(frameRateAsInt)) case Flv: r.Log(Info, "Using FLV packetisation!") frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate) r.generator = generator.NewFlvGenerator(true, true, uint(frameRateAsInt)) } // We have packetization of some sort, so we want to send data to Generator // to perform packetization r.getFrame = r.getFramePacketization r.parser.SetOutputChan(r.generator.InputChan()) return nil } // SetConfig changes the current configuration of the receiver. func (r *Revid) SetConfig(c Config) error { // FIXME(kortschak): This is reimplemented in cmd/revid-cli/main.go. // The implementation in the command is used and this is not. // Decide on one or the other. r.Stop() r, err := New(c) if err != nil { return err } r.Start() return nil } // Log takes a logtype and message and tries to send this information to the // logger provided in the revid config - if there is one, otherwise the message // is sent to stdout func (r *Revid) Log(logType, m string) { if r.config.Verbosity != Yes { return } if r.config.Logger != nil { r.config.Logger.Log("revid", logType, m) return } fmt.Println(logType + ": " + m) } // IsRunning returns whether the receiver is running. func (r *Revid) IsRunning() bool { return r.isRunning } // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. func (r *Revid) Start() { if r.isRunning { r.Log(Warning, "Revid.Start() called but revid already running!") return } r.Log(Info, "Starting Revid!") r.Log(Debug, "Setting up output!") r.isRunning = true r.Log(Info, "Starting output routine!") go r.outputClips() r.Log(Info, "Starting clip packing routine!") go r.packClips() r.Log(Info, "Starting packetisation generator!") r.generator.Start() r.Log(Info, "Starting parser!") r.parser.Start() r.Log(Info, "Setting up input and receiving content!") go r.setupInput() } // Stop halts any processing of video data from a camera or file func (r *Revid) Stop() { if !r.isRunning { r.Log(Warning, "Revid.Stop() called but revid not running!") return } r.Log(Info, "Stopping revid!") r.isRunning = false r.Log(Info, "Stopping generator!") if r.generator != nil { r.generator.Stop() } r.Log(Info, "Stopping parser!") if r.parser != nil { r.parser.Stop() } r.Log(Info, "Killing input proccess!") // If a cmd process is running, we kill! if r.cmd != nil && r.cmd.Process != nil { r.cmd.Process.Kill() } } // getFrameNoPacketization gets a frame directly from the revid output chan // as we don't need to go through the generator with no packetization settings func (r *Revid) getFrameNoPacketization() []byte { return <-r.outputChan } // getFramePacketization gets a frame from the generators output chan - the // the generator being an mpegts or flv generator depending on the config func (r *Revid) getFramePacketization() []byte { return <-r.generator.OutputChan() } // packClips takes data segments; whether that be tsPackets or mjpeg frames and // packs them into clips consisting of the amount frames specified in the config func (r *Revid) packClips() { clipSize := 0 packetCount := 0 for r.isRunning { select { // TODO: This is temporary, need to work out how to make this work // for cases when there is not packetisation. case frame := <-r.generator.OutputChan(): lenOfFrame := len(frame) if lenOfFrame > ringBufferElementSize { r.Log(Warning, fmt.Sprintf("Frame was too big: %v bytes, getting another one!", lenOfFrame)) frame = r.getFrame() lenOfFrame = len(frame) } _, err := r.ringBuffer.Write(frame) if err != nil { r.Log(Error, err.Error()) if err == ring.ErrDropped { r.Log(Warning, fmt.Sprintf("dropped %d byte frame", len(frame))) } } packetCount++ clipSize += lenOfFrame fpcAsInt, err := strconv.Atoi(r.config.FramesPerClip) if err != nil { r.Log(Error, "Frames per clip not quite right! Defaulting to 1!") r.config.FramesPerClip = "1" } if packetCount >= fpcAsInt { r.ringBuffer.Flush() clipSize = 0 packetCount = 0 continue } default: time.Sleep(time.Duration(5) * time.Millisecond) } } } // outputClips takes the clips produced in the packClips method and outputs them // to the desired output defined in the revid config func (r *Revid) outputClips() { now := time.Now() prevTime := now bytes := 0 delay := 0 for r.isRunning { // Here we slow things down as much as we can to decrease cpu usage switch { case r.ringBuffer.Len() < 2: delay++ time.Sleep(time.Duration(delay) * time.Millisecond) case delay > 0: delay-- } // If the ringbuffer has something we can read and send off chunk, err := r.ringBuffer.Next(readTimeout) if err != nil || !r.isRunning { if err == io.EOF { break } continue } bytes += chunk.Len() r.Log(Detail, "About to send") err = r.destination.load(chunk) if err != nil { r.Log(Error, "failed to load clip") } err = r.destination.send() if err == nil { r.Log(Detail, "sent clip") } if r.isRunning && err != nil && chunk.Len() > 11 { r.Log(Debug, "Send failed! Trying again") // Try and send again err = r.destination.send() r.Log(Error, err.Error()) // if there's still an error we try and reconnect, unless we're stopping for r.isRunning && err != nil { r.Log(Debug, "Send failed a again! Trying to reconnect...") time.Sleep(time.Duration(sendFailedDelay) * time.Millisecond) r.Log(Error, err.Error()) if rs, ok := r.destination.(restarter); ok { r.Log(Debug, fmt.Sprintf("restarting %T session", rs)) err = rs.restart() if err != nil { // TODO(kortschak): Make this "Fatal" when that exists. r.Log(Error, "failed to restart rtmp session") r.isRunning = false return } r.Log(Info, "restarted rtmp session") } r.Log(Debug, "Trying to send again with new connection...") err = r.destination.send() if err != nil { r.Log(Error, err.Error()) } } } r.destination.release() r.Log(Detail, "Done reading that clip from ringbuffer...") // Log some information regarding bitrate and ring buffer size if it's time now = time.Now() deltaTime := now.Sub(prevTime) if deltaTime > bitrateTime { // FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate. r.bitrate = int(float64(bytes*8) / float64(deltaTime/time.Second)) r.Log(Debug, fmt.Sprintf("Bitrate: %v bits/s\n", r.bitrate)) r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.Len())) prevTime = now bytes = 0 } } r.Log(Info, "Not outputting clips anymore!") err := r.destination.close() if err != nil { r.Log(Error, "failed to close destination") } } // setupInputForRaspivid sets up things for input from raspivid i.e. starts // a raspivid process and pipes it's data output. func (r *Revid) setupInputForRaspivid() error { r.Log(Info, "Starting raspivid!") switch r.config.InputCodec { case H264: arguments := []string{"-cd", "H264", "-o", "-", "-n", "-t", r.config.Timeout, "-b", r.config.Bitrate, "-w", r.config.Width, "-h", r.config.Height, "-fps", r.config.FrameRate, "-ih", "-g", r.config.IntraRefreshPeriod, } if r.config.QuantizationMode == QuantizationOn { arguments = append(arguments, "-qp") arguments = append(arguments, r.config.Quantization) } if r.config.HorizontalFlip == Yes { arguments = append(arguments, "-hf") } if r.config.VerticalFlip == Yes { arguments = append(arguments, "-vf") } name := "raspivid" r.cmd = &exec.Cmd{ Path: name, Args: append([]string{name}, arguments...), } r.Log(Info, fmt.Sprintf("Startin raspivid with args: %v", r.cmd.Args)) if filepath.Base(name) == name { if lp, err := exec.LookPath(name); err != nil { r.Log(Error, err.Error()) return err } else { r.cmd.Path = lp } } case Mjpeg: r.cmd = exec.Command("raspivid", "-cd", "MJPEG", "-o", "-", "-n", "-t", r.config.Timeout, "-fps", r.config.FrameRate, ) } stdout, _ := r.cmd.StdoutPipe() go r.cmd.Run() r.inputReader = bufio.NewReader(stdout) go r.readCamera() return nil } // setupInputForFile sets things up for getting input from a file func (r *Revid) setupInputForFile() error { fps, _ := strconv.Atoi(r.config.FrameRate) r.parser.SetDelay(uint(float64(1000) / float64(fps))) r.readFile() return nil } // readCamera reads data from the defined camera while the Revid is running. // TODO: use ringbuffer here instead of allocating mem every time! func (r *Revid) readCamera() { r.Log(Info, "Reading camera data!") for r.isRunning { data := make([]byte, 1) _, err := io.ReadFull(r.inputReader, data) switch { // We know this means we're getting nothing from the cam case (err != nil && err.Error() == "EOF" && r.isRunning) || (err != nil && r.isRunning): r.Log(Error, "No data from camera!") time.Sleep(cameraRetryPeriod) default: r.parser.InputChan() <- data[0] } } r.Log(Info, "Not trying to read from camera anymore!") } // readFile reads data from the defined file while the Revid is running. func (r *Revid) readFile() error { var err error r.inputFile, err = os.Open(r.config.InputFileName) if err != nil { r.Log(Error, err.Error()) r.Stop() return err } stats, err := r.inputFile.Stat() if err != nil { r.Log(Error, "Could not get input file stats!") r.Stop() return err } data := make([]byte, stats.Size()) _, err = r.inputFile.Read(data) if err != nil { r.Log(Error, err.Error()) r.Stop() return err } for i := range data { r.parser.InputChan() <- data[i] } r.inputFile.Close() return nil }