diff --git a/parse/parse.go b/parse/parse.go index bcad48d5..924a607d 100644 --- a/parse/parse.go +++ b/parse/parse.go @@ -44,9 +44,9 @@ func init() { var h264Prefix = [...]byte{0x00, 0x00, 0x01, 0x09, 0xf0} -// H264 parses H.264 NAL units read from src into separate writes to dst with +// H264 lexes H.264 NAL units read from src into separate writes to dst with // successive writes being performed not earlier than the specified delay. -// NAL units are split at type 1 (Coded slice of a non-IDR picture), 5 +// NAL units are split after type 1 (Coded slice of a non-IDR picture), 5 // (Coded slice of a IDR picture) and 8 (Picture parameter set). func H264(dst io.Writer, src io.Reader, delay time.Duration) error { var tick <-chan time.Time @@ -58,35 +58,62 @@ func H264(dst io.Writer, src io.Reader, delay time.Duration) error { tick = ticker.C } - r := bufio.NewReader(src) + const bufSize = 8 << 10 - buf := make([]byte, len(h264Prefix), 4<<10) + c := newScanner(src, make([]byte, 4<<10)) // Standard file buffer size. + + buf := make([]byte, len(h264Prefix), bufSize) copy(buf, h264Prefix[:]) - var zeroes int + writeOut := false +outer: for { - b, err := r.ReadByte() + var b byte + var err error + buf, b, err = c.scanUntilZeroInto(buf) if err != nil { if err != io.EOF { return err } - if len(buf) == len(h264Prefix) { - return nil - } - <-tick - _, err = dst.Write(buf) - return err + break } - if b == 0 { - zeroes++ - } - if (zeroes == 2 || zeroes == 3) && b == 1 && len(buf)-len(h264Prefix) > zeroes { - b, err = r.ReadByte() + + for n := 1; b == 0x0 && n < 4; n++ { + b, err = c.readByte() if err != nil { - if err == io.EOF { - err = io.ErrUnexpectedEOF + if err != io.EOF { + return err } - return err + break outer } + buf = append(buf, b) + + if b != 0x1 || (n != 2 && n != 3) { + continue + } + + if writeOut { + <-tick + _, err := dst.Write(buf[:len(buf)-(n+1)]) + if err != nil { + return err + } + buf = make([]byte, len(h264Prefix)+n, bufSize) + copy(buf, h264Prefix[:]) + buf = append(buf, 1) + writeOut = false + } + + b, err = c.readByte() + if err != nil { + if err != io.EOF { + return err + } + break outer + } + buf = append(buf, b) + + // http://www.itu.int/rec/dologin_pub.asp?lang=e&id=T-REC-H.264-200305-S!!PDF-E&type=items + // Table 7-1 NAL unit type codes const ( nonIdrPic = 1 idrPic = 5 @@ -94,23 +121,83 @@ func H264(dst io.Writer, src io.Reader, delay time.Duration) error { ) switch nalTyp := b & 0x1f; nalTyp { case nonIdrPic, idrPic, paramSet: - <-tick - _, err = dst.Write(buf[:len(buf)-zeroes]) - if err != nil { - return err - } - buf = make([]byte, zeroes+len(h264Prefix), 4<<10) - copy(buf, h264Prefix[:]) - zeroes = 0 + writeOut = true } - buf = append(buf, 1, b) - continue } - if b != 0 { - zeroes = 0 - } - buf = append(buf, b) } + if len(buf) == len(h264Prefix) { + return nil + } + <-tick + _, err := dst.Write(buf) + return err +} + +// scanner is a byte scanner. +type scanner struct { + buf []byte + off int + + // r is the source of data for the scanner. + r io.Reader +} + +// newScanner returns a scanner initialised with an io.Reader and a read buffer. +func newScanner(r io.Reader, buf []byte) *scanner { + return &scanner{r: r, buf: buf[:0]} +} + +// scanUntilZeroInto scans the scanner's underlying io.Reader until a zero byte +// has been read, appending all read bytes to dst. The resulting appended data, +// the last read byte and whether the last read byte was zero are returned. +func (c *scanner) scanUntilZeroInto(dst []byte) (res []byte, b byte, err error) { +outer: + for { + var i int + for i, b = range c.buf[c.off:] { + if b != 0x0 { + continue + } + dst = append(dst, c.buf[c.off:c.off+i+1]...) + c.off += i + 1 + break outer + } + dst = append(dst, c.buf[c.off:]...) + err = c.reload() + if err != nil { + break + } + } + return dst, b, err +} + +// readByte is an unexported ReadByte. +func (c *scanner) readByte() (byte, error) { + if c.off >= len(c.buf) { + err := c.reload() + if err != nil { + return 0, err + } + } + b := c.buf[c.off] + c.off++ + return b, nil +} + +// reload re-fills the scanner's buffer. +func (c *scanner) reload() error { + n, err := c.r.Read(c.buf[:cap(c.buf)]) + c.buf = c.buf[:n] + if err != nil { + if err != io.EOF { + return err + } + if n == 0 { + return io.EOF + } + } + c.off = 0 + return nil } // MJPEG parses MJPEG frames read from src into separate writes to dst with diff --git a/parse/parse_test.go b/parse/parse_test.go index 2b635eaa..1440c7d9 100644 --- a/parse/parse_test.go +++ b/parse/parse_test.go @@ -47,15 +47,15 @@ var h264Tests = []struct { }, { name: "null short", - input: []byte{0x00, 0x00, 0x01}, + input: []byte{0x00, 0x00, 0x01, 0x0}, delay: 0, - want: [][]byte{{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01}}, + want: [][]byte{{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x0}}, }, { name: "null short delayed", - input: []byte{0x00, 0x00, 0x01}, + input: []byte{0x00, 0x00, 0x01, 0x0}, delay: time.Millisecond, - want: [][]byte{{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01}}, + want: [][]byte{{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x0}}, }, { name: "full short type 1", @@ -69,9 +69,9 @@ var h264Tests = []struct { delay: 0, want: [][]byte{ {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l', - 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e'}, - {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x41, 'w', 'i', 't', 'h', - 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', + 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e', + 0x00, 0x00, 0x01, 0x41, 'w', 'i', 't', 'h'}, + {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', 0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'}, }, }, @@ -87,9 +87,9 @@ var h264Tests = []struct { delay: 0, want: [][]byte{ {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l', - 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e'}, - {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x45, 'w', 'i', 't', 'h', - 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', + 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e', + 0x00, 0x00, 0x01, 0x45, 'w', 'i', 't', 'h'}, + {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', 0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'}, }, }, @@ -105,9 +105,9 @@ var h264Tests = []struct { delay: 0, want: [][]byte{ {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l', - 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e'}, - {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x48, 'w', 'i', 't', 'h', - 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', + 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e', + 0x00, 0x00, 0x01, 0x48, 'w', 'i', 't', 'h'}, + {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', 0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'}, }, }, @@ -123,24 +123,12 @@ var h264Tests = []struct { delay: time.Millisecond, want: [][]byte{ {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l', - 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e'}, - {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x41, 'w', 'i', 't', 'h', - 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', + 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e', + 0x00, 0x00, 0x01, 0x41, 'w', 'i', 't', 'h'}, + {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', 0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'}, }, }, - { - name: "null long", - input: []byte{0x00, 0x00, 0x00, 0x01}, - delay: 0, - want: [][]byte{{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01}}, - }, - { - name: "null long delayed", - input: []byte{0x00, 0x00, 0x00, 0x01}, - delay: time.Millisecond, - want: [][]byte{{0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01}}, - }, { name: "full long type 1", input: []byte{ @@ -153,9 +141,9 @@ var h264Tests = []struct { delay: 0, want: [][]byte{ {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l', - 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e'}, - {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x41, 'w', 'i', 't', 'h', - 0x00, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', + 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e', + 0x00, 0x00, 0x00, 0x01, 0x41, 'w', 'i', 't', 'h'}, + {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', 0x00, 0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'}, }, }, @@ -171,9 +159,9 @@ var h264Tests = []struct { delay: 0, want: [][]byte{ {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l', - 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e'}, - {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x45, 'w', 'i', 't', 'h', - 0x00, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', + 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e', + 0x00, 0x00, 0x00, 0x01, 0x45, 'w', 'i', 't', 'h'}, + {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', 0x00, 0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'}, }, }, @@ -189,9 +177,9 @@ var h264Tests = []struct { delay: 0, want: [][]byte{ {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l', - 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e'}, - {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x48, 'w', 'i', 't', 'h', - 0x00, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', + 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e', + 0x00, 0x00, 0x00, 0x01, 0x48, 'w', 'i', 't', 'h'}, + {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', 0x00, 0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'}, }, }, @@ -207,9 +195,9 @@ var h264Tests = []struct { delay: time.Millisecond, want: [][]byte{ {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'u', 'l', 'l', - 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e'}, - {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x41, 'w', 'i', 't', 'h', - 0x00, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', + 0x00, 0x00, 0x00, 0x01, 0x00, 'f', 'r', 'a', 'm', 'e', + 0x00, 0x00, 0x00, 0x01, 0x41, 'w', 'i', 't', 'h'}, + {0x0, 0x0, 0x1, 0x9, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x00, 'l', 'e', 'n', 'g', 't', 'h', 0x00, 0x00, 0x00, 0x01, 0x00, 's', 'p', 'r', 'e', 'a', 'd'}, }, }, @@ -315,3 +303,42 @@ func (w *chunkWriter) Write(b []byte) (int, error) { *w = append(*w, b) return len(b), nil } + +func TestScannerReadByte(t *testing.T) { + data := []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.") + + for _, size := range []int{1, 2, 8, 1 << 10} { + r := newScanner(bytes.NewReader(data), make([]byte, size)) + var got []byte + for { + b, err := r.readByte() + if err != nil { + break + } + got = append(got, b) + } + if !bytes.Equal(got, data) { + t.Errorf("unexpected result for buffer size %d:\ngot :%q\nwant:%q", size, got, data) + } + } +} + +func TestScannerScanUntilZero(t *testing.T) { + data := []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit,\x00 sed do eiusmod tempor incididunt ut \x00labore et dolore magna aliqua.") + + for _, size := range []int{1, 2, 8, 1 << 10} { + r := newScanner(bytes.NewReader(data), make([]byte, size)) + var got [][]byte + for { + buf, _, err := r.scanUntilZeroInto(nil) + got = append(got, buf) + if err != nil { + break + } + } + want := bytes.SplitAfter(data, []byte{0}) + if !reflect.DeepEqual(got, want) { + t.Errorf("unexpected result for buffer zie %d:\ngot :%q\nwant:%q", size, got, want) + } + } +} diff --git a/revid/revid.go b/revid/revid.go index 8ae5388b..57f55fb7 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -30,7 +30,6 @@ LICENSE package revid import ( - "bufio" "errors" "fmt" "io" @@ -40,7 +39,7 @@ import ( "time" "bitbucket.org/ausocean/av/generator" - "bitbucket.org/ausocean/av/parser" + "bitbucket.org/ausocean/av/parse" "bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/ring" @@ -89,9 +88,9 @@ type Revid struct { config Config isRunning bool generator generator.Generator - parser parser.Parser + parse func(dst io.Writer, src io.Reader, delay time.Duration) error cmd *exec.Cmd - inputReader *bufio.Reader + inputReader io.ReadCloser ffmpegStdin io.WriteCloser outputChan chan []byte setupInput func() error @@ -180,16 +179,31 @@ func (r *Revid) reset(config Config) error { switch r.config.InputCodec { case H264: r.Log(Info, "Using H264 parser!") - r.parser = parser.NewH264Parser() + r.parse = parse.H264 case Mjpeg: r.Log(Info, "Using MJPEG parser!") - r.parser = parser.NewMJPEGParser(mjpegParserInChanLen) + r.parse = parse.MJPEG } switch r.config.Packetization { case None: // no packetisation - Revid output chan grabs raw data straight from parser - r.parser.SetOutputChan(r.outputChan) + r.parse = func(dst io.Writer, src io.Reader, _ time.Duration) error { + // FIXME(kortschak): Reduce this allocation mess. It exists + // because we do not know that the dst will not modify the + // buffer. It shouldn't, but ... + for { + var b [4 << 10]byte + n, rerr := src.Read(b[:]) + _, werr := dst.Write(b[:n]) + if rerr != nil { + return rerr + } + if werr != nil { + return werr + } + } + } r.getFrame = r.getFrameNoPacketization return nil case Mpegts: @@ -204,7 +218,6 @@ func (r *Revid) reset(config Config) error { // We have packetization of some sort, so we want to send data to Generator // to perform packetization r.getFrame = r.getFramePacketization - r.parser.SetOutputChan(r.generator.InputChan()) return nil } @@ -244,8 +257,6 @@ func (r *Revid) Start() { go r.packClips() r.Log(Info, "Starting packetisation generator") r.generator.Start() - r.Log(Info, "Starting parser") - r.parser.Start() r.Log(Info, "Setting up input and receiving content") go r.setupInput() } @@ -265,11 +276,6 @@ func (r *Revid) Stop() { r.generator.Stop() } - r.Log(Info, "Stopping parser!") - if r.parser != nil { - r.parser.Stop() - } - r.Log(Info, "Killing input proccess!") // If a cmd process is running, we kill! if r.cmd != nil && r.cmd.Process != nil { @@ -466,42 +472,23 @@ func (r *Revid) startRaspivid() error { if err != nil { return err } - r.inputReader = bufio.NewReader(stdout) - go r.readCamera() + r.inputReader = stdout + go func() { + r.Log(Info, "Reading camera data!") + r.parse(chunkWriter(r.generator.InputChan()), r.inputReader, 0) + r.Log(Info, "Not trying to read from camera anymore!") + }() return nil } -// readCamera reads data from the defined camera while the Revid is running. -// TODO: use ringbuffer here instead of allocating mem every time! -func (r *Revid) readCamera() { - r.Log(Info, "Reading camera data!") - for r.isRunning { - var data [1]byte - _, err := io.ReadFull(r.inputReader, data[:]) - switch { - // We know this means we're getting nothing from the cam - case err != nil && r.isRunning: - r.Log(Error, "No data from camera!") - time.Sleep(cameraRetryPeriod) - default: - r.parser.InputChan() <- data[0] - } - } - r.Log(Info, "Not trying to read from camera anymore!") -} - // setupInputForFile sets things up for getting input from a file func (r *Revid) setupInputForFile() error { fps, err := strconv.Atoi(r.config.FrameRate) if err != nil { return err } - r.parser.SetDelay(uint(float64(1000) / float64(fps))) - return r.readFile() -} + delay := time.Second / time.Duration(fps) -// readFile reads data from the defined file while the Revid is running. -func (r *Revid) readFile() error { f, err := os.Open(r.config.InputFileName) if err != nil { r.Log(Error, err.Error()) @@ -509,20 +496,16 @@ func (r *Revid) readFile() error { return err } defer f.Close() - var buf [1 << 12]byte - for { - n, err := f.Read(buf[:]) - for _, b := range buf[:n] { - r.parser.InputChan() <- b - } - if err != nil { - if err == io.EOF { - break - } - r.Log(Error, err.Error()) - r.Stop() - return err - } - } - return nil + + // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. + return r.parse(chunkWriter(r.generator.InputChan()), f, delay) +} + +// chunkWriter is a shim between the new function-based approach +// and the old flow-based approach. +type chunkWriter chan []byte + +func (w chunkWriter) Write(b []byte) (int, error) { + w <- b + return len(b), nil }