diff --git a/example_test.go b/example_test.go index 291e5b4..97b9242 100644 --- a/example_test.go +++ b/example_test.go @@ -1,8 +1,8 @@ package redis_test import ( + "errors" "fmt" - "strconv" "sync" "time" @@ -278,45 +278,55 @@ func ExampleClient_TxPipeline() { } func ExampleClient_Watch() { - var incr func(string) error + const routineCount = 100 // Transactionally increments key using GET and SET commands. - incr = func(key string) error { - err := redisdb.Watch(func(tx *redis.Tx) error { - n, err := tx.Get(key).Int64() + increment := func(key string) error { + txf := func(tx *redis.Tx) error { + // get current value or zero + n, err := tx.Get(key).Int() if err != nil && err != redis.Nil { return err } + // actual opperation (local in optimistic lock) + n++ + + // runs only if the watched keys remain unchanged _, err = tx.Pipelined(func(pipe redis.Pipeliner) error { - pipe.Set(key, strconv.FormatInt(n+1, 10), 0) + // pipe handles the error case + pipe.Set(key, n, 0) return nil }) return err - }, key) - if err == redis.TxFailedErr { - return incr(key) } - return err + + for retries := routineCount; retries > 0; retries-- { + err := redisdb.Watch(txf, key) + if err != redis.TxFailedErr { + return err + } + // optimistic lock lost + } + return errors.New("increment reached maximum number of retries") } var wg sync.WaitGroup - for i := 0; i < 100; i++ { - wg.Add(1) + wg.Add(routineCount) + for i := 0; i < routineCount; i++ { go func() { defer wg.Done() - err := incr("counter3") - if err != nil { - panic(err) + if err := increment("counter3"); err != nil { + fmt.Println("increment error:", err) } }() } wg.Wait() - n, err := redisdb.Get("counter3").Int64() - fmt.Println(n, err) - // Output: 100 + n, err := redisdb.Get("counter3").Int() + fmt.Println("ended with", n, err) + // Output: ended with 100 } func ExamplePubSub() {