Removed used fields and initalize packets lazily.

This commit is contained in:
scruzin 2019-01-07 21:45:00 +10:30
parent 0b869523b4
commit 04679e4757
4 changed files with 78 additions and 116 deletions

View File

@ -62,6 +62,12 @@ const (
RTMP_PACKET_SIZE_MINIMUM = 3 RTMP_PACKET_SIZE_MINIMUM = 3
) )
const (
RTMP_CHANNEL_BYTES_READ = 0x02
RTMP_CHANNEL_CONTROL = 0x03
RTMP_CHANNEL_SOURCE = 0x04
)
// packetSize defines valid packet sizes. // packetSize defines valid packet sizes.
var packetSize = [...]int{12, 8, 4, 1} var packetSize = [...]int{12, 8, 4, 1}
@ -257,7 +263,7 @@ func resizePacket(pkt *packet, size uint32, ht uint8) {
} }
// sendPacket sends a packet. // sendPacket sends a packet.
func sendPacket(s *Session, pkt *packet, queue int) error { func sendPacket(s *Session, pkt *packet, queue bool) error {
var prevPkt *packet var prevPkt *packet
var last int var last int
@ -455,7 +461,6 @@ func sendPacket(s *Session, pkt *packet, queue int) error {
} }
// We invoked a remote method // We invoked a remote method
// TODO: port the const
if pkt.packetType == RTMP_PACKET_TYPE_INVOKE { if pkt.packetType == RTMP_PACKET_TYPE_INVOKE {
buf := pkt.body[1:] buf := pkt.body[1:]
meth := C_AMF_DecodeString(buf) meth := C_AMF_DecodeString(buf)
@ -464,7 +469,7 @@ func sendPacket(s *Session, pkt *packet, queue int) error {
log.Printf("invoking %v", meth) log.Printf("invoking %v", meth)
} }
// keep it in call queue till result arrives // keep it in call queue till result arrives
if queue != 0 { if queue {
buf = buf[3+len(meth):] buf = buf[3+len(meth):]
txn := int32(C_AMF_DecodeNumber(buf[:8])) txn := int32(C_AMF_DecodeNumber(buf[:8]))
s.methodCalls = append(s.methodCalls, method{name: meth, num: txn}) s.methodCalls = append(s.methodCalls, method{name: meth, num: txn})

View File

@ -166,7 +166,7 @@ func setupURL(s *Session, addr string) (err error) {
} }
// connect establishes an RTMP connection. // connect establishes an RTMP connection.
func connect(s *Session, cp *packet) error { func connect(s *Session) error {
addr, err := net.ResolveTCPAddr("tcp4", s.link.host+":"+strconv.Itoa(int(s.link.port))) addr, err := net.ResolveTCPAddr("tcp4", s.link.host+":"+strconv.Itoa(int(s.link.port)))
if err != nil { if err != nil {
return err return err
@ -178,7 +178,7 @@ func connect(s *Session, cp *packet) error {
if debugMode { if debugMode {
log.Println("... connected, handshaking...") log.Println("... connected, handshaking...")
} }
err = handshake(s, 1) err = handshake(s)
if err != nil { if err != nil {
log.Println("connect: handshake failed") log.Println("connect: handshake failed")
return errHandshake return errHandshake
@ -186,7 +186,7 @@ func connect(s *Session, cp *packet) error {
if debugMode { if debugMode {
log.Println("... handshaked...") log.Println("... handshaked...")
} }
err = sendConnectPacket(s, cp) err = sendConnectPacket(s)
if err != nil { if err != nil {
log.Println("connect: sendConnect failed") log.Println("connect: sendConnect failed")
return errConnSend return errConnSend
@ -195,13 +195,9 @@ func connect(s *Session, cp *packet) error {
} }
// connectStream reads a packet and handles it // connectStream reads a packet and handles it
func connectStream(s *Session, seekTime int32) error { func connectStream(s *Session) error {
var pkt packet var pkt packet
if seekTime > 0 {
s.link.seekTime = seekTime
}
for !s.isPlaying && s.isConnected() { for !s.isPlaying && s.isConnected() {
err := readPacket(s, &pkt) err := readPacket(s, &pkt)
if err != nil { if err != nil {
@ -331,19 +327,12 @@ func writeN(s *Session, buf []byte) error {
return nil return nil
} }
func sendConnectPacket(s *Session, cp *packet) error { func sendConnectPacket(s *Session) error {
if cp != nil {
return sendPacket(s, cp, 1)
}
var pbuf [4096]byte var pbuf [4096]byte
pkt := packet{ pkt := packet{
channel: 0x03, channel: RTMP_CHANNEL_CONTROL,
headerType: RTMP_PACKET_SIZE_LARGE, headerType: RTMP_PACKET_SIZE_LARGE,
packetType: RTMP_PACKET_TYPE_INVOKE, packetType: RTMP_PACKET_TYPE_INVOKE,
timestamp: 0,
info: 0,
hasAbsTimestamp: false,
header: pbuf[:], header: pbuf[:],
body: pbuf[RTMP_MAX_HEADER_SIZE:], body: pbuf[RTMP_MAX_HEADER_SIZE:],
} }
@ -454,18 +443,15 @@ func sendConnectPacket(s *Session, cp *packet) error {
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
return sendPacket(s, &pkt, 1) return sendPacket(s, &pkt, true)
} }
func sendCreateStream(s *Session) error { func sendCreateStream(s *Session) error {
var pbuf [256]byte var pbuf [256]byte
pkt := packet{ pkt := packet{
channel: 0x03, /* control channel (invoke) */ channel: RTMP_CHANNEL_CONTROL,
headerType: RTMP_PACKET_SIZE_MEDIUM, headerType: RTMP_PACKET_SIZE_MEDIUM,
packetType: RTMP_PACKET_TYPE_INVOKE, packetType: RTMP_PACKET_TYPE_INVOKE,
timestamp: 0,
info: 0,
hasAbsTimestamp: false,
header: pbuf[:], header: pbuf[:],
body: pbuf[RTMP_MAX_HEADER_SIZE:], body: pbuf[RTMP_MAX_HEADER_SIZE:],
} }
@ -485,18 +471,15 @@ func sendCreateStream(s *Session) error {
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
return sendPacket(s, &pkt, 1) return sendPacket(s, &pkt, true)
} }
func sendReleaseStream(s *Session) error { func sendReleaseStream(s *Session) error {
var pbuf [1024]byte var pbuf [1024]byte
pkt := packet{ pkt := packet{
channel: 0x03, /* control channel (invoke) */ channel: RTMP_CHANNEL_CONTROL,
headerType: RTMP_PACKET_SIZE_MEDIUM, headerType: RTMP_PACKET_SIZE_MEDIUM,
packetType: RTMP_PACKET_TYPE_INVOKE, packetType: RTMP_PACKET_TYPE_INVOKE,
timestamp: 0,
info: 0,
hasAbsTimestamp: false,
header: pbuf[:], header: pbuf[:],
body: pbuf[RTMP_MAX_HEADER_SIZE:], body: pbuf[RTMP_MAX_HEADER_SIZE:],
} }
@ -519,18 +502,15 @@ func sendReleaseStream(s *Session) error {
} }
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
return sendPacket(s, &pkt, 0) return sendPacket(s, &pkt, false)
} }
func sendFCPublish(s *Session) error { func sendFCPublish(s *Session) error {
var pbuf [1024]byte var pbuf [1024]byte
pkt := packet{ pkt := packet{
channel: 0x03, /* control channel (invoke) */ channel: RTMP_CHANNEL_CONTROL,
headerType: RTMP_PACKET_SIZE_MEDIUM, headerType: RTMP_PACKET_SIZE_MEDIUM,
packetType: RTMP_PACKET_TYPE_INVOKE, packetType: RTMP_PACKET_TYPE_INVOKE,
timestamp: 0,
info: 0,
hasAbsTimestamp: false,
header: pbuf[:], header: pbuf[:],
body: pbuf[RTMP_MAX_HEADER_SIZE:], body: pbuf[RTMP_MAX_HEADER_SIZE:],
} }
@ -554,18 +534,15 @@ func sendFCPublish(s *Session) error {
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
return sendPacket(s, &pkt, 0) return sendPacket(s, &pkt, false)
} }
func sendFCUnpublish(s *Session) error { func sendFCUnpublish(s *Session) error {
var pbuf [1024]byte var pbuf [1024]byte
pkt := packet{ pkt := packet{
channel: 0x03, /* control channel (invoke) */ channel: RTMP_CHANNEL_CONTROL,
headerType: RTMP_PACKET_SIZE_MEDIUM, headerType: RTMP_PACKET_SIZE_MEDIUM,
packetType: RTMP_PACKET_TYPE_INVOKE, packetType: RTMP_PACKET_TYPE_INVOKE,
timestamp: 0,
info: 0,
hasAbsTimestamp: false,
header: pbuf[:], header: pbuf[:],
body: pbuf[RTMP_MAX_HEADER_SIZE:], body: pbuf[RTMP_MAX_HEADER_SIZE:],
} }
@ -589,18 +566,15 @@ func sendFCUnpublish(s *Session) error {
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
return sendPacket(s, &pkt, 0) return sendPacket(s, &pkt, false)
} }
func sendPublish(s *Session) error { func sendPublish(s *Session) error {
var pbuf [1024]byte var pbuf [1024]byte
pkt := packet{ pkt := packet{
channel: 0x04, /* source channel (invoke) */ channel: RTMP_CHANNEL_SOURCE,
headerType: RTMP_PACKET_SIZE_LARGE, headerType: RTMP_PACKET_SIZE_LARGE,
packetType: RTMP_PACKET_TYPE_INVOKE, packetType: RTMP_PACKET_TYPE_INVOKE,
timestamp: 0,
info: s.streamID,
hasAbsTimestamp: false,
header: pbuf[:], header: pbuf[:],
body: pbuf[RTMP_MAX_HEADER_SIZE:], body: pbuf[RTMP_MAX_HEADER_SIZE:],
} }
@ -628,18 +602,15 @@ func sendPublish(s *Session) error {
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
return sendPacket(s, &pkt, 1) return sendPacket(s, &pkt, true)
} }
func sendDeleteStream(s *Session, dStreamId float64) error { func sendDeleteStream(s *Session, dStreamId float64) error {
var pbuf [256]byte var pbuf [256]byte
pkt := packet{ pkt := packet{
channel: 0x03, /* control channel (invoke) */ channel: RTMP_CHANNEL_CONTROL,
headerType: RTMP_PACKET_SIZE_MEDIUM, headerType: RTMP_PACKET_SIZE_MEDIUM,
packetType: RTMP_PACKET_TYPE_INVOKE, packetType: RTMP_PACKET_TYPE_INVOKE,
timestamp: 0,
info: 0,
hasAbsTimestamp: false,
header: pbuf[:], header: pbuf[:],
body: pbuf[RTMP_MAX_HEADER_SIZE:], body: pbuf[RTMP_MAX_HEADER_SIZE:],
} }
@ -663,19 +634,16 @@ func sendDeleteStream(s *Session, dStreamId float64) error {
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
/* no response expected */ /* no response expected */
return sendPacket(s, &pkt, 0) return sendPacket(s, &pkt, false)
} }
// sendBytesReceived tells the server how many bytes the client has received. // sendBytesReceived tells the server how many bytes the client has received.
func sendBytesReceived(s *Session) error { func sendBytesReceived(s *Session) error {
var pbuf [256]byte var pbuf [256]byte
pkt := packet{ pkt := packet{
channel: 0x02, /* control channel (invoke) */ channel: RTMP_CHANNEL_BYTES_READ,
headerType: RTMP_PACKET_SIZE_MEDIUM, headerType: RTMP_PACKET_SIZE_MEDIUM,
packetType: RTMP_PACKET_TYPE_BYTES_READ_REPORT, packetType: RTMP_PACKET_TYPE_BYTES_READ_REPORT,
timestamp: 0,
info: 0,
hasAbsTimestamp: false,
header: pbuf[:], header: pbuf[:],
body: pbuf[RTMP_MAX_HEADER_SIZE:], body: pbuf[RTMP_MAX_HEADER_SIZE:],
} }
@ -688,18 +656,15 @@ func sendBytesReceived(s *Session) error {
} }
pkt.bodySize = 4 pkt.bodySize = 4
return sendPacket(s, &pkt, 0) return sendPacket(s, &pkt, false)
} }
func sendCheckBW(s *Session) error { func sendCheckBW(s *Session) error {
var pbuf [256]byte var pbuf [256]byte
pkt := packet{ pkt := packet{
channel: 0x03, /* control channel (invoke) */ channel: RTMP_CHANNEL_CONTROL,
headerType: RTMP_PACKET_SIZE_LARGE, headerType: RTMP_PACKET_SIZE_LARGE,
packetType: RTMP_PACKET_TYPE_INVOKE, packetType: RTMP_PACKET_TYPE_INVOKE,
timestamp: 0,
info: 0,
hasAbsTimestamp: false,
header: pbuf[:], header: pbuf[:],
body: pbuf[RTMP_MAX_HEADER_SIZE:], body: pbuf[RTMP_MAX_HEADER_SIZE:],
} }
@ -719,7 +684,7 @@ func sendCheckBW(s *Session) error {
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
return sendPacket(s, &pkt, 0) return sendPacket(s, &pkt, false)
} }
func eraseMethod(m []method, i int) []method { func eraseMethod(m []method, i int) []method {
@ -740,9 +705,6 @@ func handleInvoke(s *Session, body []byte) error {
return errDecoding return errDecoding
} }
// NOTE we don't really need this ?? still functions without it
//C.AMF_Dump(&obj)
//C.AMFProp_GetString(C_AMF_GetProp(&obj, nil, 0), &method)
meth := C_AMFProp_GetString(C_AMF_GetProp(&obj, "", 0)) meth := C_AMFProp_GetString(C_AMF_GetProp(&obj, "", 0))
txn := C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 1)) txn := C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 1))
// TODO use new logger here // TODO use new logger here
@ -865,18 +827,15 @@ func handleInvoke(s *Session, body []byte) error {
} }
leave: leave:
C_AMF_Reset(&obj) C_AMF_Reset(&obj)
// None of the methods we implement will result in a true return.
return nil return nil
} }
func handshake(s *Session, FP9HandShake int32) error { func handshake(s *Session) error {
var clientbuf [RTMP_SIG_SIZE + 1]byte var clientbuf [RTMP_SIG_SIZE + 1]byte
clientsig := clientbuf[1:] clientsig := clientbuf[1:]
var serversig [RTMP_SIG_SIZE]byte var serversig [RTMP_SIG_SIZE]byte
clientbuf[0] = RTMP_CHANNEL_CONTROL
clientbuf[0] = 0x03 // not encrypted
binary.BigEndian.PutUint32(clientsig, uint32(time.Now().UnixNano()/1000000)) binary.BigEndian.PutUint32(clientsig, uint32(time.Now().UnixNano()/1000000))
copy(clientsig[4:8], []byte{0, 0, 0, 0}) copy(clientsig[4:8], []byte{0, 0, 0, 0})
@ -934,16 +893,15 @@ func handshake(s *Session, FP9HandShake int32) error {
// write prepares data to write then sends it. // write prepares data to write then sends it.
func (s *Session) write(buf []byte) error { func (s *Session) write(buf []byte) error {
var pkt packet pkt := packet{
channel: RTMP_CHANNEL_SOURCE,
info: s.streamID,
}
var enc []byte var enc []byte
size := len(buf)
var num int
pkt.channel = 0x04
pkt.info = s.streamID
for len(buf) != 0 { for len(buf) != 0 {
if pkt.bytesRead == 0 { if pkt.bytesRead == 0 {
if size < minDataSize { if len(buf) < minDataSize {
return errTinyPacket return errTinyPacket
} }
@ -985,7 +943,7 @@ func (s *Session) write(buf []byte) error {
} else { } else {
enc = pkt.body[:pkt.bodySize][pkt.bytesRead:] enc = pkt.body[:pkt.bodySize][pkt.bytesRead:]
} }
num = int(pkt.bodySize - pkt.bytesRead) num := int(pkt.bodySize - pkt.bytesRead)
if num > len(buf) { if num > len(buf) {
num = len(buf) num = len(buf)
} }
@ -994,7 +952,7 @@ func (s *Session) write(buf []byte) error {
pkt.bytesRead += uint32(num) pkt.bytesRead += uint32(num)
buf = buf[num:] buf = buf[num:]
if pkt.bytesRead == pkt.bodySize { if pkt.bytesRead == pkt.bodySize {
err := sendPacket(s, &pkt, 0) err := sendPacket(s, &pkt, false)
pkt.body = nil pkt.body = nil
pkt.bytesRead = 0 pkt.bytesRead = 0
if err != nil { if err != nil {

View File

@ -105,7 +105,6 @@ type link struct {
flashVer string flashVer string
token string token string
extras C_AMFObject extras C_AMFObject
seekTime int32
lFlags int32 lFlags int32
swfAge int32 swfAge int32
protocol int32 protocol int32

View File

@ -96,13 +96,13 @@ func (s *Session) start() error {
} }
s.enableWrite() s.enableWrite()
err = connect(s, nil) err = connect(s)
if err != nil { if err != nil {
s.close() s.close()
return err return err
} }
err = connectStream(s, 0) err = connectStream(s)
if err != nil { if err != nil {
s.close() s.close()
return err return err