diff --git a/cmd/vidforward/main.go b/cmd/vidforward/main.go index 43caf551..90f3b613 100644 --- a/cmd/vidforward/main.go +++ b/cmd/vidforward/main.go @@ -1,6 +1,7 @@ package main import ( + "encoding/json" "flag" "fmt" "io" @@ -33,7 +34,7 @@ const ( logMaxBackup = 10 logMaxAge = 28 // days logVerbosity = logging.Info - logSuppress = true + logSuppress = false ) // Misc constants. @@ -52,6 +53,7 @@ type forwardHandler struct { } func (h *forwardHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.log.Debug("recv handler") q := r.URL.Query() ma := q.Get("ma") @@ -79,6 +81,8 @@ func (h *forwardHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + resp := map[string]interface{}{"ma": ma, "V0": size} + mtsClip, err := io.ReadAll(r.Body) if err != nil { h.errorLogWrite(w, "could not read forward request body", "error", err) @@ -103,6 +107,14 @@ func (h *forwardHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } } + + // Return response to client as JSON + jsn, err := json.Marshal(resp) + if err != nil { + h.errorLogWrite(w, "could not get json for response", "error", err) + return + } + fmt.Fprint(w, string(jsn)) } // writeError writes an error code in JSON format and logs it if in debug mode. @@ -128,8 +140,10 @@ func (h *forwardHandler) addActive(ma, url string) error { Logger: h.log, Input: config.InputManual, InputCodec: codecutil.H264_AU, // h264 access unit i.e. h264 frame. - Outputs: []uint8{config.OutputRTMP}, + Outputs: []uint8{config.OutputRTMP}, RTMPURL: url, + LogLevel: logging.Debug, + FileFPS: 25, } rv, err := revid.New(cfg, nil) @@ -137,6 +151,10 @@ func (h *forwardHandler) addActive(ma, url string) error { return fmt.Errorf("coult not initialise revid: %w", err) } h.actives[ma] = rv + err = rv.Start() + if err != nil { + return fmt.Errorf("could not start revid pipeline") + } h.mu.Unlock() return nil } @@ -192,18 +210,20 @@ func main() { log.Fatal(pkg + "could not initialise netsender client: " + err.Error()) } - fh := &forwardHandler{log: log} + fh := &forwardHandler{log: log, actives: map[string]*revid.Revid{}} log.Debug("beginning main loop") - go run(ns, log, netLog, fh) + readySig := make(chan struct{}) + go run(ns, log, netLog, fh, readySig) + <-readySig http.Handle("/recv", fh) http.ListenAndServe(*host+":"+*port, nil) } // run starts the main loop. This will run netsender on every pass of the loop // (sleeping inbetween), check vars, and if changed, update revid as appropriate. -func run(ns *netsender.Sender, l logging.Logger, nl *netlogger.Logger, fh *forwardHandler) { +func run(ns *netsender.Sender, l logging.Logger, nl *netlogger.Logger, fh *forwardHandler, sig chan struct{}) { var vs int for { l.Debug("running netsender") @@ -267,6 +287,12 @@ func run(ns *netsender.Sender, l logging.Logger, nl *netlogger.Logger, fh *forwa } fh.removeInactives(macs) + select { + case <-sig: + default: + close(sig) + } + sleep(ns, l) } } @@ -294,9 +320,9 @@ func isMac(m string) bool { return false } - if (3-i)%3 != 0 { - continue - } + if (3-i)%3 != 0 { + continue + } _, err := strconv.ParseUint(m[i:i+2], 16, 64) if err != nil { diff --git a/codec/codecutil/lex.go b/codec/codecutil/lex.go index e7dd0b51..6ecba829 100644 --- a/codec/codecutil/lex.go +++ b/codec/codecutil/lex.go @@ -82,12 +82,62 @@ func (l *ByteLexer) Lex(dst io.Writer, src io.Reader, d time.Duration) error { } } +// Noop assumes that for writing to src is blocked until the entire previous +// write to src is read, i.e. src is expected to connected to a pipe-like structure. func Noop(dst io.Writer, src io.Reader, d time.Duration) error { - r := io.TeeReader(src,dst) + if d < 0 { + return fmt.Errorf("invalid delay: %v", d) + } + + if d == 0 { + d = 40 * time.Millisecond + } + ticker := time.NewTicker(d) + defer ticker.Stop() + + const checkDur = 500 * time.Millisecond + rateChkTicker := time.NewTicker(checkDur) + frameCh := make(chan []byte, 1000) + errCh := make(chan error) + go func() { + for { + toWrite := <-frameCh + _, err := dst.Write(toWrite) + if err != nil { + errCh <- fmt.Errorf("could not write to dst: %w", err) + } + + <-ticker.C + select { + case <-rateChkTicker.C: + var adj int + const equilibrium = 500 + if len(frameCh) > equilibrium { + adj = -2 + } else if len(frameCh) < equilibrium { + adj = 2 + } + d += time.Millisecond * time.Duration(adj) + ticker.Reset(d) + default: + } + } + }() + + const maxFrameSize = 250000 // = 20kB + buf := make([]byte, maxFrameSize) for { - _, err := io.ReadAll(r) + n, err := src.Read(buf) if err != nil { - return err + return fmt.Errorf("could not read src: %w", err) + } + newFrame := make([]byte, n) + copy(newFrame, buf[:n]) + frameCh <- newFrame + select { + case err := <-errCh: + return fmt.Errorf("error from output routine: %w", err) + default: } } } diff --git a/device/device.go b/device/device.go index 10ebf534..bd56966f 100644 --- a/device/device.go +++ b/device/device.go @@ -73,3 +73,12 @@ func (me MultiError) Error() string { } return fmt.Sprintf("%v", []error(me)) } + +type ManualInput struct {} +func NewManualInput() *ManualInput { return &ManualInput{} } +func (m *ManualInput) Read(p []byte) (int, error) { return 0, nil } +func (m *ManualInput) Name() string { return "ManualInput" } +func (m *ManualInput) Set(c config.Config) error { return nil } +func (m *ManualInput) Start() error { return nil } +func (m *ManualInput) Stop() error { return nil } +func (m *ManualInput) IsRunning() bool { return true } diff --git a/revid/config/variables.go b/revid/config/variables.go index 4f72a2de..ba3a8a3a 100644 --- a/revid/config/variables.go +++ b/revid/config/variables.go @@ -336,7 +336,7 @@ var Variables = []struct { }, Validate: func(c *Config) { switch c.Input { - case InputRaspivid, InputRaspistill, InputV4L, InputFile, InputAudio, InputRTSP: + case InputRaspivid, InputRaspistill, InputV4L, InputFile, InputAudio, InputRTSP, InputManual: default: c.LogInvalidField(KeyInput, defaultInput) c.Input = defaultInput diff --git a/revid/pipeline.go b/revid/pipeline.go index 4174adea..09eb7a67 100644 --- a/revid/pipeline.go +++ b/revid/pipeline.go @@ -40,6 +40,7 @@ import ( "bitbucket.org/ausocean/av/codec/jpeg" "bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/mts" + "bitbucket.org/ausocean/av/device" "bitbucket.org/ausocean/av/device/file" "bitbucket.org/ausocean/av/device/geovision" "bitbucket.org/ausocean/av/device/raspistill" @@ -313,6 +314,9 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. err = r.setupAudio() case config.InputManual: + r.cfg.Logger.Debug("using manual input") + r.input = device.NewManualInput() + err = r.setLexer(r.cfg.InputCodec, false) r.pipeReader, r.pipeWriter = io.Pipe() default: @@ -392,7 +396,7 @@ func (r *Revid) processFrom(delay time.Duration) { w = ioext.MultiWriteCloser(r.filters[0], r.probe) } - var err error + var err error if r.cfg.Input == config.InputManual { err = r.lexTo(w, r.pipeReader, delay) } else { diff --git a/revid/revid.go b/revid/revid.go index 3f7727dd..2d2e3a61 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -130,6 +130,9 @@ func (r *Revid) Bitrate() int { } func (r *Revid) Write(p []byte) (int, error){ + if r.pipeWriter == nil { + return 0, errors.New("revid input pipewriter not initialised, please start revid") + } return r.pipeWriter.Write(p) }