revid: got rid of minimalHttpSender

Now that we're removing the concept of a loadSender, there is no need to have a minimalHttpSender (did not implement loadSender) and a httpSender (implemented loadSender). So we can now have
a single httpSender that implements io.Writer just like every other sender.
This commit is contained in:
Saxon 2019-04-01 12:07:28 +10:30
parent 5a67e71fe4
commit f17d2ffb8c
2 changed files with 60 additions and 89 deletions

View File

@ -193,7 +193,7 @@ func (r *Revid) setupPipeline(mtsEnc func(io.Writer, int) io.Writer, flvEnc func
for _, out := range r.config.Outputs {
switch out {
case Http:
w = newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), nil)
w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), nil)
mtsSenders = append(mtsSenders, w)
case Rtp:
w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)

View File

@ -83,24 +83,79 @@ func (s *multiSender) Write(d []byte) (int, error) {
}
// minimalHttpSender implements Sender for posting HTTP to netreceiver or vidgrind.
type minimalHttpSender struct {
type httpSender struct {
client *netsender.Sender
log func(lvl int8, msg string, args ...interface{})
}
// newMinimalHttpSender returns a pointer to a new minimalHttpSender.
func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *minimalHttpSender {
return &minimalHttpSender{
func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender {
return &httpSender{
client: ns,
log: log,
}
}
// send takes a bytes slice d and sends to http using s' http client.
func (s *minimalHttpSender) Write(d []byte) (int, error) {
func (s *httpSender) Write(d []byte) (int, error) {
return len(d), httpSend(d, s.client, s.log)
}
func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error {
// Only send if "V0" is configured as an input.
send := false
ip := client.Param("ip")
pins := netsender.MakePins(ip, "V")
for i, pin := range pins {
if pin.Name == "V0" {
send = true
pins[i].Value = len(d)
pins[i].Data = d
pins[i].MimeType = "video/mp2t"
break
}
}
if !send {
return nil
}
var err error
var reply string
reply, _, err = client.Send(netsender.RequestRecv, pins)
if err != nil {
return err
}
return extractMeta(reply, log)
}
// extractMeta looks at a reply at extracts any time or location data - then used
// to update time and location information in the mpegts encoder.
func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) error {
dec, err := netsender.NewJSONDecoder(r)
if err != nil {
return nil
}
// Extract time from reply
t, err := dec.Int("ts")
if err != nil {
log(logger.Warning, pkg+"No timestamp in reply")
} else {
log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t))
mts.Meta.Add("ts", strconv.Itoa(t))
}
// Extract location from reply
g, err := dec.String("ll")
if err != nil {
log(logger.Warning, pkg+"No location in reply")
} else {
log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g))
mts.Meta.Add("loc", g)
}
return nil
}
// loadSender is a destination to send a *ring.Chunk to.
// When a loadSender has finished using the *ring.Chunk
// it must be Closed.
@ -216,90 +271,6 @@ func (s *mtsSender) Write(d []byte) (int, error) {
func (s *mtsSender) close() error { return nil }
// httpSender implements loadSender for posting HTTP to NetReceiver
type httpSender struct {
client *netsender.Sender
log func(lvl int8, msg string, args ...interface{})
data []byte
}
func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender {
return &httpSender{
client: ns,
log: log,
}
}
func (s *httpSender) load(d []byte) error {
s.data = d
return nil
}
func (s *httpSender) send() error {
return httpSend(s.data, s.client, s.log)
}
func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error {
// Only send if "V0" is configured as an input.
send := false
ip := client.Param("ip")
pins := netsender.MakePins(ip, "V")
for i, pin := range pins {
if pin.Name == "V0" {
send = true
pins[i].Value = len(d)
pins[i].Data = d
pins[i].MimeType = "video/mp2t"
break
}
}
if !send {
return nil
}
var err error
var reply string
reply, _, err = client.Send(netsender.RequestRecv, pins)
if err != nil {
return err
}
return extractMeta(reply, log)
}
// extractMeta looks at a reply at extracts any time or location data - then used
// to update time and location information in the mpegts encoder.
func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) error {
dec, err := netsender.NewJSONDecoder(r)
if err != nil {
return nil
}
// Extract time from reply
t, err := dec.Int("ts")
if err != nil {
log(logger.Warning, pkg+"No timestamp in reply")
} else {
log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t))
mts.Meta.Add("ts", strconv.Itoa(t))
}
// Extract location from reply
g, err := dec.String("ll")
if err != nil {
log(logger.Warning, pkg+"No location in reply")
} else {
log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g))
mts.Meta.Add("loc", g)
}
return nil
}
func (s *httpSender) release() {}
func (s *httpSender) close() error { return nil }
// rtmpSender implements loadSender for a native RTMP destination.
type rtmpSender struct {
conn *rtmp.Conn