Add RingOptions.OnNewShard

This commit is contained in:
Vladimir Mihailenco 2019-08-23 14:46:40 +03:00
parent 472322938c
commit 0c4c236793
2 changed files with 16 additions and 4 deletions

View File

@ -114,7 +114,7 @@ func (opt *Options) init() {
if opt.TLSConfig == nil { if opt.TLSConfig == nil {
return netDialer.DialContext(ctx, network, addr) return netDialer.DialContext(ctx, network, addr)
} }
return tls.DialWithDialer(netDialer, opt.Network, opt.Addr, opt.TLSConfig) return tls.DialWithDialer(netDialer, network, addr, opt.TLSConfig)
} }
} }
if opt.PoolSize == 0 { if opt.PoolSize == 0 {

18
ring.go
View File

@ -56,6 +56,9 @@ type RingOptions struct {
// See https://arxiv.org/abs/1406.2294 for reference // See https://arxiv.org/abs/1406.2294 for reference
HashReplicas int HashReplicas int
// Optional hook that is called when a new shard is created.
OnNewShard func(*Client)
// Following options are copied from Options struct. // Following options are copied from Options struct.
OnConnect func(*Conn) error OnConnect func(*Conn) error
@ -376,9 +379,8 @@ func NewRing(opt *RingOptions) *Ring {
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
for name, addr := range opt.Addrs { for name, addr := range opt.Addrs {
clopt := opt.clientOptions(name) shard := newRingShard(opt, name, addr)
clopt.Addr = addr ring.shards.Add(name, shard)
ring.shards.Add(name, NewClient(clopt))
} }
go ring.shards.Heartbeat(opt.HeartbeatFrequency) go ring.shards.Heartbeat(opt.HeartbeatFrequency)
@ -386,6 +388,16 @@ func NewRing(opt *RingOptions) *Ring {
return &ring return &ring
} }
func newRingShard(opt *RingOptions, name, addr string) *Client {
clopt := opt.clientOptions(name)
clopt.Addr = addr
shard := NewClient(clopt)
if opt.OnNewShard != nil {
opt.OnNewShard(shard)
}
return shard
}
func (c *Ring) init() { func (c *Ring) init() {
c.cmdable = c.Process c.cmdable = c.Process
} }