diff --git a/rtmp/packet.go b/rtmp/packet.go index 99c9e029..4fc2141b 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -3,7 +3,7 @@ NAME packet.go DESCRIPTION - See Readme.md + RTMP packet functionality. AUTHORS Saxon Nelson-Milton @@ -284,10 +284,12 @@ func (pkt *packet) resize(size uint32, ht uint8) { // write sends a packet. func (pkt *packet) write(s *Session, queue bool) error { - var prevPkt *packet - var last int + if pkt.body == nil { + return errInvalidBody + } if pkt.channel >= s.channelsAllocatedOut { + s.log(DebugLevel, pkg+"growing channelsOut", "channel", pkt.channel) n := int(pkt.channel + 10) var pkts []*packet @@ -304,8 +306,9 @@ func (pkt *packet) write(s *Session, queue bool) error { s.channelsAllocatedOut = int32(n) } - prevPkt = s.channelsOut[pkt.channel] + prevPkt := s.channelsOut[pkt.channel] + var last int if prevPkt != nil && pkt.headerType != RTMP_PACKET_SIZE_LARGE { // compress a bit by using the prev packet's attributes if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == RTMP_PACKET_SIZE_MEDIUM { @@ -324,20 +327,14 @@ func (pkt *packet) write(s *Session, queue bool) error { return errInvalidHeader } - var headBytes []byte - var origIdx int - if pkt.body != nil { - // Span from -packetsize for the type to the start of the body. - headBytes = pkt.header - origIdx = RTMP_MAX_HEADER_SIZE - headerSizes[pkt.headerType] - } else { - // Allocate a new header and allow 6 bytes of movement backward. - var hbuf [RTMP_MAX_HEADER_SIZE]byte - headBytes = hbuf[:] - origIdx = 6 - } + // The complete packet starts from headerSize _before_ the start the body. + // origIdx is the original offset, which will be 0 for a full (12-byte) header or 11 for a minimum (1-byte) header. + headBytes := pkt.header + hSize := headerSizes[pkt.headerType] + origIdx := RTMP_MAX_HEADER_SIZE - hSize - var cSize int + // adjust 1 or 2 bytes for the channel + cSize := 0 switch { case pkt.channel > 319: cSize = 2 @@ -345,12 +342,12 @@ func (pkt *packet) write(s *Session, queue bool) error { cSize = 1 } - hSize := headerSizes[pkt.headerType] if cSize != 0 { origIdx -= cSize hSize += cSize } + // adjust 4 bytes for the timestamp var ts uint32 if prevPkt != nil { ts = uint32(int(pkt.timestamp) - last) @@ -403,8 +400,8 @@ func (pkt *packet) write(s *Session, queue bool) error { } if headerSizes[pkt.headerType] > 8 { - n := int(encodeInt32LE(headBytes[headerIdx:headerIdx+4], pkt.info)) - headerIdx += n + binary.LittleEndian.PutUint32(headBytes[headerIdx:headerIdx+4], uint32(pkt.info)) + headerIdx += 4 // 32bits } if ts >= 0xffffff { @@ -415,31 +412,38 @@ func (pkt *packet) write(s *Session, queue bool) error { size := int(pkt.bodySize) chunkSize := int(s.outChunkSize) - s.log(DebugLevel, pkg+"sending packet", "size", size, "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr()) - - if s.deferred != nil && len(s.deferred)+size+hSize > chunkSize { - err := s.write(s.deferred) - if err != nil { - return err + if s.deferred == nil { + // Defer sending small audio packets (at most once). + if pkt.packetType == RTMP_PACKET_TYPE_AUDIO && size < chunkSize { + s.deferred = headBytes[origIdx:][:size+hSize] + s.log(DebugLevel, pkg+"deferred sending packet", "size", size, "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr()) + return nil + } + } else { + // Send previously deferrd packet if combining it with the next one would exceed the chunk size. + if len(s.deferred)+size+hSize > chunkSize { + s.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(s.deferred)) + err := s.write(s.deferred) + if err != nil { + return err + } + s.deferred = nil } - s.deferred = nil } // TODO(kortschak): Rewrite this horrific peice of premature optimisation. // NB: RTMP wants packets in chunks which are 128 bytes by default, but the server may request a different size. + s.log(DebugLevel, pkg+"sending packet", "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr(), "size", size) for size+hSize != 0 { - if s.deferred == nil && pkt.packetType == RTMP_PACKET_TYPE_AUDIO && size < chunkSize { - s.deferred = headBytes[origIdx:][:size+hSize] - s.log(DebugLevel, pkg+"deferred sending packet") - break - } if chunkSize > size { chunkSize = size } bytes := headBytes[origIdx:][:chunkSize+hSize] if s.deferred != nil { // Prepend the previously deferred packet and write it with the current one. + s.log(DebugLevel, pkg+"combining deferred packet", "size", len(s.deferred)) bytes = append(s.deferred, bytes...) + s.deferred = nil } err := s.write(bytes) if err != nil { @@ -481,9 +485,9 @@ func (pkt *packet) write(s *Session, queue bool) error { if pkt.packetType == RTMP_PACKET_TYPE_INVOKE { buf := pkt.body[1:] meth := C_AMF_DecodeString(buf) + s.log(DebugLevel, pkg+"invoking method "+meth) // keep it in call queue till result arrives if queue { - s.log(DebugLevel, pkg+"queuing method "+meth) buf = buf[3+len(meth):] txn := int32(C_AMF_DecodeNumber(buf[:8])) s.methodCalls = append(s.methodCalls, method{name: meth, num: txn})