av/revid/revid.go

254 lines
7.2 KiB
Go
Raw Permalink Normal View History

/*
NAME
revid.go
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
Alan Noble <alan@ausocean.org>
Dan Kortschak <dan@ausocean.org>
Trek Hopton <trek@ausocean.org>
Scott Barnard <scott@ausocean.org>
LICENSE
revid is Copyright (C) 2017-2020 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
// Package revid provides an API for reading, transcoding, and writing audio/video streams and files.
package revid
import (
"errors"
"fmt"
"io"
"sync"
2018-04-16 07:54:21 +03:00
"time"
2018-02-14 10:02:57 +03:00
"bitbucket.org/ausocean/av/device"
"bitbucket.org/ausocean/av/filter"
"bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/bitrate"
"bitbucket.org/ausocean/utils/logger"
)
// Miscellaneous constants.
const (
	// poolStartingElementSize is a size in bytes.
	// NOTE(review): not referenced in the visible part of this file;
	// presumably the initial element size of a buffer pool - confirm.
	poolStartingElementSize = 10000

	// rtmpConnectionMaxTries is presumably the maximum number of RTMP
	// connection attempts; it is not referenced in the visible part of
	// this file - confirm against its users.
	rtmpConnectionMaxTries = 5
)
// Logger is the logging interface declared by this package: a leveled
// logger that accepts a message followed by optional parameters.
type Logger interface {
	// Log records message at the given level together with params.
	// The methods in this file pass params as alternating key/value
	// pairs, e.g. Log(level, "msg", "error", err.Error()).
	Log(level int8, message string, params ...interface{})

	// SetLevel sets the logger's level.
	SetLevel(int8)
}
2018-04-16 08:12:16 +03:00
// Revid provides methods to control a revid session; providing methods
// to start, stop and change the state of an instance using the Config struct.
2018-06-09 05:01:21 +03:00
type Revid struct {
// config holds the Revid configuration.
// For historical reasons it also handles logging.
// FIXME(kortschak): The relationship of concerns
// in config/ns is weird.
cfg config.Config
// ns holds the netsender.Sender responsible for HTTP.
ns *netsender.Sender
// input will capture audio or video from which we can read data.
input device.AVDevice
// closeInput holds the cleanup function return from setupInput and is called
// in Revid.Stop().
closeInput func() error
// lexTo, encoder and packer handle transcoding the input stream.
lexTo func(dest io.Writer, src io.Reader, delay time.Duration) error
2019-03-09 07:58:07 +03:00
// probe allows us to "probe" frames after being lexed before going off to
// later encoding stages. This is useful if we wish to perform some processing
// on frames to derive metrics, for example, we might like to probe frames to
// derive turbidity levels. This is provided through SetProbe.
probe io.WriteCloser
2019-12-27 08:36:30 +03:00
// filters will hold the filter interface that will write to the chosen filter from the lexer.
filters []filter.Filter
// encoders will hold the multiWriteCloser that writes to encoders from the filter.
2019-04-18 10:25:48 +03:00
encoders io.WriteCloser
// running is used to keep track of revid's running state between methods.
running bool
2019-04-18 10:25:48 +03:00
// wg will be used to wait for any processing routines to finish.
wg sync.WaitGroup
2019-04-22 08:44:08 +03:00
// err will channel errors from revid routines to the handle errors routine.
err chan error
// bitrate is used for bitrate calculations.
bitrate bitrate.Calculator
// stop is used to signal stopping when looping an input.
stop chan struct{}
2018-01-23 06:15:06 +03:00
}
// New constructs a Revid that uses ns as its netsender.Sender and applies
// the configuration c through setConfig. If setConfig fails, a nil Revid
// and the wrapped error are returned. On success the error handling
// routine is started in its own goroutine before the Revid is returned.
func New(c config.Config, ns *netsender.Sender) (*Revid, error) {
	r := &Revid{
		ns:  ns,
		err: make(chan error),
	}
	if err := r.setConfig(c); err != nil {
		return nil, fmt.Errorf("could not set config, failed with error: %w", err)
	}

	go r.handleErrors()
	return r, nil
}
2019-04-15 04:18:12 +03:00
// Config returns a copy of revids current config.
func (r *Revid) Config() config.Config {
return r.cfg
2019-04-15 04:18:12 +03:00
}
2018-06-27 01:20:05 +03:00
// Bitrate returns the result of the most recent bitrate check, as reported
// by the Bitrate method of revid's bitrate.Calculator.
func (r *Revid) Bitrate() int {
return r.bitrate.Bitrate()
}
2018-06-09 05:01:21 +03:00
// Start invokes a Revid to start processing video from a defined input
// and packetising (if theres packetization) to a defined output.
func (r *Revid) Start() error {
if r.running {
r.cfg.Logger.Log(logger.Warning, "start called, but revid already running")
return nil
}
r.stop = make(chan struct{})
r.cfg.Logger.Log(logger.Debug, "resetting revid")
err := r.reset(r.cfg)
if err != nil {
r.Stop()
return err
}
r.cfg.Logger.Log(logger.Info, "revid reset")
2020-08-14 05:50:51 +03:00
// Calculate delay between frames based on FileFPS for video or
2020-08-13 09:55:37 +03:00
// between recording periods for audio.
2020-01-25 02:45:27 +03:00
d := time.Duration(0)
if r.cfg.Input == config.InputAudio {
2020-08-14 06:24:07 +03:00
d = time.Duration(r.cfg.RecPeriod * float64(time.Second))
2020-08-14 06:04:20 +03:00
} else if r.cfg.FileFPS != 0 {
2020-08-14 06:24:07 +03:00
d = time.Duration(1000/r.cfg.FileFPS) * time.Millisecond
2020-01-25 02:45:27 +03:00
}
r.cfg.Logger.Log(logger.Debug, "starting input processing routine")
r.wg.Add(1)
go r.processFrom(r.input, d)
r.running = true
return nil
}
// Stop closes down the pipeline. This closes encoders and sender output routines,
// connections, and/or files.
func (r *Revid) Stop() {
if !r.running {
r.cfg.Logger.Log(logger.Warning, "stop called but revid isn't running")
return
2018-03-17 16:29:42 +03:00
}
2018-05-07 05:53:50 +03:00
close(r.stop)
r.cfg.Logger.Log(logger.Debug, "stopping input")
err := r.input.Stop()
if err != nil {
r.cfg.Logger.Log(logger.Error, "could not stop input", "error", err.Error())
} else {
r.cfg.Logger.Log(logger.Info, "input stopped")
}
r.cfg.Logger.Log(logger.Debug, "closing pipeline")
err = r.encoders.Close()
if err != nil {
r.cfg.Logger.Log(logger.Error, "failed to close pipeline", "error", err.Error())
} else {
r.cfg.Logger.Log(logger.Info, "pipeline closed")
}
2019-12-23 04:29:17 +03:00
2019-12-27 09:18:28 +03:00
for _, filter := range r.filters {
err = filter.Close()
if err != nil {
r.cfg.Logger.Log(logger.Error, "failed to close filters", "error", err.Error())
} else {
r.cfg.Logger.Log(logger.Info, "filters closed")
2019-12-27 09:18:28 +03:00
}
}
2019-12-27 09:18:28 +03:00
r.cfg.Logger.Log(logger.Debug, "waiting for routines to finish")
r.wg.Wait()
r.cfg.Logger.Log(logger.Info, "routines finished")
r.running = false
2020-01-30 05:50:44 +03:00
}
// Burst starts revid, waits for time specified, and then stops revid.
func (r *Revid) Burst() error {
r.cfg.Logger.Log(logger.Debug, "starting revid")
2020-01-30 05:50:44 +03:00
err := r.Start()
if err != nil {
return fmt.Errorf("could not start revid: %w", err)
}
r.cfg.Logger.Log(logger.Info, "revid started")
2020-01-30 05:50:44 +03:00
dur := time.Duration(r.cfg.BurstPeriod) * time.Second
time.Sleep(dur)
r.cfg.Logger.Log(logger.Debug, "stopping revid")
2020-01-30 05:50:44 +03:00
r.Stop()
r.cfg.Logger.Log(logger.Info, "revid stopped")
2020-01-30 05:50:44 +03:00
return nil
}
// Running reports whether revid is currently running, i.e. the value of the
// running flag that Start sets to true and Stop sets back to false.
func (r *Revid) Running() bool {
return r.running
}
// Update takes a map of variables and their values and edits the current config
// if the variables are recognised as valid parameters. Handling of the
// individual variables is delegated to the config's Update method.
//
// If revid is running it is stopped before the config is changed; Update
// does not start it again afterwards. The returned error is always nil.
func (r *Revid) Update(vars map[string]string) error {
if r.running {
r.cfg.Logger.Log(logger.Debug, "revid running; stopping for re-config")
r.Stop()
r.cfg.Logger.Log(logger.Info, "revid was running; stopped for re-config")
}
// Look through the vars and update the config where needed.
r.cfg.Logger.Log(logger.Debug, "checking vars from server", "vars", vars)
r.cfg.Update(vars)
r.cfg.Logger.Log(logger.Info, "finished reconfig")
r.cfg.Logger.Log(logger.Debug, "config changed", "config", r.cfg)
return nil
}
// SetProbe sets revid's probe to p (see the probe field of Revid for what a
// probe is used for). It returns an error, leaving the current probe
// unchanged, if revid is running; otherwise it returns nil.
func (r *Revid) SetProbe(p io.WriteCloser) error {
if r.running {
// The probe may only be replaced while revid is stopped.
return errors.New("cannot set probe when revid is running")
}
r.probe = p
return nil
}