diff --git a/revid/Revid.go b/revid/Revid.go index a43b46de..fbb9b1af 100644 --- a/revid/Revid.go +++ b/revid/Revid.go @@ -115,7 +115,7 @@ type revid struct { setupInput func() error setupOutput func() error getFrame func() []byte - sendClip func(clip []byte) error + sendClip func(*ring.Chunk) error rtmpInst rtmp.Session mutex sync.Mutex sendMutex sync.Mutex @@ -163,8 +163,8 @@ func (r *revid) changeState(config Config) error { switch r.config.Output { case File: - r.sendClip = r.sendClipToFile r.setupOutput = r.setupOutputForFile + r.sendClip = r.sendClipToFile case FfmpegRtmp: r.setupOutput = r.setupOutputForFfmpegRtmp r.sendClip = r.sendClipToFfmpegRtmp @@ -395,7 +395,7 @@ func (r *revid) outputClips() { delay-- } // If the ringbuffer has something we can read and send off - err := r.ringBuffer.Next(readTimeout) + chunk, err := r.ringBuffer.Next(readTimeout) if err != nil || !r.isRunning { if err == io.EOF { break @@ -403,31 +403,17 @@ func (r *revid) outputClips() { continue } - // TODO(kortchak): Do this without copy/allocate. - // To address this, the io.ReadWriters held by the ring buffer - // should be io.WriterTos with a modification such that there - // is the capacity to reset for a retry or not keep offset - // state between WriteTo calls. Then the sendClip field should - // be made a func(io.WriterTo) (int error). - // This requires that bytes.Buffer not be used as the ring - // element type. - clip, err := ioutil.ReadAll(r.ringBuffer) - if err != nil || !r.isRunning { - r.Log(Error, err.Error()) - continue - } - - bytes += len(clip) + bytes += chunk.Len() r.Log(Detail, "About to send") - err = r.sendClip(clip) + err = r.sendClip(chunk) if err == nil { r.Log(Detail, "Sent clip") } - if r.isRunning && err != nil && len(clip) > 11 { + if r.isRunning && err != nil && chunk.Len() > 11 { r.Log(Debug, "Send failed! Trying again") // Try and send again - err = r.sendClip(clip) + err = r.sendClip(chunk) // if there's still an error we try and reconnect, unless we're stopping for r.isRunning && err != nil { @@ -446,10 +432,11 @@ func (r *revid) outputClips() { } r.Log(Debug, "Trying to send again with new connection...") - r.sendClip(clip) // TODO(kortschak): Log these errors? + r.sendClip(chunk) // TODO(kortschak): Log these errors? } } + chunk.Close() // ring.Chunk is an io.Closer, but Close alwats returns nil. r.Log(Detail, "Done reading that clip from ringbuffer...") // Log some information regarding bitrate and ring buffer size if it's time @@ -467,54 +454,70 @@ func (r *revid) outputClips() { } // senClipToFile writes the passed clip to a file -func (r *revid) sendClipToFile(clip []byte) error { +func (r *revid) sendClipToFile(clip *ring.Chunk) error { r.sendMutex.Lock() - defer r.sendMutex.Unlock() - _, err := r.outputFile.Write(clip) - if err != nil { - return err - } - return nil + _, err := clip.WriteTo(r.outputFile) + r.sendMutex.Unlock() + return err } // sendClipToHTTP takes a clip and an output url and posts through http. -func (r *revid) sendClipToHTTP(clip []byte) error { - r.sendMutex.Lock() +func (r *revid) sendClipToHTTP(clip *ring.Chunk) error { defer r.sendMutex.Unlock() + r.sendMutex.Lock() + timeout := time.Duration(httpTimeOut * time.Second) client := http.Client{Timeout: timeout} - url := r.config.HttpAddress + strconv.Itoa(len(clip)) - r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip))) - resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) + url := r.config.HttpAddress + strconv.Itoa(clip.Len()) + + // FIXME(kortschak): This is necessary because Post takes + // an io.Reader as a parameter and closes it if it is an + // io.Closer (which *ring.Chunk is), ... and because we + // use a method value for dispatching the sendClip work. + // So to save work in this case, sendClip should be made + // a proper method with a behaviour switch based on a + // revid field so that we can prepare these bytes only + // once for each clip (reusing a buffer field? or tt + // might be work using a sync.Pool for the bodies). + post := bytes.NewBuffer(make([]byte, 0, clip.Len())) + _, err := clip.WriteTo(post) + if err != nil { + return fmt.Errorf("Error buffering: %v", err) + } + + r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, clip.Len())) + resp, err := client.Post(url, "video/mp2t", post) if err != nil { return fmt.Errorf("Error posting to %s: %s", url, err) } defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) if err == nil { r.Log(Debug, fmt.Sprintf("%s\n", body)) } else { r.Log(Error, err.Error()) } + return nil } // sendClipToFfmpegRtmp sends the clip over the current rtmp connection using // an ffmpeg process. -func (r *revid) sendClipToFfmpegRtmp(clip []byte) (err error) { +func (r *revid) sendClipToFfmpegRtmp(clip *ring.Chunk) error { r.sendMutex.Lock() - defer r.sendMutex.Unlock() - _, err = r.ffmpegStdin.Write(clip) - return + _, err := clip.WriteTo(r.ffmpegStdin) + r.sendMutex.Unlock() + return err } // sendClipToLibRtmp send the clip over the current rtmp connection using the // c based librtmp library -func (r *revid) sendClipToLibRtmp(clip []byte) (err error) { +func (r *revid) sendClipToLibRtmp(clip *ring.Chunk) error { r.sendMutex.Lock() - defer r.sendMutex.Unlock() - err = r.rtmpInst.Write(clip) - return + _, err := clip.WriteTo(r.rtmpInst) + r.sendMutex.Unlock() + return err } // setupOutputForFfmpegRtmp sets up output to rtmp using an ffmpeg process diff --git a/ring/ring.go b/ring/ring.go index 4c075fae..40ae3be7 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -104,13 +104,13 @@ func (b *Buffer) Write(p []byte) (int, error) { if len(p) > b.head.cap() { return 0, ErrTooLong } - if len(p) > b.head.cap()-b.head.len() { + if len(p) > b.head.cap()-b.head.Len() { b.full <- b.head b.head = nil return b.Write(p) } n, err := b.head.write(p) - if b.head.cap()-b.head.len() == 0 { + if b.head.cap()-b.head.Len() == 0 { b.full <- b.head b.head = nil } @@ -179,7 +179,7 @@ func (b *Buffer) Read(p []byte) (int, error) { return 0, io.EOF } n, err := b.tail.read(p) - if b.tail.len() == 0 { + if b.tail.Len() == 0 { b.tail.reset() b.empty <- b.tail b.tail = nil @@ -200,7 +200,8 @@ func newChunk(buf []byte) *Chunk { return &Chunk{buf: buf[:0]} } -func (b *Chunk) len() int { +// Len returns the number of bytes held in the chunk. +func (b *Chunk) Len() int { return len(b.buf) - b.off } @@ -228,7 +229,7 @@ func (b *Chunk) write(p []byte) (n int, err error) { } func (b *Chunk) read(p []byte) (n int, err error) { - if b.len() <= 0 { + if b.Len() <= 0 { if len(p) == 0 { return 0, nil } @@ -246,18 +247,18 @@ func (b *Chunk) read(p []byte) (n int, err error) { // // WriteTo will panic if the Chunk has not been obtained through a call to Buffer.Next or // has been closed. WriteTo must be used in the same goroutine as the call to Next. -func (b *Chunk) WriteTo(w io.Writer) (n int, err error) { +func (b *Chunk) WriteTo(w io.Writer) (n int64, err error) { if b.owner == nil || b.owner.tail != b { panic("ring: invalid use of ring buffer chunk") } - n, err = w.Write(b.buf) - if n > len(b.buf) { + _n, err := w.Write(b.buf) + if _n > len(b.buf) { panic("ring: invalid byte count") } - if n != len(b.buf) { - return n, io.ErrShortWrite + if _n != len(b.buf) { + return int64(_n), io.ErrShortWrite } - return n, nil + return int64(_n), nil } // Close closes the Chunk, reseting its data and releasing it back to the Buffer. A Chunk diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 784f1a94..0ddf5e48 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -49,7 +49,7 @@ import ( // Session provides an interface for sending flv tags over rtmp. type Session interface { StartSession() error - Write([]byte) error + Write([]byte) (int, error) Close() error } @@ -85,14 +85,14 @@ func (s *session) StartSession() error { } // Write writes a frame (flv tag) to the rtmp connection -func (s *session) Write(data []byte) error { +func (s *session) Write(data []byte) (int, error) { if s.rtmp == nil { - return errors.New("rtmp: attempt to write to non-running session") + return 0, errors.New("rtmp: attempt to write to non-running session") } if C.write_frame(s.rtmp, (*C.char)(unsafe.Pointer(&data[0])), C.uint(len(data))) != 0 { - return errors.New("RTMP write error! Check rtmp log for details!") + return 0, errors.New("RTMP write error! Check rtmp log for details!") } - return nil + return len(data), nil } // Close terminates the rtmp connection