2018-06-09 07:38:48 +03:00
/*
NAME
  senders.go

DESCRIPTION
  See Readme.md

AUTHORS
  Saxon A. Nelson-Milton <saxon@ausocean.org>
  Alan Noble <alan@ausocean.org>

LICENSE
  revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean)

  It is free software: you can redistribute it and/or modify it
  under the terms of the GNU General Public License as published by the
  Free Software Foundation, either version 3 of the License, or (at your
  option) any later version.

  It is distributed in the hope that it will be useful, but WITHOUT
  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
  for more details.

  You should have received a copy of the GNU General Public License
  along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package revid
import (
2019-08-25 10:44:06 +03:00
"errors"
2018-12-14 06:05:56 +03:00
"fmt"
2019-03-29 05:19:26 +03:00
"io"
2018-11-18 05:02:11 +03:00
"net"
2018-06-09 07:38:48 +03:00
"os"
2019-04-08 12:32:42 +03:00
"sync"
"time"
2018-06-09 07:38:48 +03:00
2019-03-01 06:00:06 +03:00
"github.com/Comcast/gots/packet"
2019-03-25 04:21:03 +03:00
"bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/protocol/rtmp"
"bitbucket.org/ausocean/av/protocol/rtp"
2018-06-15 10:12:29 +03:00
"bitbucket.org/ausocean/iot/pi/netsender"
2019-01-02 08:09:47 +03:00
"bitbucket.org/ausocean/utils/logger"
2020-04-09 09:25:01 +03:00
"bitbucket.org/ausocean/utils/ring"
2018-06-09 07:38:48 +03:00
)
2019-03-24 12:34:35 +03:00
// Log is the logging function signature used by the multiSender. It takes a
// log level, a message, and any number of additional parameters.
type Log func(level int8, message string, params ...interface{})
2019-08-23 09:11:54 +03:00
// Sender ring buffer settings.
const (
	// Timeouts passed to ring.Buffer.Next by the sender output routines.
	rtmpRBReadTimeout = time.Second
	mtsRBReadTimeout  = time.Second

	// maxBuffLen is divided by the element size to obtain the number of
	// elements when a sender re-creates its ring buffer.
	maxBuffLen = 50000000
)
2020-04-10 19:32:44 +03:00
// Ring buffer element sizes most recently chosen by the RTMP and MTS senders
// when they re-created their ring buffers after a ring.ErrTooLong write error.
// Both are zero until such an adjustment has been made.
var (
	adjustedRTMPRBElementSize int
	adjustedMTSRBElementSize  int
)
2019-04-02 06:15:36 +03:00
// httpSender provides an implemntation of io.Writer to perform sends to a http
// destination.
2019-04-01 04:37:28 +03:00
type httpSender struct {
2020-01-22 07:56:14 +03:00
client * netsender . Sender
log func ( lvl int8 , msg string , args ... interface { } )
report func ( sent int )
2019-02-16 06:56:51 +03:00
}
2019-05-20 18:15:54 +03:00
// newHttpSender returns a pointer to a new httpSender.
2020-01-22 07:56:14 +03:00
func newHttpSender ( ns * netsender . Sender , log func ( lvl int8 , msg string , args ... interface { } ) , report func ( sent int ) ) * httpSender {
2019-04-01 04:37:28 +03:00
return & httpSender {
2020-01-22 07:56:14 +03:00
client : ns ,
log : log ,
report : report ,
2019-02-16 06:56:51 +03:00
}
}
2019-04-02 06:15:36 +03:00
// Write implements io.Writer.
2019-04-01 04:37:28 +03:00
func ( s * httpSender ) Write ( d [ ] byte ) ( int , error ) {
2020-03-27 15:20:51 +03:00
s . log ( logger . Debug , "HTTP sending" )
2020-01-20 10:10:45 +03:00
err := httpSend ( d , s . client , s . log )
if err == nil {
2020-03-27 15:20:51 +03:00
s . log ( logger . Debug , "good send" , "len" , len ( d ) )
2020-01-22 07:56:14 +03:00
s . report ( len ( d ) )
2020-03-27 15:20:51 +03:00
} else {
s . log ( logger . Debug , "bad send" , "error" , err )
2020-01-20 10:10:45 +03:00
}
return len ( d ) , err
2019-02-16 06:56:51 +03:00
}
2019-04-08 12:32:42 +03:00
// Close implements io.Closer. It is a no-op and always returns nil.
func (s *httpSender) Close() error {
	return nil
}
2019-04-01 04:37:28 +03:00
func httpSend ( d [ ] byte , client * netsender . Sender , log func ( lvl int8 , msg string , args ... interface { } ) ) error {
// Only send if "V0" is configured as an input.
send := false
ip := client . Param ( "ip" )
2020-03-27 15:20:51 +03:00
log ( logger . Debug , "making pins, and sending recv request" , "ip" , ip )
2019-04-01 04:37:28 +03:00
pins := netsender . MakePins ( ip , "V" )
for i , pin := range pins {
if pin . Name == "V0" {
send = true
pins [ i ] . Value = len ( d )
pins [ i ] . Data = d
pins [ i ] . MimeType = "video/mp2t"
break
}
}
if ! send {
return nil
}
var err error
var reply string
reply , _ , err = client . Send ( netsender . RequestRecv , pins )
if err != nil {
return err
}
2020-03-27 15:20:51 +03:00
log ( logger . Debug , "good request" , "reply" , reply )
2019-04-01 04:37:28 +03:00
return extractMeta ( reply , log )
}
// extractMeta looks at a reply at extracts any time or location data - then used
// to update time and location information in the mpegts encoder.
func extractMeta ( r string , log func ( lvl int8 , msg string , args ... interface { } ) ) error {
dec , err := netsender . NewJSONDecoder ( r )
if err != nil {
return nil
}
// Extract time from reply
t , err := dec . Int ( "ts" )
if err != nil {
2020-03-27 17:34:02 +03:00
log ( logger . Warning , "No timestamp in reply" )
2019-04-01 04:37:28 +03:00
} else {
2020-03-27 17:34:02 +03:00
log ( logger . Debug , fmt . Sprintf ( "got timestamp: %v" , t ) )
2019-08-26 06:59:07 +03:00
mts . RealTime . Set ( time . Unix ( int64 ( t ) , 0 ) )
2019-04-01 04:37:28 +03:00
}
// Extract location from reply
g , err := dec . String ( "ll" )
if err != nil {
2020-03-27 17:34:02 +03:00
log ( logger . Debug , "No location in reply" )
2019-04-01 04:37:28 +03:00
} else {
2020-03-27 17:34:02 +03:00
log ( logger . Debug , fmt . Sprintf ( "got location: %v" , g ) )
2019-04-01 04:37:28 +03:00
mts . Meta . Add ( "loc" , g )
}
return nil
}
2018-06-09 07:38:48 +03:00
// fileSender sends to a local file destination. Its Write and Close methods
// operate directly on the underlying file.
type fileSender struct {
	// file is the destination file.
	file *os.File

	// NOTE(review): data is not referenced by the fileSender constructor or
	// methods in this file - confirm whether it is still needed.
	data []byte
}
2019-04-08 12:32:42 +03:00
func newFileSender ( path string ) ( * fileSender , error ) {
2018-06-09 07:38:48 +03:00
f , err := os . Create ( path )
if err != nil {
return nil , err
}
return & fileSender { file : f } , nil
}
2019-04-01 04:41:05 +03:00
// Write implements io.Writer.
func ( s * fileSender ) Write ( d [ ] byte ) ( int , error ) {
return s . file . Write ( d )
2018-06-09 07:38:48 +03:00
}
2019-04-08 12:32:42 +03:00
// Close implements io.Closer by closing the underlying file.
func (s *fileSender) Close() error {
	return s.file.Close()
}
2018-06-09 07:38:48 +03:00
2019-04-08 12:32:42 +03:00
// mtsSender implements io.WriteCloser and provides sending capability specifically
2019-03-01 05:58:34 +03:00
// for use with MPEGTS packetization. It handles the construction of appropriately
2019-08-25 10:44:06 +03:00
// lengthed clips based on clip duration and PSI. It also accounts for
// discontinuities by setting the discontinuity indicator for the first packet of a clip.
2019-02-15 15:47:13 +03:00
type mtsSender struct {
2019-04-08 12:32:42 +03:00
dst io . WriteCloser
2019-04-01 04:20:11 +03:00
buf [ ] byte
2020-04-09 09:25:01 +03:00
ring * ring . Buffer
2019-04-01 04:20:11 +03:00
next [ ] byte
pkt packet . Packet
repairer * mts . DiscontinuityRepairer
curPid int
2019-08-25 10:44:06 +03:00
clipDur time . Duration
prev time . Time
2019-04-18 10:25:48 +03:00
done chan struct { }
2019-04-08 12:32:42 +03:00
log func ( lvl int8 , msg string , args ... interface { } )
wg sync . WaitGroup
2019-03-29 05:19:26 +03:00
}
2019-03-01 05:58:34 +03:00
// newMtsSender returns a new mtsSender.
2020-04-09 09:25:01 +03:00
func newMtsSender ( dst io . WriteCloser , log func ( lvl int8 , msg string , args ... interface { } ) , rb * ring . Buffer , clipDur time . Duration ) * mtsSender {
2019-04-08 12:32:42 +03:00
s := & mtsSender {
2019-04-01 04:32:15 +03:00
dst : dst ,
2019-02-15 04:31:07 +03:00
repairer : mts . NewDiscontinuityRepairer ( ) ,
2019-04-08 12:32:42 +03:00
log : log ,
2019-08-25 10:44:06 +03:00
ring : rb ,
2019-04-18 10:25:48 +03:00
done : make ( chan struct { } ) ,
2019-08-25 10:44:06 +03:00
clipDur : clipDur ,
2019-02-15 04:31:07 +03:00
}
2019-04-08 12:32:42 +03:00
s . wg . Add ( 1 )
go s . output ( )
return s
2019-02-15 04:31:07 +03:00
}
2019-04-10 05:45:46 +03:00
// output starts an mtsSender's data handling routine.
2019-04-08 12:32:42 +03:00
func ( s * mtsSender ) output ( ) {
2020-04-09 09:25:01 +03:00
var chunk * ring . Chunk
2019-04-08 12:32:42 +03:00
for {
select {
2019-04-18 10:25:48 +03:00
case <- s . done :
2020-03-27 17:34:02 +03:00
s . log ( logger . Info , "terminating sender output routine" )
2019-04-08 12:32:42 +03:00
defer s . wg . Done ( )
return
default :
2020-04-09 09:25:01 +03:00
// If chunk is nil then we're ready to get another from the ringBuffer.
if chunk == nil {
2019-04-08 12:32:42 +03:00
var err error
2020-04-09 09:25:01 +03:00
chunk , err = s . ring . Next ( mtsRBReadTimeout )
2019-04-08 12:32:42 +03:00
switch err {
2019-04-15 03:55:35 +03:00
case nil , io . EOF :
2019-04-10 05:49:28 +03:00
continue
2020-04-09 09:25:01 +03:00
case ring . ErrTimeout :
s . log ( logger . Debug , "mtsSender: ring buffer read timeout" )
2019-04-08 12:32:42 +03:00
continue
default :
2020-03-27 17:34:02 +03:00
s . log ( logger . Error , "unexpected error" , "error" , err . Error ( ) )
2019-04-08 12:32:42 +03:00
continue
}
2019-04-10 05:49:28 +03:00
}
2020-04-09 09:25:01 +03:00
err := s . repairer . Repair ( chunk . Bytes ( ) )
2019-04-10 05:49:28 +03:00
if err != nil {
2020-04-09 09:25:01 +03:00
chunk . Close ( )
chunk = nil
2019-04-10 05:49:28 +03:00
continue
}
2020-04-09 09:25:01 +03:00
s . log ( logger . Debug , "mtsSender: writing" )
_ , err = s . dst . Write ( chunk . Bytes ( ) )
2019-04-10 05:49:28 +03:00
if err != nil {
2020-03-27 15:20:51 +03:00
s . log ( logger . Debug , "failed write, repairing MTS" , "error" , err )
2019-04-10 05:49:28 +03:00
s . repairer . Failed ( )
continue
2020-03-27 15:20:51 +03:00
} else {
s . log ( logger . Debug , "good write" )
2019-04-08 12:32:42 +03:00
}
2020-04-09 09:25:01 +03:00
chunk . Close ( )
chunk = nil
2019-04-08 12:32:42 +03:00
}
}
}
2019-04-10 05:45:46 +03:00
// Write implements io.Writer.
func ( s * mtsSender ) Write ( d [ ] byte ) ( int , error ) {
2019-08-25 10:44:06 +03:00
if len ( d ) < mts . PacketSize {
return 0 , errors . New ( "do not have full MTS packet" )
}
2019-04-10 05:45:46 +03:00
if s . next != nil {
s . buf = append ( s . buf , s . next ... )
}
bytes := make ( [ ] byte , len ( d ) )
copy ( bytes , d )
s . next = bytes
2019-08-25 10:44:06 +03:00
p , _ := mts . PID ( bytes )
s . curPid = int ( p )
if time . Now ( ) . Sub ( s . prev ) >= s . clipDur && s . curPid == mts . PatPid && len ( s . buf ) > 0 {
s . prev = time . Now ( )
2019-11-22 05:19:21 +03:00
n , err := s . ring . Write ( s . buf )
2019-11-08 02:36:51 +03:00
if err == nil {
s . ring . Flush ( )
}
2019-04-10 05:45:46 +03:00
if err != nil {
2020-03-27 17:34:02 +03:00
s . log ( logger . Warning , "ringBuffer write error" , "error" , err . Error ( ) , "n" , n , "size" , len ( s . buf ) )
2020-04-09 09:25:01 +03:00
if err == ring . ErrTooLong {
2020-04-10 19:32:44 +03:00
adjustedMTSRBElementSize = len ( d ) * 2
numElements := maxBuffLen / adjustedMTSRBElementSize
s . ring = ring . NewBuffer ( maxBuffLen / adjustedMTSRBElementSize , adjustedMTSRBElementSize , 5 * time . Second )
s . log ( logger . Info , "adjusted MTS ring buffer element size" , "new size" , adjustedMTSRBElementSize , "num elements" , numElements , "size(MB)" , numElements * adjustedMTSRBElementSize )
2020-04-09 09:25:01 +03:00
}
2019-04-10 05:45:46 +03:00
}
s . buf = s . buf [ : 0 ]
}
return len ( d ) , nil
}
// Close implements io.Closer.
func ( s * mtsSender ) Close ( ) error {
2020-03-27 15:20:51 +03:00
s . log ( logger . Debug , "closing sender output routine" )
2019-04-18 10:25:48 +03:00
close ( s . done )
2019-04-10 05:45:46 +03:00
s . wg . Wait ( )
2020-03-27 15:20:51 +03:00
s . log ( logger . Info , "sender output routine closed" )
2019-04-10 05:45:46 +03:00
return nil
}
2018-06-09 07:38:48 +03:00
// rtmpSender implements loadSender for a native RTMP destination.
type rtmpSender struct {
2020-01-22 07:56:14 +03:00
conn * rtmp . Conn
url string
timeout uint
retries int
log func ( lvl int8 , msg string , args ... interface { } )
2020-04-09 09:25:01 +03:00
ring * ring . Buffer
2020-01-22 07:56:14 +03:00
done chan struct { }
wg sync . WaitGroup
report func ( sent int )
2018-06-09 07:38:48 +03:00
}
2020-04-09 09:25:01 +03:00
func newRtmpSender ( url string , timeout uint , retries int , rb * ring . Buffer , log func ( lvl int8 , msg string , args ... interface { } ) , report func ( sent int ) ) ( * rtmpSender , error ) {
2019-01-19 05:40:38 +03:00
var conn * rtmp . Conn
2018-06-09 07:38:48 +03:00
var err error
for n := 0 ; n < retries ; n ++ {
2019-01-19 05:40:38 +03:00
conn , err = rtmp . Dial ( url , timeout , log )
2018-06-09 07:38:48 +03:00
if err == nil {
break
}
2020-03-27 15:20:51 +03:00
log ( logger . Error , "dial error" , "error" , err )
2018-06-09 07:38:48 +03:00
if n < retries - 1 {
2020-03-27 17:34:02 +03:00
log ( logger . Info , "retrying dial" )
2018-06-09 07:38:48 +03:00
}
}
s := & rtmpSender {
2020-01-22 07:56:14 +03:00
conn : conn ,
url : url ,
timeout : timeout ,
retries : retries ,
log : log ,
ring : rb ,
done : make ( chan struct { } ) ,
report : report ,
2018-06-09 07:38:48 +03:00
}
2019-04-15 04:18:12 +03:00
s . wg . Add ( 1 )
go s . output ( )
2019-03-13 10:44:00 +03:00
return s , err
2018-06-09 07:38:48 +03:00
}
2019-04-15 04:18:12 +03:00
// output starts an mtsSender's data handling routine.
func ( s * rtmpSender ) output ( ) {
2020-04-09 09:25:01 +03:00
var chunk * ring . Chunk
2019-04-15 04:18:12 +03:00
for {
select {
2019-04-18 10:25:48 +03:00
case <- s . done :
2020-03-27 17:34:02 +03:00
s . log ( logger . Info , "terminating sender output routine" )
2019-04-15 04:18:12 +03:00
defer s . wg . Done ( )
return
default :
2020-04-09 09:25:01 +03:00
// If chunk is nil then we're ready to get another from the ring buffer.
if chunk == nil {
2019-04-15 04:18:12 +03:00
var err error
2020-04-09 09:25:01 +03:00
chunk , err = s . ring . Next ( rtmpRBReadTimeout )
2019-04-15 04:18:12 +03:00
switch err {
case nil , io . EOF :
continue
2020-04-09 09:25:01 +03:00
case ring . ErrTimeout :
s . log ( logger . Debug , "rtmpSender: ring buffer read timeout" )
2019-04-15 04:18:12 +03:00
continue
default :
2020-03-27 17:34:02 +03:00
s . log ( logger . Error , "unexpected error" , "error" , err . Error ( ) )
2019-04-15 04:18:12 +03:00
continue
}
}
if s . conn == nil {
2020-03-27 17:34:02 +03:00
s . log ( logger . Warning , "no rtmp connection, re-dialing" )
2019-04-15 04:18:12 +03:00
err := s . restart ( )
if err != nil {
2020-03-27 17:34:02 +03:00
s . log ( logger . Warning , "could not restart connection" , "error" , err )
2019-04-15 04:18:12 +03:00
continue
}
}
2020-04-09 09:25:01 +03:00
_ , err := s . conn . Write ( chunk . Bytes ( ) )
2019-04-15 04:50:36 +03:00
switch err {
case nil , rtmp . ErrInvalidFlvTag :
2020-03-27 15:20:51 +03:00
s . log ( logger . Debug , "good write to conn" )
2019-04-15 04:50:36 +03:00
default :
2020-03-27 17:34:02 +03:00
s . log ( logger . Warning , "send error, re-dialing" , "error" , err )
2019-04-15 04:18:12 +03:00
err = s . restart ( )
if err != nil {
2020-03-27 17:34:02 +03:00
s . log ( logger . Warning , "could not restart connection" , "error" , err )
2019-04-15 04:18:12 +03:00
}
continue
}
2020-04-09 09:25:01 +03:00
chunk . Close ( )
chunk = nil
2019-04-15 04:18:12 +03:00
}
}
}
2019-03-29 08:54:47 +03:00
// Write implements io.Writer.
2019-03-29 05:19:26 +03:00
func ( s * rtmpSender ) Write ( d [ ] byte ) ( int , error ) {
2020-03-27 15:20:51 +03:00
s . log ( logger . Debug , "writing to ring buffer" )
2019-04-18 10:25:48 +03:00
_ , err := s . ring . Write ( d )
2019-11-08 02:36:51 +03:00
if err == nil {
s . ring . Flush ( )
2020-03-27 15:20:51 +03:00
s . log ( logger . Debug , "good ring buffer write" , "len" , len ( d ) )
} else {
2020-03-27 17:34:02 +03:00
s . log ( logger . Warning , "ring buffer write error" , "error" , err . Error ( ) )
2020-04-09 09:25:01 +03:00
if err == ring . ErrTooLong {
2020-04-10 19:32:44 +03:00
adjustedRTMPRBElementSize = len ( d ) * 2
numElements := maxBuffLen / adjustedRTMPRBElementSize
s . ring = ring . NewBuffer ( numElements , adjustedRTMPRBElementSize , 5 * time . Second )
s . log ( logger . Info , "adjusted RTMP ring buffer element size" , "new size" , adjustedRTMPRBElementSize , "num elements" , numElements , "size(MB)" , numElements * adjustedRTMPRBElementSize )
2020-04-09 09:25:01 +03:00
}
2019-03-03 10:54:54 +03:00
}
2020-01-22 07:56:14 +03:00
s . report ( len ( d ) )
2019-04-15 04:18:12 +03:00
return len ( d ) , nil
2018-06-09 07:38:48 +03:00
}
func ( s * rtmpSender ) restart ( ) error {
2019-04-15 04:50:36 +03:00
s . close ( )
2019-03-03 10:11:35 +03:00
var err error
2018-06-09 07:38:48 +03:00
for n := 0 ; n < s . retries ; n ++ {
2020-03-27 15:20:51 +03:00
s . log ( logger . Debug , "dialing" , "dials" , n )
2019-01-19 05:40:38 +03:00
s . conn , err = rtmp . Dial ( s . url , s . timeout , s . log )
2018-06-09 07:38:48 +03:00
if err == nil {
break
}
2020-03-27 15:20:51 +03:00
s . log ( logger . Error , "dial error" , "error" , err )
2018-06-09 07:38:48 +03:00
if n < s . retries - 1 {
2020-03-27 17:34:02 +03:00
s . log ( logger . Info , "retry rtmp connection" )
2018-06-09 07:38:48 +03:00
}
}
return err
}
2019-04-08 12:32:42 +03:00
func ( s * rtmpSender ) Close ( ) error {
2020-03-27 15:20:51 +03:00
s . log ( logger . Debug , "closing output routine" )
2019-04-18 10:25:48 +03:00
if s . done != nil {
close ( s . done )
2019-04-15 04:50:36 +03:00
}
s . wg . Wait ( )
2020-03-27 15:20:51 +03:00
s . log ( logger . Info , "output routine closed" )
2019-04-15 04:50:36 +03:00
return s . close ( )
}
func ( s * rtmpSender ) close ( ) error {
2020-03-27 15:20:51 +03:00
s . log ( logger . Debug , "closing connection" )
2019-04-10 05:50:39 +03:00
if s . conn == nil {
return nil
2019-03-03 10:54:54 +03:00
}
2019-04-10 05:50:39 +03:00
return s . conn . Close ( )
2018-06-09 07:38:48 +03:00
}
2018-11-18 05:02:11 +03:00
2018-11-25 16:15:38 +03:00
// TODO: Write restart func for rtpSender
// rtpSender implements loadSender for a native udp destination with rtp packetization.
2018-11-25 15:40:38 +03:00
type rtpSender struct {
log func ( lvl int8 , msg string , args ... interface { } )
encoder * rtp . Encoder
2019-03-09 07:58:07 +03:00
data [ ] byte
2020-01-23 02:37:28 +03:00
report func ( sent int )
2018-11-25 15:40:38 +03:00
}
2020-01-23 02:37:28 +03:00
func newRtpSender ( addr string , log func ( lvl int8 , msg string , args ... interface { } ) , fps uint , report func ( sent int ) ) ( * rtpSender , error ) {
2018-11-25 15:40:38 +03:00
conn , err := net . Dial ( "udp" , addr )
if err != nil {
return nil , err
}
s := & rtpSender {
log : log ,
2018-12-10 02:09:20 +03:00
encoder : rtp . NewEncoder ( conn , int ( fps ) ) ,
2020-01-23 02:37:28 +03:00
report : report ,
2018-11-25 15:40:38 +03:00
}
return s , nil
}
2019-04-02 05:23:42 +03:00
// Write implements io.Writer.
func ( s * rtpSender ) Write ( d [ ] byte ) ( int , error ) {
2019-04-09 09:14:18 +03:00
s . data = make ( [ ] byte , len ( d ) )
copy ( s . data , d )
_ , err := s . encoder . Write ( s . data )
if err != nil {
2020-03-27 17:34:02 +03:00
s . log ( logger . Warning , "rtpSender: write error" , err . Error ( ) )
2019-04-09 09:14:18 +03:00
}
2020-01-23 02:37:28 +03:00
s . report ( len ( d ) )
2019-04-09 09:14:18 +03:00
return len ( d ) , nil
2019-02-15 04:31:07 +03:00
}
2019-04-08 12:32:42 +03:00
// Close implements io.Closer. It is a no-op and always returns nil.
//
// NOTE(review): the UDP connection dialed in newRtpSender is not closed
// here - confirm whether it is released elsewhere.
func (s *rtpSender) Close() error {
	return nil
}