First cut at refactoring packet.write().

This commit is contained in:
scruzin 2019-01-11 00:41:28 +10:30
parent b3b1b04814
commit 82522643bb
1 changed files with 37 additions and 33 deletions

View File

@ -3,7 +3,7 @@ NAME
packet.go packet.go
DESCRIPTION DESCRIPTION
See Readme.md RTMP packet functionality.
AUTHORS AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org> Saxon Nelson-Milton <saxon@ausocean.org>
@ -284,10 +284,12 @@ func (pkt *packet) resize(size uint32, ht uint8) {
// write sends a packet. // write sends a packet.
func (pkt *packet) write(s *Session, queue bool) error { func (pkt *packet) write(s *Session, queue bool) error {
var prevPkt *packet if pkt.body == nil {
var last int return errInvalidBody
}
if pkt.channel >= s.channelsAllocatedOut { if pkt.channel >= s.channelsAllocatedOut {
s.log(DebugLevel, pkg+"growing channelsOut", "channel", pkt.channel)
n := int(pkt.channel + 10) n := int(pkt.channel + 10)
var pkts []*packet var pkts []*packet
@ -304,8 +306,9 @@ func (pkt *packet) write(s *Session, queue bool) error {
s.channelsAllocatedOut = int32(n) s.channelsAllocatedOut = int32(n)
} }
prevPkt = s.channelsOut[pkt.channel]
prevPkt := s.channelsOut[pkt.channel]
var last int
if prevPkt != nil && pkt.headerType != RTMP_PACKET_SIZE_LARGE { if prevPkt != nil && pkt.headerType != RTMP_PACKET_SIZE_LARGE {
// compress a bit by using the prev packet's attributes // compress a bit by using the prev packet's attributes
if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == RTMP_PACKET_SIZE_MEDIUM { if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == RTMP_PACKET_SIZE_MEDIUM {
@ -324,20 +327,14 @@ func (pkt *packet) write(s *Session, queue bool) error {
return errInvalidHeader return errInvalidHeader
} }
var headBytes []byte // The complete packet starts from headerSize _before_ the start the body.
var origIdx int // origIdx is the original offset, which will be 0 for a full (12-byte) header or 11 for a minimum (1-byte) header.
if pkt.body != nil { headBytes := pkt.header
// Span from -packetsize for the type to the start of the body. hSize := headerSizes[pkt.headerType]
headBytes = pkt.header origIdx := RTMP_MAX_HEADER_SIZE - hSize
origIdx = RTMP_MAX_HEADER_SIZE - headerSizes[pkt.headerType]
} else {
// Allocate a new header and allow 6 bytes of movement backward.
var hbuf [RTMP_MAX_HEADER_SIZE]byte
headBytes = hbuf[:]
origIdx = 6
}
var cSize int // adjust 1 or 2 bytes for the channel
cSize := 0
switch { switch {
case pkt.channel > 319: case pkt.channel > 319:
cSize = 2 cSize = 2
@ -345,12 +342,12 @@ func (pkt *packet) write(s *Session, queue bool) error {
cSize = 1 cSize = 1
} }
hSize := headerSizes[pkt.headerType]
if cSize != 0 { if cSize != 0 {
origIdx -= cSize origIdx -= cSize
hSize += cSize hSize += cSize
} }
// adjust 4 bytes for the timestamp
var ts uint32 var ts uint32
if prevPkt != nil { if prevPkt != nil {
ts = uint32(int(pkt.timestamp) - last) ts = uint32(int(pkt.timestamp) - last)
@ -403,8 +400,8 @@ func (pkt *packet) write(s *Session, queue bool) error {
} }
if headerSizes[pkt.headerType] > 8 { if headerSizes[pkt.headerType] > 8 {
n := int(encodeInt32LE(headBytes[headerIdx:headerIdx+4], pkt.info)) binary.LittleEndian.PutUint32(headBytes[headerIdx:headerIdx+4], uint32(pkt.info))
headerIdx += n headerIdx += 4 // 32bits
} }
if ts >= 0xffffff { if ts >= 0xffffff {
@ -415,31 +412,38 @@ func (pkt *packet) write(s *Session, queue bool) error {
size := int(pkt.bodySize) size := int(pkt.bodySize)
chunkSize := int(s.outChunkSize) chunkSize := int(s.outChunkSize)
s.log(DebugLevel, pkg+"sending packet", "size", size, "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr()) if s.deferred == nil {
// Defer sending small audio packets (at most once).
if s.deferred != nil && len(s.deferred)+size+hSize > chunkSize { if pkt.packetType == RTMP_PACKET_TYPE_AUDIO && size < chunkSize {
s.deferred = headBytes[origIdx:][:size+hSize]
s.log(DebugLevel, pkg+"deferred sending packet", "size", size, "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr())
return nil
}
} else {
// Send previously deferrd packet if combining it with the next one would exceed the chunk size.
if len(s.deferred)+size+hSize > chunkSize {
s.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(s.deferred))
err := s.write(s.deferred) err := s.write(s.deferred)
if err != nil { if err != nil {
return err return err
} }
s.deferred = nil s.deferred = nil
} }
}
// TODO(kortschak): Rewrite this horrific peice of premature optimisation. // TODO(kortschak): Rewrite this horrific peice of premature optimisation.
// NB: RTMP wants packets in chunks which are 128 bytes by default, but the server may request a different size. // NB: RTMP wants packets in chunks which are 128 bytes by default, but the server may request a different size.
s.log(DebugLevel, pkg+"sending packet", "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr(), "size", size)
for size+hSize != 0 { for size+hSize != 0 {
if s.deferred == nil && pkt.packetType == RTMP_PACKET_TYPE_AUDIO && size < chunkSize {
s.deferred = headBytes[origIdx:][:size+hSize]
s.log(DebugLevel, pkg+"deferred sending packet")
break
}
if chunkSize > size { if chunkSize > size {
chunkSize = size chunkSize = size
} }
bytes := headBytes[origIdx:][:chunkSize+hSize] bytes := headBytes[origIdx:][:chunkSize+hSize]
if s.deferred != nil { if s.deferred != nil {
// Prepend the previously deferred packet and write it with the current one. // Prepend the previously deferred packet and write it with the current one.
s.log(DebugLevel, pkg+"combining deferred packet", "size", len(s.deferred))
bytes = append(s.deferred, bytes...) bytes = append(s.deferred, bytes...)
s.deferred = nil
} }
err := s.write(bytes) err := s.write(bytes)
if err != nil { if err != nil {
@ -481,9 +485,9 @@ func (pkt *packet) write(s *Session, queue bool) error {
if pkt.packetType == RTMP_PACKET_TYPE_INVOKE { if pkt.packetType == RTMP_PACKET_TYPE_INVOKE {
buf := pkt.body[1:] buf := pkt.body[1:]
meth := C_AMF_DecodeString(buf) meth := C_AMF_DecodeString(buf)
s.log(DebugLevel, pkg+"invoking method "+meth)
// keep it in call queue till result arrives // keep it in call queue till result arrives
if queue { if queue {
s.log(DebugLevel, pkg+"queuing method "+meth)
buf = buf[3+len(meth):] buf = buf[3+len(meth):]
txn := int32(C_AMF_DecodeNumber(buf[:8])) txn := int32(C_AMF_DecodeNumber(buf[:8]))
s.methodCalls = append(s.methodCalls, method{name: meth, num: txn}) s.methodCalls = append(s.methodCalls, method{name: meth, num: txn})