From c8bfccb3100e580d80504e41ba567c1fce50ba21 Mon Sep 17 00:00:00 2001 From: Alan Noble Date: Wed, 13 Sep 2017 14:30:26 +0930 Subject: [PATCH] Initial revision --- contributors.txt | 1 + revid/Readme.md | 110 ++++++++++ revid/revid.go | 547 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 658 insertions(+) create mode 100644 contributors.txt create mode 100644 revid/Readme.md create mode 100644 revid/revid.go diff --git a/contributors.txt b/contributors.txt new file mode 100644 index 00000000..cf9c3b2d --- /dev/null +++ b/contributors.txt @@ -0,0 +1 @@ +Alan Noble diff --git a/revid/Readme.md b/revid/Readme.md new file mode 100644 index 00000000..a019d7eb --- /dev/null +++ b/revid/Readme.md @@ -0,0 +1,110 @@ +# Readme + +revid is a testbed for re-muxing and re-directing video streams as +MPEG-TS over various protocols. + +# Description + +The mode (-m) determine the mode of operation: + + * h = send HTTP (as a POST) + * u = send UDP + * r = send RTP + * f = write to /tmp files + * d = inspect packets and dump to screen + +Flags (-f) determine video filtering and other actions. + +For example, to send as raw UDP to on the current host, passing the video and audio as is: + +revid -i -m u -o udp://0.0.0.0: + +Or, to post as HTTP to , fixing PTS and dropping the audio along the way: + +revid -i -m h -f 3 -o + +Note that revid appends the size of the video to the URL to supply a query param. +Append a ? to your if you don't need it + +List of flags: + + * FilterFixPTS = 0x0001 + * FilterDropAudio = 0x0002 + * FilterScale640 = 0x0004 + * FilterScale320 = 0x0008 + * FilterFixContinuity = 0x0010 + * DumpProgramInfo = 0x0100 + * DumpPacketStats = 0x0200 + * DumpPacketHeader = 0x0400 + * DumpPacketPayload = 0x0800 + +Common flag combos: + + * 3: fix pts and drop audio + * 7: fix pts, drop audo and scale 640 + * 17: fix pts and fix continuity + * 256: dump program info + * 512: dump packet stats + * 513: fix pts, plus above + * 529: fix pts and fix continuity, plus above + +# Errors + +If you see "Error reading from ffmpeg: EOF" that means ffmpeg has +crashed for some reason, usually because of a bad parameter. Copy and +paste the ffmpeg command line into a terminal to see what is +happening. + +RTSP feeds from certain cameras (such as TechView ones) do not +generate presentation timestamps (PTS), resulting in errors such as +the following: + + * [mpegts @ 0xX] Timestamps are unset in a packet for stream 0... + * [mpegts @ 0xX] first pts value must be set + +This can be fixed with an ffmpeg video filter (specified by flag 0x0001). +Another issue is that MPEG-TS continuity counters may not be continuous. +You can fix this with the fix continuity flag (0x0010). + +FFmpeg will also complain if doesn't have the necessary audio codec +installed. If so, you can drop the audio (flag 0x0002). +OTES + +MPEG2 TS stream clocks (PCR, PTS, and DTS) all have units of 1/90000 +second and header fields are read as big endian (like most protocols). + + * TEI = Transport Error Indicator + * PUSI = Payload Unit Start Indicator + * TP = Transport Priority + * TCS = Transport Scrambling Control + * AFC = Adapation Field Control + * CC = Continuity Counter + * AFL = Adapation Field Length + +# Dependencies + +revid uses ffmpeg for video remuxing. +See (Ffmepg filters)[https://ffmpeg.org/ffmpeg-filters.html]. + +revid also uses (Comcast's gots package)[https://github.com/Comcast/gots]. + +# Author + +Alan Noble + +# License + +revid is Copyright (C) 2017 Alan Noble. + +It is free software: you can redistribute it and/or modify them +under the terms of the GNU General Public License as published by the +Free Software Foundation, either version 3 of the License, or (at your +option) any later version. + +It is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +or more details. + +You should have received a copy of the GNU General Public License +along with NetReceiver in gpl.txt. If not, see [GNU licenses](http://www.gnu.orlicenses). diff --git a/revid/revid.go b/revid/revid.go new file mode 100644 index 00000000..9e65a35f --- /dev/null +++ b/revid/revid.go @@ -0,0 +1,547 @@ +/* +NAME + revid - a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. + +DESCRIPTION + See Readme.md + +AUTHOR + Alan Noble + +LICENSE + revid is Copyright (C) 2017 Alan Noble. + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with NetReceiver in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package main + +import ( + "bufio" + "bytes" + "encoding/binary" + "flag" + "fmt" + "github.com/Comcast/gots/packet" + "github.com/Comcast/gots/packet/adaptationfield" + "github.com/Comcast/gots/psi" + "io" + "io/ioutil" + "log" + "math/rand" + "net" + "net/http" + "os" + "os/exec" + "strconv" + "time" +) + +// program modes +const ( + File = iota + Http + Udp + Rtp + Dump +) + +// defaults +const DefaultPid = 256 +const DefaultFrameRate = 25 +const DefaultInput = "rtsp://RTSP_URL" +const DefaultHttpOutput = "http://HTTP_URL" +const DefaultUdpOutput = "0.0.0.0:16384" + +// packet/clip consts +const Mp2tPacketSize = 188 // MPEG-TS packet size +const UdpPackets = 7 // # of UDP packets per ethernet frame (8 is the max) +const RtpPackets = 7 // # of RTP packets per ethernet frame +const MaxPackets = 2240 // # first multiple of 7 and 8 greater than 2000 +const ErrorSleep = 1000000000 // ns + +// ffmepg consts +const FfmpegPath = "/usr/bin/ffmpeg" + +// flags +const ( + FilterFixPTS = 0x0001 + FilterDropAudio = 0x0002 + FilterScale640 = 0x0004 + FilterScale320 = 0x0008 + FilterFixContinuity = 0x0010 + DumpProgramInfo = 0x0100 // 256 + DumpPacketStats = 0x0200 // 512 + DumpPacketHeader = 0x0400 // 1024 + DumpPacketPayload = 0x0800 // 2048 +) + +// globals +var Flags uint32 = 0 +var ExpectCC int = -1 + +// utility functions +func check(err error, msg string) { + if err != nil { + log.Fatal("%s: %s\n", msg, err) + } +} + +func printlnf(format string, a ...interface{}) { + fmt.Printf(format+"\n", a...) +} + +// MPEG-TS (MP2T) functions +func mp2tDumpPmt(pn uint16, pmt psi.PMT) { + // pn = program number + printlnf("Program #%v PMT", pn) + printlnf("\tPIDs %v", pmt.Pids()) + println("\tElementary Streams") + for _, es := range pmt.ElementaryStreams() { + printlnf("\t\tPid %v : StreamType %v", es.ElementaryPid(), es.StreamType()) + for _, d := range es.Descriptors() { + printlnf("\t\t\t%+v", d) + } + } +} + +func mp2tDumpPat(pat psi.PAT) { + println("Pat") + printlnf("\tPMT PIDs %v", pat.ProgramMap()) + printlnf("\tNumber of Programs %v", pat.NumPrograms()) +} + +func mp2tGetPcr(pkt []byte) (uint64, uint32, bool) { + if adaptationfield.HasPCR(pkt) { + pcrBytes, _ := adaptationfield.PCR(pkt) // 6 bytes + // first 33 bits are PCR base, next 6 bits are reserved, final 9 bits are PCR extension. + pcrBase := uint64(binary.BigEndian.Uint32(pcrBytes[0:4]))<<1 | uint64(pcrBytes[4]&0x80>>7) + pcrExt := uint32(pcrBytes[4]&0x01)<<1 | uint32(pcrBytes[5]) + return pcrBase, pcrExt, true + } else { + return 0, 0, false + } +} + +var DumpPcrBase uint64 = 0 +var DumpExpectCC int = -1 + +func mp2tDump(tsData []byte, pid uint16) { + // verify if sync-byte is present and correct + reader := bufio.NewReader(bytes.NewReader(tsData)) + _, err := packet.Sync(reader) + if err != nil { + fmt.Println("Warning: Bad sync byte") + return + } + + if Flags&DumpProgramInfo != 0 { + // NB: ReadPat and ReadPMT consume packets so mess up continuity counting + pat, err := psi.ReadPAT(reader) + if err != nil { + fmt.Println(err) + return + } + mp2tDumpPat(pat) + + var pmts []psi.PMT + pm := pat.ProgramMap() + for pn, pid := range pm { + pmt, err := psi.ReadPMT(reader, pid) + if err != nil { + panic(err) + } + pmts = append(pmts, pmt) + mp2tDumpPmt(pn, pmt) + } + } + + pkt := make(packet.Packet, packet.PacketSize) + var packetCount uint64 = 0 + discontinuities := 0 + + for { + if _, err := io.ReadFull(reader, pkt); err != nil { + if err == io.EOF { + break + } + println(err) + return + } + packetCount++ + if Flags&(DumpPacketHeader|DumpPacketPayload) != 0 { + fmt.Printf("Packet #%d\n", packetCount) + } + if Flags&FilterFixContinuity != 0 { + fixed := mp2tFixContinuity((*[]byte)(&pkt), packetCount, pid) + if fixed { + fmt.Printf("Packet #%d fixed!\n", packetCount) + } + + } + + hasPayload, err := packet.ContainsPayload(pkt) + if err != nil { + fmt.Printf("Packet #%d bad\n", packetCount) + continue + } + if !hasPayload { + continue // nothing to do + } + + pktPid, err := packet.Pid(pkt) + if err != nil || pktPid != pid { + continue + } + + // extract interesting info from header + headerByte1 := uint8(pkt[1]) + headerByte3 := uint8(pkt[3]) + tei := headerByte1 & 0x80 >> 7 + pusi := headerByte1 & 0x40 >> 6 + tp := headerByte1 & 0x20 >> 5 + tcs := headerByte3 & 0xc0 >> 6 + afc := headerByte3 & 0x30 >> 4 + cc := int(headerByte3 & 0x0f) + + if Flags&DumpPacketHeader != 0 { + fmt.Printf("\t\tTEI=%d, PUSI=%d, TP=%d, TSC=%d, AFC=%d, CC=%d\n", tei, pusi, tp, tcs, afc, cc) + } + // NB: don't misreport discontinuities when dumping program info + if Flags&DumpProgramInfo == 0 && DumpExpectCC != -1 && cc != DumpExpectCC { + discontinuities += 1 + fmt.Printf("Warning: Packet #%d continuity counter out of order! Got %d, expected %d.\n", + packetCount, cc, DumpExpectCC) + } + DumpExpectCC = (cc + 1) % 16 + + if afc == 3 { + // adaptation field, followed by payload + afl := adaptationfield.Length(pkt) + if adaptationfield.HasPCR(pkt) { + pcrBase, pcrExt, _ := mp2tGetPcr(pkt) + if Flags&DumpPacketHeader != 0 { + fmt.Printf("\t\tAFL=%d, PCRbase=%d, PCRext=%d\n", afl, pcrBase, pcrExt) + } + if pcrBase < DumpPcrBase { + fmt.Printf("Warning: PCRbase went backwards!\n") + } + DumpPcrBase = pcrBase + } else if Flags&DumpPacketHeader != 0 { + fmt.Printf("\t\tAFL=%d\n", afl) + } + } + if Flags&DumpPacketPayload != 0 { + fmt.Printf("\t\tPayload=%x\n", pkt) + } + + } + if Flags&DumpPacketStats != 0 { + fmt.Printf("%d packets of size %d bytes (%d bytes, %d discontinuites)\n", + packetCount, packet.PacketSize, packetCount*packet.PacketSize, discontinuities) + } +} + +// fix continuity counter +func mp2tFixContinuity(pkt *[]byte, packetCount uint64, pid uint16) bool { + hasPayload, err := packet.ContainsPayload(*pkt) + if err != nil { + fmt.Printf("Warning: Packet #%d bad.\n", packetCount) + return false + } + if !hasPayload { + return false // nothing to do + } + if pktPid, _ := packet.Pid(*pkt); pktPid != pid { + return false // nothing to do + } + fixed := false + // extract continuity counter from 2nd nibble of 4th byte of header + afc := (*pkt)[3] & 0x30 >> 4 + cc := int((*pkt)[3] & 0xf) + if afc&0x01 == 1 { + if ExpectCC != -1 && cc != ExpectCC { + (*pkt)[3] = (*pkt)[3]&0xf0 | byte(ExpectCC) + fixed = true + } + ExpectCC = (cc + 1) % 16 + } + return fixed +} + +// RTP functions +const RtpHeaderSize = 12 +const RtpSSRC = 1 // any value seems to work +var RtpSequenceNum uint16 = 0 + +func rtpInit() { + // per spec, intialize RTP sequence number with a random number + RtpSequenceNum = uint16(rand.Intn(1 << 15)) + fmt.Printf("Initial RTP sequence number %d\n", RtpSequenceNum) +} + +func rtpEncapsulate(mp2tPacket []byte, pkt *[]byte) { + // RTP packet encapsulates the MP2T + // first 12 bytes is the header + // byte 0: version=2, padding=0, extension=0, cc=0 + (*pkt)[0] = 0x80 // version (2) + // byte 1: marker=0, pt = 33 (MP2T) + (*pkt)[1] = 33 + // bytes 2 & 3: sequence number + binary.BigEndian.PutUint16((*pkt)[2:4], RtpSequenceNum) + if RtpSequenceNum == ^uint16(0) { + RtpSequenceNum = 0 + } else { + RtpSequenceNum += 1 + } + // bytes 4,5,6&7: timestamp + timestamp := uint32(time.Now().UnixNano() * 1000) // ms timestamp + binary.BigEndian.PutUint32((*pkt)[4:8], timestamp) + // bytes 8,9,10&11: SSRC + binary.BigEndian.PutUint32((*pkt)[8:12], RtpSSRC) + + // payload follows + copy((*pkt)[RtpHeaderSize:RtpHeaderSize+RtpPackets*Mp2tPacketSize], mp2tPacket) +} + +// send a video clip in various ways: to a file, over http/udp/rtp, or dump to stdout +func sendClip(clip []byte, clipSize int, pid uint16, mode int, output string) { + switch mode { + case File: + fmt.Printf("Writing %s (%d bytes)\n", output, clipSize) + ff, err := os.Create(output) + check(err, "Error creating "+output) + ff.Write(clip[0:clipSize]) + ff.Close() + case Http: + url := output + strconv.Itoa(clipSize) // NB: append the size to the output + fmt.Printf("Posting %s (%d bytes)\n", url, clipSize) + resp, err := http.Post(url, "video/mp2t", bytes.NewBuffer(clip[0:clipSize])) + check(err, "Post to "+output+" failed") + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + fmt.Printf(string(body) + "\n") + case Udp: + size := UdpPackets * Mp2tPacketSize + if clipSize%size != 0 { + fmt.Printf("Warning: Bad clip size (%d bytes)\n", clipSize) + } + count := clipSize / size + fmt.Printf("Sending %d UDP packets of size %d (%d bytes)\n", count, size, clipSize) + pkt := make([]byte, size) + conn, err := net.Dial("udp", output) + check(err, "Error dialing "+output) + for offset := 0; offset < clipSize; offset += size { + copy(pkt, clip[offset:offset+size]) + _, err = conn.Write(pkt) + if err != nil { + fmt.Printf("UDP write error %s. Is your player listening?\n", err) + time.Sleep(ErrorSleep) + continue + } + } + conn.Close() + case Rtp: + size := RtpPackets * Mp2tPacketSize + if clipSize%size != 0 { + fmt.Printf("Warning: Bad clip size (%d bytes)\n", clipSize) + } + count := clipSize / size + fmt.Printf("Sending %d RTP packets of size %d (%d bytes)\n", + count, size+RtpHeaderSize, clipSize) + pkt := make([]byte, RtpHeaderSize+RtpPackets*Mp2tPacketSize) + conn, err := net.Dial("udp", output) // RTP uses UDP + check(err, "Error dialing "+output) + for offset := 0; offset < clipSize; offset += size { + rtpEncapsulate(clip[offset:offset+size], &pkt) + _, err = conn.Write(pkt) + if err != nil { + fmt.Printf("RTP write error %s. Is your player listening?\n", err) + time.Sleep(ErrorSleep) + continue + } + } + conn.Close() + case Dump: + fmt.Printf("Dumping clip (%d bytes)\n", clipSize) + mp2tDump(clip[0:clipSize], pid) + default: + log.Fatal("Invalid mode in sendClip: %d\n", mode) + } +} + +// does the real work +func process(input string, framerate int, pid uint16, mode int, output string) { + fmt.Printf("Reading H264 video from %s, pid %d\n", input, pid) + + var videoArg, audioArg [2]string + + if Flags&(FilterFixPTS|Flags&FilterScale640|Flags&FilterScale320) == 0 { + videoArg[0] = "-vcodec" + videoArg[1] = "copy" + } else { + videoArg[0] = "-vf" + videoArg[1] = "" + if Flags&FilterFixPTS != 0 { + videoArg[1] += "setpts='PTS-STARTPTS'" // start counting PTS from zero + } + if Flags&FilterScale640 != 0 { + videoArg[1] += ", scale=640:352" + } else if Flags&FilterScale320 != 0 { + videoArg[1] += ", scale=320:176" + } + } + + if Flags&(FilterDropAudio) == 0 { + audioArg[0] = "-acodec" + audioArg[1] = "copy" + } else { + audioArg[0] = "-an" + audioArg[1] = "" + } + + fmt.Printf("Executing: %s -r %d -i %s %s \"%s\" %s %s -f mpegts -\n", + FfmpegPath, framerate, input, videoArg[0], videoArg[1], audioArg[0], audioArg[1]) + + var cmd *exec.Cmd + if audioArg[1] == "" { + cmd = exec.Command(FfmpegPath, + "-r", strconv.Itoa(framerate), + "-i", input, + videoArg[0], videoArg[1], + audioArg[0], + "-f", "mpegts", "-") + + } else { + cmd = exec.Command(FfmpegPath, + "-r", strconv.Itoa(framerate), + "-i", input, + videoArg[0], videoArg[1], + audioArg[0], audioArg[1], + "-f", "mpegts", "-") + } + + stdout, err := cmd.StdoutPipe() + check(err, "Error creating pipe") + + err = cmd.Start() + check(err, "Error starting pipe") + + pkt := make([]byte, Mp2tPacketSize) + br := bufio.NewReader(stdout) + + // instantiate a Ticker that will send to its channel every second + everySecond := time.NewTicker(1 * time.Second) + + fmt.Printf("Looping\n") + clip := make([]byte, MaxPackets*Mp2tPacketSize) + clipSize := 0 + clipCount := 1 + var packetCount uint64 = 0 + send := false + var packetsPerFrame uint64 = UdpPackets + if mode == Rtp { + packetsPerFrame = RtpPackets + } + + for { + sz, err := io.ReadFull(br, pkt) + if err != nil { + fmt.Printf("Error reading from ffmpeg: %s\n", err) + time.Sleep(ErrorSleep) + continue + } + + if sz == Mp2tPacketSize { + if Flags&FilterFixContinuity != 0 { + mp2tFixContinuity(&pkt, packetCount, pid) + } + if packetCount == MaxPackets { + send = true + } + if send && packetCount%packetsPerFrame == 0 { + if mode == File { + output = fmt.Sprintf("/tmp/vid%03d.ts", clipCount) + } + sendClip(clip, clipSize, pid, mode, output) + clipCount += 1 + clipSize = 0 + packetCount = 0 + send = false + } + copy(clip[clipSize:], pkt) + packetCount += 1 + clipSize += Mp2tPacketSize + } else { + fmt.Printf("Warning: read short packet with %d bytes\n", sz) + continue + } + + select { + case <-everySecond.C: + send = (packetCount > 1) + default: + } + + } +} + +func main() { + var input = flag.String("i", DefaultInput, "Input RTSP URL") + var output = flag.String("o", "", "Output URL (HTTP, UDP or RTP)") + var modeCh = flag.String("m", "u", "Mode: one of f,h,u,r or d") + var flags = flag.Int("f", 0, "Flags: see source code for explanation") + var pid = flag.Int("p", DefaultPid, "Show packets for this PID only") + var rate = flag.Int("r", DefaultFrameRate, "Input frame rate") + + flag.Parse() + + Flags = uint32(*flags) + + mode := Udp + switch *modeCh { + case "f": + mode = File + case "h": + mode = Http + if *output == "" { + *output = DefaultHttpOutput + } + case "u": + mode = Udp + if *output == "" { + *output = DefaultUdpOutput + } else if (*output)[0:6] == "udp://" { + *output = (*output)[6:] + } + case "r": + mode = Rtp + rtpInit() + if *output == "" { + *output = DefaultUdpOutput + } else if (*output)[0:6] == "rtp://" { + *output = (*output)[6:] + } + case "d": + mode = Dump + default: + log.Fatal("Invalid mode %s\n", *modeCh) + } + + if Flags&FilterFixContinuity != 0 && Flags&DumpProgramInfo != 0 { + log.Fatal("Cannot combine FilterFixContinuity and DumpProgramInfo flags\n") + } + fmt.Printf("revid -s %s -r %d -f %d -p %d -m %s -o %s\n", *input, *rate, Flags, uint16(*pid), *modeCh, *output) + process(*input, *rate, uint16(*pid), mode, *output) +}