mirror of https://bitbucket.org/ausocean/av.git
Additional logging.
This commit is contained in:
parent
e772d37a1b
commit
3dbaa810fc
43
rtmp/rtmp.go
43
rtmp/rtmp.go
|
@ -191,40 +191,38 @@ func connect(s *Session) error {
|
|||
|
||||
// connectStream reads a packet and handles it
|
||||
func connectStream(s *Session) error {
|
||||
var pkt packet
|
||||
|
||||
var err error
|
||||
for !s.isPlaying && s.isConnected() {
|
||||
err := readPacket(s, &pkt)
|
||||
pkt := packet{}
|
||||
err = readPacket(s, &pkt)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
if pkt.bodySize == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if pkt.packetType == RTMP_PACKET_TYPE_AUDIO ||
|
||||
pkt.packetType == RTMP_PACKET_TYPE_VIDEO ||
|
||||
pkt.packetType == RTMP_PACKET_TYPE_INFO {
|
||||
s.log(DebugLevel, pkg+"got packet before play; ignoring")
|
||||
pkt.body = nil
|
||||
continue
|
||||
}
|
||||
|
||||
handlePacket(s, &pkt)
|
||||
pkt.body = nil
|
||||
err = handlePacket(s, &pkt)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !s.isPlaying {
|
||||
return errConnStream
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// handlePacket handles a packet that the client has received.
|
||||
// NB: cases have been commented out that are not currently used by AusOcean
|
||||
func handlePacket(s *Session, pkt *packet) int32 {
|
||||
var hasMediaPacket int32
|
||||
func handlePacket(s *Session, pkt *packet) error {
|
||||
switch pkt.packetType {
|
||||
|
||||
case RTMP_PACKET_TYPE_CHUNK_SIZE:
|
||||
|
@ -266,7 +264,7 @@ func handlePacket(s *Session, pkt *packet) int32 {
|
|||
if err != nil {
|
||||
// This will never happen with the methods we implement.
|
||||
s.log(WarnLevel, pkg+"unexpected error from handleInvoke", "error", err.Error())
|
||||
hasMediaPacket = 2
|
||||
return err
|
||||
}
|
||||
|
||||
case RTMP_PACKET_TYPE_FLASH_VIDEO:
|
||||
|
@ -275,7 +273,7 @@ func handlePacket(s *Session, pkt *packet) int32 {
|
|||
default:
|
||||
s.log(WarnLevel, pkg+"unknown packet type", "type", pkt.packetType)
|
||||
}
|
||||
return hasMediaPacket
|
||||
return nil
|
||||
}
|
||||
|
||||
func readN(s *Session, buf []byte) error {
|
||||
|
@ -286,7 +284,6 @@ func readN(s *Session, buf []byte) error {
|
|||
n, err := io.ReadFull(s.link.conn, buf)
|
||||
if err != nil {
|
||||
s.log(DebugLevel, pkg+"read failed", "error", err.Error())
|
||||
s.close()
|
||||
return err
|
||||
}
|
||||
s.nBytesIn += int32(n)
|
||||
|
@ -308,7 +305,6 @@ func writeN(s *Session, buf []byte) error {
|
|||
_, err = s.link.conn.Write(buf)
|
||||
if err != nil {
|
||||
s.log(WarnLevel, pkg+"write failed", "error", err.Error())
|
||||
s.close()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
@ -404,12 +400,10 @@ func sendConnectPacket(s *Session) error {
|
|||
}
|
||||
}
|
||||
|
||||
if copy(enc, []byte{0, 0, AMF_OBJECT_END}) != 3 {
|
||||
return errCopying // TODO: is this even possible?
|
||||
}
|
||||
copy(enc, []byte{0, 0, AMF_OBJECT_END})
|
||||
enc = enc[3:]
|
||||
|
||||
/* add auth string */
|
||||
// add auth string
|
||||
if s.link.auth != "" {
|
||||
enc = C_AMF_EncodeBoolean(enc, s.link.lFlags&RTMP_LF_AUTH != 0)
|
||||
if enc == nil {
|
||||
|
@ -430,7 +424,7 @@ func sendConnectPacket(s *Session) error {
|
|||
|
||||
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
|
||||
|
||||
return sendPacket(s, &pkt, true)
|
||||
return sendPacket(s, &pkt, true) // response expected
|
||||
}
|
||||
|
||||
func sendCreateStream(s *Session) error {
|
||||
|
@ -458,7 +452,7 @@ func sendCreateStream(s *Session) error {
|
|||
|
||||
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
|
||||
|
||||
return sendPacket(s, &pkt, true)
|
||||
return sendPacket(s, &pkt, true) // response expected
|
||||
}
|
||||
|
||||
func sendReleaseStream(s *Session) error {
|
||||
|
@ -589,7 +583,7 @@ func sendPublish(s *Session) error {
|
|||
|
||||
pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
|
||||
|
||||
return sendPacket(s, &pkt, true)
|
||||
return sendPacket(s, &pkt, true) // response expected
|
||||
}
|
||||
|
||||
func sendDeleteStream(s *Session, dStreamId float64) error {
|
||||
|
@ -779,6 +773,7 @@ func handleInvoke(s *Session, body []byte) error {
|
|||
s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Start/av_NetStream_Play_PublishNotify")
|
||||
|
||||
case av_NetStream_Publish_Start:
|
||||
s.log(DebugLevel, pkg+"playing")
|
||||
s.isPlaying = true
|
||||
for i, m := range s.methodCalls {
|
||||
if m.name == av_publish {
|
||||
|
@ -786,6 +781,7 @@ func handleInvoke(s *Session, body []byte) error {
|
|||
break
|
||||
}
|
||||
}
|
||||
// ToDo: handle case when av_publish method not found
|
||||
|
||||
case av_NetStream_Play_Complete, av_NetStream_Play_Stop, av_NetStream_Play_UnpublishNotify:
|
||||
s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Complete/av_NetStream_Play_Stop/av_NetStream_Play_UnpublishNotify")
|
||||
|
@ -825,14 +821,15 @@ func handshake(s *Session) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.log(DebugLevel, pkg+"handshake sent")
|
||||
|
||||
var typ [1]byte
|
||||
err = readN(s, typ[:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.log(DebugLevel, pkg+"handshake received")
|
||||
|
||||
s.log(DebugLevel, pkg+"handshake", "received", typ[0])
|
||||
if typ[0] != clientbuf[0] {
|
||||
s.log(WarnLevel, pkg+"handshake type mismatch", "sent", clientbuf[0], "received", typ)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue