From 81b92b230201fccb69c93cd255b395bbb7521de2 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 6 Jan 2019 14:42:51 +1030 Subject: [PATCH] All rmtp functions now return an error (or nothing), except for C_RTMP_IsConnected() which now returns a bool instead of an int. --- rtmp/rtmp.go | 296 ++++++++++++++++++++++++++---------------------- rtmp/session.go | 7 +- rtmp/socket.go | 39 +------ 3 files changed, 165 insertions(+), 177 deletions(-) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 5c67b2b2..3142389e 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -41,6 +41,7 @@ import ( "io" "log" "math/rand" + "net" "strconv" "time" ) @@ -133,6 +134,12 @@ var ( errHandshake = errors.New("rtmp: handshake failed") errConnSend = errors.New("rtmp: connection send error") errConnStream = errors.New("rtmp: connection stream error") + errInvalidHeader = errors.New("rtmp: invalid header") + errInvalidBody = errors.New("rtmp: invalid body") + errTinyPacket = errors.New("rtmp: packet too small") + errEncoding = errors.New("rtmp: encoding error") + errDecoding = errors.New("rtmp: decoding error") + errCopying = errors.New("rtmp: copying error") ) func startSession(rtmp *C_RTMP, u string, timeout uint) (*C_RTMP, error) { @@ -183,12 +190,11 @@ func C_RTMP_GetTime() int32 { // int RTMPPacket_Alloc(RTMPPacket* p, uint32_t nSize); // rtmp.c +189 -func C_RTMPPacket_Alloc(p *C_RTMPPacket, nSize uint32) (ok bool) { +func C_RTMPPacket_Alloc(p *C_RTMPPacket, nSize uint32) { buf := make([]byte, RTMP_MAX_HEADER_SIZE+nSize) p.m_header = buf p.m_body = buf[RTMP_MAX_HEADER_SIZE:] p.m_nBytesRead = 0 - return true } // void RTMPPacket_Free(RTMPPacket* p); @@ -229,11 +235,11 @@ func C_RTMP_EnableWrite(r *C_RTMP) { // int RTMP_IsConnected(RTMP *r); // rtmp.c +363 -func C_RTMP_IsConnected(r *C_RTMP) int32 { +func C_RTMP_IsConnected(r *C_RTMP) bool { if r.m_sb.conn != nil { - return 1 + return true } - return 0 + return false } // void RTMP_SetBufferMS(RTMP *r, int size); @@ -279,26 +285,41 @@ func C_RTMP_SetupURL(r *C_RTMP, addr string) (err error) { return nil } -// int RTMP_Connect1(RTMP* r, RTMPPacket* cp); -// rtmp.c +978 -func C_RTMP_Connect1(r *C_RTMP, cp *C_RTMPPacket) error { +// int RTMP_Connect(RTMP *r, RTMPPacket* cp); +// rtmp.c +1032 +func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) error { + addr, err := net.ResolveTCPAddr("tcp4", r.Link.host+":"+strconv.Itoa(int(r.Link.port))) + if err != nil { + return err + } + r.m_sb.conn, err = net.DialTCP("tcp4", nil, addr) + if err != nil { + return err + } + r.m_sb.timeout = r.Link.timeout if debugMode { log.Println("... connected, handshaking...") } - if !C_HandShake(r, 1) { + err = C_HandShake(r, 1) + if err != nil { log.Println("C_RTMP_Connect1: handshake failed!") return errHandshake } if debugMode { log.Println("... handshaked...") } - if !C_SendConnectPacket(r, cp) { + err = C_SendConnectPacket(r, cp) + if err != nil { log.Println("RTMP connect failed!") return errConnSend } return nil } +// int RTMP_Connect1(RTMP* r, RTMPPacket* cp); +// rtmp.c +978 +// DELETED - subsumed by RTMP_Connect + // int RTMP_ConnectStream(RTMP* r, int seekTime); // rtmp.c +1099 // Side effects: r.m_bPlaying is set true upon successful connection @@ -311,8 +332,11 @@ func C_RTMP_ConnectStream(r *C_RTMP, seekTime int32) error { r.m_mediaChannel = 0 - // TODO: read packet - for !r.m_bPlaying && C_RTMP_IsConnected(r) != 0 && C_RTMP_ReadPacket(r, &packet) { + for !r.m_bPlaying && C_RTMP_IsConnected(r) { + err := C_RTMP_ReadPacket(r, &packet) + if err != nil { + break + } // TODO: port is ready if C_RTMPPacket_IsReady(&packet) { if packet.m_nBodySize == 0 { @@ -385,7 +409,8 @@ func C_RTMP_ClientPacket(r *C_RTMP, packet *C_RTMPPacket) int32 { // TODO use new logger here //RTMP_Log(RTMP_LOGDEBUG, "%s, received: invoke %u bytes", __FUNCTION__,packet.m_nBodySize); - if C_HandleInvoke(r, packet.m_body[:packet.m_nBodySize]) { + err := C_HandleInvoke(r, packet.m_body[:packet.m_nBodySize]) + if err != nil { // This will never happen with the methods we implement. log.Println("HasMediaPacket") bHasMediaPacket = 2 @@ -403,60 +428,50 @@ func C_RTMP_ClientPacket(r *C_RTMP, packet *C_RTMPPacket) int32 { // int ReadN(RTMP* r, char* buffer, int n); // rtmp.c +1390 -func C_ReadN(r *C_RTMP, buf []byte) int { +func C_ReadN(r *C_RTMP, buf []byte) error { err := r.m_sb.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(r.m_sb.timeout))) if err != nil { - return 0 + return err } n, err := io.ReadFull(r.m_sb.conn, buf) if err != nil { if debugMode { log.Printf("C_ReadN error: %v\n", err) } - return 0 - } - if n == 0 { - if debugMode { - log.Println("RTMP socket closed by peer") - } C_RTMP_Close(r) - return 0 + return err } r.m_nBytesIn += int32(n) if r.m_nBytesIn > (r.m_nBytesInSent + r.m_nClientBW/10) { - if !C_SendBytesReceived(r) { - return 0 + err := C_SendBytesReceived(r) + if err != nil { + return err } } - return n + return nil } // int WriteN(RTMP* r, const char* buffer, int n); // rtmp.c +1502 -func C_WriteN(r *C_RTMP, buf []byte) (ok bool) { - for len(buf) != 0 { - n, err := C_RTMPSockBuf_Send(&r.m_sb, buf) - if n < 0 { - if debugMode { - log.Printf("C_WriteN, RTMP send error: %v\n", err) - } - C_RTMP_Close(r) - return false - } - - if n == 0 { - break - } - buf = buf[n:] +func C_WriteN(r *C_RTMP, buf []byte) error { + err := r.m_sb.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(r.m_sb.timeout))) + if err != nil { + return err } - - // !ok here is equivalent to io.ErrShortWrite. - return len(buf) == 0 + _, err = r.m_sb.conn.Write(buf) + if err != nil { + if debugMode { + log.Printf("C_WriteN, RTMP send error: %v\n", err) + } + C_RTMP_Close(r) + return err + } + return nil } // int SendConnectPacket(RTMP* r, RTMPPacket* cp); // rtmp.c +1579 -func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) (ok bool) { +func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) error { if cp != nil { return C_RTMP_SendPacket(r, cp, 1) } @@ -484,60 +499,60 @@ func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) (ok bool) { enc = C_AMF_EncodeNamedString(enc, av_app, r.Link.app) if enc == nil { - return false + return errEncoding } if r.Link.protocol&RTMP_FEATURE_WRITE != 0 { enc = C_AMF_EncodeNamedString(enc, av_type, av_nonprivate) if enc == nil { - return false + return errEncoding } } if r.Link.flashVer != "" { enc = C_AMF_EncodeNamedString(enc, av_flashVer, r.Link.flashVer) if enc == nil { - return false + return errEncoding } } if r.Link.swfUrl != "" { enc = C_AMF_EncodeNamedString(enc, av_swfUrl, r.Link.swfUrl) if enc == nil { - return false + return errEncoding } } if r.Link.tcUrl != "" { enc = C_AMF_EncodeNamedString(enc, av_tcUrl, r.Link.tcUrl) if enc == nil { - return false + return errEncoding } } if r.Link.protocol&RTMP_FEATURE_WRITE == 0 { enc = C_AMF_EncodeNamedBoolean(enc, av_fpad, false) if enc == nil { - return false + return errEncoding } enc = C_AMF_EncodeNamedNumber(enc, av_capabilities, 15) if enc == nil { - return false + return errEncoding } enc = C_AMF_EncodeNamedNumber(enc, av_audioCodecs, r.m_fAudioCodecs) if enc == nil { - return false + return errEncoding } enc = C_AMF_EncodeNamedNumber(enc, av_videoCodecs, r.m_fVideoCodecs) if enc == nil { - return false + return errEncoding } enc = C_AMF_EncodeNamedNumber(enc, av_videoFunction, 1) if enc == nil { - return false + return errEncoding } if r.Link.pageUrl != "" { enc = C_AMF_EncodeNamedString(enc, av_pageUrl, r.Link.pageUrl) if enc == nil { - return false + return errEncoding } } } @@ -545,12 +560,12 @@ func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) (ok bool) { if r.m_fEncoding != 0.0 || r.m_bSendEncoding { enc = C_AMF_EncodeNamedNumber(enc, av_objectEncoding, r.m_fEncoding) if enc == nil { - return false + return errEncoding } } if copy(enc, []byte{0, 0, AMF_OBJECT_END}) != 3 { - return false + return errCopying // TODO: is this even possible? } enc = enc[3:] @@ -558,18 +573,18 @@ func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) (ok bool) { if r.Link.auth != "" { enc = C_AMF_EncodeBoolean(enc, r.Link.lFlags&RTMP_LF_AUTH != 0) if enc == nil { - return false + return errEncoding } enc = C_AMF_EncodeString(enc, r.Link.auth) if enc == nil { - return false + return errEncoding } } for i := range r.Link.extras.o_props { enc = C_AMF_PropEncode(&r.Link.extras.o_props[i], enc) if enc == nil { - return false + return errEncoding } } @@ -580,7 +595,7 @@ func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) (ok bool) { // int RTMP_SendCreateStream(RTMP* r); // rtmp.c +1725 -func C_RTMP_SendCreateStream(r *C_RTMP) (ok bool) { +func C_RTMP_SendCreateStream(r *C_RTMP) error { var pbuf [256]byte packet := C_RTMPPacket{ m_nChannel: 0x03, /* control channel (invoke) */ @@ -607,7 +622,7 @@ func C_RTMP_SendCreateStream(r *C_RTMP) (ok bool) { // int SendReleaseStream(RTMP* r); // rtmp.c +1816 -func C_SendReleaseStream(r *C_RTMP) (ok bool) { +func C_SendReleaseStream(r *C_RTMP) error { var pbuf [1024]byte packet := C_RTMPPacket{ m_nChannel: 0x03, /* control channel (invoke) */ @@ -628,7 +643,7 @@ func C_SendReleaseStream(r *C_RTMP) (ok bool) { enc = enc[1:] enc = C_AMF_EncodeString(enc, r.Link.playpath) if enc == nil { - return false + return errEncoding } packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) @@ -638,7 +653,7 @@ func C_SendReleaseStream(r *C_RTMP) (ok bool) { // int SendFCPublish(RTMP* r); // rtmp.c +1846 -func C_SendFCPublish(r *C_RTMP) (ok bool) { +func C_SendFCPublish(r *C_RTMP) error { var pbuf [1024]byte packet := C_RTMPPacket{ m_nChannel: 0x03, /* control channel (invoke) */ @@ -659,7 +674,7 @@ func C_SendFCPublish(r *C_RTMP) (ok bool) { enc = enc[1:] enc = C_AMF_EncodeString(enc, r.Link.playpath) if enc == nil { - return false + return errEncoding } packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) @@ -669,7 +684,7 @@ func C_SendFCPublish(r *C_RTMP) (ok bool) { // int SendFCUnpublish(RTMP *r); // rtmp.c +1875 -func C_SendFCUnpublish(r *C_RTMP) (ok bool) { +func C_SendFCUnpublish(r *C_RTMP) error { var pbuf [1024]byte packet := C_RTMPPacket{ m_nChannel: 0x03, /* control channel (invoke) */ @@ -691,7 +706,7 @@ func C_SendFCUnpublish(r *C_RTMP) (ok bool) { enc = C_AMF_EncodeString(enc, r.Link.playpath) if enc == nil { - return false + return errEncoding } packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) @@ -701,7 +716,7 @@ func C_SendFCUnpublish(r *C_RTMP) (ok bool) { // int SendPublish(RTMP* r); // rtmp.c +1908 -func C_SendPublish(r *C_RTMP) (ok bool) { +func C_SendPublish(r *C_RTMP) error { var pbuf [1024]byte packet := C_RTMPPacket{ m_nChannel: 0x04, /* source channel (invoke) */ @@ -721,14 +736,12 @@ func C_SendPublish(r *C_RTMP) (ok bool) { enc[0] = AMF_NULL enc = enc[1:] enc = C_AMF_EncodeString(enc, r.Link.playpath) - if enc == nil { - return false + return errEncoding } - enc = C_AMF_EncodeString(enc, av_live) if enc == nil { - return false + return errEncoding } packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) @@ -739,7 +752,7 @@ func C_SendPublish(r *C_RTMP) (ok bool) { // int // SendDeleteStream(RTMP *r, double dStreamId) // rtmp.c +1942 -func C_SendDeleteStream(r *C_RTMP, dStreamId float64) (ok bool) { +func C_SendDeleteStream(r *C_RTMP, dStreamId float64) error { var pbuf [256]byte packet := C_RTMPPacket{ m_nChannel: 0x03, /* control channel (invoke) */ @@ -754,12 +767,20 @@ func C_SendDeleteStream(r *C_RTMP, dStreamId float64) (ok bool) { enc := packet.m_body enc = C_AMF_EncodeString(enc, av_deleteStream) + if enc == nil { + return errEncoding + } r.m_numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(r.m_numInvokes)) + if enc == nil { + return errEncoding + } enc[0] = AMF_NULL enc = enc[1:] enc = C_AMF_EncodeNumber(enc, dStreamId) - + if enc == nil { + return errEncoding + } packet.m_nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) /* no response expected */ @@ -768,7 +789,7 @@ func C_SendDeleteStream(r *C_RTMP, dStreamId float64) (ok bool) { // int SendBytesReceived(RTMP* r); // rtmp.c +2080 -func C_SendBytesReceived(r *C_RTMP) (ok bool) { +func C_SendBytesReceived(r *C_RTMP) error { var pbuf [256]byte packet := C_RTMPPacket{ m_nChannel: 0x02, /* control channel (invoke) */ @@ -792,7 +813,7 @@ func C_SendBytesReceived(r *C_RTMP) (ok bool) { // int SendCheckBW(RTMP* r); // rtmp.c +2105 -func C_SendCheckBW(r *C_RTMP) (ok bool) { +func C_SendCheckBW(r *C_RTMP) error { var pbuf [256]byte packet := C_RTMPPacket{ m_nChannel: 0x03, /* control channel (invoke) */ @@ -828,19 +849,14 @@ func C_AV_erase(m []C_RTMP_METHOD, i int) []C_RTMP_METHOD { // int HandleInvoke(RTMP* r, const char* body, unsigned int nBodySize); // rtmp.c +2912 // Side effects: r.m_bPlaying set to true upon av_NetStream_Publish_Start -func C_HandleInvoke(r *C_RTMP, body []byte) (ok bool) { +func C_HandleInvoke(r *C_RTMP, body []byte) error { if body[0] != 0x02 { - // TODO use new logger here - //RTMP_Log(RTMP_LOGWARNING, "%s, Sanity failed. no string method in invoke packet", - //__FUNCTION__); - return false + return errInvalidBody } var obj C_AMFObject nRes := C_AMF_Decode(&obj, body, 0) if nRes < 0 { - // TODO use new logger here - //RTMP_Log(RTMP_LOGERROR, "%s, error decoding invoke packet", __FUNCTION__); - return false + return errDecoding } // NOTE we don't really need this ?? still functions without it @@ -969,7 +985,7 @@ func C_HandleInvoke(r *C_RTMP, body []byte) (ok bool) { leave: C_AMF_Reset(&obj) // None of the methods we implement will result in a true return. - return ok + return nil } // void HandleChangeChunkSize(RTMP* r, const RTMPPacket* packet); @@ -1022,13 +1038,14 @@ func C_EncodeInt32LE(dst []byte, v int32) int32 { // int RTMP_ReadPacket(RTMP* r, RTMPPacket* packet); // rtmp.c +3550 -func C_RTMP_ReadPacket(r *C_RTMP, packet *C_RTMPPacket) (ok bool) { +func C_RTMP_ReadPacket(r *C_RTMP, packet *C_RTMPPacket) error { var hbuf [RTMP_MAX_HEADER_SIZE]byte header := hbuf[:] - if C_ReadN(r, header[:1]) != 1 { + err := C_ReadN(r, header[:1]) + if err != nil { log.Println("C_RTMP_ReadPacket: failed to read RTMP packet header!") - return false + return err } packet.m_headerType = (header[0] & 0xc0) >> 6 packet.m_nChannel = int32(header[0] & 0x3f) @@ -1036,17 +1053,19 @@ func C_RTMP_ReadPacket(r *C_RTMP, packet *C_RTMPPacket) (ok bool) { switch { case packet.m_nChannel == 0: - if C_ReadN(r, header[:1]) != 1 { + err = C_ReadN(r, header[:1]) + if err != nil { log.Println("C_RTMP_ReadPacket: failed to read rtmp packet header 2nd byte.") - return false + return err } header = header[1:] packet.m_nChannel = int32(header[0]) + 64 case packet.m_nChannel == 1: - if C_ReadN(r, header[:2]) != 2 { + err = C_ReadN(r, header[:2]) + if err != nil { log.Println("C_RTMP_ReadPacket: failed to read RTMP packet 3rd byte") - return false + return err } header = header[2:] packet.m_nChannel = int32(binary.BigEndian.Uint16(header[:2])) + 64 @@ -1088,9 +1107,12 @@ func C_RTMP_ReadPacket(r *C_RTMP, packet *C_RTMPPacket) (ok bool) { nSize-- - if nSize > 0 && C_ReadN(r, header[:nSize]) != int(nSize) { - log.Println("C_RTMP_ReadPacket: failed to read rtmp packet header.") - return false + if nSize > 0 { + err = C_ReadN(r, header[:nSize]) + if err != nil { + log.Println("C_RTMP_ReadPacket: failed to read rtmp packet header.") + return err + } } hSize := len(hbuf) - len(header) + nSize @@ -1114,9 +1136,10 @@ func C_RTMP_ReadPacket(r *C_RTMP, packet *C_RTMPPacket) (ok bool) { extendedTimestamp := packet.m_nTimeStamp == 0xffffff if extendedTimestamp { - if C_ReadN(r, header[nSize:nSize+4]) != 4 { + err = C_ReadN(r, header[nSize:nSize+4]) + if err != nil { log.Println("RTMPRead_Packet: Failed to read extended timestamp") - return false + return err } // TODO: port this packet.m_nTimeStamp = C_AMF_DecodeInt32(header[nSize : nSize+4]) @@ -1125,10 +1148,7 @@ func C_RTMP_ReadPacket(r *C_RTMP, packet *C_RTMPPacket) (ok bool) { if packet.m_nBodySize > 0 && packet.m_body == nil { // TODO: port this - if !C_RTMPPacket_Alloc(packet, packet.m_nBodySize) { - log.Println("RTMPRead_Packet: failed to allocate packet") - return false - } + C_RTMPPacket_Alloc(packet, packet.m_nBodySize) packet.m_headerType = (hbuf[0] & 0xc0) >> 6 } @@ -1145,9 +1165,10 @@ func C_RTMP_ReadPacket(r *C_RTMP, packet *C_RTMPPacket) (ok bool) { packet.m_chunk.c_chunk = packet.m_body[packet.m_nBytesRead : packet.m_nBytesRead+uint32(nChunk)] } - if C_ReadN(r, packet.m_body[packet.m_nBytesRead:][:nChunk]) != int(nChunk) { + err = C_ReadN(r, packet.m_body[packet.m_nBytesRead:][:nChunk]) + if err != nil { log.Println("C_RTMP_ReadPacket: failed to read RTMP packet body") - return false + return err } packet.m_nBytesRead += uint32(nChunk) @@ -1176,12 +1197,12 @@ func C_RTMP_ReadPacket(r *C_RTMP, packet *C_RTMPPacket) (ok bool) { } else { packet.m_body = nil /* so it won't be erased on free */ } - return true + return nil } // int HandShake(RTMP* r, int FP9HandShake); // rtmp.c +3744 -func C_HandShake(r *C_RTMP, FP9HandShake int32) (ok bool) { +func C_HandShake(r *C_RTMP, FP9HandShake int32) error { var clientbuf [RTMP_SIG_SIZE + 1]byte clientsig := clientbuf[1:] @@ -1196,13 +1217,15 @@ func C_HandShake(r *C_RTMP, FP9HandShake int32) (ok bool) { clientsig[i] = byte(rand.Intn(256)) } - if !C_WriteN(r, clientbuf[:]) { - return false + err := C_WriteN(r, clientbuf[:]) + if err != nil { + return err } var typ [1]byte - if C_ReadN(r, typ[:]) != 1 { - return false + err = C_ReadN(r, typ[:]) + if err != nil { + return err } if debugMode { @@ -1212,8 +1235,9 @@ func C_HandShake(r *C_RTMP, FP9HandShake int32) (ok bool) { log.Printf("C_HandShake: type mismatch: client sent %v, server sent: %v\n", clientbuf[0], typ) } - if C_ReadN(r, serversig[:]) != RTMP_SIG_SIZE { - return false + err = C_ReadN(r, serversig[:]) + if err != nil { + return err } // decode server response @@ -1224,24 +1248,26 @@ func C_HandShake(r *C_RTMP, FP9HandShake int32) (ok bool) { // serversig[4], serversig[5], serversig[6], serversig[7]) // 2nd part of handshake - if !C_WriteN(r, serversig[:]) { - return false + err = C_WriteN(r, serversig[:]) + if err != nil { + return err } - if C_ReadN(r, serversig[:]) != RTMP_SIG_SIZE { - return false + err = C_ReadN(r, serversig[:]) + if err != nil { + return err } if !bytes.Equal(serversig[:RTMP_SIG_SIZE], clientbuf[1:RTMP_SIG_SIZE+1]) { log.Printf("Client signature does not match: %q != %q", serversig[:RTMP_SIG_SIZE], clientbuf[1:RTMP_SIG_SIZE+1]) } - return true + return nil } // int RTMP_SendPacket(RTMP* r, RTMPPacket* packet, int queue); // rtmp.c +3896 -func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) (ok bool) { +func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) error { var prevPacket *C_RTMPPacket var last int @@ -1280,7 +1306,7 @@ func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) (ok bool) { if packet.m_headerType > 3 { log.Printf("Sanity failed! trying to send header of type: 0x%02x.", packet.m_headerType) - return false + return errInvalidHeader } var headBytes []byte @@ -1388,8 +1414,9 @@ func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) (ok bool) { nChunkSize = nSize } - if !C_WriteN(r, headBytes[origIdx:][:nChunkSize+hSize]) { - return false + err := C_WriteN(r, headBytes[origIdx:][:nChunkSize+hSize]) + if err != nil { + return err } n := nChunkSize + hSize // Since C_WriteN doesn't return number of bytes written. @@ -1445,7 +1472,7 @@ func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) (ok bool) { } *(r.m_vecChannelsOut[packet.m_nChannel]) = *packet - return true + return nil } // void RTMP_Close(RTMP *r); @@ -1459,7 +1486,7 @@ func C_RTMP_Close(r *C_RTMP) { func C_CloseInternal(r *C_RTMP, reconnect bool) { var i int32 - if C_RTMP_IsConnected(r) != 0 { + if C_RTMP_IsConnected(r) { if r.m_stream_id > 0 { i = r.m_stream_id if r.Link.protocol&RTMP_FEATURE_WRITE != 0 { @@ -1467,7 +1494,7 @@ func C_CloseInternal(r *C_RTMP, reconnect bool) { } C_SendDeleteStream(r, float64(i)) } - err := C_RTMPSockBuf_Close(&r.m_sb) + err := r.m_sb.conn.Close() if err != nil && debugMode { log.Printf("C_RTMPSockBuf_Close error: %v\n", err) } @@ -1515,7 +1542,7 @@ func C_CloseInternal(r *C_RTMP, reconnect bool) { /// int RTMP_Write(RTMP* r, const char* buf, int size); // rtmp.c +5136 -func C_RTMP_Write(r *C_RTMP, buf []byte) int { +func C_RTMP_Write(r *C_RTMP, buf []byte) error { // TODO: port RTMPPacket var pkt = &r.m_write var enc []byte @@ -1527,9 +1554,7 @@ func C_RTMP_Write(r *C_RTMP, buf []byte) int { for len(buf) != 0 { if pkt.m_nBytesRead == 0 { if size < minDataSize { - log.Printf("size: %d\n", size) - log.Printf("too small \n") - return 0 + return errTinyPacket } if buf[0] == 'F' && buf[1] == 'L' && buf[2] == 'V' { @@ -1558,10 +1583,7 @@ func C_RTMP_Write(r *C_RTMP, buf []byte) int { pkt.m_headerType = RTMP_PACKET_SIZE_MEDIUM } // TODO: Port this - if !C_RTMPPacket_Alloc(pkt, pkt.m_nBodySize) { - log.Println("Failed to allocate packet") - return 0 - } + C_RTMPPacket_Alloc(pkt, pkt.m_nBodySize) enc = pkt.m_body[:pkt.m_nBodySize] if pkt.m_packetType == RTMP_PACKET_TYPE_INFO { @@ -1581,21 +1603,19 @@ func C_RTMP_Write(r *C_RTMP, buf []byte) int { pkt.m_nBytesRead += uint32(num) buf = buf[num:] if pkt.m_nBytesRead == pkt.m_nBodySize { - // TODO: Port this - ok := C_RTMP_SendPacket(r, pkt, 0) - // TODO: Port this + err := C_RTMP_SendPacket(r, pkt, 0) C_RTMPPacket_Free(pkt) pkt.m_nBytesRead = 0 - if !ok { - return -1 + if err != nil { + return err } if len(buf) < 4 { - return size + (len(buf) - 4) + return nil } buf = buf[4:] } } - return size + return nil } var rtmpErrs = [...]string{ diff --git a/rtmp/session.go b/rtmp/session.go index f07107e7..ff2cbd83 100644 --- a/rtmp/session.go +++ b/rtmp/session.go @@ -84,13 +84,14 @@ func (s *Session) Write(data []byte) (int, error) { return 0, Err(3) } - if C_RTMP_IsConnected(s.rtmp) == 0 { - //if C.RTMP_IsConnected(s.rtmp) == 0 { + if !C_RTMP_IsConnected(s.rtmp) { return 0, Err(1) } - if C_RTMP_Write(s.rtmp, data) == 0 { + err := C_RTMP_Write(s.rtmp, data) + if err != nil { //if C.RTMP_Write(s.rtmp, (*byte)(unsafe.Pointer(&data[0])), int32(len(data))) == 0 { + // TODO: propagate err return 0, Err(2) } return len(data), nil diff --git a/rtmp/socket.go b/rtmp/socket.go index 453d079c..974b4ebf 100644 --- a/rtmp/socket.go +++ b/rtmp/socket.go @@ -33,30 +33,9 @@ LICENSE package rtmp -import ( - "errors" - "net" - "strconv" - "time" -) - // int RTMP_Connect(RTMP *r, RTMPPacket* cp); // rtmp.c +1032 -func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) error { - if r.Link.host == "" { - return errors.New("Empty host") - } - addr, err := net.ResolveTCPAddr("tcp4", r.Link.host+":"+strconv.Itoa(int(r.Link.port))) - if err != nil { - return err - } - r.m_sb.conn, err = net.DialTCP("tcp4", nil, addr) - if err != nil { - return err - } - r.m_sb.timeout = r.Link.timeout - return C_RTMP_Connect1(r, cp) -} +// MOVED to rtmp.go // int SocksNegotiate(RTMP* r); // rtmp.c +1062 @@ -69,21 +48,9 @@ func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) error { // int RTMPSockBuf_Send(RTMPSockBuf* sb, const char* buf, int len); // rtmp.c +4297 -// TODO replace send with golang net connection send -func C_RTMPSockBuf_Send(sb *C_RTMPSockBuf, buf []byte) (int, error) { - err := sb.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(sb.timeout))) - if err != nil { - return 0, err - } - return sb.conn.Write(buf) -} +// DELETED // int // RTMPSockBuf_Close(RTMPSockBuf *sb) // rtmp.c +4369 -func C_RTMPSockBuf_Close(sb *C_RTMPSockBuf) error { - if sb.conn == nil { - return nil - } - return sb.conn.Close() -} +// DELETED