diff --git a/rtmp/amf_headers.go b/rtmp/amf_headers.go index 4fcc86f6..453b38de 100644 --- a/rtmp/amf_headers.go +++ b/rtmp/amf_headers.go @@ -9,7 +9,7 @@ AUTHORS Saxon Nelson-Milton <saxon@ausocean.org> LICENSE - amf_headers.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + amf_headers.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the diff --git a/rtmp/packet.go b/rtmp/packet.go new file mode 100644 index 00000000..95efb658 --- /dev/null +++ b/rtmp/packet.go @@ -0,0 +1,494 @@ +/* +NAME + packet.go + +DESCRIPTION + See Readme.md + +AUTHORS + Saxon Nelson-Milton <saxon@ausocean.org> + Dan Kortschak <dan@ausocean.org> + Alan Noble <alan@ausocean.org> + +LICENSE + packet.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. + + Derived from librtmp under the GNU Lesser General Public License 2.1 + Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org + Copyright (C) 2008-2009 Andrej Stepanchuk + Copyright (C) 2009-2010 Howard Chu +*/ + +package rtmp + +import ( + "encoding/binary" + "log" +) + +const ( + RTMP_PACKET_TYPE_CHUNK_SIZE = 0x01 + RTMP_PACKET_TYPE_BYTES_READ_REPORT = 0x03 + RTMP_PACKET_TYPE_CONTROL = 0x04 + RTMP_PACKET_TYPE_SERVER_BW = 0x05 + RTMP_PACKET_TYPE_CLIENT_BW = 0x06 + RTMP_PACKET_TYPE_AUDIO = 0x08 + RTMP_PACKET_TYPE_VIDEO = 0x09 + RTMP_PACKET_TYPE_FLEX_STREAM_SEND = 0x0F + RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10 + RTMP_PACKET_TYPE_FLEX_MESSAGE = 0x11 + RTMP_PACKET_TYPE_INFO = 0x12 + RTMP_PACKET_TYPE_INVOKE = 0x14 + RTMP_PACKET_TYPE_FLASH_VIDEO = 0x16 +) + +const ( + RTMP_PACKET_SIZE_LARGE = 0 + RTMP_PACKET_SIZE_MEDIUM = 1 + RTMP_PACKET_SIZE_SMALL = 2 + RTMP_PACKET_SIZE_MINIMUM = 3 +) + +const ( + RTMP_CHANNEL_BYTES_READ = 0x02 + RTMP_CHANNEL_CONTROL = 0x03 + RTMP_CHANNEL_SOURCE = 0x04 +) + +// packetSize defines valid packet sizes. +var packetSize = [...]int{12, 8, 4, 1} + +// packet defines an RTMP packet. +type packet struct { + headerType uint8 + packetType uint8 + channel int32 + hasAbsTimestamp bool + timestamp uint32 + info int32 + bodySize uint32 + bytesRead uint32 + chunk *chunk + header []byte + body []byte +} + +// chunk defines an RTMP packet chunk. +type chunk struct { + headerSize int32 + data []byte + header [RTMP_MAX_HEADER_SIZE]byte +} + +// ToDo: Consider making the following functions into methods. +// readPacket reads a packet. +func readPacket(s *Session, pkt *packet) error { + var hbuf [RTMP_MAX_HEADER_SIZE]byte + header := hbuf[:] + + err := readN(s, header[:1]) + if err != nil { + log.Println("readPacket: failed to read RTMP packet header!") + return err + } + pkt.headerType = (header[0] & 0xc0) >> 6 + pkt.channel = int32(header[0] & 0x3f) + header = header[1:] + + switch { + case pkt.channel == 0: + err = readN(s, header[:1]) + if err != nil { + log.Println("readPacket: failed to read rtmp packet header 2nd byte.") + return err + } + header = header[1:] + pkt.channel = int32(header[0]) + 64 + + case pkt.channel == 1: + err = readN(s, header[:2]) + if err != nil { + log.Println("readPacket: failed to read RTMP packet 3rd byte") + return err + } + header = header[2:] + pkt.channel = int32(binary.BigEndian.Uint16(header[:2])) + 64 + + } + + if pkt.channel >= s.channelsAllocatedIn { + n := pkt.channel + 10 + timestamp := append(s.channelTimestamp, make([]int32, 10)...) + + var pkts []*packet + if s.vecChannelsIn == nil { + pkts = make([]*packet, n) + } else { + pkts = append(s.vecChannelsIn[:pkt.channel:pkt.channel], make([]*packet, 10)...) + } + + s.channelTimestamp = timestamp + s.vecChannelsIn = pkts + + for i := int(s.channelsAllocatedIn); i < len(s.channelTimestamp); i++ { + s.channelTimestamp[i] = 0 + } + for i := int(s.channelsAllocatedIn); i < int(n); i++ { + s.vecChannelsIn[i] = nil + } + s.channelsAllocatedIn = n + } + + size := packetSize[pkt.headerType] + switch { + case size == RTMP_LARGE_HEADER_SIZE: + pkt.hasAbsTimestamp = true + case size < RTMP_LARGE_HEADER_SIZE: + if s.vecChannelsIn[pkt.channel] != nil { + *pkt = *(s.vecChannelsIn[pkt.channel]) + } + } + + size-- + + if size > 0 { + err = readN(s, header[:size]) + if err != nil { + log.Println("readPacket: failed to read rtmp packet heades.") + return err + } + } + + hSize := len(hbuf) - len(header) + size + + if size >= 3 { + pkt.timestamp = C_AMF_DecodeInt24(header[:3]) + + if size >= 6 { + pkt.bodySize = C_AMF_DecodeInt24(header[3:6]) + pkt.bytesRead = 0 + + if size > 6 { + pkt.packetType = header[6] + + if size == 11 { + pkt.info = decodeInt32LE(header[7:11]) + } + } + } + } + + extendedTimestamp := pkt.timestamp == 0xffffff + if extendedTimestamp { + err = readN(s, header[size:size+4]) + if err != nil { + log.Println("readPacket: Failed to read extended timestamp") + return err + } + // TODO: port this + pkt.timestamp = C_AMF_DecodeInt32(header[size : size+4]) + hSize += 4 + } + + if pkt.bodySize > 0 && pkt.body == nil { + resizePacket(pkt, pkt.bodySize, (hbuf[0]&0xc0)>>6) + } + + toRead := int32(pkt.bodySize - pkt.bytesRead) + chunkSize := s.inChunkSize + + if toRead < chunkSize { + chunkSize = toRead + } + + if pkt.chunk != nil { + pkt.chunk.headerSize = int32(hSize) + copy(pkt.chunk.header[:], hbuf[:hSize]) + pkt.chunk.data = pkt.body[pkt.bytesRead : pkt.bytesRead+uint32(chunkSize)] + } + + err = readN(s, pkt.body[pkt.bytesRead:][:chunkSize]) + if err != nil { + log.Println("readPacket: failed to read RTMP packet body") + return err + } + + pkt.bytesRead += uint32(chunkSize) + + // keep the packet as ref for other packets on this channel + if s.vecChannelsIn[pkt.channel] == nil { + s.vecChannelsIn[pkt.channel] = &packet{} + } + *(s.vecChannelsIn[pkt.channel]) = *pkt + + if extendedTimestamp { + s.vecChannelsIn[pkt.channel].timestamp = 0xffffff + } + + if pkt.bytesRead != pkt.bodySize { + panic("readPacket: bytesRead != bodySize") + } + + if !pkt.hasAbsTimestamp { + // timestamps seem to always be relative + pkt.timestamp += uint32(s.channelTimestamp[pkt.channel]) + } + s.channelTimestamp[pkt.channel] = int32(pkt.timestamp) + + s.vecChannelsIn[pkt.channel].body = nil + s.vecChannelsIn[pkt.channel].bytesRead = 0 + s.vecChannelsIn[pkt.channel].hasAbsTimestamp = false + return nil +} + +// resizePacket adjust the packet's storage to accommodate a body of the given size. +func resizePacket(pkt *packet, size uint32, ht uint8) { + buf := make([]byte, RTMP_MAX_HEADER_SIZE+size) + pkt.headerType = ht + pkt.header = buf + pkt.body = buf[RTMP_MAX_HEADER_SIZE:] +} + +// sendPacket sends a packet. +func sendPacket(s *Session, pkt *packet, queue bool) error { + var prevPkt *packet + var last int + + if pkt.channel >= s.channelsAllocatedOut { + n := int(pkt.channel + 10) + + var pkts []*packet + if s.vecChannelsOut == nil { + pkts = make([]*packet, n) + } else { + pkts = append(s.vecChannelsOut[:pkt.channel:pkt.channel], make([]*packet, 10)...) + } + s.vecChannelsOut = pkts + + for i := int(s.channelsAllocatedOut); i < n; i++ { + s.vecChannelsOut[i] = nil + } + + s.channelsAllocatedOut = int32(n) + } + prevPkt = s.vecChannelsOut[pkt.channel] + + if prevPkt != nil && pkt.headerType != RTMP_PACKET_SIZE_LARGE { + // compress a bit by using the prev packet's attributes + if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == RTMP_PACKET_SIZE_MEDIUM { + pkt.headerType = RTMP_PACKET_SIZE_SMALL + } + + if prevPkt.timestamp == pkt.timestamp && pkt.headerType == RTMP_PACKET_SIZE_SMALL { + pkt.headerType = RTMP_PACKET_SIZE_MINIMUM + } + + last = int(prevPkt.timestamp) + } + + if pkt.headerType > 3 { + log.Printf("Sanity failed! trying to send header of type: 0x%02x.", + pkt.headerType) + return errInvalidHeader + } + + var headBytes []byte + var origIdx int + if pkt.body != nil { + // Span from -packetsize for the type to the start of the body. + headBytes = pkt.header + origIdx = RTMP_MAX_HEADER_SIZE - packetSize[pkt.headerType] + } else { + // Allocate a new header and allow 6 bytes of movement backward. + var hbuf [RTMP_MAX_HEADER_SIZE]byte + headBytes = hbuf[:] + origIdx = 6 + } + + var cSize int + switch { + case pkt.channel > 319: + cSize = 2 + case pkt.channel > 63: + cSize = 1 + } + + hSize := packetSize[pkt.headerType] + if cSize != 0 { + origIdx -= cSize + hSize += cSize + } + + var ts uint32 + if prevPkt != nil { + ts = uint32(int(pkt.timestamp) - last) + } + if ts >= 0xffffff { + origIdx -= 4 + hSize += 4 + log.Printf("Larger timestamp than 24-bit: 0x%v", ts) + } + + headerIdx := origIdx + + c := pkt.headerType << 6 + switch cSize { + case 0: + c |= byte(pkt.channel) + case 1: + // Do nothing. + case 2: + c |= 1 + } + headBytes[headerIdx] = c + headerIdx++ + + if cSize != 0 { + tmp := pkt.channel - 64 + headBytes[headerIdx] = byte(tmp & 0xff) + headerIdx++ + + if cSize == 2 { + headBytes[headerIdx] = byte(tmp >> 8) + headerIdx++ + } + } + + if packetSize[pkt.headerType] > 1 { + res := ts + if ts > 0xffffff { + res = 0xffffff + } + C_AMF_EncodeInt24(headBytes[headerIdx:], int32(res)) + headerIdx += 3 // 24bits + } + + if packetSize[pkt.headerType] > 4 { + C_AMF_EncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize)) + headerIdx += 3 // 24bits + headBytes[headerIdx] = pkt.packetType + headerIdx++ + } + + if packetSize[pkt.headerType] > 8 { + n := int(encodeInt32LE(headBytes[headerIdx:headerIdx+4], pkt.info)) + headerIdx += n + } + + if ts >= 0xffffff { + C_AMF_EncodeInt32(headBytes[headerIdx:], int32(ts)) + headerIdx += 4 // 32bits + } + + size := int(pkt.bodySize) + chunkSize := int(s.outChunkSize) + + if debugMode { + log.Printf("sendPacket: %v->%v, size=%v", s.link.conn.LocalAddr(), s.link.conn.RemoteAddr(), size) + } + + // Send the previously deferred packet if combining it with the next packet would exceed the chunk size. + if s.defered != nil && len(s.defered)+size+hSize > chunkSize { + err := writeN(s, s.defered) + if err != nil { + return err + } + s.defered = nil + } + + // TODO(kortschak): Rewrite this horrific peice of premature optimisation. + // NB: RTMP wants packets in chunks which are 128 bytes by default, but the server may request a different size. + for size+hSize != 0 { + if s.defered == nil && pkt.packetType == RTMP_PACKET_TYPE_AUDIO && size < chunkSize { + s.defered = headBytes[origIdx:][:size+hSize] + break + } + if chunkSize > size { + chunkSize = size + } + bytes := headBytes[origIdx:][:chunkSize+hSize] + if s.defered != nil { + // Prepend the previously deferred packet and write it with the current one. + bytes = append(s.defered, bytes...) + } + err := writeN(s, bytes) + if err != nil { + return err + } + s.defered = nil + + size -= chunkSize + origIdx += chunkSize + hSize + hSize = 0 + + if size > 0 { + origIdx -= 1 + cSize + hSize = 1 + cSize + + if ts >= 0xffffff { + origIdx -= 4 + hSize += 4 + } + + headBytes[origIdx] = 0xc0 | c + + if cSize != 0 { + tmp := int(pkt.channel) - 64 + headBytes[origIdx+1] = byte(tmp) + + if cSize == 2 { + headBytes[origIdx+2] = byte(tmp >> 8) + } + } + if ts >= 0xffffff { + extendedTimestamp := headBytes[origIdx+1+cSize:] + C_AMF_EncodeInt32(extendedTimestamp[:4], int32(ts)) + } + } + } + + // We invoked a remote method + if pkt.packetType == RTMP_PACKET_TYPE_INVOKE { + buf := pkt.body[1:] + meth := C_AMF_DecodeString(buf) + + if debugMode { + log.Printf("invoking %v", meth) + } + // keep it in call queue till result arrives + if queue { + buf = buf[3+len(meth):] + txn := int32(C_AMF_DecodeNumber(buf[:8])) + s.methodCalls = append(s.methodCalls, method{name: meth, num: txn}) + } + } + + if s.vecChannelsOut[pkt.channel] == nil { + s.vecChannelsOut[pkt.channel] = &packet{} + } + *(s.vecChannelsOut[pkt.channel]) = *pkt + + return nil +} + +func decodeInt32LE(data []byte) int32 { + return int32(data[3])<<24 | int32(data[2])<<16 | int32(data[1])<<8 | int32(data[0]) +} + +func encodeInt32LE(dst []byte, v int32) int32 { + binary.LittleEndian.PutUint32(dst, uint32(v)) + return 4 +} diff --git a/rtmp/parseurl.go b/rtmp/parseurl.go index 8faa6414..97110eb8 100644 --- a/rtmp/parseurl.go +++ b/rtmp/parseurl.go @@ -40,9 +40,8 @@ import ( "strings" ) -// int RTMP_ParseURL(const char *url, int *protocol, AVal *host, unsigned int *port, AVal *playpath, AVal *app); -// parseurl.c +33 -func C_RTMP_ParseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) { +// parseURL parses an RTMP URL (ok, technically it is lexing). +func parseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) { u, err := url.Parse(addr) if err != nil { log.Printf("failed to parse addr: %v", err) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index ef36a3f1..b1515b6d 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -8,9 +8,10 @@ DESCRIPTION AUTHORS Saxon Nelson-Milton <saxon@ausocean.org> Dan Kortschak <dan@ausocean.org> + Alan Noble <alan@ausocean.org> LICENSE - rtmp.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + rtmp.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the @@ -48,12 +49,11 @@ import ( const ( minDataSize = 11 - debugMode = false + debugMode = true length = 512 ) const ( - // av_setDataFrame is a static const global in rtmp.c setDataFrame = "@setDataFrame" av__checkbw = "_checkbw" @@ -107,30 +107,23 @@ const ( av_videoFunction = "videoFunction" ) -var RTMPT_cmds = []string{ - "open", - "send", - "idle", - "close", +// RTMP protocol strings. +var rtmpProtocolStrings = [...]string{ + "rtmp", + "rtmpt", + "rtmpe", + "rtmpte", + "rtmps", + "rtmpts", + "", + "", + "rtmfp", } -var ( - packetSize = [...]int{12, 8, 4, 1} - RTMPProtocolStringsLower = [...]string{ - "rtmp", - "rtmpt", - "rtmpe", - "rtmpte", - "rtmps", - "rtmpts", - "", - "", - "rtmfp", - } -) - +// RTMP errors. var ( errUnknownScheme = errors.New("rtmp: unknown scheme") + errNotConnected = errors.New("rtmp: not connected") errHandshake = errors.New("rtmp: handshake failed") errConnSend = errors.New("rtmp: connection send error") errConnStream = errors.New("rtmp: connection stream error") @@ -142,245 +135,124 @@ var ( errCopying = errors.New("rtmp: copying error") ) -func startSession(rtmp *C_RTMP, u string, timeout uint) (*C_RTMP, error) { - rtmp = C_RTMP_Alloc() - C_RTMP_Init(rtmp) - rtmp.Link.timeout = timeout - err := C_RTMP_SetupURL(rtmp, u) - if err != nil { - C_RTMP_Close(rtmp) - return nil, err - } - - C_RTMP_EnableWrite(rtmp) - rtmp.m_nBufferMS = 3600 * 1000 - err = C_RTMP_Connect(rtmp, nil) - if err != nil { - C_RTMP_Close(rtmp) - return nil, err - } - - err = C_RTMP_ConnectStream(rtmp, 0) - if err != nil { - C_RTMP_Close(rtmp) - return nil, err - } - - return rtmp, nil -} - -func endSession(rtmp *C_RTMP) uint32 { - if rtmp == nil { - return 3 - } - - C_RTMP_Close(rtmp) - return 0 -} - -// uint32_t RTMP_GetTime(); -// rtmp.c +156 -func C_RTMP_GetTime() int32 { - return int32(time.Now().UnixNano() / 1000000) -} - -// int RTMPPacket_Alloc(RTMPPacket* p, uint32_t nSize); -// rtmp.c +189 -func C_RTMPPacket_Alloc(p *C_RTMPPacket, nSize uint32) { - buf := make([]byte, RTMP_MAX_HEADER_SIZE+nSize) - p.m_header = buf - p.m_body = buf[RTMP_MAX_HEADER_SIZE:] - p.m_nBytesRead = 0 -} - -// void RTMPPacket_Free(RTMPPacket* p); -// rtmp.c +203 -func C_RTMPPacket_Free(p *C_RTMPPacket) { - if p.m_body != nil { - p.m_body = nil - } -} - -// RTMP* RTMP_IsConnected(); -// rtmp.c +317 -func C_RTMP_Alloc() *C_RTMP { - return &C_RTMP{} -} - -// void RTMP_Init(RTMP *r); -// rtmp.c +329 -func C_RTMP_Init(r *C_RTMP) { - *r = C_RTMP{} - r.m_inChunkSize = RTMP_DEFAULT_CHUNKSIZE - r.m_outChunkSize = RTMP_DEFAULT_CHUNKSIZE - r.m_nBufferMS = 30000 - r.m_nClientBW = 2500000 - r.m_nClientBW2 = 2 - r.m_nServerBW = 2500000 - r.m_fAudioCodecs = 3191.0 - r.m_fVideoCodecs = 252.0 - r.Link.timeout = 30 - r.Link.swfAge = 30 -} - -// void RTMP_EnableWrite(RTMP *r); -// rtmp.c +351 -func C_RTMP_EnableWrite(r *C_RTMP) { - r.Link.protocol |= RTMP_FEATURE_WRITE -} - -// int RTMP_IsConnected(RTMP *r); -// rtmp.c +363 -func C_RTMP_IsConnected(r *C_RTMP) bool { - return r.Link.conn != nil -} - -// void RTMP_SetBufferMS(RTMP *r, int size); -// rtmp.c +381 -// DELETED - -// void SocksSetup(RTMP *r, C_AVal* sockshost); -// rtmp.c +410 -// DELETED - -// int RTMP_SetupURL(RTMP *r, char* url); -// rtmp.c +757 -// NOTE: code dealing with rtmp over http has been disregarded -func C_RTMP_SetupURL(r *C_RTMP, addr string) (err error) { - r.Link.protocol, r.Link.host, r.Link.port, r.Link.app, r.Link.playpath0, err = C_RTMP_ParseURL(addr) +// setupURL parses the RTMP URL. +func setupURL(s *Session, addr string) (err error) { + s.link.protocol, s.link.host, s.link.port, s.link.app, s.link.playpath, err = parseURL(addr) if err != nil { return err } - r.Link.playpath = r.Link.playpath0 - if r.Link.tcUrl == "" { - if r.Link.app != "" { - r.Link.tcUrl = fmt.Sprintf("%v://%v:%v/%v", - RTMPProtocolStringsLower[r.Link.protocol], r.Link.host, r.Link.port, r.Link.app) - r.Link.lFlags |= RTMP_LF_FTCU + if s.link.tcUrl == "" { + if s.link.app != "" { + s.link.tcUrl = fmt.Sprintf("%v://%v:%v/%v", + rtmpProtocolStrings[s.link.protocol], s.link.host, s.link.port, s.link.app) + s.link.lFlags |= RTMP_LF_FTCU } else { - r.Link.tcUrl = addr + s.link.tcUrl = addr } } - if r.Link.port == 0 { + if s.link.port == 0 { switch { - case (r.Link.protocol & RTMP_FEATURE_SSL) != 0: - r.Link.port = 433 - case (r.Link.protocol & RTMP_FEATURE_HTTP) != 0: - r.Link.port = 80 + case (s.link.protocol & RTMP_FEATURE_SSL) != 0: + s.link.port = 433 + case (s.link.protocol & RTMP_FEATURE_HTTP) != 0: + s.link.port = 80 default: - r.Link.port = 1935 + s.link.port = 1935 } } return nil } -// int RTMP_Connect(RTMP *r, RTMPPacket* cp); -// rtmp.c +1032 -func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) error { - addr, err := net.ResolveTCPAddr("tcp4", r.Link.host+":"+strconv.Itoa(int(r.Link.port))) +// connect establishes an RTMP connection. +func connect(s *Session) error { + addr, err := net.ResolveTCPAddr("tcp4", s.link.host+":"+strconv.Itoa(int(s.link.port))) if err != nil { return err } - r.Link.conn, err = net.DialTCP("tcp4", nil, addr) + s.link.conn, err = net.DialTCP("tcp4", nil, addr) if err != nil { return err } if debugMode { log.Println("... connected, handshaking...") } - err = C_HandShake(r, 1) + err = handshake(s) if err != nil { - log.Println("C_RTMP_Connect1: handshake failed!") + log.Println("connect: handshake failed") return errHandshake } if debugMode { log.Println("... handshaked...") } - err = C_SendConnectPacket(r, cp) + err = sendConnectPacket(s) if err != nil { - log.Println("RTMP connect failed!") + log.Println("connect: sendConnect failed") return errConnSend } return nil } -// int RTMP_Connect1(RTMP* r, RTMPPacket* cp); -// rtmp.c +978 -// DELETED - subsumed by RTMP_Connect +// connectStream reads a packet and handles it +func connectStream(s *Session) error { + var pkt packet -// int RTMP_ConnectStream(RTMP* r, int seekTime); -// rtmp.c +1099 -// Side effects: r.m_bPlaying is set true upon successful connection -func C_RTMP_ConnectStream(r *C_RTMP, seekTime int32) error { - var packet C_RTMPPacket - - if seekTime > 0 { - r.Link.seekTime = seekTime - } - - r.m_mediaChannel = 0 - - for !r.m_bPlaying && C_RTMP_IsConnected(r) { - err := C_RTMP_ReadPacket(r, &packet) + for !s.isPlaying && s.isConnected() { + err := readPacket(s, &pkt) if err != nil { break } - // TODO: port is ready - if C_RTMPPacket_IsReady(&packet) { - if packet.m_nBodySize == 0 { - continue - } - if packet.m_packetType == RTMP_PACKET_TYPE_AUDIO || - packet.m_packetType == RTMP_PACKET_TYPE_VIDEO || - packet.m_packetType == RTMP_PACKET_TYPE_INFO { - log.Println("C_RTMP_ConnectStream: got packet before play()! Ignoring.") - C_RTMPPacket_Free(&packet) - continue - } - - C_RTMP_ClientPacket(r, &packet) - C_RTMPPacket_Free(&packet) + if pkt.bodySize == 0 { + continue } + + if pkt.packetType == RTMP_PACKET_TYPE_AUDIO || + pkt.packetType == RTMP_PACKET_TYPE_VIDEO || + pkt.packetType == RTMP_PACKET_TYPE_INFO { + log.Println("connectStream: got packet before play()! Ignoring.") + pkt.body = nil + continue + } + + handlePacket(s, &pkt) + pkt.body = nil } - if !r.m_bPlaying { + if !s.isPlaying { return errConnStream } return nil } -// int RTMP_ClientPacket() -// rtmp.c +1226 -// NOTE cases have been commented out that are not currently used by AusOcean -func C_RTMP_ClientPacket(r *C_RTMP, packet *C_RTMPPacket) int32 { - var bHasMediaPacket int32 - switch packet.m_packetType { +// handlePacket handles a packet that the client has received. +// NB: cases have been commented out that are not currently used by AusOcean +func handlePacket(s *Session, pkt *packet) int32 { + var hasMediaPacket int32 + switch pkt.packetType { case RTMP_PACKET_TYPE_CHUNK_SIZE: - // TODO: port this - C_HandleChangeChunkSize(r, packet) + if pkt.bodySize >= 4 { + s.inChunkSize = int32(C_AMF_DecodeInt32(pkt.body[:4])) + } case RTMP_PACKET_TYPE_BYTES_READ_REPORT: - // TODO: usue new logger here - //RTMP_Log(RTMP_LOGDEBUG, "%s, received: bytes read report", __FUNCTION__); + s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) case RTMP_PACKET_TYPE_CONTROL: panic("Unsupported packet type RTMP_PACKET_TYPE_CONTROL") - /* - log.Println("RTMP_PACKET_TYPE_CONTROL") - // TODO: port this - C.HandleCtrl(r, packet) - */ + case RTMP_PACKET_TYPE_SERVER_BW: - // TODO: port this - C_HandlServerBW(r, packet) + s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) case RTMP_PACKET_TYPE_CLIENT_BW: - // TODO: port this - C_HandleClientBW(r, packet) + s.clientBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) + if pkt.bodySize > 4 { + s.clientBW2 = pkt.body[4] + } else { + s.clientBW2 = 0xff + } case RTMP_PACKET_TYPE_AUDIO: panic("Unsupported packet type RTMP_PACKET_TYPE_AUDIO") @@ -395,15 +267,14 @@ func C_RTMP_ClientPacket(r *C_RTMP, packet *C_RTMPPacket) int32 { panic("Unsupported packet type RTMP_PACKET_TYPE_INFO") case RTMP_PACKET_TYPE_INVOKE: - log.Println("RTMP_PACKET_TYPE_INVOKE:") - // TODO use new logger here - //RTMP_Log(RTMP_LOGDEBUG, "%s, received: invoke %u bytes", __FUNCTION__,packet.m_nBodySize); - - err := C_HandleInvoke(r, packet.m_body[:packet.m_nBodySize]) + if debugMode { + log.Println("RTMP_PACKET_TYPE_INVOKE:") + } + err := handleInvoke(s, pkt.body[:pkt.bodySize]) if err != nil { // This will never happen with the methods we implement. log.Println("HasMediaPacket") - bHasMediaPacket = 2 + hasMediaPacket = 2 } case RTMP_PACKET_TYPE_FLASH_VIDEO: @@ -411,29 +282,27 @@ func C_RTMP_ClientPacket(r *C_RTMP, packet *C_RTMPPacket) int32 { default: // TODO use new logger here - // RTMP_Log(RTMP_LOGDEBUG, "%s, unknown packet type received: 0x%02x", __FUNCTION__,packet.m_packetType); + // RTMP_Log(RTMP_LOGDEBUG, "%s, unknown packet type received: 0x%02x", __FUNCTION__,pkt.packetType); } - return bHasMediaPacket + return hasMediaPacket } -// int ReadN(RTMP* r, char* buffer, int n); -// rtmp.c +1390 -func C_ReadN(r *C_RTMP, buf []byte) error { - err := r.Link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(r.Link.timeout))) +func readN(s *Session, buf []byte) error { + err := s.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) if err != nil { return err } - n, err := io.ReadFull(r.Link.conn, buf) + n, err := io.ReadFull(s.link.conn, buf) if err != nil { if debugMode { - log.Printf("C_ReadN error: %v\n", err) + log.Printf("readN error: %v\n", err) } - C_RTMP_Close(r) + s.close() return err } - r.m_nBytesIn += int32(n) - if r.m_nBytesIn > (r.m_nBytesInSent + r.m_nClientBW/10) { - err := C_SendBytesReceived(r) + s.nBytesIn += int32(n) + if s.nBytesIn > (s.nBytesInSent + s.clientBW/10) { + err := sendBytesReceived(s) if err != nil { return err } @@ -441,89 +310,78 @@ func C_ReadN(r *C_RTMP, buf []byte) error { return nil } -// int WriteN(RTMP* r, const char* buffer, int n); -// rtmp.c +1502 -func C_WriteN(r *C_RTMP, buf []byte) error { +func writeN(s *Session, buf []byte) error { //ToDo: consider using a different timeout for writes than for reads - err := r.Link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(r.Link.timeout))) + err := s.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) if err != nil { return err } - _, err = r.Link.conn.Write(buf) + _, err = s.link.conn.Write(buf) if err != nil { if debugMode { - log.Printf("C_WriteN, RTMP send error: %v\n", err) + log.Printf("writeN, RTMP send error: %v\n", err) } - C_RTMP_Close(r) + s.close() return err } return nil } -// int SendConnectPacket(RTMP* r, RTMPPacket* cp); -// rtmp.c +1579 -func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) error { - if cp != nil { - return C_RTMP_SendPacket(r, cp, 1) - } - +func sendConnectPacket(s *Session) error { var pbuf [4096]byte - packet := C_RTMPPacket{ - m_nChannel: 0x03, - m_headerType: RTMP_PACKET_SIZE_LARGE, - m_packetType: RTMP_PACKET_TYPE_INVOKE, - m_nTimeStamp: 0, - m_nInfoField2: 0, - m_hasAbsTimestamp: false, - m_header: pbuf[:], - m_body: pbuf[RTMP_MAX_HEADER_SIZE:], + pkt := packet{ + channel: RTMP_CHANNEL_CONTROL, + headerType: RTMP_PACKET_SIZE_LARGE, + packetType: RTMP_PACKET_TYPE_INVOKE, + header: pbuf[:], + body: pbuf[RTMP_MAX_HEADER_SIZE:], } - enc := packet.m_body + enc := pkt.body enc = C_AMF_EncodeString(enc, av_connect) if enc == nil { return errEncoding } - r.m_numInvokes += 1 - enc = C_AMF_EncodeNumber(enc, float64(r.m_numInvokes)) + s.numInvokes += 1 + enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_OBJECT enc = enc[1:] - enc = C_AMF_EncodeNamedString(enc, av_app, r.Link.app) + enc = C_AMF_EncodeNamedString(enc, av_app, s.link.app) if enc == nil { return errEncoding } - if r.Link.protocol&RTMP_FEATURE_WRITE != 0 { + if s.link.protocol&RTMP_FEATURE_WRITE != 0 { enc = C_AMF_EncodeNamedString(enc, av_type, av_nonprivate) if enc == nil { return errEncoding } } - if r.Link.flashVer != "" { - enc = C_AMF_EncodeNamedString(enc, av_flashVer, r.Link.flashVer) + if s.link.flashVer != "" { + enc = C_AMF_EncodeNamedString(enc, av_flashVer, s.link.flashVer) if enc == nil { return errEncoding } } - if r.Link.swfUrl != "" { - enc = C_AMF_EncodeNamedString(enc, av_swfUrl, r.Link.swfUrl) + if s.link.swfUrl != "" { + enc = C_AMF_EncodeNamedString(enc, av_swfUrl, s.link.swfUrl) if enc == nil { return errEncoding } } - if r.Link.tcUrl != "" { - enc = C_AMF_EncodeNamedString(enc, av_tcUrl, r.Link.tcUrl) + if s.link.tcUrl != "" { + enc = C_AMF_EncodeNamedString(enc, av_tcUrl, s.link.tcUrl) if enc == nil { return errEncoding } } - if r.Link.protocol&RTMP_FEATURE_WRITE == 0 { + if s.link.protocol&RTMP_FEATURE_WRITE == 0 { enc = C_AMF_EncodeNamedBoolean(enc, av_fpad, false) if enc == nil { return errEncoding @@ -532,11 +390,11 @@ func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) error { if enc == nil { return errEncoding } - enc = C_AMF_EncodeNamedNumber(enc, av_audioCodecs, r.m_fAudioCodecs) + enc = C_AMF_EncodeNamedNumber(enc, av_audioCodecs, s.audioCodecs) if enc == nil { return errEncoding } - enc = C_AMF_EncodeNamedNumber(enc, av_videoCodecs, r.m_fVideoCodecs) + enc = C_AMF_EncodeNamedNumber(enc, av_videoCodecs, s.videoCodecs) if enc == nil { return errEncoding } @@ -544,16 +402,16 @@ func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) error { if enc == nil { return errEncoding } - if r.Link.pageUrl != "" { - enc = C_AMF_EncodeNamedString(enc, av_pageUrl, r.Link.pageUrl) + if s.link.pageUrl != "" { + enc = C_AMF_EncodeNamedString(enc, av_pageUrl, s.link.pageUrl) if enc == nil { return errEncoding } } } - if r.m_fEncoding != 0.0 || r.m_bSendEncoding { - enc = C_AMF_EncodeNamedNumber(enc, av_objectEncoding, r.m_fEncoding) + if s.encoding != 0.0 || s.sendEncoding { + enc = C_AMF_EncodeNamedNumber(enc, av_objectEncoding, s.encoding) if enc == nil { return errEncoding } @@ -565,200 +423,175 @@ func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) error { enc = enc[3:] /* add auth string */ - if r.Link.auth != "" { - enc = C_AMF_EncodeBoolean(enc, r.Link.lFlags&RTMP_LF_AUTH != 0) + if s.link.auth != "" { + enc = C_AMF_EncodeBoolean(enc, s.link.lFlags&RTMP_LF_AUTH != 0) if enc == nil { return errEncoding } - enc = C_AMF_EncodeString(enc, r.Link.auth) + enc = C_AMF_EncodeString(enc, s.link.auth) if enc == nil { return errEncoding } } - for i := range r.Link.extras.o_props { - enc = C_AMF_PropEncode(&r.Link.extras.o_props[i], enc) + for i := range s.link.extras.o_props { + enc = C_AMF_PropEncode(&s.link.extras.o_props[i], enc) if enc == nil { return errEncoding } } - packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(r, &packet, 1) + return sendPacket(s, &pkt, true) } -// int RTMP_SendCreateStream(RTMP* r); -// rtmp.c +1725 -func C_RTMP_SendCreateStream(r *C_RTMP) error { +func sendCreateStream(s *Session) error { var pbuf [256]byte - packet := C_RTMPPacket{ - m_nChannel: 0x03, /* control channel (invoke) */ - m_headerType: RTMP_PACKET_SIZE_MEDIUM, - m_packetType: RTMP_PACKET_TYPE_INVOKE, - m_nTimeStamp: 0, - m_nInfoField2: 0, - m_hasAbsTimestamp: false, - m_header: pbuf[:], - m_body: pbuf[RTMP_MAX_HEADER_SIZE:], + pkt := packet{ + channel: RTMP_CHANNEL_CONTROL, + headerType: RTMP_PACKET_SIZE_MEDIUM, + packetType: RTMP_PACKET_TYPE_INVOKE, + header: pbuf[:], + body: pbuf[RTMP_MAX_HEADER_SIZE:], } - enc := packet.m_body + enc := pkt.body enc = C_AMF_EncodeString(enc, av_createStream) if enc == nil { return errEncoding } - r.m_numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(r.m_numInvokes)) + s.numInvokes++ + enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] - packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(r, &packet, 1) + return sendPacket(s, &pkt, true) } -// int SendReleaseStream(RTMP* r); -// rtmp.c +1816 -func C_SendReleaseStream(r *C_RTMP) error { +func sendReleaseStream(s *Session) error { var pbuf [1024]byte - packet := C_RTMPPacket{ - m_nChannel: 0x03, /* control channel (invoke) */ - m_headerType: RTMP_PACKET_SIZE_MEDIUM, - m_packetType: RTMP_PACKET_TYPE_INVOKE, - m_nTimeStamp: 0, - m_nInfoField2: 0, - m_hasAbsTimestamp: false, - m_header: pbuf[:], - m_body: pbuf[RTMP_MAX_HEADER_SIZE:], + pkt := packet{ + channel: RTMP_CHANNEL_CONTROL, + headerType: RTMP_PACKET_SIZE_MEDIUM, + packetType: RTMP_PACKET_TYPE_INVOKE, + header: pbuf[:], + body: pbuf[RTMP_MAX_HEADER_SIZE:], } - enc := packet.m_body + enc := pkt.body enc = C_AMF_EncodeString(enc, av_releaseStream) if enc == nil { return errEncoding } - r.m_numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(r.m_numInvokes)) + s.numInvokes++ + enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] - enc = C_AMF_EncodeString(enc, r.Link.playpath) + enc = C_AMF_EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } - packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(r, &packet, 0) + return sendPacket(s, &pkt, false) } -// int SendFCPublish(RTMP* r); -// rtmp.c +1846 -func C_SendFCPublish(r *C_RTMP) error { +func sendFCPublish(s *Session) error { var pbuf [1024]byte - packet := C_RTMPPacket{ - m_nChannel: 0x03, /* control channel (invoke) */ - m_headerType: RTMP_PACKET_SIZE_MEDIUM, - m_packetType: RTMP_PACKET_TYPE_INVOKE, - m_nTimeStamp: 0, - m_nInfoField2: 0, - m_hasAbsTimestamp: false, - m_header: pbuf[:], - m_body: pbuf[RTMP_MAX_HEADER_SIZE:], + pkt := packet{ + channel: RTMP_CHANNEL_CONTROL, + headerType: RTMP_PACKET_SIZE_MEDIUM, + packetType: RTMP_PACKET_TYPE_INVOKE, + header: pbuf[:], + body: pbuf[RTMP_MAX_HEADER_SIZE:], } - enc := packet.m_body + enc := pkt.body enc = C_AMF_EncodeString(enc, av_FCPublish) if enc == nil { return errEncoding } - r.m_numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(r.m_numInvokes)) + s.numInvokes++ + enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] - enc = C_AMF_EncodeString(enc, r.Link.playpath) + enc = C_AMF_EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } - packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(r, &packet, 0) + return sendPacket(s, &pkt, false) } -// int SendFCUnpublish(RTMP *r); -// rtmp.c +1875 -func C_SendFCUnpublish(r *C_RTMP) error { +func sendFCUnpublish(s *Session) error { var pbuf [1024]byte - packet := C_RTMPPacket{ - m_nChannel: 0x03, /* control channel (invoke) */ - m_headerType: RTMP_PACKET_SIZE_MEDIUM, - m_packetType: RTMP_PACKET_TYPE_INVOKE, - m_nTimeStamp: 0, - m_nInfoField2: 0, - m_hasAbsTimestamp: false, - m_header: pbuf[:], - m_body: pbuf[RTMP_MAX_HEADER_SIZE:], + pkt := packet{ + channel: RTMP_CHANNEL_CONTROL, + headerType: RTMP_PACKET_SIZE_MEDIUM, + packetType: RTMP_PACKET_TYPE_INVOKE, + header: pbuf[:], + body: pbuf[RTMP_MAX_HEADER_SIZE:], } - enc := packet.m_body + enc := pkt.body enc = C_AMF_EncodeString(enc, av_FCUnpublish) if enc == nil { return errEncoding } - r.m_numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(r.m_numInvokes)) + s.numInvokes++ + enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] - enc = C_AMF_EncodeString(enc, r.Link.playpath) + enc = C_AMF_EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } - packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(r, &packet, 0) + return sendPacket(s, &pkt, false) } -// int SendPublish(RTMP* r); -// rtmp.c +1908 -func C_SendPublish(r *C_RTMP) error { +func sendPublish(s *Session) error { var pbuf [1024]byte - packet := C_RTMPPacket{ - m_nChannel: 0x04, /* source channel (invoke) */ - m_headerType: RTMP_PACKET_SIZE_LARGE, - m_packetType: RTMP_PACKET_TYPE_INVOKE, - m_nTimeStamp: 0, - m_nInfoField2: r.m_stream_id, - m_hasAbsTimestamp: false, - m_header: pbuf[:], - m_body: pbuf[RTMP_MAX_HEADER_SIZE:], + pkt := packet{ + channel: RTMP_CHANNEL_SOURCE, + headerType: RTMP_PACKET_SIZE_LARGE, + packetType: RTMP_PACKET_TYPE_INVOKE, + header: pbuf[:], + body: pbuf[RTMP_MAX_HEADER_SIZE:], } - enc := packet.m_body + enc := pkt.body enc = C_AMF_EncodeString(enc, av_publish) if enc == nil { return errEncoding } - r.m_numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(r.m_numInvokes)) + s.numInvokes++ + enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] - enc = C_AMF_EncodeString(enc, r.Link.playpath) + enc = C_AMF_EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } @@ -767,34 +600,28 @@ func C_SendPublish(r *C_RTMP) error { return errEncoding } - packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(r, &packet, 1) + return sendPacket(s, &pkt, true) } -// int -// SendDeleteStream(RTMP *r, double dStreamId) -// rtmp.c +1942 -func C_SendDeleteStream(r *C_RTMP, dStreamId float64) error { +func sendDeleteStream(s *Session, dStreamId float64) error { var pbuf [256]byte - packet := C_RTMPPacket{ - m_nChannel: 0x03, /* control channel (invoke) */ - m_headerType: RTMP_PACKET_SIZE_MEDIUM, - m_packetType: RTMP_PACKET_TYPE_INVOKE, - m_nTimeStamp: 0, - m_nInfoField2: 0, - m_hasAbsTimestamp: false, - m_header: pbuf[:], - m_body: pbuf[RTMP_MAX_HEADER_SIZE:], + pkt := packet{ + channel: RTMP_CHANNEL_CONTROL, + headerType: RTMP_PACKET_SIZE_MEDIUM, + packetType: RTMP_PACKET_TYPE_INVOKE, + header: pbuf[:], + body: pbuf[RTMP_MAX_HEADER_SIZE:], } - enc := packet.m_body + enc := pkt.body enc = C_AMF_EncodeString(enc, av_deleteStream) if enc == nil { return errEncoding } - r.m_numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(r.m_numInvokes)) + s.numInvokes++ + enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } @@ -804,83 +631,71 @@ func C_SendDeleteStream(r *C_RTMP, dStreamId float64) error { if enc == nil { return errEncoding } - packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) /* no response expected */ - return C_RTMP_SendPacket(r, &packet, 0) + return sendPacket(s, &pkt, false) } -// int SendBytesReceived(RTMP* r); -// rtmp.c +2080 -func C_SendBytesReceived(r *C_RTMP) error { +// sendBytesReceived tells the server how many bytes the client has received. +func sendBytesReceived(s *Session) error { var pbuf [256]byte - packet := C_RTMPPacket{ - m_nChannel: 0x02, /* control channel (invoke) */ - m_headerType: RTMP_PACKET_SIZE_MEDIUM, - m_packetType: RTMP_PACKET_TYPE_BYTES_READ_REPORT, - m_nTimeStamp: 0, - m_nInfoField2: 0, - m_hasAbsTimestamp: false, - m_header: pbuf[:], - m_body: pbuf[RTMP_MAX_HEADER_SIZE:], + pkt := packet{ + channel: RTMP_CHANNEL_BYTES_READ, + headerType: RTMP_PACKET_SIZE_MEDIUM, + packetType: RTMP_PACKET_TYPE_BYTES_READ_REPORT, + header: pbuf[:], + body: pbuf[RTMP_MAX_HEADER_SIZE:], } - enc := packet.m_body + enc := pkt.body - r.m_nBytesInSent = r.m_nBytesIn - enc = C_AMF_EncodeInt32(enc, r.m_nBytesIn) + s.nBytesInSent = s.nBytesIn + enc = C_AMF_EncodeInt32(enc, s.nBytesIn) if enc == nil { return errEncoding } - packet.m_nBodySize = 4 + pkt.bodySize = 4 - return C_RTMP_SendPacket(r, &packet, 0) + return sendPacket(s, &pkt, false) } -// int SendCheckBW(RTMP* r); -// rtmp.c +2105 -func C_SendCheckBW(r *C_RTMP) error { +func sendCheckBW(s *Session) error { var pbuf [256]byte - packet := C_RTMPPacket{ - m_nChannel: 0x03, /* control channel (invoke) */ - m_headerType: RTMP_PACKET_SIZE_LARGE, - m_packetType: RTMP_PACKET_TYPE_INVOKE, - m_nTimeStamp: 0, - m_nInfoField2: 0, - m_hasAbsTimestamp: false, - m_header: pbuf[:], - m_body: pbuf[RTMP_MAX_HEADER_SIZE:], + pkt := packet{ + channel: RTMP_CHANNEL_CONTROL, + headerType: RTMP_PACKET_SIZE_LARGE, + packetType: RTMP_PACKET_TYPE_INVOKE, + header: pbuf[:], + body: pbuf[RTMP_MAX_HEADER_SIZE:], } - enc := packet.m_body + enc := pkt.body enc = C_AMF_EncodeString(enc, av__checkbw) if enc == nil { return errEncoding } - r.m_numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(r.m_numInvokes)) + s.numInvokes++ + enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] - packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(r, &packet, 0) + return sendPacket(s, &pkt, false) } -// void AV_erase(C_RTMP_METHOD* vals, int* num, int i, int freeit); -// rtmp.c +2393 -func C_AV_erase(m []C_RTMP_METHOD, i int) []C_RTMP_METHOD { +func eraseMethod(m []method, i int) []method { copy(m[i:], m[i+1:]) - m[len(m)-1] = C_RTMP_METHOD{} + m[len(m)-1] = method{} return m[:len(m)-1] } -// int HandleInvoke(RTMP* r, const char* body, unsigned int nBodySize); -// rtmp.c +2912 -// Side effects: r.m_bPlaying set to true upon av_NetStream_Publish_Start -func C_HandleInvoke(r *C_RTMP, body []byte) error { +// int handleInvoke handles a packet invoke request +// Side effects: s.isPlaying set to true upon av_NetStream_Publish_Start +func handleInvoke(s *Session, body []byte) error { if body[0] != 0x02 { return errInvalidBody } @@ -890,21 +705,18 @@ func C_HandleInvoke(r *C_RTMP, body []byte) error { return errDecoding } - // NOTE we don't really need this ?? still functions without it - //C.AMF_Dump(&obj) - //C.AMFProp_GetString(C_AMF_GetProp(&obj, nil, 0), &method) - method := C_AMFProp_GetString(C_AMF_GetProp(&obj, "", 0)) + meth := C_AMFProp_GetString(C_AMF_GetProp(&obj, "", 0)) txn := C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 1)) // TODO use new logger here // RTMP_Log(RTMP_LOGDEBUG, "%s, server invoking <%s>", __FUNCTION__, method.av_val); - switch method { + switch meth { case av__result: var methodInvoked string - for i, m := range r.m_methodCalls { + for i, m := range s.methodCalls { if float64(m.num) == txn { methodInvoked = m.name - r.m_methodCalls = C_AV_erase(r.m_methodCalls, i) + s.methodCalls = eraseMethod(s.methodCalls, i) break } } @@ -919,27 +731,27 @@ func C_HandleInvoke(r *C_RTMP, body []byte) error { //methodInvoked.av_val); switch methodInvoked { case av_connect: - if r.Link.token != "" { + if s.link.token != "" { panic("No support for link token") } - if (r.Link.protocol & RTMP_FEATURE_WRITE) != 0 { - C_SendReleaseStream(r) - C_SendFCPublish(r) + if (s.link.protocol & RTMP_FEATURE_WRITE) != 0 { + sendReleaseStream(s) + sendFCPublish(s) } else { panic("Link protocol has no RTMP_FEATURE_WRITE") } - C_RTMP_SendCreateStream(r) - if (r.Link.protocol & RTMP_FEATURE_WRITE) == 0 { + sendCreateStream(s) + if (s.link.protocol & RTMP_FEATURE_WRITE) == 0 { panic("Link protocol has no RTMP_FEATURE_WRITE") } case av_createStream: - r.m_stream_id = int32(C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 3))) + s.streamID = int32(C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 3))) - if r.Link.protocol&RTMP_FEATURE_WRITE != 0 { - C_SendPublish(r) + if s.link.protocol&RTMP_FEATURE_WRITE != 0 { + sendPublish(s) } else { panic("Link protocol has no RTMP_FEATURE_WRITE") } @@ -950,8 +762,8 @@ func C_HandleInvoke(r *C_RTMP, body []byte) error { //C.free(unsafe.Pointer(methodInvoked.av_val)) case av_onBWDone: - if r.m_nBWCheckCounter == 0 { - C_SendCheckBW(r) + if s.bwCheckCounter == 0 { + sendCheckBW(s) } case av_onFCUnsubscribe, av_onFCSubscribe: @@ -989,10 +801,10 @@ func C_HandleInvoke(r *C_RTMP, body []byte) error { panic("Unsupported method av_NetStream_Play_Start/av_NetStream_Play_PublishNotify") case av_NetStream_Publish_Start: - r.m_bPlaying = true - for i, m := range r.m_methodCalls { + s.isPlaying = true + for i, m := range s.methodCalls { if m.name == av_publish { - r.m_methodCalls = C_AV_erase(r.m_methodCalls, i) + s.methodCalls = eraseMethod(s.methodCalls, i) break } } @@ -1011,262 +823,45 @@ func C_HandleInvoke(r *C_RTMP, body []byte) error { panic("Unsupported method av_playlist_ready") default: - panic(fmt.Sprintf("unknown method: %q", method)) + panic(fmt.Sprintf("unknown method: %q", meth)) } leave: C_AMF_Reset(&obj) - // None of the methods we implement will result in a true return. return nil } -// void HandleChangeChunkSize(RTMP* r, const RTMPPacket* packet); -// rtmp.c +3345 -func C_HandleChangeChunkSize(r *C_RTMP, packet *C_RTMPPacket) { - if packet.m_nBodySize >= 4 { - //r.m_inChunkSize = int32(C.AMF_DecodeInt32((*byte)(unsafe.Pointer(packet.m_body)))) - r.m_inChunkSize = int32(C_AMF_DecodeInt32(packet.m_body[:4])) - // TODO use new logger here - // RTMP_Log(RTMP_LOGDEBUG, "%s, received: chunk size change to %d", __FUNCTION__, r.m_inChunkSize); - } -} - -// void HandleServerBW(RTMP* r, const RTMPPacket* packet); -// rtmp.c +3508 -func C_HandlServerBW(r *C_RTMP, packet *C_RTMPPacket) { - r.m_nServerBW = int32(C_AMF_DecodeInt32(packet.m_body[:4])) - // TODO use new logger here - // RTMP_Log(RTMP_LOGDEBUG, "%s: server BW = %d", __FUNCTION__, r.m_nServerBW); -} - -// void HandleClientBW(RTMP* r, const RTMPPacket* packet); -// rtmp.c +3515 -func C_HandleClientBW(r *C_RTMP, packet *C_RTMPPacket) { - r.m_nClientBW = int32(C_AMF_DecodeInt32(packet.m_body[:4])) - //r.m_nClientBW = int32(C.AMF_DecodeInt32((*byte)(unsafe.Pointer(packet.m_body)))) - - if packet.m_nBodySize > 4 { - r.m_nClientBW2 = packet.m_body[4] - } else { - r.m_nClientBW2 = 0xff - } - // TODO use new logger here - // RTMP_Log(RTMP_LOGDEBUG, "%s: client BW = %d %d", __FUNCTION__, r.m_nClientBW, - //r.m_nClientBW2); -} - -// static int DecodeInt32LE(const char* data); -// rtmp.c +3527 -func C_DecodeInt32LE(data []byte) int32 { - return int32(data[3])<<24 | int32(data[2])<<16 | int32(data[1])<<8 | int32(data[0]) -} - -// int EncodeInt32LE(char* output, int nVal); -// rtmp.c +3537 -func C_EncodeInt32LE(dst []byte, v int32) int32 { - binary.LittleEndian.PutUint32(dst, uint32(v)) - return 4 -} - -// int RTMP_ReadPacket(RTMP* r, RTMPPacket* packet); -// rtmp.c +3550 -func C_RTMP_ReadPacket(r *C_RTMP, packet *C_RTMPPacket) error { - var hbuf [RTMP_MAX_HEADER_SIZE]byte - header := hbuf[:] - - err := C_ReadN(r, header[:1]) - if err != nil { - log.Println("C_RTMP_ReadPacket: failed to read RTMP packet header!") - return err - } - packet.m_headerType = (header[0] & 0xc0) >> 6 - packet.m_nChannel = int32(header[0] & 0x3f) - header = header[1:] - - switch { - case packet.m_nChannel == 0: - err = C_ReadN(r, header[:1]) - if err != nil { - log.Println("C_RTMP_ReadPacket: failed to read rtmp packet header 2nd byte.") - return err - } - header = header[1:] - packet.m_nChannel = int32(header[0]) + 64 - - case packet.m_nChannel == 1: - err = C_ReadN(r, header[:2]) - if err != nil { - log.Println("C_RTMP_ReadPacket: failed to read RTMP packet 3rd byte") - return err - } - header = header[2:] - packet.m_nChannel = int32(binary.BigEndian.Uint16(header[:2])) + 64 - - } - - if packet.m_nChannel >= r.m_channelsAllocatedIn { - n := packet.m_nChannel + 10 - timestamp := append(r.m_channelTimestamp, make([]int32, 10)...) - - var packets []*C_RTMPPacket - if r.m_vecChannelsIn == nil { - packets = make([]*C_RTMPPacket, n) - } else { - packets = append(r.m_vecChannelsIn[:packet.m_nChannel:packet.m_nChannel], make([]*C_RTMPPacket, 10)...) - } - - r.m_channelTimestamp = timestamp - r.m_vecChannelsIn = packets - - for i := int(r.m_channelsAllocatedIn); i < len(r.m_channelTimestamp); i++ { - r.m_channelTimestamp[i] = 0 - } - for i := int(r.m_channelsAllocatedIn); i < int(n); i++ { - r.m_vecChannelsIn[i] = nil - } - r.m_channelsAllocatedIn = n - } - - nSize := packetSize[packet.m_headerType] - switch { - case nSize == RTMP_LARGE_HEADER_SIZE: - packet.m_hasAbsTimestamp = true - case nSize < RTMP_LARGE_HEADER_SIZE: - if r.m_vecChannelsIn[packet.m_nChannel] != nil { - *packet = *(r.m_vecChannelsIn[packet.m_nChannel]) - } - } - - nSize-- - - if nSize > 0 { - err = C_ReadN(r, header[:nSize]) - if err != nil { - log.Println("C_RTMP_ReadPacket: failed to read rtmp packet header.") - return err - } - } - - hSize := len(hbuf) - len(header) + nSize - - if nSize >= 3 { - packet.m_nTimeStamp = C_AMF_DecodeInt24(header[:3]) - - if nSize >= 6 { - packet.m_nBodySize = C_AMF_DecodeInt24(header[3:6]) - packet.m_nBytesRead = 0 - - if nSize > 6 { - packet.m_packetType = header[6] - - if nSize == 11 { - packet.m_nInfoField2 = C_DecodeInt32LE(header[7:11]) - } - } - } - } - - extendedTimestamp := packet.m_nTimeStamp == 0xffffff - if extendedTimestamp { - err = C_ReadN(r, header[nSize:nSize+4]) - if err != nil { - log.Println("RTMPRead_Packet: Failed to read extended timestamp") - return err - } - // TODO: port this - packet.m_nTimeStamp = C_AMF_DecodeInt32(header[nSize : nSize+4]) - hSize += 4 - } - - if packet.m_nBodySize > 0 && packet.m_body == nil { - // TODO: port this - C_RTMPPacket_Alloc(packet, packet.m_nBodySize) - packet.m_headerType = (hbuf[0] & 0xc0) >> 6 - } - - nToRead := int32(packet.m_nBodySize - packet.m_nBytesRead) - nChunk := r.m_inChunkSize - - if nToRead < nChunk { - nChunk = nToRead - } - - if packet.m_chunk != nil { - packet.m_chunk.c_headerSize = int32(hSize) - copy(packet.m_chunk.c_header[:], hbuf[:hSize]) - packet.m_chunk.c_chunk = packet.m_body[packet.m_nBytesRead : packet.m_nBytesRead+uint32(nChunk)] - } - - err = C_ReadN(r, packet.m_body[packet.m_nBytesRead:][:nChunk]) - if err != nil { - log.Println("C_RTMP_ReadPacket: failed to read RTMP packet body") - return err - } - - packet.m_nBytesRead += uint32(nChunk) - - // keep the packet as ref for other packets on this channel - if r.m_vecChannelsIn[packet.m_nChannel] == nil { - r.m_vecChannelsIn[packet.m_nChannel] = &C_RTMPPacket{} - } - *(r.m_vecChannelsIn[packet.m_nChannel]) = *packet - - if extendedTimestamp { - r.m_vecChannelsIn[packet.m_nChannel].m_nTimeStamp = 0xffffff - } - - // TODO: port this - if C_RTMPPacket_IsReady(packet) { - if !packet.m_hasAbsTimestamp { - // timestamps seem to always be relative - packet.m_nTimeStamp += uint32(r.m_channelTimestamp[packet.m_nChannel]) - } - r.m_channelTimestamp[packet.m_nChannel] = int32(packet.m_nTimeStamp) - - r.m_vecChannelsIn[packet.m_nChannel].m_body = nil - r.m_vecChannelsIn[packet.m_nChannel].m_nBytesRead = 0 - r.m_vecChannelsIn[packet.m_nChannel].m_hasAbsTimestamp = false - } else { - packet.m_body = nil /* so it won't be erased on free */ - } - return nil -} - -// int HandShake(RTMP* r, int FP9HandShake); -// rtmp.c +3744 -func C_HandShake(r *C_RTMP, FP9HandShake int32) error { +func handshake(s *Session) error { var clientbuf [RTMP_SIG_SIZE + 1]byte clientsig := clientbuf[1:] var serversig [RTMP_SIG_SIZE]byte - - clientbuf[0] = 0x03 // not encrypted - - binary.BigEndian.PutUint32(clientsig, uint32(C_RTMP_GetTime())) + clientbuf[0] = RTMP_CHANNEL_CONTROL + binary.BigEndian.PutUint32(clientsig, uint32(time.Now().UnixNano()/1000000)) copy(clientsig[4:8], []byte{0, 0, 0, 0}) for i := 8; i < RTMP_SIG_SIZE; i++ { clientsig[i] = byte(rand.Intn(256)) } - err := C_WriteN(r, clientbuf[:]) + err := writeN(s, clientbuf[:]) if err != nil { return err } var typ [1]byte - err = C_ReadN(r, typ[:]) + err = readN(s, typ[:]) if err != nil { return err } if debugMode { - log.Printf("C_HandShake: Type answer: %v\n", typ[0]) + log.Printf("handshake: Type answer: %v\n", typ[0]) } if typ[0] != clientbuf[0] { - log.Printf("C_HandShake: type mismatch: client sent %v, server sent: %v\n", + log.Printf("handshake: type mismatch: client sent %v, server sent: %v\n", clientbuf[0], typ) } - err = C_ReadN(r, serversig[:]) + err = readN(s, serversig[:]) if err != nil { return err } @@ -1279,12 +874,12 @@ func C_HandShake(r *C_RTMP, FP9HandShake int32) error { // serversig[4], serversig[5], serversig[6], serversig[7]) // 2nd part of handshake - err = C_WriteN(r, serversig[:]) + err = writeN(s, serversig[:]) if err != nil { return err } - err = C_ReadN(r, serversig[:]) + err = readN(s, serversig[:]) if err != nil { return err } @@ -1296,293 +891,17 @@ func C_HandShake(r *C_RTMP, FP9HandShake int32) error { return nil } -// int RTMP_SendPacket(RTMP* r, RTMPPacket* packet, int queue); -// rtmp.c +3896 -func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) error { - var prevPacket *C_RTMPPacket - var last int - - if packet.m_nChannel >= r.m_channelsAllocatedOut { - n := int(packet.m_nChannel + 10) - - var packets []*C_RTMPPacket - if r.m_vecChannelsOut == nil { - packets = make([]*C_RTMPPacket, n) - } else { - packets = append(r.m_vecChannelsOut[:packet.m_nChannel:packet.m_nChannel], make([]*C_RTMPPacket, 10)...) - } - r.m_vecChannelsOut = packets - - for i := int(r.m_channelsAllocatedOut); i < n; i++ { - r.m_vecChannelsOut[i] = nil - } - - r.m_channelsAllocatedOut = int32(n) +// write prepares data to write then sends it. +func (s *Session) write(buf []byte) error { + pkt := packet{ + channel: RTMP_CHANNEL_SOURCE, + info: s.streamID, } - prevPacket = r.m_vecChannelsOut[packet.m_nChannel] - - if prevPacket != nil && packet.m_headerType != RTMP_PACKET_SIZE_LARGE { - // compress a bit by using the prev packet's attributes - if prevPacket.m_nBodySize == packet.m_nBodySize && prevPacket.m_packetType == packet.m_packetType && packet.m_headerType == RTMP_PACKET_SIZE_MEDIUM { - packet.m_headerType = RTMP_PACKET_SIZE_SMALL - } - - if prevPacket.m_nTimeStamp == packet.m_nTimeStamp && packet.m_headerType == RTMP_PACKET_SIZE_SMALL { - packet.m_headerType = RTMP_PACKET_SIZE_MINIMUM - } - - last = int(prevPacket.m_nTimeStamp) - } - - if packet.m_headerType > 3 { - log.Printf("Sanity failed! trying to send header of type: 0x%02x.", - packet.m_headerType) - return errInvalidHeader - } - - var headBytes []byte - var origIdx int - if packet.m_body != nil { - // Span from -packetsize for the type to the start of the body. - headBytes = packet.m_header - origIdx = RTMP_MAX_HEADER_SIZE - packetSize[packet.m_headerType] - } else { - // Allocate a new header and allow 6 bytes of movement backward. - var hbuf [RTMP_MAX_HEADER_SIZE]byte - headBytes = hbuf[:] - origIdx = 6 - } - - var cSize int - switch { - case packet.m_nChannel > 319: - cSize = 2 - case packet.m_nChannel > 63: - cSize = 1 - } - - hSize := packetSize[packet.m_headerType] - if cSize != 0 { - origIdx -= cSize - hSize += cSize - } - - var ts uint32 - if prevPacket != nil { - ts = uint32(int(packet.m_nTimeStamp) - last) - } - if ts >= 0xffffff { - origIdx -= 4 - hSize += 4 - log.Printf("Larger timestamp than 24-bit: 0x%v", ts) - } - - headerIdx := origIdx - - c := packet.m_headerType << 6 - switch cSize { - case 0: - c |= byte(packet.m_nChannel) - case 1: - // Do nothing. - case 2: - c |= 1 - } - headBytes[headerIdx] = c - headerIdx++ - - if cSize != 0 { - tmp := packet.m_nChannel - 64 - headBytes[headerIdx] = byte(tmp & 0xff) - headerIdx++ - - if cSize == 2 { - headBytes[headerIdx] = byte(tmp >> 8) - headerIdx++ - } - } - - if packetSize[packet.m_headerType] > 1 { - res := ts - if ts > 0xffffff { - res = 0xffffff - } - C_AMF_EncodeInt24(headBytes[headerIdx:], int32(res)) - headerIdx += 3 // 24bits - } - - if packetSize[packet.m_headerType] > 4 { - C_AMF_EncodeInt24(headBytes[headerIdx:], int32(packet.m_nBodySize)) - headerIdx += 3 // 24bits - headBytes[headerIdx] = packet.m_packetType - headerIdx++ - } - - if packetSize[packet.m_headerType] > 8 { - n := int(C_EncodeInt32LE(headBytes[headerIdx:headerIdx+4], packet.m_nInfoField2)) - headerIdx += n - } - - if ts >= 0xffffff { - C_AMF_EncodeInt32(headBytes[headerIdx:], int32(ts)) - headerIdx += 4 // 32bits - } - - nSize := int(packet.m_nBodySize) - nChunkSize := int(r.m_outChunkSize) - - if debugMode { - if r.Link.conn != nil { - log.Printf("C_RTMP_SendPacket: %v->%v, size=%v", r.Link.conn.LocalAddr(), r.Link.conn.RemoteAddr(), nSize) - } - } - - // TODO(kortschak): Rewrite this horrific peice of premature optimisation. - // NB: RTMP wants packets in chunks which are 128 bytes by default, but the server may request a different size. - for nSize+hSize != 0 { - if nChunkSize > nSize { - nChunkSize = nSize - } - - err := C_WriteN(r, headBytes[origIdx:][:nChunkSize+hSize]) - if err != nil { - return err - } - - nSize -= nChunkSize - origIdx += nChunkSize + hSize - hSize = 0 - - if nSize > 0 { - origIdx -= 1 + cSize - hSize = 1 + cSize - - if ts >= 0xffffff { - origIdx -= 4 - hSize += 4 - } - - headBytes[origIdx] = 0xc0 | c - - if cSize != 0 { - tmp := int(packet.m_nChannel) - 64 - headBytes[origIdx+1] = byte(tmp) - - if cSize == 2 { - headBytes[origIdx+2] = byte(tmp >> 8) - } - } - if ts >= 0xffffff { - extendedTimestamp := headBytes[origIdx+1+cSize:] - C_AMF_EncodeInt32(extendedTimestamp[:4], int32(ts)) - } - } - } - - // We invoked a remote method - // TODO: port the const - if packet.m_packetType == RTMP_PACKET_TYPE_INVOKE { - buf := packet.m_body[1:] - method := C_AMF_DecodeString(buf) - - if debugMode { - log.Printf("invoking %v", method) - } - // keep it in call queue till result arrives - if queue != 0 { - buf = buf[3+len(method):] - txn := int32(C_AMF_DecodeNumber(buf[:8])) - r.m_methodCalls = append(r.m_methodCalls, C_RTMP_METHOD{name: method, num: txn}) - } - } - - if r.m_vecChannelsOut[packet.m_nChannel] == nil { - r.m_vecChannelsOut[packet.m_nChannel] = &C_RTMPPacket{} - } - *(r.m_vecChannelsOut[packet.m_nChannel]) = *packet - - return nil -} - -// void RTMP_Close(RTMP *r); -// rtmp.c +4168 -func C_RTMP_Close(r *C_RTMP) { - C_CloseInternal(r, false) -} - -// static void CloseInternal(RTMP *r, int reconnect); -// rtmp.c +4175 -func C_CloseInternal(r *C_RTMP, reconnect bool) { - var i int32 - - if C_RTMP_IsConnected(r) { - if r.m_stream_id > 0 { - i = r.m_stream_id - if r.Link.protocol&RTMP_FEATURE_WRITE != 0 { - C_SendFCUnpublish(r) - } - C_SendDeleteStream(r, float64(i)) - } - err := r.Link.conn.Close() - if err != nil && debugMode { - log.Printf("C_RTMPSockBuf_Close error: %v\n", err) - } - } - - r.m_stream_id = -1 - r.Link.conn = nil - r.m_nBWCheckCounter = 0 - r.m_nBytesIn = 0 - r.m_nBytesInSent = 0 - - r.m_write.m_nBytesRead = 0 - C_RTMPPacket_Free(&r.m_write) - - // NOTE: C frees - not using in our case - for i := 0; i < int(r.m_channelsAllocatedIn); i++ { - if r.m_vecChannelsIn[i] != nil { - r.m_vecChannelsIn[i] = nil - } - } - - //C.free(unsafe.Pointer(r.m_vecChannelsOut)) - r.m_vecChannelsOut = nil - r.m_channelsAllocatedOut = 0 - r.m_methodCalls = nil //C_AV_clear(r.m_methodCalls, r.m_numCalls) - - r.m_methodCalls = r.m_methodCalls[:0] - r.m_numInvokes = 0 - - r.m_bPlaying = false - - r.m_msgCounter = 0 - r.m_resplen = 0 - r.m_unackd = 0 - - if ((r.Link.lFlags & RTMP_LF_FTCU) != 0) && !reconnect { - r.Link.app = "" - r.Link.lFlags ^= RTMP_LF_FAPU - } - - if !reconnect { - r.Link.playpath0 = "" - } -} - -/// int RTMP_Write(RTMP* r, const char* buf, int size); -// rtmp.c +5136 -func C_RTMP_Write(r *C_RTMP, buf []byte) error { - // TODO: port RTMPPacket - var pkt = &r.m_write var enc []byte - size := len(buf) - var num int - pkt.m_nChannel = 0x04 - pkt.m_nInfoField2 = r.m_stream_id for len(buf) != 0 { - if pkt.m_nBytesRead == 0 { - if size < minDataSize { + if pkt.bytesRead == 0 { + if len(buf) < minDataSize { return errTinyPacket } @@ -1590,54 +909,52 @@ func C_RTMP_Write(r *C_RTMP, buf []byte) error { buf = buf[13:] } - pkt.m_packetType = buf[0] + pkt.packetType = buf[0] buf = buf[1:] - pkt.m_nBodySize = C_AMF_DecodeInt24(buf[:3]) + pkt.bodySize = C_AMF_DecodeInt24(buf[:3]) buf = buf[3:] - pkt.m_nTimeStamp = C_AMF_DecodeInt24(buf[:3]) + pkt.timestamp = C_AMF_DecodeInt24(buf[:3]) buf = buf[3:] - pkt.m_nTimeStamp |= uint32(buf[0]) << 24 + pkt.timestamp |= uint32(buf[0]) << 24 buf = buf[4:] - if ((pkt.m_packetType == RTMP_PACKET_TYPE_AUDIO || - pkt.m_packetType == RTMP_PACKET_TYPE_VIDEO) && - pkt.m_nTimeStamp == 0) || pkt.m_packetType == RTMP_PACKET_TYPE_INFO { - - pkt.m_headerType = RTMP_PACKET_SIZE_LARGE - - if pkt.m_packetType == RTMP_PACKET_TYPE_INFO { - pkt.m_nBodySize += 16 + headerType := uint8(RTMP_PACKET_SIZE_MEDIUM) + switch pkt.packetType { + case RTMP_PACKET_TYPE_VIDEO, RTMP_PACKET_TYPE_AUDIO: + if pkt.timestamp == 0 { + headerType = RTMP_PACKET_SIZE_LARGE } - } else { - pkt.m_headerType = RTMP_PACKET_SIZE_MEDIUM + case RTMP_PACKET_TYPE_INFO: + headerType = RTMP_PACKET_SIZE_LARGE + pkt.bodySize += 16 } - // TODO: Port this - C_RTMPPacket_Alloc(pkt, pkt.m_nBodySize) - enc = pkt.m_body[:pkt.m_nBodySize] - if pkt.m_packetType == RTMP_PACKET_TYPE_INFO { + resizePacket(&pkt, pkt.bodySize, headerType) + + enc = pkt.body[:pkt.bodySize] + if pkt.packetType == RTMP_PACKET_TYPE_INFO { enc = C_AMF_EncodeString(enc, setDataFrame) if enc == nil { return errEncoding } - pkt.m_nBytesRead = uint32(len(pkt.m_body) - len(enc)) + pkt.bytesRead = uint32(len(pkt.body) - len(enc)) } } else { - enc = pkt.m_body[:pkt.m_nBodySize][pkt.m_nBytesRead:] + enc = pkt.body[:pkt.bodySize][pkt.bytesRead:] } - num = int(pkt.m_nBodySize - pkt.m_nBytesRead) + num := int(pkt.bodySize - pkt.bytesRead) if num > len(buf) { num = len(buf) } copy(enc[:num], buf[:num]) - pkt.m_nBytesRead += uint32(num) + pkt.bytesRead += uint32(num) buf = buf[num:] - if pkt.m_nBytesRead == pkt.m_nBodySize { - err := C_RTMP_SendPacket(r, pkt, 0) - C_RTMPPacket_Free(pkt) - pkt.m_nBytesRead = 0 + if pkt.bytesRead == pkt.bodySize { + err := sendPacket(s, &pkt, false) + pkt.body = nil + pkt.bytesRead = 0 if err != nil { return err } @@ -1649,21 +966,3 @@ func C_RTMP_Write(r *C_RTMP, buf []byte) error { } return nil } - -var rtmpErrs = [...]string{ - 1: "rtmp: not connected", - 2: "rtmp: write error", - 3: "rtmp: not started", -} - -type Err uint - -func (e Err) Error() string { - if 0 <= int(e) && int(e) < len(rtmpErrs) { - s := rtmpErrs[e] - if s != "" { - return s - } - } - return "rtmp: " + strconv.Itoa(int(e)) -} diff --git a/rtmp/rtmp_headers.go b/rtmp/rtmp_headers.go index 53061517..becb48be 100644 --- a/rtmp/rtmp_headers.go +++ b/rtmp/rtmp_headers.go @@ -7,9 +7,11 @@ DESCRIPTION AUTHORS Saxon Nelson-Milton <saxon@ausocean.org> + Dan Kortschak <dan@ausocean.org> + Alan Noble <alan@ausocean.org> LICENSE - rtmp_headers.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + rtmp_headers.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the @@ -31,7 +33,9 @@ LICENSE */ package rtmp -import "net" +import ( + "net" +) const ( RTMPT_OPEN = iota @@ -40,28 +44,6 @@ const ( RTMPT_CLOSE ) -const ( - RTMP_PACKET_TYPE_CHUNK_SIZE = 0x01 - RTMP_PACKET_TYPE_BYTES_READ_REPORT = 0x03 - RTMP_PACKET_TYPE_CONTROL = 0x04 - RTMP_PACKET_TYPE_SERVER_BW = 0x05 - RTMP_PACKET_TYPE_CLIENT_BW = 0x06 - RTMP_PACKET_TYPE_AUDIO = 0x08 - RTMP_PACKET_TYPE_VIDEO = 0x09 - RTMP_PACKET_TYPE_FLEX_STREAM_SEND = 0x0F - RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10 - RTMP_PACKET_TYPE_FLEX_MESSAGE = 0x11 - RTMP_PACKET_TYPE_INFO = 0x12 - RTMP_PACKET_TYPE_INVOKE = 0x14 - RTMP_PACKET_TYPE_FLASH_VIDEO = 0x16 -) - -const ( - RTMP_PACKET_SIZE_LARGE = 0 - RTMP_PACKET_SIZE_MEDIUM = 1 - RTMP_PACKET_SIZE_SMALL = 2 - RTMP_PACKET_SIZE_MINIMUM = 3 -) const ( RTMP_READ_HEADER = 0x01 RTMP_READ_RESUME = 0x02 @@ -112,101 +94,26 @@ const ( RTMP_MAX_HEADER_SIZE = 18 ) -// typedef struct RTMPChunk -// rtmp.h +105 -type C_RTMPChunk struct { - c_headerSize int32 - c_chunk []byte - c_header [RTMP_MAX_HEADER_SIZE]byte +type link struct { + host string + playpath string + tcUrl string + swfUrl string + pageUrl string + app string + auth string + flashVer string + token string + extras C_AMFObject + lFlags int32 + swfAge int32 + protocol int32 + timeout uint + port uint16 + conn *net.TCPConn } -// typedef struct RTMPPacket -// rtmp.h +113 -type C_RTMPPacket struct { - m_headerType uint8 - m_packetType uint8 - m_hasAbsTimestamp bool - m_nChannel int32 - m_nTimeStamp uint32 - m_nInfoField2 int32 - m_nBodySize uint32 - m_nBytesRead uint32 - m_chunk *C_RTMPChunk - m_header []byte - m_body []byte -} - -// typedef struct RTMPSockBuf -// rtmp.h +127 -// DELETED: subsumed by C_RTMP_LNK - -// RTMPPacket_IsReady(a) -// rtmp.h +142 -func C_RTMPPacket_IsReady(p *C_RTMPPacket) bool { - return p.m_nBytesRead == p.m_nBodySize -} - -// typedef struct RTMP_LNK -// rtmp.h +144 -type C_RTMP_LNK struct { - host string - playpath0 string - playpath string - tcUrl string - swfUrl string - pageUrl string - app string - auth string - flashVer string - token string - extras C_AMFObject - seekTime int32 - lFlags int32 - swfAge int32 - protocol int32 - timeout uint - port uint16 - conn *net.TCPConn -} - -// typedef struct RTMPMethod -// rtmp.h +231 -type C_RTMP_METHOD struct { +type method struct { name string num int32 } - -// typedef struct RTMP -// rtmp.h +237 -type C_RTMP struct { - m_inChunkSize int32 - m_outChunkSize int32 - m_nBWCheckCounter int32 - m_nBytesIn int32 - m_nBytesInSent int32 - m_nBufferMS int32 - m_stream_id int32 - m_mediaChannel int32 - m_pausing int32 - m_nServerBW int32 - m_nClientBW int32 - m_nClientBW2 uint8 - m_bPlaying bool - m_bSendEncoding bool - m_numInvokes int32 - m_methodCalls []C_RTMP_METHOD - m_channelsAllocatedIn int32 - m_channelsAllocatedOut int32 - m_vecChannelsIn []*C_RTMPPacket - m_vecChannelsOut []*C_RTMPPacket - m_channelTimestamp []int32 - m_fAudioCodecs float64 - m_fVideoCodecs float64 - m_fEncoding float64 - m_fDuration float64 - m_msgCounter int32 - m_resplen int32 - m_unackd int32 - m_write C_RTMPPacket - Link C_RTMP_LNK -} diff --git a/rtmp/rtmp_sys.go b/rtmp/rtmp_sys.go deleted file mode 100644 index 13fdc361..00000000 --- a/rtmp/rtmp_sys.go +++ /dev/null @@ -1,7 +0,0 @@ -package rtmp - -// #define SET_RCVTIMEO(tv,s) int tv = s*1000 -// rtmp_sys.h +43 -func SET_RCVTIMEO(tv *int32, s int32) { - *tv = s * 1000 -} diff --git a/rtmp/session.go b/rtmp/session.go index ff2cbd83..6d1fad40 100644 --- a/rtmp/session.go +++ b/rtmp/session.go @@ -8,9 +8,10 @@ DESCRIPTION AUTHORS Saxon Nelson-Milton <saxon@ausocean.org> Dan Kortschak <dan@ausocean.org> + Alan Noble <alan@ausocean.org> LICENSE - session.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + session.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the @@ -36,14 +37,36 @@ import ( "errors" ) -// session provides parameters required for an rtmp communication session. +// Session holds the state for an RTMP session. type Session struct { - rtmp *C_RTMP - url string - timeout uint + url string + timeout uint + inChunkSize int32 + outChunkSize int32 + bwCheckCounter int32 + nBytesIn int32 + nBytesInSent int32 + streamID int32 + serverBW int32 + clientBW int32 + clientBW2 uint8 + isPlaying bool + sendEncoding bool + numInvokes int32 + methodCalls []method + channelsAllocatedIn int32 + channelsAllocatedOut int32 + vecChannelsIn []*packet + vecChannelsOut []*packet + channelTimestamp []int32 + audioCodecs float64 + videoCodecs float64 + encoding float64 + defered []byte + link link } -// NewSession returns a new session. +// NewSession returns a new Session. func NewSession(url string, connectTimeout uint) *Session { return &Session{ url: url, @@ -51,48 +74,98 @@ func NewSession(url string, connectTimeout uint) *Session { } } -// Open establishes an rtmp connection with the url passed into the -// constructor +// Open establishes an rtmp connection with the url passed into the constructor. func (s *Session) Open() error { - if s.rtmp != nil { + if s.isConnected() { return errors.New("rtmp: attempt to start already running session") } - var err error - s.rtmp, err = startSession(s.rtmp, s.url, s.timeout) - if s.rtmp == nil { + err := s.start() + if err != nil { return err } return nil } -// Close terminates the rtmp connection -func (s *Session) Close() error { - if s.rtmp == nil { - return Err(3) +// start does the heavylifting for Open(). +func (s *Session) start() error { + s.init() + err := setupURL(s, s.url) + if err != nil { + s.close() + return err } - ret := endSession(s.rtmp) - s.rtmp = nil - if ret != 0 { - return Err(ret) + + s.enableWrite() + err = connect(s) + if err != nil { + s.close() + return err + } + + err = connectStream(s) + if err != nil { + s.close() + return err } return nil } -// Write writes a frame (flv tag) to the rtmp connection +// init initializes various RTMP defauls. +// ToDo: define consts for the magic numbers. +func (s *Session) init() { + s.inChunkSize = RTMP_DEFAULT_CHUNKSIZE + s.outChunkSize = RTMP_DEFAULT_CHUNKSIZE + s.clientBW = 2500000 + s.clientBW2 = 2 + s.serverBW = 2500000 + s.audioCodecs = 3191.0 + s.videoCodecs = 252.0 + s.link.timeout = s.timeout + s.link.swfAge = 30 +} + +// Close terminates the rtmp connection, +func (s *Session) Close() error { + if !s.isConnected() { + return errNotConnected + } + s.close() + return nil +} + +// close does the heavylifting for Close(). +// Any errors are ignored as it is often called in response to an earlier error. +func (s *Session) close() { + if s.isConnected() { + if s.streamID > 0 { + if s.link.protocol&RTMP_FEATURE_WRITE != 0 { + sendFCUnpublish(s) + } + sendDeleteStream(s, float64(s.streamID)) + } + s.link.conn.Close() + } + *s = Session{} +} + +// Write writes a frame (flv tag) to the rtmp connection. func (s *Session) Write(data []byte) (int, error) { - if s.rtmp == nil { - return 0, Err(3) + if !s.isConnected() { + return 0, errNotConnected } - - if !C_RTMP_IsConnected(s.rtmp) { - return 0, Err(1) - } - - err := C_RTMP_Write(s.rtmp, data) + err := s.write(data) if err != nil { - //if C.RTMP_Write(s.rtmp, (*byte)(unsafe.Pointer(&data[0])), int32(len(data))) == 0 { - // TODO: propagate err - return 0, Err(2) + return 0, err } return len(data), nil } + +// isConnected returns true if the RTMP connection is up. +func (s *Session) isConnected() bool { + return s.link.conn != nil +} + +// enableWrite enables the current session for writing. +func (s *Session) enableWrite() { + s.link.protocol |= RTMP_FEATURE_WRITE +} diff --git a/rtmp/timeval_arm.go b/rtmp/timeval_arm.go deleted file mode 100644 index 8bf02ed6..00000000 --- a/rtmp/timeval_arm.go +++ /dev/null @@ -1,7 +0,0 @@ -package rtmp - -import "golang.org/x/sys/unix" - -func setTimeval(sec int) unix.Timeval { - return unix.Timeval{Sec: int32(sec)} -}