mirror of https://bitbucket.org/ausocean/av.git
402 lines
8.1 KiB
Go
402 lines
8.1 KiB
Go
/*
|
|
NAME
|
|
senders.go
|
|
|
|
DESCRIPTION
|
|
See Readme.md
|
|
|
|
AUTHORS
|
|
Saxon A. Nelson-Milton <saxon@ausocean.org>
|
|
Alan Noble <alan@ausocean.org>
|
|
|
|
LICENSE
|
|
revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean)
|
|
|
|
It is free software: you can redistribute it and/or modify them
|
|
under the terms of the GNU General Public License as published by the
|
|
Free Software Foundation, either version 3 of the License, or (at your
|
|
option) any later version.
|
|
|
|
It is distributed in the hope that it will be useful, but WITHOUT
|
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
|
for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
|
|
*/
|
|
|
|
package revid
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"os"
|
|
"os/exec"
|
|
|
|
"bitbucket.org/ausocean/av/rtmp"
|
|
"bitbucket.org/ausocean/av/stream/mts"
|
|
"bitbucket.org/ausocean/av/stream/rtp"
|
|
"bitbucket.org/ausocean/iot/pi/netsender"
|
|
"bitbucket.org/ausocean/utils/ring"
|
|
"bitbucket.org/ausocean/utils/smartlogger"
|
|
)
|
|
|
|
// loadSender is a destination to send a *ring.Chunk to.
|
|
// When a loadSender has finished using the *ring.Chunk
|
|
// it must be Closed.
|
|
type loadSender interface {
|
|
// load assigns the *ring.Chunk to the loadSender.
|
|
// The load call may fail, but must not mutate the
|
|
// the chunk.
|
|
load(*ring.Chunk) error
|
|
|
|
// send performs a destination-specific send
|
|
// operation. It must not mutate the chunk.
|
|
send() error
|
|
|
|
// release releases the *ring.Chunk.
|
|
release()
|
|
|
|
// close cleans up after use of the loadSender.
|
|
close() error
|
|
}
|
|
|
|
// restart is an optional interface for loadSenders that
|
|
// can restart their connection.
|
|
type restarter interface {
|
|
restart() error
|
|
}
|
|
|
|
// fileSender implements loadSender for a local file destination.
|
|
type fileSender struct {
|
|
file *os.File
|
|
|
|
chunk *ring.Chunk
|
|
}
|
|
|
|
func newFileSender(path string) (*fileSender, error) {
|
|
f, err := os.Create(path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &fileSender{file: f}, nil
|
|
}
|
|
|
|
func (s *fileSender) load(c *ring.Chunk) error {
|
|
s.chunk = c
|
|
return nil
|
|
}
|
|
|
|
func (s *fileSender) send() error {
|
|
_, err := s.chunk.WriteTo(s.file)
|
|
return err
|
|
}
|
|
|
|
func (s *fileSender) release() {
|
|
s.chunk.Close()
|
|
s.chunk = nil
|
|
}
|
|
|
|
func (s *fileSender) close() error {
|
|
return s.file.Close()
|
|
}
|
|
|
|
// httpSender implements loadSender for posting HTTP to NetReceiver
|
|
type httpSender struct {
|
|
client *netsender.Sender
|
|
|
|
log func(lvl int8, msg string, args ...interface{})
|
|
|
|
chunk *ring.Chunk
|
|
}
|
|
|
|
func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender {
|
|
return &httpSender{
|
|
client: ns,
|
|
log: log,
|
|
}
|
|
}
|
|
|
|
func (s *httpSender) load(c *ring.Chunk) error {
|
|
s.chunk = c
|
|
return nil
|
|
}
|
|
|
|
func (s *httpSender) send() error {
|
|
if s.chunk == nil {
|
|
// Do not retry with httpSender,
|
|
// so just return without error
|
|
// if the chunk has been cleared.
|
|
return nil
|
|
}
|
|
// Only send if "V0" is configured as an input.
|
|
send := false
|
|
ip := s.client.Param("ip")
|
|
pins := netsender.MakePins(ip, "V")
|
|
for i, pin := range pins {
|
|
if pin.Name == "V0" {
|
|
send = true
|
|
pins[i].Value = s.chunk.Len()
|
|
pins[i].Data = s.chunk.Bytes()
|
|
pins[i].MimeType = "video/mp2t"
|
|
break
|
|
}
|
|
}
|
|
var err error
|
|
var reply string
|
|
if send {
|
|
reply, _, err = s.client.Send(netsender.RequestRecv, pins)
|
|
}
|
|
|
|
err = extractMeta(reply, s)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func extractMeta(r string, s *httpSender) error {
|
|
dec, err := netsender.NewJsonDecoder(r)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
// Extract time from reply
|
|
t, err := dec.Int("ts")
|
|
if err != nil {
|
|
s.log(smartlogger.Warning, pkg+"No timestamp in reply")
|
|
} else {
|
|
s.log(smartlogger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t))
|
|
mts.TimeMeta(uint64(t))
|
|
}
|
|
|
|
// Extract gps from reply
|
|
g, err := dec.String("ll")
|
|
if err != nil {
|
|
s.log(smartlogger.Warning, pkg+"No gps in reply")
|
|
} else {
|
|
s.log(smartlogger.Debug, fmt.Sprintf("%v got gps: %v", pkg, g))
|
|
mts.GpsMeta(g)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *httpSender) release() {
|
|
// We will not retry, so release
|
|
// the chunk and clear it now.
|
|
s.chunk.Close()
|
|
s.chunk = nil
|
|
}
|
|
|
|
func (s *httpSender) close() error { return nil }
|
|
|
|
// ffmpegSender implements loadSender for an FFMPEG RTMP destination.
|
|
type ffmpegSender struct {
|
|
ffmpeg io.WriteCloser
|
|
|
|
chunk *ring.Chunk
|
|
}
|
|
|
|
func newFfmpegSender(url, framerate string) (*ffmpegSender, error) {
|
|
cmd := exec.Command(ffmpegPath,
|
|
"-f", "h264",
|
|
"-r", framerate,
|
|
"-i", "-",
|
|
"-f", "lavfi",
|
|
"-i", "aevalsrc=0",
|
|
"-fflags", "nobuffer",
|
|
"-vcodec", "copy",
|
|
"-acodec", "aac",
|
|
"-map", "0:0",
|
|
"-map", "1:0",
|
|
"-strict", "experimental",
|
|
"-f", "flv",
|
|
url,
|
|
)
|
|
w, err := cmd.StdinPipe()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = cmd.Start()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &ffmpegSender{ffmpeg: w}, nil
|
|
}
|
|
|
|
func (s *ffmpegSender) load(c *ring.Chunk) error {
|
|
s.chunk = c
|
|
return nil
|
|
}
|
|
|
|
func (s *ffmpegSender) send() error {
|
|
_, err := s.chunk.WriteTo(s.ffmpeg)
|
|
return err
|
|
}
|
|
|
|
func (s *ffmpegSender) release() {
|
|
s.chunk.Close()
|
|
s.chunk = nil
|
|
}
|
|
|
|
func (s *ffmpegSender) close() error {
|
|
return s.ffmpeg.Close()
|
|
}
|
|
|
|
// rtmpSender implements loadSender for a native RTMP destination.
|
|
type rtmpSender struct {
|
|
sess *rtmp.Session
|
|
|
|
url string
|
|
timeout uint
|
|
retries int
|
|
log func(lvl int8, msg string, args ...interface{})
|
|
|
|
chunk *ring.Chunk
|
|
}
|
|
|
|
var _ restarter = (*rtmpSender)(nil)
|
|
|
|
func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) {
|
|
var sess *rtmp.Session
|
|
var err error
|
|
for n := 0; n < retries; n++ {
|
|
sess = rtmp.NewSession(url, timeout)
|
|
err = sess.Open()
|
|
if err == nil {
|
|
break
|
|
}
|
|
log(smartlogger.Error, err.Error())
|
|
sess.Close()
|
|
if n < retries-1 {
|
|
log(smartlogger.Info, pkg+"retry rtmp connection")
|
|
}
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s := &rtmpSender{
|
|
sess: sess,
|
|
url: url,
|
|
timeout: timeout,
|
|
retries: retries,
|
|
log: log,
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
func (s *rtmpSender) load(c *ring.Chunk) error {
|
|
s.chunk = c
|
|
return nil
|
|
}
|
|
|
|
func (s *rtmpSender) send() error {
|
|
_, err := s.chunk.WriteTo(s.sess)
|
|
return err
|
|
}
|
|
|
|
func (s *rtmpSender) release() {
|
|
s.chunk.Close()
|
|
s.chunk = nil
|
|
}
|
|
|
|
func (s *rtmpSender) restart() error {
|
|
err := s.sess.Close()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for n := 0; n < s.retries; n++ {
|
|
s.sess = rtmp.NewSession(s.url, s.timeout)
|
|
err = s.sess.Open()
|
|
if err == nil {
|
|
break
|
|
}
|
|
s.log(smartlogger.Error, err.Error())
|
|
s.sess.Close()
|
|
if n < s.retries-1 {
|
|
s.log(smartlogger.Info, pkg+"retry rtmp connection")
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (s *rtmpSender) close() error {
|
|
return s.sess.Close()
|
|
}
|
|
|
|
// udpSender implements loadSender for a native udp destination.
|
|
type udpSender struct {
|
|
conn net.Conn
|
|
log func(lvl int8, msg string, args ...interface{})
|
|
chunk *ring.Chunk
|
|
}
|
|
|
|
func newUdpSender(addr string, log func(lvl int8, msg string, args ...interface{})) (*udpSender, error) {
|
|
conn, err := net.Dial("udp", addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &udpSender{
|
|
conn: conn,
|
|
log: log,
|
|
}, nil
|
|
}
|
|
|
|
func (s *udpSender) load(c *ring.Chunk) error {
|
|
s.chunk = c
|
|
return nil
|
|
}
|
|
|
|
func (s *udpSender) send() error {
|
|
_, err := s.chunk.WriteTo(s.conn)
|
|
return err
|
|
}
|
|
|
|
func (s *udpSender) release() {
|
|
s.chunk.Close()
|
|
s.chunk = nil
|
|
}
|
|
|
|
func (s *udpSender) close() error { return nil }
|
|
|
|
// TODO: Write restart func for rtpSender
|
|
// rtpSender implements loadSender for a native udp destination with rtp packetization.
|
|
type rtpSender struct {
|
|
log func(lvl int8, msg string, args ...interface{})
|
|
chunk *ring.Chunk
|
|
encoder *rtp.Encoder
|
|
}
|
|
|
|
func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps int) (*rtpSender, error) {
|
|
conn, err := net.Dial("udp", addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s := &rtpSender{
|
|
log: log,
|
|
encoder: rtp.NewEncoder(conn, fps),
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
func (s *rtpSender) load(c *ring.Chunk) error {
|
|
s.chunk = c
|
|
return nil
|
|
}
|
|
|
|
func (s *rtpSender) send() error {
|
|
_, err := s.chunk.WriteTo(s.encoder)
|
|
return err
|
|
}
|
|
|
|
func (s *rtpSender) release() {
|
|
s.chunk.Close()
|
|
s.chunk = nil
|
|
}
|
|
|
|
func (s *rtpSender) close() error { return nil }
|