/* NAME revid - a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. DESCRIPTION See Readme.md AUTHORS Alan Noble Saxon A. Nelson-Milton LICENSE revid is Copyright (C) 2017 Alan Noble. It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). */ // revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. package revid import ( "bufio" "bytes" "crypto/md5" "encoding/hex" "fmt" "io/ioutil" "log" "net" "net/http" "os" "os/exec" "strconv" "time" "io" "bitbucket.org/ausocean/av/h264" "bitbucket.org/ausocean/av/tsgenerator" "bitbucket.org/ausocean/av/ringbuffer" ) // defaults and networking consts const ( clipDuration = 1 // s defaultPID = 256 defaultFrameRate = 25 mp2tPacketSize = 188 // MPEG-TS packet size mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000 udpPackets = 7 // # of UDP packets per ethernet frame (8 is the max) rtpPackets = 7 // # of RTP packets per ethernet frame (7 is the max) rtpHeaderSize = 12 rtpSSRC = 1 // any value will do bufferSize = 1000 / clipDuration httpTimeOut = 5 // s motionThreshold = "0.0025" qscale = "3" defaultRaspividCmd = "raspivid -o -" framesPerSec = 25 packetsPerFrame = 7 h264BufferSize = 500000 ) const ( raspivid = 0 rtp = 1 h264Codec = 2 file = 4 httpOut = 5 ) type Config struct { Input uint8 InputCmd string Output uint8 OutputFileName string InputFileName string } type RevidInst interface { Start() Stop() ChangeState(newConfig Config) error } type revidInst struct { expectCC int dumpCC int dumpPCRBase uint64 conn net.Conn ffmpegPath string tempDir string ringBuffer ringbuffer.RingBuffer config Config isRunning bool Error *log.Logger outputFile *os.File inputFile *os.File } func NewRevidInstance(config Config) (r *revidInst, err error) { r = new(revidInst) r.ringBuffer = ringbuffer.NewRingBuffer(bufferSize, mp2tPacketSize*mp2tMaxPackets) r.Error = log.New(os.Stderr, "ERROR: ", log.Ldate|log.Ltime|log.Lshortfile) r.expectCC = -1 r.dumpCC = -1 r.dumpPCRBase = 0 r.ChangeState(config) switch r.config.Output { case file: r.outputFile, err = os.Create(r.config.OutputFileName) if err != nil { return nil, err } } switch r.config.Input { case file: r.inputFile, err = os.Open(r.config.InputFileName) if err != nil { return nil, err } } return } func (r *revidInst) ChangeState(newConfig Config) error { // TODO: check that the config is G r.config = newConfig return nil } func (r *revidInst) Start() { r.isRunning = true go r.input() go r.output() } func (r *revidInst) Stop() { r.isRunning = false } func (r *revidInst) input() { generator := tsgenerator.NewTsGenerator(framesPerSec) go generator.Generate() h264Parser := h264.H264Parser{OutputChan: generator.NalInputChan} // TODO: Need to create constructor for parser otherwise I'm going to break // something eventuallyl go h264Parser.Parse() var inputReader *bufio.Reader switch r.config.Input { case raspivid: cmd := exec.Command("raspivid", "-o", "-", "-n", "-t", "0", "-b", "1000000", "-w","1280","-h","720") stdout, _ := cmd.StdoutPipe() err := cmd.Start() inputReader = bufio.NewReader(stdout) if err != nil { r.Error.Println(err.Error()) return } case file: default: r.Error.Println("Input not valid!") } clipSize := 0 packetCount := 0 now := time.Now() prevTime := now startPackets := [][]byte{ {71, 64, 17, 16, 0, 66, 240, 65, 0, 1, 193, 0, 0, 255, 1, 255, 0, 1, 252, 128, 48, 72, 46, 1, 6, 70, 70, 109, 112, 101, 103, 37, 115, 116, 114, 101, 97, 109, 101, 100, 32, 98, 121, 32, 116, 104, 101, 32, 71, 101, 111, 86, 105, 115, 105, 111, 110, 32, 82, 116, 115, 112, 32, 83, 101, 114, 118, 101, 114, 99, 176, 214, 195, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255}, {71, 64, 0, 16, 0, 0, 176, 13, 0, 1, 193, 0, 0, 0, 1, 240, 0, 42, 177, 4, 178, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255}, /*PMT*/ {71, 80, 0, 16, /*Start of payload*/ 0, 2, 176, 18, 0, 1, 193, 0, 0, 0xE1, 0x00, 0xF0, 0, 0x1B, 0xE1, 0, 0xF0, 0, 0x15, 0xBD, 0x4D, 0x56, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255}, } donePSI := false ii := 0 fmt.Println("reading") var h264Data []byte switch(r.config.Input){ case raspivid: go func(){ for { h264Data = make([]byte, 2) _,err := io.ReadFull(inputReader, h264Data) if err == nil { h264Parser.InputByteChan<-h264Data[0] h264Parser.InputByteChan<-h264Data[1] } } }() case file: stats, err := r.inputFile.Stat() if err != nil { panic("Could not get file stats!") } h264Data = make([]byte, stats.Size()) _, err = r.inputFile.Read(h264Data) if err != nil { r.Error.Println(err.Error()) } for i := range h264Data { h264Parser.InputByteChan<-h264Data[i] } } for r.isRunning { if clip, err := r.ringBuffer.Get(); err != nil { r.Error.Println(err.Error()) return } else { for { upperBound := clipSize + mp2tPacketSize if ii < 3 && !donePSI { packetByteSlice := startPackets[ii] copy(clip[clipSize:upperBound], packetByteSlice) ii++ } else { donePSI = true if err != nil { fmt.Println(err) } tsPacket := <-generator.TsChan byteSlice, err := tsPacket.ToByteSlice() if err != nil { r.Error.Println(err.Error()) } copy(clip[clipSize:upperBound],byteSlice) } packetCount++ clipSize += mp2tPacketSize // send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame now = time.Now() if (packetCount == mp2tMaxPackets) || (now.Sub(prevTime) > clipDuration*time.Second && packetCount%packetsPerFrame == 0) { if err := r.ringBuffer.DoneWriting(clipSize); err != nil { r.Error.Println(err.Error()) return } clipSize = 0 packetCount = 0 prevTime = now break } } } } } func (r *revidInst) output() { for r.isRunning { if clip, err := r.ringBuffer.Read(); err == nil { switch r.config.Output { case file: r.outputFile.Write(clip) default: r.Error.Println("No output?") } if err := r.ringBuffer.DoneReading(); err != nil { r.Error.Println(err.Error()) } } } } // sendClipToHTPP posts a video clip via HTTP, using a new TCP connection each time. func sendClipToHTTP(clip []byte, output string, _ net.Conn) error { timeout := time.Duration(httpTimeOut * time.Second) client := http.Client{ Timeout: timeout, } hash := md5.Sum(clip) url := output + strconv.Itoa(len(clip)) + "." + hex.EncodeToString(hash[:]) // NB: append size.digest to output fmt.Printf("Posting %s (%d bytes)\n", url, len(clip)) resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer if err != nil { return fmt.Errorf("Error posting to %s: %s", output, err) } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err == nil { fmt.Printf("%s\n", body) } return nil }