refactor list, optimize

This commit is contained in:
siddontang 2014-05-09 09:17:28 +08:00
parent e71c22cf76
commit 112e6bd42b
9 changed files with 187 additions and 143 deletions

View File

@ -5,12 +5,15 @@ import (
"github.com/siddontang/go-ssdb/ssdb" "github.com/siddontang/go-ssdb/ssdb"
"os" "os"
"os/signal" "os/signal"
"runtime"
"syscall" "syscall"
) )
var configFile = flag.String("config", "", "ssdb config file") var configFile = flag.String("config", "", "ssdb config file")
func main() { func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
flag.Parse() flag.Parse()
if len(*configFile) == 0 { if len(*configFile) == 0 {

View File

@ -2,9 +2,9 @@
"addr": "127.0.0.1:6380", "addr": "127.0.0.1:6380",
"leveldb": { "leveldb": {
"path": "/tmp/ssdb", "path": "/tmp/ssdb",
"compression": true, "compression": false,
"block_size": 32768, "block_size": 32768,
"write_buffer_size": 2097152, "write_buffer_size": 67108864,
"cache_size": 20971520 "cache_size": 524288000
} }
} }

View File

@ -23,6 +23,8 @@ type client struct {
cmd string cmd string
args [][]byte args [][]byte
reqC chan error
} }
func newClient(c net.Conn, app *App) { func newClient(c net.Conn, app *App) {
@ -33,6 +35,8 @@ func newClient(c net.Conn, app *App) {
co.rb = bufio.NewReaderSize(c, 256) co.rb = bufio.NewReaderSize(c, 256)
co.wb = bufio.NewWriterSize(c, 256) co.wb = bufio.NewWriterSize(c, 256)
co.reqC = make(chan error, 1)
go co.run() go co.run()
} }
@ -52,7 +56,6 @@ func (c *client) run() {
for { for {
req, err := c.readRequest() req, err := c.readRequest()
if err != nil { if err != nil {
log.Info("read request error %v", err)
return return
} }
@ -143,7 +146,10 @@ func (c *client) handleRequest(req [][]byte) {
if !ok { if !ok {
err = ErrNotFound err = ErrNotFound
} else { } else {
err = f(c) go func() {
c.reqC <- f(c)
}()
err = <-c.reqC
} }
} }

View File

@ -91,7 +91,7 @@ func lindexCommand(c *client) error {
return err return err
} }
if v, err := c.app.list_index(args[0], index); err != nil { if v, err := c.app.list_index(args[0], int32(index)); err != nil {
return err return err
} else { } else {
c.writeBulk(v) c.writeBulk(v)
@ -120,7 +120,7 @@ func lrangeCommand(c *client) error {
return err return err
} }
if v, err := c.app.list_range(args[0], start, stop); err != nil { if v, err := c.app.list_range(args[0], int32(start), int32(stop)); err != nil {
return err return err
} else { } else {
c.writeArray(v) c.writeArray(v)

View File

@ -54,19 +54,11 @@ func testListRange(key []byte, start int64, stop int64, checkValues ...int) erro
} }
func testPrintList(key []byte) { func testPrintList(key []byte) {
headSeq, _ := testApp.db.GetString(encode_list_key(key, listHeadSeq))
tailSeq, _ := testApp.db.GetString(encode_list_key(key, listTailSeq))
size, _ := testApp.db.GetString(encode_lsize_key(key))
println("begin ---------------------")
println(headSeq, tailSeq, size)
it := testApp.db.Iterator(encode_list_key(key, listMinSeq), it := testApp.db.Iterator(encode_list_key(key, listMinSeq),
encode_list_key(key, listMaxSeq), 0, 0, -1) encode_list_key(key, listMaxSeq), 0, 0, -1)
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
k, seq, _ := decode_list_key(it.Key()) k, seq, _ := decode_list_key(it.Key())
println(string(k), seq, string(it.Value())) println(string(k), "seq ", seq, "value:", string(it.Value()))
} }
println("end ---------------------") println("end ---------------------")
} }

View File

@ -25,7 +25,7 @@ const (
HASH_TYPE HASH_TYPE
HSIZE_TYPE HSIZE_TYPE
LIST_TYPE LIST_TYPE
LSIZE_TYPE LMETA_TYPE
ZSET_TYPE ZSET_TYPE
ZSIZE_TYPE ZSIZE_TYPE
ZSCORE_TYPE ZSCORE_TYPE

View File

@ -3,42 +3,40 @@ package ssdb
import ( import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"github.com/siddontang/golib/hack"
"github.com/siddontang/golib/leveldb" "github.com/siddontang/golib/leveldb"
"strconv"
) )
const ( const (
listHeadSeq int64 = 1 listHeadSeq int32 = 1
listTailSeq int64 = 2 listTailSeq int32 = 2
listMinSeq int64 = 1000 listMinSeq int32 = 1000
listMaxSeq int64 = 1<<63 - 1000 listMaxSeq int32 = 1<<31 - 1000
listInitialSeq int64 = listMinSeq + (listMaxSeq-listMinSeq)/2 listInitialSeq int32 = listMinSeq + (listMaxSeq-listMinSeq)/2
) )
var errLSizeKey = errors.New("invalid lsize key") var errLMetaKey = errors.New("invalid lmeta key")
var errListKey = errors.New("invalid list key") var errListKey = errors.New("invalid list key")
var errListSeq = errors.New("invalid list sequence, overflow") var errListSeq = errors.New("invalid list sequence, overflow")
func encode_lsize_key(key []byte) []byte { func encode_lmeta_key(key []byte) []byte {
buf := make([]byte, len(key)+1) buf := make([]byte, len(key)+1)
buf[0] = LSIZE_TYPE buf[0] = LMETA_TYPE
copy(buf[1:], key) copy(buf[1:], key)
return buf return buf
} }
func decode_lsize_key(ek []byte) ([]byte, error) { func decode_lmeta_key(ek []byte) ([]byte, error) {
if len(ek) == 0 || ek[0] != LSIZE_TYPE { if len(ek) == 0 || ek[0] != LMETA_TYPE {
return nil, errLSizeKey return nil, errLMetaKey
} }
return ek[1:], nil return ek[1:], nil
} }
func encode_list_key(key []byte, seq int64) []byte { func encode_list_key(key []byte, seq int32) []byte {
buf := make([]byte, len(key)+13) buf := make([]byte, len(key)+9)
pos := 0 pos := 0
buf[pos] = LIST_TYPE buf[pos] = LIST_TYPE
@ -50,25 +48,25 @@ func encode_list_key(key []byte, seq int64) []byte {
copy(buf[pos:], key) copy(buf[pos:], key)
pos += len(key) pos += len(key)
binary.BigEndian.PutUint64(buf[pos:], uint64(seq)) binary.BigEndian.PutUint32(buf[pos:], uint32(seq))
return buf return buf
} }
func decode_list_key(ek []byte) (key []byte, seq int64, err error) { func decode_list_key(ek []byte) (key []byte, seq int32, err error) {
if len(ek) < 13 || ek[0] != LIST_TYPE { if len(ek) < 9 || ek[0] != LIST_TYPE {
err = errListKey err = errListKey
return return
} }
keyLen := int(binary.BigEndian.Uint32(ek[1:])) keyLen := int(binary.BigEndian.Uint32(ek[1:]))
if keyLen+13 != len(ek) { if keyLen+9 != len(ek) {
err = errListKey err = errListKey
return return
} }
key = ek[5 : 5+keyLen] key = ek[5 : 5+keyLen]
seq = int64(binary.BigEndian.Uint64(ek[5+keyLen:])) seq = int32(binary.BigEndian.Uint32(ek[5+keyLen:]))
return return
} }
@ -88,108 +86,143 @@ func (a *App) list_rpop(key []byte) ([]byte, error) {
return a.list_pop(key, listTailSeq) return a.list_pop(key, listTailSeq)
} }
func (a *App) list_getSeq(key []byte, whereSeq int64) (int64, error) { func (a *App) list_getSeq(key []byte, whereSeq int32) (int64, error) {
ek := encode_list_key(key, whereSeq) ek := encode_list_key(key, whereSeq)
return a.db.GetInt(ek) return Int64(a.db.Get(ek))
}
func (a *App) list_getMeta(ek []byte) (headSeq int32, tailSeq int32, size int32, err error) {
var v []byte
v, err = a.db.Get(ek)
if err != nil {
return
} else if v == nil {
size = 0
return
} else {
headSeq = int32(binary.LittleEndian.Uint32(v[0:4]))
tailSeq = int32(binary.LittleEndian.Uint32(v[4:8]))
size = int32(binary.LittleEndian.Uint32(v[8:]))
}
return
}
func (a *App) list_setMeta(ek []byte, headSeq int32, tailSeq int32, size int32) {
t := a.listTx
buf := make([]byte, 12)
binary.LittleEndian.PutUint32(buf[0:4], uint32(headSeq))
binary.LittleEndian.PutUint32(buf[4:8], uint32(tailSeq))
binary.LittleEndian.PutUint32(buf[8:], uint32(size))
t.Put(ek, buf)
} }
func (a *App) list_len(key []byte) (int64, error) { func (a *App) list_len(key []byte) (int64, error) {
ek := encode_lsize_key(key) ek := encode_lmeta_key(key)
_, _, size, err := a.list_getMeta(ek)
return a.db.GetInt(ek) return int64(size), err
} }
func (a *App) list_push(key []byte, args [][]byte, whereSeq int64) (int64, error) { func (a *App) list_push(key []byte, args [][]byte, whereSeq int32) (int64, error) {
t := a.listTx t := a.listTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
seq, err := a.list_getSeq(key, whereSeq) metaKey := encode_lmeta_key(key)
headSeq, tailSeq, size, err := a.list_getMeta(metaKey)
if err != nil { if err != nil {
return 0, err return 0, err
} }
var size int64 = 0 var delta int32 = 1
var seq int32 = 0
var delta int64 = 1
if whereSeq == listHeadSeq { if whereSeq == listHeadSeq {
delta = -1 delta = -1
} seq = headSeq
if seq == 0 {
seq = listInitialSeq
t.Put(encode_list_key(key, listHeadSeq), hack.Slice(strconv.FormatInt(seq, 10)))
t.Put(encode_list_key(key, listTailSeq), hack.Slice(strconv.FormatInt(seq, 10)))
} else { } else {
size, err = a.list_len(key) seq = tailSeq
if err != nil {
return 0, err
} }
if size == 0 {
headSeq = listInitialSeq
tailSeq = listInitialSeq
seq = headSeq
} else {
seq += delta seq += delta
} }
for i := 0; i < len(args); i++ { for i := 0; i < len(args); i++ {
t.Put(encode_list_key(key, seq+int64(i)*delta), args[i]) t.Put(encode_list_key(key, seq+int32(i)*delta), args[i])
//to do add binlog //to do add binlog
} }
seq += int64(len(args)-1) * delta seq += int32(len(args)-1) * delta
if seq <= listMinSeq || seq >= listMaxSeq { if seq <= listMinSeq || seq >= listMaxSeq {
return 0, errListSeq return 0, errListSeq
} }
size += int64(len(args)) size += int32(len(args))
t.Put(encode_lsize_key(key), hack.Slice(strconv.FormatInt(size, 10))) if whereSeq == listHeadSeq {
t.Put(encode_list_key(key, whereSeq), hack.Slice(strconv.FormatInt(seq, 10))) headSeq = seq
} else {
tailSeq = seq
}
a.list_setMeta(metaKey, headSeq, tailSeq, size)
err = t.Commit() err = t.Commit()
return size, err return int64(size), err
} }
func (a *App) list_pop(key []byte, whereSeq int64) ([]byte, error) { func (a *App) list_pop(key []byte, whereSeq int32) ([]byte, error) {
t := a.listTx t := a.listTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
var delta int64 = 1 metaKey := encode_lmeta_key(key)
if whereSeq == listTailSeq { headSeq, tailSeq, size, err := a.list_getMeta(metaKey)
if err != nil {
return nil, err
}
var seq int32 = 0
var delta int32 = 1
if whereSeq == listHeadSeq {
seq = headSeq
} else {
delta = -1 delta = -1
seq = tailSeq
} }
seq, err := a.list_getSeq(key, whereSeq) itemKey := encode_list_key(key, seq)
if err != nil {
return nil, err
}
var value []byte var value []byte
value, err = a.db.Get(encode_list_key(key, seq)) value, err = a.db.Get(itemKey)
if err != nil { if err != nil {
return nil, err return nil, err
} }
t.Delete(encode_list_key(key, seq)) t.Delete(itemKey)
seq += delta seq += delta
var size int64
size, err = a.list_len(key)
if err != nil {
return nil, err
}
size-- size--
if size <= 0 { if size <= 0 {
t.Delete(encode_lsize_key(key)) t.Delete(metaKey)
t.Delete(encode_list_key(key, listHeadSeq))
t.Delete(encode_list_key(key, listTailSeq))
} else { } else {
t.Put(encode_list_key(key, whereSeq), hack.Slice(strconv.FormatInt(seq, 10))) if whereSeq == listHeadSeq {
t.Put(encode_lsize_key(key), hack.Slice(strconv.FormatInt(size, 10))) headSeq = seq
} else {
tailSeq = seq
}
a.list_setMeta(metaKey, headSeq, tailSeq, size)
} }
//todo add binlog //todo add binlog
@ -197,47 +230,31 @@ func (a *App) list_pop(key []byte, whereSeq int64) ([]byte, error) {
return value, err return value, err
} }
func (a *App) list_range(key []byte, start int64, stop int64) ([]interface{}, error) { func (a *App) list_range(key []byte, start int32, stop int32) ([]interface{}, error) {
v := make([]interface{}, 0, 16) v := make([]interface{}, 0, 16)
var startSeq int64 var startSeq int32
var stopSeq int64 var stopSeq int32
if start > stop { if start > stop {
return []interface{}{}, nil return []interface{}{}, nil
} else if start >= 0 && stop >= 0 { }
seq, err := a.list_getSeq(key, listHeadSeq)
headSeq, tailSeq, _, err := a.list_getMeta(encode_lmeta_key(key))
if err != nil { if err != nil {
return nil, err return nil, err
} }
startSeq = seq + start if start >= 0 && stop >= 0 {
stopSeq = seq + stop startSeq = headSeq + start
stopSeq = headSeq + stop
} else if start < 0 && stop < 0 { } else if start < 0 && stop < 0 {
seq, err := a.list_getSeq(key, listTailSeq) startSeq = tailSeq + start + 1
if err != nil { stopSeq = tailSeq + stop + 1
return nil, err
}
startSeq = seq + start + 1
stopSeq = seq + stop + 1
} else { } else {
//start < 0 && stop > 0 //start < 0 && stop > 0
var err error startSeq = tailSeq + start + 1
startSeq, err = a.list_getSeq(key, listTailSeq) stopSeq = headSeq + stop
if err != nil {
return nil, err
}
startSeq += start + 1
stopSeq, err = a.list_getSeq(key, listHeadSeq)
if err != nil {
return nil, err
}
stopSeq += stop
} }
if startSeq < listMinSeq { if startSeq < listMinSeq {
@ -257,24 +274,17 @@ func (a *App) list_range(key []byte, start int64, stop int64) ([]interface{}, er
return v, nil return v, nil
} }
func (a *App) list_index(key []byte, index int64) ([]byte, error) { func (a *App) list_index(key []byte, index int32) ([]byte, error) {
var seq int64 var seq int32
var err error headSeq, tailSeq, _, err := a.list_getMeta(encode_lmeta_key(key))
if err != nil {
return nil, err
}
if index >= 0 { if index >= 0 {
seq, err = a.list_getSeq(key, listHeadSeq) seq = headSeq + index
if err != nil {
return nil, err
}
seq = seq + index
} else { } else {
seq, err = a.list_getSeq(key, listTailSeq) seq = tailSeq + index + 1
if err != nil {
return nil, err
}
seq = seq + index + 1
} }
return a.db.Get(encode_list_key(key, seq)) return a.db.Get(encode_list_key(key, seq))

View File

@ -161,7 +161,7 @@ func (a *App) zset_setItem(key []byte, score int64, member []byte) (int64, error
} else if v != nil { } else if v != nil {
exists = 1 exists = 1
if s, err := strconv.ParseInt(hack.String(v), 10, 64); err != nil { if s, err := Int64(v, err); err != nil {
return 0, err return 0, err
} else { } else {
sk := encode_zscore_key(key, member, s) sk := encode_zscore_key(key, member, s)
@ -169,7 +169,7 @@ func (a *App) zset_setItem(key []byte, score int64, member []byte) (int64, error
} }
} }
t.Put(ek, hack.Slice(strconv.FormatInt(score, 10))) t.Put(ek, PutInt64(score))
sk := encode_zscore_key(key, member, score) sk := encode_zscore_key(key, member, score)
t.Put(sk, []byte{}) t.Put(sk, []byte{})
@ -190,7 +190,7 @@ func (a *App) zset_delItem(key []byte, member []byte, skipDelScore bool) (int64,
//exists //exists
if !skipDelScore { if !skipDelScore {
//we must del score //we must del score
if s, err := strconv.ParseInt(hack.String(v), 10, 64); err != nil { if s, err := Int64(v, err); err != nil {
return 0, err return 0, err
} else { } else {
sk := encode_zscore_key(key, member, s) sk := encode_zscore_key(key, member, s)
@ -233,7 +233,7 @@ func (a *App) zset_add(key []byte, args []interface{}) (int64, error) {
func (a *App) zset_incrSize(key []byte, delta int64) (int64, error) { func (a *App) zset_incrSize(key []byte, delta int64) (int64, error) {
t := a.zsetTx t := a.zsetTx
sk := encode_zsize_key(key) sk := encode_zsize_key(key)
size, err := a.db.GetInt(sk) size, err := Int64(a.db.Get(sk))
if err != nil { if err != nil {
return 0, err return 0, err
} else { } else {
@ -242,7 +242,7 @@ func (a *App) zset_incrSize(key []byte, delta int64) (int64, error) {
size = 0 size = 0
t.Delete(sk) t.Delete(sk)
} else { } else {
t.Put(sk, hack.Slice(strconv.FormatInt(size, 10))) t.Put(sk, PutInt64(size))
} }
} }
@ -251,13 +251,18 @@ func (a *App) zset_incrSize(key []byte, delta int64) (int64, error) {
func (a *App) zset_card(key []byte) (int64, error) { func (a *App) zset_card(key []byte) (int64, error) {
sk := encode_zsize_key(key) sk := encode_zsize_key(key)
size, err := a.db.GetInt(sk) size, err := Int64(a.db.Get(sk))
return size, err return size, err
} }
func (a *App) zset_score(key []byte, member []byte) ([]byte, error) { func (a *App) zset_score(key []byte, member []byte) ([]byte, error) {
k := encode_zset_key(key, member) k := encode_zset_key(key, member)
return a.db.Get(k) score, err := Int64(a.db.Get(k))
if err != nil {
return nil, err
}
return hack.Slice(strconv.FormatInt(score, 10)), nil
} }
func (a *App) zset_rem(key []byte, args [][]byte) (int64, error) { func (a *App) zset_rem(key []byte, args [][]byte) (int64, error) {
@ -293,7 +298,7 @@ func (a *App) zset_incrby(key []byte, delta int64, member []byte) ([]byte, error
if err != nil { if err != nil {
return nil, err return nil, err
} else if v != nil { } else if v != nil {
if s, err := strconv.ParseInt(hack.String(v), 10, 64); err != nil { if s, err := Int64(v, err); err != nil {
return nil, err return nil, err
} else { } else {
sk := encode_zscore_key(key, member, s) sk := encode_zscore_key(key, member, s)
@ -309,13 +314,12 @@ func (a *App) zset_incrby(key []byte, delta int64, member []byte) ([]byte, error
a.zset_incrSize(key, 1) a.zset_incrSize(key, 1)
} }
buf := hack.Slice(strconv.FormatInt(score, 10)) t.Put(ek, PutInt64(score))
t.Put(ek, buf)
t.Put(encode_zscore_key(key, member, score), []byte{}) t.Put(encode_zscore_key(key, member, score), []byte{})
err = t.Commit() err = t.Commit()
return buf, err return hack.Slice(strconv.FormatInt(score, 10)), err
} }
func (a *App) zset_count(key []byte, min int64, max int64) (int64, error) { func (a *App) zset_count(key []byte, min int64, max int64) (int64, error) {
@ -342,7 +346,7 @@ func (a *App) zset_rank(key []byte, member []byte, reverse bool) (int64, error)
} else if v == nil { } else if v == nil {
return -1, nil return -1, nil
} else { } else {
if s, err := strconv.ParseInt(hack.String(v), 10, 64); err != nil { if s, err := Int64(v, err); err != nil {
return 0, err return 0, err
} else { } else {
var it *leveldb.Iterator var it *leveldb.Iterator

29
ssdb/util.go Normal file
View File

@ -0,0 +1,29 @@
package ssdb
import (
"encoding/binary"
"errors"
"github.com/siddontang/golib/hack"
)
var errIntNumber = errors.New("invalid integer")
func Int64(v []byte, err error) (int64, error) {
if err != nil {
return 0, err
} else if v == nil || len(v) == 0 {
return 0, nil
} else if len(v) != 8 {
return 0, errIntNumber
}
return int64(binary.LittleEndian.Uint64(v)), nil
}
func PutInt64(v int64) []byte {
return hack.Int64Slice(v)
}
func PutInt32(v int32) []byte {
return hack.Int32Slice(v)
}