/* NAME revid.go DESCRIPTION See Readme.md AUTHORS Saxon A. Nelson-Milton Alan Noble LICENSE revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ // revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. package revid import ( "errors" "fmt" "io" "os" "os/exec" "strconv" "time" "bitbucket.org/ausocean/av/encoding" "bitbucket.org/ausocean/av/encoding/flv" "bitbucket.org/ausocean/av/encoding/mts" "bitbucket.org/ausocean/av/parse" "bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/ring" ) // Misc constants const ( clipDuration = 1 * time.Second mp2tPacketSize = 188 // MPEG-TS packet size mp2tMaxPackets = int(clipDuration * 2016 / time.Second) // # first multiple of 7 and 8 greater than 2000 ringBufferSize = 500 ringBufferElementSize = 150000 writeTimeout = 10 * time.Millisecond readTimeout = 10 * time.Millisecond httpTimeout = 5 * time.Second packetsPerFrame = 7 bitrateTime = 1 * time.Minute mjpegParserInChanLen = 100000 ffmpegPath = "/usr/local/bin/ffmpeg" rtmpConnectionTimout = 10 outputChanSize = 1000 cameraRetryPeriod = 5 * time.Second sendFailedDelay = 5 maxSendFailedErrorCount = 500 clipSizeThreshold = 11 rtmpConnectionMaxTries = 5 raspividNoOfTries = 3 sendingWaitTime = 5 * time.Millisecond runContinuously = "0" // -t arg to raspivid ) // Log Types const ( Error = "Error" Warning = "Warning" Info = "Info" Debug = "Debug" Detail = "Detail" ) // Revid provides methods to control a revid session; providing methods // to start, stop and change the state of an instance using the Config struct. type Revid struct { ffmpegPath string tempDir string ringBuffer *ring.Buffer config Config isRunning bool encoder encoding.Encoder parse func(dst io.Writer, src io.Reader, delay time.Duration) error cmd *exec.Cmd inputReader io.ReadCloser ffmpegStdin io.WriteCloser outputChan chan []byte setupInput func() error getFrame func() []byte destination loadSender rtmpInst rtmp.Session bitrate int ns *netsender.Sender } // New returns a pointer to a new Revid with the desired configuration, and/or // an error if construction of the new instance was not successful. func New(c Config, ns *netsender.Sender) (*Revid, error) { var r Revid err := r.reset(c) if err != nil { return nil, err } r.ringBuffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout) r.outputChan = make(chan []byte, outputChanSize) r.ns = ns return &r, nil } // Bitrate returns the result of the most recent bitrate check. func (r *Revid) Bitrate() int { return r.bitrate } // Config returns the Revid's config. func (r *Revid) Config() *Config { // FIXME(kortschak): This is a massive footgun and should not exist. // Since the config's fields are accessed in running goroutines, any // mutation is a data race. With bad luck a data race is possible by // reading the returned value since it is possible for the running // Ravid to mutate the config it holds. return &r.config } // reset swaps the current config of a Revid with the passed // configuration; checking validity and returning errors if not valid. func (r *Revid) reset(config Config) error { r.config.Logger = config.Logger err := config.Validate(r) if err != nil { return errors.New("Config struct is bad!: " + err.Error()) } r.config = config if r.destination != nil { err = r.destination.close() if err != nil { r.Log(Error, err.Error()) } } switch r.config.Output { case File: s, err := newFileSender(config.OutputFileName) if err != nil { return err } r.destination = s case FfmpegRtmp: s, err := newFfmpegSender(config.RtmpUrl, r.config.FrameRate) if err != nil { return err } r.destination = s case Rtmp: s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimout, rtmpConnectionMaxTries, r.Log) if err != nil { return err } r.destination = s case Http: r.destination = newHttpSender(r.ns, r.Log) } switch r.config.Input { case Raspivid: r.setupInput = r.startRaspivid case File: r.setupInput = r.setupInputForFile } switch r.config.InputCodec { case H264: r.Log(Info, "Using H264 parser!") r.parse = parse.H264 case Mjpeg: r.Log(Info, "Using MJPEG parser!") r.parse = parse.MJPEG } switch r.config.Packetization { case None: // no packetisation - Revid output chan grabs raw data straight from parser r.parse = func(dst io.Writer, src io.Reader, _ time.Duration) error { // FIXME(kortschak): Reduce this allocation mess. It exists // because we do not know that the dst will not modify the // buffer. It shouldn't, but ... for { var b [4 << 10]byte n, rerr := src.Read(b[:]) _, werr := dst.Write(b[:n]) if rerr != nil { return rerr } if werr != nil { return werr } } } r.getFrame = r.getFrameNoPacketization return nil case Mpegts: r.Log(Info, "Using MPEGTS packetisation") frameRate, _ := strconv.ParseFloat(r.config.FrameRate, 64) r.encoder = mts.NewEncoder(frameRate) case Flv: r.Log(Info, "Using FLV packetisation") frameRate, _ := strconv.Atoi(r.config.FrameRate) r.encoder = flv.NewEncoder(true, true, frameRate) } // We have packetization of some sort, so we want to send data to Generator // to perform packetization r.getFrame = r.getFramePacketization return nil } // Log takes a logtype and message and tries to send this information to the // logger provided in the revid config - if there is one, otherwise the message // is sent to stdout func (r *Revid) Log(logType, m string) { if r.config.Verbosity != Yes { return } if r.config.Logger != nil { r.config.Logger.Log("revid", logType, m) return } fmt.Println(logType + ": " + m) } // IsRunning returns whether the receiver is running. func (r *Revid) IsRunning() bool { return r.isRunning } // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. func (r *Revid) Start() { if r.isRunning { r.Log(Warning, "Revid.Start() called but revid already running!") return } r.Log(Info, "Starting Revid") r.Log(Debug, "Setting up output") r.isRunning = true r.Log(Info, "Starting output routine") go r.outputClips() r.Log(Info, "Starting clip packing routine") go r.packClips() r.Log(Info, "Setting up input and receiving content") go r.setupInput() } // Stop halts any processing of video data from a camera or file func (r *Revid) Stop() { if !r.isRunning { r.Log(Warning, "Revid.Stop() called but revid not running!") return } r.Log(Info, "Stopping revid!") r.isRunning = false r.Log(Info, "Killing input proccess!") // If a cmd process is running, we kill! if r.cmd != nil && r.cmd.Process != nil { r.cmd.Process.Kill() } } // getFrameNoPacketization gets a frame directly from the revid output chan // as we don't need to go through the encoder with no packetization settings func (r *Revid) getFrameNoPacketization() []byte { return <-r.outputChan } // getFramePacketization gets a frame from the generators output chan - the // the encoder being an mpegts or flv encoder depending on the config func (r *Revid) getFramePacketization() []byte { return <-r.encoder.Stream() } // TODO(kortschak): Factor this out to an io.Writer type and remove the Stream chans. // Also add a no-op encoder that handles non-packeted data. // // packClips takes data segments; whether that be tsPackets or mjpeg frames and // packs them into clips consisting of the amount frames specified in the config func (r *Revid) packClips() { clipSize := 0 packetCount := 0 for r.isRunning { select { // TODO: This is temporary, need to work out how to make this work // for cases when there is not packetisation. case frame := <-r.encoder.Stream(): lenOfFrame := len(frame) if lenOfFrame > ringBufferElementSize { r.Log(Warning, fmt.Sprintf("Frame was too big: %v bytes, getting another one!", lenOfFrame)) frame = r.getFrame() lenOfFrame = len(frame) } _, err := r.ringBuffer.Write(frame) if err != nil { if err == ring.ErrDropped { r.Log(Warning, fmt.Sprintf("dropped %d byte frame", len(frame))) } else { r.Log(Error, err.Error()) } } packetCount++ clipSize += lenOfFrame if packetCount >= r.config.FramesPerClip { r.ringBuffer.Flush() clipSize = 0 packetCount = 0 continue } default: time.Sleep(5 * time.Millisecond) } } } // outputClips takes the clips produced in the packClips method and outputs them // to the desired output defined in the revid config func (r *Revid) outputClips() { now := time.Now() prevTime := now bytes := 0 delay := 0 for r.isRunning { // Here we slow things down as much as we can to decrease cpu usage switch { case r.ringBuffer.Len() < 2: delay++ time.Sleep(time.Duration(delay) * time.Millisecond) case delay > 0: delay-- } // If the ringbuffer has something we can read and send off chunk, err := r.ringBuffer.Next(readTimeout) if err != nil || !r.isRunning { if err == io.EOF { break } continue } bytes += chunk.Len() r.Log(Detail, "About to send") err = r.destination.load(chunk) if err != nil { r.Log(Error, "failed to load clip") } err = r.destination.send() if err == nil { r.Log(Detail, "sent clip") } if r.isRunning && err != nil && chunk.Len() > 11 { r.Log(Debug, "Send failed! Trying again") // Try and send again err = r.destination.send() r.Log(Error, err.Error()) // if there's still an error we try and reconnect, unless we're stopping for r.isRunning && err != nil { r.Log(Debug, "Send failed a again! Trying to reconnect...") time.Sleep(time.Duration(sendFailedDelay) * time.Millisecond) r.Log(Error, err.Error()) if rs, ok := r.destination.(restarter); ok { r.Log(Debug, fmt.Sprintf("restarting %T session", rs)) err = rs.restart() if err != nil { // TODO(kortschak): Make this "Fatal" when that exists. r.Log(Error, "failed to restart rtmp session") r.isRunning = false return } r.Log(Info, "restarted rtmp session") } r.Log(Debug, "Trying to send again with new connection...") err = r.destination.send() if err != nil { r.Log(Error, err.Error()) } } } r.destination.release() r.Log(Detail, "Done reading that clip from ringbuffer...") // Log some information regarding bitrate and ring buffer size if it's time now = time.Now() deltaTime := now.Sub(prevTime) if deltaTime > bitrateTime { // FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate. r.bitrate = int(float64(bytes*8) / float64(deltaTime/time.Second)) r.Log(Debug, fmt.Sprintf("Bitrate: %v bits/s\n", r.bitrate)) r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.Len())) prevTime = now bytes = 0 } } r.Log(Info, "Not outputting clips anymore!") err := r.destination.close() if err != nil { r.Log(Error, "failed to close destination") } } // startRaspivid sets up things for input from raspivid i.e. starts // a raspivid process and pipes it's data output. func (r *Revid) startRaspivid() error { r.Log(Info, "Starting raspivid!") switch r.config.InputCodec { case H264: args := []string{ "-cd", "H264", "-o", "-", "-n", "-t", runContinuously, "-b", r.config.Bitrate, "-w", r.config.Width, "-h", r.config.Height, "-fps", r.config.FrameRate, "-ih", "-g", r.config.IntraRefreshPeriod, } if r.config.QuantizationMode == QuantizationOn { args = append(args, "-qp", r.config.Quantization) } if r.config.HorizontalFlip == Yes { args = append(args, "-hf") } if r.config.VerticalFlip == Yes { args = append(args, "-vf") } r.Log(Info, fmt.Sprintf("Starting raspivid with args: %v", args)) r.cmd = exec.Command("raspivid", args...) case Mjpeg: r.cmd = exec.Command("raspivid", "-cd", "MJPEG", "-o", "-", "-n", "-t", runContinuously, "-fps", r.config.FrameRate, ) } stdout, err := r.cmd.StdoutPipe() if err != nil { return err } err = r.cmd.Start() if err != nil { return err } r.inputReader = stdout go func() { r.Log(Info, "Reading camera data!") r.parse(chunkWriter{r.encoder}, r.inputReader, 0) r.Log(Info, "Not trying to read from camera anymore!") }() return nil } // setupInputForFile sets things up for getting input from a file func (r *Revid) setupInputForFile() error { fps, err := strconv.Atoi(r.config.FrameRate) if err != nil { return err } delay := time.Second / time.Duration(fps) f, err := os.Open(r.config.InputFileName) if err != nil { r.Log(Error, err.Error()) r.Stop() return err } defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. return r.parse(chunkWriter{r.encoder}, f, delay) } // TODO(kortschak): Remove this type and revise the signature of // the parsers to accept an encoding.Encoder. // // chunkWriter is a shim between the new function-based approach // and the old flow-based approach. type chunkWriter struct { encoding.Encoder } func (w chunkWriter) Write(b []byte) (int, error) { err := w.Encoder.Encode(b) return len(b), err }