From 5bdd66e22b65cf017247e3f43ccdf30e8a351471 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sun, 17 Jun 2018 20:45:48 +0930 Subject: [PATCH 1/9] rtmp: rename StartSession to Open --- revid/senders.go | 4 ++-- rtmp/rtmp.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/revid/senders.go b/revid/senders.go index dd4b31ca..d37e0e4c 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -223,7 +223,7 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl, msg stri var err error for n := 0; n < retries; n++ { sess = rtmp.NewSession(url, timeout) - err = sess.StartSession() + err = sess.Open() if err == nil { break } @@ -269,7 +269,7 @@ func (s *rtmpSender) restart() error { } for n := 0; n < s.retries; n++ { s.sess = rtmp.NewSession(s.url, s.timeout) - err = s.sess.StartSession() + err = s.sess.Open() if err == nil { break } diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 0ddf5e48..d0c53564 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -48,7 +48,7 @@ import ( // Session provides an interface for sending flv tags over rtmp. type Session interface { - StartSession() error + Open() error Write([]byte) (int, error) Close() error } @@ -71,9 +71,9 @@ func NewSession(url string, connectTimeout uint) Session { } } -// StartSession establishes an rtmp connection with the url passed into the +// Open establishes an rtmp connection with the url passed into the // constructor -func (s *session) StartSession() error { +func (s *session) Open() error { if s.rtmp != nil { return errors.New("rtmp: attempt to start already running session") } From 6de4f8c9a62935575985c567d4401b8c365493a1 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sun, 17 Jun 2018 20:57:50 +0930 Subject: [PATCH 2/9] revid: rename GetBitrate to Bitrate Also change to int; when we get more than 2Gbs^-1, we'll probably be using 64 bit devices. --- cmd/revid-cli/main.go | 2 +- revid/revid.go | 46 +++++++++++++++++++++---------------------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 87e00faf..937453dc 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -306,7 +306,7 @@ func sendTo(ns *netsender.Sender) error { inputs := netsender.MakePins(ns.GetConfigParam("ip"), "X") for i, pin := range inputs { if pin.Name == "X23" { - inputs[i].Value = int(revidInst.GetBitrate()) + inputs[i].Value = revidInst.Bitrate() } } diff --git a/revid/revid.go b/revid/revid.go index b6428af2..d6d9e26d 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -83,23 +83,23 @@ const ( // Revid provides methods to control a revid session; providing methods // to start, stop and change the state of an instance using the Config struct. type Revid struct { - ffmpegPath string - tempDir string - ringBuffer *ring.Buffer - config Config - isRunning bool - inputFile *os.File - generator generator.Generator - parser parser.Parser - cmd *exec.Cmd - inputReader *bufio.Reader - ffmpegStdin io.WriteCloser - outputChan chan []byte - setupInput func() error - getFrame func() []byte - destination loadSender - rtmpInst rtmp.Session - currentBitrate int64 + ffmpegPath string + tempDir string + ringBuffer *ring.Buffer + config Config + isRunning bool + inputFile *os.File + generator generator.Generator + parser parser.Parser + cmd *exec.Cmd + inputReader *bufio.Reader + ffmpegStdin io.WriteCloser + outputChan chan []byte + setupInput func() error + getFrame func() []byte + destination loadSender + rtmpInst rtmp.Session + bitrate int } // NewRevid returns a pointer to a new Revid with the desired @@ -116,10 +116,9 @@ func New(c Config) (*Revid, error) { return &r, nil } -// Returns the currently saved bitrate from the most recent bitrate check -// check bitrate output delay in consts for this period -func (r *Revid) GetBitrate() int64 { - return r.currentBitrate +// Bitrate returns the result of the most recent bitrate check. +func (r *Revid) Bitrate() int { + return r.bitrate } // GetConfigRef returns a pointer to the revidInst's Config struct object @@ -417,8 +416,9 @@ func (r *Revid) outputClips() { now = time.Now() deltaTime := now.Sub(prevTime) if deltaTime > bitrateTime { - r.currentBitrate = int64(float64(bytes*8) / float64(deltaTime/1e9)) - r.Log(Debug, fmt.Sprintf("Bitrate: %v bits/s\n", r.currentBitrate)) + // FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate. + r.bitrate = int(float64(bytes*8) / float64(deltaTime/time.Second)) + r.Log(Debug, fmt.Sprintf("Bitrate: %v bits/s\n", r.bitrate)) r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.Len())) prevTime = now bytes = 0 From ed923ed25642a3f2e91ee32e9051c5c2912db4b1 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sun, 17 Jun 2018 21:08:27 +0930 Subject: [PATCH 3/9] revid: rename GetConfigRef to Config --- revid/revid.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index d6d9e26d..04ffd9ad 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -121,8 +121,13 @@ func (r *Revid) Bitrate() int { return r.bitrate } -// GetConfigRef returns a pointer to the revidInst's Config struct object -func (r *Revid) GetConfigRef() *Config { +// Config returns the Revid's config. +func (r *Revid) Config() *Config { + // FIXME(kortschak): This is a massive footgun and should not exist. + // Since the config's fields are accessed in running goroutines, any + // mutation is a data race. With bad luck a data race is possible by + // reading the returned value since it is possible for the running + // Ravid to mutate the config it holds. return &r.config } From a7d3edbe57719e80a819e161d93b1cd6d3fde502 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sun, 17 Jun 2018 21:10:27 +0930 Subject: [PATCH 4/9] revid: rename ChangeConfig to SetConfig --- revid/revid.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 04ffd9ad..21ead9a2 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -208,8 +208,8 @@ func (r *Revid) reset(config Config) error { return nil } -// ChangeConfig changes the current configuration of the Revid instance. -func (r *Revid) ChangeConfig(c Config) error { +// SetConfig changes the current configuration of the receiver. +func (r *Revid) SetConfig(c Config) error { // FIXME(kortschak): This is reimplemented in cmd/revid-cli/main.go. // The implementation in the command is used and this is not. // Decide on one or the other. From b66abcbfecae34580dd04aa1a83891f66c16318a Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sun, 17 Jun 2018 21:11:33 +0930 Subject: [PATCH 5/9] revid: improve doc comment --- revid/revid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index 21ead9a2..4332e987 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -237,7 +237,7 @@ func (r *Revid) Log(logType, m string) { fmt.Println(logType + ": " + m) } -// IsRunning returns true if the revid is currently running and false otherwise +// IsRunning returns whether the receiver is running. func (r *Revid) IsRunning() bool { return r.isRunning } From ca5eefa4c368d199bdac61df54eff7ea9eaa76ba Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sun, 17 Jun 2018 21:27:38 +0930 Subject: [PATCH 6/9] generator,parser: remove Get prefix from read accessors Also make user-facing chan exposure a little safer. --- generator/flv_generator.go | 8 ++++---- generator/generator.go | 4 ++-- generator/mpegts_generator.go | 8 ++++---- parser/h264.go | 14 +++++++------- parser/mjpeg.go | 10 +++++----- parser/parser.go | 4 ++-- revid/revid.go | 10 +++++----- 7 files changed, 29 insertions(+), 29 deletions(-) diff --git a/generator/flv_generator.go b/generator/flv_generator.go index 7c6d6ddd..375e2154 100644 --- a/generator/flv_generator.go +++ b/generator/flv_generator.go @@ -62,15 +62,15 @@ type flvGenerator struct { isGenerating bool } -// GetInputChan returns the input channel to the generator. This is where the +// InputChan returns the input channel to the generator. This is where the // raw data frames are entered into the generator -func (g *flvGenerator) GetInputChan() chan []byte { +func (g *flvGenerator) InputChan() chan []byte { return g.inputChan } -// GetOutputChan retuns the output chan of the generator - this is where the +// OutputChan retuns the output chan of the generator - this is where the // flv packets (more specifically tags) are outputted. -func (g *flvGenerator) GetOutputChan() chan []byte { +func (g *flvGenerator) OutputChan() <-chan []byte { return g.outputChan } diff --git a/generator/generator.go b/generator/generator.go index 5e04fb3d..50bace43 100644 --- a/generator/generator.go +++ b/generator/generator.go @@ -28,8 +28,8 @@ LICENSE package generator type Generator interface { - GetInputChan() chan []byte - GetOutputChan() chan []byte + InputChan() chan []byte + OutputChan() <-chan []byte Start() Stop() } diff --git a/generator/mpegts_generator.go b/generator/mpegts_generator.go index 2cc4792e..4b9d7e0b 100644 --- a/generator/mpegts_generator.go +++ b/generator/mpegts_generator.go @@ -86,15 +86,15 @@ type tsGenerator struct { isGenerating bool } -// getInputChan returns a handle to the nalInputChan (inputChan) so that nal units +// InputChan returns a handle to the nalInputChan (inputChan) so that nal units // can be passed to the generator and processed -func (g *tsGenerator) GetInputChan() chan []byte { +func (g *tsGenerator) InputChan() chan []byte { return g.nalInputChan } -// GetOutputChan returns a handle to the generator output chan where the mpegts +// OutputChan returns a handle to the generator output chan where the mpegts // packets will show up once ready to go -func (g *tsGenerator) GetOutputChan() chan []byte { +func (g *tsGenerator) OutputChan() <-chan []byte { return g.outputChan } diff --git a/parser/h264.go b/parser/h264.go index 0ad65c6a..922066b3 100644 --- a/parser/h264.go +++ b/parser/h264.go @@ -77,22 +77,22 @@ func (p *h264Parser) SetDelay(delay uint) { p.delay = delay } -// GetInputChan returns a handle to the input channel of the parser -func (p *h264Parser) GetInputChan() chan byte { +// InputChan returns a handle to the input channel of the parser +func (p *h264Parser) InputChan() chan byte { return p.inputChan } -// GetOutputChan returns a handle to the output chan of the parser -func (p *h264Parser) GetOutputChan() chan []byte { +// OutputChan returns a handle to the output chan of the parser +func (p *h264Parser) OutputChan() <-chan []byte { return p.userOutputChanRef } // SetOutputChan sets the parser output chan to the passed output chan. This is // useful if we want the parser output to go directly to a generator of some sort // for packetization. -func (p *h264Parser) SetOutputChan(aChan chan []byte) { - p.parserOutputChanRef = aChan - p.userOutputChanRef = aChan +func (p *h264Parser) SetOutputChan(o chan []byte) { + p.parserOutputChanRef = o + p.userOutputChanRef = o } // parse interprets an incoming h264 stream and extracts individual frames diff --git a/parser/mjpeg.go b/parser/mjpeg.go index 2c9a8736..70901658 100644 --- a/parser/mjpeg.go +++ b/parser/mjpeg.go @@ -57,17 +57,17 @@ func (p *mjpegParser) SetDelay(delay uint) { p.delay = delay } -func (p *mjpegParser) GetInputChan() chan byte { +func (p *mjpegParser) InputChan() chan byte { return p.inputChan } -func (p *mjpegParser) GetOutputChan() chan []byte { +func (p *mjpegParser) OutputChan() <-chan []byte { return p.userOutputChanRef } -func (p *mjpegParser) SetOutputChan(aChan chan []byte) { - p.parserOutputChanRef = aChan - p.userOutputChanRef = aChan +func (p *mjpegParser) SetOutputChan(o chan []byte) { + p.parserOutputChanRef = o + p.userOutputChanRef = o } func (p *mjpegParser) parse() { diff --git a/parser/parser.go b/parser/parser.go index 59ac5448..157b27c1 100644 --- a/parser/parser.go +++ b/parser/parser.go @@ -43,8 +43,8 @@ var ( type Parser interface { Stop() Start() - GetInputChan() chan byte - GetOutputChan() chan []byte + InputChan() chan byte + OutputChan() <-chan []byte SetOutputChan(achan chan []byte) SetDelay(delay uint) } diff --git a/revid/revid.go b/revid/revid.go index 4332e987..235b96eb 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -203,7 +203,7 @@ func (r *Revid) reset(config Config) error { // We have packetization of some sort, so we want to send data to Generator // to perform packetization r.getFrame = r.getFramePacketization - r.parser.SetOutputChan(r.generator.GetInputChan()) + r.parser.SetOutputChan(r.generator.InputChan()) return nil } @@ -300,7 +300,7 @@ func (r *Revid) getFrameNoPacketization() []byte { // getFramePacketization gets a frame from the generators output chan - the // the generator being an mpegts or flv generator depending on the config func (r *Revid) getFramePacketization() []byte { - return <-r.generator.GetOutputChan() + return <-r.generator.OutputChan() } // packClips takes data segments; whether that be tsPackets or mjpeg frames and @@ -312,7 +312,7 @@ func (r *Revid) packClips() { select { // TODO: This is temporary, need to work out how to make this work // for cases when there is not packetisation. - case frame := <-r.generator.GetOutputChan(): + case frame := <-r.generator.OutputChan(): lenOfFrame := len(frame) if lenOfFrame > ringBufferElementSize { r.Log(Warning, fmt.Sprintf("Frame was too big: %v bytes, getting another one!", lenOfFrame)) @@ -518,7 +518,7 @@ func (r *Revid) readCamera() { r.Log(Error, "No data from camera!") time.Sleep(cameraRetryPeriod) default: - r.parser.GetInputChan() <- data[0] + r.parser.InputChan() <- data[0] } } r.Log(Info, "Not trying to read from camera anymore!") @@ -547,7 +547,7 @@ func (r *Revid) readFile() error { return err } for i := range data { - r.parser.GetInputChan() <- data[i] + r.parser.InputChan() <- data[i] } r.inputFile.Close() return nil From 0b0f12431bcceceb3e9ef86ec84ab17774ab5415 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sun, 17 Jun 2018 21:29:20 +0930 Subject: [PATCH 7/9] parser: expose H264 documentation --- parser/h264.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/parser/h264.go b/parser/h264.go index 922066b3..630c6ee2 100644 --- a/parser/h264.go +++ b/parser/h264.go @@ -37,9 +37,9 @@ const ( outputBufferSize = 10000 ) -// h264Parser provides properties and methods to allow for the parsing of a +// H264 provides properties and methods to allow for the parsing of a // h264 stream - i.e. to allow extraction of the individual access units -type h264Parser struct { +type H264 struct { inputBuffer []byte isParsing bool parserOutputChanRef chan []byte @@ -48,9 +48,9 @@ type h264Parser struct { delay uint } -// NewH264Parser returns an instance of the h264Parser struct -func NewH264Parser() (p *h264Parser) { - p = new(h264Parser) +// NewH264Parser returns an instance of the H264 struct +func NewH264Parser() (p *H264) { + p = new(H264) p.isParsing = true p.inputChan = make(chan byte, inputChanSize) p.delay = 0 @@ -60,12 +60,12 @@ func NewH264Parser() (p *h264Parser) { // Stop simply sets the isParsing flag to false to indicate to the parser that // we don't want to interpret incoming data anymore - this will also make the // parser jump out of the parse func -func (p *h264Parser) Stop() { +func (p *H264) Stop() { p.isParsing = false } // Start starts the parse func as a goroutine so that incoming data is interpreted -func (p *h264Parser) Start() { +func (p *H264) Start() { p.isParsing = true go p.parse() } @@ -73,31 +73,31 @@ func (p *h264Parser) Start() { // SetDelay sets a delay inbetween each buffer output. Useful if we're parsing // a file but want to replicate the speed of incoming video frames from a // camera -func (p *h264Parser) SetDelay(delay uint) { +func (p *H264) SetDelay(delay uint) { p.delay = delay } // InputChan returns a handle to the input channel of the parser -func (p *h264Parser) InputChan() chan byte { +func (p *H264) InputChan() chan byte { return p.inputChan } // OutputChan returns a handle to the output chan of the parser -func (p *h264Parser) OutputChan() <-chan []byte { +func (p *H264) OutputChan() <-chan []byte { return p.userOutputChanRef } // SetOutputChan sets the parser output chan to the passed output chan. This is // useful if we want the parser output to go directly to a generator of some sort // for packetization. -func (p *h264Parser) SetOutputChan(o chan []byte) { +func (p *H264) SetOutputChan(o chan []byte) { p.parserOutputChanRef = o p.userOutputChanRef = o } // parse interprets an incoming h264 stream and extracts individual frames // aka access units -func (p *h264Parser) parse() { +func (p *H264) parse() { outputBuffer := make([]byte, 0, outputBufferSize) searchingForEnd := false for p.isParsing { From 77cb074ede14c8293564e015cabf03aaa9d2e835 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sun, 17 Jun 2018 21:30:00 +0930 Subject: [PATCH 8/9] parser: expose MJPEG documentation --- parser/mjpeg.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/parser/mjpeg.go b/parser/mjpeg.go index 70901658..4271601a 100644 --- a/parser/mjpeg.go +++ b/parser/mjpeg.go @@ -29,7 +29,7 @@ package parser const frameStartCode = 0xD8 -type mjpegParser struct { +type MJPEG struct { inputBuffer []byte isParsing bool parserOutputChanRef chan []byte @@ -38,39 +38,39 @@ type mjpegParser struct { delay uint } -func NewMJPEGParser(inputChanLen int) (p *mjpegParser) { - p = new(mjpegParser) +func NewMJPEGParser(inputChanLen int) (p *MJPEG) { + p = new(MJPEG) p.isParsing = true p.inputChan = make(chan byte, inputChanLen) return } -func (p *mjpegParser) Stop() { +func (p *MJPEG) Stop() { p.isParsing = false } -func (p *mjpegParser) Start() { +func (p *MJPEG) Start() { go p.parse() } -func (p *mjpegParser) SetDelay(delay uint) { +func (p *MJPEG) SetDelay(delay uint) { p.delay = delay } -func (p *mjpegParser) InputChan() chan byte { +func (p *MJPEG) InputChan() chan byte { return p.inputChan } -func (p *mjpegParser) OutputChan() <-chan []byte { +func (p *MJPEG) OutputChan() <-chan []byte { return p.userOutputChanRef } -func (p *mjpegParser) SetOutputChan(o chan []byte) { +func (p *MJPEG) SetOutputChan(o chan []byte) { p.parserOutputChanRef = o p.userOutputChanRef = o } -func (p *mjpegParser) parse() { +func (p *MJPEG) parse() { var outputBuffer []byte for p.isParsing { aByte := <-p.inputChan From cb4dea71d676d33e8070cc68d233473b3390dd9d Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sun, 17 Jun 2018 21:38:37 +0930 Subject: [PATCH 9/9] cmd/revid-cli: match change to netsender --- cmd/revid-cli/main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 937453dc..09b7691b 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -241,7 +241,7 @@ func main() { var vs int if *useNetsender { // initialize NetSender and use NetSender's logger - config.Logger = netsender.GetLogger() + config.Logger = netsender.Logger() var err error err = ns.Init(nil, nil, nil) @@ -267,7 +267,7 @@ func main() { continue } - if vs != ns.GetVarSum() { + if vs != ns.VarSum() { // vars changed vars, err := ns.Vars() if err != nil { @@ -275,7 +275,7 @@ func main() { time.Sleep(netSendRetryTime) continue } - vs = ns.GetVarSum() + vs = ns.VarSum() if vars["mode"] == "Paused" { if !paused { config.Logger.Log(progName, "Info", "Pausing revid")