This commit is contained in:
Alan Noble 2018-06-22 15:57:33 +09:30
commit 60c09c2800
12 changed files with 157 additions and 143 deletions

View File

@ -6,8 +6,8 @@ DESCRIPTION
See Readme.md See Readme.md
AUTHORS AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org> Saxon A. Nelson-Milton <saxon@ausocean.org>
Jack Richardson <jack@ausocean.org> Jack Richardson <jack@ausocean.org>
LICENSE LICENSE
RevidCLI.go is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) RevidCLI.go is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean)
@ -20,10 +20,10 @@ LICENSE
It is distributed in the hope that it will be useful, but WITHOUT It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details. for more details.
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/ */
package main package main
@ -224,7 +224,10 @@ func main() {
fmt.Println("Bad vertical flip option!") fmt.Println("Bad vertical flip option!")
} }
config.FramesPerClip = *configFlags[framesPerClipPtr] fpc, err := strconv.Atoi(*configFlags[framesPerClipPtr])
if err == nil && fpc > 0 {
config.FramesPerClip = fpc
}
config.RtmpUrl = *configFlags[rtmpUrlPtr] config.RtmpUrl = *configFlags[rtmpUrlPtr]
config.Bitrate = *configFlags[bitratePtr] config.Bitrate = *configFlags[bitratePtr]
config.OutputFileName = *configFlags[outputFileNamePtr] config.OutputFileName = *configFlags[outputFileNamePtr]
@ -241,7 +244,7 @@ func main() {
var vs int var vs int
if *useNetsender { if *useNetsender {
// initialize NetSender and use NetSender's logger // initialize NetSender and use NetSender's logger
config.Logger = netsender.GetLogger() config.Logger = netsender.Logger()
var err error var err error
err = ns.Init(nil, nil, nil) err = ns.Init(nil, nil, nil)
@ -267,7 +270,7 @@ func main() {
continue continue
} }
if vs != ns.GetVarSum() { if vs != ns.VarSum() {
// vars changed // vars changed
vars, err := ns.Vars() vars, err := ns.Vars()
if err != nil { if err != nil {
@ -275,7 +278,7 @@ func main() {
time.Sleep(netSendRetryTime) time.Sleep(netSendRetryTime)
continue continue
} }
vs = ns.GetVarSum() vs = ns.VarSum()
if vars["mode"] == "Paused" { if vars["mode"] == "Paused" {
if !paused { if !paused {
config.Logger.Log(progName, "Info", "Pausing revid") config.Logger.Log(progName, "Info", "Pausing revid")
@ -306,7 +309,7 @@ func sendTo(ns *netsender.Sender) error {
inputs := netsender.MakePins(ns.GetConfigParam("ip"), "X") inputs := netsender.MakePins(ns.GetConfigParam("ip"), "X")
for i, pin := range inputs { for i, pin := range inputs {
if pin.Name == "X23" { if pin.Name == "X23" {
inputs[i].Value = int(revidInst.GetBitrate()) inputs[i].Value = revidInst.Bitrate()
} }
} }
@ -357,9 +360,9 @@ func updateRevid(vars map[string]string, stop bool) {
for key, value := range vars { for key, value := range vars {
switch key { switch key {
case "FramesPerClip": case "FramesPerClip":
asInt, err := strconv.Atoi(value) fpc, err := strconv.Atoi(value)
if asInt > 0 && err == nil { if fpc > 0 && err == nil {
config.FramesPerClip = value config.FramesPerClip = fpc
} }
case "RtmpUrl": case "RtmpUrl":
config.RtmpUrl = value config.RtmpUrl = value

View File

@ -62,15 +62,15 @@ type flvGenerator struct {
isGenerating bool isGenerating bool
} }
// GetInputChan returns the input channel to the generator. This is where the // InputChan returns the input channel to the generator. This is where the
// raw data frames are entered into the generator // raw data frames are entered into the generator
func (g *flvGenerator) GetInputChan() chan []byte { func (g *flvGenerator) InputChan() chan []byte {
return g.inputChan return g.inputChan
} }
// GetOutputChan retuns the output chan of the generator - this is where the // OutputChan retuns the output chan of the generator - this is where the
// flv packets (more specifically tags) are outputted. // flv packets (more specifically tags) are outputted.
func (g *flvGenerator) GetOutputChan() chan []byte { func (g *flvGenerator) OutputChan() <-chan []byte {
return g.outputChan return g.outputChan
} }

View File

@ -28,8 +28,8 @@ LICENSE
package generator package generator
type Generator interface { type Generator interface {
GetInputChan() chan []byte InputChan() chan []byte
GetOutputChan() chan []byte OutputChan() <-chan []byte
Start() Start()
Stop() Stop()
} }

View File

@ -86,15 +86,15 @@ type tsGenerator struct {
isGenerating bool isGenerating bool
} }
// getInputChan returns a handle to the nalInputChan (inputChan) so that nal units // InputChan returns a handle to the nalInputChan (inputChan) so that nal units
// can be passed to the generator and processed // can be passed to the generator and processed
func (g *tsGenerator) GetInputChan() chan []byte { func (g *tsGenerator) InputChan() chan []byte {
return g.nalInputChan return g.nalInputChan
} }
// GetOutputChan returns a handle to the generator output chan where the mpegts // OutputChan returns a handle to the generator output chan where the mpegts
// packets will show up once ready to go // packets will show up once ready to go
func (g *tsGenerator) GetOutputChan() chan []byte { func (g *tsGenerator) OutputChan() <-chan []byte {
return g.outputChan return g.outputChan
} }

View File

@ -37,9 +37,9 @@ const (
outputBufferSize = 10000 outputBufferSize = 10000
) )
// h264Parser provides properties and methods to allow for the parsing of a // H264 provides properties and methods to allow for the parsing of a
// h264 stream - i.e. to allow extraction of the individual access units // h264 stream - i.e. to allow extraction of the individual access units
type h264Parser struct { type H264 struct {
inputBuffer []byte inputBuffer []byte
isParsing bool isParsing bool
parserOutputChanRef chan []byte parserOutputChanRef chan []byte
@ -48,9 +48,9 @@ type h264Parser struct {
delay uint delay uint
} }
// NewH264Parser returns an instance of the h264Parser struct // NewH264Parser returns an instance of the H264 struct
func NewH264Parser() (p *h264Parser) { func NewH264Parser() (p *H264) {
p = new(h264Parser) p = new(H264)
p.isParsing = true p.isParsing = true
p.inputChan = make(chan byte, inputChanSize) p.inputChan = make(chan byte, inputChanSize)
p.delay = 0 p.delay = 0
@ -60,12 +60,12 @@ func NewH264Parser() (p *h264Parser) {
// Stop simply sets the isParsing flag to false to indicate to the parser that // Stop simply sets the isParsing flag to false to indicate to the parser that
// we don't want to interpret incoming data anymore - this will also make the // we don't want to interpret incoming data anymore - this will also make the
// parser jump out of the parse func // parser jump out of the parse func
func (p *h264Parser) Stop() { func (p *H264) Stop() {
p.isParsing = false p.isParsing = false
} }
// Start starts the parse func as a goroutine so that incoming data is interpreted // Start starts the parse func as a goroutine so that incoming data is interpreted
func (p *h264Parser) Start() { func (p *H264) Start() {
p.isParsing = true p.isParsing = true
go p.parse() go p.parse()
} }
@ -73,31 +73,31 @@ func (p *h264Parser) Start() {
// SetDelay sets a delay inbetween each buffer output. Useful if we're parsing // SetDelay sets a delay inbetween each buffer output. Useful if we're parsing
// a file but want to replicate the speed of incoming video frames from a // a file but want to replicate the speed of incoming video frames from a
// camera // camera
func (p *h264Parser) SetDelay(delay uint) { func (p *H264) SetDelay(delay uint) {
p.delay = delay p.delay = delay
} }
// GetInputChan returns a handle to the input channel of the parser // InputChan returns a handle to the input channel of the parser
func (p *h264Parser) GetInputChan() chan byte { func (p *H264) InputChan() chan byte {
return p.inputChan return p.inputChan
} }
// GetOutputChan returns a handle to the output chan of the parser // OutputChan returns a handle to the output chan of the parser
func (p *h264Parser) GetOutputChan() chan []byte { func (p *H264) OutputChan() <-chan []byte {
return p.userOutputChanRef return p.userOutputChanRef
} }
// SetOutputChan sets the parser output chan to the passed output chan. This is // SetOutputChan sets the parser output chan to the passed output chan. This is
// useful if we want the parser output to go directly to a generator of some sort // useful if we want the parser output to go directly to a generator of some sort
// for packetization. // for packetization.
func (p *h264Parser) SetOutputChan(aChan chan []byte) { func (p *H264) SetOutputChan(o chan []byte) {
p.parserOutputChanRef = aChan p.parserOutputChanRef = o
p.userOutputChanRef = aChan p.userOutputChanRef = o
} }
// parse interprets an incoming h264 stream and extracts individual frames // parse interprets an incoming h264 stream and extracts individual frames
// aka access units // aka access units
func (p *h264Parser) parse() { func (p *H264) parse() {
outputBuffer := make([]byte, 0, outputBufferSize) outputBuffer := make([]byte, 0, outputBufferSize)
searchingForEnd := false searchingForEnd := false
for p.isParsing { for p.isParsing {

View File

@ -29,7 +29,7 @@ package parser
const frameStartCode = 0xD8 const frameStartCode = 0xD8
type mjpegParser struct { type MJPEG struct {
inputBuffer []byte inputBuffer []byte
isParsing bool isParsing bool
parserOutputChanRef chan []byte parserOutputChanRef chan []byte
@ -38,39 +38,39 @@ type mjpegParser struct {
delay uint delay uint
} }
func NewMJPEGParser(inputChanLen int) (p *mjpegParser) { func NewMJPEGParser(inputChanLen int) (p *MJPEG) {
p = new(mjpegParser) p = new(MJPEG)
p.isParsing = true p.isParsing = true
p.inputChan = make(chan byte, inputChanLen) p.inputChan = make(chan byte, inputChanLen)
return return
} }
func (p *mjpegParser) Stop() { func (p *MJPEG) Stop() {
p.isParsing = false p.isParsing = false
} }
func (p *mjpegParser) Start() { func (p *MJPEG) Start() {
go p.parse() go p.parse()
} }
func (p *mjpegParser) SetDelay(delay uint) { func (p *MJPEG) SetDelay(delay uint) {
p.delay = delay p.delay = delay
} }
func (p *mjpegParser) GetInputChan() chan byte { func (p *MJPEG) InputChan() chan byte {
return p.inputChan return p.inputChan
} }
func (p *mjpegParser) GetOutputChan() chan []byte { func (p *MJPEG) OutputChan() <-chan []byte {
return p.userOutputChanRef return p.userOutputChanRef
} }
func (p *mjpegParser) SetOutputChan(aChan chan []byte) { func (p *MJPEG) SetOutputChan(o chan []byte) {
p.parserOutputChanRef = aChan p.parserOutputChanRef = o
p.userOutputChanRef = aChan p.userOutputChanRef = o
} }
func (p *mjpegParser) parse() { func (p *MJPEG) parse() {
var outputBuffer []byte var outputBuffer []byte
for p.isParsing { for p.isParsing {
aByte := <-p.inputChan aByte := <-p.inputChan

View File

@ -43,8 +43,8 @@ var (
type Parser interface { type Parser interface {
Stop() Stop()
Start() Start()
GetInputChan() chan byte InputChan() chan byte
GetOutputChan() chan []byte OutputChan() <-chan []byte
SetOutputChan(achan chan []byte) SetOutputChan(achan chan []byte)
SetDelay(delay uint) SetDelay(delay uint)
} }

View File

@ -46,7 +46,7 @@ type Config struct {
Verbosity uint8 Verbosity uint8
HorizontalFlip uint8 HorizontalFlip uint8
VerticalFlip uint8 VerticalFlip uint8
FramesPerClip string FramesPerClip int
RtmpUrl string RtmpUrl string
Bitrate string Bitrate string
OutputFileName string OutputFileName string
@ -97,7 +97,7 @@ const (
defaultQuantization = "40" defaultQuantization = "40"
defaultBitrate = "500000" defaultBitrate = "500000"
defaultQuantizationMode = QuantizationOff defaultQuantizationMode = QuantizationOff
defaultFramesPerClip = "1" defaultFramesPerClip = 1
defaultVerticalFlip = No defaultVerticalFlip = No
defaultHorizontalFlip = No defaultHorizontalFlip = No
) )
@ -171,7 +171,6 @@ func (c *Config) Validate(r *Revid) error {
} }
switch c.Output { switch c.Output {
case Http:
case File: case File:
case NativeRtmp, FfmpegRtmp: case NativeRtmp, FfmpegRtmp:
if c.RtmpUrl == "" { if c.RtmpUrl == "" {
@ -180,10 +179,14 @@ func (c *Config) Validate(r *Revid) error {
break break
} }
r.Log(Info, "Defaulting frames per clip to 1 for rtmp output!") r.Log(Info, "Defaulting frames per clip to 1 for rtmp output!")
c.FramesPerClip = "1" c.FramesPerClip = 1
case NothingDefined: case NothingDefined:
r.Log(Warning, "No output defined, defaulting to httpOut!") r.Log(Warning, "No output defined, defaulting to httpOut!")
c.Output = defaultOutput c.Output = defaultOutput
fallthrough
case Http:
r.Log(Info, "Defaulting frames per clip to 7 for http output!")
c.FramesPerClip = 7
default: default:
return errors.New("Bad output type defined in config!") return errors.New("Bad output type defined in config!")
} }
@ -219,13 +222,9 @@ func (c *Config) Validate(r *Revid) error {
return errors.New("Bad vertical flip option defined in config!") return errors.New("Bad vertical flip option defined in config!")
} }
if c.FramesPerClip == "" { if c.FramesPerClip < 1 {
r.Log(Warning, "No FramesPerClip defined defined, defaulting to 1!") r.Log(Warning, "No FramesPerClip defined defined, defaulting to 1!")
c.FramesPerClip = defaultFramesPerClip c.FramesPerClip = defaultFramesPerClip
} else {
if integer, err := strconv.Atoi(c.FramesPerClip); integer <= 0 || err != nil {
return errors.New("Bad width defined in config!")
}
} }
if c.Width == "" { if c.Width == "" {

View File

@ -83,23 +83,23 @@ const (
// Revid provides methods to control a revid session; providing methods // Revid provides methods to control a revid session; providing methods
// to start, stop and change the state of an instance using the Config struct. // to start, stop and change the state of an instance using the Config struct.
type Revid struct { type Revid struct {
ffmpegPath string ffmpegPath string
tempDir string tempDir string
ringBuffer *ring.Buffer ringBuffer *ring.Buffer
config Config config Config
isRunning bool isRunning bool
inputFile *os.File inputFile *os.File
generator generator.Generator generator generator.Generator
parser parser.Parser parser parser.Parser
cmd *exec.Cmd cmd *exec.Cmd
inputReader *bufio.Reader inputReader *bufio.Reader
ffmpegStdin io.WriteCloser ffmpegStdin io.WriteCloser
outputChan chan []byte outputChan chan []byte
setupInput func() error setupInput func() error
getFrame func() []byte getFrame func() []byte
destination loadSender destination loadSender
rtmpInst rtmp.Session rtmpInst rtmp.Session
currentBitrate int64 bitrate int
} }
// NewRevid returns a pointer to a new Revid with the desired // NewRevid returns a pointer to a new Revid with the desired
@ -116,14 +116,18 @@ func New(c Config) (*Revid, error) {
return &r, nil return &r, nil
} }
// Returns the currently saved bitrate from the most recent bitrate check // Bitrate returns the result of the most recent bitrate check.
// check bitrate output delay in consts for this period func (r *Revid) Bitrate() int {
func (r *Revid) GetBitrate() int64 { return r.bitrate
return r.currentBitrate
} }
// GetConfigRef returns a pointer to the revidInst's Config struct object // Config returns the Revid's config.
func (r *Revid) GetConfigRef() *Config { func (r *Revid) Config() *Config {
// FIXME(kortschak): This is a massive footgun and should not exist.
// Since the config's fields are accessed in running goroutines, any
// mutation is a data race. With bad luck a data race is possible by
// reading the returned value since it is possible for the running
// Ravid to mutate the config it holds.
return &r.config return &r.config
} }
@ -163,7 +167,7 @@ func (r *Revid) reset(config Config) error {
} }
r.destination = s r.destination = s
case Http: case Http:
r.destination = newHttpSender(config.RtmpUrl, httpTimeout, r.Log) r.destination = newHttpSender(config.HttpAddress, httpTimeout, r.Log)
} }
switch r.config.Input { switch r.config.Input {
@ -199,13 +203,13 @@ func (r *Revid) reset(config Config) error {
// We have packetization of some sort, so we want to send data to Generator // We have packetization of some sort, so we want to send data to Generator
// to perform packetization // to perform packetization
r.getFrame = r.getFramePacketization r.getFrame = r.getFramePacketization
r.parser.SetOutputChan(r.generator.GetInputChan()) r.parser.SetOutputChan(r.generator.InputChan())
return nil return nil
} }
// ChangeConfig changes the current configuration of the Revid instance. // SetConfig changes the current configuration of the receiver.
func (r *Revid) ChangeConfig(c Config) error { func (r *Revid) SetConfig(c Config) error {
// FIXME(kortschak): This is reimplemented in cmd/revid-cli/main.go. // FIXME(kortschak): This is reimplemented in cmd/revid-cli/main.go.
// The implementation in the command is used and this is not. // The implementation in the command is used and this is not.
// Decide on one or the other. // Decide on one or the other.
@ -233,7 +237,7 @@ func (r *Revid) Log(logType, m string) {
fmt.Println(logType + ": " + m) fmt.Println(logType + ": " + m)
} }
// IsRunning returns true if the revid is currently running and false otherwise // IsRunning returns whether the receiver is running.
func (r *Revid) IsRunning() bool { func (r *Revid) IsRunning() bool {
return r.isRunning return r.isRunning
} }
@ -296,7 +300,7 @@ func (r *Revid) getFrameNoPacketization() []byte {
// getFramePacketization gets a frame from the generators output chan - the // getFramePacketization gets a frame from the generators output chan - the
// the generator being an mpegts or flv generator depending on the config // the generator being an mpegts or flv generator depending on the config
func (r *Revid) getFramePacketization() []byte { func (r *Revid) getFramePacketization() []byte {
return <-r.generator.GetOutputChan() return <-r.generator.OutputChan()
} }
// packClips takes data segments; whether that be tsPackets or mjpeg frames and // packClips takes data segments; whether that be tsPackets or mjpeg frames and
@ -308,7 +312,7 @@ func (r *Revid) packClips() {
select { select {
// TODO: This is temporary, need to work out how to make this work // TODO: This is temporary, need to work out how to make this work
// for cases when there is not packetisation. // for cases when there is not packetisation.
case frame := <-r.generator.GetOutputChan(): case frame := <-r.generator.OutputChan():
lenOfFrame := len(frame) lenOfFrame := len(frame)
if lenOfFrame > ringBufferElementSize { if lenOfFrame > ringBufferElementSize {
r.Log(Warning, fmt.Sprintf("Frame was too big: %v bytes, getting another one!", lenOfFrame)) r.Log(Warning, fmt.Sprintf("Frame was too big: %v bytes, getting another one!", lenOfFrame))
@ -317,26 +321,22 @@ func (r *Revid) packClips() {
} }
_, err := r.ringBuffer.Write(frame) _, err := r.ringBuffer.Write(frame)
if err != nil { if err != nil {
r.Log(Error, err.Error())
if err == ring.ErrDropped { if err == ring.ErrDropped {
r.Log(Warning, fmt.Sprintf("dropped %d byte frame", len(frame))) r.Log(Warning, fmt.Sprintf("dropped %d byte frame", len(frame)))
} else {
r.Log(Error, err.Error())
} }
} }
packetCount++ packetCount++
clipSize += lenOfFrame clipSize += lenOfFrame
fpcAsInt, err := strconv.Atoi(r.config.FramesPerClip) if packetCount >= r.config.FramesPerClip {
if err != nil {
r.Log(Error, "Frames per clip not quite right! Defaulting to 1!")
r.config.FramesPerClip = "1"
}
if packetCount >= fpcAsInt {
r.ringBuffer.Flush() r.ringBuffer.Flush()
clipSize = 0 clipSize = 0
packetCount = 0 packetCount = 0
continue continue
} }
default: default:
time.Sleep(time.Duration(5) * time.Millisecond) time.Sleep(5 * time.Millisecond)
} }
} }
} }
@ -417,8 +417,9 @@ func (r *Revid) outputClips() {
now = time.Now() now = time.Now()
deltaTime := now.Sub(prevTime) deltaTime := now.Sub(prevTime)
if deltaTime > bitrateTime { if deltaTime > bitrateTime {
r.currentBitrate = int64(float64(bytes*8) / float64(deltaTime/1e9)) // FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate.
r.Log(Debug, fmt.Sprintf("Bitrate: %v bits/s\n", r.currentBitrate)) r.bitrate = int(float64(bytes*8) / float64(deltaTime/time.Second))
r.Log(Debug, fmt.Sprintf("Bitrate: %v bits/s\n", r.bitrate))
r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.Len())) r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.Len()))
prevTime = now prevTime = now
bytes = 0 bytes = 0
@ -513,7 +514,7 @@ func (r *Revid) readCamera() {
r.Log(Error, "No data from camera!") r.Log(Error, "No data from camera!")
time.Sleep(cameraRetryPeriod) time.Sleep(cameraRetryPeriod)
default: default:
r.parser.GetInputChan() <- data[0] r.parser.InputChan() <- data[0]
} }
} }
r.Log(Info, "Not trying to read from camera anymore!") r.Log(Info, "Not trying to read from camera anymore!")
@ -542,7 +543,7 @@ func (r *Revid) readFile() error {
return err return err
} }
for i := range data { for i := range data {
r.parser.GetInputChan() <- data[i] r.parser.InputChan() <- data[i]
} }
r.inputFile.Close() r.inputFile.Close()
return nil return nil

View File

@ -29,8 +29,6 @@ LICENSE
package revid package revid
import ( import (
"bytes"
"fmt"
"io" "io"
"os" "os"
"os/exec" "os/exec"
@ -107,8 +105,6 @@ type httpSender struct {
log func(lvl, msg string) log func(lvl, msg string)
buf []byte
chunk *ring.Chunk chunk *ring.Chunk
} }
@ -123,32 +119,30 @@ func newHttpSender(_ string, _ time.Duration, log func(lvl, msg string)) *httpSe
func (s *httpSender) load(c *ring.Chunk) error { func (s *httpSender) load(c *ring.Chunk) error {
s.chunk = c s.chunk = c
buf := bytes.NewBuffer(s.buf[:0])
_, err := s.chunk.WriteTo(buf)
s.buf = buf.Bytes()
if err != nil {
return fmt.Errorf("httpSender: %v", err)
}
return nil return nil
} }
func (s *httpSender) send() error { func (s *httpSender) send() error {
if s.chunk == nil {
// Do not retry with httpSender,
// so just return without error
// if the chunk has been cleared.
return nil
}
pins := netsender.MakePins("V0", "") pins := netsender.MakePins("V0", "")
pins[0].Value = len(s.buf) pins[0].Value = s.chunk.Len()
pins[0].Data = s.buf pins[0].Data = s.chunk.Bytes()
pins[0].MimeType = "video/mp2t" pins[0].MimeType = "video/mp2t"
_, _, err := s.client.Send(netsender.RequestPoll, pins) _, _, err := s.client.Send(netsender.RequestPoll, pins)
if err != nil { // We will not retry, so release
return err // the chunk and clear it now.
}
return nil
}
func (s *httpSender) release() {
s.chunk.Close() s.chunk.Close()
s.chunk = nil s.chunk = nil
return err
} }
func (s *httpSender) release() {}
func (s *httpSender) close() error { return nil } func (s *httpSender) close() error { return nil }
// ffmpegSender implements loadSender for an FFMPEG RTMP destination. // ffmpegSender implements loadSender for an FFMPEG RTMP destination.
@ -223,7 +217,7 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl, msg stri
var err error var err error
for n := 0; n < retries; n++ { for n := 0; n < retries; n++ {
sess = rtmp.NewSession(url, timeout) sess = rtmp.NewSession(url, timeout)
err = sess.StartSession() err = sess.Open()
if err == nil { if err == nil {
break break
} }
@ -269,7 +263,7 @@ func (s *rtmpSender) restart() error {
} }
for n := 0; n < s.retries; n++ { for n := 0; n < s.retries; n++ {
s.sess = rtmp.NewSession(s.url, s.timeout) s.sess = rtmp.NewSession(s.url, s.timeout)
err = s.sess.StartSession() err = s.sess.Open()
if err == nil { if err == nil {
break break
} }

View File

@ -34,30 +34,29 @@ LICENSE
#include <rtmp.h> #include <rtmp.h>
RTMP* start_session(RTMP* rtmp, char* url, uint connect_timeout) { RTMP* start_session(RTMP* rtmp, char* url, uint connect_timeout) {
printf("RTMP url: %s\n", url);
rtmp = RTMP_Alloc(); rtmp = RTMP_Alloc();
RTMP_Init(rtmp); RTMP_Init(rtmp);
rtmp->Link.timeout = connect_timeout; rtmp->Link.timeout = connect_timeout;
if (!RTMP_SetupURL(rtmp, url)) { if (!RTMP_SetupURL(rtmp, url)) {
printf("Can't setup url!\n");
RTMP_Close(rtmp); RTMP_Close(rtmp);
RTMP_Free(rtmp); RTMP_Free(rtmp);
errno = EINVAL;
return NULL; return NULL;
} }
RTMP_EnableWrite(rtmp); RTMP_EnableWrite(rtmp);
RTMP_SetBufferMS(rtmp, 3600 * 1000); RTMP_SetBufferMS(rtmp, 3600 * 1000);
if (!RTMP_Connect(rtmp, NULL)) { if (!RTMP_Connect(rtmp, NULL)) {
printf("RTMP can't connect!\n");
RTMP_Close(rtmp); RTMP_Close(rtmp);
RTMP_Free(rtmp); RTMP_Free(rtmp);
errno = EIO;
return NULL; return NULL;
} }
if (!RTMP_ConnectStream(rtmp, 0)) { if (!RTMP_ConnectStream(rtmp, 0)) {
printf("RTMP can't connect stream!\n");
RTMP_Close(rtmp); RTMP_Close(rtmp);
RTMP_Free(rtmp); RTMP_Free(rtmp);
errno = EIO;
return NULL; return NULL;
} }
@ -66,13 +65,11 @@ RTMP* start_session(RTMP* rtmp, char* url, uint connect_timeout) {
unsigned int write_frame(RTMP* rtmp, char* data, uint data_length) { unsigned int write_frame(RTMP* rtmp, char* data, uint data_length) {
if (!RTMP_IsConnected(rtmp)) { if (!RTMP_IsConnected(rtmp)) {
printf("RTMP is not connected!\n");
return 1; return 1;
} }
if (!RTMP_Write(rtmp, (const char*)data, data_length)) { if (!RTMP_Write(rtmp, (const char*)data, data_length)) {
printf("RTMP write error!\n"); return 2;
return 1;
} }
return 0; return 0;
@ -80,8 +77,7 @@ unsigned int write_frame(RTMP* rtmp, char* data, uint data_length) {
unsigned int end_session(RTMP* rtmp) { unsigned int end_session(RTMP* rtmp) {
if (rtmp == NULL) { if (rtmp == NULL) {
printf("Tried to end RTMP session, but not allocated yet!\n"); return 3;
return 1;
} }
RTMP_Close(rtmp); RTMP_Close(rtmp);

View File

@ -43,12 +43,13 @@ import "C"
import ( import (
"errors" "errors"
"strconv"
"unsafe" "unsafe"
) )
// Session provides an interface for sending flv tags over rtmp. // Session provides an interface for sending flv tags over rtmp.
type Session interface { type Session interface {
StartSession() error Open() error
Write([]byte) (int, error) Write([]byte) (int, error)
Close() error Close() error
} }
@ -71,15 +72,16 @@ func NewSession(url string, connectTimeout uint) Session {
} }
} }
// StartSession establishes an rtmp connection with the url passed into the // Open establishes an rtmp connection with the url passed into the
// constructor // constructor
func (s *session) StartSession() error { func (s *session) Open() error {
if s.rtmp != nil { if s.rtmp != nil {
return errors.New("rtmp: attempt to start already running session") return errors.New("rtmp: attempt to start already running session")
} }
s.rtmp = C.start_session(s.rtmp, C.CString(s.url), C.uint(s.timeout)) var err error
s.rtmp, err = C.start_session(s.rtmp, C.CString(s.url), C.uint(s.timeout))
if s.rtmp == nil { if s.rtmp == nil {
return errors.New("RTMP start error! Check rtmp log for details!") return err
} }
return nil return nil
} }
@ -87,10 +89,11 @@ func (s *session) StartSession() error {
// Write writes a frame (flv tag) to the rtmp connection // Write writes a frame (flv tag) to the rtmp connection
func (s *session) Write(data []byte) (int, error) { func (s *session) Write(data []byte) (int, error) {
if s.rtmp == nil { if s.rtmp == nil {
return 0, errors.New("rtmp: attempt to write to non-running session") return 0, Err(3)
} }
if C.write_frame(s.rtmp, (*C.char)(unsafe.Pointer(&data[0])), C.uint(len(data))) != 0 { ret := C.write_frame(s.rtmp, (*C.char)(unsafe.Pointer(&data[0])), C.uint(len(data)))
return 0, errors.New("RTMP write error! Check rtmp log for details!") if ret != 0 {
return 0, Err(ret)
} }
return len(data), nil return len(data), nil
} }
@ -98,12 +101,30 @@ func (s *session) Write(data []byte) (int, error) {
// Close terminates the rtmp connection // Close terminates the rtmp connection
func (s *session) Close() error { func (s *session) Close() error {
if s.rtmp == nil { if s.rtmp == nil {
return errors.New("Tried to stop rtmp session, but not running!") return Err(3)
} }
ret := C.end_session(s.rtmp) ret := C.end_session(s.rtmp)
s.rtmp = nil s.rtmp = nil
if ret != 0 { if ret != 0 {
return errors.New("RTMP end session error! Check rtmp log for details!") return Err(ret)
} }
return nil return nil
} }
var rtmpErrs = [...]string{
1: "rtmp: not connected",
2: "rtmp: write error",
3: "rtmp: not started",
}
type Err uint
func (e Err) Error() string {
if 0 <= int(e) && int(e) < len(rtmpErrs) {
s := rtmpErrs[e]
if s != "" {
return s
}
}
return "rtmp: " + strconv.Itoa(int(e))
}