mirror of https://bitbucket.org/ausocean/av.git
revid: simplified mtsSender to remove load and send
mtsSender has been simplified such that load and send are no longer called in Write. Load and Send have removed and logic is now in Write. The logic has been simplified such that it does not try to send again. On next PR when ringbuffers are added to senders, we will add logic to deal with this.
This commit is contained in:
parent
ec3e0df977
commit
24e484c07f
|
@ -181,19 +181,12 @@ func (s *fileSender) close() error { return s.file.Close() }
|
||||||
// lengthed clips based on PSI. It also fixes accounts for discontinuities by
|
// lengthed clips based on PSI. It also fixes accounts for discontinuities by
|
||||||
// setting the discontinuity indicator for the first packet of a clip.
|
// setting the discontinuity indicator for the first packet of a clip.
|
||||||
type mtsSender struct {
|
type mtsSender struct {
|
||||||
sender Sender
|
sender Sender
|
||||||
buf []byte
|
buf []byte
|
||||||
next []byte
|
next []byte
|
||||||
pkt packet.Packet
|
pkt packet.Packet
|
||||||
failed bool
|
repairer *mts.DiscontinuityRepairer
|
||||||
discarded bool
|
curPid int
|
||||||
repairer *mts.DiscontinuityRepairer
|
|
||||||
curPid int
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write implements io.Writer.
|
|
||||||
func (s *mtsSender) Write(d []byte) (int, error) {
|
|
||||||
return write(s, d)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// newMtsSender returns a new mtsSender.
|
// newMtsSender returns a new mtsSender.
|
||||||
|
@ -204,9 +197,8 @@ func newMtsSender(s Sender, log func(lvl int8, msg string, args ...interface{}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// load takes a *ring.Chunk and assigns to s.next, also grabbing it's pid and
|
// Write implements io.Writer.
|
||||||
// assigning to s.curPid. s.next if exists is also appended to the sender buf.
|
func (s *mtsSender) Write(d []byte) (int, error) {
|
||||||
func (s *mtsSender) load(d []byte) error {
|
|
||||||
if s.next != nil {
|
if s.next != nil {
|
||||||
s.buf = append(s.buf, s.next...)
|
s.buf = append(s.buf, s.next...)
|
||||||
}
|
}
|
||||||
|
@ -215,49 +207,23 @@ func (s *mtsSender) load(d []byte) error {
|
||||||
s.next = bytes
|
s.next = bytes
|
||||||
copy(s.pkt[:], bytes)
|
copy(s.pkt[:], bytes)
|
||||||
s.curPid = s.pkt.PID()
|
s.curPid = s.pkt.PID()
|
||||||
return nil
|
if s.curPid == mts.PatPid && len(s.buf) > 0 {
|
||||||
}
|
err := s.repairer.Repair(s.buf)
|
||||||
|
|
||||||
// send checks the currently loaded paackets PID; if it is a PAT then what is in
|
|
||||||
// the mtsSenders buffer is fixed and sent.
|
|
||||||
func (ms *mtsSender) send() error {
|
|
||||||
if ms.curPid == mts.PatPid && len(ms.buf) > 0 {
|
|
||||||
err := ms.fixAndSend()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
ms.buf = ms.buf[:0]
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// fixAndSend checks for discontinuities in the senders buffer and then sends.
|
|
||||||
// If a discontinuity is found the PAT packet at the start of the clip has it's
|
|
||||||
// discontintuity indicator set to true.
|
|
||||||
func (ms *mtsSender) fixAndSend() error {
|
|
||||||
err := ms.repairer.Repair(ms.buf)
|
|
||||||
if err == nil {
|
|
||||||
err = ms.sender.send(ms.buf)
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
err = s.sender.send(s.buf)
|
||||||
|
if err == nil {
|
||||||
|
goto done
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
s.repairer.Failed()
|
||||||
|
done:
|
||||||
|
s.buf = s.buf[:0]
|
||||||
}
|
}
|
||||||
ms.failed = true
|
return len(d), nil
|
||||||
ms.repairer.Failed()
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *mtsSender) close() error { return nil }
|
func (s *mtsSender) close() error { return nil }
|
||||||
|
|
||||||
// release will set the s.fail flag to false and clear the buffer if
|
|
||||||
// the previous send was a fail. The currently loaded chunk is also closed.
|
|
||||||
func (s *mtsSender) release() {
|
|
||||||
if s.failed {
|
|
||||||
s.buf = s.buf[:0]
|
|
||||||
s.failed = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// httpSender implements loadSender for posting HTTP to NetReceiver
|
// httpSender implements loadSender for posting HTTP to NetReceiver
|
||||||
type httpSender struct {
|
type httpSender struct {
|
||||||
client *netsender.Sender
|
client *netsender.Sender
|
||||||
|
|
|
@ -69,6 +69,7 @@ type sender struct {
|
||||||
// send takes d and neglects if testDiscontinuities is true, returning an error,
|
// send takes d and neglects if testDiscontinuities is true, returning an error,
|
||||||
// otherwise d is appended to senders buf.
|
// otherwise d is appended to senders buf.
|
||||||
func (ts *sender) send(d []byte) error {
|
func (ts *sender) send(d []byte) error {
|
||||||
|
//fmt.Println("sending")
|
||||||
if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt {
|
if ts.testDiscontinuities && ts.currentPkt == ts.discontinuityAt {
|
||||||
ts.currentPkt++
|
ts.currentPkt++
|
||||||
return errSendFailed
|
return errSendFailed
|
||||||
|
@ -131,16 +132,10 @@ func TestMtsSenderSegment(t *testing.T) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
err = loadSender.load(next.Bytes())
|
_, err = loadSender.Write(next.Bytes())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected err: %v\n", err)
|
t.Fatalf("Unexpected err: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = loadSender.send()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unexpected err: %v\n", err)
|
|
||||||
}
|
|
||||||
loadSender.release()
|
|
||||||
next.Close()
|
next.Close()
|
||||||
next = nil
|
next = nil
|
||||||
}
|
}
|
||||||
|
@ -220,13 +215,10 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
err = loadSender.load(next.Bytes())
|
_, err = loadSender.Write(next.Bytes())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected err: %v\n", err)
|
t.Fatalf("Unexpected err: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
loadSender.send()
|
|
||||||
loadSender.release()
|
|
||||||
next.Close()
|
next.Close()
|
||||||
next = nil
|
next = nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue