diff --git a/revid/revid.go b/revid/revid.go index e6bc2f3a..5506f9a2 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -193,7 +193,7 @@ func (r *Revid) setupPipeline(mtsEnc func(io.Writer, int) io.Writer, flvEnc func for _, out := range r.config.Outputs { switch out { case Http: - w = newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), nil) + w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), nil) mtsSenders = append(mtsSenders, w) case Rtp: w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) diff --git a/revid/senders.go b/revid/senders.go index 6cd4e9bd..a73bdaca 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -83,24 +83,79 @@ func (s *multiSender) Write(d []byte) (int, error) { } // minimalHttpSender implements Sender for posting HTTP to netreceiver or vidgrind. -type minimalHttpSender struct { +type httpSender struct { client *netsender.Sender log func(lvl int8, msg string, args ...interface{}) } // newMinimalHttpSender returns a pointer to a new minimalHttpSender. -func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *minimalHttpSender { - return &minimalHttpSender{ +func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender { + return &httpSender{ client: ns, log: log, } } // send takes a bytes slice d and sends to http using s' http client. -func (s *minimalHttpSender) Write(d []byte) (int, error) { +func (s *httpSender) Write(d []byte) (int, error) { return len(d), httpSend(d, s.client, s.log) } +func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error { + // Only send if "V0" is configured as an input. + send := false + ip := client.Param("ip") + pins := netsender.MakePins(ip, "V") + for i, pin := range pins { + if pin.Name == "V0" { + send = true + pins[i].Value = len(d) + pins[i].Data = d + pins[i].MimeType = "video/mp2t" + break + } + } + + if !send { + return nil + } + var err error + var reply string + reply, _, err = client.Send(netsender.RequestRecv, pins) + if err != nil { + return err + } + return extractMeta(reply, log) +} + +// extractMeta looks at a reply at extracts any time or location data - then used +// to update time and location information in the mpegts encoder. +func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) error { + dec, err := netsender.NewJSONDecoder(r) + if err != nil { + return nil + } + // Extract time from reply + t, err := dec.Int("ts") + if err != nil { + log(logger.Warning, pkg+"No timestamp in reply") + } else { + log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) + mts.Meta.Add("ts", strconv.Itoa(t)) + } + + // Extract location from reply + g, err := dec.String("ll") + if err != nil { + log(logger.Warning, pkg+"No location in reply") + } else { + log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) + mts.Meta.Add("loc", g) + } + + return nil +} + // loadSender is a destination to send a *ring.Chunk to. // When a loadSender has finished using the *ring.Chunk // it must be Closed. @@ -216,90 +271,6 @@ func (s *mtsSender) Write(d []byte) (int, error) { func (s *mtsSender) close() error { return nil } -// httpSender implements loadSender for posting HTTP to NetReceiver -type httpSender struct { - client *netsender.Sender - - log func(lvl int8, msg string, args ...interface{}) - - data []byte -} - -func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender { - return &httpSender{ - client: ns, - log: log, - } -} - -func (s *httpSender) load(d []byte) error { - s.data = d - return nil -} - -func (s *httpSender) send() error { - return httpSend(s.data, s.client, s.log) -} - -func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error { - // Only send if "V0" is configured as an input. - send := false - ip := client.Param("ip") - pins := netsender.MakePins(ip, "V") - for i, pin := range pins { - if pin.Name == "V0" { - send = true - pins[i].Value = len(d) - pins[i].Data = d - pins[i].MimeType = "video/mp2t" - break - } - } - - if !send { - return nil - } - var err error - var reply string - reply, _, err = client.Send(netsender.RequestRecv, pins) - if err != nil { - return err - } - return extractMeta(reply, log) -} - -// extractMeta looks at a reply at extracts any time or location data - then used -// to update time and location information in the mpegts encoder. -func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) error { - dec, err := netsender.NewJSONDecoder(r) - if err != nil { - return nil - } - // Extract time from reply - t, err := dec.Int("ts") - if err != nil { - log(logger.Warning, pkg+"No timestamp in reply") - } else { - log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) - mts.Meta.Add("ts", strconv.Itoa(t)) - } - - // Extract location from reply - g, err := dec.String("ll") - if err != nil { - log(logger.Warning, pkg+"No location in reply") - } else { - log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) - mts.Meta.Add("loc", g) - } - - return nil -} - -func (s *httpSender) release() {} - -func (s *httpSender) close() error { return nil } - // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { conn *rtmp.Conn