From fc4e88bce6c7469f01753b3e552d9c8d2a3adf1e Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 7 Oct 2019 14:18:20 +1030 Subject: [PATCH] fixed bugs causing problems with GeoVision H264 RTMP to youtube streaming. Two fixes were involved, firstly, access unit delimeters were used to denote access units and the Write timeout on rtmpSender's ringBuffer was increased to stop the 'unable to dump old write' errors. Also made some small changes elsewhere that should fix future issues, particular with MTS output. --- codec/h264/extract.go | 40 +++++++++++++++++++++---------------- codec/h264/h264dec/frame.go | 8 ++++---- codec/h264/h264dec/read.go | 6 +++--- container/flv/encoder.go | 2 +- revid/config.go | 12 ++++++++--- revid/revid.go | 19 ++++++++++++++++-- revid/senders.go | 2 +- 7 files changed, 58 insertions(+), 31 deletions(-) diff --git a/codec/h264/extract.go b/codec/h264/extract.go index d4fdcb0f..f432de31 100644 --- a/codec/h264/extract.go +++ b/codec/h264/extract.go @@ -33,6 +33,7 @@ import ( "io" "time" + "bitbucket.org/ausocean/av/codec/h264/h264dec" "bitbucket.org/ausocean/av/protocol/rtp" ) @@ -68,10 +69,15 @@ const ( maxRTPSize = 1500 // Max ethernet transmission unit in bytes. ) +// Bytes for an access unit delimeter. +var aud = []byte{0x00, 0x00, 0x01, 0x09, 0xf0} + // Extracter is an extracter for extracting H264 access units from RTP stream. type Extracter struct { - buf *bytes.Buffer // Holds the current access unit. - frag bool // Indicates if we're currently dealing with a fragmentation packet. + buf *bytes.Buffer // Holds the current access unit. + frag bool // Indicates if we're currently dealing with a fragmentation packet. + dst io.Writer // The destination we'll be writing extracted NALUs to. + toWrite []byte // Holds the current NAL unit with start code to be written. } // NewExtracter returns a new Extracter. @@ -83,6 +89,9 @@ func NewExtracter() *Extracter { // Extract extracts H264 access units from an RTP stream. This function // expects that each read from src will provide a single RTP packet. func (e *Extracter) Extract(dst io.Writer, src io.Reader, delay time.Duration) error { + e.toWrite = []byte{0, 0, 0, 1} + e.buf.Write(aud) + e.dst = dst buf := make([]byte, maxRTPSize) for { n, err := src.Read(buf) @@ -133,16 +142,6 @@ func (e *Extracter) Extract(dst io.Writer, src io.Reader, delay time.Duration) e panic("unsupported type") } } - - markerIsSet, err := rtp.Marker(buf[:n]) - if err != nil { - return fmt.Errorf("could not get marker bit, failed with err: %v\n", err) - } - - if markerIsSet { - e.buf.WriteTo(dst) - e.buf.Reset() - } } return nil } @@ -202,12 +201,19 @@ func (e *Extracter) handleFUA(d []byte) { } } -// write writes a NAL unit to the Extracter's buf in byte stream format using the -// start code. +// writeWithPrefix writes a NAL unit to the Extracter's buf in byte stream format +// using the start code, and sends any ready prior access unit stored in the buf +// to the destination. func (e *Extracter) writeWithPrefix(d []byte) { - const prefix = "\x00\x00\x00\x01" - e.buf.Write([]byte(prefix)) - e.buf.Write(d) + e.toWrite = append(e.toWrite, d...) + curType, _ := NALType(e.toWrite) + if e.buf.Len() != 0 && (curType == h264dec.NALTypeIDR || curType == h264dec.NALTypeNonIDR) { + e.buf.WriteTo(e.dst) + e.buf.Reset() + e.buf.Write(aud) + } + e.buf.Write(e.toWrite) + e.toWrite = e.toWrite[:4] } // writeNoPrefix writes data to the Extracter's buf. This is used for non start diff --git a/codec/h264/h264dec/frame.go b/codec/h264/h264dec/frame.go index 1904a09d..0e510e8e 100644 --- a/codec/h264/h264dec/frame.go +++ b/codec/h264/h264dec/frame.go @@ -3,14 +3,14 @@ package h264dec // NALU types, as defined in table 7-1 in specifications. const ( naluTypeUnspecified = iota - naluTypeSliceNonIDRPicture + NALTypeNonIDR naluTypeSlicePartA naluTypeSlicePartB naluTypeSlicePartC - naluTypeSliceIDRPicture - naluTypeSEI + NALTypeIDR + NALTypeSEI NALTypeSPS - naluTypePPS + NALTypePPS NALTypeAccessUnitDelimiter naluTypeEndOfSequence naluTypeEndOfStream diff --git a/codec/h264/h264dec/read.go b/codec/h264/h264dec/read.go index 9b283c92..5adb5647 100644 --- a/codec/h264/h264dec/read.go +++ b/codec/h264/h264dec/read.go @@ -68,14 +68,14 @@ func (h *H264Reader) Start() { h.VideoStreams, &VideoStream{SPS: sps}, ) - case naluTypePPS: + case NALTypePPS: videoStream := h.VideoStreams[len(h.VideoStreams)-1] // TODO: handle this error // TODO: fix chromaFormat videoStream.PPS, _ = NewPPS(nil, 0) - case naluTypeSliceIDRPicture: + case NALTypeIDR: fallthrough - case naluTypeSliceNonIDRPicture: + case NALTypeNonIDR: videoStream := h.VideoStreams[len(h.VideoStreams)-1] logger.Printf("info: frame number %d\n", len(videoStream.Slices)) // TODO: handle this error diff --git a/container/flv/encoder.go b/container/flv/encoder.go index 306d4b66..6014a15f 100644 --- a/container/flv/encoder.go +++ b/container/flv/encoder.go @@ -115,7 +115,7 @@ func isKeyFrame(frame []byte) bool { return false } switch nalTyp := b & 0x1f; nalTyp { - case idrPic, suppEnhInf: + case idrPic: return true case nonIdrPic: return false diff --git a/revid/config.go b/revid/config.go index 21f509b4..46fbe587 100644 --- a/revid/config.go +++ b/revid/config.go @@ -100,6 +100,7 @@ const ( defaultInputCodec = codecutil.H264 defaultVerbosity = logger.Error defaultRtpAddr = "localhost:6970" + defaultRTSPURL = "rtsp://admin:admin@192.168.1.50:8554/CH001.sdp" defaultBurstPeriod = 10 // Seconds // Raspivid video defaults. @@ -124,8 +125,8 @@ const ( // Ringbuffer defaults. defaultMTSRBSize = 1000 defaultMTSRBElementSize = 100000 - defaultRTMPRBSize = 500 - defaultRTMPRBElementSize = 200000 + defaultRTMPRBSize = 1000 + defaultRTMPRBElementSize = 300000 ) // Config provides parameters relevant to a revid instance. A new config must @@ -277,7 +278,12 @@ func (c *Config) Validate() error { } switch c.Input { - case Raspivid, V4L, File, Audio, RTSP: + case Raspivid, V4L, File, Audio: + case RTSP: + if c.RTSPURL == "" { + c.Logger.Log(logger.Info, pkg+"no RTSPURL defined, defaulting", "RTSPURL", defaultRTSPURL) + c.RTSPURL = defaultRTSPURL + } case NothingDefined: c.Logger.Log(logger.Info, pkg+"no input type defined, defaulting", "input", defaultInput) c.Input = defaultInput diff --git a/revid/revid.go b/revid/revid.go index 1c5caac3..0f5b51da 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -181,7 +181,15 @@ func (r *Revid) reset(config Config) error { case File, V4L: st = mts.EncodeH264 case RTSP: - st = mts.EncodeH265 + switch r.config.InputCodec { + case codecutil.H265: + st = mts.EncodeH265 + case codecutil.H264: + st = mts.EncodeH264 + case codecutil.MJPEG: + st = mts.EncodeMJPEG + encOptions = append(encOptions, mts.PacketBasedPSI(int(r.config.MinFrames))) + } case Audio: st = mts.EncodeAudio } @@ -309,7 +317,14 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. r.setupInput = r.setupInputForFile case RTSP: r.setupInput = r.startRTSPCamera - r.lexTo = h265.NewLexer(false).Lex + switch r.config.InputCodec { + case codecutil.H264: + r.lexTo = h264.NewExtracter().Extract + case codecutil.H265: + r.lexTo = h265.NewLexer(false).Lex + case codecutil.MJPEG: + panic("not implemented") + } case Audio: r.setupInput = r.startAudioDevice r.lexTo = codecutil.NewByteLexer(&r.config.ChunkSize).Lex diff --git a/revid/senders.go b/revid/senders.go index a6a593bd..9680f986 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -294,7 +294,7 @@ func newRtmpSender(url string, timeout uint, retries, rbSize, rbElementSize int, timeout: timeout, retries: retries, log: log, - ring: ring.NewBuffer(rbSize, rbElementSize, 0), + ring: ring.NewBuffer(rbSize, rbElementSize, 5*time.Second), done: make(chan struct{}), } s.wg.Add(1)