diff --git a/codec/codecutil/lex.go b/codec/codecutil/lex.go index b948bce5..75e7e637 100644 --- a/codec/codecutil/lex.go +++ b/codec/codecutil/lex.go @@ -30,6 +30,7 @@ import ( "io" "math" "time" + "context" ) // ByteLexer is used to lex bytes using a buffer size which is configured upon construction. @@ -127,13 +128,20 @@ func Noop(dst io.Writer, src io.Reader, d time.Duration) error { defer delay.Stop() // This routine is responsible for frame output to rest of the pipeline. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { for { err := rb.writeTo(dst) if err != nil && !errors.Is(err, errBuffChanReceiveTimeout) { errCh <- fmt.Errorf("could not write to dst: %w", err) } - <-delay.C + + select { + case <-delay.C: + case <-ctx.Done(): + return + } // Adjust delay using proportional controller. adj := coef * float64(target-rb.len())