fixed bugs causing problems with GeoVision H264 RTMP to youtube streaming.

Two fixes were involved, firstly, access unit delimeters were used to denote access units and the Write timeout on rtmpSender's ringBuffer was
increased to stop the 'unable to dump old write' errors. Also made some small changes elsewhere that should fix future issues, particular with
MTS output.
This commit is contained in:
Saxon 2019-10-07 14:18:20 +10:30
parent 8e3f173162
commit fc4e88bce6
7 changed files with 58 additions and 31 deletions

View File

@ -33,6 +33,7 @@ import (
"io" "io"
"time" "time"
"bitbucket.org/ausocean/av/codec/h264/h264dec"
"bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/av/protocol/rtp"
) )
@ -68,10 +69,15 @@ const (
maxRTPSize = 1500 // Max ethernet transmission unit in bytes. maxRTPSize = 1500 // Max ethernet transmission unit in bytes.
) )
// Bytes for an access unit delimeter.
var aud = []byte{0x00, 0x00, 0x01, 0x09, 0xf0}
// Extracter is an extracter for extracting H264 access units from RTP stream. // Extracter is an extracter for extracting H264 access units from RTP stream.
type Extracter struct { type Extracter struct {
buf *bytes.Buffer // Holds the current access unit. buf *bytes.Buffer // Holds the current access unit.
frag bool // Indicates if we're currently dealing with a fragmentation packet. frag bool // Indicates if we're currently dealing with a fragmentation packet.
dst io.Writer // The destination we'll be writing extracted NALUs to.
toWrite []byte // Holds the current NAL unit with start code to be written.
} }
// NewExtracter returns a new Extracter. // NewExtracter returns a new Extracter.
@ -83,6 +89,9 @@ func NewExtracter() *Extracter {
// Extract extracts H264 access units from an RTP stream. This function // Extract extracts H264 access units from an RTP stream. This function
// expects that each read from src will provide a single RTP packet. // expects that each read from src will provide a single RTP packet.
func (e *Extracter) Extract(dst io.Writer, src io.Reader, delay time.Duration) error { func (e *Extracter) Extract(dst io.Writer, src io.Reader, delay time.Duration) error {
e.toWrite = []byte{0, 0, 0, 1}
e.buf.Write(aud)
e.dst = dst
buf := make([]byte, maxRTPSize) buf := make([]byte, maxRTPSize)
for { for {
n, err := src.Read(buf) n, err := src.Read(buf)
@ -133,16 +142,6 @@ func (e *Extracter) Extract(dst io.Writer, src io.Reader, delay time.Duration) e
panic("unsupported type") panic("unsupported type")
} }
} }
markerIsSet, err := rtp.Marker(buf[:n])
if err != nil {
return fmt.Errorf("could not get marker bit, failed with err: %v\n", err)
}
if markerIsSet {
e.buf.WriteTo(dst)
e.buf.Reset()
}
} }
return nil return nil
} }
@ -202,12 +201,19 @@ func (e *Extracter) handleFUA(d []byte) {
} }
} }
// write writes a NAL unit to the Extracter's buf in byte stream format using the // writeWithPrefix writes a NAL unit to the Extracter's buf in byte stream format
// start code. // using the start code, and sends any ready prior access unit stored in the buf
// to the destination.
func (e *Extracter) writeWithPrefix(d []byte) { func (e *Extracter) writeWithPrefix(d []byte) {
const prefix = "\x00\x00\x00\x01" e.toWrite = append(e.toWrite, d...)
e.buf.Write([]byte(prefix)) curType, _ := NALType(e.toWrite)
e.buf.Write(d) if e.buf.Len() != 0 && (curType == h264dec.NALTypeIDR || curType == h264dec.NALTypeNonIDR) {
e.buf.WriteTo(e.dst)
e.buf.Reset()
e.buf.Write(aud)
}
e.buf.Write(e.toWrite)
e.toWrite = e.toWrite[:4]
} }
// writeNoPrefix writes data to the Extracter's buf. This is used for non start // writeNoPrefix writes data to the Extracter's buf. This is used for non start

View File

@ -3,14 +3,14 @@ package h264dec
// NALU types, as defined in table 7-1 in specifications. // NALU types, as defined in table 7-1 in specifications.
const ( const (
naluTypeUnspecified = iota naluTypeUnspecified = iota
naluTypeSliceNonIDRPicture NALTypeNonIDR
naluTypeSlicePartA naluTypeSlicePartA
naluTypeSlicePartB naluTypeSlicePartB
naluTypeSlicePartC naluTypeSlicePartC
naluTypeSliceIDRPicture NALTypeIDR
naluTypeSEI NALTypeSEI
NALTypeSPS NALTypeSPS
naluTypePPS NALTypePPS
NALTypeAccessUnitDelimiter NALTypeAccessUnitDelimiter
naluTypeEndOfSequence naluTypeEndOfSequence
naluTypeEndOfStream naluTypeEndOfStream

View File

@ -68,14 +68,14 @@ func (h *H264Reader) Start() {
h.VideoStreams, h.VideoStreams,
&VideoStream{SPS: sps}, &VideoStream{SPS: sps},
) )
case naluTypePPS: case NALTypePPS:
videoStream := h.VideoStreams[len(h.VideoStreams)-1] videoStream := h.VideoStreams[len(h.VideoStreams)-1]
// TODO: handle this error // TODO: handle this error
// TODO: fix chromaFormat // TODO: fix chromaFormat
videoStream.PPS, _ = NewPPS(nil, 0) videoStream.PPS, _ = NewPPS(nil, 0)
case naluTypeSliceIDRPicture: case NALTypeIDR:
fallthrough fallthrough
case naluTypeSliceNonIDRPicture: case NALTypeNonIDR:
videoStream := h.VideoStreams[len(h.VideoStreams)-1] videoStream := h.VideoStreams[len(h.VideoStreams)-1]
logger.Printf("info: frame number %d\n", len(videoStream.Slices)) logger.Printf("info: frame number %d\n", len(videoStream.Slices))
// TODO: handle this error // TODO: handle this error

View File

@ -115,7 +115,7 @@ func isKeyFrame(frame []byte) bool {
return false return false
} }
switch nalTyp := b & 0x1f; nalTyp { switch nalTyp := b & 0x1f; nalTyp {
case idrPic, suppEnhInf: case idrPic:
return true return true
case nonIdrPic: case nonIdrPic:
return false return false

View File

@ -100,6 +100,7 @@ const (
defaultInputCodec = codecutil.H264 defaultInputCodec = codecutil.H264
defaultVerbosity = logger.Error defaultVerbosity = logger.Error
defaultRtpAddr = "localhost:6970" defaultRtpAddr = "localhost:6970"
defaultRTSPURL = "rtsp://admin:admin@192.168.1.50:8554/CH001.sdp"
defaultBurstPeriod = 10 // Seconds defaultBurstPeriod = 10 // Seconds
// Raspivid video defaults. // Raspivid video defaults.
@ -124,8 +125,8 @@ const (
// Ringbuffer defaults. // Ringbuffer defaults.
defaultMTSRBSize = 1000 defaultMTSRBSize = 1000
defaultMTSRBElementSize = 100000 defaultMTSRBElementSize = 100000
defaultRTMPRBSize = 500 defaultRTMPRBSize = 1000
defaultRTMPRBElementSize = 200000 defaultRTMPRBElementSize = 300000
) )
// Config provides parameters relevant to a revid instance. A new config must // Config provides parameters relevant to a revid instance. A new config must
@ -277,7 +278,12 @@ func (c *Config) Validate() error {
} }
switch c.Input { switch c.Input {
case Raspivid, V4L, File, Audio, RTSP: case Raspivid, V4L, File, Audio:
case RTSP:
if c.RTSPURL == "" {
c.Logger.Log(logger.Info, pkg+"no RTSPURL defined, defaulting", "RTSPURL", defaultRTSPURL)
c.RTSPURL = defaultRTSPURL
}
case NothingDefined: case NothingDefined:
c.Logger.Log(logger.Info, pkg+"no input type defined, defaulting", "input", defaultInput) c.Logger.Log(logger.Info, pkg+"no input type defined, defaulting", "input", defaultInput)
c.Input = defaultInput c.Input = defaultInput

View File

@ -181,7 +181,15 @@ func (r *Revid) reset(config Config) error {
case File, V4L: case File, V4L:
st = mts.EncodeH264 st = mts.EncodeH264
case RTSP: case RTSP:
switch r.config.InputCodec {
case codecutil.H265:
st = mts.EncodeH265 st = mts.EncodeH265
case codecutil.H264:
st = mts.EncodeH264
case codecutil.MJPEG:
st = mts.EncodeMJPEG
encOptions = append(encOptions, mts.PacketBasedPSI(int(r.config.MinFrames)))
}
case Audio: case Audio:
st = mts.EncodeAudio st = mts.EncodeAudio
} }
@ -309,7 +317,14 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
r.setupInput = r.setupInputForFile r.setupInput = r.setupInputForFile
case RTSP: case RTSP:
r.setupInput = r.startRTSPCamera r.setupInput = r.startRTSPCamera
switch r.config.InputCodec {
case codecutil.H264:
r.lexTo = h264.NewExtracter().Extract
case codecutil.H265:
r.lexTo = h265.NewLexer(false).Lex r.lexTo = h265.NewLexer(false).Lex
case codecutil.MJPEG:
panic("not implemented")
}
case Audio: case Audio:
r.setupInput = r.startAudioDevice r.setupInput = r.startAudioDevice
r.lexTo = codecutil.NewByteLexer(&r.config.ChunkSize).Lex r.lexTo = codecutil.NewByteLexer(&r.config.ChunkSize).Lex

View File

@ -294,7 +294,7 @@ func newRtmpSender(url string, timeout uint, retries, rbSize, rbElementSize int,
timeout: timeout, timeout: timeout,
retries: retries, retries: retries,
log: log, log: log,
ring: ring.NewBuffer(rbSize, rbElementSize, 0), ring: ring.NewBuffer(rbSize, rbElementSize, 5*time.Second),
done: make(chan struct{}), done: make(chan struct{}),
} }
s.wg.Add(1) s.wg.Add(1)