Seems to be working to some degree. Need to improve PSI stuff by adding CC information

This commit is contained in:
Jack Richardson 2018-01-16 15:36:51 +10:30
parent 0cf51ee5b1
commit 85ae2189f3
7 changed files with 151 additions and 106 deletions

View File

@ -58,7 +58,7 @@ func (p* H264Parser)Stop(){
func (p* H264Parser)Parse() {
p.isParsing = true
outputBuffer := []byte{}
outputBuffer := make([]byte, 0, 10000)
searchingForEnd := false
p.InputByteChan = make(chan byte, 10000)
for p.isParsing {

View File

@ -34,6 +34,11 @@ import (
_"fmt"
)
const (
maxMpegTsSize = 188
mpegtsPayloadSize = 176
)
/*
The below data struct encapsulates the fields of an MPEG-TS packet. Below is
the formatting of an MPEG-TS packet for reference!
@ -127,7 +132,7 @@ type MpegTsPacket struct {
// TODO: make payload private considering we now have FillPayload method
func (p *MpegTsPacket) FillPayload(channel chan byte){
p.Payload = []byte{}
p.Payload = make([]byte,0,mpegtsPayloadSize)
currentPktLength := 6 + int(tools.BoolToByte(p.PCRF))*6+int(tools.BoolToByte(p.OPCRF))*6+
int(tools.BoolToByte(p.SPF))*1+int(tools.BoolToByte(p.TPDF))*1+len(p.TPD)
for len(channel) > 0 && (currentPktLength+len(p.Payload)) < 188 {
@ -145,6 +150,7 @@ func (p *MpegTsPacket) ToByteSlice() (output []byte, err error) {
}
afl := 1+int(tools.BoolToByte(p.PCRF))*6+int(tools.BoolToByte(p.OPCRF))*
6+int(tools.BoolToByte(p.SPF))*1+int(tools.BoolToByte(p.TPDF))*1+len(p.TPD)+len(stuffing)
output = make([]byte,0,maxMpegTsSize)
output = append(output, []byte{
0x47,
(tools.BoolToByte(p.TEI)<<7 | tools.BoolToByte(p.PUSI)<<6 | tools.BoolToByte(p.Priority)<<5 |
@ -167,7 +173,6 @@ func (p *MpegTsPacket) ToByteSlice() (output []byte, err error) {
if p.TPDF {
output = append(output, append([]byte{p.TPDL}, p.TPD...)...)
}
output = append(output, append(p.Ext, append(stuffing, p.Payload...)...)...)
if len(output) != 188 {
err = errors.New("Length of MPEG-TS packet is not 188! Something is wrong!")

View File

@ -29,6 +29,11 @@ package pes
import (
"bitbucket.org/ausocean/av/tools"
)
const (
maxPesSize = 10000
)
/*
The below data struct encapsulates the fields of an PES packet. Below is
the formatting of a PES packet for reference!
@ -68,6 +73,7 @@ the formatting of a PES packet for reference!
| - | ... |
----------------------------------------------------------------------------
*/
// TODO: add DSMTM, ACI, CRC, Ext fields
type PESPacket struct {
StreamID byte // Type of stream
Length uint16 // Pes packet length in bytes after this field
@ -88,12 +94,12 @@ type PESPacket struct {
DTS uint64 // Decoding timestamp
ESCR uint64 // Elementary stream clock reference
ESR uint32 // Elementary stream rate reference
// TODO: add DSMTM, ACI, CRC, Ext fields
Stuff []byte // Stuffing bytes
Data []byte // Pes packet data
}
func (p *PESPacket) ToByteSlice() (output []byte) {
output = make([]byte, 0, maxPesSize)
output = append(output, []byte{
0x00, 0x00, 0x01,
p.StreamID,
@ -106,13 +112,13 @@ func (p *PESPacket) ToByteSlice() (output []byte) {
p.HeaderLength,
}...)
if p.PDI == byte(2) {
pts := 0x2100010001 | (p.PTS & 0x1C0000000) << 3 | (p.PTS & 0x3FFF8000)<<2 |
(p.PTS & 0x7FFF) << 1
pts := 0x2100010001 | (p.PTS&0x1C0000000)<<3 | (p.PTS&0x3FFF8000)<<2 |
(p.PTS&0x7FFF)<<1
output = append(output, []byte{
byte((pts & 0xFF00000000)>> 32),
byte((pts & 0x00FF000000)>> 24),
byte((pts & 0x0000FF0000)>>16),
byte((pts & 0x000000FF00)>>8),
byte((pts & 0xFF00000000) >> 32),
byte((pts & 0x00FF000000) >> 24),
byte((pts & 0x0000FF0000) >> 16),
byte((pts & 0x000000FF00) >> 8),
byte(pts & 0x00000000FF),
}...)
}

View File

@ -35,6 +35,7 @@ import (
"crypto/md5"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"log"
"net"
@ -43,7 +44,6 @@ import (
"os/exec"
"strconv"
"time"
"io"
"bitbucket.org/ausocean/av/h264"
"bitbucket.org/ausocean/av/tsgenerator"
@ -62,7 +62,7 @@ const (
rtpPackets = 7 // # of RTP packets per ethernet frame (7 is the max)
rtpHeaderSize = 12
rtpSSRC = 1 // any value will do
bufferSize = 1000 / clipDuration
bufferSize = 100 / clipDuration
httpTimeOut = 5 // s
motionThreshold = "0.0025"
qscale = "3"
@ -73,19 +73,27 @@ const (
)
const (
raspivid = 0
rtp = 1
h264Codec = 2
file = 4
httpOut = 5
Raspivid = 0
Rtp = 1
H264Codec = 2
File = 4
HttpOut = 5
)
var cmd *exec.Cmd
var inputReader *bufio.Reader
type Config struct {
Input uint8
InputCmd string
Output uint8
OutputFileName string
InputFileName string
Height string
Width string
Bitrate string
FrameRate string
HttpAddress string
}
type RevidInst interface {
@ -107,6 +115,8 @@ type revidInst struct {
Error *log.Logger
outputFile *os.File
inputFile *os.File
generator tsgenerator.TsGenerator
h264Parser h264.H264Parser
}
func NewRevidInstance(config Config) (r *revidInst, err error) {
@ -118,19 +128,25 @@ func NewRevidInstance(config Config) (r *revidInst, err error) {
r.dumpPCRBase = 0
r.ChangeState(config)
switch r.config.Output {
case file:
case File:
r.outputFile, err = os.Create(r.config.OutputFileName)
if err != nil {
return nil, err
}
}
switch r.config.Input {
case file:
case File:
r.inputFile, err = os.Open(r.config.InputFileName)
if err != nil {
return nil, err
}
}
r.generator = tsgenerator.NewTsGenerator(framesPerSec)
r.h264Parser = h264.H264Parser{OutputChan: r.generator.GetNalInputChan()}
// TODO: Need to create constructor for parser otherwise I'm going to break
// something eventuallyl
go r.h264Parser.Parse()
go r.input()
return
}
@ -142,25 +158,11 @@ func (r *revidInst) ChangeState(newConfig Config) error {
func (r *revidInst) Start() {
r.isRunning = true
go r.input()
go r.output()
}
func (r *revidInst) Stop() {
r.isRunning = false
}
func (r *revidInst) input() {
generator := tsgenerator.NewTsGenerator(framesPerSec)
go generator.Generate()
h264Parser := h264.H264Parser{OutputChan: generator.NalInputChan}
// TODO: Need to create constructor for parser otherwise I'm going to break
// something eventuallyl
go h264Parser.Parse()
var inputReader *bufio.Reader
go r.generator.Generate()
switch r.config.Input {
case raspivid:
cmd := exec.Command("raspivid", "-o", "-", "-n", "-t", "0", "-b", "1000000", "-w","1280","-h","720")
case Raspivid:
cmd = exec.Command("raspivid", "-o", "-", "-n", "-t", "0", "-b",
r.config.Bitrate, "-w", r.config.Width, "-h", r.config.Height, "-fps", r.config.FrameRate)
stdout, _ := cmd.StdoutPipe()
err := cmd.Start()
inputReader = bufio.NewReader(stdout)
@ -168,40 +170,30 @@ func (r *revidInst) input() {
r.Error.Println(err.Error())
return
}
case file:
case File:
default:
r.Error.Println("Input not valid!")
}
clipSize := 0
packetCount := 0
now := time.Now()
prevTime := now
startPackets := [][]byte{
{71, 64, 17, 16, 0, 66, 240, 65, 0, 1, 193, 0, 0, 255, 1, 255, 0, 1, 252, 128, 48, 72, 46, 1, 6, 70, 70, 109, 112, 101, 103, 37, 115, 116, 114, 101, 97, 109, 101, 100, 32, 98, 121, 32, 116, 104, 101, 32, 71, 101, 111, 86, 105, 115, 105, 111, 110, 32, 82, 116, 115, 112, 32, 83, 101, 114, 118, 101, 114, 99, 176, 214, 195, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255},
{71, 64, 0, 16, 0, 0, 176, 13, 0, 1, 193, 0, 0, 0, 1, 240, 0, 42, 177, 4, 178, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255},
/*PMT*/ {71, 80, 0, 16,
/*Start of payload*/
0, 2, 176, 18, 0, 1, 193, 0, 0, 0xE1, 0x00, 0xF0, 0, 0x1B, 0xE1, 0, 0xF0, 0, 0x15, 0xBD, 0x4D, 0x56, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255},
}
donePSI := false
ii := 0
fmt.Println("reading")
var h264Data []byte
switch(r.config.Input){
case raspivid:
go func(){
for {
h264Data = make([]byte, 2)
_,err := io.ReadFull(inputReader, h264Data)
if err == nil {
h264Parser.InputByteChan<-h264Data[0]
h264Parser.InputByteChan<-h264Data[1]
switch r.config.Input {
case Raspivid:
go func() {
for r.isRunning {
h264Data = make([]byte, 1)
_, err := io.ReadFull(inputReader, h264Data)
if err != nil {
if err.Error() == "EOF" {
r.Error.Println("No data from camera!")
time.Sleep(5*time.Second)
} else {
r.Error.Println(err.Error())
}
} else {
r.h264Parser.InputByteChan <- h264Data[0]
}
}
}()
case file:
case File:
stats, err := r.inputFile.Stat()
if err != nil {
panic("Could not get file stats!")
@ -212,32 +204,53 @@ func (r *revidInst) input() {
r.Error.Println(err.Error())
}
for i := range h264Data {
h264Parser.InputByteChan<-h264Data[i]
r.h264Parser.InputByteChan <- h264Data[i]
}
}
for r.isRunning {
go r.output()
}
func (r *revidInst) Stop() {
if r.isRunning {
r.isRunning = false
r.generator.Stop()
cmd.Process.Kill()
}
}
func (r *revidInst) input() {
clipSize := 0
packetCount := 0
now := time.Now()
prevTime := now
startPackets := [][]byte{
{71, 64, 17, 16, 0, 66, 240, 65, 0, 1, 193, 0, 0, 255, 1, 255, 0, 1, 252, 128, 48, 72, 46, 1, 6, 70, 70, 109, 112, 101, 103, 37, 115, 116, 114, 101, 97, 109, 101, 100, 32, 98, 121, 32, 116, 104, 101, 32, 71, 101, 111, 86, 105, 115, 105, 111, 110, 32, 82, 116, 115, 112, 32, 83, 101, 114, 118, 101, 114, 99, 176, 214, 195, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255},
{71, 64, 0, 16, 0, 0, 176, 13, 0, 1, 193, 0, 0, 0, 1, 240, 0, 42, 177, 4, 178, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255},
/*PMT*/ {71, 80, 0, 16,
/*Start of payload*/
0, 2, 176, 18, 0, 1, 193, 0, 0, 0xE1, 0x00, 0xF0, 0, 0x1B, 0xE1, 0, 0xF0, 0, 0x15, 0xBD, 0x4D, 0x56, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255},
}
fmt.Println("reading")
for {
if clip, err := r.ringBuffer.Get(); err != nil {
r.Error.Println(err.Error())
return
} else {
for {
tsPacket := <-(r.generator.GetTsOutputChan())
for ii:=0; ii < 3 && tsPacket.PUSI; ii++ {
upperBound := clipSize + mp2tPacketSize
if ii < 3 && !donePSI {
packetByteSlice := startPackets[ii]
copy(clip[clipSize:upperBound], packetByteSlice)
ii++
} else {
donePSI = true
if err != nil {
fmt.Println(err)
packetCount++
clipSize += mp2tPacketSize
}
tsPacket := <-generator.TsChan
byteSlice, err := tsPacket.ToByteSlice()
if err != nil {
r.Error.Println(err.Error())
}
copy(clip[clipSize:upperBound],byteSlice)
}
upperBound := clipSize + mp2tPacketSize
copy(clip[clipSize:upperBound], byteSlice)
packetCount++
clipSize += mp2tPacketSize
// send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame
@ -262,8 +275,12 @@ func (r *revidInst) output() {
for r.isRunning {
if clip, err := r.ringBuffer.Read(); err == nil {
switch r.config.Output {
case file:
case File:
r.outputFile.Write(clip)
case HttpOut:
for err := sendClipToHTTP(clip, r.config.HttpAddress); err != nil; {
err = sendClipToHTTP(clip, r.config.HttpAddress)
}
default:
r.Error.Println("No output?")
}
@ -275,7 +292,7 @@ func (r *revidInst) output() {
}
// sendClipToHTPP posts a video clip via HTTP, using a new TCP connection each time.
func sendClipToHTTP(clip []byte, output string, _ net.Conn) error {
func sendClipToHTTP(clip []byte, output string) error {
timeout := time.Duration(httpTimeOut * time.Second)
client := http.Client{
Timeout: timeout,

View File

@ -61,9 +61,13 @@ func TestFileInput(t *testing.T){
*/
func TestRaspividInput(t *testing.T){
config := Config{
Input: raspivid,
Output: file,
Input: Raspivid,
Output: File,
OutputFileName: "output/TestRaspividOutput.ts",
Width: "1280",
Height: "720",
Bitrate: "1000000",
FrameRate: "25",
}
revidInst, err := NewRevidInstance(config)
if err != nil {

View File

@ -95,14 +95,16 @@ func (rb *ringBuffer) Get() ([]byte, error) {
if !rb.IsWritable() {
return nil, errors.New("Buffer full!")
}
if rb.currentlyWriting {
return nil, errors.New("Second call to Get! Call DoneWriting first!")
}
var nextlast int
if !rb.currentlyWriting {
rb.currentlyWriting = true
nextlast := rb.last + 1
nextlast = rb.last + 1
if nextlast == rb.size {
nextlast = 0
}
} else {
nextlast = rb.last
}
return rb.dataMemory[nextlast], nil
}

View File

@ -39,6 +39,9 @@ import (
type TsGenerator interface {
Generate()
GetNalInputChan() chan<- []byte
GetTsOutputChan() <-chan *mpegts.MpegTsPacket
Stop()
}
type tsGenerator struct {
@ -55,6 +58,15 @@ type tsGenerator struct {
currentPcrTime float64
fps uint
isGenerating bool
pesPktChan chan []byte
}
func (g *tsGenerator)GetNalInputChan() chan<- []byte {
return g.NalInputChan
}
func (g *tsGenerator)GetTsOutputChan() <-chan *mpegts.MpegTsPacket {
return g.TsChan
}
func NewTsGenerator(fps uint) (g *tsGenerator) {
@ -72,6 +84,8 @@ func NewTsGenerator(fps uint) (g *tsGenerator) {
g.fps = fps
g.currentPcrTime = .0
g.currentPtsTime = .7
g.pesPktChan = make(chan []byte, 1000)
g.payloadByteChan = make(chan byte, 100000)
return
}
@ -93,12 +107,9 @@ func (g *tsGenerator) Stop(){
func (g *tsGenerator) Generate() {
g.isGenerating = true
pesPktChan := make(chan []byte, 1000)
payloadByteChan := make(chan byte, 100000)
var rtpBuffer [](*rtp.RtpPacket)
for g.isGenerating {
select {
default:
case rtpPacket := <-g.inputChan:
rtpBuffer = append(rtpBuffer, &rtpPacket)
if len(rtpBuffer) > 2 {
@ -182,13 +193,13 @@ func (g *tsGenerator) Generate() {
Data: nalUnit,
HeaderLength: 5,
}
pesPktChan <- pesPkt.ToByteSlice()
case pesPkt := <-pesPktChan:
g.pesPktChan <- pesPkt.ToByteSlice()
case pesPkt := <-g.pesPktChan:
for ii := range pesPkt {
payloadByteChan <- pesPkt[ii]
g.payloadByteChan <- pesPkt[ii]
}
pusi := true
for len(payloadByteChan) > 0 {
for len(g.payloadByteChan) > 0 {
pkt := mpegts.MpegTsPacket{
PUSI: pusi,
PID: 256,
@ -197,7 +208,7 @@ func (g *tsGenerator) Generate() {
AFC: byte(3),
PCRF: pusi,
}
pkt.FillPayload(payloadByteChan)
pkt.FillPayload(g.payloadByteChan)
if pusi {
pkt.PCR = g.genPcr()
pusi = false