diff --git a/revid/revid.go b/revid/revid.go index 4d459eb4..fc34877b 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -31,17 +31,13 @@ package revid import ( "bufio" - "bytes" "errors" "fmt" "io" - "io/ioutil" - "net/http" "os" "os/exec" "path/filepath" "strconv" - "sync" "time" "bitbucket.org/ausocean/av/generator" @@ -92,22 +88,18 @@ type Revid struct { ringBuffer *ring.Buffer config Config isRunning bool - outputFile *os.File inputFile *os.File generator generator.Generator parser parser.Parser cmd *exec.Cmd - ffmpegCmd *exec.Cmd inputReader *bufio.Reader ffmpegStdin io.WriteCloser outputChan chan []byte setupInput func() error setupOutput func() error getFrame func() []byte - sendClip func(*ring.Chunk) error + destination loadSender rtmpInst rtmp.Session - mutex sync.Mutex - sendMutex sync.Mutex currentBitrate int64 } @@ -146,18 +138,33 @@ func (r *Revid) reset(config Config) error { } r.config = config + if r.destination != nil { + err = r.destination.close() + if err != nil { + r.Log(Error, err.Error()) + } + } switch r.config.Output { case File: - r.setupOutput = r.setupOutputForFile - r.sendClip = r.sendClipToFile + s, err := newFileSender(config.OutputFileName) + if err != nil { + return err + } + r.destination = s case FfmpegRtmp: - r.setupOutput = r.setupOutputForFfmpegRtmp - r.sendClip = r.sendClipToFfmpegRtmp + s, err := newFfmpegSender(config.RtmpUrl, r.config.FrameRate) + if err != nil { + return err + } + r.destination = s case NativeRtmp: - r.setupOutput = r.setupOutputForLibRtmp - r.sendClip = r.sendClipToLibRtmp + s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimout, rtmpConnectionMaxTries, r.Log) + if err != nil { + return err + } + r.destination = s case Http: - r.sendClip = r.sendClipToHTTP + r.destination = newHttpSender(config.RtmpUrl, httpTimeout, r.Log) } switch r.config.Input { @@ -235,21 +242,12 @@ func (r *Revid) IsRunning() bool { // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. func (r *Revid) Start() { - r.mutex.Lock() - defer r.mutex.Unlock() if r.isRunning { r.Log(Warning, "Revid.Start() called but revid already running!") return } r.Log(Info, "Starting Revid!") r.Log(Debug, "Setting up output!") - if r.setupOutput != nil { - err := r.setupOutput() - if err != nil { - r.Log(Error, err.Error()) - return - } - } r.isRunning = true r.Log(Info, "Starting output routine!") go r.outputClips() @@ -265,19 +263,12 @@ func (r *Revid) Start() { // Stop halts any processing of video data from a camera or file func (r *Revid) Stop() { - r.mutex.Lock() - defer r.mutex.Unlock() - if !r.isRunning { r.Log(Warning, "Revid.Stop() called but revid not running!") return } r.Log(Info, "Stopping revid!") - // wait for sending to finish - r.sendMutex.Lock() - defer r.sendMutex.Unlock() - r.rtmpInst.Close() r.isRunning = false r.Log(Info, "Stopping generator!") @@ -394,15 +385,20 @@ func (r *Revid) outputClips() { bytes += chunk.Len() r.Log(Detail, "About to send") - err = r.sendClip(chunk) + err = r.destination.load(chunk) + if err != nil { + r.Log(Error, "failed to load clip") + } + err = r.destination.send() if err == nil { - r.Log(Detail, "Sent clip") + r.Log(Detail, "sent clip") } if r.isRunning && err != nil && chunk.Len() > 11 { r.Log(Debug, "Send failed! Trying again") // Try and send again - err = r.sendClip(chunk) + err = r.destination.send() + r.Log(Error, err.Error()) // if there's still an error we try and reconnect, unless we're stopping for r.isRunning && err != nil { @@ -410,22 +406,26 @@ func (r *Revid) outputClips() { time.Sleep(time.Duration(sendFailedDelay) * time.Millisecond) r.Log(Error, err.Error()) - if r.config.Output == NativeRtmp { - r.Log(Debug, "Ending current rtmp session...") - r.rtmpInst.Close() - } - - if r.config.Output == NativeRtmp { - r.Log(Info, "Restarting rtmp session...") - r.rtmpInst.StartSession() + if rs, ok := r.destination.(restarter); ok { + r.Log(Debug, fmt.Sprintf("restarting %T session", rs)) + err = rs.restart() + if err != nil { + // TODO(kortschak): Make this "Fatal" when that exists. + r.Log(Error, "failed to restart rtmp session") + r.isRunning = false + return + } + r.Log(Info, "restarted rtmp session") } r.Log(Debug, "Trying to send again with new connection...") - r.sendClip(chunk) // TODO(kortschak): Log these errors? + err = r.destination.send() + r.Log(Error, err.Error()) } } - chunk.Close() // ring.Chunk is an io.Closer, but Close alwats returns nil. + r.destination.release() + r.Log(Detail, "Done reading that clip from ringbuffer...") // Log some information regarding bitrate and ring buffer size if it's time @@ -440,129 +440,10 @@ func (r *Revid) outputClips() { } } r.Log(Info, "Not outputting clips anymore!") -} - -// senClipToFile writes the passed clip to a file -func (r *Revid) sendClipToFile(clip *ring.Chunk) error { - r.sendMutex.Lock() - _, err := clip.WriteTo(r.outputFile) - r.sendMutex.Unlock() - return err -} - -// sendClipToHTTP takes a clip and an output url and posts through http. -func (r *Revid) sendClipToHTTP(clip *ring.Chunk) error { - defer r.sendMutex.Unlock() - r.sendMutex.Lock() - - client := http.Client{Timeout: httpTimeout} - url := r.config.HttpAddress + strconv.Itoa(clip.Len()) - - // FIXME(kortschak): This is necessary because Post takes - // an io.Reader as a parameter and closes it if it is an - // io.Closer (which *ring.Chunk is), ... and because we - // use a method value for dispatching the sendClip work. - // So to save work in this case, sendClip should be made - // a proper method with a behaviour switch based on a - // Revid field so that we can prepare these bytes only - // once for each clip (reusing a buffer field? or tt - // might be work using a sync.Pool for the bodies). - post := bytes.NewBuffer(make([]byte, 0, clip.Len())) - _, err := clip.WriteTo(post) + err := r.destination.close() if err != nil { - return fmt.Errorf("Error buffering: %v", err) + r.Log(Error, "failed to close destination") } - - r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, clip.Len())) - resp, err := client.Post(url, "video/mp2t", post) - if err != nil { - return fmt.Errorf("Error posting to %s: %s", url, err) - } - defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) - if err == nil { - r.Log(Debug, fmt.Sprintf("%s\n", body)) - } else { - r.Log(Error, err.Error()) - } - - return nil -} - -// sendClipToFfmpegRtmp sends the clip over the current rtmp connection using -// an ffmpeg process. -func (r *Revid) sendClipToFfmpegRtmp(clip *ring.Chunk) error { - r.sendMutex.Lock() - _, err := clip.WriteTo(r.ffmpegStdin) - r.sendMutex.Unlock() - return err -} - -// sendClipToLibRtmp send the clip over the current rtmp connection using the -// c based librtmp library -func (r *Revid) sendClipToLibRtmp(clip *ring.Chunk) error { - r.sendMutex.Lock() - _, err := clip.WriteTo(r.rtmpInst) - r.sendMutex.Unlock() - return err -} - -// setupOutputForFfmpegRtmp sets up output to rtmp using an ffmpeg process -func (r *Revid) setupOutputForFfmpegRtmp() error { - r.ffmpegCmd = exec.Command(ffmpegPath, - "-f", "h264", - "-r", r.config.FrameRate, - "-i", "-", - "-f", "lavfi", - "-i", "aevalsrc=0", - "-fflags", "nobuffer", - "-vcodec", "copy", - "-acodec", "aac", - "-map", "0:0", - "-map", "1:0", - "-strict", "experimental", - "-f", "flv", - r.config.RtmpUrl, - ) - var err error - r.ffmpegStdin, err = r.ffmpegCmd.StdinPipe() - if err != nil { - r.Log(Error, err.Error()) - r.Stop() - return err - } - err = r.ffmpegCmd.Start() - if err != nil { - r.Log(Error, err.Error()) - r.Stop() - return err - } - return nil -} - -// setupOutputForLibRtmp sets up rtmp output using the wrapper for the c based -// librtmp library - makes connection and starts comms etc. -func (r *Revid) setupOutputForLibRtmp() error { - r.rtmpInst = rtmp.NewSession(r.config.RtmpUrl, rtmpConnectionTimout) - err := r.rtmpInst.StartSession() - for noOfTries := 0; err != nil && noOfTries < rtmpConnectionMaxTries; noOfTries++ { - r.rtmpInst.Close() - r.Log(Error, err.Error()) - r.Log(Info, "Trying to establish rtmp connection again!") - r.rtmpInst = rtmp.NewSession(r.config.RtmpUrl, rtmpConnectionTimout) - err = r.rtmpInst.StartSession() - } - if err != nil { - return err - } - return err -} - -// setupOutputForFile sets up an output file to output data to -func (r *Revid) setupOutputForFile() (err error) { - r.outputFile, err = os.Create(r.config.OutputFileName) - return } // setupInputForRaspivid sets up things for input from raspivid i.e. starts @@ -634,17 +515,6 @@ func (r *Revid) setupInputForFile() error { return nil } -// testRtmp is useful to check robustness of connections. Intended to be run as -// goroutine. After every 'delayTime' the rtmp connection is ended and then -// restarted -func (r *Revid) testRtmp(delayTime uint) { - for { - time.Sleep(time.Duration(delayTime) * time.Millisecond) - r.rtmpInst.Close() - r.rtmpInst.StartSession() - } -} - // readCamera reads data from the defined camera while the Revid is running. // TODO: use ringbuffer here instead of allocating mem every time! func (r *Revid) readCamera() { diff --git a/revid/senders.go b/revid/senders.go new file mode 100644 index 00000000..66590956 --- /dev/null +++ b/revid/senders.go @@ -0,0 +1,294 @@ +/* +NAME + senders.go + +DESCRIPTION + See Readme.md + +AUTHORS + Saxon A. Nelson-Milton + Alan Noble + +LICENSE + revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package revid + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "net/http" + "os" + "os/exec" + "strconv" + "time" + + "bitbucket.org/ausocean/av/rtmp" + "bitbucket.org/ausocean/utils/ring" +) + +// loadSender is a destination to send a *ring.Chunk to. +// When a loadSender has finished using the *ring.Chunk +// it must be Closed. +type loadSender interface { + // load assigns the *ring.Chunk to the loadSender. + // The load call may fail, but must not mutate the + // the chunk. + load(*ring.Chunk) error + + // send performs a destination-specific send + // operation. It must not mutate the chunk. + send() error + + // release releases the *ring.Chunk. + release() + + // close cleans up after use of the loadSender. + close() error +} + +// restart is an optional interface for loadSenders that +// can restart their connection. +type restarter interface { + restart() error +} + +// fileSender implements loadSender for a local file destination. +type fileSender struct { + file *os.File + + chunk *ring.Chunk +} + +func newFileSender(path string) (*fileSender, error) { + f, err := os.Create(path) + if err != nil { + return nil, err + } + return &fileSender{file: f}, nil +} + +func (s *fileSender) load(c *ring.Chunk) error { + s.chunk = c + return nil +} + +func (s *fileSender) send() error { + _, err := s.chunk.WriteTo(s.file) + return err +} + +func (s *fileSender) release() { + s.chunk.Close() + s.chunk = nil +} + +func (s *fileSender) close() error { + return s.file.Close() +} + +// httpSender implements loadSender for an HTTP destination. +type httpSender struct { + client http.Client + url string + + log func(lvl, msg string) + + buf []byte + + chunk *ring.Chunk +} + +func newHttpSender(url string, timeout time.Duration, log func(lvl, msg string)) *httpSender { + return &httpSender{ + client: http.Client{Timeout: timeout}, + url: url, + log: log, + } +} + +func (s *httpSender) load(c *ring.Chunk) error { + buf := bytes.NewBuffer(s.buf[:0]) + _, err := s.chunk.WriteTo(buf) + s.buf = buf.Bytes() + if err != nil { + return fmt.Errorf("fileSender: %v", err) + } + return nil +} + +func (s *httpSender) send() error { + url := s.url + strconv.Itoa(len(s.buf)) + s.log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(s.buf))) + resp, err := s.client.Post(url, "video/mp2t", bytes.NewReader(s.buf)) + if err != nil { + return fmt.Errorf("Error posting to %s: %s", url, err) + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err == nil { + s.log(Debug, fmt.Sprintf("%s\n", body)) + } else { + s.log(Error, err.Error()) + } + return err +} + +func (s *httpSender) release() { + s.chunk.Close() + s.chunk = nil +} + +func (s *httpSender) close() error { return nil } + +// ffmpegSender implements loadSender for an FFMPEG RTMP destination. +type ffmpegSender struct { + ffmpeg io.WriteCloser + + chunk *ring.Chunk +} + +func newFfmpegSender(url, framerate string) (*ffmpegSender, error) { + cmd := exec.Command(ffmpegPath, + "-f", "h264", + "-r", framerate, + "-i", "-", + "-f", "lavfi", + "-i", "aevalsrc=0", + "-fflags", "nobuffer", + "-vcodec", "copy", + "-acodec", "aac", + "-map", "0:0", + "-map", "1:0", + "-strict", "experimental", + "-f", "flv", + url, + ) + w, err := cmd.StdinPipe() + if err != nil { + return nil, err + } + err = cmd.Start() + if err != nil { + return nil, err + } + return &ffmpegSender{ffmpeg: w}, nil +} + +func (s *ffmpegSender) load(c *ring.Chunk) error { + s.chunk = c + return nil +} + +func (s *ffmpegSender) send() error { + _, err := s.chunk.WriteTo(s.ffmpeg) + return err +} + +func (s *ffmpegSender) release() { + s.chunk.Close() + s.chunk = nil +} + +func (s *ffmpegSender) close() error { + return s.ffmpeg.Close() +} + +// rtmpSender implements loadSender for a native RTMP destination. +type rtmpSender struct { + sess rtmp.Session + + url string + timeout uint + retries int + log func(lvl, msg string) + + chunk *ring.Chunk +} + +var _ restarter = (*rtmpSender)(nil) + +func newRtmpSender(url string, timeout uint, retries int, log func(lvl, msg string)) (*rtmpSender, error) { + var sess rtmp.Session + var err error + for n := 0; n < retries; n++ { + sess = rtmp.NewSession(url, timeout) + err = sess.StartSession() + if err == nil { + break + } + log(Error, err.Error()) + sess.Close() + if n < retries-1 { + log(Info, "retry rtmp connection") + } + } + if err != nil { + return nil, err + } + + s := &rtmpSender{ + sess: sess, + url: url, + timeout: timeout, + retries: retries, + log: log, + } + return s, nil +} + +func (s *rtmpSender) load(c *ring.Chunk) error { + s.chunk = c + return nil +} + +func (s *rtmpSender) send() error { + _, err := s.chunk.WriteTo(s.sess) + return err +} + +func (s *rtmpSender) release() { + s.chunk.Close() + s.chunk = nil +} + +func (s *rtmpSender) restart() error { + err := s.sess.Close() + if err != nil { + return err + } + for n := 0; n < s.retries; n++ { + s.sess = rtmp.NewSession(s.url, s.timeout) + err = s.sess.StartSession() + if err == nil { + break + } + s.log(Error, err.Error()) + s.sess.Close() + if n < s.retries-1 { + s.log(Info, "retry rtmp connection") + } + } + return err +} + +func (s *rtmpSender) close() error { + return s.sess.Close() +}