diff --git a/rtmp/packet.go b/rtmp/packet.go index b88b15bc..73cc0088 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -96,15 +96,15 @@ type packet struct { } // readFrom reads a packet from the RTMP connection. -func (pkt *packet) readFrom(s *Session) error { +func (pkt *packet) readFrom(c *Conn) error { var hbuf [fullHeaderSize]byte header := hbuf[:] - _, err := s.read(header[:1]) + _, err := c.read(header[:1]) if err != nil { - s.log(DebugLevel, pkg+"failed to read packet header 1st byte", "error", err.Error()) + c.log(DebugLevel, pkg+"failed to read packet header 1st byte", "error", err.Error()) if err == io.EOF { - s.log(WarnLevel, pkg+"EOF error; connection likely terminated") + c.log(WarnLevel, pkg+"EOF error; connection likely terminated") } return err } @@ -114,45 +114,45 @@ func (pkt *packet) readFrom(s *Session) error { switch { case pkt.channel == 0: - _, err = s.read(header[:1]) + _, err = c.read(header[:1]) if err != nil { - s.log(DebugLevel, pkg+"failed to read packet header 2nd byte", "error", err.Error()) + c.log(DebugLevel, pkg+"failed to read packet header 2nd byte", "error", err.Error()) return err } header = header[1:] pkt.channel = int32(header[0]) + 64 case pkt.channel == 1: - _, err = s.read(header[:2]) + _, err = c.read(header[:2]) if err != nil { - s.log(DebugLevel, pkg+"failed to read packet header 3rd byte", "error", err.Error()) + c.log(DebugLevel, pkg+"failed to read packet header 3rd byte", "error", err.Error()) return err } header = header[2:] pkt.channel = int32(binary.BigEndian.Uint16(header[:2])) + 64 } - if pkt.channel >= s.channelsAllocatedIn { + if pkt.channel >= c.channelsAllocatedIn { n := pkt.channel + 10 - timestamp := append(s.channelTimestamp, make([]int32, 10)...) + timestamp := append(c.channelTimestamp, make([]int32, 10)...) var pkts []*packet - if s.channelsIn == nil { + if c.channelsIn == nil { pkts = make([]*packet, n) } else { - pkts = append(s.channelsIn[:pkt.channel:pkt.channel], make([]*packet, 10)...) + pkts = append(c.channelsIn[:pkt.channel:pkt.channel], make([]*packet, 10)...) } - s.channelTimestamp = timestamp - s.channelsIn = pkts + c.channelTimestamp = timestamp + c.channelsIn = pkts - for i := int(s.channelsAllocatedIn); i < len(s.channelTimestamp); i++ { - s.channelTimestamp[i] = 0 + for i := int(c.channelsAllocatedIn); i < len(c.channelTimestamp); i++ { + c.channelTimestamp[i] = 0 } - for i := int(s.channelsAllocatedIn); i < int(n); i++ { - s.channelsIn[i] = nil + for i := int(c.channelsAllocatedIn); i < int(n); i++ { + c.channelsIn[i] = nil } - s.channelsAllocatedIn = n + c.channelsAllocatedIn = n } size := headerSizes[pkt.headerType] @@ -160,16 +160,16 @@ func (pkt *packet) readFrom(s *Session) error { case size == fullHeaderSize: pkt.hasAbsTimestamp = true case size < fullHeaderSize: - if s.channelsIn[pkt.channel] != nil { - *pkt = *(s.channelsIn[pkt.channel]) + if c.channelsIn[pkt.channel] != nil { + *pkt = *(c.channelsIn[pkt.channel]) } } size-- if size > 0 { - _, err = s.read(header[:size]) + _, err = c.read(header[:size]) if err != nil { - s.log(DebugLevel, pkg+"failed to read packet header", "error", err.Error()) + c.log(DebugLevel, pkg+"failed to read packet header", "error", err.Error()) return err } } @@ -192,9 +192,9 @@ func (pkt *packet) readFrom(s *Session) error { extendedTimestamp := pkt.timestamp == 0xffffff if extendedTimestamp { - _, err = s.read(header[size : size+4]) + _, err = c.read(header[size : size+4]) if err != nil { - s.log(DebugLevel, pkg+"failed to read extended timestamp", "error", err.Error()) + c.log(DebugLevel, pkg+"failed to read extended timestamp", "error", err.Error()) return err } pkt.timestamp = amf.DecodeInt32(header[size : size+4]) @@ -206,39 +206,39 @@ func (pkt *packet) readFrom(s *Session) error { } toRead := pkt.bodySize - pkt.bytesRead - chunkSize := s.inChunkSize + chunkSize := c.inChunkSize if toRead < chunkSize { chunkSize = toRead } - _, err = s.read(pkt.body[pkt.bytesRead:][:chunkSize]) + _, err = c.read(pkt.body[pkt.bytesRead:][:chunkSize]) if err != nil { - s.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error()) + c.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error()) return err } pkt.bytesRead += uint32(chunkSize) // Keep the packet as a reference for other packets on this channel. - if s.channelsIn[pkt.channel] == nil { - s.channelsIn[pkt.channel] = &packet{} + if c.channelsIn[pkt.channel] == nil { + c.channelsIn[pkt.channel] = &packet{} } - *(s.channelsIn[pkt.channel]) = *pkt + *(c.channelsIn[pkt.channel]) = *pkt if extendedTimestamp { - s.channelsIn[pkt.channel].timestamp = 0xffffff + c.channelsIn[pkt.channel].timestamp = 0xffffff } if !pkt.hasAbsTimestamp { // Timestamps seem to always be relative. - pkt.timestamp += uint32(s.channelTimestamp[pkt.channel]) + pkt.timestamp += uint32(c.channelTimestamp[pkt.channel]) } - s.channelTimestamp[pkt.channel] = int32(pkt.timestamp) + c.channelTimestamp[pkt.channel] = int32(pkt.timestamp) - s.channelsIn[pkt.channel].body = nil - s.channelsIn[pkt.channel].bytesRead = 0 - s.channelsIn[pkt.channel].hasAbsTimestamp = false + c.channelsIn[pkt.channel].body = nil + c.channelsIn[pkt.channel].bytesRead = 0 + c.channelsIn[pkt.channel].hasAbsTimestamp = false return nil } @@ -270,35 +270,35 @@ func (pkt *packet) resize(size uint32, ht uint8) { // writeTo writes a packet to the RTMP connection. // Packets are written in chunks which are Session.chunkSize in length (128 bytes in length). // We defer sending small audio packets and combine consecutive small audio packets where possible to reduce I/O. -// When queue is true, we expect a response to this request and cache the method on s.methodCalls. -func (pkt *packet) writeTo(s *Session, queue bool) error { +// When queue is true, we expect a response to this request and cache the method on c.methodCallc. +func (pkt *packet) writeTo(c *Conn, queue bool) error { if pkt.body == nil || pkt.bodySize == 0 { return errInvalidBody } - if pkt.channel >= s.channelsAllocatedOut { - s.log(DebugLevel, pkg+"growing channelsOut", "channel", pkt.channel) + if pkt.channel >= c.channelsAllocatedOut { + c.log(DebugLevel, pkg+"growing channelsOut", "channel", pkt.channel) n := int(pkt.channel + 10) var pkts []*packet - if s.channelsOut == nil { + if c.channelsOut == nil { pkts = make([]*packet, n) } else { - pkts = append(s.channelsOut[:pkt.channel:pkt.channel], make([]*packet, 10)...) + pkts = append(c.channelsOut[:pkt.channel:pkt.channel], make([]*packet, 10)...) } - s.channelsOut = pkts + c.channelsOut = pkts - for i := int(s.channelsAllocatedOut); i < n; i++ { - s.channelsOut[i] = nil + for i := int(c.channelsAllocatedOut); i < n; i++ { + c.channelsOut[i] = nil } - s.channelsAllocatedOut = int32(n) + c.channelsAllocatedOut = int32(n) } - prevPkt := s.channelsOut[pkt.channel] + prevPkt := c.channelsOut[pkt.channel] var last int if prevPkt != nil && pkt.headerType != headerSizeLarge { - // Compress header by using the previous packet's attributes. + // Compress header by using the previous packet's attributec. if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == headerSizeMedium { pkt.headerType = headerSizeSmall } @@ -311,7 +311,7 @@ func (pkt *packet) writeTo(s *Session, queue bool) error { } if pkt.headerType > 3 { - s.log(WarnLevel, pkg+"unexpected header type", "type", pkt.headerType) + c.log(WarnLevel, pkg+"unexpected header type", "type", pkt.headerType) return errInvalidHeader } @@ -343,21 +343,21 @@ func (pkt *packet) writeTo(s *Session, queue bool) error { if ts >= 0xffffff { origIdx -= 4 hSize += 4 - s.log(DebugLevel, pkg+"larger timestamp than 24 bits", "timestamp", ts) + c.log(DebugLevel, pkg+"larger timestamp than 24 bits", "timestamp", ts) } headerIdx := origIdx - c := pkt.headerType << 6 + ch := pkt.headerType << 6 switch cSize { case 0: - c |= byte(pkt.channel) + ch |= byte(pkt.channel) case 1: // Do nothing. case 2: - c |= 1 + ch |= 1 } - headBytes[headerIdx] = c + headBytes[headerIdx] = ch headerIdx++ if cSize != 0 { @@ -398,44 +398,44 @@ func (pkt *packet) writeTo(s *Session, queue bool) error { } size := int(pkt.bodySize) - chunkSize := int(s.outChunkSize) + chunkSize := int(c.outChunkSize) - if s.deferred == nil { + if c.deferred == nil { // Defer sending small audio packets (at most once). if pkt.packetType == packetTypeAudio && size < chunkSize { - s.deferred = headBytes[origIdx:][:size+hSize] - s.log(DebugLevel, pkg+"deferred sending packet", "size", size, "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr()) + c.deferred = headBytes[origIdx:][:size+hSize] + c.log(DebugLevel, pkg+"deferred sending packet", "size", size, "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr()) return nil } } else { // Send previously deferrd packet if combining it with the next one would exceed the chunk size. - if len(s.deferred)+size+hSize > chunkSize { - s.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(s.deferred)) - _, err := s.write(s.deferred) + if len(c.deferred)+size+hSize > chunkSize { + c.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(c.deferred)) + _, err := c.write(c.deferred) if err != nil { return err } - s.deferred = nil + c.deferred = nil } } // TODO(kortschak): Rewrite this horrific peice of premature optimisation. - s.log(DebugLevel, pkg+"sending packet", "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr(), "size", size) + c.log(DebugLevel, pkg+"sending packet", "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr(), "size", size) for size+hSize != 0 { if chunkSize > size { chunkSize = size } bytes := headBytes[origIdx:][:chunkSize+hSize] - if s.deferred != nil { + if c.deferred != nil { // Prepend the previously deferred packet and write it with the current one. - s.log(DebugLevel, pkg+"combining deferred packet", "size", len(s.deferred)) - bytes = append(s.deferred, bytes...) + c.log(DebugLevel, pkg+"combining deferred packet", "size", len(c.deferred)) + bytes = append(c.deferred, bytes...) } - _, err := s.write(bytes) + _, err := c.write(bytes) if err != nil { return err } - s.deferred = nil + c.deferred = nil size -= chunkSize origIdx += chunkSize + hSize @@ -451,7 +451,7 @@ func (pkt *packet) writeTo(s *Session, queue bool) error { hSize += 4 } - headBytes[origIdx] = 0xc0 | c + headBytes[origIdx] = 0xc0 | ch if cSize != 0 { tmp := int(pkt.channel) - 64 @@ -468,20 +468,20 @@ func (pkt *packet) writeTo(s *Session, queue bool) error { } } - // If we invoked a remote method and queue is true, we queue the method until the result arrives. + // If we invoked a remote method and queue is true, we queue the method until the result arrivec. if pkt.packetType == packetTypeInvoke && queue { buf := pkt.body[1:] meth := amf.DecodeString(buf) - s.log(DebugLevel, pkg+"queuing method "+meth) + c.log(DebugLevel, pkg+"queuing method "+meth) buf = buf[3+len(meth):] txn := int32(amf.DecodeNumber(buf[:8])) - s.methodCalls = append(s.methodCalls, method{name: meth, num: txn}) + c.methodCalls = append(c.methodCalls, method{name: meth, num: txn}) } - if s.channelsOut[pkt.channel] == nil { - s.channelsOut[pkt.channel] = &packet{} + if c.channelsOut[pkt.channel] == nil { + c.channelsOut[pkt.channel] = &packet{} } - *(s.channelsOut[pkt.channel]) = *pkt + *(c.channelsOut[pkt.channel]) = *pkt return nil }