mirror of https://bitbucket.org/ausocean/av.git
Updating remote
Got h264 parsing working. Got testing utilities for this. Working on modifying code to be more robust, with mpegts. Adding pcr and pts
This commit is contained in:
parent
f37a073824
commit
df5ff04fd1
|
@ -57,21 +57,36 @@ const (
|
|||
)
|
||||
|
||||
type MpegTsPacket struct {
|
||||
SyncByte byte
|
||||
TEI bool // Transport Error Indicator
|
||||
PUSI bool // Payload Unit Start Indicator
|
||||
Priority bool
|
||||
PID uint16
|
||||
TSC byte // Transport Scrambling Control
|
||||
AFC byte // Adaption Field Control
|
||||
CC byte // Continuity Counter
|
||||
AF []byte // Adaption Field
|
||||
Payload []byte
|
||||
// syncByte byte // (octet:0 bit:0 - octet:0 bit:7)
|
||||
TEI bool // (octet:1 bit:0) Transport Error Indicator
|
||||
PUSI bool // (octet:1 bit:1) Payload Unit Start Indicator
|
||||
Priority bool // (octet:1 bit:2) Tranposrt priority indicator
|
||||
PID uint16 // (octet:1 bit:3 - octect:3 bit:7) Packet identifier
|
||||
TSC byte // (octet:4 bit:0 - octect:4 bit:1) Transport Scrambling Control
|
||||
AFC byte // (octet:4 bit:2 - octect:4 bit:3) Adaption Field Control
|
||||
CC byte // (octet:4 bit:4 - octect:4 bit:7) Continuity Counter
|
||||
AFL byte // (octet:5 bit:0 - octect:5 bit:7) Adaptation field length
|
||||
DI bool // (octet:6 bit:0) Discontinouty indicator
|
||||
RAI bool // (octet:6 bit:1) random access indicator
|
||||
ESPI bool // (octet:6 bit:2) Elementary stream priority indicator
|
||||
PCRF bool // (octet:6 bit:3) pcr flag
|
||||
OPCRF bool // (octet:6 bit:4) opcr flag
|
||||
SPF bool // (octet:6 bit:5) splicing point flag
|
||||
TPDF bool // (octet:6 bit:6) transport private data flag
|
||||
AFEF bool // (octet:6 bit:7) adaptation field extension flag
|
||||
PCR uint64 // (optional 48 bits) program clock reference
|
||||
OPCR uint64 // (optional 48 bits) Original program clock reference
|
||||
SC byte // (optional 8 bits) splice countdown
|
||||
TPDL byte // (optional 8 bits) tranposrt private data length
|
||||
TPD []byte // (optional variable length) private data
|
||||
Extension []byte // (optional variable length) adaptation field extension
|
||||
Stuffing []byte // (optional variable length) stuffing bytes
|
||||
Payload []byte // (optional variable length) mpeg ts payload
|
||||
}
|
||||
|
||||
func (p *MpegTsPacket) ToByteSlice() (output []byte) {
|
||||
output = make([]byte, 188)
|
||||
output[0] = p.SyncByte
|
||||
output[0] = 0x47 // sync byte always the same
|
||||
output[1] = (boolToByte(p.TEI) << (7 - teiIndex%8)) |
|
||||
(boolToByte(p.PUSI) << (7 - pusiIndex%8)) |
|
||||
(boolToByte(p.Priority) << (7 - priorityIndex%8)) |
|
||||
|
|
|
@ -31,6 +31,7 @@ package packets
|
|||
import (
|
||||
_"os"
|
||||
_"fmt"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
func boolToByte(in bool) (out uint8) {
|
||||
|
@ -52,28 +53,26 @@ func getEndBit(p *RtpPacket) byte {
|
|||
return (p.Payload[1] & 0x40) >> 6
|
||||
}
|
||||
|
||||
func ParseRawH264(buffer []byte, outputChan chan<- []byte) {
|
||||
func ParseH264Buffer(buffer []byte, outputChan chan<- []byte) {
|
||||
startCode1 := []byte{0x00,0x00,0x01}
|
||||
startCode2 := []byte{0x00,0x00,0x00,0x01}
|
||||
for i := 0; i < len(buffer); i++ {
|
||||
i, start := func() (int,bool) {
|
||||
var start bool
|
||||
i, start = func() (int,bool) {
|
||||
switch{
|
||||
case buffer[i:i+3] == []byte{0x00,0x00,0x01}:
|
||||
return 3, true
|
||||
case buffer[i:i+4] == []byte{0x00,0x00,0x00,0x01}:
|
||||
return 4, true
|
||||
default:
|
||||
return 4, false
|
||||
case reflect.DeepEqual(buffer[i:i+3],startCode1):
|
||||
return i+3, true
|
||||
case reflect.DeepEqual(buffer[i:i+4],startCode2):
|
||||
return i+4, true
|
||||
}
|
||||
}
|
||||
if start {
|
||||
nalHeader := buffer[i]
|
||||
nalType := nalHeader & 0x1F
|
||||
if nalType == 1 || nalType == 5 {
|
||||
for ; i < len(buffer) && buffer[i+1:i+4] != []byte{0x00,0x00,0x01} &&
|
||||
buffer[i+1:i+5] != []byte{0x00,0x00,0x00,0x01}; i++ {}
|
||||
outputChan<-append(append([]byte{0x00,0x00,0x01},[]byte{0x09,0xF0}...),buffer[0:i]...)
|
||||
buffer = buffer[i+1:]
|
||||
return i, false
|
||||
}()
|
||||
if nalType := buffer[i] & 0x1F; start && ( nalType == 1 || nalType == 5) {
|
||||
for ; i < len(buffer) && !(i+3 < len(buffer) && ( reflect.DeepEqual(buffer[i:i+3],startCode1) ||
|
||||
reflect.DeepEqual(buffer[i:i+4],startCode2))); i++ {}
|
||||
outputChan<-append(append(startCode1,[]byte{0x09,0xF0}...),buffer[:i]...)
|
||||
buffer = buffer[i:]
|
||||
i=0
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,7 +37,7 @@ import (
|
|||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
"math/rand"
|
||||
_"math/rand"
|
||||
|
||||
"github.com/beatgammit/rtsp"
|
||||
)
|
||||
|
@ -540,25 +540,24 @@ func TestH264Parsing(t *testing.T) {
|
|||
4,34,2,3,2,0,0,1,1,3,4,5,5,76,4,234,78,65,34,34,43,45,
|
||||
}
|
||||
nalAccess1 := []byte{
|
||||
0,0,1,7,59,100,45,82,93,0,0,1,8,23,78,65,0,0,1,6,45,34,23,3,2,0,0,1,5,3,4,5,
|
||||
0,0,1,9,240,0,0,1,7,59,100,45,82,93,0,0,1,8,23,78,65,0,0,1,6,45,34,23,3,2,0,0,1,5,3,4,5,
|
||||
56,76,4,234,78,65,34,34,43,
|
||||
}
|
||||
nalAccess2 := []byte{
|
||||
0,0,1,7,67,10,45,8,93,0,0,1,8,23,7,5,0,0,1,6,
|
||||
0,0,1,9,240,0,0,1,7,67,10,45,8,93,0,0,1,8,23,7,5,0,0,1,6,
|
||||
4,34,2,3,2,0,0,1,1,3,4,5,5,76,4,234,78,65,34,34,43,45,
|
||||
}
|
||||
|
||||
aChannel := make(chan []byte, 10)
|
||||
var nalAccessChan chan<- []byte
|
||||
nalAccessChan = aChannel
|
||||
go parseH264Buffer(someData,nalAccessChan)
|
||||
anAccessUnit := <-nalAccessChan
|
||||
go ParseH264Buffer(someData,nalAccessChan)
|
||||
anAccessUnit := <-aChannel
|
||||
for i := range anAccessUnit {
|
||||
if anAccessUnit[i] != nalAccess1[i] {
|
||||
t.Errorf("Should have been equal!")
|
||||
}
|
||||
}
|
||||
anAccessUnit = <-nalAccessChan
|
||||
anAccessUnit = <-aChannel
|
||||
for i := range anAccessUnit {
|
||||
if anAccessUnit[i] != nalAccess2[i] {
|
||||
t.Errorf("Should have been equal!")
|
||||
|
|
|
@ -1,11 +0,0 @@
|
|||
package main
|
||||
|
||||
import "../packets"
|
||||
const (
|
||||
fileName = "out.h264"
|
||||
)
|
||||
|
||||
func main(){
|
||||
converter := packets.NewRtpToTsConverter()
|
||||
packets.ParseRawH264(fileName,&converter.NalInputChan)
|
||||
}
|
BIN
revid/bus.mp4
BIN
revid/bus.mp4
Binary file not shown.
|
@ -0,0 +1,592 @@
|
|||
/*
|
||||
NAME
|
||||
revid - a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols.
|
||||
|
||||
DESCRIPTION
|
||||
See Readme.md
|
||||
|
||||
AUTHOR
|
||||
Alan Noble <anoble@gmail.com>
|
||||
|
||||
LICENSE
|
||||
revid is Copyright (C) 2017 Alan Noble.
|
||||
|
||||
It is free software: you can redistribute it and/or modify them
|
||||
under the terms of the GNU General Public License as published by the
|
||||
Free Software Foundation, either version 3 of the License, or (at your
|
||||
option) any later version.
|
||||
|
||||
It is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
|
||||
*/
|
||||
|
||||
// revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols.
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"crypto/md5"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
"os"
|
||||
|
||||
"../packets"
|
||||
|
||||
"bitbucket.org/ausocean/av/ringbuffer"
|
||||
|
||||
"github.com/Comcast/gots/packet"
|
||||
"github.com/Comcast/gots/packet/adaptationfield"
|
||||
"github.com/Comcast/gots/psi"
|
||||
)
|
||||
|
||||
// defaults and networking consts
|
||||
const (
|
||||
clipDuration = 1 // s
|
||||
defaultPID = 256
|
||||
defaultFrameRate = 25
|
||||
defaultHTTPOutput = "http://localhost:8080?"
|
||||
defaultUDPOutput = "udp://0.0.0.0:16384"
|
||||
defaultRTPOutput = "rtp://0.0.0.0:16384"
|
||||
mp2tPacketSize = 188 // MPEG-TS packet size
|
||||
mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000
|
||||
udpPackets = 7 // # of UDP packets per ethernet frame (8 is the max)
|
||||
rtpPackets = 7 // # of RTP packets per ethernet frame (7 is the max)
|
||||
rtpHeaderSize = 12
|
||||
rtpSSRC = 1 // any value will do
|
||||
bufferSize = 1000 / clipDuration
|
||||
bitrateOutputDelay = 60 // s
|
||||
httpTimeOut = 5 // s
|
||||
motionThreshold = "0.0025"
|
||||
qscale = "3"
|
||||
rtpPort = 17300
|
||||
rtcpPort = 17319
|
||||
rtspUrl = "rtsp://192.168.0.50:8554/CH002.sdp"
|
||||
rtpUrl = "rtsp://192.168.0.50:8554/CH002.sdp/track1"
|
||||
inputFileName = "testInput.h264"
|
||||
)
|
||||
|
||||
// flag values
|
||||
const (
|
||||
filterFixPTS = 0x0001
|
||||
filterDropAudio = 0x0002
|
||||
filterScale640 = 0x0004
|
||||
filterScale320 = 0x0008
|
||||
filterFixContinuity = 0x0010
|
||||
filterEdgeDetection = 0x0020
|
||||
filterMotionDetect = 0x0040
|
||||
filterRepacket = 0x0080
|
||||
dumpProgramInfo = 0x0100 // 256
|
||||
dumpPacketStats = 0x0200 // 512
|
||||
dumpPacketHeader = 0x0400 // 1024
|
||||
dumpPacketPayload = 0x0800 // 2048
|
||||
)
|
||||
|
||||
// globals
|
||||
var (
|
||||
sendClip = sendClipToRTP
|
||||
packetsPerFrame = rtpPackets
|
||||
clipCount int
|
||||
expectCC int
|
||||
dumpCC int
|
||||
dumpPCRBase uint64
|
||||
rtpSequenceNum uint16
|
||||
conn net.Conn
|
||||
ffmpegPath string
|
||||
tempDir string
|
||||
inputErrChan chan error
|
||||
outputErrChan chan error
|
||||
ringBuffer ringbuffer.RingBuffer
|
||||
)
|
||||
|
||||
// command-line flags
|
||||
var (
|
||||
inputURL = flag.String("i", "", "Input RTSP URL")
|
||||
outputURL = flag.String("o", "", "Output URL (HTTP, UDP or RTP)")
|
||||
mode = flag.String("m", "r", "Mode: one of f,h,u,r or d")
|
||||
flags = flag.Int("f", 0, "Flags: see readme for explanation")
|
||||
frameRate = flag.Int("r", defaultFrameRate, "Input video frame rate (25fps by default)")
|
||||
selectedPID = flag.Int("p", defaultPID, "Select packets with this packet ID (PID)")
|
||||
)
|
||||
|
||||
func main() {
|
||||
setUpDirs()
|
||||
flag.Parse()
|
||||
|
||||
if *inputURL == "" {
|
||||
log.Fatal("Input (-i) required\n")
|
||||
}
|
||||
|
||||
switch *mode {
|
||||
case "f":
|
||||
sendClip = sendClipToFile
|
||||
case "h":
|
||||
sendClip = sendClipToHTTP
|
||||
if *outputURL == "" {
|
||||
*outputURL = defaultHTTPOutput
|
||||
}
|
||||
case "u":
|
||||
sendClip = sendClipToUDP
|
||||
packetsPerFrame = udpPackets
|
||||
if *outputURL == "" {
|
||||
*outputURL = defaultUDPOutput
|
||||
}
|
||||
case "r":
|
||||
sendClip = sendClipToRTP
|
||||
packetsPerFrame = rtpPackets
|
||||
if *outputURL == "" {
|
||||
*outputURL = defaultRTPOutput
|
||||
}
|
||||
case "d":
|
||||
//sendClip = sendClipToStdout
|
||||
default:
|
||||
log.Fatalf("Invalid mode %s\n", *mode)
|
||||
}
|
||||
|
||||
if *flags&filterFixContinuity != 0 && *flags&dumpProgramInfo != 0 {
|
||||
log.Fatal("Cannot combine filterFixContinuity and dumpProgramInfo flags\n")
|
||||
}
|
||||
|
||||
ringBuffer = ringbuffer.NewRingBuffer(bufferSize, mp2tPacketSize*mp2tMaxPackets)
|
||||
inputErrChan = make(chan error, 10)
|
||||
outputErrChan = make(chan error, 10)
|
||||
|
||||
go input(*inputURL, *outputURL)
|
||||
go output(*outputURL)
|
||||
|
||||
for {
|
||||
select {
|
||||
default:
|
||||
case err := <-inputErrChan:
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
fmt.Fprintln(os.Stderr, "Trying again in 10s")
|
||||
time.Sleep(10 * time.Second)
|
||||
go input(*inputURL, *outputURL)
|
||||
case err := <-outputErrChan:
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
fmt.Fprintln(os.Stderr, "Attempting to write again!")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// setUpDirs sets directories based on the OS that Revid is running on
|
||||
func setUpDirs() {
|
||||
switch runtime.GOOS {
|
||||
case "windows":
|
||||
ffmpegPath = "C:/ffmpeg/ffmpeg"
|
||||
tempDir = "tmp/"
|
||||
case "darwin":
|
||||
ffmpegPath = "/usr/local/bin/ffmpeg"
|
||||
tempDir = "/tmp/"
|
||||
default:
|
||||
ffmpegPath = "/home/$USER/bin/ffmpeg"
|
||||
tempDir = "/output/"
|
||||
}
|
||||
}
|
||||
|
||||
// input handles the reading from the specified input
|
||||
func input(input string, output string) {
|
||||
fmt.Printf("Reading video from %s\n", input)
|
||||
// (re)initialize globals
|
||||
clipCount = 0
|
||||
expectCC = -1
|
||||
dumpCC = -1
|
||||
dumpPCRBase = 0
|
||||
rtpSequenceNum = uint16(rand.Intn(1 << 15))
|
||||
|
||||
// for UDP and RTP only dial once
|
||||
var err error
|
||||
if strings.HasPrefix(output, "udp://") || strings.HasPrefix(output, "rtp://") {
|
||||
conn, err = net.Dial("udp", output[6:])
|
||||
if err != nil {
|
||||
inputErrChan <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
converter := packets.NewRtpToTsConverter()
|
||||
// Open the h264 file
|
||||
file, err := os.Open(inputFileName)
|
||||
if err != nil {
|
||||
panic("Could not open file!")
|
||||
return
|
||||
}
|
||||
stats, err := file.Stat()
|
||||
if err != nil {
|
||||
panic("Could not get file stats!")
|
||||
}
|
||||
buffer := make([]byte, stats.Size())
|
||||
_, err = file.Read(buffer)
|
||||
if err != nil {
|
||||
panic("Could not read file!")
|
||||
}
|
||||
|
||||
// Start parsing the h264 file and send nal access units to the converter
|
||||
go packets.ParseH264Buffer(buffer,converter.NalInputChan)
|
||||
go converter.Convert()
|
||||
clipSize := 0
|
||||
packetCount := 0
|
||||
now := time.Now()
|
||||
prevTime := now
|
||||
fmt.Printf("Looping\n")
|
||||
|
||||
startPackets := [][]byte{
|
||||
{71,64,17,16,0,66,240,65,0,1,193,0,0,255,1,255,0,1,252,128,48,72,46,1,6,70,70,109,112,101,103,37,115,116,114,101,97,109,101,100,32,98,121,32,116,104,101,32,71,101,111,86,105,115,105,111,110,32,82,116,115,112,32,83,101,114,118,101,114,99,176,214,195,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255},
|
||||
{71,64,0,16,0,0,176,13,0,1,193,0,0,0,1,240,0,42,177,4,178,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255},
|
||||
/*PMT*/{71,80,0,16,
|
||||
/*Start of payload*/
|
||||
0,2,176,18,0,1,193,0,0,255,255,240,0,27,225,0,240,0,193,91,65,224,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255},
|
||||
}
|
||||
|
||||
donePSI := false
|
||||
|
||||
for {
|
||||
if clip, err := ringBuffer.Get(); err != nil {
|
||||
inputErrChan <- err
|
||||
return
|
||||
} else {
|
||||
ii := 0
|
||||
for {
|
||||
upperBound := clipSize + mp2tPacketSize
|
||||
|
||||
if ii < 3 && !donePSI {
|
||||
packetByteSlice := startPackets[ii]
|
||||
copy(clip[clipSize:upperBound],packetByteSlice)
|
||||
ii++
|
||||
} else {
|
||||
donePSI = true
|
||||
packet := <-converter.TsChan
|
||||
packetByteSlice := packet.ToByteSlice()
|
||||
copy(clip[clipSize:upperBound],packetByteSlice)
|
||||
}
|
||||
//fmt.Println(clip[clipSize:upperBound])
|
||||
packetCount++
|
||||
clipSize += mp2tPacketSize
|
||||
// send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame
|
||||
now = time.Now()
|
||||
if (packetCount == mp2tMaxPackets) ||
|
||||
(now.Sub(prevTime) > clipDuration*time.Second && packetCount%packetsPerFrame == 0) {
|
||||
clipCount++
|
||||
if err := ringBuffer.DoneWriting(clipSize); err != nil {
|
||||
inputErrChan <- err
|
||||
return
|
||||
}
|
||||
clipSize = 0
|
||||
packetCount = 0
|
||||
prevTime = now
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// output handles the writing to specified output
|
||||
func output(output string) {
|
||||
file, err := os.Create("output/saxonOut.ts")
|
||||
if err != nil {
|
||||
panic("Can't create output file!")
|
||||
}
|
||||
for {
|
||||
if clip, err := ringBuffer.Read(); err == nil {
|
||||
file.Write(clip)
|
||||
/*
|
||||
for err = sendClip(clip, output, conn); err != nil; {
|
||||
outputErrChan <- err
|
||||
//err = sendClip(clip, output, conn)
|
||||
// TODO: figure out how to write to single file
|
||||
|
||||
}
|
||||
*/
|
||||
if err := ringBuffer.DoneReading(); err != nil {
|
||||
outputErrChan <- err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sendClipToFile writes a video clip to a /tmp file.
|
||||
func sendClipToFile(clip []byte, _ string, _ net.Conn) error {
|
||||
filename := fmt.Sprintf(tempDir+"vid%03d.ts", clipCount)
|
||||
fmt.Printf("Writing %s (%d bytes)\n", filename, len(clip))
|
||||
err := ioutil.WriteFile(filename, clip, 0644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error writing file %s: %s", filename, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// sendClipToHTPP posts a video clip via HTTP, using a new TCP connection each time.
|
||||
func sendClipToHTTP(clip []byte, output string, _ net.Conn) error {
|
||||
timeout := time.Duration(httpTimeOut * time.Second)
|
||||
client := http.Client{
|
||||
Timeout: timeout,
|
||||
}
|
||||
hash := md5.Sum(clip)
|
||||
url := output + strconv.Itoa(len(clip)) + "." + hex.EncodeToString(hash[:]) // NB: append size.digest to output
|
||||
fmt.Printf("Posting %s (%d bytes)\n", url, len(clip))
|
||||
resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error posting to %s: %s", output, err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err == nil {
|
||||
fmt.Printf("%s\n", body)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// sendClipToUDP sends a video clip over UDP.
|
||||
func sendClipToUDP(clip []byte, _ string, conn net.Conn) error {
|
||||
size := udpPackets * mp2tPacketSize
|
||||
fmt.Printf("Sending %d UDP packets of size %d (%d bytes)\n", len(clip)/size, size, len(clip))
|
||||
for offset := 0; offset < len(clip); offset += size {
|
||||
pkt := clip[offset : offset+size]
|
||||
_, err := conn.Write(pkt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("UDP write error %s. Is your player listening?", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// sendClipToRTP sends a video clip over RTP.
|
||||
func sendClipToRTP(clip []byte, _ string, conn net.Conn) error {
|
||||
size := rtpPackets * mp2tPacketSize
|
||||
fmt.Printf("Sending %d RTP packets of size %d (%d bytes)\n",
|
||||
len(clip)/size, size+rtpHeaderSize, len(clip))
|
||||
pkt := make([]byte, rtpHeaderSize+rtpPackets*mp2tPacketSize)
|
||||
for offset := 0; offset < len(clip); offset += size {
|
||||
rtpEncapsulate(clip[offset:offset+size], pkt)
|
||||
_, err := conn.Write(pkt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("RTP write error %s. Is your player listening?", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkContinuityCounts checks that the continuity of the clip is correct
|
||||
func checkContinuityCounts(clip []byte) error {
|
||||
for offset := 0; offset < len(clip); offset += mp2tPacketSize {
|
||||
dumpCC = -1
|
||||
pkt := clip[offset : offset+mp2tPacketSize]
|
||||
cc := int(pkt[3] & 0xf)
|
||||
if dumpCC != -1 && cc != dumpCC {
|
||||
return fmt.Errorf("Continuity count out of order. Expected %v, Got: %v.", dumpCC, cc)
|
||||
}
|
||||
dumpCC = (cc + 1) % 16
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// sendClipToStdout dumps video stats to stdout.
|
||||
func sendClipToStdout(clip []byte) error {
|
||||
fmt.Printf("Dumping clip (%d bytes)\n", len(clip))
|
||||
/*
|
||||
if *flags&dumpProgramInfo != 0 {
|
||||
return mp2tDumpProgram(clip)
|
||||
}
|
||||
*/
|
||||
|
||||
packetCount := 0
|
||||
discontinuities := 0
|
||||
var cc int
|
||||
|
||||
for offset := 0; offset < len(clip); offset += mp2tPacketSize {
|
||||
packetCount++
|
||||
pkt := clip[offset : offset+mp2tPacketSize]
|
||||
|
||||
pktPID, err := packet.Pid(pkt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if pktPID != uint16(*selectedPID) {
|
||||
continue
|
||||
}
|
||||
|
||||
if *flags&(dumpPacketHeader|dumpPacketPayload) != 0 {
|
||||
fmt.Printf("Packet #%d.%d\n", clipCount, packetCount)
|
||||
}
|
||||
|
||||
hasPayload := pkt[3]&0x10 != 0
|
||||
if !hasPayload {
|
||||
continue // nothing to do
|
||||
}
|
||||
|
||||
// extract interesting info from header
|
||||
tei := pkt[1] & 0x80 >> 7
|
||||
pusi := pkt[1] & 0x40 >> 6
|
||||
tp := pkt[1] & 0x20 >> 5
|
||||
tcs := pkt[3] & 0xc0 >> 6
|
||||
afc := pkt[3] & 0x30 >> 4
|
||||
cc = int(pkt[3] & 0xf)
|
||||
di := pkt[5] & 0x80
|
||||
|
||||
if dumpCC != -1 && cc != dumpCC {
|
||||
discontinuities++
|
||||
fmt.Printf("Warning: Packet #%d.%d continuity counter out of order! Got %d, expected %d.\n",
|
||||
clipCount, packetCount, cc, dumpCC)
|
||||
}
|
||||
dumpCC = (cc + 1) % 16
|
||||
|
||||
//if *flags&dumpPacketHeader != 0 {
|
||||
fmt.Printf("\t\tTEI=%d, PUSI=%d, TP=%d, TSC=%d, AFC=%d, CC=%d\n", tei, pusi, tp, tcs, afc, cc)
|
||||
//}
|
||||
|
||||
if afc == 3 {
|
||||
// adaptation field, followed by payload
|
||||
afl := adaptationfield.Length(pkt)
|
||||
if adaptationfield.HasPCR(pkt) {
|
||||
pcrBase, pcrExt, _ := mp2tGetPCR(pkt)
|
||||
if *flags&dumpPacketHeader != 0 {
|
||||
fmt.Printf("\t\tAFL=%d, PCRbase=%d, PCRext=%d, DI=%v\n", afl, pcrBase, pcrExt, di)
|
||||
}
|
||||
if pcrBase < dumpPCRBase {
|
||||
fmt.Printf("Warning: PCRbase went backwards!\n")
|
||||
}
|
||||
dumpPCRBase = pcrBase
|
||||
} else if *flags&dumpPacketHeader != 0 {
|
||||
fmt.Printf("\t\tAFL=%d, DI=%v\n", afl, di)
|
||||
}
|
||||
}
|
||||
if *flags&dumpPacketPayload != 0 {
|
||||
fmt.Printf("\t\tPayload=%x\n", pkt)
|
||||
}
|
||||
|
||||
}
|
||||
if *flags&dumpPacketStats != 0 {
|
||||
fmt.Printf("%d packets of size %d bytes (%d bytes, %d discontinuites)\n",
|
||||
packetCount, packet.PacketSize, packetCount*packet.PacketSize, discontinuities)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// mp2tDumpProgram dumps MPEG-TS Program Association Table (PAT) and Program Map Tables (PMT).
|
||||
func mp2tDumpProgram(clip []byte) error {
|
||||
// NB: Comcast API requires a buffered reader
|
||||
reader := bufio.NewReader(bytes.NewReader(clip))
|
||||
|
||||
_, err := packet.Sync(reader)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error reading sync byte: %s", err)
|
||||
}
|
||||
pat, err := psi.ReadPAT(reader)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error reading PAT: %s", err)
|
||||
}
|
||||
mp2tDumpPat(pat)
|
||||
|
||||
var pmts []psi.PMT
|
||||
pm := pat.ProgramMap()
|
||||
for pn, pid := range pm {
|
||||
pmt, err := psi.ReadPMT(reader, pid)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error reading PMT: %s", err)
|
||||
}
|
||||
pmts = append(pmts, pmt)
|
||||
mp2tDumpPmt(pn, pmt)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func mp2tDumpPat(pat psi.PAT) {
|
||||
fmt.Printf("Pat\n")
|
||||
fmt.Printf("\tPMT PIDs %v\n", pat.ProgramMap())
|
||||
fmt.Printf("\tNumber of Programs %v\n", pat.NumPrograms())
|
||||
}
|
||||
|
||||
func mp2tDumpPmt(pn uint16, pmt psi.PMT) {
|
||||
// pn = program number
|
||||
fmt.Printf("Program #%v PMT\n", pn)
|
||||
fmt.Printf("\tPIDs %v\n", pmt.Pids())
|
||||
fmt.Printf("\tElementary Streams")
|
||||
for _, es := range pmt.ElementaryStreams() {
|
||||
fmt.Printf("\t\tPID %v : StreamType %v\n", es.ElementaryPid(), es.StreamType())
|
||||
for _, d := range es.Descriptors() {
|
||||
fmt.Printf("\t\t\t%+v\n", d)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Mp2tFixContinuity fixes discontinous MPEG-TS continuity counts (CC)
|
||||
func mp2tFixContinuity(pkt []byte, pid uint16) bool {
|
||||
|
||||
hasPayload, err := packet.ContainsPayload(pkt)
|
||||
if err != nil {
|
||||
fmt.Printf("Warning: Packet bad.\n")
|
||||
return false
|
||||
}
|
||||
if !hasPayload {
|
||||
return false
|
||||
}
|
||||
if pktPID, _ := packet.Pid(pkt); pktPID != pid {
|
||||
return false
|
||||
}
|
||||
fixed := false
|
||||
// extract continuity counter from 2nd nibble of 4th byte of header
|
||||
cc := int(pkt[3] & 0xf)
|
||||
if expectCC == -1 {
|
||||
expectCC = cc
|
||||
} else if cc != expectCC {
|
||||
fmt.Println("Have to fix")
|
||||
pkt[3] = pkt[3]&0xf0 | byte(expectCC&0xf)
|
||||
fixed = true
|
||||
}
|
||||
expectCC = (expectCC + 1) % 16
|
||||
return fixed
|
||||
}
|
||||
|
||||
// Mp2tGetPCR extracts the Program Clock Reference (PCR) from an MPEG-TS packet (if any)
|
||||
func mp2tGetPCR(pkt []byte) (uint64, uint32, bool) {
|
||||
if !adaptationfield.HasPCR(pkt) {
|
||||
return 0, 0, false
|
||||
}
|
||||
pcrBytes, _ := adaptationfield.PCR(pkt) // 6 bytes
|
||||
// first 33 bits are PCR base, next 6 bits are reserved, final 9 bits are PCR extension.
|
||||
pcrBase := uint64(binary.BigEndian.Uint32(pcrBytes[:4]))<<1 | uint64(pcrBytes[4]&0x80>>7)
|
||||
pcrExt := uint32(pcrBytes[4]&0x01)<<1 | uint32(pcrBytes[5])
|
||||
return pcrBase, pcrExt, true
|
||||
}
|
||||
|
||||
// rtpEncapsulate encapsulates MPEG-TS packets within an RTP header,
|
||||
// setting the payload type accordingly (to 33) and incrementing the RTP sequence number.
|
||||
func rtpEncapsulate(mp2tPacket []byte, pkt []byte) {
|
||||
// RTP packet encapsulates the MP2T
|
||||
// first 12 bytes is the header
|
||||
// byte 0: version=2, padding=0, extension=0, cc=0
|
||||
pkt[0] = 0x80 // version (2)
|
||||
// byte 1: marker=0, pt = 33 (MP2T)
|
||||
pkt[1] = 33
|
||||
// bytes 2 & 3: sequence number
|
||||
binary.BigEndian.PutUint16(pkt[2:4], rtpSequenceNum)
|
||||
if rtpSequenceNum == ^uint16(0) {
|
||||
rtpSequenceNum = 0
|
||||
} else {
|
||||
rtpSequenceNum++
|
||||
}
|
||||
// bytes 4,5,6&7: timestamp
|
||||
timestamp := uint32(time.Now().UnixNano() / 1e6) // ms timestamp
|
||||
binary.BigEndian.PutUint32(pkt[4:8], timestamp)
|
||||
// bytes 8,9,10&11: SSRC
|
||||
binary.BigEndian.PutUint32(pkt[8:12], rtpSSRC)
|
||||
|
||||
// payload follows
|
||||
copy(pkt[rtpHeaderSize:rtpHeaderSize+rtpPackets*mp2tPacketSize], mp2tPacket)
|
||||
}
|
BIN
revid/out.h264
BIN
revid/out.h264
Binary file not shown.
BIN
revid/out.mp4
BIN
revid/out.mp4
Binary file not shown.
BIN
revid/out.ts
BIN
revid/out.ts
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading…
Reference in New Issue