diff --git a/rtmp/packet.go b/rtmp/packet.go index 8980cf5d..6d33731c 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -35,6 +35,7 @@ LICENSE package rtmp import ( + "io" "encoding/binary" ) @@ -67,8 +68,12 @@ const ( RTMP_CHANNEL_SOURCE = 0x04 ) -// packetSize defines valid packet sizes. -var packetSize = [...]int{12, 8, 4, 1} +// headerSizes defines header sizes for header types 0, 1, 2 and 3 respectively: +// 0: full header (12 bytes) +// 1: header without message ID (8 bytes) +// 2: basic header + timestamp (4 byes) +// 3: basic header (chunk type and stream ID) (1 byte) +var headerSizes = [...]int{12, 8, 4, 1} // packet defines an RTMP packet. type packet struct { @@ -101,6 +106,9 @@ func readPacket(s *Session, pkt *packet) error { err := readN(s, header[:1]) if err != nil { s.log(DebugLevel, pkg+"failed to read packet header 1st byte", "error", err.Error()) + if err == io.EOF { + s.log(WarnLevel, pkg+"EOF error; connection likely terminated") + } return err } pkt.headerType = (header[0] & 0xc0) >> 6 @@ -150,7 +158,7 @@ func readPacket(s *Session, pkt *packet) error { s.channelsAllocatedIn = n } - size := packetSize[pkt.headerType] + size := headerSizes[pkt.headerType] switch { case size == RTMP_LARGE_HEADER_SIZE: pkt.hasAbsTimestamp = true @@ -304,7 +312,7 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { if pkt.body != nil { // Span from -packetsize for the type to the start of the body. headBytes = pkt.header - origIdx = RTMP_MAX_HEADER_SIZE - packetSize[pkt.headerType] + origIdx = RTMP_MAX_HEADER_SIZE - headerSizes[pkt.headerType] } else { // Allocate a new header and allow 6 bytes of movement backward. var hbuf [RTMP_MAX_HEADER_SIZE]byte @@ -320,7 +328,7 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { cSize = 1 } - hSize := packetSize[pkt.headerType] + hSize := headerSizes[pkt.headerType] if cSize != 0 { origIdx -= cSize hSize += cSize @@ -361,7 +369,7 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { } } - if packetSize[pkt.headerType] > 1 { + if headerSizes[pkt.headerType] > 1 { res := ts if ts > 0xffffff { res = 0xffffff @@ -370,14 +378,14 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { headerIdx += 3 // 24bits } - if packetSize[pkt.headerType] > 4 { + if headerSizes[pkt.headerType] > 4 { C_AMF_EncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize)) headerIdx += 3 // 24bits headBytes[headerIdx] = pkt.packetType headerIdx++ } - if packetSize[pkt.headerType] > 8 { + if headerSizes[pkt.headerType] > 8 { n := int(encodeInt32LE(headBytes[headerIdx:headerIdx+4], pkt.info)) headerIdx += n } @@ -415,7 +423,6 @@ func sendPacket(s *Session, pkt *packet, queue bool) error { if s.deferred != nil { // Prepend the previously deferred packet and write it with the current one. bytes = append(s.deferred, bytes...) - s.deferred = nil } err := writeN(s, bytes) if err != nil { diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index eb0d5621..2c6300d5 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -191,40 +191,38 @@ func connect(s *Session) error { // connectStream reads a packet and handles it func connectStream(s *Session) error { - var pkt packet - + var err error for !s.isPlaying && s.isConnected() { - err := readPacket(s, &pkt) + pkt := packet{} + err = readPacket(s, &pkt) if err != nil { break } - if pkt.bodySize == 0 { continue } - if pkt.packetType == RTMP_PACKET_TYPE_AUDIO || pkt.packetType == RTMP_PACKET_TYPE_VIDEO || pkt.packetType == RTMP_PACKET_TYPE_INFO { s.log(DebugLevel, pkg+"got packet before play; ignoring") - pkt.body = nil continue } - handlePacket(s, &pkt) - pkt.body = nil + err = handlePacket(s, &pkt) + if err != nil { + break + } } if !s.isPlaying { - return errConnStream + return err } return nil } // handlePacket handles a packet that the client has received. // NB: cases have been commented out that are not currently used by AusOcean -func handlePacket(s *Session, pkt *packet) int32 { - var hasMediaPacket int32 +func handlePacket(s *Session, pkt *packet) error { switch pkt.packetType { case RTMP_PACKET_TYPE_CHUNK_SIZE: @@ -266,7 +264,7 @@ func handlePacket(s *Session, pkt *packet) int32 { if err != nil { // This will never happen with the methods we implement. s.log(WarnLevel, pkg+"unexpected error from handleInvoke", "error", err.Error()) - hasMediaPacket = 2 + return err } case RTMP_PACKET_TYPE_FLASH_VIDEO: @@ -275,7 +273,7 @@ func handlePacket(s *Session, pkt *packet) int32 { default: s.log(WarnLevel, pkg+"unknown packet type", "type", pkt.packetType) } - return hasMediaPacket + return nil } func readN(s *Session, buf []byte) error { @@ -286,7 +284,6 @@ func readN(s *Session, buf []byte) error { n, err := io.ReadFull(s.link.conn, buf) if err != nil { s.log(DebugLevel, pkg+"read failed", "error", err.Error()) - s.close() return err } s.nBytesIn += int32(n) @@ -308,7 +305,6 @@ func writeN(s *Session, buf []byte) error { _, err = s.link.conn.Write(buf) if err != nil { s.log(WarnLevel, pkg+"write failed", "error", err.Error()) - s.close() return err } return nil @@ -404,12 +400,10 @@ func sendConnectPacket(s *Session) error { } } - if copy(enc, []byte{0, 0, AMF_OBJECT_END}) != 3 { - return errCopying // TODO: is this even possible? - } + copy(enc, []byte{0, 0, AMF_OBJECT_END}) enc = enc[3:] - /* add auth string */ + // add auth string if s.link.auth != "" { enc = C_AMF_EncodeBoolean(enc, s.link.lFlags&RTMP_LF_AUTH != 0) if enc == nil { @@ -430,7 +424,7 @@ func sendConnectPacket(s *Session) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return sendPacket(s, &pkt, true) + return sendPacket(s, &pkt, true) // response expected } func sendCreateStream(s *Session) error { @@ -458,7 +452,7 @@ func sendCreateStream(s *Session) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return sendPacket(s, &pkt, true) + return sendPacket(s, &pkt, true) // response expected } func sendReleaseStream(s *Session) error { @@ -589,7 +583,7 @@ func sendPublish(s *Session) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return sendPacket(s, &pkt, true) + return sendPacket(s, &pkt, true) // response expected } func sendDeleteStream(s *Session, dStreamId float64) error { @@ -779,6 +773,7 @@ func handleInvoke(s *Session, body []byte) error { s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Start/av_NetStream_Play_PublishNotify") case av_NetStream_Publish_Start: + s.log(DebugLevel, pkg+"playing") s.isPlaying = true for i, m := range s.methodCalls { if m.name == av_publish { @@ -786,6 +781,7 @@ func handleInvoke(s *Session, body []byte) error { break } } + // ToDo: handle case when av_publish method not found case av_NetStream_Play_Complete, av_NetStream_Play_Stop, av_NetStream_Play_UnpublishNotify: s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Complete/av_NetStream_Play_Stop/av_NetStream_Play_UnpublishNotify") @@ -825,14 +821,15 @@ func handshake(s *Session) error { if err != nil { return err } + s.log(DebugLevel, pkg+"handshake sent") var typ [1]byte err = readN(s, typ[:]) if err != nil { return err } + s.log(DebugLevel, pkg+"handshake received") - s.log(DebugLevel, pkg+"handshake", "received", typ[0]) if typ[0] != clientbuf[0] { s.log(WarnLevel, pkg+"handshake type mismatch", "sent", clientbuf[0], "received", typ) } diff --git a/rtmp/rtmp_test.go b/rtmp/rtmp_test.go index 0dd0e58c..4f927138 100644 --- a/rtmp/rtmp_test.go +++ b/rtmp/rtmp_test.go @@ -29,7 +29,6 @@ package rtmp import ( "fmt" - "io/ioutil" "os" "runtime" "testing" @@ -37,6 +36,7 @@ import ( "bitbucket.org/ausocean/av/stream/flv" "bitbucket.org/ausocean/av/stream/lex" + "io/ioutil" ) const ( @@ -87,7 +87,7 @@ func TestKey(t *testing.T) { testLog(0, "TestKey") testKey = os.Getenv("RTMP_TEST_KEY") if testKey == "" { - t.Errorf("RTMP_TEST_KEY environment variable not defined") + fmt.Printf("RTMP_TEST_KEY environment variable not defined\n") os.Exit(1) } testLog(0, "Testing against URL "+testBaseURL+testKey) @@ -122,18 +122,28 @@ func TestSetupURL(t *testing.T) { } } +func TestOpen(t *testing.T) { + testLog(0, "TestOpen") + s := NewSession(testBaseURL+testKey, testTimeout, testLog) + err := setupURL(s, s.url) + if err != nil { + t.Errorf("setupURL failed with error: %v", err) + } + s.enableWrite() + err = s.Open() + if err != nil { + t.Errorf("connect failed with error: %v", err) + } +} + func TestOpenClose(t *testing.T) { testLog(0, "TestOpenClose") s := NewSession(testBaseURL+testKey, testTimeout, testLog) err := s.Open() if err != nil { - t.Errorf("Session.Open failed with error: %v", err) + t.Errorf("Open failed with error: %v", err) return } - err = s.Close() - if err != nil { - t.Errorf("Session.Close failed with error: %v", err) - } } func TestFromFile(t *testing.T) {