diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 282164f..38c3bf1 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -60,7 +60,7 @@ }, { "ImportPath": "github.com/siddontang/goredis", - "Rev": "f711beb9ecead18cf638a898610aa2c24ccb6dc7" + "Rev": "ca4c5d7500bcc6850c52824caf2c49a6015c8a03" }, { "ImportPath": "github.com/siddontang/rdb", diff --git a/Godeps/_workspace/src/github.com/siddontang/goredis/client.go b/Godeps/_workspace/src/github.com/siddontang/goredis/client.go index b9682a9..2ada1d9 100644 --- a/Godeps/_workspace/src/github.com/siddontang/goredis/client.go +++ b/Godeps/_workspace/src/github.com/siddontang/goredis/client.go @@ -5,6 +5,7 @@ import ( "net" "strings" "sync" + "time" ) type PoolConn struct { @@ -30,6 +31,9 @@ type Client struct { password string conns *list.List + + quit chan struct{} + wg sync.WaitGroup } func getProto(addr string) string { @@ -50,6 +54,10 @@ func NewClient(addr string, password string) *Client { c.password = password c.conns = list.New() + c.quit = make(chan struct{}) + + c.wg.Add(1) + go c.onCheck() return c } @@ -105,6 +113,9 @@ func (c *Client) Close() { c.Lock() defer c.Unlock() + close(c.quit) + c.wg.Wait() + for c.conns.Len() > 0 { e := c.conns.Front() co := e.Value.(*Conn) @@ -142,11 +153,62 @@ func (c *Client) get() (co *Conn, err error) { func (c *Client) put(conn *Conn) { c.Lock() - if c.conns.Len() >= c.maxIdleConns { - c.Unlock() - conn.Close() + defer c.Unlock() + + for c.conns.Len() >= c.maxIdleConns { + // remove back + e := c.conns.Back() + co := e.Value.(*Conn) + c.conns.Remove(e) + + co.Close() + } + + c.conns.PushFront(conn) +} + +func (c *Client) getIdle() *Conn { + c.Lock() + defer c.Unlock() + + if c.conns.Len() == 0 { + return nil } else { - c.conns.PushFront(conn) - c.Unlock() + e := c.conns.Back() + co := e.Value.(*Conn) + c.conns.Remove(e) + return co + } +} + +func (c *Client) checkIdle() { + co := c.getIdle() + if co == nil { + return + } + + _, err := co.Do("PING") + if err != nil { + co.Close() + } else { + c.put(co) + } +} + +func (c *Client) onCheck() { + t := time.NewTicker(3 * time.Second) + + defer func() { + t.Stop() + c.wg.Done() + }() + + for { + select { + case <-t.C: + c.checkIdle() + case <-c.quit: + return + } } } diff --git a/cmd/ledis-server/main.go b/cmd/ledis-server/main.go index 594a046..7987733 100644 --- a/cmd/ledis-server/main.go +++ b/cmd/ledis-server/main.go @@ -26,6 +26,7 @@ var readonly = flag.Bool("readonly", false, "set readonly mode, slave server is var rpl = flag.Bool("rpl", false, "enable replication or not, slave server is always enabled") var rplSync = flag.Bool("rpl_sync", false, "enable sync replication or not") var ttlCheck = flag.Int("ttl_check", 0, "TTL check interval") +var databases = flag.Int("databases", 0, "ledisdb maximum database number") func main() { runtime.GOMAXPROCS(runtime.NumCPU()) @@ -59,23 +60,27 @@ func main() { cfg.DBName = *dbName } + if *databases > 0 { + cfg.Databases = *databases + } + + // check bool flag, use it. + for _, arg := range os.Args { + arg := strings.ToLower(arg) + switch arg { + case "-rpl", "-rpl=true", "-rpl=false": + cfg.UseReplication = *rpl + case "-readonly", "-readonly=true", "-readonly=false": + cfg.Readonly = *readonly + case "-rpl_sync", "-rpl_sync=true", "-rpl_sync=false": + cfg.Replication.Sync = *rplSync + } + } + if len(*slaveof) > 0 { cfg.SlaveOf = *slaveof cfg.Readonly = true cfg.UseReplication = true - } else { - cfg.Readonly = *readonly - - // if rpl in command flag, use it. - for _, arg := range os.Args { - arg := strings.ToLower(arg) - if arg == "-rpl" || arg == "-rpl=true" || arg == "-rpl=false" { - cfg.UseReplication = *rpl - break - } - } - - cfg.Replication.Sync = *rplSync } if *ttlCheck > 0 { diff --git a/config/config.toml b/config/config.toml index 0b965f5..6135848 100644 --- a/config/config.toml +++ b/config/config.toml @@ -11,7 +11,7 @@ data_dir = "/tmp/ledis_server" # Set the number of databases. You can use `select dbindex` to choose a db. # dbindex must be in [0, databases - 1]. -# Maximum databases is 16 now. +# Default databases is 16, maximum is 10240 now. databases = 16 # Log server command, set empty to disable diff --git a/etc/ledis.conf b/etc/ledis.conf index 0b965f5..6135848 100644 --- a/etc/ledis.conf +++ b/etc/ledis.conf @@ -11,7 +11,7 @@ data_dir = "/tmp/ledis_server" # Set the number of databases. You can use `select dbindex` to choose a db. # dbindex must be in [0, databases - 1]. -# Maximum databases is 16 now. +# Default databases is 16, maximum is 10240 now. databases = 16 # Log server command, set empty to disable diff --git a/ledis/ledis.go b/ledis/ledis.go index 01e0977..7397f2a 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -48,8 +48,8 @@ func Open(cfg *config.Config) (*Ledis, error) { if cfg.Databases == 0 { cfg.Databases = 16 - } else if cfg.Databases > 16 { - cfg.Databases = 16 + } else if cfg.Databases > MaxDatabases { + cfg.Databases = MaxDatabases } os.MkdirAll(cfg.DataDir, 0755) @@ -112,8 +112,8 @@ func (l *Ledis) Close() { } func (l *Ledis) Select(index int) (*DB, error) { - if index < 0 || index >= MaxDatabases { - return nil, fmt.Errorf("invalid db index %d, must in [0, %d]", index, MaxDatabases-1) + if index < 0 || index >= l.cfg.Databases { + return nil, fmt.Errorf("invalid db index %d, must in [0, %d]", index, l.cfg.Databases-1) } l.dbLock.Lock() diff --git a/ledis/ledis_test.go b/ledis/ledis_test.go index a0c9879..de308e7 100644 --- a/ledis/ledis_test.go +++ b/ledis/ledis_test.go @@ -14,6 +14,7 @@ func getTestDB() *DB { f := func() { cfg := config.NewConfigDefault() cfg.DataDir = "/tmp/test_ledis" + cfg.Databases = 10240 os.RemoveAll(cfg.DataDir) diff --git a/server/client_resp.go b/server/client_resp.go index 6d493f9..eabc357 100644 --- a/server/client_resp.go +++ b/server/client_resp.go @@ -134,6 +134,11 @@ func (c *respClient) run() { reqData, err := c.readRequest() if err == nil { err = c.handleRequest(reqData) + + c.cmd = "" + c.args = nil + + c.ar.Reset() } if err != nil { @@ -154,6 +159,16 @@ func (c *respClient) handleRequest(reqData [][]byte) error { c.cmd = hack.String(lowerSlice(reqData[0])) c.args = reqData[1:] } + + if c.cmd == "xselect" { + err := c.handleXSelectCmd() + if err != nil { + c.resp.writeError(err) + c.resp.flush() + return nil + } + } + if c.cmd == "quit" { c.activeQuit = true c.resp.writeStatus(OK) @@ -164,10 +179,35 @@ func (c *respClient) handleRequest(reqData [][]byte) error { c.perform() - c.cmd = "" - c.args = nil + return nil +} - c.ar.Reset() +// XSELECT db THEN command +func (c *respClient) handleXSelectCmd() error { + if len(c.args) <= 2 { + // invalid command format + return fmt.Errorf("invalid format for XSELECT, must XSELECT db THEN your command") + } + + if hack.String(upperSlice(c.args[1])) != "THEN" { + // invalid command format, just resturn here + return fmt.Errorf("invalid format for XSELECT, must XSELECT db THEN your command") + } + + index, err := strconv.Atoi(hack.String(c.args[0])) + if err != nil { + return fmt.Errorf("invalid db for XSELECT, err %v", err) + } + + db, err := c.app.ldb.Select(index) + if err != nil { + return fmt.Errorf("invalid db for XSELECT, err %v", err) + } + + c.db = db + + c.cmd = hack.String(lowerSlice(c.args[2])) + c.args = c.args[3:] return nil } diff --git a/server/cmd_server_test.go b/server/cmd_server_test.go new file mode 100644 index 0000000..ca283dc --- /dev/null +++ b/server/cmd_server_test.go @@ -0,0 +1,38 @@ +package server + +import ( + "github.com/siddontang/goredis" + "testing" +) + +func TestXSelect(t *testing.T) { + c1 := getTestConn() + defer c1.Close() + + c2 := getTestConn() + defer c2.Close() + + _, err := c1.Do("XSELECT", "1", "THEN", "SET", "tmp_select_key", "1") + if err != nil { + t.Fatal(err) + } + + _, err = goredis.Int(c2.Do("GET", "tmp_select_key")) + if err != goredis.ErrNil { + t.Fatal(err) + } + + n, _ := goredis.Int(c2.Do("XSELECT", "1", "THEN", "GET", "tmp_select_key")) + if n != 1 { + t.Fatal(n) + } + + n, _ = goredis.Int(c2.Do("GET", "tmp_select_key")) + if n != 1 { + t.Fatal(n) + } + + c1.Do("SELECT", 0) + c2.Do("SELECT", 0) + +} diff --git a/server/util.go b/server/util.go index 7c5b73d..a6ef5d4 100644 --- a/server/util.go +++ b/server/util.go @@ -134,3 +134,14 @@ func lowerSlice(buf []byte) []byte { } return buf } + +func upperSlice(buf []byte) []byte { + for i, r := range buf { + if 'a' <= r && r <= 'z' { + r -= 'a' - 'A' + } + + buf[i] = r + } + return buf +}