Clarify thread safety. Fixes #166.

This commit is contained in:
Vladimir Mihailenco 2015-09-12 09:36:03 +03:00
parent 02154c3b3a
commit c809246d8b
8 changed files with 33 additions and 16 deletions

View File

@ -9,6 +9,9 @@ import (
"time" "time"
) )
// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct { type ClusterClient struct {
commandable commandable
@ -26,7 +29,7 @@ type ClusterClient struct {
reloading uint32 reloading uint32
} }
// NewClusterClient returns a new Redis Cluster client as described in // NewClusterClient returns a Redis Cluster client as described in
// http://redis.io/topics/cluster-spec. // http://redis.io/topics/cluster-spec.
func NewClusterClient(opt *ClusterOptions) *ClusterClient { func NewClusterClient(opt *ClusterOptions) *ClusterClient {
client := &ClusterClient{ client := &ClusterClient{
@ -43,8 +46,8 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
// Close closes the cluster client, releasing any open resources. // Close closes the cluster client, releasing any open resources.
// //
// It is rare to Close a Client, as the Client is meant to be // It is rare to Close a ClusterClient, as the ClusterClient is meant
// long-lived and shared between many goroutines. // to be long-lived and shared between many goroutines.
func (c *ClusterClient) Close() error { func (c *ClusterClient) Close() error {
defer c.clientsMx.Unlock() defer c.clientsMx.Unlock()
c.clientsMx.Lock() c.clientsMx.Lock()

View File

@ -10,7 +10,8 @@ type ClusterPipeline struct {
} }
// Pipeline creates a new pipeline which is able to execute commands // Pipeline creates a new pipeline which is able to execute commands
// against multiple shards. // against multiple shards. It's NOT safe for concurrent use by
// multiple goroutines.
func (c *ClusterClient) Pipeline() *ClusterPipeline { func (c *ClusterClient) Pipeline() *ClusterPipeline {
pipe := &ClusterPipeline{ pipe := &ClusterPipeline{
cluster: c, cluster: c,
@ -82,7 +83,7 @@ func (pipe *ClusterPipeline) Exec() (cmds []Cmder, retErr error) {
return cmds, retErr return cmds, retErr
} }
// Close marks the pipeline as closed // Close closes the pipeline, releasing any open resources.
func (pipe *ClusterPipeline) Close() error { func (pipe *ClusterPipeline) Close() error {
pipe.Discard() pipe.Discard()
pipe.closed = true pipe.closed = true

View File

@ -9,7 +9,8 @@ import (
var errDiscard = errors.New("redis: Discard can be used only inside Exec") var errDiscard = errors.New("redis: Discard can be used only inside Exec")
// Multi implements Redis transactions as described in // Multi implements Redis transactions as described in
// http://redis.io/topics/transactions. // http://redis.io/topics/transactions. It's NOT safe for concurrent
// use by multiple goroutines.
type Multi struct { type Multi struct {
commandable commandable

View File

@ -1,9 +1,8 @@
package redis package redis
// Pipeline implements pipelining as described in // Pipeline implements pipelining as described in
// http://redis.io/topics/pipelining. // http://redis.io/topics/pipelining. It's NOT safe for concurrent use
// // by multiple goroutines.
// Pipeline is not thread-safe.
type Pipeline struct { type Pipeline struct {
commandable commandable
@ -36,6 +35,7 @@ func (pipe *Pipeline) process(cmd Cmder) {
pipe.cmds = append(pipe.cmds, cmd) pipe.cmds = append(pipe.cmds, cmd)
} }
// Close closes the pipeline, releasing any open resources.
func (pipe *Pipeline) Close() error { func (pipe *Pipeline) Close() error {
pipe.Discard() pipe.Discard()
pipe.closed = true pipe.closed = true

View File

@ -15,7 +15,8 @@ func (c *Client) Publish(channel, message string) *IntCmd {
} }
// PubSub implements Pub/Sub commands as described in // PubSub implements Pub/Sub commands as described in
// http://redis.io/topics/pubsub. // http://redis.io/topics/pubsub. It's NOT safe for concurrent use by
// multiple goroutines.
type PubSub struct { type PubSub struct {
*baseClient *baseClient

View File

@ -80,6 +80,9 @@ func (c *baseClient) process(cmd Cmder) {
} }
// Close closes the client, releasing any open resources. // Close closes the client, releasing any open resources.
//
// It is rare to Close a Client, as the Client is meant to be
// long-lived and shared between many goroutines.
func (c *baseClient) Close() error { func (c *baseClient) Close() error {
return c.connPool.Close() return c.connPool.Close()
} }
@ -173,6 +176,9 @@ func (opt *Options) getIdleTimeout() time.Duration {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// Client is a Redis client representing a pool of zero or more
// underlying connections. It's safe for concurrent use by multiple
// goroutines.
type Client struct { type Client struct {
*baseClient *baseClient
commandable commandable
@ -186,6 +192,7 @@ func newClient(opt *Options, pool pool) *Client {
} }
} }
// NewClient returns a client to the Redis Server specified by Options.
func NewClient(opt *Options) *Client { func NewClient(opt *Options) *Client {
pool := newConnPool(opt) pool := newConnPool(opt)
return newClient(opt, pool) return newClient(opt, pool)

11
ring.go
View File

@ -92,7 +92,8 @@ func (shard *ringShard) Vote(up bool) bool {
} }
// Ring is a Redis client that uses constistent hashing to distribute // Ring is a Redis client that uses constistent hashing to distribute
// keys across multiple Redis servers (shards). // keys across multiple Redis servers (shards). It's safe for
// concurrent use by multiple goroutines.
// //
// It monitors the state of each shard and removes dead shards from // It monitors the state of each shard and removes dead shards from
// the ring. When shard comes online it is added back to the ring. This // the ring. When shard comes online it is added back to the ring. This
@ -215,8 +216,8 @@ func (ring *Ring) heartbeat() {
// Close closes the ring client, releasing any open resources. // Close closes the ring client, releasing any open resources.
// //
// It is rare to Close a Client, as the Client is meant to be // It is rare to Close a Ring, as the Ring is meant to be long-lived
// long-lived and shared between many goroutines. // and shared between many goroutines.
func (ring *Ring) Close() (retErr error) { func (ring *Ring) Close() (retErr error) {
defer ring.mx.Unlock() defer ring.mx.Unlock()
ring.mx.Lock() ring.mx.Lock()
@ -238,7 +239,8 @@ func (ring *Ring) Close() (retErr error) {
} }
// RingPipeline creates a new pipeline which is able to execute commands // RingPipeline creates a new pipeline which is able to execute commands
// against multiple shards. // against multiple shards. It's NOT safe for concurrent use by
// multiple goroutines.
type RingPipeline struct { type RingPipeline struct {
commandable commandable
@ -342,6 +344,7 @@ func (pipe *RingPipeline) Exec() (cmds []Cmder, retErr error) {
return cmds, retErr return cmds, retErr
} }
// Close closes the pipeline, releasing any open resources.
func (pipe *RingPipeline) Close() error { func (pipe *RingPipeline) Close() error {
pipe.Discard() pipe.Discard()
pipe.closed = true pipe.closed = true

View File

@ -54,8 +54,9 @@ func (opt *FailoverOptions) options() *Options {
} }
} }
// NewFailoverClient returns a Redis client with automatic failover // NewFailoverClient returns a Redis client that uses Redis Sentinel
// capabilities using Redis Sentinel. // for automatic failover. It's safe for concurrent use by multiple
// goroutines.
func NewFailoverClient(failoverOpt *FailoverOptions) *Client { func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
opt := failoverOpt.options() opt := failoverOpt.options()
failover := &sentinelFailover{ failover := &sentinelFailover{