/* NAME senders.go DESCRIPTION See Readme.md AUTHORS Saxon A. Nelson-Milton Alan Noble LICENSE revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ package revid import ( "errors" "fmt" "net" "os" "strconv" "github.com/Comcast/gots/packet" "bitbucket.org/ausocean/av/rtmp" "bitbucket.org/ausocean/av/stream/mts" "bitbucket.org/ausocean/av/stream/rtp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/logger" ) // Sender is intended to provided functionality for the sending of a byte slice // to an implemented destination. type Sender interface { // send takes the bytes slice d and sends to a particular destination as // implemented. send(d []byte) error } // multiSender allows for the sending through multi loadSenders using a single // call to multiSender.Write. type multiSender struct { active func() bool senders []loadSender retry bool } // newMultiSender returns a pointer to a new multiSender. active is a function // to indicate the state of the multiSenders owner i.e. whether it is running // or not. func newMultiSender(senders []loadSender, active func() bool) (*multiSender, error) { if active == nil { return nil, errors.New("multi sender requires that active func is provided") } s := &multiSender{ senders: senders, active: active, } return s, nil } // Write implements io.Writer. The written slice will be sent to each loadSender // in multiSender.senders as long as s.active() is true. If a send fails, and // s.retry is true, the send will be tried again. func (s *multiSender) Write(d []byte) (int, error) { for _, sender := range s.senders { sender.load(d) for s.active() { err := sender.send() if err != nil { sender.handleSendFail(err) } if err == nil || !sender.retrySend() { break } } } for _, sender := range s.senders { sender.release() } return len(d), nil } // minimalHttpSender implements Sender for posting HTTP to netreceiver or vidgrind. type minimalHttpSender struct { client *netsender.Sender log func(lvl int8, msg string, args ...interface{}) } // newMinimalHttpSender returns a pointer to a new minimalHttpSender. func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *minimalHttpSender { return &minimalHttpSender{ client: ns, log: log, } } // send takes a bytes slice d and sends to http using s' http client. func (s *minimalHttpSender) send(d []byte) error { return httpSend(d, s.client, s.log) } // loadSender is a destination to send a *ring.Chunk to. // When a loadSender has finished using the *ring.Chunk // it must be Closed. type loadSender interface { // load assigns the *ring.Chunk to the loadSender. // The load call may fail, but must not mutate the // the chunk. load(d []byte) error // send performs a destination-specific send // operation. It must not mutate the chunk. send() error // release releases the *ring.Chunk. release() // close cleans up after use of the loadSender. close() error // handleSendFail performs any actions necessary in response to a failed send. handleSendFail(err error) error // retry returns true if this sender has been set for send retry. retrySend() bool } // restart is an optional interface for loadSenders that // can restart their connection. type restarter interface { restart() error } // fileSender implements loadSender for a local file destination. type fileSender struct { file *os.File data []byte } func newFileSender(path string) (*fileSender, error) { f, err := os.Create(path) if err != nil { return nil, err } return &fileSender{file: f}, nil } func (s *fileSender) load(d []byte) error { s.data = d return nil } func (s *fileSender) send() error { _, err := s.file.Write(s.data) return err } func (s *fileSender) release() {} func (s *fileSender) close() error { return s.file.Close() } func (s *fileSender) handleSendFail(err error) error { return nil } func (s *fileSender) retrySend() bool { return false } // mtsSender implements loadSender and provides sending capability specifically // for use with MPEGTS packetization. It handles the construction of appropriately // lengthed clips based on PSI. It also fixes accounts for discontinuities by // setting the discontinuity indicator for the first packet of a clip. type mtsSender struct { sender Sender buf []byte next []byte pkt packet.Packet failed bool discarded bool repairer *mts.DiscontinuityRepairer curPid int retry bool } // newMtsSender returns a new mtsSender. func newMtsSender(s Sender, retry bool, log func(lvl int8, msg string, args ...interface{})) *mtsSender { return &mtsSender{ sender: s, repairer: mts.NewDiscontinuityRepairer(), retry: retry, } } // load takes a *ring.Chunk and assigns to s.next, also grabbing it's pid and // assigning to s.curPid. s.next if exists is also appended to the sender buf. func (s *mtsSender) load(d []byte) error { if s.next != nil { s.buf = append(s.buf, s.next...) } bytes := make([]byte, len(d)) copy(bytes, d) s.next = bytes copy(s.pkt[:], bytes) s.curPid = s.pkt.PID() return nil } // send checks the currently loaded paackets PID; if it is a PAT then what is in // the mtsSenders buffer is fixed and sent. func (ms *mtsSender) send() error { if ms.curPid == mts.PatPid && len(ms.buf) > 0 { err := ms.fixAndSend() if err != nil { return err } ms.buf = ms.buf[:0] } return nil } // fixAndSend checks for discontinuities in the senders buffer and then sends. // If a discontinuity is found the PAT packet at the start of the clip has it's // discontintuity indicator set to true. func (ms *mtsSender) fixAndSend() error { err := ms.repairer.Repair(ms.buf) if err == nil { err = ms.sender.send(ms.buf) if err == nil { return nil } } ms.failed = true ms.repairer.Failed() return err } func (s *mtsSender) close() error { return nil } // release will set the s.fail flag to false and clear the buffer if // the previous send was a fail. The currently loaded chunk is also closed. func (s *mtsSender) release() { if s.failed { s.buf = s.buf[:0] s.failed = false } } func (s *mtsSender) handleSendFail(err error) error { return nil } func (s *mtsSender) retrySend() bool { return s.retry } // httpSender implements loadSender for posting HTTP to NetReceiver type httpSender struct { client *netsender.Sender log func(lvl int8, msg string, args ...interface{}) data []byte retry bool } func newHttpSender(ns *netsender.Sender, retry bool, log func(lvl int8, msg string, args ...interface{})) *httpSender { return &httpSender{ client: ns, log: log, } } func (s *httpSender) load(d []byte) error { s.data = d return nil } func (s *httpSender) send() error { return httpSend(s.data, s.client, s.log) } func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error { // Only send if "V0" is configured as an input. send := false ip := client.Param("ip") pins := netsender.MakePins(ip, "V") for i, pin := range pins { if pin.Name == "V0" { send = true pins[i].Value = len(d) pins[i].Data = d pins[i].MimeType = "video/mp2t" break } } if !send { return nil } var err error var reply string reply, _, err = client.Send(netsender.RequestRecv, pins) if err != nil { return err } return extractMeta(reply, log) } // extractMeta looks at a reply at extracts any time or location data - then used // to update time and location information in the mpegts encoder. func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) error { dec, err := netsender.NewJSONDecoder(r) if err != nil { return nil } // Extract time from reply t, err := dec.Int("ts") if err != nil { log(logger.Warning, pkg+"No timestamp in reply") } else { log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) mts.Meta.Add("ts", strconv.Itoa(t)) } // Extract location from reply g, err := dec.String("ll") if err != nil { log(logger.Warning, pkg+"No location in reply") } else { log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) mts.Meta.Add("loc", g) } return nil } func (s *httpSender) release() {} func (s *httpSender) close() error { return nil } func (s *httpSender) handleSendFail(err error) error { return nil } func (s *httpSender) retrySend() bool { return s.retry } // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { conn *rtmp.Conn url string timeout uint retries int log func(lvl int8, msg string, args ...interface{}) data []byte retry bool } var _ restarter = (*rtmpSender)(nil) func newRtmpSender(url string, timeout uint, retries int, retry bool, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { var conn *rtmp.Conn var err error for n := 0; n < retries; n++ { conn, err = rtmp.Dial(url, timeout, log) if err == nil { break } log(logger.Error, err.Error()) if n < retries-1 { log(logger.Info, pkg+"retry rtmp connection") } } s := &rtmpSender{ conn: conn, url: url, timeout: timeout, retries: retries, log: log, retry: retry, } return s, err } func (s *rtmpSender) load(d []byte) error { s.data = d return nil } func (s *rtmpSender) send() error { if s.conn == nil { return errors.New("no rtmp connection, cannot write") } _, err := s.conn.Write(s.data) return err } func (s *rtmpSender) release() {} func (s *rtmpSender) restart() error { s.close() var err error for n := 0; n < s.retries; n++ { s.conn, err = rtmp.Dial(s.url, s.timeout, s.log) if err == nil { break } s.log(logger.Error, err.Error()) if n < s.retries-1 { s.log(logger.Info, pkg+"retry rtmp connection") } } return err } func (s *rtmpSender) close() error { if s.conn != nil { return s.conn.Close() } return nil } func (s *rtmpSender) handleSendFail(err error) error { return s.restart() } func (s *rtmpSender) retrySend() bool { return s.retry } // TODO: Write restart func for rtpSender // rtpSender implements loadSender for a native udp destination with rtp packetization. type rtpSender struct { log func(lvl int8, msg string, args ...interface{}) encoder *rtp.Encoder data []byte } func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) { conn, err := net.Dial("udp", addr) if err != nil { return nil, err } s := &rtpSender{ log: log, encoder: rtp.NewEncoder(conn, int(fps)), } return s, nil } func (s *rtpSender) load(d []byte) error { s.data = make([]byte, len(d)) copy(s.data, d) return nil } func (s *rtpSender) close() error { return nil } func (s *rtpSender) release() {} func (s *rtpSender) send() error { _, err := s.encoder.Write(s.data) return err } func (s *rtpSender) handleSendFail(err error) error { return nil } func (s *rtpSender) retrySend() bool { return false }