Merged in lexers-return-ioeof (pull request #330)

codec: lexers return ioeof

Approved-by: Alan Noble <anoble@gmail.com>
Approved-by: kortschak <dan@kortschak.io>
This commit is contained in:
Saxon Milton 2020-01-23 03:23:03 +00:00
commit c2329b3a3f
5 changed files with 28 additions and 22 deletions

View File

@ -66,7 +66,7 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error {
buf := make([]byte, len(h264Prefix), bufSize) buf := make([]byte, len(h264Prefix), bufSize)
copy(buf, h264Prefix[:]) copy(buf, h264Prefix[:])
writeOut := false writeOut := false
outer:
for { for {
var b byte var b byte
var err error var err error
@ -75,7 +75,10 @@ outer:
if err != io.EOF { if err != io.EOF {
return err return err
} }
break if len(buf) != 0 {
return io.ErrUnexpectedEOF
}
return io.EOF
} }
for n := 1; b == 0x0 && n < 4; n++ { for n := 1; b == 0x0 && n < 4; n++ {
@ -84,7 +87,7 @@ outer:
if err != io.EOF { if err != io.EOF {
return err return err
} }
break outer return io.ErrUnexpectedEOF
} }
buf = append(buf, b) buf = append(buf, b)
@ -109,7 +112,7 @@ outer:
if err != io.EOF { if err != io.EOF {
return err return err
} }
break outer return io.ErrUnexpectedEOF
} }
buf = append(buf, b) buf = append(buf, b)
@ -127,10 +130,4 @@ outer:
} }
} }
} }
if len(buf) == len(h264Prefix) {
return nil
}
<-tick
_, err := dst.Write(buf)
return err
} }

View File

@ -76,10 +76,14 @@ func (l *Lexer) Lex(dst io.Writer, src io.Reader, delay time.Duration) error {
n, err := src.Read(buf) n, err := src.Read(buf)
switch err { switch err {
case nil: // Do nothing. case nil: // Do nothing.
case io.EOF:
return nil
default: default:
return fmt.Errorf("source read error: %w\n", err) if err == io.EOF {
if l.buf.Len() == 0 {
return io.EOF
}
return io.ErrUnexpectedEOF
}
return err
} }
// Get payload from RTP packet. // Get payload from RTP packet.
@ -181,7 +185,7 @@ func (l *Lexer) handleFragmentation(d []byte) {
} }
} }
// handlePACI will handl PACI packets // handlePACI will handle PACI packets
// //
// TODO: complete this // TODO: complete this
func (l *Lexer) handlePACI(d []byte) { func (l *Lexer) handlePACI(d []byte) {

View File

@ -247,7 +247,9 @@ func TestLex(t *testing.T) {
r := &rtpReader{packets: test.packets} r := &rtpReader{packets: test.packets}
d := &destination{} d := &destination{}
err := NewLexer(test.donl).Lex(d, r, 0) err := NewLexer(test.donl).Lex(d, r, 0)
if err != nil { switch err {
case nil, io.EOF: // Do nothing
default:
t.Fatalf("error lexing: %v\n", err) t.Fatalf("error lexing: %v\n", err)
} }

View File

@ -60,12 +60,9 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error {
buf := make([]byte, 2, 4<<10) buf := make([]byte, 2, 4<<10)
n, err := r.Read(buf) n, err := r.Read(buf)
if n < 2 { if n < 2 {
return nil return io.ErrUnexpectedEOF
} }
if err != nil { if err != nil {
if err == io.EOF {
return nil
}
return err return err
} }
if !bytes.Equal(buf, []byte{0xff, 0xd8}) { if !bytes.Equal(buf, []byte{0xff, 0xd8}) {
@ -76,7 +73,7 @@ func Lex(dst io.Writer, src io.Reader, delay time.Duration) error {
b, err := r.ReadByte() b, err := r.ReadByte()
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
return nil return io.ErrUnexpectedEOF
} }
return err return err
} }

View File

@ -854,7 +854,13 @@ func (r *Revid) Update(vars map[string]string) error {
// processFrom is run as a routine to read from a input data source, lex and // processFrom is run as a routine to read from a input data source, lex and
// then send individual access units to revid's encoders. // then send individual access units to revid's encoders.
func (r *Revid) processFrom(read io.Reader, delay time.Duration) { func (r *Revid) processFrom(read io.Reader, delay time.Duration) {
r.err <- r.lexTo(r.filters[0], read, delay) err := r.lexTo(r.filters[0], read, delay)
r.cfg.Logger.Log(logger.Info, pkg+"finished lexing") r.cfg.Logger.Log(logger.Debug, pkg+"finished lexing")
switch err {
case nil: // Do nothing.
case io.EOF: // TODO: handle this depending on loop mode.
default:
r.err <- err
}
r.wg.Done() r.wg.Done()
} }