diff --git a/rtmp/packet.go b/rtmp/packet.go index ebd79e11..95efb658 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -62,6 +62,12 @@ const ( RTMP_PACKET_SIZE_MINIMUM = 3 ) +const ( + RTMP_CHANNEL_BYTES_READ = 0x02 + RTMP_CHANNEL_CONTROL = 0x03 + RTMP_CHANNEL_SOURCE = 0x04 +) + // packetSize defines valid packet sizes. var packetSize = [...]int{12, 8, 4, 1} @@ -257,7 +263,7 @@ func resizePacket(pkt *packet, size uint32, ht uint8) { } // sendPacket sends a packet. -func sendPacket(s *Session, pkt *packet, queue int) error { +func sendPacket(s *Session, pkt *packet, queue bool) error { var prevPkt *packet var last int @@ -455,7 +461,6 @@ func sendPacket(s *Session, pkt *packet, queue int) error { } // We invoked a remote method - // TODO: port the const if pkt.packetType == RTMP_PACKET_TYPE_INVOKE { buf := pkt.body[1:] meth := C_AMF_DecodeString(buf) @@ -464,7 +469,7 @@ func sendPacket(s *Session, pkt *packet, queue int) error { log.Printf("invoking %v", meth) } // keep it in call queue till result arrives - if queue != 0 { + if queue { buf = buf[3+len(meth):] txn := int32(C_AMF_DecodeNumber(buf[:8])) s.methodCalls = append(s.methodCalls, method{name: meth, num: txn}) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index eb2bb394..b1515b6d 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -166,7 +166,7 @@ func setupURL(s *Session, addr string) (err error) { } // connect establishes an RTMP connection. -func connect(s *Session, cp *packet) error { +func connect(s *Session) error { addr, err := net.ResolveTCPAddr("tcp4", s.link.host+":"+strconv.Itoa(int(s.link.port))) if err != nil { return err @@ -178,7 +178,7 @@ func connect(s *Session, cp *packet) error { if debugMode { log.Println("... connected, handshaking...") } - err = handshake(s, 1) + err = handshake(s) if err != nil { log.Println("connect: handshake failed") return errHandshake @@ -186,7 +186,7 @@ func connect(s *Session, cp *packet) error { if debugMode { log.Println("... handshaked...") } - err = sendConnectPacket(s, cp) + err = sendConnectPacket(s) if err != nil { log.Println("connect: sendConnect failed") return errConnSend @@ -195,13 +195,9 @@ func connect(s *Session, cp *packet) error { } // connectStream reads a packet and handles it -func connectStream(s *Session, seekTime int32) error { +func connectStream(s *Session) error { var pkt packet - if seekTime > 0 { - s.link.seekTime = seekTime - } - for !s.isPlaying && s.isConnected() { err := readPacket(s, &pkt) if err != nil { @@ -331,21 +327,14 @@ func writeN(s *Session, buf []byte) error { return nil } -func sendConnectPacket(s *Session, cp *packet) error { - if cp != nil { - return sendPacket(s, cp, 1) - } - +func sendConnectPacket(s *Session) error { var pbuf [4096]byte pkt := packet{ - channel: 0x03, - headerType: RTMP_PACKET_SIZE_LARGE, - packetType: RTMP_PACKET_TYPE_INVOKE, - timestamp: 0, - info: 0, - hasAbsTimestamp: false, - header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + channel: RTMP_CHANNEL_CONTROL, + headerType: RTMP_PACKET_SIZE_LARGE, + packetType: RTMP_PACKET_TYPE_INVOKE, + header: pbuf[:], + body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body @@ -454,20 +443,17 @@ func sendConnectPacket(s *Session, cp *packet) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return sendPacket(s, &pkt, 1) + return sendPacket(s, &pkt, true) } func sendCreateStream(s *Session) error { var pbuf [256]byte pkt := packet{ - channel: 0x03, /* control channel (invoke) */ - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_INVOKE, - timestamp: 0, - info: 0, - hasAbsTimestamp: false, - header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + channel: RTMP_CHANNEL_CONTROL, + headerType: RTMP_PACKET_SIZE_MEDIUM, + packetType: RTMP_PACKET_TYPE_INVOKE, + header: pbuf[:], + body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body @@ -485,20 +471,17 @@ func sendCreateStream(s *Session) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return sendPacket(s, &pkt, 1) + return sendPacket(s, &pkt, true) } func sendReleaseStream(s *Session) error { var pbuf [1024]byte pkt := packet{ - channel: 0x03, /* control channel (invoke) */ - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_INVOKE, - timestamp: 0, - info: 0, - hasAbsTimestamp: false, - header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + channel: RTMP_CHANNEL_CONTROL, + headerType: RTMP_PACKET_SIZE_MEDIUM, + packetType: RTMP_PACKET_TYPE_INVOKE, + header: pbuf[:], + body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body @@ -519,20 +502,17 @@ func sendReleaseStream(s *Session) error { } pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return sendPacket(s, &pkt, 0) + return sendPacket(s, &pkt, false) } func sendFCPublish(s *Session) error { var pbuf [1024]byte pkt := packet{ - channel: 0x03, /* control channel (invoke) */ - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_INVOKE, - timestamp: 0, - info: 0, - hasAbsTimestamp: false, - header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + channel: RTMP_CHANNEL_CONTROL, + headerType: RTMP_PACKET_SIZE_MEDIUM, + packetType: RTMP_PACKET_TYPE_INVOKE, + header: pbuf[:], + body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body @@ -554,20 +534,17 @@ func sendFCPublish(s *Session) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return sendPacket(s, &pkt, 0) + return sendPacket(s, &pkt, false) } func sendFCUnpublish(s *Session) error { var pbuf [1024]byte pkt := packet{ - channel: 0x03, /* control channel (invoke) */ - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_INVOKE, - timestamp: 0, - info: 0, - hasAbsTimestamp: false, - header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + channel: RTMP_CHANNEL_CONTROL, + headerType: RTMP_PACKET_SIZE_MEDIUM, + packetType: RTMP_PACKET_TYPE_INVOKE, + header: pbuf[:], + body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body @@ -589,20 +566,17 @@ func sendFCUnpublish(s *Session) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return sendPacket(s, &pkt, 0) + return sendPacket(s, &pkt, false) } func sendPublish(s *Session) error { var pbuf [1024]byte pkt := packet{ - channel: 0x04, /* source channel (invoke) */ - headerType: RTMP_PACKET_SIZE_LARGE, - packetType: RTMP_PACKET_TYPE_INVOKE, - timestamp: 0, - info: s.streamID, - hasAbsTimestamp: false, - header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + channel: RTMP_CHANNEL_SOURCE, + headerType: RTMP_PACKET_SIZE_LARGE, + packetType: RTMP_PACKET_TYPE_INVOKE, + header: pbuf[:], + body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body @@ -628,20 +602,17 @@ func sendPublish(s *Session) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return sendPacket(s, &pkt, 1) + return sendPacket(s, &pkt, true) } func sendDeleteStream(s *Session, dStreamId float64) error { var pbuf [256]byte pkt := packet{ - channel: 0x03, /* control channel (invoke) */ - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_INVOKE, - timestamp: 0, - info: 0, - hasAbsTimestamp: false, - header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + channel: RTMP_CHANNEL_CONTROL, + headerType: RTMP_PACKET_SIZE_MEDIUM, + packetType: RTMP_PACKET_TYPE_INVOKE, + header: pbuf[:], + body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body @@ -663,21 +634,18 @@ func sendDeleteStream(s *Session, dStreamId float64) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) /* no response expected */ - return sendPacket(s, &pkt, 0) + return sendPacket(s, &pkt, false) } // sendBytesReceived tells the server how many bytes the client has received. func sendBytesReceived(s *Session) error { var pbuf [256]byte pkt := packet{ - channel: 0x02, /* control channel (invoke) */ - headerType: RTMP_PACKET_SIZE_MEDIUM, - packetType: RTMP_PACKET_TYPE_BYTES_READ_REPORT, - timestamp: 0, - info: 0, - hasAbsTimestamp: false, - header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + channel: RTMP_CHANNEL_BYTES_READ, + headerType: RTMP_PACKET_SIZE_MEDIUM, + packetType: RTMP_PACKET_TYPE_BYTES_READ_REPORT, + header: pbuf[:], + body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body @@ -688,20 +656,17 @@ func sendBytesReceived(s *Session) error { } pkt.bodySize = 4 - return sendPacket(s, &pkt, 0) + return sendPacket(s, &pkt, false) } func sendCheckBW(s *Session) error { var pbuf [256]byte pkt := packet{ - channel: 0x03, /* control channel (invoke) */ - headerType: RTMP_PACKET_SIZE_LARGE, - packetType: RTMP_PACKET_TYPE_INVOKE, - timestamp: 0, - info: 0, - hasAbsTimestamp: false, - header: pbuf[:], - body: pbuf[RTMP_MAX_HEADER_SIZE:], + channel: RTMP_CHANNEL_CONTROL, + headerType: RTMP_PACKET_SIZE_LARGE, + packetType: RTMP_PACKET_TYPE_INVOKE, + header: pbuf[:], + body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body @@ -719,7 +684,7 @@ func sendCheckBW(s *Session) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return sendPacket(s, &pkt, 0) + return sendPacket(s, &pkt, false) } func eraseMethod(m []method, i int) []method { @@ -740,9 +705,6 @@ func handleInvoke(s *Session, body []byte) error { return errDecoding } - // NOTE we don't really need this ?? still functions without it - //C.AMF_Dump(&obj) - //C.AMFProp_GetString(C_AMF_GetProp(&obj, nil, 0), &method) meth := C_AMFProp_GetString(C_AMF_GetProp(&obj, "", 0)) txn := C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 1)) // TODO use new logger here @@ -865,18 +827,15 @@ func handleInvoke(s *Session, body []byte) error { } leave: C_AMF_Reset(&obj) - // None of the methods we implement will result in a true return. return nil } -func handshake(s *Session, FP9HandShake int32) error { +func handshake(s *Session) error { var clientbuf [RTMP_SIG_SIZE + 1]byte clientsig := clientbuf[1:] var serversig [RTMP_SIG_SIZE]byte - - clientbuf[0] = 0x03 // not encrypted - + clientbuf[0] = RTMP_CHANNEL_CONTROL binary.BigEndian.PutUint32(clientsig, uint32(time.Now().UnixNano()/1000000)) copy(clientsig[4:8], []byte{0, 0, 0, 0}) @@ -934,16 +893,15 @@ func handshake(s *Session, FP9HandShake int32) error { // write prepares data to write then sends it. func (s *Session) write(buf []byte) error { - var pkt packet + pkt := packet{ + channel: RTMP_CHANNEL_SOURCE, + info: s.streamID, + } var enc []byte - size := len(buf) - var num int - pkt.channel = 0x04 - pkt.info = s.streamID for len(buf) != 0 { if pkt.bytesRead == 0 { - if size < minDataSize { + if len(buf) < minDataSize { return errTinyPacket } @@ -985,7 +943,7 @@ func (s *Session) write(buf []byte) error { } else { enc = pkt.body[:pkt.bodySize][pkt.bytesRead:] } - num = int(pkt.bodySize - pkt.bytesRead) + num := int(pkt.bodySize - pkt.bytesRead) if num > len(buf) { num = len(buf) } @@ -994,7 +952,7 @@ func (s *Session) write(buf []byte) error { pkt.bytesRead += uint32(num) buf = buf[num:] if pkt.bytesRead == pkt.bodySize { - err := sendPacket(s, &pkt, 0) + err := sendPacket(s, &pkt, false) pkt.body = nil pkt.bytesRead = 0 if err != nil { diff --git a/rtmp/rtmp_headers.go b/rtmp/rtmp_headers.go index 9ff28427..becb48be 100644 --- a/rtmp/rtmp_headers.go +++ b/rtmp/rtmp_headers.go @@ -105,7 +105,6 @@ type link struct { flashVer string token string extras C_AMFObject - seekTime int32 lFlags int32 swfAge int32 protocol int32 diff --git a/rtmp/session.go b/rtmp/session.go index 5cdd8924..bac165e6 100644 --- a/rtmp/session.go +++ b/rtmp/session.go @@ -96,13 +96,13 @@ func (s *Session) start() error { } s.enableWrite() - err = connect(s, nil) + err = connect(s) if err != nil { s.close() return err } - err = connectStream(s, 0) + err = connectStream(s) if err != nil { s.close() return err