From 96d1b850095ce5b9f1ca4479bd649e093fb11ea7 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Mon, 6 Aug 2018 13:59:15 +0300 Subject: [PATCH] Cleanups --- cluster_test.go | 13 ++++++++---- command.go | 6 +++--- example_test.go | 8 ++++---- internal/pool/conn.go | 27 +++++++++++++++++++++++-- internal/proto/reader.go | 8 ++++++++ internal/proto/write_buffer.go | 31 +++++++++-------------------- internal/proto/write_buffer_test.go | 5 ++--- pubsub.go | 2 +- 8 files changed, 61 insertions(+), 39 deletions(-) diff --git a/cluster_test.go b/cluster_test.go index f9c3a90f..03f75337 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -727,15 +727,20 @@ var _ = Describe("ClusterClient", func() { Eventually(func() int64 { return slave.DBSize().Val() - }, 30*time.Second).Should(Equal(int64(0))) + }, "30s").Should(Equal(int64(0))) return nil }) Expect(err).NotTo(HaveOccurred()) state, err := client.LoadState() - Expect(err).NotTo(HaveOccurred()) - Expect(state.IsConsistent()).To(BeTrue()) + Eventually(func() bool { + state, err = client.LoadState() + if err != nil { + return false + } + return state.IsConsistent() + }, "30s").Should(BeTrue()) for _, slave := range state.Slaves { err = slave.Client.ClusterFailover().Err() @@ -744,7 +749,7 @@ var _ = Describe("ClusterClient", func() { Eventually(func() bool { state, _ := client.LoadState() return state.IsConsistent() - }, 30*time.Second).Should(BeTrue()) + }, "30s").Should(BeTrue()) } }) diff --git a/command.go b/command.go index 7fcbe5a9..8928b8a1 100644 --- a/command.go +++ b/command.go @@ -46,15 +46,15 @@ func firstCmdsErr(cmds []Cmder) error { } func writeCmd(cn *pool.Conn, cmds ...Cmder) error { - cn.WB.Reset() + wb := cn.PrepareWriteBuffer() for _, cmd := range cmds { - err := cn.WB.Append(cmd.Args()) + err := wb.Append(cmd.Args()) if err != nil { return err } } - _, err := cn.Write(cn.WB.Flush()) + err := cn.FlushWriteBuffer(wb) return err } diff --git a/example_test.go b/example_test.go index b92c6035..7d8cc52e 100644 --- a/example_test.go +++ b/example_test.go @@ -140,16 +140,16 @@ func ExampleClient() { } fmt.Println("key", val) - val2, err := client.Get("key2").Result() + val2, err := client.Get("missing_key").Result() if err == redis.Nil { - fmt.Println("key2 does not exist") + fmt.Println("missing_key does not exist") } else if err != nil { panic(err) } else { - fmt.Println("key2", val2) + fmt.Println("missing_key", val2) } // Output: key value - // key2 does not exist + // missing_key does not exist } func ExampleClient_Set() { diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 4cab6562..39f4ed11 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -14,7 +14,9 @@ type Conn struct { netConn net.Conn Rd *proto.Reader - WB *proto.WriteBuffer + wb *proto.WriteBuffer + + concurrentReadWrite bool Inited bool usedAt atomic.Value @@ -26,7 +28,7 @@ func NewConn(netConn net.Conn) *Conn { } buf := proto.NewElasticBufReader(netConn) cn.Rd = proto.NewReader(buf) - cn.WB = proto.NewWriteBuffer(buf) + cn.wb = proto.NewWriteBuffer() cn.SetUsedAt(time.Now()) return cn } @@ -76,6 +78,27 @@ func (cn *Conn) RemoteAddr() net.Addr { return cn.netConn.RemoteAddr() } +func (cn *Conn) EnableConcurrentReadWrite() { + cn.concurrentReadWrite = true + cn.wb.ResetBuffer(make([]byte, 4096)) +} + +func (cn *Conn) PrepareWriteBuffer() *proto.WriteBuffer { + if !cn.concurrentReadWrite { + cn.wb.ResetBuffer(cn.Rd.Buffer()) + } + cn.wb.Reset() + return cn.wb +} + +func (cn *Conn) FlushWriteBuffer(wb *proto.WriteBuffer) error { + _, err := cn.netConn.Write(wb.Bytes()) + if !cn.concurrentReadWrite { + cn.Rd.ResetBuffer(wb.Buffer()) + } + return err +} + func (cn *Conn) Close() error { return cn.netConn.Close() } diff --git a/internal/proto/reader.go b/internal/proto/reader.go index 9e42b00a..49890d4d 100644 --- a/internal/proto/reader.go +++ b/internal/proto/reader.go @@ -44,6 +44,14 @@ func (r *Reader) Reset(rd io.Reader) { r.src.Reset(rd) } +func (r *Reader) Buffer() []byte { + return r.src.Buffer() +} + +func (r *Reader) ResetBuffer(buf []byte) { + r.src.ResetBuffer(buf) +} + func (r *Reader) Bytes() []byte { return r.src.Bytes() } diff --git a/internal/proto/write_buffer.go b/internal/proto/write_buffer.go index cb9520b5..3e0e3bae 100644 --- a/internal/proto/write_buffer.go +++ b/internal/proto/write_buffer.go @@ -7,14 +7,11 @@ import ( ) type WriteBuffer struct { - rb *ElasticBufReader buf []byte } -func NewWriteBuffer(rb *ElasticBufReader) *WriteBuffer { - return &WriteBuffer{ - rb: rb, - } +func NewWriteBuffer() *WriteBuffer { + return &WriteBuffer{} } func (w *WriteBuffer) Len() int { @@ -25,26 +22,16 @@ func (w *WriteBuffer) Bytes() []byte { return w.buf } -func (w *WriteBuffer) AllocBuffer() { - w.rb = nil - w.buf = make([]byte, defaultBufSize) -} - func (w *WriteBuffer) Reset() { - if w.rb != nil { - w.buf = w.rb.Buffer()[:0] - } else { - w.buf = w.buf[:0] - } + w.buf = w.buf[:0] } -func (w *WriteBuffer) Flush() []byte { - b := w.buf - if w.rb != nil { - w.rb.ResetBuffer(w.buf[:cap(w.buf)]) - w.buf = nil - } - return b +func (w *WriteBuffer) Buffer() []byte { + return w.buf[:cap(w.buf)] +} + +func (w *WriteBuffer) ResetBuffer(buf []byte) { + w.buf = buf[:0] } func (w *WriteBuffer) Append(args []interface{}) error { diff --git a/internal/proto/write_buffer_test.go b/internal/proto/write_buffer_test.go index b40c65e9..84799ff3 100644 --- a/internal/proto/write_buffer_test.go +++ b/internal/proto/write_buffer_test.go @@ -1,7 +1,6 @@ package proto_test import ( - "strings" "testing" "time" @@ -15,7 +14,7 @@ var _ = Describe("WriteBuffer", func() { var buf *proto.WriteBuffer BeforeEach(func() { - buf = proto.NewWriteBuffer(proto.NewElasticBufReader(strings.NewReader(""))) + buf = proto.NewWriteBuffer() }) It("should reset", func() { @@ -54,7 +53,7 @@ var _ = Describe("WriteBuffer", func() { }) func BenchmarkWriteBuffer_Append(b *testing.B) { - buf := proto.NewWriteBuffer(proto.NewElasticBufReader(strings.NewReader(""))) + buf := proto.NewWriteBuffer() args := []interface{}{"hello", "world", "foo", "bar"} for i := 0; i < b.N; i++ { diff --git a/pubsub.go b/pubsub.go index 0289265b..546c2348 100644 --- a/pubsub.go +++ b/pubsub.go @@ -62,7 +62,7 @@ func (c *PubSub) _conn(newChannels []string) (*pool.Conn, error) { if err != nil { return nil, err } - cn.WB.AllocBuffer() + cn.EnableConcurrentReadWrite() if err := c.resubscribe(cn); err != nil { _ = c.closeConn(cn)