/* NAME packet.go DESCRIPTION See Readme.md AUTHORS Saxon Nelson-Milton Dan Kortschak Alan Noble LICENSE packet.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. Derived from librtmp under the GNU Lesser General Public License 2.1 Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org Copyright (C) 2008-2009 Andrej Stepanchuk Copyright (C) 2009-2010 Howard Chu */ package rtmp import ( "encoding/binary" "io" ) const ( RTMP_PACKET_TYPE_CHUNK_SIZE = 0x01 RTMP_PACKET_TYPE_BYTES_READ_REPORT = 0x03 RTMP_PACKET_TYPE_CONTROL = 0x04 RTMP_PACKET_TYPE_SERVER_BW = 0x05 RTMP_PACKET_TYPE_CLIENT_BW = 0x06 RTMP_PACKET_TYPE_AUDIO = 0x08 RTMP_PACKET_TYPE_VIDEO = 0x09 RTMP_PACKET_TYPE_FLEX_STREAM_SEND = 0x0F RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10 RTMP_PACKET_TYPE_FLEX_MESSAGE = 0x11 RTMP_PACKET_TYPE_INFO = 0x12 RTMP_PACKET_TYPE_INVOKE = 0x14 RTMP_PACKET_TYPE_FLASH_VIDEO = 0x16 ) const ( RTMP_PACKET_SIZE_LARGE = 0 RTMP_PACKET_SIZE_MEDIUM = 1 RTMP_PACKET_SIZE_SMALL = 2 RTMP_PACKET_SIZE_MINIMUM = 3 RTMP_PACKET_SIZE_AUTO = 4 ) const ( RTMP_CHANNEL_BYTES_READ = 0x02 RTMP_CHANNEL_CONTROL = 0x03 RTMP_CHANNEL_SOURCE = 0x04 ) // headerSizes defines header sizes for header types 0, 1, 2 and 3 respectively: // 0: full header (12 bytes) // 1: header without message ID (8 bytes) // 2: basic header + timestamp (4 byes) // 3: basic header (chunk type and stream ID) (1 byte) var headerSizes = [...]int{12, 8, 4, 1} // packet defines an RTMP packet. type packet struct { headerType uint8 packetType uint8 channel int32 hasAbsTimestamp bool timestamp uint32 info int32 bodySize uint32 bytesRead uint32 chunk *chunk header []byte body []byte } // chunk defines an RTMP packet chunk. type chunk struct { headerSize int32 data []byte header [RTMP_MAX_HEADER_SIZE]byte } // ToDo: Consider making the following functions into methods. // readPacket reads a packet. func readPacket(s *Session, pkt *packet) error { var hbuf [RTMP_MAX_HEADER_SIZE]byte header := hbuf[:] err := s.read(header[:1]) if err != nil { s.log(DebugLevel, pkg+"failed to read packet header 1st byte", "error", err.Error()) if err == io.EOF { s.log(WarnLevel, pkg+"EOF error; connection likely terminated") } return err } pkt.headerType = (header[0] & 0xc0) >> 6 pkt.channel = int32(header[0] & 0x3f) header = header[1:] switch { case pkt.channel == 0: err = s.read(header[:1]) if err != nil { s.log(DebugLevel, pkg+"failed to read packet header 2nd byte", "error", err.Error()) return err } header = header[1:] pkt.channel = int32(header[0]) + 64 case pkt.channel == 1: err = s.read(header[:2]) if err != nil { s.log(DebugLevel, pkg+"failed to read packet header 3rd byte", "error", err.Error()) return err } header = header[2:] pkt.channel = int32(binary.BigEndian.Uint16(header[:2])) + 64 } if pkt.channel >= s.channelsAllocatedIn { n := pkt.channel + 10 timestamp := append(s.channelTimestamp, make([]int32, 10)...) var pkts []*packet if s.channelsIn == nil { pkts = make([]*packet, n) } else { pkts = append(s.channelsIn[:pkt.channel:pkt.channel], make([]*packet, 10)...) } s.channelTimestamp = timestamp s.channelsIn = pkts for i := int(s.channelsAllocatedIn); i < len(s.channelTimestamp); i++ { s.channelTimestamp[i] = 0 } for i := int(s.channelsAllocatedIn); i < int(n); i++ { s.channelsIn[i] = nil } s.channelsAllocatedIn = n } size := headerSizes[pkt.headerType] switch { case size == RTMP_LARGE_HEADER_SIZE: pkt.hasAbsTimestamp = true case size < RTMP_LARGE_HEADER_SIZE: if s.channelsIn[pkt.channel] != nil { *pkt = *(s.channelsIn[pkt.channel]) } } size-- if size > 0 { err = s.read(header[:size]) if err != nil { s.log(DebugLevel, pkg+"failed to read packet header", "error", err.Error()) return err } } hSize := len(hbuf) - len(header) + size if size >= 3 { pkt.timestamp = C_AMF_DecodeInt24(header[:3]) if size >= 6 { pkt.bodySize = C_AMF_DecodeInt24(header[3:6]) pkt.bytesRead = 0 if size > 6 { pkt.packetType = header[6] if size == 11 { pkt.info = decodeInt32LE(header[7:11]) } } } } extendedTimestamp := pkt.timestamp == 0xffffff if extendedTimestamp { err = s.read(header[size : size+4]) if err != nil { s.log(DebugLevel, pkg+"failed to read extended timestamp", "error", err.Error()) return err } // TODO: port this pkt.timestamp = C_AMF_DecodeInt32(header[size : size+4]) hSize += 4 } if pkt.bodySize > 0 && pkt.body == nil { resizePacket(pkt, pkt.bodySize, (hbuf[0]&0xc0)>>6) } toRead := int32(pkt.bodySize - pkt.bytesRead) chunkSize := s.inChunkSize if toRead < chunkSize { chunkSize = toRead } if pkt.chunk != nil { pkt.chunk.headerSize = int32(hSize) copy(pkt.chunk.header[:], hbuf[:hSize]) pkt.chunk.data = pkt.body[pkt.bytesRead : pkt.bytesRead+uint32(chunkSize)] } err = s.read(pkt.body[pkt.bytesRead:][:chunkSize]) if err != nil { s.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error()) return err } pkt.bytesRead += uint32(chunkSize) // keep the packet as ref for other packets on this channel if s.channelsIn[pkt.channel] == nil { s.channelsIn[pkt.channel] = &packet{} } *(s.channelsIn[pkt.channel]) = *pkt if extendedTimestamp { s.channelsIn[pkt.channel].timestamp = 0xffffff } if pkt.bytesRead != pkt.bodySize { panic("readPacket: bytesRead != bodySize") } if !pkt.hasAbsTimestamp { // timestamps seem to always be relative pkt.timestamp += uint32(s.channelTimestamp[pkt.channel]) } s.channelTimestamp[pkt.channel] = int32(pkt.timestamp) s.channelsIn[pkt.channel].body = nil s.channelsIn[pkt.channel].bytesRead = 0 s.channelsIn[pkt.channel].hasAbsTimestamp = false return nil } // resizePacket adjusts the packet's storage to accommodate a body of the given size. func resizePacket(pkt *packet, size uint32, ht uint8) { buf := make([]byte, RTMP_MAX_HEADER_SIZE+size) pkt.header = buf pkt.body = buf[RTMP_MAX_HEADER_SIZE:] if ht != RTMP_PACKET_SIZE_AUTO { pkt.headerType = ht return } switch pkt.packetType { case RTMP_PACKET_TYPE_VIDEO, RTMP_PACKET_TYPE_AUDIO: if pkt.timestamp == 0 { pkt.headerType = RTMP_PACKET_SIZE_LARGE } else { pkt.headerType = RTMP_PACKET_SIZE_MEDIUM } case RTMP_PACKET_TYPE_INFO: pkt.headerType = RTMP_PACKET_SIZE_LARGE pkt.bodySize += 16 default: pkt.headerType = RTMP_PACKET_SIZE_MEDIUM } } // sendPacket sends a packet. func sendPacket(s *Session, pkt *packet, queue bool) error { var prevPkt *packet var last int if pkt.channel >= s.channelsAllocatedOut { n := int(pkt.channel + 10) var pkts []*packet if s.channelsOut == nil { pkts = make([]*packet, n) } else { pkts = append(s.channelsOut[:pkt.channel:pkt.channel], make([]*packet, 10)...) } s.channelsOut = pkts for i := int(s.channelsAllocatedOut); i < n; i++ { s.channelsOut[i] = nil } s.channelsAllocatedOut = int32(n) } prevPkt = s.channelsOut[pkt.channel] if prevPkt != nil && pkt.headerType != RTMP_PACKET_SIZE_LARGE { // compress a bit by using the prev packet's attributes if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == RTMP_PACKET_SIZE_MEDIUM { pkt.headerType = RTMP_PACKET_SIZE_SMALL } if prevPkt.timestamp == pkt.timestamp && pkt.headerType == RTMP_PACKET_SIZE_SMALL { pkt.headerType = RTMP_PACKET_SIZE_MINIMUM } last = int(prevPkt.timestamp) } if pkt.headerType > 3 { s.log(WarnLevel, pkg+"unexpected header type", "type", pkt.headerType) return errInvalidHeader } var headBytes []byte var origIdx int if pkt.body != nil { // Span from -packetsize for the type to the start of the body. headBytes = pkt.header origIdx = RTMP_MAX_HEADER_SIZE - headerSizes[pkt.headerType] } else { // Allocate a new header and allow 6 bytes of movement backward. var hbuf [RTMP_MAX_HEADER_SIZE]byte headBytes = hbuf[:] origIdx = 6 } var cSize int switch { case pkt.channel > 319: cSize = 2 case pkt.channel > 63: cSize = 1 } hSize := headerSizes[pkt.headerType] if cSize != 0 { origIdx -= cSize hSize += cSize } var ts uint32 if prevPkt != nil { ts = uint32(int(pkt.timestamp) - last) } if ts >= 0xffffff { origIdx -= 4 hSize += 4 s.log(DebugLevel, pkg+"larger timestamp than 24 bits", "timestamp", ts) } headerIdx := origIdx c := pkt.headerType << 6 switch cSize { case 0: c |= byte(pkt.channel) case 1: // Do nothing. case 2: c |= 1 } headBytes[headerIdx] = c headerIdx++ if cSize != 0 { tmp := pkt.channel - 64 headBytes[headerIdx] = byte(tmp & 0xff) headerIdx++ if cSize == 2 { headBytes[headerIdx] = byte(tmp >> 8) headerIdx++ } } if headerSizes[pkt.headerType] > 1 { res := ts if ts > 0xffffff { res = 0xffffff } C_AMF_EncodeInt24(headBytes[headerIdx:], int32(res)) headerIdx += 3 // 24bits } if headerSizes[pkt.headerType] > 4 { C_AMF_EncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize)) headerIdx += 3 // 24bits headBytes[headerIdx] = pkt.packetType headerIdx++ } if headerSizes[pkt.headerType] > 8 { n := int(encodeInt32LE(headBytes[headerIdx:headerIdx+4], pkt.info)) headerIdx += n } if ts >= 0xffffff { C_AMF_EncodeInt32(headBytes[headerIdx:], int32(ts)) headerIdx += 4 // 32bits } size := int(pkt.bodySize) chunkSize := int(s.outChunkSize) s.log(DebugLevel, pkg+"sending packet", "size", size, "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr()) if s.deferred != nil && len(s.deferred)+size+hSize > chunkSize { err := s.write(s.deferred) if err != nil { return err } s.deferred = nil } // TODO(kortschak): Rewrite this horrific peice of premature optimisation. // NB: RTMP wants packets in chunks which are 128 bytes by default, but the server may request a different size. for size+hSize != 0 { if s.deferred == nil && pkt.packetType == RTMP_PACKET_TYPE_AUDIO && size < chunkSize { s.deferred = headBytes[origIdx:][:size+hSize] s.log(DebugLevel, pkg+"deferred sending packet") break } if chunkSize > size { chunkSize = size } bytes := headBytes[origIdx:][:chunkSize+hSize] if s.deferred != nil { // Prepend the previously deferred packet and write it with the current one. bytes = append(s.deferred, bytes...) } err := s.write(bytes) if err != nil { return err } s.deferred = nil size -= chunkSize origIdx += chunkSize + hSize hSize = 0 if size > 0 { origIdx -= 1 + cSize hSize = 1 + cSize if ts >= 0xffffff { origIdx -= 4 hSize += 4 } headBytes[origIdx] = 0xc0 | c if cSize != 0 { tmp := int(pkt.channel) - 64 headBytes[origIdx+1] = byte(tmp) if cSize == 2 { headBytes[origIdx+2] = byte(tmp >> 8) } } if ts >= 0xffffff { extendedTimestamp := headBytes[origIdx+1+cSize:] C_AMF_EncodeInt32(extendedTimestamp[:4], int32(ts)) } } } // We invoked a remote method if pkt.packetType == RTMP_PACKET_TYPE_INVOKE { buf := pkt.body[1:] meth := C_AMF_DecodeString(buf) // keep it in call queue till result arrives if queue { s.log(DebugLevel, pkg+"queuing method "+meth) buf = buf[3+len(meth):] txn := int32(C_AMF_DecodeNumber(buf[:8])) s.methodCalls = append(s.methodCalls, method{name: meth, num: txn}) } } if s.channelsOut[pkt.channel] == nil { s.channelsOut[pkt.channel] = &packet{} } *(s.channelsOut[pkt.channel]) = *pkt return nil } func decodeInt32LE(data []byte) int32 { return int32(data[3])<<24 | int32(data[2])<<16 | int32(data[1])<<8 | int32(data[0]) } func encodeInt32LE(dst []byte, v int32) int32 { binary.LittleEndian.PutUint32(dst, uint32(v)) return 4 }