Merged in fix-senders-logging (pull request #482)

revid/senders.go: using logging.Logger interface

resolves issue #343

Approved-by: Trek Hopton
Saxon Milton 2022-06-09 05:49:16 +00:00
commit 0d5100b2c8
3 changed files with 64 additions and 67 deletions
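
The change replaces the package-local callback type Log (func(level int8, message string, params ...interface{}), removed in the second file below) with the logging.Logger interface, so the senders call leveled methods such as log.Debug(...) rather than log(logging.Debug, ...). The interface definition itself is not part of this diff; a minimal sketch of the shape implied by the calls in senders.go is given below (the real type presumably lives in the shared logging package, e.g. bitbucket.org/ausocean/utils/logging, and may declare additional methods):

// Sketch only: the logging.Logger shape implied by the calls in this commit.
type Logger interface {
	// Log retains the old callback signature; rtmp.Dial still receives it as s.log.Log.
	Log(level int8, message string, params ...interface{})

	// Leveled helpers used throughout senders.go, each taking a message and
	// alternating key/value pairs.
	Debug(message string, params ...interface{})
	Info(message string, params ...interface{})
	Warning(message string, params ...interface{})
	Error(message string, params ...interface{})
}

Call sites that previously passed r.cfg.Logger.Log now pass r.cfg.Logger directly, as the first file below shows.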

View File

@@ -183,13 +183,13 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
case config.OutputHTTP:
r.cfg.Logger.Debug("using HTTP output")
pb := pool.NewBuffer(int(r.cfg.PoolStartElementSize), int(nElements), writeTimeout)
hs := newHTTPSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report)
w = newMTSSender(hs, r.cfg.Logger.Log, pb, r.cfg.ClipDuration)
hs := newHTTPSender(r.ns, r.cfg.Logger, r.bitrate.Report)
w = newMTSSender(hs, r.cfg.Logger, pb, r.cfg.ClipDuration)
mtsSenders = append(mtsSenders, w)
case config.OutputRTP:
r.cfg.Logger.Debug("using RTP output")
w, err := newRtpSender(r.cfg.RTPAddress, r.cfg.Logger.Log, r.cfg.FrameRate, r.bitrate.Report)
w, err := newRtpSender(r.cfg.RTPAddress, r.cfg.Logger, r.cfg.FrameRate, r.bitrate.Report)
if err != nil {
r.cfg.Logger.Warning("rtp connect error", "error", err.Error())
}
@@ -208,12 +208,12 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
if err != nil {
return err
}
w = newMTSSender(fs, r.cfg.Logger.Log, pb, r.cfg.ClipDuration)
w = newMTSSender(fs, r.cfg.Logger, pb, r.cfg.ClipDuration)
mtsSenders = append(mtsSenders, w)
case config.OutputRTMP:
r.cfg.Logger.Debug("using RTMP output")
pb := pool.NewBuffer(int(r.cfg.PoolStartElementSize), int(nElements), writeTimeout)
w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionMaxTries, pb, r.cfg.Logger.Log, r.bitrate.Report)
w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionMaxTries, pb, r.cfg.Logger, r.bitrate.Report)
if err != nil {
r.cfg.Logger.Warning("rtmp connect error", "error", err.Error())
}

View File

@@ -47,9 +47,6 @@ import (
"bitbucket.org/ausocean/utils/pool"
)
// Log is used by the multiSender.
type Log func(level int8, message string, params ...interface{})
// Sender pool buffer read timeouts.
const (
rtmpPoolReadTimeout = 1 * time.Second
@@ -66,12 +63,12 @@ var (
// destination.
type httpSender struct {
client *netsender.Sender
log func(lvl int8, msg string, args ...interface{})
log logging.Logger
report func(sent int)
}
// newHttpSender returns a pointer to a new httpSender.
func newHTTPSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) *httpSender {
func newHTTPSender(ns *netsender.Sender, log logging.Logger, report func(sent int)) *httpSender {
return &httpSender{
client: ns,
log: log,
@@ -81,24 +78,24 @@ func newHTTPSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...
// Write implements io.Writer.
func (s *httpSender) Write(d []byte) (int, error) {
s.log(logging.Debug, "HTTP sending")
s.log.Debug("HTTP sending")
err := httpSend(d, s.client, s.log)
if err == nil {
s.log(logging.Debug, "good send", "len", len(d))
s.log.Debug("good send", "len", len(d))
s.report(len(d))
} else {
s.log(logging.Debug, "bad send", "error", err)
s.log.Debug("bad send", "error", err)
}
return len(d), err
}
func (s *httpSender) Close() error { return nil }
func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error {
func httpSend(d []byte, client *netsender.Sender, log logging.Logger) error {
// Only send if "V0" or "S0" are configured as input.
send := false
ip := client.Param("ip")
log(logging.Debug, "making pins, and sending recv request", "ip", ip)
log.Debug("making pins, and sending recv request", "ip", ip)
pins := netsender.MakePins(ip, "V,S")
for i, pin := range pins {
switch pin.Name {
@@ -124,13 +121,13 @@ func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string,
if err != nil {
return err
}
log(logging.Debug, "good request", "reply", reply)
log.Debug("good request", "reply", reply)
return extractMeta(reply, log)
}
// extractMeta looks at a reply and extracts any time or location data - then used
// to update time and location information in the mpegts encoder.
func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) error {
func extractMeta(r string, log logging.Logger) error {
dec, err := netsender.NewJSONDecoder(r)
if err != nil {
return nil
@@ -139,9 +136,9 @@ func extractMeta(r string, log func(lvl int8, msg string, args ...interface{}))
if !mts.RealTime.IsSet() {
t, err := dec.Int("ts")
if err != nil {
log(logging.Warning, "No timestamp in reply")
log.Warning("No timestamp in reply")
} else {
log(logging.Debug, "got timestamp", "ts", t)
log.Debug("got timestamp", "ts", t)
mts.RealTime.Set(time.Unix(int64(t), 0))
}
}
@@ -149,9 +146,9 @@ func extractMeta(r string, log func(lvl int8, msg string, args ...interface{}))
// Extract location from reply
g, err := dec.String("ll")
if err != nil {
log(logging.Debug, "No location in reply")
log.Debug("No location in reply")
} else {
log(logging.Debug, fmt.Sprintf("got location: %v", g))
log.Debug(fmt.Sprintf("got location: %v", g))
mts.Meta.Add(mts.LocationKey, g)
}
@@ -212,13 +209,13 @@ type mtsSender struct {
clipDur time.Duration
prev time.Time
done chan struct{}
log func(lvl int8, msg string, args ...interface{})
log logging.Logger
wg sync.WaitGroup
}
// newMtsSender returns a new mtsSender.
func newMTSSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *pool.Buffer, clipDur time.Duration) *mtsSender {
log(logging.Debug, "setting up mtsSender", "clip duration", int(clipDur))
func newMTSSender(dst io.WriteCloser, log logging.Logger, rb *pool.Buffer, clipDur time.Duration) *mtsSender {
log.Debug("setting up mtsSender", "clip duration", int(clipDur))
s := &mtsSender{
dst: dst,
repairer: mts.NewDiscontinuityRepairer(),
@@ -238,7 +235,7 @@ func (s *mtsSender) output() {
for {
select {
case <-s.done:
s.log(logging.Info, "terminating sender output routine")
s.log.Info("terminating sender output routine")
defer s.wg.Done()
return
default:
@@ -250,10 +247,10 @@ func (s *mtsSender) output() {
case nil, io.EOF:
continue
case pool.ErrTimeout:
s.log(logging.Debug, "mtsSender: pool buffer read timeout")
s.log.Debug("mtsSender: pool buffer read timeout")
continue
default:
s.log(logging.Error, "unexpected error", "error", err.Error())
s.log.Error("unexpected error", "error", err.Error())
continue
}
}
@@ -263,14 +260,14 @@ func (s *mtsSender) output() {
chunk = nil
continue
}
s.log(logging.Debug, "mtsSender: writing")
s.log.Debug("mtsSender: writing")
_, err = s.dst.Write(chunk.Bytes())
if err != nil {
s.log(logging.Debug, "failed write, repairing MTS", "error", err)
s.log.Debug("failed write, repairing MTS", "error", err)
s.repairer.Failed()
continue
} else {
s.log(logging.Debug, "good write")
s.log.Debug("good write")
}
chunk.Close()
chunk = nil
@@ -285,7 +282,7 @@ func (s *mtsSender) Write(d []byte) (int, error) {
}
if s.next != nil {
s.log(logging.Debug, "appending packet to clip")
s.log.Debug("appending packet to clip")
s.buf = append(s.buf, s.next...)
}
bytes := make([]byte, len(d))
@@ -294,21 +291,21 @@ func (s *mtsSender) Write(d []byte) (int, error) {
p, _ := mts.PID(bytes)
s.curPid = int(p)
curDur := time.Now().Sub(s.prev)
s.log(logging.Debug, "checking send conditions", "curDuration", int(curDur), "sendDur", int(s.clipDur), "curPID", s.curPid, "len", len(s.buf))
s.log.Debug("checking send conditions", "curDuration", int(curDur), "sendDur", int(s.clipDur), "curPID", s.curPid, "len", len(s.buf))
if curDur >= s.clipDur && s.curPid == mts.PatPid && len(s.buf) > 0 {
s.log(logging.Debug, "writing clip to pool buffer for sending", "size", len(s.buf))
s.log.Debug("writing clip to pool buffer for sending", "size", len(s.buf))
s.prev = time.Now()
n, err := s.pool.Write(s.buf)
if err == nil {
s.pool.Flush()
}
if err != nil {
s.log(logging.Warning, "ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf), "rb element size", adjustedMTSPoolElementSize)
s.log.Warning("ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf), "rb element size", adjustedMTSPoolElementSize)
if err == pool.ErrTooLong {
adjustedMTSPoolElementSize = len(s.buf) * 2
numElements := maxBuffLen / adjustedMTSPoolElementSize
s.pool = pool.NewBuffer(maxBuffLen/adjustedMTSPoolElementSize, adjustedMTSPoolElementSize, 5*time.Second)
s.log(logging.Info, "adjusted MTS pool buffer element size", "new size", adjustedMTSPoolElementSize, "num elements", numElements, "size(MB)", numElements*adjustedMTSPoolElementSize)
s.log.Info("adjusted MTS pool buffer element size", "new size", adjustedMTSPoolElementSize, "num elements", numElements, "size(MB)", numElements*adjustedMTSPoolElementSize)
}
}
s.buf = s.buf[:0]
@@ -318,10 +315,10 @@ func (s *mtsSender) Write(d []byte) (int, error) {
// Close implements io.Closer.
func (s *mtsSender) Close() error {
s.log(logging.Debug, "closing sender output routine")
s.log.Debug("closing sender output routine")
close(s.done)
s.wg.Wait()
s.log(logging.Info, "sender output routine closed")
s.log.Info("sender output routine closed")
return nil
}
@@ -330,24 +327,24 @@ type rtmpSender struct {
conn *rtmp.Conn
url string
retries int
log func(lvl int8, msg string, args ...interface{})
log logging.Logger
pool *pool.Buffer
done chan struct{}
wg sync.WaitGroup
report func(sent int)
}
func newRtmpSender(url string, retries int, rb *pool.Buffer, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) (*rtmpSender, error) {
func newRtmpSender(url string, retries int, rb *pool.Buffer, log logging.Logger, report func(sent int)) (*rtmpSender, error) {
var conn *rtmp.Conn
var err error
for n := 0; n < retries; n++ {
conn, err = rtmp.Dial(url, log)
conn, err = rtmp.Dial(url, log.Log)
if err == nil {
break
}
log(logging.Error, "dial error", "error", err)
log.Error("dial error", "error", err)
if n < retries-1 {
log(logging.Info, "retrying dial")
log.Info("retrying dial")
}
}
s := &rtmpSender{
@@ -370,7 +367,7 @@ func (s *rtmpSender) output() {
for {
select {
case <-s.done:
s.log(logging.Info, "terminating sender output routine")
s.log.Info("terminating sender output routine")
defer s.wg.Done()
return
default:
@@ -382,30 +379,30 @@ func (s *rtmpSender) output() {
case nil, io.EOF:
continue
case pool.ErrTimeout:
s.log(logging.Debug, "rtmpSender: pool buffer read timeout")
s.log.Debug("rtmpSender: pool buffer read timeout")
continue
default:
s.log(logging.Error, "unexpected error", "error", err.Error())
s.log.Error("unexpected error", "error", err.Error())
continue
}
}
if s.conn == nil {
s.log(logging.Warning, "no rtmp connection, re-dialing")
s.log.Warning("no rtmp connection, re-dialing")
err := s.restart()
if err != nil {
s.log(logging.Warning, "could not restart connection", "error", err)
s.log.Warning("could not restart connection", "error", err)
continue
}
}
_, err := s.conn.Write(chunk.Bytes())
switch err {
case nil, rtmp.ErrInvalidFlvTag:
s.log(logging.Debug, "good write to conn")
s.log.Debug("good write to conn")
default:
s.log(logging.Warning, "send error, re-dialing", "error", err)
s.log.Warning("send error, re-dialing", "error", err)
err = s.restart()
if err != nil {
s.log(logging.Warning, "could not restart connection", "error", err)
s.log.Warning("could not restart connection", "error", err)
}
continue
}
@@ -417,18 +414,18 @@ func (s *rtmpSender) output() {
// Write implements io.Writer.
func (s *rtmpSender) Write(d []byte) (int, error) {
s.log(logging.Debug, "writing to pool buffer")
s.log.Debug("writing to pool buffer")
_, err := s.pool.Write(d)
if err == nil {
s.pool.Flush()
s.log(logging.Debug, "good pool buffer write", "len", len(d))
s.log.Debug("good pool buffer write", "len", len(d))
} else {
s.log(logging.Warning, "pool buffer write error", "error", err.Error())
s.log.Warning("pool buffer write error", "error", err.Error())
if err == pool.ErrTooLong {
adjustedRTMPPoolElementSize = len(d) * 2
numElements := maxBuffLen / adjustedRTMPPoolElementSize
s.pool = pool.NewBuffer(numElements, adjustedRTMPPoolElementSize, 5*time.Second)
s.log(logging.Info, "adjusted RTMP pool buffer element size", "new size", adjustedRTMPPoolElementSize, "num elements", numElements, "size(MB)", numElements*adjustedRTMPPoolElementSize)
s.log.Info("adjusted RTMP pool buffer element size", "new size", adjustedRTMPPoolElementSize, "num elements", numElements, "size(MB)", numElements*adjustedRTMPPoolElementSize)
}
}
s.report(len(d))
@@ -439,31 +436,31 @@ func (s *rtmpSender) restart() error {
s.close()
var err error
for n := 0; n < s.retries; n++ {
s.log(logging.Debug, "dialing", "dials", n)
s.conn, err = rtmp.Dial(s.url, s.log)
s.log.Debug("dialing", "dials", n)
s.conn, err = rtmp.Dial(s.url, s.log.Log)
if err == nil {
break
}
s.log(logging.Error, "dial error", "error", err)
s.log.Error("dial error", "error", err)
if n < s.retries-1 {
s.log(logging.Info, "retry rtmp connection")
s.log.Info("retry rtmp connection")
}
}
return err
}
func (s *rtmpSender) Close() error {
s.log(logging.Debug, "closing output routine")
s.log.Debug("closing output routine")
if s.done != nil {
close(s.done)
}
s.wg.Wait()
s.log(logging.Info, "output routine closed")
s.log.Info("output routine closed")
return s.close()
}
func (s *rtmpSender) close() error {
s.log(logging.Debug, "closing connection")
s.log.Debug("closing connection")
if s.conn == nil {
return nil
}
@@ -473,13 +470,13 @@ func (s *rtmpSender) close() error {
// TODO: Write restart func for rtpSender
// rtpSender implements loadSender for a native udp destination with rtp packetization.
type rtpSender struct {
log func(lvl int8, msg string, args ...interface{})
log logging.Logger
encoder *rtp.Encoder
data []byte
report func(sent int)
}
func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint, report func(sent int)) (*rtpSender, error) {
func newRtpSender(addr string, log logging.Logger, fps uint, report func(sent int)) (*rtpSender, error) {
conn, err := net.Dial("udp", addr)
if err != nil {
return nil, err
@@ -498,7 +495,7 @@ func (s *rtpSender) Write(d []byte) (int, error) {
copy(s.data, d)
_, err := s.encoder.Write(s.data)
if err != nil {
s.log(logging.Warning, "rtpSender: write error", err.Error())
s.log.Warning("rtpSender: write error", err.Error())
}
s.report(len(d))
return len(d), nil

View File

@@ -107,7 +107,7 @@ func TestMTSSenderSegment(t *testing.T) {
dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
const testPoolCapacity = 50000000
nElements := testPoolCapacity / poolStartingElementSize
sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(nElements, poolStartingElementSize, 0), 0)
sender := newMTSSender(dst, (*testLogger)(t), pool.NewBuffer(nElements, poolStartingElementSize, 0), 0)
const psiSendCount = 10
encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264))
@@ -188,7 +188,7 @@ func TestMtsSenderFailedSend(t *testing.T) {
dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
const testPoolCapacity = 50000000 // 50MB
nElements := testPoolCapacity / poolStartingElementSize
sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(nElements, poolStartingElementSize, 0), 0)
sender := newMTSSender(dst, (*testLogger)(t), pool.NewBuffer(nElements, poolStartingElementSize, 0), 0)
const psiSendCount = 10
encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264))
@@ -272,7 +272,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder.
const clipToDelay = 3
dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(1, poolStartingElementSize, 0), 0)
sender := newMTSSender(dst, (*testLogger)(t), pool.NewBuffer(1, poolStartingElementSize, 0), 0)
const psiSendCount = 10
encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264))
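
The test changes above pass (*testLogger)(t) directly to newMTSSender where its Log method was previously supplied, which implies testLogger itself now satisfies logging.Logger. For illustration, a hypothetical *testing.T-backed implementation with that shape is sketched below (the repository's actual testLogger is not shown in this diff and may differ; the logging import path is assumed):

// Sketch only: a *testing.T-backed implementation of logging.Logger.
package revid

import (
	"testing"

	"bitbucket.org/ausocean/utils/logging" // assumed import path
)

// testLogger forwards sender log output to the test log.
type testLogger testing.T

func (t *testLogger) Log(level int8, msg string, params ...interface{}) {
	(*testing.T)(t).Logf("level: %d, msg: %s, params: %v", level, msg, params)
}

func (t *testLogger) Debug(msg string, params ...interface{})   { t.Log(logging.Debug, msg, params...) }
func (t *testLogger) Info(msg string, params ...interface{})    { t.Log(logging.Info, msg, params...) }
func (t *testLogger) Warning(msg string, params ...interface{}) { t.Log(logging.Warning, msg, params...) }
func (t *testLogger) Error(msg string, params ...interface{})   { t.Log(logging.Error, msg, params...) }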