diff --git a/codec/h264/lex.go b/codec/h264/lex.go index 176c8b3b..6159a15e 100644 --- a/codec/h264/lex.go +++ b/codec/h264/lex.go @@ -66,7 +66,7 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { buf := make([]byte, len(h264Prefix), bufSize) copy(buf, h264Prefix[:]) writeOut := false -outer: + for { var b byte var err error @@ -75,7 +75,10 @@ outer: if err != io.EOF { return err } - break + if len(buf) != 0 { + return io.ErrUnexpectedEOF + } + return io.EOF } for n := 1; b == 0x0 && n < 4; n++ { @@ -84,7 +87,7 @@ outer: if err != io.EOF { return err } - break outer + return io.ErrUnexpectedEOF } buf = append(buf, b) @@ -109,7 +112,7 @@ outer: if err != io.EOF { return err } - break outer + return io.ErrUnexpectedEOF } buf = append(buf, b) @@ -127,10 +130,4 @@ outer: } } } - if len(buf) == len(h264Prefix) { - return nil - } - <-tick - _, err := dst.Write(buf) - return err } diff --git a/codec/h265/lex.go b/codec/h265/lex.go index 55db6be6..2fb41a9d 100644 --- a/codec/h265/lex.go +++ b/codec/h265/lex.go @@ -76,10 +76,14 @@ func (l *Lexer) Lex(dst io.Writer, src io.Reader, delay time.Duration) error { n, err := src.Read(buf) switch err { case nil: // Do nothing. - case io.EOF: - return nil default: - return fmt.Errorf("source read error: %w\n", err) + if err == io.EOF { + if l.buf.Len() == 0 { + return io.EOF + } + return io.ErrUnexpectedEOF + } + return err } // Get payload from RTP packet. @@ -181,7 +185,7 @@ func (l *Lexer) handleFragmentation(d []byte) { } } -// handlePACI will handl PACI packets +// handlePACI will handle PACI packets // // TODO: complete this func (l *Lexer) handlePACI(d []byte) { diff --git a/codec/h265/lex_test.go b/codec/h265/lex_test.go index 1a409e4c..9f725bb9 100644 --- a/codec/h265/lex_test.go +++ b/codec/h265/lex_test.go @@ -247,7 +247,9 @@ func TestLex(t *testing.T) { r := &rtpReader{packets: test.packets} d := &destination{} err := NewLexer(test.donl).Lex(d, r, 0) - if err != nil { + switch err { + case nil, io.EOF: // Do nothing + default: t.Fatalf("error lexing: %v\n", err) } diff --git a/codec/mjpeg/lex.go b/codec/mjpeg/lex.go index 46ab4ce8..aea4ce9f 100644 --- a/codec/mjpeg/lex.go +++ b/codec/mjpeg/lex.go @@ -60,12 +60,9 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { buf := make([]byte, 2, 4<<10) n, err := r.Read(buf) if n < 2 { - return nil + return io.ErrUnexpectedEOF } if err != nil { - if err == io.EOF { - return nil - } return err } if !bytes.Equal(buf, []byte{0xff, 0xd8}) { @@ -76,7 +73,7 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { b, err := r.ReadByte() if err != nil { if err == io.EOF { - return nil + return io.ErrUnexpectedEOF } return err } diff --git a/filter/filters_circleci.go b/filter/filters_circleci.go index f6d46082..374e8b0a 100644 --- a/filter/filters_circleci.go +++ b/filter/filters_circleci.go @@ -32,10 +32,10 @@ import ( ) // NewMOGFilter returns a pointer to a new NoOp struct for testing purposes only. -func NewMOGFilter(dst io.WriteCloser, area, threshold float64, history int, debug bool) *NoOp { +func NewMOGFilter(dst io.WriteCloser, area, threshold float64, history int, debug bool, hf int) *NoOp { return &NoOp{dst: dst} } -func NewKNNFilter(dst io.WriteCloser, area, threshold float64, history, kernelSize int, debug bool) *NoOp { +func NewKNNFilter(dst io.WriteCloser, area, threshold float64, history, kernelSize int, debug bool, hf int) *NoOp { return &NoOp{dst: dst} } diff --git a/filter/knn.go b/filter/knn.go index be733ec0..cc5a8a37 100644 --- a/filter/knn.go +++ b/filter/knn.go @@ -40,23 +40,26 @@ import ( // KNNFilter is a filter that provides basic motion detection. KNN is short for // K-Nearest Neighbours method. type KNNFilter struct { - dst io.WriteCloser - area float64 - bs *gocv.BackgroundSubtractorKNN - knl gocv.Mat - debug bool - windows []*gocv.Window + dst io.WriteCloser // Destination to which motion containing frames go. + area float64 // The minimum area that a contour can be found in. + bs *gocv.BackgroundSubtractorKNN // Uses the KNN algorithm to find the difference between the current and background frame. + knl gocv.Mat // Matrix that is used for calculations. + debug bool // If true then debug windows with the bounding boxes and difference will be shown on the screen. + windows []*gocv.Window // Holds debug windows. + hold [][]byte // Will hold all frames up to hf (so only every hf frame is motion detected). + hf int // The number of frames to be held. + hfCount int // Counter for the hold array. } // NewKNNFilter returns a pointer to a new KNNFilter. -func NewKNNFilter(dst io.WriteCloser, area, threshold float64, history, kernelSize int, debug bool) *KNNFilter { +func NewKNNFilter(dst io.WriteCloser, area, threshold float64, history, kernelSize int, debug bool, hf int) *KNNFilter { bs := gocv.NewBackgroundSubtractorKNNWithParams(history, threshold, false) k := gocv.GetStructuringElement(gocv.MorphRect, image.Pt(kernelSize, kernelSize)) var windows []*gocv.Window if debug { windows = []*gocv.Window{gocv.NewWindow("KNN: Bounding boxes"), gocv.NewWindow("KNN: Motion")} } - return &KNNFilter{dst, area, &bs, k, debug, windows} + return &KNNFilter{dst, area, &bs, k, debug, windows, make([][]byte, hf-1), hf, 0} } // Implements io.Closer. @@ -75,6 +78,13 @@ func (m *KNNFilter) Close() error { // Write applies the motion filter to the video stream. Only frames with motion // are written to the destination encoder, frames without are discarded. func (m *KNNFilter) Write(f []byte) (int, error) { + if m.hfCount < (m.hf - 1) { + m.hold[m.hfCount] = f + m.hfCount++ + return len(f), nil + } + m.hfCount = 0 + img, err := gocv.IMDecode(f, gocv.IMReadColor) if err != nil { return 0, fmt.Errorf("can't decode image: %w", err) @@ -125,9 +135,16 @@ func (m *KNNFilter) Write(f []byte) (int, error) { // Don't write to destination if there is no motion. if len(contours) == 0 { - return -1, nil + return len(f), nil } - // Write to destination. + // Write to destination, past 4 frames then current frame. + for i, h := range m.hold { + _, err := m.dst.Write(h) + m.hold[i] = nil + if err != nil { + return len(f), fmt.Errorf("could not write previous frames: %w", err) + } + } return m.dst.Write(f) } diff --git a/filter/mog.go b/filter/mog.go index 9c647bd2..20fd5807 100644 --- a/filter/mog.go +++ b/filter/mog.go @@ -40,23 +40,26 @@ import ( // MOGFilter is a filter that provides basic motion detection. MoG is short for // Mixture of Gaussians method. type MOGFilter struct { - dst io.WriteCloser - area float64 - bs *gocv.BackgroundSubtractorMOG2 - knl gocv.Mat - debug bool - windows []*gocv.Window + dst io.WriteCloser // Destination to which motion containing frames go. + area float64 // The minimum area that a contour can be found in. + bs *gocv.BackgroundSubtractorMOG2 // Uses the MOG algorithm to find the difference between the current and background frame. + knl gocv.Mat // Matrix that is used for calculations. + debug bool // If true then debug windows with the bounding boxes and difference will be shown on the screen. + windows []*gocv.Window // Holds debug windows. + hold [][]byte // Will hold all frames up to hf (so only every hf frame is motion detected). + hf int // The number of frames to be held. + hfCount int // Counter for the hold array. } // NewMOGFilter returns a pointer to a new MOGFilter struct. -func NewMOGFilter(dst io.WriteCloser, area, threshold float64, history int, debug bool) *MOGFilter { +func NewMOGFilter(dst io.WriteCloser, area, threshold float64, history int, debug bool, hf int) *MOGFilter { bs := gocv.NewBackgroundSubtractorMOG2WithParams(history, threshold, false) k := gocv.GetStructuringElement(gocv.MorphRect, image.Pt(3, 3)) var windows []*gocv.Window if debug { windows = []*gocv.Window{gocv.NewWindow("MOG: Bounding boxes"), gocv.NewWindow("MOG: Motion")} } - return &MOGFilter{dst, area, &bs, k, debug, windows} + return &MOGFilter{dst, area, &bs, k, debug, windows, make([][]byte, hf-1), hf, 0} } // Implements io.Closer. @@ -75,6 +78,13 @@ func (m *MOGFilter) Close() error { // Write applies the motion filter to the video stream. Only frames with motion // are written to the destination encoder, frames without are discarded. func (m *MOGFilter) Write(f []byte) (int, error) { + if m.hfCount < (m.hf - 1) { + m.hold[m.hfCount] = f + m.hfCount++ + return len(f), nil + } + + m.hfCount = 0 img, err := gocv.IMDecode(f, gocv.IMReadColor) if err != nil { return 0, fmt.Errorf("image can't be decoded: %w", err) @@ -125,9 +135,16 @@ func (m *MOGFilter) Write(f []byte) (int, error) { // Don't write to destination if there is no motion. if len(contours) == 0 { - return 0, nil + return len(f), nil } - // Write to destination. + // Write to destination, past 4 frames then current frame. + for i, h := range m.hold { + _, err := m.dst.Write(h) + m.hold[i] = nil + if err != nil { + return len(f), fmt.Errorf("could not write previous frames: %w", err) + } + } return m.dst.Write(f) } diff --git a/revid/config/config.go b/revid/config/config.go index 710a75c5..194faf33 100644 --- a/revid/config/config.go +++ b/revid/config/config.go @@ -85,6 +85,7 @@ const ( defaultClipDuration = 0 defaultAudioInputCodec = codecutil.ADPCM defaultPSITime = 2 + defaultMotionInterval = 5 // Ring buffer defaults. defaultRBMaxElements = 10000 @@ -275,6 +276,7 @@ type Config struct { HorizontalFlip bool // HorizontalFlip flips video horizontally for Raspivid input. VerticalFlip bool // VerticalFlip flips video vertically for Raspivid input. Filters []int // Defines the methods of filtering to be used in between lexing and encoding. + MotionInterval int // Sets the number of frames that are held before the filter is used (on the nth frame) PSITime int // Sets the time between a packet being sent. // Ring buffer parameters. @@ -328,6 +330,7 @@ var TypeData = map[string]string{ "MOGHistory": "uint", "MOGMinArea": "float", "MOGThreshold": "float", + "MotionInterval": "int", "RBCapacity": "uint", "RBMaxElements": "uint", "RBWriteTimeout": "uint", @@ -462,6 +465,10 @@ func (c *Config) Validate() error { c.Logger.Log(logger.Info, pkg+"PSITime bad or unset, defaulting", "PSITime", defaultPSITime) c.PSITime = defaultPSITime } + if c.MotionInterval <= 0 { + c.Logger.Log(logger.Info, pkg+"MotionInterval bad or unset, defaulting", "MotionInterval", defaultMotionInterval) + c.MotionInterval = defaultMotionInterval + } if c.MinFPS <= 0 { c.Logger.Log(logger.Info, pkg+"MinFPS bad or unset, defaulting", "MinFPS", defaultMinFPS) diff --git a/revid/revid.go b/revid/revid.go index a05c9650..8b1b42da 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -337,11 +337,11 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. case config.FilterNoOp: r.filters[i] = filter.NewNoOp(dst) case config.FilterMOG: - r.filters[i] = filter.NewMOGFilter(dst, r.cfg.MOGMinArea, r.cfg.MOGThreshold, int(r.cfg.MOGHistory), r.cfg.ShowWindows) + r.filters[i] = filter.NewMOGFilter(dst, r.cfg.MOGMinArea, r.cfg.MOGThreshold, int(r.cfg.MOGHistory), r.cfg.ShowWindows, r.cfg.MotionInterval) case config.FilterVariableFPS: - r.filters[i] = filter.NewVariableFPSFilter(dst, r.cfg.MinFPS, filter.NewMOGFilter(dst, r.cfg.MOGMinArea, r.cfg.MOGThreshold, int(r.cfg.MOGHistory), r.cfg.ShowWindows)) + r.filters[i] = filter.NewVariableFPSFilter(dst, r.cfg.MinFPS, filter.NewMOGFilter(dst, r.cfg.MOGMinArea, r.cfg.MOGThreshold, int(r.cfg.MOGHistory), r.cfg.ShowWindows, r.cfg.MotionInterval)) case config.FilterKNN: - r.filters[i] = filter.NewKNNFilter(dst, r.cfg.KNNMinArea, r.cfg.KNNThreshold, int(r.cfg.KNNHistory), int(r.cfg.KNNKernel), r.cfg.ShowWindows) + r.filters[i] = filter.NewKNNFilter(dst, r.cfg.KNNMinArea, r.cfg.KNNThreshold, int(r.cfg.KNNHistory), int(r.cfg.KNNKernel), r.cfg.ShowWindows, r.cfg.MotionInterval) default: panic("Undefined Filter") } @@ -671,6 +671,13 @@ func (r *Revid) Update(vars map[string]string) error { } r.cfg.Filters[i] = v } + case "MotionInterval": + v, err := strconv.Atoi(value) + if err != nil || v < 0 { + r.cfg.Logger.Log(logger.Warning, pkg+"invalid MotionInterval var", "value", value) + break + } + r.cfg.MotionInterval = v case "PSITime": v, err := strconv.Atoi(value) if err != nil || v < 0 { @@ -847,7 +854,13 @@ func (r *Revid) Update(vars map[string]string) error { // processFrom is run as a routine to read from a input data source, lex and // then send individual access units to revid's encoders. func (r *Revid) processFrom(read io.Reader, delay time.Duration) { - r.err <- r.lexTo(r.filters[0], read, delay) - r.cfg.Logger.Log(logger.Info, pkg+"finished lexing") + err := r.lexTo(r.filters[0], read, delay) + r.cfg.Logger.Log(logger.Debug, pkg+"finished lexing") + switch err { + case nil: // Do nothing. + case io.EOF: // TODO: handle this depending on loop mode. + default: + r.err <- err + } r.wg.Done() }