mirror of https://bitbucket.org/ausocean/av.git
httpSender now implemented as a NetSender client, rather than a generic HTTP client.
This commit is contained in:
parent
3f59d353c7
commit
5ba5327f33
|
@ -32,14 +32,12 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
|
||||||
"net/http"
|
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"strconv"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"bitbucket.org/ausocean/av/rtmp"
|
"bitbucket.org/ausocean/av/rtmp"
|
||||||
|
"bitbucket.org/ausocean/iot/pi/netsender"
|
||||||
"bitbucket.org/ausocean/utils/ring"
|
"bitbucket.org/ausocean/utils/ring"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -103,10 +101,9 @@ func (s *fileSender) close() error {
|
||||||
return s.file.Close()
|
return s.file.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// httpSender implements loadSender for an HTTP destination.
|
// httpSender implements loadSender for posting HTTP to NetReceiver
|
||||||
type httpSender struct {
|
type httpSender struct {
|
||||||
client http.Client
|
client *netsender.Sender
|
||||||
url string
|
|
||||||
|
|
||||||
log func(lvl, msg string)
|
log func(lvl, msg string)
|
||||||
|
|
||||||
|
@ -115,10 +112,11 @@ type httpSender struct {
|
||||||
chunk *ring.Chunk
|
chunk *ring.Chunk
|
||||||
}
|
}
|
||||||
|
|
||||||
func newHttpSender(url string, timeout time.Duration, log func(lvl, msg string)) *httpSender {
|
func newHttpSender(_ string, _ time.Duration, log func(lvl, msg string)) *httpSender {
|
||||||
|
var client netsender.Sender
|
||||||
|
client.Init(nil, nil, nil)
|
||||||
return &httpSender{
|
return &httpSender{
|
||||||
client: http.Client{Timeout: timeout},
|
client: &client,
|
||||||
url: url,
|
|
||||||
log: log,
|
log: log,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -129,27 +127,23 @@ func (s *httpSender) load(c *ring.Chunk) error {
|
||||||
_, err := s.chunk.WriteTo(buf)
|
_, err := s.chunk.WriteTo(buf)
|
||||||
s.buf = buf.Bytes()
|
s.buf = buf.Bytes()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("fileSender: %v", err)
|
return fmt.Errorf("httpSender: %v", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *httpSender) send() error {
|
func (s *httpSender) send() error {
|
||||||
url := s.url + strconv.Itoa(len(s.buf))
|
pins := netsender.MakePins("V0")
|
||||||
s.log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(s.buf)))
|
pins[0].Value = len(s.buf)
|
||||||
resp, err := s.client.Post(url, "video/mp2t", bytes.NewReader(s.buf))
|
var payload netsender.Payload
|
||||||
|
payload.Name = "V0"
|
||||||
|
payload.Data = s.buf
|
||||||
|
payload.MimeType = "video/mp2t"
|
||||||
|
_, _, err := s.client.Send(netsender.RequestPoll, pins, &payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Error posting to %s: %s", url, err)
|
return err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
return nil
|
||||||
|
|
||||||
body, err := ioutil.ReadAll(resp.Body)
|
|
||||||
if err == nil {
|
|
||||||
s.log(Debug, fmt.Sprintf("%s\n", body))
|
|
||||||
} else {
|
|
||||||
s.log(Error, err.Error())
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *httpSender) release() {
|
func (s *httpSender) release() {
|
||||||
|
|
Loading…
Reference in New Issue