rv & revid/{revid.go, pipeline.go}: added probe component to revid and wrote skeleton of probe implementation for turbdidity sensing in rv

This commit is contained in:
Saxon Nelson-Milton 2022-01-06 15:28:02 +10:30
parent d1d5bac03a
commit bdb44214a1
3 changed files with 64 additions and 4 deletions

View File

@ -103,12 +103,32 @@ const (
profilePath = "rv.prof"
pkg = "rv: "
runPreDelay = 20 * time.Second
)
// Software define pin values.
// See https://netreceiver.appspot.com/help, External Pin Assignments.
const (
bitratePin = "X36"
saturationPin = "X38"
contrastPin = "X39"
)
// This is set to true if the 'profile' build tag is provided on build.
var canProfile = false
type turbidityProbe struct {
saturation, contrast float64
}
// TODO(Russell): complete this implementation of Write.
func (tp *turbidityProbe) Write(p []byte) (int, error) {
return len(p), nil
}
func (tp *turbidityProbe) Close() error {
return nil
}
func main() {
mts.Meta = meta.NewWith([][2]string{{metaPreambleKey, metaPreambleData}})
@ -134,10 +154,13 @@ func main() {
log.Log(logger.Info, "profiling started")
}
var rv *revid.Revid
var (
rv *revid.Revid
p *turbidityProbe
)
log.Log(logger.Debug, "initialising netsender client")
ns, err := netsender.New(log, nil, readPin(rv), nil, createVarMap())
ns, err := netsender.New(log, nil, readPin(p, rv), nil, createVarMap())
if err != nil {
log.Log(logger.Fatal, pkg+"could not initialise netsender client: "+err.Error())
}
@ -148,6 +171,11 @@ func main() {
log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error())
}
err = rv.SetProbe(p)
if err != nil {
log.Log(logger.Error, pkg+"could not set probe","error",err.Error())
}
// NB: Problems were encountered with communicating with RTSP inputs. When trying to
// connect it would fail due to timeout; as if things had not been set up quickly
// enough before revid tried to do things. This delay fixes this, but there is probably
@ -271,7 +299,7 @@ func sleep(ns *netsender.Sender, l *logger.Logger) {
// readPin provides a callback function of consistent signature for use by
// netsender to retrieve software defined pin values e.g. revid bitrate.
func readPin(rv *revid.Revid) func(pin *netsender.Pin) error {
func readPin(p *turbidityProbe, rv *revid.Revid) func(pin *netsender.Pin) error {
return func(pin *netsender.Pin) error {
switch {
case pin.Name == bitratePin:
@ -281,6 +309,16 @@ func readPin(rv *revid.Revid) func(pin *netsender.Pin) error {
}
case pin.Name[0] == 'X':
return sds.ReadSystem(pin)
case pin.Name == saturationPin:
pin.Value = -1
if p != nil {
pin.Value = int(p.saturation * 1000)
}
case pin.Name == contrastPin:
pin.Value = -1
if p != nil {
pin.Value = int(p.contrast * 100)
}
default:
pin.Value = -1
}

View File

@ -283,6 +283,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
r.cfg.Logger.Log(logger.Info, "filters set up")
}
var err error
switch r.cfg.Input {
case config.InputRaspivid:
@ -377,7 +378,13 @@ func (r *Revid) processFrom(in device.AVDevice, delay time.Duration) {
// For a continuous source e.g. a camera or microphone, we should remain
// in this call indefinitely unless in.Stop() is called and an io.EOF is forced.
r.cfg.Logger.Log(logger.Debug, "lexing")
err = r.lexTo(r.filters[0], in, delay)
var w io.Writer
w = r.filters[0]
if r.probe != nil {
w = ioext.MultiWriteCloser(r.filters[0],r.probe)
}
err = r.lexTo(w, in, delay)
switch err {
case nil, io.EOF:
r.cfg.Logger.Log(logger.Info, "end of file")

View File

@ -30,6 +30,7 @@ LICENSE
package revid
import (
"errors"
"fmt"
"io"
"sync"
@ -76,6 +77,12 @@ type Revid struct {
// lexTo, encoder and packer handle transcoding the input stream.
lexTo func(dest io.Writer, src io.Reader, delay time.Duration) error
// probe allows us to "probe" frames after being lexed before going off to
// later encoding stages. This is useful if we wish to perform some processing
// on frames to derive metrics, for example, we might like to probe frames to
// derive turbidity levels. This is provided through SetProbe.
probe io.WriteCloser
// filters will hold the filter interface that will write to the chosen filter from the lexer.
filters []filter.Filter
@ -236,3 +243,11 @@ func (r *Revid) Update(vars map[string]string) error {
r.cfg.Logger.Log(logger.Debug, "config changed", "config", r.cfg)
return nil
}
func (r *Revid) SetProbe(p io.WriteCloser) error {
if r.running {
return errors.New("cannot set probe when revid is running")
}
r.probe = p
return nil
}