add kv support

This commit is contained in:
siddontang 2014-05-04 19:02:55 +08:00
parent 687805549e
commit 455c476682
6 changed files with 430 additions and 4 deletions

View File

@ -4,6 +4,7 @@ import (
"github.com/siddontang/golib/leveldb" "github.com/siddontang/golib/leveldb"
"net" "net"
"strings" "strings"
"sync"
) )
type App struct { type App struct {
@ -12,6 +13,11 @@ type App struct {
listener net.Listener listener net.Listener
db *leveldb.DB db *leveldb.DB
kvMutex sync.Mutex
hashMutex sync.Mutex
listMutex sync.Mutex
zsetMutex sync.Mutex
} }
func NewApp(cfg *Config) (*App, error) { func NewApp(cfg *Config) (*App, error) {
@ -36,6 +42,8 @@ func NewApp(cfg *Config) (*App, error) {
return nil, err return nil, err
} }
app.dbMutex = newKeyMutex(128)
return app, nil return app, nil
} }
@ -55,3 +63,7 @@ func (app *App) Run() {
newClient(conn, app) newClient(conn, app)
} }
} }
func (app *App) getMutex(key []byte) *sync.Mutex {
return app.dbMutex.Get(key)
}

View File

@ -1,54 +1,200 @@
package ssdb package ssdb
import (
"github.com/siddontang/golib/hack"
"strconv"
)
func getCommand(c *client) error { func getCommand(c *client) error {
args := c.args
if len(args) != 1 {
return ErrCmdParams
}
if v, err := c.app.kv_get(args[0]); err != nil {
return err
} else {
c.writeBulk(v)
}
return nil return nil
} }
func setCommand(c *client) error { func setCommand(c *client) error {
args := c.args
if len(args) < 2 {
return ErrCmdParams
}
if err := c.app.kv_set(args[0], args[1]); err != nil {
return err
} else {
c.writeStatus(OK)
}
return nil return nil
} }
func getsetCommand(c *client) error { func getsetCommand(c *client) error {
args := c.args
if len(args) != 2 {
return ErrCmdParams
}
if v, err := c.app.kv_getset(args[0], args[1]); err != nil {
return err
} else {
c.writeBulk(v)
}
return nil return nil
} }
func setnxCommand(c *client) error { func setnxCommand(c *client) error {
args := c.args
if len(args) != 2 {
return ErrCmdParams
}
if n, err := c.app.kv_setnx(args[0], args[1]); err != nil {
return err
} else {
c.writeInteger(n)
}
return nil return nil
} }
func existsCommand(c *client) error { func existsCommand(c *client) error {
args := c.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.app.kv_exists(args[0]); err != nil {
return err
} else {
c.writeInteger(n)
}
return nil return nil
} }
func incrCommand(c *client) error { func incrCommand(c *client) error {
args := c.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.app.kv_incr(c.args[0], 1); err != nil {
return err
} else {
c.writeInteger(n)
}
return nil return nil
} }
func decrCommand(c *client) error { func decrCommand(c *client) error {
args := c.args
if len(args) != 1 {
return ErrCmdParams
}
if n, err := c.app.kv_incr(c.args[0], -1); err != nil {
return err
} else {
c.writeInteger(n)
}
return nil return nil
} }
func incrbyCommand(c *client) error { func incrbyCommand(c *client) error {
args := c.args
if len(args) != 2 {
return ErrCmdParams
}
delta, err := strconv.ParseInt(hack.String(args[1]), 10, 64)
if err != nil {
return err
}
if n, err := c.app.kv_incr(c.args[0], delta); err != nil {
return err
} else {
c.writeInteger(n)
}
return nil return nil
} }
func decrbyCommand(c *client) error { func decrbyCommand(c *client) error {
args := c.args
if len(args) != 2 {
return ErrCmdParams
}
delta, err := strconv.ParseInt(hack.String(args[1]), 10, 64)
if err != nil {
return err
}
if n, err := c.app.kv_incr(c.args[0], -delta); err != nil {
return err
} else {
c.writeInteger(n)
}
return nil return nil
} }
func delCommand(c *client) error { func delCommand(c *client) error {
args := c.args
if len(args) == 0 {
return ErrCmdParams
}
if n, err := c.app.tx_del(args); err != nil {
return err
} else {
c.writeInteger(n)
}
return nil return nil
} }
func msetCommand(c *client) error { func msetCommand(c *client) error {
args := c.args
if len(args) == 0 || len(args)%2 != 0 {
return ErrCmdParams
}
if err := c.app.tx_mset(args); err != nil {
return err
} else {
c.writeStatus(OK)
}
return nil return nil
} }
func setexCommand(c *client) error { // func setexCommand(c *client) error {
return nil // return nil
} // }
func mgetCommand(c *client) error { func mgetCommand(c *client) error {
args := c.args
if len(args) == 0 {
return ErrCmdParams
}
if v, err := c.app.kv_mget(args); err != nil {
return err
} else {
c.writeArray(v)
}
return nil return nil
} }
@ -64,6 +210,6 @@ func init() {
register("mget", mgetCommand) register("mget", mgetCommand)
register("mset", msetCommand) register("mset", msetCommand)
register("set", setCommand) register("set", setCommand)
register("setex", setexCommand) // register("setex", setexCommand)
register("setnx", setnxCommand) register("setnx", setnxCommand)
} }

View File

@ -19,3 +19,13 @@ var (
PONG = "PONG" PONG = "PONG"
OK = "OK" OK = "OK"
) )
const (
KV_TYPE byte = iota + 1
HASH_TYPE
HSIZE_TYPE
ZSET_TYPE
ZSIZE_TYPE
LIST_TYPE
LSIZE_TYPE
)

27
ssdb/lock.go Normal file
View File

@ -0,0 +1,27 @@
package ssdb
import (
"hash/crc32"
"sync"
)
type keyMutex struct {
mutexs []*sync.Mutex
}
func newKeyMutex(size int) *keyMutex {
m := new(keyMutex)
m.mutexs = make([]*sync.Mutex, size)
for i := range m.mutexs {
m.mutexs[i] = &sync.Mutex{}
}
return m
}
func (k *keyMutex) Get(key []byte) *sync.Mutex {
h := int(crc32.ChecksumIEEE(key))
return k.mutexs[h%len(k.mutexs)]
}

192
ssdb/t_kv.go Normal file
View File

@ -0,0 +1,192 @@
package ssdb
import (
"errors"
"github.com/siddontang/golib/hack"
"strconv"
)
var errEmptyKVKey = errors.New("invalid empty kv key")
var errKVKey = errors.New("invalid encode kv key")
func encode_kv_key(key []byte) []byte {
ek := make([]byte, key+1)
ek[0] = KV_TYPE
copy(ek[1:], key)
return ek
}
func decode_kv_key(encodeKey []byte) ([]byte, error) {
if encodeKey[0] != KV_TYPE {
return nil, errKVKey
}
return encodeKey[1:], nil
}
func (a *App) kv_get(key []byte) ([]byte, error) {
key = encode_kv_key(key)
return a.db.Get(key)
}
func (a *App) kv_set(key []byte, value []byte) error {
key = encode_kv_key(key)
var err error
t := a.newTx()
a.kvMutex.Lock()
defer a.kvMutex.Unlock()
t.Put(key, value)
//todo, binlog
err = t.Commit()
return err
}
func (a *App) kv_getset(key []byte, value []byte) ([]byte, error) {
key = encode_kv_key(key)
var err error
a.kvMutex.Lock()
defer a.kvMutex.Unlock()
oldValue, _ := a.db.Get(key)
t := a.newTx()
t.Put(key, value)
//todo, binlog
err = t.Commit()
return oldValue, err
}
func (a *App) kv_setnx(key []byte, value []byte) (int64, error) {
key = encode_kv_key(key)
var err error
var n int64 = 1
t := a.newTx()
a.kvMutex.Lock()
defer a.kvMutex.Unlock()
if v, _ := a.db.Get(key); v != nil {
n = 0
} else {
t.Put(key, value)
//todo binlog
err = t.Commit()
}
return n, err
}
func (a *App) kv_exists(key []byte) (int64, error) {
key = encode_kv_key(key)
var err error
var v []byte
v, err = a.db.Get(key)
if v != nil && err != nil {
return 1, nil
} else {
return 0, err
}
}
func (a *App) kv_incr(key []byte, delta int64) (int64, error) {
key = encode_kv_key(key)
var err error
t := a.newTx()
a.kvMutex.Lock()
defer a.kvMutex.Unlock()
var v []byte
v, err = a.db.Get(key)
if err != nil {
return 0, err
}
var n int64 = 0
if v != nil {
n, err = strconv.ParseInt(hack.String(v), 10, 64)
if err != nil {
return 0, err
}
}
n += delta
t.Put(key, hack.Slice(strconv.FormatInt(n, 10)))
//todo binlog
err = t.Commit()
return n, err
}
func (a *App) tx_del(keys [][]byte) (int64, error) {
for i := range keys {
keys[i] = encode_kv_key(keys[i])
}
t := a.newTx()
a.kvMutex.Lock()
defer a.kvMutex.Unlock()
for i := range keys {
t.Delete(keys[i])
//todo binlog
}
err := t.Commit()
return int64(len(keys)), err
}
func (a *App) tx_mset(args [][]byte) error {
t := a.newTx()
a.kvMutex.Lock()
defer a.kvMutex.Unlock()
for i := 0; i < len(args); i += 2 {
key := encode_kv_key(args[i])
value := args[i+1]
t.Put(key, value)
//todo binlog
}
err := t.Commit()
return err
}
func (a *App) kv_mget(args [][]byte) ([]interface{}, error) {
values := make([]interface{}, len(args))
for i := range args {
key := encode_kv_key(args[i])
value, err := a.db.Get(key)
if err != nil {
return nil, err
}
values[i] = value
}
return values, nil
}

39
ssdb/tx.go Normal file
View File

@ -0,0 +1,39 @@
package ssdb
import (
"github.com/siddontang/golib/leveldb"
)
type tx struct {
app *App
wb *leveldb.WriteBatch
}
func (app *App) newTx() *tx {
t := new(tx)
t.app = app
t.wb = app.db.NewWriteBatch()
return t
}
func (t *tx) Put(key []byte, value []byte) {
t.wb.Put(key, value)
}
func (t *tx) Delete(key []byte) {
t.wb.Delete(key)
}
func (t *tx) Commit() error {
err := t.wb.Commit()
return err
}
func (t *tx) Rollback() error {
err := t.wb.Rollback()
return err
}