diff --git a/CHANGELOG.md b/CHANGELOG.md index 26609ced..c8b904ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ - Added Options.MinIdleConns. - Added Options.MaxConnAge. - PoolStats.FreeConns is renamed to PoolStats.IdleConns. +- Add Client.Do to simplify creating custom commands. ## v6.13 diff --git a/cluster.go b/cluster.go index 863e69a2..e68cb7ee 100644 --- a/cluster.go +++ b/cluster.go @@ -898,6 +898,13 @@ func (c *ClusterClient) Close() error { return c.nodes.Close() } +// Do creates a Cmd from the args and processes the cmd. +func (c *ClusterClient) Do(args ...interface{}) *Cmd { + cmd := NewCmd(args...) + c.Process(cmd) + return cmd +} + func (c *ClusterClient) WrapProcess( fn func(oldProcess func(Cmder) error) func(Cmder) error, ) { @@ -1242,7 +1249,7 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { cmdsMap = failedCmds } - return firstCmdsErr(cmds) + return cmdsFirstErr(cmds) } func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) { @@ -1424,7 +1431,7 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error { } } - return firstCmdsErr(cmds) + return cmdsFirstErr(cmds) } func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { diff --git a/command.go b/command.go index 8928b8a1..14494cd9 100644 --- a/command.go +++ b/command.go @@ -25,7 +25,6 @@ type Cmder interface { readTimeout() *time.Duration Err() error - fmt.Stringer } func setCmdsErr(cmds []Cmder, e error) { @@ -36,7 +35,7 @@ func setCmdsErr(cmds []Cmder, e error) { } } -func firstCmdsErr(cmds []Cmder) error { +func cmdsFirstErr(cmds []Cmder) error { for _, cmd := range cmds { if err := cmd.Err(); err != nil { return err @@ -167,8 +166,77 @@ func (cmd *Cmd) Result() (interface{}, error) { return cmd.val, cmd.err } -func (cmd *Cmd) String() string { - return cmdString(cmd, cmd.val) +func (cmd *Cmd) String() (string, error) { + if cmd.err != nil { + return "", cmd.err + } + switch val := cmd.val.(type) { + case string: + return val, nil + default: + err := fmt.Errorf("redis: unexpected type=%T for String", val) + return "", err + } +} + +func (cmd *Cmd) Int64() (int64, error) { + if cmd.err != nil { + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return val, nil + case string: + return strconv.ParseInt(val, 10, 64) + default: + err := fmt.Errorf("redis: unexpected type=%T for Int64", val) + return 0, err + } +} + +func (cmd *Cmd) Uint64() (uint64, error) { + if cmd.err != nil { + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return uint64(val), nil + case string: + return strconv.ParseUint(val, 10, 64) + default: + err := fmt.Errorf("redis: unexpected type=%T for Uint64", val) + return 0, err + } +} + +func (cmd *Cmd) Float64() (float64, error) { + if cmd.err != nil { + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return float64(val), nil + case string: + return strconv.ParseFloat(val, 64) + default: + err := fmt.Errorf("redis: unexpected type=%T for Float64", val) + return 0, err + } +} + +func (cmd *Cmd) Bool() (bool, error) { + if cmd.err != nil { + return false, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return val != 0, nil + case string: + return strconv.ParseBool(val) + default: + err := fmt.Errorf("redis: unexpected type=%T for Bool", val) + return false, err + } } func (cmd *Cmd) readReply(cn *pool.Conn) error { diff --git a/example_instrumentation_test.go b/example_instrumentation_test.go index 82f655f2..f9444a6e 100644 --- a/example_instrumentation_test.go +++ b/example_instrumentation_test.go @@ -7,10 +7,10 @@ import ( ) func Example_instrumentation() { - cl := redis.NewClient(&redis.Options{ + redisdb := redis.NewClient(&redis.Options{ Addr: ":6379", }) - cl.WrapProcess(func(old func(cmd redis.Cmder) error) func(cmd redis.Cmder) error { + redisdb.WrapProcess(func(old func(cmd redis.Cmder) error) func(cmd redis.Cmder) error { return func(cmd redis.Cmder) error { fmt.Printf("starting processing: <%s>\n", cmd) err := old(cmd) @@ -19,17 +19,17 @@ func Example_instrumentation() { } }) - cl.Ping() + redisdb.Ping() // Output: starting processing: // finished processing: } func ExamplePipeline_instrumentation() { - client := redis.NewClient(&redis.Options{ + redisdb := redis.NewClient(&redis.Options{ Addr: ":6379", }) - client.WrapProcessPipeline(func(old func([]redis.Cmder) error) func([]redis.Cmder) error { + redisdb.WrapProcessPipeline(func(old func([]redis.Cmder) error) func([]redis.Cmder) error { return func(cmds []redis.Cmder) error { fmt.Printf("pipeline starting processing: %v\n", cmds) err := old(cmds) @@ -38,7 +38,7 @@ func ExamplePipeline_instrumentation() { } }) - client.Pipelined(func(pipe redis.Pipeliner) error { + redisdb.Pipelined(func(pipe redis.Pipeliner) error { pipe.Ping() pipe.Ping() return nil diff --git a/example_test.go b/example_test.go index 7d8cc52e..2a8a22b4 100644 --- a/example_test.go +++ b/example_test.go @@ -9,10 +9,10 @@ import ( "github.com/go-redis/redis" ) -var client *redis.Client +var redisdb *redis.Client func init() { - client = redis.NewClient(&redis.Options{ + redisdb = redis.NewClient(&redis.Options{ Addr: ":6379", DialTimeout: 10 * time.Second, ReadTimeout: 30 * time.Second, @@ -20,17 +20,17 @@ func init() { PoolSize: 10, PoolTimeout: 30 * time.Second, }) - client.FlushDB() + redisdb.FlushDB() } func ExampleNewClient() { - client := redis.NewClient(&redis.Options{ + redisdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", // no password set DB: 0, // use default DB }) - pong, err := client.Ping().Result() + pong, err := redisdb.Ping().Result() fmt.Println(pong, err) // Output: PONG } @@ -55,20 +55,20 @@ func ExampleParseURL() { func ExampleNewFailoverClient() { // See http://redis.io/topics/sentinel for instructions how to // setup Redis Sentinel. - client := redis.NewFailoverClient(&redis.FailoverOptions{ + redisdb := redis.NewFailoverClient(&redis.FailoverOptions{ MasterName: "master", SentinelAddrs: []string{":26379"}, }) - client.Ping() + redisdb.Ping() } func ExampleNewClusterClient() { // See http://redis.io/topics/cluster-tutorial for instructions // how to setup Redis Cluster. - client := redis.NewClusterClient(&redis.ClusterOptions{ + redisdb := redis.NewClusterClient(&redis.ClusterOptions{ Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}, }) - client.Ping() + redisdb.Ping() } // Following example creates a cluster from 2 master nodes and 2 slave nodes @@ -103,44 +103,44 @@ func ExampleNewClusterClient_manualSetup() { return slots, nil } - client := redis.NewClusterClient(&redis.ClusterOptions{ + redisdb := redis.NewClusterClient(&redis.ClusterOptions{ ClusterSlots: clusterSlots, RouteRandomly: true, }) - client.Ping() + redisdb.Ping() // ReloadState reloads cluster state. It calls ClusterSlots func // to get cluster slots information. - err := client.ReloadState() + err := redisdb.ReloadState() if err != nil { panic(err) } } func ExampleNewRing() { - client := redis.NewRing(&redis.RingOptions{ + redisdb := redis.NewRing(&redis.RingOptions{ Addrs: map[string]string{ "shard1": ":7000", "shard2": ":7001", "shard3": ":7002", }, }) - client.Ping() + redisdb.Ping() } func ExampleClient() { - err := client.Set("key", "value", 0).Err() + err := redisdb.Set("key", "value", 0).Err() if err != nil { panic(err) } - val, err := client.Get("key").Result() + val, err := redisdb.Get("key").Result() if err != nil { panic(err) } fmt.Println("key", val) - val2, err := client.Get("missing_key").Result() + val2, err := redisdb.Get("missing_key").Result() if err == redis.Nil { fmt.Println("missing_key does not exist") } else if err != nil { @@ -155,20 +155,20 @@ func ExampleClient() { func ExampleClient_Set() { // Last argument is expiration. Zero means the key has no // expiration time. - err := client.Set("key", "value", 0).Err() + err := redisdb.Set("key", "value", 0).Err() if err != nil { panic(err) } // key2 will expire in an hour. - err = client.Set("key2", "value", time.Hour).Err() + err = redisdb.Set("key2", "value", time.Hour).Err() if err != nil { panic(err) } } func ExampleClient_Incr() { - result, err := client.Incr("counter").Result() + result, err := redisdb.Incr("counter").Result() if err != nil { panic(err) } @@ -178,12 +178,12 @@ func ExampleClient_Incr() { } func ExampleClient_BLPop() { - if err := client.RPush("queue", "message").Err(); err != nil { + if err := redisdb.RPush("queue", "message").Err(); err != nil { panic(err) } - // use `client.BLPop(0, "queue")` for infinite waiting time - result, err := client.BLPop(1*time.Second, "queue").Result() + // use `redisdb.BLPop(0, "queue")` for infinite waiting time + result, err := redisdb.BLPop(1*time.Second, "queue").Result() if err != nil { panic(err) } @@ -193,9 +193,9 @@ func ExampleClient_BLPop() { } func ExampleClient_Scan() { - client.FlushDB() + redisdb.FlushDB() for i := 0; i < 33; i++ { - err := client.Set(fmt.Sprintf("key%d", i), "value", 0).Err() + err := redisdb.Set(fmt.Sprintf("key%d", i), "value", 0).Err() if err != nil { panic(err) } @@ -206,7 +206,7 @@ func ExampleClient_Scan() { for { var keys []string var err error - keys, cursor, err = client.Scan(cursor, "", 10).Result() + keys, cursor, err = redisdb.Scan(cursor, "", 10).Result() if err != nil { panic(err) } @@ -222,7 +222,7 @@ func ExampleClient_Scan() { func ExampleClient_Pipelined() { var incr *redis.IntCmd - _, err := client.Pipelined(func(pipe redis.Pipeliner) error { + _, err := redisdb.Pipelined(func(pipe redis.Pipeliner) error { incr = pipe.Incr("pipelined_counter") pipe.Expire("pipelined_counter", time.Hour) return nil @@ -232,7 +232,7 @@ func ExampleClient_Pipelined() { } func ExampleClient_Pipeline() { - pipe := client.Pipeline() + pipe := redisdb.Pipeline() incr := pipe.Incr("pipeline_counter") pipe.Expire("pipeline_counter", time.Hour) @@ -242,7 +242,7 @@ func ExampleClient_Pipeline() { // INCR pipeline_counter // EXPIRE pipeline_counts 3600 // - // using one client-server roundtrip. + // using one redisdb-server roundtrip. _, err := pipe.Exec() fmt.Println(incr.Val(), err) // Output: 1 @@ -250,7 +250,7 @@ func ExampleClient_Pipeline() { func ExampleClient_TxPipelined() { var incr *redis.IntCmd - _, err := client.TxPipelined(func(pipe redis.Pipeliner) error { + _, err := redisdb.TxPipelined(func(pipe redis.Pipeliner) error { incr = pipe.Incr("tx_pipelined_counter") pipe.Expire("tx_pipelined_counter", time.Hour) return nil @@ -260,7 +260,7 @@ func ExampleClient_TxPipelined() { } func ExampleClient_TxPipeline() { - pipe := client.TxPipeline() + pipe := redisdb.TxPipeline() incr := pipe.Incr("tx_pipeline_counter") pipe.Expire("tx_pipeline_counter", time.Hour) @@ -272,7 +272,7 @@ func ExampleClient_TxPipeline() { // EXPIRE pipeline_counts 3600 // EXEC // - // using one client-server roundtrip. + // using one redisdb-server roundtrip. _, err := pipe.Exec() fmt.Println(incr.Val(), err) // Output: 1 @@ -283,7 +283,7 @@ func ExampleClient_Watch() { // Transactionally increments key using GET and SET commands. incr = func(key string) error { - err := client.Watch(func(tx *redis.Tx) error { + err := redisdb.Watch(func(tx *redis.Tx) error { n, err := tx.Get(key).Int64() if err != nil && err != redis.Nil { return err @@ -315,13 +315,13 @@ func ExampleClient_Watch() { } wg.Wait() - n, err := client.Get("counter3").Int64() + n, err := redisdb.Get("counter3").Int64() fmt.Println(n, err) // Output: 100 } func ExamplePubSub() { - pubsub := client.Subscribe("mychannel1") + pubsub := redisdb.Subscribe("mychannel1") // Wait for confirmation that subscription is created before publishing anything. _, err := pubsub.Receive() @@ -333,7 +333,7 @@ func ExamplePubSub() { ch := pubsub.Channel() // Publish a message. - err = client.Publish("mychannel1", "hello").Err() + err = redisdb.Publish("mychannel1", "hello").Err() if err != nil { panic(err) } @@ -356,7 +356,7 @@ func ExamplePubSub() { } func ExamplePubSub_Receive() { - pubsub := client.Subscribe("mychannel2") + pubsub := redisdb.Subscribe("mychannel2") defer pubsub.Close() for i := 0; i < 2; i++ { @@ -370,7 +370,7 @@ func ExamplePubSub_Receive() { case *redis.Subscription: fmt.Println("subscribed to", msg.Channel) - _, err := client.Publish("mychannel2", "hello").Result() + _, err := redisdb.Publish("mychannel2", "hello").Result() if err != nil { panic(err) } @@ -381,7 +381,7 @@ func ExamplePubSub_Receive() { } } - // sent message to 1 client + // sent message to 1 redisdb // received hello from mychannel2 } @@ -393,15 +393,15 @@ func ExampleScript() { return false `) - n, err := IncrByXX.Run(client, []string{"xx_counter"}, 2).Result() + n, err := IncrByXX.Run(redisdb, []string{"xx_counter"}, 2).Result() fmt.Println(n, err) - err = client.Set("xx_counter", "40", 0).Err() + err = redisdb.Set("xx_counter", "40", 0).Err() if err != nil { panic(err) } - n, err = IncrByXX.Run(client, []string{"xx_counter"}, 2).Result() + n, err = IncrByXX.Run(redisdb, []string{"xx_counter"}, 2).Result() fmt.Println(n, err) // Output: redis: nil @@ -409,19 +409,25 @@ func ExampleScript() { } func Example_customCommand() { - Get := func(client *redis.Client, key string) *redis.StringCmd { + Get := func(redisdb *redis.Client, key string) *redis.StringCmd { cmd := redis.NewStringCmd("get", key) - client.Process(cmd) + redisdb.Process(cmd) return cmd } - v, err := Get(client, "key_does_not_exist").Result() + v, err := Get(redisdb, "key_does_not_exist").Result() + fmt.Printf("%q %s", v, err) + // Output: "" redis: nil +} + +func Example_customCommand2() { + v, err := redisdb.Do("get", "key_does_not_exist").String() fmt.Printf("%q %s", v, err) // Output: "" redis: nil } func ExampleScanIterator() { - iter := client.Scan(0, "", 0).Iterator() + iter := redisdb.Scan(0, "", 0).Iterator() for iter.Next() { fmt.Println(iter.Val()) } @@ -431,7 +437,7 @@ func ExampleScanIterator() { } func ExampleScanCmd_Iterator() { - iter := client.Scan(0, "", 0).Iterator() + iter := redisdb.Scan(0, "", 0).Iterator() for iter.Next() { fmt.Println(iter.Val()) } @@ -441,29 +447,29 @@ func ExampleScanCmd_Iterator() { } func ExampleNewUniversalClient_simple() { - client := redis.NewUniversalClient(&redis.UniversalOptions{ + redisdb := redis.NewUniversalClient(&redis.UniversalOptions{ Addrs: []string{":6379"}, }) - defer client.Close() + defer redisdb.Close() - client.Ping() + redisdb.Ping() } func ExampleNewUniversalClient_failover() { - client := redis.NewUniversalClient(&redis.UniversalOptions{ + redisdb := redis.NewUniversalClient(&redis.UniversalOptions{ MasterName: "master", Addrs: []string{":26379"}, }) - defer client.Close() + defer redisdb.Close() - client.Ping() + redisdb.Ping() } func ExampleNewUniversalClient_cluster() { - client := redis.NewUniversalClient(&redis.UniversalOptions{ + redisdb := redis.NewUniversalClient(&redis.UniversalOptions{ Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}, }) - defer client.Close() + defer redisdb.Close() - client.Ping() + redisdb.Ping() } diff --git a/redis.go b/redis.go index 4747ff97..2913938e 100644 --- a/redis.go +++ b/redis.go @@ -123,8 +123,17 @@ func (c *baseClient) initConn(cn *pool.Conn) error { return nil } +// Do creates a Cmd from the args and processes the cmd. +func (c *baseClient) Do(args ...interface{}) *Cmd { + cmd := NewCmd(args...) + c.Process(cmd) + return cmd +} + // WrapProcess wraps function that processes Redis commands. -func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { +func (c *baseClient) WrapProcess( + fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error, +) { c.process = fn(c.process) } @@ -243,7 +252,7 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e break } } - return firstCmdsErr(cmds) + return cmdsFirstErr(cmds) } func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { diff --git a/ring.go b/ring.go index 74862026..3ded2806 100644 --- a/ring.go +++ b/ring.go @@ -516,7 +516,16 @@ func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) { return c.shards.GetByKey(firstKey) } -func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { +// Do creates a Cmd from the args and processes the cmd. +func (c *Ring) Do(args ...interface{}) *Cmd { + cmd := NewCmd(args...) + c.Process(cmd) + return cmd +} + +func (c *Ring) WrapProcess( + fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error, +) { c.ForEachShard(func(c *Client) error { c.WrapProcess(fn) return nil @@ -602,7 +611,7 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { cmdsMap = failedCmdsMap } - return firstCmdsErr(cmds) + return cmdsFirstErr(cmds) } func (c *Ring) TxPipeline() Pipeliner { diff --git a/sentinel.go b/sentinel.go index 12c29a71..ef0e4701 100644 --- a/sentinel.go +++ b/sentinel.go @@ -92,7 +92,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client { }, } c.baseClient.init() - c.setProcessor(c.Process) + c.cmdable.setProcessor(c.Process) return &c }