av/cmd/vidforward/main.go

388 lines
9.8 KiB
Go

/*
DESCRIPTION
vidforward is a service for receiving video from cameras and then forwarding to
youtube. By acting as the RTMP encoder (instead of the camera) vidforward can enable
persistent streams by sending slate images during camera downtime.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
LICENSE
Copyright (C) 2022 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package main
import (
"encoding/json"
"flag"
"fmt"
"io"
"net"
"net/http"
"strconv"
"strings"
"sync"
"time"
"bitbucket.org/ausocean/av/codec/codecutil"
"bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/iot/pi/netlogger"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/logging"
"gopkg.in/natefinch/lumberjack.v2"
)
// Server defaults.
const (
// defaultPort is the default value of the "port" flag.
defaultPort = "8080"
// defaultHost is the default value of the "host" flag. main rejects an empty
// host, so in practice a host must be supplied on the command line.
defaultHost = ""
)
// Logging configuration. The path, size, backup and age values configure the
// lumberjack file logger created in main; the verbosity and suppress values
// are passed to logging.New.
const (
// logPath is the file the lumberjack logger writes to.
logPath = "/var/log/netsender/netsender.log"
// logMaxSize is the lumberjack MaxSize setting.
logMaxSize = 500 // MB
// logMaxBackup is the lumberjack MaxBackups setting.
logMaxBackup = 10
// logMaxAge is the lumberjack MaxAge setting.
logMaxAge = 28 // days
// logVerbosity is the level the logger is created with.
logVerbosity = logging.Info
// logSuppress is the suppress argument given to logging.New.
logSuppress = false
)
// Misc constants.
const (
// netSendRetryTime is how long run waits before retrying after a failed
// netsender run or a failed vars fetch.
netSendRetryTime = 5 * time.Second
// defaultSleepTime is the sleep duration used by sleep when the netsender
// "mp" parameter cannot be parsed as an integer.
defaultSleepTime = 60 // Seconds
// NOTE(review): profilePath is not referenced in the visible part of this
// file - confirm whether it is still needed.
profilePath = "rv.prof"
// pkg is prefixed to a number of log messages.
pkg = "rv: "
// NOTE(review): runPreDelay is not referenced in the visible part of this
// file - confirm whether it is still needed.
runPreDelay = 20 * time.Second
)
// forwardHandler implements http.Handler and handles video forwarding requests
// (re-using the recv method created for vidgrind). forwardHandler also keeps
// track of the active mac addresses and their respective revid pipelines.
type forwardHandler struct {
// actives maps a MAC address to the revid pipeline its video is written to.
actives map[string]*revid.Revid
// log is used for all of the handler's logging.
log logging.Logger
// mu is held by the handler's methods whenever actives is read or modified.
mu sync.Mutex
}
// ServeHTTP handles recv requests for video forwarding. The MAC is firstly
// checked to ensure it is "active" i.e. should be sending data, and then the
// video is extracted from the request body and provided to the revid pipeline
// corresponding to said MAC.
//
// The MAC is read from the "ma" query parameter and the clip size from the
// "V0" query parameter; the request body is expected to be an MPEG-TS clip.
// On success the MAC and size are echoed back to the client as JSON. On any
// failure the problem is logged and reported to the client via errorLogWrite.
func (h *forwardHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	h.log.Debug("recv handler")

	q := r.URL.Query()
	ma := q.Get("ma")

	// It is the pipeline (not the request) that tells us whether the MAC is
	// active; a nil pipeline must be rejected here to avoid writing to it below.
	rv := h.getActive(ma)
	if rv == nil {
		h.errorLogWrite(w, "forward request mac not active, doing nothing", "mac", ma)
		return
	}

	const videoPin = "V0"
	v := q.Get(videoPin)
	if v == "" {
		h.errorLogWrite(w, "forward request video pin V0 absent, or has no value")
		return
	}

	size, err := strconv.Atoi(v)
	if err != nil {
		h.errorLogWrite(w, "forward request video size can't be conerted to int", "error", err)
		return
	}

	if size <= 0 {
		h.errorLogWrite(w, "forward request video size invalid", "size", size)
		return
	}

	resp := map[string]interface{}{"ma": ma, "V0": size}

	// Register the close before reading so that the body is also closed when
	// the read fails.
	defer r.Body.Close()
	mtsClip, err := io.ReadAll(r.Body)
	if err != nil {
		h.errorLogWrite(w, "could not read forward request body", "error", err)
		return
	}

	// A valid clip is a whole number of MPEG-TS packets.
	if len(mtsClip)%mts.PacketSize != 0 {
		h.errorLogWrite(w, "invalid clip length", "length", len(mtsClip))
		return
	}

	// Extract the pure h264.
	h264Clip, err := mts.Extract(mtsClip)
	if err != nil {
		h.errorLogWrite(w, "could not extract h.264 from the MPEG-TS clip", "error", err)
		return
	}

	for i, frame := range h264Clip.Frames() {
		_, err := rv.Write(frame.Media)
		if err != nil {
			h.errorLogWrite(w, "could not write frame", "no.", i, "error", err)
			return
		}
	}

	// Return response to client as JSON.
	jsn, err := json.Marshal(resp)
	if err != nil {
		h.errorLogWrite(w, "could not get json for response", "error", err)
		return
	}
	fmt.Fprint(w, string(jsn))
}
// updateActives updates the actives map based on a string of CSV where each
// value is a key-value of mac and url i.e. <mac1>=<url1>,<mac2>=<url2>,...
// If there are pairs in the actives map that do not correspond to any pairs in
// the provided CSV, they will be removed.
//
// Malformed pairs and invalid MACs are logged and skipped. A MAC that already
// has a pipeline is left untouched. A failure to create a pipeline for a new
// MAC is logged, and the remaining pairs are still processed.
func (h *forwardHandler) updateActives(v string) {
	pairs := strings.Split(v, ",")
	macs := make([]string, 0, len(pairs))
	for _, p := range pairs {
		// Split on the first "=" only, so that a URL which itself contains "="
		// (e.g. in a query string) is kept intact rather than rejected.
		pair := strings.SplitN(p, "=", 2)
		if len(pair) != 2 {
			h.log.Warning("invalid <mac>=<rtmp url> pair", "pair", pair)
			continue
		}

		m, url := pair[0], pair[1]
		if !isMac(m) {
			h.log.Warning("invalid MAC in actives update string", "mac", m)
			continue
		}

		// Only valid MACs are worth remembering; nothing else can be in the map.
		macs = append(macs, m)

		if h.getActive(m) != nil {
			continue
		}

		err := h.addActive(m, url)
		if err != nil {
			h.log.Error("could not add active", "mac", m, "error", err)
		}
	}
	h.removeInactives(macs)
}
// errorLogWrite logs an error and writes it to w in JSON format, i.e.
// {"er":"<msg>"}. Only msg is sent to the client; args are additional
// key-value pairs that are passed to the logger alongside msg.
func (h *forwardHandler) errorLogWrite(w http.ResponseWriter, msg string, args ...interface{}) {
	h.log.Error(msg, args...)
	w.Header().Add("Content-Type", "application/json")

	// Marshal rather than concatenate so that a message containing quotes or
	// other special characters still results in valid JSON.
	body, err := json.Marshal(map[string]string{"er": msg})
	if err != nil {
		h.log.Error("could not marshal error response", "error", err)
		return
	}

	_, err = w.Write(body)
	if err != nil {
		h.log.Warning("could not write error response", "error", err)
	}
}
// getActive looks up the revid pipeline registered for the MAC ma. A nil
// pipeline is returned when the MAC has no entry in the actives map.
func (h *forwardHandler) getActive(ma string) *revid.Revid {
	h.mu.Lock()
	// Indexing a map with an absent key yields the zero value, i.e. nil here.
	rv := h.actives[ma]
	h.mu.Unlock()
	return rv
}
// addActive adds a new revid pipeline configured with the provided RTMP URL url
// for the MAC ma into the actives map.
//
// The pipeline is created with manual input of H.264 access units and a single
// RTMP output, and is started before being registered. An error is returned if
// the pipeline cannot be created or started, in which case the actives map is
// left unchanged.
func (h *forwardHandler) addActive(ma, url string) error {
	// Deferring the unlock guarantees the mutex is released on the error paths
	// too; otherwise a single failure would block every later map access.
	h.mu.Lock()
	defer h.mu.Unlock()

	cfg := config.Config{
		Logger:     h.log,
		Input:      config.InputManual,
		InputCodec: codecutil.H264_AU,
		Outputs:    []uint8{config.OutputRTMP},
		RTMPURL:    url,
		LogLevel:   logging.Debug,
	}

	rv, err := revid.New(cfg, nil)
	if err != nil {
		return fmt.Errorf("could not initialise revid: %w", err)
	}

	err = rv.Start()
	if err != nil {
		return fmt.Errorf("could not start revid pipeline: %w", err)
	}

	// Only register pipelines that are running, so that a failed MAC is not
	// considered active and can be retried on a later update.
	h.actives[ma] = rv
	return nil
}
// removeInactives removes all pairs in the active map that do not correspond to
// those found in actives. The pipeline of each removed pair is stopped before
// the pair is deleted so that it does not keep running with nothing
// referencing it.
func (h *forwardHandler) removeInactives(actives []string) {
	// Build a set of the MACs to keep so each map entry needs only one lookup.
	keep := make(map[string]struct{}, len(actives))
	for _, m := range actives {
		keep[m] = struct{}{}
	}

	h.mu.Lock()
	defer h.mu.Unlock()

	for ma, rv := range h.actives {
		if _, ok := keep[ma]; ok {
			continue
		}
		if rv != nil {
			rv.Stop()
		}
		// Deleting the current key while ranging over a map is safe in Go.
		delete(h.actives, ma)
	}
}
// isActive reports whether the MAC ma currently has an entry in the actives
// map.
func (h *forwardHandler) isActive(ma string) bool {
	h.mu.Lock()
	defer h.mu.Unlock()

	_, present := h.actives[ma]
	return present
}
// main parses the host and port flags, sets up logging to file and to the
// cloud, creates the netsender client and the forward handler, and then serves
// the handler on /recv once run has signalled that configuration has been
// obtained from the cloud.
func main() {
	host := flag.String("host", defaultHost, "Host ip to run video forwarder on.")
	port := flag.String("port", defaultPort, "Port to run video forwarder on.")
	flag.Parse()

	// An empty host is rejected, so a valid IP must be given via the host flag.
	if *host == "" || net.ParseIP(*host) == nil {
		panic(fmt.Sprintf("invalid host, host: %s", *host))
	}

	// Create lumberjack logger to handle logging to file.
	fileLog := &lumberjack.Logger{
		Filename:   logPath,
		MaxSize:    logMaxSize,
		MaxBackups: logMaxBackup,
		MaxAge:     logMaxAge,
	}

	// Create netlogger to handle logging to cloud.
	netLog := netlogger.New()

	// Create logger that we call methods on to log, which in turn writes to the
	// lumberjack and netloggers.
	log := logging.New(logVerbosity, io.MultiWriter(fileLog, netLog), logSuppress)

	log.Debug("initialising netsender client")
	ns, err := netsender.New(log, nil, nil, nil)
	if err != nil {
		log.Fatal(pkg + "could not initialise netsender client: " + err.Error())
	}

	fh := &forwardHandler{log: log, actives: map[string]*revid.Revid{}}

	log.Debug("beginning main loop")
	readySig := make(chan struct{})
	go run(ns, log, netLog, fh, readySig)

	// We won't start serving until we've talked to the cloud for configuration.
	<-readySig

	http.Handle("/recv", fh)

	// JoinHostPort brackets IPv6 hosts, which plain concatenation with ":" would
	// turn into an invalid address.
	addr := net.JoinHostPort(*host, *port)

	// ListenAndServe only returns on failure, and always with a non-nil error.
	err = http.ListenAndServe(addr, nil)
	if err != nil {
		log.Fatal(pkg + "could not listen and serve: " + err.Error())
	}
}
// run starts the main loop. This will run netsender on every pass of the loop
// (sleeping inbetween), check vars, and if changed, update configuration as
// appropriate.
//
// Logs collected by nl are sent on every pass. When the varsum reported by ns
// differs from the last one successfully handled, the vars are fetched and the
// "Actives" variable is used to update fh. sig is closed after the first such
// update to indicate that configuration has been obtained. run never returns.
func run(ns *netsender.Sender, l logging.Logger, nl *netlogger.Logger, fh *forwardHandler, sig chan struct{}) {
	var vs int
	for {
		l.Debug("running netsender")
		err := ns.Run()
		if err != nil {
			l.Warning(pkg+"Run Failed. Retrying...", "error", err.Error())
			time.Sleep(netSendRetryTime)
			continue
		}

		l.Debug("sending logs")
		err = nl.Send(ns)
		if err != nil {
			l.Warning(pkg+"Logs could not be sent", "error", err.Error())
		}

		l.Debug("checking varsum")
		newVs := ns.VarSum()
		if vs == newVs {
			sleep(ns, l)
			continue
		}
		l.Info("varsum changed", "vs", newVs)

		l.Debug("getting new vars")
		vars, err := ns.Vars()
		if err != nil {
			l.Error(pkg+"netSender failed to get vars", "error", err.Error())
			time.Sleep(netSendRetryTime)
			continue
		}

		// Only record the new varsum once the vars have actually been obtained;
		// recording it earlier would mean a failed fetch is never retried until
		// the varsum changes again.
		vs = newVs
		l.Debug("got new vars", "vars", vars)

		// Check the actives variable and update the forward handlers active map.
		v, ok := vars["Actives"]
		if !ok {
			l.Warning("no actives variable in var map", "vars", vars)
			sleep(ns, l)
			continue
		}
		fh.updateActives(v)

		// sig is closed on the first time we get here, indicating to external routine
		// that we've got our configuration from the cloud. Receiving from an already
		// closed channel succeeds immediately, so it is only closed once.
		select {
		case <-sig:
		default:
			close(sig)
		}

		sleep(ns, l)
	}
}
// sleep blocks for the number of seconds given by the netsender monitoring
// period parameter (mp). If that parameter cannot be parsed as an integer the
// problem is logged and defaultSleepTime seconds is used instead.
func sleep(ns *netsender.Sender, l logging.Logger) {
	l.Debug("sleeping")

	secs := defaultSleepTime
	if mp, err := strconv.Atoi(ns.Param("mp")); err != nil {
		l.Error(pkg+"could not get sleep time, using default", "error", err)
	} else {
		secs = mp
	}

	time.Sleep(time.Duration(secs) * time.Second)
	l.Debug("finished sleeping")
}
// isMac reports whether m is a MAC address of the form xx:xx:xx:xx:xx:xx,
// where each xx is a pair of hexadecimal digits (either case). The all-zero
// address is not considered valid.
func isMac(m string) bool {
	const (
		macLen   = 17 // Six two-digit octets plus five separators.
		groupLen = 3  // Two hex digits followed by a separator.
	)

	if m == "00:00:00:00:00:00" || len(m) != macLen {
		return false
	}

	// Walk the string one octet group at a time.
	for start := 0; start < macLen; start += groupLen {
		// Every group other than the last must be followed by a colon.
		if sep := start + 2; sep < macLen && m[sep] != ':' {
			return false
		}
		if _, err := strconv.ParseUint(m[start:start+2], 16, 64); err != nil {
			return false
		}
	}
	return true
}