diff --git a/codec/h264/extract.go b/codec/h264/extract.go index d4fdcb0f..f432de31 100644 --- a/codec/h264/extract.go +++ b/codec/h264/extract.go @@ -33,6 +33,7 @@ import ( "io" "time" + "bitbucket.org/ausocean/av/codec/h264/h264dec" "bitbucket.org/ausocean/av/protocol/rtp" ) @@ -68,10 +69,15 @@ const ( maxRTPSize = 1500 // Max ethernet transmission unit in bytes. ) +// Bytes for an access unit delimiter. +var aud = []byte{0x00, 0x00, 0x01, 0x09, 0xf0} + // Extracter is an extracter for extracting H264 access units from RTP stream. type Extracter struct { - buf *bytes.Buffer // Holds the current access unit. - frag bool // Indicates if we're currently dealing with a fragmentation packet. + buf *bytes.Buffer // Holds the current access unit. + frag bool // Indicates if we're currently dealing with a fragmentation packet. + dst io.Writer // The destination we'll be writing extracted NALUs to. + toWrite []byte // Holds the current NAL unit with start code to be written. } // NewExtracter returns a new Extracter. @@ -83,6 +89,9 @@ func NewExtracter() *Extracter { // Extract extracts H264 access units from an RTP stream. This function // expects that each read from src will provide a single RTP packet. func (e *Extracter) Extract(dst io.Writer, src io.Reader, delay time.Duration) error { + e.toWrite = []byte{0, 0, 0, 1} + e.buf.Write(aud) + e.dst = dst buf := make([]byte, maxRTPSize) for { n, err := src.Read(buf) @@ -133,16 +142,6 @@ func (e *Extracter) Extract(dst io.Writer, src io.Reader, delay time.Duration) e panic("unsupported type") } } - - markerIsSet, err := rtp.Marker(buf[:n]) - if err != nil { - return fmt.Errorf("could not get marker bit, failed with err: %v\n", err) - } - - if markerIsSet { - e.buf.WriteTo(dst) - e.buf.Reset() - } } return nil } @@ -202,12 +201,19 @@ func (e *Extracter) handleFUA(d []byte) { } } -// write writes a NAL unit to the Extracter's buf in byte stream format using the -// start code. 
+// writeWithPrefix writes a NAL unit to the Extracter's buf in byte stream format +// using the start code, and sends any ready prior access unit stored in the buf +// to the destination. func (e *Extracter) writeWithPrefix(d []byte) { - const prefix = "\x00\x00\x00\x01" - e.buf.Write([]byte(prefix)) - e.buf.Write(d) + e.toWrite = append(e.toWrite, d...) + curType, _ := NALType(e.toWrite) + if e.buf.Len() != 0 && (curType == h264dec.NALTypeIDR || curType == h264dec.NALTypeNonIDR) { + e.buf.WriteTo(e.dst) + e.buf.Reset() + e.buf.Write(aud) + } + e.buf.Write(e.toWrite) + e.toWrite = e.toWrite[:4] } // writeNoPrefix writes data to the Extracter's buf. This is used for non start diff --git a/codec/h264/h264dec/frame.go b/codec/h264/h264dec/frame.go index 1904a09d..0e510e8e 100644 --- a/codec/h264/h264dec/frame.go +++ b/codec/h264/h264dec/frame.go @@ -3,14 +3,14 @@ package h264dec // NALU types, as defined in table 7-1 in specifications. const ( naluTypeUnspecified = iota - naluTypeSliceNonIDRPicture + NALTypeNonIDR naluTypeSlicePartA naluTypeSlicePartB naluTypeSlicePartC - naluTypeSliceIDRPicture - naluTypeSEI + NALTypeIDR + NALTypeSEI NALTypeSPS - naluTypePPS + NALTypePPS NALTypeAccessUnitDelimiter naluTypeEndOfSequence naluTypeEndOfStream diff --git a/codec/h264/h264dec/read.go b/codec/h264/h264dec/read.go index 9b283c92..5adb5647 100644 --- a/codec/h264/h264dec/read.go +++ b/codec/h264/h264dec/read.go @@ -68,14 +68,14 @@ func (h *H264Reader) Start() { h.VideoStreams, &VideoStream{SPS: sps}, ) - case naluTypePPS: + case NALTypePPS: videoStream := h.VideoStreams[len(h.VideoStreams)-1] // TODO: handle this error // TODO: fix chromaFormat videoStream.PPS, _ = NewPPS(nil, 0) - case naluTypeSliceIDRPicture: + case NALTypeIDR: fallthrough - case naluTypeSliceNonIDRPicture: + case NALTypeNonIDR: videoStream := h.VideoStreams[len(h.VideoStreams)-1] logger.Printf("info: frame number %d\n", len(videoStream.Slices)) // TODO: handle this error diff --git 
a/container/flv/encoder.go b/container/flv/encoder.go index 306d4b66..6014a15f 100644 --- a/container/flv/encoder.go +++ b/container/flv/encoder.go @@ -115,7 +115,7 @@ func isKeyFrame(frame []byte) bool { return false } switch nalTyp := b & 0x1f; nalTyp { - case idrPic, suppEnhInf: + case idrPic: return true case nonIdrPic: return false diff --git a/revid/config.go b/revid/config.go index 21f509b4..46fbe587 100644 --- a/revid/config.go +++ b/revid/config.go @@ -100,6 +100,7 @@ const ( defaultInputCodec = codecutil.H264 defaultVerbosity = logger.Error defaultRtpAddr = "localhost:6970" + defaultRTSPURL = "rtsp://admin:admin@192.168.1.50:8554/CH001.sdp" defaultBurstPeriod = 10 // Seconds // Raspivid video defaults. @@ -124,8 +125,8 @@ const ( // Ringbuffer defaults. defaultMTSRBSize = 1000 defaultMTSRBElementSize = 100000 - defaultRTMPRBSize = 500 - defaultRTMPRBElementSize = 200000 + defaultRTMPRBSize = 1000 + defaultRTMPRBElementSize = 300000 ) // Config provides parameters relevant to a revid instance. 
A new config must @@ -277,7 +278,12 @@ func (c *Config) Validate() error { } switch c.Input { - case Raspivid, V4L, File, Audio, RTSP: + case Raspivid, V4L, File, Audio: + case RTSP: + if c.RTSPURL == "" { + c.Logger.Log(logger.Info, pkg+"no RTSPURL defined, defaulting", "RTSPURL", defaultRTSPURL) + c.RTSPURL = defaultRTSPURL + } case NothingDefined: c.Logger.Log(logger.Info, pkg+"no input type defined, defaulting", "input", defaultInput) c.Input = defaultInput diff --git a/revid/revid.go b/revid/revid.go index 1c5caac3..0f5b51da 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -181,7 +181,15 @@ func (r *Revid) reset(config Config) error { case File, V4L: st = mts.EncodeH264 case RTSP: - st = mts.EncodeH265 + switch r.config.InputCodec { + case codecutil.H265: + st = mts.EncodeH265 + case codecutil.H264: + st = mts.EncodeH264 + case codecutil.MJPEG: + st = mts.EncodeMJPEG + encOptions = append(encOptions, mts.PacketBasedPSI(int(r.config.MinFrames))) + } case Audio: st = mts.EncodeAudio } @@ -309,7 +317,14 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. r.setupInput = r.setupInputForFile case RTSP: r.setupInput = r.startRTSPCamera - r.lexTo = h265.NewLexer(false).Lex + switch r.config.InputCodec { + case codecutil.H264: + r.lexTo = h264.NewExtracter().Extract + case codecutil.H265: + r.lexTo = h265.NewLexer(false).Lex + case codecutil.MJPEG: + panic("not implemented") + } case Audio: r.setupInput = r.startAudioDevice r.lexTo = codecutil.NewByteLexer(&r.config.ChunkSize).Lex diff --git a/revid/senders.go b/revid/senders.go index a6a593bd..9680f986 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -294,7 +294,7 @@ func newRtmpSender(url string, timeout uint, retries, rbSize, rbElementSize int, timeout: timeout, retries: retries, log: log, - ring: ring.NewBuffer(rbSize, rbElementSize, 0), + ring: ring.NewBuffer(rbSize, rbElementSize, 5*time.Second), done: make(chan struct{}), } s.wg.Add(1)