diff --git a/.travis.yml b/.travis.yml index f8e0d652..7df76b9c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,5 +16,6 @@ matrix: - go: tip install: + - go get go4.org/syncutil - go get github.com/onsi/ginkgo - go get github.com/onsi/gomega diff --git a/cluster.go b/cluster.go index e3c5832f..51e94a75 100644 --- a/cluster.go +++ b/cluster.go @@ -7,6 +7,8 @@ import ( "sync/atomic" "time" + "go4.org/syncutil" + "github.com/go-redis/redis/internal" "github.com/go-redis/redis/internal/hashtag" "github.com/go-redis/redis/internal/pool" @@ -335,10 +337,12 @@ type ClusterClient struct { cmdable opt *ClusterOptions - cmds map[string]*CommandInfo nodes *clusterNodes _state atomic.Value + cmdsInfoOnce syncutil.Once + cmdsInfo map[string]*CommandInfo + // Reports where slots reloading is in progress. reloading uint32 } @@ -389,13 +393,34 @@ func (c *ClusterClient) state() *clusterState { return nil } +func (c *ClusterClient) cmdInfo(name string) *CommandInfo { + err := c.cmdsInfoOnce.Do(func() error { + node, err := c.nodes.Random() + if err != nil { + return err + } + + cmdsInfo, err := node.Client.Command().Result() + if err != nil { + return err + } + + c.cmdsInfo = cmdsInfo + return nil + }) + if err != nil { + return nil + } + return c.cmdsInfo[name] +} + func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) { if state == nil { node, err := c.nodes.Random() return 0, node, err } - cmdInfo := c.cmds[cmd.Name()] + cmdInfo := c.cmdInfo(cmd.Name()) firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo)) slot := hashtag.Slot(firstKey) @@ -631,15 +656,6 @@ func (c *ClusterClient) reloadSlots() (*clusterState, error) { return nil, err } - // TODO: fix race - if c.cmds == nil { - cmds, err := node.Client.Command().Result() - if err != nil { - return nil, err - } - c.cmds = cmds - } - slots, err := node.Client.ClusterSlots().Result() if err != nil { return nil, err diff --git a/ring.go b/ring.go index a9666bc7..5d945190 100644 --- a/ring.go +++ b/ring.go @@ -9,6 +9,8 @@ import ( "sync/atomic" "time" + "go4.org/syncutil" + "github.com/go-redis/redis/internal" "github.com/go-redis/redis/internal/consistenthash" "github.com/go-redis/redis/internal/hashtag" @@ -134,7 +136,7 @@ type Ring struct { hash *consistenthash.Map shards map[string]*ringShard - cmdsInfoOnce *sync.Once + cmdsInfoOnce syncutil.Once cmdsInfo map[string]*CommandInfo closed bool @@ -149,8 +151,6 @@ func NewRing(opt *RingOptions) *Ring { hash: consistenthash.New(nreplicas, nil), shards: make(map[string]*ringShard), - - cmdsInfoOnce: new(sync.Once), } ring.setProcessor(ring.Process) for name, addr := range opt.Addrs { @@ -242,17 +242,21 @@ func (c *Ring) ForEachShard(fn func(client *Client) error) error { } func (c *Ring) cmdInfo(name string) *CommandInfo { - c.cmdsInfoOnce.Do(func() { + err := c.cmdsInfoOnce.Do(func() error { + var firstErr error for _, shard := range c.shards { cmdsInfo, err := shard.Client.Command().Result() if err == nil { c.cmdsInfo = cmdsInfo - return + return nil + } + if firstErr == nil { + firstErr = err } } - c.cmdsInfoOnce = &sync.Once{} + return firstErr }) - if c.cmdsInfo == nil { + if err != nil { return nil } return c.cmdsInfo[name]