/* NAME revid.go AUTHORS Saxon A. Nelson-Milton Alan Noble Dan Kortschak Trek Hopton LICENSE revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License in gpl.txt. If not, see http://www.gnu.org/licenses. */ // Package revid provides an API for reading, transcoding, and writing audio/video streams and files. package revid import ( "errors" "fmt" "io" "net" "os" "os/exec" "strconv" "strings" "sync" "time" "bitbucket.org/ausocean/av/codec/codecutil" "bitbucket.org/ausocean/av/codec/h264" "bitbucket.org/ausocean/av/codec/h265" "bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/input/audio" "bitbucket.org/ausocean/av/protocol/rtcp" "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/av/protocol/rtsp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/logger" ) // mtsSender ringBuffer sizes. const ( rbSize = 1000 rbElementSize = 100000 ) // RTMP connection properties. const ( rtmpConnectionMaxTries = 5 rtmpConnectionTimeout = 10 ) const ( rtpPort = 60000 rtcpPort = 60001 defaultServerRTCPPort = 17301 ) const pkg = "revid:" type Logger interface { SetLevel(int8) Log(level int8, message string, params ...interface{}) } // Revid provides methods to control a revid session; providing methods // to start, stop and change the state of an instance using the Config struct. type Revid struct { // config holds the Revid configuration. // For historical reasons it also handles logging. // FIXME(kortschak): The relationship of concerns // in config/ns is weird. config Config // ns holds the netsender.Sender responsible for HTTP. ns *netsender.Sender // setupInput holds the current approach to setting up // the input stream. It returns a function used for cleaning up, and any errors. setupInput func() (func() error, error) // closeInput holds the cleanup function return from setupInput and is called // in Revid.Stop(). closeInput func() error // cmd is the exec'd process that may be used to produce // the input stream. // FIXME(kortschak): This should not exist. Replace this // with a context.Context cancellation. cmd *exec.Cmd // lexTo, encoder and packer handle transcoding the input stream. lexTo func(dest io.Writer, src io.Reader, delay time.Duration) error // encoders will hold the multiWriteCloser that writes to encoders from the lexer. encoders io.WriteCloser // isRunning is used to keep track of revid's running state between methods. isRunning bool // wg will be used to wait for any processing routines to finish. wg sync.WaitGroup // err will channel errors from revid routines to the handle errors routine. err chan error } // New returns a pointer to a new Revid with the desired configuration, and/or // an error if construction of the new instance was not successful. func New(c Config, ns *netsender.Sender) (*Revid, error) { r := Revid{ns: ns, err: make(chan error)} err := r.reset(c) if err != nil { return nil, err } go r.handleErrors() return &r, nil } // Config returns a copy of revids current config. // // Config is not safe for concurrent use. func (r *Revid) Config() Config { return r.config } // TODO(Saxon): put more thought into error severity. func (r *Revid) handleErrors() { for { err := <-r.err if err != nil { r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error()) r.Stop() err = r.Start() if err != nil { r.config.Logger.Log(logger.Error, pkg+"failed to restart revid", "error", err.Error()) } } } } // Bitrate returns the result of the most recent bitrate check. // // TODO: get this working again. func (r *Revid) Bitrate() int { return -1 } // reset swaps the current config of a Revid with the passed // configuration; checking validity and returning errors if not valid. It then // sets up the data pipeline accordingly to this configuration. func (r *Revid) reset(config Config) error { err := r.setConfig(config) if err != nil { return err } r.config.Logger.SetLevel(config.LogLevel) err = r.setupPipeline( func(dst io.WriteCloser, fps float64) (io.WriteCloser, error) { var st int switch r.config.Input { case Raspivid, File, V4L: st = mts.EncodeH264 case RTSP: st = mts.EncodeH265 case Audio: st = mts.EncodeAudio } e := mts.NewEncoder(dst, float64(fps), st) return e, nil }, func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { return flv.NewEncoder(dst, true, true, fps) }, ioext.MultiWriteCloser, ) if err != nil { return err } return nil } // setConfig takes a config, checks it's validity and then replaces the current // revid config. func (r *Revid) setConfig(config Config) error { r.config.Logger = config.Logger err := config.Validate(r) if err != nil { return errors.New("Config struct is bad: " + err.Error()) } r.config = config return nil } // setupPipeline constructs the revid dataPipeline. Inputs, encoders and // senders are created and linked based on the current revid config. // // mtsEnc and flvEnc will be called to obtain an mts encoder and flv encoder // respectively. multiWriter will be used to create an ioext.multiWriteCloser // so that encoders can write to multiple senders. func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.WriteCloser, error), flvEnc func(dst io.WriteCloser, rate int) (io.WriteCloser, error), multiWriter func(...io.WriteCloser) io.WriteCloser) error { // encoders will hold the encoders that are required for revid's current // configuration. var encoders []io.WriteCloser // mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders // will hold senders that require FLV encoding. var mtsSenders, flvSenders []io.WriteCloser // We will go through our outputs and create the corresponding senders to add // to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the // output requires FLV encoding. var w io.WriteCloser for _, out := range r.config.Outputs { switch out { case HTTP: w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, rbSize, rbElementSize, 0) mtsSenders = append(mtsSenders, w) case RTP: w, err := newRtpSender(r.config.RTPAddress, r.config.Logger.Log, r.config.FrameRate) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"rtp connect error", "error", err.Error()) } mtsSenders = append(mtsSenders, w) case File: w, err := newFileSender(r.config.OutputPath) if err != nil { return err } mtsSenders = append(mtsSenders, w) case RTMP: w, err := newRtmpSender(r.config.RTMPURL, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"rtmp connect error", "error", err.Error()) } flvSenders = append(flvSenders, w) } } // If we have some senders that require MPEGTS encoding then add an MPEGTS // encoder to revid's encoder slice, and give this encoder the mtsSenders // as a destination. if len(mtsSenders) != 0 { mw := multiWriter(mtsSenders...) e, _ := mtsEnc(mw, r.config.WriteRate) encoders = append(encoders, e) } // If we have some senders that require FLV encoding then add an FLV // encoder to revid's encoder slice, and give this encoder the flvSenders // as a destination. if len(flvSenders) != 0 { mw := multiWriter(flvSenders...) e, err := flvEnc(mw, int(r.config.FrameRate)) if err != nil { return err } encoders = append(encoders, e) } r.encoders = multiWriter(encoders...) switch r.config.Input { case Raspivid: r.setupInput = r.startRaspivid r.lexTo = h264.Lex case V4L: r.setupInput = r.startV4L r.lexTo = h264.Lex case File: r.setupInput = r.setupInputForFile case RTSP: r.setupInput = r.startRTSPCamera r.lexTo = h265.NewLexer(false).Lex case Audio: r.setupInput = r.startAudioDevice r.lexTo = codecutil.NewByteLexer(&r.config.ChunkSize).Lex } return nil } // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. // // Start is not safe for concurrent use. func (r *Revid) Start() error { if r.isRunning { r.config.Logger.Log(logger.Warning, pkg+"start called, but revid already running") return nil } r.config.Logger.Log(logger.Info, pkg+"starting Revid") r.isRunning = true var err error r.closeInput, err = r.setupInput() if err != nil { r.Stop() } return err } // Stop closes down the pipeline. This closes encoders and sender output routines, // connections, and/or files. // // Stop is not safe for concurrent use. func (r *Revid) Stop() { if !r.isRunning { r.config.Logger.Log(logger.Warning, pkg+"stop called but revid isn't running") return } if r.closeInput != nil { err := r.closeInput() if err != nil { r.config.Logger.Log(logger.Error, pkg+"could not close input", "error", err.Error()) } } r.config.Logger.Log(logger.Info, pkg+"closing pipeline") err := r.encoders.Close() if err != nil { r.config.Logger.Log(logger.Error, pkg+"failed to close pipeline", "error", err.Error()) } if r.cmd != nil && r.cmd.Process != nil { r.config.Logger.Log(logger.Info, pkg+"killing input proccess") r.cmd.Process.Kill() } r.wg.Wait() r.isRunning = false } // Update takes a map of variables and their values and edits the current config // if the variables are recognised as valid parameters. // // Update is not safe for concurrent use. func (r *Revid) Update(vars map[string]string) error { if r.isRunning { r.Stop() } //look through the vars and update revid where needed for key, value := range vars { switch key { case "Saturation": s, err := strconv.ParseInt(value, 10, 0) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"invalid saturation param", "value", value) } r.config.Saturation = int(s) case "Brightness": b, err := strconv.ParseUint(value, 10, 0) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"invalid brightness param", "value", value) } r.config.Brightness = uint(b) case "Exposure": r.config.Exposure = value case "AutoWhiteBalance": r.config.AutoWhiteBalance = value case "Output": outputs := strings.Split(value, ",") r.config.Outputs = make([]uint8, len(outputs)) for i, output := range outputs { switch output { case "File": r.config.Outputs[i] = File case "Http": r.config.Outputs[i] = HTTP case "Rtmp": r.config.Outputs[i] = RTMP case "Rtp": r.config.Outputs[i] = RTP default: r.config.Logger.Log(logger.Warning, pkg+"invalid output param", "value", value) continue } } case "RtmpUrl": r.config.RTMPURL = value case "RtpAddress": r.config.RTPAddress = value case "Bitrate": v, err := strconv.ParseUint(value, 10, 0) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } r.config.Bitrate = uint(v) case "OutputPath": r.config.OutputPath = value case "InputPath": r.config.InputPath = value case "Height": h, err := strconv.ParseUint(value, 10, 0) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"invalid height param", "value", value) break } r.config.Height = uint(h) case "Width": w, err := strconv.ParseUint(value, 10, 0) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"invalid width param", "value", value) break } r.config.Width = uint(w) case "FrameRate": v, err := strconv.ParseUint(value, 10, 0) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } r.config.FrameRate = uint(v) case "Rotation": v, err := strconv.ParseUint(value, 10, 0) if err != nil || v > 359 { r.config.Logger.Log(logger.Warning, pkg+"invalid rotation param", "value", value) break } r.config.Rotation = uint(v) case "HttpAddress": r.config.HTTPAddress = value case "Quantization": q, err := strconv.ParseUint(value, 10, 0) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"invalid quantization param", "value", value) break } r.config.Quantization = uint(q) case "IntraRefreshPeriod": p, err := strconv.ParseUint(value, 10, 0) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) break } r.config.IntraRefreshPeriod = uint(p) case "HorizontalFlip": switch strings.ToLower(value) { case "true": r.config.FlipHorizontal = true case "false": r.config.FlipHorizontal = false default: r.config.Logger.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) } case "VerticalFlip": switch strings.ToLower(value) { case "true": r.config.FlipVertical = true case "false": r.config.FlipVertical = false default: r.config.Logger.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) } case "BurstPeriod": v, err := strconv.ParseUint(value, 10, 0) if err != nil { r.config.Logger.Log(logger.Warning, pkg+"invalid BurstPeriod param", "value", value) break } r.config.BurstPeriod = uint(v) case "Logging": switch value { case "Debug": r.config.LogLevel = logger.Debug case "Info": r.config.LogLevel = logger.Info case "Warning": r.config.LogLevel = logger.Warning case "Error": r.config.LogLevel = logger.Error case "Fatal": r.config.LogLevel = logger.Fatal default: r.config.Logger.Log(logger.Warning, pkg+"invalid Logging param", "value", value) } } } r.config.Logger.Log(logger.Info, pkg+"revid config changed", "config", fmt.Sprintf("%+v", r.config)) return r.reset(r.config) } // startRaspivid sets up things for input from raspivid i.e. starts // a raspivid process and pipes it's data output. func (r *Revid) startRaspivid() (func() error, error) { r.config.Logger.Log(logger.Info, pkg+"starting raspivid") const disabled = "0" args := []string{ "--output", "-", "--nopreview", "--timeout", disabled, "--width", fmt.Sprint(r.config.Width), "--height", fmt.Sprint(r.config.Height), "--bitrate", fmt.Sprint(r.config.Bitrate), "--framerate", fmt.Sprint(r.config.FrameRate), "--rotation", fmt.Sprint(r.config.Rotation), "--brightness", fmt.Sprint(r.config.Brightness), "--saturation", fmt.Sprint(r.config.Saturation), "--exposure", fmt.Sprint(r.config.Exposure), "--awb", fmt.Sprint(r.config.AutoWhiteBalance), } if r.config.FlipHorizontal { args = append(args, "--hflip") } if r.config.FlipVertical { args = append(args, "--vflip") } if r.config.FlipHorizontal { args = append(args, "--hflip") } switch r.config.InputCodec { default: return nil, fmt.Errorf("revid: invalid input codec: %v", r.config.InputCodec) case codecutil.H264: args = append(args, "--codec", "H264", "--inline", "--intra", fmt.Sprint(r.config.IntraRefreshPeriod), ) if r.config.Quantize { args = append(args, "-qp", fmt.Sprint(r.config.Quantization)) } case codecutil.MJPEG: args = append(args, "--codec", "MJPEG") } r.config.Logger.Log(logger.Info, pkg+"raspivid args", "raspividArgs", strings.Join(args, " ")) r.cmd = exec.Command("raspivid", args...) stdout, err := r.cmd.StdoutPipe() if err != nil { return nil, err } err = r.cmd.Start() if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } r.wg.Add(1) go r.processFrom(stdout, 0) return nil, nil } func (r *Revid) startV4L() (func() error, error) { const defaultVideo = "/dev/video0" r.config.Logger.Log(logger.Info, pkg+"starting webcam") if r.config.InputPath == "" { r.config.Logger.Log(logger.Info, pkg+"using default video device", "device", defaultVideo) r.config.InputPath = defaultVideo } args := []string{ "-i", r.config.InputPath, "-f", "h264", "-r", fmt.Sprint(r.config.FrameRate), } args = append(args, "-b:v", fmt.Sprint(r.config.Bitrate), "-maxrate", fmt.Sprint(r.config.Bitrate), "-bufsize", fmt.Sprint(r.config.Bitrate/2), "-s", fmt.Sprintf("%dx%d", r.config.Width, r.config.Height), "-", ) r.config.Logger.Log(logger.Info, pkg+"ffmpeg args", "args", strings.Join(args, " ")) r.cmd = exec.Command("ffmpeg", args...) stdout, err := r.cmd.StdoutPipe() if err != nil { return nil, nil } err = r.cmd.Start() if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"cannot start webcam", "error", err.Error()) return nil, nil } r.wg.Add(1) go r.processFrom(stdout, time.Duration(0)) return nil, nil } // setupInputForFile sets things up for getting input from a file func (r *Revid) setupInputForFile() (func() error, error) { f, err := os.Open(r.config.InputPath) if err != nil { r.config.Logger.Log(logger.Error, err.Error()) r.Stop() return nil, err } // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. r.wg.Add(1) go r.processFrom(f, 0) return func() error { return f.Close() }, nil } // startAudioDevice is used to start capturing audio from an audio device and processing it. // It returns a function that can be used to stop the device and any errors that occur. func (r *Revid) startAudioDevice() (func() error, error) { // Create audio device. ac := &audio.Config{ SampleRate: r.config.SampleRate, Channels: r.config.Channels, RecPeriod: r.config.RecPeriod, BitDepth: r.config.BitDepth, Codec: r.config.InputCodec, } mts.Meta.Add("sampleRate", strconv.Itoa(r.config.SampleRate)) mts.Meta.Add("channels", strconv.Itoa(r.config.Channels)) mts.Meta.Add("period", fmt.Sprintf("%.6f", r.config.RecPeriod)) mts.Meta.Add("bitDepth", strconv.Itoa(r.config.BitDepth)) switch r.config.InputCodec { case codecutil.PCM: mts.Meta.Add("codec", "pcm") case codecutil.ADPCM: mts.Meta.Add("codec", "adpcm") default: r.config.Logger.Log(logger.Fatal, pkg+"no audio codec set in config") } ai, err := audio.NewDevice(ac, r.config.Logger) if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"failed to create audio device", "error", err.Error()) } // Start audio device err = ai.Start() if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"failed to start audio device", "error", err.Error()) } // Process output from audio device. r.config.ChunkSize = ai.ChunkSize() r.wg.Add(1) go r.processFrom(ai, time.Duration(float64(time.Second)/r.config.WriteRate)) return func() error { ai.Stop() return nil }, nil } // startRTSPCamera uses RTSP to request an RTP stream from an IP camera. An RTP // client is created from which RTP packets containing either h264/h265 can read // by the selected lexer. func (r *Revid) startRTSPCamera() (func() error, error) { rtspClt, local, remote, err := rtsp.NewClient(r.config.RTSPURL) if err != nil { return nil, err } resp, err := rtspClt.Options() if err != nil { return nil, err } r.config.Logger.Log(logger.Info, pkg+"RTSP OPTIONS response", "response", resp.String()) resp, err = rtspClt.Describe() if err != nil { return nil, err } r.config.Logger.Log(logger.Info, pkg+"RTSP DESCRIBE response", "response", resp.String()) resp, err = rtspClt.Setup("track1", fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort)) if err != nil { return nil, err } r.config.Logger.Log(logger.Info, pkg+"RTSP SETUP response", "response", resp.String()) rtpCltAddr, rtcpCltAddr, rtcpSvrAddr, err := formAddrs(local, remote, *resp) if err != nil { return nil, err } resp, err = rtspClt.Play() if err != nil { return nil, err } r.config.Logger.Log(logger.Info, pkg+"RTSP server PLAY response", "response", resp.String()) rtpClt, err := rtp.NewClient(rtpCltAddr) if err != nil { return nil, err } rtcpClt, err := rtcp.NewClient(rtcpCltAddr, rtcpSvrAddr, rtpClt, r.config.Logger.Log) if err != nil { return nil, err } // Check errors from RTCP client until it has stopped running. go func() { for { err, ok := <-rtcpClt.Err() if ok { r.config.Logger.Log(logger.Warning, pkg+"RTCP error", "error", err.Error()) } else { return } } }() // Start the RTCP client. rtcpClt.Start() // Start reading data from the RTP client. r.wg.Add(1) go r.processFrom(rtpClt, time.Second/time.Duration(r.config.FrameRate)) return func() error { rtspClt.Close() rtcpClt.Stop() return nil }, nil } // formAddrs is a helper function to form the addresses for the RTP client, // RTCP client, and the RTSP server's RTCP addr using the local, remote addresses // of the RTSP conn, and the SETUP method response. func formAddrs(local, remote *net.TCPAddr, setupResp rtsp.Response) (rtpCltAddr, rtcpCltAddr, rtcpSvrAddr string, err error) { svrRTCPPort, err := parseSvrRTCPPort(setupResp) if err != nil { return "", "", "", err } rtpCltAddr = strings.Split(local.String(), ":")[0] + ":" + strconv.Itoa(rtpPort) rtcpCltAddr = strings.Split(local.String(), ":")[0] + ":" + strconv.Itoa(rtcpPort) rtcpSvrAddr = strings.Split(remote.String(), ":")[0] + ":" + strconv.Itoa(svrRTCPPort) return } // parseServerRTCPPort is a helper function to get the RTSP server's RTCP port. func parseSvrRTCPPort(resp rtsp.Response) (int, error) { transport := resp.Header.Get("Transport") for _, p := range strings.Split(transport, ";") { if strings.Contains(p, "server_port") { port, err := strconv.Atoi(strings.Split(p, "-")[1]) if err != nil { return 0, err } return port, nil } } return 0, errors.New("SETUP response did not provide RTCP port") } func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") r.err <- r.lexTo(r.encoders, read, delay) r.config.Logger.Log(logger.Info, pkg+"finished reading input data") r.wg.Done() }