From 817cc86a3420fef884d9dfd06a90310d204aef4c Mon Sep 17 00:00:00 2001 From: Saxon Date: Sun, 25 Aug 2019 17:14:06 +0930 Subject: [PATCH 1/5] revid: now have MinPeriod and ClipDuration params This change included a rename of IntraRefreshPeriod to MinPeriod, and the addition of the ClipDuration param. PSI are now written before IDR. Clips are no longer outputed based on PSI but rather a time ClipDuration, where ClipDuration >= MinPeriod, however, PSI must still be at the beginning of each clip. Also created functionality to update meta time even if we don't have a response to update. --- cmd/revid-cli/main.go | 51 +++++++++++++------------- codec/h264/parse.go | 71 ++++++++++++++++++++++++++++++++++++ container/mts/encoder.go | 77 +++++++++++++++++++++++++++++++++------- container/mts/mpegts.go | 16 ++++++--- revid/config.go | 48 +++++++++++++++++-------- revid/revid.go | 24 ++++++++----- revid/senders.go | 26 +++++++++----- revid/senders_test.go | 13 +++---- 8 files changed, 244 insertions(+), 82 deletions(-) create mode 100644 codec/h264/parse.go diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 53f38833..5eca3ff7 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -106,32 +106,30 @@ func handleFlags() revid.Config { var cfg revid.Config var ( - cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`") - - inputCodecPtr = flag.String("InputCodec", "H264", "The codec of the input: H264, Mjpeg, PCM, ADPCM") - inputPtr = flag.String("Input", "", "The input type: Raspivid, File, v4l, Audio, RTSP") - rtspURLPtr = flag.String("RTSPURL", "", "The URL for an RTSP server.") - verbosityPtr = flag.String("Verbosity", "Info", "Verbosity: Debug, Info, Warning, Error, Fatal") - rtpAddrPtr = flag.String("RtpAddr", "", "Rtp destination address: : (port is generally 6970-6999)") - logPathPtr = flag.String("LogPath", defaultLogPath, "The log path") - configFilePtr = flag.String("ConfigFile", "", "NetSender config file") - rtmpUrlPtr = flag.String("RtmpUrl", "", "Url of rtmp endpoint") - outputPathPtr = flag.String("OutputPath", "", "The directory of the output file") - inputFilePtr = flag.String("InputPath", "", "The directory of the input file") - httpAddressPtr = flag.String("HttpAddress", "", "Destination address of http posts") - verticalFlipPtr = flag.Bool("VerticalFlip", false, "Flip video vertically: Yes, No") - horizontalFlipPtr = flag.Bool("HorizontalFlip", false, "Flip video horizontally: Yes, No") - bitratePtr = flag.Uint("Bitrate", 0, "Bitrate of recorded video") - heightPtr = flag.Uint("Height", 0, "Height in pixels") - widthPtr = flag.Uint("Width", 0, "Width in pixels") - frameRatePtr = flag.Uint("FrameRate", 0, "Frame rate of captured video") - quantizationPtr = flag.Uint("Quantization", 0, "Desired quantization value: 0-40") - intraRefreshPeriodPtr = flag.Uint("IntraRefreshPeriod", 0, "The IntraRefreshPeriod i.e. how many keyframes we send") - rotationPtr = flag.Uint("Rotation", 0, "Rotate video output. (0-359 degrees)") - brightnessPtr = flag.Uint("Brightness", 50, "Set brightness. (0-100) ") - saturationPtr = flag.Int("Saturation", 0, "Set Saturation. (100-100)") - exposurePtr = flag.String("Exposure", "auto", "Set exposure mode. ("+strings.Join(revid.ExposureModes[:], ",")+")") - autoWhiteBalancePtr = flag.String("Awb", "auto", "Set automatic white balance mode. ("+strings.Join(revid.AutoWhiteBalanceModes[:], ",")+")") + cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`") + inputCodecPtr = flag.String("InputCodec", "H264", "The codec of the input: H264, Mjpeg, PCM, ADPCM") + inputPtr = flag.String("Input", "", "The input type: Raspivid, File, v4l, Audio, RTSP") + rtspURLPtr = flag.String("RTSPURL", "", "The URL for an RTSP server.") + verbosityPtr = flag.String("Verbosity", "Info", "Verbosity: Debug, Info, Warning, Error, Fatal") + rtpAddrPtr = flag.String("RtpAddr", "", "Rtp destination address: : (port is generally 6970-6999)") + logPathPtr = flag.String("LogPath", defaultLogPath, "The log path") + configFilePtr = flag.String("ConfigFile", "", "NetSender config file") + rtmpUrlPtr = flag.String("RtmpUrl", "", "Url of rtmp endpoint") + outputPathPtr = flag.String("OutputPath", "", "The directory of the output file") + inputFilePtr = flag.String("InputPath", "", "The directory of the input file") + httpAddressPtr = flag.String("HttpAddress", "", "Destination address of http posts") + verticalFlipPtr = flag.Bool("VerticalFlip", false, "Flip video vertically: Yes, No") + horizontalFlipPtr = flag.Bool("HorizontalFlip", false, "Flip video horizontally: Yes, No") + bitratePtr = flag.Uint("Bitrate", 0, "Bitrate of recorded video") + heightPtr = flag.Uint("Height", 0, "Height in pixels") + widthPtr = flag.Uint("Width", 0, "Width in pixels") + frameRatePtr = flag.Uint("FrameRate", 0, "Frame rate of captured video") + quantizationPtr = flag.Uint("Quantization", 0, "Desired quantization value: 0-40") + rotationPtr = flag.Uint("Rotation", 0, "Rotate video output. (0-359 degrees)") + brightnessPtr = flag.Uint("Brightness", 50, "Set brightness. (0-100) ") + saturationPtr = flag.Int("Saturation", 0, "Set Saturation. (100-100)") + exposurePtr = flag.String("Exposure", "auto", "Set exposure mode. ("+strings.Join(revid.ExposureModes[:], ",")+")") + autoWhiteBalancePtr = flag.String("Awb", "auto", "Set automatic white balance mode. ("+strings.Join(revid.AutoWhiteBalanceModes[:], ",")+")") // Audio specific flags. sampleRatePtr = flag.Int("SampleRate", 48000, "Sample rate of recorded audio") @@ -246,7 +244,6 @@ func handleFlags() revid.Config { cfg.FrameRate = *frameRatePtr cfg.HTTPAddress = *httpAddressPtr cfg.Quantization = *quantizationPtr - cfg.IntraRefreshPeriod = *intraRefreshPeriodPtr cfg.RTPAddress = *rtpAddrPtr cfg.Brightness = *brightnessPtr cfg.Saturation = *saturationPtr diff --git a/codec/h264/parse.go b/codec/h264/parse.go new file mode 100644 index 00000000..9465fb1c --- /dev/null +++ b/codec/h264/parse.go @@ -0,0 +1,71 @@ +/* +DESCRIPTION + parse.go provides H.264 NAL unit parsing utilities for the extraction of + syntax elements. + +AUTHORS + Saxon A. Nelson-Milton + Dan Kortschak + +LICENSE + Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean). + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package h264 + +import "errors" + +var errNotEnoughBytes = errors.New("not enough bytes to read") + +// NALType returns the NAL type of the given NAL unit bytes. The given NAL unit +// may be in byte stream or packet format. +func NALType(n []byte) (int, error) { + sc := frameScanner{buf: n} + b, ok := sc.readByte() + if !ok { + return 0, errNotEnoughBytes + } + for i := 1; b == 0x00 && i != 4; i++ { + b, ok = sc.readByte() + if !ok { + return 0, errNotEnoughBytes + } + if b != 0x01 || (i != 2 && i != 3) { + continue + } + + b, ok = sc.readByte() + if !ok { + return 0, errNotEnoughBytes + } + return int(b & 0x1f), nil + } + return int(b & 0x1f), nil +} + +type frameScanner struct { + off int + buf []byte +} + +func (s *frameScanner) readByte() (b byte, ok bool) { + if s.off >= len(s.buf) { + return 0, false + } + b = s.buf[s.off] + s.off++ + return b, true +} diff --git a/container/mts/encoder.go b/container/mts/encoder.go index 5d5533cb..63413b77 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -26,9 +26,13 @@ LICENSE package mts import ( + "fmt" "io" + "strconv" + "sync" "time" + "bitbucket.org/ausocean/av/codec/h264" "bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/av/container/mts/pes" "bitbucket.org/ausocean/av/container/mts/psi" @@ -117,6 +121,42 @@ const ( MaxPTS = (1 << 33) - 1 ) +// Globals for use in keeping real time. +var ( + realRefTime time.Time // Holds a reference real time given to SetTime. + sysRefTime time.Time // Holds a system reference time set when realRefTime is obtained. + timeIsSet bool // Indicates if the time has been set. + mu = sync.Mutex{} // Used when accessing/mutating above time vars. +) + +// SetTime allows setting of current time. This is useful if the system running +// this encoder does not have time keeping. The user may wish to obtain an +// accurate time from an NTP server or local machine and pass to this function. +func SetTime(t time.Time) { + mu.Lock() + realRefTime = t + sysRefTime = time.Now() + timeIsSet = true + mu.Unlock() +} + +// Time provides either a real time that has been calculated from a reference +// set by SetTime, or using the current system time. +func Time() time.Time { + mu.Lock() + t := realRefTime.Add(time.Now().Sub(sysRefTime)) + mu.Unlock() + return t +} + +// TimeIsSet returns true if SetTime has been used to set a real reference time. +func TimeIsSet() bool { + mu.Lock() + b := timeIsSet + mu.Unlock() + return b +} + // Encoder encapsulates properties of an MPEG-TS generator. type Encoder struct { dst io.WriteCloser @@ -130,13 +170,11 @@ type Encoder struct { continuity map[int]byte - timeBasedPsi bool + nalBasedPSI bool pktCount int psiSendCount int mediaPid int streamID byte - - psiLastTime time.Time } // NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream @@ -174,7 +212,7 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { writePeriod: time.Duration(float64(time.Second) / rate), ptsOffset: ptsOffset, - timeBasedPsi: true, + nalBasedPSI: true, pktCount: 8, @@ -199,12 +237,8 @@ const ( hasPTS = 0x2 ) -// TimeBasedPsi allows for the setting of the PSI writing method, therefore, if -// PSI is written based on some time duration, or based on a packet count. -// If b is true, then time based PSI is used, otherwise the PSI is written -// every sendCount. -func (e *Encoder) TimeBasedPsi(b bool, sendCount int) { - e.timeBasedPsi = b +func (e *Encoder) NALBasedPSI(b bool, sendCount int) { + e.nalBasedPSI = b e.psiSendCount = sendCount e.pktCount = e.psiSendCount } @@ -212,14 +246,28 @@ func (e *Encoder) TimeBasedPsi(b bool, sendCount int) { // Write implements io.Writer. Write takes raw video or audio data and encodes into MPEG-TS, // then sending it to the encoder's io.Writer destination. func (e *Encoder) Write(data []byte) (int, error) { - now := time.Now() - if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || (!e.timeBasedPsi && (e.pktCount >= e.psiSendCount)) { + if e.nalBasedPSI { + nalType, err := h264.NALType(data) + if err != nil { + return 0, fmt.Errorf("could not get type from NAL unit, failed with error: %v", err) + } + + // NAL type that will signify refresh. These are defined in H.264 specifications + // table 7-1. + const nalTypeSPS = 7 + + if nalType == nalTypeSPS { + err := e.writePSI() + if err != nil { + return 0, err + } + } + } else if e.pktCount >= e.psiSendCount { e.pktCount = 0 err := e.writePSI() if err != nil { return 0, err } - e.psiLastTime = now } // Prepare PES data. @@ -328,6 +376,9 @@ func (e *Encoder) ccFor(pid int) byte { // contained in the global Meta struct. func updateMeta(b []byte) ([]byte, error) { p := psi.PSIBytes(b) + if TimeIsSet() { + Meta.Add("ts", strconv.Itoa(int(Time().Unix()))) + } err := p.AddDescriptor(psi.MetadataTag, Meta.Encode()) return []byte(p), err } diff --git a/container/mts/mpegts.go b/container/mts/mpegts.go index 8ca509ba..fe31b5ce 100644 --- a/container/mts/mpegts.go +++ b/container/mts/mpegts.go @@ -650,8 +650,11 @@ func SegmentForMeta(d []byte, key, val string) ([][]byte, error) { } // pid returns the packet identifier for the given packet. -func pid(p []byte) uint16 { - return uint16(p[1]&0x1f)<<8 | uint16(p[2]) +func PID(p []byte) (uint16, error) { + if len(p) < PacketSize { + return 0, errors.New("packet length less than 188") + } + return uint16(p[1]&0x1f)<<8 | uint16(p[2]), nil } // Programs returns a map of program numbers and corresponding PMT PIDs for a @@ -683,10 +686,14 @@ func Streams(p []byte) ([]gotspsi.PmtElementaryStream, error) { // but this program may contain different streams, i.e. a video stream + audio // stream. func MediaStreams(p []byte) ([]gotspsi.PmtElementaryStream, error) { + if len(p) < 2*PacketSize { + return nil, errors.New("PSI is not two packets or more long") + } pat := p[:PacketSize] pmt := p[PacketSize : 2*PacketSize] - if pid(pat) != PatPid { + pid, _ := PID(pat) + if pid != PatPid { return nil, errors.New("first packet is not a PAT") } @@ -703,7 +710,8 @@ func MediaStreams(p []byte) ([]gotspsi.PmtElementaryStream, error) { return nil, ErrMultiplePrograms } - if pid(pmt) != pmtPIDs(m)[0] { + pid, _ = PID(pmt) + if pid != pmtPIDs(m)[0] { return nil, errors.New("second packet is not desired PMT") } diff --git a/revid/config.go b/revid/config.go index 42ab2b5d..e0870c40 100644 --- a/revid/config.go +++ b/revid/config.go @@ -27,6 +27,7 @@ package revid import ( "errors" + "time" "bitbucket.org/ausocean/av/codec/codecutil" "bitbucket.org/ausocean/utils/logger" @@ -102,15 +103,16 @@ const ( defaultBurstPeriod = 10 // Seconds // Raspivid video defaults. - defaultBrightness = 50 - defaultExposure = "auto" - defaultAutoWhiteBalance = "auto" - defaultRotation = 0 // Degrees - defaultWidth = 1280 - defaultHeight = 720 - defaultIntraRefreshPeriod = 100 - defaultQuantization = 30 - defaultBitrate = 400000 + defaultBrightness = 50 + defaultExposure = "auto" + defaultAutoWhiteBalance = "auto" + defaultRotation = 0 // Degrees + defaultWidth = 1280 + defaultHeight = 720 + defaultRefreshPeriod = 100 + defaultClipDuration = 0 + defaultQuantization = 30 + defaultBitrate = 400000 // Audio defaults. defaultAudioInputCodec = codecutil.ADPCM @@ -202,9 +204,16 @@ type Config struct { // are using Raspivid input. Quantization uint - // IntraRefreshPeriod defines the frequency of video parameter NAL units for - // Raspivid input. - IntraRefreshPeriod uint + // MinPeriod defines the frequency of key NAL units SPS, PPS and IDR in + // number of NAL units. This will also determine the frequency of PSI if the + // output container is MPEG-TS. If ClipDuration is less than MinPeriod, + // ClipDuration will default to MinPeriod. + MinPeriod uint + + // ClipDuration is the duration of MTS data that is sent using HTTP or RTP + // output. This defaults to 0, therefore MinPeriod will determine the length of + // clips by default. + ClipDuration time.Duration // Logger holds an implementation of the Logger interface as defined in revid.go. // This must be set for revid to work correctly. @@ -383,9 +392,18 @@ func (c *Config) Validate() error { return errors.New("invalid bitrate") } - if c.IntraRefreshPeriod == 0 { - c.Logger.Log(logger.Info, pkg+"no intra refresh defined, defaulting", "intraRefresh", defaultIntraRefreshPeriod) - c.IntraRefreshPeriod = defaultIntraRefreshPeriod + if c.MinPeriod == 0 { + c.Logger.Log(logger.Info, pkg+"no intra refresh defined, defaulting", "intraRefresh", defaultRefreshPeriod) + c.MinPeriod = defaultRefreshPeriod + } else if c.MinPeriod < 0 { + return errors.New("refresh period is less than 0") + } + + if c.ClipDuration == 0 { + c.Logger.Log(logger.Info, pkg+"no clip duration defined, defaulting", "ClipDuration", defaultClipDuration) + c.ClipDuration = defaultClipDuration + } else if c.ClipDuration < 0 { + return errors.New("clip duration is less than 0") } if c.Quantization != 0 && (c.Quantization < 10 || c.Quantization > 40) { diff --git a/revid/revid.go b/revid/revid.go index 9e11844a..13efdd7b 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -51,6 +51,7 @@ import ( "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/logger" + "bitbucket.org/ausocean/utils/ring" ) // RTMP connection properties. @@ -230,9 +231,8 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. w = newMtsSender( newHttpSender(r.ns, r.config.Logger.Log), r.config.Logger.Log, - r.config.MTSRBSize, - r.config.MTSRBElementSize, - 0, + ring.NewBuffer(r.config.MTSRBSize, r.config.MTSRBElementSize, 0), + r.config.ClipDuration, ) mtsSenders = append(mtsSenders, w) case RTP: @@ -456,13 +456,21 @@ func (r *Revid) Update(vars map[string]string) error { break } r.config.Quantization = uint(v) - case "IntraRefreshPeriod": - p, err := strconv.ParseUint(value, 10, 0) + case "MinPeriod": + v, err := strconv.Atoi(value) if err != nil { - r.config.Logger.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid MinPeriod param", "value", value) break } - r.config.IntraRefreshPeriod = uint(p) + r.config.MinPeriod = uint(v) + + case "ClipDuration": + v, err := strconv.Atoi(value) + if err != nil { + r.config.Logger.Log(logger.Warning, pkg+"invalid ClipDuration param", "value", value) + break + } + r.config.ClipDuration = time.Duration(v) * time.Second case "HorizontalFlip": switch strings.ToLower(value) { @@ -577,7 +585,7 @@ func (r *Revid) startRaspivid() (func() error, error) { args = append(args, "--codec", "H264", "--inline", - "--intra", fmt.Sprint(r.config.IntraRefreshPeriod), + "--intra", fmt.Sprint(r.config.MinPeriod), ) if r.config.Quantization != 0 { args = append(args, "-qp", fmt.Sprint(r.config.Quantization)) diff --git a/revid/senders.go b/revid/senders.go index 303663ec..18389354 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -29,11 +29,11 @@ LICENSE package revid import ( + "errors" "fmt" "io" "net" "os" - "strconv" "sync" "time" @@ -118,7 +118,7 @@ func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) log(logger.Warning, pkg+"No timestamp in reply") } else { log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) - mts.Meta.Add("ts", strconv.Itoa(t)) + mts.SetTime(time.Unix(int64(t), 0)) } // Extract location from reply @@ -156,8 +156,8 @@ func (s *fileSender) Close() error { return s.file.Close() } // mtsSender implements io.WriteCloser and provides sending capability specifically // for use with MPEGTS packetization. It handles the construction of appropriately -// lengthed clips based on PSI. It also accounts for discontinuities by -// setting the discontinuity indicator for the first packet of a clip. +// lengthed clips based on clip duration and PSI. It also accounts for +// discontinuities by setting the discontinuity indicator for the first packet of a clip. type mtsSender struct { dst io.WriteCloser buf []byte @@ -166,19 +166,22 @@ type mtsSender struct { pkt packet.Packet repairer *mts.DiscontinuityRepairer curPid int + clipDur time.Duration + prev time.Time done chan struct{} log func(lvl int8, msg string, args ...interface{}) wg sync.WaitGroup } // newMtsSender returns a new mtsSender. -func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), ringSize int, ringElementSize int, wTimeout time.Duration) *mtsSender { +func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *ring.Buffer, clipDur time.Duration) *mtsSender { s := &mtsSender{ dst: dst, repairer: mts.NewDiscontinuityRepairer(), log: log, - ring: ring.NewBuffer(ringSize, ringElementSize, wTimeout), + ring: rb, done: make(chan struct{}), + clipDur: clipDur, } s.wg.Add(1) go s.output() @@ -229,15 +232,20 @@ func (s *mtsSender) output() { // Write implements io.Writer. func (s *mtsSender) Write(d []byte) (int, error) { + if len(d) < mts.PacketSize { + return 0, errors.New("do not have full MTS packet") + } + if s.next != nil { s.buf = append(s.buf, s.next...) } bytes := make([]byte, len(d)) copy(bytes, d) s.next = bytes - copy(s.pkt[:], bytes) - s.curPid = s.pkt.PID() - if s.curPid == mts.PatPid && len(s.buf) > 0 { + p, _ := mts.PID(bytes) + s.curPid = int(p) + if time.Now().Sub(s.prev) >= s.clipDur && s.curPid == mts.PatPid && len(s.buf) > 0 { + s.prev = time.Now() _, err := s.ring.Write(s.buf) if err != nil { s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error()) diff --git a/revid/senders_test.go b/revid/senders_test.go index d92f19f4..3fe2f291 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -39,6 +39,7 @@ import ( "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/utils/logger" + "bitbucket.org/ausocean/utils/ring" ) var ( @@ -133,12 +134,12 @@ func TestMtsSenderSegment(t *testing.T) { // Create ringBuffer, sender, sender and the MPEGTS encoder. const numberOfClips = 11 dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} - sender := newMtsSender(dst, (*dummyLogger)(t).log, defaultMTSRBSize, defaultMTSRBElementSize, 0) + sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) // Turn time based PSI writing off for encoder. const psiSendCount = 10 - encoder.TimeBasedPsi(false, psiSendCount) + encoder.NALBasedPSI(false, psiSendCount) // Write the packets to the encoder, which will in turn write to the mtsSender. // Payload will just be packet number. @@ -211,12 +212,12 @@ func TestMtsSenderFailedSend(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder const clipToFailAt = 3 dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} - sender := newMtsSender(dst, (*dummyLogger)(t).log, defaultMTSRBSize, defaultMTSRBElementSize, 0) + sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) // Turn time based PSI writing off for encoder and send PSI every 10 packets. const psiSendCount = 10 - encoder.TimeBasedPsi(false, psiSendCount) + encoder.NALBasedPSI(false, psiSendCount) // Write the packets to the encoder, which will in turn write to the mtsSender. // Payload will just be packet number. @@ -291,12 +292,12 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create destination, the mtsSender and the mtsEncoder. const clipToDelay = 3 dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} - sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, defaultMTSRBElementSize, 0) + sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(1, defaultMTSRBElementSize, 0), 0) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) // Turn time based PSI writing off for encoder. const psiSendCount = 10 - encoder.TimeBasedPsi(false, psiSendCount) + encoder.NALBasedPSI(false, psiSendCount) // Write the packets to the encoder, which will in turn write to the mtsSender. // Payload will just be packet number. From b9cd6b3f135e23c0291d98de79cf2611c0a87686 Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 26 Aug 2019 09:24:18 +0930 Subject: [PATCH 2/5] container/mts/encoder.go: using NALTypeSPS from h264dec package --- codec/h264/h264dec/frame.go | 2 +- codec/h264/h264dec/read.go | 2 +- container/mts/encoder.go | 7 ++----- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/codec/h264/h264dec/frame.go b/codec/h264/h264dec/frame.go index 82b32ae5..ca85982b 100644 --- a/codec/h264/h264dec/frame.go +++ b/codec/h264/h264dec/frame.go @@ -9,7 +9,7 @@ const ( naluTypeSlicePartC naluTypeSliceIDRPicture naluTypeSEI - naluTypeSPS + NALTypeSPS naluTypePPS naluTypeAccessUnitDelimiter naluTypeEndOfSequence diff --git a/codec/h264/h264dec/read.go b/codec/h264/h264dec/read.go index c98d46ca..9b283c92 100644 --- a/codec/h264/h264dec/read.go +++ b/codec/h264/h264dec/read.go @@ -61,7 +61,7 @@ func (h *H264Reader) Start() { // TODO: need to handle error from this. nalUnit, _, _ := h.readNalUnit() switch nalUnit.Type { - case naluTypeSPS: + case NALTypeSPS: // TODO: handle this error sps, _ := NewSPS(nalUnit.RBSP, false) h.VideoStreams = append( diff --git a/container/mts/encoder.go b/container/mts/encoder.go index 63413b77..cb3d9d06 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -33,6 +33,7 @@ import ( "time" "bitbucket.org/ausocean/av/codec/h264" + "bitbucket.org/ausocean/av/codec/h264/h264dec" "bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/av/container/mts/pes" "bitbucket.org/ausocean/av/container/mts/psi" @@ -252,11 +253,7 @@ func (e *Encoder) Write(data []byte) (int, error) { return 0, fmt.Errorf("could not get type from NAL unit, failed with error: %v", err) } - // NAL type that will signify refresh. These are defined in H.264 specifications - // table 7-1. - const nalTypeSPS = 7 - - if nalType == nalTypeSPS { + if nalType == h264dec.NALTypeSPS { err := e.writePSI() if err != nil { return 0, err From 24e9ed69ca3e47f9edc5c71d7f1d12cfe76237ba Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 26 Aug 2019 09:26:35 +0930 Subject: [PATCH 3/5] revid/config.go: got rid of remaining references of 'RefreshPeriod' which is now MinPeriod --- revid/config.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/revid/config.go b/revid/config.go index e0870c40..7807b52c 100644 --- a/revid/config.go +++ b/revid/config.go @@ -109,7 +109,7 @@ const ( defaultRotation = 0 // Degrees defaultWidth = 1280 defaultHeight = 720 - defaultRefreshPeriod = 100 + defaultMinPeriod = 100 defaultClipDuration = 0 defaultQuantization = 30 defaultBitrate = 400000 @@ -393,8 +393,8 @@ func (c *Config) Validate() error { } if c.MinPeriod == 0 { - c.Logger.Log(logger.Info, pkg+"no intra refresh defined, defaulting", "intraRefresh", defaultRefreshPeriod) - c.MinPeriod = defaultRefreshPeriod + c.Logger.Log(logger.Info, pkg+"no min period defined, defaulting", "MinPeriod", defaultMinPeriod) + c.MinPeriod = defaultMinPeriod } else if c.MinPeriod < 0 { return errors.New("refresh period is less than 0") } From e57e14678a496e02075067d52fa2bfcd474a9387 Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 26 Aug 2019 13:29:07 +0930 Subject: [PATCH 4/5] container/mts: using RealTime type from utils package instead of global vars with mutator functions --- container/mts/encoder.go | 113 ++++++++++++++------------------------- revid/senders.go | 2 +- 2 files changed, 41 insertions(+), 74 deletions(-) diff --git a/container/mts/encoder.go b/container/mts/encoder.go index cb3d9d06..7d968d5f 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -29,7 +29,6 @@ import ( "fmt" "io" "strconv" - "sync" "time" "bitbucket.org/ausocean/av/codec/h264" @@ -37,8 +36,43 @@ import ( "bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/av/container/mts/pes" "bitbucket.org/ausocean/av/container/mts/psi" + "bitbucket.org/ausocean/utils/realtime" ) +// Media type values. +// TODO: reference relevant specifications. +const ( + H264ID = 27 + H265ID = 36 + audioStreamID = 0xc0 // First audio stream ID. +) + +// Constants used to communicate which media codec will be packetized. +const ( + EncodeH264 = iota + EncodeH265 + EncodeAudio +) + +// Time-related constants. +const ( + // ptsOffset is the offset added to the clock to determine + // the current presentation timestamp. + ptsOffset = 700 * time.Millisecond + + // PCRFrequency is the base Program Clock Reference frequency in Hz. + PCRFrequency = 90000 + + // PTSFrequency is the presentation timestamp frequency in Hz. + PTSFrequency = 90000 + + // MaxPTS is the largest PTS value (i.e., for a 33-bit unsigned integer). + MaxPTS = (1 << 33) - 1 +) + +// If we are not using NAL based PSI intervals then we will send PSI every 7 packets. +const psiSendCount = 7 + // Some common manifestations of PSI. var ( // StandardPAT is a minimal PAT. @@ -77,87 +111,20 @@ var ( } ) -const ( - psiInterval = 1 * time.Second - psiSendCount = 7 -) - // Meta allows addition of metadata to encoded mts from outside of this pkg. // See meta pkg for usage. // // TODO: make this not global. var Meta *meta.Data +// This will help us obtain a realtime for timestamp meta encoding. +var RealTime = realtime.NewRealTime() + var ( patTable = StandardPAT.Bytes() pmtTable []byte ) -const ( - H264ID = 27 - H265ID = 36 - audioStreamID = 0xc0 // First audio stream ID. -) - -// Constants used to communicate which media codec will be packetized. -const ( - EncodeH264 = iota - EncodeH265 - EncodeAudio -) - -// Time-related constants. -const ( - // ptsOffset is the offset added to the clock to determine - // the current presentation timestamp. - ptsOffset = 700 * time.Millisecond - - // PCRFrequency is the base Program Clock Reference frequency in Hz. - PCRFrequency = 90000 - - // PTSFrequency is the presentation timestamp frequency in Hz. - PTSFrequency = 90000 - - // MaxPTS is the largest PTS value (i.e., for a 33-bit unsigned integer). - MaxPTS = (1 << 33) - 1 -) - -// Globals for use in keeping real time. -var ( - realRefTime time.Time // Holds a reference real time given to SetTime. - sysRefTime time.Time // Holds a system reference time set when realRefTime is obtained. - timeIsSet bool // Indicates if the time has been set. - mu = sync.Mutex{} // Used when accessing/mutating above time vars. -) - -// SetTime allows setting of current time. This is useful if the system running -// this encoder does not have time keeping. The user may wish to obtain an -// accurate time from an NTP server or local machine and pass to this function. -func SetTime(t time.Time) { - mu.Lock() - realRefTime = t - sysRefTime = time.Now() - timeIsSet = true - mu.Unlock() -} - -// Time provides either a real time that has been calculated from a reference -// set by SetTime, or using the current system time. -func Time() time.Time { - mu.Lock() - t := realRefTime.Add(time.Now().Sub(sysRefTime)) - mu.Unlock() - return t -} - -// TimeIsSet returns true if SetTime has been used to set a real reference time. -func TimeIsSet() bool { - mu.Lock() - b := timeIsSet - mu.Unlock() - return b -} - // Encoder encapsulates properties of an MPEG-TS generator. type Encoder struct { dst io.WriteCloser @@ -373,8 +340,8 @@ func (e *Encoder) ccFor(pid int) byte { // contained in the global Meta struct. func updateMeta(b []byte) ([]byte, error) { p := psi.PSIBytes(b) - if TimeIsSet() { - Meta.Add("ts", strconv.Itoa(int(Time().Unix()))) + if RealTime.IsSet() { + Meta.Add("ts", strconv.Itoa(int(RealTime.Get().Unix()))) } err := p.AddDescriptor(psi.MetadataTag, Meta.Encode()) return []byte(p), err diff --git a/revid/senders.go b/revid/senders.go index 18389354..a6a593bd 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -118,7 +118,7 @@ func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) log(logger.Warning, pkg+"No timestamp in reply") } else { log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) - mts.SetTime(time.Unix(int64(t), 0)) + mts.RealTime.Set(time.Unix(int64(t), 0)) } // Extract location from reply From a8081b52b298359ae8d50ae5ee2572f8af6a1faf Mon Sep 17 00:00:00 2001 From: Saxon Date: Mon, 26 Aug 2019 13:43:45 +0930 Subject: [PATCH 5/5] revid/config.go: MinPeriod => MinFrames --- revid/config.go | 20 ++++++++++---------- revid/revid.go | 8 ++++---- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/revid/config.go b/revid/config.go index 7807b52c..87fb9139 100644 --- a/revid/config.go +++ b/revid/config.go @@ -109,7 +109,7 @@ const ( defaultRotation = 0 // Degrees defaultWidth = 1280 defaultHeight = 720 - defaultMinPeriod = 100 + defaultMinFrames = 100 defaultClipDuration = 0 defaultQuantization = 30 defaultBitrate = 400000 @@ -204,14 +204,14 @@ type Config struct { // are using Raspivid input. Quantization uint - // MinPeriod defines the frequency of key NAL units SPS, PPS and IDR in + // MinFrames defines the frequency of key NAL units SPS, PPS and IDR in // number of NAL units. This will also determine the frequency of PSI if the - // output container is MPEG-TS. If ClipDuration is less than MinPeriod, - // ClipDuration will default to MinPeriod. - MinPeriod uint + // output container is MPEG-TS. If ClipDuration is less than MinFrames, + // ClipDuration will default to MinFrames. + MinFrames uint // ClipDuration is the duration of MTS data that is sent using HTTP or RTP - // output. This defaults to 0, therefore MinPeriod will determine the length of + // output. This defaults to 0, therefore MinFrames will determine the length of // clips by default. ClipDuration time.Duration @@ -392,10 +392,10 @@ func (c *Config) Validate() error { return errors.New("invalid bitrate") } - if c.MinPeriod == 0 { - c.Logger.Log(logger.Info, pkg+"no min period defined, defaulting", "MinPeriod", defaultMinPeriod) - c.MinPeriod = defaultMinPeriod - } else if c.MinPeriod < 0 { + if c.MinFrames == 0 { + c.Logger.Log(logger.Info, pkg+"no min period defined, defaulting", "MinFrames", defaultMinFrames) + c.MinFrames = defaultMinFrames + } else if c.MinFrames < 0 { return errors.New("refresh period is less than 0") } diff --git a/revid/revid.go b/revid/revid.go index 13efdd7b..fa1156b1 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -456,13 +456,13 @@ func (r *Revid) Update(vars map[string]string) error { break } r.config.Quantization = uint(v) - case "MinPeriod": + case "MinFrames": v, err := strconv.Atoi(value) if err != nil { - r.config.Logger.Log(logger.Warning, pkg+"invalid MinPeriod param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid MinFrames param", "value", value) break } - r.config.MinPeriod = uint(v) + r.config.MinFrames = uint(v) case "ClipDuration": v, err := strconv.Atoi(value) @@ -585,7 +585,7 @@ func (r *Revid) startRaspivid() (func() error, error) { args = append(args, "--codec", "H264", "--inline", - "--intra", fmt.Sprint(r.config.MinPeriod), + "--intra", fmt.Sprint(r.config.MinFrames), ) if r.config.Quantization != 0 { args = append(args, "-qp", fmt.Sprint(r.config.Quantization))