forked from mirror/redis
Merge pull request #77 from go-redis/feature/cluster-pipeline-rebas
Feature/cluster pipeline rebas
This commit is contained in:
commit
053939b6e8
75
cluster.go
75
cluster.go
|
@ -47,12 +47,16 @@ func (c *ClusterClient) Close() error {
|
||||||
// ------------------------------------------------------------------------
|
// ------------------------------------------------------------------------
|
||||||
|
|
||||||
// getClient returns a Client for a given address.
|
// getClient returns a Client for a given address.
|
||||||
func (c *ClusterClient) getClient(addr string) *Client {
|
func (c *ClusterClient) getClient(addr string) (*Client, error) {
|
||||||
|
if addr == "" {
|
||||||
|
return c.randomClient()
|
||||||
|
}
|
||||||
|
|
||||||
c.clientsMx.RLock()
|
c.clientsMx.RLock()
|
||||||
client, ok := c.clients[addr]
|
client, ok := c.clients[addr]
|
||||||
if ok {
|
if ok {
|
||||||
c.clientsMx.RUnlock()
|
c.clientsMx.RUnlock()
|
||||||
return client
|
return client, nil
|
||||||
}
|
}
|
||||||
c.clientsMx.RUnlock()
|
c.clientsMx.RUnlock()
|
||||||
|
|
||||||
|
@ -66,14 +70,24 @@ func (c *ClusterClient) getClient(addr string) *Client {
|
||||||
}
|
}
|
||||||
c.clientsMx.Unlock()
|
c.clientsMx.Unlock()
|
||||||
|
|
||||||
return client
|
return client, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClusterClient) slotAddrs(slot int) []string {
|
||||||
|
c.slotsMx.RLock()
|
||||||
|
addrs := c.slots[slot]
|
||||||
|
c.slotsMx.RUnlock()
|
||||||
|
return addrs
|
||||||
}
|
}
|
||||||
|
|
||||||
// randomClient returns a Client for the first live node.
|
// randomClient returns a Client for the first live node.
|
||||||
func (c *ClusterClient) randomClient() (client *Client, err error) {
|
func (c *ClusterClient) randomClient() (client *Client, err error) {
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
n := rand.Intn(len(c.addrs))
|
n := rand.Intn(len(c.addrs))
|
||||||
client = c.getClient(c.addrs[n])
|
client, err = c.getClient(c.addrs[n])
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
err = client.Ping().Err()
|
err = client.Ping().Err()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return client, nil
|
return client, nil
|
||||||
|
@ -82,27 +96,22 @@ func (c *ClusterClient) randomClient() (client *Client, err error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process a command
|
|
||||||
func (c *ClusterClient) process(cmd Cmder) {
|
func (c *ClusterClient) process(cmd Cmder) {
|
||||||
var client *Client
|
|
||||||
var ask bool
|
var ask bool
|
||||||
|
|
||||||
c.reloadIfDue()
|
c.reloadIfDue()
|
||||||
|
|
||||||
slot := hashSlot(cmd.clusterKey())
|
slot := hashSlot(cmd.clusterKey())
|
||||||
c.slotsMx.RLock()
|
|
||||||
addrs := c.slots[slot]
|
|
||||||
c.slotsMx.RUnlock()
|
|
||||||
|
|
||||||
if len(addrs) > 0 {
|
var addr string
|
||||||
client = c.getClient(addrs[0]) // First address is master.
|
if addrs := c.slotAddrs(slot); len(addrs) > 0 {
|
||||||
} else {
|
addr = addrs[0] // First address is master.
|
||||||
var err error
|
}
|
||||||
client, err = c.randomClient()
|
|
||||||
if err != nil {
|
client, err := c.getClient(addr)
|
||||||
cmd.setErr(err)
|
if err != nil {
|
||||||
return
|
cmd.setErr(err)
|
||||||
}
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for attempt := 0; attempt <= c.opt.getMaxRedirects(); attempt++ {
|
for attempt := 0; attempt <= c.opt.getMaxRedirects(); attempt++ {
|
||||||
|
@ -132,24 +141,22 @@ func (c *ClusterClient) process(cmd Cmder) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check the error message, return if unexpected
|
var moved bool
|
||||||
parts := strings.SplitN(err.Error(), " ", 3)
|
var addr string
|
||||||
if len(parts) != 3 {
|
moved, ask, addr = isMovedError(err)
|
||||||
return
|
if moved || ask {
|
||||||
|
if moved {
|
||||||
|
c.scheduleReload()
|
||||||
|
}
|
||||||
|
client, err = c.getClient(addr)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
cmd.reset()
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle MOVE and ASK redirections, return on any other error
|
break
|
||||||
switch parts[0] {
|
|
||||||
case "MOVED":
|
|
||||||
c.scheduleReload()
|
|
||||||
client = c.getClient(parts[2])
|
|
||||||
case "ASK":
|
|
||||||
ask = true
|
|
||||||
client = c.getClient(parts[2])
|
|
||||||
default:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
cmd.reset()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,23 @@ import (
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// GetSlot returns the cached slot addresses
|
||||||
|
func (c *ClusterClient) GetSlot(pos int) []string {
|
||||||
|
c.slotsMx.RLock()
|
||||||
|
defer c.slotsMx.RUnlock()
|
||||||
|
|
||||||
|
return c.slots[pos]
|
||||||
|
}
|
||||||
|
|
||||||
|
// SwapSlot swaps a slot's master/slave address
|
||||||
|
// for testing MOVED redirects
|
||||||
|
func (c *ClusterClient) SwapSlot(pos int) []string {
|
||||||
|
c.slotsMx.Lock()
|
||||||
|
defer c.slotsMx.Unlock()
|
||||||
|
c.slots[pos][0], c.slots[pos][1] = c.slots[pos][1], c.slots[pos][0]
|
||||||
|
return c.slots[pos]
|
||||||
|
}
|
||||||
|
|
||||||
var _ = Describe("ClusterClient", func() {
|
var _ = Describe("ClusterClient", func() {
|
||||||
var subject *ClusterClient
|
var subject *ClusterClient
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,128 @@
|
||||||
|
package redis
|
||||||
|
|
||||||
|
// ClusterPipeline is not thread-safe.
|
||||||
|
type ClusterPipeline struct {
|
||||||
|
commandable
|
||||||
|
|
||||||
|
cmds []Cmder
|
||||||
|
cluster *ClusterClient
|
||||||
|
closed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pipeline creates a new pipeline which is able to execute commands
|
||||||
|
// against multiple shards.
|
||||||
|
func (c *ClusterClient) Pipeline() *ClusterPipeline {
|
||||||
|
pipe := &ClusterPipeline{
|
||||||
|
cluster: c,
|
||||||
|
cmds: make([]Cmder, 0, 10),
|
||||||
|
}
|
||||||
|
pipe.commandable.process = pipe.process
|
||||||
|
return pipe
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClusterPipeline) process(cmd Cmder) {
|
||||||
|
c.cmds = append(c.cmds, cmd)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close marks the pipeline as closed
|
||||||
|
func (c *ClusterPipeline) Close() error {
|
||||||
|
c.closed = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Discard resets the pipeline and discards queued commands
|
||||||
|
func (c *ClusterPipeline) Discard() error {
|
||||||
|
if c.closed {
|
||||||
|
return errClosed
|
||||||
|
}
|
||||||
|
c.cmds = c.cmds[:0]
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClusterPipeline) Exec() (cmds []Cmder, retErr error) {
|
||||||
|
if c.closed {
|
||||||
|
return nil, errClosed
|
||||||
|
}
|
||||||
|
if len(c.cmds) == 0 {
|
||||||
|
return []Cmder{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cmds = c.cmds
|
||||||
|
c.cmds = make([]Cmder, 0, 10)
|
||||||
|
|
||||||
|
cmdsMap := make(map[string][]Cmder)
|
||||||
|
for _, cmd := range cmds {
|
||||||
|
slot := hashSlot(cmd.clusterKey())
|
||||||
|
addrs := c.cluster.slotAddrs(slot)
|
||||||
|
|
||||||
|
var addr string
|
||||||
|
if len(addrs) > 0 {
|
||||||
|
addr = addrs[0] // First address is master.
|
||||||
|
}
|
||||||
|
|
||||||
|
cmdsMap[addr] = append(cmdsMap[addr], cmd)
|
||||||
|
}
|
||||||
|
|
||||||
|
for attempt := 0; attempt <= c.cluster.opt.getMaxRedirects(); attempt++ {
|
||||||
|
failedCmds := make(map[string][]Cmder)
|
||||||
|
|
||||||
|
for addr, cmds := range cmdsMap {
|
||||||
|
client, err := c.cluster.getClient(addr)
|
||||||
|
if err != nil {
|
||||||
|
setCmdsErr(cmds, err)
|
||||||
|
retErr = err
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
cn, err := client.conn()
|
||||||
|
if err != nil {
|
||||||
|
setCmdsErr(cmds, err)
|
||||||
|
retErr = err
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
failedCmds, err = c.execClusterCmds(cn, cmds, failedCmds)
|
||||||
|
if err != nil {
|
||||||
|
retErr = err
|
||||||
|
}
|
||||||
|
client.freeConn(cn, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cmdsMap = failedCmds
|
||||||
|
}
|
||||||
|
|
||||||
|
return cmds, retErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClusterPipeline) execClusterCmds(
|
||||||
|
cn *conn, cmds []Cmder, failedCmds map[string][]Cmder,
|
||||||
|
) (map[string][]Cmder, error) {
|
||||||
|
if err := cn.writeCmds(cmds...); err != nil {
|
||||||
|
setCmdsErr(cmds, err)
|
||||||
|
return failedCmds, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var firstCmdErr error
|
||||||
|
for i, cmd := range cmds {
|
||||||
|
err := cmd.parseReply(cn.rd)
|
||||||
|
if err == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if isNetworkError(err) {
|
||||||
|
cmd.reset()
|
||||||
|
failedCmds[""] = append(failedCmds[""], cmds[i:]...)
|
||||||
|
break
|
||||||
|
} else if moved, ask, addr := isMovedError(err); moved {
|
||||||
|
c.cluster.scheduleReload()
|
||||||
|
cmd.reset()
|
||||||
|
failedCmds[addr] = append(failedCmds[addr], cmd)
|
||||||
|
} else if ask {
|
||||||
|
cmd.reset()
|
||||||
|
failedCmds[addr] = append(failedCmds[addr], NewCmd("ASKING"), cmd)
|
||||||
|
} else if firstCmdErr == nil {
|
||||||
|
firstCmdErr = err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return failedCmds, firstCmdErr
|
||||||
|
}
|
|
@ -2,9 +2,11 @@ package redis_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"time"
|
||||||
|
|
||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
|
|
||||||
"gopkg.in/redis.v2"
|
"gopkg.in/redis.v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -181,22 +183,50 @@ var _ = Describe("Cluster", func() {
|
||||||
It("should follow redirects", func() {
|
It("should follow redirects", func() {
|
||||||
Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred())
|
Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred())
|
||||||
Expect(redis.HashSlot("A")).To(Equal(6373))
|
Expect(redis.HashSlot("A")).To(Equal(6373))
|
||||||
|
Expect(client.SwapSlot(6373)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
|
||||||
// Slot 6373 is stored on the second node
|
|
||||||
defer func() {
|
|
||||||
scenario.masters()[1].ClusterFailover()
|
|
||||||
}()
|
|
||||||
|
|
||||||
slave := scenario.slaves()[1]
|
|
||||||
Expect(slave.ClusterFailover().Err()).NotTo(HaveOccurred())
|
|
||||||
Eventually(func() string {
|
|
||||||
return slave.Info().Val()
|
|
||||||
}, "10s", "200ms").Should(ContainSubstring("role:master"))
|
|
||||||
|
|
||||||
val, err := client.Get("A").Result()
|
val, err := client.Get("A").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(val).To(Equal("VALUE"))
|
Expect(val).To(Equal("VALUE"))
|
||||||
|
Expect(client.GetSlot(6373)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
|
||||||
|
|
||||||
|
val, err = client.Get("A").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(val).To(Equal("VALUE"))
|
||||||
|
Expect(client.GetSlot(6373)).To(Equal([]string{"127.0.0.1:8221", "127.0.0.1:8224"}))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should perform multi-pipelines", func() {
|
||||||
|
// Dummy command to load slots info.
|
||||||
|
Expect(client.Ping().Err()).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
slot := redis.HashSlot("A")
|
||||||
|
Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
|
||||||
|
|
||||||
|
pipe := client.Pipeline()
|
||||||
|
defer pipe.Close()
|
||||||
|
|
||||||
|
keys := []string{"A", "B", "C", "D", "E", "F", "G"}
|
||||||
|
for i, key := range keys {
|
||||||
|
pipe.Set(key, key+"_value", 0)
|
||||||
|
pipe.Expire(key, time.Duration(i+1)*time.Hour)
|
||||||
|
}
|
||||||
|
for _, key := range keys {
|
||||||
|
pipe.Get(key)
|
||||||
|
pipe.TTL(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
cmds, err := pipe.Exec()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(cmds).To(HaveLen(28))
|
||||||
|
Expect(cmds[14].(*redis.StringCmd).Val()).To(Equal("A_value"))
|
||||||
|
Expect(cmds[15].(*redis.DurationCmd).Val()).To(BeNumerically("~", 1*time.Hour, time.Second))
|
||||||
|
Expect(cmds[20].(*redis.StringCmd).Val()).To(Equal("D_value"))
|
||||||
|
Expect(cmds[21].(*redis.DurationCmd).Val()).To(BeNumerically("~", 4*time.Hour, time.Second))
|
||||||
|
Expect(cmds[26].(*redis.StringCmd).Val()).To(Equal("G_value"))
|
||||||
|
Expect(cmds[27].(*redis.DurationCmd).Val()).To(BeNumerically("~", 7*time.Hour, time.Second))
|
||||||
|
})
|
||||||
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -117,51 +117,52 @@ var _ = Describe("Command", func() {
|
||||||
})
|
})
|
||||||
|
|
||||||
Describe("races", func() {
|
Describe("races", func() {
|
||||||
|
var C, N = 10, 1000
|
||||||
|
if testing.Short() {
|
||||||
|
N = 100
|
||||||
|
}
|
||||||
|
|
||||||
It("should echo", func() {
|
It("should echo", func() {
|
||||||
var n = 10000
|
|
||||||
if testing.Short() {
|
|
||||||
n = 1000
|
|
||||||
}
|
|
||||||
|
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
wg.Add(n)
|
for i := 0; i < C; i++ {
|
||||||
for i := 0; i < n; i++ {
|
wg.Add(1)
|
||||||
|
|
||||||
go func(i int) {
|
go func(i int) {
|
||||||
|
defer GinkgoRecover()
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
msg := "echo" + strconv.Itoa(i)
|
for j := 0; j < N; j++ {
|
||||||
echo := client.Echo(msg)
|
msg := "echo" + strconv.Itoa(i)
|
||||||
Expect(echo.Err()).NotTo(HaveOccurred())
|
echo := client.Echo(msg)
|
||||||
Expect(echo.Val()).To(Equal(msg))
|
Expect(echo.Err()).NotTo(HaveOccurred())
|
||||||
|
Expect(echo.Val()).To(Equal(msg))
|
||||||
|
}
|
||||||
}(i)
|
}(i)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should incr", func() {
|
It("should incr", func() {
|
||||||
var n = 10000
|
|
||||||
if testing.Short() {
|
|
||||||
n = 1000
|
|
||||||
}
|
|
||||||
|
|
||||||
key := "TestIncrFromGoroutines"
|
key := "TestIncrFromGoroutines"
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
wg.Add(n)
|
for i := 0; i < C; i++ {
|
||||||
for i := 0; i < n; i++ {
|
wg.Add(1)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer GinkgoRecover()
|
defer GinkgoRecover()
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
err := client.Incr(key).Err()
|
for j := 0; j < N; j++ {
|
||||||
Expect(err).NotTo(HaveOccurred())
|
err := client.Incr(key).Err()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
val, err := client.Get(key).Int64()
|
val, err := client.Get(key).Int64()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(val).To(Equal(int64(n)))
|
Expect(val).To(Equal(int64(C * N)))
|
||||||
})
|
})
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
|
@ -2245,53 +2245,49 @@ var _ = Describe("Commands", func() {
|
||||||
|
|
||||||
Describe("watch/unwatch", func() {
|
Describe("watch/unwatch", func() {
|
||||||
|
|
||||||
var safeIncr = func() ([]redis.Cmder, error) {
|
|
||||||
multi := client.Multi()
|
|
||||||
defer multi.Close()
|
|
||||||
|
|
||||||
watch := multi.Watch("key")
|
|
||||||
Expect(watch.Err()).NotTo(HaveOccurred())
|
|
||||||
Expect(watch.Val()).To(Equal("OK"))
|
|
||||||
|
|
||||||
get := multi.Get("key")
|
|
||||||
Expect(get.Err()).NotTo(HaveOccurred())
|
|
||||||
Expect(get.Val()).NotTo(Equal(redis.Nil))
|
|
||||||
|
|
||||||
v, err := strconv.ParseInt(get.Val(), 10, 64)
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
return multi.Exec(func() error {
|
|
||||||
multi.Set("key", strconv.FormatInt(v+1, 10), 0)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
It("should WatchUnwatch", func() {
|
It("should WatchUnwatch", func() {
|
||||||
var n = 10000
|
var C, N = 10, 1000
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
n = 1000
|
N = 100
|
||||||
}
|
}
|
||||||
|
|
||||||
err := client.Set("key", "0", 0).Err()
|
err := client.Set("key", "0", 0).Err()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < C; i++ {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer GinkgoRecover()
|
defer GinkgoRecover()
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
for {
|
multi := client.Multi()
|
||||||
cmds, err := safeIncr()
|
defer multi.Close()
|
||||||
|
|
||||||
|
for j := 0; j < N; j++ {
|
||||||
|
val, err := multi.Watch("key").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(val).To(Equal("OK"))
|
||||||
|
|
||||||
|
val, err = multi.Get("key").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(val).NotTo(Equal(redis.Nil))
|
||||||
|
|
||||||
|
num, err := strconv.ParseInt(val, 10, 64)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
cmds, err := multi.Exec(func() error {
|
||||||
|
multi.Set("key", strconv.FormatInt(num+1, 10), 0)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
if err == redis.TxFailedErr {
|
if err == redis.TxFailedErr {
|
||||||
|
j--
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(cmds).To(HaveLen(1))
|
Expect(cmds).To(HaveLen(1))
|
||||||
Expect(cmds[0].Err()).NotTo(HaveOccurred())
|
Expect(cmds[0].Err()).NotTo(HaveOccurred())
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -2299,7 +2295,7 @@ var _ = Describe("Commands", func() {
|
||||||
|
|
||||||
val, err := client.Get("key").Int64()
|
val, err := client.Get("key").Int64()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(val).To(Equal(int64(n)))
|
Expect(val).To(Equal(int64(C * N)))
|
||||||
})
|
})
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
23
error.go
23
error.go
|
@ -4,6 +4,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Redis nil reply.
|
// Redis nil reply.
|
||||||
|
@ -30,3 +31,25 @@ func isNetworkError(err error) bool {
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isMovedError(err error) (moved bool, ask bool, addr string) {
|
||||||
|
if _, ok := err.(redisError); !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
parts := strings.SplitN(err.Error(), " ", 3)
|
||||||
|
if len(parts) != 3 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
switch parts[0] {
|
||||||
|
case "MOVED":
|
||||||
|
moved = true
|
||||||
|
addr = parts[2]
|
||||||
|
case "ASK":
|
||||||
|
ask = true
|
||||||
|
addr = parts[2]
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
22
pipeline.go
22
pipeline.go
|
@ -54,14 +54,13 @@ func (c *Pipeline) Exec() ([]Cmder, error) {
|
||||||
if c.closed {
|
if c.closed {
|
||||||
return nil, errClosed
|
return nil, errClosed
|
||||||
}
|
}
|
||||||
|
if len(c.cmds) == 0 {
|
||||||
|
return []Cmder{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
cmds := c.cmds
|
cmds := c.cmds
|
||||||
c.cmds = make([]Cmder, 0, 0)
|
c.cmds = make([]Cmder, 0, 0)
|
||||||
|
|
||||||
if len(cmds) == 0 {
|
|
||||||
return []Cmder{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
cn, err := c.client.conn()
|
cn, err := c.client.conn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
setCmdsErr(cmds, err)
|
setCmdsErr(cmds, err)
|
||||||
|
@ -84,11 +83,16 @@ func (c *Pipeline) execCmds(cn *conn, cmds []Cmder) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
var firstCmdErr error
|
var firstCmdErr error
|
||||||
for _, cmd := range cmds {
|
for i, cmd := range cmds {
|
||||||
if err := cmd.parseReply(cn.rd); err != nil {
|
err := cmd.parseReply(cn.rd)
|
||||||
if firstCmdErr == nil {
|
if err == nil {
|
||||||
firstCmdErr = err
|
continue
|
||||||
}
|
}
|
||||||
|
if firstCmdErr == nil {
|
||||||
|
firstCmdErr = err
|
||||||
|
}
|
||||||
|
if isNetworkError(err) {
|
||||||
|
setCmdsErr(cmds[i:], err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -127,7 +127,7 @@ func TestGinkgoSuite(t *testing.T) {
|
||||||
|
|
||||||
func execCmd(name string, args ...string) (*os.Process, error) {
|
func execCmd(name string, args ...string) (*os.Process, error) {
|
||||||
cmd := exec.Command(name, args...)
|
cmd := exec.Command(name, args...)
|
||||||
if false {
|
if testing.Verbose() {
|
||||||
cmd.Stdout = os.Stdout
|
cmd.Stdout = os.Stdout
|
||||||
cmd.Stderr = os.Stderr
|
cmd.Stderr = os.Stderr
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue