mirror of https://bitbucket.org/ausocean/av.git
cmd/vidforward & revid: initial implementation of vidforwarding software
This commit is contained in:
parent
f9fddcf0e8
commit
46e97debd5
|
@ -0,0 +1,307 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"bitbucket.org/ausocean/av/codec/codecutil"
|
||||||
|
"bitbucket.org/ausocean/av/container/mts"
|
||||||
|
"bitbucket.org/ausocean/av/revid"
|
||||||
|
"bitbucket.org/ausocean/av/revid/config"
|
||||||
|
"bitbucket.org/ausocean/iot/pi/netlogger"
|
||||||
|
"bitbucket.org/ausocean/iot/pi/netsender"
|
||||||
|
"bitbucket.org/ausocean/utils/logging"
|
||||||
|
"gopkg.in/natefinch/lumberjack.v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultPort = "8080"
|
||||||
|
defaultHost = ""
|
||||||
|
)
|
||||||
|
|
||||||
|
// Logging configuration.
|
||||||
|
const (
|
||||||
|
logPath = "/var/log/netsender/netsender.log"
|
||||||
|
logMaxSize = 500 // MB
|
||||||
|
logMaxBackup = 10
|
||||||
|
logMaxAge = 28 // days
|
||||||
|
logVerbosity = logging.Info
|
||||||
|
logSuppress = true
|
||||||
|
)
|
||||||
|
|
||||||
|
// Misc constants.
|
||||||
|
const (
|
||||||
|
netSendRetryTime = 5 * time.Second
|
||||||
|
defaultSleepTime = 60 // Seconds
|
||||||
|
profilePath = "rv.prof"
|
||||||
|
pkg = "rv: "
|
||||||
|
runPreDelay = 20 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
type forwardHandler struct {
|
||||||
|
actives map[string]*revid.Revid
|
||||||
|
log logging.Logger
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *forwardHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
q := r.URL.Query()
|
||||||
|
ma := q.Get("ma")
|
||||||
|
|
||||||
|
rv := h.getActive(ma)
|
||||||
|
if r == nil {
|
||||||
|
h.errorLogWrite(w, "forward request mac not active, doing nothing", "mac", ma)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
const videoPin = "V0"
|
||||||
|
v := q.Get(videoPin)
|
||||||
|
if v == "" {
|
||||||
|
h.errorLogWrite(w, "forward request video pin V0 absent, or has no value")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
size, err := strconv.Atoi(v)
|
||||||
|
if err != nil {
|
||||||
|
h.errorLogWrite(w, "forward request video size can't be conerted to int", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if size <= 0 {
|
||||||
|
h.errorLogWrite(w, "forward request video size invalid", "size", size)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
mtsClip, err := io.ReadAll(r.Body)
|
||||||
|
if err != nil {
|
||||||
|
h.errorLogWrite(w, "could not read forward request body", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(mtsClip)%mts.PacketSize != 0 {
|
||||||
|
h.errorLogWrite(w, "invalid clip length", "length", len(mtsClip))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
h264Clip, err := mts.Extract(mtsClip)
|
||||||
|
if err != nil {
|
||||||
|
h.errorLogWrite(w, "could not extract h.264 from the MPEG-TS clip", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, frame := range h264Clip.Frames() {
|
||||||
|
_, err := rv.Write(frame.Media)
|
||||||
|
if err != nil {
|
||||||
|
h.errorLogWrite(w, "could not write frame", "no.", i)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// writeError writes an error code in JSON format and logs it if in debug mode.
|
||||||
|
func (h *forwardHandler) errorLogWrite(w http.ResponseWriter, msg string, args ...interface{}) {
|
||||||
|
h.log.Error(msg, args...)
|
||||||
|
w.Header().Add("Content-Type", "application/json")
|
||||||
|
fmt.Fprint(w, `{"er":"`+msg+`"}`)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *forwardHandler) getActive(ma string) *revid.Revid {
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
|
v, ok := h.actives[ma]
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *forwardHandler) addActive(ma, url string) error {
|
||||||
|
h.mu.Lock()
|
||||||
|
cfg := config.Config{
|
||||||
|
Logger: h.log,
|
||||||
|
Input: config.InputManual,
|
||||||
|
InputCodec: codecutil.H264_AU, // h264 access unit i.e. h264 frame.
|
||||||
|
Outputs: []uint8{config.OutputRTMP},
|
||||||
|
RTMPURL: url,
|
||||||
|
}
|
||||||
|
|
||||||
|
rv, err := revid.New(cfg, nil)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("coult not initialise revid: %w", err)
|
||||||
|
}
|
||||||
|
h.actives[ma] = rv
|
||||||
|
h.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *forwardHandler) removeInactives(macs []string) {
|
||||||
|
h.mu.Lock()
|
||||||
|
actives:
|
||||||
|
for k := range h.actives {
|
||||||
|
for _, m := range macs {
|
||||||
|
if k == m {
|
||||||
|
continue actives
|
||||||
|
}
|
||||||
|
}
|
||||||
|
delete(h.actives, k)
|
||||||
|
}
|
||||||
|
h.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *forwardHandler) isActive(ma string) bool {
|
||||||
|
h.mu.Lock()
|
||||||
|
_, ok := h.actives[ma]
|
||||||
|
h.mu.Unlock()
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
host := flag.String("host", defaultHost, "Host ip to run video forwarder on.")
|
||||||
|
port := flag.String("port", defaultPort, "Port to run video forwarder on.")
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
if *host == "" || net.ParseIP(*host) == nil {
|
||||||
|
panic(fmt.Sprintf("invalid host, host: %s", *host))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create lumberjack logger to handle logging to file.
|
||||||
|
fileLog := &lumberjack.Logger{
|
||||||
|
Filename: logPath,
|
||||||
|
MaxSize: logMaxSize,
|
||||||
|
MaxBackups: logMaxBackup,
|
||||||
|
MaxAge: logMaxAge,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create netlogger to handle logging to cloud.
|
||||||
|
netLog := netlogger.New()
|
||||||
|
|
||||||
|
// Create logger that we call methods on to log, which in turn writes to the
|
||||||
|
// lumberjack and netloggers.
|
||||||
|
log := logging.New(logVerbosity, io.MultiWriter(fileLog, netLog), logSuppress)
|
||||||
|
|
||||||
|
log.Debug("initialising netsender client")
|
||||||
|
ns, err := netsender.New(log, nil, nil, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(pkg + "could not initialise netsender client: " + err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
fh := &forwardHandler{log: log}
|
||||||
|
|
||||||
|
log.Debug("beginning main loop")
|
||||||
|
go run(ns, log, netLog, fh)
|
||||||
|
|
||||||
|
http.Handle("/recv", fh)
|
||||||
|
http.ListenAndServe(*host+":"+*port, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// run starts the main loop. This will run netsender on every pass of the loop
|
||||||
|
// (sleeping inbetween), check vars, and if changed, update revid as appropriate.
|
||||||
|
func run(ns *netsender.Sender, l logging.Logger, nl *netlogger.Logger, fh *forwardHandler) {
|
||||||
|
var vs int
|
||||||
|
for {
|
||||||
|
l.Debug("running netsender")
|
||||||
|
err := ns.Run()
|
||||||
|
if err != nil {
|
||||||
|
l.Warning(pkg+"Run Failed. Retrying...", "error", err.Error())
|
||||||
|
time.Sleep(netSendRetryTime)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
l.Debug("sending logs")
|
||||||
|
err = nl.Send(ns)
|
||||||
|
if err != nil {
|
||||||
|
l.Warning(pkg+"Logs could not be sent", "error", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
l.Debug("checking varsum")
|
||||||
|
newVs := ns.VarSum()
|
||||||
|
if vs == newVs {
|
||||||
|
sleep(ns, l)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
vs = newVs
|
||||||
|
l.Info("varsum changed", "vs", vs)
|
||||||
|
|
||||||
|
l.Debug("getting new vars")
|
||||||
|
vars, err := ns.Vars()
|
||||||
|
if err != nil {
|
||||||
|
l.Error(pkg+"netSender failed to get vars", "error", err.Error())
|
||||||
|
time.Sleep(netSendRetryTime)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
l.Debug("got new vars", "vars", vars)
|
||||||
|
|
||||||
|
// Check the actives variable to seee what MACs are active.
|
||||||
|
v, ok := vars["Actives"]
|
||||||
|
if !ok {
|
||||||
|
l.Warning("no actives variable in var map", "vars", vars)
|
||||||
|
}
|
||||||
|
|
||||||
|
pairs := strings.Split(v, ",")
|
||||||
|
macs := make([]string, 0, len(pairs))
|
||||||
|
for _, p := range pairs {
|
||||||
|
pair := strings.Split(p, "=")
|
||||||
|
if len(pair) != 2 {
|
||||||
|
l.Warning("invalid <mac>=<rtmp url> pair", "pair", pair)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
m := pair[0]
|
||||||
|
macs = append(macs, m)
|
||||||
|
if !isMac(m) {
|
||||||
|
l.Warning("invalid mac in Actives variable", "mac", m)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
r := fh.getActive(m)
|
||||||
|
if r == nil {
|
||||||
|
fh.addActive(m, pair[1])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fh.removeInactives(macs)
|
||||||
|
|
||||||
|
sleep(ns, l)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// sleep uses a delay to halt the program based on the monitoring period
|
||||||
|
// netsender parameter (mp) defined in the netsender.conf config.
|
||||||
|
func sleep(ns *netsender.Sender, l logging.Logger) {
|
||||||
|
l.Debug("sleeping")
|
||||||
|
t, err := strconv.Atoi(ns.Param("mp"))
|
||||||
|
if err != nil {
|
||||||
|
l.Error(pkg+"could not get sleep time, using default", "error", err)
|
||||||
|
t = defaultSleepTime
|
||||||
|
}
|
||||||
|
time.Sleep(time.Duration(t) * time.Second)
|
||||||
|
l.Debug("finished sleeping")
|
||||||
|
}
|
||||||
|
|
||||||
|
func isMac(m string) bool {
|
||||||
|
if len(m) != 17 || m == "00:00:00:00:00:00" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i <= 15; i++ {
|
||||||
|
if (i+1)%3 == 0 && m[i] != ':' {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if (3-i)%3 != 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := strconv.ParseUint(m[i:i+2], 16, 64)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
Binary file not shown.
|
@ -0,0 +1,24 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestIsMac(t *testing.T) {
|
||||||
|
tests := []struct{
|
||||||
|
in string
|
||||||
|
want bool
|
||||||
|
}{
|
||||||
|
{in: "00:00:00:00:00:00", want: false},
|
||||||
|
{in: "00000:00:00:00:01", want: false},
|
||||||
|
{in: "00:00:00:00000:01", want: false},
|
||||||
|
{in: "15:b5:c7:cg:87:92", want: false},
|
||||||
|
{in: "00:00:00:00:00", want: false},
|
||||||
|
{in: "7d:ac:cf:84:e8:01", want: true},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, test := range tests {
|
||||||
|
got := isMac(test.in)
|
||||||
|
if test.want != got {
|
||||||
|
t.Errorf("did not get expected result for test %d\ngot: %v\nwant: %v", i, got, test.want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -81,3 +81,13 @@ func (l *ByteLexer) Lex(dst io.Writer, src io.Reader, d time.Duration) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Noop(dst io.Writer, src io.Reader, d time.Duration) error {
|
||||||
|
r := io.TeeReader(src,dst)
|
||||||
|
for {
|
||||||
|
_, err := io.ReadAll(r)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ const (
|
||||||
PCM = "pcm"
|
PCM = "pcm"
|
||||||
ADPCM = "adpcm"
|
ADPCM = "adpcm"
|
||||||
H264 = "h264"
|
H264 = "h264"
|
||||||
|
H264_AU = "h264_au"
|
||||||
H265 = "h265"
|
H265 = "h265"
|
||||||
MJPEG = "mjpeg"
|
MJPEG = "mjpeg"
|
||||||
JPEG = "jpeg"
|
JPEG = "jpeg"
|
||||||
|
@ -38,7 +39,7 @@ const (
|
||||||
// IsValid checks if a string is a known and valid codec in the right format.
|
// IsValid checks if a string is a known and valid codec in the right format.
|
||||||
func IsValid(s string) bool {
|
func IsValid(s string) bool {
|
||||||
switch s {
|
switch s {
|
||||||
case PCM, ADPCM, H264, H265, MJPEG, JPEG:
|
case PCM, ADPCM, H264, H264_AU, H265, MJPEG, JPEG:
|
||||||
return true
|
return true
|
||||||
default:
|
default:
|
||||||
return false
|
return false
|
||||||
|
|
|
@ -141,6 +141,10 @@ type Frame struct {
|
||||||
idx int // Index in the backing slice.
|
idx int // Index in the backing slice.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c* Clip) Frames() []Frame {
|
||||||
|
return c.frames
|
||||||
|
}
|
||||||
|
|
||||||
// Bytes returns the concatentated media bytes from each frame in the Clip c.
|
// Bytes returns the concatentated media bytes from each frame in the Clip c.
|
||||||
func (c *Clip) Bytes() []byte {
|
func (c *Clip) Bytes() []byte {
|
||||||
if c.backing == nil {
|
if c.backing == nil {
|
||||||
|
|
|
@ -44,6 +44,7 @@ const (
|
||||||
InputV4L
|
InputV4L
|
||||||
InputRTSP
|
InputRTSP
|
||||||
InputAudio
|
InputAudio
|
||||||
|
InputManual
|
||||||
|
|
||||||
// Outputs.
|
// Outputs.
|
||||||
OutputRTMP
|
OutputRTMP
|
||||||
|
@ -55,6 +56,7 @@ const (
|
||||||
|
|
||||||
// Codecs.
|
// Codecs.
|
||||||
H264
|
H264
|
||||||
|
H264_AU
|
||||||
H265
|
H265
|
||||||
MJPEG
|
MJPEG
|
||||||
JPEG
|
JPEG
|
||||||
|
|
|
@ -317,7 +317,7 @@ var Variables = []struct {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: KeyInput,
|
Name: KeyInput,
|
||||||
Type: "enum:raspivid,raspistill,rtsp,v4l,file,audio",
|
Type: "enum:raspivid,raspistill,rtsp,v4l,file,audio,manual",
|
||||||
Update: func(c *Config, v string) {
|
Update: func(c *Config, v string) {
|
||||||
c.Input = parseEnum(
|
c.Input = parseEnum(
|
||||||
KeyInput,
|
KeyInput,
|
||||||
|
@ -329,6 +329,7 @@ var Variables = []struct {
|
||||||
"v4l": InputV4L,
|
"v4l": InputV4L,
|
||||||
"file": InputFile,
|
"file": InputFile,
|
||||||
"audio": InputAudio,
|
"audio": InputAudio,
|
||||||
|
"manual": InputManual,
|
||||||
},
|
},
|
||||||
c,
|
c,
|
||||||
)
|
)
|
||||||
|
@ -344,7 +345,7 @@ var Variables = []struct {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: KeyInputCodec,
|
Name: KeyInputCodec,
|
||||||
Type: "enum:h264,h265,mjpeg,jpeg,pcm,adpcm",
|
Type: "enum:h264,h264_au,h265,mjpeg,jpeg,pcm,adpcm",
|
||||||
Update: func(c *Config, v string) {
|
Update: func(c *Config, v string) {
|
||||||
c.InputCodec = v
|
c.InputCodec = v
|
||||||
},
|
},
|
||||||
|
|
|
@ -40,7 +40,6 @@ import (
|
||||||
"bitbucket.org/ausocean/av/codec/jpeg"
|
"bitbucket.org/ausocean/av/codec/jpeg"
|
||||||
"bitbucket.org/ausocean/av/container/flv"
|
"bitbucket.org/ausocean/av/container/flv"
|
||||||
"bitbucket.org/ausocean/av/container/mts"
|
"bitbucket.org/ausocean/av/container/mts"
|
||||||
"bitbucket.org/ausocean/av/device"
|
|
||||||
"bitbucket.org/ausocean/av/device/file"
|
"bitbucket.org/ausocean/av/device/file"
|
||||||
"bitbucket.org/ausocean/av/device/geovision"
|
"bitbucket.org/ausocean/av/device/geovision"
|
||||||
"bitbucket.org/ausocean/av/device/raspistill"
|
"bitbucket.org/ausocean/av/device/raspistill"
|
||||||
|
@ -312,6 +311,12 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
|
||||||
case config.InputAudio:
|
case config.InputAudio:
|
||||||
r.cfg.Logger.Debug("using audio input")
|
r.cfg.Logger.Debug("using audio input")
|
||||||
err = r.setupAudio()
|
err = r.setupAudio()
|
||||||
|
|
||||||
|
case config.InputManual:
|
||||||
|
r.pipeReader, r.pipeWriter = io.Pipe()
|
||||||
|
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unrecognised input type: %v")
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not set lexer: %w", err)
|
return fmt.Errorf("could not set lexer: %w", err)
|
||||||
|
@ -339,6 +344,9 @@ func (r *Revid) setLexer(c string, isRTSP bool) error {
|
||||||
if isRTSP {
|
if isRTSP {
|
||||||
r.lexTo = h264.NewExtractor().Extract
|
r.lexTo = h264.NewExtractor().Extract
|
||||||
}
|
}
|
||||||
|
case codecutil.H264_AU:
|
||||||
|
r.cfg.Logger.Debug("using H.264 AU codec")
|
||||||
|
r.lexTo = codecutil.Noop
|
||||||
case codecutil.H265:
|
case codecutil.H265:
|
||||||
r.cfg.Logger.Debug("using H.265 codec")
|
r.cfg.Logger.Debug("using H.265 codec")
|
||||||
r.lexTo = h265.NewExtractor(false).Extract
|
r.lexTo = h265.NewExtractor(false).Extract
|
||||||
|
@ -363,13 +371,15 @@ func (r *Revid) setLexer(c string, isRTSP bool) error {
|
||||||
|
|
||||||
// processFrom is run as a routine to read from a input data source, lex and
|
// processFrom is run as a routine to read from a input data source, lex and
|
||||||
// then send individual access units to revid's encoders.
|
// then send individual access units to revid's encoders.
|
||||||
func (r *Revid) processFrom(in device.AVDevice, delay time.Duration) {
|
func (r *Revid) processFrom(delay time.Duration) {
|
||||||
defer r.wg.Done()
|
defer r.wg.Done()
|
||||||
|
|
||||||
err := in.Start()
|
if r.input != nil {
|
||||||
if err != nil {
|
err := r.input.Start()
|
||||||
r.err <- fmt.Errorf("could not start input device: %w", err)
|
if err != nil {
|
||||||
return
|
r.err <- fmt.Errorf("could not start input device: %w", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lex data from input device, in, until finished or an error is encountered.
|
// Lex data from input device, in, until finished or an error is encountered.
|
||||||
|
@ -382,7 +392,13 @@ func (r *Revid) processFrom(in device.AVDevice, delay time.Duration) {
|
||||||
w = ioext.MultiWriteCloser(r.filters[0], r.probe)
|
w = ioext.MultiWriteCloser(r.filters[0], r.probe)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = r.lexTo(w, in, delay)
|
var err error
|
||||||
|
if r.cfg.Input == config.InputManual {
|
||||||
|
err = r.lexTo(w, r.pipeReader, delay)
|
||||||
|
} else {
|
||||||
|
err = r.lexTo(w, r.input, delay)
|
||||||
|
}
|
||||||
|
|
||||||
switch err {
|
switch err {
|
||||||
case nil, io.EOF:
|
case nil, io.EOF:
|
||||||
r.cfg.Logger.Info("end of file")
|
r.cfg.Logger.Info("end of file")
|
||||||
|
@ -394,7 +410,7 @@ func (r *Revid) processFrom(in device.AVDevice, delay time.Duration) {
|
||||||
r.cfg.Logger.Info("finished reading input")
|
r.cfg.Logger.Info("finished reading input")
|
||||||
|
|
||||||
r.cfg.Logger.Debug("stopping input")
|
r.cfg.Logger.Debug("stopping input")
|
||||||
err = in.Stop()
|
err = r.input.Stop()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.err <- fmt.Errorf("could not stop input source: %w", err)
|
r.err <- fmt.Errorf("could not stop input source: %w", err)
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -102,6 +102,9 @@ type Revid struct {
|
||||||
|
|
||||||
// stop is used to signal stopping when looping an input.
|
// stop is used to signal stopping when looping an input.
|
||||||
stop chan struct{}
|
stop chan struct{}
|
||||||
|
|
||||||
|
pipeReader *io.PipeReader
|
||||||
|
pipeWriter *io.PipeWriter
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a pointer to a new Revid with the desired configuration, and/or
|
// New returns a pointer to a new Revid with the desired configuration, and/or
|
||||||
|
@ -126,6 +129,10 @@ func (r *Revid) Bitrate() int {
|
||||||
return r.bitrate.Bitrate()
|
return r.bitrate.Bitrate()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Revid) Write(p []byte) (int, error){
|
||||||
|
return r.pipeWriter.Write(p)
|
||||||
|
}
|
||||||
|
|
||||||
// Start invokes a Revid to start processing video from a defined input
|
// Start invokes a Revid to start processing video from a defined input
|
||||||
// and packetising (if theres packetization) to a defined output.
|
// and packetising (if theres packetization) to a defined output.
|
||||||
func (r *Revid) Start() error {
|
func (r *Revid) Start() error {
|
||||||
|
@ -155,7 +162,7 @@ func (r *Revid) Start() error {
|
||||||
|
|
||||||
r.cfg.Logger.Debug("starting input processing routine")
|
r.cfg.Logger.Debug("starting input processing routine")
|
||||||
r.wg.Add(1)
|
r.wg.Add(1)
|
||||||
go r.processFrom(r.input, d)
|
go r.processFrom(d)
|
||||||
|
|
||||||
r.running = true
|
r.running = true
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -68,6 +68,8 @@ type httpSender struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// newHttpSender returns a pointer to a new httpSender.
|
// newHttpSender returns a pointer to a new httpSender.
|
||||||
|
// report is callback that can be used to report the amount of data sent per write.
|
||||||
|
// This can be set to nil.
|
||||||
func newHTTPSender(ns *netsender.Sender, log logging.Logger, report func(sent int)) *httpSender {
|
func newHTTPSender(ns *netsender.Sender, log logging.Logger, report func(sent int)) *httpSender {
|
||||||
return &httpSender{
|
return &httpSender{
|
||||||
client: ns,
|
client: ns,
|
||||||
|
@ -82,7 +84,9 @@ func (s *httpSender) Write(d []byte) (int, error) {
|
||||||
err := httpSend(d, s.client, s.log)
|
err := httpSend(d, s.client, s.log)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
s.log.Debug("good send", "len", len(d))
|
s.log.Debug("good send", "len", len(d))
|
||||||
s.report(len(d))
|
if s.report != nil {
|
||||||
|
s.report(len(d))
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
s.log.Debug("bad send", "error", err)
|
s.log.Debug("bad send", "error", err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue