From 32612865dcf501ee6b80d9c3ae535fd2e7c170d6 Mon Sep 17 00:00:00 2001 From: Ella Pietraroia Date: Wed, 22 Jan 2020 16:32:47 +1030 Subject: [PATCH 1/6] making new file for basic filter --- filter/basic.go | 62 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 filter/basic.go diff --git a/filter/basic.go b/filter/basic.go new file mode 100644 index 00000000..a810b0a0 --- /dev/null +++ b/filter/basic.go @@ -0,0 +1,62 @@ +// +build !circleci + +/* +DESCRIPTION + A filter that detects motion and discards frames without motion. The + filter uses a Mixture of Gaussians method (MoG) to determine what is + background and what is foreground. + +AUTHORS + Scott Barnard + +LICENSE + mog.go is Copyright (C) 2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package filter + +import ( + "io" +) + +// MOGFilter is a filter that provides basic motion detection. MoG is short for +// Mixture of Gaussians method. +type BasicFilter struct { + dst io.WriteCloser +} + +// NewMOGFilter returns a pointer to a new MOGFilter struct. +func NewBasicFilter(dst io.WriteCloser) *BasicFilter { + return &BasicFilter{dst} +} + +// Implements io.Closer. +// Close frees resources used by gocv, because it has to be done manually, due to +// it using c-go. +func (m *BasicFilter) Close() error { + return nil +} + +// Implements io.Writer. +// Write applies the motion filter to the video stream. Only frames with motion +// are written to the destination encoder, frames without are discarded. +func (m *BasicFilter) Write(f []byte) (int, error) { + //decode MJPEG + + //for all frames find difference in this frame from last frames + + return m.dst.Write(f) +} From f3c7b0f6aca1397bed4ed027573584c7680b2400 Mon Sep 17 00:00:00 2001 From: Ella Pietraroia Date: Fri, 24 Jan 2020 17:01:28 +1030 Subject: [PATCH 2/6] basic filter that does not use gocv for motion detection --- filter/basic.go | 136 ++++++++++++++++++++++++++++++++++++++--- filter/mog.go | 9 ++- revid/config/config.go | 3 +- revid/revid.go | 4 +- 4 files changed, 140 insertions(+), 12 deletions(-) diff --git a/filter/basic.go b/filter/basic.go index a810b0a0..6175f823 100644 --- a/filter/basic.go +++ b/filter/basic.go @@ -7,7 +7,7 @@ DESCRIPTION background and what is foreground. AUTHORS - Scott Barnard + Ella Pietraroia LICENSE mog.go is Copyright (C) 2019 the Australian Ocean Lab (AusOcean) @@ -29,18 +29,46 @@ LICENSE package filter import ( + "bytes" + "fmt" + "image" + "image/color" + "image/jpeg" "io" + "os" + "time" + + "golang.org/x/image/font" + "golang.org/x/image/font/basicfont" + "golang.org/x/image/math/fixed" ) // MOGFilter is a filter that provides basic motion detection. MoG is short for // Mixture of Gaussians method. type BasicFilter struct { - dst io.WriteCloser + dst io.WriteCloser + bg [][][3]uint32 + current [][][3]uint32 + w int + h int + file io.WriteCloser + debug bool } // NewMOGFilter returns a pointer to a new MOGFilter struct. -func NewBasicFilter(dst io.WriteCloser) *BasicFilter { - return &BasicFilter{dst} +func NewBasicFilter(dst io.WriteCloser, debug bool) *BasicFilter { + var file io.WriteCloser + var err error + if debug { + file, err = os.Create("motion.mjpeg") + if err != nil { + panic("!!! TEST CODE !!!: file didnt work") + } + } else { + file = nil + } + + return &BasicFilter{dst, nil, nil, 0, 0, file, debug} } // Implements io.Closer. @@ -53,10 +81,104 @@ func (m *BasicFilter) Close() error { // Implements io.Writer. // Write applies the motion filter to the video stream. Only frames with motion // are written to the destination encoder, frames without are discarded. -func (m *BasicFilter) Write(f []byte) (int, error) { +func (b *BasicFilter) Write(f []byte) (int, error) { + //t0 := time.Now() //decode MJPEG + r := bytes.NewReader(f) + img, err := jpeg.Decode(r) + if err != nil { + return 0, fmt.Errorf("image can't be decoded: %w", err) + } + t1 := time.Now() - //for all frames find difference in this frame from last frames + //get background image and save a new background image if needed + //first frame must always be sent + if b.bg == nil { + bounds := img.Bounds() + b.w = bounds.Max.X + b.h = bounds.Max.Y - return m.dst.Write(f) + b.bg = make([][][3]uint32, b.h) + for i, _ := range b.bg { + b.bg[i] = make([][3]uint32, b.w) + } + + b.current = make([][][3]uint32, b.h) + for i, _ := range b.current { + b.current[i] = make([][3]uint32, b.w) + } + + for j := 0; j < b.h; j++ { + for i := 0; i < b.w; i++ { + n := img.At(i, j) + R, G, B, _ := n.RGBA() + b.bg[j][i] = [3]uint32{R, G, B} + } + } + t2 := time.Now() + fmt.Print("BG loop: ", t2.Sub(t1).Milliseconds(), "\n") + return len(f), nil + } + + motion := 0 + //for all pixels get the difference from the background image + + bwImg := image.NewRGBA(image.Rect(0, 0, b.w, b.h)) + for j := 0; j < b.h; j++ { + for i := 0; i < b.w; i++ { + n := img.At(i, j) + R, G, B, _ := n.RGBA() + b.current[j][i] = [3]uint32{R, G, B} + + diffR := R - b.bg[j][i][0] + diffG := G - b.bg[j][i][1] + diffB := B - b.bg[j][i][2] + + // diffR := int(math.Abs(float64(R) - float64(b.bg[j][i][0]))) + // diffG := int(math.Abs(float64(G) - float64(b.bg[j][i][1]))) + // diffB := int(math.Abs(float64(B) - float64(b.bg[j][i][2]))) + diff := diffR + diffG + diffB + + if diff < 45000 && b.debug { + bwImg.SetRGBA(i, j, color.RGBA{0x00, 0x00, 0x00, 0xff}) + } else { + bwImg.SetRGBA(i, j, color.RGBA{0xff, 0xff, 0xff, 0xff}) + motion++ + } + } + } + //t3 := time.Now() + //visualise + if b.debug { + col := color.RGBA{200, 100, 0, 255} + d := &font.Drawer{ + Dst: bwImg, + Src: image.NewUniform(col), + Face: basicfont.Face7x13, + Dot: fixed.P(20, 30), + } + if motion > 1000 { + d.DrawString("Motion") + } + err = jpeg.Encode(b.file, bwImg, nil) + if err != nil { + return len(f), err + } + } + //t4 := time.Now() + // //update backgound image + b.bg = b.current + //get mean of this difference + + //choose a threshold that motion is detected for if greater + + //discard non motion frames + + //write motion frames + // fmt.Print("Encode: ", t1.Sub(t0).Milliseconds(), "\n") + // fmt.Print("BG loop: ", t2.Sub(t1).Milliseconds(), "\n") + // fmt.Print("Calc loop: ", t3.Sub(t2).Milliseconds(), "\n") + // fmt.Print("VLC: ", t4.Sub(t3).Milliseconds(), "\n") + // fmt.Print("Total: ", time.Now().Sub(t0).Milliseconds(), "\n\n") + return b.dst.Write(f) } diff --git a/filter/mog.go b/filter/mog.go index 20fd5807..e5aa2927 100644 --- a/filter/mog.go +++ b/filter/mog.go @@ -33,6 +33,7 @@ import ( "image" "image/color" "io" + "time" "gocv.io/x/gocv" ) @@ -78,6 +79,7 @@ func (m *MOGFilter) Close() error { // Write applies the motion filter to the video stream. Only frames with motion // are written to the destination encoder, frames without are discarded. func (m *MOGFilter) Write(f []byte) (int, error) { + t0 := time.Now() if m.hfCount < (m.hf - 1) { m.hold[m.hfCount] = f m.hfCount++ @@ -134,9 +136,9 @@ func (m *MOGFilter) Write(f []byte) (int, error) { } // Don't write to destination if there is no motion. - if len(contours) == 0 { - return len(f), nil - } + // if len(contours) == 0 { + // return len(f), nil + // } // Write to destination, past 4 frames then current frame. for i, h := range m.hold { @@ -146,5 +148,6 @@ func (m *MOGFilter) Write(f []byte) (int, error) { return len(f), fmt.Errorf("could not write previous frames: %w", err) } } + fmt.Print(time.Now().Sub(t0).Milliseconds(), "\n") return m.dst.Write(f) } diff --git a/revid/config/config.go b/revid/config/config.go index 5f6fa4b3..af2ec6b1 100644 --- a/revid/config/config.go +++ b/revid/config/config.go @@ -126,6 +126,7 @@ const ( FilterMOG FilterVariableFPS FilterKNN + FilterBasic ) // OS names @@ -310,7 +311,7 @@ var TypeData = map[string]string{ "CBR": "bool", "ClipDuration": "uint", "Exposure": "enum:auto,night,nightpreview,backlight,spotlight,sports,snow,beach,verylong,fixedfps,antishake,fireworks", - "Filters": "enums:NoOp,MOG,VariableFPS,KNN", + "Filters": "enums:NoOp,MOG,VariableFPS,KNN,Basic", "FrameRate": "uint", "Height": "uint", "HorizontalFlip": "bool", diff --git a/revid/revid.go b/revid/revid.go index d717d66f..bdbab24e 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -360,6 +360,8 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. r.filters[i] = filter.NewVariableFPSFilter(dst, minFPS, filter.NewMOGFilter(r.encoders, mogMinArea, mogThreshold, mogHistory, r.cfg.ShowWindows, r.cfg.MotionInterval)) case config.FilterKNN: r.filters[i] = filter.NewKNNFilter(dst, knnMinArea, knnThreshold, knnHistory, knnKernel, r.cfg.ShowWindows) + case config.FilterBasic: + r.filters[i] = filter.NewBasicFilter(dst, false) default: panic("Undefined Filter") } @@ -680,7 +682,7 @@ func (r *Revid) Update(vars map[string]string) error { } case "Filters": filters := strings.Split(value, ",") - m := map[string]int{"NoOp": config.FilterNoOp, "MOG": config.FilterMOG, "VariableFPS": config.FilterVariableFPS, "KNN": config.FilterKNN} + m := map[string]int{"NoOp": config.FilterNoOp, "MOG": config.FilterMOG, "VariableFPS": config.FilterVariableFPS, "KNN": config.FilterKNN, "Basic": config.FilterBasic} r.cfg.Filters = make([]int, len(filters)) for i, filter := range filters { v, ok := m[filter] From a66972a9d61a4a1bb238fca44e1c40be7ae9ce70 Mon Sep 17 00:00:00 2001 From: Ella Pietraroia Date: Tue, 28 Jan 2020 16:04:18 +1030 Subject: [PATCH 3/6] optimising --- filter/basic.go | 98 ++++++++++++++++++++++++++++++------------------- filter/mog.go | 9 ++--- 2 files changed, 63 insertions(+), 44 deletions(-) diff --git a/filter/basic.go b/filter/basic.go index 6175f823..9657874b 100644 --- a/filter/basic.go +++ b/filter/basic.go @@ -43,16 +43,25 @@ import ( "golang.org/x/image/math/fixed" ) +func absDiff(a, b uint32) int { + c := int(a) - int(b) + if c < 0 { + return -c + } else { + return c + } + +} + // MOGFilter is a filter that provides basic motion detection. MoG is short for // Mixture of Gaussians method. type BasicFilter struct { - dst io.WriteCloser - bg [][][3]uint32 - current [][][3]uint32 - w int - h int - file io.WriteCloser - debug bool + dst io.WriteCloser + bg [][][3]uint32 + w int + h int + file io.WriteCloser + debug bool } // NewMOGFilter returns a pointer to a new MOGFilter struct. @@ -68,7 +77,7 @@ func NewBasicFilter(dst io.WriteCloser, debug bool) *BasicFilter { file = nil } - return &BasicFilter{dst, nil, nil, 0, 0, file, debug} + return &BasicFilter{dst, nil, 0, 0, file, debug} } // Implements io.Closer. @@ -82,7 +91,7 @@ func (m *BasicFilter) Close() error { // Write applies the motion filter to the video stream. Only frames with motion // are written to the destination encoder, frames without are discarded. func (b *BasicFilter) Write(f []byte) (int, error) { - //t0 := time.Now() + t0 := time.Now() //decode MJPEG r := bytes.NewReader(f) img, err := jpeg.Decode(r) @@ -103,20 +112,13 @@ func (b *BasicFilter) Write(f []byte) (int, error) { b.bg[i] = make([][3]uint32, b.w) } - b.current = make([][][3]uint32, b.h) - for i, _ := range b.current { - b.current[i] = make([][3]uint32, b.w) - } - for j := 0; j < b.h; j++ { for i := 0; i < b.w; i++ { n := img.At(i, j) R, G, B, _ := n.RGBA() - b.bg[j][i] = [3]uint32{R, G, B} + copy(b.bg[j][i][:], []uint32{R, G, B}) } } - t2 := time.Now() - fmt.Print("BG loop: ", t2.Sub(t1).Milliseconds(), "\n") return len(f), nil } @@ -126,28 +128,49 @@ func (b *BasicFilter) Write(f []byte) (int, error) { bwImg := image.NewRGBA(image.Rect(0, 0, b.w, b.h)) for j := 0; j < b.h; j++ { for i := 0; i < b.w; i++ { + // ti0 := time.Now() n := img.At(i, j) + // ti1 := time.Now() R, G, B, _ := n.RGBA() - b.current[j][i] = [3]uint32{R, G, B} - - diffR := R - b.bg[j][i][0] - diffG := G - b.bg[j][i][1] - diffB := B - b.bg[j][i][2] - + // ti2 := time.Now() // diffR := int(math.Abs(float64(R) - float64(b.bg[j][i][0]))) // diffG := int(math.Abs(float64(G) - float64(b.bg[j][i][1]))) // diffB := int(math.Abs(float64(B) - float64(b.bg[j][i][2]))) + + diffB := absDiff(B, b.bg[j][i][2]) + diffR := absDiff(R, b.bg[j][i][0]) + diffG := absDiff(G, b.bg[j][i][1]) + + // ti3 := time.Now() + diff := diffR + diffG + diffB - if diff < 45000 && b.debug { - bwImg.SetRGBA(i, j, color.RGBA{0x00, 0x00, 0x00, 0xff}) - } else { - bwImg.SetRGBA(i, j, color.RGBA{0xff, 0xff, 0xff, 0xff}) + if diff > 45000 { motion++ } + + if b.debug { + if diff > 45000 { + bwImg.SetRGBA(i, j, color.RGBA{0xff, 0xff, 0xff, 0xff}) + } else { + bwImg.SetRGBA(i, j, color.RGBA{0x00, 0x00, 0x00, 0xff}) + } + } + // ti4 := time.Now() + // Update backgound image. + copy(b.bg[j][i][:], []uint32{R, G, B}) + // ti5 := time.Now() + + // fmt.Print("At: ", ti1.Sub(ti0).Nanoseconds(), "\n") + // fmt.Print("RGBA: ", ti2.Sub(ti1).Nanoseconds(), "\n") + // fmt.Print("3x absDiff: ", ti3.Sub(ti2).Nanoseconds(), "\n") + // fmt.Print("total diff + threshold + debug: ", ti4.Sub(ti3).Nanoseconds(), "\n") + // fmt.Print("update bg: ", ti5.Sub(ti4).Nanoseconds(), "\n") + // fmt.Print("Total: ", time.Now().Sub(ti0).Nanoseconds(), "\n\n") } } - //t3 := time.Now() + t2 := time.Now() + //visualise if b.debug { col := color.RGBA{200, 100, 0, 255} @@ -165,20 +188,19 @@ func (b *BasicFilter) Write(f []byte) (int, error) { return len(f), err } } - //t4 := time.Now() - // //update backgound image - b.bg = b.current - //get mean of this difference + t3 := time.Now() + + fmt.Print("Encode: ", t1.Sub(t0).Milliseconds(), "\n") + fmt.Print("Calc loop: ", t2.Sub(t1).Milliseconds(), "\n") + fmt.Print("VLC: ", t3.Sub(t2).Milliseconds(), "\n") + fmt.Print("Total: ", time.Now().Sub(t0).Milliseconds(), "\n\n") //choose a threshold that motion is detected for if greater - + if motion < 1000 { + return len(f), nil + } //discard non motion frames //write motion frames - // fmt.Print("Encode: ", t1.Sub(t0).Milliseconds(), "\n") - // fmt.Print("BG loop: ", t2.Sub(t1).Milliseconds(), "\n") - // fmt.Print("Calc loop: ", t3.Sub(t2).Milliseconds(), "\n") - // fmt.Print("VLC: ", t4.Sub(t3).Milliseconds(), "\n") - // fmt.Print("Total: ", time.Now().Sub(t0).Milliseconds(), "\n\n") return b.dst.Write(f) } diff --git a/filter/mog.go b/filter/mog.go index e5aa2927..20fd5807 100644 --- a/filter/mog.go +++ b/filter/mog.go @@ -33,7 +33,6 @@ import ( "image" "image/color" "io" - "time" "gocv.io/x/gocv" ) @@ -79,7 +78,6 @@ func (m *MOGFilter) Close() error { // Write applies the motion filter to the video stream. Only frames with motion // are written to the destination encoder, frames without are discarded. func (m *MOGFilter) Write(f []byte) (int, error) { - t0 := time.Now() if m.hfCount < (m.hf - 1) { m.hold[m.hfCount] = f m.hfCount++ @@ -136,9 +134,9 @@ func (m *MOGFilter) Write(f []byte) (int, error) { } // Don't write to destination if there is no motion. - // if len(contours) == 0 { - // return len(f), nil - // } + if len(contours) == 0 { + return len(f), nil + } // Write to destination, past 4 frames then current frame. for i, h := range m.hold { @@ -148,6 +146,5 @@ func (m *MOGFilter) Write(f []byte) (int, error) { return len(f), fmt.Errorf("could not write previous frames: %w", err) } } - fmt.Print(time.Now().Sub(t0).Milliseconds(), "\n") return m.dst.Write(f) } From e82f4131730fd48343d83167c81dc92b0e503b4a Mon Sep 17 00:00:00 2001 From: Ella Pietraroia Date: Thu, 30 Jan 2020 15:31:01 +1030 Subject: [PATCH 4/6] added 4x goroutines with mutex lock and waitgroups --- codec/mjpeg/lex.go | 1 + filter/basic.go | 154 +++++++++++++++++++++++++++------------------ revid/revid.go | 3 +- 3 files changed, 96 insertions(+), 62 deletions(-) diff --git a/codec/mjpeg/lex.go b/codec/mjpeg/lex.go index 46ab4ce8..f1166022 100644 --- a/codec/mjpeg/lex.go +++ b/codec/mjpeg/lex.go @@ -86,6 +86,7 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { } last = b } + fmt.Print("lex: just before write") <-tick _, err = dst.Write(buf) if err != nil { diff --git a/filter/basic.go b/filter/basic.go index 9657874b..47e64e1e 100644 --- a/filter/basic.go +++ b/filter/basic.go @@ -36,6 +36,8 @@ import ( "image/jpeg" "io" "os" + "strconv" + "sync" "time" "golang.org/x/image/font" @@ -56,57 +58,108 @@ func absDiff(a, b uint32) int { // MOGFilter is a filter that provides basic motion detection. MoG is short for // Mixture of Gaussians method. type BasicFilter struct { - dst io.WriteCloser - bg [][][3]uint32 - w int - h int - file io.WriteCloser - debug bool + dst io.WriteCloser + img image.Image + bg [][][3]uint32 + bwImg *image.RGBA + w int + h int + file io.WriteCloser + motion int + debug bool } // NewMOGFilter returns a pointer to a new MOGFilter struct. func NewBasicFilter(dst io.WriteCloser, debug bool) *BasicFilter { + bwImg := image.NewRGBA(image.Rect(0, 0, 0, 0)) var file io.WriteCloser var err error if debug { file, err = os.Create("motion.mjpeg") if err != nil { - panic("!!! TEST CODE !!!: file didnt work") + panic("debug file didn't create") } } else { file = nil } - - return &BasicFilter{dst, nil, 0, 0, file, debug} + return &BasicFilter{dst, nil, nil, bwImg, 0, 0, file, 0, debug} } // Implements io.Closer. // Close frees resources used by gocv, because it has to be done manually, due to // it using c-go. -func (m *BasicFilter) Close() error { +func (b *BasicFilter) Close() error { return nil } +// Go routine for one row of the image to be processed +func (b *BasicFilter) Process(j int, wg *sync.WaitGroup) { + defer wg.Done() + for i := 0; i < b.w; i++ { + // ti0 := time.Now() + n := b.img.At(i, j) + // ti1 := time.Now() + R, G, B, _ := n.RGBA() + // ti2 := time.Now() + + diffB := absDiff(B, b.bg[j][i][2]) + diffR := absDiff(R, b.bg[j][i][0]) + diffG := absDiff(G, b.bg[j][i][1]) + + // ti3 := time.Now() + + diff := diffR + diffG + diffB + + if diff > 45000 { + b.motion++ + } + + if b.debug { + if diff > 45000 { + b.bwImg.SetRGBA(i, j, color.RGBA{0xff, 0xff, 0xff, 0xff}) + } else { + b.bwImg.SetRGBA(i, j, color.RGBA{0x00, 0x00, 0x00, 0xff}) + } + } + // ti4 := time.Now() + // Update backgound image. + copy(b.bg[j][i][:], []uint32{R, G, B}) + // ti5 := time.Now() + + // fmt.Print("At: ", ti1.Sub(ti0).Nanoseconds(), "\n") + // fmt.Print("RGBA: ", ti2.Sub(ti1).Nanoseconds(), "\n") + // fmt.Print("3x absDiff: ", ti3.Sub(ti2).Nanoseconds(), "\n") + // fmt.Print("total diff + threshold + debug: ", ti4.Sub(ti3).Nanoseconds(), "\n") + // fmt.Print("update bg: ", ti5.Sub(ti4).Nanoseconds(), "\n") + // fmt.Print("Total: ", time.Now().Sub(ti0).Nanoseconds(), "\n\n") + } +} + // Implements io.Writer. // Write applies the motion filter to the video stream. Only frames with motion // are written to the destination encoder, frames without are discarded. func (b *BasicFilter) Write(f []byte) (int, error) { + fmt.Print("in basic write\n") t0 := time.Now() //decode MJPEG r := bytes.NewReader(f) - img, err := jpeg.Decode(r) + var err error + b.img, err = jpeg.Decode(r) if err != nil { return 0, fmt.Errorf("image can't be decoded: %w", err) } + t1 := time.Now() //get background image and save a new background image if needed //first frame must always be sent if b.bg == nil { - bounds := img.Bounds() + bounds := b.img.Bounds() b.w = bounds.Max.X b.h = bounds.Max.Y + b.bwImg = image.NewRGBA(image.Rect(0, 0, b.w, b.h)) + b.bg = make([][][3]uint32, b.h) for i, _ := range b.bg { b.bg[i] = make([][3]uint32, b.w) @@ -114,7 +167,7 @@ func (b *BasicFilter) Write(f []byte) (int, error) { for j := 0; j < b.h; j++ { for i := 0; i < b.w; i++ { - n := img.At(i, j) + n := b.img.At(i, j) R, G, B, _ := n.RGBA() copy(b.bg[j][i][:], []uint32{R, G, B}) } @@ -122,68 +175,49 @@ func (b *BasicFilter) Write(f []byte) (int, error) { return len(f), nil } - motion := 0 //for all pixels get the difference from the background image - bwImg := image.NewRGBA(image.Rect(0, 0, b.w, b.h)) - for j := 0; j < b.h; j++ { - for i := 0; i < b.w; i++ { - // ti0 := time.Now() - n := img.At(i, j) - // ti1 := time.Now() - R, G, B, _ := n.RGBA() - // ti2 := time.Now() - // diffR := int(math.Abs(float64(R) - float64(b.bg[j][i][0]))) - // diffG := int(math.Abs(float64(G) - float64(b.bg[j][i][1]))) - // diffB := int(math.Abs(float64(B) - float64(b.bg[j][i][2]))) + // c := make(chan int) + // fmt.Print("channel made \n") + var m sync.Mutex - diffB := absDiff(B, b.bg[j][i][2]) - diffR := absDiff(R, b.bg[j][i][0]) - diffG := absDiff(G, b.bg[j][i][1]) + j := 0 + m.Lock() + var wg sync.WaitGroup - // ti3 := time.Now() - - diff := diffR + diffG + diffB - - if diff > 45000 { - motion++ - } - - if b.debug { - if diff > 45000 { - bwImg.SetRGBA(i, j, color.RGBA{0xff, 0xff, 0xff, 0xff}) - } else { - bwImg.SetRGBA(i, j, color.RGBA{0x00, 0x00, 0x00, 0xff}) - } - } - // ti4 := time.Now() - // Update backgound image. - copy(b.bg[j][i][:], []uint32{R, G, B}) - // ti5 := time.Now() - - // fmt.Print("At: ", ti1.Sub(ti0).Nanoseconds(), "\n") - // fmt.Print("RGBA: ", ti2.Sub(ti1).Nanoseconds(), "\n") - // fmt.Print("3x absDiff: ", ti3.Sub(ti2).Nanoseconds(), "\n") - // fmt.Print("total diff + threshold + debug: ", ti4.Sub(ti3).Nanoseconds(), "\n") - // fmt.Print("update bg: ", ti5.Sub(ti4).Nanoseconds(), "\n") - // fmt.Print("Total: ", time.Now().Sub(ti0).Nanoseconds(), "\n\n") - } + for j < b.h { + wg.Add(4) + go b.Process(j, &wg) + go b.Process(j+1, &wg) + go b.Process(j+2, &wg) + go b.Process(j+3, &wg) + j = j + 4 + wg.Wait() } + + if j >= b.h { + m.Unlock() + } + t2 := time.Now() //visualise if b.debug { col := color.RGBA{200, 100, 0, 255} d := &font.Drawer{ - Dst: bwImg, + Dst: b.bwImg, Src: image.NewUniform(col), Face: basicfont.Face7x13, Dot: fixed.P(20, 30), } - if motion > 1000 { - d.DrawString("Motion") + var s string + if b.motion > 1000 { + s = strconv.Itoa(b.motion) + " Motion" + } else { + s = strconv.Itoa(b.motion) } - err = jpeg.Encode(b.file, bwImg, nil) + d.DrawString(s) + err = jpeg.Encode(b.file, b.bwImg, nil) if err != nil { return len(f), err } @@ -196,7 +230,7 @@ func (b *BasicFilter) Write(f []byte) (int, error) { fmt.Print("Total: ", time.Now().Sub(t0).Milliseconds(), "\n\n") //choose a threshold that motion is detected for if greater - if motion < 1000 { + if b.motion < 1000 { return len(f), nil } //discard non motion frames diff --git a/revid/revid.go b/revid/revid.go index bdbab24e..8e1a4b35 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -361,7 +361,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. case config.FilterKNN: r.filters[i] = filter.NewKNNFilter(dst, knnMinArea, knnThreshold, knnHistory, knnKernel, r.cfg.ShowWindows) case config.FilterBasic: - r.filters[i] = filter.NewBasicFilter(dst, false) + r.filters[i] = filter.NewBasicFilter(dst, r.cfg.ShowWindows) default: panic("Undefined Filter") } @@ -452,7 +452,6 @@ func (r *Revid) Start() error { if err != nil { return fmt.Errorf("could not start input device: %w", err) } - r.wg.Add(1) go r.processFrom(r.input, 0) From 513eee1260011ed27e3082de19fee9b81371515a Mon Sep 17 00:00:00 2001 From: Ella Pietraroia Date: Fri, 31 Jan 2020 10:26:25 +1030 Subject: [PATCH 5/6] finding errors --- cmd/revid-cli/main.go | 2 +- cmd/rv/main.go | 274 ++++++++++++++++++++++++++++++++++++++++++ codec/mjpeg/lex.go | 1 - filter/basic.go | 1 - 4 files changed, 275 insertions(+), 3 deletions(-) create mode 100644 cmd/rv/main.go diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index bced0a4f..1b973d3a 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -331,7 +331,7 @@ func run(cfg config.Config) { case normal: err = rv.Start() if err != nil { - log.Log(logger.Warning, pkg+"could not start revid", "error", err.Error()) + log.Log(logger.Error, pkg+"could not start revid", "error", err.Error()) ns.SetMode(paused, &vs) goto sleep } diff --git a/cmd/rv/main.go b/cmd/rv/main.go new file mode 100644 index 00000000..fb8dd4b1 --- /dev/null +++ b/cmd/rv/main.go @@ -0,0 +1,274 @@ +/* +DESCRIPTION + rv is a netsender client using the revid package to perform media collection + and forwarding whose behaviour is controllable via the cloud interfaces + netreceiver and vidgrind. + +AUTHORS + Saxon A. Nelson-Milton + Alan Noble + Dan Kortschak + Jack Richardson + Trek Hopton + Scott Barnard + +LICENSE + Copyright (C) 2020 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. + +USAGE + There must firstly be a netsender configuration file under /etc/netsender.conf. + Example: + + ma 00:00:00:00:00:01 + dk 0 + wi + ip V0, T0 + op + mp 60 + ap 0 + tg + hw + sh vidgrind.appspot.com + + Revid configuration is controlled by valid variables given values on netreceiver + or vidgrind interface. See revid/config for valid variables. + + To run rv simply build and call: + ./rv +*/ + +// Package rv is a netsender client for revid. +package main + +import ( + "fmt" + "io" + "os" + "runtime/pprof" + "strconv" + "time" + + "gopkg.in/natefinch/lumberjack.v2" + + "bitbucket.org/ausocean/av/container/mts" + "bitbucket.org/ausocean/av/container/mts/meta" + "bitbucket.org/ausocean/av/revid" + "bitbucket.org/ausocean/av/revid/config" + "bitbucket.org/ausocean/iot/pi/netlogger" + "bitbucket.org/ausocean/iot/pi/netsender" + "bitbucket.org/ausocean/iot/pi/sds" + "bitbucket.org/ausocean/utils/logger" +) + +// Copyright information prefixed to all metadata. +const ( + metaPreambleKey = "copyright" + metaPreambleData = "ausocean.org/license/content2019" +) + +// Logging configuration. +const ( + logPath = "/var/log/netsender/netsender.log" + logMaxSize = 500 // MB + logMaxBackup = 10 + logMaxAge = 28 // days + logVerbosity = logger.Info + logSuppress = true +) + +// Revid modes. +const ( + modeNormal = "Normal" + modePaused = "Paused" + modeBurst = "Burst" + modeLoop = "Loop" +) + +// Misc constants. +const ( + netSendRetryTime = 5 * time.Second + defaultSleepTime = 60 // Seconds + profilePath = "rv.prof" + pkg = "rv: " +) + +// This is set to true if the 'profile' build tag is provided on build. +var canProfile = false + +func main() { + mts.Meta = meta.NewWith([][2]string{{metaPreambleKey, metaPreambleData}}) + + // Create lumberjack logger to handle logging to file. + fileLog := &lumberjack.Logger{ + Filename: logPath, + MaxSize: logMaxSize, + MaxBackups: logMaxBackup, + MaxAge: logMaxAge, + } + + // Create netlogger to handle logging to cloud. + netLog := netlogger.New() + + // Create logger that we call methods on to log, which in turn writes to the + // lumberjack and netloggers. + log := logger.New(logVerbosity, io.MultiWriter(fileLog, netLog), logSuppress) + + // If rv has been built with the profile tag, then we'll start a CPU profile. + if canProfile { + profile(log) + defer pprof.StopCPUProfile() + } + + var rv *revid.Revid + + ns, err := netsender.New(log, nil, readPin(rv), nil, config.TypeData) + if err != nil { + log.Log(logger.Fatal, pkg+"could not initialise netsender client: "+err.Error()) + } + + rv, err = revid.New(config.Config{Logger: log}, ns) + if err != nil { + log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error()) + } + + run(rv, ns, log, netLog) +} + +// run starts the main loop. This will run netsender on every pass of the loop +// (sleeping inbetween), check vars, and if changed, update revid as appropriate. +func run(rv *revid.Revid, ns *netsender.Sender, l *logger.Logger, nl *netlogger.Logger) { + var vs int + for { + err := ns.Run() + if err != nil { + l.Log(logger.Warning, pkg+"Run Failed. Retrying...", "error", err.Error()) + time.Sleep(netSendRetryTime) + continue + } + + err = nl.Send(ns) + if err != nil { + l.Log(logger.Warning, pkg+"Logs could not be sent", "error", err.Error()) + } + + // If var sum hasn't changed we skip rest of loop. + newVs := ns.VarSum() + if vs == newVs { + sleep(ns, l) + continue + } + vs = newVs + + vars, err := ns.Vars() + if err != nil { + l.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error()) + time.Sleep(netSendRetryTime) + continue + } + + // Configure revid based on the vars. + err = rv.Update(vars) + if err != nil { + l.Log(logger.Warning, pkg+"Couldn't update revid", "error", err.Error()) + sleep(ns, l) + continue + } + + switch ns.Mode() { + case modePaused: + rv.Stop() + case modeNormal, modeLoop: + err = rv.Start() + if err != nil { + l.Log(logger.Error, pkg+"could not start revid", "error", err.Error()) + ns.SetMode(modePaused, &vs) + sleep(ns, l) + continue + } + case modeBurst: + err = burst(l, rv, ns) + if err != nil { + l.Log(logger.Warning, pkg+"could not start burst", "error", err.Error()) + ns.SetMode(modePaused, &vs) + sleep(ns, l) + continue + } + ns.SetMode(modePaused, &vs) + } + + sleep(ns, l) + } +} + +// profile opens a file to hold CPU profiling metrics and then starts the +// CPU profiler. +func profile(l *logger.Logger) { + f, err := os.Create(profilePath) + if err != nil { + l.Log(logger.Fatal, pkg+"could not create CPU profile", "error", err.Error()) + } + if err := pprof.StartCPUProfile(f); err != nil { + l.Log(logger.Fatal, pkg+"could not start CPU profile", "error", err.Error()) + } +} + +// sleep uses a delay to halt the program based on the monitoring period +// netsender parameter (mp) defined in the netsender.conf config. +func sleep(ns *netsender.Sender, l *logger.Logger) { + t, err := strconv.Atoi(ns.Param("mp")) + if err != nil { + l.Log(logger.Error, pkg+"could not get sleep time, using default", "error", err) + t = defaultSleepTime + } + time.Sleep(time.Duration(t) * time.Second) +} + +// readPin provides a callback function of consistent signature for use by +// netsender to retrieve software defined pin values e.g. revid bitrate (X23). +func readPin(rv *revid.Revid) func(pin *netsender.Pin) error { + return func(pin *netsender.Pin) error { + switch { + case pin.Name == "X23": + pin.Value = -1 + if rv != nil { + pin.Value = rv.Bitrate() + } + case pin.Name[0] == 'X': + return sds.ReadSystem(pin) + default: + pin.Value = -1 + } + return nil + } +} + +// burst starts revid, waits for time specified in the Config.BurstPeriod +// field, and then stops revid. +// +// TODO: move this functionality to the revid API into a Revid.Burst(time) method. +func burst(l *logger.Logger, r *revid.Revid, s *netsender.Sender) error { + l.Log(logger.Info, pkg+"starting burst") + + err := r.Start() + if err != nil { + return fmt.Errorf("could not start revid: %w", err) + } + + time.Sleep(time.Duration(r.Config().BurstPeriod) * time.Second) + l.Log(logger.Info, pkg+"stopping burst") + r.Stop() + return nil +} diff --git a/codec/mjpeg/lex.go b/codec/mjpeg/lex.go index f1166022..46ab4ce8 100644 --- a/codec/mjpeg/lex.go +++ b/codec/mjpeg/lex.go @@ -86,7 +86,6 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error { } last = b } - fmt.Print("lex: just before write") <-tick _, err = dst.Write(buf) if err != nil { diff --git a/filter/basic.go b/filter/basic.go index 47e64e1e..8f984f1b 100644 --- a/filter/basic.go +++ b/filter/basic.go @@ -139,7 +139,6 @@ func (b *BasicFilter) Process(j int, wg *sync.WaitGroup) { // Write applies the motion filter to the video stream. Only frames with motion // are written to the destination encoder, frames without are discarded. func (b *BasicFilter) Write(f []byte) (int, error) { - fmt.Print("in basic write\n") t0 := time.Now() //decode MJPEG r := bytes.NewReader(f) From 6470c98a085b99418fac8ec718ca4f90e3bc8bc4 Mon Sep 17 00:00:00 2001 From: Ella Pietraroia Date: Fri, 31 Jan 2020 10:53:25 +1030 Subject: [PATCH 6/6] tidying up and adding any other comments --- filter/basic.go | 45 ++++++++++----------------------------------- 1 file changed, 10 insertions(+), 35 deletions(-) diff --git a/filter/basic.go b/filter/basic.go index 8f984f1b..19fac225 100644 --- a/filter/basic.go +++ b/filter/basic.go @@ -96,18 +96,14 @@ func (b *BasicFilter) Close() error { func (b *BasicFilter) Process(j int, wg *sync.WaitGroup) { defer wg.Done() for i := 0; i < b.w; i++ { - // ti0 := time.Now() n := b.img.At(i, j) - // ti1 := time.Now() R, G, B, _ := n.RGBA() - // ti2 := time.Now() + // Compare the difference of the RGB values of each pixel to the background image. diffB := absDiff(B, b.bg[j][i][2]) diffR := absDiff(R, b.bg[j][i][0]) diffG := absDiff(G, b.bg[j][i][1]) - // ti3 := time.Now() - diff := diffR + diffG + diffB if diff > 45000 { @@ -121,17 +117,9 @@ func (b *BasicFilter) Process(j int, wg *sync.WaitGroup) { b.bwImg.SetRGBA(i, j, color.RGBA{0x00, 0x00, 0x00, 0xff}) } } - // ti4 := time.Now() + // Update backgound image. copy(b.bg[j][i][:], []uint32{R, G, B}) - // ti5 := time.Now() - - // fmt.Print("At: ", ti1.Sub(ti0).Nanoseconds(), "\n") - // fmt.Print("RGBA: ", ti2.Sub(ti1).Nanoseconds(), "\n") - // fmt.Print("3x absDiff: ", ti3.Sub(ti2).Nanoseconds(), "\n") - // fmt.Print("total diff + threshold + debug: ", ti4.Sub(ti3).Nanoseconds(), "\n") - // fmt.Print("update bg: ", ti5.Sub(ti4).Nanoseconds(), "\n") - // fmt.Print("Total: ", time.Now().Sub(ti0).Nanoseconds(), "\n\n") } } @@ -140,7 +128,7 @@ func (b *BasicFilter) Process(j int, wg *sync.WaitGroup) { // are written to the destination encoder, frames without are discarded. func (b *BasicFilter) Write(f []byte) (int, error) { t0 := time.Now() - //decode MJPEG + // Decode MJPEG. r := bytes.NewReader(f) var err error b.img, err = jpeg.Decode(r) @@ -150,8 +138,8 @@ func (b *BasicFilter) Write(f []byte) (int, error) { t1 := time.Now() - //get background image and save a new background image if needed - //first frame must always be sent + // Get background image and save a new background image if needed + // first frame must always be sent. if b.bg == nil { bounds := b.img.Bounds() b.w = bounds.Max.X @@ -174,13 +162,9 @@ func (b *BasicFilter) Write(f []byte) (int, error) { return len(f), nil } - //for all pixels get the difference from the background image - - // c := make(chan int) - // fmt.Print("channel made \n") - var m sync.Mutex - + // Use 4x goroutines to each process one row of pixels j := 0 + var m sync.Mutex m.Lock() var wg sync.WaitGroup @@ -198,9 +182,7 @@ func (b *BasicFilter) Write(f []byte) (int, error) { m.Unlock() } - t2 := time.Now() - - //visualise + // Will save a video of where motion is detected in motion.mjpeg (in the current folder). if b.debug { col := color.RGBA{200, 100, 0, 255} d := &font.Drawer{ @@ -221,19 +203,12 @@ func (b *BasicFilter) Write(f []byte) (int, error) { return len(f), err } } - t3 := time.Now() - fmt.Print("Encode: ", t1.Sub(t0).Milliseconds(), "\n") - fmt.Print("Calc loop: ", t2.Sub(t1).Milliseconds(), "\n") - fmt.Print("VLC: ", t3.Sub(t2).Milliseconds(), "\n") - fmt.Print("Total: ", time.Now().Sub(t0).Milliseconds(), "\n\n") - - //choose a threshold that motion is detected for if greater + // If there are not enough motion pixels then discard the frame. if b.motion < 1000 { return len(f), nil } - //discard non motion frames - //write motion frames + // Write all motion frames. return b.dst.Write(f) }