/*
NAME
  rtmp.go

DESCRIPTION
  See Readme.md

AUTHOR
  Saxon Nelson-Milton
  Dan Kortschak
  Jake Lane

LICENSE
  rtmp.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)

  It is free software: you can redistribute it and/or modify it
  under the terms of the GNU General Public License as published by the
  Free Software Foundation, either version 3 of the License, or (at your
  option) any later version.

  It is distributed in the hope that it will be useful, but WITHOUT
  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
  for more details.

  You should have received a copy of the GNU General Public License
  along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/

// Package rtmp provides a Session for sending flv tags over rtmp. It links
// against librtmp through cgo.
package rtmp

/*
#cgo CFLAGS: -I/usr/local/include/librtmp
#cgo LDFLAGS: -L/usr/local/lib -lrtmp
#include
#include
#include
#include
#include
#include
#include
typedef enum { RTMPT_OPEN=0, RTMPT_SEND, RTMPT_IDLE, RTMPT_CLOSE } RTMPTCmd;
typedef struct sockaddr_in sockaddr_in;
typedef struct sockaddr sockaddr;
int add_addr_info(struct sockaddr_in *service, AVal *host, int port);
RTMP* start_session(RTMP* rtmp, char* url, uint connect_timeout);
int write_frame(RTMP* rtmp, char* data, uint data_length);
int end_session(RTMP* rtmp);
void AV_queue(RTMP_METHOD **vals, int *num, AVal *av, int txn);
int writeN(RTMP *r, const char *buffer, int n);
int EncodeInt32LE(char *output, int nVal);
int HTTP_Post(RTMP *r, RTMPTCmd cmd, const char *buf, int len);
*/
import "C"

import (
	"errors"
	"fmt"
	"log"
	"math"
	"math/rand"
	"reflect"
	"strconv"
	"unsafe"

	"github.com/chamaken/cgolmnl/inet"
)

// RTMPT command values. They carry the same names and order as the RTMPTCmd
// enum declared in the cgo preamble.
const (
	RTMPT_OPEN = iota
	RTMPT_SEND
	RTMPT_IDLE
	RTMPT_CLOSE
)

// AMF data type markers, numbered consecutively from zero.
const (
	AMF_NUMBER = iota
	AMF_BOOLEAN
	AMF_STRING
	AMF_OBJECT
	AMF_MOVIECLIP /* reserved, not used */
	AMF_NULL
	AMF_UNDEFINED
	AMF_REFERENCE
	AMF_ECMA_ARRAY
	AMF_OBJECT_END
	AMF_STRICT_ARRAY
	AMF_DATE
	AMF_LONG_STRING
	AMF_UNSUPPORTED
	AMF_RECORDSET /* reserved, not used */
	AMF_XML_DOC
	AMF_TYPED_OBJECT
	AMF_AVMPLUS /* switch to AMF3 */
)

// Protocol constants: packet types, header sizes, read flags and status
// codes, link flags, feature bits and the protocol values derived from them,
// and assorted buffer and handshake sizes.
const (
	RTMP_PACKET_TYPE_CHUNK_SIZE         = 0x01
	RTMP_PACKET_TYPE_BYTES_READ_REPORT  = 0x03
	RTMP_PACKET_TYPE_CONTROL            = 0x04
	RTMP_PACKET_TYPE_SERVER_BW          = 0x05
	RTMP_PACKET_TYPE_CLIENT_BW          = 0x06
	RTMP_PACKET_TYPE_AUDIO              = 0x08
	RTMP_PACKET_TYPE_VIDEO              = 0x09
	RTMP_PACKET_TYPE_FLEX_STREAM_SEND   = 0x0F
	RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10
	RTMP_PACKET_TYPE_FLEX_MESSAGE       = 0x11
	RTMP_PACKET_TYPE_INFO               = 0x12
	RTMP_PACKET_TYPE_SHARED_OBJECT      = 0x13
	RTMP_PACKET_TYPE_INVOKE             = 0x14
	RTMP_PACKET_TYPE_FLASH_VIDEO        = 0x16

	RTMP_MAX_HEADER_SIZE = 18

	// Header type values; they index the packetSize table below.
	RTMP_PACKET_SIZE_LARGE   = 0
	RTMP_PACKET_SIZE_MEDIUM  = 1
	RTMP_PACKET_SIZE_SMALL   = 2
	RTMP_PACKET_SIZE_MINIMUM = 3

	RTMP_READ_HEADER    = 0x01
	RTMP_READ_RESUME    = 0x02
	RTMP_READ_NO_IGNORE = 0x04
	RTMP_READ_GOTKF     = 0x08
	RTMP_READ_GOTFLVK   = 0x10
	RTMP_READ_SEEKING   = 0x20
	RTMP_READ_COMPLETE  = -3
	RTMP_READ_ERROR     = -2
	RTMP_READ_EOF       = -1
	RTMP_READ_IGNORE    = 0

	RTMP_LF_AUTH = 0x0001 /* using auth param */
	RTMP_LF_LIVE = 0x0002 /* stream is live */
	RTMP_LF_SWFV = 0x0004 /* do SWF verification */
	RTMP_LF_PLST = 0x0008 /* send playlist before play */
	RTMP_LF_BUFX = 0x0010 /* toggle stream on BufferEmpty msg */
	RTMP_LF_FTCU = 0x0020 /* free tcUrl on close */
	RTMP_LF_FAPU = 0x0040 /* free app on close */

	RTMP_FEATURE_HTTP  = 0x01
	RTMP_FEATURE_ENC   = 0x02
	RTMP_FEATURE_SSL   = 0x04
	RTMP_FEATURE_MFP   = 0x08 /* not yet supported */
	RTMP_FEATURE_WRITE = 0x10 /* publish, not play */
	RTMP_FEATURE_HTTP2 = 0x20 /* server-side rtmpt */

	RTMP_PROTOCOL_UNDEFINED = -1
	RTMP_PROTOCOL_RTMP      = 0
	RTMP_PROTOCOL_RTMPE     = RTMP_FEATURE_ENC
	RTMP_PROTOCOL_RTMPT     = RTMP_FEATURE_HTTP
	RTMP_PROTOCOL_RTMPS     = RTMP_FEATURE_SSL
	RTMP_PROTOCOL_RTMPTE    = (RTMP_FEATURE_HTTP | RTMP_FEATURE_ENC)
	RTMP_PROTOCOL_RTMPTS    = (RTMP_FEATURE_HTTP | RTMP_FEATURE_SSL)
	RTMP_PROTOCOL_RTMFP     = RTMP_FEATURE_MFP

	RTMP_DEFAULT_CHUNKSIZE  = 128
	RTMP_BUFFER_CACHE_SIZE  = (16 * 1024)
	RTMP_CHANNELS           = 65600
	RTMP_SWF_HASHLEN        = 32
	RTMP_SIG_SIZE           = 1536
	RTMP_LARGE_HEADER_SIZE  = 12

	AMF_INVALID = 0xff
)

const (
	// minDataSize is the smallest buffer rtmpWrite accepts when starting a
	// new packet.
	minDataSize = 11
	// debugMode enables the extra log output guarded by it in this file.
	debugMode = false
)

// Sizes, in bytes, of the fixed-width values named.
const (
	byteSize  = 1
	int32Size = 4
	int64Size = 8
)

// av_setDataFrame is a static const global in rtmp.c
// The values below are the names encoded into outgoing packets by
// sendConnectPacket and rtmpWrite.
var (
	setDataFrame      = AVC("@setDataFrame")
	av_connect        = AVC("connect")
	av_app            = AVC("app")
	av_type           = AVC("type")
	av_nonprivate     = AVC("nonprivate")
	av_flashVer       = AVC("flashVer")
	av_swfUrl         = AVC("swfUrl")
	av_tcUrl          = AVC("tcUrl")
	av_fpad           = AVC("fpad")
	av_capabilities   = AVC("capabilities")
	av_audioCodecs    = AVC("audioCodecs")
	av_videoCodecs    = AVC("videoCodecs")
	av_videoFunction  = AVC("videoFunction")
	av_pageUrl        = AVC("pageUrl")
	av_objectEncoding = AVC("objectEncoding")
)

// packetSize holds the header size for each header type, indexed by the
// RTMP_PACKET_SIZE_* values (LARGE, MEDIUM, SMALL, MINIMUM).
var packetSize = [...]int{12, 8, 4, 1}

// RTMPProtocolStringsLower maps a protocol value (a combination of the
// RTMP_FEATURE_HTTP, _ENC, _SSL and _MFP bits) to its lower-case URL scheme.
// Indices 6 and 7 have no scheme and are left empty.
var RTMPProtocolStringsLower = [...]string{
	"rtmp",
	"rtmpt",
	"rtmpe",
	"rtmpte",
	"rtmps",
	"rtmpts",
	"",
	"",
	"rtmfp",
}

// Session provides an interface for sending flv tags over rtmp.
type Session interface {
	Open() error
	Write([]byte) (int, error)
	Close() error
}

// session provides parameters required for an rtmp communication session.
type session struct {
	rtmp    *C.RTMP // nil until Open succeeds, and again after Close
	url     string
	timeout uint
}

// RTMP is a Go-side declaration of the connection state.
// NOTE(review): the functions visible in this file operate on C.RTMP rather
// than on this type; presumably it is a counterpart of librtmp's RTMP struct
// kept for the ongoing port — confirm it is still needed.
type RTMP struct {
	m_inChunkSize          int
	m_outChunkSize         int
	m_nBWCheckCounter      int
	m_nBytesIn             int
	m_nBytesInSent         int
	m_nBufferMS            int
	m_stream_id            int
	m_mediaChannel         int
	m_mediaStamp           uint32
	m_pauseStamp           uint32
	m_pausing              int
	m_nServerBw            int
	m_nClientBw            int
	m_nClientBw2           uint8
	m_bPlaying             uint8
	m_bSendEncoding        uint8
	m_bSendCounter         uint8
	m_numInvokes           int
	m_numCalls             int
	m_methodCalls          *RTMP_METHOD
	m_channelsAllocatedIn  int
	m_channelsAllocatedOut int
	m_vecChannelsIn        **RTMPPacket
	m_vecChannelsOut       **RTMPPacket
	m_channelTimestamp     *int
	m_fAudioCodecs         float64
	m_fVideoCodecs         float64
	m_fEncoding            float64
	m_fDuration            float64
	m_msgCounter           int
	m_polling              int
	m_resplen              int
	m_unackd               int
	m_clientID             AVal
	m_read                 RTMP_READ
	m_write                RTMPPacket
	m_sb                   RTMPSockBuf
	Link                   RTMP_LNK
}

// RTMPPacket describes a single rtmp packet: header type, packet type,
// channel, timestamp, body size and a pointer to the body bytes.
type RTMPPacket struct {
	m_headerType      uint8
	m_packetType      uint8
	m_hasAbsTimestamp uint8
	m_nChannel        int
	m_nTimeStamp      uint32
	m_nInfoField2     int32
	m_nBodySize       uint32
	m_nBytesRead      uint32
	m_chunk           *RTMPChunk
	m_body            *byte
}

// RTMP_METHOD pairs a method name with a number.
type RTMP_METHOD struct {
	name AVal
	num  int
}

// AVal is a pointer-and-length string value.
type AVal struct {
	av_val *byte
	av_len int
}

// RTMP_READ holds read-side buffer pointers, flags and counters.
type RTMP_READ struct {
	buf                     *byte
	bufpos                  *byte
	buflen                  uint
	timestamp               uint32
	dataType                uint8
	flags                   uint8
	status                  int8
	initialFrameType        uint8
	nResumeTS               uint32
	metaHeader              *byte
	initialFrame            *byte
	nMetaHeaderSize         uint32
	nInitialFrameSize       uint32
	nIgnoredFrameCounter    uint32
	nIgnoredFlvFrameCounter uint32
}

// RTMPSockBuf is a socket together with a fixed-size receive buffer.
type RTMPSockBuf struct {
	sb_socket   int
	sb_size     int
	sb_start    *byte
	sb_buf      [RTMP_BUFFER_CACHE_SIZE]byte // port const
	sb_timedout int
	sb_ssl      uintptr
}

// RTMPChunk holds a chunk pointer and its header bytes.
type RTMPChunk struct {
	c_headerSize int
	c_chunkSize  int
	c_chunk      *byte
	c_header     [RTMP_MAX_HEADER_SIZE]byte
}

// RTMP_LNK holds the link parameters of a connection: host names, paths,
// URLs, credentials, flags, timeouts and ports.
type RTMP_LNK struct {
	hostname      AVal
	sockshost     AVal
	playpath0     AVal
	playpath      AVal
	tcUrl         AVal
	swfUrl        AVal
	pageUrl       AVal
	app           AVal
	auth          AVal
	flashVer      AVal
	subscribepath AVal
	usherToken    AVal
	token         AVal
	pubUser       AVal
	pubPasswd     AVal
	extras        AMFObject
	edepth        int
	seekTime      int
	stopTime      int
	lFlags        int
	swfAge        int
	protocol      int
	timeout       int
	pFlags        int
	socksport     uint16
	port          uint16
}

// AMFObject is a counted array of AMF object properties.
type AMFObject struct {
	o_num   int
	o_props *C.AMFObjectProperty
}

// Compile-time check that *session implements Session.
var _ Session = (*session)(nil)

// NewSession returns a new session.
func NewSession(url string, connectTimeout uint) Session { return &session{ url: url, timeout: connectTimeout, } } // Open establishes an rtmp connection with the url passed into the // constructor func (s *session) Open() error { if s.rtmp != nil { return errors.New("rtmp: attempt to start already running session") } var err error s.rtmp, err = startSession(s.rtmp, s.url, uint32(s.timeout)) if s.rtmp == nil { return err } return nil } // Close terminates the rtmp connection func (s *session) Close() error { if s.rtmp == nil { return Err(3) } ret := endSession(s.rtmp) s.rtmp = nil if ret != 0 { return Err(ret) } return nil } // Write writes a frame (flv tag) to the rtmp connection func (s *session) Write(data []byte) (int, error) { if s.rtmp == nil { return 0, Err(3) } // if rtmpIsConnected(s.rtmp) == 0 { if C.RTMP_IsConnected(s.rtmp) == 0 { return 0, Err(1) } // if rtmpWrite(s.rtmp, data) == 0 { if C.RTMP_Write(s.rtmp, (*C.char)(unsafe.Pointer(&data[0])), C.int(len(data))) == 0 { return 0, Err(2) } return len(data), nil } func rtmpIsConnected(r *C.RTMP) int { if r.m_sb.sb_socket != -1 { return 1 } return 0 } func startSession(rtmp *C.RTMP, u string, timeout uint32) (*C.RTMP, error) { connect_timeout := C.int(timeout) rtmp = rtmpAlloc() //rtmp = C.RTMP_Alloc() rtmpInit(rtmp) //C.RTMP_Init(rtmp) rtmp.Link.timeout = connect_timeout if rtmpSetupUrl(rtmp, u) == 0 { // if C.RTMP_SetupURL(rtmp, C.CString(u)) == 0 { //C.RTMP_Close(rtmp) //C.RTMP_Free(rtmp) return nil, errors.New("rtmp startSession: Failed to setup URL!") } rtmpEnableWrite(rtmp) //C.RTMP_EnableWrite(rtmp) rtmpSetBufferMS(rtmp, 3600*1000) //C.RTMP_SetBufferMS(rtmp, 3600*1000) if rtmpConnect(rtmp, nil) == 0 { //if C.RTMP_Connect(rtmp, nil) == 0 { //C.RTMP_Close(rtmp) //C.RTMP_Free(rtmp) return nil, errors.New("rtmp startSession: Failed to connect!") } // TODO: port this // if rtmpConnectStream(rtmp, 0) == 0 { if C.RTMP_ConnectStream(rtmp, 0) == 0 { //C.RTMP_Close(rtmp) //C.RTMP_Free(rtmp) return nil, 
errors.New("rtmp startSession: Failed to connect stream!") } return rtmp, nil } func rtmpAlloc() *C.RTMP { var r C.RTMP //return (*C.RTMP)(C.malloc(C.size_t(unsafe.Sizeof(r)))) return (*C.RTMP)(allocate(unsafe.Sizeof(r))) } func rtmpInit(r *C.RTMP) { r.m_sb.sb_socket = -1 r.m_inChunkSize = RTMP_DEFAULT_CHUNKSIZE r.m_outChunkSize = RTMP_DEFAULT_CHUNKSIZE r.m_nBufferMS = 30000 r.m_nClientBW = 2500000 r.m_nClientBW2 = 2 r.m_nServerBW = 2500000 r.m_fAudioCodecs = 3191.0 r.m_fVideoCodecs = 252.0 r.Link.timeout = 30 r.Link.swfAge = 30 } func rtmpSetupUrl(r *C.RTMP, u string) int32 { url := goStrToCStr(u) var ret, len int32 var port uint32 port = 0 len = strlen(url) // TODO: port this ret = int32(C.RTMP_ParseURL((*C.char)(unsafe.Pointer(url)), &r.Link.protocol, &r.Link.hostname, (*C.uint)(&port), &r.Link.playpath0, &r.Link.app)) if ret == 0 { return ret } r.Link.port = C.ushort(port) r.Link.playpath = r.Link.playpath0 if r.Link.tcUrl.av_len == 0 { r.Link.tcUrl.av_val = (*C.char)(unsafe.Pointer(url)) if r.Link.app.av_len != 0 { if int(uintptr(unsafe.Pointer(r.Link.app.av_val))) < int(uintptr(incBytePtr(unsafe.Pointer(url), int(len)))) { r.Link.tcUrl.av_len = C.int(int(r.Link.app.av_len) + int(uintptr(decBytePtr(unsafe.Pointer(r.Link.app.av_val), int(uintptr(unsafe.Pointer(url))))))) } else { len = int32(r.Link.hostname.av_len) + int32(r.Link.app.av_len) + int32(unsafe.Sizeof("rtmpte://:65535/")) r.Link.tcUrl.av_val = (*C.char)(allocate(uintptr(len))) hostname := string(ptrToSlice(unsafe.Pointer(r.Link.hostname.av_val), int(r.Link.hostname.av_len))) app := string(ptrToSlice(unsafe.Pointer(r.Link.app.av_val), int(r.Link.app.av_len))) fString := fmt.Sprintf("%v://%v:%v/%v", RTMPProtocolStringsLower[r.Link.protocol], hostname, r.Link.port, app) r.Link.tcUrl.av_val = (*C.char)(bToUP(goStrToCStr(fString))) r.Link.tcUrl.av_len = C.int(strLen(RTMPProtocolStringsLower[r.Link.protocol]) + strLen(string("://")) + strLen(hostname) + strLen(string(":")) + 
strLen(strconv.Itoa(int(r.Link.port))) + strLen(string("/")) + strLen(app)) r.Link.lFlags |= RTMP_LF_FTCU } } else { r.Link.tcUrl.av_len = C.int(strlen(url)) } } socksSetup(r, &r.Link.sockshost) if r.Link.port == 0 { switch { case (r.Link.protocol & RTMP_FEATURE_SSL) != 0: r.Link.port = 433 case (r.Link.protocol & RTMP_FEATURE_HTTP) != 0: r.Link.port = 80 default: r.Link.port = 1935 } } return 1 } func socksSetup(r *C.RTMP, sockshost *C.AVal) { if sockshost.av_len != 0 { socksport := strchr((*byte)(unsafe.Pointer(sockshost.av_val)), ':') hostname := strdup((*byte)(unsafe.Pointer(sockshost.av_val))) if uintptr(unsafe.Pointer(socksport)) != 0 { *indxBytePtr(unsafe.Pointer(hostname), int(uintptr(decBytePtr(unsafe.Pointer(socksport), int(uintptr(unsafe.Pointer(sockshost.av_val))))))) = '\000' r.Link.sockshost.av_val = (*C.char)(unsafe.Pointer(hostname)) r.Link.sockshost.av_len = C.int(strlen(hostname)) value, err := strconv.Atoi(string(ptrToSlice(unsafe.Pointer(uintptr( unsafe.Pointer(socksport))+uintptr(1)), int(strlen((*byte)(unsafe.Pointer( uintptr(unsafe.Pointer(socksport))+uintptr(1)))))+1))) if err != nil { log.Println("socksSetup: bad string conversion!") } if uintptr(unsafe.Pointer(socksport)) == 0 { value = 1080 } r.Link.socksport = C.ushort(value) } } } /* func rtmpClose(r *C.RTMP) { closeInternal(r, 0) } func closeInternal(r *C.RTMP, reconnect int32) { var i int32 if rtmpIsConnected(r) != 0 { if r.m_stream_id > 0 { i = int32(r.m_stream_id) if r.Link.protocol&RTMP_FEATURE_WRITE != 0 { C.SendFCUnpublish(r) } C.SendDeleteStream(r, C.double(i)) } C.RTMPSockBuf_Close(&r.m_sb) } r.m_stream_id = -1 r.m_sb.sb_socket = -1 r.m_nBWCheckCounter = 0 r.m_nBytesIn = 0 r.m_nBytesInSent = 0 if r.m_read.flags&RTMP_READ_HEADER != 0 { C.free(unsafe.Pointer(r.m_read.buf)) r.m_read.buf = nil } r.m_read.dataType = 0 r.m_read.flags = 0 r.m_read.status = 0 r.m_read.nResumeTS = 0 r.m_read.nIgnoredFrameCounter = 0 r.m_read.nIgnoredFlvFrameCounter = 0 r.m_write.m_nBytesRead = 0 
C.RTMPPacket_Free(&r.m_write) for i := 0; i < int(r.m_channelsAllocatedIn); i++ { if *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsIn), i, int(unsafe.Sizeof(&r.m_write)))) != nil { C.RTMPPacket_Free(*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsIn), i, int(unsafe.Sizeof(&r.m_write))))) C.free(unsafe.Pointer(*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsIn), i, int(unsafe.Sizeof(&r.m_write)))))) *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsIn), i, int(unsafe.Sizeof(&r.m_write)))) = nil } } C.free(unsafe.Pointer(r.m_vecChannelsOut)) r.m_vecChannelsOut = nil r.m_channelsAllocatedOut = 0 C.AV_clear(r.m_methodCalls, r.m_numCalls) r.m_methodCalls = nil r.m_numCalls = 0 r.m_numInvokes = 0 r.m_bPlaying = C.uchar(0) r.m_sb.sb_size = 0 r.m_msgCounter = 0 r.m_resplen = 0 r.m_unackd = 0 if ((r.Link.lFlags & RTMP_LF_FTCU) != 0) && (reconnect == 0) { C.free(unsafe.Pointer(r.Link.app.av_val)) r.Link.app.av_val = nil r.Link.lFlags ^= RTMP_LF_FAPU } if reconnect == 0 { C.free(unsafe.Pointer(r.Link.playpath0.av_val)) r.Link.playpath0.av_val = nil } } */ func rtmpEnableWrite(r *C.RTMP) { r.Link.protocol |= RTMP_FEATURE_WRITE } func rtmpSetBufferMS(r *C.RTMP, size int32) { r.m_nBufferMS = C.int(size) } func rtmpConnect(r *C.RTMP, cp *C.RTMPPacket) int { // TODO: port this var service C.sockaddr_in if r.Link.hostname.av_len == 0 { return 0 } var tmp C.sockaddr_in memset((*byte)(unsafe.Pointer(&service)), 0, int(unsafe.Sizeof(tmp))) // TODO: port this service.sin_family = C.AF_INET if r.Link.socksport != 0 { // TODO: port this if C.add_addr_info(&service, &r.Link.sockshost, C.int(r.Link.socksport)) == 0 { return 0 } } else { // connect directly if C.add_addr_info(&service, (*C.AVal)(unsafe.Pointer(&r.Link.hostname)), C.int(r.Link.port)) == 0 { return 0 } } //if C.RTMP_Connect0(r, (*C.sockaddr)(unsafe.Pointer(&service))) == 0 { if rtmpConnect0(r, (*C.sockaddr)(unsafe.Pointer(&service))) == 0 { return 0 } r.m_bSendCounter = 1 return int(rtmpConnect1(r, 
cp)) //return int(C.RTMP_Connect1(r, cp)) } func rtmpConnect0(r *C.RTMP, service *C.sockaddr) int { on := 1 r.m_sb.sb_timedout = 0 r.m_pausing = 0 r.m_fDuration = 0.0 r.m_sb.sb_socket = C.socket(C.AF_INET, C.SOCK_STREAM, C.IPPROTO_TCP) if r.m_sb.sb_socket != -1 { if C.connect(r.m_sb.sb_socket, service, C.socklen_t(unsafe.Sizeof(*service))) < 0 { log.Println("rtmpConnect0, failed to connect socket.") } if r.Link.socksport != 0 { if debugMode { log.Println("rtmpConnect0: socks negotiation.") } if C.SocksNegotiate(r) == 0 { log.Println("rtmpConnect0: SOCKS negotiation failed.") return 0 } } } else { log.Println("rtmpConnect0: failed to create socket.") return 0 } { tv := C.int(r.Link.timeout * 1000) if C.setsockopt(r.m_sb.sb_socket, C.SOL_SOCKET, C.SO_RCVTIMEO, unsafe.Pointer(&tv), C.socklen_t(unsafe.Sizeof(tv))) != 0 { log.Println("rtmpConnect0: Setting socket timeout failed") } } C.setsockopt(r.m_sb.sb_socket, C.IPPROTO_TCP, C.TCP_NODELAY, unsafe.Pointer(&on), C.socklen_t(unsafe.Sizeof(on))) return 1 } func rtmpConnect1(r *C.RTMP, cp *C.RTMPPacket) int { if debugMode { log.Println("... connected, handshaking...") } //if C.HandShake(r, 1) == 0 { if handShake(r, 1) == 0 { log.Println("rtmpConnect1: handshake failed!") return 0 } if debugMode { log.Println("... 
handshaked...") } //if C.SendConnectPacket(r, cp) == 0 { if sendConnectPacket(r, cp) == 0 { log.Println("RTMP connect failed!") return 0 } return 1 } func handShake(r *C.RTMP, FP9HandShake int32) int { var bMatch int //uptime := uint32(0) //suptime := uint32(0) //typ := byte(0) var uptime, suptime uint32 var typ byte //clientbuf := make([]byte, RTMP_SIG_SIZE+1) var clientbuf [RTMP_SIG_SIZE + 1]byte clientsig := (*byte)(incBytePtr(unsafe.Pointer(&clientbuf[0]), 1)) //serversig := make([]byte, RTMP_SIG_SIZE) var serversig [RTMP_SIG_SIZE]byte clientbuf[0] = 0x03 // not encrypted // TODO: port rtmp_getTime uptime = inet.Htonl(uint32(C.RTMP_GetTime())) memmove(unsafe.Pointer(clientsig), unsafe.Pointer(&uptime), 4) memset(indxBytePtr(unsafe.Pointer(clientsig), 4), 0, 4) for i := 8; i < RTMP_SIG_SIZE; i++ { *indxBytePtr(unsafe.Pointer(clientsig), i) = byte(rand.Intn(256)) } if writeN(r, unsafe.Pointer(&clientbuf[0]), RTMP_SIG_SIZE+1) == 0 { return 0 } if C.ReadN(r, (*C.char)(unsafe.Pointer(&typ)), 1) != 1 { //if readN(r, (*byte)(unsafe.Pointer(&typ)), 1) != 1 { return 0 } if debugMode { log.Println("handShake: Type answer: %v", typ) } if typ != clientbuf[0] { log.Println("handShake: type mismatch: client sent %v, server sent: %v", clientbuf[0], typ) } if readN(r, (*byte)(unsafe.Pointer(&serversig[0])), RTMP_SIG_SIZE) != RTMP_SIG_SIZE { //if C.ReadN(r, (*C.char)(unsafe.Pointer(&serversig[0])), RTMP_SIG_SIZE) != RTMP_SIG_SIZE { return 0 } // decode server response memmove(unsafe.Pointer(&suptime), unsafe.Pointer(&serversig[0]), 4) suptime = inet.Ntohl(suptime) // 2nd part of handshake if writeN(r, unsafe.Pointer(&serversig[0]), RTMP_SIG_SIZE) == 0 { return 0 } //if readN(r, (*byte)(unsafe.Pointer(&serversig[0])), RTMP_SIG_SIZE) != RTMP_SIG_SIZE { if C.ReadN(r, (*C.char)(unsafe.Pointer(&serversig[0])), RTMP_SIG_SIZE) != RTMP_SIG_SIZE { return 0 } // TODO: find golang memcmp bMatch = 0 if memcmp(unsafe.Pointer(&serversig[0]), unsafe.Pointer(clientsig), RTMP_SIG_SIZE) == 0 { 
bMatch = 1 } if bMatch == 0 { log.Println("Client signature does not match!") } return 1 } func readN(r *C.RTMP, buffer *byte, n int) int { nOriginalSize := n var avail int var ptr *byte r.m_sb.sb_timedout = 0 ptr = buffer for n > 0 { nBytes := 0 var nRead int avail = int(r.m_sb.sb_size) if avail == 0 { if rtmpSockBufFill(&r.m_sb) < 1 { // if C.RTMPSockBuf_Fill(&r.m_sb) < 1 { if r.m_sb.sb_timedout == 0 { return 0 } } avail = int(r.m_sb.sb_size) } if n < avail { nRead = n } else { nRead = avail } if nRead > 0 { memmove(unsafe.Pointer(ptr), unsafe.Pointer(r.m_sb.sb_start), uintptr(nRead)) r.m_sb.sb_start = (*C.char)(incBytePtr(unsafe.Pointer(r.m_sb.sb_start), nRead)) r.m_sb.sb_size -= C.int(nRead) nBytes = nRead r.m_nBytesIn += C.int(nRead) if r.m_bSendCounter != 0 && r.m_nBytesIn > (r.m_nBytesInSent+ r.m_nClientBW/10) { //if C.SendBytesReceived(r) == 0 { if sendBytesReceived(r) == 0 { return 0 } } } if nBytes == 0 { log.Println("RTMP socket closed by peer") // RTMP_Close(r) break } n -= nBytes ptr = (*byte)(incBytePtr(unsafe.Pointer(ptr), nBytes)) } return nOriginalSize - n } func rtmpSockBufFill(sb *C.RTMPSockBuf) int { var nBytes int if sb.sb_size == 0 { sb.sb_start = &sb.sb_buf[0] } for { nBytes = int(unsafe.Sizeof(sb.sb_buf)) - 1 - int(sb.sb_size) - int(uintptr(unsafe.Pointer(sb.sb_start))-uintptr(unsafe.Pointer( &sb.sb_buf[0]))) // TODO: figure out what to do with recv nBytes = int(C.recv(sb.sb_socket, unsafe.Pointer(uintptr(unsafe.Pointer( sb.sb_start))+uintptr(int(sb.sb_size))), C.size_t(nBytes), 0)) if nBytes != -1 { sb.sb_size += C.int(nBytes) } else { log.Println("rtmpSockBufFill: recv error!") } break } return nBytes } func sendBytesReceived(r *C.RTMP) int { var packet C.RTMPPacket var pbuf [256]byte pend := (*byte)(incBytePtr(unsafe.Pointer(&pbuf[0]), 256)) packet.m_nChannel = 0x02 /* control channel (invoke) */ packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM packet.m_packetType = RTMP_PACKET_TYPE_BYTES_READ_REPORT packet.m_nTimeStamp = 0 
packet.m_nInfoField2 = 0 packet.m_hasAbsTimestamp = 0 packet.m_body = (*C.char)(incBytePtr(unsafe.Pointer(&pbuf[0]), RTMP_MAX_HEADER_SIZE)) packet.m_nBodySize = 4 amfEncodeInt32((*byte)(unsafe.Pointer(packet.m_body)), pend, int32(r.m_nBytesIn)) // C.AMF_EncodeInt32(packet.m_body, (*C.char)(unsafe.Pointer(pend)), r.m_nBytesIn) r.m_nBytesInSent = r.m_nBytesIn return int(C.RTMP_SendPacket(r, &packet, 0)) } func sendConnectPacket(r *C.RTMP, cp *C.RTMPPacket) int { var packet C.RTMPPacket var pbuf [4096]byte pend := (*byte)(unsafe.Pointer(incBytePtr(unsafe.Pointer(&pbuf[0]), int(unsafe.Sizeof(pbuf))))) var enc *byte if cp != nil { return rtmpSendPacket(r, cp, 1) //return int(C.RTMP_SendPacket(r, cp, 1)) } packet.m_nChannel = 0x03 packet.m_headerType = RTMP_PACKET_SIZE_LARGE packet.m_packetType = RTMP_PACKET_TYPE_INVOKE packet.m_nTimeStamp = 0 packet.m_nInfoField2 = 0 packet.m_hasAbsTimestamp = 0 packet.m_body = (*C.char)(incBytePtr(unsafe.Pointer(&pbuf[0]), RTMP_MAX_HEADER_SIZE)) enc = (*byte)(unsafe.Pointer(packet.m_body)) //enc = (*byte)(unsafe.Pointer(C.AMF_EncodeString((*C.char)(unsafe.Pointer(enc)), //(*C.char)(unsafe.Pointer(pend)), &av_connect))) enc = amfEncodeString(enc, pend, &av_connect) r.m_numInvokes += 1 //enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNumber((*C.char)(unsafe.Pointer(enc)), //(*C.char)(unsafe.Pointer(pend)), C.double(r.m_numInvokes)))) enc = amfEncodeNumber(enc, pend, float64(r.m_numInvokes)) *indxBytePtr(unsafe.Pointer(enc), 0) = AMF_OBJECT enc = (*byte)(unsafe.Pointer(incBytePtr(unsafe.Pointer(enc), 1))) //enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedString((*C.char)( //unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_app, &r.Link.app))) enc = amfEncodeNamedString(enc, pend, &av_app, &r.Link.app) if enc == nil { return 0 } if r.Link.protocol&RTMP_FEATURE_WRITE != 0 { //enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedString((*C.char)( //unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_type, &av_nonprivate))) enc = 
amfEncodeNamedString(enc, pend, &av_type, &av_nonprivate) if enc == nil { return 0 } } if r.Link.flashVer.av_len != 0 { //enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedString((*C.char)( //unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_flashVer, &r.Link.flashVer))) enc = amfEncodeNamedString(enc, pend, &av_flashVer, &r.Link.flashVer) if enc == nil { return 0 } } if r.Link.swfUrl.av_len != 0 { //enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedString((*C.char)( // unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_swfUrl, &r.Link.swfUrl))) enc = amfEncodeNamedString(enc, pend, &av_swfUrl, &r.Link.swfUrl) if enc == nil { return 0 } } if r.Link.tcUrl.av_len != 0 { //enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedString((*C.char)( //unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_tcUrl, &r.Link.tcUrl))) enc = amfEncodeNamedString(enc, pend, &av_tcUrl, &r.Link.tcUrl) if enc == nil { return 0 } } if r.Link.protocol&RTMP_FEATURE_WRITE == 0 { //enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedBoolean((*C.char)( //unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_fpad, 0))) enc = amfEncodeNamedBoolean(enc, pend, &av_fpad, 0) if enc == nil { return 0 } //enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedNumber((*C.char)( //unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_capabilities, 15.0))) enc = amfEncodeNamedNumber(enc, pend, &av_capabilities, 15.0) if enc == nil { return 0 } //enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedNumber((*C.char)( // unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_audioCodecs, r.m_fAudioCodecs))) enc = amfEncodeNamedNumber(enc, pend, &av_audioCodecs, float64(r.m_fAudioCodecs)) if enc == nil { return 0 } //enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedNumber((*C.char)( //unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_videoCodecs, r.m_fVideoCodecs))) enc = amfEncodeNamedNumber(enc, pend, &av_videoCodecs, float64(r.m_fVideoCodecs)) if enc == nil { return 0 } //enc = 
(*byte)(unsafe.Pointer(C.AMF_EncodeNamedNumber((*C.char)( // unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_videoFunction, 1.0))) enc = amfEncodeNamedNumber(enc, pend, &av_videoFunction, 1.0) if enc == nil { return 0 } if r.Link.pageUrl.av_len != 0 { //enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedString((*C.char)( //unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_pageUrl, &r.Link.pageUrl))) enc = amfEncodeNamedString(enc, pend, &av_pageUrl, &r.Link.pageUrl) if enc == nil { return 0 } } } if r.m_fEncoding != 0.0 || r.m_bSendEncoding != 0 { //enc = (*byte)(unsafe.Pointer(C.AMF_EncodeNamedNumber((*C.char)( //unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), &av_objectEncoding, r.m_fEncoding))) enc = amfEncodeNamedNumber(enc, pend, &av_objectEncoding, float64(r.m_fEncoding)) if enc == nil { return 0 } } if int(uintptr(incBytePtr(unsafe.Pointer(enc), 3))) >= int(uintptr( unsafe.Pointer(pend))) { return 0 } *indxBytePtr(unsafe.Pointer(enc), 0) = 0 enc = (*byte)(incBytePtr(unsafe.Pointer(enc), 1)) *indxBytePtr(unsafe.Pointer(enc), 0) = 0 enc = (*byte)(incBytePtr(unsafe.Pointer(enc), 1)) *indxBytePtr(unsafe.Pointer(enc), 0) = C.AMF_OBJECT_END enc = (*byte)(incBytePtr(unsafe.Pointer(enc), 1)) /* add auth string */ if r.Link.auth.av_len != 0 { //enc = (*byte)(unsafe.Pointer(C.AMF_EncodeBoolean((*C.char)( // unsafe.Pointer(enc)), (*C.char)(unsafe.Pointer(pend)), r.Link.lFlags&RTMP_LF_AUTH))) enc = amfEncodeBoolean(enc, pend, int(r.Link.lFlags&RTMP_LF_AUTH)) if enc == nil { return 0 } //enc = (*byte)(unsafe.Pointer(C.AMF_EncodeString((*C.char)(unsafe.Pointer(enc)), //(*C.char)(unsafe.Pointer(pend)), &r.Link.auth))) enc = amfEncodeString(enc, (*byte)(pend), &r.Link.auth) if enc == nil { return 0 } } if r.Link.extras.o_num != 0 { for i := 0; i < int(r.Link.extras.o_num); i++ { //enc = (*byte)(unsafe.Pointer(C.AMFProp_Encode((*C.AMFObjectProperty)( //incPtr(unsafe.Pointer(&r.Link.extras.o_props), int(unsafe.Sizeof( //r.Link.extras.o_props)), i)), 
(*C.char)(unsafe.Pointer(enc)), (*C.char)( //unsafe.Pointer(pend))))) enc = amfPropEncode((*C.AMFObjectProperty)(incPtr(unsafe.Pointer( &r.Link.extras.o_props), int(unsafe.Sizeof(r.Link.extras.o_props)), i)), enc, pend) if enc == nil { return 0 } } } packet.m_nBodySize = C.uint32_t(int(uintptr(decBytePtr(unsafe.Pointer(enc), int(uintptr(unsafe.Pointer(packet.m_body))))))) //return int(C.RTMP_SendPacket(r, &packet, 1)) return rtmpSendPacket(r, &packet, 1) } func amfPropEncode(p *C.AMFObjectProperty, pBuffer *byte, pBufEnd *byte) *byte { if p.p_type == AMF_INVALID { return nil } if p.p_type != AMF_NULL && int(uintptr(unsafe.Pointer(pBuffer)))+ int(p.p_name.av_len)+2+1 >= int( uintptr(unsafe.Pointer(pBufEnd))) { return nil } if p.p_type != AMF_NULL && p.p_name.av_len != 0 { *indxBytePtr(unsafe.Pointer(pBuffer), 0) = byte(p.p_name.av_len >> 8) pBuffer = (*byte)(incBytePtr(unsafe.Pointer(pBuffer), 1)) *indxBytePtr(unsafe.Pointer(pBuffer), 0) = byte(p.p_name.av_len & 0xff) pBuffer = (*byte)(incBytePtr(unsafe.Pointer(pBuffer), 1)) memmove(unsafe.Pointer(pBuffer), unsafe.Pointer(p.p_name.av_val), uintptr(p.p_name.av_len)) pBuffer = (*byte)(incBytePtr(unsafe.Pointer(pBuffer), int(p.p_name.av_len))) } switch p.p_type { case AMF_NUMBER: pBuffer = amfEncodeNumber(pBuffer, pBufEnd, float64(p.p_vu.p_number)) case AMF_BOOLEAN: val := 0 if p.p_vu.p_number != 0 { val = 1 } pBuffer = amfEncodeBoolean(pBuffer, pBufEnd, val) case AMF_STRING: pBuffer = amfEncodeString(pBuffer, pBufEnd, &p.p_vu.p_aval) case AMF_NULL: if uintptr(incBytePtr(unsafe.Pointer(pBuffer), 1)) >= uintptr(unsafe.Pointer( pBufEnd)) { return nil } *(*byte)(unsafe.Pointer(pBuffer)) = AMF_NULL pBuffer = (*byte)(incBytePtr(unsafe.Pointer(pBuffer), 1)) case AMF_OBJECT: pBuffer = (*byte)(unsafe.Pointer(C.AMF_Encode(&p.p_vu.p_object, (*C.char)( unsafe.Pointer(pBuffer)), (*C.char)(unsafe.Pointer(pBufEnd))))) case AMF_ECMA_ARRAY: pBuffer = (*byte)(unsafe.Pointer(C.AMF_EncodeEcmaArray(&p.p_vu.p_object, 
(*C.char)(unsafe.Pointer(pBuffer)), (*C.char)(unsafe.Pointer(pBufEnd)))))
	case AMF_STRICT_ARRAY:
		pBuffer = (*byte)(unsafe.Pointer(C.AMF_EncodeArray(&p.p_vu.p_object,
			(*C.char)(unsafe.Pointer(pBuffer)), (*C.char)(unsafe.Pointer(pBufEnd)))))
	default:
		log.Println("amfPropEncode: invalid type!")
		pBuffer = nil
	}
	return pBuffer
}

// amfEncode writes obj as an AMF object into the memory between pBuffer and
// pBufEnd: an AMF_OBJECT marker byte, each property encoded by amfPropEncode,
// then a 24-bit AMF_OBJECT_END value. It returns the position following the
// encoded data, or nil if there is not enough room for the marker or the
// terminator.
//
// If a property fails to encode the failure is logged and the remaining
// properties are skipped; the object terminator is still written after the
// properties that were encoded.
func amfEncode(obj *AMFObject, pBuffer *byte, pBufEnd *byte) *byte {
	if uintptr(unsafe.Pointer(pBuffer))+uintptr(4) >= uintptr(unsafe.Pointer(pBufEnd)) {
		return nil
	}

	*(*byte)(unsafe.Pointer(pBuffer)) = AMF_OBJECT
	pBuffer = (*byte)(incBytePtr(unsafe.Pointer(pBuffer), 1))

	for i := 0; i < int(obj.o_num); i++ {
		res := amfPropEncode((*C.AMFObjectProperty)(incPtr(unsafe.Pointer(
			obj.o_props), i, int(unsafe.Sizeof(*obj.o_props)))), pBuffer, pBufEnd)
		if res == nil {
			log.Println("amfEncode: failed to encode property in index")
			break
		}
		pBuffer = res
	}

	if uintptr(incBytePtr(unsafe.Pointer(pBuffer), 3)) >= uintptr(unsafe.Pointer(pBufEnd)) {
		return nil
	}
	pBuffer = amfEncodeInt24(pBuffer, pBufEnd, int32(AMF_OBJECT_END))
	return pBuffer
}

// rtmpConnectStream reads packets from r until r.m_bPlaying becomes non-zero,
// the connection drops, or a packet read fails. Complete audio, video and info
// packets received in the meantime are logged and discarded; any other complete
// packet is handed to C.RTMP_ClientPacket. If seekTime is positive it is stored
// in r.Link.seekTime first. The final value of r.m_bPlaying is returned.
func rtmpConnectStream(r *C.RTMP, seekTime int32) int {
	var packet C.RTMPPacket
	memset((*byte)(unsafe.Pointer(&packet)), 0, int(unsafe.Sizeof(packet)))

	if seekTime > 0 {
		r.Link.seekTime = C.int(seekTime)
	}
	r.m_mediaChannel = 0

	// TODO: read packet
	for r.m_bPlaying == 0 && rtmpIsConnected(r) != 0 && C.RTMP_ReadPacket(r, &packet) != 0 {
		// TODO: port is ready
		if rtmpPacketIsReady(&packet) != 0 {
			if packet.m_nBodySize == 0 {
				continue
			}

			if packet.m_packetType == RTMP_PACKET_TYPE_AUDIO ||
				packet.m_packetType == RTMP_PACKET_TYPE_VIDEO ||
				packet.m_packetType == RTMP_PACKET_TYPE_INFO {
				log.Println("rtmpConnectStream: got packet before play()! Ignoring.")
				C.RTMPPacket_Free(&packet)
				continue
			}

			// TODO: port this
			C.RTMP_ClientPacket(r, &packet)
			C.RTMPPacket_Free(&packet)
		}
	}
	return int(r.m_bPlaying)
}

// rtmpPacketIsReady returns 1 when all of the packet's body has been read
// (m_nBytesRead equals m_nBodySize) and 0 otherwise.
func rtmpPacketIsReady(p *C.RTMPPacket) int {
	if p.m_nBytesRead == p.m_nBodySize {
		return 1
	}
	return 0
}

// endSession returns 3 (the index of "rtmp: not started" in rtmpErrs) when rtmp
// is nil and 0 otherwise. The calls that would close and free the session are
// currently commented out, so no teardown is performed here.
func endSession(rtmp *C.RTMP) uint32 {
	if rtmp == nil {
		return 3
	}

	//C.RTMP_Close(rtmp)
	//C.RTMP_Free(rtmp)
	return 0
}

// rtmpWrite writes data to the current rtmp connection encapsulated by r.
//
// data is parsed as FLV: an optional "FLV" file header (13 bytes, skipped)
// followed by tags, each made of an 11-byte tag header, the tag body and a
// 4-byte trailer. Each tag body is copied into r.m_write and sent with
// rtmpSendPacket once complete. A partially filled packet is kept in r.m_write
// so that a later call can continue it.
//
// It returns 0 if data is empty, too short (see minDataSize) or the packet
// cannot be allocated, -1 if sending a packet fails, and otherwise size+s2
// where s2 is the (possibly negative) count of unconsumed bytes.
func rtmpWrite(r *C.RTMP, data []byte) int {
	// Taking the address of the first element of an empty slice would panic;
	// there is nothing to send in that case.
	if len(data) == 0 {
		return 0
	}
	buf := sliceToPtr(data)
	// TODO: port RTMPPacket
	var pkt = &r.m_write
	var pend, enc unsafe.Pointer
	size := len(data)
	s2 := size
	var ret, num int

	pkt.m_nChannel = 0x04
	pkt.m_nInfoField2 = C.int32_t(r.m_stream_id)

	for s2 != 0 {
		if pkt.m_nBytesRead == 0 {
			if size < minDataSize {
				log.Printf("size: %d\n", size)
				log.Printf("too small \n")
				return 0
			}

			// Skip the FLV file header when present.
			if *indxBytePtr(buf, 0) == 'F' && *indxBytePtr(buf, 1) == 'L' && *indxBytePtr(buf, 2) == 'V' {
				buf = unsafe.Pointer(uintptr(buf) + uintptr(13))
				s2 -= 13
			}

			// Tag header: type (1), body size (3), timestamp (3), timestamp
			// high byte (1), then 3 further bytes that are skipped.
			pkt.m_packetType = C.uint8_t(*indxBytePtr(buf, 0))
			buf = incBytePtr(buf, 1)
			pkt.m_nBodySize = C.uint32_t(amfDecodeInt24((*byte)(buf)))
			buf = incBytePtr(buf, 3)
			pkt.m_nTimeStamp = C.uint32_t(amfDecodeInt24((*byte)(buf)))
			buf = incBytePtr(buf, 3)
			pkt.m_nTimeStamp |= C.uint32_t(*indxBytePtr(buf, 0)) << 24
			buf = incBytePtr(buf, 4)
			s2 -= 11

			if ((pkt.m_packetType == RTMP_PACKET_TYPE_AUDIO ||
				pkt.m_packetType == RTMP_PACKET_TYPE_VIDEO) &&
				pkt.m_nTimeStamp == 0) || pkt.m_packetType == RTMP_PACKET_TYPE_INFO {

				pkt.m_headerType = RTMP_PACKET_SIZE_LARGE
				if pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
					// Room for the encoded setDataFrame string written below.
					pkt.m_nBodySize += 16
				}
			} else {
				pkt.m_headerType = RTMP_PACKET_SIZE_MEDIUM
			}

			// TODO: Port this
			if int(C.RTMPPacket_Alloc(pkt, pkt.m_nBodySize)) == 0 {
				log.Println("Failed to allocate packet")
				return 0
			}

			enc = unsafe.Pointer(pkt.m_body)
			pend = incBytePtr(enc, int(pkt.m_nBodySize))

			if pkt.m_packetType == RTMP_PACKET_TYPE_INFO {
				enc = unsafe.Pointer(amfEncodeString((*byte)(enc), (*byte)(pend), &setDataFrame))
				pkt.m_nBytesRead = C.uint32_t(math.Abs(float64(uintptr(enc) -
					uintptr(unsafe.Pointer(pkt.m_body)))))
			}
		} else {
			// Continue filling a packet started by an earlier call.
			enc = incBytePtr(unsafe.Pointer(pkt.m_body), int(pkt.m_nBytesRead))
		}

		num = int(pkt.m_nBodySize - pkt.m_nBytesRead)
		if num > s2 {
			num = s2
		}

		memmove(enc, buf, uintptr(num))
		pkt.m_nBytesRead += C.uint32_t(num)
		s2 -= num
		buf = incBytePtr(buf, num)

		if pkt.m_nBytesRead == pkt.m_nBodySize {
			// TODO: Port this
			ret = rtmpSendPacket(r, pkt, 0)
			// TODO: Port this
			C.RTMPPacket_Free(pkt)
			pkt.m_nBytesRead = 0
			if ret == 0 {
				return -1
			}
			// Skip the 4-byte trailer that follows each tag.
			buf = incBytePtr(buf, 4)
			s2 -= 4
			if s2 < 0 {
				break
			}
		}
	}
	return size + s2
}

// rtmpSendPacket sends packet over r, splitting the body into chunks of
// r.m_outChunkSize bytes. It returns 1 on success and 0 on failure.
//
// The chunk header is written into the bytes immediately preceding
// packet.m_body (or into a local scratch buffer when the packet has no body),
// and continuation headers are written over the bytes that precede each later
// chunk, so the memory in front of packet.m_body must be writable.
//
// The previous packet sent on the same channel is used to shorten the header
// where possible, and a copy of packet is stored in r.m_vecChannelsOut for the
// next call. When queue is non-zero and the packet is an invoke, the method
// name and transaction number are recorded with avQueue.
func rtmpSendPacket(r *C.RTMP, packet *C.RTMPPacket, queue int) int {
	var prevPacket *C.RTMPPacket
	last := 0
	var nSize, hSize, cSize, nChunkSize int
	// NOTE: tbuf is never assigned in this function, so the buffered-send
	// branches guarded by tbuf != nil are not taken.
	var header, hptr, hend, buffer, tbuf, toff unsafe.Pointer
	var goHbuf [RTMP_MAX_HEADER_SIZE]byte
	var hbuf = unsafe.Pointer(&goHbuf[0])
	var c byte
	var t int32
	var packets unsafe.Pointer

	// Grow the per-channel table of previous packets when needed. packet is a
	// pointer, so unsafe.Sizeof(packet) is the size of one table entry.
	if packet.m_nChannel >= r.m_channelsAllocatedOut {
		n := int(packet.m_nChannel + 10)
		packets = C.realloc(unsafe.Pointer(r.m_vecChannelsOut), C.size_t(
			unsafe.Sizeof(packet)*uintptr(n)))
		//packets = realloc(unsafe.Pointer(r.m_vecChannelsOut),
		//int(unsafe.Sizeof(packet)*uintptr(n)))

		if uintptr(packets) == uintptr(0) {
			C.free(unsafe.Pointer(r.m_vecChannelsOut))
			r.m_vecChannelsOut = nil
			r.m_channelsAllocatedOut = 0
			return 0
		}
		r.m_vecChannelsOut = (**C.RTMPPacket)(packets)

		// Zero the newly added entries.
		C.memset(incPtr(unsafe.Pointer(r.m_vecChannelsOut), int(r.m_channelsAllocatedOut),
			int(unsafe.Sizeof(packet))), 0, C.size_t(unsafe.Sizeof(packet)*
			uintptr(n-int(r.m_channelsAllocatedOut))))
		//memset((*byte)(incPtr(unsafe.Pointer(r.m_vecChannelsOut), int(
		//	r.m_channelsAllocatedOut), int(unsafe.Sizeof(packet)))), 0, int(
		//	unsafe.Sizeof(packet)*uintptr(n-int(r.m_channelsAllocatedOut))))
		r.m_channelsAllocatedOut = C.int(n)
	}

	prevPacket = *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
		int(packet.m_nChannel), int(unsafe.Sizeof(packet))))
	if prevPacket != nil && packet.m_headerType != RTMP_PACKET_SIZE_LARGE {
		// compress a bit by using the prev packet's attributes
		if prevPacket.m_nBodySize == packet.m_nBodySize &&
			prevPacket.m_packetType == packet.m_packetType &&
			packet.m_headerType == RTMP_PACKET_SIZE_MEDIUM {
			packet.m_headerType = RTMP_PACKET_SIZE_SMALL
		}

		if prevPacket.m_nTimeStamp == packet.m_nTimeStamp &&
			packet.m_headerType == RTMP_PACKET_SIZE_SMALL {
			packet.m_headerType = RTMP_PACKET_SIZE_MINIMUM
		}
		last = int(prevPacket.m_nTimeStamp)
	}

	if packet.m_headerType > 3 {
		log.Printf("Sanity failed! trying to send header of type: 0x%02x.",
			packet.m_headerType)
		return 0
	}

	nSize = packetSize[int(packet.m_headerType)]
	hSize = nSize
	cSize = 0
	// Timestamp relative to the previous packet on this channel (absolute
	// when there is none or the header is large).
	t = int32(int(packet.m_nTimeStamp) - last)

	if packet.m_body != nil {
		header = decBytePtr(unsafe.Pointer(packet.m_body), nSize)
		hend = unsafe.Pointer(packet.m_body)
	} else {
		header = incBytePtr(hbuf, 6)
		// TODO: be cautious about this sizeof - make sure it works how you think it
		// does. C code used sizeof(hbuf) where hbuf is a *char
		hend = incBytePtr(hbuf, RTMP_MAX_HEADER_SIZE)
	}

	// Channels above 63 need one extra byte for the channel id, above 319 two.
	switch {
	case packet.m_nChannel > 319:
		cSize = 2
	case packet.m_nChannel > 63:
		cSize = 1
	}

	if cSize != 0 {
		header = decBytePtr(header, cSize)
		hSize += cSize
	}

	// Timestamps that do not fit in 24 bits need a 4-byte extended field.
	if t >= 0xffffff {
		header = decBytePtr(header, 4)
		hSize += 4
		log.Printf("Larger timestamp than 24-bit: 0x%x", t)
	}

	hptr = header
	c = byte(packet.m_headerType) << 6

	switch cSize {
	case 0:
		c |= byte(packet.m_nChannel)
	case 1:
		// Low six bits stay 0: one channel-id byte follows.
	case 2:
		c |= byte(1)
	}
	*(*byte)(hptr) = c
	hptr = incBytePtr(hptr, 1)

	if cSize != 0 {
		tmp := packet.m_nChannel - 64
		*(*byte)(hptr) = byte(tmp & 0xff)
		hptr = incBytePtr(hptr, 1)

		if cSize == 2 {
			*(*byte)(hptr) = byte(tmp >> 8)
			hptr = incBytePtr(hptr, 1)
		}
	}

	if nSize > 1 {
		res := t
		if t > 0xffffff {
			res = 0xffffff
		}
		hptr = unsafe.Pointer(amfEncodeInt24((*byte)(hptr), (*byte)(hend), res))
	}

	if nSize > 4 {
		hptr = unsafe.Pointer(amfEncodeInt24((*byte)(hptr), (*byte)(hend), (int32(packet.m_nBodySize))))
		*(*byte)(hptr) = byte(packet.m_packetType)
		hptr = incBytePtr(hptr, 1)
	}

	if nSize > 8 {
		// TODO: port this
		hptr = incBytePtr(hptr, int(C.EncodeInt32LE((*C.char)(hptr),
			C.int(packet.m_nInfoField2))))
	}

	if t >= 0xffffff {
		hptr = unsafe.Pointer(amfEncodeInt32((*byte)(hptr), (*byte)(hend), (int32)(t)))
	}

	nSize = int(packet.m_nBodySize)
	buffer = unsafe.Pointer(packet.m_body)
	nChunkSize = int(r.m_outChunkSize)

	if debugMode {
		log.Printf("rtmpSendPacket: fd=%v, size=%v", r.m_sb.sb_socket, nSize)
	}

	// Send header+first chunk, then each remaining chunk prefixed by a
	// continuation header.
	for (nSize + hSize) != 0 {
		var wrote int

		if nSize < nChunkSize {
			nChunkSize = nSize
		}

		if tbuf != nil {
			//memmove(toff, header, uintptr(nChunkSize + hSize))
			copy(ptrToSlice(toff, int(nChunkSize+hSize)), ptrToSlice(header,
				int(nChunkSize+hSize)))
			toff = incBytePtr(toff, nChunkSize+hSize)
		} else {
			// TODO: port this
			wrote = int(writeN(r, header, nChunkSize+hSize))
			if wrote == 0 {
				return 0
			}
		}

		nSize -= nChunkSize
		buffer = incBytePtr(buffer, nChunkSize)
		hSize = 0

		if nSize > 0 {
			// Build the continuation header in the bytes just before the
			// next chunk (these bytes have already been sent).
			header = decBytePtr(buffer, 1)
			hSize = 1

			if cSize != 0 {
				header = decBytePtr(header, cSize)
				hSize += cSize
			}

			if t >= 0xffffff {
				header = decBytePtr(header, 4)
				hSize += 4
			}

			*(*byte)(header) = byte(0xc0 | c)

			if cSize != 0 {
				tmp := int(packet.m_nChannel) - 64
				*indxBytePtr(header, 1) = byte(tmp & 0xff)

				if cSize == 2 {
					*indxBytePtr(header, 2) = byte(tmp >> 8)
				}
			}

			if t >= 0xffffff {
				extendedTimestamp := incBytePtr(header, 1+cSize)
				amfEncodeInt32((*byte)(extendedTimestamp),
					(*byte)(incBytePtr(extendedTimestamp, 4)), (int32)(t))
			}
		}
	}

	if tbuf != nil {
		wrote := int(writeN(r, tbuf, int(uintptr(decBytePtr(toff,
			int(uintptr(unsafe.Pointer(tbuf))))))))
		//C.free(tbuf)
		tbuf = nil

		if wrote == 0 {
			return 0
		}
	}

	// We invoked a remote method
	// TODO: port the const
	if packet.m_packetType == RTMP_PACKET_TYPE_INVOKE {
		// TODO: port C.AVal
		var method C.AVal
		var ptr unsafe.Pointer
		ptr = incBytePtr(unsafe.Pointer(packet.m_body), 1)
		//C.AMF_DecodeString((*C.char)(ptr), &method)
		amfDecodeString((*byte)(ptr), &method)

		if debugMode {
			// Log the method name itself rather than the address it lives at.
			log.Printf("Invoking %v", C.GoStringN(method.av_val, method.av_len))
		}
		// keep it in call queue till result arrives
		if queue != 0 {
			var txn int
			// Skip the 2-byte length, the name and the number marker byte.
			ptr = incBytePtr(ptr, 3+int(method.av_len))
			//txn = int(C.AMF_DecodeNumber((*C.char)(ptr)))
			txn = int(amfDecodeNumber((*byte)(ptr)))
			avQueue(&r.m_methodCalls, (*int32)(unsafe.Pointer(&r.m_numCalls)),
				&method, int32(txn))
			//C.AV_queue(&r.m_methodCalls, (*C.int)(unsafe.Pointer(&r.m_numCalls)), &method,
			//C.int(txn))
		}
	}

	// Remember this packet as the previous packet for its channel. The table
	// lives in C memory, so the entry must be C-allocated too: a Go allocation
	// referenced only from C memory is invisible to the garbage collector.
	slot := (**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut),
		int(packet.m_nChannel), int(unsafe.Sizeof(packet))))
	if *slot == nil {
		*slot = (*C.RTMPPacket)(C.malloc(C.size_t(unsafe.Sizeof(*packet))))
	}
	memmove(unsafe.Pointer(*slot), unsafe.Pointer(packet), uintptr(unsafe.Sizeof(*packet)))
	return 1
}

// writeN sends n bytes starting at buffer over r's socket, retrying until all
// bytes are written. It returns 1 when everything was sent and 0 otherwise.
// On a send error the connection is closed with C.RTMP_Close; a send that
// reports zero bytes written also ends the loop with a 0 result.
func writeN(r *C.RTMP, buffer unsafe.Pointer, n int) int {
	ptr := buffer
	for n > 0 {
		var nBytes int

		// TODO: port this if necessary
		nBytes = int(sockBufSend(&r.m_sb, (*byte)(ptr), int32(n)))

		if nBytes < 0 {
			if debugMode {
				log.Println("writeN, RTMP send error")
			}

			// TODO: port this
			C.RTMP_Close(r)
			// Force a non-zero remainder so that failure is reported below.
			n = 1
			break
		}

		if nBytes == 0 {
			break
		}

		n -= nBytes
		ptr = incBytePtr(ptr, nBytes)
	}

	if n == 0 {
		return 1
	}
	return 0
}

const length = 512

// RTMPT_cmds holds the command names indexed by the RTMPT_* constants.
var RTMPT_cmds = []string{
	"open",
	"send",
	"idle",
	"close",
}

// sockBufSend sends l bytes starting at buf on the socket held by sb and
// returns the result of the underlying C send call.
func sockBufSend(sb *C.RTMPSockBuf, buf *byte, l int32) int32 {
	return int32(C.send(sb.sb_socket, unsafe.Pointer(buf), C.size_t(l), 0))
}

// avQueue appends a method call record (a C-allocated, NUL-terminated copy of
// the name in av, plus the transaction number txn) to the array at *vals and
// increments *num. The array is grown by 16 entries whenever *num is a
// multiple of 16.
// TODO: port RTMP_METHOD
func avQueue(vals **C.RTMP_METHOD, num *int32, av *C.AVal, txn int32) {
	if (*num & 0x0f) == 0 {
		// TODO: work out what to do with the realloc
		//*vals = (*C.RTMP_METHOD)(realloc(unsafe.Pointer(*vals), int((*num+16)*
		//int(unsafe.Sizeof(*(*vals))))))
		*vals = (*C.RTMP_METHOD)(C.realloc(unsafe.Pointer(*vals), C.size_t((*num+16)*
			int32(unsafe.Sizeof(*(*vals))))))
	}

	tmp := C.malloc(C.size_t(av.av_len + 1))
	//tmp := allocate(uintptr(av.av_len + 1))
	memmove(tmp, unsafe.Pointer(av.av_val), uintptr(av.av_len))
	*indxBytePtr(tmp, int(av.av_len)) = '\000'

	// Fill in the next free entry.
	entry := (*C.RTMP_METHOD)(incPtr(unsafe.Pointer(*vals), int(*num),
		int(unsafe.Sizeof(*(*vals)))))
	entry.num = C.int(txn)
	entry.name.av_len = av.av_len
	entry.name.av_val = (*C.char)(tmp)
	(*num)++
}

// amfEncodeNamedNumber writes the 16-bit length of strName, the name bytes and
// then dVal encoded by amfEncodeNumber. It returns the position after the
// encoded data, or nil if the name does not fit before outend.
func amfEncodeNamedNumber(output *byte, outend *byte, strName *C.AVal, dVal float64) *byte {
	if int(uintptr(unsafe.Pointer(output)))+2+int(strName.av_len) > int(uintptr(unsafe.Pointer(outend))) {
		return nil
	}
	output = amfEncodeInt16(output, outend, int16(strName.av_len))
	memmove(unsafe.Pointer(output), unsafe.Pointer(strName.av_val), uintptr(strName.av_len))
	output = (*byte)(incBytePtr(unsafe.Pointer(output), int(strName.av_len)))
	return amfEncodeNumber(output, outend, dVal)
}

// amfEncodeNamedBoolean writes the 16-bit length of strName, the name bytes
// and then bVal encoded by amfEncodeBoolean. It returns the position after the
// encoded data, or nil if the name does not fit before outend.
func amfEncodeNamedBoolean(output *byte, outend *byte, strName *C.AVal, bVal int) *byte {
	if int(uintptr(unsafe.Pointer(output)))+2+int(strName.av_len) > int(uintptr(unsafe.Pointer(outend))) {
		return nil
	}
	output = amfEncodeInt16(output, outend, int16(strName.av_len))
	memmove(unsafe.Pointer(output), unsafe.Pointer(strName.av_val), uintptr(strName.av_len))
	output = (*byte)(incBytePtr(unsafe.Pointer(output), int(strName.av_len)))
	return amfEncodeBoolean(output, outend, bVal)
}

// amfEncodeBoolean writes an AMF_BOOLEAN marker followed by 0x00 when bVal is
// zero and 0x01 otherwise. It returns the position after the two bytes, or nil
// if they do not fit before outend.
func amfEncodeBoolean(output *byte, outend *byte, bVal int) *byte {
	if int(uintptr(unsafe.Pointer(output)))+2 > int(uintptr(unsafe.Pointer(outend))) {
		return nil
	}
	*(*byte)(unsafe.Pointer(output)) = AMF_BOOLEAN
	output = (*byte)(incBytePtr(unsafe.Pointer(output), 1))

	val := byte(0x01)
	if bVal == 0 {
		val = byte(0x00)
	}
	*(*byte)(unsafe.Pointer(output)) = val
	output = (*byte)(incBytePtr(unsafe.Pointer(output), 1))
	return output
}

// amfEncodeNumber writes an AMF_NUMBER marker followed by the 8 bytes of dVal
// in reversed byte order. It returns the position after the nine bytes, or nil
// if they do not fit before outend.
func amfEncodeNumber(output *byte, outend *byte, dVal float64) *byte {
	if int(uintptr(unsafe.Pointer(output)))+1+8 > int(uintptr(unsafe.Pointer(outend))) {
		return nil
	}
	*(*byte)(unsafe.Pointer(output)) = AMF_NUMBER
	output = (*byte)(incBytePtr(unsafe.Pointer(output), 1))

	// NOTE: here we are assuming little endian for both byte order and float
	// word order
	var ci, co *uint8
	ci = (*uint8)(unsafe.Pointer(&dVal))
	co = (*uint8)(unsafe.Pointer(output))
	for i := 0; i < 8; i++ {
		*indxBytePtr(unsafe.Pointer(co), i) = *indxBytePtr(unsafe.Pointer(ci), 7-i)
	}
	return (*byte)(incBytePtr(unsafe.Pointer(output), 8))
}

// amfDecodeNumber reads 8 bytes at data in reversed byte order and returns
// them as a float64. As with amfEncodeNumber this assumes a little endian host.
func amfDecodeNumber(data *byte) float64 {
	var dVal float64
	var ci, co *uint8
	ci = (*uint8)(unsafe.Pointer(data))
	co = (*uint8)(unsafe.Pointer(&dVal))
	for i := 0; i < 8; i++ {
		*indxBytePtr(unsafe.Pointer(co), i) = *indxBytePtr(unsafe.Pointer(ci), 7-i)
	}
	return dVal
}

// amfEncodeNamedString writes the 16-bit length of strName, the name bytes and
// then strValue encoded by amfEncodeString. It returns the position after the
// encoded data, or nil if the name does not fit before outend.
func amfEncodeNamedString(output *byte, outend *byte, strName *C.AVal, strValue *C.AVal) *byte {
	if int(uintptr(unsafe.Pointer(output)))+2+int(strName.av_len) > int(uintptr(unsafe.Pointer(outend))) {
		return nil
	}
	output = amfEncodeInt16(output, outend, int16(strName.av_len))
	memmove(unsafe.Pointer(output), unsafe.Pointer(strName.av_val), uintptr(strName.av_len))
	output = (*byte)(incBytePtr(unsafe.Pointer(output), int(strName.av_len)))
	return amfEncodeString(output, outend, strValue)
}

// amfDecodeString decodes data into a string inside a AVal. The first two
// bytes of data give the length; bv.av_val is pointed at the bytes that follow
// (no copy is made) or set to nil when the length is zero.
func amfDecodeString(data *byte, bv *C.AVal) {
	dataPtr := unsafe.Pointer(data)
	//bv.av_len = C.int(C.AMF_DecodeInt16((*C.char)(dataPtr)))
	bv.av_len = C.int(amfDecodeInt16((*byte)(dataPtr)))
	if bv.av_len > 0 {
		bv.av_val = (*C.char)(incBytePtr(dataPtr, 2))
	} else {
		bv.av_val = nil
	}
}

// amfDecodeInt16 decodes the two bytes at data into a 16 bit number, most
// significant byte first.
func amfDecodeInt16(data *byte) uint16 {
	c := unsafe.Pointer(data)
	// Widen each byte before shifting: shifting a uint8 left by 8 always
	// yields 0, which would drop the high byte.
	hi := uint16(*(*byte)(c))
	lo := uint16(*(*byte)(incBytePtr(c, 1)))
	return hi<<8 | lo
}

// amfEncodeInt24 encodes the low 24 bits of nVal into the three bytes at
// output, most significant byte first. It returns the position after them, or
// nil if fewer than three bytes are available before outend.
func amfEncodeInt24(output *byte, outend *byte, nVal int32) *byte {
	outputPtr := unsafe.Pointer(output)
	outendPtr := unsafe.Pointer(outend)
	if uintptr(outputPtr)+3 > uintptr(outendPtr) {
		// length < 3
		return nil
	}
	// Assign output[2]
	third := (*byte)(incBytePtr(outputPtr, 2))
	*third = (byte)(nVal & 0xff)
	// Assign output[1]
	second := (*byte)(incBytePtr(outputPtr, 1))
	*second = (byte)(nVal >> 8)
	// Assign output[0]
	*output = (byte)(nVal >> 16)
	return (*byte)(incBytePtr(outputPtr, 3))
}

// amfDecodeInt24 decodes the three bytes at data into an unsigned int, most
// significant byte first.
func amfDecodeInt24(data *byte) uint32 {
	p := unsafe.Pointer(data)
	b0 := uint32(*(*byte)(p))
	b1 := uint32(*(*byte)(incBytePtr(p, 1)))
	b2 := uint32(*(*byte)(incBytePtr(p, 2)))
	return b0<<16 | b1<<8 | b2
}

// amfEncodeString writes bv as an AMF string: an AMF_STRING marker with a
// 16-bit length when the string is shorter than 65536 bytes, otherwise an
// AMF_LONG_STRING marker with a 32-bit length, followed by the string bytes.
// It returns the position after the encoded data, or nil if it does not fit
// before outend.
func amfEncodeString(output *byte, outend *byte, bv *C.AVal) *byte {
	outputPtr := unsafe.Pointer(output)
	outendPtr := unsafe.Pointer(outend)
	if (bv.av_len < 65536 && uintptr(incBytePtr(outputPtr, 1+2+int(bv.av_len))) > uintptr(outendPtr)) ||
		uintptr(incBytePtr(outputPtr, 1+4+int(bv.av_len))) > uintptr(outendPtr) {
		return nil
	}
	if bv.av_len < 65536 {
		*(*byte)(outputPtr) = AMF_STRING
		outputPtr = incBytePtr(outputPtr, 1)
		outputPtr = unsafe.Pointer(C.AMF_EncodeInt16((*C.char)(outputPtr), (*C.char)(
			outendPtr), C.short(bv.av_len)))
		//outputPtr = unsafe.Pointer(amfEncodeInt16((*byte)(outputPtr),
		//(*byte)(outendPtr), (int16)(bv.av_len)))
	} else {
		*(*byte)(outputPtr) = AMF_LONG_STRING
		outputPtr = incBytePtr(outputPtr, 1)
		outputPtr = unsafe.Pointer(C.AMF_EncodeInt32((*C.char)(outputPtr), (*C.char)(
			outendPtr), C.int(bv.av_len)))
		//outputPtr = unsafe.Pointer(amfEncodeInt32((*byte)(outputPtr),
		//(*byte)(outendPtr), (int32)(bv.av_len)))
	}
	memmove(unsafe.Pointer(outputPtr), unsafe.Pointer(bv.av_val), uintptr(bv.av_len))
	//C.memcpy(unsafe.Pointer(outputPtr), unsafe.Pointer(bv.av_val), (C.size_t)(bv.av_len))
	outputPtr = incBytePtr(outputPtr, int(bv.av_len))
	return (*byte)(outputPtr)
}

// amfEncodeInt16 encodes nVal into the two bytes at output, most significant
// byte first. It returns the position after them, or nil if fewer than two
// bytes are available before outend.
func amfEncodeInt16(output *byte, outend *byte, nVal int16) *byte {
	outputPtr := unsafe.Pointer(output)
	outendPtr := unsafe.Pointer(outend)
	if uintptr(outputPtr)+2 > uintptr(outendPtr) {
		// length < 2
		return nil
	}
	// Assign output[1]
	second := (*byte)(incBytePtr(outputPtr, 1))
	*second = (byte)(nVal & 0xff)
	// Assign output[0]
	*output = (byte)(nVal >> 8)
	return (*byte)(incBytePtr(outputPtr, 2))
}

// amfEncodeInt32 encodes nVal into the four bytes at output, most significant
// byte first. It returns the position after them, or nil if fewer than four
// bytes are available before outend.
func amfEncodeInt32(output *byte, outend *byte, nVal int32) *byte {
	outputPtr := unsafe.Pointer(output)
	outendPtr := unsafe.Pointer(outend)
	if uintptr(outputPtr)+4 > uintptr(outendPtr) {
		// length < 4
		return nil
	}
	// Assign output[3]
	forth := (*byte)(incBytePtr(outputPtr, 3))
	*forth = (byte)(nVal & 0xff)
	// Assign output[2]
	third := (*byte)(incBytePtr(outputPtr, 2))
	*third = (byte)(nVal >> 8)
	// Assign output[1]
	second := (*byte)(incBytePtr(outputPtr, 1))
	*second = (byte)(nVal >> 16)
	// Assign output[0]
	*output = (byte)(nVal >> 24)
	return (*byte)(incBytePtr(outputPtr, 4))
}

// realloc allocates size bytes of Go memory and, when ptr is non-nil, copies
// size bytes from ptr into it.
// NOTE(review): the size of the old allocation is not known here, so the copy
// reads size bytes from ptr even if the old block was smaller - confirm callers
// only use this where that is safe.
func realloc(ptr unsafe.Pointer, size int) unsafe.Pointer {
	dest := allocate(uintptr(size))
	if ptr != nil {
		memmove(dest, ptr, uintptr(size))
	}
	return dest
}

// memmove copies n bytes from from to to.
func memmove(to, from unsafe.Pointer, n uintptr) {
	copy(ptrToSlice(to, int(n)), ptrToSlice(from, int(n)))
}

// memcmp compares the first size bytes at a and b. It returns -1 or 1 when the
// first differing byte of a is respectively smaller or larger than that of b,
// and 0 when all size bytes are equal.
// TODO: write test for this func
func memcmp(a, b unsafe.Pointer, size int) int {
	for i := 0; i < size; i++ {
		aValue := *indxBytePtr(a, i)
		bValue := *indxBytePtr(b, i)
		switch {
		case aValue < bValue:
			return -1
		case aValue > bValue:
			return 1
		}
	}
	return 0
}

// memset sets num bytes starting at ptr to the low 8 bits of val.
func memset(ptr *byte, val int, num int) {
	for i := 0; i < num; i++ {
		*indxBytePtr(unsafe.Pointer(ptr), i) = byte(val)
	}
}

// strLen returns the length of str in bytes.
func strLen(str string) int {
	return len(str)
}

// wrapper for converting byte pointer to unsafe.Pointer
func bToUP(b *byte) unsafe.Pointer {
	return unsafe.Pointer(b)
}

// wrapper for converting slice to unsafe.pointer. Returns nil for an empty
// slice, which has no first element to point at.
func sToUP(b []byte) unsafe.Pointer {
	if len(b) == 0 {
		return nil
	}
	return unsafe.Pointer(&b[0])
}

// Creates a new C style string from a go string. The result points at Go
// memory holding the bytes of str followed by a terminating null char; an empty
// str yields a pointer to a lone null char.
func goStrToCStr(str string) *byte {
	// make zeroes the buffer, so the final byte is already the terminator.
	buf := make([]byte, len(str)+1)
	copy(buf, str)
	return &buf[0]
}

// Duplicates a string given as a byte pointer, including its terminating null
// char. Returns nil if strlen cannot find the terminator.
func strdup(str *byte) *byte {
	length := strlen(str)
	if length < 0 {
		return nil
	}
	newMem := make([]byte, length+1)
	oldMem := ptrToSlice(unsafe.Pointer(str), int(length+1))
	copy(newMem, oldMem)
	return &newMem[0]
}

// Gets the length of the string found at str - length is number of chars
// between start and terminating null char. Returns -1 if a null char is not
// found before a count of 1000
func strlen(str *byte) int32 {
	var ptr *byte
	for i := 0; i < 1000; i++ {
		ptr = indxBytePtr(unsafe.Pointer(str), i)
		if *ptr == '\000' {
			return int32(i)
		}
	}
	return int32(-1)
}

// Returns the pointer where the first occurrence of val is located in a string
// which is terminated by a null char.
Returns nil if null char is not found // before a count of 10000 func strchr(str *byte, val byte) *byte { var ptr *byte for i := 0; i < 1000; i++ { ptr = indxBytePtr(unsafe.Pointer(str), i) if *ptr == val { return ptr } if *ptr == '\000' { break } } return nil } // Creates mem of the size noOfBytes. returns as unsafe pointer func allocate(nOfBytes uintptr) unsafe.Pointer { mem := make([]byte, int(nOfBytes)) return unsafe.Pointer(&mem[0]) } // indxBytePtr returns a byte at the indx inc give a ptr func indxBytePtr(ptr unsafe.Pointer, inc int) *byte { return (*byte)(incPtr(ptr, inc, byteSize)) } // indxInt32Ptr returns an int32 at the indx inc given a ptr func indxInt32Ptr(ptr unsafe.Pointer, inc int) *int32 { return (*int32)(incPtr(ptr, inc, int32Size)) } // indxInt64Ptr returns an int64 at the indx inc given a ptr func indxInt64Ptr(ptr unsafe.Pointer, inc int) *int64 { return (*int64)(incPtr(ptr, inc, int64Size)) } // incBytePtr returns an unsafe.Pointer to a byte that is inc positive positions // from the passed ptr func incBytePtr(ptr unsafe.Pointer, inc int) unsafe.Pointer { return incPtr(ptr, inc, byteSize) } // incInt32Ptr returns an unsafe.Pointer to an int32 that is inc positive // positions from the passed ptr func incInt32Ptr(ptr unsafe.Pointer, inc int) unsafe.Pointer { return incPtr(ptr, inc, int32Size) } // incInt64Ptr returns an unsafe.Pointer to an int64 that is inc positive // positions from the passed ptr func incInt64Ptr(ptr unsafe.Pointer, inc int) unsafe.Pointer { return incPtr(ptr, inc, int64Size) } // incPtr attempts to replicate C like pointer arithmatic functionality func incPtr(ptr unsafe.Pointer, inc, typeSize int) unsafe.Pointer { return unsafe.Pointer(uintptr(ptr) + uintptr(inc*typeSize)) } // incPtr attempts to replicate C like pointer arithmatic functionality func decPtr(ptr unsafe.Pointer, dec, typeSize int) unsafe.Pointer { return unsafe.Pointer(uintptr(ptr) - uintptr(dec*typeSize)) } // decBytePtr returns an unsafe.Pointer to a byte 
that is dec negative positions // from ptr func decBytePtr(ptr unsafe.Pointer, dec int) unsafe.Pointer { return decPtr(ptr, dec, byteSize) } // decBytePtr returns an unsafe.Pointer to a int32 that is dec negative positions // from ptr func decInt32Ptr(ptr unsafe.Pointer, dec int) unsafe.Pointer { return decPtr(ptr, dec, int32Size) } // decBytePtr returns an unsafe.Pointer to a int64 that is dec negative positions // from ptr func decInt64Ptr(ptr unsafe.Pointer, dec int) unsafe.Pointer { return decPtr(ptr, dec, int64Size) } // sliceToPtr get's the address of the first data element and returns as unsafe // pointer func sliceToPtr(data []byte) unsafe.Pointer { return unsafe.Pointer(&data[0]) } // ptrToSlice returns a slice given unsafe pointer and size - no allocation and // copying is required - same data is used. func ptrToSlice(data unsafe.Pointer, size int) []byte { var ret []byte shDest := (*reflect.SliceHeader)(unsafe.Pointer(&ret)) shDest.Data = uintptr(data) shDest.Len = size shDest.Cap = size return ret } // C.AVal is in amf.h // See AVC(str) {str, sizeof(str)-1} in amf.h func AVC(str string) C.AVal { var aval C.AVal aval.av_val = C.CString(str) aval.av_len = C.int(len(str)) return aval } var rtmpErrs = [...]string{ 1: "rtmp: not connected", 2: "rtmp: write error", 3: "rtmp: not started", } type Err uint func (e Err) Error() string { if 0 <= int(e) && int(e) < len(rtmpErrs) { s := rtmpErrs[e] if s != "" { return s } } return "rtmp: " + strconv.Itoa(int(e)) }