Merge pull request from allenwq/clusternode-race

Fix race condition when creating the cluster node
This commit is contained in:
Vladimir Mihailenco 2019-01-08 14:04:09 +02:00 committed by GitHub
commit 22be8a3eaf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 3 additions and 77 deletions
cluster.go
internal/singleflight

View File

@ -17,7 +17,6 @@ import (
"github.com/go-redis/redis/internal/hashtag" "github.com/go-redis/redis/internal/hashtag"
"github.com/go-redis/redis/internal/pool" "github.com/go-redis/redis/internal/pool"
"github.com/go-redis/redis/internal/proto" "github.com/go-redis/redis/internal/proto"
"github.com/go-redis/redis/internal/singleflight"
) )
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes") var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
@ -243,8 +242,6 @@ type clusterNodes struct {
clusterAddrs []string clusterAddrs []string
closed bool closed bool
nodeCreateGroup singleflight.Group
_generation uint32 // atomic _generation uint32 // atomic
} }
@ -347,11 +344,6 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
return node, nil return node, nil
} }
v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) {
node := newClusterNode(c.opt, addr)
return node, nil
})
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
@ -361,15 +353,13 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
node, ok := c.allNodes[addr] node, ok := c.allNodes[addr]
if ok { if ok {
_ = v.(*clusterNode).Close()
return node, err return node, err
} }
node = v.(*clusterNode)
node = newClusterNode(c.opt, addr)
c.allAddrs = appendIfNotExists(c.allAddrs, addr) c.allAddrs = appendIfNotExists(c.allAddrs, addr)
if err == nil { c.clusterAddrs = append(c.clusterAddrs, addr)
c.clusterAddrs = append(c.clusterAddrs, addr)
}
c.allNodes[addr] = node c.allNodes[addr] = node
return node, err return node, err

View File

@ -1,64 +0,0 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package singleflight provides a duplicate function call suppression
// mechanism.
package singleflight
import "sync"
// call is an in-flight or completed Do call
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
return c.val, c.err
}