Updating remote

This commit is contained in:
Jack Richardson 2018-01-22 16:16:56 +10:30
parent b09cea19f6
commit 11620de642
1 changed files with 35 additions and 17 deletions

View File

@ -32,8 +32,6 @@ package revid
import ( import (
"bufio" "bufio"
"bytes" "bytes"
"crypto/md5"
"encoding/hex"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -49,6 +47,7 @@ import (
"bitbucket.org/ausocean/av/tsgenerator" "bitbucket.org/ausocean/av/tsgenerator"
"bitbucket.org/ausocean/av/ringbuffer" "bitbucket.org/ausocean/av/ringbuffer"
"bitbucket.org/ausocean/utils/smartLogger"
) )
// defaults and networking consts // defaults and networking consts
@ -70,7 +69,7 @@ const (
framesPerSec = 25 framesPerSec = 25
packetsPerFrame = 7 packetsPerFrame = 7
h264BufferSize = 500000 h264BufferSize = 500000
bitrateTime = 10 bitrateTime = 60
) )
const ( const (
@ -95,7 +94,9 @@ type Config struct {
Bitrate string Bitrate string
FrameRate string FrameRate string
HttpAddress string HttpAddress string
Quantization string
Debug bool Debug bool
Logger smartLogger.LogInstance
} }
type RevidInst interface { type RevidInst interface {
@ -151,6 +152,8 @@ func NewRevidInstance(config Config) (r *revidInst, err error) {
go r.h264Parser.Parse() go r.h264Parser.Parse()
go r.input() go r.input()
go r.generator.Generate() go r.generator.Generate()
r.ErrOut("New revid instance created! Config is:")
r.ErrOut(fmt.Sprintf("%v",r.config))
return return
} }
@ -162,16 +165,16 @@ func (r *revidInst) ChangeState(newConfig Config) error {
func (r *revidInst) ErrOut(m string){ func (r *revidInst) ErrOut(m string){
if r.config.Debug { if r.config.Debug {
r.Error.Println(m) r.config.Logger.Log("Debug",m)
} }
} }
func (r *revidInst) Start() { func (r *revidInst) Start() {
r.isRunning = true
switch r.config.Input { switch r.config.Input {
case Raspivid: case Raspivid:
r.ErrOut("Starting raspivid!")
cmd = exec.Command("raspivid", "-o", "-", "-n", "-t", "0", "-b", cmd = exec.Command("raspivid", "-o", "-", "-n", "-t", "0", "-b",
r.config.Bitrate, "-w", r.config.Width, "-h", r.config.Height, "-fps", r.config.FrameRate, "-ih", "-g", "100") r.config.Bitrate,"-qp", r.config.Quantization, "-w", r.config.Width, "-h", r.config.Height, "-fps", r.config.FrameRate, "-ih", "-g", "100")
stdout, _ := cmd.StdoutPipe() stdout, _ := cmd.StdoutPipe()
err := cmd.Start() err := cmd.Start()
inputReader = bufio.NewReader(stdout) inputReader = bufio.NewReader(stdout)
@ -181,12 +184,14 @@ func (r *revidInst) Start() {
} }
case File: case File:
default: default:
r.Error.Println("Input not valid!") r.ErrOut("Input not valid!")
} }
r.isRunning = true
var h264Data []byte var h264Data []byte
switch r.config.Input { switch r.config.Input {
case Raspivid: case Raspivid:
go func() { go func() {
r.ErrOut("Reading camera data!")
for r.isRunning { for r.isRunning {
h264Data = make([]byte, 1) h264Data = make([]byte, 1)
_, err := io.ReadFull(inputReader, h264Data) _, err := io.ReadFull(inputReader, h264Data)
@ -205,7 +210,8 @@ func (r *revidInst) Start() {
case File: case File:
stats, err := r.inputFile.Stat() stats, err := r.inputFile.Stat()
if err != nil { if err != nil {
panic("Could not get file stats!") r.ErrOut("Could not get file stats!")
return
} }
h264Data = make([]byte, stats.Size()) h264Data = make([]byte, stats.Size())
_, err = r.inputFile.Read(h264Data) _, err = r.inputFile.Read(h264Data)
@ -221,6 +227,7 @@ func (r *revidInst) Start() {
func (r *revidInst) Stop() { func (r *revidInst) Stop() {
if r.isRunning { if r.isRunning {
r.ErrOut("Stopping revid!")
r.isRunning = false r.isRunning = false
cmd.Process.Kill() cmd.Process.Kill()
} }
@ -233,7 +240,9 @@ func (r *revidInst) input() {
prevTime := now prevTime := now
for { for {
if clip, err := r.ringBuffer.Get(); err != nil { if clip, err := r.ringBuffer.Get(); err != nil {
panic(err.Error()) r.ErrOut(err.Error())
r.Stop()
return
} else { } else {
for { for {
tsPacket := <-(r.generator.GetTsOutputChan()) tsPacket := <-(r.generator.GetTsOutputChan())
@ -250,7 +259,8 @@ func (r *revidInst) input() {
if (packetCount == mp2tMaxPackets) || if (packetCount == mp2tMaxPackets) ||
(now.Sub(prevTime) > clipDuration*time.Second && packetCount%packetsPerFrame == 0) { (now.Sub(prevTime) > clipDuration*time.Second && packetCount%packetsPerFrame == 0) {
if err := r.ringBuffer.DoneWriting(clipSize); err != nil { if err := r.ringBuffer.DoneWriting(clipSize); err != nil {
panic(err.Error()) r.ErrOut(err.Error())
r.ErrOut("Dropping that clip!")
} }
clipSize = 0 clipSize = 0
packetCount = 0 packetCount = 0
@ -266,16 +276,27 @@ func (r *revidInst) output() {
now := time.Now() now := time.Now()
prevTime := now prevTime := now
bytes := 0 bytes := 0
delay := 0
for r.isRunning { for r.isRunning {
if r.ringBuffer.GetNoOfElements() < 2 {
delay++
time.Sleep(time.Duration(delay)*time.Millisecond)
} else {
if delay > 10 {
delay -= 10
}
}
if clip, err := r.ringBuffer.Read(); err == nil { if clip, err := r.ringBuffer.Read(); err == nil {
r.ErrOut(fmt.Sprintf("Delay is: %v\n", delay))
r.ErrOut(fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements()))
switch r.config.Output { switch r.config.Output {
case File: case File:
r.outputFile.Write(clip) r.outputFile.Write(clip)
case HttpOut: case HttpOut:
bytes += len(clip) bytes += len(clip)
for err := r.sendClipToHTTP(clip, r.config.HttpAddress); err != nil; { for err := r.sendClipToHTTP(clip, r.config.HttpAddress); err != nil; {
r.ErrOut("Post failed trying again!")
err = r.sendClipToHTTP(clip, r.config.HttpAddress) err = r.sendClipToHTTP(clip, r.config.HttpAddress)
time.Sleep(5*time.Second)
} }
default: default:
r.ErrOut("No output defined!") r.ErrOut("No output defined!")
@ -285,10 +306,8 @@ func (r *revidInst) output() {
} }
now = time.Now() now = time.Now()
deltaTime := now.Sub(prevTime) deltaTime := now.Sub(prevTime)
if deltaTime > time.Duration(bitrateTime)*time.Second { if r.config.Debug && deltaTime > time.Duration(bitrateTime)*time.Second {
if r.config.Debug {
fmt.Printf("Bitrate: %v bytes/s\n", int64(float64(bytes) / float64(deltaTime/1e9))) fmt.Printf("Bitrate: %v bytes/s\n", int64(float64(bytes) / float64(deltaTime/1e9)))
}
prevTime = now prevTime = now
bytes = 0 bytes = 0
} }
@ -302,8 +321,7 @@ func (r *revidInst)sendClipToHTTP(clip []byte, output string) error {
client := http.Client{ client := http.Client{
Timeout: timeout, Timeout: timeout,
} }
hash := md5.Sum(clip) url := output + strconv.Itoa(len(clip))
url := output + strconv.Itoa(len(clip)) + "." + hex.EncodeToString(hash[:]) // NB: append size.digest to output
if r.config.Debug { if r.config.Debug {
fmt.Printf("Posting %s (%d bytes)\n", url, len(clip)) fmt.Printf("Posting %s (%d bytes)\n", url, len(clip))
} }