From dbfe59432ce1abb7b90be5ec5da7cb037c14578e Mon Sep 17 00:00:00 2001 From: Alan Noble Date: Wed, 20 Sep 2017 21:27:01 +0930 Subject: [PATCH] Refactored to be more Go idiomatic. --- revid/revid.go | 822 ++++++++++++++++++++++++------------------------- 1 file changed, 403 insertions(+), 419 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 9e65a35f..dea2183b 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -22,9 +22,10 @@ LICENSE for more details. You should have received a copy of the GNU General Public License - along with NetReceiver in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). */ +// revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. package main import ( @@ -33,377 +34,151 @@ import ( "encoding/binary" "flag" "fmt" - "github.com/Comcast/gots/packet" - "github.com/Comcast/gots/packet/adaptationfield" - "github.com/Comcast/gots/psi" "io" "io/ioutil" "log" "math/rand" "net" "net/http" - "os" "os/exec" "strconv" + "strings" "time" + + "github.com/Comcast/gots/packet" + "github.com/Comcast/gots/packet/adaptationfield" + "github.com/Comcast/gots/psi" ) // program modes const ( File = iota - Http - Udp - Rtp + HTTP + UDP + RTP Dump ) -// defaults -const DefaultPid = 256 -const DefaultFrameRate = 25 -const DefaultInput = "rtsp://RTSP_URL" -const DefaultHttpOutput = "http://HTTP_URL" -const DefaultUdpOutput = "0.0.0.0:16384" - -// packet/clip consts -const Mp2tPacketSize = 188 // MPEG-TS packet size -const UdpPackets = 7 // # of UDP packets per ethernet frame (8 is the max) -const RtpPackets = 7 // # of RTP packets per ethernet frame -const MaxPackets = 2240 // # first multiple of 7 and 8 greater than 2000 -const ErrorSleep = 1000000000 // ns - -// ffmepg consts -const FfmpegPath = "/usr/bin/ffmpeg" +// defaults and networking consts +const ( + defaultPid = 256 + defaultFrameRate = 25 + defaultHTTPOutput = "http://localhost:8080?" + defaultUDPOutput = "udp://0.0.0.0:16384" + defaultRTPOutput = "rtp://0.0.0.0:16384" + mp2tPacketSize = 188 // MPEG-TS packet size + mp2tMaxPackets = 2016 // # first multiple of 7 and 8 greater than 2000 + UDPPackets = 7 // # of UDP packets per ethernet frame (8 is the max) + RTPPackets = 7 // # of RTP packets per ethernet frame (7 is the max) + RTPHeaderSize = 12 + RTPSSRC = 1 // any value will do + ffmpegPath = "/usr/bin/ffmpeg" +) // flags const ( - FilterFixPTS = 0x0001 - FilterDropAudio = 0x0002 - FilterScale640 = 0x0004 - FilterScale320 = 0x0008 - FilterFixContinuity = 0x0010 - DumpProgramInfo = 0x0100 // 256 - DumpPacketStats = 0x0200 // 512 - DumpPacketHeader = 0x0400 // 1024 - DumpPacketPayload = 0x0800 // 2048 + filterFixPTS = 0x0001 + filterDropAudio = 0x0002 + filterScale640 = 0x0004 + filterScale320 = 0x0008 + filterFixContinuity = 0x0010 + dumpProgramInfo = 0x0100 // 256 + dumpPacketStats = 0x0200 // 512 + dumpPacketHeader = 0x0400 // 1024 + dumpPacketPayload = 0x0800 // 2048 ) // globals -var Flags uint32 = 0 -var ExpectCC int = -1 +var ( + sendClip = sendClipToRTP + packetsPerFrame = RTPPackets + flags *int + frameRate *int + selectedPid *int + clipCount int + expectCC int + dumpCC int + dumpPcrBase uint64 + RTPSequenceNum uint16 +) -// utility functions -func check(err error, msg string) { - if err != nil { - log.Fatal("%s: %s\n", msg, err) +func main() { + var ( + input = flag.String("i", "", "Input RTSP URL") + output = flag.String("o", "", "Output URL (HTTP, UDP or RTP)") + mode = flag.String("m", "r", "Mode: one of f,h,u,r or d") + ) + flags = flag.Int("f", 0, "Flags: see readme for explanation") + frameRate = flag.Int("r", defaultFrameRate, "Input video frame rate (25fps by default)") + selectedPid = flag.Int("p", defaultPid, "Select packets with this packet ID (PID)") + + flag.Parse() + + if *input == "" { + log.Fatal("Input (-i) required\n") } -} -func printlnf(format string, a ...interface{}) { - fmt.Printf(format+"\n", a...) -} - -// MPEG-TS (MP2T) functions -func mp2tDumpPmt(pn uint16, pmt psi.PMT) { - // pn = program number - printlnf("Program #%v PMT", pn) - printlnf("\tPIDs %v", pmt.Pids()) - println("\tElementary Streams") - for _, es := range pmt.ElementaryStreams() { - printlnf("\t\tPid %v : StreamType %v", es.ElementaryPid(), es.StreamType()) - for _, d := range es.Descriptors() { - printlnf("\t\t\t%+v", d) + switch *mode { + case "f": + sendClip = sendClipToFile + case "h": + sendClip = sendClipToHTTP + if *output == "" { + *output = defaultHTTPOutput } - } -} - -func mp2tDumpPat(pat psi.PAT) { - println("Pat") - printlnf("\tPMT PIDs %v", pat.ProgramMap()) - printlnf("\tNumber of Programs %v", pat.NumPrograms()) -} - -func mp2tGetPcr(pkt []byte) (uint64, uint32, bool) { - if adaptationfield.HasPCR(pkt) { - pcrBytes, _ := adaptationfield.PCR(pkt) // 6 bytes - // first 33 bits are PCR base, next 6 bits are reserved, final 9 bits are PCR extension. - pcrBase := uint64(binary.BigEndian.Uint32(pcrBytes[0:4]))<<1 | uint64(pcrBytes[4]&0x80>>7) - pcrExt := uint32(pcrBytes[4]&0x01)<<1 | uint32(pcrBytes[5]) - return pcrBase, pcrExt, true - } else { - return 0, 0, false - } -} - -var DumpPcrBase uint64 = 0 -var DumpExpectCC int = -1 - -func mp2tDump(tsData []byte, pid uint16) { - // verify if sync-byte is present and correct - reader := bufio.NewReader(bytes.NewReader(tsData)) - _, err := packet.Sync(reader) - if err != nil { - fmt.Println("Warning: Bad sync byte") - return - } - - if Flags&DumpProgramInfo != 0 { - // NB: ReadPat and ReadPMT consume packets so mess up continuity counting - pat, err := psi.ReadPAT(reader) - if err != nil { - fmt.Println(err) - return + case "u": + sendClip = sendClipToUDP + packetsPerFrame = UDPPackets + if *output == "" { + *output = defaultUDPOutput } - mp2tDumpPat(pat) - - var pmts []psi.PMT - pm := pat.ProgramMap() - for pn, pid := range pm { - pmt, err := psi.ReadPMT(reader, pid) - if err != nil { - panic(err) - } - pmts = append(pmts, pmt) - mp2tDumpPmt(pn, pmt) + case "r": + sendClip = sendClipToRTP + packetsPerFrame = RTPPackets + if *output == "" { + *output = defaultRTPOutput } + case "d": + sendClip = sendClipToStdout + default: + log.Fatal("Invalid mode %s\n", *mode) } - pkt := make(packet.Packet, packet.PacketSize) - var packetCount uint64 = 0 - discontinuities := 0 + if *flags&filterFixContinuity != 0 && *flags&dumpProgramInfo != 0 { + log.Fatal("Cannot combine filterFixContinuity and dumpProgramInfo flags\n") + } for { - if _, err := io.ReadFull(reader, pkt); err != nil { - if err == io.EOF { - break - } - println(err) - return - } - packetCount++ - if Flags&(DumpPacketHeader|DumpPacketPayload) != 0 { - fmt.Printf("Packet #%d\n", packetCount) - } - if Flags&FilterFixContinuity != 0 { - fixed := mp2tFixContinuity((*[]byte)(&pkt), packetCount, pid) - if fixed { - fmt.Printf("Packet #%d fixed!\n", packetCount) - } - - } - - hasPayload, err := packet.ContainsPayload(pkt) - if err != nil { - fmt.Printf("Packet #%d bad\n", packetCount) - continue - } - if !hasPayload { - continue // nothing to do - } - - pktPid, err := packet.Pid(pkt) - if err != nil || pktPid != pid { - continue - } - - // extract interesting info from header - headerByte1 := uint8(pkt[1]) - headerByte3 := uint8(pkt[3]) - tei := headerByte1 & 0x80 >> 7 - pusi := headerByte1 & 0x40 >> 6 - tp := headerByte1 & 0x20 >> 5 - tcs := headerByte3 & 0xc0 >> 6 - afc := headerByte3 & 0x30 >> 4 - cc := int(headerByte3 & 0x0f) - - if Flags&DumpPacketHeader != 0 { - fmt.Printf("\t\tTEI=%d, PUSI=%d, TP=%d, TSC=%d, AFC=%d, CC=%d\n", tei, pusi, tp, tcs, afc, cc) - } - // NB: don't misreport discontinuities when dumping program info - if Flags&DumpProgramInfo == 0 && DumpExpectCC != -1 && cc != DumpExpectCC { - discontinuities += 1 - fmt.Printf("Warning: Packet #%d continuity counter out of order! Got %d, expected %d.\n", - packetCount, cc, DumpExpectCC) - } - DumpExpectCC = (cc + 1) % 16 - - if afc == 3 { - // adaptation field, followed by payload - afl := adaptationfield.Length(pkt) - if adaptationfield.HasPCR(pkt) { - pcrBase, pcrExt, _ := mp2tGetPcr(pkt) - if Flags&DumpPacketHeader != 0 { - fmt.Printf("\t\tAFL=%d, PCRbase=%d, PCRext=%d\n", afl, pcrBase, pcrExt) - } - if pcrBase < DumpPcrBase { - fmt.Printf("Warning: PCRbase went backwards!\n") - } - DumpPcrBase = pcrBase - } else if Flags&DumpPacketHeader != 0 { - fmt.Printf("\t\tAFL=%d\n", afl) - } - } - if Flags&DumpPacketPayload != 0 { - fmt.Printf("\t\tPayload=%x\n", pkt) - } - - } - if Flags&DumpPacketStats != 0 { - fmt.Printf("%d packets of size %d bytes (%d bytes, %d discontinuites)\n", - packetCount, packet.PacketSize, packetCount*packet.PacketSize, discontinuities) + _ = readWriteVideo(*input, *output) + fmt.Printf("Trying again in 10s\n") + time.Sleep(1e10 * time.Nanosecond) } } -// fix continuity counter -func mp2tFixContinuity(pkt *[]byte, packetCount uint64, pid uint16) bool { - hasPayload, err := packet.ContainsPayload(*pkt) - if err != nil { - fmt.Printf("Warning: Packet #%d bad.\n", packetCount) - return false - } - if !hasPayload { - return false // nothing to do - } - if pktPid, _ := packet.Pid(*pkt); pktPid != pid { - return false // nothing to do - } - fixed := false - // extract continuity counter from 2nd nibble of 4th byte of header - afc := (*pkt)[3] & 0x30 >> 4 - cc := int((*pkt)[3] & 0xf) - if afc&0x01 == 1 { - if ExpectCC != -1 && cc != ExpectCC { - (*pkt)[3] = (*pkt)[3]&0xf0 | byte(ExpectCC) - fixed = true - } - ExpectCC = (cc + 1) % 16 - } - return fixed -} - -// RTP functions -const RtpHeaderSize = 12 -const RtpSSRC = 1 // any value seems to work -var RtpSequenceNum uint16 = 0 - -func rtpInit() { - // per spec, intialize RTP sequence number with a random number - RtpSequenceNum = uint16(rand.Intn(1 << 15)) - fmt.Printf("Initial RTP sequence number %d\n", RtpSequenceNum) -} - -func rtpEncapsulate(mp2tPacket []byte, pkt *[]byte) { - // RTP packet encapsulates the MP2T - // first 12 bytes is the header - // byte 0: version=2, padding=0, extension=0, cc=0 - (*pkt)[0] = 0x80 // version (2) - // byte 1: marker=0, pt = 33 (MP2T) - (*pkt)[1] = 33 - // bytes 2 & 3: sequence number - binary.BigEndian.PutUint16((*pkt)[2:4], RtpSequenceNum) - if RtpSequenceNum == ^uint16(0) { - RtpSequenceNum = 0 - } else { - RtpSequenceNum += 1 - } - // bytes 4,5,6&7: timestamp - timestamp := uint32(time.Now().UnixNano() * 1000) // ms timestamp - binary.BigEndian.PutUint32((*pkt)[4:8], timestamp) - // bytes 8,9,10&11: SSRC - binary.BigEndian.PutUint32((*pkt)[8:12], RtpSSRC) - - // payload follows - copy((*pkt)[RtpHeaderSize:RtpHeaderSize+RtpPackets*Mp2tPacketSize], mp2tPacket) -} - -// send a video clip in various ways: to a file, over http/udp/rtp, or dump to stdout -func sendClip(clip []byte, clipSize int, pid uint16, mode int, output string) { - switch mode { - case File: - fmt.Printf("Writing %s (%d bytes)\n", output, clipSize) - ff, err := os.Create(output) - check(err, "Error creating "+output) - ff.Write(clip[0:clipSize]) - ff.Close() - case Http: - url := output + strconv.Itoa(clipSize) // NB: append the size to the output - fmt.Printf("Posting %s (%d bytes)\n", url, clipSize) - resp, err := http.Post(url, "video/mp2t", bytes.NewBuffer(clip[0:clipSize])) - check(err, "Post to "+output+" failed") - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - fmt.Printf(string(body) + "\n") - case Udp: - size := UdpPackets * Mp2tPacketSize - if clipSize%size != 0 { - fmt.Printf("Warning: Bad clip size (%d bytes)\n", clipSize) - } - count := clipSize / size - fmt.Printf("Sending %d UDP packets of size %d (%d bytes)\n", count, size, clipSize) - pkt := make([]byte, size) - conn, err := net.Dial("udp", output) - check(err, "Error dialing "+output) - for offset := 0; offset < clipSize; offset += size { - copy(pkt, clip[offset:offset+size]) - _, err = conn.Write(pkt) - if err != nil { - fmt.Printf("UDP write error %s. Is your player listening?\n", err) - time.Sleep(ErrorSleep) - continue - } - } - conn.Close() - case Rtp: - size := RtpPackets * Mp2tPacketSize - if clipSize%size != 0 { - fmt.Printf("Warning: Bad clip size (%d bytes)\n", clipSize) - } - count := clipSize / size - fmt.Printf("Sending %d RTP packets of size %d (%d bytes)\n", - count, size+RtpHeaderSize, clipSize) - pkt := make([]byte, RtpHeaderSize+RtpPackets*Mp2tPacketSize) - conn, err := net.Dial("udp", output) // RTP uses UDP - check(err, "Error dialing "+output) - for offset := 0; offset < clipSize; offset += size { - rtpEncapsulate(clip[offset:offset+size], &pkt) - _, err = conn.Write(pkt) - if err != nil { - fmt.Printf("RTP write error %s. Is your player listening?\n", err) - time.Sleep(ErrorSleep) - continue - } - } - conn.Close() - case Dump: - fmt.Printf("Dumping clip (%d bytes)\n", clipSize) - mp2tDump(clip[0:clipSize], pid) - default: - log.Fatal("Invalid mode in sendClip: %d\n", mode) - } -} - -// does the real work -func process(input string, framerate int, pid uint16, mode int, output string) { - fmt.Printf("Reading H264 video from %s, pid %d\n", input, pid) +// readWriteVideo reads video from an RTSP stream (specified by the input URL) and +// rewrites the video in various formats and/or different protocols (HTTP, UDP or RTP). +func readWriteVideo(input string, output string) error { + fmt.Printf("Reading video from %s\n", input) var videoArg, audioArg [2]string - if Flags&(FilterFixPTS|Flags&FilterScale640|Flags&FilterScale320) == 0 { + if *flags&(filterFixPTS|*flags&filterScale640|*flags&filterScale320) == 0 { videoArg[0] = "-vcodec" videoArg[1] = "copy" } else { videoArg[0] = "-vf" videoArg[1] = "" - if Flags&FilterFixPTS != 0 { + if *flags&filterFixPTS != 0 { videoArg[1] += "setpts='PTS-STARTPTS'" // start counting PTS from zero } - if Flags&FilterScale640 != 0 { + if *flags&filterScale640 != 0 { videoArg[1] += ", scale=640:352" - } else if Flags&FilterScale320 != 0 { + } else if *flags&filterScale320 != 0 { videoArg[1] += ", scale=320:176" } } - if Flags&(FilterDropAudio) == 0 { + if *flags&(filterDropAudio) == 0 { audioArg[0] = "-acodec" audioArg[1] = "copy" } else { @@ -412,20 +187,20 @@ func process(input string, framerate int, pid uint16, mode int, output string) { } fmt.Printf("Executing: %s -r %d -i %s %s \"%s\" %s %s -f mpegts -\n", - FfmpegPath, framerate, input, videoArg[0], videoArg[1], audioArg[0], audioArg[1]) + ffmpegPath, *frameRate, input, videoArg[0], videoArg[1], audioArg[0], audioArg[1]) var cmd *exec.Cmd if audioArg[1] == "" { - cmd = exec.Command(FfmpegPath, - "-r", strconv.Itoa(framerate), + cmd = exec.Command(ffmpegPath, + "-r", strconv.Itoa(*frameRate), "-i", input, videoArg[0], videoArg[1], audioArg[0], "-f", "mpegts", "-") } else { - cmd = exec.Command(FfmpegPath, - "-r", strconv.Itoa(framerate), + cmd = exec.Command(ffmpegPath, + "-r", strconv.Itoa(*frameRate), "-i", input, videoArg[0], videoArg[1], audioArg[0], audioArg[1], @@ -433,115 +208,324 @@ func process(input string, framerate int, pid uint16, mode int, output string) { } stdout, err := cmd.StdoutPipe() - check(err, "Error creating pipe") - - err = cmd.Start() - check(err, "Error starting pipe") - - pkt := make([]byte, Mp2tPacketSize) - br := bufio.NewReader(stdout) - - // instantiate a Ticker that will send to its channel every second - everySecond := time.NewTicker(1 * time.Second) - - fmt.Printf("Looping\n") - clip := make([]byte, MaxPackets*Mp2tPacketSize) - clipSize := 0 - clipCount := 1 - var packetCount uint64 = 0 - send := false - var packetsPerFrame uint64 = UdpPackets - if mode == Rtp { - packetsPerFrame = RtpPackets + if err != nil { + fmt.Printf("Error creating pipe: %s\n", err) + return err } + err = cmd.Start() + if err != nil { + fmt.Printf("Error starting pipe: %s\n", err) + return err + } + + // (re)initialize globals + clipCount = 0 + expectCC = -1 + dumpCC = -1 + dumpPcrBase = 0 + RTPSequenceNum = uint16(rand.Intn(1 << 15)) + + // for UDP and RTP only dial once + var conn net.Conn + if strings.Index(output, "udp://") == 0 || strings.Index(output, "rtp://") == 0 { + conn, err = net.Dial("udp", output[6:]) + if err != nil { + fmt.Printf("Error dialing %s: %s\n", output, err) + return err + } + defer conn.Close() + } + + br := bufio.NewReader(stdout) + pkt := make([]byte, mp2tPacketSize) + clip := make([]byte, mp2tMaxPackets*mp2tPacketSize) + clipSize := 0 + packetCount := 0 + prevTime := time.Now() + fmt.Printf("Looping\n") for { - sz, err := io.ReadFull(br, pkt) + _, err := io.ReadFull(br, pkt) if err != nil { fmt.Printf("Error reading from ffmpeg: %s\n", err) - time.Sleep(ErrorSleep) + return err + + } + if *flags&filterFixContinuity != 0 && mp2tFixContinuity(pkt, packetCount, (uint16)(*selectedPid)) { + fmt.Printf("Packet #%d.%d fixed\n", clipCount, packetCount) + } + copy(clip[clipSize:], pkt) + packetCount++ + clipSize += mp2tPacketSize + + // send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame + if (packetCount == mp2tMaxPackets) || + (time.Now().Sub(prevTime) > 1*time.Second && packetCount%packetsPerFrame == 0) { + clipCount++ + if err = sendClip(clip[:clipSize], output, conn); err != nil { + return err + } + clipSize = 0 + packetCount = 0 + prevTime = time.Now() + } + } + return nil +} + +// sendClipToFile writes a video clip to a /tmp file. +func sendClipToFile(clip []byte, _ string, _ net.Conn) error { + filename := fmt.Sprintf("/tmp/vid%03d.ts", clipCount) + fmt.Printf("Writing %s (%d bytes)\n", filename, len(clip)) + err := ioutil.WriteFile(filename, clip, 0644) + if err != nil { + fmt.Printf("Error writing file %s: %s\n", filename, err) + return err + } + return nil +} + +// sendClipToHTPP posts a video clip via HTTP, using a new TCP connection each time. +func sendClipToHTTP(clip []byte, output string, _ net.Conn) error { + URL := output + strconv.Itoa(len(clip)) // NB: append the size to the output + fmt.Printf("Posting %s (%d bytes)\n", URL, len(clip)) + resp, err := http.Post(URL, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer + if err != nil { + fmt.Printf("Error posting to %s: %s\n", output, err) + return err + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err == nil { + fmt.Printf(string(body) + "\n") + } + return err +} + +// sendClipToUDP sends a video clip over UDP. +func sendClipToUDP(clip []byte, _ string, conn net.Conn) error { + size := UDPPackets * mp2tPacketSize + fmt.Printf("Sending %d UDP packets of size %d (%d bytes)\n", len(clip)/size, size, len(clip)) + pkt := make([]byte, size) + for offset := 0; offset < len(clip); offset += size { + copy(pkt, clip[offset:offset+size]) + _, err := conn.Write(pkt) + if err != nil { + fmt.Printf("UDP write error %s. Is your player listening?\n", err) + return err + + } + } + return nil +} + +// sendClipToRTP sends a video clip over RTP. +func sendClipToRTP(clip []byte, _ string, conn net.Conn) error { + size := RTPPackets * mp2tPacketSize + fmt.Printf("Sending %d RTP packets of size %d (%d bytes)\n", + len(clip)/size, size+RTPHeaderSize, len(clip)) + pkt := make([]byte, RTPHeaderSize+RTPPackets*mp2tPacketSize) + for offset := 0; offset < len(clip); offset += size { + RTPEncapsulate(clip[offset:offset+size], pkt) + _, err := conn.Write(pkt) + if err != nil { + fmt.Printf("RTP write error %s. Is your player listening?\n", err) + return err + } + } + return nil +} + +// sendClipToStdout dumps video stats to stdout. +func sendClipToStdout(clip []byte, _ string, _ net.Conn) error { + fmt.Printf("Dumping clip (%d bytes)\n", len(clip)) + + if *flags&dumpProgramInfo != 0 { + return mp2tDumpProgram(clip) + } + + packetCount := 0 + discontinuities := 0 + pkt := make([]byte, mp2tPacketSize) + var cc int + + for offset := 0; offset < len(clip); offset += mp2tPacketSize { + packetCount++ + copy(pkt, clip[offset:offset+mp2tPacketSize]) + + pktPid, err := packet.Pid(pkt) + if err != nil { + return err + } + if pktPid != (uint16)(*selectedPid) { continue } - if sz == Mp2tPacketSize { - if Flags&FilterFixContinuity != 0 { - mp2tFixContinuity(&pkt, packetCount, pid) - } - if packetCount == MaxPackets { - send = true - } - if send && packetCount%packetsPerFrame == 0 { - if mode == File { - output = fmt.Sprintf("/tmp/vid%03d.ts", clipCount) + if *flags&(dumpPacketHeader|dumpPacketPayload) != 0 { + fmt.Printf("Packet #%d.%d\n", clipCount, packetCount) + } + + hasPayload := pkt[3]&0x10 != 0 + if !hasPayload { + continue // nothing to do + } + + // extract interesting info from header + tei := pkt[1] & 0x80 >> 7 + pusi := pkt[1] & 0x40 >> 6 + tp := pkt[1] & 0x20 >> 5 + tcs := pkt[3] & 0xc0 >> 6 + afc := pkt[3] & 0x30 >> 4 + cc = int(pkt[3] & 0xf) + + if dumpCC != -1 && cc != dumpCC { + discontinuities++ + fmt.Printf("Warning: Packet #%d.%d continuity counter out of order! Got %d, expected %d.\n", + clipCount, packetCount, cc, dumpCC) + } + dumpCC = (cc + 1) % 16 + + if *flags&dumpPacketHeader != 0 { + fmt.Printf("\t\tTEI=%d, PUSI=%d, TP=%d, TSC=%d, AFC=%d, CC=%d\n", tei, pusi, tp, tcs, afc, cc) + } + + if afc == 3 { + // adaptation field, followed by payload + afl := adaptationfield.Length(pkt) + if adaptationfield.HasPCR(pkt) { + pcrBase, pcrExt, _ := mp2tGetPcr(pkt) + if *flags&dumpPacketHeader != 0 { + fmt.Printf("\t\tAFL=%d, PCRbase=%d, PCRext=%d\n", afl, pcrBase, pcrExt) } - sendClip(clip, clipSize, pid, mode, output) - clipCount += 1 - clipSize = 0 - packetCount = 0 - send = false + if pcrBase < dumpPcrBase { + fmt.Printf("Warning: PCRbase went backwards!\n") + } + dumpPcrBase = pcrBase + } else if *flags&dumpPacketHeader != 0 { + fmt.Printf("\t\tAFL=%d\n", afl) } - copy(clip[clipSize:], pkt) - packetCount += 1 - clipSize += Mp2tPacketSize - } else { - fmt.Printf("Warning: read short packet with %d bytes\n", sz) - continue + } + if *flags&dumpPacketPayload != 0 { + fmt.Printf("\t\tPayload=%x\n", pkt) } - select { - case <-everySecond.C: - send = (packetCount > 1) - default: - } + } + if *flags&dumpPacketStats != 0 { + fmt.Printf("%d packets of size %d bytes (%d bytes, %d discontinuites)\n", + packetCount, packet.PacketSize, packetCount*packet.PacketSize, discontinuities) + } + return nil +} +// mp2tDumpProgram dumps MPEG-TS Program Association Table (PAT) and Program Map Tables (PMT). +func mp2tDumpProgram(clip []byte) error { + reader := bufio.NewReader(bytes.NewReader(clip)) + + _, err := packet.Sync(reader) + if err != nil { + fmt.Println("Warning: Bad MPEG-TS sync byte") + return err + } + pat, err := psi.ReadPAT(reader) + if err != nil { + fmt.Printf("ReadPAT error: %s\n", err) + return err + } + mp2tDumpPat(pat) + + var pmts []psi.PMT + pm := pat.ProgramMap() + for pn, pid := range pm { + pmt, err := psi.ReadPMT(reader, pid) + if err != nil { + fmt.Printf("ReadPMT error: %s\n", err) + return err + } + pmts = append(pmts, pmt) + mp2tDumpPmt(pn, pmt) + } + return nil +} + +func mp2tDumpPat(pat psi.PAT) { + fmt.Printf("Pat\n") + fmt.Printf("\tPMT PIDs %v\n", pat.ProgramMap()) + fmt.Printf("\tNumber of Programs %v\n", pat.NumPrograms()) +} + +func mp2tDumpPmt(pn uint16, pmt psi.PMT) { + // pn = program number + fmt.Printf("Program #%v PMT\n", pn) + fmt.Printf("\tPIDs %v\n", pmt.Pids()) + fmt.Printf("\tElementary Streams") + for _, es := range pmt.ElementaryStreams() { + fmt.Printf("\t\tPid %v : StreamType %v\n", es.ElementaryPid(), es.StreamType()) + for _, d := range es.Descriptors() { + fmt.Printf("\t\t\t%+v\n", d) + } } } -func main() { - var input = flag.String("i", DefaultInput, "Input RTSP URL") - var output = flag.String("o", "", "Output URL (HTTP, UDP or RTP)") - var modeCh = flag.String("m", "u", "Mode: one of f,h,u,r or d") - var flags = flag.Int("f", 0, "Flags: see source code for explanation") - var pid = flag.Int("p", DefaultPid, "Show packets for this PID only") - var rate = flag.Int("r", DefaultFrameRate, "Input frame rate") - - flag.Parse() - - Flags = uint32(*flags) - - mode := Udp - switch *modeCh { - case "f": - mode = File - case "h": - mode = Http - if *output == "" { - *output = DefaultHttpOutput - } - case "u": - mode = Udp - if *output == "" { - *output = DefaultUdpOutput - } else if (*output)[0:6] == "udp://" { - *output = (*output)[6:] - } - case "r": - mode = Rtp - rtpInit() - if *output == "" { - *output = DefaultUdpOutput - } else if (*output)[0:6] == "rtp://" { - *output = (*output)[6:] - } - case "d": - mode = Dump - default: - log.Fatal("Invalid mode %s\n", *modeCh) +// Mp2tFixContinuity fixes discontinous MPEG-TS continuity counts (CC) +func mp2tFixContinuity(pkt []byte, packetCount int, pid uint16) bool { + hasPayload, err := packet.ContainsPayload(pkt) + if err != nil { + fmt.Printf("Warning: Packet #%d.%d bad.\n", clipCount, packetCount) + return false } - - if Flags&FilterFixContinuity != 0 && Flags&DumpProgramInfo != 0 { - log.Fatal("Cannot combine FilterFixContinuity and DumpProgramInfo flags\n") + if !hasPayload { + return false } - fmt.Printf("revid -s %s -r %d -f %d -p %d -m %s -o %s\n", *input, *rate, Flags, uint16(*pid), *modeCh, *output) - process(*input, *rate, uint16(*pid), mode, *output) + if pktPid, _ := packet.Pid(pkt); pktPid != pid { + return false + } + fixed := false + // extract continuity counter from 2nd nibble of 4th byte of header + cc := int(pkt[3] & 0xf) + if expectCC == -1 { + expectCC = cc + } else if cc != expectCC { + pkt[3] = pkt[3]&0xf0 | byte(expectCC&0xf) + fixed = true + } + expectCC = (expectCC + 1) % 16 + return fixed +} + +// Mp2tGetPcr extracts the Program Clock Reference (PCR) from an MPEG-TS packet (if any) +func mp2tGetPcr(pkt []byte) (uint64, uint32, bool) { + if !adaptationfield.HasPCR(pkt) { + return 0, 0, false + } + pcrBytes, _ := adaptationfield.PCR(pkt) // 6 bytes + // first 33 bits are PCR base, next 6 bits are reserved, final 9 bits are PCR extension. + pcrBase := uint64(binary.BigEndian.Uint32(pcrBytes[:4]))<<1 | uint64(pcrBytes[4]&0x80>>7) + pcrExt := uint32(pcrBytes[4]&0x01)<<1 | uint32(pcrBytes[5]) + return pcrBase, pcrExt, true +} + +// RTPEncapsulate encapsulates MPEG-TS packets within an RTP header, +// setting the payload type accordingly and incrementing the RTP sequence number. +func RTPEncapsulate(mp2tPacket []byte, pkt []byte) { + // RTP packet encapsulates the MP2T + // first 12 bytes is the header + // byte 0: version=2, padding=0, extension=0, cc=0 + pkt[0] = 0x80 // version (2) + // byte 1: marker=0, pt = 33 (MP2T) + pkt[1] = 33 + // bytes 2 & 3: sequence number + binary.BigEndian.PutUint16(pkt[2:4], RTPSequenceNum) + if RTPSequenceNum == ^uint16(0) { + RTPSequenceNum = 0 + } else { + RTPSequenceNum++ + } + // bytes 4,5,6&7: timestamp + timestamp := uint32(time.Now().UnixNano() / 1e6) // ms timestamp + binary.BigEndian.PutUint32(pkt[4:8], timestamp) + // bytes 8,9,10&11: SSRC + binary.BigEndian.PutUint32(pkt[8:12], RTPSSRC) + + // payload follows + copy(pkt[RTPHeaderSize:RTPHeaderSize+RTPPackets*mp2tPacketSize], mp2tPacket) }