diff --git a/flv/FLV.go b/flv/FLV.go index a3fd8640..bfd8a130 100644 --- a/flv/FLV.go +++ b/flv/FLV.go @@ -14,11 +14,14 @@ const ( const ( VideoTagType = 9 + AudioTagType = 8 KeyFrameType = 1 H264 = 7 AVCNALU = 1 DataHeaderLength = 5 NoTimestampExtension = 0 + AACAudioFormat = 10 + ) type Header struct { @@ -75,16 +78,36 @@ func (t *VideoTag) ToByteSlice() (output []byte) { } type AudioTag struct { + PrevTagSize uint32 + TagType uint8 + DataSize uint32 + Timestamp uint32 + TimestampExtended uint32 SoundFormat uint8 - SoundRate uint8 - SoundSize uint8 - SoundType uint8 - Data []byte + SoundRate uint8 + SoundSize bool + SoundType bool + Data []byte } func (t *AudioTag) ToByteSlice() (output []byte) { - output = make([]byte, 0, maxAudioTagSize) - output = append(output, byte(t.SoundFormat<<4)|byte(t.SoundRate<<2)|byte(t.SoundSize<<1)|byte(t.SoundType)) + output = make([]byte, 0, maxVideoTagSize) + output = append(output, []byte{ + byte(t.PrevTagSize >> 24), + byte(t.PrevTagSize >> 16), + byte(t.PrevTagSize >> 8), + byte(t.PrevTagSize), + byte(t.TagType), + byte(t.DataSize >> 16), + byte(t.DataSize >> 8), + byte(t.DataSize), + byte(t.Timestamp >> 16), + byte(t.Timestamp >> 8), + byte(t.Timestamp), + byte(t.TimestampExtended), + 0x00, 0x00, 0x00, + byte(t.SoundFormat << 4) | byte(t.SoundRate<<2) | byte(tools.BoolToByte(t.SoundSize)<<1) | byte(tools.BoolToByte(t.SoundType)), + }...) output = append(output, t.Data...) return } diff --git a/generator/FLVGenerator.go b/generator/FLVGenerator.go index 693656e3..a29c965c 100644 --- a/generator/FLVGenerator.go +++ b/generator/FLVGenerator.go @@ -29,7 +29,8 @@ package generator import ( "../flv" - "fmt" + _"fmt" + "time" ) const ( @@ -93,15 +94,14 @@ func (g *flvGenerator) ResetTimestamp() { func (g *flvGenerator) generate() { g.GenHeader() for { - fmt.Println("in this loop") select { case videoFrame := <-g.inputChan: - fmt.Println("Got video frame!") - tag := flv.VideoTag{ + timeStamp := g.getNextTimestamp() + videoTag := flv.VideoTag{ PrevTagSize: uint32(g.lastTagSize), TagType: uint8(flv.VideoTagType), DataSize: uint32(len(videoFrame)) + flv.DataHeaderLength, - Timestamp: g.getNextTimestamp(), + Timestamp: timeStamp, TimestampExtended: flv.NoTimestampExtension, FrameType: flv.KeyFrameType, Codec: flv.H264, @@ -109,9 +109,35 @@ func (g *flvGenerator) generate() { CompositionTime: 0, Data: videoFrame, } - tagAsByteSlice := tag.ToByteSlice() - g.lastTagSize = len(tagAsByteSlice) - g.outputChan<-tagAsByteSlice + videoTagAsByteSlice := videoTag.ToByteSlice() + g.lastTagSize = len(videoTagAsByteSlice) + g.outputChan<-videoTagAsByteSlice + + soundData := make([]byte, 10) + for i := range soundData { + if i == 0 { + soundData[i] = 1 + } else { + soundData[i] = 0 + } + } + audioTag := flv.AudioTag{ + PrevTagSize: uint32(g.lastTagSize), + TagType: uint8(flv.AudioTagType), + DataSize: 1 + 10, + Timestamp: timeStamp, + TimestampExtended: flv.NoTimestampExtension, + SoundFormat: flv.AACAudioFormat, + SoundRate: 0, + SoundSize: false, + SoundType: false, + Data: soundData, + } + audioTagAsByteSlice := audioTag.ToByteSlice() + g.lastTagSize = len(audioTagAsByteSlice) + g.outputChan<-audioTagAsByteSlice + + time.Sleep(40*time.Millisecond) } } } diff --git a/parser/H264Parser.go b/parser/H264Parser.go index 430ecf74..c7c2c8dc 100644 --- a/parser/H264Parser.go +++ b/parser/H264Parser.go @@ -4,6 +4,7 @@ import ( //"bitbucket.org/ausocean/av/itut" "../itut" _"fmt" + _"time" ) type h264Parser struct { @@ -17,7 +18,7 @@ type h264Parser struct { func NewH264Parser() (p *h264Parser) { p = new(h264Parser) p.isParsing = true - p.inputChan = make(chan byte, 10000) + p.inputChan = make(chan byte, 100000) return } @@ -54,6 +55,7 @@ func (p *h264Parser)parse() { if ( aByte == 0x01 && i == 2 ) || ( aByte == 0x01 && i == 3 ) { if searchingForEnd { output := append(append(itut.StartCode1(),itut.AUD()...),outputBuffer[:len(outputBuffer)-(i+1)]...) + //time.Sleep(40*time.Millisecond) p.parserOutputChanRef<-output outputBuffer = outputBuffer[len(outputBuffer)-1-i:] searchingForEnd = false diff --git a/revid/RevidInstance.go b/revid/RevidInstance.go index 9bfd9d6d..1a662689 100644 --- a/revid/RevidInstance.go +++ b/revid/RevidInstance.go @@ -182,17 +182,17 @@ func (r *revidInst) ChangeState(config Config) error { } else { switch r.config.Packetization { case Mpegts: + r.Log(Info, "Using MPEGTS packetisation!") frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate) r.generator = generator.NewTsGenerator(uint(frameRateAsInt)) case Flv: + r.Log(Info, "Using FLV packetisation!") frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate) - r.generator = generator.NewFlvGenerator(false, true, uint(frameRateAsInt)) + r.generator = generator.NewFlvGenerator(true, true, uint(frameRateAsInt)) } r.getFrame = r.getFramePacketization r.parser.SetOutputChan(r.generator.GetInputChan()) - fmt.Println("about to start generator") r.generator.Start() - fmt.Println("generator started") } return nil } @@ -221,7 +221,11 @@ func (r *revidInst) Start() { return } r.Log(Info, "Starting Revid!") - r.setupOutput() + err := r.setupOutput() + if err != nil { + r.Log(Error, "Output setup didn't work!") + return + } go r.setupInput() go r.outputClips() r.isRunning = true @@ -279,9 +283,7 @@ func (r *revidInst) packClips() { r.flushData() } else { for { - fmt.Println("Getting frame") frame := r.getFrame() - fmt.Println("Got frame") lenOfFrame := len(frame) upperBound := clipSize + lenOfFrame copy(clip[clipSize:upperBound], frame) @@ -325,9 +327,14 @@ func (r *revidInst) outputClips() { bytes += len(clip) for err := r.sendClip(clip); err != nil; { r.Log(Error, err.Error()) - r.Log(Warning, "Send failed trying again!") - err = r.sendClip(clip) + if len(clip) >= 11 { + r.Log(Warning, "Send failed trying again!") + err = r.sendClip(clip) + } else { + break + } } + if err := r.ringBuffer.DoneReading(); err != nil { r.Log(Error, err.Error()) } @@ -342,7 +349,6 @@ func (r *revidInst) outputClips() { } } else { r.Log(Debug, err.Error()) - time.Sleep(1*time.Second) } } } @@ -382,7 +388,6 @@ func (r *revidInst) sendClipToHTTP(clip []byte) error { // Start invokes a revidInst to start processing video from a defined input // and packetising to a defined output. func (r *revidInst) sendClipToFfmpegRtmp(clip []byte) error { - fmt.Println("Outputting!") _, err := r.ffmpegStdin.Write(clip) if err != nil { return err @@ -506,32 +511,29 @@ func (r *revidInst) readCamera() { // readFile reads data from the defined file while the revidInst is running. func (r *revidInst) readFile() error { - if len(r.parser.GetInputChan()) == 0 { - var err error - r.inputFile, err = os.Open(r.config.InputFileName) - if err != nil { - r.Log(Error, err.Error()) - r.Stop() - return err - } - stats, err := r.inputFile.Stat() - if err != nil { - r.Log(Error, "Could not get input file stats!") - r.Stop() - return err - } - data := make([]byte, stats.Size()) - _, err = r.inputFile.Read(data) - if err != nil { - r.Log(Error, err.Error()) - r.Stop() - return err - } - for i := range data { - time.Sleep(10*time.Millisecond) - r.parser.GetInputChan() <- data[i] - } - r.inputFile.Close() + var err error + r.inputFile, err = os.Open(r.config.InputFileName) + if err != nil { + r.Log(Error, err.Error()) + r.Stop() + return err } + stats, err := r.inputFile.Stat() + if err != nil { + r.Log(Error, "Could not get input file stats!") + r.Stop() + return err + } + data := make([]byte, stats.Size()) + _, err = r.inputFile.Read(data) + if err != nil { + r.Log(Error, err.Error()) + r.Stop() + return err + } + for i := range data { + r.parser.GetInputChan() <- data[i] + } + r.inputFile.Close() return nil } diff --git a/revid/newOutput.flv b/revid/newOutput.flv new file mode 100644 index 00000000..afce4beb Binary files /dev/null and b/revid/newOutput.flv differ diff --git a/revid/revid_test.go b/revid/revid_test.go index 1cebc9df..c8df193a 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -122,7 +122,7 @@ func TestRtmpOutput(t *testing.T){ } */ -/* + // Test h264 inputfile to flv output files func TestFlvOutputFile(t *testing.T) { config := Config{ @@ -130,7 +130,7 @@ func TestFlvOutputFile(t *testing.T) { InputFileName: "testInput.h264", InputCodec: H264, Output: File, - OutputFileName: "testOutput.flv", + OutputFileName: "pls.flv", Packetization: Flv, FrameRate: "25", } @@ -140,11 +140,13 @@ func TestFlvOutputFile(t *testing.T) { return } revidInst.Start() - time.Sleep(5 * time.Second) + time.Sleep(30 * time.Second) revidInst.Stop() } -*/ + + +/* // Test h264 inputfile to flv format into rtmp using librtmp c wrapper func TestRtmpOutputUsingLibRtmp(t *testing.T){ config := Config{ @@ -154,9 +156,9 @@ func TestRtmpOutputUsingLibRtmp(t *testing.T){ Output: Rtmp, RtmpMethod: LibRtmp, RtmpUrl: "rtmp://a.rtmp.youtube.com/live2/w44c-mkuu-aezg-ceb1", - FramesPerClip: 1, + FramesPerClip: 2, Packetization: Flv, - FrameRate: "25", + FrameRate: "30", } revidInst, err := NewRevidInstance(config) if err != nil { @@ -167,3 +169,4 @@ func TestRtmpOutputUsingLibRtmp(t *testing.T){ time.Sleep(30*time.Second) revidInst.Stop() } +*/ diff --git a/rtmp/RTMP.go b/rtmp/RTMP.go index 392774bd..5faec442 100644 --- a/rtmp/RTMP.go +++ b/rtmp/RTMP.go @@ -36,8 +36,8 @@ import "C" import ( "errors" - _"unsafe" - "fmt" + "unsafe" + _"fmt" ) type RTMPSession interface { @@ -47,7 +47,6 @@ type RTMPSession interface { } type rtmpSession struct { - rtmp *C.struct_RTMP url string timeout uint } @@ -60,27 +59,23 @@ func NewRTMPSession(url string, connectTimeout uint) (session *rtmpSession){ } func (s *rtmpSession) StartSession() error { - fmt.Println("Starting session!") - /* - if !uintToBool(uint(C.RTMP_start_session(s.rtmp, C.CString(s.url), C.uint(s.timeout)))) { + if !uintToBool(uint(C.RTMP_start_session(C.CString(s.url), C.uint(s.timeout)))) { return errors.New("RTMP start error! Check rtmp log for details!") } - */ return nil } func (s *rtmpSession) WriteFrame(data []byte, dataLength uint) error { - fmt.Println("writing frame!") - /* - if !uintToBool(uint(C.RTMP_write_frame(s.rtmp, (*C.char)(unsafe.Pointer(&data[0])), C.uint(dataLength)))) { + dataCopy := make([]byte, len(data)) + copy(dataCopy, data) + if !uintToBool(uint(C.RTMP_write_frame((*C.char)(unsafe.Pointer(&dataCopy[0])), C.uint(dataLength)))) { return errors.New("RTMP write error! Check rtmp log for details!") } - */ return nil } func (s *rtmpSession) EndSession() error { - if !uintToBool(uint(C.RTMP_end_session(s.rtmp))) { + if !uintToBool(uint(C.RTMP_end_session())) { return errors.New("RTMP end session error! Check rtmp log for details!") } return nil diff --git a/rtmp/RTMPWrapper.c b/rtmp/RTMPWrapper.c index eac0c281..11301019 100644 --- a/rtmp/RTMPWrapper.c +++ b/rtmp/RTMPWrapper.c @@ -33,12 +33,15 @@ LICENSE #include "rtmp_c/librtmp/log.h" #include "rtmp_c/librtmp/rtmp.h" -unsigned int RTMP_start_session(RTMP* rtmp, char* url, uint connect_timeout){ +static RTMP* rtmp = NULL; + +unsigned int RTMP_start_session(char* url, uint connect_timeout){ + printf("RTMP url: %s\n", url); rtmp = RTMP_Alloc(); RTMP_Init(rtmp); rtmp->Link.timeout = connect_timeout; if (!RTMP_SetupURL(rtmp, url)) { - RTMP_Log(RTMP_LOGERROR, "SetupURL Err\n"); + printf("Can't setup url!\n"); RTMP_Free(rtmp); return 0; } @@ -46,13 +49,13 @@ unsigned int RTMP_start_session(RTMP* rtmp, char* url, uint connect_timeout){ RTMP_EnableWrite(rtmp); RTMP_SetBufferMS(rtmp, 3600 * 1000); if (!RTMP_Connect(rtmp, NULL)) { - RTMP_Log(RTMP_LOGERROR, "Connect Err\n"); + printf("RTMP can't connect!\n"); RTMP_Free(rtmp); return 0; } if (!RTMP_ConnectStream(rtmp, 0)) { - RTMP_Log(RTMP_LOGERROR, "ConnectStream Err\n"); + printf("RTMP can't connect stream!\n"); RTMP_Close(rtmp); RTMP_Free(rtmp); return 0; @@ -60,26 +63,26 @@ unsigned int RTMP_start_session(RTMP* rtmp, char* url, uint connect_timeout){ return 1; } -unsigned int RTMP_write_frame(RTMP* rtmp, char* data, uint data_length){ +unsigned int RTMP_write_frame(char* data, uint data_length){ if (!RTMP_IsConnected(rtmp)) { - RTMP_Log(RTMP_LOGERROR, "RTMP is not connected!\n"); + printf("RTMP is not connected!\n"); return 0; } - if (!RTMP_Write(rtmp, data, data_length)) { - RTMP_Log(RTMP_LOGERROR, "RTMP write error!\n"); + if (!RTMP_Write(rtmp, (const char*)data, data_length)) { + printf("RTMP write error!\n"); return 0; } return 1; } -unsigned int RTMP_end_session(RTMP* rtmp){ +unsigned int RTMP_end_session(){ if (rtmp != NULL) { RTMP_Close(rtmp); RTMP_Free(rtmp); rtmp = NULL; return 1; } else { - RTMP_Log(RTMP_LOGERROR, "Tried to end RTMP session, but not allocated yet!\n"); + printf("Tried to end RTMP session, but not allocated yet!\n"); return 0; } } diff --git a/rtmp/RTMPWrapper.h b/rtmp/RTMPWrapper.h index 7f17b42e..fbf73086 100644 --- a/rtmp/RTMPWrapper.h +++ b/rtmp/RTMPWrapper.h @@ -34,6 +34,6 @@ LICENSE #include "rtmp_c/librtmp/rtmp.h" -int RTMP_start_session(RTMP* rtmp, char* url, uint connect_timeout); -int RTMP_write_frame(RTMP* rtmp, char* data, uint data_length); -int RTMP_end_session(RTMP* rtmp); +int RTMP_start_session(char* url, uint connect_timeout); +int RTMP_write_frame(char* data, uint data_length); +int RTMP_end_session(); diff --git a/rtmp/barsandtone.flv b/rtmp/barsandtone.flv new file mode 100644 index 00000000..799d137e Binary files /dev/null and b/rtmp/barsandtone.flv differ diff --git a/rtmp/pls.flv b/rtmp/pls.flv new file mode 100644 index 00000000..2060453a Binary files /dev/null and b/rtmp/pls.flv differ diff --git a/rtmp/rtmpTesting b/rtmp/rtmpTesting new file mode 100755 index 00000000..1c4b566a Binary files /dev/null and b/rtmp/rtmpTesting differ diff --git a/rtmp/rtmpTesting.go b/rtmp/rtmpTesting.go new file mode 100644 index 00000000..fcdcca36 --- /dev/null +++ b/rtmp/rtmpTesting.go @@ -0,0 +1,19 @@ + +package main + + +// #cgo CFLAGS: -I/home/saxon/Desktop/AusOcean/av/rtmp/ +// #cgo CFLAGS: -I/home/saxon/Desktop/AusOcean/av/rtmp/rtmp_c/librtmp +// #cgo LDFLAGS: /home/saxon/Desktop/AusOcean/av/rtmp/rtmp_c/librtmp/librtmp.a +// #cgo LDFLAGS: -lssl -lcrypto -lz +// #include +import "C" + +const ( + inputFile = "sample.flv" + outputUrl = "rtmp://a.rtmp.youtube.com/live2/w44c-mkuu-aezg-ceb1" +) + +func main(){ + C.publish_using_write(C.CString(inputFile), C.CString(outputUrl)) +} diff --git a/rtmp/rtmp_c/librtmp/librtmp.a b/rtmp/rtmp_c/librtmp/librtmp.a index d27af549..a37ef8f9 100644 Binary files a/rtmp/rtmp_c/librtmp/librtmp.a and b/rtmp/rtmp_c/librtmp/librtmp.a differ diff --git a/rtmp/rtmp_c/librtmp/rtmp.c b/rtmp/rtmp_c/librtmp/rtmp.c index a2863b0e..15c0d0dd 100644 --- a/rtmp/rtmp_c/librtmp/rtmp.c +++ b/rtmp/rtmp_c/librtmp/rtmp.c @@ -5106,7 +5106,8 @@ RTMP_Write(RTMP *r, const char *buf, int size) if (!pkt->m_nBytesRead) { if (size < 11) { - /* FLV pkt too small */ + printf("size: %d\n", size); + printf("too small\n"); return 0; } diff --git a/rtmp/rtmp_c/librtmp/rtmp.o b/rtmp/rtmp_c/librtmp/rtmp.o index 5260b9cf..11585755 100644 Binary files a/rtmp/rtmp_c/librtmp/rtmp.o and b/rtmp/rtmp_c/librtmp/rtmp.o differ diff --git a/rtmp/rtmp_c/openssl-1.0.1g.tar.gz.md5 b/rtmp/rtmp_c/openssl-1.0.1g.tar.gz.md5 new file mode 100644 index 00000000..ab27bf7f --- /dev/null +++ b/rtmp/rtmp_c/openssl-1.0.1g.tar.gz.md5 @@ -0,0 +1 @@ +de62b43dfcd858e66a74bee1c834e959 diff --git a/rtmp/sample.flv b/rtmp/sample.flv new file mode 100644 index 00000000..af9b5c5c Binary files /dev/null and b/rtmp/sample.flv differ diff --git a/rtmp/test.flv b/rtmp/test.flv new file mode 100644 index 00000000..5763191a Binary files /dev/null and b/rtmp/test.flv differ diff --git a/revid/testOutput.flv b/rtmp/testOutput.flv similarity index 100% rename from revid/testOutput.flv rename to rtmp/testOutput.flv diff --git a/rtmp/testRtmp.h b/rtmp/testRtmp.h new file mode 100644 index 00000000..cfa371f0 --- /dev/null +++ b/rtmp/testRtmp.h @@ -0,0 +1,379 @@ +/* + * @file send_flv_over_rtmp.c + * @author Akagi201 + * @date 2015/01/01 + * + * send local flv file to net server as a rtmp live stream. + */ + + #include + #include + #include + #include + #include "rtmp_c/librtmp/rtmp_sys.h" + #include "rtmp_c/librtmp/log.h" + #include "rtmp_c/librtmp/rtmp.h" + +#define HTON16(x) ((x>>8&0xff)|(x<<8&0xff00)) +#define HTON24(x) ((x>>16&0xff)|(x<<16&0xff0000)|(x&0xff00)) +#define HTON32(x) ((x>>24&0xff)|(x>>8&0xff00)|\ + (x<<8&0xff0000)|(x<<24&0xff000000)) +#define HTONTIME(x) ((x>>16&0xff)|(x<<16&0xff0000)|(x&0xff00)|(x&0xff000000)) + +/*read 1 byte*/ +int ReadU8(uint32_t *u8, FILE *fp) { + if (fread(u8, 1, 1, fp) != 1) { + return 0; + } + return 1; +} + +/*read 2 byte*/ +int ReadU16(uint32_t *u16, FILE *fp) { + if (fread(u16, 2, 1, fp) != 1) { + return 0; + } + *u16 = HTON16(*u16); + return 1; +} + +/*read 3 byte*/ +int ReadU24(uint32_t *u24, FILE *fp) { + if (fread(u24, 3, 1, fp) != 1) { + return 0; + } + *u24 = HTON24(*u24); + return 1; +} + +/*read 4 byte*/ +int ReadU32(uint32_t *u32, FILE *fp) { + if (fread(u32, 4, 1, fp) != 1) { + return 0; + } + *u32 = HTON32(*u32); + return 1; +} + +/*read 1 byte,and loopback 1 byte at once*/ +int PeekU8(uint32_t *u8, FILE *fp) { + if (fread(u8, 1, 1, fp) != 1) { + return 0; + } + fseek(fp, -1, SEEK_CUR); + return 1; +} + +/*read 4 byte and convert to time format*/ +int ReadTime(uint32_t *utime, FILE *fp) { + if (fread(utime, 4, 1, fp) != 1) { + return 0; + } + *utime = HTONTIME(*utime); + return 1; +} + +//Publish using RTMP_SendPacket() +int publish_using_packet(char* inputFile, char* outputUrl) { + RTMP *rtmp = NULL; + RTMPPacket *packet = NULL; + uint32_t start_time = 0; + uint32_t now_time = 0; + //the timestamp of the previous frame + long pre_frame_time = 0; + long lasttime = 0; + int b_next_is_key = 1; + uint32_t pre_tag_size = 0; + + //packet attributes + uint32_t type = 0; + uint32_t datalength = 0; + uint32_t timestamp = 0; + uint32_t streamid = 0; + + FILE *fp = NULL; + fp = fopen(inputFile, "rb"); + if (NULL == fp) { + return -1; + } + + /* set log level */ + //RTMP_LogLevel loglvl=RTMP_LOGDEBUG; + //RTMP_LogSetLevel(loglvl); + + rtmp = RTMP_Alloc(); + RTMP_Init(rtmp); + //set connection timeout,default 30s + rtmp->Link.timeout = 5; + if (!RTMP_SetupURL(rtmp, outputUrl)) { + RTMP_Log(RTMP_LOGERROR, "SetupURL Err\n"); + RTMP_Free(rtmp); + return -1; + } + + //if unable,the AMF command would be 'play' instead of 'publish' + RTMP_EnableWrite(rtmp); + + if (!RTMP_Connect(rtmp, NULL)) { + RTMP_Log(RTMP_LOGERROR, "Connect Err\n"); + RTMP_Free(rtmp); + return -1; + } + + if (!RTMP_ConnectStream(rtmp, 0)) { + RTMP_Log(RTMP_LOGERROR, "ConnectStream Err\n"); + RTMP_Close(rtmp); + RTMP_Free(rtmp); + return -1; + } + + packet = (RTMPPacket *) malloc(sizeof(RTMPPacket)); + RTMPPacket_Alloc(packet, 1024 * 64); + RTMPPacket_Reset(packet); + + packet->m_hasAbsTimestamp = 0; + packet->m_nChannel = 0x04; + packet->m_nInfoField2 = rtmp->m_stream_id; + + //jump over FLV Header + fseek(fp, 9, SEEK_SET); + //jump over previousTagSizen + fseek(fp, 4, SEEK_CUR); + start_time = RTMP_GetTime(); + while (1) { + if ((((now_time = RTMP_GetTime()) - start_time) + < (pre_frame_time)) && b_next_is_key) { + //wait for 1 sec if the send process is too fast + //this mechanism is not very good,need some improvement + if (pre_frame_time > lasttime) { + RTMP_LogPrintf("TimeStamp:%8lu ms\n", pre_frame_time); + lasttime = pre_frame_time; + } + sleep(1); + continue; + } + + //not quite the same as FLV spec + if (!ReadU8(&type, fp)) { + break; + } + if (!ReadU24(&datalength, fp)) { + break; + } + if (!ReadTime(×tamp, fp)) { + break; + } + if (!ReadU24(&streamid, fp)) { + break; + } + + if (type != 0x08 && type != 0x09) { + //jump over non_audio and non_video frameļ¼Œ + //jump over next previousTagSizen at the same time + fseek(fp, datalength + 4, SEEK_CUR); + continue; + } + + if (fread(packet->m_body, 1, datalength, fp) != datalength) { + break; + } + + packet->m_headerType = RTMP_PACKET_SIZE_LARGE; + packet->m_nTimeStamp = timestamp; + packet->m_packetType = type; + packet->m_nBodySize = datalength; + pre_frame_time = timestamp; + + if (!RTMP_IsConnected(rtmp)) { + RTMP_Log(RTMP_LOGERROR, "rtmp is not connect\n"); + break; + } + if (!RTMP_SendPacket(rtmp, packet, 0)) { + RTMP_Log(RTMP_LOGERROR, "Send Error\n"); + break; + } + + if (!ReadU32(&pre_tag_size, fp)) { + break; + } + + if (!PeekU8(&type, fp)) { + break; + } + if (type == 0x09) { + if (fseek(fp, 11, SEEK_CUR) != 0) { + break; + } + if (!PeekU8(&type, fp)) { + break; + } + if (type == 0x17) { + b_next_is_key = 1; + } else { + b_next_is_key = 0; + } + + fseek(fp, -11, SEEK_CUR); + } + } + + if (fp != NULL) { + fclose(fp); + fp = NULL; + } + + if (rtmp != NULL) { + RTMP_Close(rtmp); + RTMP_Free(rtmp); + rtmp = NULL; + } + if (packet != NULL) { + RTMPPacket_Free(packet); + free(packet); + packet = NULL; + } + + return 0; +} + +//Publish using RTMP_Write() +int publish_using_write(char* inputFile, char* outputUrl) { + uint32_t start_time = 0; + uint32_t now_time = 0; + uint32_t pre_frame_time = 0; + uint32_t lasttime = 0; + int b_next_is_key = 0; + char *p_file_buf = NULL; + + //read from tag header + uint32_t type = 0; + uint32_t datalength = 0; + uint32_t timestamp = 0; + + RTMP *rtmp = NULL; + + FILE *fp = NULL; + fp = fopen(inputFile, "rb"); + if (NULL == fp) { + RTMP_LogPrintf("Open File Error.\n"); + return -1; + } + + /* set log level */ + //RTMP_LogLevel loglvl=RTMP_LOGDEBUG; + //RTMP_LogSetLevel(loglvl); + + rtmp = RTMP_Alloc(); + RTMP_Init(rtmp); + //set connection timeout,default 30s + rtmp->Link.timeout = 5; + if (!RTMP_SetupURL(rtmp, outputUrl)) { + RTMP_Log(RTMP_LOGERROR, "SetupURL Err\n"); + RTMP_Free(rtmp); + return -1; + } + + RTMP_EnableWrite(rtmp); + //1hour + RTMP_SetBufferMS(rtmp, 3600 * 1000); + if (!RTMP_Connect(rtmp, NULL)) { + RTMP_Log(RTMP_LOGERROR, "Connect Err\n"); + RTMP_Free(rtmp); + return -1; + } + + if (!RTMP_ConnectStream(rtmp, 0)) { + RTMP_Log(RTMP_LOGERROR, "ConnectStream Err\n"); + RTMP_Close(rtmp); + RTMP_Free(rtmp); + return -1; + } + + //jump over FLV Header + fseek(fp, 9, SEEK_SET); + //jump over previousTagSizen + fseek(fp, 4, SEEK_CUR); + start_time = RTMP_GetTime(); + while (1) { + if ((((now_time = RTMP_GetTime()) - start_time) + < (pre_frame_time)) && b_next_is_key) { + //wait for 1 sec if the send process is too fast + //this mechanism is not very good,need some improvement + if (pre_frame_time > lasttime) { + RTMP_LogPrintf("TimeStamp:%8u ms\n", pre_frame_time); + lasttime = pre_frame_time; + } + sleep(1); + continue; + } + + //jump over type + fseek(fp, 1, SEEK_CUR); + if (!ReadU24(&datalength, fp)) { + break; + } + if (!ReadTime(×tamp, fp)) { + break; + } + //jump back + fseek(fp, -8, SEEK_CUR); + + p_file_buf = (char *) malloc(11 + datalength + 4); + memset(p_file_buf, 0, 11 + datalength + 4); + if (fread(p_file_buf, 1, 11 + datalength + 4, fp) != (11 + datalength + 4)) { + break; + } + + pre_frame_time = timestamp; + + if (!RTMP_IsConnected(rtmp)) { + RTMP_Log(RTMP_LOGERROR, "rtmp is not connect\n"); + break; + } + if (!RTMP_Write(rtmp, p_file_buf, 11 + datalength + 4)) { + RTMP_Log(RTMP_LOGERROR, "Rtmp Write Error\n"); + break; + } + + free(p_file_buf); + p_file_buf = NULL; + + if (!PeekU8(&type, fp)) { + break; + } + if (0x09 == type) { + if (fseek(fp, 11, SEEK_CUR) != 0) { + break; + } + if (!PeekU8(&type, fp)) { + break; + } + if (type == 0x17) { + b_next_is_key = 1; + } else { + b_next_is_key = 0; + } + fseek(fp, -11, SEEK_CUR); + } + } + + RTMP_LogPrintf("\nSend Data Over\n"); + + if (fp != NULL) { + fclose(fp); + fp = NULL; + } + + if (rtmp != NULL) { + RTMP_Close(rtmp); + RTMP_Free(rtmp); + rtmp = NULL; + } + + if (p_file_buf != NULL) { + free(p_file_buf); + p_file_buf = NULL; + } + + return 0; +}