From aba6ab16456ddb1ca4ec81670d6acba712794ffa Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Thu, 4 Oct 2018 10:32:41 +0930 Subject: [PATCH] stream,revid: clean up data flow and docs --- revid/revid.go | 78 +++++++++++++++++++++------------------------- stream/encoding.go | 20 ++++++++++++ 2 files changed, 55 insertions(+), 43 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index bac1262a..309bc8fe 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -38,7 +38,6 @@ import ( "strings" "time" - "bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/av/stream" "bitbucket.org/ausocean/av/stream/flv" "bitbucket.org/ausocean/av/stream/lex" @@ -92,23 +91,38 @@ type Logger interface { // Revid provides methods to control a revid session; providing methods // to start, stop and change the state of an instance using the Config struct. type Revid struct { - ffmpegPath string - tempDir string - ringBuffer *ring.Buffer - config Config - isRunning bool - encoder stream.Encoder - lexTo func(dst stream.Encoder, src io.Reader, delay time.Duration) error - cmd *exec.Cmd - inputReader io.ReadCloser - ffmpegStdin io.WriteCloser - outputChan chan []byte - setupInput func() error - getFrame func() []byte + // config holds the Revid configuration. + // For historical reasons it also handles logging. + // FIXME(kortschak): The relationship of concerns + // in config/ns is weird. + config Config + // ns holds the netsender.Sender responsible for HTTP. + ns *netsender.Sender + + // setupInput holds the current approach to setting up + // the input stream. + setupInput func() error + + // cmd is the exec'd process that may be used to produce + // the input stream. + // FIXME(kortschak): This should not exist. Replace this + // with a context.Context cancellation. + cmd *exec.Cmd + + // lexTo and encoder handle transcoding the input stream. + lexTo func(dst stream.Encoder, src io.Reader, delay time.Duration) error + encoder stream.Encoder + // ringBuffer handles passing frames from the transcoder + // to the target destination. + ringBuffer *ring.Buffer + // destination is the target endpoint. destination loadSender - rtmpInst rtmp.Session - bitrate int - ns *netsender.Sender + + // bitrate hold the last send bitrate calculation result. + bitrate int + + // isRunning is a loaded and cocked foot-gun. + isRunning bool } // New returns a pointer to a new Revid with the desired configuration, and/or @@ -121,7 +135,6 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { return nil, err } r.ringBuffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout) - r.outputChan = make(chan []byte, outputChanSize) return &r, nil } @@ -198,9 +211,6 @@ func (r *Revid) reset(config Config) error { case None: // no packetisation - Revid output chan grabs raw data straight from parser r.lexTo = func(dst stream.Encoder, src io.Reader, _ time.Duration) error { - // FIXME(kortschak): Reduce this allocation mess. It exists - // because we do not know that the dst will not modify the - // buffer. It shouldn't, but ... for { var b [4 << 10]byte n, rerr := src.Read(b[:]) @@ -213,8 +223,7 @@ func (r *Revid) reset(config Config) error { } } } - r.getFrame = r.getFrameNoPacketization - return nil + r.encoder = stream.NopEncoder() case Mpegts: r.config.Logger.Log(smartlogger.Info, pkg+"using MPEGTS packetisation") frameRate, _ := strconv.ParseFloat(r.config.FrameRate, 64) @@ -224,9 +233,6 @@ func (r *Revid) reset(config Config) error { frameRate, _ := strconv.Atoi(r.config.FrameRate) r.encoder = flv.NewEncoder(true, true, frameRate) } - // We have packetization of some sort, so we want to send data to Generator - // to perform packetization - r.getFrame = r.getFramePacketization return nil } @@ -271,18 +277,6 @@ func (r *Revid) Stop() { } } -// getFrameNoPacketization gets a frame directly from the revid output chan -// as we don't need to go through the encoder with no packetization settings -func (r *Revid) getFrameNoPacketization() []byte { - return <-r.outputChan -} - -// getFramePacketization gets a frame from the generators output chan - the -// the encoder being an mpegts or flv encoder depending on the config -func (r *Revid) getFramePacketization() []byte { - return <-r.encoder.Stream() -} - // TODO(kortschak): Factor this out to an io.Writer type and remove the Stream chans. // Also add a no-op encoder that handles non-packeted data. // @@ -299,8 +293,7 @@ func (r *Revid) packClips() { lenOfFrame := len(frame) if lenOfFrame > ringBufferElementSize { r.config.Logger.Log(smartlogger.Warning, pkg+"frame was too big", "frame size", lenOfFrame) - frame = r.getFrame() - lenOfFrame = len(frame) + continue } _, err := r.ringBuffer.Write(frame) if err != nil { @@ -470,12 +463,11 @@ func (r *Revid) startRaspivid() error { if err != nil { return err } - r.inputReader = stdout r.config.Logger.Log(smartlogger.Info, pkg+"reading camera data") - err = r.lexTo(r.encoder, r.inputReader, delay) + err = r.lexTo(r.encoder, stdout, delay) if err != nil { - return err + return err } r.config.Logger.Log(smartlogger.Info, pkg+"not trying to read from camera anymore") return nil diff --git a/stream/encoding.go b/stream/encoding.go index cafcd914..8765472c 100644 --- a/stream/encoding.go +++ b/stream/encoding.go @@ -31,3 +31,23 @@ type Encoder interface { Encode([]byte) error Stream() <-chan []byte } + +// NopEncoder returns an +func NopEncoder() Encoder { + return noop{make(chan []byte, 1)} +} + +type noop struct { + dst chan []byte +} + +func (e noop) Encode(p []byte) error { + b := make([]byte, len(p)) + copy(b, p) + e.dst <- b + return nil +} + +func (e noop) Stream() <-chan []byte { + return e.dst +}