From 4fe01f6899a923fd271cfd7397609c1bcafd3531 Mon Sep 17 00:00:00 2001 From: Saxon Nelson-Milton Date: Sat, 4 Feb 2023 18:17:29 +1030 Subject: [PATCH] revid/senders.go: Utilise HTTPAddress variable in config This change bumps the required iot tag and adds functional option variadic parameters to the httpSender constructor so that we can provide the HTTPAddress config variable and modify the httpSend destination address. In addition to this, we have added an option for the report callback. --- go.mod | 2 +- go.sum | 4 ++-- revid/pipeline.go | 5 ++++- revid/senders.go | 47 +++++++++++++++++++++++++++++++++++++---------- 4 files changed, 44 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index d2c848b8..b94dd41d 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module bitbucket.org/ausocean/av go 1.18 require ( - bitbucket.org/ausocean/iot v1.3.3 + bitbucket.org/ausocean/iot v1.4.0 bitbucket.org/ausocean/utils v1.3.2 github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf diff --git a/go.sum b/go.sum index cc2a47a7..27504d0d 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,6 @@ bitbucket.org/ausocean/iot v1.3.0/go.mod h1:rRcWt6SoM/jgIZpP1zrpnKb5BhxIMulAJ+q1xTvLh94= -bitbucket.org/ausocean/iot v1.3.3 h1:xEu6jrvWEofzFo13154FS66g4i2Ojl94kheoUshFB/A= -bitbucket.org/ausocean/iot v1.3.3/go.mod h1:NbEg2PvYSHDdUsy5eMmihBySpWfqaHiMdspQDZdDe8o= +bitbucket.org/ausocean/iot v1.4.0 h1:DzySaszy8dPGv7eb5cdigAfm3d3y582Hr/Sy4nMlvJw= +bitbucket.org/ausocean/iot v1.4.0/go.mod h1:NbEg2PvYSHDdUsy5eMmihBySpWfqaHiMdspQDZdDe8o= bitbucket.org/ausocean/utils v1.2.11/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= bitbucket.org/ausocean/utils v1.3.0/go.mod h1:yWsulKjbBgwL17/w55MQ6cIT9jmNeOkwpd2gUIxAcIY= bitbucket.org/ausocean/utils v1.3.2 h1:U+jNixVPH1Ih0QpOckI4djhEc/T6FAzt9znADRcJ07s= diff --git a/revid/pipeline.go b/revid/pipeline.go index f2f0791c..2edd116f 100644 --- a/revid/pipeline.go +++ b/revid/pipeline.go @@ -183,7 +183,10 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. case config.OutputHTTP: r.cfg.Logger.Debug("using HTTP output") pb := pool.NewBuffer(int(r.cfg.PoolStartElementSize), int(nElements), writeTimeout) - hs := newHTTPSender(r.ns, r.cfg.Logger, r.bitrate.Report) + hs, err := newHTTPSender(r.ns, r.cfg.Logger, withReportCallback(r.bitrate.Report), withHTTPAddress(r.cfg.HTTPAddress)) + if err != nil { + return fmt.Errorf("coult not create http sender: %w", err) + } w = newMTSSender(hs, r.cfg.Logger, pb, r.cfg.ClipDuration) mtsSenders = append(mtsSenders, w) diff --git a/revid/senders.go b/revid/senders.go index 87877784..ec241f9e 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -60,29 +60,58 @@ var ( adjustedMTSPoolElementSize int ) +var errReportCallbackNil = errors.New("report callback is nil") + +type httpSenderOption func(s *httpSender) error + +// withReportCallback provides a functional option to set the report callback +// function. This can be used to record the number of bytes sent. +func withReportCallback(report func(sent int)) httpSenderOption { + return func(s *httpSender) error { + if report == nil { + return errReportCallbackNil + } + s.report = report + return nil + } +} + +// withHTTPAddress provides a functional option to set the destination http +// address. +func withHTTPAddress(addr string) httpSenderOption { + return func(s *httpSender) error { + s.addr = addr + return nil + } +} + // httpSender provides an implemntation of io.Writer to perform sends to a http // destination. type httpSender struct { client *netsender.Sender log logging.Logger report func(sent int) + addr string } // newHttpSender returns a pointer to a new httpSender. // report is callback that can be used to report the amount of data sent per write. // This can be set to nil. -func newHTTPSender(ns *netsender.Sender, log logging.Logger, report func(sent int)) *httpSender { - return &httpSender{ - client: ns, - log: log, - report: report, +func newHTTPSender(ns *netsender.Sender, log logging.Logger, opts ...httpSenderOption) (*httpSender, error) { + s := &httpSender{client: ns, log: log} + for _, opt := range opts { + err := opt(s) + if err != nil { + return nil, err + } } + return s, nil } // Write implements io.Writer. func (s *httpSender) Write(d []byte) (int, error) { s.log.Debug("HTTP sending") - err := httpSend(d, s.client, s.log) + err := httpSend(d, s.client, s.log, s.addr) if err == nil { s.log.Debug("good send", "len", len(d)) if s.report != nil { @@ -96,7 +125,7 @@ func (s *httpSender) Write(d []byte) (int, error) { func (s *httpSender) Close() error { return nil } -func httpSend(d []byte, client *netsender.Sender, log logging.Logger) error { +func httpSend(d []byte, client *netsender.Sender, log logging.Logger, addr string) error { // Only send if "V0" or "S0" are configured as input. send := false ip := client.Param("ip") @@ -120,9 +149,7 @@ func httpSend(d []byte, client *netsender.Sender, log logging.Logger) error { if !send { return nil } - var err error - var reply string - reply, _, err = client.Send(netsender.RequestRecv, pins) + reply, _, err := client.Send(netsender.RequestRecv, pins, netsender.WithRecvAddress(addr)) if err != nil { return err }