From 222864108ff4a28ce140fe366b12c7ac44536233 Mon Sep 17 00:00:00 2001 From: Ella Pietraroia Date: Tue, 17 Dec 2019 10:21:56 +1030 Subject: [PATCH 1/3] making a time based psi method for when to send packets Added a case that allows packets to be sent by unit of time (in seconds) rather than by number of packets or nal methods. Also made a variable that can be changed in vidgrinder to choose the amount of time, called PsiTime --- container/mts/encoder.go | 59 ++++++++++++++++++++++++++++++++-------- revid/config/config.go | 24 ++++++++++------ revid/revid.go | 17 ++++++++++-- 3 files changed, 78 insertions(+), 22 deletions(-) diff --git a/container/mts/encoder.go b/container/mts/encoder.go index 251607e6..42ac5d17 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -155,9 +155,12 @@ type Encoder struct { continuity map[uint16]byte - nalBasedPSI bool + psiMethod int pktCount int psiSendCount int + psiTime time.Duration + temp time.Duration + startTime time.Time mediaPid uint16 streamID byte } @@ -167,21 +170,24 @@ type Encoder struct { func NewEncoder(dst io.WriteCloser, rate float64, mediaType int, options ...func(*Encoder) error) (*Encoder, error) { var mPID uint16 var sID byte - nbp := true + psim := timeBased switch mediaType { case EncodeAudio: mPID = AudioPid sID = audioStreamID - nbp = false + psim = pktBased case EncodeH265: mPID = VideoPid sID = H265ID + psim = nalBased case EncodeH264: mPID = VideoPid sID = H264ID + psim = nalBased case EncodeMJPEG: mPID = VideoPid sID = MJPEGID + psim = timeBased } pmt := BasePMT @@ -202,7 +208,7 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int, options ...func writePeriod: time.Duration(float64(time.Second) / rate), ptsOffset: ptsOffset, - nalBasedPSI: nbp, + psiMethod: psim, pktCount: 8, @@ -225,22 +231,47 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int, options ...func return e, nil } +const ( + pktBased = iota + timeBased + nalBased +) + // PacketBasedPSI is an option that can be passed to NewEncoder to select // packet based PSI writing, i.e. PSI are written to the destination every // sendCount packets. func PacketBasedPSI(sendCount int) func(*Encoder) error { return func(e *Encoder) error { - e.nalBasedPSI = false + e.psiMethod = pktBased e.psiSendCount = sendCount e.pktCount = e.psiSendCount return nil } } +func TimeBasedPSI(timeBetweenPSI time.Duration) func(*Encoder) error { + return func(e *Encoder) error { + e.psiMethod = timeBased + e.psiTime = 0 + e.temp = timeBetweenPSI + e.startTime = time.Now() + return nil + } +} + // Write implements io.Writer. Write takes raw video or audio data and encodes into MPEG-TS, // then sending it to the encoder's io.Writer destination. func (e *Encoder) Write(data []byte) (int, error) { - if e.nalBasedPSI { + switch e.psiMethod { + case pktBased: + if e.pktCount >= e.psiSendCount { + e.pktCount = 0 + err := e.writePSI() + if err != nil { + return 0, err + } + } + case nalBased: nalType, err := h264.NALType(data) if err != nil { return 0, fmt.Errorf("could not get type from NAL unit, failed with error: %w", err) @@ -252,12 +283,17 @@ func (e *Encoder) Write(data []byte) (int, error) { return 0, err } } - } else if e.pktCount >= e.psiSendCount { - e.pktCount = 0 - err := e.writePSI() - if err != nil { - return 0, err + case timeBased: + if time.Now().Sub(e.startTime) >= e.psiTime { + e.psiTime = e.temp + e.startTime = time.Now() + err := e.writePSI() + if err != nil { + return 0, err + } + } + default: } // Prepare PES data. @@ -319,6 +355,7 @@ func (e *Encoder) writePSI() error { } e.pktCount++ pmtTable, err = updateMeta(pmtTable) + if err != nil { return err } diff --git a/revid/config/config.go b/revid/config/config.go index 1e21b7c0..a9152e35 100644 --- a/revid/config/config.go +++ b/revid/config/config.go @@ -82,6 +82,7 @@ const ( defaultWriteRate = 25 defaultClipDuration = 0 defaultAudioInputCodec = codecutil.ADPCM + defaultPsiTime = 2 // MTS ring buffer defaults. defaultMTSRBSize = 100 @@ -238,14 +239,16 @@ type Config struct { Channels int // Number of audio channels, 1 for mono, 2 for stereo. BitDepth int // Sample bit depth. - RTPAddress string // RTPAddress defines the RTP output destination. - BurstPeriod uint // BurstPeriod defines the revid burst period in seconds. - Rotation uint // Rotation defines the video rotation angle in degrees Raspivid input. - Height uint // Height defines the input video height Raspivid input. - Width uint // Width defines the input video width Raspivid input. - Bitrate uint // Bitrate specifies the bitrate for constant bitrate in kbps. - HorizontalFlip bool // HorizontalFlip flips video horizontally for Raspivid input. - VerticalFlip bool // VerticalFlip flips video vertically for Raspivid input. + RTPAddress string // RTPAddress defines the RTP output destination. + BurstPeriod uint // BurstPeriod defines the revid burst period in seconds. + Rotation uint // Rotation defines the video rotation angle in degrees Raspivid input. + Height uint // Height defines the input video height Raspivid input. + Width uint // Width defines the input video width Raspivid input. + Bitrate uint // Bitrate specifies the bitrate for constant bitrate in kbps. + + HorizontalFlip bool // HorizontalFlip flips video horizontally for Raspivid input. + VerticalFlip bool // VerticalFlip flips video vertically for Raspivid input. + PsiTime int // Sets the time between a packet being sent // RTMP ring buffer parameters. RTMPRBSize int // The number of elements in the RTMP sender ringbuffer. @@ -385,6 +388,11 @@ func (c *Config) Validate() error { c.MTSRBWriteTimeout = defaultMTSRBWriteTimeout } + if c.PsiTime <= 0 { + c.Logger.Log(logger.Info, pkg+"PsiTime bad or unset, defaulting", "PsiTime", defaultPsiTime) + c.PsiTime = defaultPsiTime + } + return nil } diff --git a/revid/revid.go b/revid/revid.go index bd657ef1..be9d2f1d 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -75,6 +75,10 @@ const ( rtmpConnectionTimeout = 10 ) +const ( + defualtPsiTime = 4 +) + const pkg = "revid: " type Logger interface { @@ -186,7 +190,7 @@ func (r *Revid) reset(c config.Config) error { st = mts.EncodeH264 case codecutil.MJPEG: st = mts.EncodeMJPEG - encOptions = append(encOptions, mts.PacketBasedPSI(int(r.cfg.MinFrames))) + encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PsiTime)*time.Second)) default: panic("unknown input codec for raspivid input") } @@ -196,7 +200,7 @@ func (r *Revid) reset(c config.Config) error { st = mts.EncodeH264 case codecutil.MJPEG: st = mts.EncodeMJPEG - encOptions = append(encOptions, mts.PacketBasedPSI(int(r.cfg.MinFrames))) + encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PsiTime)*time.Second)) default: panic("unknown input codec for v4l or input file input") } @@ -208,7 +212,7 @@ func (r *Revid) reset(c config.Config) error { st = mts.EncodeH264 case codecutil.MJPEG: st = mts.EncodeMJPEG - encOptions = append(encOptions, mts.PacketBasedPSI(int(r.cfg.MinFrames))) + encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PsiTime)*time.Second)) default: panic("unknown input codec for RTSP input") } @@ -611,6 +615,13 @@ func (r *Revid) Update(vars map[string]string) error { default: r.cfg.Logger.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) } + case "PsiTime": + v, err := strconv.Atoi(value) + if err != nil || v < 0 { + r.cfg.Logger.Log(logger.Warning, pkg+"invalid PsiTime var", "value", value) + break + } + r.cfg.PsiTime = v case "BurstPeriod": v, err := strconv.Atoi(value) if err != nil { From 0ec0a08e0e10d619da7579259e73d321aa9f9c63 Mon Sep 17 00:00:00 2001 From: Ella Pietraroia Date: Thu, 19 Dec 2019 11:15:47 +1030 Subject: [PATCH 2/3] comments and some other small changes made to revid.go encoder.go and config.go --- container/mts/encoder.go | 26 ++++++++++++++++---------- revid/config/config.go | 10 +++++----- revid/revid.go | 14 +++++++------- 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/container/mts/encoder.go b/container/mts/encoder.go index 42ac5d17..20ccd3e1 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -159,7 +159,7 @@ type Encoder struct { pktCount int psiSendCount int psiTime time.Duration - temp time.Duration + psiSetTime time.Duration startTime time.Time mediaPid uint16 streamID byte @@ -170,24 +170,24 @@ type Encoder struct { func NewEncoder(dst io.WriteCloser, rate float64, mediaType int, options ...func(*Encoder) error) (*Encoder, error) { var mPID uint16 var sID byte - psim := timeBased + psiM := timeBased switch mediaType { case EncodeAudio: mPID = AudioPid sID = audioStreamID - psim = pktBased + psiM = pktBased case EncodeH265: mPID = VideoPid sID = H265ID - psim = nalBased + psiM = nalBased case EncodeH264: mPID = VideoPid sID = H264ID - psim = nalBased + psiM = nalBased case EncodeMJPEG: mPID = VideoPid sID = MJPEGID - psim = timeBased + psiM = timeBased } pmt := BasePMT @@ -208,7 +208,7 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int, options ...func writePeriod: time.Duration(float64(time.Second) / rate), ptsOffset: ptsOffset, - psiMethod: psim, + psiMethod: psiM, pktCount: 8, @@ -231,6 +231,8 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int, options ...func return e, nil } +// These three constants are used to select between the three different +// methods of when the PSI is sent. const ( pktBased = iota timeBased @@ -249,11 +251,14 @@ func PacketBasedPSI(sendCount int) func(*Encoder) error { } } -func TimeBasedPSI(timeBetweenPSI time.Duration) func(*Encoder) error { +// TimeBasedPSI is another option that can be passed to NewEncoder to select +// time based PSI writing, i.e. PSI are written to the destination every dur (duration) +// (defualt is 2 seconds). +func TimeBasedPSI(dur time.Duration) func(*Encoder) error { return func(e *Encoder) error { e.psiMethod = timeBased e.psiTime = 0 - e.temp = timeBetweenPSI + e.psiSetTime = dur e.startTime = time.Now() return nil } @@ -285,7 +290,7 @@ func (e *Encoder) Write(data []byte) (int, error) { } case timeBased: if time.Now().Sub(e.startTime) >= e.psiTime { - e.psiTime = e.temp + e.psiTime = e.psiSetTime e.startTime = time.Now() err := e.writePSI() if err != nil { @@ -294,6 +299,7 @@ func (e *Encoder) Write(data []byte) (int, error) { } default: + panic("No PSI method found") } // Prepare PES data. diff --git a/revid/config/config.go b/revid/config/config.go index a9152e35..f46bdfeb 100644 --- a/revid/config/config.go +++ b/revid/config/config.go @@ -82,7 +82,7 @@ const ( defaultWriteRate = 25 defaultClipDuration = 0 defaultAudioInputCodec = codecutil.ADPCM - defaultPsiTime = 2 + defaultPSITime = 2 // MTS ring buffer defaults. defaultMTSRBSize = 100 @@ -248,7 +248,7 @@ type Config struct { HorizontalFlip bool // HorizontalFlip flips video horizontally for Raspivid input. VerticalFlip bool // VerticalFlip flips video vertically for Raspivid input. - PsiTime int // Sets the time between a packet being sent + PSITime int // Sets the time between a packet being sent // RTMP ring buffer parameters. RTMPRBSize int // The number of elements in the RTMP sender ringbuffer. @@ -388,9 +388,9 @@ func (c *Config) Validate() error { c.MTSRBWriteTimeout = defaultMTSRBWriteTimeout } - if c.PsiTime <= 0 { - c.Logger.Log(logger.Info, pkg+"PsiTime bad or unset, defaulting", "PsiTime", defaultPsiTime) - c.PsiTime = defaultPsiTime + if c.PSITime <= 0 { + c.Logger.Log(logger.Info, pkg+"PSITime bad or unset, defaulting", "PSITime", defaultPSITime) + c.PSITime = defaultPSITime } return nil diff --git a/revid/revid.go b/revid/revid.go index be9d2f1d..74e4ccfd 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -76,7 +76,7 @@ const ( ) const ( - defualtPsiTime = 4 + defualtPSITime = 2 ) const pkg = "revid: " @@ -190,7 +190,7 @@ func (r *Revid) reset(c config.Config) error { st = mts.EncodeH264 case codecutil.MJPEG: st = mts.EncodeMJPEG - encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PsiTime)*time.Second)) + encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PSITime)*time.Second)) default: panic("unknown input codec for raspivid input") } @@ -200,7 +200,7 @@ func (r *Revid) reset(c config.Config) error { st = mts.EncodeH264 case codecutil.MJPEG: st = mts.EncodeMJPEG - encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PsiTime)*time.Second)) + encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PSITime)*time.Second)) default: panic("unknown input codec for v4l or input file input") } @@ -212,7 +212,7 @@ func (r *Revid) reset(c config.Config) error { st = mts.EncodeH264 case codecutil.MJPEG: st = mts.EncodeMJPEG - encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PsiTime)*time.Second)) + encOptions = append(encOptions, mts.TimeBasedPSI(time.Duration(r.cfg.PSITime)*time.Second)) default: panic("unknown input codec for RTSP input") } @@ -615,13 +615,13 @@ func (r *Revid) Update(vars map[string]string) error { default: r.cfg.Logger.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) } - case "PsiTime": + case "PSITime": v, err := strconv.Atoi(value) if err != nil || v < 0 { - r.cfg.Logger.Log(logger.Warning, pkg+"invalid PsiTime var", "value", value) + r.cfg.Logger.Log(logger.Warning, pkg+"invalid PSITime var", "value", value) break } - r.cfg.PsiTime = v + r.cfg.PSITime = v case "BurstPeriod": v, err := strconv.Atoi(value) if err != nil { From d0adae710ee67e60fd3dbf502f4877ab73045c12 Mon Sep 17 00:00:00 2001 From: Scott Date: Thu, 19 Dec 2019 14:21:32 +1030 Subject: [PATCH 3/3] PR fixes --- container/mts/encoder.go | 2 +- revid/revid.go | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/container/mts/encoder.go b/container/mts/encoder.go index 20ccd3e1..3447eebf 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -299,7 +299,7 @@ func (e *Encoder) Write(data []byte) (int, error) { } default: - panic("No PSI method found") + panic("Undefined PSI method") } // Prepare PES data. diff --git a/revid/revid.go b/revid/revid.go index 74e4ccfd..b2b399dc 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -75,10 +75,6 @@ const ( rtmpConnectionTimeout = 10 ) -const ( - defualtPSITime = 2 -) - const pkg = "revid: " type Logger interface {