revid/senders.go: Utilise HTTPAddress variable in config

This change bumps the required iot tag and adds functional option
variadic parameters to the httpSender constructor so that we can
provide the HTTPAddress config variable and modify the httpSend
destination address. In addition to this, we have added an option
for the report callback.
This commit is contained in:
Saxon Nelson-Milton 2023-02-04 18:17:29 +10:30
parent 2da3d399d2
commit 4fe01f6899
4 changed files with 44 additions and 14 deletions

2
go.mod
View File

@ -3,7 +3,7 @@ module bitbucket.org/ausocean/av
go 1.18
require (
bitbucket.org/ausocean/iot v1.3.3
bitbucket.org/ausocean/iot v1.4.0
bitbucket.org/ausocean/utils v1.3.2
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf

4
go.sum
View File

@ -1,6 +1,6 @@
bitbucket.org/ausocean/iot v1.3.0/go.mod h1:rRcWt6SoM/jgIZpP1zrpnKb5BhxIMulAJ+q1xTvLh94=
bitbucket.org/ausocean/iot v1.3.3 h1:xEu6jrvWEofzFo13154FS66g4i2Ojl94kheoUshFB/A=
bitbucket.org/ausocean/iot v1.3.3/go.mod h1:NbEg2PvYSHDdUsy5eMmihBySpWfqaHiMdspQDZdDe8o=
bitbucket.org/ausocean/iot v1.4.0 h1:DzySaszy8dPGv7eb5cdigAfm3d3y582Hr/Sy4nMlvJw=
bitbucket.org/ausocean/iot v1.4.0/go.mod h1:NbEg2PvYSHDdUsy5eMmihBySpWfqaHiMdspQDZdDe8o=
bitbucket.org/ausocean/utils v1.2.11/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8=
bitbucket.org/ausocean/utils v1.3.0/go.mod h1:yWsulKjbBgwL17/w55MQ6cIT9jmNeOkwpd2gUIxAcIY=
bitbucket.org/ausocean/utils v1.3.2 h1:U+jNixVPH1Ih0QpOckI4djhEc/T6FAzt9znADRcJ07s=

View File

@ -183,7 +183,10 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
case config.OutputHTTP:
r.cfg.Logger.Debug("using HTTP output")
pb := pool.NewBuffer(int(r.cfg.PoolStartElementSize), int(nElements), writeTimeout)
hs := newHTTPSender(r.ns, r.cfg.Logger, r.bitrate.Report)
hs, err := newHTTPSender(r.ns, r.cfg.Logger, withReportCallback(r.bitrate.Report), withHTTPAddress(r.cfg.HTTPAddress))
if err != nil {
return fmt.Errorf("coult not create http sender: %w", err)
}
w = newMTSSender(hs, r.cfg.Logger, pb, r.cfg.ClipDuration)
mtsSenders = append(mtsSenders, w)

View File

@ -60,29 +60,58 @@ var (
adjustedMTSPoolElementSize int
)
var errReportCallbackNil = errors.New("report callback is nil")
type httpSenderOption func(s *httpSender) error
// withReportCallback provides a functional option to set the report callback
// function. This can be used to record the number of bytes sent.
func withReportCallback(report func(sent int)) httpSenderOption {
return func(s *httpSender) error {
if report == nil {
return errReportCallbackNil
}
s.report = report
return nil
}
}
// withHTTPAddress provides a functional option to set the destination http
// address.
func withHTTPAddress(addr string) httpSenderOption {
return func(s *httpSender) error {
s.addr = addr
return nil
}
}
// httpSender provides an implemntation of io.Writer to perform sends to a http
// destination.
type httpSender struct {
client *netsender.Sender
log logging.Logger
report func(sent int)
addr string
}
// newHttpSender returns a pointer to a new httpSender.
// report is callback that can be used to report the amount of data sent per write.
// This can be set to nil.
func newHTTPSender(ns *netsender.Sender, log logging.Logger, report func(sent int)) *httpSender {
return &httpSender{
client: ns,
log: log,
report: report,
func newHTTPSender(ns *netsender.Sender, log logging.Logger, opts ...httpSenderOption) (*httpSender, error) {
s := &httpSender{client: ns, log: log}
for _, opt := range opts {
err := opt(s)
if err != nil {
return nil, err
}
}
return s, nil
}
// Write implements io.Writer.
func (s *httpSender) Write(d []byte) (int, error) {
s.log.Debug("HTTP sending")
err := httpSend(d, s.client, s.log)
err := httpSend(d, s.client, s.log, s.addr)
if err == nil {
s.log.Debug("good send", "len", len(d))
if s.report != nil {
@ -96,7 +125,7 @@ func (s *httpSender) Write(d []byte) (int, error) {
func (s *httpSender) Close() error { return nil }
func httpSend(d []byte, client *netsender.Sender, log logging.Logger) error {
func httpSend(d []byte, client *netsender.Sender, log logging.Logger, addr string) error {
// Only send if "V0" or "S0" are configured as input.
send := false
ip := client.Param("ip")
@ -120,9 +149,7 @@ func httpSend(d []byte, client *netsender.Sender, log logging.Logger) error {
if !send {
return nil
}
var err error
var reply string
reply, _, err = client.Send(netsender.RequestRecv, pins)
reply, _, err := client.Send(netsender.RequestRecv, pins, netsender.WithRecvAddress(addr))
if err != nil {
return err
}