From f64c4e149047b121dcb145ae4ad2f72710b312c5 Mon Sep 17 00:00:00 2001 From: holys Date: Wed, 9 Jul 2014 17:29:49 +0800 Subject: [PATCH 01/15] update test case for openresty(lua) --- client/openresty/ledis_test.lua | 34 +++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/client/openresty/ledis_test.lua b/client/openresty/ledis_test.lua index 0e6d65e..a7f9262 100644 --- a/client/openresty/ledis_test.lua +++ b/client/openresty/ledis_test.lua @@ -11,6 +11,13 @@ --]] local ledis = require "ledis" + +-- if you want to test add_commands, remove the command from +-- `local commands `, e.g.: ``get``, then use ``add_commands`` to add +-- command ``get`` + +--ledis.add_commands("get") + local lds = ledis:new() lds:set_timeout(1000) @@ -823,3 +830,30 @@ if not res then end ngx.say("SELECT should be OK <=>", res) + +-- get_reused_times + +times, err = lds:get_reused_times() +if not times then + ngx.say("failed to get_reused_times", err) + return +end + +ngx.say("time is", times) + +-- local ok, err = lds:set_keepalive(10000, 100) +-- if not ok then +-- ngx.say("failed to set keepalive: ", err) +-- return +-- end +-- ngx.say("set_keepalive success") +-- or just close the connection right away: + + + +local ok, err = lds:close() +if not ok then + ngx.say("failed to close: ", err) + return +end +ngx.say("close success") \ No newline at end of file From 9f3acd5a50b012b9c942a8c8d4b3ded094fb4a4a Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 10 Jul 2014 09:09:23 +0800 Subject: [PATCH 02/15] remove cn readme MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit we don’t need Chinese explain --- README_CN.md | 94 ---------------------------------------------------- 1 file changed, 94 deletions(-) delete mode 100644 README_CN.md diff --git a/README_CN.md b/README_CN.md deleted file mode 100644 index 8faade3..0000000 --- a/README_CN.md +++ /dev/null @@ -1,94 +0,0 @@ -# ledisdb - -ledisdb是一个用go实现的类似redis的高性能nosql数据库,底层基于leveldb实现。提供了kv,list,hash以及zset几种数据结构的支持。 - -最开始源于[ssdb](https://github.com/ideawu/ssdb),在使用了一段时间之后,因为兴趣的原因,决定用go实现一个。 - -## 编译 - -+ 创建一个工作目录,并check ledisdb源码 - - mkdir $WORKSPACE - cd $WORKSPACE - git clone git@github.com:siddontang/ledisdb.git src/github.com/siddontang/ledisdb - - cd src/github.com/siddontang/ledisdb - -+ 安装leveldb以及snappy,如果你已经安装,忽略 - - 我提供了一个简单的脚本进行leveldb的安装,你可以直接在shell中输入: - - sh build_leveldb.sh - - 默认该脚本会将leveldb以及snappy安装到/usr/local/leveldb以及/usr/local/snappy目录 - -+ 在dev.sh里面设置LEVELDB_DIR以及SNAPPY_DIR为实际的安装路径,默认为/usr/local/leveldb以及/usr/local/snappy - -+ 运行bootstrap.sh构建ledisdb go的依赖库 - - . ./bootstap.sh 或者 source ./bootstrap.sh - -+ 运行dev.sh - - . ./dev.sh 或者 source ./dev.sh - -+ 编译安装ledisdb - - go install ./... - -## 运行 - - ./ledis-server -config=/etc/ledis.json - - //another shell - ledis-cli -p 6380 - - ledis 127.0.0.1:6380> set a 1 - OK - ledis 127.0.0.1:6380> get a - "1" - -## 嵌入库 - - import "github.com/siddontang/ledisdb/ledis" - l, _ := ledis.Open(cfg) - db, _ := l.Select(0) - - db.Set(key, value) - - db.Get(key) - -## Benchmark - -可以通过查看benchmark.md获取最新的性能测试结果 - -## Replication - -通过配置或者运行时输入slaveof开启slave的replication功能 - - ledis-cli -p 6381 - - ledis 127.0.0.1:6381> slaveof 127.0.0.1:6380 - OK - -## Todo - -+ Admin - -## GoDoc - -[![GoDoc](https://godoc.org/github.com/siddontang/ledisdb?status.png)](https://godoc.org/github.com/siddontang/ledisdb) - -## Commands - -一些命令的解释在[这里](https://github.com/siddontang/ledisdb/wiki/Commands), 后续会不断加入。 - -## 感谢 - -Gmail: cenqichao@gmail.com - -Gmail: chendahui007@gmail.com - -## 联系我 - -Gmail: siddontang@gmail.com \ No newline at end of file From 70df6e2d043b0672a0ef673ed686146e22ee8a58 Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 10 Jul 2014 09:13:55 +0800 Subject: [PATCH 03/15] update read me --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 9943c33..1712d8d 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # ledisdb -Ledisdb is a high performance nosql like redis based on leveldb written by go. It's supports some advanced data structure like kv, list, hash and zset. +Ledisdb is a high performance NoSQL like Redis based on LevelDB written by go. It supports some advanced data structure like kv, list, hash and zset, and may be alternative for Redis. ## Build and Install From 612367d472d5869125d9e62b7d17cb6d10b46e54 Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 10 Jul 2014 09:29:34 +0800 Subject: [PATCH 04/15] update doc --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1712d8d..123e413 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# ledisdb +# LedisDB Ledisdb is a high performance NoSQL like Redis based on LevelDB written by go. It supports some advanced data structure like kv, list, hash and zset, and may be alternative for Redis. From cdb6b1cdd2f030156f8293eb882ea146ef4dcc2f Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 10 Jul 2014 11:32:37 +0800 Subject: [PATCH 05/15] use iterator to read many consistently in api for a read api, it not uses lock, so multi read in a api may cause some unpredictable when something writes at same time. --- ledis/t_list.go | 70 +++++++++++++++++++++++++++------------------ ledis/t_zset.go | 27 ++++++++--------- leveldb/iterator.go | 14 +++++++-- 3 files changed, 68 insertions(+), 43 deletions(-) diff --git a/ledis/t_list.go b/ledis/t_list.go index 39ca6ed..2f17609 100644 --- a/ledis/t_list.go +++ b/ledis/t_list.go @@ -84,8 +84,12 @@ func (db *DB) lpush(key []byte, whereSeq int32, args ...[]byte) (int64, error) { var size int32 var err error + t := db.listTx + t.Lock() + defer t.Unlock() + metaKey := db.lEncodeMetaKey(key) - headSeq, tailSeq, size, err = db.lGetMeta(metaKey) + headSeq, tailSeq, size, err = db.lGetMeta(nil, metaKey) if err != nil { return 0, err } @@ -102,10 +106,6 @@ func (db *DB) lpush(key []byte, whereSeq int32, args ...[]byte) (int64, error) { delta = 1 } - t := db.listTx - t.Lock() - defer t.Unlock() - // append elements if size > 0 { seq += delta @@ -148,7 +148,7 @@ func (db *DB) lpop(key []byte, whereSeq int32) ([]byte, error) { var err error metaKey := db.lEncodeMetaKey(key) - headSeq, tailSeq, _, err = db.lGetMeta(metaKey) + headSeq, tailSeq, _, err = db.lGetMeta(nil, metaKey) if err != nil { return nil, err } @@ -191,7 +191,10 @@ func (db *DB) lDelete(t *tx, key []byte) int64 { var tailSeq int32 var err error - headSeq, tailSeq, _, err = db.lGetMeta(mk) + it := db.db.NewIterator() + defer it.Close() + + headSeq, tailSeq, _, err = db.lGetMeta(it, mk) if err != nil { return 0 } @@ -200,27 +203,24 @@ func (db *DB) lDelete(t *tx, key []byte) int64 { startKey := db.lEncodeListKey(key, headSeq) stopKey := db.lEncodeListKey(key, tailSeq) - it := db.db.RangeLimitIterator(startKey, stopKey, leveldb.RangeClose, 0, -1) - for ; it.Valid(); it.Next() { - t.Delete(it.Key()) + rit := leveldb.NewRangeIterator(it, &leveldb.Range{startKey, stopKey, leveldb.RangeClose}) + for ; rit.Valid(); rit.Next() { + t.Delete(rit.RawKey()) num++ } - it.Close() t.Delete(mk) return num } -func (db *DB) lGetSeq(key []byte, whereSeq int32) (int64, error) { - ek := db.lEncodeListKey(key, whereSeq) - - return Int64(db.db.Get(ek)) -} - -func (db *DB) lGetMeta(ek []byte) (headSeq int32, tailSeq int32, size int32, err error) { +func (db *DB) lGetMeta(it *leveldb.Iterator, ek []byte) (headSeq int32, tailSeq int32, size int32, err error) { var v []byte - v, err = db.db.Get(ek) + if it != nil { + v = it.Find(ek) + } else { + v, err = db.db.Get(ek) + } if err != nil { return } else if v == nil { @@ -284,7 +284,10 @@ func (db *DB) LIndex(key []byte, index int32) ([]byte, error) { metaKey := db.lEncodeMetaKey(key) - headSeq, tailSeq, _, err = db.lGetMeta(metaKey) + it := db.db.NewIterator() + defer it.Close() + + headSeq, tailSeq, _, err = db.lGetMeta(it, metaKey) if err != nil { return nil, err } @@ -296,7 +299,9 @@ func (db *DB) LIndex(key []byte, index int32) ([]byte, error) { } sk := db.lEncodeListKey(key, seq) - return db.db.Get(sk) + v := it.Find(sk) + + return v, nil } func (db *DB) LLen(key []byte) (int64, error) { @@ -305,7 +310,7 @@ func (db *DB) LLen(key []byte) (int64, error) { } ek := db.lEncodeMetaKey(key) - _, _, size, err := db.lGetMeta(ek) + _, _, size, err := db.lGetMeta(nil, ek) return int64(size), err } @@ -328,7 +333,10 @@ func (db *DB) LRange(key []byte, start int32, stop int32) ([][]byte, error) { metaKey := db.lEncodeMetaKey(key) - if headSeq, _, llen, err = db.lGetMeta(metaKey); err != nil { + it := db.db.NewIterator() + defer it.Close() + + if headSeq, _, llen, err = db.lGetMeta(it, metaKey); err != nil { return nil, err } @@ -356,12 +364,18 @@ func (db *DB) LRange(key []byte, start int32, stop int32) ([][]byte, error) { v := make([][]byte, 0, limit) startKey := db.lEncodeListKey(key, headSeq) - it := db.db.RangeLimitIterator(startKey, nil, leveldb.RangeClose, 0, int(limit)) - for ; it.Valid(); it.Next() { - v = append(v, it.Value()) - } + rit := leveldb.NewRangeLimitIterator(it, + &leveldb.Range{ + Min: startKey, + Max: nil, + Type: leveldb.RangeClose}, + &leveldb.Limit{ + Offset: 0, + Count: int(limit)}) - it.Close() + for ; rit.Valid(); rit.Next() { + v = append(v, rit.Value()) + } return v, nil } diff --git a/ledis/t_zset.go b/ledis/t_zset.go index e9b9c4b..9a2f105 100644 --- a/ledis/t_zset.go +++ b/ledis/t_zset.go @@ -451,37 +451,37 @@ func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) { k := db.zEncodeSetKey(key, member) - if v, err := db.db.Get(k); err != nil { - return 0, err - } else if v == nil { + it := db.db.NewIterator() + defer it.Close() + + if v := it.Find(k); v == nil { return -1, nil } else { - if s, err := Int64(v, err); err != nil { + if s, err := Int64(v, nil); err != nil { return 0, err } else { - var it *leveldb.RangeLimitIterator + var rit *leveldb.RangeLimitIterator sk := db.zEncodeScoreKey(key, member, s) if !reverse { minKey := db.zEncodeStartScoreKey(key, MinScore) - it = db.db.RangeLimitIterator(minKey, sk, leveldb.RangeClose, 0, -1) + + rit = leveldb.NewRangeIterator(it, &leveldb.Range{minKey, sk, leveldb.RangeClose}) } else { maxKey := db.zEncodeStopScoreKey(key, MaxScore) - it = db.db.RevRangeLimitIterator(sk, maxKey, leveldb.RangeClose, 0, -1) + rit = leveldb.NewRevRangeIterator(it, &leveldb.Range{sk, maxKey, leveldb.RangeClose}) } var lastKey []byte = nil var n int64 = 0 - for ; it.Valid(); it.Next() { + for ; rit.Valid(); rit.Next() { n++ - lastKey = it.BufKey(lastKey) + lastKey = rit.BufKey(lastKey) } - it.Close() - if _, m, _, err := db.zDecodeScoreKey(lastKey); err == nil && bytes.Equal(m, member) { n-- return n, nil @@ -741,8 +741,10 @@ func (db *DB) zFlush() (drop int64, err error) { maxKey[1] = zScoreType + 1 it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) + defer it.Close() + for ; it.Valid(); it.Next() { - t.Delete(it.Key()) + t.Delete(it.RawKey()) drop++ if drop&1023 == 0 { if err = t.Commit(); err != nil { @@ -750,7 +752,6 @@ func (db *DB) zFlush() (drop int64, err error) { } } } - it.Close() db.expFlush(t, zsetType) diff --git a/leveldb/iterator.go b/leveldb/iterator.go index 3c3e811..a75e4d7 100644 --- a/leveldb/iterator.go +++ b/leveldb/iterator.go @@ -125,8 +125,10 @@ func (it *Iterator) BufValue(b []byte) []byte { } func (it *Iterator) Close() { - C.leveldb_iter_destroy(it.it) - it.it = nil + if it.it != nil { + C.leveldb_iter_destroy(it.it) + it.it = nil + } } func (it *Iterator) Valid() bool { @@ -274,6 +276,14 @@ func NewRevRangeLimitIterator(i *Iterator, r *Range, l *Limit) *RangeLimitIterat return rangeLimitIterator(i, r, l, IteratorBackward) } +func NewRangeIterator(i *Iterator, r *Range) *RangeLimitIterator { + return rangeLimitIterator(i, r, &Limit{0, -1}, IteratorForward) +} + +func NewRevRangeIterator(i *Iterator, r *Range) *RangeLimitIterator { + return rangeLimitIterator(i, r, &Limit{0, -1}, IteratorBackward) +} + func rangeLimitIterator(i *Iterator, r *Range, l *Limit, direction uint8) *RangeLimitIterator { it := new(RangeLimitIterator) From 7c4c9b1378a44842a873c4fa324609ed6389c687 Mon Sep 17 00:00:00 2001 From: holys Date: Thu, 10 Jul 2014 12:02:34 +0800 Subject: [PATCH 06/15] delete code about unsupported feature like password and SSL connection --- client/ledis-py/ledis/client.py | 4 ++-- client/ledis-py/ledis/connection.py | 24 ++++-------------------- 2 files changed, 6 insertions(+), 22 deletions(-) diff --git a/client/ledis-py/ledis/client.py b/client/ledis-py/ledis/client.py index c159b8f..1138f69 100644 --- a/client/ledis-py/ledis/client.py +++ b/client/ledis-py/ledis/client.py @@ -112,8 +112,8 @@ class Ledis(object): For example:: - redis://[:password]@localhost:6380/0 - unix://[:password]@/path/to/socket.sock?db=0 + redis://localhost:6380/0 + unix:///path/to/socket.sock?db=0 There are several ways to specify a database number. The parse function will return the first specified option: diff --git a/client/ledis-py/ledis/connection.py b/client/ledis-py/ledis/connection.py index 18cf3b3..5762da1 100644 --- a/client/ledis-py/ledis/connection.py +++ b/client/ledis-py/ledis/connection.py @@ -145,7 +145,7 @@ DefaultParser = PythonParser class Connection(object): "Manages TCP communication to and from a Ledis server" - def __init__(self, host='localhost', port=6380, db=0, password=None, + def __init__(self, host='localhost', port=6380, db=0, socket_timeout=None, encoding='utf-8', encoding_errors='strict', decode_responses=False, parser_class=DefaultParser): @@ -153,7 +153,6 @@ class Connection(object): self.host = host self.port = port self.db = db - self.password = password self.socket_timeout = socket_timeout self.encoding = encoding self.encoding_errors = encoding_errors @@ -201,12 +200,6 @@ class Connection(object): "Initialize the connection, authenticate and select a database" self._parser.on_connect(self) - # if a password is specified, authenticate - if self.password: - self.send_command('AUTH', self.password) - if nativestr(self.read_response()) != 'OK': - raise AuthenticationError('Invalid Password') - # if a database is specified, switch to it if self.db: self.send_command('SELECT', self.db) @@ -283,14 +276,12 @@ class Connection(object): class UnixDomainSocketConnection(Connection): - def __init__(self, path='', db=0, password=None, - socket_timeout=None, encoding='utf-8', + def __init__(self, path='', db=0, socket_timeout=None, encoding='utf-8', encoding_errors='strict', decode_responses=False, parser_class=DefaultParser): self.pid = os.getpid() self.path = path self.db = db - self.password = password self.socket_timeout = socket_timeout self.encoding = encoding self.encoding_errors = encoding_errors @@ -326,13 +317,11 @@ class ConnectionPool(object): For example:: - redis://[:password]@localhost:6379/0 - rediss://[:password]@localhost:6379/0 - unix://[:password]@/path/to/socket.sock?db=0 + redis://localhost:6380/0 + unix:///path/to/socket.sock?db=0 Three URL schemes are supported: redis:// creates a normal TCP socket connection - rediss:// creates a SSL wrapped TCP socket connection unix:// creates a Unix Domain Socket connection There are several ways to specify a database number. The parse function @@ -371,7 +360,6 @@ class ConnectionPool(object): # We only support redis:// and unix:// schemes. if url.scheme == 'unix': url_options.update({ - 'password': url.password, 'path': url.path, 'connection_class': UnixDomainSocketConnection, }) @@ -380,7 +368,6 @@ class ConnectionPool(object): url_options.update({ 'host': url.hostname, 'port': int(url.port or 6380), - 'password': url.password, }) # If there's a path argument, use it as the db argument if a @@ -391,9 +378,6 @@ class ConnectionPool(object): except (AttributeError, ValueError): pass - if url.scheme == 'lediss': - url_options['connection_class'] = SSLConnection - # last shot at the db value url_options['db'] = int(url_options.get('db', db or 0)) From 4d909642a42aa751f3337abb3d444fce58fafc3b Mon Sep 17 00:00:00 2001 From: holys Date: Thu, 10 Jul 2014 12:03:36 +0800 Subject: [PATCH 07/15] update ledis-py README --- client/ledis-py/README.md | 43 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/client/ledis-py/README.md b/client/ledis-py/README.md index 616e911..23549d0 100644 --- a/client/ledis-py/README.md +++ b/client/ledis-py/README.md @@ -47,5 +47,46 @@ For full API reference, please visit [rtfd](http://ledis-py.readthedocs.org/). ### Connection Pools -### Connnections +Behind the scenes, ledis-py uses a connection pool to manage connections to a Ledis server. By default, each Ledis instance you create will in turn create its own connection pool. You can override this behavior and use an existing connection pool by passing an already created connection pool instance to the connection_pool argument of the Ledis class. You may choose to do this in order to implement client side sharding or have finer grain control of how connections are managed. +``` +>>> pool = ledis.ConnectionPool(host='localhost', port=6380, db=0) +>>> l = ledis.Ledis(connection_pool=pool) +``` + +### Connections + +ConnectionPools manage a set of Connection instances. ledis-py ships with two types of Connections. The default, Connection, is a normal TCP socket based connection. The UnixDomainSocketConnection allows for clients running on the same device as the server to connect via a unix domain socket. To use a UnixDomainSocketConnection connection, simply pass the unix_socket_path argument, which is a string to the unix domain socket file. Additionally, make sure the unixsocket parameter is defined in your `ledis.json` file. e.g.: + +``` +{ + "addr": "/tmp/ledis.sock", + ... +} +``` + +``` +>>> l = ledis.Ledis(unix_socket_path='/tmp/ledis.sock') +``` + +You can create your own Connection subclasses as well. This may be useful if you want to control the socket behavior within an async framework. To instantiate a client class using your own connection, you need to create a connection pool, passing your class to the connection_class argument. Other keyword parameters your pass to the pool will be passed to the class specified during initialization. + +``` +>>> pool = ledis.ConnectionPool(connection_class=YourConnectionClass, + your_arg='...', ...) +``` + +e.g.: + +``` +>>> from ledis import UnixDomainSocketConnection +>>> pool = ledis.ConnectionPool(connection_class=UnixDomainSocketConnection, path='/tmp/ledis.sock') +``` + +## Response Callbacks + +The client class uses a set of callbacks to cast Ledis responses to the appropriate Python type. There are a number of these callbacks defined on the Ledis client class in a dictionary called RESPONSE_CALLBACKS. + +Custom callbacks can be added on a per-instance basis using the `set_response_callback` method. This method accepts two arguments: a command name and the callback. Callbacks added in this manner are only valid on the instance the callback is added to. If you want to define or override a callback globally, you should make a subclass of the Ledis client and add your callback to its RESPONSE_CALLBACKS class dictionary. + +Response callbacks take at least one parameter: the response from the Ledis server. Keyword arguments may also be accepted in order to further control how to interpret the response. These keyword arguments are specified during the command's call to execute_command. The ZRANGE implementation demonstrates the use of response callback keyword arguments with its "withscores" argument. \ No newline at end of file From b2fa4082ffed9b25dc22611cc476c475d709940c Mon Sep 17 00:00:00 2001 From: holys Date: Thu, 10 Jul 2014 15:17:51 +0800 Subject: [PATCH 08/15] fix typo --- client/ledis-py/ledis/client.py | 8 ++++---- client/ledis-py/ledis/connection.py | 26 +++++++++++++------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/client/ledis-py/ledis/client.py b/client/ledis-py/ledis/client.py index 1138f69..a855e84 100644 --- a/client/ledis-py/ledis/client.py +++ b/client/ledis-py/ledis/client.py @@ -112,14 +112,14 @@ class Ledis(object): For example:: - redis://localhost:6380/0 + ledis://localhost:6380/0 unix:///path/to/socket.sock?db=0 There are several ways to specify a database number. The parse function will return the first specified option: - 1. A ``db`` querystring option, e.g. redis://localhost?db=0 - 2. If using the redis:// scheme, the path argument of the url, e.g. - redis://localhost/0 + 1. A ``db`` querystring option, e.g. ledis://localhost?db=0 + 2. If using the ledis:// scheme, the path argument of the url, e.g. + ledis://localhost/0 3. The ``db`` argument to this function. If none of these options are specified, db=0 is used. diff --git a/client/ledis-py/ledis/connection.py b/client/ledis-py/ledis/connection.py index 5762da1..4a39317 100644 --- a/client/ledis-py/ledis/connection.py +++ b/client/ledis-py/ledis/connection.py @@ -317,18 +317,18 @@ class ConnectionPool(object): For example:: - redis://localhost:6380/0 + ledis://localhost:6380/0 unix:///path/to/socket.sock?db=0 Three URL schemes are supported: - redis:// creates a normal TCP socket connection + ledis:// creates a normal TCP socket connection unix:// creates a Unix Domain Socket connection There are several ways to specify a database number. The parse function will return the first specified option: - 1. A ``db`` querystring option, e.g. redis://localhost?db=0 - 2. If using the redis:// scheme, the path argument of the url, e.g. - redis://localhost/0 + 1. A ``db`` querystring option, e.g. ledis://localhost?db=0 + 2. If using the ledis:// scheme, the path argument of the url, e.g. + ledis://localhost/0 3. The ``db`` argument to this function. If none of these options are specified, db=0 is used. @@ -357,7 +357,7 @@ class ConnectionPool(object): if value and len(value) > 0: url_options[name] = value[0] - # We only support redis:// and unix:// schemes. + # We only support ledis:// and unix:// schemes. if url.scheme == 'unix': url_options.update({ 'path': url.path, @@ -437,18 +437,18 @@ class BlockingConnectionPool(object): """ Thread-safe blocking connection pool:: - >>> from redis.client import Redis - >>> client = Redis(connection_pool=BlockingConnectionPool()) + >>> from ledis.client import Ledis + >>> client = Ledis(connection_pool=BlockingConnectionPool()) It performs the same function as the default - ``:py:class: ~redis.connection.ConnectionPool`` implementation, in that, + ``:py:class: ~ledis.connection.ConnectionPool`` implementation, in that, it maintains a pool of reusable connections that can be shared by - multiple redis clients (safely across threads if required). + multiple ledis clients (safely across threads if required). The difference is that, in the event that a client tries to get a connection from the pool when all of connections are in use, rather than - raising a ``:py:class: ~redis.exceptions.ConnectionError`` (as the default - ``:py:class: ~redis.connection.ConnectionPool`` implementation does), it + raising a ``:py:class: ~ledis.exceptions.ConnectionError`` (as the default + ``:py:class: ~ledis.connection.ConnectionPool`` implementation does), it makes the client wait ("blocks") for a specified number of seconds until a connection becomes available. @@ -548,7 +548,7 @@ class BlockingConnectionPool(object): try: connection = self.pool.get(block=True, timeout=self.timeout) except Empty: - # Note that this is not caught by the redis client and will be + # Note that this is not caught by the ledis client and will be # raised unless handled by application code. If you want never to raise ConnectionError("No connection available.") From 55056660e304c2ffd4b02b11405165f1bcc338a2 Mon Sep 17 00:00:00 2001 From: siddontang Date: Fri, 11 Jul 2014 10:43:39 +0800 Subject: [PATCH 09/15] add ledis-dump tool --- cmd/ledis-dump/main.go | 62 ++++++++++++++++++++++++++++++++++++++++++ server/client.go | 2 +- server/replication.go | 4 +-- server/util.go | 8 +++--- 4 files changed, 69 insertions(+), 7 deletions(-) create mode 100644 cmd/ledis-dump/main.go diff --git a/cmd/ledis-dump/main.go b/cmd/ledis-dump/main.go new file mode 100644 index 0000000..bf02ad7 --- /dev/null +++ b/cmd/ledis-dump/main.go @@ -0,0 +1,62 @@ +package main + +import ( + "bufio" + "flag" + "fmt" + "github.com/siddontang/ledisdb/server" + "net" + "os" +) + +var host = flag.String("host", "127.0.0.1", "ledis server host") +var port = flag.Int("port", 6380, "ledis server port") +var sock = flag.String("sock", "", "ledis unix socket domain") +var dumpFile = flag.String("o", "./ledis.dump", "dump file to save") + +var fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync + +func main() { + flag.Parse() + + var c net.Conn + var err error + var f *os.File + + if f, err = os.OpenFile(*dumpFile, os.O_CREATE|os.O_WRONLY, os.ModePerm); err != nil { + println(err.Error()) + return + } + + defer f.Close() + + if len(*sock) != 0 { + c, err = net.Dial("unix", *sock) + } else { + addr := fmt.Sprintf("%s:%d", *host, *port) + c, err = net.Dial("tcp", addr) + } + + if err != nil { + println(err.Error()) + return + } + + defer c.Close() + + println("dump begin") + + if _, err = c.Write(fullSyncCmd); err != nil { + println(err.Error()) + return + } + + rb := bufio.NewReaderSize(c, 16*1024) + + if err = server.ReadBulkTo(rb, f); err != nil { + println(err.Error()) + return + } + + println("dump end") +} diff --git a/server/client.go b/server/client.go index bb4fe0e..11c9f97 100644 --- a/server/client.go +++ b/server/client.go @@ -81,7 +81,7 @@ func (c *client) run() { } func (c *client) readLine() ([]byte, error) { - return readLine(c.rb) + return ReadLine(c.rb) } //A client sends to the Redis server a RESP Array consisting of just Bulk Strings. diff --git a/server/replication.go b/server/replication.go index 6c30968..ee74466 100644 --- a/server/replication.go +++ b/server/replication.go @@ -254,7 +254,7 @@ func (m *master) fullSync() error { defer os.Remove(dumpPath) - err = readBulkTo(m.rb, f) + err = ReadBulkTo(m.rb, f) f.Close() if err != nil { log.Error("read dump data error %s", err.Error()) @@ -291,7 +291,7 @@ func (m *master) sync() error { m.syncBuf.Reset() - err := readBulkTo(m.rb, &m.syncBuf) + err := ReadBulkTo(m.rb, &m.syncBuf) if err != nil { return err } diff --git a/server/util.go b/server/util.go index 9afee4e..49c20ba 100644 --- a/server/util.go +++ b/server/util.go @@ -14,7 +14,7 @@ var ( errLineFormat = errors.New("bad response line format") ) -func readLine(rb *bufio.Reader) ([]byte, error) { +func ReadLine(rb *bufio.Reader) ([]byte, error) { p, err := rb.ReadSlice('\n') if err != nil { @@ -27,8 +27,8 @@ func readLine(rb *bufio.Reader) ([]byte, error) { return p[:i], nil } -func readBulkTo(rb *bufio.Reader, w io.Writer) error { - l, err := readLine(rb) +func ReadBulkTo(rb *bufio.Reader, w io.Writer) error { + l, err := ReadLine(rb) if len(l) == 0 { return errBulkFormat } else if l[0] == '$' { @@ -43,7 +43,7 @@ func readBulkTo(rb *bufio.Reader, w io.Writer) error { return err } - if l, err = readLine(rb); err != nil { + if l, err = ReadLine(rb); err != nil { return err } else if len(l) != 0 { return errBulkFormat From a41b1ef66965013f97cb5d41a5e4754fddcb339e Mon Sep 17 00:00:00 2001 From: siddontang Date: Fri, 11 Jul 2014 13:27:40 +0800 Subject: [PATCH 10/15] refactor, bin rename to bit, rename datatype --- ledis/const.go | 43 ++++++++++++++------- ledis/ledis_db.go | 10 ++--- ledis/{t_bin.go => t_bit.go} | 28 +++++++++----- ledis/{t_bin_test.go => t_bit_test.go} | 0 ledis/t_hash.go | 26 ++++++------- ledis/t_kv.go | 16 ++++---- ledis/t_list.go | 26 ++++++------- ledis/t_ttl.go | 14 +++---- ledis/t_zset.go | 26 ++++++------- server/{cmd_bin.go => cmd_bit.go} | 0 server/{cmd_bin_test.go => cmd_bit_test.go} | 0 11 files changed, 107 insertions(+), 82 deletions(-) rename ledis/{t_bin.go => t_bit.go} (97%) rename ledis/{t_bin_test.go => t_bit_test.go} (100%) rename server/{cmd_bin.go => cmd_bit.go} (100%) rename server/{cmd_bin_test.go => cmd_bit_test.go} (100%) diff --git a/ledis/const.go b/ledis/const.go index ce5a4e6..9b2daaf 100644 --- a/ledis/const.go +++ b/ledis/const.go @@ -5,22 +5,39 @@ import ( ) const ( - noneType byte = 0 - kvType byte = 1 - hashType byte = 2 - hSizeType byte = 3 - listType byte = 4 - lMetaType byte = 5 - zsetType byte = 6 - zSizeType byte = 7 - zScoreType byte = 8 - binType byte = 9 - binMetaType byte = 10 + NoneType byte = 0 + KVType byte = 1 + HashType byte = 2 + HSizeType byte = 3 + ListType byte = 4 + LMetaType byte = 5 + ZSetType byte = 6 + ZSizeType byte = 7 + ZScoreType byte = 8 + BitType byte = 9 + BitMetaType byte = 10 maxDataType byte = 100 - expTimeType byte = 101 - expMetaType byte = 102 + ExpTimeType byte = 101 + ExpMetaType byte = 102 +) + +var ( + TypeName = map[byte]string{ + KVType: "kv", + HashType: "hash", + HSizeType: "hsize", + ListType: "list", + LMetaType: "lmeta", + ZSetType: "zset", + ZSizeType: "zsize", + ZScoreType: "zscore", + BitType: "bit", + BitMetaType: "bitmeta", + ExpTimeType: "exptime", + ExpMetaType: "expmeta", + } ) const ( diff --git a/ledis/ledis_db.go b/ledis/ledis_db.go index 58b93e8..83819ae 100644 --- a/ledis/ledis_db.go +++ b/ledis/ledis_db.go @@ -26,11 +26,11 @@ func (db *DB) FlushAll() (drop int64, err error) { func (db *DB) newEliminator() *elimination { eliminator := newEliminator(db) - eliminator.regRetireContext(kvType, db.kvTx, db.delete) - eliminator.regRetireContext(listType, db.listTx, db.lDelete) - eliminator.regRetireContext(hashType, db.hashTx, db.hDelete) - eliminator.regRetireContext(zsetType, db.zsetTx, db.zDelete) - eliminator.regRetireContext(binType, db.binTx, db.bDelete) + eliminator.regRetireContext(KVType, db.kvTx, db.delete) + eliminator.regRetireContext(ListType, db.listTx, db.lDelete) + eliminator.regRetireContext(HashType, db.hashTx, db.hDelete) + eliminator.regRetireContext(ZSetType, db.zsetTx, db.zDelete) + eliminator.regRetireContext(BitType, db.binTx, db.bDelete) return eliminator } diff --git a/ledis/t_bin.go b/ledis/t_bit.go similarity index 97% rename from ledis/t_bin.go rename to ledis/t_bit.go index 2d62047..4111df8 100644 --- a/ledis/t_bin.go +++ b/ledis/t_bit.go @@ -119,19 +119,27 @@ func (datas segBitInfoArray) Swap(i, j int) { func (db *DB) bEncodeMetaKey(key []byte) []byte { mk := make([]byte, len(key)+2) mk[0] = db.index - mk[1] = binMetaType + mk[1] = BitMetaType copy(mk, key) return mk } +func (db *DB) bDecodeMetaKey(bkey []byte) ([]byte, error) { + if len(bkey) < 2 || bkey[0] != db.index || bkey[1] != BitMetaType { + return nil, errBinKey + } + + return bkey[2:], nil +} + func (db *DB) bEncodeBinKey(key []byte, seq uint32) []byte { bk := make([]byte, len(key)+8) pos := 0 bk[pos] = db.index pos++ - bk[pos] = binType + bk[pos] = BitType pos++ binary.BigEndian.PutUint16(bk[pos:], uint16(len(key))) @@ -355,7 +363,7 @@ func (db *DB) bExpireAt(key []byte, when int64) (int64, error) { if seq, _, err := db.bGetMeta(key); err != nil || seq < 0 { return 0, err } else { - db.expireAt(t, binType, key, when) + db.expireAt(t, BitType, key, when) if err := t.Commit(); err != nil { return 0, err } @@ -407,7 +415,7 @@ func (db *DB) BDelete(key []byte) (drop int64, err error) { defer t.Unlock() drop = db.bDelete(t, key) - db.rmExpire(t, binType, key) + db.rmExpire(t, BitType, key) err = t.Commit() return @@ -736,7 +744,7 @@ func (db *DB) BOperation(op uint8, dstkey []byte, srckeys ...[]byte) (blen int32 // clear the old data in case db.bDelete(t, dstkey) - db.rmExpire(t, binType, dstkey) + db.rmExpire(t, BitType, dstkey) // set data db.bSetMeta(t, dstkey, maxDstSeq, maxDstOff) @@ -786,7 +794,7 @@ func (db *DB) BTTL(key []byte) (int64, error) { return -1, err } - return db.ttl(binType, key) + return db.ttl(BitType, key) } func (db *DB) BPersist(key []byte) (int64, error) { @@ -798,7 +806,7 @@ func (db *DB) BPersist(key []byte) (int64, error) { t.Lock() defer t.Unlock() - n, err := db.rmExpire(t, binType, key) + n, err := db.rmExpire(t, BitType, key) if err != nil { return 0, err } @@ -818,14 +826,14 @@ func (db *DB) bFlush() (drop int64, err error) { minKey := make([]byte, 2) minKey[0] = db.index - minKey[1] = binType + minKey[1] = BitType maxKey := make([]byte, 2) maxKey[0] = db.index - maxKey[1] = binMetaType + 1 + maxKey[1] = BitMetaType + 1 drop, err = db.flushRegion(t, minKey, maxKey) - err = db.expFlush(t, binType) + err = db.expFlush(t, BitType) err = t.Commit() return diff --git a/ledis/t_bin_test.go b/ledis/t_bit_test.go similarity index 100% rename from ledis/t_bin_test.go rename to ledis/t_bit_test.go diff --git a/ledis/t_hash.go b/ledis/t_hash.go index 711d83b..5f6be64 100644 --- a/ledis/t_hash.go +++ b/ledis/t_hash.go @@ -33,14 +33,14 @@ func (db *DB) hEncodeSizeKey(key []byte) []byte { buf := make([]byte, len(key)+2) buf[0] = db.index - buf[1] = hSizeType + buf[1] = HSizeType copy(buf[2:], key) return buf } func (db *DB) hDecodeSizeKey(ek []byte) ([]byte, error) { - if len(ek) < 2 || ek[0] != db.index || ek[1] != hSizeType { + if len(ek) < 2 || ek[0] != db.index || ek[1] != HSizeType { return nil, errHSizeKey } @@ -53,7 +53,7 @@ func (db *DB) hEncodeHashKey(key []byte, field []byte) []byte { pos := 0 buf[pos] = db.index pos++ - buf[pos] = hashType + buf[pos] = HashType pos++ binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) @@ -70,7 +70,7 @@ func (db *DB) hEncodeHashKey(key []byte, field []byte) []byte { } func (db *DB) hDecodeHashKey(ek []byte) ([]byte, []byte, error) { - if len(ek) < 5 || ek[0] != db.index || ek[1] != hashType { + if len(ek) < 5 || ek[0] != db.index || ek[1] != HashType { return nil, nil, errHashKey } @@ -151,7 +151,7 @@ func (db *DB) hExpireAt(key []byte, when int64) (int64, error) { if hlen, err := db.HLen(key); err != nil || hlen == 0 { return 0, err } else { - db.expireAt(t, hashType, key, when) + db.expireAt(t, HashType, key, when) if err := t.Commit(); err != nil { return 0, err } @@ -304,7 +304,7 @@ func (db *DB) hIncrSize(key []byte, delta int64) (int64, error) { if size <= 0 { size = 0 t.Delete(sk) - db.rmExpire(t, hashType, key) + db.rmExpire(t, HashType, key) } else { t.Put(sk, PutInt64(size)) } @@ -428,7 +428,7 @@ func (db *DB) HClear(key []byte) (int64, error) { defer t.Unlock() num := db.hDelete(t, key) - db.rmExpire(t, hashType, key) + db.rmExpire(t, HashType, key) err := t.Commit() return num, err @@ -445,7 +445,7 @@ func (db *DB) HMclear(keys ...[]byte) (int64, error) { } db.hDelete(t, key) - db.rmExpire(t, hashType, key) + db.rmExpire(t, HashType, key) } err := t.Commit() @@ -455,18 +455,18 @@ func (db *DB) HMclear(keys ...[]byte) (int64, error) { func (db *DB) hFlush() (drop int64, err error) { minKey := make([]byte, 2) minKey[0] = db.index - minKey[1] = hashType + minKey[1] = HashType maxKey := make([]byte, 2) maxKey[0] = db.index - maxKey[1] = hSizeType + 1 + maxKey[1] = HSizeType + 1 t := db.kvTx t.Lock() defer t.Unlock() drop, err = db.flushRegion(t, minKey, maxKey) - err = db.expFlush(t, hashType) + err = db.expFlush(t, HashType) err = t.Commit() return @@ -530,7 +530,7 @@ func (db *DB) HTTL(key []byte) (int64, error) { return -1, err } - return db.ttl(hashType, key) + return db.ttl(HashType, key) } func (db *DB) HPersist(key []byte) (int64, error) { @@ -542,7 +542,7 @@ func (db *DB) HPersist(key []byte) (int64, error) { t.Lock() defer t.Unlock() - n, err := db.rmExpire(t, hashType, key) + n, err := db.rmExpire(t, HashType, key) if err != nil { return 0, err } diff --git a/ledis/t_kv.go b/ledis/t_kv.go index b62700b..116aa87 100644 --- a/ledis/t_kv.go +++ b/ledis/t_kv.go @@ -31,13 +31,13 @@ func checkValueSize(value []byte) error { func (db *DB) encodeKVKey(key []byte) []byte { ek := make([]byte, len(key)+2) ek[0] = db.index - ek[1] = kvType + ek[1] = KVType copy(ek[2:], key) return ek } func (db *DB) decodeKVKey(ek []byte) ([]byte, error) { - if len(ek) < 2 || ek[0] != db.index || ek[1] != kvType { + if len(ek) < 2 || ek[0] != db.index || ek[1] != KVType { return nil, errKVKey } @@ -51,7 +51,7 @@ func (db *DB) encodeKVMinKey() []byte { func (db *DB) encodeKVMaxKey() []byte { ek := db.encodeKVKey(nil) - ek[len(ek)-1] = kvType + 1 + ek[len(ek)-1] = KVType + 1 return ek } @@ -100,7 +100,7 @@ func (db *DB) setExpireAt(key []byte, when int64) (int64, error) { if exist, err := db.Exists(key); err != nil || exist == 0 { return 0, err } else { - db.expireAt(t, kvType, key, when) + db.expireAt(t, KVType, key, when) if err := t.Commit(); err != nil { return 0, err } @@ -132,7 +132,7 @@ func (db *DB) Del(keys ...[]byte) (int64, error) { for i, k := range keys { t.Delete(codedKeys[i]) - db.rmExpire(t, kvType, k) + db.rmExpire(t, KVType, k) } err := t.Commit() @@ -317,7 +317,7 @@ func (db *DB) flush() (drop int64, err error) { defer t.Unlock() drop, err = db.flushRegion(t, minKey, maxKey) - err = db.expFlush(t, kvType) + err = db.expFlush(t, KVType) err = t.Commit() return @@ -382,7 +382,7 @@ func (db *DB) TTL(key []byte) (int64, error) { return -1, err } - return db.ttl(kvType, key) + return db.ttl(KVType, key) } func (db *DB) Persist(key []byte) (int64, error) { @@ -393,7 +393,7 @@ func (db *DB) Persist(key []byte) (int64, error) { t := db.kvTx t.Lock() defer t.Unlock() - n, err := db.rmExpire(t, kvType, key) + n, err := db.rmExpire(t, KVType, key) if err != nil { return 0, err } diff --git a/ledis/t_list.go b/ledis/t_list.go index 2f17609..a7ab1ac 100644 --- a/ledis/t_list.go +++ b/ledis/t_list.go @@ -23,14 +23,14 @@ var errListSeq = errors.New("invalid list sequence, overflow") func (db *DB) lEncodeMetaKey(key []byte) []byte { buf := make([]byte, len(key)+2) buf[0] = db.index - buf[1] = lMetaType + buf[1] = LMetaType copy(buf[2:], key) return buf } func (db *DB) lDecodeMetaKey(ek []byte) ([]byte, error) { - if len(ek) < 2 || ek[0] != db.index || ek[1] != lMetaType { + if len(ek) < 2 || ek[0] != db.index || ek[1] != LMetaType { return nil, errLMetaKey } @@ -43,7 +43,7 @@ func (db *DB) lEncodeListKey(key []byte, seq int32) []byte { pos := 0 buf[pos] = db.index pos++ - buf[pos] = listType + buf[pos] = ListType pos++ binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) @@ -58,7 +58,7 @@ func (db *DB) lEncodeListKey(key []byte, seq int32) []byte { } func (db *DB) lDecodeListKey(ek []byte) (key []byte, seq int32, err error) { - if len(ek) < 8 || ek[0] != db.index || ek[1] != listType { + if len(ek) < 8 || ek[0] != db.index || ek[1] != ListType { err = errListKey return } @@ -175,7 +175,7 @@ func (db *DB) lpop(key []byte, whereSeq int32) ([]byte, error) { t.Delete(itemKey) size := db.lSetMeta(metaKey, headSeq, tailSeq) if size == 0 { - db.rmExpire(t, hashType, key) + db.rmExpire(t, HashType, key) } err = t.Commit() @@ -264,7 +264,7 @@ func (db *DB) lExpireAt(key []byte, when int64) (int64, error) { if llen, err := db.LLen(key); err != nil || llen == 0 { return 0, err } else { - db.expireAt(t, listType, key, when) + db.expireAt(t, ListType, key, when) if err := t.Commit(); err != nil { return 0, err } @@ -398,7 +398,7 @@ func (db *DB) LClear(key []byte) (int64, error) { defer t.Unlock() num := db.lDelete(t, key) - db.rmExpire(t, listType, key) + db.rmExpire(t, ListType, key) err := t.Commit() return num, err @@ -415,7 +415,7 @@ func (db *DB) LMclear(keys ...[]byte) (int64, error) { } db.lDelete(t, key) - db.rmExpire(t, listType, key) + db.rmExpire(t, ListType, key) } @@ -426,18 +426,18 @@ func (db *DB) LMclear(keys ...[]byte) (int64, error) { func (db *DB) lFlush() (drop int64, err error) { minKey := make([]byte, 2) minKey[0] = db.index - minKey[1] = listType + minKey[1] = ListType maxKey := make([]byte, 2) maxKey[0] = db.index - maxKey[1] = lMetaType + 1 + maxKey[1] = LMetaType + 1 t := db.listTx t.Lock() defer t.Unlock() drop, err = db.flushRegion(t, minKey, maxKey) - err = db.expFlush(t, listType) + err = db.expFlush(t, ListType) err = t.Commit() return @@ -464,7 +464,7 @@ func (db *DB) LTTL(key []byte) (int64, error) { return -1, err } - return db.ttl(listType, key) + return db.ttl(ListType, key) } func (db *DB) LPersist(key []byte) (int64, error) { @@ -476,7 +476,7 @@ func (db *DB) LPersist(key []byte) (int64, error) { t.Lock() defer t.Unlock() - n, err := db.rmExpire(t, listType, key) + n, err := db.rmExpire(t, ListType, key) if err != nil { return 0, err } diff --git a/ledis/t_ttl.go b/ledis/t_ttl.go index f1614bd..aa34b73 100644 --- a/ledis/t_ttl.go +++ b/ledis/t_ttl.go @@ -26,7 +26,7 @@ func (db *DB) expEncodeTimeKey(dataType byte, key []byte, when int64) []byte { buf := make([]byte, len(key)+11) buf[0] = db.index - buf[1] = expTimeType + buf[1] = ExpTimeType buf[2] = dataType pos := 3 @@ -42,7 +42,7 @@ func (db *DB) expEncodeMetaKey(dataType byte, key []byte) []byte { buf := make([]byte, len(key)+3) buf[0] = db.index - buf[1] = expMetaType + buf[1] = ExpMetaType buf[2] = dataType pos := 3 @@ -52,7 +52,7 @@ func (db *DB) expEncodeMetaKey(dataType byte, key []byte) []byte { } func (db *DB) expDecodeMetaKey(mk []byte) (byte, []byte, error) { - if len(mk) <= 3 || mk[0] != db.index || mk[1] != expMetaType { + if len(mk) <= 3 || mk[0] != db.index || mk[1] != ExpMetaType { return 0, nil, errExpMetaKey } @@ -60,7 +60,7 @@ func (db *DB) expDecodeMetaKey(mk []byte) (byte, []byte, error) { } func (db *DB) expDecodeTimeKey(tk []byte) (byte, []byte, int64, error) { - if len(tk) < 11 || tk[0] != db.index || tk[1] != expTimeType { + if len(tk) < 11 || tk[0] != db.index || tk[1] != ExpTimeType { return 0, nil, 0, errExpTimeKey } @@ -114,12 +114,12 @@ func (db *DB) rmExpire(t *tx, dataType byte, key []byte) (int64, error) { func (db *DB) expFlush(t *tx, dataType byte) (err error) { minKey := make([]byte, 3) minKey[0] = db.index - minKey[1] = expTimeType + minKey[1] = ExpTimeType minKey[2] = dataType maxKey := make([]byte, 3) maxKey[0] = db.index - maxKey[1] = expMetaType + maxKey[1] = ExpMetaType maxKey[2] = dataType + 1 _, err = db.flushRegion(t, minKey, maxKey) @@ -153,7 +153,7 @@ func (eli *elimination) active() { db := eli.db dbGet := db.db.Get - minKey := db.expEncodeTimeKey(noneType, nil, 0) + minKey := db.expEncodeTimeKey(NoneType, nil, 0) maxKey := db.expEncodeTimeKey(maxDataType, nil, now) it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) diff --git a/ledis/t_zset.go b/ledis/t_zset.go index 9a2f105..4931ded 100644 --- a/ledis/t_zset.go +++ b/ledis/t_zset.go @@ -45,14 +45,14 @@ func checkZSetKMSize(key []byte, member []byte) error { func (db *DB) zEncodeSizeKey(key []byte) []byte { buf := make([]byte, len(key)+2) buf[0] = db.index - buf[1] = zSizeType + buf[1] = ZSizeType copy(buf[2:], key) return buf } func (db *DB) zDecodeSizeKey(ek []byte) ([]byte, error) { - if len(ek) < 2 || ek[0] != db.index || ek[1] != zSizeType { + if len(ek) < 2 || ek[0] != db.index || ek[1] != ZSizeType { return nil, errZSizeKey } @@ -66,7 +66,7 @@ func (db *DB) zEncodeSetKey(key []byte, member []byte) []byte { buf[pos] = db.index pos++ - buf[pos] = zsetType + buf[pos] = ZSetType pos++ binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) @@ -84,7 +84,7 @@ func (db *DB) zEncodeSetKey(key []byte, member []byte) []byte { } func (db *DB) zDecodeSetKey(ek []byte) ([]byte, []byte, error) { - if len(ek) < 5 || ek[0] != db.index || ek[1] != zsetType { + if len(ek) < 5 || ek[0] != db.index || ek[1] != ZSetType { return nil, nil, errZSetKey } @@ -121,7 +121,7 @@ func (db *DB) zEncodeScoreKey(key []byte, member []byte, score int64) []byte { buf[pos] = db.index pos++ - buf[pos] = zScoreType + buf[pos] = ZScoreType pos++ binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) @@ -158,7 +158,7 @@ func (db *DB) zEncodeStopScoreKey(key []byte, score int64) []byte { } func (db *DB) zDecodeScoreKey(ek []byte) (key []byte, member []byte, score int64, err error) { - if len(ek) < 14 || ek[0] != db.index || ek[1] != zScoreType { + if len(ek) < 14 || ek[0] != db.index || ek[1] != ZScoreType { err = errZScoreKey return } @@ -260,7 +260,7 @@ func (db *DB) zExpireAt(key []byte, when int64) (int64, error) { if zcnt, err := db.ZCard(key); err != nil || zcnt == 0 { return 0, err } else { - db.expireAt(t, zsetType, key, when) + db.expireAt(t, ZSetType, key, when) if err := t.Commit(); err != nil { return 0, err } @@ -314,7 +314,7 @@ func (db *DB) zIncrSize(t *tx, key []byte, delta int64) (int64, error) { if size <= 0 { size = 0 t.Delete(sk) - db.rmExpire(t, zsetType, key) + db.rmExpire(t, ZSetType, key) } else { t.Put(sk, PutInt64(size)) } @@ -734,11 +734,11 @@ func (db *DB) zFlush() (drop int64, err error) { minKey := make([]byte, 2) minKey[0] = db.index - minKey[1] = zsetType + minKey[1] = ZSetType maxKey := make([]byte, 2) maxKey[0] = db.index - maxKey[1] = zScoreType + 1 + maxKey[1] = ZScoreType + 1 it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) defer it.Close() @@ -753,7 +753,7 @@ func (db *DB) zFlush() (drop int64, err error) { } } - db.expFlush(t, zsetType) + db.expFlush(t, ZSetType) err = t.Commit() return @@ -819,7 +819,7 @@ func (db *DB) ZTTL(key []byte) (int64, error) { return -1, err } - return db.ttl(zsetType, key) + return db.ttl(ZSetType, key) } func (db *DB) ZPersist(key []byte) (int64, error) { @@ -831,7 +831,7 @@ func (db *DB) ZPersist(key []byte) (int64, error) { t.Lock() defer t.Unlock() - n, err := db.rmExpire(t, zsetType, key) + n, err := db.rmExpire(t, ZSetType, key) if err != nil { return 0, err } diff --git a/server/cmd_bin.go b/server/cmd_bit.go similarity index 100% rename from server/cmd_bin.go rename to server/cmd_bit.go diff --git a/server/cmd_bin_test.go b/server/cmd_bit_test.go similarity index 100% rename from server/cmd_bin_test.go rename to server/cmd_bit_test.go From eec14806d2d3ebb283313f916c1503ba474eb0fa Mon Sep 17 00:00:00 2001 From: siddontang Date: Fri, 11 Jul 2014 13:28:34 +0800 Subject: [PATCH 11/15] add support for read and print bin log --- ledis/binlog_util.go | 148 +++++++++++++++++++++++++++++++++++++++++++ ledis/replication.go | 25 ++++++-- 2 files changed, 169 insertions(+), 4 deletions(-) diff --git a/ledis/binlog_util.go b/ledis/binlog_util.go index bc1cd63..766d3e6 100644 --- a/ledis/binlog_util.go +++ b/ledis/binlog_util.go @@ -3,6 +3,8 @@ package ledis import ( "encoding/binary" "errors" + "fmt" + "strconv" ) var ( @@ -60,3 +62,149 @@ func encodeBinLogCommand(commandType uint8, args ...[]byte) []byte { func decodeBinLogCommand(sz []byte) (uint8, [][]byte, error) { return 0, nil, errBinLogCommandType } + +func FormatBinLogEvent(event []byte) (string, error) { + logType := uint8(event[0]) + + var err error + var k []byte + var v []byte + + var buf []byte = make([]byte, 0, 1024) + + switch logType { + case BinLogTypePut: + k, v, err = decodeBinLogPut(event) + buf = append(buf, "PUT "...) + case BinLogTypeDeletion: + k, err = decodeBinLogDelete(event) + buf = append(buf, "DELETE "...) + default: + err = errInvalidBinLogEvent + } + + if err != nil { + return "", err + } + + if buf, err = formatDataKey(buf, k); err != nil { + return "", err + } + + if v != nil && len(v) != 0 { + buf = append(buf, fmt.Sprintf(" %q", v)...) + } + + return String(buf), nil +} + +func formatDataKey(buf []byte, k []byte) ([]byte, error) { + if len(k) < 2 { + return nil, errInvalidBinLogEvent + } + + buf = append(buf, fmt.Sprintf("DB:%2d ", k[0])...) + buf = append(buf, fmt.Sprintf("%s ", TypeName[k[1]])...) + + db := new(DB) + db.index = k[0] + + //to do format at respective place + + switch k[1] { + case KVType: + if key, err := db.decodeKVKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case HashType: + if key, field, err := db.hDecodeHashKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(field)) + } + case HSizeType: + if key, err := db.hDecodeSizeKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case ListType: + if key, seq, err := db.lDecodeListKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendInt(buf, int64(seq), 10) + } + case LMetaType: + if key, err := db.lDecodeMetaKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case ZSetType: + if key, m, err := db.zDecodeSetKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(m)) + } + case ZSizeType: + if key, err := db.zDecodeSizeKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case ZScoreType: + if key, m, score, err := db.zDecodeScoreKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(m)) + buf = append(buf, ' ') + buf = strconv.AppendInt(buf, score, 10) + } + case BitType: + if key, seq, err := db.bDecodeBinKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendUint(buf, uint64(seq), 10) + } + case BitMetaType: + if key, err := db.bDecodeMetaKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case ExpTimeType: + if tp, key, t, err := db.expDecodeTimeKey(k); err != nil { + return nil, err + } else { + buf = append(buf, TypeName[tp]...) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendInt(buf, t, 10) + } + case ExpMetaType: + if tp, key, err := db.expDecodeMetaKey(k); err != nil { + return nil, err + } else { + buf = append(buf, TypeName[tp]...) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(key)) + } + default: + return nil, errInvalidBinLogEvent + } + + return buf, nil +} diff --git a/ledis/replication.go b/ledis/replication.go index e19da6a..bd6c192 100644 --- a/ledis/replication.go +++ b/ledis/replication.go @@ -10,6 +10,10 @@ import ( "os" ) +var ( + ErrSkipEvent = errors.New("skip to next event") +) + var ( errInvalidBinLogEvent = errors.New("invalid binglog event") errInvalidBinLogFile = errors.New("invalid binlog file") @@ -71,7 +75,7 @@ func (l *Ledis) replicateCommandEvent(event []byte) error { return errors.New("command event not supported now") } -func (l *Ledis) ReplicateFromReader(rb io.Reader) error { +func ReadEventFromReader(rb io.Reader, f func(createTime uint32, event []byte) error) error { var createTime uint32 var dataLen uint32 var dataBuf bytes.Buffer @@ -94,9 +98,9 @@ func (l *Ledis) ReplicateFromReader(rb io.Reader) error { return err } - err = l.ReplicateEvent(dataBuf.Bytes()) - if err != nil { - log.Fatal("replication error %s, skip to next", err.Error()) + err = f(createTime, dataBuf.Bytes()) + if err != nil && err != ErrSkipEvent { + return err } dataBuf.Reset() @@ -105,6 +109,19 @@ func (l *Ledis) ReplicateFromReader(rb io.Reader) error { return nil } +func (l *Ledis) ReplicateFromReader(rb io.Reader) error { + f := func(createTime uint32, event []byte) error { + err := l.ReplicateEvent(event) + if err != nil { + log.Fatal("replication error %s, skip to next", err.Error()) + return ErrSkipEvent + } + return nil + } + + return ReadEventFromReader(rb, f) +} + func (l *Ledis) ReplicateFromData(data []byte) error { rb := bytes.NewReader(data) From 31320944e6f6afe08176d881a871982dde211479 Mon Sep 17 00:00:00 2001 From: siddontang Date: Fri, 11 Jul 2014 13:28:59 +0800 Subject: [PATCH 12/15] add handle binlog tool --- cmd/ledis-binlog/main.go | 85 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 cmd/ledis-binlog/main.go diff --git a/cmd/ledis-binlog/main.go b/cmd/ledis-binlog/main.go new file mode 100644 index 0000000..7212b0e --- /dev/null +++ b/cmd/ledis-binlog/main.go @@ -0,0 +1,85 @@ +package main + +import ( + "bufio" + "flag" + "fmt" + "github.com/siddontang/ledisdb/ledis" + "os" + "time" +) + +var TimeFormat = "2006-01-02 15:04:05" + +var startDateTime = flag.String("start-datetime", "", + "Start reading the binary log at the first event having a timestamp equal to or later than the datetime argument.") +var stopDateTime = flag.String("stop-datetime", "", + "Stop reading the binary log at the first event having a timestamp equal to or earlier than the datetime argument.") + +var startTime uint32 = 0 +var stopTime uint32 = 0xFFFFFFFF + +func main() { + flag.Usage = func() { + fmt.Fprintf(os.Stderr, "Usage of %s [options] log_file\n", os.Args[0]) + flag.PrintDefaults() + } + + flag.Parse() + + logFile := flag.Arg(0) + f, err := os.Open(logFile) + if err != nil { + println(err.Error()) + return + } + defer f.Close() + + var t time.Time + + if len(*startDateTime) > 0 { + if t, err = time.Parse(TimeFormat, *startDateTime); err != nil { + println("parse start-datetime error: ", err.Error()) + return + } + + startTime = uint32(t.Unix()) + } + + if len(*stopDateTime) > 0 { + if t, err = time.Parse(TimeFormat, *stopDateTime); err != nil { + println("parse stop-datetime error: ", err.Error()) + return + } + + stopTime = uint32(t.Unix()) + } + + rb := bufio.NewReaderSize(f, 4096) + err = ledis.ReadEventFromReader(rb, printEvent) + if err != nil { + println("read event error: ", err.Error()) + return + } +} + +func printEvent(createTime uint32, event []byte) error { + if createTime < startTime || createTime > stopTime { + return nil + } + + t := time.Unix(int64(createTime), 0) + + fmt.Printf("%s ", t.Format(TimeFormat)) + + s, err := ledis.FormatBinLogEvent(event) + if err != nil { + fmt.Printf("%s", err.Error()) + } else { + fmt.Printf(s) + } + + fmt.Printf("\n") + + return nil +} From ac5c64aab8881729c48a9a035377eee60f7f8259 Mon Sep 17 00:00:00 2001 From: siddontang Date: Fri, 11 Jul 2014 16:57:19 +0800 Subject: [PATCH 13/15] add MasterInfo for replication --- server/replication.go | 140 ++++++++++++++++++++++-------------------- 1 file changed, 73 insertions(+), 67 deletions(-) diff --git a/server/replication.go b/server/replication.go index ee74466..4d185a1 100644 --- a/server/replication.go +++ b/server/replication.go @@ -23,13 +23,55 @@ var ( errConnectMaster = errors.New("connect master error") ) +type MasterInfo struct { + Addr string `json:"addr"` + LogFileIndex int64 `json:"log_file_index"` + LogPos int64 `json:"log_pos"` +} + +func (m *MasterInfo) Save(filePath string) error { + data, err := json.Marshal(m) + if err != nil { + return err + } + + filePathBak := fmt.Sprintf("%s.bak", filePath) + + var fd *os.File + fd, err = os.OpenFile(filePathBak, os.O_CREATE|os.O_WRONLY, os.ModePerm) + if err != nil { + return err + } + + if _, err = fd.Write(data); err != nil { + fd.Close() + return err + } + + fd.Close() + return os.Rename(filePathBak, filePath) +} + +func (m *MasterInfo) Load(filePath string) error { + data, err := ioutil.ReadFile(filePath) + if err != nil { + if os.IsNotExist(err) { + return nil + } else { + return err + } + } + + if err = json.Unmarshal(data, m); err != nil { + return err + } + + return nil +} + type master struct { sync.Mutex - addr string `json:"addr"` - logFileIndex int64 `json:"log_file_index"` - logPos int64 `json:"log_pos"` - c net.Conn rb *bufio.Reader @@ -37,8 +79,9 @@ type master struct { quit chan struct{} - infoName string - infoNameBak string + infoName string + + info *MasterInfo wg sync.WaitGroup @@ -52,12 +95,13 @@ func newMaster(app *App) *master { m.app = app m.infoName = path.Join(m.app.cfg.DataDir, "master.info") - m.infoNameBak = fmt.Sprintf("%s.bak", m.infoName) m.quit = make(chan struct{}, 1) m.compressBuf = make([]byte, 256) + m.info = new(MasterInfo) + //if load error, we will start a fullsync later m.loadInfo() @@ -79,53 +123,15 @@ func (m *master) Close() { } func (m *master) loadInfo() error { - data, err := ioutil.ReadFile(m.infoName) - if err != nil { - if os.IsNotExist(err) { - return nil - } else { - return err - } - } - - if err = json.Unmarshal(data, m); err != nil { - return err - } - - return nil + return m.info.Load(m.infoName) } func (m *master) saveInfo() error { - data, err := json.Marshal(struct { - Addr string `json:"addr"` - LogFileIndex int64 `json:"log_file_index"` - LogPos int64 `json:"log_pos"` - }{ - m.addr, - m.logFileIndex, - m.logPos, - }) - if err != nil { - return err - } - - var fd *os.File - fd, err = os.OpenFile(m.infoNameBak, os.O_CREATE|os.O_WRONLY, os.ModePerm) - if err != nil { - return err - } - - if _, err = fd.Write(data); err != nil { - fd.Close() - return err - } - - fd.Close() - return os.Rename(m.infoNameBak, m.infoName) + return m.info.Save(m.infoName) } func (m *master) connect() error { - if len(m.addr) == 0 { + if len(m.info.Addr) == 0 { return fmt.Errorf("no assign master addr") } @@ -134,7 +140,7 @@ func (m *master) connect() error { m.c = nil } - if c, err := net.Dial("tcp", m.addr); err != nil { + if c, err := net.Dial("tcp", m.info.Addr); err != nil { return err } else { m.c = c @@ -145,9 +151,9 @@ func (m *master) connect() error { } func (m *master) resetInfo(addr string) { - m.addr = addr - m.logFileIndex = 0 - m.logPos = 0 + m.info.Addr = addr + m.info.LogFileIndex = 0 + m.info.LogPos = 0 } func (m *master) stopReplication() error { @@ -165,7 +171,7 @@ func (m *master) startReplication(masterAddr string) error { //stop last replcation, if avaliable m.Close() - if masterAddr != m.addr { + if masterAddr != m.info.Addr { m.resetInfo(masterAddr) if err := m.saveInfo(); err != nil { log.Error("save master info error %s", err.Error()) @@ -189,20 +195,20 @@ func (m *master) runReplication() { return default: if err := m.connect(); err != nil { - log.Error("connect master %s error %s, try 2s later", m.addr, err.Error()) + log.Error("connect master %s error %s, try 2s later", m.info.Addr, err.Error()) time.Sleep(2 * time.Second) continue } } - if m.logFileIndex == 0 { + if m.info.LogFileIndex == 0 { //try a fullsync if err := m.fullSync(); err != nil { log.Warn("full sync error %s", err.Error()) return } - if m.logFileIndex == 0 { + if m.info.LogFileIndex == 0 { //master not support binlog, we cannot sync, so stop replication m.stopReplication() return @@ -211,14 +217,14 @@ func (m *master) runReplication() { for { for { - lastIndex := m.logFileIndex - lastPos := m.logPos + lastIndex := m.info.LogFileIndex + lastPos := m.info.LogPos if err := m.sync(); err != nil { log.Warn("sync error %s", err.Error()) return } - if m.logFileIndex == lastIndex && m.logPos == lastPos { + if m.info.LogFileIndex == lastIndex && m.info.LogPos == lastPos { //sync no data, wait 1s and retry break } @@ -273,15 +279,15 @@ func (m *master) fullSync() error { return err } - m.logFileIndex = head.LogFileIndex - m.logPos = head.LogPos + m.info.LogFileIndex = head.LogFileIndex + m.info.LogPos = head.LogPos return m.saveInfo() } func (m *master) sync() error { - logIndexStr := strconv.FormatInt(m.logFileIndex, 10) - logPosStr := strconv.FormatInt(m.logPos, 10) + logIndexStr := strconv.FormatInt(m.info.LogFileIndex, 10) + logPosStr := strconv.FormatInt(m.info.LogPos, 10) cmd := ledis.Slice(fmt.Sprintf(syncCmdFormat, len(logIndexStr), logIndexStr, len(logPosStr), logPosStr)) @@ -308,14 +314,14 @@ func (m *master) sync() error { return fmt.Errorf("invalid sync data len %d", len(buf)) } - m.logFileIndex = int64(binary.BigEndian.Uint64(buf[0:8])) - m.logPos = int64(binary.BigEndian.Uint64(buf[8:16])) + m.info.LogFileIndex = int64(binary.BigEndian.Uint64(buf[0:8])) + m.info.LogPos = int64(binary.BigEndian.Uint64(buf[8:16])) - if m.logFileIndex == 0 { + if m.info.LogFileIndex == 0 { //master now not support binlog, stop replication m.stopReplication() return nil - } else if m.logFileIndex == -1 { + } else if m.info.LogFileIndex == -1 { //-1 means than binlog index and pos are lost, we must start a full sync instead return m.fullSync() } From 31166efd5ca3b9d16c6ad473bae6d727fad495fd Mon Sep 17 00:00:00 2001 From: siddontang Date: Fri, 11 Jul 2014 16:57:25 +0800 Subject: [PATCH 14/15] add ledis-load --- cmd/ledis-load/main.go | 86 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 cmd/ledis-load/main.go diff --git a/cmd/ledis-load/main.go b/cmd/ledis-load/main.go new file mode 100644 index 0000000..da1fad3 --- /dev/null +++ b/cmd/ledis-load/main.go @@ -0,0 +1,86 @@ +package main + +import ( + "encoding/json" + "flag" + "github.com/siddontang/ledisdb/ledis" + "github.com/siddontang/ledisdb/server" + "io/ioutil" + "path" +) + +var configPath = flag.String("config", "/etc/ledis.json", "ledisdb config file") +var dumpPath = flag.String("dump_file", "", "ledisdb dump file") +var masterAddr = flag.String("master_addr", "", + "master addr to set where dump file comes from, if not set correctly, next slaveof may cause fullsync") + +func main() { + flag.Parse() + + if len(*configPath) == 0 { + println("need ledis config file") + return + } + + data, err := ioutil.ReadFile(*configPath) + if err != nil { + println(err.Error()) + return + } + + if len(*dumpPath) == 0 { + println("need dump file") + return + } + + var cfg ledis.Config + if err = json.Unmarshal(data, &cfg); err != nil { + println(err.Error()) + return + } + + if len(cfg.DataDir) == 0 { + println("must set data dir") + return + } + + ldb, err := ledis.Open(&cfg) + if err != nil { + println("ledis open error ", err.Error()) + return + } + + err = loadDump(&cfg, ldb) + ldb.Close() + + if err != nil { + println(err.Error()) + return + } + + println("Load OK") +} + +func loadDump(cfg *ledis.Config, ldb *ledis.Ledis) error { + var err error + if err = ldb.FlushAll(); err != nil { + return err + } + + var head *ledis.MasterInfo + head, err = ldb.LoadDumpFile(*dumpPath) + + if err != nil { + return err + } + + info := new(server.MasterInfo) + + info.Addr = *masterAddr + info.LogFileIndex = head.LogFileIndex + info.LogPos = head.LogPos + + infoFile := path.Join(cfg.DataDir, "master.info") + + return info.Save(infoFile) +} From 8abf1be6cda9f75ea7eeca91906864a2f482dce4 Mon Sep 17 00:00:00 2001 From: siddontang Date: Fri, 11 Jul 2014 17:30:40 +0800 Subject: [PATCH 15/15] update read me --- README.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/README.md b/README.md index 123e413..6d7f61f 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,17 @@ Ledisdb is a high performance NoSQL like Redis based on LevelDB written by go. It supports some advanced data structure like kv, list, hash and zset, and may be alternative for Redis. +## Features + ++ Rich advanced data structure: KV, List, Hash, ZSet, Bit. ++ Uses leveldb to store lots of data, over the memory limit. ++ Supports expiration and ttl. ++ Redis clients, like redis-cli, are supported directly. ++ Multi client API supports, including Golang, Python, Lua(Openresty). ++ Easily to embed in Golang application. ++ Replication to guarantee data safe. ++ Supplies tools to load, dump, repair database. + ## Build and Install + Create a workspace and checkout ledisdb source