redis/cluster_pipeline.go

130 lines
2.7 KiB
Go
Raw Normal View History

2015-03-18 13:41:24 +03:00
package redis
2015-12-30 16:53:45 +03:00
import (
"gopkg.in/redis.v3/internal/hashtag"
)
2015-03-18 13:41:24 +03:00
// ClusterPipeline is not thread-safe.
type ClusterPipeline struct {
commandable
cluster *ClusterClient
2015-12-16 17:11:52 +03:00
cmds []Cmder
closed bool
2015-03-18 13:41:24 +03:00
}
// Pipeline creates a new pipeline which is able to execute commands
2015-09-12 09:36:03 +03:00
// against multiple shards. It's NOT safe for concurrent use by
// multiple goroutines.
2015-03-18 13:41:24 +03:00
func (c *ClusterClient) Pipeline() *ClusterPipeline {
pipe := &ClusterPipeline{
cluster: c,
cmds: make([]Cmder, 0, 10),
}
pipe.commandable.process = pipe.process
return pipe
}
2015-06-04 11:50:24 +03:00
func (pipe *ClusterPipeline) process(cmd Cmder) {
pipe.cmds = append(pipe.cmds, cmd)
2015-03-18 13:41:24 +03:00
}
2015-06-04 11:50:24 +03:00
// Discard resets the pipeline and discards queued commands.
func (pipe *ClusterPipeline) Discard() error {
if pipe.closed {
2015-03-18 13:41:24 +03:00
return errClosed
}
2015-06-04 11:50:24 +03:00
pipe.cmds = pipe.cmds[:0]
2015-03-18 13:41:24 +03:00
return nil
}
2015-06-04 11:50:24 +03:00
func (pipe *ClusterPipeline) Exec() (cmds []Cmder, retErr error) {
if pipe.closed {
2015-03-18 13:41:24 +03:00
return nil, errClosed
}
2015-06-04 11:50:24 +03:00
if len(pipe.cmds) == 0 {
2015-03-18 13:41:24 +03:00
return []Cmder{}, nil
}
2015-06-04 11:50:24 +03:00
cmds = pipe.cmds
pipe.cmds = make([]Cmder, 0, 10)
2015-03-18 13:41:24 +03:00
cmdsMap := make(map[string][]Cmder)
for _, cmd := range cmds {
2015-12-30 16:53:45 +03:00
slot := hashtag.Slot(cmd.clusterKey())
2015-06-04 11:50:24 +03:00
addr := pipe.cluster.slotMasterAddr(slot)
2015-03-18 13:41:24 +03:00
cmdsMap[addr] = append(cmdsMap[addr], cmd)
}
2015-06-04 11:50:24 +03:00
for attempt := 0; attempt <= pipe.cluster.opt.getMaxRedirects(); attempt++ {
2015-03-18 13:41:24 +03:00
failedCmds := make(map[string][]Cmder)
for addr, cmds := range cmdsMap {
2015-06-04 11:50:24 +03:00
client, err := pipe.cluster.getClient(addr)
2015-03-18 13:41:24 +03:00
if err != nil {
setCmdsErr(cmds, err)
retErr = err
continue
}
cn, _, err := client.conn()
2015-03-18 13:41:24 +03:00
if err != nil {
setCmdsErr(cmds, err)
retErr = err
continue
}
2015-06-04 11:50:24 +03:00
failedCmds, err = pipe.execClusterCmds(cn, cmds, failedCmds)
2015-03-18 13:41:24 +03:00
if err != nil {
retErr = err
}
2015-04-17 16:18:44 +03:00
client.putConn(cn, err)
2015-03-18 13:41:24 +03:00
}
cmdsMap = failedCmds
}
return cmds, retErr
}
2015-09-12 09:36:03 +03:00
// Close closes the pipeline, releasing any open resources.
2015-06-04 11:50:24 +03:00
func (pipe *ClusterPipeline) Close() error {
pipe.Discard()
pipe.closed = true
return nil
}
func (pipe *ClusterPipeline) execClusterCmds(
2015-03-18 13:41:24 +03:00
cn *conn, cmds []Cmder, failedCmds map[string][]Cmder,
) (map[string][]Cmder, error) {
if err := cn.writeCmds(cmds...); err != nil {
setCmdsErr(cmds, err)
return failedCmds, err
}
var firstCmdErr error
for i, cmd := range cmds {
2015-10-07 17:09:20 +03:00
err := cmd.readReply(cn)
2015-03-18 13:41:24 +03:00
if err == nil {
continue
}
if isNetworkError(err) {
cmd.reset()
failedCmds[""] = append(failedCmds[""], cmds[i:]...)
break
} else if moved, ask, addr := isMovedError(err); moved {
2015-06-04 11:50:24 +03:00
pipe.cluster.lazyReloadSlots()
2015-03-18 13:41:24 +03:00
cmd.reset()
failedCmds[addr] = append(failedCmds[addr], cmd)
} else if ask {
cmd.reset()
failedCmds[addr] = append(failedCmds[addr], NewCmd("ASKING"), cmd)
} else if firstCmdErr == nil {
firstCmdErr = err
}
}
return failedCmds, firstCmdErr
}