Fix BRPop infinite blocking (#316)

This commit is contained in:
Muhamad Azmy 2017-08-21 05:14:17 +02:00 committed by siddontang
parent f49ad4d5ed
commit 57a07edd1a
6 changed files with 191 additions and 208 deletions

View File

@ -36,103 +36,6 @@
// Contexts. // Contexts.
package context // import "golang.org/x/net/context" package context // import "golang.org/x/net/context"
import "time"
// A Context carries a deadline, a cancelation signal, and other values across
// API boundaries.
//
// Context's methods may be called by multiple goroutines simultaneously.
type Context interface {
// Deadline returns the time when work done on behalf of this context
// should be canceled. Deadline returns ok==false when no deadline is
// set. Successive calls to Deadline return the same results.
Deadline() (deadline time.Time, ok bool)
// Done returns a channel that's closed when work done on behalf of this
// context should be canceled. Done may return nil if this context can
// never be canceled. Successive calls to Done return the same value.
//
// WithCancel arranges for Done to be closed when cancel is called;
// WithDeadline arranges for Done to be closed when the deadline
// expires; WithTimeout arranges for Done to be closed when the timeout
// elapses.
//
// Done is provided for use in select statements:
//
// // Stream generates values with DoSomething and sends them to out
// // until DoSomething returns an error or ctx.Done is closed.
// func Stream(ctx context.Context, out chan<- Value) error {
// for {
// v, err := DoSomething(ctx)
// if err != nil {
// return err
// }
// select {
// case <-ctx.Done():
// return ctx.Err()
// case out <- v:
// }
// }
// }
//
// See http://blog.golang.org/pipelines for more examples of how to use
// a Done channel for cancelation.
Done() <-chan struct{}
// Err returns a non-nil error value after Done is closed. Err returns
// Canceled if the context was canceled or DeadlineExceeded if the
// context's deadline passed. No other values for Err are defined.
// After Done is closed, successive calls to Err return the same value.
Err() error
// Value returns the value associated with this context for key, or nil
// if no value is associated with key. Successive calls to Value with
// the same key returns the same result.
//
// Use context values only for request-scoped data that transits
// processes and API boundaries, not for passing optional parameters to
// functions.
//
// A key identifies a specific value in a Context. Functions that wish
// to store values in Context typically allocate a key in a global
// variable then use that key as the argument to context.WithValue and
// Context.Value. A key can be any type that supports equality;
// packages should define keys as an unexported type to avoid
// collisions.
//
// Packages that define a Context key should provide type-safe accessors
// for the values stores using that key:
//
// // Package user defines a User type that's stored in Contexts.
// package user
//
// import "golang.org/x/net/context"
//
// // User is the type of value stored in the Contexts.
// type User struct {...}
//
// // key is an unexported type for keys defined in this package.
// // This prevents collisions with keys defined in other packages.
// type key int
//
// // userKey is the key for user.User values in Contexts. It is
// // unexported; clients use user.NewContext and user.FromContext
// // instead of using this key directly.
// var userKey key = 0
//
// // NewContext returns a new Context that carries value u.
// func NewContext(ctx context.Context, u *User) context.Context {
// return context.WithValue(ctx, userKey, u)
// }
//
// // FromContext returns the User value stored in ctx, if any.
// func FromContext(ctx context.Context) (*User, bool) {
// u, ok := ctx.Value(userKey).(*User)
// return u, ok
// }
Value(key interface{}) interface{}
}
// Background returns a non-nil, empty Context. It is never canceled, has no // Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function, // values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming // initialization, and tests, and as the top-level Context for incoming
@ -149,8 +52,3 @@ func Background() Context {
func TODO() Context { func TODO() Context {
return todo return todo
} }
// A CancelFunc tells an operation to abandon its work.
// A CancelFunc does not wait for the work to stop.
// After the first call, subsequent calls to a CancelFunc do nothing.
type CancelFunc func()

20
_vendor/vendor/golang.org/x/net/context/go19.go generated vendored Normal file
View File

@ -0,0 +1,20 @@
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build go1.9
package context
import "context" // standard library's context, as of Go 1.7
// A Context carries a deadline, a cancelation signal, and other values across
// API boundaries.
//
// Context's methods may be called by multiple goroutines simultaneously.
type Context = context.Context
// A CancelFunc tells an operation to abandon its work.
// A CancelFunc does not wait for the work to stop.
// After the first call, subsequent calls to a CancelFunc do nothing.
type CancelFunc = context.CancelFunc

109
_vendor/vendor/golang.org/x/net/context/pre_go19.go generated vendored Normal file
View File

@ -0,0 +1,109 @@
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !go1.9
package context
import "time"
// A Context carries a deadline, a cancelation signal, and other values across
// API boundaries.
//
// Context's methods may be called by multiple goroutines simultaneously.
type Context interface {
// Deadline returns the time when work done on behalf of this context
// should be canceled. Deadline returns ok==false when no deadline is
// set. Successive calls to Deadline return the same results.
Deadline() (deadline time.Time, ok bool)
// Done returns a channel that's closed when work done on behalf of this
// context should be canceled. Done may return nil if this context can
// never be canceled. Successive calls to Done return the same value.
//
// WithCancel arranges for Done to be closed when cancel is called;
// WithDeadline arranges for Done to be closed when the deadline
// expires; WithTimeout arranges for Done to be closed when the timeout
// elapses.
//
// Done is provided for use in select statements:
//
// // Stream generates values with DoSomething and sends them to out
// // until DoSomething returns an error or ctx.Done is closed.
// func Stream(ctx context.Context, out chan<- Value) error {
// for {
// v, err := DoSomething(ctx)
// if err != nil {
// return err
// }
// select {
// case <-ctx.Done():
// return ctx.Err()
// case out <- v:
// }
// }
// }
//
// See http://blog.golang.org/pipelines for more examples of how to use
// a Done channel for cancelation.
Done() <-chan struct{}
// Err returns a non-nil error value after Done is closed. Err returns
// Canceled if the context was canceled or DeadlineExceeded if the
// context's deadline passed. No other values for Err are defined.
// After Done is closed, successive calls to Err return the same value.
Err() error
// Value returns the value associated with this context for key, or nil
// if no value is associated with key. Successive calls to Value with
// the same key returns the same result.
//
// Use context values only for request-scoped data that transits
// processes and API boundaries, not for passing optional parameters to
// functions.
//
// A key identifies a specific value in a Context. Functions that wish
// to store values in Context typically allocate a key in a global
// variable then use that key as the argument to context.WithValue and
// Context.Value. A key can be any type that supports equality;
// packages should define keys as an unexported type to avoid
// collisions.
//
// Packages that define a Context key should provide type-safe accessors
// for the values stores using that key:
//
// // Package user defines a User type that's stored in Contexts.
// package user
//
// import "golang.org/x/net/context"
//
// // User is the type of value stored in the Contexts.
// type User struct {...}
//
// // key is an unexported type for keys defined in this package.
// // This prevents collisions with keys defined in other packages.
// type key int
//
// // userKey is the key for user.User values in Contexts. It is
// // unexported; clients use user.NewContext and user.FromContext
// // instead of using this key directly.
// var userKey key = 0
//
// // NewContext returns a new Context that carries value u.
// func NewContext(ctx context.Context, u *User) context.Context {
// return context.WithValue(ctx, userKey, u)
// }
//
// // FromContext returns the User value stored in ctx, if any.
// func FromContext(ctx context.Context) (*User, bool) {
// u, ok := ctx.Value(userKey).(*User)
// return u, ok
// }
Value(key interface{}) interface{}
}
// A CancelFunc tells an operation to abandon its work.
// A CancelFunc does not wait for the work to stop.
// After the first call, subsequent calls to a CancelFunc do nothing.
type CancelFunc func()

4
glide.lock generated
View File

@ -1,5 +1,5 @@
hash: 83229e7ecea008add55b2547fb28a23d247a4740fc30b4659f58bf5d495e3584 hash: 83229e7ecea008add55b2547fb28a23d247a4740fc30b4659f58bf5d495e3584
updated: 2017-06-08T13:54:52.259026852+02:00 updated: 2017-08-16T11:06:05.978126639+02:00
imports: imports:
- name: github.com/cupcake/rdb - name: github.com/cupcake/rdb
version: 43ba34106c765f2111c0dc7b74cdf8ee437411e0 version: 43ba34106c765f2111c0dc7b74cdf8ee437411e0
@ -59,7 +59,7 @@ imports:
- parse - parse
- pm - pm
- name: golang.org/x/net - name: golang.org/x/net
version: 59a0b19b5533c7977ddeb86b017bf507ed407b12 version: 1c05540f6879653db88113bc4a2b70aec4bd491f
subpackages: subpackages:
- context - context
testImports: [] testImports: []

View File

@ -11,6 +11,7 @@ import (
"github.com/siddontang/go/log" "github.com/siddontang/go/log"
"github.com/siddontang/go/num" "github.com/siddontang/go/num"
"github.com/siddontang/ledisdb/store" "github.com/siddontang/ledisdb/store"
"golang.org/x/net/context"
) )
const ( const (
@ -159,7 +160,7 @@ func (db *DB) lpush(key []byte, whereSeq int32, args ...[]byte) (int64, error) {
err = t.Commit() err = t.Commit()
if err == nil { if err == nil {
db.lSignalAsReady(key, pushCnt) db.lSignalAsReady(key)
} }
return int64(size) + int64(pushCnt), err return int64(size) + int64(pushCnt), err
@ -692,72 +693,43 @@ func (db *DB) LKeyExists(key []byte) (int64, error) {
} }
func (db *DB) lblockPop(keys [][]byte, whereSeq int32, timeout time.Duration) ([]interface{}, error) { func (db *DB) lblockPop(keys [][]byte, whereSeq int32, timeout time.Duration) ([]interface{}, error) {
ch := make(chan []byte) for {
var ctx context.Context
var cancel context.CancelFunc
if timeout > 0 {
ctx, cancel = context.WithTimeout(context.Background(), timeout)
} else {
ctx, cancel = context.WithCancel(context.Background())
}
bkeys := [][]byte{}
for _, key := range keys { for _, key := range keys {
v, err := db.lpop(key, whereSeq) v, err := db.lbkeys.popOrWait(db, key, whereSeq, cancel)
if err != nil { if err != nil {
cancel()
return nil, err return nil, err
} else if v != nil { } else if v != nil {
return []interface{}{key, v}, nil cancel()
} else {
db.lbkeys.wait(key, ch)
bkeys = append(bkeys, key)
}
}
if len(bkeys) == 0 {
return nil, nil
}
defer func() {
for _, key := range bkeys {
db.lbkeys.unwait(key, ch)
}
}()
deadT := time.Now().Add(timeout)
for {
if timeout == 0 {
key := <-ch
if v, err := db.lpop(key, whereSeq); err != nil {
return nil, err
} else if v == nil {
continue
} else {
return []interface{}{key, v}, nil return []interface{}{key, v}, nil
} }
} else {
d := deadT.Sub(time.Now())
if d < 0 {
return nil, nil
} }
select { //blocking wait
case key := <-ch: <-ctx.Done()
if v, err := db.lpop(key, whereSeq); err != nil { cancel()
return nil, err
} else if v == nil { //if ctx.Err() is a deadline exceeded (timeout) we return
db.lbkeys.wait(key, ch) //otherwise we try to pop one of the keys again.
continue if ctx.Err() == context.DeadlineExceeded {
} else {
return []interface{}{key, v}, nil
}
case <-time.After(d):
return nil, nil return nil, nil
} }
} }
}
} }
func (db *DB) lSignalAsReady(key []byte, num int) { func (db *DB) lSignalAsReady(key []byte) {
db.lbkeys.signal(key, num) db.lbkeys.signal(key)
} }
type lbKeyCh chan<- []byte
type lBlockKeys struct { type lBlockKeys struct {
sync.Mutex sync.Mutex
@ -771,40 +743,32 @@ func newLBlockKeys() *lBlockKeys {
return l return l
} }
func (l *lBlockKeys) signal(key []byte, num int) { func (l *lBlockKeys) signal(key []byte) {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
s := hack.String(key) s := hack.String(key)
chs, ok := l.keys[s] fns, ok := l.keys[s]
if !ok { if !ok {
return return
} }
for e := fns.Front(); e != nil; e = e.Next() {
var n *list.Element fn := e.Value.(context.CancelFunc)
fn()
i := 0
for e := chs.Front(); e != nil && i < num; e = n {
ch := e.Value.(lbKeyCh)
n = e.Next()
select {
case ch <- key:
chs.Remove(e)
i++
default:
//waiter unwait
chs.Remove(e)
}
} }
if chs.Len() == 0 {
delete(l.keys, s) delete(l.keys, s)
} }
func (l *lBlockKeys) popOrWait(db *DB, key []byte, whereSeq int32, fn context.CancelFunc) ([]interface{}, error) {
v, err := db.lpop(key, whereSeq)
if err != nil {
return nil, err
} else if v != nil {
return []interface{}{key, v}, nil
} }
func (l *lBlockKeys) wait(key []byte, ch lbKeyCh) {
l.Lock() l.Lock()
defer l.Unlock()
s := hack.String(key) s := hack.String(key)
chs, ok := l.keys[s] chs, ok := l.keys[s]
@ -813,29 +777,7 @@ func (l *lBlockKeys) wait(key []byte, ch lbKeyCh) {
l.keys[s] = chs l.keys[s] = chs
} }
chs.PushBack(ch) chs.PushBack(fn)
} l.Unlock()
return nil, nil
func (l *lBlockKeys) unwait(key []byte, ch lbKeyCh) {
l.Lock()
defer l.Unlock()
s := hack.String(key)
chs, ok := l.keys[s]
if !ok {
return
} else {
var n *list.Element
for e := chs.Front(); e != nil; e = n {
c := e.Value.(lbKeyCh)
n = e.Next()
if c == ch {
chs.Remove(e)
}
}
if chs.Len() == 0 {
delete(l.keys, s)
}
}
} }

View File

@ -215,13 +215,27 @@ func TestLBlock(t *testing.T) {
go f(1) go f(1)
go f(2) go f(2)
time.Sleep(100 * time.Millisecond) time.Sleep(1 * time.Millisecond)
db.LPush(key1, []byte("value")) db.LPush(key1, []byte("value"))
db.LPush(key2, []byte("value")) db.LPush(key2, []byte("value"))
wg.Wait() wg.Wait()
} }
func TestLBlockTimeout(t *testing.T) {
db := getTestDB()
key1 := []byte("test_lblock_key1")
key2 := []byte("test_lblock_key2")
ay, err := db.BLPop([][]byte{key1, key2}, 10*time.Millisecond)
if err != nil {
t.Fatal(err)
} else if len(ay) != 0 {
t.Fatal(len(ay))
}
}
func TestLFlush(t *testing.T) { func TestLFlush(t *testing.T) {
db := getTestDB() db := getTestDB()
db.FlushAll() db.FlushAll()