forked from mirror/ledisdb
update list block pop
This commit is contained in:
parent
336c1dbb94
commit
7dd3a5381d
|
@ -1,6 +1,7 @@
|
||||||
package ledis
|
package ledis
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"container/list"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
"github.com/siddontang/go/hack"
|
"github.com/siddontang/go/hack"
|
||||||
|
@ -133,6 +134,11 @@ func (db *DB) lpush(key []byte, whereSeq int32, args ...[]byte) (int64, error) {
|
||||||
db.lSetMeta(metaKey, headSeq, tailSeq)
|
db.lSetMeta(metaKey, headSeq, tailSeq)
|
||||||
|
|
||||||
err = t.Commit()
|
err = t.Commit()
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
db.lSignalAsReady(key, pushCnt)
|
||||||
|
}
|
||||||
|
|
||||||
return int64(size) + int64(pushCnt), err
|
return int64(size) + int64(pushCnt), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -547,6 +553,7 @@ func (db *DB) lblockPop(keys [][]byte, whereSeq int32, timeout int) ([]interface
|
||||||
if v, err := db.lpop(key, whereSeq); err != nil {
|
if v, err := db.lpop(key, whereSeq); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if v == nil {
|
} else if v == nil {
|
||||||
|
db.lbkeys.wait(key, ch)
|
||||||
continue
|
continue
|
||||||
} else {
|
} else {
|
||||||
return []interface{}{key, v}, nil
|
return []interface{}{key, v}, nil
|
||||||
|
@ -559,14 +566,14 @@ func (db *DB) lblockPop(keys [][]byte, whereSeq int32, timeout int) ([]interface
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) lSignalAsReady(key []byte) {
|
func (db *DB) lSignalAsReady(key []byte, num int) {
|
||||||
if db.status == DBInTransaction {
|
if db.status == DBInTransaction {
|
||||||
//for transaction, only data can be pushed after tx commit and it is hard to signal
|
//for transaction, only data can be pushed after tx commit and it is hard to signal
|
||||||
//so we don't handle it now
|
//so we don't handle it now
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
db.lbkeys.signal(key)
|
db.lbkeys.signal(key, num)
|
||||||
}
|
}
|
||||||
|
|
||||||
type lbKeyCh chan<- []byte
|
type lbKeyCh chan<- []byte
|
||||||
|
@ -574,17 +581,17 @@ type lbKeyCh chan<- []byte
|
||||||
type lBlockKeys struct {
|
type lBlockKeys struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
|
||||||
keys map[string][]lbKeyCh
|
keys map[string]*list.List
|
||||||
}
|
}
|
||||||
|
|
||||||
func newLBlockKeys() *lBlockKeys {
|
func newLBlockKeys() *lBlockKeys {
|
||||||
l := new(lBlockKeys)
|
l := new(lBlockKeys)
|
||||||
|
|
||||||
l.keys = make(map[string][]lbKeyCh)
|
l.keys = make(map[string]*list.List)
|
||||||
return l
|
return l
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *lBlockKeys) signal(key []byte) {
|
func (l *lBlockKeys) signal(key []byte, num int) {
|
||||||
l.Lock()
|
l.Lock()
|
||||||
defer l.Unlock()
|
defer l.Unlock()
|
||||||
|
|
||||||
|
@ -594,23 +601,23 @@ func (l *lBlockKeys) signal(key []byte) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
LOOP:
|
var n *list.Element
|
||||||
for i, ch := range chs {
|
|
||||||
if ch == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
|
i := 0
|
||||||
|
for e := chs.Front(); e != nil && i < num; e = n {
|
||||||
|
ch := e.Value.(lbKeyCh)
|
||||||
|
n = e.Next()
|
||||||
select {
|
select {
|
||||||
case ch <- key:
|
case ch <- key:
|
||||||
break LOOP
|
chs.Remove(e)
|
||||||
|
i++
|
||||||
default:
|
default:
|
||||||
//waiter unwait
|
//waiter unwait
|
||||||
chs[i] = nil
|
chs.Remove(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
chs = l.deleteCh(chs, nil)
|
if chs.Len() == 0 {
|
||||||
if len(chs) == 0 {
|
|
||||||
delete(l.keys, s)
|
delete(l.keys, s)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -622,20 +629,11 @@ func (l *lBlockKeys) wait(key []byte, ch lbKeyCh) {
|
||||||
s := hack.String(key)
|
s := hack.String(key)
|
||||||
chs, ok := l.keys[s]
|
chs, ok := l.keys[s]
|
||||||
if !ok {
|
if !ok {
|
||||||
chs = []lbKeyCh{ch}
|
chs = list.New()
|
||||||
l.keys[s] = chs
|
l.keys[s] = chs
|
||||||
} else {
|
|
||||||
exists := false
|
|
||||||
for _, c := range chs {
|
|
||||||
if c == ch {
|
|
||||||
exists = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !exists {
|
|
||||||
chs = append(chs, ch)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
chs.PushBack(ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *lBlockKeys) unwait(key []byte, ch lbKeyCh) {
|
func (l *lBlockKeys) unwait(key []byte, ch lbKeyCh) {
|
||||||
|
@ -647,23 +645,17 @@ func (l *lBlockKeys) unwait(key []byte, ch lbKeyCh) {
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
chs = l.deleteCh(chs, ch)
|
var n *list.Element
|
||||||
if len(chs) == 0 {
|
for e := chs.Front(); e != nil; e = n {
|
||||||
|
c := e.Value.(lbKeyCh)
|
||||||
|
n = e.Next()
|
||||||
|
if c == ch {
|
||||||
|
chs.Remove(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if chs.Len() == 0 {
|
||||||
delete(l.keys, s)
|
delete(l.keys, s)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *lBlockKeys) deleteCh(chs []lbKeyCh, ch lbKeyCh) []lbKeyCh {
|
|
||||||
i := 0
|
|
||||||
LOOP:
|
|
||||||
for _, c := range chs {
|
|
||||||
if c == ch {
|
|
||||||
continue LOOP
|
|
||||||
}
|
|
||||||
chs[i] = c
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
|
|
||||||
return chs[:i]
|
|
||||||
}
|
|
||||||
|
|
|
@ -2,7 +2,9 @@ package ledis
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestListCodec(t *testing.T) {
|
func TestListCodec(t *testing.T) {
|
||||||
|
@ -102,6 +104,36 @@ func TestListPersist(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLBlock(t *testing.T) {
|
||||||
|
db := getTestDB()
|
||||||
|
|
||||||
|
key1 := []byte("test_lblock_key1")
|
||||||
|
key2 := []byte("test_lblock_key2")
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(2)
|
||||||
|
|
||||||
|
f := func(i int) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
ay, err := db.BLPop([][]byte{key1, key2}, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if len(ay) != 2 {
|
||||||
|
t.Fatal(len(ay))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
go f(1)
|
||||||
|
go f(2)
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
db.LPush(key1, []byte("value"))
|
||||||
|
db.LPush(key2, []byte("value"))
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
func TestLFlush(t *testing.T) {
|
func TestLFlush(t *testing.T) {
|
||||||
db := getTestDB()
|
db := getTestDB()
|
||||||
db.FlushAll()
|
db.FlushAll()
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/siddontang/go/hack"
|
||||||
"github.com/siddontang/ledisdb/ledis"
|
"github.com/siddontang/ledisdb/ledis"
|
||||||
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
func lpushCommand(c *client) error {
|
func lpushCommand(c *client) error {
|
||||||
|
@ -249,7 +251,53 @@ func lxscanCommand(c *client) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func blpopCommand(c *client) error {
|
||||||
|
keys, timeout, err := lParseBPopArgs(c)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if ay, err := c.db.BLPop(keys, timeout); err != nil {
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
c.resp.writeArray(ay)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func brpopCommand(c *client) error {
|
||||||
|
keys, timeout, err := lParseBPopArgs(c)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if ay, err := c.db.BRPop(keys, timeout); err != nil {
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
c.resp.writeArray(ay)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func lParseBPopArgs(c *client) (keys [][]byte, timeout int, err error) {
|
||||||
|
args := c.args
|
||||||
|
if len(args) < 2 {
|
||||||
|
err = ErrCmdParams
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if timeout, err = strconv.Atoi(hack.String(args[len(args)-1])); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
keys = args[0 : len(args)-1]
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
register("blpop", blpopCommand)
|
||||||
|
register("brpop", brpopCommand)
|
||||||
register("lindex", lindexCommand)
|
register("lindex", lindexCommand)
|
||||||
register("llen", llenCommand)
|
register("llen", llenCommand)
|
||||||
register("lpop", lpopCommand)
|
register("lpop", lpopCommand)
|
||||||
|
|
Loading…
Reference in New Issue