refactor, support use embedded ledisdb

now you can embed ledisdb as a lib to use.
This commit is contained in:
siddontang 2014-05-15 14:19:48 +08:00
parent 4eeadc7389
commit e860902fef
16 changed files with 509 additions and 442 deletions

View File

@ -42,4 +42,4 @@ else
echo "skip install leveldb" echo "skip install leveldb"
fi fi
cd $ROOT_DIR cd $ROOT_DIR

5
dev.sh
View File

@ -31,6 +31,11 @@ export GOPATH=$(add_path $GOPATH $VTROOT)
export CGO_CFLAGS="-I$LEVELDB_DIR/include -I$SNAPPY_DIR/include" export CGO_CFLAGS="-I$LEVELDB_DIR/include -I$SNAPPY_DIR/include"
export CGO_CXXFLAGS="-I$LEVELDB_DIR/include -I$SNAPPY_DIR/include" export CGO_CXXFLAGS="-I$LEVELDB_DIR/include -I$SNAPPY_DIR/include"
export CGO_LDFLAGS="-L$LEVELDB_DIR/lib -L$SNAPPY_DIR/lib -lsnappy" export CGO_LDFLAGS="-L$LEVELDB_DIR/lib -L$SNAPPY_DIR/lib -lsnappy"
#for linux, use LD_LIBRARY_PATH
export LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $SNAPPY_DIR/lib) export LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $SNAPPY_DIR/lib)
export LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $LEVELDB_DIR/lib) export LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $LEVELDB_DIR/lib)
#for macos, use DYLD_LIBRARY_PATH
export DYLD_LIBRARY_PATH=$(add_path $DYLD_LIBRARY_PATH $SNAPPY_DIR/lib)
export DYLD_LIBRARY_PATH=$(add_path $DYLD_LIBRARY_PATH $LEVELDB_DIR/lib)

View File

@ -1,7 +1,6 @@
package ledis package ledis
import ( import (
"github.com/siddontang/go-leveldb/leveldb"
"net" "net"
"strings" "strings"
) )
@ -11,12 +10,7 @@ type App struct {
listener net.Listener listener net.Listener
db *leveldb.DB db *DB
kvTx *tx
listTx *tx
hashTx *tx
zsetTx *tx
closed bool closed bool
} }
@ -40,16 +34,11 @@ func NewApp(cfg *Config) (*App, error) {
return nil, err return nil, err
} }
app.db, err = leveldb.OpenWithConfig(&cfg.DB) app.db, err = OpenDBWithConfig(&cfg.DB)
if err != nil { if err != nil {
return nil, err return nil, err
} }
app.kvTx = app.newTx()
app.listTx = app.newTx()
app.hashTx = app.newTx()
app.zsetTx = app.newTx()
return app, nil return app, nil
} }
@ -72,6 +61,6 @@ func (app *App) Run() {
continue continue
} }
newClient(conn, app) newClient(conn, app.db)
} }
} }

View File

@ -14,8 +14,8 @@ import (
var errReadRequest = errors.New("invalid request protocol") var errReadRequest = errors.New("invalid request protocol")
type client struct { type client struct {
app *App db *DB
c net.Conn c net.Conn
rb *bufio.Reader rb *bufio.Reader
wb *bufio.Writer wb *bufio.Writer
@ -26,9 +26,9 @@ type client struct {
reqC chan error reqC chan error
} }
func newClient(c net.Conn, app *App) { func newClient(c net.Conn, db *DB) {
co := new(client) co := new(client)
co.app = app co.db = db
co.c = c co.c = c
co.rb = bufio.NewReaderSize(c, 256) co.rb = bufio.NewReaderSize(c, 256)

View File

@ -8,7 +8,7 @@ func hsetCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if n, err := c.app.hash_set(args[0], args[1], args[2]); err != nil { if n, err := c.db.HSet(args[0], args[1], args[2]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)
@ -23,7 +23,7 @@ func hgetCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if v, err := c.app.hash_get(args[0], args[1]); err != nil { if v, err := c.db.HGet(args[0], args[1]); err != nil {
return err return err
} else { } else {
c.writeBulk(v) c.writeBulk(v)
@ -39,7 +39,7 @@ func hexistsCommand(c *client) error {
} }
var n int64 = 1 var n int64 = 1
if v, err := c.app.hash_get(args[0], args[1]); err != nil { if v, err := c.db.HGet(args[0], args[1]); err != nil {
return err return err
} else { } else {
if v == nil { if v == nil {
@ -57,7 +57,7 @@ func hdelCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if n, err := c.app.hash_del(args[0], args[1:]); err != nil { if n, err := c.db.HDel(args[0], args[1:]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)
@ -72,7 +72,7 @@ func hlenCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if n, err := c.app.hash_len(args[0]); err != nil { if n, err := c.db.HLen(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)
@ -93,7 +93,7 @@ func hincrbyCommand(c *client) error {
} }
var n int64 var n int64
if n, err = c.app.hash_incrby(args[0], args[1], delta); err != nil { if n, err = c.db.HIncrBy(args[0], args[1], delta); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)
@ -111,7 +111,7 @@ func hmsetCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if err := c.app.hash_mset(args[0], args[1:]); err != nil { if err := c.db.HMset(args[0], args[1:]); err != nil {
return err return err
} else { } else {
c.writeStatus(OK) c.writeStatus(OK)
@ -126,7 +126,7 @@ func hmgetCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if v, err := c.app.hash_mget(args[0], args[1:]); err != nil { if v, err := c.db.HMget(args[0], args[1:]); err != nil {
return err return err
} else { } else {
c.writeArray(v) c.writeArray(v)
@ -141,7 +141,7 @@ func hgetallCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if v, err := c.app.hash_getall(args[0]); err != nil { if v, err := c.db.HGetAll(args[0]); err != nil {
return err return err
} else { } else {
c.writeArray(v) c.writeArray(v)
@ -156,7 +156,7 @@ func hkeysCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if v, err := c.app.hash_keys(args[0]); err != nil { if v, err := c.db.HKeys(args[0]); err != nil {
return err return err
} else { } else {
c.writeArray(v) c.writeArray(v)
@ -171,7 +171,7 @@ func hvalsCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if v, err := c.app.hash_values(args[0]); err != nil { if v, err := c.db.HValues(args[0]); err != nil {
return err return err
} else { } else {
c.writeArray(v) c.writeArray(v)
@ -186,7 +186,7 @@ func hclearCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if n, err := c.app.hash_clear(args[0]); err != nil { if n, err := c.db.HClear(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)

View File

@ -8,7 +8,7 @@ func getCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if v, err := c.app.kv_get(args[0]); err != nil { if v, err := c.db.Get(args[0]); err != nil {
return err return err
} else { } else {
c.writeBulk(v) c.writeBulk(v)
@ -22,7 +22,7 @@ func setCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if err := c.app.kv_set(args[0], args[1]); err != nil { if err := c.db.Set(args[0], args[1]); err != nil {
return err return err
} else { } else {
c.writeStatus(OK) c.writeStatus(OK)
@ -37,7 +37,7 @@ func getsetCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if v, err := c.app.kv_getset(args[0], args[1]); err != nil { if v, err := c.db.GetSet(args[0], args[1]); err != nil {
return err return err
} else { } else {
c.writeBulk(v) c.writeBulk(v)
@ -52,7 +52,7 @@ func setnxCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if n, err := c.app.kv_setnx(args[0], args[1]); err != nil { if n, err := c.db.SetNX(args[0], args[1]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)
@ -67,7 +67,7 @@ func existsCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if n, err := c.app.kv_exists(args[0]); err != nil { if n, err := c.db.Exists(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)
@ -82,7 +82,7 @@ func incrCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if n, err := c.app.kv_incr(c.args[0], 1); err != nil { if n, err := c.db.Incr(c.args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)
@ -97,7 +97,7 @@ func decrCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if n, err := c.app.kv_incr(c.args[0], -1); err != nil { if n, err := c.db.Decr(c.args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)
@ -117,7 +117,7 @@ func incrbyCommand(c *client) error {
return err return err
} }
if n, err := c.app.kv_incr(c.args[0], delta); err != nil { if n, err := c.db.IncryBy(c.args[0], delta); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)
@ -137,7 +137,7 @@ func decrbyCommand(c *client) error {
return err return err
} }
if n, err := c.app.kv_incr(c.args[0], -delta); err != nil { if n, err := c.db.DecrBy(c.args[0], -delta); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)
@ -152,7 +152,7 @@ func delCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if n, err := c.app.tx_del(args); err != nil { if n, err := c.db.Del(args); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)
@ -167,7 +167,7 @@ func msetCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if err := c.app.tx_mset(args); err != nil { if err := c.db.MSet(args); err != nil {
return err return err
} else { } else {
c.writeStatus(OK) c.writeStatus(OK)
@ -186,7 +186,7 @@ func mgetCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if v, err := c.app.kv_mget(args); err != nil { if v, err := c.db.MGet(args); err != nil {
return err return err
} else { } else {
c.writeArray(v) c.writeArray(v)
@ -207,6 +207,5 @@ func init() {
register("mget", mgetCommand) register("mget", mgetCommand)
register("mset", msetCommand) register("mset", msetCommand)
register("set", setCommand) register("set", setCommand)
// register("setex", setexCommand)
register("setnx", setnxCommand) register("setnx", setnxCommand)
} }

View File

@ -8,7 +8,7 @@ func lpushCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if n, err := c.app.list_lpush(args[0], args[1:]); err != nil { if n, err := c.db.LPush(args[0], args[1:]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)
@ -23,7 +23,7 @@ func rpushCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if n, err := c.app.list_rpush(args[0], args[1:]); err != nil { if n, err := c.db.RPush(args[0], args[1:]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)
@ -38,7 +38,7 @@ func lpopCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if v, err := c.app.list_lpop(args[0]); err != nil { if v, err := c.db.LPop(args[0]); err != nil {
return err return err
} else { } else {
c.writeBulk(v) c.writeBulk(v)
@ -53,7 +53,7 @@ func rpopCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if v, err := c.app.list_rpop(args[0]); err != nil { if v, err := c.db.RPop(args[0]); err != nil {
return err return err
} else { } else {
c.writeBulk(v) c.writeBulk(v)
@ -68,7 +68,7 @@ func llenCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if n, err := c.app.list_len(args[0]); err != nil { if n, err := c.db.LLen(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)
@ -88,7 +88,7 @@ func lindexCommand(c *client) error {
return err return err
} }
if v, err := c.app.list_index(args[0], int32(index)); err != nil { if v, err := c.db.LIndex(args[0], int32(index)); err != nil {
return err return err
} else { } else {
c.writeBulk(v) c.writeBulk(v)
@ -117,7 +117,7 @@ func lrangeCommand(c *client) error {
return err return err
} }
if v, err := c.app.list_range(args[0], int32(start), int32(stop)); err != nil { if v, err := c.db.LRange(args[0], int32(start), int32(stop)); err != nil {
return err return err
} else { } else {
c.writeArray(v) c.writeArray(v)
@ -132,7 +132,7 @@ func lclearCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if n, err := c.app.list_clear(args[0]); err != nil { if n, err := c.db.LClear(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)

View File

@ -53,16 +53,6 @@ func testListRange(key []byte, start int64, stop int64, checkValues ...int) erro
return nil return nil
} }
func testPrintList(key []byte) {
it := testApp.db.Iterator(encode_list_key(key, listMinSeq),
encode_list_key(key, listMaxSeq), 0, 0, -1)
for ; it.Valid(); it.Next() {
k, seq, _ := decode_list_key(it.Key())
println(string(k), "seq ", seq, "value:", string(it.Value()))
}
println("end ---------------------")
}
func TestList(t *testing.T) { func TestList(t *testing.T) {
startTestApp() startTestApp()

View File

@ -9,11 +9,6 @@ import (
//for simple implementation, we only support int64 score //for simple implementation, we only support int64 score
const (
MinScore int64 = -1<<63 + 1
MaxScore int64 = 1<<63 - 1
)
var errScoreOverflow = errors.New("zset score overflow") var errScoreOverflow = errors.New("zset score overflow")
func zaddCommand(c *client) error { func zaddCommand(c *client) error {
@ -39,7 +34,7 @@ func zaddCommand(c *client) error {
params[i+1] = args[i+1] params[i+1] = args[i+1]
} }
if n, err := c.app.zset_add(key, params); err != nil { if n, err := c.db.ZAdd(key, params); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)
@ -54,7 +49,7 @@ func zcardCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if n, err := c.app.zset_card(args[0]); err != nil { if n, err := c.db.ZCard(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)
@ -69,7 +64,7 @@ func zscoreCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if v, err := c.app.zset_score(args[0], args[1]); err != nil { if v, err := c.db.ZScore(args[0], args[1]); err != nil {
return err return err
} else { } else {
c.writeBulk(v) c.writeBulk(v)
@ -84,7 +79,7 @@ func zremCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if n, err := c.app.zset_rem(args[0], args[1:]); err != nil { if n, err := c.db.ZRem(args[0], args[1:]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)
@ -106,7 +101,7 @@ func zincrbyCommand(c *client) error {
return err return err
} }
if v, err := c.app.zset_incrby(key, delta, args[2]); err != nil { if v, err := c.db.ZIncrBy(key, delta, args[2]); err != nil {
return err return err
} else { } else {
c.writeBulk(v) c.writeBulk(v)
@ -193,7 +188,7 @@ func zcountCommand(c *client) error {
return nil return nil
} }
if n, err := c.app.zset_count(args[0], min, max); err != nil { if n, err := c.db.ZCount(args[0], min, max); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)
@ -208,7 +203,7 @@ func zrankCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if n, err := c.app.zset_rank(args[0], args[1], false); err != nil { if n, err := c.db.ZRank(args[0], args[1]); err != nil {
return err return err
} else if n == -1 { } else if n == -1 {
c.writeBulk(nil) c.writeBulk(nil)
@ -225,7 +220,7 @@ func zrevrankCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if n, err := c.app.zset_rank(args[0], args[1], true); err != nil { if n, err := c.db.ZRevRank(args[0], args[1]); err != nil {
return err return err
} else if n == -1 { } else if n == -1 {
c.writeBulk(nil) c.writeBulk(nil)
@ -244,17 +239,12 @@ func zremrangebyrankCommand(c *client) error {
key := args[0] key := args[0]
offset, limit, err := zparseRange(c, key, args[1], args[2]) start, stop, err := zparseRange(c, args[1], args[2])
if err != nil { if err != nil {
return err return err
} }
if offset < 0 { if n, err := c.db.ZRemRangeByRank(key, start, stop); err != nil {
c.writeInteger(0)
return nil
}
if n, err := c.app.zset_remRange(key, MinScore, MaxScore, offset, limit); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)
@ -275,7 +265,7 @@ func zremrangebyscoreCommand(c *client) error {
return err return err
} }
if n, err := c.app.zset_remRange(key, min, max, 0, -1); err != nil { if n, err := c.db.ZRemRangeByScore(key, min, max); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)
@ -284,51 +274,15 @@ func zremrangebyscoreCommand(c *client) error {
return nil return nil
} }
func zparseRange(c *client, key []byte, startBuf []byte, stopBuf []byte) (offset int, limit int, err error) { func zparseRange(c *client, a1 []byte, a2 []byte) (start int, stop int, err error) {
var start int if start, err = strconv.Atoi(String(a1)); err != nil {
var stop int
if start, err = strconv.Atoi(String(startBuf)); err != nil {
return return
} }
if stop, err = strconv.Atoi(String(stopBuf)); err != nil { if stop, err = strconv.Atoi(String(a2)); err != nil {
return return
} }
if start < 0 || stop < 0 {
//refer redis implementation
var size int64
size, err = c.app.zset_card(key)
if err != nil {
return
}
llen := int(size)
if start < 0 {
start = llen + start
}
if stop < 0 {
stop = llen + stop
}
if start < 0 {
start = 0
}
if start >= llen {
offset = -1
return
}
}
if start > stop {
offset = -1
return
}
offset = start
limit = (stop - start) + 1
return return
} }
@ -340,16 +294,11 @@ func zrangeGeneric(c *client, reverse bool) error {
key := args[0] key := args[0]
offset, limit, err := zparseRange(c, key, args[1], args[2]) start, stop, err := zparseRange(c, args[1], args[2])
if err != nil { if err != nil {
return err return err
} }
if offset < 0 {
c.writeArray([]interface{}{})
return nil
}
args = args[3:] args = args[3:]
var withScores bool = false var withScores bool = false
@ -357,7 +306,7 @@ func zrangeGeneric(c *client, reverse bool) error {
withScores = true withScores = true
} }
if v, err := c.app.zset_range(key, MinScore, MaxScore, withScores, offset, limit, reverse); err != nil { if v, err := c.db.ZRangeGeneric(key, start, stop, withScores, reverse); err != nil {
return err return err
} else { } else {
c.writeArray(v) c.writeArray(v)
@ -395,7 +344,7 @@ func zrangebyscoreGeneric(c *client, reverse bool) error {
} }
var offset int = 0 var offset int = 0
var limit int = -1 var count int = -1
if len(args) > 0 { if len(args) > 0 {
if len(args) != 3 { if len(args) != 3 {
@ -410,7 +359,7 @@ func zrangebyscoreGeneric(c *client, reverse bool) error {
return ErrCmdParams return ErrCmdParams
} }
if limit, err = strconv.Atoi(String(args[2])); err != nil { if count, err = strconv.Atoi(String(args[2])); err != nil {
return ErrCmdParams return ErrCmdParams
} }
} }
@ -422,7 +371,7 @@ func zrangebyscoreGeneric(c *client, reverse bool) error {
return nil return nil
} }
if v, err := c.app.zset_range(key, min, max, withScores, offset, limit, reverse); err != nil { if v, err := c.db.ZRangeByScoreGeneric(key, min, max, withScores, offset, count, reverse); err != nil {
return err return err
} else { } else {
c.writeArray(v) c.writeArray(v)
@ -445,7 +394,7 @@ func zclearCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
if n, err := c.app.zset_clear(args[0]); err != nil { if n, err := c.db.ZClear(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.writeInteger(n)

50
ledis/db.go Normal file
View File

@ -0,0 +1,50 @@
package ledis
import (
"encoding/json"
"github.com/siddontang/go-leveldb/leveldb"
)
type DB struct {
db *leveldb.DB
kvTx *tx
listTx *tx
hashTx *tx
zsetTx *tx
}
func OpenDB(configJson json.RawMessage) (*DB, error) {
db, err := leveldb.Open(configJson)
if err != nil {
return nil, err
}
return newDB(db)
}
func OpenDBWithConfig(cfg *leveldb.Config) (*DB, error) {
db, err := leveldb.OpenWithConfig(cfg)
if err != nil {
return nil, err
}
return newDB(db)
}
func newDB(db *leveldb.DB) (*DB, error) {
d := new(DB)
d.db = db
d.kvTx = &tx{wb: db.NewWriteBatch()}
d.listTx = &tx{wb: db.NewWriteBatch()}
d.hashTx = &tx{wb: db.NewWriteBatch()}
d.zsetTx = &tx{wb: db.NewWriteBatch()}
return d, nil
}
func (db *DB) Close() {
db.db.Close()
}

View File

@ -1,27 +0,0 @@
package ledis
import (
"hash/crc32"
"sync"
)
type keyMutex struct {
mutexs []*sync.Mutex
}
func newKeyMutex(size int) *keyMutex {
m := new(keyMutex)
m.mutexs = make([]*sync.Mutex, size)
for i := range m.mutexs {
m.mutexs[i] = &sync.Mutex{}
}
return m
}
func (k *keyMutex) Get(key []byte) *sync.Mutex {
h := int(crc32.ChecksumIEEE(key))
return k.mutexs[h%len(k.mutexs)]
}

View File

@ -87,20 +87,20 @@ func decode_hash_key(ek []byte) ([]byte, []byte, error) {
return key, field, nil return key, field, nil
} }
func (a *App) hash_len(key []byte) (int64, error) { func (db *DB) HLen(key []byte) (int64, error) {
return Int64(a.db.Get(encode_hsize_key(key))) return Int64(db.db.Get(encode_hsize_key(key)))
} }
func (a *App) hash_setItem(key []byte, field []byte, value []byte) (int64, error) { func (db *DB) hSetItem(key []byte, field []byte, value []byte) (int64, error) {
t := a.hashTx t := db.hashTx
ek := encode_hash_key(key, field) ek := encode_hash_key(key, field)
var n int64 = 1 var n int64 = 1
if v, _ := a.db.Get(ek); v != nil { if v, _ := db.db.Get(ek); v != nil {
n = 0 n = 0
} else { } else {
if _, err := a.hash_incrSize(key, 1); err != nil { if _, err := db.hIncrSize(key, 1); err != nil {
return 0, err return 0, err
} }
} }
@ -109,12 +109,12 @@ func (a *App) hash_setItem(key []byte, field []byte, value []byte) (int64, error
return n, nil return n, nil
} }
func (a *App) hash_set(key []byte, field []byte, value []byte) (int64, error) { func (db *DB) HSet(key []byte, field []byte, value []byte) (int64, error) {
t := a.hashTx t := db.hashTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
n, err := a.hash_setItem(key, field, value) n, err := db.hSetItem(key, field, value)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -125,26 +125,26 @@ func (a *App) hash_set(key []byte, field []byte, value []byte) (int64, error) {
return n, err return n, err
} }
func (a *App) hash_get(key []byte, field []byte) ([]byte, error) { func (db *DB) HGet(key []byte, field []byte) ([]byte, error) {
return a.db.Get(encode_hash_key(key, field)) return db.db.Get(encode_hash_key(key, field))
} }
func (a *App) hash_mset(key []byte, args [][]byte) error { func (db *DB) HMset(key []byte, args [][]byte) error {
t := a.hashTx t := db.hashTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
var num int64 = 0 var num int64 = 0
for i := 0; i < len(args); i += 2 { for i := 0; i < len(args); i += 2 {
ek := encode_hash_key(key, args[i]) ek := encode_hash_key(key, args[i])
if v, _ := a.db.Get(ek); v == nil { if v, _ := db.db.Get(ek); v == nil {
num++ num++
} }
t.Put(ek, args[i+1]) t.Put(ek, args[i+1])
} }
if _, err := a.hash_incrSize(key, num); err != nil { if _, err := db.hIncrSize(key, num); err != nil {
return err return err
} }
@ -153,10 +153,10 @@ func (a *App) hash_mset(key []byte, args [][]byte) error {
return err return err
} }
func (a *App) hash_mget(key []byte, args [][]byte) ([]interface{}, error) { func (db *DB) HMget(key []byte, args [][]byte) ([]interface{}, error) {
r := make([]interface{}, len(args)) r := make([]interface{}, len(args))
for i := 0; i < len(args); i++ { for i := 0; i < len(args); i++ {
v, err := a.db.Get(encode_hash_key(key, args[i])) v, err := db.db.Get(encode_hash_key(key, args[i]))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -167,15 +167,15 @@ func (a *App) hash_mget(key []byte, args [][]byte) ([]interface{}, error) {
return r, nil return r, nil
} }
func (a *App) hash_del(key []byte, args [][]byte) (int64, error) { func (db *DB) HDel(key []byte, args [][]byte) (int64, error) {
t := a.hashTx t := db.hashTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
var num int64 = 0 var num int64 = 0
for i := 0; i < len(args); i++ { for i := 0; i < len(args); i++ {
ek := encode_hash_key(key, args[i]) ek := encode_hash_key(key, args[i])
if v, err := a.db.Get(ek); err != nil { if v, err := db.db.Get(ek); err != nil {
return 0, err return 0, err
} else if v == nil { } else if v == nil {
continue continue
@ -185,7 +185,7 @@ func (a *App) hash_del(key []byte, args [][]byte) (int64, error) {
} }
} }
if _, err := a.hash_incrSize(key, -num); err != nil { if _, err := db.hIncrSize(key, -num); err != nil {
return 0, err return 0, err
} }
@ -194,10 +194,10 @@ func (a *App) hash_del(key []byte, args [][]byte) (int64, error) {
return num, err return num, err
} }
func (a *App) hash_incrSize(key []byte, delta int64) (int64, error) { func (db *DB) hIncrSize(key []byte, delta int64) (int64, error) {
t := a.hashTx t := db.hashTx
sk := encode_hsize_key(key) sk := encode_hsize_key(key)
size, err := Int64(a.db.Get(sk)) size, err := Int64(db.db.Get(sk))
if err != nil { if err != nil {
return 0, err return 0, err
} else { } else {
@ -213,22 +213,22 @@ func (a *App) hash_incrSize(key []byte, delta int64) (int64, error) {
return size, nil return size, nil
} }
func (a *App) hash_incrby(key []byte, field []byte, delta int64) (int64, error) { func (db *DB) HIncrBy(key []byte, field []byte, delta int64) (int64, error) {
t := a.hashTx t := db.hashTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
ek := encode_hash_key(key, field) ek := encode_hash_key(key, field)
var n int64 = 0 var n int64 = 0
n, err := StrInt64(a.db.Get(ek)) n, err := StrInt64(db.db.Get(ek))
if err != nil { if err != nil {
return 0, err return 0, err
} }
n += delta n += delta
_, err = a.hash_setItem(key, field, StrPutInt64(n)) _, err = db.hSetItem(key, field, StrPutInt64(n))
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -238,13 +238,13 @@ func (a *App) hash_incrby(key []byte, field []byte, delta int64) (int64, error)
return n, err return n, err
} }
func (a *App) hash_getall(key []byte) ([]interface{}, error) { func (db *DB) HGetAll(key []byte) ([]interface{}, error) {
start := encode_hash_start_key(key) start := encode_hash_start_key(key)
stop := encode_hash_stop_key(key) stop := encode_hash_stop_key(key)
v := make([]interface{}, 0, 16) v := make([]interface{}, 0, 16)
it := a.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1) it := db.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
_, k, err := decode_hash_key(it.Key()) _, k, err := decode_hash_key(it.Key())
if err != nil { if err != nil {
@ -259,13 +259,13 @@ func (a *App) hash_getall(key []byte) ([]interface{}, error) {
return v, nil return v, nil
} }
func (a *App) hash_keys(key []byte) ([]interface{}, error) { func (db *DB) HKeys(key []byte) ([]interface{}, error) {
start := encode_hash_start_key(key) start := encode_hash_start_key(key)
stop := encode_hash_stop_key(key) stop := encode_hash_stop_key(key)
v := make([]interface{}, 0, 16) v := make([]interface{}, 0, 16)
it := a.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1) it := db.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
_, k, err := decode_hash_key(it.Key()) _, k, err := decode_hash_key(it.Key())
if err != nil { if err != nil {
@ -279,13 +279,13 @@ func (a *App) hash_keys(key []byte) ([]interface{}, error) {
return v, nil return v, nil
} }
func (a *App) hash_values(key []byte) ([]interface{}, error) { func (db *DB) HValues(key []byte) ([]interface{}, error) {
start := encode_hash_start_key(key) start := encode_hash_start_key(key)
stop := encode_hash_stop_key(key) stop := encode_hash_stop_key(key)
v := make([]interface{}, 0, 16) v := make([]interface{}, 0, 16)
it := a.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1) it := db.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
v = append(v, it.Value()) v = append(v, it.Value())
} }
@ -295,10 +295,10 @@ func (a *App) hash_values(key []byte) ([]interface{}, error) {
return v, nil return v, nil
} }
func (a *App) hash_clear(key []byte) (int64, error) { func (db *DB) HClear(key []byte) (int64, error) {
sk := encode_hsize_key(key) sk := encode_hsize_key(key)
t := a.hashTx t := db.hashTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
@ -306,7 +306,7 @@ func (a *App) hash_clear(key []byte) (int64, error) {
stop := encode_hash_stop_key(key) stop := encode_hash_stop_key(key)
var num int64 = 0 var num int64 = 0
it := a.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1) it := db.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
t.Delete(it.Key()) t.Delete(it.Key())
num++ num++

View File

@ -21,101 +21,17 @@ func decode_kv_key(ek []byte) ([]byte, error) {
return ek[1:], nil return ek[1:], nil
} }
func (a *App) kv_get(key []byte) ([]byte, error) { func (db *DB) incr(key []byte, delta int64) (int64, error) {
key = encode_kv_key(key)
return a.db.Get(key)
}
func (a *App) kv_set(key []byte, value []byte) error {
key = encode_kv_key(key) key = encode_kv_key(key)
var err error var err error
t := a.kvTx t := db.kvTx
t.Lock()
defer t.Unlock()
t.Put(key, value)
//todo, binlog
err = t.Commit()
return err
}
func (a *App) kv_getset(key []byte, value []byte) ([]byte, error) {
key = encode_kv_key(key)
t := a.kvTx
t.Lock()
defer t.Unlock()
oldValue, err := a.db.Get(key)
if err != nil {
return nil, err
}
t.Put(key, value)
//todo, binlog
err = t.Commit()
return oldValue, err
}
func (a *App) kv_setnx(key []byte, value []byte) (int64, error) {
key = encode_kv_key(key)
var err error
var n int64 = 1
t := a.kvTx
t.Lock()
defer t.Unlock()
if v, err := a.db.Get(key); err != nil {
return 0, err
} else if v != nil {
n = 0
} else {
t.Put(key, value)
//todo binlog
err = t.Commit()
}
return n, err
}
func (a *App) kv_exists(key []byte) (int64, error) {
key = encode_kv_key(key)
var err error
var v []byte
v, err = a.db.Get(key)
if v != nil && err == nil {
return 1, nil
} else {
return 0, err
}
}
func (a *App) kv_incr(key []byte, delta int64) (int64, error) {
key = encode_kv_key(key)
var err error
t := a.kvTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
var n int64 var n int64
n, err = StrInt64(a.db.Get(key)) n, err = StrInt64(db.db.Get(key))
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -130,12 +46,20 @@ func (a *App) kv_incr(key []byte, delta int64) (int64, error) {
return n, err return n, err
} }
func (a *App) tx_del(keys [][]byte) (int64, error) { func (db *DB) Decr(key []byte) (int64, error) {
return db.incr(key, -1)
}
func (db *DB) DecrBy(key []byte, decrement int64) (int64, error) {
return db.incr(key, decrement)
}
func (db *DB) Del(keys [][]byte) (int64, error) {
for i := range keys { for i := range keys {
keys[i] = encode_kv_key(keys[i]) keys[i] = encode_kv_key(keys[i])
} }
t := a.kvTx t := db.kvTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
@ -149,8 +73,72 @@ func (a *App) tx_del(keys [][]byte) (int64, error) {
return int64(len(keys)), err return int64(len(keys)), err
} }
func (a *App) tx_mset(args [][]byte) error { func (db *DB) Exists(key []byte) (int64, error) {
t := a.kvTx key = encode_kv_key(key)
var err error
var v []byte
v, err = db.db.Get(key)
if v != nil && err == nil {
return 1, nil
}
return 0, err
}
func (db *DB) Get(key []byte) ([]byte, error) {
key = encode_kv_key(key)
return db.db.Get(key)
}
func (db *DB) GetSet(key []byte, value []byte) ([]byte, error) {
key = encode_kv_key(key)
t := db.kvTx
t.Lock()
defer t.Unlock()
oldValue, err := db.db.Get(key)
if err != nil {
return nil, err
}
t.Put(key, value)
//todo, binlog
err = t.Commit()
return oldValue, err
}
func (db *DB) Incr(key []byte) (int64, error) {
return db.incr(key, 1)
}
func (db *DB) IncryBy(key []byte, increment int64) (int64, error) {
return db.incr(key, increment)
}
func (db *DB) MGet(keys [][]byte) ([]interface{}, error) {
values := make([]interface{}, len(keys))
for i := range keys {
key := encode_kv_key(keys[i])
value, err := db.db.Get(key)
if err != nil {
return nil, err
}
values[i] = value
}
return values, nil
}
func (db *DB) MSet(args [][]byte) error {
t := db.kvTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
@ -168,18 +156,46 @@ func (a *App) tx_mset(args [][]byte) error {
return err return err
} }
func (a *App) kv_mget(args [][]byte) ([]interface{}, error) { func (db *DB) Set(key []byte, value []byte) error {
values := make([]interface{}, len(args)) key = encode_kv_key(key)
var err error
for i := range args { t := db.kvTx
key := encode_kv_key(args[i])
value, err := a.db.Get(key)
if err != nil {
return nil, err
}
values[i] = value t.Lock()
defer t.Unlock()
t.Put(key, value)
//todo, binlog
err = t.Commit()
return err
}
func (db *DB) SetNX(key []byte, value []byte) (int64, error) {
key = encode_kv_key(key)
var err error
var n int64 = 1
t := db.kvTx
t.Lock()
defer t.Unlock()
if v, err := db.db.Get(key); err != nil {
return 0, err
} else if v != nil {
n = 0
} else {
t.Put(key, value)
//todo binlog
err = t.Commit()
} }
return values, nil return n, err
} }

View File

@ -70,69 +70,13 @@ func decode_list_key(ek []byte) (key []byte, seq int32, err error) {
return return
} }
func (a *App) list_lpush(key []byte, args [][]byte) (int64, error) { func (db *DB) lpush(key []byte, args [][]byte, whereSeq int32) (int64, error) {
return a.list_push(key, args, listHeadSeq) t := db.listTx
}
func (a *App) list_rpush(key []byte, args [][]byte) (int64, error) {
return a.list_push(key, args, listTailSeq)
}
func (a *App) list_lpop(key []byte) ([]byte, error) {
return a.list_pop(key, listHeadSeq)
}
func (a *App) list_rpop(key []byte) ([]byte, error) {
return a.list_pop(key, listTailSeq)
}
func (a *App) list_getSeq(key []byte, whereSeq int32) (int64, error) {
ek := encode_list_key(key, whereSeq)
return Int64(a.db.Get(ek))
}
func (a *App) list_getMeta(ek []byte) (headSeq int32, tailSeq int32, size int32, err error) {
var v []byte
v, err = a.db.Get(ek)
if err != nil {
return
} else if v == nil {
size = 0
return
} else {
headSeq = int32(binary.LittleEndian.Uint32(v[0:4]))
tailSeq = int32(binary.LittleEndian.Uint32(v[4:8]))
size = int32(binary.LittleEndian.Uint32(v[8:]))
}
return
}
func (a *App) list_setMeta(ek []byte, headSeq int32, tailSeq int32, size int32) {
t := a.listTx
buf := make([]byte, 12)
binary.LittleEndian.PutUint32(buf[0:4], uint32(headSeq))
binary.LittleEndian.PutUint32(buf[4:8], uint32(tailSeq))
binary.LittleEndian.PutUint32(buf[8:], uint32(size))
t.Put(ek, buf)
}
func (a *App) list_len(key []byte) (int64, error) {
ek := encode_lmeta_key(key)
_, _, size, err := a.list_getMeta(ek)
return int64(size), err
}
func (a *App) list_push(key []byte, args [][]byte, whereSeq int32) (int64, error) {
t := a.listTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
metaKey := encode_lmeta_key(key) metaKey := encode_lmeta_key(key)
headSeq, tailSeq, size, err := a.list_getMeta(metaKey) headSeq, tailSeq, size, err := db.lGetMeta(metaKey)
if err != nil { if err != nil {
return 0, err return 0, err
@ -174,20 +118,20 @@ func (a *App) list_push(key []byte, args [][]byte, whereSeq int32) (int64, error
tailSeq = seq tailSeq = seq
} }
a.list_setMeta(metaKey, headSeq, tailSeq, size) db.lSetMeta(metaKey, headSeq, tailSeq, size)
err = t.Commit() err = t.Commit()
return int64(size), err return int64(size), err
} }
func (a *App) list_pop(key []byte, whereSeq int32) ([]byte, error) { func (db *DB) lpop(key []byte, whereSeq int32) ([]byte, error) {
t := a.listTx t := db.listTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
metaKey := encode_lmeta_key(key) metaKey := encode_lmeta_key(key)
headSeq, tailSeq, size, err := a.list_getMeta(metaKey) headSeq, tailSeq, size, err := db.lGetMeta(metaKey)
if err != nil { if err != nil {
return nil, err return nil, err
@ -204,7 +148,7 @@ func (a *App) list_pop(key []byte, whereSeq int32) ([]byte, error) {
itemKey := encode_list_key(key, seq) itemKey := encode_list_key(key, seq)
var value []byte var value []byte
value, err = a.db.Get(itemKey) value, err = db.db.Get(itemKey)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -222,7 +166,7 @@ func (a *App) list_pop(key []byte, whereSeq int32) ([]byte, error) {
tailSeq = seq tailSeq = seq
} }
a.list_setMeta(metaKey, headSeq, tailSeq, size) db.lSetMeta(metaKey, headSeq, tailSeq, size)
} }
//todo add binlog //todo add binlog
@ -230,7 +174,71 @@ func (a *App) list_pop(key []byte, whereSeq int32) ([]byte, error) {
return value, err return value, err
} }
func (a *App) list_range(key []byte, start int32, stop int32) ([]interface{}, error) { func (db *DB) lGetSeq(key []byte, whereSeq int32) (int64, error) {
ek := encode_list_key(key, whereSeq)
return Int64(db.db.Get(ek))
}
func (db *DB) lGetMeta(ek []byte) (headSeq int32, tailSeq int32, size int32, err error) {
var v []byte
v, err = db.db.Get(ek)
if err != nil {
return
} else if v == nil {
size = 0
return
} else {
headSeq = int32(binary.LittleEndian.Uint32(v[0:4]))
tailSeq = int32(binary.LittleEndian.Uint32(v[4:8]))
size = int32(binary.LittleEndian.Uint32(v[8:]))
}
return
}
func (db *DB) lSetMeta(ek []byte, headSeq int32, tailSeq int32, size int32) {
t := db.listTx
buf := make([]byte, 12)
binary.LittleEndian.PutUint32(buf[0:4], uint32(headSeq))
binary.LittleEndian.PutUint32(buf[4:8], uint32(tailSeq))
binary.LittleEndian.PutUint32(buf[8:], uint32(size))
t.Put(ek, buf)
}
func (db *DB) LIndex(key []byte, index int32) ([]byte, error) {
var seq int32
headSeq, tailSeq, _, err := db.lGetMeta(encode_lmeta_key(key))
if err != nil {
return nil, err
}
if index >= 0 {
seq = headSeq + index
} else {
seq = tailSeq + index + 1
}
return db.db.Get(encode_list_key(key, seq))
}
func (db *DB) LLen(key []byte) (int64, error) {
ek := encode_lmeta_key(key)
_, _, size, err := db.lGetMeta(ek)
return int64(size), err
}
func (db *DB) LPop(key []byte) ([]byte, error) {
return db.lpop(key, listHeadSeq)
}
func (db *DB) LPush(key []byte, args [][]byte) (int64, error) {
return db.lpush(key, args, listHeadSeq)
}
func (db *DB) LRange(key []byte, start int32, stop int32) ([]interface{}, error) {
v := make([]interface{}, 0, 16) v := make([]interface{}, 0, 16)
var startSeq int32 var startSeq int32
@ -240,7 +248,7 @@ func (a *App) list_range(key []byte, start int32, stop int32) ([]interface{}, er
return []interface{}{}, nil return []interface{}{}, nil
} }
headSeq, tailSeq, _, err := a.list_getMeta(encode_lmeta_key(key)) headSeq, tailSeq, _, err := db.lGetMeta(encode_lmeta_key(key))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -263,7 +271,7 @@ func (a *App) list_range(key []byte, start int32, stop int32) ([]interface{}, er
stopSeq = listMaxSeq stopSeq = listMaxSeq
} }
it := a.db.Iterator(encode_list_key(key, startSeq), it := db.db.Iterator(encode_list_key(key, startSeq),
encode_list_key(key, stopSeq), leveldb.RangeClose, 0, -1) encode_list_key(key, stopSeq), leveldb.RangeClose, 0, -1)
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
v = append(v, it.Value()) v = append(v, it.Value())
@ -274,38 +282,30 @@ func (a *App) list_range(key []byte, start int32, stop int32) ([]interface{}, er
return v, nil return v, nil
} }
func (a *App) list_index(key []byte, index int32) ([]byte, error) { func (db *DB) RPop(key []byte) ([]byte, error) {
var seq int32 return db.lpop(key, listTailSeq)
headSeq, tailSeq, _, err := a.list_getMeta(encode_lmeta_key(key))
if err != nil {
return nil, err
}
if index >= 0 {
seq = headSeq + index
} else {
seq = tailSeq + index + 1
}
return a.db.Get(encode_list_key(key, seq))
} }
func (a *App) list_clear(key []byte) (int64, error) { func (db *DB) RPush(key []byte, args [][]byte) (int64, error) {
return db.lpush(key, args, listTailSeq)
}
func (db *DB) LClear(key []byte) (int64, error) {
mk := encode_lmeta_key(key) mk := encode_lmeta_key(key)
t := a.listTx t := db.listTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
metaKey := encode_lmeta_key(key) metaKey := encode_lmeta_key(key)
headSeq, tailSeq, _, err := a.list_getMeta(metaKey) headSeq, tailSeq, _, err := db.lGetMeta(metaKey)
if err != nil { if err != nil {
return 0, err return 0, err
} }
var num int64 = 0 var num int64 = 0
it := a.db.Iterator(encode_list_key(key, headSeq), it := db.db.Iterator(encode_list_key(key, headSeq),
encode_list_key(key, tailSeq), leveldb.RangeClose, 0, -1) encode_list_key(key, tailSeq), leveldb.RangeClose, 0, -1)
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
t.Delete(it.Key()) t.Delete(it.Key())

View File

@ -8,6 +8,11 @@ import (
"strconv" "strconv"
) )
const (
MinScore int64 = -1<<63 + 1
MaxScore int64 = 1<<63 - 1
)
var errZSizeKey = errors.New("invalid zsize key") var errZSizeKey = errors.New("invalid zsize key")
var errZSetKey = errors.New("invalid zset key") var errZSetKey = errors.New("invalid zset key")
var errZScoreKey = errors.New("invalid zscore key") var errZScoreKey = errors.New("invalid zscore key")
@ -146,16 +151,16 @@ func decode_zscore_key(ek []byte) (key []byte, member []byte, score int64, err e
return return
} }
func (a *App) zset_setItem(key []byte, score int64, member []byte) (int64, error) { func (db *DB) zSetItem(key []byte, score int64, member []byte) (int64, error) {
if score <= MinScore || score >= MaxScore { if score <= MinScore || score >= MaxScore {
return 0, errScoreOverflow return 0, errScoreOverflow
} }
t := a.zsetTx t := db.zsetTx
var exists int64 = 0 var exists int64 = 0
ek := encode_zset_key(key, member) ek := encode_zset_key(key, member)
if v, err := a.db.Get(ek); err != nil { if v, err := db.db.Get(ek); err != nil {
return 0, err return 0, err
} else if v != nil { } else if v != nil {
exists = 1 exists = 1
@ -176,11 +181,11 @@ func (a *App) zset_setItem(key []byte, score int64, member []byte) (int64, error
return exists, nil return exists, nil
} }
func (a *App) zset_delItem(key []byte, member []byte, skipDelScore bool) (int64, error) { func (db *DB) zDelItem(key []byte, member []byte, skipDelScore bool) (int64, error) {
t := a.zsetTx t := db.zsetTx
ek := encode_zset_key(key, member) ek := encode_zset_key(key, member)
if v, err := a.db.Get(ek); err != nil { if v, err := db.db.Get(ek); err != nil {
return 0, err return 0, err
} else if v == nil { } else if v == nil {
//not exists //not exists
@ -202,8 +207,8 @@ func (a *App) zset_delItem(key []byte, member []byte, skipDelScore bool) (int64,
return 1, nil return 1, nil
} }
func (a *App) zset_add(key []byte, args []interface{}) (int64, error) { func (db *DB) ZAdd(key []byte, args []interface{}) (int64, error) {
t := a.zsetTx t := db.zsetTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
@ -212,7 +217,7 @@ func (a *App) zset_add(key []byte, args []interface{}) (int64, error) {
score := args[i].(int64) score := args[i].(int64)
member := args[i+1].([]byte) member := args[i+1].([]byte)
if n, err := a.zset_setItem(key, score, member); err != nil { if n, err := db.zSetItem(key, score, member); err != nil {
return 0, err return 0, err
} else if n == 0 { } else if n == 0 {
//add new //add new
@ -220,7 +225,7 @@ func (a *App) zset_add(key []byte, args []interface{}) (int64, error) {
} }
} }
if _, err := a.zset_incrSize(key, num); err != nil { if _, err := db.zIncrSize(key, num); err != nil {
return 0, err return 0, err
} }
@ -229,10 +234,10 @@ func (a *App) zset_add(key []byte, args []interface{}) (int64, error) {
return num, err return num, err
} }
func (a *App) zset_incrSize(key []byte, delta int64) (int64, error) { func (db *DB) zIncrSize(key []byte, delta int64) (int64, error) {
t := a.zsetTx t := db.zsetTx
sk := encode_zsize_key(key) sk := encode_zsize_key(key)
size, err := Int64(a.db.Get(sk)) size, err := Int64(db.db.Get(sk))
if err != nil { if err != nil {
return 0, err return 0, err
} else { } else {
@ -248,15 +253,15 @@ func (a *App) zset_incrSize(key []byte, delta int64) (int64, error) {
return size, nil return size, nil
} }
func (a *App) zset_card(key []byte) (int64, error) { func (db *DB) ZCard(key []byte) (int64, error) {
sk := encode_zsize_key(key) sk := encode_zsize_key(key)
size, err := Int64(a.db.Get(sk)) size, err := Int64(db.db.Get(sk))
return size, err return size, err
} }
func (a *App) zset_score(key []byte, member []byte) ([]byte, error) { func (db *DB) ZScore(key []byte, member []byte) ([]byte, error) {
k := encode_zset_key(key, member) k := encode_zset_key(key, member)
score, err := Int64(a.db.Get(k)) score, err := Int64(db.db.Get(k))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -264,21 +269,21 @@ func (a *App) zset_score(key []byte, member []byte) ([]byte, error) {
return Slice(strconv.FormatInt(score, 10)), nil return Slice(strconv.FormatInt(score, 10)), nil
} }
func (a *App) zset_rem(key []byte, args [][]byte) (int64, error) { func (db *DB) ZRem(key []byte, args [][]byte) (int64, error) {
t := a.zsetTx t := db.zsetTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
var num int64 = 0 var num int64 = 0
for i := 0; i < len(args); i++ { for i := 0; i < len(args); i++ {
if n, err := a.zset_delItem(key, args[i], false); err != nil { if n, err := db.zDelItem(key, args[i], false); err != nil {
return 0, err return 0, err
} else if n == 1 { } else if n == 1 {
num++ num++
} }
} }
if _, err := a.zset_incrSize(key, -num); err != nil { if _, err := db.zIncrSize(key, -num); err != nil {
return 0, err return 0, err
} }
@ -286,14 +291,14 @@ func (a *App) zset_rem(key []byte, args [][]byte) (int64, error) {
return num, err return num, err
} }
func (a *App) zset_incrby(key []byte, delta int64, member []byte) ([]byte, error) { func (db *DB) ZIncrBy(key []byte, delta int64, member []byte) ([]byte, error) {
t := a.zsetTx t := db.zsetTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
ek := encode_zset_key(key, member) ek := encode_zset_key(key, member)
var score int64 = delta var score int64 = delta
v, err := a.db.Get(ek) v, err := db.db.Get(ek)
if err != nil { if err != nil {
return nil, err return nil, err
} else if v != nil { } else if v != nil {
@ -310,7 +315,7 @@ func (a *App) zset_incrby(key []byte, delta int64, member []byte) ([]byte, error
} }
} }
} else { } else {
a.zset_incrSize(key, 1) db.zIncrSize(key, 1)
} }
t.Put(ek, PutInt64(score)) t.Put(ek, PutInt64(score))
@ -321,13 +326,13 @@ func (a *App) zset_incrby(key []byte, delta int64, member []byte) ([]byte, error
return Slice(strconv.FormatInt(score, 10)), err return Slice(strconv.FormatInt(score, 10)), err
} }
func (a *App) zset_count(key []byte, min int64, max int64) (int64, error) { func (db *DB) ZCount(key []byte, min int64, max int64) (int64, error) {
minKey := encode_start_zscore_key(key, min) minKey := encode_start_zscore_key(key, min)
maxKey := encode_stop_zscore_key(key, max) maxKey := encode_stop_zscore_key(key, max)
rangeType := leveldb.RangeROpen rangeType := leveldb.RangeROpen
it := a.db.Iterator(minKey, maxKey, rangeType, 0, -1) it := db.db.Iterator(minKey, maxKey, rangeType, 0, -1)
var n int64 = 0 var n int64 = 0
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
n++ n++
@ -337,10 +342,10 @@ func (a *App) zset_count(key []byte, min int64, max int64) (int64, error) {
return n, nil return n, nil
} }
func (a *App) zset_rank(key []byte, member []byte, reverse bool) (int64, error) { func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) {
k := encode_zset_key(key, member) k := encode_zset_key(key, member)
if v, err := a.db.Get(k); err != nil { if v, err := db.db.Get(k); err != nil {
return 0, err return 0, err
} else if v == nil { } else if v == nil {
return -1, nil return -1, nil
@ -354,10 +359,10 @@ func (a *App) zset_rank(key []byte, member []byte, reverse bool) (int64, error)
if !reverse { if !reverse {
minKey := encode_start_zscore_key(key, MinScore) minKey := encode_start_zscore_key(key, MinScore)
it = a.db.Iterator(minKey, sk, leveldb.RangeClose, 0, -1) it = db.db.Iterator(minKey, sk, leveldb.RangeClose, 0, -1)
} else { } else {
maxKey := encode_stop_zscore_key(key, MaxScore) maxKey := encode_stop_zscore_key(key, MaxScore)
it = a.db.RevIterator(sk, maxKey, leveldb.RangeClose, 0, -1) it = db.db.RevIterator(sk, maxKey, leveldb.RangeClose, 0, -1)
} }
var lastKey []byte = nil var lastKey []byte = nil
@ -381,23 +386,23 @@ func (a *App) zset_rank(key []byte, member []byte, reverse bool) (int64, error)
return -1, nil return -1, nil
} }
func (a *App) zset_iterator(key []byte, min int64, max int64, offset int, limit int, reverse bool) *leveldb.Iterator { func (db *DB) zIterator(key []byte, min int64, max int64, offset int, limit int, reverse bool) *leveldb.Iterator {
minKey := encode_start_zscore_key(key, min) minKey := encode_start_zscore_key(key, min)
maxKey := encode_stop_zscore_key(key, max) maxKey := encode_stop_zscore_key(key, max)
if !reverse { if !reverse {
return a.db.Iterator(minKey, maxKey, leveldb.RangeClose, offset, limit) return db.db.Iterator(minKey, maxKey, leveldb.RangeClose, offset, limit)
} else { } else {
return a.db.RevIterator(minKey, maxKey, leveldb.RangeClose, offset, limit) return db.db.RevIterator(minKey, maxKey, leveldb.RangeClose, offset, limit)
} }
} }
func (a *App) zset_remRange(key []byte, min int64, max int64, offset int, limit int) (int64, error) { func (db *DB) zRemRange(key []byte, min int64, max int64, offset int, limit int) (int64, error) {
t := a.zsetTx t := db.zsetTx
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
it := a.zset_iterator(key, min, max, offset, limit, false) it := db.zIterator(key, min, max, offset, limit, false)
var num int64 = 0 var num int64 = 0
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
k := it.Key() k := it.Key()
@ -406,7 +411,7 @@ func (a *App) zset_remRange(key []byte, min int64, max int64, offset int, limit
continue continue
} }
if n, err := a.zset_delItem(key, m, true); err != nil { if n, err := db.zDelItem(key, m, true); err != nil {
return 0, err return 0, err
} else if n == 1 { } else if n == 1 {
num++ num++
@ -415,7 +420,7 @@ func (a *App) zset_remRange(key []byte, min int64, max int64, offset int, limit
t.Delete(k) t.Delete(k)
} }
if _, err := a.zset_incrSize(key, -num); err != nil { if _, err := db.zIncrSize(key, -num); err != nil {
return 0, err return 0, err
} }
@ -425,7 +430,7 @@ func (a *App) zset_remRange(key []byte, min int64, max int64, offset int, limit
return num, err return num, err
} }
func (a *App) zset_reverse(s []interface{}, withScores bool) []interface{} { func (db *DB) zReverse(s []interface{}, withScores bool) []interface{} {
if withScores { if withScores {
for i, j := 0, len(s)-2; i < j; i, j = i+2, j-2 { for i, j := 0, len(s)-2; i < j; i, j = i+2, j-2 {
s[i], s[j] = s[j], s[i] s[i], s[j] = s[j], s[i]
@ -440,7 +445,11 @@ func (a *App) zset_reverse(s []interface{}, withScores bool) []interface{} {
return s return s
} }
func (a *App) zset_range(key []byte, min int64, max int64, withScores bool, offset int, limit int, reverse bool) ([]interface{}, error) { func (db *DB) zRange(key []byte, min int64, max int64, withScores bool, offset int, limit int, reverse bool) ([]interface{}, error) {
if offset < 0 {
return []interface{}{}, nil
}
nv := 64 nv := 64
if limit > 0 { if limit > 0 {
nv = limit nv = limit
@ -455,9 +464,9 @@ func (a *App) zset_range(key []byte, min int64, max int64, withScores bool, offs
//if reverse and offset is 0, limit < 0, we may use forward iterator then reverse //if reverse and offset is 0, limit < 0, we may use forward iterator then reverse
//because leveldb iterator prev is slower than next //because leveldb iterator prev is slower than next
if !reverse || (offset == 0 && limit < 0) { if !reverse || (offset == 0 && limit < 0) {
it = a.zset_iterator(key, min, max, offset, limit, false) it = db.zIterator(key, min, max, offset, limit, false)
} else { } else {
it = a.zset_iterator(key, min, max, offset, limit, true) it = db.zIterator(key, min, max, offset, limit, true)
} }
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
@ -475,12 +484,111 @@ func (a *App) zset_range(key []byte, min int64, max int64, withScores bool, offs
} }
if reverse && (offset == 0 && limit < 0) { if reverse && (offset == 0 && limit < 0) {
v = a.zset_reverse(v, withScores) v = db.zReverse(v, withScores)
} }
return v, nil return v, nil
} }
func (a *App) zset_clear(key []byte) (int64, error) { func (db *DB) zParseLimit(key []byte, start int, stop int) (offset int, limit int, err error) {
return a.zset_remRange(key, MinScore, MaxScore, 0, -1) if start < 0 || stop < 0 {
//refer redis implementation
var size int64
size, err = db.ZCard(key)
if err != nil {
return
}
llen := int(size)
if start < 0 {
start = llen + start
}
if stop < 0 {
stop = llen + stop
}
if start < 0 {
start = 0
}
if start >= llen {
offset = -1
return
}
}
if start > stop {
offset = -1
return
}
offset = start
limit = (stop - start) + 1
return
}
func (db *DB) ZClear(key []byte) (int64, error) {
return db.zRemRange(key, MinScore, MaxScore, 0, -1)
}
func (db *DB) ZRange(key []byte, start int, stop int, withScores bool) ([]interface{}, error) {
return db.ZRangeGeneric(key, start, stop, withScores, false)
}
//min and max must be inclusive
//if no limit, set offset = 0 and count = -1
func (db *DB) ZRangeByScore(key []byte, min int64, max int64,
withScores bool, offset int, count int) ([]interface{}, error) {
return db.ZRangeByScoreGeneric(key, min, max, withScores, offset, count, false)
}
func (db *DB) ZRank(key []byte, member []byte) (int64, error) {
return db.zrank(key, member, false)
}
func (db *DB) ZRemRangeByRank(key []byte, start int, stop int) (int64, error) {
offset, limit, err := db.zParseLimit(key, start, stop)
if err != nil {
return 0, err
}
return db.zRemRange(key, MinScore, MaxScore, offset, limit)
}
//min and max must be inclusive
func (db *DB) ZRemRangeByScore(key []byte, min int64, max int64) (int64, error) {
return db.zRemRange(key, min, max, 0, -1)
}
func (db *DB) ZRevRange(key []byte, start int, stop int, withScores bool) ([]interface{}, error) {
return db.ZRangeGeneric(key, start, stop, withScores, true)
}
func (db *DB) ZRevRank(key []byte, member []byte) (int64, error) {
return db.zrank(key, member, true)
}
//min and max must be inclusive
//if no limit, set offset = 0 and count = -1
func (db *DB) ZRevRangeByScore(key []byte, min int64, max int64,
withScores bool, offset int, count int) ([]interface{}, error) {
return db.ZRangeByScoreGeneric(key, min, max, withScores, offset, count, true)
}
func (db *DB) ZRangeGeneric(key []byte, start int, stop int,
withScores bool, reverse bool) ([]interface{}, error) {
offset, limit, err := db.zParseLimit(key, start, stop)
if err != nil {
return nil, err
}
return db.zRange(key, MinScore, MaxScore, withScores, offset, limit, reverse)
}
//min and max must be inclusive
//if no limit, set offset = 0 and count = -1
func (db *DB) ZRangeByScoreGeneric(key []byte, min int64, max int64,
withScores bool, offset int, count int, reverse bool) ([]interface{}, error) {
return db.zRange(key, min, max, withScores, offset, count, reverse)
} }

View File

@ -8,21 +8,9 @@ import (
type tx struct { type tx struct {
m sync.Mutex m sync.Mutex
app *App
wb *leveldb.WriteBatch wb *leveldb.WriteBatch
} }
func (app *App) newTx() *tx {
t := new(tx)
t.app = app
t.wb = app.db.NewWriteBatch()
return t
}
func (t *tx) Close() { func (t *tx) Close() {
t.wb.Close() t.wb.Close()
} }