mirror of https://bitbucket.org/ausocean/av.git
cmd/vidforward: restructure code
This change creates two new files. Firstly, slate.go, to house slate related functionality. Secondly, utils.go to house generic utilities and helpers.
This commit is contained in:
parent
0e6fb95461
commit
3ea4aff365
|
@ -39,9 +39,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"bitbucket.org/ausocean/av/codec/codecutil"
|
"bitbucket.org/ausocean/av/codec/codecutil"
|
||||||
"bitbucket.org/ausocean/av/codec/h264"
|
|
||||||
"bitbucket.org/ausocean/av/container/mts"
|
"bitbucket.org/ausocean/av/container/mts"
|
||||||
"bitbucket.org/ausocean/av/device/file"
|
|
||||||
"bitbucket.org/ausocean/av/revid"
|
"bitbucket.org/ausocean/av/revid"
|
||||||
"bitbucket.org/ausocean/av/revid/config"
|
"bitbucket.org/ausocean/av/revid/config"
|
||||||
"bitbucket.org/ausocean/iot/pi/netlogger"
|
"bitbucket.org/ausocean/iot/pi/netlogger"
|
||||||
|
@ -107,14 +105,14 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) {
|
||||||
ma := MAC(q.Get("ma"))
|
ma := MAC(q.Get("ma"))
|
||||||
|
|
||||||
if !m.isActive(ma) {
|
if !m.isActive(ma) {
|
||||||
m.errorLogWrite(w, "forward request mac is not mapped, doing nothing", "mac", ma)
|
m.errorLogWrite(m.log, w, "forward request mac is not mapped, doing nothing", "mac", ma)
|
||||||
time.Sleep(recvErrorDelay)
|
time.Sleep(recvErrorDelay)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// We can't receive video if we're in slate mode.
|
// We can't receive video if we're in slate mode.
|
||||||
if m.getStatus(ma) == "slate" {
|
if m.getStatus(ma) == "slate" {
|
||||||
m.errorLogWrite(w, "cannot receive video for this mac, status is slate", "mac", ma)
|
m.errorLogWrite(m.log, w, "cannot receive video for this mac, status is slate", "mac", ma)
|
||||||
time.Sleep(recvErrorDelay)
|
time.Sleep(recvErrorDelay)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -123,7 +121,7 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) {
|
||||||
sizeStr := q.Get(videoPin)
|
sizeStr := q.Get(videoPin)
|
||||||
size, err := strconv.Atoi(sizeStr)
|
size, err := strconv.Atoi(sizeStr)
|
||||||
if err != nil || size <= 0 {
|
if err != nil || size <= 0 {
|
||||||
m.errorLogWrite(w, "invalid video size", "error", err, "size str", sizeStr)
|
m.errorLogWrite(m.log, w, "invalid video size", "error", err, "size str", sizeStr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -132,20 +130,20 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
mtsClip, err := io.ReadAll(r.Body)
|
mtsClip, err := io.ReadAll(r.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.errorLogWrite(w, "could not read forward request body", "error", err)
|
m.errorLogWrite(m.log, w, "could not read forward request body", "error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer r.Body.Close()
|
defer r.Body.Close()
|
||||||
|
|
||||||
if len(mtsClip)%mts.PacketSize != 0 {
|
if len(mtsClip)%mts.PacketSize != 0 {
|
||||||
m.errorLogWrite(w, "invalid clip length", "length", len(mtsClip))
|
m.errorLogWrite(m.log, w, "invalid clip length", "length", len(mtsClip))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extract the pure h264 from the MPEG-TS clip.
|
// Extract the pure h264 from the MPEG-TS clip.
|
||||||
h264Clip, err := mts.Extract(mtsClip)
|
h264Clip, err := mts.Extract(mtsClip)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.errorLogWrite(w, "could not extract m.264 from the MPEG-TS clip", "error", err)
|
m.errorLogWrite(m.log, w, "could not extract m.264 from the MPEG-TS clip", "error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -157,7 +155,7 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) {
|
||||||
for i, frame := range h264Clip.Frames() {
|
for i, frame := range h264Clip.Frames() {
|
||||||
_, err := rv.Write(frame.Media)
|
_, err := rv.Write(frame.Media)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.errorLogWrite(w, "could not write frame", "no.", i, "error", err)
|
m.errorLogWrite(m.log, w, "could not write frame", "no.", i, "error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -165,7 +163,7 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) {
|
||||||
// Return response to client as JSON.
|
// Return response to client as JSON.
|
||||||
jsn, err := json.Marshal(resp)
|
jsn, err := json.Marshal(resp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.errorLogWrite(w, "could not get json for response", "error", err)
|
m.errorLogWrite(m.log, w, "could not get json for response", "error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
fmt.Fprint(w, string(jsn))
|
fmt.Fprint(w, string(jsn))
|
||||||
|
@ -180,7 +178,7 @@ func (m *broadcastManager) control(w http.ResponseWriter, r *http.Request) {
|
||||||
case http.MethodDelete:
|
case http.MethodDelete:
|
||||||
m.processRequest(w, r, m.delete)
|
m.processRequest(w, r, m.delete)
|
||||||
default:
|
default:
|
||||||
m.errorLogWrite(w, "unhandled http method", "method", r.Method)
|
m.errorLogWrite(m.log, w, "unhandled http method", "method", r.Method)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -189,7 +187,7 @@ func (m *broadcastManager) control(w http.ResponseWriter, r *http.Request) {
|
||||||
func (m *broadcastManager) processRequest(w http.ResponseWriter, r *http.Request, action func(Broadcast) error) {
|
func (m *broadcastManager) processRequest(w http.ResponseWriter, r *http.Request, action func(Broadcast) error) {
|
||||||
body, err := io.ReadAll(r.Body)
|
body, err := io.ReadAll(r.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.errorLogWrite(w, "could not read request body", "body", r.Body)
|
m.errorLogWrite(m.log, w, "could not read request body", "body", r.Body)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer r.Body.Close()
|
defer r.Body.Close()
|
||||||
|
@ -197,23 +195,16 @@ func (m *broadcastManager) processRequest(w http.ResponseWriter, r *http.Request
|
||||||
var broadcast Broadcast
|
var broadcast Broadcast
|
||||||
err = json.Unmarshal(body, &broadcast)
|
err = json.Unmarshal(body, &broadcast)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.errorLogWrite(w, "could not marshal data", "error", err)
|
m.errorLogWrite(m.log, w, "could not marshal data", "error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = action(broadcast)
|
err = action(broadcast)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.errorLogWrite(w, "could not perform action", "method", r.Method, "error", err)
|
m.errorLogWrite(m.log, w, "could not perform action", "method", r.Method, "error", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeError logs an error and writes to w in JSON format.
|
|
||||||
func (m *broadcastManager) errorLogWrite(w http.ResponseWriter, msg string, args ...interface{}) {
|
|
||||||
m.log.Error(msg, args...)
|
|
||||||
w.Header().Add("Content-Type", "application/json")
|
|
||||||
fmt.Fprint(w, `{"er":"`+msg+`"}`)
|
|
||||||
}
|
|
||||||
|
|
||||||
// getPipeline gets the revid pipeline corresponding to a provided device MAC.
|
// getPipeline gets the revid pipeline corresponding to a provided device MAC.
|
||||||
func (m *broadcastManager) getPipeline(ma MAC) *revid.Revid {
|
func (m *broadcastManager) getPipeline(ma MAC) *revid.Revid {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
|
@ -302,7 +293,7 @@ func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error {
|
||||||
// writeSlate routine.
|
// writeSlate routine.
|
||||||
errCh := make(chan error)
|
errCh := make(chan error)
|
||||||
|
|
||||||
go m.writeSlate(broadcast.RV, errCh, signalCh)
|
go writeSlate(broadcast.RV, errCh, signalCh, m.log)
|
||||||
|
|
||||||
// We'll watch out for any errors that happen within a 5 second window. This
|
// We'll watch out for any errors that happen within a 5 second window. This
|
||||||
// will indicate something seriously wrong with init, like a missing file etc.
|
// will indicate something seriously wrong with init, like a missing file etc.
|
||||||
|
@ -331,70 +322,6 @@ func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeSlate is a routine that employs a file input device and h264 lexer to
|
|
||||||
// write a h264 encoded slate image to the provided revid pipeline.
|
|
||||||
func (m *broadcastManager) writeSlate(rv *revid.Revid, errCh chan error, exitSignal chan struct{}) {
|
|
||||||
m.log.Info("writing slate")
|
|
||||||
const (
|
|
||||||
// This is temporary and will eventually be part of a broadcast configuration
|
|
||||||
// where the remote vidforward API user can provide the slate image.
|
|
||||||
slateFileName = "slate.h264"
|
|
||||||
|
|
||||||
// Assume 25fps until this becomes configurable.
|
|
||||||
slateFrameRate = 25
|
|
||||||
|
|
||||||
loopSetting = true
|
|
||||||
frameDelay = time.Second / slateFrameRate
|
|
||||||
)
|
|
||||||
|
|
||||||
fileInput := file.NewWith(m.log, slateFileName, loopSetting)
|
|
||||||
err := fileInput.Start()
|
|
||||||
if err != nil {
|
|
||||||
errCh <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// This will wait for a signal from the provided slateExitSignal (or from a
|
|
||||||
// timeout) to stop writing the slate by "Stopping" the file input which will
|
|
||||||
// terminate the Lex function.
|
|
||||||
go func() {
|
|
||||||
slateTimeoutTimer := time.NewTimer(24 * time.Hour)
|
|
||||||
select {
|
|
||||||
case <-slateTimeoutTimer.C:
|
|
||||||
m.log.Warning("slate timeout")
|
|
||||||
case <-exitSignal:
|
|
||||||
m.log.Info("slate exist signal")
|
|
||||||
}
|
|
||||||
m.log.Info("stopping file input")
|
|
||||||
fileInput.Stop()
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Begin lexing the slate file and send frames to rv pipeline. We'll stay in
|
|
||||||
// here until file input closes or there's an unexpected error.
|
|
||||||
err = h264.Lex(rv, fileInput, frameDelay)
|
|
||||||
|
|
||||||
// If we get to this point, it means that the we've finished lexing for some
|
|
||||||
// reason; let's figure out why.
|
|
||||||
select {
|
|
||||||
// The only reason we'd get a receive on errCh from this side is if its been
|
|
||||||
// closed. This means that we've exceeded the "startup error" period, and that
|
|
||||||
// either the error is normal from stopping the input, or we can no longer inform
|
|
||||||
// the caller and just need to log the problem.
|
|
||||||
case <-errCh:
|
|
||||||
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
|
|
||||||
m.log.Debug("got expected error", "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
m.log.Error("got unexpected error", "error", err)
|
|
||||||
|
|
||||||
// This means that a problem occured pretty early in lexing.
|
|
||||||
default:
|
|
||||||
m.log.Error("unexpected error during lex startup", "error", err)
|
|
||||||
errCh <- err
|
|
||||||
}
|
|
||||||
m.log.Error("finished writing slate")
|
|
||||||
}
|
|
||||||
|
|
||||||
// delete removes a broadcast from the record.
|
// delete removes a broadcast from the record.
|
||||||
func (m *broadcastManager) delete(broadcast Broadcast) error {
|
func (m *broadcastManager) delete(broadcast Broadcast) error {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
|
@ -437,25 +364,3 @@ func main() {
|
||||||
http.HandleFunc("/control", bm.control)
|
http.HandleFunc("/control", bm.control)
|
||||||
http.ListenAndServe(*host+":"+*port, nil)
|
http.ListenAndServe(*host+":"+*port, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func isMac(m string) bool {
|
|
||||||
if len(m) != 17 || m == "00:00:00:00:00:00" {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i <= 15; i++ {
|
|
||||||
if (i+1)%3 == 0 && m[i] != ':' {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if (3-i)%3 != 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := strconv.ParseUint(m[i:i+2], 16, 64)
|
|
||||||
if err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,100 @@
|
||||||
|
/*
|
||||||
|
DESCRIPTION
|
||||||
|
slate.go houses vidforward slate functionality including the slate writing
|
||||||
|
routine and HTTP request handlers for receiving of new slate videos.
|
||||||
|
|
||||||
|
AUTHORS
|
||||||
|
Saxon A. Nelson-Milton <saxon@ausocean.org>
|
||||||
|
|
||||||
|
LICENSE
|
||||||
|
Copyright (C) 2022 the Australian Ocean Lab (AusOcean)
|
||||||
|
|
||||||
|
It is free software: you can redistribute it and/or modify them
|
||||||
|
under the terms of the GNU General Public License as published by the
|
||||||
|
Free Software Foundation, either version 3 of the License, or (at your
|
||||||
|
option) any later version.
|
||||||
|
|
||||||
|
It is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||||
|
for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License
|
||||||
|
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"bitbucket.org/ausocean/av/codec/h264"
|
||||||
|
"bitbucket.org/ausocean/av/device/file"
|
||||||
|
"bitbucket.org/ausocean/utils/logging"
|
||||||
|
)
|
||||||
|
|
||||||
|
// writeSlate is a routine that employs a file input device and h264 lexer to
|
||||||
|
// write a h264 encoded slate image to the provided revid pipeline.
|
||||||
|
func writeSlate(dst io.Writer, errCh chan error, exitSignal chan struct{}, log logging.Logger) {
|
||||||
|
log.Info("writing slate")
|
||||||
|
const (
|
||||||
|
// This is temporary and will eventually be part of a broadcast configuration
|
||||||
|
// where the remote vidforward API user can provide the slate image.
|
||||||
|
slateFileName = "slate.h264"
|
||||||
|
|
||||||
|
// Assume 25fps until this becomes configurable.
|
||||||
|
slateFrameRate = 25
|
||||||
|
|
||||||
|
loopSetting = true
|
||||||
|
frameDelay = time.Second / slateFrameRate
|
||||||
|
)
|
||||||
|
|
||||||
|
fileInput := file.NewWith(log, slateFileName, loopSetting)
|
||||||
|
err := fileInput.Start()
|
||||||
|
if err != nil {
|
||||||
|
errCh <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// This will wait for a signal from the provided slateExitSignal (or from a
|
||||||
|
// timeout) to stop writing the slate by "Stopping" the file input which will
|
||||||
|
// terminate the Lex function.
|
||||||
|
go func() {
|
||||||
|
slateTimeoutTimer := time.NewTimer(24 * time.Hour)
|
||||||
|
select {
|
||||||
|
case <-slateTimeoutTimer.C:
|
||||||
|
log.Warning("slate timeout")
|
||||||
|
case <-exitSignal:
|
||||||
|
log.Info("slate exist signal")
|
||||||
|
}
|
||||||
|
log.Info("stopping file input")
|
||||||
|
fileInput.Stop()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Begin lexing the slate file and send frames to rv pipeline. We'll stay in
|
||||||
|
// here until file input closes or there's an unexpected error.
|
||||||
|
err = h264.Lex(dst, fileInput, frameDelay)
|
||||||
|
|
||||||
|
// If we get to this point, it means that the we've finished lexing for some
|
||||||
|
// reason; let's figure out why.
|
||||||
|
select {
|
||||||
|
// The only reason we'd get a receive on errCh from this side is if its been
|
||||||
|
// closed. This means that we've exceeded the "startup error" period, and that
|
||||||
|
// either the error is normal from stopping the input, or we can no longer inform
|
||||||
|
// the caller and just need to log the problem.
|
||||||
|
case <-errCh:
|
||||||
|
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
|
||||||
|
log.Debug("got expected error", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Error("got unexpected error", "error", err)
|
||||||
|
|
||||||
|
// This means that a problem occured pretty early in lexing.
|
||||||
|
default:
|
||||||
|
log.Error("unexpected error during lex startup", "error", err)
|
||||||
|
errCh <- err
|
||||||
|
}
|
||||||
|
log.Error("finished writing slate")
|
||||||
|
}
|
|
@ -0,0 +1,63 @@
|
||||||
|
/*
|
||||||
|
DESCRIPTION
|
||||||
|
utils.go houses generic vidforward utilities and helper functions.
|
||||||
|
|
||||||
|
AUTHORS
|
||||||
|
Saxon A. Nelson-Milton <saxon@ausocean.org>
|
||||||
|
|
||||||
|
LICENSE
|
||||||
|
Copyright (C) 2022 the Australian Ocean Lab (AusOcean)
|
||||||
|
|
||||||
|
It is free software: you can redistribute it and/or modify them
|
||||||
|
under the terms of the GNU General Public License as published by the
|
||||||
|
Free Software Foundation, either version 3 of the License, or (at your
|
||||||
|
option) any later version.
|
||||||
|
|
||||||
|
It is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||||
|
for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License
|
||||||
|
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"bitbucket.org/ausocean/utils/logging"
|
||||||
|
)
|
||||||
|
|
||||||
|
// writeError logs an error and writes to w in JSON format.
|
||||||
|
func (m *broadcastManager) errorLogWrite(log logging.Logger, w http.ResponseWriter, msg string, args ...interface{}) {
|
||||||
|
log.Error(msg, args...)
|
||||||
|
w.Header().Add("Content-Type", "application/json")
|
||||||
|
fmt.Fprint(w, `{"er":"`+msg+`"}`)
|
||||||
|
}
|
||||||
|
|
||||||
|
// isMac returns true if the provided string is a valid mac address.
|
||||||
|
func isMac(m string) bool {
|
||||||
|
if len(m) != 17 || m == "00:00:00:00:00:00" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i <= 15; i++ {
|
||||||
|
if (i+1)%3 == 0 && m[i] != ':' {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if (3-i)%3 != 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := strconv.ParseUint(m[i:i+2], 16, 64)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
Loading…
Reference in New Issue