From 56422a366e7b4961780777f845c56631648ae0c6 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 21 Jan 2019 14:57:40 +1030 Subject: [PATCH 01/34] av/stream/flac: added decode.go and flac_test.go --- stream/flac/decode.go | 0 stream/flac/flac_test.go | 0 2 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 stream/flac/decode.go create mode 100644 stream/flac/flac_test.go diff --git a/stream/flac/decode.go b/stream/flac/decode.go new file mode 100644 index 00000000..e69de29b diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go new file mode 100644 index 00000000..e69de29b From 44c79e225673f3f7fd74a420273d23f94227ec91 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 21 Jan 2019 15:41:49 +1030 Subject: [PATCH 02/34] av/stream/flac: wrote decode function and test to see if we can get wav. --- stream/flac/decode.go | 108 +++++++++++++++++++++++++++++++++++++++ stream/flac/flac_test.go | 31 +++++++++++ 2 files changed, 139 insertions(+) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index e69de29b..2ecfb737 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -0,0 +1,108 @@ +package flac + +import ( + "bytes" + "errors" + "io" + + "github.com/go-audio/audio" + "github.com/go-audio/wav" + "github.com/mewkiz/flac" +) + +const wavAudioFormat = 1 + +type buffer struct { + Buffer bytes.Buffer + Index int64 +} + +func (b *buffer) Bytes() []byte { + return b.Buffer.Bytes() +} + +func (b *buffer) Read(p []byte) (int, error) { + n, err := bytes.NewBuffer(b.Buffer.Bytes()[b.Index:]).Read(p) + + if err == nil { + if b.Index+int64(len(p)) < int64(b.Buffer.Len()) { + b.Index += int64(len(p)) + } else { + b.Index = int64(b.Buffer.Len()) + } + } + + return n, err +} + +func (b *buffer) Write(p []byte) (int, error) { + n, err := b.Buffer.Write(p) + + if err == nil { + b.Index = int64(b.Buffer.Len()) + } + + return n, err +} + +func (b *buffer) Seek(offset int64, whence int) (int64, error) { + var err error + var Index int64 = 0 + + switch whence { + case 0: + if offset >= int64(b.Buffer.Len()) || offset < 0 { + err = errors.New("Invalid Offset.") + } else { + b.Index = offset + Index = offset + } + default: + err = errors.New("Unsupported Seek Method.") + } + + return Index, err +} + +// Decode takes a slice of flac and decodes to wav +func Decode(buf []byte) ([]byte, error) { + r := bytes.NewReader(buf) + stream, err := flac.Parse(r) + if err != nil { + return nil, errors.New("Could not parse FLAC") + } + fb := &buffer{} + enc := wav.NewEncoder(fb, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavAudioFormat) + defer enc.Close() + var data []int + for { + // Decode FLAC audio samples. + frame, err := stream.ParseNext() + if err != nil { + if err == io.EOF { + break + } + return nil, err + } + + // Encode WAV audio samples. + data = data[:0] + for i := 0; i < frame.Subframes[0].NSamples; i++ { + for _, subframe := range frame.Subframes { + data = append(data, int(subframe.Samples[i])) + } + } + buf := &audio.IntBuffer{ + Format: &audio.Format{ + NumChannels: int(stream.Info.NChannels), + SampleRate: int(stream.Info.SampleRate), + }, + Data: data, + SourceBitDepth: int(stream.Info.BitsPerSample), + } + if err := enc.Write(buf); err != nil { + return nil, err + } + } + return fb.Bytes(), nil +} diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index e69de29b..763731d4 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -0,0 +1,31 @@ +package flac + +import ( + "io/ioutil" + "os" + "testing" +) + +const ( + testFile = "/home/saxon/Desktop/robot.flac" + outFile = "out.wav" +) + +func TestDecodeFlac(t *testing.T) { + b, err := ioutil.ReadFile(testFile) + if err != nil { + t.Fatalf("Could not read test file, failed with err: %v", err.Error()) + } + out, err := Decode(b) + if err != nil { + t.Errorf("Could not decode, failed with err: %v", err.Error()) + } + f, err := os.Create(outFile) + if err != nil { + t.Fatalf("Could not create output file, failed with err: %v", err.Error()) + } + _, err = f.Write(out) + if err != nil { + t.Fatalf("Could not write to output file, failed with err: %v", err.Error()) + } +} From 6fda0b3c3fe91515c792cac3e29124090f7dcf18 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 21 Jan 2019 17:34:15 +1030 Subject: [PATCH 03/34] av/stream/flac: using writerseeker to pass to wav.NewEncoder because I don't want to give it a file, but it's not working --- stream/flac/decode.go | 68 ++++++++-------------------------------- stream/flac/flac_test.go | 2 +- 2 files changed, 14 insertions(+), 56 deletions(-) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index 2ecfb737..941610e2 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -4,66 +4,16 @@ import ( "bytes" "errors" "io" + "io/ioutil" "github.com/go-audio/audio" "github.com/go-audio/wav" "github.com/mewkiz/flac" + "github.com/orcaman/writerseeker" ) const wavAudioFormat = 1 -type buffer struct { - Buffer bytes.Buffer - Index int64 -} - -func (b *buffer) Bytes() []byte { - return b.Buffer.Bytes() -} - -func (b *buffer) Read(p []byte) (int, error) { - n, err := bytes.NewBuffer(b.Buffer.Bytes()[b.Index:]).Read(p) - - if err == nil { - if b.Index+int64(len(p)) < int64(b.Buffer.Len()) { - b.Index += int64(len(p)) - } else { - b.Index = int64(b.Buffer.Len()) - } - } - - return n, err -} - -func (b *buffer) Write(p []byte) (int, error) { - n, err := b.Buffer.Write(p) - - if err == nil { - b.Index = int64(b.Buffer.Len()) - } - - return n, err -} - -func (b *buffer) Seek(offset int64, whence int) (int64, error) { - var err error - var Index int64 = 0 - - switch whence { - case 0: - if offset >= int64(b.Buffer.Len()) || offset < 0 { - err = errors.New("Invalid Offset.") - } else { - b.Index = offset - Index = offset - } - default: - err = errors.New("Unsupported Seek Method.") - } - - return Index, err -} - // Decode takes a slice of flac and decodes to wav func Decode(buf []byte) ([]byte, error) { r := bytes.NewReader(buf) @@ -71,10 +21,12 @@ func Decode(buf []byte) ([]byte, error) { if err != nil { return nil, errors.New("Could not parse FLAC") } - fb := &buffer{} - enc := wav.NewEncoder(fb, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavAudioFormat) + ws := &writerseeker.WriterSeeker{} + enc := wav.NewEncoder(ws, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavAudioFormat) defer enc.Close() var data []int + var out []byte + var d []byte for { // Decode FLAC audio samples. frame, err := stream.ParseNext() @@ -103,6 +55,12 @@ func Decode(buf []byte) ([]byte, error) { if err := enc.Write(buf); err != nil { return nil, err } + d, err = ioutil.ReadAll(ws.Reader()) + if err != nil { + return nil, err + } + out = append(out, d...) } - return fb.Bytes(), nil + + return d, nil } diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index 763731d4..13bef836 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -8,7 +8,7 @@ import ( const ( testFile = "/home/saxon/Desktop/robot.flac" - outFile = "out.wav" + outFile = "testOut.wav" ) func TestDecodeFlac(t *testing.T) { From 155134eeed1bd79ddc52c2bdd27c92fb9f527cef Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 21 Jan 2019 17:37:16 +1030 Subject: [PATCH 04/34] av/stream/flac: moved readAll to after loop --- stream/flac/decode.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index 941610e2..6c8445e4 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -25,8 +25,6 @@ func Decode(buf []byte) ([]byte, error) { enc := wav.NewEncoder(ws, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavAudioFormat) defer enc.Close() var data []int - var out []byte - var d []byte for { // Decode FLAC audio samples. frame, err := stream.ParseNext() @@ -55,12 +53,10 @@ func Decode(buf []byte) ([]byte, error) { if err := enc.Write(buf); err != nil { return nil, err } - d, err = ioutil.ReadAll(ws.Reader()) - if err != nil { - return nil, err - } - out = append(out, d...) } - + d, err := ioutil.ReadAll(ws.Reader()) + if err != nil { + return nil, err + } return d, nil } From 28e26cd151cc9866cf9e1072c2936528c2ed4554 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 21 Jan 2019 17:50:09 +1030 Subject: [PATCH 05/34] av/stream/flc: using my own writeSeeker implementation - working --- stream/flac/decode.go | 52 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index 6c8445e4..667fcf9c 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -4,16 +4,55 @@ import ( "bytes" "errors" "io" - "io/ioutil" "github.com/go-audio/audio" "github.com/go-audio/wav" "github.com/mewkiz/flac" - "github.com/orcaman/writerseeker" ) const wavAudioFormat = 1 +type WriterSeeker struct { + buf []byte + pos int +} + +func (ws *WriterSeeker) Bytes() []byte { + return ws.buf +} + +func (m *WriterSeeker) Write(p []byte) (n int, err error) { + minCap := m.pos + len(p) + if minCap > cap(m.buf) { // Make sure buf has enough capacity: + buf2 := make([]byte, len(m.buf), minCap+len(p)) // add some extra + copy(buf2, m.buf) + m.buf = buf2 + } + if minCap > len(m.buf) { + m.buf = m.buf[:minCap] + } + copy(m.buf[m.pos:], p) + m.pos += len(p) + return len(p), nil +} + +func (m *WriterSeeker) Seek(offset int64, whence int) (int64, error) { + newPos, offs := 0, int(offset) + switch whence { + case io.SeekStart: + newPos = offs + case io.SeekCurrent: + newPos = m.pos + offs + case io.SeekEnd: + newPos = len(m.buf) + offs + } + if newPos < 0 { + return 0, errors.New("negative result pos") + } + m.pos = newPos + return int64(newPos), nil +} + // Decode takes a slice of flac and decodes to wav func Decode(buf []byte) ([]byte, error) { r := bytes.NewReader(buf) @@ -21,7 +60,7 @@ func Decode(buf []byte) ([]byte, error) { if err != nil { return nil, errors.New("Could not parse FLAC") } - ws := &writerseeker.WriterSeeker{} + ws := &WriterSeeker{} enc := wav.NewEncoder(ws, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavAudioFormat) defer enc.Close() var data []int @@ -54,9 +93,6 @@ func Decode(buf []byte) ([]byte, error) { return nil, err } } - d, err := ioutil.ReadAll(ws.Reader()) - if err != nil { - return nil, err - } - return d, nil + + return ws.Bytes(), nil } From 5f3bf33213926fee73fd1c2a8fee55735e93a12b Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 21 Jan 2019 22:52:17 +1030 Subject: [PATCH 06/34] av/stream/flac/decode.go: wrote func headers --- stream/flac/decode.go | 79 ++++++++++++++++++++++++++++++---------- stream/flac/flac_test.go | 26 +++++++++++++ 2 files changed, 85 insertions(+), 20 deletions(-) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index 667fcf9c..42c4dace 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -1,3 +1,29 @@ +/* +NAME + decode.go + +DESCRIPTION + decode.go provides functionality for the decoding of FLAC compressed audio + +AUTHOR + Saxon Nelson-Milton + +LICENSE + decode.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ package flac import ( @@ -10,59 +36,72 @@ import ( "github.com/mewkiz/flac" ) -const wavAudioFormat = 1 +const wavFormat = 1 -type WriterSeeker struct { +// writeSeeker implements a memory based io.WriteSeeker. +type writeSeeker struct { buf []byte pos int } -func (ws *WriterSeeker) Bytes() []byte { +// Bytes returns the bytes contained in the writeSeekers buffer. +func (ws *writeSeeker) Bytes() []byte { return ws.buf } -func (m *WriterSeeker) Write(p []byte) (n int, err error) { - minCap := m.pos + len(p) - if minCap > cap(m.buf) { // Make sure buf has enough capacity: - buf2 := make([]byte, len(m.buf), minCap+len(p)) // add some extra - copy(buf2, m.buf) - m.buf = buf2 +// Write writes len(p) bytes from p to the writeSeeker's buf and returns the number +// of bytes written. If less than len(p) bytes are written, an error is returned. +func (ws *writeSeeker) Write(p []byte) (n int, err error) { + minCap := ws.pos + len(p) + if minCap > cap(ws.buf) { // Make sure buf has enough capacity: + buf2 := make([]byte, len(ws.buf), minCap+len(p)) // add some extra + copy(buf2, ws.buf) + ws.buf = buf2 } - if minCap > len(m.buf) { - m.buf = m.buf[:minCap] + if minCap > len(ws.buf) { + ws.buf = ws.buf[:minCap] } - copy(m.buf[m.pos:], p) - m.pos += len(p) + copy(ws.buf[ws.pos:], p) + ws.pos += len(p) return len(p), nil } -func (m *WriterSeeker) Seek(offset int64, whence int) (int64, error) { +// Seek sets the offset for the next Read or Write to offset, interpreted according +// to whence: SeekStart means relative to the start of the file, SeekCurrent means +// relative to the current offset, and SeekEnd means relative to the end. Seek returns +// the new offset relative to the start of the file and an error, if any. +func (ws *writeSeeker) Seek(offset int64, whence int) (int64, error) { newPos, offs := 0, int(offset) switch whence { case io.SeekStart: newPos = offs case io.SeekCurrent: - newPos = m.pos + offs + newPos = ws.pos + offs case io.SeekEnd: - newPos = len(m.buf) + offs + newPos = len(ws.buf) + offs } if newPos < 0 { return 0, errors.New("negative result pos") } - m.pos = newPos + ws.pos = newPos return int64(newPos), nil } -// Decode takes a slice of flac and decodes to wav +// Decode takes buf, a slice of FLAC, and decodes to WAV. If complete decoding +// fails, an error is returned. func Decode(buf []byte) ([]byte, error) { + // Lex and decode the FLAC into a stream to hold audio and properties. r := bytes.NewReader(buf) stream, err := flac.Parse(r) if err != nil { return nil, errors.New("Could not parse FLAC") } - ws := &WriterSeeker{} - enc := wav.NewEncoder(ws, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavAudioFormat) + + // Create WAV encoder and pass writeSeeker that will store output WAV. + ws := &writeSeeker{} + enc := wav.NewEncoder(ws, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavFormat) defer enc.Close() + var data []int for { // Decode FLAC audio samples. diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index 13bef836..d69c0494 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -1,3 +1,29 @@ +/* +NAME + flac_test.go + +DESCRIPTION + flac_test.go provides utilities to test FLAC audio decoding + +AUTHOR + Saxon Nelson-Milton + +LICENSE + flac_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ package flac import ( From 7628efdbc298a1e1527375fd4416c35f52349feb Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 10:26:22 +1030 Subject: [PATCH 07/34] av/stream/flac: working on cleaning up decode code --- stream/flac/decode.go | 43 ++++++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index 42c4dace..5a470370 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -90,7 +90,8 @@ func (ws *writeSeeker) Seek(offset int64, whence int) (int64, error) { // Decode takes buf, a slice of FLAC, and decodes to WAV. If complete decoding // fails, an error is returned. func Decode(buf []byte) ([]byte, error) { - // Lex and decode the FLAC into a stream to hold audio and properties. + + // Lex the FLAC into a stream to hold audio and it's properties. r := bytes.NewReader(buf) stream, err := flac.Parse(r) if err != nil { @@ -99,17 +100,30 @@ func Decode(buf []byte) ([]byte, error) { // Create WAV encoder and pass writeSeeker that will store output WAV. ws := &writeSeeker{} - enc := wav.NewEncoder(ws, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavFormat) + sr := int(stream.Info.SampleRate) + bps := int(stream.Info.BitsPerSample) + nc := int(stream.Info.NChannels) + enc := wav.NewEncoder(ws, sr, bps, nc, wavFormat) defer enc.Close() + // Decode FLAC into frames of samples + intBuf := &audio.IntBuffer{ + Format: &audio.Format{NumChannels: nc, SampleRate: sr}, + SourceBitDepth: bps, + } + return decodeFrames(stream, intBuf, enc, ws) +} + +// +func decodeFrames(s *flac.Stream, intBuf *audio.IntBuffer, e *wav.Encoder, ws *writeSeeker) ([]byte, error) { var data []int for { - // Decode FLAC audio samples. - frame, err := stream.ParseNext() - if err != nil { - if err == io.EOF { - break - } + frame, err := s.ParseNext() + + // If we've reached the end of the stream then we can output the writeSeeker's buffer. + if err == io.EOF { + return ws.Bytes(), nil + } else if err != nil { return nil, err } @@ -120,18 +134,9 @@ func Decode(buf []byte) ([]byte, error) { data = append(data, int(subframe.Samples[i])) } } - buf := &audio.IntBuffer{ - Format: &audio.Format{ - NumChannels: int(stream.Info.NChannels), - SampleRate: int(stream.Info.SampleRate), - }, - Data: data, - SourceBitDepth: int(stream.Info.BitsPerSample), - } - if err := enc.Write(buf); err != nil { + intBuf.Data = data + if err := e.Write(intBuf); err != nil { return nil, err } } - - return ws.Bytes(), nil } From 6f1767d152d98ca5128f4c91f56449fc6854b322 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 10:40:40 +1030 Subject: [PATCH 08/34] av/stream/flac: finished cleaning up decode --- stream/flac/decode.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index 5a470370..34d42057 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -114,7 +114,9 @@ func Decode(buf []byte) ([]byte, error) { return decodeFrames(stream, intBuf, enc, ws) } -// +// decodeFrames parses frames from the stream and encodes them into WAV until +// the end of the stream is reached. The bytes from writeSeeker buffer are then +// returned. If any errors occur during encodeing, nil bytes and the error is returned. func decodeFrames(s *flac.Stream, intBuf *audio.IntBuffer, e *wav.Encoder, ws *writeSeeker) ([]byte, error) { var data []int for { From b5611bb2b42f8145dc364c1a6fbba0d96fc15757 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 10:45:36 +1030 Subject: [PATCH 09/34] av/stream/flac: added writeseeker tests --- stream/flac/flac_test.go | 44 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index d69c0494..9537d682 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -27,6 +27,7 @@ LICENSE package flac import ( + "io" "io/ioutil" "os" "testing" @@ -37,6 +38,49 @@ const ( outFile = "testOut.wav" ) +func TestWriteSeekerWrite(t *testing.T) { + writerSeeker := &writeSeeker{} + var ws io.WriteSeeker = writerSeeker + + ws.Write([]byte("hello")) + if string(writerSeeker.buf) != "hello" { + t.Fail() + } + + ws.Write([]byte(" world")) + if string(writerSeeker.buf) != "hello world" { + t.Fail() + } + +} + +func TestWriteSeekerSeek(t *testing.T) { + writerSeeker := &writeSeeker{} + var ws io.WriteSeeker = writerSeeker + + ws.Write([]byte("hello")) + if string(writerSeeker.buf) != "hello" { + t.Fail() + } + + ws.Write([]byte(" world")) + if string(writerSeeker.buf) != "hello world" { + t.Fail() + } + + ws.Seek(-2, io.SeekEnd) + ws.Write([]byte("k!")) + if string(writerSeeker.buf) != "hello work!" { + t.Fail() + } + + ws.Seek(6, io.SeekStart) + ws.Write([]byte("gopher")) + if string(writerSeeker.buf) != "hello gopher" { + t.Fail() + } +} + func TestDecodeFlac(t *testing.T) { b, err := ioutil.ReadFile(testFile) if err != nil { From 99b7f4a44bae180cd3f3cf288da5bca4d619581f Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 11:15:39 +1030 Subject: [PATCH 10/34] av/stream/flac: saving progress --- stream/flac/flac_test.go | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index 9537d682..79274819 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -38,45 +38,51 @@ const ( outFile = "testOut.wav" ) +// TestWriteSeekerWrite checks that basic writing to the ws works as expected. func TestWriteSeekerWrite(t *testing.T) { - writerSeeker := &writeSeeker{} - var ws io.WriteSeeker = writerSeeker + ws := &writeSeeker{} - ws.Write([]byte("hello")) - if string(writerSeeker.buf) != "hello" { - t.Fail() + const tstStr1 = "hello" + ws.Write([]byte(tstStr1)) + got := string(ws.buf) + if got != tstStr1 { + t.Errorf("Write failed, got: %v, want: %v", got, tstStr1) } - ws.Write([]byte(" world")) - if string(writerSeeker.buf) != "hello world" { - t.Fail() + const tstStr2 = " world" + const want = "hello world" + ws.Write([]byte(tstStr2)) + got = string(ws.buf) + if got != want { + t.Errorf("Second write failed, got: %v, want: %v", got, want) } - } +// TestWriteSeekerSeek checks that writing and seeking works as expected, i.e. we +// can write, then seek to a knew place in the buf, and write again, either replacing +// bytes, or appending bytes. func TestWriteSeekerSeek(t *testing.T) { - writerSeeker := &writeSeeker{} - var ws io.WriteSeeker = writerSeeker + ws := &writeSeeker{} ws.Write([]byte("hello")) - if string(writerSeeker.buf) != "hello" { + if string(ws.buf) != "hello" { t.Fail() } ws.Write([]byte(" world")) - if string(writerSeeker.buf) != "hello world" { + if string(ws.buf) != "hello world" { t.Fail() } ws.Seek(-2, io.SeekEnd) ws.Write([]byte("k!")) - if string(writerSeeker.buf) != "hello work!" { + if string(ws.buf) != "hello work!" { t.Fail() } ws.Seek(6, io.SeekStart) ws.Write([]byte("gopher")) - if string(writerSeeker.buf) != "hello gopher" { + if string(ws.buf) != "hello gopher" { t.Fail() } } From 9ffc5367cb01297c8aa696e24e440977353595ff Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 12:14:40 +1030 Subject: [PATCH 11/34] av/stream/flac: cleaned up testing file --- stream/flac/flac_test.go | 38 ++++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index 79274819..0d8079f7 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -64,29 +64,43 @@ func TestWriteSeekerWrite(t *testing.T) { func TestWriteSeekerSeek(t *testing.T) { ws := &writeSeeker{} - ws.Write([]byte("hello")) - if string(ws.buf) != "hello" { - t.Fail() + const tstStr1 = "hello" + want1 := tstStr1 + ws.Write([]byte(tstStr1)) + got := string(ws.buf) + if got != tstStr1 { + t.Errorf("Unexpected output, got: %v, want: %v", got, want1) } - ws.Write([]byte(" world")) - if string(ws.buf) != "hello world" { - t.Fail() + const tstStr2 = " world" + const want2 = tstStr1 + tstStr2 + ws.Write([]byte(tstStr2)) + got = string(ws.buf) + if got != want2 { + t.Errorf("Unexpected output, got: %v, want: %v", got, want2) } + const tstStr3 = "k!" + const want3 = "hello work!" ws.Seek(-2, io.SeekEnd) - ws.Write([]byte("k!")) - if string(ws.buf) != "hello work!" { - t.Fail() + ws.Write([]byte(tstStr3)) + got = string(ws.buf) + if got != want3 { + t.Errorf("Unexpected output, got: %v, want: %v", got, want3) } + const tstStr4 = "gopher" + const want4 = "hello gopher" ws.Seek(6, io.SeekStart) - ws.Write([]byte("gopher")) - if string(ws.buf) != "hello gopher" { - t.Fail() + ws.Write([]byte(tstStr4)) + got = string(ws.buf) + if got != want4 { + t.Errorf("Unexpected output, got: %v, want: %v", got, want4) } } +// TestDecodeFlac checks that we can load a flac file and decode to wav, writing +// to a wav file. func TestDecodeFlac(t *testing.T) { b, err := ioutil.ReadFile(testFile) if err != nil { From 4602a555d5d098471ad9ee6885a44786f2e36619 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 12:27:52 +1030 Subject: [PATCH 12/34] av/stream/flac: updated test file directory --- stream/flac/flac_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index 0d8079f7..1f8019e5 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -34,7 +34,7 @@ import ( ) const ( - testFile = "/home/saxon/Desktop/robot.flac" + testFile = "../../../test/test-data/av/input/robot.flac" outFile = "testOut.wav" ) From 37bdb2cf8ea293b6d6ee8fdc8f8c60a08b9fed10 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 14:31:42 +1030 Subject: [PATCH 13/34] av/revid: removed test commands that we're not using anymore --- revid/cmd/h264-file-to-flv-rtmp/main.go | 75 ---------------------- revid/cmd/h264-file-to-mpgets-file/main.go | 66 ------------------- 2 files changed, 141 deletions(-) delete mode 100644 revid/cmd/h264-file-to-flv-rtmp/main.go delete mode 100644 revid/cmd/h264-file-to-mpgets-file/main.go diff --git a/revid/cmd/h264-file-to-flv-rtmp/main.go b/revid/cmd/h264-file-to-flv-rtmp/main.go deleted file mode 100644 index 5f79df4a..00000000 --- a/revid/cmd/h264-file-to-flv-rtmp/main.go +++ /dev/null @@ -1,75 +0,0 @@ -/* -NAME - main.go - -DESCRIPTION - See Readme.md - -AUTHOR - Saxon Nelson-Milton - -LICENSE - main.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). -*/ - -package main - -import ( - "flag" - "log" - "time" - - "bitbucket.org/ausocean/av/revid" - "bitbucket.org/ausocean/iot/pi/smartlogger" - "bitbucket.org/ausocean/utils/logger" -) - -const ( - inputFile = "../../../../test/test-data/av/input/betterInput.h264" - frameRate = "25" - runDuration = 120 * time.Second - logPath = "/var/log" -) - -// Test h264 inputfile to flv format into rtmp using librtmp c wrapper -func main() { - // Get the rtmp url from a cmd flag - rtmpUrlPtr := flag.String("rtmpUrl", "", "The rtmp url you would like to stream to.") - flag.Parse() - if *rtmpUrlPtr == "" { - log.Println("No RTMP url passed!") - return - } - - config := revid.Config{ - Input: revid.File, - InputFileName: inputFile, - InputCodec: revid.H264, - Output1: revid.Rtmp, - RtmpMethod: revid.LibRtmp, - RtmpUrl: *rtmpUrlPtr, - Packetization: revid.Flv, - Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller), - } - revidInst, err := revid.New(config, nil) - if err != nil { - config.Logger.Log(logger.Error, "Should not have got an error!: ", err.Error()) - return - } - revidInst.Start() - time.Sleep(runDuration) - revidInst.Stop() -} diff --git a/revid/cmd/h264-file-to-mpgets-file/main.go b/revid/cmd/h264-file-to-mpgets-file/main.go deleted file mode 100644 index 03a39fde..00000000 --- a/revid/cmd/h264-file-to-mpgets-file/main.go +++ /dev/null @@ -1,66 +0,0 @@ -/* -NAME - main.go - -DESCRIPTION - See Readme.md - -AUTHOR - Saxon Nelson-Milton - -LICENSE - main.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). -*/ - -package main - -import ( - "time" - - "bitbucket.org/ausocean/av/revid" - "bitbucket.org/ausocean/iot/pi/smartlogger" - "bitbucket.org/ausocean/utils/logger" -) - -const ( - inputFile = "../../../../test/test-data/av/input/betterInput.h264" - outputFile = "output.ts" - frameRate = "25" - runDuration = 120 * time.Second - logPath = "/var/log" -) - -// Test h264 inputfile to flv format into rtmp using librtmp c wrapper -func main() { - - config := revid.Config{ - Input: revid.File, - InputFileName: inputFile, - InputCodec: revid.H264, - Output1: revid.File, - OutputFileName: outputFile, - Packetization: revid.Mpegts, - Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller), - } - revidInst, err := revid.New(config, nil) - if err != nil { - config.Logger.Log(logger.Error, "Should not have got an error!:", err.Error()) - return - } - revidInst.Start() - time.Sleep(runDuration) - revidInst.Stop() -} From 42c9fb1d097abb012ba802b5792c16a870a49719 Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 24 Jan 2019 14:33:22 +1030 Subject: [PATCH 14/34] stream/mts/encoder.go: writing psi based on time interval rather than number of packets interval --- stream/mts/encoder.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 60393285..8a4121a4 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -122,7 +122,7 @@ var ( ) const ( - psiSndCnt = 7 + psiInterval = 1 * time.Second ) // timeLocation holds time and location data @@ -199,9 +199,10 @@ type Encoder struct { tsSpace [PacketSize]byte pesSpace [pes.MaxPesSize]byte - psiCount int - continuity map[int]byte + + now time.Time + psiLastTime time.Time } // NewEncoder returns an Encoder with the specified frame rate. @@ -233,12 +234,15 @@ const ( // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel. func (e *Encoder) Encode(nalu []byte) error { - if e.psiCount <= 0 { + e.now = time.Now() + if e.now.Sub(e.psiLastTime) > psiInterval { err := e.writePSI() if err != nil { return err } + e.psiLastTime = e.now } + // Prepare PES data. pesPkt := pes.Packet{ StreamID: streamID, @@ -269,7 +273,6 @@ func (e *Encoder) Encode(nalu []byte) error { pusi = false } _, err := e.dst.Write(pkt.Bytes(e.tsSpace[:PacketSize])) - e.psiCount-- if err != nil { return err } @@ -318,7 +321,6 @@ func (e *Encoder) writePSI() error { if err != nil { return err } - e.psiCount = psiSndCnt return nil } From 31b9ec07e9be6b67393f8eda884556233705a685 Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 24 Jan 2019 14:39:14 +1030 Subject: [PATCH 15/34] stream/mts/encoder.go: no need to have a now field to capture current time - this can be local to encode function --- stream/mts/encoder.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 8a4121a4..02761b91 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -201,7 +201,6 @@ type Encoder struct { continuity map[int]byte - now time.Time psiLastTime time.Time } @@ -234,13 +233,13 @@ const ( // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel. func (e *Encoder) Encode(nalu []byte) error { - e.now = time.Now() - if e.now.Sub(e.psiLastTime) > psiInterval { + now := time.Now() + if now.Sub(e.psiLastTime) > psiInterval { err := e.writePSI() if err != nil { return err } - e.psiLastTime = e.now + e.psiLastTime = now } // Prepare PES data. From 0e34623f0f5bb50adb6d17eed657338c38bfb15b Mon Sep 17 00:00:00 2001 From: scruzin Date: Sat, 2 Feb 2019 12:27:21 +1030 Subject: [PATCH 16/34] rtmp: Use a net.Conn interface instead of *net.TCPConn. --- rtmp/conn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rtmp/conn.go b/rtmp/conn.go index 3864df98..7e1b3b15 100644 --- a/rtmp/conn.go +++ b/rtmp/conn.go @@ -76,7 +76,7 @@ type link struct { protocol int32 timeout uint port uint16 - conn *net.TCPConn + conn net.Conn } // method represents an RTMP method. From 1af4b250306d9240d097845672da2fcbc0c2413e Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 3 Feb 2019 21:17:44 +1030 Subject: [PATCH 17/34] cmd/revid-cli & revid: removed startRevid and stopRevid as shouldn't be required when we have revid.Start() and revid.Stop(). Created revid.Config() which returns copy of config safely using mutex. removed updateRevid in revid-cli and move to fun revid.Update() - as there's no reason why it can't just be a receiver func - even better considering we want to start moving alot of stuff from revid-cli to the revid-api anyways. --- cmd/revid-cli/main.go | 54 +++++--------------- revid/revid.go | 115 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+), 42 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 5b826b91..826cb2a4 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -72,23 +72,22 @@ func main() { cfg := handleFlags() if !*useNetsender { - // run revid for the specified duration - rv, _, err := startRevid(nil, cfg) + rv, err := revid.New(cfg, nil) if err != nil { + cfg.Logger.Log(logger.Fatal, pkg+"failed to initialiase revid", "error", err.Error()) + } + if err = rv.Start(); err != nil { cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", "error", err.Error()) } time.Sleep(*runDurationPtr) - err = stopRevid(rv) - if err != nil { + if err = rv.Stop(); err != nil { cfg.Logger.Log(logger.Error, pkg+"failed to stop revid before program termination", "error", err.Error()) } return } - err := run(nil, cfg) - if err != nil { + if err := run(cfg); err != nil { log.Log(logger.Fatal, pkg+"failed to run revid", "error", err.Error()) - os.Exit(1) } } @@ -244,27 +243,20 @@ func handleFlags() revid.Config { } // initialize then run the main NetSender client -func run(rv *revid.Revid, cfg revid.Config) error { +func run(cfg revid.Config) error { // initialize NetSender and use NetSender's logger - //config.Logger = netsender.Logger() log.Log(logger.Info, pkg+"running in NetSender mode") var ns netsender.Sender - err := ns.Init(log, nil, nil, nil) - if err != nil { + if err := ns.Init(log, nil, nil, nil); err != nil { return err } vars, _ := ns.Vars() vs := ns.VarSum() - paused := false - if vars["mode"] == "Paused" { - paused = true - } - if !paused { - rv, cfg, err = updateRevid(&ns, rv, cfg, vars, false) - if err != nil { - return err - } + + rv, cfg, err = updateRevid(&ns, rv, cfg, vars, false) + if err != nil { + return err } for { @@ -331,28 +323,6 @@ func send(ns *netsender.Sender, rv *revid.Revid) error { return nil } -// wrappers for stopping and starting revid -func startRevid(ns *netsender.Sender, cfg revid.Config) (*revid.Revid, revid.Config, error) { - rv, err := revid.New(cfg, ns) - if err != nil { - return nil, cfg, err - } - err = rv.Start() - return rv, cfg, err -} - -func stopRevid(rv *revid.Revid) error { - err := rv.Stop() - if err != nil { - return err - } - - // FIXME(kortschak): Is this waiting on completion of work? - // Use a wait group and Wait method if it is. - time.Sleep(revidStopTime) - return nil -} - func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars map[string]string, stop bool) (*revid.Revid, revid.Config, error) { if stop { err := stopRevid(rv) diff --git a/revid/revid.go b/revid/revid.go index 6833ec2b..81546ead 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -40,6 +40,7 @@ import ( "sync" "time" + "bitbucket.org/ausocean/av/revid" "bitbucket.org/ausocean/av/stream" "bitbucket.org/ausocean/av/stream/flv" "bitbucket.org/ausocean/av/stream/lex" @@ -326,6 +327,13 @@ func (r *Revid) IsRunning() bool { return ret } +func (r *Revid) Config() Config { + r.mu.Lock() + ret := r.config + r.mu.Unlock() + return ret +} + // setIsRunning sets revid.isRunning using b. func (r *Revid) setIsRunning(b bool) { r.mu.Lock() @@ -366,6 +374,113 @@ func (r *Revid) Stop() error { return nil } +func (r *Revid) Update(vars map[string]string) error { + if r.IsRunning() { + r.Stop() + } + //look through the vars and update revid where needed + for key, value := range vars { + switch key { + case "Output": + // FIXME(kortschak): There can be only one! + // How do we specify outputs after the first? + // + // Maybe we shouldn't be doing this! + switch value { + case "File": + cfg.Outputs[0] = revid.File + case "Http": + cfg.Outputs[0] = revid.Http + case "Rtmp": + cfg.Outputs[0] = revid.Rtmp + case "FfmpegRtmp": + cfg.Outputs[0] = revid.FfmpegRtmp + default: + log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) + continue + } + case "FramesPerClip": + f, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) + break + } + cfg.FramesPerClip = uint(f) + case "RtmpUrl": + cfg.RtmpUrl = value + case "Bitrate": + r, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) + break + } + cfg.Bitrate = uint(r) + case "OutputFileName": + cfg.OutputFileName = value + case "InputFileName": + cfg.InputFileName = value + case "Height": + h, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid height param", "value", value) + break + } + cfg.Height = uint(h) + case "Width": + w, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid width param", "value", value) + break + } + cfg.Width = uint(w) + case "FrameRate": + r, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) + break + } + cfg.FrameRate = uint(r) + case "HttpAddress": + cfg.HttpAddress = value + case "Quantization": + q, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid quantization param", "value", value) + break + } + cfg.Quantization = uint(q) + case "IntraRefreshPeriod": + p, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) + break + } + cfg.IntraRefreshPeriod = uint(p) + case "HorizontalFlip": + switch strings.ToLower(value) { + case "true": + cfg.FlipHorizontal = true + case "false": + cfg.FlipHorizontal = false + default: + log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) + } + case "VerticalFlip": + switch strings.ToLower(value) { + case "true": + cfg.FlipVertical = true + case "false": + cfg.FlipVertical = false + default: + log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) + } + default: + } + } + + return startRevid(ns, cfg) +} + // outputClips takes the clips produced in the packClips method and outputs them // to the desired output defined in the revid config func (r *Revid) outputClips() { From 9095044e234e730c19ed6a4f468bb46f2ba5c944 Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 3 Feb 2019 21:55:40 +1030 Subject: [PATCH 18/34] revid: using waitgroups so that revid.Stop() is safer - we can wait until the input and output routines are done before we do anything, like touch the revid config. Also started modifying revid.Update() to remove errors introduced after the copy of updateRevid from revid-cli to revid.go in the previous commit. --- revid/revid.go | 59 ++++++++++++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 81546ead..33d847ea 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -1,6 +1,6 @@ /* NAME - revid.go + r.go DESCRIPTION See Readme.md @@ -40,7 +40,6 @@ import ( "sync" "time" - "bitbucket.org/ausocean/av/revid" "bitbucket.org/ausocean/av/stream" "bitbucket.org/ausocean/av/stream/flv" "bitbucket.org/ausocean/av/stream/lex" @@ -120,10 +119,11 @@ type Revid struct { // bitrate hold the last send bitrate calculation result. bitrate int - // isRunning is a loaded and cocked foot-gun. mu sync.Mutex isRunning bool + wg sync.WaitGroup + err chan error } @@ -334,7 +334,7 @@ func (r *Revid) Config() Config { return ret } -// setIsRunning sets revid.isRunning using b. +// setIsRunning sets r.isRunning using b. func (r *Revid) setIsRunning(b bool) { r.mu.Lock() r.isRunning = b @@ -348,9 +348,11 @@ func (r *Revid) Start() error { return errors.New(pkg + "start called but revid is already running") } r.config.Logger.Log(logger.Info, pkg+"starting Revid") + // TODO: this doesn't need to be here r.config.Logger.Log(logger.Debug, pkg+"setting up output") r.setIsRunning(true) r.config.Logger.Log(logger.Info, pkg+"starting output routine") + r.wg.Add(1) go r.outputClips() r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") err := r.setupInput() @@ -371,6 +373,7 @@ func (r *Revid) Stop() error { if r.cmd != nil && r.cmd.Process != nil { r.cmd.Process.Kill() } + r.wg.Wait() return nil } @@ -388,89 +391,89 @@ func (r *Revid) Update(vars map[string]string) error { // Maybe we shouldn't be doing this! switch value { case "File": - cfg.Outputs[0] = revid.File + r.config.Outputs[0] = File case "Http": - cfg.Outputs[0] = revid.Http + r.config.Outputs[0] = Http case "Rtmp": - cfg.Outputs[0] = revid.Rtmp + r.config.Outputs[0] = Rtmp case "FfmpegRtmp": - cfg.Outputs[0] = revid.FfmpegRtmp + r.config.Outputs[0] = FfmpegRtmp default: - log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) continue } case "FramesPerClip": f, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) + .Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) break } - cfg.FramesPerClip = uint(f) + r.config.FramesPerClip = uint(f) case "RtmpUrl": - cfg.RtmpUrl = value + r.config.RtmpUrl = value case "Bitrate": r, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } - cfg.Bitrate = uint(r) + r.config.Bitrate = uint(r) case "OutputFileName": - cfg.OutputFileName = value + r.config.OutputFileName = value case "InputFileName": - cfg.InputFileName = value + r.config.InputFileName = value case "Height": h, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid height param", "value", value) break } - cfg.Height = uint(h) + r.config.Height = uint(h) case "Width": w, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid width param", "value", value) break } - cfg.Width = uint(w) + r.config.Width = uint(w) case "FrameRate": r, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } - cfg.FrameRate = uint(r) + r.config.FrameRate = uint(r) case "HttpAddress": - cfg.HttpAddress = value + r.config.HttpAddress = value case "Quantization": q, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid quantization param", "value", value) break } - cfg.Quantization = uint(q) + r.config.Quantization = uint(q) case "IntraRefreshPeriod": p, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) break } - cfg.IntraRefreshPeriod = uint(p) + r.config.IntraRefreshPeriod = uint(p) case "HorizontalFlip": switch strings.ToLower(value) { case "true": - cfg.FlipHorizontal = true + r.config.FlipHorizontal = true case "false": - cfg.FlipHorizontal = false + r.config.FlipHorizontal = false default: log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) } case "VerticalFlip": switch strings.ToLower(value) { case "true": - cfg.FlipVertical = true + r.config.FlipVertical = true case "false": - cfg.FlipVertical = false + r.config.FlipVertical = false default: log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) } @@ -478,7 +481,7 @@ func (r *Revid) Update(vars map[string]string) error { } } - return startRevid(ns, cfg) + return r.Start() } // outputClips takes the clips produced in the packClips method and outputs them @@ -622,6 +625,7 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } + r.wg.Add(1) go r.processFrom(stdout, 0) return nil } @@ -670,6 +674,7 @@ func (r *Revid) startV4L() error { return err } + r.wg.Add(1) go r.processFrom(stdout, time.Duration(0)) return nil } @@ -685,6 +690,7 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. + r.wg.Add(1) go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate)) return nil } @@ -693,4 +699,5 @@ func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") r.err <- r.lexTo(r.encoder, read, delay) r.config.Logger.Log(logger.Info, pkg+"finished reading input data") + r.wg.Done() } From 1010721dd070882058660fff25f3e84db4ad446d Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 3 Feb 2019 23:37:38 +1030 Subject: [PATCH 19/34] cmd/revid-cli & revid: Checking revid mode differently - now using ns.Mode(), which should soon be an available feature. Also now using ns.SetMode() - which tells netreceiver that we've changed mode. --- cmd/revid-cli/main.go | 157 +++++++++--------------------------------- revid/revid.go | 26 +++---- 2 files changed, 45 insertions(+), 138 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 826cb2a4..58f8e507 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -50,6 +50,13 @@ const ( defaultLogVerbosity = logger.Debug ) +// Revid modes +const ( + normal = "Normal" + paused = "Paused" + burst = "Burst" +) + // Other misc consts const ( netSendRetryTime = 5 * time.Second @@ -244,9 +251,9 @@ func handleFlags() revid.Config { // initialize then run the main NetSender client func run(cfg revid.Config) error { - // initialize NetSender and use NetSender's logger log.Log(logger.Info, pkg+"running in NetSender mode") + // initialize NetSender and use NetSender's logger var ns netsender.Sender if err := ns.Init(log, nil, nil, nil); err != nil { return err @@ -254,8 +261,12 @@ func run(cfg revid.Config) error { vars, _ := ns.Vars() vs := ns.VarSum() - rv, cfg, err = updateRevid(&ns, rv, cfg, vars, false) + rv, err := revid.New(cfg, &ns) if err != nil { + log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error()) + } + + if err = rv.Update(vars); err != nil { return err } @@ -267,7 +278,6 @@ func run(cfg revid.Config) error { } if vs != ns.VarSum() { - // vars changed vars, err := ns.Vars() if err != nil { log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error()) @@ -275,24 +285,32 @@ func run(cfg revid.Config) error { continue } vs = ns.VarSum() - if vars["mode"] == "Paused" { - if !paused { + + switch ns.Mode() { + case paused: + if rv.IsRunning() { log.Log(logger.Info, pkg+"pausing revid") - err = stopRevid(rv) - if err != nil { - log.Log(logger.Error, pkg+"failed to stop revide", "error", err.Error()) + if err = rv.Stop(); err != nil { + log.Log(logger.Error, pkg+"failed to stop revid", "error", err.Error()) continue } - paused = true + ns.SetMode(paused) } - } else { - rv, cfg, err = updateRevid(&ns, rv, cfg, vars, !paused) - if err != nil { + case normal: + if err = rv.Update(vars); err != nil { return err } - if paused { - paused = false + ns.SetMode(normal) + case burst: + if err = rv.Start(); err != nil { + return err } + ns.SetMode(burst) + time.Sleep(rv.Config().BurstPeriod) + if err = rv.Stop(); err != nil { + return err + } + ns.SetMode(paused) } } sleepTime, _ := strconv.Atoi(ns.Param("mp")) @@ -323,117 +341,6 @@ func send(ns *netsender.Sender, rv *revid.Revid) error { return nil } -func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars map[string]string, stop bool) (*revid.Revid, revid.Config, error) { - if stop { - err := stopRevid(rv) - if err != nil { - return nil, cfg, err - } - } - - //look through the vars and update revid where needed - for key, value := range vars { - switch key { - case "Output": - // FIXME(kortschak): There can be only one! - // How do we specify outputs after the first? - // - // Maybe we shouldn't be doing this! - switch value { - case "File": - cfg.Outputs[0] = revid.File - case "Http": - cfg.Outputs[0] = revid.Http - case "Rtmp": - cfg.Outputs[0] = revid.Rtmp - case "FfmpegRtmp": - cfg.Outputs[0] = revid.FfmpegRtmp - default: - log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) - continue - } - case "FramesPerClip": - f, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) - break - } - cfg.FramesPerClip = uint(f) - case "RtmpUrl": - cfg.RtmpUrl = value - case "Bitrate": - r, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) - break - } - cfg.Bitrate = uint(r) - case "OutputFileName": - cfg.OutputFileName = value - case "InputFileName": - cfg.InputFileName = value - case "Height": - h, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid height param", "value", value) - break - } - cfg.Height = uint(h) - case "Width": - w, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid width param", "value", value) - break - } - cfg.Width = uint(w) - case "FrameRate": - r, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) - break - } - cfg.FrameRate = uint(r) - case "HttpAddress": - cfg.HttpAddress = value - case "Quantization": - q, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid quantization param", "value", value) - break - } - cfg.Quantization = uint(q) - case "IntraRefreshPeriod": - p, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) - break - } - cfg.IntraRefreshPeriod = uint(p) - case "HorizontalFlip": - switch strings.ToLower(value) { - case "true": - cfg.FlipHorizontal = true - case "false": - cfg.FlipHorizontal = false - default: - log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) - } - case "VerticalFlip": - switch strings.ToLower(value) { - case "true": - cfg.FlipVertical = true - case "false": - cfg.FlipVertical = false - default: - log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) - } - default: - } - } - - return startRevid(ns, cfg) -} - // flagStrings implements an appending string set flag. type flagStrings []string diff --git a/revid/revid.go b/revid/revid.go index 33d847ea..c90ba8da 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -405,19 +405,19 @@ func (r *Revid) Update(vars map[string]string) error { case "FramesPerClip": f, err := strconv.ParseUint(value, 10, 0) if err != nil { - .Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) break } r.config.FramesPerClip = uint(f) case "RtmpUrl": r.config.RtmpUrl = value case "Bitrate": - r, err := strconv.ParseUint(value, 10, 0) + v, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } - r.config.Bitrate = uint(r) + r.config.Bitrate = uint(v) case "OutputFileName": r.config.OutputFileName = value case "InputFileName": @@ -425,37 +425,37 @@ func (r *Revid) Update(vars map[string]string) error { case "Height": h, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid height param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid height param", "value", value) break } r.config.Height = uint(h) case "Width": w, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid width param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid width param", "value", value) break } r.config.Width = uint(w) case "FrameRate": - r, err := strconv.ParseUint(value, 10, 0) + v, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } - r.config.FrameRate = uint(r) + r.config.FrameRate = uint(v) case "HttpAddress": r.config.HttpAddress = value case "Quantization": q, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid quantization param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid quantization param", "value", value) break } r.config.Quantization = uint(q) case "IntraRefreshPeriod": p, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) break } r.config.IntraRefreshPeriod = uint(p) @@ -466,7 +466,7 @@ func (r *Revid) Update(vars map[string]string) error { case "false": r.config.FlipHorizontal = false default: - log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) } case "VerticalFlip": switch strings.ToLower(value) { @@ -475,7 +475,7 @@ func (r *Revid) Update(vars map[string]string) error { case "false": r.config.FlipVertical = false default: - log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) } default: } From 6171c4e9994a87722733249a11e0d607e8e788b5 Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 3 Feb 2019 23:43:51 +1030 Subject: [PATCH 20/34] revid: added handling of burstPeriod to config --- revid/config.go | 7 +++++++ revid/revid.go | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/revid/config.go b/revid/config.go index dc9c5a8d..b0ba2bc4 100644 --- a/revid/config.go +++ b/revid/config.go @@ -68,6 +68,7 @@ type Config struct { RtpAddress string Logger Logger SendRetry bool + BurstPeriod uint } // Enums for config struct @@ -114,6 +115,7 @@ const ( defaultInputCodec = H264 defaultVerbosity = No // FIXME(kortschak): This makes no sense whatsoever. No is currently 15. defaultRtpAddr = "localhost:6970" + defaultBurstPeriod = 10 // Seconds ) // Validate checks for any errors in the config fields and defaults settings @@ -200,6 +202,11 @@ func (c *Config) Validate(r *Revid) error { } } + if c.BurstPeriod == 0 { + c.Logger.Log(logger.Warning, pkg+"no burst period defined, defaulting", "burstPeriod", defaultBurstPeriod) + c.BurstPeriod = defaultBurstPeriod + } + if c.FramesPerClip < 1 { c.Logger.Log(logger.Warning, pkg+"no FramesPerClip defined, defaulting", "framesPerClip", defaultFramesPerClip) diff --git a/revid/revid.go b/revid/revid.go index c90ba8da..9f1cc12c 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -477,6 +477,13 @@ func (r *Revid) Update(vars map[string]string) error { default: r.config.Logger.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) } + case "BurstPeriod": + v, err := strconv.ParseUint(value, 10, 0) + if err != nil { + r.config.Logger.Log(logger.Warning, pkg+"invalid BurstPeriod param", "value", value) + break + } + r.config.BurstPeriod = uint(v) default: } } From ee7eb84d26e29b3d9cab7005fddd48d1847a8211 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 4 Feb 2019 13:25:37 +1030 Subject: [PATCH 21/34] revid-cli: correctly using ns.Mode() and ns.SetMode() --- cmd/revid-cli/main.go | 75 +++++++++++++++++++++++-------------------- revid/revid.go | 2 +- 2 files changed, 41 insertions(+), 36 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 58f8e507..ac8fff12 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -266,52 +266,57 @@ func run(cfg revid.Config) error { log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error()) } + // Update revid to get latest config settings from netreceiver. if err = rv.Update(vars); err != nil { return err } + // If mode on netreceiver isn't paused then we can start revid. + if ns.Mode() != paused { + if err = rv.Start(); err != nil { + return err + } + } + for { - if err := send(&ns, rv); err != nil { - log.Log(logger.Error, pkg+"polling failed", "error", err.Error()) + if err := ns.Run(); err != nil { + log.Log(logger.Error, pkg+"Run Failed. Retrying...") time.Sleep(netSendRetryTime) continue } - if vs != ns.VarSum() { - vars, err := ns.Vars() - if err != nil { - log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error()) - time.Sleep(netSendRetryTime) - continue - } - vs = ns.VarSum() + // If var sum hasn't change we continue + if vs == ns.VarSum() { + continue + } - switch ns.Mode() { - case paused: - if rv.IsRunning() { - log.Log(logger.Info, pkg+"pausing revid") - if err = rv.Stop(); err != nil { - log.Log(logger.Error, pkg+"failed to stop revid", "error", err.Error()) - continue - } - ns.SetMode(paused) - } - case normal: - if err = rv.Update(vars); err != nil { - return err - } - ns.SetMode(normal) - case burst: - if err = rv.Start(); err != nil { - return err - } - ns.SetMode(burst) - time.Sleep(rv.Config().BurstPeriod) - if err = rv.Stop(); err != nil { - return err - } - ns.SetMode(paused) + vars, err := ns.Vars() + if err != nil { + log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error()) + time.Sleep(netSendRetryTime) + continue + } + vs = ns.VarSum() + + if err = rv.Update(vars); err != nil { + return err + } + + switch ns.Mode() { + case paused: + case normal: + if err = rv.Start(); err != nil { + return err } + case burst: + if err = rv.Start(); err != nil { + return err + } + time.Sleep(time.Duration(rv.Config().BurstPeriod)) + if err = rv.Stop(); err != nil { + return err + } + ns.SetMode(paused, &vs) } sleepTime, _ := strconv.Atoi(ns.Param("mp")) time.Sleep(time.Duration(sleepTime) * time.Second) diff --git a/revid/revid.go b/revid/revid.go index 9f1cc12c..e4c7dbbe 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -488,7 +488,7 @@ func (r *Revid) Update(vars map[string]string) error { } } - return r.Start() + return nil } // outputClips takes the clips produced in the packClips method and outputs them From 93e3899725b8e6029c1cc80265f0ba1405eb4b7e Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 4 Feb 2019 17:04:49 +1030 Subject: [PATCH 22/34] cmd/revid-cli: using ns.Send() rather than ns.Run() to poll --- cmd/revid-cli/main.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index ac8fff12..bcabf40d 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -272,15 +272,19 @@ func run(cfg revid.Config) error { } // If mode on netreceiver isn't paused then we can start revid. - if ns.Mode() != paused { + if ns.Mode() != paused && ns.Mode() != burst { if err = rv.Start(); err != nil { return err } } + if ns.Mode() == burst { + ns.SetMode(normal, &vs) + } + for { - if err := ns.Run(); err != nil { - log.Log(logger.Error, pkg+"Run Failed. Retrying...") + if err := send(&ns, rv); err != nil { + log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error()) time.Sleep(netSendRetryTime) continue } @@ -309,10 +313,12 @@ func run(cfg revid.Config) error { return err } case burst: + log.Log(logger.Info, pkg+"Starting burst...") if err = rv.Start(); err != nil { return err } - time.Sleep(time.Duration(rv.Config().BurstPeriod)) + time.Sleep(time.Duration(rv.Config().BurstPeriod) * time.Second) + log.Log(logger.Info, pkg+"Stopping burst...") if err = rv.Stop(); err != nil { return err } From 8978f9edc59d050a1ee590edfd58f59f6cf2ace5 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 4 Feb 2019 17:12:30 +1030 Subject: [PATCH 23/34] cmd/revid-cli & revid: using goto to sleep for monitor period, and using wg.Done() at the end of output routine. --- cmd/revid-cli/main.go | 10 +++++++--- revid/revid.go | 1 + 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index bcabf40d..1eafaae9 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -253,12 +253,15 @@ func handleFlags() revid.Config { func run(cfg revid.Config) error { log.Log(logger.Info, pkg+"running in NetSender mode") + var err error + var vars map[string]string + // initialize NetSender and use NetSender's logger var ns netsender.Sender if err := ns.Init(log, nil, nil, nil); err != nil { return err } - vars, _ := ns.Vars() + vars, _ = ns.Vars() vs := ns.VarSum() rv, err := revid.New(cfg, &ns) @@ -291,10 +294,10 @@ func run(cfg revid.Config) error { // If var sum hasn't change we continue if vs == ns.VarSum() { - continue + goto sleep } - vars, err := ns.Vars() + vars, err = ns.Vars() if err != nil { log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error()) time.Sleep(netSendRetryTime) @@ -324,6 +327,7 @@ func run(cfg revid.Config) error { } ns.SetMode(paused, &vs) } + sleep: sleepTime, _ := strconv.Atoi(ns.Param("mp")) time.Sleep(time.Duration(sleepTime) * time.Second) } diff --git a/revid/revid.go b/revid/revid.go index e4c7dbbe..d0f3abf6 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -582,6 +582,7 @@ loop: r.config.Logger.Log(logger.Error, pkg+"failed to close output"+strconv.Itoa(i)+" destination", "error", err.Error()) } } + r.wg.Done() } // startRaspivid sets up things for input from raspivid i.e. starts From 1cdbfa2c66d2ccae05cf86ed9e81e8d6a4bae9c1 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 4 Feb 2019 17:14:19 +1030 Subject: [PATCH 24/34] cmd/revid-cli: setting mode to paused if ns is in burst mode. --- cmd/revid-cli/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 1eafaae9..b33ef124 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -282,7 +282,7 @@ func run(cfg revid.Config) error { } if ns.Mode() == burst { - ns.SetMode(normal, &vs) + ns.SetMode(paused, &vs) } for { From bd2958ba4e0ccc62fe8c5af4d97cf7951bfcaa59 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 4 Feb 2019 19:14:02 +1030 Subject: [PATCH 25/34] cmd/revid-cli & revid: added TODO for the use of Run() instead of send in cmd/revid-cli/main.go. Fixed filename in revid/revid.go file header. Renamed ret to cfg in revid.Config(). Catching error from call to revid.Stop() in revid.Udate() --- cmd/revid-cli/main.go | 1 + revid/revid.go | 10 ++++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index b33ef124..29191d3a 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -286,6 +286,7 @@ func run(cfg revid.Config) error { } for { + // TODO(saxon): replace this call with call to ns.Run(). if err := send(&ns, rv); err != nil { log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error()) time.Sleep(netSendRetryTime) diff --git a/revid/revid.go b/revid/revid.go index d0f3abf6..a3c10b66 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -1,6 +1,6 @@ /* NAME - r.go + revid.go DESCRIPTION See Readme.md @@ -329,9 +329,9 @@ func (r *Revid) IsRunning() bool { func (r *Revid) Config() Config { r.mu.Lock() - ret := r.config + cfg := r.config r.mu.Unlock() - return ret + return cfg } // setIsRunning sets r.isRunning using b. @@ -379,7 +379,9 @@ func (r *Revid) Stop() error { func (r *Revid) Update(vars map[string]string) error { if r.IsRunning() { - r.Stop() + if err := r.Stop(); err != nil { + return err + } } //look through the vars and update revid where needed for key, value := range vars { From 35344402b848ec34a81260002e0394a7452fd225 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 5 Feb 2019 10:32:16 +1030 Subject: [PATCH 26/34] cmd/revid-cli/main.go: not using closed scope conditions anymore --- cmd/revid-cli/main.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 29191d3a..52bb283b 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -253,14 +253,15 @@ func handleFlags() revid.Config { func run(cfg revid.Config) error { log.Log(logger.Info, pkg+"running in NetSender mode") - var err error var vars map[string]string // initialize NetSender and use NetSender's logger var ns netsender.Sender - if err := ns.Init(log, nil, nil, nil); err != nil { + err := ns.Init(log, nil, nil, nil) + if err != nil { return err } + vars, _ = ns.Vars() vs := ns.VarSum() @@ -270,7 +271,8 @@ func run(cfg revid.Config) error { } // Update revid to get latest config settings from netreceiver. - if err = rv.Update(vars); err != nil { + err = rv.Update(vars) + if err != nil { return err } @@ -287,7 +289,8 @@ func run(cfg revid.Config) error { for { // TODO(saxon): replace this call with call to ns.Run(). - if err := send(&ns, rv); err != nil { + err = send(&ns, rv) + if err != nil { log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error()) time.Sleep(netSendRetryTime) continue @@ -306,24 +309,28 @@ func run(cfg revid.Config) error { } vs = ns.VarSum() - if err = rv.Update(vars); err != nil { + err = rv.Update(vars) + if err != nil { return err } switch ns.Mode() { case paused: case normal: - if err = rv.Start(); err != nil { + err = rv.Start() + if err != nil { return err } case burst: log.Log(logger.Info, pkg+"Starting burst...") - if err = rv.Start(); err != nil { + err = rv.Start() + if err != nil { return err } time.Sleep(time.Duration(rv.Config().BurstPeriod) * time.Second) log.Log(logger.Info, pkg+"Stopping burst...") - if err = rv.Stop(); err != nil { + err = rv.Stop() + if err != nil { return err } ns.SetMode(paused, &vs) From 4dcbd904499c4b51516cba0c90f3eebeef94df53 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 5 Feb 2019 10:40:08 +1030 Subject: [PATCH 27/34] cmd/revid-cli: removed another closed scope condition --- cmd/revid-cli/main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 52bb283b..f37144a2 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -278,7 +278,8 @@ func run(cfg revid.Config) error { // If mode on netreceiver isn't paused then we can start revid. if ns.Mode() != paused && ns.Mode() != burst { - if err = rv.Start(); err != nil { + err = rv.Start() + if err != nil { return err } } From ea8572a777a90b90bb3e19564402426337b73de5 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 5 Feb 2019 10:45:15 +1030 Subject: [PATCH 28/34] cmd/revid-cli: catching error in conversion of mp --- cmd/revid-cli/main.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index f37144a2..1c2b9c90 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -337,7 +337,10 @@ func run(cfg revid.Config) error { ns.SetMode(paused, &vs) } sleep: - sleepTime, _ := strconv.Atoi(ns.Param("mp")) + sleepTime, err := strconv.Atoi(ns.Param("mp")) + if err != nil { + return err + } time.Sleep(time.Duration(sleepTime) * time.Second) } } From a4d179039b4086facc14cb5eb6b719d09bc0873c Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 5 Feb 2019 10:49:05 +1030 Subject: [PATCH 29/34] revid/revid.go: removed default case in switch with revid.Update() --- revid/revid.go | 1 - 1 file changed, 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index a3c10b66..8c4b57ed 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -486,7 +486,6 @@ func (r *Revid) Update(vars map[string]string) error { break } r.config.BurstPeriod = uint(v) - default: } } From de4f471201bc370186924b21e955884066e1c1d3 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 5 Feb 2019 10:50:21 +1030 Subject: [PATCH 30/34] revid/revid.go: defer r.wg.Done() in revid.outputClips routine --- revid/revid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index 8c4b57ed..02656d03 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -495,6 +495,7 @@ func (r *Revid) Update(vars map[string]string) error { // outputClips takes the clips produced in the packClips method and outputs them // to the desired output defined in the revid config func (r *Revid) outputClips() { + defer r.wg.Done() lastTime := time.Now() var count int loop: @@ -583,7 +584,6 @@ loop: r.config.Logger.Log(logger.Error, pkg+"failed to close output"+strconv.Itoa(i)+" destination", "error", err.Error()) } } - r.wg.Done() } // startRaspivid sets up things for input from raspivid i.e. starts From 9ef5886d669c59b4a0dc69caa82a7867b6828af0 Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 7 Feb 2019 19:58:08 +1030 Subject: [PATCH 31/34] created experimentation dir under av, and moved flac package here. created experimentation dir under av, and moved flac pkg here. experimentation/flac: removed wav file --- {stream => experimentation}/flac/decode.go | 0 {stream => experimentation}/flac/flac_test.go | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename {stream => experimentation}/flac/decode.go (100%) rename {stream => experimentation}/flac/flac_test.go (100%) diff --git a/stream/flac/decode.go b/experimentation/flac/decode.go similarity index 100% rename from stream/flac/decode.go rename to experimentation/flac/decode.go diff --git a/stream/flac/flac_test.go b/experimentation/flac/flac_test.go similarity index 100% rename from stream/flac/flac_test.go rename to experimentation/flac/flac_test.go From 700328627dcdce15c04f32c705e8533d1733103e Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 8 Feb 2019 00:14:22 +1030 Subject: [PATCH 32/34] cmd/ts-repair: added required consts and undid changes to mts pkg --- cmd/ts-repair/main.go | 43 ++++++++++++++++++++++++------------- stream/mts/encoder.go | 39 ++++++++++++++++++++++------------ stream/mts/mpegts.go | 49 +------------------------------------------ 3 files changed, 56 insertions(+), 75 deletions(-) diff --git a/cmd/ts-repair/main.go b/cmd/ts-repair/main.go index f3f73bb2..f95d16fe 100644 --- a/cmd/ts-repair/main.go +++ b/cmd/ts-repair/main.go @@ -11,6 +11,21 @@ import ( "github.com/Comcast/gots/packet" ) +const ( + PatPid = 0 + PmtPid = 4096 + VideoPid = 256 + HeadSize = 4 + DefaultAdaptationSize = 2 + AdaptationIdx = 4 + AdaptationControlIdx = 3 + AdaptationBodyIdx = AdaptationIdx + 1 + AdaptationControlMask = 0x30 + DefaultAdaptationBodySize = 1 + DiscontinuityIndicatorMask = 0x80 + DiscontinuityIndicatorIdx = AdaptationIdx + 1 +) + // Various errors that we can encounter. const ( errBadInPath = "No file path provided, or file does not exist" @@ -37,9 +52,9 @@ const ( ) var ccMap = map[int]byte{ - mts.PatPid: 16, - mts.PmtPid: 16, - mts.VideoPid: 16, + PatPid: 16, + PmtPid: 16, + VideoPid: 16, } // packetNo will keep track of the ts packet number for reference. @@ -48,8 +63,8 @@ var packetNo int // Option defines a func that performs an action on p in order to change a ts option. type Option func(p *Packet) -// Packet is a byte array of size mts.PacketSize i.e. 188 bytes. We define this -// to allow us to write receiver funcs for the [mts.PacketSize]byte type. +// Packet is a byte array of size PacketSize i.e. 188 bytes. We define this +// to allow us to write receiver funcs for the [PacketSize]byte type. type Packet [mts.PacketSize]byte // CC returns the CC of p. @@ -78,12 +93,12 @@ func (p *Packet) addAdaptationField(options ...Option) error { return errors.New(errAdaptationPresent) } // Create space for adaptation field. - copy(p[mts.HeadSize+mts.DefaultAdaptationSize:], p[mts.HeadSize:len(p)-mts.DefaultAdaptationSize]) + copy(p[HeadSize+DefaultAdaptationSize:], p[HeadSize:len(p)-DefaultAdaptationSize]) // TODO: seperate into own function // Update adaptation field control. - p[mts.AdaptationControlIdx] &= 0xff ^ mts.AdaptationControlMask - p[mts.AdaptationControlIdx] |= mts.AdaptationControlMask + p[AdaptationControlIdx] &= 0xff ^ AdaptationControlMask + p[AdaptationControlIdx] |= AdaptationControlMask // Default the adaptationfield. p.resetAdaptation() @@ -100,14 +115,14 @@ func (p *Packet) resetAdaptation() error { if !p.hasAdaptation() { return errors.New(errNoAdaptationField) } - p[mts.AdaptationIdx] = mts.DefaultAdaptationBodySize - p[mts.AdaptationBodyIdx] = 0x00 + p[AdaptationIdx] = DefaultAdaptationBodySize + p[AdaptationBodyIdx] = 0x00 return nil } // hasAdaptation returns true if p has an adaptation field and false otherwise. func (p *Packet) hasAdaptation() bool { - afc := p[mts.AdaptationControlIdx] & mts.AdaptationControlMask + afc := p[AdaptationControlIdx] & AdaptationControlMask if afc == 0x20 || afc == 0x30 { return true } else { @@ -119,12 +134,12 @@ func (p *Packet) hasAdaptation() bool { // indicator according to f. func DiscontinuityIndicator(f bool) Option { return func(p *Packet) { - set := byte(mts.DiscontinuityIndicatorMask) + set := byte(DiscontinuityIndicatorMask) if !f { set = 0x00 } - p[mts.DiscontinuityIndicatorIdx] &= 0xff ^ mts.DiscontinuityIndicatorMask - p[mts.DiscontinuityIndicatorIdx] |= mts.DiscontinuityIndicatorMask & set + p[DiscontinuityIndicatorIdx] &= 0xff ^ DiscontinuityIndicatorMask + p[DiscontinuityIndicatorIdx] |= DiscontinuityIndicatorMask & set } } diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index e49d57b5..02761b91 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -171,6 +171,14 @@ var ( pmtTable = standardPmtTimeLocation.Bytes() ) +const ( + sdtPid = 17 + patPid = 0 + pmtPid = 4096 + videoPid = 256 + streamID = 0xe0 // First video stream ID. +) + // Time related constants. const ( // ptsOffset is the offset added to the clock to determine @@ -205,13 +213,18 @@ func NewEncoder(dst io.Writer, fps float64) *Encoder { ptsOffset: ptsOffset, continuity: map[int]byte{ - PatPid: 0, - PmtPid: 0, - VideoPid: 0, + patPid: 0, + pmtPid: 0, + videoPid: 0, }, } } +const ( + hasPayload = 0x1 + hasAdaptationField = 0x2 +) + const ( hasDTS = 0x1 hasPTS = 0x2 @@ -231,7 +244,7 @@ func (e *Encoder) Encode(nalu []byte) error { // Prepare PES data. pesPkt := pes.Packet{ - StreamID: StreamID, + StreamID: streamID, PDI: hasPTS, PTS: e.pts(), Data: nalu, @@ -243,10 +256,10 @@ func (e *Encoder) Encode(nalu []byte) error { for len(buf) != 0 { pkt := Packet{ PUSI: pusi, - PID: VideoPid, + PID: videoPid, RAI: pusi, - CC: e.ccFor(VideoPid), - AFC: HasAdaptationField | HasPayload, + CC: e.ccFor(videoPid), + AFC: hasAdaptationField | hasPayload, PCRF: pusi, } n := pkt.FillPayload(buf) @@ -275,9 +288,9 @@ func (e *Encoder) writePSI() error { // Write PAT. patPkt := Packet{ PUSI: true, - PID: PatPid, - CC: e.ccFor(PatPid), - AFC: HasPayload, + PID: patPid, + CC: e.ccFor(patPid), + AFC: hasPayload, Payload: patTable, } _, err := e.dst.Write(patPkt.Bytes(e.tsSpace[:PacketSize])) @@ -298,9 +311,9 @@ func (e *Encoder) writePSI() error { // Create mts packet from pmt table. pmtPkt := Packet{ PUSI: true, - PID: PmtPid, - CC: e.ccFor(PmtPid), - AFC: HasPayload, + PID: pmtPid, + CC: e.ccFor(pmtPid), + AFC: hasPayload, Payload: pmtTable, } _, err = e.dst.Write(pmtPkt.Bytes(e.tsSpace[:PacketSize])) diff --git a/stream/mts/mpegts.go b/stream/mts/mpegts.go index 71849513..0bef80d2 100644 --- a/stream/mts/mpegts.go +++ b/stream/mts/mpegts.go @@ -37,53 +37,6 @@ const ( PayloadSize = 176 ) -const ( - SdtPid = 17 - PatPid = 0 - PmtPid = 4096 - VideoPid = 256 - StreamID = 0xe0 // First video stream ID. - HeadSize = 4 - DefaultAdaptationSize = 2 -) - -const ( - AdaptationIdx = 4 - AdaptationControlIdx = 3 - AdaptationBodyIdx = AdaptationIdx + 1 - AdaptationControlMask = 0x30 - DefaultAdaptationBodySize = 1 -) - -const ( - HasPayload = 0x1 - HasAdaptationField = 0x2 -) - -// Adaptation field body masks. -const ( - DiscontinuityIndicatorMask = 0x80 - RandomAccessIndicatorMask = 0x40 - ElementaryStreamPriorityIndicatorMask = 0x20 - ProgramClockReferenceFlagMask = 0x10 - OriginalProgramClockReferenceFlagMask = 0x08 - SplicingPointFlagMask = 0x04 - TransportPrivateDataFlagMask = 0x02 - AdaptationFieldExtensionMask = 0x01 -) - -// Adaptation field body indexes. -const ( - DiscontinuityIndicatorIdx = AdaptationIdx + 1 - RandomAccessIndicatorIdx = AdaptationIdx + 1 - ElementaryStreamPriorityIndicatorIdx = AdaptationIdx + 1 - ProgramClockReferenceFlagIdx = AdaptationIdx + 1 - OriginalProgramClockReferenceFlagIdx = AdaptationIdx + 1 - SplicingPointFlagIdx = AdaptationIdx + 1 - TransportPrivateDataFlagIdx = AdaptationIdx + 1 - AdaptationFieldExtensionFlagIdx = AdaptationIdx + 1 -) - /* The below data struct encapsulates the fields of an MPEG-TS packet. Below is the formatting of an MPEG-TS packet for reference! @@ -182,7 +135,7 @@ func FindPMT(d []byte) (p []byte, i int, err error) { } for i = 0; i < len(d); i += PacketSize { pid := (uint16(d[i+1]&0x1f) << 8) | uint16(d[i+2]) - if pid == PmtPid { + if pid == pmtPid { p = d[i+4 : i+PacketSize] return } From 7d85b78b6d5b546e0defc7952654990fb71b2fcc Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 8 Feb 2019 00:16:19 +1030 Subject: [PATCH 33/34] moved cmd/ts-repair to experimentation --- {cmd => experimentation}/ts-repair/main.go | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename {cmd => experimentation}/ts-repair/main.go (100%) diff --git a/cmd/ts-repair/main.go b/experimentation/ts-repair/main.go similarity index 100% rename from cmd/ts-repair/main.go rename to experimentation/ts-repair/main.go From b2150cf8ded9d84ed051092d13bfd91f9cbdefd9 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 8 Feb 2019 00:25:47 +1030 Subject: [PATCH 34/34] experimentation/ts-repair: added description to file header --- experimentation/ts-repair/main.go | 33 +++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/experimentation/ts-repair/main.go b/experimentation/ts-repair/main.go index f95d16fe..bed81f19 100644 --- a/experimentation/ts-repair/main.go +++ b/experimentation/ts-repair/main.go @@ -1,3 +1,36 @@ +/* +NAME + ts-repair/main.go + +DESCRIPTION + This program attempts to repair mpegts discontinuities using one of two methods + as selected by the mode flag. Setting the mode flag to 0 will result in repair + by shifting all CCs such that they are continuous. Setting the mode flag to 1 + will result in repair through setting the discontinuity indicator to true at + packets where a discontinuity exists. + + Specify the input file with the in flag, and the output file with out flag. + +AUTHOR + Saxon A. Nelson-Milton + +LICENSE + mpegts.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + package main import (