diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 1c2b9c90..ac02572c 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -37,6 +37,8 @@ import ( "time" "bitbucket.org/ausocean/av/revid" + "bitbucket.org/ausocean/av/stream/mts" + "bitbucket.org/ausocean/av/stream/mts/meta" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/smartlogger" "bitbucket.org/ausocean/utils/logger" @@ -72,7 +74,14 @@ var canProfile = true // The logger that will be used throughout var log *logger.Logger +const ( + metaPreambleKey = "copyright" + metaPreambleData = "ausocean.org/license/content2019" +) + func main() { + mts.Meta = meta.NewWith([][2]string{{metaPreambleKey, metaPreambleData}}) + useNetsender := flag.Bool("NetSender", false, "Are we checking vars through netsender?") runDurationPtr := flag.Duration("runDuration", defaultRunDuration, "How long do you want revid to run for?") diff --git a/revid/senders.go b/revid/senders.go index a73523d5..e86cb24b 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -35,6 +35,7 @@ import ( "net" "os" "os/exec" + "strconv" "bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/av/stream/mts" @@ -172,7 +173,7 @@ func (s *httpSender) extractMeta(r string) error { s.log(logger.Warning, pkg+"No timestamp in reply") } else { s.log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) - mts.MetaData.SetTimeStamp(uint64(t)) + mts.Meta.Add("ts", strconv.Itoa(t)) } // Extract location from reply @@ -181,7 +182,7 @@ func (s *httpSender) extractMeta(r string) error { s.log(logger.Warning, pkg+"No location in reply") } else { s.log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) - mts.MetaData.SetLocation(g) + mts.Meta.Add("loc", g) } return nil diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 02761b91..a309eaf9 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -30,9 +30,9 @@ package mts import ( "io" - "sync" "time" + "bitbucket.org/ausocean/av/stream/mts/meta" "bitbucket.org/ausocean/av/stream/mts/pes" "bitbucket.org/ausocean/av/stream/mts/psi" ) @@ -82,93 +82,21 @@ var ( }, }, } - - // standardPmtTimeLocation is a standard PMT with time and location - // descriptors, but time and location fields zeroed out. - standardPmtTimeLocation = psi.PSI{ - Pf: 0x00, - Tid: 0x02, - Ssi: true, - Sl: 0x3e, - Tss: &psi.TSS{ - Tide: 0x01, - V: 0, - Cni: true, - Sn: 0, - Lsn: 0, - Sd: &psi.PMT{ - Pcrpid: 0x0100, - Pil: psi.PmtTimeLocationPil, - Pd: []psi.Desc{ - { - Dt: psi.TimeDescTag, - Dl: psi.TimeDataSize, - Dd: make([]byte, psi.TimeDataSize), - }, - { - Dt: psi.LocationDescTag, - Dl: psi.LocationDataSize, - Dd: make([]byte, psi.LocationDataSize), - }, - }, - Essd: &psi.ESSD{ - St: 0x1b, - Epid: 0x0100, - Esil: 0x00, - }, - }, - }, - } ) const ( psiInterval = 1 * time.Second ) -// timeLocation holds time and location data -type timeLocation struct { - mu sync.RWMutex - time uint64 - location string -} - -// SetTimeStamp sets the time field of a TimeLocation. -func (tl *timeLocation) SetTimeStamp(t uint64) { - tl.mu.Lock() - tl.time = t - tl.mu.Unlock() -} - -// GetTimeStamp returns the location of a TimeLocation. -func (tl *timeLocation) TimeStamp() uint64 { - tl.mu.RLock() - t := tl.time - tl.mu.RUnlock() - return t -} - -// SetLocation sets the location of a TimeLocation. -func (tl *timeLocation) SetLocation(l string) { - tl.mu.Lock() - tl.location = l - tl.mu.Unlock() -} - -// GetLocation returns the location of a TimeLocation. -func (tl *timeLocation) Location() string { - tl.mu.RLock() - l := tl.location - tl.mu.RUnlock() - return l -} - -// MetData will hold time and location data which may be set externally if -// this data is available. It is then inserted into mpegts packets outputted. -var MetaData timeLocation +// Meta allows addition of metadata to encoded mts from outside of this pkg. +// See meta pkg for usage. +// +// TODO: make this not global. +var Meta *meta.Data var ( patTable = standardPat.Bytes() - pmtTable = standardPmtTimeLocation.Bytes() + pmtTable = standardPmt.Bytes() ) const ( @@ -288,33 +216,27 @@ func (e *Encoder) writePSI() error { // Write PAT. patPkt := Packet{ PUSI: true, - PID: patPid, - CC: e.ccFor(patPid), - AFC: hasPayload, - Payload: patTable, + PID: PatPid, + CC: e.ccFor(PatPid), + AFC: HasPayload, + Payload: psi.AddPadding(patTable), } _, err := e.dst.Write(patPkt.Bytes(e.tsSpace[:PacketSize])) if err != nil { return err } - - // Update pmt table time and location. - err = psi.UpdateTime(pmtTable, MetaData.TimeStamp()) + pmtTable, err = updateMeta(pmtTable) if err != nil { return err } - err = psi.UpdateLocation(pmtTable, MetaData.Location()) - if err != nil { - return nil - } // Create mts packet from pmt table. pmtPkt := Packet{ PUSI: true, - PID: pmtPid, - CC: e.ccFor(pmtPid), - AFC: hasPayload, - Payload: pmtTable, + PID: PmtPid, + CC: e.ccFor(PmtPid), + AFC: HasPayload, + Payload: psi.AddPadding(pmtTable), } _, err = e.dst.Write(pmtPkt.Bytes(e.tsSpace[:PacketSize])) if err != nil { @@ -345,3 +267,11 @@ func (e *Encoder) ccFor(pid int) byte { e.continuity[pid] = (cc + 1) & continuityCounterMask return cc } + +// updateMeta adds/updates a metaData descriptor in the given psi bytes using data +// contained in the global Meta struct. +func updateMeta(b []byte) ([]byte, error) { + p := psi.PSIBytes(b) + err := p.AddDescriptor(psi.MetadataTag, Meta.Encode()) + return []byte(p), err +} diff --git a/stream/mts/meta/meta.go b/stream/mts/meta/meta.go new file mode 100644 index 00000000..66790315 --- /dev/null +++ b/stream/mts/meta/meta.go @@ -0,0 +1,185 @@ +/* +NAME + meta.go + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton + +LICENSE + meta.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package meta + +import ( + "encoding/binary" + "errors" + "strings" + "sync" +) + +// This is the headsize of our metadata string, +// which is encoded int the data body of a pmt descriptor. +const headSize = 4 + +const ( + majVer = 1 + minVer = 0 +) + +// Indices of bytes for uint16 metadata length. +const ( + dataLenIdx = 2 +) + +var ( + errKeyAbsent = errors.New("Key does not exist in map") + errNoHeader = errors.New("Metadata string does not contain header") + errInvalidHeader = errors.New("Metadata string does not contain valid header") +) + +// Metadata provides functionality for the storage and encoding of metadata +// using a map. +type Data struct { + mu sync.RWMutex + data map[string]string + order []string + enc []byte +} + +// New returns a pointer to a new Metadata. +func New() *Data { + return &Data{ + data: make(map[string]string), + enc: []byte{ + 0x00, // Reserved byte + (majVer << 4) | minVer, // MS and LS versions + 0x00, // Data len byte1 + 0x00, // Data len byte2 + }, + } +} + +// NewWith creates a meta.Data and fills map with initial data given. If there +// is repeated key, then the latter overwrites the prior. +func NewWith(data [][2]string) *Data { + m := New() + m.order = make([]string, 0, len(data)) + for _, d := range data { + if _, exists := m.data[d[0]]; !exists { + m.order = append(m.order, d[0]) + } + m.data[d[0]] = d[1] + } + return m +} + +// Add adds metadata with key and val. +func (m *Data) Add(key, val string) { + m.mu.Lock() + defer m.mu.Unlock() + m.data[key] = val + for _, k := range m.order { + if k == key { + return + } + } + m.order = append(m.order, key) + return +} + +// All returns the a copy of the map containing the meta data. +func (m *Data) All() map[string]string { + m.mu.Lock() + cpy := make(map[string]string) + for k, v := range m.data { + cpy[k] = v + } + m.mu.Unlock() + return cpy +} + +// Get returns the meta data for the passed key. +func (m *Data) Get(key string) (val string, ok bool) { + m.mu.Lock() + val, ok = m.data[key] + m.mu.Unlock() + return +} + +// Delete deletes a meta entry in the map and returns error if it doesn’t exist. +func (m *Data) Delete(key string) { + m.mu.Lock() + defer m.mu.Unlock() + if _, ok := m.data[key]; ok { + delete(m.data, key) + for i, k := range m.order { + if k == key { + copy(m.order[:i], m.order[i+1:]) + m.order = m.order[:len(m.order)-1] + break + } + } + return + } + return +} + +// Encode takes the meta data map and encodes into a byte slice with header +// describing the version, length of data and data in TSV format. +func (m *Data) Encode() []byte { + m.enc = m.enc[:headSize] + + // Iterate over map and append entries, only adding tab if we're not on the + // last entry. + var entry string + for i, k := range m.order { + v := m.data[k] + entry += k + "=" + v + if i+1 < len(m.data) { + entry += "\t" + } + } + m.enc = append(m.enc, []byte(entry)...) + + // Calculate and set data length in encoded meta header. + dataLen := len(m.enc[headSize:]) + binary.BigEndian.PutUint16(m.enc[dataLenIdx:dataLenIdx+2], uint16(dataLen)) + return m.enc +} + +// ReadFrom extracts a value from a metadata string d, for the given key. If the +// key is not present in the metadata string, an error is returned. If the +// metadata header is not present in the string, an error is returned. +func Extract(key string, d []byte) (string, error) { + if d[0] != 0 { + return "", errNoHeader + } else if d[0] == 0 && binary.BigEndian.Uint16(d[2:headSize]) != uint16(len(d[headSize:])) { + return "", errInvalidHeader + } + d = d[headSize:] + entries := strings.Split(string(d), "\t") + for _, entry := range entries { + kv := strings.Split(entry, "=") + if kv[0] == key { + return kv[1], nil + } + } + return "", errKeyAbsent +} diff --git a/stream/mts/meta/meta_test.go b/stream/mts/meta/meta_test.go new file mode 100644 index 00000000..e1f9f3b7 --- /dev/null +++ b/stream/mts/meta/meta_test.go @@ -0,0 +1,167 @@ +/* +NAME + meta_test.go + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton + +LICENSE + meta_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package meta + +import ( + "bytes" + "encoding/binary" + "reflect" + "testing" +) + +const ( + tstKey1 = "loc" + tstData1 = "a,b,c" + tstKey2 = "ts" + tstData2 = "12345678" + tstData3 = "d,e,f" +) + +// TestAddAndGet ensures that we can add metadata and then successfully get it. +func TestAddAndGet(t *testing.T) { + meta := New() + meta.Add(tstKey1, tstData1) + meta.Add(tstKey2, tstData2) + if data, ok := meta.Get(tstKey1); !ok { + t.Errorf("Could not get data for key: %v\n", tstKey1) + if data != tstData1 { + t.Error("Did not get expected data") + } + } + + if data, ok := meta.Get(tstKey2); !ok { + t.Errorf("Could not get data for key: %v", tstKey2) + if data != tstData2 { + t.Error("Did not get expected data") + } + } +} + +// TestUpdate checks that we can use Meta.Add to actually update metadata +// if it already exists in the Meta map. +func TestUpdate(t *testing.T) { + meta := New() + meta.Add(tstKey1, tstData1) + meta.Add(tstKey1, tstData3) + + if data, ok := meta.Get(tstKey1); !ok { + t.Errorf("Could not get data for key: %v\n", tstKey1) + if data != tstData2 { + t.Error(`Data did not correctly update for key "loc"`) + } + } +} + +// TestAll ensures we can get a correct map using Meta.All() after adding some data +func TestAll(t *testing.T) { + meta := New() + tstMap := map[string]string{ + tstKey1: tstData1, + tstKey2: tstData2, + } + + meta.Add(tstKey1, tstData1) + meta.Add(tstKey2, tstData2) + metaMap := meta.All() + + if !reflect.DeepEqual(metaMap, tstMap) { + t.Errorf("Map not correct. Got: %v, want: %v", metaMap, tstMap) + } +} + +// TestGetAbsentKey ensures that we get the expected error when we try to get with +// key that does not yet exist in the Meta map. +func TestGetAbsentKey(t *testing.T) { + meta := New() + + if _, ok := meta.Get(tstKey1); ok { + t.Error("Get for absent key incorrectly returned'ok'") + } +} + +// TestDelete ensures we can remove a data entry in the Meta map. +func TestDelete(t *testing.T) { + meta := New() + meta.Add(tstKey1, tstData1) + meta.Delete(tstKey1) + if _, ok := meta.Get(tstKey1); ok { + t.Error("Get incorrectly returned okay for absent key") + } +} + +// TestEncode checks that we're getting the correct byte slice from Meta.Encode(). +func TestEncode(t *testing.T) { + meta := New() + meta.Add(tstKey1, tstData1) + meta.Add(tstKey2, tstData2) + + dataLen := len(tstKey1+tstData1+tstKey2+tstData2) + 3 + header := [4]byte{ + 0x00, + 0x10, + } + binary.BigEndian.PutUint16(header[2:4], uint16(dataLen)) + expectedOut := append(header[:], []byte( + tstKey1+"="+tstData1+"\t"+ + tstKey2+"="+tstData2)...) + + got := meta.Encode() + if !bytes.Equal(expectedOut, got) { + t.Errorf("Did not get expected out. \nGot : %v, \nwant: %v\n", got, expectedOut) + } +} + +// TestReadFrom checks that we can correctly obtain a value for a partiular key +// from a string of metadata using the ReadFrom func. +func TestReadFrom(t *testing.T) { + tstMeta := append([]byte{0x00, 0x10, 0x00, 0x12}, "loc=a,b,c\tts=12345"...) + + tests := []struct { + key string + want string + }{ + { + "loc", + "a,b,c", + }, + { + "ts", + "12345", + }, + } + + for _, test := range tests { + got, err := Extract(test.key, []byte(tstMeta)) + if err != nil { + t.Errorf("Unexpected err: %v\n", err) + } + if got != test.want { + t.Errorf("Did not get expected out. \nGot : %v, \nwant: %v\n", got, test.want) + } + } +} diff --git a/stream/mts/metaEncode_test.go b/stream/mts/metaEncode_test.go new file mode 100644 index 00000000..e970b7c8 --- /dev/null +++ b/stream/mts/metaEncode_test.go @@ -0,0 +1,102 @@ +/* +NAME + metaEncode_test.go + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton + +LICENSE + metaEncode_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package mts + +import ( + "bytes" + "testing" + + "bitbucket.org/ausocean/av/stream/mts/meta" + "bitbucket.org/ausocean/av/stream/mts/psi" +) + +const ( + errNotExpectedOut = "Unexpected output. \n Got : %v\n, Want: %v\n" + errUnexpectedErr = "Unexpected error: %v\n" +) + +const fps = 25 + +// TestMetaEncode1 checks that we can externally add a single metadata entry to +// the mts global Meta meta.Data struct and then successfully have the mts encoder +// write this to psi. +func TestMetaEncode1(t *testing.T) { + Meta = meta.New() + var b []byte + buf := bytes.NewBuffer(b) + e := NewEncoder(buf, fps) + Meta.Add("ts", "12345678") + if err := e.writePSI(); err != nil { + t.Errorf(errUnexpectedErr, err.Error()) + } + out := buf.Bytes() + got := out[PacketSize+4:] + + want := []byte{ + 0x00, 0x02, 0xb0, 0x23, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x11, + psi.MetadataTag, // Descriptor tag + 0x0f, // Length of bytes to follow + 0x00, 0x10, 0x00, 0x0b, 't', 's', '=', '1', '2', '3', '4', '5', '6', '7', '8', // timestamp + 0x1b, 0xe1, 0x00, 0xf0, 0x00, + } + want = psi.AddCrc(want) + want = psi.AddPadding(want) + if !bytes.Equal(got, want) { + t.Errorf(errNotExpectedOut, got, want) + } +} + +// TestMetaEncode2 checks that we can externally add two metadata entries to the +// Meta meta.Data global and then have the mts encoder successfully encode this +// into psi. +func TestMetaEncode2(t *testing.T) { + Meta = meta.New() + var b []byte + buf := bytes.NewBuffer(b) + e := NewEncoder(buf, fps) + Meta.Add("ts", "12345678") + Meta.Add("loc", "1234,4321,1234") + if err := e.writePSI(); err != nil { + t.Errorf(errUnexpectedErr, err.Error()) + } + out := buf.Bytes() + got := out[PacketSize+4:] + want := []byte{ + 0x00, 0x02, 0xb0, 0x36, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x24, + psi.MetadataTag, // Descriptor tag + 0x22, // Length of bytes to follow + 0x00, 0x10, 0x00, 0x1e, 't', 's', '=', '1', '2', '3', '4', '5', '6', '7', '8', '\t', // timestamp + 'l', 'o', 'c', '=', '1', '2', '3', '4', ',', '4', '3', '2', '1', ',', '1', '2', '3', '4', // location + 0x1b, 0xe1, 0x00, 0xf0, 0x00, + } + want = psi.AddCrc(want) + want = psi.AddPadding(want) + if !bytes.Equal(got, want) { + t.Errorf(errNotExpectedOut, got, want) + } +} diff --git a/stream/mts/mpegts.go b/stream/mts/mpegts.go index 0bef80d2..08791af7 100644 --- a/stream/mts/mpegts.go +++ b/stream/mts/mpegts.go @@ -32,11 +32,41 @@ import ( "errors" ) +// General mpegts packet properties. const ( PacketSize = 188 PayloadSize = 176 ) +// Program ID for various types of ts packets. +const ( + SdtPid = 17 + PatPid = 0 + PmtPid = 4096 + VideoPid = 256 +) + +// StreamID is the id of the first stream. +const StreamID = 0xe0 + +// HeadSize is the size of an mpegts packet header. +const HeadSize = 4 + +// Consts relating to adaptation field. +const ( + AdaptationIdx = 4 // Index to the adaptation field (index of AFL). + AdaptationControlIdx = 3 // Index to octet with adaptation field control. + AdaptationFieldsIdx = AdaptationIdx + 1 // Adaptation field index is the index of the adaptation fields. + DefaultAdaptationSize = 2 // Default size of the adaptation field. + AdaptationControlMask = 0x30 // Mask for the adaptation field control in octet 3. +) + +// TODO: make this better - currently doesn't make sense. +const ( + HasPayload = 0x1 + HasAdaptationField = 0x2 +) + /* The below data struct encapsulates the fields of an MPEG-TS packet. Below is the formatting of an MPEG-TS packet for reference! diff --git a/stream/mts/psi/crc.go b/stream/mts/psi/crc.go index a361307d..e0ac7deb 100644 --- a/stream/mts/psi/crc.go +++ b/stream/mts/psi/crc.go @@ -34,16 +34,16 @@ import ( ) // addCrc appends a crc table to a given psi table in bytes -func addCrc(out []byte) []byte { +func AddCrc(out []byte) []byte { t := make([]byte, len(out)+4) copy(t, out) - updateCrc(t) + UpdateCrc(t[1:]) return t } // updateCrc updates the crc of bytes slice, writing the checksum into the last four bytes. -func updateCrc(b []byte) { - crc32 := crc32_Update(0xffffffff, crc32_MakeTable(bits.Reverse32(crc32.IEEE)), b[1:len(b)-4]) +func UpdateCrc(b []byte) { + crc32 := crc32_Update(0xffffffff, crc32_MakeTable(bits.Reverse32(crc32.IEEE)), b[:len(b)-4]) binary.BigEndian.PutUint32(b[len(b)-4:], crc32) } diff --git a/stream/mts/psi/descriptor_test.go b/stream/mts/psi/descriptor_test.go new file mode 100644 index 00000000..94441277 --- /dev/null +++ b/stream/mts/psi/descriptor_test.go @@ -0,0 +1,322 @@ +/* +NAME + descriptor_test.go + +DESCRIPTION + See Readme.md + +AUTHOR + Saxon Nelson-Milton + +LICENSE + descriptor_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ + +package psi + +import ( + "bytes" + "testing" +) + +const ( + errNotExpectedOut = "Did not get expected output: \ngot : %v, \nwant: %v" + errUnexpectedErr = "Unexpected error: %v\n" +) + +var ( + tstPsi1 = PSI{ + Pf: 0x00, + Tid: 0x02, + Ssi: true, + Sl: 0x1c, + Tss: &TSS{ + Tide: 0x01, + V: 0, + Cni: true, + Sn: 0, + Lsn: 0, + Sd: &PMT{ + Pcrpid: 0x0100, // wrong + Pil: 10, + Pd: []Desc{ + { + Dt: TimeDescTag, + Dl: TimeDataSize, + Dd: make([]byte, TimeDataSize), + }, + }, + Essd: &ESSD{ + St: 0x1b, + Epid: 0x0100, + Esil: 0x00, + }, + }, + }, + } + + tstPsi2 = PSI{ + Pf: 0x00, + Tid: 0x02, + Ssi: true, + Sl: 0x12, + Tss: &TSS{ + Tide: 0x01, + V: 0, + Cni: true, + Sn: 0, + Lsn: 0, + Sd: &PMT{ + Pcrpid: 0x0100, + Pil: 0, + Essd: &ESSD{ + St: 0x1b, + Epid: 0x0100, + Esil: 0x00, + }, + }, + }, + } + + tstPsi3 = PSI{ + Pf: 0x00, + Tid: 0x02, + Ssi: true, + Sl: 0x3e, + Tss: &TSS{ + Tide: 0x01, + V: 0, + Cni: true, + Sn: 0, + Lsn: 0, + Sd: &PMT{ + Pcrpid: 0x0100, + Pil: PmtTimeLocationPil, + Pd: []Desc{ + { + Dt: TimeDescTag, + Dl: TimeDataSize, + Dd: make([]byte, TimeDataSize), + }, + { + Dt: LocationDescTag, + Dl: LocationDataSize, + Dd: make([]byte, LocationDataSize), + }, + }, + Essd: &ESSD{ + St: 0x1b, + Epid: 0x0100, + Esil: 0x00, + }, + }, + }, + } +) + +var ( + pmtTimeBytesResizedBigger = []byte{ + 0x00, 0x02, 0xb0, 0x1e, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x0c, + TimeDescTag, // Descriptor tag + 0x0a, // Length of bytes to follow + 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, // timestamp + 0x1b, 0xe1, 0x00, 0xf0, 0x00, + } + + pmtTimeBytesResizedSmaller = []byte{ + 0x00, 0x02, 0xb0, 0x1a, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x08, + TimeDescTag, // Descriptor tag + 0x06, // Length of bytes to follow + 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, // timestamp + 0x1b, 0xe1, 0x00, 0xf0, 0x00, + } +) + +// TestHasDescriptorExists checks that PSIBytes.HasDescriptor performs as expected +// when the PSI we're interested in has the descriptor of interest. HasDescriptor +// should return the descriptor bytes. +// TODO: HasDescriptor also returns index of descriptor - we should check this. +func TestHasDescriptorExists(t *testing.T) { + p := PSIBytes(tstPsi3.Bytes()) + _, got := p.HasDescriptor(LocationDescTag) + want := []byte{ + LocationDescTag, + LocationDataSize, + } + want = append(want, make([]byte, LocationDataSize)...) + if !bytes.Equal(got, want) { + t.Errorf(errNotExpectedOut, got, want) + } +} + +// TestHasDescriptorAbsent checks that PSIBytes.HasDescriptor performs as expected +// when the PSI does not have the descriptor of interest. HasDescriptor should +// return a nil slice and a negative index. +// TODO: check index here as well. +func TestHasDescriptorAbsent(t *testing.T) { + p := PSIBytes(tstPsi3.Bytes()) + const fakeTag = 236 + _, got := p.HasDescriptor(fakeTag) + var want []byte + if !bytes.Equal(got, want) { + t.Errorf(errNotExpectedOut, got, want) + } +} + +// TestHasDescriptorNone checks that PSIBytes.HasDescriptor behaves as expected +// when the PSI does not have any descriptors. HasDescriptor should return a nil +// slice. +// TODO: again check index here. +func TestHasDescriptorNone(t *testing.T) { + p := PSIBytes(tstPsi2.Bytes()) + _, got := p.HasDescriptor(LocationDescTag) + var want []byte + if !bytes.Equal(got, want) { + t.Errorf(errNotExpectedOut, got, want) + } +} + +// TestProgramInfoLen checks that PSIBytes.ProgramInfoLen correctly extracts +// the program info length from a PSI. +func TestProgramInfoLen(t *testing.T) { + p := PSIBytes(tstPsi1.Bytes()) + got := p.ProgramInfoLen() + want := 10 + if got != want { + t.Errorf(errNotExpectedOut, got, want) + } +} + +// TestDescriptors checks that PSIBytes.descriptors correctly returns the descriptors +// from a PSI when descriptors exist. +func TestDescriptors(t *testing.T) { + p := PSIBytes(tstPsi1.Bytes()) + got := p.descriptors() + want := []byte{ + TimeDescTag, + TimeDataSize, + } + want = append(want, make([]byte, TimeDataSize)...) + if !bytes.Equal(got, want) { + t.Errorf(errNotExpectedOut, got, want) + } +} + +// TestDescriptors checks that PSIBYtes.desriptors correctly returns nil when +// we try to get descriptors from a psi without any descriptors. +func TestDescriptorsNone(t *testing.T) { + p := PSIBytes(tstPsi2.Bytes()) + got := p.descriptors() + var want []byte + if !bytes.Equal(got, want) { + t.Errorf(errNotExpectedOut, got, want) + } +} + +// TestCreateDescriptorEmpty checks that PSIBytes.createDescriptor correctly adds +// a descriptor to the descriptors list in a PSI when it has no descriptors already. +func TestCreateDescriptorEmpty(t *testing.T) { + got := PSIBytes(tstPsi2.Bytes()) + got.createDescriptor(TimeDescTag, make([]byte, TimeDataSize)) + UpdateCrc(got[1:]) + want := PSIBytes(tstPsi1.Bytes()) + if !bytes.Equal(want, got) { + t.Errorf(errNotExpectedOut, got, want) + } +} + +// TestCreateDescriptorNotEmpty checks that PSIBytes.createDescriptor correctly adds +// a descriptor to the descriptors list in a PSI when it already has one with +// a different tag. +func TestCreateDescriptorNotEmpty(t *testing.T) { + got := PSIBytes(tstPsi1.Bytes()) + got.createDescriptor(LocationDescTag, make([]byte, LocationDataSize)) + UpdateCrc(got[1:]) + want := PSIBytes(tstPsi3.Bytes()) + if !bytes.Equal(want, got) { + t.Errorf(errNotExpectedOut, got, want) + } +} + +// TestAddDescriptorEmpty checks that PSIBytes.AddDescriptor correctly adds a descriptor +// when there are no other descriptors present in the PSI. +func TestAddDescriptorEmpty(t *testing.T) { + got := PSIBytes(tstPsi2.Bytes()) + if err := got.AddDescriptor(TimeDescTag, make([]byte, TimeDataSize)); err != nil { + t.Errorf(errUnexpectedErr, err.Error()) + } + want := PSIBytes(tstPsi1.Bytes()) + if !bytes.Equal(got, want) { + t.Errorf(errNotExpectedOut, got, want) + } +} + +// TestAddDescriptorNonEmpty checks that PSIBytes.AddDescriptor correctly adds a +// descriptor when there is already a descriptor of a differing type in a PSI. +func TestAddDescriptorNonEmpty(t *testing.T) { + got := PSIBytes(tstPsi1.Bytes()) + if err := got.AddDescriptor(LocationDescTag, make([]byte, LocationDataSize)); err != nil { + t.Errorf(errUnexpectedErr, err.Error()) + } + want := PSIBytes(tstPsi3.Bytes()) + if !bytes.Equal(got, want) { + t.Errorf(errNotExpectedOut, got, want) + } +} + +// TestAddDescriptorUpdateSame checks that PSIBytes.AddDescriptor correctly updates data in a descriptor +// with the same given tag, with data being the same size. AddDescriptor should just copy new data into +// the descriptors data field. +func TestAddDescriptorUpdateSame(t *testing.T) { + newData := [8]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08} + want := PSIBytes(tstPsi2.Bytes()) + want.createDescriptor(TimeDescTag, newData[:]) + got := PSIBytes(tstPsi1.Bytes()) + if err := got.AddDescriptor(TimeDescTag, newData[:]); err != nil { + t.Errorf(errUnexpectedErr, err.Error()) + } + if !bytes.Equal(got, want) { + t.Errorf(errNotExpectedOut, got, want) + } +} + +// TestAddDescriptorUpdateBigger checks that PSIBytes.AddDescriptor correctly resizes descriptor with same given tag +// to a bigger size and copies in new data. AddDescriptor should find descriptor with same tag, increase size of psi, +// shift data to make room for update descriptor, and then copy in the new data. +func TestAddDescriptorUpdateBigger(t *testing.T) { + got := PSIBytes(tstPsi1.Bytes()) + if err := got.AddDescriptor(TimeDescTag, []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a}); err != nil { + t.Errorf(errUnexpectedErr, err.Error()) + } + want := AddCrc(pmtTimeBytesResizedBigger) + if !bytes.Equal(got, want) { + t.Errorf(errNotExpectedOut, got, want) + } +} + +// TestAddDescriptorUpdateSmaller checks that PSIBytes.AddDescriptor correctly resizes descriptor with same given tag +// in a psi to a smaller size and copies in new data. AddDescriptor should find tag with same descrtiptor, shift data +// after descriptor upwards, trim the psi to new size, and then copy in new data. +func TestAddDescriptorUpdateSmaller(t *testing.T) { + got := PSIBytes(tstPsi1.Bytes()) + if err := got.AddDescriptor(TimeDescTag, []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06}); err != nil { + t.Errorf(errUnexpectedErr, err.Error()) + } + want := AddCrc(pmtTimeBytesResizedSmaller) + if !bytes.Equal(got, want) { + t.Errorf(errNotExpectedOut, got, want) + } +} diff --git a/stream/mts/psi/helpers.go b/stream/mts/psi/helpers.go index 82d7ce72..b8bab6b5 100644 --- a/stream/mts/psi/helpers.go +++ b/stream/mts/psi/helpers.go @@ -64,15 +64,14 @@ func UpdateTime(dst []byte, t uint64) error { for i := range dst[TimeDataIndx : TimeDataIndx+TimeDataSize] { dst[i+TimeDataIndx] = ts[i] } - updateCrc(dst) + UpdateCrc(dst[1:]) return nil } // SyntaxSecLenFrom takes a byte slice representation of a psi and extracts // it's syntax section length -func SyntaxSecLenFrom(p []byte) (l uint8) { - l = uint8(p[syntaxSecLenIndx]) - crcSize - return +func SyntaxSecLenFrom(p []byte) int { + return int(((p[SyntaxSecLenIdx1] & SyntaxSecLenMask1) << 8) | p[SyntaxSecLenIdx2]) } // TimeFrom takes a byte slice representation of a psi-pmt and extracts it's @@ -112,7 +111,7 @@ func UpdateLocation(d []byte, s string) error { for i := range loc { loc[i] = 0 } - updateCrc(d) + UpdateCrc(d[1:]) return nil } @@ -127,7 +126,7 @@ func trimTo(d []byte, t byte) []byte { // addPadding adds an appropriate amount of padding to a pat or pmt table for // addition to an mpegts packet -func addPadding(d []byte) []byte { +func AddPadding(d []byte) []byte { t := make([]byte, PacketSize) copy(t, d) padding := t[len(d):] diff --git a/stream/mts/psi/psi.go b/stream/mts/psi/psi.go index 09028ba6..c93d3011 100644 --- a/stream/mts/psi/psi.go +++ b/stream/mts/psi/psi.go @@ -26,11 +26,16 @@ LICENSE package psi -const ( - PacketSize = 184 // packet size of a psi. +import ( + "errors" + + "github.com/Comcast/gots/psi" ) -// Lengths of section definitions +// PacketSize of psi (without mpegts header) +const PacketSize = 184 + +// Lengths of section definitions. const ( ESSDDefLen = 5 DescDefLen = 2 @@ -40,13 +45,14 @@ const ( PSIDefLen = 3 ) -// Table Type IDs +// Table Type IDs. const ( patID = 0x00 pmtID = 0x02 ) // Consts relating to time description +// TODO: remove this, we don't do metadata like this anymore. const ( TimeDescTag = 234 TimeTagIndx = 13 @@ -55,6 +61,7 @@ const ( ) // Consts relating to location description +// TODO: remove this, we don't do metadata like this anymore. const ( LocationDescTag = 235 LocationTagIndx = 23 @@ -62,10 +69,35 @@ const ( LocationDataSize = 32 // bytes ) -// Other misc consts +// crc hassh Size +const crcSize = 4 + +// Consts relating to syntax section. const ( - syntaxSecLenIndx = 3 - crcSize = 4 + TotalSyntaxSecLen = 180 + SyntaxSecLenIdx1 = 2 + SyntaxSecLenIdx2 = 3 + SyntaxSecLenMask1 = 0x03 + SectionLenMask1 = 0x03 +) + +// Consts relating to program info len. +const ( + ProgramInfoLenIdx1 = 11 + ProgramInfoLenIdx2 = 12 + ProgramInfoLenMask1 = 0x03 +) + +// DescriptorsIdx is the index that the descriptors start at. +const DescriptorsIdx = ProgramInfoLenIdx2 + 1 + +// MetadataTag is the descriptor tag used for metadata. +const MetadataTag = 0x26 + +// TODO: get rid of these - not a good idea. +type ( + PSIBytes []byte + Descriptor []byte ) // Program specific information @@ -135,8 +167,7 @@ func (p *PSI) Bytes() []byte { out[2] = 0x80 | 0x30 | (0x03 & byte(p.Sl>>8)) out[3] = byte(p.Sl) out = append(out, p.Tss.Bytes()...) - out = addCrc(out) - out = addPadding(out) + out = AddCrc(out) return out } @@ -205,3 +236,135 @@ func asByte(b bool) byte { } return 0x00 } + +// AddDescriptor adds or updates a descriptor in a PSI given a descriptor tag +// and data. If the psi is not a pmt, then an error is returned. If a descriptor +// with the given tag is not found in the psi, room is made and a descriptor with +// given tag and data is created. If a descriptor with the tag is found, the +// descriptor is resized as required and the new data is copied in. +func (p *PSIBytes) AddDescriptor(tag int, data []byte) error { + if psi.TableID(*p) != pmtID { + return errors.New("trying to add descriptor, but not pmt") + } + + i, desc := p.HasDescriptor(tag) + if desc == nil { + err := p.createDescriptor(tag, data) + return err + } + + oldDescLen := desc.len() + oldDataLen := int(desc[1]) + newDataLen := len(data) + newDescLen := 2 + newDataLen + delta := newDescLen - oldDescLen + + // If the old data length is more than the new data length, we need shift data + // after descriptor up, and then trim the psi. If the oldDataLen is less than + // new data then we need reseize psi and shift data down. If same do nothing. + switch { + case oldDataLen > newDataLen: + copy((*p)[i+newDescLen:], (*p)[i+oldDescLen:]) + *p = (*p)[:len(*p)+delta] + case oldDataLen < newDataLen: + tmp := make([]byte, len(*p)+delta) + copy(tmp, *p) + *p = tmp + copy((*p)[i+newDescLen:], (*p)[i+oldDescLen:]) + } + + // Copy in new data + (*p)[i+1] = byte(newDataLen) + copy((*p)[i+2:], data) + + newProgInfoLen := p.ProgramInfoLen() + delta + p.setProgInfoLen(newProgInfoLen) + newSectionLen := int(psi.SectionLength(*p)) + delta + p.setSectionLen(newSectionLen) + UpdateCrc((*p)[1:]) + return nil +} + +// HasDescriptor checks if a descriptor of the given tag exists in a PSI. If the descriptor +// of the given tag exists, an index of this descriptor, as well as the Descriptor is returned. +// If the descriptor of the given tag cannot be found, -1 and a nil slice is returned. +// +// TODO: check if pmt, return error if not ? +func (p *PSIBytes) HasDescriptor(tag int) (int, Descriptor) { + descs := p.descriptors() + if descs == nil { + return -1, nil + } + for i := 0; i < len(descs); i += 2 + int(descs[i+1]) { + if int(descs[i]) == tag { + return i + DescriptorsIdx, descs[i : i+2+int(descs[i+1])] + } + } + return -1, nil +} + +// createDescriptor creates a descriptor in a psi given a tag and data. It does so +// by resizing the psi, shifting existing data down and copying in new descriptor +// in new space. +func (p *PSIBytes) createDescriptor(tag int, data []byte) error { + curProgLen := p.ProgramInfoLen() + oldSyntaxSectionLen := SyntaxSecLenFrom(*p) + if TotalSyntaxSecLen-(oldSyntaxSectionLen+2+len(data)) <= 0 { + return errors.New("Not enough space in psi to create descriptor.") + } + dataLen := len(data) + newDescIdx := DescriptorsIdx + curProgLen + newDescLen := dataLen + 2 + + // Increase size of psi and copy data down to make room for new descriptor. + tmp := make([]byte, len(*p)+newDescLen) + copy(tmp, *p) + *p = tmp + copy((*p)[newDescIdx+newDescLen:], (*p)[newDescIdx:newDescIdx+newDescLen]) + // Set the tag, data len and data of the new desriptor. + (*p)[newDescIdx] = byte(tag) + (*p)[newDescIdx+1] = byte(dataLen) + copy((*p)[newDescIdx+2:newDescIdx+2+dataLen], data) + + // Set length fields and update the psi crc. + addedLen := dataLen + 2 + newProgInfoLen := curProgLen + addedLen + p.setProgInfoLen(newProgInfoLen) + newSyntaxSectionLen := int(oldSyntaxSectionLen) + addedLen + p.setSectionLen(newSyntaxSectionLen) + UpdateCrc((*p)[1:]) + + return nil +} + +// setProgInfoLen sets the program information length in a psi with a pmt. +func (p *PSIBytes) setProgInfoLen(l int) { + (*p)[ProgramInfoLenIdx1] &= 0xff ^ ProgramInfoLenMask1 + (*p)[ProgramInfoLenIdx1] |= byte(l>>8) & ProgramInfoLenMask1 + (*p)[ProgramInfoLenIdx2] = byte(l) +} + +// setSectionLen sets section length in a psi. +func (p *PSIBytes) setSectionLen(l int) { + (*p)[SyntaxSecLenIdx1] &= 0xff ^ SyntaxSecLenMask1 + (*p)[SyntaxSecLenIdx1] |= byte(l>>8) & SyntaxSecLenMask1 + (*p)[SyntaxSecLenIdx2] = byte(l) +} + +// descriptors returns the descriptors in a psi if they exist, otherwise +// a nil slice is returned. +func (p *PSIBytes) descriptors() []byte { + return (*p)[DescriptorsIdx : DescriptorsIdx+p.ProgramInfoLen()] +} + +// len returns the length of a descriptor in bytes. +func (d *Descriptor) len() int { + return int(2 + (*d)[1]) +} + +// ProgramInfoLen returns the program info length of a PSI. +// +// TODO: check if pmt - if not return 0 ? or -1 ? +func (p *PSIBytes) ProgramInfoLen() int { + return int((((*p)[ProgramInfoLenIdx1] & ProgramInfoLenMask1) << 8) | (*p)[ProgramInfoLenIdx2]) +} diff --git a/stream/mts/psi/psi_test.go b/stream/mts/psi/psi_test.go index e437066e..7e3a3104 100644 --- a/stream/mts/psi/psi_test.go +++ b/stream/mts/psi/psi_test.go @@ -282,7 +282,7 @@ var bytesTests = []struct { func TestBytes(t *testing.T) { for _, test := range bytesTests { got := test.input.Bytes() - if !bytes.Equal(got, addPadding(addCrc(test.want))) { + if !bytes.Equal(got, AddCrc(test.want)) { t.Errorf("unexpected error for test %v: got:%v want:%v", test.name, got, test.want) } @@ -301,7 +301,7 @@ func TestTimestampToBytes(t *testing.T) { func TestTimeUpdate(t *testing.T) { cpy := make([]byte, len(pmtTimeBytes1)) copy(cpy, pmtTimeBytes1) - cpy = addCrc(cpy) + cpy = AddCrc(cpy) err := UpdateTime(cpy, tstTime2) cpy = cpy[:len(cpy)-4] if err != nil { @@ -343,7 +343,7 @@ func TestLocationGet(t *testing.T) { func TestLocationUpdate(t *testing.T) { cpy := make([]byte, len(pmtWithMetaTst1)) copy(cpy, pmtWithMetaTst1) - cpy = addCrc(cpy) + cpy = AddCrc(cpy) err := UpdateLocation(cpy, locationTstStr2) cpy = cpy[:len(cpy)-4] if err != nil {