Merged in video-forwarder (pull request #484)

Video forwarder

Resolves issue #378

Approved-by: Russell Stanley
This commit is contained in:
Saxon Milton 2022-09-21 07:24:47 +00:00
commit ea900398a3
12 changed files with 712 additions and 41 deletions

387
cmd/vidforward/main.go Normal file
View File

@ -0,0 +1,387 @@
/*
DESCRIPTION
vidforward is a service for receiving video from cameras and then forwarding to
youtube. By acting as the RTMP encoder (instead of the camera) vidfoward can enable
persistent streams by sending slate images during camera downtime.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
Copyright (C) 2022 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package main
import (
"encoding/json"
"flag"
"fmt"
"io"
"net"
"net/http"
"strconv"
"strings"
"sync"
"time"
"bitbucket.org/ausocean/av/codec/codecutil"
"bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/iot/pi/netlogger"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/logging"
"gopkg.in/natefinch/lumberjack.v2"
)
// Server defaults.
const (
defaultPort = "8080"
defaultHost = ""
)
// Logging configuration.
const (
logPath = "/var/log/netsender/netsender.log"
logMaxSize = 500 // MB
logMaxBackup = 10
logMaxAge = 28 // days
logVerbosity = logging.Info
logSuppress = false
)
// Misc constants.
const (
netSendRetryTime = 5 * time.Second
defaultSleepTime = 60 // Seconds
profilePath = "rv.prof"
pkg = "rv: "
runPreDelay = 20 * time.Second
)
// forwardHandler implements http.Handler and handles video forwarding requests
// (re-using the recv method created for vidgrind). forwardHandler also keeps
// track of the active mac addresses and their respective revid pipelines.
type forwardHandler struct {
actives map[string]*revid.Revid
log logging.Logger
mu sync.Mutex
}
// ServeHTTP handles recv requests for video forwarding. The MAC is firstly
// checked to ensure it is "active" i.e. should be sending data, and then the
// video is extracted from the request body and provided to the revid pipeline
// corresponding to said MAC.
func (h *forwardHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.log.Debug("recv handler")
q := r.URL.Query()
ma := q.Get("ma")
rv := h.getActive(ma)
if r == nil {
h.errorLogWrite(w, "forward request mac not active, doing nothing", "mac", ma)
return
}
const videoPin = "V0"
v := q.Get(videoPin)
if v == "" {
h.errorLogWrite(w, "forward request video pin V0 absent, or has no value")
return
}
size, err := strconv.Atoi(v)
if err != nil {
h.errorLogWrite(w, "forward request video size can't be conerted to int", "error", err)
return
}
if size <= 0 {
h.errorLogWrite(w, "forward request video size invalid", "size", size)
return
}
resp := map[string]interface{}{"ma": ma, "V0": size}
mtsClip, err := io.ReadAll(r.Body)
if err != nil {
h.errorLogWrite(w, "could not read forward request body", "error", err)
return
}
defer r.Body.Close()
if len(mtsClip)%mts.PacketSize != 0 {
h.errorLogWrite(w, "invalid clip length", "length", len(mtsClip))
return
}
// Extract the pure h264.
h264Clip, err := mts.Extract(mtsClip)
if err != nil {
h.errorLogWrite(w, "could not extract h.264 from the MPEG-TS clip", "error", err)
return
}
for i, frame := range h264Clip.Frames() {
_, err := rv.Write(frame.Media)
if err != nil {
h.errorLogWrite(w, "could not write frame", "no.", i)
return
}
}
// Return response to client as JSON.
jsn, err := json.Marshal(resp)
if err != nil {
h.errorLogWrite(w, "could not get json for response", "error", err)
return
}
fmt.Fprint(w, string(jsn))
}
// updateActives updates the actives map based on a string of CSV where each
// value is a key-value of mac and url i.e. <mac1>=<url1>,<mac2>=<url2>,...
// If there are pairs in the actives map that do not correspond to any pairs in
// the provided CSV, they will be removed.
func (h *forwardHandler) updateActives(v string) {
pairs := strings.Split(v, ",")
macs := make([]string, 0, len(pairs))
for _, p := range pairs {
pair := strings.Split(p, "=")
if len(pair) != 2 {
h.log.Warning("invalid <mac>=<rtmp url> pair", "pair", pair)
continue
}
m := pair[0]
macs = append(macs, m)
if !isMac(m) {
h.log.Warning("invalid MAC in actives update string", "mac", m)
continue
}
r := h.getActive(m)
if r == nil {
h.addActive(m, pair[1])
}
}
h.removeInactives(macs)
}
// writeError logs an error and writes to w in JSON format.
func (h *forwardHandler) errorLogWrite(w http.ResponseWriter, msg string, args ...interface{}) {
h.log.Error(msg, args...)
w.Header().Add("Content-Type", "application/json")
fmt.Fprint(w, `{"er":"`+msg+`"}`)
}
// getActive provides the revid pipeline for the provided MAC if active.
func (h *forwardHandler) getActive(ma string) *revid.Revid {
h.mu.Lock()
defer h.mu.Unlock()
v, ok := h.actives[ma]
if !ok {
return nil
}
return v
}
// addActive adds a new revid pipeline configured with the provided RTMP URL url
// for the MAC ma into the actives map.
func (h *forwardHandler) addActive(ma, url string) error {
h.mu.Lock()
cfg := config.Config{
Logger: h.log,
Input: config.InputManual,
InputCodec: codecutil.H264_AU,
Outputs: []uint8{config.OutputRTMP},
RTMPURL: url,
LogLevel: logging.Debug,
}
rv, err := revid.New(cfg, nil)
if err != nil {
return fmt.Errorf("coult not initialise revid: %w", err)
}
h.actives[ma] = rv
err = rv.Start()
if err != nil {
return fmt.Errorf("could not start revid pipeline")
}
h.mu.Unlock()
return nil
}
// removeInactives removes all pairs in the active map that do not correspond to
// those found in actives.
func (h *forwardHandler) removeInactives(actives []string) {
h.mu.Lock()
actives:
for k := range h.actives {
for _, m := range actives {
if k == m {
continue actives
}
}
delete(h.actives, k)
}
h.mu.Unlock()
}
// isActive checks to see if the provided MAC is in the actives map.
func (h *forwardHandler) isActive(ma string) bool {
h.mu.Lock()
_, ok := h.actives[ma]
h.mu.Unlock()
return ok
}
func main() {
host := flag.String("host", defaultHost, "Host ip to run video forwarder on.")
port := flag.String("port", defaultPort, "Port to run video forwarder on.")
flag.Parse()
if *host == "" || net.ParseIP(*host) == nil {
panic(fmt.Sprintf("invalid host, host: %s", *host))
}
// Create lumberjack logger to handle logging to file.
fileLog := &lumberjack.Logger{
Filename: logPath,
MaxSize: logMaxSize,
MaxBackups: logMaxBackup,
MaxAge: logMaxAge,
}
// Create netlogger to handle logging to cloud.
netLog := netlogger.New()
// Create logger that we call methods on to log, which in turn writes to the
// lumberjack and netloggers.
log := logging.New(logVerbosity, io.MultiWriter(fileLog, netLog), logSuppress)
log.Debug("initialising netsender client")
ns, err := netsender.New(log, nil, nil, nil)
if err != nil {
log.Fatal(pkg + "could not initialise netsender client: " + err.Error())
}
fh := &forwardHandler{log: log, actives: map[string]*revid.Revid{}}
log.Debug("beginning main loop")
readySig := make(chan struct{})
go run(ns, log, netLog, fh, readySig)
// We won't start serving until we've talked to the cloud for configuration.
<-readySig
http.Handle("/recv", fh)
http.ListenAndServe(*host+":"+*port, nil)
}
// run starts the main loop. This will run netsender on every pass of the loop
// (sleeping inbetween), check vars, and if changed, update configuration as
// appropriate.
func run(ns *netsender.Sender, l logging.Logger, nl *netlogger.Logger, fh *forwardHandler, sig chan struct{}) {
var vs int
for {
l.Debug("running netsender")
err := ns.Run()
if err != nil {
l.Warning(pkg+"Run Failed. Retrying...", "error", err.Error())
time.Sleep(netSendRetryTime)
continue
}
l.Debug("sending logs")
err = nl.Send(ns)
if err != nil {
l.Warning(pkg+"Logs could not be sent", "error", err.Error())
}
l.Debug("checking varsum")
newVs := ns.VarSum()
if vs == newVs {
sleep(ns, l)
continue
}
vs = newVs
l.Info("varsum changed", "vs", vs)
l.Debug("getting new vars")
vars, err := ns.Vars()
if err != nil {
l.Error(pkg+"netSender failed to get vars", "error", err.Error())
time.Sleep(netSendRetryTime)
continue
}
l.Debug("got new vars", "vars", vars)
// Check the actives variable and update the forward handlers active map.
v, ok := vars["Actives"]
if !ok {
l.Warning("no actives variable in var map", "vars", vars)
sleep(ns, l)
continue
}
fh.updateActives(v)
// sig is closed on the first time we get here, indicating to external routine
// that we've got our configuration from the cloud.
select {
case <-sig:
default:
close(sig)
}
sleep(ns, l)
}
}
// sleep uses a delay to halt the program based on the monitoring period
// netsender parameter (mp) defined in the netsender.conf config.
func sleep(ns *netsender.Sender, l logging.Logger) {
l.Debug("sleeping")
t, err := strconv.Atoi(ns.Param("mp"))
if err != nil {
l.Error(pkg+"could not get sleep time, using default", "error", err)
t = defaultSleepTime
}
time.Sleep(time.Duration(t) * time.Second)
l.Debug("finished sleeping")
}
func isMac(m string) bool {
if len(m) != 17 || m == "00:00:00:00:00:00" {
return false
}
for i := 0; i <= 15; i++ {
if (i+1)%3 == 0 && m[i] != ':' {
return false
}
if (3-i)%3 != 0 {
continue
}
_, err := strconv.ParseUint(m[i:i+2], 16, 64)
if err != nil {
return false
}
}
return true
}

View File

@ -0,0 +1,49 @@
/*
DESCRIPTION
Provides testing for vidfoward functionality.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
Copyright (C) 2022 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package main
import "testing"
// TestIsMac tests the isMac function.
func TestIsMac(t *testing.T) {
tests := []struct {
in string
want bool
}{
{in: "00:00:00:00:00:00", want: false},
{in: "00000:00:00:00:01", want: false},
{in: "00:00:00:00000:01", want: false},
{in: "15:b5:c7:cg:87:92", want: false},
{in: "00:00:00:00:00", want: false},
{in: "7d:ac:cf:84:e8:01", want: true},
}
for i, test := range tests {
got := isMac(test.in)
if test.want != got {
t.Errorf("did not get expected result for test %d\ngot: %v\nwant: %v", i, got, test.want)
}
}
}

View File

@ -25,8 +25,10 @@ LICENSE
package codecutil package codecutil
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"math"
"time" "time"
) )
@ -81,3 +83,141 @@ func (l *ByteLexer) Lex(dst io.Writer, src io.Reader, d time.Duration) error {
} }
} }
} }
// Noop reads media "frames" from src, queues and then writes to dst at intervals,
// maintaining a steady number of frames stored in the queue. This ensures frames
// are outputted at a consistent rate; useful if reads occur from src in blocks (a
// side effect if src is connected to an input that receives packets containing
// multiple frames at intervals e.g. MPEG-TS over HTTP).
// Noop assumes that writing to the input connected to src is blocked until the
// entire previous write is read, i.e. src is expected to be connected to
// a pipe-like structure.
func Noop(dst io.Writer, src io.Reader, d time.Duration) error {
// Controller tuning constants.
const (
target = 500 // Target channel size to maintain.
coef = 0.01 // Proportional controller coefficient.
minDelay = 1 // Minimum delay between writes (ms).
maxDelay = 1000 // Maximum delay between writes (ms).
defaultDelay = 40 * time.Millisecond // Default delay between writes, equivalent to ~25fps.
)
// Ring buffer tuning.
const (
ringCap = 1000 // Ring buffer capacity.
ringElemSize = 250000 // Ring buffer element size i.e. max h264 frame size.
)
if d < 0 {
return fmt.Errorf("invalid delay: %v", d)
}
if d == 0 {
d = defaultDelay
}
var (
delay = time.NewTicker(d) // Ticker based on delay between frames.
errCh = make(chan error) // Used by the output routine to signal errors to the main loop.
rb = newRingBuffer(ringElemSize, ringCap) // Use a ring buffer to reduce allocation and GC load.
)
defer delay.Stop()
// This routine is responsible for frame output.
go func() {
for {
err := rb.writeTo(dst)
if err != nil {
errCh <- fmt.Errorf("could not write to dst: %w", err)
}
<-delay.C
// Adjust delay using proportional controller.
adj := coef * float64(target-rb.len())
adj = math.Max(math.Min(adj, minDelay), maxDelay) // Limit the delay.
d += time.Millisecond * time.Duration(adj)
delay.Reset(d)
}
}()
// This loop is responsible for reading frames and checking any errors from
// the output routine.
for {
err := rb.readFrom(src)
if err != nil {
return fmt.Errorf("could not read src: %w", err)
}
select {
case err := <-errCh:
return fmt.Errorf("error from output routine: %w", err)
default:
}
}
}
// ringBuffer is a basic concurrency safe ring buffer. Concurrency safety is
// achieved using a channel between read and write methods i.e. overwrite/dropping
// behaviour is absent and blocking will occur.
type ringBuffer struct {
n int // Num. of elements.
i int // Current index in underlying buffer.
buf [][]byte // Underlying buffer.
ch chan []byte // ch will act as our concurrency safe queue.
}
func newRingBuffer(sz, cap int) *ringBuffer {
rb := &ringBuffer{
buf: make([][]byte, cap),
n: cap,
ch: make(chan []byte, cap),
}
for i := range rb.buf {
rb.buf[i] = make([]byte, sz)
}
return rb
}
// readFrom gets the next []byte from the buffer and uses it to read from r.
// This data is then stored in the buffer channel ready for writeTo to retreive.
// readFrom will block if the buffer channel is filled, at least within the
// timeout, otherwise an error is returned.
func (b *ringBuffer) readFrom(r io.Reader) error {
buf := b.buf[b.i]
b.i++
if b.i == b.n {
b.i = 0
}
n, err := r.Read(buf)
if err != nil {
return err
}
const dur = 1 * time.Minute
timeout := time.NewTimer(dur)
select {
case b.ch <- buf[:n]:
case <-timeout.C:
return errors.New("buffer chan send timeout")
}
return nil
}
// writeTo tries to get a []byte from the buffer channel within the timeout
// and then writes to w if successful, otherwise an error is returned.
func (b *ringBuffer) writeTo(w io.Writer) error {
const dur = 1 * time.Minute
timeout := time.NewTimer(dur)
select {
case p := <-b.ch:
_, err := w.Write(p)
if err != nil {
return err
}
case <-timeout.C:
return errors.New("buffer chan receive timeout")
}
return nil
}
func (b *ringBuffer) len() int {
return len(b.ch)
}

View File

@ -29,7 +29,8 @@ package codecutil
const ( const (
PCM = "pcm" PCM = "pcm"
ADPCM = "adpcm" ADPCM = "adpcm"
H264 = "h264" H264 = "h264" // h264 bytestream (requires lexing).
H264_AU = "h264_au" // Discrete h264 access units.
H265 = "h265" H265 = "h265"
MJPEG = "mjpeg" MJPEG = "mjpeg"
JPEG = "jpeg" JPEG = "jpeg"
@ -38,7 +39,7 @@ const (
// IsValid checks if a string is a known and valid codec in the right format. // IsValid checks if a string is a known and valid codec in the right format.
func IsValid(s string) bool { func IsValid(s string) bool {
switch s { switch s {
case PCM, ADPCM, H264, H265, MJPEG, JPEG: case PCM, ADPCM, H264, H264_AU, H265, MJPEG, JPEG:
return true return true
default: default:
return false return false

View File

@ -141,6 +141,11 @@ type Frame struct {
idx int // Index in the backing slice. idx int // Index in the backing slice.
} }
// Frames returns the frames of a h264 clip.
func (c *Clip) Frames() []Frame {
return c.frames
}
// Bytes returns the concatentated media bytes from each frame in the Clip c. // Bytes returns the concatentated media bytes from each frame in the Clip c.
func (c *Clip) Bytes() []byte { func (c *Clip) Bytes() []byte {
if c.backing == nil { if c.backing == nil {

View File

@ -73,3 +73,63 @@ func (me MultiError) Error() string {
} }
return fmt.Sprintf("%v", []error(me)) return fmt.Sprintf("%v", []error(me))
} }
// ManualInput is an implementation of the Devices interface that represents
// a manual input mechanism, i.e. data is written to this input manually through
// software (ManualInput also implements io.Writer, unlike other implementations).
// The ManualInput employs an io.Pipe, as such, every write must be accompanied
// by a full read (or reads) of the bytes, otherwise blocking will occur (and
// vice versa). This is intended to make writing of distinct access units easier i.e.
// one whole read (with a big enough buf provided) can represent a distinct frame.
type ManualInput struct {
isRunning bool
reader *io.PipeReader
writer *io.PipeWriter
}
// NewManualInput provides a new ManualInput.
func NewManualInput() *ManualInput {
r, w := io.Pipe()
return &ManualInput{reader: r, writer: w}
}
// Read reads from the manual input and puts the bytes into p.
func (m *ManualInput) Read(p []byte) (int, error) {
return m.reader.Read(p)
}
// Name returns the name of ManualInput i.e. "ManualInput".
func (m *ManualInput) Name() string { return "ManualInput" }
// Set is a stub to satisfy the Device interface; no configuration fields are
// required by ManualInput.
func (m *ManualInput) Set(c config.Config) error { return nil }
// Start sets the ManualInput isRunning flag to true. This is mostly here just
// to satisfy the Device interface.
func (m *ManualInput) Start() error {
m.isRunning = true
return nil
}
// Stop closes the pipe and sets the isRunning flag to false.
func (m *ManualInput) Stop() error {
if !m.isRunning {
return nil
}
err := m.reader.Close()
if err != nil {
return err
}
m.isRunning = false
return nil
}
// IsRunning returns the value of the isRunning flag to indicate if Start has
// been called (and Stop has not been called after).
func (m *ManualInput) IsRunning() bool { return m.isRunning }
// Write writes p to the ManualInput's writer side of its pipe.
func (m *ManualInput) Write(p []byte) (int, error) {
return m.writer.Write(p)
}

View File

@ -29,13 +29,14 @@ import (
"time" "time"
"bitbucket.org/ausocean/av/revid/config" "bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/utils/logging"
) )
func TestIsRunning(t *testing.T) { func TestIsRunning(t *testing.T) {
const dur = 250 * time.Millisecond const dur = 250 * time.Millisecond
const path = "../../../test/test-data/av/input/motion-detection/mjpeg/school.mjpeg" const path = "../../../test/test-data/av/input/motion-detection/mjpeg/school.mjpeg"
d := New() d := New((*logging.TestLogger)(t))
err := d.Set(config.Config{ err := d.Set(config.Config{
InputPath: path, InputPath: path,

View File

@ -44,6 +44,7 @@ const (
InputV4L InputV4L
InputRTSP InputRTSP
InputAudio InputAudio
InputManual
// Outputs. // Outputs.
OutputRTMP OutputRTMP
@ -54,7 +55,8 @@ const (
OutputFiles OutputFiles
// Codecs. // Codecs.
H264 H264 // h264 bytestream.
H264_AU // Discrete h264 access units.
H265 H265
MJPEG MJPEG
JPEG JPEG

View File

@ -317,7 +317,7 @@ var Variables = []struct {
}, },
{ {
Name: KeyInput, Name: KeyInput,
Type: "enum:raspivid,raspistill,rtsp,v4l,file,audio", Type: "enum:raspivid,raspistill,rtsp,v4l,file,audio,manual",
Update: func(c *Config, v string) { Update: func(c *Config, v string) {
c.Input = parseEnum( c.Input = parseEnum(
KeyInput, KeyInput,
@ -329,13 +329,14 @@ var Variables = []struct {
"v4l": InputV4L, "v4l": InputV4L,
"file": InputFile, "file": InputFile,
"audio": InputAudio, "audio": InputAudio,
"manual": InputManual,
}, },
c, c,
) )
}, },
Validate: func(c *Config) { Validate: func(c *Config) {
switch c.Input { switch c.Input {
case InputRaspivid, InputRaspistill, InputV4L, InputFile, InputAudio, InputRTSP: case InputRaspivid, InputRaspistill, InputV4L, InputFile, InputAudio, InputRTSP, InputManual:
default: default:
c.LogInvalidField(KeyInput, defaultInput) c.LogInvalidField(KeyInput, defaultInput)
c.Input = defaultInput c.Input = defaultInput
@ -344,7 +345,7 @@ var Variables = []struct {
}, },
{ {
Name: KeyInputCodec, Name: KeyInputCodec,
Type: "enum:h264,h265,mjpeg,jpeg,pcm,adpcm", Type: "enum:h264,h264_au,h265,mjpeg,jpeg,pcm,adpcm",
Update: func(c *Config, v string) { Update: func(c *Config, v string) {
c.InputCodec = v c.InputCodec = v
}, },

View File

@ -312,6 +312,14 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
case config.InputAudio: case config.InputAudio:
r.cfg.Logger.Debug("using audio input") r.cfg.Logger.Debug("using audio input")
err = r.setupAudio() err = r.setupAudio()
case config.InputManual:
r.cfg.Logger.Debug("using manual input")
r.input = device.NewManualInput()
err = r.setLexer(r.cfg.InputCodec, false)
default:
return fmt.Errorf("unrecognised input type: %v", r.cfg.Input)
} }
if err != nil { if err != nil {
return fmt.Errorf("could not set lexer: %w", err) return fmt.Errorf("could not set lexer: %w", err)
@ -339,6 +347,9 @@ func (r *Revid) setLexer(c string, isRTSP bool) error {
if isRTSP { if isRTSP {
r.lexTo = h264.NewExtractor().Extract r.lexTo = h264.NewExtractor().Extract
} }
case codecutil.H264_AU:
r.cfg.Logger.Debug("using H.264 AU codec")
r.lexTo = codecutil.Noop
case codecutil.H265: case codecutil.H265:
r.cfg.Logger.Debug("using H.265 codec") r.cfg.Logger.Debug("using H.265 codec")
r.lexTo = h265.NewExtractor(false).Extract r.lexTo = h265.NewExtractor(false).Extract
@ -363,14 +374,16 @@ func (r *Revid) setLexer(c string, isRTSP bool) error {
// processFrom is run as a routine to read from a input data source, lex and // processFrom is run as a routine to read from a input data source, lex and
// then send individual access units to revid's encoders. // then send individual access units to revid's encoders.
func (r *Revid) processFrom(in device.AVDevice, delay time.Duration) { func (r *Revid) processFrom(delay time.Duration) {
defer r.wg.Done() defer r.wg.Done()
err := in.Start() if r.input != nil {
err := r.input.Start()
if err != nil { if err != nil {
r.err <- fmt.Errorf("could not start input device: %w", err) r.err <- fmt.Errorf("could not start input device: %w", err)
return return
} }
}
// Lex data from input device, in, until finished or an error is encountered. // Lex data from input device, in, until finished or an error is encountered.
// For a continuous source e.g. a camera or microphone, we should remain // For a continuous source e.g. a camera or microphone, we should remain
@ -382,7 +395,7 @@ func (r *Revid) processFrom(in device.AVDevice, delay time.Duration) {
w = ioext.MultiWriteCloser(r.filters[0], r.probe) w = ioext.MultiWriteCloser(r.filters[0], r.probe)
} }
err = r.lexTo(w, in, delay) err := r.lexTo(w, r.input, delay)
switch err { switch err {
case nil, io.EOF: case nil, io.EOF:
r.cfg.Logger.Info("end of file") r.cfg.Logger.Info("end of file")
@ -394,7 +407,7 @@ func (r *Revid) processFrom(in device.AVDevice, delay time.Duration) {
r.cfg.Logger.Info("finished reading input") r.cfg.Logger.Info("finished reading input")
r.cfg.Logger.Debug("stopping input") r.cfg.Logger.Debug("stopping input")
err = in.Stop() err = r.input.Stop()
if err != nil { if err != nil {
r.err <- fmt.Errorf("could not stop input source: %w", err) r.err <- fmt.Errorf("could not stop input source: %w", err)
} else { } else {

View File

@ -126,6 +126,14 @@ func (r *Revid) Bitrate() int {
return r.bitrate.Bitrate() return r.bitrate.Bitrate()
} }
func (r *Revid) Write(p []byte) (int, error) {
mi, ok := r.input.(*device.ManualInput)
if !ok {
return 0, errors.New("cannot write to anything but ManualInput")
}
return mi.Write(p)
}
// Start invokes a Revid to start processing video from a defined input // Start invokes a Revid to start processing video from a defined input
// and packetising (if theres packetization) to a defined output. // and packetising (if theres packetization) to a defined output.
func (r *Revid) Start() error { func (r *Revid) Start() error {
@ -155,7 +163,7 @@ func (r *Revid) Start() error {
r.cfg.Logger.Debug("starting input processing routine") r.cfg.Logger.Debug("starting input processing routine")
r.wg.Add(1) r.wg.Add(1)
go r.processFrom(r.input, d) go r.processFrom(d)
r.running = true r.running = true
return nil return nil

View File

@ -68,6 +68,8 @@ type httpSender struct {
} }
// newHttpSender returns a pointer to a new httpSender. // newHttpSender returns a pointer to a new httpSender.
// report is callback that can be used to report the amount of data sent per write.
// This can be set to nil.
func newHTTPSender(ns *netsender.Sender, log logging.Logger, report func(sent int)) *httpSender { func newHTTPSender(ns *netsender.Sender, log logging.Logger, report func(sent int)) *httpSender {
return &httpSender{ return &httpSender{
client: ns, client: ns,
@ -82,7 +84,9 @@ func (s *httpSender) Write(d []byte) (int, error) {
err := httpSend(d, s.client, s.log) err := httpSend(d, s.client, s.log)
if err == nil { if err == nil {
s.log.Debug("good send", "len", len(d)) s.log.Debug("good send", "len", len(d))
if s.report != nil {
s.report(len(d)) s.report(len(d))
}
} else { } else {
s.log.Debug("bad send", "error", err) s.log.Debug("bad send", "error", err)
} }