diff --git a/revid/revid.go b/revid/revid.go index e6bc2f3a..0828d7da 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -177,10 +177,11 @@ func (r *Revid) setConfig(config Config) error { return nil } -func (r *Revid) setupPipeline(mtsEnc func(io.Writer, int) io.Writer, flvEnc func(io.Writer, int) (io.Writer, error)) error { +// setupPipeline constructs a data pipeline. +func (r *Revid) setupPipeline(mtsEnc, flvEnc func(dst io.Writer, rate int) (io.Writer, error), multiWriter func(...io.Writer) io.Writer) error { r.buffer = (*buffer)(ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)) - r.encoder = make([]io.Writer, 0) + r.encoder = r.encoder[:0] // mtsSenders will hold the senders the require MPEGTS encoding, and flvSenders // will hold senders that require FLV encoding. @@ -193,7 +194,7 @@ func (r *Revid) setupPipeline(mtsEnc func(io.Writer, int) io.Writer, flvEnc func for _, out := range r.config.Outputs { switch out { case Http: - w = newMtsSender(newMinimalHttpSender(r.ns, r.config.Logger.Log), nil) + w = newMtsSender(newHttpSender(r.ns, r.config.Logger.Log), nil) mtsSenders = append(mtsSenders, w) case Rtp: w, err := newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) @@ -220,8 +221,8 @@ func (r *Revid) setupPipeline(mtsEnc func(io.Writer, int) io.Writer, flvEnc func // encoder to revid's encoder slice, and give this encoder the mtsSenders // as a destination. if len(mtsSenders) != 0 { - ms := newMultiSender(mtsSenders, r.config.Logger.Log) - e := mtsEnc(ms, int(r.config.FrameRate)) + mw := multiWriter(mtsSenders...) + e, _ := mtsEnc(mw, int(r.config.FrameRate)) r.encoder = append(r.encoder, e) } @@ -229,8 +230,8 @@ func (r *Revid) setupPipeline(mtsEnc func(io.Writer, int) io.Writer, flvEnc func // encoder to revid's encoder slice, and give this encoder the flvSenders // as a destination. if len(flvSenders) != 0 { - ms := newMultiSender(flvSenders, r.config.Logger.Log) - e, err := flvEnc(ms, int(r.config.FrameRate)) + mw := multiWriter(flvSenders...) + e, err := flvEnc(mw, int(r.config.FrameRate)) if err != nil { return err } @@ -257,9 +258,9 @@ func (r *Revid) setupPipeline(mtsEnc func(io.Writer, int) io.Writer, flvEnc func return nil } -func newMtsEncoder(dst io.Writer, fps int) io.Writer { +func newMtsEncoder(dst io.Writer, fps int) (io.Writer, error) { e := mts.NewEncoder(dst, float64(fps)) - return e + return e, nil } func newFlvEncoder(dst io.Writer, fps int) (io.Writer, error) { @@ -278,7 +279,7 @@ func (r *Revid) reset(config Config) error { return err } - err = r.setupPipeline(newMtsEncoder, newFlvEncoder) + err = r.setupPipeline(newMtsEncoder, newFlvEncoder, io.MultiWriter) if err != nil { return err } diff --git a/revid/revid_test.go b/revid/revid_test.go index fbd13e72..867ded78 100644 --- a/revid/revid_test.go +++ b/revid/revid_test.go @@ -75,8 +75,8 @@ type tstMtsEncoder struct { } // newTstMtsEncoder returns a pointer to a newTsMtsEncoder. -func newTstMtsEncoder(dst io.Writer, fps int) io.Writer { - return &tstMtsEncoder{dst: dst} +func newTstMtsEncoder(dst io.Writer, fps int) (io.Writer, error) { + return &tstMtsEncoder{dst: dst}, nil } func (e *tstMtsEncoder) Write(d []byte) (int, error) { return 0, nil } @@ -92,7 +92,21 @@ func newTstFlvEncoder(dst io.Writer, fps int) (io.Writer, error) { return &tstFlvEncoder{dst: dst}, nil } -func (e *tstFlvEncoder) Write(d []byte) (int, error) { return 0, nil } +func (e *tstFlvEncoder) Write(d []byte) (int, error) { return len(d), nil } + +// dummyMultiWriter emulates the MultiWriter provided by std lib, so that we +// can access the destinations. +type dummyMultiWriter struct { + dst []io.Writer +} + +func newDummyMultiWriter(dst ...io.Writer) io.Writer { + return &dummyMultiWriter{ + dst: dst, + } +} + +func (w *dummyMultiWriter) Write(d []byte) (int, error) { return len(d), nil } // TestResetEncoderSenderSetup checks that revid.reset() correctly sets up the // revid.encoder slice and the senders the encoders write to. @@ -200,7 +214,7 @@ func TestResetEncoderSenderSetup(t *testing.T) { } // This logic is what we want to check. - err = rv.setupPipeline(newTstMtsEncoder, newTstFlvEncoder) + err = rv.setupPipeline(newTstMtsEncoder, newTstFlvEncoder, newDummyMultiWriter) if err != nil { t.Fatalf("unexpected error: %v for test %v", err, testNum) } @@ -237,7 +251,7 @@ func TestResetEncoderSenderSetup(t *testing.T) { ms = e.(*tstFlvEncoder).dst } - senders := ms.(*multiSender).dst + senders := ms.(*dummyMultiWriter).dst got = len(senders) want = len(test.encoders[idx].destinations) if got != want { diff --git a/revid/senders.go b/revid/senders.go index 53ae7e45..b8e73a21 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -45,228 +45,17 @@ import ( "bitbucket.org/ausocean/utils/logger" ) -// Sender is intended to provided functionality for the sending of a byte slice -// to an implemented destination. -type Sender interface { - // send takes the bytes slice d and sends to a particular destination as - // implemented. - send(d []byte) error -} - // Log is used by the multiSender. type Log func(level int8, message string, params ...interface{}) -// multiSender implements io.Writer. It provides the capacity to send to multiple -// senders from a single Write call. -type multiSender struct { - dst []io.Writer - log Log -} - -// newMultiSender returns a pointer to a new multiSender. -func newMultiSender(senders []io.Writer, log Log) *multiSender { - return &multiSender{ - dst: senders, - log: log, - } -} - -// Write implements io.Writer. This will call load (with the passed slice), send -// and release on all senders of multiSender. -func (s *multiSender) Write(d []byte) (int, error) { - for i, sender := range s.dst { - _, err := sender.Write(d) - if err != nil { - s.log(logger.Warning, pkg+"send failed", "sender", i, "error", err) - } - } - for _, sender := range s.dst { - s, ok := sender.(loadSender) - if !ok { - panic("sender is not a loadSender") - } - s.release() - } - return len(d), nil -} - -// minimalHttpSender implements Sender for posting HTTP to netreceiver or vidgrind. -type minimalHttpSender struct { +// httpSender provides an implemntation of io.Writer to perform sends to a http +// destination. +type httpSender struct { client *netsender.Sender log func(lvl int8, msg string, args ...interface{}) } // newMinimalHttpSender returns a pointer to a new minimalHttpSender. -func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *minimalHttpSender { - return &minimalHttpSender{ - client: ns, - log: log, - } -} - -// send takes a bytes slice d and sends to http using s' http client. -func (s *minimalHttpSender) send(d []byte) error { - return httpSend(d, s.client, s.log) -} - -// loadSender is a destination to send a *ring.Chunk to. -// When a loadSender has finished using the *ring.Chunk -// it must be Closed. -type loadSender interface { - // load assigns the *ring.Chunk to the loadSender. - // The load call may fail, but must not mutate the - // the chunk. - load(d []byte) error - - // send performs a destination-specific send - // operation. It must not mutate the chunk. - send() error - - // release releases the *ring.Chunk. - release() - - // close cleans up after use of the loadSender. - close() error -} - -// restart is an optional interface for loadSenders that -// can restart their connection. -type restarter interface { - restart() error -} - -// fileSender implements loadSender for a local file destination. -type fileSender struct { - file *os.File - data []byte -} - -// Write implements io.Writer. -func (s *fileSender) Write(d []byte) (int, error) { - err := s.load(d) - if err != nil { - return 0, err - } - err = s.send() - if err != nil { - return len(d), err - } - return len(d), nil -} - -func newFileSender(path string) (io.Writer, error) { - f, err := os.Create(path) - if err != nil { - return nil, err - } - return &fileSender{file: f}, nil -} - -func (s *fileSender) load(d []byte) error { - s.data = d - return nil -} - -func (s *fileSender) send() error { - _, err := s.file.Write(s.data) - return err -} - -func (s *fileSender) release() {} - -func (s *fileSender) close() error { return s.file.Close() } - -// mtsSender implements loadSender and provides sending capability specifically -// for use with MPEGTS packetization. It handles the construction of appropriately -// lengthed clips based on PSI. It also fixes accounts for discontinuities by -// setting the discontinuity indicator for the first packet of a clip. -type mtsSender struct { - sender Sender - buf []byte - next []byte - pkt packet.Packet - failed bool - discarded bool - repairer *mts.DiscontinuityRepairer - curPid int -} - -// Write implements io.Writer. -func (s *mtsSender) Write(d []byte) (int, error) { - return write(s, d) -} - -// newMtsSender returns a new mtsSender. -func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender { - return &mtsSender{ - sender: s, - repairer: mts.NewDiscontinuityRepairer(), - } -} - -// load takes a *ring.Chunk and assigns to s.next, also grabbing it's pid and -// assigning to s.curPid. s.next if exists is also appended to the sender buf. -func (s *mtsSender) load(d []byte) error { - if s.next != nil { - s.buf = append(s.buf, s.next...) - } - bytes := make([]byte, len(d)) - copy(bytes, d) - s.next = bytes - copy(s.pkt[:], bytes) - s.curPid = s.pkt.PID() - return nil -} - -// send checks the currently loaded paackets PID; if it is a PAT then what is in -// the mtsSenders buffer is fixed and sent. -func (ms *mtsSender) send() error { - if ms.curPid == mts.PatPid && len(ms.buf) > 0 { - err := ms.fixAndSend() - if err != nil { - return err - } - ms.buf = ms.buf[:0] - } - return nil -} - -// fixAndSend checks for discontinuities in the senders buffer and then sends. -// If a discontinuity is found the PAT packet at the start of the clip has it's -// discontintuity indicator set to true. -func (ms *mtsSender) fixAndSend() error { - err := ms.repairer.Repair(ms.buf) - if err == nil { - err = ms.sender.send(ms.buf) - if err == nil { - return nil - } - } - ms.failed = true - ms.repairer.Failed() - return err -} - -func (s *mtsSender) close() error { return nil } - -// release will set the s.fail flag to false and clear the buffer if -// the previous send was a fail. The currently loaded chunk is also closed. -func (s *mtsSender) release() { - if s.failed { - s.buf = s.buf[:0] - s.failed = false - } -} - -// httpSender implements loadSender for posting HTTP to NetReceiver -type httpSender struct { - client *netsender.Sender - - log func(lvl int8, msg string, args ...interface{}) - - data []byte -} - func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender { return &httpSender{ client: ns, @@ -274,13 +63,9 @@ func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ... } } -func (s *httpSender) load(d []byte) error { - s.data = d - return nil -} - -func (s *httpSender) send() error { - return httpSend(s.data, s.client, s.log) +// Write implements io.Writer. +func (s *httpSender) Write(d []byte) (int, error) { + return len(d), httpSend(d, s.client, s.log) } func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error { @@ -338,9 +123,72 @@ func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) return nil } -func (s *httpSender) release() {} +// fileSender implements loadSender for a local file destination. +type fileSender struct { + file *os.File + data []byte +} -func (s *httpSender) close() error { return nil } +func newFileSender(path string) (io.Writer, error) { + f, err := os.Create(path) + if err != nil { + return nil, err + } + return &fileSender{file: f}, nil +} + +// Write implements io.Writer. +func (s *fileSender) Write(d []byte) (int, error) { + return s.file.Write(d) +} + +func (s *fileSender) close() error { return s.file.Close() } + +// mtsSender implements loadSender and provides sending capability specifically +// for use with MPEGTS packetization. It handles the construction of appropriately +// lengthed clips based on PSI. It also fixes accounts for discontinuities by +// setting the discontinuity indicator for the first packet of a clip. +type mtsSender struct { + dst io.Writer + buf []byte + next []byte + pkt packet.Packet + repairer *mts.DiscontinuityRepairer + curPid int +} + +// newMtsSender returns a new mtsSender. +func newMtsSender(dst io.Writer, log func(lvl int8, msg string, args ...interface{})) *mtsSender { + return &mtsSender{ + dst: dst, + repairer: mts.NewDiscontinuityRepairer(), + } +} + +// Write implements io.Writer. +func (s *mtsSender) Write(d []byte) (int, error) { + if s.next != nil { + s.buf = append(s.buf, s.next...) + } + bytes := make([]byte, len(d)) + copy(bytes, d) + s.next = bytes + copy(s.pkt[:], bytes) + s.curPid = s.pkt.PID() + if s.curPid == mts.PatPid && len(s.buf) > 0 { + err := s.repairer.Repair(s.buf) + if err == nil { + _, err = s.dst.Write(s.buf) + if err == nil { + goto done + } + } + s.repairer.Failed() + done: + s.buf = s.buf[:0] + } + return len(d), nil +} // rtmpSender implements loadSender for a native RTMP destination. type rtmpSender struct { @@ -354,8 +202,6 @@ type rtmpSender struct { data []byte } -var _ restarter = (*rtmpSender)(nil) - func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) { var conn *rtmp.Conn var err error @@ -381,27 +227,16 @@ func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg // Write implements io.Writer. func (s *rtmpSender) Write(d []byte) (int, error) { - return write(s, d) -} - -func (s *rtmpSender) load(d []byte) error { - s.data = d - return nil -} - -func (s *rtmpSender) send() error { if s.conn == nil { - return errors.New("no rtmp connection, cannot write") + return 0, errors.New("no rtmp connection, cannot write") } - _, err := s.conn.Write(s.data) + _, err := s.conn.Write(d) if err != nil { err = s.restart() } - return err + return len(d), err } -func (s *rtmpSender) release() {} - func (s *rtmpSender) restart() error { s.close() var err error @@ -433,11 +268,6 @@ type rtpSender struct { data []byte } -// Write implements io.Writer. -func (s *rtpSender) Write(d []byte) (int, error) { - return write(s, d) -} - func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) { conn, err := net.Dial("udp", addr) if err != nil { @@ -450,30 +280,7 @@ func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{ return s, nil } -func (s *rtpSender) load(d []byte) error { - s.data = make([]byte, len(d)) - copy(s.data, d) - return nil -} - -func (s *rtpSender) close() error { return nil } - -func (s *rtpSender) release() {} - -func (s *rtpSender) send() error { - _, err := s.encoder.Write(s.data) - return err -} - -// write wraps the load and send method for loadSenders. -func write(s loadSender, d []byte) (int, error) { - err := s.load(d) - if err != nil { - return 0, err - } - err = s.send() - if err != nil { - return len(d), err - } - return len(d), nil +// Write implements io.Writer. +func (s *rtpSender) Write(d []byte) (int, error) { + return s.encoder.Write(s.data) } diff --git a/revid/senders_test.go b/revid/senders_test.go index a435e512..95474922 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -31,8 +31,6 @@ package revid import ( "errors" "fmt" - "io" - "sync" "testing" "time" @@ -59,25 +57,26 @@ var ( // sender simulates sending of video data, creating discontinuities if // testDiscontinuities is set to true. -type sender struct { +type destination struct { buf [][]byte testDiscontinuities bool discontinuityAt int currentPkt int } -// send takes d and neglects if testDiscontinuities is true, returning an error, +// Write implements io.Writer. +// Write takes d and neglects if testDiscontinuities is true, returning an error, // otherwise d is appended to senders buf. -func (ts *sender) send(d []byte) error { +func (ts *destination) Write(d []byte) (int, error) { if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt { ts.currentPkt++ - return errSendFailed + return 0, errSendFailed } cpy := make([]byte, len(d)) copy(cpy, d) ts.buf = append(ts.buf, cpy) ts.currentPkt++ - return nil + return len(d), nil } // log implements the required logging func for some of the structs in use @@ -109,8 +108,8 @@ func TestMtsSenderSegment(t *testing.T) { mts.Meta = meta.New() // Create ringBuffer, sender, loadsender and the MPEGTS encoder. - tstSender := &sender{} - loadSender := newMtsSender(tstSender, log) + tstDst := &destination{} + loadSender := newMtsSender(tstDst, log) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) encoder := mts.NewEncoder((*buffer)(rb), 25) @@ -131,22 +130,16 @@ func TestMtsSenderSegment(t *testing.T) { break } - err = loadSender.load(next.Bytes()) + _, err = loadSender.Write(next.Bytes()) if err != nil { t.Fatalf("Unexpected err: %v\n", err) } - - err = loadSender.send() - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } - loadSender.release() next.Close() next = nil } } - result := tstSender.buf + result := tstDst.buf expectData := 0 for clipNo, clip := range result { t.Logf("Checking clip: %v\n", clipNo) @@ -199,8 +192,8 @@ func TestMtsSenderDiscontinuity(t *testing.T) { // Create ringBuffer sender, loadSender and the MPEGTS encoder. const clipWithDiscontinuity = 3 - tstSender := &sender{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity} - loadSender := newMtsSender(tstSender, log) + tstDst := &destination{testDiscontinuities: true, discontinuityAt: clipWithDiscontinuity} + loadSender := newMtsSender(tstDst, log) rb := ring.NewBuffer(rbSize, rbElementSize, wTimeout) encoder := mts.NewEncoder((*buffer)(rb), 25) @@ -220,19 +213,16 @@ func TestMtsSenderDiscontinuity(t *testing.T) { break } - err = loadSender.load(next.Bytes()) + _, err = loadSender.Write(next.Bytes()) if err != nil { t.Fatalf("Unexpected err: %v\n", err) } - - loadSender.send() - loadSender.release() next.Close() next = nil } } - result := tstSender.buf + result := tstDst.buf // First check that we have less clips as expected. expectedLen := (((noOfPacketsToWrite/psiSendCount)*2 + noOfPacketsToWrite) / psiSendCount) - 1 @@ -255,91 +245,3 @@ func TestMtsSenderDiscontinuity(t *testing.T) { t.Fatalf("Did not get discontinuity indicator for PAT") } } - -// dummyLoadSender is a loadSender implementation that allows us to simulate -// the behaviour of a loadSender and check that it performas as expected. -type dummyLoadSender struct { - data []byte - buf [][]byte - failOnSend bool - failHandled bool - retry bool - mu sync.Mutex -} - -// newDummyLoadSender returns a pointer to a new dummyLoadSender. -func newDummyLoadSender(fail bool, retry bool) *dummyLoadSender { - return &dummyLoadSender{failOnSend: fail, failHandled: true, retry: retry} -} - -func (s *dummyLoadSender) Write(d []byte) (int, error) { - return write(s, d) -} - -// load takes a byte slice and assigns it to the dummyLoadSenders data slice. -func (s *dummyLoadSender) load(d []byte) error { - s.data = d - return nil -} - -// send will append to dummyLoadSender's buf slice, only if failOnSend is false. -// If failOnSend is set to true, we expect that data sent won't be written to -// the buf simulating a failed send. -func (s *dummyLoadSender) send() error { - if !s.getFailOnSend() { - s.buf = append(s.buf, s.data) - return nil - } - s.failHandled = false - return errSendFailed -} - -func (s *dummyLoadSender) getFailOnSend() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.failOnSend -} - -// release sets dummyLoadSender's data slice to nil. data can be checked to see -// if release has been called at the right time. -func (s *dummyLoadSender) release() { - s.data = nil -} - -func (s *dummyLoadSender) close() error { return nil } - -// handleSendFail simply sets the failHandled flag to true. This can be checked -// to see if handleSendFail has been called by the multiSender at the right time. -func (s *dummyLoadSender) handleSendFail(err error) error { - s.failHandled = true - return nil -} - -func (s *dummyLoadSender) retrySend() bool { return s.retry } - -// TestMultiSenderWrite checks that we can do basic writing to multiple senders -// using the multiSender. -func TestMultiSenderWrite(t *testing.T) { - senders := []io.Writer{ - newDummyLoadSender(false, false), - newDummyLoadSender(false, false), - newDummyLoadSender(false, false), - } - ms := newMultiSender(senders, log) - - // Perform some multiSender writes. - const noOfWrites = 5 - for i := byte(0); i < noOfWrites; i++ { - ms.Write([]byte{i}) - } - - // Check that the senders got the data correctly from the writes. - for i := byte(0); i < noOfWrites; i++ { - for j, dest := range ms.dst { - got := dest.(*dummyLoadSender).buf[i][0] - if got != i { - t.Errorf("Did not get expected result for sender: %v. \nGot: %v\nWant: %v\n", j, got, i) - } - } - } -}