/*
NAME
  rtmp.go

DESCRIPTION
  See Readme.md

AUTHORS
  Saxon Nelson-Milton
  Dan Kortschak
  Alan Noble

LICENSE
  rtmp.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean)

  It is free software: you can redistribute it and/or modify them
  under the terms of the GNU General Public License as published by the
  Free Software Foundation, either version 3 of the License, or (at your
  option) any later version.

  It is distributed in the hope that it will be useful, but WITHOUT
  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
  for more details.

  You should have received a copy of the GNU General Public License
  along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.

  Derived from librtmp under the GNU Lesser General Public License 2.1
  Copyright (C) 2005-2008 Team XBMC http://www.xbmc.org
  Copyright (C) 2008-2009 Andrej Stepanchuk
  Copyright (C) 2009-2010 Howard Chu
*/
package rtmp

import (
	"bytes"
	"encoding/binary"
	"errors"
	"io"
	"math/rand"
	"net"
	"strconv"
	"time"
)

const (
	pkg         = "rtmp:" // Prefix prepended to every log message emitted by this file.
	minDataSize = 11      // Fewest bytes Session.write accepts at the start of a packet.
)

// AMF strings used by this file: setDataFrame is the string Session.write
// prepends to the body of info packets; the av_ constants are the method
// names, connect-object property names and onStatus codes that are encoded
// into, or matched against, invoke messages.
const (
	setDataFrame                        = "@setDataFrame"
	av__checkbw                         = "_checkbw"
	av__onbwcheck                       = "_onbwcheck"
	av__onbwdone                        = "_onbwdone"
	av__result                          = "_result"
	av_app                              = "app"
	av_audioCodecs                      = "audioCodecs"
	av_capabilities                     = "capabilities"
	av_close                            = "close"
	av_code                             = "code"
	av_connect                          = "connect"
	av_createStream                     = "createStream"
	av_deleteStream                     = "deleteStream"
	av_FCPublish                        = "FCPublish"
	av_FCUnpublish                      = "FCUnpublish"
	av_flashVer                         = "flashVer"
	av_fpad                             = "fpad"
	av_level                            = "level"
	av_live                             = "live"
	av_NetConnection_Connect_InvalidApp = "NetConnection.Connect.InvalidApp"
	av_NetStream_Failed                 = "NetStream.Failed"
	av_NetStream_Pause_Notify           = "NetStream.Pause.Notify"
	av_NetStream_Play_Complete          = "NetStream.Play.Complete"
	av_NetStream_Play_Failed            = "NetStream.Play.Failed"
	av_NetStream_Play_PublishNotify     = "NetStream.Play.PublishNotify"
	av_NetStream_Play_Start             = "NetStream.Play.Start"
	av_NetStream_Play_Stop              = "NetStream.Play.Stop"
	av_NetStream_Play_StreamNotFound    = "NetStream.Play.StreamNotFound"
	av_NetStream_Play_UnpublishNotify   = "NetStream.Play.UnpublishNotify"
	av_NetStream_Publish_Start          = "NetStream.Publish.Start"
	av_NetStream_Seek_Notify            = "NetStream.Seek.Notify"
	av_nonprivate                       = "nonprivate"
	av_objectEncoding                   = "objectEncoding"
	av_onBWDone                         = "onBWDone"
	av_onFCSubscribe                    = "onFCSubscribe"
	av_onFCUnsubscribe                  = "onFCUnsubscribe"
	av_onStatus                         = "onStatus"
	av_pageUrl                          = "pageUrl"
	av_ping                             = "ping"
	av_play                             = "play"
	av_playlist_ready                   = "playlist_ready"
	av_publish                          = "publish"
	av_releaseStream                    = "releaseStream"
	av_secureToken                      = "secureToken"
	av_set_playlist                     = "set_playlist"
	av_swfUrl                           = "swfUrl"
	av_tcUrl                            = "tcUrl"
	av_type                             = "type"
	av_videoCodecs                      = "videoCodecs"
	av_videoFunction                    = "videoFunction"
)

// RTMP protocol strings.
// setupURL indexes this table with the link protocol value to obtain the URL
// scheme when it builds tcUrl.
var rtmpProtocolStrings = [...]string{
	"rtmp",
	"rtmpt",
	"rtmpe",
	"rtmpte",
	"rtmps",
	"rtmpts",
	"",
	"",
	"rtmfp",
}

// RTMP errors.
// These are the sentinel errors returned by the functions of this package.
var (
	errUnknownScheme = errors.New("rtmp: unknown scheme")
	errConnected     = errors.New("rtmp: already connected")
	errNotConnected  = errors.New("rtmp: not connected")
	errHandshake     = errors.New("rtmp: handshake failed")
	errConnSend      = errors.New("rtmp: connection send error")
	errConnStream    = errors.New("rtmp: connection stream error")
	errInvalidHeader = errors.New("rtmp: invalid header")
	errInvalidBody   = errors.New("rtmp: invalid body")
	errTinyPacket    = errors.New("rtmp: packet too small")
	errEncoding      = errors.New("rtmp: encoding error")
	errDecoding      = errors.New("rtmp: decoding error")
	errCopying       = errors.New("rtmp: copying error")
)

// setupURL parses the RTMP URL.
func setupURL(s *Session, addr string) (err error) { s.link.protocol, s.link.host, s.link.port, s.link.app, s.link.playpath, err = parseURL(addr) if err != nil { return err } if s.link.tcUrl == "" { if s.link.app != "" { s.link.tcUrl = rtmpProtocolStrings[s.link.protocol] + "://" + s.link.host + ":" + strconv.Itoa(int(s.link.port)) + "/" + s.link.app s.link.lFlags |= RTMP_LF_FTCU } else { s.link.tcUrl = addr } } if s.link.port == 0 { switch { case (s.link.protocol & RTMP_FEATURE_SSL) != 0: s.link.port = 433 s.log(FatalLevel, pkg+"SSL not supported") case (s.link.protocol & RTMP_FEATURE_HTTP) != 0: s.link.port = 80 default: s.link.port = 1935 } } return nil } // connect establishes an RTMP connection. func connect(s *Session) error { addr, err := net.ResolveTCPAddr("tcp4", s.link.host+":"+strconv.Itoa(int(s.link.port))) if err != nil { return err } s.link.conn, err = net.DialTCP("tcp4", nil, addr) if err != nil { s.log(WarnLevel, pkg+"dial failed", "error", err.Error()) return err } s.log(DebugLevel, pkg+"connected") err = handshake(s) if err != nil { s.log(WarnLevel, pkg+"handshake failed", "error", err.Error()) return errHandshake } s.log(DebugLevel, pkg+"handshaked") err = sendConnectPacket(s) if err != nil { s.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error()) return errConnSend } return nil } // connectStream reads a packet and handles it func connectStream(s *Session) error { var pkt packet for !s.isPlaying && s.isConnected() { err := readPacket(s, &pkt) if err != nil { break } if pkt.bodySize == 0 { continue } if pkt.packetType == RTMP_PACKET_TYPE_AUDIO || pkt.packetType == RTMP_PACKET_TYPE_VIDEO || pkt.packetType == RTMP_PACKET_TYPE_INFO { s.log(DebugLevel, pkg+"got packet before play; ignoring") pkt.body = nil continue } handlePacket(s, &pkt) pkt.body = nil } if !s.isPlaying { return errConnStream } return nil } // handlePacket handles a packet that the client has received. 
// NB: cases have been commented out that are not currently used by AusOcean func handlePacket(s *Session, pkt *packet) int32 { var hasMediaPacket int32 switch pkt.packetType { case RTMP_PACKET_TYPE_CHUNK_SIZE: if pkt.bodySize >= 4 { s.inChunkSize = int32(C_AMF_DecodeInt32(pkt.body[:4])) } case RTMP_PACKET_TYPE_BYTES_READ_REPORT: s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) case RTMP_PACKET_TYPE_CONTROL: s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_CONTROL") case RTMP_PACKET_TYPE_SERVER_BW: s.serverBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) case RTMP_PACKET_TYPE_CLIENT_BW: s.clientBW = int32(C_AMF_DecodeInt32(pkt.body[:4])) if pkt.bodySize > 4 { s.clientBW2 = pkt.body[4] } else { s.clientBW2 = 0xff } case RTMP_PACKET_TYPE_AUDIO: s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_AUDIO") case RTMP_PACKET_TYPE_VIDEO: s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_VIDEO") case RTMP_PACKET_TYPE_FLEX_MESSAGE: s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_FLEX_MESSAGE") case RTMP_PACKET_TYPE_INFO: s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_INFO") case RTMP_PACKET_TYPE_INVOKE: err := handleInvoke(s, pkt.body[:pkt.bodySize]) if err != nil { // This will never happen with the methods we implement. 
s.log(WarnLevel, pkg+"unexpected error from handleInvoke", "error", err.Error()) hasMediaPacket = 2 } case RTMP_PACKET_TYPE_FLASH_VIDEO: s.log(FatalLevel, pkg+"unsupported packet type RTMP_PACKET_TYPE_FLASH_VIDEO") default: s.log(WarnLevel, pkg+"unknown packet type", "type", pkt.packetType) } return hasMediaPacket } func readN(s *Session, buf []byte) error { err := s.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) if err != nil { return err } n, err := io.ReadFull(s.link.conn, buf) if err != nil { s.log(WarnLevel, pkg+"read failed", "error", err.Error()) s.close() return err } s.nBytesIn += int32(n) if s.nBytesIn > (s.nBytesInSent + s.clientBW/10) { err := sendBytesReceived(s) if err != nil { return err } } return nil } func writeN(s *Session, buf []byte) error { //ToDo: consider using a different timeout for writes than for reads err := s.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(s.link.timeout))) if err != nil { return err } _, err = s.link.conn.Write(buf) if err != nil { s.log(WarnLevel, pkg+"write failed", "error", err.Error()) s.close() return err } return nil } func sendConnectPacket(s *Session) error { var pbuf [4096]byte pkt := packet{ channel: RTMP_CHANNEL_CONTROL, headerType: RTMP_PACKET_SIZE_LARGE, packetType: RTMP_PACKET_TYPE_INVOKE, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body enc = C_AMF_EncodeString(enc, av_connect) if enc == nil { return errEncoding } s.numInvokes += 1 enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_OBJECT enc = enc[1:] enc = C_AMF_EncodeNamedString(enc, av_app, s.link.app) if enc == nil { return errEncoding } if s.link.protocol&RTMP_FEATURE_WRITE != 0 { enc = C_AMF_EncodeNamedString(enc, av_type, av_nonprivate) if enc == nil { return errEncoding } } if s.link.flashVer != "" { enc = C_AMF_EncodeNamedString(enc, av_flashVer, s.link.flashVer) if enc == nil { return errEncoding } } if 
s.link.swfUrl != "" { enc = C_AMF_EncodeNamedString(enc, av_swfUrl, s.link.swfUrl) if enc == nil { return errEncoding } } if s.link.tcUrl != "" { enc = C_AMF_EncodeNamedString(enc, av_tcUrl, s.link.tcUrl) if enc == nil { return errEncoding } } if s.link.protocol&RTMP_FEATURE_WRITE == 0 { enc = C_AMF_EncodeNamedBoolean(enc, av_fpad, false) if enc == nil { return errEncoding } enc = C_AMF_EncodeNamedNumber(enc, av_capabilities, 15) if enc == nil { return errEncoding } enc = C_AMF_EncodeNamedNumber(enc, av_audioCodecs, s.audioCodecs) if enc == nil { return errEncoding } enc = C_AMF_EncodeNamedNumber(enc, av_videoCodecs, s.videoCodecs) if enc == nil { return errEncoding } enc = C_AMF_EncodeNamedNumber(enc, av_videoFunction, 1) if enc == nil { return errEncoding } if s.link.pageUrl != "" { enc = C_AMF_EncodeNamedString(enc, av_pageUrl, s.link.pageUrl) if enc == nil { return errEncoding } } } if s.encoding != 0.0 || s.sendEncoding { enc = C_AMF_EncodeNamedNumber(enc, av_objectEncoding, s.encoding) if enc == nil { return errEncoding } } if copy(enc, []byte{0, 0, AMF_OBJECT_END}) != 3 { return errCopying // TODO: is this even possible? 
} enc = enc[3:] /* add auth string */ if s.link.auth != "" { enc = C_AMF_EncodeBoolean(enc, s.link.lFlags&RTMP_LF_AUTH != 0) if enc == nil { return errEncoding } enc = C_AMF_EncodeString(enc, s.link.auth) if enc == nil { return errEncoding } } for i := range s.link.extras.o_props { enc = C_AMF_PropEncode(&s.link.extras.o_props[i], enc) if enc == nil { return errEncoding } } pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) return sendPacket(s, &pkt, true) } func sendCreateStream(s *Session) error { var pbuf [256]byte pkt := packet{ channel: RTMP_CHANNEL_CONTROL, headerType: RTMP_PACKET_SIZE_MEDIUM, packetType: RTMP_PACKET_TYPE_INVOKE, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body enc = C_AMF_EncodeString(enc, av_createStream) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) return sendPacket(s, &pkt, true) } func sendReleaseStream(s *Session) error { var pbuf [1024]byte pkt := packet{ channel: RTMP_CHANNEL_CONTROL, headerType: RTMP_PACKET_SIZE_MEDIUM, packetType: RTMP_PACKET_TYPE_INVOKE, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body enc = C_AMF_EncodeString(enc, av_releaseStream) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] enc = C_AMF_EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) return sendPacket(s, &pkt, false) } func sendFCPublish(s *Session) error { var pbuf [1024]byte pkt := packet{ channel: RTMP_CHANNEL_CONTROL, headerType: RTMP_PACKET_SIZE_MEDIUM, packetType: RTMP_PACKET_TYPE_INVOKE, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body enc = C_AMF_EncodeString(enc, 
av_FCPublish) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] enc = C_AMF_EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) return sendPacket(s, &pkt, false) } func sendFCUnpublish(s *Session) error { var pbuf [1024]byte pkt := packet{ channel: RTMP_CHANNEL_CONTROL, headerType: RTMP_PACKET_SIZE_MEDIUM, packetType: RTMP_PACKET_TYPE_INVOKE, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body enc = C_AMF_EncodeString(enc, av_FCUnpublish) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] enc = C_AMF_EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) return sendPacket(s, &pkt, false) } func sendPublish(s *Session) error { var pbuf [1024]byte pkt := packet{ channel: RTMP_CHANNEL_SOURCE, headerType: RTMP_PACKET_SIZE_LARGE, packetType: RTMP_PACKET_TYPE_INVOKE, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body enc = C_AMF_EncodeString(enc, av_publish) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] enc = C_AMF_EncodeString(enc, s.link.playpath) if enc == nil { return errEncoding } enc = C_AMF_EncodeString(enc, av_live) if enc == nil { return errEncoding } pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) return sendPacket(s, &pkt, true) } func sendDeleteStream(s *Session, dStreamId float64) error { var pbuf [256]byte pkt := packet{ channel: RTMP_CHANNEL_CONTROL, headerType: RTMP_PACKET_SIZE_MEDIUM, packetType: RTMP_PACKET_TYPE_INVOKE, header: pbuf[:], body: 
pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body enc = C_AMF_EncodeString(enc, av_deleteStream) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] enc = C_AMF_EncodeNumber(enc, dStreamId) if enc == nil { return errEncoding } pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) /* no response expected */ return sendPacket(s, &pkt, false) } // sendBytesReceived tells the server how many bytes the client has received. func sendBytesReceived(s *Session) error { var pbuf [256]byte pkt := packet{ channel: RTMP_CHANNEL_BYTES_READ, headerType: RTMP_PACKET_SIZE_MEDIUM, packetType: RTMP_PACKET_TYPE_BYTES_READ_REPORT, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body s.nBytesInSent = s.nBytesIn enc = C_AMF_EncodeInt32(enc, s.nBytesIn) if enc == nil { return errEncoding } pkt.bodySize = 4 return sendPacket(s, &pkt, false) } func sendCheckBW(s *Session) error { var pbuf [256]byte pkt := packet{ channel: RTMP_CHANNEL_CONTROL, headerType: RTMP_PACKET_SIZE_LARGE, packetType: RTMP_PACKET_TYPE_INVOKE, header: pbuf[:], body: pbuf[RTMP_MAX_HEADER_SIZE:], } enc := pkt.body enc = C_AMF_EncodeString(enc, av__checkbw) if enc == nil { return errEncoding } s.numInvokes++ enc = C_AMF_EncodeNumber(enc, float64(s.numInvokes)) if enc == nil { return errEncoding } enc[0] = AMF_NULL enc = enc[1:] pkt.bodySize = uint32((len(pbuf) - RTMP_MAX_HEADER_SIZE) - len(enc)) return sendPacket(s, &pkt, false) } func eraseMethod(m []method, i int) []method { copy(m[i:], m[i+1:]) m[len(m)-1] = method{} return m[:len(m)-1] } // int handleInvoke handles a packet invoke request // Side effects: s.isPlaying set to true upon av_NetStream_Publish_Start func handleInvoke(s *Session, body []byte) error { if body[0] != 0x02 { return errInvalidBody } var obj C_AMFObject nRes := C_AMF_Decode(&obj, body, 0) if nRes < 0 { return errDecoding } meth := 
C_AMFProp_GetString(C_AMF_GetProp(&obj, "", 0)) s.log(DebugLevel, pkg+"invoking method "+meth) txn := C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 1)) switch meth { case av__result: var methodInvoked string for i, m := range s.methodCalls { if float64(m.num) == txn { methodInvoked = m.name s.methodCalls = eraseMethod(s.methodCalls, i) break } } if methodInvoked == "" { s.log(WarnLevel, pkg+"received result without matching request", "id", txn) goto leave } s.log(DebugLevel, pkg+"received result for method", "id", txn) switch methodInvoked { case av_connect: if s.link.token != "" { s.log(FatalLevel, pkg+"no support for link token") } if (s.link.protocol & RTMP_FEATURE_WRITE) != 0 { sendReleaseStream(s) sendFCPublish(s) } else { s.log(FatalLevel, pkg+"link protocol has no RTMP_FEATURE_WRITE") } sendCreateStream(s) if (s.link.protocol & RTMP_FEATURE_WRITE) == 0 { s.log(FatalLevel, pkg+"link protocol has no RTMP_FEATURE_WRITE") } case av_createStream: s.streamID = int32(C_AMFProp_GetNumber(C_AMF_GetProp(&obj, "", 3))) if s.link.protocol&RTMP_FEATURE_WRITE != 0 { sendPublish(s) } else { s.log(FatalLevel, pkg+"link protocol has no RTMP_FEATURE_WRITE") } case av_play, av_publish: s.log(FatalLevel, pkg+"unsupported method av_play/av_publish") } case av_onBWDone: if s.bwCheckCounter == 0 { sendCheckBW(s) } case av_onFCUnsubscribe, av_onFCSubscribe: s.log(FatalLevel, pkg+"unsupported method av_onFCUnsubscribe/av_onFCSubscribe") case av_ping: s.log(FatalLevel, pkg+"unsupported method av_ping") case av__onbwcheck: s.log(FatalLevel, pkg+"unsupported method av_onbwcheck") case av__onbwdone: s.log(FatalLevel, pkg+"unsupported method av_onbwdone") case av_close: s.log(FatalLevel, pkg+"unsupported method av_close") case av_onStatus: var obj2 C_AMFObject C_AMFProp_GetObject(C_AMF_GetProp(&obj, "", 3), &obj2) code := C_AMFProp_GetString(C_AMF_GetProp(&obj2, av_code, -1)) level := C_AMFProp_GetString(C_AMF_GetProp(&obj2, av_level, -1)) // Not used. 
s.log(DebugLevel, pkg+"onStatus", "code", code, "level", level) switch code { case av_NetStream_Failed, av_NetStream_Play_Failed, av_NetStream_Play_StreamNotFound, av_NetConnection_Connect_InvalidApp: s.log(FatalLevel, pkg+"unsupported method av_NetStream/av_NetStream_Play_Failed/av_netSTream_Play_StreamNotFound/av_netConnection_Connect_invalidApp") case av_NetStream_Play_Start, av_NetStream_Play_PublishNotify: s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Start/av_NetStream_Play_PublishNotify") case av_NetStream_Publish_Start: s.isPlaying = true for i, m := range s.methodCalls { if m.name == av_publish { s.methodCalls = eraseMethod(s.methodCalls, i) break } } case av_NetStream_Play_Complete, av_NetStream_Play_Stop, av_NetStream_Play_UnpublishNotify: s.log(FatalLevel, pkg+"unsupported method av_NetStream_Play_Complete/av_NetStream_Play_Stop/av_NetStream_Play_UnpublishNotify") case av_NetStream_Seek_Notify: s.log(FatalLevel, pkg+"unsupported method av_netStream_Seek_Notify") case av_NetStream_Pause_Notify: s.log(FatalLevel, pkg+"unsupported method av_NetStream_Pause_Notify") } case av_playlist_ready: s.log(FatalLevel, pkg+"unsupported method av_playlist_ready") default: s.log(FatalLevel, pkg+"unknown method "+meth) } leave: C_AMF_Reset(&obj) return nil } func handshake(s *Session) error { var clientbuf [RTMP_SIG_SIZE + 1]byte clientsig := clientbuf[1:] var serversig [RTMP_SIG_SIZE]byte clientbuf[0] = RTMP_CHANNEL_CONTROL binary.BigEndian.PutUint32(clientsig, uint32(time.Now().UnixNano()/1000000)) copy(clientsig[4:8], []byte{0, 0, 0, 0}) for i := 8; i < RTMP_SIG_SIZE; i++ { clientsig[i] = byte(rand.Intn(256)) } err := writeN(s, clientbuf[:]) if err != nil { return err } var typ [1]byte err = readN(s, typ[:]) if err != nil { return err } s.log(DebugLevel, pkg+"handshake", "received", typ[0]) if typ[0] != clientbuf[0] { s.log(WarnLevel, pkg+"handshake type mismatch", "sent", clientbuf[0], "received", typ) } err = readN(s, serversig[:]) if err != nil { 
return err } // decode server response suptime := binary.BigEndian.Uint32(serversig[:4]) s.log(DebugLevel, pkg+"server uptime", "uptime", suptime) // 2nd part of handshake err = writeN(s, serversig[:]) if err != nil { return err } err = readN(s, serversig[:]) if err != nil { return err } if !bytes.Equal(serversig[:RTMP_SIG_SIZE], clientbuf[1:RTMP_SIG_SIZE+1]) { s.log(WarnLevel, pkg+"signature mismatch", "serversig", serversig[:RTMP_SIG_SIZE], "clientsig", clientbuf[1:RTMP_SIG_SIZE+1]) } return nil } // write prepares data to write then sends it. func (s *Session) write(buf []byte) error { pkt := packet{ channel: RTMP_CHANNEL_SOURCE, info: s.streamID, } var enc []byte for len(buf) != 0 { if pkt.bytesRead == 0 { if len(buf) < minDataSize { return errTinyPacket } if buf[0] == 'F' && buf[1] == 'L' && buf[2] == 'V' { buf = buf[13:] } pkt.packetType = buf[0] buf = buf[1:] pkt.bodySize = C_AMF_DecodeInt24(buf[:3]) buf = buf[3:] pkt.timestamp = C_AMF_DecodeInt24(buf[:3]) buf = buf[3:] pkt.timestamp |= uint32(buf[0]) << 24 buf = buf[4:] headerType := uint8(RTMP_PACKET_SIZE_MEDIUM) switch pkt.packetType { case RTMP_PACKET_TYPE_VIDEO, RTMP_PACKET_TYPE_AUDIO: if pkt.timestamp == 0 { headerType = RTMP_PACKET_SIZE_LARGE } case RTMP_PACKET_TYPE_INFO: headerType = RTMP_PACKET_SIZE_LARGE pkt.bodySize += 16 } resizePacket(&pkt, pkt.bodySize, headerType) enc = pkt.body[:pkt.bodySize] if pkt.packetType == RTMP_PACKET_TYPE_INFO { enc = C_AMF_EncodeString(enc, setDataFrame) if enc == nil { return errEncoding } pkt.bytesRead = uint32(len(pkt.body) - len(enc)) } } else { enc = pkt.body[:pkt.bodySize][pkt.bytesRead:] } num := int(pkt.bodySize - pkt.bytesRead) if num > len(buf) { num = len(buf) } copy(enc[:num], buf[:num]) pkt.bytesRead += uint32(num) buf = buf[num:] if pkt.bytesRead == pkt.bodySize { err := sendPacket(s, &pkt, false) pkt.body = nil pkt.bytesRead = 0 if err != nil { return err } if len(buf) < 4 { return nil } buf = buf[4:] } } return nil }