diff --git a/cmd/vidforward/main.go b/cmd/vidforward/main.go new file mode 100644 index 00000000..f408d25d --- /dev/null +++ b/cmd/vidforward/main.go @@ -0,0 +1,387 @@ +/* +DESCRIPTION + vidforward is a service for receiving video from cameras and then forwarding to + youtube. By acting as the RTMP encoder (instead of the camera) vidfoward can enable + persistent streams by sending slate images during camera downtime. + +AUTHORS + Saxon A. Nelson-Milton + +LICENSE + Copyright (C) 2022 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package main + +import ( + "encoding/json" + "flag" + "fmt" + "io" + "net" + "net/http" + "strconv" + "strings" + "sync" + "time" + + "bitbucket.org/ausocean/av/codec/codecutil" + "bitbucket.org/ausocean/av/container/mts" + "bitbucket.org/ausocean/av/revid" + "bitbucket.org/ausocean/av/revid/config" + "bitbucket.org/ausocean/iot/pi/netlogger" + "bitbucket.org/ausocean/iot/pi/netsender" + "bitbucket.org/ausocean/utils/logging" + "gopkg.in/natefinch/lumberjack.v2" +) + +// Server defaults. +const ( + defaultPort = "8080" + defaultHost = "" +) + +// Logging configuration. +const ( + logPath = "/var/log/netsender/netsender.log" + logMaxSize = 500 // MB + logMaxBackup = 10 + logMaxAge = 28 // days + logVerbosity = logging.Info + logSuppress = false +) + +// Misc constants. +const ( + netSendRetryTime = 5 * time.Second + defaultSleepTime = 60 // Seconds + profilePath = "rv.prof" + pkg = "rv: " + runPreDelay = 20 * time.Second +) + +// forwardHandler implements http.Handler and handles video forwarding requests +// (re-using the recv method created for vidgrind). forwardHandler also keeps +// track of the active mac addresses and their respective revid pipelines. +type forwardHandler struct { + actives map[string]*revid.Revid + log logging.Logger + mu sync.Mutex +} + +// ServeHTTP handles recv requests for video forwarding. The MAC is firstly +// checked to ensure it is "active" i.e. should be sending data, and then the +// video is extracted from the request body and provided to the revid pipeline +// corresponding to said MAC. +func (h *forwardHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.log.Debug("recv handler") + q := r.URL.Query() + ma := q.Get("ma") + + rv := h.getActive(ma) + if r == nil { + h.errorLogWrite(w, "forward request mac not active, doing nothing", "mac", ma) + return + } + + const videoPin = "V0" + v := q.Get(videoPin) + if v == "" { + h.errorLogWrite(w, "forward request video pin V0 absent, or has no value") + return + } + + size, err := strconv.Atoi(v) + if err != nil { + h.errorLogWrite(w, "forward request video size can't be conerted to int", "error", err) + return + } + + if size <= 0 { + h.errorLogWrite(w, "forward request video size invalid", "size", size) + return + } + + resp := map[string]interface{}{"ma": ma, "V0": size} + + mtsClip, err := io.ReadAll(r.Body) + if err != nil { + h.errorLogWrite(w, "could not read forward request body", "error", err) + return + } + defer r.Body.Close() + + if len(mtsClip)%mts.PacketSize != 0 { + h.errorLogWrite(w, "invalid clip length", "length", len(mtsClip)) + return + } + + // Extract the pure h264. + h264Clip, err := mts.Extract(mtsClip) + if err != nil { + h.errorLogWrite(w, "could not extract h.264 from the MPEG-TS clip", "error", err) + return + } + + for i, frame := range h264Clip.Frames() { + _, err := rv.Write(frame.Media) + if err != nil { + h.errorLogWrite(w, "could not write frame", "no.", i) + return + } + } + + // Return response to client as JSON. + jsn, err := json.Marshal(resp) + if err != nil { + h.errorLogWrite(w, "could not get json for response", "error", err) + return + } + fmt.Fprint(w, string(jsn)) +} + +// updateActives updates the actives map based on a string of CSV where each +// value is a key-value of mac and url i.e. =,=,... +// If there are pairs in the actives map that do not correspond to any pairs in +// the provided CSV, they will be removed. +func (h *forwardHandler) updateActives(v string) { + pairs := strings.Split(v, ",") + macs := make([]string, 0, len(pairs)) + for _, p := range pairs { + pair := strings.Split(p, "=") + if len(pair) != 2 { + h.log.Warning("invalid = pair", "pair", pair) + continue + } + + m := pair[0] + macs = append(macs, m) + if !isMac(m) { + h.log.Warning("invalid MAC in actives update string", "mac", m) + continue + } + + r := h.getActive(m) + if r == nil { + h.addActive(m, pair[1]) + } + } + h.removeInactives(macs) +} + +// writeError logs an error and writes to w in JSON format. +func (h *forwardHandler) errorLogWrite(w http.ResponseWriter, msg string, args ...interface{}) { + h.log.Error(msg, args...) + w.Header().Add("Content-Type", "application/json") + fmt.Fprint(w, `{"er":"`+msg+`"}`) +} + +// getActive provides the revid pipeline for the provided MAC if active. +func (h *forwardHandler) getActive(ma string) *revid.Revid { + h.mu.Lock() + defer h.mu.Unlock() + v, ok := h.actives[ma] + if !ok { + return nil + } + return v +} + +// addActive adds a new revid pipeline configured with the provided RTMP URL url +// for the MAC ma into the actives map. +func (h *forwardHandler) addActive(ma, url string) error { + h.mu.Lock() + cfg := config.Config{ + Logger: h.log, + Input: config.InputManual, + InputCodec: codecutil.H264_AU, + Outputs: []uint8{config.OutputRTMP}, + RTMPURL: url, + LogLevel: logging.Debug, + } + + rv, err := revid.New(cfg, nil) + if err != nil { + return fmt.Errorf("coult not initialise revid: %w", err) + } + h.actives[ma] = rv + err = rv.Start() + if err != nil { + return fmt.Errorf("could not start revid pipeline") + } + h.mu.Unlock() + return nil +} + +// removeInactives removes all pairs in the active map that do not correspond to +// those found in actives. +func (h *forwardHandler) removeInactives(actives []string) { + h.mu.Lock() +actives: + for k := range h.actives { + for _, m := range actives { + if k == m { + continue actives + } + } + delete(h.actives, k) + } + h.mu.Unlock() +} + +// isActive checks to see if the provided MAC is in the actives map. +func (h *forwardHandler) isActive(ma string) bool { + h.mu.Lock() + _, ok := h.actives[ma] + h.mu.Unlock() + return ok +} + +func main() { + host := flag.String("host", defaultHost, "Host ip to run video forwarder on.") + port := flag.String("port", defaultPort, "Port to run video forwarder on.") + flag.Parse() + + if *host == "" || net.ParseIP(*host) == nil { + panic(fmt.Sprintf("invalid host, host: %s", *host)) + } + + // Create lumberjack logger to handle logging to file. + fileLog := &lumberjack.Logger{ + Filename: logPath, + MaxSize: logMaxSize, + MaxBackups: logMaxBackup, + MaxAge: logMaxAge, + } + + // Create netlogger to handle logging to cloud. + netLog := netlogger.New() + + // Create logger that we call methods on to log, which in turn writes to the + // lumberjack and netloggers. + log := logging.New(logVerbosity, io.MultiWriter(fileLog, netLog), logSuppress) + + log.Debug("initialising netsender client") + ns, err := netsender.New(log, nil, nil, nil) + if err != nil { + log.Fatal(pkg + "could not initialise netsender client: " + err.Error()) + } + + fh := &forwardHandler{log: log, actives: map[string]*revid.Revid{}} + + log.Debug("beginning main loop") + readySig := make(chan struct{}) + go run(ns, log, netLog, fh, readySig) + + // We won't start serving until we've talked to the cloud for configuration. + <-readySig + http.Handle("/recv", fh) + http.ListenAndServe(*host+":"+*port, nil) +} + +// run starts the main loop. This will run netsender on every pass of the loop +// (sleeping inbetween), check vars, and if changed, update configuration as +// appropriate. +func run(ns *netsender.Sender, l logging.Logger, nl *netlogger.Logger, fh *forwardHandler, sig chan struct{}) { + var vs int + for { + l.Debug("running netsender") + err := ns.Run() + if err != nil { + l.Warning(pkg+"Run Failed. Retrying...", "error", err.Error()) + time.Sleep(netSendRetryTime) + continue + } + + l.Debug("sending logs") + err = nl.Send(ns) + if err != nil { + l.Warning(pkg+"Logs could not be sent", "error", err.Error()) + } + + l.Debug("checking varsum") + newVs := ns.VarSum() + if vs == newVs { + sleep(ns, l) + continue + } + vs = newVs + l.Info("varsum changed", "vs", vs) + + l.Debug("getting new vars") + vars, err := ns.Vars() + if err != nil { + l.Error(pkg+"netSender failed to get vars", "error", err.Error()) + time.Sleep(netSendRetryTime) + continue + } + l.Debug("got new vars", "vars", vars) + + // Check the actives variable and update the forward handlers active map. + v, ok := vars["Actives"] + if !ok { + l.Warning("no actives variable in var map", "vars", vars) + sleep(ns, l) + continue + } + fh.updateActives(v) + + // sig is closed on the first time we get here, indicating to external routine + // that we've got our configuration from the cloud. + select { + case <-sig: + default: + close(sig) + } + + sleep(ns, l) + } +} + +// sleep uses a delay to halt the program based on the monitoring period +// netsender parameter (mp) defined in the netsender.conf config. +func sleep(ns *netsender.Sender, l logging.Logger) { + l.Debug("sleeping") + t, err := strconv.Atoi(ns.Param("mp")) + if err != nil { + l.Error(pkg+"could not get sleep time, using default", "error", err) + t = defaultSleepTime + } + time.Sleep(time.Duration(t) * time.Second) + l.Debug("finished sleeping") +} + +func isMac(m string) bool { + if len(m) != 17 || m == "00:00:00:00:00:00" { + return false + } + + for i := 0; i <= 15; i++ { + if (i+1)%3 == 0 && m[i] != ':' { + return false + } + + if (3-i)%3 != 0 { + continue + } + + _, err := strconv.ParseUint(m[i:i+2], 16, 64) + if err != nil { + return false + } + } + return true +} diff --git a/cmd/vidforward/vidforward_test.go b/cmd/vidforward/vidforward_test.go new file mode 100644 index 00000000..41d00423 --- /dev/null +++ b/cmd/vidforward/vidforward_test.go @@ -0,0 +1,49 @@ +/* +DESCRIPTION + Provides testing for vidfoward functionality. + +AUTHORS + Saxon A. Nelson-Milton + +LICENSE + Copyright (C) 2022 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package main + +import "testing" + +// TestIsMac tests the isMac function. +func TestIsMac(t *testing.T) { + tests := []struct { + in string + want bool + }{ + {in: "00:00:00:00:00:00", want: false}, + {in: "00000:00:00:00:01", want: false}, + {in: "00:00:00:00000:01", want: false}, + {in: "15:b5:c7:cg:87:92", want: false}, + {in: "00:00:00:00:00", want: false}, + {in: "7d:ac:cf:84:e8:01", want: true}, + } + + for i, test := range tests { + got := isMac(test.in) + if test.want != got { + t.Errorf("did not get expected result for test %d\ngot: %v\nwant: %v", i, got, test.want) + } + } +} diff --git a/codec/codecutil/lex.go b/codec/codecutil/lex.go index 49d7344f..bfe409d8 100644 --- a/codec/codecutil/lex.go +++ b/codec/codecutil/lex.go @@ -25,8 +25,10 @@ LICENSE package codecutil import ( + "errors" "fmt" "io" + "math" "time" ) @@ -81,3 +83,141 @@ func (l *ByteLexer) Lex(dst io.Writer, src io.Reader, d time.Duration) error { } } } + +// Noop reads media "frames" from src, queues and then writes to dst at intervals, +// maintaining a steady number of frames stored in the queue. This ensures frames +// are outputted at a consistent rate; useful if reads occur from src in blocks (a +// side effect if src is connected to an input that receives packets containing +// multiple frames at intervals e.g. MPEG-TS over HTTP). +// Noop assumes that writing to the input connected to src is blocked until the +// entire previous write is read, i.e. src is expected to be connected to +// a pipe-like structure. +func Noop(dst io.Writer, src io.Reader, d time.Duration) error { + // Controller tuning constants. + const ( + target = 500 // Target channel size to maintain. + coef = 0.01 // Proportional controller coefficient. + minDelay = 1 // Minimum delay between writes (ms). + maxDelay = 1000 // Maximum delay between writes (ms). + defaultDelay = 40 * time.Millisecond // Default delay between writes, equivalent to ~25fps. + ) + + // Ring buffer tuning. + const ( + ringCap = 1000 // Ring buffer capacity. + ringElemSize = 250000 // Ring buffer element size i.e. max h264 frame size. + ) + + if d < 0 { + return fmt.Errorf("invalid delay: %v", d) + } + + if d == 0 { + d = defaultDelay + } + + var ( + delay = time.NewTicker(d) // Ticker based on delay between frames. + errCh = make(chan error) // Used by the output routine to signal errors to the main loop. + rb = newRingBuffer(ringElemSize, ringCap) // Use a ring buffer to reduce allocation and GC load. + ) + defer delay.Stop() + + // This routine is responsible for frame output. + go func() { + for { + err := rb.writeTo(dst) + if err != nil { + errCh <- fmt.Errorf("could not write to dst: %w", err) + } + <-delay.C + + // Adjust delay using proportional controller. + adj := coef * float64(target-rb.len()) + adj = math.Max(math.Min(adj, minDelay), maxDelay) // Limit the delay. + d += time.Millisecond * time.Duration(adj) + delay.Reset(d) + } + }() + + // This loop is responsible for reading frames and checking any errors from + // the output routine. + for { + err := rb.readFrom(src) + if err != nil { + return fmt.Errorf("could not read src: %w", err) + } + select { + case err := <-errCh: + return fmt.Errorf("error from output routine: %w", err) + default: + } + } +} + +// ringBuffer is a basic concurrency safe ring buffer. Concurrency safety is +// achieved using a channel between read and write methods i.e. overwrite/dropping +// behaviour is absent and blocking will occur. +type ringBuffer struct { + n int // Num. of elements. + i int // Current index in underlying buffer. + buf [][]byte // Underlying buffer. + ch chan []byte // ch will act as our concurrency safe queue. +} + +func newRingBuffer(sz, cap int) *ringBuffer { + rb := &ringBuffer{ + buf: make([][]byte, cap), + n: cap, + ch: make(chan []byte, cap), + } + for i := range rb.buf { + rb.buf[i] = make([]byte, sz) + } + return rb +} + +// readFrom gets the next []byte from the buffer and uses it to read from r. +// This data is then stored in the buffer channel ready for writeTo to retreive. +// readFrom will block if the buffer channel is filled, at least within the +// timeout, otherwise an error is returned. +func (b *ringBuffer) readFrom(r io.Reader) error { + buf := b.buf[b.i] + b.i++ + if b.i == b.n { + b.i = 0 + } + n, err := r.Read(buf) + if err != nil { + return err + } + const dur = 1 * time.Minute + timeout := time.NewTimer(dur) + select { + case b.ch <- buf[:n]: + case <-timeout.C: + return errors.New("buffer chan send timeout") + } + return nil +} + +// writeTo tries to get a []byte from the buffer channel within the timeout +// and then writes to w if successful, otherwise an error is returned. +func (b *ringBuffer) writeTo(w io.Writer) error { + const dur = 1 * time.Minute + timeout := time.NewTimer(dur) + select { + case p := <-b.ch: + _, err := w.Write(p) + if err != nil { + return err + } + case <-timeout.C: + return errors.New("buffer chan receive timeout") + } + return nil +} + +func (b *ringBuffer) len() int { + return len(b.ch) +} diff --git a/codec/codecutil/list.go b/codec/codecutil/list.go index 773f0c3e..47507f71 100644 --- a/codec/codecutil/list.go +++ b/codec/codecutil/list.go @@ -27,18 +27,19 @@ package codecutil // All available codecs for reference in any application. // When adding or removing a codec from this list, the IsValid function below must be updated. const ( - PCM = "pcm" - ADPCM = "adpcm" - H264 = "h264" - H265 = "h265" - MJPEG = "mjpeg" - JPEG = "jpeg" + PCM = "pcm" + ADPCM = "adpcm" + H264 = "h264" // h264 bytestream (requires lexing). + H264_AU = "h264_au" // Discrete h264 access units. + H265 = "h265" + MJPEG = "mjpeg" + JPEG = "jpeg" ) // IsValid checks if a string is a known and valid codec in the right format. func IsValid(s string) bool { switch s { - case PCM, ADPCM, H264, H265, MJPEG, JPEG: + case PCM, ADPCM, H264, H264_AU, H265, MJPEG, JPEG: return true default: return false diff --git a/container/mts/payload.go b/container/mts/payload.go index 90392a74..b854a211 100644 --- a/container/mts/payload.go +++ b/container/mts/payload.go @@ -141,6 +141,11 @@ type Frame struct { idx int // Index in the backing slice. } +// Frames returns the frames of a h264 clip. +func (c *Clip) Frames() []Frame { + return c.frames +} + // Bytes returns the concatentated media bytes from each frame in the Clip c. func (c *Clip) Bytes() []byte { if c.backing == nil { diff --git a/device/device.go b/device/device.go index 10ebf534..f1a359d6 100644 --- a/device/device.go +++ b/device/device.go @@ -73,3 +73,63 @@ func (me MultiError) Error() string { } return fmt.Sprintf("%v", []error(me)) } + +// ManualInput is an implementation of the Devices interface that represents +// a manual input mechanism, i.e. data is written to this input manually through +// software (ManualInput also implements io.Writer, unlike other implementations). +// The ManualInput employs an io.Pipe, as such, every write must be accompanied +// by a full read (or reads) of the bytes, otherwise blocking will occur (and +// vice versa). This is intended to make writing of distinct access units easier i.e. +// one whole read (with a big enough buf provided) can represent a distinct frame. +type ManualInput struct { + isRunning bool + reader *io.PipeReader + writer *io.PipeWriter +} + +// NewManualInput provides a new ManualInput. +func NewManualInput() *ManualInput { + r, w := io.Pipe() + return &ManualInput{reader: r, writer: w} +} + +// Read reads from the manual input and puts the bytes into p. +func (m *ManualInput) Read(p []byte) (int, error) { + return m.reader.Read(p) +} + +// Name returns the name of ManualInput i.e. "ManualInput". +func (m *ManualInput) Name() string { return "ManualInput" } + +// Set is a stub to satisfy the Device interface; no configuration fields are +// required by ManualInput. +func (m *ManualInput) Set(c config.Config) error { return nil } + +// Start sets the ManualInput isRunning flag to true. This is mostly here just +// to satisfy the Device interface. +func (m *ManualInput) Start() error { + m.isRunning = true + return nil +} + +// Stop closes the pipe and sets the isRunning flag to false. +func (m *ManualInput) Stop() error { + if !m.isRunning { + return nil + } + err := m.reader.Close() + if err != nil { + return err + } + m.isRunning = false + return nil +} + +// IsRunning returns the value of the isRunning flag to indicate if Start has +// been called (and Stop has not been called after). +func (m *ManualInput) IsRunning() bool { return m.isRunning } + +// Write writes p to the ManualInput's writer side of its pipe. +func (m *ManualInput) Write(p []byte) (int, error) { + return m.writer.Write(p) +} diff --git a/device/file/file_test.go b/device/file/file_test.go index 53e90be5..ae497e04 100644 --- a/device/file/file_test.go +++ b/device/file/file_test.go @@ -29,13 +29,14 @@ import ( "time" "bitbucket.org/ausocean/av/revid/config" + "bitbucket.org/ausocean/utils/logging" ) func TestIsRunning(t *testing.T) { const dur = 250 * time.Millisecond const path = "../../../test/test-data/av/input/motion-detection/mjpeg/school.mjpeg" - d := New() + d := New((*logging.TestLogger)(t)) err := d.Set(config.Config{ InputPath: path, diff --git a/revid/config/config.go b/revid/config/config.go index 530cacaa..bc73e21b 100644 --- a/revid/config/config.go +++ b/revid/config/config.go @@ -44,6 +44,7 @@ const ( InputV4L InputRTSP InputAudio + InputManual // Outputs. OutputRTMP @@ -54,7 +55,8 @@ const ( OutputFiles // Codecs. - H264 + H264 // h264 bytestream. + H264_AU // Discrete h264 access units. H265 MJPEG JPEG @@ -292,7 +294,7 @@ func (c *Config) Update(vars map[string]string) { } func (c *Config) LogInvalidField(name string, def interface{}) { - c.Logger.Info( name+" bad or unset, defaulting", name, def) + c.Logger.Info(name+" bad or unset, defaulting", name, def) } func stringInSlice(want string, slice []string) bool { diff --git a/revid/config/variables.go b/revid/config/variables.go index 644cb0f4..c0922c0a 100644 --- a/revid/config/variables.go +++ b/revid/config/variables.go @@ -149,7 +149,7 @@ var Variables = []struct { Name: KeyTransformMatrix, Type: typeString, Update: func(c *Config, v string) { - c.Logger.Debug( "updating transform matrix", "string", v) + c.Logger.Debug("updating transform matrix", "string", v) v = strings.Replace(v, " ", "", -1) vals := make([]float64, 0) if v == "" { @@ -161,7 +161,7 @@ var Variables = []struct { for _, e := range elements { vFloat, err := strconv.ParseFloat(e, 64) if err != nil { - c.Logger.Warning( "invalid TransformMatrix param", "value", e) + c.Logger.Warning("invalid TransformMatrix param", "value", e) } vals = append(vals, vFloat) } @@ -231,7 +231,7 @@ var Variables = []struct { Update: func(c *Config, v string) { _v, err := strconv.Atoi(v) if err != nil { - c.Logger.Warning( "invalid ClipDuration param", "value", v) + c.Logger.Warning("invalid ClipDuration param", "value", v) } c.ClipDuration = time.Duration(_v) * time.Second }, @@ -283,7 +283,7 @@ var Variables = []struct { for i, filter := range filters { v, ok := m[filter] if !ok { - c.Logger.Warning( "invalid Filters param", "value", v) + c.Logger.Warning("invalid Filters param", "value", v) } c.Filters[i] = uint(v) } @@ -317,7 +317,7 @@ var Variables = []struct { }, { Name: KeyInput, - Type: "enum:raspivid,raspistill,rtsp,v4l,file,audio", + Type: "enum:raspivid,raspistill,rtsp,v4l,file,audio,manual", Update: func(c *Config, v string) { c.Input = parseEnum( KeyInput, @@ -329,13 +329,14 @@ var Variables = []struct { "v4l": InputV4L, "file": InputFile, "audio": InputAudio, + "manual": InputManual, }, c, ) }, Validate: func(c *Config) { switch c.Input { - case InputRaspivid, InputRaspistill, InputV4L, InputFile, InputAudio, InputRTSP: + case InputRaspivid, InputRaspistill, InputV4L, InputFile, InputAudio, InputRTSP, InputManual: default: c.LogInvalidField(KeyInput, defaultInput) c.Input = defaultInput @@ -344,7 +345,7 @@ var Variables = []struct { }, { Name: KeyInputCodec, - Type: "enum:h264,h265,mjpeg,jpeg,pcm,adpcm", + Type: "enum:h264,h264_au,h265,mjpeg,jpeg,pcm,adpcm", Update: func(c *Config, v string) { c.InputCodec = v }, @@ -381,7 +382,7 @@ var Variables = []struct { case "Fatal": c.LogLevel = logging.Fatal default: - c.Logger.Warning( "invalid Logging param", "value", v) + c.Logger.Warning("invalid Logging param", "value", v) } }, Validate: func(c *Config) { @@ -417,8 +418,8 @@ var Variables = []struct { }, }, { - Name: KeyMode, - Type: "enum:Normal,Paused,Burst", + Name: KeyMode, + Type: "enum:Normal,Paused,Burst", Update: func(c *Config, v string) {}, }, { @@ -447,7 +448,7 @@ var Variables = []struct { Update: func(c *Config, v string) { f, err := strconv.ParseFloat(v, 64) if err != nil { - c.Logger.Warning( "invalid MotionMinArea var", "value", v) + c.Logger.Warning("invalid MotionMinArea var", "value", v) } c.MotionMinArea = f }, @@ -468,7 +469,7 @@ var Variables = []struct { Update: func(c *Config, v string) { f, err := strconv.ParseFloat(v, 64) if err != nil { - c.Logger.Warning( "invalid MotionThreshold var", "value", v) + c.Logger.Warning("invalid MotionThreshold var", "value", v) } c.MotionThreshold = f }, @@ -490,7 +491,7 @@ var Variables = []struct { case "rtp": c.Outputs[0] = OutputRTP default: - c.Logger.Warning( "invalid output param", "value", v) + c.Logger.Warning("invalid output param", "value", v) } }, }, @@ -518,7 +519,7 @@ var Variables = []struct { case "rtp": c.Outputs[i] = OutputRTP default: - c.Logger.Warning( "invalid outputs param", "value", v) + c.Logger.Warning("invalid outputs param", "value", v) } } }, @@ -570,7 +571,7 @@ var Variables = []struct { Update: func(c *Config, v string) { _v, err := strconv.ParseFloat(v, 64) if err != nil { - c.Logger.Warning( fmt.Sprintf("invalid %s param", KeyRecPeriod), "value", v) + c.Logger.Warning(fmt.Sprintf("invalid %s param", KeyRecPeriod), "value", v) } c.RecPeriod = _v }, @@ -617,7 +618,7 @@ var Variables = []struct { Update: func(c *Config, v string) { _v, err := strconv.Atoi(v) if err != nil { - c.Logger.Warning( "invalid JPEGQuality param", "value", v) + c.Logger.Warning("invalid JPEGQuality param", "value", v) } c.JPEGQuality = _v }, @@ -636,7 +637,7 @@ var Variables = []struct { Update: func(c *Config, v string) { _v, err := strconv.Atoi(v) if err != nil { - c.Logger.Warning( "invalid TimelapseInterval param", "value", v) + c.Logger.Warning("invalid TimelapseInterval param", "value", v) } c.TimelapseInterval = time.Duration(_v) * time.Second }, @@ -647,7 +648,7 @@ var Variables = []struct { Update: func(c *Config, v string) { _v, err := strconv.Atoi(v) if err != nil { - c.Logger.Warning( "invalid TimelapseDuration param", "value", v) + c.Logger.Warning("invalid TimelapseDuration param", "value", v) } c.TimelapseDuration = time.Duration(_v) * time.Second }, @@ -690,7 +691,7 @@ var Variables = []struct { func parseUint(n, v string, c *Config) uint { _v, err := strconv.ParseUint(v, 10, 64) if err != nil { - c.Logger.Warning( fmt.Sprintf("expected unsigned int for param %s", n), "value", v) + c.Logger.Warning(fmt.Sprintf("expected unsigned int for param %s", n), "value", v) } return uint(_v) } @@ -698,7 +699,7 @@ func parseUint(n, v string, c *Config) uint { func parseInt(n, v string, c *Config) int { _v, err := strconv.Atoi(v) if err != nil { - c.Logger.Warning( fmt.Sprintf("expected integer for param %s", n), "value", v) + c.Logger.Warning(fmt.Sprintf("expected integer for param %s", n), "value", v) } return _v } @@ -710,7 +711,7 @@ func parseBool(n, v string, c *Config) (b bool) { case "false": b = false default: - c.Logger.Warning( fmt.Sprintf("expect bool for param %s", n), "value", v) + c.Logger.Warning(fmt.Sprintf("expect bool for param %s", n), "value", v) } return } @@ -718,7 +719,7 @@ func parseBool(n, v string, c *Config) (b bool) { func parseEnum(n, v string, enums map[string]uint8, c *Config) uint8 { _v, ok := enums[strings.ToLower(v)] if !ok { - c.Logger.Warning( fmt.Sprintf("invalid value for %s param", n), "value", v) + c.Logger.Warning(fmt.Sprintf("invalid value for %s param", n), "value", v) } return _v } diff --git a/revid/pipeline.go b/revid/pipeline.go index d879c208..f2f0791c 100644 --- a/revid/pipeline.go +++ b/revid/pipeline.go @@ -312,6 +312,14 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. case config.InputAudio: r.cfg.Logger.Debug("using audio input") err = r.setupAudio() + + case config.InputManual: + r.cfg.Logger.Debug("using manual input") + r.input = device.NewManualInput() + err = r.setLexer(r.cfg.InputCodec, false) + + default: + return fmt.Errorf("unrecognised input type: %v", r.cfg.Input) } if err != nil { return fmt.Errorf("could not set lexer: %w", err) @@ -339,6 +347,9 @@ func (r *Revid) setLexer(c string, isRTSP bool) error { if isRTSP { r.lexTo = h264.NewExtractor().Extract } + case codecutil.H264_AU: + r.cfg.Logger.Debug("using H.264 AU codec") + r.lexTo = codecutil.Noop case codecutil.H265: r.cfg.Logger.Debug("using H.265 codec") r.lexTo = h265.NewExtractor(false).Extract @@ -363,13 +374,15 @@ func (r *Revid) setLexer(c string, isRTSP bool) error { // processFrom is run as a routine to read from a input data source, lex and // then send individual access units to revid's encoders. -func (r *Revid) processFrom(in device.AVDevice, delay time.Duration) { +func (r *Revid) processFrom(delay time.Duration) { defer r.wg.Done() - err := in.Start() - if err != nil { - r.err <- fmt.Errorf("could not start input device: %w", err) - return + if r.input != nil { + err := r.input.Start() + if err != nil { + r.err <- fmt.Errorf("could not start input device: %w", err) + return + } } // Lex data from input device, in, until finished or an error is encountered. @@ -382,7 +395,7 @@ func (r *Revid) processFrom(in device.AVDevice, delay time.Duration) { w = ioext.MultiWriteCloser(r.filters[0], r.probe) } - err = r.lexTo(w, in, delay) + err := r.lexTo(w, r.input, delay) switch err { case nil, io.EOF: r.cfg.Logger.Info("end of file") @@ -394,7 +407,7 @@ func (r *Revid) processFrom(in device.AVDevice, delay time.Duration) { r.cfg.Logger.Info("finished reading input") r.cfg.Logger.Debug("stopping input") - err = in.Stop() + err = r.input.Stop() if err != nil { r.err <- fmt.Errorf("could not stop input source: %w", err) } else { diff --git a/revid/revid.go b/revid/revid.go index ef6f1973..c5684984 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -126,6 +126,14 @@ func (r *Revid) Bitrate() int { return r.bitrate.Bitrate() } +func (r *Revid) Write(p []byte) (int, error) { + mi, ok := r.input.(*device.ManualInput) + if !ok { + return 0, errors.New("cannot write to anything but ManualInput") + } + return mi.Write(p) +} + // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. func (r *Revid) Start() error { @@ -155,7 +163,7 @@ func (r *Revid) Start() error { r.cfg.Logger.Debug("starting input processing routine") r.wg.Add(1) - go r.processFrom(r.input, d) + go r.processFrom(d) r.running = true return nil diff --git a/revid/senders.go b/revid/senders.go index 8131de19..1a6c6f0f 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -68,6 +68,8 @@ type httpSender struct { } // newHttpSender returns a pointer to a new httpSender. +// report is callback that can be used to report the amount of data sent per write. +// This can be set to nil. func newHTTPSender(ns *netsender.Sender, log logging.Logger, report func(sent int)) *httpSender { return &httpSender{ client: ns, @@ -82,7 +84,9 @@ func (s *httpSender) Write(d []byte) (int, error) { err := httpSend(d, s.client, s.log) if err == nil { s.log.Debug("good send", "len", len(d)) - s.report(len(d)) + if s.report != nil { + s.report(len(d)) + } } else { s.log.Debug("bad send", "error", err) }