This commit is contained in:
Vladimir Mihailenco 2018-08-06 13:59:15 +03:00
parent 464daeb271
commit 96d1b85009
8 changed files with 61 additions and 39 deletions

View File

@ -727,15 +727,20 @@ var _ = Describe("ClusterClient", func() {
Eventually(func() int64 { Eventually(func() int64 {
return slave.DBSize().Val() return slave.DBSize().Val()
}, 30*time.Second).Should(Equal(int64(0))) }, "30s").Should(Equal(int64(0)))
return nil return nil
}) })
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
state, err := client.LoadState() state, err := client.LoadState()
Expect(err).NotTo(HaveOccurred()) Eventually(func() bool {
Expect(state.IsConsistent()).To(BeTrue()) state, err = client.LoadState()
if err != nil {
return false
}
return state.IsConsistent()
}, "30s").Should(BeTrue())
for _, slave := range state.Slaves { for _, slave := range state.Slaves {
err = slave.Client.ClusterFailover().Err() err = slave.Client.ClusterFailover().Err()
@ -744,7 +749,7 @@ var _ = Describe("ClusterClient", func() {
Eventually(func() bool { Eventually(func() bool {
state, _ := client.LoadState() state, _ := client.LoadState()
return state.IsConsistent() return state.IsConsistent()
}, 30*time.Second).Should(BeTrue()) }, "30s").Should(BeTrue())
} }
}) })

View File

@ -46,15 +46,15 @@ func firstCmdsErr(cmds []Cmder) error {
} }
func writeCmd(cn *pool.Conn, cmds ...Cmder) error { func writeCmd(cn *pool.Conn, cmds ...Cmder) error {
cn.WB.Reset() wb := cn.PrepareWriteBuffer()
for _, cmd := range cmds { for _, cmd := range cmds {
err := cn.WB.Append(cmd.Args()) err := wb.Append(cmd.Args())
if err != nil { if err != nil {
return err return err
} }
} }
_, err := cn.Write(cn.WB.Flush()) err := cn.FlushWriteBuffer(wb)
return err return err
} }

View File

@ -140,16 +140,16 @@ func ExampleClient() {
} }
fmt.Println("key", val) fmt.Println("key", val)
val2, err := client.Get("key2").Result() val2, err := client.Get("missing_key").Result()
if err == redis.Nil { if err == redis.Nil {
fmt.Println("key2 does not exist") fmt.Println("missing_key does not exist")
} else if err != nil { } else if err != nil {
panic(err) panic(err)
} else { } else {
fmt.Println("key2", val2) fmt.Println("missing_key", val2)
} }
// Output: key value // Output: key value
// key2 does not exist // missing_key does not exist
} }
func ExampleClient_Set() { func ExampleClient_Set() {

View File

@ -14,7 +14,9 @@ type Conn struct {
netConn net.Conn netConn net.Conn
Rd *proto.Reader Rd *proto.Reader
WB *proto.WriteBuffer wb *proto.WriteBuffer
concurrentReadWrite bool
Inited bool Inited bool
usedAt atomic.Value usedAt atomic.Value
@ -26,7 +28,7 @@ func NewConn(netConn net.Conn) *Conn {
} }
buf := proto.NewElasticBufReader(netConn) buf := proto.NewElasticBufReader(netConn)
cn.Rd = proto.NewReader(buf) cn.Rd = proto.NewReader(buf)
cn.WB = proto.NewWriteBuffer(buf) cn.wb = proto.NewWriteBuffer()
cn.SetUsedAt(time.Now()) cn.SetUsedAt(time.Now())
return cn return cn
} }
@ -76,6 +78,27 @@ func (cn *Conn) RemoteAddr() net.Addr {
return cn.netConn.RemoteAddr() return cn.netConn.RemoteAddr()
} }
func (cn *Conn) EnableConcurrentReadWrite() {
cn.concurrentReadWrite = true
cn.wb.ResetBuffer(make([]byte, 4096))
}
func (cn *Conn) PrepareWriteBuffer() *proto.WriteBuffer {
if !cn.concurrentReadWrite {
cn.wb.ResetBuffer(cn.Rd.Buffer())
}
cn.wb.Reset()
return cn.wb
}
func (cn *Conn) FlushWriteBuffer(wb *proto.WriteBuffer) error {
_, err := cn.netConn.Write(wb.Bytes())
if !cn.concurrentReadWrite {
cn.Rd.ResetBuffer(wb.Buffer())
}
return err
}
func (cn *Conn) Close() error { func (cn *Conn) Close() error {
return cn.netConn.Close() return cn.netConn.Close()
} }

View File

@ -44,6 +44,14 @@ func (r *Reader) Reset(rd io.Reader) {
r.src.Reset(rd) r.src.Reset(rd)
} }
func (r *Reader) Buffer() []byte {
return r.src.Buffer()
}
func (r *Reader) ResetBuffer(buf []byte) {
r.src.ResetBuffer(buf)
}
func (r *Reader) Bytes() []byte { func (r *Reader) Bytes() []byte {
return r.src.Bytes() return r.src.Bytes()
} }

View File

@ -7,14 +7,11 @@ import (
) )
type WriteBuffer struct { type WriteBuffer struct {
rb *ElasticBufReader
buf []byte buf []byte
} }
func NewWriteBuffer(rb *ElasticBufReader) *WriteBuffer { func NewWriteBuffer() *WriteBuffer {
return &WriteBuffer{ return &WriteBuffer{}
rb: rb,
}
} }
func (w *WriteBuffer) Len() int { func (w *WriteBuffer) Len() int {
@ -25,26 +22,16 @@ func (w *WriteBuffer) Bytes() []byte {
return w.buf return w.buf
} }
func (w *WriteBuffer) AllocBuffer() {
w.rb = nil
w.buf = make([]byte, defaultBufSize)
}
func (w *WriteBuffer) Reset() { func (w *WriteBuffer) Reset() {
if w.rb != nil {
w.buf = w.rb.Buffer()[:0]
} else {
w.buf = w.buf[:0] w.buf = w.buf[:0]
} }
func (w *WriteBuffer) Buffer() []byte {
return w.buf[:cap(w.buf)]
} }
func (w *WriteBuffer) Flush() []byte { func (w *WriteBuffer) ResetBuffer(buf []byte) {
b := w.buf w.buf = buf[:0]
if w.rb != nil {
w.rb.ResetBuffer(w.buf[:cap(w.buf)])
w.buf = nil
}
return b
} }
func (w *WriteBuffer) Append(args []interface{}) error { func (w *WriteBuffer) Append(args []interface{}) error {

View File

@ -1,7 +1,6 @@
package proto_test package proto_test
import ( import (
"strings"
"testing" "testing"
"time" "time"
@ -15,7 +14,7 @@ var _ = Describe("WriteBuffer", func() {
var buf *proto.WriteBuffer var buf *proto.WriteBuffer
BeforeEach(func() { BeforeEach(func() {
buf = proto.NewWriteBuffer(proto.NewElasticBufReader(strings.NewReader(""))) buf = proto.NewWriteBuffer()
}) })
It("should reset", func() { It("should reset", func() {
@ -54,7 +53,7 @@ var _ = Describe("WriteBuffer", func() {
}) })
func BenchmarkWriteBuffer_Append(b *testing.B) { func BenchmarkWriteBuffer_Append(b *testing.B) {
buf := proto.NewWriteBuffer(proto.NewElasticBufReader(strings.NewReader(""))) buf := proto.NewWriteBuffer()
args := []interface{}{"hello", "world", "foo", "bar"} args := []interface{}{"hello", "world", "foo", "bar"}
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {

View File

@ -62,7 +62,7 @@ func (c *PubSub) _conn(newChannels []string) (*pool.Conn, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
cn.WB.AllocBuffer() cn.EnableConcurrentReadWrite()
if err := c.resubscribe(cn); err != nil { if err := c.resubscribe(cn); err != nil {
_ = c.closeConn(cn) _ = c.closeConn(cn)