Seems to be compiling fine

builds fine -  need to test tomorrow
This commit is contained in:
Unknown 2018-01-30 16:54:39 +10:30
parent 76e253753d
commit ccd2bd2bed
4 changed files with 215 additions and 93 deletions

View File

@ -49,15 +49,16 @@ var (
type Parser interface { type Parser interface {
Stop() Stop()
Start() Start()
GetInputChan() GetInputChan() chan byte
GetOutputChan() GetOutputChan() chan []byte
SetOutputChan(achan chan []byte)
} }
type h264Parser struct { type h264Parser struct {
inputBuffer []byte inputBuffer []byte
isParsing bool isParsing bool
parserOutputChanRef chan<- []byte parserOutputChanRef chan []byte
userOutputChanRef <-chan []byte userOutputChanRef chan []byte
inputChan chan byte inputChan chan byte
} }
@ -80,7 +81,7 @@ func (p *h264Parser)GetInputChan() chan byte {
return p.inputChan return p.inputChan
} }
func (p *h264Parser)GetOutputChan() <-chan []byte { func (p *h264Parser)GetOutputChan() chan []byte {
return p.userOutputChanRef return p.userOutputChanRef
} }
@ -119,8 +120,8 @@ func (p *h264Parser)parse() {
type mjpegParser struct { type mjpegParser struct {
inputBuffer []byte inputBuffer []byte
isParsing bool isParsing bool
parserOutputChanRef chan<- []byte parserOutputChanRef chan []byte
userOutputChanRef <-chan []byte userOutputChanRef chan []byte
inputChan chan byte inputChan chan byte
} }
@ -143,7 +144,7 @@ func (p *mjpegParser)GetInputChan() chan byte {
return p.inputChan return p.inputChan
} }
func (p *mjpegParser)GetOutputChan() <-chan []byte { func (p *mjpegParser)GetOutputChan() chan []byte {
return p.userOutputChanRef return p.userOutputChanRef
} }

View File

@ -27,7 +27,8 @@ LICENSE
package pes package pes
import ( import (
"bitbucket.org/ausocean/av/tools" //"bitbucket.org/ausocean/av/tools"
"../tools"
) )
const ( const (

View File

@ -41,12 +41,17 @@ import (
"os/exec" "os/exec"
"strconv" "strconv"
"time" "time"
"errors"
"bitbucket.org/ausocean/av/h264" //"bitbucket.org/ausocean/av/h264"
"bitbucket.org/ausocean/av/tsgenerator" //"bitbucket.org/ausocean/av/tsgenerator"
"../parser"
"../tsgenerator"
"bitbucket.org/ausocean/av/ringbuffer" //"bitbucket.org/ausocean/av/ringbuffer"
"bitbucket.org/ausocean/utils/smartLogger" //"bitbucket.org/ausocean/utils/smartLogger"
"../ringbuffer"
"../../utils/smartLogger"
) )
// defaults and networking consts // defaults and networking consts
@ -63,7 +68,7 @@ const (
packetsPerFrame = 7 packetsPerFrame = 7
h264BufferSize = 500000 h264BufferSize = 500000
bitrateTime = 60 bitrateTime = 60
defualtFrameRate = 25 mjpegParserInChanLen= 10000
) )
// Log Types // Log Types
@ -76,28 +81,42 @@ const (
// Config enums // Config enums
const ( const (
Raspivid = 0 raspivid = 1
Rtp = 1 rtp = 2
H264Codec = 2 h264Codec = 3
File = 4 file = 4
HttpOut = 5 httpOut = 5
h264 = 6
mjpeg = 7
none = 8
mpegts = 9
) )
type Config struct { type Config struct {
Input uint8 Input uint8
InputCmd string InputCodec uint8
Output uint8 Output uint8
OutputFileName string OutputFileName string
InputFileName string InputFileName string
Height string Height string
Width string Width string
Bitrate string
FrameRate string FrameRate string
HttpAddress string HttpAddress string
Quantization string Quantization string
Timeout string
Packetization uint8
IntraRefreshPeriod string
Logger smartLogger.LogInstance Logger smartLogger.LogInstance
} }
// Default config settings
const (
defaultFrameRate = "25"
defaultWidth = "1280"
defaultHeight = "720"
defaultIntraRefreshPeriod = "100"
)
type RevidInst interface { type RevidInst interface {
Start() Start()
Stop() Stop()
@ -118,9 +137,10 @@ type revidInst struct {
outputFile *os.File outputFile *os.File
inputFile *os.File inputFile *os.File
generator tsgenerator.TsGenerator generator tsgenerator.TsGenerator
h264Parser h264.H264Parser parser parser.Parser
cmd *exec.Cmd cmd *exec.Cmd
inputReader *bufio.Reader inputReader *bufio.Reader
mjpegOutputChan chan []byte
} }
func NewRevidInstance(config Config) (r *revidInst, err error) { func NewRevidInstance(config Config) (r *revidInst, err error) {
@ -129,25 +149,36 @@ func NewRevidInstance(config Config) (r *revidInst, err error) {
r.dumpPCRBase = 0 r.dumpPCRBase = 0
r.ChangeState(config) r.ChangeState(config)
switch r.config.Output { switch r.config.Output {
case File: case file:
r.outputFile, err = os.Create(r.config.OutputFileName) r.outputFile, err = os.Create(r.config.OutputFileName)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} }
switch r.config.Input { switch r.config.Input {
case File: case file:
r.inputFile, err = os.Open(r.config.InputFileName) r.inputFile, err = os.Open(r.config.InputFileName)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} }
r.generator = tsgenerator.NewTsGenerator(defaultFrameRate) switch r.config.InputCodec {
case h264:
r.parser = parser.NewH264Parser() r.parser = parser.NewH264Parser()
case mjpeg:
r.parser = parser.NewMJPEGParser(mjpegParserInChanLen)
}
switch r.config.Packetization {
case none:
r.parser.SetOutputChan(r.mjpegOutputChan)
case mpegts:
frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate)
r.generator = tsgenerator.NewTsGenerator(uint(frameRateAsInt))
r.parser.SetOutputChan(r.generator.GetNalInputChan()) r.parser.SetOutputChan(r.generator.GetNalInputChan())
r.generator.Start() r.generator.Start()
}
r.parser.Start() r.parser.Start()
go r.input() go r.packClips()
r.Log(Info, "New revid instance created! config is:") r.Log(Info, "New revid instance created! config is:")
r.Log(Info, fmt.Sprintf("%v", r.config)) r.Log(Info, fmt.Sprintf("%v", r.config))
return return
@ -157,14 +188,67 @@ func (r *revidInst) GetConfigRef() *Config {
return &r.config return &r.config
} }
func (r *revidInst) ChangeState(newconfig Config) error { func (r *revidInst) ChangeState(config Config) error {
// TODO: check that the config is legit switch config.Input {
r.config = newconfig case rtp:
case raspivid:
case file:
default:
return errors.New("Bad input type defined in config!")
}
switch config.InputCodec {
case h264:
case mjpeg:
default:
return errors.New("Bad input format defined in config!")
}
switch config.Output {
case httpOut:
case file:
default:
return errors.New("Bad output type defined in config!")
}
if integer, err := strconv.Atoi(config.Width); integer < 0 || err != nil {
return errors.New("Bad width defined in config!")
}
if integer, _ := strconv.Atoi(config.Width); integer == 0 {
config.Width = defaultWidth
}
if integer, err := strconv.Atoi(config.Height); integer < 0 || err != nil {
return errors.New("Bad height defined in config!")
}
if integer, _ := strconv.Atoi(config.Height); integer == 0 {
config.Height = defaultHeight
}
if integer, err := strconv.Atoi(config.FrameRate); integer < 0 || err != nil {
return errors.New("Bad FrameRate defined in config!")
}
if integer, _ := strconv.Atoi(config.FrameRate); integer == 0 {
config.FrameRate = defaultFrameRate
}
if integer, err := strconv.Atoi(config.Timeout); integer < 0 || err != nil {
return errors.New("Bad timeout define in config!")
}
if integer, err := strconv.Atoi(config.IntraRefreshPeriod); integer < 0 || err != nil {
return errors.New("Bad intra refresh period defined in config!")
}
if integer, _ := strconv.Atoi(config.IntraRefreshPeriod); integer == 0 {
config.IntraRefreshPeriod = defaultIntraRefreshPeriod
}
if integer, err := strconv.Atoi(config.Quantization); integer <= 0 || integer > 51 || err != nil {
return errors.New("Bad quantization level defined in config!")
}
r.config = config
return nil return nil
} }
func (r *revidInst) Log(logType, m string) { func (r *revidInst) Log(logType, m string) error {
if r.config.Logger != nil {
r.config.Logger.Log(logType, m) r.config.Logger.Log(logType, m)
return nil
} else {
return errors.New("No logger was defined in config for this instance!")
}
} }
func (r *revidInst) IsRunning() bool { func (r *revidInst) IsRunning() bool {
@ -177,12 +261,29 @@ func (r *revidInst) Start() {
return return
} }
r.Log(Info, "Starting Revid!") r.Log(Info, "Starting Revid!")
var data []byte
switch r.config.Input { switch r.config.Input {
case Raspivid: case raspivid:
r.Log(Info, "Starting raspivid!") r.Log(Info, "Starting raspivid!")
r.cmd = exec.Command("raspivid", "-o", "-", "-n", "-t", "0", "-b", var codec string
r.config.Bitrate, "-qp", r.config.Quantization, "-w", r.config.Width, "-h", r.config.Height, "-fps", r.config.FrameRate, "-ih", "-g", "100") switch r.config.InputCodec{
case h264:
codec = "H264"
case mjpeg:
codec = "MJPEG"
}
r.cmd = exec.Command("raspivid",
"-cd", codec,
"-o", "-",
"-n",
"-t", r.config.Timeout,
"-b", "0",
"-qp", r.config.Quantization,
"-w", r.config.Width,
"-h", r.config.Height,
"-fps", r.config.FrameRate,
"-ih",
"-g", r.config.IntraRefreshPeriod,
)
stdout, _ := r.cmd.StdoutPipe() stdout, _ := r.cmd.StdoutPipe()
err := r.cmd.Start() err := r.cmd.Start()
r.inputReader = bufio.NewReader(stdout) r.inputReader = bufio.NewReader(stdout)
@ -191,10 +292,32 @@ func (r *revidInst) Start() {
return return
} }
r.isRunning = true r.isRunning = true
go func() { case file:
stats, err := r.inputFile.Stat()
if err != nil {
r.Log(Error, "Could not get input file stats!")
r.Stop()
return
}
data := make([]byte, stats.Size())
_, err = r.inputFile.Read(data)
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return
}
for i := range data {
r.parser.GetInputChan() <- data[i]
}
}
go r.readCamera()
go r.outputClips()
}
func (r *revidInst)readCamera() {
r.Log(Info, "Reading camera data!") r.Log(Info, "Reading camera data!")
for r.isRunning { for r.isRunning {
data = make([]byte, 1) data := make([]byte, 1)
_, err := io.ReadFull(r.inputReader, data) _, err := io.ReadFull(r.inputReader, data)
switch { switch {
case err != nil && err.Error() == "EOF" && r.isRunning: case err != nil && err.Error() == "EOF" && r.isRunning:
@ -203,30 +326,10 @@ func (r *revidInst) Start() {
case err != nil && r.isRunning: case err != nil && r.isRunning:
r.Log(Error, err.Error()) r.Log(Error, err.Error())
default: default:
r.parser.GetInput() <-h264Data[0] r.parser.GetInputChan()<-data[0]
} }
} }
r.Log(Info, "Out of reading routine!") r.Log(Info, "Out of reading routine!")
}()
case File:
stats, err := r.inputFile.Stat()
if err != nil {
r.Log(Error, "Could not get input file stats!")
r.Stop()
return
}
h264Data = make([]byte, stats.Size())
_, err = r.inputFile.Read(h264Data)
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return
}
for i := range h264Data {
r.h264Parser.InputByteChan <- h264Data[i]
}
}
go r.output()
} }
func (r *revidInst) Stop() { func (r *revidInst) Stop() {
@ -239,7 +342,7 @@ func (r *revidInst) Stop() {
} }
} }
func (r *revidInst) input() { func (r *revidInst) packClips() {
clipSize := 0 clipSize := 0
packetCount := 0 packetCount := 0
now := time.Now() now := time.Now()
@ -247,13 +350,29 @@ func (r *revidInst) input() {
for { for {
if clip, err := r.ringBuffer.Get(); err != nil { if clip, err := r.ringBuffer.Get(); err != nil {
r.Log(Error, err.Error()) r.Log(Error, err.Error())
switch r.config.Packetization {
case none:
r.Log(Warning, "Clearing mjpeg chan!")
for len(r.mjpegOutputChan) > 0 {
<-(r.mjpegOutputChan)
}
case mpegts:
r.Log(Warning, "Clearing TS chan!") r.Log(Warning, "Clearing TS chan!")
for len(r.generator.GetTsOutputChan()) > 0 { for len(r.generator.GetTsOutputChan()) > 0 {
<-(r.generator.GetTsOutputChan()) <-(r.generator.GetTsOutputChan())
} }
}
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} else { } else {
for { for {
switch r.config.Packetization {
case none:
frame := <-r.mjpegOutputChan
upperBound := clipSize + len(frame)
copy(clip[clipSize:upperBound], frame)
packetCount ++
clipSize += len(frame)
case mpegts:
tsPacket := <-(r.generator.GetTsOutputChan()) tsPacket := <-(r.generator.GetTsOutputChan())
tsByteSlice, err := tsPacket.ToByteSlice() tsByteSlice, err := tsPacket.ToByteSlice()
if err != nil { if err != nil {
@ -263,10 +382,10 @@ func (r *revidInst) input() {
copy(clip[clipSize:upperBound], tsByteSlice) copy(clip[clipSize:upperBound], tsByteSlice)
packetCount++ packetCount++
clipSize += mp2tPacketSize clipSize += mp2tPacketSize
}
// send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame // send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame
now = time.Now() now = time.Now()
if (packetCount == mp2tMaxPackets) || if now.Sub(prevTime) > clipDuration*time.Second && len(clip) > 0 {
(now.Sub(prevTime) > clipDuration*time.Second && packetCount%packetsPerFrame == 0) {
if err := r.ringBuffer.DoneWriting(clipSize); err != nil { if err := r.ringBuffer.DoneWriting(clipSize); err != nil {
r.Log(Error, err.Error()) r.Log(Error, err.Error())
r.Log(Warning, "Dropping clip!") r.Log(Warning, "Dropping clip!")
@ -281,7 +400,7 @@ func (r *revidInst) input() {
} }
} }
func (r *revidInst) output() { func (r *revidInst) outputClips() {
now := time.Now() now := time.Now()
prevTime := now prevTime := now
bytes := 0 bytes := 0
@ -298,9 +417,9 @@ func (r *revidInst) output() {
r.Log(Debug, fmt.Sprintf("Delay is: %v\n", delay)) r.Log(Debug, fmt.Sprintf("Delay is: %v\n", delay))
r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements())) r.Log(Debug, fmt.Sprintf("Ring buffer size: %v\n", r.ringBuffer.GetNoOfElements()))
switch r.config.Output { switch r.config.Output {
case File: case file:
r.outputFile.Write(clip) r.outputFile.Write(clip)
case HttpOut: case httpOut:
bytes += len(clip) bytes += len(clip)
for err := r.sendClipToHTTP(clip, r.config.HttpAddress); err != nil; { for err := r.sendClipToHTTP(clip, r.config.HttpAddress); err != nil; {
r.Log(Error, err.Error()) r.Log(Error, err.Error())

View File

@ -31,10 +31,14 @@ package tsgenerator
import ( import (
_"fmt" _"fmt"
_"os" _"os"
"bitbucket.org/ausocean/av/mpegts" //"bitbucket.org/ausocean/av/mpegts"
"bitbucket.org/ausocean/av/pes" //"bitbucket.org/ausocean/av/pes"
"bitbucket.org/ausocean/av/tools" //"bitbucket.org/ausocean/av/tools"
"bitbucket.org/ausocean/av/rtp" //"bitbucket.org/ausocean/av/rtp"
"../mpegts"
"../pes"
"../tools"
"../rtp"
) )
var ( var (
@ -52,7 +56,7 @@ const (
type TsGenerator interface { type TsGenerator interface {
generate() generate()
GetNalInputChan() chan<- []byte GetNalInputChan() chan []byte
GetTsOutputChan() <-chan *mpegts.MpegTsPacket GetTsOutputChan() <-chan *mpegts.MpegTsPacket
Start() Start()
genPts()(pts uint64) genPts()(pts uint64)
@ -64,8 +68,7 @@ type tsGenerator struct {
tsChan chan<- *mpegts.MpegTsPacket tsChan chan<- *mpegts.MpegTsPacket
InputChan chan<- rtp.RtpPacket InputChan chan<- rtp.RtpPacket
inputChan <-chan rtp.RtpPacket inputChan <-chan rtp.RtpPacket
NalInputChan chan<- []byte nalInputChan chan []byte
nalInputChan <-chan []byte
currentTsPacket *mpegts.MpegTsPacket currentTsPacket *mpegts.MpegTsPacket
payloadByteChan chan byte payloadByteChan chan byte
currentCC byte currentCC byte
@ -76,8 +79,8 @@ type tsGenerator struct {
ccMap map[int]int ccMap map[int]int
} }
func (g *tsGenerator)GetNalInputChan() chan<- []byte { func (g *tsGenerator)GetNalInputChan() chan []byte {
return g.NalInputChan return g.nalInputChan
} }
func (g *tsGenerator)GetTsOutputChan() <-chan *mpegts.MpegTsPacket { func (g *tsGenerator)GetTsOutputChan() <-chan *mpegts.MpegTsPacket {
@ -92,9 +95,7 @@ func NewTsGenerator(fps uint) (g *tsGenerator) {
inputChan := make(chan rtp.RtpPacket, 100) inputChan := make(chan rtp.RtpPacket, 100)
g.InputChan = inputChan g.InputChan = inputChan
g.inputChan = inputChan g.inputChan = inputChan
nalInputChan := make(chan []byte, 10000) g.nalInputChan = make(chan []byte, 10000)
g.NalInputChan = nalInputChan
g.nalInputChan = nalInputChan
g.currentCC = 0 g.currentCC = 0
g.fps = fps g.fps = fps
g.currentPcrTime = .0 g.currentPcrTime = .0
@ -179,7 +180,7 @@ func (g *tsGenerator) generate() {
buffer = append(buffer, rtpBuffer[0].Payload[2:]...) buffer = append(buffer, rtpBuffer[0].Payload[2:]...)
if tools.GetEndBit(rtpBuffer[0]) == 1 { if tools.GetEndBit(rtpBuffer[0]) == 1 {
rtpBuffer = rtpBuffer[1:] rtpBuffer = rtpBuffer[1:]
g.NalInputChan <- buffer g.nalInputChan <- buffer
break break
} }
rtpBuffer = rtpBuffer[1:] rtpBuffer = rtpBuffer[1:]
@ -199,7 +200,7 @@ func (g *tsGenerator) generate() {
buffer = append(buffer, rtpBuffer[0].Payload[0]&0xE0|rtpBuffer[0].Payload[1]&0x1F) buffer = append(buffer, rtpBuffer[0].Payload[0]&0xE0|rtpBuffer[0].Payload[1]&0x1F)
buffer = append(buffer, rtpBuffer[0].Payload[2:]...) buffer = append(buffer, rtpBuffer[0].Payload[2:]...)
rtpBuffer = rtpBuffer[1:] rtpBuffer = rtpBuffer[1:]
g.NalInputChan <- buffer g.nalInputChan <- buffer
default: default:
} }
} }