All rmtp functions now return an error (or nothing), except for C_RTMP_IsConnected() which now returns a bool instead of an int.

This commit is contained in:
scruzin 2019-01-06 14:42:51 +10:30
parent b7871cb2f2
commit 81b92b2302
3 changed files with 165 additions and 177 deletions

View File

@ -41,6 +41,7 @@ import (
"io" "io"
"log" "log"
"math/rand" "math/rand"
"net"
"strconv" "strconv"
"time" "time"
) )
@ -133,6 +134,12 @@ var (
errHandshake = errors.New("rtmp: handshake failed") errHandshake = errors.New("rtmp: handshake failed")
errConnSend = errors.New("rtmp: connection send error") errConnSend = errors.New("rtmp: connection send error")
errConnStream = errors.New("rtmp: connection stream error") errConnStream = errors.New("rtmp: connection stream error")
errInvalidHeader = errors.New("rtmp: invalid header")
errInvalidBody = errors.New("rtmp: invalid body")
errTinyPacket = errors.New("rtmp: packet too small")
errEncoding = errors.New("rtmp: encoding error")
errDecoding = errors.New("rtmp: decoding error")
errCopying = errors.New("rtmp: copying error")
) )
func startSession(rtmp *C_RTMP, u string, timeout uint) (*C_RTMP, error) { func startSession(rtmp *C_RTMP, u string, timeout uint) (*C_RTMP, error) {
@ -183,12 +190,11 @@ func C_RTMP_GetTime() int32 {
// int RTMPPacket_Alloc(RTMPPacket* p, uint32_t nSize); // int RTMPPacket_Alloc(RTMPPacket* p, uint32_t nSize);
// rtmp.c +189 // rtmp.c +189
func C_RTMPPacket_Alloc(p *C_RTMPPacket, nSize uint32) (ok bool) { func C_RTMPPacket_Alloc(p *C_RTMPPacket, nSize uint32) {
buf := make([]byte, RTMP_MAX_HEADER_SIZE+nSize) buf := make([]byte, RTMP_MAX_HEADER_SIZE+nSize)
p.m_header = buf p.m_header = buf
p.m_body = buf[RTMP_MAX_HEADER_SIZE:] p.m_body = buf[RTMP_MAX_HEADER_SIZE:]
p.m_nBytesRead = 0 p.m_nBytesRead = 0
return true
} }
// void RTMPPacket_Free(RTMPPacket* p); // void RTMPPacket_Free(RTMPPacket* p);
@ -229,11 +235,11 @@ func C_RTMP_EnableWrite(r *C_RTMP) {
// int RTMP_IsConnected(RTMP *r); // int RTMP_IsConnected(RTMP *r);
// rtmp.c +363 // rtmp.c +363
func C_RTMP_IsConnected(r *C_RTMP) int32 { func C_RTMP_IsConnected(r *C_RTMP) bool {
if r.m_sb.conn != nil { if r.m_sb.conn != nil {
return 1 return true
} }
return 0 return false
} }
// void RTMP_SetBufferMS(RTMP *r, int size); // void RTMP_SetBufferMS(RTMP *r, int size);
@ -279,26 +285,41 @@ func C_RTMP_SetupURL(r *C_RTMP, addr string) (err error) {
return nil return nil
} }
// int RTMP_Connect1(RTMP* r, RTMPPacket* cp); // int RTMP_Connect(RTMP *r, RTMPPacket* cp);
// rtmp.c +978 // rtmp.c +1032
func C_RTMP_Connect1(r *C_RTMP, cp *C_RTMPPacket) error { func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) error {
addr, err := net.ResolveTCPAddr("tcp4", r.Link.host+":"+strconv.Itoa(int(r.Link.port)))
if err != nil {
return err
}
r.m_sb.conn, err = net.DialTCP("tcp4", nil, addr)
if err != nil {
return err
}
r.m_sb.timeout = r.Link.timeout
if debugMode { if debugMode {
log.Println("... connected, handshaking...") log.Println("... connected, handshaking...")
} }
if !C_HandShake(r, 1) { err = C_HandShake(r, 1)
if err != nil {
log.Println("C_RTMP_Connect1: handshake failed!") log.Println("C_RTMP_Connect1: handshake failed!")
return errHandshake return errHandshake
} }
if debugMode { if debugMode {
log.Println("... handshaked...") log.Println("... handshaked...")
} }
if !C_SendConnectPacket(r, cp) { err = C_SendConnectPacket(r, cp)
if err != nil {
log.Println("RTMP connect failed!") log.Println("RTMP connect failed!")
return errConnSend return errConnSend
} }
return nil return nil
} }
// int RTMP_Connect1(RTMP* r, RTMPPacket* cp);
// rtmp.c +978
// DELETED - subsumed by RTMP_Connect
// int RTMP_ConnectStream(RTMP* r, int seekTime); // int RTMP_ConnectStream(RTMP* r, int seekTime);
// rtmp.c +1099 // rtmp.c +1099
// Side effects: r.m_bPlaying is set true upon successful connection // Side effects: r.m_bPlaying is set true upon successful connection
@ -311,8 +332,11 @@ func C_RTMP_ConnectStream(r *C_RTMP, seekTime int32) error {
r.m_mediaChannel = 0 r.m_mediaChannel = 0
// TODO: read packet for !r.m_bPlaying && C_RTMP_IsConnected(r) {
for !r.m_bPlaying && C_RTMP_IsConnected(r) != 0 && C_RTMP_ReadPacket(r, &packet) { err := C_RTMP_ReadPacket(r, &packet)
if err != nil {
break
}
// TODO: port is ready // TODO: port is ready
if C_RTMPPacket_IsReady(&packet) { if C_RTMPPacket_IsReady(&packet) {
if packet.m_nBodySize == 0 { if packet.m_nBodySize == 0 {
@ -385,7 +409,8 @@ func C_RTMP_ClientPacket(r *C_RTMP, packet *C_RTMPPacket) int32 {
// TODO use new logger here // TODO use new logger here
//RTMP_Log(RTMP_LOGDEBUG, "%s, received: invoke %u bytes", __FUNCTION__,packet.m_nBodySize); //RTMP_Log(RTMP_LOGDEBUG, "%s, received: invoke %u bytes", __FUNCTION__,packet.m_nBodySize);
if C_HandleInvoke(r, packet.m_body[:packet.m_nBodySize]) { err := C_HandleInvoke(r, packet.m_body[:packet.m_nBodySize])
if err != nil {
// This will never happen with the methods we implement. // This will never happen with the methods we implement.
log.Println("HasMediaPacket") log.Println("HasMediaPacket")
bHasMediaPacket = 2 bHasMediaPacket = 2
@ -403,60 +428,50 @@ func C_RTMP_ClientPacket(r *C_RTMP, packet *C_RTMPPacket) int32 {
// int ReadN(RTMP* r, char* buffer, int n); // int ReadN(RTMP* r, char* buffer, int n);
// rtmp.c +1390 // rtmp.c +1390
func C_ReadN(r *C_RTMP, buf []byte) int { func C_ReadN(r *C_RTMP, buf []byte) error {
err := r.m_sb.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(r.m_sb.timeout))) err := r.m_sb.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(r.m_sb.timeout)))
if err != nil { if err != nil {
return 0 return err
} }
n, err := io.ReadFull(r.m_sb.conn, buf) n, err := io.ReadFull(r.m_sb.conn, buf)
if err != nil { if err != nil {
if debugMode { if debugMode {
log.Printf("C_ReadN error: %v\n", err) log.Printf("C_ReadN error: %v\n", err)
} }
return 0
}
if n == 0 {
if debugMode {
log.Println("RTMP socket closed by peer")
}
C_RTMP_Close(r) C_RTMP_Close(r)
return 0 return err
} }
r.m_nBytesIn += int32(n) r.m_nBytesIn += int32(n)
if r.m_nBytesIn > (r.m_nBytesInSent + r.m_nClientBW/10) { if r.m_nBytesIn > (r.m_nBytesInSent + r.m_nClientBW/10) {
if !C_SendBytesReceived(r) { err := C_SendBytesReceived(r)
return 0 if err != nil {
return err
} }
} }
return n return nil
} }
// int WriteN(RTMP* r, const char* buffer, int n); // int WriteN(RTMP* r, const char* buffer, int n);
// rtmp.c +1502 // rtmp.c +1502
func C_WriteN(r *C_RTMP, buf []byte) (ok bool) { func C_WriteN(r *C_RTMP, buf []byte) error {
for len(buf) != 0 { err := r.m_sb.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(r.m_sb.timeout)))
n, err := C_RTMPSockBuf_Send(&r.m_sb, buf) if err != nil {
if n < 0 { return err
}
_, err = r.m_sb.conn.Write(buf)
if err != nil {
if debugMode { if debugMode {
log.Printf("C_WriteN, RTMP send error: %v\n", err) log.Printf("C_WriteN, RTMP send error: %v\n", err)
} }
C_RTMP_Close(r) C_RTMP_Close(r)
return false return err
} }
return nil
if n == 0 {
break
}
buf = buf[n:]
}
// !ok here is equivalent to io.ErrShortWrite.
return len(buf) == 0
} }
// int SendConnectPacket(RTMP* r, RTMPPacket* cp); // int SendConnectPacket(RTMP* r, RTMPPacket* cp);
// rtmp.c +1579 // rtmp.c +1579
func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) (ok bool) { func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) error {
if cp != nil { if cp != nil {
return C_RTMP_SendPacket(r, cp, 1) return C_RTMP_SendPacket(r, cp, 1)
} }
@ -484,60 +499,60 @@ func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) (ok bool) {
enc = C_AMF_EncodeNamedString(enc, av_app, r.Link.app) enc = C_AMF_EncodeNamedString(enc, av_app, r.Link.app)
if enc == nil { if enc == nil {
return false return errEncoding
} }
if r.Link.protocol&RTMP_FEATURE_WRITE != 0 { if r.Link.protocol&RTMP_FEATURE_WRITE != 0 {
enc = C_AMF_EncodeNamedString(enc, av_type, av_nonprivate) enc = C_AMF_EncodeNamedString(enc, av_type, av_nonprivate)
if enc == nil { if enc == nil {
return false return errEncoding
} }
} }
if r.Link.flashVer != "" { if r.Link.flashVer != "" {
enc = C_AMF_EncodeNamedString(enc, av_flashVer, r.Link.flashVer) enc = C_AMF_EncodeNamedString(enc, av_flashVer, r.Link.flashVer)
if enc == nil { if enc == nil {
return false return errEncoding
} }
} }
if r.Link.swfUrl != "" { if r.Link.swfUrl != "" {
enc = C_AMF_EncodeNamedString(enc, av_swfUrl, r.Link.swfUrl) enc = C_AMF_EncodeNamedString(enc, av_swfUrl, r.Link.swfUrl)
if enc == nil { if enc == nil {
return false return errEncoding
} }
} }
if r.Link.tcUrl != "" { if r.Link.tcUrl != "" {
enc = C_AMF_EncodeNamedString(enc, av_tcUrl, r.Link.tcUrl) enc = C_AMF_EncodeNamedString(enc, av_tcUrl, r.Link.tcUrl)
if enc == nil { if enc == nil {
return false return errEncoding
} }
} }
if r.Link.protocol&RTMP_FEATURE_WRITE == 0 { if r.Link.protocol&RTMP_FEATURE_WRITE == 0 {
enc = C_AMF_EncodeNamedBoolean(enc, av_fpad, false) enc = C_AMF_EncodeNamedBoolean(enc, av_fpad, false)
if enc == nil { if enc == nil {
return false return errEncoding
} }
enc = C_AMF_EncodeNamedNumber(enc, av_capabilities, 15) enc = C_AMF_EncodeNamedNumber(enc, av_capabilities, 15)
if enc == nil { if enc == nil {
return false return errEncoding
} }
enc = C_AMF_EncodeNamedNumber(enc, av_audioCodecs, r.m_fAudioCodecs) enc = C_AMF_EncodeNamedNumber(enc, av_audioCodecs, r.m_fAudioCodecs)
if enc == nil { if enc == nil {
return false return errEncoding
} }
enc = C_AMF_EncodeNamedNumber(enc, av_videoCodecs, r.m_fVideoCodecs) enc = C_AMF_EncodeNamedNumber(enc, av_videoCodecs, r.m_fVideoCodecs)
if enc == nil { if enc == nil {
return false return errEncoding
} }
enc = C_AMF_EncodeNamedNumber(enc, av_videoFunction, 1) enc = C_AMF_EncodeNamedNumber(enc, av_videoFunction, 1)
if enc == nil { if enc == nil {
return false return errEncoding
} }
if r.Link.pageUrl != "" { if r.Link.pageUrl != "" {
enc = C_AMF_EncodeNamedString(enc, av_pageUrl, r.Link.pageUrl) enc = C_AMF_EncodeNamedString(enc, av_pageUrl, r.Link.pageUrl)
if enc == nil { if enc == nil {
return false return errEncoding
} }
} }
} }
@ -545,12 +560,12 @@ func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) (ok bool) {
if r.m_fEncoding != 0.0 || r.m_bSendEncoding { if r.m_fEncoding != 0.0 || r.m_bSendEncoding {
enc = C_AMF_EncodeNamedNumber(enc, av_objectEncoding, r.m_fEncoding) enc = C_AMF_EncodeNamedNumber(enc, av_objectEncoding, r.m_fEncoding)
if enc == nil { if enc == nil {
return false return errEncoding
} }
} }
if copy(enc, []byte{0, 0, AMF_OBJECT_END}) != 3 { if copy(enc, []byte{0, 0, AMF_OBJECT_END}) != 3 {
return false return errCopying // TODO: is this even possible?
} }
enc = enc[3:] enc = enc[3:]
@ -558,18 +573,18 @@ func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) (ok bool) {
if r.Link.auth != "" { if r.Link.auth != "" {
enc = C_AMF_EncodeBoolean(enc, r.Link.lFlags&RTMP_LF_AUTH != 0) enc = C_AMF_EncodeBoolean(enc, r.Link.lFlags&RTMP_LF_AUTH != 0)
if enc == nil { if enc == nil {
return false return errEncoding
} }
enc = C_AMF_EncodeString(enc, r.Link.auth) enc = C_AMF_EncodeString(enc, r.Link.auth)
if enc == nil { if enc == nil {
return false return errEncoding
} }
} }
for i := range r.Link.extras.o_props { for i := range r.Link.extras.o_props {
enc = C_AMF_PropEncode(&r.Link.extras.o_props[i], enc) enc = C_AMF_PropEncode(&r.Link.extras.o_props[i], enc)
if enc == nil { if enc == nil {
return false return errEncoding
} }
} }
@ -580,7 +595,7 @@ func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) (ok bool) {
// int RTMP_SendCreateStream(RTMP* r); // int RTMP_SendCreateStream(RTMP* r);
// rtmp.c +1725 // rtmp.c +1725
func C_RTMP_SendCreateStream(r *C_RTMP) (ok bool) { func C_RTMP_SendCreateStream(r *C_RTMP) error {
var pbuf [256]byte var pbuf [256]byte
packet := C_RTMPPacket{ packet := C_RTMPPacket{
m_nChannel: 0x03, /* control channel (invoke) */ m_nChannel: 0x03, /* control channel (invoke) */
@ -607,7 +622,7 @@ func C_RTMP_SendCreateStream(r *C_RTMP) (ok bool) {
// int SendReleaseStream(RTMP* r); // int SendReleaseStream(RTMP* r);
// rtmp.c +1816 // rtmp.c +1816
func C_SendReleaseStream(r *C_RTMP) (ok bool) { func C_SendReleaseStream(r *C_RTMP) error {
var pbuf [1024]byte var pbuf [1024]byte
packet := C_RTMPPacket{ packet := C_RTMPPacket{
m_nChannel: 0x03, /* control channel (invoke) */ m_nChannel: 0x03, /* control channel (invoke) */
@ -628,7 +643,7 @@ func C_SendReleaseStream(r *C_RTMP) (ok bool) {
enc = enc[1:] enc = enc[1:]
enc = C_AMF_EncodeString(enc, r.Link.playpath) enc = C_AMF_EncodeString(enc, r.Link.playpath)
if enc == nil { if enc == nil {
return false return errEncoding
} }
packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
@ -638,7 +653,7 @@ func C_SendReleaseStream(r *C_RTMP) (ok bool) {
// int SendFCPublish(RTMP* r); // int SendFCPublish(RTMP* r);
// rtmp.c +1846 // rtmp.c +1846
func C_SendFCPublish(r *C_RTMP) (ok bool) { func C_SendFCPublish(r *C_RTMP) error {
var pbuf [1024]byte var pbuf [1024]byte
packet := C_RTMPPacket{ packet := C_RTMPPacket{
m_nChannel: 0x03, /* control channel (invoke) */ m_nChannel: 0x03, /* control channel (invoke) */
@ -659,7 +674,7 @@ func C_SendFCPublish(r *C_RTMP) (ok bool) {
enc = enc[1:] enc = enc[1:]
enc = C_AMF_EncodeString(enc, r.Link.playpath) enc = C_AMF_EncodeString(enc, r.Link.playpath)
if enc == nil { if enc == nil {
return false return errEncoding
} }
packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
@ -669,7 +684,7 @@ func C_SendFCPublish(r *C_RTMP) (ok bool) {
// int SendFCUnpublish(RTMP *r); // int SendFCUnpublish(RTMP *r);
// rtmp.c +1875 // rtmp.c +1875
func C_SendFCUnpublish(r *C_RTMP) (ok bool) { func C_SendFCUnpublish(r *C_RTMP) error {
var pbuf [1024]byte var pbuf [1024]byte
packet := C_RTMPPacket{ packet := C_RTMPPacket{
m_nChannel: 0x03, /* control channel (invoke) */ m_nChannel: 0x03, /* control channel (invoke) */
@ -691,7 +706,7 @@ func C_SendFCUnpublish(r *C_RTMP) (ok bool) {
enc = C_AMF_EncodeString(enc, r.Link.playpath) enc = C_AMF_EncodeString(enc, r.Link.playpath)
if enc == nil { if enc == nil {
return false return errEncoding
} }
packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
@ -701,7 +716,7 @@ func C_SendFCUnpublish(r *C_RTMP) (ok bool) {
// int SendPublish(RTMP* r); // int SendPublish(RTMP* r);
// rtmp.c +1908 // rtmp.c +1908
func C_SendPublish(r *C_RTMP) (ok bool) { func C_SendPublish(r *C_RTMP) error {
var pbuf [1024]byte var pbuf [1024]byte
packet := C_RTMPPacket{ packet := C_RTMPPacket{
m_nChannel: 0x04, /* source channel (invoke) */ m_nChannel: 0x04, /* source channel (invoke) */
@ -721,14 +736,12 @@ func C_SendPublish(r *C_RTMP) (ok bool) {
enc[0] = AMF_NULL enc[0] = AMF_NULL
enc = enc[1:] enc = enc[1:]
enc = C_AMF_EncodeString(enc, r.Link.playpath) enc = C_AMF_EncodeString(enc, r.Link.playpath)
if enc == nil { if enc == nil {
return false return errEncoding
} }
enc = C_AMF_EncodeString(enc, av_live) enc = C_AMF_EncodeString(enc, av_live)
if enc == nil { if enc == nil {
return false return errEncoding
} }
packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
@ -739,7 +752,7 @@ func C_SendPublish(r *C_RTMP) (ok bool) {
// int // int
// SendDeleteStream(RTMP *r, double dStreamId) // SendDeleteStream(RTMP *r, double dStreamId)
// rtmp.c +1942 // rtmp.c +1942
func C_SendDeleteStream(r *C_RTMP, dStreamId float64) (ok bool) { func C_SendDeleteStream(r *C_RTMP, dStreamId float64) error {
var pbuf [256]byte var pbuf [256]byte
packet := C_RTMPPacket{ packet := C_RTMPPacket{
m_nChannel: 0x03, /* control channel (invoke) */ m_nChannel: 0x03, /* control channel (invoke) */
@ -754,12 +767,20 @@ func C_SendDeleteStream(r *C_RTMP, dStreamId float64) (ok bool) {
enc := packet.m_body enc := packet.m_body
enc = C_AMF_EncodeString(enc, av_deleteStream) enc = C_AMF_EncodeString(enc, av_deleteStream)
if enc == nil {
return errEncoding
}
r.m_numInvokes++ r.m_numInvokes++
enc = C_AMF_EncodeNumber(enc, float64(r.m_numInvokes)) enc = C_AMF_EncodeNumber(enc, float64(r.m_numInvokes))
if enc == nil {
return errEncoding
}
enc[0] = AMF_NULL enc[0] = AMF_NULL
enc = enc[1:] enc = enc[1:]
enc = C_AMF_EncodeNumber(enc, dStreamId) enc = C_AMF_EncodeNumber(enc, dStreamId)
if enc == nil {
return errEncoding
}
packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc))
/* no response expected */ /* no response expected */
@ -768,7 +789,7 @@ func C_SendDeleteStream(r *C_RTMP, dStreamId float64) (ok bool) {
// int SendBytesReceived(RTMP* r); // int SendBytesReceived(RTMP* r);
// rtmp.c +2080 // rtmp.c +2080
func C_SendBytesReceived(r *C_RTMP) (ok bool) { func C_SendBytesReceived(r *C_RTMP) error {
var pbuf [256]byte var pbuf [256]byte
packet := C_RTMPPacket{ packet := C_RTMPPacket{
m_nChannel: 0x02, /* control channel (invoke) */ m_nChannel: 0x02, /* control channel (invoke) */
@ -792,7 +813,7 @@ func C_SendBytesReceived(r *C_RTMP) (ok bool) {
// int SendCheckBW(RTMP* r); // int SendCheckBW(RTMP* r);
// rtmp.c +2105 // rtmp.c +2105
func C_SendCheckBW(r *C_RTMP) (ok bool) { func C_SendCheckBW(r *C_RTMP) error {
var pbuf [256]byte var pbuf [256]byte
packet := C_RTMPPacket{ packet := C_RTMPPacket{
m_nChannel: 0x03, /* control channel (invoke) */ m_nChannel: 0x03, /* control channel (invoke) */
@ -828,19 +849,14 @@ func C_AV_erase(m []C_RTMP_METHOD, i int) []C_RTMP_METHOD {
// int HandleInvoke(RTMP* r, const char* body, unsigned int nBodySize); // int HandleInvoke(RTMP* r, const char* body, unsigned int nBodySize);
// rtmp.c +2912 // rtmp.c +2912
// Side effects: r.m_bPlaying set to true upon av_NetStream_Publish_Start // Side effects: r.m_bPlaying set to true upon av_NetStream_Publish_Start
func C_HandleInvoke(r *C_RTMP, body []byte) (ok bool) { func C_HandleInvoke(r *C_RTMP, body []byte) error {
if body[0] != 0x02 { if body[0] != 0x02 {
// TODO use new logger here return errInvalidBody
//RTMP_Log(RTMP_LOGWARNING, "%s, Sanity failed. no string method in invoke packet",
//__FUNCTION__);
return false
} }
var obj C_AMFObject var obj C_AMFObject
nRes := C_AMF_Decode(&obj, body, 0) nRes := C_AMF_Decode(&obj, body, 0)
if nRes < 0 { if nRes < 0 {
// TODO use new logger here return errDecoding
//RTMP_Log(RTMP_LOGERROR, "%s, error decoding invoke packet", __FUNCTION__);
return false
} }
// NOTE we don't really need this ?? still functions without it // NOTE we don't really need this ?? still functions without it
@ -969,7 +985,7 @@ func C_HandleInvoke(r *C_RTMP, body []byte) (ok bool) {
leave: leave:
C_AMF_Reset(&obj) C_AMF_Reset(&obj)
// None of the methods we implement will result in a true return. // None of the methods we implement will result in a true return.
return ok return nil
} }
// void HandleChangeChunkSize(RTMP* r, const RTMPPacket* packet); // void HandleChangeChunkSize(RTMP* r, const RTMPPacket* packet);
@ -1022,13 +1038,14 @@ func C_EncodeInt32LE(dst []byte, v int32) int32 {
// int RTMP_ReadPacket(RTMP* r, RTMPPacket* packet); // int RTMP_ReadPacket(RTMP* r, RTMPPacket* packet);
// rtmp.c +3550 // rtmp.c +3550
func C_RTMP_ReadPacket(r *C_RTMP, packet *C_RTMPPacket) (ok bool) { func C_RTMP_ReadPacket(r *C_RTMP, packet *C_RTMPPacket) error {
var hbuf [RTMP_MAX_HEADER_SIZE]byte var hbuf [RTMP_MAX_HEADER_SIZE]byte
header := hbuf[:] header := hbuf[:]
if C_ReadN(r, header[:1]) != 1 { err := C_ReadN(r, header[:1])
if err != nil {
log.Println("C_RTMP_ReadPacket: failed to read RTMP packet header!") log.Println("C_RTMP_ReadPacket: failed to read RTMP packet header!")
return false return err
} }
packet.m_headerType = (header[0] & 0xc0) >> 6 packet.m_headerType = (header[0] & 0xc0) >> 6
packet.m_nChannel = int32(header[0] & 0x3f) packet.m_nChannel = int32(header[0] & 0x3f)
@ -1036,17 +1053,19 @@ func C_RTMP_ReadPacket(r *C_RTMP, packet *C_RTMPPacket) (ok bool) {
switch { switch {
case packet.m_nChannel == 0: case packet.m_nChannel == 0:
if C_ReadN(r, header[:1]) != 1 { err = C_ReadN(r, header[:1])
if err != nil {
log.Println("C_RTMP_ReadPacket: failed to read rtmp packet header 2nd byte.") log.Println("C_RTMP_ReadPacket: failed to read rtmp packet header 2nd byte.")
return false return err
} }
header = header[1:] header = header[1:]
packet.m_nChannel = int32(header[0]) + 64 packet.m_nChannel = int32(header[0]) + 64
case packet.m_nChannel == 1: case packet.m_nChannel == 1:
if C_ReadN(r, header[:2]) != 2 { err = C_ReadN(r, header[:2])
if err != nil {
log.Println("C_RTMP_ReadPacket: failed to read RTMP packet 3rd byte") log.Println("C_RTMP_ReadPacket: failed to read RTMP packet 3rd byte")
return false return err
} }
header = header[2:] header = header[2:]
packet.m_nChannel = int32(binary.BigEndian.Uint16(header[:2])) + 64 packet.m_nChannel = int32(binary.BigEndian.Uint16(header[:2])) + 64
@ -1088,9 +1107,12 @@ func C_RTMP_ReadPacket(r *C_RTMP, packet *C_RTMPPacket) (ok bool) {
nSize-- nSize--
if nSize > 0 && C_ReadN(r, header[:nSize]) != int(nSize) { if nSize > 0 {
err = C_ReadN(r, header[:nSize])
if err != nil {
log.Println("C_RTMP_ReadPacket: failed to read rtmp packet header.") log.Println("C_RTMP_ReadPacket: failed to read rtmp packet header.")
return false return err
}
} }
hSize := len(hbuf) - len(header) + nSize hSize := len(hbuf) - len(header) + nSize
@ -1114,9 +1136,10 @@ func C_RTMP_ReadPacket(r *C_RTMP, packet *C_RTMPPacket) (ok bool) {
extendedTimestamp := packet.m_nTimeStamp == 0xffffff extendedTimestamp := packet.m_nTimeStamp == 0xffffff
if extendedTimestamp { if extendedTimestamp {
if C_ReadN(r, header[nSize:nSize+4]) != 4 { err = C_ReadN(r, header[nSize:nSize+4])
if err != nil {
log.Println("RTMPRead_Packet: Failed to read extended timestamp") log.Println("RTMPRead_Packet: Failed to read extended timestamp")
return false return err
} }
// TODO: port this // TODO: port this
packet.m_nTimeStamp = C_AMF_DecodeInt32(header[nSize : nSize+4]) packet.m_nTimeStamp = C_AMF_DecodeInt32(header[nSize : nSize+4])
@ -1125,10 +1148,7 @@ func C_RTMP_ReadPacket(r *C_RTMP, packet *C_RTMPPacket) (ok bool) {
if packet.m_nBodySize > 0 && packet.m_body == nil { if packet.m_nBodySize > 0 && packet.m_body == nil {
// TODO: port this // TODO: port this
if !C_RTMPPacket_Alloc(packet, packet.m_nBodySize) { C_RTMPPacket_Alloc(packet, packet.m_nBodySize)
log.Println("RTMPRead_Packet: failed to allocate packet")
return false
}
packet.m_headerType = (hbuf[0] & 0xc0) >> 6 packet.m_headerType = (hbuf[0] & 0xc0) >> 6
} }
@ -1145,9 +1165,10 @@ func C_RTMP_ReadPacket(r *C_RTMP, packet *C_RTMPPacket) (ok bool) {
packet.m_chunk.c_chunk = packet.m_body[packet.m_nBytesRead : packet.m_nBytesRead+uint32(nChunk)] packet.m_chunk.c_chunk = packet.m_body[packet.m_nBytesRead : packet.m_nBytesRead+uint32(nChunk)]
} }
if C_ReadN(r, packet.m_body[packet.m_nBytesRead:][:nChunk]) != int(nChunk) { err = C_ReadN(r, packet.m_body[packet.m_nBytesRead:][:nChunk])
if err != nil {
log.Println("C_RTMP_ReadPacket: failed to read RTMP packet body") log.Println("C_RTMP_ReadPacket: failed to read RTMP packet body")
return false return err
} }
packet.m_nBytesRead += uint32(nChunk) packet.m_nBytesRead += uint32(nChunk)
@ -1176,12 +1197,12 @@ func C_RTMP_ReadPacket(r *C_RTMP, packet *C_RTMPPacket) (ok bool) {
} else { } else {
packet.m_body = nil /* so it won't be erased on free */ packet.m_body = nil /* so it won't be erased on free */
} }
return true return nil
} }
// int HandShake(RTMP* r, int FP9HandShake); // int HandShake(RTMP* r, int FP9HandShake);
// rtmp.c +3744 // rtmp.c +3744
func C_HandShake(r *C_RTMP, FP9HandShake int32) (ok bool) { func C_HandShake(r *C_RTMP, FP9HandShake int32) error {
var clientbuf [RTMP_SIG_SIZE + 1]byte var clientbuf [RTMP_SIG_SIZE + 1]byte
clientsig := clientbuf[1:] clientsig := clientbuf[1:]
@ -1196,13 +1217,15 @@ func C_HandShake(r *C_RTMP, FP9HandShake int32) (ok bool) {
clientsig[i] = byte(rand.Intn(256)) clientsig[i] = byte(rand.Intn(256))
} }
if !C_WriteN(r, clientbuf[:]) { err := C_WriteN(r, clientbuf[:])
return false if err != nil {
return err
} }
var typ [1]byte var typ [1]byte
if C_ReadN(r, typ[:]) != 1 { err = C_ReadN(r, typ[:])
return false if err != nil {
return err
} }
if debugMode { if debugMode {
@ -1212,8 +1235,9 @@ func C_HandShake(r *C_RTMP, FP9HandShake int32) (ok bool) {
log.Printf("C_HandShake: type mismatch: client sent %v, server sent: %v\n", log.Printf("C_HandShake: type mismatch: client sent %v, server sent: %v\n",
clientbuf[0], typ) clientbuf[0], typ)
} }
if C_ReadN(r, serversig[:]) != RTMP_SIG_SIZE { err = C_ReadN(r, serversig[:])
return false if err != nil {
return err
} }
// decode server response // decode server response
@ -1224,24 +1248,26 @@ func C_HandShake(r *C_RTMP, FP9HandShake int32) (ok bool) {
// serversig[4], serversig[5], serversig[6], serversig[7]) // serversig[4], serversig[5], serversig[6], serversig[7])
// 2nd part of handshake // 2nd part of handshake
if !C_WriteN(r, serversig[:]) { err = C_WriteN(r, serversig[:])
return false if err != nil {
return err
} }
if C_ReadN(r, serversig[:]) != RTMP_SIG_SIZE { err = C_ReadN(r, serversig[:])
return false if err != nil {
return err
} }
if !bytes.Equal(serversig[:RTMP_SIG_SIZE], clientbuf[1:RTMP_SIG_SIZE+1]) { if !bytes.Equal(serversig[:RTMP_SIG_SIZE], clientbuf[1:RTMP_SIG_SIZE+1]) {
log.Printf("Client signature does not match: %q != %q", log.Printf("Client signature does not match: %q != %q",
serversig[:RTMP_SIG_SIZE], clientbuf[1:RTMP_SIG_SIZE+1]) serversig[:RTMP_SIG_SIZE], clientbuf[1:RTMP_SIG_SIZE+1])
} }
return true return nil
} }
// int RTMP_SendPacket(RTMP* r, RTMPPacket* packet, int queue); // int RTMP_SendPacket(RTMP* r, RTMPPacket* packet, int queue);
// rtmp.c +3896 // rtmp.c +3896
func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) (ok bool) { func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) error {
var prevPacket *C_RTMPPacket var prevPacket *C_RTMPPacket
var last int var last int
@ -1280,7 +1306,7 @@ func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) (ok bool) {
if packet.m_headerType > 3 { if packet.m_headerType > 3 {
log.Printf("Sanity failed! trying to send header of type: 0x%02x.", log.Printf("Sanity failed! trying to send header of type: 0x%02x.",
packet.m_headerType) packet.m_headerType)
return false return errInvalidHeader
} }
var headBytes []byte var headBytes []byte
@ -1388,8 +1414,9 @@ func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) (ok bool) {
nChunkSize = nSize nChunkSize = nSize
} }
if !C_WriteN(r, headBytes[origIdx:][:nChunkSize+hSize]) { err := C_WriteN(r, headBytes[origIdx:][:nChunkSize+hSize])
return false if err != nil {
return err
} }
n := nChunkSize + hSize // Since C_WriteN doesn't return number of bytes written. n := nChunkSize + hSize // Since C_WriteN doesn't return number of bytes written.
@ -1445,7 +1472,7 @@ func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) (ok bool) {
} }
*(r.m_vecChannelsOut[packet.m_nChannel]) = *packet *(r.m_vecChannelsOut[packet.m_nChannel]) = *packet
return true return nil
} }
// void RTMP_Close(RTMP *r); // void RTMP_Close(RTMP *r);
@ -1459,7 +1486,7 @@ func C_RTMP_Close(r *C_RTMP) {
func C_CloseInternal(r *C_RTMP, reconnect bool) { func C_CloseInternal(r *C_RTMP, reconnect bool) {
var i int32 var i int32
if C_RTMP_IsConnected(r) != 0 { if C_RTMP_IsConnected(r) {
if r.m_stream_id > 0 { if r.m_stream_id > 0 {
i = r.m_stream_id i = r.m_stream_id
if r.Link.protocol&RTMP_FEATURE_WRITE != 0 { if r.Link.protocol&RTMP_FEATURE_WRITE != 0 {
@ -1467,7 +1494,7 @@ func C_CloseInternal(r *C_RTMP, reconnect bool) {
} }
C_SendDeleteStream(r, float64(i)) C_SendDeleteStream(r, float64(i))
} }
err := C_RTMPSockBuf_Close(&r.m_sb) err := r.m_sb.conn.Close()
if err != nil && debugMode { if err != nil && debugMode {
log.Printf("C_RTMPSockBuf_Close error: %v\n", err) log.Printf("C_RTMPSockBuf_Close error: %v\n", err)
} }
@ -1515,7 +1542,7 @@ func C_CloseInternal(r *C_RTMP, reconnect bool) {
/// int RTMP_Write(RTMP* r, const char* buf, int size); /// int RTMP_Write(RTMP* r, const char* buf, int size);
// rtmp.c +5136 // rtmp.c +5136
func C_RTMP_Write(r *C_RTMP, buf []byte) int { func C_RTMP_Write(r *C_RTMP, buf []byte) error {
// TODO: port RTMPPacket // TODO: port RTMPPacket
var pkt = &r.m_write var pkt = &r.m_write
var enc []byte var enc []byte
@ -1527,9 +1554,7 @@ func C_RTMP_Write(r *C_RTMP, buf []byte) int {
for len(buf) != 0 { for len(buf) != 0 {
if pkt.m_nBytesRead == 0 { if pkt.m_nBytesRead == 0 {
if size < minDataSize { if size < minDataSize {
log.Printf("size: %d\n", size) return errTinyPacket
log.Printf("too small \n")
return 0
} }
if buf[0] == 'F' && buf[1] == 'L' && buf[2] == 'V' { if buf[0] == 'F' && buf[1] == 'L' && buf[2] == 'V' {
@ -1558,10 +1583,7 @@ func C_RTMP_Write(r *C_RTMP, buf []byte) int {
pkt.m_headerType = RTMP_PACKET_SIZE_MEDIUM pkt.m_headerType = RTMP_PACKET_SIZE_MEDIUM
} }
// TODO: Port this // TODO: Port this
if !C_RTMPPacket_Alloc(pkt, pkt.m_nBodySize) { C_RTMPPacket_Alloc(pkt, pkt.m_nBodySize)
log.Println("Failed to allocate packet")
return 0
}
enc = pkt.m_body[:pkt.m_nBodySize] enc = pkt.m_body[:pkt.m_nBodySize]
if pkt.m_packetType == RTMP_PACKET_TYPE_INFO { if pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
@ -1581,21 +1603,19 @@ func C_RTMP_Write(r *C_RTMP, buf []byte) int {
pkt.m_nBytesRead += uint32(num) pkt.m_nBytesRead += uint32(num)
buf = buf[num:] buf = buf[num:]
if pkt.m_nBytesRead == pkt.m_nBodySize { if pkt.m_nBytesRead == pkt.m_nBodySize {
// TODO: Port this err := C_RTMP_SendPacket(r, pkt, 0)
ok := C_RTMP_SendPacket(r, pkt, 0)
// TODO: Port this
C_RTMPPacket_Free(pkt) C_RTMPPacket_Free(pkt)
pkt.m_nBytesRead = 0 pkt.m_nBytesRead = 0
if !ok { if err != nil {
return -1 return err
} }
if len(buf) < 4 { if len(buf) < 4 {
return size + (len(buf) - 4) return nil
} }
buf = buf[4:] buf = buf[4:]
} }
} }
return size return nil
} }
var rtmpErrs = [...]string{ var rtmpErrs = [...]string{

View File

@ -84,13 +84,14 @@ func (s *Session) Write(data []byte) (int, error) {
return 0, Err(3) return 0, Err(3)
} }
if C_RTMP_IsConnected(s.rtmp) == 0 { if !C_RTMP_IsConnected(s.rtmp) {
//if C.RTMP_IsConnected(s.rtmp) == 0 {
return 0, Err(1) return 0, Err(1)
} }
if C_RTMP_Write(s.rtmp, data) == 0 { err := C_RTMP_Write(s.rtmp, data)
if err != nil {
//if C.RTMP_Write(s.rtmp, (*byte)(unsafe.Pointer(&data[0])), int32(len(data))) == 0 { //if C.RTMP_Write(s.rtmp, (*byte)(unsafe.Pointer(&data[0])), int32(len(data))) == 0 {
// TODO: propagate err
return 0, Err(2) return 0, Err(2)
} }
return len(data), nil return len(data), nil

View File

@ -33,30 +33,9 @@ LICENSE
package rtmp package rtmp
import (
"errors"
"net"
"strconv"
"time"
)
// int RTMP_Connect(RTMP *r, RTMPPacket* cp); // int RTMP_Connect(RTMP *r, RTMPPacket* cp);
// rtmp.c +1032 // rtmp.c +1032
func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) error { // MOVED to rtmp.go
if r.Link.host == "" {
return errors.New("Empty host")
}
addr, err := net.ResolveTCPAddr("tcp4", r.Link.host+":"+strconv.Itoa(int(r.Link.port)))
if err != nil {
return err
}
r.m_sb.conn, err = net.DialTCP("tcp4", nil, addr)
if err != nil {
return err
}
r.m_sb.timeout = r.Link.timeout
return C_RTMP_Connect1(r, cp)
}
// int SocksNegotiate(RTMP* r); // int SocksNegotiate(RTMP* r);
// rtmp.c +1062 // rtmp.c +1062
@ -69,21 +48,9 @@ func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) error {
// int RTMPSockBuf_Send(RTMPSockBuf* sb, const char* buf, int len); // int RTMPSockBuf_Send(RTMPSockBuf* sb, const char* buf, int len);
// rtmp.c +4297 // rtmp.c +4297
// TODO replace send with golang net connection send // DELETED
func C_RTMPSockBuf_Send(sb *C_RTMPSockBuf, buf []byte) (int, error) {
err := sb.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(sb.timeout)))
if err != nil {
return 0, err
}
return sb.conn.Write(buf)
}
// int // int
// RTMPSockBuf_Close(RTMPSockBuf *sb) // RTMPSockBuf_Close(RTMPSockBuf *sb)
// rtmp.c +4369 // rtmp.c +4369
func C_RTMPSockBuf_Close(sb *C_RTMPSockBuf) error { // DELETED
if sb.conn == nil {
return nil
}
return sb.conn.Close()
}