From 31b1a6a7d3e099ee94671482ba5cbc9c849c4958 Mon Sep 17 00:00:00 2001 From: scruzin Date: Mon, 7 Jan 2019 18:00:42 +1030 Subject: [PATCH] Made function names camel case and factored packet functions into packet.go. --- rtmp/packet.go | 489 ++++++++++++++++++++++++++++ rtmp/parseurl.go | 5 +- rtmp/rtmp.go | 742 ++++++------------------------------------- rtmp/rtmp_headers.go | 46 +-- rtmp/session.go | 26 +- 5 files changed, 614 insertions(+), 694 deletions(-) create mode 100644 rtmp/packet.go diff --git a/rtmp/packet.go b/rtmp/packet.go new file mode 100644 index 00000000..310abed1 --- /dev/null +++ b/rtmp/packet.go @@ -0,0 +1,489 @@ +/* +NAME + packet.go + +DESCRIPTION + See Readme.md + +AUTHORS + Saxon Nelson-Milton + Dan Kortschak + Alan Noble + +LICENSE + packet.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. + + Derived from librtmp under the GNU Lesser General Public License 2.1 + Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org + Copyright (C) 2008-2009 Andrej Stepanchuk + Copyright (C) 2009-2010 Howard Chu +*/ + +package rtmp + +import ( + "encoding/binary" + "log" +) + +const ( + RTMP_PACKET_TYPE_CHUNK_SIZE = 0x01 + RTMP_PACKET_TYPE_BYTES_READ_REPORT = 0x03 + RTMP_PACKET_TYPE_CONTROL = 0x04 + RTMP_PACKET_TYPE_SERVER_BW = 0x05 + RTMP_PACKET_TYPE_CLIENT_BW = 0x06 + RTMP_PACKET_TYPE_AUDIO = 0x08 + RTMP_PACKET_TYPE_VIDEO = 0x09 + RTMP_PACKET_TYPE_FLEX_STREAM_SEND = 0x0F + RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10 + RTMP_PACKET_TYPE_FLEX_MESSAGE = 0x11 + RTMP_PACKET_TYPE_INFO = 0x12 + RTMP_PACKET_TYPE_INVOKE = 0x14 + RTMP_PACKET_TYPE_FLASH_VIDEO = 0x16 +) + +const ( + RTMP_PACKET_SIZE_LARGE = 0 + RTMP_PACKET_SIZE_MEDIUM = 1 + RTMP_PACKET_SIZE_SMALL = 2 + RTMP_PACKET_SIZE_MINIMUM = 3 +) + +// packetSize defines valid packet sizes. +var packetSize = [...]int{12, 8, 4, 1} + +// packet defines an RTMP packet. +type packet struct { + headerType uint8 + packetType uint8 + channel int32 + hasAbsTimestamp bool + timestamp uint32 + info int32 + bodySize uint32 + bytesRead uint32 + chunk *chunk + header []byte + body []byte +} + +// chunk defines an RTMP packet chunk. +type chunk struct { + headerSize int32 + data []byte + header [RTMP_MAX_HEADER_SIZE]byte +} + +// ToDo: Consider making the following functions into methods. +// readPacket reads a packet. +func readPacket(s *Session, pkt *packet) error { + var hbuf [RTMP_MAX_HEADER_SIZE]byte + header := hbuf[:] + + err := readN(s, header[:1]) + if err != nil { + log.Println("readPacket: failed to read RTMP packet header!") + return err + } + pkt.headerType = (header[0] & 0xc0) >> 6 + pkt.channel = int32(header[0] & 0x3f) + header = header[1:] + + switch { + case pkt.channel == 0: + err = readN(s, header[:1]) + if err != nil { + log.Println("readPacket: failed to read rtmp packet header 2nd byte.") + return err + } + header = header[1:] + pkt.channel = int32(header[0]) + 64 + + case pkt.channel == 1: + err = readN(s, header[:2]) + if err != nil { + log.Println("readPacket: failed to read RTMP packet 3rd byte") + return err + } + header = header[2:] + pkt.channel = int32(binary.BigEndian.Uint16(header[:2])) + 64 + + } + + if pkt.channel >= s.channelsAllocatedIn { + n := pkt.channel + 10 + timestamp := append(s.channelTimestamp, make([]int32, 10)...) + + var pkts []*packet + if s.vecChannelsIn == nil { + pkts = make([]*packet, n) + } else { + pkts = append(s.vecChannelsIn[:pkt.channel:pkt.channel], make([]*packet, 10)...) + } + + s.channelTimestamp = timestamp + s.vecChannelsIn = pkts + + for i := int(s.channelsAllocatedIn); i < len(s.channelTimestamp); i++ { + s.channelTimestamp[i] = 0 + } + for i := int(s.channelsAllocatedIn); i < int(n); i++ { + s.vecChannelsIn[i] = nil + } + s.channelsAllocatedIn = n + } + + size := packetSize[pkt.headerType] + switch { + case size == RTMP_LARGE_HEADER_SIZE: + pkt.hasAbsTimestamp = true + case size < RTMP_LARGE_HEADER_SIZE: + if s.vecChannelsIn[pkt.channel] != nil { + *pkt = *(s.vecChannelsIn[pkt.channel]) + } + } + + size-- + + if size > 0 { + err = readN(s, header[:size]) + if err != nil { + log.Println("readPacket: failed to read rtmp packet heades.") + return err + } + } + + hSize := len(hbuf) - len(header) + size + + if size >= 3 { + pkt.timestamp = C_AMF_DecodeInt24(header[:3]) + + if size >= 6 { + pkt.bodySize = C_AMF_DecodeInt24(header[3:6]) + pkt.bytesRead = 0 + + if size > 6 { + pkt.packetType = header[6] + + if size == 11 { + pkt.info = decodeInt32LE(header[7:11]) + } + } + } + } + + extendedTimestamp := pkt.timestamp == 0xffffff + if extendedTimestamp { + err = readN(s, header[size:size+4]) + if err != nil { + log.Println("RTMPRead_Packet: Failed to read extended timestamp") + return err + } + // TODO: port this + pkt.timestamp = C_AMF_DecodeInt32(header[size : size+4]) + hSize += 4 + } + + if pkt.bodySize > 0 && pkt.body == nil { + resizePacket(pkt, pkt.bodySize, (hbuf[0]&0xc0)>>6) + } + + toRead := int32(pkt.bodySize - pkt.bytesRead) + chunkSize := s.inChunkSize + + if toRead < chunkSize { + chunkSize = toRead + } + + if pkt.chunk != nil { + pkt.chunk.headerSize = int32(hSize) + copy(pkt.chunk.header[:], hbuf[:hSize]) + pkt.chunk.data = pkt.body[pkt.bytesRead : pkt.bytesRead+uint32(chunkSize)] + } + + err = readN(s, pkt.body[pkt.bytesRead:][:chunkSize]) + if err != nil { + log.Println("readPacket: failed to read RTMP packet body") + return err + } + + pkt.bytesRead += uint32(chunkSize) + + // keep the packet as ref for other packets on this channel + if s.vecChannelsIn[pkt.channel] == nil { + s.vecChannelsIn[pkt.channel] = &packet{} + } + *(s.vecChannelsIn[pkt.channel]) = *pkt + + if extendedTimestamp { + s.vecChannelsIn[pkt.channel].timestamp = 0xffffff + } + + if pkt.bytesRead != pkt.bodySize { + panic("readPacket: bytesRead != bodySize") + } + + if !pkt.hasAbsTimestamp { + // timestamps seem to always be relative + pkt.timestamp += uint32(s.channelTimestamp[pkt.channel]) + } + s.channelTimestamp[pkt.channel] = int32(pkt.timestamp) + + s.vecChannelsIn[pkt.channel].body = nil + s.vecChannelsIn[pkt.channel].bytesRead = 0 + s.vecChannelsIn[pkt.channel].hasAbsTimestamp = false + return nil +} + +// resizePacket adjust the packet's storage to accommodate a body of the given size. +func resizePacket(pkt *packet, size uint32, ht uint8) { + buf := make([]byte, RTMP_MAX_HEADER_SIZE+size) + pkt.headerType = ht + pkt.header = buf + pkt.body = buf[RTMP_MAX_HEADER_SIZE:] +} + +// sendPacket sends a packet. +func sendPacket(s *Session, pkt *packet, queue int) error { + var prevPkt *packet + var last int + + if pkt.channel >= s.channelsAllocatedOut { + n := int(pkt.channel + 10) + + var pkts []*packet + if s.vecChannelsOut == nil { + pkts = make([]*packet, n) + } else { + pkts = append(s.vecChannelsOut[:pkt.channel:pkt.channel], make([]*packet, 10)...) + } + s.vecChannelsOut = pkts + + for i := int(s.channelsAllocatedOut); i < n; i++ { + s.vecChannelsOut[i] = nil + } + + s.channelsAllocatedOut = int32(n) + } + prevPkt = s.vecChannelsOut[pkt.channel] + + if prevPkt != nil && pkt.headerType != RTMP_PACKET_SIZE_LARGE { + // compress a bit by using the prev packet's attributes + if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == RTMP_PACKET_SIZE_MEDIUM { + pkt.headerType = RTMP_PACKET_SIZE_SMALL + } + + if prevPkt.timestamp == pkt.timestamp && pkt.headerType == RTMP_PACKET_SIZE_SMALL { + pkt.headerType = RTMP_PACKET_SIZE_MINIMUM + } + + last = int(prevPkt.timestamp) + } + + if pkt.headerType > 3 { + log.Printf("Sanity failed! trying to send header of type: 0x%02x.", + pkt.headerType) + return errInvalidHeader + } + + var headBytes []byte + var origIdx int + if pkt.body != nil { + // Span from -packetsize for the type to the start of the body. + headBytes = pkt.header + origIdx = RTMP_MAX_HEADER_SIZE - packetSize[pkt.headerType] + } else { + // Allocate a new header and allow 6 bytes of movement backward. + var hbuf [RTMP_MAX_HEADER_SIZE]byte + headBytes = hbuf[:] + origIdx = 6 + } + + var cSize int + switch { + case pkt.channel > 319: + cSize = 2 + case pkt.channel > 63: + cSize = 1 + } + + hSize := packetSize[pkt.headerType] + if cSize != 0 { + origIdx -= cSize + hSize += cSize + } + + var ts uint32 + if prevPkt != nil { + ts = uint32(int(pkt.timestamp) - last) + } + if ts >= 0xffffff { + origIdx -= 4 + hSize += 4 + log.Printf("Larger timestamp than 24-bit: 0x%v", ts) + } + + headerIdx := origIdx + + c := pkt.headerType << 6 + switch cSize { + case 0: + c |= byte(pkt.channel) + case 1: + // Do nothing. + case 2: + c |= 1 + } + headBytes[headerIdx] = c + headerIdx++ + + if cSize != 0 { + tmp := pkt.channel - 64 + headBytes[headerIdx] = byte(tmp & 0xff) + headerIdx++ + + if cSize == 2 { + headBytes[headerIdx] = byte(tmp >> 8) + headerIdx++ + } + } + + if packetSize[pkt.headerType] > 1 { + res := ts + if ts > 0xffffff { + res = 0xffffff + } + C_AMF_EncodeInt24(headBytes[headerIdx:], int32(res)) + headerIdx += 3 // 24bits + } + + if packetSize[pkt.headerType] > 4 { + C_AMF_EncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize)) + headerIdx += 3 // 24bits + headBytes[headerIdx] = pkt.packetType + headerIdx++ + } + + if packetSize[pkt.headerType] > 8 { + n := int(encodeInt32LE(headBytes[headerIdx:headerIdx+4], pkt.info)) + headerIdx += n + } + + if ts >= 0xffffff { + C_AMF_EncodeInt32(headBytes[headerIdx:], int32(ts)) + headerIdx += 4 // 32bits + } + + size := int(pkt.bodySize) + chunkSize := int(s.outChunkSize) + + if debugMode { + log.Printf("C_RTMP_SendPacket: %v->%v, size=%v", s.link.conn.LocalAddr(), s.link.conn.RemoteAddr(), size) + } + + // Send the previously deferred packet if combining it with the next packet would exceed the chunk size. + if s.defered != nil && len(s.defered)+size+hSize > chunkSize { + err := writeN(s, s.defered) + if err != nil { + return err + } + s.defered = nil + } + + // TODO(kortschak): Rewrite this horrific peice of premature optimisation. + // NB: RTMP wants packets in chunks which are 128 bytes by default, but the server may request a different size. + for size+hSize != 0 { + if s.defered == nil && pkt.packetType == RTMP_PACKET_TYPE_AUDIO && size < chunkSize { + s.defered = headBytes[origIdx:][:size+hSize] + break + } + if chunkSize > size { + chunkSize = size + } + bytes := headBytes[origIdx:][:chunkSize+hSize] + if s.defered != nil { + // Prepend the previously deferred packet and write it with the current one. + bytes = append(s.defered, bytes...) + } + err := writeN(s, bytes) + if err != nil { + return err + } + s.defered = nil + + size -= chunkSize + origIdx += chunkSize + hSize + hSize = 0 + + if size > 0 { + origIdx -= 1 + cSize + hSize = 1 + cSize + + if ts >= 0xffffff { + origIdx -= 4 + hSize += 4 + } + + headBytes[origIdx] = 0xc0 | c + + if cSize != 0 { + tmp := int(pkt.channel) - 64 + headBytes[origIdx+1] = byte(tmp) + + if cSize == 2 { + headBytes[origIdx+2] = byte(tmp >> 8) + } + } + if ts >= 0xffffff { + extendedTimestamp := headBytes[origIdx+1+cSize:] + C_AMF_EncodeInt32(extendedTimestamp[:4], int32(ts)) + } + } + } + + // We invoked a remote method + // TODO: port the const + if pkt.packetType == RTMP_PACKET_TYPE_INVOKE { + buf := pkt.body[1:] + meth := C_AMF_DecodeString(buf) + + if debugMode { + log.Printf("invoking %v", meth) + } + // keep it in call queue till result arrives + if queue != 0 { + buf = buf[3+len(meth):] + txn := int32(C_AMF_DecodeNumber(buf[:8])) + s.methodCalls = append(s.methodCalls, method{name: meth, num: txn}) + } + } + + if s.vecChannelsOut[pkt.channel] == nil { + s.vecChannelsOut[pkt.channel] = &packet{} + } + *(s.vecChannelsOut[pkt.channel]) = *pkt + + return nil +} + +func decodeInt32LE(data []byte) int32 { + return int32(data[3])<<24 | int32(data[2])<<16 | int32(data[1])<<8 | int32(data[0]) +} + +func encodeInt32LE(dst []byte, v int32) int32 { + binary.LittleEndian.PutUint32(dst, uint32(v)) + return 4 +} diff --git a/rtmp/parseurl.go b/rtmp/parseurl.go index 8faa6414..97110eb8 100644 --- a/rtmp/parseurl.go +++ b/rtmp/parseurl.go @@ -40,9 +40,8 @@ import ( "strings" ) -// int RTMP_ParseURL(const char *url, int *protocol, AVal *host, unsigned int *port, AVal *playpath, AVal *app); -// parseurl.c +33 -func C_RTMP_ParseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) { +// parseURL parses an RTMP URL (ok, technically it is lexing). +func parseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) { u, err := url.Parse(addr) if err != nil { log.Printf("failed to parse addr: %v", err) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index e85f1658..b9121c66 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -108,30 +108,23 @@ const ( av_videoFunction = "videoFunction" ) -var RTMPT_cmds = []string{ - "open", - "send", - "idle", - "close", +// RTMP protocol strings. +var rtmpProtocolStrings = [...]string{ + "rtmp", + "rtmpt", + "rtmpe", + "rtmpte", + "rtmps", + "rtmpts", + "", + "", + "rtmfp", } -var ( - packetSize = [...]int{12, 8, 4, 1} - RTMPProtocolStringsLower = [...]string{ - "rtmp", - "rtmpt", - "rtmpe", - "rtmpte", - "rtmps", - "rtmpts", - "", - "", - "rtmfp", - } -) - +// RTMP errors. var ( errUnknownScheme = errors.New("rtmp: unknown scheme") + errNotConnected = errors.New("rtmp: not connected") errHandshake = errors.New("rtmp: handshake failed") errConnSend = errors.New("rtmp: connection send error") errConnStream = errors.New("rtmp: connection stream error") @@ -143,37 +136,9 @@ var ( errCopying = errors.New("rtmp: copying error") ) -// RTMPPacket_IsReady(a) -// rtmp.h +142 -func C_RTMPPacket_IsReady(pkt *packet) bool { - return pkt.bytesRead == pkt.bodySize -} - -// uint32_t RTMP_GetTime(); -// rtmp.c +156 -func C_RTMP_GetTime() int32 { - return int32(time.Now().UnixNano() / 1000000) -} - -// void RTMP_EnableWrite(RTMP *r); -// rtmp.c +351 -func C_RTMP_EnableWrite(s *Session) { - s.link.protocol |= RTMP_FEATURE_WRITE -} - -// void RTMP_SetBufferMS(RTMP *r, int size); -// rtmp.c +381 -// DELETED - -// void SocksSetup(RTMP *r, C_AVal* sockshost); -// rtmp.c +410 -// DELETED - -// int RTMP_SetupURL(RTMP *r, char* url); -// rtmp.c +757 -// NOTE: code dealing with rtmp over http has been disregarded -func C_RTMP_SetupURL(s *Session, addr string) (err error) { - s.link.protocol, s.link.host, s.link.port, s.link.app, s.link.playpath, err = C_RTMP_ParseURL(addr) +// setupURL parses the RTMP URL. +func setupURL(s *Session, addr string) (err error) { + s.link.protocol, s.link.host, s.link.port, s.link.app, s.link.playpath, err = parseURL(addr) if err != nil { return err } @@ -181,7 +146,7 @@ func C_RTMP_SetupURL(s *Session, addr string) (err error) { if s.link.tcUrl == "" { if s.link.app != "" { s.link.tcUrl = fmt.Sprintf("%v://%v:%v/%v", - RTMPProtocolStringsLower[s.link.protocol], s.link.host, s.link.port, s.link.app) + rtmpProtocolStrings[s.link.protocol], s.link.host, s.link.port, s.link.app) s.link.lFlags |= RTMP_LF_FTCU } else { s.link.tcUrl = addr @@ -201,9 +166,8 @@ func C_RTMP_SetupURL(s *Session, addr string) (err error) { return nil } -// int RTMP_Connect(RTMP *r, RTMPPacket* cp); -// rtmp.c +1032 -func C_RTMP_Connect(s *Session, cp *packet) error { +// connect establishes an RTMP connection. +func connect(s *Session, cp *packet) error { addr, err := net.ResolveTCPAddr("tcp4", s.link.host+":"+strconv.Itoa(int(s.link.port))) if err != nil { return err @@ -215,30 +179,24 @@ func C_RTMP_Connect(s *Session, cp *packet) error { if debugMode { log.Println("... connected, handshaking...") } - err = C_HandShake(s, 1) + err = handshake(s, 1) if err != nil { - log.Println("C_RTMP_Connect1: handshake failed!") + log.Println("connect: handshake failed") return errHandshake } if debugMode { log.Println("... handshaked...") } - err = C_SendConnectPacket(s, cp) + err = sendConnectPacket(s, cp) if err != nil { - log.Println("RTMP connect failed!") + log.Println("connect: sendConnect failed") return errConnSend } return nil } -// int RTMP_Connect1(RTMP* r, RTMPPacket* cp); -// rtmp.c +978 -// DELETED - subsumed by RTMP_Connect - -// int RTMP_ConnectStream(RTMP* r, int seekTime); -// rtmp.c +1099 -// Side effects: s.isPlaying is set true upon successful connection -func C_RTMP_ConnectStream(s *Session, seekTime int32) error { +// connectStream reads a packet and handles it +func connectStream(s *Session, seekTime int32) error { var pkt packet if seekTime > 0 { @@ -246,27 +204,25 @@ func C_RTMP_ConnectStream(s *Session, seekTime int32) error { } for !s.isPlaying && s.isConnected() { - err := C_RTMP_ReadPacket(s, &pkt) + err := readPacket(s, &pkt) if err != nil { break } - // TODO: port is ready - if C_RTMPPacket_IsReady(&pkt) { - if pkt.bodySize == 0 { - continue - } - if pkt.packetType == RTMP_PACKET_TYPE_AUDIO || - pkt.packetType == RTMP_PACKET_TYPE_VIDEO || - pkt.packetType == RTMP_PACKET_TYPE_INFO { - log.Println("C_RTMP_ConnectStream: got packet before play()! Ignoring.") - pkt.body = nil - continue - } - - C_RTMP_ClientPacket(s, &pkt) - pkt.body = nil + if pkt.bodySize == 0 { + continue } + + if pkt.packetType == RTMP_PACKET_TYPE_AUDIO || + pkt.packetType == RTMP_PACKET_TYPE_VIDEO || + pkt.packetType == RTMP_PACKET_TYPE_INFO { + log.Println("connectStream: got packet before play()! Ignoring.") + pkt.body = nil + continue + } + + handlePacket(s, &pkt) + pkt.body = nil } if !s.isPlaying { @@ -275,35 +231,33 @@ func C_RTMP_ConnectStream(s *Session, seekTime int32) error { return nil } -// int RTMP_ClientPacket() -// rtmp.c +1226 -// NOTE cases have been commented out that are not currently used by AusOcean -func C_RTMP_ClientPacket(s *Session, pkt *packet) int32 { +// handlePacket handles a packet that the client has received. +// NB: cases have been commented out that are not currently used by AusOcean +func handlePacket(s *Session, pkt *packet) int32 { var hasMediaPacket int32 switch pkt.packetType { case RTMP_PACKET_TYPE_CHUNK_SIZE: - // TODO: port this - C_HandleChangeChunkSize(s, pkt) + if pkt.bodySize >= 4 { + s.inChunkSize = int32(C_AMF_DecodeInt32(pkt.body[:4])) + } case RTMP_PACKET_TYPE_BYTES_READ_REPORT: - // TODO: usue new logger here - //RTMP_Log(RTMP_LOGDEBUG, "%s, received: bytes read report", __FUNCTION__); + s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) case RTMP_PACKET_TYPE_CONTROL: panic("Unsupported packet type RTMP_PACKET_TYPE_CONTROL") - /* - log.Println("RTMP_PACKET_TYPE_CONTROL") - // TODO: port this - C.HandleCtrl(s, pkt) - */ + case RTMP_PACKET_TYPE_SERVER_BW: - // TODO: port this - C_HandlServerBW(s, pkt) + s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) case RTMP_PACKET_TYPE_CLIENT_BW: - // TODO: port this - C_HandleClientBW(s, pkt) + s.clientBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) + if pkt.bodySize > 4 { + s.clientBW2 = pkt.body[4] + } else { + s.clientBW2 = 0xff + } case RTMP_PACKET_TYPE_AUDIO: panic("Unsupported packet type RTMP_PACKET_TYPE_AUDIO") @@ -318,11 +272,10 @@ func C_RTMP_ClientPacket(s *Session, pkt *packet) int32 { panic("Unsupported packet type RTMP_PACKET_TYPE_INFO") case RTMP_PACKET_TYPE_INVOKE: - log.Println("RTMP_PACKET_TYPE_INVOKE:") - // TODO use new logger here - //RTMP_Log(RTMP_LOGDEBUG, "%s, received: invoke %u bytes", __FUNCTION__,pkt.bodySize); - - err := C_HandleInvoke(s, pkt.body[:pkt.bodySize]) + if debugMode { + log.Println("RTMP_PACKET_TYPE_INVOKE:") + } + err := handleInvoke(s, pkt.body[:pkt.bodySize]) if err != nil { // This will never happen with the methods we implement. log.Println("HasMediaPacket") @@ -339,9 +292,7 @@ func C_RTMP_ClientPacket(s *Session, pkt *packet) int32 { return hasMediaPacket } -// int ReadN(RTMP* r, char* buffer, int n); -// rtmp.c +1390 -func C_ReadN(s *Session, buf []byte) error { +func readN(s *Session, buf []byte) error { err := s.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) if err != nil { return err @@ -349,14 +300,14 @@ func C_ReadN(s *Session, buf []byte) error { n, err := io.ReadFull(s.link.conn, buf) if err != nil { if debugMode { - log.Printf("C_ReadN error: %v\n", err) + log.Printf("readN error: %v\n", err) } s.close() return err } s.nBytesIn += int32(n) if s.nBytesIn > (s.nBytesInSent + s.clientBW/10) { - err := C_SendBytesReceived(s) + err := sendBytesReceived(s) if err != nil { return err } @@ -364,9 +315,7 @@ func C_ReadN(s *Session, buf []byte) error { return nil } -// int WriteN(RTMP* r, const char* buffer, int n); -// rtmp.c +1502 -func C_WriteN(s *Session, buf []byte) error { +func writeN(s *Session, buf []byte) error { //ToDo: consider using a different timeout for writes than for reads err := s.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) if err != nil { @@ -375,7 +324,7 @@ func C_WriteN(s *Session, buf []byte) error { _, err = s.link.conn.Write(buf) if err != nil { if debugMode { - log.Printf("C_WriteN, RTMP send error: %v\n", err) + log.Printf("writeN, RTMP send error: %v\n", err) } s.close() return err @@ -383,11 +332,9 @@ func C_WriteN(s *Session, buf []byte) error { return nil } -// int SendConnectPacket(RTMP* r, RTMPPacket* cp); -// rtmp.c +1579 -func C_SendConnectPacket(s *Session, cp *packet) error { +func sendConnectPacket(s *Session, cp *packet) error { if cp != nil { - return C_RTMP_SendPacket(s, cp, 1) + return sendPacket(s, cp, 1) } var pbuf [4096]byte @@ -508,12 +455,10 @@ func C_SendConnectPacket(s *Session, cp *packet) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(s, &pkt, 1) + return sendPacket(s, &pkt, 1) } -// int RTMP_SendCreateStream(RTMP* r); -// rtmp.c +1725 -func C_RTMP_SendCreateStream(s *Session) error { +func sendCreateStream(s *Session) error { var pbuf [256]byte pkt := packet{ channel: 0x03, /* control channel (invoke) */ @@ -541,12 +486,10 @@ func C_RTMP_SendCreateStream(s *Session) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(s, &pkt, 1) + return sendPacket(s, &pkt, 1) } -// int SendReleaseStream(RTMP* r); -// rtmp.c +1816 -func C_SendReleaseStream(s *Session) error { +func sendReleaseStream(s *Session) error { var pbuf [1024]byte pkt := packet{ channel: 0x03, /* control channel (invoke) */ @@ -577,12 +520,10 @@ func C_SendReleaseStream(s *Session) error { } pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(s, &pkt, 0) + return sendPacket(s, &pkt, 0) } -// int SendFCPublish(RTMP* r); -// rtmp.c +1846 -func C_SendFCPublish(s *Session) error { +func sendFCPublish(s *Session) error { var pbuf [1024]byte pkt := packet{ channel: 0x03, /* control channel (invoke) */ @@ -614,12 +555,10 @@ func C_SendFCPublish(s *Session) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(s, &pkt, 0) + return sendPacket(s, &pkt, 0) } -// int SendFCUnpublish(RTMP *r); -// rtmp.c +1875 -func C_SendFCUnpublish(s *Session) error { +func sendFCUnpublish(s *Session) error { var pbuf [1024]byte pkt := packet{ channel: 0x03, /* control channel (invoke) */ @@ -651,12 +590,10 @@ func C_SendFCUnpublish(s *Session) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(s, &pkt, 0) + return sendPacket(s, &pkt, 0) } -// int SendPublish(RTMP* r); -// rtmp.c +1908 -func C_SendPublish(s *Session) error { +func sendPublish(s *Session) error { var pbuf [1024]byte pkt := packet{ channel: 0x04, /* source channel (invoke) */ @@ -692,13 +629,10 @@ func C_SendPublish(s *Session) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(s, &pkt, 1) + return sendPacket(s, &pkt, 1) } -// int -// SendDeleteStream(RTMP *r, double dStreamId) -// rtmp.c +1942 -func C_SendDeleteStream(s *Session, dStreamId float64) error { +func sendDeleteStream(s *Session, dStreamId float64) error { var pbuf [256]byte pkt := packet{ channel: 0x03, /* control channel (invoke) */ @@ -730,12 +664,11 @@ func C_SendDeleteStream(s *Session, dStreamId float64) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) /* no response expected */ - return C_RTMP_SendPacket(s, &pkt, 0) + return sendPacket(s, &pkt, 0) } -// int SendBytesReceived(RTMP* r); -// rtmp.c +2080 -func C_SendBytesReceived(s *Session) error { +// sendBytesReceived tells the server how many bytes the client has received. +func sendBytesReceived(s *Session) error { var pbuf [256]byte pkt := packet{ channel: 0x02, /* control channel (invoke) */ @@ -756,12 +689,10 @@ func C_SendBytesReceived(s *Session) error { } pkt.bodySize = 4 - return C_RTMP_SendPacket(s, &pkt, 0) + return sendPacket(s, &pkt, 0) } -// int SendCheckBW(RTMP* r); -// rtmp.c +2105 -func C_SendCheckBW(s *Session) error { +func sendCheckBW(s *Session) error { var pbuf [256]byte pkt := packet{ channel: 0x03, /* control channel (invoke) */ @@ -789,21 +720,18 @@ func C_SendCheckBW(s *Session) error { pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(s, &pkt, 0) + return sendPacket(s, &pkt, 0) } -// void AV_erase(method* vals, int* num, int i, int freeit); -// rtmp.c +2393 -func C_AV_erase(m []method, i int) []method { +func eraseMethod(m []method, i int) []method { copy(m[i:], m[i+1:]) m[len(m)-1] = method{} return m[:len(m)-1] } -// int HandleInvoke(RTMP* r, const char* body, unsigned int bodySize); -// rtmp.c +2912 +// int handleInvoke handles a packet invoke request // Side effects: s.isPlaying set to true upon av_NetStream_Publish_Start -func C_HandleInvoke(s *Session, body []byte) error { +func handleInvoke(s *Session, body []byte) error { if body[0] != 0x02 { return errInvalidBody } @@ -827,7 +755,7 @@ func C_HandleInvoke(s *Session, body []byte) error { for i, m := range s.methodCalls { if float64(m.num) == txn { methodInvoked = m.name - s.methodCalls = C_AV_erase(s.methodCalls, i) + s.methodCalls = eraseMethod(s.methodCalls, i) break } } @@ -847,13 +775,13 @@ func C_HandleInvoke(s *Session, body []byte) error { } if (s.link.protocol & RTMP_FEATURE_WRITE) != 0 { - C_SendReleaseStream(s) - C_SendFCPublish(s) + sendReleaseStream(s) + sendFCPublish(s) } else { panic("Link protocol has no RTMP_FEATURE_WRITE") } - C_RTMP_SendCreateStream(s) + sendCreateStream(s) if (s.link.protocol & RTMP_FEATURE_WRITE) == 0 { panic("Link protocol has no RTMP_FEATURE_WRITE") } @@ -862,7 +790,7 @@ func C_HandleInvoke(s *Session, body []byte) error { s.streamID = int32(C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 3))) if s.link.protocol&RTMP_FEATURE_WRITE != 0 { - C_SendPublish(s) + sendPublish(s) } else { panic("Link protocol has no RTMP_FEATURE_WRITE") } @@ -874,7 +802,7 @@ func C_HandleInvoke(s *Session, body []byte) error { case av_onBWDone: if s.bwCheckCounter == 0 { - C_SendCheckBW(s) + sendCheckBW(s) } case av_onFCUnsubscribe, av_onFCSubscribe: @@ -915,7 +843,7 @@ func C_HandleInvoke(s *Session, body []byte) error { s.isPlaying = true for i, m := range s.methodCalls { if m.name == av_publish { - s.methodCalls = C_AV_erase(s.methodCalls, i) + s.methodCalls = eraseMethod(s.methodCalls, i) break } } @@ -942,226 +870,9 @@ leave: return nil } -// void HandleChangeChunkSize(RTMP* r, const RTMPPacket* packet); -// rtmp.c +3345 -func C_HandleChangeChunkSize(s *Session, pkt *packet) { - if pkt.bodySize >= 4 { - s.inChunkSize = int32(C_AMF_DecodeInt32(pkt.body[:4])) - // TODO use new logger here - // RTMP_Log(RTMP_LOGDEBUG, "%s, received: chunk size change to %d", __FUNCTION__, s.inChunkSize); - } -} - -// void HandleServerBW(RTMP* r, const RTMPPacket* packet); -// rtmp.c +3508 -func C_HandlServerBW(s *Session, pkt *packet) { - s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) - // TODO use new logger here - // RTMP_Log(RTMP_LOGDEBUG, "%s: server BW = %d", __FUNCTION__, s.serverBW); -} - -// void HandleClientBW(RTMP* r, const RTMPPacket* packet); -// rtmp.c +3515 -func C_HandleClientBW(s *Session, pkt *packet) { - s.clientBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) - //s.clientBW = int32(C.AMF_DecodeInt32((*byte)(unsafe.Pointer(pkt.body)))) - - if pkt.bodySize > 4 { - s.clientBW2 = pkt.body[4] - } else { - s.clientBW2 = 0xff - } - // TODO use new logger here - // RTMP_Log(RTMP_LOGDEBUG, "%s: client BW = %d %d", __FUNCTION__, s.clientBW, - //s.clientBW2); -} - -// static int DecodeInt32LE(const char* data); -// rtmp.c +3527 -func C_DecodeInt32LE(data []byte) int32 { - return int32(data[3])<<24 | int32(data[2])<<16 | int32(data[1])<<8 | int32(data[0]) -} - -// int EncodeInt32LE(char* output, int nVal); -// rtmp.c +3537 -func C_EncodeInt32LE(dst []byte, v int32) int32 { - binary.LittleEndian.PutUint32(dst, uint32(v)) - return 4 -} - -// int RTMP_ReadPacket(RTMP* r, RTMPPacket* packet); -// rtmp.c +3550 -func C_RTMP_ReadPacket(s *Session, pkt *packet) error { - var hbuf [RTMP_MAX_HEADER_SIZE]byte - header := hbuf[:] - - err := C_ReadN(s, header[:1]) - if err != nil { - log.Println("C_RTMP_ReadPacket: failed to read RTMP packet header!") - return err - } - pkt.headerType = (header[0] & 0xc0) >> 6 - pkt.channel = int32(header[0] & 0x3f) - header = header[1:] - - switch { - case pkt.channel == 0: - err = C_ReadN(s, header[:1]) - if err != nil { - log.Println("C_RTMP_ReadPacket: failed to read rtmp packet header 2nd byte.") - return err - } - header = header[1:] - pkt.channel = int32(header[0]) + 64 - - case pkt.channel == 1: - err = C_ReadN(s, header[:2]) - if err != nil { - log.Println("C_RTMP_ReadPacket: failed to read RTMP packet 3rd byte") - return err - } - header = header[2:] - pkt.channel = int32(binary.BigEndian.Uint16(header[:2])) + 64 - - } - - if pkt.channel >= s.channelsAllocatedIn { - n := pkt.channel + 10 - timestamp := append(s.channelTimestamp, make([]int32, 10)...) - - var pkts []*packet - if s.vecChannelsIn == nil { - pkts = make([]*packet, n) - } else { - pkts = append(s.vecChannelsIn[:pkt.channel:pkt.channel], make([]*packet, 10)...) - } - - s.channelTimestamp = timestamp - s.vecChannelsIn = pkts - - for i := int(s.channelsAllocatedIn); i < len(s.channelTimestamp); i++ { - s.channelTimestamp[i] = 0 - } - for i := int(s.channelsAllocatedIn); i < int(n); i++ { - s.vecChannelsIn[i] = nil - } - s.channelsAllocatedIn = n - } - - size := packetSize[pkt.headerType] - switch { - case size == RTMP_LARGE_HEADER_SIZE: - pkt.hasAbsTimestamp = true - case size < RTMP_LARGE_HEADER_SIZE: - if s.vecChannelsIn[pkt.channel] != nil { - *pkt = *(s.vecChannelsIn[pkt.channel]) - } - } - - size-- - - if size > 0 { - err = C_ReadN(s, header[:size]) - if err != nil { - log.Println("C_RTMP_ReadPacket: failed to read rtmp packet heades.") - return err - } - } - - hSize := len(hbuf) - len(header) + size - - if size >= 3 { - pkt.timestamp = C_AMF_DecodeInt24(header[:3]) - - if size >= 6 { - pkt.bodySize = C_AMF_DecodeInt24(header[3:6]) - pkt.bytesRead = 0 - - if size > 6 { - pkt.packetType = header[6] - - if size == 11 { - pkt.info = C_DecodeInt32LE(header[7:11]) - } - } - } - } - - extendedTimestamp := pkt.timestamp == 0xffffff - if extendedTimestamp { - err = C_ReadN(s, header[size:size+4]) - if err != nil { - log.Println("RTMPRead_Packet: Failed to read extended timestamp") - return err - } - // TODO: port this - pkt.timestamp = C_AMF_DecodeInt32(header[size : size+4]) - hSize += 4 - } - - if pkt.bodySize > 0 && pkt.body == nil { - resizePacket(pkt, pkt.bodySize, (hbuf[0]&0xc0)>>6) - } - - toRead := int32(pkt.bodySize - pkt.bytesRead) - chunkSize := s.inChunkSize - - if toRead < chunkSize { - chunkSize = toRead - } - - if pkt.chunk != nil { - pkt.chunk.headerSize = int32(hSize) - copy(pkt.chunk.header[:], hbuf[:hSize]) - pkt.chunk.data = pkt.body[pkt.bytesRead : pkt.bytesRead+uint32(chunkSize)] - } - - err = C_ReadN(s, pkt.body[pkt.bytesRead:][:chunkSize]) - if err != nil { - log.Println("C_RTMP_ReadPacket: failed to read RTMP packet body") - return err - } - - pkt.bytesRead += uint32(chunkSize) - - // keep the packet as ref for other packets on this channel - if s.vecChannelsIn[pkt.channel] == nil { - s.vecChannelsIn[pkt.channel] = &packet{} - } - *(s.vecChannelsIn[pkt.channel]) = *pkt - - if extendedTimestamp { - s.vecChannelsIn[pkt.channel].timestamp = 0xffffff - } - - // TODO: port this - if C_RTMPPacket_IsReady(pkt) { - if !pkt.hasAbsTimestamp { - // timestamps seem to always be relative - pkt.timestamp += uint32(s.channelTimestamp[pkt.channel]) - } - s.channelTimestamp[pkt.channel] = int32(pkt.timestamp) - - s.vecChannelsIn[pkt.channel].body = nil - s.vecChannelsIn[pkt.channel].bytesRead = 0 - s.vecChannelsIn[pkt.channel].hasAbsTimestamp = false - } else { - pkt.body = nil /* so it won't be erased on free */ - } - return nil -} - -// resizePacket adjust the packet's storage to accommodate a body of the given size. -func resizePacket(pkt *packet, size uint32, ht uint8) { - buf := make([]byte, RTMP_MAX_HEADER_SIZE+size) - pkt.headerType = ht - pkt.header = buf - pkt.body = buf[RTMP_MAX_HEADER_SIZE:] -} - // int HandShake(RTMP* r, int FP9HandShake); // rtmp.c +3744 -func C_HandShake(s *Session, FP9HandShake int32) error { +func handshake(s *Session, FP9HandShake int32) error { var clientbuf [RTMP_SIG_SIZE + 1]byte clientsig := clientbuf[1:] @@ -1169,32 +880,32 @@ func C_HandShake(s *Session, FP9HandShake int32) error { clientbuf[0] = 0x03 // not encrypted - binary.BigEndian.PutUint32(clientsig, uint32(C_RTMP_GetTime())) + binary.BigEndian.PutUint32(clientsig, uint32(time.Now().UnixNano()/1000000)) copy(clientsig[4:8], []byte{0, 0, 0, 0}) for i := 8; i < RTMP_SIG_SIZE; i++ { clientsig[i] = byte(rand.Intn(256)) } - err := C_WriteN(s, clientbuf[:]) + err := writeN(s, clientbuf[:]) if err != nil { return err } var typ [1]byte - err = C_ReadN(s, typ[:]) + err = readN(s, typ[:]) if err != nil { return err } if debugMode { - log.Printf("C_HandShake: Type answer: %v\n", typ[0]) + log.Printf("handshake: Type answer: %v\n", typ[0]) } if typ[0] != clientbuf[0] { - log.Printf("C_HandShake: type mismatch: client sent %v, server sent: %v\n", + log.Printf("handshake: type mismatch: client sent %v, server sent: %v\n", clientbuf[0], typ) } - err = C_ReadN(s, serversig[:]) + err = readN(s, serversig[:]) if err != nil { return err } @@ -1207,12 +918,12 @@ func C_HandShake(s *Session, FP9HandShake int32) error { // serversig[4], serversig[5], serversig[6], serversig[7]) // 2nd part of handshake - err = C_WriteN(s, serversig[:]) + err = writeN(s, serversig[:]) if err != nil { return err } - err = C_ReadN(s, serversig[:]) + err = readN(s, serversig[:]) if err != nil { return err } @@ -1224,233 +935,8 @@ func C_HandShake(s *Session, FP9HandShake int32) error { return nil } -// int RTMP_SendPacket(RTMP* r, RTMPPacket* packet, int queue); -// rtmp.c +3896 -func C_RTMP_SendPacket(s *Session, pkt *packet, queue int) error { - var prevPkt *packet - var last int - - if pkt.channel >= s.channelsAllocatedOut { - n := int(pkt.channel + 10) - - var pkts []*packet - if s.vecChannelsOut == nil { - pkts = make([]*packet, n) - } else { - pkts = append(s.vecChannelsOut[:pkt.channel:pkt.channel], make([]*packet, 10)...) - } - s.vecChannelsOut = pkts - - for i := int(s.channelsAllocatedOut); i < n; i++ { - s.vecChannelsOut[i] = nil - } - - s.channelsAllocatedOut = int32(n) - } - prevPkt = s.vecChannelsOut[pkt.channel] - - if prevPkt != nil && pkt.headerType != RTMP_PACKET_SIZE_LARGE { - // compress a bit by using the prev packet's attributes - if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == RTMP_PACKET_SIZE_MEDIUM { - pkt.headerType = RTMP_PACKET_SIZE_SMALL - } - - if prevPkt.timestamp == pkt.timestamp && pkt.headerType == RTMP_PACKET_SIZE_SMALL { - pkt.headerType = RTMP_PACKET_SIZE_MINIMUM - } - - last = int(prevPkt.timestamp) - } - - if pkt.headerType > 3 { - log.Printf("Sanity failed! trying to send header of type: 0x%02x.", - pkt.headerType) - return errInvalidHeader - } - - var headBytes []byte - var origIdx int - if pkt.body != nil { - // Span from -packetsize for the type to the start of the body. - headBytes = pkt.header - origIdx = RTMP_MAX_HEADER_SIZE - packetSize[pkt.headerType] - } else { - // Allocate a new header and allow 6 bytes of movement backward. - var hbuf [RTMP_MAX_HEADER_SIZE]byte - headBytes = hbuf[:] - origIdx = 6 - } - - var cSize int - switch { - case pkt.channel > 319: - cSize = 2 - case pkt.channel > 63: - cSize = 1 - } - - hSize := packetSize[pkt.headerType] - if cSize != 0 { - origIdx -= cSize - hSize += cSize - } - - var ts uint32 - if prevPkt != nil { - ts = uint32(int(pkt.timestamp) - last) - } - if ts >= 0xffffff { - origIdx -= 4 - hSize += 4 - log.Printf("Larger timestamp than 24-bit: 0x%v", ts) - } - - headerIdx := origIdx - - c := pkt.headerType << 6 - switch cSize { - case 0: - c |= byte(pkt.channel) - case 1: - // Do nothing. - case 2: - c |= 1 - } - headBytes[headerIdx] = c - headerIdx++ - - if cSize != 0 { - tmp := pkt.channel - 64 - headBytes[headerIdx] = byte(tmp & 0xff) - headerIdx++ - - if cSize == 2 { - headBytes[headerIdx] = byte(tmp >> 8) - headerIdx++ - } - } - - if packetSize[pkt.headerType] > 1 { - res := ts - if ts > 0xffffff { - res = 0xffffff - } - C_AMF_EncodeInt24(headBytes[headerIdx:], int32(res)) - headerIdx += 3 // 24bits - } - - if packetSize[pkt.headerType] > 4 { - C_AMF_EncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize)) - headerIdx += 3 // 24bits - headBytes[headerIdx] = pkt.packetType - headerIdx++ - } - - if packetSize[pkt.headerType] > 8 { - n := int(C_EncodeInt32LE(headBytes[headerIdx:headerIdx+4], pkt.info)) - headerIdx += n - } - - if ts >= 0xffffff { - C_AMF_EncodeInt32(headBytes[headerIdx:], int32(ts)) - headerIdx += 4 // 32bits - } - - size := int(pkt.bodySize) - chunkSize := int(s.outChunkSize) - - if debugMode { - log.Printf("C_RTMP_SendPacket: %v->%v, size=%v", s.link.conn.LocalAddr(), s.link.conn.RemoteAddr(), size) - } - - // Send the previously deferred packet if combining it with the next packet would exceed the chunk size. - if s.defered != nil && len(s.defered)+size+hSize > chunkSize { - err := C_WriteN(s, s.defered) - if err != nil { - return err - } - s.defered = nil - } - - // TODO(kortschak): Rewrite this horrific peice of premature optimisation. - // NB: RTMP wants packets in chunks which are 128 bytes by default, but the server may request a different size. - for size+hSize != 0 { - if s.defered == nil && pkt.packetType == RTMP_PACKET_TYPE_AUDIO && size < chunkSize { - s.defered = headBytes[origIdx:][:size+hSize] - break - } - if chunkSize > size { - chunkSize = size - } - bytes := headBytes[origIdx:][:chunkSize+hSize] - if s.defered != nil { - // Prepend the previously deferred packet and write it with the current one. - bytes = append(s.defered, bytes...) - } - err := C_WriteN(s, bytes) - if err != nil { - return err - } - s.defered = nil - - size -= chunkSize - origIdx += chunkSize + hSize - hSize = 0 - - if size > 0 { - origIdx -= 1 + cSize - hSize = 1 + cSize - - if ts >= 0xffffff { - origIdx -= 4 - hSize += 4 - } - - headBytes[origIdx] = 0xc0 | c - - if cSize != 0 { - tmp := int(pkt.channel) - 64 - headBytes[origIdx+1] = byte(tmp) - - if cSize == 2 { - headBytes[origIdx+2] = byte(tmp >> 8) - } - } - if ts >= 0xffffff { - extendedTimestamp := headBytes[origIdx+1+cSize:] - C_AMF_EncodeInt32(extendedTimestamp[:4], int32(ts)) - } - } - } - - // We invoked a remote method - // TODO: port the const - if pkt.packetType == RTMP_PACKET_TYPE_INVOKE { - buf := pkt.body[1:] - meth := C_AMF_DecodeString(buf) - - if debugMode { - log.Printf("invoking %v", meth) - } - // keep it in call queue till result arrives - if queue != 0 { - buf = buf[3+len(meth):] - txn := int32(C_AMF_DecodeNumber(buf[:8])) - s.methodCalls = append(s.methodCalls, method{name: meth, num: txn}) - } - } - - if s.vecChannelsOut[pkt.channel] == nil { - s.vecChannelsOut[pkt.channel] = &packet{} - } - *(s.vecChannelsOut[pkt.channel]) = *pkt - - return nil -} - -/// int RTMP_Write(RTMP* r, const char* buf, int size); -// rtmp.c +5136 -func C_RTMP_Write(s *Session, buf []byte) error { +// write prepares data to write then sends it. +func (s *Session) write(buf []byte) error { var pkt = &s.write var enc []byte size := len(buf) @@ -1511,7 +997,7 @@ func C_RTMP_Write(s *Session, buf []byte) error { pkt.bytesRead += uint32(num) buf = buf[num:] if pkt.bytesRead == pkt.bodySize { - err := C_RTMP_SendPacket(s, pkt, 0) + err := sendPacket(s, pkt, 0) pkt.body = nil pkt.bytesRead = 0 if err != nil { @@ -1525,21 +1011,3 @@ func C_RTMP_Write(s *Session, buf []byte) error { } return nil } - -var rtmpErrs = [...]string{ - 1: "rtmp: not connected", - 2: "rtmp: write error", - 3: "rtmp: not started", -} - -type Err uint - -func (e Err) Error() string { - if 0 <= int(e) && int(e) < len(rtmpErrs) { - s := rtmpErrs[e] - if s != "" { - return s - } - } - return "rtmp: " + strconv.Itoa(int(e)) -} diff --git a/rtmp/rtmp_headers.go b/rtmp/rtmp_headers.go index 2260f01d..9ff28427 100644 --- a/rtmp/rtmp_headers.go +++ b/rtmp/rtmp_headers.go @@ -33,7 +33,9 @@ LICENSE */ package rtmp -import "net" +import ( + "net" +) const ( RTMPT_OPEN = iota @@ -42,28 +44,6 @@ const ( RTMPT_CLOSE ) -const ( - RTMP_PACKET_TYPE_CHUNK_SIZE = 0x01 - RTMP_PACKET_TYPE_BYTES_READ_REPORT = 0x03 - RTMP_PACKET_TYPE_CONTROL = 0x04 - RTMP_PACKET_TYPE_SERVER_BW = 0x05 - RTMP_PACKET_TYPE_CLIENT_BW = 0x06 - RTMP_PACKET_TYPE_AUDIO = 0x08 - RTMP_PACKET_TYPE_VIDEO = 0x09 - RTMP_PACKET_TYPE_FLEX_STREAM_SEND = 0x0F - RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10 - RTMP_PACKET_TYPE_FLEX_MESSAGE = 0x11 - RTMP_PACKET_TYPE_INFO = 0x12 - RTMP_PACKET_TYPE_INVOKE = 0x14 - RTMP_PACKET_TYPE_FLASH_VIDEO = 0x16 -) - -const ( - RTMP_PACKET_SIZE_LARGE = 0 - RTMP_PACKET_SIZE_MEDIUM = 1 - RTMP_PACKET_SIZE_SMALL = 2 - RTMP_PACKET_SIZE_MINIMUM = 3 -) const ( RTMP_READ_HEADER = 0x01 RTMP_READ_RESUME = 0x02 @@ -114,26 +94,6 @@ const ( RTMP_MAX_HEADER_SIZE = 18 ) -type chunk struct { - headerSize int32 - data []byte - header [RTMP_MAX_HEADER_SIZE]byte -} - -type packet struct { - headerType uint8 - packetType uint8 - hasAbsTimestamp bool - channel int32 - timestamp uint32 - info int32 - bodySize uint32 - bytesRead uint32 - chunk *chunk - header []byte - body []byte -} - type link struct { host string playpath string diff --git a/rtmp/session.go b/rtmp/session.go index b6b066f4..869a9733 100644 --- a/rtmp/session.go +++ b/rtmp/session.go @@ -90,20 +90,20 @@ func (s *Session) Open() error { // start does the heavylifting for Open(). func (s *Session) start() error { s.init() - err := C_RTMP_SetupURL(s, s.url) + err := setupURL(s, s.url) if err != nil { s.close() return err } - C_RTMP_EnableWrite(s) - err = C_RTMP_Connect(s, nil) + s.enableWrite() + err = connect(s, nil) if err != nil { s.close() return err } - err = C_RTMP_ConnectStream(s, 0) + err = connectStream(s, 0) if err != nil { s.close() return err @@ -128,7 +128,7 @@ func (s *Session) init() { // Close terminates the rtmp connection, func (s *Session) Close() error { if !s.isConnected() { - return Err(3) + return errNotConnected } s.close() return nil @@ -140,9 +140,9 @@ func (s *Session) close() { if s.isConnected() { if s.streamID > 0 { if s.link.protocol&RTMP_FEATURE_WRITE != 0 { - C_SendFCUnpublish(s) + sendFCUnpublish(s) } - C_SendDeleteStream(s, float64(s.streamID)) + sendDeleteStream(s, float64(s.streamID)) } s.link.conn.Close() } @@ -152,12 +152,11 @@ func (s *Session) close() { // Write writes a frame (flv tag) to the rtmp connection. func (s *Session) Write(data []byte) (int, error) { if !s.isConnected() { - return 0, Err(1) + return 0, errNotConnected } - err := C_RTMP_Write(s, data) + err := s.write(data) if err != nil { - // TODO: propagate err - return 0, Err(2) + return 0, err } return len(data), nil } @@ -166,3 +165,8 @@ func (s *Session) Write(data []byte) (int, error) { func (s *Session) isConnected() bool { return s.link.conn != nil } + +// enableWrite enables the current session for writing. +func (s *Session) enableWrite() { + s.link.protocol |= RTMP_FEATURE_WRITE +}