/* NAME revid - a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. DESCRIPTION See Readme.md AUTHOR Alan Noble LICENSE revid is Copyright (C) 2017 Alan Noble. It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with NetReceiver in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). */ package main import ( "bufio" "bytes" "encoding/binary" "flag" "fmt" "github.com/Comcast/gots/packet" "github.com/Comcast/gots/packet/adaptationfield" "github.com/Comcast/gots/psi" "io" "io/ioutil" "log" "math/rand" "net" "net/http" "os" "os/exec" "strconv" "time" ) // program modes const ( File = iota Http Udp Rtp Dump ) // defaults const DefaultPid = 256 const DefaultFrameRate = 25 const DefaultInput = "rtsp://RTSP_URL" const DefaultHttpOutput = "http://HTTP_URL" const DefaultUdpOutput = "0.0.0.0:16384" // packet/clip consts const Mp2tPacketSize = 188 // MPEG-TS packet size const UdpPackets = 7 // # of UDP packets per ethernet frame (8 is the max) const RtpPackets = 7 // # of RTP packets per ethernet frame const MaxPackets = 2240 // # first multiple of 7 and 8 greater than 2000 const ErrorSleep = 1000000000 // ns // ffmepg consts const FfmpegPath = "/usr/bin/ffmpeg" // flags const ( FilterFixPTS = 0x0001 FilterDropAudio = 0x0002 FilterScale640 = 0x0004 FilterScale320 = 0x0008 FilterFixContinuity = 0x0010 DumpProgramInfo = 0x0100 // 256 DumpPacketStats = 0x0200 // 512 DumpPacketHeader = 0x0400 // 1024 DumpPacketPayload = 0x0800 // 2048 ) // globals var Flags uint32 = 0 var ExpectCC int = -1 // utility functions func check(err error, msg string) { if err != nil { log.Fatal("%s: %s\n", msg, err) } } func printlnf(format string, a ...interface{}) { fmt.Printf(format+"\n", a...) } // MPEG-TS (MP2T) functions func mp2tDumpPmt(pn uint16, pmt psi.PMT) { // pn = program number printlnf("Program #%v PMT", pn) printlnf("\tPIDs %v", pmt.Pids()) println("\tElementary Streams") for _, es := range pmt.ElementaryStreams() { printlnf("\t\tPid %v : StreamType %v", es.ElementaryPid(), es.StreamType()) for _, d := range es.Descriptors() { printlnf("\t\t\t%+v", d) } } } func mp2tDumpPat(pat psi.PAT) { println("Pat") printlnf("\tPMT PIDs %v", pat.ProgramMap()) printlnf("\tNumber of Programs %v", pat.NumPrograms()) } func mp2tGetPcr(pkt []byte) (uint64, uint32, bool) { if adaptationfield.HasPCR(pkt) { pcrBytes, _ := adaptationfield.PCR(pkt) // 6 bytes // first 33 bits are PCR base, next 6 bits are reserved, final 9 bits are PCR extension. pcrBase := uint64(binary.BigEndian.Uint32(pcrBytes[0:4]))<<1 | uint64(pcrBytes[4]&0x80>>7) pcrExt := uint32(pcrBytes[4]&0x01)<<1 | uint32(pcrBytes[5]) return pcrBase, pcrExt, true } else { return 0, 0, false } } var DumpPcrBase uint64 = 0 var DumpExpectCC int = -1 func mp2tDump(tsData []byte, pid uint16) { // verify if sync-byte is present and correct reader := bufio.NewReader(bytes.NewReader(tsData)) _, err := packet.Sync(reader) if err != nil { fmt.Println("Warning: Bad sync byte") return } if Flags&DumpProgramInfo != 0 { // NB: ReadPat and ReadPMT consume packets so mess up continuity counting pat, err := psi.ReadPAT(reader) if err != nil { fmt.Println(err) return } mp2tDumpPat(pat) var pmts []psi.PMT pm := pat.ProgramMap() for pn, pid := range pm { pmt, err := psi.ReadPMT(reader, pid) if err != nil { panic(err) } pmts = append(pmts, pmt) mp2tDumpPmt(pn, pmt) } } pkt := make(packet.Packet, packet.PacketSize) var packetCount uint64 = 0 discontinuities := 0 for { if _, err := io.ReadFull(reader, pkt); err != nil { if err == io.EOF { break } println(err) return } packetCount++ if Flags&(DumpPacketHeader|DumpPacketPayload) != 0 { fmt.Printf("Packet #%d\n", packetCount) } if Flags&FilterFixContinuity != 0 { fixed := mp2tFixContinuity((*[]byte)(&pkt), packetCount, pid) if fixed { fmt.Printf("Packet #%d fixed!\n", packetCount) } } hasPayload, err := packet.ContainsPayload(pkt) if err != nil { fmt.Printf("Packet #%d bad\n", packetCount) continue } if !hasPayload { continue // nothing to do } pktPid, err := packet.Pid(pkt) if err != nil || pktPid != pid { continue } // extract interesting info from header headerByte1 := uint8(pkt[1]) headerByte3 := uint8(pkt[3]) tei := headerByte1 & 0x80 >> 7 pusi := headerByte1 & 0x40 >> 6 tp := headerByte1 & 0x20 >> 5 tcs := headerByte3 & 0xc0 >> 6 afc := headerByte3 & 0x30 >> 4 cc := int(headerByte3 & 0x0f) if Flags&DumpPacketHeader != 0 { fmt.Printf("\t\tTEI=%d, PUSI=%d, TP=%d, TSC=%d, AFC=%d, CC=%d\n", tei, pusi, tp, tcs, afc, cc) } // NB: don't misreport discontinuities when dumping program info if Flags&DumpProgramInfo == 0 && DumpExpectCC != -1 && cc != DumpExpectCC { discontinuities += 1 fmt.Printf("Warning: Packet #%d continuity counter out of order! Got %d, expected %d.\n", packetCount, cc, DumpExpectCC) } DumpExpectCC = (cc + 1) % 16 if afc == 3 { // adaptation field, followed by payload afl := adaptationfield.Length(pkt) if adaptationfield.HasPCR(pkt) { pcrBase, pcrExt, _ := mp2tGetPcr(pkt) if Flags&DumpPacketHeader != 0 { fmt.Printf("\t\tAFL=%d, PCRbase=%d, PCRext=%d\n", afl, pcrBase, pcrExt) } if pcrBase < DumpPcrBase { fmt.Printf("Warning: PCRbase went backwards!\n") } DumpPcrBase = pcrBase } else if Flags&DumpPacketHeader != 0 { fmt.Printf("\t\tAFL=%d\n", afl) } } if Flags&DumpPacketPayload != 0 { fmt.Printf("\t\tPayload=%x\n", pkt) } } if Flags&DumpPacketStats != 0 { fmt.Printf("%d packets of size %d bytes (%d bytes, %d discontinuites)\n", packetCount, packet.PacketSize, packetCount*packet.PacketSize, discontinuities) } } // fix continuity counter func mp2tFixContinuity(pkt *[]byte, packetCount uint64, pid uint16) bool { hasPayload, err := packet.ContainsPayload(*pkt) if err != nil { fmt.Printf("Warning: Packet #%d bad.\n", packetCount) return false } if !hasPayload { return false // nothing to do } if pktPid, _ := packet.Pid(*pkt); pktPid != pid { return false // nothing to do } fixed := false // extract continuity counter from 2nd nibble of 4th byte of header afc := (*pkt)[3] & 0x30 >> 4 cc := int((*pkt)[3] & 0xf) if afc&0x01 == 1 { if ExpectCC != -1 && cc != ExpectCC { (*pkt)[3] = (*pkt)[3]&0xf0 | byte(ExpectCC) fixed = true } ExpectCC = (cc + 1) % 16 } return fixed } // RTP functions const RtpHeaderSize = 12 const RtpSSRC = 1 // any value seems to work var RtpSequenceNum uint16 = 0 func rtpInit() { // per spec, intialize RTP sequence number with a random number RtpSequenceNum = uint16(rand.Intn(1 << 15)) fmt.Printf("Initial RTP sequence number %d\n", RtpSequenceNum) } func rtpEncapsulate(mp2tPacket []byte, pkt *[]byte) { // RTP packet encapsulates the MP2T // first 12 bytes is the header // byte 0: version=2, padding=0, extension=0, cc=0 (*pkt)[0] = 0x80 // version (2) // byte 1: marker=0, pt = 33 (MP2T) (*pkt)[1] = 33 // bytes 2 & 3: sequence number binary.BigEndian.PutUint16((*pkt)[2:4], RtpSequenceNum) if RtpSequenceNum == ^uint16(0) { RtpSequenceNum = 0 } else { RtpSequenceNum += 1 } // bytes 4,5,6&7: timestamp timestamp := uint32(time.Now().UnixNano() * 1000) // ms timestamp binary.BigEndian.PutUint32((*pkt)[4:8], timestamp) // bytes 8,9,10&11: SSRC binary.BigEndian.PutUint32((*pkt)[8:12], RtpSSRC) // payload follows copy((*pkt)[RtpHeaderSize:RtpHeaderSize+RtpPackets*Mp2tPacketSize], mp2tPacket) } // send a video clip in various ways: to a file, over http/udp/rtp, or dump to stdout func sendClip(clip []byte, clipSize int, pid uint16, mode int, output string) { switch mode { case File: fmt.Printf("Writing %s (%d bytes)\n", output, clipSize) ff, err := os.Create(output) check(err, "Error creating "+output) ff.Write(clip[0:clipSize]) ff.Close() case Http: url := output + strconv.Itoa(clipSize) // NB: append the size to the output fmt.Printf("Posting %s (%d bytes)\n", url, clipSize) resp, err := http.Post(url, "video/mp2t", bytes.NewBuffer(clip[0:clipSize])) check(err, "Post to "+output+" failed") defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) fmt.Printf(string(body) + "\n") case Udp: size := UdpPackets * Mp2tPacketSize if clipSize%size != 0 { fmt.Printf("Warning: Bad clip size (%d bytes)\n", clipSize) } count := clipSize / size fmt.Printf("Sending %d UDP packets of size %d (%d bytes)\n", count, size, clipSize) pkt := make([]byte, size) conn, err := net.Dial("udp", output) check(err, "Error dialing "+output) for offset := 0; offset < clipSize; offset += size { copy(pkt, clip[offset:offset+size]) _, err = conn.Write(pkt) if err != nil { fmt.Printf("UDP write error %s. Is your player listening?\n", err) time.Sleep(ErrorSleep) continue } } conn.Close() case Rtp: size := RtpPackets * Mp2tPacketSize if clipSize%size != 0 { fmt.Printf("Warning: Bad clip size (%d bytes)\n", clipSize) } count := clipSize / size fmt.Printf("Sending %d RTP packets of size %d (%d bytes)\n", count, size+RtpHeaderSize, clipSize) pkt := make([]byte, RtpHeaderSize+RtpPackets*Mp2tPacketSize) conn, err := net.Dial("udp", output) // RTP uses UDP check(err, "Error dialing "+output) for offset := 0; offset < clipSize; offset += size { rtpEncapsulate(clip[offset:offset+size], &pkt) _, err = conn.Write(pkt) if err != nil { fmt.Printf("RTP write error %s. Is your player listening?\n", err) time.Sleep(ErrorSleep) continue } } conn.Close() case Dump: fmt.Printf("Dumping clip (%d bytes)\n", clipSize) mp2tDump(clip[0:clipSize], pid) default: log.Fatal("Invalid mode in sendClip: %d\n", mode) } } // does the real work func process(input string, framerate int, pid uint16, mode int, output string) { fmt.Printf("Reading H264 video from %s, pid %d\n", input, pid) var videoArg, audioArg [2]string if Flags&(FilterFixPTS|Flags&FilterScale640|Flags&FilterScale320) == 0 { videoArg[0] = "-vcodec" videoArg[1] = "copy" } else { videoArg[0] = "-vf" videoArg[1] = "" if Flags&FilterFixPTS != 0 { videoArg[1] += "setpts='PTS-STARTPTS'" // start counting PTS from zero } if Flags&FilterScale640 != 0 { videoArg[1] += ", scale=640:352" } else if Flags&FilterScale320 != 0 { videoArg[1] += ", scale=320:176" } } if Flags&(FilterDropAudio) == 0 { audioArg[0] = "-acodec" audioArg[1] = "copy" } else { audioArg[0] = "-an" audioArg[1] = "" } fmt.Printf("Executing: %s -r %d -i %s %s \"%s\" %s %s -f mpegts -\n", FfmpegPath, framerate, input, videoArg[0], videoArg[1], audioArg[0], audioArg[1]) var cmd *exec.Cmd if audioArg[1] == "" { cmd = exec.Command(FfmpegPath, "-r", strconv.Itoa(framerate), "-i", input, videoArg[0], videoArg[1], audioArg[0], "-f", "mpegts", "-") } else { cmd = exec.Command(FfmpegPath, "-r", strconv.Itoa(framerate), "-i", input, videoArg[0], videoArg[1], audioArg[0], audioArg[1], "-f", "mpegts", "-") } stdout, err := cmd.StdoutPipe() check(err, "Error creating pipe") err = cmd.Start() check(err, "Error starting pipe") pkt := make([]byte, Mp2tPacketSize) br := bufio.NewReader(stdout) // instantiate a Ticker that will send to its channel every second everySecond := time.NewTicker(1 * time.Second) fmt.Printf("Looping\n") clip := make([]byte, MaxPackets*Mp2tPacketSize) clipSize := 0 clipCount := 1 var packetCount uint64 = 0 send := false var packetsPerFrame uint64 = UdpPackets if mode == Rtp { packetsPerFrame = RtpPackets } for { sz, err := io.ReadFull(br, pkt) if err != nil { fmt.Printf("Error reading from ffmpeg: %s\n", err) time.Sleep(ErrorSleep) continue } if sz == Mp2tPacketSize { if Flags&FilterFixContinuity != 0 { mp2tFixContinuity(&pkt, packetCount, pid) } if packetCount == MaxPackets { send = true } if send && packetCount%packetsPerFrame == 0 { if mode == File { output = fmt.Sprintf("/tmp/vid%03d.ts", clipCount) } sendClip(clip, clipSize, pid, mode, output) clipCount += 1 clipSize = 0 packetCount = 0 send = false } copy(clip[clipSize:], pkt) packetCount += 1 clipSize += Mp2tPacketSize } else { fmt.Printf("Warning: read short packet with %d bytes\n", sz) continue } select { case <-everySecond.C: send = (packetCount > 1) default: } } } func main() { var input = flag.String("i", DefaultInput, "Input RTSP URL") var output = flag.String("o", "", "Output URL (HTTP, UDP or RTP)") var modeCh = flag.String("m", "u", "Mode: one of f,h,u,r or d") var flags = flag.Int("f", 0, "Flags: see source code for explanation") var pid = flag.Int("p", DefaultPid, "Show packets for this PID only") var rate = flag.Int("r", DefaultFrameRate, "Input frame rate") flag.Parse() Flags = uint32(*flags) mode := Udp switch *modeCh { case "f": mode = File case "h": mode = Http if *output == "" { *output = DefaultHttpOutput } case "u": mode = Udp if *output == "" { *output = DefaultUdpOutput } else if (*output)[0:6] == "udp://" { *output = (*output)[6:] } case "r": mode = Rtp rtpInit() if *output == "" { *output = DefaultUdpOutput } else if (*output)[0:6] == "rtp://" { *output = (*output)[6:] } case "d": mode = Dump default: log.Fatal("Invalid mode %s\n", *modeCh) } if Flags&FilterFixContinuity != 0 && Flags&DumpProgramInfo != 0 { log.Fatal("Cannot combine FilterFixContinuity and DumpProgramInfo flags\n") } fmt.Printf("revid -s %s -r %d -f %d -p %d -m %s -o %s\n", *input, *rate, Flags, uint16(*pid), *modeCh, *output) process(*input, *rate, uint16(*pid), mode, *output) }