From 798add533bcd3e156e863362c80c0ec035c20f61 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 14 Dec 2018 13:35:56 +1030 Subject: [PATCH] revid + mts +psi: wrote func to find pmt in byte slice, wrote func to get gps, changed the way in which psi are insterted, based no of mpgets packets to send on time, i.e. per second --- cmd/revid-cli/main.go | 5 +++-- revid/revid.go | 7 ++++++- revid/senders.go | 5 ++++- stream/mts/encoder.go | 19 ++++++++++--------- stream/mts/mpegts.go | 17 +++++++++++++++++ 5 files changed, 40 insertions(+), 13 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index f50fd367..32b18be6 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -45,7 +45,7 @@ const ( progName = "revid-cli" // Logging is set to INFO level. - defaultLogVerbosity = smartlogger.Info + defaultLogVerbosity = smartlogger.Debug ) // Other misc consts @@ -231,8 +231,9 @@ func handleFlags() revid.Config { switch *verbosityPtr { case "No": cfg.LogLevel = smartlogger.Fatal - case "Yes": + case "Debug": cfg.LogLevel = smartlogger.Debug + //logger.SetLevel(smartlogger.Debug) case "": default: logger.Log(smartlogger.Error, pkg+"bad verbosity argument") diff --git a/revid/revid.go b/revid/revid.go index 5baf69ed..f7dbc9ad 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -125,6 +125,9 @@ type Revid struct { isRunning bool } +var now = time.Now() +var prevTime = now + // packer takes data segments and packs them into clips // of the number frames specified in the owners config. type packer struct { @@ -153,9 +156,11 @@ func (p *packer) Write(frame []byte) (int, error) { return n, err } p.packetCount++ - if p.packetCount >= p.owner.config.FramesPerClip { + now = time.Now() + if now.Sub(prevTime) > clipDuration && p.packetCount%7 == 0 { p.owner.buffer.Flush() p.packetCount = 0 + prevTime = now } return len(frame), nil } diff --git a/revid/senders.go b/revid/senders.go index 64eb7967..d2172b09 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -29,6 +29,7 @@ LICENSE package revid import ( + "fmt" "io" "net" "os" @@ -160,13 +161,14 @@ func (s *httpSender) send() error { func extractMeta(r string, s *httpSender) error { dec, err := netsender.NewJsonDecoder(r) if err != nil { - s.log(smartlogger.Warning, pkg+"Could not decode JSON to get metadata") + return nil } // Extract time from reply t, err := dec.Int("ts") if err != nil { s.log(smartlogger.Warning, pkg+"No timestamp in reply") } else { + s.log(smartlogger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) mts.TimeMeta(uint64(t)) } @@ -175,6 +177,7 @@ func extractMeta(r string, s *httpSender) error { if err != nil { s.log(smartlogger.Warning, pkg+"No gps in reply") } else { + s.log(smartlogger.Debug, fmt.Sprintf("%v got gps: %v", pkg, g)) mts.GpsMeta(g) } diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 78cbf2b2..fc1d2ef1 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -43,7 +43,7 @@ var ( const ( psiPacketSize = 184 - psiSendCount = 100 + psiSendCount = 21 ) type MetaData struct { @@ -92,7 +92,7 @@ type Encoder struct { frameInterval time.Duration ptsOffset time.Duration - psiCount uint + psiCount int continuity map[int]byte } @@ -126,13 +126,7 @@ const ( // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel func (e *Encoder) Encode(nalu []byte) error { - if e.psiCount <= 0 { - err := e.writePSI() - if err != nil { - return err - } - } - e.psiCount-- + // Prepare PES data. pesPkt := pes.Packet{ StreamID: streamID, @@ -162,6 +156,13 @@ func (e *Encoder) Encode(nalu []byte) error { pkt.PCR = e.pcr() pusi = false } + if e.psiCount <= 0 { + err := e.writePSI() + if err != nil { + return err + } + } + e.psiCount-- _, err := e.dst.Write(pkt.Bytes()) if err != nil { return err diff --git a/stream/mts/mpegts.go b/stream/mts/mpegts.go index eb1dce6f..d8bd0ed9 100644 --- a/stream/mts/mpegts.go +++ b/stream/mts/mpegts.go @@ -28,6 +28,10 @@ LICENSE package mts +import ( + "errors" +) + const ( mpegTsSize = 188 mpegtsPayloadSize = 176 @@ -123,6 +127,19 @@ type Packet struct { Payload []byte // Mpeg ts Payload } +// FindPMT will take a clip of mpegts and try to find a PMT table - if one +// is found, then it is returned, otherwise nil and an error is returned. +func FindPMT(d []byte) (p []byte, err error) { + for i := 0; i < len(d); i += mpegTsSize { + pid := (uint16(d[i+1]&0x1f) << 8) | uint16(d[i+2]) + if pid == pmtPid { + p = d[i+4 : i+mpegTsSize] + return + } + } + return nil, errors.New("Could not find pmt table in mpegts data") +} + // FillPayload takes a channel and fills the packets Payload field until the // channel is empty or we've the packet reaches capacity func (p *Packet) FillPayload(data []byte) int {