Merged in revid-rtmp-url-slice (pull request #496)

revid: accept multiple RTMP outputs

Approved-by: Alan Noble
This commit is contained in:
Saxon Milton 2023-03-11 22:59:12 +00:00
commit a4754e5ead
9 changed files with 52 additions and 35 deletions

View File

@ -44,7 +44,7 @@ const fileName = "state.json"
// marshal/unmarshal overriding. // marshal/unmarshal overriding.
type BroadcastBasic struct { type BroadcastBasic struct {
MAC MAC
URL string URLs []string
Status string Status string
} }
@ -60,7 +60,7 @@ type ManagerBasic struct {
func (b Broadcast) MarshalJSON() ([]byte, error) { func (b Broadcast) MarshalJSON() ([]byte, error) {
return json.Marshal(BroadcastBasic{ return json.Marshal(BroadcastBasic{
MAC: b.mac, MAC: b.mac,
URL: b.url, URLs: b.urls,
Status: b.status, Status: b.status,
}) })
} }
@ -75,10 +75,10 @@ func (b *Broadcast) UnmarshalJSON(data []byte) error {
} }
b.mac = bm.MAC b.mac = bm.MAC
b.url = bm.URL b.urls = bm.URLs
b.status = bm.Status b.status = bm.Status
b.rv, err = newRevid(global.GetLogger(), b.url) b.rv, err = newRevid(global.GetLogger(), b.urls)
if err != nil { if err != nil {
return fmt.Errorf("could not populate RV field: %w", err) return fmt.Errorf("could not populate RV field: %w", err)
} }

View File

@ -58,11 +58,11 @@ func TestBroadcastMarshal(t *testing.T) {
{ {
in: Broadcast{ in: Broadcast{
mac: testMAC, mac: testMAC,
url: testURL, urls: []string{testURL},
status: statusActive, status: statusActive,
rv: newRevidForTest((*logging.TestLogger)(t), testURL, t), rv: newRevidForTest((*logging.TestLogger)(t), testURL, t),
}, },
expect: []byte("{\"MAC\":\"" + testMAC + "\",\"URL\":\"" + testURL + "\",\"Status\":\"" + statusActive + "\"}"), expect: []byte("{\"MAC\":\"" + testMAC + "\",\"URLs\":[\"" + testURL + "\"],\"Status\":\"" + statusActive + "\"}"),
}, },
} }
@ -91,11 +91,11 @@ func TestBroadcastUnmarshal(t *testing.T) {
{ {
expect: Broadcast{ expect: Broadcast{
mac: testMAC, mac: testMAC,
url: testURL, urls: []string{testURL},
status: statusActive, status: statusActive,
rv: newRevidForTest(logger, testURL, t), rv: newRevidForTest(logger, testURL, t),
}, },
in: []byte("{\"MAC\":\"" + testMAC + "\",\"URL\":\"" + testURL + "\",\"Status\":\"" + statusActive + "\"}"), in: []byte("{\"MAC\":\"" + testMAC + "\",\"URLs\":[\"" + testURL + "\"],\"Status\":\"" + statusActive + "\"}"),
}, },
} }
@ -127,7 +127,7 @@ func TestBroadcastManagerMarshal(t *testing.T) {
broadcasts: map[MAC]Broadcast{ broadcasts: map[MAC]Broadcast{
testMAC: Broadcast{ testMAC: Broadcast{
testMAC, testMAC,
testURL, []string{testURL},
statusSlate, statusSlate,
newRevidForTest((*logging.TestLogger)(t), testURL, t), newRevidForTest((*logging.TestLogger)(t), testURL, t),
}, },
@ -136,7 +136,7 @@ func TestBroadcastManagerMarshal(t *testing.T) {
log: logger, log: logger,
dogNotifier: newWatchdogNotifierForTest(t, logger), dogNotifier: newWatchdogNotifierForTest(t, logger),
}, },
expect: []byte("{\"Broadcasts\":{\"" + testMAC + "\":{\"MAC\":\"" + testMAC + "\",\"URL\":\"" + testURL + "\",\"Status\":\"" + statusSlate + "\"}},\"SlateExitSignals\":[\"" + testMAC + "\"]}"), expect: []byte("{\"Broadcasts\":{\"" + testMAC + "\":{\"MAC\":\"" + testMAC + "\",\"URLs\":[\"" + testURL + "\"],\"Status\":\"" + statusSlate + "\"}},\"SlateExitSignals\":[\"" + testMAC + "\"]}"),
}, },
} }
@ -163,12 +163,12 @@ func TestBroadcastManagerUnmarshal(t *testing.T) {
expect broadcastManager expect broadcastManager
}{ }{
{ {
in: []byte("{\"Broadcasts\":{\"" + testMAC + "\":{\"MAC\":\"" + testMAC + "\",\"URL\":\"" + testURL + "\",\"Status\":\"" + statusSlate + "\"}},\"SlateExitSignals\":[\"" + testMAC + "\"]}"), in: []byte("{\"Broadcasts\":{\"" + testMAC + "\":{\"MAC\":\"" + testMAC + "\",\"URLs\":[\"" + testURL + "\"],\"Status\":\"" + statusSlate + "\"}},\"SlateExitSignals\":[\"" + testMAC + "\"]}"),
expect: broadcastManager{ expect: broadcastManager{
broadcasts: map[MAC]Broadcast{ broadcasts: map[MAC]Broadcast{
testMAC: Broadcast{ testMAC: Broadcast{
testMAC, testMAC,
testURL, []string{testURL},
statusSlate, statusSlate,
newRevidForTest((*logging.TestLogger)(t), testURL, t), newRevidForTest((*logging.TestLogger)(t), testURL, t),
}, },
@ -239,7 +239,7 @@ func watchdogNotifiersEqual(w1, w2 watchdogNotifier) bool {
} }
func broadcastsEqual(b1, b2 Broadcast) bool { func broadcastsEqual(b1, b2 Broadcast) bool {
if b1.mac != b2.mac || b1.url != b2.url || b1.status != b2.status || if b1.mac != b2.mac || !reflect.DeepEqual(b1.urls, b2.urls) || b1.status != b2.status ||
((b1.rv == nil || b2.rv == nil) && b1.rv != b2.rv) { ((b1.rv == nil || b2.rv == nil) && b1.rv != b2.rv) {
return false return false
} }
@ -268,7 +268,7 @@ func configsEqual(cfg1, cfg2 config.Config) bool {
// newRevidForTest allows us to create revid in table driven test entry. // newRevidForTest allows us to create revid in table driven test entry.
func newRevidForTest(log logging.Logger, url string, t *testing.T) *revid.Revid { func newRevidForTest(log logging.Logger, url string, t *testing.T) *revid.Revid {
r, err := newRevid(log, url) r, err := newRevid(log, []string{url})
if err != nil { if err != nil {
t.Fatalf("could not create revid pipeline: %v", err) t.Fatalf("could not create revid pipeline: %v", err)
return nil return nil

View File

@ -80,7 +80,7 @@ const (
// Broadcast is representative of a broadcast to be forwarded. // Broadcast is representative of a broadcast to be forwarded.
type Broadcast struct { type Broadcast struct {
mac MAC // MAC address of the device from which the video is being received. mac MAC // MAC address of the device from which the video is being received.
url string // The destination youtube RTMP URL. urls []string // The destination youtube RTMP URLs.
status string // The broadcast status i.e. active or slate. status string // The broadcast status i.e. active or slate.
rv *revid.Revid // The revid pipeline which will handle forwarding to youtube. rv *revid.Revid // The revid pipeline which will handle forwarding to youtube.
} }
@ -281,12 +281,17 @@ func (m *broadcastManager) createOrUpdate(broadcast Broadcast) error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
var outputs []uint8
for _ = range broadcast.urls {
outputs = append(outputs, config.OutputRTMP)
}
cfg := config.Config{ cfg := config.Config{
Logger: m.log, Logger: m.log,
Input: config.InputManual, Input: config.InputManual,
InputCodec: codecutil.H264_AU, InputCodec: codecutil.H264_AU,
Outputs: []uint8{config.OutputRTMP}, Outputs: outputs,
RTMPURL: broadcast.url, RTMPURL: broadcast.urls,
LogLevel: logging.Debug, LogLevel: logging.Debug,
} }

View File

@ -37,14 +37,14 @@ import (
var loggingLevel = logging.Info var loggingLevel = logging.Info
func newRevid(log logging.Logger, url string) (*revid.Revid, error) { func newRevid(log logging.Logger, urls []string) (*revid.Revid, error) {
return revid.New( return revid.New(
config.Config{ config.Config{
Logger: log, Logger: log,
Input: config.InputManual, Input: config.InputManual,
InputCodec: codecutil.H264_AU, InputCodec: codecutil.H264_AU,
Outputs: []uint8{config.OutputRTMP}, Outputs: []uint8{config.OutputRTMP},
RTMPURL: url, RTMPURL: urls,
LogLevel: loggingLevel, LogLevel: loggingLevel,
}, nil) }, nil)
} }

View File

@ -226,16 +226,16 @@ type Config struct {
// localhost:6970. MPEGT-TS packetization is used. // localhost:6970. MPEGT-TS packetization is used.
Outputs []uint8 Outputs []uint8
PSITime uint // Sets the time between a packet being sent. PSITime uint // Sets the time between a packet being sent.
Quantization uint // Quantization defines the quantization level, which will determine variable bitrate quality in the case of input from the Pi Camera. Quantization uint // Quantization defines the quantization level, which will determine variable bitrate quality in the case of input from the Pi Camera.
PoolCapacity uint // The number of bytes the pool buffer will occupy. PoolCapacity uint // The number of bytes the pool buffer will occupy.
PoolStartElementSize uint // The starting element size of the pool buffer from which element size will increase to accomodate frames. PoolStartElementSize uint // The starting element size of the pool buffer from which element size will increase to accomodate frames.
PoolWriteTimeout uint // The pool buffer write timeout in seconds. PoolWriteTimeout uint // The pool buffer write timeout in seconds.
RecPeriod float64 // How many seconds to record at a time. RecPeriod float64 // How many seconds to record at a time.
Rotation uint // Rotation defines the video rotation angle in degrees Raspivid input. Rotation uint // Rotation defines the video rotation angle in degrees Raspivid input.
RTMPURL string // RTMPURL specifies the Rtmp output destination URL. This must be defined if RTMP is to be used as an output. RTMPURL []string // RTMPURL specifies the RTMP output destination URLs. This must be defined if RTMP is to be used as an output.
RTPAddress string // RTPAddress defines the RTP output destination. RTPAddress string // RTPAddress defines the RTP output destination.
SampleRate uint // Samples a second (Hz). SampleRate uint // Samples a second (Hz).
Saturation int Saturation int
// Sharpness is the sharpness of capture image/video from a capture device. // Sharpness is the sharpness of capture image/video from a capture device.

View File

@ -166,7 +166,7 @@ func TestUpdate(t *testing.T) {
PoolCapacity: 100000, PoolCapacity: 100000,
PoolWriteTimeout: 50, PoolWriteTimeout: 50,
Rotation: 180, Rotation: 180,
RTMPURL: "rtmp://url", RTMPURL: []string{"rtmp://url"},
RTPAddress: "ip:port", RTPAddress: "ip:port",
Saturation: -10, Saturation: -10,
VBRBitrate: 300000, VBRBitrate: 300000,

View File

@ -582,9 +582,15 @@ var Variables = []struct {
Update: func(c *Config, v string) { c.Rotation = parseUint(KeyRotation, v, c) }, Update: func(c *Config, v string) { c.Rotation = parseUint(KeyRotation, v, c) },
}, },
{ {
Name: KeyRTMPURL, Name: KeyRTMPURL,
Type: typeString, Type: typeString,
Update: func(c *Config, v string) { c.RTMPURL = v }, Update: func(c *Config, v string) {
v = strings.ReplaceAll(v, " ", "")
split := strings.Split(v, ",")
for _, s := range split {
c.RTMPURL = append(c.RTMPURL, s)
}
},
}, },
{ {
Name: KeyRTPAddress, Name: KeyRTPAddress,

View File

@ -178,6 +178,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
// to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the // to mtsSenders if the output requires MPEGTS encoding, or flvSenders if the
// output requires FLV encoding. // output requires FLV encoding.
var w io.WriteCloser var w io.WriteCloser
rtmpUrlIdx := 0
for _, out := range r.cfg.Outputs { for _, out := range r.cfg.Outputs {
switch out { switch out {
case config.OutputHTTP: case config.OutputHTTP:
@ -215,11 +216,16 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
mtsSenders = append(mtsSenders, w) mtsSenders = append(mtsSenders, w)
case config.OutputRTMP: case config.OutputRTMP:
r.cfg.Logger.Debug("using RTMP output") r.cfg.Logger.Debug("using RTMP output")
if rtmpUrlIdx > len(r.cfg.RTMPURL)-1 {
r.cfg.Logger.Warning("rtmp outputs exceed available rtmp urls")
break
}
pb := pool.NewBuffer(int(r.cfg.PoolStartElementSize), int(nElements), writeTimeout) pb := pool.NewBuffer(int(r.cfg.PoolStartElementSize), int(nElements), writeTimeout)
w, err := newRtmpSender(r.cfg.RTMPURL, rtmpConnectionMaxTries, pb, r.cfg.Logger, r.bitrate.Report) w, err := newRtmpSender(r.cfg.RTMPURL[rtmpUrlIdx], rtmpConnectionMaxTries, pb, r.cfg.Logger, r.bitrate.Report)
if err != nil { if err != nil {
r.cfg.Logger.Warning("rtmp connect error", "error", err.Error()) r.cfg.Logger.Warning("rtmp connect error", "error", err.Error())
} }
rtmpUrlIdx++
flvSenders = append(flvSenders, w) flvSenders = append(flvSenders, w)
} }
} }

View File

@ -203,7 +203,7 @@ func TestResetEncoderSenderSetup(t *testing.T) {
for testNum, test := range tests { for testNum, test := range tests {
// Create a new config and reset revid with it. // Create a new config and reset revid with it.
const dummyURL = "rtmp://dummy" const dummyURL = "rtmp://dummy"
c := config.Config{Logger: &testLogger{}, Outputs: test.outputs, RTMPURL: dummyURL} c := config.Config{Logger: &testLogger{}, Outputs: test.outputs, RTMPURL: []string{dummyURL}}
err := rv.setConfig(c) err := rv.setConfig(c)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v for test %v", err, testNum) t.Fatalf("unexpected error: %v for test %v", err, testNum)