diff --git a/mpegts/MpegTs.go b/mpegts/MpegTs.go index 3bf71fdf..26833d9e 100644 --- a/mpegts/MpegTs.go +++ b/mpegts/MpegTs.go @@ -29,8 +29,8 @@ LICENSE package mpegts import ( - "bitbucket.org/ausocean/av/tools" - //"../tools" + //"bitbucket.org/ausocean/av/tools" + "../tools" "errors" //"fmt" diff --git a/parser/H264Writer.go b/parser/H264Writer.go index a6989cd5..7874a567 100644 --- a/parser/H264Writer.go +++ b/parser/H264Writer.go @@ -32,14 +32,14 @@ import ( _"fmt" "os" - "bitbucket.org/ausocean/av/mpegts" - "bitbucket.org/ausocean/av/rtp" - "bitbucket.org/ausocean/av/tools" - "bitbucket.org/ausocean/av/itut" - //"../mpegts" - //"../rtp" - //"../tools" - //"../itut" + //"bitbucket.org/ausocean/av/mpegts" + //"bitbucket.org/ausocean/av/rtp" + //"bitbucket.org/ausocean/av/tools" + //"bitbucket.org/ausocean/av/itut" + "../mpegts" + "../rtp" + "../tools" + "../itut" ) type RtpToH264Converter interface { diff --git a/parser/Parser.go b/parser/Parser.go index 63a88e77..93db3ef5 100644 --- a/parser/Parser.go +++ b/parser/Parser.go @@ -28,8 +28,8 @@ LICENSE package parser import ( - "bitbucket.org/ausocean/av/itut" - //"../itut" + //"bitbucket.org/ausocean/av/itut" + "../itut" "log" "sync" _"time" diff --git a/pes/Pes.go b/pes/Pes.go index 413c1732..c558d015 100644 --- a/pes/Pes.go +++ b/pes/Pes.go @@ -27,8 +27,8 @@ LICENSE package pes import ( - "bitbucket.org/ausocean/av/tools" - //"../tools" + //"bitbucket.org/ausocean/av/tools" + "../tools" ) const ( diff --git a/revid/RevidInstance.go b/revid/RevidInstance.go index 3c60e5a6..ae5d1cdd 100644 --- a/revid/RevidInstance.go +++ b/revid/RevidInstance.go @@ -42,29 +42,30 @@ import ( "strconv" "time" - "bitbucket.org/ausocean/av/parser" - "bitbucket.org/ausocean/av/tsgenerator" - //"../parser" - //"../tsgenerator" + //"bitbucket.org/ausocean/av/parser" + //"bitbucket.org/ausocean/av/tsgenerator" + "../parser" + "../tsgenerator" - "bitbucket.org/ausocean/av/ringbuffer" - "bitbucket.org/ausocean/utils/smartLogger" - //"../../utils/smartLogger" - //"../ringbuffer" + //"bitbucket.org/ausocean/av/ringbuffer" + //"bitbucket.org/ausocean/utils/smartLogger" + "../../utils/smartLogger" + "../ringbuffer" ) // Misc constants const ( - clipDuration = 1 // s - mp2tPacketSize = 188 // MPEG-TS packet size - mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000 - ringBufferSize = 100 / clipDuration + clipDuration = 1 // s + mp2tPacketSize = 188 // MPEG-TS packet size + mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000 + ringBufferSize = 100 / clipDuration ringBufferElementSize = 1000000 - httpTimeOut = 5 // s - packetsPerFrame = 7 - h264BufferSize = 1000000 - bitrateTime = 60 - mjpegParserInChanLen = 100000 + httpTimeOut = 5 // s + packetsPerFrame = 7 + h264BufferSize = 1000000 + bitrateTime = 60 + mjpegParserInChanLen = 100000 + ffmpegPath = "/home/$USER/bin/ffmpeg" ) // Log Types @@ -82,8 +83,8 @@ type Config struct { InputCodec uint8 Output uint8 RtmpEncodingMethod uint8 - RtmpURL string - Bitrate string + RtmpUrl string + Bitrate string OutputFileName string InputFileName string Height string @@ -109,9 +110,9 @@ const ( Mjpeg = 7 None = 8 Mpegts = 9 - Rtmp = 10 - Ffmpeg = 11 - Revid = 12 + Rtmp = 10 + Ffmpeg = 11 + Revid = 12 ) // Default config settings @@ -120,9 +121,9 @@ const ( defaultWidth = "1280" defaultHeight = "720" defaultIntraRefreshPeriod = "100" - defaultTimeout = "0" - defaultQuantization = "35" - defaultBitrate = "0" + defaultTimeout = "0" + defaultQuantization = "35" + defaultBitrate = "0" ) // RevidInst provides methods to control a revidInst session; providing methods @@ -148,7 +149,9 @@ type revidInst struct { generator tsgenerator.TsGenerator parser parser.Parser cmd *exec.Cmd + ffmpegCmd *exec.Cmd inputReader *bufio.Reader + ffmpegStdin io.WriteCloser mjpegOutputChan chan []byte } @@ -233,7 +236,7 @@ func (r *revidInst) ChangeState(config Config) error { if config.Quantization != "" { quantization, _ := strconv.Atoi(config.Quantization) if quantization > 0 || config.Bitrate == "" { - return errors.New("Bad bitrate or quantization for mjpeg input!") + return errors.New("Bad bitrate or quantization for mjpeg input!") } } case NothingDefined: @@ -253,10 +256,13 @@ func (r *revidInst) ChangeState(config Config) error { switch config.RtmpEncodingMethod { case Revid: case Ffmpeg: - case NothingDefine: + case NothingDefined: r.Log(Warning, "No RTMP encoding method defined, defautling to ffmpeg!") config.RtmpEncodingMethod = Ffmpeg } + if config.RtmpUrl == "" { + return errors.New("Bad RTMP URL") + } case NothingDefined: r.Log(Warning, "No output defined, defaulting to httpOut!") config.Output = HttpOut @@ -327,7 +333,6 @@ func (r *revidInst) ChangeState(config Config) error { return errors.New("Bad quantization defined in config!") } } - r.config = config return nil } @@ -356,7 +361,7 @@ func (r *revidInst) Start() { return } r.Log(Info, "Starting Revid!") - + // Configure input switch r.config.Input { case Raspivid: r.Log(Info, "Starting raspivid!") @@ -384,7 +389,6 @@ func (r *revidInst) Start() { "-fps", r.config.FrameRate, ) } - stdout, _ := r.cmd.StdoutPipe() err := r.cmd.Start() r.inputReader = bufio.NewReader(stdout) @@ -407,8 +411,39 @@ func (r *revidInst) Start() { r.Stop() return } - for i := range data { - r.parser.GetInputChan() <- data[i] + go func() = { + for i := range data { + r.parser.GetInputChan() <- data[i] + } + }{} + + } + + // Configure output + switch r.config.Output { + case Rtmp: + r.ffmpegCmd = exec.Command(ffmpegPath, + "-f", "h264", + "-r", r.config.FrameRate, + "-i", "-", + "-itsoffset", "5.5", + "-fflags", "nobuffer", + "-vcodec", "copy", + "-strict", "experimental", + "-f", "flv", + r.config.RtmpUrl, + ) + err := r.ffmpegCmd.Start() + if err != nil { + r.Log(Error, err.Error()) + r.Stop() + return + } + r.ffmpegStdin, err = r.ffmpegCmd.StdinPipe() + if err != nil { + r.Log(Error, err.Error()) + r.Stop() + return } } go r.readCamera() @@ -474,8 +509,10 @@ func (r *revidInst) packClips() { for { switch r.config.Packetization { case None: + frame := <-r.mjpegOutputChan upperBound := clipSize + len(frame) + fmt.Printf("clipSize: %v\n len(frame): %v\n", clipSize, len(frame)) copy(clip[clipSize:upperBound], frame) packetCount++ clipSize += len(frame) @@ -515,6 +552,7 @@ func (r *revidInst) outputClips() { bytes := 0 delay := 0 for r.isRunning { + // Here we slow things down as much as we can to decrease cpu usage switch { case r.ringBuffer.GetNoOfElements() < 2: delay++ @@ -522,12 +560,15 @@ func (r *revidInst) outputClips() { case delay > 10: delay -= 10 } + + // If the ringbuffer has something we can read and send off if clip, err := r.ringBuffer.Read(); err == nil { r.Log(Debug, fmt.Sprintf("Delay is: %v\n", delay)) r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements())) + + // Output clip to the output specified in the configuration struct switch r.config.Output { case File: - fmt.Println("") r.outputFile.Write(clip) case HttpOut: bytes += len(clip) @@ -536,12 +577,19 @@ func (r *revidInst) outputClips() { r.Log(Warning, "Post failed trying again!") err = r.sendClipToHTTP(clip, r.config.HttpAddress) } + case Rtmp: + _, err := r.ffmpegStdin.Write(clip) + if err != nil { + r.Log(Error, err.Error()) + } default: r.Log(Error, "No output defined!") } if err := r.ringBuffer.DoneReading(); err != nil { r.Log(Error, err.Error()) } + + // Log some information regarding bitrate and ring buffer size if it's time now = time.Now() deltaTime := now.Sub(prevTime) if deltaTime > time.Duration(bitrateTime)*time.Second { diff --git a/revid/revid_test.go b/revid/revid_test.go index dc6b8731..90ed1588 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -71,7 +71,7 @@ func TestRaspividH264Input(t *testing.T){ time.Sleep(100*time.Second) revidInst.Stop() } -*/ + // Test revidInst with a raspivid mjpeg input func TestRaspividMJPEGInput(t *testing.T){ @@ -94,3 +94,26 @@ func TestRaspividMJPEGInput(t *testing.T){ time.Sleep(20*time.Second) revidInst.Stop() } +*/ + +// Test revidInst with rtmp output +func TestRtmpOutput(t *testing.T){ + config := Config{ + Input: File, + InputFileName: "testInput.h264", + InputCodec: H264, + Output: Rtmp, + RtmpUrl: "-", + Width: "1280", + Height: "720", + FrameRate: "25", + } + revidInst, err := NewRevidInstance(config) + if err != nil { + t.Errorf("Should not of have got an error!: %v\n", err.Error()) + return + } + revidInst.Start() + time.Sleep(60*time.Second) + revidInst.Stop() +} diff --git a/tools/helpers.go b/tools/helpers.go index b5951b2b..638de03e 100644 --- a/tools/helpers.go +++ b/tools/helpers.go @@ -31,8 +31,8 @@ package tools import ( _"os" _"fmt" - "bitbucket.org/ausocean/av/rtp" - //"../rtp" + //"bitbucket.org/ausocean/av/rtp" + "../rtp" ) func BoolToByte(in bool) (out byte) { diff --git a/tsgenerator/TsGenerator.go b/tsgenerator/TsGenerator.go index fe7d0624..381d110e 100644 --- a/tsgenerator/TsGenerator.go +++ b/tsgenerator/TsGenerator.go @@ -31,14 +31,14 @@ package tsgenerator import ( _"fmt" _"os" - "bitbucket.org/ausocean/av/mpegts" - "bitbucket.org/ausocean/av/pes" - "bitbucket.org/ausocean/av/tools" - "bitbucket.org/ausocean/av/rtp" - //"../mpegts" - //"../pes" - //"../tools" - //"../rtp" + //"bitbucket.org/ausocean/av/mpegts" + //"bitbucket.org/ausocean/av/pes" + //"bitbucket.org/ausocean/av/tools" + //"bitbucket.org/ausocean/av/rtp" + "../mpegts" + "../pes" + "../tools" + "../rtp" ) var (