From 91171f5e19a261dc4cfbf8706626d461b6ba03e4 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Sat, 4 Jun 2022 15:02:53 +0300 Subject: [PATCH] feat: add ClientUnpause --- command.go | 36 +++++++++++++-- commands.go | 7 +++ internal/pool/main_test.go | 89 +++++++++++++++++++++++++++++++++++++- 3 files changed, 128 insertions(+), 4 deletions(-) diff --git a/command.go b/command.go index a6beea66..1e5c11dc 100644 --- a/command.go +++ b/command.go @@ -2649,13 +2649,14 @@ func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error { // subtract start and end. nodes := make([]ClusterNode, n-2) + for j := 0; j < len(nodes); j++ { nn, err := rd.ReadArrayLen() if err != nil { return err } - if nn != 2 && nn != 3 { - return fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", nn) + if nn < 2 || nn > 4 { + return fmt.Errorf("got %d elements in cluster info address, expected 2, 3, or 4", n) } ip, err := rd.ReadString() @@ -2670,14 +2671,43 @@ func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error { nodes[j].Addr = net.JoinHostPort(ip, port) - if nn == 3 { + if nn >= 3 { id, err := rd.ReadString() if err != nil { return err } nodes[j].ID = id } + + if nn >= 4 { + networkingMetadata := make(map[string]string) + + metadataLength, err := rd.ReadArrayLen() + if err != nil { + return err + } + + if metadataLength%2 != 0 { + return fmt.Errorf( + "got %d elements in metadata, expected an even number", metadataLength) + } + + for i := 0; i < metadataLength; i += 2 { + key, err := rd.ReadString() + if err != nil { + return err + } + value, err := rd.ReadString() + if err != nil { + return err + } + networkingMetadata[key] = value + } + + nodes[j].NetworkingMetadata = networkingMetadata + } } + cmd.val[i] = ClusterSlot{ Start: int(start), End: int(end), diff --git a/commands.go b/commands.go index eb0757d1..60997a8f 100644 --- a/commands.go +++ b/commands.go @@ -310,6 +310,7 @@ type Cmdable interface { ClientKillByFilter(ctx context.Context, keys ...string) *IntCmd ClientList(ctx context.Context) *StringCmd ClientPause(ctx context.Context, dur time.Duration) *BoolCmd + ClientUnpause(ctx context.Context) *BoolCmd ClientID(ctx context.Context) *IntCmd ClientUnblock(ctx context.Context, id int64) *IntCmd ClientUnblockWithError(ctx context.Context, id int64) *IntCmd @@ -2818,6 +2819,12 @@ func (c cmdable) ClientPause(ctx context.Context, dur time.Duration) *BoolCmd { return cmd } +func (c cmdable) ClientUnpause(ctx context.Context) *BoolCmd { + cmd := NewBoolCmd(ctx, "client", "unpause") + _ = c(ctx, cmd) + return cmd +} + func (c cmdable) ClientID(ctx context.Context) *IntCmd { cmd := NewIntCmd(ctx, "client", "id") _ = c(ctx, cmd) diff --git a/internal/pool/main_test.go b/internal/pool/main_test.go index 2365dbc6..8ad16747 100644 --- a/internal/pool/main_test.go +++ b/internal/pool/main_test.go @@ -2,9 +2,12 @@ package pool_test import ( "context" + "fmt" "net" "sync" + "syscall" "testing" + "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -32,5 +35,89 @@ func perform(n int, cbs ...func(int)) { } func dummyDialer(context.Context) (net.Conn, error) { - return &net.TCPConn{}, nil + return newDummyConn(), nil +} + +func newDummyConn() net.Conn { + return &dummyConn{ + rawConn: new(dummyRawConn), + } +} + +var ( + _ net.Conn = (*dummyConn)(nil) + _ syscall.Conn = (*dummyConn)(nil) +) + +type dummyConn struct { + rawConn *dummyRawConn +} + +func (d *dummyConn) SyscallConn() (syscall.RawConn, error) { + return d.rawConn, nil +} + +var errDummy = fmt.Errorf("dummyConn err") + +func (d *dummyConn) Read(b []byte) (n int, err error) { + return 0, errDummy +} + +func (d *dummyConn) Write(b []byte) (n int, err error) { + return 0, errDummy +} + +func (d *dummyConn) Close() error { + d.rawConn.Close() + return nil +} + +func (d *dummyConn) LocalAddr() net.Addr { + return &net.TCPAddr{} +} + +func (d *dummyConn) RemoteAddr() net.Addr { + return &net.TCPAddr{} +} + +func (d *dummyConn) SetDeadline(t time.Time) error { + return nil +} + +func (d *dummyConn) SetReadDeadline(t time.Time) error { + return nil +} + +func (d *dummyConn) SetWriteDeadline(t time.Time) error { + return nil +} + +var _ syscall.RawConn = (*dummyRawConn)(nil) + +type dummyRawConn struct { + mu sync.Mutex + closed bool +} + +func (d *dummyRawConn) Control(f func(fd uintptr)) error { + return nil +} + +func (d *dummyRawConn) Read(f func(fd uintptr) (done bool)) error { + d.mu.Lock() + defer d.mu.Unlock() + if d.closed { + return fmt.Errorf("dummyRawConn closed") + } + return nil +} + +func (d *dummyRawConn) Write(f func(fd uintptr) (done bool)) error { + return nil +} + +func (d *dummyRawConn) Close() { + d.mu.Lock() + d.closed = true + d.mu.Unlock() }