From e772d37a1b383ff77a6207f4e95a96d42ab3c799 Mon Sep 17 00:00:00 2001 From: scruzin Date: Thu, 10 Jan 2019 12:48:31 +1030 Subject: [PATCH 1/4] Warn about EOF errors. --- rtmp/packet.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rtmp/packet.go b/rtmp/packet.go index 8980cf5d..79bd99fe 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -35,6 +35,7 @@ LICENSE package rtmp import ( + "io" "encoding/binary" ) @@ -101,6 +102,9 @@ func readPacket(s *Session, pkt *packet) error { err := readN(s, header[:1]) if err != nil { s.log(DebugLevel, pkg+"failed to read packet header 1st byte", "error", err.Error()) + if err == io.EOF { + s.log(WarnLevel, pkg+"EOF error; connection likely terminated") + } return err } pkt.headerType = (header[0] & 0xc0) >> 6 @@ -415,7 +419,6 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { if s.deferred != nil { // Prepend the previously deferred packet and write it with the current one. bytes = append(s.deferred, bytes...) - s.deferred = nil } err := writeN(s, bytes) if err != nil { From 3dbaa810fc6d483293ad3d006d20f9a8f7028b3e Mon Sep 17 00:00:00 2001 From: scruzin Date: Thu, 10 Jan 2019 12:49:52 +1030 Subject: [PATCH 2/4] Additional logging. --- rtmp/rtmp.go | 43 ++++++++++++++++++++----------------------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index eb0d5621..2c6300d5 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -191,40 +191,38 @@ func connect(s *Session) error { // connectStream reads a packet and handles it func connectStream(s *Session) error { - var pkt packet - + var err error for !s.isPlaying && s.isConnected() { - err := readPacket(s, &pkt) + pkt := packet{} + err = readPacket(s, &pkt) if err != nil { break } - if pkt.bodySize == 0 { continue } - if pkt.packetType == RTMP_PACKET_TYPE_AUDIO || pkt.packetType == RTMP_PACKET_TYPE_VIDEO || pkt.packetType == RTMP_PACKET_TYPE_INFO { s.log(DebugLevel, pkg+"got packet before play; ignoring") - pkt.body = nil continue } - handlePacket(s, &pkt) - pkt.body = nil + err = handlePacket(s, &pkt) + if err != nil { + break + } } if !s.isPlaying { - return errConnStream + return err } return nil } // handlePacket handles a packet that the client has received. // NB: cases have been commented out that are not currently used by AusOcean -func handlePacket(s *Session, pkt *packet) int32 { - var hasMediaPacket int32 +func handlePacket(s *Session, pkt *packet) error { switch pkt.packetType { case RTMP_PACKET_TYPE_CHUNK_SIZE: @@ -266,7 +264,7 @@ func handlePacket(s *Session, pkt *packet) int32 { if err != nil { // This will never happen with the methods we implement. s.log(WarnLevel, pkg+"unexpected error from handleInvoke", "error", err.Error()) - hasMediaPacket = 2 + return err } case RTMP_PACKET_TYPE_FLASH_VIDEO: @@ -275,7 +273,7 @@ func handlePacket(s *Session, pkt *packet) int32 { default: s.log(WarnLevel, pkg+"unknown packet type", "type", pkt.packetType) } - return hasMediaPacket + return nil } func readN(s *Session, buf []byte) error { @@ -286,7 +284,6 @@ func readN(s *Session, buf []byte) error { n, err := io.ReadFull(s.link.conn, buf) if err != nil { s.log(DebugLevel, pkg+"read failed", "error", err.Error()) - s.close() return err } s.nBytesIn += int32(n) @@ -308,7 +305,6 @@ func writeN(s *Session, buf []byte) error { _, err = s.link.conn.Write(buf) if err != nil { s.log(WarnLevel, pkg+"write failed", "error", err.Error()) - s.close() return err } return nil @@ -404,12 +400,10 @@ func sendConnectPacket(s *Session) error { } } - if copy(enc, []byte{0, 0, AMF_OBJECT_END}) != 3 { - return errCopying // TODO: is this even possible? - } + copy(enc, []byte{0, 0, AMF_OBJECT_END}) enc = enc[3:] - /* add auth string */ + // add auth string if s.link.auth != "" { enc = C_AMF_EncodeBoolean(enc, s.link.lFlags&RTMP_LF_AUTH != 0) if enc == nil { @@ -430,7 +424,7 @@ func sendConnectPacket(s *Session) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return sendPacket(s, &pkt, true) + return sendPacket(s, &pkt, true) // response expected } func sendCreateStream(s *Session) error { @@ -458,7 +452,7 @@ func sendCreateStream(s *Session) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return sendPacket(s, &pkt, true) + return sendPacket(s, &pkt, true) // response expected } func sendReleaseStream(s *Session) error { @@ -589,7 +583,7 @@ func sendPublish(s *Session) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return sendPacket(s, &pkt, true) + return sendPacket(s, &pkt, true) // response expected } func sendDeleteStream(s *Session, dStreamId float64) error { @@ -779,6 +773,7 @@ func handleInvoke(s *Session, body []byte) error { s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Start/av_NetStream_Play_PublishNotify") case av_NetStream_Publish_Start: + s.log(DebugLevel, pkg+"playing") s.isPlaying = true for i, m := range s.methodCalls { if m.name == av_publish { @@ -786,6 +781,7 @@ func handleInvoke(s *Session, body []byte) error { break } } + // ToDo: handle case when av_publish method not found case av_NetStream_Play_Complete, av_NetStream_Play_Stop, av_NetStream_Play_UnpublishNotify: s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Complete/av_NetStream_Play_Stop/av_NetStream_Play_UnpublishNotify") @@ -825,14 +821,15 @@ func handshake(s *Session) error { if err != nil { return err } + s.log(DebugLevel, pkg+"handshake sent") var typ [1]byte err = readN(s, typ[:]) if err != nil { return err } + s.log(DebugLevel, pkg+"handshake received") - s.log(DebugLevel, pkg+"handshake", "received", typ[0]) if typ[0] != clientbuf[0] { s.log(WarnLevel, pkg+"handshake type mismatch", "sent", clientbuf[0], "received", typ) } From 97964650186f3c3863f176cbe9e44f963545bbde Mon Sep 17 00:00:00 2001 From: scruzin Date: Thu, 10 Jan 2019 12:51:18 +1030 Subject: [PATCH 3/4] Exit if RTMP_TEST_KEY not defined. --- rtmp/rtmp_test.go | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/rtmp/rtmp_test.go b/rtmp/rtmp_test.go index ea5f687c..2dc6781d 100644 --- a/rtmp/rtmp_test.go +++ b/rtmp/rtmp_test.go @@ -29,10 +29,10 @@ package rtmp import ( "fmt" - "io/ioutil" "os" "runtime" "testing" + "io/ioutil" ) const ( @@ -81,9 +81,9 @@ func testLog(level int8, msg string, params ...interface{}) { func TestKey(t *testing.T) { testLog(0, "TestKey") - testKey := os.Getenv("RTMP_TEST_KEY") + testKey = os.Getenv("RTMP_TEST_KEY") if testKey == "" { - t.Errorf("RTMP_TEST_KEY environment variable not defined") + fmt.Printf("RTMP_TEST_KEY environment variable not defined\n") os.Exit(1) } testLog(0, "Testing against URL "+testBaseURL+testKey) @@ -118,25 +118,35 @@ func TestSetupURL(t *testing.T) { } } +func TestOpen(t *testing.T) { + testLog(0, "TestOpen") + s := NewSession(testBaseURL+testKey, testTimeout, testLog) + err := setupURL(s, s.url) + if err != nil { + t.Errorf("setupURL failed with error: %v", err) + } + s.enableWrite() + err = s.Open() + if err != nil { + t.Errorf("connect failed with error: %v", err) + } +} + func TestOpenClose(t *testing.T) { testLog(0, "TestOpenClose") s := NewSession(testBaseURL+testKey, testTimeout, testLog) err := s.Open() if err != nil { - t.Errorf("Session.Open failed with error: %v", err) + t.Errorf("Open failed with error: %v", err) return } - err = s.Close() - if err != nil { - t.Errorf("Session.Close failed with error: %v", err) - } } func TestFromFile(t *testing.T) { testLog(0, "TestFromFile") testFile := os.Getenv("RTMP_TEST_FILE") if testKey == "" { - t.Errorf("RTMP_TEST_FILE environment variable not defined") + fmt.Printf("RTMP_TEST_FILE environment variable not defined\n") os.Exit(1) } s := NewSession(testBaseURL+testKey, testTimeout, testLog) @@ -150,6 +160,7 @@ func TestFromFile(t *testing.T) { } // ToDo: rate limit writing n, err := s.Write(video) + if err != nil { t.Errorf("Session.Write failed with error: %v", err) } From 2333d1953e4593f843c1642315b51915f7e82e87 Mon Sep 17 00:00:00 2001 From: scruzin Date: Thu, 10 Jan 2019 13:23:12 +1030 Subject: [PATCH 4/4] packetSize -> headerSizes. --- rtmp/packet.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/rtmp/packet.go b/rtmp/packet.go index 79bd99fe..6d33731c 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -68,8 +68,12 @@ const ( RTMP_CHANNEL_SOURCE = 0x04 ) -// packetSize defines valid packet sizes. -var packetSize = [...]int{12, 8, 4, 1} +// headerSizes defines header sizes for header types 0, 1, 2 and 3 respectively: +// 0: full header (12 bytes) +// 1: header without message ID (8 bytes) +// 2: basic header + timestamp (4 byes) +// 3: basic header (chunk type and stream ID) (1 byte) +var headerSizes = [...]int{12, 8, 4, 1} // packet defines an RTMP packet. type packet struct { @@ -154,7 +158,7 @@ func readPacket(s *Session, pkt *packet) error { s.channelsAllocatedIn = n } - size := packetSize[pkt.headerType] + size := headerSizes[pkt.headerType] switch { case size == RTMP_LARGE_HEADER_SIZE: pkt.hasAbsTimestamp = true @@ -308,7 +312,7 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { if pkt.body != nil { // Span from -packetsize for the type to the start of the body. headBytes = pkt.header - origIdx = RTMP_MAX_HEADER_SIZE - packetSize[pkt.headerType] + origIdx = RTMP_MAX_HEADER_SIZE - headerSizes[pkt.headerType] } else { // Allocate a new header and allow 6 bytes of movement backward. var hbuf [RTMP_MAX_HEADER_SIZE]byte @@ -324,7 +328,7 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { cSize = 1 } - hSize := packetSize[pkt.headerType] + hSize := headerSizes[pkt.headerType] if cSize != 0 { origIdx -= cSize hSize += cSize @@ -365,7 +369,7 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { } } - if packetSize[pkt.headerType] > 1 { + if headerSizes[pkt.headerType] > 1 { res := ts if ts > 0xffffff { res = 0xffffff @@ -374,14 +378,14 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { headerIdx += 3 // 24bits } - if packetSize[pkt.headerType] > 4 { + if headerSizes[pkt.headerType] > 4 { C_AMF_EncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize)) headerIdx += 3 // 24bits headBytes[headerIdx] = pkt.packetType headerIdx++ } - if packetSize[pkt.headerType] > 8 { + if headerSizes[pkt.headerType] > 8 { n := int(encodeInt32LE(headBytes[headerIdx:headerIdx+4], pkt.info)) headerIdx += n }