/* NAME revid - a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. DESCRIPTION See Readme.md AUTHORS Alan Noble Saxon A. Nelson-Milton LICENSE revid is Copyright (C) 2017 Alan Noble. It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). */ // revid is a testbed for re-muxing and re-directing video streams as MPEG-TS over various protocols. package revid // TODO: do error handling using logger import ( "bufio" "bytes" "crypto/md5" "encoding/binary" "encoding/hex" "flag" "fmt" "io" "io/ioutil" "log" "math/rand" "net" "net/http" "os" "os/exec" "runtime" "strconv" "strings" "time" "errors" "log" "/tools" "ioutil" "bitbucket.org/ausocean/av/ringbuffer" "github.com/Comcast/gots/packet" "github.com/Comcast/gots/packet/adaptationfield" "github.com/Comcast/gots/psi" ) // defaults and networking consts const ( clipDuration = 1 // s defaultPID = 256 defaultFrameRate = 25 defaultHTTPOutput = "http://localhost:8080?" defaultUDPOutput = "udp://0.0.0.0:16384" defaultRTPOutput = "rtp://0.0.0.0:16384" mp2tPacketSize = 188 // MPEG-TS packet size mp2tMaxPackets = 2016 * clipDuration // # first multiple of 7 and 8 greater than 2000 udpPackets = 7 // # of UDP packets per ethernet frame (8 is the max) rtpPackets = 7 // # of RTP packets per ethernet frame (7 is the max) rtpHeaderSize = 12 rtpSSRC = 1 // any value will do bufferSize = 1000 / clipDuration bitrateOutputDelay = 60 // s httpTimeOut = 5 // s motionThreshold = "0.0025" qscale = "3" ) type RevidInst interface { Run() Stop() ChangeState(newConfig Config) err } type revidInst struct { expectCC int dumpCC int dumpPCRBase uint64 conn net.Conn ffmpegPath string tempDir string ringBuffer ringbuffer.RingBuffer Config Config isRunning bool Error *log.Logger } func NewRevidInstance(config Config)(r *revid, err error){ ringBuffer = ringbuffer.NewRingBuffer(bufferSize, mp2tPacketSize*mp2tMaxPackets) r.Error = log.New(os.Stderr, "ERROR: ",log.Ldat|log.Ltime|log.Lshortfile) r.expectCC = -1 r.dumpCC = -1 r.dumpPCRBase = 0 } func(r *revidInst) Run(){ isRunning = true go r.input() go r.output() } func(r *revidInst) Stop(){ r.isRunning = false } // input handles the reading from the specified input func (r *revid)input(input string, output string) { // (re)initialize globals r.expectCC = -1 r.dumpCC = -1 r.dumpPCRBase = 0 converter := tscreator.NewTsCreator(framesPerSec) go converter.Convert() h264Parser := h264.NewH264Parser(converter.NalInputChan) go h264Parser.Parse() var err error var inputBuffer *Reader switch(r.config.input){ case raspivid: cmd := exec.Command("raspivid -o -") stdout, _ := cmd.StdoutPipe() err = cmd.Start() inputReader = bufio.NewReader(stdout) if err != nil { r.Error.Println(err.Error()) return } default: r.Error.Println("Input not valid!") } clipSize := 0 packetCount := 0 now := time.Now() prevTime := now for r.isRunning { var h264Data []byte if h264Data, err = ioutil.ReadAll(inputReader); err != nil { r.Error.Println(err.Error()) } h264Parser.SendInputData(h264Data) if clip, err := ringBuffer.Get(); err != nil { r.Error.Println(err.Error()) return } else { for { upperBound := clipSize + mp2tPacketSize // get ts packet from converter clip = (<-converter.TsChan).ToByteSlice() packetCount++ clipSize += mp2tPacketSize // send if (1) our buffer is full or (2) 1 second has elapsed and we have % packetsPerFrame now = time.Now() if (packetCount == mp2tMaxPackets) || (now.Sub(prevTime) > clipDuration*time.Second && packetCount%packetsPerFrame == 0) { if err := ringBuffer.DoneWriting(clipSize); err != nil { r.Error.Println(err.Error()) return } clipSize = 0 packetCount = 0 prevTime = now break } } } } } // output handles the writing to specified output func (r *revid)output(output string) { for r.isRunning { if clip, err := ringBuffer.Read(); err == nil { fmt.Println(clip) for err = sendClip(clip, output, conn); err != nil; { r.Error.Println(err.Error()) err = sendClip(clip, output, conn) } if err := ringBuffer.DoneReading(); err != nil { r.Error.Println(err.Error()) } } } } // sendClipToFile writes a video clip to a /tmp file. func sendClipToFile(clip []byte, _ string, _ net.Conn) error { return nil } // sendClipToHTPP posts a video clip via HTTP, using a new TCP connection each time. func sendClipToHTTP(clip []byte, output string, _ net.Conn) error { timeout := time.Duration(httpTimeOut * time.Second) client := http.Client{ Timeout: timeout, } hash := md5.Sum(clip) url := output + strconv.Itoa(len(clip)) + "." + hex.EncodeToString(hash[:]) // NB: append size.digest to output fmt.Printf("Posting %s (%d bytes)\n", url, len(clip)) resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer if err != nil { return fmt.Errorf("Error posting to %s: %s", output, err) } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err == nil { fmt.Printf("%s\n", body) } return err } // sendClipToUDP sends a video clip over UDP. func sendClipToUDP(clip []byte, _ string, conn net.Conn) error { size := udpPackets * mp2tPacketSize fmt.Printf("Sending %d UDP packets of size %d (%d bytes)\n", len(clip)/size, size, len(clip)) for offset := 0; offset < len(clip); offset += size { pkt := clip[offset : offset+size] _, err := conn.Write(pkt) if err != nil { return fmt.Errorf("UDP write error %s. Is your player listening?", err) } } return nil }