forked from mirror/redis
Merge pull request #939 from pascaldekloe/master
FIX: WATCH example stack protection (with retry count)
This commit is contained in:
commit
c7d2ae66a0
|
@ -1,8 +1,8 @@
|
||||||
package redis_test
|
package redis_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -278,45 +278,55 @@ func ExampleClient_TxPipeline() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func ExampleClient_Watch() {
|
func ExampleClient_Watch() {
|
||||||
var incr func(string) error
|
const routineCount = 100
|
||||||
|
|
||||||
// Transactionally increments key using GET and SET commands.
|
// Transactionally increments key using GET and SET commands.
|
||||||
incr = func(key string) error {
|
increment := func(key string) error {
|
||||||
err := redisdb.Watch(func(tx *redis.Tx) error {
|
txf := func(tx *redis.Tx) error {
|
||||||
n, err := tx.Get(key).Int64()
|
// get current value or zero
|
||||||
|
n, err := tx.Get(key).Int()
|
||||||
if err != nil && err != redis.Nil {
|
if err != nil && err != redis.Nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// actual opperation (local in optimistic lock)
|
||||||
|
n++
|
||||||
|
|
||||||
|
// runs only if the watched keys remain unchanged
|
||||||
_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
|
_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
|
||||||
pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
|
// pipe handles the error case
|
||||||
|
pipe.Set(key, n, 0)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
return err
|
return err
|
||||||
}, key)
|
|
||||||
if err == redis.TxFailedErr {
|
|
||||||
return incr(key)
|
|
||||||
}
|
}
|
||||||
return err
|
|
||||||
|
for retries := routineCount; retries > 0; retries-- {
|
||||||
|
err := redisdb.Watch(txf, key)
|
||||||
|
if err != redis.TxFailedErr {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// optimistic lock lost
|
||||||
|
}
|
||||||
|
return errors.New("increment reached maximum number of retries")
|
||||||
}
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for i := 0; i < 100; i++ {
|
wg.Add(routineCount)
|
||||||
wg.Add(1)
|
for i := 0; i < routineCount; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
err := incr("counter3")
|
if err := increment("counter3"); err != nil {
|
||||||
if err != nil {
|
fmt.Println("increment error:", err)
|
||||||
panic(err)
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
n, err := redisdb.Get("counter3").Int64()
|
n, err := redisdb.Get("counter3").Int()
|
||||||
fmt.Println(n, err)
|
fmt.Println("ended with", n, err)
|
||||||
// Output: 100 <nil>
|
// Output: ended with 100 <nil>
|
||||||
}
|
}
|
||||||
|
|
||||||
func ExamplePubSub() {
|
func ExamplePubSub() {
|
||||||
|
|
Loading…
Reference in New Issue