revid,parse: wire parse.H264 to revid

Through experimentation I have realised the previous ordering of NALU
types does not work with YouTube. I have also exhaustively checked the
requirement for each of the 3 NALU types that are checked and all are
needed. Finally, I have checked whether 7 is needed because of its
inclusion in a parallel branch; checking for this type prevents this
code from working.
This commit is contained in:
Dan Kortschak 2018-06-28 18:24:23 +09:30
parent 353d2cfcc7
commit db6ca2922a
3 changed files with 228 additions and 131 deletions

View File

@ -44,9 +44,9 @@ func init() {
var h264Prefix = [...]byte{0x00, 0x00, 0x01, 0x09, 0xf0} var h264Prefix = [...]byte{0x00, 0x00, 0x01, 0x09, 0xf0}
// H264 parses H.264 NAL units read from src into separate writes to dst with // H264 lexes H.264 NAL units read from src into separate writes to dst with
// successive writes being performed not earlier than the specified delay. // successive writes being performed not earlier than the specified delay.
// NAL units are split at type 1 (Coded slice of a non-IDR picture), 5 // NAL units are split after type 1 (Coded slice of a non-IDR picture), 5
// (Coded slice of a IDR picture) and 8 (Picture parameter set). // (Coded slice of a IDR picture) and 8 (Picture parameter set).
func H264(dst io.Writer, src io.Reader, delay time.Duration) error { func H264(dst io.Writer, src io.Reader, delay time.Duration) error {
var tick <-chan time.Time var tick <-chan time.Time
@ -58,35 +58,62 @@ func H264(dst io.Writer, src io.Reader, delay time.Duration) error {
tick = ticker.C tick = ticker.C
} }
r := bufio.NewReader(src) const bufSize = 8 << 10
buf := make([]byte, len(h264Prefix), 4<<10) c := newScanner(src, make([]byte, 4<<10)) // Standard file buffer size.
buf := make([]byte, len(h264Prefix), bufSize)
copy(buf, h264Prefix[:]) copy(buf, h264Prefix[:])
var zeroes int writeOut := false
outer:
for { for {
b, err := r.ReadByte() var b byte
var err error
buf, b, err = c.scanUntilZeroInto(buf)
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
return err return err
} }
if len(buf) == len(h264Prefix) { break
return nil
} }
<-tick
_, err = dst.Write(buf) for n := 1; b == 0x0 && n < 4; n++ {
return err b, err = c.readByte()
}
if b == 0 {
zeroes++
}
if (zeroes == 2 || zeroes == 3) && b == 1 && len(buf)-len(h264Prefix) > zeroes {
b, err = r.ReadByte()
if err != nil { if err != nil {
if err == io.EOF { if err != io.EOF {
err = io.ErrUnexpectedEOF
}
return err return err
} }
break outer
}
buf = append(buf, b)
if b != 0x1 || (n != 2 && n != 3) {
continue
}
if writeOut {
<-tick
_, err := dst.Write(buf[:len(buf)-(n+1)])
if err != nil {
return err
}
buf = make([]byte, len(h264Prefix)+n, bufSize)
copy(buf, h264Prefix[:])
buf = append(buf, 1)
writeOut = false
}
b, err = c.readByte()
if err != nil {
if err != io.EOF {
return err
}
break outer
}
buf = append(buf, b)
// http://www.itu.int/rec/dologin_pub.asp?lang=e&id=T-REC-H.264-200305-S!!PDF-E&type=items
// Table 7-1 NAL unit type codes
const ( const (
nonIdrPic = 1 nonIdrPic = 1
idrPic = 5 idrPic = 5
@ -94,23 +121,83 @@ func H264(dst io.Writer, src io.Reader, delay time.Duration) error {
) )
switch nalTyp := b & 0x1f; nalTyp { switch nalTyp := b & 0x1f; nalTyp {
case nonIdrPic, idrPic, paramSet: case nonIdrPic, idrPic, paramSet:
writeOut = true
}
}
}
if len(buf) == len(h264Prefix) {
return nil
}
<-tick <-tick
_, err = dst.Write(buf[:len(buf)-zeroes]) _, err := dst.Write(buf)
if err != nil {
return err return err
} }
buf = make([]byte, zeroes+len(h264Prefix), 4<<10)
copy(buf, h264Prefix[:]) // scanner is a byte scanner.
zeroes = 0 type scanner struct {
} buf []byte
buf = append(buf, 1, b) off int
// r is the source of data for the scanner.
r io.Reader
}
// newScanner returns a scanner initialised with an io.Reader and a read buffer.
func newScanner(r io.Reader, buf []byte) *scanner {
return &scanner{r: r, buf: buf[:0]}
}
// scanUntilZeroInto scans the scanner's underlying io.Reader until a zero byte
// has been read, appending all read bytes to dst. The resulting appended data,
// the last read byte and whether the last read byte was zero are returned.
func (c *scanner) scanUntilZeroInto(dst []byte) (res []byte, b byte, err error) {
outer:
for {
var i int
for i, b = range c.buf[c.off:] {
if b != 0x0 {
continue continue
} }
if b != 0 { dst = append(dst, c.buf[c.off:c.off+i+1]...)
zeroes = 0 c.off += i + 1
break outer
} }
buf = append(buf, b) dst = append(dst, c.buf[c.off:]...)
err = c.reload()
if err != nil {
break
} }
}
return dst, b, err
}
// readByte is an unexported ReadByte.
func (c *scanner) readByte() (byte, error) {
if c.off >= len(c.buf) {
err := c.reload()
if err != nil {
return 0, err
}
}
b := c.buf[c.off]
c.off++
return b, nil
}
// reload re-fills the scanner's buffer.
func (c *scanner) reload() error {
n, err := c.r.Read(c.buf[:cap(c.buf)])
c.buf = c.buf[:n]
if err != nil {
if err != io.EOF {
return err
}
if n == 0 {
return io.EOF
}
}
c.off = 0
return nil
} }
// MJPEG parses MJPEG frames read from src into separate writes to dst with // MJPEG parses MJPEG frames read from src into separate writes to dst with

View File

@ -47,15 +47,15 @@ var h264Tests = []struct {
}, },
{ {
name: "null short", name: "null short",
input: []byte{0x00, 0x00, 0x01}, input: []byte{0x00, 0x00, 0x01, 0x0},
delay: 0, delay: 0,
want: [][]byte{{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01}}, want: [][]byte{{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x0}},
}, },
{ {
name: "null short delayed", name: "null short delayed",
input: []byte{0x00, 0x00, 0x01}, input: []byte{0x00, 0x00, 0x01, 0x0},
delay: time.Millisecond, delay: time.Millisecond,
want: [][]byte{{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01}}, want: [][]byte{{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x0}},
}, },
{ {
name: "full short type 1", name: "full short type 1",
@ -69,9 +69,9 @@ var h264Tests = []struct {
delay: 0, delay: 0,
want: [][]byte{ want: [][]byte{
{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l', {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l',
0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e'}, 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e',
{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x41, 'w', 'i', 't', 'h', 0x00, 0x00, 0x01, 0x41, 'w', 'i', 't', 'h'},
0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h',
0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'}, 0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'},
}, },
}, },
@ -87,9 +87,9 @@ var h264Tests = []struct {
delay: 0, delay: 0,
want: [][]byte{ want: [][]byte{
{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l', {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l',
0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e'}, 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e',
{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x45, 'w', 'i', 't', 'h', 0x00, 0x00, 0x01, 0x45, 'w', 'i', 't', 'h'},
0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h',
0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'}, 0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'},
}, },
}, },
@ -105,9 +105,9 @@ var h264Tests = []struct {
delay: 0, delay: 0,
want: [][]byte{ want: [][]byte{
{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l', {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l',
0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e'}, 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e',
{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x48, 'w', 'i', 't', 'h', 0x00, 0x00, 0x01, 0x48, 'w', 'i', 't', 'h'},
0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h',
0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'}, 0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'},
}, },
}, },
@ -123,24 +123,12 @@ var h264Tests = []struct {
delay: time.Millisecond, delay: time.Millisecond,
want: [][]byte{ want: [][]byte{
{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l', {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l',
0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e'}, 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e',
{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x41, 'w', 'i', 't', 'h', 0x00, 0x00, 0x01, 0x41, 'w', 'i', 't', 'h'},
0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h',
0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'}, 0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'},
}, },
}, },
{
name: "null long",
input: []byte{0x00, 0x00, 0x00, 0x01},
delay: 0,
want: [][]byte{{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01}},
},
{
name: "null long delayed",
input: []byte{0x00, 0x00, 0x00, 0x01},
delay: time.Millisecond,
want: [][]byte{{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01}},
},
{ {
name: "full long type 1", name: "full long type 1",
input: []byte{ input: []byte{
@ -153,9 +141,9 @@ var h264Tests = []struct {
delay: 0, delay: 0,
want: [][]byte{ want: [][]byte{
{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l', {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l',
0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e'}, 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e',
{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x41, 'w', 'i', 't', 'h', 0x00, 0x00, 0x00, 0x01, 0x41, 'w', 'i', 't', 'h'},
0x00, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h',
0x00, 0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'}, 0x00, 0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'},
}, },
}, },
@ -171,9 +159,9 @@ var h264Tests = []struct {
delay: 0, delay: 0,
want: [][]byte{ want: [][]byte{
{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l', {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l',
0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e'}, 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e',
{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x45, 'w', 'i', 't', 'h', 0x00, 0x00, 0x00, 0x01, 0x45, 'w', 'i', 't', 'h'},
0x00, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h',
0x00, 0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'}, 0x00, 0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'},
}, },
}, },
@ -189,9 +177,9 @@ var h264Tests = []struct {
delay: 0, delay: 0,
want: [][]byte{ want: [][]byte{
{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l', {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l',
0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e'}, 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e',
{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x48, 'w', 'i', 't', 'h', 0x00, 0x00, 0x00, 0x01, 0x48, 'w', 'i', 't', 'h'},
0x00, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h',
0x00, 0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'}, 0x00, 0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'},
}, },
}, },
@ -207,9 +195,9 @@ var h264Tests = []struct {
delay: time.Millisecond, delay: time.Millisecond,
want: [][]byte{ want: [][]byte{
{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l', {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l',
0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e'}, 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e',
{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x41, 'w', 'i', 't', 'h', 0x00, 0x00, 0x00, 0x01, 0x41, 'w', 'i', 't', 'h'},
0x00, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h',
0x00, 0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'}, 0x00, 0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'},
}, },
}, },
@ -315,3 +303,42 @@ func (w *chunkWriter) Write(b []byte) (int, error) {
*w = append(*w, b) *w = append(*w, b)
return len(b), nil return len(b), nil
} }
func TestScannerReadByte(t *testing.T) {
data := []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.")
for _, size := range []int{1, 2, 8, 1 << 10} {
r := newScanner(bytes.NewReader(data), make([]byte, size))
var got []byte
for {
b, err := r.readByte()
if err != nil {
break
}
got = append(got, b)
}
if !bytes.Equal(got, data) {
t.Errorf("unexpected result for buffer size %d:\ngot :%q\nwant:%q", size, got, data)
}
}
}
func TestScannerScanUntilZero(t *testing.T) {
data := []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit,\x00 sed do eiusmod tempor incididunt ut \x00labore et dolore magna aliqua.")
for _, size := range []int{1, 2, 8, 1 << 10} {
r := newScanner(bytes.NewReader(data), make([]byte, size))
var got [][]byte
for {
buf, _, err := r.scanUntilZeroInto(nil)
got = append(got, buf)
if err != nil {
break
}
}
want := bytes.SplitAfter(data, []byte{0})
if !reflect.DeepEqual(got, want) {
t.Errorf("unexpected result for buffer zie %d:\ngot :%q\nwant:%q", size, got, want)
}
}
}

View File

@ -30,7 +30,6 @@ LICENSE
package revid package revid
import ( import (
"bufio"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -40,7 +39,7 @@ import (
"time" "time"
"bitbucket.org/ausocean/av/generator" "bitbucket.org/ausocean/av/generator"
"bitbucket.org/ausocean/av/parser" "bitbucket.org/ausocean/av/parse"
"bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/av/rtmp"
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/ring" "bitbucket.org/ausocean/utils/ring"
@ -89,9 +88,9 @@ type Revid struct {
config Config config Config
isRunning bool isRunning bool
generator generator.Generator generator generator.Generator
parser parser.Parser parse func(dst io.Writer, src io.Reader, delay time.Duration) error
cmd *exec.Cmd cmd *exec.Cmd
inputReader *bufio.Reader inputReader io.ReadCloser
ffmpegStdin io.WriteCloser ffmpegStdin io.WriteCloser
outputChan chan []byte outputChan chan []byte
setupInput func() error setupInput func() error
@ -180,16 +179,31 @@ func (r *Revid) reset(config Config) error {
switch r.config.InputCodec { switch r.config.InputCodec {
case H264: case H264:
r.Log(Info, "Using H264 parser!") r.Log(Info, "Using H264 parser!")
r.parser = parser.NewH264Parser() r.parse = parse.H264
case Mjpeg: case Mjpeg:
r.Log(Info, "Using MJPEG parser!") r.Log(Info, "Using MJPEG parser!")
r.parser = parser.NewMJPEGParser(mjpegParserInChanLen) r.parse = parse.MJPEG
} }
switch r.config.Packetization { switch r.config.Packetization {
case None: case None:
// no packetisation - Revid output chan grabs raw data straight from parser // no packetisation - Revid output chan grabs raw data straight from parser
r.parser.SetOutputChan(r.outputChan) r.parse = func(dst io.Writer, src io.Reader, _ time.Duration) error {
// FIXME(kortschak): Reduce this allocation mess. It exists
// because we do not know that the dst will not modify the
// buffer. It shouldn't, but ...
for {
var b [4 << 10]byte
n, rerr := src.Read(b[:])
_, werr := dst.Write(b[:n])
if rerr != nil {
return rerr
}
if werr != nil {
return werr
}
}
}
r.getFrame = r.getFrameNoPacketization r.getFrame = r.getFrameNoPacketization
return nil return nil
case Mpegts: case Mpegts:
@ -204,7 +218,6 @@ func (r *Revid) reset(config Config) error {
// We have packetization of some sort, so we want to send data to Generator // We have packetization of some sort, so we want to send data to Generator
// to perform packetization // to perform packetization
r.getFrame = r.getFramePacketization r.getFrame = r.getFramePacketization
r.parser.SetOutputChan(r.generator.InputChan())
return nil return nil
} }
@ -244,8 +257,6 @@ func (r *Revid) Start() {
go r.packClips() go r.packClips()
r.Log(Info, "Starting packetisation generator") r.Log(Info, "Starting packetisation generator")
r.generator.Start() r.generator.Start()
r.Log(Info, "Starting parser")
r.parser.Start()
r.Log(Info, "Setting up input and receiving content") r.Log(Info, "Setting up input and receiving content")
go r.setupInput() go r.setupInput()
} }
@ -265,11 +276,6 @@ func (r *Revid) Stop() {
r.generator.Stop() r.generator.Stop()
} }
r.Log(Info, "Stopping parser!")
if r.parser != nil {
r.parser.Stop()
}
r.Log(Info, "Killing input proccess!") r.Log(Info, "Killing input proccess!")
// If a cmd process is running, we kill! // If a cmd process is running, we kill!
if r.cmd != nil && r.cmd.Process != nil { if r.cmd != nil && r.cmd.Process != nil {
@ -466,28 +472,13 @@ func (r *Revid) startRaspivid() error {
if err != nil { if err != nil {
return err return err
} }
r.inputReader = bufio.NewReader(stdout) r.inputReader = stdout
go r.readCamera() go func() {
return nil
}
// readCamera reads data from the defined camera while the Revid is running.
// TODO: use ringbuffer here instead of allocating mem every time!
func (r *Revid) readCamera() {
r.Log(Info, "Reading camera data!") r.Log(Info, "Reading camera data!")
for r.isRunning { r.parse(chunkWriter(r.generator.InputChan()), r.inputReader, 0)
var data [1]byte
_, err := io.ReadFull(r.inputReader, data[:])
switch {
// We know this means we're getting nothing from the cam
case err != nil && r.isRunning:
r.Log(Error, "No data from camera!")
time.Sleep(cameraRetryPeriod)
default:
r.parser.InputChan() <- data[0]
}
}
r.Log(Info, "Not trying to read from camera anymore!") r.Log(Info, "Not trying to read from camera anymore!")
}()
return nil
} }
// setupInputForFile sets things up for getting input from a file // setupInputForFile sets things up for getting input from a file
@ -496,12 +487,8 @@ func (r *Revid) setupInputForFile() error {
if err != nil { if err != nil {
return err return err
} }
r.parser.SetDelay(uint(float64(1000) / float64(fps))) delay := time.Second / time.Duration(fps)
return r.readFile()
}
// readFile reads data from the defined file while the Revid is running.
func (r *Revid) readFile() error {
f, err := os.Open(r.config.InputFileName) f, err := os.Open(r.config.InputFileName)
if err != nil { if err != nil {
r.Log(Error, err.Error()) r.Log(Error, err.Error())
@ -509,20 +496,16 @@ func (r *Revid) readFile() error {
return err return err
} }
defer f.Close() defer f.Close()
var buf [1 << 12]byte
for { // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop.
n, err := f.Read(buf[:]) return r.parse(chunkWriter(r.generator.InputChan()), f, delay)
for _, b := range buf[:n] { }
r.parser.InputChan() <- b
} // chunkWriter is a shim between the new function-based approach
if err != nil { // and the old flow-based approach.
if err == io.EOF { type chunkWriter chan []byte
break
} func (w chunkWriter) Write(b []byte) (int, error) {
r.Log(Error, err.Error()) w <- b
r.Stop() return len(b), nil
return err
}
}
return nil
} }