From 0f028b799c432be887deec63740208b38143553c Mon Sep 17 00:00:00 2001 From: Saxon Milton Date: Sat, 2 Dec 2017 15:57:10 +1030 Subject: [PATCH 1/5] Update revid to incorperate use of RingBuffer --- revid/Readme.md | 2 +- revid/revid.go | 193 ++++++++++++++++++++++++++++++++++-------------- revid/test.bash | 4 + revid/test.bat | 4 + 4 files changed, 147 insertions(+), 56 deletions(-) create mode 100644 revid/test.bash create mode 100644 revid/test.bat diff --git a/revid/Readme.md b/revid/Readme.md index 6cb7baa3..d31838c5 100644 --- a/revid/Readme.md +++ b/revid/Readme.md @@ -96,7 +96,7 @@ Alan Noble # License -revid is Copyright (C) 2017 Alan Noble. +ringBuffer is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the diff --git a/revid/revid.go b/revid/revid.go index b2cfae43..e44f6c19 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -31,7 +31,9 @@ package main import ( "bufio" "bytes" + "crypto/md5" "encoding/binary" + "encoding/hex" "flag" "fmt" "io" @@ -47,6 +49,8 @@ import ( "strings" "time" + "../utilities" + "github.com/Comcast/gots/packet" "github.com/Comcast/gots/packet/adaptationfield" "github.com/Comcast/gots/psi" @@ -65,8 +69,10 @@ const ( rtpPackets = 7 // # of RTP packets per ethernet frame (7 is the max) rtpHeaderSize = 12 rtpSSRC = 1 // any value will do - bitsInByte = 8 + bufferSize = 1000 bitrateOutputDelay = 60 // s + httpTimeOut = 5 // s + clipDuration = 1 // s ) // flag values @@ -91,14 +97,18 @@ var ( dumpCC int dumpPCRBase uint64 rtpSequenceNum uint16 + conn net.Conn ffmpegPath string tempDir string + inputErrChan chan error + outputErrChan chan error + ringBuffer utilities.RingBuffer ) // command-line flags var ( - input = flag.String("i", "", "Input RTSP URL") - output = flag.String("o", "", "Output URL (HTTP, UDP or RTP)") + inputURL = flag.String("i", "", "Input RTSP URL") + outputURL = flag.String("o", "", "Output URL (HTTP, UDP or RTP)") mode = flag.String("m", "r", "Mode: one of f,h,u,r or d") flags = flag.Int("f", 0, "Flags: see readme for explanation") frameRate = flag.Int("r", defaultFrameRate, "Input video frame rate (25fps by default)") @@ -109,7 +119,7 @@ func main() { setUpDirs() flag.Parse() - if *input == "" { + if *inputURL == "" { log.Fatal("Input (-i) required\n") } @@ -118,20 +128,20 @@ func main() { sendClip = sendClipToFile case "h": sendClip = sendClipToHTTP - if *output == "" { - *output = defaultHTTPOutput + if *outputURL == "" { + *outputURL = defaultHTTPOutput } case "u": sendClip = sendClipToUDP packetsPerFrame = udpPackets - if *output == "" { - *output = defaultUDPOutput + if *outputURL == "" { + *outputURL = defaultUDPOutput } case "r": sendClip = sendClipToRTP packetsPerFrame = rtpPackets - if *output == "" { - *output = defaultRTPOutput + if *outputURL == "" { + *outputURL = defaultRTPOutput } case "d": sendClip = sendClipToStdout @@ -143,28 +153,47 @@ func main() { log.Fatal("Cannot combine filterFixContinuity and dumpProgramInfo flags\n") } + ringBuffer.Make(bufferSize, mp2tPacketSize*mp2tMaxPackets) + inputErrChan = make(chan error, 10) + outputErrChan = make(chan error, 10) + + go input(*inputURL, *outputURL) + go output(*outputURL) + + // Sit in here once we execute the go routines and handle errors that may + // appear in the error channels for { - err := readWriteVideo(*input, *output) - fmt.Fprintln(os.Stderr, err) - fmt.Fprintln(os.Stderr, "Trying again in 10s") - time.Sleep(10 * time.Second) + select { + default: + case err := <-inputErrChan: + fmt.Fprintln(os.Stderr, err) + fmt.Fprintln(os.Stderr, "Trying again in 10s") + time.Sleep(10 * time.Second) + go input(*inputURL, *outputURL) + case err := <-outputErrChan: + fmt.Fprintln(os.Stderr, err) + fmt.Fprintln(os.Stderr, "Attempting to write again!") + } } } -// set up directories based on the OS we're running on +// setUpDirs sets directories based on the OS that Revid is running on func setUpDirs() { - if runtime.GOOS == "windows" { + switch runtime.GOOS { + case "windows": ffmpegPath = "C:/ffmpeg/ffmpeg" tempDir = "tmp/" - } else { + case "darwin": + ffmpegPath = "/usr/local/bin/ffmpeg" + tempDir = "/tmp/" + default: ffmpegPath = "/usr/bin/ffmpeg" tempDir = "/tmp/" } } -// readWriteVideo reads video from an RTSP stream (specified by the input URL) and -// rewrites the video in various formats and/or different protocols (HTTP, UDP or RTP). -func readWriteVideo(input string, output string) error { +// input handles the reading from the specified input +func input(input string, output string) { fmt.Printf("Reading video from %s\n", input) args := []string{ @@ -200,11 +229,13 @@ func readWriteVideo(input string, output string) error { cmd := exec.Command(ffmpegPath, args...) stdout, err := cmd.StdoutPipe() if err != nil { - return fmt.Errorf("Error creating pipe: %s", err) + inputErrChan <- err + return } err = cmd.Start() if err != nil { - return fmt.Errorf("Error starting pipe: %s", err) + inputErrChan <- err + return } // (re)initialize globals @@ -215,61 +246,83 @@ func readWriteVideo(input string, output string) error { rtpSequenceNum = uint16(rand.Intn(1 << 15)) // for UDP and RTP only dial once - var conn net.Conn if strings.HasPrefix(output, "udp://") || strings.HasPrefix(output, "rtp://") { conn, err = net.Dial("udp", output[6:]) if err != nil { - return fmt.Errorf("Error dialing %s: %s", output, err) + inputErrChan <- err + return } - defer conn.Close() } br := bufio.NewReader(stdout) pkt := make([]byte, mp2tPacketSize) - clip := make([]byte, mp2tMaxPackets*mp2tPacketSize) clipSize := 0 packetCount := 0 now := time.Now() prevTime := now fmt.Printf("Looping\n") - - elapsedTime := time.Duration(0) for { - _, err := io.ReadFull(br, pkt) - if err != nil { - return fmt.Errorf("Error reading from ffmpeg: %s", err) - - } - if *flags&filterFixContinuity != 0 && mp2tFixContinuity(pkt, packetCount, uint16(*selectedPID)) { - fmt.Printf("Packet #%d.%d fixed\n", clipCount, packetCount) - } - copy(clip[clipSize:], pkt) - packetCount++ - clipSize += mp2tPacketSize - - // send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame - now = time.Now() - if (packetCount == mp2tMaxPackets) || - (now.Sub(prevTime) > 1*time.Second && packetCount%packetsPerFrame == 0) { - clipCount++ - if err = sendClip(clip[:clipSize], output, conn); err != nil { - return err + if clip, err := ringBuffer.Get(); err != nil { + inputErrChan <- err + return + } else { + for { + _, err := io.ReadFull(br, pkt) + if err != nil { + inputErrChan <- err + return + } + if *flags&filterFixContinuity != 0 && mp2tFixContinuity(pkt, packetCount, uint16(*selectedPID)) { + fmt.Printf("Packet #%d.%d fixed\n", clipCount, packetCount) + } + copy(clip[clipSize:], pkt) + packetCount++ + clipSize += mp2tPacketSize + // send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame + now = time.Now() + if (packetCount == mp2tMaxPackets) || + (now.Sub(prevTime) > clipDuration*time.Second && packetCount%packetsPerFrame == 0) { + clipCount++ + if err := ringBuffer.DoneWriting(clipSize); err != nil { + inputErrChan <- err + return + } + clipSize = 0 + packetCount = 0 + prevTime = now + break + } } + } + } +} - // Calculate bitrate and Output +// output handles the writing to specified output +func output(output string) { + elapsedTime := time.Duration(0) + now := time.Now() + prevTime := now + for { + if clip, clipSize, _ := ringBuffer.Read(); clip != nil { + now := time.Now() + err := sendClip(clip[:clipSize], output, conn) + for err != nil { + outputErrChan <- err + err = sendClip(clip[:clipSize], output, conn) + } deltaTime := now.Sub(prevTime) elapsedTime += deltaTime - if elapsedTime > bitrateOutputDelay*time.Nanosecond { - noOfBits := float64(clipSize * bitsInByte) / 1024.0 // to kbits + if elapsedTime > bitrateOutputDelay*time.Second { + noOfBits := float64(len(clip[:clipSize])*8) / 1024.0 // convert bytes to kilobits fmt.Printf("Bitrate: %d kbps\n", int64(noOfBits/float64(deltaTime/1e9))) elapsedTime = time.Duration(0) } - clipSize = 0 - packetCount = 0 prevTime = now + if err := ringBuffer.DoneReading(); err != nil { + outputErrChan <- err + } } } - return nil } // sendClipToFile writes a video clip to a /tmp file. @@ -285,9 +338,21 @@ func sendClipToFile(clip []byte, _ string, _ net.Conn) error { // sendClipToHTPP posts a video clip via HTTP, using a new TCP connection each time. func sendClipToHTTP(clip []byte, output string, _ net.Conn) error { - url := output + strconv.Itoa(len(clip)) // NB: append the size to the output + if err := checkContinuityCounts(clip); err != nil { + return err + } + + timeout := time.Duration(httpTimeOut * time.Second) + client := http.Client{ + Timeout: timeout, + } + hash := md5.Sum(clip) + url := output + strconv.Itoa(len(clip)) + "." + hex.EncodeToString(hash[:]) // NB: append size.digest to output fmt.Printf("Posting %s (%d bytes)\n", url, len(clip)) - resp, err := http.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer + resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer + if err := checkContinuityCounts(clip); err != nil { + return err + } if err != nil { return fmt.Errorf("Error posting to %s: %s", output, err) } @@ -315,6 +380,9 @@ func sendClipToUDP(clip []byte, _ string, conn net.Conn) error { // sendClipToRTP sends a video clip over RTP. func sendClipToRTP(clip []byte, _ string, conn net.Conn) error { + if err := checkContinuityCounts(clip); err != nil { + return err + } size := rtpPackets * mp2tPacketSize fmt.Printf("Sending %d RTP packets of size %d (%d bytes)\n", len(clip)/size, size+rtpHeaderSize, len(clip)) @@ -329,6 +397,21 @@ func sendClipToRTP(clip []byte, _ string, conn net.Conn) error { return nil } +func checkContinuityCounts(clip []byte) error { + for offset := 0; offset < len(clip); offset += mp2tPacketSize { + dumpCC = -1 + pkt := clip[offset : offset+mp2tPacketSize] + + cc := int(pkt[3] & 0xf) + + if dumpCC != -1 && cc != dumpCC { + return fmt.Errorf("Continuity count out of order. Expected %v, Got: %v.", dumpCC, cc) + } + dumpCC = (cc + 1) % 16 + } + return nil +} + // sendClipToStdout dumps video stats to stdout. func sendClipToStdout(clip []byte, _ string, _ net.Conn) error { fmt.Printf("Dumping clip (%d bytes)\n", len(clip)) diff --git a/revid/test.bash b/revid/test.bash new file mode 100644 index 00000000..293b4e74 --- /dev/null +++ b/revid/test.bash @@ -0,0 +1,4 @@ +#!/bin/bash +echo Running Revid with input: rtsp://184.72.239.149/vod/mp4:BigBuckBunny_175k.mov +echo and output: rtp://0.0.0.0:1234 +revid -i rtsp://184.72.239.149/vod/mp4:BigBuckBunny_175k.mov -m r -o rtp://0.0.0.0:1234 diff --git a/revid/test.bat b/revid/test.bat new file mode 100644 index 00000000..59f5c573 --- /dev/null +++ b/revid/test.bat @@ -0,0 +1,4 @@ +@echo off +echo Running Revid with input: rtsp://184.72.239.149/vod/mp4:BigBuckBunny_175k.mov +echo and output: rtp://0.0.0.0:1234 +revid -i rtsp://184.72.239.149/vod/mp4:BigBuckBunny_175k.mov -m r -o rtp://0.0.0.0:1234 From d63b5f6751101a160d3931ebcdc9deb299e2dea8 Mon Sep 17 00:00:00 2001 From: Saxon1 Date: Sun, 3 Dec 2017 02:34:14 +1030 Subject: [PATCH 2/5] Removed continuity check and also checked read error rather than whether or not clip is nil. --- revid/revid.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index e44f6c19..eed58893 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -303,7 +303,7 @@ func output(output string) { now := time.Now() prevTime := now for { - if clip, clipSize, _ := ringBuffer.Read(); clip != nil { + if clip, clipSize, err := ringBuffer.Read(); err == nil { now := time.Now() err := sendClip(clip[:clipSize], output, conn) for err != nil { @@ -338,10 +338,6 @@ func sendClipToFile(clip []byte, _ string, _ net.Conn) error { // sendClipToHTPP posts a video clip via HTTP, using a new TCP connection each time. func sendClipToHTTP(clip []byte, output string, _ net.Conn) error { - if err := checkContinuityCounts(clip); err != nil { - return err - } - timeout := time.Duration(httpTimeOut * time.Second) client := http.Client{ Timeout: timeout, @@ -397,13 +393,12 @@ func sendClipToRTP(clip []byte, _ string, conn net.Conn) error { return nil } +// checkContinuityCounts checks that the continuity of the clip is correct func checkContinuityCounts(clip []byte) error { for offset := 0; offset < len(clip); offset += mp2tPacketSize { dumpCC = -1 pkt := clip[offset : offset+mp2tPacketSize] - cc := int(pkt[3] & 0xf) - if dumpCC != -1 && cc != dumpCC { return fmt.Errorf("Continuity count out of order. Expected %v, Got: %v.", dumpCC, cc) } From 9b779382eb991356b74ec2d250e14589743fe6f6 Mon Sep 17 00:00:00 2001 From: Saxon1 Date: Sun, 3 Dec 2017 02:42:47 +1030 Subject: [PATCH 3/5] Removed copy to increase efficiency --- revid/revid.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index eed58893..f04ad86f 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -49,7 +49,7 @@ import ( "strings" "time" - "../utilities" + "../ringbuffer" "github.com/Comcast/gots/packet" "github.com/Comcast/gots/packet/adaptationfield" @@ -255,7 +255,6 @@ func input(input string, output string) { } br := bufio.NewReader(stdout) - pkt := make([]byte, mp2tPacketSize) clipSize := 0 packetCount := 0 now := time.Now() @@ -267,15 +266,15 @@ func input(input string, output string) { return } else { for { - _, err := io.ReadFull(br, pkt) + upperBound := clipSize+mp2tPacketSize + _, err := io.ReadFull(br, clip[clipSize:upperBound]) if err != nil { inputErrChan <- err return } - if *flags&filterFixContinuity != 0 && mp2tFixContinuity(pkt, packetCount, uint16(*selectedPID)) { + if *flags&filterFixContinuity != 0 && mp2tFixContinuity(clip[clipsSize:upperBound], packetCount, uint16(*selectedPID)) { fmt.Printf("Packet #%d.%d fixed\n", clipCount, packetCount) } - copy(clip[clipSize:], pkt) packetCount++ clipSize += mp2tPacketSize // send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame From 98cd4ff9ac8c37ec9d0320b5da7a981d93777947 Mon Sep 17 00:00:00 2001 From: Saxon1 Date: Sun, 3 Dec 2017 02:45:33 +1030 Subject: [PATCH 4/5] Removed redundant else, to further abide by go style standards --- revid/revid.go | 47 +++++++++++++++++++++++------------------------ 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index f04ad86f..0ea02a9b 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -264,33 +264,32 @@ func input(input string, output string) { if clip, err := ringBuffer.Get(); err != nil { inputErrChan <- err return - } else { - for { - upperBound := clipSize+mp2tPacketSize - _, err := io.ReadFull(br, clip[clipSize:upperBound]) - if err != nil { + } + for { + upperBound := clipSize+mp2tPacketSize + _, err := io.ReadFull(br, clip[clipSize:upperBound]) + if err != nil { + inputErrChan <- err + return + } + if *flags&filterFixContinuity != 0 && mp2tFixContinuity(clip[clipsSize:upperBound], packetCount, uint16(*selectedPID)) { + fmt.Printf("Packet #%d.%d fixed\n", clipCount, packetCount) + } + packetCount++ + clipSize += mp2tPacketSize + // send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame + now = time.Now() + if (packetCount == mp2tMaxPackets) || + (now.Sub(prevTime) > clipDuration*time.Second && packetCount%packetsPerFrame == 0) { + clipCount++ + if err := ringBuffer.DoneWriting(clipSize); err != nil { inputErrChan <- err return } - if *flags&filterFixContinuity != 0 && mp2tFixContinuity(clip[clipsSize:upperBound], packetCount, uint16(*selectedPID)) { - fmt.Printf("Packet #%d.%d fixed\n", clipCount, packetCount) - } - packetCount++ - clipSize += mp2tPacketSize - // send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame - now = time.Now() - if (packetCount == mp2tMaxPackets) || - (now.Sub(prevTime) > clipDuration*time.Second && packetCount%packetsPerFrame == 0) { - clipCount++ - if err := ringBuffer.DoneWriting(clipSize); err != nil { - inputErrChan <- err - return - } - clipSize = 0 - packetCount = 0 - prevTime = now - break - } + clipSize = 0 + packetCount = 0 + prevTime = now + break } } } From 174acf5ba8c12748c68bc7f99064d546408189ec Mon Sep 17 00:00:00 2001 From: Saxon1 Date: Sun, 3 Dec 2017 02:47:13 +1030 Subject: [PATCH 5/5] changed package corresponding to RingBuffer.go to ringbuffer from utilities --- revid/revid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index 0ea02a9b..70a50f2f 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -102,7 +102,7 @@ var ( tempDir string inputErrChan chan error outputErrChan chan error - ringBuffer utilities.RingBuffer + ringBuffer ringbuffer.RingBuffer ) // command-line flags