ledisdb/ledis/t_list.go

784 lines
14 KiB
Go
Raw Normal View History

2014-05-09 10:49:22 +04:00
package ledis
2014-05-06 09:32:38 +04:00
import (
2014-10-17 05:55:43 +04:00
"container/list"
2014-05-06 09:32:38 +04:00
"encoding/binary"
"errors"
2014-10-16 13:51:52 +04:00
"sync"
"time"
2015-05-04 17:42:28 +03:00
"github.com/siddontang/go/hack"
2015-05-14 04:20:56 +03:00
"github.com/siddontang/go/log"
"github.com/siddontang/go/num"
2015-05-04 17:42:28 +03:00
"github.com/siddontang/ledisdb/store"
2017-08-21 06:14:17 +03:00
"golang.org/x/net/context"
2014-05-06 09:32:38 +04:00
)
const (
2014-05-09 05:17:28 +04:00
listHeadSeq int32 = 1
listTailSeq int32 = 2
2014-05-06 09:32:38 +04:00
2014-05-09 05:17:28 +04:00
listMinSeq int32 = 1000
listMaxSeq int32 = 1<<31 - 1000
listInitialSeq int32 = listMinSeq + (listMaxSeq-listMinSeq)/2
2014-05-06 09:32:38 +04:00
)
2014-05-09 05:17:28 +04:00
var errLMetaKey = errors.New("invalid lmeta key")
2014-05-06 09:32:38 +04:00
var errListKey = errors.New("invalid list key")
var errListSeq = errors.New("invalid list sequence, overflow")
2014-05-20 04:41:24 +04:00
func (db *DB) lEncodeMetaKey(key []byte) []byte {
2015-03-15 15:36:14 +03:00
buf := make([]byte, len(key)+1+len(db.indexVarBuf))
pos := copy(buf, db.indexVarBuf)
buf[pos] = LMetaType
pos++
2014-05-06 09:32:38 +04:00
2015-03-15 15:36:14 +03:00
copy(buf[pos:], key)
2014-05-06 09:32:38 +04:00
return buf
}
2014-05-20 04:41:24 +04:00
func (db *DB) lDecodeMetaKey(ek []byte) ([]byte, error) {
2015-03-15 15:36:14 +03:00
pos, err := db.checkKeyIndex(ek)
if err != nil {
return nil, err
}
if pos+1 > len(ek) || ek[pos] != LMetaType {
2014-05-09 05:17:28 +04:00
return nil, errLMetaKey
2014-05-06 09:32:38 +04:00
}
2015-03-15 15:36:14 +03:00
pos++
return ek[pos:], nil
2014-05-06 09:32:38 +04:00
}
2014-05-20 04:41:24 +04:00
func (db *DB) lEncodeListKey(key []byte, seq int32) []byte {
2015-03-15 15:36:14 +03:00
buf := make([]byte, len(key)+7+len(db.indexVarBuf))
pos := copy(buf, db.indexVarBuf)
2014-05-06 09:32:38 +04:00
buf[pos] = ListType
2014-05-06 09:32:38 +04:00
pos++
2014-05-20 04:41:24 +04:00
binary.BigEndian.PutUint16(buf[pos:], uint16(len(key)))
pos += 2
2014-05-06 09:32:38 +04:00
copy(buf[pos:], key)
pos += len(key)
2014-05-09 05:17:28 +04:00
binary.BigEndian.PutUint32(buf[pos:], uint32(seq))
2014-05-06 09:32:38 +04:00
return buf
}
2014-05-20 04:41:24 +04:00
func (db *DB) lDecodeListKey(ek []byte) (key []byte, seq int32, err error) {
2015-03-15 15:36:14 +03:00
pos := 0
pos, err = db.checkKeyIndex(ek)
if err != nil {
return
}
if pos+1 > len(ek) || ek[pos] != ListType {
2014-05-06 09:32:38 +04:00
err = errListKey
return
}
2015-03-15 15:36:14 +03:00
pos++
if pos+2 > len(ek) {
err = errListKey
return
}
keyLen := int(binary.BigEndian.Uint16(ek[pos:]))
pos += 2
if keyLen+pos+4 != len(ek) {
2014-05-06 09:32:38 +04:00
err = errListKey
return
}
2015-03-15 15:36:14 +03:00
key = ek[pos : pos+keyLen]
seq = int32(binary.BigEndian.Uint32(ek[pos+keyLen:]))
2014-05-06 09:32:38 +04:00
return
}
2014-05-16 04:56:32 +04:00
func (db *DB) lpush(key []byte, whereSeq int32, args ...[]byte) (int64, error) {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(key); err != nil {
return 0, err
}
var headSeq int32
var tailSeq int32
var size int32
var err error
2014-08-25 10:18:23 +04:00
t := db.listBatch
t.Lock()
defer t.Unlock()
2014-05-20 04:41:24 +04:00
metaKey := db.lEncodeMetaKey(key)
headSeq, tailSeq, size, err = db.lGetMeta(nil, metaKey)
if err != nil {
return 0, err
2014-05-16 04:56:32 +04:00
}
var pushCnt int = len(args)
if pushCnt == 0 {
return int64(size), nil
2014-05-06 09:32:38 +04:00
}
var seq int32 = headSeq
var delta int32 = -1
if whereSeq == listTailSeq {
2014-05-09 05:17:28 +04:00
seq = tailSeq
delta = 1
2014-05-06 09:32:38 +04:00
}
// append elements
if size > 0 {
2014-05-06 09:32:38 +04:00
seq += delta
}
for i := 0; i < pushCnt; i++ {
2014-05-20 04:41:24 +04:00
ek := db.lEncodeListKey(key, seq+int32(i)*delta)
t.Put(ek, args[i])
2014-05-06 09:32:38 +04:00
}
seq += int32(pushCnt-1) * delta
2014-05-06 09:32:38 +04:00
if seq <= listMinSeq || seq >= listMaxSeq {
return 0, errListSeq
}
// set meta info
2014-05-09 05:17:28 +04:00
if whereSeq == listHeadSeq {
headSeq = seq
} else {
tailSeq = seq
}
db.lSetMeta(metaKey, headSeq, tailSeq)
2014-05-06 09:32:38 +04:00
err = t.Commit()
2014-10-17 05:55:43 +04:00
if err == nil {
2017-08-21 06:14:17 +03:00
db.lSignalAsReady(key)
2014-10-17 05:55:43 +04:00
}
return int64(size) + int64(pushCnt), err
2014-05-06 09:32:38 +04:00
}
func (db *DB) lpop(key []byte, whereSeq int32) ([]byte, error) {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(key); err != nil {
return nil, err
}
2014-08-25 10:18:23 +04:00
t := db.listBatch
2014-05-06 09:32:38 +04:00
t.Lock()
defer t.Unlock()
2014-05-20 04:41:24 +04:00
var headSeq int32
var tailSeq int32
2015-05-14 04:20:56 +03:00
var size int32
2014-05-20 04:41:24 +04:00
var err error
metaKey := db.lEncodeMetaKey(key)
2015-05-14 04:20:56 +03:00
headSeq, tailSeq, size, err = db.lGetMeta(nil, metaKey)
2014-05-06 09:32:38 +04:00
if err != nil {
return nil, err
2015-05-14 04:20:56 +03:00
} else if size == 0 {
return nil, nil
2014-05-06 09:32:38 +04:00
}
var value []byte
var seq int32 = headSeq
if whereSeq == listTailSeq {
2014-05-09 05:17:28 +04:00
seq = tailSeq
}
2014-05-20 04:41:24 +04:00
itemKey := db.lEncodeListKey(key, seq)
2014-08-25 10:18:23 +04:00
value, err = db.bucket.Get(itemKey)
2014-05-06 09:32:38 +04:00
if err != nil {
return nil, err
}
if whereSeq == listHeadSeq {
headSeq += 1
2014-05-06 09:32:38 +04:00
} else {
tailSeq -= 1
2014-05-06 09:32:38 +04:00
}
t.Delete(itemKey)
2015-05-14 04:20:56 +03:00
size = db.lSetMeta(metaKey, headSeq, tailSeq)
if size == 0 {
2015-05-14 04:20:56 +03:00
db.rmExpire(t, ListType, key)
}
2014-05-06 09:32:38 +04:00
err = t.Commit()
return value, err
}
2015-11-26 10:04:21 +03:00
func (db *DB) ltrim2(key []byte, startP, stopP int64) (err error) {
if err := checkKeySize(key); err != nil {
return err
}
t := db.listBatch
t.Lock()
defer t.Unlock()
var headSeq int32
var llen int32
start := int32(startP)
stop := int32(stopP)
ek := db.lEncodeMetaKey(key)
if headSeq, _, llen, err = db.lGetMeta(nil, ek); err != nil {
return err
} else {
if start < 0 {
start = llen + start
}
if stop < 0 {
stop = llen + stop
}
if start >= llen || start > stop {
db.lDelete(t, key)
db.rmExpire(t, ListType, key)
return t.Commit()
}
if start < 0 {
start = 0
}
if stop >= llen {
stop = llen - 1
}
}
if start > 0 {
for i := int32(0); i < start; i++ {
t.Delete(db.lEncodeListKey(key, headSeq+i))
}
}
if stop < int32(llen-1) {
for i := int32(stop + 1); i < llen; i++ {
t.Delete(db.lEncodeListKey(key, headSeq+i))
}
}
db.lSetMeta(ek, headSeq+start, headSeq+stop)
return t.Commit()
}
func (db *DB) ltrim(key []byte, trimSize, whereSeq int32) (int32, error) {
if err := checkKeySize(key); err != nil {
return 0, err
}
if trimSize == 0 {
return 0, nil
}
t := db.listBatch
t.Lock()
defer t.Unlock()
var headSeq int32
var tailSeq int32
var size int32
var err error
metaKey := db.lEncodeMetaKey(key)
headSeq, tailSeq, size, err = db.lGetMeta(nil, metaKey)
if err != nil {
return 0, err
} else if size == 0 {
return 0, nil
}
var (
trimStartSeq int32
trimEndSeq int32
)
if whereSeq == listHeadSeq {
trimStartSeq = headSeq
trimEndSeq = num.MinInt32(trimStartSeq+trimSize-1, tailSeq)
headSeq = trimEndSeq + 1
} else {
trimEndSeq = tailSeq
trimStartSeq = num.MaxInt32(trimEndSeq-trimSize+1, headSeq)
tailSeq = trimStartSeq - 1
}
for trimSeq := trimStartSeq; trimSeq <= trimEndSeq; trimSeq++ {
itemKey := db.lEncodeListKey(key, trimSeq)
t.Delete(itemKey)
}
size = db.lSetMeta(metaKey, headSeq, tailSeq)
if size == 0 {
db.rmExpire(t, ListType, key)
}
err = t.Commit()
return trimEndSeq - trimStartSeq + 1, err
}
// ps : here just focus on deleting the list data,
// any other likes expire is ignore.
2014-08-25 10:18:23 +04:00
func (db *DB) lDelete(t *batch, key []byte) int64 {
mk := db.lEncodeMetaKey(key)
var headSeq int32
var tailSeq int32
var err error
2014-08-25 10:18:23 +04:00
it := db.bucket.NewIterator()
defer it.Close()
headSeq, tailSeq, _, err = db.lGetMeta(it, mk)
if err != nil {
return 0
}
var num int64 = 0
startKey := db.lEncodeListKey(key, headSeq)
stopKey := db.lEncodeListKey(key, tailSeq)
2014-07-25 13:58:00 +04:00
rit := store.NewRangeIterator(it, &store.Range{startKey, stopKey, store.RangeClose})
for ; rit.Valid(); rit.Next() {
t.Delete(rit.RawKey())
num++
}
t.Delete(mk)
return num
}
2014-07-25 13:58:00 +04:00
func (db *DB) lGetMeta(it *store.Iterator, ek []byte) (headSeq int32, tailSeq int32, size int32, err error) {
var v []byte
if it != nil {
v = it.Find(ek)
} else {
2014-08-25 10:18:23 +04:00
v, err = db.bucket.Get(ek)
}
if err != nil {
return
} else if v == nil {
headSeq = listInitialSeq
tailSeq = listInitialSeq
size = 0
return
} else {
headSeq = int32(binary.LittleEndian.Uint32(v[0:4]))
tailSeq = int32(binary.LittleEndian.Uint32(v[4:8]))
size = tailSeq - headSeq + 1
}
return
}
func (db *DB) lSetMeta(ek []byte, headSeq int32, tailSeq int32) int32 {
2014-08-25 10:18:23 +04:00
t := db.listBatch
var size int32 = tailSeq - headSeq + 1
if size < 0 {
// todo : log error + panic
2015-05-14 04:20:56 +03:00
log.Fatalf("invalid meta sequence range [%d, %d]", headSeq, tailSeq)
} else if size == 0 {
t.Delete(ek)
} else {
buf := make([]byte, 8)
binary.LittleEndian.PutUint32(buf[0:4], uint32(headSeq))
binary.LittleEndian.PutUint32(buf[4:8], uint32(tailSeq))
t.Put(ek, buf)
}
return size
}
func (db *DB) lExpireAt(key []byte, when int64) (int64, error) {
2014-08-25 10:18:23 +04:00
t := db.listBatch
t.Lock()
defer t.Unlock()
if llen, err := db.LLen(key); err != nil || llen == 0 {
return 0, err
} else {
db.expireAt(t, ListType, key, when)
if err := t.Commit(); err != nil {
return 0, err
}
}
return 1, nil
}
func (db *DB) LIndex(key []byte, index int32) ([]byte, error) {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(key); err != nil {
return nil, err
}
var seq int32
2014-05-20 04:41:24 +04:00
var headSeq int32
var tailSeq int32
var err error
metaKey := db.lEncodeMetaKey(key)
2014-08-25 10:18:23 +04:00
it := db.bucket.NewIterator()
defer it.Close()
headSeq, tailSeq, _, err = db.lGetMeta(it, metaKey)
if err != nil {
return nil, err
}
if index >= 0 {
seq = headSeq + index
} else {
seq = tailSeq + index + 1
}
2014-05-20 04:41:24 +04:00
sk := db.lEncodeListKey(key, seq)
v := it.Find(sk)
return v, nil
}
func (db *DB) LLen(key []byte) (int64, error) {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(key); err != nil {
return 0, err
}
ek := db.lEncodeMetaKey(key)
_, _, size, err := db.lGetMeta(nil, ek)
return int64(size), err
}
func (db *DB) LPop(key []byte) ([]byte, error) {
return db.lpop(key, listHeadSeq)
}
2015-11-26 10:04:21 +03:00
func (db *DB) LTrim(key []byte, start, stop int64) error {
return db.ltrim2(key, start, stop)
}
func (db *DB) LTrimFront(key []byte, trimSize int32) (int32, error) {
return db.ltrim(key, trimSize, listHeadSeq)
}
func (db *DB) LTrimBack(key []byte, trimSize int32) (int32, error) {
return db.ltrim(key, trimSize, listTailSeq)
}
2014-05-16 04:56:32 +04:00
func (db *DB) LPush(key []byte, args ...[]byte) (int64, error) {
return db.lpush(key, listHeadSeq, args...)
}
2015-12-01 03:50:42 +03:00
func (db *DB) LSet(key []byte, index int32, value []byte) error {
if err := checkKeySize(key); err != nil {
return err
}
var seq int32
var headSeq int32
var tailSeq int32
//var size int32
var err error
t := db.listBatch
t.Lock()
defer t.Unlock()
metaKey := db.lEncodeMetaKey(key)
headSeq, tailSeq, _, err = db.lGetMeta(nil, metaKey)
if err != nil {
return err
}
if index >= 0 {
seq = headSeq + index
} else {
seq = tailSeq + index + 1
}
2015-12-10 07:08:14 +03:00
if seq < headSeq || seq > tailSeq {
return errListIndex
}
2015-12-01 03:50:42 +03:00
sk := db.lEncodeListKey(key, seq)
t.Put(sk, value)
err = t.Commit()
return err
}
func (db *DB) LRange(key []byte, start int32, stop int32) ([][]byte, error) {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(key); err != nil {
return nil, err
}
var headSeq int32
var llen int32
2014-05-20 04:41:24 +04:00
var err error
metaKey := db.lEncodeMetaKey(key)
2014-08-25 10:18:23 +04:00
it := db.bucket.NewIterator()
defer it.Close()
if headSeq, _, llen, err = db.lGetMeta(it, metaKey); err != nil {
2014-05-09 05:17:28 +04:00
return nil, err
}
2014-05-06 09:32:38 +04:00
if start < 0 {
start = llen + start
}
if stop < 0 {
stop = llen + stop
}
if start < 0 {
start = 0
2014-05-06 09:32:38 +04:00
}
if start > stop || start >= llen {
return [][]byte{}, nil
2014-05-06 09:32:38 +04:00
}
if stop >= llen {
stop = llen - 1
}
limit := (stop - start) + 1
headSeq += start
v := make([][]byte, 0, limit)
startKey := db.lEncodeListKey(key, headSeq)
2014-07-25 13:58:00 +04:00
rit := store.NewRangeLimitIterator(it,
&store.Range{
Min: startKey,
Max: nil,
2014-07-25 13:58:00 +04:00
Type: store.RangeClose},
&store.Limit{
Offset: 0,
Count: int(limit)})
for ; rit.Valid(); rit.Next() {
v = append(v, rit.Value())
2014-05-06 09:32:38 +04:00
}
return v, nil
}
func (db *DB) RPop(key []byte) ([]byte, error) {
return db.lpop(key, listTailSeq)
}
2014-05-06 09:32:38 +04:00
2014-05-16 04:56:32 +04:00
func (db *DB) RPush(key []byte, args ...[]byte) (int64, error) {
return db.lpush(key, listTailSeq, args...)
2014-05-06 09:32:38 +04:00
}
2014-05-12 11:08:59 +04:00
func (db *DB) LClear(key []byte) (int64, error) {
2014-05-20 04:41:24 +04:00
if err := checkKeySize(key); err != nil {
return 0, err
}
2014-08-25 10:18:23 +04:00
t := db.listBatch
2014-05-12 11:08:59 +04:00
t.Lock()
defer t.Unlock()
num := db.lDelete(t, key)
db.rmExpire(t, ListType, key)
2014-05-12 11:08:59 +04:00
err := t.Commit()
2014-05-12 11:08:59 +04:00
return num, err
}
func (db *DB) LMclear(keys ...[]byte) (int64, error) {
2014-08-25 10:18:23 +04:00
t := db.listBatch
t.Lock()
defer t.Unlock()
for _, key := range keys {
if err := checkKeySize(key); err != nil {
return 0, err
}
db.lDelete(t, key)
db.rmExpire(t, ListType, key)
}
err := t.Commit()
return int64(len(keys)), err
}
func (db *DB) lFlush() (drop int64, err error) {
2014-08-25 10:18:23 +04:00
t := db.listBatch
2014-07-03 07:38:52 +04:00
t.Lock()
defer t.Unlock()
2014-08-16 11:35:05 +04:00
return db.flushType(t, ListType)
}
func (db *DB) LExpire(key []byte, duration int64) (int64, error) {
if duration <= 0 {
return 0, errExpireValue
}
return db.lExpireAt(key, time.Now().Unix()+duration)
}
func (db *DB) LExpireAt(key []byte, when int64) (int64, error) {
if when <= time.Now().Unix() {
return 0, errExpireValue
}
return db.lExpireAt(key, when)
}
func (db *DB) LTTL(key []byte) (int64, error) {
if err := checkKeySize(key); err != nil {
return -1, err
}
return db.ttl(ListType, key)
}
2014-06-23 07:12:20 +04:00
func (db *DB) LPersist(key []byte) (int64, error) {
if err := checkKeySize(key); err != nil {
return 0, err
}
2014-08-25 10:18:23 +04:00
t := db.listBatch
2014-06-23 07:12:20 +04:00
t.Lock()
defer t.Unlock()
n, err := db.rmExpire(t, ListType, key)
2014-06-23 07:12:20 +04:00
if err != nil {
return 0, err
}
err = t.Commit()
return n, err
}
2014-08-14 11:24:41 +04:00
func (db *DB) lEncodeMinKey() []byte {
return db.lEncodeMetaKey(nil)
}
func (db *DB) lEncodeMaxKey() []byte {
ek := db.lEncodeMetaKey(nil)
ek[len(ek)-1] = LMetaType + 1
return ek
}
2014-10-16 13:51:52 +04:00
func (db *DB) BLPop(keys [][]byte, timeout time.Duration) ([]interface{}, error) {
2014-10-16 13:51:52 +04:00
return db.lblockPop(keys, listHeadSeq, timeout)
}
func (db *DB) BRPop(keys [][]byte, timeout time.Duration) ([]interface{}, error) {
2014-10-16 13:51:52 +04:00
return db.lblockPop(keys, listTailSeq, timeout)
}
func (db *DB) LKeyExists(key []byte) (int64, error) {
2015-02-01 12:41:28 +03:00
if err := checkKeySize(key); err != nil {
return 0, err
}
sk := db.lEncodeMetaKey(key)
v, err := db.bucket.Get(sk)
if v != nil && err == nil {
return 1, nil
}
return 0, err
}
func (db *DB) lblockPop(keys [][]byte, whereSeq int32, timeout time.Duration) ([]interface{}, error) {
2017-08-21 06:14:17 +03:00
for {
var ctx context.Context
var cancel context.CancelFunc
if timeout > 0 {
ctx, cancel = context.WithTimeout(context.Background(), timeout)
2014-10-16 13:51:52 +04:00
} else {
2017-08-21 06:14:17 +03:00
ctx, cancel = context.WithCancel(context.Background())
2014-10-16 13:51:52 +04:00
}
2017-08-21 06:14:17 +03:00
for _, key := range keys {
v, err := db.lbkeys.popOrWait(db, key, whereSeq, cancel)
2014-10-16 13:51:52 +04:00
2017-08-21 06:14:17 +03:00
if err != nil {
cancel()
2014-10-16 13:51:52 +04:00
return nil, err
2017-08-21 06:14:17 +03:00
} else if v != nil {
cancel()
2014-10-16 13:51:52 +04:00
return []interface{}{key, v}, nil
}
}
2017-08-21 06:14:17 +03:00
//blocking wait
<-ctx.Done()
cancel()
//if ctx.Err() is a deadline exceeded (timeout) we return
//otherwise we try to pop one of the keys again.
if ctx.Err() == context.DeadlineExceeded {
return nil, nil
}
2014-10-16 13:51:52 +04:00
}
}
2017-08-21 06:14:17 +03:00
func (db *DB) lSignalAsReady(key []byte) {
db.lbkeys.signal(key)
2014-10-16 13:51:52 +04:00
}
type lBlockKeys struct {
sync.Mutex
2014-10-17 05:55:43 +04:00
keys map[string]*list.List
2014-10-16 13:51:52 +04:00
}
func newLBlockKeys() *lBlockKeys {
l := new(lBlockKeys)
2014-10-17 05:55:43 +04:00
l.keys = make(map[string]*list.List)
2014-10-16 13:51:52 +04:00
return l
}
2017-08-21 06:14:17 +03:00
func (l *lBlockKeys) signal(key []byte) {
2014-10-16 13:51:52 +04:00
l.Lock()
defer l.Unlock()
s := hack.String(key)
2017-08-21 06:14:17 +03:00
fns, ok := l.keys[s]
2014-10-16 13:51:52 +04:00
if !ok {
return
}
2017-08-21 06:14:17 +03:00
for e := fns.Front(); e != nil; e = e.Next() {
fn := e.Value.(context.CancelFunc)
fn()
2014-10-16 13:51:52 +04:00
}
2017-08-21 06:14:17 +03:00
delete(l.keys, s)
2014-10-16 13:51:52 +04:00
}
2017-08-21 06:14:17 +03:00
func (l *lBlockKeys) popOrWait(db *DB, key []byte, whereSeq int32, fn context.CancelFunc) ([]interface{}, error) {
v, err := db.lpop(key, whereSeq)
if err != nil {
return nil, err
} else if v != nil {
return []interface{}{key, v}, nil
}
2014-10-16 13:51:52 +04:00
l.Lock()
s := hack.String(key)
chs, ok := l.keys[s]
if !ok {
2014-10-17 05:55:43 +04:00
chs = list.New()
2014-10-16 13:51:52 +04:00
l.keys[s] = chs
}
2014-10-17 05:55:43 +04:00
2017-08-21 06:14:17 +03:00
chs.PushBack(fn)
l.Unlock()
return nil, nil
2014-10-16 13:51:52 +04:00
}