Got youtube streaming working to an extent

This commit is contained in:
Unknown 2018-02-09 16:53:06 +10:30
parent ed14ebb9b9
commit 871fb45b5b
3 changed files with 84 additions and 70 deletions

View File

@ -32,7 +32,7 @@ import (
"../itut"
"log"
"sync"
_"time"
_"fmt"
)
// h264 consts

View File

@ -59,13 +59,14 @@ const (
mp2tPacketSize = 188 // MPEG-TS packet size
mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000
ringBufferSize = 100 / clipDuration
ringBufferElementSize = 1000000
ringBufferElementSize = 10000000
maxClipSize = 100000
httpTimeOut = 5 // s
packetsPerFrame = 7
h264BufferSize = 1000000
bitrateTime = 60
mjpegParserInChanLen = 100000
ffmpegPath = "/home/$USER/bin/ffmpeg"
ffmpegPath = "/home/saxon/bin/ffmpeg"
)
// Log Types
@ -152,7 +153,7 @@ type revidInst struct {
ffmpegCmd *exec.Cmd
inputReader *bufio.Reader
ffmpegStdin io.WriteCloser
mjpegOutputChan chan []byte
outputChan chan []byte
}
// NewRevidInstance returns a pointer to a new revidInst with the desired
@ -172,13 +173,6 @@ func NewRevidInstance(config Config) (r *revidInst, err error) {
return nil, err
}
}
switch r.config.Input {
case File:
r.inputFile, err = os.Open(r.config.InputFileName)
if err != nil {
return nil, err
}
}
switch r.config.InputCodec {
case H264:
r.Log(Info, "Using H264 parser!")
@ -186,10 +180,10 @@ func NewRevidInstance(config Config) (r *revidInst, err error) {
case Mjpeg:
r.parser = parser.NewMJPEGParser(mjpegParserInChanLen)
}
r.mjpegOutputChan = make(chan []byte, 10000)
r.outputChan = make(chan []byte, 10000)
switch r.config.Packetization {
case None:
r.parser.SetOutputChan(r.mjpegOutputChan)
r.parser.SetOutputChan(r.outputChan)
case Mpegts:
frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate)
r.generator = tsgenerator.NewTsGenerator(uint(frameRateAsInt))
@ -361,6 +355,39 @@ func (r *revidInst) Start() {
return
}
r.Log(Info, "Starting Revid!")
// Configure output
switch r.config.Output {
case Rtmp:
r.ffmpegCmd = exec.Command(ffmpegPath,
"-f", "h264",
"-r", r.config.FrameRate,
"-i", "-",
"-f", "lavfi",
"-i", "aevalsrc=0",
"-fflags", "nobuffer",
"-vcodec", "copy",
"-acodec", "aac",
"-map", "0:0",
"-map", "1:0",
"-strict", "experimental",
"-f", "flv",
r.config.RtmpUrl,
)
var err error
r.ffmpegStdin, err = r.ffmpegCmd.StdinPipe()
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return
}
err = r.ffmpegCmd.Start()
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return
}
}
// Configure input
switch r.config.Input {
case Raspivid:
@ -396,58 +423,12 @@ func (r *revidInst) Start() {
r.Log(Error, err.Error())
return
}
r.isRunning = true
go r.readCamera()
case File:
stats, err := r.inputFile.Stat()
if err != nil {
r.Log(Error, "Could not get input file stats!")
r.Stop()
return
}
data := make([]byte, stats.Size())
_, err = r.inputFile.Read(data)
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return
}
go func() = {
for i := range data {
r.parser.GetInputChan() <- data[i]
}
}{}
go r.readFile()
}
// Configure output
switch r.config.Output {
case Rtmp:
r.ffmpegCmd = exec.Command(ffmpegPath,
"-f", "h264",
"-r", r.config.FrameRate,
"-i", "-",
"-itsoffset", "5.5",
"-fflags", "nobuffer",
"-vcodec", "copy",
"-strict", "experimental",
"-f", "flv",
r.config.RtmpUrl,
)
err := r.ffmpegCmd.Start()
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return
}
r.ffmpegStdin, err = r.ffmpegCmd.StdinPipe()
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return
}
}
go r.readCamera()
go r.outputClips()
r.isRunning = true
}
// readCamera reads data from the defined camera while the revidInst is running.
@ -469,6 +450,38 @@ func (r *revidInst) readCamera() {
r.Log(Info, "Out of reading routine!")
}
// readFile reads data from the defined file while the revidInst is running.
func (r *revidInst) readFile() {
for {
if len(r.parser.GetInputChan()) == 0 {
var err error
r.inputFile, err = os.Open(r.config.InputFileName)
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return
}
stats, err := r.inputFile.Stat()
if err != nil {
r.Log(Error, "Could not get input file stats!")
r.Stop()
return
}
data := make([]byte, stats.Size())
_, err = r.inputFile.Read(data)
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return
}
for i := range data {
r.parser.GetInputChan() <- data[i]
}
r.inputFile.Close()
}
}
}
// Stop halts any processing of video data from a camera
func (r *revidInst) Stop() {
if r.isRunning {
@ -495,8 +508,8 @@ func (r *revidInst) packClips() {
switch r.config.Packetization {
case None:
r.Log(Warning, "Clearing mjpeg chan!")
for len(r.mjpegOutputChan) > 0 {
<-(r.mjpegOutputChan)
for len(r.outputChan) > 0 {
<-(r.outputChan)
}
case Mpegts:
r.Log(Warning, "Clearing TS chan!")
@ -509,10 +522,8 @@ func (r *revidInst) packClips() {
for {
switch r.config.Packetization {
case None:
frame := <-r.mjpegOutputChan
frame := <-r.outputChan
upperBound := clipSize + len(frame)
fmt.Printf("clipSize: %v\n len(frame): %v\n", clipSize, len(frame))
copy(clip[clipSize:upperBound], frame)
packetCount++
clipSize += len(frame)
@ -529,7 +540,8 @@ func (r *revidInst) packClips() {
}
// send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame
now = time.Now()
if now.Sub(prevTime) > clipDuration*time.Second && len(clip) > 0 {
if (now.Sub(prevTime) > clipDuration*time.Second && clipSize > 0 ) || clipSize > maxClipSize {
fmt.Println("DoneWriting!")
if err := r.ringBuffer.DoneWriting(clipSize); err != nil {
r.Log(Error, err.Error())
r.Log(Warning, "Dropping clip!")
@ -578,6 +590,7 @@ func (r *revidInst) outputClips() {
err = r.sendClipToHTTP(clip, r.config.HttpAddress)
}
case Rtmp:
fmt.Println("Outputting!")
_, err := r.ffmpegStdin.Write(clip)
if err != nil {
r.Log(Error, err.Error())

View File

@ -103,10 +103,11 @@ func TestRtmpOutput(t *testing.T){
InputFileName: "testInput.h264",
InputCodec: H264,
Output: Rtmp,
RtmpUrl: "-",
RtmpUrl: "rtmp://a.rtmp.youtube.com/live2/w44c-mkuu-aezg-ceb1",
Width: "1280",
Height: "720",
FrameRate: "25",
Packetization: None,
}
revidInst, err := NewRevidInstance(config)
if err != nil {
@ -114,6 +115,6 @@ func TestRtmpOutput(t *testing.T){
return
}
revidInst.Start()
time.Sleep(60*time.Second)
time.Sleep(120*time.Second)
revidInst.Stop()
}