rtmp: clean up session handling and CGO interface

* Make C compilation arch-independent, including shared lib use and fix
  for broken librtmp include install.
* Provide package-root level makefile for building C lib.
* Remove package global.
* Use Go-idiomatic naming for methods.
* Restucture flow to reduce indentation.
* Use []byte len in place of additional param.
* Use unix-idiomatic return status values.
This commit is contained in:
Dan Kortschak 2018-05-29 15:26:36 +09:30
parent 093291d214
commit 173d7c3879
10 changed files with 225 additions and 648 deletions

View File

@ -114,7 +114,7 @@ type revid struct {
setupOutput func() error
getFrame func() []byte
sendClip func(clip []byte) error
rtmpInst rtmp.RTMPSession
rtmpInst rtmp.Session
mutex sync.Mutex
sendMutex sync.Mutex
currentBitrate int64
@ -229,7 +229,7 @@ func (r *revid) ChangeConfig(config Config) (err error) {
func (r *revid) Log(logType, m string) {
if r.config.Verbosity == Yes {
if r.config.Logger != nil {
r.config.Logger.Log("revid",logType, m)
r.config.Logger.Log("revid", logType, m)
return
}
fmt.Println(logType + ": " + m)
@ -286,7 +286,7 @@ func (r *revid) Stop() {
// wait for sending to finish
r.sendMutex.Lock()
defer r.sendMutex.Unlock()
r.rtmpInst.EndSession()
r.rtmpInst.Close()
r.isRunning = false
r.Log(Info, "Stopping generator!")
@ -355,7 +355,7 @@ func (r *revid) packClips() {
clip, err = r.ringBuffer.Get()
}
r.Log(Debug, "Finally got mem from ringbuffer!")
outputChanFullFlag = false
outputChanFullFlag = false
}
for r.isRunning {
@ -433,7 +433,7 @@ func (r *revid) outputClips() {
if r.config.Output == NativeRtmp {
r.Log(Debug, "Ending current rtmp session...")
r.rtmpInst.EndSession()
r.rtmpInst.Close()
}
if r.ringBuffer.Full() {
r.Log(Debug, "Flushing incoming data...")
@ -519,7 +519,7 @@ func (r *revid) sendClipToFfmpegRtmp(clip []byte) (err error) {
func (r *revid) sendClipToLibRtmp(clip []byte) (err error) {
r.sendMutex.Lock()
defer r.sendMutex.Unlock()
err = r.rtmpInst.WriteFrame(clip, uint(len(clip)))
err = r.rtmpInst.Write(clip)
return
}
@ -559,13 +559,13 @@ func (r *revid) setupOutputForFfmpegRtmp() error {
// setupOutputForLibRtmp sets up rtmp output using the wrapper for the c based
// librtmp library - makes connection and starts comms etc.
func (r *revid) setupOutputForLibRtmp() error {
r.rtmpInst = rtmp.NewRTMPSession(r.config.RtmpUrl, rtmpConnectionTimout)
r.rtmpInst = rtmp.NewSession(r.config.RtmpUrl, rtmpConnectionTimout)
err := r.rtmpInst.StartSession()
for noOfTries := 0; err != nil && noOfTries < rtmpConnectionMaxTries; noOfTries++ {
r.rtmpInst.EndSession()
r.rtmpInst.Close()
r.Log(Error, err.Error())
r.Log(Info, "Trying to establish rtmp connection again!")
r.rtmpInst = rtmp.NewRTMPSession(r.config.RtmpUrl, rtmpConnectionTimout)
r.rtmpInst = rtmp.NewSession(r.config.RtmpUrl, rtmpConnectionTimout)
err = r.rtmpInst.StartSession()
}
if err != nil {
@ -655,7 +655,7 @@ func (r *revid) setupInputForFile() error {
func (r *revid) testRtmp(delayTime uint) {
for {
time.Sleep(time.Duration(delayTime) * time.Millisecond)
r.rtmpInst.EndSession()
r.rtmpInst.Close()
r.rtmpInst.StartSession()
}
}

8
rtmp/Makefile Normal file
View File

@ -0,0 +1,8 @@
all:
@cd rtmp_c/librtmp; $(MAKE) all
install:
@cd rtmp_c/librtmp; $(MAKE) install
clean:
@cd rtmp_c; $(MAKE) clean

View File

@ -1,106 +0,0 @@
/*
NAME
RTMP.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
RTMP.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package rtmp
// #cgo CFLAGS: -I/home/pi/go/src/bitbucket.org/AusOcean/av/rtmp/
// #cgo CFLAGS: -I/home/pi/go/src/bitbucket.org/ausocean/av/rtmp/rtmp_c/librtmp
// #cgo LDFLAGS: /home/pi/go/src/bitbucket.org/ausocean/av/rtmp/rtmp_c/librtmp/librtmp.a
// #cgo LDFLAGS: -lz
// #include <RTMPWrapper.h>
import "C"
import (
"errors"
_ "fmt"
"sync"
"unsafe"
"bitbucket.org/ausocean/av/tools"
)
// RTMPSession provides a crude interface for sending flv tags over rtmp
type RTMPSession interface {
StartSession() error
WriteFrame(data []byte, dataLength uint) error
EndSession() error
}
// rtmpSession provides parameters required for an rtmp communication session
type rtmpSession struct {
url string
timeout uint
running bool
mutex *sync.Mutex
}
// NewRTMPSession returns a new instance of an rtmpSession struct
func NewRTMPSession(url string, connectTimeout uint) (session *rtmpSession) {
session = new(rtmpSession)
session.url = url
session.timeout = connectTimeout
session.mutex = &sync.Mutex{}
return
}
// StartSession establishes an rtmp connection with the url passed into the
// constructor
func (s *rtmpSession) StartSession() error {
if !s.running {
if !tools.UintToBool(uint(C.RTMP_start_session(C.CString(s.url), C.uint(s.timeout)))) {
return errors.New("RTMP start error! Check rtmp log for details!")
}
s.running = true
} else {
return errors.New("Tried to start rtmp session, but already started!")
}
return nil
}
// WriteFrame writes a frame (flv tag) to the rtmp connection
// TODO: Remove mutex
func (s *rtmpSession) WriteFrame(data []byte, dataLength uint) error {
if s.running {
if !tools.UintToBool(uint(C.RTMP_write_frame((*C.char)(unsafe.Pointer(&data[0])), C.uint(dataLength)))) {
return errors.New("RTMP write error! Check rtmp log for details!")
}
} else {
return errors.New("RTMP session not running, can't write!")
}
return nil
}
// EndSession terminates the rtmp connection
func (s *rtmpSession) EndSession() error {
if s.running {
s.running = false
if !tools.UintToBool(uint(C.RTMP_end_session())) {
return errors.New("RTMP end session error! Check rtmp log for details!")
}
} else {
return errors.New("Tried to stop rtmp session, but not running!")
}
return nil
}

View File

@ -1,94 +0,0 @@
/*
NAME
RTMPWrapper.c
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
RTMPWrapper.c is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "rtmp_c/librtmp/rtmp_sys.h"
#include "rtmp_c/librtmp/log.h"
#include "rtmp_c/librtmp/rtmp.h"
RTMP* rtmp = NULL;
unsigned int RTMP_start_session(char* url, uint connect_timeout){
printf("RTMP url: %s\n", url);
rtmp = RTMP_Alloc();
RTMP_Init(rtmp);
rtmp->Link.timeout = connect_timeout;
if (!RTMP_SetupURL(rtmp, url)) {
printf("Can't setup url!\n");
RTMP_Close(rtmp);
RTMP_Free(rtmp);
return 0;
}
RTMP_EnableWrite(rtmp);
RTMP_SetBufferMS(rtmp, 3600 * 1000);
if (!RTMP_Connect(rtmp, NULL)) {
printf("RTMP can't connect!\n");
RTMP_Close(rtmp);
RTMP_Free(rtmp);
return 0;
}
if (!RTMP_ConnectStream(rtmp, 0)) {
printf("RTMP can't connect stream!\n");
RTMP_Close(rtmp);
RTMP_Free(rtmp);
return 0;
}
return 1;
}
unsigned int RTMP_write_frame(char* data, uint data_length){
char* dataForC = malloc(data_length);
memcpy(dataForC,data,data_length);
if (!RTMP_IsConnected(rtmp)) {
printf("RTMP is not connected!\n");
free(dataForC);
return 0;
}
if (!RTMP_Write(rtmp, (const char*)data, data_length)) {
printf("RTMP write error!\n");
free(dataForC);
return 0;
}
free(dataForC);
return 1;
}
unsigned int RTMP_end_session(){
if (rtmp != NULL) {
RTMP_Close(rtmp);
RTMP_Free(rtmp);
rtmp = NULL;
return 1;
} else {
printf("Tried to end RTMP session, but not allocated yet!\n");
return 0;
}
}

View File

@ -1,39 +0,0 @@
/*
NAME
RtpToTsConverter.go - provides utilities for the conversion of Rtp packets
to equivalent MpegTs packets.
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon.milton@gmail.com>
LICENSE
RtpToTsConverter.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "rtmp_c/librtmp/rtmp_sys.h"
#include "rtmp_c/librtmp/log.h"
#include "rtmp_c/librtmp/rtmp.h"
int RTMP_start_session(char* url, uint connect_timeout);
int RTMP_write_frame(char* data, uint data_length);
int RTMP_end_session();

97
rtmp/rtmp.c Normal file
View File

@ -0,0 +1,97 @@
/*
NAME
rtmp.c
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
LICENSE
RTMPWrapper.c is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <rtmp_sys.h>
#include <log.h>
#include <rtmp.h>
unsigned int start_session(RTMP* rtmp, char* url, uint connect_timeout) {
printf("RTMP url: %s\n", url);
rtmp = RTMP_Alloc();
RTMP_Init(rtmp);
rtmp->Link.timeout = connect_timeout;
if (!RTMP_SetupURL(rtmp, url)) {
printf("Can't setup url!\n");
RTMP_Close(rtmp);
RTMP_Free(rtmp);
return 1;
}
RTMP_EnableWrite(rtmp);
RTMP_SetBufferMS(rtmp, 3600 * 1000);
if (!RTMP_Connect(rtmp, NULL)) {
printf("RTMP can't connect!\n");
RTMP_Close(rtmp);
RTMP_Free(rtmp);
return 1;
}
if (!RTMP_ConnectStream(rtmp, 0)) {
printf("RTMP can't connect stream!\n");
RTMP_Close(rtmp);
RTMP_Free(rtmp);
return 1;
}
return 0;
}
unsigned int write_frame(RTMP* rtmp, char* data, uint data_length) {
char* dataForC = malloc(data_length);
memcpy(dataForC,data,data_length);
if (!RTMP_IsConnected(rtmp)) {
printf("RTMP is not connected!\n");
free(dataForC);
return 1;
}
if (!RTMP_Write(rtmp, (const char*)data, data_length)) {
printf("RTMP write error!\n");
free(dataForC);
return 1;
}
free(dataForC);
return 0;
}
unsigned int end_session(RTMP* rtmp) {
if (rtmp == NULL) {
printf("Tried to end RTMP session, but not allocated yet!\n");
return 1;
}
RTMP_Close(rtmp);
RTMP_Free(rtmp);
rtmp = NULL;
return 0;
}

109
rtmp/rtmp.go Normal file
View File

@ -0,0 +1,109 @@
/*
NAME
rtmp.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
Dan Kortschak <dan@ausocean.org>
LICENSE
rtmp.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
package rtmp
/*
#cgo CFLAGS: -I/usr/local/include/librtmp
#cgo LDFLAGS: -lssl -lcrypto -lrtmp -lz
#include <stdlib.h>
#include <rtmp.h>
int start_session(RTMP* rtmp, char* url, uint connect_timeout);
int write_frame(RTMP* rtmp, char* data, uint data_length);
int end_session(RTMP* rtmp);
*/
import "C"
import (
"errors"
"unsafe"
)
// Session provides an interface for sending flv tags over rtmp.
type Session interface {
StartSession() error
Write([]byte) error
Close() error
}
// session provides parameters required for an rtmp communication session.
type session struct {
rtmp *C.RTMP
url string
timeout uint
running bool
}
var _ Session = (*session)(nil)
// NewSession returns a new session.
func NewSession(url string, connectTimeout uint) Session {
return &session{
url: url,
timeout: connectTimeout,
}
}
// StartSession establishes an rtmp connection with the url passed into the
// constructor
func (s *session) StartSession() error {
if s.running {
return errors.New("rtmp: attempt to start already running session")
}
if C.start_session(s.rtmp, C.CString(s.url), C.uint(s.timeout)) != 0 {
return errors.New("RTMP start error! Check rtmp log for details!")
}
s.running = true
return nil
}
// Write writes a frame (flv tag) to the rtmp connection
func (s *session) Write(data []byte) error {
if !s.running {
return errors.New("rtmp: attempt to write to non-running session")
}
if C.write_frame(s.rtmp, (*C.char)(unsafe.Pointer(&data[0])), C.uint(len(data))) != 0 {
return errors.New("RTMP write error! Check rtmp log for details!")
}
return nil
}
// Close terminates the rtmp connection
func (s *session) Close() error {
if !s.running {
return errors.New("Tried to stop rtmp session, but not running!")
}
s.running = false
if C.end_session(s.rtmp) != 0 {
return errors.New("RTMP end session error! Check rtmp log for details!")
}
return nil
}

View File

@ -1,19 +0,0 @@
package rtmp
/*
// #cgo CFLAGS: -I/home/saxon/Desktop/AusOcean/av/rtmp/
// #cgo CFLAGS: -I/home/saxon/Desktop/AusOcean/av/rtmp/rtmp_c/librtmp
// #cgo LDFLAGS: /home/saxon/Desktop/AusOcean/av/rtmp/rtmp_c/librtmp/librtmp.a
// #cgo LDFLAGS: -lssl -lcrypto -lz
// #include <testRtmp.h>
import "C"
const (
inputFile = "saxonOut.flv"
outputUrl = "rtmp://a.rtmp.youtube.com/live2/w44c-mkuu-aezg-ceb1"
)
func main(){
C.publish_using_write(C.CString(inputFile), C.CString(outputUrl))
}
*/

View File

@ -109,7 +109,7 @@ install: install_base $(SO_INST)
install_base: librtmp.a librtmp.pc
-mkdir -p $(INCDIR) $(LIBDIR)/pkgconfig $(MANDIR)/man3 $(SODIR)
cp amf.h http.h log.h rtmp.h $(INCDIR)
cp amf.h http.h log.h rtmp.h rtmp_sys.h $(INCDIR)
cp librtmp.a $(LIBDIR)
cp librtmp.pc $(LIBDIR)/pkgconfig
cp librtmp.3 $(MANDIR)/man3

View File

@ -1,379 +0,0 @@
/*
* @file send_flv_over_rtmp.c
* @author Akagi201
* @date 2015/01/01
*
* send local flv file to net server as a rtmp live stream.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "rtmp_c/librtmp/rtmp_sys.h"
#include "rtmp_c/librtmp/log.h"
#include "rtmp_c/librtmp/rtmp.h"
#define HTON16(x) ((x>>8&0xff)|(x<<8&0xff00))
#define HTON24(x) ((x>>16&0xff)|(x<<16&0xff0000)|(x&0xff00))
#define HTON32(x) ((x>>24&0xff)|(x>>8&0xff00)|\
(x<<8&0xff0000)|(x<<24&0xff000000))
#define HTONTIME(x) ((x>>16&0xff)|(x<<16&0xff0000)|(x&0xff00)|(x&0xff000000))
/*read 1 byte*/
int ReadU8(uint32_t *u8, FILE *fp) {
if (fread(u8, 1, 1, fp) != 1) {
return 0;
}
return 1;
}
/*read 2 byte*/
int ReadU16(uint32_t *u16, FILE *fp) {
if (fread(u16, 2, 1, fp) != 1) {
return 0;
}
*u16 = HTON16(*u16);
return 1;
}
/*read 3 byte*/
int ReadU24(uint32_t *u24, FILE *fp) {
if (fread(u24, 3, 1, fp) != 1) {
return 0;
}
*u24 = HTON24(*u24);
return 1;
}
/*read 4 byte*/
int ReadU32(uint32_t *u32, FILE *fp) {
if (fread(u32, 4, 1, fp) != 1) {
return 0;
}
*u32 = HTON32(*u32);
return 1;
}
/*read 1 byte,and loopback 1 byte at once*/
int PeekU8(uint32_t *u8, FILE *fp) {
if (fread(u8, 1, 1, fp) != 1) {
return 0;
}
fseek(fp, -1, SEEK_CUR);
return 1;
}
/*read 4 byte and convert to time format*/
int ReadTime(uint32_t *utime, FILE *fp) {
if (fread(utime, 4, 1, fp) != 1) {
return 0;
}
*utime = HTONTIME(*utime);
return 1;
}
//Publish using RTMP_SendPacket()
int publish_using_packet(char* inputFile, char* outputUrl) {
RTMP *rtmp = NULL;
RTMPPacket *packet = NULL;
uint32_t start_time = 0;
uint32_t now_time = 0;
//the timestamp of the previous frame
long pre_frame_time = 0;
long lasttime = 0;
int b_next_is_key = 1;
uint32_t pre_tag_size = 0;
//packet attributes
uint32_t type = 0;
uint32_t datalength = 0;
uint32_t timestamp = 0;
uint32_t streamid = 0;
FILE *fp = NULL;
fp = fopen(inputFile, "rb");
if (NULL == fp) {
return -1;
}
/* set log level */
//RTMP_LogLevel loglvl=RTMP_LOGDEBUG;
//RTMP_LogSetLevel(loglvl);
rtmp = RTMP_Alloc();
RTMP_Init(rtmp);
//set connection timeout,default 30s
rtmp->Link.timeout = 5;
if (!RTMP_SetupURL(rtmp, outputUrl)) {
RTMP_Log(RTMP_LOGERROR, "SetupURL Err\n");
RTMP_Free(rtmp);
return -1;
}
//if unable,the AMF command would be 'play' instead of 'publish'
RTMP_EnableWrite(rtmp);
if (!RTMP_Connect(rtmp, NULL)) {
RTMP_Log(RTMP_LOGERROR, "Connect Err\n");
RTMP_Free(rtmp);
return -1;
}
if (!RTMP_ConnectStream(rtmp, 0)) {
RTMP_Log(RTMP_LOGERROR, "ConnectStream Err\n");
RTMP_Close(rtmp);
RTMP_Free(rtmp);
return -1;
}
packet = (RTMPPacket *) malloc(sizeof(RTMPPacket));
RTMPPacket_Alloc(packet, 1024 * 64);
RTMPPacket_Reset(packet);
packet->m_hasAbsTimestamp = 0;
packet->m_nChannel = 0x04;
packet->m_nInfoField2 = rtmp->m_stream_id;
//jump over FLV Header
fseek(fp, 9, SEEK_SET);
//jump over previousTagSizen
fseek(fp, 4, SEEK_CUR);
start_time = RTMP_GetTime();
while (1) {
if ((((now_time = RTMP_GetTime()) - start_time)
< (pre_frame_time)) && b_next_is_key) {
//wait for 1 sec if the send process is too fast
//this mechanism is not very good,need some improvement
if (pre_frame_time > lasttime) {
RTMP_LogPrintf("TimeStamp:%8lu ms\n", pre_frame_time);
lasttime = pre_frame_time;
}
sleep(1);
continue;
}
//not quite the same as FLV spec
if (!ReadU8(&type, fp)) {
break;
}
if (!ReadU24(&datalength, fp)) {
break;
}
if (!ReadTime(&timestamp, fp)) {
break;
}
if (!ReadU24(&streamid, fp)) {
break;
}
if (type != 0x08 && type != 0x09) {
//jump over non_audio and non_video frame
//jump over next previousTagSizen at the same time
fseek(fp, datalength + 4, SEEK_CUR);
continue;
}
if (fread(packet->m_body, 1, datalength, fp) != datalength) {
break;
}
packet->m_headerType = RTMP_PACKET_SIZE_LARGE;
packet->m_nTimeStamp = timestamp;
packet->m_packetType = type;
packet->m_nBodySize = datalength;
pre_frame_time = timestamp;
if (!RTMP_IsConnected(rtmp)) {
RTMP_Log(RTMP_LOGERROR, "rtmp is not connect\n");
break;
}
if (!RTMP_SendPacket(rtmp, packet, 0)) {
RTMP_Log(RTMP_LOGERROR, "Send Error\n");
break;
}
if (!ReadU32(&pre_tag_size, fp)) {
break;
}
if (!PeekU8(&type, fp)) {
break;
}
if (type == 0x09) {
if (fseek(fp, 11, SEEK_CUR) != 0) {
break;
}
if (!PeekU8(&type, fp)) {
break;
}
if (type == 0x17) {
b_next_is_key = 1;
} else {
b_next_is_key = 0;
}
fseek(fp, -11, SEEK_CUR);
}
}
if (fp != NULL) {
fclose(fp);
fp = NULL;
}
if (rtmp != NULL) {
RTMP_Close(rtmp);
RTMP_Free(rtmp);
rtmp = NULL;
}
if (packet != NULL) {
RTMPPacket_Free(packet);
free(packet);
packet = NULL;
}
return 0;
}
//Publish using RTMP_Write()
int publish_using_write(char* inputFile, char* outputUrl) {
uint32_t start_time = 0;
uint32_t now_time = 0;
uint32_t pre_frame_time = 0;
uint32_t lasttime = 0;
int b_next_is_key = 0;
char *p_file_buf = NULL;
//read from tag header
uint32_t type = 0;
uint32_t datalength = 0;
uint32_t timestamp = 0;
RTMP *rtmp = NULL;
FILE *fp = NULL;
fp = fopen(inputFile, "rb");
if (NULL == fp) {
RTMP_LogPrintf("Open File Error.\n");
return -1;
}
/* set log level */
//RTMP_LogLevel loglvl=RTMP_LOGDEBUG;
//RTMP_LogSetLevel(loglvl);
rtmp = RTMP_Alloc();
RTMP_Init(rtmp);
//set connection timeout,default 30s
rtmp->Link.timeout = 5;
if (!RTMP_SetupURL(rtmp, outputUrl)) {
RTMP_Log(RTMP_LOGERROR, "SetupURL Err\n");
RTMP_Free(rtmp);
return -1;
}
RTMP_EnableWrite(rtmp);
//1hour
RTMP_SetBufferMS(rtmp, 3600 * 1000);
if (!RTMP_Connect(rtmp, NULL)) {
RTMP_Log(RTMP_LOGERROR, "Connect Err\n");
RTMP_Free(rtmp);
return -1;
}
if (!RTMP_ConnectStream(rtmp, 0)) {
RTMP_Log(RTMP_LOGERROR, "ConnectStream Err\n");
RTMP_Close(rtmp);
RTMP_Free(rtmp);
return -1;
}
//jump over FLV Header
fseek(fp, 9, SEEK_SET);
//jump over previousTagSizen
fseek(fp, 4, SEEK_CUR);
start_time = RTMP_GetTime();
while (1) {
if ((((now_time = RTMP_GetTime()) - start_time)
< (pre_frame_time)) && b_next_is_key) {
//wait for 1 sec if the send process is too fast
//this mechanism is not very good,need some improvement
if (pre_frame_time > lasttime) {
RTMP_LogPrintf("TimeStamp:%8u ms\n", pre_frame_time);
lasttime = pre_frame_time;
}
sleep(1);
continue;
}
//jump over type
fseek(fp, 1, SEEK_CUR);
if (!ReadU24(&datalength, fp)) {
break;
}
if (!ReadTime(&timestamp, fp)) {
break;
}
//jump back
fseek(fp, -8, SEEK_CUR);
p_file_buf = (char *) malloc(11 + datalength + 4);
memset(p_file_buf, 0, 11 + datalength + 4);
if (fread(p_file_buf, 1, 11 + datalength + 4, fp) != (11 + datalength + 4)) {
break;
}
pre_frame_time = timestamp;
if (!RTMP_IsConnected(rtmp)) {
RTMP_Log(RTMP_LOGERROR, "rtmp is not connect\n");
break;
}
if (!RTMP_Write(rtmp, p_file_buf, 11 + datalength + 4)) {
RTMP_Log(RTMP_LOGERROR, "Rtmp Write Error\n");
break;
}
free(p_file_buf);
p_file_buf = NULL;
if (!PeekU8(&type, fp)) {
break;
}
if (0x09 == type) {
if (fseek(fp, 11, SEEK_CUR) != 0) {
break;
}
if (!PeekU8(&type, fp)) {
break;
}
if (type == 0x17) {
b_next_is_key = 1;
} else {
b_next_is_key = 0;
}
fseek(fp, -11, SEEK_CUR);
}
}
RTMP_LogPrintf("\nSend Data Over\n");
if (fp != NULL) {
fclose(fp);
fp = NULL;
}
if (rtmp != NULL) {
RTMP_Close(rtmp);
RTMP_Free(rtmp);
rtmp = NULL;
}
if (p_file_buf != NULL) {
free(p_file_buf);
p_file_buf = NULL;
}
return 0;
}