av/cmd/vidforward/main.go

462 lines
14 KiB
Go

/*
DESCRIPTION
vidforward is a service for receiving video from cameras and then forwarding to
youtube. By acting as the RTMP encoder (instead of the camera) vidforward can enable
persistent streams by sending slate images during camera downtime.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
Copyright (C) 2022 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package main
import (
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"net"
"net/http"
"strconv"
"sync"
"time"
"bitbucket.org/ausocean/av/codec/codecutil"
"bitbucket.org/ausocean/av/codec/h264"
"bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/device/file"
"bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/iot/pi/netlogger"
"bitbucket.org/ausocean/utils/logging"
"gopkg.in/natefinch/lumberjack.v2"
)
// Server defaults.
const (
defaultPort = "8080"
defaultHost = ""
)
// Logging configuration.
const (
logPath = "/var/log/vidforward/vidforward.log"
logMaxSize = 500 // MB
logMaxBackup = 10
logMaxAge = 28 // days
logVerbosity = logging.Info
logSuppress = true
)
// recvErrorDelay is a delay used when there's recv issues. It is intended to
// prevent spamming from a single client.
const recvErrorDelay = 7 * time.Second
type MAC string
// Broadcast is representative of a broadcast to be forwarded.
type Broadcast struct {
MAC // MAC address of the device from which the video is being received.
URL string // The destination youtube RTMP URL.
Status string // The broadcast status i.e. active, inactive and slate.
RV *revid.Revid // The revid pipeline which will handle forwarding to youtube.
}
// broadcastManager manages a map of Broadcasts we expect to be forwarding video
// for. The broadcastManager is communicated with through a series of HTTP request
// handlers. There is a basic REST API through which we can add/delete broadcasts,
// and a recv handler which is invoked when a camera wishes to get its video
// forwarded to youtube.
type broadcastManager struct {
broadcasts map[MAC]Broadcast
slateExitSignals map[MAC]chan struct{} // Used to signal to stop writing slate image.
log logging.Logger
mu sync.Mutex
}
// newBroadcastManager returns a new broadcastManager with the provided logger.
func newBroadcastManager(l logging.Logger) *broadcastManager {
return &broadcastManager{log: l, broadcasts: make(map[MAC]Broadcast), slateExitSignals: make(map[MAC]chan struct{})}
}
// recvHandler handles recv requests for video forwarding. The MAC is firstly
// checked to ensure it is "active" i.e. should be sending data, and then the
// video is extracted from the request body and provided to the revid pipeline
// corresponding to said MAC.
// Clips of MPEG-TS h264 are the only accepted format and codec.
func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) {
m.log.Debug("recv handler")
q := r.URL.Query()
ma := MAC(q.Get("ma"))
if !m.isActive(ma) {
m.errorLogWrite(w, "forward request mac is not mapped, doing nothing", "mac", ma)
time.Sleep(recvErrorDelay)
return
}
// We can't receive video if we're in slate mode.
if m.getStatus(ma) == "slate" {
m.errorLogWrite(w, "cannot receive video for this mac, status is slate", "mac", ma)
time.Sleep(recvErrorDelay)
return
}
const videoPin = "V0"
sizeStr := q.Get(videoPin)
size, err := strconv.Atoi(sizeStr)
if err != nil || size <= 0 {
m.errorLogWrite(w, "invalid video size", "error", err, "size str", sizeStr)
return
}
// Prepare HTTP response with received video size and device mac.
resp := map[string]interface{}{"ma": ma, "V0": size}
mtsClip, err := io.ReadAll(r.Body)
if err != nil {
m.errorLogWrite(w, "could not read forward request body", "error", err)
return
}
defer r.Body.Close()
if len(mtsClip)%mts.PacketSize != 0 {
m.errorLogWrite(w, "invalid clip length", "length", len(mtsClip))
return
}
// Extract the pure h264 from the MPEG-TS clip.
h264Clip, err := mts.Extract(mtsClip)
if err != nil {
m.errorLogWrite(w, "could not extract m.264 from the MPEG-TS clip", "error", err)
return
}
rv := m.getPipeline(ma)
if r == nil {
panic(fmt.Sprintf("no revid pipeline for mac address: %s", ma))
}
for i, frame := range h264Clip.Frames() {
_, err := rv.Write(frame.Media)
if err != nil {
m.errorLogWrite(w, "could not write frame", "no.", i, "error", err)
return
}
}
// Return response to client as JSON.
jsn, err := json.Marshal(resp)
if err != nil {
m.errorLogWrite(w, "could not get json for response", "error", err)
return
}
fmt.Fprint(w, string(jsn))
}
// control handles control API requests.
func (m *broadcastManager) control(w http.ResponseWriter, r *http.Request) {
m.log.Info("control request", "method", r.Method)
switch r.Method {
case http.MethodPut:
m.processRequest(w, r, m.createOrUpdate)
case http.MethodDelete:
m.processRequest(w, r, m.delete)
default:
m.errorLogWrite(w, "unhandled http method", "method", r.Method)
}
}
// processRequest unmarshals the broadcast data object from the request into
// a Broadcast value, and then performs the provided action with that value.
func (m *broadcastManager) processRequest(w http.ResponseWriter, r *http.Request, action func(Broadcast) error) {
body, err := io.ReadAll(r.Body)
if err != nil {
m.errorLogWrite(w, "could not read request body", "body", r.Body)
return
}
defer r.Body.Close()
var broadcast Broadcast
err = json.Unmarshal(body, &broadcast)
if err != nil {
m.errorLogWrite(w, "could not marshal data", "error", err)
return
}
err = action(broadcast)
if err != nil {
m.errorLogWrite(w, "could not perform action", "method", r.Method, "error", err)
}
}
// writeError logs an error and writes to w in JSON format.
func (m *broadcastManager) errorLogWrite(w http.ResponseWriter, msg string, args ...interface{}) {
m.log.Error(msg, args...)
w.Header().Add("Content-Type", "application/json")
fmt.Fprint(w, `{"er":"`+msg+`"}`)
}
// getPipeline gets the revid pipeline corresponding to a provided device MAC.
func (m *broadcastManager) getPipeline(ma MAC) *revid.Revid {
m.mu.Lock()
defer m.mu.Unlock()
v, ok := m.broadcasts[ma]
if !ok {
return nil
}
return v.RV
}
// getStatus gets the broadcast's status corresponding to the provided MAC.
func (m *broadcastManager) getStatus(ma MAC) string {
m.mu.Lock()
defer m.mu.Unlock()
v, ok := m.broadcasts[ma]
if !ok {
return ""
}
return v.Status
}
// isActive returns true if a MAC is registered to the broadcast manager.
func (m *broadcastManager) isActive(ma MAC) bool {
m.mu.Lock()
defer m.mu.Unlock()
_, ok := m.broadcasts[ma]
return ok
}
// createOrUpdate creates or updates a Broadcast record. The revid pipeline
// corresponding to the broadcast MAC is firsty configured/re-configured, and
// the pipeline is "started", which will ready it for receiving video on its
// input. In the case that the status is "slate", we will spin up a routine to
// handle writing a slate image to the pipeline.
func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error {
m.mu.Lock()
defer m.mu.Unlock()
cfg := config.Config{
Logger: m.log,
Input: config.InputManual,
InputCodec: codecutil.H264_AU,
Outputs: []uint8{config.OutputRTMP},
RTMPURL: broadcast.URL,
LogLevel: logging.Debug,
}
var err error
broadcast.RV, err = revid.New(cfg, nil)
if err != nil {
return fmt.Errorf("could not initialise revid: %w", err)
}
m.broadcasts[broadcast.MAC] = broadcast
err = broadcast.RV.Start()
if err != nil {
return fmt.Errorf("could not start revid pipeline: %w", err)
}
switch broadcast.Status {
case "create":
fallthrough
case "play":
signal, ok := m.slateExitSignals[broadcast.MAC]
if ok {
close(signal)
delete(m.slateExitSignals, broadcast.MAC)
}
case "slate":
m.log.Debug("slate request")
// If there's a signal channel it means that we're already writing the slate
// image and theres nothing to do, so return.
_, ok := m.slateExitSignals[broadcast.MAC]
if ok {
m.log.Warning("already writing slate")
return nil
}
// First create a signal that can be used to stop the slate writing routine.
// This will be provided to the writeSlate routine below.
signalCh := make(chan struct{})
m.slateExitSignals[broadcast.MAC] = signalCh
// Also create an errCh that will be used to communicate errors from the
// writeSlate routine.
errCh := make(chan error)
go m.writeSlate(broadcast.RV, errCh, signalCh)
// We'll watch out for any errors that happen within a 5 second window. This
// will indicate something seriously wrong with init, like a missing file etc.
const startupWindowDuration = 5 * time.Second
startupWindow := time.NewTimer(startupWindowDuration)
select {
// If this triggers first, we're all good.
case <-startupWindow.C:
m.log.Debug("out of error window")
// We consider any errors after this either to be normal i.e. as a result
// of stopping the slate input, or something that can not be handled, and
// only logged, therefore we can close the error channel errCh now.
// This will also let the routine know that errors can no longer be sent
// down errCh.
close(errCh)
// This means we got a slate error pretty early and need to let caller know.
case err := <-errCh:
return fmt.Errorf("could not write slate image: %w", err)
}
default:
return fmt.Errorf("unknown status string: %s", broadcast.Status)
}
return nil
}
// writeSlate is a routine that employs a file input device and h264 lexer to
// write a h264 encoded slate image to the provided revid pipeline.
func (m *broadcastManager) writeSlate(rv *revid.Revid, errCh chan error, exitSignal chan struct{}) {
m.log.Info("writing slate")
const (
// This is temporary and will eventually be part of a broadcast configuration
// where the remote vidforward API user can provide the slate image.
slateFileName = "slate.h264"
// Assume 25fps until this becomes configurable.
slateFrameRate = 25
loopSetting = true
frameDelay = time.Second / slateFrameRate
)
fileInput := file.NewWith(m.log, slateFileName, loopSetting)
err := fileInput.Start()
if err != nil {
errCh <- err
return
}
// This will wait for a signal from the provided slateExitSignal (or from a
// timeout) to stop writing the slate by "Stopping" the file input which will
// terminate the Lex function.
go func() {
slateTimeoutTimer := time.NewTimer(24 * time.Hour)
select {
case <-slateTimeoutTimer.C:
m.log.Warning("slate timeout")
case <-exitSignal:
m.log.Info("slate exist signal")
}
m.log.Info("stopping file input")
fileInput.Stop()
}()
// Begin lexing the slate file and send frames to rv pipeline. We'll stay in
// here until file input closes or there's an unexpected error.
err = h264.Lex(rv, fileInput, frameDelay)
// If we get to this point, it means that the we've finished lexing for some
// reason; let's figure out why.
select {
// The only reason we'd get a receive on errCh from this side is if its been
// closed. This means that we've exceeded the "startup error" period, and that
// either the error is normal from stopping the input, or we can no longer inform
// the caller and just need to log the problem.
case <-errCh:
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
m.log.Debug("got expected error", "error", err)
return
}
m.log.Error("got unexpected error", "error", err)
// This means that a problem occured pretty early in lexing.
default:
m.log.Error("unexpected error during lex startup", "error", err)
errCh <- err
}
m.log.Error("finished writing slate")
}
// delete removes a broadcast from the record.
func (m *broadcastManager) delete(broadcast Broadcast) error {
m.mu.Lock()
b, ok := m.broadcasts[broadcast.MAC]
if !ok {
return errors.New("no broadcast by that mac in record")
}
b.RV.Stop()
delete(m.broadcasts, broadcast.MAC)
m.mu.Unlock()
return nil
}
func main() {
host := flag.String("host", defaultHost, "Host IP to run video forwarder on.")
port := flag.String("port", defaultPort, "Port to run video forwarder on.")
flag.Parse()
if *host == "" || net.ParseIP(*host) == nil {
panic(fmt.Sprintf("invalid host, host: %s", *host))
}
// Create lumberjack logger to handle logging to file.
fileLog := &lumberjack.Logger{
Filename: logPath,
MaxSize: logMaxSize,
MaxBackups: logMaxBackup,
MaxAge: logMaxAge,
}
// Create netlogger to handle logging to cloud.
netLog := netlogger.New()
// Create logger that we call methods on to log, which in turn writes to the
// lumberjack and netloggers.
log := logging.New(logVerbosity, io.MultiWriter(fileLog, netLog), logSuppress)
bm := newBroadcastManager(log)
http.HandleFunc("/recv", bm.recv)
http.HandleFunc("/control", bm.control)
http.ListenAndServe(*host+":"+*port, nil)
}
func isMac(m string) bool {
if len(m) != 17 || m == "00:00:00:00:00:00" {
return false
}
for i := 0; i <= 15; i++ {
if (i+1)%3 == 0 && m[i] != ':' {
return false
}
if (3-i)%3 != 0 {
continue
}
_, err := strconv.ParseUint(m[i:i+2], 16, 64)
if err != nil {
return false
}
}
return true
}