cmd/vidforward: add slate image write functionality

This includes adding facilities to handle termination signals
and err handling. Some modifications have been made to the file
input to accomodate the concurrency requirements. The slate
read mechanism is still fairly rudimentary and can only read
from a hardcoded file, but at this stage is for prototyping
purposes.
This commit is contained in:
Saxon Nelson-Milton 2022-11-19 15:35:39 +10:30
parent cb31c5de9b
commit f3cf74ce51
4 changed files with 214 additions and 40 deletions

View File

@ -36,9 +36,12 @@ import (
"net/http"
"strconv"
"sync"
"time"
"bitbucket.org/ausocean/av/codec/codecutil"
"bitbucket.org/ausocean/av/codec/h264"
"bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/device/file"
"bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/iot/pi/netlogger"
@ -54,14 +57,18 @@ const (
// Logging configuration.
const (
logPath = "/var/log/netsender/netsender.log"
logPath = "/var/log/vidforward/vidforward.log"
logMaxSize = 500 // MB
logMaxBackup = 10
logMaxAge = 28 // days
logVerbosity = logging.Info
logSuppress = false
logSuppress = true
)
// recvErrorDelay is a delay used when there's recv issues. It is intended to
// prevent spamming from a single client.
const recvErrorDelay = 7 * time.Second
type MAC string
// Broadcast is representative of a broadcast to be forwarded.
@ -79,10 +86,16 @@ type Broadcast struct {
// forwarded to youtube.
type broadcastManager struct {
broadcasts map[MAC]Broadcast
slateExitSignals map[MAC]chan struct{} // Used to signal to stop writing slate image.
log logging.Logger
mu sync.Mutex
}
// newBroadcastManager returns a new broadcastManager with the provided logger.
func newBroadcastManager(l logging.Logger) *broadcastManager {
return &broadcastManager{log: l, broadcasts: make(map[MAC]Broadcast), slateExitSignals: make(map[MAC]chan struct{})}
}
// recvHandler handles recv requests for video forwarding. The MAC is firstly
// checked to ensure it is "active" i.e. should be sending data, and then the
// video is extracted from the request body and provided to the revid pipeline
@ -93,9 +106,16 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
ma := MAC(q.Get("ma"))
_, ok := m.broadcasts[ma]
if !ok {
if !m.isActive(ma) {
m.errorLogWrite(w, "forward request mac is not mapped, doing nothing", "mac", ma)
time.Sleep(recvErrorDelay)
return
}
// We can't receive video if we're in slate mode.
if m.getStatus(ma) == "slate" {
m.errorLogWrite(w, "cannot receive video for this mac, status is slate", "mac", ma)
time.Sleep(recvErrorDelay)
return
}
@ -137,7 +157,7 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) {
for i, frame := range h264Clip.Frames() {
_, err := rv.Write(frame.Media)
if err != nil {
m.errorLogWrite(w, "could not write frame", "no.", i)
m.errorLogWrite(w, "could not write frame", "no.", i, "error", err)
return
}
}
@ -153,6 +173,7 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) {
// control handles control API requests.
func (m *broadcastManager) control(w http.ResponseWriter, r *http.Request) {
m.log.Info("control request", "method", r.Method)
switch r.Method {
case http.MethodPut:
m.processRequest(w, r, m.createOrUpdate)
@ -204,6 +225,25 @@ func (m *broadcastManager) getPipeline(ma MAC) *revid.Revid {
return v.RV
}
// getStatus gets the broadcast's status corresponding to the provided MAC.
func (m *broadcastManager) getStatus(ma MAC) string {
m.mu.Lock()
defer m.mu.Unlock()
v, ok := m.broadcasts[ma]
if !ok {
return ""
}
return v.Status
}
// isActive returns true if a MAC is registered to the broadcast manager.
func (m *broadcastManager) isActive(ma MAC) bool {
m.mu.Lock()
defer m.mu.Unlock()
_, ok := m.broadcasts[ma]
return ok
}
// createOrUpdate creates or updates a Broadcast record. The revid pipeline
// corresponding to the broadcast MAC is firsty configured/re-configured, and
// the pipeline is "started", which will ready it for receiving video on its
@ -234,15 +274,126 @@ func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error {
return fmt.Errorf("could not start revid pipeline: %w", err)
}
if broadcast.Status == "slate" {
go m.writeSlate(broadcast.MAC, broadcast.RV)
switch broadcast.Status {
case "create":
fallthrough
case "play":
signal, ok := m.slateExitSignals[broadcast.MAC]
if ok {
close(signal)
delete(m.slateExitSignals, broadcast.MAC)
}
case "slate":
m.log.Debug("slate request")
// If there's a signal channel it means that we're already writing the slate
// image and theres nothing to do, so return.
_, ok := m.slateExitSignals[broadcast.MAC]
if ok {
m.log.Warning("already writing slate")
return nil
}
// First create a signal that can be used to stop the slate writing routine.
// This will be provided to the writeSlate routine below.
signalCh := make(chan struct{})
m.slateExitSignals[broadcast.MAC] = signalCh
// Also create an errCh that will be used to communicate errors from the
// writeSlate routine.
errCh := make(chan error)
go m.writeSlate(broadcast.RV, errCh, signalCh)
// We'll watch out for any errors that happen within a 5 second window. This
// will indicate something seriously wrong with init, like a missing file etc.
const startupWindowDuration = 5 * time.Second
startupWindow := time.NewTimer(startupWindowDuration)
select {
// If this triggers first, we're all good.
case <-startupWindow.C:
m.log.Debug("out of error window")
// We consider any errors after this either to be normal i.e. as a result
// of stopping the slate input, or something that can not be handled, and
// only logged, therefore we can close the error channel errCh now.
// This will also let the routine know that errors can no longer be sent
// down errCh.
close(errCh)
// This means we got a slate error pretty early and need to let caller know.
case err := <-errCh:
return fmt.Errorf("could not write slate image: %w", err)
}
default:
return fmt.Errorf("unknown status string: %s", broadcast.Status)
}
return nil
}
// This is just a stub. Eventually this will handle writing of a slate image
// to the destination RTMP URL.
func (m *broadcastManager) writeSlate(ma MAC, rv *revid.Revid) {}
// writeSlate is a routine that employs a file input device and h264 lexer to
// write a h264 encoded slate image to the provided revid pipeline.
func (m *broadcastManager) writeSlate(rv *revid.Revid, errCh chan error, exitSignal chan struct{}) {
m.log.Info("writing slate")
const (
// This is temporary and will eventually be part of a broadcast configuration
// where the remote vidforward API user can provide the slate image.
slateFileName = "slate.h264"
// Assume 25fps until this becomes configurable.
slateFrameRate = 25
loopSetting = true
frameDelay = time.Second / slateFrameRate
)
fileInput := file.NewWith(m.log, slateFileName, loopSetting)
err := fileInput.Start()
if err != nil {
errCh <- err
return
}
// This will wait for a signal from the provided slateExitSignal (or from a
// timeout) to stop writing the slate by "Stopping" the file input which will
// terminate the Lex function.
go func() {
slateTimeoutTimer := time.NewTimer(24 * time.Hour)
select {
case <-slateTimeoutTimer.C:
m.log.Warning("slate timeout")
case <-exitSignal:
m.log.Info("slate exist signal")
}
m.log.Info("stopping file input")
fileInput.Stop()
}()
// Begin lexing the slate file and send frames to rv pipeline. We'll stay in
// here until file input closes or there's an unexpected error.
err = h264.Lex(rv, fileInput, frameDelay)
// If we get to this point, it means that the we've finished lexing for some
// reason; let's figure out why.
select {
// The only reason we'd get a receive on errCh from this side is if its been
// closed. This means that we've exceeded the "startup error" period, and that
// either the error is normal from stopping the input, or we can no longer inform
// the caller and just need to log the problem.
case <-errCh:
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
m.log.Debug("got expected error", "error", err)
return
}
m.log.Error("got unexpected error", "error", err)
// This means that a problem occured pretty early in lexing.
default:
m.log.Error("unexpected error during lex startup", "error", err)
errCh <- err
}
m.log.Error("finished writing slate")
}
// delete removes a broadcast from the record.
func (m *broadcastManager) delete(broadcast Broadcast) error {
@ -281,7 +432,7 @@ func main() {
// lumberjack and netloggers.
log := logging.New(logVerbosity, io.MultiWriter(fileLog, netLog), logSuppress)
bm := &broadcastManager{log: log, broadcasts: map[MAC]Broadcast{}}
bm := newBroadcastManager(log)
http.HandleFunc("/recv", bm.recv)
http.HandleFunc("/control", bm.control)
http.ListenAndServe(*host+":"+*port, nil)

View File

@ -174,6 +174,7 @@ func newRingBuffer(sz, cap int, timeout time.Duration) *ringBuffer {
buf: make([][]byte, cap),
n: cap,
ch: make(chan []byte, cap),
timeout: timeout,
}
for i := range rb.buf {
rb.buf[i] = make([]byte, sz)

View File

@ -30,6 +30,7 @@ import (
"fmt"
"io"
"os"
"sync"
"bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/utils/logging"
@ -39,14 +40,23 @@ import (
// audio or video data.
type AVFile struct {
f *os.File
cfg config.Config
path string
loop bool
isRunning bool
log logging.Logger
set bool
mu sync.Mutex
}
// NewAVFile returns a new AVFile.
func New(l logging.Logger) *AVFile { return &AVFile{log: l} }
// NewWith returns a new AVFile with required params provided i.e. the Set
// method does not need to be called.
func NewWith(l logging.Logger, path string, loop bool) *AVFile {
return &AVFile{log: l, path: path, loop: loop, set: true}
}
// Name returns the name of the device.
func (m *AVFile) Name() string {
return "File"
@ -54,15 +64,22 @@ func (m *AVFile) Name() string {
// Set simply sets the AVFile's config to the passed config.
func (m *AVFile) Set(c config.Config) error {
m.cfg = c
m.path = c.InputPath
m.loop = c.Loop
m.set = true
return nil
}
// Start will open the file at the location of the InputPath field of the
// config struct.
func (m *AVFile) Start() error {
m.mu.Lock()
defer m.mu.Unlock()
var err error
m.f, err = os.Open(m.cfg.InputPath)
if !m.set {
return errors.New("AVFile has not been set with config")
}
m.f, err = os.Open(m.path)
if err != nil {
return fmt.Errorf("could not open media file: %w", err)
}
@ -72,6 +89,8 @@ func (m *AVFile) Start() error {
// Stop will close the file such that any further reads will fail.
func (m *AVFile) Stop() error {
m.mu.Lock()
defer m.mu.Unlock()
err := m.f.Close()
if err == nil {
m.isRunning = false
@ -83,13 +102,18 @@ func (m *AVFile) Stop() error {
// Read implements io.Reader. If start has not been called, or Start has been
// called and Stop has since been called, an error is returned.
func (m *AVFile) Read(p []byte) (int, error) {
if m.f != nil {
m.mu.Lock()
defer m.mu.Unlock()
if m.f == nil {
return 0, errors.New("AV file is closed, AVFile not started")
}
n, err := m.f.Read(p)
if err != nil && err != io.EOF {
return n, err
}
if (n < len(p) || err == io.EOF) && m.cfg.Loop {
if (n < len(p) || err == io.EOF) && m.loop {
m.log.Info("looping input file")
// In the case that we reach end of file but loop is true, we want to
// seek to start and keep reading from there.
@ -106,10 +130,10 @@ func (m *AVFile) Read(p []byte) (int, error) {
}
return n, err
}
return 0, errors.New("AV file is closed")
}
// IsRunning is used to determine if the AVFile device is running.
func (m *AVFile) IsRunning() bool {
m.mu.Lock()
defer m.mu.Unlock()
return m.f != nil && m.isRunning
}

2
go.mod
View File

@ -2,8 +2,6 @@ module bitbucket.org/ausocean/av
go 1.16
replace bitbucket.org/ausocean/utils v1.3.2 => ../utils
require (
bitbucket.org/ausocean/iot v1.3.3
bitbucket.org/ausocean/utils v1.3.2