Merged in http-address-variable (pull request #494)

revid/senders.go: Utilise HTTPAddress variable in config

Resolves issue #394
This commit is contained in:
Saxon Milton 2023-02-04 08:30:48 +00:00
commit 0f0deaa598
4 changed files with 44 additions and 14 deletions

2
go.mod
View File

@ -3,7 +3,7 @@ module bitbucket.org/ausocean/av
go 1.18 go 1.18
require ( require (
bitbucket.org/ausocean/iot v1.3.3 bitbucket.org/ausocean/iot v1.4.0
bitbucket.org/ausocean/utils v1.3.2 bitbucket.org/ausocean/utils v1.3.2
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf

4
go.sum
View File

@ -1,6 +1,6 @@
bitbucket.org/ausocean/iot v1.3.0/go.mod h1:rRcWt6SoM/jgIZpP1zrpnKb5BhxIMulAJ+q1xTvLh94= bitbucket.org/ausocean/iot v1.3.0/go.mod h1:rRcWt6SoM/jgIZpP1zrpnKb5BhxIMulAJ+q1xTvLh94=
bitbucket.org/ausocean/iot v1.3.3 h1:xEu6jrvWEofzFo13154FS66g4i2Ojl94kheoUshFB/A= bitbucket.org/ausocean/iot v1.4.0 h1:DzySaszy8dPGv7eb5cdigAfm3d3y582Hr/Sy4nMlvJw=
bitbucket.org/ausocean/iot v1.3.3/go.mod h1:NbEg2PvYSHDdUsy5eMmihBySpWfqaHiMdspQDZdDe8o= bitbucket.org/ausocean/iot v1.4.0/go.mod h1:NbEg2PvYSHDdUsy5eMmihBySpWfqaHiMdspQDZdDe8o=
bitbucket.org/ausocean/utils v1.2.11/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= bitbucket.org/ausocean/utils v1.2.11/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8=
bitbucket.org/ausocean/utils v1.3.0/go.mod h1:yWsulKjbBgwL17/w55MQ6cIT9jmNeOkwpd2gUIxAcIY= bitbucket.org/ausocean/utils v1.3.0/go.mod h1:yWsulKjbBgwL17/w55MQ6cIT9jmNeOkwpd2gUIxAcIY=
bitbucket.org/ausocean/utils v1.3.2 h1:U+jNixVPH1Ih0QpOckI4djhEc/T6FAzt9znADRcJ07s= bitbucket.org/ausocean/utils v1.3.2 h1:U+jNixVPH1Ih0QpOckI4djhEc/T6FAzt9znADRcJ07s=

View File

@ -183,7 +183,10 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
case config.OutputHTTP: case config.OutputHTTP:
r.cfg.Logger.Debug("using HTTP output") r.cfg.Logger.Debug("using HTTP output")
pb := pool.NewBuffer(int(r.cfg.PoolStartElementSize), int(nElements), writeTimeout) pb := pool.NewBuffer(int(r.cfg.PoolStartElementSize), int(nElements), writeTimeout)
hs := newHTTPSender(r.ns, r.cfg.Logger, r.bitrate.Report) hs, err := newHTTPSender(r.ns, r.cfg.Logger, withReportCallback(r.bitrate.Report), withHTTPAddress(r.cfg.HTTPAddress))
if err != nil {
return fmt.Errorf("coult not create http sender: %w", err)
}
w = newMTSSender(hs, r.cfg.Logger, pb, r.cfg.ClipDuration) w = newMTSSender(hs, r.cfg.Logger, pb, r.cfg.ClipDuration)
mtsSenders = append(mtsSenders, w) mtsSenders = append(mtsSenders, w)

View File

@ -60,29 +60,58 @@ var (
adjustedMTSPoolElementSize int adjustedMTSPoolElementSize int
) )
var errReportCallbackNil = errors.New("report callback is nil")
type httpSenderOption func(s *httpSender) error
// withReportCallback provides a functional option to set the report callback
// function. This can be used to record the number of bytes sent.
func withReportCallback(report func(sent int)) httpSenderOption {
return func(s *httpSender) error {
if report == nil {
return errReportCallbackNil
}
s.report = report
return nil
}
}
// withHTTPAddress provides a functional option to set the destination http
// address.
func withHTTPAddress(addr string) httpSenderOption {
return func(s *httpSender) error {
s.addr = addr
return nil
}
}
// httpSender provides an implemntation of io.Writer to perform sends to a http // httpSender provides an implemntation of io.Writer to perform sends to a http
// destination. // destination.
type httpSender struct { type httpSender struct {
client *netsender.Sender client *netsender.Sender
log logging.Logger log logging.Logger
report func(sent int) report func(sent int)
addr string
} }
// newHttpSender returns a pointer to a new httpSender. // newHttpSender returns a pointer to a new httpSender.
// report is callback that can be used to report the amount of data sent per write. // report is callback that can be used to report the amount of data sent per write.
// This can be set to nil. // This can be set to nil.
func newHTTPSender(ns *netsender.Sender, log logging.Logger, report func(sent int)) *httpSender { func newHTTPSender(ns *netsender.Sender, log logging.Logger, opts ...httpSenderOption) (*httpSender, error) {
return &httpSender{ s := &httpSender{client: ns, log: log}
client: ns, for _, opt := range opts {
log: log, err := opt(s)
report: report, if err != nil {
return nil, err
}
} }
return s, nil
} }
// Write implements io.Writer. // Write implements io.Writer.
func (s *httpSender) Write(d []byte) (int, error) { func (s *httpSender) Write(d []byte) (int, error) {
s.log.Debug("HTTP sending") s.log.Debug("HTTP sending")
err := httpSend(d, s.client, s.log) err := httpSend(d, s.client, s.log, s.addr)
if err == nil { if err == nil {
s.log.Debug("good send", "len", len(d)) s.log.Debug("good send", "len", len(d))
if s.report != nil { if s.report != nil {
@ -96,7 +125,7 @@ func (s *httpSender) Write(d []byte) (int, error) {
func (s *httpSender) Close() error { return nil } func (s *httpSender) Close() error { return nil }
func httpSend(d []byte, client *netsender.Sender, log logging.Logger) error { func httpSend(d []byte, client *netsender.Sender, log logging.Logger, addr string) error {
// Only send if "V0" or "S0" are configured as input. // Only send if "V0" or "S0" are configured as input.
send := false send := false
ip := client.Param("ip") ip := client.Param("ip")
@ -120,9 +149,7 @@ func httpSend(d []byte, client *netsender.Sender, log logging.Logger) error {
if !send { if !send {
return nil return nil
} }
var err error reply, _, err := client.Send(netsender.RequestRecv, pins, netsender.WithRecvAddress(addr))
var reply string
reply, _, err = client.Send(netsender.RequestRecv, pins)
if err != nil { if err != nil {
return err return err
} }