diff --git a/cluster_pipeline.go b/cluster_pipeline.go index 466be7d4..2e119406 100644 --- a/cluster_pipeline.go +++ b/cluster_pipeline.go @@ -20,48 +20,42 @@ func (c *ClusterClient) Pipeline() *ClusterPipeline { return pipe } -func (c *ClusterPipeline) process(cmd Cmder) { - c.cmds = append(c.cmds, cmd) +func (pipe *ClusterPipeline) process(cmd Cmder) { + pipe.cmds = append(pipe.cmds, cmd) } -// Close marks the pipeline as closed -func (c *ClusterPipeline) Close() error { - c.closed = true - return nil -} - -// Discard resets the pipeline and discards queued commands -func (c *ClusterPipeline) Discard() error { - if c.closed { +// Discard resets the pipeline and discards queued commands. +func (pipe *ClusterPipeline) Discard() error { + if pipe.closed { return errClosed } - c.cmds = c.cmds[:0] + pipe.cmds = pipe.cmds[:0] return nil } -func (c *ClusterPipeline) Exec() (cmds []Cmder, retErr error) { - if c.closed { +func (pipe *ClusterPipeline) Exec() (cmds []Cmder, retErr error) { + if pipe.closed { return nil, errClosed } - if len(c.cmds) == 0 { + if len(pipe.cmds) == 0 { return []Cmder{}, nil } - cmds = c.cmds - c.cmds = make([]Cmder, 0, 10) + cmds = pipe.cmds + pipe.cmds = make([]Cmder, 0, 10) cmdsMap := make(map[string][]Cmder) for _, cmd := range cmds { slot := hashSlot(cmd.clusterKey()) - addr := c.cluster.slotMasterAddr(slot) + addr := pipe.cluster.slotMasterAddr(slot) cmdsMap[addr] = append(cmdsMap[addr], cmd) } - for attempt := 0; attempt <= c.cluster.opt.getMaxRedirects(); attempt++ { + for attempt := 0; attempt <= pipe.cluster.opt.getMaxRedirects(); attempt++ { failedCmds := make(map[string][]Cmder) for addr, cmds := range cmdsMap { - client, err := c.cluster.getClient(addr) + client, err := pipe.cluster.getClient(addr) if err != nil { setCmdsErr(cmds, err) retErr = err @@ -75,7 +69,7 @@ func (c *ClusterPipeline) Exec() (cmds []Cmder, retErr error) { continue } - failedCmds, err = c.execClusterCmds(cn, cmds, failedCmds) + failedCmds, err = pipe.execClusterCmds(cn, cmds, failedCmds) if err != nil { retErr = err } @@ -88,7 +82,14 @@ func (c *ClusterPipeline) Exec() (cmds []Cmder, retErr error) { return cmds, retErr } -func (c *ClusterPipeline) execClusterCmds( +// Close marks the pipeline as closed +func (pipe *ClusterPipeline) Close() error { + pipe.Discard() + pipe.closed = true + return nil +} + +func (pipe *ClusterPipeline) execClusterCmds( cn *conn, cmds []Cmder, failedCmds map[string][]Cmder, ) (map[string][]Cmder, error) { if err := cn.writeCmds(cmds...); err != nil { @@ -107,7 +108,7 @@ func (c *ClusterPipeline) execClusterCmds( failedCmds[""] = append(failedCmds[""], cmds[i:]...) break } else if moved, ask, addr := isMovedError(err); moved { - c.cluster.lazyReloadSlots() + pipe.cluster.lazyReloadSlots() cmd.reset() failedCmds[addr] = append(failedCmds[addr], cmd) } else if ask { diff --git a/pipeline.go b/pipeline.go index 62cf7fd1..8981cb50 100644 --- a/pipeline.go +++ b/pipeline.go @@ -1,102 +1,113 @@ package redis -// Not thread-safe. +// Pipeline implements pipelining as described in +// http://redis.io/topics/pipelining. +// +// Pipeline is not thread-safe. type Pipeline struct { commandable - cmds []Cmder client *baseClient + + cmds []Cmder closed bool } func (c *Client) Pipeline() *Pipeline { pipe := &Pipeline{ - client: &baseClient{ - opt: c.opt, - connPool: c.connPool, - }, - cmds: make([]Cmder, 0, 10), + client: c.baseClient, + cmds: make([]Cmder, 0, 10), } pipe.commandable.process = pipe.process return pipe } -func (c *Client) Pipelined(f func(*Pipeline) error) ([]Cmder, error) { - pc := c.Pipeline() - if err := f(pc); err != nil { +func (c *Client) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) { + pipe := c.Pipeline() + if err := fn(pipe); err != nil { return nil, err } - cmds, err := pc.Exec() - pc.Close() + cmds, err := pipe.Exec() + pipe.Close() return cmds, err } -func (c *Pipeline) process(cmd Cmder) { - c.cmds = append(c.cmds, cmd) +func (pipe *Pipeline) process(cmd Cmder) { + pipe.cmds = append(pipe.cmds, cmd) } -func (c *Pipeline) Close() error { - c.closed = true +func (pipe *Pipeline) Close() error { + pipe.Discard() + pipe.closed = true return nil } -func (c *Pipeline) Discard() error { - if c.closed { +// Discard resets the pipeline and discards queued commands. +func (pipe *Pipeline) Discard() error { + if pipe.closed { return errClosed } - c.cmds = c.cmds[:0] + pipe.cmds = pipe.cmds[:0] return nil } // Exec always returns list of commands and error of the first failed // command if any. -func (c *Pipeline) Exec() (cmds []Cmder, retErr error) { - if c.closed { +func (pipe *Pipeline) Exec() (cmds []Cmder, retErr error) { + if pipe.closed { return nil, errClosed } - if len(c.cmds) == 0 { - return c.cmds, nil + if len(pipe.cmds) == 0 { + return pipe.cmds, nil } - cmds = c.cmds - c.cmds = make([]Cmder, 0, 0) + cmds = pipe.cmds + pipe.cmds = make([]Cmder, 0, 10) - for i := 0; i <= c.client.opt.MaxRetries; i++ { - if i > 0 { - resetCmds(cmds) - } - - cn, err := c.client.conn() + failedCmds := cmds + for i := 0; i <= pipe.client.opt.MaxRetries; i++ { + cn, err := pipe.client.conn() if err != nil { - setCmdsErr(cmds, err) + setCmdsErr(failedCmds, err) return cmds, err } - retErr = c.execCmds(cn, cmds) - c.client.putConn(cn, err) - if shouldRetry(err) { - continue + if i > 0 { + resetCmds(failedCmds) + } + failedCmds, err = execCmds(cn, failedCmds) + pipe.client.putConn(cn, err) + if err != nil && retErr == nil { + retErr = err + } + if len(failedCmds) == 0 { + break } - - break } return cmds, retErr } -func (c *Pipeline) execCmds(cn *conn, cmds []Cmder) error { +func execCmds(cn *conn, cmds []Cmder) ([]Cmder, error) { if err := cn.writeCmds(cmds...); err != nil { setCmdsErr(cmds, err) - return err + return cmds, err } var firstCmdErr error + var failedCmds []Cmder for _, cmd := range cmds { err := cmd.parseReply(cn.rd) - if err != nil && firstCmdErr == nil { + if err == nil { + continue + } + if firstCmdErr == nil { firstCmdErr = err } + if shouldRetry(err) { + failedCmds = append(failedCmds, cmd) + } } - return firstCmdErr + return failedCmds, firstCmdErr } diff --git a/ring.go b/ring.go index 4772581f..2124357c 100644 --- a/ring.go +++ b/ring.go @@ -25,6 +25,8 @@ type RingOptions struct { DB int64 Password string + MaxRetries int + DialTimeout time.Duration ReadTimeout time.Duration WriteTimeout time.Duration @@ -105,6 +107,7 @@ func (shard *ringShard) Vote(up bool) bool { type Ring struct { commandable + opt *RingOptions nreplicas int mx sync.RWMutex @@ -117,9 +120,11 @@ type Ring struct { func NewRing(opt *RingOptions) *Ring { const nreplicas = 100 ring := &Ring{ + opt: opt, nreplicas: nreplicas, - hash: consistenthash.New(nreplicas, nil), - shards: make(map[string]*ringShard), + + hash: consistenthash.New(nreplicas, nil), + shards: make(map[string]*ringShard), } ring.commandable.process = ring.process for name, addr := range opt.Addrs { @@ -235,3 +240,115 @@ func (ring *Ring) Close() (retErr error) { return retErr } + +// RingPipeline creates a new pipeline which is able to execute commands +// against multiple shards. +type RingPipeline struct { + commandable + + ring *Ring + + cmds []Cmder + closed bool +} + +func (ring *Ring) Pipeline() *RingPipeline { + pipe := &RingPipeline{ + ring: ring, + cmds: make([]Cmder, 0, 10), + } + pipe.commandable.process = pipe.process + return pipe +} + +func (ring *Ring) Pipelined(fn func(*RingPipeline) error) ([]Cmder, error) { + pipe := ring.Pipeline() + if err := fn(pipe); err != nil { + return nil, err + } + cmds, err := pipe.Exec() + pipe.Close() + return cmds, err +} + +func (pipe *RingPipeline) process(cmd Cmder) { + pipe.cmds = append(pipe.cmds, cmd) +} + +// Discard resets the pipeline and discards queued commands. +func (pipe *RingPipeline) Discard() error { + if pipe.closed { + return errClosed + } + pipe.cmds = pipe.cmds[:0] + return nil +} + +// Exec always returns list of commands and error of the first failed +// command if any. +func (pipe *RingPipeline) Exec() (cmds []Cmder, retErr error) { + if pipe.closed { + return nil, errClosed + } + if len(pipe.cmds) == 0 { + return pipe.cmds, nil + } + + cmds = pipe.cmds + pipe.cmds = make([]Cmder, 0, 10) + + cmdsMap := make(map[string][]Cmder) + for _, cmd := range cmds { + name := pipe.ring.hash.Get(cmd.clusterKey()) + cmdsMap[name] = append(cmdsMap[name], cmd) + } + + for i := 0; i <= pipe.ring.opt.MaxRetries; i++ { + failedCmdsMap := make(map[string][]Cmder) + + for name, cmds := range cmdsMap { + client, err := pipe.ring.getClient(name) + if err != nil { + setCmdsErr(cmds, err) + if retErr == nil { + retErr = err + } + continue + } + + cn, err := client.conn() + if err != nil { + setCmdsErr(cmds, err) + if retErr == nil { + retErr = err + } + continue + } + + if i > 0 { + resetCmds(cmds) + } + failedCmds, err := execCmds(cn, cmds) + client.putConn(cn, err) + if err != nil && retErr == nil { + retErr = err + } + if len(failedCmds) > 0 { + failedCmdsMap[name] = failedCmds + } + } + + if len(failedCmdsMap) == 0 { + break + } + cmdsMap = failedCmdsMap + } + + return cmds, retErr +} + +func (pipe *RingPipeline) Close() error { + pipe.Discard() + pipe.closed = true + return nil +} diff --git a/ring_test.go b/ring_test.go index 35212c3f..6234aeed 100644 --- a/ring_test.go +++ b/ring_test.go @@ -81,4 +81,25 @@ var _ = Describe("Redis ring", func() { // RingShard2 should have its keys. Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43")) }) + + It("supports pipelining", func() { + pipe := ring.Pipeline() + for i := 0; i < 100; i++ { + err := pipe.Set(fmt.Sprintf("key%d", i), "value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + } + cmds, err := pipe.Exec() + Expect(err).NotTo(HaveOccurred()) + Expect(cmds).To(HaveLen(100)) + Expect(pipe.Close()).NotTo(HaveOccurred()) + + for _, cmd := range cmds { + Expect(cmd.Err()).NotTo(HaveOccurred()) + Expect(cmd.(*redis.StatusCmd).Val()).To(Equal("OK")) + } + + // Both shards should have some keys now. + Expect(ringShard1.Info().Val()).To(ContainSubstring("keys=57")) + Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43")) + }) })