diff --git a/cmd/vidforward/main.go b/cmd/vidforward/main.go index 994338c6..cd7df336 100644 --- a/cmd/vidforward/main.go +++ b/cmd/vidforward/main.go @@ -36,9 +36,12 @@ import ( "net/http" "strconv" "sync" + "time" "bitbucket.org/ausocean/av/codec/codecutil" + "bitbucket.org/ausocean/av/codec/h264" "bitbucket.org/ausocean/av/container/mts" + "bitbucket.org/ausocean/av/device/file" "bitbucket.org/ausocean/av/revid" "bitbucket.org/ausocean/av/revid/config" "bitbucket.org/ausocean/iot/pi/netlogger" @@ -54,14 +57,18 @@ const ( // Logging configuration. const ( - logPath = "/var/log/netsender/netsender.log" + logPath = "/var/log/vidforward/vidforward.log" logMaxSize = 500 // MB logMaxBackup = 10 logMaxAge = 28 // days logVerbosity = logging.Info - logSuppress = false + logSuppress = true ) +// recvErrorDelay is a delay used when there's recv issues. It is intended to +// prevent spamming from a single client. +const recvErrorDelay = 7 * time.Second + type MAC string // Broadcast is representative of a broadcast to be forwarded. @@ -78,9 +85,15 @@ type Broadcast struct { // and a recv handler which is invoked when a camera wishes to get its video // forwarded to youtube. type broadcastManager struct { - broadcasts map[MAC]Broadcast - log logging.Logger - mu sync.Mutex + broadcasts map[MAC]Broadcast + slateExitSignals map[MAC]chan struct{} // Used to signal to stop writing slate image. + log logging.Logger + mu sync.Mutex +} + +// newBroadcastManager returns a new broadcastManager with the provided logger. +func newBroadcastManager(l logging.Logger) *broadcastManager { + return &broadcastManager{log: l, broadcasts: make(map[MAC]Broadcast), slateExitSignals: make(map[MAC]chan struct{})} } // recvHandler handles recv requests for video forwarding. The MAC is firstly @@ -93,9 +106,16 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) { q := r.URL.Query() ma := MAC(q.Get("ma")) - _, ok := m.broadcasts[ma] - if !ok { + if !m.isActive(ma) { m.errorLogWrite(w, "forward request mac is not mapped, doing nothing", "mac", ma) + time.Sleep(recvErrorDelay) + return + } + + // We can't receive video if we're in slate mode. + if m.getStatus(ma) == "slate" { + m.errorLogWrite(w, "cannot receive video for this mac, status is slate", "mac", ma) + time.Sleep(recvErrorDelay) return } @@ -137,7 +157,7 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) { for i, frame := range h264Clip.Frames() { _, err := rv.Write(frame.Media) if err != nil { - m.errorLogWrite(w, "could not write frame", "no.", i) + m.errorLogWrite(w, "could not write frame", "no.", i, "error", err) return } } @@ -153,6 +173,7 @@ func (m *broadcastManager) recv(w http.ResponseWriter, r *http.Request) { // control handles control API requests. func (m *broadcastManager) control(w http.ResponseWriter, r *http.Request) { + m.log.Info("control request", "method", r.Method) switch r.Method { case http.MethodPut: m.processRequest(w, r, m.createOrUpdate) @@ -204,6 +225,25 @@ func (m *broadcastManager) getPipeline(ma MAC) *revid.Revid { return v.RV } +// getStatus gets the broadcast's status corresponding to the provided MAC. +func (m *broadcastManager) getStatus(ma MAC) string { + m.mu.Lock() + defer m.mu.Unlock() + v, ok := m.broadcasts[ma] + if !ok { + return "" + } + return v.Status +} + +// isActive returns true if a MAC is registered to the broadcast manager. +func (m *broadcastManager) isActive(ma MAC) bool { + m.mu.Lock() + defer m.mu.Unlock() + _, ok := m.broadcasts[ma] + return ok +} + // createOrUpdate creates or updates a Broadcast record. The revid pipeline // corresponding to the broadcast MAC is firsty configured/re-configured, and // the pipeline is "started", which will ready it for receiving video on its @@ -234,15 +274,126 @@ func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error { return fmt.Errorf("could not start revid pipeline: %w", err) } - if broadcast.Status == "slate" { - go m.writeSlate(broadcast.MAC, broadcast.RV) + switch broadcast.Status { + case "create": + fallthrough + case "play": + signal, ok := m.slateExitSignals[broadcast.MAC] + if ok { + close(signal) + delete(m.slateExitSignals, broadcast.MAC) + } + case "slate": + m.log.Debug("slate request") + // If there's a signal channel it means that we're already writing the slate + // image and theres nothing to do, so return. + _, ok := m.slateExitSignals[broadcast.MAC] + if ok { + m.log.Warning("already writing slate") + return nil + } + + // First create a signal that can be used to stop the slate writing routine. + // This will be provided to the writeSlate routine below. + signalCh := make(chan struct{}) + m.slateExitSignals[broadcast.MAC] = signalCh + + // Also create an errCh that will be used to communicate errors from the + // writeSlate routine. + errCh := make(chan error) + + go m.writeSlate(broadcast.RV, errCh, signalCh) + + // We'll watch out for any errors that happen within a 5 second window. This + // will indicate something seriously wrong with init, like a missing file etc. + const startupWindowDuration = 5 * time.Second + startupWindow := time.NewTimer(startupWindowDuration) + select { + + // If this triggers first, we're all good. + case <-startupWindow.C: + m.log.Debug("out of error window") + + // We consider any errors after this either to be normal i.e. as a result + // of stopping the slate input, or something that can not be handled, and + // only logged, therefore we can close the error channel errCh now. + // This will also let the routine know that errors can no longer be sent + // down errCh. + close(errCh) + + // This means we got a slate error pretty early and need to let caller know. + case err := <-errCh: + return fmt.Errorf("could not write slate image: %w", err) + } + default: + return fmt.Errorf("unknown status string: %s", broadcast.Status) } return nil } -// This is just a stub. Eventually this will handle writing of a slate image -// to the destination RTMP URL. -func (m *broadcastManager) writeSlate(ma MAC, rv *revid.Revid) {} +// writeSlate is a routine that employs a file input device and h264 lexer to +// write a h264 encoded slate image to the provided revid pipeline. +func (m *broadcastManager) writeSlate(rv *revid.Revid, errCh chan error, exitSignal chan struct{}) { + m.log.Info("writing slate") + const ( + // This is temporary and will eventually be part of a broadcast configuration + // where the remote vidforward API user can provide the slate image. + slateFileName = "slate.h264" + + // Assume 25fps until this becomes configurable. + slateFrameRate = 25 + + loopSetting = true + frameDelay = time.Second / slateFrameRate + ) + + fileInput := file.NewWith(m.log, slateFileName, loopSetting) + err := fileInput.Start() + if err != nil { + errCh <- err + return + } + + // This will wait for a signal from the provided slateExitSignal (or from a + // timeout) to stop writing the slate by "Stopping" the file input which will + // terminate the Lex function. + go func() { + slateTimeoutTimer := time.NewTimer(24 * time.Hour) + select { + case <-slateTimeoutTimer.C: + m.log.Warning("slate timeout") + case <-exitSignal: + m.log.Info("slate exist signal") + } + m.log.Info("stopping file input") + fileInput.Stop() + }() + + // Begin lexing the slate file and send frames to rv pipeline. We'll stay in + // here until file input closes or there's an unexpected error. + err = h264.Lex(rv, fileInput, frameDelay) + + // If we get to this point, it means that the we've finished lexing for some + // reason; let's figure out why. + select { + // The only reason we'd get a receive on errCh from this side is if its been + // closed. This means that we've exceeded the "startup error" period, and that + // either the error is normal from stopping the input, or we can no longer inform + // the caller and just need to log the problem. + case <-errCh: + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + m.log.Debug("got expected error", "error", err) + return + } + m.log.Error("got unexpected error", "error", err) + + // This means that a problem occured pretty early in lexing. + default: + m.log.Error("unexpected error during lex startup", "error", err) + errCh <- err + } + m.log.Error("finished writing slate") +} // delete removes a broadcast from the record. func (m *broadcastManager) delete(broadcast Broadcast) error { @@ -281,7 +432,7 @@ func main() { // lumberjack and netloggers. log := logging.New(logVerbosity, io.MultiWriter(fileLog, netLog), logSuppress) - bm := &broadcastManager{log: log, broadcasts: map[MAC]Broadcast{}} + bm := newBroadcastManager(log) http.HandleFunc("/recv", bm.recv) http.HandleFunc("/control", bm.control) http.ListenAndServe(*host+":"+*port, nil) diff --git a/codec/codecutil/lex.go b/codec/codecutil/lex.go index b8a1ce7d..9772bd22 100644 --- a/codec/codecutil/lex.go +++ b/codec/codecutil/lex.go @@ -174,6 +174,7 @@ func newRingBuffer(sz, cap int, timeout time.Duration) *ringBuffer { buf: make([][]byte, cap), n: cap, ch: make(chan []byte, cap), + timeout: timeout, } for i := range rb.buf { rb.buf[i] = make([]byte, sz) diff --git a/device/file/file.go b/device/file/file.go index bb211782..0e3d0f01 100644 --- a/device/file/file.go +++ b/device/file/file.go @@ -30,6 +30,7 @@ import ( "fmt" "io" "os" + "sync" "bitbucket.org/ausocean/av/revid/config" "bitbucket.org/ausocean/utils/logging" @@ -39,14 +40,23 @@ import ( // audio or video data. type AVFile struct { f *os.File - cfg config.Config + path string + loop bool isRunning bool log logging.Logger + set bool + mu sync.Mutex } // NewAVFile returns a new AVFile. func New(l logging.Logger) *AVFile { return &AVFile{log: l} } +// NewWith returns a new AVFile with required params provided i.e. the Set +// method does not need to be called. +func NewWith(l logging.Logger, path string, loop bool) *AVFile { + return &AVFile{log: l, path: path, loop: loop, set: true} +} + // Name returns the name of the device. func (m *AVFile) Name() string { return "File" @@ -54,15 +64,22 @@ func (m *AVFile) Name() string { // Set simply sets the AVFile's config to the passed config. func (m *AVFile) Set(c config.Config) error { - m.cfg = c + m.path = c.InputPath + m.loop = c.Loop + m.set = true return nil } // Start will open the file at the location of the InputPath field of the // config struct. func (m *AVFile) Start() error { + m.mu.Lock() + defer m.mu.Unlock() var err error - m.f, err = os.Open(m.cfg.InputPath) + if !m.set { + return errors.New("AVFile has not been set with config") + } + m.f, err = os.Open(m.path) if err != nil { return fmt.Errorf("could not open media file: %w", err) } @@ -72,6 +89,8 @@ func (m *AVFile) Start() error { // Stop will close the file such that any further reads will fail. func (m *AVFile) Stop() error { + m.mu.Lock() + defer m.mu.Unlock() err := m.f.Close() if err == nil { m.isRunning = false @@ -83,33 +102,38 @@ func (m *AVFile) Stop() error { // Read implements io.Reader. If start has not been called, or Start has been // called and Stop has since been called, an error is returned. func (m *AVFile) Read(p []byte) (int, error) { - if m.f != nil { - n, err := m.f.Read(p) - if err != nil && err != io.EOF { - return n, err - } + m.mu.Lock() + defer m.mu.Unlock() + if m.f == nil { + return 0, errors.New("AV file is closed, AVFile not started") + } - if (n < len(p) || err == io.EOF) && m.cfg.Loop { - m.log.Info("looping input file") - // In the case that we reach end of file but loop is true, we want to - // seek to start and keep reading from there. - _, err = m.f.Seek(0, io.SeekStart) - if err != nil { - return 0, fmt.Errorf("could not seek to start of file for input loop: %w", err) - } - - // Now that we've seeked to start, let's try reading again. - n, err = m.f.Read(p) - if err != nil { - return n, fmt.Errorf("could not read after start seek: %w", err) - } - } + n, err := m.f.Read(p) + if err != nil && err != io.EOF { return n, err } - return 0, errors.New("AV file is closed") + + if (n < len(p) || err == io.EOF) && m.loop { + m.log.Info("looping input file") + // In the case that we reach end of file but loop is true, we want to + // seek to start and keep reading from there. + _, err = m.f.Seek(0, io.SeekStart) + if err != nil { + return 0, fmt.Errorf("could not seek to start of file for input loop: %w", err) + } + + // Now that we've seeked to start, let's try reading again. + n, err = m.f.Read(p) + if err != nil { + return n, fmt.Errorf("could not read after start seek: %w", err) + } + } + return n, err } // IsRunning is used to determine if the AVFile device is running. func (m *AVFile) IsRunning() bool { + m.mu.Lock() + defer m.mu.Unlock() return m.f != nil && m.isRunning } diff --git a/go.mod b/go.mod index 85f7d39d..7addbb62 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,6 @@ module bitbucket.org/ausocean/av go 1.16 -replace bitbucket.org/ausocean/utils v1.3.2 => ../utils - require ( bitbucket.org/ausocean/iot v1.3.3 bitbucket.org/ausocean/utils v1.3.2