/* NAME rtmp.go DESCRIPTION See Readme.md AUTHOR Saxon Nelson-Milton Dan Kortschak Jake Lane LICENSE rtmp.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ package rtmp /* #cgo CFLAGS: -I/usr/local/include/librtmp #cgo LDFLAGS: -L/usr/local/lib -lrtmp #include #include #include #include typedef enum { RTMPT_OPEN=0, RTMPT_SEND, RTMPT_IDLE, RTMPT_CLOSE } RTMPTCmd; RTMP* start_session(RTMP* rtmp, char* url, uint connect_timeout); int write_frame(RTMP* rtmp, char* data, uint data_length); int end_session(RTMP* rtmp); void AV_queue(RTMP_METHOD **vals, int *num, AVal *av, int txn); int WriteN(RTMP *r, const char *buffer, int n); int EncodeInt32LE(char *output, int nVal); int HTTP_Post(RTMP *r, RTMPTCmd cmd, const char *buf, int len); */ import "C" import ( "errors" "fmt" "log" "math" "reflect" "strconv" "unsafe" ) const ( RTMPT_OPEN = iota RTMPT_SEND RTMPT_IDLE RTMPT_CLOSE ) const ( RTMP_PACKET_TYPE_CHUNK_SIZE = 0x01 RTMP_PACKET_TYPE_BYTES_READ_REPORT = 0x03 RTMP_PACKET_TYPE_CONTROL = 0x04 RTMP_PACKET_TYPE_SERVER_BW = 0x05 RTMP_PACKET_TYPE_CLIENT_BW = 0x06 RTMP_PACKET_TYPE_AUDIO = 0x08 RTMP_PACKET_TYPE_VIDEO = 0x09 RTMP_PACKET_TYPE_FLEX_STREAM_SEND = 0x0F RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT = 0x10 RTMP_PACKET_TYPE_FLEX_MESSAGE = 0x11 RTMP_PACKET_TYPE_INFO = 0x12 RTMP_PACKET_TYPE_SHARED_OBJECT = 0x13 RTMP_PACKET_TYPE_INVOKE = 0x14 RTMP_PACKET_TYPE_FLASH_VIDEO = 0x16 RTMP_MAX_HEADER_SIZE = 18 RTMP_PACKET_SIZE_LARGE = 0 RTMP_PACKET_SIZE_MEDIUM = 1 RTMP_PACKET_SIZE_SMALL = 2 RTMP_PACKET_SIZE_MINIMUM = 3 RTMP_READ_HEADER = 0x01 RTMP_READ_RESUME = 0x02 RTMP_READ_NO_IGNORE = 0x04 RTMP_READ_GOTKF = 0x08 RTMP_READ_GOTFLVK = 0x10 RTMP_READ_SEEKING = 0x20 RTMP_READ_COMPLETE = -3 RTMP_READ_ERROR = -2 RTMP_READ_EOF = -1 RTMP_READ_IGNORE = 0 RTMP_LF_AUTH = 0x0001 /* using auth param */ RTMP_LF_LIVE = 0x0002 /* stream is live */ RTMP_LF_SWFV = 0x0004 /* do SWF verification */ RTMP_LF_PLST = 0x0008 /* send playlist before play */ RTMP_LF_BUFX = 0x0010 /* toggle stream on BufferEmpty msg */ RTMP_LF_FTCU = 0x0020 /* free tcUrl on close */ RTMP_LF_FAPU = 0x0040 /* free app on close */ RTMP_FEATURE_HTTP = 0x01 RTMP_FEATURE_ENC = 0x02 RTMP_FEATURE_SSL = 0x04 RTMP_FEATURE_MFP = 0x08 /* not yet supported */ RTMP_FEATURE_WRITE = 0x10 /* publish, not play */ RTMP_FEATURE_HTTP2 = 0x20 /* server-side rtmpt */ RTMP_PROTOCOL_UNDEFINED = -1 RTMP_PROTOCOL_RTMP = 0 RTMP_PROTOCOL_RTMPE = RTMP_FEATURE_ENC RTMP_PROTOCOL_RTMPT = RTMP_FEATURE_HTTP RTMP_PROTOCOL_RTMPS = RTMP_FEATURE_SSL RTMP_PROTOCOL_RTMPTE = (RTMP_FEATURE_HTTP | RTMP_FEATURE_ENC) RTMP_PROTOCOL_RTMPTS = (RTMP_FEATURE_HTTP | RTMP_FEATURE_SSL) RTMP_PROTOCOL_RTMFP = RTMP_FEATURE_MFP RTMP_DEFAULT_CHUNKSIZE = 128 RTMP_BUFFER_CACHE_SIZE = (16 * 1024) RTMP_CHANNELS = 65600 RTMP_SWF_HASHLEN = 32 ) const ( minDataSize = 11 debugMode = false ) const ( byteSize = 1 int32Size = 4 int64Size = 8 ) // memmove copies n bytes from "from" to "to". //go:linkname memmove runtime.memmove func memmove(to, from unsafe.Pointer, n uintptr) // av_setDataFrame is a static const global in rtmp.c var setDataFrame = AVC("@setDataFrame") var packetSize = [...]int{12, 8, 4, 1} var RTMPProtocolStringsLower = [...]string{ "rtmp", "rtmpt", "rtmpe", "rtmpte", "rtmps", "rtmpts", "", "", "rtmfp", } // Session provides an interface for sending flv tags over rtmp. type Session interface { Open() error Write([]byte) (int, error) Close() error } // session provides parameters required for an rtmp communication session. type session struct { rtmp *C.RTMP url string timeout uint } type RTMP struct { m_inChunkSize int m_outChunkSize int m_nBWCheckCounter int m_nBytesIn int m_nBytesInSent int m_nBufferMS int m_stream_id int m_mediaChannel int m_mediaStamp uint32 m_pauseStamp uint32 m_pausing int m_nServerBw int m_nClientBw int m_nClientBw2 uint8 m_bPlaying uint8 m_bSendEncoding uint8 m_bSendCounter uint8 m_numInvokes int m_numCalls int m_methodCalls *RTMP_METHOD m_channelsAllocatedIn int m_channelsAllocatedOut int m_vecChannelsIn **RTMPPacket m_vecChannelsOut **RTMPPacket m_channelTimestamp *int m_fAudioCodecs float64 m_fVideoCodecs float64 m_fEncoding float64 m_fDuration float64 m_msgCounter int m_polling int m_resplen int m_unackd int m_clientID AVal m_read RTMP_READ m_write RTMPPacket m_sb RTMPSockBuf Link RTMP_LNK } type RTMPPacket struct { m_headerType uint8 m_packetType uint8 m_hasAbsTimestamp uint8 m_nChannel int m_nTimeStamp uint32 m_nInfoField2 int32 m_nBodySize uint32 m_nBytesRead uint32 m_chunk *RTMPChunk m_body *byte } type RTMP_METHOD struct { name AVal num int } type AVal struct { av_val *byte av_len int } type RTMP_READ struct { buf *byte bufpos *byte buflen uint timestamp uint32 dataType uint8 flags uint8 status int8 initialFrameType uint8 nResumeTS uint32 metaHeader *byte initialFrame *byte nMetaHeaderSize uint32 nInitialFrameSize uint32 nIgnoredFrameCounter uint32 nIgnoredFlvFrameCounter uint32 } type RTMPSockBuf struct { sb_socket int sb_size int sb_start *byte sb_buf [C.RTMP_BUFFER_CACHE_SIZE]byte // port const sb_timedout int sb_ssl uintptr } type RTMPChunk struct { c_headerSize int c_chunkSize int c_chunk *byte c_header [RTMP_MAX_HEADER_SIZE]byte } type ushort [2]byte type RTMP_LNK struct { hostname AVal sockshost AVal playpath0 AVal playpath AVal tcUrl AVal swfUrl AVal pageUrl AVal app AVal auth AVal flashVer AVal subscribepath AVal usherToken AVal token AVal pubUser AVal pubPasswd AVal extras AMFObject edepth int seekTime int stopTime int lFlags int swfAge int protocol int timeout int pFlags int socksport ushort port ushort } type AMFObject struct { o_num int o_props *C.AMFObjectProperty } var _ Session = (*session)(nil) // NewSession returns a new session. func NewSession(url string, connectTimeout uint) Session { return &session{ url: url, timeout: connectTimeout, } } // Open establishes an rtmp connection with the url passed into the // constructor func (s *session) Open() error { if s.rtmp != nil { return errors.New("rtmp: attempt to start already running session") } var err error s.rtmp, err = startSession(s.rtmp, s.url, uint32(s.timeout)) if s.rtmp == nil { return err } return nil } // Close terminates the rtmp connection func (s *session) Close() error { if s.rtmp == nil { return Err(3) } ret := endSession(s.rtmp) s.rtmp = nil if ret != 0 { return Err(ret) } return nil } // Write writes a frame (flv tag) to the rtmp connection func (s *session) Write(data []byte) (int, error) { if s.rtmp == nil { return 0, Err(3) } if C.RTMP_IsConnected(s.rtmp) == 0 { return 0, Err(1) } //if C.RTMP_Write(s.rtmp,(*C.char)(unsafe.Pointer(&data[0])),C.int(len(data))) == 0 { if rtmpWrite(s.rtmp, data) == 0 { return 0, Err(2) } return len(data), nil } func startSession(rtmp *C.RTMP, u string, timeout uint32) (*C.RTMP, error) { connect_timeout := C.int(timeout) rtmp = rtmpAlloc() rtmpInit(rtmp) rtmp.Link.timeout = connect_timeout if rtmpSetupUrl(rtmp, u) == 0 { C.RTMP_Close(rtmp) C.RTMP_Free(rtmp) return nil, errors.New("rtmp startSession: Failed to setup URL!") } C.RTMP_EnableWrite(rtmp) C.RTMP_SetBufferMS(rtmp, 3600*1000) if C.RTMP_Connect(rtmp, nil) == 0 { C.RTMP_Close(rtmp) C.RTMP_Free(rtmp) return nil, errors.New("rtmp startSession: Failed to connect!") } if C.RTMP_ConnectStream(rtmp, 0) == 0 { C.RTMP_Close(rtmp) C.RTMP_Free(rtmp) return nil, errors.New("rtmp startSession: Failed to connect stream!") } return rtmp, nil } func rtmpAlloc() *C.RTMP { var r C.RTMP return (*C.RTMP)(allocate(unsafe.Sizeof(r))) } func rtmpInit(r *C.RTMP) { r.m_sb.sb_socket = -1 r.m_inChunkSize = RTMP_DEFAULT_CHUNKSIZE r.m_outChunkSize = RTMP_DEFAULT_CHUNKSIZE r.m_nBufferMS = 30000 r.m_nClientBW = 2500000 r.m_nClientBW2 = 2 r.m_nServerBW = 2500000 r.m_fAudioCodecs = 3191.0 r.m_fVideoCodecs = 252.0 r.Link.timeout = 30 r.Link.swfAge = 30 } func rtmpSetupUrl(r *C.RTMP, u string) int32 { length := len(u) url := goStrToCStr(u) var opt, arg AVal var p1, p2 *byte ptr := strchr(url, byte(' ')) var ret, len int32 var port uint32 port = 0 len = strlen(url) ret = int32(C.RTMP_ParseURL((*C.char)(unsafe.Pointer(url)), &r.Link.protocol, &r.Link.hostname, (*C.uint)(&port), &r.Link.playpath0, &r.Link.app)) if ret == 0 { return ret } r.Link.port = C.ushort(port) r.Link.playpath = r.Link.playpath0 if r.Link.tcUrl.av_len == 0 { r.Link.tcUrl.av_val = (*C.char)(unsafe.Pointer(url)) if r.Link.app.av_len != 0 { if int(uintptr(unsafe.Pointer(r.Link.app.av_val))) < int(uintptr(incBytePtr(unsafe.Pointer(url), len))) { r.Link.tcUrl.av_len = r.Link.app.av_len + int(uintptr(decBytePtr(r.Link.app.av_val, int(uintptr(unsafe.Pointer(url)))))) } else { len = r.Link.hostname.av_len + r.Link.app.av_len + int(unsafe.Sizeof("rtmpte://:65535/")) r.Link.tcUrl.av_val = (*C.char)(allocate(uintptr(len))) hostname := string(ptrToSlice(unsafe.Pointer(r.Link.hostname.av_val), int(r.Link.hostname.av_len))) app := string(ptrToSlice(unsafe.Pointer(r.Link.app.av_val), int(r.Link.app.av_len))) fString := fmt.Sprintf("%v://%v:%v/%v", RTMPProtocolStringsLower[r.Link.protocol], hostname, r.Link.port, app) r.Link.tcUrl.av_val = (*C.char)(goStrToCStr(fString)) r.Link.tcUrl.av_len = len(RTMPProtocolStringsLower[r.Link.protocol]) + len(string("://")) + len(hostname) + len(string(":")) + len(strconv.Itoa(int(r.Link.port))) + len(string("/")) + len(app) r.Link.lFlags |= RTMP_LF_FTCU } } else { r.Link.tcUrl.av_len = strlen(url) } } C.SocksSetup(r, &r.Link.sockshost) if r.Link.port == 0 { switch { case r.Link.Protocol & RRTMP_FEATURE_SSL: r.Link.port = 433 case r.Link.protocol & RTMP_FEATURE_HTTP: r.Link.port = 80 default: r.Link.port = 1935 } } return 1 } func socksSetup(r *RTMP, sockshost *AVal) { if sockshost.av_len != 0 { socksport := strchr((*byte)(unsafe.Pointer(sockshost.av_val)), ':') hostname := strdup((*byte)(sockshost.av_val)) } } func endSession(rtmp *C.RTMP) uint32 { if rtmp == nil { return 3 } C.RTMP_Close(rtmp) C.RTMP_Free(rtmp) return 0 } // rtmpWrite writes data to the current rtmp connection encapsulated by r func rtmpWrite(r *C.RTMP, data []byte) int { buf := sliceToPtr(data) // TODO: port RTMPPacket var pkt = &r.m_write var pend, enc unsafe.Pointer size := len(data) s2 := size var ret, num int pkt.m_nChannel = 0x04 pkt.m_nInfoField2 = C.int32_t(r.m_stream_id) for s2 != 0 { if pkt.m_nBytesRead == 0 { if size < minDataSize { log.Printf("size: %d\n", size) log.Printf("too small \n") return 0 } if *indxBytePtr(buf, 0) == 'F' && *indxBytePtr(buf, 1) == 'L' && *indxBytePtr(buf, 2) == 'V' { buf = unsafe.Pointer(uintptr(buf) + uintptr(13)) s2 -= 13 } pkt.m_packetType = C.uint8_t(*indxBytePtr(buf, 0)) buf = incBytePtr(buf, 1) pkt.m_nBodySize = C.uint32_t(afmDecodeInt24((*byte)(buf))) buf = incBytePtr(buf, 3) pkt.m_nTimeStamp = C.uint32_t(afmDecodeInt24((*byte)(buf))) buf = incBytePtr(buf, 3) pkt.m_nTimeStamp |= C.uint32_t(*indxBytePtr(buf, 0)) << 24 buf = incBytePtr(buf, 4) s2 -= 11 if ((pkt.m_packetType == RTMP_PACKET_TYPE_AUDIO || pkt.m_packetType == RTMP_PACKET_TYPE_VIDEO) && pkt.m_nTimeStamp == 0) || pkt.m_packetType == RTMP_PACKET_TYPE_INFO { pkt.m_headerType = RTMP_PACKET_SIZE_LARGE if pkt.m_packetType == RTMP_PACKET_TYPE_INFO { pkt.m_nBodySize += 16 } } else { pkt.m_headerType = RTMP_PACKET_SIZE_MEDIUM } // TODO: Port this if int(C.RTMPPacket_Alloc(pkt, pkt.m_nBodySize)) == 0 { log.Println("Failed to allocate packet") return 0 } enc = unsafe.Pointer(pkt.m_body) pend = incBytePtr(enc, int(pkt.m_nBodySize)) if pkt.m_packetType == RTMP_PACKET_TYPE_INFO { enc = unsafe.Pointer(afmEncodeString((*byte)(enc), (*byte)(pend), &setDataFrame)) pkt.m_nBytesRead = C.uint32_t(math.Abs(float64(uintptr(enc) - uintptr(unsafe.Pointer(pkt.m_body))))) } } else { enc = incBytePtr(unsafe.Pointer(pkt.m_body), int(pkt.m_nBytesRead)) } num = int(pkt.m_nBodySize - pkt.m_nBytesRead) if num > s2 { num = s2 } //memmove(enc,buf,uintptr(num)) copy(ptrToSlice(enc, num), ptrToSlice(buf, num)) pkt.m_nBytesRead += C.uint32_t(num) s2 -= num buf = incBytePtr(buf, num) if pkt.m_nBytesRead == pkt.m_nBodySize { // TODO: Port this ret = sendPacket(r, pkt, 0) // TODO: Port this C.RTMPPacket_Free(pkt) pkt.m_nBytesRead = 0 if ret == 0 { return -1 } buf = incBytePtr(buf, 4) s2 -= 4 if s2 < 0 { break } } } return size + s2 } // afmDecodeInt24 decodes data into an unsigned int func afmDecodeInt24(data *byte) uint32 { // TODO Understand logic and simplify c := (*uint8)(unsafe.Pointer(data)) dst := uint32(int32(*c) << 16) dst |= uint32(int32(*((*uint8)(unsafe.Pointer(uintptr(unsafe.Pointer(c)) + (uintptr)(int32(1))*unsafe.Sizeof(*c))))) << 8) dst |= uint32(int32(*((*uint8)(unsafe.Pointer(uintptr(unsafe.Pointer(c)) + (uintptr)(int32(2))*unsafe.Sizeof(*c)))))) return dst } func afmEncodeString(output *byte, outend *byte, bv *C.AVal) *byte { outputPtr := unsafe.Pointer(output) outendPtr := unsafe.Pointer(outend) if (bv.av_len < 65536 && uintptr(incBytePtr(outputPtr, 1+2+int(bv.av_len))) > uintptr(outendPtr)) || uintptr(incBytePtr(outputPtr, 1+4+int(bv.av_len))) > uintptr(outendPtr) { return nil } if bv.av_len < 65536 { *(*C.char)(outputPtr) = C.AMF_STRING incBytePtr(outputPtr, 1) outputPtr = unsafe.Pointer(afmEncodeInt16((*byte)(outputPtr), (*byte)(outendPtr), (int16)(bv.av_len))) } else { *(*C.char)(outputPtr) = C.AMF_LONG_STRING incBytePtr(outputPtr, 1) outputPtr = unsafe.Pointer(afmEncodeInt32((*byte)(outputPtr), (*byte)(outendPtr), (int32)(bv.av_len))) } memmove(unsafe.Pointer(outputPtr), unsafe.Pointer(bv.av_val), uintptr(bv.av_len)) //C.memcpy(unsafe.Pointer(outputPtr), unsafe.Pointer(bv.av_val), (C.size_t)(bv.av_len)) incBytePtr(outputPtr, int(bv.av_len)) return (*byte)(outputPtr) } // afmEncodeInt16 encodes a int16 into data func afmEncodeInt16(output *byte, outend *byte, nVal int16) *byte { outputPtr := unsafe.Pointer(output) outendPtr := unsafe.Pointer(outend) if uintptr(outputPtr)+2 > uintptr(outendPtr) { // length < 2 return nil } // Assign output[1] second := (*byte)(incBytePtr(outputPtr, 1)) *second = (byte)(nVal & 0xff) // Assign output[0] *output = (byte)(nVal >> 8) return (*byte)(incBytePtr(outputPtr, 2)) } // afmEncodeInt32 encodes a int32 into data func afmEncodeInt32(output *byte, outend *byte, nVal int32) *byte { outputPtr := unsafe.Pointer(output) outendPtr := unsafe.Pointer(outend) if uintptr(outputPtr)+4 > uintptr(outendPtr) { // length < 4 return nil } // Assign output[3] forth := (*byte)(incBytePtr(outputPtr, 3)) *forth = (byte)(nVal & 0xff) // Assign output[2] third := (*byte)(incBytePtr(outputPtr, 2)) *third = (byte)(nVal >> 8) // Assign output[1] second := (*byte)(incBytePtr(outputPtr, 1)) *second = (byte)(nVal >> 16) // Assign output[0] *output = (byte)(nVal >> 24) return (*byte)(incBytePtr(outputPtr, 4)) } // send packet version 1 - less C stuff func sendPacket(r *C.RTMP, packet *C.RTMPPacket, queue int) int { var prevPacket *C.RTMPPacket last := 0 var nSize, hSize, cSize, nChunkSize, tlen int var header, hptr, hend, buffer, tbuf, toff unsafe.Pointer var goHbuf [RTMP_MAX_HEADER_SIZE]byte var hbuf = unsafe.Pointer(&goHbuf[0]) var c byte var t int32 var packets unsafe.Pointer if packet.m_nChannel >= r.m_channelsAllocatedOut { log.Println("Resize") n := int(packet.m_nChannel + 10) packets = C.realloc(unsafe.Pointer(r.m_vecChannelsOut), C.size_t(unsafe.Sizeof(packet)*uintptr(n))) if uintptr(packets) == uintptr(0) { C.free(unsafe.Pointer(r.m_vecChannelsOut)) r.m_vecChannelsOut = nil r.m_channelsAllocatedOut = 0 return 0 } r.m_vecChannelsOut = (**C.RTMPPacket)(packets) C.memset(incPtr(unsafe.Pointer(r.m_vecChannelsOut), int(r.m_channelsAllocatedOut), int(unsafe.Sizeof(packet))), 0, C.size_t(unsafe.Sizeof(packet)* uintptr(n-int(r.m_channelsAllocatedOut)))) r.m_channelsAllocatedOut = C.int(n) } prevPacket = *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut), int(packet.m_nChannel), int(unsafe.Sizeof(packet)))) if prevPacket != nil && packet.m_headerType != RTMP_PACKET_SIZE_LARGE { // compress a bit by using the prev packet's attributes if prevPacket.m_nBodySize == packet.m_nBodySize && prevPacket.m_packetType == packet.m_packetType && packet.m_headerType == RTMP_PACKET_SIZE_MEDIUM { packet.m_headerType = RTMP_PACKET_SIZE_SMALL } if prevPacket.m_nTimeStamp == packet.m_nTimeStamp && packet.m_headerType == RTMP_PACKET_SIZE_SMALL { // TODO: port this constant packet.m_headerType = RTMP_PACKET_SIZE_MINIMUM } last = int(prevPacket.m_nTimeStamp) } if packet.m_headerType > 3 { log.Printf("Sanity failed! trying to send header of type: 0x%02x.", packet.m_headerType) return 0 } nSize = packetSize[int(packet.m_headerType)] hSize = nSize cSize = 0 t = int32(int(packet.m_nTimeStamp) - last) if packet.m_body != nil { header = decBytePtr(unsafe.Pointer(packet.m_body), nSize) hend = unsafe.Pointer(packet.m_body) } else { header = incBytePtr(hbuf, 6) // TODO: be cautious about this sizeof - make sure it works how you think it // does. C code used sizeof(hbuf) where hbuf is a *char hend = incBytePtr(hbuf, RTMP_MAX_HEADER_SIZE) } switch { case packet.m_nChannel > 319: cSize = 2 case packet.m_nChannel > 63: cSize = 1 } if cSize != 0 { header = decBytePtr(header, cSize) hSize += cSize } if t >= 0xffffff { header = decBytePtr(header, 4) hSize += 4 log.Printf("Larger timestamp than 24-bit: 0x%v", t) } hptr = header c = byte(packet.m_headerType) << 6 switch cSize { case 0: c |= byte(packet.m_nChannel) case 1: case 2: c |= byte(1) } *(*byte)(hptr) = c hptr = incBytePtr(hptr, 1) if cSize != 0 { tmp := packet.m_nChannel - 64 *(*byte)(hptr) = byte(tmp & 0xff) hptr = incBytePtr(hptr, 1) if cSize == 2 { *(*byte)(hptr) = byte(tmp >> 8) hptr = incBytePtr(hptr, 1) } } if nSize > 1 { res := t if t > 0xffffff { res = 0xffffff } hptr = unsafe.Pointer(C.AMF_EncodeInt24((*C.char)(hptr), (*C.char)(hend), C.int(res))) } if nSize > 4 { hptr = unsafe.Pointer(C.AMF_EncodeInt24((*C.char)(hptr), (*C.char)(hend), C.int(packet.m_nBodySize))) *(*byte)(hptr) = byte(packet.m_packetType) hptr = incBytePtr(hptr, 1) } if nSize > 8 { hptr = incBytePtr(hptr, int(C.EncodeInt32LE((*C.char)(hptr), C.int(packet.m_nInfoField2)))) } if t >= 0xffffff { hptr = unsafe.Pointer(afmEncodeInt32((*byte)(hptr), (*byte)(hend), (int32)(t))) } nSize = int(packet.m_nBodySize) buffer = unsafe.Pointer(packet.m_body) nChunkSize = int(r.m_outChunkSize) if debugMode { log.Printf("sendPacket: fd=%v, size=%v", r.m_sb.sb_socket, nSize) } // send all chunks in one HTTP request // TODO: port RTMP_FEATURE_HTTP if int(r.Link.protocol&RTMP_FEATURE_HTTP) != 0 { chunks := (nSize + nChunkSize - 1) / nChunkSize if chunks > 1 { tlen = chunks*(cSize+1) + nSize + hSize // TODO: figure out how to do this in go tbuf = C.malloc(C.size_t(tlen)) if tbuf == nil { return 0 } toff = tbuf } } for (nSize + hSize) != 0 { var wrote int if nSize < nChunkSize { nChunkSize = nSize } if tbuf != nil { //memmove(toff, header, uintptr(nChunkSize + hSize)) copy(ptrToSlice(toff, int(nChunkSize+hSize)), ptrToSlice(header, int(nChunkSize+hSize))) toff = incBytePtr(toff, nChunkSize+hSize) } else { // TODO: port this wrote = int(writeN(r, header, nChunkSize+hSize)) if wrote == 0 { return 0 } } nSize -= nChunkSize buffer = incBytePtr(buffer, nChunkSize) hSize = 0 if nSize > 0 { header = decBytePtr(buffer, 1) hSize = 1 if cSize != 0 { header = decBytePtr(header, cSize) hSize += cSize } if t >= 0xffffff { header = decBytePtr(header, 4) hSize += 4 } *(*byte)(header) = byte(0xc0 | c) if cSize != 0 { tmp := int(packet.m_nChannel) - 64 *indxBytePtr(header, 1) = byte(tmp & 0xff) if cSize == 2 { *indxBytePtr(header, 2) = byte(tmp >> 8) } } if t >= 0xffffff { extendedTimestamp := incBytePtr(header, 1+cSize) afmEncodeInt32((*byte)(extendedTimestamp), (*byte)(incBytePtr(extendedTimestamp, 4)), (int32)(t)) } } } if tbuf != nil { // TODO: port C.writeN wrote := int(writeN(r, tbuf, int(uintptr(decBytePtr(toff, int(uintptr(unsafe.Pointer(tbuf)))))))) C.free(tbuf) tbuf = nil if wrote == 0 { return 0 } } // We invoked a remote method // TODO: port the const if packet.m_packetType == RTMP_PACKET_TYPE_INVOKE { // TODO: port C.AVal var method C.AVal var ptr unsafe.Pointer ptr = incBytePtr(unsafe.Pointer(packet.m_body), 1) afmDecodeString((*byte)(ptr), &method) if debugMode { log.Printf("Invoking %v", method.av_val) } // keep it in call queue till result arrives if queue != 0 { var txn int ptr = incBytePtr(ptr, 3+int(method.av_len)) // TODO: port this txn = int(C.AMF_DecodeNumber((*C.char)(ptr))) // TODO: port this C.AV_queue(&r.m_methodCalls, &r.m_numCalls, &method, C.int(txn)) } } if *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut), int(packet.m_nChannel), int(unsafe.Sizeof(packet)))) == nil { *(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut), int(packet.m_nChannel), int(unsafe.Sizeof(packet)))) = (*C.RTMPPacket)(C.malloc(C.size_t(unsafe.Sizeof(*packet)))) } memmove(unsafe.Pointer(*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut), int(packet.m_nChannel), int(unsafe.Sizeof(packet))))), unsafe.Pointer(packet), uintptr(unsafe.Sizeof(*packet))) //C.memcpy(unsafe.Pointer(*(**C.RTMPPacket)(incPtr(unsafe.Pointer(r.m_vecChannelsOut), //int(packet.m_nChannel), int(unsafe.Sizeof(packet))))), unsafe.Pointer(packet), //C.size_t(uintptr(unsafe.Sizeof(*packet)))) return 1 } // afmDecodeString decodes data into a string inside a AVal func afmDecodeString(data *byte, bv *C.AVal) { dataPtr := unsafe.Pointer(data) bv.av_len = C.int(afmDecodeInt16((*byte)(dataPtr))) if bv.av_len > 0 { bv.av_val = (*C.char)(incBytePtr(dataPtr, 2)) } else { bv.av_val = nil } } // afmDecodeInt16 decodes data into a 16 bit number func afmDecodeInt16(data *byte) uint16 { c := unsafe.Pointer(data) return (uint16(*(*uint8)(c)) << 8) | *(*uint16)(incBytePtr(c, 1)) } func writeN(r *C.RTMP, buffer unsafe.Pointer, n int) int { ptr := buffer for n > 0 { var nBytes int if (r.Link.protocol & RTMP_FEATURE_HTTP) != 0 { // TODO: port HTTP_POST nBytes = httpPost(r, RTMPT_SEND, (*byte)(ptr), n) } else { // TODO: port this if necessary nBytes = int(sockBufSend(&r.m_sb, (*byte)(ptr), int32(n))) } if nBytes < 0 { if debugMode { log.Println("WriteN, RTMP send error") } // TODO: port this C.RTMP_Close(r) n = 1 break } if nBytes == 0 { break } n -= nBytes ptr = incBytePtr(ptr, nBytes) } if n == 0 { return 1 } return 0 } const length = 512 var RTMPT_cmds = []string{ "open", "send", "idle", "close", } func httpPost(r *C.RTMP, cmd C.RTMPTCmd, buf *byte, l int) int { res := "" if r.m_clientID.av_val != nil { res = string(ptrToSlice(unsafe.Pointer(r.m_clientID.av_val), int(r.m_clientID.av_len))) } fString := fmt.Sprintf("POST /%s%s/%d HTTP/1.1\r\n"+ "Host: %v:%d\r\n"+ "Accept: */*\r\n"+ "User-Agent: Shockwave Flash\r\n"+ "Connection: Keep-Alive\r\n"+ "Cache-Control: no-cache\r\n"+ "Content-type: application/x-fcs\r\n"+ "Content-length: %d\r\n\r\n", RTMPT_cmds[cmd], res, r.m_msgCounter, r.Link.hostname.av_len, r.Link.hostname.av_val, r.Link.port, l) hlen := len(fString) hbuf := (*byte)(unsafe.Pointer(&(([]byte(fString))[0]))) // TODO: port this sockBufSend(&r.m_sb, (*byte)(unsafe.Pointer(hbuf)), int32(hlen)) hlen = int(sockBufSend(&r.m_sb, (*byte)(unsafe.Pointer(buf)), int32(l))) r.m_msgCounter++ r.m_unackd++ return hlen } func sockBufSend(sb *C.RTMPSockBuf, buf *byte, l int32) int32 { return int32(C.send(sb.sb_socket, unsafe.Pointer(buf), C.size_t(l), 0)) } // TODO: port RTMP_METHOD func avQueue(vals **C.RTMP_METHOD, num *int, av *C.AVal, txn int) { var rtmpMethodPtr *C.RTMP_METHOD if (*num & 0x0f) == 0 { // TODO: work out what to do with the realloc *vals = (*C.RTMP_METHOD)(C.realloc(unsafe.Pointer(*vals), C.size_t((*num+16)*int(unsafe.Sizeof(*(*vals)))))) } tmp := unsafe.Pointer(C.malloc(C.size_t(av.av_len + 1))) memmove(tmp, unsafe.Pointer(av.av_val), uintptr(av.av_len)) //C.memcpy(tmp, unsafe.Pointer(av.av_val), C.size_t(av.av_len)) *indxBytePtr(tmp, int(av.av_len)) = *(*byte)(unsafe.Pointer(C.CString(""))) (*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(*vals), *num, int(unsafe.Sizeof(rtmpMethodPtr))))).num = C.int(txn) (*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(*vals), *num, int(unsafe.Sizeof(rtmpMethodPtr))))).name.av_len = av.av_len (*(*C.RTMP_METHOD)(incPtr(unsafe.Pointer(*vals), *num, int(unsafe.Sizeof(rtmpMethodPtr))))).name.av_val = (*C.char)(tmp) } // wrapper for converting byte pointer to unsafe.Pointer func uBP(b *byte) unsafe.Pointer { return unsafe.Pointer(b) } // wrapper for converting slice to unsafe.pointer func uBSP(b []byte) unsafe.Pointer { return unsafe.Pointer(&b[0]) } func goStrToCStr(str string) *byte { l := len(str) slice := make([]byte, l+1) ptr := unsafe.Pointer(&[]byte(str)[0]) for i := 0; i < l; i++ { slice[i] = *indxBytePtr(ptr, i) } slice[l] = '\000' return slice } func strlen(str *byte) int32 { for i := 0; true; i++ { ptr = indxBytePtr(unsafe.Pointer(str), i) if *ptr == '\000' { return i } } return 0 } func strchr(str *byte, val byte) *byte { var ptr *byte for i := 0; true; i++ { ptr = indxBytePtr(unsafe.Pointer(str), i) if *ptr == val { return ptr } if *ptr == '\000' { break } } return nil } func allocate(nOfBytes uintptr) unsafe.Pointer { mem := make([]byte, int(nOfBytes)) return unsafe.Pointer(&mem[0]) } // indxBytePtr returns a byte at the indx inc give a ptr func indxBytePtr(ptr unsafe.Pointer, inc int) *byte { return (*byte)(incPtr(ptr, inc, byteSize)) } // indxInt32Ptr returns an int32 at the indx inc given a ptr func indxInt32Ptr(ptr unsafe.Pointer, inc int) *int32 { return (*int32)(incPtr(ptr, inc, int32Size)) } // indxInt64Ptr returns an int64 at the indx inc given a ptr func indxInt64Ptr(ptr unsafe.Pointer, inc int) *int64 { return (*int64)(incPtr(ptr, inc, int64Size)) } // incBytePtr returns an unsafe.Pointer to a byte that is inc positive positions // from the passed ptr func incBytePtr(ptr unsafe.Pointer, inc int) unsafe.Pointer { return incPtr(ptr, inc, byteSize) } // incInt32Ptr returns an unsafe.Pointer to an int32 that is inc positive // positions from the passed ptr func incInt32Ptr(ptr unsafe.Pointer, inc int) unsafe.Pointer { return incPtr(ptr, inc, int32Size) } // incInt64Ptr returns an unsafe.Pointer to an int64 that is inc positive // positions from the passed ptr func incInt64Ptr(ptr unsafe.Pointer, inc int) unsafe.Pointer { return incPtr(ptr, inc, int64Size) } // incPtr attempts to replicate C like pointer arithmatic functionality func incPtr(ptr unsafe.Pointer, inc, typeSize int) unsafe.Pointer { return unsafe.Pointer(uintptr(ptr) + uintptr(inc*typeSize)) } // incPtr attempts to replicate C like pointer arithmatic functionality func decPtr(ptr unsafe.Pointer, dec, typeSize int) unsafe.Pointer { return unsafe.Pointer(uintptr(ptr) - uintptr(dec*typeSize)) } // decBytePtr returns an unsafe.Pointer to a byte that is dec negative positions // from ptr func decBytePtr(ptr unsafe.Pointer, dec int) unsafe.Pointer { return decPtr(ptr, dec, byteSize) } // decBytePtr returns an unsafe.Pointer to a int32 that is dec negative positions // from ptr func decInt32Ptr(ptr unsafe.Pointer, dec int) unsafe.Pointer { return decPtr(ptr, dec, int32Size) } // decBytePtr returns an unsafe.Pointer to a int64 that is dec negative positions // from ptr func decInt64Ptr(ptr unsafe.Pointer, dec int) unsafe.Pointer { return decPtr(ptr, dec, int64Size) } // sliceToPtr get's the address of the first data element and returns as unsafe // pointer func sliceToPtr(data []byte) unsafe.Pointer { return unsafe.Pointer(&data[0]) } // ptrToSlice returns a slice given unsafe pointer and size - no allocation and // copying is required - same data is used. func ptrToSlice(data unsafe.Pointer, size int) []byte { var ret []byte shDest := (*reflect.SliceHeader)(unsafe.Pointer(&ret)) shDest.Data = uintptr(data) shDest.Len = size shDest.Cap = size return ret } // C.AVal is in amf.h // See AVC(str) {str, sizeof(str)-1} in amf.h func AVC(str string) C.AVal { var aval C.AVal aval.av_val = C.CString(str) aval.av_len = C.int(len(str)) return aval } var rtmpErrs = [...]string{ 1: "rtmp: not connected", 2: "rtmp: write error", 3: "rtmp: not started", } type Err uint func (e Err) Error() string { if 0 <= int(e) && int(e) < len(rtmpErrs) { s := rtmpErrs[e] if s != "" { return s } } return "rtmp: " + strconv.Itoa(int(e)) }