av/revid/senders.go

395 lines
8.1 KiB
Go

/*
NAME
senders.go
DESCRIPTION
See Readme.md
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
Alan Noble <alan@ausocean.org>
LICENSE
revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package revid
import (
"errors"
"fmt"
"io"
"net"
"os"
"os/exec"
"bitbucket.org/ausocean/av/rtmp"
"bitbucket.org/ausocean/av/stream/mts"
"bitbucket.org/ausocean/av/stream/rtp"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
)
// loadSender is a destination to send a *ring.Chunk to.
// A chunk is handed over with load, delivered with send and
// given back with release; the implementations in this file
// Close the chunk in release. close is called once the
// loadSender itself is no longer needed.
type loadSender interface {
// load assigns the *ring.Chunk to the loadSender.
// The load call may fail, but must not mutate
// the chunk.
load(*ring.Chunk) error
// send performs a destination-specific send
// operation. It must not mutate the chunk.
send() error
// release releases the *ring.Chunk.
release()
// close cleans up after use of the loadSender.
close() error
}
// restarter is an optional interface for loadSenders that
// can restart their connection.
type restarter interface {
// restart re-establishes the sender's connection and reports
// any error encountered while doing so.
restart() error
}
// fileSender implements loadSender for a local file destination.
type fileSender struct {
// file is the destination file; newFileSender creates it and close closes it.
file *os.File
// chunk is the chunk set by the most recent load; release resets it to nil.
chunk *ring.Chunk
}
// newFileSender returns a fileSender whose destination is the file at path.
// The file is obtained from os.Create, so an existing file at path is
// truncated. If os.Create fails its error is returned unchanged together
// with a nil sender.
func newFileSender(path string) (*fileSender, error) {
	out, err := os.Create(path)
	if err != nil {
		return nil, err
	}
	s := &fileSender{file: out}
	return s, nil
}
// load records c as the chunk to be written by send.
// It does not touch the chunk's contents and always returns nil.
func (s *fileSender) load(c *ring.Chunk) error {
s.chunk = c
return nil
}
// send writes the loaded chunk to the destination file using the chunk's
// WriteTo method. The number of bytes written is discarded; only the
// error, if any, is reported.
func (s *fileSender) send() error {
	if _, err := s.chunk.WriteTo(s.file); err != nil {
		return err
	}
	return nil
}
// release closes the loaded chunk and clears the sender's reference to it.
// NOTE(review): Close is called on s.chunk without a nil check; whether a
// release with no chunk loaded is tolerated depends on ring.Chunk.Close.
func (s *fileSender) release() {
s.chunk.Close()
s.chunk = nil
}
// close closes the destination file and returns the error, if any,
// reported by the file's Close method.
func (s *fileSender) close() error {
return s.file.Close()
}
// httpSender implements loadSender for posting HTTP to NetReceiver.
type httpSender struct {
// client is the netsender used to read the "ip" parameter and to send requests.
client *netsender.Sender
// log is the logging callback used when inspecting replies.
log func(lvl int8, msg string, args ...interface{})
// chunk is the chunk set by the most recent load; release resets it to nil.
chunk *ring.Chunk
}
// newHttpSender returns an httpSender that sends through ns and reports
// through log. No chunk is loaded in the returned sender.
func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender {
	s := new(httpSender)
	s.client = ns
	s.log = log
	return s
}
// load records c as the chunk to be posted by send.
// It does not touch the chunk's contents and always returns nil.
func (s *httpSender) load(c *ring.Chunk) error {
s.chunk = c
return nil
}
// send posts the loaded chunk through the netsender client.
//
// The chunk is attached to the pin named "V0" among the pins built from the
// client's "ip" parameter; if there is no such pin nothing is posted. In
// either case the reply (empty when nothing was posted) is handed to
// extractMeta. An error from the client's Send is returned as is.
func (s *httpSender) send() error {
	// httpSender does not retry, so a cleared chunk simply
	// means there is nothing left to do.
	if s.chunk == nil {
		return nil
	}

	// The chunk is only sent if "V0" is configured as an input.
	pins := netsender.MakePins(s.client.Param("ip"), "V")
	haveV0 := false
	for i := range pins {
		if pins[i].Name != "V0" {
			continue
		}
		pins[i].Value = s.chunk.Len()
		pins[i].Data = s.chunk.Bytes()
		pins[i].MimeType = "video/mp2t"
		haveV0 = true
		break
	}

	var reply string
	if haveV0 {
		var err error
		reply, _, err = s.client.Send(netsender.RequestRecv, pins)
		if err != nil {
			return err
		}
	}
	return s.extractMeta(reply)
}
// extractMeta looks at a reply and extracts any time ("ts") or location ("ll")
// data, which is then passed to the mts package via SetTimeStamp and
// SetLocation.
//
// A reply that cannot be decoded is not treated as an error, and a missing
// field is only logged as a warning, so extractMeta always returns nil.
func (s *httpSender) extractMeta(r string) error {
	dec, err := netsender.NewJSONDecoder(r)
	if err != nil {
		return nil
	}

	// Timestamp.
	if ts, err := dec.Int("ts"); err == nil {
		s.log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, ts))
		mts.SetTimeStamp(uint64(ts))
	} else {
		s.log(logger.Warning, pkg+"No timestamp in reply")
	}

	// Location.
	if loc, err := dec.String("ll"); err == nil {
		s.log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, loc))
		mts.SetLocation(loc)
	} else {
		s.log(logger.Warning, pkg+"No location in reply")
	}

	return nil
}
// release closes the loaded chunk and clears the sender's reference to it,
// which makes any later send a no-op until the next load.
// NOTE(review): Close is called on s.chunk without a nil check; whether a
// release with no chunk loaded is tolerated depends on ring.Chunk.Close.
func (s *httpSender) release() {
// We will not retry, so release
// the chunk and clear it now.
s.chunk.Close()
s.chunk = nil
}
// close does nothing for httpSender and always returns nil.
func (s *httpSender) close() error { return nil }
// ffmpegSender implements loadSender for an FFMPEG RTMP destination.
type ffmpegSender struct {
// ffmpeg is where chunks are written; newFfmpegSender sets it to the
// stdin pipe of the ffmpeg command it starts.
ffmpeg io.WriteCloser
// chunk is the chunk set by the most recent load; release resets it to nil.
chunk *ring.Chunk
}
// newFfmpegSender starts the program at ffmpegPath and returns an
// ffmpegSender that writes to its standard input.
//
// framerate is passed as the value of the "-r" option and url as the final
// (output) argument; the remaining arguments are fixed. An error from
// creating the stdin pipe or from starting the command is returned
// unchanged. The command is started but not waited on here.
func newFfmpegSender(url, framerate string) (*ffmpegSender, error) {
	args := []string{
		"-f", "h264",
		"-r", framerate,
		"-i", "-",
		"-f", "lavfi",
		"-i", "aevalsrc=0",
		"-fflags", "nobuffer",
		"-vcodec", "copy",
		"-acodec", "aac",
		"-map", "0:0",
		"-map", "1:0",
		"-strict", "experimental",
		"-f", "flv",
		url,
	}
	cmd := exec.Command(ffmpegPath, args...)

	stdin, err := cmd.StdinPipe()
	if err != nil {
		return nil, err
	}
	if err := cmd.Start(); err != nil {
		return nil, err
	}
	return &ffmpegSender{ffmpeg: stdin}, nil
}
// load records c as the chunk to be written by send.
// It does not touch the chunk's contents and always returns nil.
func (s *ffmpegSender) load(c *ring.Chunk) error {
s.chunk = c
return nil
}
// send writes the loaded chunk to the ffmpeg writer using the chunk's
// WriteTo method. The number of bytes written is discarded; only the
// error, if any, is reported.
func (s *ffmpegSender) send() error {
	if _, err := s.chunk.WriteTo(s.ffmpeg); err != nil {
		return err
	}
	return nil
}
// release closes the loaded chunk and clears the sender's reference to it.
// NOTE(review): Close is called on s.chunk without a nil check; whether a
// release with no chunk loaded is tolerated depends on ring.Chunk.Close.
func (s *ffmpegSender) release() {
s.chunk.Close()
s.chunk = nil
}
// close closes the writer chunks are sent to and returns its error, if any.
// Nothing here waits for the ffmpeg process started by newFfmpegSender.
func (s *ffmpegSender) close() error {
return s.ffmpeg.Close()
}
// rtmpSender implements loadSender for a native RTMP destination.
type rtmpSender struct {
// sess is the current RTMP session; restart replaces it with a new one.
sess *rtmp.Session
// url and timeout are the arguments given to rtmp.NewSession whenever a
// session is created.
url string
timeout uint
// retries is the maximum number of connection attempts made by
// newRtmpSender and by restart.
retries int
// log is the logging callback used to report connection failures and retries.
log func(lvl int8, msg string, args ...interface{})
// chunk is the chunk set by the most recent load; release resets it to nil.
chunk *ring.Chunk
}
// Compile-time check that *rtmpSender satisfies restarter.
var _ restarter = (*rtmpSender)(nil)
// newRtmpSender opens an RTMP session to url and returns an rtmpSender that
// uses it.
//
// Up to retries connection attempts are made; url and timeout are passed to
// rtmp.NewSession for each one. A failed attempt is logged through log and
// its session closed before the next attempt. If every attempt fails, the
// error from the last one is returned.
//
// retries must be at least one. With fewer, no attempt can be made, so an
// error is returned instead of a sender that has no session.
func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) {
	if retries < 1 {
		return nil, errors.New("rtmp sender requires at least one connection attempt")
	}

	s := &rtmpSender{
		url:     url,
		timeout: timeout,
		retries: retries,
		log:     log,
	}
	var err error
	for n := 0; n < retries; n++ {
		s.sess = rtmp.NewSession(url, timeout)
		err = s.sess.Open()
		if err == nil {
			return s, nil
		}
		log(logger.Error, err.Error())
		s.sess.Close()
		if n < retries-1 {
			log(logger.Info, pkg+"retry rtmp connection")
		}
	}
	return nil, err
}
// load records c as the chunk to be written by send.
// It does not touch the chunk's contents and always returns nil.
func (s *rtmpSender) load(c *ring.Chunk) error {
s.chunk = c
return nil
}
// send writes the loaded chunk to the current RTMP session using the
// chunk's WriteTo method. The number of bytes written is discarded; only
// the error, if any, is reported.
func (s *rtmpSender) send() error {
	if _, err := s.chunk.WriteTo(s.sess); err != nil {
		return err
	}
	return nil
}
// release closes the loaded chunk and clears the sender's reference to it.
// NOTE(review): Close is called on s.chunk without a nil check; whether a
// release with no chunk loaded is tolerated depends on ring.Chunk.Close.
func (s *rtmpSender) release() {
s.chunk.Close()
s.chunk = nil
}
// restart closes the current RTMP session and opens a new one, making up to
// s.retries connection attempts with the stored url and timeout.
//
// A failure to close the old session is logged but does not abort the
// restart. When every attempt of a restart fails, s.sess is left holding a
// session that has already been closed, so aborting on a Close error could
// prevent any later restart from ever reconnecting.
// NOTE(review): this assumes Close on a broken or already closed session
// may return an error — confirm against rtmp.Session.
//
// It returns nil once a session has been opened, otherwise the error from
// the last attempt. If s.retries is less than one no attempt is made and an
// error is returned, since the sender is left without an open session.
func (s *rtmpSender) restart() error {
	if err := s.sess.Close(); err != nil {
		s.log(logger.Warning, pkg+"could not close rtmp session before restart: "+err.Error())
	}

	// Overwritten by the first connection attempt, if there is one.
	err := errors.New("rtmp restart requires at least one connection attempt")
	for n := 0; n < s.retries; n++ {
		s.sess = rtmp.NewSession(s.url, s.timeout)
		err = s.sess.Open()
		if err == nil {
			break
		}
		s.log(logger.Error, err.Error())
		s.sess.Close()
		if n < s.retries-1 {
			s.log(logger.Info, pkg+"retry rtmp connection")
		}
	}
	return err
}
// close closes the current RTMP session and returns its error, if any.
func (s *rtmpSender) close() error {
return s.sess.Close()
}
// udpSender implements loadSender for a native udp destination.
type udpSender struct {
// conn is the connection chunks are written to; newUdpSender obtains it
// from net.Dial with network "udp".
conn net.Conn
// log is the logging callback supplied to newUdpSender.
log func(lvl int8, msg string, args ...interface{})
// chunk is the chunk set by the most recent load; release resets it to nil.
chunk *ring.Chunk
}
// newUdpSender dials addr over "udp" and returns a udpSender that writes to
// the resulting connection and keeps log as its logging callback. An error
// from net.Dial is returned unchanged together with a nil sender.
func newUdpSender(addr string, log func(lvl int8, msg string, args ...interface{})) (*udpSender, error) {
	c, err := net.Dial("udp", addr)
	if err != nil {
		return nil, err
	}
	s := &udpSender{conn: c, log: log}
	return s, nil
}
// load records c as the chunk to be written by send.
// It does not touch the chunk's contents and always returns nil.
func (s *udpSender) load(c *ring.Chunk) error {
s.chunk = c
return nil
}
// send writes the loaded chunk to the udp connection using the chunk's
// WriteTo method. The number of bytes written is discarded; only the
// error, if any, is reported.
func (s *udpSender) send() error {
	if _, err := s.chunk.WriteTo(s.conn); err != nil {
		return err
	}
	return nil
}
// release closes the loaded chunk and clears the sender's reference to it.
// NOTE(review): Close is called on s.chunk without a nil check; whether a
// release with no chunk loaded is tolerated depends on ring.Chunk.Close.
func (s *udpSender) release() {
s.chunk.Close()
s.chunk = nil
}
// close closes the udp connection opened by newUdpSender and returns the
// error, if any, from closing it. Without this the socket would stay open
// for as long as the sender is reachable.
func (s *udpSender) close() error {
	return s.conn.Close()
}
// TODO: Write restart func for rtpSender
//
// rtpSender sends data to a native udp destination with rtp packetization.
// Its send method takes the data to send as an argument, so rtpSender does
// not satisfy the loadSender interface, whose send takes no arguments.
type rtpSender struct {
// log is the logging callback supplied to newRtpSender.
log func(lvl int8, msg string, args ...interface{})
// encoder is the rtp encoder data is written to; newRtpSender creates it
// around a udp connection.
encoder *rtp.Encoder
}
// newRtpSender dials addr over "udp" and returns an rtpSender whose encoder
// is created by rtp.NewEncoder from that connection and fps (converted to
// int). An error from net.Dial is returned unchanged together with a nil
// sender.
func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) {
	udp, err := net.Dial("udp", addr)
	if err != nil {
		return nil, err
	}
	enc := rtp.NewEncoder(udp, int(fps))
	return &rtpSender{log: log, encoder: enc}, nil
}
// send writes d to the rtp encoder and returns the encoder's error, if any.
// A nil d is rejected with an error and nothing is written; an empty but
// non-nil slice is still passed to the encoder.
func (s *rtpSender) send(d []byte) error {
	if d == nil {
		return errors.New("no data to send provided")
	}
	_, err := s.encoder.Write(d)
	return err
}