Merged in remove-stream-chan (pull request #68)

revid: remove stream chan

* revid,stream: factor chan operations out into a separate type

* revid: remove stream chan

* revid: reduce label length

Approved-by: Alan Noble <anoble@gmail.com>
This commit is contained in:
kortschak 2018-10-19 00:50:08 +00:00
parent 4f5a47b1e5
commit cec4f3803f
4 changed files with 115 additions and 111 deletions

View File

@ -53,7 +53,7 @@ const (
clipDuration = 1 * time.Second clipDuration = 1 * time.Second
mp2tPacketSize = 188 // MPEG-TS packet size mp2tPacketSize = 188 // MPEG-TS packet size
mp2tMaxPackets = int(clipDuration * 2016 / time.Second) // # first multiple of 7 and 8 greater than 2000 mp2tMaxPackets = int(clipDuration * 2016 / time.Second) // # first multiple of 7 and 8 greater than 2000
ringBufferSize = 500 ringBufferSize = 10000
ringBufferElementSize = 150000 ringBufferElementSize = 150000
writeTimeout = 10 * time.Millisecond writeTimeout = 10 * time.Millisecond
readTimeout = 10 * time.Millisecond readTimeout = 10 * time.Millisecond
@ -109,12 +109,13 @@ type Revid struct {
// with a context.Context cancellation. // with a context.Context cancellation.
cmd *exec.Cmd cmd *exec.Cmd
// lexTo and encoder handle transcoding the input stream. // lexTo, encoder and packer handle transcoding the input stream.
lexTo func(dst stream.Encoder, src io.Reader, delay time.Duration) error lexTo func(dst stream.Encoder, src io.Reader, delay time.Duration) error
encoder stream.Encoder encoder stream.Encoder
// ringBuffer handles passing frames from the transcoder packer packer
// buffer handles passing frames from the transcoder
// to the target destination. // to the target destination.
ringBuffer *ring.Buffer buffer *ring.Buffer
// destination is the target endpoint. // destination is the target endpoint.
destination loadSender destination loadSender
@ -125,16 +126,51 @@ type Revid struct {
isRunning bool isRunning bool
} }
// packer takes data segments and packs them into clips
// of the number frames specified in the owners config.
type packer struct {
owner *Revid
packetCount int
}
// Write implements the io.Writer interface.
//
// Unless the ring buffer returns an error, all writes
// are deemed to be successful, although a successful
// write may include a dropped frame.
func (p *packer) Write(frame []byte) (int, error) {
if len(frame) > ringBufferElementSize {
p.owner.config.Logger.Log(smartlogger.Warning, pkg+"frame was too big", "frame size", len(frame))
return len(frame), nil
}
n, err := p.owner.buffer.Write(frame)
if err != nil {
if err == ring.ErrDropped {
p.owner.config.Logger.Log(smartlogger.Warning, pkg+"dropped frame", "frame size", len(frame))
return len(frame), nil
}
p.owner.config.Logger.Log(smartlogger.Error, pkg+"unexpected ring buffer write error", "error", err.Error())
return n, err
}
p.packetCount++
if p.packetCount >= p.owner.config.FramesPerClip {
p.owner.buffer.Flush()
p.packetCount = 0
}
return len(frame), nil
}
// New returns a pointer to a new Revid with the desired configuration, and/or // New returns a pointer to a new Revid with the desired configuration, and/or
// an error if construction of the new instance was not successful. // an error if construction of the new instance was not successful.
func New(c Config, ns *netsender.Sender) (*Revid, error) { func New(c Config, ns *netsender.Sender) (*Revid, error) {
var r Revid r := Revid{ns: ns}
r.ns = ns r.buffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)
r.packer.owner = &r
err := r.reset(c) err := r.reset(c)
if err != nil { if err != nil {
return nil, err return nil, err
} }
r.ringBuffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)
return &r, nil return &r, nil
} }
@ -223,15 +259,18 @@ func (r *Revid) reset(config Config) error {
} }
} }
} }
r.encoder = stream.NopEncoder() r.encoder = stream.NopEncoder(&r.packer)
case Mpegts: case Mpegts:
r.config.Logger.Log(smartlogger.Info, pkg+"using MPEGTS packetisation") r.config.Logger.Log(smartlogger.Info, pkg+"using MPEGTS packetisation")
frameRate, _ := strconv.ParseFloat(r.config.FrameRate, 64) frameRate, _ := strconv.ParseFloat(r.config.FrameRate, 64)
r.encoder = mts.NewEncoder(frameRate) r.encoder = mts.NewEncoder(&r.packer, frameRate)
case Flv: case Flv:
r.config.Logger.Log(smartlogger.Info, pkg+"using FLV packetisation") r.config.Logger.Log(smartlogger.Info, pkg+"using FLV packetisation")
frameRate, _ := strconv.Atoi(r.config.FrameRate) frameRate, _ := strconv.Atoi(r.config.FrameRate)
r.encoder = flv.NewEncoder(true, true, frameRate) r.encoder, err = flv.NewEncoder(&r.packer, true, true, frameRate)
if err != nil {
return err
}
} }
return nil return nil
@ -254,8 +293,6 @@ func (r *Revid) Start() {
r.isRunning = true r.isRunning = true
r.config.Logger.Log(smartlogger.Info, pkg+"starting output routine") r.config.Logger.Log(smartlogger.Info, pkg+"starting output routine")
go r.outputClips() go r.outputClips()
r.config.Logger.Log(smartlogger.Info, pkg+"starting clip packing routine")
go r.packClips()
r.config.Logger.Log(smartlogger.Info, pkg+"setting up input and receiving content") r.config.Logger.Log(smartlogger.Info, pkg+"setting up input and receiving content")
go r.setupInput() go r.setupInput()
} }
@ -277,47 +314,6 @@ func (r *Revid) Stop() {
} }
} }
// TODO(kortschak): Factor this out to an io.Writer type and remove the Stream chans.
// Also add a no-op encoder that handles non-packeted data.
//
// packClips takes data segments; whether that be tsPackets or mjpeg frames and
// packs them into clips consisting of the amount frames specified in the config
func (r *Revid) packClips() {
clipSize := 0
packetCount := 0
for r.isRunning {
select {
// TODO: This is temporary, need to work out how to make this work
// for cases when there is not packetisation.
case frame := <-r.encoder.Stream():
lenOfFrame := len(frame)
if lenOfFrame > ringBufferElementSize {
r.config.Logger.Log(smartlogger.Warning, pkg+"frame was too big", "frame size", lenOfFrame)
continue
}
_, err := r.ringBuffer.Write(frame)
if err != nil {
if err == ring.ErrDropped {
r.config.Logger.Log(smartlogger.Warning, pkg+"dropped frame", "frame size", len(frame))
} else {
r.config.Logger.Log(smartlogger.Error, pkg+"unexpected ringbuffer write error",
"error", err.Error())
}
}
packetCount++
clipSize += lenOfFrame
if packetCount >= r.config.FramesPerClip {
r.ringBuffer.Flush()
clipSize = 0
packetCount = 0
continue
}
default:
time.Sleep(5 * time.Millisecond)
}
}
}
// outputClips takes the clips produced in the packClips method and outputs them // outputClips takes the clips produced in the packClips method and outputs them
// to the desired output defined in the revid config // to the desired output defined in the revid config
func (r *Revid) outputClips() { func (r *Revid) outputClips() {
@ -328,14 +324,14 @@ func (r *Revid) outputClips() {
for r.isRunning { for r.isRunning {
// Here we slow things down as much as we can to decrease cpu usage // Here we slow things down as much as we can to decrease cpu usage
switch { switch {
case r.ringBuffer.Len() < 2: case r.buffer.Len() < 2:
delay++ delay++
time.Sleep(time.Duration(delay) * time.Millisecond) time.Sleep(time.Duration(delay) * time.Millisecond)
case delay > 0: case delay > 0:
delay-- delay--
} }
// If the ringbuffer has something we can read and send off // If the ring buffer has something we can read and send off
chunk, err := r.ringBuffer.Next(readTimeout) chunk, err := r.buffer.Next(readTimeout)
if err != nil || !r.isRunning { if err != nil || !r.isRunning {
if err == io.EOF { if err == io.EOF {
break break
@ -388,7 +384,7 @@ func (r *Revid) outputClips() {
r.destination.release() r.destination.release()
r.config.Logger.Log(smartlogger.Debug, pkg+"done reading that clip from ringbuffer") r.config.Logger.Log(smartlogger.Debug, pkg+"done reading that clip from ring buffer")
// Log some information regarding bitrate and ring buffer size if it's time // Log some information regarding bitrate and ring buffer size if it's time
now = time.Now() now = time.Now()
@ -397,7 +393,7 @@ func (r *Revid) outputClips() {
// FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate. // FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate.
r.bitrate = int(float64(bytes*8) / float64(deltaTime/time.Second)) r.bitrate = int(float64(bytes*8) / float64(deltaTime/time.Second))
r.config.Logger.Log(smartlogger.Debug, pkg+"bitrate (bits/s)", "bitrate", r.bitrate) r.config.Logger.Log(smartlogger.Debug, pkg+"bitrate (bits/s)", "bitrate", r.bitrate)
r.config.Logger.Log(smartlogger.Debug, pkg+"ring buffer size", "value", r.ringBuffer.Len()) r.config.Logger.Log(smartlogger.Debug, pkg+"ring buffer size", "value", r.buffer.Len())
prevTime = now prevTime = now
bytes = 0 bytes = 0
} }

View File

@ -27,27 +27,22 @@ LICENSE
package stream package stream
import "io"
type Encoder interface { type Encoder interface {
Encode([]byte) error Encode([]byte) error
Stream() <-chan []byte
} }
// NopEncoder returns an // NopEncoder returns an
func NopEncoder() Encoder { func NopEncoder(dst io.Writer) Encoder {
return noop{make(chan []byte, 1)} return noop{dst}
} }
type noop struct { type noop struct {
dst chan []byte dst io.Writer
} }
func (e noop) Encode(p []byte) error { func (e noop) Encode(p []byte) error {
b := make([]byte, len(p)) _, err := e.dst.Write(p)
copy(b, p) return err
e.dst <- b
return nil
}
func (e noop) Stream() <-chan []byte {
return e.dst
} }

View File

@ -27,7 +27,10 @@ LICENSE
*/ */
package flv package flv
import "time" import (
"io"
"time"
)
const ( const (
inputChanLength = 500 inputChanLength = 500
@ -52,28 +55,30 @@ var (
// Encoder provides properties required for the generation of flv video // Encoder provides properties required for the generation of flv video
// from raw video data // from raw video data
type Encoder struct { type Encoder struct {
fps int dst io.Writer
stream chan []byte
audio bool fps int
video bool audio bool
lastTagSize int video bool
header Header lastTagSize int
startTime time.Time header Header
firstTag bool start time.Time
isGenerating bool
} }
// NewEncoder retuns a new FLV encoder. // NewEncoder retuns a new FLV encoder.
func NewEncoder(audio, video bool, fps int) *Encoder { func NewEncoder(dst io.Writer, audio, video bool, fps int) (*Encoder, error) {
e := Encoder{ e := Encoder{
fps: fps, dst: dst,
audio: audio, fps: fps,
video: video, audio: audio,
stream: make(chan []byte, outputChanLength), video: video,
firstTag: true,
} }
e.stream <- e.HeaderBytes() // TODO(kortschak): Do this lazily.
return &e _, err := e.dst.Write(e.HeaderBytes())
if err != nil {
return nil, err
}
return &e, nil
} }
// HeaderBytes returns the a // HeaderBytes returns the a
@ -85,19 +90,13 @@ func (e *Encoder) HeaderBytes() []byte {
return header.Bytes() return header.Bytes()
} }
// Stream returns a channel of streaming packets.
func (e *Encoder) Stream() <-chan []byte {
return e.stream
}
// getNextTimestamp generates and returns the next timestamp based on current time // getNextTimestamp generates and returns the next timestamp based on current time
func (e *Encoder) getNextTimestamp() (timestamp uint32) { func (e *Encoder) getNextTimestamp() (timestamp uint32) {
if e.firstTag { if e.start.IsZero() {
e.startTime = time.Now() e.start = time.Now()
e.firstTag = false
return 0 return 0
} }
return uint32(time.Now().Sub(e.startTime).Seconds() * float64(1000)) return uint32(time.Now().Sub(e.start).Seconds() * float64(1000))
} }
// http://www.itu.int/rec/dologin_pub.asp?lang=e&id=T-REC-H.264-200305-S!!PDF-E&type=items // http://www.itu.int/rec/dologin_pub.asp?lang=e&id=T-REC-H.264-200305-S!!PDF-E&type=items
@ -221,7 +220,10 @@ func (e *Encoder) Encode(frame []byte) error {
Data: frame, Data: frame,
PrevTagSize: uint32(videoHeaderSize + len(frame)), PrevTagSize: uint32(videoHeaderSize + len(frame)),
} }
e.stream <- tag.Bytes() _, err := e.dst.Write(tag.Bytes())
if err != nil {
return err
}
} }
// Do we even have some audio to send off ? // Do we even have some audio to send off ?
if e.audio { if e.audio {
@ -239,7 +241,10 @@ func (e *Encoder) Encode(frame []byte) error {
Data: dummyAudioTag1Data, Data: dummyAudioTag1Data,
PrevTagSize: uint32(audioSize), PrevTagSize: uint32(audioSize),
} }
e.stream <- tag.Bytes() _, err := e.dst.Write(tag.Bytes())
if err != nil {
return err
}
tag = AudioTag{ tag = AudioTag{
TagType: uint8(AudioTagType), TagType: uint8(AudioTagType),
@ -253,7 +258,10 @@ func (e *Encoder) Encode(frame []byte) error {
Data: dummyAudioTag2Data, Data: dummyAudioTag2Data,
PrevTagSize: uint32(22), PrevTagSize: uint32(22),
} }
e.stream <- tag.Bytes() _, err = e.dst.Write(tag.Bytes())
if err != nil {
return err
}
} }
return nil return nil

View File

@ -31,6 +31,7 @@ package mts
import ( import (
"encoding/binary" "encoding/binary"
"hash/crc32" "hash/crc32"
"io"
"math/bits" "math/bits"
"time" "time"
@ -156,7 +157,7 @@ const (
// Encoder encapsulates properties of an mpegts generator. // Encoder encapsulates properties of an mpegts generator.
type Encoder struct { type Encoder struct {
stream chan []byte dst io.Writer
clock time.Duration clock time.Duration
frameInterval time.Duration frameInterval time.Duration
@ -166,9 +167,9 @@ type Encoder struct {
} }
// NewEncoder returns an Encoder with the specified frame rate. // NewEncoder returns an Encoder with the specified frame rate.
func NewEncoder(fps float64) *Encoder { func NewEncoder(dst io.Writer, fps float64) *Encoder {
return &Encoder{ return &Encoder{
stream: make(chan []byte, 1), dst: dst,
frameInterval: time.Duration(float64(time.Second) / fps), frameInterval: time.Duration(float64(time.Second) / fps),
ptsOffset: ptsOffset, ptsOffset: ptsOffset,
@ -181,11 +182,6 @@ func NewEncoder(fps float64) *Encoder {
} }
} }
// Stream returns a channel of streaming packets.
func (e *Encoder) Stream() <-chan []byte {
return e.stream
}
const ( const (
hasPayload = 0x1 hasPayload = 0x1
hasAdaptationField = 0x2 hasAdaptationField = 0x2
@ -207,7 +203,10 @@ func (e *Encoder) Encode(nalu []byte) error {
AFC: hasPayload, AFC: hasPayload,
Payload: patTable, Payload: patTable,
} }
e.stream <- patPkt.Bytes() _, err := e.dst.Write(patPkt.Bytes())
if err != nil {
return err
}
// Write PMT. // Write PMT.
pmtPkt := Packet{ pmtPkt := Packet{
@ -217,7 +216,10 @@ func (e *Encoder) Encode(nalu []byte) error {
AFC: hasPayload, AFC: hasPayload,
Payload: pmtTable, Payload: pmtTable,
} }
e.stream <- pmtPkt.Bytes() _, err = e.dst.Write(pmtPkt.Bytes())
if err != nil {
return err
}
// Prepare PES data. // Prepare PES data.
pesPkt := pes.Packet{ pesPkt := pes.Packet{
@ -249,7 +251,10 @@ func (e *Encoder) Encode(nalu []byte) error {
pusi = false pusi = false
} }
e.stream <- pkt.Bytes() _, err := e.dst.Write(pkt.Bytes())
if err != nil {
return err
}
} }
e.tick() e.tick()