added 4x goroutines with mutex lock and waitgroups

This commit is contained in:
Ella Pietraroia 2020-01-30 15:31:01 +10:30
parent a66972a9d6
commit e82f413173
3 changed files with 96 additions and 62 deletions

View File

@ -86,6 +86,7 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error {
} }
last = b last = b
} }
fmt.Print("lex: just before write")
<-tick <-tick
_, err = dst.Write(buf) _, err = dst.Write(buf)
if err != nil { if err != nil {

View File

@ -36,6 +36,8 @@ import (
"image/jpeg" "image/jpeg"
"io" "io"
"os" "os"
"strconv"
"sync"
"time" "time"
"golang.org/x/image/font" "golang.org/x/image/font"
@ -56,57 +58,108 @@ func absDiff(a, b uint32) int {
// MOGFilter is a filter that provides basic motion detection. MoG is short for // MOGFilter is a filter that provides basic motion detection. MoG is short for
// Mixture of Gaussians method. // Mixture of Gaussians method.
type BasicFilter struct { type BasicFilter struct {
dst io.WriteCloser dst io.WriteCloser
bg [][][3]uint32 img image.Image
w int bg [][][3]uint32
h int bwImg *image.RGBA
file io.WriteCloser w int
debug bool h int
file io.WriteCloser
motion int
debug bool
} }
// NewMOGFilter returns a pointer to a new MOGFilter struct. // NewMOGFilter returns a pointer to a new MOGFilter struct.
func NewBasicFilter(dst io.WriteCloser, debug bool) *BasicFilter { func NewBasicFilter(dst io.WriteCloser, debug bool) *BasicFilter {
bwImg := image.NewRGBA(image.Rect(0, 0, 0, 0))
var file io.WriteCloser var file io.WriteCloser
var err error var err error
if debug { if debug {
file, err = os.Create("motion.mjpeg") file, err = os.Create("motion.mjpeg")
if err != nil { if err != nil {
panic("!!! TEST CODE !!!: file didnt work") panic("debug file didn't create")
} }
} else { } else {
file = nil file = nil
} }
return &BasicFilter{dst, nil, nil, bwImg, 0, 0, file, 0, debug}
return &BasicFilter{dst, nil, 0, 0, file, debug}
} }
// Implements io.Closer. // Implements io.Closer.
// Close frees resources used by gocv, because it has to be done manually, due to // Close frees resources used by gocv, because it has to be done manually, due to
// it using c-go. // it using c-go.
func (m *BasicFilter) Close() error { func (b *BasicFilter) Close() error {
return nil return nil
} }
// Go routine for one row of the image to be processed
func (b *BasicFilter) Process(j int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < b.w; i++ {
// ti0 := time.Now()
n := b.img.At(i, j)
// ti1 := time.Now()
R, G, B, _ := n.RGBA()
// ti2 := time.Now()
diffB := absDiff(B, b.bg[j][i][2])
diffR := absDiff(R, b.bg[j][i][0])
diffG := absDiff(G, b.bg[j][i][1])
// ti3 := time.Now()
diff := diffR + diffG + diffB
if diff > 45000 {
b.motion++
}
if b.debug {
if diff > 45000 {
b.bwImg.SetRGBA(i, j, color.RGBA{0xff, 0xff, 0xff, 0xff})
} else {
b.bwImg.SetRGBA(i, j, color.RGBA{0x00, 0x00, 0x00, 0xff})
}
}
// ti4 := time.Now()
// Update backgound image.
copy(b.bg[j][i][:], []uint32{R, G, B})
// ti5 := time.Now()
// fmt.Print("At: ", ti1.Sub(ti0).Nanoseconds(), "\n")
// fmt.Print("RGBA: ", ti2.Sub(ti1).Nanoseconds(), "\n")
// fmt.Print("3x absDiff: ", ti3.Sub(ti2).Nanoseconds(), "\n")
// fmt.Print("total diff + threshold + debug: ", ti4.Sub(ti3).Nanoseconds(), "\n")
// fmt.Print("update bg: ", ti5.Sub(ti4).Nanoseconds(), "\n")
// fmt.Print("Total: ", time.Now().Sub(ti0).Nanoseconds(), "\n\n")
}
}
// Implements io.Writer. // Implements io.Writer.
// Write applies the motion filter to the video stream. Only frames with motion // Write applies the motion filter to the video stream. Only frames with motion
// are written to the destination encoder, frames without are discarded. // are written to the destination encoder, frames without are discarded.
func (b *BasicFilter) Write(f []byte) (int, error) { func (b *BasicFilter) Write(f []byte) (int, error) {
fmt.Print("in basic write\n")
t0 := time.Now() t0 := time.Now()
//decode MJPEG //decode MJPEG
r := bytes.NewReader(f) r := bytes.NewReader(f)
img, err := jpeg.Decode(r) var err error
b.img, err = jpeg.Decode(r)
if err != nil { if err != nil {
return 0, fmt.Errorf("image can't be decoded: %w", err) return 0, fmt.Errorf("image can't be decoded: %w", err)
} }
t1 := time.Now() t1 := time.Now()
//get background image and save a new background image if needed //get background image and save a new background image if needed
//first frame must always be sent //first frame must always be sent
if b.bg == nil { if b.bg == nil {
bounds := img.Bounds() bounds := b.img.Bounds()
b.w = bounds.Max.X b.w = bounds.Max.X
b.h = bounds.Max.Y b.h = bounds.Max.Y
b.bwImg = image.NewRGBA(image.Rect(0, 0, b.w, b.h))
b.bg = make([][][3]uint32, b.h) b.bg = make([][][3]uint32, b.h)
for i, _ := range b.bg { for i, _ := range b.bg {
b.bg[i] = make([][3]uint32, b.w) b.bg[i] = make([][3]uint32, b.w)
@ -114,7 +167,7 @@ func (b *BasicFilter) Write(f []byte) (int, error) {
for j := 0; j < b.h; j++ { for j := 0; j < b.h; j++ {
for i := 0; i < b.w; i++ { for i := 0; i < b.w; i++ {
n := img.At(i, j) n := b.img.At(i, j)
R, G, B, _ := n.RGBA() R, G, B, _ := n.RGBA()
copy(b.bg[j][i][:], []uint32{R, G, B}) copy(b.bg[j][i][:], []uint32{R, G, B})
} }
@ -122,68 +175,49 @@ func (b *BasicFilter) Write(f []byte) (int, error) {
return len(f), nil return len(f), nil
} }
motion := 0
//for all pixels get the difference from the background image //for all pixels get the difference from the background image
bwImg := image.NewRGBA(image.Rect(0, 0, b.w, b.h)) // c := make(chan int)
for j := 0; j < b.h; j++ { // fmt.Print("channel made \n")
for i := 0; i < b.w; i++ { var m sync.Mutex
// ti0 := time.Now()
n := img.At(i, j)
// ti1 := time.Now()
R, G, B, _ := n.RGBA()
// ti2 := time.Now()
// diffR := int(math.Abs(float64(R) - float64(b.bg[j][i][0])))
// diffG := int(math.Abs(float64(G) - float64(b.bg[j][i][1])))
// diffB := int(math.Abs(float64(B) - float64(b.bg[j][i][2])))
diffB := absDiff(B, b.bg[j][i][2]) j := 0
diffR := absDiff(R, b.bg[j][i][0]) m.Lock()
diffG := absDiff(G, b.bg[j][i][1]) var wg sync.WaitGroup
// ti3 := time.Now() for j < b.h {
wg.Add(4)
diff := diffR + diffG + diffB go b.Process(j, &wg)
go b.Process(j+1, &wg)
if diff > 45000 { go b.Process(j+2, &wg)
motion++ go b.Process(j+3, &wg)
} j = j + 4
wg.Wait()
if b.debug {
if diff > 45000 {
bwImg.SetRGBA(i, j, color.RGBA{0xff, 0xff, 0xff, 0xff})
} else {
bwImg.SetRGBA(i, j, color.RGBA{0x00, 0x00, 0x00, 0xff})
}
}
// ti4 := time.Now()
// Update backgound image.
copy(b.bg[j][i][:], []uint32{R, G, B})
// ti5 := time.Now()
// fmt.Print("At: ", ti1.Sub(ti0).Nanoseconds(), "\n")
// fmt.Print("RGBA: ", ti2.Sub(ti1).Nanoseconds(), "\n")
// fmt.Print("3x absDiff: ", ti3.Sub(ti2).Nanoseconds(), "\n")
// fmt.Print("total diff + threshold + debug: ", ti4.Sub(ti3).Nanoseconds(), "\n")
// fmt.Print("update bg: ", ti5.Sub(ti4).Nanoseconds(), "\n")
// fmt.Print("Total: ", time.Now().Sub(ti0).Nanoseconds(), "\n\n")
}
} }
if j >= b.h {
m.Unlock()
}
t2 := time.Now() t2 := time.Now()
//visualise //visualise
if b.debug { if b.debug {
col := color.RGBA{200, 100, 0, 255} col := color.RGBA{200, 100, 0, 255}
d := &font.Drawer{ d := &font.Drawer{
Dst: bwImg, Dst: b.bwImg,
Src: image.NewUniform(col), Src: image.NewUniform(col),
Face: basicfont.Face7x13, Face: basicfont.Face7x13,
Dot: fixed.P(20, 30), Dot: fixed.P(20, 30),
} }
if motion > 1000 { var s string
d.DrawString("Motion") if b.motion > 1000 {
s = strconv.Itoa(b.motion) + " Motion"
} else {
s = strconv.Itoa(b.motion)
} }
err = jpeg.Encode(b.file, bwImg, nil) d.DrawString(s)
err = jpeg.Encode(b.file, b.bwImg, nil)
if err != nil { if err != nil {
return len(f), err return len(f), err
} }
@ -196,7 +230,7 @@ func (b *BasicFilter) Write(f []byte) (int, error) {
fmt.Print("Total: ", time.Now().Sub(t0).Milliseconds(), "\n\n") fmt.Print("Total: ", time.Now().Sub(t0).Milliseconds(), "\n\n")
//choose a threshold that motion is detected for if greater //choose a threshold that motion is detected for if greater
if motion < 1000 { if b.motion < 1000 {
return len(f), nil return len(f), nil
} }
//discard non motion frames //discard non motion frames

View File

@ -361,7 +361,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
case config.FilterKNN: case config.FilterKNN:
r.filters[i] = filter.NewKNNFilter(dst, knnMinArea, knnThreshold, knnHistory, knnKernel, r.cfg.ShowWindows) r.filters[i] = filter.NewKNNFilter(dst, knnMinArea, knnThreshold, knnHistory, knnKernel, r.cfg.ShowWindows)
case config.FilterBasic: case config.FilterBasic:
r.filters[i] = filter.NewBasicFilter(dst, false) r.filters[i] = filter.NewBasicFilter(dst, r.cfg.ShowWindows)
default: default:
panic("Undefined Filter") panic("Undefined Filter")
} }
@ -452,7 +452,6 @@ func (r *Revid) Start() error {
if err != nil { if err != nil {
return fmt.Errorf("could not start input device: %w", err) return fmt.Errorf("could not start input device: %w", err)
} }
r.wg.Add(1) r.wg.Add(1)
go r.processFrom(r.input, 0) go r.processFrom(r.input, 0)