Use mutex instead of isSending to be safe.

This commit is contained in:
Alan Noble 2018-05-07 22:39:58 +09:30
parent defe5c54a8
commit cc45b02609
1 changed files with 12 additions and 18 deletions

View File

@ -100,7 +100,6 @@ type revid struct {
ringBuffer ringbuffer.RingBuffer ringBuffer ringbuffer.RingBuffer
config Config config Config
isRunning bool isRunning bool
isSending bool
outputFile *os.File outputFile *os.File
inputFile *os.File inputFile *os.File
generator generator.Generator generator generator.Generator
@ -132,7 +131,6 @@ func NewRevid(config Config) (r *revid, err error) {
return return
} }
r.outputChan = make(chan []byte, outputChanSize) r.outputChan = make(chan []byte, outputChanSize)
r.isSending = false
r.isRunning = false r.isRunning = false
return return
} }
@ -267,23 +265,19 @@ func (r *revid) Start() {
r.generator.Start() r.generator.Start()
r.Log(Info, "Starting parser!") r.Log(Info, "Starting parser!")
r.parser.Start() r.parser.Start()
r.Log(Info, "Setting up input and recieving content!") r.Log(Info, "Setting up input and receiving content!")
go r.setupInput() go r.setupInput()
} }
// Stop halts any processing of video data from a camera or file // Stop halts any processing of video data from a camera or file
func (r *revid) Stop() { func (r *revid) Stop() {
r.mutex.Lock()
defer r.mutex.Unlock()
if !r.isRunning { if !r.isRunning {
r.Log(Warning, "revid.Stop() called but revid not running!") r.Log(Warning, "revid.Stop() called but revid not running!")
return return
} }
// Wait until we're not sending anything before we stop things
for r.isSending {
time.Sleep(sendingWaitTime)
}
r.mutex.Lock()
defer r.mutex.Unlock()
r.Log(Info, "Stopping revid!") r.Log(Info, "Stopping revid!")
r.rtmpInst.EndSession() r.rtmpInst.EndSession()
@ -469,18 +463,19 @@ func (r *revid) outputClips() {
// senClipToFile writes the passed clip to a file // senClipToFile writes the passed clip to a file
func (r *revid) sendClipToFile(clip []byte) error { func (r *revid) sendClipToFile(clip []byte) error {
r.isSending = true r.mutex.Lock()
defer r.mutex.Unlock()
_, err := r.outputFile.Write(clip) _, err := r.outputFile.Write(clip)
if err != nil { if err != nil {
return err return err
} }
r.isSending = false
return nil return nil
} }
// sendClipToHTTP takes a clip and an output url and posts through http. // sendClipToHTTP takes a clip and an output url and posts through http.
func (r *revid) sendClipToHTTP(clip []byte) error { func (r *revid) sendClipToHTTP(clip []byte) error {
r.isSending = true r.mutex.Lock()
defer r.mutex.Unlock()
timeout := time.Duration(httpTimeOut * time.Second) timeout := time.Duration(httpTimeOut * time.Second)
client := http.Client{Timeout: timeout} client := http.Client{Timeout: timeout}
url := r.config.HttpAddress + strconv.Itoa(len(clip)) url := r.config.HttpAddress + strconv.Itoa(len(clip))
@ -496,25 +491,24 @@ func (r *revid) sendClipToHTTP(clip []byte) error {
} else { } else {
r.Log(Error, err.Error()) r.Log(Error, err.Error())
} }
r.isSending = false
return nil return nil
} }
// sendClipToFfmpegRtmp sends the clip over the current rtmp connection using // sendClipToFfmpegRtmp sends the clip over the current rtmp connection using
// an ffmpeg process. // an ffmpeg process.
func (r *revid) sendClipToFfmpegRtmp(clip []byte) (err error) { func (r *revid) sendClipToFfmpegRtmp(clip []byte) (err error) {
r.isSending = true r.mutex.Lock()
defer r.mutex.Unlock()
_, err = r.ffmpegStdin.Write(clip) _, err = r.ffmpegStdin.Write(clip)
r.isSending = false
return return
} }
// sendClipToLibRtmp send the clip over the current rtmp connection using the // sendClipToLibRtmp send the clip over the current rtmp connection using the
// c based librtmp library // c based librtmp library
func (r *revid) sendClipToLibRtmp(clip []byte) (err error) { func (r *revid) sendClipToLibRtmp(clip []byte) (err error) {
r.isSending = true r.mutex.Lock()
defer r.mutex.Unlock()
err = r.rtmpInst.WriteFrame(clip, uint(len(clip))) err = r.rtmpInst.WriteFrame(clip, uint(len(clip)))
r.isSending = false
return return
} }