Fixed errors and bugs and it seems like flv packetization is working

This commit is contained in:
Unknown 2018-02-12 18:28:29 +10:30
parent ec796bd9ae
commit c46a8d8f08
12 changed files with 250 additions and 183 deletions

View File

@ -2,6 +2,7 @@ package flv
import (
"../tools"
"fmt"
)
const (
@ -12,7 +13,11 @@ const (
)
const (
videoTagType = 9
VideoTagType = 9
KeyFrameType = 1
H264 = 7
AVCNALU = 1
DataHeaderLength = 5
)
type Header struct {
@ -20,13 +25,14 @@ type Header struct {
VideoFlag bool
}
func (h *Header) toByteSlice() []byte {
func (h *Header) ToByteSlice() (output []byte) {
output = make([]byte, 0, headerLength)
output = append(output, []byte{0x46, 0x4C, 0x56,
version,
0x00 | tools.boolToByte(h.audioFlag)<<3 | tools.boolToByte(h.videoFlag),
0x00, 0x00, 0x00, byte(72),
0x00 | tools.BoolToByte(h.AudioFlag)<<2 | tools.BoolToByte(h.VideoFlag),
0x00, 0x00, 0x00, byte(9),
}...)
fmt.Println(output)
return
}
@ -36,41 +42,48 @@ type VideoTag struct {
DataSize uint32
Timestamp uint32
TimestampExtended uint32
FrameType byte
Codec byte
PacketType byte
CompositionTime uint32
Data []byte
}
func (t *VideoTag) toByteSlice() (output []byte) {
func (t *VideoTag) ToByteSlice() (output []byte) {
output = make([]byte, 0, maxVideoTagSize)
output = append(output, []byte{
byte(t.prevTagSize >> 24),
byte(t.prevTagSize >> 16),
byte(t.prevTagSize >> 8),
byte(t.prevTagSize),
byte(t.tageType),
byte(t.dataSize >> 16),
byte(t.dataSize >> 8),
byte(t.dataSize),
byte(t.timeStamp >> 16),
byte(t.timeStamp >> 8),
byte(t.timeStamp),
byte(t.timestampExtended),
byte(t.PrevTagSize >> 24),
byte(t.PrevTagSize >> 16),
byte(t.PrevTagSize >> 8),
byte(t.PrevTagSize),
byte(t.TagType),
byte(t.DataSize >> 16),
byte(t.DataSize >> 8),
byte(t.DataSize),
byte(t.Timestamp >> 16),
byte(t.Timestamp >> 8),
byte(t.Timestamp),
byte(t.TimestampExtended),
0x00, 0x00, 0x00,
byte(t.FrameType << 4) | byte(t.Codec),
t.PacketType,
byte(t.CompositionTime >> 16),byte(t.CompositionTime >> 8),byte(t.CompositionTime),
}...)
output = append(output, data...)
output = append(output, t.Data...)
return
}
type AudioTag struct {
soundFormat uint8
soundRate uint8
soundSize uint8
soundType uint8
data []byte
SoundFormat uint8
SoundRate uint8
SoundSize uint8
SoundType uint8
Data []byte
}
func (t *AudioTage) toByteSlice() (output []byte) {
func (t *AudioTag) ToByteSlice() (output []byte) {
output = make([]byte, 0, maxAudioTagSize)
output = append(output, byte(soundFormat<<4)|byte(soundRate<<2)|byte(soundSize<<1)|byte(soundType))
output = append(output, data...)
output = append(output, byte(t.SoundFormat<<4)|byte(t.SoundRate<<2)|byte(t.SoundSize<<1)|byte(t.SoundType))
output = append(output, t.Data...)
return
}

View File

@ -1,10 +1,23 @@
package generator
import (
"../flv"
)
const (
inputChanLength = 1000
outputChanLength = 1000
)
type flvGenerator struct {
fps uint
inputChan chan []byte
outputChan chan []byte
header Header
audioFlag bool
videoFlag bool
lastTagSize int
currentTimestamp uint32
header flv.Header
}
func (g *flvGenerator)GetInputChan() chan []byte {
@ -15,10 +28,15 @@ func (g *flvGenerator)GetOutputChan() chan []byte {
return g.outputChan
}
func NewFlvGenerator() (g *flvGenerator) {
func NewFlvGenerator(audio bool, video bool, fps uint) (g *flvGenerator) {
g = new(flvGenerator)
g.timestamp = 0
g.fps = fps
g.audioFlag = audio
g.videoFlag = video
g.currentTimestamp = 0
g.lastTagSize = 0
g.inputChan = make(chan []byte, inputChanLength)
g.outputChan = make(chan []byte, outputChanLength)
return
}
@ -28,36 +46,42 @@ func (g *flvGenerator) Start(){
func (g *flvGenerator) GenHeader(){
header := flv.Header{
AudioFlag: true,
VideoFlag: true,
AudioFlag: g.audioFlag,
VideoFlag: g.videoFlag,
}
g.outputChan <- header.toByteSlice()
g.outputChan <- header.ToByteSlice()
}
func (g *flvGenerator) getNextTimestamp() (timestamp uint32){
timestamp = g.currentTimestamp
g.currentTimeStamp += 100*time.Millisecond() / g.fps
g.currentTimestamp += uint32(1000) / uint32(g.fps)
return
}
func (g *flvGenerator) ResetTimestamp() {
g.timestamp = 0
g.currentTimestamp = 0
}
func (g *tsGenerator) generate() {
func (g *flvGenerator) generate() {
g.GenHeader()
for {
select {
case videoFrame := <-g.inputChan
tag := VideoTage{
PrevTagSize: g.lastTagSize,
TagType: flv.videoTagType,
DataSize: len(videoFrame),
case videoFrame := <-g.inputChan:
tag := flv.VideoTag{
PrevTagSize: uint32(g.lastTagSize),
TagType: uint8(flv.VideoTagType),
DataSize: uint32(len(videoFrame)) + flv.DataHeaderLength,
Timestamp: g.getNextTimestamp(),
TimestampExtended: 0,
Data: videoFrame
FrameType: flv.KeyFrameType,
Codec: flv.H264,
PacketType: flv.AVCNALU,
CompositionTime: 0,
Data: videoFrame,
}
g.outputChan<-tag.toByteSlice()
tagAsByteSlice := tag.ToByteSlice()
g.lastTagSize = len(tagAsByteSlice)
g.outputChan<-tagAsByteSlice
}
}
}

View File

@ -28,22 +28,8 @@ LICENSE
package generator
import (
_"fmt"
_"os"
//"bitbucket.org/ausocean/av/mpegts"
//"bitbucket.org/ausocean/av/pes"
//"bitbucket.org/ausocean/av/tools"
//"bitbucket.org/ausocean/av/rtp"
"../mpegts"
"../pes"
"../tools"
"../rtp"
)
type Generator interface {
GetInputChan() chan []byte
GetOutputChan() <-chan *mpegts.MpegTsPacket
GetOutputChan() chan []byte
Start()
Stop()
}

View File

@ -1,5 +1,18 @@
package generator
import (
_"fmt"
_"os"
//"bitbucket.org/ausocean/av/mpegts"
//"bitbucket.org/ausocean/av/pes"
//"bitbucket.org/ausocean/av/tools"
//"bitbucket.org/ausocean/av/rtp"
"../mpegts"
"../pes"
"../tools"
"../rtp"
)
var (
PatTable = []byte{0, 0, 176, 13, 0, 1, 193, 0, 0, 0, 1, 240, 0, 42, 177, 4, 178, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,}
@ -14,10 +27,8 @@ const (
)
type tsGenerator struct {
TsChan <-chan *mpegts.MpegTsPacket
tsChan chan<- *mpegts.MpegTsPacket
InputChan chan<- rtp.RtpPacket
inputChan <-chan rtp.RtpPacket
rtpInputChan chan rtp.RtpPacket
outputChan chan []byte
nalInputChan chan []byte
currentTsPacket *mpegts.MpegTsPacket
payloadByteChan chan byte
@ -33,18 +44,14 @@ func (g *tsGenerator)GetInputChan() chan []byte {
return g.nalInputChan
}
func (g *tsGenerator)GetOutputChan() <-chan *mpegts.MpegTsPacket {
return g.TsChan
func (g *tsGenerator)GetOutputChan() chan []byte {
return g.outputChan
}
func NewTsGenerator(fps uint) (g *tsGenerator) {
g = new(tsGenerator)
tsChan := make(chan *mpegts.MpegTsPacket, 100)
g.TsChan = tsChan
g.tsChan = tsChan
inputChan := make(chan rtp.RtpPacket, 100)
g.InputChan = inputChan
g.inputChan = inputChan
g.outputChan = make(chan []byte, 100)
g.rtpInputChan = make(chan rtp.RtpPacket, 100)
g.nalInputChan = make(chan []byte, 10000)
g.currentCC = 0
g.fps = fps
@ -80,7 +87,7 @@ func (g *tsGenerator) generate() {
var rtpBuffer [](*rtp.RtpPacket)
for {
select {
case rtpPacket := <-g.inputChan:
case rtpPacket := <-g.rtpInputChan:
rtpBuffer = append(rtpBuffer, &rtpPacket)
if len(rtpBuffer) > 2 {
// if there's something weird going on with sequence numbers then
@ -189,7 +196,9 @@ func (g *tsGenerator) generate() {
AFC: 1,
Payload: PatTable,
}
g.tsChan <- &patPkt
patPktAsByteSlice, _ := patPkt.ToByteSlice()
g.outputChan <- patPktAsByteSlice
// Create pmt table and send off
pmtPkt := mpegts.MpegTsPacket{
@ -199,12 +208,14 @@ func (g *tsGenerator) generate() {
AFC: 1,
Payload: PmtTable,
}
g.tsChan <- &pmtPkt
pmtPktAsByteSlice, _ := pmtPkt.ToByteSlice()
g.outputChan <- pmtPktAsByteSlice
pkt.PCR = g.genPcr()
pusi = false
}
g.tsChan <- &pkt
pktAsBytelice, _ := pkt.ToByteSlice()
g.outputChan<-pktAsBytelice
}
}
}

View File

@ -26,4 +26,4 @@ LICENSE
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package tsgenerator
package generator

View File

@ -1,5 +1,11 @@
package parser
import (
//"bitbucket.org/ausocean/av/itut"
"../itut"
_"fmt"
)
type h264Parser struct {
inputBuffer []byte
isParsing bool

View File

@ -1,5 +1,10 @@
package parser
import (
//"bitbucket.org/ausocean/av/itut"
_"fmt"
)
type mjpegParser struct {
inputBuffer []byte
isParsing bool

View File

@ -29,7 +29,6 @@ package parser
import (
//"bitbucket.org/ausocean/av/itut"
"../itut"
"log"
"sync"
_"fmt"

View File

@ -1,5 +1,17 @@
package revid
import (
"errors"
"strconv"
//"bitbucket.org/ausocean/av/parser"
//"bitbucket.org/ausocean/av/tsgenerator"
//"bitbucket.org/ausocean/av/ringbuffer"
//"bitbucket.org/ausocean/utils/smartLogger"
"../../utils/smartLogger"
)
// Config provides parameters relevant to a revid instance. A new config must
// be passed to the constructor.
type Config struct {
@ -7,7 +19,7 @@ type Config struct {
InputCodec uint8
Output uint8
RtmpEncodingMethod uint8
FramesPerClip uint
FramesPerClip int
RtmpUrl string
Bitrate string
OutputFileName string
@ -30,7 +42,7 @@ const (
Rtp = 2
H264Codec = 3
File = 4
HttpOut = 5
Http = 5
H264 = 6
Mjpeg = 7
None = 8
@ -93,7 +105,7 @@ func (config *Config) Validate(r *revidInst) error {
}
switch config.Output {
case HttpOut:
case Http:
case File:
case Rtmp:
switch config.RtmpEncodingMethod {
@ -108,7 +120,7 @@ func (config *Config) Validate(r *revidInst) error {
}
case NothingDefined:
r.Log(Warning, "No output defined, defaulting to httpOut!")
config.Output = HttpOut
config.Output = Http
default:
return errors.New("Bad output type defined in config!")
}
@ -116,6 +128,7 @@ func (config *Config) Validate(r *revidInst) error {
switch config.Packetization {
case None:
case Mpegts:
case Flv:
case NothingDefined:
r.Log(Warning, "No packetization option defined, defaulting to none!")
config.Packetization = None
@ -185,4 +198,5 @@ func (config *Config) Validate(r *revidInst) error {
return errors.New("Bad quantization defined in config!")
}
}
return nil
}

View File

@ -44,12 +44,11 @@ import (
//"bitbucket.org/ausocean/av/parser"
//"bitbucket.org/ausocean/av/tsgenerator"
"../generator"
"../parser"
"../tsgenerator"
//"bitbucket.org/ausocean/av/ringbuffer"
//"bitbucket.org/ausocean/utils/smartLogger"
"../../utils/smartLogger"
"../ringbuffer"
)
@ -58,7 +57,7 @@ const (
clipDuration = 1 // s
mp2tPacketSize = 188 // MPEG-TS packet size
mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000
ringBufferSize = 100 / clipDuration
ringBufferSize = 1000 / clipDuration
ringBufferElementSize = 10000000
maxClipSize = 100000
httpTimeOut = 5 // s
@ -97,16 +96,18 @@ type revidInst struct {
isRunning bool
outputFile *os.File
inputFile *os.File
generator generator.TsGenerator
generator generator.Generator
parser parser.Parser
cmd *exec.Cmd
ffmpegCmd *exec.Cmd
inputReader *bufio.Reader
ffmpegStdin io.WriteCloser
outputChan chan []byte
configureOutput func()
setupInput func() error
setupOutput func() error
getFrame func() []byte
flushData func()
sendClip func(clip []byte) error
}
// NewRevidInstance returns a pointer to a new revidInst with the desired
@ -140,47 +141,48 @@ func (r *revidInst) ChangeState(config Config) error {
if err != nil {
return errors.New("Config struct is bad!: " + err.Error())
}
r.config = config
switch r.config.Output {
case File:
r.outputFile, err = os.Create(r.config.OutputFileName)
if err != nil {
return nil, err
}
configureOutput = configureOutputForFile
r.sendClip = r.sendClipToFile
r.setupOutput = r.setupOutputForFile
case Rtmp:
configureOutput = configureOutputForRtmp
r.setupOutput = r.setupOutputForRtmp
r.sendClip = r.sendClipToRtmp
case Http:
r.sendClip = r.sendClipToHTTP
}
switch r.config.Input {
case Raspivid:
configureInput = configureInputForRaspivid
r.setupInput = r.setupInputForRaspivid
case File:
configureInput = configureInputForFile
r.setupInput = r.setupInputForFile
}
switch r.config.InputCodec {
case H264:
r.Log(Info, "Using H264 parser!")
r.parser = parser.NewH264Parser()
fmt.Println("here")
case Mjpeg:
r.Log(Info, "Using MJPEG parser!")
r.parser = parser.NewMJPEGParser(mjpegParserInChanLen)
}
if r.config.Packetization == None {
r.parser.SetOutputChan(r.outputChan)
getFrame = getFrameForNoPacketization
r.getFrame = r.getFrameNoPacketization
} else {
switch r.config.Packetization {
case Mpegts:
frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate)
r.generator = tsgenerator.NewTsGenerator(uint(frameRateAsInt))
r.generator = generator.NewTsGenerator(uint(frameRateAsInt))
case Flv:
frameRateAsInt, _ := strconv.Atoi(r.config.FrameRate)
r.generator = flvGenerator.NewFlvGenerator(uint(frameRateAsInt))
r.generator = generator.NewFlvGenerator(false, true, uint(frameRateAsInt))
}
getFrame = getFrameForPacketization
r.getFrame = r.getFramePacketization
r.parser.SetOutputChan(r.generator.GetInputChan())
r.generator.Start()
}
r.config = config
return nil
}
@ -235,7 +237,7 @@ func (r *revidInst) getFrameNoPacketization() []byte {
// Start invokes a revidInst to start processing video from a defined input
// and packetising to a defined output.
func (r *revidInst) getFrameWithPacketization() []byte {
func (r *revidInst) getFramePacketization() []byte {
return <-(r.generator.GetOutputChan())
}
@ -250,7 +252,7 @@ func (r *revidInst) flushDataNoPacketisation(){
// Start invokes a revidInst to start processing video from a defined input
func (r *revidInst) flushDataMpegtsPacketisation() {
for len(r.generator.GetOutputChan()) > 0 {
<-(r.generator.GetTsOutputChan())
<-(r.generator.GetOutputChan())
}
}
@ -259,13 +261,11 @@ func (r *revidInst) flushDataMpegtsPacketisation(){
func (r *revidInst) packClips() {
clipSize := 0
packetCount := 0
now := time.Now()
prevTime := now
for {
if clip, err := r.ringBuffer.Get(); err != nil {
r.Log(Error, err.Error())
r.Log(Warning, "Clearing output chan!")
r.flushOutputData()
r.flushData()
} else {
for {
frame := r.getFrame()
@ -274,7 +274,6 @@ func (r *revidInst) packClips() {
copy(clip[clipSize:upperBound], frame)
packetCount++
clipSize += lenOfFrame
now = time.Now()
if packetCount >= r.config.FramesPerClip {
if err := r.ringBuffer.DoneWriting(clipSize); err != nil {
r.Log(Error, err.Error())
@ -282,7 +281,6 @@ func (r *revidInst) packClips() {
}
clipSize = 0
packetCount = 0
prevTime = now
break
}
}
@ -329,6 +327,9 @@ func (r *revidInst) outputClips() {
prevTime = now
bytes = 0
}
} else {
r.Log(Debug, err.Error())
time.Sleep(1*time.Second)
}
}
}
@ -336,7 +337,7 @@ func (r *revidInst) outputClips() {
// Start invokes a revidInst to start processing video from a defined input
// and packetising to a defined output.
func (r *revidInst) sendClipToFile(clip []byte) error {
err := r.outputFile.Write(clip)
_,err := r.outputFile.Write(clip)
if err != nil {
return err
}
@ -349,10 +350,11 @@ func (r *revidInst) sendClipToHTTP(clip []byte) error {
client := http.Client{
Timeout: timeout,
}
url := r.config.HttpAddress+strconv.Itoa(len(clip))
r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip)))
resp, err := client.Post(r.config.HttpAddress + strconv.Itoa(len(clip)), "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer
resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer
if err != nil {
return fmt.Errorf("Error posting to %s: %s", output, err)
return fmt.Errorf("Error posting to %s: %s", url, err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
@ -377,7 +379,7 @@ func (r *revidInst) sendClipToRtmp(clip []byte) error {
// Start invokes a revidInst to start processing video from a defined input
// and packetising to a defined output.
func (r *revidInst) setupOutputForRtmp(){
func (r *revidInst) setupOutputForRtmp() error {
r.ffmpegCmd = exec.Command(ffmpegPath,
"-f", "h264",
"-r", r.config.FrameRate,
@ -398,19 +400,25 @@ func (r *revidInst) setupOutputForRtmp(){
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return
return err
}
err = r.ffmpegCmd.Start()
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return
return err
}
return nil
}
func (r *revidInst) setupOutputForFile() (err error) {
r.outputFile, err = os.Create(r.config.OutputFileName)
return
}
// Start invokes a revidInst to start processing video from a defined input
// and packetising to a defined output.
func (r *revidInst) setupInputForRaspivid(){
func (r *revidInst) setupInputForRaspivid() error {
r.Log(Info, "Starting raspivid!")
switch r.config.InputCodec {
case H264:
@ -441,14 +449,16 @@ func (r *revidInst) setupInputForRaspivid(){
r.inputReader = bufio.NewReader(stdout)
if err != nil {
r.Log(Error, err.Error())
return
return err
}
go r.readCamera()
return nil
}
// Start invokes a revidInst to start processing video from a defined input
func (r *revidInst) setupInputForFile(){
func (r *revidInst) setupInputForFile() error {
go r.readFile()
return nil
}
// readCamera reads data from the defined camera while the revidInst is running.
@ -471,33 +481,32 @@ func (r *revidInst) readCamera() {
}
// readFile reads data from the defined file while the revidInst is running.
func (r *revidInst) readFile() {
for {
func (r *revidInst) readFile() error {
if len(r.parser.GetInputChan()) == 0 {
var err error
r.inputFile, err = os.Open(r.config.InputFileName)
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return
return err
}
stats, err := r.inputFile.Stat()
if err != nil {
r.Log(Error, "Could not get input file stats!")
r.Stop()
return
return err
}
data := make([]byte, stats.Size())
_, err = r.inputFile.Read(data)
if err != nil {
r.Log(Error, err.Error())
r.Stop()
return
return err
}
for i := range data {
r.parser.GetInputChan() <- data[i]
}
r.inputFile.Close()
}
}
return nil
}