diff --git a/revid/revid.go b/revid/revid.go index d78673d7..1b150f62 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -53,7 +53,7 @@ const ( clipDuration = 1 * time.Second mp2tPacketSize = 188 // MPEG-TS packet size mp2tMaxPackets = int(clipDuration * 2016 / time.Second) // # first multiple of 7 and 8 greater than 2000 - ringBufferSize = 500 + ringBufferSize = 10000 ringBufferElementSize = 150000 writeTimeout = 10 * time.Millisecond readTimeout = 10 * time.Millisecond @@ -109,12 +109,13 @@ type Revid struct { // with a context.Context cancellation. cmd *exec.Cmd - // lexTo and encoder handle transcoding the input stream. + // lexTo, encoder and packer handle transcoding the input stream. lexTo func(dst stream.Encoder, src io.Reader, delay time.Duration) error encoder stream.Encoder - // ringBuffer handles passing frames from the transcoder + packer packer + // buffer handles passing frames from the transcoder // to the target destination. - ringBuffer *ring.Buffer + buffer *ring.Buffer // destination is the target endpoint. destination loadSender @@ -125,16 +126,51 @@ type Revid struct { isRunning bool } +// packer takes data segments and packs them into clips +// of the number frames specified in the owners config. +type packer struct { + owner *Revid + + packetCount int +} + +// Write implements the io.Writer interface. +// +// Unless the ring buffer returns an error, all writes +// are deemed to be successful, although a successful +// write may include a dropped frame. +func (p *packer) Write(frame []byte) (int, error) { + if len(frame) > ringBufferElementSize { + p.owner.config.Logger.Log(smartlogger.Warning, pkg+"frame was too big", "frame size", len(frame)) + return len(frame), nil + } + n, err := p.owner.buffer.Write(frame) + if err != nil { + if err == ring.ErrDropped { + p.owner.config.Logger.Log(smartlogger.Warning, pkg+"dropped frame", "frame size", len(frame)) + return len(frame), nil + } + p.owner.config.Logger.Log(smartlogger.Error, pkg+"unexpected ring buffer write error", "error", err.Error()) + return n, err + } + p.packetCount++ + if p.packetCount >= p.owner.config.FramesPerClip { + p.owner.buffer.Flush() + p.packetCount = 0 + } + return len(frame), nil +} + // New returns a pointer to a new Revid with the desired configuration, and/or // an error if construction of the new instance was not successful. func New(c Config, ns *netsender.Sender) (*Revid, error) { - var r Revid - r.ns = ns + r := Revid{ns: ns} + r.buffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout) + r.packer.owner = &r err := r.reset(c) if err != nil { return nil, err } - r.ringBuffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout) return &r, nil } @@ -223,15 +259,18 @@ func (r *Revid) reset(config Config) error { } } } - r.encoder = stream.NopEncoder() + r.encoder = stream.NopEncoder(&r.packer) case Mpegts: r.config.Logger.Log(smartlogger.Info, pkg+"using MPEGTS packetisation") frameRate, _ := strconv.ParseFloat(r.config.FrameRate, 64) - r.encoder = mts.NewEncoder(frameRate) + r.encoder = mts.NewEncoder(&r.packer, frameRate) case Flv: r.config.Logger.Log(smartlogger.Info, pkg+"using FLV packetisation") frameRate, _ := strconv.Atoi(r.config.FrameRate) - r.encoder = flv.NewEncoder(true, true, frameRate) + r.encoder, err = flv.NewEncoder(&r.packer, true, true, frameRate) + if err != nil { + return err + } } return nil @@ -254,8 +293,6 @@ func (r *Revid) Start() { r.isRunning = true r.config.Logger.Log(smartlogger.Info, pkg+"starting output routine") go r.outputClips() - r.config.Logger.Log(smartlogger.Info, pkg+"starting clip packing routine") - go r.packClips() r.config.Logger.Log(smartlogger.Info, pkg+"setting up input and receiving content") go r.setupInput() } @@ -277,47 +314,6 @@ func (r *Revid) Stop() { } } -// TODO(kortschak): Factor this out to an io.Writer type and remove the Stream chans. -// Also add a no-op encoder that handles non-packeted data. -// -// packClips takes data segments; whether that be tsPackets or mjpeg frames and -// packs them into clips consisting of the amount frames specified in the config -func (r *Revid) packClips() { - clipSize := 0 - packetCount := 0 - for r.isRunning { - select { - // TODO: This is temporary, need to work out how to make this work - // for cases when there is not packetisation. - case frame := <-r.encoder.Stream(): - lenOfFrame := len(frame) - if lenOfFrame > ringBufferElementSize { - r.config.Logger.Log(smartlogger.Warning, pkg+"frame was too big", "frame size", lenOfFrame) - continue - } - _, err := r.ringBuffer.Write(frame) - if err != nil { - if err == ring.ErrDropped { - r.config.Logger.Log(smartlogger.Warning, pkg+"dropped frame", "frame size", len(frame)) - } else { - r.config.Logger.Log(smartlogger.Error, pkg+"unexpected ringbuffer write error", - "error", err.Error()) - } - } - packetCount++ - clipSize += lenOfFrame - if packetCount >= r.config.FramesPerClip { - r.ringBuffer.Flush() - clipSize = 0 - packetCount = 0 - continue - } - default: - time.Sleep(5 * time.Millisecond) - } - } -} - // outputClips takes the clips produced in the packClips method and outputs them // to the desired output defined in the revid config func (r *Revid) outputClips() { @@ -328,14 +324,14 @@ func (r *Revid) outputClips() { for r.isRunning { // Here we slow things down as much as we can to decrease cpu usage switch { - case r.ringBuffer.Len() < 2: + case r.buffer.Len() < 2: delay++ time.Sleep(time.Duration(delay) * time.Millisecond) case delay > 0: delay-- } - // If the ringbuffer has something we can read and send off - chunk, err := r.ringBuffer.Next(readTimeout) + // If the ring buffer has something we can read and send off + chunk, err := r.buffer.Next(readTimeout) if err != nil || !r.isRunning { if err == io.EOF { break @@ -388,7 +384,7 @@ func (r *Revid) outputClips() { r.destination.release() - r.config.Logger.Log(smartlogger.Debug, pkg+"done reading that clip from ringbuffer") + r.config.Logger.Log(smartlogger.Debug, pkg+"done reading that clip from ring buffer") // Log some information regarding bitrate and ring buffer size if it's time now = time.Now() @@ -397,7 +393,7 @@ func (r *Revid) outputClips() { // FIXME(kortschak): For subsecond deltaTime, this will give infinite bitrate. r.bitrate = int(float64(bytes*8) / float64(deltaTime/time.Second)) r.config.Logger.Log(smartlogger.Debug, pkg+"bitrate (bits/s)", "bitrate", r.bitrate) - r.config.Logger.Log(smartlogger.Debug, pkg+"ring buffer size", "value", r.ringBuffer.Len()) + r.config.Logger.Log(smartlogger.Debug, pkg+"ring buffer size", "value", r.buffer.Len()) prevTime = now bytes = 0 } diff --git a/stream/encoding.go b/stream/encoding.go index 8765472c..26f0e19b 100644 --- a/stream/encoding.go +++ b/stream/encoding.go @@ -27,27 +27,22 @@ LICENSE package stream +import "io" + type Encoder interface { Encode([]byte) error - Stream() <-chan []byte } // NopEncoder returns an -func NopEncoder() Encoder { - return noop{make(chan []byte, 1)} +func NopEncoder(dst io.Writer) Encoder { + return noop{dst} } type noop struct { - dst chan []byte + dst io.Writer } func (e noop) Encode(p []byte) error { - b := make([]byte, len(p)) - copy(b, p) - e.dst <- b - return nil -} - -func (e noop) Stream() <-chan []byte { - return e.dst + _, err := e.dst.Write(p) + return err } diff --git a/stream/flv/encoder.go b/stream/flv/encoder.go index 3a545677..e15552df 100644 --- a/stream/flv/encoder.go +++ b/stream/flv/encoder.go @@ -27,7 +27,10 @@ LICENSE */ package flv -import "time" +import ( + "io" + "time" +) const ( inputChanLength = 500 @@ -52,28 +55,30 @@ var ( // Encoder provides properties required for the generation of flv video // from raw video data type Encoder struct { - fps int - stream chan []byte - audio bool - video bool - lastTagSize int - header Header - startTime time.Time - firstTag bool - isGenerating bool + dst io.Writer + + fps int + audio bool + video bool + lastTagSize int + header Header + start time.Time } // NewEncoder retuns a new FLV encoder. -func NewEncoder(audio, video bool, fps int) *Encoder { +func NewEncoder(dst io.Writer, audio, video bool, fps int) (*Encoder, error) { e := Encoder{ - fps: fps, - audio: audio, - video: video, - stream: make(chan []byte, outputChanLength), - firstTag: true, + dst: dst, + fps: fps, + audio: audio, + video: video, } - e.stream <- e.HeaderBytes() - return &e + // TODO(kortschak): Do this lazily. + _, err := e.dst.Write(e.HeaderBytes()) + if err != nil { + return nil, err + } + return &e, nil } // HeaderBytes returns the a @@ -85,19 +90,13 @@ func (e *Encoder) HeaderBytes() []byte { return header.Bytes() } -// Stream returns a channel of streaming packets. -func (e *Encoder) Stream() <-chan []byte { - return e.stream -} - // getNextTimestamp generates and returns the next timestamp based on current time func (e *Encoder) getNextTimestamp() (timestamp uint32) { - if e.firstTag { - e.startTime = time.Now() - e.firstTag = false + if e.start.IsZero() { + e.start = time.Now() return 0 } - return uint32(time.Now().Sub(e.startTime).Seconds() * float64(1000)) + return uint32(time.Now().Sub(e.start).Seconds() * float64(1000)) } // http://www.itu.int/rec/dologin_pub.asp?lang=e&id=T-REC-H.264-200305-S!!PDF-E&type=items @@ -221,7 +220,10 @@ func (e *Encoder) Encode(frame []byte) error { Data: frame, PrevTagSize: uint32(videoHeaderSize + len(frame)), } - e.stream <- tag.Bytes() + _, err := e.dst.Write(tag.Bytes()) + if err != nil { + return err + } } // Do we even have some audio to send off ? if e.audio { @@ -239,7 +241,10 @@ func (e *Encoder) Encode(frame []byte) error { Data: dummyAudioTag1Data, PrevTagSize: uint32(audioSize), } - e.stream <- tag.Bytes() + _, err := e.dst.Write(tag.Bytes()) + if err != nil { + return err + } tag = AudioTag{ TagType: uint8(AudioTagType), @@ -253,7 +258,10 @@ func (e *Encoder) Encode(frame []byte) error { Data: dummyAudioTag2Data, PrevTagSize: uint32(22), } - e.stream <- tag.Bytes() + _, err = e.dst.Write(tag.Bytes()) + if err != nil { + return err + } } return nil diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 05720139..3100c96e 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -31,6 +31,7 @@ package mts import ( "encoding/binary" "hash/crc32" + "io" "math/bits" "time" @@ -156,7 +157,7 @@ const ( // Encoder encapsulates properties of an mpegts generator. type Encoder struct { - stream chan []byte + dst io.Writer clock time.Duration frameInterval time.Duration @@ -166,9 +167,9 @@ type Encoder struct { } // NewEncoder returns an Encoder with the specified frame rate. -func NewEncoder(fps float64) *Encoder { +func NewEncoder(dst io.Writer, fps float64) *Encoder { return &Encoder{ - stream: make(chan []byte, 1), + dst: dst, frameInterval: time.Duration(float64(time.Second) / fps), ptsOffset: ptsOffset, @@ -181,11 +182,6 @@ func NewEncoder(fps float64) *Encoder { } } -// Stream returns a channel of streaming packets. -func (e *Encoder) Stream() <-chan []byte { - return e.stream -} - const ( hasPayload = 0x1 hasAdaptationField = 0x2 @@ -207,7 +203,10 @@ func (e *Encoder) Encode(nalu []byte) error { AFC: hasPayload, Payload: patTable, } - e.stream <- patPkt.Bytes() + _, err := e.dst.Write(patPkt.Bytes()) + if err != nil { + return err + } // Write PMT. pmtPkt := Packet{ @@ -217,7 +216,10 @@ func (e *Encoder) Encode(nalu []byte) error { AFC: hasPayload, Payload: pmtTable, } - e.stream <- pmtPkt.Bytes() + _, err = e.dst.Write(pmtPkt.Bytes()) + if err != nil { + return err + } // Prepare PES data. pesPkt := pes.Packet{ @@ -249,7 +251,10 @@ func (e *Encoder) Encode(nalu []byte) error { pusi = false } - e.stream <- pkt.Bytes() + _, err := e.dst.Write(pkt.Bytes()) + if err != nil { + return err + } } e.tick()