/* NAME revid - a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. DESCRIPTION See Readme.md AUTHOR Alan Noble LICENSE revid is Copyright (C) 2017 Alan Noble. It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). */ // revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. package main import ( "bufio" "bytes" "crypto/md5" "encoding/binary" "encoding/hex" "flag" "fmt" "io/ioutil" "log" "math/rand" "net" "net/http" "os" "runtime" "strconv" "strings" "time" "io" "../packets" "bitbucket.org/ausocean/av/ringbuffer" "github.com/Comcast/gots/packet" "github.com/Comcast/gots/packet/adaptationfield" "github.com/Comcast/gots/psi" "github.com/beatgammit/rtsp" ) // defaults and networking consts const ( clipDuration = 1 // s defaultPID = 256 defaultFrameRate = 25 defaultHTTPOutput = "http://localhost:8080?" defaultUDPOutput = "udp://0.0.0.0:16384" defaultRTPOutput = "rtp://0.0.0.0:16384" mp2tPacketSize = 188 // MPEG-TS packet size mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000 udpPackets = 7 // # of UDP packets per ethernet frame (8 is the max) rtpPackets = 7 // # of RTP packets per ethernet frame (7 is the max) rtpHeaderSize = 12 rtpSSRC = 1 // any value will do bufferSize = 1000 / clipDuration bitrateOutputDelay = 60 // s httpTimeOut = 5 // s motionThreshold = "0.0025" qscale = "3" rtpPort = 17300 rtcpPort = 17319 rtspUrl = "rtsp://192.168.0.50:8554/CH002.sdp" rtpUrl = "rtsp://192.168.0.50:8554/CH002.sdp/track1" ) // flag values const ( filterFixPTS = 0x0001 filterDropAudio = 0x0002 filterScale640 = 0x0004 filterScale320 = 0x0008 filterFixContinuity = 0x0010 filterEdgeDetection = 0x0020 filterMotionDetect = 0x0040 filterRepacket = 0x0080 dumpProgramInfo = 0x0100 // 256 dumpPacketStats = 0x0200 // 512 dumpPacketHeader = 0x0400 // 1024 dumpPacketPayload = 0x0800 // 2048 ) // globals var ( sendClip = sendClipToRTP packetsPerFrame = rtpPackets clipCount int expectCC int dumpCC int dumpPCRBase uint64 rtpSequenceNum uint16 conn net.Conn ffmpegPath string tempDir string inputErrChan chan error outputErrChan chan error ringBuffer ringbuffer.RingBuffer ) // command-line flags var ( inputURL = flag.String("i", "", "Input RTSP URL") outputURL = flag.String("o", "", "Output URL (HTTP, UDP or RTP)") mode = flag.String("m", "r", "Mode: one of f,h,u,r or d") flags = flag.Int("f", 0, "Flags: see readme for explanation") frameRate = flag.Int("r", defaultFrameRate, "Input video frame rate (25fps by default)") selectedPID = flag.Int("p", defaultPID, "Select packets with this packet ID (PID)") ) func main() { setUpDirs() flag.Parse() if *inputURL == "" { log.Fatal("Input (-i) required\n") } switch *mode { case "f": sendClip = sendClipToFile case "h": sendClip = sendClipToHTTP if *outputURL == "" { *outputURL = defaultHTTPOutput } case "u": sendClip = sendClipToUDP packetsPerFrame = udpPackets if *outputURL == "" { *outputURL = defaultUDPOutput } case "r": sendClip = sendClipToRTP packetsPerFrame = rtpPackets if *outputURL == "" { *outputURL = defaultRTPOutput } case "d": //sendClip = sendClipToStdout default: log.Fatalf("Invalid mode %s\n", *mode) } if *flags&filterFixContinuity != 0 && *flags&dumpProgramInfo != 0 { log.Fatal("Cannot combine filterFixContinuity and dumpProgramInfo flags\n") } ringBuffer = ringbuffer.NewRingBuffer(bufferSize, mp2tPacketSize*mp2tMaxPackets) inputErrChan = make(chan error, 10) outputErrChan = make(chan error, 10) go input(*inputURL, *outputURL) go output(*outputURL) for { select { default: case err := <-inputErrChan: fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, "Trying again in 10s") time.Sleep(10 * time.Second) go input(*inputURL, *outputURL) case err := <-outputErrChan: fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, "Attempting to write again!") } } } // setUpDirs sets directories based on the OS that Revid is running on func setUpDirs() { switch runtime.GOOS { case "windows": ffmpegPath = "C:/ffmpeg/ffmpeg" tempDir = "tmp/" case "darwin": ffmpegPath = "/usr/local/bin/ffmpeg" tempDir = "/tmp/" default: ffmpegPath = "/home/$USER/bin/ffmpeg" tempDir = "/tmp/" } } // input handles the reading from the specified input func input(input string, output string) { fmt.Printf("Reading video from %s\n", input) // (re)initialize globals clipCount = 0 expectCC = -1 dumpCC = -1 dumpPCRBase = 0 rtpSequenceNum = uint16(rand.Intn(1 << 15)) // for UDP and RTP only dial once var err error if strings.HasPrefix(output, "udp://") || strings.HasPrefix(output, "rtp://") { conn, err = net.Dial("udp", output[6:]) if err != nil { inputErrChan <- err return } } sess := rtsp.NewSession() res, err := sess.Options(rtspUrl) if err != nil { log.Fatalln(err) } fmt.Println("Options:") fmt.Println(res) res, err = sess.Describe(rtspUrl) if err != nil { log.Fatalln(err) } fmt.Println("Describe:") fmt.Println(res) p, err := rtsp.ParseSdp(&io.LimitedReader{R: res.Body, N: res.ContentLength}) if err != nil { log.Fatalln(err) } log.Printf("%+v", p) fmt.Println("Setting up!") rtpPort, rtcpPort := 17300, 17319 res, err = sess.Setup(rtpUrl, fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort)) if err != nil { log.Fatalln(err) } log.Println(res) fmt.Println("Playing !") res, err = sess.Play(rtspUrl, res.Header.Get("Session")) if err != nil { log.Fatalln(err) } log.Println(res) // create udp connection for rtp stuff rtpLaddr, err := net.ResolveUDPAddr("udp", "192.168.0.109:17300") if err != nil { fmt.Println("Local rtp addr not set!") } rtpAddr, err := net.ResolveUDPAddr("udp", "192.168.0.50:17300") if err != nil { fmt.Println("Resolving rtp address didn't work!") } rtpConn, err := net.DialUDP("udp", rtpLaddr, rtpAddr) if err != nil { fmt.Println("Rtp dial didn't work!") } // Create udp connection for rtcp stuff rtcpLaddr, err := net.ResolveUDPAddr("udp", "192.168.0.109:17319") if err != nil { fmt.Println("Local ") } rtcpAddr, err := net.ResolveUDPAddr("udp", "192.168.0.50:17301") if err != nil { fmt.Println("resolving rtcp address didn't work!") } rtcpConn, err := net.DialUDP("udp", rtcpLaddr, rtcpAddr) if err != nil { fmt.Println("Rtcp dial didnt't work!") } rtpSession := packets.NewSession(rtpConn,rtcpConn) converter := packets.NewRtpToTsConverter() go func(){ for{ select { default: case aPacket := <-rtpSession.RtpChan: converter.InputChan<-aPacket } } }() go converter.Convert() clipSize := 0 packetCount := 0 now := time.Now() prevTime := now fmt.Printf("Looping\n") startPackets := [][]byte{ {71,64,17,16,0,66,240,65,0,1,193,0,0,255,1,255,0,1,252,128,48,72,46,1,6,70,70,109,112,101,103,37,115,116,114,101,97,109,101,100,32,98,121,32,116,104,101,32,71,101,111,86,105,115,105,111,110,32,82,116,115,112,32,83,101,114,118,101,114,99,176,214,195,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255}, {71,64,0,16,0,0,176,13,0,1,193,0,0,0,1,240,0,42,177,4,178,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255}, /*PMT*/{71,80,0,16, /*Start of payload*/ 0,2,176,18,0,1,193,0,0,255,255,240,0,27,225,0,240,0,193,91,65,224,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255,255}, } donePSI := false for { if clip, err := ringBuffer.Get(); err != nil { inputErrChan <- err return } else { ii := 0 for { upperBound := clipSize + mp2tPacketSize if ii < 3 && !donePSI { packetByteSlice := startPackets[ii] copy(clip[clipSize:upperBound],packetByteSlice) ii++ } else { donePSI = true fmt.Println("getting TS packet") packet := <-converter.TsChan packetByteSlice := packet.ToByteSlice() copy(clip[clipSize:upperBound],packetByteSlice) } //fmt.Println(clip[clipSize:upperBound]) packetCount++ clipSize += mp2tPacketSize // send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame now = time.Now() if (packetCount == mp2tMaxPackets) || (now.Sub(prevTime) > clipDuration*time.Second && packetCount%packetsPerFrame == 0) { clipCount++ if err := ringBuffer.DoneWriting(clipSize); err != nil { inputErrChan <- err return } clipSize = 0 packetCount = 0 prevTime = now break } } } } } // output handles the writing to specified output func output(output string) { elapsedTime := time.Duration(0) now := time.Now() prevTime := now for { if clip, err := ringBuffer.Read(); err == nil { now := time.Now() for err = sendClip(clip, output, conn); err != nil; { outputErrChan <- err err = sendClip(clip, output, conn) } deltaTime := now.Sub(prevTime) elapsedTime += deltaTime if elapsedTime > bitrateOutputDelay*time.Second { noOfBits := float64(len(clip)*8) / 1024.0 // convert bytes to kilobits fmt.Printf("Bitrate: %d kbps\n", int64(noOfBits/float64(deltaTime/1e9))) elapsedTime = time.Duration(0) } prevTime = now if err := ringBuffer.DoneReading(); err != nil { outputErrChan <- err } } } } // sendClipToFile writes a video clip to a /tmp file. func sendClipToFile(clip []byte, _ string, _ net.Conn) error { filename := fmt.Sprintf(tempDir+"vid%03d.ts", clipCount) fmt.Printf("Writing %s (%d bytes)\n", filename, len(clip)) err := ioutil.WriteFile(filename, clip, 0644) if err != nil { return fmt.Errorf("Error writing file %s: %s", filename, err) } return nil } // sendClipToHTPP posts a video clip via HTTP, using a new TCP connection each time. func sendClipToHTTP(clip []byte, output string, _ net.Conn) error { timeout := time.Duration(httpTimeOut * time.Second) client := http.Client{ Timeout: timeout, } hash := md5.Sum(clip) url := output + strconv.Itoa(len(clip)) + "." + hex.EncodeToString(hash[:]) // NB: append size.digest to output fmt.Printf("Posting %s (%d bytes)\n", url, len(clip)) resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer if err != nil { return fmt.Errorf("Error posting to %s: %s", output, err) } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err == nil { fmt.Printf("%s\n", body) } return err } // sendClipToUDP sends a video clip over UDP. func sendClipToUDP(clip []byte, _ string, conn net.Conn) error { size := udpPackets * mp2tPacketSize fmt.Printf("Sending %d UDP packets of size %d (%d bytes)\n", len(clip)/size, size, len(clip)) for offset := 0; offset < len(clip); offset += size { pkt := clip[offset : offset+size] _, err := conn.Write(pkt) if err != nil { return fmt.Errorf("UDP write error %s. Is your player listening?", err) } } return nil } // sendClipToRTP sends a video clip over RTP. func sendClipToRTP(clip []byte, _ string, conn net.Conn) error { size := rtpPackets * mp2tPacketSize fmt.Printf("Sending %d RTP packets of size %d (%d bytes)\n", len(clip)/size, size+rtpHeaderSize, len(clip)) pkt := make([]byte, rtpHeaderSize+rtpPackets*mp2tPacketSize) for offset := 0; offset < len(clip); offset += size { rtpEncapsulate(clip[offset:offset+size], pkt) _, err := conn.Write(pkt) if err != nil { return fmt.Errorf("RTP write error %s. Is your player listening?", err) } } return nil } // checkContinuityCounts checks that the continuity of the clip is correct func checkContinuityCounts(clip []byte) error { for offset := 0; offset < len(clip); offset += mp2tPacketSize { dumpCC = -1 pkt := clip[offset : offset+mp2tPacketSize] cc := int(pkt[3] & 0xf) if dumpCC != -1 && cc != dumpCC { return fmt.Errorf("Continuity count out of order. Expected %v, Got: %v.", dumpCC, cc) } dumpCC = (cc + 1) % 16 } return nil } // sendClipToStdout dumps video stats to stdout. func sendClipToStdout(clip []byte) error { fmt.Printf("Dumping clip (%d bytes)\n", len(clip)) /* if *flags&dumpProgramInfo != 0 { return mp2tDumpProgram(clip) } */ packetCount := 0 discontinuities := 0 var cc int for offset := 0; offset < len(clip); offset += mp2tPacketSize { packetCount++ pkt := clip[offset : offset+mp2tPacketSize] pktPID, err := packet.Pid(pkt) if err != nil { return err } if pktPID != uint16(*selectedPID) { continue } if *flags&(dumpPacketHeader|dumpPacketPayload) != 0 { fmt.Printf("Packet #%d.%d\n", clipCount, packetCount) } hasPayload := pkt[3]&0x10 != 0 if !hasPayload { continue // nothing to do } // extract interesting info from header tei := pkt[1] & 0x80 >> 7 pusi := pkt[1] & 0x40 >> 6 tp := pkt[1] & 0x20 >> 5 tcs := pkt[3] & 0xc0 >> 6 afc := pkt[3] & 0x30 >> 4 cc = int(pkt[3] & 0xf) di := pkt[5] & 0x80 if dumpCC != -1 && cc != dumpCC { discontinuities++ fmt.Printf("Warning: Packet #%d.%d continuity counter out of order! Got %d, expected %d.\n", clipCount, packetCount, cc, dumpCC) } dumpCC = (cc + 1) % 16 //if *flags&dumpPacketHeader != 0 { fmt.Printf("\t\tTEI=%d, PUSI=%d, TP=%d, TSC=%d, AFC=%d, CC=%d\n", tei, pusi, tp, tcs, afc, cc) //} if afc == 3 { // adaptation field, followed by payload afl := adaptationfield.Length(pkt) if adaptationfield.HasPCR(pkt) { pcrBase, pcrExt, _ := mp2tGetPCR(pkt) if *flags&dumpPacketHeader != 0 { fmt.Printf("\t\tAFL=%d, PCRbase=%d, PCRext=%d, DI=%v\n", afl, pcrBase, pcrExt, di) } if pcrBase < dumpPCRBase { fmt.Printf("Warning: PCRbase went backwards!\n") } dumpPCRBase = pcrBase } else if *flags&dumpPacketHeader != 0 { fmt.Printf("\t\tAFL=%d, DI=%v\n", afl, di) } } if *flags&dumpPacketPayload != 0 { fmt.Printf("\t\tPayload=%x\n", pkt) } } if *flags&dumpPacketStats != 0 { fmt.Printf("%d packets of size %d bytes (%d bytes, %d discontinuites)\n", packetCount, packet.PacketSize, packetCount*packet.PacketSize, discontinuities) } return nil } // mp2tDumpProgram dumps MPEG-TS Program Association Table (PAT) and Program Map Tables (PMT). func mp2tDumpProgram(clip []byte) error { // NB: Comcast API requires a buffered reader reader := bufio.NewReader(bytes.NewReader(clip)) _, err := packet.Sync(reader) if err != nil { return fmt.Errorf("Error reading sync byte: %s", err) } pat, err := psi.ReadPAT(reader) if err != nil { return fmt.Errorf("Error reading PAT: %s", err) } mp2tDumpPat(pat) var pmts []psi.PMT pm := pat.ProgramMap() for pn, pid := range pm { pmt, err := psi.ReadPMT(reader, pid) if err != nil { return fmt.Errorf("Error reading PMT: %s", err) } pmts = append(pmts, pmt) mp2tDumpPmt(pn, pmt) } return nil } func mp2tDumpPat(pat psi.PAT) { fmt.Printf("Pat\n") fmt.Printf("\tPMT PIDs %v\n", pat.ProgramMap()) fmt.Printf("\tNumber of Programs %v\n", pat.NumPrograms()) } func mp2tDumpPmt(pn uint16, pmt psi.PMT) { // pn = program number fmt.Printf("Program #%v PMT\n", pn) fmt.Printf("\tPIDs %v\n", pmt.Pids()) fmt.Printf("\tElementary Streams") for _, es := range pmt.ElementaryStreams() { fmt.Printf("\t\tPID %v : StreamType %v\n", es.ElementaryPid(), es.StreamType()) for _, d := range es.Descriptors() { fmt.Printf("\t\t\t%+v\n", d) } } } // Mp2tFixContinuity fixes discontinous MPEG-TS continuity counts (CC) func mp2tFixContinuity(pkt []byte, pid uint16) bool { hasPayload, err := packet.ContainsPayload(pkt) if err != nil { fmt.Printf("Warning: Packet bad.\n") return false } if !hasPayload { return false } if pktPID, _ := packet.Pid(pkt); pktPID != pid { return false } fixed := false // extract continuity counter from 2nd nibble of 4th byte of header cc := int(pkt[3] & 0xf) if expectCC == -1 { expectCC = cc } else if cc != expectCC { fmt.Println("Have to fix") pkt[3] = pkt[3]&0xf0 | byte(expectCC&0xf) fixed = true } expectCC = (expectCC + 1) % 16 return fixed } // Mp2tGetPCR extracts the Program Clock Reference (PCR) from an MPEG-TS packet (if any) func mp2tGetPCR(pkt []byte) (uint64, uint32, bool) { if !adaptationfield.HasPCR(pkt) { return 0, 0, false } pcrBytes, _ := adaptationfield.PCR(pkt) // 6 bytes // first 33 bits are PCR base, next 6 bits are reserved, final 9 bits are PCR extension. pcrBase := uint64(binary.BigEndian.Uint32(pcrBytes[:4]))<<1 | uint64(pcrBytes[4]&0x80>>7) pcrExt := uint32(pcrBytes[4]&0x01)<<1 | uint32(pcrBytes[5]) return pcrBase, pcrExt, true } // rtpEncapsulate encapsulates MPEG-TS packets within an RTP header, // setting the payload type accordingly (to 33) and incrementing the RTP sequence number. func rtpEncapsulate(mp2tPacket []byte, pkt []byte) { // RTP packet encapsulates the MP2T // first 12 bytes is the header // byte 0: version=2, padding=0, extension=0, cc=0 pkt[0] = 0x80 // version (2) // byte 1: marker=0, pt = 33 (MP2T) pkt[1] = 33 // bytes 2 & 3: sequence number binary.BigEndian.PutUint16(pkt[2:4], rtpSequenceNum) if rtpSequenceNum == ^uint16(0) { rtpSequenceNum = 0 } else { rtpSequenceNum++ } // bytes 4,5,6&7: timestamp timestamp := uint32(time.Now().UnixNano() / 1e6) // ms timestamp binary.BigEndian.PutUint32(pkt[4:8], timestamp) // bytes 8,9,10&11: SSRC binary.BigEndian.PutUint32(pkt[8:12], rtpSSRC) // payload follows copy(pkt[rtpHeaderSize:rtpHeaderSize+rtpPackets*mp2tPacketSize], mp2tPacket) }