diff --git a/cmd/ssdb/main.go b/cmd/ssdb/main.go index 64b50c2..92ba933 100644 --- a/cmd/ssdb/main.go +++ b/cmd/ssdb/main.go @@ -5,12 +5,15 @@ import ( "github.com/siddontang/go-ssdb/ssdb" "os" "os/signal" + "runtime" "syscall" ) var configFile = flag.String("config", "", "ssdb config file") func main() { + runtime.GOMAXPROCS(runtime.NumCPU()) + flag.Parse() if len(*configFile) == 0 { diff --git a/etc/ssdb.json b/etc/ssdb.json index d0bcd0f..ca79fdf 100644 --- a/etc/ssdb.json +++ b/etc/ssdb.json @@ -2,9 +2,9 @@ "addr": "127.0.0.1:6380", "leveldb": { "path": "/tmp/ssdb", - "compression": true, + "compression": false, "block_size": 32768, - "write_buffer_size": 2097152, - "cache_size": 20971520 + "write_buffer_size": 67108864, + "cache_size": 524288000 } } \ No newline at end of file diff --git a/ssdb/client.go b/ssdb/client.go index 8c7b296..999aac9 100644 --- a/ssdb/client.go +++ b/ssdb/client.go @@ -23,6 +23,8 @@ type client struct { cmd string args [][]byte + + reqC chan error } func newClient(c net.Conn, app *App) { @@ -33,6 +35,8 @@ func newClient(c net.Conn, app *App) { co.rb = bufio.NewReaderSize(c, 256) co.wb = bufio.NewWriterSize(c, 256) + co.reqC = make(chan error, 1) + go co.run() } @@ -52,7 +56,6 @@ func (c *client) run() { for { req, err := c.readRequest() if err != nil { - log.Info("read request error %v", err) return } @@ -143,7 +146,10 @@ func (c *client) handleRequest(req [][]byte) { if !ok { err = ErrNotFound } else { - err = f(c) + go func() { + c.reqC <- f(c) + }() + err = <-c.reqC } } diff --git a/ssdb/cmd_list.go b/ssdb/cmd_list.go index 4fba6e0..a680a79 100644 --- a/ssdb/cmd_list.go +++ b/ssdb/cmd_list.go @@ -91,7 +91,7 @@ func lindexCommand(c *client) error { return err } - if v, err := c.app.list_index(args[0], index); err != nil { + if v, err := c.app.list_index(args[0], int32(index)); err != nil { return err } else { c.writeBulk(v) @@ -120,7 +120,7 @@ func lrangeCommand(c *client) error { return err } - if v, err := c.app.list_range(args[0], start, stop); err != nil { + if v, err := c.app.list_range(args[0], int32(start), int32(stop)); err != nil { return err } else { c.writeArray(v) diff --git a/ssdb/cmd_list_test.go b/ssdb/cmd_list_test.go index a343b93..4f14731 100644 --- a/ssdb/cmd_list_test.go +++ b/ssdb/cmd_list_test.go @@ -54,19 +54,11 @@ func testListRange(key []byte, start int64, stop int64, checkValues ...int) erro } func testPrintList(key []byte) { - headSeq, _ := testApp.db.GetString(encode_list_key(key, listHeadSeq)) - tailSeq, _ := testApp.db.GetString(encode_list_key(key, listTailSeq)) - - size, _ := testApp.db.GetString(encode_lsize_key(key)) - - println("begin ---------------------") - println(headSeq, tailSeq, size) - it := testApp.db.Iterator(encode_list_key(key, listMinSeq), encode_list_key(key, listMaxSeq), 0, 0, -1) for ; it.Valid(); it.Next() { k, seq, _ := decode_list_key(it.Key()) - println(string(k), seq, string(it.Value())) + println(string(k), "seq ", seq, "value:", string(it.Value())) } println("end ---------------------") } diff --git a/ssdb/const.go b/ssdb/const.go index 93a830f..c2cd1ba 100644 --- a/ssdb/const.go +++ b/ssdb/const.go @@ -25,7 +25,7 @@ const ( HASH_TYPE HSIZE_TYPE LIST_TYPE - LSIZE_TYPE + LMETA_TYPE ZSET_TYPE ZSIZE_TYPE ZSCORE_TYPE diff --git a/ssdb/t_list.go b/ssdb/t_list.go index 0fd0ad4..7264cb6 100644 --- a/ssdb/t_list.go +++ b/ssdb/t_list.go @@ -3,42 +3,40 @@ package ssdb import ( "encoding/binary" "errors" - "github.com/siddontang/golib/hack" "github.com/siddontang/golib/leveldb" - "strconv" ) const ( - listHeadSeq int64 = 1 - listTailSeq int64 = 2 + listHeadSeq int32 = 1 + listTailSeq int32 = 2 - listMinSeq int64 = 1000 - listMaxSeq int64 = 1<<63 - 1000 - listInitialSeq int64 = listMinSeq + (listMaxSeq-listMinSeq)/2 + listMinSeq int32 = 1000 + listMaxSeq int32 = 1<<31 - 1000 + listInitialSeq int32 = listMinSeq + (listMaxSeq-listMinSeq)/2 ) -var errLSizeKey = errors.New("invalid lsize key") +var errLMetaKey = errors.New("invalid lmeta key") var errListKey = errors.New("invalid list key") var errListSeq = errors.New("invalid list sequence, overflow") -func encode_lsize_key(key []byte) []byte { +func encode_lmeta_key(key []byte) []byte { buf := make([]byte, len(key)+1) - buf[0] = LSIZE_TYPE + buf[0] = LMETA_TYPE copy(buf[1:], key) return buf } -func decode_lsize_key(ek []byte) ([]byte, error) { - if len(ek) == 0 || ek[0] != LSIZE_TYPE { - return nil, errLSizeKey +func decode_lmeta_key(ek []byte) ([]byte, error) { + if len(ek) == 0 || ek[0] != LMETA_TYPE { + return nil, errLMetaKey } return ek[1:], nil } -func encode_list_key(key []byte, seq int64) []byte { - buf := make([]byte, len(key)+13) +func encode_list_key(key []byte, seq int32) []byte { + buf := make([]byte, len(key)+9) pos := 0 buf[pos] = LIST_TYPE @@ -50,25 +48,25 @@ func encode_list_key(key []byte, seq int64) []byte { copy(buf[pos:], key) pos += len(key) - binary.BigEndian.PutUint64(buf[pos:], uint64(seq)) + binary.BigEndian.PutUint32(buf[pos:], uint32(seq)) return buf } -func decode_list_key(ek []byte) (key []byte, seq int64, err error) { - if len(ek) < 13 || ek[0] != LIST_TYPE { +func decode_list_key(ek []byte) (key []byte, seq int32, err error) { + if len(ek) < 9 || ek[0] != LIST_TYPE { err = errListKey return } keyLen := int(binary.BigEndian.Uint32(ek[1:])) - if keyLen+13 != len(ek) { + if keyLen+9 != len(ek) { err = errListKey return } key = ek[5 : 5+keyLen] - seq = int64(binary.BigEndian.Uint64(ek[5+keyLen:])) + seq = int32(binary.BigEndian.Uint32(ek[5+keyLen:])) return } @@ -88,108 +86,143 @@ func (a *App) list_rpop(key []byte) ([]byte, error) { return a.list_pop(key, listTailSeq) } -func (a *App) list_getSeq(key []byte, whereSeq int64) (int64, error) { +func (a *App) list_getSeq(key []byte, whereSeq int32) (int64, error) { ek := encode_list_key(key, whereSeq) - return a.db.GetInt(ek) + return Int64(a.db.Get(ek)) +} + +func (a *App) list_getMeta(ek []byte) (headSeq int32, tailSeq int32, size int32, err error) { + var v []byte + v, err = a.db.Get(ek) + if err != nil { + return + } else if v == nil { + size = 0 + return + } else { + headSeq = int32(binary.LittleEndian.Uint32(v[0:4])) + tailSeq = int32(binary.LittleEndian.Uint32(v[4:8])) + size = int32(binary.LittleEndian.Uint32(v[8:])) + } + return +} + +func (a *App) list_setMeta(ek []byte, headSeq int32, tailSeq int32, size int32) { + t := a.listTx + + buf := make([]byte, 12) + + binary.LittleEndian.PutUint32(buf[0:4], uint32(headSeq)) + binary.LittleEndian.PutUint32(buf[4:8], uint32(tailSeq)) + binary.LittleEndian.PutUint32(buf[8:], uint32(size)) + + t.Put(ek, buf) } func (a *App) list_len(key []byte) (int64, error) { - ek := encode_lsize_key(key) - - return a.db.GetInt(ek) + ek := encode_lmeta_key(key) + _, _, size, err := a.list_getMeta(ek) + return int64(size), err } -func (a *App) list_push(key []byte, args [][]byte, whereSeq int64) (int64, error) { +func (a *App) list_push(key []byte, args [][]byte, whereSeq int32) (int64, error) { t := a.listTx t.Lock() defer t.Unlock() - seq, err := a.list_getSeq(key, whereSeq) + metaKey := encode_lmeta_key(key) + headSeq, tailSeq, size, err := a.list_getMeta(metaKey) + if err != nil { return 0, err } - var size int64 = 0 - - var delta int64 = 1 + var delta int32 = 1 + var seq int32 = 0 if whereSeq == listHeadSeq { delta = -1 + seq = headSeq + } else { + seq = tailSeq } - if seq == 0 { - seq = listInitialSeq - - t.Put(encode_list_key(key, listHeadSeq), hack.Slice(strconv.FormatInt(seq, 10))) - t.Put(encode_list_key(key, listTailSeq), hack.Slice(strconv.FormatInt(seq, 10))) + if size == 0 { + headSeq = listInitialSeq + tailSeq = listInitialSeq + seq = headSeq } else { - size, err = a.list_len(key) - if err != nil { - return 0, err - } - seq += delta } for i := 0; i < len(args); i++ { - t.Put(encode_list_key(key, seq+int64(i)*delta), args[i]) + t.Put(encode_list_key(key, seq+int32(i)*delta), args[i]) //to do add binlog } - seq += int64(len(args)-1) * delta + seq += int32(len(args)-1) * delta if seq <= listMinSeq || seq >= listMaxSeq { return 0, errListSeq } - size += int64(len(args)) + size += int32(len(args)) - t.Put(encode_lsize_key(key), hack.Slice(strconv.FormatInt(size, 10))) - t.Put(encode_list_key(key, whereSeq), hack.Slice(strconv.FormatInt(seq, 10))) + if whereSeq == listHeadSeq { + headSeq = seq + } else { + tailSeq = seq + } + + a.list_setMeta(metaKey, headSeq, tailSeq, size) err = t.Commit() - return size, err + return int64(size), err } -func (a *App) list_pop(key []byte, whereSeq int64) ([]byte, error) { +func (a *App) list_pop(key []byte, whereSeq int32) ([]byte, error) { t := a.listTx t.Lock() defer t.Unlock() - var delta int64 = 1 - if whereSeq == listTailSeq { + metaKey := encode_lmeta_key(key) + headSeq, tailSeq, size, err := a.list_getMeta(metaKey) + + if err != nil { + return nil, err + } + + var seq int32 = 0 + var delta int32 = 1 + if whereSeq == listHeadSeq { + seq = headSeq + } else { delta = -1 + seq = tailSeq } - seq, err := a.list_getSeq(key, whereSeq) - if err != nil { - return nil, err - } - + itemKey := encode_list_key(key, seq) var value []byte - value, err = a.db.Get(encode_list_key(key, seq)) + value, err = a.db.Get(itemKey) if err != nil { return nil, err } - t.Delete(encode_list_key(key, seq)) + t.Delete(itemKey) seq += delta - var size int64 - size, err = a.list_len(key) - if err != nil { - return nil, err - } - size-- if size <= 0 { - t.Delete(encode_lsize_key(key)) - t.Delete(encode_list_key(key, listHeadSeq)) - t.Delete(encode_list_key(key, listTailSeq)) + t.Delete(metaKey) } else { - t.Put(encode_list_key(key, whereSeq), hack.Slice(strconv.FormatInt(seq, 10))) - t.Put(encode_lsize_key(key), hack.Slice(strconv.FormatInt(size, 10))) + if whereSeq == listHeadSeq { + headSeq = seq + } else { + tailSeq = seq + } + + a.list_setMeta(metaKey, headSeq, tailSeq, size) } //todo add binlog @@ -197,47 +230,31 @@ func (a *App) list_pop(key []byte, whereSeq int64) ([]byte, error) { return value, err } -func (a *App) list_range(key []byte, start int64, stop int64) ([]interface{}, error) { +func (a *App) list_range(key []byte, start int32, stop int32) ([]interface{}, error) { v := make([]interface{}, 0, 16) - var startSeq int64 - var stopSeq int64 + var startSeq int32 + var stopSeq int32 if start > stop { return []interface{}{}, nil - } else if start >= 0 && stop >= 0 { - seq, err := a.list_getSeq(key, listHeadSeq) - if err != nil { - return nil, err - } + } - startSeq = seq + start - stopSeq = seq + stop + headSeq, tailSeq, _, err := a.list_getMeta(encode_lmeta_key(key)) + if err != nil { + return nil, err + } + if start >= 0 && stop >= 0 { + startSeq = headSeq + start + stopSeq = headSeq + stop } else if start < 0 && stop < 0 { - seq, err := a.list_getSeq(key, listTailSeq) - if err != nil { - return nil, err - } - - startSeq = seq + start + 1 - stopSeq = seq + stop + 1 + startSeq = tailSeq + start + 1 + stopSeq = tailSeq + stop + 1 } else { //start < 0 && stop > 0 - var err error - startSeq, err = a.list_getSeq(key, listTailSeq) - if err != nil { - return nil, err - } - - startSeq += start + 1 - - stopSeq, err = a.list_getSeq(key, listHeadSeq) - if err != nil { - return nil, err - } - - stopSeq += stop + startSeq = tailSeq + start + 1 + stopSeq = headSeq + stop } if startSeq < listMinSeq { @@ -257,24 +274,17 @@ func (a *App) list_range(key []byte, start int64, stop int64) ([]interface{}, er return v, nil } -func (a *App) list_index(key []byte, index int64) ([]byte, error) { - var seq int64 - var err error +func (a *App) list_index(key []byte, index int32) ([]byte, error) { + var seq int32 + headSeq, tailSeq, _, err := a.list_getMeta(encode_lmeta_key(key)) + if err != nil { + return nil, err + } + if index >= 0 { - seq, err = a.list_getSeq(key, listHeadSeq) - if err != nil { - return nil, err - } - - seq = seq + index - + seq = headSeq + index } else { - seq, err = a.list_getSeq(key, listTailSeq) - if err != nil { - return nil, err - } - - seq = seq + index + 1 + seq = tailSeq + index + 1 } return a.db.Get(encode_list_key(key, seq)) diff --git a/ssdb/t_zset.go b/ssdb/t_zset.go index 7de1d9c..290370a 100644 --- a/ssdb/t_zset.go +++ b/ssdb/t_zset.go @@ -161,7 +161,7 @@ func (a *App) zset_setItem(key []byte, score int64, member []byte) (int64, error } else if v != nil { exists = 1 - if s, err := strconv.ParseInt(hack.String(v), 10, 64); err != nil { + if s, err := Int64(v, err); err != nil { return 0, err } else { sk := encode_zscore_key(key, member, s) @@ -169,7 +169,7 @@ func (a *App) zset_setItem(key []byte, score int64, member []byte) (int64, error } } - t.Put(ek, hack.Slice(strconv.FormatInt(score, 10))) + t.Put(ek, PutInt64(score)) sk := encode_zscore_key(key, member, score) t.Put(sk, []byte{}) @@ -190,7 +190,7 @@ func (a *App) zset_delItem(key []byte, member []byte, skipDelScore bool) (int64, //exists if !skipDelScore { //we must del score - if s, err := strconv.ParseInt(hack.String(v), 10, 64); err != nil { + if s, err := Int64(v, err); err != nil { return 0, err } else { sk := encode_zscore_key(key, member, s) @@ -233,7 +233,7 @@ func (a *App) zset_add(key []byte, args []interface{}) (int64, error) { func (a *App) zset_incrSize(key []byte, delta int64) (int64, error) { t := a.zsetTx sk := encode_zsize_key(key) - size, err := a.db.GetInt(sk) + size, err := Int64(a.db.Get(sk)) if err != nil { return 0, err } else { @@ -242,7 +242,7 @@ func (a *App) zset_incrSize(key []byte, delta int64) (int64, error) { size = 0 t.Delete(sk) } else { - t.Put(sk, hack.Slice(strconv.FormatInt(size, 10))) + t.Put(sk, PutInt64(size)) } } @@ -251,13 +251,18 @@ func (a *App) zset_incrSize(key []byte, delta int64) (int64, error) { func (a *App) zset_card(key []byte) (int64, error) { sk := encode_zsize_key(key) - size, err := a.db.GetInt(sk) + size, err := Int64(a.db.Get(sk)) return size, err } func (a *App) zset_score(key []byte, member []byte) ([]byte, error) { k := encode_zset_key(key, member) - return a.db.Get(k) + score, err := Int64(a.db.Get(k)) + if err != nil { + return nil, err + } + + return hack.Slice(strconv.FormatInt(score, 10)), nil } func (a *App) zset_rem(key []byte, args [][]byte) (int64, error) { @@ -293,7 +298,7 @@ func (a *App) zset_incrby(key []byte, delta int64, member []byte) ([]byte, error if err != nil { return nil, err } else if v != nil { - if s, err := strconv.ParseInt(hack.String(v), 10, 64); err != nil { + if s, err := Int64(v, err); err != nil { return nil, err } else { sk := encode_zscore_key(key, member, s) @@ -309,13 +314,12 @@ func (a *App) zset_incrby(key []byte, delta int64, member []byte) ([]byte, error a.zset_incrSize(key, 1) } - buf := hack.Slice(strconv.FormatInt(score, 10)) - t.Put(ek, buf) + t.Put(ek, PutInt64(score)) t.Put(encode_zscore_key(key, member, score), []byte{}) err = t.Commit() - return buf, err + return hack.Slice(strconv.FormatInt(score, 10)), err } func (a *App) zset_count(key []byte, min int64, max int64) (int64, error) { @@ -342,7 +346,7 @@ func (a *App) zset_rank(key []byte, member []byte, reverse bool) (int64, error) } else if v == nil { return -1, nil } else { - if s, err := strconv.ParseInt(hack.String(v), 10, 64); err != nil { + if s, err := Int64(v, err); err != nil { return 0, err } else { var it *leveldb.Iterator diff --git a/ssdb/util.go b/ssdb/util.go new file mode 100644 index 0000000..61c2b83 --- /dev/null +++ b/ssdb/util.go @@ -0,0 +1,29 @@ +package ssdb + +import ( + "encoding/binary" + "errors" + "github.com/siddontang/golib/hack" +) + +var errIntNumber = errors.New("invalid integer") + +func Int64(v []byte, err error) (int64, error) { + if err != nil { + return 0, err + } else if v == nil || len(v) == 0 { + return 0, nil + } else if len(v) != 8 { + return 0, errIntNumber + } + + return int64(binary.LittleEndian.Uint64(v)), nil +} + +func PutInt64(v int64) []byte { + return hack.Int64Slice(v) +} + +func PutInt32(v int32) []byte { + return hack.Int32Slice(v) +}