av/cmd/vidforward/main.go

334 lines
7.4 KiB
Go
Raw Normal View History

package main
import (
"encoding/json"
"flag"
"fmt"
"io"
"net"
"net/http"
"strconv"
"strings"
"sync"
"time"
"bitbucket.org/ausocean/av/codec/codecutil"
"bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/iot/pi/netlogger"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/logging"
"gopkg.in/natefinch/lumberjack.v2"
)
// Default values for the command-line flags parsed in main.
const (
// defaultPort is the default value of the -port flag.
defaultPort = "8080"
// defaultHost is the default value of the -host flag. main rejects an empty
// host, so in practice a host must be supplied on the command line.
defaultHost = ""
)
// Logging configuration. The first four values configure the lumberjack file
// logger created in main; the last two are passed to logging.New.
const (
logPath = "/var/log/netsender/netsender.log" // File the lumberjack logger writes to.
logMaxSize = 500 // MB
logMaxBackup = 10 // Passed as lumberjack MaxBackups.
logMaxAge = 28 // days
logVerbosity = logging.Info // Verbosity level given to logging.New.
logSuppress = false // Third argument to logging.New; NOTE(review): exact meaning defined by the logging package - confirm.
)
// Misc constants.
const (
// netSendRetryTime is how long run waits before retrying after a netsender
// failure.
netSendRetryTime = 5 * time.Second
// defaultSleepTime is used by sleep when the "mp" netsender parameter cannot
// be parsed as an integer.
defaultSleepTime = 60 // Seconds
// profilePath is not referenced in the visible part of this file.
profilePath = "rv.prof"
// pkg is prefixed to some log messages.
pkg = "rv: "
// runPreDelay is not referenced in the visible part of this file.
runPreDelay = 20 * time.Second
)
// forwardHandler is an http.Handler that writes video received over HTTP to
// the revid pipeline registered for the requesting MAC address.
type forwardHandler struct {
// actives maps a MAC address to the revid pipeline that video from that MAC
// is written to. Accesses are made with mu held.
actives map[string]*revid.Revid
// log is used for all logging done by the handler.
log logging.Logger
// mu guards actives.
mu sync.Mutex
}
// ServeHTTP handles a forward request. The "ma" query parameter selects the
// revid pipeline to forward to, the "V0" query parameter must hold a positive
// integer video size, and the request body must be an MPEG-TS clip whose
// length is a multiple of the MPEG-TS packet size. The H.264 frames extracted
// from the clip are written to the pipeline in order.
//
// On success a JSON object echoing "ma" and "V0" is written to w. On any
// failure the error is logged and a JSON object with an "er" field is written
// instead (see errorLogWrite).
func (h *forwardHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	h.log.Debug("recv handler")

	q := r.URL.Query()
	ma := q.Get("ma")

	// getActive returns nil when no pipeline is registered for this MAC. The
	// pipeline (not the request) is what must be checked here, otherwise the
	// frame writes below are made on a nil pipeline.
	rv := h.getActive(ma)
	if rv == nil {
		h.errorLogWrite(w, "forward request mac not active, doing nothing", "mac", ma)
		return
	}

	const videoPin = "V0"
	v := q.Get(videoPin)
	if v == "" {
		h.errorLogWrite(w, "forward request video pin V0 absent, or has no value")
		return
	}

	size, err := strconv.Atoi(v)
	if err != nil {
		h.errorLogWrite(w, "forward request video size can't be converted to int", "error", err)
		return
	}

	if size <= 0 {
		h.errorLogWrite(w, "forward request video size invalid", "size", size)
		return
	}

	mtsClip, err := io.ReadAll(r.Body)
	if err != nil {
		h.errorLogWrite(w, "could not read forward request body", "error", err)
		return
	}

	// A valid MPEG-TS clip is a whole number of packets.
	if len(mtsClip)%mts.PacketSize != 0 {
		h.errorLogWrite(w, "invalid clip length", "length", len(mtsClip))
		return
	}

	h264Clip, err := mts.Extract(mtsClip)
	if err != nil {
		h.errorLogWrite(w, "could not extract h.264 from the MPEG-TS clip", "error", err)
		return
	}

	for i, frame := range h264Clip.Frames() {
		if _, err := rv.Write(frame.Media); err != nil {
			h.errorLogWrite(w, "could not write frame", "no.", i, "error", err)
			return
		}
	}

	// Return response to client as JSON.
	jsn, err := json.Marshal(map[string]interface{}{"ma": ma, "V0": size})
	if err != nil {
		h.errorLogWrite(w, "could not get json for response", "error", err)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	fmt.Fprint(w, string(jsn))
}
// errorLogWrite logs msg at error level, with args passed through to the
// logger, and writes msg to w as a JSON object of the form {"er":"<msg>"}.
func (h *forwardHandler) errorLogWrite(w http.ResponseWriter, msg string, args ...interface{}) {
	h.log.Error(msg, args...)
	w.Header().Set("Content-Type", "application/json")

	// Marshal rather than concatenate so that quotes, backslashes or control
	// characters in msg cannot produce malformed JSON.
	body, err := json.Marshal(map[string]string{"er": msg})
	if err != nil {
		h.log.Error("could not marshal error response", "error", err)
		return
	}
	if _, err := w.Write(body); err != nil {
		h.log.Error("could not write error response", "error", err)
	}
}
// getActive returns the revid pipeline registered for the MAC address ma, or
// nil if there is none. The lookup is done with the handler's mutex held.
func (h *forwardHandler) getActive(ma string) *revid.Revid {
	h.mu.Lock()
	// Indexing a map with an absent key yields the zero value, which for a
	// pointer is nil.
	pipeline := h.actives[ma]
	h.mu.Unlock()
	return pipeline
}
// addActive creates and starts a revid pipeline that takes manually written
// H.264 access units and outputs them over RTMP to url, then registers it for
// the MAC address ma. The pipeline is only registered once it has started
// successfully; on failure nothing is registered and the error is returned.
func (h *forwardHandler) addActive(ma, url string) error {
	h.mu.Lock()
	// Deferred so that the error returns below cannot leave the mutex held,
	// which would block every later use of the handler.
	defer h.mu.Unlock()

	cfg := config.Config{
		Logger:     h.log,
		Input:      config.InputManual,
		InputCodec: codecutil.H264_AU, // h264 access unit i.e. h264 frame.
		Outputs:    []uint8{config.OutputRTMP},
		RTMPURL:    url,
		LogLevel:   logging.Debug,
		FileFPS:    25,
	}

	rv, err := revid.New(cfg, nil)
	if err != nil {
		return fmt.Errorf("could not initialise revid: %w", err)
	}

	if err := rv.Start(); err != nil {
		return fmt.Errorf("could not start revid pipeline: %w", err)
	}

	h.actives[ma] = rv
	return nil
}
// removeInactives unregisters every pipeline whose MAC address is not in macs
// and stops it, so that pipelines for MACs that are no longer active do not
// keep running after they have been dropped from the handler.
func (h *forwardHandler) removeInactives(macs []string) {
	keep := make(map[string]struct{}, len(macs))
	for _, m := range macs {
		keep[m] = struct{}{}
	}

	// Collect the stale pipelines under the lock, but stop them after
	// releasing it so request handling is not blocked while they shut down.
	var stale []*revid.Revid
	h.mu.Lock()
	for k, rv := range h.actives {
		if _, ok := keep[k]; ok {
			continue
		}
		stale = append(stale, rv)
		delete(h.actives, k)
	}
	h.mu.Unlock()

	for _, rv := range stale {
		if rv != nil {
			rv.Stop()
		}
	}
}
// isActive reports whether a pipeline is registered for the MAC address ma.
// The lookup is done with the handler's mutex held.
func (h *forwardHandler) isActive(ma string) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	_, registered := h.actives[ma]
	return registered
}
// main parses the host and port flags, sets up file and network logging,
// creates the netsender client and the forward handler, starts the netsender
// loop in the background, and once that loop signals readiness serves the
// forward handler on /recv.
func main() {
	host := flag.String("host", defaultHost, "Host ip to run video forwarder on.")
	port := flag.String("port", defaultPort, "Port to run video forwarder on.")
	flag.Parse()

	// net.ParseIP returns nil for the empty string, so this also rejects an
	// unset host.
	if net.ParseIP(*host) == nil {
		panic(fmt.Sprintf("invalid host, host: %s", *host))
	}

	// Create lumberjack logger to handle logging to file.
	fileLog := &lumberjack.Logger{
		Filename:   logPath,
		MaxSize:    logMaxSize,
		MaxBackups: logMaxBackup,
		MaxAge:     logMaxAge,
	}

	// Create netlogger to handle logging to cloud.
	netLog := netlogger.New()

	// Create logger that we call methods on to log, which in turn writes to the
	// lumberjack and netloggers.
	log := logging.New(logVerbosity, io.MultiWriter(fileLog, netLog), logSuppress)

	log.Debug("initialising netsender client")
	ns, err := netsender.New(log, nil, nil, nil)
	if err != nil {
		log.Fatal(pkg + "could not initialise netsender client: " + err.Error())
	}

	fh := &forwardHandler{log: log, actives: map[string]*revid.Revid{}}

	log.Debug("beginning main loop")
	readySig := make(chan struct{})
	go run(ns, log, netLog, fh, readySig)

	// Wait until run has processed the variables at least once before
	// accepting forward requests.
	<-readySig

	http.Handle("/recv", fh)

	// JoinHostPort brackets IPv6 literals, which net.ParseIP accepts above but
	// which plain host+":"+port concatenation would turn into an invalid
	// address.
	err = http.ListenAndServe(net.JoinHostPort(*host, *port), nil)
	if err != nil {
		log.Fatal(pkg + "http server failed: " + err.Error())
	}
}
// run starts the main loop. On every pass it runs netsender, sends buffered
// logs, and checks the varsum. If the varsum has changed it fetches the
// variables and reads the "Actives" variable, a comma separated list of
// <mac>=<rtmp url> pairs: a pipeline is added for every valid MAC that does
// not have one yet, and pipelines for MACs no longer listed are removed. It
// sleeps in between passes. sig is closed after the first pass that processes
// the variables, to signal that the handler is ready to serve.
func run(ns *netsender.Sender, l logging.Logger, nl *netlogger.Logger, fh *forwardHandler, sig chan struct{}) {
	var vs int
	for {
		l.Debug("running netsender")
		err := ns.Run()
		if err != nil {
			l.Warning(pkg+"Run Failed. Retrying...", "error", err.Error())
			time.Sleep(netSendRetryTime)
			continue
		}

		l.Debug("sending logs")
		err = nl.Send(ns)
		if err != nil {
			l.Warning(pkg+"Logs could not be sent", "error", err.Error())
		}

		l.Debug("checking varsum")
		newVs := ns.VarSum()
		if vs == newVs {
			sleep(ns, l)
			continue
		}
		l.Info("varsum changed", "vs", newVs)

		l.Debug("getting new vars")
		vars, err := ns.Vars()
		if err != nil {
			l.Error(pkg+"netSender failed to get vars", "error", err.Error())
			time.Sleep(netSendRetryTime)
			continue
		}
		l.Debug("got new vars", "vars", vars)

		// Only record the new varsum once the vars have actually been fetched,
		// so that a failed fetch is retried on the next pass instead of being
		// mistaken for "nothing changed".
		vs = newVs

		// Check the actives variable to see what MACs are active.
		v, ok := vars["Actives"]
		if !ok {
			l.Warning("no actives variable in var map", "vars", vars)
		}

		pairs := strings.Split(v, ",")
		macs := make([]string, 0, len(pairs))
		for _, p := range pairs {
			// Skip empty entries (empty variable or stray commas).
			if p == "" {
				continue
			}

			// Split on the first "=" only; the RTMP URL may itself contain "=".
			pair := strings.SplitN(p, "=", 2)
			if len(pair) != 2 {
				l.Warning("invalid <mac>=<rtmp url> pair", "pair", pair)
				continue
			}

			m, url := pair[0], pair[1]
			if !isMac(m) {
				l.Warning("invalid mac in Actives variable", "mac", m)
				continue
			}
			macs = append(macs, m)

			if fh.getActive(m) != nil {
				continue
			}
			if err := fh.addActive(m, url); err != nil {
				l.Error(pkg+"could not add active", "mac", m, "error", err.Error())
			}
		}
		fh.removeInactives(macs)

		// Close sig exactly once: after it is closed the receive case succeeds
		// immediately and the default case is never taken again.
		select {
		case <-sig:
		default:
			close(sig)
		}

		sleep(ns, l)
	}
}
// sleep blocks for the number of seconds given by the netsender monitoring
// period parameter ("mp"). If that parameter cannot be parsed as an integer,
// the error is logged and defaultSleepTime seconds is used instead.
func sleep(ns *netsender.Sender, l logging.Logger) {
	l.Debug("sleeping")

	secs := defaultSleepTime
	if mp, err := strconv.Atoi(ns.Param("mp")); err != nil {
		l.Error(pkg+"could not get sleep time, using default", "error", err)
	} else {
		secs = mp
	}

	time.Sleep(time.Duration(secs) * time.Second)
	l.Debug("finished sleeping")
}
// isMac reports whether m is a MAC address written as six two-digit
// hexadecimal octets separated by colons, e.g. "a1:b2:c3:d4:e5:f6". The
// all-zero address is not considered valid.
func isMac(m string) bool {
	const macLen = 17 // 6 octets of 2 hex digits plus 5 separators.
	if m == "00:00:00:00:00:00" || len(m) != macLen {
		return false
	}

	// Each octet starts 3 bytes after the previous one: 2 digits and a colon.
	for off := 0; off < macLen; off += 3 {
		if _, err := strconv.ParseUint(m[off:off+2], 16, 64); err != nil {
			return false
		}
		// Every octet except the last is followed by a colon.
		if off+2 < macLen && m[off+2] != ':' {
			return false
		}
	}
	return true
}