Merged in filter-new (pull request #310)

Filter interface take 2

Approved-by: Saxon Milton <saxon.milton@gmail.com>
This commit is contained in:
Ella Pietraroia 2019-12-23 01:58:16 +00:00 committed by Saxon Milton
commit 476deedf2b
5 changed files with 86 additions and 6 deletions

49
filter/filter.go Normal file
View File

@ -0,0 +1,49 @@
/*
NAME
filter.go
AUTHORS
Ella Pietraroia <ella@ausocean.org>
LICENSE
filter.go is Copyright (C) 2019 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
// Package filter provides the interface and implementations of the filters to be used
// on video input that has been lexed.
package filter
import (
"io"
)
// Interface for all filters.
type Filter interface {
io.WriteCloser
//NB: Filter interface may evolve with more methods as required.
}
// The NoOp filter will perform no operation on the data that is being recieved,
// it will pass it on to the encoder with no changes.
type NoOp struct {
dst io.Writer
}
func NewNoOp(dst io.Writer) *NoOp { return &NoOp{dst: dst} }
func (n *NoOp) Write(p []byte) (int, error) { return n.dst.Write(p) }
func (n *NoOp) Close() error { return nil }

View File

@ -46,7 +46,7 @@ type MOGFilter struct {
}
// NewMOGFilter returns a pointer to a new MOGFilter.
func NewMOGFilter(dst io.WriteCloser, area, threshold float64, history, kernelSize int, debug bool) *MogFilter {
func NewMOGFilter(dst io.WriteCloser, area, threshold float64, history, kernelSize int, debug bool) *MOGFilter {
bs := gocv.NewBackgroundSubtractorMOG2WithParams(history, threshold, false)
k := gocv.GetStructuringElement(gocv.MorphRect, image.Pt(kernelSize, kernelSize))
var windows []*gocv.Window
@ -65,7 +65,7 @@ func (m *MOGFilter) Close() error {
for _, window := range m.windows {
window.Close()
}
return m.dst.Close()
return nil
}
// Implements io.Writer.

1
go.mod
View File

@ -12,5 +12,4 @@ require (
github.com/pkg/errors v0.8.1
github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e
gocv.io/x/gocv v0.21.0
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
)

View File

@ -108,6 +108,12 @@ const (
QualityExcellent
)
// The different media filters.
const (
FilterNoOp = iota
FilterMOG
)
// Config provides parameters relevant to a revid instance. A new config must
// be passed to the constructor. Default values for these fields are defined
// as consts above.
@ -248,6 +254,7 @@ type Config struct {
HorizontalFlip bool // HorizontalFlip flips video horizontally for Raspivid input.
VerticalFlip bool // VerticalFlip flips video vertically for Raspivid input.
Filter int // Defines the method of filtering to be used in between lexing and encoding.
PSITime int // Sets the time between a packet being sent
// RTMP ring buffer parameters.

View File

@ -49,6 +49,7 @@ import (
"bitbucket.org/ausocean/av/device/geovision"
"bitbucket.org/ausocean/av/device/raspivid"
"bitbucket.org/ausocean/av/device/webcam"
"bitbucket.org/ausocean/av/filter"
"bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/ioext"
@ -110,7 +111,10 @@ type Revid struct {
// lexTo, encoder and packer handle transcoding the input stream.
lexTo func(dest io.Writer, src io.Reader, delay time.Duration) error
// encoders will hold the multiWriteCloser that writes to encoders from the lexer.
// filter will hold the filter interface that will write to the chosen filter from the lexer.
filter filter.Filter
// encoders will hold the multiWriteCloser that writes to encoders from the filter.
encoders io.WriteCloser
// running is used to keep track of revid's running state between methods.
@ -324,6 +328,15 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
r.encoders = multiWriter(encoders...)
switch r.cfg.Filter {
case config.FilterNoOp:
r.filter = filter.NewNoOp(r.encoders)
case config.FilterMOG:
r.filter = filter.NewMOGFilter(r.encoders, 25, 20, 500, 3, true)
default:
panic("Undefined Filter")
}
switch r.cfg.Input {
case config.InputRaspivid:
r.input = raspivid.New(r.cfg.Logger)
@ -432,11 +445,16 @@ func (r *Revid) Stop() {
r.cfg.Logger.Log(logger.Error, pkg+"could not stop input", "error", err.Error())
}
r.cfg.Logger.Log(logger.Info, pkg+"closing pipeline")
r.cfg.Logger.Log(logger.Info, pkg+"closing pid peline")
err = r.encoders.Close()
if err != nil {
r.cfg.Logger.Log(logger.Error, pkg+"failed to close pipeline", "error", err.Error())
}
err = r.filter.Close()
if err != nil {
r.cfg.Logger.Log(logger.Error, pkg+"failed to close pipeline", "error", err.Error())
}
r.cfg.Logger.Log(logger.Info, pkg+"closed pipeline")
if r.cmd != nil && r.cmd.Process != nil {
@ -611,6 +629,13 @@ func (r *Revid) Update(vars map[string]string) error {
default:
r.cfg.Logger.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value)
}
case "Filter":
m := map[string]int{"NoOp": config.FilterNoOp, "MOG": config.FilterMOG}
v, ok := m[value]
if !ok {
r.cfg.Logger.Log(logger.Warning, pkg+"invalid FilterMethod param", "value", value)
}
r.cfg.Filter = v
case "PSITime":
v, err := strconv.Atoi(value)
if err != nil || v < 0 {
@ -719,7 +744,7 @@ func (r *Revid) Update(vars map[string]string) error {
// processFrom is run as a routine to read from a input data source, lex and
// then send individual access units to revid's encoders.
func (r *Revid) processFrom(read io.Reader, delay time.Duration) {
r.err <- r.lexTo(r.encoders, read, delay)
r.err <- r.lexTo(r.filter, read, delay)
r.cfg.Logger.Log(logger.Info, pkg+"finished lexing")
r.wg.Done()
}