diff --git a/cluster_commands.go b/cluster_commands.go index 336ea98..085bce8 100644 --- a/cluster_commands.go +++ b/cluster_commands.go @@ -8,55 +8,63 @@ import ( func (c *ClusterClient) DBSize(ctx context.Context) *IntCmd { cmd := NewIntCmd(ctx, "dbsize") - var size int64 - err := c.ForEachMaster(ctx, func(ctx context.Context, master *Client) error { - n, err := master.DBSize(ctx).Result() + _ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error { + var size int64 + err := c.ForEachMaster(ctx, func(ctx context.Context, master *Client) error { + n, err := master.DBSize(ctx).Result() + if err != nil { + return err + } + atomic.AddInt64(&size, n) + return nil + }) if err != nil { - return err + cmd.SetErr(err) + } else { + cmd.val = size } - atomic.AddInt64(&size, n) return nil }) - if err != nil { - cmd.SetErr(err) - return cmd - } - cmd.val = size return cmd } func (c *ClusterClient) ScriptLoad(ctx context.Context, script string) *StringCmd { cmd := NewStringCmd(ctx, "script", "load", script) - mu := &sync.Mutex{} - err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error { - val, err := shard.ScriptLoad(ctx, script).Result() + _ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error { + mu := &sync.Mutex{} + err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error { + val, err := shard.ScriptLoad(ctx, script).Result() + if err != nil { + return err + } + + mu.Lock() + if cmd.Val() == "" { + cmd.val = val + } + mu.Unlock() + + return nil + }) if err != nil { - return err + cmd.SetErr(err) } - - mu.Lock() - if cmd.Val() == "" { - cmd.val = val - } - mu.Unlock() - return nil }) - if err != nil { - cmd.SetErr(err) - } - return cmd } func (c *ClusterClient) ScriptFlush(ctx context.Context) *StatusCmd { cmd := NewStatusCmd(ctx, "script", "flush") - _ = c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error { - shard.ScriptFlush(ctx) - + _ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error { + err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error { + return shard.ScriptFlush(ctx).Err() + }) + if err != nil { + cmd.SetErr(err) + } return nil }) - return cmd } @@ -74,26 +82,28 @@ func (c *ClusterClient) ScriptExists(ctx context.Context, hashes ...string) *Boo result[i] = true } - mu := &sync.Mutex{} - err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error { - val, err := shard.ScriptExists(ctx, hashes...).Result() + _ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error { + mu := &sync.Mutex{} + err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error { + val, err := shard.ScriptExists(ctx, hashes...).Result() + if err != nil { + return err + } + + mu.Lock() + for i, v := range val { + result[i] = result[i] && v + } + mu.Unlock() + + return nil + }) if err != nil { - return err + cmd.SetErr(err) + } else { + cmd.val = result } - - mu.Lock() - for i, v := range val { - result[i] = result[i] && v - } - mu.Unlock() - return nil }) - if err != nil { - cmd.SetErr(err) - } - - cmd.val = result - return cmd }