mirror of https://bitbucket.org/ausocean/av.git
revid: improved mtsSender's output comment and moved closer to call
This commit is contained in:
parent
dd833afe2e
commit
f546b9daed
|
@ -180,36 +180,7 @@ func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...int
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write implements io.Writer.
|
// output starts an mtsSender's data handling routine.
|
||||||
func (s *mtsSender) Write(d []byte) (int, error) {
|
|
||||||
if s.next != nil {
|
|
||||||
s.buf = append(s.buf, s.next...)
|
|
||||||
}
|
|
||||||
bytes := make([]byte, len(d))
|
|
||||||
copy(bytes, d)
|
|
||||||
s.next = bytes
|
|
||||||
copy(s.pkt[:], bytes)
|
|
||||||
s.curPid = s.pkt.PID()
|
|
||||||
if s.curPid == mts.PatPid && len(s.buf) > 0 {
|
|
||||||
_, err := s.ringBuf.Write(s.buf)
|
|
||||||
if err != nil {
|
|
||||||
s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error())
|
|
||||||
}
|
|
||||||
s.ringBuf.Flush()
|
|
||||||
s.buf = s.buf[:0]
|
|
||||||
}
|
|
||||||
return len(d), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close implements io.Closer.
|
|
||||||
func (s *mtsSender) Close() error {
|
|
||||||
close(s.quit)
|
|
||||||
s.wg.Wait()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// output is a routine start at creation of the mtsSender. It will get data
|
|
||||||
// from the mtsSenders ringBuffer and attempt to send.
|
|
||||||
func (s *mtsSender) output() {
|
func (s *mtsSender) output() {
|
||||||
var chunk *ring.Chunk
|
var chunk *ring.Chunk
|
||||||
loop:
|
loop:
|
||||||
|
@ -255,6 +226,34 @@ loop:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Write implements io.Writer.
|
||||||
|
func (s *mtsSender) Write(d []byte) (int, error) {
|
||||||
|
if s.next != nil {
|
||||||
|
s.buf = append(s.buf, s.next...)
|
||||||
|
}
|
||||||
|
bytes := make([]byte, len(d))
|
||||||
|
copy(bytes, d)
|
||||||
|
s.next = bytes
|
||||||
|
copy(s.pkt[:], bytes)
|
||||||
|
s.curPid = s.pkt.PID()
|
||||||
|
if s.curPid == mts.PatPid && len(s.buf) > 0 {
|
||||||
|
_, err := s.ringBuf.Write(s.buf)
|
||||||
|
if err != nil {
|
||||||
|
s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error())
|
||||||
|
}
|
||||||
|
s.ringBuf.Flush()
|
||||||
|
s.buf = s.buf[:0]
|
||||||
|
}
|
||||||
|
return len(d), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close implements io.Closer.
|
||||||
|
func (s *mtsSender) Close() error {
|
||||||
|
close(s.quit)
|
||||||
|
s.wg.Wait()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// rtmpSender implements loadSender for a native RTMP destination.
|
// rtmpSender implements loadSender for a native RTMP destination.
|
||||||
type rtmpSender struct {
|
type rtmpSender struct {
|
||||||
conn *rtmp.Conn
|
conn *rtmp.Conn
|
||||||
|
|
Loading…
Reference in New Issue