/* NAME packet.go DESCRIPTION RTMP packet functionality. AUTHORS Saxon Nelson-Milton Dan Kortschak Alan Noble LICENSE packet.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. Derived from librtmp under the GNU Lesser General Public License 2.1 Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org Copyright (C) 2008-2009 Andrej Stepanchuk Copyright (C) 2009-2010 Howard Chu */ package rtmp import ( "encoding/binary" "io" "bitbucket.org/ausocean/av/rtmp/amf" ) // Packet types. const ( packetTypeChunkSize = 0x01 packetTypeBytesReadReport = 0x03 packetTypeControl = 0x04 packetTypeServerBW = 0x05 packetTypeClientBW = 0x06 packetTypeAudio = 0x08 packetTypeVideo = 0x09 packetTypeFlexStreamSend = 0x0F // not implemented packetTypeFlexSharedObject = 0x10 // not implemented packetTypeFlexMessage = 0x11 // not implemented packetTypeInfo = 0x12 packetTypeInvoke = 0x14 packetTypeFlashVideo = 0x16 // not implemented ) // Header sizes. const ( headerSizeLarge = 0 headerSizeMedium = 1 headerSizeSmall = 2 headerSizeMinimum = 3 headerSizeAuto = 4 ) // Special channels. const ( chanBytesRead = 0x02 chanControl = 0x03 chanSource = 0x04 ) // headerSizes defines header sizes for header types 0, 1, 2 and 3 respectively: // 0: full header (12 bytes) // 1: header without message ID (8 bytes) // 2: basic header + timestamp (4 byes) // 3: basic header (chunk type and stream ID) (1 byte) var headerSizes = [...]int{12, 8, 4, 1} // packet defines an RTMP packet. type packet struct { headerType uint8 packetType uint8 channel int32 hasAbsTimestamp bool timestamp uint32 info int32 bodySize uint32 bytesRead uint32 header []byte body []byte } // read reads an RTMP packet. func (pkt *packet) read(s *Session) error { var hbuf [fullHeaderSize]byte header := hbuf[:] _, err := s.read(header[:1]) if err != nil { s.log(DebugLevel, pkg+"failed to read packet header 1st byte", "error", err.Error()) if err == io.EOF { s.log(WarnLevel, pkg+"EOF error; connection likely terminated") } return err } pkt.headerType = (header[0] & 0xc0) >> 6 pkt.channel = int32(header[0] & 0x3f) header = header[1:] switch { case pkt.channel == 0: _, err = s.read(header[:1]) if err != nil { s.log(DebugLevel, pkg+"failed to read packet header 2nd byte", "error", err.Error()) return err } header = header[1:] pkt.channel = int32(header[0]) + 64 case pkt.channel == 1: _, err = s.read(header[:2]) if err != nil { s.log(DebugLevel, pkg+"failed to read packet header 3rd byte", "error", err.Error()) return err } header = header[2:] pkt.channel = int32(binary.BigEndian.Uint16(header[:2])) + 64 } if pkt.channel >= s.channelsAllocatedIn { n := pkt.channel + 10 timestamp := append(s.channelTimestamp, make([]int32, 10)...) var pkts []*packet if s.channelsIn == nil { pkts = make([]*packet, n) } else { pkts = append(s.channelsIn[:pkt.channel:pkt.channel], make([]*packet, 10)...) } s.channelTimestamp = timestamp s.channelsIn = pkts for i := int(s.channelsAllocatedIn); i < len(s.channelTimestamp); i++ { s.channelTimestamp[i] = 0 } for i := int(s.channelsAllocatedIn); i < int(n); i++ { s.channelsIn[i] = nil } s.channelsAllocatedIn = n } size := headerSizes[pkt.headerType] switch { case size == fullHeaderSize: pkt.hasAbsTimestamp = true case size < fullHeaderSize: if s.channelsIn[pkt.channel] != nil { *pkt = *(s.channelsIn[pkt.channel]) } } size-- if size > 0 { _, err = s.read(header[:size]) if err != nil { s.log(DebugLevel, pkg+"failed to read packet header", "error", err.Error()) return err } } hSize := len(hbuf) - len(header) + size if size >= 3 { pkt.timestamp = amf.DecodeInt24(header[:3]) if size >= 6 { pkt.bodySize = amf.DecodeInt24(header[3:6]) pkt.bytesRead = 0 if size > 6 { pkt.packetType = header[6] if size == 11 { pkt.info = int32(amf.DecodeInt32LE(header[7:11])) } } } } extendedTimestamp := pkt.timestamp == 0xffffff if extendedTimestamp { _, err = s.read(header[size : size+4]) if err != nil { s.log(DebugLevel, pkg+"failed to read extended timestamp", "error", err.Error()) return err } // TODO: port this pkt.timestamp = amf.DecodeInt32(header[size : size+4]) hSize += 4 } if pkt.bodySize > 0 && pkt.body == nil { pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6) } toRead := int32(pkt.bodySize - pkt.bytesRead) chunkSize := s.inChunkSize if toRead < chunkSize { chunkSize = toRead } _, err = s.read(pkt.body[pkt.bytesRead:][:chunkSize]) if err != nil { s.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error()) return err } pkt.bytesRead += uint32(chunkSize) // keep the packet as a reference for other packets on this channel if s.channelsIn[pkt.channel] == nil { s.channelsIn[pkt.channel] = &packet{} } *(s.channelsIn[pkt.channel]) = *pkt if extendedTimestamp { s.channelsIn[pkt.channel].timestamp = 0xffffff } if !pkt.hasAbsTimestamp { // timestamps seem to always be relative pkt.timestamp += uint32(s.channelTimestamp[pkt.channel]) } s.channelTimestamp[pkt.channel] = int32(pkt.timestamp) s.channelsIn[pkt.channel].body = nil s.channelsIn[pkt.channel].bytesRead = 0 s.channelsIn[pkt.channel].hasAbsTimestamp = false return nil } // resize adjusts the packet's storage to accommodate a body of the given size and header type. // When headerSizeAuto is specified, the header type is computed based on packet type. func (pkt *packet) resize(size uint32, ht uint8) { buf := make([]byte, fullHeaderSize+size) pkt.header = buf pkt.body = buf[fullHeaderSize:] if ht != headerSizeAuto { pkt.headerType = ht return } switch pkt.packetType { case packetTypeVideo, packetTypeAudio: if pkt.timestamp == 0 { pkt.headerType = headerSizeLarge } else { pkt.headerType = headerSizeMedium } case packetTypeInfo: pkt.headerType = headerSizeLarge pkt.bodySize += 16 default: pkt.headerType = headerSizeMedium } } // write sends an RTMP packet. // Packets are written in chunks which are Session.chunkSize in length (128 bytes in length). // We defers sending small audio packets and combine consecutive small audio packets where possible to reduce I/O. // When queue is true, we expect a response to this request and cache the method on s.methodCalls. func (pkt *packet) write(s *Session, queue bool) error { if pkt.body == nil { return errInvalidBody } if pkt.channel >= s.channelsAllocatedOut { s.log(DebugLevel, pkg+"growing channelsOut", "channel", pkt.channel) n := int(pkt.channel + 10) var pkts []*packet if s.channelsOut == nil { pkts = make([]*packet, n) } else { pkts = append(s.channelsOut[:pkt.channel:pkt.channel], make([]*packet, 10)...) } s.channelsOut = pkts for i := int(s.channelsAllocatedOut); i < n; i++ { s.channelsOut[i] = nil } s.channelsAllocatedOut = int32(n) } prevPkt := s.channelsOut[pkt.channel] var last int if prevPkt != nil && pkt.headerType != headerSizeLarge { // compress a bit by using the prev packet's attributes if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == headerSizeMedium { pkt.headerType = headerSizeSmall } if prevPkt.timestamp == pkt.timestamp && pkt.headerType == headerSizeSmall { pkt.headerType = headerSizeMinimum } last = int(prevPkt.timestamp) } if pkt.headerType > 3 { s.log(WarnLevel, pkg+"unexpected header type", "type", pkt.headerType) return errInvalidHeader } // The complete packet starts from headerSize _before_ the start the body. // origIdx is the original offset, which will be 0 for a full (12-byte) header or 11 for a minimum (1-byte) header. headBytes := pkt.header hSize := headerSizes[pkt.headerType] origIdx := fullHeaderSize - hSize // adjust 1 or 2 bytes for the channel cSize := 0 switch { case pkt.channel > 319: cSize = 2 case pkt.channel > 63: cSize = 1 } if cSize != 0 { origIdx -= cSize hSize += cSize } // adjust 4 bytes for the timestamp var ts uint32 if prevPkt != nil { ts = uint32(int(pkt.timestamp) - last) } if ts >= 0xffffff { origIdx -= 4 hSize += 4 s.log(DebugLevel, pkg+"larger timestamp than 24 bits", "timestamp", ts) } headerIdx := origIdx c := pkt.headerType << 6 switch cSize { case 0: c |= byte(pkt.channel) case 1: // Do nothing. case 2: c |= 1 } headBytes[headerIdx] = c headerIdx++ if cSize != 0 { tmp := pkt.channel - 64 headBytes[headerIdx] = byte(tmp & 0xff) headerIdx++ if cSize == 2 { headBytes[headerIdx] = byte(tmp >> 8) headerIdx++ } } if headerSizes[pkt.headerType] > 1 { res := ts if ts > 0xffffff { res = 0xffffff } amf.EncodeInt24(headBytes[headerIdx:], int32(res)) headerIdx += 3 // 24bits } if headerSizes[pkt.headerType] > 4 { amf.EncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize)) headerIdx += 3 // 24bits headBytes[headerIdx] = pkt.packetType headerIdx++ } if headerSizes[pkt.headerType] > 8 { binary.LittleEndian.PutUint32(headBytes[headerIdx:headerIdx+4], uint32(pkt.info)) headerIdx += 4 // 32bits } if ts >= 0xffffff { amf.EncodeInt32(headBytes[headerIdx:], int32(ts)) headerIdx += 4 // 32bits } size := int(pkt.bodySize) chunkSize := int(s.outChunkSize) if s.deferred == nil { // Defer sending small audio packets (at most once). if pkt.packetType == packetTypeAudio && size < chunkSize { s.deferred = headBytes[origIdx:][:size+hSize] s.log(DebugLevel, pkg+"deferred sending packet", "size", size, "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr()) return nil } } else { // Send previously deferrd packet if combining it with the next one would exceed the chunk size. if len(s.deferred)+size+hSize > chunkSize { s.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(s.deferred)) _, err := s.write(s.deferred) if err != nil { return err } s.deferred = nil } } // TODO(kortschak): Rewrite this horrific peice of premature optimisation. s.log(DebugLevel, pkg+"sending packet", "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr(), "size", size) for size+hSize != 0 { if chunkSize > size { chunkSize = size } bytes := headBytes[origIdx:][:chunkSize+hSize] if s.deferred != nil { // Prepend the previously deferred packet and write it with the current one. s.log(DebugLevel, pkg+"combining deferred packet", "size", len(s.deferred)) bytes = append(s.deferred, bytes...) } _, err := s.write(bytes) if err != nil { return err } s.deferred = nil size -= chunkSize origIdx += chunkSize + hSize hSize = 0 if size > 0 { // We are writing the 2nd or subsequent chunk. origIdx -= 1 + cSize hSize = 1 + cSize if ts >= 0xffffff { origIdx -= 4 hSize += 4 } headBytes[origIdx] = 0xc0 | c if cSize != 0 { tmp := int(pkt.channel) - 64 headBytes[origIdx+1] = byte(tmp) if cSize == 2 { headBytes[origIdx+2] = byte(tmp >> 8) } } if ts >= 0xffffff { extendedTimestamp := headBytes[origIdx+1+cSize:] amf.EncodeInt32(extendedTimestamp[:4], int32(ts)) } } } // We invoked a remote method, if pkt.packetType == packetTypeInvoke { buf := pkt.body[1:] meth := amf.DecodeString(buf) s.log(DebugLevel, pkg+"invoking method "+meth) // keep it in call queue till result arrives if queue { buf = buf[3+len(meth):] txn := int32(amf.DecodeNumber(buf[:8])) s.methodCalls = append(s.methodCalls, method{name: meth, num: txn}) } } if s.channelsOut[pkt.channel] == nil { s.channelsOut[pkt.channel] = &packet{} } *(s.channelsOut[pkt.channel]) = *pkt return nil }