Merged in IncorperatingRingBufferIntoRevid (pull request #5)

IncorperatingRingBufferIntoRevid

Approved-by: Alan Noble <anoble@gmail.com>
This commit is contained in:
saxon.milton@gmail.com 2017-12-04 05:31:12 +00:00 committed by Alan Noble
commit 4997c561a1
2 changed files with 40 additions and 34 deletions

View File

@ -153,7 +153,7 @@ func main() {
log.Fatal("Cannot combine filterFixContinuity and dumpProgramInfo flags\n")
}
ringBuffer.Make(bufferSize, mp2tPacketSize*mp2tMaxPackets)
ringBuffer = ringbuffer.NewRingBuffer(bufferSize, mp2tPacketSize*mp2tMaxPackets)
inputErrChan = make(chan error, 10)
outputErrChan = make(chan error, 10)
@ -264,32 +264,33 @@ func input(input string, output string) {
if clip, err := ringBuffer.Get(); err != nil {
inputErrChan <- err
return
}
for {
upperBound := clipSize+mp2tPacketSize
_, err := io.ReadFull(br, clip[clipSize:upperBound])
if err != nil {
inputErrChan <- err
return
}
if *flags&filterFixContinuity != 0 && mp2tFixContinuity(clip[clipsSize:upperBound], packetCount, uint16(*selectedPID)) {
fmt.Printf("Packet #%d.%d fixed\n", clipCount, packetCount)
}
packetCount++
clipSize += mp2tPacketSize
// send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame
now = time.Now()
if (packetCount == mp2tMaxPackets) ||
(now.Sub(prevTime) > clipDuration*time.Second && packetCount%packetsPerFrame == 0) {
clipCount++
if err := ringBuffer.DoneWriting(clipSize); err != nil {
} else {
for {
upperBound := clipSize + mp2tPacketSize
_, err := io.ReadFull(br, clip[clipSize:upperBound])
if err != nil {
inputErrChan <- err
return
}
clipSize = 0
packetCount = 0
prevTime = now
break
if *flags&filterFixContinuity != 0 && mp2tFixContinuity(clip[clipSize:upperBound], packetCount, uint16(*selectedPID)) {
fmt.Printf("Packet #%d.%d fixed\n", clipCount, packetCount)
}
packetCount++
clipSize += mp2tPacketSize
// send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame
now = time.Now()
if (packetCount == mp2tMaxPackets) ||
(now.Sub(prevTime) > clipDuration*time.Second && packetCount%packetsPerFrame == 0) {
clipCount++
if err := ringBuffer.DoneWriting(clipSize); err != nil {
inputErrChan <- err
return
}
clipSize = 0
packetCount = 0
prevTime = now
break
}
}
}
}
@ -301,17 +302,17 @@ func output(output string) {
now := time.Now()
prevTime := now
for {
if clip, clipSize, err := ringBuffer.Read(); err == nil {
if clip, err := ringBuffer.Read(); err == nil {
now := time.Now()
err := sendClip(clip[:clipSize], output, conn)
err := sendClip(clip, output, conn)
for err != nil {
outputErrChan <- err
err = sendClip(clip[:clipSize], output, conn)
err = sendClip(clip, output, conn)
}
deltaTime := now.Sub(prevTime)
elapsedTime += deltaTime
if elapsedTime > bitrateOutputDelay*time.Second {
noOfBits := float64(len(clip[:clipSize])*8) / 1024.0 // convert bytes to kilobits
noOfBits := float64(len(clip)*8) / 1024.0 // convert bytes to kilobits
fmt.Printf("Bitrate: %d kbps\n", int64(noOfBits/float64(deltaTime/1e9)))
elapsedTime = time.Duration(0)
}
@ -344,9 +345,6 @@ func sendClipToHTTP(clip []byte, output string, _ net.Conn) error {
url := output + strconv.Itoa(len(clip)) + "." + hex.EncodeToString(hash[:]) // NB: append size.digest to output
fmt.Printf("Posting %s (%d bytes)\n", url, len(clip))
resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer
if err := checkContinuityCounts(clip); err != nil {
return err
}
if err != nil {
return fmt.Errorf("Error posting to %s: %s", output, err)
}
@ -374,9 +372,6 @@ func sendClipToUDP(clip []byte, _ string, conn net.Conn) error {
// sendClipToRTP sends a video clip over RTP.
func sendClipToRTP(clip []byte, _ string, conn net.Conn) error {
if err := checkContinuityCounts(clip); err != nil {
return err
}
size := rtpPackets * mp2tPacketSize
fmt.Printf("Sending %d RTP packets of size %d (%d bytes)\n",
len(clip)/size, size+rtpHeaderSize, len(clip))

View File

@ -37,6 +37,17 @@ import (
RingBuffer aims to provide functionality of a RingBuffer data structure.
It may be used in concurrent routines.
*/
type RingBuffer interface {
Get() ([]byte, error)
DoneWriting(size int) error
Read() ([]byte, error)
DoneReading() error
IsReadable() bool
IsWritable() bool
}
// ringBuffer implements the RingBuffer interface
type ringBuffer struct {
dataMemory [][]byte
sizeMemory []int