/* NAME senders.go DESCRIPTION See Readme.md AUTHORS Saxon A. Nelson-Milton Alan Noble LICENSE revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ package revid import ( "errors" "fmt" "io" "net" "os" "sync" "time" "github.com/Comcast/gots/packet" "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/protocol/rtmp" "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" ) // Log is used by the multiSender. type Log func(level int8, message string, params ...interface{}) // Sender ring buffer read timeouts. const ( rtmpRBReadTimeout = 1 * time.Second mtsRBReadTimeout = 1 * time.Second ) // httpSender provides an implemntation of io.Writer to perform sends to a http // destination. type httpSender struct { client *netsender.Sender log func(lvl int8, msg string, args ...interface{}) } // newHttpSender returns a pointer to a new httpSender. func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender { return &httpSender{ client: ns, log: log, } } // Write implements io.Writer. func (s *httpSender) Write(d []byte) (int, error) { return len(d), httpSend(d, s.client, s.log) } func (s *httpSender) Close() error { return nil } func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error { // Only send if "V0" is configured as an input. send := false ip := client.Param("ip") pins := netsender.MakePins(ip, "V") for i, pin := range pins { if pin.Name == "V0" { send = true pins[i].Value = len(d) pins[i].Data = d pins[i].MimeType = "video/mp2t" break } } if !send { return nil } var err error var reply string reply, _, err = client.Send(netsender.RequestRecv, pins) if err != nil { return err } return extractMeta(reply, log) } // extractMeta looks at a reply at extracts any time or location data - then used // to update time and location information in the mpegts encoder. func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) error { dec, err := netsender.NewJSONDecoder(r) if err != nil { return nil } // Extract time from reply t, err := dec.Int("ts") if err != nil { log(logger.Warning, pkg+"No timestamp in reply") } else { log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t)) mts.RealTime.Set(time.Unix(int64(t), 0)) } // Extract location from reply g, err := dec.String("ll") if err != nil { log(logger.Debug, pkg+"No location in reply") } else { log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g)) mts.Meta.Add("loc", g) } return nil } // fileSender implements loadSender for a local file destination. type fileSender struct { file *os.File data []byte } func newFileSender(path string) (*fileSender, error) { f, err := os.Create(path) if err != nil { return nil, err } return &fileSender{file: f}, nil } // Write implements io.Writer. func (s *fileSender) Write(d []byte) (int, error) { return s.file.Write(d) } func (s *fileSender) Close() error { return s.file.Close() } // mtsSender implements io.WriteCloser and provides sending capability specifically // for use with MPEGTS packetization. It handles the construction of appropriately // lengthed clips based on clip duration and PSI. It also accounts for // discontinuities by setting the discontinuity indicator for the first packet of a clip. type mtsSender struct { dst io.WriteCloser buf []byte ring *ring.Buffer next []byte pkt packet.Packet repairer *mts.DiscontinuityRepairer curPid int clipDur time.Duration prev time.Time done chan struct{} log func(lvl int8, msg string, args ...interface{}) wg sync.WaitGroup } // newMtsSender returns a new mtsSender. func newMtsSender(dst io.WriteCloser, log func(lvl int8, msg string, args ...interface{}), rb *ring.Buffer, clipDur time.Duration) *mtsSender { s := &mtsSender{ dst: dst, repairer: mts.NewDiscontinuityRepairer(), log: log, ring: rb, done: make(chan struct{}), clipDur: clipDur, } s.wg.Add(1) go s.output() return s } // output starts an mtsSender's data handling routine. func (s *mtsSender) output() { var chunk *ring.Chunk for { select { case <-s.done: s.log(logger.Info, pkg+"mtsSender: got done signal, terminating output routine") defer s.wg.Done() return default: // If chunk is nil then we're ready to get another from the ringBuffer. if chunk == nil { var err error chunk, err = s.ring.Next(mtsRBReadTimeout) switch err { case nil, io.EOF: continue case ring.ErrTimeout: s.log(logger.Debug, pkg+"mtsSender: ring buffer read timeout") continue default: s.log(logger.Error, pkg+"mtsSender: unexpected error", "error", err.Error()) continue } } err := s.repairer.Repair(chunk.Bytes()) if err != nil { chunk.Close() chunk = nil continue } s.log(logger.Debug, pkg+"mtsSender: writing") _, err = s.dst.Write(chunk.Bytes()) if err != nil { s.repairer.Failed() continue } chunk.Close() chunk = nil } } } // Write implements io.Writer. func (s *mtsSender) Write(d []byte) (int, error) { if len(d) < mts.PacketSize { return 0, errors.New("do not have full MTS packet") } if s.next != nil { s.buf = append(s.buf, s.next...) } bytes := make([]byte, len(d)) copy(bytes, d) s.next = bytes p, _ := mts.PID(bytes) s.curPid = int(p) if time.Now().Sub(s.prev) >= s.clipDur && s.curPid == mts.PatPid && len(s.buf) > 0 { s.prev = time.Now() _, err := s.ring.Write(s.buf) if err == nil { s.ring.Flush() } if err != nil { s.log(logger.Warning, pkg+"mtsSender: ringBuffer write error", "error", err.Error()) } s.buf = s.buf[:0] } return len(d), nil } // Close implements io.Closer. func (s *mtsSender) Close() error { close(s.done) s.wg.Wait() return nil } // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { conn *rtmp.Conn url string timeout uint retries int log func(lvl int8, msg string, args ...interface{}) ring *ring.Buffer done chan struct{} wg sync.WaitGroup } func newRtmpSender(url string, timeout uint, retries int, rb *ring.Buffer, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { var conn *rtmp.Conn var err error for n := 0; n < retries; n++ { conn, err = rtmp.Dial(url, timeout, log) if err == nil { break } log(logger.Error, err.Error()) if n < retries-1 { log(logger.Info, pkg+"retry rtmp connection") } } s := &rtmpSender{ conn: conn, url: url, timeout: timeout, retries: retries, log: log, ring: rb, done: make(chan struct{}), } s.wg.Add(1) go s.output() return s, err } // output starts an mtsSender's data handling routine. func (s *rtmpSender) output() { var chunk *ring.Chunk for { select { case <-s.done: s.log(logger.Info, pkg+"rtmpSender: got done signal, terminating output routine") defer s.wg.Done() return default: // If chunk is nil then we're ready to get another from the ring buffer. if chunk == nil { var err error chunk, err = s.ring.Next(rtmpRBReadTimeout) switch err { case nil, io.EOF: continue case ring.ErrTimeout: s.log(logger.Debug, pkg+"rtmpSender: ring buffer read timeout") continue default: s.log(logger.Error, pkg+"rtmpSender: unexpected error", "error", err.Error()) continue } } if s.conn == nil { s.log(logger.Warning, pkg+"rtmpSender: no rtmp connection, going to restart...") err := s.restart() if err != nil { s.log(logger.Warning, pkg+"rtmpSender: could not restart connection", "error", err.Error()) continue } } _, err := s.conn.Write(chunk.Bytes()) switch err { case nil, rtmp.ErrInvalidFlvTag: default: s.log(logger.Warning, pkg+"rtmpSender: send error, restarting...", "error", err.Error()) err = s.restart() if err != nil { s.log(logger.Warning, pkg+"rtmpSender: could not restart connection", "error", err.Error()) } continue } chunk.Close() chunk = nil } } } // Write implements io.Writer. func (s *rtmpSender) Write(d []byte) (int, error) { _, err := s.ring.Write(d) if err == nil { s.ring.Flush() } if err != nil { s.log(logger.Warning, pkg+"rtmpSender: ring buffer write error", "error", err.Error()) } return len(d), nil } func (s *rtmpSender) restart() error { s.close() var err error for n := 0; n < s.retries; n++ { s.conn, err = rtmp.Dial(s.url, s.timeout, s.log) if err == nil { break } s.log(logger.Error, err.Error()) if n < s.retries-1 { s.log(logger.Info, pkg+"retry rtmp connection") } } return err } func (s *rtmpSender) Close() error { if s.done != nil { close(s.done) } s.wg.Wait() return s.close() } func (s *rtmpSender) close() error { if s.conn == nil { return nil } return s.conn.Close() } // TODO: Write restart func for rtpSender // rtpSender implements loadSender for a native udp destination with rtp packetization. type rtpSender struct { log func(lvl int8, msg string, args ...interface{}) encoder *rtp.Encoder data []byte } func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) { conn, err := net.Dial("udp", addr) if err != nil { return nil, err } s := &rtpSender{ log: log, encoder: rtp.NewEncoder(conn, int(fps)), } return s, nil } // Write implements io.Writer. func (s *rtpSender) Write(d []byte) (int, error) { s.data = make([]byte, len(d)) copy(s.data, d) _, err := s.encoder.Write(s.data) if err != nil { s.log(logger.Warning, pkg+"rtpSender: write error", err.Error()) } return len(d), nil } func (s *rtpSender) Close() error { return nil }