mirror of https://github.com/go-redis/redis.git
Merge pull request #579 from go-redis/fix/cmd-info-race
Fix cmd info race. Fixes #578
This commit is contained in:
commit
1471ec2b0a
|
@ -16,5 +16,6 @@ matrix:
|
||||||
- go: tip
|
- go: tip
|
||||||
|
|
||||||
install:
|
install:
|
||||||
|
- go get go4.org/syncutil
|
||||||
- go get github.com/onsi/ginkgo
|
- go get github.com/onsi/ginkgo
|
||||||
- go get github.com/onsi/gomega
|
- go get github.com/onsi/gomega
|
||||||
|
|
38
cluster.go
38
cluster.go
|
@ -7,6 +7,8 @@ import (
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"go4.org/syncutil"
|
||||||
|
|
||||||
"github.com/go-redis/redis/internal"
|
"github.com/go-redis/redis/internal"
|
||||||
"github.com/go-redis/redis/internal/hashtag"
|
"github.com/go-redis/redis/internal/hashtag"
|
||||||
"github.com/go-redis/redis/internal/pool"
|
"github.com/go-redis/redis/internal/pool"
|
||||||
|
@ -335,10 +337,12 @@ type ClusterClient struct {
|
||||||
cmdable
|
cmdable
|
||||||
|
|
||||||
opt *ClusterOptions
|
opt *ClusterOptions
|
||||||
cmds map[string]*CommandInfo
|
|
||||||
nodes *clusterNodes
|
nodes *clusterNodes
|
||||||
_state atomic.Value
|
_state atomic.Value
|
||||||
|
|
||||||
|
cmdsInfoOnce syncutil.Once
|
||||||
|
cmdsInfo map[string]*CommandInfo
|
||||||
|
|
||||||
// Reports where slots reloading is in progress.
|
// Reports where slots reloading is in progress.
|
||||||
reloading uint32
|
reloading uint32
|
||||||
}
|
}
|
||||||
|
@ -389,13 +393,34 @@ func (c *ClusterClient) state() *clusterState {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
|
||||||
|
err := c.cmdsInfoOnce.Do(func() error {
|
||||||
|
node, err := c.nodes.Random()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
cmdsInfo, err := node.Client.Command().Result()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
c.cmdsInfo = cmdsInfo
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return c.cmdsInfo[name]
|
||||||
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) {
|
func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) {
|
||||||
if state == nil {
|
if state == nil {
|
||||||
node, err := c.nodes.Random()
|
node, err := c.nodes.Random()
|
||||||
return 0, node, err
|
return 0, node, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cmdInfo := c.cmds[cmd.Name()]
|
cmdInfo := c.cmdInfo(cmd.Name())
|
||||||
firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
|
firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
|
||||||
slot := hashtag.Slot(firstKey)
|
slot := hashtag.Slot(firstKey)
|
||||||
|
|
||||||
|
@ -631,15 +656,6 @@ func (c *ClusterClient) reloadSlots() (*clusterState, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: fix race
|
|
||||||
if c.cmds == nil {
|
|
||||||
cmds, err := node.Client.Command().Result()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
c.cmds = cmds
|
|
||||||
}
|
|
||||||
|
|
||||||
slots, err := node.Client.ClusterSlots().Result()
|
slots, err := node.Client.ClusterSlots().Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
18
ring.go
18
ring.go
|
@ -9,6 +9,8 @@ import (
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"go4.org/syncutil"
|
||||||
|
|
||||||
"github.com/go-redis/redis/internal"
|
"github.com/go-redis/redis/internal"
|
||||||
"github.com/go-redis/redis/internal/consistenthash"
|
"github.com/go-redis/redis/internal/consistenthash"
|
||||||
"github.com/go-redis/redis/internal/hashtag"
|
"github.com/go-redis/redis/internal/hashtag"
|
||||||
|
@ -134,7 +136,7 @@ type Ring struct {
|
||||||
hash *consistenthash.Map
|
hash *consistenthash.Map
|
||||||
shards map[string]*ringShard
|
shards map[string]*ringShard
|
||||||
|
|
||||||
cmdsInfoOnce *sync.Once
|
cmdsInfoOnce syncutil.Once
|
||||||
cmdsInfo map[string]*CommandInfo
|
cmdsInfo map[string]*CommandInfo
|
||||||
|
|
||||||
closed bool
|
closed bool
|
||||||
|
@ -149,8 +151,6 @@ func NewRing(opt *RingOptions) *Ring {
|
||||||
|
|
||||||
hash: consistenthash.New(nreplicas, nil),
|
hash: consistenthash.New(nreplicas, nil),
|
||||||
shards: make(map[string]*ringShard),
|
shards: make(map[string]*ringShard),
|
||||||
|
|
||||||
cmdsInfoOnce: new(sync.Once),
|
|
||||||
}
|
}
|
||||||
ring.setProcessor(ring.Process)
|
ring.setProcessor(ring.Process)
|
||||||
for name, addr := range opt.Addrs {
|
for name, addr := range opt.Addrs {
|
||||||
|
@ -242,17 +242,21 @@ func (c *Ring) ForEachShard(fn func(client *Client) error) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Ring) cmdInfo(name string) *CommandInfo {
|
func (c *Ring) cmdInfo(name string) *CommandInfo {
|
||||||
c.cmdsInfoOnce.Do(func() {
|
err := c.cmdsInfoOnce.Do(func() error {
|
||||||
|
var firstErr error
|
||||||
for _, shard := range c.shards {
|
for _, shard := range c.shards {
|
||||||
cmdsInfo, err := shard.Client.Command().Result()
|
cmdsInfo, err := shard.Client.Command().Result()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
c.cmdsInfo = cmdsInfo
|
c.cmdsInfo = cmdsInfo
|
||||||
return
|
return nil
|
||||||
|
}
|
||||||
|
if firstErr == nil {
|
||||||
|
firstErr = err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
c.cmdsInfoOnce = &sync.Once{}
|
return firstErr
|
||||||
})
|
})
|
||||||
if c.cmdsInfo == nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return c.cmdsInfo[name]
|
return c.cmdsInfo[name]
|
||||||
|
|
Loading…
Reference in New Issue