av/revid/senders.go

508 lines
13 KiB
Go

/*
NAME
senders.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package revid
import (
"errors"
"fmt"
"io"
"net"
"os"
"sync"
"time"
"github.com/Comcast/gots/packet"
"bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/protocol/rtmp"
"bitbucket.org/ausocean/av/protocol/rtp"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/logging"
"bitbucket.org/ausocean/utils/pool"
)
// Log is used by the multiSender. It is a leveled logging function taking a
// level, a message and optional parameters; the senders in this file accept
// loggers of the same shape and call them with alternating key/value
// parameters after the message.
type Log func(level int8, message string, params ...interface{})
// Sender pool buffer read timeouts and sizing.
const (
// rtmpPoolReadTimeout is the timeout passed to the pool buffer's Next call
// by the rtmpSender output routine.
rtmpPoolReadTimeout = 1 * time.Second
// mtsPoolReadTimeout is the timeout passed to the pool buffer's Next call
// by the mtsSender output routine.
mtsPoolReadTimeout = 1 * time.Second
// maxBuffLen is the total size budget used when a pool buffer is re-created
// after a too-long write: the number of elements is maxBuffLen divided by
// the adjusted element size.
maxBuffLen = 50000000
)
// Package-level records of the most recently adjusted pool element sizes.
var (
// adjustedRTMPPoolElementSize is set by rtmpSender.Write to twice the length
// of the data that was too long for the pool buffer.
adjustedRTMPPoolElementSize int
// adjustedMTSPoolElementSize is set by mtsSender.Write to twice the length
// of the clip that was too long for the pool buffer.
adjustedMTSPoolElementSize int
)
// httpSender provides an implementation of io.Writer to perform sends to an HTTP
// destination.
type httpSender struct {
// client is the netsender client used to make the send request.
client *netsender.Sender
// log is the leveled logging function used to report send progress.
log func(lvl int8, msg string, args ...interface{})
// report is called with the number of bytes written after a successful send.
report func(sent int)
}
// newHTTPSender returns a pointer to a new httpSender that sends through ns,
// logs with log and reports the size of successful sends through report.
func newHTTPSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) *httpSender {
	s := new(httpSender)
	s.client = ns
	s.log = log
	s.report = report
	return s
}
// Write implements io.Writer. The data is sent using httpSend; on success the
// number of bytes is passed to the report function. The returned count is
// always len(d), accompanied by any send error.
func (s *httpSender) Write(d []byte) (int, error) {
	s.log(logging.Debug, "HTTP sending")
	n := len(d)
	if err := httpSend(d, s.client, s.log); err != nil {
		s.log(logging.Debug, "bad send", "error", err)
		return n, err
	}
	s.log(logging.Debug, "good send", "len", n)
	s.report(n)
	return n, nil
}
// Close implements io.Closer. The httpSender holds nothing that needs
// releasing, so this always returns nil.
func (s *httpSender) Close() error {
	return nil
}
// httpSend sends d to the netsender service using client. The payload is
// attached to the first "V0" or "S0" pin made from the client's "ip"
// parameter; if neither is configured nothing is sent and nil is returned.
// The reply of a successful send is passed to extractMeta.
func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error {
	ip := client.Param("ip")
	log(logging.Debug, "making pins, and sending recv request", "ip", ip)
	pins := netsender.MakePins(ip, "V,S")

	// Locate the first pin able to carry the payload.
	target := -1
	for i := range pins {
		if name := pins[i].Name; name == "V0" || name == "S0" {
			target = i
			break
		}
	}
	if target < 0 {
		// Only send if "V0" or "S0" are configured as input.
		return nil
	}

	switch pins[target].Name {
	case "V0":
		pins[target].MimeType = "video/mp2t"
	case "S0":
		pins[target].MimeType = "audio/x-wav"
	}
	pins[target].Value = len(d)
	pins[target].Data = d

	reply, _, err := client.Send(netsender.RequestRecv, pins)
	if err != nil {
		return err
	}
	log(logging.Debug, "good request", "reply", reply)
	return extractMeta(reply, log)
}
// extractMeta looks at a reply and extracts any time or location data, which
// is then used to update the time and location information of the mpegts
// encoder.
//
// Extraction is best-effort: a reply that cannot be decoded, or that lacks a
// timestamp or location, is logged but never reported as an error, so the
// returned error is always nil.
func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) error {
	dec, err := netsender.NewJSONDecoder(r)
	if err != nil {
		// Do not fail the send over metadata, but leave a trace of why no
		// metadata was extracted rather than dropping the error silently.
		log(logging.Warning, "could not decode reply, no metadata extracted", "error", err)
		return nil
	}

	// Extract time from reply if mts.Realtime has not been set.
	if !mts.RealTime.IsSet() {
		t, err := dec.Int("ts")
		if err != nil {
			log(logging.Warning, "No timestamp in reply")
		} else {
			log(logging.Debug, "got timestamp", "ts", t)
			mts.RealTime.Set(time.Unix(int64(t), 0))
		}
	}

	// Extract location from reply.
	g, err := dec.String("ll")
	if err != nil {
		log(logging.Debug, "No location in reply")
		return nil
	}
	log(logging.Debug, fmt.Sprintf("got location: %v", g))
	mts.Meta.Add(mts.LocationKey, g)
	return nil
}
// fileSender writes media to a local file destination; it provides Write and
// Close methods.
type fileSender struct {
// file is the output file currently being written to; nil until the first
// Write creates one.
file *os.File
// data is not referenced by the fileSender code visible in this file.
data []byte
// multiFile, when true, causes a new file to be created for every Write.
multiFile bool
// path is the prefix of the output file name; the creation time is appended.
path string
// init is true until the first output file has been created.
init bool
// log is the logger used to report file creation and writes.
log logging.Logger
}
// newFileSender returns a new fileSender that logs to l and names its output
// files with the prefix path. Setting multiFile true will write a new file for
// each write to this sender. The returned error is always nil.
func newFileSender(l logging.Logger, path string, multiFile bool) (*fileSender, error) {
	s := new(fileSender)
	s.log = l
	s.path = path
	s.multiFile = multiFile
	// No file exists yet; the first Write creates one.
	s.init = true
	return s, nil
}
// Write implements io.Writer. On the first call, or on every call when
// multiFile is set, a new output file named path followed by the current time
// is created and d is written to it. Any previously opened file is closed
// before the new one is created so that multiFile mode does not leak a file
// descriptor per write.
//
// It returns the number of bytes written and any error from creating or
// writing the file.
func (s *fileSender) Write(d []byte) (int, error) {
	if s.init || s.multiFile {
		if s.file != nil {
			// Only reachable in multiFile mode: release the previous file.
			if err := s.file.Close(); err != nil {
				s.log.Warning("could not close previous output file", "error", err)
			}
			s.file = nil
		}

		fileName := s.path + time.Now().String()
		s.log.Debug("creating new output file", "init", s.init, "multiFile", s.multiFile, "fileName", fileName)
		f, err := os.Create(fileName)
		if err != nil {
			return 0, fmt.Errorf("could not create file to write media to: %w", err)
		}
		s.file = f
		s.init = false
	}
	s.log.Debug("writing output file", "len(d)", len(d))
	return s.file.Write(d)
}
// Close implements io.Closer by closing the current output file. If no file
// has been opened (Close called before any Write) there is nothing to release
// and nil is returned, rather than the os.ErrInvalid that closing a nil
// *os.File yields.
func (s *fileSender) Close() error {
	if s.file == nil {
		return nil
	}
	return s.file.Close()
}
// mtsSender implements io.WriteCloser and provides sending capability specifically
// for use with MPEGTS packetization. It handles the construction of appropriately
// lengthed clips based on clip duration and PSI. It also accounts for
// discontinuities by setting the discontinuity indicator for the first packet of a clip.
type mtsSender struct {
// dst is the destination that completed clips are written to.
dst io.WriteCloser
// buf accumulates the packets of the clip currently being built.
buf []byte
// pool is the buffer through which clips are handed from Write to the
// output routine.
pool *pool.Buffer
// next holds a copy of the most recently written data; it is appended to buf
// on the following Write.
next []byte
// pkt is not referenced by the mtsSender code visible in this file.
pkt packet.Packet
// repairer is applied to each clip before it is written to dst and is
// notified of failed writes.
repairer *mts.DiscontinuityRepairer
// curPid is the PID of the most recently written data.
curPid int
// clipDur is the minimum time between clips being written to the pool.
clipDur time.Duration
// prev is the time at which the last clip was written to the pool.
prev time.Time
// done is closed by Close to stop the output routine.
done chan struct{}
// log is the leveled logging function used by the sender.
log func(lvl int8, msg string, args ...interface{})
// wg is used by Close to wait for the output routine to finish.
wg sync.WaitGroup
}
// newMTSSender returns a new mtsSender that writes clips of at least clipDur
// to dst, passing them through the pool buffer rb. The sender's output routine
// is started before returning; call Close to stop it.
func newMTSSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *pool.Buffer, clipDur time.Duration) *mtsSender {
	log(logging.Debug, "setting up mtsSender", "clip duration", int(clipDur))

	s := new(mtsSender)
	s.dst = dst
	s.repairer = mts.NewDiscontinuityRepairer()
	s.log = log
	s.pool = rb
	s.done = make(chan struct{})
	s.clipDur = clipDur

	// Register the output routine before starting it so Close can wait on it.
	s.wg.Add(1)
	go s.output()
	return s
}
// output is the mtsSender's data handling routine. Until done is closed it
// repeatedly takes a chunk from the pool buffer, passes it to the
// discontinuity repairer and writes it to dst. A chunk whose write fails is
// kept and retried on the next iteration; a chunk that cannot be repaired is
// discarded.
func (s *mtsSender) output() {
	var cur *pool.Chunk
	for {
		// Check for termination before doing any work this iteration.
		select {
		case <-s.done:
			s.log(logging.Info, "terminating sender output routine")
			s.wg.Done()
			return
		default:
		}

		// With no chunk in hand we're ready to get another from the pool buffer.
		if cur == nil {
			var err error
			cur, err = s.pool.Next(mtsPoolReadTimeout)
			if err == pool.ErrTimeout {
				s.log(logging.Debug, "mtsSender: pool buffer read timeout")
			} else if err != nil && err != io.EOF {
				s.log(logging.Error, "unexpected error", "error", err.Error())
			}
			// Whatever the outcome, start the next iteration afresh.
			continue
		}

		if err := s.repairer.Repair(cur.Bytes()); err != nil {
			cur.Close()
			cur = nil
			continue
		}

		s.log(logging.Debug, "mtsSender: writing")
		if _, err := s.dst.Write(cur.Bytes()); err != nil {
			s.log(logging.Debug, "failed write, repairing MTS", "error", err)
			s.repairer.Failed()
			// Keep hold of the chunk so the write is attempted again.
			continue
		}
		s.log(logging.Debug, "good write")
		cur.Close()
		cur = nil
	}
}
// Write implements io.Writer. Each call must supply at least one full MTS
// packet. The data from the previous call is appended to the clip being built
// and a copy of d is held back for the next call. When at least clipDur has
// passed since the last clip, d has the PAT PID and the clip is non-empty, the
// clip is written to the pool buffer for the output routine to send and a new
// clip is started.
//
// If the pool buffer reports the clip is too long, the pool is replaced by one
// with elements twice the clip's size. Pool write errors are logged and not
// returned; len(d) and nil are returned for any sufficiently long d.
func (s *mtsSender) Write(d []byte) (int, error) {
	if len(d) < mts.PacketSize {
		return 0, errors.New("do not have full MTS packet")
	}

	// The data held back from the previous call now joins the clip.
	if s.next != nil {
		s.log(logging.Debug, "appending packet to clip")
		s.buf = append(s.buf, s.next...)
	}

	// Hold back a private copy of d, since the caller may reuse its slice.
	held := make([]byte, len(d))
	copy(held, d)
	s.next = held

	pid, _ := mts.PID(held)
	s.curPid = int(pid)

	elapsed := time.Since(s.prev)
	s.log(logging.Debug, "checking send conditions", "curDuration", int(elapsed), "sendDur", int(s.clipDur), "curPID", s.curPid, "len", len(s.buf))
	if elapsed < s.clipDur || s.curPid != mts.PatPid || len(s.buf) == 0 {
		return len(d), nil
	}

	s.log(logging.Debug, "writing clip to pool buffer for sending", "size", len(s.buf))
	s.prev = time.Now()
	n, err := s.pool.Write(s.buf)
	if err != nil {
		s.log(logging.Warning, "ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf), "rb element size", adjustedMTSPoolElementSize)
		if err == pool.ErrTooLong {
			// Rebuild the pool with elements large enough for clips of this size.
			adjustedMTSPoolElementSize = len(s.buf) * 2
			numElements := maxBuffLen / adjustedMTSPoolElementSize
			s.pool = pool.NewBuffer(numElements, adjustedMTSPoolElementSize, 5*time.Second)
			s.log(logging.Info, "adjusted MTS pool buffer element size", "new size", adjustedMTSPoolElementSize, "num elements", numElements, "size(MB)", numElements*adjustedMTSPoolElementSize)
		}
	} else {
		s.pool.Flush()
	}

	// Start a new clip, reusing the existing backing storage.
	s.buf = s.buf[:0]
	return len(d), nil
}
// Close implements io.Closer. It signals the output routine to stop by closing
// the done channel and blocks until the routine has returned. The returned
// error is always nil.
func (s *mtsSender) Close() error {
	s.log(logging.Debug, "closing sender output routine")

	// Signal termination, then wait for the routine to acknowledge it.
	close(s.done)
	s.wg.Wait()

	s.log(logging.Info, "sender output routine closed")
	return nil
}
// rtmpSender sends data to a native RTMP destination; it provides Write and
// Close methods.
type rtmpSender struct {
// conn is the current RTMP connection; nil when no connection could be dialed.
conn *rtmp.Conn
// url is the RTMP destination that is dialed.
url string
// retries is the maximum number of dial attempts made per (re)connection.
retries int
// log is the leveled logging function used by the sender.
log func(lvl int8, msg string, args ...interface{})
// pool is the buffer through which data is handed from Write to the output
// routine.
pool *pool.Buffer
// done is closed by Close to stop the output routine.
done chan struct{}
// wg is used by Close to wait for the output routine to finish.
wg sync.WaitGroup
// report is called by Write with the length of the data written.
report func(sent int)
}
// newRtmpSender returns a new rtmpSender for the RTMP destination url, dialing
// it up to retries times. The sender is returned with its output routine
// running even when no connection could be made, in which case the last dial
// error is returned alongside it.
func newRtmpSender(url string, retries int, rb *pool.Buffer, log func(lvl int8, msg string, args ...interface{}), report func(sent int)) (*rtmpSender, error) {
	var (
		conn *rtmp.Conn
		err  error
	)
	for attempt := 0; attempt < retries; attempt++ {
		if conn, err = rtmp.Dial(url, log); err == nil {
			break
		}
		log(logging.Error, "dial error", "error", err)
		if attempt+1 < retries {
			log(logging.Info, "retrying dial")
		}
	}

	s := new(rtmpSender)
	s.conn = conn
	s.url = url
	s.retries = retries
	s.log = log
	s.pool = rb
	s.done = make(chan struct{})
	s.report = report

	// Register the output routine before starting it so Close can wait on it.
	s.wg.Add(1)
	go s.output()
	return s, err
}
// output is the rtmpSender's data handling routine. Until done is closed it
// repeatedly takes a chunk from the pool buffer and writes it to the RTMP
// connection, re-dialing when there is no connection or a write fails. A chunk
// whose write fails is kept and retried on the next iteration.
func (s *rtmpSender) output() {
	var cur *pool.Chunk
	for {
		// Check for termination before doing any work this iteration.
		select {
		case <-s.done:
			s.log(logging.Info, "terminating sender output routine")
			s.wg.Done()
			return
		default:
		}

		// With no chunk in hand we're ready to get another from the pool buffer.
		if cur == nil {
			var err error
			cur, err = s.pool.Next(rtmpPoolReadTimeout)
			if err == pool.ErrTimeout {
				s.log(logging.Debug, "rtmpSender: pool buffer read timeout")
			} else if err != nil && err != io.EOF {
				s.log(logging.Error, "unexpected error", "error", err.Error())
			}
			// Whatever the outcome, start the next iteration afresh.
			continue
		}

		if s.conn == nil {
			s.log(logging.Warning, "no rtmp connection, re-dialing")
			if err := s.restart(); err != nil {
				s.log(logging.Warning, "could not restart connection", "error", err)
				continue
			}
		}

		// An invalid FLV tag is not treated as a connection failure.
		_, err := s.conn.Write(cur.Bytes())
		if err != nil && err != rtmp.ErrInvalidFlvTag {
			s.log(logging.Warning, "send error, re-dialing", "error", err)
			if err = s.restart(); err != nil {
				s.log(logging.Warning, "could not restart connection", "error", err)
			}
			// Keep hold of the chunk so the write is attempted again.
			continue
		}
		s.log(logging.Debug, "good write to conn")
		cur.Close()
		cur = nil
	}
}
// Write implements io.Writer. The data is written to the pool buffer for the
// output routine to send. If the pool buffer reports the data is too long, the
// pool is replaced by one with elements twice the data's size. Pool write
// errors are logged and not returned: len(d) is always reported and returned
// with a nil error.
func (s *rtmpSender) Write(d []byte) (int, error) {
	s.log(logging.Debug, "writing to pool buffer")
	if _, err := s.pool.Write(d); err != nil {
		s.log(logging.Warning, "pool buffer write error", "error", err.Error())
		if err == pool.ErrTooLong {
			// Rebuild the pool with elements large enough for data of this size.
			adjustedRTMPPoolElementSize = len(d) * 2
			numElements := maxBuffLen / adjustedRTMPPoolElementSize
			s.pool = pool.NewBuffer(numElements, adjustedRTMPPoolElementSize, 5*time.Second)
			s.log(logging.Info, "adjusted RTMP pool buffer element size", "new size", adjustedRTMPPoolElementSize, "num elements", numElements, "size(MB)", numElements*adjustedRTMPPoolElementSize)
		}
	} else {
		s.pool.Flush()
		s.log(logging.Debug, "good pool buffer write", "len", len(d))
	}
	s.report(len(d))
	return len(d), nil
}
// restart closes the current connection, if any, and dials the RTMP
// destination again, making up to s.retries attempts. It returns nil as soon
// as a dial succeeds, otherwise the error of the last attempt.
func (s *rtmpSender) restart() error {
	s.close()

	var dialErr error
	for attempt := 0; attempt < s.retries; attempt++ {
		s.log(logging.Debug, "dialing", "dials", attempt)
		s.conn, dialErr = rtmp.Dial(s.url, s.log)
		if dialErr == nil {
			return nil
		}
		s.log(logging.Error, "dial error", "error", dialErr)
		if attempt+1 < s.retries {
			s.log(logging.Info, "retry rtmp connection")
		}
	}
	return dialErr
}
// Close implements io.Closer. It signals the output routine to stop, waits for
// it to return and then closes the RTMP connection, returning any error from
// doing so.
func (s *rtmpSender) Close() error {
	s.log(logging.Debug, "closing output routine")

	// Signal termination (if there is a channel to signal on), then wait for
	// the routine to acknowledge it.
	if s.done != nil {
		close(s.done)
	}
	s.wg.Wait()

	s.log(logging.Info, "output routine closed")
	return s.close()
}
// close closes the RTMP connection if there is one, returning any error from
// doing so. It does not affect the output routine.
func (s *rtmpSender) close() error {
	s.log(logging.Debug, "closing connection")
	if s.conn != nil {
		return s.conn.Close()
	}
	return nil
}
// TODO: Write restart func for rtpSender

// rtpSender sends data to a UDP destination with RTP packetization; it
// provides Write and Close methods.
type rtpSender struct {
	// log is the leveled logging function used by the sender.
	log func(lvl int8, msg string, args ...interface{})
	// conn is the UDP connection the encoder writes to. It is kept so that
	// Close can release the socket.
	conn net.Conn
	// encoder packetizes written data and writes it to conn.
	encoder *rtp.Encoder
	// data holds a copy of the most recently written data.
	data []byte
	// report is called by Write with the length of the data written.
	report func(sent int)
}

// newRtpSender returns a new rtpSender that sends to the UDP address addr,
// encoding with the given fps. An error is returned if addr cannot be dialed.
func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint, report func(sent int)) (*rtpSender, error) {
	conn, err := net.Dial("udp", addr)
	if err != nil {
		return nil, err
	}
	s := &rtpSender{
		log:     log,
		conn:    conn,
		encoder: rtp.NewEncoder(conn, int(fps)),
		report:  report,
	}
	return s, nil
}

// Write implements io.Writer. A copy of d is passed to the RTP encoder. Encoder
// errors are logged and not returned: len(d) is always reported and returned
// with a nil error.
func (s *rtpSender) Write(d []byte) (int, error) {
	// Copy the data, since the caller may reuse its slice.
	s.data = make([]byte, len(d))
	copy(s.data, d)
	_, err := s.encoder.Write(s.data)
	if err != nil {
		// Log the error as a key/value pair like the other senders do.
		s.log(logging.Warning, "rtpSender: write error", "error", err.Error())
	}
	s.report(len(d))
	return len(d), nil
}

// Close implements io.Closer by closing the UDP connection dialed by
// newRtpSender, returning any error from doing so. It returns nil if the
// sender has no connection.
func (s *rtpSender) Close() error {
	if s.conn == nil {
		return nil
	}
	return s.conn.Close()
}