/*
DESCRIPTION
  vidforward is a service for receiving video from cameras and then forwarding to
  youtube. By acting as the RTMP encoder (instead of the camera) vidforward can
  enable persistent streams by sending slate images during camera downtime.

AUTHORS
  Saxon A. Nelson-Milton

LICENSE
  Copyright (C) 2022 the Australian Ocean Lab (AusOcean)

  It is free software: you can redistribute it and/or modify them
  under the terms of the GNU General Public License as published by the
  Free Software Foundation, either version 3 of the License, or (at your
  option) any later version.

  It is distributed in the hope that it will be useful, but WITHOUT
  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
  for more details.

  You should have received a copy of the GNU General Public License
  along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/

package main

import (
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"net/http"
	"os"
	"reflect"
	"strconv"
	"strings"
	"sync"
	"time"

	"bitbucket.org/ausocean/av/cmd/vidforward/global"
	"bitbucket.org/ausocean/av/container/mts"
	"bitbucket.org/ausocean/av/revid"
	"bitbucket.org/ausocean/utils/logging"
	"gopkg.in/natefinch/lumberjack.v2"
)

// This is the path to the vidforward configuration.
// This contains parameters such as log level and logging filters.
const configFileName = "/etc/vidforward/config.json"

// Server defaults.
const (
	defaultPort = "8080"
	defaultHost = ""
)

// Logging configuration.
const (
	logPath      = "/var/log/vidforward/vidforward.log"
	logMaxSize   = 500 // MB
	logMaxBackup = 10
	logMaxAge    = 28 // days
	logSuppress  = true
)

// recvErrorDelay is the delay applied before responding when a recv request
// cannot be serviced. It is intended to prevent spamming from a single client.
const recvErrorDelay = 7 * time.Second

// MAC is the MAC address of a device; it is used as the key identifying a
// broadcast.
type MAC string

// The possible states for a broadcast.
const ( statusActive = "active" statusSlate = "slate" statusCreate = "create" statusPlay = "play" ) // Broadcast is representative of a broadcast to be forwarded. type Broadcast struct { mac MAC // MAC address of the device from which the video is being received. urls []string // The destination youtube RTMP URLs. status string // The broadcast status i.e. active or slate. rv *revid.Revid // The revid pipeline which will handle forwarding to youtube. } // equal checks to see if the broadcast is equal to another broadcast. // NOTE: This is not a deep equal, and is only used to check if a broadcast // should be updated. func (b *Broadcast) equal(other Broadcast) bool { return b.mac == other.mac && b.status == other.status && reflect.DeepEqual(b.urls, other.urls) } // broadcastManager manages a map of Broadcasts we expect to be forwarding video // for. The broadcastManager is communicated with through a series of HTTP request // handlers. There is a basic REST API through which we can add/delete broadcasts, // and a recv handler which is invoked when a camera wishes to get its video // forwarded to youtube. type broadcastManager struct { broadcasts map[MAC]*Broadcast slateExitSignals map[MAC]chan struct{} // Used to signal to stop writing slate image. lastLoggedNonActive map[MAC]time.Time // Used to log non-active MACs every minute. log logging.Logger dogNotifier *watchdogNotifier mu sync.Mutex } // newBroadcastManager returns a new broadcastManager with the provided logger. func newBroadcastManager(l logging.Logger) (*broadcastManager, error) { m := &broadcastManager{ log: l, broadcasts: make(map[MAC]*Broadcast), slateExitSignals: make(map[MAC]chan struct{}), } notifier, err := newWatchdogNotifier(l, terminationCallback(m)) if err != nil { return nil, err } m.dogNotifier = notifier return m, nil } // terminationCallback provides a callback that saves the provided // broadcastManagers state. 
func terminationCallback(m *broadcastManager) func() { return func() { err := m.save() if err != nil { m.log.Error("could not save on notifier termination signal", "error", err) return } m.log.Info("successfully saved broadcast manager state on termination signal") } } // loadConfig loads the vidforward configuration file. This primarily concerns logging // configuration for the time being, with the intended use case of debugging. func (m *broadcastManager) loadConfig() error { m.mu.Lock() defer m.mu.Unlock() m.log.Info("loading logger config file") data, err := ioutil.ReadFile(configFileName) if err != nil { return fmt.Errorf("could not read config file: %w", err) } var cfg struct { LogLevel string `json:"LogLevel"` LogSuppress bool `json:"LogSuppress"` LogCallerFilters []string `json:"LogCallerFilters"` } if err = json.Unmarshal(data, &cfg); err != nil { return fmt.Errorf("could not unmarshal config file: %w", err) } m.log.Debug("logger config loaded","cfg",cfg) m.log.(*logging.JSONLogger).SetLevel(map[string]int8{ "debug": logging.Debug, "info": logging.Info, "warning": logging.Warning, "error": logging.Error, "fatal": logging.Fatal, }[cfg.LogLevel]) m.log.(*logging.JSONLogger).SetSuppress(cfg.LogSuppress) m.log.(*logging.JSONLogger).SetCallerFilters(cfg.LogCallerFilters...) return nil } // This is a callback that can be used by file watchers to reload the config. func (m *broadcastManager) onConfigChange() { err := m.loadConfig() if err != nil { m.log.Error("could not load config", "error", err) return } } // recvHandler handles recv requests for video forwarding. The MAC is firstly // checked to ensure it is "active" i.e. should be sending data, and then the // video is extracted from the request body and provided to the revid pipeline // corresponding to said MAC. // Clips of MPEG-TS h264 are the only accepted format and codec. 
func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) { done := m.dogNotifier.handlerInvoked("recv") defer done() q := r.URL.Query() ma := MAC(q.Get("ma")) // Check that we're not receiving video when we shouldn't be. There's // two conditions when this can happen; when the MAC is not mapped to a // broadcast, or when the broadcast is in slate mode. // It's expected this might happen a little bit under normal operation. // It's difficult to get the camera power timing right, so we might // receive a request before the camera has been registered, or after // we've transitioned into slate mode. // If this happens too much however, it may indicate a problem. var reason string switch { case !m.isActive(ma): reason = "forward request mac is not mapped, doing nothing" fallthrough case m.getStatus(ma) == statusSlate: if reason == "" { reason = "cannot receive video for this mac, status is slate" } // We don't want to clutter the logs so only log non-active MACs every // minute. const logNonActiveInternal = 1 * time.Minute last, ok := m.lastLoggedNonActive[ma] if !ok || ok && time.Now().Sub(last) > logNonActiveInternal { m.errorLogWrite(m.log, w, reason, "mac", ma) m.lastLoggedNonActive[ma] = time.Now() } // Stall the client with a delay to prevent spamming. Probably cause timeout // on client. time.Sleep(recvErrorDelay) return default: // Continue (seems like mac is active and we're not in slate.) } const videoPin = "V0" sizeStr := q.Get(videoPin) size, err := strconv.Atoi(sizeStr) if err != nil || size <= 0 { m.errorLogWrite(m.log, w, "invalid video size", "error", err, "size str", sizeStr) return } // Prepare HTTP response with received video size and device mac. 
resp := map[string]interface{}{"ma": ma, "V0": size} mtsClip, err := io.ReadAll(r.Body) if err != nil { m.errorLogWrite(m.log, w, "could not read forward request body", "error", err) return } defer r.Body.Close() if len(mtsClip)%mts.PacketSize != 0 { m.errorLogWrite(m.log, w, "invalid clip length", "length", len(mtsClip)) return } // Extract the pure h264 from the MPEG-TS clip. h264Clip, err := mts.Extract(mtsClip) if err != nil { m.errorLogWrite(m.log, w, "could not extract m.264 from the MPEG-TS clip", "error", err) return } rv, err := m.getPipeline(ma) if err != nil { m.errorLogWrite(m.log, w, "could not get revid pipeline","mac",ma,"error",err) return } for i, frame := range h264Clip.Frames() { _, err := rv.Write(frame.Media) if err != nil { m.errorLogWrite(m.log, w, "could not write frame", "no.", i, "error", err) return } } // Return response to client as JSON. jsn, err := json.Marshal(resp) if err != nil { m.errorLogWrite(m.log, w, "could not get json for response", "error", err) return } fmt.Fprint(w, string(jsn)) } // control handles control API requests. func (m *broadcastManager) control(w http.ResponseWriter, r *http.Request) { done := m.dogNotifier.handlerInvoked("control") defer done() m.log.Info("control request", "method", r.Method) switch r.Method { case http.MethodPut: m.processRequest(w, r, m.createOrUpdate) case http.MethodDelete: m.processRequest(w, r, m.delete) default: m.errorLogWrite(m.log, w, "unhandled http method", "method", r.Method) } } // slate handles slate API requests to upload a new slate video. 
func (m *broadcastManager) slate(w http.ResponseWriter, r *http.Request) { done := m.dogNotifier.handlerInvoked("slate") defer done() if r.Method != http.MethodPost { http.Error(w, "invalid request method", http.StatusMethodNotAllowed) return } file, _, err := r.FormFile("slate-file") if err != nil { m.errorLogWrite(m.log, w, "could not get slate file from form", "error", err) return } defer file.Close() // This will overwrite the slate file if it already exists. dst, err := os.OpenFile(slateFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) if err != nil { m.errorLogWrite(m.log, w, "could not open slate file", "error", err) return } defer dst.Close() n, err := io.Copy(dst, file) if err != nil { m.errorLogWrite(m.log, w, "could not copy slate file", "error", err) return } // Return response to client as JSON. jsn, err := json.Marshal(map[string]interface{}{"size": n}) if err != nil { m.errorLogWrite(m.log, w, "could not get json for response", "error", err) return } fmt.Fprint(w, string(jsn)) } // processRequest unmarshals the broadcast data object from the request into // a Broadcast value, and then performs the provided action with that value. func (m *broadcastManager) processRequest(w http.ResponseWriter, r *http.Request, action func(Broadcast) error) { body, err := io.ReadAll(r.Body) if err != nil { m.errorLogWrite(m.log, w, "could not read request body", "body", r.Body) return } defer r.Body.Close() var broadcast Broadcast err = json.Unmarshal(body, &broadcast) if err != nil { m.errorLogWrite(m.log, w, "could not marshal data", "error", err) return } err = action(broadcast) if err != nil { m.errorLogWrite(m.log, w, "could not perform action", "method", r.Method, "error", err) return } err = m.save() if err != nil { m.errorLogWrite(m.log, w, "could not save manager state", "error", err) } } // getPipeline gets the revid pipeline corresponding to a provided device MAC. 
// If it hasn't been created yet, it's created, and if it hasn't been started yet // (or just created) then it is started. func (m *broadcastManager) getPipeline(ma MAC) (*revid.Revid, error) { m.mu.Lock() defer m.mu.Unlock() b, ok := m.broadcasts[ma] if !ok { panic("shouldn't be getting pipeline if this mac isn't registered") } var err error b.rv, err = m.initOrStartPipeline(b.rv, b.urls) if err != nil { return nil, fmt.Errorf("could not init or start pipeline: %v",err) } return b.rv, nil } // initOrStartPipeline ensures that provided Revid pointer points to an // initialised and running revid pipeline. func (m *broadcastManager) initOrStartPipeline(rv *revid.Revid, urls []string) (*revid.Revid, error){ var err error if rv == nil { rv, err = newRevid(m.log, urls) if err != nil { return nil, fmt.Errorf("could not create new revid: %v",err) } } if !rv.Running() { err = rv.Start() if err != nil { return nil, fmt.Errorf("could not start revid pipeline: %v",err) } } return rv, nil } // getStatus gets the broadcast's status corresponding to the provided MAC. func (m *broadcastManager) getStatus(ma MAC) string { m.mu.Lock() defer m.mu.Unlock() v, ok := m.broadcasts[ma] if !ok { return "" } return v.status } // isActive returns true if a MAC is registered to the broadcast manager. func (m *broadcastManager) isActive(ma MAC) bool { m.mu.Lock() defer m.mu.Unlock() _, ok := m.broadcasts[ma] return ok } // createOrUpdate creates or updates a Broadcast record. If the record already // exists, it will be updated with the new data. If it doesn't exist, a new // revid pipeline will be created and started. Actions occur according to the // status field of the broadcast i.e. whether we expect data from a source // or write the slate image. func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error { m.log.Debug("create or update","mac",broadcast.mac) m.mu.Lock() defer m.mu.Unlock() // Try to get any old broadcasts for the provided MAC. 
maybeOld, ok := m.broadcasts[broadcast.mac] // If there's no old broadcast, we need to create a new revid pipeline. if !ok { maybeOld = &broadcast } var err error maybeOld.rv, err = m.initOrStartPipeline(maybeOld.rv, broadcast.urls) if err != nil { return fmt.Errorf("could not get revid pipeline: %w", err) } // If the URLS have changed, we need to update the revid pipeline. // We won't enter this if we've just created a new revid pipeline. if !reflect.DeepEqual(maybeOld.urls, broadcast.urls) { m.log.Debug("RTMP URLs have changed, updating pipeline config","mac",broadcast.mac,"old",maybeOld.urls,"new",broadcast.urls) urls := strings.Join(broadcast.urls, ",") err := maybeOld.rv.Update(map[string]string{"RTMPURL": urls}) if err != nil { return fmt.Errorf("could not update revid: %w", err) } } // If the status has changed, we need to update accordingly. // i.e. if the status is slate, we need to start writing the slate image. // We won't enter this if we've just created a new revid pipeline. if maybeOld.status != broadcast.status { m.log.Debug("status has changed, starting or stopping slate","mac",broadcast.mac,"old",maybeOld.status,"new",broadcast.status) switch broadcast.status { case statusActive, statusPlay, statusCreate: m.log.Info("updating configuration for mac", "mac", broadcast.mac) signal, ok := m.slateExitSignals[broadcast.mac] if ok { close(signal) delete(m.slateExitSignals, broadcast.mac) } case statusSlate: m.log.Info("slate request") // If there's a signal channel it means that we're already writing the slate // image and theres nothing to do, so return. _, ok := m.slateExitSignals[broadcast.mac] if ok { m.log.Warning("already writing slate") return nil } // First create a signal that can be used to stop the slate writing routine. // This will be provided to the writeSlate routine below. 
signalCh := make(chan struct{}) m.slateExitSignals[broadcast.mac] = signalCh err = writeSlateAndCheckErrors(maybeOld.rv, signalCh, m.log) if err != nil { return fmt.Errorf("could not write slate and check for errors: %w", err) } default: return fmt.Errorf("unknown status string: %s", broadcast.status) } } // We need to give the revid pipeline to the new broadcast record. broadcast.rv = maybeOld.rv // And then replace the old record with the new one in the map. m.broadcasts[broadcast.mac] = &broadcast return nil } // delete removes a broadcast from the record. func (m *broadcastManager) delete(broadcast Broadcast) error { m.mu.Lock() b, ok := m.broadcasts[broadcast.mac] if !ok { return errors.New("no broadcast by that mac in record") } b.rv.Stop() delete(m.broadcasts, broadcast.mac) m.mu.Unlock() return nil } func main() { host := flag.String("host", defaultHost, "Host IP to run video forwarder on.") port := flag.String("port", defaultPort, "Port to run video forwarder on.") flag.Parse() if *host == "" || net.ParseIP(*host) == nil { panic(fmt.Sprintf("invalid host, host: %s", *host)) } // Create lumberjack logger to handle logging to file. fileLog := &lumberjack.Logger{ Filename: logPath, MaxSize: logMaxSize, MaxBackups: logMaxBackup, MaxAge: logMaxAge, } // Create logger that we call methods on to log, which in turn writes to the // lumberjack and netloggers. log := logging.New(loggingLevel, io.MultiWriter(fileLog), logSuppress) global.SetLogger(log) m, err := newBroadcastManager(log) if err != nil { log.Fatal("could not create new broadcast manager", "error", err) } // Try to load any previous state. There may be a previous state if the // watchdog did a process restart. err = m.load() if err != nil { log.Warning("could not load previous state", "error", err) } // Try to load the config file. err = m.loadConfig() if err != nil { log.Warning("could not load config file", "error", err) } // Set up a file watcher to watch the config file. 
This will allow us // to perform updates to configuration while the service is running. watchFile(configFileName, m.onConfigChange, log) http.HandleFunc("/recv", m.recv) http.HandleFunc("/control", m.control) http.HandleFunc("/slate", m.slate) go m.dogNotifier.notify() log.Info("listening", "host", *host, "port", *port) http.ListenAndServe(*host+":"+*port, nil) }