Merged in RevidUpdateToIncorporateRingBuffer (pull request #4)

Update revid to incorporate use of RingBuffer

Approved-by: Alan Noble <anoble@gmail.com>
This commit is contained in:
saxon.milton@gmail.com 2017-12-02 22:13:25 +00:00 committed by Alan Noble
commit 5c52e6c5df
4 changed files with 140 additions and 56 deletions

View File

@ -96,7 +96,7 @@ Alan Noble <anoble@gmail.com>
# License # License
revid is Copyright (C) 2017 Alan Noble. ringBuffer is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the under the terms of the GNU General Public License as published by the

View File

@ -31,7 +31,9 @@ package main
import ( import (
"bufio" "bufio"
"bytes" "bytes"
"crypto/md5"
"encoding/binary" "encoding/binary"
"encoding/hex"
"flag" "flag"
"fmt" "fmt"
"io" "io"
@ -47,6 +49,8 @@ import (
"strings" "strings"
"time" "time"
"../ringbuffer"
"github.com/Comcast/gots/packet" "github.com/Comcast/gots/packet"
"github.com/Comcast/gots/packet/adaptationfield" "github.com/Comcast/gots/packet/adaptationfield"
"github.com/Comcast/gots/psi" "github.com/Comcast/gots/psi"
@ -65,8 +69,10 @@ const (
rtpPackets = 7 // # of RTP packets per ethernet frame (7 is the max) rtpPackets = 7 // # of RTP packets per ethernet frame (7 is the max)
rtpHeaderSize = 12 rtpHeaderSize = 12
rtpSSRC = 1 // any value will do rtpSSRC = 1 // any value will do
bitsInByte = 8 bufferSize = 1000
bitrateOutputDelay = 60 // s bitrateOutputDelay = 60 // s
httpTimeOut = 5 // s
clipDuration = 1 // s
) )
// flag values // flag values
@ -91,14 +97,18 @@ var (
dumpCC int dumpCC int
dumpPCRBase uint64 dumpPCRBase uint64
rtpSequenceNum uint16 rtpSequenceNum uint16
conn net.Conn
ffmpegPath string ffmpegPath string
tempDir string tempDir string
inputErrChan chan error
outputErrChan chan error
ringBuffer ringbuffer.RingBuffer
) )
// command-line flags // command-line flags
var ( var (
input = flag.String("i", "", "Input RTSP URL") inputURL = flag.String("i", "", "Input RTSP URL")
output = flag.String("o", "", "Output URL (HTTP, UDP or RTP)") outputURL = flag.String("o", "", "Output URL (HTTP, UDP or RTP)")
mode = flag.String("m", "r", "Mode: one of f,h,u,r or d") mode = flag.String("m", "r", "Mode: one of f,h,u,r or d")
flags = flag.Int("f", 0, "Flags: see readme for explanation") flags = flag.Int("f", 0, "Flags: see readme for explanation")
frameRate = flag.Int("r", defaultFrameRate, "Input video frame rate (25fps by default)") frameRate = flag.Int("r", defaultFrameRate, "Input video frame rate (25fps by default)")
@ -109,7 +119,7 @@ func main() {
setUpDirs() setUpDirs()
flag.Parse() flag.Parse()
if *input == "" { if *inputURL == "" {
log.Fatal("Input (-i) required\n") log.Fatal("Input (-i) required\n")
} }
@ -118,20 +128,20 @@ func main() {
sendClip = sendClipToFile sendClip = sendClipToFile
case "h": case "h":
sendClip = sendClipToHTTP sendClip = sendClipToHTTP
if *output == "" { if *outputURL == "" {
*output = defaultHTTPOutput *outputURL = defaultHTTPOutput
} }
case "u": case "u":
sendClip = sendClipToUDP sendClip = sendClipToUDP
packetsPerFrame = udpPackets packetsPerFrame = udpPackets
if *output == "" { if *outputURL == "" {
*output = defaultUDPOutput *outputURL = defaultUDPOutput
} }
case "r": case "r":
sendClip = sendClipToRTP sendClip = sendClipToRTP
packetsPerFrame = rtpPackets packetsPerFrame = rtpPackets
if *output == "" { if *outputURL == "" {
*output = defaultRTPOutput *outputURL = defaultRTPOutput
} }
case "d": case "d":
sendClip = sendClipToStdout sendClip = sendClipToStdout
@ -143,28 +153,47 @@ func main() {
log.Fatal("Cannot combine filterFixContinuity and dumpProgramInfo flags\n") log.Fatal("Cannot combine filterFixContinuity and dumpProgramInfo flags\n")
} }
ringBuffer.Make(bufferSize, mp2tPacketSize*mp2tMaxPackets)
inputErrChan = make(chan error, 10)
outputErrChan = make(chan error, 10)
go input(*inputURL, *outputURL)
go output(*outputURL)
// Sit in here once we execute the go routines and handle errors that may
// appear in the error channels
for { for {
err := readWriteVideo(*input, *output) select {
fmt.Fprintln(os.Stderr, err) default:
fmt.Fprintln(os.Stderr, "Trying again in 10s") case err := <-inputErrChan:
time.Sleep(10 * time.Second) fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, "Trying again in 10s")
time.Sleep(10 * time.Second)
go input(*inputURL, *outputURL)
case err := <-outputErrChan:
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, "Attempting to write again!")
}
} }
} }
// set up directories based on the OS we're running on // setUpDirs sets directories based on the OS that Revid is running on
func setUpDirs() { func setUpDirs() {
if runtime.GOOS == "windows" { switch runtime.GOOS {
case "windows":
ffmpegPath = "C:/ffmpeg/ffmpeg" ffmpegPath = "C:/ffmpeg/ffmpeg"
tempDir = "tmp/" tempDir = "tmp/"
} else { case "darwin":
ffmpegPath = "/usr/local/bin/ffmpeg"
tempDir = "/tmp/"
default:
ffmpegPath = "/usr/bin/ffmpeg" ffmpegPath = "/usr/bin/ffmpeg"
tempDir = "/tmp/" tempDir = "/tmp/"
} }
} }
// readWriteVideo reads video from an RTSP stream (specified by the input URL) and // input handles the reading from the specified input
// rewrites the video in various formats and/or different protocols (HTTP, UDP or RTP). func input(input string, output string) {
func readWriteVideo(input string, output string) error {
fmt.Printf("Reading video from %s\n", input) fmt.Printf("Reading video from %s\n", input)
args := []string{ args := []string{
@ -200,11 +229,13 @@ func readWriteVideo(input string, output string) error {
cmd := exec.Command(ffmpegPath, args...) cmd := exec.Command(ffmpegPath, args...)
stdout, err := cmd.StdoutPipe() stdout, err := cmd.StdoutPipe()
if err != nil { if err != nil {
return fmt.Errorf("Error creating pipe: %s", err) inputErrChan <- err
return
} }
err = cmd.Start() err = cmd.Start()
if err != nil { if err != nil {
return fmt.Errorf("Error starting pipe: %s", err) inputErrChan <- err
return
} }
// (re)initialize globals // (re)initialize globals
@ -215,61 +246,81 @@ func readWriteVideo(input string, output string) error {
rtpSequenceNum = uint16(rand.Intn(1 << 15)) rtpSequenceNum = uint16(rand.Intn(1 << 15))
// for UDP and RTP only dial once // for UDP and RTP only dial once
var conn net.Conn
if strings.HasPrefix(output, "udp://") || strings.HasPrefix(output, "rtp://") { if strings.HasPrefix(output, "udp://") || strings.HasPrefix(output, "rtp://") {
conn, err = net.Dial("udp", output[6:]) conn, err = net.Dial("udp", output[6:])
if err != nil { if err != nil {
return fmt.Errorf("Error dialing %s: %s", output, err) inputErrChan <- err
return
} }
defer conn.Close()
} }
br := bufio.NewReader(stdout) br := bufio.NewReader(stdout)
pkt := make([]byte, mp2tPacketSize)
clip := make([]byte, mp2tMaxPackets*mp2tPacketSize)
clipSize := 0 clipSize := 0
packetCount := 0 packetCount := 0
now := time.Now() now := time.Now()
prevTime := now prevTime := now
fmt.Printf("Looping\n") fmt.Printf("Looping\n")
elapsedTime := time.Duration(0)
for { for {
_, err := io.ReadFull(br, pkt) if clip, err := ringBuffer.Get(); err != nil {
if err != nil { inputErrChan <- err
return fmt.Errorf("Error reading from ffmpeg: %s", err) return
} }
if *flags&filterFixContinuity != 0 && mp2tFixContinuity(pkt, packetCount, uint16(*selectedPID)) { for {
fmt.Printf("Packet #%d.%d fixed\n", clipCount, packetCount) upperBound := clipSize+mp2tPacketSize
} _, err := io.ReadFull(br, clip[clipSize:upperBound])
copy(clip[clipSize:], pkt) if err != nil {
packetCount++ inputErrChan <- err
clipSize += mp2tPacketSize return
// send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame
now = time.Now()
if (packetCount == mp2tMaxPackets) ||
(now.Sub(prevTime) > 1*time.Second && packetCount%packetsPerFrame == 0) {
clipCount++
if err = sendClip(clip[:clipSize], output, conn); err != nil {
return err
} }
if *flags&filterFixContinuity != 0 && mp2tFixContinuity(clip[clipsSize:upperBound], packetCount, uint16(*selectedPID)) {
fmt.Printf("Packet #%d.%d fixed\n", clipCount, packetCount)
}
packetCount++
clipSize += mp2tPacketSize
// send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame
now = time.Now()
if (packetCount == mp2tMaxPackets) ||
(now.Sub(prevTime) > clipDuration*time.Second && packetCount%packetsPerFrame == 0) {
clipCount++
if err := ringBuffer.DoneWriting(clipSize); err != nil {
inputErrChan <- err
return
}
clipSize = 0
packetCount = 0
prevTime = now
break
}
}
}
}
// Calculate bitrate and Output // output handles the writing to specified output
func output(output string) {
elapsedTime := time.Duration(0)
now := time.Now()
prevTime := now
for {
if clip, clipSize, err := ringBuffer.Read(); err == nil {
now := time.Now()
err := sendClip(clip[:clipSize], output, conn)
for err != nil {
outputErrChan <- err
err = sendClip(clip[:clipSize], output, conn)
}
deltaTime := now.Sub(prevTime) deltaTime := now.Sub(prevTime)
elapsedTime += deltaTime elapsedTime += deltaTime
if elapsedTime > bitrateOutputDelay*time.Nanosecond { if elapsedTime > bitrateOutputDelay*time.Second {
noOfBits := float64(clipSize * bitsInByte) / 1024.0 // to kbits noOfBits := float64(len(clip[:clipSize])*8) / 1024.0 // convert bytes to kilobits
fmt.Printf("Bitrate: %d kbps\n", int64(noOfBits/float64(deltaTime/1e9))) fmt.Printf("Bitrate: %d kbps\n", int64(noOfBits/float64(deltaTime/1e9)))
elapsedTime = time.Duration(0) elapsedTime = time.Duration(0)
} }
clipSize = 0
packetCount = 0
prevTime = now prevTime = now
if err := ringBuffer.DoneReading(); err != nil {
outputErrChan <- err
}
} }
} }
return nil
} }
// sendClipToFile writes a video clip to a /tmp file. // sendClipToFile writes a video clip to a /tmp file.
@ -285,9 +336,17 @@ func sendClipToFile(clip []byte, _ string, _ net.Conn) error {
// sendClipToHTPP posts a video clip via HTTP, using a new TCP connection each time. // sendClipToHTPP posts a video clip via HTTP, using a new TCP connection each time.
func sendClipToHTTP(clip []byte, output string, _ net.Conn) error { func sendClipToHTTP(clip []byte, output string, _ net.Conn) error {
url := output + strconv.Itoa(len(clip)) // NB: append the size to the output timeout := time.Duration(httpTimeOut * time.Second)
client := http.Client{
Timeout: timeout,
}
hash := md5.Sum(clip)
url := output + strconv.Itoa(len(clip)) + "." + hex.EncodeToString(hash[:]) // NB: append size.digest to output
fmt.Printf("Posting %s (%d bytes)\n", url, len(clip)) fmt.Printf("Posting %s (%d bytes)\n", url, len(clip))
resp, err := http.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer
if err := checkContinuityCounts(clip); err != nil {
return err
}
if err != nil { if err != nil {
return fmt.Errorf("Error posting to %s: %s", output, err) return fmt.Errorf("Error posting to %s: %s", output, err)
} }
@ -315,6 +374,9 @@ func sendClipToUDP(clip []byte, _ string, conn net.Conn) error {
// sendClipToRTP sends a video clip over RTP. // sendClipToRTP sends a video clip over RTP.
func sendClipToRTP(clip []byte, _ string, conn net.Conn) error { func sendClipToRTP(clip []byte, _ string, conn net.Conn) error {
if err := checkContinuityCounts(clip); err != nil {
return err
}
size := rtpPackets * mp2tPacketSize size := rtpPackets * mp2tPacketSize
fmt.Printf("Sending %d RTP packets of size %d (%d bytes)\n", fmt.Printf("Sending %d RTP packets of size %d (%d bytes)\n",
len(clip)/size, size+rtpHeaderSize, len(clip)) len(clip)/size, size+rtpHeaderSize, len(clip))
@ -329,6 +391,20 @@ func sendClipToRTP(clip []byte, _ string, conn net.Conn) error {
return nil return nil
} }
// checkContinuityCounts checks that the continuity of the clip is correct
func checkContinuityCounts(clip []byte) error {
for offset := 0; offset < len(clip); offset += mp2tPacketSize {
dumpCC = -1
pkt := clip[offset : offset+mp2tPacketSize]
cc := int(pkt[3] & 0xf)
if dumpCC != -1 && cc != dumpCC {
return fmt.Errorf("Continuity count out of order. Expected %v, Got: %v.", dumpCC, cc)
}
dumpCC = (cc + 1) % 16
}
return nil
}
// sendClipToStdout dumps video stats to stdout. // sendClipToStdout dumps video stats to stdout.
func sendClipToStdout(clip []byte, _ string, _ net.Conn) error { func sendClipToStdout(clip []byte, _ string, _ net.Conn) error {
fmt.Printf("Dumping clip (%d bytes)\n", len(clip)) fmt.Printf("Dumping clip (%d bytes)\n", len(clip))

4
revid/test.bash Normal file
View File

@ -0,0 +1,4 @@
#!/bin/bash
echo Running Revid with input: rtsp://184.72.239.149/vod/mp4:BigBuckBunny_175k.mov
echo and output: rtp://0.0.0.0:1234
revid -i rtsp://184.72.239.149/vod/mp4:BigBuckBunny_175k.mov -m r -o rtp://0.0.0.0:1234

4
revid/test.bat Normal file
View File

@ -0,0 +1,4 @@
@echo off
echo Running Revid with input: rtsp://184.72.239.149/vod/mp4:BigBuckBunny_175k.mov
echo and output: rtp://0.0.0.0:1234
revid -i rtsp://184.72.239.149/vod/mp4:BigBuckBunny_175k.mov -m r -o rtp://0.0.0.0:1234