From 57a07edd1a2e45e5e812db7aabfc9c7fb43dc7e6 Mon Sep 17 00:00:00 2001 From: Muhamad Azmy Date: Mon, 21 Aug 2017 05:14:17 +0200 Subject: [PATCH] Fix BRPop infinite blocking (#316) --- .../golang.org/x/net/context/context.go | 102 ------------ .../vendor/golang.org/x/net/context/go19.go | 20 +++ .../golang.org/x/net/context/pre_go19.go | 109 +++++++++++++ glide.lock | 4 +- ledis/t_list.go | 148 ++++++------------ ledis/t_list_test.go | 16 +- 6 files changed, 191 insertions(+), 208 deletions(-) create mode 100644 _vendor/vendor/golang.org/x/net/context/go19.go create mode 100644 _vendor/vendor/golang.org/x/net/context/pre_go19.go diff --git a/_vendor/vendor/golang.org/x/net/context/context.go b/_vendor/vendor/golang.org/x/net/context/context.go index f143ed6..d3681ab 100644 --- a/_vendor/vendor/golang.org/x/net/context/context.go +++ b/_vendor/vendor/golang.org/x/net/context/context.go @@ -36,103 +36,6 @@ // Contexts. package context // import "golang.org/x/net/context" -import "time" - -// A Context carries a deadline, a cancelation signal, and other values across -// API boundaries. -// -// Context's methods may be called by multiple goroutines simultaneously. -type Context interface { - // Deadline returns the time when work done on behalf of this context - // should be canceled. Deadline returns ok==false when no deadline is - // set. Successive calls to Deadline return the same results. - Deadline() (deadline time.Time, ok bool) - - // Done returns a channel that's closed when work done on behalf of this - // context should be canceled. Done may return nil if this context can - // never be canceled. Successive calls to Done return the same value. - // - // WithCancel arranges for Done to be closed when cancel is called; - // WithDeadline arranges for Done to be closed when the deadline - // expires; WithTimeout arranges for Done to be closed when the timeout - // elapses. - // - // Done is provided for use in select statements: - // - // // Stream generates values with DoSomething and sends them to out - // // until DoSomething returns an error or ctx.Done is closed. - // func Stream(ctx context.Context, out chan<- Value) error { - // for { - // v, err := DoSomething(ctx) - // if err != nil { - // return err - // } - // select { - // case <-ctx.Done(): - // return ctx.Err() - // case out <- v: - // } - // } - // } - // - // See http://blog.golang.org/pipelines for more examples of how to use - // a Done channel for cancelation. - Done() <-chan struct{} - - // Err returns a non-nil error value after Done is closed. Err returns - // Canceled if the context was canceled or DeadlineExceeded if the - // context's deadline passed. No other values for Err are defined. - // After Done is closed, successive calls to Err return the same value. - Err() error - - // Value returns the value associated with this context for key, or nil - // if no value is associated with key. Successive calls to Value with - // the same key returns the same result. - // - // Use context values only for request-scoped data that transits - // processes and API boundaries, not for passing optional parameters to - // functions. - // - // A key identifies a specific value in a Context. Functions that wish - // to store values in Context typically allocate a key in a global - // variable then use that key as the argument to context.WithValue and - // Context.Value. A key can be any type that supports equality; - // packages should define keys as an unexported type to avoid - // collisions. - // - // Packages that define a Context key should provide type-safe accessors - // for the values stores using that key: - // - // // Package user defines a User type that's stored in Contexts. - // package user - // - // import "golang.org/x/net/context" - // - // // User is the type of value stored in the Contexts. - // type User struct {...} - // - // // key is an unexported type for keys defined in this package. - // // This prevents collisions with keys defined in other packages. - // type key int - // - // // userKey is the key for user.User values in Contexts. It is - // // unexported; clients use user.NewContext and user.FromContext - // // instead of using this key directly. - // var userKey key = 0 - // - // // NewContext returns a new Context that carries value u. - // func NewContext(ctx context.Context, u *User) context.Context { - // return context.WithValue(ctx, userKey, u) - // } - // - // // FromContext returns the User value stored in ctx, if any. - // func FromContext(ctx context.Context) (*User, bool) { - // u, ok := ctx.Value(userKey).(*User) - // return u, ok - // } - Value(key interface{}) interface{} -} - // Background returns a non-nil, empty Context. It is never canceled, has no // values, and has no deadline. It is typically used by the main function, // initialization, and tests, and as the top-level Context for incoming @@ -149,8 +52,3 @@ func Background() Context { func TODO() Context { return todo } - -// A CancelFunc tells an operation to abandon its work. -// A CancelFunc does not wait for the work to stop. -// After the first call, subsequent calls to a CancelFunc do nothing. -type CancelFunc func() diff --git a/_vendor/vendor/golang.org/x/net/context/go19.go b/_vendor/vendor/golang.org/x/net/context/go19.go new file mode 100644 index 0000000..d88bd1d --- /dev/null +++ b/_vendor/vendor/golang.org/x/net/context/go19.go @@ -0,0 +1,20 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build go1.9 + +package context + +import "context" // standard library's context, as of Go 1.7 + +// A Context carries a deadline, a cancelation signal, and other values across +// API boundaries. +// +// Context's methods may be called by multiple goroutines simultaneously. +type Context = context.Context + +// A CancelFunc tells an operation to abandon its work. +// A CancelFunc does not wait for the work to stop. +// After the first call, subsequent calls to a CancelFunc do nothing. +type CancelFunc = context.CancelFunc diff --git a/_vendor/vendor/golang.org/x/net/context/pre_go19.go b/_vendor/vendor/golang.org/x/net/context/pre_go19.go new file mode 100644 index 0000000..b105f80 --- /dev/null +++ b/_vendor/vendor/golang.org/x/net/context/pre_go19.go @@ -0,0 +1,109 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !go1.9 + +package context + +import "time" + +// A Context carries a deadline, a cancelation signal, and other values across +// API boundaries. +// +// Context's methods may be called by multiple goroutines simultaneously. +type Context interface { + // Deadline returns the time when work done on behalf of this context + // should be canceled. Deadline returns ok==false when no deadline is + // set. Successive calls to Deadline return the same results. + Deadline() (deadline time.Time, ok bool) + + // Done returns a channel that's closed when work done on behalf of this + // context should be canceled. Done may return nil if this context can + // never be canceled. Successive calls to Done return the same value. + // + // WithCancel arranges for Done to be closed when cancel is called; + // WithDeadline arranges for Done to be closed when the deadline + // expires; WithTimeout arranges for Done to be closed when the timeout + // elapses. + // + // Done is provided for use in select statements: + // + // // Stream generates values with DoSomething and sends them to out + // // until DoSomething returns an error or ctx.Done is closed. + // func Stream(ctx context.Context, out chan<- Value) error { + // for { + // v, err := DoSomething(ctx) + // if err != nil { + // return err + // } + // select { + // case <-ctx.Done(): + // return ctx.Err() + // case out <- v: + // } + // } + // } + // + // See http://blog.golang.org/pipelines for more examples of how to use + // a Done channel for cancelation. + Done() <-chan struct{} + + // Err returns a non-nil error value after Done is closed. Err returns + // Canceled if the context was canceled or DeadlineExceeded if the + // context's deadline passed. No other values for Err are defined. + // After Done is closed, successive calls to Err return the same value. + Err() error + + // Value returns the value associated with this context for key, or nil + // if no value is associated with key. Successive calls to Value with + // the same key returns the same result. + // + // Use context values only for request-scoped data that transits + // processes and API boundaries, not for passing optional parameters to + // functions. + // + // A key identifies a specific value in a Context. Functions that wish + // to store values in Context typically allocate a key in a global + // variable then use that key as the argument to context.WithValue and + // Context.Value. A key can be any type that supports equality; + // packages should define keys as an unexported type to avoid + // collisions. + // + // Packages that define a Context key should provide type-safe accessors + // for the values stores using that key: + // + // // Package user defines a User type that's stored in Contexts. + // package user + // + // import "golang.org/x/net/context" + // + // // User is the type of value stored in the Contexts. + // type User struct {...} + // + // // key is an unexported type for keys defined in this package. + // // This prevents collisions with keys defined in other packages. + // type key int + // + // // userKey is the key for user.User values in Contexts. It is + // // unexported; clients use user.NewContext and user.FromContext + // // instead of using this key directly. + // var userKey key = 0 + // + // // NewContext returns a new Context that carries value u. + // func NewContext(ctx context.Context, u *User) context.Context { + // return context.WithValue(ctx, userKey, u) + // } + // + // // FromContext returns the User value stored in ctx, if any. + // func FromContext(ctx context.Context) (*User, bool) { + // u, ok := ctx.Value(userKey).(*User) + // return u, ok + // } + Value(key interface{}) interface{} +} + +// A CancelFunc tells an operation to abandon its work. +// A CancelFunc does not wait for the work to stop. +// After the first call, subsequent calls to a CancelFunc do nothing. +type CancelFunc func() diff --git a/glide.lock b/glide.lock index 6824efc..b4c0703 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ hash: 83229e7ecea008add55b2547fb28a23d247a4740fc30b4659f58bf5d495e3584 -updated: 2017-06-08T13:54:52.259026852+02:00 +updated: 2017-08-16T11:06:05.978126639+02:00 imports: - name: github.com/cupcake/rdb version: 43ba34106c765f2111c0dc7b74cdf8ee437411e0 @@ -59,7 +59,7 @@ imports: - parse - pm - name: golang.org/x/net - version: 59a0b19b5533c7977ddeb86b017bf507ed407b12 + version: 1c05540f6879653db88113bc4a2b70aec4bd491f subpackages: - context testImports: [] diff --git a/ledis/t_list.go b/ledis/t_list.go index 718ad8e..83ed56e 100644 --- a/ledis/t_list.go +++ b/ledis/t_list.go @@ -11,6 +11,7 @@ import ( "github.com/siddontang/go/log" "github.com/siddontang/go/num" "github.com/siddontang/ledisdb/store" + "golang.org/x/net/context" ) const ( @@ -159,7 +160,7 @@ func (db *DB) lpush(key []byte, whereSeq int32, args ...[]byte) (int64, error) { err = t.Commit() if err == nil { - db.lSignalAsReady(key, pushCnt) + db.lSignalAsReady(key) } return int64(size) + int64(pushCnt), err @@ -692,72 +693,43 @@ func (db *DB) LKeyExists(key []byte) (int64, error) { } func (db *DB) lblockPop(keys [][]byte, whereSeq int32, timeout time.Duration) ([]interface{}, error) { - ch := make(chan []byte) - - bkeys := [][]byte{} - for _, key := range keys { - v, err := db.lpop(key, whereSeq) - if err != nil { - return nil, err - } else if v != nil { - return []interface{}{key, v}, nil - } else { - db.lbkeys.wait(key, ch) - bkeys = append(bkeys, key) - } - } - if len(bkeys) == 0 { - return nil, nil - } - - defer func() { - for _, key := range bkeys { - db.lbkeys.unwait(key, ch) - } - }() - - deadT := time.Now().Add(timeout) - for { - if timeout == 0 { - key := <-ch - if v, err := db.lpop(key, whereSeq); err != nil { + var ctx context.Context + var cancel context.CancelFunc + if timeout > 0 { + ctx, cancel = context.WithTimeout(context.Background(), timeout) + } else { + ctx, cancel = context.WithCancel(context.Background()) + } + + for _, key := range keys { + v, err := db.lbkeys.popOrWait(db, key, whereSeq, cancel) + + if err != nil { + cancel() return nil, err - } else if v == nil { - continue - } else { + } else if v != nil { + cancel() return []interface{}{key, v}, nil } - } else { - d := deadT.Sub(time.Now()) - if d < 0 { - return nil, nil - } - - select { - case key := <-ch: - if v, err := db.lpop(key, whereSeq); err != nil { - return nil, err - } else if v == nil { - db.lbkeys.wait(key, ch) - continue - } else { - return []interface{}{key, v}, nil - } - case <-time.After(d): - return nil, nil - } } + //blocking wait + <-ctx.Done() + cancel() + + //if ctx.Err() is a deadline exceeded (timeout) we return + //otherwise we try to pop one of the keys again. + if ctx.Err() == context.DeadlineExceeded { + return nil, nil + } } } -func (db *DB) lSignalAsReady(key []byte, num int) { - db.lbkeys.signal(key, num) +func (db *DB) lSignalAsReady(key []byte) { + db.lbkeys.signal(key) } -type lbKeyCh chan<- []byte - type lBlockKeys struct { sync.Mutex @@ -771,40 +743,32 @@ func newLBlockKeys() *lBlockKeys { return l } -func (l *lBlockKeys) signal(key []byte, num int) { +func (l *lBlockKeys) signal(key []byte) { l.Lock() defer l.Unlock() s := hack.String(key) - chs, ok := l.keys[s] + fns, ok := l.keys[s] if !ok { return } - - var n *list.Element - - i := 0 - for e := chs.Front(); e != nil && i < num; e = n { - ch := e.Value.(lbKeyCh) - n = e.Next() - select { - case ch <- key: - chs.Remove(e) - i++ - default: - //waiter unwait - chs.Remove(e) - } + for e := fns.Front(); e != nil; e = e.Next() { + fn := e.Value.(context.CancelFunc) + fn() } - if chs.Len() == 0 { - delete(l.keys, s) - } + delete(l.keys, s) } -func (l *lBlockKeys) wait(key []byte, ch lbKeyCh) { +func (l *lBlockKeys) popOrWait(db *DB, key []byte, whereSeq int32, fn context.CancelFunc) ([]interface{}, error) { + v, err := db.lpop(key, whereSeq) + if err != nil { + return nil, err + } else if v != nil { + return []interface{}{key, v}, nil + } + l.Lock() - defer l.Unlock() s := hack.String(key) chs, ok := l.keys[s] @@ -813,29 +777,7 @@ func (l *lBlockKeys) wait(key []byte, ch lbKeyCh) { l.keys[s] = chs } - chs.PushBack(ch) -} - -func (l *lBlockKeys) unwait(key []byte, ch lbKeyCh) { - l.Lock() - defer l.Unlock() - - s := hack.String(key) - chs, ok := l.keys[s] - if !ok { - return - } else { - var n *list.Element - for e := chs.Front(); e != nil; e = n { - c := e.Value.(lbKeyCh) - n = e.Next() - if c == ch { - chs.Remove(e) - } - } - - if chs.Len() == 0 { - delete(l.keys, s) - } - } + chs.PushBack(fn) + l.Unlock() + return nil, nil } diff --git a/ledis/t_list_test.go b/ledis/t_list_test.go index 32d960d..d179a04 100644 --- a/ledis/t_list_test.go +++ b/ledis/t_list_test.go @@ -215,13 +215,27 @@ func TestLBlock(t *testing.T) { go f(1) go f(2) - time.Sleep(100 * time.Millisecond) + time.Sleep(1 * time.Millisecond) db.LPush(key1, []byte("value")) db.LPush(key2, []byte("value")) wg.Wait() } +func TestLBlockTimeout(t *testing.T) { + db := getTestDB() + + key1 := []byte("test_lblock_key1") + key2 := []byte("test_lblock_key2") + + ay, err := db.BLPop([][]byte{key1, key2}, 10*time.Millisecond) + if err != nil { + t.Fatal(err) + } else if len(ay) != 0 { + t.Fatal(len(ay)) + } +} + func TestLFlush(t *testing.T) { db := getTestDB() db.FlushAll()