From 974d9a484bfc858f64f71c3a7a4089f5ebab53a5 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sat, 19 Jan 2019 11:30:59 +1030 Subject: [PATCH 01/13] Fixed packetTypeBytesReadReport case in handlePacket and added debug logging. --- rtmp/rtmp.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 11a9d71b..37460f07 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -250,17 +250,21 @@ func handlePacket(s *Session, pkt *packet) error { switch pkt.packetType { case packetTypeChunkSize: s.inChunkSize = amf.DecodeInt32(pkt.body[:4]) + s.log(DebugLevel, pkg+"set inChunkSize", "size", int(s.inChunkSize)) case packetTypeBytesReadReport: - s.serverBW = amf.DecodeInt32(pkt.body[:4]) + s.log(DebugLevel, pkg+"received packetTypeBytesReadReport") case packetTypeServerBW: s.serverBW = amf.DecodeInt32(pkt.body[:4]) + s.log(DebugLevel, pkg+"set serverBW", "size", int(s.serverBW)) case packetTypeClientBW: s.clientBW = amf.DecodeInt32(pkt.body[:4]) + s.log(DebugLevel, pkg+"set clientBW", "size", int(s.clientBW)) if pkt.bodySize > 4 { s.clientBW2 = pkt.body[4] + s.log(DebugLevel, pkg+"set clientBW2", "size", int(s.clientBW2)) } else { s.clientBW2 = 0xff } From fd903b4addfd32d27b7cb8fee98c7a79a2d8a993 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sat, 19 Jan 2019 13:10:38 +1030 Subject: [PATCH 02/13] rtmp.Session now rtmp.Conn and rtmp.NewSession() and Open() replaced with Dial(). --- revid/senders.go | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/revid/senders.go b/revid/senders.go index d1689396..a73523d5 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -251,7 +251,7 @@ func (s *ffmpegSender) close() error { // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { - sess *rtmp.Session + conn *rtmp.Conn url string timeout uint @@ -264,16 +264,15 @@ type rtmpSender struct { var _ restarter = (*rtmpSender)(nil) func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { - var sess *rtmp.Session + var conn *rtmp.Conn var err error for n := 0; n < retries; n++ { - sess = rtmp.NewSession(url, timeout, log) - err = sess.Open() + conn, err = rtmp.Dial(url, timeout, log) if err == nil { break } log(logger.Error, err.Error()) - sess.Close() + conn.Close() if n < retries-1 { log(logger.Info, pkg+"retry rtmp connection") } @@ -283,7 +282,7 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg } s := &rtmpSender{ - sess: sess, + conn: conn, url: url, timeout: timeout, retries: retries, @@ -298,7 +297,7 @@ func (s *rtmpSender) load(c *ring.Chunk) error { } func (s *rtmpSender) send() error { - _, err := s.chunk.WriteTo(s.sess) + _, err := s.chunk.WriteTo(s.conn) return err } @@ -308,18 +307,17 @@ func (s *rtmpSender) release() { } func (s *rtmpSender) restart() error { - err := s.sess.Close() + err := s.conn.Close() if err != nil { return err } for n := 0; n < s.retries; n++ { - s.sess = rtmp.NewSession(s.url, s.timeout, s.log) - err = s.sess.Open() + s.conn, err = rtmp.Dial(s.url, s.timeout, s.log) if err == nil { break } s.log(logger.Error, err.Error()) - s.sess.Close() + s.conn.Close() if n < s.retries-1 { s.log(logger.Info, pkg+"retry rtmp connection") } @@ -328,7 +326,7 @@ func (s *rtmpSender) restart() error { } func (s *rtmpSender) close() error { - return s.sess.Close() + return s.conn.Close() } // udpSender implements loadSender for a native udp destination. From a73c73617a40d15669b2ac37e6615f781962808b Mon Sep 17 00:00:00 2001 From: scruzin Date: Sat, 19 Jan 2019 13:11:19 +1030 Subject: [PATCH 03/13] Session now Conn. --- rtmp/packet.go | 152 ++++++++++++++++++++++++------------------------- 1 file changed, 76 insertions(+), 76 deletions(-) diff --git a/rtmp/packet.go b/rtmp/packet.go index b88b15bc..73cc0088 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -96,15 +96,15 @@ type packet struct { } // readFrom reads a packet from the RTMP connection. -func (pkt *packet) readFrom(s *Session) error { +func (pkt *packet) readFrom(c *Conn) error { var hbuf [fullHeaderSize]byte header := hbuf[:] - _, err := s.read(header[:1]) + _, err := c.read(header[:1]) if err != nil { - s.log(DebugLevel, pkg+"failed to read packet header 1st byte", "error", err.Error()) + c.log(DebugLevel, pkg+"failed to read packet header 1st byte", "error", err.Error()) if err == io.EOF { - s.log(WarnLevel, pkg+"EOF error; connection likely terminated") + c.log(WarnLevel, pkg+"EOF error; connection likely terminated") } return err } @@ -114,45 +114,45 @@ func (pkt *packet) readFrom(s *Session) error { switch { case pkt.channel == 0: - _, err = s.read(header[:1]) + _, err = c.read(header[:1]) if err != nil { - s.log(DebugLevel, pkg+"failed to read packet header 2nd byte", "error", err.Error()) + c.log(DebugLevel, pkg+"failed to read packet header 2nd byte", "error", err.Error()) return err } header = header[1:] pkt.channel = int32(header[0]) + 64 case pkt.channel == 1: - _, err = s.read(header[:2]) + _, err = c.read(header[:2]) if err != nil { - s.log(DebugLevel, pkg+"failed to read packet header 3rd byte", "error", err.Error()) + c.log(DebugLevel, pkg+"failed to read packet header 3rd byte", "error", err.Error()) return err } header = header[2:] pkt.channel = int32(binary.BigEndian.Uint16(header[:2])) + 64 } - if pkt.channel >= s.channelsAllocatedIn { + if pkt.channel >= c.channelsAllocatedIn { n := pkt.channel + 10 - timestamp := append(s.channelTimestamp, make([]int32, 10)...) + timestamp := append(c.channelTimestamp, make([]int32, 10)...) var pkts []*packet - if s.channelsIn == nil { + if c.channelsIn == nil { pkts = make([]*packet, n) } else { - pkts = append(s.channelsIn[:pkt.channel:pkt.channel], make([]*packet, 10)...) + pkts = append(c.channelsIn[:pkt.channel:pkt.channel], make([]*packet, 10)...) } - s.channelTimestamp = timestamp - s.channelsIn = pkts + c.channelTimestamp = timestamp + c.channelsIn = pkts - for i := int(s.channelsAllocatedIn); i < len(s.channelTimestamp); i++ { - s.channelTimestamp[i] = 0 + for i := int(c.channelsAllocatedIn); i < len(c.channelTimestamp); i++ { + c.channelTimestamp[i] = 0 } - for i := int(s.channelsAllocatedIn); i < int(n); i++ { - s.channelsIn[i] = nil + for i := int(c.channelsAllocatedIn); i < int(n); i++ { + c.channelsIn[i] = nil } - s.channelsAllocatedIn = n + c.channelsAllocatedIn = n } size := headerSizes[pkt.headerType] @@ -160,16 +160,16 @@ func (pkt *packet) readFrom(s *Session) error { case size == fullHeaderSize: pkt.hasAbsTimestamp = true case size < fullHeaderSize: - if s.channelsIn[pkt.channel] != nil { - *pkt = *(s.channelsIn[pkt.channel]) + if c.channelsIn[pkt.channel] != nil { + *pkt = *(c.channelsIn[pkt.channel]) } } size-- if size > 0 { - _, err = s.read(header[:size]) + _, err = c.read(header[:size]) if err != nil { - s.log(DebugLevel, pkg+"failed to read packet header", "error", err.Error()) + c.log(DebugLevel, pkg+"failed to read packet header", "error", err.Error()) return err } } @@ -192,9 +192,9 @@ func (pkt *packet) readFrom(s *Session) error { extendedTimestamp := pkt.timestamp == 0xffffff if extendedTimestamp { - _, err = s.read(header[size : size+4]) + _, err = c.read(header[size : size+4]) if err != nil { - s.log(DebugLevel, pkg+"failed to read extended timestamp", "error", err.Error()) + c.log(DebugLevel, pkg+"failed to read extended timestamp", "error", err.Error()) return err } pkt.timestamp = amf.DecodeInt32(header[size : size+4]) @@ -206,39 +206,39 @@ func (pkt *packet) readFrom(s *Session) error { } toRead := pkt.bodySize - pkt.bytesRead - chunkSize := s.inChunkSize + chunkSize := c.inChunkSize if toRead < chunkSize { chunkSize = toRead } - _, err = s.read(pkt.body[pkt.bytesRead:][:chunkSize]) + _, err = c.read(pkt.body[pkt.bytesRead:][:chunkSize]) if err != nil { - s.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error()) + c.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error()) return err } pkt.bytesRead += uint32(chunkSize) // Keep the packet as a reference for other packets on this channel. - if s.channelsIn[pkt.channel] == nil { - s.channelsIn[pkt.channel] = &packet{} + if c.channelsIn[pkt.channel] == nil { + c.channelsIn[pkt.channel] = &packet{} } - *(s.channelsIn[pkt.channel]) = *pkt + *(c.channelsIn[pkt.channel]) = *pkt if extendedTimestamp { - s.channelsIn[pkt.channel].timestamp = 0xffffff + c.channelsIn[pkt.channel].timestamp = 0xffffff } if !pkt.hasAbsTimestamp { // Timestamps seem to always be relative. - pkt.timestamp += uint32(s.channelTimestamp[pkt.channel]) + pkt.timestamp += uint32(c.channelTimestamp[pkt.channel]) } - s.channelTimestamp[pkt.channel] = int32(pkt.timestamp) + c.channelTimestamp[pkt.channel] = int32(pkt.timestamp) - s.channelsIn[pkt.channel].body = nil - s.channelsIn[pkt.channel].bytesRead = 0 - s.channelsIn[pkt.channel].hasAbsTimestamp = false + c.channelsIn[pkt.channel].body = nil + c.channelsIn[pkt.channel].bytesRead = 0 + c.channelsIn[pkt.channel].hasAbsTimestamp = false return nil } @@ -270,35 +270,35 @@ func (pkt *packet) resize(size uint32, ht uint8) { // writeTo writes a packet to the RTMP connection. // Packets are written in chunks which are Session.chunkSize in length (128 bytes in length). // We defer sending small audio packets and combine consecutive small audio packets where possible to reduce I/O. -// When queue is true, we expect a response to this request and cache the method on s.methodCalls. -func (pkt *packet) writeTo(s *Session, queue bool) error { +// When queue is true, we expect a response to this request and cache the method on c.methodCallc. +func (pkt *packet) writeTo(c *Conn, queue bool) error { if pkt.body == nil || pkt.bodySize == 0 { return errInvalidBody } - if pkt.channel >= s.channelsAllocatedOut { - s.log(DebugLevel, pkg+"growing channelsOut", "channel", pkt.channel) + if pkt.channel >= c.channelsAllocatedOut { + c.log(DebugLevel, pkg+"growing channelsOut", "channel", pkt.channel) n := int(pkt.channel + 10) var pkts []*packet - if s.channelsOut == nil { + if c.channelsOut == nil { pkts = make([]*packet, n) } else { - pkts = append(s.channelsOut[:pkt.channel:pkt.channel], make([]*packet, 10)...) + pkts = append(c.channelsOut[:pkt.channel:pkt.channel], make([]*packet, 10)...) } - s.channelsOut = pkts + c.channelsOut = pkts - for i := int(s.channelsAllocatedOut); i < n; i++ { - s.channelsOut[i] = nil + for i := int(c.channelsAllocatedOut); i < n; i++ { + c.channelsOut[i] = nil } - s.channelsAllocatedOut = int32(n) + c.channelsAllocatedOut = int32(n) } - prevPkt := s.channelsOut[pkt.channel] + prevPkt := c.channelsOut[pkt.channel] var last int if prevPkt != nil && pkt.headerType != headerSizeLarge { - // Compress header by using the previous packet's attributes. + // Compress header by using the previous packet's attributec. if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == headerSizeMedium { pkt.headerType = headerSizeSmall } @@ -311,7 +311,7 @@ func (pkt *packet) writeTo(s *Session, queue bool) error { } if pkt.headerType > 3 { - s.log(WarnLevel, pkg+"unexpected header type", "type", pkt.headerType) + c.log(WarnLevel, pkg+"unexpected header type", "type", pkt.headerType) return errInvalidHeader } @@ -343,21 +343,21 @@ func (pkt *packet) writeTo(s *Session, queue bool) error { if ts >= 0xffffff { origIdx -= 4 hSize += 4 - s.log(DebugLevel, pkg+"larger timestamp than 24 bits", "timestamp", ts) + c.log(DebugLevel, pkg+"larger timestamp than 24 bits", "timestamp", ts) } headerIdx := origIdx - c := pkt.headerType << 6 + ch := pkt.headerType << 6 switch cSize { case 0: - c |= byte(pkt.channel) + ch |= byte(pkt.channel) case 1: // Do nothing. case 2: - c |= 1 + ch |= 1 } - headBytes[headerIdx] = c + headBytes[headerIdx] = ch headerIdx++ if cSize != 0 { @@ -398,44 +398,44 @@ func (pkt *packet) writeTo(s *Session, queue bool) error { } size := int(pkt.bodySize) - chunkSize := int(s.outChunkSize) + chunkSize := int(c.outChunkSize) - if s.deferred == nil { + if c.deferred == nil { // Defer sending small audio packets (at most once). if pkt.packetType == packetTypeAudio && size < chunkSize { - s.deferred = headBytes[origIdx:][:size+hSize] - s.log(DebugLevel, pkg+"deferred sending packet", "size", size, "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr()) + c.deferred = headBytes[origIdx:][:size+hSize] + c.log(DebugLevel, pkg+"deferred sending packet", "size", size, "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr()) return nil } } else { // Send previously deferrd packet if combining it with the next one would exceed the chunk size. - if len(s.deferred)+size+hSize > chunkSize { - s.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(s.deferred)) - _, err := s.write(s.deferred) + if len(c.deferred)+size+hSize > chunkSize { + c.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(c.deferred)) + _, err := c.write(c.deferred) if err != nil { return err } - s.deferred = nil + c.deferred = nil } } // TODO(kortschak): Rewrite this horrific peice of premature optimisation. - s.log(DebugLevel, pkg+"sending packet", "la", s.link.conn.LocalAddr(), "ra", s.link.conn.RemoteAddr(), "size", size) + c.log(DebugLevel, pkg+"sending packet", "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr(), "size", size) for size+hSize != 0 { if chunkSize > size { chunkSize = size } bytes := headBytes[origIdx:][:chunkSize+hSize] - if s.deferred != nil { + if c.deferred != nil { // Prepend the previously deferred packet and write it with the current one. - s.log(DebugLevel, pkg+"combining deferred packet", "size", len(s.deferred)) - bytes = append(s.deferred, bytes...) + c.log(DebugLevel, pkg+"combining deferred packet", "size", len(c.deferred)) + bytes = append(c.deferred, bytes...) } - _, err := s.write(bytes) + _, err := c.write(bytes) if err != nil { return err } - s.deferred = nil + c.deferred = nil size -= chunkSize origIdx += chunkSize + hSize @@ -451,7 +451,7 @@ func (pkt *packet) writeTo(s *Session, queue bool) error { hSize += 4 } - headBytes[origIdx] = 0xc0 | c + headBytes[origIdx] = 0xc0 | ch if cSize != 0 { tmp := int(pkt.channel) - 64 @@ -468,20 +468,20 @@ func (pkt *packet) writeTo(s *Session, queue bool) error { } } - // If we invoked a remote method and queue is true, we queue the method until the result arrives. + // If we invoked a remote method and queue is true, we queue the method until the result arrivec. if pkt.packetType == packetTypeInvoke && queue { buf := pkt.body[1:] meth := amf.DecodeString(buf) - s.log(DebugLevel, pkg+"queuing method "+meth) + c.log(DebugLevel, pkg+"queuing method "+meth) buf = buf[3+len(meth):] txn := int32(amf.DecodeNumber(buf[:8])) - s.methodCalls = append(s.methodCalls, method{name: meth, num: txn}) + c.methodCalls = append(c.methodCalls, method{name: meth, num: txn}) } - if s.channelsOut[pkt.channel] == nil { - s.channelsOut[pkt.channel] = &packet{} + if c.channelsOut[pkt.channel] == nil { + c.channelsOut[pkt.channel] = &packet{} } - *(s.channelsOut[pkt.channel]) = *pkt + *(c.channelsOut[pkt.channel]) = *pkt return nil } From 998d41c96fe140e6c2b6dd9412ee64cf099d2271 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sat, 19 Jan 2019 13:12:24 +1030 Subject: [PATCH 04/13] Session now Conn, init() moved into Dial(), and connectStream() merged into connect(). --- rtmp/rtmp.go | 248 ++++++++++++++++++++++----------------------------- 1 file changed, 109 insertions(+), 139 deletions(-) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 37460f07..fe4c04d6 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -162,79 +162,49 @@ var ( errUnimplemented = errors.New("rtmp: unimplemented feature") ) -// init initialises the Session link -func (s *Session) init() (err error) { - s.link.protocol, s.link.host, s.link.port, s.link.app, s.link.playpath, err = parseURL(s.url) - if err != nil { - return err - } - if s.link.app == "" { - return errInvalidURL - } - if s.link.port == 0 { - switch { - case (s.link.protocol & featureSSL) != 0: - s.link.port = 433 - s.log(FatalLevel, pkg+"SSL not supported") - case (s.link.protocol & featureHTTP) != 0: - s.link.port = 80 - default: - s.link.port = 1935 - } - } - s.link.url = rtmpProtocolStrings[s.link.protocol] + "://" + s.link.host + ":" + strconv.Itoa(int(s.link.port)) + "/" + s.link.app - s.link.protocol |= featureWrite - return nil -} - // connect establishes an RTMP connection. -func connect(s *Session) error { - addr, err := net.ResolveTCPAddr("tcp4", s.link.host+":"+strconv.Itoa(int(s.link.port))) +func connect(c *Conn) error { + addr, err := net.ResolveTCPAddr("tcp4", c.link.host+":"+strconv.Itoa(int(c.link.port))) if err != nil { return err } - s.link.conn, err = net.DialTCP("tcp4", nil, addr) + c.link.conn, err = net.DialTCP("tcp4", nil, addr) if err != nil { - s.log(WarnLevel, pkg+"dial failed", "error", err.Error()) + c.log(WarnLevel, pkg+"dial failed", "error", err.Error()) return err } - s.log(DebugLevel, pkg+"connected") - err = handshake(s) + c.log(DebugLevel, pkg+"connected") + err = handshake(c) if err != nil { - s.log(WarnLevel, pkg+"handshake failed", "error", err.Error()) + c.log(WarnLevel, pkg+"handshake failed", "error", err.Error()) return err } - s.log(DebugLevel, pkg+"handshaked") - err = sendConnectPacket(s) + c.log(DebugLevel, pkg+"handshaked") + err = sendConnectPacket(c) if err != nil { - s.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error()) + c.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error()) return err } - return nil -} - -// connectStream reads a packet and handles it -func connectStream(s *Session) error { - var err error - for !s.isPlaying { + c.log(DebugLevel, pkg+"negotiating") + for !c.isPlaying { pkt := packet{} - err = pkt.readFrom(s) + err = pkt.readFrom(c) if err != nil { break } switch pkt.packetType { case packetTypeAudio, packetTypeVideo, packetTypeInfo: - s.log(WarnLevel, pkg+"got packet before play; ignoring", "type", pkt.packetType) + c.log(WarnLevel, pkg+"got packet before play; ignoring", "type", pkt.packetType) default: - err = handlePacket(s, &pkt) + err = handlePacket(c, &pkt) if err != nil { break } } } - if !s.isPlaying { + if !c.isPlaying { return err } return nil @@ -242,50 +212,50 @@ func connectStream(s *Session) error { // handlePacket handles a packet that the client has received. // NB: Unsupported packet types are logged fatally. -func handlePacket(s *Session, pkt *packet) error { +func handlePacket(c *Conn, pkt *packet) error { if pkt.bodySize < 4 { return errInvalidBody } switch pkt.packetType { case packetTypeChunkSize: - s.inChunkSize = amf.DecodeInt32(pkt.body[:4]) - s.log(DebugLevel, pkg+"set inChunkSize", "size", int(s.inChunkSize)) + c.inChunkSize = amf.DecodeInt32(pkt.body[:4]) + c.log(DebugLevel, pkg+"set inChunkSize", "size", int(c.inChunkSize)) case packetTypeBytesReadReport: - s.log(DebugLevel, pkg+"received packetTypeBytesReadReport") + c.log(DebugLevel, pkg+"received packetTypeBytesReadReport") case packetTypeServerBW: - s.serverBW = amf.DecodeInt32(pkt.body[:4]) - s.log(DebugLevel, pkg+"set serverBW", "size", int(s.serverBW)) + c.serverBW = amf.DecodeInt32(pkt.body[:4]) + c.log(DebugLevel, pkg+"set serverBW", "size", int(c.serverBW)) case packetTypeClientBW: - s.clientBW = amf.DecodeInt32(pkt.body[:4]) - s.log(DebugLevel, pkg+"set clientBW", "size", int(s.clientBW)) + c.clientBW = amf.DecodeInt32(pkt.body[:4]) + c.log(DebugLevel, pkg+"set clientBW", "size", int(c.clientBW)) if pkt.bodySize > 4 { - s.clientBW2 = pkt.body[4] - s.log(DebugLevel, pkg+"set clientBW2", "size", int(s.clientBW2)) + c.clientBW2 = pkt.body[4] + c.log(DebugLevel, pkg+"set clientBW2", "size", int(c.clientBW2)) } else { - s.clientBW2 = 0xff + c.clientBW2 = 0xff } case packetTypeInvoke: - err := handleInvoke(s, pkt.body[:pkt.bodySize]) + err := handleInvoke(c, pkt.body[:pkt.bodySize]) if err != nil { - s.log(WarnLevel, pkg+"unexpected error from handleInvoke", "error", err.Error()) + c.log(WarnLevel, pkg+"unexpected error from handleInvoke", "error", err.Error()) return err } case packetTypeControl, packetTypeAudio, packetTypeVideo, packetTypeFlashVideo, packetTypeFlexMessage, packetTypeInfo: - s.log(FatalLevel, pkg+"unsupported packet type "+strconv.Itoa(int(pkt.packetType))) + c.log(FatalLevel, pkg+"unsupported packet type "+strconv.Itoa(int(pkt.packetType))) default: - s.log(WarnLevel, pkg+"unknown packet type", "type", pkt.packetType) + c.log(WarnLevel, pkg+"unknown packet type", "type", pkt.packetType) } return nil } -func sendConnectPacket(s *Session) error { +func sendConnectPacket(c *Conn) error { var pbuf [4096]byte pkt := packet{ channel: chanControl, @@ -300,15 +270,15 @@ func sendConnectPacket(s *Session) error { if err != nil { return err } - s.numInvokes += 1 - enc, err = amf.EncodeNumber(enc, float64(s.numInvokes)) + c.numInvokes += 1 + enc, err = amf.EncodeNumber(enc, float64(c.numInvokes)) if err != nil { return err } enc[0] = amf.TypeObject enc = enc[1:] - enc, err = amf.EncodeNamedString(enc, avApp, s.link.app) + enc, err = amf.EncodeNamedString(enc, avApp, c.link.app) if err != nil { return err } @@ -316,7 +286,7 @@ func sendConnectPacket(s *Session) error { if err != nil { return err } - enc, err = amf.EncodeNamedString(enc, avTcUrl, s.link.url) + enc, err = amf.EncodeNamedString(enc, avTcUrl, c.link.url) if err != nil { return err } @@ -326,12 +296,12 @@ func sendConnectPacket(s *Session) error { } // add auth string, if any - if s.link.auth != "" { - enc, err = amf.EncodeBoolean(enc, s.link.flags&linkAuth != 0) + if c.link.auth != "" { + enc, err = amf.EncodeBoolean(enc, c.link.flags&linkAuth != 0) if err != nil { return err } - enc, err = amf.EncodeString(enc, s.link.auth) + enc, err = amf.EncodeString(enc, c.link.auth) if err != nil { return err } @@ -339,10 +309,10 @@ func sendConnectPacket(s *Session) error { pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return pkt.writeTo(s, true) // response expected + return pkt.writeTo(c, true) // response expected } -func sendCreateStream(s *Session) error { +func sendCreateStream(c *Conn) error { var pbuf [256]byte pkt := packet{ channel: chanControl, @@ -357,8 +327,8 @@ func sendCreateStream(s *Session) error { if err != nil { return err } - s.numInvokes++ - enc, err = amf.EncodeNumber(enc, float64(s.numInvokes)) + c.numInvokes++ + enc, err = amf.EncodeNumber(enc, float64(c.numInvokes)) if err != nil { return err } @@ -367,10 +337,10 @@ func sendCreateStream(s *Session) error { pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return pkt.writeTo(s, true) // response expected + return pkt.writeTo(c, true) // response expected } -func sendReleaseStream(s *Session) error { +func sendReleaseStream(c *Conn) error { var pbuf [1024]byte pkt := packet{ channel: chanControl, @@ -385,23 +355,23 @@ func sendReleaseStream(s *Session) error { if err != nil { return err } - s.numInvokes++ - enc, err = amf.EncodeNumber(enc, float64(s.numInvokes)) + c.numInvokes++ + enc, err = amf.EncodeNumber(enc, float64(c.numInvokes)) if err != nil { return err } enc[0] = amf.TypeNull enc = enc[1:] - enc, err = amf.EncodeString(enc, s.link.playpath) + enc, err = amf.EncodeString(enc, c.link.playpath) if err != nil { return err } pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return pkt.writeTo(s, false) + return pkt.writeTo(c, false) } -func sendFCPublish(s *Session) error { +func sendFCPublish(c *Conn) error { var pbuf [1024]byte pkt := packet{ channel: chanControl, @@ -416,24 +386,24 @@ func sendFCPublish(s *Session) error { if err != nil { return err } - s.numInvokes++ - enc, err = amf.EncodeNumber(enc, float64(s.numInvokes)) + c.numInvokes++ + enc, err = amf.EncodeNumber(enc, float64(c.numInvokes)) if err != nil { return err } enc[0] = amf.TypeNull enc = enc[1:] - enc, err = amf.EncodeString(enc, s.link.playpath) + enc, err = amf.EncodeString(enc, c.link.playpath) if err != nil { return err } pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return pkt.writeTo(s, false) + return pkt.writeTo(c, false) } -func sendFCUnpublish(s *Session) error { +func sendFCUnpublish(c *Conn) error { var pbuf [1024]byte pkt := packet{ channel: chanControl, @@ -448,24 +418,24 @@ func sendFCUnpublish(s *Session) error { if err != nil { return err } - s.numInvokes++ - enc, err = amf.EncodeNumber(enc, float64(s.numInvokes)) + c.numInvokes++ + enc, err = amf.EncodeNumber(enc, float64(c.numInvokes)) if err != nil { return err } enc[0] = amf.TypeNull enc = enc[1:] - enc, err = amf.EncodeString(enc, s.link.playpath) + enc, err = amf.EncodeString(enc, c.link.playpath) if err != nil { return err } pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return pkt.writeTo(s, false) + return pkt.writeTo(c, false) } -func sendPublish(s *Session) error { +func sendPublish(c *Conn) error { var pbuf [1024]byte pkt := packet{ channel: chanSource, @@ -480,14 +450,14 @@ func sendPublish(s *Session) error { if err != nil { return err } - s.numInvokes++ - enc, err = amf.EncodeNumber(enc, float64(s.numInvokes)) + c.numInvokes++ + enc, err = amf.EncodeNumber(enc, float64(c.numInvokes)) if err != nil { return err } enc[0] = amf.TypeNull enc = enc[1:] - enc, err = amf.EncodeString(enc, s.link.playpath) + enc, err = amf.EncodeString(enc, c.link.playpath) if err != nil { return err } @@ -498,10 +468,10 @@ func sendPublish(s *Session) error { pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return pkt.writeTo(s, true) // response expected + return pkt.writeTo(c, true) // response expected } -func sendDeleteStream(s *Session, dStreamId float64) error { +func sendDeleteStream(c *Conn, dStreamId float64) error { var pbuf [256]byte pkt := packet{ channel: chanControl, @@ -516,8 +486,8 @@ func sendDeleteStream(s *Session, dStreamId float64) error { if err != nil { return err } - s.numInvokes++ - enc, err = amf.EncodeNumber(enc, float64(s.numInvokes)) + c.numInvokes++ + enc, err = amf.EncodeNumber(enc, float64(c.numInvokes)) if err != nil { return err } @@ -529,11 +499,11 @@ func sendDeleteStream(s *Session, dStreamId float64) error { } pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return pkt.writeTo(s, false) + return pkt.writeTo(c, false) } // sendBytesReceived tells the server how many bytes the client has received. -func sendBytesReceived(s *Session) error { +func sendBytesReceived(c *Conn) error { var pbuf [256]byte pkt := packet{ channel: chanBytesRead, @@ -544,18 +514,18 @@ func sendBytesReceived(s *Session) error { } enc := pkt.body - s.nBytesInSent = s.nBytesIn + c.nBytesInSent = c.nBytesIn - enc, err := amf.EncodeInt32(enc, s.nBytesIn) + enc, err := amf.EncodeInt32(enc, c.nBytesIn) if err != nil { return err } pkt.bodySize = 4 - return pkt.writeTo(s, false) + return pkt.writeTo(c, false) } -func sendCheckBW(s *Session) error { +func sendCheckBW(c *Conn) error { var pbuf [256]byte pkt := packet{ channel: chanControl, @@ -570,8 +540,8 @@ func sendCheckBW(s *Session) error { if err != nil { return err } - s.numInvokes++ - enc, err = amf.EncodeNumber(enc, float64(s.numInvokes)) + c.numInvokes++ + enc, err = amf.EncodeNumber(enc, float64(c.numInvokes)) if err != nil { return err } @@ -580,7 +550,7 @@ func sendCheckBW(s *Session) error { pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc)) - return pkt.writeTo(s, false) + return pkt.writeTo(c, false) } func eraseMethod(m []method, i int) []method { @@ -590,8 +560,8 @@ func eraseMethod(m []method, i int) []method { } // int handleInvoke handles a packet invoke request -// Side effects: s.isPlaying set to true upon avNetStreamPublish_Start -func handleInvoke(s *Session, body []byte) error { +// Side effects: c.isPlaying set to true upon avNetStreamPublish_Start +func handleInvoke(c *Conn, body []byte) error { if body[0] != 0x02 { return errInvalidBody } @@ -610,37 +580,37 @@ func handleInvoke(s *Session, body []byte) error { return err } - s.log(DebugLevel, pkg+"invoking method "+meth) + c.log(DebugLevel, pkg+"invoking method "+meth) switch meth { case av_result: - if (s.link.protocol & featureWrite) == 0 { + if (c.link.protocol & featureWrite) == 0 { return errNotWritable } var methodInvoked string - for i, m := range s.methodCalls { + for i, m := range c.methodCalls { if float64(m.num) == txn { methodInvoked = m.name - s.methodCalls = eraseMethod(s.methodCalls, i) + c.methodCalls = eraseMethod(c.methodCalls, i) break } } if methodInvoked == "" { - s.log(WarnLevel, pkg+"received result without matching request", "id", txn) + c.log(WarnLevel, pkg+"received result without matching request", "id", txn) return nil } - s.log(DebugLevel, pkg+"received result for "+methodInvoked) + c.log(DebugLevel, pkg+"received result for "+methodInvoked) switch methodInvoked { case avConnect: - err := sendReleaseStream(s) + err := sendReleaseStream(c) if err != nil { return err } - err = sendFCPublish(s) + err = sendFCPublish(c) if err != nil { return err } - err = sendCreateStream(s) + err = sendCreateStream(c) if err != nil { return err } @@ -650,18 +620,18 @@ func handleInvoke(s *Session, body []byte) error { if err != nil { return err } - s.streamID = int32(n) - err = sendPublish(s) + c.streamID = int32(n) + err = sendPublish(c) if err != nil { return err } default: - s.log(FatalLevel, pkg+"unexpected method invoked"+methodInvoked) + c.log(FatalLevel, pkg+"unexpected method invoked"+methodInvoked) } case avOnBWDone: - err := sendCheckBW(s) + err := sendCheckBW(c) if err != nil { return err } @@ -679,27 +649,27 @@ func handleInvoke(s *Session, body []byte) error { if err != nil { return err } - s.log(DebugLevel, pkg+"onStatus", "code", code, "level", level) + c.log(DebugLevel, pkg+"onStatus", "code", code, "level", level) if code != avNetStreamPublish_Start { - s.log(ErrorLevel, pkg+"unexpected response "+code) + c.log(ErrorLevel, pkg+"unexpected response "+code) return errUnimplemented } - s.log(DebugLevel, pkg+"playing") - s.isPlaying = true - for i, m := range s.methodCalls { + c.log(DebugLevel, pkg+"playing") + c.isPlaying = true + for i, m := range c.methodCalls { if m.name == avPublish { - s.methodCalls = eraseMethod(s.methodCalls, i) + c.methodCalls = eraseMethod(c.methodCalls, i) } } default: - s.log(FatalLevel, pkg+"unsuppoted method "+meth) + c.log(FatalLevel, pkg+"unsuppoted method "+meth) } return nil } -func handshake(s *Session) error { +func handshake(c *Conn) error { var clientbuf [signatureSize + 1]byte clientsig := clientbuf[1:] @@ -712,44 +682,44 @@ func handshake(s *Session) error { clientsig[i] = byte(rand.Intn(256)) } - _, err := s.write(clientbuf[:]) + _, err := c.write(clientbuf[:]) if err != nil { return err } - s.log(DebugLevel, pkg+"handshake sent") + c.log(DebugLevel, pkg+"handshake sent") var typ [1]byte - _, err = s.read(typ[:]) + _, err = c.read(typ[:]) if err != nil { return err } - s.log(DebugLevel, pkg+"handshake received") + c.log(DebugLevel, pkg+"handshake received") if typ[0] != clientbuf[0] { - s.log(WarnLevel, pkg+"handshake type mismatch", "sent", clientbuf[0], "received", typ) + c.log(WarnLevel, pkg+"handshake type mismatch", "sent", clientbuf[0], "received", typ) } - _, err = s.read(serversig[:]) + _, err = c.read(serversig[:]) if err != nil { return err } // decode server response suptime := binary.BigEndian.Uint32(serversig[:4]) - s.log(DebugLevel, pkg+"server uptime", "uptime", suptime) + c.log(DebugLevel, pkg+"server uptime", "uptime", suptime) // 2nd part of handshake - _, err = s.write(serversig[:]) + _, err = c.write(serversig[:]) if err != nil { return err } - _, err = s.read(serversig[:]) + _, err = c.read(serversig[:]) if err != nil { return err } if !bytes.Equal(serversig[:signatureSize], clientbuf[1:signatureSize+1]) { - s.log(WarnLevel, pkg+"signature mismatch", "serversig", serversig[:signatureSize], "clientsig", clientbuf[1:signatureSize+1]) + c.log(WarnLevel, pkg+"signature mismatch", "serversig", serversig[:signatureSize], "clientsig", clientbuf[1:signatureSize+1]) } return nil } From ce95901b6697ff19e50e2921a3751cb33d198ee6 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sat, 19 Jan 2019 13:16:46 +1030 Subject: [PATCH 05/13] Session renamed Conn and NewSession() and Open() replaced with Dial(). --- rtmp/session.go | 133 ++++++++++++++++++++++++------------------------ 1 file changed, 66 insertions(+), 67 deletions(-) diff --git a/rtmp/session.go b/rtmp/session.go index 8da2626b..1d43c2a4 100644 --- a/rtmp/session.go +++ b/rtmp/session.go @@ -1,9 +1,9 @@ /* NAME - session.go + conn.go DESCRIPTION - RTMP session functionality. + RTMP connection functionality. AUTHORS Saxon Nelson-Milton @@ -11,7 +11,7 @@ AUTHORS Alan Noble LICENSE - session.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + conn.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the @@ -36,14 +36,14 @@ package rtmp import ( "io" "net" + "strconv" "time" "bitbucket.org/ausocean/av/rtmp/amf" ) -// Session holds the state for an RTMP session. -type Session struct { - url string +// Conn represents an RTMP connection. +type Conn struct { inChunkSize uint32 outChunkSize uint32 nBytesIn uint32 @@ -60,8 +60,6 @@ type Session struct { channelsIn []*packet channelsOut []*packet channelTimestamp []int32 - audioCodecs float64 - videoCodecs float64 deferred []byte link link log Log @@ -103,69 +101,71 @@ const ( // NB: We don't accept extended headers. const flvTagheaderSize = 11 -// NewSession returns a new Session. -func NewSession(url string, timeout uint, log Log) *Session { - return &Session{ - url: url, +// Dial connects to RTMP server specified by the given URL and returns the connection. +func Dial(url string, timeout uint, log Log) (*Conn, error) { + log(DebugLevel, pkg+"rtmp.Dial") + c := Conn{ inChunkSize: 128, outChunkSize: 128, clientBW: 2500000, clientBW2: 2, serverBW: 2500000, - audioCodecs: 3191.0, - videoCodecs: 252.0, log: log, link: link{ timeout: timeout, }, } + + var err error + c.link.protocol, c.link.host, c.link.port, c.link.app, c.link.playpath, err = parseURL(url) + if err != nil { + return nil, err + } + if c.link.app == "" { + return nil, errInvalidURL + } + if c.link.port == 0 { + switch { + case (c.link.protocol & featureSSL) != 0: + c.link.port = 433 + c.log(FatalLevel, pkg+"SSL not supported") + case (c.link.protocol & featureHTTP) != 0: + c.link.port = 80 + default: + c.link.port = 1935 + } + } + c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app + c.link.protocol |= featureWrite + + err = connect(&c) + if err != nil { + return nil, err + } + return &c, nil } -// Open establishes an rtmp connection with the url passed into the constructor. -func (s *Session) Open() error { - s.log(DebugLevel, pkg+"Session.Open") - if s.isConnected() { - return errConnected - } - err := s.init() - if err != nil { - return err - } - err = connect(s) - if err != nil { - s.Close() - return err - } - - err = connectStream(s) - if err != nil { - s.Close() - return err - } - return nil -} - -// Close terminates the rtmp connection. -// NB: Close is idempotent and the session value is cleared completely. -func (s *Session) Close() error { - s.log(DebugLevel, pkg+"Session.Close") - if !s.isConnected() { +// Close terminates the RTMP connection. +// NB: Close is idempotent and the connection value is cleared completely. +func (c *Conn) Close() error { + c.log(DebugLevel, pkg+"Conn.Close") + if !c.isConnected() { return errNotConnected } - if s.streamID > 0 { - if s.link.protocol&featureWrite != 0 { - sendFCUnpublish(s) + if c.streamID > 0 { + if c.link.protocol&featureWrite != 0 { + sendFCUnpublish(c) } - sendDeleteStream(s, float64(s.streamID)) + sendDeleteStream(c, float64(c.streamID)) } - s.link.conn.Close() - *s = Session{} + c.link.conn.Close() + *c = Conn{} return nil } // Write writes a frame (flv tag) to the rtmp connection. -func (s *Session) Write(data []byte) (int, error) { - if !s.isConnected() { +func (c *Conn) Write(data []byte) (int, error) { + if !c.isConnected() { return 0, errNotConnected } if len(data) < flvTagheaderSize { @@ -180,12 +180,12 @@ func (s *Session) Write(data []byte) (int, error) { bodySize: amf.DecodeInt24(data[1:4]), timestamp: amf.DecodeInt24(data[4:7]) | uint32(data[7])<<24, channel: chanSource, - info: s.streamID, + info: c.streamID, } pkt.resize(pkt.bodySize, headerSizeAuto) copy(pkt.body, data[flvTagheaderSize:flvTagheaderSize+pkt.bodySize]) - err := pkt.writeTo(s, false) + err := pkt.writeTo(c, false) if err != nil { return 0, err } @@ -197,19 +197,19 @@ func (s *Session) Write(data []byte) (int, error) { // read from an RTMP connection. Sends a bytes received message if the // number of bytes received (nBytesIn) is greater than the number sent // (nBytesInSent) by 10% of the bandwidth. -func (s *Session) read(buf []byte) (int, error) { - err := s.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) +func (c *Conn) read(buf []byte) (int, error) { + err := c.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout))) if err != nil { return 0, err } - n, err := io.ReadFull(s.link.conn, buf) + n, err := io.ReadFull(c.link.conn, buf) if err != nil { - s.log(DebugLevel, pkg+"read failed", "error", err.Error()) + c.log(DebugLevel, pkg+"read failed", "error", err.Error()) return 0, err } - s.nBytesIn += uint32(n) - if s.nBytesIn > (s.nBytesInSent + s.clientBW/10) { - err := sendBytesReceived(s) + c.nBytesIn += uint32(n) + if c.nBytesIn > (c.nBytesInSent + c.clientBW/10) { + err := sendBytesReceived(c) if err != nil { return n, err // NB: we still read n bytes, even though send bytes failed } @@ -218,22 +218,21 @@ func (s *Session) read(buf []byte) (int, error) { } // write to an RTMP connection. -func (s *Session) write(buf []byte) (int, error) { +func (c *Conn) write(buf []byte) (int, error) { //ToDo: consider using a different timeout for writes than for reads - err := s.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) + err := c.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout))) if err != nil { return 0, err } - n, err := s.link.conn.Write(buf) + n, err := c.link.conn.Write(buf) if err != nil { - s.log(WarnLevel, pkg+"write failed", "error", err.Error()) + c.log(WarnLevel, pkg+"write failed", "error", err.Error()) return 0, err } return n, nil } // isConnected returns true if the RTMP connection is up. -func (s *Session) isConnected() bool { - return s.link.conn != nil +func (c *Conn) isConnected() bool { + return c.link.conn != nil } - From fc7ae413d3b223a4a41a05a1a21c1c53bdfccc22 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sat, 19 Jan 2019 13:18:15 +1030 Subject: [PATCH 06/13] Session renamed Conn. --- rtmp/conn.go | 238 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 238 insertions(+) create mode 100644 rtmp/conn.go diff --git a/rtmp/conn.go b/rtmp/conn.go new file mode 100644 index 00000000..1d43c2a4 --- /dev/null +++ b/rtmp/conn.go @@ -0,0 +1,238 @@ +/* +NAME + conn.go + +DESCRIPTION + RTMP connection functionality. + +AUTHORS + Saxon Nelson-Milton + Dan Kortschak + Alan Noble + +LICENSE + conn.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. + + Derived from librtmp under the GNU Lesser General Public License 2.1 + Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org + Copyright (C) 2008-2009 Andrej Stepanchuk + Copyright (C) 2009-2010 Howard Chu +*/ +package rtmp + +import ( + "io" + "net" + "strconv" + "time" + + "bitbucket.org/ausocean/av/rtmp/amf" +) + +// Conn represents an RTMP connection. +type Conn struct { + inChunkSize uint32 + outChunkSize uint32 + nBytesIn uint32 + nBytesInSent uint32 + streamID int32 + serverBW uint32 + clientBW uint32 + clientBW2 uint8 + isPlaying bool + numInvokes int32 + methodCalls []method + channelsAllocatedIn int32 + channelsAllocatedOut int32 + channelsIn []*packet + channelsOut []*packet + channelTimestamp []int32 + deferred []byte + link link + log Log +} + +// link represents RTMP URL and connection information. +type link struct { + host string + playpath string + url string + app string + auth string + flags int32 + protocol int32 + timeout uint + port uint16 + conn *net.TCPConn +} + +// method represents an RTMP method. +type method struct { + name string + num int32 +} + +// Log defines the RTMP logging function. +type Log func(level int8, message string, params ...interface{}) + +// Log levels used by Log. +const ( + DebugLevel int8 = -1 + InfoLevel int8 = 0 + WarnLevel int8 = 1 + ErrorLevel int8 = 2 + FatalLevel int8 = 5 +) + +// flvTagheaderSize is the FLV header size we expect. +// NB: We don't accept extended headers. +const flvTagheaderSize = 11 + +// Dial connects to RTMP server specified by the given URL and returns the connection. +func Dial(url string, timeout uint, log Log) (*Conn, error) { + log(DebugLevel, pkg+"rtmp.Dial") + c := Conn{ + inChunkSize: 128, + outChunkSize: 128, + clientBW: 2500000, + clientBW2: 2, + serverBW: 2500000, + log: log, + link: link{ + timeout: timeout, + }, + } + + var err error + c.link.protocol, c.link.host, c.link.port, c.link.app, c.link.playpath, err = parseURL(url) + if err != nil { + return nil, err + } + if c.link.app == "" { + return nil, errInvalidURL + } + if c.link.port == 0 { + switch { + case (c.link.protocol & featureSSL) != 0: + c.link.port = 433 + c.log(FatalLevel, pkg+"SSL not supported") + case (c.link.protocol & featureHTTP) != 0: + c.link.port = 80 + default: + c.link.port = 1935 + } + } + c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app + c.link.protocol |= featureWrite + + err = connect(&c) + if err != nil { + return nil, err + } + return &c, nil +} + +// Close terminates the RTMP connection. +// NB: Close is idempotent and the connection value is cleared completely. +func (c *Conn) Close() error { + c.log(DebugLevel, pkg+"Conn.Close") + if !c.isConnected() { + return errNotConnected + } + if c.streamID > 0 { + if c.link.protocol&featureWrite != 0 { + sendFCUnpublish(c) + } + sendDeleteStream(c, float64(c.streamID)) + } + c.link.conn.Close() + *c = Conn{} + return nil +} + +// Write writes a frame (flv tag) to the rtmp connection. +func (c *Conn) Write(data []byte) (int, error) { + if !c.isConnected() { + return 0, errNotConnected + } + if len(data) < flvTagheaderSize { + return 0, errInvalidFlvTag + } + if data[0] == packetTypeInfo || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') { + return 0, errUnimplemented + } + + pkt := packet{ + packetType: data[0], + bodySize: amf.DecodeInt24(data[1:4]), + timestamp: amf.DecodeInt24(data[4:7]) | uint32(data[7])<<24, + channel: chanSource, + info: c.streamID, + } + + pkt.resize(pkt.bodySize, headerSizeAuto) + copy(pkt.body, data[flvTagheaderSize:flvTagheaderSize+pkt.bodySize]) + err := pkt.writeTo(c, false) + if err != nil { + return 0, err + } + return len(data), nil +} + +// I/O functions + +// read from an RTMP connection. Sends a bytes received message if the +// number of bytes received (nBytesIn) is greater than the number sent +// (nBytesInSent) by 10% of the bandwidth. +func (c *Conn) read(buf []byte) (int, error) { + err := c.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout))) + if err != nil { + return 0, err + } + n, err := io.ReadFull(c.link.conn, buf) + if err != nil { + c.log(DebugLevel, pkg+"read failed", "error", err.Error()) + return 0, err + } + c.nBytesIn += uint32(n) + if c.nBytesIn > (c.nBytesInSent + c.clientBW/10) { + err := sendBytesReceived(c) + if err != nil { + return n, err // NB: we still read n bytes, even though send bytes failed + } + } + return n, nil +} + +// write to an RTMP connection. +func (c *Conn) write(buf []byte) (int, error) { + //ToDo: consider using a different timeout for writes than for reads + err := c.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout))) + if err != nil { + return 0, err + } + n, err := c.link.conn.Write(buf) + if err != nil { + c.log(WarnLevel, pkg+"write failed", "error", err.Error()) + return 0, err + } + return n, nil +} + +// isConnected returns true if the RTMP connection is up. +func (c *Conn) isConnected() bool { + return c.link.conn != nil +} From 9cc0e5631cd4b72290cdc34b231c1df3a56404c9 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sat, 19 Jan 2019 13:25:05 +1030 Subject: [PATCH 07/13] Session now Conn, and NewSession()+Open() replaced by Dial(). --- rtmp/rtmp_test.go | 86 +++++++++++++++++------------------------------ 1 file changed, 31 insertions(+), 55 deletions(-) diff --git a/rtmp/rtmp_test.go b/rtmp/rtmp_test.go index 2ce90e63..618f2b95 100644 --- a/rtmp/rtmp_test.go +++ b/rtmp/rtmp_test.go @@ -116,75 +116,54 @@ func TestKey(t *testing.T) { testLog(0, "Testing against URL "+testBaseURL+testKey) } -// TestInit tests session construction and link initialization. -func TestInit(t *testing.T) { - testLog(0, "TestInit") - // test with just the base URL - s := NewSession(testBaseURL, testTimeout, testLog) - if s.url != testBaseURL && s.link.timeout != testTimeout { - t.Errorf("NewSession failed") - } - err := s.init() - if err != nil { - t.Errorf("setupURL: failed with error: %v", err) - } - // test the parts are as expected - if s.link.protocol&featureWrite == 0 { - t.Errorf("setupURL: link not writable") - } - if rtmpProtocolStrings[s.link.protocol&^featureWrite] != rtmpProtocol { - t.Errorf("setupURL: wrong protocol: %v", s.link.protocol) - } - if s.link.host != testHost { - t.Errorf("setupURL: wrong host: %v", s.link.host) - } - if s.link.app != testApp { - t.Errorf("setupURL: wrong app: %v", s.link.app) - } -} - // TestErrorHandling tests error handling func TestErorHandling(t *testing.T) { testLog(0, "TestErrorHandling") if testKey == "" { t.Skip("Skipping TestErrorHandling since no RTMP_TEST_KEY") } - s := NewSession(testBaseURL+testKey, testTimeout, testLog) - - // test errNotConnected - var buf [1024]byte - tag := buf[:0] - _, err := s.Write(tag) - if err == nil { - t.Errorf("Write did not return errNotConnected") + c, err := Dial(testBaseURL+testKey, testTimeout, testLog) + if err != nil { + t.Errorf("Dial failed with error: %v", err) } - err = s.Open() - if err != nil { - t.Errorf("Open failed with error: %v", err) + // test the link parts are as expected + if c.link.protocol&featureWrite == 0 { + t.Errorf("link not writable") + } + if rtmpProtocolStrings[c.link.protocol&^featureWrite] != rtmpProtocol { + t.Errorf("wrong protocol: %v", c.link.protocol) + } + if c.link.host != testHost { + t.Errorf("wrong host: %v", c.link.host) + } + if c.link.app != testApp { + t.Errorf("wrong app: %v", c.link.app) } // test errInvalidFlvTag - _, err = s.Write(tag) + var buf [1024]byte + tag := buf[:0] + _, err = c.Write(tag) if err == nil { t.Errorf("Write did not return errInvalidFlvTag") } // test errUnimplemented copy(tag, []byte("FLV")) - _, err = s.Write(tag) + _, err = c.Write(tag) if err == nil { t.Errorf("Write did not return errUnimplemented") } // test errInvalidBody tag = buf[:11] - _, err = s.Write(tag) + _, err = c.Write(tag) if err == nil { t.Errorf("Write did not return errInvalidBody") } - err = s.Close() + err = c.Close() if err != nil { t.Errorf("Close failed with error: %v", err) return @@ -197,10 +176,9 @@ func TestFromFrame(t *testing.T) { if testKey == "" { t.Skip("Skipping TestFromFrame since no RTMP_TEST_KEY") } - s := NewSession(testBaseURL+testKey, testTimeout, testLog) - err := s.Open() + c, err := Dial(testBaseURL+testKey, testTimeout, testLog) if err != nil { - t.Errorf("Session.Open failed with error: %v", err) + t.Errorf("Dial failed with error: %v", err) } testFrame := os.Getenv("RTMP_TEST_FRAME") @@ -216,7 +194,7 @@ func TestFromFrame(t *testing.T) { } const frameRate = 25 - rs := &rtmpSender{s: s} + rs := &rtmpSender{conn: c} flvEncoder, err := flv.NewEncoder(rs, true, true, frameRate) if err != nil { t.Errorf("Failed to create flv encoder with error: %v", err) @@ -226,18 +204,18 @@ func TestFromFrame(t *testing.T) { t.Errorf("Lexing failed with error: %v", err) } - err = s.Close() + err = c.Close() if err != nil { t.Errorf("Session.Close failed with error: %v", err) } } type rtmpSender struct { - s *Session + conn *Conn } func (rs *rtmpSender) Write(p []byte) (int, error) { - n, err := rs.s.Write(p) + n, err := rs.conn.Write(p) if err != errInvalidFlvTag && err != nil { return 0, err } @@ -255,12 +233,10 @@ func TestFromFile(t *testing.T) { if testKey == "" { t.Skip("Skipping TestFromFile since no RTMP_TEST_KEY") } - s := NewSession(testBaseURL+testKey, testTimeout, testLog) - err := s.Open() + c, err := Dial(testBaseURL+testKey, testTimeout, testLog) if err != nil { - t.Errorf("Session.Open failed with error: %v", err) + t.Errorf("Dial failed with error: %v", err) } - f, err := os.Open(testFile) if err != nil { t.Errorf("Open failed with error: %v", err) @@ -268,7 +244,7 @@ func TestFromFile(t *testing.T) { defer f.Close() // Pass RTMP session, true for audio, true for video, and 25 FPS - flvEncoder, err := flv.NewEncoder(s, true, true, 25) + flvEncoder, err := flv.NewEncoder(c, true, true, 25) if err != nil { t.Fatalf("failed to create encoder: %v", err) } @@ -277,7 +253,7 @@ func TestFromFile(t *testing.T) { t.Errorf("Lexing and encoding failed with error: %v", err) } - err = s.Close() + err = c.Close() if err != nil { t.Errorf("Session.Close failed with error: %v", err) } From 55483454396e5ac858a690726ee4abf55dc4552b Mon Sep 17 00:00:00 2001 From: scruzin Date: Sat, 19 Jan 2019 15:18:10 +1030 Subject: [PATCH 08/13] Fix comments munged by /.s/.c search and replaces. --- rtmp/packet.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rtmp/packet.go b/rtmp/packet.go index 73cc0088..a949cbbc 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -270,7 +270,7 @@ func (pkt *packet) resize(size uint32, ht uint8) { // writeTo writes a packet to the RTMP connection. // Packets are written in chunks which are Session.chunkSize in length (128 bytes in length). // We defer sending small audio packets and combine consecutive small audio packets where possible to reduce I/O. -// When queue is true, we expect a response to this request and cache the method on c.methodCallc. +// When queue is true, we expect a response to this request and cache the method on c.methodCalls. func (pkt *packet) writeTo(c *Conn, queue bool) error { if pkt.body == nil || pkt.bodySize == 0 { return errInvalidBody @@ -298,7 +298,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { prevPkt := c.channelsOut[pkt.channel] var last int if prevPkt != nil && pkt.headerType != headerSizeLarge { - // Compress header by using the previous packet's attributec. + // Compress header by using the previous packet's attributes. if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == headerSizeMedium { pkt.headerType = headerSizeSmall } @@ -468,7 +468,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { } } - // If we invoked a remote method and queue is true, we queue the method until the result arrivec. + // If we invoked a remote method and queue is true, we queue the method until the result arrives. if pkt.packetType == packetTypeInvoke && queue { buf := pkt.body[1:] meth := amf.DecodeString(buf) From 9bb6b3ae6d89058b8514632153ce8d82a49a6d6e Mon Sep 17 00:00:00 2001 From: Alan Noble Date: Sat, 19 Jan 2019 04:52:32 +0000 Subject: [PATCH 09/13] rtmp/session.go deleted (replaced by conn.go) --- rtmp/session.go | 238 ------------------------------------------------ 1 file changed, 238 deletions(-) delete mode 100644 rtmp/session.go diff --git a/rtmp/session.go b/rtmp/session.go deleted file mode 100644 index 1d43c2a4..00000000 --- a/rtmp/session.go +++ /dev/null @@ -1,238 +0,0 @@ -/* -NAME - conn.go - -DESCRIPTION - RTMP connection functionality. - -AUTHORS - Saxon Nelson-Milton - Dan Kortschak - Alan Noble - -LICENSE - conn.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. - - Derived from librtmp under the GNU Lesser General Public License 2.1 - Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org - Copyright (C) 2008-2009 Andrej Stepanchuk - Copyright (C) 2009-2010 Howard Chu -*/ -package rtmp - -import ( - "io" - "net" - "strconv" - "time" - - "bitbucket.org/ausocean/av/rtmp/amf" -) - -// Conn represents an RTMP connection. -type Conn struct { - inChunkSize uint32 - outChunkSize uint32 - nBytesIn uint32 - nBytesInSent uint32 - streamID int32 - serverBW uint32 - clientBW uint32 - clientBW2 uint8 - isPlaying bool - numInvokes int32 - methodCalls []method - channelsAllocatedIn int32 - channelsAllocatedOut int32 - channelsIn []*packet - channelsOut []*packet - channelTimestamp []int32 - deferred []byte - link link - log Log -} - -// link represents RTMP URL and connection information. -type link struct { - host string - playpath string - url string - app string - auth string - flags int32 - protocol int32 - timeout uint - port uint16 - conn *net.TCPConn -} - -// method represents an RTMP method. -type method struct { - name string - num int32 -} - -// Log defines the RTMP logging function. -type Log func(level int8, message string, params ...interface{}) - -// Log levels used by Log. -const ( - DebugLevel int8 = -1 - InfoLevel int8 = 0 - WarnLevel int8 = 1 - ErrorLevel int8 = 2 - FatalLevel int8 = 5 -) - -// flvTagheaderSize is the FLV header size we expect. -// NB: We don't accept extended headers. -const flvTagheaderSize = 11 - -// Dial connects to RTMP server specified by the given URL and returns the connection. -func Dial(url string, timeout uint, log Log) (*Conn, error) { - log(DebugLevel, pkg+"rtmp.Dial") - c := Conn{ - inChunkSize: 128, - outChunkSize: 128, - clientBW: 2500000, - clientBW2: 2, - serverBW: 2500000, - log: log, - link: link{ - timeout: timeout, - }, - } - - var err error - c.link.protocol, c.link.host, c.link.port, c.link.app, c.link.playpath, err = parseURL(url) - if err != nil { - return nil, err - } - if c.link.app == "" { - return nil, errInvalidURL - } - if c.link.port == 0 { - switch { - case (c.link.protocol & featureSSL) != 0: - c.link.port = 433 - c.log(FatalLevel, pkg+"SSL not supported") - case (c.link.protocol & featureHTTP) != 0: - c.link.port = 80 - default: - c.link.port = 1935 - } - } - c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app - c.link.protocol |= featureWrite - - err = connect(&c) - if err != nil { - return nil, err - } - return &c, nil -} - -// Close terminates the RTMP connection. -// NB: Close is idempotent and the connection value is cleared completely. -func (c *Conn) Close() error { - c.log(DebugLevel, pkg+"Conn.Close") - if !c.isConnected() { - return errNotConnected - } - if c.streamID > 0 { - if c.link.protocol&featureWrite != 0 { - sendFCUnpublish(c) - } - sendDeleteStream(c, float64(c.streamID)) - } - c.link.conn.Close() - *c = Conn{} - return nil -} - -// Write writes a frame (flv tag) to the rtmp connection. -func (c *Conn) Write(data []byte) (int, error) { - if !c.isConnected() { - return 0, errNotConnected - } - if len(data) < flvTagheaderSize { - return 0, errInvalidFlvTag - } - if data[0] == packetTypeInfo || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') { - return 0, errUnimplemented - } - - pkt := packet{ - packetType: data[0], - bodySize: amf.DecodeInt24(data[1:4]), - timestamp: amf.DecodeInt24(data[4:7]) | uint32(data[7])<<24, - channel: chanSource, - info: c.streamID, - } - - pkt.resize(pkt.bodySize, headerSizeAuto) - copy(pkt.body, data[flvTagheaderSize:flvTagheaderSize+pkt.bodySize]) - err := pkt.writeTo(c, false) - if err != nil { - return 0, err - } - return len(data), nil -} - -// I/O functions - -// read from an RTMP connection. Sends a bytes received message if the -// number of bytes received (nBytesIn) is greater than the number sent -// (nBytesInSent) by 10% of the bandwidth. -func (c *Conn) read(buf []byte) (int, error) { - err := c.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout))) - if err != nil { - return 0, err - } - n, err := io.ReadFull(c.link.conn, buf) - if err != nil { - c.log(DebugLevel, pkg+"read failed", "error", err.Error()) - return 0, err - } - c.nBytesIn += uint32(n) - if c.nBytesIn > (c.nBytesInSent + c.clientBW/10) { - err := sendBytesReceived(c) - if err != nil { - return n, err // NB: we still read n bytes, even though send bytes failed - } - } - return n, nil -} - -// write to an RTMP connection. -func (c *Conn) write(buf []byte) (int, error) { - //ToDo: consider using a different timeout for writes than for reads - err := c.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout))) - if err != nil { - return 0, err - } - n, err := c.link.conn.Write(buf) - if err != nil { - c.log(WarnLevel, pkg+"write failed", "error", err.Error()) - return 0, err - } - return n, nil -} - -// isConnected returns true if the RTMP connection is up. -func (c *Conn) isConnected() bool { - return c.link.conn != nil -} From b77283aa3b272ce9354e7d2847e2b7ae81cf3157 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sat, 19 Jan 2019 16:38:40 +1030 Subject: [PATCH 10/13] packet.header renamed buf. --- rtmp/packet.go | 9 +- rtmp/rtmp.go | 18 ++-- rtmp/session.go | 238 ------------------------------------------------ 3 files changed, 13 insertions(+), 252 deletions(-) delete mode 100644 rtmp/session.go diff --git a/rtmp/packet.go b/rtmp/packet.go index a949cbbc..23671eac 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -91,7 +91,7 @@ type packet struct { info int32 bodySize uint32 bytesRead uint32 - header []byte + buf []byte body []byte } @@ -245,9 +245,8 @@ func (pkt *packet) readFrom(c *Conn) error { // resize adjusts the packet's storage to accommodate a body of the given size and header type. // When headerSizeAuto is specified, the header type is computed based on packet type. func (pkt *packet) resize(size uint32, ht uint8) { - buf := make([]byte, fullHeaderSize+size) - pkt.header = buf - pkt.body = buf[fullHeaderSize:] + pkt.buf = make([]byte, fullHeaderSize+size) + pkt.body = pkt.buf[fullHeaderSize:] if ht != headerSizeAuto { pkt.headerType = ht return @@ -317,7 +316,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { // The complete packet starts from headerSize _before_ the start the body. // origIdx is the original offset, which will be 0 for a full (12-byte) header or 11 for a minimum (1-byte) header. - headBytes := pkt.header + headBytes := pkt.buf hSize := headerSizes[pkt.headerType] origIdx := fullHeaderSize - hSize diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index fe4c04d6..86c1f09e 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -261,7 +261,7 @@ func sendConnectPacket(c *Conn) error { channel: chanControl, headerType: headerSizeLarge, packetType: packetTypeInvoke, - header: pbuf[:], + buf: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body @@ -318,7 +318,7 @@ func sendCreateStream(c *Conn) error { channel: chanControl, headerType: headerSizeMedium, packetType: packetTypeInvoke, - header: pbuf[:], + buf: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body @@ -346,7 +346,7 @@ func sendReleaseStream(c *Conn) error { channel: chanControl, headerType: headerSizeMedium, packetType: packetTypeInvoke, - header: pbuf[:], + buf: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body @@ -377,7 +377,7 @@ func sendFCPublish(c *Conn) error { channel: chanControl, headerType: headerSizeMedium, packetType: packetTypeInvoke, - header: pbuf[:], + buf: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body @@ -409,7 +409,7 @@ func sendFCUnpublish(c *Conn) error { channel: chanControl, headerType: headerSizeMedium, packetType: packetTypeInvoke, - header: pbuf[:], + buf: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body @@ -441,7 +441,7 @@ func sendPublish(c *Conn) error { channel: chanSource, headerType: headerSizeLarge, packetType: packetTypeInvoke, - header: pbuf[:], + buf: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body @@ -477,7 +477,7 @@ func sendDeleteStream(c *Conn, dStreamId float64) error { channel: chanControl, headerType: headerSizeMedium, packetType: packetTypeInvoke, - header: pbuf[:], + buf: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body @@ -509,7 +509,7 @@ func sendBytesReceived(c *Conn) error { channel: chanBytesRead, headerType: headerSizeMedium, packetType: packetTypeBytesReadReport, - header: pbuf[:], + buf: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body @@ -531,7 +531,7 @@ func sendCheckBW(c *Conn) error { channel: chanControl, headerType: headerSizeLarge, packetType: packetTypeInvoke, - header: pbuf[:], + buf: pbuf[:], body: pbuf[fullHeaderSize:], } enc := pkt.body diff --git a/rtmp/session.go b/rtmp/session.go deleted file mode 100644 index 1d43c2a4..00000000 --- a/rtmp/session.go +++ /dev/null @@ -1,238 +0,0 @@ -/* -NAME - conn.go - -DESCRIPTION - RTMP connection functionality. - -AUTHORS - Saxon Nelson-Milton - Dan Kortschak - Alan Noble - -LICENSE - conn.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. - - Derived from librtmp under the GNU Lesser General Public License 2.1 - Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org - Copyright (C) 2008-2009 Andrej Stepanchuk - Copyright (C) 2009-2010 Howard Chu -*/ -package rtmp - -import ( - "io" - "net" - "strconv" - "time" - - "bitbucket.org/ausocean/av/rtmp/amf" -) - -// Conn represents an RTMP connection. -type Conn struct { - inChunkSize uint32 - outChunkSize uint32 - nBytesIn uint32 - nBytesInSent uint32 - streamID int32 - serverBW uint32 - clientBW uint32 - clientBW2 uint8 - isPlaying bool - numInvokes int32 - methodCalls []method - channelsAllocatedIn int32 - channelsAllocatedOut int32 - channelsIn []*packet - channelsOut []*packet - channelTimestamp []int32 - deferred []byte - link link - log Log -} - -// link represents RTMP URL and connection information. -type link struct { - host string - playpath string - url string - app string - auth string - flags int32 - protocol int32 - timeout uint - port uint16 - conn *net.TCPConn -} - -// method represents an RTMP method. -type method struct { - name string - num int32 -} - -// Log defines the RTMP logging function. -type Log func(level int8, message string, params ...interface{}) - -// Log levels used by Log. -const ( - DebugLevel int8 = -1 - InfoLevel int8 = 0 - WarnLevel int8 = 1 - ErrorLevel int8 = 2 - FatalLevel int8 = 5 -) - -// flvTagheaderSize is the FLV header size we expect. -// NB: We don't accept extended headers. -const flvTagheaderSize = 11 - -// Dial connects to RTMP server specified by the given URL and returns the connection. -func Dial(url string, timeout uint, log Log) (*Conn, error) { - log(DebugLevel, pkg+"rtmp.Dial") - c := Conn{ - inChunkSize: 128, - outChunkSize: 128, - clientBW: 2500000, - clientBW2: 2, - serverBW: 2500000, - log: log, - link: link{ - timeout: timeout, - }, - } - - var err error - c.link.protocol, c.link.host, c.link.port, c.link.app, c.link.playpath, err = parseURL(url) - if err != nil { - return nil, err - } - if c.link.app == "" { - return nil, errInvalidURL - } - if c.link.port == 0 { - switch { - case (c.link.protocol & featureSSL) != 0: - c.link.port = 433 - c.log(FatalLevel, pkg+"SSL not supported") - case (c.link.protocol & featureHTTP) != 0: - c.link.port = 80 - default: - c.link.port = 1935 - } - } - c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app - c.link.protocol |= featureWrite - - err = connect(&c) - if err != nil { - return nil, err - } - return &c, nil -} - -// Close terminates the RTMP connection. -// NB: Close is idempotent and the connection value is cleared completely. -func (c *Conn) Close() error { - c.log(DebugLevel, pkg+"Conn.Close") - if !c.isConnected() { - return errNotConnected - } - if c.streamID > 0 { - if c.link.protocol&featureWrite != 0 { - sendFCUnpublish(c) - } - sendDeleteStream(c, float64(c.streamID)) - } - c.link.conn.Close() - *c = Conn{} - return nil -} - -// Write writes a frame (flv tag) to the rtmp connection. -func (c *Conn) Write(data []byte) (int, error) { - if !c.isConnected() { - return 0, errNotConnected - } - if len(data) < flvTagheaderSize { - return 0, errInvalidFlvTag - } - if data[0] == packetTypeInfo || (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') { - return 0, errUnimplemented - } - - pkt := packet{ - packetType: data[0], - bodySize: amf.DecodeInt24(data[1:4]), - timestamp: amf.DecodeInt24(data[4:7]) | uint32(data[7])<<24, - channel: chanSource, - info: c.streamID, - } - - pkt.resize(pkt.bodySize, headerSizeAuto) - copy(pkt.body, data[flvTagheaderSize:flvTagheaderSize+pkt.bodySize]) - err := pkt.writeTo(c, false) - if err != nil { - return 0, err - } - return len(data), nil -} - -// I/O functions - -// read from an RTMP connection. Sends a bytes received message if the -// number of bytes received (nBytesIn) is greater than the number sent -// (nBytesInSent) by 10% of the bandwidth. -func (c *Conn) read(buf []byte) (int, error) { - err := c.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout))) - if err != nil { - return 0, err - } - n, err := io.ReadFull(c.link.conn, buf) - if err != nil { - c.log(DebugLevel, pkg+"read failed", "error", err.Error()) - return 0, err - } - c.nBytesIn += uint32(n) - if c.nBytesIn > (c.nBytesInSent + c.clientBW/10) { - err := sendBytesReceived(c) - if err != nil { - return n, err // NB: we still read n bytes, even though send bytes failed - } - } - return n, nil -} - -// write to an RTMP connection. -func (c *Conn) write(buf []byte) (int, error) { - //ToDo: consider using a different timeout for writes than for reads - err := c.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout))) - if err != nil { - return 0, err - } - n, err := c.link.conn.Write(buf) - if err != nil { - c.log(WarnLevel, pkg+"write failed", "error", err.Error()) - return 0, err - } - return n, nil -} - -// isConnected returns true if the RTMP connection is up. -func (c *Conn) isConnected() bool { - return c.link.conn != nil -} From 45f5817307ee3588b3b0b5707f49d2759687fb81 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sat, 19 Jan 2019 16:41:22 +1030 Subject: [PATCH 11/13] headBytes renamed buf. --- rtmp/packet.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/rtmp/packet.go b/rtmp/packet.go index 23671eac..eb2c8aa9 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -316,7 +316,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { // The complete packet starts from headerSize _before_ the start the body. // origIdx is the original offset, which will be 0 for a full (12-byte) header or 11 for a minimum (1-byte) header. - headBytes := pkt.buf + buf := pkt.buf hSize := headerSizes[pkt.headerType] origIdx := fullHeaderSize - hSize @@ -356,16 +356,16 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { case 2: ch |= 1 } - headBytes[headerIdx] = ch + buf[headerIdx] = ch headerIdx++ if cSize != 0 { tmp := pkt.channel - 64 - headBytes[headerIdx] = byte(tmp & 0xff) + buf[headerIdx] = byte(tmp & 0xff) headerIdx++ if cSize == 2 { - headBytes[headerIdx] = byte(tmp >> 8) + buf[headerIdx] = byte(tmp >> 8) headerIdx++ } } @@ -375,24 +375,24 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { if ts > 0xffffff { tmp = 0xffffff } - amf.EncodeInt24(headBytes[headerIdx:], tmp) + amf.EncodeInt24(buf[headerIdx:], tmp) headerIdx += 3 // 24bits } if headerSizes[pkt.headerType] > 4 { - amf.EncodeInt24(headBytes[headerIdx:], pkt.bodySize) + amf.EncodeInt24(buf[headerIdx:], pkt.bodySize) headerIdx += 3 // 24bits - headBytes[headerIdx] = pkt.packetType + buf[headerIdx] = pkt.packetType headerIdx++ } if headerSizes[pkt.headerType] > 8 { - binary.LittleEndian.PutUint32(headBytes[headerIdx:headerIdx+4], uint32(pkt.info)) + binary.LittleEndian.PutUint32(buf[headerIdx:headerIdx+4], uint32(pkt.info)) headerIdx += 4 // 32bits } if ts >= 0xffffff { - amf.EncodeInt32(headBytes[headerIdx:], ts) + amf.EncodeInt32(buf[headerIdx:], ts) headerIdx += 4 // 32bits } @@ -402,7 +402,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { if c.deferred == nil { // Defer sending small audio packets (at most once). if pkt.packetType == packetTypeAudio && size < chunkSize { - c.deferred = headBytes[origIdx:][:size+hSize] + c.deferred = buf[origIdx:][:size+hSize] c.log(DebugLevel, pkg+"deferred sending packet", "size", size, "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr()) return nil } @@ -424,7 +424,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { if chunkSize > size { chunkSize = size } - bytes := headBytes[origIdx:][:chunkSize+hSize] + bytes := buf[origIdx:][:chunkSize+hSize] if c.deferred != nil { // Prepend the previously deferred packet and write it with the current one. c.log(DebugLevel, pkg+"combining deferred packet", "size", len(c.deferred)) @@ -450,18 +450,18 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { hSize += 4 } - headBytes[origIdx] = 0xc0 | ch + buf[origIdx] = 0xc0 | ch if cSize != 0 { tmp := int(pkt.channel) - 64 - headBytes[origIdx+1] = byte(tmp) + buf[origIdx+1] = byte(tmp) if cSize == 2 { - headBytes[origIdx+2] = byte(tmp >> 8) + buf[origIdx+2] = byte(tmp >> 8) } } if ts >= 0xffffff { - extendedTimestamp := headBytes[origIdx+1+cSize:] + extendedTimestamp := buf[origIdx+1+cSize:] amf.EncodeInt32(extendedTimestamp[:4], ts) } } From de07f1ae408628b326236b7f042ac972f94d9c92 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sat, 19 Jan 2019 17:20:34 +1030 Subject: [PATCH 12/13] dStreamId renamed streamID. --- rtmp/rtmp.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 86c1f09e..bfee3e37 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -471,7 +471,7 @@ func sendPublish(c *Conn) error { return pkt.writeTo(c, true) // response expected } -func sendDeleteStream(c *Conn, dStreamId float64) error { +func sendDeleteStream(c *Conn, streamID float64) error { var pbuf [256]byte pkt := packet{ channel: chanControl, @@ -493,7 +493,7 @@ func sendDeleteStream(c *Conn, dStreamId float64) error { } enc[0] = amf.TypeNull enc = enc[1:] - enc, err = amf.EncodeNumber(enc, dStreamId) + enc, err = amf.EncodeNumber(enc, streamID) if err != nil { return err } @@ -620,7 +620,7 @@ func handleInvoke(c *Conn, body []byte) error { if err != nil { return err } - c.streamID = int32(n) + c.streamID = uint32(n) err = sendPublish(c) if err != nil { return err From 6959243d505520aa7aee9019ed27a9f62ba93137 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sat, 19 Jan 2019 17:51:34 +1030 Subject: [PATCH 13/13] packet.info renamed streamID and made a uint32 along with Conn.streamID. --- rtmp/conn.go | 4 ++-- rtmp/packet.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/rtmp/conn.go b/rtmp/conn.go index 1d43c2a4..2554d092 100644 --- a/rtmp/conn.go +++ b/rtmp/conn.go @@ -48,7 +48,7 @@ type Conn struct { outChunkSize uint32 nBytesIn uint32 nBytesInSent uint32 - streamID int32 + streamID uint32 serverBW uint32 clientBW uint32 clientBW2 uint8 @@ -180,7 +180,7 @@ func (c *Conn) Write(data []byte) (int, error) { bodySize: amf.DecodeInt24(data[1:4]), timestamp: amf.DecodeInt24(data[4:7]) | uint32(data[7])<<24, channel: chanSource, - info: c.streamID, + streamID: c.streamID, } pkt.resize(pkt.bodySize, headerSizeAuto) diff --git a/rtmp/packet.go b/rtmp/packet.go index eb2c8aa9..3cf18f14 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -88,7 +88,7 @@ type packet struct { channel int32 hasAbsTimestamp bool timestamp uint32 - info int32 + streamID uint32 bodySize uint32 bytesRead uint32 buf []byte @@ -184,7 +184,7 @@ func (pkt *packet) readFrom(c *Conn) error { if size > 6 { pkt.packetType = header[6] if size == 11 { - pkt.info = int32(amf.DecodeInt32LE(header[7:11])) + pkt.streamID = amf.DecodeInt32LE(header[7:11]) } } } @@ -387,7 +387,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { } if headerSizes[pkt.headerType] > 8 { - binary.LittleEndian.PutUint32(buf[headerIdx:headerIdx+4], uint32(pkt.info)) + binary.LittleEndian.PutUint32(buf[headerIdx:headerIdx+4], pkt.streamID) headerIdx += 4 // 32bits }