mirror of https://bitbucket.org/ausocean/av.git
protocol/rtmp: improving errors
This commit is contained in:
parent
3db7d1b1f3
commit
92155b376e
|
@ -138,14 +138,14 @@ func Dial(url string, log Log, options ...func(*Conn) error) (*Conn, error) {
|
||||||
var err error
|
var err error
|
||||||
c.link.protocol, c.link.host, c.link.port, c.link.app, c.link.playpath, err = parseURL(url)
|
c.link.protocol, c.link.host, c.link.port, c.link.app, c.link.playpath, err = parseURL(url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("could not parse url: %w",err)
|
||||||
}
|
}
|
||||||
c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app
|
c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app
|
||||||
c.link.protocol |= featureWrite
|
c.link.protocol |= featureWrite
|
||||||
|
|
||||||
err = connect(&c)
|
err = connect(&c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("could not connect: %w",err)
|
||||||
}
|
}
|
||||||
return &c, nil
|
return &c, nil
|
||||||
}
|
}
|
||||||
|
@ -159,11 +159,20 @@ func (c *Conn) Close() error {
|
||||||
c.log(DebugLevel, pkg+"Conn.Close")
|
c.log(DebugLevel, pkg+"Conn.Close")
|
||||||
if c.streamID > 0 {
|
if c.streamID > 0 {
|
||||||
if c.link.protocol&featureWrite != 0 {
|
if c.link.protocol&featureWrite != 0 {
|
||||||
sendFCUnpublish(c)
|
err := sendFCUnpublish(c)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not send fc unpublish: %w",err)
|
||||||
}
|
}
|
||||||
sendDeleteStream(c, float64(c.streamID))
|
|
||||||
}
|
}
|
||||||
c.link.conn.Close()
|
err := sendDeleteStream(c, float64(c.streamID))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not send delete stream: %w",err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err := c.link.conn.Close()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not close link conn: %w",err)
|
||||||
|
}
|
||||||
*c = Conn{}
|
*c = Conn{}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -192,7 +201,7 @@ func (c *Conn) Write(data []byte) (int, error) {
|
||||||
copy(pkt.body, data[flvTagheaderSize:flvTagheaderSize+pkt.bodySize])
|
copy(pkt.body, data[flvTagheaderSize:flvTagheaderSize+pkt.bodySize])
|
||||||
err := pkt.writeTo(c, false)
|
err := pkt.writeTo(c, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, fmt.Errorf("could not write packet to connection: %w",err)
|
||||||
}
|
}
|
||||||
return len(data), nil
|
return len(data), nil
|
||||||
}
|
}
|
||||||
|
@ -205,18 +214,18 @@ func (c *Conn) Write(data []byte) (int, error) {
|
||||||
func (c *Conn) read(buf []byte) (int, error) {
|
func (c *Conn) read(buf []byte) (int, error) {
|
||||||
err := c.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout)))
|
err := c.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, fmt.Errorf("could not set read deadline: %w",err)
|
||||||
}
|
}
|
||||||
n, err := io.ReadFull(c.link.conn, buf)
|
n, err := io.ReadFull(c.link.conn, buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log(DebugLevel, pkg+"read failed", "error", err.Error())
|
c.log(DebugLevel, pkg+"read failed", "error", err.Error())
|
||||||
return 0, err
|
return 0, fmt.Errorf("could not read conn: %w",err)
|
||||||
}
|
}
|
||||||
c.nBytesIn += uint32(n)
|
c.nBytesIn += uint32(n)
|
||||||
if c.nBytesIn > (c.nBytesInSent + c.clientBW/10) {
|
if c.nBytesIn > (c.nBytesInSent + c.clientBW/10) {
|
||||||
err := sendBytesReceived(c)
|
err := sendBytesReceived(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return n, err // NB: we still read n bytes, even though send bytes failed
|
return n, fmt.Errorf("could not send bytes received: %w",err) // NB: we still read n bytes, even though send bytes failed
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return n, nil
|
return n, nil
|
||||||
|
@ -227,12 +236,12 @@ func (c *Conn) write(buf []byte) (int, error) {
|
||||||
//ToDo: consider using a different timeout for writes than for reads
|
//ToDo: consider using a different timeout for writes than for reads
|
||||||
err := c.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout)))
|
err := c.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, fmt.Errorf("could not set write deadline: %w",err)
|
||||||
}
|
}
|
||||||
n, err := c.link.conn.Write(buf)
|
n, err := c.link.conn.Write(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log(WarnLevel, pkg+"write failed", "error", err.Error())
|
c.log(WarnLevel, pkg+"write failed", "error", err.Error())
|
||||||
return 0, err
|
return 0, fmt.Errorf("could not write to conn: %w",err)
|
||||||
}
|
}
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,6 +36,7 @@ package rtmp
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"bitbucket.org/ausocean/av/protocol/rtmp/amf"
|
"bitbucket.org/ausocean/av/protocol/rtmp/amf"
|
||||||
|
@ -105,7 +106,7 @@ func (pkt *packet) readFrom(c *Conn) error {
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
c.log(WarnLevel, pkg+"EOF error; connection likely terminated")
|
c.log(WarnLevel, pkg+"EOF error; connection likely terminated")
|
||||||
}
|
}
|
||||||
return err
|
return fmt.Errorf("failed to read packet header 1st byte: %w", err)
|
||||||
}
|
}
|
||||||
pkt.headerType = (header[0] & 0xc0) >> 6
|
pkt.headerType = (header[0] & 0xc0) >> 6
|
||||||
pkt.channel = int32(header[0] & 0x3f)
|
pkt.channel = int32(header[0] & 0x3f)
|
||||||
|
@ -116,7 +117,7 @@ func (pkt *packet) readFrom(c *Conn) error {
|
||||||
_, err = c.read(header[:1])
|
_, err = c.read(header[:1])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log(DebugLevel, pkg+"failed to read packet header 2nd byte", "error", err.Error())
|
c.log(DebugLevel, pkg+"failed to read packet header 2nd byte", "error", err.Error())
|
||||||
return err
|
return fmt.Errorf("failed to read packet header second byte: %w", err)
|
||||||
}
|
}
|
||||||
header = header[1:]
|
header = header[1:]
|
||||||
pkt.channel = int32(header[0]) + 64
|
pkt.channel = int32(header[0]) + 64
|
||||||
|
@ -125,7 +126,7 @@ func (pkt *packet) readFrom(c *Conn) error {
|
||||||
_, err = c.read(header[:2])
|
_, err = c.read(header[:2])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log(DebugLevel, pkg+"failed to read packet header 3rd byte", "error", err.Error())
|
c.log(DebugLevel, pkg+"failed to read packet header 3rd byte", "error", err.Error())
|
||||||
return err
|
return fmt.Errorf("failed to read packet header 3rd byte: %w", err)
|
||||||
}
|
}
|
||||||
header = header[2:]
|
header = header[2:]
|
||||||
pkt.channel = int32(binary.BigEndian.Uint16(header[:2])) + 64
|
pkt.channel = int32(binary.BigEndian.Uint16(header[:2])) + 64
|
||||||
|
@ -169,7 +170,7 @@ func (pkt *packet) readFrom(c *Conn) error {
|
||||||
_, err = c.read(header[:size])
|
_, err = c.read(header[:size])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log(DebugLevel, pkg+"failed to read packet header", "error", err.Error())
|
c.log(DebugLevel, pkg+"failed to read packet header", "error", err.Error())
|
||||||
return err
|
return fmt.Errorf("failed to read packet header: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
hSize := len(hbuf) - len(header) + size
|
hSize := len(hbuf) - len(header) + size
|
||||||
|
@ -193,7 +194,7 @@ func (pkt *packet) readFrom(c *Conn) error {
|
||||||
_, err = c.read(header[size : size+4])
|
_, err = c.read(header[size : size+4])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log(DebugLevel, pkg+"failed to read extended timestamp", "error", err.Error())
|
c.log(DebugLevel, pkg+"failed to read extended timestamp", "error", err.Error())
|
||||||
return err
|
return fmt.Errorf("failed to read extended timestamp: %w", err)
|
||||||
}
|
}
|
||||||
pkt.timestamp = amf.DecodeInt32(header[size : size+4])
|
pkt.timestamp = amf.DecodeInt32(header[size : size+4])
|
||||||
hSize += 4
|
hSize += 4
|
||||||
|
@ -208,7 +209,7 @@ func (pkt *packet) readFrom(c *Conn) error {
|
||||||
_, err = c.read(pkt.body[:pkt.bodySize])
|
_, err = c.read(pkt.body[:pkt.bodySize])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error())
|
c.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error())
|
||||||
return err
|
return fmt.Errorf("failed to read packet body: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Keep the packet as a reference for other packets on this channel.
|
// Keep the packet as a reference for other packets on this channel.
|
||||||
|
@ -404,7 +405,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error {
|
||||||
c.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(c.deferred))
|
c.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(c.deferred))
|
||||||
_, err := c.write(c.deferred)
|
_, err := c.write(c.deferred)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not write deferred packet: %w", err)
|
||||||
}
|
}
|
||||||
c.deferred = nil
|
c.deferred = nil
|
||||||
}
|
}
|
||||||
|
@ -424,7 +425,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error {
|
||||||
}
|
}
|
||||||
_, err := c.write(bytes)
|
_, err := c.write(bytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not write combined packet: %w", err)
|
||||||
}
|
}
|
||||||
c.deferred = nil
|
c.deferred = nil
|
||||||
|
|
||||||
|
|
|
@ -34,17 +34,25 @@ LICENSE
|
||||||
package rtmp
|
package rtmp
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Errors.
|
||||||
|
var (
|
||||||
|
errInvalidPath = errors.New("invalid url path")
|
||||||
|
errInvalidElements = errors.New("invalid url elements")
|
||||||
|
)
|
||||||
|
|
||||||
// parseURL parses an RTMP URL (ok, technically it is lexing).
|
// parseURL parses an RTMP URL (ok, technically it is lexing).
|
||||||
func parseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) {
|
func parseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) {
|
||||||
u, err := url.Parse(addr)
|
u, err := url.Parse(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return protocol, host, port, app, playpath, err
|
return protocol, host, port, app, playpath, fmt.Errorf("could not parse to url value: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
switch u.Scheme {
|
switch u.Scheme {
|
||||||
|
@ -63,24 +71,24 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp
|
||||||
case "rtmpts":
|
case "rtmpts":
|
||||||
protocol = protoRTMPTS
|
protocol = protoRTMPTS
|
||||||
default:
|
default:
|
||||||
return protocol, host, port, app, playpath, errUnknownScheme
|
return protocol, host, port, app, playpath, fmt.Errorf("unknown scheme: %s", u.Scheme)
|
||||||
}
|
}
|
||||||
|
|
||||||
host = u.Host
|
host = u.Host
|
||||||
if p := u.Port(); p != "" {
|
if p := u.Port(); p != "" {
|
||||||
pi, err := strconv.Atoi(p)
|
pi, err := strconv.Atoi(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return protocol, host, port, app, playpath, err
|
return protocol, host, port, app, playpath, fmt.Errorf("could convert port to integer: %w", err)
|
||||||
}
|
}
|
||||||
port = uint16(pi)
|
port = uint16(pi)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(u.Path) < 1 || !path.IsAbs(u.Path) {
|
if len(u.Path) < 1 || !path.IsAbs(u.Path) {
|
||||||
return protocol, host, port, app, playpath, errInvalidURL
|
return protocol, host, port, app, playpath, errInvalidPath
|
||||||
}
|
}
|
||||||
elems := strings.SplitN(u.Path[1:], "/", 3)
|
elems := strings.SplitN(u.Path[1:], "/", 3)
|
||||||
if len(elems) < 2 || elems[0] == "" || elems[1] == "" {
|
if len(elems) < 2 || elems[0] == "" || elems[1] == "" {
|
||||||
return protocol, host, port, app, playpath, errInvalidURL
|
return protocol, host, port, app, playpath, errInvalidElements
|
||||||
}
|
}
|
||||||
app = elems[0]
|
app = elems[0]
|
||||||
playpath = path.Join(elems[1:]...)
|
playpath = path.Join(elems[1:]...)
|
||||||
|
@ -106,7 +114,7 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp
|
||||||
switch {
|
switch {
|
||||||
case port != 0:
|
case port != 0:
|
||||||
case (protocol & featureSSL) != 0:
|
case (protocol & featureSSL) != 0:
|
||||||
return protocol, host, port, app, playpath, errUnimplemented // port = 433
|
return protocol, host, port, app, playpath, errors.New("ssl not implemented")
|
||||||
case (protocol & featureHTTP) != 0:
|
case (protocol & featureHTTP) != 0:
|
||||||
port = 80
|
port = 80
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -38,6 +38,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -164,14 +165,15 @@ var (
|
||||||
|
|
||||||
// connect establishes an RTMP connection.
|
// connect establishes an RTMP connection.
|
||||||
func connect(c *Conn) error {
|
func connect(c *Conn) error {
|
||||||
addr, err := net.ResolveTCPAddr("tcp4", c.link.host+":"+strconv.Itoa(int(c.link.port)))
|
addrStr := c.link.host + ":" + strconv.Itoa(int(c.link.port))
|
||||||
|
addr, err := net.ResolveTCPAddr("tcp4", addrStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not resolve tcp address (%s):%w", addrStr, err)
|
||||||
}
|
}
|
||||||
c.link.conn, err = net.DialTCP("tcp4", nil, addr)
|
c.link.conn, err = net.DialTCP("tcp4", nil, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log(WarnLevel, pkg+"dial failed", "error", err.Error())
|
c.log(WarnLevel, pkg+"dial failed", "error", err.Error())
|
||||||
return err
|
return fmt.Errorf("could not dial tcp: %w", err)
|
||||||
}
|
}
|
||||||
c.log(DebugLevel, pkg+"connected")
|
c.log(DebugLevel, pkg+"connected")
|
||||||
|
|
||||||
|
@ -184,13 +186,13 @@ func connect(c *Conn) error {
|
||||||
err = handshake(c)
|
err = handshake(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log(WarnLevel, pkg+"handshake failed", "error", err.Error())
|
c.log(WarnLevel, pkg+"handshake failed", "error", err.Error())
|
||||||
return err
|
return fmt.Errorf("could not handshake: %w", err)
|
||||||
}
|
}
|
||||||
c.log(DebugLevel, pkg+"handshaked")
|
c.log(DebugLevel, pkg+"handshaked")
|
||||||
err = sendConnectPacket(c)
|
err = sendConnectPacket(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error())
|
c.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error())
|
||||||
return err
|
return fmt.Errorf("could not send connect packet: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.log(DebugLevel, pkg+"negotiating")
|
c.log(DebugLevel, pkg+"negotiating")
|
||||||
|
@ -199,7 +201,7 @@ func connect(c *Conn) error {
|
||||||
pkt := packet{buf: buf[:]}
|
pkt := packet{buf: buf[:]}
|
||||||
err = pkt.readFrom(c)
|
err = pkt.readFrom(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not read from packet: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
switch pkt.packetType {
|
switch pkt.packetType {
|
||||||
|
@ -208,7 +210,7 @@ func connect(c *Conn) error {
|
||||||
default:
|
default:
|
||||||
err = handlePacket(c, &pkt)
|
err = handlePacket(c, &pkt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not handle packet: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -248,7 +250,7 @@ func handlePacket(c *Conn, pkt *packet) error {
|
||||||
err := handleInvoke(c, pkt.body[:pkt.bodySize])
|
err := handleInvoke(c, pkt.body[:pkt.bodySize])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log(WarnLevel, pkg+"unexpected error from handleInvoke", "error", err.Error())
|
c.log(WarnLevel, pkg+"unexpected error from handleInvoke", "error", err.Error())
|
||||||
return err
|
return fmt.Errorf("could not handle invoke: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
case packetTypeControl, packetTypeAudio, packetTypeVideo, packetTypeFlashVideo, packetTypeFlexMessage, packetTypeInfo:
|
case packetTypeControl, packetTypeAudio, packetTypeVideo, packetTypeFlashVideo, packetTypeFlexMessage, packetTypeInfo:
|
||||||
|
@ -273,12 +275,12 @@ func sendConnectPacket(c *Conn) error {
|
||||||
|
|
||||||
enc, err := amf.EncodeString(enc, avConnect)
|
enc, err := amf.EncodeString(enc, avConnect)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode string: %w", err)
|
||||||
}
|
}
|
||||||
c.numInvokes += 1
|
c.numInvokes += 1
|
||||||
enc, err = amf.EncodeNumber(enc, float64(c.numInvokes))
|
enc, err = amf.EncodeNumber(enc, float64(c.numInvokes))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode number of invokes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// required link info
|
// required link info
|
||||||
|
@ -289,24 +291,29 @@ func sendConnectPacket(c *Conn) error {
|
||||||
}
|
}
|
||||||
enc, err = amf.Encode(&info, enc)
|
enc, err = amf.Encode(&info, enc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode info: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// optional link auth info
|
// optional link auth info
|
||||||
if c.link.auth != "" {
|
if c.link.auth != "" {
|
||||||
enc, err = amf.EncodeBoolean(enc, c.link.flags&linkAuth != 0)
|
enc, err = amf.EncodeBoolean(enc, c.link.flags&linkAuth != 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode link auth bool: %w", err)
|
||||||
}
|
}
|
||||||
enc, err = amf.EncodeString(enc, c.link.auth)
|
enc, err = amf.EncodeString(enc, c.link.auth)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode link auth string: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
|
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
|
||||||
|
|
||||||
return pkt.writeTo(c, true) // response expected
|
err = pkt.writeTo(c, true)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not write packet: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func sendCreateStream(c *Conn) error {
|
func sendCreateStream(c *Conn) error {
|
||||||
|
@ -322,19 +329,23 @@ func sendCreateStream(c *Conn) error {
|
||||||
|
|
||||||
enc, err := amf.EncodeString(enc, avCreatestream)
|
enc, err := amf.EncodeString(enc, avCreatestream)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode av create stream token: %w", err)
|
||||||
}
|
}
|
||||||
c.numInvokes++
|
c.numInvokes++
|
||||||
enc, err = amf.EncodeNumber(enc, float64(c.numInvokes))
|
enc, err = amf.EncodeNumber(enc, float64(c.numInvokes))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode number of invokes: %w", err)
|
||||||
}
|
}
|
||||||
enc[0] = amf.TypeNull
|
enc[0] = amf.TypeNull
|
||||||
enc = enc[1:]
|
enc = enc[1:]
|
||||||
|
|
||||||
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
|
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
|
||||||
|
|
||||||
return pkt.writeTo(c, true) // response expected
|
err = pkt.writeTo(c, true)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not write packet: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func sendReleaseStream(c *Conn) error {
|
func sendReleaseStream(c *Conn) error {
|
||||||
|
@ -350,22 +361,27 @@ func sendReleaseStream(c *Conn) error {
|
||||||
|
|
||||||
enc, err := amf.EncodeString(enc, avReleasestream)
|
enc, err := amf.EncodeString(enc, avReleasestream)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode av release stream token: %w", err)
|
||||||
}
|
}
|
||||||
c.numInvokes++
|
c.numInvokes++
|
||||||
enc, err = amf.EncodeNumber(enc, float64(c.numInvokes))
|
enc, err = amf.EncodeNumber(enc, float64(c.numInvokes))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode number of invokes: %w", err)
|
||||||
}
|
}
|
||||||
enc[0] = amf.TypeNull
|
enc[0] = amf.TypeNull
|
||||||
enc = enc[1:]
|
enc = enc[1:]
|
||||||
enc, err = amf.EncodeString(enc, c.link.playpath)
|
enc, err = amf.EncodeString(enc, c.link.playpath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode playpath: %w", err)
|
||||||
}
|
}
|
||||||
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
|
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
|
||||||
|
|
||||||
return pkt.writeTo(c, false)
|
err = pkt.writeTo(c, false)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not write packet: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func sendFCPublish(c *Conn) error {
|
func sendFCPublish(c *Conn) error {
|
||||||
|
@ -381,23 +397,28 @@ func sendFCPublish(c *Conn) error {
|
||||||
|
|
||||||
enc, err := amf.EncodeString(enc, avFCPublish)
|
enc, err := amf.EncodeString(enc, avFCPublish)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode av fc publish token: %w", err)
|
||||||
}
|
}
|
||||||
c.numInvokes++
|
c.numInvokes++
|
||||||
enc, err = amf.EncodeNumber(enc, float64(c.numInvokes))
|
enc, err = amf.EncodeNumber(enc, float64(c.numInvokes))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode number of invokes: %w", err)
|
||||||
}
|
}
|
||||||
enc[0] = amf.TypeNull
|
enc[0] = amf.TypeNull
|
||||||
enc = enc[1:]
|
enc = enc[1:]
|
||||||
enc, err = amf.EncodeString(enc, c.link.playpath)
|
enc, err = amf.EncodeString(enc, c.link.playpath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode playpath: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
|
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
|
||||||
|
|
||||||
return pkt.writeTo(c, false)
|
err = pkt.writeTo(c, false)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not write packet: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func sendFCUnpublish(c *Conn) error {
|
func sendFCUnpublish(c *Conn) error {
|
||||||
|
@ -413,23 +434,28 @@ func sendFCUnpublish(c *Conn) error {
|
||||||
|
|
||||||
enc, err := amf.EncodeString(enc, avFCUnpublish)
|
enc, err := amf.EncodeString(enc, avFCUnpublish)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode av fc unpublish token: %w", err)
|
||||||
}
|
}
|
||||||
c.numInvokes++
|
c.numInvokes++
|
||||||
enc, err = amf.EncodeNumber(enc, float64(c.numInvokes))
|
enc, err = amf.EncodeNumber(enc, float64(c.numInvokes))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode number of invokes: %w", err)
|
||||||
}
|
}
|
||||||
enc[0] = amf.TypeNull
|
enc[0] = amf.TypeNull
|
||||||
enc = enc[1:]
|
enc = enc[1:]
|
||||||
enc, err = amf.EncodeString(enc, c.link.playpath)
|
enc, err = amf.EncodeString(enc, c.link.playpath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode link playpath: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
|
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
|
||||||
|
|
||||||
return pkt.writeTo(c, false)
|
err = pkt.writeTo(c, false)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not write packet: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func sendPublish(c *Conn) error {
|
func sendPublish(c *Conn) error {
|
||||||
|
@ -445,27 +471,32 @@ func sendPublish(c *Conn) error {
|
||||||
|
|
||||||
enc, err := amf.EncodeString(enc, avPublish)
|
enc, err := amf.EncodeString(enc, avPublish)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode av publish token: %w", err)
|
||||||
}
|
}
|
||||||
c.numInvokes++
|
c.numInvokes++
|
||||||
enc, err = amf.EncodeNumber(enc, float64(c.numInvokes))
|
enc, err = amf.EncodeNumber(enc, float64(c.numInvokes))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode number of invokes: %w", err)
|
||||||
}
|
}
|
||||||
enc[0] = amf.TypeNull
|
enc[0] = amf.TypeNull
|
||||||
enc = enc[1:]
|
enc = enc[1:]
|
||||||
enc, err = amf.EncodeString(enc, c.link.playpath)
|
enc, err = amf.EncodeString(enc, c.link.playpath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode link playpath: %w", err)
|
||||||
}
|
}
|
||||||
enc, err = amf.EncodeString(enc, avLive)
|
enc, err = amf.EncodeString(enc, avLive)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode av live token: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
|
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
|
||||||
|
|
||||||
return pkt.writeTo(c, true) // response expected
|
err = pkt.writeTo(c, true)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not write packet: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func sendDeleteStream(c *Conn, streamID float64) error {
|
func sendDeleteStream(c *Conn, streamID float64) error {
|
||||||
|
@ -481,22 +512,27 @@ func sendDeleteStream(c *Conn, streamID float64) error {
|
||||||
|
|
||||||
enc, err := amf.EncodeString(enc, avDeletestream)
|
enc, err := amf.EncodeString(enc, avDeletestream)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode av delete stream token: %w", err)
|
||||||
}
|
}
|
||||||
c.numInvokes++
|
c.numInvokes++
|
||||||
enc, err = amf.EncodeNumber(enc, float64(c.numInvokes))
|
enc, err = amf.EncodeNumber(enc, float64(c.numInvokes))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode number of invokes: %w", err)
|
||||||
}
|
}
|
||||||
enc[0] = amf.TypeNull
|
enc[0] = amf.TypeNull
|
||||||
enc = enc[1:]
|
enc = enc[1:]
|
||||||
enc, err = amf.EncodeNumber(enc, streamID)
|
enc, err = amf.EncodeNumber(enc, streamID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode stream id: %w", err)
|
||||||
}
|
}
|
||||||
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
|
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
|
||||||
|
|
||||||
return pkt.writeTo(c, false)
|
err = pkt.writeTo(c, false)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not write packet: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendBytesReceived tells the server how many bytes the client has received.
|
// sendBytesReceived tells the server how many bytes the client has received.
|
||||||
|
@ -515,11 +551,16 @@ func sendBytesReceived(c *Conn) error {
|
||||||
|
|
||||||
enc, err := amf.EncodeInt32(enc, c.nBytesIn)
|
enc, err := amf.EncodeInt32(enc, c.nBytesIn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode number of bytes in: %w", err)
|
||||||
}
|
}
|
||||||
pkt.bodySize = 4
|
pkt.bodySize = 4
|
||||||
|
|
||||||
return pkt.writeTo(c, false)
|
err = pkt.writeTo(c, false)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not write packet: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func sendCheckBW(c *Conn) error {
|
func sendCheckBW(c *Conn) error {
|
||||||
|
@ -535,19 +576,24 @@ func sendCheckBW(c *Conn) error {
|
||||||
|
|
||||||
enc, err := amf.EncodeString(enc, av_checkbw)
|
enc, err := amf.EncodeString(enc, av_checkbw)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode av check bw token: %w", err)
|
||||||
}
|
}
|
||||||
c.numInvokes++
|
c.numInvokes++
|
||||||
enc, err = amf.EncodeNumber(enc, float64(c.numInvokes))
|
enc, err = amf.EncodeNumber(enc, float64(c.numInvokes))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not encode number of invokes: %w", err)
|
||||||
}
|
}
|
||||||
enc[0] = amf.TypeNull
|
enc[0] = amf.TypeNull
|
||||||
enc = enc[1:]
|
enc = enc[1:]
|
||||||
|
|
||||||
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
|
pkt.bodySize = uint32((len(pbuf) - fullHeaderSize) - len(enc))
|
||||||
|
|
||||||
return pkt.writeTo(c, false)
|
err = pkt.writeTo(c, false)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not write packet: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func eraseMethod(m []method, i int) []method {
|
func eraseMethod(m []method, i int) []method {
|
||||||
|
@ -565,16 +611,16 @@ func handleInvoke(c *Conn, body []byte) error {
|
||||||
var obj amf.Object
|
var obj amf.Object
|
||||||
_, err := amf.Decode(&obj, body, false)
|
_, err := amf.Decode(&obj, body, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not decode: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
meth, err := obj.StringProperty("", 0)
|
meth, err := obj.StringProperty("", 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not get value of string property meth: %w", err)
|
||||||
}
|
}
|
||||||
txn, err := obj.NumberProperty("", 1)
|
txn, err := obj.NumberProperty("", 1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not get value of number property txn: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.log(DebugLevel, pkg+"invoking method "+meth)
|
c.log(DebugLevel, pkg+"invoking method "+meth)
|
||||||
|
@ -601,21 +647,21 @@ func handleInvoke(c *Conn, body []byte) error {
|
||||||
case avConnect:
|
case avConnect:
|
||||||
err := sendReleaseStream(c)
|
err := sendReleaseStream(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not send release stream: %w", err)
|
||||||
}
|
}
|
||||||
err = sendFCPublish(c)
|
err = sendFCPublish(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not send fc publish: %w", err)
|
||||||
}
|
}
|
||||||
err = sendCreateStream(c)
|
err = sendCreateStream(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not send create stream: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
case avCreatestream:
|
case avCreatestream:
|
||||||
n, err := obj.NumberProperty("", 3)
|
n, err := obj.NumberProperty("", 3)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not get value for stream id number property: %w",err)
|
||||||
}
|
}
|
||||||
c.streamID = uint32(n)
|
c.streamID = uint32(n)
|
||||||
err = sendPublish(c)
|
err = sendPublish(c)
|
||||||
|
@ -630,27 +676,27 @@ func handleInvoke(c *Conn, body []byte) error {
|
||||||
case avOnBWDone:
|
case avOnBWDone:
|
||||||
err := sendCheckBW(c)
|
err := sendCheckBW(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not send check bw: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
case avOnStatus:
|
case avOnStatus:
|
||||||
obj2, err := obj.ObjectProperty("", 3)
|
obj2, err := obj.ObjectProperty("", 3)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not get object property value for obj2: %w", err)
|
||||||
}
|
}
|
||||||
code, err := obj2.StringProperty(avCode, -1)
|
code, err := obj2.StringProperty(avCode, -1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not get string property value for code: %w", err)
|
||||||
}
|
}
|
||||||
level, err := obj2.StringProperty(avLevel, -1)
|
level, err := obj2.StringProperty(avLevel, -1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not get string property value for level: %w", err)
|
||||||
}
|
}
|
||||||
c.log(DebugLevel, pkg+"onStatus", "code", code, "level", level)
|
c.log(DebugLevel, pkg+"onStatus", "code", code, "level", level)
|
||||||
|
|
||||||
if code != avNetStreamPublish_Start {
|
if code != avNetStreamPublish_Start {
|
||||||
c.log(ErrorLevel, pkg+"unexpected response "+code)
|
c.log(ErrorLevel, pkg+"unexpected response "+code)
|
||||||
return errUnimplemented
|
return fmt.Errorf("unimplemented code: %v", code)
|
||||||
}
|
}
|
||||||
c.log(DebugLevel, pkg+"playing")
|
c.log(DebugLevel, pkg+"playing")
|
||||||
c.isPlaying = true
|
c.isPlaying = true
|
||||||
|
@ -681,14 +727,14 @@ func handshake(c *Conn) error {
|
||||||
|
|
||||||
_, err := c.write(clientbuf[:])
|
_, err := c.write(clientbuf[:])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not write handshake: %w", err)
|
||||||
}
|
}
|
||||||
c.log(DebugLevel, pkg+"handshake sent")
|
c.log(DebugLevel, pkg+"handshake sent")
|
||||||
|
|
||||||
var typ [1]byte
|
var typ [1]byte
|
||||||
_, err = c.read(typ[:])
|
_, err = c.read(typ[:])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not read handshake: %w", err)
|
||||||
}
|
}
|
||||||
c.log(DebugLevel, pkg+"handshake received")
|
c.log(DebugLevel, pkg+"handshake received")
|
||||||
|
|
||||||
|
@ -697,7 +743,7 @@ func handshake(c *Conn) error {
|
||||||
}
|
}
|
||||||
_, err = c.read(serversig[:])
|
_, err = c.read(serversig[:])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not read server signal: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// decode server response
|
// decode server response
|
||||||
|
@ -707,12 +753,12 @@ func handshake(c *Conn) error {
|
||||||
// 2nd part of handshake
|
// 2nd part of handshake
|
||||||
_, err = c.write(serversig[:])
|
_, err = c.write(serversig[:])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not write part 2 of handshake: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = c.read(serversig[:])
|
_, err = c.read(serversig[:])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("could not read part 2 of handshake: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !bytes.Equal(serversig[:signatureSize], clientbuf[1:signatureSize+1]) {
|
if !bytes.Equal(serversig[:signatureSize], clientbuf[1:signatureSize+1]) {
|
||||||
|
|
Loading…
Reference in New Issue