Use a separate sendMutex not the main mutex!

This commit is contained in:
Alan Noble 2018-05-07 23:06:06 +09:30
parent cc45b02609
commit 953cecf9da
1 changed files with 13 additions and 8 deletions

View File

@ -115,6 +115,7 @@ type revid struct {
sendClip func(clip []byte) error sendClip func(clip []byte) error
rtmpInst rtmp.RTMPSession rtmpInst rtmp.RTMPSession
mutex sync.Mutex mutex sync.Mutex
sendMutex sync.Mutex
currentBitrate int64 currentBitrate int64
} }
@ -124,6 +125,7 @@ type revid struct {
func NewRevid(config Config) (r *revid, err error) { func NewRevid(config Config) (r *revid, err error) {
r = new(revid) r = new(revid)
r.mutex = sync.Mutex{} r.mutex = sync.Mutex{}
r.sendMutex = sync.Mutex{}
r.ringBuffer = ringbuffer.NewRingBuffer(ringBufferSize, ringBufferElementSize) r.ringBuffer = ringbuffer.NewRingBuffer(ringBufferSize, ringBufferElementSize)
err = r.changeState(config) err = r.changeState(config)
if err != nil { if err != nil {
@ -280,6 +282,9 @@ func (r *revid) Stop() {
} }
r.Log(Info, "Stopping revid!") r.Log(Info, "Stopping revid!")
// wait for sending to finish
r.sendMutex.Lock()
defer r.sendMutex.Unlock()
r.rtmpInst.EndSession() r.rtmpInst.EndSession()
r.isRunning = false r.isRunning = false
@ -463,8 +468,8 @@ func (r *revid) outputClips() {
// senClipToFile writes the passed clip to a file // senClipToFile writes the passed clip to a file
func (r *revid) sendClipToFile(clip []byte) error { func (r *revid) sendClipToFile(clip []byte) error {
r.mutex.Lock() r.sendMutex.Lock()
defer r.mutex.Unlock() defer r.sendMutex.Unlock()
_, err := r.outputFile.Write(clip) _, err := r.outputFile.Write(clip)
if err != nil { if err != nil {
return err return err
@ -474,8 +479,8 @@ func (r *revid) sendClipToFile(clip []byte) error {
// sendClipToHTTP takes a clip and an output url and posts through http. // sendClipToHTTP takes a clip and an output url and posts through http.
func (r *revid) sendClipToHTTP(clip []byte) error { func (r *revid) sendClipToHTTP(clip []byte) error {
r.mutex.Lock() r.sendMutex.Lock()
defer r.mutex.Unlock() defer r.sendMutex.Unlock()
timeout := time.Duration(httpTimeOut * time.Second) timeout := time.Duration(httpTimeOut * time.Second)
client := http.Client{Timeout: timeout} client := http.Client{Timeout: timeout}
url := r.config.HttpAddress + strconv.Itoa(len(clip)) url := r.config.HttpAddress + strconv.Itoa(len(clip))
@ -497,8 +502,8 @@ func (r *revid) sendClipToHTTP(clip []byte) error {
// sendClipToFfmpegRtmp sends the clip over the current rtmp connection using // sendClipToFfmpegRtmp sends the clip over the current rtmp connection using
// an ffmpeg process. // an ffmpeg process.
func (r *revid) sendClipToFfmpegRtmp(clip []byte) (err error) { func (r *revid) sendClipToFfmpegRtmp(clip []byte) (err error) {
r.mutex.Lock() r.sendMutex.Lock()
defer r.mutex.Unlock() defer r.sendMutex.Unlock()
_, err = r.ffmpegStdin.Write(clip) _, err = r.ffmpegStdin.Write(clip)
return return
} }
@ -506,8 +511,8 @@ func (r *revid) sendClipToFfmpegRtmp(clip []byte) (err error) {
// sendClipToLibRtmp send the clip over the current rtmp connection using the // sendClipToLibRtmp send the clip over the current rtmp connection using the
// c based librtmp library // c based librtmp library
func (r *revid) sendClipToLibRtmp(clip []byte) (err error) { func (r *revid) sendClipToLibRtmp(clip []byte) (err error) {
r.mutex.Lock() r.sendMutex.Lock()
defer r.mutex.Unlock() defer r.sendMutex.Unlock()
err = r.rtmpInst.WriteFrame(clip, uint(len(clip))) err = r.rtmpInst.WriteFrame(clip, uint(len(clip)))
return return
} }