/* NAME senders.go DESCRIPTION See Readme.md AUTHORS Saxon A. Nelson-Milton Alan Noble LICENSE revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ package revid import ( "fmt" "io" "net" "os" "os/exec" "strconv" "bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/av/stream/mts" "bitbucket.org/ausocean/av/stream/rtp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" "github.com/Comcast/gots/packet" ) // loadSender is a destination to send a *ring.Chunk to. // When a loadSender has finished using the *ring.Chunk // it must be Closed. type loadSender interface { // load assigns the *ring.Chunk to the loadSender. // The load call may fail, but must not mutate the // the chunk. load(*ring.Chunk) error // send performs a destination-specific send // operation. It must not mutate the chunk. send() error // release releases the *ring.Chunk. release() // close cleans up after use of the loadSender. close() error } // restart is an optional interface for loadSenders that // can restart their connection. type restarter interface { restart() error } // fileSender implements loadSender for a local file destination. type fileSender struct { file *os.File chunk *ring.Chunk } func newFileSender(path string) (*fileSender, error) { f, err := os.Create(path) if err != nil { return nil, err } return &fileSender{file: f}, nil } func (s *fileSender) load(c *ring.Chunk) error { s.chunk = c return nil } func (s *fileSender) send() error { _, err := s.chunk.WriteTo(s.file) return err } func (s *fileSender) release() { s.chunk.Close() s.chunk = nil } func (s *fileSender) close() error { return s.file.Close() } // mtsSender provides sending capability specifically for use with // mpegts packetization. It handles the construction of appropriately lengthed // clips based on PSI. It also fixes accounts for discontinuities by setting // the discontinuity indicator for the first packet of a clip. type mtsSender struct { ls loadSender buf []byte pkt [mts.PacketSize]byte fail bool repairer *mts.DiscontinuityRepairer chunk *ring.Chunk } // newmtsSender returns a new mtsSender. func newMtsSender(s loadSender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { return &mtsSender{ ls: s, repairer: mts.NewDiscontinuityRepairer(), } } // load takes a *ring.Chunk and extracts bytes copying into s.pkt for use by the sender. func (s *mtsSender) load(c *ring.Chunk) error { s.chunk = c return nil } // send checks the most recently loaded packet and if it is a PAT then the clip // in s.buf is sent, otherwise the packet is added to s.buf. func (s *mtsSender) send() error { copy(s.pkt[:], s.chunk.Bytes()) if s.fail || (((*packet.Packet)(&s.pkt)).PID() == mts.PatPid && len(s.buf) != 0) { err := s.fixAndSend() if err != nil { s.failed() return err } s.fail = false s.buf = s.buf[:0] } s.buf = append(s.buf, s.chunk.Bytes()...) return nil } // failed sets the s.fail flag to true, and let's the discontinuity // repairer know that there has been a failed send. func (s *mtsSender) failed() { s.fail = true s.repairer.Failed() } // fixAndSend uses the discontinuity repairer to ensure there is not a // discontinuity, and if so sets the discontinuity indicator of the PAT packet. func (s *mtsSender) fixAndSend() error { err := s.repairer.Repair(s.buf) if err != nil { return err } err = s.ls.load(ring.NewChunk(s.buf)) if err != nil { return err } err = s.ls.send() if err != nil { return err } s.ls.release() return nil } func (s *mtsSender) close() error { return nil } // release will set the s.fail flag to fals and clear the buffer if // the previous send was a fail. func (s *mtsSender) release() { if s.fail { s.fail = false s.buf = s.buf[:0] } } // httpSender implements loadSender for posting HTTP to NetReceiver type httpSender struct { client *netsender.Sender log func(lvl int8, msg string, args ...interface{}) chunk *ring.Chunk } func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender { return &httpSender{ client: ns, log: log, } } func (s *httpSender) load(c *ring.Chunk) error { s.chunk = c return nil } func (s *httpSender) send() error { if s.chunk == nil { // Do not retry with httpSender, // so just return without error // if the chunk has been cleared. return nil } return s.httpSend(s.chunk.Bytes()) } func (s *httpSender) httpSend(d []byte) error { // Only send if "V0" is configured as an input. send := false ip := s.client.Param("ip") pins := netsender.MakePins(ip, "V") for i, pin := range pins { if pin.Name == "V0" { send = true pins[i].Value = len(d) pins[i].Data = d pins[i].MimeType = "video/mp2t" break } } if !send { return nil } var err error var reply string reply, _, err = s.client.Send(netsender.RequestRecv, pins) if err != nil { return err } return s.extractMeta(reply) } // extractMeta looks at a reply at extracts any time or location data - then used // to update time and location information in the mpegts encoder. func (s *httpSender) extractMeta(r string) error { dec, err := netsender.NewJSONDecoder(r) if err != nil { return nil } // Extract time from reply t, err := dec.Int("ts") if err != nil { s.log(logger.Warning, pkg+"No timestamp in reply") } else { s.log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) mts.Meta.Add("ts", strconv.Itoa(t)) } // Extract location from reply g, err := dec.String("ll") if err != nil { s.log(logger.Warning, pkg+"No location in reply") } else { s.log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) mts.Meta.Add("loc", g) } return nil } func (s *httpSender) release() { // We will not retry, so release // the chunk and clear it now. s.chunk.Close() s.chunk = nil } func (s *httpSender) close() error { return nil } // ffmpegSender implements loadSender for an FFMPEG RTMP destination. type ffmpegSender struct { ffmpeg io.WriteCloser chunk *ring.Chunk } func newFfmpegSender(url, framerate string) (*ffmpegSender, error) { cmd := exec.Command(ffmpegPath, "-f", "h264", "-r", framerate, "-i", "-", "-f", "lavfi", "-i", "aevalsrc=0", "-fflags", "nobuffer", "-vcodec", "copy", "-acodec", "aac", "-map", "0:0", "-map", "1:0", "-strict", "experimental", "-f", "flv", url, ) w, err := cmd.StdinPipe() if err != nil { return nil, err } err = cmd.Start() if err != nil { return nil, err } return &ffmpegSender{ffmpeg: w}, nil } func (s *ffmpegSender) load(c *ring.Chunk) error { s.chunk = c return nil } func (s *ffmpegSender) send() error { _, err := s.chunk.WriteTo(s.ffmpeg) return err } func (s *ffmpegSender) release() { s.chunk.Close() s.chunk = nil } func (s *ffmpegSender) close() error { return s.ffmpeg.Close() } // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { conn *rtmp.Conn url string timeout uint retries int log func(lvl int8, msg string, args ...interface{}) chunk *ring.Chunk } var _ restarter = (*rtmpSender)(nil) func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { var conn *rtmp.Conn var err error for n := 0; n < retries; n++ { conn, err = rtmp.Dial(url, timeout, log) if err == nil { break } log(logger.Error, err.Error()) conn.Close() if n < retries-1 { log(logger.Info, pkg+"retry rtmp connection") } } if err != nil { return nil, err } s := &rtmpSender{ conn: conn, url: url, timeout: timeout, retries: retries, log: log, } return s, nil } func (s *rtmpSender) load(c *ring.Chunk) error { s.chunk = c return nil } func (s *rtmpSender) send() error { _, err := s.chunk.WriteTo(s.conn) return err } func (s *rtmpSender) release() { s.chunk.Close() s.chunk = nil } func (s *rtmpSender) restart() error { err := s.conn.Close() if err != nil { return err } for n := 0; n < s.retries; n++ { s.conn, err = rtmp.Dial(s.url, s.timeout, s.log) if err == nil { break } s.log(logger.Error, err.Error()) s.conn.Close() if n < s.retries-1 { s.log(logger.Info, pkg+"retry rtmp connection") } } return err } func (s *rtmpSender) close() error { return s.conn.Close() } // udpSender implements loadSender for a native udp destination. type udpSender struct { conn net.Conn log func(lvl int8, msg string, args ...interface{}) chunk *ring.Chunk } func newUdpSender(addr string, log func(lvl int8, msg string, args ...interface{})) (*udpSender, error) { conn, err := net.Dial("udp", addr) if err != nil { return nil, err } return &udpSender{ conn: conn, log: log, }, nil } func (s *udpSender) load(c *ring.Chunk) error { s.chunk = c return nil } func (s *udpSender) send() error { _, err := s.chunk.WriteTo(s.conn) return err } func (s *udpSender) release() { s.chunk.Close() s.chunk = nil } func (s *udpSender) close() error { return nil } // TODO: Write restart func for rtpSender // rtpSender implements loadSender for a native udp destination with rtp packetization. type rtpSender struct { log func(lvl int8, msg string, args ...interface{}) encoder *rtp.Encoder chunk *ring.Chunk } func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) { conn, err := net.Dial("udp", addr) if err != nil { return nil, err } s := &rtpSender{ log: log, encoder: rtp.NewEncoder(conn, int(fps)), } return s, nil } func (s *rtpSender) load(c *ring.Chunk) error { s.chunk = c return nil } func (s *rtpSender) close() error { return nil } func (s *rtpSender) release() {} func (s *rtpSender) send() error { _, err := s.chunk.WriteTo(s.encoder) return err }