protocol/rtmp: fixing handling of large packet sizes

This commit is contained in:
Saxon Nelson-Milton 2022-03-18 13:55:09 +10:30
parent dd6adbf9d0
commit aacb4ca3b3
2 changed files with 46 additions and 16 deletions

View File

@ -91,10 +91,15 @@ type packet struct {
timestamp uint32 timestamp uint32
streamID uint32 streamID uint32
bodySize uint32 bodySize uint32
bytesRead uint32
buf []byte buf []byte
body []byte body []byte
} }
func (pkt *packet) isReady() bool {
return pkt.bytesRead == pkt.bodySize
}
// readFrom reads a packet from the RTMP connection. // readFrom reads a packet from the RTMP connection.
func (pkt *packet) readFrom(c *Conn) error { func (pkt *packet) readFrom(c *Conn) error {
var hbuf [fullHeaderSize]byte var hbuf [fullHeaderSize]byte
@ -177,6 +182,7 @@ func (pkt *packet) readFrom(c *Conn) error {
if size >= 3 { if size >= 3 {
pkt.timestamp = amf.DecodeInt24(header[:3]) pkt.timestamp = amf.DecodeInt24(header[:3])
pkt.bytesRead = 0
if size >= 6 { if size >= 6 {
pkt.bodySize = amf.DecodeInt24(header[3:6]) pkt.bodySize = amf.DecodeInt24(header[3:6])
@ -206,12 +212,24 @@ func (pkt *packet) readFrom(c *Conn) error {
c.log(WarnLevel, pkg+"reading large packet", "size", int(pkt.bodySize)) c.log(WarnLevel, pkg+"reading large packet", "size", int(pkt.bodySize))
} }
_, err = c.read(pkt.body[:pkt.bodySize]) nToRead := pkt.bodySize - pkt.bytesRead
nChunk := c.inChunkSize
if nToRead < nChunk {
nChunk = nToRead
}
n, err := c.read(pkt.body[pkt.bytesRead : pkt.bytesRead+nChunk])
if err != nil { if err != nil {
c.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error()) c.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error())
return fmt.Errorf("failed to read packet body: %w", err) return fmt.Errorf("failed to read packet body: %w", err)
} }
if uint32(n) != nChunk {
return fmt.Errorf("did not read correct number of bytes, read: %d, expected: %d", n, nChunk)
}
pkt.bytesRead += nChunk
// Keep the packet as a reference for other packets on this channel. // Keep the packet as a reference for other packets on this channel.
if c.channelsIn[pkt.channel] == nil { if c.channelsIn[pkt.channel] == nil {
c.channelsIn[pkt.channel] = &packet{} c.channelsIn[pkt.channel] = &packet{}
@ -222,6 +240,7 @@ func (pkt *packet) readFrom(c *Conn) error {
c.channelsIn[pkt.channel].timestamp = 0xffffff c.channelsIn[pkt.channel].timestamp = 0xffffff
} }
if pkt.isReady() {
if !pkt.hasAbsTimestamp { if !pkt.hasAbsTimestamp {
// Timestamps seem to always be relative. // Timestamps seem to always be relative.
pkt.timestamp += uint32(c.channelTimestamp[pkt.channel]) pkt.timestamp += uint32(c.channelTimestamp[pkt.channel])
@ -230,6 +249,8 @@ func (pkt *packet) readFrom(c *Conn) error {
c.channelsIn[pkt.channel].body = nil c.channelsIn[pkt.channel].body = nil
c.channelsIn[pkt.channel].hasAbsTimestamp = false c.channelsIn[pkt.channel].hasAbsTimestamp = false
}
return nil return nil
} }

View File

@ -197,13 +197,18 @@ func connect(c *Conn) error {
c.log(DebugLevel, pkg+"negotiating") c.log(DebugLevel, pkg+"negotiating")
var buf [256]byte var buf [256]byte
for !c.isPlaying {
pkt := packet{buf: buf[:]} pkt := packet{buf: buf[:]}
for !c.isPlaying {
err = pkt.readFrom(c) err = pkt.readFrom(c)
if err != nil { if err != nil {
return fmt.Errorf("could not read from packet: %w", err) return fmt.Errorf("could not read from packet: %w", err)
} }
if pkt.isReady() {
if pkt.bodySize == 0 {
continue
}
switch pkt.packetType { switch pkt.packetType {
case packetTypeAudio, packetTypeVideo, packetTypeInfo: case packetTypeAudio, packetTypeVideo, packetTypeInfo:
c.log(WarnLevel, pkg+"got packet before play; ignoring", "type", pkt.packetType) c.log(WarnLevel, pkg+"got packet before play; ignoring", "type", pkt.packetType)
@ -213,6 +218,10 @@ func connect(c *Conn) error {
return fmt.Errorf("could not handle packet: %w", err) return fmt.Errorf("could not handle packet: %w", err)
} }
} }
pkt = packet{buf: buf[:]}
}
} }
return nil return nil
} }