/* NAME revid.go DESCRIPTION See Readme.md AUTHORS Saxon A. Nelson-Milton Alan Noble LICENSE revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ // revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. package revid import ( "errors" "fmt" "io" "os" "os/exec" "strconv" "strings" "sync" "time" "bitbucket.org/ausocean/av/stream" "bitbucket.org/ausocean/av/stream/flv" "bitbucket.org/ausocean/av/stream/lex" "bitbucket.org/ausocean/av/stream/mts" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" ) // Ring buffer sizes and read/write timeouts. const ( ringBufferSize = 1000 ringBufferElementSize = 150000 writeTimeout = 10 * time.Millisecond readTimeout = 10 * time.Millisecond ) // RTMP connection properties. const ( rtmpConnectionMaxTries = 5 rtmpConnectionTimeout = 10 ) // Duration of video for each clip sent out. const clipDuration = 1 * time.Second // Time duration between bitrate checks. const bitrateTime = 1 * time.Minute // After a send fail, this is the delay before another send. const sendFailedDelay = 5 * time.Millisecond const ffmpegPath = "/usr/local/bin/ffmpeg" const pkg = "revid:" type Logger interface { SetLevel(int8) Log(level int8, message string, params ...interface{}) } // Revid provides methods to control a revid session; providing methods // to start, stop and change the state of an instance using the Config struct. type Revid struct { // config holds the Revid configuration. // For historical reasons it also handles logging. // FIXME(kortschak): The relationship of concerns // in config/ns is weird. config Config // ns holds the netsender.Sender responsible for HTTP. ns *netsender.Sender // setupInput holds the current approach to setting up // the input stream. setupInput func() error // cmd is the exec'd process that may be used to produce // the input stream. // FIXME(kortschak): This should not exist. Replace this // with a context.Context cancellation. cmd *exec.Cmd // lexTo, encoder and packer handle transcoding the input stream. lexTo func(dst stream.Encoder, src io.Reader, delay time.Duration) error encoder stream.Encoder packer packer // buffer handles passing frames from the transcoder // to the target destination. buffer *ring.Buffer // destination is the target endpoint. destination []loadSender // rtpSender is an unbuffered sender. // It is used to isolate RTP from ring buffer-induced delays. rtpSender *rtpSender // bitrate hold the last send bitrate calculation result. bitrate int mu sync.Mutex isRunning bool wg sync.WaitGroup err chan error } // packer takes data segments and packs them into clips // of the number frames specified in the owners config. type packer struct { owner *Revid lastTime time.Time packetCount uint } // Write implements the io.Writer interface. // // Unless the ring buffer returns an error, all writes // are deemed to be successful, although a successful // write may include a dropped frame. func (p *packer) Write(frame []byte) (int, error) { if len(frame) > ringBufferElementSize { p.owner.config.Logger.Log(logger.Warning, pkg+"frame was too big", "frame size", len(frame)) return len(frame), nil } if len(p.owner.destination) != 0 { n, err := p.owner.buffer.Write(frame) if err != nil { if err == ring.ErrDropped { p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame)) return len(frame), nil } p.owner.config.Logger.Log(logger.Error, pkg+"unexpected ring buffer write error", "error", err.Error()) return n, err } } // If we have an rtp sender bypass ringbuffer and give straight to sender if p.owner.rtpSender != nil { err := p.owner.rtpSender.send(frame) if err != nil { p.owner.config.Logger.Log(logger.Error, pkg+"rtp send failed with error", "error", err.Error()) } } p.packetCount++ var hasRtmp bool for _, d := range p.owner.config.Outputs { if d == Rtmp { hasRtmp = true break } } now := time.Now() if hasRtmp || (now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) { p.owner.buffer.Flush() p.packetCount = 0 p.lastTime = now } return len(frame), nil } // New returns a pointer to a new Revid with the desired configuration, and/or // an error if construction of the new instance was not successful. func New(c Config, ns *netsender.Sender) (*Revid, error) { r := Revid{ns: ns, err: make(chan error)} r.packer.owner = &r err := r.reset(c) if err != nil { return nil, err } go r.handleErrors() return &r, nil } // TODO(Saxon): put more thought into error severity. func (r *Revid) handleErrors() { for { err := <-r.err if err != nil { r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error()) err = r.Stop() if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error()) } err = r.Start() if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error()) } } } } // Bitrate returns the result of the most recent bitrate check. func (r *Revid) Bitrate() int { return r.bitrate } // reset swaps the current config of a Revid with the passed // configuration; checking validity and returning errors if not valid. func (r *Revid) reset(config Config) error { r.config.Logger = config.Logger err := config.Validate(r) if err != nil { return errors.New("Config struct is bad: " + err.Error()) } r.config = config r.buffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout) r.destination = r.destination[:0] for _, typ := range r.config.Outputs { switch typ { case File: s, err := newFileSender(config.OutputPath) if err != nil { return err } r.destination = append(r.destination, s) case FfmpegRtmp: s, err := newFfmpegSender(config.RtmpUrl, fmt.Sprint(r.config.FrameRate)) if err != nil { return err } r.destination = append(r.destination, s) case Rtmp: s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) if err != nil { return err } r.destination = append(r.destination, s) case Http: r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log)) case Udp: s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) if err != nil { return err } r.destination = append(r.destination, s) case Rtp: r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) if err != nil { return err } } } switch r.config.Input { case Raspivid: r.setupInput = r.startRaspivid case V4L: r.setupInput = r.startV4L case File: r.setupInput = r.setupInputForFile } switch r.config.InputCodec { case H264: r.config.Logger.Log(logger.Info, pkg+"using H264 lexer") r.lexTo = lex.H264 case Mjpeg: r.config.Logger.Log(logger.Info, pkg+"using MJPEG lexer") r.lexTo = lex.MJPEG } switch r.config.Packetization { case None: // no packetisation - Revid output chan grabs raw data straight from parser r.lexTo = func(dst stream.Encoder, src io.Reader, _ time.Duration) error { for { var b [4 << 10]byte n, rerr := src.Read(b[:]) werr := dst.Encode(b[:n]) if rerr != nil { return rerr } if werr != nil { return werr } } } r.encoder = stream.NopEncoder(&r.packer) case Mpegts: r.config.Logger.Log(logger.Info, pkg+"using MPEGTS packetisation") r.encoder = mts.NewEncoder(&r.packer, float64(r.config.FrameRate)) case Flv: r.config.Logger.Log(logger.Info, pkg+"using FLV packetisation") r.encoder, err = flv.NewEncoder(&r.packer, true, true, int(r.config.FrameRate)) if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"failed to open FLV encoder", err.Error()) } } return nil } // IsRunning returns true if revid is running. func (r *Revid) IsRunning() bool { r.mu.Lock() ret := r.isRunning r.mu.Unlock() return ret } func (r *Revid) Config() Config { r.mu.Lock() cfg := r.config r.mu.Unlock() return cfg } // setIsRunning sets r.isRunning using b. func (r *Revid) setIsRunning(b bool) { r.mu.Lock() r.isRunning = b r.mu.Unlock() } // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. func (r *Revid) Start() error { if r.IsRunning() { return errors.New(pkg + "start called but revid is already running") } r.config.Logger.Log(logger.Info, pkg+"starting Revid") // TODO: this doesn't need to be here r.config.Logger.Log(logger.Debug, pkg+"setting up output") r.setIsRunning(true) r.config.Logger.Log(logger.Info, pkg+"starting output routine") r.wg.Add(1) go r.outputClips() r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") err := r.setupInput() return err } // Stop halts any processing of video data from a camera or file func (r *Revid) Stop() error { if !r.IsRunning() { return errors.New(pkg + "stop called but revid is already stopped") } r.config.Logger.Log(logger.Info, pkg+"stopping revid") r.setIsRunning(false) r.config.Logger.Log(logger.Info, pkg+"killing input proccess") // If a cmd process is running, we kill! if r.cmd != nil && r.cmd.Process != nil { r.cmd.Process.Kill() } r.wg.Wait() return nil } func (r *Revid) Update(vars map[string]string) error { if r.IsRunning() { if err := r.Stop(); err != nil { return err } } //look through the vars and update revid where needed for key, value := range vars { switch key { case "Output": r.config.Outputs = make([]uint8, 1) // FIXME(kortschak): There can be only one! // How do we specify outputs after the first? // // Maybe we shouldn't be doing this! switch value { case "File": r.config.Outputs[0] = File case "Http": r.config.Outputs[0] = Http case "Rtmp": r.config.Outputs[0] = Rtmp case "FfmpegRtmp": r.config.Outputs[0] = FfmpegRtmp default: r.config.Logger.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) continue } case "Packetization": switch value { case "Mpegts": r.config.Packetization = Mpegts case "Flv": r.config.Packetization = Flv default: r.config.Logger.Log(logger.Warning, pkg+"invalid packetization param", "value", value) continue } case "FramesPerClip": f, err := strconv.ParseUint(value, 10, 0) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) break } r.config.FramesPerClip = uint(f) case "RtmpUrl": r.config.RtmpUrl = value case "Bitrate": v, err := strconv.ParseUint(value, 10, 0) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } r.config.Bitrate = uint(v) case "OutputPath": r.config.OutputPath = value case "InputPath": r.config.InputPath = value case "Height": h, err := strconv.ParseUint(value, 10, 0) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"invalid height param", "value", value) break } r.config.Height = uint(h) case "Width": w, err := strconv.ParseUint(value, 10, 0) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"invalid width param", "value", value) break } r.config.Width = uint(w) case "FrameRate": v, err := strconv.ParseUint(value, 10, 0) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } r.config.FrameRate = uint(v) case "HttpAddress": r.config.HttpAddress = value case "Quantization": q, err := strconv.ParseUint(value, 10, 0) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"invalid quantization param", "value", value) break } r.config.Quantization = uint(q) case "IntraRefreshPeriod": p, err := strconv.ParseUint(value, 10, 0) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) break } r.config.IntraRefreshPeriod = uint(p) case "HorizontalFlip": switch strings.ToLower(value) { case "true": r.config.FlipHorizontal = true case "false": r.config.FlipHorizontal = false default: r.config.Logger.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) } case "VerticalFlip": switch strings.ToLower(value) { case "true": r.config.FlipVertical = true case "false": r.config.FlipVertical = false default: r.config.Logger.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) } case "BurstPeriod": v, err := strconv.ParseUint(value, 10, 0) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"invalid BurstPeriod param", "value", value) break } r.config.BurstPeriod = uint(v) } } r.config.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.config)) return r.reset(r.config) } // outputClips takes the clips produced in the packClips method and outputs them // to the desired output defined in the revid config func (r *Revid) outputClips() { defer r.wg.Done() lastTime := time.Now() var count int loop: for r.IsRunning() { // If the ring buffer has something we can read and send off chunk, err := r.buffer.Next(readTimeout) switch err { case nil: // Do nothing. case ring.ErrTimeout: r.config.Logger.Log(logger.Debug, pkg+"ring buffer read timeout") continue default: r.config.Logger.Log(logger.Error, pkg+"unexpected error", "error", err.Error()) fallthrough case io.EOF: break loop } count += chunk.Len() r.config.Logger.Log(logger.Debug, pkg+"about to send") for i, dest := range r.destination { err = dest.load(chunk) if err != nil { r.config.Logger.Log(logger.Error, pkg+"failed to load clip to output"+strconv.Itoa(i)) } } for i, dest := range r.destination { err = dest.send() if err == nil { r.config.Logger.Log(logger.Debug, pkg+"sent clip to output "+strconv.Itoa(i)) } else if r.config.SendRetry == false { r.config.Logger.Log(logger.Warning, pkg+"send to output "+strconv.Itoa(i)+" failed", "error", err.Error()) } else { r.config.Logger.Log(logger.Error, pkg+"send to output "+strconv.Itoa(i)+ " failed, trying again", "error", err.Error()) err = dest.send() if err != nil && chunk.Len() > 11 { r.config.Logger.Log(logger.Error, pkg+"second send attempted failed, restarting connection", "error", err.Error()) for err != nil { time.Sleep(sendFailedDelay) if rs, ok := dest.(restarter); ok { r.config.Logger.Log(logger.Debug, pkg+"restarting session", "session", rs) err = rs.restart() if err != nil { r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error()) r.setIsRunning(false) return } r.config.Logger.Log(logger.Info, pkg+"restarted rtmp session") } err = dest.send() if err != nil { r.config.Logger.Log(logger.Error, pkg+"send failed again, with error", "error", err.Error()) } } } } } // Release the chunk back to the ring buffer for use for _, dest := range r.destination { dest.release() } r.config.Logger.Log(logger.Debug, pkg+"done reading that clip from ring buffer") // Log some information regarding bitrate and ring buffer size if it's time now := time.Now() deltaTime := now.Sub(lastTime) if deltaTime > bitrateTime { // FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate. r.bitrate = int(float64(count*8) / float64(deltaTime/time.Second)) r.config.Logger.Log(logger.Debug, pkg+"bitrate (bits/s)", "bitrate", r.bitrate) r.config.Logger.Log(logger.Debug, pkg+"ring buffer size", "value", r.buffer.Len()) lastTime = now count = 0 } } r.config.Logger.Log(logger.Info, pkg+"not outputting clips anymore") for i, dest := range r.destination { err := dest.close() if err != nil { r.config.Logger.Log(logger.Error, pkg+"failed to close output"+strconv.Itoa(i)+" destination", "error", err.Error()) } } } // startRaspivid sets up things for input from raspivid i.e. starts // a raspivid process and pipes it's data output. func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Info, pkg+"starting raspivid") const disabled = "0" args := []string{ "--output", "-", "--nopreview", "--timeout", disabled, "--width", fmt.Sprint(r.config.Width), "--height", fmt.Sprint(r.config.Height), "--bitrate", fmt.Sprint(r.config.Bitrate), "--framerate", fmt.Sprint(r.config.FrameRate), } if r.config.FlipHorizontal { args = append(args, "--hflip") } if r.config.FlipVertical { args = append(args, "--vflip") } switch r.config.InputCodec { default: return fmt.Errorf("revid: invalid input codec: %v", r.config.InputCodec) case H264: args = append(args, "--codec", "H264", "--inline", "--intra", fmt.Sprint(r.config.IntraRefreshPeriod), ) if r.config.Quantize { args = append(args, "-qp", fmt.Sprint(r.config.Quantization)) } case Mjpeg: args = append(args, "--codec", "MJPEG") } r.config.Logger.Log(logger.Info, pkg+"raspivid args", "raspividArgs", strings.Join(args, " ")) r.cmd = exec.Command("raspivid", args...) stdout, err := r.cmd.StdoutPipe() if err != nil { return err } err = r.cmd.Start() if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } r.wg.Add(1) go r.processFrom(stdout, 0) return nil } func (r *Revid) startV4L() error { const defaultVideo = "/dev/video0" r.config.Logger.Log(logger.Info, pkg+"starting webcam") if r.config.InputPath == "" { r.config.Logger.Log(logger.Info, pkg+"using default video device", "device", defaultVideo) r.config.InputPath = defaultVideo } args := []string{ "-i", r.config.InputPath, "-f", "h264", "-r", fmt.Sprint(r.config.FrameRate), } switch { case r.config.FlipHorizontal && r.config.FlipVertical: args = append(args, "-vf", "hflip,vflip") case r.config.FlipHorizontal: args = append(args, "-vf", "hflip") case r.config.FlipVertical: args = append(args, "-vf", "vflip") } args = append(args, "-b:v", fmt.Sprint(r.config.Bitrate), "-maxrate", fmt.Sprint(r.config.Bitrate), "-bufsize", fmt.Sprint(r.config.Bitrate/2), "-s", fmt.Sprintf("%dx%d", r.config.Width, r.config.Height), "-", ) r.config.Logger.Log(logger.Info, pkg+"ffmpeg args", "args", strings.Join(args, " ")) r.cmd = exec.Command("ffmpeg", args...) stdout, err := r.cmd.StdoutPipe() if err != nil { return err } err = r.cmd.Start() if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"cannot start webcam", "error", err.Error()) return err } r.wg.Add(1) go r.processFrom(stdout, time.Duration(0)) return nil } // setupInputForFile sets things up for getting input from a file func (r *Revid) setupInputForFile() error { f, err := os.Open(r.config.InputPath) if err != nil { r.config.Logger.Log(logger.Error, err.Error()) r.Stop() return err } defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. r.wg.Add(1) go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate)) return nil } func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") r.err <- r.lexTo(r.encoder, read, delay) r.config.Logger.Log(logger.Info, pkg+"finished reading input data") r.wg.Done() }