diff --git a/README.md b/README.md index 9943c33..6d7f61f 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,17 @@ -# ledisdb +# LedisDB -Ledisdb is a high performance nosql like redis based on leveldb written by go. It's supports some advanced data structure like kv, list, hash and zset. +Ledisdb is a high performance NoSQL like Redis based on LevelDB written by go. It supports some advanced data structure like kv, list, hash and zset, and may be alternative for Redis. + +## Features + ++ Rich advanced data structure: KV, List, Hash, ZSet, Bit. ++ Uses leveldb to store lots of data, over the memory limit. ++ Supports expiration and ttl. ++ Redis clients, like redis-cli, are supported directly. ++ Multi client API supports, including Golang, Python, Lua(Openresty). ++ Easily to embed in Golang application. ++ Replication to guarantee data safe. ++ Supplies tools to load, dump, repair database. ## Build and Install diff --git a/README_CN.md b/README_CN.md deleted file mode 100644 index 8faade3..0000000 --- a/README_CN.md +++ /dev/null @@ -1,94 +0,0 @@ -# ledisdb - -ledisdb是一个用go实现的类似redis的高性能nosql数据库,底层基于leveldb实现。提供了kv,list,hash以及zset几种数据结构的支持。 - -最开始源于[ssdb](https://github.com/ideawu/ssdb),在使用了一段时间之后,因为兴趣的原因,决定用go实现一个。 - -## 编译 - -+ 创建一个工作目录,并check ledisdb源码 - - mkdir $WORKSPACE - cd $WORKSPACE - git clone git@github.com:siddontang/ledisdb.git src/github.com/siddontang/ledisdb - - cd src/github.com/siddontang/ledisdb - -+ 安装leveldb以及snappy,如果你已经安装,忽略 - - 我提供了一个简单的脚本进行leveldb的安装,你可以直接在shell中输入: - - sh build_leveldb.sh - - 默认该脚本会将leveldb以及snappy安装到/usr/local/leveldb以及/usr/local/snappy目录 - -+ 在dev.sh里面设置LEVELDB_DIR以及SNAPPY_DIR为实际的安装路径,默认为/usr/local/leveldb以及/usr/local/snappy - -+ 运行bootstrap.sh构建ledisdb go的依赖库 - - . ./bootstap.sh 或者 source ./bootstrap.sh - -+ 运行dev.sh - - . ./dev.sh 或者 source ./dev.sh - -+ 编译安装ledisdb - - go install ./... - -## 运行 - - ./ledis-server -config=/etc/ledis.json - - //another shell - ledis-cli -p 6380 - - ledis 127.0.0.1:6380> set a 1 - OK - ledis 127.0.0.1:6380> get a - "1" - -## 嵌入库 - - import "github.com/siddontang/ledisdb/ledis" - l, _ := ledis.Open(cfg) - db, _ := l.Select(0) - - db.Set(key, value) - - db.Get(key) - -## Benchmark - -可以通过查看benchmark.md获取最新的性能测试结果 - -## Replication - -通过配置或者运行时输入slaveof开启slave的replication功能 - - ledis-cli -p 6381 - - ledis 127.0.0.1:6381> slaveof 127.0.0.1:6380 - OK - -## Todo - -+ Admin - -## GoDoc - -[![GoDoc](https://godoc.org/github.com/siddontang/ledisdb?status.png)](https://godoc.org/github.com/siddontang/ledisdb) - -## Commands - -一些命令的解释在[这里](https://github.com/siddontang/ledisdb/wiki/Commands), 后续会不断加入。 - -## 感谢 - -Gmail: cenqichao@gmail.com - -Gmail: chendahui007@gmail.com - -## 联系我 - -Gmail: siddontang@gmail.com \ No newline at end of file diff --git a/client/ledis-py/README.md b/client/ledis-py/README.md index 616e911..23549d0 100644 --- a/client/ledis-py/README.md +++ b/client/ledis-py/README.md @@ -47,5 +47,46 @@ For full API reference, please visit [rtfd](http://ledis-py.readthedocs.org/). ### Connection Pools -### Connnections +Behind the scenes, ledis-py uses a connection pool to manage connections to a Ledis server. By default, each Ledis instance you create will in turn create its own connection pool. You can override this behavior and use an existing connection pool by passing an already created connection pool instance to the connection_pool argument of the Ledis class. You may choose to do this in order to implement client side sharding or have finer grain control of how connections are managed. +``` +>>> pool = ledis.ConnectionPool(host='localhost', port=6380, db=0) +>>> l = ledis.Ledis(connection_pool=pool) +``` + +### Connections + +ConnectionPools manage a set of Connection instances. ledis-py ships with two types of Connections. The default, Connection, is a normal TCP socket based connection. The UnixDomainSocketConnection allows for clients running on the same device as the server to connect via a unix domain socket. To use a UnixDomainSocketConnection connection, simply pass the unix_socket_path argument, which is a string to the unix domain socket file. Additionally, make sure the unixsocket parameter is defined in your `ledis.json` file. e.g.: + +``` +{ + "addr": "/tmp/ledis.sock", + ... +} +``` + +``` +>>> l = ledis.Ledis(unix_socket_path='/tmp/ledis.sock') +``` + +You can create your own Connection subclasses as well. This may be useful if you want to control the socket behavior within an async framework. To instantiate a client class using your own connection, you need to create a connection pool, passing your class to the connection_class argument. Other keyword parameters your pass to the pool will be passed to the class specified during initialization. + +``` +>>> pool = ledis.ConnectionPool(connection_class=YourConnectionClass, + your_arg='...', ...) +``` + +e.g.: + +``` +>>> from ledis import UnixDomainSocketConnection +>>> pool = ledis.ConnectionPool(connection_class=UnixDomainSocketConnection, path='/tmp/ledis.sock') +``` + +## Response Callbacks + +The client class uses a set of callbacks to cast Ledis responses to the appropriate Python type. There are a number of these callbacks defined on the Ledis client class in a dictionary called RESPONSE_CALLBACKS. + +Custom callbacks can be added on a per-instance basis using the `set_response_callback` method. This method accepts two arguments: a command name and the callback. Callbacks added in this manner are only valid on the instance the callback is added to. If you want to define or override a callback globally, you should make a subclass of the Ledis client and add your callback to its RESPONSE_CALLBACKS class dictionary. + +Response callbacks take at least one parameter: the response from the Ledis server. Keyword arguments may also be accepted in order to further control how to interpret the response. These keyword arguments are specified during the command's call to execute_command. The ZRANGE implementation demonstrates the use of response callback keyword arguments with its "withscores" argument. \ No newline at end of file diff --git a/client/ledis-py/ledis/client.py b/client/ledis-py/ledis/client.py index c159b8f..a855e84 100644 --- a/client/ledis-py/ledis/client.py +++ b/client/ledis-py/ledis/client.py @@ -112,14 +112,14 @@ class Ledis(object): For example:: - redis://[:password]@localhost:6380/0 - unix://[:password]@/path/to/socket.sock?db=0 + ledis://localhost:6380/0 + unix:///path/to/socket.sock?db=0 There are several ways to specify a database number. The parse function will return the first specified option: - 1. A ``db`` querystring option, e.g. redis://localhost?db=0 - 2. If using the redis:// scheme, the path argument of the url, e.g. - redis://localhost/0 + 1. A ``db`` querystring option, e.g. ledis://localhost?db=0 + 2. If using the ledis:// scheme, the path argument of the url, e.g. + ledis://localhost/0 3. The ``db`` argument to this function. If none of these options are specified, db=0 is used. diff --git a/client/ledis-py/ledis/connection.py b/client/ledis-py/ledis/connection.py index 18cf3b3..4a39317 100644 --- a/client/ledis-py/ledis/connection.py +++ b/client/ledis-py/ledis/connection.py @@ -145,7 +145,7 @@ DefaultParser = PythonParser class Connection(object): "Manages TCP communication to and from a Ledis server" - def __init__(self, host='localhost', port=6380, db=0, password=None, + def __init__(self, host='localhost', port=6380, db=0, socket_timeout=None, encoding='utf-8', encoding_errors='strict', decode_responses=False, parser_class=DefaultParser): @@ -153,7 +153,6 @@ class Connection(object): self.host = host self.port = port self.db = db - self.password = password self.socket_timeout = socket_timeout self.encoding = encoding self.encoding_errors = encoding_errors @@ -201,12 +200,6 @@ class Connection(object): "Initialize the connection, authenticate and select a database" self._parser.on_connect(self) - # if a password is specified, authenticate - if self.password: - self.send_command('AUTH', self.password) - if nativestr(self.read_response()) != 'OK': - raise AuthenticationError('Invalid Password') - # if a database is specified, switch to it if self.db: self.send_command('SELECT', self.db) @@ -283,14 +276,12 @@ class Connection(object): class UnixDomainSocketConnection(Connection): - def __init__(self, path='', db=0, password=None, - socket_timeout=None, encoding='utf-8', + def __init__(self, path='', db=0, socket_timeout=None, encoding='utf-8', encoding_errors='strict', decode_responses=False, parser_class=DefaultParser): self.pid = os.getpid() self.path = path self.db = db - self.password = password self.socket_timeout = socket_timeout self.encoding = encoding self.encoding_errors = encoding_errors @@ -326,20 +317,18 @@ class ConnectionPool(object): For example:: - redis://[:password]@localhost:6379/0 - rediss://[:password]@localhost:6379/0 - unix://[:password]@/path/to/socket.sock?db=0 + ledis://localhost:6380/0 + unix:///path/to/socket.sock?db=0 Three URL schemes are supported: - redis:// creates a normal TCP socket connection - rediss:// creates a SSL wrapped TCP socket connection + ledis:// creates a normal TCP socket connection unix:// creates a Unix Domain Socket connection There are several ways to specify a database number. The parse function will return the first specified option: - 1. A ``db`` querystring option, e.g. redis://localhost?db=0 - 2. If using the redis:// scheme, the path argument of the url, e.g. - redis://localhost/0 + 1. A ``db`` querystring option, e.g. ledis://localhost?db=0 + 2. If using the ledis:// scheme, the path argument of the url, e.g. + ledis://localhost/0 3. The ``db`` argument to this function. If none of these options are specified, db=0 is used. @@ -368,10 +357,9 @@ class ConnectionPool(object): if value and len(value) > 0: url_options[name] = value[0] - # We only support redis:// and unix:// schemes. + # We only support ledis:// and unix:// schemes. if url.scheme == 'unix': url_options.update({ - 'password': url.password, 'path': url.path, 'connection_class': UnixDomainSocketConnection, }) @@ -380,7 +368,6 @@ class ConnectionPool(object): url_options.update({ 'host': url.hostname, 'port': int(url.port or 6380), - 'password': url.password, }) # If there's a path argument, use it as the db argument if a @@ -391,9 +378,6 @@ class ConnectionPool(object): except (AttributeError, ValueError): pass - if url.scheme == 'lediss': - url_options['connection_class'] = SSLConnection - # last shot at the db value url_options['db'] = int(url_options.get('db', db or 0)) @@ -453,18 +437,18 @@ class BlockingConnectionPool(object): """ Thread-safe blocking connection pool:: - >>> from redis.client import Redis - >>> client = Redis(connection_pool=BlockingConnectionPool()) + >>> from ledis.client import Ledis + >>> client = Ledis(connection_pool=BlockingConnectionPool()) It performs the same function as the default - ``:py:class: ~redis.connection.ConnectionPool`` implementation, in that, + ``:py:class: ~ledis.connection.ConnectionPool`` implementation, in that, it maintains a pool of reusable connections that can be shared by - multiple redis clients (safely across threads if required). + multiple ledis clients (safely across threads if required). The difference is that, in the event that a client tries to get a connection from the pool when all of connections are in use, rather than - raising a ``:py:class: ~redis.exceptions.ConnectionError`` (as the default - ``:py:class: ~redis.connection.ConnectionPool`` implementation does), it + raising a ``:py:class: ~ledis.exceptions.ConnectionError`` (as the default + ``:py:class: ~ledis.connection.ConnectionPool`` implementation does), it makes the client wait ("blocks") for a specified number of seconds until a connection becomes available. @@ -564,7 +548,7 @@ class BlockingConnectionPool(object): try: connection = self.pool.get(block=True, timeout=self.timeout) except Empty: - # Note that this is not caught by the redis client and will be + # Note that this is not caught by the ledis client and will be # raised unless handled by application code. If you want never to raise ConnectionError("No connection available.") diff --git a/client/openresty/ledis_test.lua b/client/openresty/ledis_test.lua index 0e6d65e..a7f9262 100644 --- a/client/openresty/ledis_test.lua +++ b/client/openresty/ledis_test.lua @@ -11,6 +11,13 @@ --]] local ledis = require "ledis" + +-- if you want to test add_commands, remove the command from +-- `local commands `, e.g.: ``get``, then use ``add_commands`` to add +-- command ``get`` + +--ledis.add_commands("get") + local lds = ledis:new() lds:set_timeout(1000) @@ -823,3 +830,30 @@ if not res then end ngx.say("SELECT should be OK <=>", res) + +-- get_reused_times + +times, err = lds:get_reused_times() +if not times then + ngx.say("failed to get_reused_times", err) + return +end + +ngx.say("time is", times) + +-- local ok, err = lds:set_keepalive(10000, 100) +-- if not ok then +-- ngx.say("failed to set keepalive: ", err) +-- return +-- end +-- ngx.say("set_keepalive success") +-- or just close the connection right away: + + + +local ok, err = lds:close() +if not ok then + ngx.say("failed to close: ", err) + return +end +ngx.say("close success") \ No newline at end of file diff --git a/cmd/ledis-binlog/main.go b/cmd/ledis-binlog/main.go new file mode 100644 index 0000000..7212b0e --- /dev/null +++ b/cmd/ledis-binlog/main.go @@ -0,0 +1,85 @@ +package main + +import ( + "bufio" + "flag" + "fmt" + "github.com/siddontang/ledisdb/ledis" + "os" + "time" +) + +var TimeFormat = "2006-01-02 15:04:05" + +var startDateTime = flag.String("start-datetime", "", + "Start reading the binary log at the first event having a timestamp equal to or later than the datetime argument.") +var stopDateTime = flag.String("stop-datetime", "", + "Stop reading the binary log at the first event having a timestamp equal to or earlier than the datetime argument.") + +var startTime uint32 = 0 +var stopTime uint32 = 0xFFFFFFFF + +func main() { + flag.Usage = func() { + fmt.Fprintf(os.Stderr, "Usage of %s [options] log_file\n", os.Args[0]) + flag.PrintDefaults() + } + + flag.Parse() + + logFile := flag.Arg(0) + f, err := os.Open(logFile) + if err != nil { + println(err.Error()) + return + } + defer f.Close() + + var t time.Time + + if len(*startDateTime) > 0 { + if t, err = time.Parse(TimeFormat, *startDateTime); err != nil { + println("parse start-datetime error: ", err.Error()) + return + } + + startTime = uint32(t.Unix()) + } + + if len(*stopDateTime) > 0 { + if t, err = time.Parse(TimeFormat, *stopDateTime); err != nil { + println("parse stop-datetime error: ", err.Error()) + return + } + + stopTime = uint32(t.Unix()) + } + + rb := bufio.NewReaderSize(f, 4096) + err = ledis.ReadEventFromReader(rb, printEvent) + if err != nil { + println("read event error: ", err.Error()) + return + } +} + +func printEvent(createTime uint32, event []byte) error { + if createTime < startTime || createTime > stopTime { + return nil + } + + t := time.Unix(int64(createTime), 0) + + fmt.Printf("%s ", t.Format(TimeFormat)) + + s, err := ledis.FormatBinLogEvent(event) + if err != nil { + fmt.Printf("%s", err.Error()) + } else { + fmt.Printf(s) + } + + fmt.Printf("\n") + + return nil +} diff --git a/cmd/ledis-dump/main.go b/cmd/ledis-dump/main.go new file mode 100644 index 0000000..bf02ad7 --- /dev/null +++ b/cmd/ledis-dump/main.go @@ -0,0 +1,62 @@ +package main + +import ( + "bufio" + "flag" + "fmt" + "github.com/siddontang/ledisdb/server" + "net" + "os" +) + +var host = flag.String("host", "127.0.0.1", "ledis server host") +var port = flag.Int("port", 6380, "ledis server port") +var sock = flag.String("sock", "", "ledis unix socket domain") +var dumpFile = flag.String("o", "./ledis.dump", "dump file to save") + +var fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync + +func main() { + flag.Parse() + + var c net.Conn + var err error + var f *os.File + + if f, err = os.OpenFile(*dumpFile, os.O_CREATE|os.O_WRONLY, os.ModePerm); err != nil { + println(err.Error()) + return + } + + defer f.Close() + + if len(*sock) != 0 { + c, err = net.Dial("unix", *sock) + } else { + addr := fmt.Sprintf("%s:%d", *host, *port) + c, err = net.Dial("tcp", addr) + } + + if err != nil { + println(err.Error()) + return + } + + defer c.Close() + + println("dump begin") + + if _, err = c.Write(fullSyncCmd); err != nil { + println(err.Error()) + return + } + + rb := bufio.NewReaderSize(c, 16*1024) + + if err = server.ReadBulkTo(rb, f); err != nil { + println(err.Error()) + return + } + + println("dump end") +} diff --git a/cmd/ledis-load/main.go b/cmd/ledis-load/main.go new file mode 100644 index 0000000..da1fad3 --- /dev/null +++ b/cmd/ledis-load/main.go @@ -0,0 +1,86 @@ +package main + +import ( + "encoding/json" + "flag" + "github.com/siddontang/ledisdb/ledis" + "github.com/siddontang/ledisdb/server" + "io/ioutil" + "path" +) + +var configPath = flag.String("config", "/etc/ledis.json", "ledisdb config file") +var dumpPath = flag.String("dump_file", "", "ledisdb dump file") +var masterAddr = flag.String("master_addr", "", + "master addr to set where dump file comes from, if not set correctly, next slaveof may cause fullsync") + +func main() { + flag.Parse() + + if len(*configPath) == 0 { + println("need ledis config file") + return + } + + data, err := ioutil.ReadFile(*configPath) + if err != nil { + println(err.Error()) + return + } + + if len(*dumpPath) == 0 { + println("need dump file") + return + } + + var cfg ledis.Config + if err = json.Unmarshal(data, &cfg); err != nil { + println(err.Error()) + return + } + + if len(cfg.DataDir) == 0 { + println("must set data dir") + return + } + + ldb, err := ledis.Open(&cfg) + if err != nil { + println("ledis open error ", err.Error()) + return + } + + err = loadDump(&cfg, ldb) + ldb.Close() + + if err != nil { + println(err.Error()) + return + } + + println("Load OK") +} + +func loadDump(cfg *ledis.Config, ldb *ledis.Ledis) error { + var err error + if err = ldb.FlushAll(); err != nil { + return err + } + + var head *ledis.MasterInfo + head, err = ldb.LoadDumpFile(*dumpPath) + + if err != nil { + return err + } + + info := new(server.MasterInfo) + + info.Addr = *masterAddr + info.LogFileIndex = head.LogFileIndex + info.LogPos = head.LogPos + + infoFile := path.Join(cfg.DataDir, "master.info") + + return info.Save(infoFile) +} diff --git a/ledis/binlog_util.go b/ledis/binlog_util.go index bc1cd63..766d3e6 100644 --- a/ledis/binlog_util.go +++ b/ledis/binlog_util.go @@ -3,6 +3,8 @@ package ledis import ( "encoding/binary" "errors" + "fmt" + "strconv" ) var ( @@ -60,3 +62,149 @@ func encodeBinLogCommand(commandType uint8, args ...[]byte) []byte { func decodeBinLogCommand(sz []byte) (uint8, [][]byte, error) { return 0, nil, errBinLogCommandType } + +func FormatBinLogEvent(event []byte) (string, error) { + logType := uint8(event[0]) + + var err error + var k []byte + var v []byte + + var buf []byte = make([]byte, 0, 1024) + + switch logType { + case BinLogTypePut: + k, v, err = decodeBinLogPut(event) + buf = append(buf, "PUT "...) + case BinLogTypeDeletion: + k, err = decodeBinLogDelete(event) + buf = append(buf, "DELETE "...) + default: + err = errInvalidBinLogEvent + } + + if err != nil { + return "", err + } + + if buf, err = formatDataKey(buf, k); err != nil { + return "", err + } + + if v != nil && len(v) != 0 { + buf = append(buf, fmt.Sprintf(" %q", v)...) + } + + return String(buf), nil +} + +func formatDataKey(buf []byte, k []byte) ([]byte, error) { + if len(k) < 2 { + return nil, errInvalidBinLogEvent + } + + buf = append(buf, fmt.Sprintf("DB:%2d ", k[0])...) + buf = append(buf, fmt.Sprintf("%s ", TypeName[k[1]])...) + + db := new(DB) + db.index = k[0] + + //to do format at respective place + + switch k[1] { + case KVType: + if key, err := db.decodeKVKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case HashType: + if key, field, err := db.hDecodeHashKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(field)) + } + case HSizeType: + if key, err := db.hDecodeSizeKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case ListType: + if key, seq, err := db.lDecodeListKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendInt(buf, int64(seq), 10) + } + case LMetaType: + if key, err := db.lDecodeMetaKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case ZSetType: + if key, m, err := db.zDecodeSetKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(m)) + } + case ZSizeType: + if key, err := db.zDecodeSizeKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case ZScoreType: + if key, m, score, err := db.zDecodeScoreKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(m)) + buf = append(buf, ' ') + buf = strconv.AppendInt(buf, score, 10) + } + case BitType: + if key, seq, err := db.bDecodeBinKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendUint(buf, uint64(seq), 10) + } + case BitMetaType: + if key, err := db.bDecodeMetaKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case ExpTimeType: + if tp, key, t, err := db.expDecodeTimeKey(k); err != nil { + return nil, err + } else { + buf = append(buf, TypeName[tp]...) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendInt(buf, t, 10) + } + case ExpMetaType: + if tp, key, err := db.expDecodeMetaKey(k); err != nil { + return nil, err + } else { + buf = append(buf, TypeName[tp]...) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(key)) + } + default: + return nil, errInvalidBinLogEvent + } + + return buf, nil +} diff --git a/ledis/const.go b/ledis/const.go index ce5a4e6..9b2daaf 100644 --- a/ledis/const.go +++ b/ledis/const.go @@ -5,22 +5,39 @@ import ( ) const ( - noneType byte = 0 - kvType byte = 1 - hashType byte = 2 - hSizeType byte = 3 - listType byte = 4 - lMetaType byte = 5 - zsetType byte = 6 - zSizeType byte = 7 - zScoreType byte = 8 - binType byte = 9 - binMetaType byte = 10 + NoneType byte = 0 + KVType byte = 1 + HashType byte = 2 + HSizeType byte = 3 + ListType byte = 4 + LMetaType byte = 5 + ZSetType byte = 6 + ZSizeType byte = 7 + ZScoreType byte = 8 + BitType byte = 9 + BitMetaType byte = 10 maxDataType byte = 100 - expTimeType byte = 101 - expMetaType byte = 102 + ExpTimeType byte = 101 + ExpMetaType byte = 102 +) + +var ( + TypeName = map[byte]string{ + KVType: "kv", + HashType: "hash", + HSizeType: "hsize", + ListType: "list", + LMetaType: "lmeta", + ZSetType: "zset", + ZSizeType: "zsize", + ZScoreType: "zscore", + BitType: "bit", + BitMetaType: "bitmeta", + ExpTimeType: "exptime", + ExpMetaType: "expmeta", + } ) const ( diff --git a/ledis/ledis_db.go b/ledis/ledis_db.go index 58b93e8..83819ae 100644 --- a/ledis/ledis_db.go +++ b/ledis/ledis_db.go @@ -26,11 +26,11 @@ func (db *DB) FlushAll() (drop int64, err error) { func (db *DB) newEliminator() *elimination { eliminator := newEliminator(db) - eliminator.regRetireContext(kvType, db.kvTx, db.delete) - eliminator.regRetireContext(listType, db.listTx, db.lDelete) - eliminator.regRetireContext(hashType, db.hashTx, db.hDelete) - eliminator.regRetireContext(zsetType, db.zsetTx, db.zDelete) - eliminator.regRetireContext(binType, db.binTx, db.bDelete) + eliminator.regRetireContext(KVType, db.kvTx, db.delete) + eliminator.regRetireContext(ListType, db.listTx, db.lDelete) + eliminator.regRetireContext(HashType, db.hashTx, db.hDelete) + eliminator.regRetireContext(ZSetType, db.zsetTx, db.zDelete) + eliminator.regRetireContext(BitType, db.binTx, db.bDelete) return eliminator } diff --git a/ledis/replication.go b/ledis/replication.go index e19da6a..bd6c192 100644 --- a/ledis/replication.go +++ b/ledis/replication.go @@ -10,6 +10,10 @@ import ( "os" ) +var ( + ErrSkipEvent = errors.New("skip to next event") +) + var ( errInvalidBinLogEvent = errors.New("invalid binglog event") errInvalidBinLogFile = errors.New("invalid binlog file") @@ -71,7 +75,7 @@ func (l *Ledis) replicateCommandEvent(event []byte) error { return errors.New("command event not supported now") } -func (l *Ledis) ReplicateFromReader(rb io.Reader) error { +func ReadEventFromReader(rb io.Reader, f func(createTime uint32, event []byte) error) error { var createTime uint32 var dataLen uint32 var dataBuf bytes.Buffer @@ -94,9 +98,9 @@ func (l *Ledis) ReplicateFromReader(rb io.Reader) error { return err } - err = l.ReplicateEvent(dataBuf.Bytes()) - if err != nil { - log.Fatal("replication error %s, skip to next", err.Error()) + err = f(createTime, dataBuf.Bytes()) + if err != nil && err != ErrSkipEvent { + return err } dataBuf.Reset() @@ -105,6 +109,19 @@ func (l *Ledis) ReplicateFromReader(rb io.Reader) error { return nil } +func (l *Ledis) ReplicateFromReader(rb io.Reader) error { + f := func(createTime uint32, event []byte) error { + err := l.ReplicateEvent(event) + if err != nil { + log.Fatal("replication error %s, skip to next", err.Error()) + return ErrSkipEvent + } + return nil + } + + return ReadEventFromReader(rb, f) +} + func (l *Ledis) ReplicateFromData(data []byte) error { rb := bytes.NewReader(data) diff --git a/ledis/t_bin.go b/ledis/t_bit.go similarity index 97% rename from ledis/t_bin.go rename to ledis/t_bit.go index 2d62047..4111df8 100644 --- a/ledis/t_bin.go +++ b/ledis/t_bit.go @@ -119,19 +119,27 @@ func (datas segBitInfoArray) Swap(i, j int) { func (db *DB) bEncodeMetaKey(key []byte) []byte { mk := make([]byte, len(key)+2) mk[0] = db.index - mk[1] = binMetaType + mk[1] = BitMetaType copy(mk, key) return mk } +func (db *DB) bDecodeMetaKey(bkey []byte) ([]byte, error) { + if len(bkey) < 2 || bkey[0] != db.index || bkey[1] != BitMetaType { + return nil, errBinKey + } + + return bkey[2:], nil +} + func (db *DB) bEncodeBinKey(key []byte, seq uint32) []byte { bk := make([]byte, len(key)+8) pos := 0 bk[pos] = db.index pos++ - bk[pos] = binType + bk[pos] = BitType pos++ binary.BigEndian.PutUint16(bk[pos:], uint16(len(key))) @@ -355,7 +363,7 @@ func (db *DB) bExpireAt(key []byte, when int64) (int64, error) { if seq, _, err := db.bGetMeta(key); err != nil || seq < 0 { return 0, err } else { - db.expireAt(t, binType, key, when) + db.expireAt(t, BitType, key, when) if err := t.Commit(); err != nil { return 0, err } @@ -407,7 +415,7 @@ func (db *DB) BDelete(key []byte) (drop int64, err error) { defer t.Unlock() drop = db.bDelete(t, key) - db.rmExpire(t, binType, key) + db.rmExpire(t, BitType, key) err = t.Commit() return @@ -736,7 +744,7 @@ func (db *DB) BOperation(op uint8, dstkey []byte, srckeys ...[]byte) (blen int32 // clear the old data in case db.bDelete(t, dstkey) - db.rmExpire(t, binType, dstkey) + db.rmExpire(t, BitType, dstkey) // set data db.bSetMeta(t, dstkey, maxDstSeq, maxDstOff) @@ -786,7 +794,7 @@ func (db *DB) BTTL(key []byte) (int64, error) { return -1, err } - return db.ttl(binType, key) + return db.ttl(BitType, key) } func (db *DB) BPersist(key []byte) (int64, error) { @@ -798,7 +806,7 @@ func (db *DB) BPersist(key []byte) (int64, error) { t.Lock() defer t.Unlock() - n, err := db.rmExpire(t, binType, key) + n, err := db.rmExpire(t, BitType, key) if err != nil { return 0, err } @@ -818,14 +826,14 @@ func (db *DB) bFlush() (drop int64, err error) { minKey := make([]byte, 2) minKey[0] = db.index - minKey[1] = binType + minKey[1] = BitType maxKey := make([]byte, 2) maxKey[0] = db.index - maxKey[1] = binMetaType + 1 + maxKey[1] = BitMetaType + 1 drop, err = db.flushRegion(t, minKey, maxKey) - err = db.expFlush(t, binType) + err = db.expFlush(t, BitType) err = t.Commit() return diff --git a/ledis/t_bin_test.go b/ledis/t_bit_test.go similarity index 100% rename from ledis/t_bin_test.go rename to ledis/t_bit_test.go diff --git a/ledis/t_hash.go b/ledis/t_hash.go index 711d83b..5f6be64 100644 --- a/ledis/t_hash.go +++ b/ledis/t_hash.go @@ -33,14 +33,14 @@ func (db *DB) hEncodeSizeKey(key []byte) []byte { buf := make([]byte, len(key)+2) buf[0] = db.index - buf[1] = hSizeType + buf[1] = HSizeType copy(buf[2:], key) return buf } func (db *DB) hDecodeSizeKey(ek []byte) ([]byte, error) { - if len(ek) < 2 || ek[0] != db.index || ek[1] != hSizeType { + if len(ek) < 2 || ek[0] != db.index || ek[1] != HSizeType { return nil, errHSizeKey } @@ -53,7 +53,7 @@ func (db *DB) hEncodeHashKey(key []byte, field []byte) []byte { pos := 0 buf[pos] = db.index pos++ - buf[pos] = hashType + buf[pos] = HashType pos++ binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) @@ -70,7 +70,7 @@ func (db *DB) hEncodeHashKey(key []byte, field []byte) []byte { } func (db *DB) hDecodeHashKey(ek []byte) ([]byte, []byte, error) { - if len(ek) < 5 || ek[0] != db.index || ek[1] != hashType { + if len(ek) < 5 || ek[0] != db.index || ek[1] != HashType { return nil, nil, errHashKey } @@ -151,7 +151,7 @@ func (db *DB) hExpireAt(key []byte, when int64) (int64, error) { if hlen, err := db.HLen(key); err != nil || hlen == 0 { return 0, err } else { - db.expireAt(t, hashType, key, when) + db.expireAt(t, HashType, key, when) if err := t.Commit(); err != nil { return 0, err } @@ -304,7 +304,7 @@ func (db *DB) hIncrSize(key []byte, delta int64) (int64, error) { if size <= 0 { size = 0 t.Delete(sk) - db.rmExpire(t, hashType, key) + db.rmExpire(t, HashType, key) } else { t.Put(sk, PutInt64(size)) } @@ -428,7 +428,7 @@ func (db *DB) HClear(key []byte) (int64, error) { defer t.Unlock() num := db.hDelete(t, key) - db.rmExpire(t, hashType, key) + db.rmExpire(t, HashType, key) err := t.Commit() return num, err @@ -445,7 +445,7 @@ func (db *DB) HMclear(keys ...[]byte) (int64, error) { } db.hDelete(t, key) - db.rmExpire(t, hashType, key) + db.rmExpire(t, HashType, key) } err := t.Commit() @@ -455,18 +455,18 @@ func (db *DB) HMclear(keys ...[]byte) (int64, error) { func (db *DB) hFlush() (drop int64, err error) { minKey := make([]byte, 2) minKey[0] = db.index - minKey[1] = hashType + minKey[1] = HashType maxKey := make([]byte, 2) maxKey[0] = db.index - maxKey[1] = hSizeType + 1 + maxKey[1] = HSizeType + 1 t := db.kvTx t.Lock() defer t.Unlock() drop, err = db.flushRegion(t, minKey, maxKey) - err = db.expFlush(t, hashType) + err = db.expFlush(t, HashType) err = t.Commit() return @@ -530,7 +530,7 @@ func (db *DB) HTTL(key []byte) (int64, error) { return -1, err } - return db.ttl(hashType, key) + return db.ttl(HashType, key) } func (db *DB) HPersist(key []byte) (int64, error) { @@ -542,7 +542,7 @@ func (db *DB) HPersist(key []byte) (int64, error) { t.Lock() defer t.Unlock() - n, err := db.rmExpire(t, hashType, key) + n, err := db.rmExpire(t, HashType, key) if err != nil { return 0, err } diff --git a/ledis/t_kv.go b/ledis/t_kv.go index b62700b..116aa87 100644 --- a/ledis/t_kv.go +++ b/ledis/t_kv.go @@ -31,13 +31,13 @@ func checkValueSize(value []byte) error { func (db *DB) encodeKVKey(key []byte) []byte { ek := make([]byte, len(key)+2) ek[0] = db.index - ek[1] = kvType + ek[1] = KVType copy(ek[2:], key) return ek } func (db *DB) decodeKVKey(ek []byte) ([]byte, error) { - if len(ek) < 2 || ek[0] != db.index || ek[1] != kvType { + if len(ek) < 2 || ek[0] != db.index || ek[1] != KVType { return nil, errKVKey } @@ -51,7 +51,7 @@ func (db *DB) encodeKVMinKey() []byte { func (db *DB) encodeKVMaxKey() []byte { ek := db.encodeKVKey(nil) - ek[len(ek)-1] = kvType + 1 + ek[len(ek)-1] = KVType + 1 return ek } @@ -100,7 +100,7 @@ func (db *DB) setExpireAt(key []byte, when int64) (int64, error) { if exist, err := db.Exists(key); err != nil || exist == 0 { return 0, err } else { - db.expireAt(t, kvType, key, when) + db.expireAt(t, KVType, key, when) if err := t.Commit(); err != nil { return 0, err } @@ -132,7 +132,7 @@ func (db *DB) Del(keys ...[]byte) (int64, error) { for i, k := range keys { t.Delete(codedKeys[i]) - db.rmExpire(t, kvType, k) + db.rmExpire(t, KVType, k) } err := t.Commit() @@ -317,7 +317,7 @@ func (db *DB) flush() (drop int64, err error) { defer t.Unlock() drop, err = db.flushRegion(t, minKey, maxKey) - err = db.expFlush(t, kvType) + err = db.expFlush(t, KVType) err = t.Commit() return @@ -382,7 +382,7 @@ func (db *DB) TTL(key []byte) (int64, error) { return -1, err } - return db.ttl(kvType, key) + return db.ttl(KVType, key) } func (db *DB) Persist(key []byte) (int64, error) { @@ -393,7 +393,7 @@ func (db *DB) Persist(key []byte) (int64, error) { t := db.kvTx t.Lock() defer t.Unlock() - n, err := db.rmExpire(t, kvType, key) + n, err := db.rmExpire(t, KVType, key) if err != nil { return 0, err } diff --git a/ledis/t_list.go b/ledis/t_list.go index 39ca6ed..a7ab1ac 100644 --- a/ledis/t_list.go +++ b/ledis/t_list.go @@ -23,14 +23,14 @@ var errListSeq = errors.New("invalid list sequence, overflow") func (db *DB) lEncodeMetaKey(key []byte) []byte { buf := make([]byte, len(key)+2) buf[0] = db.index - buf[1] = lMetaType + buf[1] = LMetaType copy(buf[2:], key) return buf } func (db *DB) lDecodeMetaKey(ek []byte) ([]byte, error) { - if len(ek) < 2 || ek[0] != db.index || ek[1] != lMetaType { + if len(ek) < 2 || ek[0] != db.index || ek[1] != LMetaType { return nil, errLMetaKey } @@ -43,7 +43,7 @@ func (db *DB) lEncodeListKey(key []byte, seq int32) []byte { pos := 0 buf[pos] = db.index pos++ - buf[pos] = listType + buf[pos] = ListType pos++ binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) @@ -58,7 +58,7 @@ func (db *DB) lEncodeListKey(key []byte, seq int32) []byte { } func (db *DB) lDecodeListKey(ek []byte) (key []byte, seq int32, err error) { - if len(ek) < 8 || ek[0] != db.index || ek[1] != listType { + if len(ek) < 8 || ek[0] != db.index || ek[1] != ListType { err = errListKey return } @@ -84,8 +84,12 @@ func (db *DB) lpush(key []byte, whereSeq int32, args ...[]byte) (int64, error) { var size int32 var err error + t := db.listTx + t.Lock() + defer t.Unlock() + metaKey := db.lEncodeMetaKey(key) - headSeq, tailSeq, size, err = db.lGetMeta(metaKey) + headSeq, tailSeq, size, err = db.lGetMeta(nil, metaKey) if err != nil { return 0, err } @@ -102,10 +106,6 @@ func (db *DB) lpush(key []byte, whereSeq int32, args ...[]byte) (int64, error) { delta = 1 } - t := db.listTx - t.Lock() - defer t.Unlock() - // append elements if size > 0 { seq += delta @@ -148,7 +148,7 @@ func (db *DB) lpop(key []byte, whereSeq int32) ([]byte, error) { var err error metaKey := db.lEncodeMetaKey(key) - headSeq, tailSeq, _, err = db.lGetMeta(metaKey) + headSeq, tailSeq, _, err = db.lGetMeta(nil, metaKey) if err != nil { return nil, err } @@ -175,7 +175,7 @@ func (db *DB) lpop(key []byte, whereSeq int32) ([]byte, error) { t.Delete(itemKey) size := db.lSetMeta(metaKey, headSeq, tailSeq) if size == 0 { - db.rmExpire(t, hashType, key) + db.rmExpire(t, HashType, key) } err = t.Commit() @@ -191,7 +191,10 @@ func (db *DB) lDelete(t *tx, key []byte) int64 { var tailSeq int32 var err error - headSeq, tailSeq, _, err = db.lGetMeta(mk) + it := db.db.NewIterator() + defer it.Close() + + headSeq, tailSeq, _, err = db.lGetMeta(it, mk) if err != nil { return 0 } @@ -200,27 +203,24 @@ func (db *DB) lDelete(t *tx, key []byte) int64 { startKey := db.lEncodeListKey(key, headSeq) stopKey := db.lEncodeListKey(key, tailSeq) - it := db.db.RangeLimitIterator(startKey, stopKey, leveldb.RangeClose, 0, -1) - for ; it.Valid(); it.Next() { - t.Delete(it.Key()) + rit := leveldb.NewRangeIterator(it, &leveldb.Range{startKey, stopKey, leveldb.RangeClose}) + for ; rit.Valid(); rit.Next() { + t.Delete(rit.RawKey()) num++ } - it.Close() t.Delete(mk) return num } -func (db *DB) lGetSeq(key []byte, whereSeq int32) (int64, error) { - ek := db.lEncodeListKey(key, whereSeq) - - return Int64(db.db.Get(ek)) -} - -func (db *DB) lGetMeta(ek []byte) (headSeq int32, tailSeq int32, size int32, err error) { +func (db *DB) lGetMeta(it *leveldb.Iterator, ek []byte) (headSeq int32, tailSeq int32, size int32, err error) { var v []byte - v, err = db.db.Get(ek) + if it != nil { + v = it.Find(ek) + } else { + v, err = db.db.Get(ek) + } if err != nil { return } else if v == nil { @@ -264,7 +264,7 @@ func (db *DB) lExpireAt(key []byte, when int64) (int64, error) { if llen, err := db.LLen(key); err != nil || llen == 0 { return 0, err } else { - db.expireAt(t, listType, key, when) + db.expireAt(t, ListType, key, when) if err := t.Commit(); err != nil { return 0, err } @@ -284,7 +284,10 @@ func (db *DB) LIndex(key []byte, index int32) ([]byte, error) { metaKey := db.lEncodeMetaKey(key) - headSeq, tailSeq, _, err = db.lGetMeta(metaKey) + it := db.db.NewIterator() + defer it.Close() + + headSeq, tailSeq, _, err = db.lGetMeta(it, metaKey) if err != nil { return nil, err } @@ -296,7 +299,9 @@ func (db *DB) LIndex(key []byte, index int32) ([]byte, error) { } sk := db.lEncodeListKey(key, seq) - return db.db.Get(sk) + v := it.Find(sk) + + return v, nil } func (db *DB) LLen(key []byte) (int64, error) { @@ -305,7 +310,7 @@ func (db *DB) LLen(key []byte) (int64, error) { } ek := db.lEncodeMetaKey(key) - _, _, size, err := db.lGetMeta(ek) + _, _, size, err := db.lGetMeta(nil, ek) return int64(size), err } @@ -328,7 +333,10 @@ func (db *DB) LRange(key []byte, start int32, stop int32) ([][]byte, error) { metaKey := db.lEncodeMetaKey(key) - if headSeq, _, llen, err = db.lGetMeta(metaKey); err != nil { + it := db.db.NewIterator() + defer it.Close() + + if headSeq, _, llen, err = db.lGetMeta(it, metaKey); err != nil { return nil, err } @@ -356,12 +364,18 @@ func (db *DB) LRange(key []byte, start int32, stop int32) ([][]byte, error) { v := make([][]byte, 0, limit) startKey := db.lEncodeListKey(key, headSeq) - it := db.db.RangeLimitIterator(startKey, nil, leveldb.RangeClose, 0, int(limit)) - for ; it.Valid(); it.Next() { - v = append(v, it.Value()) - } + rit := leveldb.NewRangeLimitIterator(it, + &leveldb.Range{ + Min: startKey, + Max: nil, + Type: leveldb.RangeClose}, + &leveldb.Limit{ + Offset: 0, + Count: int(limit)}) - it.Close() + for ; rit.Valid(); rit.Next() { + v = append(v, rit.Value()) + } return v, nil } @@ -384,7 +398,7 @@ func (db *DB) LClear(key []byte) (int64, error) { defer t.Unlock() num := db.lDelete(t, key) - db.rmExpire(t, listType, key) + db.rmExpire(t, ListType, key) err := t.Commit() return num, err @@ -401,7 +415,7 @@ func (db *DB) LMclear(keys ...[]byte) (int64, error) { } db.lDelete(t, key) - db.rmExpire(t, listType, key) + db.rmExpire(t, ListType, key) } @@ -412,18 +426,18 @@ func (db *DB) LMclear(keys ...[]byte) (int64, error) { func (db *DB) lFlush() (drop int64, err error) { minKey := make([]byte, 2) minKey[0] = db.index - minKey[1] = listType + minKey[1] = ListType maxKey := make([]byte, 2) maxKey[0] = db.index - maxKey[1] = lMetaType + 1 + maxKey[1] = LMetaType + 1 t := db.listTx t.Lock() defer t.Unlock() drop, err = db.flushRegion(t, minKey, maxKey) - err = db.expFlush(t, listType) + err = db.expFlush(t, ListType) err = t.Commit() return @@ -450,7 +464,7 @@ func (db *DB) LTTL(key []byte) (int64, error) { return -1, err } - return db.ttl(listType, key) + return db.ttl(ListType, key) } func (db *DB) LPersist(key []byte) (int64, error) { @@ -462,7 +476,7 @@ func (db *DB) LPersist(key []byte) (int64, error) { t.Lock() defer t.Unlock() - n, err := db.rmExpire(t, listType, key) + n, err := db.rmExpire(t, ListType, key) if err != nil { return 0, err } diff --git a/ledis/t_ttl.go b/ledis/t_ttl.go index f1614bd..aa34b73 100644 --- a/ledis/t_ttl.go +++ b/ledis/t_ttl.go @@ -26,7 +26,7 @@ func (db *DB) expEncodeTimeKey(dataType byte, key []byte, when int64) []byte { buf := make([]byte, len(key)+11) buf[0] = db.index - buf[1] = expTimeType + buf[1] = ExpTimeType buf[2] = dataType pos := 3 @@ -42,7 +42,7 @@ func (db *DB) expEncodeMetaKey(dataType byte, key []byte) []byte { buf := make([]byte, len(key)+3) buf[0] = db.index - buf[1] = expMetaType + buf[1] = ExpMetaType buf[2] = dataType pos := 3 @@ -52,7 +52,7 @@ func (db *DB) expEncodeMetaKey(dataType byte, key []byte) []byte { } func (db *DB) expDecodeMetaKey(mk []byte) (byte, []byte, error) { - if len(mk) <= 3 || mk[0] != db.index || mk[1] != expMetaType { + if len(mk) <= 3 || mk[0] != db.index || mk[1] != ExpMetaType { return 0, nil, errExpMetaKey } @@ -60,7 +60,7 @@ func (db *DB) expDecodeMetaKey(mk []byte) (byte, []byte, error) { } func (db *DB) expDecodeTimeKey(tk []byte) (byte, []byte, int64, error) { - if len(tk) < 11 || tk[0] != db.index || tk[1] != expTimeType { + if len(tk) < 11 || tk[0] != db.index || tk[1] != ExpTimeType { return 0, nil, 0, errExpTimeKey } @@ -114,12 +114,12 @@ func (db *DB) rmExpire(t *tx, dataType byte, key []byte) (int64, error) { func (db *DB) expFlush(t *tx, dataType byte) (err error) { minKey := make([]byte, 3) minKey[0] = db.index - minKey[1] = expTimeType + minKey[1] = ExpTimeType minKey[2] = dataType maxKey := make([]byte, 3) maxKey[0] = db.index - maxKey[1] = expMetaType + maxKey[1] = ExpMetaType maxKey[2] = dataType + 1 _, err = db.flushRegion(t, minKey, maxKey) @@ -153,7 +153,7 @@ func (eli *elimination) active() { db := eli.db dbGet := db.db.Get - minKey := db.expEncodeTimeKey(noneType, nil, 0) + minKey := db.expEncodeTimeKey(NoneType, nil, 0) maxKey := db.expEncodeTimeKey(maxDataType, nil, now) it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) diff --git a/ledis/t_zset.go b/ledis/t_zset.go index e9b9c4b..4931ded 100644 --- a/ledis/t_zset.go +++ b/ledis/t_zset.go @@ -45,14 +45,14 @@ func checkZSetKMSize(key []byte, member []byte) error { func (db *DB) zEncodeSizeKey(key []byte) []byte { buf := make([]byte, len(key)+2) buf[0] = db.index - buf[1] = zSizeType + buf[1] = ZSizeType copy(buf[2:], key) return buf } func (db *DB) zDecodeSizeKey(ek []byte) ([]byte, error) { - if len(ek) < 2 || ek[0] != db.index || ek[1] != zSizeType { + if len(ek) < 2 || ek[0] != db.index || ek[1] != ZSizeType { return nil, errZSizeKey } @@ -66,7 +66,7 @@ func (db *DB) zEncodeSetKey(key []byte, member []byte) []byte { buf[pos] = db.index pos++ - buf[pos] = zsetType + buf[pos] = ZSetType pos++ binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) @@ -84,7 +84,7 @@ func (db *DB) zEncodeSetKey(key []byte, member []byte) []byte { } func (db *DB) zDecodeSetKey(ek []byte) ([]byte, []byte, error) { - if len(ek) < 5 || ek[0] != db.index || ek[1] != zsetType { + if len(ek) < 5 || ek[0] != db.index || ek[1] != ZSetType { return nil, nil, errZSetKey } @@ -121,7 +121,7 @@ func (db *DB) zEncodeScoreKey(key []byte, member []byte, score int64) []byte { buf[pos] = db.index pos++ - buf[pos] = zScoreType + buf[pos] = ZScoreType pos++ binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) @@ -158,7 +158,7 @@ func (db *DB) zEncodeStopScoreKey(key []byte, score int64) []byte { } func (db *DB) zDecodeScoreKey(ek []byte) (key []byte, member []byte, score int64, err error) { - if len(ek) < 14 || ek[0] != db.index || ek[1] != zScoreType { + if len(ek) < 14 || ek[0] != db.index || ek[1] != ZScoreType { err = errZScoreKey return } @@ -260,7 +260,7 @@ func (db *DB) zExpireAt(key []byte, when int64) (int64, error) { if zcnt, err := db.ZCard(key); err != nil || zcnt == 0 { return 0, err } else { - db.expireAt(t, zsetType, key, when) + db.expireAt(t, ZSetType, key, when) if err := t.Commit(); err != nil { return 0, err } @@ -314,7 +314,7 @@ func (db *DB) zIncrSize(t *tx, key []byte, delta int64) (int64, error) { if size <= 0 { size = 0 t.Delete(sk) - db.rmExpire(t, zsetType, key) + db.rmExpire(t, ZSetType, key) } else { t.Put(sk, PutInt64(size)) } @@ -451,37 +451,37 @@ func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) { k := db.zEncodeSetKey(key, member) - if v, err := db.db.Get(k); err != nil { - return 0, err - } else if v == nil { + it := db.db.NewIterator() + defer it.Close() + + if v := it.Find(k); v == nil { return -1, nil } else { - if s, err := Int64(v, err); err != nil { + if s, err := Int64(v, nil); err != nil { return 0, err } else { - var it *leveldb.RangeLimitIterator + var rit *leveldb.RangeLimitIterator sk := db.zEncodeScoreKey(key, member, s) if !reverse { minKey := db.zEncodeStartScoreKey(key, MinScore) - it = db.db.RangeLimitIterator(minKey, sk, leveldb.RangeClose, 0, -1) + + rit = leveldb.NewRangeIterator(it, &leveldb.Range{minKey, sk, leveldb.RangeClose}) } else { maxKey := db.zEncodeStopScoreKey(key, MaxScore) - it = db.db.RevRangeLimitIterator(sk, maxKey, leveldb.RangeClose, 0, -1) + rit = leveldb.NewRevRangeIterator(it, &leveldb.Range{sk, maxKey, leveldb.RangeClose}) } var lastKey []byte = nil var n int64 = 0 - for ; it.Valid(); it.Next() { + for ; rit.Valid(); rit.Next() { n++ - lastKey = it.BufKey(lastKey) + lastKey = rit.BufKey(lastKey) } - it.Close() - if _, m, _, err := db.zDecodeScoreKey(lastKey); err == nil && bytes.Equal(m, member) { n-- return n, nil @@ -734,15 +734,17 @@ func (db *DB) zFlush() (drop int64, err error) { minKey := make([]byte, 2) minKey[0] = db.index - minKey[1] = zsetType + minKey[1] = ZSetType maxKey := make([]byte, 2) maxKey[0] = db.index - maxKey[1] = zScoreType + 1 + maxKey[1] = ZScoreType + 1 it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1) + defer it.Close() + for ; it.Valid(); it.Next() { - t.Delete(it.Key()) + t.Delete(it.RawKey()) drop++ if drop&1023 == 0 { if err = t.Commit(); err != nil { @@ -750,9 +752,8 @@ func (db *DB) zFlush() (drop int64, err error) { } } } - it.Close() - db.expFlush(t, zsetType) + db.expFlush(t, ZSetType) err = t.Commit() return @@ -818,7 +819,7 @@ func (db *DB) ZTTL(key []byte) (int64, error) { return -1, err } - return db.ttl(zsetType, key) + return db.ttl(ZSetType, key) } func (db *DB) ZPersist(key []byte) (int64, error) { @@ -830,7 +831,7 @@ func (db *DB) ZPersist(key []byte) (int64, error) { t.Lock() defer t.Unlock() - n, err := db.rmExpire(t, zsetType, key) + n, err := db.rmExpire(t, ZSetType, key) if err != nil { return 0, err } diff --git a/leveldb/iterator.go b/leveldb/iterator.go index 3c3e811..a75e4d7 100644 --- a/leveldb/iterator.go +++ b/leveldb/iterator.go @@ -125,8 +125,10 @@ func (it *Iterator) BufValue(b []byte) []byte { } func (it *Iterator) Close() { - C.leveldb_iter_destroy(it.it) - it.it = nil + if it.it != nil { + C.leveldb_iter_destroy(it.it) + it.it = nil + } } func (it *Iterator) Valid() bool { @@ -274,6 +276,14 @@ func NewRevRangeLimitIterator(i *Iterator, r *Range, l *Limit) *RangeLimitIterat return rangeLimitIterator(i, r, l, IteratorBackward) } +func NewRangeIterator(i *Iterator, r *Range) *RangeLimitIterator { + return rangeLimitIterator(i, r, &Limit{0, -1}, IteratorForward) +} + +func NewRevRangeIterator(i *Iterator, r *Range) *RangeLimitIterator { + return rangeLimitIterator(i, r, &Limit{0, -1}, IteratorBackward) +} + func rangeLimitIterator(i *Iterator, r *Range, l *Limit, direction uint8) *RangeLimitIterator { it := new(RangeLimitIterator) diff --git a/server/client.go b/server/client.go index bb4fe0e..11c9f97 100644 --- a/server/client.go +++ b/server/client.go @@ -81,7 +81,7 @@ func (c *client) run() { } func (c *client) readLine() ([]byte, error) { - return readLine(c.rb) + return ReadLine(c.rb) } //A client sends to the Redis server a RESP Array consisting of just Bulk Strings. diff --git a/server/cmd_bin.go b/server/cmd_bit.go similarity index 100% rename from server/cmd_bin.go rename to server/cmd_bit.go diff --git a/server/cmd_bin_test.go b/server/cmd_bit_test.go similarity index 100% rename from server/cmd_bin_test.go rename to server/cmd_bit_test.go diff --git a/server/replication.go b/server/replication.go index 6c30968..4d185a1 100644 --- a/server/replication.go +++ b/server/replication.go @@ -23,13 +23,55 @@ var ( errConnectMaster = errors.New("connect master error") ) +type MasterInfo struct { + Addr string `json:"addr"` + LogFileIndex int64 `json:"log_file_index"` + LogPos int64 `json:"log_pos"` +} + +func (m *MasterInfo) Save(filePath string) error { + data, err := json.Marshal(m) + if err != nil { + return err + } + + filePathBak := fmt.Sprintf("%s.bak", filePath) + + var fd *os.File + fd, err = os.OpenFile(filePathBak, os.O_CREATE|os.O_WRONLY, os.ModePerm) + if err != nil { + return err + } + + if _, err = fd.Write(data); err != nil { + fd.Close() + return err + } + + fd.Close() + return os.Rename(filePathBak, filePath) +} + +func (m *MasterInfo) Load(filePath string) error { + data, err := ioutil.ReadFile(filePath) + if err != nil { + if os.IsNotExist(err) { + return nil + } else { + return err + } + } + + if err = json.Unmarshal(data, m); err != nil { + return err + } + + return nil +} + type master struct { sync.Mutex - addr string `json:"addr"` - logFileIndex int64 `json:"log_file_index"` - logPos int64 `json:"log_pos"` - c net.Conn rb *bufio.Reader @@ -37,8 +79,9 @@ type master struct { quit chan struct{} - infoName string - infoNameBak string + infoName string + + info *MasterInfo wg sync.WaitGroup @@ -52,12 +95,13 @@ func newMaster(app *App) *master { m.app = app m.infoName = path.Join(m.app.cfg.DataDir, "master.info") - m.infoNameBak = fmt.Sprintf("%s.bak", m.infoName) m.quit = make(chan struct{}, 1) m.compressBuf = make([]byte, 256) + m.info = new(MasterInfo) + //if load error, we will start a fullsync later m.loadInfo() @@ -79,53 +123,15 @@ func (m *master) Close() { } func (m *master) loadInfo() error { - data, err := ioutil.ReadFile(m.infoName) - if err != nil { - if os.IsNotExist(err) { - return nil - } else { - return err - } - } - - if err = json.Unmarshal(data, m); err != nil { - return err - } - - return nil + return m.info.Load(m.infoName) } func (m *master) saveInfo() error { - data, err := json.Marshal(struct { - Addr string `json:"addr"` - LogFileIndex int64 `json:"log_file_index"` - LogPos int64 `json:"log_pos"` - }{ - m.addr, - m.logFileIndex, - m.logPos, - }) - if err != nil { - return err - } - - var fd *os.File - fd, err = os.OpenFile(m.infoNameBak, os.O_CREATE|os.O_WRONLY, os.ModePerm) - if err != nil { - return err - } - - if _, err = fd.Write(data); err != nil { - fd.Close() - return err - } - - fd.Close() - return os.Rename(m.infoNameBak, m.infoName) + return m.info.Save(m.infoName) } func (m *master) connect() error { - if len(m.addr) == 0 { + if len(m.info.Addr) == 0 { return fmt.Errorf("no assign master addr") } @@ -134,7 +140,7 @@ func (m *master) connect() error { m.c = nil } - if c, err := net.Dial("tcp", m.addr); err != nil { + if c, err := net.Dial("tcp", m.info.Addr); err != nil { return err } else { m.c = c @@ -145,9 +151,9 @@ func (m *master) connect() error { } func (m *master) resetInfo(addr string) { - m.addr = addr - m.logFileIndex = 0 - m.logPos = 0 + m.info.Addr = addr + m.info.LogFileIndex = 0 + m.info.LogPos = 0 } func (m *master) stopReplication() error { @@ -165,7 +171,7 @@ func (m *master) startReplication(masterAddr string) error { //stop last replcation, if avaliable m.Close() - if masterAddr != m.addr { + if masterAddr != m.info.Addr { m.resetInfo(masterAddr) if err := m.saveInfo(); err != nil { log.Error("save master info error %s", err.Error()) @@ -189,20 +195,20 @@ func (m *master) runReplication() { return default: if err := m.connect(); err != nil { - log.Error("connect master %s error %s, try 2s later", m.addr, err.Error()) + log.Error("connect master %s error %s, try 2s later", m.info.Addr, err.Error()) time.Sleep(2 * time.Second) continue } } - if m.logFileIndex == 0 { + if m.info.LogFileIndex == 0 { //try a fullsync if err := m.fullSync(); err != nil { log.Warn("full sync error %s", err.Error()) return } - if m.logFileIndex == 0 { + if m.info.LogFileIndex == 0 { //master not support binlog, we cannot sync, so stop replication m.stopReplication() return @@ -211,14 +217,14 @@ func (m *master) runReplication() { for { for { - lastIndex := m.logFileIndex - lastPos := m.logPos + lastIndex := m.info.LogFileIndex + lastPos := m.info.LogPos if err := m.sync(); err != nil { log.Warn("sync error %s", err.Error()) return } - if m.logFileIndex == lastIndex && m.logPos == lastPos { + if m.info.LogFileIndex == lastIndex && m.info.LogPos == lastPos { //sync no data, wait 1s and retry break } @@ -254,7 +260,7 @@ func (m *master) fullSync() error { defer os.Remove(dumpPath) - err = readBulkTo(m.rb, f) + err = ReadBulkTo(m.rb, f) f.Close() if err != nil { log.Error("read dump data error %s", err.Error()) @@ -273,15 +279,15 @@ func (m *master) fullSync() error { return err } - m.logFileIndex = head.LogFileIndex - m.logPos = head.LogPos + m.info.LogFileIndex = head.LogFileIndex + m.info.LogPos = head.LogPos return m.saveInfo() } func (m *master) sync() error { - logIndexStr := strconv.FormatInt(m.logFileIndex, 10) - logPosStr := strconv.FormatInt(m.logPos, 10) + logIndexStr := strconv.FormatInt(m.info.LogFileIndex, 10) + logPosStr := strconv.FormatInt(m.info.LogPos, 10) cmd := ledis.Slice(fmt.Sprintf(syncCmdFormat, len(logIndexStr), logIndexStr, len(logPosStr), logPosStr)) @@ -291,7 +297,7 @@ func (m *master) sync() error { m.syncBuf.Reset() - err := readBulkTo(m.rb, &m.syncBuf) + err := ReadBulkTo(m.rb, &m.syncBuf) if err != nil { return err } @@ -308,14 +314,14 @@ func (m *master) sync() error { return fmt.Errorf("invalid sync data len %d", len(buf)) } - m.logFileIndex = int64(binary.BigEndian.Uint64(buf[0:8])) - m.logPos = int64(binary.BigEndian.Uint64(buf[8:16])) + m.info.LogFileIndex = int64(binary.BigEndian.Uint64(buf[0:8])) + m.info.LogPos = int64(binary.BigEndian.Uint64(buf[8:16])) - if m.logFileIndex == 0 { + if m.info.LogFileIndex == 0 { //master now not support binlog, stop replication m.stopReplication() return nil - } else if m.logFileIndex == -1 { + } else if m.info.LogFileIndex == -1 { //-1 means than binlog index and pos are lost, we must start a full sync instead return m.fullSync() } diff --git a/server/util.go b/server/util.go index 9afee4e..49c20ba 100644 --- a/server/util.go +++ b/server/util.go @@ -14,7 +14,7 @@ var ( errLineFormat = errors.New("bad response line format") ) -func readLine(rb *bufio.Reader) ([]byte, error) { +func ReadLine(rb *bufio.Reader) ([]byte, error) { p, err := rb.ReadSlice('\n') if err != nil { @@ -27,8 +27,8 @@ func readLine(rb *bufio.Reader) ([]byte, error) { return p[:i], nil } -func readBulkTo(rb *bufio.Reader, w io.Writer) error { - l, err := readLine(rb) +func ReadBulkTo(rb *bufio.Reader, w io.Writer) error { + l, err := ReadLine(rb) if len(l) == 0 { return errBulkFormat } else if l[0] == '$' { @@ -43,7 +43,7 @@ func readBulkTo(rb *bufio.Reader, w io.Writer) error { return err } - if l, err = readLine(rb); err != nil { + if l, err = ReadLine(rb); err != nil { return err } else if len(l) != 0 { return errBulkFormat