rtmp: added streaming from file test

This commit is contained in:
saxon 2019-01-10 13:58:58 +10:30
commit b219690b10
3 changed files with 53 additions and 39 deletions

View File

@ -35,6 +35,7 @@ LICENSE
package rtmp package rtmp
import ( import (
"io"
"encoding/binary" "encoding/binary"
) )
@ -67,8 +68,12 @@ const (
RTMP_CHANNEL_SOURCE = 0x04 RTMP_CHANNEL_SOURCE = 0x04
) )
// packetSize defines valid packet sizes. // headerSizes defines header sizes for header types 0, 1, 2 and 3 respectively:
var packetSize = [...]int{12, 8, 4, 1} // 0: full header (12 bytes)
// 1: header without message ID (8 bytes)
// 2: basic header + timestamp (4 byes)
// 3: basic header (chunk type and stream ID) (1 byte)
var headerSizes = [...]int{12, 8, 4, 1}
// packet defines an RTMP packet. // packet defines an RTMP packet.
type packet struct { type packet struct {
@ -101,6 +106,9 @@ func readPacket(s *Session, pkt *packet) error {
err := readN(s, header[:1]) err := readN(s, header[:1])
if err != nil { if err != nil {
s.log(DebugLevel, pkg+"failed to read packet header 1st byte", "error", err.Error()) s.log(DebugLevel, pkg+"failed to read packet header 1st byte", "error", err.Error())
if err == io.EOF {
s.log(WarnLevel, pkg+"EOF error; connection likely terminated")
}
return err return err
} }
pkt.headerType = (header[0] & 0xc0) >> 6 pkt.headerType = (header[0] & 0xc0) >> 6
@ -150,7 +158,7 @@ func readPacket(s *Session, pkt *packet) error {
s.channelsAllocatedIn = n s.channelsAllocatedIn = n
} }
size := packetSize[pkt.headerType] size := headerSizes[pkt.headerType]
switch { switch {
case size == RTMP_LARGE_HEADER_SIZE: case size == RTMP_LARGE_HEADER_SIZE:
pkt.hasAbsTimestamp = true pkt.hasAbsTimestamp = true
@ -304,7 +312,7 @@ func sendPacket(s *Session, pkt *packet, queue bool) error {
if pkt.body != nil { if pkt.body != nil {
// Span from -packetsize for the type to the start of the body. // Span from -packetsize for the type to the start of the body.
headBytes = pkt.header headBytes = pkt.header
origIdx = RTMP_MAX_HEADER_SIZE - packetSize[pkt.headerType] origIdx = RTMP_MAX_HEADER_SIZE - headerSizes[pkt.headerType]
} else { } else {
// Allocate a new header and allow 6 bytes of movement backward. // Allocate a new header and allow 6 bytes of movement backward.
var hbuf [RTMP_MAX_HEADER_SIZE]byte var hbuf [RTMP_MAX_HEADER_SIZE]byte
@ -320,7 +328,7 @@ func sendPacket(s *Session, pkt *packet, queue bool) error {
cSize = 1 cSize = 1
} }
hSize := packetSize[pkt.headerType] hSize := headerSizes[pkt.headerType]
if cSize != 0 { if cSize != 0 {
origIdx -= cSize origIdx -= cSize
hSize += cSize hSize += cSize
@ -361,7 +369,7 @@ func sendPacket(s *Session, pkt *packet, queue bool) error {
} }
} }
if packetSize[pkt.headerType] > 1 { if headerSizes[pkt.headerType] > 1 {
res := ts res := ts
if ts > 0xffffff { if ts > 0xffffff {
res = 0xffffff res = 0xffffff
@ -370,14 +378,14 @@ func sendPacket(s *Session, pkt *packet, queue bool) error {
headerIdx += 3 // 24bits headerIdx += 3 // 24bits
} }
if packetSize[pkt.headerType] > 4 { if headerSizes[pkt.headerType] > 4 {
C_AMF_EncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize)) C_AMF_EncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize))
headerIdx += 3 // 24bits headerIdx += 3 // 24bits
headBytes[headerIdx] = pkt.packetType headBytes[headerIdx] = pkt.packetType
headerIdx++ headerIdx++
} }
if packetSize[pkt.headerType] > 8 { if headerSizes[pkt.headerType] > 8 {
n := int(encodeInt32LE(headBytes[headerIdx:headerIdx+4], pkt.info)) n := int(encodeInt32LE(headBytes[headerIdx:headerIdx+4], pkt.info))
headerIdx += n headerIdx += n
} }
@ -415,7 +423,6 @@ func sendPacket(s *Session, pkt *packet, queue bool) error {
if s.deferred != nil { if s.deferred != nil {
// Prepend the previously deferred packet and write it with the current one. // Prepend the previously deferred packet and write it with the current one.
bytes = append(s.deferred, bytes...) bytes = append(s.deferred, bytes...)
s.deferred = nil
} }
err := writeN(s, bytes) err := writeN(s, bytes)
if err != nil { if err != nil {

View File

@ -191,40 +191,38 @@ func connect(s *Session) error {
// connectStream reads a packet and handles it // connectStream reads a packet and handles it
func connectStream(s *Session) error { func connectStream(s *Session) error {
var pkt packet var err error
for !s.isPlaying && s.isConnected() { for !s.isPlaying && s.isConnected() {
err := readPacket(s, &pkt) pkt := packet{}
err = readPacket(s, &pkt)
if err != nil { if err != nil {
break break
} }
if pkt.bodySize == 0 { if pkt.bodySize == 0 {
continue continue
} }
if pkt.packetType == RTMP_PACKET_TYPE_AUDIO || if pkt.packetType == RTMP_PACKET_TYPE_AUDIO ||
pkt.packetType == RTMP_PACKET_TYPE_VIDEO || pkt.packetType == RTMP_PACKET_TYPE_VIDEO ||
pkt.packetType == RTMP_PACKET_TYPE_INFO { pkt.packetType == RTMP_PACKET_TYPE_INFO {
s.log(DebugLevel, pkg+"got packet before play; ignoring") s.log(DebugLevel, pkg+"got packet before play; ignoring")
pkt.body = nil
continue continue
} }
handlePacket(s, &pkt) err = handlePacket(s, &pkt)
pkt.body = nil if err != nil {
break
}
} }
if !s.isPlaying { if !s.isPlaying {
return errConnStream return err
} }
return nil return nil
} }
// handlePacket handles a packet that the client has received. // handlePacket handles a packet that the client has received.
// NB: cases have been commented out that are not currently used by AusOcean // NB: cases have been commented out that are not currently used by AusOcean
func handlePacket(s *Session, pkt *packet) int32 { func handlePacket(s *Session, pkt *packet) error {
var hasMediaPacket int32
switch pkt.packetType { switch pkt.packetType {
case RTMP_PACKET_TYPE_CHUNK_SIZE: case RTMP_PACKET_TYPE_CHUNK_SIZE:
@ -266,7 +264,7 @@ func handlePacket(s *Session, pkt *packet) int32 {
if err != nil { if err != nil {
// This will never happen with the methods we implement. // This will never happen with the methods we implement.
s.log(WarnLevel, pkg+"unexpected error from handleInvoke", "error", err.Error()) s.log(WarnLevel, pkg+"unexpected error from handleInvoke", "error", err.Error())
hasMediaPacket = 2 return err
} }
case RTMP_PACKET_TYPE_FLASH_VIDEO: case RTMP_PACKET_TYPE_FLASH_VIDEO:
@ -275,7 +273,7 @@ func handlePacket(s *Session, pkt *packet) int32 {
default: default:
s.log(WarnLevel, pkg+"unknown packet type", "type", pkt.packetType) s.log(WarnLevel, pkg+"unknown packet type", "type", pkt.packetType)
} }
return hasMediaPacket return nil
} }
func readN(s *Session, buf []byte) error { func readN(s *Session, buf []byte) error {
@ -286,7 +284,6 @@ func readN(s *Session, buf []byte) error {
n, err := io.ReadFull(s.link.conn, buf) n, err := io.ReadFull(s.link.conn, buf)
if err != nil { if err != nil {
s.log(DebugLevel, pkg+"read failed", "error", err.Error()) s.log(DebugLevel, pkg+"read failed", "error", err.Error())
s.close()
return err return err
} }
s.nBytesIn += int32(n) s.nBytesIn += int32(n)
@ -308,7 +305,6 @@ func writeN(s *Session, buf []byte) error {
_, err = s.link.conn.Write(buf) _, err = s.link.conn.Write(buf)
if err != nil { if err != nil {
s.log(WarnLevel, pkg+"write failed", "error", err.Error()) s.log(WarnLevel, pkg+"write failed", "error", err.Error())
s.close()
return err return err
} }
return nil return nil
@ -404,12 +400,10 @@ func sendConnectPacket(s *Session) error {
} }
} }
if copy(enc, []byte{0, 0, AMF_OBJECT_END}) != 3 { copy(enc, []byte{0, 0, AMF_OBJECT_END})
return errCopying // TODO: is this even possible?
}
enc = enc[3:] enc = enc[3:]
/* add auth string */ // add auth string
if s.link.auth != "" { if s.link.auth != "" {
enc = C_AMF_EncodeBoolean(enc, s.link.lFlags&RTMP_LF_AUTH != 0) enc = C_AMF_EncodeBoolean(enc, s.link.lFlags&RTMP_LF_AUTH != 0)
if enc == nil { if enc == nil {
@ -430,7 +424,7 @@ func sendConnectPacket(s *Session) error {
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
return sendPacket(s, &pkt, true) return sendPacket(s, &pkt, true) // response expected
} }
func sendCreateStream(s *Session) error { func sendCreateStream(s *Session) error {
@ -458,7 +452,7 @@ func sendCreateStream(s *Session) error {
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
return sendPacket(s, &pkt, true) return sendPacket(s, &pkt, true) // response expected
} }
func sendReleaseStream(s *Session) error { func sendReleaseStream(s *Session) error {
@ -589,7 +583,7 @@ func sendPublish(s *Session) error {
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
return sendPacket(s, &pkt, true) return sendPacket(s, &pkt, true) // response expected
} }
func sendDeleteStream(s *Session, dStreamId float64) error { func sendDeleteStream(s *Session, dStreamId float64) error {
@ -779,6 +773,7 @@ func handleInvoke(s *Session, body []byte) error {
s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Start/av_NetStream_Play_PublishNotify") s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Start/av_NetStream_Play_PublishNotify")
case av_NetStream_Publish_Start: case av_NetStream_Publish_Start:
s.log(DebugLevel, pkg+"playing")
s.isPlaying = true s.isPlaying = true
for i, m := range s.methodCalls { for i, m := range s.methodCalls {
if m.name == av_publish { if m.name == av_publish {
@ -786,6 +781,7 @@ func handleInvoke(s *Session, body []byte) error {
break break
} }
} }
// ToDo: handle case when av_publish method not found
case av_NetStream_Play_Complete, av_NetStream_Play_Stop, av_NetStream_Play_UnpublishNotify: case av_NetStream_Play_Complete, av_NetStream_Play_Stop, av_NetStream_Play_UnpublishNotify:
s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Complete/av_NetStream_Play_Stop/av_NetStream_Play_UnpublishNotify") s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Complete/av_NetStream_Play_Stop/av_NetStream_Play_UnpublishNotify")
@ -825,14 +821,15 @@ func handshake(s *Session) error {
if err != nil { if err != nil {
return err return err
} }
s.log(DebugLevel, pkg+"handshake sent")
var typ [1]byte var typ [1]byte
err = readN(s, typ[:]) err = readN(s, typ[:])
if err != nil { if err != nil {
return err return err
} }
s.log(DebugLevel, pkg+"handshake received")
s.log(DebugLevel, pkg+"handshake", "received", typ[0])
if typ[0] != clientbuf[0] { if typ[0] != clientbuf[0] {
s.log(WarnLevel, pkg+"handshake type mismatch", "sent", clientbuf[0], "received", typ) s.log(WarnLevel, pkg+"handshake type mismatch", "sent", clientbuf[0], "received", typ)
} }

View File

@ -29,7 +29,6 @@ package rtmp
import ( import (
"fmt" "fmt"
"io/ioutil"
"os" "os"
"runtime" "runtime"
"testing" "testing"
@ -37,6 +36,7 @@ import (
"bitbucket.org/ausocean/av/stream/flv" "bitbucket.org/ausocean/av/stream/flv"
"bitbucket.org/ausocean/av/stream/lex" "bitbucket.org/ausocean/av/stream/lex"
"io/ioutil"
) )
const ( const (
@ -87,7 +87,7 @@ func TestKey(t *testing.T) {
testLog(0, "TestKey") testLog(0, "TestKey")
testKey = os.Getenv("RTMP_TEST_KEY") testKey = os.Getenv("RTMP_TEST_KEY")
if testKey == "" { if testKey == "" {
t.Errorf("RTMP_TEST_KEY environment variable not defined") fmt.Printf("RTMP_TEST_KEY environment variable not defined\n")
os.Exit(1) os.Exit(1)
} }
testLog(0, "Testing against URL "+testBaseURL+testKey) testLog(0, "Testing against URL "+testBaseURL+testKey)
@ -122,18 +122,28 @@ func TestSetupURL(t *testing.T) {
} }
} }
func TestOpen(t *testing.T) {
testLog(0, "TestOpen")
s := NewSession(testBaseURL+testKey, testTimeout, testLog)
err := setupURL(s, s.url)
if err != nil {
t.Errorf("setupURL failed with error: %v", err)
}
s.enableWrite()
err = s.Open()
if err != nil {
t.Errorf("connect failed with error: %v", err)
}
}
func TestOpenClose(t *testing.T) { func TestOpenClose(t *testing.T) {
testLog(0, "TestOpenClose") testLog(0, "TestOpenClose")
s := NewSession(testBaseURL+testKey, testTimeout, testLog) s := NewSession(testBaseURL+testKey, testTimeout, testLog)
err := s.Open() err := s.Open()
if err != nil { if err != nil {
t.Errorf("Session.Open failed with error: %v", err) t.Errorf("Open failed with error: %v", err)
return return
} }
err = s.Close()
if err != nil {
t.Errorf("Session.Close failed with error: %v", err)
}
} }
func TestFromFile(t *testing.T) { func TestFromFile(t *testing.T) {