diff --git a/ring.go b/ring.go index d07e806..c1c16d7 100644 --- a/ring.go +++ b/ring.go @@ -87,6 +87,8 @@ func (opt *RingOptions) clientOptions() *Options { } } +//------------------------------------------------------------------------------ + type ringShard struct { Client *Client down int32 @@ -127,6 +129,150 @@ func (shard *ringShard) Vote(up bool) bool { return shard.IsDown() } +//------------------------------------------------------------------------------ + +type ringShards struct { + mu sync.RWMutex + hash *consistenthash.Map + shards map[string]*ringShard // read only + list []*ringShard // read only + closed bool +} + +func newRingShards() *ringShards { + return &ringShards{ + hash: consistenthash.New(nreplicas, nil), + shards: make(map[string]*ringShard), + } +} + +func (c *ringShards) Add(name string, cl *Client) { + shard := &ringShard{Client: cl} + c.hash.Add(name) + c.shards[name] = shard + c.list = append(c.list, shard) +} + +func (c *ringShards) List() []*ringShard { + c.mu.RLock() + list := c.list + c.mu.RUnlock() + return list +} + +func (c *ringShards) Hash(key string) string { + c.mu.RLock() + hash := c.hash.Get(key) + c.mu.RUnlock() + return hash +} + +func (c *ringShards) GetByKey(key string) (*ringShard, error) { + key = hashtag.Key(key) + + c.mu.RLock() + + if c.closed { + c.mu.RUnlock() + return nil, pool.ErrClosed + } + + hash := c.hash.Get(key) + if hash == "" { + c.mu.RUnlock() + return nil, errRingShardsDown + } + + shard := c.shards[hash] + c.mu.RUnlock() + + return shard, nil +} + +func (c *ringShards) GetByHash(name string) (*ringShard, error) { + if name == "" { + return c.Random() + } + + c.mu.RLock() + shard := c.shards[name] + c.mu.RUnlock() + return shard, nil +} + +func (c *ringShards) Random() (*ringShard, error) { + return c.GetByKey(strconv.Itoa(rand.Int())) +} + +// heartbeat monitors state of each shard in the ring. +func (c *ringShards) Heartbeat(frequency time.Duration) { + ticker := time.NewTicker(frequency) + defer ticker.Stop() + for range ticker.C { + var rebalance bool + + c.mu.RLock() + + if c.closed { + c.mu.RUnlock() + break + } + + shards := c.list + c.mu.RUnlock() + + for _, shard := range shards { + err := shard.Client.Ping().Err() + if shard.Vote(err == nil || err == pool.ErrPoolTimeout) { + internal.Logf("ring shard state changed: %s", shard) + rebalance = true + } + } + + if rebalance { + c.rebalance() + } + } +} + +// rebalance removes dead shards from the Ring. +func (c *ringShards) rebalance() { + hash := consistenthash.New(nreplicas, nil) + for name, shard := range c.shards { + if shard.IsUp() { + hash.Add(name) + } + } + + c.mu.Lock() + c.hash = hash + c.mu.Unlock() +} + +func (c *ringShards) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.closed { + return nil + } + c.closed = true + + var firstErr error + for _, shard := range c.shards { + if err := shard.Client.Close(); err != nil && firstErr == nil { + firstErr = err + } + } + c.hash = nil + c.shards = nil + c.list = nil + + return firstErr +} + +//------------------------------------------------------------------------------ + // Ring is a Redis client that uses constistent hashing to distribute // keys across multiple Redis servers (shards). It's safe for // concurrent use by multiple goroutines. @@ -143,18 +289,11 @@ func (shard *ringShard) Vote(up bool) bool { // Otherwise you should use Redis Cluster. type Ring struct { cmdable - opt *RingOptions + shards *ringShards cmdsInfoCache *cmdsInfoCache - mu sync.RWMutex - hash *consistenthash.Map - shards map[string]*ringShard // read only - shardsList []*ringShard // read only - processPipeline func([]Cmder) error - - closed bool } func NewRing(opt *RingOptions) *Ring { @@ -162,10 +301,8 @@ func NewRing(opt *RingOptions) *Ring { ring := &Ring{ opt: opt, + shards: newRingShards(), cmdsInfoCache: newCmdsInfoCache(), - - hash: consistenthash.New(nreplicas, nil), - shards: make(map[string]*ringShard), } ring.processPipeline = ring.defaultProcessPipeline ring.cmdable.setProcessor(ring.Process) @@ -173,19 +310,17 @@ func NewRing(opt *RingOptions) *Ring { for name, addr := range opt.Addrs { clopt := opt.clientOptions() clopt.Addr = addr - ring.addShard(name, NewClient(clopt)) + ring.shards.Add(name, NewClient(clopt)) } - go ring.heartbeat() + go ring.shards.Heartbeat(opt.HeartbeatFrequency) return ring } -func (c *Ring) addShard(name string, cl *Client) { - shard := &ringShard{Client: cl} - c.hash.Add(name) - c.shards[name] = shard - c.shardsList = append(c.shardsList, shard) +func (c *Ring) copy() *Ring { + cp := *c + return &cp } // Options returns read-only Options that were used to create the client. @@ -199,10 +334,7 @@ func (c *Ring) retryBackoff(attempt int) time.Duration { // PoolStats returns accumulated connection pool stats. func (c *Ring) PoolStats() *PoolStats { - c.mu.RLock() - shards := c.shardsList - c.mu.RUnlock() - + shards := c.shards.List() var acc PoolStats for _, shard := range shards { s := shard.Client.connPool.Stats() @@ -221,7 +353,7 @@ func (c *Ring) Subscribe(channels ...string) *PubSub { panic("at least one channel is required") } - shard, err := c.shardByKey(channels[0]) + shard, err := c.shards.GetByKey(channels[0]) if err != nil { // TODO: return PubSub with sticky error panic(err) @@ -235,7 +367,7 @@ func (c *Ring) PSubscribe(channels ...string) *PubSub { panic("at least one channel is required") } - shard, err := c.shardByKey(channels[0]) + shard, err := c.shards.GetByKey(channels[0]) if err != nil { // TODO: return PubSub with sticky error panic(err) @@ -246,10 +378,7 @@ func (c *Ring) PSubscribe(channels ...string) *PubSub { // ForEachShard concurrently calls the fn on each live shard in the ring. // It returns the first error if any. func (c *Ring) ForEachShard(fn func(client *Client) error) error { - c.mu.RLock() - shards := c.shardsList - c.mu.RUnlock() - + shards := c.shards.List() var wg sync.WaitGroup errCh := make(chan error, 1) for _, shard := range shards { @@ -281,10 +410,7 @@ func (c *Ring) ForEachShard(fn func(client *Client) error) error { func (c *Ring) cmdInfo(name string) *CommandInfo { cmdsInfo, err := c.cmdsInfoCache.Do(func() (map[string]*CommandInfo, error) { - c.mu.RLock() - shards := c.shardsList - c.mu.RUnlock() - + shards := c.shards.List() firstErr := errRingShardsDown for _, shard := range shards { cmdsInfo, err := shard.Client.Command().Result() @@ -307,50 +433,14 @@ func (c *Ring) cmdInfo(name string) *CommandInfo { return info } -func (c *Ring) shardByKey(key string) (*ringShard, error) { - key = hashtag.Key(key) - - c.mu.RLock() - - if c.closed { - c.mu.RUnlock() - return nil, pool.ErrClosed - } - - name := c.hash.Get(key) - if name == "" { - c.mu.RUnlock() - return nil, errRingShardsDown - } - - shard := c.shards[name] - c.mu.RUnlock() - return shard, nil -} - -func (c *Ring) randomShard() (*ringShard, error) { - return c.shardByKey(strconv.Itoa(rand.Int())) -} - -func (c *Ring) shardByName(name string) (*ringShard, error) { - if name == "" { - return c.randomShard() - } - - c.mu.RLock() - shard := c.shards[name] - c.mu.RUnlock() - return shard, nil -} - func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) { cmdInfo := c.cmdInfo(cmd.Name()) pos := cmdFirstKeyPos(cmd, cmdInfo) if pos == 0 { - return c.randomShard() + return c.shards.Random() } firstKey := cmd.stringArg(pos) - return c.shardByKey(firstKey) + return c.shards.GetByKey(firstKey) } func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { @@ -369,77 +459,6 @@ func (c *Ring) Process(cmd Cmder) error { return shard.Client.Process(cmd) } -// rebalance removes dead shards from the Ring. -func (c *Ring) rebalance() { - hash := consistenthash.New(nreplicas, nil) - for name, shard := range c.shards { - if shard.IsUp() { - hash.Add(name) - } - } - - c.mu.Lock() - c.hash = hash - c.mu.Unlock() -} - -// heartbeat monitors state of each shard in the ring. -func (c *Ring) heartbeat() { - ticker := time.NewTicker(c.opt.HeartbeatFrequency) - defer ticker.Stop() - for range ticker.C { - var rebalance bool - - c.mu.RLock() - - if c.closed { - c.mu.RUnlock() - break - } - - shards := c.shardsList - c.mu.RUnlock() - - for _, shard := range shards { - err := shard.Client.Ping().Err() - if shard.Vote(err == nil || err == pool.ErrPoolTimeout) { - internal.Logf("ring shard state changed: %s", shard) - rebalance = true - } - } - - if rebalance { - c.rebalance() - } - } -} - -// Close closes the ring client, releasing any open resources. -// -// It is rare to Close a Ring, as the Ring is meant to be long-lived -// and shared between many goroutines. -func (c *Ring) Close() error { - c.mu.Lock() - defer c.mu.Unlock() - - if c.closed { - return nil - } - c.closed = true - - var firstErr error - for _, shard := range c.shards { - if err := shard.Client.Close(); err != nil && firstErr == nil { - firstErr = err - } - } - c.hash = nil - c.shards = nil - c.shardsList = nil - - return firstErr -} - func (c *Ring) Pipeline() Pipeliner { pipe := Pipeline{ exec: c.processPipeline, @@ -462,11 +481,11 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { cmdsMap := make(map[string][]Cmder) for _, cmd := range cmds { cmdInfo := c.cmdInfo(cmd.Name()) - name := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo)) - if name != "" { - name = c.hash.Get(hashtag.Key(name)) + hash := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo)) + if hash != "" { + hash = c.shards.Hash(hashtag.Key(hash)) } - cmdsMap[name] = append(cmdsMap[name], cmd) + cmdsMap[hash] = append(cmdsMap[hash], cmd) } for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { @@ -476,8 +495,8 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { var failedCmdsMap map[string][]Cmder - for name, cmds := range cmdsMap { - shard, err := c.shardByName(name) + for hash, cmds := range cmdsMap { + shard, err := c.shards.GetByHash(hash) if err != nil { setCmdsErr(cmds, err) continue @@ -500,7 +519,7 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { if failedCmdsMap == nil { failedCmdsMap = make(map[string][]Cmder) } - failedCmdsMap[name] = cmds + failedCmdsMap[hash] = cmds } } @@ -520,3 +539,11 @@ func (c *Ring) TxPipeline() Pipeliner { func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { panic("not implemented") } + +// Close closes the ring client, releasing any open resources. +// +// It is rare to Close a Ring, as the Ring is meant to be long-lived +// and shared between many goroutines. +func (c *Ring) Close() error { + return c.shards.Close() +}