From adfb87dcf972c9933a06b70c50ee73af8f3886d1 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 07:30:24 +1030 Subject: [PATCH 01/14] TypeString now exported (was typeString). --- rtmp/amf/amf.go | 14 +++++++------- rtmp/amf/amf_test.go | 12 ++++++------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/rtmp/amf/amf.go b/rtmp/amf/amf.go index 837531bb..7ccfd932 100644 --- a/rtmp/amf/amf.go +++ b/rtmp/amf/amf.go @@ -51,7 +51,7 @@ import ( const ( typeNumber = 0x00 typeBoolean = 0x01 - typeString = 0x02 + TypeString = 0x02 TypeObject = 0x03 typeMovieClip = 0x04 TypeNull = 0x05 @@ -93,7 +93,7 @@ type Property struct { var ( ErrShortBuffer = errors.New("amf: short buffer") // The supplied buffer was too short. ErrInvalidType = errors.New("amf: invalid type") // An invalid type was supplied to the encoder. - ErrUnexpectedType = errors.New("amf: unexpected end") // An unexpected type was encountered while decoding. + ErrUnexpectedType = errors.New("amf: unexpected type") // An unexpected type was encountered while decoding. ErrPropertyNotFound = errors.New("amf: property not found") // The requested property was not found. ) @@ -160,6 +160,7 @@ func EncodeInt32(buf []byte, val uint32) ([]byte, error) { } // EncodeString encodes a string. +// Strings less than 65536 in length are encoded as TypeString, while longer strings are ecodeded as typeLongString. func EncodeString(buf []byte, val string) ([]byte, error) { const typeSize = 1 if len(val) < 65536 && len(val)+typeSize+binary.Size(int16(0)) > len(buf) { @@ -171,7 +172,7 @@ func EncodeString(buf []byte, val string) ([]byte, error) { } if len(val) < 65536 { - buf[0] = typeString + buf[0] = TypeString buf = buf[1:] binary.BigEndian.PutUint16(buf[:2], uint16(len(val))) buf = buf[2:] @@ -263,7 +264,7 @@ func EncodeProperty(prop *Property, buf []byte) ([]byte, error) { return EncodeNumber(buf, prop.Number) case typeBoolean: return EncodeBoolean(buf, prop.Number != 0) - case typeString: + case TypeString: return EncodeString(buf, prop.String) case TypeNull: if len(buf) < 2 { @@ -320,7 +321,7 @@ func DecodeProperty(prop *Property, buf []byte, decodeName bool) (int, error) { prop.Number = float64(buf[0]) buf = buf[1:] - case typeString: + case TypeString: n := DecodeInt16(buf[:2]) if len(buf) < int(n+2) { return 0, ErrShortBuffer @@ -354,7 +355,6 @@ func DecodeProperty(prop *Property, buf []byte, decodeName bool) (int, error) { } // Encode encodes an Object into its AMF representation. -// This is the top-level encoding function and is typically the only function callers will need to use. func Encode(obj *Object, buf []byte) ([]byte, error) { if len(buf) < 5 { return nil, ErrShortBuffer @@ -481,7 +481,7 @@ func (obj *Object) NumberProperty(name string, idx int) (float64, error) { // StringProperty is a wrapper for Property that returns a String property's value, if any. func (obj *Object) StringProperty(name string, idx int) (string, error) { - prop, err := obj.Property(name, idx, typeString) + prop, err := obj.Property(name, idx, TypeString) if err != nil { return "", err } diff --git a/rtmp/amf/amf_test.go b/rtmp/amf/amf_test.go index 59548c09..957e1c7e 100644 --- a/rtmp/amf/amf_test.go +++ b/rtmp/amf/amf_test.go @@ -58,7 +58,7 @@ func TestSanity(t *testing.T) { // TestStrings tests string encoding and decoding. func TestStrings(t *testing.T) { // Short string encoding is as follows: - // enc[0] = data type (typeString) + // enc[0] = data type (TypeString) // end[1:3] = size // enc[3:] = data for _, s := range testStrings { @@ -67,8 +67,8 @@ func TestStrings(t *testing.T) { if err != nil { t.Errorf("EncodeString failed") } - if buf[0] != typeString { - t.Errorf("Expected typeString, got %v", buf[0]) + if buf[0] != TypeString { + t.Errorf("Expected TypeString, got %v", buf[0]) } ds := DecodeString(buf[1:]) if s != ds { @@ -76,7 +76,7 @@ func TestStrings(t *testing.T) { } } // Long string encoding is as follows: - // enc[0] = data type (typeString) + // enc[0] = data type (TypeString) // end[1:5] = size // enc[5:] = data s := string(make([]byte, 65536)) @@ -148,7 +148,7 @@ func TestProperties(t *testing.T) { // Encode/decode string properties. enc = buf[:] for i := range testStrings { - enc, err = EncodeProperty(&Property{Type: typeString, String: testStrings[i]}, enc) + enc, err = EncodeProperty(&Property{Type: TypeString, String: testStrings[i]}, enc) if err != nil { t.Errorf("EncodeProperty of string failed") } @@ -235,7 +235,7 @@ func TestObject(t *testing.T) { // Construct a more complicated object that includes a nested object. var obj2 Object for i := range testStrings { - obj2.Properties = append(obj2.Properties, Property{Type: typeString, String: testStrings[i]}) + obj2.Properties = append(obj2.Properties, Property{Type: TypeString, String: testStrings[i]}) obj2.Properties = append(obj2.Properties, Property{Type: typeNumber, Number: float64(testNumbers[i])}) } obj2.Properties = append(obj2.Properties, Property{Type: TypeObject, Object: obj1}) From 42be87d98a26684b2a40e51fadded6e3bd705587 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 07:33:43 +1030 Subject: [PATCH 02/14] sendConnectPacket() now encodes required link info in one go using amf.Encode(). --- rtmp/rtmp.go | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index bfee3e37..1175e8ce 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -276,26 +276,18 @@ func sendConnectPacket(c *Conn) error { return err } - enc[0] = amf.TypeObject - enc = enc[1:] - enc, err = amf.EncodeNamedString(enc, avApp, c.link.app) - if err != nil { - return err + // required link info + info := amf.Object{Properties: []amf.Property{ + amf.Property{Type: amf.TypeString, Name: avApp, String: c.link.app}, + amf.Property{Type: amf.TypeString, Name: avType, String: avNonprivate}, + amf.Property{Type: amf.TypeString, Name: avTcUrl, String: c.link.url}}, } - enc, err = amf.EncodeNamedString(enc, avType, avNonprivate) - if err != nil { - return err - } - enc, err = amf.EncodeNamedString(enc, avTcUrl, c.link.url) - if err != nil { - return err - } - enc, err = amf.EncodeInt24(enc, amf.TypeObjectEnd) + enc, err = amf.Encode(&info, enc) if err != nil { return err } - // add auth string, if any + // optional link auth info if c.link.auth != "" { enc, err = amf.EncodeBoolean(enc, c.link.flags&linkAuth != 0) if err != nil { From 828cc3780a9005282a5bbaa7a309747870104994 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 07:56:55 +1030 Subject: [PATCH 03/14] Removed superfluous packet.bytesRead. --- rtmp/packet.go | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/rtmp/packet.go b/rtmp/packet.go index 3cf18f14..642e6194 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -81,7 +81,7 @@ const ( // 3: basic header (chunk type and stream ID) (1 byte) var headerSizes = [...]int{12, 8, 4, 1} -// packet defines an RTMP packet. +// packet represents an RTMP packet. type packet struct { headerType uint8 packetType uint8 @@ -90,7 +90,6 @@ type packet struct { timestamp uint32 streamID uint32 bodySize uint32 - bytesRead uint32 buf []byte body []byte } @@ -179,7 +178,6 @@ func (pkt *packet) readFrom(c *Conn) error { pkt.timestamp = amf.DecodeInt24(header[:3]) if size >= 6 { pkt.bodySize = amf.DecodeInt24(header[3:6]) - pkt.bytesRead = 0 if size > 6 { pkt.packetType = header[6] @@ -205,21 +203,17 @@ func (pkt *packet) readFrom(c *Conn) error { pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6) } - toRead := pkt.bodySize - pkt.bytesRead - chunkSize := c.inChunkSize - - if toRead < chunkSize { - chunkSize = toRead + sz := c.inChunkSize + if pkt.bodySize < sz { + sz = pkt.bodySize } - _, err = c.read(pkt.body[pkt.bytesRead:][:chunkSize]) + _, err = c.read(pkt.body[:sz]) if err != nil { c.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error()) return err } - pkt.bytesRead += uint32(chunkSize) - // Keep the packet as a reference for other packets on this channel. if c.channelsIn[pkt.channel] == nil { c.channelsIn[pkt.channel] = &packet{} @@ -237,7 +231,6 @@ func (pkt *packet) readFrom(c *Conn) error { c.channelTimestamp[pkt.channel] = int32(pkt.timestamp) c.channelsIn[pkt.channel].body = nil - c.channelsIn[pkt.channel].bytesRead = 0 c.channelsIn[pkt.channel].hasAbsTimestamp = false return nil } From ddd1e4ab1789fe5a66577f3741e80bfbd393155b Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 08:09:49 +1030 Subject: [PATCH 04/14] readFrom() now logs a warning if it is reading a large packet (which I suspect never happens). --- rtmp/packet.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/rtmp/packet.go b/rtmp/packet.go index 642e6194..0f3ae241 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -203,12 +203,11 @@ func (pkt *packet) readFrom(c *Conn) error { pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6) } - sz := c.inChunkSize - if pkt.bodySize < sz { - sz = pkt.bodySize + if pkt.bodySize > c.inChunkSize { + c.log(WarnLevel, pkg+"reading large packet", "size", int(pkt.bodySize)) } - _, err = c.read(pkt.body[:sz]) + _, err = c.read(pkt.body[:pkt.bodySize]) if err != nil { c.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error()) return err From 969e2f4fa940719e14924faac0ec358066d7fb46 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 08:41:32 +1030 Subject: [PATCH 05/14] packet.resize() now only makes a new buf when necessary. --- rtmp/packet.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/rtmp/packet.go b/rtmp/packet.go index 0f3ae241..a92f7022 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -199,9 +199,7 @@ func (pkt *packet) readFrom(c *Conn) error { hSize += 4 } - if pkt.bodySize > 0 && pkt.body == nil { - pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6) - } + pkt.resize(pkt.bodySize, pkt.headerType) if pkt.bodySize > c.inChunkSize { c.log(WarnLevel, pkg+"reading large packet", "size", int(pkt.bodySize)) @@ -234,10 +232,12 @@ func (pkt *packet) readFrom(c *Conn) error { return nil } -// resize adjusts the packet's storage to accommodate a body of the given size and header type. +// resize adjusts the packet's storage (if necessary) to accommodate a body of the given size and header type. // When headerSizeAuto is specified, the header type is computed based on packet type. func (pkt *packet) resize(size uint32, ht uint8) { - pkt.buf = make([]byte, fullHeaderSize+size) + if cap(pkt.buf) < fullHeaderSize+int(size) { + pkt.buf = make([]byte, fullHeaderSize+size) + } pkt.body = pkt.buf[fullHeaderSize:] if ht != headerSizeAuto { pkt.headerType = ht From fbad21bc76af71deca65e924229c6ef643130064 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 08:52:58 +1030 Subject: [PATCH 06/14] connect() now constructs its packet with a buf to avoid needless packet resizing later. --- rtmp/rtmp.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 1175e8ce..a6897eef 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -186,8 +186,10 @@ func connect(c *Conn) error { return err } c.log(DebugLevel, pkg+"negotiating") + + var buf [256]byte for !c.isPlaying { - pkt := packet{} + pkt := packet{buf: buf[:]} err = pkt.readFrom(c) if err != nil { break From 04ad1fec257ea38e9ad03eeeb0c17f6d16631080 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 09:20:56 +1030 Subject: [PATCH 07/14] Log packet size before la and ra. --- rtmp/packet.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rtmp/packet.go b/rtmp/packet.go index a92f7022..b28a1cba 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -399,7 +399,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { return nil } } else { - // Send previously deferrd packet if combining it with the next one would exceed the chunk size. + // Send previously deferred packet if combining it with the next one would exceed the chunk size. if len(c.deferred)+size+hSize > chunkSize { c.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(c.deferred)) _, err := c.write(c.deferred) @@ -411,7 +411,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { } // TODO(kortschak): Rewrite this horrific peice of premature optimisation. - c.log(DebugLevel, pkg+"sending packet", "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr(), "size", size) + c.log(DebugLevel, pkg+"sending packet", "size", size, "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr()) for size+hSize != 0 { if chunkSize > size { chunkSize = size From a362d1d2abb6b02298b572ae793d7e10ab016a52 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 09:50:51 +1030 Subject: [PATCH 08/14] connect() now closes c.link.conn upon an error. --- rtmp/rtmp.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index a6897eef..3352558a 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -174,6 +174,13 @@ func connect(c *Conn) error { return err } c.log(DebugLevel, pkg+"connected") + + defer func() { + if err != nil { + c.link.conn.Close() + } + }() + err = handshake(c) if err != nil { c.log(WarnLevel, pkg+"handshake failed", "error", err.Error()) From efe40a6778e563f9e01447b8ebe015c362f48927 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 09:59:28 +1030 Subject: [PATCH 09/14] Move test for empty RTMP 'app' into parseURL(). --- rtmp/conn.go | 3 --- rtmp/parseurl.go | 4 +++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/rtmp/conn.go b/rtmp/conn.go index 2554d092..1db2cb6e 100644 --- a/rtmp/conn.go +++ b/rtmp/conn.go @@ -121,9 +121,6 @@ func Dial(url string, timeout uint, log Log) (*Conn, error) { if err != nil { return nil, err } - if c.link.app == "" { - return nil, errInvalidURL - } if c.link.port == 0 { switch { case (c.link.protocol & featureSSL) != 0: diff --git a/rtmp/parseurl.go b/rtmp/parseurl.go index eae4277e..ad3409b6 100644 --- a/rtmp/parseurl.go +++ b/rtmp/parseurl.go @@ -41,7 +41,6 @@ import ( ) // parseURL parses an RTMP URL (ok, technically it is lexing). -// func parseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) { u, err := url.Parse(addr) if err != nil { @@ -81,6 +80,9 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp } elems := strings.SplitN(u.Path[1:], "/", 3) app = elems[0] + if app == "" { + return protocol, host, port, app, playpath, errInvalidURL + } playpath = elems[1] if len(elems) == 3 && len(elems[2]) != 0 { playpath = path.Join(elems[1:]...) From 89d9bf3eef7b91ae9d98f1e0bbafd5b0aa125926 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 10:28:05 +1030 Subject: [PATCH 10/14] Return errors from the loop in connect() rather than breaking. --- rtmp/rtmp.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 3352558a..6f666785 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -192,14 +192,14 @@ func connect(c *Conn) error { c.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error()) return err } - c.log(DebugLevel, pkg+"negotiating") + c.log(DebugLevel, pkg+"negotiating") var buf [256]byte for !c.isPlaying { pkt := packet{buf: buf[:]} err = pkt.readFrom(c) if err != nil { - break + return err } switch pkt.packetType { @@ -208,14 +208,10 @@ func connect(c *Conn) error { default: err = handlePacket(c, &pkt) if err != nil { - break + return err } } } - - if !c.isPlaying { - return err - } return nil } From 0105c76dc5e029d47f0d858805bfcbd12569bb2a Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 13:33:44 +1030 Subject: [PATCH 11/14] Move port defaulting logic to parseURL(). --- rtmp/conn.go | 11 ----------- rtmp/parseurl.go | 10 ++++++++++ 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/rtmp/conn.go b/rtmp/conn.go index 1db2cb6e..3864df98 100644 --- a/rtmp/conn.go +++ b/rtmp/conn.go @@ -121,17 +121,6 @@ func Dial(url string, timeout uint, log Log) (*Conn, error) { if err != nil { return nil, err } - if c.link.port == 0 { - switch { - case (c.link.protocol & featureSSL) != 0: - c.link.port = 433 - c.log(FatalLevel, pkg+"SSL not supported") - case (c.link.protocol & featureHTTP) != 0: - c.link.port = 80 - default: - c.link.port = 1935 - } - } c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app c.link.protocol |= featureWrite diff --git a/rtmp/parseurl.go b/rtmp/parseurl.go index ad3409b6..c2880368 100644 --- a/rtmp/parseurl.go +++ b/rtmp/parseurl.go @@ -99,5 +99,15 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp } } + if port == 0 { + switch { + case (protocol & featureSSL) != 0: + port = 433 + case (protocol & featureHTTP) != 0: + port = 80 + default: + port = 1935 + } + } return protocol, host, port, app, playpath, nil } From 3a70dc6ddc447a8549a1c086aa658aa08fdfafc6 Mon Sep 17 00:00:00 2001 From: scruzin Date: Mon, 21 Jan 2019 10:38:27 +1030 Subject: [PATCH 12/14] parseURL() now returns errUnimplemented for SSL. --- rtmp/parseurl.go | 1 + 1 file changed, 1 insertion(+) diff --git a/rtmp/parseurl.go b/rtmp/parseurl.go index c2880368..cc197fb9 100644 --- a/rtmp/parseurl.go +++ b/rtmp/parseurl.go @@ -103,6 +103,7 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp switch { case (protocol & featureSSL) != 0: port = 433 + return errUnimplemented case (protocol & featureHTTP) != 0: port = 80 default: From 2697dcf51520bb1b33ceb962e0d66f083aeca6e7 Mon Sep 17 00:00:00 2001 From: scruzin Date: Mon, 21 Jan 2019 10:57:40 +1030 Subject: [PATCH 13/14] More elegant switch in parseURL() for port logic. --- rtmp/parseurl.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/rtmp/parseurl.go b/rtmp/parseurl.go index cc197fb9..4fea7d82 100644 --- a/rtmp/parseurl.go +++ b/rtmp/parseurl.go @@ -99,16 +99,15 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp } } - if port == 0 { - switch { - case (protocol & featureSSL) != 0: - port = 433 - return errUnimplemented - case (protocol & featureHTTP) != 0: - port = 80 - default: - port = 1935 - } + switch { + case port != 0: + case (protocol & featureSSL) != 0: + return protocol, host, port, app, playpath, errUnimplemented // port = 433 + case (protocol & featureHTTP) != 0: + port = 80 + default: + port = 1935 } + return protocol, host, port, app, playpath, nil } From 2da8d2af85100797248dcd7a789777266a19e0de Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 13:54:56 +1030 Subject: [PATCH 14/14] av/rtmp/rtmp_test.go: using local rtmpSender io.writer implemntation to handle errors from rtmp --- rtmp/rtmp_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rtmp/rtmp_test.go b/rtmp/rtmp_test.go index 618f2b95..be4908a6 100644 --- a/rtmp/rtmp_test.go +++ b/rtmp/rtmp_test.go @@ -243,8 +243,9 @@ func TestFromFile(t *testing.T) { } defer f.Close() + rs := &rtmpSender{conn: c} // Pass RTMP session, true for audio, true for video, and 25 FPS - flvEncoder, err := flv.NewEncoder(c, true, true, 25) + flvEncoder, err := flv.NewEncoder(rs, true, true, 25) if err != nil { t.Fatalf("failed to create encoder: %v", err) }