mirror of https://bitbucket.org/ausocean/av.git
Don't do anything about cuff chan receive timeout
This commit is contained in:
parent
dcb9a6be4b
commit
96078e1091
|
@ -84,6 +84,8 @@ func (l *ByteLexer) Lex(dst io.Writer, src io.Reader, d time.Duration) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var errBuffChanReceiveTimeout = errors.New("buffer chan receive timeout")
|
||||||
|
|
||||||
// Noop reads media "frames" from src, queues and then writes to dst at intervals,
|
// Noop reads media "frames" from src, queues and then writes to dst at intervals,
|
||||||
// maintaining a steady number of frames stored in the queue (channel). This ensures frames
|
// maintaining a steady number of frames stored in the queue (channel). This ensures frames
|
||||||
// are outputted at a consistent rate; useful if reads occur from src in blocks (a
|
// are outputted at a consistent rate; useful if reads occur from src in blocks (a
|
||||||
|
@ -128,7 +130,7 @@ func Noop(dst io.Writer, src io.Reader, d time.Duration) error {
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
err := rb.writeTo(dst)
|
err := rb.writeTo(dst)
|
||||||
if err != nil {
|
if err != nil && !errors.Is(err, errBuffChanReceiveTimeout) {
|
||||||
errCh <- fmt.Errorf("could not write to dst: %w", err)
|
errCh <- fmt.Errorf("could not write to dst: %w", err)
|
||||||
}
|
}
|
||||||
<-delay.C
|
<-delay.C
|
||||||
|
@ -171,9 +173,9 @@ type ringBuffer struct {
|
||||||
// and cap as the number of elements.
|
// and cap as the number of elements.
|
||||||
func newRingBuffer(sz, cap int, timeout time.Duration) *ringBuffer {
|
func newRingBuffer(sz, cap int, timeout time.Duration) *ringBuffer {
|
||||||
rb := &ringBuffer{
|
rb := &ringBuffer{
|
||||||
buf: make([][]byte, cap),
|
buf: make([][]byte, cap),
|
||||||
n: cap,
|
n: cap,
|
||||||
ch: make(chan []byte, cap),
|
ch: make(chan []byte, cap),
|
||||||
timeout: timeout,
|
timeout: timeout,
|
||||||
}
|
}
|
||||||
for i := range rb.buf {
|
for i := range rb.buf {
|
||||||
|
@ -216,7 +218,7 @@ func (b *ringBuffer) writeTo(w io.Writer) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
case <-timeout.C:
|
case <-timeout.C:
|
||||||
return errors.New("buffer chan receive timeout")
|
return errBuffChanReceiveTimeout
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -110,7 +110,7 @@ func newHTTPSender(ns *netsender.Sender, log logging.Logger, opts ...httpSenderO
|
||||||
|
|
||||||
// Write implements io.Writer.
|
// Write implements io.Writer.
|
||||||
func (s *httpSender) Write(d []byte) (int, error) {
|
func (s *httpSender) Write(d []byte) (int, error) {
|
||||||
s.log.Debug("HTTP sending")
|
s.log.Debug("HTTP sending", "address", s.addr)
|
||||||
err := httpSend(d, s.client, s.log, s.addr)
|
err := httpSend(d, s.client, s.log, s.addr)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
s.log.Debug("good send", "len", len(d))
|
s.log.Debug("good send", "len", len(d))
|
||||||
|
|
Loading…
Reference in New Issue