av/revid/revid.go

263 lines
7.2 KiB
Go

/*
NAME
revid.go
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
Alan Noble <alan@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Trek Hopton <trek@ausocean.org>
Scott Barnard <scott@ausocean.org>
LICENSE
revid is Copyright (C) 2017-2020 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
// Package revid provides an API for reading, transcoding, and writing audio/video streams and files.
package revid
import (
"errors"
"fmt"
"io"
"sync"
"time"
"bitbucket.org/ausocean/av/device"
"bitbucket.org/ausocean/av/filter"
"bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/bitrate"
)
// Misc consts.
const (
poolStartingElementSize = 10000 // Bytes.
rtmpConnectionMaxTries = 5
)
type Logger interface {
SetLevel(int8)
Log(level int8, message string, params ...interface{})
}
// Revid provides methods to control a revid session; providing methods
// to start, stop and change the state of an instance using the Config struct.
type Revid struct {
// config holds the Revid configuration.
// For historical reasons it also handles logging.
// FIXME(kortschak): The relationship of concerns
// in config/ns is weird.
cfg config.Config
// ns holds the netsender.Sender responsible for HTTP.
ns *netsender.Sender
// input will capture audio or video from which we can read data.
input device.AVDevice
// closeInput holds the cleanup function return from setupInput and is called
// in Revid.Stop().
closeInput func() error
// lexTo, encoder and packer handle transcoding the input stream.
lexTo func(dest io.Writer, src io.Reader, delay time.Duration) error
// probe allows us to "probe" frames after being lexed before going off to
// later encoding stages. This is useful if we wish to perform some processing
// on frames to derive metrics, for example, we might like to probe frames to
// derive turbidity levels. This is provided through SetProbe.
probe io.WriteCloser
// filters will hold the filter interface that will write to the chosen filter from the lexer.
filters []filter.Filter
// encoders will hold the multiWriteCloser that writes to encoders from the filter.
encoders io.WriteCloser
// running is used to keep track of revid's running state between methods.
running bool
// wg will be used to wait for any processing routines to finish.
wg sync.WaitGroup
// err will channel errors from revid routines to the handle errors routine.
err chan error
// bitrate is used for bitrate calculations.
bitrate bitrate.Calculator
// stop is used to signal stopping when looping an input.
stop chan struct{}
}
// New returns a pointer to a new Revid with the desired configuration, and/or
// an error if construction of the new instance was not successful.
func New(c config.Config, ns *netsender.Sender) (*Revid, error) {
r := Revid{ns: ns, err: make(chan error)}
err := r.setConfig(c)
if err != nil {
return nil, fmt.Errorf("could not set config, failed with error: %w", err)
}
go r.handleErrors()
return &r, nil
}
// Config returns a copy of revids current config.
func (r *Revid) Config() config.Config {
return r.cfg
}
// Bitrate returns the result of the most recent bitrate check.
func (r *Revid) Bitrate() int {
return r.bitrate.Bitrate()
}
func (r *Revid) Write(p []byte) (int, error) {
mi, ok := r.input.(*device.ManualInput)
if !ok {
return 0, errors.New("cannot write to anything but ManualInput")
}
return mi.Write(p)
}
// Start invokes a Revid to start processing video from a defined input
// and packetising (if theres packetization) to a defined output.
func (r *Revid) Start() error {
if r.running {
r.cfg.Logger.Warning("start called, but revid already running")
return nil
}
r.stop = make(chan struct{})
r.cfg.Logger.Debug("resetting revid")
err := r.reset(r.cfg)
if err != nil {
r.Stop()
return err
}
r.cfg.Logger.Info("revid reset")
// Calculate delay between frames based on FileFPS for video or
// between recording periods for audio.
d := time.Duration(0)
if r.cfg.Input == config.InputAudio {
d = time.Duration(r.cfg.RecPeriod * float64(time.Second))
mts.RealTime.Set(time.Now()) // Enable timestamps in MTS output, if any.
} else if r.cfg.FileFPS != 0 {
d = time.Duration(1000/r.cfg.FileFPS) * time.Millisecond
}
r.cfg.Logger.Debug("starting input processing routine")
r.wg.Add(1)
go r.processFrom(d)
r.running = true
return nil
}
// Stop closes down the pipeline. This closes encoders and sender output routines,
// connections, and/or files.
func (r *Revid) Stop() {
if !r.running {
r.cfg.Logger.Warning("stop called but revid isn't running")
return
}
close(r.stop)
r.cfg.Logger.Debug("stopping input")
err := r.input.Stop()
if err != nil {
r.cfg.Logger.Error("could not stop input", "error", err.Error())
} else {
r.cfg.Logger.Info("input stopped")
}
r.cfg.Logger.Debug("closing pipeline")
err = r.encoders.Close()
if err != nil {
r.cfg.Logger.Error("failed to close pipeline", "error", err.Error())
} else {
r.cfg.Logger.Info("pipeline closed")
}
for _, filter := range r.filters {
err = filter.Close()
if err != nil {
r.cfg.Logger.Error("failed to close filters", "error", err.Error())
} else {
r.cfg.Logger.Info("filters closed")
}
}
r.cfg.Logger.Debug("waiting for routines to finish")
r.wg.Wait()
r.cfg.Logger.Info("routines finished")
r.running = false
}
// Burst starts revid, waits for time specified, and then stops revid.
func (r *Revid) Burst() error {
r.cfg.Logger.Debug("starting revid")
err := r.Start()
if err != nil {
return fmt.Errorf("could not start revid: %w", err)
}
r.cfg.Logger.Info("revid started")
dur := time.Duration(r.cfg.BurstPeriod) * time.Second
time.Sleep(dur)
r.cfg.Logger.Debug("stopping revid")
r.Stop()
r.cfg.Logger.Info("revid stopped")
return nil
}
func (r *Revid) Running() bool {
return r.running
}
// Update takes a map of variables and their values and edits the current config
// if the variables are recognised as valid parameters.
func (r *Revid) Update(vars map[string]string) error {
if r.running {
r.cfg.Logger.Debug("revid running; stopping for re-config")
r.Stop()
r.cfg.Logger.Info("revid was running; stopped for re-config")
}
//look through the vars and update revid where needed
r.cfg.Logger.Debug("checking vars from server", "vars", vars)
r.cfg.Update(vars)
r.cfg.Logger.Info("finished reconfig")
r.cfg.Logger.Debug("config changed", "config", r.cfg)
return nil
}
func (r *Revid) SetProbe(p io.WriteCloser) error {
if r.running {
return errors.New("cannot set probe when revid is running")
}
r.probe = p
return nil
}