/*
NAME
  rtmp.go

DESCRIPTION
  See Readme.md

AUTHOR
  Saxon Nelson-Milton
  Dan Kortschak

LICENSE
  rtmp.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)

  It is free software: you can redistribute it and/or modify them
  under the terms of the GNU General Public License as published by the
  Free Software Foundation, either version 3 of the License, or (at your
  option) any later version.

  It is distributed in the hope that it will be useful, but WITHOUT
  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
  for more details.

  You should have received a copy of the GNU General Public License
  along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/

// Package rtmp provides a Session for sending flv tags over rtmp. It is built
// with cgo and links against librtmp.
package rtmp

/*
#cgo CFLAGS: -I/usr/local/include/librtmp
#cgo LDFLAGS: -lrtmp -Wl,-rpath=/usr/local/lib

#include
#include
#include

RTMP* start_session(RTMP* rtmp, char* url, uint connect_timeout);
int write_frame(RTMP* rtmp, char* data, uint data_length);
int end_session(RTMP* rtmp);
*/
import "C"

// NOTE(review): the three #include directives in the cgo preamble above carry
// no header names in this copy of the file - presumably they were lost in
// transit. Confirm against the repository before building.

import (
	"errors"
	_ "fmt"
	"log"
	"math"
	"reflect"
	"strconv"
	"unsafe"
)

const (
	// minDataSize is the smallest buffer rtmpWrite accepts when it starts a
	// new packet; rtmpWrite consumes 11 bytes of tag header before the body.
	minDataSize = 11
)

const (
	// Header types that rtmpWrite stores in the packet's m_headerType field.
	RTMP_PACKET_SIZE_LARGE  = 0
	RTMP_PACKET_SIZE_MEDIUM = 1
	RTMP_PACKET_SIZE_SMALL  = 2

	// Packet types that rtmpWrite compares against the first byte of a tag
	// (stored in the packet's m_packetType field).
	RTMP_PACKET_TYPE_INFO  = 0x12
	RTMP_PACKET_TYPE_AUDIO = 0x08
	RTMP_PACKET_TYPE_VIDEO = 0x09
)

// Session provides an interface for sending flv tags over rtmp.
type Session interface {
	// Open establishes the rtmp connection.
	Open() error
	// Write sends one frame (flv tag) over the connection.
	Write([]byte) (int, error)
	// Close terminates the rtmp connection.
	Close() error
}

// session provides parameters required for an rtmp communication session.
type session struct {
	// rtmp is the handle returned by C.start_session; it is nil while the
	// session is not started.
	rtmp *C.RTMP
	// url is the address handed to C.start_session by Open.
	url string
	// timeout is passed to C.start_session as its connect_timeout argument.
	timeout uint
}

// Compile-time check that *session implements Session.
var _ Session = (*session)(nil)

// NewSession returns a new session.
// The returned session is not connected; call Open before Write.
func NewSession(url string, connectTimeout uint) Session {
	return &session{
		url:     url,
		timeout: connectTimeout,
	}
}

// AVC builds a C.AVal (declared in amf.h) from a Go string, mirroring the C
// macro
//
//	#define AVC(str) {str, sizeof(str)-1}
//
// av_val is a copy of str made with C.CString and av_len is len(str). The
// copy is not freed by AVC; the caller owns it.
func AVC(str string) C.AVal {
	var aval C.AVal
	aval.av_val = C.CString(str)
	aval.av_len = C.int(len(str))
	return aval
}

// setDataFrame is the "@setDataFrame" string that rtmpWrite encodes at the
// start of info packets. The equivalent in librtmp, av_setDataFrame, is a
// static const global in rtmp.c.
var setDataFrame = AVC("@setDataFrame")

// Open establishes an rtmp connection with the url passed into the
// constructor.
//
// It returns an error if the session is already started or if
// C.start_session does not return a handle.
func (s *session) Open() error {
	if s.rtmp != nil {
		return errors.New("rtmp: attempt to start already running session")
	}

	// NOTE(review): the C copy of the url is not freed here. librtmp may keep
	// pointers into the string it is given - confirm before adding a free.
	rtmp, err := C.start_session(s.rtmp, C.CString(s.url), C.uint(s.timeout))
	if rtmp == nil {
		// The error of a two-value cgo call is derived from errno and is nil
		// when errno is zero, so a failure must not be reported as success.
		if err == nil {
			err = errors.New("rtmp: failed to start session")
		}
		return err
	}
	s.rtmp = rtmp
	return nil
}

// Write writes a frame (flv tag) to the rtmp connection.
//
// It returns Err(3) if the session has not been started, Err(1) if the
// connection is down and Err(2) if the frame could not be written. On success
// it returns len(data).
func (s *session) Write(data []byte) (int, error) {
	if s.rtmp == nil {
		return 0, Err(3)
	}
	if C.RTMP_IsConnected(s.rtmp) == 0 {
		return 0, Err(1)
	}
	//if C.RTMP_Write(s.rtmp,(*C.char)(unsafe.Pointer(&data[0])),C.int(len(data))) == 0 {
	// rtmpWrite reports failure as 0 (bad input or allocation failure) or -1
	// (the packet could not be sent); both are write errors.
	if rtmpWrite(s.rtmp, data) <= 0 {
		return 0, Err(2)
	}
	return len(data), nil
}

// memmove copies n bytes from "from" to "to".
//go:linkname memmove runtime.memmove func memmove(to, from unsafe.Pointer, n uintptr) // rtmpWrite writes data to the current rtmp connection encapsulated by r func rtmpWrite(r *C.RTMP, data []byte) int { buf := sliceToPtr(data) // TODO: port RTMPPacket var pkt = &r.m_write var pend, enc unsafe.Pointer size := len(data) s2 := size var ret, num int pkt.m_nChannel = 0x04 pkt.m_nInfoField2 = r.m_stream_id for s2 != 0 { if pkt.m_nBytesRead == 0 { if size < minDataSize { log.Printf("size: %d\n", size) log.Printf("too small \n") return 0 } if indxPtr(buf,0) == 'F' && indxPtr(buf,1) == 'L' && indxPtr(buf,2) == 'V' { buf = unsafe.Pointer(uintptr(buf) + uintptr(13)) s2 -= 13 } pkt.m_packetType = C.uchar(indxPtr(buf,0)) buf = incPtr(buf, 1) // TODO: port this pkt.m_nBodySize = C.AMF_DecodeInt24((*C.char)(buf)) buf = incPtr(buf, 3) // TODO: replace with ported version pkt.m_nTimeStamp = C.AMF_DecodeInt24((*C.char)(buf)) buf = incPtr(buf, 3) pkt.m_nTimeStamp |= C.uint(indxPtr(buf,0)) << 24 buf = incPtr(buf, 4) s2 -= 11 if ((pkt.m_packetType == RTMP_PACKET_TYPE_AUDIO || pkt.m_packetType == RTMP_PACKET_TYPE_VIDEO) && pkt.m_nTimeStamp == 0) || pkt.m_packetType == RTMP_PACKET_TYPE_INFO { pkt.m_headerType = RTMP_PACKET_SIZE_LARGE if pkt.m_packetType == RTMP_PACKET_TYPE_INFO { pkt.m_nBodySize += 16 } } else { pkt.m_headerType = RTMP_PACKET_SIZE_MEDIUM } // TODO: Port this if int(C.RTMPPacket_Alloc(pkt, pkt.m_nBodySize)) == 0 { log.Println("Failed to allocate packet") return 0 } enc = unsafe.Pointer(pkt.m_body) pend = incPtr(enc, int(pkt.m_nBodySize)) if pkt.m_packetType == RTMP_PACKET_TYPE_INFO { // TODO: Port this enc = unsafe.Pointer(C.AMF_EncodeString((*C.char)(enc), (*C.char)(pend), &setDataFrame)) pkt.m_nBytesRead = C.uint(math.Abs(float64(uintptr(enc) - uintptr(unsafe.Pointer(pkt.m_body))))) } } else { enc = incPtr(unsafe.Pointer(pkt.m_body), int(pkt.m_nBytesRead)) } num = int(pkt.m_nBodySize - pkt.m_nBytesRead) if num > s2 { num = s2 } memmove(enc,buf,uintptr(num)) 
//copy(ptrToSlice(enc, num), ptrToSlice(buf, num)) pkt.m_nBytesRead += C.uint(num) s2 -= num buf = incPtr(buf, num) if pkt.m_nBytesRead == pkt.m_nBodySize { // TODO: Port this ret = int(C.RTMP_SendPacket(r, pkt, 0)) // TODO: Port this C.RTMPPacket_Free(pkt) pkt.m_nBytesRead = 0 if ret == 0 { return -1 } buf = incPtr(buf, 4) s2 -= 4 if s2 < 0 { break } } } return size + s2 } // indxPtr replicates C array indexing using an unsafe pointer func indxPtr(ptr unsafe.Pointer, inc int) byte { return *(*byte)(incPtr(ptr,inc)) } // sliceToPtr get's the address of the first data element and returns as unsafe // pointer func sliceToPtr(data []byte) unsafe.Pointer { return unsafe.Pointer(&data[0]) } // ptrToSlice returns a slice given unsafe pointer and size - no allocation and // copying is required - same data is used. func ptrToSlice(data unsafe.Pointer, size int) []byte { var ret []byte shDest := (*reflect.SliceHeader)(unsafe.Pointer(&ret)) shDest.Data = uintptr(data) shDest.Len = size shDest.Cap = size return ret } // incPtr attempts to replicate C like pointer arithmatic functionality func incPtr(ptr unsafe.Pointer, inc int) unsafe.Pointer { return unsafe.Pointer(uintptr(ptr) + uintptr(inc)) } /* func sendPacket(r *C.RTMP, pkt *C.RTMPPacket, queue int) int { const prevPacket *C.RTMPPacket last := 0 var nSize, hSize, cSize, nChunkSize, tlen int var header, hptr, hend, buffer *C.char var goHbuf [RTMP_MAX_HEADER_SIZE]byte var hbuf = (*C.char)(unsafe.Pointer(&goHbuf[0])) var c C.char var t int32 var tbuf *C.char = nil var toff *C.char = nil if packet.m_nChannel >= r.m_channelsAllocatedOut { n := int(packet.m_nChannel+10) packets := C.realloc(r.m_vecChannelsOut, unsafe.Sizeof(*C.RTMPPacket) * n) if packets == 0 { C.free(r.m_vecChannelsOut) r.m_vecChannelsOut = nil r.m_channelsAllocatedOut = 0 return 0 } r.m_vecChannelsOut = packets C.memset(r.m_vecChannelsOut + r.m_channelsAllocatedOut, 0, unsafe.Sizeof(*RTMPPacket) * (n-r.m_channelsAllocatedOut)) r.m_channelsAllocatedOut = n } 
prevPackt = (*RTMPPacket)(unsafe.Pointer() ) } */ // Close terminates the rtmp connection func (s *session) Close() error { if s.rtmp == nil { return Err(3) } ret := C.end_session(s.rtmp) s.rtmp = nil if ret != 0 { return Err(ret) } return nil } var rtmpErrs = [...]string{ 1: "rtmp: not connected", 2: "rtmp: write error", 3: "rtmp: not started", } type Err uint func (e Err) Error() string { if 0 <= int(e) && int(e) < len(rtmpErrs) { s := rtmpErrs[e] if s != "" { return s } } return "rtmp: " + strconv.Itoa(int(e)) }