Merge branch 'develop'

This commit is contained in:
siddontang 2014-07-11 17:36:48 +08:00
commit 4fdd184773
26 changed files with 767 additions and 337 deletions

View File

@ -1,6 +1,17 @@
# ledisdb # LedisDB
Ledisdb is a high performance nosql like redis based on leveldb written by go. It's supports some advanced data structure like kv, list, hash and zset. LedisDB is a high-performance NoSQL database similar to Redis, built on LevelDB and written in Go. It supports advanced data structures such as kv, list, hash and zset, and may serve as an alternative to Redis.
## Features
+ Rich advanced data structures: KV, List, Hash, ZSet, Bit.
+ Uses LevelDB to store large amounts of data, beyond the memory limit.
+ Supports expiration and TTL.
+ Redis clients, like redis-cli, are supported directly.
+ Multiple client APIs, including Golang, Python and Lua (OpenResty).
+ Easy to embed in a Golang application.
+ Replication to guarantee data safety.
+ Supplies tools to load, dump and repair the database.
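Because LedisDB speaks the Redis wire protocol, plain Redis clients work against it unchanged. As a rough stand-alone sketch (not code from this commit), a command such as `set a 1` travels as a RESP multi-bulk request:

```go
package main

import (
	"fmt"
	"strings"
)

// encodeRESP builds a RESP multi-bulk request, the wire format
// redis-cli uses, which is why Redis clients can talk to ledis-server.
func encodeRESP(args ...string) string {
	var b strings.Builder
	fmt.Fprintf(&b, "*%d\r\n", len(args))
	for _, a := range args {
		fmt.Fprintf(&b, "$%d\r\n%s\r\n", len(a), a)
	}
	return b.String()
}

func main() {
	fmt.Printf("%q\n", encodeRESP("set", "a", "1"))
}
```

The `fullsync` command bytes hard-coded in `cmd/ledis-dump` later in this commit are exactly this encoding for a one-argument command.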
## Build and Install ## Build and Install

View File

@ -1,94 +0,0 @@
# ledisdb
ledisdb is a high performance Redis-like NoSQL database implemented in Go, with LevelDB as the underlying storage. It supports several data structures: kv, list, hash and zset.
It originally grew out of [ssdb](https://github.com/ideawu/ssdb); after using ssdb for a while, I decided, out of interest, to implement a similar database in Go.
## Build
+ Create a workspace and check out the ledisdb source:
mkdir $WORKSPACE
cd $WORKSPACE
git clone git@github.com:siddontang/ledisdb.git src/github.com/siddontang/ledisdb
cd src/github.com/siddontang/ledisdb
+ Install leveldb and snappy (skip this step if they are already installed).
A simple script is provided for installing leveldb; just run in a shell:
sh build_leveldb.sh
By default the script installs leveldb and snappy into the /usr/local/leveldb and /usr/local/snappy directories.
+ In dev.sh, set LEVELDB_DIR and SNAPPY_DIR to the actual install paths (the defaults are /usr/local/leveldb and /usr/local/snappy).
+ Run bootstrap.sh to fetch ledisdb's Go dependencies:
. ./bootstrap.sh or source ./bootstrap.sh
+ Run dev.sh:
. ./dev.sh or source ./dev.sh
+ Build and install ledisdb:
go install ./...
## Run
./ledis-server -config=/etc/ledis.json
//another shell
ledis-cli -p 6380
ledis 127.0.0.1:6380> set a 1
OK
ledis 127.0.0.1:6380> get a
"1"
## Embedding
import "github.com/siddontang/ledisdb/ledis"
l, _ := ledis.Open(cfg)
db, _ := l.Select(0)
db.Set(key, value)
db.Get(key)
## Benchmark
See benchmark.md for the latest benchmark results.
## Replication
Enable slave replication through the configuration file, or by issuing slaveof at runtime:
ledis-cli -p 6381
ledis 127.0.0.1:6381> slaveof 127.0.0.1:6380
OK
## Todo
+ Admin
## GoDoc
[![GoDoc](https://godoc.org/github.com/siddontang/ledisdb?status.png)](https://godoc.org/github.com/siddontang/ledisdb)
## Commands
Some commands are explained [here](https://github.com/siddontang/ledisdb/wiki/Commands); more will be added over time.
## Thanks
Gmail: cenqichao@gmail.com
Gmail: chendahui007@gmail.com
## Contact
Gmail: siddontang@gmail.com

View File

@ -47,5 +47,46 @@ For full API reference, please visit [rtfd](http://ledis-py.readthedocs.org/).
### Connection Pools ### Connection Pools
### Connnections Behind the scenes, ledis-py uses a connection pool to manage connections to a Ledis server. By default, each Ledis instance you create will in turn create its own connection pool. You can override this behavior and use an existing connection pool by passing an already created connection pool instance to the connection_pool argument of the Ledis class. You may choose to do this in order to implement client-side sharding or to have finer-grained control of how connections are managed.
```
>>> pool = ledis.ConnectionPool(host='localhost', port=6380, db=0)
>>> l = ledis.Ledis(connection_pool=pool)
```
### Connections
ConnectionPools manage a set of Connection instances. ledis-py ships with two types of Connections. The default, Connection, is a normal TCP socket based connection. The UnixDomainSocketConnection allows for clients running on the same device as the server to connect via a unix domain socket. To use a UnixDomainSocketConnection connection, simply pass the unix_socket_path argument, which is the string path to the unix domain socket file. Additionally, make sure the unixsocket parameter is defined in your `ledis.json` file. e.g.:
```
{
"addr": "/tmp/ledis.sock",
...
}
```
```
>>> l = ledis.Ledis(unix_socket_path='/tmp/ledis.sock')
```
You can create your own Connection subclasses as well. This may be useful if you want to control the socket behavior within an async framework. To instantiate a client class using your own connection, you need to create a connection pool, passing your class to the connection_class argument. Other keyword parameters you pass to the pool will be passed to the class specified during initialization.
```
>>> pool = ledis.ConnectionPool(connection_class=YourConnectionClass,
your_arg='...', ...)
```
e.g.:
```
>>> from ledis import UnixDomainSocketConnection
>>> pool = ledis.ConnectionPool(connection_class=UnixDomainSocketConnection, path='/tmp/ledis.sock')
```
## Response Callbacks
The client class uses a set of callbacks to cast Ledis responses to the appropriate Python type. There are a number of these callbacks defined on the Ledis client class in a dictionary called RESPONSE_CALLBACKS.
Custom callbacks can be added on a per-instance basis using the `set_response_callback` method. This method accepts two arguments: a command name and the callback. Callbacks added in this manner are only valid on the instance the callback is added to. If you want to define or override a callback globally, you should make a subclass of the Ledis client and add your callback to its RESPONSE_CALLBACKS class dictionary.
Response callbacks take at least one parameter: the response from the Ledis server. Keyword arguments may also be accepted in order to further control how to interpret the response. These keyword arguments are specified during the command's call to execute_command. The ZRANGE implementation demonstrates the use of response callback keyword arguments with its "withscores" argument.
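The callback-registry pattern described above can be sketched in Go, the language of the rest of this commit. The command names and cast rules below are illustrative assumptions, not taken from ledis-py's actual RESPONSE_CALLBACKS table:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// callback casts a raw server reply into a richer type, in the
// spirit of ledis-py's RESPONSE_CALLBACKS dictionary.
type callback func(raw string) interface{}

// Hypothetical defaults: EXISTS replies become bools, INCR replies
// become integers.
var responseCallbacks = map[string]callback{
	"EXISTS": func(raw string) interface{} { return raw == "1" },
	"INCR": func(raw string) interface{} {
		n, _ := strconv.ParseInt(raw, 10, 64)
		return n
	},
}

// setResponseCallback adds or overrides a callback, like ledis-py's
// per-instance set_response_callback method.
func setResponseCallback(cbs map[string]callback, cmd string, cb callback) {
	cbs[strings.ToUpper(cmd)] = cb
}

// parseResponse applies the registered callback, or returns the raw
// reply when no callback exists for the command.
func parseResponse(cbs map[string]callback, cmd, raw string) interface{} {
	if cb, ok := cbs[strings.ToUpper(cmd)]; ok {
		return cb(raw)
	}
	return raw
}

func main() {
	fmt.Println(parseResponse(responseCallbacks, "exists", "1"))
}
```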

View File

@ -112,14 +112,14 @@ class Ledis(object):
For example:: For example::
redis://[:password]@localhost:6380/0 ledis://localhost:6380/0
unix://[:password]@/path/to/socket.sock?db=0 unix:///path/to/socket.sock?db=0
There are several ways to specify a database number. The parse function There are several ways to specify a database number. The parse function
will return the first specified option: will return the first specified option:
1. A ``db`` querystring option, e.g. redis://localhost?db=0 1. A ``db`` querystring option, e.g. ledis://localhost?db=0
2. If using the redis:// scheme, the path argument of the url, e.g. 2. If using the ledis:// scheme, the path argument of the url, e.g.
redis://localhost/0 ledis://localhost/0
3. The ``db`` argument to this function. 3. The ``db`` argument to this function.
If none of these options are specified, db=0 is used. If none of these options are specified, db=0 is used.
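The three-step db precedence in the docstring above can be sketched stand-alone with Go's net/url; this is an illustration of the rule, not the library's parser:

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
	"strings"
)

// dbFromURL resolves a database number from a ledis:// URL using the
// documented precedence: a ?db= query option first, then the URL path
// for the ledis:// scheme, then the caller's db argument (default 0).
func dbFromURL(rawurl string, db int) (int, error) {
	u, err := url.Parse(rawurl)
	if err != nil {
		return 0, err
	}
	if v := u.Query().Get("db"); v != "" {
		return strconv.Atoi(v)
	}
	if p := strings.Trim(u.Path, "/"); u.Scheme == "ledis" && p != "" {
		return strconv.Atoi(p)
	}
	return db, nil
}

func main() {
	n, _ := dbFromURL("ledis://localhost:6380/2", 0)
	fmt.Println(n)
}
```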

View File

@ -145,7 +145,7 @@ DefaultParser = PythonParser
class Connection(object): class Connection(object):
"Manages TCP communication to and from a Ledis server" "Manages TCP communication to and from a Ledis server"
def __init__(self, host='localhost', port=6380, db=0, password=None, def __init__(self, host='localhost', port=6380, db=0,
socket_timeout=None, encoding='utf-8', socket_timeout=None, encoding='utf-8',
encoding_errors='strict', decode_responses=False, encoding_errors='strict', decode_responses=False,
parser_class=DefaultParser): parser_class=DefaultParser):
@ -153,7 +153,6 @@ class Connection(object):
self.host = host self.host = host
self.port = port self.port = port
self.db = db self.db = db
self.password = password
self.socket_timeout = socket_timeout self.socket_timeout = socket_timeout
self.encoding = encoding self.encoding = encoding
self.encoding_errors = encoding_errors self.encoding_errors = encoding_errors
@ -201,12 +200,6 @@ class Connection(object):
"Initialize the connection, authenticate and select a database" "Initialize the connection, authenticate and select a database"
self._parser.on_connect(self) self._parser.on_connect(self)
# if a password is specified, authenticate
if self.password:
self.send_command('AUTH', self.password)
if nativestr(self.read_response()) != 'OK':
raise AuthenticationError('Invalid Password')
# if a database is specified, switch to it # if a database is specified, switch to it
if self.db: if self.db:
self.send_command('SELECT', self.db) self.send_command('SELECT', self.db)
@ -283,14 +276,12 @@ class Connection(object):
class UnixDomainSocketConnection(Connection): class UnixDomainSocketConnection(Connection):
def __init__(self, path='', db=0, password=None, def __init__(self, path='', db=0, socket_timeout=None, encoding='utf-8',
socket_timeout=None, encoding='utf-8',
encoding_errors='strict', decode_responses=False, encoding_errors='strict', decode_responses=False,
parser_class=DefaultParser): parser_class=DefaultParser):
self.pid = os.getpid() self.pid = os.getpid()
self.path = path self.path = path
self.db = db self.db = db
self.password = password
self.socket_timeout = socket_timeout self.socket_timeout = socket_timeout
self.encoding = encoding self.encoding = encoding
self.encoding_errors = encoding_errors self.encoding_errors = encoding_errors
@ -326,20 +317,18 @@ class ConnectionPool(object):
For example:: For example::
redis://[:password]@localhost:6379/0 ledis://localhost:6380/0
rediss://[:password]@localhost:6379/0 unix:///path/to/socket.sock?db=0
unix://[:password]@/path/to/socket.sock?db=0
Three URL schemes are supported: Three URL schemes are supported:
redis:// creates a normal TCP socket connection ledis:// creates a normal TCP socket connection
rediss:// creates a SSL wrapped TCP socket connection
unix:// creates a Unix Domain Socket connection unix:// creates a Unix Domain Socket connection
There are several ways to specify a database number. The parse function There are several ways to specify a database number. The parse function
will return the first specified option: will return the first specified option:
1. A ``db`` querystring option, e.g. redis://localhost?db=0 1. A ``db`` querystring option, e.g. ledis://localhost?db=0
2. If using the redis:// scheme, the path argument of the url, e.g. 2. If using the ledis:// scheme, the path argument of the url, e.g.
redis://localhost/0 ledis://localhost/0
3. The ``db`` argument to this function. 3. The ``db`` argument to this function.
If none of these options are specified, db=0 is used. If none of these options are specified, db=0 is used.
@ -368,10 +357,9 @@ class ConnectionPool(object):
if value and len(value) > 0: if value and len(value) > 0:
url_options[name] = value[0] url_options[name] = value[0]
# We only support redis:// and unix:// schemes. # We only support ledis:// and unix:// schemes.
if url.scheme == 'unix': if url.scheme == 'unix':
url_options.update({ url_options.update({
'password': url.password,
'path': url.path, 'path': url.path,
'connection_class': UnixDomainSocketConnection, 'connection_class': UnixDomainSocketConnection,
}) })
@ -380,7 +368,6 @@ class ConnectionPool(object):
url_options.update({ url_options.update({
'host': url.hostname, 'host': url.hostname,
'port': int(url.port or 6380), 'port': int(url.port or 6380),
'password': url.password,
}) })
# If there's a path argument, use it as the db argument if a # If there's a path argument, use it as the db argument if a
@ -391,9 +378,6 @@ class ConnectionPool(object):
except (AttributeError, ValueError): except (AttributeError, ValueError):
pass pass
if url.scheme == 'lediss':
url_options['connection_class'] = SSLConnection
# last shot at the db value # last shot at the db value
url_options['db'] = int(url_options.get('db', db or 0)) url_options['db'] = int(url_options.get('db', db or 0))
@ -453,18 +437,18 @@ class BlockingConnectionPool(object):
""" """
Thread-safe blocking connection pool:: Thread-safe blocking connection pool::
>>> from redis.client import Redis >>> from ledis.client import Ledis
>>> client = Redis(connection_pool=BlockingConnectionPool()) >>> client = Ledis(connection_pool=BlockingConnectionPool())
It performs the same function as the default It performs the same function as the default
``:py:class: ~redis.connection.ConnectionPool`` implementation, in that, ``:py:class: ~ledis.connection.ConnectionPool`` implementation, in that,
it maintains a pool of reusable connections that can be shared by it maintains a pool of reusable connections that can be shared by
multiple redis clients (safely across threads if required). multiple ledis clients (safely across threads if required).
The difference is that, in the event that a client tries to get a The difference is that, in the event that a client tries to get a
connection from the pool when all of connections are in use, rather than connection from the pool when all of connections are in use, rather than
raising a ``:py:class: ~redis.exceptions.ConnectionError`` (as the default raising a ``:py:class: ~ledis.exceptions.ConnectionError`` (as the default
``:py:class: ~redis.connection.ConnectionPool`` implementation does), it ``:py:class: ~ledis.connection.ConnectionPool`` implementation does), it
makes the client wait ("blocks") for a specified number of seconds until makes the client wait ("blocks") for a specified number of seconds until
a connection becomes available. a connection becomes available.
@ -564,7 +548,7 @@ class BlockingConnectionPool(object):
try: try:
connection = self.pool.get(block=True, timeout=self.timeout) connection = self.pool.get(block=True, timeout=self.timeout)
except Empty: except Empty:
# Note that this is not caught by the redis client and will be # Note that this is not caught by the ledis client and will be
# raised unless handled by application code. If you want never to # raised unless handled by application code. If you want never to
raise ConnectionError("No connection available.") raise ConnectionError("No connection available.")

View File

@ -11,6 +11,13 @@
--]] --]]
local ledis = require "ledis" local ledis = require "ledis"
-- if you want to test add_commands, remove the command from
-- `local commands `, e.g.: ``get``, then use ``add_commands`` to add
-- command ``get``
--ledis.add_commands("get")
local lds = ledis:new() local lds = ledis:new()
lds:set_timeout(1000) lds:set_timeout(1000)
@ -823,3 +830,30 @@ if not res then
end end
ngx.say("SELECT should be OK <=>", res) ngx.say("SELECT should be OK <=>", res)
-- get_reused_times
times, err = lds:get_reused_times()
if not times then
ngx.say("failed to get_reused_times", err)
return
end
ngx.say("time is", times)
-- local ok, err = lds:set_keepalive(10000, 100)
-- if not ok then
-- ngx.say("failed to set keepalive: ", err)
-- return
-- end
-- ngx.say("set_keepalive success")
-- or just close the connection right away:
local ok, err = lds:close()
if not ok then
ngx.say("failed to close: ", err)
return
end
ngx.say("close success")

85
cmd/ledis-binlog/main.go Normal file
View File

@ -0,0 +1,85 @@
package main
import (
"bufio"
"flag"
"fmt"
"github.com/siddontang/ledisdb/ledis"
"os"
"time"
)
var TimeFormat = "2006-01-02 15:04:05"
var startDateTime = flag.String("start-datetime", "",
"Start reading the binary log at the first event having a timestamp equal to or later than the datetime argument.")
var stopDateTime = flag.String("stop-datetime", "",
"Stop reading the binary log at the first event having a timestamp equal to or earlier than the datetime argument.")
var startTime uint32 = 0
var stopTime uint32 = 0xFFFFFFFF
func main() {
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage of %s [options] log_file\n", os.Args[0])
flag.PrintDefaults()
}
flag.Parse()
logFile := flag.Arg(0)
f, err := os.Open(logFile)
if err != nil {
println(err.Error())
return
}
defer f.Close()
var t time.Time
if len(*startDateTime) > 0 {
if t, err = time.Parse(TimeFormat, *startDateTime); err != nil {
println("parse start-datetime error: ", err.Error())
return
}
startTime = uint32(t.Unix())
}
if len(*stopDateTime) > 0 {
if t, err = time.Parse(TimeFormat, *stopDateTime); err != nil {
println("parse stop-datetime error: ", err.Error())
return
}
stopTime = uint32(t.Unix())
}
rb := bufio.NewReaderSize(f, 4096)
err = ledis.ReadEventFromReader(rb, printEvent)
if err != nil {
println("read event error: ", err.Error())
return
}
}
func printEvent(createTime uint32, event []byte) error {
if createTime < startTime || createTime > stopTime {
return nil
}
t := time.Unix(int64(createTime), 0)
fmt.Printf("%s ", t.Format(TimeFormat))
s, err := ledis.FormatBinLogEvent(event)
if err != nil {
fmt.Printf("%s", err.Error())
} else {
fmt.Printf(s)
}
fmt.Printf("\n")
return nil
}

62
cmd/ledis-dump/main.go Normal file
View File

@ -0,0 +1,62 @@
package main
import (
"bufio"
"flag"
"fmt"
"github.com/siddontang/ledisdb/server"
"net"
"os"
)
var host = flag.String("host", "127.0.0.1", "ledis server host")
var port = flag.Int("port", 6380, "ledis server port")
var sock = flag.String("sock", "", "ledis unix socket domain")
var dumpFile = flag.String("o", "./ledis.dump", "dump file to save")
var fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync
func main() {
flag.Parse()
var c net.Conn
var err error
var f *os.File
if f, err = os.OpenFile(*dumpFile, os.O_CREATE|os.O_WRONLY, os.ModePerm); err != nil {
println(err.Error())
return
}
defer f.Close()
if len(*sock) != 0 {
c, err = net.Dial("unix", *sock)
} else {
addr := fmt.Sprintf("%s:%d", *host, *port)
c, err = net.Dial("tcp", addr)
}
if err != nil {
println(err.Error())
return
}
defer c.Close()
println("dump begin")
if _, err = c.Write(fullSyncCmd); err != nil {
println(err.Error())
return
}
rb := bufio.NewReaderSize(c, 16*1024)
if err = server.ReadBulkTo(rb, f); err != nil {
println(err.Error())
return
}
println("dump end")
}

86
cmd/ledis-load/main.go Normal file
View File

@ -0,0 +1,86 @@
package main
import (
"encoding/json"
"flag"
"github.com/siddontang/ledisdb/ledis"
"github.com/siddontang/ledisdb/server"
"io/ioutil"
"path"
)
var configPath = flag.String("config", "/etc/ledis.json", "ledisdb config file")
var dumpPath = flag.String("dump_file", "", "ledisdb dump file")
var masterAddr = flag.String("master_addr", "",
"master addr to set where dump file comes from, if not set correctly, next slaveof may cause fullsync")
func main() {
flag.Parse()
if len(*configPath) == 0 {
println("need ledis config file")
return
}
data, err := ioutil.ReadFile(*configPath)
if err != nil {
println(err.Error())
return
}
if len(*dumpPath) == 0 {
println("need dump file")
return
}
var cfg ledis.Config
if err = json.Unmarshal(data, &cfg); err != nil {
println(err.Error())
return
}
if len(cfg.DataDir) == 0 {
println("must set data dir")
return
}
ldb, err := ledis.Open(&cfg)
if err != nil {
println("ledis open error ", err.Error())
return
}
err = loadDump(&cfg, ldb)
ldb.Close()
if err != nil {
println(err.Error())
return
}
println("Load OK")
}
func loadDump(cfg *ledis.Config, ldb *ledis.Ledis) error {
var err error
if err = ldb.FlushAll(); err != nil {
return err
}
var head *ledis.MasterInfo
head, err = ldb.LoadDumpFile(*dumpPath)
if err != nil {
return err
}
info := new(server.MasterInfo)
info.Addr = *masterAddr
info.LogFileIndex = head.LogFileIndex
info.LogPos = head.LogPos
infoFile := path.Join(cfg.DataDir, "master.info")
return info.Save(infoFile)
}
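ledis-load unmarshals the JSON config into ledis.Config and insists on DataDir, since master.info is written under it. A hedged stand-alone sketch of that check; the struct and its "data_dir" JSON tag are assumptions for illustration, not taken from this commit:

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// config is a stand-in for ledis.Config with only the field the tool
// checks; the JSON tag here is hypothetical.
type config struct {
	DataDir string `json:"data_dir"`
}

// loadConfig mirrors the tool's flow: unmarshal the file contents and
// reject configs that do not set a data dir.
func loadConfig(data []byte) (*config, error) {
	var cfg config
	if err := json.Unmarshal(data, &cfg); err != nil {
		return nil, err
	}
	if len(cfg.DataDir) == 0 {
		return nil, errors.New("must set data dir")
	}
	return &cfg, nil
}

func main() {
	cfg, err := loadConfig([]byte(`{"data_dir": "/var/ledis"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.DataDir)
}
```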

View File

@ -3,6 +3,8 @@ package ledis
import ( import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt"
"strconv"
) )
var ( var (
@ -60,3 +62,149 @@ func encodeBinLogCommand(commandType uint8, args ...[]byte) []byte {
func decodeBinLogCommand(sz []byte) (uint8, [][]byte, error) { func decodeBinLogCommand(sz []byte) (uint8, [][]byte, error) {
return 0, nil, errBinLogCommandType return 0, nil, errBinLogCommandType
} }
func FormatBinLogEvent(event []byte) (string, error) {
logType := uint8(event[0])
var err error
var k []byte
var v []byte
var buf []byte = make([]byte, 0, 1024)
switch logType {
case BinLogTypePut:
k, v, err = decodeBinLogPut(event)
buf = append(buf, "PUT "...)
case BinLogTypeDeletion:
k, err = decodeBinLogDelete(event)
buf = append(buf, "DELETE "...)
default:
err = errInvalidBinLogEvent
}
if err != nil {
return "", err
}
if buf, err = formatDataKey(buf, k); err != nil {
return "", err
}
if v != nil && len(v) != 0 {
buf = append(buf, fmt.Sprintf(" %q", v)...)
}
return String(buf), nil
}
func formatDataKey(buf []byte, k []byte) ([]byte, error) {
if len(k) < 2 {
return nil, errInvalidBinLogEvent
}
buf = append(buf, fmt.Sprintf("DB:%2d ", k[0])...)
buf = append(buf, fmt.Sprintf("%s ", TypeName[k[1]])...)
db := new(DB)
db.index = k[0]
//to do format at respective place
switch k[1] {
case KVType:
if key, err := db.decodeKVKey(k); err != nil {
return nil, err
} else {
buf = strconv.AppendQuote(buf, String(key))
}
case HashType:
if key, field, err := db.hDecodeHashKey(k); err != nil {
return nil, err
} else {
buf = strconv.AppendQuote(buf, String(key))
buf = append(buf, ' ')
buf = strconv.AppendQuote(buf, String(field))
}
case HSizeType:
if key, err := db.hDecodeSizeKey(k); err != nil {
return nil, err
} else {
buf = strconv.AppendQuote(buf, String(key))
}
case ListType:
if key, seq, err := db.lDecodeListKey(k); err != nil {
return nil, err
} else {
buf = strconv.AppendQuote(buf, String(key))
buf = append(buf, ' ')
buf = strconv.AppendInt(buf, int64(seq), 10)
}
case LMetaType:
if key, err := db.lDecodeMetaKey(k); err != nil {
return nil, err
} else {
buf = strconv.AppendQuote(buf, String(key))
}
case ZSetType:
if key, m, err := db.zDecodeSetKey(k); err != nil {
return nil, err
} else {
buf = strconv.AppendQuote(buf, String(key))
buf = append(buf, ' ')
buf = strconv.AppendQuote(buf, String(m))
}
case ZSizeType:
if key, err := db.zDecodeSizeKey(k); err != nil {
return nil, err
} else {
buf = strconv.AppendQuote(buf, String(key))
}
case ZScoreType:
if key, m, score, err := db.zDecodeScoreKey(k); err != nil {
return nil, err
} else {
buf = strconv.AppendQuote(buf, String(key))
buf = append(buf, ' ')
buf = strconv.AppendQuote(buf, String(m))
buf = append(buf, ' ')
buf = strconv.AppendInt(buf, score, 10)
}
case BitType:
if key, seq, err := db.bDecodeBinKey(k); err != nil {
return nil, err
} else {
buf = strconv.AppendQuote(buf, String(key))
buf = append(buf, ' ')
buf = strconv.AppendUint(buf, uint64(seq), 10)
}
case BitMetaType:
if key, err := db.bDecodeMetaKey(k); err != nil {
return nil, err
} else {
buf = strconv.AppendQuote(buf, String(key))
}
case ExpTimeType:
if tp, key, t, err := db.expDecodeTimeKey(k); err != nil {
return nil, err
} else {
buf = append(buf, TypeName[tp]...)
buf = append(buf, ' ')
buf = strconv.AppendQuote(buf, String(key))
buf = append(buf, ' ')
buf = strconv.AppendInt(buf, t, 10)
}
case ExpMetaType:
if tp, key, err := db.expDecodeMetaKey(k); err != nil {
return nil, err
} else {
buf = append(buf, TypeName[tp]...)
buf = append(buf, ' ')
buf = strconv.AppendQuote(buf, String(key))
}
default:
return nil, errInvalidBinLogEvent
}
return buf, nil
}
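FormatBinLogEvent above switches on the event's first byte and builds the output with strconv.AppendQuote. A simplified stand-alone sketch of that formatting step (the type-byte values are assumptions; the real function also decodes the key's db index and data type):

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// Hypothetical values standing in for BinLogTypeDeletion/BinLogTypePut.
const (
	binLogTypeDeletion uint8 = 1
	binLogTypePut      uint8 = 2
)

// formatEvent appends an operation name, then the quoted key and
// optional value, mirroring the buf-building style of FormatBinLogEvent.
func formatEvent(logType uint8, key, value []byte) (string, error) {
	buf := make([]byte, 0, 64)
	switch logType {
	case binLogTypePut:
		buf = append(buf, "PUT "...)
	case binLogTypeDeletion:
		buf = append(buf, "DELETE "...)
	default:
		return "", errors.New("invalid binlog event")
	}
	buf = strconv.AppendQuote(buf, string(key))
	if len(value) > 0 {
		buf = append(buf, ' ')
		buf = strconv.AppendQuote(buf, string(value))
	}
	return string(buf), nil
}

func main() {
	s, _ := formatEvent(binLogTypePut, []byte("a"), []byte("1"))
	fmt.Println(s)
}
```

Quoting via AppendQuote keeps binary keys and values printable in the log listing.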

View File

@ -5,22 +5,39 @@ import (
) )
const ( const (
noneType byte = 0 NoneType byte = 0
kvType byte = 1 KVType byte = 1
hashType byte = 2 HashType byte = 2
hSizeType byte = 3 HSizeType byte = 3
listType byte = 4 ListType byte = 4
lMetaType byte = 5 LMetaType byte = 5
zsetType byte = 6 ZSetType byte = 6
zSizeType byte = 7 ZSizeType byte = 7
zScoreType byte = 8 ZScoreType byte = 8
binType byte = 9 BitType byte = 9
binMetaType byte = 10 BitMetaType byte = 10
maxDataType byte = 100 maxDataType byte = 100
expTimeType byte = 101 ExpTimeType byte = 101
expMetaType byte = 102 ExpMetaType byte = 102
)
var (
TypeName = map[byte]string{
KVType: "kv",
HashType: "hash",
HSizeType: "hsize",
ListType: "list",
LMetaType: "lmeta",
ZSetType: "zset",
ZSizeType: "zsize",
ZScoreType: "zscore",
BitType: "bit",
BitMetaType: "bitmeta",
ExpTimeType: "exptime",
ExpMetaType: "expmeta",
}
) )
const ( const (

View File

@ -26,11 +26,11 @@ func (db *DB) FlushAll() (drop int64, err error) {
func (db *DB) newEliminator() *elimination { func (db *DB) newEliminator() *elimination {
eliminator := newEliminator(db) eliminator := newEliminator(db)
eliminator.regRetireContext(kvType, db.kvTx, db.delete) eliminator.regRetireContext(KVType, db.kvTx, db.delete)
eliminator.regRetireContext(listType, db.listTx, db.lDelete) eliminator.regRetireContext(ListType, db.listTx, db.lDelete)
eliminator.regRetireContext(hashType, db.hashTx, db.hDelete) eliminator.regRetireContext(HashType, db.hashTx, db.hDelete)
eliminator.regRetireContext(zsetType, db.zsetTx, db.zDelete) eliminator.regRetireContext(ZSetType, db.zsetTx, db.zDelete)
eliminator.regRetireContext(binType, db.binTx, db.bDelete) eliminator.regRetireContext(BitType, db.binTx, db.bDelete)
return eliminator return eliminator
} }

View File

@ -10,6 +10,10 @@ import (
"os" "os"
) )
var (
ErrSkipEvent = errors.New("skip to next event")
)
var ( var (
errInvalidBinLogEvent = errors.New("invalid binglog event") errInvalidBinLogEvent = errors.New("invalid binglog event")
errInvalidBinLogFile = errors.New("invalid binlog file") errInvalidBinLogFile = errors.New("invalid binlog file")
@ -71,7 +75,7 @@ func (l *Ledis) replicateCommandEvent(event []byte) error {
return errors.New("command event not supported now") return errors.New("command event not supported now")
} }
func (l *Ledis) ReplicateFromReader(rb io.Reader) error { func ReadEventFromReader(rb io.Reader, f func(createTime uint32, event []byte) error) error {
var createTime uint32 var createTime uint32
var dataLen uint32 var dataLen uint32
var dataBuf bytes.Buffer var dataBuf bytes.Buffer
@ -94,9 +98,9 @@ func (l *Ledis) ReplicateFromReader(rb io.Reader) error {
return err return err
} }
err = l.ReplicateEvent(dataBuf.Bytes()) err = f(createTime, dataBuf.Bytes())
if err != nil { if err != nil && err != ErrSkipEvent {
log.Fatal("replication error %s, skip to next", err.Error()) return err
} }
dataBuf.Reset() dataBuf.Reset()
@ -105,6 +109,19 @@ func (l *Ledis) ReplicateFromReader(rb io.Reader) error {
return nil return nil
} }
func (l *Ledis) ReplicateFromReader(rb io.Reader) error {
f := func(createTime uint32, event []byte) error {
err := l.ReplicateEvent(event)
if err != nil {
log.Fatal("replication error %s, skip to next", err.Error())
return ErrSkipEvent
}
return nil
}
return ReadEventFromReader(rb, f)
}
func (l *Ledis) ReplicateFromData(data []byte) error { func (l *Ledis) ReplicateFromData(data []byte) error {
rb := bytes.NewReader(data) rb := bytes.NewReader(data)

View File

@ -119,19 +119,27 @@ func (datas segBitInfoArray) Swap(i, j int) {
func (db *DB) bEncodeMetaKey(key []byte) []byte { func (db *DB) bEncodeMetaKey(key []byte) []byte {
mk := make([]byte, len(key)+2) mk := make([]byte, len(key)+2)
mk[0] = db.index mk[0] = db.index
mk[1] = binMetaType mk[1] = BitMetaType
copy(mk[2:], key) copy(mk[2:], key)
return mk return mk
} }
func (db *DB) bDecodeMetaKey(bkey []byte) ([]byte, error) {
if len(bkey) < 2 || bkey[0] != db.index || bkey[1] != BitMetaType {
return nil, errBinKey
}
return bkey[2:], nil
}
func (db *DB) bEncodeBinKey(key []byte, seq uint32) []byte { func (db *DB) bEncodeBinKey(key []byte, seq uint32) []byte {
bk := make([]byte, len(key)+8) bk := make([]byte, len(key)+8)
pos := 0 pos := 0
bk[pos] = db.index bk[pos] = db.index
pos++ pos++
bk[pos] = binType bk[pos] = BitType
pos++ pos++
binary.BigEndian.PutUint16(bk[pos:], uint16(len(key))) binary.BigEndian.PutUint16(bk[pos:], uint16(len(key)))
@ -355,7 +363,7 @@ func (db *DB) bExpireAt(key []byte, when int64) (int64, error) {
if seq, _, err := db.bGetMeta(key); err != nil || seq < 0 { if seq, _, err := db.bGetMeta(key); err != nil || seq < 0 {
return 0, err return 0, err
} else { } else {
db.expireAt(t, binType, key, when) db.expireAt(t, BitType, key, when)
if err := t.Commit(); err != nil { if err := t.Commit(); err != nil {
return 0, err return 0, err
} }
@ -407,7 +415,7 @@ func (db *DB) BDelete(key []byte) (drop int64, err error) {
defer t.Unlock() defer t.Unlock()
drop = db.bDelete(t, key) drop = db.bDelete(t, key)
db.rmExpire(t, binType, key) db.rmExpire(t, BitType, key)
err = t.Commit() err = t.Commit()
return return
@ -736,7 +744,7 @@ func (db *DB) BOperation(op uint8, dstkey []byte, srckeys ...[]byte) (blen int32
// clear the old data in case // clear the old data in case
db.bDelete(t, dstkey) db.bDelete(t, dstkey)
db.rmExpire(t, binType, dstkey) db.rmExpire(t, BitType, dstkey)
// set data // set data
db.bSetMeta(t, dstkey, maxDstSeq, maxDstOff) db.bSetMeta(t, dstkey, maxDstSeq, maxDstOff)
@ -786,7 +794,7 @@ func (db *DB) BTTL(key []byte) (int64, error) {
return -1, err return -1, err
} }
return db.ttl(binType, key) return db.ttl(BitType, key)
} }
func (db *DB) BPersist(key []byte) (int64, error) { func (db *DB) BPersist(key []byte) (int64, error) {
@ -798,7 +806,7 @@ func (db *DB) BPersist(key []byte) (int64, error) {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
n, err := db.rmExpire(t, binType, key) n, err := db.rmExpire(t, BitType, key)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -818,14 +826,14 @@ func (db *DB) bFlush() (drop int64, err error) {
minKey := make([]byte, 2) minKey := make([]byte, 2)
minKey[0] = db.index minKey[0] = db.index
minKey[1] = binType minKey[1] = BitType
maxKey := make([]byte, 2) maxKey := make([]byte, 2)
maxKey[0] = db.index maxKey[0] = db.index
maxKey[1] = binMetaType + 1 maxKey[1] = BitMetaType + 1
drop, err = db.flushRegion(t, minKey, maxKey) drop, err = db.flushRegion(t, minKey, maxKey)
err = db.expFlush(t, binType) err = db.expFlush(t, BitType)
err = t.Commit() err = t.Commit()
return return
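The meta-key layout that bEncodeMetaKey and the new bDecodeMetaKey implement, one byte of db index, one type byte, then the raw key, round-trips cleanly. A stand-alone sketch (the type-byte value matches BitMetaType in this commit):

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

const bitMetaType byte = 10 // BitMetaType in this commit

// encodeMetaKey lays out a meta key as [db index][type byte][key],
// the same shape bEncodeMetaKey builds.
func encodeMetaKey(index byte, key []byte) []byte {
	mk := make([]byte, len(key)+2)
	mk[0] = index
	mk[1] = bitMetaType
	copy(mk[2:], key)
	return mk
}

// decodeMetaKey reverses it, validating the two prefix bytes the way
// the new bDecodeMetaKey does.
func decodeMetaKey(index byte, bkey []byte) ([]byte, error) {
	if len(bkey) < 2 || bkey[0] != index || bkey[1] != bitMetaType {
		return nil, errors.New("invalid bit meta key")
	}
	return bkey[2:], nil
}

func main() {
	mk := encodeMetaKey(0, []byte("mykey"))
	key, err := decodeMetaKey(0, mk)
	if err != nil || !bytes.Equal(key, []byte("mykey")) {
		panic("round trip failed")
	}
	fmt.Printf("prefix %v -> key %q\n", mk[:2], key)
}
```

Prefixing every key with [index][type] is what lets bFlush delete a whole type range with a simple [minKey, maxKey) scan.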

View File

@ -33,14 +33,14 @@ func (db *DB) hEncodeSizeKey(key []byte) []byte {
buf := make([]byte, len(key)+2) buf := make([]byte, len(key)+2)
buf[0] = db.index buf[0] = db.index
buf[1] = hSizeType buf[1] = HSizeType
copy(buf[2:], key) copy(buf[2:], key)
return buf return buf
} }
func (db *DB) hDecodeSizeKey(ek []byte) ([]byte, error) { func (db *DB) hDecodeSizeKey(ek []byte) ([]byte, error) {
if len(ek) < 2 || ek[0] != db.index || ek[1] != hSizeType { if len(ek) < 2 || ek[0] != db.index || ek[1] != HSizeType {
return nil, errHSizeKey return nil, errHSizeKey
} }
@ -53,7 +53,7 @@ func (db *DB) hEncodeHashKey(key []byte, field []byte) []byte {
pos := 0 pos := 0
buf[pos] = db.index buf[pos] = db.index
pos++ pos++
-    buf[pos] = hashType
+    buf[pos] = HashType
     pos++
     binary.BigEndian.PutUint16(buf[pos:], uint16(len(key)))
@@ -70,7 +70,7 @@ func (db *DB) hEncodeHashKey(key []byte, field []byte) []byte {
 }
 func (db *DB) hDecodeHashKey(ek []byte) ([]byte, []byte, error) {
-    if len(ek) < 5 || ek[0] != db.index || ek[1] != hashType {
+    if len(ek) < 5 || ek[0] != db.index || ek[1] != HashType {
         return nil, nil, errHashKey
     }
@@ -151,7 +151,7 @@ func (db *DB) hExpireAt(key []byte, when int64) (int64, error) {
     if hlen, err := db.HLen(key); err != nil || hlen == 0 {
         return 0, err
     } else {
-        db.expireAt(t, hashType, key, when)
+        db.expireAt(t, HashType, key, when)
         if err := t.Commit(); err != nil {
             return 0, err
         }
@@ -304,7 +304,7 @@ func (db *DB) hIncrSize(key []byte, delta int64) (int64, error) {
     if size <= 0 {
         size = 0
         t.Delete(sk)
-        db.rmExpire(t, hashType, key)
+        db.rmExpire(t, HashType, key)
     } else {
         t.Put(sk, PutInt64(size))
     }
@@ -428,7 +428,7 @@ func (db *DB) HClear(key []byte) (int64, error) {
     defer t.Unlock()
     num := db.hDelete(t, key)
-    db.rmExpire(t, hashType, key)
+    db.rmExpire(t, HashType, key)
     err := t.Commit()
     return num, err
@@ -445,7 +445,7 @@ func (db *DB) HMclear(keys ...[]byte) (int64, error) {
         }
         db.hDelete(t, key)
-        db.rmExpire(t, hashType, key)
+        db.rmExpire(t, HashType, key)
     }
     err := t.Commit()
@@ -455,18 +455,18 @@ func (db *DB) HMclear(keys ...[]byte) (int64, error) {
 func (db *DB) hFlush() (drop int64, err error) {
     minKey := make([]byte, 2)
     minKey[0] = db.index
-    minKey[1] = hashType
+    minKey[1] = HashType
     maxKey := make([]byte, 2)
     maxKey[0] = db.index
-    maxKey[1] = hSizeType + 1
+    maxKey[1] = HSizeType + 1
     t := db.kvTx
     t.Lock()
     defer t.Unlock()
     drop, err = db.flushRegion(t, minKey, maxKey)
-    err = db.expFlush(t, hashType)
+    err = db.expFlush(t, HashType)
     err = t.Commit()
     return
@@ -530,7 +530,7 @@ func (db *DB) HTTL(key []byte) (int64, error) {
         return -1, err
     }
-    return db.ttl(hashType, key)
+    return db.ttl(HashType, key)
 }
 func (db *DB) HPersist(key []byte) (int64, error) {
@@ -542,7 +542,7 @@ func (db *DB) HPersist(key []byte) (int64, error) {
     t.Lock()
     defer t.Unlock()
-    n, err := db.rmExpire(t, hashType, key)
+    n, err := db.rmExpire(t, HashType, key)
     if err != nil {
         return 0, err
     }
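The hunks above only rename the per-type key byte (hashType → HashType) so code outside the package can decode LedisDB's on-disk layout. As a sketch of that layout (the concrete byte values here are assumptions for illustration, not LedisDB's real constants), a hash field key is `[db index][HashType][2-byte big-endian key length][key][field]`:

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Assumed values; the real bytes are defined elsewhere in ledis.
const (
	dbIndex  byte = 0
	HashType byte = 'h'
)

// encodeHashKey sketches hEncodeHashKey's layout:
// [db index][HashType][2-byte big-endian key length][key][field]
func encodeHashKey(key, field []byte) []byte {
	buf := make([]byte, len(key)+len(field)+4)
	pos := 0
	buf[pos] = dbIndex
	pos++
	buf[pos] = HashType
	pos++
	binary.BigEndian.PutUint16(buf[pos:], uint16(len(key)))
	pos += 2
	copy(buf[pos:], key)
	pos += len(key)
	copy(buf[pos:], field)
	return buf
}

// decodeHashKey reverses the layout, rejecting a wrong index or type byte,
// as hDecodeHashKey does.
func decodeHashKey(ek []byte) (key, field []byte, err error) {
	if len(ek) < 4 || ek[0] != dbIndex || ek[1] != HashType {
		return nil, nil, errors.New("invalid hash key")
	}
	keyLen := int(binary.BigEndian.Uint16(ek[2:]))
	if 4+keyLen > len(ek) {
		return nil, nil, errors.New("invalid hash key")
	}
	return ek[4 : 4+keyLen], ek[4+keyLen:], nil
}

func main() {
	ek := encodeHashKey([]byte("myhash"), []byte("field1"))
	k, f, err := decodeHashKey(ek)
	fmt.Println(string(k), string(f), err)
}
```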

View File

@@ -31,13 +31,13 @@ func checkValueSize(value []byte) error {
 func (db *DB) encodeKVKey(key []byte) []byte {
     ek := make([]byte, len(key)+2)
     ek[0] = db.index
-    ek[1] = kvType
+    ek[1] = KVType
     copy(ek[2:], key)
     return ek
 }
 func (db *DB) decodeKVKey(ek []byte) ([]byte, error) {
-    if len(ek) < 2 || ek[0] != db.index || ek[1] != kvType {
+    if len(ek) < 2 || ek[0] != db.index || ek[1] != KVType {
         return nil, errKVKey
     }
@@ -51,7 +51,7 @@ func (db *DB) encodeKVMinKey() []byte {
 func (db *DB) encodeKVMaxKey() []byte {
     ek := db.encodeKVKey(nil)
-    ek[len(ek)-1] = kvType + 1
+    ek[len(ek)-1] = KVType + 1
     return ek
 }
@@ -100,7 +100,7 @@ func (db *DB) setExpireAt(key []byte, when int64) (int64, error) {
     if exist, err := db.Exists(key); err != nil || exist == 0 {
         return 0, err
     } else {
-        db.expireAt(t, kvType, key, when)
+        db.expireAt(t, KVType, key, when)
         if err := t.Commit(); err != nil {
             return 0, err
         }
@@ -132,7 +132,7 @@ func (db *DB) Del(keys ...[]byte) (int64, error) {
     for i, k := range keys {
         t.Delete(codedKeys[i])
-        db.rmExpire(t, kvType, k)
+        db.rmExpire(t, KVType, k)
     }
     err := t.Commit()
@@ -317,7 +317,7 @@ func (db *DB) flush() (drop int64, err error) {
     defer t.Unlock()
     drop, err = db.flushRegion(t, minKey, maxKey)
-    err = db.expFlush(t, kvType)
+    err = db.expFlush(t, KVType)
     err = t.Commit()
     return
@@ -382,7 +382,7 @@ func (db *DB) TTL(key []byte) (int64, error) {
         return -1, err
     }
-    return db.ttl(kvType, key)
+    return db.ttl(KVType, key)
 }
 func (db *DB) Persist(key []byte) (int64, error) {
@@ -393,7 +393,7 @@ func (db *DB) Persist(key []byte) (int64, error) {
     t := db.kvTx
     t.Lock()
     defer t.Unlock()
-    n, err := db.rmExpire(t, kvType, key)
+    n, err := db.rmExpire(t, KVType, key)
     if err != nil {
         return 0, err
     }
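The `encodeKVMaxKey` line (`ek[len(ek)-1] = KVType + 1`) is the standard half-open prefix-range trick: every encoded KV key sorts at or above `{index, KVType}` and strictly below `{index, KVType + 1}`, so one range scan covers the whole type. A self-contained sketch with assumed byte values:

```go
package main

import (
	"bytes"
	"fmt"
)

// Assumed values; the real type byte lives in ledis's constants.
const (
	dbIndex byte = 0
	KVType  byte = 'k'
)

func encodeKVKey(key []byte) []byte {
	ek := make([]byte, len(key)+2)
	ek[0] = dbIndex
	ek[1] = KVType
	copy(ek[2:], key)
	return ek
}

// kvRange returns the half-open range [min, max) that contains every
// possible encoded KV key, because bumping the type byte by one produces
// a bound greater than any key of this type.
func kvRange() (min, max []byte) {
	return []byte{dbIndex, KVType}, []byte{dbIndex, KVType + 1}
}

func inRange(ek, min, max []byte) bool {
	return bytes.Compare(ek, min) >= 0 && bytes.Compare(ek, max) < 0
}

func main() {
	min, max := kvRange()
	fmt.Println(inRange(encodeKVKey([]byte("a")), min, max))
	fmt.Println(inRange([]byte{dbIndex, KVType + 1, 'a'}, min, max))
}
```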

View File

@@ -23,14 +23,14 @@ var errListSeq = errors.New("invalid list sequence, overflow")
 func (db *DB) lEncodeMetaKey(key []byte) []byte {
     buf := make([]byte, len(key)+2)
     buf[0] = db.index
-    buf[1] = lMetaType
+    buf[1] = LMetaType
     copy(buf[2:], key)
     return buf
 }
 func (db *DB) lDecodeMetaKey(ek []byte) ([]byte, error) {
-    if len(ek) < 2 || ek[0] != db.index || ek[1] != lMetaType {
+    if len(ek) < 2 || ek[0] != db.index || ek[1] != LMetaType {
         return nil, errLMetaKey
     }
@@ -43,7 +43,7 @@ func (db *DB) lEncodeListKey(key []byte, seq int32) []byte {
     pos := 0
     buf[pos] = db.index
     pos++
-    buf[pos] = listType
+    buf[pos] = ListType
     pos++
     binary.BigEndian.PutUint16(buf[pos:], uint16(len(key)))
@@ -58,7 +58,7 @@ func (db *DB) lEncodeListKey(key []byte, seq int32) []byte {
 }
 func (db *DB) lDecodeListKey(ek []byte) (key []byte, seq int32, err error) {
-    if len(ek) < 8 || ek[0] != db.index || ek[1] != listType {
+    if len(ek) < 8 || ek[0] != db.index || ek[1] != ListType {
         err = errListKey
         return
     }
@@ -84,8 +84,12 @@ func (db *DB) lpush(key []byte, whereSeq int32, args ...[]byte) (int64, error) {
     var size int32
     var err error
+    t := db.listTx
+    t.Lock()
+    defer t.Unlock()
     metaKey := db.lEncodeMetaKey(key)
-    headSeq, tailSeq, size, err = db.lGetMeta(metaKey)
+    headSeq, tailSeq, size, err = db.lGetMeta(nil, metaKey)
     if err != nil {
         return 0, err
     }
@@ -102,10 +106,6 @@ func (db *DB) lpush(key []byte, whereSeq int32, args ...[]byte) (int64, error) {
         delta = 1
     }
-    t := db.listTx
-    t.Lock()
-    defer t.Unlock()
     // append elements
     if size > 0 {
         seq += delta
@@ -148,7 +148,7 @@ func (db *DB) lpop(key []byte, whereSeq int32) ([]byte, error) {
     var err error
     metaKey := db.lEncodeMetaKey(key)
-    headSeq, tailSeq, _, err = db.lGetMeta(metaKey)
+    headSeq, tailSeq, _, err = db.lGetMeta(nil, metaKey)
     if err != nil {
         return nil, err
     }
@@ -175,7 +175,7 @@ func (db *DB) lpop(key []byte, whereSeq int32) ([]byte, error) {
     t.Delete(itemKey)
     size := db.lSetMeta(metaKey, headSeq, tailSeq)
     if size == 0 {
-        db.rmExpire(t, hashType, key)
+        db.rmExpire(t, HashType, key)
     }
     err = t.Commit()
@@ -191,7 +191,10 @@ func (db *DB) lDelete(t *tx, key []byte) int64 {
     var tailSeq int32
     var err error
-    headSeq, tailSeq, _, err = db.lGetMeta(mk)
+    it := db.db.NewIterator()
+    defer it.Close()
+    headSeq, tailSeq, _, err = db.lGetMeta(it, mk)
     if err != nil {
         return 0
     }
@@ -200,27 +203,24 @@ func (db *DB) lDelete(t *tx, key []byte) int64 {
     startKey := db.lEncodeListKey(key, headSeq)
     stopKey := db.lEncodeListKey(key, tailSeq)
-    it := db.db.RangeLimitIterator(startKey, stopKey, leveldb.RangeClose, 0, -1)
-    for ; it.Valid(); it.Next() {
-        t.Delete(it.Key())
+    rit := leveldb.NewRangeIterator(it, &leveldb.Range{startKey, stopKey, leveldb.RangeClose})
+    for ; rit.Valid(); rit.Next() {
+        t.Delete(rit.RawKey())
         num++
     }
-    it.Close()
     t.Delete(mk)
     return num
 }
-func (db *DB) lGetSeq(key []byte, whereSeq int32) (int64, error) {
-    ek := db.lEncodeListKey(key, whereSeq)
-    return Int64(db.db.Get(ek))
-}
-func (db *DB) lGetMeta(ek []byte) (headSeq int32, tailSeq int32, size int32, err error) {
+func (db *DB) lGetMeta(it *leveldb.Iterator, ek []byte) (headSeq int32, tailSeq int32, size int32, err error) {
     var v []byte
-    v, err = db.db.Get(ek)
+    if it != nil {
+        v = it.Find(ek)
+    } else {
+        v, err = db.db.Get(ek)
+    }
     if err != nil {
         return
     } else if v == nil {
@@ -264,7 +264,7 @@ func (db *DB) lExpireAt(key []byte, when int64) (int64, error) {
     if llen, err := db.LLen(key); err != nil || llen == 0 {
         return 0, err
     } else {
-        db.expireAt(t, listType, key, when)
+        db.expireAt(t, ListType, key, when)
         if err := t.Commit(); err != nil {
             return 0, err
         }
@@ -284,7 +284,10 @@ func (db *DB) LIndex(key []byte, index int32) ([]byte, error) {
     metaKey := db.lEncodeMetaKey(key)
-    headSeq, tailSeq, _, err = db.lGetMeta(metaKey)
+    it := db.db.NewIterator()
+    defer it.Close()
+    headSeq, tailSeq, _, err = db.lGetMeta(it, metaKey)
     if err != nil {
         return nil, err
     }
@@ -296,7 +299,9 @@ func (db *DB) LIndex(key []byte, index int32) ([]byte, error) {
     }
     sk := db.lEncodeListKey(key, seq)
-    return db.db.Get(sk)
+    v := it.Find(sk)
+    return v, nil
 }
 func (db *DB) LLen(key []byte) (int64, error) {
@@ -305,7 +310,7 @@ func (db *DB) LLen(key []byte) (int64, error) {
     }
     ek := db.lEncodeMetaKey(key)
-    _, _, size, err := db.lGetMeta(ek)
+    _, _, size, err := db.lGetMeta(nil, ek)
     return int64(size), err
 }
@@ -328,7 +333,10 @@ func (db *DB) LRange(key []byte, start int32, stop int32) ([][]byte, error) {
     metaKey := db.lEncodeMetaKey(key)
-    if headSeq, _, llen, err = db.lGetMeta(metaKey); err != nil {
+    it := db.db.NewIterator()
+    defer it.Close()
+    if headSeq, _, llen, err = db.lGetMeta(it, metaKey); err != nil {
         return nil, err
     }
@@ -356,12 +364,18 @@ func (db *DB) LRange(key []byte, start int32, stop int32) ([][]byte, error) {
     v := make([][]byte, 0, limit)
     startKey := db.lEncodeListKey(key, headSeq)
-    it := db.db.RangeLimitIterator(startKey, nil, leveldb.RangeClose, 0, int(limit))
-    for ; it.Valid(); it.Next() {
-        v = append(v, it.Value())
-    }
-    it.Close()
+    rit := leveldb.NewRangeLimitIterator(it,
+        &leveldb.Range{
+            Min:  startKey,
+            Max:  nil,
+            Type: leveldb.RangeClose},
+        &leveldb.Limit{
+            Offset: 0,
+            Count:  int(limit)})
+    for ; rit.Valid(); rit.Next() {
+        v = append(v, rit.Value())
+    }
     return v, nil
 }
@@ -384,7 +398,7 @@ func (db *DB) LClear(key []byte) (int64, error) {
     defer t.Unlock()
     num := db.lDelete(t, key)
-    db.rmExpire(t, listType, key)
+    db.rmExpire(t, ListType, key)
     err := t.Commit()
     return num, err
@@ -401,7 +415,7 @@ func (db *DB) LMclear(keys ...[]byte) (int64, error) {
         }
         db.lDelete(t, key)
-        db.rmExpire(t, listType, key)
+        db.rmExpire(t, ListType, key)
     }
@@ -412,18 +426,18 @@ func (db *DB) LMclear(keys ...[]byte) (int64, error) {
 func (db *DB) lFlush() (drop int64, err error) {
     minKey := make([]byte, 2)
     minKey[0] = db.index
-    minKey[1] = listType
+    minKey[1] = ListType
     maxKey := make([]byte, 2)
     maxKey[0] = db.index
-    maxKey[1] = lMetaType + 1
+    maxKey[1] = LMetaType + 1
     t := db.listTx
     t.Lock()
     defer t.Unlock()
     drop, err = db.flushRegion(t, minKey, maxKey)
-    err = db.expFlush(t, listType)
+    err = db.expFlush(t, ListType)
     err = t.Commit()
     return
@@ -450,7 +464,7 @@ func (db *DB) LTTL(key []byte) (int64, error) {
         return -1, err
     }
-    return db.ttl(listType, key)
+    return db.ttl(ListType, key)
 }
 func (db *DB) LPersist(key []byte) (int64, error) {
@@ -462,7 +476,7 @@ func (db *DB) LPersist(key []byte) (int64, error) {
     t.Lock()
     defer t.Unlock()
-    n, err := db.rmExpire(t, listType, key)
+    n, err := db.rmExpire(t, ListType, key)
     if err != nil {
         return 0, err
     }
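The recurring change in this file is lGetMeta's new signature: callers that already hold an open iterator pass it in so one iterator serves both the meta lookup (`Find`) and the subsequent range scan, while one-shot callers pass `nil` and fall back to a plain `Get`. A toy sketch of that pattern (the `Iterator` and `DB` types here are stand-ins, not the real ledis/leveldb API):

```go
package main

import "fmt"

// Toy stand-in for ledis's leveldb.Iterator: Find does a point lookup
// through an already-open iterator instead of a fresh Get.
type Iterator struct{ data map[string][]byte }

func (it *Iterator) Find(key []byte) []byte { return it.data[string(key)] }

type DB struct{ data map[string][]byte }

func (db *DB) Get(key []byte) ([]byte, error) { return db.data[string(key)], nil }
func (db *DB) NewIterator() *Iterator         { return &Iterator{db.data} }

// getMeta mirrors the refactored lGetMeta signature: reuse the caller's
// iterator when one exists, otherwise do a standalone Get.
func (db *DB) getMeta(it *Iterator, ek []byte) (v []byte, err error) {
	if it != nil {
		v = it.Find(ek)
	} else {
		v, err = db.Get(ek)
	}
	return
}

func main() {
	db := &DB{map[string][]byte{"meta": []byte("42")}}
	v1, _ := db.getMeta(nil, []byte("meta"))
	it := db.NewIterator()
	v2, _ := db.getMeta(it, []byte("meta"))
	fmt.Println(string(v1), string(v2))
}
```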

View File

@@ -26,7 +26,7 @@ func (db *DB) expEncodeTimeKey(dataType byte, key []byte, when int64) []byte {
     buf := make([]byte, len(key)+11)
     buf[0] = db.index
-    buf[1] = expTimeType
+    buf[1] = ExpTimeType
     buf[2] = dataType
     pos := 3
@@ -42,7 +42,7 @@ func (db *DB) expEncodeMetaKey(dataType byte, key []byte) []byte {
     buf := make([]byte, len(key)+3)
     buf[0] = db.index
-    buf[1] = expMetaType
+    buf[1] = ExpMetaType
     buf[2] = dataType
     pos := 3
@@ -52,7 +52,7 @@ func (db *DB) expEncodeMetaKey(dataType byte, key []byte) []byte {
 }
 func (db *DB) expDecodeMetaKey(mk []byte) (byte, []byte, error) {
-    if len(mk) <= 3 || mk[0] != db.index || mk[1] != expMetaType {
+    if len(mk) <= 3 || mk[0] != db.index || mk[1] != ExpMetaType {
         return 0, nil, errExpMetaKey
     }
@@ -60,7 +60,7 @@ func (db *DB) expDecodeMetaKey(mk []byte) (byte, []byte, error) {
 }
 func (db *DB) expDecodeTimeKey(tk []byte) (byte, []byte, int64, error) {
-    if len(tk) < 11 || tk[0] != db.index || tk[1] != expTimeType {
+    if len(tk) < 11 || tk[0] != db.index || tk[1] != ExpTimeType {
         return 0, nil, 0, errExpTimeKey
     }
@@ -114,12 +114,12 @@ func (db *DB) rmExpire(t *tx, dataType byte, key []byte) (int64, error) {
 func (db *DB) expFlush(t *tx, dataType byte) (err error) {
     minKey := make([]byte, 3)
     minKey[0] = db.index
-    minKey[1] = expTimeType
+    minKey[1] = ExpTimeType
     minKey[2] = dataType
     maxKey := make([]byte, 3)
     maxKey[0] = db.index
-    maxKey[1] = expMetaType
+    maxKey[1] = ExpMetaType
     maxKey[2] = dataType + 1
     _, err = db.flushRegion(t, minKey, maxKey)
@@ -153,7 +153,7 @@ func (eli *elimination) active() {
     db := eli.db
     dbGet := db.db.Get
-    minKey := db.expEncodeTimeKey(noneType, nil, 0)
+    minKey := db.expEncodeTimeKey(NoneType, nil, 0)
     maxKey := db.expEncodeTimeKey(maxDataType, nil, now)
     it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)

View File

@@ -45,14 +45,14 @@ func checkZSetKMSize(key []byte, member []byte) error {
 func (db *DB) zEncodeSizeKey(key []byte) []byte {
     buf := make([]byte, len(key)+2)
     buf[0] = db.index
-    buf[1] = zSizeType
+    buf[1] = ZSizeType
     copy(buf[2:], key)
     return buf
 }
 func (db *DB) zDecodeSizeKey(ek []byte) ([]byte, error) {
-    if len(ek) < 2 || ek[0] != db.index || ek[1] != zSizeType {
+    if len(ek) < 2 || ek[0] != db.index || ek[1] != ZSizeType {
         return nil, errZSizeKey
     }
@@ -66,7 +66,7 @@ func (db *DB) zEncodeSetKey(key []byte, member []byte) []byte {
     buf[pos] = db.index
     pos++
-    buf[pos] = zsetType
+    buf[pos] = ZSetType
     pos++
     binary.BigEndian.PutUint16(buf[pos:], uint16(len(key)))
@@ -84,7 +84,7 @@ func (db *DB) zEncodeSetKey(key []byte, member []byte) []byte {
 }
 func (db *DB) zDecodeSetKey(ek []byte) ([]byte, []byte, error) {
-    if len(ek) < 5 || ek[0] != db.index || ek[1] != zsetType {
+    if len(ek) < 5 || ek[0] != db.index || ek[1] != ZSetType {
         return nil, nil, errZSetKey
     }
@@ -121,7 +121,7 @@ func (db *DB) zEncodeScoreKey(key []byte, member []byte, score int64) []byte {
     buf[pos] = db.index
     pos++
-    buf[pos] = zScoreType
+    buf[pos] = ZScoreType
     pos++
     binary.BigEndian.PutUint16(buf[pos:], uint16(len(key)))
@@ -158,7 +158,7 @@ func (db *DB) zEncodeStopScoreKey(key []byte, score int64) []byte {
 }
 func (db *DB) zDecodeScoreKey(ek []byte) (key []byte, member []byte, score int64, err error) {
-    if len(ek) < 14 || ek[0] != db.index || ek[1] != zScoreType {
+    if len(ek) < 14 || ek[0] != db.index || ek[1] != ZScoreType {
         err = errZScoreKey
         return
     }
@@ -260,7 +260,7 @@ func (db *DB) zExpireAt(key []byte, when int64) (int64, error) {
     if zcnt, err := db.ZCard(key); err != nil || zcnt == 0 {
         return 0, err
     } else {
-        db.expireAt(t, zsetType, key, when)
+        db.expireAt(t, ZSetType, key, when)
         if err := t.Commit(); err != nil {
             return 0, err
         }
@@ -314,7 +314,7 @@ func (db *DB) zIncrSize(t *tx, key []byte, delta int64) (int64, error) {
     if size <= 0 {
         size = 0
         t.Delete(sk)
-        db.rmExpire(t, zsetType, key)
+        db.rmExpire(t, ZSetType, key)
     } else {
         t.Put(sk, PutInt64(size))
     }
@@ -451,37 +451,37 @@ func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) {
     k := db.zEncodeSetKey(key, member)
-    if v, err := db.db.Get(k); err != nil {
-        return 0, err
-    } else if v == nil {
+    it := db.db.NewIterator()
+    defer it.Close()
+    if v := it.Find(k); v == nil {
         return -1, nil
     } else {
-        if s, err := Int64(v, err); err != nil {
+        if s, err := Int64(v, nil); err != nil {
             return 0, err
         } else {
-            var it *leveldb.RangeLimitIterator
+            var rit *leveldb.RangeLimitIterator
             sk := db.zEncodeScoreKey(key, member, s)
             if !reverse {
                 minKey := db.zEncodeStartScoreKey(key, MinScore)
-                it = db.db.RangeLimitIterator(minKey, sk, leveldb.RangeClose, 0, -1)
+                rit = leveldb.NewRangeIterator(it, &leveldb.Range{minKey, sk, leveldb.RangeClose})
             } else {
                 maxKey := db.zEncodeStopScoreKey(key, MaxScore)
-                it = db.db.RevRangeLimitIterator(sk, maxKey, leveldb.RangeClose, 0, -1)
+                rit = leveldb.NewRevRangeIterator(it, &leveldb.Range{sk, maxKey, leveldb.RangeClose})
             }
             var lastKey []byte = nil
             var n int64 = 0
-            for ; it.Valid(); it.Next() {
+            for ; rit.Valid(); rit.Next() {
                 n++
-                lastKey = it.BufKey(lastKey)
+                lastKey = rit.BufKey(lastKey)
             }
-            it.Close()
             if _, m, _, err := db.zDecodeScoreKey(lastKey); err == nil && bytes.Equal(m, member) {
                 n--
                 return n, nil
@@ -734,15 +734,17 @@ func (db *DB) zFlush() (drop int64, err error) {
     minKey := make([]byte, 2)
     minKey[0] = db.index
-    minKey[1] = zsetType
+    minKey[1] = ZSetType
     maxKey := make([]byte, 2)
     maxKey[0] = db.index
-    maxKey[1] = zScoreType + 1
+    maxKey[1] = ZScoreType + 1
     it := db.db.RangeLimitIterator(minKey, maxKey, leveldb.RangeROpen, 0, -1)
+    defer it.Close()
     for ; it.Valid(); it.Next() {
-        t.Delete(it.Key())
+        t.Delete(it.RawKey())
         drop++
         if drop&1023 == 0 {
             if err = t.Commit(); err != nil {
@@ -750,9 +752,8 @@ func (db *DB) zFlush() (drop int64, err error) {
             }
         }
     }
-    it.Close()
-    db.expFlush(t, zsetType)
+    db.expFlush(t, ZSetType)
     err = t.Commit()
     return
@@ -818,7 +819,7 @@ func (db *DB) ZTTL(key []byte) (int64, error) {
         return -1, err
     }
-    return db.ttl(zsetType, key)
+    return db.ttl(ZSetType, key)
 }
 func (db *DB) ZPersist(key []byte) (int64, error) {
@@ -830,7 +831,7 @@ func (db *DB) ZPersist(key []byte) (int64, error) {
     t.Lock()
     defer t.Unlock()
-    n, err := db.rmExpire(t, zsetType, key)
+    n, err := db.rmExpire(t, ZSetType, key)
     if err != nil {
         return 0, err
     }

View File

@@ -125,8 +125,10 @@ func (it *Iterator) BufValue(b []byte) []byte {
 }
 func (it *Iterator) Close() {
-    C.leveldb_iter_destroy(it.it)
-    it.it = nil
+    if it.it != nil {
+        C.leveldb_iter_destroy(it.it)
+        it.it = nil
+    }
 }
 func (it *Iterator) Valid() bool {
@@ -274,6 +276,14 @@ func NewRevRangeLimitIterator(i *Iterator, r *Range, l *Limit) *RangeLimitIterator {
     return rangeLimitIterator(i, r, l, IteratorBackward)
 }
+func NewRangeIterator(i *Iterator, r *Range) *RangeLimitIterator {
+    return rangeLimitIterator(i, r, &Limit{0, -1}, IteratorForward)
+}
+func NewRevRangeIterator(i *Iterator, r *Range) *RangeLimitIterator {
+    return rangeLimitIterator(i, r, &Limit{0, -1}, IteratorBackward)
+}
 func rangeLimitIterator(i *Iterator, r *Range, l *Limit, direction uint8) *RangeLimitIterator {
     it := new(RangeLimitIterator)
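The new `NewRangeIterator`/`NewRevRangeIterator` constructors wrap an iterator the caller already opened with range bounds, instead of opening a fresh one per scan. A toy sketch of the forward case over a sorted key set (the cursor type below is a stand-in, not the real cgo-backed iterator):

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// Toy cursor over sorted keys, standing in for one open leveldb iterator.
type Iterator struct {
	keys [][]byte
	pos  int
}

func (it *Iterator) Seek(k []byte) {
	it.pos = sort.Search(len(it.keys), func(i int) bool {
		return bytes.Compare(it.keys[i], k) >= 0
	})
}
func (it *Iterator) Valid() bool { return it.pos < len(it.keys) }
func (it *Iterator) Next()       { it.pos++ }
func (it *Iterator) Key() []byte { return it.keys[it.pos] }

// RangeIterator mirrors the NewRangeIterator idea: reuse an existing
// iterator, bounded by a closed [min, max] range (leveldb.RangeClose).
type RangeIterator struct {
	it  *Iterator
	max []byte
}

func NewRangeIterator(it *Iterator, min, max []byte) *RangeIterator {
	it.Seek(min)
	return &RangeIterator{it, max}
}
func (r *RangeIterator) Valid() bool {
	return r.it.Valid() && bytes.Compare(r.it.Key(), r.max) <= 0
}
func (r *RangeIterator) Next()       { r.it.Next() }
func (r *RangeIterator) Key() []byte { return r.it.Key() }

func main() {
	it := &Iterator{keys: [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d")}}
	for rit := NewRangeIterator(it, []byte("b"), []byte("c")); rit.Valid(); rit.Next() {
		fmt.Println(string(rit.Key()))
	}
}
```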

View File

@@ -81,7 +81,7 @@ func (c *client) run() {
 }
 func (c *client) readLine() ([]byte, error) {
-    return readLine(c.rb)
+    return ReadLine(c.rb)
 }
 //A client sends to the Redis server a RESP Array consisting of just Bulk Strings.

View File

@@ -23,13 +23,55 @@ var (
     errConnectMaster = errors.New("connect master error")
 )
+type MasterInfo struct {
+    Addr         string `json:"addr"`
+    LogFileIndex int64  `json:"log_file_index"`
+    LogPos       int64  `json:"log_pos"`
+}
+func (m *MasterInfo) Save(filePath string) error {
+    data, err := json.Marshal(m)
+    if err != nil {
+        return err
+    }
+    filePathBak := fmt.Sprintf("%s.bak", filePath)
+    var fd *os.File
+    fd, err = os.OpenFile(filePathBak, os.O_CREATE|os.O_WRONLY, os.ModePerm)
+    if err != nil {
+        return err
+    }
+    if _, err = fd.Write(data); err != nil {
+        fd.Close()
+        return err
+    }
+    fd.Close()
+    return os.Rename(filePathBak, filePath)
+}
+func (m *MasterInfo) Load(filePath string) error {
+    data, err := ioutil.ReadFile(filePath)
+    if err != nil {
+        if os.IsNotExist(err) {
+            return nil
+        } else {
+            return err
+        }
+    }
+    if err = json.Unmarshal(data, m); err != nil {
+        return err
+    }
+    return nil
+}
 type master struct {
     sync.Mutex
-    addr string `json:"addr"`
-    logFileIndex int64 `json:"log_file_index"`
-    logPos int64 `json:"log_pos"`
     c net.Conn
     rb *bufio.Reader
@@ -37,8 +79,9 @@ type master struct {
     quit chan struct{}
     infoName string
-    infoNameBak string
+    info *MasterInfo
     wg sync.WaitGroup
@@ -52,12 +95,13 @@ func newMaster(app *App) *master {
     m.app = app
     m.infoName = path.Join(m.app.cfg.DataDir, "master.info")
-    m.infoNameBak = fmt.Sprintf("%s.bak", m.infoName)
     m.quit = make(chan struct{}, 1)
     m.compressBuf = make([]byte, 256)
+    m.info = new(MasterInfo)
     //if load error, we will start a fullsync later
     m.loadInfo()
@@ -79,53 +123,15 @@ func (m *master) Close() {
 }
 func (m *master) loadInfo() error {
-    data, err := ioutil.ReadFile(m.infoName)
-    if err != nil {
-        if os.IsNotExist(err) {
-            return nil
-        } else {
-            return err
-        }
-    }
-    if err = json.Unmarshal(data, m); err != nil {
-        return err
-    }
-    return nil
+    return m.info.Load(m.infoName)
 }
 func (m *master) saveInfo() error {
-    data, err := json.Marshal(struct {
-        Addr         string `json:"addr"`
-        LogFileIndex int64  `json:"log_file_index"`
-        LogPos       int64  `json:"log_pos"`
-    }{
-        m.addr,
-        m.logFileIndex,
-        m.logPos,
-    })
-    if err != nil {
-        return err
-    }
-    var fd *os.File
-    fd, err = os.OpenFile(m.infoNameBak, os.O_CREATE|os.O_WRONLY, os.ModePerm)
-    if err != nil {
-        return err
-    }
-    if _, err = fd.Write(data); err != nil {
-        fd.Close()
-        return err
-    }
-    fd.Close()
-    return os.Rename(m.infoNameBak, m.infoName)
+    return m.info.Save(m.infoName)
 }
 func (m *master) connect() error {
-    if len(m.addr) == 0 {
+    if len(m.info.Addr) == 0 {
         return fmt.Errorf("no assign master addr")
     }
@@ -134,7 +140,7 @@ func (m *master) connect() error {
         m.c = nil
     }
-    if c, err := net.Dial("tcp", m.addr); err != nil {
+    if c, err := net.Dial("tcp", m.info.Addr); err != nil {
         return err
     } else {
         m.c = c
@@ -145,9 +151,9 @@ func (m *master) connect() error {
 }
 func (m *master) resetInfo(addr string) {
-    m.addr = addr
-    m.logFileIndex = 0
-    m.logPos = 0
+    m.info.Addr = addr
+    m.info.LogFileIndex = 0
+    m.info.LogPos = 0
 }
 func (m *master) stopReplication() error {
@@ -165,7 +171,7 @@ func (m *master) startReplication(masterAddr string) error {
     //stop last replication, if available
     m.Close()
-    if masterAddr != m.addr {
+    if masterAddr != m.info.Addr {
         m.resetInfo(masterAddr)
         if err := m.saveInfo(); err != nil {
             log.Error("save master info error %s", err.Error())
@@ -189,20 +195,20 @@ func (m *master) runReplication() {
         return
     default:
         if err := m.connect(); err != nil {
-            log.Error("connect master %s error %s, try 2s later", m.addr, err.Error())
+            log.Error("connect master %s error %s, try 2s later", m.info.Addr, err.Error())
             time.Sleep(2 * time.Second)
             continue
         }
     }
-    if m.logFileIndex == 0 {
+    if m.info.LogFileIndex == 0 {
         //try a fullsync
         if err := m.fullSync(); err != nil {
             log.Warn("full sync error %s", err.Error())
             return
         }
-        if m.logFileIndex == 0 {
+        if m.info.LogFileIndex == 0 {
             //master not support binlog, we cannot sync, so stop replication
             m.stopReplication()
             return
@@ -211,14 +217,14 @@ func (m *master) runReplication() {
     for {
         for {
-            lastIndex := m.logFileIndex
-            lastPos := m.logPos
+            lastIndex := m.info.LogFileIndex
+            lastPos := m.info.LogPos
             if err := m.sync(); err != nil {
                 log.Warn("sync error %s", err.Error())
                 return
             }
-            if m.logFileIndex == lastIndex && m.logPos == lastPos {
+            if m.info.LogFileIndex == lastIndex && m.info.LogPos == lastPos {
                 //sync no data, wait 1s and retry
                 break
             }
@@ -254,7 +260,7 @@ func (m *master) fullSync() error {
     defer os.Remove(dumpPath)
-    err = readBulkTo(m.rb, f)
+    err = ReadBulkTo(m.rb, f)
     f.Close()
     if err != nil {
         log.Error("read dump data error %s", err.Error())
@@ -273,15 +279,15 @@ func (m *master) fullSync() error {
         return err
     }
-    m.logFileIndex = head.LogFileIndex
-    m.logPos = head.LogPos
+    m.info.LogFileIndex = head.LogFileIndex
+    m.info.LogPos = head.LogPos
     return m.saveInfo()
 }
 func (m *master) sync() error {
-    logIndexStr := strconv.FormatInt(m.logFileIndex, 10)
-    logPosStr := strconv.FormatInt(m.logPos, 10)
+    logIndexStr := strconv.FormatInt(m.info.LogFileIndex, 10)
+    logPosStr := strconv.FormatInt(m.info.LogPos, 10)
     cmd := ledis.Slice(fmt.Sprintf(syncCmdFormat, len(logIndexStr),
         logIndexStr, len(logPosStr), logPosStr))
@@ -291,7 +297,7 @@ func (m *master) sync() error {
     m.syncBuf.Reset()
-    err := readBulkTo(m.rb, &m.syncBuf)
+    err := ReadBulkTo(m.rb, &m.syncBuf)
     if err != nil {
         return err
     }
@@ -308,14 +314,14 @@ func (m *master) sync() error {
         return fmt.Errorf("invalid sync data len %d", len(buf))
     }
-    m.logFileIndex = int64(binary.BigEndian.Uint64(buf[0:8]))
-    m.logPos = int64(binary.BigEndian.Uint64(buf[8:16]))
+    m.info.LogFileIndex = int64(binary.BigEndian.Uint64(buf[0:8]))
+    m.info.LogPos = int64(binary.BigEndian.Uint64(buf[8:16]))
-    if m.logFileIndex == 0 {
+    if m.info.LogFileIndex == 0 {
         //master now not support binlog, stop replication
         m.stopReplication()
         return nil
-    } else if m.logFileIndex == -1 {
+    } else if m.info.LogFileIndex == -1 {
         //-1 means that binlog index and pos are lost, we must start a full sync instead
         return m.fullSync()
     }

View File

@@ -14,7 +14,7 @@ var (
     errLineFormat = errors.New("bad response line format")
 )
-func readLine(rb *bufio.Reader) ([]byte, error) {
+func ReadLine(rb *bufio.Reader) ([]byte, error) {
     p, err := rb.ReadSlice('\n')
     if err != nil {
@@ -27,8 +27,8 @@ func readLine(rb *bufio.Reader) ([]byte, error) {
     return p[:i], nil
 }
-func readBulkTo(rb *bufio.Reader, w io.Writer) error {
-    l, err := readLine(rb)
+func ReadBulkTo(rb *bufio.Reader, w io.Writer) error {
+    l, err := ReadLine(rb)
     if len(l) == 0 {
         return errBulkFormat
     } else if l[0] == '$' {
@@ -43,7 +43,7 @@ func readBulkTo(rb *bufio.Reader, w io.Writer) error {
         return err
     }
-    if l, err = readLine(rb); err != nil {
+    if l, err = ReadLine(rb); err != nil {
         return err
     } else if len(l) != 0 {
         return errBulkFormat