diff --git a/revid/senders.go b/revid/senders.go index 53ae7e45..db122293 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -181,19 +181,12 @@ func (s *fileSender) close() error { return s.file.Close() } // lengthed clips based on PSI. It also fixes accounts for discontinuities by // setting the discontinuity indicator for the first packet of a clip. type mtsSender struct { - sender Sender - buf []byte - next []byte - pkt packet.Packet - failed bool - discarded bool - repairer *mts.DiscontinuityRepairer - curPid int -} - -// Write implements io.Writer. -func (s *mtsSender) Write(d []byte) (int, error) { - return write(s, d) + sender Sender + buf []byte + next []byte + pkt packet.Packet + repairer *mts.DiscontinuityRepairer + curPid int } // newMtsSender returns a new mtsSender. @@ -204,9 +197,8 @@ func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{})) } } -// load takes a *ring.Chunk and assigns to s.next, also grabbing it's pid and -// assigning to s.curPid. s.next if exists is also appended to the sender buf. -func (s *mtsSender) load(d []byte) error { +// Write implements io.Writer. +func (s *mtsSender) Write(d []byte) (int, error) { if s.next != nil { s.buf = append(s.buf, s.next...) } @@ -215,49 +207,23 @@ func (s *mtsSender) load(d []byte) error { s.next = bytes copy(s.pkt[:], bytes) s.curPid = s.pkt.PID() - return nil -} - -// send checks the currently loaded paackets PID; if it is a PAT then what is in -// the mtsSenders buffer is fixed and sent. -func (ms *mtsSender) send() error { - if ms.curPid == mts.PatPid && len(ms.buf) > 0 { - err := ms.fixAndSend() - if err != nil { - return err - } - ms.buf = ms.buf[:0] - } - return nil -} - -// fixAndSend checks for discontinuities in the senders buffer and then sends. -// If a discontinuity is found the PAT packet at the start of the clip has it's -// discontintuity indicator set to true. -func (ms *mtsSender) fixAndSend() error { - err := ms.repairer.Repair(ms.buf) - if err == nil { - err = ms.sender.send(ms.buf) + if s.curPid == mts.PatPid && len(s.buf) > 0 { + err := s.repairer.Repair(s.buf) if err == nil { - return nil + err = s.sender.send(s.buf) + if err == nil { + goto done + } } + s.repairer.Failed() + done: + s.buf = s.buf[:0] } - ms.failed = true - ms.repairer.Failed() - return err + return len(d), nil } func (s *mtsSender) close() error { return nil } -// release will set the s.fail flag to false and clear the buffer if -// the previous send was a fail. The currently loaded chunk is also closed. -func (s *mtsSender) release() { - if s.failed { - s.buf = s.buf[:0] - s.failed = false - } -} - // httpSender implements loadSender for posting HTTP to NetReceiver type httpSender struct { client *netsender.Sender diff --git a/revid/senders_test.go b/revid/senders_test.go index a435e512..d6620002 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -69,6 +69,7 @@ type sender struct { // send takes d and neglects if testDiscontinuities is true, returning an error, // otherwise d is appended to senders buf. func (ts *sender) send(d []byte) error { + //fmt.Println("sending") if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt { ts.currentPkt++ return errSendFailed @@ -131,16 +132,10 @@ func TestMtsSenderSegment(t *testing.T) { break } - err = loadSender.load(next.Bytes()) + _, err = loadSender.Write(next.Bytes()) if err != nil { t.Fatalf("Unexpected err: %v\n", err) } - - err = loadSender.send() - if err != nil { - t.Fatalf("Unexpected err: %v\n", err) - } - loadSender.release() next.Close() next = nil } @@ -220,13 +215,10 @@ func TestMtsSenderDiscontinuity(t *testing.T) { break } - err = loadSender.load(next.Bytes()) + _, err = loadSender.Write(next.Bytes()) if err != nil { t.Fatalf("Unexpected err: %v\n", err) } - - loadSender.send() - loadSender.release() next.Close() next = nil }