diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index c585d13b..e85f1658 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -8,9 +8,10 @@ DESCRIPTION AUTHORS Saxon Nelson-Milton Dan Kortschak + Alan Noble LICENSE - rtmp.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + rtmp.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the @@ -142,40 +143,10 @@ var ( errCopying = errors.New("rtmp: copying error") ) -func startSession(rtmp *C_RTMP, u string, timeout uint) (*C_RTMP, error) { - rtmp = C_RTMP_Alloc() - C_RTMP_Init(rtmp) - rtmp.Link.timeout = timeout - err := C_RTMP_SetupURL(rtmp, u) - if err != nil { - C_RTMP_Close(rtmp) - return nil, err - } - - C_RTMP_EnableWrite(rtmp) - rtmp.nBufferMS = 3600 * 1000 - err = C_RTMP_Connect(rtmp, nil) - if err != nil { - C_RTMP_Close(rtmp) - return nil, err - } - - err = C_RTMP_ConnectStream(rtmp, 0) - if err != nil { - C_RTMP_Close(rtmp) - return nil, err - } - - return rtmp, nil -} - -func endSession(rtmp *C_RTMP) uint32 { - if rtmp == nil { - return 3 - } - - C_RTMP_Close(rtmp) - return 0 +// RTMPPacket_IsReady(a) +// rtmp.h +142 +func C_RTMPPacket_IsReady(pkt *packet) bool { + return pkt.bytesRead == pkt.bodySize } // uint32_t RTMP_GetTime(); @@ -184,55 +155,10 @@ func C_RTMP_GetTime() int32 { return int32(time.Now().UnixNano() / 1000000) } -// int RTMPPacket_Alloc(RTMPPacket* p, uint32_t nSize); -// rtmp.c +189 -func C_RTMPPacket_Alloc(p *C_RTMPPacket, nSize uint32) { - buf := make([]byte, RTMP_MAX_HEADER_SIZE+nSize) - p.header = buf - p.body = buf[RTMP_MAX_HEADER_SIZE:] - p.nBytesRead = 0 -} - -// void RTMPPacket_Free(RTMPPacket* p); -// rtmp.c +203 -func C_RTMPPacket_Free(p *C_RTMPPacket) { - if p.body != nil { - p.body = nil - } -} - -// RTMP* RTMP_IsConnected(); -// rtmp.c +317 -func C_RTMP_Alloc() *C_RTMP { - return &C_RTMP{} -} - -// void RTMP_Init(RTMP *r); -// rtmp.c +329 -func C_RTMP_Init(r *C_RTMP) { - *r = C_RTMP{} - r.inChunkSize = RTMP_DEFAULT_CHUNKSIZE - r.outChunkSize = RTMP_DEFAULT_CHUNKSIZE - r.nBufferMS = 30000 - r.nClientBW = 2500000 - r.nClientBW2 = 2 - r.nServerBW = 2500000 - r.fAudioCodecs = 3191.0 - r.fVideoCodecs = 252.0 - r.Link.timeout = 30 - r.Link.swfAge = 30 -} - // void RTMP_EnableWrite(RTMP *r); // rtmp.c +351 -func C_RTMP_EnableWrite(r *C_RTMP) { - r.Link.protocol |= RTMP_FEATURE_WRITE -} - -// int RTMP_IsConnected(RTMP *r); -// rtmp.c +363 -func C_RTMP_IsConnected(r *C_RTMP) bool { - return r.Link.conn != nil +func C_RTMP_EnableWrite(s *Session) { + s.link.protocol |= RTMP_FEATURE_WRITE } // void RTMP_SetBufferMS(RTMP *r, int size); @@ -246,31 +172,30 @@ func C_RTMP_IsConnected(r *C_RTMP) bool { // int RTMP_SetupURL(RTMP *r, char* url); // rtmp.c +757 // NOTE: code dealing with rtmp over http has been disregarded -func C_RTMP_SetupURL(r *C_RTMP, addr string) (err error) { - r.Link.protocol, r.Link.host, r.Link.port, r.Link.app, r.Link.playpath0, err = C_RTMP_ParseURL(addr) +func C_RTMP_SetupURL(s *Session, addr string) (err error) { + s.link.protocol, s.link.host, s.link.port, s.link.app, s.link.playpath, err = C_RTMP_ParseURL(addr) if err != nil { return err } - r.Link.playpath = r.Link.playpath0 - if r.Link.tcUrl == "" { - if r.Link.app != "" { - r.Link.tcUrl = fmt.Sprintf("%v://%v:%v/%v", - RTMPProtocolStringsLower[r.Link.protocol], r.Link.host, r.Link.port, r.Link.app) - r.Link.lFlags |= RTMP_LF_FTCU + if s.link.tcUrl == "" { + if s.link.app != "" { + s.link.tcUrl = fmt.Sprintf("%v://%v:%v/%v", + RTMPProtocolStringsLower[s.link.protocol], s.link.host, s.link.port, s.link.app) + s.link.lFlags |= RTMP_LF_FTCU } else { - r.Link.tcUrl = addr + s.link.tcUrl = addr } } - if r.Link.port == 0 { + if s.link.port == 0 { switch { - case (r.Link.protocol & RTMP_FEATURE_SSL) != 0: - r.Link.port = 433 - case (r.Link.protocol & RTMP_FEATURE_HTTP) != 0: - r.Link.port = 80 + case (s.link.protocol & RTMP_FEATURE_SSL) != 0: + s.link.port = 433 + case (s.link.protocol & RTMP_FEATURE_HTTP) != 0: + s.link.port = 80 default: - r.Link.port = 1935 + s.link.port = 1935 } } return nil @@ -278,19 +203,19 @@ func C_RTMP_SetupURL(r *C_RTMP, addr string) (err error) { // int RTMP_Connect(RTMP *r, RTMPPacket* cp); // rtmp.c +1032 -func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) error { - addr, err := net.ResolveTCPAddr("tcp4", r.Link.host+":"+strconv.Itoa(int(r.Link.port))) +func C_RTMP_Connect(s *Session, cp *packet) error { + addr, err := net.ResolveTCPAddr("tcp4", s.link.host+":"+strconv.Itoa(int(s.link.port))) if err != nil { return err } - r.Link.conn, err = net.DialTCP("tcp4", nil, addr) + s.link.conn, err = net.DialTCP("tcp4", nil, addr) if err != nil { return err } if debugMode { log.Println("... connected, handshaking...") } - err = C_HandShake(r, 1) + err = C_HandShake(s, 1) if err != nil { log.Println("C_RTMP_Connect1: handshake failed!") return errHandshake @@ -298,7 +223,7 @@ func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) error { if debugMode { log.Println("... handshaked...") } - err = C_SendConnectPacket(r, cp) + err = C_SendConnectPacket(s, cp) if err != nil { log.Println("RTMP connect failed!") return errConnSend @@ -312,41 +237,39 @@ func C_RTMP_Connect(r *C_RTMP, cp *C_RTMPPacket) error { // int RTMP_ConnectStream(RTMP* r, int seekTime); // rtmp.c +1099 -// Side effects: r.bPlaying is set true upon successful connection -func C_RTMP_ConnectStream(r *C_RTMP, seekTime int32) error { - var packet C_RTMPPacket +// Side effects: s.isPlaying is set true upon successful connection +func C_RTMP_ConnectStream(s *Session, seekTime int32) error { + var pkt packet if seekTime > 0 { - r.Link.seekTime = seekTime + s.link.seekTime = seekTime } - r.mediaChannel = 0 - - for !r.bPlaying && C_RTMP_IsConnected(r) { - err := C_RTMP_ReadPacket(r, &packet) + for !s.isPlaying && s.isConnected() { + err := C_RTMP_ReadPacket(s, &pkt) if err != nil { break } // TODO: port is ready - if C_RTMPPacket_IsReady(&packet) { - if packet.nBodySize == 0 { + if C_RTMPPacket_IsReady(&pkt) { + if pkt.bodySize == 0 { continue } - if packet.packetType == RTMP_PACKET_TYPE_AUDIO || - packet.packetType == RTMP_PACKET_TYPE_VIDEO || - packet.packetType == RTMP_PACKET_TYPE_INFO { + if pkt.packetType == RTMP_PACKET_TYPE_AUDIO || + pkt.packetType == RTMP_PACKET_TYPE_VIDEO || + pkt.packetType == RTMP_PACKET_TYPE_INFO { log.Println("C_RTMP_ConnectStream: got packet before play()! Ignoring.") - C_RTMPPacket_Free(&packet) + pkt.body = nil continue } - C_RTMP_ClientPacket(r, &packet) - C_RTMPPacket_Free(&packet) + C_RTMP_ClientPacket(s, &pkt) + pkt.body = nil } } - if !r.bPlaying { + if !s.isPlaying { return errConnStream } return nil @@ -355,13 +278,13 @@ func C_RTMP_ConnectStream(r *C_RTMP, seekTime int32) error { // int RTMP_ClientPacket() // rtmp.c +1226 // NOTE cases have been commented out that are not currently used by AusOcean -func C_RTMP_ClientPacket(r *C_RTMP, packet *C_RTMPPacket) int32 { - var bHasMediaPacket int32 - switch packet.packetType { +func C_RTMP_ClientPacket(s *Session, pkt *packet) int32 { + var hasMediaPacket int32 + switch pkt.packetType { case RTMP_PACKET_TYPE_CHUNK_SIZE: // TODO: port this - C_HandleChangeChunkSize(r, packet) + C_HandleChangeChunkSize(s, pkt) case RTMP_PACKET_TYPE_BYTES_READ_REPORT: // TODO: usue new logger here @@ -372,15 +295,15 @@ func C_RTMP_ClientPacket(r *C_RTMP, packet *C_RTMPPacket) int32 { /* log.Println("RTMP_PACKET_TYPE_CONTROL") // TODO: port this - C.HandleCtrl(r, packet) + C.HandleCtrl(s, pkt) */ case RTMP_PACKET_TYPE_SERVER_BW: // TODO: port this - C_HandlServerBW(r, packet) + C_HandlServerBW(s, pkt) case RTMP_PACKET_TYPE_CLIENT_BW: // TODO: port this - C_HandleClientBW(r, packet) + C_HandleClientBW(s, pkt) case RTMP_PACKET_TYPE_AUDIO: panic("Unsupported packet type RTMP_PACKET_TYPE_AUDIO") @@ -397,13 +320,13 @@ func C_RTMP_ClientPacket(r *C_RTMP, packet *C_RTMPPacket) int32 { case RTMP_PACKET_TYPE_INVOKE: log.Println("RTMP_PACKET_TYPE_INVOKE:") // TODO use new logger here - //RTMP_Log(RTMP_LOGDEBUG, "%s, received: invoke %u bytes", __FUNCTION__,packet.nBodySize); + //RTMP_Log(RTMP_LOGDEBUG, "%s, received: invoke %u bytes", __FUNCTION__,pkt.bodySize); - err := C_HandleInvoke(r, packet.body[:packet.nBodySize]) + err := C_HandleInvoke(s, pkt.body[:pkt.bodySize]) if err != nil { // This will never happen with the methods we implement. log.Println("HasMediaPacket") - bHasMediaPacket = 2 + hasMediaPacket = 2 } case RTMP_PACKET_TYPE_FLASH_VIDEO: @@ -411,29 +334,29 @@ func C_RTMP_ClientPacket(r *C_RTMP, packet *C_RTMPPacket) int32 { default: // TODO use new logger here - // RTMP_Log(RTMP_LOGDEBUG, "%s, unknown packet type received: 0x%02x", __FUNCTION__,packet.packetType); + // RTMP_Log(RTMP_LOGDEBUG, "%s, unknown packet type received: 0x%02x", __FUNCTION__,pkt.packetType); } - return bHasMediaPacket + return hasMediaPacket } // int ReadN(RTMP* r, char* buffer, int n); // rtmp.c +1390 -func C_ReadN(r *C_RTMP, buf []byte) error { - err := r.Link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(r.Link.timeout))) +func C_ReadN(s *Session, buf []byte) error { + err := s.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) if err != nil { return err } - n, err := io.ReadFull(r.Link.conn, buf) + n, err := io.ReadFull(s.link.conn, buf) if err != nil { if debugMode { log.Printf("C_ReadN error: %v\n", err) } - C_RTMP_Close(r) + s.close() return err } - r.nBytesIn += int32(n) - if r.nBytesIn > (r.nBytesInSent + r.nClientBW/10) { - err := C_SendBytesReceived(r) + s.nBytesIn += int32(n) + if s.nBytesIn > (s.nBytesInSent + s.clientBW/10) { + err := C_SendBytesReceived(s) if err != nil { return err } @@ -443,18 +366,18 @@ func C_ReadN(r *C_RTMP, buf []byte) error { // int WriteN(RTMP* r, const char* buffer, int n); // rtmp.c +1502 -func C_WriteN(r *C_RTMP, buf []byte) error { +func C_WriteN(s *Session, buf []byte) error { //ToDo: consider using a different timeout for writes than for reads - err := r.Link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(r.Link.timeout))) + err := s.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) if err != nil { return err } - _, err = r.Link.conn.Write(buf) + _, err = s.link.conn.Write(buf) if err != nil { if debugMode { log.Printf("C_WriteN, RTMP send error: %v\n", err) } - C_RTMP_Close(r) + s.close() return err } return nil @@ -462,68 +385,68 @@ func C_WriteN(r *C_RTMP, buf []byte) error { // int SendConnectPacket(RTMP* r, RTMPPacket* cp); // rtmp.c +1579 -func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) error { +func C_SendConnectPacket(s *Session, cp *packet) error { if cp != nil { - return C_RTMP_SendPacket(r, cp, 1) + return C_RTMP_SendPacket(s, cp, 1) } var pbuf [4096]byte - packet := C_RTMPPacket{ - nChannel: 0x03, + pkt := packet{ + channel: 0x03, headerType: RTMP_PACKET_SIZE_LARGE, packetType: RTMP_PACKET_TYPE_INVOKE, - nTimeStamp: 0, - nInfoField2: 0, + timestamp: 0, + info: 0, hasAbsTimestamp: false, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } - enc := packet.body + enc := pkt.body enc = C_AMF_EncodeString(enc, av_connect) if enc == nil { return errEncoding } - r.numInvokes += 1 - enc = C_AMF_EncodeNumber(enc, float64(r.numInvokes)) + s.numInvokes += 1 + enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_OBJECT enc = enc[1:] - enc = C_AMF_EncodeNamedString(enc, av_app, r.Link.app) + enc = C_AMF_EncodeNamedString(enc, av_app, s.link.app) if enc == nil { return errEncoding } - if r.Link.protocol&RTMP_FEATURE_WRITE != 0 { + if s.link.protocol&RTMP_FEATURE_WRITE != 0 { enc = C_AMF_EncodeNamedString(enc, av_type, av_nonprivate) if enc == nil { return errEncoding } } - if r.Link.flashVer != "" { - enc = C_AMF_EncodeNamedString(enc, av_flashVer, r.Link.flashVer) + if s.link.flashVer != "" { + enc = C_AMF_EncodeNamedString(enc, av_flashVer, s.link.flashVer) if enc == nil { return errEncoding } } - if r.Link.swfUrl != "" { - enc = C_AMF_EncodeNamedString(enc, av_swfUrl, r.Link.swfUrl) + if s.link.swfUrl != "" { + enc = C_AMF_EncodeNamedString(enc, av_swfUrl, s.link.swfUrl) if enc == nil { return errEncoding } } - if r.Link.tcUrl != "" { - enc = C_AMF_EncodeNamedString(enc, av_tcUrl, r.Link.tcUrl) + if s.link.tcUrl != "" { + enc = C_AMF_EncodeNamedString(enc, av_tcUrl, s.link.tcUrl) if enc == nil { return errEncoding } } - if r.Link.protocol&RTMP_FEATURE_WRITE == 0 { + if s.link.protocol&RTMP_FEATURE_WRITE == 0 { enc = C_AMF_EncodeNamedBoolean(enc, av_fpad, false) if enc == nil { return errEncoding @@ -532,11 +455,11 @@ func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) error { if enc == nil { return errEncoding } - enc = C_AMF_EncodeNamedNumber(enc, av_audioCodecs, r.fAudioCodecs) + enc = C_AMF_EncodeNamedNumber(enc, av_audioCodecs, s.audioCodecs) if enc == nil { return errEncoding } - enc = C_AMF_EncodeNamedNumber(enc, av_videoCodecs, r.fVideoCodecs) + enc = C_AMF_EncodeNamedNumber(enc, av_videoCodecs, s.videoCodecs) if enc == nil { return errEncoding } @@ -544,16 +467,16 @@ func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) error { if enc == nil { return errEncoding } - if r.Link.pageUrl != "" { - enc = C_AMF_EncodeNamedString(enc, av_pageUrl, r.Link.pageUrl) + if s.link.pageUrl != "" { + enc = C_AMF_EncodeNamedString(enc, av_pageUrl, s.link.pageUrl) if enc == nil { return errEncoding } } } - if r.fEncoding != 0.0 || r.bSendEncoding { - enc = C_AMF_EncodeNamedNumber(enc, av_objectEncoding, r.fEncoding) + if s.encoding != 0.0 || s.sendEncoding { + enc = C_AMF_EncodeNamedNumber(enc, av_objectEncoding, s.encoding) if enc == nil { return errEncoding } @@ -565,200 +488,200 @@ func C_SendConnectPacket(r *C_RTMP, cp *C_RTMPPacket) error { enc = enc[3:] /* add auth string */ - if r.Link.auth != "" { - enc = C_AMF_EncodeBoolean(enc, r.Link.lFlags&RTMP_LF_AUTH != 0) + if s.link.auth != "" { + enc = C_AMF_EncodeBoolean(enc, s.link.lFlags&RTMP_LF_AUTH != 0) if enc == nil { return errEncoding } - enc = C_AMF_EncodeString(enc, r.Link.auth) + enc = C_AMF_EncodeString(enc, s.link.auth) if enc == nil { return errEncoding } } - for i := range r.Link.extras.o_props { - enc = C_AMF_PropEncode(&r.Link.extras.o_props[i], enc) + for i := range s.link.extras.o_props { + enc = C_AMF_PropEncode(&s.link.extras.o_props[i], enc) if enc == nil { return errEncoding } } - packet.nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(r, &packet, 1) + return C_RTMP_SendPacket(s, &pkt, 1) } // int RTMP_SendCreateStream(RTMP* r); // rtmp.c +1725 -func C_RTMP_SendCreateStream(r *C_RTMP) error { +func C_RTMP_SendCreateStream(s *Session) error { var pbuf [256]byte - packet := C_RTMPPacket{ - nChannel: 0x03, /* control channel (invoke) */ + pkt := packet{ + channel: 0x03, /* control channel (invoke) */ headerType: RTMP_PACKET_SIZE_MEDIUM, packetType: RTMP_PACKET_TYPE_INVOKE, - nTimeStamp: 0, - nInfoField2: 0, + timestamp: 0, + info: 0, hasAbsTimestamp: false, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } - enc := packet.body + enc := pkt.body enc = C_AMF_EncodeString(enc, av_createStream) if enc == nil { return errEncoding } - r.numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(r.numInvokes)) + s.numInvokes++ + enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] - packet.nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(r, &packet, 1) + return C_RTMP_SendPacket(s, &pkt, 1) } // int SendReleaseStream(RTMP* r); // rtmp.c +1816 -func C_SendReleaseStream(r *C_RTMP) error { +func C_SendReleaseStream(s *Session) error { var pbuf [1024]byte - packet := C_RTMPPacket{ - nChannel: 0x03, /* control channel (invoke) */ + pkt := packet{ + channel: 0x03, /* control channel (invoke) */ headerType: RTMP_PACKET_SIZE_MEDIUM, packetType: RTMP_PACKET_TYPE_INVOKE, - nTimeStamp: 0, - nInfoField2: 0, + timestamp: 0, + info: 0, hasAbsTimestamp: false, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } - enc := packet.body + enc := pkt.body enc = C_AMF_EncodeString(enc, av_releaseStream) if enc == nil { return errEncoding } - r.numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(r.numInvokes)) + s.numInvokes++ + enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] - enc = C_AMF_EncodeString(enc, r.Link.playpath) + enc = C_AMF_EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } - packet.nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(r, &packet, 0) + return C_RTMP_SendPacket(s, &pkt, 0) } // int SendFCPublish(RTMP* r); // rtmp.c +1846 -func C_SendFCPublish(r *C_RTMP) error { +func C_SendFCPublish(s *Session) error { var pbuf [1024]byte - packet := C_RTMPPacket{ - nChannel: 0x03, /* control channel (invoke) */ + pkt := packet{ + channel: 0x03, /* control channel (invoke) */ headerType: RTMP_PACKET_SIZE_MEDIUM, packetType: RTMP_PACKET_TYPE_INVOKE, - nTimeStamp: 0, - nInfoField2: 0, + timestamp: 0, + info: 0, hasAbsTimestamp: false, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } - enc := packet.body + enc := pkt.body enc = C_AMF_EncodeString(enc, av_FCPublish) if enc == nil { return errEncoding } - r.numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(r.numInvokes)) + s.numInvokes++ + enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] - enc = C_AMF_EncodeString(enc, r.Link.playpath) + enc = C_AMF_EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } - packet.nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(r, &packet, 0) + return C_RTMP_SendPacket(s, &pkt, 0) } // int SendFCUnpublish(RTMP *r); // rtmp.c +1875 -func C_SendFCUnpublish(r *C_RTMP) error { +func C_SendFCUnpublish(s *Session) error { var pbuf [1024]byte - packet := C_RTMPPacket{ - nChannel: 0x03, /* control channel (invoke) */ + pkt := packet{ + channel: 0x03, /* control channel (invoke) */ headerType: RTMP_PACKET_SIZE_MEDIUM, packetType: RTMP_PACKET_TYPE_INVOKE, - nTimeStamp: 0, - nInfoField2: 0, + timestamp: 0, + info: 0, hasAbsTimestamp: false, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } - enc := packet.body + enc := pkt.body enc = C_AMF_EncodeString(enc, av_FCUnpublish) if enc == nil { return errEncoding } - r.numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(r.numInvokes)) + s.numInvokes++ + enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] - enc = C_AMF_EncodeString(enc, r.Link.playpath) + enc = C_AMF_EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } - packet.nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(r, &packet, 0) + return C_RTMP_SendPacket(s, &pkt, 0) } // int SendPublish(RTMP* r); // rtmp.c +1908 -func C_SendPublish(r *C_RTMP) error { +func C_SendPublish(s *Session) error { var pbuf [1024]byte - packet := C_RTMPPacket{ - nChannel: 0x04, /* source channel (invoke) */ + pkt := packet{ + channel: 0x04, /* source channel (invoke) */ headerType: RTMP_PACKET_SIZE_LARGE, packetType: RTMP_PACKET_TYPE_INVOKE, - nTimeStamp: 0, - nInfoField2: r.streamID, + timestamp: 0, + info: s.streamID, hasAbsTimestamp: false, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } - enc := packet.body + enc := pkt.body enc = C_AMF_EncodeString(enc, av_publish) if enc == nil { return errEncoding } - r.numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(r.numInvokes)) + s.numInvokes++ + enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] - enc = C_AMF_EncodeString(enc, r.Link.playpath) + enc = C_AMF_EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } @@ -767,34 +690,34 @@ func C_SendPublish(r *C_RTMP) error { return errEncoding } - packet.nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(r, &packet, 1) + return C_RTMP_SendPacket(s, &pkt, 1) } // int // SendDeleteStream(RTMP *r, double dStreamId) // rtmp.c +1942 -func C_SendDeleteStream(r *C_RTMP, dStreamId float64) error { +func C_SendDeleteStream(s *Session, dStreamId float64) error { var pbuf [256]byte - packet := C_RTMPPacket{ - nChannel: 0x03, /* control channel (invoke) */ + pkt := packet{ + channel: 0x03, /* control channel (invoke) */ headerType: RTMP_PACKET_SIZE_MEDIUM, packetType: RTMP_PACKET_TYPE_INVOKE, - nTimeStamp: 0, - nInfoField2: 0, + timestamp: 0, + info: 0, hasAbsTimestamp: false, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } - enc := packet.body + enc := pkt.body enc = C_AMF_EncodeString(enc, av_deleteStream) if enc == nil { return errEncoding } - r.numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(r.numInvokes)) + s.numInvokes++ + enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } @@ -804,83 +727,83 @@ func C_SendDeleteStream(r *C_RTMP, dStreamId float64) error { if enc == nil { return errEncoding } - packet.nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) /* no response expected */ - return C_RTMP_SendPacket(r, &packet, 0) + return C_RTMP_SendPacket(s, &pkt, 0) } // int SendBytesReceived(RTMP* r); // rtmp.c +2080 -func C_SendBytesReceived(r *C_RTMP) error { +func C_SendBytesReceived(s *Session) error { var pbuf [256]byte - packet := C_RTMPPacket{ - nChannel: 0x02, /* control channel (invoke) */ + pkt := packet{ + channel: 0x02, /* control channel (invoke) */ headerType: RTMP_PACKET_SIZE_MEDIUM, packetType: RTMP_PACKET_TYPE_BYTES_READ_REPORT, - nTimeStamp: 0, - nInfoField2: 0, + timestamp: 0, + info: 0, hasAbsTimestamp: false, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } - enc := packet.body + enc := pkt.body - r.nBytesInSent = r.nBytesIn - enc = C_AMF_EncodeInt32(enc, r.nBytesIn) + s.nBytesInSent = s.nBytesIn + enc = C_AMF_EncodeInt32(enc, s.nBytesIn) if enc == nil { return errEncoding } - packet.nBodySize = 4 + pkt.bodySize = 4 - return C_RTMP_SendPacket(r, &packet, 0) + return C_RTMP_SendPacket(s, &pkt, 0) } // int SendCheckBW(RTMP* r); // rtmp.c +2105 -func C_SendCheckBW(r *C_RTMP) error { +func C_SendCheckBW(s *Session) error { var pbuf [256]byte - packet := C_RTMPPacket{ - nChannel: 0x03, /* control channel (invoke) */ + pkt := packet{ + channel: 0x03, /* control channel (invoke) */ headerType: RTMP_PACKET_SIZE_LARGE, packetType: RTMP_PACKET_TYPE_INVOKE, - nTimeStamp: 0, - nInfoField2: 0, + timestamp: 0, + info: 0, hasAbsTimestamp: false, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } - enc := packet.body + enc := pkt.body enc = C_AMF_EncodeString(enc, av__checkbw) if enc == nil { return errEncoding } - r.numInvokes++ - enc = C_AMF_EncodeNumber(enc, float64(r.numInvokes)) + s.numInvokes++ + enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] - packet.nBodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) + pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) - return C_RTMP_SendPacket(r, &packet, 0) + return C_RTMP_SendPacket(s, &pkt, 0) } -// void AV_erase(C_RTMP_METHOD* vals, int* num, int i, int freeit); +// void AV_erase(method* vals, int* num, int i, int freeit); // rtmp.c +2393 -func C_AV_erase(m []C_RTMP_METHOD, i int) []C_RTMP_METHOD { +func C_AV_erase(m []method, i int) []method { copy(m[i:], m[i+1:]) - m[len(m)-1] = C_RTMP_METHOD{} + m[len(m)-1] = method{} return m[:len(m)-1] } -// int HandleInvoke(RTMP* r, const char* body, unsigned int nBodySize); +// int HandleInvoke(RTMP* r, const char* body, unsigned int bodySize); // rtmp.c +2912 -// Side effects: r.bPlaying set to true upon av_NetStream_Publish_Start -func C_HandleInvoke(r *C_RTMP, body []byte) error { +// Side effects: s.isPlaying set to true upon av_NetStream_Publish_Start +func C_HandleInvoke(s *Session, body []byte) error { if body[0] != 0x02 { return errInvalidBody } @@ -893,18 +816,18 @@ func C_HandleInvoke(r *C_RTMP, body []byte) error { // NOTE we don't really need this ?? still functions without it //C.AMF_Dump(&obj) //C.AMFProp_GetString(C_AMF_GetProp(&obj, nil, 0), &method) - method := C_AMFProp_GetString(C_AMF_GetProp(&obj, "", 0)) + meth := C_AMFProp_GetString(C_AMF_GetProp(&obj, "", 0)) txn := C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 1)) // TODO use new logger here // RTMP_Log(RTMP_LOGDEBUG, "%s, server invoking <%s>", __FUNCTION__, method.av_val); - switch method { + switch meth { case av__result: var methodInvoked string - for i, m := range r.methodCalls { + for i, m := range s.methodCalls { if float64(m.num) == txn { methodInvoked = m.name - r.methodCalls = C_AV_erase(r.methodCalls, i) + s.methodCalls = C_AV_erase(s.methodCalls, i) break } } @@ -919,27 +842,27 @@ func C_HandleInvoke(r *C_RTMP, body []byte) error { //methodInvoked.av_val); switch methodInvoked { case av_connect: - if r.Link.token != "" { + if s.link.token != "" { panic("No support for link token") } - if (r.Link.protocol & RTMP_FEATURE_WRITE) != 0 { - C_SendReleaseStream(r) - C_SendFCPublish(r) + if (s.link.protocol & RTMP_FEATURE_WRITE) != 0 { + C_SendReleaseStream(s) + C_SendFCPublish(s) } else { panic("Link protocol has no RTMP_FEATURE_WRITE") } - C_RTMP_SendCreateStream(r) - if (r.Link.protocol & RTMP_FEATURE_WRITE) == 0 { + C_RTMP_SendCreateStream(s) + if (s.link.protocol & RTMP_FEATURE_WRITE) == 0 { panic("Link protocol has no RTMP_FEATURE_WRITE") } case av_createStream: - r.streamID = int32(C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 3))) + s.streamID = int32(C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 3))) - if r.Link.protocol&RTMP_FEATURE_WRITE != 0 { - C_SendPublish(r) + if s.link.protocol&RTMP_FEATURE_WRITE != 0 { + C_SendPublish(s) } else { panic("Link protocol has no RTMP_FEATURE_WRITE") } @@ -950,8 +873,8 @@ func C_HandleInvoke(r *C_RTMP, body []byte) error { //C.free(unsafe.Pointer(methodInvoked.av_val)) case av_onBWDone: - if r.nBWCheckCounter == 0 { - C_SendCheckBW(r) + if s.bwCheckCounter == 0 { + C_SendCheckBW(s) } case av_onFCUnsubscribe, av_onFCSubscribe: @@ -989,10 +912,10 @@ func C_HandleInvoke(r *C_RTMP, body []byte) error { panic("Unsupported method av_NetStream_Play_Start/av_NetStream_Play_PublishNotify") case av_NetStream_Publish_Start: - r.bPlaying = true - for i, m := range r.methodCalls { + s.isPlaying = true + for i, m := range s.methodCalls { if m.name == av_publish { - r.methodCalls = C_AV_erase(r.methodCalls, i) + s.methodCalls = C_AV_erase(s.methodCalls, i) break } } @@ -1011,7 +934,7 @@ func C_HandleInvoke(r *C_RTMP, body []byte) error { panic("Unsupported method av_playlist_ready") default: - panic(fmt.Sprintf("unknown method: %q", method)) + panic(fmt.Sprintf("unknown method: %q", meth)) } leave: C_AMF_Reset(&obj) @@ -1021,37 +944,36 @@ leave: // void HandleChangeChunkSize(RTMP* r, const RTMPPacket* packet); // rtmp.c +3345 -func C_HandleChangeChunkSize(r *C_RTMP, packet *C_RTMPPacket) { - if packet.nBodySize >= 4 { - //r.inChunkSize = int32(C.AMF_DecodeInt32((*byte)(unsafe.Pointer(packet.body)))) - r.inChunkSize = int32(C_AMF_DecodeInt32(packet.body[:4])) +func C_HandleChangeChunkSize(s *Session, pkt *packet) { + if pkt.bodySize >= 4 { + s.inChunkSize = int32(C_AMF_DecodeInt32(pkt.body[:4])) // TODO use new logger here - // RTMP_Log(RTMP_LOGDEBUG, "%s, received: chunk size change to %d", __FUNCTION__, r.inChunkSize); + // RTMP_Log(RTMP_LOGDEBUG, "%s, received: chunk size change to %d", __FUNCTION__, s.inChunkSize); } } // void HandleServerBW(RTMP* r, const RTMPPacket* packet); // rtmp.c +3508 -func C_HandlServerBW(r *C_RTMP, packet *C_RTMPPacket) { - r.nServerBW = int32(C_AMF_DecodeInt32(packet.body[:4])) +func C_HandlServerBW(s *Session, pkt *packet) { + s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) // TODO use new logger here - // RTMP_Log(RTMP_LOGDEBUG, "%s: server BW = %d", __FUNCTION__, r.nServerBW); + // RTMP_Log(RTMP_LOGDEBUG, "%s: server BW = %d", __FUNCTION__, s.serverBW); } // void HandleClientBW(RTMP* r, const RTMPPacket* packet); // rtmp.c +3515 -func C_HandleClientBW(r *C_RTMP, packet *C_RTMPPacket) { - r.nClientBW = int32(C_AMF_DecodeInt32(packet.body[:4])) - //r.nClientBW = int32(C.AMF_DecodeInt32((*byte)(unsafe.Pointer(packet.body)))) +func C_HandleClientBW(s *Session, pkt *packet) { + s.clientBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) + //s.clientBW = int32(C.AMF_DecodeInt32((*byte)(unsafe.Pointer(pkt.body)))) - if packet.nBodySize > 4 { - r.nClientBW2 = packet.body[4] + if pkt.bodySize > 4 { + s.clientBW2 = pkt.body[4] } else { - r.nClientBW2 = 0xff + s.clientBW2 = 0xff } // TODO use new logger here - // RTMP_Log(RTMP_LOGDEBUG, "%s: client BW = %d %d", __FUNCTION__, r.nClientBW, - //r.nClientBW2); + // RTMP_Log(RTMP_LOGDEBUG, "%s: client BW = %d %d", __FUNCTION__, s.clientBW, + //s.clientBW2); } // static int DecodeInt32LE(const char* data); @@ -1069,171 +991,177 @@ func C_EncodeInt32LE(dst []byte, v int32) int32 { // int RTMP_ReadPacket(RTMP* r, RTMPPacket* packet); // rtmp.c +3550 -func C_RTMP_ReadPacket(r *C_RTMP, packet *C_RTMPPacket) error { +func C_RTMP_ReadPacket(s *Session, pkt *packet) error { var hbuf [RTMP_MAX_HEADER_SIZE]byte header := hbuf[:] - err := C_ReadN(r, header[:1]) + err := C_ReadN(s, header[:1]) if err != nil { log.Println("C_RTMP_ReadPacket: failed to read RTMP packet header!") return err } - packet.headerType = (header[0] & 0xc0) >> 6 - packet.nChannel = int32(header[0] & 0x3f) + pkt.headerType = (header[0] & 0xc0) >> 6 + pkt.channel = int32(header[0] & 0x3f) header = header[1:] switch { - case packet.nChannel == 0: - err = C_ReadN(r, header[:1]) + case pkt.channel == 0: + err = C_ReadN(s, header[:1]) if err != nil { log.Println("C_RTMP_ReadPacket: failed to read rtmp packet header 2nd byte.") return err } header = header[1:] - packet.nChannel = int32(header[0]) + 64 + pkt.channel = int32(header[0]) + 64 - case packet.nChannel == 1: - err = C_ReadN(r, header[:2]) + case pkt.channel == 1: + err = C_ReadN(s, header[:2]) if err != nil { log.Println("C_RTMP_ReadPacket: failed to read RTMP packet 3rd byte") return err } header = header[2:] - packet.nChannel = int32(binary.BigEndian.Uint16(header[:2])) + 64 + pkt.channel = int32(binary.BigEndian.Uint16(header[:2])) + 64 } - if packet.nChannel >= r.channelsAllocatedIn { - n := packet.nChannel + 10 - timestamp := append(r.channelTimestamp, make([]int32, 10)...) + if pkt.channel >= s.channelsAllocatedIn { + n := pkt.channel + 10 + timestamp := append(s.channelTimestamp, make([]int32, 10)...) - var packets []*C_RTMPPacket - if r.vecChannelsIn == nil { - packets = make([]*C_RTMPPacket, n) + var pkts []*packet + if s.vecChannelsIn == nil { + pkts = make([]*packet, n) } else { - packets = append(r.vecChannelsIn[:packet.nChannel:packet.nChannel], make([]*C_RTMPPacket, 10)...) + pkts = append(s.vecChannelsIn[:pkt.channel:pkt.channel], make([]*packet, 10)...) } - r.channelTimestamp = timestamp - r.vecChannelsIn = packets + s.channelTimestamp = timestamp + s.vecChannelsIn = pkts - for i := int(r.channelsAllocatedIn); i < len(r.channelTimestamp); i++ { - r.channelTimestamp[i] = 0 + for i := int(s.channelsAllocatedIn); i < len(s.channelTimestamp); i++ { + s.channelTimestamp[i] = 0 } - for i := int(r.channelsAllocatedIn); i < int(n); i++ { - r.vecChannelsIn[i] = nil + for i := int(s.channelsAllocatedIn); i < int(n); i++ { + s.vecChannelsIn[i] = nil } - r.channelsAllocatedIn = n + s.channelsAllocatedIn = n } - nSize := packetSize[packet.headerType] + size := packetSize[pkt.headerType] switch { - case nSize == RTMP_LARGE_HEADER_SIZE: - packet.hasAbsTimestamp = true - case nSize < RTMP_LARGE_HEADER_SIZE: - if r.vecChannelsIn[packet.nChannel] != nil { - *packet = *(r.vecChannelsIn[packet.nChannel]) + case size == RTMP_LARGE_HEADER_SIZE: + pkt.hasAbsTimestamp = true + case size < RTMP_LARGE_HEADER_SIZE: + if s.vecChannelsIn[pkt.channel] != nil { + *pkt = *(s.vecChannelsIn[pkt.channel]) } } - nSize-- + size-- - if nSize > 0 { - err = C_ReadN(r, header[:nSize]) + if size > 0 { + err = C_ReadN(s, header[:size]) if err != nil { - log.Println("C_RTMP_ReadPacket: failed to read rtmp packet header.") + log.Println("C_RTMP_ReadPacket: failed to read rtmp packet heades.") return err } } - hSize := len(hbuf) - len(header) + nSize + hSize := len(hbuf) - len(header) + size - if nSize >= 3 { - packet.nTimeStamp = C_AMF_DecodeInt24(header[:3]) + if size >= 3 { + pkt.timestamp = C_AMF_DecodeInt24(header[:3]) - if nSize >= 6 { - packet.nBodySize = C_AMF_DecodeInt24(header[3:6]) - packet.nBytesRead = 0 + if size >= 6 { + pkt.bodySize = C_AMF_DecodeInt24(header[3:6]) + pkt.bytesRead = 0 - if nSize > 6 { - packet.packetType = header[6] + if size > 6 { + pkt.packetType = header[6] - if nSize == 11 { - packet.nInfoField2 = C_DecodeInt32LE(header[7:11]) + if size == 11 { + pkt.info = C_DecodeInt32LE(header[7:11]) } } } } - extendedTimestamp := packet.nTimeStamp == 0xffffff + extendedTimestamp := pkt.timestamp == 0xffffff if extendedTimestamp { - err = C_ReadN(r, header[nSize:nSize+4]) + err = C_ReadN(s, header[size:size+4]) if err != nil { log.Println("RTMPRead_Packet: Failed to read extended timestamp") return err } // TODO: port this - packet.nTimeStamp = C_AMF_DecodeInt32(header[nSize : nSize+4]) + pkt.timestamp = C_AMF_DecodeInt32(header[size : size+4]) hSize += 4 } - if packet.nBodySize > 0 && packet.body == nil { - // TODO: port this - C_RTMPPacket_Alloc(packet, packet.nBodySize) - packet.headerType = (hbuf[0] & 0xc0) >> 6 + if pkt.bodySize > 0 && pkt.body == nil { + resizePacket(pkt, pkt.bodySize, (hbuf[0]&0xc0)>>6) } - nToRead := int32(packet.nBodySize - packet.nBytesRead) - nChunk := r.inChunkSize + toRead := int32(pkt.bodySize - pkt.bytesRead) + chunkSize := s.inChunkSize - if nToRead < nChunk { - nChunk = nToRead + if toRead < chunkSize { + chunkSize = toRead } - if packet.chunk != nil { - packet.chunk.headerSize = int32(hSize) - copy(packet.chunk.header[:], hbuf[:hSize]) - packet.chunk.data = packet.body[packet.nBytesRead : packet.nBytesRead+uint32(nChunk)] + if pkt.chunk != nil { + pkt.chunk.headerSize = int32(hSize) + copy(pkt.chunk.header[:], hbuf[:hSize]) + pkt.chunk.data = pkt.body[pkt.bytesRead : pkt.bytesRead+uint32(chunkSize)] } - err = C_ReadN(r, packet.body[packet.nBytesRead:][:nChunk]) + err = C_ReadN(s, pkt.body[pkt.bytesRead:][:chunkSize]) if err != nil { log.Println("C_RTMP_ReadPacket: failed to read RTMP packet body") return err } - packet.nBytesRead += uint32(nChunk) + pkt.bytesRead += uint32(chunkSize) // keep the packet as ref for other packets on this channel - if r.vecChannelsIn[packet.nChannel] == nil { - r.vecChannelsIn[packet.nChannel] = &C_RTMPPacket{} + if s.vecChannelsIn[pkt.channel] == nil { + s.vecChannelsIn[pkt.channel] = &packet{} } - *(r.vecChannelsIn[packet.nChannel]) = *packet + *(s.vecChannelsIn[pkt.channel]) = *pkt if extendedTimestamp { - r.vecChannelsIn[packet.nChannel].nTimeStamp = 0xffffff + s.vecChannelsIn[pkt.channel].timestamp = 0xffffff } // TODO: port this - if C_RTMPPacket_IsReady(packet) { - if !packet.hasAbsTimestamp { + if C_RTMPPacket_IsReady(pkt) { + if !pkt.hasAbsTimestamp { // timestamps seem to always be relative - packet.nTimeStamp += uint32(r.channelTimestamp[packet.nChannel]) + pkt.timestamp += uint32(s.channelTimestamp[pkt.channel]) } - r.channelTimestamp[packet.nChannel] = int32(packet.nTimeStamp) + s.channelTimestamp[pkt.channel] = int32(pkt.timestamp) - r.vecChannelsIn[packet.nChannel].body = nil - r.vecChannelsIn[packet.nChannel].nBytesRead = 0 - r.vecChannelsIn[packet.nChannel].hasAbsTimestamp = false + s.vecChannelsIn[pkt.channel].body = nil + s.vecChannelsIn[pkt.channel].bytesRead = 0 + s.vecChannelsIn[pkt.channel].hasAbsTimestamp = false } else { - packet.body = nil /* so it won't be erased on free */ + pkt.body = nil /* so it won't be erased on free */ } return nil } +// resizePacket adjust the packet's storage to accommodate a body of the given size. +func resizePacket(pkt *packet, size uint32, ht uint8) { + buf := make([]byte, RTMP_MAX_HEADER_SIZE+size) + pkt.headerType = ht + pkt.header = buf + pkt.body = buf[RTMP_MAX_HEADER_SIZE:] +} + // int HandShake(RTMP* r, int FP9HandShake); // rtmp.c +3744 -func C_HandShake(r *C_RTMP, FP9HandShake int32) error { +func C_HandShake(s *Session, FP9HandShake int32) error { var clientbuf [RTMP_SIG_SIZE + 1]byte clientsig := clientbuf[1:] @@ -1248,13 +1176,13 @@ func C_HandShake(r *C_RTMP, FP9HandShake int32) error { clientsig[i] = byte(rand.Intn(256)) } - err := C_WriteN(r, clientbuf[:]) + err := C_WriteN(s, clientbuf[:]) if err != nil { return err } var typ [1]byte - err = C_ReadN(r, typ[:]) + err = C_ReadN(s, typ[:]) if err != nil { return err } @@ -1266,7 +1194,7 @@ func C_HandShake(r *C_RTMP, FP9HandShake int32) error { log.Printf("C_HandShake: type mismatch: client sent %v, server sent: %v\n", clientbuf[0], typ) } - err = C_ReadN(r, serversig[:]) + err = C_ReadN(s, serversig[:]) if err != nil { return err } @@ -1279,12 +1207,12 @@ func C_HandShake(r *C_RTMP, FP9HandShake int32) error { // serversig[4], serversig[5], serversig[6], serversig[7]) // 2nd part of handshake - err = C_WriteN(r, serversig[:]) + err = C_WriteN(s, serversig[:]) if err != nil { return err } - err = C_ReadN(r, serversig[:]) + err = C_ReadN(s, serversig[:]) if err != nil { return err } @@ -1298,54 +1226,54 @@ func C_HandShake(r *C_RTMP, FP9HandShake int32) error { // int RTMP_SendPacket(RTMP* r, RTMPPacket* packet, int queue); // rtmp.c +3896 -func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) error { - var prevPacket *C_RTMPPacket +func C_RTMP_SendPacket(s *Session, pkt *packet, queue int) error { + var prevPkt *packet var last int - if packet.nChannel >= r.channelsAllocatedOut { - n := int(packet.nChannel + 10) + if pkt.channel >= s.channelsAllocatedOut { + n := int(pkt.channel + 10) - var packets []*C_RTMPPacket - if r.vecChannelsOut == nil { - packets = make([]*C_RTMPPacket, n) + var pkts []*packet + if s.vecChannelsOut == nil { + pkts = make([]*packet, n) } else { - packets = append(r.vecChannelsOut[:packet.nChannel:packet.nChannel], make([]*C_RTMPPacket, 10)...) + pkts = append(s.vecChannelsOut[:pkt.channel:pkt.channel], make([]*packet, 10)...) } - r.vecChannelsOut = packets + s.vecChannelsOut = pkts - for i := int(r.channelsAllocatedOut); i < n; i++ { - r.vecChannelsOut[i] = nil + for i := int(s.channelsAllocatedOut); i < n; i++ { + s.vecChannelsOut[i] = nil } - r.channelsAllocatedOut = int32(n) + s.channelsAllocatedOut = int32(n) } - prevPacket = r.vecChannelsOut[packet.nChannel] + prevPkt = s.vecChannelsOut[pkt.channel] - if prevPacket != nil && packet.headerType != RTMP_PACKET_SIZE_LARGE { + if prevPkt != nil && pkt.headerType != RTMP_PACKET_SIZE_LARGE { // compress a bit by using the prev packet's attributes - if prevPacket.nBodySize == packet.nBodySize && prevPacket.packetType == packet.packetType && packet.headerType == RTMP_PACKET_SIZE_MEDIUM { - packet.headerType = RTMP_PACKET_SIZE_SMALL + if prevPkt.bodySize == pkt.bodySize && prevPkt.packetType == pkt.packetType && pkt.headerType == RTMP_PACKET_SIZE_MEDIUM { + pkt.headerType = RTMP_PACKET_SIZE_SMALL } - if prevPacket.nTimeStamp == packet.nTimeStamp && packet.headerType == RTMP_PACKET_SIZE_SMALL { - packet.headerType = RTMP_PACKET_SIZE_MINIMUM + if prevPkt.timestamp == pkt.timestamp && pkt.headerType == RTMP_PACKET_SIZE_SMALL { + pkt.headerType = RTMP_PACKET_SIZE_MINIMUM } - last = int(prevPacket.nTimeStamp) + last = int(prevPkt.timestamp) } - if packet.headerType > 3 { + if pkt.headerType > 3 { log.Printf("Sanity failed! trying to send header of type: 0x%02x.", - packet.headerType) + pkt.headerType) return errInvalidHeader } var headBytes []byte var origIdx int - if packet.body != nil { + if pkt.body != nil { // Span from -packetsize for the type to the start of the body. - headBytes = packet.header - origIdx = RTMP_MAX_HEADER_SIZE - packetSize[packet.headerType] + headBytes = pkt.header + origIdx = RTMP_MAX_HEADER_SIZE - packetSize[pkt.headerType] } else { // Allocate a new header and allow 6 bytes of movement backward. var hbuf [RTMP_MAX_HEADER_SIZE]byte @@ -1355,21 +1283,21 @@ func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) error { var cSize int switch { - case packet.nChannel > 319: + case pkt.channel > 319: cSize = 2 - case packet.nChannel > 63: + case pkt.channel > 63: cSize = 1 } - hSize := packetSize[packet.headerType] + hSize := packetSize[pkt.headerType] if cSize != 0 { origIdx -= cSize hSize += cSize } var ts uint32 - if prevPacket != nil { - ts = uint32(int(packet.nTimeStamp) - last) + if prevPkt != nil { + ts = uint32(int(pkt.timestamp) - last) } if ts >= 0xffffff { origIdx -= 4 @@ -1379,10 +1307,10 @@ func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) error { headerIdx := origIdx - c := packet.headerType << 6 + c := pkt.headerType << 6 switch cSize { case 0: - c |= byte(packet.nChannel) + c |= byte(pkt.channel) case 1: // Do nothing. case 2: @@ -1392,7 +1320,7 @@ func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) error { headerIdx++ if cSize != 0 { - tmp := packet.nChannel - 64 + tmp := pkt.channel - 64 headBytes[headerIdx] = byte(tmp & 0xff) headerIdx++ @@ -1402,7 +1330,7 @@ func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) error { } } - if packetSize[packet.headerType] > 1 { + if packetSize[pkt.headerType] > 1 { res := ts if ts > 0xffffff { res = 0xffffff @@ -1411,15 +1339,15 @@ func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) error { headerIdx += 3 // 24bits } - if packetSize[packet.headerType] > 4 { - C_AMF_EncodeInt24(headBytes[headerIdx:], int32(packet.nBodySize)) + if packetSize[pkt.headerType] > 4 { + C_AMF_EncodeInt24(headBytes[headerIdx:], int32(pkt.bodySize)) headerIdx += 3 // 24bits - headBytes[headerIdx] = packet.packetType + headBytes[headerIdx] = pkt.packetType headerIdx++ } - if packetSize[packet.headerType] > 8 { - n := int(C_EncodeInt32LE(headBytes[headerIdx:headerIdx+4], packet.nInfoField2)) + if packetSize[pkt.headerType] > 8 { + n := int(C_EncodeInt32LE(headBytes[headerIdx:headerIdx+4], pkt.info)) headerIdx += n } @@ -1428,48 +1356,48 @@ func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) error { headerIdx += 4 // 32bits } - nSize := int(packet.nBodySize) - nChunkSize := int(r.outChunkSize) + size := int(pkt.bodySize) + chunkSize := int(s.outChunkSize) if debugMode { - log.Printf("C_RTMP_SendPacket: %v->%v, size=%v", r.Link.conn.LocalAddr(), r.Link.conn.RemoteAddr(), nSize) + log.Printf("C_RTMP_SendPacket: %v->%v, size=%v", s.link.conn.LocalAddr(), s.link.conn.RemoteAddr(), size) } // Send the previously deferred packet if combining it with the next packet would exceed the chunk size. - if r.defered != nil && len(r.defered)+nSize+hSize > nChunkSize { - err := C_WriteN(r, r.defered) + if s.defered != nil && len(s.defered)+size+hSize > chunkSize { + err := C_WriteN(s, s.defered) if err != nil { return err } - r.defered = nil + s.defered = nil } // TODO(kortschak): Rewrite this horrific peice of premature optimisation. // NB: RTMP wants packets in chunks which are 128 bytes by default, but the server may request a different size. - for nSize+hSize != 0 { - if r.defered == nil && packet.packetType == RTMP_PACKET_TYPE_AUDIO && nSize < nChunkSize { - r.defered = headBytes[origIdx:][:nSize+hSize] + for size+hSize != 0 { + if s.defered == nil && pkt.packetType == RTMP_PACKET_TYPE_AUDIO && size < chunkSize { + s.defered = headBytes[origIdx:][:size+hSize] break } - if nChunkSize > nSize { - nChunkSize = nSize + if chunkSize > size { + chunkSize = size } - bytes := headBytes[origIdx:][:nChunkSize+hSize] - if r.defered != nil { + bytes := headBytes[origIdx:][:chunkSize+hSize] + if s.defered != nil { // Prepend the previously deferred packet and write it with the current one. - bytes = append(r.defered, bytes...) + bytes = append(s.defered, bytes...) } - err := C_WriteN(r, bytes) + err := C_WriteN(s, bytes) if err != nil { return err } - r.defered = nil + s.defered = nil - nSize -= nChunkSize - origIdx += nChunkSize + hSize + size -= chunkSize + origIdx += chunkSize + hSize hSize = 0 - if nSize > 0 { + if size > 0 { origIdx -= 1 + cSize hSize = 1 + cSize @@ -1481,7 +1409,7 @@ func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) error { headBytes[origIdx] = 0xc0 | c if cSize != 0 { - tmp := int(packet.nChannel) - 64 + tmp := int(pkt.channel) - 64 headBytes[origIdx+1] = byte(tmp) if cSize == 2 { @@ -1497,107 +1425,41 @@ func C_RTMP_SendPacket(r *C_RTMP, packet *C_RTMPPacket, queue int) error { // We invoked a remote method // TODO: port the const - if packet.packetType == RTMP_PACKET_TYPE_INVOKE { - buf := packet.body[1:] - method := C_AMF_DecodeString(buf) + if pkt.packetType == RTMP_PACKET_TYPE_INVOKE { + buf := pkt.body[1:] + meth := C_AMF_DecodeString(buf) if debugMode { - log.Printf("invoking %v", method) + log.Printf("invoking %v", meth) } // keep it in call queue till result arrives if queue != 0 { - buf = buf[3+len(method):] + buf = buf[3+len(meth):] txn := int32(C_AMF_DecodeNumber(buf[:8])) - r.methodCalls = append(r.methodCalls, C_RTMP_METHOD{name: method, num: txn}) + s.methodCalls = append(s.methodCalls, method{name: meth, num: txn}) } } - if r.vecChannelsOut[packet.nChannel] == nil { - r.vecChannelsOut[packet.nChannel] = &C_RTMPPacket{} + if s.vecChannelsOut[pkt.channel] == nil { + s.vecChannelsOut[pkt.channel] = &packet{} } - *(r.vecChannelsOut[packet.nChannel]) = *packet + *(s.vecChannelsOut[pkt.channel]) = *pkt return nil } -// void RTMP_Close(RTMP *r); -// rtmp.c +4168 -func C_RTMP_Close(r *C_RTMP) { - C_CloseInternal(r, false) -} - -// static void CloseInternal(RTMP *r, int reconnect); -// rtmp.c +4175 -func C_CloseInternal(r *C_RTMP, reconnect bool) { - var i int32 - - if C_RTMP_IsConnected(r) { - if r.streamID > 0 { - i = r.streamID - if r.Link.protocol&RTMP_FEATURE_WRITE != 0 { - C_SendFCUnpublish(r) - } - C_SendDeleteStream(r, float64(i)) - } - err := r.Link.conn.Close() - if err != nil && debugMode { - log.Printf("C_RTMPSockBuf_Close error: %v\n", err) - } - } - - r.streamID = -1 - r.Link.conn = nil - r.nBWCheckCounter = 0 - r.nBytesIn = 0 - r.nBytesInSent = 0 - - r.write.nBytesRead = 0 - C_RTMPPacket_Free(&r.write) - - // NOTE: C frees - not using in our case - for i := 0; i < int(r.channelsAllocatedIn); i++ { - if r.vecChannelsIn[i] != nil { - r.vecChannelsIn[i] = nil - } - } - - //C.free(unsafe.Pointer(r.vecChannelsOut)) - r.vecChannelsOut = nil - r.channelsAllocatedOut = 0 - r.methodCalls = nil //C_AV_clear(r.methodCalls, r.numCalls) - - r.methodCalls = r.methodCalls[:0] - r.numInvokes = 0 - - r.bPlaying = false - - r.msgCounter = 0 - r.resplen = 0 - r.unackd = 0 - - if ((r.Link.lFlags & RTMP_LF_FTCU) != 0) && !reconnect { - r.Link.app = "" - r.Link.lFlags ^= RTMP_LF_FAPU - } - - if !reconnect { - r.Link.playpath0 = "" - } -} - /// int RTMP_Write(RTMP* r, const char* buf, int size); // rtmp.c +5136 -func C_RTMP_Write(r *C_RTMP, buf []byte) error { - // TODO: port RTMPPacket - var pkt = &r.write +func C_RTMP_Write(s *Session, buf []byte) error { + var pkt = &s.write var enc []byte size := len(buf) var num int - pkt.nChannel = 0x04 - pkt.nInfoField2 = r.streamID + pkt.channel = 0x04 + pkt.info = s.streamID for len(buf) != 0 { - if pkt.nBytesRead == 0 { + if pkt.bytesRead == 0 { if size < minDataSize { return errTinyPacket } @@ -1608,51 +1470,50 @@ func C_RTMP_Write(r *C_RTMP, buf []byte) error { pkt.packetType = buf[0] buf = buf[1:] - pkt.nBodySize = C_AMF_DecodeInt24(buf[:3]) + pkt.bodySize = C_AMF_DecodeInt24(buf[:3]) buf = buf[3:] - pkt.nTimeStamp = C_AMF_DecodeInt24(buf[:3]) + pkt.timestamp = C_AMF_DecodeInt24(buf[:3]) buf = buf[3:] - pkt.nTimeStamp |= uint32(buf[0]) << 24 + pkt.timestamp |= uint32(buf[0]) << 24 buf = buf[4:] - pkt.headerType = RTMP_PACKET_SIZE_MEDIUM + headerType := uint8(RTMP_PACKET_SIZE_MEDIUM) switch pkt.packetType { case RTMP_PACKET_TYPE_VIDEO, RTMP_PACKET_TYPE_AUDIO: - if pkt.nTimeStamp == 0 { - pkt.headerType = RTMP_PACKET_SIZE_LARGE + if pkt.timestamp == 0 { + headerType = RTMP_PACKET_SIZE_LARGE } case RTMP_PACKET_TYPE_INFO: - pkt.headerType = RTMP_PACKET_SIZE_LARGE - pkt.nBodySize += 16 + headerType = RTMP_PACKET_SIZE_LARGE + pkt.bodySize += 16 } - // TODO: Port this - C_RTMPPacket_Alloc(pkt, pkt.nBodySize) + resizePacket(pkt, pkt.bodySize, headerType) - enc = pkt.body[:pkt.nBodySize] + enc = pkt.body[:pkt.bodySize] if pkt.packetType == RTMP_PACKET_TYPE_INFO { enc = C_AMF_EncodeString(enc, setDataFrame) if enc == nil { return errEncoding } - pkt.nBytesRead = uint32(len(pkt.body) - len(enc)) + pkt.bytesRead = uint32(len(pkt.body) - len(enc)) } } else { - enc = pkt.body[:pkt.nBodySize][pkt.nBytesRead:] + enc = pkt.body[:pkt.bodySize][pkt.bytesRead:] } - num = int(pkt.nBodySize - pkt.nBytesRead) + num = int(pkt.bodySize - pkt.bytesRead) if num > len(buf) { num = len(buf) } copy(enc[:num], buf[:num]) - pkt.nBytesRead += uint32(num) + pkt.bytesRead += uint32(num) buf = buf[num:] - if pkt.nBytesRead == pkt.nBodySize { - err := C_RTMP_SendPacket(r, pkt, 0) - C_RTMPPacket_Free(pkt) - pkt.nBytesRead = 0 + if pkt.bytesRead == pkt.bodySize { + err := C_RTMP_SendPacket(s, pkt, 0) + pkt.body = nil + pkt.bytesRead = 0 if err != nil { return err } diff --git a/rtmp/rtmp_headers.go b/rtmp/rtmp_headers.go index 4174c332..2260f01d 100644 --- a/rtmp/rtmp_headers.go +++ b/rtmp/rtmp_headers.go @@ -7,9 +7,11 @@ DESCRIPTION AUTHORS Saxon Nelson-Milton + Dan Kortschak + Alan Noble LICENSE - rtmp_headers.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + rtmp_headers.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the @@ -112,102 +114,47 @@ const ( RTMP_MAX_HEADER_SIZE = 18 ) -// typedef struct RTMPChunk -// rtmp.h +105 -type C_RTMPChunk struct { +type chunk struct { headerSize int32 - data []byte + data []byte header [RTMP_MAX_HEADER_SIZE]byte } -// typedef struct RTMPPacket -// rtmp.h +113 -type C_RTMPPacket struct { +type packet struct { headerType uint8 packetType uint8 hasAbsTimestamp bool - nChannel int32 - nTimeStamp uint32 - nInfoField2 int32 - nBodySize uint32 - nBytesRead uint32 - chunk *C_RTMPChunk + channel int32 + timestamp uint32 + info int32 + bodySize uint32 + bytesRead uint32 + chunk *chunk header []byte body []byte } -// typedef struct RTMPSockBuf -// rtmp.h +127 -// DELETED: subsumed by C_RTMP_LNK - -// RTMPPacket_IsReady(a) -// rtmp.h +142 -func C_RTMPPacket_IsReady(p *C_RTMPPacket) bool { - return p.nBytesRead == p.nBodySize +type link struct { + host string + playpath string + tcUrl string + swfUrl string + pageUrl string + app string + auth string + flashVer string + token string + extras C_AMFObject + seekTime int32 + lFlags int32 + swfAge int32 + protocol int32 + timeout uint + port uint16 + conn *net.TCPConn } -// typedef struct RTMP_LNK -// rtmp.h +144 -type C_RTMP_LNK struct { - host string - playpath0 string - playpath string - tcUrl string - swfUrl string - pageUrl string - app string - auth string - flashVer string - token string - extras C_AMFObject - seekTime int32 - lFlags int32 - swfAge int32 - protocol int32 - timeout uint - port uint16 - conn *net.TCPConn -} - -// typedef struct RTMPMethod -// rtmp.h +231 -type C_RTMP_METHOD struct { +type method struct { name string num int32 } - -// typedef struct RTMP -// rtmp.h +237 -type C_RTMP struct { - inChunkSize int32 - outChunkSize int32 - nBWCheckCounter int32 - nBytesIn int32 - nBytesInSent int32 - nBufferMS int32 - streamID int32 - mediaChannel int32 - pausing int32 - nServerBW int32 - nClientBW int32 - nClientBW2 uint8 - bPlaying bool - bSendEncoding bool - numInvokes int32 - methodCalls []C_RTMP_METHOD - channelsAllocatedIn int32 - channelsAllocatedOut int32 - vecChannelsIn []*C_RTMPPacket - vecChannelsOut []*C_RTMPPacket - channelTimestamp []int32 - fAudioCodecs float64 - fVideoCodecs float64 - fEncoding float64 - fDuration float64 - msgCounter int32 - resplen int32 - unackd int32 - write C_RTMPPacket - defered []byte - Link C_RTMP_LNK -} diff --git a/rtmp/session.go b/rtmp/session.go index ff2cbd83..b6b066f4 100644 --- a/rtmp/session.go +++ b/rtmp/session.go @@ -8,9 +8,10 @@ DESCRIPTION AUTHORS Saxon Nelson-Milton Dan Kortschak + Alan Noble LICENSE - session.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + session.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the @@ -36,14 +37,37 @@ import ( "errors" ) -// session provides parameters required for an rtmp communication session. +// Session holds the state for an RTMP session. type Session struct { - rtmp *C_RTMP - url string - timeout uint + url string + timeout uint + inChunkSize int32 + outChunkSize int32 + bwCheckCounter int32 + nBytesIn int32 + nBytesInSent int32 + streamID int32 + serverBW int32 + clientBW int32 + clientBW2 uint8 + isPlaying bool + sendEncoding bool + numInvokes int32 + methodCalls []method + channelsAllocatedIn int32 + channelsAllocatedOut int32 + vecChannelsIn []*packet + vecChannelsOut []*packet + channelTimestamp []int32 + audioCodecs float64 + videoCodecs float64 + encoding float64 + write packet + defered []byte + link link } -// NewSession returns a new session. +// NewSession returns a new Session. func NewSession(url string, connectTimeout uint) *Session { return &Session{ url: url, @@ -51,48 +75,94 @@ func NewSession(url string, connectTimeout uint) *Session { } } -// Open establishes an rtmp connection with the url passed into the -// constructor +// Open establishes an rtmp connection with the url passed into the constructor. func (s *Session) Open() error { - if s.rtmp != nil { + if s.isConnected() { return errors.New("rtmp: attempt to start already running session") } - var err error - s.rtmp, err = startSession(s.rtmp, s.url, s.timeout) - if s.rtmp == nil { + err := s.start() + if err != nil { return err } return nil } -// Close terminates the rtmp connection -func (s *Session) Close() error { - if s.rtmp == nil { - return Err(3) +// start does the heavylifting for Open(). +func (s *Session) start() error { + s.init() + err := C_RTMP_SetupURL(s, s.url) + if err != nil { + s.close() + return err } - ret := endSession(s.rtmp) - s.rtmp = nil - if ret != 0 { - return Err(ret) + + C_RTMP_EnableWrite(s) + err = C_RTMP_Connect(s, nil) + if err != nil { + s.close() + return err + } + + err = C_RTMP_ConnectStream(s, 0) + if err != nil { + s.close() + return err } return nil } -// Write writes a frame (flv tag) to the rtmp connection -func (s *Session) Write(data []byte) (int, error) { - if s.rtmp == nil { - return 0, Err(3) - } +// init initializes various RTMP defauls. +// ToDo: define consts for the magic numbers. +func (s *Session) init() { + s.inChunkSize = RTMP_DEFAULT_CHUNKSIZE + s.outChunkSize = RTMP_DEFAULT_CHUNKSIZE + s.clientBW = 2500000 + s.clientBW2 = 2 + s.serverBW = 2500000 + s.audioCodecs = 3191.0 + s.videoCodecs = 252.0 + s.link.timeout = s.timeout + s.link.swfAge = 30 +} - if !C_RTMP_IsConnected(s.rtmp) { +// Close terminates the rtmp connection, +func (s *Session) Close() error { + if !s.isConnected() { + return Err(3) + } + s.close() + return nil +} + +// close does the heavylifting for Close(). +// Any errors are ignored as it is often called in response to an earlier error. +func (s *Session) close() { + if s.isConnected() { + if s.streamID > 0 { + if s.link.protocol&RTMP_FEATURE_WRITE != 0 { + C_SendFCUnpublish(s) + } + C_SendDeleteStream(s, float64(s.streamID)) + } + s.link.conn.Close() + } + s = &Session{} +} + +// Write writes a frame (flv tag) to the rtmp connection. +func (s *Session) Write(data []byte) (int, error) { + if !s.isConnected() { return 0, Err(1) } - - err := C_RTMP_Write(s.rtmp, data) + err := C_RTMP_Write(s, data) if err != nil { - //if C.RTMP_Write(s.rtmp, (*byte)(unsafe.Pointer(&data[0])), int32(len(data))) == 0 { // TODO: propagate err return 0, Err(2) } return len(data), nil } + +// isConnected returns true if the RTMP connection is up. +func (s *Session) isConnected() bool { + return s.link.conn != nil +}