revid,ring: plumb in ring buffer with WriteTo

There is a residual scar of an intermediate []byte copy because of the
signature of http.NewRequest, but this can be addressed later. The rtmp
interface needed alteration so that rtmp.Session satisfies io.Writer.
This commit is contained in:
Dan Kortschak 2018-06-04 11:29:35 +09:30
parent b8d804e4ec
commit b7b000aeac
3 changed files with 62 additions and 58 deletions

View File

@ -115,7 +115,7 @@ type revid struct {
setupInput func() error setupInput func() error
setupOutput func() error setupOutput func() error
getFrame func() []byte getFrame func() []byte
sendClip func(clip []byte) error sendClip func(*ring.Chunk) error
rtmpInst rtmp.Session rtmpInst rtmp.Session
mutex sync.Mutex mutex sync.Mutex
sendMutex sync.Mutex sendMutex sync.Mutex
@ -163,8 +163,8 @@ func (r *revid) changeState(config Config) error {
switch r.config.Output { switch r.config.Output {
case File: case File:
r.sendClip = r.sendClipToFile
r.setupOutput = r.setupOutputForFile r.setupOutput = r.setupOutputForFile
r.sendClip = r.sendClipToFile
case FfmpegRtmp: case FfmpegRtmp:
r.setupOutput = r.setupOutputForFfmpegRtmp r.setupOutput = r.setupOutputForFfmpegRtmp
r.sendClip = r.sendClipToFfmpegRtmp r.sendClip = r.sendClipToFfmpegRtmp
@ -395,7 +395,7 @@ func (r *revid) outputClips() {
delay-- delay--
} }
// If the ringbuffer has something we can read and send off // If the ringbuffer has something we can read and send off
err := r.ringBuffer.Next(readTimeout) chunk, err := r.ringBuffer.Next(readTimeout)
if err != nil || !r.isRunning { if err != nil || !r.isRunning {
if err == io.EOF { if err == io.EOF {
break break
@ -403,31 +403,17 @@ func (r *revid) outputClips() {
continue continue
} }
// TODO(kortchak): Do this without copy/allocate. bytes += chunk.Len()
// To address this, the io.ReadWriters held by the ring buffer
// should be io.WriterTos with a modification such that there
// is the capacity to reset for a retry or not keep offset
// state between WriteTo calls. Then the sendClip field should
// be made a func(io.WriterTo) (int error).
// This requires that bytes.Buffer not be used as the ring
// element type.
clip, err := ioutil.ReadAll(r.ringBuffer)
if err != nil || !r.isRunning {
r.Log(Error, err.Error())
continue
}
bytes += len(clip)
r.Log(Detail, "About to send") r.Log(Detail, "About to send")
err = r.sendClip(clip) err = r.sendClip(chunk)
if err == nil { if err == nil {
r.Log(Detail, "Sent clip") r.Log(Detail, "Sent clip")
} }
if r.isRunning && err != nil && len(clip) > 11 { if r.isRunning && err != nil && chunk.Len() > 11 {
r.Log(Debug, "Send failed! Trying again") r.Log(Debug, "Send failed! Trying again")
// Try and send again // Try and send again
err = r.sendClip(clip) err = r.sendClip(chunk)
// if there's still an error we try and reconnect, unless we're stopping // if there's still an error we try and reconnect, unless we're stopping
for r.isRunning && err != nil { for r.isRunning && err != nil {
@ -446,10 +432,11 @@ func (r *revid) outputClips() {
} }
r.Log(Debug, "Trying to send again with new connection...") r.Log(Debug, "Trying to send again with new connection...")
r.sendClip(clip) // TODO(kortschak): Log these errors? r.sendClip(chunk) // TODO(kortschak): Log these errors?
} }
} }
chunk.Close() // ring.Chunk is an io.Closer, but Close alwats returns nil.
r.Log(Detail, "Done reading that clip from ringbuffer...") r.Log(Detail, "Done reading that clip from ringbuffer...")
// Log some information regarding bitrate and ring buffer size if it's time // Log some information regarding bitrate and ring buffer size if it's time
@ -467,54 +454,70 @@ func (r *revid) outputClips() {
} }
// senClipToFile writes the passed clip to a file // senClipToFile writes the passed clip to a file
func (r *revid) sendClipToFile(clip []byte) error { func (r *revid) sendClipToFile(clip *ring.Chunk) error {
r.sendMutex.Lock() r.sendMutex.Lock()
defer r.sendMutex.Unlock() _, err := clip.WriteTo(r.outputFile)
_, err := r.outputFile.Write(clip) r.sendMutex.Unlock()
if err != nil {
return err return err
}
return nil
} }
// sendClipToHTTP takes a clip and an output url and posts through http. // sendClipToHTTP takes a clip and an output url and posts through http.
func (r *revid) sendClipToHTTP(clip []byte) error { func (r *revid) sendClipToHTTP(clip *ring.Chunk) error {
r.sendMutex.Lock()
defer r.sendMutex.Unlock() defer r.sendMutex.Unlock()
r.sendMutex.Lock()
timeout := time.Duration(httpTimeOut * time.Second) timeout := time.Duration(httpTimeOut * time.Second)
client := http.Client{Timeout: timeout} client := http.Client{Timeout: timeout}
url := r.config.HttpAddress + strconv.Itoa(len(clip)) url := r.config.HttpAddress + strconv.Itoa(clip.Len())
r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip)))
resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // FIXME(kortschak): This is necessary because Post takes
// an io.Reader as a parameter and closes it if it is an
// io.Closer (which *ring.Chunk is), ... and because we
// use a method value for dispatching the sendClip work.
// So to save work in this case, sendClip should be made
// a proper method with a behaviour switch based on a
// revid field so that we can prepare these bytes only
// once for each clip (reusing a buffer field? or tt
// might be work using a sync.Pool for the bodies).
post := bytes.NewBuffer(make([]byte, 0, clip.Len()))
_, err := clip.WriteTo(post)
if err != nil {
return fmt.Errorf("Error buffering: %v", err)
}
r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, clip.Len()))
resp, err := client.Post(url, "video/mp2t", post)
if err != nil { if err != nil {
return fmt.Errorf("Error posting to %s: %s", url, err) return fmt.Errorf("Error posting to %s: %s", url, err)
} }
defer resp.Body.Close() defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body) body, err := ioutil.ReadAll(resp.Body)
if err == nil { if err == nil {
r.Log(Debug, fmt.Sprintf("%s\n", body)) r.Log(Debug, fmt.Sprintf("%s\n", body))
} else { } else {
r.Log(Error, err.Error()) r.Log(Error, err.Error())
} }
return nil return nil
} }
// sendClipToFfmpegRtmp sends the clip over the current rtmp connection using // sendClipToFfmpegRtmp sends the clip over the current rtmp connection using
// an ffmpeg process. // an ffmpeg process.
func (r *revid) sendClipToFfmpegRtmp(clip []byte) (err error) { func (r *revid) sendClipToFfmpegRtmp(clip *ring.Chunk) error {
r.sendMutex.Lock() r.sendMutex.Lock()
defer r.sendMutex.Unlock() _, err := clip.WriteTo(r.ffmpegStdin)
_, err = r.ffmpegStdin.Write(clip) r.sendMutex.Unlock()
return return err
} }
// sendClipToLibRtmp send the clip over the current rtmp connection using the // sendClipToLibRtmp send the clip over the current rtmp connection using the
// c based librtmp library // c based librtmp library
func (r *revid) sendClipToLibRtmp(clip []byte) (err error) { func (r *revid) sendClipToLibRtmp(clip *ring.Chunk) error {
r.sendMutex.Lock() r.sendMutex.Lock()
defer r.sendMutex.Unlock() _, err := clip.WriteTo(r.rtmpInst)
err = r.rtmpInst.Write(clip) r.sendMutex.Unlock()
return return err
} }
// setupOutputForFfmpegRtmp sets up output to rtmp using an ffmpeg process // setupOutputForFfmpegRtmp sets up output to rtmp using an ffmpeg process

View File

@ -104,13 +104,13 @@ func (b *Buffer) Write(p []byte) (int, error) {
if len(p) > b.head.cap() { if len(p) > b.head.cap() {
return 0, ErrTooLong return 0, ErrTooLong
} }
if len(p) > b.head.cap()-b.head.len() { if len(p) > b.head.cap()-b.head.Len() {
b.full <- b.head b.full <- b.head
b.head = nil b.head = nil
return b.Write(p) return b.Write(p)
} }
n, err := b.head.write(p) n, err := b.head.write(p)
if b.head.cap()-b.head.len() == 0 { if b.head.cap()-b.head.Len() == 0 {
b.full <- b.head b.full <- b.head
b.head = nil b.head = nil
} }
@ -179,7 +179,7 @@ func (b *Buffer) Read(p []byte) (int, error) {
return 0, io.EOF return 0, io.EOF
} }
n, err := b.tail.read(p) n, err := b.tail.read(p)
if b.tail.len() == 0 { if b.tail.Len() == 0 {
b.tail.reset() b.tail.reset()
b.empty <- b.tail b.empty <- b.tail
b.tail = nil b.tail = nil
@ -200,7 +200,8 @@ func newChunk(buf []byte) *Chunk {
return &Chunk{buf: buf[:0]} return &Chunk{buf: buf[:0]}
} }
func (b *Chunk) len() int { // Len returns the number of bytes held in the chunk.
func (b *Chunk) Len() int {
return len(b.buf) - b.off return len(b.buf) - b.off
} }
@ -228,7 +229,7 @@ func (b *Chunk) write(p []byte) (n int, err error) {
} }
func (b *Chunk) read(p []byte) (n int, err error) { func (b *Chunk) read(p []byte) (n int, err error) {
if b.len() <= 0 { if b.Len() <= 0 {
if len(p) == 0 { if len(p) == 0 {
return 0, nil return 0, nil
} }
@ -246,18 +247,18 @@ func (b *Chunk) read(p []byte) (n int, err error) {
// //
// WriteTo will panic if the Chunk has not been obtained through a call to Buffer.Next or // WriteTo will panic if the Chunk has not been obtained through a call to Buffer.Next or
// has been closed. WriteTo must be used in the same goroutine as the call to Next. // has been closed. WriteTo must be used in the same goroutine as the call to Next.
func (b *Chunk) WriteTo(w io.Writer) (n int, err error) { func (b *Chunk) WriteTo(w io.Writer) (n int64, err error) {
if b.owner == nil || b.owner.tail != b { if b.owner == nil || b.owner.tail != b {
panic("ring: invalid use of ring buffer chunk") panic("ring: invalid use of ring buffer chunk")
} }
n, err = w.Write(b.buf) _n, err := w.Write(b.buf)
if n > len(b.buf) { if _n > len(b.buf) {
panic("ring: invalid byte count") panic("ring: invalid byte count")
} }
if n != len(b.buf) { if _n != len(b.buf) {
return n, io.ErrShortWrite return int64(_n), io.ErrShortWrite
} }
return n, nil return int64(_n), nil
} }
// Close closes the Chunk, reseting its data and releasing it back to the Buffer. A Chunk // Close closes the Chunk, reseting its data and releasing it back to the Buffer. A Chunk

View File

@ -49,7 +49,7 @@ import (
// Session provides an interface for sending flv tags over rtmp. // Session provides an interface for sending flv tags over rtmp.
type Session interface { type Session interface {
StartSession() error StartSession() error
Write([]byte) error Write([]byte) (int, error)
Close() error Close() error
} }
@ -85,14 +85,14 @@ func (s *session) StartSession() error {
} }
// Write writes a frame (flv tag) to the rtmp connection // Write writes a frame (flv tag) to the rtmp connection
func (s *session) Write(data []byte) error { func (s *session) Write(data []byte) (int, error) {
if s.rtmp == nil { if s.rtmp == nil {
return errors.New("rtmp: attempt to write to non-running session") return 0, errors.New("rtmp: attempt to write to non-running session")
} }
if C.write_frame(s.rtmp, (*C.char)(unsafe.Pointer(&data[0])), C.uint(len(data))) != 0 { if C.write_frame(s.rtmp, (*C.char)(unsafe.Pointer(&data[0])), C.uint(len(data))) != 0 {
return errors.New("RTMP write error! Check rtmp log for details!") return 0, errors.New("RTMP write error! Check rtmp log for details!")
} }
return nil return len(data), nil
} }
// Close terminates the rtmp connection // Close terminates the rtmp connection