diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index eb0d5621..2c6300d5 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -191,40 +191,38 @@ func connect(s *Session) error { // connectStream reads a packet and handles it func connectStream(s *Session) error { - var pkt packet - + var err error for !s.isPlaying && s.isConnected() { - err := readPacket(s, &pkt) + pkt := packet{} + err = readPacket(s, &pkt) if err != nil { break } - if pkt.bodySize == 0 { continue } - if pkt.packetType == RTMP_PACKET_TYPE_AUDIO || pkt.packetType == RTMP_PACKET_TYPE_VIDEO || pkt.packetType == RTMP_PACKET_TYPE_INFO { s.log(DebugLevel, pkg+"got packet before play; ignoring") - pkt.body = nil continue } - handlePacket(s, &pkt) - pkt.body = nil + err = handlePacket(s, &pkt) + if err != nil { + break + } } if !s.isPlaying { - return errConnStream + return err } return nil } // handlePacket handles a packet that the client has received. // NB: cases have been commented out that are not currently used by AusOcean -func handlePacket(s *Session, pkt *packet) int32 { - var hasMediaPacket int32 +func handlePacket(s *Session, pkt *packet) error { switch pkt.packetType { case RTMP_PACKET_TYPE_CHUNK_SIZE: @@ -266,7 +264,7 @@ func handlePacket(s *Session, pkt *packet) int32 { if err != nil { // This will never happen with the methods we implement. s.log(WarnLevel, pkg+"unexpected error from handleInvoke", "error", err.Error()) - hasMediaPacket = 2 + return err } case RTMP_PACKET_TYPE_FLASH_VIDEO: @@ -275,7 +273,7 @@ func handlePacket(s *Session, pkt *packet) int32 { default: s.log(WarnLevel, pkg+"unknown packet type", "type", pkt.packetType) } - return hasMediaPacket + return nil } func readN(s *Session, buf []byte) error { @@ -286,7 +284,6 @@ func readN(s *Session, buf []byte) error { n, err := io.ReadFull(s.link.conn, buf) if err != nil { s.log(DebugLevel, pkg+"read failed", "error", err.Error()) - s.close() return err } s.nBytesIn += int32(n) @@ -308,7 +305,6 @@ func writeN(s *Session, buf []byte) error { _, err = s.link.conn.Write(buf) if err != nil { s.log(WarnLevel, pkg+"write failed", "error", err.Error()) - s.close() return err } return nil @@ -404,12 +400,10 @@ func sendConnectPacket(s *Session) error { } } - if copy(enc, []byte{0, 0, AMF_OBJECT_END}) != 3 { - return errCopying // TODO: is this even possible? - } + copy(enc, []byte{0, 0, AMF_OBJECT_END}) enc = enc[3:] - /* add auth string */ + // add auth string if s.link.auth != "" { enc = C_AMF_EncodeBoolean(enc, s.link.lFlags&RTMP_LF_AUTH != 0) if enc == nil { @@ -430,7 +424,7 @@ func sendConnectPacket(s *Session) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return sendPacket(s, &pkt, true) + return sendPacket(s, &pkt, true) // response expected } func sendCreateStream(s *Session) error { @@ -458,7 +452,7 @@ func sendCreateStream(s *Session) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return sendPacket(s, &pkt, true) + return sendPacket(s, &pkt, true) // response expected } func sendReleaseStream(s *Session) error { @@ -589,7 +583,7 @@ func sendPublish(s *Session) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return sendPacket(s, &pkt, true) + return sendPacket(s, &pkt, true) // response expected } func sendDeleteStream(s *Session, dStreamId float64) error { @@ -779,6 +773,7 @@ func handleInvoke(s *Session, body []byte) error { s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Start/av_NetStream_Play_PublishNotify") case av_NetStream_Publish_Start: + s.log(DebugLevel, pkg+"playing") s.isPlaying = true for i, m := range s.methodCalls { if m.name == av_publish { @@ -786,6 +781,7 @@ func handleInvoke(s *Session, body []byte) error { break } } + // ToDo: handle case when av_publish method not found case av_NetStream_Play_Complete, av_NetStream_Play_Stop, av_NetStream_Play_UnpublishNotify: s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Complete/av_NetStream_Play_Stop/av_NetStream_Play_UnpublishNotify") @@ -825,14 +821,15 @@ func handshake(s *Session) error { if err != nil { return err } + s.log(DebugLevel, pkg+"handshake sent") var typ [1]byte err = readN(s, typ[:]) if err != nil { return err } + s.log(DebugLevel, pkg+"handshake received") - s.log(DebugLevel, pkg+"handshake", "received", typ[0]) if typ[0] != clientbuf[0] { s.log(WarnLevel, pkg+"handshake type mismatch", "sent", clientbuf[0], "received", typ) }