Initial revision

This commit is contained in:
Alan Noble 2017-09-13 14:30:26 +09:30
commit c8bfccb310
3 changed files with 658 additions and 0 deletions

1
contributors.txt Normal file
View File

@ -0,0 +1 @@
Alan Noble

110
revid/Readme.md Normal file
View File

@ -0,0 +1,110 @@
# Readme
revid is a testbed for re-muxing and re-directing video streams as
MPEG-TS over various protocols.
# Description
The mode (-m) determine the mode of operation:
* h = send HTTP (as a POST)
* u = send UDP
* r = send RTP
* f = write to /tmp files
* d = inspect packets and dump to screen
Flags (-f) determine video filtering and other actions.
For example, to send as raw UDP to <PORT> on the current host, passing the video and audio as is:
revid -i <RTSP_URL> -m u -o udp://0.0.0.0:<PORT>
Or, to post as HTTP to <HTTP_URL>, fixing PTS and dropping the audio along the way:
revid -i <RTSP_URL> -m h -f 3 -o <HTTP_URL>
Note that revid appends the size of the video to the URL to supply a query param.
Append a ? to your <URL> if you don't need it
List of flags:
* FilterFixPTS = 0x0001
* FilterDropAudio = 0x0002
* FilterScale640 = 0x0004
* FilterScale320 = 0x0008
* FilterFixContinuity = 0x0010
* DumpProgramInfo = 0x0100
* DumpPacketStats = 0x0200
* DumpPacketHeader = 0x0400
* DumpPacketPayload = 0x0800
Common flag combos:
* 3: fix pts and drop audio
* 7: fix pts, drop audo and scale 640
* 17: fix pts and fix continuity
* 256: dump program info
* 512: dump packet stats
* 513: fix pts, plus above
* 529: fix pts and fix continuity, plus above
# Errors
If you see "Error reading from ffmpeg: EOF" that means ffmpeg has
crashed for some reason, usually because of a bad parameter. Copy and
paste the ffmpeg command line into a terminal to see what is
happening.
RTSP feeds from certain cameras (such as TechView ones) do not
generate presentation timestamps (PTS), resulting in errors such as
the following:
* [mpegts @ 0xX] Timestamps are unset in a packet for stream 0...
* [mpegts @ 0xX] first pts value must be set
This can be fixed with an ffmpeg video filter (specified by flag 0x0001).
Another issue is that MPEG-TS continuity counters may not be continuous.
You can fix this with the fix continuity flag (0x0010).
FFmpeg will also complain if doesn't have the necessary audio codec
installed. If so, you can drop the audio (flag 0x0002).
OTES
MPEG2 TS stream clocks (PCR, PTS, and DTS) all have units of 1/90000
second and header fields are read as big endian (like most protocols).
* TEI = Transport Error Indicator
* PUSI = Payload Unit Start Indicator
* TP = Transport Priority
* TCS = Transport Scrambling Control
* AFC = Adapation Field Control
* CC = Continuity Counter
* AFL = Adapation Field Length
# Dependencies
revid uses ffmpeg for video remuxing.
See (Ffmepg filters)[https://ffmpeg.org/ffmpeg-filters.html].
revid also uses (Comcast's gots package)[https://github.com/Comcast/gots].
# Author
Alan Noble <anoble@gmail.com>
# License
revid is Copyright (C) 2017 Alan Noble.
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
or more details.
You should have received a copy of the GNU General Public License
along with NetReceiver in gpl.txt. If not, see [GNU licenses](http://www.gnu.orlicenses).

547
revid/revid.go Normal file
View File

@ -0,0 +1,547 @@
/*
NAME
revid - a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols.
DESCRIPTION
See Readme.md
AUTHOR
Alan Noble <anoble@gmail.com>
LICENSE
revid is Copyright (C) 2017 Alan Noble.
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with NetReceiver in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
package main
import (
"bufio"
"bytes"
"encoding/binary"
"flag"
"fmt"
"github.com/Comcast/gots/packet"
"github.com/Comcast/gots/packet/adaptationfield"
"github.com/Comcast/gots/psi"
"io"
"io/ioutil"
"log"
"math/rand"
"net"
"net/http"
"os"
"os/exec"
"strconv"
"time"
)
// program modes
const (
File = iota
Http
Udp
Rtp
Dump
)
// defaults
const DefaultPid = 256
const DefaultFrameRate = 25
const DefaultInput = "rtsp://RTSP_URL"
const DefaultHttpOutput = "http://HTTP_URL"
const DefaultUdpOutput = "0.0.0.0:16384"
// packet/clip consts
const Mp2tPacketSize = 188 // MPEG-TS packet size
const UdpPackets = 7 // # of UDP packets per ethernet frame (8 is the max)
const RtpPackets = 7 // # of RTP packets per ethernet frame
const MaxPackets = 2240 // # first multiple of 7 and 8 greater than 2000
const ErrorSleep = 1000000000 // ns
// ffmepg consts
const FfmpegPath = "/usr/bin/ffmpeg"
// flags
const (
FilterFixPTS = 0x0001
FilterDropAudio = 0x0002
FilterScale640 = 0x0004
FilterScale320 = 0x0008
FilterFixContinuity = 0x0010
DumpProgramInfo = 0x0100 // 256
DumpPacketStats = 0x0200 // 512
DumpPacketHeader = 0x0400 // 1024
DumpPacketPayload = 0x0800 // 2048
)
// globals
var Flags uint32 = 0
var ExpectCC int = -1
// utility functions
func check(err error, msg string) {
if err != nil {
log.Fatal("%s: %s\n", msg, err)
}
}
func printlnf(format string, a ...interface{}) {
fmt.Printf(format+"\n", a...)
}
// MPEG-TS (MP2T) functions
func mp2tDumpPmt(pn uint16, pmt psi.PMT) {
// pn = program number
printlnf("Program #%v PMT", pn)
printlnf("\tPIDs %v", pmt.Pids())
println("\tElementary Streams")
for _, es := range pmt.ElementaryStreams() {
printlnf("\t\tPid %v : StreamType %v", es.ElementaryPid(), es.StreamType())
for _, d := range es.Descriptors() {
printlnf("\t\t\t%+v", d)
}
}
}
func mp2tDumpPat(pat psi.PAT) {
println("Pat")
printlnf("\tPMT PIDs %v", pat.ProgramMap())
printlnf("\tNumber of Programs %v", pat.NumPrograms())
}
func mp2tGetPcr(pkt []byte) (uint64, uint32, bool) {
if adaptationfield.HasPCR(pkt) {
pcrBytes, _ := adaptationfield.PCR(pkt) // 6 bytes
// first 33 bits are PCR base, next 6 bits are reserved, final 9 bits are PCR extension.
pcrBase := uint64(binary.BigEndian.Uint32(pcrBytes[0:4]))<<1 | uint64(pcrBytes[4]&0x80>>7)
pcrExt := uint32(pcrBytes[4]&0x01)<<1 | uint32(pcrBytes[5])
return pcrBase, pcrExt, true
} else {
return 0, 0, false
}
}
var DumpPcrBase uint64 = 0
var DumpExpectCC int = -1
func mp2tDump(tsData []byte, pid uint16) {
// verify if sync-byte is present and correct
reader := bufio.NewReader(bytes.NewReader(tsData))
_, err := packet.Sync(reader)
if err != nil {
fmt.Println("Warning: Bad sync byte")
return
}
if Flags&DumpProgramInfo != 0 {
// NB: ReadPat and ReadPMT consume packets so mess up continuity counting
pat, err := psi.ReadPAT(reader)
if err != nil {
fmt.Println(err)
return
}
mp2tDumpPat(pat)
var pmts []psi.PMT
pm := pat.ProgramMap()
for pn, pid := range pm {
pmt, err := psi.ReadPMT(reader, pid)
if err != nil {
panic(err)
}
pmts = append(pmts, pmt)
mp2tDumpPmt(pn, pmt)
}
}
pkt := make(packet.Packet, packet.PacketSize)
var packetCount uint64 = 0
discontinuities := 0
for {
if _, err := io.ReadFull(reader, pkt); err != nil {
if err == io.EOF {
break
}
println(err)
return
}
packetCount++
if Flags&(DumpPacketHeader|DumpPacketPayload) != 0 {
fmt.Printf("Packet #%d\n", packetCount)
}
if Flags&FilterFixContinuity != 0 {
fixed := mp2tFixContinuity((*[]byte)(&pkt), packetCount, pid)
if fixed {
fmt.Printf("Packet #%d fixed!\n", packetCount)
}
}
hasPayload, err := packet.ContainsPayload(pkt)
if err != nil {
fmt.Printf("Packet #%d bad\n", packetCount)
continue
}
if !hasPayload {
continue // nothing to do
}
pktPid, err := packet.Pid(pkt)
if err != nil || pktPid != pid {
continue
}
// extract interesting info from header
headerByte1 := uint8(pkt[1])
headerByte3 := uint8(pkt[3])
tei := headerByte1 & 0x80 >> 7
pusi := headerByte1 & 0x40 >> 6
tp := headerByte1 & 0x20 >> 5
tcs := headerByte3 & 0xc0 >> 6
afc := headerByte3 & 0x30 >> 4
cc := int(headerByte3 & 0x0f)
if Flags&DumpPacketHeader != 0 {
fmt.Printf("\t\tTEI=%d, PUSI=%d, TP=%d, TSC=%d, AFC=%d, CC=%d\n", tei, pusi, tp, tcs, afc, cc)
}
// NB: don't misreport discontinuities when dumping program info
if Flags&DumpProgramInfo == 0 && DumpExpectCC != -1 && cc != DumpExpectCC {
discontinuities += 1
fmt.Printf("Warning: Packet #%d continuity counter out of order! Got %d, expected %d.\n",
packetCount, cc, DumpExpectCC)
}
DumpExpectCC = (cc + 1) % 16
if afc == 3 {
// adaptation field, followed by payload
afl := adaptationfield.Length(pkt)
if adaptationfield.HasPCR(pkt) {
pcrBase, pcrExt, _ := mp2tGetPcr(pkt)
if Flags&DumpPacketHeader != 0 {
fmt.Printf("\t\tAFL=%d, PCRbase=%d, PCRext=%d\n", afl, pcrBase, pcrExt)
}
if pcrBase < DumpPcrBase {
fmt.Printf("Warning: PCRbase went backwards!\n")
}
DumpPcrBase = pcrBase
} else if Flags&DumpPacketHeader != 0 {
fmt.Printf("\t\tAFL=%d\n", afl)
}
}
if Flags&DumpPacketPayload != 0 {
fmt.Printf("\t\tPayload=%x\n", pkt)
}
}
if Flags&DumpPacketStats != 0 {
fmt.Printf("%d packets of size %d bytes (%d bytes, %d discontinuites)\n",
packetCount, packet.PacketSize, packetCount*packet.PacketSize, discontinuities)
}
}
// fix continuity counter
func mp2tFixContinuity(pkt *[]byte, packetCount uint64, pid uint16) bool {
hasPayload, err := packet.ContainsPayload(*pkt)
if err != nil {
fmt.Printf("Warning: Packet #%d bad.\n", packetCount)
return false
}
if !hasPayload {
return false // nothing to do
}
if pktPid, _ := packet.Pid(*pkt); pktPid != pid {
return false // nothing to do
}
fixed := false
// extract continuity counter from 2nd nibble of 4th byte of header
afc := (*pkt)[3] & 0x30 >> 4
cc := int((*pkt)[3] & 0xf)
if afc&0x01 == 1 {
if ExpectCC != -1 && cc != ExpectCC {
(*pkt)[3] = (*pkt)[3]&0xf0 | byte(ExpectCC)
fixed = true
}
ExpectCC = (cc + 1) % 16
}
return fixed
}
// RTP functions
const RtpHeaderSize = 12
const RtpSSRC = 1 // any value seems to work
var RtpSequenceNum uint16 = 0
func rtpInit() {
// per spec, intialize RTP sequence number with a random number
RtpSequenceNum = uint16(rand.Intn(1 << 15))
fmt.Printf("Initial RTP sequence number %d\n", RtpSequenceNum)
}
func rtpEncapsulate(mp2tPacket []byte, pkt *[]byte) {
// RTP packet encapsulates the MP2T
// first 12 bytes is the header
// byte 0: version=2, padding=0, extension=0, cc=0
(*pkt)[0] = 0x80 // version (2)
// byte 1: marker=0, pt = 33 (MP2T)
(*pkt)[1] = 33
// bytes 2 & 3: sequence number
binary.BigEndian.PutUint16((*pkt)[2:4], RtpSequenceNum)
if RtpSequenceNum == ^uint16(0) {
RtpSequenceNum = 0
} else {
RtpSequenceNum += 1
}
// bytes 4,5,6&7: timestamp
timestamp := uint32(time.Now().UnixNano() * 1000) // ms timestamp
binary.BigEndian.PutUint32((*pkt)[4:8], timestamp)
// bytes 8,9,10&11: SSRC
binary.BigEndian.PutUint32((*pkt)[8:12], RtpSSRC)
// payload follows
copy((*pkt)[RtpHeaderSize:RtpHeaderSize+RtpPackets*Mp2tPacketSize], mp2tPacket)
}
// send a video clip in various ways: to a file, over http/udp/rtp, or dump to stdout
func sendClip(clip []byte, clipSize int, pid uint16, mode int, output string) {
switch mode {
case File:
fmt.Printf("Writing %s (%d bytes)\n", output, clipSize)
ff, err := os.Create(output)
check(err, "Error creating "+output)
ff.Write(clip[0:clipSize])
ff.Close()
case Http:
url := output + strconv.Itoa(clipSize) // NB: append the size to the output
fmt.Printf("Posting %s (%d bytes)\n", url, clipSize)
resp, err := http.Post(url, "video/mp2t", bytes.NewBuffer(clip[0:clipSize]))
check(err, "Post to "+output+" failed")
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
fmt.Printf(string(body) + "\n")
case Udp:
size := UdpPackets * Mp2tPacketSize
if clipSize%size != 0 {
fmt.Printf("Warning: Bad clip size (%d bytes)\n", clipSize)
}
count := clipSize / size
fmt.Printf("Sending %d UDP packets of size %d (%d bytes)\n", count, size, clipSize)
pkt := make([]byte, size)
conn, err := net.Dial("udp", output)
check(err, "Error dialing "+output)
for offset := 0; offset < clipSize; offset += size {
copy(pkt, clip[offset:offset+size])
_, err = conn.Write(pkt)
if err != nil {
fmt.Printf("UDP write error %s. Is your player listening?\n", err)
time.Sleep(ErrorSleep)
continue
}
}
conn.Close()
case Rtp:
size := RtpPackets * Mp2tPacketSize
if clipSize%size != 0 {
fmt.Printf("Warning: Bad clip size (%d bytes)\n", clipSize)
}
count := clipSize / size
fmt.Printf("Sending %d RTP packets of size %d (%d bytes)\n",
count, size+RtpHeaderSize, clipSize)
pkt := make([]byte, RtpHeaderSize+RtpPackets*Mp2tPacketSize)
conn, err := net.Dial("udp", output) // RTP uses UDP
check(err, "Error dialing "+output)
for offset := 0; offset < clipSize; offset += size {
rtpEncapsulate(clip[offset:offset+size], &pkt)
_, err = conn.Write(pkt)
if err != nil {
fmt.Printf("RTP write error %s. Is your player listening?\n", err)
time.Sleep(ErrorSleep)
continue
}
}
conn.Close()
case Dump:
fmt.Printf("Dumping clip (%d bytes)\n", clipSize)
mp2tDump(clip[0:clipSize], pid)
default:
log.Fatal("Invalid mode in sendClip: %d\n", mode)
}
}
// does the real work
func process(input string, framerate int, pid uint16, mode int, output string) {
fmt.Printf("Reading H264 video from %s, pid %d\n", input, pid)
var videoArg, audioArg [2]string
if Flags&(FilterFixPTS|Flags&FilterScale640|Flags&FilterScale320) == 0 {
videoArg[0] = "-vcodec"
videoArg[1] = "copy"
} else {
videoArg[0] = "-vf"
videoArg[1] = ""
if Flags&FilterFixPTS != 0 {
videoArg[1] += "setpts='PTS-STARTPTS'" // start counting PTS from zero
}
if Flags&FilterScale640 != 0 {
videoArg[1] += ", scale=640:352"
} else if Flags&FilterScale320 != 0 {
videoArg[1] += ", scale=320:176"
}
}
if Flags&(FilterDropAudio) == 0 {
audioArg[0] = "-acodec"
audioArg[1] = "copy"
} else {
audioArg[0] = "-an"
audioArg[1] = ""
}
fmt.Printf("Executing: %s -r %d -i %s %s \"%s\" %s %s -f mpegts -\n",
FfmpegPath, framerate, input, videoArg[0], videoArg[1], audioArg[0], audioArg[1])
var cmd *exec.Cmd
if audioArg[1] == "" {
cmd = exec.Command(FfmpegPath,
"-r", strconv.Itoa(framerate),
"-i", input,
videoArg[0], videoArg[1],
audioArg[0],
"-f", "mpegts", "-")
} else {
cmd = exec.Command(FfmpegPath,
"-r", strconv.Itoa(framerate),
"-i", input,
videoArg[0], videoArg[1],
audioArg[0], audioArg[1],
"-f", "mpegts", "-")
}
stdout, err := cmd.StdoutPipe()
check(err, "Error creating pipe")
err = cmd.Start()
check(err, "Error starting pipe")
pkt := make([]byte, Mp2tPacketSize)
br := bufio.NewReader(stdout)
// instantiate a Ticker that will send to its channel every second
everySecond := time.NewTicker(1 * time.Second)
fmt.Printf("Looping\n")
clip := make([]byte, MaxPackets*Mp2tPacketSize)
clipSize := 0
clipCount := 1
var packetCount uint64 = 0
send := false
var packetsPerFrame uint64 = UdpPackets
if mode == Rtp {
packetsPerFrame = RtpPackets
}
for {
sz, err := io.ReadFull(br, pkt)
if err != nil {
fmt.Printf("Error reading from ffmpeg: %s\n", err)
time.Sleep(ErrorSleep)
continue
}
if sz == Mp2tPacketSize {
if Flags&FilterFixContinuity != 0 {
mp2tFixContinuity(&pkt, packetCount, pid)
}
if packetCount == MaxPackets {
send = true
}
if send && packetCount%packetsPerFrame == 0 {
if mode == File {
output = fmt.Sprintf("/tmp/vid%03d.ts", clipCount)
}
sendClip(clip, clipSize, pid, mode, output)
clipCount += 1
clipSize = 0
packetCount = 0
send = false
}
copy(clip[clipSize:], pkt)
packetCount += 1
clipSize += Mp2tPacketSize
} else {
fmt.Printf("Warning: read short packet with %d bytes\n", sz)
continue
}
select {
case <-everySecond.C:
send = (packetCount > 1)
default:
}
}
}
func main() {
var input = flag.String("i", DefaultInput, "Input RTSP URL")
var output = flag.String("o", "", "Output URL (HTTP, UDP or RTP)")
var modeCh = flag.String("m", "u", "Mode: one of f,h,u,r or d")
var flags = flag.Int("f", 0, "Flags: see source code for explanation")
var pid = flag.Int("p", DefaultPid, "Show packets for this PID only")
var rate = flag.Int("r", DefaultFrameRate, "Input frame rate")
flag.Parse()
Flags = uint32(*flags)
mode := Udp
switch *modeCh {
case "f":
mode = File
case "h":
mode = Http
if *output == "" {
*output = DefaultHttpOutput
}
case "u":
mode = Udp
if *output == "" {
*output = DefaultUdpOutput
} else if (*output)[0:6] == "udp://" {
*output = (*output)[6:]
}
case "r":
mode = Rtp
rtpInit()
if *output == "" {
*output = DefaultUdpOutput
} else if (*output)[0:6] == "rtp://" {
*output = (*output)[6:]
}
case "d":
mode = Dump
default:
log.Fatal("Invalid mode %s\n", *modeCh)
}
if Flags&FilterFixContinuity != 0 && Flags&DumpProgramInfo != 0 {
log.Fatal("Cannot combine FilterFixContinuity and DumpProgramInfo flags\n")
}
fmt.Printf("revid -s %s -r %d -f %d -p %d -m %s -o %s\n", *input, *rate, Flags, uint16(*pid), *modeCh, *output)
process(*input, *rate, uint16(*pid), mode, *output)
}