From 3ea4aff3652b27af8d7cc12ae484a08a840a09d3 Mon Sep 17 00:00:00 2001 From: Saxon Nelson-Milton Date: Sun, 27 Nov 2022 09:57:39 +1030 Subject: [PATCH] cmd/vidforward: restructure code This change creates two new files. Firstly, slate.go, to house slate related functionality. Secondly, utils.go to house generic utilities and helpers. --- cmd/vidforward/main.go | 121 +++++----------------------------------- cmd/vidforward/slate.go | 100 +++++++++++++++++++++++++++++++++ cmd/vidforward/utils.go | 63 +++++++++++++++++++++ 3 files changed, 176 insertions(+), 108 deletions(-) create mode 100644 cmd/vidforward/slate.go create mode 100644 cmd/vidforward/utils.go diff --git a/cmd/vidforward/main.go b/cmd/vidforward/main.go index cd7df336..98107647 100644 --- a/cmd/vidforward/main.go +++ b/cmd/vidforward/main.go @@ -39,9 +39,7 @@ import ( "time" "bitbucket.org/ausocean/av/codec/codecutil" - "bitbucket.org/ausocean/av/codec/h264" "bitbucket.org/ausocean/av/container/mts" - "bitbucket.org/ausocean/av/device/file" "bitbucket.org/ausocean/av/revid" "bitbucket.org/ausocean/av/revid/config" "bitbucket.org/ausocean/iot/pi/netlogger" @@ -107,14 +105,14 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) { ma := MAC(q.Get("ma")) if !m.isActive(ma) { - m.errorLogWrite(w, "forward request mac is not mapped, doing nothing", "mac", ma) + m.errorLogWrite(m.log, w, "forward request mac is not mapped, doing nothing", "mac", ma) time.Sleep(recvErrorDelay) return } // We can't receive video if we're in slate mode. if m.getStatus(ma) == "slate" { - m.errorLogWrite(w, "cannot receive video for this mac, status is slate", "mac", ma) + m.errorLogWrite(m.log, w, "cannot receive video for this mac, status is slate", "mac", ma) time.Sleep(recvErrorDelay) return } @@ -123,7 +121,7 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) { sizeStr := q.Get(videoPin) size, err := strconv.Atoi(sizeStr) if err != nil || size <= 0 { - m.errorLogWrite(w, "invalid video size", "error", err, "size str", sizeStr) + m.errorLogWrite(m.log, w, "invalid video size", "error", err, "size str", sizeStr) return } @@ -132,20 +130,20 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) { mtsClip, err := io.ReadAll(r.Body) if err != nil { - m.errorLogWrite(w, "could not read forward request body", "error", err) + m.errorLogWrite(m.log, w, "could not read forward request body", "error", err) return } defer r.Body.Close() if len(mtsClip)%mts.PacketSize != 0 { - m.errorLogWrite(w, "invalid clip length", "length", len(mtsClip)) + m.errorLogWrite(m.log, w, "invalid clip length", "length", len(mtsClip)) return } // Extract the pure h264 from the MPEG-TS clip. h264Clip, err := mts.Extract(mtsClip) if err != nil { - m.errorLogWrite(w, "could not extract m.264 from the MPEG-TS clip", "error", err) + m.errorLogWrite(m.log, w, "could not extract m.264 from the MPEG-TS clip", "error", err) return } @@ -157,7 +155,7 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) { for i, frame := range h264Clip.Frames() { _, err := rv.Write(frame.Media) if err != nil { - m.errorLogWrite(w, "could not write frame", "no.", i, "error", err) + m.errorLogWrite(m.log, w, "could not write frame", "no.", i, "error", err) return } } @@ -165,7 +163,7 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) { // Return response to client as JSON. jsn, err := json.Marshal(resp) if err != nil { - m.errorLogWrite(w, "could not get json for response", "error", err) + m.errorLogWrite(m.log, w, "could not get json for response", "error", err) return } fmt.Fprint(w, string(jsn)) @@ -180,7 +178,7 @@ func (m *broadcastManager) control(w http.ResponseWriter, r *http.Request) { case http.MethodDelete: m.processRequest(w, r, m.delete) default: - m.errorLogWrite(w, "unhandled http method", "method", r.Method) + m.errorLogWrite(m.log, w, "unhandled http method", "method", r.Method) } } @@ -189,7 +187,7 @@ func (m *broadcastManager) control(w http.ResponseWriter, r *http.Request) { func (m *broadcastManager) processRequest(w http.ResponseWriter, r *http.Request, action func(Broadcast) error) { body, err := io.ReadAll(r.Body) if err != nil { - m.errorLogWrite(w, "could not read request body", "body", r.Body) + m.errorLogWrite(m.log, w, "could not read request body", "body", r.Body) return } defer r.Body.Close() @@ -197,23 +195,16 @@ func (m *broadcastManager) processRequest(w http.ResponseWriter, r *http.Request var broadcast Broadcast err = json.Unmarshal(body, &broadcast) if err != nil { - m.errorLogWrite(w, "could not marshal data", "error", err) + m.errorLogWrite(m.log, w, "could not marshal data", "error", err) return } err = action(broadcast) if err != nil { - m.errorLogWrite(w, "could not perform action", "method", r.Method, "error", err) + m.errorLogWrite(m.log, w, "could not perform action", "method", r.Method, "error", err) } } -// writeError logs an error and writes to w in JSON format. -func (m *broadcastManager) errorLogWrite(w http.ResponseWriter, msg string, args ...interface{}) { - m.log.Error(msg, args...) - w.Header().Add("Content-Type", "application/json") - fmt.Fprint(w, `{"er":"`+msg+`"}`) -} - // getPipeline gets the revid pipeline corresponding to a provided device MAC. func (m *broadcastManager) getPipeline(ma MAC) *revid.Revid { m.mu.Lock() @@ -302,7 +293,7 @@ func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error { // writeSlate routine. errCh := make(chan error) - go m.writeSlate(broadcast.RV, errCh, signalCh) + go writeSlate(broadcast.RV, errCh, signalCh, m.log) // We'll watch out for any errors that happen within a 5 second window. This // will indicate something seriously wrong with init, like a missing file etc. @@ -331,70 +322,6 @@ func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error { return nil } -// writeSlate is a routine that employs a file input device and h264 lexer to -// write a h264 encoded slate image to the provided revid pipeline. -func (m *broadcastManager) writeSlate(rv *revid.Revid, errCh chan error, exitSignal chan struct{}) { - m.log.Info("writing slate") - const ( - // This is temporary and will eventually be part of a broadcast configuration - // where the remote vidforward API user can provide the slate image. - slateFileName = "slate.h264" - - // Assume 25fps until this becomes configurable. - slateFrameRate = 25 - - loopSetting = true - frameDelay = time.Second / slateFrameRate - ) - - fileInput := file.NewWith(m.log, slateFileName, loopSetting) - err := fileInput.Start() - if err != nil { - errCh <- err - return - } - - // This will wait for a signal from the provided slateExitSignal (or from a - // timeout) to stop writing the slate by "Stopping" the file input which will - // terminate the Lex function. - go func() { - slateTimeoutTimer := time.NewTimer(24 * time.Hour) - select { - case <-slateTimeoutTimer.C: - m.log.Warning("slate timeout") - case <-exitSignal: - m.log.Info("slate exist signal") - } - m.log.Info("stopping file input") - fileInput.Stop() - }() - - // Begin lexing the slate file and send frames to rv pipeline. We'll stay in - // here until file input closes or there's an unexpected error. - err = h264.Lex(rv, fileInput, frameDelay) - - // If we get to this point, it means that the we've finished lexing for some - // reason; let's figure out why. - select { - // The only reason we'd get a receive on errCh from this side is if its been - // closed. This means that we've exceeded the "startup error" period, and that - // either the error is normal from stopping the input, or we can no longer inform - // the caller and just need to log the problem. - case <-errCh: - if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { - m.log.Debug("got expected error", "error", err) - return - } - m.log.Error("got unexpected error", "error", err) - - // This means that a problem occured pretty early in lexing. - default: - m.log.Error("unexpected error during lex startup", "error", err) - errCh <- err - } - m.log.Error("finished writing slate") -} - // delete removes a broadcast from the record. func (m *broadcastManager) delete(broadcast Broadcast) error { m.mu.Lock() @@ -437,25 +364,3 @@ func main() { http.HandleFunc("/control", bm.control) http.ListenAndServe(*host+":"+*port, nil) } - -func isMac(m string) bool { - if len(m) != 17 || m == "00:00:00:00:00:00" { - return false - } - - for i := 0; i <= 15; i++ { - if (i+1)%3 == 0 && m[i] != ':' { - return false - } - - if (3-i)%3 != 0 { - continue - } - - _, err := strconv.ParseUint(m[i:i+2], 16, 64) - if err != nil { - return false - } - } - return true -} diff --git a/cmd/vidforward/slate.go b/cmd/vidforward/slate.go new file mode 100644 index 00000000..838e98b9 --- /dev/null +++ b/cmd/vidforward/slate.go @@ -0,0 +1,100 @@ +/* +DESCRIPTION + slate.go houses vidforward slate functionality including the slate writing + routine and HTTP request handlers for receiving of new slate videos. + +AUTHORS + Saxon A. Nelson-Milton + +LICENSE + Copyright (C) 2022 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package main + +import ( + "errors" + "io" + "time" + + "bitbucket.org/ausocean/av/codec/h264" + "bitbucket.org/ausocean/av/device/file" + "bitbucket.org/ausocean/utils/logging" +) + +// writeSlate is a routine that employs a file input device and h264 lexer to +// write a h264 encoded slate image to the provided revid pipeline. +func writeSlate(dst io.Writer, errCh chan error, exitSignal chan struct{}, log logging.Logger) { + log.Info("writing slate") + const ( + // This is temporary and will eventually be part of a broadcast configuration + // where the remote vidforward API user can provide the slate image. + slateFileName = "slate.h264" + + // Assume 25fps until this becomes configurable. + slateFrameRate = 25 + + loopSetting = true + frameDelay = time.Second / slateFrameRate + ) + + fileInput := file.NewWith(log, slateFileName, loopSetting) + err := fileInput.Start() + if err != nil { + errCh <- err + return + } + + // This will wait for a signal from the provided slateExitSignal (or from a + // timeout) to stop writing the slate by "Stopping" the file input which will + // terminate the Lex function. + go func() { + slateTimeoutTimer := time.NewTimer(24 * time.Hour) + select { + case <-slateTimeoutTimer.C: + log.Warning("slate timeout") + case <-exitSignal: + log.Info("slate exist signal") + } + log.Info("stopping file input") + fileInput.Stop() + }() + + // Begin lexing the slate file and send frames to rv pipeline. We'll stay in + // here until file input closes or there's an unexpected error. + err = h264.Lex(dst, fileInput, frameDelay) + + // If we get to this point, it means that the we've finished lexing for some + // reason; let's figure out why. + select { + // The only reason we'd get a receive on errCh from this side is if its been + // closed. This means that we've exceeded the "startup error" period, and that + // either the error is normal from stopping the input, or we can no longer inform + // the caller and just need to log the problem. + case <-errCh: + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + log.Debug("got expected error", "error", err) + return + } + log.Error("got unexpected error", "error", err) + + // This means that a problem occured pretty early in lexing. + default: + log.Error("unexpected error during lex startup", "error", err) + errCh <- err + } + log.Error("finished writing slate") +} diff --git a/cmd/vidforward/utils.go b/cmd/vidforward/utils.go new file mode 100644 index 00000000..63e4d00d --- /dev/null +++ b/cmd/vidforward/utils.go @@ -0,0 +1,63 @@ +/* +DESCRIPTION + utils.go houses generic vidforward utilities and helper functions. + +AUTHORS + Saxon A. Nelson-Milton + +LICENSE + Copyright (C) 2022 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package main + +import ( + "fmt" + "net/http" + "strconv" + + "bitbucket.org/ausocean/utils/logging" +) + +// writeError logs an error and writes to w in JSON format. +func (m *broadcastManager) errorLogWrite(log logging.Logger, w http.ResponseWriter, msg string, args ...interface{}) { + log.Error(msg, args...) + w.Header().Add("Content-Type", "application/json") + fmt.Fprint(w, `{"er":"`+msg+`"}`) +} + +// isMac returns true if the provided string is a valid mac address. +func isMac(m string) bool { + if len(m) != 17 || m == "00:00:00:00:00:00" { + return false + } + + for i := 0; i <= 15; i++ { + if (i+1)%3 == 0 && m[i] != ':' { + return false + } + + if (3-i)%3 != 0 { + continue + } + + _, err := strconv.ParseUint(m[i:i+2], 16, 64) + if err != nil { + return false + } + } + return true +}