diff --git a/ssdb/app.go b/ssdb/app.go index d9dd0ed..e478e64 100644 --- a/ssdb/app.go +++ b/ssdb/app.go @@ -4,6 +4,7 @@ import ( "github.com/siddontang/golib/leveldb" "net" "strings" + "sync" ) type App struct { @@ -12,6 +13,11 @@ type App struct { listener net.Listener db *leveldb.DB + + kvMutex sync.Mutex + hashMutex sync.Mutex + listMutex sync.Mutex + zsetMutex sync.Mutex } func NewApp(cfg *Config) (*App, error) { @@ -36,6 +42,8 @@ func NewApp(cfg *Config) (*App, error) { return nil, err } + app.dbMutex = newKeyMutex(128) + return app, nil } @@ -55,3 +63,7 @@ func (app *App) Run() { newClient(conn, app) } } + +func (app *App) getMutex(key []byte) *sync.Mutex { + return app.dbMutex.Get(key) +} diff --git a/ssdb/cmd_kv.go b/ssdb/cmd_kv.go index 10d9348..5750d58 100644 --- a/ssdb/cmd_kv.go +++ b/ssdb/cmd_kv.go @@ -1,54 +1,200 @@ package ssdb +import ( + "github.com/siddontang/golib/hack" + "strconv" +) + func getCommand(c *client) error { + args := c.args + if len(args) != 1 { + return ErrCmdParams + } + + if v, err := c.app.kv_get(args[0]); err != nil { + return err + } else { + c.writeBulk(v) + } return nil } func setCommand(c *client) error { + args := c.args + if len(args) < 2 { + return ErrCmdParams + } + + if err := c.app.kv_set(args[0], args[1]); err != nil { + return err + } else { + c.writeStatus(OK) + } + return nil } func getsetCommand(c *client) error { + args := c.args + if len(args) != 2 { + return ErrCmdParams + } + + if v, err := c.app.kv_getset(args[0], args[1]); err != nil { + return err + } else { + c.writeBulk(v) + } + return nil } func setnxCommand(c *client) error { + args := c.args + if len(args) != 2 { + return ErrCmdParams + } + + if n, err := c.app.kv_setnx(args[0], args[1]); err != nil { + return err + } else { + c.writeInteger(n) + } + return nil } func existsCommand(c *client) error { + args := c.args + if len(args) != 1 { + return ErrCmdParams + } + + if n, err := c.app.kv_exists(args[0]); err != nil { + return err + } else { + c.writeInteger(n) + } + return nil } func incrCommand(c *client) error { + args := c.args + if len(args) != 1 { + return ErrCmdParams + } + + if n, err := c.app.kv_incr(c.args[0], 1); err != nil { + return err + } else { + c.writeInteger(n) + } + return nil } func decrCommand(c *client) error { + args := c.args + if len(args) != 1 { + return ErrCmdParams + } + + if n, err := c.app.kv_incr(c.args[0], -1); err != nil { + return err + } else { + c.writeInteger(n) + } + return nil } func incrbyCommand(c *client) error { + args := c.args + if len(args) != 2 { + return ErrCmdParams + } + + delta, err := strconv.ParseInt(hack.String(args[1]), 10, 64) + if err != nil { + return err + } + + if n, err := c.app.kv_incr(c.args[0], delta); err != nil { + return err + } else { + c.writeInteger(n) + } + return nil } func decrbyCommand(c *client) error { + args := c.args + if len(args) != 2 { + return ErrCmdParams + } + + delta, err := strconv.ParseInt(hack.String(args[1]), 10, 64) + if err != nil { + return err + } + + if n, err := c.app.kv_incr(c.args[0], -delta); err != nil { + return err + } else { + c.writeInteger(n) + } + return nil } func delCommand(c *client) error { + args := c.args + if len(args) == 0 { + return ErrCmdParams + } + + if n, err := c.app.tx_del(args); err != nil { + return err + } else { + c.writeInteger(n) + } + return nil } func msetCommand(c *client) error { + args := c.args + if len(args) == 0 || len(args)%2 != 0 { + return ErrCmdParams + } + + if err := c.app.tx_mset(args); err != nil { + return err + } else { + c.writeStatus(OK) + } + return nil } -func setexCommand(c *client) error { - return nil -} +// func setexCommand(c *client) error { +// return nil +// } func mgetCommand(c *client) error { + args := c.args + if len(args) == 0 { + return ErrCmdParams + } + + if v, err := c.app.kv_mget(args); err != nil { + return err + } else { + c.writeArray(v) + } + return nil } @@ -64,6 +210,6 @@ func init() { register("mget", mgetCommand) register("mset", msetCommand) register("set", setCommand) - register("setex", setexCommand) + // register("setex", setexCommand) register("setnx", setnxCommand) } diff --git a/ssdb/const.go b/ssdb/const.go index 541d8bb..00b9b08 100644 --- a/ssdb/const.go +++ b/ssdb/const.go @@ -19,3 +19,13 @@ var ( PONG = "PONG" OK = "OK" ) + +const ( + KV_TYPE byte = iota + 1 + HASH_TYPE + HSIZE_TYPE + ZSET_TYPE + ZSIZE_TYPE + LIST_TYPE + LSIZE_TYPE +) diff --git a/ssdb/lock.go b/ssdb/lock.go new file mode 100644 index 0000000..02c2768 --- /dev/null +++ b/ssdb/lock.go @@ -0,0 +1,27 @@ +package ssdb + +import ( + "hash/crc32" + "sync" +) + +type keyMutex struct { + mutexs []*sync.Mutex +} + +func newKeyMutex(size int) *keyMutex { + m := new(keyMutex) + + m.mutexs = make([]*sync.Mutex, size) + + for i := range m.mutexs { + m.mutexs[i] = &sync.Mutex{} + } + + return m +} + +func (k *keyMutex) Get(key []byte) *sync.Mutex { + h := int(crc32.ChecksumIEEE(key)) + return k.mutexs[h%len(k.mutexs)] +} diff --git a/ssdb/t_kv.go b/ssdb/t_kv.go new file mode 100644 index 0000000..96240f7 --- /dev/null +++ b/ssdb/t_kv.go @@ -0,0 +1,192 @@ +package ssdb + +import ( + "errors" + "github.com/siddontang/golib/hack" + "strconv" +) + +var errEmptyKVKey = errors.New("invalid empty kv key") +var errKVKey = errors.New("invalid encode kv key") + +func encode_kv_key(key []byte) []byte { + ek := make([]byte, key+1) + ek[0] = KV_TYPE + copy(ek[1:], key) + return ek +} + +func decode_kv_key(encodeKey []byte) ([]byte, error) { + if encodeKey[0] != KV_TYPE { + return nil, errKVKey + } + + return encodeKey[1:], nil +} + +func (a *App) kv_get(key []byte) ([]byte, error) { + key = encode_kv_key(key) + + return a.db.Get(key) +} + +func (a *App) kv_set(key []byte, value []byte) error { + key = encode_kv_key(key) + var err error + + t := a.newTx() + + a.kvMutex.Lock() + defer a.kvMutex.Unlock() + + t.Put(key, value) + + //todo, binlog + + err = t.Commit() + + return err +} + +func (a *App) kv_getset(key []byte, value []byte) ([]byte, error) { + key = encode_kv_key(key) + var err error + + a.kvMutex.Lock() + defer a.kvMutex.Unlock() + + oldValue, _ := a.db.Get(key) + + t := a.newTx() + + t.Put(key, value) + //todo, binlog + + err = t.Commit() + + return oldValue, err +} + +func (a *App) kv_setnx(key []byte, value []byte) (int64, error) { + key = encode_kv_key(key) + var err error + + var n int64 = 1 + + t := a.newTx() + + a.kvMutex.Lock() + defer a.kvMutex.Unlock() + + if v, _ := a.db.Get(key); v != nil { + n = 0 + } else { + t.Put(key, value) + + //todo binlog + + err = t.Commit() + } + + return n, err +} + +func (a *App) kv_exists(key []byte) (int64, error) { + key = encode_kv_key(key) + var err error + + var v []byte + v, err = a.db.Get(key) + if v != nil && err != nil { + return 1, nil + } else { + return 0, err + } +} + +func (a *App) kv_incr(key []byte, delta int64) (int64, error) { + key = encode_kv_key(key) + var err error + + t := a.newTx() + + a.kvMutex.Lock() + defer a.kvMutex.Unlock() + + var v []byte + v, err = a.db.Get(key) + if err != nil { + return 0, err + } + + var n int64 = 0 + if v != nil { + n, err = strconv.ParseInt(hack.String(v), 10, 64) + if err != nil { + return 0, err + } + } + + n += delta + + t.Put(key, hack.Slice(strconv.FormatInt(n, 10))) + + //todo binlog + + err = t.Commit() + return n, err +} + +func (a *App) tx_del(keys [][]byte) (int64, error) { + for i := range keys { + keys[i] = encode_kv_key(keys[i]) + } + + t := a.newTx() + + a.kvMutex.Lock() + defer a.kvMutex.Unlock() + + for i := range keys { + t.Delete(keys[i]) + //todo binlog + } + + err := t.Commit() + return int64(len(keys)), err +} + +func (a *App) tx_mset(args [][]byte) error { + t := a.newTx() + + a.kvMutex.Lock() + defer a.kvMutex.Unlock() + + for i := 0; i < len(args); i += 2 { + key := encode_kv_key(args[i]) + value := args[i+1] + + t.Put(key, value) + + //todo binlog + } + + err := t.Commit() + return err +} + +func (a *App) kv_mget(args [][]byte) ([]interface{}, error) { + values := make([]interface{}, len(args)) + + for i := range args { + key := encode_kv_key(args[i]) + value, err := a.db.Get(key) + if err != nil { + return nil, err + } + + values[i] = value + } + + return values, nil +} diff --git a/ssdb/tx.go b/ssdb/tx.go new file mode 100644 index 0000000..658b034 --- /dev/null +++ b/ssdb/tx.go @@ -0,0 +1,39 @@ +package ssdb + +import ( + "github.com/siddontang/golib/leveldb" +) + +type tx struct { + app *App + + wb *leveldb.WriteBatch +} + +func (app *App) newTx() *tx { + t := new(tx) + + t.app = app + + t.wb = app.db.NewWriteBatch() + + return t +} + +func (t *tx) Put(key []byte, value []byte) { + t.wb.Put(key, value) +} + +func (t *tx) Delete(key []byte) { + t.wb.Delete(key) +} + +func (t *tx) Commit() error { + err := t.wb.Commit() + return err +} + +func (t *tx) Rollback() error { + err := t.wb.Rollback() + return err +}