diff --git a/camstreamer/camstreamer.go b/camstreamer/camstreamer.go new file mode 100644 index 00000000..b0910f41 --- /dev/null +++ b/camstreamer/camstreamer.go @@ -0,0 +1,65 @@ +package camstreamer + +import ( + "errors" +) + +type CamStreamer struct { + RtspUrl string + RtpUrl string + RtpPort uint16 + RtcpPort uint16 +} + +func (cs *CamStreamer)Connect()(session *RtpSession, err error){ + var res string + sess := rtsp.NewSession() + + if res, err = sess.Options(sc.RtspUrl); err != nil { + return + } + res, err = sess.Describe(rtspUrl) + if err != nil { + log.Fatalln(err) + } + p, err := rtsp.ParseSdp(&io.LimitedReader{R: res.Body, N: res.ContentLength}) + if err != nil { + } + log.Printf("%+v", p) + res, err = sess.Setup(rtpUrl, fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort)) + if err != nil { + } + log.Println(res) + res, err = sess.Play(rtspUrl, res.Header.Get("Session")) + if err != nil { + } + log.Println(res) + // create udp connection for rtp stuff + rtpLaddr, err := net.ResolveUDPAddr("udp", "192.168.0.109:17300") + if err != nil { + t.Errorf("Local rtp addr not set! %v\n", err) + } + rtpAddr, err := net.ResolveUDPAddr("udp", "192.168.0.50:17300") + if err != nil { + t.Errorf("Resolving rtp address didn't work! %v\n", err) + } + rtpConn, err := net.DialUDP("udp", rtpLaddr, rtpAddr) + if err != nil { + t.Errorf("Conncection not established! %v\n", err) + } + // Create udp connection for rtcp stuff + rtcpLaddr, err := net.ResolveUDPAddr("udp", "192.168.0.109:17319") + if err != nil { + t.Errorf("Local RTCP address not resolved! %v\n", err) + } + rtcpAddr, err := net.ResolveUDPAddr("udp", "192.168.0.50:17301") + if err != nil { + t.Errorf("Remote RTCP address not resolved! %v\n", err) + } + rtcpConn, err := net.DialUDP("udp", rtcpLaddr, rtcpAddr) + if err != nil { + t.Errorf("Connection not established! %v\n", err) + } + // let's create a session that will store useful stuff from the connections + rtpSession := NewSession(rtpConn, rtcpConn) +} diff --git a/h264extractor/H264Extractor.go b/h264/H264Writer.go similarity index 100% rename from h264extractor/H264Extractor.go rename to h264/H264Writer.go diff --git a/h264/h264Parser.go b/h264/h264Parser.go new file mode 100644 index 00000000..b5e667c1 --- /dev/null +++ b/h264/h264Parser.go @@ -0,0 +1 @@ +package h264 diff --git a/revid/revid.go b/revid/revid.go deleted file mode 100644 index 56036f2d..00000000 --- a/revid/revid.go +++ /dev/null @@ -1,624 +0,0 @@ -/* -NAME - revid - a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. - -DESCRIPTION - See Readme.md - -AUTHOR - Alan Noble - -LICENSE - revid is Copyright (C) 2017 Alan Noble. - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). -*/ - -// revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. -package main - -import ( - "bufio" - "bytes" - "crypto/md5" - "encoding/binary" - "encoding/hex" - "flag" - "fmt" - "io" - "io/ioutil" - "log" - "math/rand" - "net" - "net/http" - "os" - "os/exec" - "runtime" - "strconv" - "strings" - "time" - - "bitbucket.org/ausocean/av/ringbuffer" - - "github.com/Comcast/gots/packet" - "github.com/Comcast/gots/packet/adaptationfield" - "github.com/Comcast/gots/psi" -) - -// defaults and networking consts -const ( - clipDuration = 1 // s - defaultPID = 256 - defaultFrameRate = 25 - defaultHTTPOutput = "http://localhost:8080?" - defaultUDPOutput = "udp://0.0.0.0:16384" - defaultRTPOutput = "rtp://0.0.0.0:16384" - mp2tPacketSize = 188 // MPEG-TS packet size - mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000 - udpPackets = 7 // # of UDP packets per ethernet frame (8 is the max) - rtpPackets = 7 // # of RTP packets per ethernet frame (7 is the max) - rtpHeaderSize = 12 - rtpSSRC = 1 // any value will do - bufferSize = 1000 / clipDuration - bitrateOutputDelay = 60 // s - httpTimeOut = 5 // s - motionThreshold = "0.0025" - qscale = "3" -) - -// flag values -const ( - filterFixPTS = 0x0001 - filterDropAudio = 0x0002 - filterScale640 = 0x0004 - filterScale320 = 0x0008 - filterFixContinuity = 0x0010 - filterEdgeDetection = 0x0020 - filterMotionDetect = 0x0040 - filterRepacket = 0x0080 - dumpProgramInfo = 0x0100 // 256 - dumpPacketStats = 0x0200 // 512 - dumpPacketHeader = 0x0400 // 1024 - dumpPacketPayload = 0x0800 // 2048 -) - -// globals -var ( - sendClip = sendClipToRTP - packetsPerFrame = rtpPackets - clipCount int - expectCC int - dumpCC int - dumpPCRBase uint64 - rtpSequenceNum uint16 - conn net.Conn - ffmpegPath string - tempDir string - inputErrChan chan error - outputErrChan chan error - ringBuffer ringbuffer.RingBuffer -) - -// command-line flags -var ( - inputURL = flag.String("i", "", "Input RTSP URL") - outputURL = flag.String("o", "", "Output URL (HTTP, UDP or RTP)") - mode = flag.String("m", "r", "Mode: one of f,h,u,r or d") - flags = flag.Int("f", 0, "Flags: see readme for explanation") - frameRate = flag.Int("r", defaultFrameRate, "Input video frame rate (25fps by default)") - selectedPID = flag.Int("p", defaultPID, "Select packets with this packet ID (PID)") -) - -func main() { - setUpDirs() - flag.Parse() - - if *inputURL == "" { - log.Fatal("Input (-i) required\n") - } - - switch *mode { - case "f": - sendClip = sendClipToFile - case "h": - sendClip = sendClipToHTTP - if *outputURL == "" { - *outputURL = defaultHTTPOutput - } - case "u": - sendClip = sendClipToUDP - packetsPerFrame = udpPackets - if *outputURL == "" { - *outputURL = defaultUDPOutput - } - case "r": - sendClip = sendClipToRTP - packetsPerFrame = rtpPackets - if *outputURL == "" { - *outputURL = defaultRTPOutput - } - case "d": - sendClip = sendClipToStdout - default: - log.Fatalf("Invalid mode %s\n", *mode) - } - - if *flags&filterFixContinuity != 0 && *flags&dumpProgramInfo != 0 { - log.Fatal("Cannot combine filterFixContinuity and dumpProgramInfo flags\n") - } - - ringBuffer = ringbuffer.NewRingBuffer(bufferSize, mp2tPacketSize*mp2tMaxPackets) - inputErrChan = make(chan error, 10) - outputErrChan = make(chan error, 10) - - go input(*inputURL, *outputURL) - fmt.Println(*outputURL) - go output(*outputURL) - - for { - select { - default: - case err := <-inputErrChan: - fmt.Fprintln(os.Stderr, err) - fmt.Fprintln(os.Stderr, "Trying again in 10s") - time.Sleep(10 * time.Second) - go input(*inputURL, *outputURL) - case err := <-outputErrChan: - fmt.Fprintln(os.Stderr, err) - fmt.Fprintln(os.Stderr, "Attempting to write again!") - } - } -} - -// setUpDirs sets directories based on the OS that Revid is running on -func setUpDirs() { - switch runtime.GOOS { - case "windows": - ffmpegPath = "C:/ffmpeg/ffmpeg" - tempDir = "tmp/" - case "darwin": - ffmpegPath = "/usr/local/bin/ffmpeg" - tempDir = "/tmp/" - default: - ffmpegPath = "/home/saxon/bin/ffmpeg" - tempDir = "/tmp/" - } -} - -// input handles the reading from the specified input -func input(input string, output string) { - fmt.Printf("Reading video from %s\n", input) - // (re)initialize globals - clipCount = 0 - expectCC = -1 - dumpCC = -1 - dumpPCRBase = 0 - rtpSequenceNum = uint16(rand.Intn(1 << 15)) - - // for UDP and RTP only dial once - var err error - if strings.HasPrefix(output, "udp://") || strings.HasPrefix(output, "rtp://") { - conn, err = net.Dial("udp", output[6:]) - if err != nil { - inputErrChan <- err - return - } - } - - var args []string - switch { - case *flags&filterEdgeDetection != 0: - args = []string{ - "-r", "25", - "-i", input, - "-pix_fmt", "gray", - "-vf", "edgedetect", - "-f", "mpegts", "-", - } - case *flags&filterMotionDetect != 0: - args = []string{ - "-i", input, - "-q:v", qscale, - "-vf", "select=gt(scene\\," + motionThreshold + "),setpts=N/(FRAME_RATE*TB)", - "-f", "mpegts", "-", - } - default: - args = []string{ - "-r", strconv.Itoa(*frameRate), - "-i", input, - } - - if *flags&(filterFixPTS|filterScale640|filterScale320) == 0 { - args = append(args, "-vcodec", "copy") - } else { - vfArg := []string{} - - if *flags&filterFixPTS != 0 { - vfArg = append(vfArg, "setpts='PTS-STARTPTS'") // start counting PTS from zero - } - if *flags&filterScale640 != 0 { - vfArg = append(vfArg, "scale=640:352") - } else if *flags&filterScale320 != 0 { - vfArg = append(vfArg, "scale=320:176") - } - args = append(args, "-vf", strings.Join(vfArg, ",")) - } - - if *flags&filterDropAudio == 0 { - args = append(args, "-acodec", "copy") - } else { - args = append(args, "-an") - } - args = append(args, "-f", "mpegts", "-") - } - - fmt.Printf("Executing: %s %s\n", ffmpegPath, strings.Join(args, " ")) - cmd := exec.Command(ffmpegPath, args...) - stdout, _ := cmd.StdoutPipe() - err = cmd.Start() - br := bufio.NewReader(stdout) - - if err != nil { - inputErrChan <- err - return - } - - clipSize := 0 - packetCount := 0 - now := time.Now() - prevTime := now - fmt.Printf("Looping\n") - for { - if clip, err := ringBuffer.Get(); err != nil { - inputErrChan <- err - return - } else { - for { - upperBound := clipSize + mp2tPacketSize - for _, err := io.ReadFull(br, clip[clipSize:upperBound]); err != nil; { - if err.Error() != "EOF" { - inputErrChan <- err - return - } - _, err = io.ReadFull(br, clip[clipSize:upperBound]) - } - - - if *flags&filterFixContinuity != 0 && mp2tFixContinuity(clip[clipSize:upperBound], uint16(*selectedPID)) { - fmt.Printf("Packet #%d.%d fixed\n", clipCount, packetCount) - } - - packetCount++ - clipSize += mp2tPacketSize - - // send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame - now = time.Now() - if (packetCount == mp2tMaxPackets) || - (now.Sub(prevTime) > clipDuration*time.Second && packetCount%packetsPerFrame == 0) { - clipCount++ - if err := ringBuffer.DoneWriting(clipSize); err != nil { - inputErrChan <- err - return - } - clipSize = 0 - packetCount = 0 - prevTime = now - break - } - } - } - } -} - -// output handles the writing to specified output -func output(output string) { - elapsedTime := time.Duration(0) - now := time.Now() - prevTime := now - for { - if clip, err := ringBuffer.Read(); err == nil { - now := time.Now() - fmt.Println(clip) - for err = sendClip(clip, output, conn); err != nil; { - outputErrChan <- err - err = sendClip(clip, output, conn) - } - deltaTime := now.Sub(prevTime) - elapsedTime += deltaTime - if elapsedTime > bitrateOutputDelay*time.Second { - noOfBits := float64(len(clip)*8) / 1024.0 // convert bytes to kilobits - fmt.Printf("Bitrate: %d kbps\n", int64(noOfBits/float64(deltaTime/1e9))) - elapsedTime = time.Duration(0) - } - prevTime = now - if err := ringBuffer.DoneReading(); err != nil { - outputErrChan <- err - } - } - } -} - -// sendClipToFile writes a video clip to a /tmp file. -func sendClipToFile(clip []byte, _ string, _ net.Conn) error { - filename := fmt.Sprintf(tempDir+"vid%03d.ts", clipCount) - fmt.Printf("Writing %s (%d bytes)\n", filename, len(clip)) - err := ioutil.WriteFile(filename, clip, 0644) - if err != nil { - return fmt.Errorf("Error writing file %s: %s", filename, err) - } - return nil -} - -// sendClipToHTPP posts a video clip via HTTP, using a new TCP connection each time. -func sendClipToHTTP(clip []byte, output string, _ net.Conn) error { - timeout := time.Duration(httpTimeOut * time.Second) - client := http.Client{ - Timeout: timeout, - } - hash := md5.Sum(clip) - url := output + strconv.Itoa(len(clip)) + "." + hex.EncodeToString(hash[:]) // NB: append size.digest to output - fmt.Printf("Posting %s (%d bytes)\n", url, len(clip)) - resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer - if err != nil { - return fmt.Errorf("Error posting to %s: %s", output, err) - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err == nil { - fmt.Printf("%s\n", body) - } - return err -} - -// sendClipToUDP sends a video clip over UDP. -func sendClipToUDP(clip []byte, _ string, conn net.Conn) error { - size := udpPackets * mp2tPacketSize - fmt.Printf("Sending %d UDP packets of size %d (%d bytes)\n", len(clip)/size, size, len(clip)) - for offset := 0; offset < len(clip); offset += size { - pkt := clip[offset : offset+size] - _, err := conn.Write(pkt) - if err != nil { - return fmt.Errorf("UDP write error %s. Is your player listening?", err) - } - } - return nil -} - -// sendClipToRTP sends a video clip over RTP. -func sendClipToRTP(clip []byte, _ string, conn net.Conn) error { - size := rtpPackets * mp2tPacketSize - fmt.Printf("Sending %d RTP packets of size %d (%d bytes)\n", - len(clip)/size, size+rtpHeaderSize, len(clip)) - pkt := make([]byte, rtpHeaderSize+rtpPackets*mp2tPacketSize) - for offset := 0; offset < len(clip); offset += size { - rtpEncapsulate(clip[offset:offset+size], pkt) - _, err := conn.Write(pkt) - if err != nil { - return fmt.Errorf("RTP write error %s. Is your player listening?", err) - } - } - return nil -} - -// checkContinuityCounts checks that the continuity of the clip is correct -func checkContinuityCounts(clip []byte) error { - for offset := 0; offset < len(clip); offset += mp2tPacketSize { - dumpCC = -1 - pkt := clip[offset : offset+mp2tPacketSize] - cc := int(pkt[3] & 0xf) - if dumpCC != -1 && cc != dumpCC { - return fmt.Errorf("Continuity count out of order. Expected %v, Got: %v.", dumpCC, cc) - } - dumpCC = (cc + 1) % 16 - } - return nil -} - -// sendClipToStdout dumps video stats to stdout. -func sendClipToStdout(clip []byte, _ string, _ net.Conn) error { - fmt.Printf("Dumping clip (%d bytes)\n", len(clip)) -/* - if *flags&dumpProgramInfo != 0 { - return mp2tDumpProgram(clip) - } - */ - - packetCount := 0 - discontinuities := 0 - var cc int - - for offset := 0; offset < len(clip); offset += mp2tPacketSize { - packetCount++ - pkt := clip[offset : offset+mp2tPacketSize] - - //pktPID, err := packet.Pid(pkt) - //fmt.Printf("pktID: %v\n", pkt) - //if err != nil { - //return err - //} - /* - if pktPID != uint16(*selectedPID) { - continue - } - */ - - if *flags&(dumpPacketHeader|dumpPacketPayload) != 0 { - fmt.Printf("Packet #%d.%d\n", clipCount, packetCount) - } - - hasPayload := pkt[3]&0x10 != 0 - if !hasPayload { - continue // nothing to do - } - - // extract interesting info from header - tei := pkt[1] & 0x80 >> 7 - pusi := pkt[1] & 0x40 >> 6 - tp := pkt[1] & 0x20 >> 5 - tcs := pkt[3] & 0xc0 >> 6 - afc := pkt[3] & 0x30 >> 4 - cc = int(pkt[3] & 0xf) - di := pkt[5] & 0x80 - - if dumpCC != -1 && cc != dumpCC { - discontinuities++ - fmt.Printf("Warning: Packet #%d.%d continuity counter out of order! Got %d, expected %d.\n", - clipCount, packetCount, cc, dumpCC) - } - dumpCC = (cc + 1) % 16 - - //if *flags&dumpPacketHeader != 0 { - fmt.Printf("\t\tTEI=%d, PUSI=%d, TP=%d, TSC=%d, AFC=%d, CC=%d\n", tei, pusi, tp, tcs, afc, cc) - //} - - if afc == 3 { - // adaptation field, followed by payload - afl := adaptationfield.Length(pkt) - if adaptationfield.HasPCR(pkt) { - pcrBase, pcrExt, _ := mp2tGetPCR(pkt) - if *flags&dumpPacketHeader != 0 { - fmt.Printf("\t\tAFL=%d, PCRbase=%d, PCRext=%d, DI=%v\n", afl, pcrBase, pcrExt, di) - } - if pcrBase < dumpPCRBase { - fmt.Printf("Warning: PCRbase went backwards!\n") - } - dumpPCRBase = pcrBase - } else if *flags&dumpPacketHeader != 0 { - fmt.Printf("\t\tAFL=%d, DI=%v\n", afl, di) - } - } - if *flags&dumpPacketPayload != 0 { - fmt.Printf("\t\tPayload=%x\n", pkt) - } - - } - if *flags&dumpPacketStats != 0 { - fmt.Printf("%d packets of size %d bytes (%d bytes, %d discontinuites)\n", - packetCount, packet.PacketSize, packetCount*packet.PacketSize, discontinuities) - } - return nil -} - -// mp2tDumpProgram dumps MPEG-TS Program Association Table (PAT) and Program Map Tables (PMT). -func mp2tDumpProgram(clip []byte) error { - // NB: Comcast API requires a buffered reader - reader := bufio.NewReader(bytes.NewReader(clip)) - - _, err := packet.Sync(reader) - if err != nil { - return fmt.Errorf("Error reading sync byte: %s", err) - } - pat, err := psi.ReadPAT(reader) - if err != nil { - return fmt.Errorf("Error reading PAT: %s", err) - } - mp2tDumpPat(pat) - - var pmts []psi.PMT - pm := pat.ProgramMap() - for pn, pid := range pm { - pmt, err := psi.ReadPMT(reader, pid) - if err != nil { - return fmt.Errorf("Error reading PMT: %s", err) - } - pmts = append(pmts, pmt) - mp2tDumpPmt(pn, pmt) - } - return nil -} - -func mp2tDumpPat(pat psi.PAT) { - fmt.Printf("Pat\n") - fmt.Printf("\tPMT PIDs %v\n", pat.ProgramMap()) - fmt.Printf("\tNumber of Programs %v\n", pat.NumPrograms()) -} - -func mp2tDumpPmt(pn uint16, pmt psi.PMT) { - // pn = program number - fmt.Printf("Program #%v PMT\n", pn) - fmt.Printf("\tPIDs %v\n", pmt.Pids()) - fmt.Printf("\tElementary Streams") - for _, es := range pmt.ElementaryStreams() { - fmt.Printf("\t\tPID %v : StreamType %v\n", es.ElementaryPid(), es.StreamType()) - for _, d := range es.Descriptors() { - fmt.Printf("\t\t\t%+v\n", d) - } - } -} - -// Mp2tFixContinuity fixes discontinous MPEG-TS continuity counts (CC) -func mp2tFixContinuity(pkt []byte, pid uint16) bool { - - hasPayload, err := packet.ContainsPayload(pkt) - if err != nil { - fmt.Printf("Warning: Packet bad.\n") - return false - } - if !hasPayload { - return false - } - if pktPID, _ := packet.Pid(pkt); pktPID != pid { - return false - } - fixed := false - // extract continuity counter from 2nd nibble of 4th byte of header - cc := int(pkt[3] & 0xf) - if expectCC == -1 { - expectCC = cc - } else if cc != expectCC { - fmt.Println("Have to fix") - pkt[3] = pkt[3]&0xf0 | byte(expectCC&0xf) - fixed = true - } - expectCC = (expectCC + 1) % 16 - return fixed -} - -// Mp2tGetPCR extracts the Program Clock Reference (PCR) from an MPEG-TS packet (if any) -func mp2tGetPCR(pkt []byte) (uint64, uint32, bool) { - if !adaptationfield.HasPCR(pkt) { - return 0, 0, false - } - pcrBytes, _ := adaptationfield.PCR(pkt) // 6 bytes - // first 33 bits are PCR base, next 6 bits are reserved, final 9 bits are PCR extension. - pcrBase := uint64(binary.BigEndian.Uint32(pcrBytes[:4]))<<1 | uint64(pcrBytes[4]&0x80>>7) - pcrExt := uint32(pcrBytes[4]&0x01)<<1 | uint32(pcrBytes[5]) - return pcrBase, pcrExt, true -} - -// rtpEncapsulate encapsulates MPEG-TS packets within an RTP header, -// setting the payload type accordingly (to 33) and incrementing the RTP sequence number. -func rtpEncapsulate(mp2tPacket []byte, pkt []byte) { - // RTP packet encapsulates the MP2T - // first 12 bytes is the header - // byte 0: version=2, padding=0, extension=0, cc=0 - pkt[0] = 0x80 // version (2) - // byte 1: marker=0, pt = 33 (MP2T) - pkt[1] = 33 - // bytes 2 & 3: sequence number - binary.BigEndian.PutUint16(pkt[2:4], rtpSequenceNum) - if rtpSequenceNum == ^uint16(0) { - rtpSequenceNum = 0 - } else { - rtpSequenceNum++ - } - // bytes 4,5,6&7: timestamp - timestamp := uint32(time.Now().UnixNano() / 1e6) // ms timestamp - binary.BigEndian.PutUint32(pkt[4:8], timestamp) - // bytes 8,9,10&11: SSRC - binary.BigEndian.PutUint32(pkt[8:12], rtpSSRC) - - // payload follows - copy(pkt[rtpHeaderSize:rtpHeaderSize+rtpPackets*mp2tPacketSize], mp2tPacket) -} diff --git a/revid/revid_test.go b/revid/revid_test.go index 97c6445f..ff1525b7 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -134,3 +134,10 @@ func TestRTP(t *testing.T) { fmt.Printf("RTP packet: %v\n", rtpPacket) } } + +/* + Testing use with raspivid +*/ +func TestRaspividInput(t *testing.T){ + +} diff --git a/revid/revidinstance.go b/revid/revidinstance.go new file mode 100644 index 00000000..1df058e6 --- /dev/null +++ b/revid/revidinstance.go @@ -0,0 +1,271 @@ +/* +NAME + revid - a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. + +DESCRIPTION + See Readme.md + +AUTHORS + Alan Noble + Saxon A. Nelson-Milton + +LICENSE + revid is Copyright (C) 2017 Alan Noble. + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +// revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. +package revid + +// TODO: do error handling using logger + + +import ( + "bufio" + "bytes" + "crypto/md5" + "encoding/binary" + "encoding/hex" + "flag" + "fmt" + "io" + "io/ioutil" + "log" + "math/rand" + "net" + "net/http" + "os" + "os/exec" + "runtime" + "strconv" + "strings" + "time" + "errors" + "log" + "/tools" + "ioutil" + + "bitbucket.org/ausocean/av/ringbuffer" + + "github.com/Comcast/gots/packet" + "github.com/Comcast/gots/packet/adaptationfield" + "github.com/Comcast/gots/psi" +) + +// defaults and networking consts +const ( + clipDuration = 1 // s + defaultPID = 256 + defaultFrameRate = 25 + defaultHTTPOutput = "http://localhost:8080?" + defaultUDPOutput = "udp://0.0.0.0:16384" + defaultRTPOutput = "rtp://0.0.0.0:16384" + mp2tPacketSize = 188 // MPEG-TS packet size + mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000 + udpPackets = 7 // # of UDP packets per ethernet frame (8 is the max) + rtpPackets = 7 // # of RTP packets per ethernet frame (7 is the max) + rtpHeaderSize = 12 + rtpSSRC = 1 // any value will do + bufferSize = 1000 / clipDuration + bitrateOutputDelay = 60 // s + httpTimeOut = 5 // s + motionThreshold = "0.0025" + qscale = "3" +) + +type RevidInst interface { + Run() + Stop() + ChangeState(newConfig Config) err +} + +const ( + // input types + raspivid = 0 + rtp = 1 + // input format + h264 = 2 + rtp = 3 + // output types + file = 4 + http = 5 +) + +type Config struct { + input uint8 + output uint8 +} + +type revidInst struct { + clipCount int + expectCC int + dumpCC int + dumpPCRBase uint64 + conn net.Conn + ffmpegPath string + tempDir string + ringBuffer ringbuffer.RingBuffer + Config Config + isRunning bool + Error *log.Logger +} + +func NewRevidInstance(config Config)(r *revid, err error){ + ringBuffer = ringbuffer.NewRingBuffer(bufferSize, mp2tPacketSize*mp2tMaxPackets) + Error = log.New(os.Stderr, "ERROR: ",log.Ldat|log.Ltime|log.Lshortfile) +} + +func(r *revidInst) Run(){ + isRunning = true + go r.input() + go r.output() +} + +func(r *revidInst) Stop(){ + r.isRunning = false +} + +// input handles the reading from the specified input +func (r *revid)input(input string, output string) { + // (re)initialize globals + clipCount = 0 + expectCC = -1 + dumpCC = -1 + dumpPCRBase = 0 + + converter := tscreator.NewTsCreator(framesPerSec) + go converter.Convert() + h264Parser := h264.NewH264Parser(converter.NalInputChan) + go h264Parser.Parse() + + var err error + var inputBuffer *Reader + switch(r.config.input){ + case raspivid: + cmd := exec.Command("raspivid -o -") + stdout, _ := cmd.StdoutPipe() + err = cmd.Start() + inputReader = bufio.NewReader(stdout) + if err != nil { + r.Error.Println(err.Error()) + return + } + default: + r.Error.Println("Input not valid!") + } + clipSize := 0 + packetCount := 0 + now := time.Now() + prevTime := now + for r.isRunning { + var h264Data []byte + if h264Data, err = ioutil.ReadAll(inputReader); err != nil { + r.Error.Println(err.Error()) + } + h264Parser.SendInputData(h264Data) + + if clip, err := ringBuffer.Get(); err != nil { + r.Error.Println(err.Error()) + return + } else { + for { + upperBound := clipSize + mp2tPacketSize + // get ts packet from converter + clip = (<-converter.TsChan).ToByteSlice() + packetCount++ + clipSize += mp2tPacketSize + // send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame + now = time.Now() + if (packetCount == mp2tMaxPackets) || + (now.Sub(prevTime) > clipDuration*time.Second && packetCount%packetsPerFrame == 0) { + clipCount++ + if err := ringBuffer.DoneWriting(clipSize); err != nil { + r.Error.Println(err.Error()) + return + } + clipSize = 0 + packetCount = 0 + prevTime = now + break + } + } + } + } +} + +// output handles the writing to specified output +func (r *revid)output(output string) { + for r.isRunning { + if clip, err := ringBuffer.Read(); err == nil { + fmt.Println(clip) + for err = sendClip(clip, output, conn); err != nil; { + r.Error.Println(err.Error()) + err = sendClip(clip, output, conn) + } + if err := ringBuffer.DoneReading(); err != nil { + r.Error.Println(err.Error()) + } + } + } +} + +// sendClipToFile writes a video clip to a /tmp file. +func sendClipToFile(clip []byte, _ string, _ net.Conn) error { + filename := fmt.Sprintf(tempDir+"vid%03d.ts", clipCount) + fmt.Printf("Writing %s (%d bytes)\n", filename, len(clip)) + err := ioutil.WriteFile(filename, clip, 0644) + if err != nil { + return fmt.Errorf("Error writing file %s: %s", filename, err) + } + return nil +} + +// sendClipToHTPP posts a video clip via HTTP, using a new TCP connection each time. +func sendClipToHTTP(clip []byte, output string, _ net.Conn) error { + timeout := time.Duration(httpTimeOut * time.Second) + client := http.Client{ + Timeout: timeout, + } + hash := md5.Sum(clip) + url := output + strconv.Itoa(len(clip)) + "." + hex.EncodeToString(hash[:]) // NB: append size.digest to output + fmt.Printf("Posting %s (%d bytes)\n", url, len(clip)) + resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer + if err != nil { + return fmt.Errorf("Error posting to %s: %s", output, err) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err == nil { + fmt.Printf("%s\n", body) + } + return err +} + +// sendClipToUDP sends a video clip over UDP. +func sendClipToUDP(clip []byte, _ string, conn net.Conn) error { + size := udpPackets * mp2tPacketSize + fmt.Printf("Sending %d UDP packets of size %d (%d bytes)\n", len(clip)/size, size, len(clip)) + for offset := 0; offset < len(clip); offset += size { + pkt := clip[offset : offset+size] + _, err := conn.Write(pkt) + if err != nil { + return fmt.Errorf("UDP write error %s. Is your player listening?", err) + } + } + return nil +} +} + } +} diff --git a/revid/tools/tsFuncs.go b/revid/tools/tsFuncs.go new file mode 100644 index 00000000..1c1ecc67 --- /dev/null +++ b/revid/tools/tsFuncs.go @@ -0,0 +1,48 @@ +package tools + +// mp2tDumpProgram dumps MPEG-TS Program Association Table (PAT) and Program Map Tables (PMT). +func mp2tDumpProgram(clip []byte) error { + // NB: Comcast API requires a buffered reader + reader := bufio.NewReader(bytes.NewReader(clip)) + + _, err := packet.Sync(reader) + if err != nil { + return fmt.Errorf("Error reading sync byte: %s", err) + } + pat, err := psi.ReadPAT(reader) + if err != nil { + return fmt.Errorf("Error reading PAT: %s", err) + } + mp2tDumpPat(pat) + + var pmts []psi.PMT + pm := pat.ProgramMap() + for pn, pid := range pm { + pmt, err := psi.ReadPMT(reader, pid) + if err != nil { + return fmt.Errorf("Error reading PMT: %s", err) + } + pmts = append(pmts, pmt) + mp2tDumpPmt(pn, pmt) + } + return nil +} + +func mp2tDumpPat(pat psi.PAT) { + fmt.Printf("Pat\n") + fmt.Printf("\tPMT PIDs %v\n", pat.ProgramMap()) + fmt.Printf("\tNumber of Programs %v\n", pat.NumPrograms()) +} + +func mp2tDumpPmt(pn uint16, pmt psi.PMT) { + // pn = program number + fmt.Printf("Program #%v PMT\n", pn) + fmt.Printf("\tPIDs %v\n", pmt.Pids()) + fmt.Printf("\tElementary Streams") + for _, es := range pmt.ElementaryStreams() { + fmt.Printf("\t\tPID %v : StreamType %v\n", es.ElementaryPid(), es.StreamType()) + for _, d := range es.Descriptors() { + fmt.Printf("\t\t\t%+v\n", d) + } + } +} diff --git a/tscreator/TsCreator.go b/tsgenerator/tsgenerator.go similarity index 99% rename from tscreator/TsCreator.go rename to tsgenerator/tsgenerator.go index e5c7f7b5..eff9e986 100644 --- a/tscreator/TsCreator.go +++ b/tsgenerator/tsgenerator.go @@ -26,7 +26,7 @@ LICENSE along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). */ -package tscreator +package tsgenerator import ( _ "fmt" diff --git a/tsgenerator/tsgenerator_test.go b/tsgenerator/tsgenerator_test.go new file mode 100644 index 00000000..6da42621 --- /dev/null +++ b/tsgenerator/tsgenerator_test.go @@ -0,0 +1 @@ +package tsgenerator