Merged in refresh-period (pull request #240)

revid: now have MinPeriod and ClipDuration params

Approved-by: Alan Noble <anoble@gmail.com>
This commit is contained in:
Saxon Milton 2019-08-28 00:06:03 +00:00
commit 7f3f1a43f9
10 changed files with 244 additions and 118 deletions

View File

@ -106,32 +106,30 @@ func handleFlags() revid.Config {
var cfg revid.Config var cfg revid.Config
var ( var (
cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`") cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`")
inputCodecPtr = flag.String("InputCodec", "H264", "The codec of the input: H264, Mjpeg, PCM, ADPCM")
inputCodecPtr = flag.String("InputCodec", "H264", "The codec of the input: H264, Mjpeg, PCM, ADPCM") inputPtr = flag.String("Input", "", "The input type: Raspivid, File, v4l, Audio, RTSP")
inputPtr = flag.String("Input", "", "The input type: Raspivid, File, v4l, Audio, RTSP") rtspURLPtr = flag.String("RTSPURL", "", "The URL for an RTSP server.")
rtspURLPtr = flag.String("RTSPURL", "", "The URL for an RTSP server.") verbosityPtr = flag.String("Verbosity", "Info", "Verbosity: Debug, Info, Warning, Error, Fatal")
verbosityPtr = flag.String("Verbosity", "Info", "Verbosity: Debug, Info, Warning, Error, Fatal") rtpAddrPtr = flag.String("RtpAddr", "", "Rtp destination address: <IP>:<port> (port is generally 6970-6999)")
rtpAddrPtr = flag.String("RtpAddr", "", "Rtp destination address: <IP>:<port> (port is generally 6970-6999)") logPathPtr = flag.String("LogPath", defaultLogPath, "The log path")
logPathPtr = flag.String("LogPath", defaultLogPath, "The log path") configFilePtr = flag.String("ConfigFile", "", "NetSender config file")
configFilePtr = flag.String("ConfigFile", "", "NetSender config file") rtmpUrlPtr = flag.String("RtmpUrl", "", "Url of rtmp endpoint")
rtmpUrlPtr = flag.String("RtmpUrl", "", "Url of rtmp endpoint") outputPathPtr = flag.String("OutputPath", "", "The directory of the output file")
outputPathPtr = flag.String("OutputPath", "", "The directory of the output file") inputFilePtr = flag.String("InputPath", "", "The directory of the input file")
inputFilePtr = flag.String("InputPath", "", "The directory of the input file") httpAddressPtr = flag.String("HttpAddress", "", "Destination address of http posts")
httpAddressPtr = flag.String("HttpAddress", "", "Destination address of http posts") verticalFlipPtr = flag.Bool("VerticalFlip", false, "Flip video vertically: Yes, No")
verticalFlipPtr = flag.Bool("VerticalFlip", false, "Flip video vertically: Yes, No") horizontalFlipPtr = flag.Bool("HorizontalFlip", false, "Flip video horizontally: Yes, No")
horizontalFlipPtr = flag.Bool("HorizontalFlip", false, "Flip video horizontally: Yes, No") bitratePtr = flag.Uint("Bitrate", 0, "Bitrate of recorded video")
bitratePtr = flag.Uint("Bitrate", 0, "Bitrate of recorded video") heightPtr = flag.Uint("Height", 0, "Height in pixels")
heightPtr = flag.Uint("Height", 0, "Height in pixels") widthPtr = flag.Uint("Width", 0, "Width in pixels")
widthPtr = flag.Uint("Width", 0, "Width in pixels") frameRatePtr = flag.Uint("FrameRate", 0, "Frame rate of captured video")
frameRatePtr = flag.Uint("FrameRate", 0, "Frame rate of captured video") quantizationPtr = flag.Uint("Quantization", 0, "Desired quantization value: 0-40")
quantizationPtr = flag.Uint("Quantization", 0, "Desired quantization value: 0-40") rotationPtr = flag.Uint("Rotation", 0, "Rotate video output. (0-359 degrees)")
intraRefreshPeriodPtr = flag.Uint("IntraRefreshPeriod", 0, "The IntraRefreshPeriod i.e. how many keyframes we send") brightnessPtr = flag.Uint("Brightness", 50, "Set brightness. (0-100) ")
rotationPtr = flag.Uint("Rotation", 0, "Rotate video output. (0-359 degrees)") saturationPtr = flag.Int("Saturation", 0, "Set Saturation. (100-100)")
brightnessPtr = flag.Uint("Brightness", 50, "Set brightness. (0-100) ") exposurePtr = flag.String("Exposure", "auto", "Set exposure mode. ("+strings.Join(revid.ExposureModes[:], ",")+")")
saturationPtr = flag.Int("Saturation", 0, "Set Saturation. (100-100)") autoWhiteBalancePtr = flag.String("Awb", "auto", "Set automatic white balance mode. ("+strings.Join(revid.AutoWhiteBalanceModes[:], ",")+")")
exposurePtr = flag.String("Exposure", "auto", "Set exposure mode. ("+strings.Join(revid.ExposureModes[:], ",")+")")
autoWhiteBalancePtr = flag.String("Awb", "auto", "Set automatic white balance mode. ("+strings.Join(revid.AutoWhiteBalanceModes[:], ",")+")")
// Audio specific flags. // Audio specific flags.
sampleRatePtr = flag.Int("SampleRate", 48000, "Sample rate of recorded audio") sampleRatePtr = flag.Int("SampleRate", 48000, "Sample rate of recorded audio")
@ -246,7 +244,6 @@ func handleFlags() revid.Config {
cfg.FrameRate = *frameRatePtr cfg.FrameRate = *frameRatePtr
cfg.HTTPAddress = *httpAddressPtr cfg.HTTPAddress = *httpAddressPtr
cfg.Quantization = *quantizationPtr cfg.Quantization = *quantizationPtr
cfg.IntraRefreshPeriod = *intraRefreshPeriodPtr
cfg.RTPAddress = *rtpAddrPtr cfg.RTPAddress = *rtpAddrPtr
cfg.Brightness = *brightnessPtr cfg.Brightness = *brightnessPtr
cfg.Saturation = *saturationPtr cfg.Saturation = *saturationPtr

View File

@ -9,7 +9,7 @@ const (
naluTypeSlicePartC naluTypeSlicePartC
naluTypeSliceIDRPicture naluTypeSliceIDRPicture
naluTypeSEI naluTypeSEI
naluTypeSPS NALTypeSPS
naluTypePPS naluTypePPS
naluTypeAccessUnitDelimiter naluTypeAccessUnitDelimiter
naluTypeEndOfSequence naluTypeEndOfSequence

View File

@ -61,7 +61,7 @@ func (h *H264Reader) Start() {
// TODO: need to handle error from this. // TODO: need to handle error from this.
nalUnit, _, _ := h.readNalUnit() nalUnit, _, _ := h.readNalUnit()
switch nalUnit.Type { switch nalUnit.Type {
case naluTypeSPS: case NALTypeSPS:
// TODO: handle this error // TODO: handle this error
sps, _ := NewSPS(nalUnit.RBSP, false) sps, _ := NewSPS(nalUnit.RBSP, false)
h.VideoStreams = append( h.VideoStreams = append(

71
codec/h264/parse.go Normal file
View File

@ -0,0 +1,71 @@
/*
DESCRIPTION
parse.go provides H.264 NAL unit parsing utilities for the extraction of
syntax elements.
AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
LICENSE
Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean).
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package h264
import "errors"
var errNotEnoughBytes = errors.New("not enough bytes to read")
// NALType returns the NAL type of the given NAL unit bytes. The given NAL unit
// may be in byte stream or packet format.
func NALType(n []byte) (int, error) {
sc := frameScanner{buf: n}
b, ok := sc.readByte()
if !ok {
return 0, errNotEnoughBytes
}
for i := 1; b == 0x00 && i != 4; i++ {
b, ok = sc.readByte()
if !ok {
return 0, errNotEnoughBytes
}
if b != 0x01 || (i != 2 && i != 3) {
continue
}
b, ok = sc.readByte()
if !ok {
return 0, errNotEnoughBytes
}
return int(b & 0x1f), nil
}
return int(b & 0x1f), nil
}
type frameScanner struct {
off int
buf []byte
}
func (s *frameScanner) readByte() (b byte, ok bool) {
if s.off >= len(s.buf) {
return 0, false
}
b = s.buf[s.off]
s.off++
return b, true
}

View File

@ -26,14 +26,53 @@ LICENSE
package mts package mts
import ( import (
"fmt"
"io" "io"
"strconv"
"time" "time"
"bitbucket.org/ausocean/av/codec/h264"
"bitbucket.org/ausocean/av/codec/h264/h264dec"
"bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/av/container/mts/meta"
"bitbucket.org/ausocean/av/container/mts/pes" "bitbucket.org/ausocean/av/container/mts/pes"
"bitbucket.org/ausocean/av/container/mts/psi" "bitbucket.org/ausocean/av/container/mts/psi"
"bitbucket.org/ausocean/utils/realtime"
) )
// Media type values.
// TODO: reference relevant specifications.
const (
H264ID = 27
H265ID = 36
audioStreamID = 0xc0 // First audio stream ID.
)
// Constants used to communicate which media codec will be packetized.
const (
EncodeH264 = iota
EncodeH265
EncodeAudio
)
// Time-related constants.
const (
// ptsOffset is the offset added to the clock to determine
// the current presentation timestamp.
ptsOffset = 700 * time.Millisecond
// PCRFrequency is the base Program Clock Reference frequency in Hz.
PCRFrequency = 90000
// PTSFrequency is the presentation timestamp frequency in Hz.
PTSFrequency = 90000
// MaxPTS is the largest PTS value (i.e., for a 33-bit unsigned integer).
MaxPTS = (1 << 33) - 1
)
// If we are not using NAL based PSI intervals then we will send PSI every 7 packets.
const psiSendCount = 7
// Some common manifestations of PSI. // Some common manifestations of PSI.
var ( var (
// StandardPAT is a minimal PAT. // StandardPAT is a minimal PAT.
@ -72,51 +111,20 @@ var (
} }
) )
const (
psiInterval = 1 * time.Second
psiSendCount = 7
)
// Meta allows addition of metadata to encoded mts from outside of this pkg. // Meta allows addition of metadata to encoded mts from outside of this pkg.
// See meta pkg for usage. // See meta pkg for usage.
// //
// TODO: make this not global. // TODO: make this not global.
var Meta *meta.Data var Meta *meta.Data
// This will help us obtain a realtime for timestamp meta encoding.
var RealTime = realtime.NewRealTime()
var ( var (
patTable = StandardPAT.Bytes() patTable = StandardPAT.Bytes()
pmtTable []byte pmtTable []byte
) )
const (
H264ID = 27
H265ID = 36
audioStreamID = 0xc0 // First audio stream ID.
)
// Constants used to communicate which media codec will be packetized.
const (
EncodeH264 = iota
EncodeH265
EncodeAudio
)
// Time-related constants.
const (
// ptsOffset is the offset added to the clock to determine
// the current presentation timestamp.
ptsOffset = 700 * time.Millisecond
// PCRFrequency is the base Program Clock Reference frequency in Hz.
PCRFrequency = 90000
// PTSFrequency is the presentation timestamp frequency in Hz.
PTSFrequency = 90000
// MaxPTS is the largest PTS value (i.e., for a 33-bit unsigned integer).
MaxPTS = (1 << 33) - 1
)
// Encoder encapsulates properties of an MPEG-TS generator. // Encoder encapsulates properties of an MPEG-TS generator.
type Encoder struct { type Encoder struct {
dst io.WriteCloser dst io.WriteCloser
@ -130,13 +138,11 @@ type Encoder struct {
continuity map[int]byte continuity map[int]byte
timeBasedPsi bool nalBasedPSI bool
pktCount int pktCount int
psiSendCount int psiSendCount int
mediaPid int mediaPid int
streamID byte streamID byte
psiLastTime time.Time
} }
// NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream // NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream
@ -174,7 +180,7 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder {
writePeriod: time.Duration(float64(time.Second) / rate), writePeriod: time.Duration(float64(time.Second) / rate),
ptsOffset: ptsOffset, ptsOffset: ptsOffset,
timeBasedPsi: true, nalBasedPSI: true,
pktCount: 8, pktCount: 8,
@ -199,12 +205,8 @@ const (
hasPTS = 0x2 hasPTS = 0x2
) )
// TimeBasedPsi allows for the setting of the PSI writing method, therefore, if func (e *Encoder) NALBasedPSI(b bool, sendCount int) {
// PSI is written based on some time duration, or based on a packet count. e.nalBasedPSI = b
// If b is true, then time based PSI is used, otherwise the PSI is written
// every sendCount.
func (e *Encoder) TimeBasedPsi(b bool, sendCount int) {
e.timeBasedPsi = b
e.psiSendCount = sendCount e.psiSendCount = sendCount
e.pktCount = e.psiSendCount e.pktCount = e.psiSendCount
} }
@ -212,14 +214,24 @@ func (e *Encoder) TimeBasedPsi(b bool, sendCount int) {
// Write implements io.Writer. Write takes raw video or audio data and encodes into MPEG-TS, // Write implements io.Writer. Write takes raw video or audio data and encodes into MPEG-TS,
// then sending it to the encoder's io.Writer destination. // then sending it to the encoder's io.Writer destination.
func (e *Encoder) Write(data []byte) (int, error) { func (e *Encoder) Write(data []byte) (int, error) {
now := time.Now() if e.nalBasedPSI {
if (e.timeBasedPsi && (now.Sub(e.psiLastTime) > psiInterval)) || (!e.timeBasedPsi && (e.pktCount >= e.psiSendCount)) { nalType, err := h264.NALType(data)
if err != nil {
return 0, fmt.Errorf("could not get type from NAL unit, failed with error: %v", err)
}
if nalType == h264dec.NALTypeSPS {
err := e.writePSI()
if err != nil {
return 0, err
}
}
} else if e.pktCount >= e.psiSendCount {
e.pktCount = 0 e.pktCount = 0
err := e.writePSI() err := e.writePSI()
if err != nil { if err != nil {
return 0, err return 0, err
} }
e.psiLastTime = now
} }
// Prepare PES data. // Prepare PES data.
@ -328,6 +340,9 @@ func (e *Encoder) ccFor(pid int) byte {
// contained in the global Meta struct. // contained in the global Meta struct.
func updateMeta(b []byte) ([]byte, error) { func updateMeta(b []byte) ([]byte, error) {
p := psi.PSIBytes(b) p := psi.PSIBytes(b)
if RealTime.IsSet() {
Meta.Add("ts", strconv.Itoa(int(RealTime.Get().Unix())))
}
err := p.AddDescriptor(psi.MetadataTag, Meta.Encode()) err := p.AddDescriptor(psi.MetadataTag, Meta.Encode())
return []byte(p), err return []byte(p), err
} }

View File

@ -650,8 +650,11 @@ func SegmentForMeta(d []byte, key, val string) ([][]byte, error) {
} }
// pid returns the packet identifier for the given packet. // pid returns the packet identifier for the given packet.
func pid(p []byte) uint16 { func PID(p []byte) (uint16, error) {
return uint16(p[1]&0x1f)<<8 | uint16(p[2]) if len(p) < PacketSize {
return 0, errors.New("packet length less than 188")
}
return uint16(p[1]&0x1f)<<8 | uint16(p[2]), nil
} }
// Programs returns a map of program numbers and corresponding PMT PIDs for a // Programs returns a map of program numbers and corresponding PMT PIDs for a
@ -683,10 +686,14 @@ func Streams(p []byte) ([]gotspsi.PmtElementaryStream, error) {
// but this program may contain different streams, i.e. a video stream + audio // but this program may contain different streams, i.e. a video stream + audio
// stream. // stream.
func MediaStreams(p []byte) ([]gotspsi.PmtElementaryStream, error) { func MediaStreams(p []byte) ([]gotspsi.PmtElementaryStream, error) {
if len(p) < 2*PacketSize {
return nil, errors.New("PSI is not two packets or more long")
}
pat := p[:PacketSize] pat := p[:PacketSize]
pmt := p[PacketSize : 2*PacketSize] pmt := p[PacketSize : 2*PacketSize]
if pid(pat) != PatPid { pid, _ := PID(pat)
if pid != PatPid {
return nil, errors.New("first packet is not a PAT") return nil, errors.New("first packet is not a PAT")
} }
@ -703,7 +710,8 @@ func MediaStreams(p []byte) ([]gotspsi.PmtElementaryStream, error) {
return nil, ErrMultiplePrograms return nil, ErrMultiplePrograms
} }
if pid(pmt) != pmtPIDs(m)[0] { pid, _ = PID(pmt)
if pid != pmtPIDs(m)[0] {
return nil, errors.New("second packet is not desired PMT") return nil, errors.New("second packet is not desired PMT")
} }

View File

@ -27,6 +27,7 @@ package revid
import ( import (
"errors" "errors"
"time"
"bitbucket.org/ausocean/av/codec/codecutil" "bitbucket.org/ausocean/av/codec/codecutil"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
@ -102,15 +103,16 @@ const (
defaultBurstPeriod = 10 // Seconds defaultBurstPeriod = 10 // Seconds
// Raspivid video defaults. // Raspivid video defaults.
defaultBrightness = 50 defaultBrightness = 50
defaultExposure = "auto" defaultExposure = "auto"
defaultAutoWhiteBalance = "auto" defaultAutoWhiteBalance = "auto"
defaultRotation = 0 // Degrees defaultRotation = 0 // Degrees
defaultWidth = 1280 defaultWidth = 1280
defaultHeight = 720 defaultHeight = 720
defaultIntraRefreshPeriod = 100 defaultMinFrames = 100
defaultQuantization = 30 defaultClipDuration = 0
defaultBitrate = 400000 defaultQuantization = 30
defaultBitrate = 400000
// Audio defaults. // Audio defaults.
defaultAudioInputCodec = codecutil.ADPCM defaultAudioInputCodec = codecutil.ADPCM
@ -202,9 +204,16 @@ type Config struct {
// are using Raspivid input. // are using Raspivid input.
Quantization uint Quantization uint
// IntraRefreshPeriod defines the frequency of video parameter NAL units for // MinFrames defines the frequency of key NAL units SPS, PPS and IDR in
// Raspivid input. // number of NAL units. This will also determine the frequency of PSI if the
IntraRefreshPeriod uint // output container is MPEG-TS. If ClipDuration is less than MinFrames,
// ClipDuration will default to MinFrames.
MinFrames uint
// ClipDuration is the duration of MTS data that is sent using HTTP or RTP
// output. This defaults to 0, therefore MinFrames will determine the length of
// clips by default.
ClipDuration time.Duration
// Logger holds an implementation of the Logger interface as defined in revid.go. // Logger holds an implementation of the Logger interface as defined in revid.go.
// This must be set for revid to work correctly. // This must be set for revid to work correctly.
@ -383,9 +392,18 @@ func (c *Config) Validate() error {
return errors.New("invalid bitrate") return errors.New("invalid bitrate")
} }
if c.IntraRefreshPeriod == 0 { if c.MinFrames == 0 {
c.Logger.Log(logger.Info, pkg+"no intra refresh defined, defaulting", "intraRefresh", defaultIntraRefreshPeriod) c.Logger.Log(logger.Info, pkg+"no min period defined, defaulting", "MinFrames", defaultMinFrames)
c.IntraRefreshPeriod = defaultIntraRefreshPeriod c.MinFrames = defaultMinFrames
} else if c.MinFrames < 0 {
return errors.New("refresh period is less than 0")
}
if c.ClipDuration == 0 {
c.Logger.Log(logger.Info, pkg+"no clip duration defined, defaulting", "ClipDuration", defaultClipDuration)
c.ClipDuration = defaultClipDuration
} else if c.ClipDuration < 0 {
return errors.New("clip duration is less than 0")
} }
if c.Quantization != 0 && (c.Quantization < 10 || c.Quantization > 40) { if c.Quantization != 0 && (c.Quantization < 10 || c.Quantization > 40) {

View File

@ -51,6 +51,7 @@ import (
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/ioext" "bitbucket.org/ausocean/utils/ioext"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
) )
// RTMP connection properties. // RTMP connection properties.
@ -230,9 +231,8 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io.
w = newMtsSender( w = newMtsSender(
newHttpSender(r.ns, r.config.Logger.Log), newHttpSender(r.ns, r.config.Logger.Log),
r.config.Logger.Log, r.config.Logger.Log,
r.config.MTSRBSize, ring.NewBuffer(r.config.MTSRBSize, r.config.MTSRBElementSize, 0),
r.config.MTSRBElementSize, r.config.ClipDuration,
0,
) )
mtsSenders = append(mtsSenders, w) mtsSenders = append(mtsSenders, w)
case RTP: case RTP:
@ -456,13 +456,21 @@ func (r *Revid) Update(vars map[string]string) error {
break break
} }
r.config.Quantization = uint(v) r.config.Quantization = uint(v)
case "IntraRefreshPeriod": case "MinFrames":
p, err := strconv.ParseUint(value, 10, 0) v, err := strconv.Atoi(value)
if err != nil { if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) r.config.Logger.Log(logger.Warning, pkg+"invalid MinFrames param", "value", value)
break break
} }
r.config.IntraRefreshPeriod = uint(p) r.config.MinFrames = uint(v)
case "ClipDuration":
v, err := strconv.Atoi(value)
if err != nil {
r.config.Logger.Log(logger.Warning, pkg+"invalid ClipDuration param", "value", value)
break
}
r.config.ClipDuration = time.Duration(v) * time.Second
case "HorizontalFlip": case "HorizontalFlip":
switch strings.ToLower(value) { switch strings.ToLower(value) {
@ -577,7 +585,7 @@ func (r *Revid) startRaspivid() (func() error, error) {
args = append(args, args = append(args,
"--codec", "H264", "--codec", "H264",
"--inline", "--inline",
"--intra", fmt.Sprint(r.config.IntraRefreshPeriod), "--intra", fmt.Sprint(r.config.MinFrames),
) )
if r.config.Quantization != 0 { if r.config.Quantization != 0 {
args = append(args, "-qp", fmt.Sprint(r.config.Quantization)) args = append(args, "-qp", fmt.Sprint(r.config.Quantization))

View File

@ -29,11 +29,11 @@ LICENSE
package revid package revid
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"net" "net"
"os" "os"
"strconv"
"sync" "sync"
"time" "time"
@ -118,7 +118,7 @@ func extractMeta(r string, log func(lvl int8, msg string, args ...interface{}))
log(logger.Warning, pkg+"No timestamp in reply") log(logger.Warning, pkg+"No timestamp in reply")
} else { } else {
log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t))
mts.Meta.Add("ts", strconv.Itoa(t)) mts.RealTime.Set(time.Unix(int64(t), 0))
} }
// Extract location from reply // Extract location from reply
@ -156,8 +156,8 @@ func (s *fileSender) Close() error { return s.file.Close() }
// mtsSender implements io.WriteCloser and provides sending capability specifically // mtsSender implements io.WriteCloser and provides sending capability specifically
// for use with MPEGTS packetization. It handles the construction of appropriately // for use with MPEGTS packetization. It handles the construction of appropriately
// lengthed clips based on PSI. It also accounts for discontinuities by // lengthed clips based on clip duration and PSI. It also accounts for
// setting the discontinuity indicator for the first packet of a clip. // discontinuities by setting the discontinuity indicator for the first packet of a clip.
type mtsSender struct { type mtsSender struct {
dst io.WriteCloser dst io.WriteCloser
buf []byte buf []byte
@ -166,19 +166,22 @@ type mtsSender struct {
pkt packet.Packet pkt packet.Packet
repairer *mts.DiscontinuityRepairer repairer *mts.DiscontinuityRepairer
curPid int curPid int
clipDur time.Duration
prev time.Time
done chan struct{} done chan struct{}
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
wg sync.WaitGroup wg sync.WaitGroup
} }
// newMtsSender returns a new mtsSender. // newMtsSender returns a new mtsSender.
func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), ringSize int, ringElementSize int, wTimeout time.Duration) *mtsSender { func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *ring.Buffer, clipDur time.Duration) *mtsSender {
s := &mtsSender{ s := &mtsSender{
dst: dst, dst: dst,
repairer: mts.NewDiscontinuityRepairer(), repairer: mts.NewDiscontinuityRepairer(),
log: log, log: log,
ring: ring.NewBuffer(ringSize, ringElementSize, wTimeout), ring: rb,
done: make(chan struct{}), done: make(chan struct{}),
clipDur: clipDur,
} }
s.wg.Add(1) s.wg.Add(1)
go s.output() go s.output()
@ -229,15 +232,20 @@ func (s *mtsSender) output() {
// Write implements io.Writer. // Write implements io.Writer.
func (s *mtsSender) Write(d []byte) (int, error) { func (s *mtsSender) Write(d []byte) (int, error) {
if len(d) < mts.PacketSize {
return 0, errors.New("do not have full MTS packet")
}
if s.next != nil { if s.next != nil {
s.buf = append(s.buf, s.next...) s.buf = append(s.buf, s.next...)
} }
bytes := make([]byte, len(d)) bytes := make([]byte, len(d))
copy(bytes, d) copy(bytes, d)
s.next = bytes s.next = bytes
copy(s.pkt[:], bytes) p, _ := mts.PID(bytes)
s.curPid = s.pkt.PID() s.curPid = int(p)
if s.curPid == mts.PatPid && len(s.buf) > 0 { if time.Now().Sub(s.prev) >= s.clipDur && s.curPid == mts.PatPid && len(s.buf) > 0 {
s.prev = time.Now()
_, err := s.ring.Write(s.buf) _, err := s.ring.Write(s.buf)
if err != nil { if err != nil {
s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error()) s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error())

View File

@ -39,6 +39,7 @@ import (
"bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/av/container/mts/meta"
"bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
) )
var ( var (
@ -133,12 +134,12 @@ func TestMtsSenderSegment(t *testing.T) {
// Create ringBuffer, sender, sender and the MPEGTS encoder. // Create ringBuffer, sender, sender and the MPEGTS encoder.
const numberOfClips = 11 const numberOfClips = 11
dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips}
sender := newMtsSender(dst, (*dummyLogger)(t).log, defaultMTSRBSize, defaultMTSRBElementSize, 0) sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0)
encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264)
// Turn time based PSI writing off for encoder. // Turn time based PSI writing off for encoder.
const psiSendCount = 10 const psiSendCount = 10
encoder.TimeBasedPsi(false, psiSendCount) encoder.NALBasedPSI(false, psiSendCount)
// Write the packets to the encoder, which will in turn write to the mtsSender. // Write the packets to the encoder, which will in turn write to the mtsSender.
// Payload will just be packet number. // Payload will just be packet number.
@ -211,12 +212,12 @@ func TestMtsSenderFailedSend(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder // Create destination, the mtsSender and the mtsEncoder
const clipToFailAt = 3 const clipToFailAt = 3
dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})}
sender := newMtsSender(dst, (*dummyLogger)(t).log, defaultMTSRBSize, defaultMTSRBElementSize, 0) sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0)
encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264)
// Turn time based PSI writing off for encoder and send PSI every 10 packets. // Turn time based PSI writing off for encoder and send PSI every 10 packets.
const psiSendCount = 10 const psiSendCount = 10
encoder.TimeBasedPsi(false, psiSendCount) encoder.NALBasedPSI(false, psiSendCount)
// Write the packets to the encoder, which will in turn write to the mtsSender. // Write the packets to the encoder, which will in turn write to the mtsSender.
// Payload will just be packet number. // Payload will just be packet number.
@ -291,12 +292,12 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
// Create destination, the mtsSender and the mtsEncoder. // Create destination, the mtsSender and the mtsEncoder.
const clipToDelay = 3 const clipToDelay = 3
dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})}
sender := newMtsSender(dst, (*dummyLogger)(t).log, 1, defaultMTSRBElementSize, 0) sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(1, defaultMTSRBElementSize, 0), 0)
encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) encoder := mts.NewEncoder(sender, 25, mts.EncodeH264)
// Turn time based PSI writing off for encoder. // Turn time based PSI writing off for encoder.
const psiSendCount = 10 const psiSendCount = 10
encoder.TimeBasedPsi(false, psiSendCount) encoder.NALBasedPSI(false, psiSendCount)
// Write the packets to the encoder, which will in turn write to the mtsSender. // Write the packets to the encoder, which will in turn write to the mtsSender.
// Payload will just be packet number. // Payload will just be packet number.