Working on getting rtmp to youtube. Have a testing file to start with

This commit is contained in:
Unknown 2018-02-08 17:51:43 +10:30
parent c8c9c3f0d2
commit ed14ebb9b9
8 changed files with 129 additions and 58 deletions

View File

@ -29,8 +29,8 @@ LICENSE
package mpegts package mpegts
import ( import (
"bitbucket.org/ausocean/av/tools" //"bitbucket.org/ausocean/av/tools"
//"../tools" "../tools"
"errors" "errors"
//"fmt" //"fmt"

View File

@ -32,14 +32,14 @@ import (
_"fmt" _"fmt"
"os" "os"
"bitbucket.org/ausocean/av/mpegts" //"bitbucket.org/ausocean/av/mpegts"
"bitbucket.org/ausocean/av/rtp" //"bitbucket.org/ausocean/av/rtp"
"bitbucket.org/ausocean/av/tools" //"bitbucket.org/ausocean/av/tools"
"bitbucket.org/ausocean/av/itut" //"bitbucket.org/ausocean/av/itut"
//"../mpegts" "../mpegts"
//"../rtp" "../rtp"
//"../tools" "../tools"
//"../itut" "../itut"
) )
type RtpToH264Converter interface { type RtpToH264Converter interface {

View File

@ -28,8 +28,8 @@ LICENSE
package parser package parser
import ( import (
"bitbucket.org/ausocean/av/itut" //"bitbucket.org/ausocean/av/itut"
//"../itut" "../itut"
"log" "log"
"sync" "sync"
_"time" _"time"

View File

@ -27,8 +27,8 @@ LICENSE
package pes package pes
import ( import (
"bitbucket.org/ausocean/av/tools" //"bitbucket.org/ausocean/av/tools"
//"../tools" "../tools"
) )
const ( const (

View File

@ -42,29 +42,30 @@ import (
"strconv" "strconv"
"time" "time"
"bitbucket.org/ausocean/av/parser" //"bitbucket.org/ausocean/av/parser"
"bitbucket.org/ausocean/av/tsgenerator" //"bitbucket.org/ausocean/av/tsgenerator"
//"../parser" "../parser"
//"../tsgenerator" "../tsgenerator"
"bitbucket.org/ausocean/av/ringbuffer" //"bitbucket.org/ausocean/av/ringbuffer"
"bitbucket.org/ausocean/utils/smartLogger" //"bitbucket.org/ausocean/utils/smartLogger"
//"../../utils/smartLogger" "../../utils/smartLogger"
//"../ringbuffer" "../ringbuffer"
) )
// Misc constants // Misc constants
const ( const (
clipDuration = 1 // s clipDuration = 1 // s
mp2tPacketSize = 188 // MPEG-TS packet size mp2tPacketSize = 188 // MPEG-TS packet size
mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000 mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000
ringBufferSize = 100 / clipDuration ringBufferSize = 100 / clipDuration
ringBufferElementSize = 1000000 ringBufferElementSize = 1000000
httpTimeOut = 5 // s httpTimeOut = 5 // s
packetsPerFrame = 7 packetsPerFrame = 7
h264BufferSize = 1000000 h264BufferSize = 1000000
bitrateTime = 60 bitrateTime = 60
mjpegParserInChanLen = 100000 mjpegParserInChanLen = 100000
ffmpegPath = "/home/$USER/bin/ffmpeg"
) )
// Log Types // Log Types
@ -82,8 +83,8 @@ type Config struct {
InputCodec uint8 InputCodec uint8
Output uint8 Output uint8
RtmpEncodingMethod uint8 RtmpEncodingMethod uint8
RtmpURL string RtmpUrl string
Bitrate string Bitrate string
OutputFileName string OutputFileName string
InputFileName string InputFileName string
Height string Height string
@ -109,9 +110,9 @@ const (
Mjpeg = 7 Mjpeg = 7
None = 8 None = 8
Mpegts = 9 Mpegts = 9
Rtmp = 10 Rtmp = 10
Ffmpeg = 11 Ffmpeg = 11
Revid = 12 Revid = 12
) )
// Default config settings // Default config settings
@ -120,9 +121,9 @@ const (
defaultWidth = "1280" defaultWidth = "1280"
defaultHeight = "720" defaultHeight = "720"
defaultIntraRefreshPeriod = "100" defaultIntraRefreshPeriod = "100"
defaultTimeout = "0" defaultTimeout = "0"
defaultQuantization = "35" defaultQuantization = "35"
defaultBitrate = "0" defaultBitrate = "0"
) )
// RevidInst provides methods to control a revidInst session; providing methods // RevidInst provides methods to control a revidInst session; providing methods
@ -148,7 +149,9 @@ type revidInst struct {
generator tsgenerator.TsGenerator generator tsgenerator.TsGenerator
parser parser.Parser parser parser.Parser
cmd *exec.Cmd cmd *exec.Cmd
ffmpegCmd *exec.Cmd
inputReader *bufio.Reader inputReader *bufio.Reader
ffmpegStdin io.WriteCloser
mjpegOutputChan chan []byte mjpegOutputChan chan []byte
} }
@ -233,7 +236,7 @@ func (r *revidInst) ChangeState(config Config) error {
if config.Quantization != "" { if config.Quantization != "" {
quantization, _ := strconv.Atoi(config.Quantization) quantization, _ := strconv.Atoi(config.Quantization)
if quantization > 0 || config.Bitrate == "" { if quantization > 0 || config.Bitrate == "" {
return errors.New("Bad bitrate or quantization for mjpeg input!") return errors.New("Bad bitrate or quantization for mjpeg input!")
} }
} }
case NothingDefined: case NothingDefined:
@ -253,10 +256,13 @@ func (r *revidInst) ChangeState(config Config) error {
switch config.RtmpEncodingMethod { switch config.RtmpEncodingMethod {
case Revid: case Revid:
case Ffmpeg: case Ffmpeg:
case NothingDefine: case NothingDefined:
r.Log(Warning, "No RTMP encoding method defined, defautling to ffmpeg!") r.Log(Warning, "No RTMP encoding method defined, defautling to ffmpeg!")
config.RtmpEncodingMethod = Ffmpeg config.RtmpEncodingMethod = Ffmpeg
} }
if config.RtmpUrl == "" {
return errors.New("Bad RTMP URL")
}
case NothingDefined: case NothingDefined:
r.Log(Warning, "No output defined, defaulting to httpOut!") r.Log(Warning, "No output defined, defaulting to httpOut!")
config.Output = HttpOut config.Output = HttpOut
@ -327,7 +333,6 @@ func (r *revidInst) ChangeState(config Config) error {
return errors.New("Bad quantization defined in config!") return errors.New("Bad quantization defined in config!")
} }
} }
r.config = config r.config = config
return nil return nil
} }
@ -356,7 +361,7 @@ func (r *revidInst) Start() {
return return
} }
r.Log(Info, "Starting Revid!") r.Log(Info, "Starting Revid!")
// Configure input
switch r.config.Input { switch r.config.Input {
case Raspivid: case Raspivid:
r.Log(Info, "Starting raspivid!") r.Log(Info, "Starting raspivid!")
@ -384,7 +389,6 @@ func (r *revidInst) Start() {
"-fps", r.config.FrameRate, "-fps", r.config.FrameRate,
) )
} }
stdout, _ := r.cmd.StdoutPipe() stdout, _ := r.cmd.StdoutPipe()
err := r.cmd.Start() err := r.cmd.Start()
r.inputReader = bufio.NewReader(stdout) r.inputReader = bufio.NewReader(stdout)
@ -407,8 +411,39 @@ func (r *revidInst) Start() {
r.Stop() r.Stop()
return return
} }
for i := range data { go func() = {
r.parser.GetInputChan() <- data[i] for i := range data {
r.parser.GetInputChan() <- data[i]
}
}{}
}
// Configure output
switch r.config.Output {
case Rtmp:
r.ffmpegCmd = exec.Command(ffmpegPath,
"-f", "h264",
"-r", r.config.FrameRate,
"-i", "-",
"-itsoffset", "5.5",
"-fflags", "nobuffer",
"-vcodec", "copy",
"-strict", "experimental",
"-f", "flv",
r.config.RtmpUrl,
)
err := r.ffmpegCmd.Start()
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return
}
r.ffmpegStdin, err = r.ffmpegCmd.StdinPipe()
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return
} }
} }
go r.readCamera() go r.readCamera()
@ -474,8 +509,10 @@ func (r *revidInst) packClips() {
for { for {
switch r.config.Packetization { switch r.config.Packetization {
case None: case None:
frame := <-r.mjpegOutputChan frame := <-r.mjpegOutputChan
upperBound := clipSize + len(frame) upperBound := clipSize + len(frame)
fmt.Printf("clipSize: %v\n len(frame): %v\n", clipSize, len(frame))
copy(clip[clipSize:upperBound], frame) copy(clip[clipSize:upperBound], frame)
packetCount++ packetCount++
clipSize += len(frame) clipSize += len(frame)
@ -515,6 +552,7 @@ func (r *revidInst) outputClips() {
bytes := 0 bytes := 0
delay := 0 delay := 0
for r.isRunning { for r.isRunning {
// Here we slow things down as much as we can to decrease cpu usage
switch { switch {
case r.ringBuffer.GetNoOfElements() < 2: case r.ringBuffer.GetNoOfElements() < 2:
delay++ delay++
@ -522,12 +560,15 @@ func (r *revidInst) outputClips() {
case delay > 10: case delay > 10:
delay -= 10 delay -= 10
} }
// If the ringbuffer has something we can read and send off
if clip, err := r.ringBuffer.Read(); err == nil { if clip, err := r.ringBuffer.Read(); err == nil {
r.Log(Debug, fmt.Sprintf("Delay is: %v\n", delay)) r.Log(Debug, fmt.Sprintf("Delay is: %v\n", delay))
r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements())) r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements()))
// Output clip to the output specified in the configuration struct
switch r.config.Output { switch r.config.Output {
case File: case File:
fmt.Println("")
r.outputFile.Write(clip) r.outputFile.Write(clip)
case HttpOut: case HttpOut:
bytes += len(clip) bytes += len(clip)
@ -536,12 +577,19 @@ func (r *revidInst) outputClips() {
r.Log(Warning, "Post failed trying again!") r.Log(Warning, "Post failed trying again!")
err = r.sendClipToHTTP(clip, r.config.HttpAddress) err = r.sendClipToHTTP(clip, r.config.HttpAddress)
} }
case Rtmp:
_, err := r.ffmpegStdin.Write(clip)
if err != nil {
r.Log(Error, err.Error())
}
default: default:
r.Log(Error, "No output defined!") r.Log(Error, "No output defined!")
} }
if err := r.ringBuffer.DoneReading(); err != nil { if err := r.ringBuffer.DoneReading(); err != nil {
r.Log(Error, err.Error()) r.Log(Error, err.Error())
} }
// Log some information regarding bitrate and ring buffer size if it's time
now = time.Now() now = time.Now()
deltaTime := now.Sub(prevTime) deltaTime := now.Sub(prevTime)
if deltaTime > time.Duration(bitrateTime)*time.Second { if deltaTime > time.Duration(bitrateTime)*time.Second {

View File

@ -71,7 +71,7 @@ func TestRaspividH264Input(t *testing.T){
time.Sleep(100*time.Second) time.Sleep(100*time.Second)
revidInst.Stop() revidInst.Stop()
} }
*/
// Test revidInst with a raspivid mjpeg input // Test revidInst with a raspivid mjpeg input
func TestRaspividMJPEGInput(t *testing.T){ func TestRaspividMJPEGInput(t *testing.T){
@ -94,3 +94,26 @@ func TestRaspividMJPEGInput(t *testing.T){
time.Sleep(20*time.Second) time.Sleep(20*time.Second)
revidInst.Stop() revidInst.Stop()
} }
*/
// Test revidInst with rtmp output
func TestRtmpOutput(t *testing.T){
config := Config{
Input: File,
InputFileName: "testInput.h264",
InputCodec: H264,
Output: Rtmp,
RtmpUrl: "-",
Width: "1280",
Height: "720",
FrameRate: "25",
}
revidInst, err := NewRevidInstance(config)
if err != nil {
t.Errorf("Should not of have got an error!: %v\n", err.Error())
return
}
revidInst.Start()
time.Sleep(60*time.Second)
revidInst.Stop()
}

View File

@ -31,8 +31,8 @@ package tools
import ( import (
_"os" _"os"
_"fmt" _"fmt"
"bitbucket.org/ausocean/av/rtp" //"bitbucket.org/ausocean/av/rtp"
//"../rtp" "../rtp"
) )
func BoolToByte(in bool) (out byte) { func BoolToByte(in bool) (out byte) {

View File

@ -31,14 +31,14 @@ package tsgenerator
import ( import (
_"fmt" _"fmt"
_"os" _"os"
"bitbucket.org/ausocean/av/mpegts" //"bitbucket.org/ausocean/av/mpegts"
"bitbucket.org/ausocean/av/pes" //"bitbucket.org/ausocean/av/pes"
"bitbucket.org/ausocean/av/tools" //"bitbucket.org/ausocean/av/tools"
"bitbucket.org/ausocean/av/rtp" //"bitbucket.org/ausocean/av/rtp"
//"../mpegts" "../mpegts"
//"../pes" "../pes"
//"../tools" "../tools"
//"../rtp" "../rtp"
) )
var ( var (