From 3881cb9712c3a4ae364dd3d65b5a18fec709c27d Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sat, 9 Jun 2018 11:31:21 +0930 Subject: [PATCH 1/2] revid: avoid monomorphic interface --- cmd/revid-cli/main.go | 4 +- revid/config.go | 102 ++++++++++++++++++++-------------------- revid/revid.go | 105 ++++++++++++++++++------------------------ 3 files changed, 98 insertions(+), 113 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 58205edb..1e73ae5f 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -95,7 +95,7 @@ const ( // Globals var ( - revidInst revid.Revid + revidInst *revid.Revid config revid.Config ) @@ -341,7 +341,7 @@ func startRevid() { func createRevidInstance() { // Try to create the revid instance with the given config var err error - for revidInst, err = revid.NewRevid(config); err != nil; { + for revidInst, err = revid.New(config); err != nil; { // If the config does have a logger, use it to output error, otherwise // just output to std output if config.Logger != nil { diff --git a/revid/config.go b/revid/config.go index 45be8429..d699ffef 100644 --- a/revid/config.go +++ b/revid/config.go @@ -104,46 +104,46 @@ const ( // Validate checks for any errors in the config fields and defaults settings // if particular parameters have not been defined. -func (config *Config) Validate(r *revid) error { - switch config.Verbosity { +func (c *Config) Validate(r *Revid) error { + switch c.Verbosity { case Yes: case No: case NothingDefined: - config.Verbosity = Yes + c.Verbosity = Yes r.Log(Warning, "No verbosity mode defined, defaulting to no Verbosity!") default: return errors.New("Bad Verbosity defined in config!") } - switch config.QuantizationMode { + switch c.QuantizationMode { case QuantizationOn: case QuantizationOff: case NothingDefined: r.Log(Warning, "No quantization mode defined, defaulting to QuantizationOff!") - config.QuantizationMode = QuantizationOff + c.QuantizationMode = QuantizationOff default: return errors.New("Bad QuantizationMode defined in config!") } - switch config.Input { + switch c.Input { case Rtp: case Raspivid: case File: case NothingDefined: r.Log(Warning, "No input type defined, defaulting to raspivid!") - config.Input = defaultInput + c.Input = defaultInput default: return errors.New("Bad input type defined in config!") } - switch config.InputCodec { + switch c.InputCodec { case H264: - if config.Bitrate != "" && config.Quantization != "" { - bitrate, err := strconv.Atoi(config.Bitrate) + if c.Bitrate != "" && c.Quantization != "" { + bitrate, err := strconv.Atoi(c.Bitrate) if err != nil { return errors.New("Something is wrong with bitrate in conig!") } - quantization, err := strconv.Atoi(config.Quantization) + quantization, err := strconv.Atoi(c.Quantization) if err != nil { return errors.New("Something is wrong with quantization in config!") } @@ -152,141 +152,141 @@ func (config *Config) Validate(r *revid) error { } } case Mjpeg: - if config.Quantization != "" { - quantization, err := strconv.Atoi(config.Quantization) + if c.Quantization != "" { + quantization, err := strconv.Atoi(c.Quantization) if err != nil { return errors.New("Something is wrong with quantization in config!") } - if quantization > 0 || config.Bitrate == "" { + if quantization > 0 || c.Bitrate == "" { return errors.New("Bad bitrate or quantization for mjpeg input!") } } case NothingDefined: r.Log(Warning, "No input codec defined, defaulting to h264!") - config.InputCodec = H264 + c.InputCodec = H264 r.Log(Warning, "Defaulting bitrate to 0 and quantization to 35!") - config.Quantization = defaultQuantization + c.Quantization = defaultQuantization default: return errors.New("Bad input codec defined in config!") } - switch config.Output { + switch c.Output { case Http: case File: case NativeRtmp, FfmpegRtmp: - if config.RtmpUrl == "" { + if c.RtmpUrl == "" { r.Log(Info, "No RTMP URL: falling back to HTTP") - config.Output = Http + c.Output = Http break } r.Log(Info, "Defaulting frames per clip to 1 for rtmp output!") - config.FramesPerClip = "1" + c.FramesPerClip = "1" case NothingDefined: r.Log(Warning, "No output defined, defaulting to httpOut!") - config.Output = defaultOutput + c.Output = defaultOutput default: return errors.New("Bad output type defined in config!") } - switch config.Packetization { + switch c.Packetization { case None: case Mpegts: case Flv: case NothingDefined: r.Log(Warning, "No packetization option defined, defaulting to none!") - config.Packetization = Flv + c.Packetization = Flv default: return errors.New("Bad packetization option defined in config!") } - switch config.HorizontalFlip { + switch c.HorizontalFlip { case Yes: case No: case NothingDefined: r.Log(Warning, "No horizontal flip option defined, defaulting to not flipped!") - config.HorizontalFlip = defaultHorizontalFlip + c.HorizontalFlip = defaultHorizontalFlip default: return errors.New("Bad horizontal flip option defined in config!") } - switch config.VerticalFlip { + switch c.VerticalFlip { case Yes: case No: case NothingDefined: r.Log(Warning, "No vertical flip option defined, defaulting to not flipped!") - config.VerticalFlip = defaultVerticalFlip + c.VerticalFlip = defaultVerticalFlip default: return errors.New("Bad vertical flip option defined in config!") } - if config.FramesPerClip == "" { + if c.FramesPerClip == "" { r.Log(Warning, "No FramesPerClip defined defined, defaulting to 1!") - config.Width = defaultFramesPerClip + c.Width = defaultFramesPerClip } else { - if integer, err := strconv.Atoi(config.FramesPerClip); integer <= 0 || err != nil { + if integer, err := strconv.Atoi(c.FramesPerClip); integer <= 0 || err != nil { return errors.New("Bad width defined in config!") } } - if config.Width == "" { + if c.Width == "" { r.Log(Warning, "No width defined, defaulting to 1280!") - config.Width = defaultWidth + c.Width = defaultWidth } else { - if integer, err := strconv.Atoi(config.Width); integer < 0 || err != nil { + if integer, err := strconv.Atoi(c.Width); integer < 0 || err != nil { return errors.New("Bad width defined in config!") } } - if config.Height == "" { + if c.Height == "" { r.Log(Warning, "No height defined, defaulting to 720!") - config.Height = defaultHeight + c.Height = defaultHeight } else { - if integer, err := strconv.Atoi(config.Height); integer < 0 || err != nil { + if integer, err := strconv.Atoi(c.Height); integer < 0 || err != nil { return errors.New("Bad height defined in config!") } } - if config.FrameRate == "" { + if c.FrameRate == "" { r.Log(Warning, "No frame rate defined, defaulting to 25!") - config.FrameRate = defaultFrameRate + c.FrameRate = defaultFrameRate } else { - if integer, err := strconv.Atoi(config.FrameRate); integer < 0 || err != nil { + if integer, err := strconv.Atoi(c.FrameRate); integer < 0 || err != nil { return errors.New("Bad frame rate defined in config!") } } - if config.Bitrate == "" { + if c.Bitrate == "" { r.Log(Warning, "No bitrate defined, defaulting!") - config.Bitrate = defaultBitrate + c.Bitrate = defaultBitrate } else { - if integer, err := strconv.Atoi(config.Bitrate); integer < 0 || err != nil { + if integer, err := strconv.Atoi(c.Bitrate); integer < 0 || err != nil { return errors.New("Bad bitrate defined in config!") } } - if config.Timeout == "" { + if c.Timeout == "" { r.Log(Warning, "No timeout defined, defaulting to 0!") - config.Timeout = defaultTimeout + c.Timeout = defaultTimeout } else { - if integer, err := strconv.Atoi(config.Timeout); integer < 0 || err != nil { + if integer, err := strconv.Atoi(c.Timeout); integer < 0 || err != nil { return errors.New("Bad timeout defined in config!") } } - if config.IntraRefreshPeriod == "" { + if c.IntraRefreshPeriod == "" { r.Log(Warning, "No intra refresh defined, defaulting to 100!") - config.IntraRefreshPeriod = defaultIntraRefreshPeriod + c.IntraRefreshPeriod = defaultIntraRefreshPeriod } else { - if integer, err := strconv.Atoi(config.IntraRefreshPeriod); integer < 0 || err != nil { + if integer, err := strconv.Atoi(c.IntraRefreshPeriod); integer < 0 || err != nil { return errors.New("Bad intra refresh defined in config!") } } - if config.Quantization == "" { + if c.Quantization == "" { r.Log(Warning, "No quantization defined, defaulting to 35!") - config.Quantization = defaultQuantization + c.Quantization = defaultQuantization } else { - if integer, err := strconv.Atoi(config.Quantization); integer < 0 || integer > 51 || err != nil { + if integer, err := strconv.Atoi(c.Quantization); integer < 0 || integer > 51 || err != nil { return errors.New("Bad quantization defined in config!") } } diff --git a/revid/revid.go b/revid/revid.go index 77eb610d..4d459eb4 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -86,18 +86,7 @@ const ( // Revid provides methods to control a revid session; providing methods // to start, stop and change the state of an instance using the Config struct. -type Revid interface { - Start() - Stop() - changeState(newconfig Config) error - GetConfigRef() *Config - Log(logType, m string) - IsRunning() bool - GetBitrate() int64 -} - -// The revid struct provides fields to describe the state of a Revid. -type revid struct { +type Revid struct { ffmpegPath string tempDir string ringBuffer *ring.Buffer @@ -122,38 +111,34 @@ type revid struct { currentBitrate int64 } -// NewRevid returns a pointer to a new revid with the desired +// NewRevid returns a pointer to a new Revid with the desired // configuration, and/or an error if construction of the new instant was not // successful. -func NewRevid(config Config) (r *revid, err error) { - r = new(revid) - r.mutex = sync.Mutex{} - r.sendMutex = sync.Mutex{} - r.ringBuffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout) - err = r.changeState(config) +func New(c Config) (*Revid, error) { + var r Revid + err := r.reset(c) if err != nil { - r = nil - return + return nil, err } + r.ringBuffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout) r.outputChan = make(chan []byte, outputChanSize) - r.isRunning = false - return + return &r, nil } // Returns the currently saved bitrate from the most recent bitrate check // check bitrate output delay in consts for this period -func (r *revid) GetBitrate() int64 { +func (r *Revid) GetBitrate() int64 { return r.currentBitrate } // GetConfigRef returns a pointer to the revidInst's Config struct object -func (r *revid) GetConfigRef() *Config { +func (r *Revid) GetConfigRef() *Config { return &r.config } -// changeState swaps the current config of a revid with the passed +// reset swaps the current config of a Revid with the passed // configuration; checking validity and returning errors if not valid. -func (r *revid) changeState(config Config) error { +func (r *Revid) reset(config Config) error { r.config.Logger = config.Logger err := config.Validate(r) if err != nil { @@ -214,25 +199,25 @@ noPacketizationSetup: return nil } -// ChangeConfig changes the current configuration of the revid instance. -func (r *revid) ChangeConfig(config Config) (err error) { +// ChangeConfig changes the current configuration of the Revid instance. +func (r *Revid) ChangeConfig(c Config) error { // FIXME(kortschak): This is reimplemented in cmd/revid-cli/main.go. // The implementation in the command is used and this is not. // Decide on one or the other. r.Stop() - r, err = NewRevid(config) + r, err := New(c) if err != nil { - return + return err } r.Start() - return + return err } // Log takes a logtype and message and tries to send this information to the // logger provided in the revid config - if there is one, otherwise the message // is sent to stdout -func (r *revid) Log(logType, m string) { +func (r *Revid) Log(logType, m string) { if r.config.Verbosity == Yes { if r.config.Logger != nil { r.config.Logger.Log("revid", logType, m) @@ -243,17 +228,17 @@ func (r *revid) Log(logType, m string) { } // IsRunning returns true if the revid is currently running and false otherwise -func (r *revid) IsRunning() bool { +func (r *Revid) IsRunning() bool { return r.isRunning } -// Start invokes a revid to start processing video from a defined input +// Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. -func (r *revid) Start() { +func (r *Revid) Start() { r.mutex.Lock() defer r.mutex.Unlock() if r.isRunning { - r.Log(Warning, "revid.Start() called but revid already running!") + r.Log(Warning, "Revid.Start() called but revid already running!") return } r.Log(Info, "Starting Revid!") @@ -279,12 +264,12 @@ func (r *revid) Start() { } // Stop halts any processing of video data from a camera or file -func (r *revid) Stop() { +func (r *Revid) Stop() { r.mutex.Lock() defer r.mutex.Unlock() if !r.isRunning { - r.Log(Warning, "revid.Stop() called but revid not running!") + r.Log(Warning, "Revid.Stop() called but revid not running!") return } @@ -315,18 +300,18 @@ func (r *revid) Stop() { // getFrameNoPacketization gets a frame directly from the revid output chan // as we don't need to go through the generator with no packetization settings -func (r *revid) getFrameNoPacketization() []byte { +func (r *Revid) getFrameNoPacketization() []byte { return <-r.outputChan } // getFramePacketization gets a frame from the generators output chan - the // the generator being an mpegts or flv generator depending on the config -func (r *revid) getFramePacketization() []byte { +func (r *Revid) getFramePacketization() []byte { return <-(r.generator.GetOutputChan()) } -// flushDataPacketization removes data from the revid inst's coutput chan -func (r *revid) flushData() { +// flushDataPacketization removes data from the Revid inst's coutput chan +func (r *Revid) flushData() { switch r.config.Packetization { case Flv: for { @@ -342,7 +327,7 @@ done: // packClips takes data segments; whether that be tsPackets or mjpeg frames and // packs them into clips consisting of the amount frames specified in the config -func (r *revid) packClips() { +func (r *Revid) packClips() { clipSize := 0 packetCount := 0 for r.isRunning { @@ -384,7 +369,7 @@ func (r *revid) packClips() { // outputClips takes the clips produced in the packClips method and outputs them // to the desired output defined in the revid config -func (r *revid) outputClips() { +func (r *Revid) outputClips() { now := time.Now() prevTime := now bytes := 0 @@ -458,7 +443,7 @@ func (r *revid) outputClips() { } // senClipToFile writes the passed clip to a file -func (r *revid) sendClipToFile(clip *ring.Chunk) error { +func (r *Revid) sendClipToFile(clip *ring.Chunk) error { r.sendMutex.Lock() _, err := clip.WriteTo(r.outputFile) r.sendMutex.Unlock() @@ -466,7 +451,7 @@ func (r *revid) sendClipToFile(clip *ring.Chunk) error { } // sendClipToHTTP takes a clip and an output url and posts through http. -func (r *revid) sendClipToHTTP(clip *ring.Chunk) error { +func (r *Revid) sendClipToHTTP(clip *ring.Chunk) error { defer r.sendMutex.Unlock() r.sendMutex.Lock() @@ -479,7 +464,7 @@ func (r *revid) sendClipToHTTP(clip *ring.Chunk) error { // use a method value for dispatching the sendClip work. // So to save work in this case, sendClip should be made // a proper method with a behaviour switch based on a - // revid field so that we can prepare these bytes only + // Revid field so that we can prepare these bytes only // once for each clip (reusing a buffer field? or tt // might be work using a sync.Pool for the bodies). post := bytes.NewBuffer(make([]byte, 0, clip.Len())) @@ -507,7 +492,7 @@ func (r *revid) sendClipToHTTP(clip *ring.Chunk) error { // sendClipToFfmpegRtmp sends the clip over the current rtmp connection using // an ffmpeg process. -func (r *revid) sendClipToFfmpegRtmp(clip *ring.Chunk) error { +func (r *Revid) sendClipToFfmpegRtmp(clip *ring.Chunk) error { r.sendMutex.Lock() _, err := clip.WriteTo(r.ffmpegStdin) r.sendMutex.Unlock() @@ -516,7 +501,7 @@ func (r *revid) sendClipToFfmpegRtmp(clip *ring.Chunk) error { // sendClipToLibRtmp send the clip over the current rtmp connection using the // c based librtmp library -func (r *revid) sendClipToLibRtmp(clip *ring.Chunk) error { +func (r *Revid) sendClipToLibRtmp(clip *ring.Chunk) error { r.sendMutex.Lock() _, err := clip.WriteTo(r.rtmpInst) r.sendMutex.Unlock() @@ -524,7 +509,7 @@ func (r *revid) sendClipToLibRtmp(clip *ring.Chunk) error { } // setupOutputForFfmpegRtmp sets up output to rtmp using an ffmpeg process -func (r *revid) setupOutputForFfmpegRtmp() error { +func (r *Revid) setupOutputForFfmpegRtmp() error { r.ffmpegCmd = exec.Command(ffmpegPath, "-f", "h264", "-r", r.config.FrameRate, @@ -558,7 +543,7 @@ func (r *revid) setupOutputForFfmpegRtmp() error { // setupOutputForLibRtmp sets up rtmp output using the wrapper for the c based // librtmp library - makes connection and starts comms etc. -func (r *revid) setupOutputForLibRtmp() error { +func (r *Revid) setupOutputForLibRtmp() error { r.rtmpInst = rtmp.NewSession(r.config.RtmpUrl, rtmpConnectionTimout) err := r.rtmpInst.StartSession() for noOfTries := 0; err != nil && noOfTries < rtmpConnectionMaxTries; noOfTries++ { @@ -575,14 +560,14 @@ func (r *revid) setupOutputForLibRtmp() error { } // setupOutputForFile sets up an output file to output data to -func (r *revid) setupOutputForFile() (err error) { +func (r *Revid) setupOutputForFile() (err error) { r.outputFile, err = os.Create(r.config.OutputFileName) return } // setupInputForRaspivid sets up things for input from raspivid i.e. starts // a raspivid process and pipes it's data output. -func (r *revid) setupInputForRaspivid() error { +func (r *Revid) setupInputForRaspivid() error { r.Log(Info, "Starting raspivid!") switch r.config.InputCodec { case H264: @@ -642,7 +627,7 @@ func (r *revid) setupInputForRaspivid() error { } // setupInputForFile sets things up for getting input from a file -func (r *revid) setupInputForFile() error { +func (r *Revid) setupInputForFile() error { fps, _ := strconv.Atoi(r.config.FrameRate) r.parser.SetDelay(uint(float64(1000) / float64(fps))) r.readFile() @@ -652,7 +637,7 @@ func (r *revid) setupInputForFile() error { // testRtmp is useful to check robustness of connections. Intended to be run as // goroutine. After every 'delayTime' the rtmp connection is ended and then // restarted -func (r *revid) testRtmp(delayTime uint) { +func (r *Revid) testRtmp(delayTime uint) { for { time.Sleep(time.Duration(delayTime) * time.Millisecond) r.rtmpInst.Close() @@ -660,9 +645,9 @@ func (r *revid) testRtmp(delayTime uint) { } } -// readCamera reads data from the defined camera while the revid is running. +// readCamera reads data from the defined camera while the Revid is running. // TODO: use ringbuffer here instead of allocating mem every time! -func (r *revid) readCamera() { +func (r *Revid) readCamera() { r.Log(Info, "Reading camera data!") for r.isRunning { data := make([]byte, 1) @@ -679,8 +664,8 @@ func (r *revid) readCamera() { r.Log(Info, "Not trying to read from camera anymore!") } -// readFile reads data from the defined file while the revid is running. -func (r *revid) readFile() error { +// readFile reads data from the defined file while the Revid is running. +func (r *Revid) readFile() error { var err error r.inputFile, err = os.Open(r.config.InputFileName) if err != nil { From 1105ee3ea9f2a5188f72f5e175c302541f222457 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sat, 9 Jun 2018 14:08:48 +0930 Subject: [PATCH 2/2] revid: use loadSender for clip sending --- revid/revid.go | 224 ++++++++---------------------------- revid/senders.go | 294 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 341 insertions(+), 177 deletions(-) create mode 100644 revid/senders.go diff --git a/revid/revid.go b/revid/revid.go index 4d459eb4..fc34877b 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -31,17 +31,13 @@ package revid import ( "bufio" - "bytes" "errors" "fmt" "io" - "io/ioutil" - "net/http" "os" "os/exec" "path/filepath" "strconv" - "sync" "time" "bitbucket.org/ausocean/av/generator" @@ -92,22 +88,18 @@ type Revid struct { ringBuffer *ring.Buffer config Config isRunning bool - outputFile *os.File inputFile *os.File generator generator.Generator parser parser.Parser cmd *exec.Cmd - ffmpegCmd *exec.Cmd inputReader *bufio.Reader ffmpegStdin io.WriteCloser outputChan chan []byte setupInput func() error setupOutput func() error getFrame func() []byte - sendClip func(*ring.Chunk) error + destination loadSender rtmpInst rtmp.Session - mutex sync.Mutex - sendMutex sync.Mutex currentBitrate int64 } @@ -146,18 +138,33 @@ func (r *Revid) reset(config Config) error { } r.config = config + if r.destination != nil { + err = r.destination.close() + if err != nil { + r.Log(Error, err.Error()) + } + } switch r.config.Output { case File: - r.setupOutput = r.setupOutputForFile - r.sendClip = r.sendClipToFile + s, err := newFileSender(config.OutputFileName) + if err != nil { + return err + } + r.destination = s case FfmpegRtmp: - r.setupOutput = r.setupOutputForFfmpegRtmp - r.sendClip = r.sendClipToFfmpegRtmp + s, err := newFfmpegSender(config.RtmpUrl, r.config.FrameRate) + if err != nil { + return err + } + r.destination = s case NativeRtmp: - r.setupOutput = r.setupOutputForLibRtmp - r.sendClip = r.sendClipToLibRtmp + s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimout, rtmpConnectionMaxTries, r.Log) + if err != nil { + return err + } + r.destination = s case Http: - r.sendClip = r.sendClipToHTTP + r.destination = newHttpSender(config.RtmpUrl, httpTimeout, r.Log) } switch r.config.Input { @@ -235,21 +242,12 @@ func (r *Revid) IsRunning() bool { // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. func (r *Revid) Start() { - r.mutex.Lock() - defer r.mutex.Unlock() if r.isRunning { r.Log(Warning, "Revid.Start() called but revid already running!") return } r.Log(Info, "Starting Revid!") r.Log(Debug, "Setting up output!") - if r.setupOutput != nil { - err := r.setupOutput() - if err != nil { - r.Log(Error, err.Error()) - return - } - } r.isRunning = true r.Log(Info, "Starting output routine!") go r.outputClips() @@ -265,19 +263,12 @@ func (r *Revid) Start() { // Stop halts any processing of video data from a camera or file func (r *Revid) Stop() { - r.mutex.Lock() - defer r.mutex.Unlock() - if !r.isRunning { r.Log(Warning, "Revid.Stop() called but revid not running!") return } r.Log(Info, "Stopping revid!") - // wait for sending to finish - r.sendMutex.Lock() - defer r.sendMutex.Unlock() - r.rtmpInst.Close() r.isRunning = false r.Log(Info, "Stopping generator!") @@ -394,15 +385,20 @@ func (r *Revid) outputClips() { bytes += chunk.Len() r.Log(Detail, "About to send") - err = r.sendClip(chunk) + err = r.destination.load(chunk) + if err != nil { + r.Log(Error, "failed to load clip") + } + err = r.destination.send() if err == nil { - r.Log(Detail, "Sent clip") + r.Log(Detail, "sent clip") } if r.isRunning && err != nil && chunk.Len() > 11 { r.Log(Debug, "Send failed! Trying again") // Try and send again - err = r.sendClip(chunk) + err = r.destination.send() + r.Log(Error, err.Error()) // if there's still an error we try and reconnect, unless we're stopping for r.isRunning && err != nil { @@ -410,22 +406,26 @@ func (r *Revid) outputClips() { time.Sleep(time.Duration(sendFailedDelay) * time.Millisecond) r.Log(Error, err.Error()) - if r.config.Output == NativeRtmp { - r.Log(Debug, "Ending current rtmp session...") - r.rtmpInst.Close() - } - - if r.config.Output == NativeRtmp { - r.Log(Info, "Restarting rtmp session...") - r.rtmpInst.StartSession() + if rs, ok := r.destination.(restarter); ok { + r.Log(Debug, fmt.Sprintf("restarting %T session", rs)) + err = rs.restart() + if err != nil { + // TODO(kortschak): Make this "Fatal" when that exists. + r.Log(Error, "failed to restart rtmp session") + r.isRunning = false + return + } + r.Log(Info, "restarted rtmp session") } r.Log(Debug, "Trying to send again with new connection...") - r.sendClip(chunk) // TODO(kortschak): Log these errors? + err = r.destination.send() + r.Log(Error, err.Error()) } } - chunk.Close() // ring.Chunk is an io.Closer, but Close alwats returns nil. + r.destination.release() + r.Log(Detail, "Done reading that clip from ringbuffer...") // Log some information regarding bitrate and ring buffer size if it's time @@ -440,129 +440,10 @@ func (r *Revid) outputClips() { } } r.Log(Info, "Not outputting clips anymore!") -} - -// senClipToFile writes the passed clip to a file -func (r *Revid) sendClipToFile(clip *ring.Chunk) error { - r.sendMutex.Lock() - _, err := clip.WriteTo(r.outputFile) - r.sendMutex.Unlock() - return err -} - -// sendClipToHTTP takes a clip and an output url and posts through http. -func (r *Revid) sendClipToHTTP(clip *ring.Chunk) error { - defer r.sendMutex.Unlock() - r.sendMutex.Lock() - - client := http.Client{Timeout: httpTimeout} - url := r.config.HttpAddress + strconv.Itoa(clip.Len()) - - // FIXME(kortschak): This is necessary because Post takes - // an io.Reader as a parameter and closes it if it is an - // io.Closer (which *ring.Chunk is), ... and because we - // use a method value for dispatching the sendClip work. - // So to save work in this case, sendClip should be made - // a proper method with a behaviour switch based on a - // Revid field so that we can prepare these bytes only - // once for each clip (reusing a buffer field? or tt - // might be work using a sync.Pool for the bodies). - post := bytes.NewBuffer(make([]byte, 0, clip.Len())) - _, err := clip.WriteTo(post) + err := r.destination.close() if err != nil { - return fmt.Errorf("Error buffering: %v", err) + r.Log(Error, "failed to close destination") } - - r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, clip.Len())) - resp, err := client.Post(url, "video/mp2t", post) - if err != nil { - return fmt.Errorf("Error posting to %s: %s", url, err) - } - defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) - if err == nil { - r.Log(Debug, fmt.Sprintf("%s\n", body)) - } else { - r.Log(Error, err.Error()) - } - - return nil -} - -// sendClipToFfmpegRtmp sends the clip over the current rtmp connection using -// an ffmpeg process. -func (r *Revid) sendClipToFfmpegRtmp(clip *ring.Chunk) error { - r.sendMutex.Lock() - _, err := clip.WriteTo(r.ffmpegStdin) - r.sendMutex.Unlock() - return err -} - -// sendClipToLibRtmp send the clip over the current rtmp connection using the -// c based librtmp library -func (r *Revid) sendClipToLibRtmp(clip *ring.Chunk) error { - r.sendMutex.Lock() - _, err := clip.WriteTo(r.rtmpInst) - r.sendMutex.Unlock() - return err -} - -// setupOutputForFfmpegRtmp sets up output to rtmp using an ffmpeg process -func (r *Revid) setupOutputForFfmpegRtmp() error { - r.ffmpegCmd = exec.Command(ffmpegPath, - "-f", "h264", - "-r", r.config.FrameRate, - "-i", "-", - "-f", "lavfi", - "-i", "aevalsrc=0", - "-fflags", "nobuffer", - "-vcodec", "copy", - "-acodec", "aac", - "-map", "0:0", - "-map", "1:0", - "-strict", "experimental", - "-f", "flv", - r.config.RtmpUrl, - ) - var err error - r.ffmpegStdin, err = r.ffmpegCmd.StdinPipe() - if err != nil { - r.Log(Error, err.Error()) - r.Stop() - return err - } - err = r.ffmpegCmd.Start() - if err != nil { - r.Log(Error, err.Error()) - r.Stop() - return err - } - return nil -} - -// setupOutputForLibRtmp sets up rtmp output using the wrapper for the c based -// librtmp library - makes connection and starts comms etc. -func (r *Revid) setupOutputForLibRtmp() error { - r.rtmpInst = rtmp.NewSession(r.config.RtmpUrl, rtmpConnectionTimout) - err := r.rtmpInst.StartSession() - for noOfTries := 0; err != nil && noOfTries < rtmpConnectionMaxTries; noOfTries++ { - r.rtmpInst.Close() - r.Log(Error, err.Error()) - r.Log(Info, "Trying to establish rtmp connection again!") - r.rtmpInst = rtmp.NewSession(r.config.RtmpUrl, rtmpConnectionTimout) - err = r.rtmpInst.StartSession() - } - if err != nil { - return err - } - return err -} - -// setupOutputForFile sets up an output file to output data to -func (r *Revid) setupOutputForFile() (err error) { - r.outputFile, err = os.Create(r.config.OutputFileName) - return } // setupInputForRaspivid sets up things for input from raspivid i.e. starts @@ -634,17 +515,6 @@ func (r *Revid) setupInputForFile() error { return nil } -// testRtmp is useful to check robustness of connections. Intended to be run as -// goroutine. After every 'delayTime' the rtmp connection is ended and then -// restarted -func (r *Revid) testRtmp(delayTime uint) { - for { - time.Sleep(time.Duration(delayTime) * time.Millisecond) - r.rtmpInst.Close() - r.rtmpInst.StartSession() - } -} - // readCamera reads data from the defined camera while the Revid is running. // TODO: use ringbuffer here instead of allocating mem every time! func (r *Revid) readCamera() { diff --git a/revid/senders.go b/revid/senders.go new file mode 100644 index 00000000..66590956 --- /dev/null +++ b/revid/senders.go @@ -0,0 +1,294 @@ +/* +NAME + senders.go + +DESCRIPTION + See Readme.md + +AUTHORS + Saxon A. Nelson-Milton + Alan Noble + +LICENSE + revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package revid + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "net/http" + "os" + "os/exec" + "strconv" + "time" + + "bitbucket.org/ausocean/av/rtmp" + "bitbucket.org/ausocean/utils/ring" +) + +// loadSender is a destination to send a *ring.Chunk to. +// When a loadSender has finished using the *ring.Chunk +// it must be Closed. +type loadSender interface { + // load assigns the *ring.Chunk to the loadSender. + // The load call may fail, but must not mutate the + // the chunk. + load(*ring.Chunk) error + + // send performs a destination-specific send + // operation. It must not mutate the chunk. + send() error + + // release releases the *ring.Chunk. + release() + + // close cleans up after use of the loadSender. + close() error +} + +// restart is an optional interface for loadSenders that +// can restart their connection. +type restarter interface { + restart() error +} + +// fileSender implements loadSender for a local file destination. +type fileSender struct { + file *os.File + + chunk *ring.Chunk +} + +func newFileSender(path string) (*fileSender, error) { + f, err := os.Create(path) + if err != nil { + return nil, err + } + return &fileSender{file: f}, nil +} + +func (s *fileSender) load(c *ring.Chunk) error { + s.chunk = c + return nil +} + +func (s *fileSender) send() error { + _, err := s.chunk.WriteTo(s.file) + return err +} + +func (s *fileSender) release() { + s.chunk.Close() + s.chunk = nil +} + +func (s *fileSender) close() error { + return s.file.Close() +} + +// httpSender implements loadSender for an HTTP destination. +type httpSender struct { + client http.Client + url string + + log func(lvl, msg string) + + buf []byte + + chunk *ring.Chunk +} + +func newHttpSender(url string, timeout time.Duration, log func(lvl, msg string)) *httpSender { + return &httpSender{ + client: http.Client{Timeout: timeout}, + url: url, + log: log, + } +} + +func (s *httpSender) load(c *ring.Chunk) error { + buf := bytes.NewBuffer(s.buf[:0]) + _, err := s.chunk.WriteTo(buf) + s.buf = buf.Bytes() + if err != nil { + return fmt.Errorf("fileSender: %v", err) + } + return nil +} + +func (s *httpSender) send() error { + url := s.url + strconv.Itoa(len(s.buf)) + s.log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(s.buf))) + resp, err := s.client.Post(url, "video/mp2t", bytes.NewReader(s.buf)) + if err != nil { + return fmt.Errorf("Error posting to %s: %s", url, err) + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err == nil { + s.log(Debug, fmt.Sprintf("%s\n", body)) + } else { + s.log(Error, err.Error()) + } + return err +} + +func (s *httpSender) release() { + s.chunk.Close() + s.chunk = nil +} + +func (s *httpSender) close() error { return nil } + +// ffmpegSender implements loadSender for an FFMPEG RTMP destination. +type ffmpegSender struct { + ffmpeg io.WriteCloser + + chunk *ring.Chunk +} + +func newFfmpegSender(url, framerate string) (*ffmpegSender, error) { + cmd := exec.Command(ffmpegPath, + "-f", "h264", + "-r", framerate, + "-i", "-", + "-f", "lavfi", + "-i", "aevalsrc=0", + "-fflags", "nobuffer", + "-vcodec", "copy", + "-acodec", "aac", + "-map", "0:0", + "-map", "1:0", + "-strict", "experimental", + "-f", "flv", + url, + ) + w, err := cmd.StdinPipe() + if err != nil { + return nil, err + } + err = cmd.Start() + if err != nil { + return nil, err + } + return &ffmpegSender{ffmpeg: w}, nil +} + +func (s *ffmpegSender) load(c *ring.Chunk) error { + s.chunk = c + return nil +} + +func (s *ffmpegSender) send() error { + _, err := s.chunk.WriteTo(s.ffmpeg) + return err +} + +func (s *ffmpegSender) release() { + s.chunk.Close() + s.chunk = nil +} + +func (s *ffmpegSender) close() error { + return s.ffmpeg.Close() +} + +// rtmpSender implements loadSender for a native RTMP destination. +type rtmpSender struct { + sess rtmp.Session + + url string + timeout uint + retries int + log func(lvl, msg string) + + chunk *ring.Chunk +} + +var _ restarter = (*rtmpSender)(nil) + +func newRtmpSender(url string, timeout uint, retries int, log func(lvl, msg string)) (*rtmpSender, error) { + var sess rtmp.Session + var err error + for n := 0; n < retries; n++ { + sess = rtmp.NewSession(url, timeout) + err = sess.StartSession() + if err == nil { + break + } + log(Error, err.Error()) + sess.Close() + if n < retries-1 { + log(Info, "retry rtmp connection") + } + } + if err != nil { + return nil, err + } + + s := &rtmpSender{ + sess: sess, + url: url, + timeout: timeout, + retries: retries, + log: log, + } + return s, nil +} + +func (s *rtmpSender) load(c *ring.Chunk) error { + s.chunk = c + return nil +} + +func (s *rtmpSender) send() error { + _, err := s.chunk.WriteTo(s.sess) + return err +} + +func (s *rtmpSender) release() { + s.chunk.Close() + s.chunk = nil +} + +func (s *rtmpSender) restart() error { + err := s.sess.Close() + if err != nil { + return err + } + for n := 0; n < s.retries; n++ { + s.sess = rtmp.NewSession(s.url, s.timeout) + err = s.sess.StartSession() + if err == nil { + break + } + s.log(Error, err.Error()) + s.sess.Close() + if n < s.retries-1 { + s.log(Info, "retry rtmp connection") + } + } + return err +} + +func (s *rtmpSender) close() error { + return s.sess.Close() +}