/* NAME senders.go DESCRIPTION See Readme.md AUTHORS Saxon A. Nelson-Milton Alan Noble LICENSE revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. */ package revid import ( "errors" "fmt" "io" "net" "os" "sync" "syscall" "time" "github.com/Comcast/gots/packet" "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/protocol/rtmp" "bitbucket.org/ausocean/av/protocol/rtp" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/utils/logging" "bitbucket.org/ausocean/utils/pool" ) // Sender pool buffer read timeouts. const ( rtmpPoolReadTimeout = 1 * time.Second mtsPoolReadTimeout = 1 * time.Second mtsBufferPoolMaxAlloc = 5 << 20 // 5MiB. maxBuffLen = 50000000 ) var ( adjustedRTMPPoolElementSize int adjustedMTSPoolElementSize int ) var errReportCallbackNil = errors.New("report callback is nil") type httpSenderOption func(s *httpSender) error // withReportCallback provides a functional option to set the report callback // function. This can be used to record the number of bytes sent. func withReportCallback(report func(sent int)) httpSenderOption { return func(s *httpSender) error { if report == nil { return errReportCallbackNil } s.report = report return nil } } // withHTTPAddress provides a functional option to set the destination http // address. func withHTTPAddress(addr string) httpSenderOption { return func(s *httpSender) error { s.addr = addr return nil } } // httpSender provides an implemntation of io.Writer to perform sends to a http // destination. type httpSender struct { client *netsender.Sender log logging.Logger report func(sent int) addr string } // newHttpSender returns a pointer to a new httpSender. // report is callback that can be used to report the amount of data sent per write. // This can be set to nil. func newHTTPSender(ns *netsender.Sender, log logging.Logger, opts ...httpSenderOption) (*httpSender, error) { s := &httpSender{client: ns, log: log} for _, opt := range opts { err := opt(s) if err != nil { return nil, err } } return s, nil } // Write implements io.Writer. func (s *httpSender) Write(d []byte) (int, error) { s.log.Debug("HTTP sending", "address", s.addr) err := httpSend(d, s.client, s.log, s.addr) if err == nil { s.log.Debug("good send", "len", len(d)) if s.report != nil { s.report(len(d)) } } else { s.log.Debug("bad send", "error", err) } return len(d), err } func (s *httpSender) Close() error { return nil } func httpSend(d []byte, client *netsender.Sender, log logging.Logger, addr string) error { // Only send if "V0" or "S0" are configured as input. send := false ip := client.Param("ip") log.Debug("making pins, and sending recv request", "ip", ip) pins := netsender.MakePins(ip, "V,S") for i, pin := range pins { switch pin.Name { case "V0": pins[i].MimeType = "video/mp2t" case "S0": pins[i].MimeType = "audio/x-wav" default: continue } pins[i].Value = len(d) pins[i].Data = d send = true break } if !send { return nil } reply, _, err := client.Send(netsender.RequestRecv, pins, netsender.WithRecvAddress(addr)) if err != nil { return err } log.Debug("good request", "reply", reply) return extractMeta(reply, log) } // extractMeta looks at a reply at extracts any time or location data - then used // to update time and location information in the mpegts encoder. func extractMeta(r string, log logging.Logger) error { dec, err := netsender.NewJSONDecoder(r) if err != nil { return nil } // Extract time from reply if mts.Realtime has not been set. if !mts.RealTime.IsSet() { t, err := dec.Int("ts") if err != nil { log.Warning("No timestamp in reply") } else { log.Debug("got timestamp", "ts", t) mts.RealTime.Set(time.Unix(int64(t), 0)) } } // Extract location from reply g, err := dec.String("ll") if err != nil { log.Debug("No location in reply") } else { log.Debug(fmt.Sprintf("got location: %v", g)) mts.Meta.Add(mts.LocationKey, g) } return nil } // fileSender implements loadSender for a local file destination. type fileSender struct { file *os.File data []byte multiFile bool maxFileSize uint // maxFileSize is in bytes. A size of 0 means there is no size limit. path string log logging.Logger } // newFileSender returns a new fileSender. Setting multi true will write a new // file for each write to this sender. func newFileSender(l logging.Logger, path string, multiFile bool, maxFileSize uint) (*fileSender, error) { return &fileSender{ path: path, log: l, multiFile: multiFile, maxFileSize: maxFileSize, }, nil } // Write implements io.Writer. func (s *fileSender) Write(d []byte) (int, error) { s.log.Debug("checking disk space") var stat syscall.Statfs_t if err := syscall.Statfs("/", &stat); err != nil { return 0, fmt.Errorf("could not read system disk space, abandoning write: %w", err) } availableSpace := stat.Bavail * uint64(stat.Bsize) totalSpace := stat.Blocks * uint64(stat.Bsize) s.log.Debug("available, total disk space in bytes", "availableSpace", availableSpace, "totalSpace", totalSpace) var spaceBuffer uint64 = 50000000 // 50MB. if availableSpace < spaceBuffer { return 0, fmt.Errorf("reached limit of disk space with a buffer of %v bytes, abandoning write", spaceBuffer) } // If the write will exceed the max file size, close the file so that a new one can be created. if s.maxFileSize != 0 && s.file != nil { fileInfo, err := s.file.Stat() if err != nil { return 0, fmt.Errorf("could not read files stats: %w", err) } size := uint(fileInfo.Size()) s.log.Debug("checked file size", "bytes", size) if size+uint(len(d)) > s.maxFileSize { s.log.Debug("new write would exceed max file size, closing existing file", "maxFileSize", s.maxFileSize) s.file.Close() s.file = nil } } if s.file == nil { fileName := s.path + time.Now().Format("2006-01-02_15-04-05") s.log.Debug("creating new output file", "multiFile", s.multiFile, "fileName", fileName) f, err := os.Create(fileName) if err != nil { return 0, fmt.Errorf("could not create file to write media to: %w", err) } s.file = f } s.log.Debug("writing to output file", "bytes", len(d)) n, err := s.file.Write(d) if err != nil { return n, err } if s.multiFile { s.file.Close() s.file = nil } return n, nil } func (s *fileSender) Close() error { return s.file.Close() } // mtsSender implements io.WriteCloser and provides sending capability specifically // for use with MPEGTS packetization. It handles the construction of appropriately // lengthed clips based on clip duration and PSI. It also accounts for // discontinuities by setting the discontinuity indicator for the first packet of a clip. type mtsSender struct { dst io.WriteCloser buf []byte pool *pool.Buffer next []byte pkt packet.Packet repairer *mts.DiscontinuityRepairer curPid int clipDur time.Duration prev time.Time done chan struct{} log logging.Logger wg sync.WaitGroup } // newMtsSender returns a new mtsSender. func newMTSSender(dst io.WriteCloser, log logging.Logger, rb *pool.Buffer, clipDur time.Duration) *mtsSender { log.Debug("setting up mtsSender", "clip duration", int(clipDur)) s := &mtsSender{ dst: dst, repairer: mts.NewDiscontinuityRepairer(), log: log, pool: rb, done: make(chan struct{}), clipDur: clipDur, } // mtsSender will do particularly large writes to the pool buffer; let's // increase its max allowable allocation. pool.MaxAlloc(mtsBufferPoolMaxAlloc) s.wg.Add(1) go s.output() return s } // output starts an mtsSender's data handling routine. func (s *mtsSender) output() { var chunk *pool.Chunk for { select { case <-s.done: s.log.Info("terminating sender output routine") defer s.wg.Done() return default: // If chunk is nil then we're ready to get another from the ringBuffer. if chunk == nil { var err error chunk, err = s.pool.Next(mtsPoolReadTimeout) switch err { case nil, io.EOF: continue case pool.ErrTimeout: s.log.Debug("mtsSender: pool buffer read timeout") continue default: s.log.Error("unexpected error", "error", err.Error()) continue } } err := s.repairer.Repair(chunk.Bytes()) if err != nil { chunk.Close() chunk = nil continue } s.log.Debug("mtsSender: writing") _, err = s.dst.Write(chunk.Bytes()) if err != nil { s.log.Debug("failed write, repairing MTS", "error", err) s.repairer.Failed() continue } else { s.log.Debug("good write") } chunk.Close() chunk = nil } } } // Write implements io.Writer. func (s *mtsSender) Write(d []byte) (int, error) { if len(d) < mts.PacketSize { return 0, errors.New("do not have full MTS packet") } if s.next != nil { s.log.Debug("appending packet to clip") s.buf = append(s.buf, s.next...) } bytes := make([]byte, len(d)) copy(bytes, d) s.next = bytes p, _ := mts.PID(bytes) s.curPid = int(p) curDur := time.Now().Sub(s.prev) s.log.Debug("checking send conditions", "curDuration", int(curDur), "sendDur", int(s.clipDur), "curPID", s.curPid, "len", len(s.buf)) if curDur >= s.clipDur && s.curPid == mts.PatPid && len(s.buf) > 0 { s.log.Debug("writing clip to pool buffer for sending", "size", len(s.buf)) s.prev = time.Now() n, err := s.pool.Write(s.buf) if err == nil { s.pool.Flush() } if err != nil { s.log.Warning("ringBuffer write error", "error", err.Error(), "n", n, "writeSize", len(s.buf), "rbElementSize", adjustedMTSPoolElementSize) if err == pool.ErrTooLong { adjustedMTSPoolElementSize = len(s.buf) * 2 numElements := maxBuffLen / adjustedMTSPoolElementSize s.pool = pool.NewBuffer(maxBuffLen/adjustedMTSPoolElementSize, adjustedMTSPoolElementSize, 5*time.Second) s.log.Info("adjusted MTS pool buffer element size", "new size", adjustedMTSPoolElementSize, "num elements", numElements, "size(MB)", numElements*adjustedMTSPoolElementSize) } } s.buf = s.buf[:0] } return len(d), nil } // Close implements io.Closer. func (s *mtsSender) Close() error { s.log.Debug("closing sender output routine") close(s.done) s.wg.Wait() s.log.Info("sender output routine closed") return nil } // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { conn *rtmp.Conn url string retries int log logging.Logger pool *pool.Buffer done chan struct{} wg sync.WaitGroup report func(sent int) } func newRtmpSender(url string, retries int, rb *pool.Buffer, log logging.Logger, report func(sent int)) (*rtmpSender, error) { var conn *rtmp.Conn var err error for n := 0; n < retries; n++ { conn, err = rtmp.Dial(url, log.Log) if err == nil { break } log.Error("dial error", "error", err) if n < retries-1 { log.Info("retrying dial") } } s := &rtmpSender{ conn: conn, url: url, retries: retries, log: log, pool: rb, done: make(chan struct{}), report: report, } s.wg.Add(1) go s.output() return s, err } // output starts an mtsSender's data handling routine. func (s *rtmpSender) output() { var chunk *pool.Chunk for { select { case <-s.done: s.log.Info("terminating sender output routine") defer s.wg.Done() return default: // If chunk is nil then we're ready to get another from the pool buffer. if chunk == nil { var err error chunk, err = s.pool.Next(rtmpPoolReadTimeout) switch err { case nil, io.EOF: continue case pool.ErrTimeout: s.log.Debug("rtmpSender: pool buffer read timeout") continue default: s.log.Error("unexpected error", "error", err.Error()) continue } } if s.conn == nil { s.log.Warning("no rtmp connection, re-dialing") err := s.restart() if err != nil { s.log.Warning("could not restart connection", "error", err) continue } } _, err := s.conn.Write(chunk.Bytes()) switch err { case nil, rtmp.ErrInvalidFlvTag: s.log.Debug("good write to conn") default: s.log.Warning("send error, re-dialing", "error", err) err = s.restart() if err != nil { s.log.Warning("could not restart connection", "error", err) } continue } chunk.Close() chunk = nil } } } // Write implements io.Writer. func (s *rtmpSender) Write(d []byte) (int, error) { s.log.Debug("writing to pool buffer") _, err := s.pool.Write(d) if err == nil { s.pool.Flush() s.log.Debug("good pool buffer write", "len", len(d)) } else { s.log.Warning("pool buffer write error", "error", err.Error()) if err == pool.ErrTooLong { adjustedRTMPPoolElementSize = len(d) * 2 numElements := maxBuffLen / adjustedRTMPPoolElementSize s.pool = pool.NewBuffer(numElements, adjustedRTMPPoolElementSize, 5*time.Second) s.log.Info("adjusted RTMP pool buffer element size", "new size", adjustedRTMPPoolElementSize, "num elements", numElements, "size(MB)", numElements*adjustedRTMPPoolElementSize) } } s.report(len(d)) return len(d), nil } func (s *rtmpSender) restart() error { s.close() var err error for n := 0; n < s.retries; n++ { s.log.Debug("dialing", "dials", n) s.conn, err = rtmp.Dial(s.url, s.log.Log) if err == nil { break } s.log.Error("dial error", "error", err) if n < s.retries-1 { s.log.Info("retry rtmp connection") } } return err } func (s *rtmpSender) Close() error { s.log.Debug("closing output routine") if s.done != nil { close(s.done) } s.wg.Wait() s.log.Info("output routine closed") return s.close() } func (s *rtmpSender) close() error { s.log.Debug("closing connection") if s.conn == nil { return nil } return s.conn.Close() } // TODO: Write restart func for rtpSender // rtpSender implements loadSender for a native udp destination with rtp packetization. type rtpSender struct { log logging.Logger encoder *rtp.Encoder data []byte report func(sent int) } func newRtpSender(addr string, log logging.Logger, fps uint, report func(sent int)) (*rtpSender, error) { conn, err := net.Dial("udp", addr) if err != nil { return nil, err } s := &rtpSender{ log: log, encoder: rtp.NewEncoder(conn, int(fps)), report: report, } return s, nil } // Write implements io.Writer. func (s *rtpSender) Write(d []byte) (int, error) { s.data = make([]byte, len(d)) copy(s.data, d) _, err := s.encoder.Write(s.data) if err != nil { s.log.Warning("rtpSender: write error", err.Error()) } s.report(len(d)) return len(d), nil } func (s *rtpSender) Close() error { return nil }