From cdd670393042d7829bd71b5f92dcbabb5093d40d Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 6 Feb 2019 15:45:14 +1030 Subject: [PATCH 01/29] } --- revid/revid.go | 7 ++++++- stream/rtp/encoder.go | 4 ++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 02656d03..bdf9ccf3 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -145,7 +145,12 @@ func (p *packer) Write(frame []byte) (int, error) { p.owner.config.Logger.Log(logger.Warning, pkg+"frame was too big", "frame size", len(frame)) return len(frame), nil } - n, err := p.owner.buffer.Write(frame) + var n int + var err error + if len(p.owner.destination) != 0 { + n, err = p.owner.buffer.Write(frame) + } + // If we have an rtp sender bypass ringbuffer and give straight to sender if p.owner.rtpSender != nil { err = p.owner.rtpSender.send(frame) diff --git a/stream/rtp/encoder.go b/stream/rtp/encoder.go index 20df9434..329a24c0 100644 --- a/stream/rtp/encoder.go +++ b/stream/rtp/encoder.go @@ -73,8 +73,8 @@ func NewEncoder(dst io.Writer, fps int) *Encoder { func (e *Encoder) Write(data []byte) (int, error) { e.buffer = append(e.buffer, data...) for len(e.buffer) >= sendLen { - e.Encode(e.buffer[:sendLen]) - e.buffer = e.buffer[sendLen:] + e.Encode(e.buffer) + e.buffer = e.buffer[:0] } return len(data), nil } From ef4aa8efd1018842536764b6fe877f6b15b91d92 Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 6 Feb 2019 15:52:59 +1030 Subject: [PATCH 02/29] revid/revid.go: reverted change regarding putting things in ringbuffer when we won't use it --- revid/revid.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index bdf9ccf3..6948c0f9 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -145,11 +145,7 @@ func (p *packer) Write(frame []byte) (int, error) { p.owner.config.Logger.Log(logger.Warning, pkg+"frame was too big", "frame size", len(frame)) return len(frame), nil } - var n int - var err error - if len(p.owner.destination) != 0 { - n, err = p.owner.buffer.Write(frame) - } + n, err := p.owner.buffer.Write(frame) // If we have an rtp sender bypass ringbuffer and give straight to sender if p.owner.rtpSender != nil { From 87cb303935d7d974f2159ff1d835aab22eed152e Mon Sep 17 00:00:00 2001 From: Saxon Milton Date: Wed, 6 Feb 2019 05:27:35 +0000 Subject: [PATCH 03/29] revid/revid.go: again checking to see we actually have destinations before putting anything into the ring buffer --- revid/revid.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index 6948c0f9..8968a1e5 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -145,7 +145,11 @@ func (p *packer) Write(frame []byte) (int, error) { p.owner.config.Logger.Log(logger.Warning, pkg+"frame was too big", "frame size", len(frame)) return len(frame), nil } - n, err := p.owner.buffer.Write(frame) + var n int + var err error + if len(p.owner.destination) != 0 { + n, err := p.owner.buffer.Write(frame) + } // If we have an rtp sender bypass ringbuffer and give straight to sender if p.owner.rtpSender != nil { From ccb8a52db169fe67151837e4a420bb1758e0585d Mon Sep 17 00:00:00 2001 From: Saxon Milton Date: Wed, 6 Feb 2019 05:28:57 +0000 Subject: [PATCH 04/29] revid/revid.go: removing shorthand initialisations for vars already declared --- revid/revid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index 8968a1e5..454a3b8a 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -148,7 +148,7 @@ func (p *packer) Write(frame []byte) (int, error) { var n int var err error if len(p.owner.destination) != 0 { - n, err := p.owner.buffer.Write(frame) + n, err = p.owner.buffer.Write(frame) } // If we have an rtp sender bypass ringbuffer and give straight to sender From c7a9e0a06bfab6686caf24c483a6276e119cce98 Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 24 Jan 2019 22:29:43 +1030 Subject: [PATCH 05/29] mts: fixing cc --- cmd/ts-repair/main.go | 82 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 cmd/ts-repair/main.go diff --git a/cmd/ts-repair/main.go b/cmd/ts-repair/main.go new file mode 100644 index 00000000..bcb1ebc1 --- /dev/null +++ b/cmd/ts-repair/main.go @@ -0,0 +1,82 @@ +package main + +import ( + "flag" + "io" + "os" + + "bitbucket.org/ausocean/av/stream/mts" + "github.com/Comcast/gots/packet" +) + +const ( + errBadInPath = "No file path provided, or file does not exist" + errCantCreateOut = "Can't create output file" + errCantGetPid = "Can't get pid from packet" + errReadFail = "Read failed" + errWriteFail = "Write to file failed" + usage = "The path to the file to be repaired" +) + +var ccMap = map[int]byte{ + mts.PatPid: 1, + mts.PmtPid: 1, + mts.VideoPid: 0, +} + +type Packet [mts.PacketSize]byte + +func (p *Packet) setCC(cc byte) { + (*p)[3] |= cc & 0xf +} + +func main() { + // Deal with input flags + inPtr := flag.String("path", "", usage) + outPtr := flag.String("out", "out.ts", usage) + flag.Parse() + + // Try and open the given input file, otherwise panic - we can't do anything + inFile, err := os.Open(*inPtr) + defer inFile.Close() + if err != nil { + panic(errBadInPath) + } + + // Try and create output file, otherwise panic - we can't do anything + outFile, err := os.Create(*outPtr) + defer outFile.Close() + if err != nil { + panic(errCantCreateOut) + } + + // Read each packet from the input file reader + var p Packet + for { + // If we get an end of file then return, otherwise we panic - can't do anything else + if _, err := inFile.Read(p[:mts.PacketSize]); err == io.EOF { + return + } else if err != nil { + panic(errReadFail + ": " + err.Error()) + } + + // Get the pid from the packet and set the cc based on this pid using our map + pid, err := packet.Pid((*packet.Packet)(&p)) + if err != nil { + panic(errCantGetPid) + } + p.setCC(ccFor(int(pid))) + + // Write this packet to the output file + if _, err := outFile.Write(p[:]); err != nil { + panic(errWriteFail + ": " + err.Error()) + } + } +} + +// ccFor gets the next cc for the given pid +func ccFor(pid int) byte { + cc := ccMap[pid] + ccMap[pid] = (cc + 1) & 0xf + return cc +} From 72f0087b9a6d22566a30e4bc0b01d915095c9406 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 25 Jan 2019 15:03:57 +1030 Subject: [PATCH 06/29] cmd/ts-repair: got setting of discontinuity indicators working and also adding adaptation fields to pat and pmt for this reason. --- cmd/ts-repair/main.go | 125 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 115 insertions(+), 10 deletions(-) diff --git a/cmd/ts-repair/main.go b/cmd/ts-repair/main.go index bcb1ebc1..f34da350 100644 --- a/cmd/ts-repair/main.go +++ b/cmd/ts-repair/main.go @@ -2,6 +2,7 @@ package main import ( "flag" + "fmt" "io" "os" @@ -15,25 +16,94 @@ const ( errCantGetPid = "Can't get pid from packet" errReadFail = "Read failed" errWriteFail = "Write to file failed" - usage = "The path to the file to be repaired" + errBadMode = "Bad fix mode" +) + +const ( + inUsage = "The path to the file to be repaired" + outUsage = "Output file path" + modeUsage = "Fix mode: 0 = cc-shift, 1 = di-update" +) + +const ( + ccShift = iota + diUpdate ) var ccMap = map[int]byte{ - mts.PatPid: 1, - mts.PmtPid: 1, - mts.VideoPid: 0, + mts.PatPid: 16, + mts.PmtPid: 16, + mts.VideoPid: 16, } +var packetNo int + +type Option func(p *Packet) + type Packet [mts.PacketSize]byte +func (p *Packet) CC() byte { + return (*p)[3] & 0x0f +} + func (p *Packet) setCC(cc byte) { (*p)[3] |= cc & 0xf } +func (p *Packet) setDI(di bool) { + if di { + p[5] |= 0x80 + } else { + p[5] &= 0x7f + } +} + +func (p *Packet) addAdaptationField(options ...Option) { + // Create space for adaptation field + copy(p[mts.HeadSize+mts.DefaultAdaptationSize:], p[mts.HeadSize:len(p)-mts.DefaultAdaptationSize]) + + // TODO: seperate into own function + // Update adaptation field control + p[mts.AdaptationControlIdx] &= 0xff ^ mts.AdaptationControlMask + p[mts.AdaptationControlIdx] |= mts.AdaptationControlMask + // Default the adaptationfield + p.resetAdaptation() + + for _, option := range options { + option(p) + } +} + +func (p *Packet) resetAdaptation() { + p[mts.AdaptationIdx] = mts.DefaultAdaptationBodySize + p[mts.AdaptationBodyIdx] = 0x00 +} + +func (p *Packet) hasAdaptation() bool { + afc := p[mts.AdaptationControlIdx] & mts.AdaptationControlMask + if afc == 0x20 || afc == 0x30 { + return true + } else { + return false + } +} + +func DiscontinuityIndicator(f bool) Option { + return func(p *Packet) { + set := byte(mts.DiscontinuityIndicatorMask) + if !f { + set = 0x00 + } + p[mts.DiscontinuityIndicatorIdx] &= 0xff ^ mts.DiscontinuityIndicatorMask + p[mts.DiscontinuityIndicatorIdx] |= mts.DiscontinuityIndicatorMask & set + } +} + func main() { // Deal with input flags - inPtr := flag.String("path", "", usage) - outPtr := flag.String("out", "out.ts", usage) + inPtr := flag.String("in", "", inUsage) + outPtr := flag.String("out", "out.ts", outUsage) + modePtr := flag.Int("mode", diUpdate, modeUsage) flag.Parse() // Try and open the given input file, otherwise panic - we can't do anything @@ -59,14 +129,42 @@ func main() { } else if err != nil { panic(errReadFail + ": " + err.Error()) } - + packetNo++ // Get the pid from the packet and set the cc based on this pid using our map pid, err := packet.Pid((*packet.Packet)(&p)) if err != nil { panic(errCantGetPid) } - p.setCC(ccFor(int(pid))) + cc := p.CC() + expect, exists := expectedCC(int(pid)) + if !exists { + updateCCMap(int(pid), cc) + } else { + switch *modePtr { + case ccShift: + p.setCC(expect) + + case diUpdate: + + if cc != expect { + fmt.Printf("packetNo: %v pid: %v, cc: %v, expect: %v\n", packetNo, pid, cc, expect) + if p.hasAdaptation() { + fmt.Println("hasAdaptation") + p.setDI(true) + } else { + fmt.Println("doesn't have adaptation") + fmt.Println(p) + p.addAdaptationField(DiscontinuityIndicator(true)) + fmt.Println(p) + } + updateCCMap(int(pid), p.CC()) + } + + default: + panic(errBadMode) + } + } // Write this packet to the output file if _, err := outFile.Write(p[:]); err != nil { panic(errWriteFail + ": " + err.Error()) @@ -75,8 +173,15 @@ func main() { } // ccFor gets the next cc for the given pid -func ccFor(pid int) byte { +func expectedCC(pid int) (byte, bool) { cc := ccMap[pid] + if cc == 16 { + return 16, false + } + ccMap[pid] = (cc + 1) & 0xf + return cc, true +} + +func updateCCMap(pid int, cc byte) { ccMap[pid] = (cc + 1) & 0xf - return cc } From cac718473758b686b3880af04e0bfa1e5558b9f0 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 25 Jan 2019 15:18:27 +1030 Subject: [PATCH 07/29] cmd/ts-repair: got rid of debug prints --- cmd/ts-repair/main.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/cmd/ts-repair/main.go b/cmd/ts-repair/main.go index f34da350..039ada9b 100644 --- a/cmd/ts-repair/main.go +++ b/cmd/ts-repair/main.go @@ -148,15 +148,11 @@ func main() { case diUpdate: if cc != expect { - fmt.Printf("packetNo: %v pid: %v, cc: %v, expect: %v\n", packetNo, pid, cc, expect) + fmt.Printf("***** Discontinuity found (packetNo: %v pid: %v, cc: %v, expect: %v)\n", packetNo, pid, cc, expect) if p.hasAdaptation() { - fmt.Println("hasAdaptation") p.setDI(true) } else { - fmt.Println("doesn't have adaptation") - fmt.Println(p) p.addAdaptationField(DiscontinuityIndicator(true)) - fmt.Println(p) } updateCCMap(int(pid), p.CC()) } From 90a85fbfbb68731f664ebaacef39a0e6d23409c7 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 25 Jan 2019 16:10:13 +1030 Subject: [PATCH 08/29] cmd/ts-repair: updated comments and made funcs more robust --- cmd/ts-repair/main.go | 68 ++++++++++++++++++++++++++++++++----------- 1 file changed, 51 insertions(+), 17 deletions(-) diff --git a/cmd/ts-repair/main.go b/cmd/ts-repair/main.go index 039ada9b..f3f73bb2 100644 --- a/cmd/ts-repair/main.go +++ b/cmd/ts-repair/main.go @@ -1,6 +1,7 @@ package main import ( + "errors" "flag" "fmt" "io" @@ -10,21 +11,26 @@ import ( "github.com/Comcast/gots/packet" ) +// Various errors that we can encounter. const ( - errBadInPath = "No file path provided, or file does not exist" - errCantCreateOut = "Can't create output file" - errCantGetPid = "Can't get pid from packet" - errReadFail = "Read failed" - errWriteFail = "Write to file failed" - errBadMode = "Bad fix mode" + errBadInPath = "No file path provided, or file does not exist" + errCantCreateOut = "Can't create output file" + errCantGetPid = "Can't get pid from packet" + errReadFail = "Read failed" + errWriteFail = "Write to file failed" + errBadMode = "Bad fix mode" + errAdaptationPresent = "Adaptation field is already present in packet" + errNoAdaptationField = "No adaptation field in this packet" ) +// Consts describing flag usage. const ( inUsage = "The path to the file to be repaired" outUsage = "Output file path" modeUsage = "Fix mode: 0 = cc-shift, 1 = di-update" ) +// Repair modes. const ( ccShift = iota diUpdate @@ -36,20 +42,27 @@ var ccMap = map[int]byte{ mts.VideoPid: 16, } +// packetNo will keep track of the ts packet number for reference. var packetNo int +// Option defines a func that performs an action on p in order to change a ts option. type Option func(p *Packet) +// Packet is a byte array of size mts.PacketSize i.e. 188 bytes. We define this +// to allow us to write receiver funcs for the [mts.PacketSize]byte type. type Packet [mts.PacketSize]byte +// CC returns the CC of p. func (p *Packet) CC() byte { return (*p)[3] & 0x0f } +// setCC sets the CC of p. func (p *Packet) setCC(cc byte) { (*p)[3] |= cc & 0xf } +// setDI sets the discontinuity counter of p. func (p *Packet) setDI(di bool) { if di { p[5] |= 0x80 @@ -58,27 +71,41 @@ func (p *Packet) setDI(di bool) { } } -func (p *Packet) addAdaptationField(options ...Option) { - // Create space for adaptation field +// addAdaptationField adds an adaptation field to p, and applys the passed options to this field. +// TODO: this will probably break if we already have adaptation field. +func (p *Packet) addAdaptationField(options ...Option) error { + if p.hasAdaptation() { + return errors.New(errAdaptationPresent) + } + // Create space for adaptation field. copy(p[mts.HeadSize+mts.DefaultAdaptationSize:], p[mts.HeadSize:len(p)-mts.DefaultAdaptationSize]) // TODO: seperate into own function - // Update adaptation field control + // Update adaptation field control. p[mts.AdaptationControlIdx] &= 0xff ^ mts.AdaptationControlMask p[mts.AdaptationControlIdx] |= mts.AdaptationControlMask - // Default the adaptationfield + // Default the adaptationfield. p.resetAdaptation() + // Apply and options that have bee passed. for _, option := range options { option(p) } + return nil } -func (p *Packet) resetAdaptation() { +// resetAdaptation sets fields in ps adaptation field to 0 if the adaptation field +// exists, otherwise an error is returned. +func (p *Packet) resetAdaptation() error { + if !p.hasAdaptation() { + return errors.New(errNoAdaptationField) + } p[mts.AdaptationIdx] = mts.DefaultAdaptationBodySize p[mts.AdaptationBodyIdx] = 0x00 + return nil } +// hasAdaptation returns true if p has an adaptation field and false otherwise. func (p *Packet) hasAdaptation() bool { afc := p[mts.AdaptationControlIdx] & mts.AdaptationControlMask if afc == 0x20 || afc == 0x30 { @@ -88,6 +115,8 @@ func (p *Packet) hasAdaptation() bool { } } +// DiscontinuityIndicator returns and Option that will set p's discontinuity +// indicator according to f. func DiscontinuityIndicator(f bool) Option { return func(p *Packet) { set := byte(mts.DiscontinuityIndicatorMask) @@ -130,23 +159,26 @@ func main() { panic(errReadFail + ": " + err.Error()) } packetNo++ - // Get the pid from the packet and set the cc based on this pid using our map + + // Get the pid from the packet pid, err := packet.Pid((*packet.Packet)(&p)) if err != nil { panic(errCantGetPid) } + // Get the cc from the packet and also the expected cc (if exists) cc := p.CC() expect, exists := expectedCC(int(pid)) if !exists { updateCCMap(int(pid), cc) } else { switch *modePtr { + // ccShift mode shifts all CC regardless of presence of Discontinuities or not case ccShift: p.setCC(expect) - + // diUpdate mode finds discontinuities and sets the discontinuity indicator to true. + // If we have a pat or pmt then we need to add an adaptation field and then set the DI. case diUpdate: - if cc != expect { fmt.Printf("***** Discontinuity found (packetNo: %v pid: %v, cc: %v, expect: %v)\n", packetNo, pid, cc, expect) if p.hasAdaptation() { @@ -156,19 +188,20 @@ func main() { } updateCCMap(int(pid), p.CC()) } - default: panic(errBadMode) } } - // Write this packet to the output file + + // Write this packet to the output file. if _, err := outFile.Write(p[:]); err != nil { panic(errWriteFail + ": " + err.Error()) } } } -// ccFor gets the next cc for the given pid +// expectedCC returns the expected cc for the given pid. If the cc hasn't been +// used yet, then 16 and false is returned. func expectedCC(pid int) (byte, bool) { cc := ccMap[pid] if cc == 16 { @@ -178,6 +211,7 @@ func expectedCC(pid int) (byte, bool) { return cc, true } +// updateCCMap updates the cc for the passed pid. func updateCCMap(pid int, cc byte) { ccMap[pid] = (cc + 1) & 0xf } From a6d02a2e0289399cc69accdb033b33992d9a5617 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 25 Jan 2019 16:25:01 +1030 Subject: [PATCH 09/29] stream/mts: adding some constants --- stream/mts/encoder.go | 39 ++++++++++++---------------------- stream/mts/mpegts.go | 49 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 61 insertions(+), 27 deletions(-) diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 02761b91..e49d57b5 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -171,14 +171,6 @@ var ( pmtTable = standardPmtTimeLocation.Bytes() ) -const ( - sdtPid = 17 - patPid = 0 - pmtPid = 4096 - videoPid = 256 - streamID = 0xe0 // First video stream ID. -) - // Time related constants. const ( // ptsOffset is the offset added to the clock to determine @@ -213,18 +205,13 @@ func NewEncoder(dst io.Writer, fps float64) *Encoder { ptsOffset: ptsOffset, continuity: map[int]byte{ - patPid: 0, - pmtPid: 0, - videoPid: 0, + PatPid: 0, + PmtPid: 0, + VideoPid: 0, }, } } -const ( - hasPayload = 0x1 - hasAdaptationField = 0x2 -) - const ( hasDTS = 0x1 hasPTS = 0x2 @@ -244,7 +231,7 @@ func (e *Encoder) Encode(nalu []byte) error { // Prepare PES data. pesPkt := pes.Packet{ - StreamID: streamID, + StreamID: StreamID, PDI: hasPTS, PTS: e.pts(), Data: nalu, @@ -256,10 +243,10 @@ func (e *Encoder) Encode(nalu []byte) error { for len(buf) != 0 { pkt := Packet{ PUSI: pusi, - PID: videoPid, + PID: VideoPid, RAI: pusi, - CC: e.ccFor(videoPid), - AFC: hasAdaptationField | hasPayload, + CC: e.ccFor(VideoPid), + AFC: HasAdaptationField | HasPayload, PCRF: pusi, } n := pkt.FillPayload(buf) @@ -288,9 +275,9 @@ func (e *Encoder) writePSI() error { // Write PAT. patPkt := Packet{ PUSI: true, - PID: patPid, - CC: e.ccFor(patPid), - AFC: hasPayload, + PID: PatPid, + CC: e.ccFor(PatPid), + AFC: HasPayload, Payload: patTable, } _, err := e.dst.Write(patPkt.Bytes(e.tsSpace[:PacketSize])) @@ -311,9 +298,9 @@ func (e *Encoder) writePSI() error { // Create mts packet from pmt table. pmtPkt := Packet{ PUSI: true, - PID: pmtPid, - CC: e.ccFor(pmtPid), - AFC: hasPayload, + PID: PmtPid, + CC: e.ccFor(PmtPid), + AFC: HasPayload, Payload: pmtTable, } _, err = e.dst.Write(pmtPkt.Bytes(e.tsSpace[:PacketSize])) diff --git a/stream/mts/mpegts.go b/stream/mts/mpegts.go index 0bef80d2..71849513 100644 --- a/stream/mts/mpegts.go +++ b/stream/mts/mpegts.go @@ -37,6 +37,53 @@ const ( PayloadSize = 176 ) +const ( + SdtPid = 17 + PatPid = 0 + PmtPid = 4096 + VideoPid = 256 + StreamID = 0xe0 // First video stream ID. + HeadSize = 4 + DefaultAdaptationSize = 2 +) + +const ( + AdaptationIdx = 4 + AdaptationControlIdx = 3 + AdaptationBodyIdx = AdaptationIdx + 1 + AdaptationControlMask = 0x30 + DefaultAdaptationBodySize = 1 +) + +const ( + HasPayload = 0x1 + HasAdaptationField = 0x2 +) + +// Adaptation field body masks. +const ( + DiscontinuityIndicatorMask = 0x80 + RandomAccessIndicatorMask = 0x40 + ElementaryStreamPriorityIndicatorMask = 0x20 + ProgramClockReferenceFlagMask = 0x10 + OriginalProgramClockReferenceFlagMask = 0x08 + SplicingPointFlagMask = 0x04 + TransportPrivateDataFlagMask = 0x02 + AdaptationFieldExtensionMask = 0x01 +) + +// Adaptation field body indexes. +const ( + DiscontinuityIndicatorIdx = AdaptationIdx + 1 + RandomAccessIndicatorIdx = AdaptationIdx + 1 + ElementaryStreamPriorityIndicatorIdx = AdaptationIdx + 1 + ProgramClockReferenceFlagIdx = AdaptationIdx + 1 + OriginalProgramClockReferenceFlagIdx = AdaptationIdx + 1 + SplicingPointFlagIdx = AdaptationIdx + 1 + TransportPrivateDataFlagIdx = AdaptationIdx + 1 + AdaptationFieldExtensionFlagIdx = AdaptationIdx + 1 +) + /* The below data struct encapsulates the fields of an MPEG-TS packet. Below is the formatting of an MPEG-TS packet for reference! @@ -135,7 +182,7 @@ func FindPMT(d []byte) (p []byte, i int, err error) { } for i = 0; i < len(d); i += PacketSize { pid := (uint16(d[i+1]&0x1f) << 8) | uint16(d[i+2]) - if pid == pmtPid { + if pid == PmtPid { p = d[i+4 : i+PacketSize] return } From b28a75d665e275fa4faec7eacd47b566675ccdf5 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 14:31:42 +1030 Subject: [PATCH 10/29] av/revid: removed test commands that we're not using anymore --- revid/cmd/h264-file-to-flv-rtmp/main.go | 75 ---------------------- revid/cmd/h264-file-to-mpgets-file/main.go | 66 ------------------- 2 files changed, 141 deletions(-) delete mode 100644 revid/cmd/h264-file-to-flv-rtmp/main.go delete mode 100644 revid/cmd/h264-file-to-mpgets-file/main.go diff --git a/revid/cmd/h264-file-to-flv-rtmp/main.go b/revid/cmd/h264-file-to-flv-rtmp/main.go deleted file mode 100644 index 4f7c9d7c..00000000 --- a/revid/cmd/h264-file-to-flv-rtmp/main.go +++ /dev/null @@ -1,75 +0,0 @@ -/* -NAME - main.go - -DESCRIPTION - See Readme.md - -AUTHOR - Saxon Nelson-Milton - -LICENSE - main.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). -*/ - -package main - -import ( - "flag" - "log" - "time" - - "bitbucket.org/ausocean/av/revid" - "bitbucket.org/ausocean/iot/pi/smartlogger" - "bitbucket.org/ausocean/utils/logger" -) - -const ( - inputFile = "../../../../test/test-data/av/input/betterInput.h264" - frameRate = "25" - runDuration = 120 * time.Second - logPath = "/var/log" -) - -// Test h264 inputfile to flv format into rtmp using librtmp c wrapper -func main() { - // Get the rtmp url from a cmd flag - rtmpUrlPtr := flag.String("rtmpUrl", "", "The rtmp url you would like to stream to.") - flag.Parse() - if *rtmpUrlPtr == "" { - log.Println("No RTMP url passed!") - return - } - - config := revid.Config{ - Input: revid.File, - InputFileName: inputFile, - InputCodec: revid.H264, - Outputs: []byte{revid.Rtmp}, - RtmpMethod: revid.LibRtmp, - RtmpUrl: *rtmpUrlPtr, - Packetization: revid.Flv, - Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller), - } - revidInst, err := revid.New(config, nil) - if err != nil { - config.Logger.Log(logger.Error, "Should not have got an error!: ", err.Error()) - return - } - revidInst.Start() - time.Sleep(runDuration) - revidInst.Stop() -} diff --git a/revid/cmd/h264-file-to-mpgets-file/main.go b/revid/cmd/h264-file-to-mpgets-file/main.go deleted file mode 100644 index 768f560b..00000000 --- a/revid/cmd/h264-file-to-mpgets-file/main.go +++ /dev/null @@ -1,66 +0,0 @@ -/* -NAME - main.go - -DESCRIPTION - See Readme.md - -AUTHOR - Saxon Nelson-Milton - -LICENSE - main.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). -*/ - -package main - -import ( - "time" - - "bitbucket.org/ausocean/av/revid" - "bitbucket.org/ausocean/iot/pi/smartlogger" - "bitbucket.org/ausocean/utils/logger" -) - -const ( - inputFile = "../../../../test/test-data/av/input/betterInput.h264" - outputFile = "output.ts" - frameRate = "25" - runDuration = 120 * time.Second - logPath = "/var/log" -) - -// Test h264 inputfile to flv format into rtmp using librtmp c wrapper -func main() { - - config := revid.Config{ - Input: revid.File, - InputFileName: inputFile, - InputCodec: revid.H264, - Outputs: []byte{revid.File}, - OutputFileName: outputFile, - Packetization: revid.Mpegts, - Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller), - } - revidInst, err := revid.New(config, nil) - if err != nil { - config.Logger.Log(logger.Error, "Should not have got an error!:", err.Error()) - return - } - revidInst.Start() - time.Sleep(runDuration) - revidInst.Stop() -} From dda6602a475ad5f795776656e506c49acf25d7f6 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 21 Jan 2019 14:57:40 +1030 Subject: [PATCH 11/29] av/stream/flac: added decode.go and flac_test.go --- stream/flac/decode.go | 0 stream/flac/flac_test.go | 0 2 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 stream/flac/decode.go create mode 100644 stream/flac/flac_test.go diff --git a/stream/flac/decode.go b/stream/flac/decode.go new file mode 100644 index 00000000..e69de29b diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go new file mode 100644 index 00000000..e69de29b From 631b3a27f2833cc89a0fa0fd89be408d98b68d6f Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 21 Jan 2019 15:41:49 +1030 Subject: [PATCH 12/29] av/stream/flac: wrote decode function and test to see if we can get wav. --- stream/flac/decode.go | 108 +++++++++++++++++++++++++++++++++++++++ stream/flac/flac_test.go | 31 +++++++++++ 2 files changed, 139 insertions(+) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index e69de29b..2ecfb737 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -0,0 +1,108 @@ +package flac + +import ( + "bytes" + "errors" + "io" + + "github.com/go-audio/audio" + "github.com/go-audio/wav" + "github.com/mewkiz/flac" +) + +const wavAudioFormat = 1 + +type buffer struct { + Buffer bytes.Buffer + Index int64 +} + +func (b *buffer) Bytes() []byte { + return b.Buffer.Bytes() +} + +func (b *buffer) Read(p []byte) (int, error) { + n, err := bytes.NewBuffer(b.Buffer.Bytes()[b.Index:]).Read(p) + + if err == nil { + if b.Index+int64(len(p)) < int64(b.Buffer.Len()) { + b.Index += int64(len(p)) + } else { + b.Index = int64(b.Buffer.Len()) + } + } + + return n, err +} + +func (b *buffer) Write(p []byte) (int, error) { + n, err := b.Buffer.Write(p) + + if err == nil { + b.Index = int64(b.Buffer.Len()) + } + + return n, err +} + +func (b *buffer) Seek(offset int64, whence int) (int64, error) { + var err error + var Index int64 = 0 + + switch whence { + case 0: + if offset >= int64(b.Buffer.Len()) || offset < 0 { + err = errors.New("Invalid Offset.") + } else { + b.Index = offset + Index = offset + } + default: + err = errors.New("Unsupported Seek Method.") + } + + return Index, err +} + +// Decode takes a slice of flac and decodes to wav +func Decode(buf []byte) ([]byte, error) { + r := bytes.NewReader(buf) + stream, err := flac.Parse(r) + if err != nil { + return nil, errors.New("Could not parse FLAC") + } + fb := &buffer{} + enc := wav.NewEncoder(fb, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavAudioFormat) + defer enc.Close() + var data []int + for { + // Decode FLAC audio samples. + frame, err := stream.ParseNext() + if err != nil { + if err == io.EOF { + break + } + return nil, err + } + + // Encode WAV audio samples. + data = data[:0] + for i := 0; i < frame.Subframes[0].NSamples; i++ { + for _, subframe := range frame.Subframes { + data = append(data, int(subframe.Samples[i])) + } + } + buf := &audio.IntBuffer{ + Format: &audio.Format{ + NumChannels: int(stream.Info.NChannels), + SampleRate: int(stream.Info.SampleRate), + }, + Data: data, + SourceBitDepth: int(stream.Info.BitsPerSample), + } + if err := enc.Write(buf); err != nil { + return nil, err + } + } + return fb.Bytes(), nil +} diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index e69de29b..763731d4 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -0,0 +1,31 @@ +package flac + +import ( + "io/ioutil" + "os" + "testing" +) + +const ( + testFile = "/home/saxon/Desktop/robot.flac" + outFile = "out.wav" +) + +func TestDecodeFlac(t *testing.T) { + b, err := ioutil.ReadFile(testFile) + if err != nil { + t.Fatalf("Could not read test file, failed with err: %v", err.Error()) + } + out, err := Decode(b) + if err != nil { + t.Errorf("Could not decode, failed with err: %v", err.Error()) + } + f, err := os.Create(outFile) + if err != nil { + t.Fatalf("Could not create output file, failed with err: %v", err.Error()) + } + _, err = f.Write(out) + if err != nil { + t.Fatalf("Could not write to output file, failed with err: %v", err.Error()) + } +} From 05f5a967fee4c0f78d11f57aa11916f788c9ea46 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 21 Jan 2019 17:34:15 +1030 Subject: [PATCH 13/29] av/stream/flac: using writerseeker to pass to wav.NewEncoder because I don't want to give it a file, but it's not working --- stream/flac/decode.go | 68 ++++++++-------------------------------- stream/flac/flac_test.go | 2 +- 2 files changed, 14 insertions(+), 56 deletions(-) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index 2ecfb737..941610e2 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -4,66 +4,16 @@ import ( "bytes" "errors" "io" + "io/ioutil" "github.com/go-audio/audio" "github.com/go-audio/wav" "github.com/mewkiz/flac" + "github.com/orcaman/writerseeker" ) const wavAudioFormat = 1 -type buffer struct { - Buffer bytes.Buffer - Index int64 -} - -func (b *buffer) Bytes() []byte { - return b.Buffer.Bytes() -} - -func (b *buffer) Read(p []byte) (int, error) { - n, err := bytes.NewBuffer(b.Buffer.Bytes()[b.Index:]).Read(p) - - if err == nil { - if b.Index+int64(len(p)) < int64(b.Buffer.Len()) { - b.Index += int64(len(p)) - } else { - b.Index = int64(b.Buffer.Len()) - } - } - - return n, err -} - -func (b *buffer) Write(p []byte) (int, error) { - n, err := b.Buffer.Write(p) - - if err == nil { - b.Index = int64(b.Buffer.Len()) - } - - return n, err -} - -func (b *buffer) Seek(offset int64, whence int) (int64, error) { - var err error - var Index int64 = 0 - - switch whence { - case 0: - if offset >= int64(b.Buffer.Len()) || offset < 0 { - err = errors.New("Invalid Offset.") - } else { - b.Index = offset - Index = offset - } - default: - err = errors.New("Unsupported Seek Method.") - } - - return Index, err -} - // Decode takes a slice of flac and decodes to wav func Decode(buf []byte) ([]byte, error) { r := bytes.NewReader(buf) @@ -71,10 +21,12 @@ func Decode(buf []byte) ([]byte, error) { if err != nil { return nil, errors.New("Could not parse FLAC") } - fb := &buffer{} - enc := wav.NewEncoder(fb, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavAudioFormat) + ws := &writerseeker.WriterSeeker{} + enc := wav.NewEncoder(ws, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavAudioFormat) defer enc.Close() var data []int + var out []byte + var d []byte for { // Decode FLAC audio samples. frame, err := stream.ParseNext() @@ -103,6 +55,12 @@ func Decode(buf []byte) ([]byte, error) { if err := enc.Write(buf); err != nil { return nil, err } + d, err = ioutil.ReadAll(ws.Reader()) + if err != nil { + return nil, err + } + out = append(out, d...) } - return fb.Bytes(), nil + + return d, nil } diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index 763731d4..13bef836 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -8,7 +8,7 @@ import ( const ( testFile = "/home/saxon/Desktop/robot.flac" - outFile = "out.wav" + outFile = "testOut.wav" ) func TestDecodeFlac(t *testing.T) { From 670ebfeaf8526eb902e23879c1a432f89000b6c2 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 21 Jan 2019 17:37:16 +1030 Subject: [PATCH 14/29] av/stream/flac: moved readAll to after loop --- stream/flac/decode.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index 941610e2..6c8445e4 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -25,8 +25,6 @@ func Decode(buf []byte) ([]byte, error) { enc := wav.NewEncoder(ws, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavAudioFormat) defer enc.Close() var data []int - var out []byte - var d []byte for { // Decode FLAC audio samples. frame, err := stream.ParseNext() @@ -55,12 +53,10 @@ func Decode(buf []byte) ([]byte, error) { if err := enc.Write(buf); err != nil { return nil, err } - d, err = ioutil.ReadAll(ws.Reader()) - if err != nil { - return nil, err - } - out = append(out, d...) } - + d, err := ioutil.ReadAll(ws.Reader()) + if err != nil { + return nil, err + } return d, nil } From 40ed9fcee17f13156789ac7d894b4e53b8e58ecd Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 21 Jan 2019 17:50:09 +1030 Subject: [PATCH 15/29] av/stream/flc: using my own writeSeeker implementation - working --- stream/flac/decode.go | 52 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index 6c8445e4..667fcf9c 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -4,16 +4,55 @@ import ( "bytes" "errors" "io" - "io/ioutil" "github.com/go-audio/audio" "github.com/go-audio/wav" "github.com/mewkiz/flac" - "github.com/orcaman/writerseeker" ) const wavAudioFormat = 1 +type WriterSeeker struct { + buf []byte + pos int +} + +func (ws *WriterSeeker) Bytes() []byte { + return ws.buf +} + +func (m *WriterSeeker) Write(p []byte) (n int, err error) { + minCap := m.pos + len(p) + if minCap > cap(m.buf) { // Make sure buf has enough capacity: + buf2 := make([]byte, len(m.buf), minCap+len(p)) // add some extra + copy(buf2, m.buf) + m.buf = buf2 + } + if minCap > len(m.buf) { + m.buf = m.buf[:minCap] + } + copy(m.buf[m.pos:], p) + m.pos += len(p) + return len(p), nil +} + +func (m *WriterSeeker) Seek(offset int64, whence int) (int64, error) { + newPos, offs := 0, int(offset) + switch whence { + case io.SeekStart: + newPos = offs + case io.SeekCurrent: + newPos = m.pos + offs + case io.SeekEnd: + newPos = len(m.buf) + offs + } + if newPos < 0 { + return 0, errors.New("negative result pos") + } + m.pos = newPos + return int64(newPos), nil +} + // Decode takes a slice of flac and decodes to wav func Decode(buf []byte) ([]byte, error) { r := bytes.NewReader(buf) @@ -21,7 +60,7 @@ func Decode(buf []byte) ([]byte, error) { if err != nil { return nil, errors.New("Could not parse FLAC") } - ws := &writerseeker.WriterSeeker{} + ws := &WriterSeeker{} enc := wav.NewEncoder(ws, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavAudioFormat) defer enc.Close() var data []int @@ -54,9 +93,6 @@ func Decode(buf []byte) ([]byte, error) { return nil, err } } - d, err := ioutil.ReadAll(ws.Reader()) - if err != nil { - return nil, err - } - return d, nil + + return ws.Bytes(), nil } From 3773fc79fa90474bcbd9b207e59807dc2871815d Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 21 Jan 2019 22:52:17 +1030 Subject: [PATCH 16/29] av/stream/flac/decode.go: wrote func headers --- stream/flac/decode.go | 79 ++++++++++++++++++++++++++++++---------- stream/flac/flac_test.go | 26 +++++++++++++ 2 files changed, 85 insertions(+), 20 deletions(-) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index 667fcf9c..42c4dace 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -1,3 +1,29 @@ +/* +NAME + decode.go + +DESCRIPTION + decode.go provides functionality for the decoding of FLAC compressed audio + +AUTHOR + Saxon Nelson-Milton + +LICENSE + decode.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ package flac import ( @@ -10,59 +36,72 @@ import ( "github.com/mewkiz/flac" ) -const wavAudioFormat = 1 +const wavFormat = 1 -type WriterSeeker struct { +// writeSeeker implements a memory based io.WriteSeeker. +type writeSeeker struct { buf []byte pos int } -func (ws *WriterSeeker) Bytes() []byte { +// Bytes returns the bytes contained in the writeSeekers buffer. +func (ws *writeSeeker) Bytes() []byte { return ws.buf } -func (m *WriterSeeker) Write(p []byte) (n int, err error) { - minCap := m.pos + len(p) - if minCap > cap(m.buf) { // Make sure buf has enough capacity: - buf2 := make([]byte, len(m.buf), minCap+len(p)) // add some extra - copy(buf2, m.buf) - m.buf = buf2 +// Write writes len(p) bytes from p to the writeSeeker's buf and returns the number +// of bytes written. If less than len(p) bytes are written, an error is returned. +func (ws *writeSeeker) Write(p []byte) (n int, err error) { + minCap := ws.pos + len(p) + if minCap > cap(ws.buf) { // Make sure buf has enough capacity: + buf2 := make([]byte, len(ws.buf), minCap+len(p)) // add some extra + copy(buf2, ws.buf) + ws.buf = buf2 } - if minCap > len(m.buf) { - m.buf = m.buf[:minCap] + if minCap > len(ws.buf) { + ws.buf = ws.buf[:minCap] } - copy(m.buf[m.pos:], p) - m.pos += len(p) + copy(ws.buf[ws.pos:], p) + ws.pos += len(p) return len(p), nil } -func (m *WriterSeeker) Seek(offset int64, whence int) (int64, error) { +// Seek sets the offset for the next Read or Write to offset, interpreted according +// to whence: SeekStart means relative to the start of the file, SeekCurrent means +// relative to the current offset, and SeekEnd means relative to the end. Seek returns +// the new offset relative to the start of the file and an error, if any. +func (ws *writeSeeker) Seek(offset int64, whence int) (int64, error) { newPos, offs := 0, int(offset) switch whence { case io.SeekStart: newPos = offs case io.SeekCurrent: - newPos = m.pos + offs + newPos = ws.pos + offs case io.SeekEnd: - newPos = len(m.buf) + offs + newPos = len(ws.buf) + offs } if newPos < 0 { return 0, errors.New("negative result pos") } - m.pos = newPos + ws.pos = newPos return int64(newPos), nil } -// Decode takes a slice of flac and decodes to wav +// Decode takes buf, a slice of FLAC, and decodes to WAV. If complete decoding +// fails, an error is returned. func Decode(buf []byte) ([]byte, error) { + // Lex and decode the FLAC into a stream to hold audio and properties. r := bytes.NewReader(buf) stream, err := flac.Parse(r) if err != nil { return nil, errors.New("Could not parse FLAC") } - ws := &WriterSeeker{} - enc := wav.NewEncoder(ws, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavAudioFormat) + + // Create WAV encoder and pass writeSeeker that will store output WAV. + ws := &writeSeeker{} + enc := wav.NewEncoder(ws, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavFormat) defer enc.Close() + var data []int for { // Decode FLAC audio samples. diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index 13bef836..d69c0494 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -1,3 +1,29 @@ +/* +NAME + flac_test.go + +DESCRIPTION + flac_test.go provides utilities to test FLAC audio decoding + +AUTHOR + Saxon Nelson-Milton + +LICENSE + flac_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ package flac import ( From 3ee56bff1a196d67e5338ef3de0e6f98ce1b0475 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 10:26:22 +1030 Subject: [PATCH 17/29] av/stream/flac: working on cleaning up decode code --- stream/flac/decode.go | 43 ++++++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index 42c4dace..5a470370 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -90,7 +90,8 @@ func (ws *writeSeeker) Seek(offset int64, whence int) (int64, error) { // Decode takes buf, a slice of FLAC, and decodes to WAV. If complete decoding // fails, an error is returned. func Decode(buf []byte) ([]byte, error) { - // Lex and decode the FLAC into a stream to hold audio and properties. + + // Lex the FLAC into a stream to hold audio and it's properties. r := bytes.NewReader(buf) stream, err := flac.Parse(r) if err != nil { @@ -99,17 +100,30 @@ func Decode(buf []byte) ([]byte, error) { // Create WAV encoder and pass writeSeeker that will store output WAV. ws := &writeSeeker{} - enc := wav.NewEncoder(ws, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavFormat) + sr := int(stream.Info.SampleRate) + bps := int(stream.Info.BitsPerSample) + nc := int(stream.Info.NChannels) + enc := wav.NewEncoder(ws, sr, bps, nc, wavFormat) defer enc.Close() + // Decode FLAC into frames of samples + intBuf := &audio.IntBuffer{ + Format: &audio.Format{NumChannels: nc, SampleRate: sr}, + SourceBitDepth: bps, + } + return decodeFrames(stream, intBuf, enc, ws) +} + +// +func decodeFrames(s *flac.Stream, intBuf *audio.IntBuffer, e *wav.Encoder, ws *writeSeeker) ([]byte, error) { var data []int for { - // Decode FLAC audio samples. - frame, err := stream.ParseNext() - if err != nil { - if err == io.EOF { - break - } + frame, err := s.ParseNext() + + // If we've reached the end of the stream then we can output the writeSeeker's buffer. + if err == io.EOF { + return ws.Bytes(), nil + } else if err != nil { return nil, err } @@ -120,18 +134,9 @@ func Decode(buf []byte) ([]byte, error) { data = append(data, int(subframe.Samples[i])) } } - buf := &audio.IntBuffer{ - Format: &audio.Format{ - NumChannels: int(stream.Info.NChannels), - SampleRate: int(stream.Info.SampleRate), - }, - Data: data, - SourceBitDepth: int(stream.Info.BitsPerSample), - } - if err := enc.Write(buf); err != nil { + intBuf.Data = data + if err := e.Write(intBuf); err != nil { return nil, err } } - - return ws.Bytes(), nil } From da5e13bb5dbcf8c3300e8d3791e2ba039657e287 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 10:40:40 +1030 Subject: [PATCH 18/29] av/stream/flac: finished cleaning up decode --- stream/flac/decode.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index 5a470370..34d42057 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -114,7 +114,9 @@ func Decode(buf []byte) ([]byte, error) { return decodeFrames(stream, intBuf, enc, ws) } -// +// decodeFrames parses frames from the stream and encodes them into WAV until +// the end of the stream is reached. The bytes from writeSeeker buffer are then +// returned. If any errors occur during encodeing, nil bytes and the error is returned. func decodeFrames(s *flac.Stream, intBuf *audio.IntBuffer, e *wav.Encoder, ws *writeSeeker) ([]byte, error) { var data []int for { From e557734c832731a87fcb3fc327926b0c926a2013 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 10:45:36 +1030 Subject: [PATCH 19/29] av/stream/flac: added writeseeker tests --- stream/flac/flac_test.go | 44 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index d69c0494..9537d682 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -27,6 +27,7 @@ LICENSE package flac import ( + "io" "io/ioutil" "os" "testing" @@ -37,6 +38,49 @@ const ( outFile = "testOut.wav" ) +func TestWriteSeekerWrite(t *testing.T) { + writerSeeker := &writeSeeker{} + var ws io.WriteSeeker = writerSeeker + + ws.Write([]byte("hello")) + if string(writerSeeker.buf) != "hello" { + t.Fail() + } + + ws.Write([]byte(" world")) + if string(writerSeeker.buf) != "hello world" { + t.Fail() + } + +} + +func TestWriteSeekerSeek(t *testing.T) { + writerSeeker := &writeSeeker{} + var ws io.WriteSeeker = writerSeeker + + ws.Write([]byte("hello")) + if string(writerSeeker.buf) != "hello" { + t.Fail() + } + + ws.Write([]byte(" world")) + if string(writerSeeker.buf) != "hello world" { + t.Fail() + } + + ws.Seek(-2, io.SeekEnd) + ws.Write([]byte("k!")) + if string(writerSeeker.buf) != "hello work!" { + t.Fail() + } + + ws.Seek(6, io.SeekStart) + ws.Write([]byte("gopher")) + if string(writerSeeker.buf) != "hello gopher" { + t.Fail() + } +} + func TestDecodeFlac(t *testing.T) { b, err := ioutil.ReadFile(testFile) if err != nil { From 1104c96a2fb1ac5e3f224d9ee4f53dc94a51f0c7 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 11:15:39 +1030 Subject: [PATCH 20/29] av/stream/flac: saving progress --- stream/flac/flac_test.go | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index 9537d682..79274819 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -38,45 +38,51 @@ const ( outFile = "testOut.wav" ) +// TestWriteSeekerWrite checks that basic writing to the ws works as expected. func TestWriteSeekerWrite(t *testing.T) { - writerSeeker := &writeSeeker{} - var ws io.WriteSeeker = writerSeeker + ws := &writeSeeker{} - ws.Write([]byte("hello")) - if string(writerSeeker.buf) != "hello" { - t.Fail() + const tstStr1 = "hello" + ws.Write([]byte(tstStr1)) + got := string(ws.buf) + if got != tstStr1 { + t.Errorf("Write failed, got: %v, want: %v", got, tstStr1) } - ws.Write([]byte(" world")) - if string(writerSeeker.buf) != "hello world" { - t.Fail() + const tstStr2 = " world" + const want = "hello world" + ws.Write([]byte(tstStr2)) + got = string(ws.buf) + if got != want { + t.Errorf("Second write failed, got: %v, want: %v", got, want) } - } +// TestWriteSeekerSeek checks that writing and seeking works as expected, i.e. we +// can write, then seek to a knew place in the buf, and write again, either replacing +// bytes, or appending bytes. func TestWriteSeekerSeek(t *testing.T) { - writerSeeker := &writeSeeker{} - var ws io.WriteSeeker = writerSeeker + ws := &writeSeeker{} ws.Write([]byte("hello")) - if string(writerSeeker.buf) != "hello" { + if string(ws.buf) != "hello" { t.Fail() } ws.Write([]byte(" world")) - if string(writerSeeker.buf) != "hello world" { + if string(ws.buf) != "hello world" { t.Fail() } ws.Seek(-2, io.SeekEnd) ws.Write([]byte("k!")) - if string(writerSeeker.buf) != "hello work!" { + if string(ws.buf) != "hello work!" { t.Fail() } ws.Seek(6, io.SeekStart) ws.Write([]byte("gopher")) - if string(writerSeeker.buf) != "hello gopher" { + if string(ws.buf) != "hello gopher" { t.Fail() } } From 3f41c7b72bb7db6d76dab53b34f55d568c99c80b Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 12:14:40 +1030 Subject: [PATCH 21/29] av/stream/flac: cleaned up testing file --- stream/flac/flac_test.go | 38 ++++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index 79274819..0d8079f7 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -64,29 +64,43 @@ func TestWriteSeekerWrite(t *testing.T) { func TestWriteSeekerSeek(t *testing.T) { ws := &writeSeeker{} - ws.Write([]byte("hello")) - if string(ws.buf) != "hello" { - t.Fail() + const tstStr1 = "hello" + want1 := tstStr1 + ws.Write([]byte(tstStr1)) + got := string(ws.buf) + if got != tstStr1 { + t.Errorf("Unexpected output, got: %v, want: %v", got, want1) } - ws.Write([]byte(" world")) - if string(ws.buf) != "hello world" { - t.Fail() + const tstStr2 = " world" + const want2 = tstStr1 + tstStr2 + ws.Write([]byte(tstStr2)) + got = string(ws.buf) + if got != want2 { + t.Errorf("Unexpected output, got: %v, want: %v", got, want2) } + const tstStr3 = "k!" + const want3 = "hello work!" ws.Seek(-2, io.SeekEnd) - ws.Write([]byte("k!")) - if string(ws.buf) != "hello work!" { - t.Fail() + ws.Write([]byte(tstStr3)) + got = string(ws.buf) + if got != want3 { + t.Errorf("Unexpected output, got: %v, want: %v", got, want3) } + const tstStr4 = "gopher" + const want4 = "hello gopher" ws.Seek(6, io.SeekStart) - ws.Write([]byte("gopher")) - if string(ws.buf) != "hello gopher" { - t.Fail() + ws.Write([]byte(tstStr4)) + got = string(ws.buf) + if got != want4 { + t.Errorf("Unexpected output, got: %v, want: %v", got, want4) } } +// TestDecodeFlac checks that we can load a flac file and decode to wav, writing +// to a wav file. func TestDecodeFlac(t *testing.T) { b, err := ioutil.ReadFile(testFile) if err != nil { From 5b131c80f6a65eb060add7611032c63cc7eb9279 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 12:27:52 +1030 Subject: [PATCH 22/29] av/stream/flac: updated test file directory --- stream/flac/flac_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index 0d8079f7..1f8019e5 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -34,7 +34,7 @@ import ( ) const ( - testFile = "/home/saxon/Desktop/robot.flac" + testFile = "../../../test/test-data/av/input/robot.flac" outFile = "testOut.wav" ) From e98d7bb62e74f32b5d7790a05710d5ebb146bfcf Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 7 Feb 2019 19:58:08 +1030 Subject: [PATCH 23/29] created experimentation dir under av, and moved flac package here. created experimentation dir under av, and moved flac pkg here. experimentation/flac: removed wav file --- {stream => experimentation}/flac/decode.go | 0 {stream => experimentation}/flac/flac_test.go | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename {stream => experimentation}/flac/decode.go (100%) rename {stream => experimentation}/flac/flac_test.go (100%) diff --git a/stream/flac/decode.go b/experimentation/flac/decode.go similarity index 100% rename from stream/flac/decode.go rename to experimentation/flac/decode.go diff --git a/stream/flac/flac_test.go b/experimentation/flac/flac_test.go similarity index 100% rename from stream/flac/flac_test.go rename to experimentation/flac/flac_test.go From 19821553e869f0a4f353132dd9f3640b4ba8d9aa Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 8 Feb 2019 00:14:22 +1030 Subject: [PATCH 24/29] cmd/ts-repair: added required consts and undid changes to mts pkg --- cmd/ts-repair/main.go | 43 ++++++++++++++++++++++++------------- stream/mts/encoder.go | 39 ++++++++++++++++++++++------------ stream/mts/mpegts.go | 49 +------------------------------------------ 3 files changed, 56 insertions(+), 75 deletions(-) diff --git a/cmd/ts-repair/main.go b/cmd/ts-repair/main.go index f3f73bb2..f95d16fe 100644 --- a/cmd/ts-repair/main.go +++ b/cmd/ts-repair/main.go @@ -11,6 +11,21 @@ import ( "github.com/Comcast/gots/packet" ) +const ( + PatPid = 0 + PmtPid = 4096 + VideoPid = 256 + HeadSize = 4 + DefaultAdaptationSize = 2 + AdaptationIdx = 4 + AdaptationControlIdx = 3 + AdaptationBodyIdx = AdaptationIdx + 1 + AdaptationControlMask = 0x30 + DefaultAdaptationBodySize = 1 + DiscontinuityIndicatorMask = 0x80 + DiscontinuityIndicatorIdx = AdaptationIdx + 1 +) + // Various errors that we can encounter. const ( errBadInPath = "No file path provided, or file does not exist" @@ -37,9 +52,9 @@ const ( ) var ccMap = map[int]byte{ - mts.PatPid: 16, - mts.PmtPid: 16, - mts.VideoPid: 16, + PatPid: 16, + PmtPid: 16, + VideoPid: 16, } // packetNo will keep track of the ts packet number for reference. @@ -48,8 +63,8 @@ var packetNo int // Option defines a func that performs an action on p in order to change a ts option. type Option func(p *Packet) -// Packet is a byte array of size mts.PacketSize i.e. 188 bytes. We define this -// to allow us to write receiver funcs for the [mts.PacketSize]byte type. +// Packet is a byte array of size PacketSize i.e. 188 bytes. We define this +// to allow us to write receiver funcs for the [PacketSize]byte type. type Packet [mts.PacketSize]byte // CC returns the CC of p. @@ -78,12 +93,12 @@ func (p *Packet) addAdaptationField(options ...Option) error { return errors.New(errAdaptationPresent) } // Create space for adaptation field. - copy(p[mts.HeadSize+mts.DefaultAdaptationSize:], p[mts.HeadSize:len(p)-mts.DefaultAdaptationSize]) + copy(p[HeadSize+DefaultAdaptationSize:], p[HeadSize:len(p)-DefaultAdaptationSize]) // TODO: seperate into own function // Update adaptation field control. - p[mts.AdaptationControlIdx] &= 0xff ^ mts.AdaptationControlMask - p[mts.AdaptationControlIdx] |= mts.AdaptationControlMask + p[AdaptationControlIdx] &= 0xff ^ AdaptationControlMask + p[AdaptationControlIdx] |= AdaptationControlMask // Default the adaptationfield. p.resetAdaptation() @@ -100,14 +115,14 @@ func (p *Packet) resetAdaptation() error { if !p.hasAdaptation() { return errors.New(errNoAdaptationField) } - p[mts.AdaptationIdx] = mts.DefaultAdaptationBodySize - p[mts.AdaptationBodyIdx] = 0x00 + p[AdaptationIdx] = DefaultAdaptationBodySize + p[AdaptationBodyIdx] = 0x00 return nil } // hasAdaptation returns true if p has an adaptation field and false otherwise. func (p *Packet) hasAdaptation() bool { - afc := p[mts.AdaptationControlIdx] & mts.AdaptationControlMask + afc := p[AdaptationControlIdx] & AdaptationControlMask if afc == 0x20 || afc == 0x30 { return true } else { @@ -119,12 +134,12 @@ func (p *Packet) hasAdaptation() bool { // indicator according to f. func DiscontinuityIndicator(f bool) Option { return func(p *Packet) { - set := byte(mts.DiscontinuityIndicatorMask) + set := byte(DiscontinuityIndicatorMask) if !f { set = 0x00 } - p[mts.DiscontinuityIndicatorIdx] &= 0xff ^ mts.DiscontinuityIndicatorMask - p[mts.DiscontinuityIndicatorIdx] |= mts.DiscontinuityIndicatorMask & set + p[DiscontinuityIndicatorIdx] &= 0xff ^ DiscontinuityIndicatorMask + p[DiscontinuityIndicatorIdx] |= DiscontinuityIndicatorMask & set } } diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index e49d57b5..02761b91 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -171,6 +171,14 @@ var ( pmtTable = standardPmtTimeLocation.Bytes() ) +const ( + sdtPid = 17 + patPid = 0 + pmtPid = 4096 + videoPid = 256 + streamID = 0xe0 // First video stream ID. +) + // Time related constants. const ( // ptsOffset is the offset added to the clock to determine @@ -205,13 +213,18 @@ func NewEncoder(dst io.Writer, fps float64) *Encoder { ptsOffset: ptsOffset, continuity: map[int]byte{ - PatPid: 0, - PmtPid: 0, - VideoPid: 0, + patPid: 0, + pmtPid: 0, + videoPid: 0, }, } } +const ( + hasPayload = 0x1 + hasAdaptationField = 0x2 +) + const ( hasDTS = 0x1 hasPTS = 0x2 @@ -231,7 +244,7 @@ func (e *Encoder) Encode(nalu []byte) error { // Prepare PES data. pesPkt := pes.Packet{ - StreamID: StreamID, + StreamID: streamID, PDI: hasPTS, PTS: e.pts(), Data: nalu, @@ -243,10 +256,10 @@ func (e *Encoder) Encode(nalu []byte) error { for len(buf) != 0 { pkt := Packet{ PUSI: pusi, - PID: VideoPid, + PID: videoPid, RAI: pusi, - CC: e.ccFor(VideoPid), - AFC: HasAdaptationField | HasPayload, + CC: e.ccFor(videoPid), + AFC: hasAdaptationField | hasPayload, PCRF: pusi, } n := pkt.FillPayload(buf) @@ -275,9 +288,9 @@ func (e *Encoder) writePSI() error { // Write PAT. patPkt := Packet{ PUSI: true, - PID: PatPid, - CC: e.ccFor(PatPid), - AFC: HasPayload, + PID: patPid, + CC: e.ccFor(patPid), + AFC: hasPayload, Payload: patTable, } _, err := e.dst.Write(patPkt.Bytes(e.tsSpace[:PacketSize])) @@ -298,9 +311,9 @@ func (e *Encoder) writePSI() error { // Create mts packet from pmt table. pmtPkt := Packet{ PUSI: true, - PID: PmtPid, - CC: e.ccFor(PmtPid), - AFC: HasPayload, + PID: pmtPid, + CC: e.ccFor(pmtPid), + AFC: hasPayload, Payload: pmtTable, } _, err = e.dst.Write(pmtPkt.Bytes(e.tsSpace[:PacketSize])) diff --git a/stream/mts/mpegts.go b/stream/mts/mpegts.go index 71849513..0bef80d2 100644 --- a/stream/mts/mpegts.go +++ b/stream/mts/mpegts.go @@ -37,53 +37,6 @@ const ( PayloadSize = 176 ) -const ( - SdtPid = 17 - PatPid = 0 - PmtPid = 4096 - VideoPid = 256 - StreamID = 0xe0 // First video stream ID. - HeadSize = 4 - DefaultAdaptationSize = 2 -) - -const ( - AdaptationIdx = 4 - AdaptationControlIdx = 3 - AdaptationBodyIdx = AdaptationIdx + 1 - AdaptationControlMask = 0x30 - DefaultAdaptationBodySize = 1 -) - -const ( - HasPayload = 0x1 - HasAdaptationField = 0x2 -) - -// Adaptation field body masks. -const ( - DiscontinuityIndicatorMask = 0x80 - RandomAccessIndicatorMask = 0x40 - ElementaryStreamPriorityIndicatorMask = 0x20 - ProgramClockReferenceFlagMask = 0x10 - OriginalProgramClockReferenceFlagMask = 0x08 - SplicingPointFlagMask = 0x04 - TransportPrivateDataFlagMask = 0x02 - AdaptationFieldExtensionMask = 0x01 -) - -// Adaptation field body indexes. -const ( - DiscontinuityIndicatorIdx = AdaptationIdx + 1 - RandomAccessIndicatorIdx = AdaptationIdx + 1 - ElementaryStreamPriorityIndicatorIdx = AdaptationIdx + 1 - ProgramClockReferenceFlagIdx = AdaptationIdx + 1 - OriginalProgramClockReferenceFlagIdx = AdaptationIdx + 1 - SplicingPointFlagIdx = AdaptationIdx + 1 - TransportPrivateDataFlagIdx = AdaptationIdx + 1 - AdaptationFieldExtensionFlagIdx = AdaptationIdx + 1 -) - /* The below data struct encapsulates the fields of an MPEG-TS packet. Below is the formatting of an MPEG-TS packet for reference! @@ -182,7 +135,7 @@ func FindPMT(d []byte) (p []byte, i int, err error) { } for i = 0; i < len(d); i += PacketSize { pid := (uint16(d[i+1]&0x1f) << 8) | uint16(d[i+2]) - if pid == PmtPid { + if pid == pmtPid { p = d[i+4 : i+PacketSize] return } From 7b789aed29939b29533df36c9b951f6e0163b6f1 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 8 Feb 2019 00:16:19 +1030 Subject: [PATCH 25/29] moved cmd/ts-repair to experimentation --- {cmd => experimentation}/ts-repair/main.go | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename {cmd => experimentation}/ts-repair/main.go (100%) diff --git a/cmd/ts-repair/main.go b/experimentation/ts-repair/main.go similarity index 100% rename from cmd/ts-repair/main.go rename to experimentation/ts-repair/main.go From 383b2962af0a7440184813068602456e17055b86 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 8 Feb 2019 00:25:47 +1030 Subject: [PATCH 26/29] experimentation/ts-repair: added description to file header --- experimentation/ts-repair/main.go | 33 +++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/experimentation/ts-repair/main.go b/experimentation/ts-repair/main.go index f95d16fe..bed81f19 100644 --- a/experimentation/ts-repair/main.go +++ b/experimentation/ts-repair/main.go @@ -1,3 +1,36 @@ +/* +NAME + ts-repair/main.go + +DESCRIPTION + This program attempts to repair mpegts discontinuities using one of two methods + as selected by the mode flag. Setting the mode flag to 0 will result in repair + by shifting all CCs such that they are continuous. Setting the mode flag to 1 + will result in repair through setting the discontinuity indicator to true at + packets where a discontinuity exists. + + Specify the input file with the in flag, and the output file with out flag. + +AUTHOR + Saxon A. Nelson-Milton + +LICENSE + mpegts.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + package main import ( From 795157577139d0fc37d43ed0eae78bf933a4257b Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 8 Feb 2019 18:00:23 +1030 Subject: [PATCH 27/29] stream/mts: undo changes to encoder.go stream/rtp/encoder.go: undoing changes --- stream/mts/encoder.go | 118 +++++++++--------------------------------- stream/rtp/encoder.go | 4 +- 2 files changed, 26 insertions(+), 96 deletions(-) diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 02761b91..a309eaf9 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -30,9 +30,9 @@ package mts import ( "io" - "sync" "time" + "bitbucket.org/ausocean/av/stream/mts/meta" "bitbucket.org/ausocean/av/stream/mts/pes" "bitbucket.org/ausocean/av/stream/mts/psi" ) @@ -82,93 +82,21 @@ var ( }, }, } - - // standardPmtTimeLocation is a standard PMT with time and location - // descriptors, but time and location fields zeroed out. - standardPmtTimeLocation = psi.PSI{ - Pf: 0x00, - Tid: 0x02, - Ssi: true, - Sl: 0x3e, - Tss: &psi.TSS{ - Tide: 0x01, - V: 0, - Cni: true, - Sn: 0, - Lsn: 0, - Sd: &psi.PMT{ - Pcrpid: 0x0100, - Pil: psi.PmtTimeLocationPil, - Pd: []psi.Desc{ - { - Dt: psi.TimeDescTag, - Dl: psi.TimeDataSize, - Dd: make([]byte, psi.TimeDataSize), - }, - { - Dt: psi.LocationDescTag, - Dl: psi.LocationDataSize, - Dd: make([]byte, psi.LocationDataSize), - }, - }, - Essd: &psi.ESSD{ - St: 0x1b, - Epid: 0x0100, - Esil: 0x00, - }, - }, - }, - } ) const ( psiInterval = 1 * time.Second ) -// timeLocation holds time and location data -type timeLocation struct { - mu sync.RWMutex - time uint64 - location string -} - -// SetTimeStamp sets the time field of a TimeLocation. -func (tl *timeLocation) SetTimeStamp(t uint64) { - tl.mu.Lock() - tl.time = t - tl.mu.Unlock() -} - -// GetTimeStamp returns the location of a TimeLocation. -func (tl *timeLocation) TimeStamp() uint64 { - tl.mu.RLock() - t := tl.time - tl.mu.RUnlock() - return t -} - -// SetLocation sets the location of a TimeLocation. -func (tl *timeLocation) SetLocation(l string) { - tl.mu.Lock() - tl.location = l - tl.mu.Unlock() -} - -// GetLocation returns the location of a TimeLocation. -func (tl *timeLocation) Location() string { - tl.mu.RLock() - l := tl.location - tl.mu.RUnlock() - return l -} - -// MetData will hold time and location data which may be set externally if -// this data is available. It is then inserted into mpegts packets outputted. -var MetaData timeLocation +// Meta allows addition of metadata to encoded mts from outside of this pkg. +// See meta pkg for usage. +// +// TODO: make this not global. +var Meta *meta.Data var ( patTable = standardPat.Bytes() - pmtTable = standardPmtTimeLocation.Bytes() + pmtTable = standardPmt.Bytes() ) const ( @@ -288,33 +216,27 @@ func (e *Encoder) writePSI() error { // Write PAT. patPkt := Packet{ PUSI: true, - PID: patPid, - CC: e.ccFor(patPid), - AFC: hasPayload, - Payload: patTable, + PID: PatPid, + CC: e.ccFor(PatPid), + AFC: HasPayload, + Payload: psi.AddPadding(patTable), } _, err := e.dst.Write(patPkt.Bytes(e.tsSpace[:PacketSize])) if err != nil { return err } - - // Update pmt table time and location. - err = psi.UpdateTime(pmtTable, MetaData.TimeStamp()) + pmtTable, err = updateMeta(pmtTable) if err != nil { return err } - err = psi.UpdateLocation(pmtTable, MetaData.Location()) - if err != nil { - return nil - } // Create mts packet from pmt table. pmtPkt := Packet{ PUSI: true, - PID: pmtPid, - CC: e.ccFor(pmtPid), - AFC: hasPayload, - Payload: pmtTable, + PID: PmtPid, + CC: e.ccFor(PmtPid), + AFC: HasPayload, + Payload: psi.AddPadding(pmtTable), } _, err = e.dst.Write(pmtPkt.Bytes(e.tsSpace[:PacketSize])) if err != nil { @@ -345,3 +267,11 @@ func (e *Encoder) ccFor(pid int) byte { e.continuity[pid] = (cc + 1) & continuityCounterMask return cc } + +// updateMeta adds/updates a metaData descriptor in the given psi bytes using data +// contained in the global Meta struct. +func updateMeta(b []byte) ([]byte, error) { + p := psi.PSIBytes(b) + err := p.AddDescriptor(psi.MetadataTag, Meta.Encode()) + return []byte(p), err +} diff --git a/stream/rtp/encoder.go b/stream/rtp/encoder.go index 329a24c0..20df9434 100644 --- a/stream/rtp/encoder.go +++ b/stream/rtp/encoder.go @@ -73,8 +73,8 @@ func NewEncoder(dst io.Writer, fps int) *Encoder { func (e *Encoder) Write(data []byte) (int, error) { e.buffer = append(e.buffer, data...) for len(e.buffer) >= sendLen { - e.Encode(e.buffer) - e.buffer = e.buffer[:0] + e.Encode(e.buffer[:sendLen]) + e.buffer = e.buffer[sendLen:] } return len(data), nil } From 020428db620cebb1d0072bdc2129d4a5aae2f4cc Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 8 Feb 2019 18:12:03 +1030 Subject: [PATCH 28/29] revid/revid.go: checking err from ring buffer write within destinations length check --- revid/revid.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 454a3b8a..aa81e8fd 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -145,10 +145,17 @@ func (p *packer) Write(frame []byte) (int, error) { p.owner.config.Logger.Log(logger.Warning, pkg+"frame was too big", "frame size", len(frame)) return len(frame), nil } - var n int - var err error + if len(p.owner.destination) != 0 { - n, err = p.owner.buffer.Write(frame) + n, err := p.owner.buffer.Write(frame) + if err != nil { + if err == ring.ErrDropped { + p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame)) + return len(frame), nil + } + p.owner.config.Logger.Log(logger.Error, pkg+"unexpected ring buffer write error", "error", err.Error()) + return n, err + } } // If we have an rtp sender bypass ringbuffer and give straight to sender @@ -158,14 +165,7 @@ func (p *packer) Write(frame []byte) (int, error) { p.owner.config.Logger.Log(logger.Error, pkg+"rtp send failed with error", "error", err.Error()) } } - if err != nil { - if err == ring.ErrDropped { - p.owner.config.Logger.Log(logger.Warning, pkg+"dropped frame", "frame size", len(frame)) - return len(frame), nil - } - p.owner.config.Logger.Log(logger.Error, pkg+"unexpected ring buffer write error", "error", err.Error()) - return n, err - } + p.packetCount++ var hasRtmp bool for _, d := range p.owner.config.Outputs { From ad04893432ea2c90ad73c5185fbe9b6ba2c94c60 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 8 Feb 2019 18:14:33 +1030 Subject: [PATCH 29/29] revid/revid.go: intialising err from rtp send --- revid/revid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index 6d99ac76..e426272a 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -160,7 +160,7 @@ func (p *packer) Write(frame []byte) (int, error) { // If we have an rtp sender bypass ringbuffer and give straight to sender if p.owner.rtpSender != nil { - err = p.owner.rtpSender.send(frame) + err := p.owner.rtpSender.send(frame) if err != nil { p.owner.config.Logger.Log(logger.Error, pkg+"rtp send failed with error", "error", err.Error()) }