av/revid/senders.go

460 lines
11 KiB
Go

/*
NAME
senders.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package revid
import (
"fmt"
"net"
"os"
"strconv"
"github.com/Comcast/gots/packet"
"bitbucket.org/ausocean/av/rtmp"
"bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/av/stream/rtp"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/logger"
)
// Sender is intended to provided functionality for the sending of a byte slice
// to an implemented destination.
type Sender interface {
// send takes the bytes slice d and sends to a particular destination as
// implemented.
send(d []byte) error
}
// multiSender allows for the sending through multi loadSenders using a single
// call to multiSender.Write.
type multiSender struct {
owner *Revid
senders []loadSender
}
// newMultiSender returns a pointer to a new multiSender.
func newMultiSender(owner *Revid, senders []loadSender) *multiSender {
return &multiSender{owner: owner, senders: senders}
}
// Write implements io.Writer. The written slice will be sent to each loadSender
// in multiSender.senders. If s.owner.config.SendRetry is true then on failed
// sends we notify the current sender to take any required actions and then try
// the send again.
func (s *multiSender) Write(d []byte) (int, error) {
for i, sender := range s.senders {
sender.load(d)
s.owner.config.Logger.Log(logger.Debug, "sending to output", "output", i)
err := sender.send()
if err != nil {
if !s.owner.IsRunning() {
return 0, err
}
if s.owner.config.SendRetry {
for err != nil {
s.owner.config.Logger.Log(logger.Warning, "send failed", "output", i, "error", err)
err = sender.handleSendFail(err)
if err != nil {
s.owner.config.Logger.Log(logger.Warning, "could not currenty handle send fail", "output", i, "error", err)
continue
}
err = sender.send()
}
}
}
}
return len(d), nil
}
// minimalHttpSender implements Sender for posting HTTP to netreceiver or vidgrind.
type minimalHttpSender struct {
client *netsender.Sender
log func(lvl int8, msg string, args ...interface{})
}
// newMinimalHttpSender returns a pointer to a new minimalHttpSender.
func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *minimalHttpSender {
return &minimalHttpSender{
client: ns,
log: log,
}
}
// send takes a bytes slice d and sends to http using s' http client.
func (s *minimalHttpSender) send(d []byte) error {
return httpSend(d, s.client, s.log)
}
// loadSender is a destination to send a *ring.Chunk to.
// When a loadSender has finished using the *ring.Chunk
// it must be Closed.
type loadSender interface {
// load assigns the *ring.Chunk to the loadSender.
// The load call may fail, but must not mutate the
// the chunk.
load(d []byte) error
// send performs a destination-specific send
// operation. It must not mutate the chunk.
send() error
// release releases the *ring.Chunk.
release()
// close cleans up after use of the loadSender.
close() error
// handleSendFail performs any actions necessary in response to a failed send.
handleSendFail(err error) error
}
// restart is an optional interface for loadSenders that
// can restart their connection.
type restarter interface {
restart() error
}
// fileSender implements loadSender for a local file destination.
type fileSender struct {
file *os.File
data []byte
}
func newFileSender(path string) (*fileSender, error) {
f, err := os.Create(path)
if err != nil {
return nil, err
}
return &fileSender{file: f}, nil
}
func (s *fileSender) load(d []byte) error {
s.data = d
return nil
}
func (s *fileSender) send() error {
_, err := s.file.Write(s.data)
return err
}
func (s *fileSender) release() {}
func (s *fileSender) close() error {
return s.file.Close()
}
func (s *fileSender) handleSendFail(err error) error { return nil }
// mtsSender implemented loadSender and provides sending capability specifically
// for use with MPEGTS packetization. It handles the construction of appropriately
// lengthed clips based on PSI. It also fixes accounts for discontinuities by
// setting the discontinuity indicator for the first packet of a clip.
type mtsSender struct {
sender Sender
buf []byte
next []byte
pkt packet.Packet
failed bool
discarded bool
repairer *mts.DiscontinuityRepairer
curPid int
}
// newMtsSender returns a new mtsSender.
func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender {
return &mtsSender{
sender: s,
repairer: mts.NewDiscontinuityRepairer(),
}
}
// load takes a *ring.Chunk and assigns to s.next, also grabbing it's pid and
// assigning to s.curPid. s.next if exists is also appended to the sender buf.
func (s *mtsSender) load(d []byte) error {
if s.next != nil {
s.buf = append(s.buf, s.next...)
}
bytes := make([]byte, len(d))
copy(bytes, d)
s.next = bytes
copy(s.pkt[:], bytes)
s.curPid = s.pkt.PID()
return nil
}
// send checks the currently loaded paackets PID; if it is a PAT then what is in
// the mtsSenders buffer is fixed and sent.
func (ms *mtsSender) send() error {
if ms.curPid == mts.PatPid && len(ms.buf) > 0 {
err := ms.fixAndSend()
if err != nil {
return err
}
ms.buf = ms.buf[:0]
}
return nil
}
// fixAndSend checks for discontinuities in the senders buffer and then sends.
// If a discontinuity is found the PAT packet at the start of the clip has it's
// discontintuity indicator set to true.
func (ms *mtsSender) fixAndSend() error {
err := ms.repairer.Repair(ms.buf)
if err == nil {
err = ms.sender.send(ms.buf)
if err == nil {
return nil
}
}
ms.failed = true
ms.repairer.Failed()
return err
}
func (s *mtsSender) close() error { return nil }
// release will set the s.fail flag to false and clear the buffer if
// the previous send was a fail. The currently loaded chunk is also closed.
func (s *mtsSender) release() {
if s.failed {
s.buf = s.buf[:0]
s.failed = false
}
}
func (s *mtsSender) handleSendFail(err error) error { return nil }
// httpSender implements loadSender for posting HTTP to NetReceiver
type httpSender struct {
client *netsender.Sender
log func(lvl int8, msg string, args ...interface{})
data []byte
}
func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender {
return &httpSender{
client: ns,
log: log,
}
}
func (s *httpSender) load(d []byte) error {
s.data = d
return nil
}
func (s *httpSender) send() error {
return httpSend(s.data, s.client, s.log)
}
func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error {
// Only send if "V0" is configured as an input.
send := false
ip := client.Param("ip")
pins := netsender.MakePins(ip, "V")
for i, pin := range pins {
if pin.Name == "V0" {
send = true
pins[i].Value = len(d)
pins[i].Data = d
pins[i].MimeType = "video/mp2t"
break
}
}
if !send {
return nil
}
var err error
var reply string
reply, _, err = client.Send(netsender.RequestRecv, pins)
if err != nil {
return err
}
return extractMeta(reply, log)
}
// extractMeta looks at a reply at extracts any time or location data - then used
// to update time and location information in the mpegts encoder.
func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) error {
dec, err := netsender.NewJSONDecoder(r)
if err != nil {
return nil
}
// Extract time from reply
t, err := dec.Int("ts")
if err != nil {
log(logger.Warning, pkg+"No timestamp in reply")
} else {
log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t))
mts.Meta.Add("ts", strconv.Itoa(t))
}
// Extract location from reply
g, err := dec.String("ll")
if err != nil {
log(logger.Warning, pkg+"No location in reply")
} else {
log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g))
mts.Meta.Add("loc", g)
}
return nil
}
func (s *httpSender) release() {}
func (s *httpSender) close() error { return nil }
func (s *httpSender) handleSendFail(err error) error { return nil }
// rtmpSender implements loadSender for a native RTMP destination.
type rtmpSender struct {
conn *rtmp.Conn
url string
timeout uint
retries int
log func(lvl int8, msg string, args ...interface{})
data []byte
}
var _ restarter = (*rtmpSender)(nil)
func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) {
var conn *rtmp.Conn
var err error
for n := 0; n < retries; n++ {
conn, err = rtmp.Dial(url, timeout, log)
if err == nil {
break
}
log(logger.Error, err.Error())
if n < retries-1 {
log(logger.Info, pkg+"retry rtmp connection")
}
}
if err != nil {
return nil, err
}
s := &rtmpSender{
conn: conn,
url: url,
timeout: timeout,
retries: retries,
log: log,
}
return s, nil
}
func (s *rtmpSender) load(d []byte) error {
s.data = make([]byte, len(d))
copy(s.data, d)
return nil
}
func (s *rtmpSender) send() error {
_, err := s.conn.Write(s.data)
if err == rtmp.ErrInvalidFlvTag {
return nil
}
return err
}
func (s *rtmpSender) release() {}
func (s *rtmpSender) restart() error {
s.close()
var err error
for n := 0; n < s.retries; n++ {
s.conn, err = rtmp.Dial(s.url, s.timeout, s.log)
if err == nil {
break
}
s.log(logger.Error, err.Error())
if n < s.retries-1 {
s.log(logger.Info, pkg+"retry rtmp connection")
}
}
return err
}
func (s *rtmpSender) close() error {
if s.conn != nil {
return s.conn.Close()
}
return nil
}
func (s *rtmpSender) handleSendFail(err error) error {
return s.restart()
}
// TODO: Write restart func for rtpSender
// rtpSender implements loadSender for a native udp destination with rtp packetization.
type rtpSender struct {
log func(lvl int8, msg string, args ...interface{})
encoder *rtp.Encoder
data []byte
}
func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) {
conn, err := net.Dial("udp", addr)
if err != nil {
return nil, err
}
s := &rtpSender{
log: log,
encoder: rtp.NewEncoder(conn, int(fps)),
}
return s, nil
}
func (s *rtpSender) load(d []byte) error {
s.data = make([]byte, len(d))
copy(s.data, d)
return nil
}
func (s *rtpSender) close() error { return nil }
func (s *rtpSender) release() {}
func (s *rtpSender) send() error {
_, err := s.encoder.Write(s.data)
return err
}
func (s *rtpSender) handleSendFail(err error) error { return nil }