mirror of https://bitbucket.org/ausocean/av.git
388 lines
9.8 KiB
Go
388 lines
9.8 KiB
Go
/*
|
|
DESCRIPTION
|
|
vidforward is a service for receiving video from cameras and then forwarding to
|
|
youtube. By acting as the RTMP encoder (instead of the camera) vidfoward can enable
|
|
persistent streams by sending slate images during camera downtime.
|
|
|
|
AUTHORS
|
|
Saxon A. Nelson-Milton <saxon@ausocean.org>
|
|
|
|
LICENSE
|
|
Copyright (C) 2022 the Australian Ocean Lab (AusOcean)
|
|
|
|
It is free software: you can redistribute it and/or modify them
|
|
under the terms of the GNU General Public License as published by the
|
|
Free Software Foundation, either version 3 of the License, or (at your
|
|
option) any later version.
|
|
|
|
It is distributed in the hope that it will be useful, but WITHOUT
|
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
|
for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
|
|
*/
|
|
|
|
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"bitbucket.org/ausocean/av/codec/codecutil"
|
|
"bitbucket.org/ausocean/av/container/mts"
|
|
"bitbucket.org/ausocean/av/revid"
|
|
"bitbucket.org/ausocean/av/revid/config"
|
|
"bitbucket.org/ausocean/iot/pi/netlogger"
|
|
"bitbucket.org/ausocean/iot/pi/netsender"
|
|
"bitbucket.org/ausocean/utils/logging"
|
|
"gopkg.in/natefinch/lumberjack.v2"
|
|
)
|
|
|
|
// Server defaults.
|
|
const (
|
|
defaultPort = "8080"
|
|
defaultHost = ""
|
|
)
|
|
|
|
// Logging configuration.
|
|
const (
|
|
logPath = "/var/log/netsender/netsender.log"
|
|
logMaxSize = 500 // MB
|
|
logMaxBackup = 10
|
|
logMaxAge = 28 // days
|
|
logVerbosity = logging.Info
|
|
logSuppress = false
|
|
)
|
|
|
|
// Misc constants.
|
|
const (
|
|
netSendRetryTime = 5 * time.Second
|
|
defaultSleepTime = 60 // Seconds
|
|
profilePath = "rv.prof"
|
|
pkg = "rv: "
|
|
runPreDelay = 20 * time.Second
|
|
)
|
|
|
|
// forwardHandler implements http.Handler and handles video forwarding requests
|
|
// (re-using the recv method created for vidgrind). forwardHandler also keeps
|
|
// track of the active mac addresses and their respective revid pipelines.
|
|
type forwardHandler struct {
|
|
actives map[string]*revid.Revid
|
|
log logging.Logger
|
|
mu sync.Mutex
|
|
}
|
|
|
|
// ServeHTTP handles recv requests for video forwarding. The MAC is firstly
|
|
// checked to ensure it is "active" i.e. should be sending data, and then the
|
|
// video is extracted from the request body and provided to the revid pipeline
|
|
// corresponding to said MAC.
|
|
func (h *forwardHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
h.log.Debug("recv handler")
|
|
q := r.URL.Query()
|
|
ma := q.Get("ma")
|
|
|
|
rv := h.getActive(ma)
|
|
if r == nil {
|
|
h.errorLogWrite(w, "forward request mac not active, doing nothing", "mac", ma)
|
|
return
|
|
}
|
|
|
|
const videoPin = "V0"
|
|
v := q.Get(videoPin)
|
|
if v == "" {
|
|
h.errorLogWrite(w, "forward request video pin V0 absent, or has no value")
|
|
return
|
|
}
|
|
|
|
size, err := strconv.Atoi(v)
|
|
if err != nil {
|
|
h.errorLogWrite(w, "forward request video size can't be conerted to int", "error", err)
|
|
return
|
|
}
|
|
|
|
if size <= 0 {
|
|
h.errorLogWrite(w, "forward request video size invalid", "size", size)
|
|
return
|
|
}
|
|
|
|
resp := map[string]interface{}{"ma": ma, "V0": size}
|
|
|
|
mtsClip, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
h.errorLogWrite(w, "could not read forward request body", "error", err)
|
|
return
|
|
}
|
|
defer r.Body.Close()
|
|
|
|
if len(mtsClip)%mts.PacketSize != 0 {
|
|
h.errorLogWrite(w, "invalid clip length", "length", len(mtsClip))
|
|
return
|
|
}
|
|
|
|
// Extract the pure h264.
|
|
h264Clip, err := mts.Extract(mtsClip)
|
|
if err != nil {
|
|
h.errorLogWrite(w, "could not extract h.264 from the MPEG-TS clip", "error", err)
|
|
return
|
|
}
|
|
|
|
for i, frame := range h264Clip.Frames() {
|
|
_, err := rv.Write(frame.Media)
|
|
if err != nil {
|
|
h.errorLogWrite(w, "could not write frame", "no.", i)
|
|
return
|
|
}
|
|
}
|
|
|
|
// Return response to client as JSON.
|
|
jsn, err := json.Marshal(resp)
|
|
if err != nil {
|
|
h.errorLogWrite(w, "could not get json for response", "error", err)
|
|
return
|
|
}
|
|
fmt.Fprint(w, string(jsn))
|
|
}
|
|
|
|
// updateActives updates the actives map based on a string of CSV where each
|
|
// value is a key-value of mac and url i.e. <mac1>=<url1>,<mac2>=<url2>,...
|
|
// If there are pairs in the actives map that do not correspond to any pairs in
|
|
// the provided CSV, they will be removed.
|
|
func (h *forwardHandler) updateActives(v string) {
|
|
pairs := strings.Split(v, ",")
|
|
macs := make([]string, 0, len(pairs))
|
|
for _, p := range pairs {
|
|
pair := strings.Split(p, "=")
|
|
if len(pair) != 2 {
|
|
h.log.Warning("invalid <mac>=<rtmp url> pair", "pair", pair)
|
|
continue
|
|
}
|
|
|
|
m := pair[0]
|
|
macs = append(macs, m)
|
|
if !isMac(m) {
|
|
h.log.Warning("invalid MAC in actives update string", "mac", m)
|
|
continue
|
|
}
|
|
|
|
r := h.getActive(m)
|
|
if r == nil {
|
|
h.addActive(m, pair[1])
|
|
}
|
|
}
|
|
h.removeInactives(macs)
|
|
}
|
|
|
|
// writeError logs an error and writes to w in JSON format.
|
|
func (h *forwardHandler) errorLogWrite(w http.ResponseWriter, msg string, args ...interface{}) {
|
|
h.log.Error(msg, args...)
|
|
w.Header().Add("Content-Type", "application/json")
|
|
fmt.Fprint(w, `{"er":"`+msg+`"}`)
|
|
}
|
|
|
|
// getActive provides the revid pipeline for the provided MAC if active.
|
|
func (h *forwardHandler) getActive(ma string) *revid.Revid {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
v, ok := h.actives[ma]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return v
|
|
}
|
|
|
|
// addActive adds a new revid pipeline configured with the provided RTMP URL url
|
|
// for the MAC ma into the actives map.
|
|
func (h *forwardHandler) addActive(ma, url string) error {
|
|
h.mu.Lock()
|
|
cfg := config.Config{
|
|
Logger: h.log,
|
|
Input: config.InputManual,
|
|
InputCodec: codecutil.H264_AU,
|
|
Outputs: []uint8{config.OutputRTMP},
|
|
RTMPURL: url,
|
|
LogLevel: logging.Debug,
|
|
}
|
|
|
|
rv, err := revid.New(cfg, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("coult not initialise revid: %w", err)
|
|
}
|
|
h.actives[ma] = rv
|
|
err = rv.Start()
|
|
if err != nil {
|
|
return fmt.Errorf("could not start revid pipeline")
|
|
}
|
|
h.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// removeInactives removes all pairs in the active map that do not correspond to
|
|
// those found in actives.
|
|
func (h *forwardHandler) removeInactives(actives []string) {
|
|
h.mu.Lock()
|
|
actives:
|
|
for k := range h.actives {
|
|
for _, m := range actives {
|
|
if k == m {
|
|
continue actives
|
|
}
|
|
}
|
|
delete(h.actives, k)
|
|
}
|
|
h.mu.Unlock()
|
|
}
|
|
|
|
// isActive checks to see if the provided MAC is in the actives map.
|
|
func (h *forwardHandler) isActive(ma string) bool {
|
|
h.mu.Lock()
|
|
_, ok := h.actives[ma]
|
|
h.mu.Unlock()
|
|
return ok
|
|
}
|
|
|
|
func main() {
|
|
host := flag.String("host", defaultHost, "Host ip to run video forwarder on.")
|
|
port := flag.String("port", defaultPort, "Port to run video forwarder on.")
|
|
flag.Parse()
|
|
|
|
if *host == "" || net.ParseIP(*host) == nil {
|
|
panic(fmt.Sprintf("invalid host, host: %s", *host))
|
|
}
|
|
|
|
// Create lumberjack logger to handle logging to file.
|
|
fileLog := &lumberjack.Logger{
|
|
Filename: logPath,
|
|
MaxSize: logMaxSize,
|
|
MaxBackups: logMaxBackup,
|
|
MaxAge: logMaxAge,
|
|
}
|
|
|
|
// Create netlogger to handle logging to cloud.
|
|
netLog := netlogger.New()
|
|
|
|
// Create logger that we call methods on to log, which in turn writes to the
|
|
// lumberjack and netloggers.
|
|
log := logging.New(logVerbosity, io.MultiWriter(fileLog, netLog), logSuppress)
|
|
|
|
log.Debug("initialising netsender client")
|
|
ns, err := netsender.New(log, nil, nil, nil)
|
|
if err != nil {
|
|
log.Fatal(pkg + "could not initialise netsender client: " + err.Error())
|
|
}
|
|
|
|
fh := &forwardHandler{log: log, actives: map[string]*revid.Revid{}}
|
|
|
|
log.Debug("beginning main loop")
|
|
readySig := make(chan struct{})
|
|
go run(ns, log, netLog, fh, readySig)
|
|
|
|
// We won't start serving until we've talked to the cloud for configuration.
|
|
<-readySig
|
|
http.Handle("/recv", fh)
|
|
http.ListenAndServe(*host+":"+*port, nil)
|
|
}
|
|
|
|
// run starts the main loop. This will run netsender on every pass of the loop
|
|
// (sleeping inbetween), check vars, and if changed, update configuration as
|
|
// appropriate.
|
|
func run(ns *netsender.Sender, l logging.Logger, nl *netlogger.Logger, fh *forwardHandler, sig chan struct{}) {
|
|
var vs int
|
|
for {
|
|
l.Debug("running netsender")
|
|
err := ns.Run()
|
|
if err != nil {
|
|
l.Warning(pkg+"Run Failed. Retrying...", "error", err.Error())
|
|
time.Sleep(netSendRetryTime)
|
|
continue
|
|
}
|
|
|
|
l.Debug("sending logs")
|
|
err = nl.Send(ns)
|
|
if err != nil {
|
|
l.Warning(pkg+"Logs could not be sent", "error", err.Error())
|
|
}
|
|
|
|
l.Debug("checking varsum")
|
|
newVs := ns.VarSum()
|
|
if vs == newVs {
|
|
sleep(ns, l)
|
|
continue
|
|
}
|
|
vs = newVs
|
|
l.Info("varsum changed", "vs", vs)
|
|
|
|
l.Debug("getting new vars")
|
|
vars, err := ns.Vars()
|
|
if err != nil {
|
|
l.Error(pkg+"netSender failed to get vars", "error", err.Error())
|
|
time.Sleep(netSendRetryTime)
|
|
continue
|
|
}
|
|
l.Debug("got new vars", "vars", vars)
|
|
|
|
// Check the actives variable and update the forward handlers active map.
|
|
v, ok := vars["Actives"]
|
|
if !ok {
|
|
l.Warning("no actives variable in var map", "vars", vars)
|
|
sleep(ns, l)
|
|
continue
|
|
}
|
|
fh.updateActives(v)
|
|
|
|
// sig is closed on the first time we get here, indicating to external routine
|
|
// that we've got our configuration from the cloud.
|
|
select {
|
|
case <-sig:
|
|
default:
|
|
close(sig)
|
|
}
|
|
|
|
sleep(ns, l)
|
|
}
|
|
}
|
|
|
|
// sleep uses a delay to halt the program based on the monitoring period
|
|
// netsender parameter (mp) defined in the netsender.conf config.
|
|
func sleep(ns *netsender.Sender, l logging.Logger) {
|
|
l.Debug("sleeping")
|
|
t, err := strconv.Atoi(ns.Param("mp"))
|
|
if err != nil {
|
|
l.Error(pkg+"could not get sleep time, using default", "error", err)
|
|
t = defaultSleepTime
|
|
}
|
|
time.Sleep(time.Duration(t) * time.Second)
|
|
l.Debug("finished sleeping")
|
|
}
|
|
|
|
func isMac(m string) bool {
|
|
if len(m) != 17 || m == "00:00:00:00:00:00" {
|
|
return false
|
|
}
|
|
|
|
for i := 0; i <= 15; i++ {
|
|
if (i+1)%3 == 0 && m[i] != ':' {
|
|
return false
|
|
}
|
|
|
|
if (3-i)%3 != 0 {
|
|
continue
|
|
}
|
|
|
|
_, err := strconv.ParseUint(m[i:i+2], 16, 64)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|