/* NAME rtmp.go DESCRIPTION See Readme.md AUTHOR Saxon Nelson-Milton Dan Kortschak LICENSE rtmp.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ package rtmp /* #cgo CFLAGS: -I/usr/local/include/librtmp #cgo LDFLAGS: -lrtmp -Wl,-rpath=/usr/local/lib #include #include RTMP* start_session(RTMP* rtmp, char* url, uint connect_timeout); int write_frame(RTMP* rtmp, char* data, uint data_length); int end_session(RTMP* rtmp); */ import "C" import ( "errors" "strconv" "unsafe" "log" "bitbucket.org/ausocean/av/tools" ) // Session provides an interface for sending flv tags over rtmp. type Session interface { Open() error Write([]byte) (int, error) Close() error } // session provides parameters required for an rtmp communication session. type session struct { rtmp *C.RTMP url string timeout uint } var _ Session = (*session)(nil) // NewSession returns a new session. func NewSession(url string, connectTimeout uint) Session { return &session{ url: url, timeout: connectTimeout, } } // Open establishes an rtmp connection with the url passed into the // constructor func (s *session) Open() error { if s.rtmp != nil { return errors.New("rtmp: attempt to start already running session") } var err error s.rtmp, err = C.start_session(s.rtmp, C.CString(s.url), C.uint(s.timeout)) if s.rtmp == nil { return err } return nil } func AVC(str string) *C.char { slice := [2]byte{str, C.sizeof(str)-1} return (*C.char)(unsafe.Pointer(&slice[0])) } func (s *session) rtmpWrite(r *C.RTMP, buf []byte) int { var pkt *C.RTMPPacket = &rtmp.m_write var pend, enc *C.char s2 := len(buf) var ret, num int pkt.m_nChannel = 0x04 pkt.m_nInfoField2 = r.m_stream_id for s2 > 0 { if !pkt.m_nBytesRead { if size < 11 { log.Printf("size: %d\n", size) log.Printf("too small \n") return 0 } if buf[0] == 'F' && buf[1] == 'L' && buf[2] == 'V' { buf += 13 s2 -= 13 } // pkt.m_packetType = *buf++ buf = buf[1:] pkt.m_packetType = buf[0] cChar := (*C.char)(unsafe.Pointer(&buf[0])) pkt.m_nBodySize = C.AMF_DecodeInt24(cChar) // C: buf+=3 buf = buf[3:] cChar = (*C.char)(unsafe.Pointer(&buf[0])) pkt.m_nTimeStamp = C.AMF_DecodeInt24(cChar) // C: buf+=3 buf = buf[3:] // C: pkt->m_nTimeStamp |= *buf++ << 24; buf = buf[1:] pkt.m_nTimeStamp |= ( buf[0] << 24 ) // C: buf+=3 buf = buf[3:] s2 -= 11 if ((pkt.m_packetType == C.RTMP_PACKET_TYPE_AUDIO || pkt.m_packetType == C.RTMP_PACKET_TYPE_VIDEO) && !pkt.m_nTimeStamp) || pkt.m_packetType == C.RTMP_PACKET_TYPE_INFO { pkt.m_headerType = C.RTMP_PACKET_SIZE_LARGE if pkt.m_packetType == C.RTMP_PACKET_TYPE_INFO { pkt.m_nBodySize += 16 } } else { pkt.m_headerType = C.RTMP_PACKET_SIZE_MEDIUM } // C: if (!RTMPPacket_Alloc(pkt, pkt->m_nBodySize)) if !helpers.UintToBool(uint( C.RTMPPacket_Alloc(pkt, pkt.m_nBodySize))) { log.Println("Failed to allocated packet") return 0 } // Note this is pointer arithmatic enc = pkt.m_body pend = enc + pkt.m_nBodySize if pkt.m_packetType == C.RTMP_PACKET_TYPE_INFO { // av_setDataFrame is a static const global in rtmp.c av_setDataFram := AVC("@setDataFrame") enc = C.AMF_EncodeString(enc, pend, &av_setDataFrame ) pkt.m_nBytesRead = enc - pkt.m_body } } else { enc = pkt.m_body + pkt.m_nBytesRead } num = pkt.m_nBodySize - pkt.m_nBytesRead if num > s2 { num = s2 } // C.memcpy(enc, buf, num) copy(enc,buf) pkt.m_nBytesRead += num s2 -= num buf += num if pkt.m_nBytesRead == pkt.m_nBodySize { ret = C.RTMP_SendPacket(r, pkt, 0) c.RTMPPacket_Free(pkt) pkt.m_nBytesRead = 0 if !uintToBool(ret) { return -1 } // buf += 4 buf = buf[4:] s2 -= 4 if s2 < 0 { break } } } return size+s2 } func (s *session) writeFrame(data []byte) uint { if C.RTMP_IsConnected(s.rtmp) <= 0 { return 1 } /* // If RTMP_Write returns less than or equal to zero then something is wrong if C.RTMP_Write(s.rtmp, (*C.char)(unsafe.Pointer(&data[0])), C.int(len(data))) <= 0 { return 2 } */ if rtmpWrite(s.rtmp, data) <= 0 { return 2 } return 0 } // Write writes a frame (flv tag) to the rtmp connection func (s *session) Write(data []byte) (int, error) { if s.rtmp == nil { return 0, Err(3) } ret := s.writeFrame(data) if ret != 0 { return 0, Err(ret) } return len(data), nil } // Close terminates the rtmp connection func (s *session) Close() error { if s.rtmp == nil { return Err(3) } ret := C.end_session(s.rtmp) s.rtmp = nil if ret != 0 { return Err(ret) } return nil } var rtmpErrs = [...]string{ 1: "rtmp: not connected", 2: "rtmp: write error", 3: "rtmp: not started", } type Err uint func (e Err) Error() string { if 0 <= int(e) && int(e) < len(rtmpErrs) { s := rtmpErrs[e] if s != "" { return s } } return "rtmp: " + strconv.Itoa(int(e)) }