diff --git a/revid/pipeline.go b/revid/pipeline.go index 39665a47..9b3e9f15 100644 --- a/revid/pipeline.go +++ b/revid/pipeline.go @@ -183,13 +183,13 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. case config.OutputHTTP: r.cfg.Logger.Debug("using HTTP output") pb := pool.NewBuffer(int(r.cfg.PoolStartElementSize), int(nElements), writeTimeout) - hs := newHTTPSender(r.ns, r.cfg.Logger.Log, r.bitrate.Report) - w = newMTSSender(hs, r.cfg.Logger.Log, pb, r.cfg.ClipDuration) + hs := newHTTPSender(r.ns, r.cfg.Logger, r.bitrate.Report) + w = newMTSSender(hs, r.cfg.Logger, pb, r.cfg.ClipDuration) mtsSenders = append(mtsSenders, w) case config.OutputRTP: r.cfg.Logger.Debug("using RTP output") - w, err := newRtpSender(r.cfg.RTPAddress, r.cfg.Logger.Log, r.cfg.FrameRate, r.bitrate.Report) + w, err := newRtpSender(r.cfg.RTPAddress, r.cfg.Logger, r.cfg.FrameRate, r.bitrate.Report) if err != nil { r.cfg.Logger.Warning("rtp connect error", "error", err.Error()) } @@ -208,12 +208,12 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. if err != nil { return err } - w = newMTSSender(fs, r.cfg.Logger.Log, pb, r.cfg.ClipDuration) + w = newMTSSender(fs, r.cfg.Logger, pb, r.cfg.ClipDuration) mtsSenders = append(mtsSenders, w) case config.OutputRTMP: r.cfg.Logger.Debug("using RTMP output") pb := pool.NewBuffer(int(r.cfg.PoolStartElementSize), int(nElements), writeTimeout) - w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionMaxTries, pb, r.cfg.Logger.Log, r.bitrate.Report) + w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionMaxTries, pb, r.cfg.Logger, r.bitrate.Report) if err != nil { r.cfg.Logger.Warning("rtmp connect error", "error", err.Error()) } diff --git a/revid/senders.go b/revid/senders.go index 2452e09f..8131de19 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -47,9 +47,6 @@ import ( "bitbucket.org/ausocean/utils/pool" ) -// Log is used by the multiSender. -type Log func(level int8, message string, params ...interface{}) - // Sender pool buffer read timeouts. const ( rtmpPoolReadTimeout = 1 * time.Second @@ -66,12 +63,12 @@ var ( // destination. type httpSender struct { client *netsender.Sender - log func(lvl int8, msg string, args ...interface{}) + log logging.Logger report func(sent int) } // newHttpSender returns a pointer to a new httpSender. -func newHTTPSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) *httpSender { +func newHTTPSender(ns *netsender.Sender, log logging.Logger, report func(sent int)) *httpSender { return &httpSender{ client: ns, log: log, @@ -81,24 +78,24 @@ func newHTTPSender(ns *netsender.Sender, log func(lvl int8, msg string, args ... // Write implements io.Writer. func (s *httpSender) Write(d []byte) (int, error) { - s.log(logging.Debug, "HTTP sending") + s.log.Debug("HTTP sending") err := httpSend(d, s.client, s.log) if err == nil { - s.log(logging.Debug, "good send", "len", len(d)) + s.log.Debug("good send", "len", len(d)) s.report(len(d)) } else { - s.log(logging.Debug, "bad send", "error", err) + s.log.Debug("bad send", "error", err) } return len(d), err } func (s *httpSender) Close() error { return nil } -func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error { +func httpSend(d []byte, client *netsender.Sender, log logging.Logger) error { // Only send if "V0" or "S0" are configured as input. send := false ip := client.Param("ip") - log(logging.Debug, "making pins, and sending recv request", "ip", ip) + log.Debug("making pins, and sending recv request", "ip", ip) pins := netsender.MakePins(ip, "V,S") for i, pin := range pins { switch pin.Name { @@ -124,13 +121,13 @@ func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, if err != nil { return err } - log(logging.Debug, "good request", "reply", reply) + log.Debug("good request", "reply", reply) return extractMeta(reply, log) } // extractMeta looks at a reply at extracts any time or location data - then used // to update time and location information in the mpegts encoder. -func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) error { +func extractMeta(r string, log logging.Logger) error { dec, err := netsender.NewJSONDecoder(r) if err != nil { return nil @@ -139,9 +136,9 @@ func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) if !mts.RealTime.IsSet() { t, err := dec.Int("ts") if err != nil { - log(logging.Warning, "No timestamp in reply") + log.Warning("No timestamp in reply") } else { - log(logging.Debug, "got timestamp", "ts", t) + log.Debug("got timestamp", "ts", t) mts.RealTime.Set(time.Unix(int64(t), 0)) } } @@ -149,9 +146,9 @@ func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) // Extract location from reply g, err := dec.String("ll") if err != nil { - log(logging.Debug, "No location in reply") + log.Debug("No location in reply") } else { - log(logging.Debug, fmt.Sprintf("got location: %v", g)) + log.Debug(fmt.Sprintf("got location: %v", g)) mts.Meta.Add(mts.LocationKey, g) } @@ -212,13 +209,13 @@ type mtsSender struct { clipDur time.Duration prev time.Time done chan struct{} - log func(lvl int8, msg string, args ...interface{}) + log logging.Logger wg sync.WaitGroup } // newMtsSender returns a new mtsSender. -func newMTSSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *pool.Buffer, clipDur time.Duration) *mtsSender { - log(logging.Debug, "setting up mtsSender", "clip duration", int(clipDur)) +func newMTSSender(dst io.WriteCloser, log logging.Logger, rb *pool.Buffer, clipDur time.Duration) *mtsSender { + log.Debug("setting up mtsSender", "clip duration", int(clipDur)) s := &mtsSender{ dst: dst, repairer: mts.NewDiscontinuityRepairer(), @@ -238,7 +235,7 @@ func (s *mtsSender) output() { for { select { case <-s.done: - s.log(logging.Info, "terminating sender output routine") + s.log.Info("terminating sender output routine") defer s.wg.Done() return default: @@ -250,10 +247,10 @@ func (s *mtsSender) output() { case nil, io.EOF: continue case pool.ErrTimeout: - s.log(logging.Debug, "mtsSender: pool buffer read timeout") + s.log.Debug("mtsSender: pool buffer read timeout") continue default: - s.log(logging.Error, "unexpected error", "error", err.Error()) + s.log.Error("unexpected error", "error", err.Error()) continue } } @@ -263,14 +260,14 @@ func (s *mtsSender) output() { chunk = nil continue } - s.log(logging.Debug, "mtsSender: writing") + s.log.Debug("mtsSender: writing") _, err = s.dst.Write(chunk.Bytes()) if err != nil { - s.log(logging.Debug, "failed write, repairing MTS", "error", err) + s.log.Debug("failed write, repairing MTS", "error", err) s.repairer.Failed() continue } else { - s.log(logging.Debug, "good write") + s.log.Debug("good write") } chunk.Close() chunk = nil @@ -285,7 +282,7 @@ func (s *mtsSender) Write(d []byte) (int, error) { } if s.next != nil { - s.log(logging.Debug, "appending packet to clip") + s.log.Debug("appending packet to clip") s.buf = append(s.buf, s.next...) } bytes := make([]byte, len(d)) @@ -294,21 +291,21 @@ func (s *mtsSender) Write(d []byte) (int, error) { p, _ := mts.PID(bytes) s.curPid = int(p) curDur := time.Now().Sub(s.prev) - s.log(logging.Debug, "checking send conditions", "curDuration", int(curDur), "sendDur", int(s.clipDur), "curPID", s.curPid, "len", len(s.buf)) + s.log.Debug("checking send conditions", "curDuration", int(curDur), "sendDur", int(s.clipDur), "curPID", s.curPid, "len", len(s.buf)) if curDur >= s.clipDur && s.curPid == mts.PatPid && len(s.buf) > 0 { - s.log(logging.Debug, "writing clip to pool buffer for sending", "size", len(s.buf)) + s.log.Debug("writing clip to pool buffer for sending", "size", len(s.buf)) s.prev = time.Now() n, err := s.pool.Write(s.buf) if err == nil { s.pool.Flush() } if err != nil { - s.log(logging.Warning, "ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf), "rb element size", adjustedMTSPoolElementSize) + s.log.Warning("ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf), "rb element size", adjustedMTSPoolElementSize) if err == pool.ErrTooLong { adjustedMTSPoolElementSize = len(s.buf) * 2 numElements := maxBuffLen / adjustedMTSPoolElementSize s.pool = pool.NewBuffer(maxBuffLen/adjustedMTSPoolElementSize, adjustedMTSPoolElementSize, 5*time.Second) - s.log(logging.Info, "adjusted MTS pool buffer element size", "new size", adjustedMTSPoolElementSize, "num elements", numElements, "size(MB)", numElements*adjustedMTSPoolElementSize) + s.log.Info("adjusted MTS pool buffer element size", "new size", adjustedMTSPoolElementSize, "num elements", numElements, "size(MB)", numElements*adjustedMTSPoolElementSize) } } s.buf = s.buf[:0] @@ -318,10 +315,10 @@ func (s *mtsSender) Write(d []byte) (int, error) { // Close implements io.Closer. func (s *mtsSender) Close() error { - s.log(logging.Debug, "closing sender output routine") + s.log.Debug("closing sender output routine") close(s.done) s.wg.Wait() - s.log(logging.Info, "sender output routine closed") + s.log.Info("sender output routine closed") return nil } @@ -330,24 +327,24 @@ type rtmpSender struct { conn *rtmp.Conn url string retries int - log func(lvl int8, msg string, args ...interface{}) + log logging.Logger pool *pool.Buffer done chan struct{} wg sync.WaitGroup report func(sent int) } -func newRtmpSender(url string, retries int, rb *pool.Buffer, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) (*rtmpSender, error) { +func newRtmpSender(url string, retries int, rb *pool.Buffer, log logging.Logger, report func(sent int)) (*rtmpSender, error) { var conn *rtmp.Conn var err error for n := 0; n < retries; n++ { - conn, err = rtmp.Dial(url, log) + conn, err = rtmp.Dial(url, log.Log) if err == nil { break } - log(logging.Error, "dial error", "error", err) + log.Error("dial error", "error", err) if n < retries-1 { - log(logging.Info, "retrying dial") + log.Info("retrying dial") } } s := &rtmpSender{ @@ -370,7 +367,7 @@ func (s *rtmpSender) output() { for { select { case <-s.done: - s.log(logging.Info, "terminating sender output routine") + s.log.Info("terminating sender output routine") defer s.wg.Done() return default: @@ -382,30 +379,30 @@ func (s *rtmpSender) output() { case nil, io.EOF: continue case pool.ErrTimeout: - s.log(logging.Debug, "rtmpSender: pool buffer read timeout") + s.log.Debug("rtmpSender: pool buffer read timeout") continue default: - s.log(logging.Error, "unexpected error", "error", err.Error()) + s.log.Error("unexpected error", "error", err.Error()) continue } } if s.conn == nil { - s.log(logging.Warning, "no rtmp connection, re-dialing") + s.log.Warning("no rtmp connection, re-dialing") err := s.restart() if err != nil { - s.log(logging.Warning, "could not restart connection", "error", err) + s.log.Warning("could not restart connection", "error", err) continue } } _, err := s.conn.Write(chunk.Bytes()) switch err { case nil, rtmp.ErrInvalidFlvTag: - s.log(logging.Debug, "good write to conn") + s.log.Debug("good write to conn") default: - s.log(logging.Warning, "send error, re-dialing", "error", err) + s.log.Warning("send error, re-dialing", "error", err) err = s.restart() if err != nil { - s.log(logging.Warning, "could not restart connection", "error", err) + s.log.Warning("could not restart connection", "error", err) } continue } @@ -417,18 +414,18 @@ func (s *rtmpSender) output() { // Write implements io.Writer. func (s *rtmpSender) Write(d []byte) (int, error) { - s.log(logging.Debug, "writing to pool buffer") + s.log.Debug("writing to pool buffer") _, err := s.pool.Write(d) if err == nil { s.pool.Flush() - s.log(logging.Debug, "good pool buffer write", "len", len(d)) + s.log.Debug("good pool buffer write", "len", len(d)) } else { - s.log(logging.Warning, "pool buffer write error", "error", err.Error()) + s.log.Warning("pool buffer write error", "error", err.Error()) if err == pool.ErrTooLong { adjustedRTMPPoolElementSize = len(d) * 2 numElements := maxBuffLen / adjustedRTMPPoolElementSize s.pool = pool.NewBuffer(numElements, adjustedRTMPPoolElementSize, 5*time.Second) - s.log(logging.Info, "adjusted RTMP pool buffer element size", "new size", adjustedRTMPPoolElementSize, "num elements", numElements, "size(MB)", numElements*adjustedRTMPPoolElementSize) + s.log.Info("adjusted RTMP pool buffer element size", "new size", adjustedRTMPPoolElementSize, "num elements", numElements, "size(MB)", numElements*adjustedRTMPPoolElementSize) } } s.report(len(d)) @@ -439,31 +436,31 @@ func (s *rtmpSender) restart() error { s.close() var err error for n := 0; n < s.retries; n++ { - s.log(logging.Debug, "dialing", "dials", n) - s.conn, err = rtmp.Dial(s.url, s.log) + s.log.Debug("dialing", "dials", n) + s.conn, err = rtmp.Dial(s.url, s.log.Log) if err == nil { break } - s.log(logging.Error, "dial error", "error", err) + s.log.Error("dial error", "error", err) if n < s.retries-1 { - s.log(logging.Info, "retry rtmp connection") + s.log.Info("retry rtmp connection") } } return err } func (s *rtmpSender) Close() error { - s.log(logging.Debug, "closing output routine") + s.log.Debug("closing output routine") if s.done != nil { close(s.done) } s.wg.Wait() - s.log(logging.Info, "output routine closed") + s.log.Info("output routine closed") return s.close() } func (s *rtmpSender) close() error { - s.log(logging.Debug, "closing connection") + s.log.Debug("closing connection") if s.conn == nil { return nil } @@ -473,13 +470,13 @@ func (s *rtmpSender) close() error { // TODO: Write restart func for rtpSender // rtpSender implements loadSender for a native udp destination with rtp packetization. type rtpSender struct { - log func(lvl int8, msg string, args ...interface{}) + log logging.Logger encoder *rtp.Encoder data []byte report func(sent int) } -func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint, report func(sent int)) (*rtpSender, error) { +func newRtpSender(addr string, log logging.Logger, fps uint, report func(sent int)) (*rtpSender, error) { conn, err := net.Dial("udp", addr) if err != nil { return nil, err @@ -498,7 +495,7 @@ func (s *rtpSender) Write(d []byte) (int, error) { copy(s.data, d) _, err := s.encoder.Write(s.data) if err != nil { - s.log(logging.Warning, "rtpSender: write error", err.Error()) + s.log.Warning("rtpSender: write error", err.Error()) } s.report(len(d)) return len(d), nil diff --git a/revid/senders_test.go b/revid/senders_test.go index 6c1b5016..b2493801 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -107,7 +107,7 @@ func TestMTSSenderSegment(t *testing.T) { dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} const testPoolCapacity = 50000000 nElements := testPoolCapacity / poolStartingElementSize - sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(nElements, poolStartingElementSize, 0), 0) + sender := newMTSSender(dst, (*testLogger)(t), pool.NewBuffer(nElements, poolStartingElementSize, 0), 0) const psiSendCount = 10 encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264)) @@ -188,7 +188,7 @@ func TestMtsSenderFailedSend(t *testing.T) { dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} const testPoolCapacity = 50000000 // 50MB nElements := testPoolCapacity / poolStartingElementSize - sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(nElements, poolStartingElementSize, 0), 0) + sender := newMTSSender(dst, (*testLogger)(t), pool.NewBuffer(nElements, poolStartingElementSize, 0), 0) const psiSendCount = 10 encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264)) @@ -272,7 +272,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder. const clipToDelay = 3 dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} - sender := newMTSSender(dst, (*testLogger)(t).Log, pool.NewBuffer(1, poolStartingElementSize, 0), 0) + sender := newMTSSender(dst, (*testLogger)(t), pool.NewBuffer(1, poolStartingElementSize, 0), 0) const psiSendCount = 10 encoder, err := mts.NewEncoder(sender, (*testLogger)(t), mts.PacketBasedPSI(psiSendCount), mts.Rate(25), mts.MediaType(mts.EncodeH264))