Merged in gardening/clarity (pull request #67)

stream,revid: clean up data flow and docs

Approved-by: Saxon Milton <saxon.milton@gmail.com>
This commit is contained in:
kortschak 2018-10-04 02:04:04 +00:00
commit 8b07457f52
2 changed files with 55 additions and 43 deletions

View File

@ -38,7 +38,6 @@ import (
"strings" "strings"
"time" "time"
"bitbucket.org/ausocean/av/rtmp"
"bitbucket.org/ausocean/av/stream" "bitbucket.org/ausocean/av/stream"
"bitbucket.org/ausocean/av/stream/flv" "bitbucket.org/ausocean/av/stream/flv"
"bitbucket.org/ausocean/av/stream/lex" "bitbucket.org/ausocean/av/stream/lex"
@ -92,23 +91,38 @@ type Logger interface {
// Revid provides methods to control a revid session; providing methods // Revid provides methods to control a revid session; providing methods
// to start, stop and change the state of an instance using the Config struct. // to start, stop and change the state of an instance using the Config struct.
type Revid struct { type Revid struct {
ffmpegPath string // config holds the Revid configuration.
tempDir string // For historical reasons it also handles logging.
ringBuffer *ring.Buffer // FIXME(kortschak): The relationship of concerns
config Config // in config/ns is weird.
isRunning bool config Config
encoder stream.Encoder // ns holds the netsender.Sender responsible for HTTP.
lexTo func(dst stream.Encoder, src io.Reader, delay time.Duration) error ns *netsender.Sender
cmd *exec.Cmd
inputReader io.ReadCloser // setupInput holds the current approach to setting up
ffmpegStdin io.WriteCloser // the input stream.
outputChan chan []byte setupInput func() error
setupInput func() error
getFrame func() []byte // cmd is the exec'd process that may be used to produce
// the input stream.
// FIXME(kortschak): This should not exist. Replace this
// with a context.Context cancellation.
cmd *exec.Cmd
// lexTo and encoder handle transcoding the input stream.
lexTo func(dst stream.Encoder, src io.Reader, delay time.Duration) error
encoder stream.Encoder
// ringBuffer handles passing frames from the transcoder
// to the target destination.
ringBuffer *ring.Buffer
// destination is the target endpoint.
destination loadSender destination loadSender
rtmpInst rtmp.Session
bitrate int // bitrate hold the last send bitrate calculation result.
ns *netsender.Sender bitrate int
// isRunning is a loaded and cocked foot-gun.
isRunning bool
} }
// New returns a pointer to a new Revid with the desired configuration, and/or // New returns a pointer to a new Revid with the desired configuration, and/or
@ -121,7 +135,6 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) {
return nil, err return nil, err
} }
r.ringBuffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout) r.ringBuffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)
r.outputChan = make(chan []byte, outputChanSize)
return &r, nil return &r, nil
} }
@ -198,9 +211,6 @@ func (r *Revid) reset(config Config) error {
case None: case None:
// no packetisation - Revid output chan grabs raw data straight from parser // no packetisation - Revid output chan grabs raw data straight from parser
r.lexTo = func(dst stream.Encoder, src io.Reader, _ time.Duration) error { r.lexTo = func(dst stream.Encoder, src io.Reader, _ time.Duration) error {
// FIXME(kortschak): Reduce this allocation mess. It exists
// because we do not know that the dst will not modify the
// buffer. It shouldn't, but ...
for { for {
var b [4 << 10]byte var b [4 << 10]byte
n, rerr := src.Read(b[:]) n, rerr := src.Read(b[:])
@ -213,8 +223,7 @@ func (r *Revid) reset(config Config) error {
} }
} }
} }
r.getFrame = r.getFrameNoPacketization r.encoder = stream.NopEncoder()
return nil
case Mpegts: case Mpegts:
r.config.Logger.Log(smartlogger.Info, pkg+"using MPEGTS packetisation") r.config.Logger.Log(smartlogger.Info, pkg+"using MPEGTS packetisation")
frameRate, _ := strconv.ParseFloat(r.config.FrameRate, 64) frameRate, _ := strconv.ParseFloat(r.config.FrameRate, 64)
@ -224,9 +233,6 @@ func (r *Revid) reset(config Config) error {
frameRate, _ := strconv.Atoi(r.config.FrameRate) frameRate, _ := strconv.Atoi(r.config.FrameRate)
r.encoder = flv.NewEncoder(true, true, frameRate) r.encoder = flv.NewEncoder(true, true, frameRate)
} }
// We have packetization of some sort, so we want to send data to Generator
// to perform packetization
r.getFrame = r.getFramePacketization
return nil return nil
} }
@ -271,18 +277,6 @@ func (r *Revid) Stop() {
} }
} }
// getFrameNoPacketization gets a frame directly from the revid output chan
// as we don't need to go through the encoder with no packetization settings
func (r *Revid) getFrameNoPacketization() []byte {
return <-r.outputChan
}
// getFramePacketization gets a frame from the generators output chan - the
// the encoder being an mpegts or flv encoder depending on the config
func (r *Revid) getFramePacketization() []byte {
return <-r.encoder.Stream()
}
// TODO(kortschak): Factor this out to an io.Writer type and remove the Stream chans. // TODO(kortschak): Factor this out to an io.Writer type and remove the Stream chans.
// Also add a no-op encoder that handles non-packeted data. // Also add a no-op encoder that handles non-packeted data.
// //
@ -299,8 +293,7 @@ func (r *Revid) packClips() {
lenOfFrame := len(frame) lenOfFrame := len(frame)
if lenOfFrame > ringBufferElementSize { if lenOfFrame > ringBufferElementSize {
r.config.Logger.Log(smartlogger.Warning, pkg+"frame was too big", "frame size", lenOfFrame) r.config.Logger.Log(smartlogger.Warning, pkg+"frame was too big", "frame size", lenOfFrame)
frame = r.getFrame() continue
lenOfFrame = len(frame)
} }
_, err := r.ringBuffer.Write(frame) _, err := r.ringBuffer.Write(frame)
if err != nil { if err != nil {
@ -470,10 +463,9 @@ func (r *Revid) startRaspivid() error {
if err != nil { if err != nil {
return err return err
} }
r.inputReader = stdout
r.config.Logger.Log(smartlogger.Info, pkg+"reading camera data") r.config.Logger.Log(smartlogger.Info, pkg+"reading camera data")
err = r.lexTo(r.encoder, r.inputReader, delay) err = r.lexTo(r.encoder, stdout, delay)
if err != nil { if err != nil {
return err return err
} }

View File

@ -31,3 +31,23 @@ type Encoder interface {
Encode([]byte) error Encode([]byte) error
Stream() <-chan []byte Stream() <-chan []byte
} }
// NopEncoder returns an
func NopEncoder() Encoder {
return noop{make(chan []byte, 1)}
}
type noop struct {
dst chan []byte
}
func (e noop) Encode(p []byte) error {
b := make([]byte, len(p))
copy(b, p)
e.dst <- b
return nil
}
func (e noop) Stream() <-chan []byte {
return e.dst
}