Merge branch 'develop'

This commit is contained in:
siddontang 2014-07-09 17:15:57 +08:00
commit a52d850ec2
18 changed files with 546 additions and 184 deletions

21
client/go/LICENSE Normal file
View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2014 siddontang
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

21
client/ledis-py/LICENSE Normal file
View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2014 siddontang
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -0,0 +1,5 @@
include README.md
include redis-py_license
exclude tests/__pycache__
recursive-include tests *
recursive-exclude tests *.pyc

21
client/openresty/LICENSE Normal file
View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2014 siddontang
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

326
client/openresty/README.md Normal file
View File

@ -0,0 +1,326 @@
Name
=====
lua-resty-ledis - Lua ledisdb client driver for the ngx_lua based on the cosocket API
Status
======
The library is not production ready. Do it at your own risk.
Description
===========
This Lua library is a [ledisdb](https://github.com/siddontang/ledisdb) client driver for the ngx_lua nginx module:
http://wiki.nginx.org/HttpLuaModule
This Lua library takes advantage of ngx_lua's cosocket API, which ensures 100% nonblocking behavior.
Note that at least [ngx_lua 0.5.14](https://github.com/openresty/lua-nginx-module/tags) or [ngx_openresty 1.2.1.14](http://openresty.org/#Download) is required.
Synopsis
========
```lua
# you have to add the following line
lua_package_path "/path/to/ledis/client/openresty/?.lua;;";
server {
location /test {
content_by_lua '
local ledis = require "ledis"
local lds = ledis:new()
lds:set_timeout(1000) -- 1 sec
-- or connect to a unix domain socket file listened
-- by a ledis server:
-- local ok, err = lds:connect("unix:/path/to/ledis.sock")
local ok, err = lds:connect("127.0.0.1", 6380)
if not ok then
ngx.say("failed to connect: ", err)
return
end
ok, err = lds:set("dog", "an animal")
if not ok then
ngx.say("failed to set dog: ", err)
return
end
ngx.say("set result: ", ok)
local res, err = lds:get("dog")
if not res then
ngx.say("failed to get dog: ", err)
return
end
if res == ngx.null then
ngx.say("dog not found.")
return
end
ngx.say("dog: ", res)
-- put it into the connection pool of size 100,
-- with 10 seconds max idle time
local ok, err = lds:set_keepalive(10000, 100)
if not ok then
ngx.say("failed to set keepalive: ", err)
return
end
-- or just close the connection right away:
-- local ok, err = lds:close()
-- if not ok then
-- ngx.say("failed to close: ", err)
-- return
-- end
';
}
}
```
Methods
=========
All of the ledisdb commands have their own methods with the same name except all in **lower case**.
You can find the complete list of ledisdb commands here"
https://github.com/siddontang/ledisdb/wiki/Commands
You need to check out this ledisdb command reference to see what ledisdb command accepts what arguments.
The ledisdb command arguments can be directly fed into the corresponding method call. For example, the `GET` ledisdb command accepts a single key argument, then you can just call the `get` method like this:
local res, err = lds:get("key")
Similarly, the "LRANGE" ledisdb command accepts threee arguments, then you should call the "lrange" method like this:
local res, err = lds:lrange("nokey", 0, 1)
For example, "SET", "GET", "LRANGE", and "LPOP" commands correspond to the methods "set", "get", "lrange", and "lpop".
Here are some more examples:
-- HMGET myhash field1 field2 nofield
local res, err = lds:hmget("myhash", "field1", "field2", "nofield")
-- HMSET myhash field1 "Hello" field2 "World"
local res, err = lds:hmset("myhash", "field1", "Hello", "field2", "World")
All these command methods returns a single result in success and nil otherwise. In case of errors or failures, it will also return a second value which is a string describing the error.
All these command methods returns a single result in success and nil otherwise. In case of errors or failures, it will also return a second value which is a string describing the error.
- A Redis "status reply" results in a string typed return value with the "+" prefix stripped.
- A Redis "integer reply" results in a Lua number typed return value.
- A Redis "error reply" results in a false value and a string describing the error.
- A non-nil Redis "bulk reply" results in a Lua string as the return value. A nil bulk reply results in a ngx.null return value.
- A non-nil Redis "multi-bulk reply" results in a Lua table holding all the composing values (if any). If any of the composing value is a valid redis error value, then it will be a two element table {false, err}.
- A nil multi-bulk reply returns in a ngx.null value.
See http://redis.io/topics/protocol for details regarding various Redis reply types.
In addition to all those ledisdb command methods, the following methods are also provided:
new
====
synxtax: lds, err = ledis:new()
Creates a ledis object. In case of failures, returns nil and a string describing the error.
connect
========
syntax: ok, err = lds:connect(host, port, options_table?)
syntax: ok, err = lds:connect("unix:/path/to/unix.sock", options_table?)
Attempts to connect to the remote host and port that the ledis server is listening to or a local unix domain socket file listened by the ledis server.
Before actually resolving the host name and connecting to the remote backend, this method will always look up the connection pool for matched idle connections created by previous calls of this method.
An optional Lua table can be specified as the last argument to this method to specify various connect options:
- pool
Specifies a custom name for the connection pool being used. If omitted, then the connection pool name will be generated from the string template ` <host>:<port>` or `<unix-socket-path>`.
set_timeout
============
syntax: lds:set_timeout(time)
Sets the timeout (in `ms`) protection for subsequent operations, including the connect method.
set_keepalive
==============
syntax: ok, err = lds:set_keepalive(max_idle_timeout, pool_size)
Puts the current ledis connection immediately into the ngx_lua cosocket connection pool.
You can specify the max idle timeout (in ms) when the connection is in the pool and the maximal size of the pool every nginx worker process.
In case of success, returns 1. In case of errors, returns nil with a string describing the error.
Only call this method in the place you would have called the close method instead. Calling this method will immediately turn the current ledis object into the `closed` state. Any subsequent operations other than connect() on the current objet will return the closed error.
get_reused_times
=================
syntax: times, err = lds:get_reused_times()
This method returns the (successfully) reused times for the current connection. In case of error, it returns nil and a string describing the error.
If the current connection does not come from the built-in connection pool, then this method always returns 0, that is, the connection has never been reused (yet). If the connection comes from the connection pool, then the return value is always non-zero. So this method can also be used to determine if the current connection comes from the pool.
close
=======
syntax: ok, err = lds:close()
Closes the current ledis connection and returns the status.
In case of success, returns 1. In case of errors, returns nil with a string describing the error.
hmset
======
syntax: lds:hmset(myhash, field1, value1, field2, value2, ...)
syntax: lds:hmset(myhash, { field1 = value1, field2 = value2, ... })
Special wrapper for the ledis `hmset` command.
When there are only three arguments (including the "lds" object itself), then the last argument must be a Lua table holding all the field/value pairs.
add_commands
============
syntax: hash = ledis.add_commands(cmd_name1, cmd_name2, ...)
Adds new ledis commands to the resty.ledis class. Here is an example:
```lua
local ledis = require "ledis"
ledis.add_commands("foo", "bar")
local lds = ledis:new()
lds:set_timeout(1000) -- 1 sec
local ok, err = lds:connect("127.0.0.1", 6380)
if not ok then
ngx.say("failed to connect: ", err)
return
end
local res, err = lds:foo("a")
if not res then
ngx.say("failed to foo: ", err)
end
res, err = lds:bar()
if not res then
ngx.say("failed to bar: ", err)
end
```
Debugging
=========
It is usually convenient to use the lua-cjson library to encode the return values of the ledis command methods to JSON. For example,
local cjson = require "cjson"
...
local res, err = lds:mget("h1234", "h5678")
if res then
print("res: ", cjson.encode(res))
end
Automatic Error Logging
========================
By default the underlying [ngx_lua](http://wiki.nginx.org/HttpLuaModule) module does error logging when socket errors happen. If you are already doing proper error handling in your own Lua code, then you are recommended to disable this automatic error logging by turning off ngx_lua's [lua_socket_log_errors](http://wiki.nginx.org/HttpLuaModule#lua_socket_log_errors) directive, that is,
lua_socket_log_errors off;
Check List for Issues
=======================
Please refer to [lua-resty-redis](https://github.com/openresty/lua-resty-redis#check-list-for-issues).
Limitations
===========
Please refer to [lua-resty-redis](https://github.com/openresty/lua-resty-redis#limitations).
Installation
============
You need to configure the `lua_package_path` directive to add the path of your lua-resty-ledis source tree to ngx_lua's `LUA_PATH` search path, as in
# nginx.conf
http {
lua_package_path "/path/to/ledis/client/openresty/?.lua;;";
...
}
Ensure that the system account running your Nginx ''worker'' proceses have enough permission to read the `.lua` file.
Bugs and Patches
================
Please report bugs or submit patches by [shooting an issue](https://github.com/siddontang/ledisdb/issues/new). Thank you.
Author
======
The original author is Yichun "agentzh" Zhang (章亦春) agentzh@gmail.com, CloudFlare Inc.
Thanks
======
Thanks Yichun "agentzh" Zhang (章亦春) for making such great works.

View File

@ -307,19 +307,6 @@ end
function _M.read_reply(self)
local sock = self.sock
if not sock then
return nil, "not initialized"
end
local res, err = _read_reply(self, sock)
return res, err
end
for i = 1, #commands do
local cmd = commands[i]

View File

@ -5,28 +5,22 @@ import (
)
const (
kvType byte = iota + 1
hashType
hSizeType
listType
lMetaType
zsetType
zSizeType
zScoreType
noneType byte = 0
kvType byte = 1
hashType byte = 2
hSizeType byte = 3
listType byte = 4
lMetaType byte = 5
zsetType byte = 6
zSizeType byte = 7
zScoreType byte = 8
binType byte = 9
binMetaType byte = 10
kvExpType
kvExpMetaType
lExpType
lExpMetaType
hExpType
hExpMetaType
zExpType
zExpMetaType
maxDataType byte = 100
binType
binMetaType
bExpType
bExpMetaType
expTimeType byte = 101
expMetaType byte = 102
)
const (

View File

@ -26,11 +26,11 @@ func (db *DB) FlushAll() (drop int64, err error) {
func (db *DB) newEliminator() *elimination {
eliminator := newEliminator(db)
eliminator.regRetireContext(kvExpType, db.kvTx, db.delete)
eliminator.regRetireContext(lExpType, db.listTx, db.lDelete)
eliminator.regRetireContext(hExpType, db.hashTx, db.hDelete)
eliminator.regRetireContext(zExpType, db.zsetTx, db.zDelete)
eliminator.regRetireContext(bExpType, db.binTx, db.bDelete)
eliminator.regRetireContext(kvType, db.kvTx, db.delete)
eliminator.regRetireContext(listType, db.listTx, db.lDelete)
eliminator.regRetireContext(hashType, db.hashTx, db.hDelete)
eliminator.regRetireContext(zsetType, db.zsetTx, db.zDelete)
eliminator.regRetireContext(binType, db.binTx, db.bDelete)
return eliminator
}
@ -38,7 +38,7 @@ func (db *DB) newEliminator() *elimination {
func (db *DB) flushRegion(t *tx, minKey []byte, maxKey []byte) (drop int64, err error) {
it := db.db.RangeIterator(minKey, maxKey, leveldb.RangeROpen)
for ; it.Valid(); it.Next() {
t.Delete(it.Key())
t.Delete(it.RawKey())
drop++
if drop&1023 == 0 {
if err = t.Commit(); err != nil {

View File

@ -355,7 +355,7 @@ func (db *DB) bExpireAt(key []byte, when int64) (int64, error) {
if seq, _, err := db.bGetMeta(key); err != nil || seq < 0 {
return 0, err
} else {
db.expireAt(t, bExpType, key, when)
db.expireAt(t, binType, key, when)
if err := t.Commit(); err != nil {
return 0, err
}
@ -407,7 +407,7 @@ func (db *DB) BDelete(key []byte) (drop int64, err error) {
defer t.Unlock()
drop = db.bDelete(t, key)
db.rmExpire(t, bExpType, key)
db.rmExpire(t, binType, key)
err = t.Commit()
return
@ -736,7 +736,7 @@ func (db *DB) BOperation(op uint8, dstkey []byte, srckeys ...[]byte) (blen int32
// clear the old data in case
db.bDelete(t, dstkey)
db.rmExpire(t, bExpType, dstkey)
db.rmExpire(t, binType, dstkey)
// set data
db.bSetMeta(t, dstkey, maxDstSeq, maxDstOff)
@ -786,7 +786,7 @@ func (db *DB) BTTL(key []byte) (int64, error) {
return -1, err
}
return db.ttl(bExpType, key)
return db.ttl(binType, key)
}
func (db *DB) BPersist(key []byte) (int64, error) {
@ -798,7 +798,7 @@ func (db *DB) BPersist(key []byte) (int64, error) {
t.Lock()
defer t.Unlock()
n, err := db.rmExpire(t, bExpType, key)
n, err := db.rmExpire(t, binType, key)
if err != nil {
return 0, err
}
@ -825,7 +825,7 @@ func (db *DB) bFlush() (drop int64, err error) {
maxKey[1] = binMetaType + 1
drop, err = db.flushRegion(t, minKey, maxKey)
err = db.expFlush(t, bExpType)
err = db.expFlush(t, binType)
err = t.Commit()
return

View File

@ -151,7 +151,7 @@ func (db *DB) hExpireAt(key []byte, when int64) (int64, error) {
if hlen, err := db.HLen(key); err != nil || hlen == 0 {
return 0, err
} else {
db.expireAt(t, hExpType, key, when)
db.expireAt(t, hashType, key, when)
if err := t.Commit(); err != nil {
return 0, err
}
@ -304,7 +304,7 @@ func (db *DB) hIncrSize(key []byte, delta int64) (int64, error) {
if size <= 0 {
size = 0
t.Delete(sk)
db.rmExpire(t, hExpType, key)
db.rmExpire(t, hashType, key)
} else {
t.Put(sk, PutInt64(size))
}
@ -428,7 +428,7 @@ func (db *DB) HClear(key []byte) (int64, error) {
defer t.Unlock()
num := db.hDelete(t, key)
db.rmExpire(t, hExpType, key)
db.rmExpire(t, hashType, key)
err := t.Commit()
return num, err
@ -445,7 +445,7 @@ func (db *DB) HMclear(keys ...[]byte) (int64, error) {
}
db.hDelete(t, key)
db.rmExpire(t, hExpType, key)
db.rmExpire(t, hashType, key)
}
err := t.Commit()
@ -466,7 +466,7 @@ func (db *DB) hFlush() (drop int64, err error) {
defer t.Unlock()
drop, err = db.flushRegion(t, minKey, maxKey)
err = db.expFlush(t, hExpType)
err = db.expFlush(t, hashType)
err = t.Commit()
return
@ -530,7 +530,7 @@ func (db *DB) HTTL(key []byte) (int64, error) {
return -1, err
}
return db.ttl(hExpType, key)
return db.ttl(hashType, key)
}
func (db *DB) HPersist(key []byte) (int64, error) {
@ -542,7 +542,7 @@ func (db *DB) HPersist(key []byte) (int64, error) {
t.Lock()
defer t.Unlock()
n, err := db.rmExpire(t, hExpType, key)
n, err := db.rmExpire(t, hashType, key)
if err != nil {
return 0, err
}

View File

@ -100,7 +100,7 @@ func (db *DB) setExpireAt(key []byte, when int64) (int64, error) {
if exist, err := db.Exists(key); err != nil || exist == 0 {
return 0, err
} else {
db.expireAt(t, kvExpType, key, when)
db.expireAt(t, kvType, key, when)
if err := t.Commit(); err != nil {
return 0, err
}
@ -132,7 +132,7 @@ func (db *DB) Del(keys ...[]byte) (int64, error) {
for i, k := range keys {
t.Delete(codedKeys[i])
db.rmExpire(t, kvExpType, k)
db.rmExpire(t, kvType, k)
}
err := t.Commit()
@ -317,7 +317,7 @@ func (db *DB) flush() (drop int64, err error) {
defer t.Unlock()
drop, err = db.flushRegion(t, minKey, maxKey)
err = db.expFlush(t, kvExpType)
err = db.expFlush(t, kvType)
err = t.Commit()
return
@ -382,7 +382,7 @@ func (db *DB) TTL(key []byte) (int64, error) {
return -1, err
}
return db.ttl(kvExpType, key)
return db.ttl(kvType, key)
}
func (db *DB) Persist(key []byte) (int64, error) {
@ -393,7 +393,7 @@ func (db *DB) Persist(key []byte) (int64, error) {
t := db.kvTx
t.Lock()
defer t.Unlock()
n, err := db.rmExpire(t, kvExpType, key)
n, err := db.rmExpire(t, kvType, key)
if err != nil {
return 0, err
}

View File

@ -175,7 +175,7 @@ func (db *DB) lpop(key []byte, whereSeq int32) ([]byte, error) {
t.Delete(itemKey)
size := db.lSetMeta(metaKey, headSeq, tailSeq)
if size == 0 {
db.rmExpire(t, hExpType, key)
db.rmExpire(t, hashType, key)
}
err = t.Commit()
@ -264,7 +264,7 @@ func (db *DB) lExpireAt(key []byte, when int64) (int64, error) {
if llen, err := db.LLen(key); err != nil || llen == 0 {
return 0, err
} else {
db.expireAt(t, lExpType, key, when)
db.expireAt(t, listType, key, when)
if err := t.Commit(); err != nil {
return 0, err
}
@ -384,7 +384,7 @@ func (db *DB) LClear(key []byte) (int64, error) {
defer t.Unlock()
num := db.lDelete(t, key)
db.rmExpire(t, lExpType, key)
db.rmExpire(t, listType, key)
err := t.Commit()
return num, err
@ -401,7 +401,7 @@ func (db *DB) LMclear(keys ...[]byte) (int64, error) {
}
db.lDelete(t, key)
db.rmExpire(t, lExpType, key)
db.rmExpire(t, listType, key)
}
@ -423,7 +423,7 @@ func (db *DB) lFlush() (drop int64, err error) {
defer t.Unlock()
drop, err = db.flushRegion(t, minKey, maxKey)
err = db.expFlush(t, lExpType)
err = db.expFlush(t, listType)
err = t.Commit()
return
@ -450,7 +450,7 @@ func (db *DB) LTTL(key []byte) (int64, error) {
return -1, err
}
return db.ttl(lExpType, key)
return db.ttl(listType, key)
}
func (db *DB) LPersist(key []byte) (int64, error) {
@ -462,7 +462,7 @@ func (db *DB) LPersist(key []byte) (int64, error) {
t.Lock()
defer t.Unlock()
n, err := db.rmExpire(t, lExpType, key)
n, err := db.rmExpire(t, listType, key)
if err != nil {
return 0, err
}

View File

@ -7,30 +7,28 @@ import (
"time"
)
var mapExpMetaType = map[byte]byte{
kvExpType: kvExpMetaType,
lExpType: lExpMetaType,
hExpType: hExpMetaType,
zExpType: zExpMetaType,
bExpType: bExpMetaType}
var (
errExpMetaKey = errors.New("invalid expire meta key")
errExpTimeKey = errors.New("invalid expire time key")
)
type retireCallback func(*tx, []byte) int64
type elimination struct {
db *DB
exp2Tx map[byte]*tx
exp2Retire map[byte]retireCallback
exp2Tx []*tx
exp2Retire []retireCallback
}
var errExpType = errors.New("invalid expire type")
func (db *DB) expEncodeTimeKey(expType byte, key []byte, when int64) []byte {
// format : db[8] / expType[8] / when[64] / key[...]
buf := make([]byte, len(key)+10)
func (db *DB) expEncodeTimeKey(dataType byte, key []byte, when int64) []byte {
buf := make([]byte, len(key)+11)
buf[0] = db.index
buf[1] = expType
pos := 2
buf[1] = expTimeType
buf[2] = dataType
pos := 3
binary.BigEndian.PutUint64(buf[pos:], uint64(when))
pos += 8
@ -40,43 +38,49 @@ func (db *DB) expEncodeTimeKey(expType byte, key []byte, when int64) []byte {
return buf
}
func (db *DB) expEncodeMetaKey(expType byte, key []byte) []byte {
// format : db[8] / expType[8] / key[...]
buf := make([]byte, len(key)+2)
func (db *DB) expEncodeMetaKey(dataType byte, key []byte) []byte {
buf := make([]byte, len(key)+3)
buf[0] = db.index
buf[1] = expType
pos := 2
buf[1] = expMetaType
buf[2] = dataType
pos := 3
copy(buf[pos:], key)
return buf
}
// usage : separate out the original key
func (db *DB) expDecodeMetaKey(mk []byte) []byte {
if len(mk) <= 2 {
// check db ? check type ?
return nil
func (db *DB) expDecodeMetaKey(mk []byte) (byte, []byte, error) {
if len(mk) <= 3 || mk[0] != db.index || mk[1] != expMetaType {
return 0, nil, errExpMetaKey
}
return mk[2:]
return mk[2], mk[3:], nil
}
func (db *DB) expire(t *tx, expType byte, key []byte, duration int64) {
db.expireAt(t, expType, key, time.Now().Unix()+duration)
func (db *DB) expDecodeTimeKey(tk []byte) (byte, []byte, int64, error) {
if len(tk) < 11 || tk[0] != db.index || tk[1] != expTimeType {
return 0, nil, 0, errExpTimeKey
}
return tk[2], tk[11:], int64(binary.BigEndian.Uint64(tk[3:])), nil
}
func (db *DB) expireAt(t *tx, expType byte, key []byte, when int64) {
mk := db.expEncodeMetaKey(expType+1, key)
tk := db.expEncodeTimeKey(expType, key, when)
func (db *DB) expire(t *tx, dataType byte, key []byte, duration int64) {
db.expireAt(t, dataType, key, time.Now().Unix()+duration)
}
func (db *DB) expireAt(t *tx, dataType byte, key []byte, when int64) {
mk := db.expEncodeMetaKey(dataType, key)
tk := db.expEncodeTimeKey(dataType, key, when)
t.Put(tk, mk)
t.Put(mk, PutInt64(when))
}
func (db *DB) ttl(expType byte, key []byte) (t int64, err error) {
mk := db.expEncodeMetaKey(expType+1, key)
func (db *DB) ttl(dataType byte, key []byte) (t int64, err error) {
mk := db.expEncodeMetaKey(dataType, key)
if t, err = Int64(db.db.Get(mk)); err != nil || t == 0 {
t = -1
@ -91,8 +95,8 @@ func (db *DB) ttl(expType byte, key []byte) (t int64, err error) {
return t, err
}
func (db *DB) rmExpire(t *tx, expType byte, key []byte) (int64, error) {
mk := db.expEncodeMetaKey(expType+1, key)
func (db *DB) rmExpire(t *tx, dataType byte, key []byte) (int64, error) {
mk := db.expEncodeMetaKey(dataType, key)
if v, err := db.db.Get(mk); err != nil {
return 0, err
} else if v == nil {
@ -100,26 +104,23 @@ func (db *DB) rmExpire(t *tx, expType byte, key []byte) (int64, error) {
} else if when, err2 := Int64(v, nil); err2 != nil {
return 0, err2
} else {
tk := db.expEncodeTimeKey(expType, key, when)
tk := db.expEncodeTimeKey(dataType, key, when)
t.Delete(mk)
t.Delete(tk)
return 1, nil
}
}
func (db *DB) expFlush(t *tx, expType byte) (err error) {
expMetaType, ok := mapExpMetaType[expType]
if !ok {
return errExpType
}
minKey := make([]byte, 2)
func (db *DB) expFlush(t *tx, dataType byte) (err error) {
minKey := make([]byte, 3)
minKey[0] = db.index
minKey[1] = expType
minKey[1] = expTimeType
minKey[2] = dataType
maxKey := make([]byte, 2)
maxKey := make([]byte, 3)
maxKey[0] = db.index
maxKey[1] = expMetaType + 1
maxKey[1] = expMetaType
maxKey[2] = dataType + 1
_, err = db.flushRegion(t, minKey, maxKey)
err = t.Commit()
@ -133,17 +134,17 @@ func (db *DB) expFlush(t *tx, expType byte) (err error) {
func newEliminator(db *DB) *elimination {
eli := new(elimination)
eli.db = db
eli.exp2Tx = make(map[byte]*tx)
eli.exp2Retire = make(map[byte]retireCallback)
eli.exp2Tx = make([]*tx, maxDataType)
eli.exp2Retire = make([]retireCallback, maxDataType)
return eli
}
func (eli *elimination) regRetireContext(expType byte, t *tx, onRetire retireCallback) {
func (eli *elimination) regRetireContext(dataType byte, t *tx, onRetire retireCallback) {
// todo .. need to ensure exist - mapExpMetaType[expType]
eli.exp2Tx[expType] = t
eli.exp2Retire[expType] = onRetire
eli.exp2Tx[dataType] = t
eli.exp2Retire[dataType] = onRetire
}
// call by outside ... (from *db to another *db)
@ -151,56 +152,43 @@ func (eli *elimination) active() {
now := time.Now().Unix()
db := eli.db
dbGet := db.db.Get
expKeys := make([][]byte, 0, 1024)
expTypes := [...]byte{kvExpType, lExpType, hExpType, zExpType}
for _, et := range expTypes {
// search those keys' which expire till the moment
minKey := db.expEncodeTimeKey(et, nil, 0)
maxKey := db.expEncodeTimeKey(et, nil, now+1)
expKeys = expKeys[0:0]
minKey := db.expEncodeTimeKey(noneType, nil, 0)
maxKey := db.expEncodeTimeKey(maxDataType, nil, now)
t, _ := eli.exp2Tx[et]
onRetire, _ := eli.exp2Retire[et]
if t == nil || onRetire == nil {
// todo : log error
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
for ; it.Valid(); it.Next() {
tk := it.RawKey()
mk := it.RawValue()
dt, k, _, err := db.expDecodeTimeKey(tk)
if err != nil {
continue
}
it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
for it.Valid() {
for i := 1; i < 512 && it.Valid(); i++ {
expKeys = append(expKeys, it.Key(), it.Value())
it.Next()
}
var cnt int = len(expKeys)
if cnt == 0 {
t := eli.exp2Tx[dt]
onRetire := eli.exp2Retire[dt]
if tk == nil || onRetire == nil {
continue
}
t.Lock()
var mk, ek, k []byte
for i := 0; i < cnt; i += 2 {
ek, mk = expKeys[i], expKeys[i+1]
if exp, err := Int64(dbGet(mk)); err == nil {
// check expire again
if exp > now {
continue
if exp <= now {
onRetire(t, k)
t.Delete(tk)
t.Delete(mk)
t.Commit()
}
// delete keys
k = db.expDecodeMetaKey(mk)
onRetire(t, k)
t.Delete(ek)
t.Delete(mk)
}
}
t.Commit()
t.Unlock()
} // end : it
}
it.Close()
} // end : expType
return
}

View File

@ -260,7 +260,7 @@ func (db *DB) zExpireAt(key []byte, when int64) (int64, error) {
if zcnt, err := db.ZCard(key); err != nil || zcnt == 0 {
return 0, err
} else {
db.expireAt(t, zExpType, key, when)
db.expireAt(t, zsetType, key, when)
if err := t.Commit(); err != nil {
return 0, err
}
@ -314,7 +314,7 @@ func (db *DB) zIncrSize(t *tx, key []byte, delta int64) (int64, error) {
if size <= 0 {
size = 0
t.Delete(sk)
db.rmExpire(t, zExpType, key)
db.rmExpire(t, zsetType, key)
} else {
t.Put(sk, PutInt64(size))
}
@ -752,7 +752,7 @@ func (db *DB) zFlush() (drop int64, err error) {
}
it.Close()
db.expFlush(t, zExpType)
db.expFlush(t, zsetType)
err = t.Commit()
return
@ -818,7 +818,7 @@ func (db *DB) ZTTL(key []byte) (int64, error) {
return -1, err
}
return db.ttl(zExpType, key)
return db.ttl(zsetType, key)
}
func (db *DB) ZPersist(key []byte) (int64, error) {
@ -830,7 +830,7 @@ func (db *DB) ZPersist(key []byte) (int64, error) {
t.Lock()
defer t.Unlock()
n, err := db.rmExpire(t, zExpType, key)
n, err := db.rmExpire(t, zsetType, key)
if err != nil {
return 0, err
}

View File

@ -289,14 +289,16 @@ func (db *DB) RevRangeIterator(min []byte, max []byte, rangeType uint8) *RangeLi
return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1})
}
//count < 0, unlimit
//offset must >= 0, if < 0, will get nothing
//count < 0, unlimit.
//
//offset must >= 0, if < 0, will get nothing.
func (db *DB) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
return NewRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
}
//count < 0, unlimit
//offset must >= 0, if < 0, will get nothing
//count < 0, unlimit.
//
//offset must >= 0, if < 0, will get nothing.
func (db *DB) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
}

View File

@ -24,11 +24,14 @@ const (
)
// min must less or equal than max
//
// range type:
//
// close: [min, max]
// open: (min, max)
// lopen: (min, max]
// ropen: [min, max)
//
type Range struct {
Min []byte
Max []byte
@ -92,7 +95,7 @@ func (it *Iterator) RawValue() []byte {
return slice(unsafe.Pointer(vdata), int(C.int(vlen)))
}
// Copy key to b, if b len is small or nil, returns a new one
// Copy key to b, if b len is small or nil, returns a new one.
func (it *Iterator) BufKey(b []byte) []byte {
k := it.RawKey()
if k == nil {
@ -106,7 +109,7 @@ func (it *Iterator) BufKey(b []byte) []byte {
return append(b, k...)
}
// Copy value to b, if b len is small or nil, returns a new one
// Copy value to b, if b len is small or nil, returns a new one.
func (it *Iterator) BufValue(b []byte) []byte {
v := it.RawValue()
if v == nil {
@ -150,7 +153,7 @@ func (it *Iterator) Seek(key []byte) {
it.isValid = C.leveldb_iter_seek_ext(it.it, (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key)))
}
// Finds by key, if not found, nil returns
// Finds by key, if not found, nil returns.
func (it *Iterator) Find(key []byte) []byte {
it.Seek(key)
if it.Valid() {
@ -165,7 +168,7 @@ func (it *Iterator) Find(key []byte) []byte {
return nil
}
// Finds by key, if not found, nil returns, else a reference of value returns
// Finds by key, if not found, nil returns, else a reference of value returns.
// you must be careful that it will be changed after next iterate.
func (it *Iterator) RawFind(key []byte) []byte {
it.Seek(key)

View File

@ -55,14 +55,6 @@ func (o *Options) SetCache(cache *Cache) {
C.leveldb_options_set_cache(o.Opt, cache.Cache)
}
// func (o *Options) SetEnv(env *Env) {
// C.leveldb_options_set_env(o.Opt, env.Env)
// }
func (o *Options) SetInfoLog(log *C.leveldb_logger_t) {
C.leveldb_options_set_info_log(o.Opt, log)
}
func (o *Options) SetWriteBufferSize(s int) {
C.leveldb_options_set_write_buffer_size(o.Opt, C.size_t(s))
}

View File

@ -45,14 +45,16 @@ func (s *Snapshot) RevRangeIterator(min []byte, max []byte, rangeType uint8) *Ra
return NewRevRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, &Limit{0, -1})
}
//count < 0, unlimit
//offset must >= 0, if < 0, will get nothing
//count < 0, unlimit.
//
//offset must >= 0, if < 0, will get nothing.
func (s *Snapshot) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
return NewRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
}
//count < 0, unlimit
//offset must >= 0, if < 0, will get nothing
//count < 0, unlimit.
//
//offset must >= 0, if < 0, will get nothing.
func (s *Snapshot) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
return NewRevRangeLimitIterator(s.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
}