vidforward & codecutil & mts & device & revid: improved Noop lexer code using ring buffer structure and improved overall commenting and code

This commit is contained in:
Saxon Nelson-Milton 2022-09-20 15:54:59 +09:30
parent b312774b49
commit dddfcc256b
12 changed files with 322 additions and 119 deletions

View File

@ -1,3 +1,29 @@
/*
DESCRIPTION
vidforward is a service for receiving video from cameras and then forwarding to
youtube. By acting as the RTMP encoder (instead of the camera) vidfoward can enable
persistent streams by sending slate images during camera downtime.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
Copyright (C) 2022 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package main
import (
@ -22,6 +48,7 @@ import (
"gopkg.in/natefinch/lumberjack.v2"
)
// Server defaults.
const (
defaultPort = "8080"
defaultHost = ""
@ -46,12 +73,19 @@ const (
runPreDelay = 20 * time.Second
)
// forwardHandler implements http.Handler and handles video forwarding requests
// (re-using the recv method created for vidgrind). forwardHandler also keeps
// track of the active mac addresses and their respective revid pipelines.
type forwardHandler struct {
actives map[string]*revid.Revid
log logging.Logger
mu sync.Mutex
}
// ServeHTTP handles recv requests for video forwarding. The MAC is firstly
// checked to ensure it is "active" i.e. should be sending data, and then the
// video is extracted from the request body and provided to the revid pipeline
// corresponding to said MAC.
func (h *forwardHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.log.Debug("recv handler")
q := r.URL.Query()
@ -88,12 +122,14 @@ func (h *forwardHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.errorLogWrite(w, "could not read forward request body", "error", err)
return
}
defer r.Body.Close()
if len(mtsClip)%mts.PacketSize != 0 {
h.errorLogWrite(w, "invalid clip length", "length", len(mtsClip))
return
}
// Extract the pure h264.
h264Clip, err := mts.Extract(mtsClip)
if err != nil {
h.errorLogWrite(w, "could not extract h.264 from the MPEG-TS clip", "error", err)
@ -108,7 +144,7 @@ func (h *forwardHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
// Return response to client as JSON
// Return response to client as JSON.
jsn, err := json.Marshal(resp)
if err != nil {
h.errorLogWrite(w, "could not get json for response", "error", err)
@ -117,13 +153,43 @@ func (h *forwardHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, string(jsn))
}
// writeError writes an error code in JSON format and logs it if in debug mode.
// updateActives updates the actives map based on a string of CSV where each
// value is a key-value of mac and url i.e. <mac1>=<url1>,<mac2>=<url2>,...
// If there are pairs in the actives map that do not correspond to any pairs in
// the provided CSV, they will be removed.
func (h *forwardHandler) updateActives(v string) {
pairs := strings.Split(v, ",")
macs := make([]string, 0, len(pairs))
for _, p := range pairs {
pair := strings.Split(p, "=")
if len(pair) != 2 {
h.log.Warning("invalid <mac>=<rtmp url> pair", "pair", pair)
continue
}
m := pair[0]
macs = append(macs, m)
if !isMac(m) {
h.log.Warning("invalid MAC in actives update string", "mac", m)
continue
}
r := h.getActive(m)
if r == nil {
h.addActive(m, pair[1])
}
}
h.removeInactives(macs)
}
// writeError logs an error and writes to w in JSON format.
func (h *forwardHandler) errorLogWrite(w http.ResponseWriter, msg string, args ...interface{}) {
h.log.Error(msg, args...)
w.Header().Add("Content-Type", "application/json")
fmt.Fprint(w, `{"er":"`+msg+`"}`)
}
// getActive provides the revid pipeline for the provided MAC if active.
func (h *forwardHandler) getActive(ma string) *revid.Revid {
h.mu.Lock()
defer h.mu.Unlock()
@ -134,16 +200,17 @@ func (h *forwardHandler) getActive(ma string) *revid.Revid {
return v
}
// addActive adds a new revid pipeline configured with the provided RTMP URL url
// for the MAC ma into the actives map.
func (h *forwardHandler) addActive(ma, url string) error {
h.mu.Lock()
cfg := config.Config{
Logger: h.log,
Input: config.InputManual,
InputCodec: codecutil.H264_AU, // h264 access unit i.e. h264 frame.
InputCodec: codecutil.H264_AU,
Outputs: []uint8{config.OutputRTMP},
RTMPURL: url,
LogLevel: logging.Debug,
FileFPS: 25,
}
rv, err := revid.New(cfg, nil)
@ -159,11 +226,13 @@ func (h *forwardHandler) addActive(ma, url string) error {
return nil
}
func (h *forwardHandler) removeInactives(macs []string) {
// removeInactives removes all pairs in the active map that do not correspond to
// those found in actives.
func (h *forwardHandler) removeInactives(actives []string) {
h.mu.Lock()
actives:
for k := range h.actives {
for _, m := range macs {
for _, m := range actives {
if k == m {
continue actives
}
@ -173,6 +242,7 @@ actives:
h.mu.Unlock()
}
// isActive checks to see if the provided MAC is in the actives map.
func (h *forwardHandler) isActive(ma string) bool {
h.mu.Lock()
_, ok := h.actives[ma]
@ -216,13 +286,15 @@ func main() {
readySig := make(chan struct{})
go run(ns, log, netLog, fh, readySig)
// We won't start serving until we've talked to the cloud for configuration.
<-readySig
http.Handle("/recv", fh)
http.ListenAndServe(*host+":"+*port, nil)
}
// run starts the main loop. This will run netsender on every pass of the loop
// (sleeping inbetween), check vars, and if changed, update revid as appropriate.
// (sleeping inbetween), check vars, and if changed, update configuration as
// appropriate.
func run(ns *netsender.Sender, l logging.Logger, nl *netlogger.Logger, fh *forwardHandler, sig chan struct{}) {
var vs int
for {
@ -258,35 +330,17 @@ func run(ns *netsender.Sender, l logging.Logger, nl *netlogger.Logger, fh *forwa
}
l.Debug("got new vars", "vars", vars)
// Check the actives variable to seee what MACs are active.
// Check the actives variable and update the forward handlers active map.
v, ok := vars["Actives"]
if !ok {
l.Warning("no actives variable in var map", "vars", vars)
}
pairs := strings.Split(v, ",")
macs := make([]string, 0, len(pairs))
for _, p := range pairs {
pair := strings.Split(p, "=")
if len(pair) != 2 {
l.Warning("invalid <mac>=<rtmp url> pair", "pair", pair)
sleep(ns, l)
continue
}
fh.updateActives(v)
m := pair[0]
macs = append(macs, m)
if !isMac(m) {
l.Warning("invalid mac in Actives variable", "mac", m)
continue
}
r := fh.getActive(m)
if r == nil {
fh.addActive(m, pair[1])
}
}
fh.removeInactives(macs)
// sig is closed on the first time we get here, indicating to external routine
// that we've got our configuration from the cloud.
select {
case <-sig:
default:

Binary file not shown.

View File

@ -1,7 +1,32 @@
/*
DESCRIPTION
Provides testing for vidfoward functionality.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
Copyright (C) 2022 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package main
import "testing"
// TestIsMac tests the isMac function.
func TestIsMac(t *testing.T) {
tests := []struct {
in string

View File

@ -25,8 +25,10 @@ LICENSE
package codecutil
import (
"errors"
"fmt"
"io"
"math"
"time"
)
@ -82,58 +84,69 @@ func (l *ByteLexer) Lex(dst io.Writer, src io.Reader, d time.Duration) error {
}
}
// Noop assumes that for writing to src is blocked until the entire previous
// write to src is read, i.e. src is expected to connected to a pipe-like structure.
// Noop reads media "frames" from src, queues and then writes to dst at intervals,
// maintaining a steady number of frames stored in the queue. This ensures frames
// are outputted at a consistent rate; useful if reads occur from src in blocks (a
// side effect if src is connected to an input that receives packets containing
// multiple frames at intervals e.g. MPEG-TS over HTTP).
// Noop assumes that writing to the input connected to src is blocked until the
// entire previous write is read, i.e. src is expected to be connected to
// a pipe-like structure.
func Noop(dst io.Writer, src io.Reader, d time.Duration) error {
// Controller tuning constants.
const (
target = 500 // Target channel size to maintain.
coef = 0.01 // Proportional controller coefficient.
minDelay = 1 // Minimum delay between writes (ms).
maxDelay = 1000 // Maximum delay between writes (ms).
defaultDelay = 40 * time.Millisecond // Default delay between writes, equivalent to ~25fps.
)
// Ring buffer tuning.
const (
ringCap = 1000 // Ring buffer capacity.
ringElemSize = 250000 // Ring buffer element size i.e. max h264 frame size.
)
if d < 0 {
return fmt.Errorf("invalid delay: %v", d)
}
if d == 0 {
d = 40 * time.Millisecond
d = defaultDelay
}
ticker := time.NewTicker(d)
defer ticker.Stop()
const checkDur = 500 * time.Millisecond
rateChkTicker := time.NewTicker(checkDur)
frameCh := make(chan []byte, 1000)
errCh := make(chan error)
var (
delay = time.NewTicker(d) // Ticker based on delay between frames.
errCh = make(chan error) // Used by the output routine to signal errors to the main loop.
rb = newRingBuffer(ringElemSize, ringCap) // Use a ring buffer to reduce allocation and GC load.
)
defer delay.Stop()
// This routine is responsible for frame output.
go func() {
for {
toWrite := <-frameCh
_, err := dst.Write(toWrite)
err := rb.writeTo(dst)
if err != nil {
errCh <- fmt.Errorf("could not write to dst: %w", err)
}
<-delay.C
<-ticker.C
select {
case <-rateChkTicker.C:
var adj int
const equilibrium = 500
if len(frameCh) > equilibrium {
adj = -2
} else if len(frameCh) < equilibrium {
adj = 2
}
// Adjust delay using proportional controller.
adj := coef * float64(target-rb.len())
adj = math.Max(math.Min(adj, minDelay), maxDelay) // Limit the delay.
d += time.Millisecond * time.Duration(adj)
ticker.Reset(d)
default:
}
delay.Reset(d)
}
}()
const maxFrameSize = 250000 // = 20kB
buf := make([]byte, maxFrameSize)
// This loop is responsible for reading frames and checking any errors from
// the output routine.
for {
n, err := src.Read(buf)
err := rb.readFrom(src)
if err != nil {
return fmt.Errorf("could not read src: %w", err)
}
newFrame := make([]byte, n)
copy(newFrame, buf[:n])
frameCh <- newFrame
select {
case err := <-errCh:
return fmt.Errorf("error from output routine: %w", err)
@ -141,3 +154,70 @@ func Noop(dst io.Writer, src io.Reader, d time.Duration) error {
}
}
}
// ringBuffer is a basic concurrency safe ring buffer. Concurrency safety is
// achieved using a channel between read and write methods i.e. overwrite/dropping
// behaviour is absent and blocking will occur.
type ringBuffer struct {
n int // Num. of elements.
i int // Current index in underlying buffer.
buf [][]byte // Underlying buffer.
ch chan []byte // ch will act as our concurrency safe queue.
}
func newRingBuffer(sz, cap int) *ringBuffer {
rb := &ringBuffer{
buf: make([][]byte, cap),
n: cap,
ch: make(chan []byte, cap),
}
for i := range rb.buf {
rb.buf[i] = make([]byte, sz)
}
return rb
}
// readFrom gets the next []byte from the buffer and uses it to read from r.
// This data is then stored in the buffer channel ready for writeTo to retreive.
// readFrom will block if the buffer channel is filled, at least within the
// timeout, otherwise an error is returned.
func (b *ringBuffer) readFrom(r io.Reader) error {
buf := b.buf[b.i]
b.i++
if b.i == b.n {
b.i = 0
}
n, err := r.Read(buf)
if err != nil {
return err
}
const dur = 1 * time.Minute
timeout := time.NewTimer(dur)
select {
case b.ch <- buf[:n]:
case <-timeout.C:
return errors.New("buffer chan send timeout")
}
return nil
}
// writeTo tries to get a []byte from the buffer channel within the timeout
// and then writes to w if successful, otherwise an error is returned.
func (b *ringBuffer) writeTo(w io.Writer) error {
const dur = 1 * time.Minute
timeout := time.NewTimer(dur)
select {
case p := <-b.ch:
_, err := w.Write(p)
if err != nil {
return err
}
case <-timeout.C:
return errors.New("buffer chan receive timeout")
}
return nil
}
func (b *ringBuffer) len() int {
return len(b.ch)
}

View File

@ -29,8 +29,8 @@ package codecutil
const (
PCM = "pcm"
ADPCM = "adpcm"
H264 = "h264"
H264_AU = "h264_au"
H264 = "h264" // h264 bytestream (requires lexing).
H264_AU = "h264_au" // Discrete h264 access units.
H265 = "h265"
MJPEG = "mjpeg"
JPEG = "jpeg"

View File

@ -141,6 +141,7 @@ type Frame struct {
idx int // Index in the backing slice.
}
// Frames returns the frames of a h264 clip.
func (c *Clip) Frames() []Frame {
return c.frames
}

View File

@ -74,11 +74,62 @@ func (me MultiError) Error() string {
return fmt.Sprintf("%v", []error(me))
}
type ManualInput struct {}
func NewManualInput() *ManualInput { return &ManualInput{} }
func (m *ManualInput) Read(p []byte) (int, error) { return 0, nil }
// ManualInput is an implementation of the Devices interface that represents
// a manual input mechanism, i.e. data is written to this input manually through
// software (ManualInput also implements io.Writer, unlike other implementations).
// The ManualInput employs an io.Pipe, as such, every write must be accompanied
// by a full read (or reads) of the bytes, otherwise blocking will occur (and
// vice versa). This is intended to make writing of distinct access units easier i.e.
// one whole read (with a big enough buf provided) can represent a distinct frame.
type ManualInput struct {
isRunning bool
reader *io.PipeReader
writer *io.PipeWriter
}
// NewManualInput provides a new ManualInput.
func NewManualInput() *ManualInput {
r, w := io.Pipe()
return &ManualInput{reader: r, writer: w}
}
// Read reads from the manual input and puts the bytes into p.
func (m *ManualInput) Read(p []byte) (int, error) {
return m.reader.Read(p)
}
// Name returns the name of ManualInput i.e. "ManualInput".
func (m *ManualInput) Name() string { return "ManualInput" }
// Set is a stub to satisfy the Device interface; no configuration fields are
// required by ManualInput.
func (m *ManualInput) Set(c config.Config) error { return nil }
func (m *ManualInput) Start() error { return nil }
func (m *ManualInput) Stop() error { return nil }
func (m *ManualInput) IsRunning() bool { return true }
// Start sets the ManualInput isRunning flag to true. This is mostly here just
// to satisfy the Device interface.
func (m *ManualInput) Start() error {
m.isRunning = true
return nil
}
// Stop closes the pipe and sets the isRunning flag to false.
func (m *ManualInput) Stop() error {
if !m.isRunning {
return nil
}
err := m.reader.Close()
if err != nil {
return err
}
m.isRunning = false
return nil
}
// IsRunning returns the value of the isRunning flag to indicate if Start has
// been called (and Stop has not been called after).
func (m *ManualInput) IsRunning() bool { return m.isRunning }
// Write writes p to the ManualInput's writer side of its pipe.
func (m *ManualInput) Write(p []byte) (int, error) {
return m.writer.Write(p)
}

View File

@ -29,13 +29,14 @@ import (
"time"
"bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/utils/logging"
)
func TestIsRunning(t *testing.T) {
const dur = 250 * time.Millisecond
const path = "../../../test/test-data/av/input/motion-detection/mjpeg/school.mjpeg"
d := New()
d := New((*logging.TestLogger)(t))
err := d.Set(config.Config{
InputPath: path,

View File

@ -55,8 +55,8 @@ const (
OutputFiles
// Codecs.
H264
H264_AU
H264 // h264 bytestream.
H264_AU // Discrete h264 access units.
H265
MJPEG
JPEG

View File

@ -317,10 +317,9 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
r.cfg.Logger.Debug("using manual input")
r.input = device.NewManualInput()
err = r.setLexer(r.cfg.InputCodec, false)
r.pipeReader, r.pipeWriter = io.Pipe()
default:
return fmt.Errorf("unrecognised input type: %v")
return fmt.Errorf("unrecognised input type: %v", r.cfg.Input)
}
if err != nil {
return fmt.Errorf("could not set lexer: %w", err)
@ -396,13 +395,7 @@ func (r *Revid) processFrom(delay time.Duration) {
w = ioext.MultiWriteCloser(r.filters[0], r.probe)
}
var err error
if r.cfg.Input == config.InputManual {
err = r.lexTo(w, r.pipeReader, delay)
} else {
err = r.lexTo(w, r.input, delay)
}
err := r.lexTo(w, r.input, delay)
switch err {
case nil, io.EOF:
r.cfg.Logger.Info("end of file")

View File

@ -102,9 +102,6 @@ type Revid struct {
// stop is used to signal stopping when looping an input.
stop chan struct{}
pipeReader *io.PipeReader
pipeWriter *io.PipeWriter
}
// New returns a pointer to a new Revid with the desired configuration, and/or
@ -130,10 +127,11 @@ func (r *Revid) Bitrate() int {
}
func (r *Revid) Write(p []byte) (int, error) {
if r.pipeWriter == nil {
return 0, errors.New("revid input pipewriter not initialised, please start revid")
mi, ok := r.input.(*device.ManualInput)
if !ok {
return 0, errors.New("cannot write to anything but ManualInput")
}
return r.pipeWriter.Write(p)
return mi.Write(p)
}
// Start invokes a Revid to start processing video from a defined input