forked from mirror/ledisdb
Merge branch 'develop'
This commit is contained in:
commit
f8dbe30e0b
|
@ -60,7 +60,7 @@
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/goredis",
|
"ImportPath": "github.com/siddontang/goredis",
|
||||||
"Rev": "f711beb9ecead18cf638a898610aa2c24ccb6dc7"
|
"Rev": "ca4c5d7500bcc6850c52824caf2c49a6015c8a03"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/rdb",
|
"ImportPath": "github.com/siddontang/rdb",
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PoolConn struct {
|
type PoolConn struct {
|
||||||
|
@ -30,6 +31,9 @@ type Client struct {
|
||||||
password string
|
password string
|
||||||
|
|
||||||
conns *list.List
|
conns *list.List
|
||||||
|
|
||||||
|
quit chan struct{}
|
||||||
|
wg sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
func getProto(addr string) string {
|
func getProto(addr string) string {
|
||||||
|
@ -50,6 +54,10 @@ func NewClient(addr string, password string) *Client {
|
||||||
c.password = password
|
c.password = password
|
||||||
|
|
||||||
c.conns = list.New()
|
c.conns = list.New()
|
||||||
|
c.quit = make(chan struct{})
|
||||||
|
|
||||||
|
c.wg.Add(1)
|
||||||
|
go c.onCheck()
|
||||||
|
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
@ -105,6 +113,9 @@ func (c *Client) Close() {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
|
|
||||||
|
close(c.quit)
|
||||||
|
c.wg.Wait()
|
||||||
|
|
||||||
for c.conns.Len() > 0 {
|
for c.conns.Len() > 0 {
|
||||||
e := c.conns.Front()
|
e := c.conns.Front()
|
||||||
co := e.Value.(*Conn)
|
co := e.Value.(*Conn)
|
||||||
|
@ -142,11 +153,62 @@ func (c *Client) get() (co *Conn, err error) {
|
||||||
|
|
||||||
func (c *Client) put(conn *Conn) {
|
func (c *Client) put(conn *Conn) {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
if c.conns.Len() >= c.maxIdleConns {
|
defer c.Unlock()
|
||||||
c.Unlock()
|
|
||||||
conn.Close()
|
for c.conns.Len() >= c.maxIdleConns {
|
||||||
|
// remove back
|
||||||
|
e := c.conns.Back()
|
||||||
|
co := e.Value.(*Conn)
|
||||||
|
c.conns.Remove(e)
|
||||||
|
|
||||||
|
co.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
c.conns.PushFront(conn)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) getIdle() *Conn {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
|
||||||
|
if c.conns.Len() == 0 {
|
||||||
|
return nil
|
||||||
} else {
|
} else {
|
||||||
c.conns.PushFront(conn)
|
e := c.conns.Back()
|
||||||
c.Unlock()
|
co := e.Value.(*Conn)
|
||||||
|
c.conns.Remove(e)
|
||||||
|
return co
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) checkIdle() {
|
||||||
|
co := c.getIdle()
|
||||||
|
if co == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := co.Do("PING")
|
||||||
|
if err != nil {
|
||||||
|
co.Close()
|
||||||
|
} else {
|
||||||
|
c.put(co)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) onCheck() {
|
||||||
|
t := time.NewTicker(3 * time.Second)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
t.Stop()
|
||||||
|
c.wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-t.C:
|
||||||
|
c.checkIdle()
|
||||||
|
case <-c.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,7 @@ var readonly = flag.Bool("readonly", false, "set readonly mode, slave server is
|
||||||
var rpl = flag.Bool("rpl", false, "enable replication or not, slave server is always enabled")
|
var rpl = flag.Bool("rpl", false, "enable replication or not, slave server is always enabled")
|
||||||
var rplSync = flag.Bool("rpl_sync", false, "enable sync replication or not")
|
var rplSync = flag.Bool("rpl_sync", false, "enable sync replication or not")
|
||||||
var ttlCheck = flag.Int("ttl_check", 0, "TTL check interval")
|
var ttlCheck = flag.Int("ttl_check", 0, "TTL check interval")
|
||||||
|
var databases = flag.Int("databases", 0, "ledisdb maximum database number")
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
runtime.GOMAXPROCS(runtime.NumCPU())
|
runtime.GOMAXPROCS(runtime.NumCPU())
|
||||||
|
@ -59,23 +60,27 @@ func main() {
|
||||||
cfg.DBName = *dbName
|
cfg.DBName = *dbName
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if *databases > 0 {
|
||||||
|
cfg.Databases = *databases
|
||||||
|
}
|
||||||
|
|
||||||
|
// check bool flag, use it.
|
||||||
|
for _, arg := range os.Args {
|
||||||
|
arg := strings.ToLower(arg)
|
||||||
|
switch arg {
|
||||||
|
case "-rpl", "-rpl=true", "-rpl=false":
|
||||||
|
cfg.UseReplication = *rpl
|
||||||
|
case "-readonly", "-readonly=true", "-readonly=false":
|
||||||
|
cfg.Readonly = *readonly
|
||||||
|
case "-rpl_sync", "-rpl_sync=true", "-rpl_sync=false":
|
||||||
|
cfg.Replication.Sync = *rplSync
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if len(*slaveof) > 0 {
|
if len(*slaveof) > 0 {
|
||||||
cfg.SlaveOf = *slaveof
|
cfg.SlaveOf = *slaveof
|
||||||
cfg.Readonly = true
|
cfg.Readonly = true
|
||||||
cfg.UseReplication = true
|
cfg.UseReplication = true
|
||||||
} else {
|
|
||||||
cfg.Readonly = *readonly
|
|
||||||
|
|
||||||
// if rpl in command flag, use it.
|
|
||||||
for _, arg := range os.Args {
|
|
||||||
arg := strings.ToLower(arg)
|
|
||||||
if arg == "-rpl" || arg == "-rpl=true" || arg == "-rpl=false" {
|
|
||||||
cfg.UseReplication = *rpl
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg.Replication.Sync = *rplSync
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if *ttlCheck > 0 {
|
if *ttlCheck > 0 {
|
||||||
|
|
|
@ -11,7 +11,7 @@ data_dir = "/tmp/ledis_server"
|
||||||
|
|
||||||
# Set the number of databases. You can use `select dbindex` to choose a db.
|
# Set the number of databases. You can use `select dbindex` to choose a db.
|
||||||
# dbindex must be in [0, databases - 1].
|
# dbindex must be in [0, databases - 1].
|
||||||
# Maximum databases is 16 now.
|
# Default databases is 16, maximum is 10240 now.
|
||||||
databases = 16
|
databases = 16
|
||||||
|
|
||||||
# Log server command, set empty to disable
|
# Log server command, set empty to disable
|
||||||
|
|
|
@ -11,7 +11,7 @@ data_dir = "/tmp/ledis_server"
|
||||||
|
|
||||||
# Set the number of databases. You can use `select dbindex` to choose a db.
|
# Set the number of databases. You can use `select dbindex` to choose a db.
|
||||||
# dbindex must be in [0, databases - 1].
|
# dbindex must be in [0, databases - 1].
|
||||||
# Maximum databases is 16 now.
|
# Default databases is 16, maximum is 10240 now.
|
||||||
databases = 16
|
databases = 16
|
||||||
|
|
||||||
# Log server command, set empty to disable
|
# Log server command, set empty to disable
|
||||||
|
|
|
@ -48,8 +48,8 @@ func Open(cfg *config.Config) (*Ledis, error) {
|
||||||
|
|
||||||
if cfg.Databases == 0 {
|
if cfg.Databases == 0 {
|
||||||
cfg.Databases = 16
|
cfg.Databases = 16
|
||||||
} else if cfg.Databases > 16 {
|
} else if cfg.Databases > MaxDatabases {
|
||||||
cfg.Databases = 16
|
cfg.Databases = MaxDatabases
|
||||||
}
|
}
|
||||||
|
|
||||||
os.MkdirAll(cfg.DataDir, 0755)
|
os.MkdirAll(cfg.DataDir, 0755)
|
||||||
|
@ -112,8 +112,8 @@ func (l *Ledis) Close() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Ledis) Select(index int) (*DB, error) {
|
func (l *Ledis) Select(index int) (*DB, error) {
|
||||||
if index < 0 || index >= MaxDatabases {
|
if index < 0 || index >= l.cfg.Databases {
|
||||||
return nil, fmt.Errorf("invalid db index %d, must in [0, %d]", index, MaxDatabases-1)
|
return nil, fmt.Errorf("invalid db index %d, must in [0, %d]", index, l.cfg.Databases-1)
|
||||||
}
|
}
|
||||||
|
|
||||||
l.dbLock.Lock()
|
l.dbLock.Lock()
|
||||||
|
|
|
@ -14,6 +14,7 @@ func getTestDB() *DB {
|
||||||
f := func() {
|
f := func() {
|
||||||
cfg := config.NewConfigDefault()
|
cfg := config.NewConfigDefault()
|
||||||
cfg.DataDir = "/tmp/test_ledis"
|
cfg.DataDir = "/tmp/test_ledis"
|
||||||
|
cfg.Databases = 10240
|
||||||
|
|
||||||
os.RemoveAll(cfg.DataDir)
|
os.RemoveAll(cfg.DataDir)
|
||||||
|
|
||||||
|
|
|
@ -134,6 +134,11 @@ func (c *respClient) run() {
|
||||||
reqData, err := c.readRequest()
|
reqData, err := c.readRequest()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = c.handleRequest(reqData)
|
err = c.handleRequest(reqData)
|
||||||
|
|
||||||
|
c.cmd = ""
|
||||||
|
c.args = nil
|
||||||
|
|
||||||
|
c.ar.Reset()
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -154,6 +159,16 @@ func (c *respClient) handleRequest(reqData [][]byte) error {
|
||||||
c.cmd = hack.String(lowerSlice(reqData[0]))
|
c.cmd = hack.String(lowerSlice(reqData[0]))
|
||||||
c.args = reqData[1:]
|
c.args = reqData[1:]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c.cmd == "xselect" {
|
||||||
|
err := c.handleXSelectCmd()
|
||||||
|
if err != nil {
|
||||||
|
c.resp.writeError(err)
|
||||||
|
c.resp.flush()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if c.cmd == "quit" {
|
if c.cmd == "quit" {
|
||||||
c.activeQuit = true
|
c.activeQuit = true
|
||||||
c.resp.writeStatus(OK)
|
c.resp.writeStatus(OK)
|
||||||
|
@ -164,10 +179,35 @@ func (c *respClient) handleRequest(reqData [][]byte) error {
|
||||||
|
|
||||||
c.perform()
|
c.perform()
|
||||||
|
|
||||||
c.cmd = ""
|
return nil
|
||||||
c.args = nil
|
}
|
||||||
|
|
||||||
c.ar.Reset()
|
// XSELECT db THEN command
|
||||||
|
func (c *respClient) handleXSelectCmd() error {
|
||||||
|
if len(c.args) <= 2 {
|
||||||
|
// invalid command format
|
||||||
|
return fmt.Errorf("invalid format for XSELECT, must XSELECT db THEN your command")
|
||||||
|
}
|
||||||
|
|
||||||
|
if hack.String(upperSlice(c.args[1])) != "THEN" {
|
||||||
|
// invalid command format, just resturn here
|
||||||
|
return fmt.Errorf("invalid format for XSELECT, must XSELECT db THEN your command")
|
||||||
|
}
|
||||||
|
|
||||||
|
index, err := strconv.Atoi(hack.String(c.args[0]))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("invalid db for XSELECT, err %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
db, err := c.app.ldb.Select(index)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("invalid db for XSELECT, err %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.db = db
|
||||||
|
|
||||||
|
c.cmd = hack.String(lowerSlice(c.args[2]))
|
||||||
|
c.args = c.args[3:]
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/siddontang/goredis"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestXSelect(t *testing.T) {
|
||||||
|
c1 := getTestConn()
|
||||||
|
defer c1.Close()
|
||||||
|
|
||||||
|
c2 := getTestConn()
|
||||||
|
defer c2.Close()
|
||||||
|
|
||||||
|
_, err := c1.Do("XSELECT", "1", "THEN", "SET", "tmp_select_key", "1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = goredis.Int(c2.Do("GET", "tmp_select_key"))
|
||||||
|
if err != goredis.ErrNil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
n, _ := goredis.Int(c2.Do("XSELECT", "1", "THEN", "GET", "tmp_select_key"))
|
||||||
|
if n != 1 {
|
||||||
|
t.Fatal(n)
|
||||||
|
}
|
||||||
|
|
||||||
|
n, _ = goredis.Int(c2.Do("GET", "tmp_select_key"))
|
||||||
|
if n != 1 {
|
||||||
|
t.Fatal(n)
|
||||||
|
}
|
||||||
|
|
||||||
|
c1.Do("SELECT", 0)
|
||||||
|
c2.Do("SELECT", 0)
|
||||||
|
|
||||||
|
}
|
|
@ -134,3 +134,14 @@ func lowerSlice(buf []byte) []byte {
|
||||||
}
|
}
|
||||||
return buf
|
return buf
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func upperSlice(buf []byte) []byte {
|
||||||
|
for i, r := range buf {
|
||||||
|
if 'a' <= r && r <= 'z' {
|
||||||
|
r -= 'a' - 'A'
|
||||||
|
}
|
||||||
|
|
||||||
|
buf[i] = r
|
||||||
|
}
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue