diff --git a/cmd/vidforward/main.go b/cmd/vidforward/main.go index 90f3b613..f408d25d 100644 --- a/cmd/vidforward/main.go +++ b/cmd/vidforward/main.go @@ -1,3 +1,29 @@ +/* +DESCRIPTION + vidforward is a service for receiving video from cameras and then forwarding to + youtube. By acting as the RTMP encoder (instead of the camera) vidfoward can enable + persistent streams by sending slate images during camera downtime. + +AUTHORS + Saxon A. Nelson-Milton + +LICENSE + Copyright (C) 2022 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + package main import ( @@ -22,6 +48,7 @@ import ( "gopkg.in/natefinch/lumberjack.v2" ) +// Server defaults. const ( defaultPort = "8080" defaultHost = "" @@ -46,12 +73,19 @@ const ( runPreDelay = 20 * time.Second ) +// forwardHandler implements http.Handler and handles video forwarding requests +// (re-using the recv method created for vidgrind). forwardHandler also keeps +// track of the active mac addresses and their respective revid pipelines. type forwardHandler struct { actives map[string]*revid.Revid log logging.Logger mu sync.Mutex } +// ServeHTTP handles recv requests for video forwarding. The MAC is firstly +// checked to ensure it is "active" i.e. should be sending data, and then the +// video is extracted from the request body and provided to the revid pipeline +// corresponding to said MAC. func (h *forwardHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.log.Debug("recv handler") q := r.URL.Query() @@ -88,12 +122,14 @@ func (h *forwardHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.errorLogWrite(w, "could not read forward request body", "error", err) return } + defer r.Body.Close() if len(mtsClip)%mts.PacketSize != 0 { h.errorLogWrite(w, "invalid clip length", "length", len(mtsClip)) return } + // Extract the pure h264. h264Clip, err := mts.Extract(mtsClip) if err != nil { h.errorLogWrite(w, "could not extract h.264 from the MPEG-TS clip", "error", err) @@ -108,7 +144,7 @@ func (h *forwardHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - // Return response to client as JSON + // Return response to client as JSON. jsn, err := json.Marshal(resp) if err != nil { h.errorLogWrite(w, "could not get json for response", "error", err) @@ -117,13 +153,43 @@ func (h *forwardHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, string(jsn)) } -// writeError writes an error code in JSON format and logs it if in debug mode. +// updateActives updates the actives map based on a string of CSV where each +// value is a key-value of mac and url i.e. =,=,... +// If there are pairs in the actives map that do not correspond to any pairs in +// the provided CSV, they will be removed. +func (h *forwardHandler) updateActives(v string) { + pairs := strings.Split(v, ",") + macs := make([]string, 0, len(pairs)) + for _, p := range pairs { + pair := strings.Split(p, "=") + if len(pair) != 2 { + h.log.Warning("invalid = pair", "pair", pair) + continue + } + + m := pair[0] + macs = append(macs, m) + if !isMac(m) { + h.log.Warning("invalid MAC in actives update string", "mac", m) + continue + } + + r := h.getActive(m) + if r == nil { + h.addActive(m, pair[1]) + } + } + h.removeInactives(macs) +} + +// writeError logs an error and writes to w in JSON format. func (h *forwardHandler) errorLogWrite(w http.ResponseWriter, msg string, args ...interface{}) { h.log.Error(msg, args...) w.Header().Add("Content-Type", "application/json") fmt.Fprint(w, `{"er":"`+msg+`"}`) } +// getActive provides the revid pipeline for the provided MAC if active. func (h *forwardHandler) getActive(ma string) *revid.Revid { h.mu.Lock() defer h.mu.Unlock() @@ -134,16 +200,17 @@ func (h *forwardHandler) getActive(ma string) *revid.Revid { return v } +// addActive adds a new revid pipeline configured with the provided RTMP URL url +// for the MAC ma into the actives map. func (h *forwardHandler) addActive(ma, url string) error { h.mu.Lock() cfg := config.Config{ Logger: h.log, Input: config.InputManual, - InputCodec: codecutil.H264_AU, // h264 access unit i.e. h264 frame. + InputCodec: codecutil.H264_AU, Outputs: []uint8{config.OutputRTMP}, RTMPURL: url, LogLevel: logging.Debug, - FileFPS: 25, } rv, err := revid.New(cfg, nil) @@ -159,11 +226,13 @@ func (h *forwardHandler) addActive(ma, url string) error { return nil } -func (h *forwardHandler) removeInactives(macs []string) { +// removeInactives removes all pairs in the active map that do not correspond to +// those found in actives. +func (h *forwardHandler) removeInactives(actives []string) { h.mu.Lock() actives: for k := range h.actives { - for _, m := range macs { + for _, m := range actives { if k == m { continue actives } @@ -173,6 +242,7 @@ actives: h.mu.Unlock() } +// isActive checks to see if the provided MAC is in the actives map. func (h *forwardHandler) isActive(ma string) bool { h.mu.Lock() _, ok := h.actives[ma] @@ -216,13 +286,15 @@ func main() { readySig := make(chan struct{}) go run(ns, log, netLog, fh, readySig) + // We won't start serving until we've talked to the cloud for configuration. <-readySig http.Handle("/recv", fh) http.ListenAndServe(*host+":"+*port, nil) } // run starts the main loop. This will run netsender on every pass of the loop -// (sleeping inbetween), check vars, and if changed, update revid as appropriate. +// (sleeping inbetween), check vars, and if changed, update configuration as +// appropriate. func run(ns *netsender.Sender, l logging.Logger, nl *netlogger.Logger, fh *forwardHandler, sig chan struct{}) { var vs int for { @@ -258,40 +330,22 @@ func run(ns *netsender.Sender, l logging.Logger, nl *netlogger.Logger, fh *forwa } l.Debug("got new vars", "vars", vars) - // Check the actives variable to seee what MACs are active. + // Check the actives variable and update the forward handlers active map. v, ok := vars["Actives"] if !ok { l.Warning("no actives variable in var map", "vars", vars) + sleep(ns, l) + continue } + fh.updateActives(v) - pairs := strings.Split(v, ",") - macs := make([]string, 0, len(pairs)) - for _, p := range pairs { - pair := strings.Split(p, "=") - if len(pair) != 2 { - l.Warning("invalid = pair", "pair", pair) - continue - } - - m := pair[0] - macs = append(macs, m) - if !isMac(m) { - l.Warning("invalid mac in Actives variable", "mac", m) - continue - } - - r := fh.getActive(m) - if r == nil { - fh.addActive(m, pair[1]) - } - } - fh.removeInactives(macs) - + // sig is closed on the first time we get here, indicating to external routine + // that we've got our configuration from the cloud. select { case <-sig: default: close(sig) - } + } sleep(ns, l) } diff --git a/cmd/vidforward/vidforward b/cmd/vidforward/vidforward deleted file mode 100755 index 82d11135..00000000 Binary files a/cmd/vidforward/vidforward and /dev/null differ diff --git a/cmd/vidforward/vidforward_test.go b/cmd/vidforward/vidforward_test.go index 8e0ee765..41d00423 100644 --- a/cmd/vidforward/vidforward_test.go +++ b/cmd/vidforward/vidforward_test.go @@ -1,9 +1,34 @@ +/* +DESCRIPTION + Provides testing for vidfoward functionality. + +AUTHORS + Saxon A. Nelson-Milton + +LICENSE + Copyright (C) 2022 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + package main import "testing" +// TestIsMac tests the isMac function. func TestIsMac(t *testing.T) { - tests := []struct{ + tests := []struct { in string want bool }{ diff --git a/codec/codecutil/lex.go b/codec/codecutil/lex.go index 6ecba829..bfe409d8 100644 --- a/codec/codecutil/lex.go +++ b/codec/codecutil/lex.go @@ -25,8 +25,10 @@ LICENSE package codecutil import ( + "errors" "fmt" "io" + "math" "time" ) @@ -82,58 +84,69 @@ func (l *ByteLexer) Lex(dst io.Writer, src io.Reader, d time.Duration) error { } } -// Noop assumes that for writing to src is blocked until the entire previous -// write to src is read, i.e. src is expected to connected to a pipe-like structure. +// Noop reads media "frames" from src, queues and then writes to dst at intervals, +// maintaining a steady number of frames stored in the queue. This ensures frames +// are outputted at a consistent rate; useful if reads occur from src in blocks (a +// side effect if src is connected to an input that receives packets containing +// multiple frames at intervals e.g. MPEG-TS over HTTP). +// Noop assumes that writing to the input connected to src is blocked until the +// entire previous write is read, i.e. src is expected to be connected to +// a pipe-like structure. func Noop(dst io.Writer, src io.Reader, d time.Duration) error { + // Controller tuning constants. + const ( + target = 500 // Target channel size to maintain. + coef = 0.01 // Proportional controller coefficient. + minDelay = 1 // Minimum delay between writes (ms). + maxDelay = 1000 // Maximum delay between writes (ms). + defaultDelay = 40 * time.Millisecond // Default delay between writes, equivalent to ~25fps. + ) + + // Ring buffer tuning. + const ( + ringCap = 1000 // Ring buffer capacity. + ringElemSize = 250000 // Ring buffer element size i.e. max h264 frame size. + ) + if d < 0 { return fmt.Errorf("invalid delay: %v", d) } if d == 0 { - d = 40 * time.Millisecond + d = defaultDelay } - ticker := time.NewTicker(d) - defer ticker.Stop() - const checkDur = 500 * time.Millisecond - rateChkTicker := time.NewTicker(checkDur) - frameCh := make(chan []byte, 1000) - errCh := make(chan error) + var ( + delay = time.NewTicker(d) // Ticker based on delay between frames. + errCh = make(chan error) // Used by the output routine to signal errors to the main loop. + rb = newRingBuffer(ringElemSize, ringCap) // Use a ring buffer to reduce allocation and GC load. + ) + defer delay.Stop() + + // This routine is responsible for frame output. go func() { for { - toWrite := <-frameCh - _, err := dst.Write(toWrite) + err := rb.writeTo(dst) if err != nil { errCh <- fmt.Errorf("could not write to dst: %w", err) } + <-delay.C - <-ticker.C - select { - case <-rateChkTicker.C: - var adj int - const equilibrium = 500 - if len(frameCh) > equilibrium { - adj = -2 - } else if len(frameCh) < equilibrium { - adj = 2 - } - d += time.Millisecond * time.Duration(adj) - ticker.Reset(d) - default: - } + // Adjust delay using proportional controller. + adj := coef * float64(target-rb.len()) + adj = math.Max(math.Min(adj, minDelay), maxDelay) // Limit the delay. + d += time.Millisecond * time.Duration(adj) + delay.Reset(d) } }() - const maxFrameSize = 250000 // = 20kB - buf := make([]byte, maxFrameSize) + // This loop is responsible for reading frames and checking any errors from + // the output routine. for { - n, err := src.Read(buf) + err := rb.readFrom(src) if err != nil { return fmt.Errorf("could not read src: %w", err) } - newFrame := make([]byte, n) - copy(newFrame, buf[:n]) - frameCh <- newFrame select { case err := <-errCh: return fmt.Errorf("error from output routine: %w", err) @@ -141,3 +154,70 @@ func Noop(dst io.Writer, src io.Reader, d time.Duration) error { } } } + +// ringBuffer is a basic concurrency safe ring buffer. Concurrency safety is +// achieved using a channel between read and write methods i.e. overwrite/dropping +// behaviour is absent and blocking will occur. +type ringBuffer struct { + n int // Num. of elements. + i int // Current index in underlying buffer. + buf [][]byte // Underlying buffer. + ch chan []byte // ch will act as our concurrency safe queue. +} + +func newRingBuffer(sz, cap int) *ringBuffer { + rb := &ringBuffer{ + buf: make([][]byte, cap), + n: cap, + ch: make(chan []byte, cap), + } + for i := range rb.buf { + rb.buf[i] = make([]byte, sz) + } + return rb +} + +// readFrom gets the next []byte from the buffer and uses it to read from r. +// This data is then stored in the buffer channel ready for writeTo to retreive. +// readFrom will block if the buffer channel is filled, at least within the +// timeout, otherwise an error is returned. +func (b *ringBuffer) readFrom(r io.Reader) error { + buf := b.buf[b.i] + b.i++ + if b.i == b.n { + b.i = 0 + } + n, err := r.Read(buf) + if err != nil { + return err + } + const dur = 1 * time.Minute + timeout := time.NewTimer(dur) + select { + case b.ch <- buf[:n]: + case <-timeout.C: + return errors.New("buffer chan send timeout") + } + return nil +} + +// writeTo tries to get a []byte from the buffer channel within the timeout +// and then writes to w if successful, otherwise an error is returned. +func (b *ringBuffer) writeTo(w io.Writer) error { + const dur = 1 * time.Minute + timeout := time.NewTimer(dur) + select { + case p := <-b.ch: + _, err := w.Write(p) + if err != nil { + return err + } + case <-timeout.C: + return errors.New("buffer chan receive timeout") + } + return nil +} + +func (b *ringBuffer) len() int { + return len(b.ch) +} diff --git a/codec/codecutil/list.go b/codec/codecutil/list.go index e895a198..47507f71 100644 --- a/codec/codecutil/list.go +++ b/codec/codecutil/list.go @@ -27,13 +27,13 @@ package codecutil // All available codecs for reference in any application. // When adding or removing a codec from this list, the IsValid function below must be updated. const ( - PCM = "pcm" - ADPCM = "adpcm" - H264 = "h264" - H264_AU = "h264_au" - H265 = "h265" - MJPEG = "mjpeg" - JPEG = "jpeg" + PCM = "pcm" + ADPCM = "adpcm" + H264 = "h264" // h264 bytestream (requires lexing). + H264_AU = "h264_au" // Discrete h264 access units. + H265 = "h265" + MJPEG = "mjpeg" + JPEG = "jpeg" ) // IsValid checks if a string is a known and valid codec in the right format. diff --git a/container/mts/payload.go b/container/mts/payload.go index f791e709..b854a211 100644 --- a/container/mts/payload.go +++ b/container/mts/payload.go @@ -141,8 +141,9 @@ type Frame struct { idx int // Index in the backing slice. } -func (c* Clip) Frames() []Frame { - return c.frames +// Frames returns the frames of a h264 clip. +func (c *Clip) Frames() []Frame { + return c.frames } // Bytes returns the concatentated media bytes from each frame in the Clip c. diff --git a/device/device.go b/device/device.go index bd56966f..f1a359d6 100644 --- a/device/device.go +++ b/device/device.go @@ -74,11 +74,62 @@ func (me MultiError) Error() string { return fmt.Sprintf("%v", []error(me)) } -type ManualInput struct {} -func NewManualInput() *ManualInput { return &ManualInput{} } -func (m *ManualInput) Read(p []byte) (int, error) { return 0, nil } +// ManualInput is an implementation of the Devices interface that represents +// a manual input mechanism, i.e. data is written to this input manually through +// software (ManualInput also implements io.Writer, unlike other implementations). +// The ManualInput employs an io.Pipe, as such, every write must be accompanied +// by a full read (or reads) of the bytes, otherwise blocking will occur (and +// vice versa). This is intended to make writing of distinct access units easier i.e. +// one whole read (with a big enough buf provided) can represent a distinct frame. +type ManualInput struct { + isRunning bool + reader *io.PipeReader + writer *io.PipeWriter +} + +// NewManualInput provides a new ManualInput. +func NewManualInput() *ManualInput { + r, w := io.Pipe() + return &ManualInput{reader: r, writer: w} +} + +// Read reads from the manual input and puts the bytes into p. +func (m *ManualInput) Read(p []byte) (int, error) { + return m.reader.Read(p) +} + +// Name returns the name of ManualInput i.e. "ManualInput". func (m *ManualInput) Name() string { return "ManualInput" } + +// Set is a stub to satisfy the Device interface; no configuration fields are +// required by ManualInput. func (m *ManualInput) Set(c config.Config) error { return nil } -func (m *ManualInput) Start() error { return nil } -func (m *ManualInput) Stop() error { return nil } -func (m *ManualInput) IsRunning() bool { return true } + +// Start sets the ManualInput isRunning flag to true. This is mostly here just +// to satisfy the Device interface. +func (m *ManualInput) Start() error { + m.isRunning = true + return nil +} + +// Stop closes the pipe and sets the isRunning flag to false. +func (m *ManualInput) Stop() error { + if !m.isRunning { + return nil + } + err := m.reader.Close() + if err != nil { + return err + } + m.isRunning = false + return nil +} + +// IsRunning returns the value of the isRunning flag to indicate if Start has +// been called (and Stop has not been called after). +func (m *ManualInput) IsRunning() bool { return m.isRunning } + +// Write writes p to the ManualInput's writer side of its pipe. +func (m *ManualInput) Write(p []byte) (int, error) { + return m.writer.Write(p) +} diff --git a/device/file/file_test.go b/device/file/file_test.go index 53e90be5..ae497e04 100644 --- a/device/file/file_test.go +++ b/device/file/file_test.go @@ -29,13 +29,14 @@ import ( "time" "bitbucket.org/ausocean/av/revid/config" + "bitbucket.org/ausocean/utils/logging" ) func TestIsRunning(t *testing.T) { const dur = 250 * time.Millisecond const path = "../../../test/test-data/av/input/motion-detection/mjpeg/school.mjpeg" - d := New() + d := New((*logging.TestLogger)(t)) err := d.Set(config.Config{ InputPath: path, diff --git a/revid/config/config.go b/revid/config/config.go index b6c736c2..bc73e21b 100644 --- a/revid/config/config.go +++ b/revid/config/config.go @@ -55,8 +55,8 @@ const ( OutputFiles // Codecs. - H264 - H264_AU + H264 // h264 bytestream. + H264_AU // Discrete h264 access units. H265 MJPEG JPEG @@ -294,7 +294,7 @@ func (c *Config) Update(vars map[string]string) { } func (c *Config) LogInvalidField(name string, def interface{}) { - c.Logger.Info( name+" bad or unset, defaulting", name, def) + c.Logger.Info(name+" bad or unset, defaulting", name, def) } func stringInSlice(want string, slice []string) bool { diff --git a/revid/config/variables.go b/revid/config/variables.go index ba3a8a3a..c0922c0a 100644 --- a/revid/config/variables.go +++ b/revid/config/variables.go @@ -149,7 +149,7 @@ var Variables = []struct { Name: KeyTransformMatrix, Type: typeString, Update: func(c *Config, v string) { - c.Logger.Debug( "updating transform matrix", "string", v) + c.Logger.Debug("updating transform matrix", "string", v) v = strings.Replace(v, " ", "", -1) vals := make([]float64, 0) if v == "" { @@ -161,7 +161,7 @@ var Variables = []struct { for _, e := range elements { vFloat, err := strconv.ParseFloat(e, 64) if err != nil { - c.Logger.Warning( "invalid TransformMatrix param", "value", e) + c.Logger.Warning("invalid TransformMatrix param", "value", e) } vals = append(vals, vFloat) } @@ -231,7 +231,7 @@ var Variables = []struct { Update: func(c *Config, v string) { _v, err := strconv.Atoi(v) if err != nil { - c.Logger.Warning( "invalid ClipDuration param", "value", v) + c.Logger.Warning("invalid ClipDuration param", "value", v) } c.ClipDuration = time.Duration(_v) * time.Second }, @@ -283,7 +283,7 @@ var Variables = []struct { for i, filter := range filters { v, ok := m[filter] if !ok { - c.Logger.Warning( "invalid Filters param", "value", v) + c.Logger.Warning("invalid Filters param", "value", v) } c.Filters[i] = uint(v) } @@ -329,7 +329,7 @@ var Variables = []struct { "v4l": InputV4L, "file": InputFile, "audio": InputAudio, - "manual": InputManual, + "manual": InputManual, }, c, ) @@ -382,7 +382,7 @@ var Variables = []struct { case "Fatal": c.LogLevel = logging.Fatal default: - c.Logger.Warning( "invalid Logging param", "value", v) + c.Logger.Warning("invalid Logging param", "value", v) } }, Validate: func(c *Config) { @@ -418,8 +418,8 @@ var Variables = []struct { }, }, { - Name: KeyMode, - Type: "enum:Normal,Paused,Burst", + Name: KeyMode, + Type: "enum:Normal,Paused,Burst", Update: func(c *Config, v string) {}, }, { @@ -448,7 +448,7 @@ var Variables = []struct { Update: func(c *Config, v string) { f, err := strconv.ParseFloat(v, 64) if err != nil { - c.Logger.Warning( "invalid MotionMinArea var", "value", v) + c.Logger.Warning("invalid MotionMinArea var", "value", v) } c.MotionMinArea = f }, @@ -469,7 +469,7 @@ var Variables = []struct { Update: func(c *Config, v string) { f, err := strconv.ParseFloat(v, 64) if err != nil { - c.Logger.Warning( "invalid MotionThreshold var", "value", v) + c.Logger.Warning("invalid MotionThreshold var", "value", v) } c.MotionThreshold = f }, @@ -491,7 +491,7 @@ var Variables = []struct { case "rtp": c.Outputs[0] = OutputRTP default: - c.Logger.Warning( "invalid output param", "value", v) + c.Logger.Warning("invalid output param", "value", v) } }, }, @@ -519,7 +519,7 @@ var Variables = []struct { case "rtp": c.Outputs[i] = OutputRTP default: - c.Logger.Warning( "invalid outputs param", "value", v) + c.Logger.Warning("invalid outputs param", "value", v) } } }, @@ -571,7 +571,7 @@ var Variables = []struct { Update: func(c *Config, v string) { _v, err := strconv.ParseFloat(v, 64) if err != nil { - c.Logger.Warning( fmt.Sprintf("invalid %s param", KeyRecPeriod), "value", v) + c.Logger.Warning(fmt.Sprintf("invalid %s param", KeyRecPeriod), "value", v) } c.RecPeriod = _v }, @@ -618,7 +618,7 @@ var Variables = []struct { Update: func(c *Config, v string) { _v, err := strconv.Atoi(v) if err != nil { - c.Logger.Warning( "invalid JPEGQuality param", "value", v) + c.Logger.Warning("invalid JPEGQuality param", "value", v) } c.JPEGQuality = _v }, @@ -637,7 +637,7 @@ var Variables = []struct { Update: func(c *Config, v string) { _v, err := strconv.Atoi(v) if err != nil { - c.Logger.Warning( "invalid TimelapseInterval param", "value", v) + c.Logger.Warning("invalid TimelapseInterval param", "value", v) } c.TimelapseInterval = time.Duration(_v) * time.Second }, @@ -648,7 +648,7 @@ var Variables = []struct { Update: func(c *Config, v string) { _v, err := strconv.Atoi(v) if err != nil { - c.Logger.Warning( "invalid TimelapseDuration param", "value", v) + c.Logger.Warning("invalid TimelapseDuration param", "value", v) } c.TimelapseDuration = time.Duration(_v) * time.Second }, @@ -691,7 +691,7 @@ var Variables = []struct { func parseUint(n, v string, c *Config) uint { _v, err := strconv.ParseUint(v, 10, 64) if err != nil { - c.Logger.Warning( fmt.Sprintf("expected unsigned int for param %s", n), "value", v) + c.Logger.Warning(fmt.Sprintf("expected unsigned int for param %s", n), "value", v) } return uint(_v) } @@ -699,7 +699,7 @@ func parseUint(n, v string, c *Config) uint { func parseInt(n, v string, c *Config) int { _v, err := strconv.Atoi(v) if err != nil { - c.Logger.Warning( fmt.Sprintf("expected integer for param %s", n), "value", v) + c.Logger.Warning(fmt.Sprintf("expected integer for param %s", n), "value", v) } return _v } @@ -711,7 +711,7 @@ func parseBool(n, v string, c *Config) (b bool) { case "false": b = false default: - c.Logger.Warning( fmt.Sprintf("expect bool for param %s", n), "value", v) + c.Logger.Warning(fmt.Sprintf("expect bool for param %s", n), "value", v) } return } @@ -719,7 +719,7 @@ func parseBool(n, v string, c *Config) (b bool) { func parseEnum(n, v string, enums map[string]uint8, c *Config) uint8 { _v, ok := enums[strings.ToLower(v)] if !ok { - c.Logger.Warning( fmt.Sprintf("invalid value for %s param", n), "value", v) + c.Logger.Warning(fmt.Sprintf("invalid value for %s param", n), "value", v) } return _v } diff --git a/revid/pipeline.go b/revid/pipeline.go index 09eb7a67..f2f0791c 100644 --- a/revid/pipeline.go +++ b/revid/pipeline.go @@ -317,10 +317,9 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. r.cfg.Logger.Debug("using manual input") r.input = device.NewManualInput() err = r.setLexer(r.cfg.InputCodec, false) - r.pipeReader, r.pipeWriter = io.Pipe() default: - return fmt.Errorf("unrecognised input type: %v") + return fmt.Errorf("unrecognised input type: %v", r.cfg.Input) } if err != nil { return fmt.Errorf("could not set lexer: %w", err) @@ -396,13 +395,7 @@ func (r *Revid) processFrom(delay time.Duration) { w = ioext.MultiWriteCloser(r.filters[0], r.probe) } - var err error - if r.cfg.Input == config.InputManual { - err = r.lexTo(w, r.pipeReader, delay) - } else { - err = r.lexTo(w, r.input, delay) - } - + err := r.lexTo(w, r.input, delay) switch err { case nil, io.EOF: r.cfg.Logger.Info("end of file") diff --git a/revid/revid.go b/revid/revid.go index 2d2e3a61..c5684984 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -102,9 +102,6 @@ type Revid struct { // stop is used to signal stopping when looping an input. stop chan struct{} - - pipeReader *io.PipeReader - pipeWriter *io.PipeWriter } // New returns a pointer to a new Revid with the desired configuration, and/or @@ -129,11 +126,12 @@ func (r *Revid) Bitrate() int { return r.bitrate.Bitrate() } -func (r *Revid) Write(p []byte) (int, error){ - if r.pipeWriter == nil { - return 0, errors.New("revid input pipewriter not initialised, please start revid") +func (r *Revid) Write(p []byte) (int, error) { + mi, ok := r.input.(*device.ManualInput) + if !ok { + return 0, errors.New("cannot write to anything but ManualInput") } - return r.pipeWriter.Write(p) + return mi.Write(p) } // Start invokes a Revid to start processing video from a defined input