feat: add ClientUnpause

This commit is contained in:
Vladimir Mihailenco 2022-06-04 15:02:53 +03:00
parent a15a89ea58
commit 91171f5e19
3 changed files with 128 additions and 4 deletions

View File

@ -2649,13 +2649,14 @@ func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error {
// subtract start and end. // subtract start and end.
nodes := make([]ClusterNode, n-2) nodes := make([]ClusterNode, n-2)
for j := 0; j < len(nodes); j++ { for j := 0; j < len(nodes); j++ {
nn, err := rd.ReadArrayLen() nn, err := rd.ReadArrayLen()
if err != nil { if err != nil {
return err return err
} }
if nn != 2 && nn != 3 { if nn < 2 || nn > 4 {
return fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", nn) return fmt.Errorf("got %d elements in cluster info address, expected 2, 3, or 4", n)
} }
ip, err := rd.ReadString() ip, err := rd.ReadString()
@ -2670,14 +2671,43 @@ func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error {
nodes[j].Addr = net.JoinHostPort(ip, port) nodes[j].Addr = net.JoinHostPort(ip, port)
if nn == 3 { if nn >= 3 {
id, err := rd.ReadString() id, err := rd.ReadString()
if err != nil { if err != nil {
return err return err
} }
nodes[j].ID = id nodes[j].ID = id
} }
if nn >= 4 {
networkingMetadata := make(map[string]string)
metadataLength, err := rd.ReadArrayLen()
if err != nil {
return err
}
if metadataLength%2 != 0 {
return fmt.Errorf(
"got %d elements in metadata, expected an even number", metadataLength)
}
for i := 0; i < metadataLength; i += 2 {
key, err := rd.ReadString()
if err != nil {
return err
}
value, err := rd.ReadString()
if err != nil {
return err
}
networkingMetadata[key] = value
}
nodes[j].NetworkingMetadata = networkingMetadata
}
} }
cmd.val[i] = ClusterSlot{ cmd.val[i] = ClusterSlot{
Start: int(start), Start: int(start),
End: int(end), End: int(end),

View File

@ -310,6 +310,7 @@ type Cmdable interface {
ClientKillByFilter(ctx context.Context, keys ...string) *IntCmd ClientKillByFilter(ctx context.Context, keys ...string) *IntCmd
ClientList(ctx context.Context) *StringCmd ClientList(ctx context.Context) *StringCmd
ClientPause(ctx context.Context, dur time.Duration) *BoolCmd ClientPause(ctx context.Context, dur time.Duration) *BoolCmd
ClientUnpause(ctx context.Context) *BoolCmd
ClientID(ctx context.Context) *IntCmd ClientID(ctx context.Context) *IntCmd
ClientUnblock(ctx context.Context, id int64) *IntCmd ClientUnblock(ctx context.Context, id int64) *IntCmd
ClientUnblockWithError(ctx context.Context, id int64) *IntCmd ClientUnblockWithError(ctx context.Context, id int64) *IntCmd
@ -2818,6 +2819,12 @@ func (c cmdable) ClientPause(ctx context.Context, dur time.Duration) *BoolCmd {
return cmd return cmd
} }
func (c cmdable) ClientUnpause(ctx context.Context) *BoolCmd {
cmd := NewBoolCmd(ctx, "client", "unpause")
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) ClientID(ctx context.Context) *IntCmd { func (c cmdable) ClientID(ctx context.Context) *IntCmd {
cmd := NewIntCmd(ctx, "client", "id") cmd := NewIntCmd(ctx, "client", "id")
_ = c(ctx, cmd) _ = c(ctx, cmd)

View File

@ -2,9 +2,12 @@ package pool_test
import ( import (
"context" "context"
"fmt"
"net" "net"
"sync" "sync"
"syscall"
"testing" "testing"
"time"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
@ -32,5 +35,89 @@ func perform(n int, cbs ...func(int)) {
} }
func dummyDialer(context.Context) (net.Conn, error) { func dummyDialer(context.Context) (net.Conn, error) {
return &net.TCPConn{}, nil return newDummyConn(), nil
}
func newDummyConn() net.Conn {
return &dummyConn{
rawConn: new(dummyRawConn),
}
}
var (
_ net.Conn = (*dummyConn)(nil)
_ syscall.Conn = (*dummyConn)(nil)
)
type dummyConn struct {
rawConn *dummyRawConn
}
func (d *dummyConn) SyscallConn() (syscall.RawConn, error) {
return d.rawConn, nil
}
var errDummy = fmt.Errorf("dummyConn err")
func (d *dummyConn) Read(b []byte) (n int, err error) {
return 0, errDummy
}
func (d *dummyConn) Write(b []byte) (n int, err error) {
return 0, errDummy
}
func (d *dummyConn) Close() error {
d.rawConn.Close()
return nil
}
func (d *dummyConn) LocalAddr() net.Addr {
return &net.TCPAddr{}
}
func (d *dummyConn) RemoteAddr() net.Addr {
return &net.TCPAddr{}
}
func (d *dummyConn) SetDeadline(t time.Time) error {
return nil
}
func (d *dummyConn) SetReadDeadline(t time.Time) error {
return nil
}
func (d *dummyConn) SetWriteDeadline(t time.Time) error {
return nil
}
var _ syscall.RawConn = (*dummyRawConn)(nil)
type dummyRawConn struct {
mu sync.Mutex
closed bool
}
func (d *dummyRawConn) Control(f func(fd uintptr)) error {
return nil
}
func (d *dummyRawConn) Read(f func(fd uintptr) (done bool)) error {
d.mu.Lock()
defer d.mu.Unlock()
if d.closed {
return fmt.Errorf("dummyRawConn closed")
}
return nil
}
func (d *dummyRawConn) Write(f func(fd uintptr) (done bool)) error {
return nil
}
func (d *dummyRawConn) Close() {
d.mu.Lock()
d.closed = true
d.mu.Unlock()
} }