Auto stash before merge of "rtmpOutputFunctionality" and "origin/rtmpOutputFunctionality"

This commit is contained in:
Unknown 2018-02-19 15:36:13 +10:30
parent 182cfeef88
commit ed88df0110
13 changed files with 133 additions and 63 deletions

View File

@ -23,7 +23,7 @@ const (
DataHeaderLength = 5 DataHeaderLength = 5
NoTimestampExtension = 0 NoTimestampExtension = 0
AACAudioFormat = 10 AACAudioFormat = 10
PCMAudioFormat = 0
) )
type Header struct { type Header struct {
@ -43,7 +43,6 @@ func (h *Header) ToByteSlice() (output []byte) {
} }
type VideoTag struct { type VideoTag struct {
PrevTagSize uint32
TagType uint8 TagType uint8
DataSize uint32 DataSize uint32
Timestamp uint32 Timestamp uint32
@ -53,15 +52,12 @@ type VideoTag struct {
PacketType byte PacketType byte
CompositionTime uint32 CompositionTime uint32
Data []byte Data []byte
PrevTagSize uint32
} }
func (t *VideoTag) ToByteSlice() (output []byte) { func (t *VideoTag) ToByteSlice() (output []byte) {
output = make([]byte, 0, maxVideoTagSize) output = make([]byte, 0, maxVideoTagSize)
output = append(output, []byte{ output = append(output, []byte{
byte(t.PrevTagSize >> 24),
byte(t.PrevTagSize >> 16),
byte(t.PrevTagSize >> 8),
byte(t.PrevTagSize),
byte(t.TagType), byte(t.TagType),
byte(t.DataSize >> 16), byte(t.DataSize >> 16),
byte(t.DataSize >> 8), byte(t.DataSize >> 8),
@ -76,11 +72,16 @@ func (t *VideoTag) ToByteSlice() (output []byte) {
byte(t.CompositionTime >> 16),byte(t.CompositionTime >> 8),byte(t.CompositionTime), byte(t.CompositionTime >> 16),byte(t.CompositionTime >> 8),byte(t.CompositionTime),
}...) }...)
output = append(output, t.Data...) output = append(output, t.Data...)
output = append(output, []byte{
byte(t.PrevTagSize >> 24),
byte(t.PrevTagSize >> 16),
byte(t.PrevTagSize >> 8),
byte(t.PrevTagSize),
}...)
return return
} }
type AudioTag struct { type AudioTag struct {
PrevTagSize uint32
TagType uint8 TagType uint8
DataSize uint32 DataSize uint32
Timestamp uint32 Timestamp uint32
@ -90,15 +91,12 @@ type AudioTag struct {
SoundSize bool SoundSize bool
SoundType bool SoundType bool
Data []byte Data []byte
PrevTagSize uint32
} }
func (t *AudioTag) ToByteSlice() (output []byte) { func (t *AudioTag) ToByteSlice() (output []byte) {
output = make([]byte, 0, maxVideoTagSize) output = make([]byte, 0, maxVideoTagSize)
output = append(output, []byte{ output = append(output, []byte{
byte(t.PrevTagSize >> 24),
byte(t.PrevTagSize >> 16),
byte(t.PrevTagSize >> 8),
byte(t.PrevTagSize),
byte(t.TagType), byte(t.TagType),
byte(t.DataSize >> 16), byte(t.DataSize >> 16),
byte(t.DataSize >> 8), byte(t.DataSize >> 8),
@ -111,5 +109,11 @@ func (t *AudioTag) ToByteSlice() (output []byte) {
byte(t.SoundFormat << 4) | byte(t.SoundRate<<2) | byte(tools.BoolToByte(t.SoundSize)<<1) | byte(tools.BoolToByte(t.SoundType)), byte(t.SoundFormat << 4) | byte(t.SoundRate<<2) | byte(tools.BoolToByte(t.SoundSize)<<1) | byte(tools.BoolToByte(t.SoundType)),
}...) }...)
output = append(output, t.Data...) output = append(output, t.Data...)
output = append(output, []byte{
byte(t.PrevTagSize >> 24),
byte(t.PrevTagSize >> 16),
byte(t.PrevTagSize >> 8),
byte(t.PrevTagSize),
}...)
return return
} }

View File

@ -30,12 +30,17 @@ package generator
import ( import (
"../flv" "../flv"
_"fmt" _"fmt"
"time" _"time"
) )
const ( const (
inputChanLength = 1000 inputChanLength = 1000
outputChanLength = 1000 outputChanLength = 1000
audioSize = 18
videoHeaderSize = 16
interFrameCode = 1
keyFrameCode = 5
sequenceCode = 6
) )
type flvGenerator struct { type flvGenerator struct {
@ -104,9 +109,9 @@ func isKeyFrame(frame []byte) bool {
aByte = <-byteChannel aByte = <-byteChannel
nalType := aByte & 0x1F nalType := aByte & 0x1F
switch nalType { switch nalType {
case 1: case interFrameCode:
return false return false
case 5: case keyFrameCode:
return true return true
case 6: case 6:
return true return true
@ -152,12 +157,14 @@ func (g *flvGenerator) generate() {
for { for {
select { select {
case videoFrame := <-g.inputChan: case videoFrame := <-g.inputChan:
var frameType byte var frameType byte
if isKeyFrame(videoFrame) { if isKeyFrame(videoFrame) {
frameType = flv.KeyFrameType frameType = flv.KeyFrameType
} else { } else {
frameType = flv.InterFrameType frameType = flv.InterFrameType
} }
var packetType byte var packetType byte
if isSequenceHeader(videoFrame){ if isSequenceHeader(videoFrame){
packetType = flv.SequenceHeader packetType = flv.SequenceHeader
@ -166,47 +173,58 @@ func (g *flvGenerator) generate() {
} }
timeStamp := g.getNextTimestamp() timeStamp := g.getNextTimestamp()
videoTag := flv.VideoTag{
PrevTagSize: uint32(g.lastTagSize),
TagType: uint8(flv.VideoTagType),
DataSize: uint32(len(videoFrame)) + flv.DataHeaderLength,
Timestamp: timeStamp,
TimestampExtended: flv.NoTimestampExtension,
FrameType: frameType,
Codec: flv.H264,
PacketType: packetType,
CompositionTime: 0,
Data: videoFrame,
}
videoTagAsByteSlice := videoTag.ToByteSlice()
g.lastTagSize = len(videoTagAsByteSlice)
g.outputChan<-videoTagAsByteSlice
soundData := make([]byte, 10) if g.videoFlag {
for i := range soundData { videoTag := flv.VideoTag{
if i == 0 { TagType: uint8(flv.VideoTagType),
soundData[i] = 1 DataSize: uint32(len(videoFrame)) + flv.DataHeaderLength,
} else { Timestamp: timeStamp,
soundData[i] = 0 TimestampExtended: flv.NoTimestampExtension,
FrameType: frameType,
Codec: flv.H264,
PacketType: packetType,
CompositionTime: 0,
Data: videoFrame,
PrevTagSize: uint32(videoHeaderSize+len(videoFrame)),
} }
videoTagAsByteSlice := videoTag.ToByteSlice()
g.outputChan<-videoTagAsByteSlice
} }
audioTag := flv.AudioTag{
PrevTagSize: uint32(g.lastTagSize),
TagType: uint8(flv.AudioTagType),
DataSize: 7,
Timestamp: timeStamp,
TimestampExtended: flv.NoTimestampExtension,
SoundFormat: flv.AACAudioFormat,
SoundRate: 3,
SoundSize: true,
SoundType: true,
Data: []byte{0x00,0x12,0x08,0x56,0xe5,0x00},
}
audioTagAsByteSlice := audioTag.ToByteSlice()
g.lastTagSize = len(audioTagAsByteSlice)
g.outputChan<-audioTagAsByteSlice
time.Sleep(60*time.Millisecond) // TODO: Create some more constants
if g.audioFlag {
audioTag := flv.AudioTag{
TagType: uint8(flv.AudioTagType),
DataSize: 7,
Timestamp: timeStamp,
TimestampExtended: flv.NoTimestampExtension,
SoundFormat: flv.AACAudioFormat,
SoundRate: 3,
SoundSize: true,
SoundType: true,
Data: []byte{0x00,0x12,0x08,0x56,0xe5,0x00},
PrevTagSize: uint32(audioSize),
}
audioTagAsByteSlice := audioTag.ToByteSlice()
g.outputChan<-audioTagAsByteSlice
audioTag = flv.AudioTag{
TagType: uint8(flv.AudioTagType),
DataSize: 21,
Timestamp: timeStamp,
TimestampExtended: flv.NoTimestampExtension,
SoundFormat: flv.AACAudioFormat,
SoundRate: 3,
SoundSize: true,
SoundType: true,
Data: []byte{0x01,0xdc,0x00,0x4c,0x61,0x76,0x63,0x35,0x38,0x2e,0x36,
0x2e,0x31,0x30,0x32,0x00,0x02,0x30,0x40,0x0e,},
PrevTagSize: uint32(22),
}
audioTagAsByteSlice = audioTag.ToByteSlice()
g.outputChan<-audioTagAsByteSlice
}
} }
} }
} }

View File

@ -31,7 +31,7 @@ import (
//"bitbucket.org/ausocean/av/itut" //"bitbucket.org/ausocean/av/itut"
"../itut" "../itut"
_"fmt" _"fmt"
_"time" "time"
) )
type h264Parser struct { type h264Parser struct {
@ -40,12 +40,14 @@ type h264Parser struct {
parserOutputChanRef chan []byte parserOutputChanRef chan []byte
userOutputChanRef chan []byte userOutputChanRef chan []byte
inputChan chan byte inputChan chan byte
delay uint
} }
func NewH264Parser() (p *h264Parser) { func NewH264Parser() (p *h264Parser) {
p = new(h264Parser) p = new(h264Parser)
p.isParsing = true p.isParsing = true
p.inputChan = make(chan byte, 100000) p.inputChan = make(chan byte, 100000)
p.delay = 0
return return
} }
@ -57,6 +59,10 @@ func (p *h264Parser)Start(){
go p.parse() go p.parse()
} }
func (p *h264Parser)SetDelay(delay uint){
p.delay = delay
}
func (p *h264Parser)GetInputChan() chan byte { func (p *h264Parser)GetInputChan() chan byte {
return p.inputChan return p.inputChan
} }
@ -82,7 +88,7 @@ func (p *h264Parser)parse() {
if ( aByte == 0x01 && i == 2 ) || ( aByte == 0x01 && i == 3 ) { if ( aByte == 0x01 && i == 2 ) || ( aByte == 0x01 && i == 3 ) {
if searchingForEnd { if searchingForEnd {
output := append(append(itut.StartCode1(),itut.AUD()...),outputBuffer[:len(outputBuffer)-(i+1)]...) output := append(append(itut.StartCode1(),itut.AUD()...),outputBuffer[:len(outputBuffer)-(i+1)]...)
//time.Sleep(40*time.Millisecond) time.Sleep(time.Duration(p.delay)*time.Millisecond)
p.parserOutputChanRef<-output p.parserOutputChanRef<-output
outputBuffer = outputBuffer[len(outputBuffer)-1-i:] outputBuffer = outputBuffer[len(outputBuffer)-1-i:]
searchingForEnd = false searchingForEnd = false

View File

@ -11,6 +11,7 @@ type mjpegParser struct {
parserOutputChanRef chan []byte parserOutputChanRef chan []byte
userOutputChanRef chan []byte userOutputChanRef chan []byte
inputChan chan byte inputChan chan byte
delay uint
} }
func NewMJPEGParser(inputChanLen int) (p *mjpegParser){ func NewMJPEGParser(inputChanLen int) (p *mjpegParser){
@ -28,6 +29,11 @@ func (p *mjpegParser)Start(){
go p.parse() go p.parse()
} }
func (p *mjpegParser)SetDelay(delay uint){
p.delay = delay
}
func (p *mjpegParser)GetInputChan() chan byte { func (p *mjpegParser)GetInputChan() chan byte {
return p.inputChan return p.inputChan
} }

View File

@ -50,4 +50,5 @@ type Parser interface {
GetInputChan() chan byte GetInputChan() chan byte
GetOutputChan() chan []byte GetOutputChan() chan []byte
SetOutputChan(achan chan []byte) SetOutputChan(achan chan []byte)
SetDelay(delay uint)
} }

View File

@ -58,7 +58,7 @@ const (
clipDuration = 1 // s clipDuration = 1 // s
mp2tPacketSize = 188 // MPEG-TS packet size mp2tPacketSize = 188 // MPEG-TS packet size
mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000 mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000
ringBufferSize = 1000 / clipDuration ringBufferSize = 10000 / clipDuration
ringBufferElementSize = 10000000 ringBufferElementSize = 10000000
maxClipSize = 100000 maxClipSize = 100000
httpTimeOut = 5 // s httpTimeOut = 5 // s
@ -437,6 +437,7 @@ func (r *revidInst) setupOutputForFfmpegRtmp() error {
func (r *revidInst) setupOutputForLibRtmp() (err error) { func (r *revidInst) setupOutputForLibRtmp() (err error) {
r.rtmpInst = rtmp.NewRTMPSession(r.config.RtmpUrl, rtmpConnectionTimout) r.rtmpInst = rtmp.NewRTMPSession(r.config.RtmpUrl, rtmpConnectionTimout)
err = r.rtmpInst.StartSession() err = r.rtmpInst.StartSession()
//go r.testRtmp(5000)
return return
} }
@ -486,10 +487,20 @@ func (r *revidInst) setupInputForRaspivid() error {
// Start invokes a revidInst to start processing video from a defined input // Start invokes a revidInst to start processing video from a defined input
func (r *revidInst) setupInputForFile() error { func (r *revidInst) setupInputForFile() error {
fps,_ := strconv.Atoi(r.config.FrameRate)
r.parser.SetDelay( uint( float64(1000) / float64(fps) ) )
r.readFile() r.readFile()
return nil return nil
} }
func (r *revidInst)testRtmp(delayTime uint){
for {
time.Sleep(time.Duration(delayTime)*time.Millisecond)
r.rtmpInst.EndSession()
r.rtmpInst.StartSession()
}
}
// readCamera reads data from the defined camera while the revidInst is running. // readCamera reads data from the defined camera while the revidInst is running.
func (r *revidInst) readCamera() { func (r *revidInst) readCamera() {
r.Log(Info, "Reading camera data!") r.Log(Info, "Reading camera data!")

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -145,9 +145,12 @@ func TestFlvOutputFile(t *testing.T) {
} }
*/ */
<<<<<<< Updated upstream
=======
>>>>>>> Stashed changes
// Test h264 inputfile to flv format into rtmp using librtmp c wrapper // Test h264 inputfile to flv format into rtmp using librtmp c wrapper
func TestRtmpOutputUsingLibRtmp(t *testing.T){ func TestRtmpOutputUsingLibRtmp(t *testing.T){
config := Config{ config := Config{
@ -167,6 +170,6 @@ func TestRtmpOutputUsingLibRtmp(t *testing.T){
return return
} }
revidInst.Start() revidInst.Start()
time.Sleep(30*time.Second) time.Sleep(120*time.Second)
revidInst.Stop() revidInst.Stop()
} }

Binary file not shown.

View File

@ -38,6 +38,7 @@ import (
"errors" "errors"
"unsafe" "unsafe"
_"fmt" _"fmt"
"sync"
) )
type RTMPSession interface { type RTMPSession interface {
@ -49,35 +50,55 @@ type RTMPSession interface {
type rtmpSession struct { type rtmpSession struct {
url string url string
timeout uint timeout uint
running bool
mutex *sync.Mutex
} }
func NewRTMPSession(url string, connectTimeout uint) (session *rtmpSession){ func NewRTMPSession(url string, connectTimeout uint) (session *rtmpSession){
session = new(rtmpSession) session = new(rtmpSession)
session.url = url session.url = url
session.timeout = connectTimeout session.timeout = connectTimeout
session.mutex = &sync.Mutex{}
return return
} }
func (s *rtmpSession) StartSession() error { func (s *rtmpSession) StartSession() error {
if !uintToBool(uint(C.RTMP_start_session(C.CString(s.url), C.uint(s.timeout)))) { if !s.running {
return errors.New("RTMP start error! Check rtmp log for details!") if !uintToBool(uint(C.RTMP_start_session(C.CString(s.url), C.uint(s.timeout)))) {
return errors.New("RTMP start error! Check rtmp log for details!")
}
s.running = true
} else {
return errors.New("Tried to start rtmp session, but already started!")
} }
return nil return nil
} }
func (s *rtmpSession) WriteFrame(data []byte, dataLength uint) error { func (s *rtmpSession) WriteFrame(data []byte, dataLength uint) error {
dataCopy := make([]byte, len(data)) s.mutex.Lock()
copy(dataCopy, data) defer s.mutex.Unlock()
if !uintToBool(uint(C.RTMP_write_frame((*C.char)(unsafe.Pointer(&dataCopy[0])), C.uint(dataLength)))) { if s.running {
return errors.New("RTMP write error! Check rtmp log for details!") dataCopy := make([]byte, len(data))
copy(dataCopy, data)
if !uintToBool(uint(C.RTMP_write_frame((*C.char)(unsafe.Pointer(&dataCopy[0])), C.uint(dataLength)))) {
return errors.New("RTMP write error! Check rtmp log for details!")
}
} else {
return errors.New("RTMP session not running, can't write!")
} }
return nil return nil
} }
func (s *rtmpSession) EndSession() error { func (s *rtmpSession) EndSession() error {
if !uintToBool(uint(C.RTMP_end_session())) { if s.running {
return errors.New("RTMP end session error! Check rtmp log for details!") if !uintToBool(uint(C.RTMP_end_session())) {
return errors.New("RTMP end session error! Check rtmp log for details!")
}
s.running = false
} else {
return errors.New("Tried to stop rtmp session, but not running!")
} }
return nil return nil
} }