add block list operation

This commit is contained in:
siddontang 2014-10-16 17:51:52 +08:00
parent 0f8e9553ac
commit 336c1dbb94
4 changed files with 190 additions and 0 deletions

View File

@ -39,6 +39,8 @@ type DB struct {
setBatch *batch
status uint8
lbkeys *lBlockKeys
}
func (l *Ledis) newDB(index uint8) *DB {
@ -60,6 +62,8 @@ func (l *Ledis) newDB(index uint8) *DB {
d.binBatch = d.newBatch()
d.setBatch = d.newBatch()
d.lbkeys = newLBlockKeys()
return d
}

View File

@ -47,6 +47,8 @@ func (db *DB) Multi() (*Multi, error) {
m.DB.binBatch = m.newBatch()
m.DB.setBatch = m.newBatch()
m.DB.lbkeys = db.lbkeys
return m, nil
}

View File

@ -3,7 +3,9 @@ package ledis
import (
"encoding/binary"
"errors"
"github.com/siddontang/go/hack"
"github.com/siddontang/ledisdb/store"
"sync"
"time"
)
@ -485,3 +487,183 @@ func (db *DB) lEncodeMaxKey() []byte {
ek[len(ek)-1] = LMetaType + 1
return ek
}
func (db *DB) BLPop(keys [][]byte, timeout int) ([]interface{}, error) {
return db.lblockPop(keys, listHeadSeq, timeout)
}
func (db *DB) BRPop(keys [][]byte, timeout int) ([]interface{}, error) {
return db.lblockPop(keys, listTailSeq, timeout)
}
func (db *DB) lblockPop(keys [][]byte, whereSeq int32, timeout int) ([]interface{}, error) {
ch := make(chan []byte)
bkeys := [][]byte{}
for _, key := range keys {
v, err := db.lpop(key, whereSeq)
if err != nil {
return nil, err
} else if v != nil {
return []interface{}{key, v}, nil
} else {
if db.IsAutoCommit() {
//block wait can not be supported in transaction and multi
db.lbkeys.wait(key, ch)
bkeys = append(bkeys, key)
}
}
}
if len(bkeys) == 0 {
return nil, nil
}
defer func() {
for _, key := range bkeys {
db.lbkeys.unwait(key, ch)
}
}()
deadT := time.Now().Add(time.Duration(timeout) * time.Second)
for {
if timeout == 0 {
key := <-ch
if v, err := db.lpop(key, whereSeq); err != nil {
return nil, err
} else if v == nil {
continue
} else {
return []interface{}{key, v}, nil
}
} else {
d := deadT.Sub(time.Now())
if d < 0 {
return nil, nil
}
select {
case key := <-ch:
if v, err := db.lpop(key, whereSeq); err != nil {
return nil, err
} else if v == nil {
continue
} else {
return []interface{}{key, v}, nil
}
case <-time.After(d):
return nil, nil
}
}
}
}
func (db *DB) lSignalAsReady(key []byte) {
if db.status == DBInTransaction {
//for transaction, only data can be pushed after tx commit and it is hard to signal
//so we don't handle it now
return
}
db.lbkeys.signal(key)
}
type lbKeyCh chan<- []byte
type lBlockKeys struct {
sync.Mutex
keys map[string][]lbKeyCh
}
func newLBlockKeys() *lBlockKeys {
l := new(lBlockKeys)
l.keys = make(map[string][]lbKeyCh)
return l
}
func (l *lBlockKeys) signal(key []byte) {
l.Lock()
defer l.Unlock()
s := hack.String(key)
chs, ok := l.keys[s]
if !ok {
return
}
LOOP:
for i, ch := range chs {
if ch == nil {
continue
}
select {
case ch <- key:
break LOOP
default:
//waiter unwait
chs[i] = nil
}
}
chs = l.deleteCh(chs, nil)
if len(chs) == 0 {
delete(l.keys, s)
}
}
func (l *lBlockKeys) wait(key []byte, ch lbKeyCh) {
l.Lock()
defer l.Unlock()
s := hack.String(key)
chs, ok := l.keys[s]
if !ok {
chs = []lbKeyCh{ch}
l.keys[s] = chs
} else {
exists := false
for _, c := range chs {
if c == ch {
exists = true
break
}
}
if !exists {
chs = append(chs, ch)
}
}
}
func (l *lBlockKeys) unwait(key []byte, ch lbKeyCh) {
l.Lock()
defer l.Unlock()
s := hack.String(key)
chs, ok := l.keys[s]
if !ok {
return
} else {
chs = l.deleteCh(chs, ch)
if len(chs) == 0 {
delete(l.keys, s)
}
}
}
func (l *lBlockKeys) deleteCh(chs []lbKeyCh, ch lbKeyCh) []lbKeyCh {
i := 0
LOOP:
for _, c := range chs {
if c == ch {
continue LOOP
}
chs[i] = c
i++
}
return chs[:i]
}

View File

@ -61,6 +61,8 @@ func (db *DB) Begin() (*Tx, error) {
tx.DB.binBatch = tx.newBatch()
tx.DB.setBatch = tx.newBatch()
tx.DB.lbkeys = db.lbkeys
return tx, nil
}