diff --git a/export_test.go b/export_test.go index 5e6d7466..cae7faa3 100644 --- a/export_test.go +++ b/export_test.go @@ -93,3 +93,11 @@ func GetSlavesAddrByName(ctx context.Context, c *SentinelClient, name string) [] } return parseReplicaAddrs(addrs, false) } + +func (c *Ring) GetAddr(addr string) *ringShard { + return c.shards.GetAddr(addr) +} + +func (c *ringShards) GetAddr(addr string) *ringShard { + return c.shards[addr] +} diff --git a/ring.go b/ring.go index dede1e49..65c7ce89 100644 --- a/ring.go +++ b/ring.go @@ -160,6 +160,7 @@ func (opt *RingOptions) clientOptions() *Options { type ringShard struct { Client *Client down int32 + addr string } func newRingShard(opt *RingOptions, name, addr string) *ringShard { @@ -168,6 +169,7 @@ func newRingShard(opt *RingOptions, name, addr string) *ringShard { return &ringShard{ Client: opt.NewClient(name, clopt), + addr: addr, } } @@ -212,33 +214,68 @@ type ringShards struct { opt *RingOptions mu sync.RWMutex + muClose sync.Mutex hash ConsistentHash - shards map[string]*ringShard // read only - list []*ringShard // read only + shards map[string]*ringShard // read only, updated by SetAddrs + list []*ringShard // read only, updated by SetAddrs numShard int closed bool } func newRingShards(opt *RingOptions) *ringShards { - shards := make(map[string]*ringShard, len(opt.Addrs)) + c := &ringShards{ + opt: opt, + } + c.SetAddrs(opt.Addrs) + + return c +} + +// SetAddrs replaces the shards in use, such that you can increase and +// decrease number of shards, that you use. It will reuse shards that +// existed before and close the ones that will not be used anymore. +func (c *ringShards) SetAddrs(addrs map[string]string) { + c.muClose.Lock() + defer c.muClose.Unlock() + if c.closed { + return + } + + shards := make(map[string]*ringShard) + unusedShards := make(map[string]*ringShard) + + for k, shard := range c.shards { + if addr, ok := addrs[k]; ok && shard.addr == addr { + shards[k] = shard + } else { + unusedShards[k] = shard + } + } + + for k, addr := range addrs { + if shard, ok := c.shards[k]; !ok || shard.addr != addr { + shards[k] = newRingShard(c.opt, k, addr) + } + } + list := make([]*ringShard, 0, len(shards)) - - for name, addr := range opt.Addrs { - shard := newRingShard(opt, name, addr) - shards[name] = shard - + for _, shard := range shards { list = append(list, shard) } - c := &ringShards{ - opt: opt, + c.mu.Lock() + c.shards = shards + c.list = list - shards: shards, - list: list, + c.rebalanceLocked() + c.mu.Unlock() + + for k, shard := range unusedShards { + err := shard.Client.Close() + if err != nil { + internal.Logger.Printf(context.Background(), "Failed to close ring shard client %s %s: %v", k, shard.addr, err) + } } - c.rebalance() - - return c } func (c *ringShards) List() []*ringShard { @@ -355,6 +392,23 @@ func (c *ringShards) rebalance() { c.mu.Unlock() } +// rebalanceLocked removes dead shards from the Ring and callers need to hold the locl +func (c *ringShards) rebalanceLocked() { + shards := c.shards + liveShards := make([]string, 0, len(shards)) + + for name, shard := range shards { + if shard.IsUp() { + liveShards = append(liveShards, name) + } + } + + hash := c.opt.NewConsistentHash(liveShards) + + c.hash = hash + c.numShard = len(liveShards) +} + func (c *ringShards) Len() int { c.mu.RLock() defer c.mu.RUnlock() @@ -363,6 +417,8 @@ func (c *ringShards) Len() int { } func (c *ringShards) Close() error { + c.muClose.Lock() + defer c.muClose.Unlock() c.mu.Lock() defer c.mu.Unlock() @@ -436,6 +492,10 @@ func NewRing(opt *RingOptions) *Ring { return &ring } +func (c *Ring) SetAddrs(ctx context.Context, addrs map[string]string) { + c.shards.SetAddrs(addrs) +} + // Do creates a Cmd from the args and processes the cmd. func (c *Ring) Do(ctx context.Context, args ...interface{}) *Cmd { cmd := NewCmd(ctx, args...) diff --git a/ring_test.go b/ring_test.go index 1a6ec84b..d8040628 100644 --- a/ring_test.go +++ b/ring_test.go @@ -113,6 +113,79 @@ var _ = Describe("Redis Ring", func() { Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=100")) }) + Describe("[new] dynamic setting ring shards", func() { + It("downscale shard and check reuse shard, upscale shard and check reuse", func() { + Expect(ring.Len(), 2) + + wantShard := ring.GetAddr("ringShardOne") + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + ring.SetAddrs(ctx, map[string]string{ + "ringShardOne": ":" + ringShard1Port, + }) + Expect(ring.Len(), 1) + gotShard := ring.GetAddr("ringShardOne") + Expect(gotShard).To(Equal(wantShard)) + + ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + ring.SetAddrs(ctx, map[string]string{ + "ringShardOne": ":" + ringShard1Port, + "ringShardTwo": ":" + ringShard2Port, + }) + Expect(ring.Len(), 2) + gotShard = ring.GetAddr("ringShardOne") + Expect(gotShard).To(Equal(wantShard)) + + }) + + It("uses 3 shards after setting it to 3 shards", func() { + Expect(ring.Len(), 2) + + // Start ringShard3. + var err error + ringShard3, err = startRedis(ringShard3Port) + Expect(err).NotTo(HaveOccurred()) + + shardName1 := "ringShardOne" + shardAddr1 := ":" + ringShard1Port + wantShard1 := ring.GetAddr(shardName1) + shardName2 := "ringShardTwo" + shardAddr2 := ":" + ringShard2Port + wantShard2 := ring.GetAddr(shardName2) + shardName3 := "ringShardThree" + shardAddr3 := ":" + ringShard3Port + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + ring.SetAddrs(ctx, map[string]string{ + shardName1: shardAddr1, + shardName2: shardAddr2, + shardName3: shardAddr3, + }) + Expect(ring.Len(), 3) + gotShard1 := ring.GetAddr(shardName1) + gotShard2 := ring.GetAddr(shardName2) + gotShard3 := ring.GetAddr(shardName3) + Expect(gotShard1).To(Equal(wantShard1)) + Expect(gotShard2).To(Equal(wantShard2)) + Expect(gotShard3).ToNot(BeNil()) + + ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + ring.SetAddrs(ctx, map[string]string{ + shardName1: shardAddr1, + shardName2: shardAddr2, + }) + Expect(ring.Len(), 2) + gotShard1 = ring.GetAddr(shardName1) + gotShard2 = ring.GetAddr(shardName2) + gotShard3 = ring.GetAddr(shardName3) + Expect(gotShard1).To(Equal(wantShard1)) + Expect(gotShard2).To(Equal(wantShard2)) + Expect(gotShard3).To(BeNil()) + }) + + }) Describe("pipeline", func() { It("doesn't panic closed ring, returns error", func() { pipe := ring.Pipeline()