feat: add lock for hooks.dial

Signed-off-by: monkey92t <golang@88.com>
This commit is contained in:
monkey92t 2023-02-26 18:29:59 +08:00
parent e309eaf915
commit 24fa725799
5 changed files with 35 additions and 11 deletions

View File

@ -838,7 +838,7 @@ type ClusterClient struct {
state *clusterStateHolder state *clusterStateHolder
cmdsInfoCache *cmdsInfoCache cmdsInfoCache *cmdsInfoCache
cmdable cmdable
hooksMixin *hooksMixin
} }
// NewClusterClient returns a Redis Cluster client as described in // NewClusterClient returns a Redis Cluster client as described in
@ -849,6 +849,7 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
c := &ClusterClient{ c := &ClusterClient{
opt: opt, opt: opt,
nodes: newClusterNodes(opt), nodes: newClusterNodes(opt),
hooksMixin: &hooksMixin{},
} }
c.state = newClusterStateHolder(c.loadState) c.state = newClusterStateHolder(c.loadState)

View File

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"net" "net"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -44,6 +45,8 @@ type hooksMixin struct {
slice []Hook slice []Hook
initial hooks initial hooks
current hooks current hooks
hooksMu sync.RWMutex
} }
func (hs *hooksMixin) initHooks(hooks hooks) { func (hs *hooksMixin) initHooks(hooks hooks) {
@ -117,6 +120,9 @@ func (hs *hooksMixin) AddHook(hook Hook) {
func (hs *hooksMixin) chain() { func (hs *hooksMixin) chain() {
hs.initial.setDefaults() hs.initial.setDefaults()
hs.hooksMu.Lock()
defer hs.hooksMu.Unlock()
hs.current.dial = hs.initial.dial hs.current.dial = hs.initial.dial
hs.current.process = hs.initial.process hs.current.process = hs.initial.process
hs.current.pipeline = hs.initial.pipeline hs.current.pipeline = hs.initial.pipeline
@ -138,8 +144,15 @@ func (hs *hooksMixin) chain() {
} }
} }
func (hs *hooksMixin) clone() hooksMixin { func (hs *hooksMixin) clone() *hooksMixin {
clone := *hs hs.hooksMu.Lock()
defer hs.hooksMu.Unlock()
clone := &hooksMixin{
slice: hs.slice,
initial: hs.initial,
current: hs.current,
}
l := len(clone.slice) l := len(clone.slice)
clone.slice = clone.slice[:l:l] clone.slice = clone.slice[:l:l]
return clone return clone
@ -166,7 +179,11 @@ func (hs *hooksMixin) withProcessPipelineHook(
} }
func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) { func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
return hs.current.dial(ctx, network, addr) hs.hooksMu.RLock()
conn, err := hs.current.dial(ctx, network, addr)
hs.hooksMu.RUnlock()
return conn, err
} }
func (hs *hooksMixin) processHook(ctx context.Context, cmd Cmder) error { func (hs *hooksMixin) processHook(ctx context.Context, cmd Cmder) error {
@ -588,8 +605,8 @@ func (c *baseClient) context(ctx context.Context) context.Context {
// of idle connections. You can control the pool size with Config.PoolSize option. // of idle connections. You can control the pool size with Config.PoolSize option.
type Client struct { type Client struct {
*baseClient *baseClient
*hooksMixin
cmdable cmdable
hooksMixin
} }
// NewClient returns a client to the Redis Server specified by Options. // NewClient returns a client to the Redis Server specified by Options.
@ -600,6 +617,7 @@ func NewClient(opt *Options) *Client {
baseClient: &baseClient{ baseClient: &baseClient{
opt: opt, opt: opt,
}, },
hooksMixin: &hooksMixin{},
} }
c.init() c.init()
c.connPool = newConnPool(opt, c.dialHook) c.connPool = newConnPool(opt, c.dialHook)
@ -620,6 +638,7 @@ func (c *Client) init() {
func (c *Client) WithTimeout(timeout time.Duration) *Client { func (c *Client) WithTimeout(timeout time.Duration) *Client {
clone := *c clone := *c
clone.baseClient = c.baseClient.withTimeout(timeout) clone.baseClient = c.baseClient.withTimeout(timeout)
clone.hooksMixin = c.hooksMixin.clone()
clone.init() clone.init()
return &clone return &clone
} }
@ -758,7 +777,7 @@ type Conn struct {
baseClient baseClient
cmdable cmdable
statefulCmdable statefulCmdable
hooksMixin *hooksMixin
} }
func newConn(opt *Options, connPool pool.Pooler) *Conn { func newConn(opt *Options, connPool pool.Pooler) *Conn {
@ -767,6 +786,7 @@ func newConn(opt *Options, connPool pool.Pooler) *Conn {
opt: opt, opt: opt,
connPool: connPool, connPool: connPool,
}, },
hooksMixin: &hooksMixin{},
} }
c.cmdable = c.Process c.cmdable = c.Process

View File

@ -487,7 +487,7 @@ func (c *ringSharding) Close() error {
// Otherwise you should use Redis Cluster. // Otherwise you should use Redis Cluster.
type Ring struct { type Ring struct {
cmdable cmdable
hooksMixin *hooksMixin
opt *RingOptions opt *RingOptions
sharding *ringSharding sharding *ringSharding
@ -504,6 +504,7 @@ func NewRing(opt *RingOptions) *Ring {
opt: opt, opt: opt,
sharding: newRingSharding(opt), sharding: newRingSharding(opt),
heartbeatCancelFn: hbCancel, heartbeatCancelFn: hbCancel,
hooksMixin: &hooksMixin{},
} }
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)

View File

@ -211,6 +211,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
baseClient: &baseClient{ baseClient: &baseClient{
opt: opt, opt: opt,
}, },
hooksMixin: &hooksMixin{},
} }
rdb.init() rdb.init()
@ -267,7 +268,7 @@ func masterReplicaDialer(
// SentinelClient is a client for a Redis Sentinel. // SentinelClient is a client for a Redis Sentinel.
type SentinelClient struct { type SentinelClient struct {
*baseClient *baseClient
hooksMixin *hooksMixin
} }
func NewSentinelClient(opt *Options) *SentinelClient { func NewSentinelClient(opt *Options) *SentinelClient {
@ -276,6 +277,7 @@ func NewSentinelClient(opt *Options) *SentinelClient {
baseClient: &baseClient{ baseClient: &baseClient{
opt: opt, opt: opt,
}, },
hooksMixin: &hooksMixin{},
} }
c.initHooks(hooks{ c.initHooks(hooks{

2
tx.go
View File

@ -19,7 +19,7 @@ type Tx struct {
baseClient baseClient
cmdable cmdable
statefulCmdable statefulCmdable
hooksMixin *hooksMixin
} }
func (c *Client) newTx() *Tx { func (c *Client) newTx() *Tx {