diff --git a/flv/FLV.go b/flv/FLV.go index 446b39c4..4856b0ea 100644 --- a/flv/FLV.go +++ b/flv/FLV.go @@ -23,7 +23,7 @@ const ( DataHeaderLength = 5 NoTimestampExtension = 0 AACAudioFormat = 10 - + PCMAudioFormat = 0 ) type Header struct { @@ -43,7 +43,6 @@ func (h *Header) ToByteSlice() (output []byte) { } type VideoTag struct { - PrevTagSize uint32 TagType uint8 DataSize uint32 Timestamp uint32 @@ -53,15 +52,12 @@ type VideoTag struct { PacketType byte CompositionTime uint32 Data []byte + PrevTagSize uint32 } func (t *VideoTag) ToByteSlice() (output []byte) { output = make([]byte, 0, maxVideoTagSize) output = append(output, []byte{ - byte(t.PrevTagSize >> 24), - byte(t.PrevTagSize >> 16), - byte(t.PrevTagSize >> 8), - byte(t.PrevTagSize), byte(t.TagType), byte(t.DataSize >> 16), byte(t.DataSize >> 8), @@ -76,11 +72,16 @@ func (t *VideoTag) ToByteSlice() (output []byte) { byte(t.CompositionTime >> 16),byte(t.CompositionTime >> 8),byte(t.CompositionTime), }...) output = append(output, t.Data...) + output = append(output, []byte{ + byte(t.PrevTagSize >> 24), + byte(t.PrevTagSize >> 16), + byte(t.PrevTagSize >> 8), + byte(t.PrevTagSize), + }...) return } type AudioTag struct { - PrevTagSize uint32 TagType uint8 DataSize uint32 Timestamp uint32 @@ -90,15 +91,12 @@ type AudioTag struct { SoundSize bool SoundType bool Data []byte + PrevTagSize uint32 } func (t *AudioTag) ToByteSlice() (output []byte) { output = make([]byte, 0, maxVideoTagSize) output = append(output, []byte{ - byte(t.PrevTagSize >> 24), - byte(t.PrevTagSize >> 16), - byte(t.PrevTagSize >> 8), - byte(t.PrevTagSize), byte(t.TagType), byte(t.DataSize >> 16), byte(t.DataSize >> 8), @@ -111,5 +109,11 @@ func (t *AudioTag) ToByteSlice() (output []byte) { byte(t.SoundFormat << 4) | byte(t.SoundRate<<2) | byte(tools.BoolToByte(t.SoundSize)<<1) | byte(tools.BoolToByte(t.SoundType)), }...) output = append(output, t.Data...) + output = append(output, []byte{ + byte(t.PrevTagSize >> 24), + byte(t.PrevTagSize >> 16), + byte(t.PrevTagSize >> 8), + byte(t.PrevTagSize), + }...) return } diff --git a/generator/FLVGenerator.go b/generator/FLVGenerator.go index 323f7a86..9127d57c 100644 --- a/generator/FLVGenerator.go +++ b/generator/FLVGenerator.go @@ -30,12 +30,17 @@ package generator import ( "../flv" _"fmt" - "time" + _"time" ) const ( inputChanLength = 1000 outputChanLength = 1000 + audioSize = 18 + videoHeaderSize = 16 + interFrameCode = 1 + keyFrameCode = 5 + sequenceCode = 6 ) type flvGenerator struct { @@ -104,9 +109,9 @@ func isKeyFrame(frame []byte) bool { aByte = <-byteChannel nalType := aByte & 0x1F switch nalType { - case 1: + case interFrameCode: return false - case 5: + case keyFrameCode: return true case 6: return true @@ -152,12 +157,14 @@ func (g *flvGenerator) generate() { for { select { case videoFrame := <-g.inputChan: + var frameType byte if isKeyFrame(videoFrame) { frameType = flv.KeyFrameType } else { frameType = flv.InterFrameType } + var packetType byte if isSequenceHeader(videoFrame){ packetType = flv.SequenceHeader @@ -166,47 +173,58 @@ func (g *flvGenerator) generate() { } timeStamp := g.getNextTimestamp() - videoTag := flv.VideoTag{ - PrevTagSize: uint32(g.lastTagSize), - TagType: uint8(flv.VideoTagType), - DataSize: uint32(len(videoFrame)) + flv.DataHeaderLength, - Timestamp: timeStamp, - TimestampExtended: flv.NoTimestampExtension, - FrameType: frameType, - Codec: flv.H264, - PacketType: packetType, - CompositionTime: 0, - Data: videoFrame, - } - videoTagAsByteSlice := videoTag.ToByteSlice() - g.lastTagSize = len(videoTagAsByteSlice) - g.outputChan<-videoTagAsByteSlice - soundData := make([]byte, 10) - for i := range soundData { - if i == 0 { - soundData[i] = 1 - } else { - soundData[i] = 0 + if g.videoFlag { + videoTag := flv.VideoTag{ + TagType: uint8(flv.VideoTagType), + DataSize: uint32(len(videoFrame)) + flv.DataHeaderLength, + Timestamp: timeStamp, + TimestampExtended: flv.NoTimestampExtension, + FrameType: frameType, + Codec: flv.H264, + PacketType: packetType, + CompositionTime: 0, + Data: videoFrame, + PrevTagSize: uint32(videoHeaderSize+len(videoFrame)), } + videoTagAsByteSlice := videoTag.ToByteSlice() + g.outputChan<-videoTagAsByteSlice } - audioTag := flv.AudioTag{ - PrevTagSize: uint32(g.lastTagSize), - TagType: uint8(flv.AudioTagType), - DataSize: 7, - Timestamp: timeStamp, - TimestampExtended: flv.NoTimestampExtension, - SoundFormat: flv.AACAudioFormat, - SoundRate: 3, - SoundSize: true, - SoundType: true, - Data: []byte{0x00,0x12,0x08,0x56,0xe5,0x00}, - } - audioTagAsByteSlice := audioTag.ToByteSlice() - g.lastTagSize = len(audioTagAsByteSlice) - g.outputChan<-audioTagAsByteSlice - time.Sleep(60*time.Millisecond) + // TODO: Create some more constants + + if g.audioFlag { + audioTag := flv.AudioTag{ + TagType: uint8(flv.AudioTagType), + DataSize: 7, + Timestamp: timeStamp, + TimestampExtended: flv.NoTimestampExtension, + SoundFormat: flv.AACAudioFormat, + SoundRate: 3, + SoundSize: true, + SoundType: true, + Data: []byte{0x00,0x12,0x08,0x56,0xe5,0x00}, + PrevTagSize: uint32(audioSize), + } + audioTagAsByteSlice := audioTag.ToByteSlice() + g.outputChan<-audioTagAsByteSlice + + audioTag = flv.AudioTag{ + TagType: uint8(flv.AudioTagType), + DataSize: 21, + Timestamp: timeStamp, + TimestampExtended: flv.NoTimestampExtension, + SoundFormat: flv.AACAudioFormat, + SoundRate: 3, + SoundSize: true, + SoundType: true, + Data: []byte{0x01,0xdc,0x00,0x4c,0x61,0x76,0x63,0x35,0x38,0x2e,0x36, + 0x2e,0x31,0x30,0x32,0x00,0x02,0x30,0x40,0x0e,}, + PrevTagSize: uint32(22), + } + audioTagAsByteSlice = audioTag.ToByteSlice() + g.outputChan<-audioTagAsByteSlice + } } } } diff --git a/parser/H264Parser.go b/parser/H264Parser.go index 21d83b5a..4d341213 100644 --- a/parser/H264Parser.go +++ b/parser/H264Parser.go @@ -31,7 +31,7 @@ import ( //"bitbucket.org/ausocean/av/itut" "../itut" _"fmt" - _"time" + "time" ) type h264Parser struct { @@ -40,12 +40,14 @@ type h264Parser struct { parserOutputChanRef chan []byte userOutputChanRef chan []byte inputChan chan byte + delay uint } func NewH264Parser() (p *h264Parser) { p = new(h264Parser) p.isParsing = true p.inputChan = make(chan byte, 100000) + p.delay = 0 return } @@ -57,6 +59,10 @@ func (p *h264Parser)Start(){ go p.parse() } +func (p *h264Parser)SetDelay(delay uint){ + p.delay = delay +} + func (p *h264Parser)GetInputChan() chan byte { return p.inputChan } @@ -82,7 +88,7 @@ func (p *h264Parser)parse() { if ( aByte == 0x01 && i == 2 ) || ( aByte == 0x01 && i == 3 ) { if searchingForEnd { output := append(append(itut.StartCode1(),itut.AUD()...),outputBuffer[:len(outputBuffer)-(i+1)]...) - //time.Sleep(40*time.Millisecond) + time.Sleep(time.Duration(p.delay)*time.Millisecond) p.parserOutputChanRef<-output outputBuffer = outputBuffer[len(outputBuffer)-1-i:] searchingForEnd = false diff --git a/parser/MJPEGParser.go b/parser/MJPEGParser.go index 715c20f3..a65074fd 100644 --- a/parser/MJPEGParser.go +++ b/parser/MJPEGParser.go @@ -11,6 +11,7 @@ type mjpegParser struct { parserOutputChanRef chan []byte userOutputChanRef chan []byte inputChan chan byte + delay uint } func NewMJPEGParser(inputChanLen int) (p *mjpegParser){ @@ -28,6 +29,11 @@ func (p *mjpegParser)Start(){ go p.parse() } +func (p *mjpegParser)SetDelay(delay uint){ + p.delay = delay +} + + func (p *mjpegParser)GetInputChan() chan byte { return p.inputChan } diff --git a/parser/Parser.go b/parser/Parser.go index 8495b60c..89e27602 100644 --- a/parser/Parser.go +++ b/parser/Parser.go @@ -50,4 +50,5 @@ type Parser interface { GetInputChan() chan byte GetOutputChan() chan []byte SetOutputChan(achan chan []byte) + SetDelay(delay uint) } diff --git a/revid/RevidInstance.go b/revid/RevidInstance.go index 1a662689..f7749b7d 100644 --- a/revid/RevidInstance.go +++ b/revid/RevidInstance.go @@ -58,7 +58,7 @@ const ( clipDuration = 1 // s mp2tPacketSize = 188 // MPEG-TS packet size mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000 - ringBufferSize = 1000 / clipDuration + ringBufferSize = 10000 / clipDuration ringBufferElementSize = 10000000 maxClipSize = 100000 httpTimeOut = 5 // s @@ -437,6 +437,7 @@ func (r *revidInst) setupOutputForFfmpegRtmp() error { func (r *revidInst) setupOutputForLibRtmp() (err error) { r.rtmpInst = rtmp.NewRTMPSession(r.config.RtmpUrl, rtmpConnectionTimout) err = r.rtmpInst.StartSession() + //go r.testRtmp(5000) return } @@ -486,10 +487,20 @@ func (r *revidInst) setupInputForRaspivid() error { // Start invokes a revidInst to start processing video from a defined input func (r *revidInst) setupInputForFile() error { + fps,_ := strconv.Atoi(r.config.FrameRate) + r.parser.SetDelay( uint( float64(1000) / float64(fps) ) ) r.readFile() return nil } +func (r *revidInst)testRtmp(delayTime uint){ + for { + time.Sleep(time.Duration(delayTime)*time.Millisecond) + r.rtmpInst.EndSession() + r.rtmpInst.StartSession() + } +} + // readCamera reads data from the defined camera while the revidInst is running. func (r *revidInst) readCamera() { r.Log(Info, "Reading camera data!") diff --git a/revid/newOutput.flv b/revid/newOutput.flv deleted file mode 100644 index afce4beb..00000000 Binary files a/revid/newOutput.flv and /dev/null differ diff --git a/revid/out.mp4 b/revid/out.mp4 deleted file mode 100644 index 3c774417..00000000 Binary files a/revid/out.mp4 and /dev/null differ diff --git a/revid/out.ts b/revid/out.ts deleted file mode 100644 index f05b9430..00000000 Binary files a/revid/out.ts and /dev/null differ diff --git a/revid/pls.flv b/revid/pls.flv deleted file mode 100644 index 5f2333d5..00000000 Binary files a/revid/pls.flv and /dev/null differ diff --git a/revid/revid_test.go b/revid/revid_test.go index 078cc59c..9ed7235a 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -145,9 +145,12 @@ func TestFlvOutputFile(t *testing.T) { } */ +<<<<<<< Updated upstream +======= +>>>>>>> Stashed changes // Test h264 inputfile to flv format into rtmp using librtmp c wrapper func TestRtmpOutputUsingLibRtmp(t *testing.T){ config := Config{ @@ -167,6 +170,6 @@ func TestRtmpOutputUsingLibRtmp(t *testing.T){ return } revidInst.Start() - time.Sleep(30*time.Second) + time.Sleep(120*time.Second) revidInst.Stop() } diff --git a/revid/saxonOut.flv b/revid/saxonOut.flv deleted file mode 100644 index e96a1270..00000000 Binary files a/revid/saxonOut.flv and /dev/null differ diff --git a/rtmp/RTMP.go b/rtmp/RTMP.go index 5faec442..14108bd7 100644 --- a/rtmp/RTMP.go +++ b/rtmp/RTMP.go @@ -38,6 +38,7 @@ import ( "errors" "unsafe" _"fmt" + "sync" ) type RTMPSession interface { @@ -49,35 +50,55 @@ type RTMPSession interface { type rtmpSession struct { url string timeout uint + running bool + mutex *sync.Mutex } func NewRTMPSession(url string, connectTimeout uint) (session *rtmpSession){ session = new(rtmpSession) session.url = url session.timeout = connectTimeout + session.mutex = &sync.Mutex{} return } func (s *rtmpSession) StartSession() error { - if !uintToBool(uint(C.RTMP_start_session(C.CString(s.url), C.uint(s.timeout)))) { - return errors.New("RTMP start error! Check rtmp log for details!") + if !s.running { + if !uintToBool(uint(C.RTMP_start_session(C.CString(s.url), C.uint(s.timeout)))) { + return errors.New("RTMP start error! Check rtmp log for details!") + } + s.running = true + } else { + return errors.New("Tried to start rtmp session, but already started!") } return nil } func (s *rtmpSession) WriteFrame(data []byte, dataLength uint) error { - dataCopy := make([]byte, len(data)) - copy(dataCopy, data) - if !uintToBool(uint(C.RTMP_write_frame((*C.char)(unsafe.Pointer(&dataCopy[0])), C.uint(dataLength)))) { - return errors.New("RTMP write error! Check rtmp log for details!") + s.mutex.Lock() + defer s.mutex.Unlock() + if s.running { + dataCopy := make([]byte, len(data)) + copy(dataCopy, data) + if !uintToBool(uint(C.RTMP_write_frame((*C.char)(unsafe.Pointer(&dataCopy[0])), C.uint(dataLength)))) { + return errors.New("RTMP write error! Check rtmp log for details!") + } + } else { + return errors.New("RTMP session not running, can't write!") } return nil } func (s *rtmpSession) EndSession() error { - if !uintToBool(uint(C.RTMP_end_session())) { - return errors.New("RTMP end session error! Check rtmp log for details!") + if s.running { + if !uintToBool(uint(C.RTMP_end_session())) { + return errors.New("RTMP end session error! Check rtmp log for details!") + } + s.running = false + } else { + return errors.New("Tried to stop rtmp session, but not running!") } + return nil }