mirror of https://bitbucket.org/ausocean/av.git
container/mts: doing logging in encoder code now
This commit is contained in:
parent
18ffefd7cb
commit
13ce7cdba0
|
@ -47,6 +47,14 @@ const (
|
|||
audioStreamID = 0xc0 // ADPCM audio stream ID.
|
||||
)
|
||||
|
||||
// These three constants are used to select between the three different
|
||||
// methods of when the PSI is sent.
|
||||
const (
|
||||
pktBased = iota
|
||||
timeBased
|
||||
nalBased
|
||||
)
|
||||
|
||||
// Constants used to communicate which media codec will be packetized.
|
||||
const (
|
||||
EncodeH264 = iota
|
||||
|
@ -187,18 +195,22 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int, log logger, opt
|
|||
mPID = AudioPid
|
||||
sID = audioStreamID
|
||||
psiM = pktBased
|
||||
log.Debug("configured for audio packetisation")
|
||||
case EncodeH265:
|
||||
mPID = VideoPid
|
||||
sID = H265ID
|
||||
psiM = nalBased
|
||||
log.Debug("configured for h.265 packetisation")
|
||||
case EncodeH264:
|
||||
mPID = VideoPid
|
||||
sID = H264ID
|
||||
psiM = nalBased
|
||||
log.Debug("configured for h.264 packetisation")
|
||||
case EncodeMJPEG:
|
||||
mPID = VideoPid
|
||||
sID = MJPEGID
|
||||
psiM = timeBased
|
||||
log.Debug("configured for MJPEG packetisation")
|
||||
}
|
||||
|
||||
pmt := BasePMT
|
||||
|
@ -214,23 +226,15 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int, log logger, opt
|
|||
pmtTable = pmt.Bytes()
|
||||
|
||||
e := &Encoder{
|
||||
dst: dst,
|
||||
|
||||
dst: dst,
|
||||
writePeriod: time.Duration(float64(time.Second) / rate),
|
||||
ptsOffset: ptsOffset,
|
||||
|
||||
psiMethod: psiM,
|
||||
|
||||
pktCount: 8,
|
||||
|
||||
mediaPid: mPID,
|
||||
streamID: sID,
|
||||
|
||||
continuity: map[uint16]byte{
|
||||
PatPid: 0,
|
||||
PmtPid: 0,
|
||||
mPID: 0,
|
||||
},
|
||||
psiMethod: psiM,
|
||||
pktCount: 8,
|
||||
mediaPid: mPID,
|
||||
streamID: sID,
|
||||
continuity: map[uint16]byte{PatPid: 0, PmtPid: 0, mPID: 0},
|
||||
log: log,
|
||||
}
|
||||
|
||||
for _, option := range options {
|
||||
|
@ -239,17 +243,11 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int, log logger, opt
|
|||
return nil, fmt.Errorf("option failed with error: %w", err)
|
||||
}
|
||||
}
|
||||
log.Debug("encoder options applied")
|
||||
|
||||
return e, nil
|
||||
}
|
||||
|
||||
// These three constants are used to select between the three different
|
||||
// methods of when the PSI is sent.
|
||||
const (
|
||||
pktBased = iota
|
||||
timeBased
|
||||
nalBased
|
||||
)
|
||||
|
||||
// PacketBasedPSI is an option that can be passed to NewEncoder to select
|
||||
// packet based PSI writing, i.e. PSI are written to the destination every
|
||||
// sendCount packets.
|
||||
|
@ -258,6 +256,7 @@ func PacketBasedPSI(sendCount int) func(*Encoder) error {
|
|||
e.psiMethod = pktBased
|
||||
e.psiSendCount = sendCount
|
||||
e.pktCount = e.psiSendCount
|
||||
e.log.Debug("configured for packet based PSI insertion", "count", sendCount)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@ -271,6 +270,7 @@ func TimeBasedPSI(dur time.Duration) func(*Encoder) error {
|
|||
e.psiTime = 0
|
||||
e.psiSetTime = dur
|
||||
e.startTime = time.Now()
|
||||
e.log.Debug("configured for time based PSI insertion")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@ -280,6 +280,7 @@ func TimeBasedPSI(dur time.Duration) func(*Encoder) error {
|
|||
func (e *Encoder) Write(data []byte) (int, error) {
|
||||
switch e.psiMethod {
|
||||
case pktBased:
|
||||
e.log.Debug("checking packet no. conditions for PSI write", "count", e.pktCount, "PSI count", e.psiSendCount)
|
||||
if e.pktCount >= e.psiSendCount {
|
||||
e.pktCount = 0
|
||||
err := e.writePSI()
|
||||
|
@ -292,7 +293,7 @@ func (e *Encoder) Write(data []byte) (int, error) {
|
|||
if err != nil {
|
||||
return 0, fmt.Errorf("could not get type from NAL unit, failed with error: %w", err)
|
||||
}
|
||||
|
||||
e.log.Debug("checking conditions for PSI write", "AU type", nalType, "needed type", h264dec.NALTypeSPS)
|
||||
if nalType == h264dec.NALTypeSPS {
|
||||
err := e.writePSI()
|
||||
if err != nil {
|
||||
|
@ -300,24 +301,26 @@ func (e *Encoder) Write(data []byte) (int, error) {
|
|||
}
|
||||
}
|
||||
case timeBased:
|
||||
if time.Now().Sub(e.startTime) >= e.psiTime {
|
||||
dur := time.Now().Sub(e.startTime)
|
||||
e.log.Debug("checking time conditions for PSI write")
|
||||
if dur >= e.psiTime {
|
||||
e.psiTime = e.psiSetTime
|
||||
e.startTime = time.Now()
|
||||
err := e.writePSI()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
}
|
||||
default:
|
||||
panic("Undefined PSI method")
|
||||
panic("undefined PSI method")
|
||||
}
|
||||
|
||||
// Prepare PES data.
|
||||
pts := e.pts()
|
||||
pesPkt := pes.Packet{
|
||||
StreamID: e.streamID,
|
||||
PDI: hasPTS,
|
||||
PTS: e.pts(),
|
||||
PTS: pts,
|
||||
Data: data,
|
||||
HeaderLength: 5,
|
||||
}
|
||||
|
@ -340,7 +343,9 @@ func (e *Encoder) Write(data []byte) (int, error) {
|
|||
if pusi {
|
||||
// If the packet has a Payload Unit Start Indicator
|
||||
// flag set then we need to write a PCR.
|
||||
pkt.PCR = e.pcr()
|
||||
pcr := e.pcr()
|
||||
e.log.Debug("new access unit", "PCR", pcr, "PTS", pts)
|
||||
pkt.PCR = pcr
|
||||
pusi = false
|
||||
}
|
||||
_, err := e.dst.Write(pkt.Bytes(e.tsSpace[:PacketSize]))
|
||||
|
@ -371,7 +376,8 @@ func (e *Encoder) writePSI() error {
|
|||
return err
|
||||
}
|
||||
e.pktCount++
|
||||
pmtTable, err = updateMeta(pmtTable)
|
||||
|
||||
pmtTable, err = updateMeta(pmtTable, e.log)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -390,6 +396,8 @@ func (e *Encoder) writePSI() error {
|
|||
return err
|
||||
}
|
||||
e.pktCount++
|
||||
|
||||
e.log.Debug("PSI written", "PAT CC", patPkt.CC, "PMT CC", pmtPkt.CC)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -418,15 +426,18 @@ func (e *Encoder) ccFor(pid uint16) byte {
|
|||
|
||||
// updateMeta adds/updates a metaData descriptor in the given psi bytes using data
|
||||
// contained in the global Meta struct.
|
||||
func updateMeta(b []byte) ([]byte, error) {
|
||||
func updateMeta(b []byte, log logger) ([]byte, error) {
|
||||
p := psi.PSIBytes(b)
|
||||
if RealTime.IsSet() {
|
||||
Meta.Add("ts", strconv.Itoa(int(RealTime.Get().Unix())))
|
||||
t := strconv.Itoa(int(RealTime.Get().Unix()))
|
||||
Meta.Add("ts", t)
|
||||
log.Debug("latest time added to meta", "time", t)
|
||||
}
|
||||
err := p.AddDescriptor(psi.MetadataTag, Meta.Encode())
|
||||
return []byte(p), err
|
||||
}
|
||||
|
||||
func (e *Encoder) Close() error {
|
||||
e.log.Debug("closing encoder")
|
||||
return e.dst.Close()
|
||||
}
|
||||
|
|
|
@ -57,11 +57,11 @@ func (d *destination) Write(p []byte) (int, error) {
|
|||
// testLogger will allow logging to be done by the testing pkg.
|
||||
type testLogger testing.T
|
||||
|
||||
func (tl *testLogger) Debug(msg string, args ...interface{}) { tl.log("debug", msg, args) }
|
||||
func (tl *testLogger) Info(msg string, args ...interface{}) { tl.log("info", msg, args) }
|
||||
func (tl *testLogger) Warning(msg string, args ...interface{}) { tl.log("warning", msg, args) }
|
||||
func (tl *testLogger) Error(msg string, args ...interface{}) { tl.log("error", msg, args) }
|
||||
func (tl *testLogger) Fatal(msg string, args ...interface{}) { tl.log("fatal", msg, args) }
|
||||
func (tl *testLogger) Debug(msg string, args ...interface{}) { tl.log("debug", msg, args...) }
|
||||
func (tl *testLogger) Info(msg string, args ...interface{}) { tl.log("info", msg, args...) }
|
||||
func (tl *testLogger) Warning(msg string, args ...interface{}) { tl.log("warning", msg, args...) }
|
||||
func (tl *testLogger) Error(msg string, args ...interface{}) { tl.log("error", msg, args...) }
|
||||
func (tl *testLogger) Fatal(msg string, args ...interface{}) { tl.log("fatal", msg, args...) }
|
||||
|
||||
func (tl *testLogger) log(lvl string, msg string, args ...interface{}) {
|
||||
switch lvl {
|
||||
|
|
|
@ -365,7 +365,7 @@ func TestTrimToMetaRange(t *testing.T) {
|
|||
|
||||
for i := 0; i < nPSI; i++ {
|
||||
Meta.Add(key, strconv.Itoa((i*2)+1))
|
||||
err := writePSIWithMeta(&clip)
|
||||
err := writePSIWithMeta(&clip, t)
|
||||
if err != nil {
|
||||
t.Fatalf("did not expect to get error writing PSI, error: %v", err)
|
||||
}
|
||||
|
@ -472,7 +472,7 @@ func TestSegmentForMeta(t *testing.T) {
|
|||
} else {
|
||||
Meta.Delete(key)
|
||||
}
|
||||
err := writePSIWithMeta(&clip)
|
||||
err := writePSIWithMeta(&clip, t)
|
||||
if err != nil {
|
||||
t.Fatalf("did not expect to get error writing PSI, error: %v", err)
|
||||
}
|
||||
|
@ -668,7 +668,7 @@ func TestFindPSI(t *testing.T) {
|
|||
}).Bytes()
|
||||
|
||||
Meta.Add(metaKey, test.meta)
|
||||
pmtTable, err = updateMeta(pmtTable)
|
||||
pmtTable, err = updateMeta(pmtTable, (*testLogger)(t))
|
||||
if err != nil {
|
||||
t.Fatalf("could not update meta for test %d", i)
|
||||
}
|
||||
|
|
|
@ -70,7 +70,7 @@ func TestExtract(t *testing.T) {
|
|||
// We'll add the frame number as meta.
|
||||
Meta.Add("frameNum", strconv.Itoa(i))
|
||||
|
||||
err = writePSIWithMeta(&clip)
|
||||
err = writePSIWithMeta(&clip, t)
|
||||
if err != nil {
|
||||
t.Fatalf("did not expect error writing psi: %v", err)
|
||||
}
|
||||
|
@ -136,7 +136,7 @@ func TestExtract(t *testing.T) {
|
|||
}
|
||||
|
||||
// writePSIWithMeta writes PSI to b with updated metadata.
|
||||
func writePSIWithMeta(b *bytes.Buffer) error {
|
||||
func writePSIWithMeta(b *bytes.Buffer, t *testing.T) error {
|
||||
// Write PAT.
|
||||
pat := Packet{
|
||||
PUSI: true,
|
||||
|
@ -151,7 +151,7 @@ func writePSIWithMeta(b *bytes.Buffer) error {
|
|||
}
|
||||
|
||||
// Update the meta in the pmt table.
|
||||
pmtTable, err = updateMeta(pmtTable)
|
||||
pmtTable, err = updateMeta(pmtTable, (*testLogger)(t))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -202,7 +202,7 @@ func TestClipBytes(t *testing.T) {
|
|||
// We'll add the frame number as meta.
|
||||
Meta.Add("frameNum", strconv.Itoa(i))
|
||||
|
||||
err = writePSIWithMeta(&clip)
|
||||
err = writePSIWithMeta(&clip, t)
|
||||
if err != nil {
|
||||
t.Fatalf("did not expect error writing psi: %v", err)
|
||||
}
|
||||
|
|
|
@ -76,11 +76,11 @@ type Logger interface {
|
|||
|
||||
type encLog struct{ Logger }
|
||||
|
||||
func (el *encLog) Debug(msg string, args ...interface{}) { el.Log(logger.Debug, msg, args) }
|
||||
func (el *encLog) Info(msg string, args ...interface{}) { el.Log(logger.Info, msg, args) }
|
||||
func (el *encLog) Warning(msg string, args ...interface{}) { el.Log(logger.Warning, msg, args) }
|
||||
func (el *encLog) Error(msg string, args ...interface{}) { el.Log(logger.Error, msg, args) }
|
||||
func (el *encLog) Fatal(msg string, args ...interface{}) { el.Log(logger.Fatal, msg, args) }
|
||||
func (el *encLog) Debug(msg string, args ...interface{}) { el.Log(logger.Debug, msg, args...) }
|
||||
func (el *encLog) Info(msg string, args ...interface{}) { el.Log(logger.Info, msg, args...) }
|
||||
func (el *encLog) Warning(msg string, args ...interface{}) { el.Log(logger.Warning, msg, args...) }
|
||||
func (el *encLog) Error(msg string, args ...interface{}) { el.Log(logger.Error, msg, args...) }
|
||||
func (el *encLog) Fatal(msg string, args ...interface{}) { el.Log(logger.Fatal, msg, args...) }
|
||||
|
||||
// Revid provides methods to control a revid session; providing methods
|
||||
// to start, stop and change the state of an instance using the Config struct.
|
||||
|
|
|
@ -101,11 +101,11 @@ func (ts *destination) Close() error { return nil }
|
|||
// testLogger will allow logging to be done by the testing pkg.
|
||||
type testLogger testing.T
|
||||
|
||||
func (tl *testLogger) Debug(msg string, args ...interface{}) { tl.log(logger.Debug, msg, args) }
|
||||
func (tl *testLogger) Info(msg string, args ...interface{}) { tl.log(logger.Info, msg, args) }
|
||||
func (tl *testLogger) Warning(msg string, args ...interface{}) { tl.log(logger.Warning, msg, args) }
|
||||
func (tl *testLogger) Error(msg string, args ...interface{}) { tl.log(logger.Error, msg, args) }
|
||||
func (tl *testLogger) Fatal(msg string, args ...interface{}) { tl.log(logger.Fatal, msg, args) }
|
||||
func (tl *testLogger) Debug(msg string, args ...interface{}) { tl.log(logger.Debug, msg, args...) }
|
||||
func (tl *testLogger) Info(msg string, args ...interface{}) { tl.log(logger.Info, msg, args...) }
|
||||
func (tl *testLogger) Warning(msg string, args ...interface{}) { tl.log(logger.Warning, msg, args...) }
|
||||
func (tl *testLogger) Error(msg string, args ...interface{}) { tl.log(logger.Error, msg, args...) }
|
||||
func (tl *testLogger) Fatal(msg string, args ...interface{}) { tl.log(logger.Fatal, msg, args...) }
|
||||
|
||||
func (dl *testLogger) log(lvl int8, msg string, args ...interface{}) {
|
||||
var l string
|
||||
|
|
Loading…
Reference in New Issue