From 3055e6b2fca0e65671b31bb8b4bc3e870e0b20b7 Mon Sep 17 00:00:00 2001 From: siddontang Date: Wed, 3 Dec 2014 16:27:52 +0800 Subject: [PATCH] add xmigrate xmigrate will be used in xcodis --- client/go/ledis/conn.go | 36 +++++- server/app.go | 14 +-- server/client.go | 36 +++--- server/cmd_migrate.go | 243 ++++++++++++++++++++++++++++++------- server/cmd_migrate_test.go | 95 +++++++++++++-- 5 files changed, 346 insertions(+), 78 deletions(-) diff --git a/client/go/ledis/conn.go b/client/go/ledis/conn.go index 2b8e411..12f5f00 100644 --- a/client/go/ledis/conn.go +++ b/client/go/ledis/conn.go @@ -8,6 +8,7 @@ import ( "io" "net" "strconv" + "strings" "sync" "time" ) @@ -78,6 +79,22 @@ func (c *Conn) SetConnectTimeout(t time.Duration) { c.cm.Unlock() } +func (c *Conn) SetReadDeadline(t time.Time) { + c.cm.Lock() + if c.c != nil { + c.c.SetReadDeadline(t) + } + c.cm.Unlock() +} + +func (c *Conn) SetWriteDeadline(t time.Time) { + c.cm.Lock() + if c.c != nil { + c.c.SetWriteDeadline(t) + } + c.cm.Unlock() +} + func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { if err := c.Send(cmd, args...); err != nil { return nil, err @@ -87,6 +104,21 @@ func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { } func (c *Conn) Send(cmd string, args ...interface{}) error { + var err error + for i := 0; i < 2; i++ { + if err = c.send(cmd, args...); err != nil { + if e, ok := err.(*net.OpError); ok && strings.Contains(e.Error(), "use of closed network connection") { + //send to a closed connection, try again + continue + } + } else { + return nil + } + } + return err +} + +func (c *Conn) send(cmd string, args ...interface{}) error { if err := c.connect(); err != nil { return err } @@ -138,7 +170,9 @@ func (c *Conn) ReceiveBulkTo(w io.Writer) error { func (c *Conn) finalize() { c.cm.Lock() if !c.closed { - c.c.Close() + if c.c != nil { + c.c.Close() + } c.closed = true } c.cm.Unlock() diff --git a/server/app.go b/server/app.go index 09cba2c..08cd5d0 100644 --- a/server/app.go +++ b/server/app.go @@ -44,8 +44,8 @@ type App struct { rcm sync.Mutex rcs map[*respClient]struct{} - migrateConnM sync.Mutex - migrateConns map[string]*goledis.Conn + migrateM sync.Mutex + migrateClients map[string]*goledis.Client } func netType(s string) string { @@ -75,7 +75,7 @@ func NewApp(cfg *config.Config) (*App, error) { app.rcs = make(map[*respClient]struct{}) - app.migrateConns = make(map[string]*goledis.Conn) + app.migrateClients = make(map[string]*goledis.Client) var err error @@ -139,12 +139,12 @@ func (app *App) Close() { app.listener.Close() //close all migrate connections - app.migrateConnM.Lock() - for k, c := range app.migrateConns { + app.migrateM.Lock() + for k, c := range app.migrateClients { c.Close() - delete(app.migrateConns, k) + delete(app.migrateClients, k) } - app.migrateConnM.Unlock() + app.migrateM.Unlock() if app.httpListener != nil { app.httpListener.Close() diff --git a/server/client.go b/server/client.go index 57abc8c..5c1a9d8 100644 --- a/server/client.go +++ b/server/client.go @@ -11,25 +11,29 @@ import ( ) var txUnsupportedCmds = map[string]struct{}{ - "select": struct{}{}, - "slaveof": struct{}{}, - "fullsync": struct{}{}, - "sync": struct{}{}, - "begin": struct{}{}, - "flushall": struct{}{}, - "flushdb": struct{}{}, - "eval": struct{}{}, + "select": struct{}{}, + "slaveof": struct{}{}, + "fullsync": struct{}{}, + "sync": struct{}{}, + "begin": struct{}{}, + "flushall": struct{}{}, + "flushdb": struct{}{}, + "eval": struct{}{}, + "xmigrate": struct{}{}, + "xmigratedb": struct{}{}, } var scriptUnsupportedCmds = map[string]struct{}{ - "slaveof": struct{}{}, - "fullsync": struct{}{}, - "sync": struct{}{}, - "begin": struct{}{}, - "commit": struct{}{}, - "rollback": struct{}{}, - "flushall": struct{}{}, - "flushdb": struct{}{}, + "slaveof": struct{}{}, + "fullsync": struct{}{}, + "sync": struct{}{}, + "begin": struct{}{}, + "commit": struct{}{}, + "rollback": struct{}{}, + "flushall": struct{}{}, + "flushdb": struct{}{}, + "xmigrate": struct{}{}, + "xmigratedb": struct{}{}, } type responseWriter interface { diff --git a/server/cmd_migrate.go b/server/cmd_migrate.go index 1295bb5..3153821 100644 --- a/server/cmd_migrate.go +++ b/server/cmd_migrate.go @@ -109,16 +109,16 @@ func restoreCommand(c *client) error { func xdump(db *ledis.DB, tp string, key []byte) ([]byte, error) { var err error var data []byte - switch tp { - case "kv": + switch strings.ToUpper(tp) { + case "KV": data, err = db.Dump(key) - case "hash": + case "HASH": data, err = db.HDump(key) - case "list": + case "LIST": data, err = db.LDump(key) - case "set": + case "SET": data, err = db.SDump(key) - case "zset": + case "ZSET": data, err = db.ZDump(key) default: err = fmt.Errorf("invalid key type %s", tp) @@ -128,16 +128,16 @@ func xdump(db *ledis.DB, tp string, key []byte) ([]byte, error) { func xdel(db *ledis.DB, tp string, key []byte) error { var err error - switch tp { - case "kv": + switch strings.ToUpper(tp) { + case "KV": _, err = db.Del(key) - case "hash": + case "HASH": _, err = db.HClear(key) - case "list": + case "LIST": _, err = db.LClear(key) - case "set": + case "SET": _, err = db.SClear(key) - case "zset": + case "ZSET": _, err = db.ZClear(key) default: err = fmt.Errorf("invalid key type %s", tp) @@ -145,6 +145,40 @@ func xdel(db *ledis.DB, tp string, key []byte) error { return err } +func xttl(db *ledis.DB, tp string, key []byte) (int64, error) { + switch strings.ToUpper(tp) { + case "KV": + return db.TTL(key) + case "HASH": + return db.HTTL(key) + case "LIST": + return db.LTTL(key) + case "SET": + return db.STTL(key) + case "ZSET": + return db.ZTTL(key) + default: + return 0, fmt.Errorf("invalid key type %s", tp) + } +} + +func xscan(db *ledis.DB, tp string, count int) ([][]byte, error) { + switch strings.ToUpper(tp) { + case "KV": + return db.Scan(nil, count, false, "") + case "HASH": + return db.HScan(nil, count, false, "") + case "LIST": + return db.LScan(nil, count, false, "") + case "SET": + return db.SScan(nil, count, false, "") + case "ZSET": + return db.ZScan(nil, count, false, "") + default: + return nil, fmt.Errorf("invalid key type %s", tp) + } +} + func xdumpCommand(c *client) error { args := c.args if len(args) != 2 { @@ -162,15 +196,131 @@ func xdumpCommand(c *client) error { return nil } -//XMIGRATE host port type key destination-db timeout [COPY] -func xmigrateCommand(c *client) error { - args := c.args +func (app *App) getMigrateClient(addr string) *goledis.Client { + app.migrateM.Lock() - if len(args) != 6 && len(args) != 7 { + mc, ok := app.migrateClients[addr] + if !ok { + mc = goledis.NewClient(&goledis.Config{addr, 4, 0, 0}) + app.migrateClients[addr] = mc + + } + + app.migrateM.Unlock() + + return mc +} + +//XMIGRATEDB host port tp count db timeout +//select count tp type keys and migrate +//will block any other write operations +//maybe only for xcodis +func xmigratedbCommand(c *client) error { + args := c.args + if len(args) != 6 { return ErrCmdParams } - addr := fmt.Sprintf("%s:%d", string(args[0]), string(args[1])) + addr := fmt.Sprintf("%s:%s", string(args[0]), string(args[1])) + if addr == c.app.cfg.Addr { + //same server, can not migrate + return fmt.Errorf("migrate in same server is not allowed") + } + + tp := string(args[2]) + + count, err := ledis.StrInt64(args[3], nil) + if err != nil { + return err + } else if count <= 0 { + count = 10 + } + + db, err := ledis.StrUint64(args[4], nil) + if err != nil { + return err + } else if db >= uint64(ledis.MaxDBNumber) { + return fmt.Errorf("invalid db index %d, must < %d", db, ledis.MaxDBNumber) + } + + timeout, err := ledis.StrInt64(args[5], nil) + if err != nil { + return err + } else if timeout < 0 { + return fmt.Errorf("invalid timeout %d", timeout) + } + + m, err := c.db.Multi() + if err != nil { + return err + } + defer m.Close() + + keys, err := xscan(m.DB, tp, int(count)) + if err != nil { + return err + } else if len(keys) == 0 { + c.resp.writeInteger(0) + return nil + } + + mc := c.app.getMigrateClient(addr) + + conn := mc.Get() + + //timeout is milliseconds + t := time.Duration(timeout) * time.Millisecond + conn.SetConnectTimeout(t) + + if _, err = conn.Do("select", db); err != nil { + return err + } + + for _, key := range keys { + data, err := xdump(m.DB, tp, key) + if err != nil { + return err + } + + ttl, err := xttl(m.DB, tp, key) + if err != nil { + return err + } + + conn.SetReadDeadline(time.Now().Add(t)) + + //ttl is second, but restore need millisecond + if _, err = conn.Do("restore", key, ttl*1e3, data); err != nil { + return err + } + + if err = xdel(m.DB, tp, key); err != nil { + return err + } + + } + + c.resp.writeInteger(int64(len(keys))) + + return nil +} + +//XMIGRATE host port type key destination-db timeout +//will block any other write operations +//maybe only for xcodis +func xmigrateCommand(c *client) error { + args := c.args + + if len(args) != 6 { + return ErrCmdParams + } + + addr := fmt.Sprintf("%s:%s", string(args[0]), string(args[1])) + if addr == c.app.cfg.Addr { + //same server, can not migrate + return fmt.Errorf("migrate in same server is not allowed") + } + tp := string(args[2]) key := args[3] db, err := ledis.StrUint64(args[4], nil) @@ -179,29 +329,21 @@ func xmigrateCommand(c *client) error { } else if db >= uint64(ledis.MaxDBNumber) { return fmt.Errorf("invalid db index %d, must < %d", db, ledis.MaxDBNumber) } - var timeout int64 - timeout, err = ledis.StrInt64(args[5], nil) + + timeout, err := ledis.StrInt64(args[5], nil) if err != nil { return err } else if timeout < 0 { return fmt.Errorf("invalid timeout %d", timeout) } - onlyCopy := false - if len(args) == 7 { - if strings.ToUpper(string(args[6])) == "COPY" { - onlyCopy = true - } - } - - var m *ledis.Multi - if m, err = c.db.Multi(); err != nil { + m, err := c.db.Multi() + if err != nil { return err } defer m.Close() - var data []byte - data, err = xdump(m.DB, tp, key) + data, err := xdump(m.DB, tp, key) if err != nil { return err } else if data == nil { @@ -209,26 +351,32 @@ func xmigrateCommand(c *client) error { return nil } - c.app.migrateConnM.Lock() - defer c.app.migrateConnM.Unlock() - - conn, ok := c.app.migrateConns[addr] - if !ok { - conn = goledis.NewConn(addr) - c.app.migrateConns[addr] = conn - } - - //timeout is milliseconds - conn.SetConnectTimeout(time.Duration(timeout) * time.Millisecond) - - if _, err = conn.Do("restore", key, data); err != nil { + ttl, err := xttl(m.DB, tp, key) + if err != nil { return err } - if !onlyCopy { - if err = xdel(m.DB, tp, key); err != nil { - return err - } + mc := c.app.getMigrateClient(addr) + + conn := mc.Get() + + //timeout is milliseconds + t := time.Duration(timeout) * time.Millisecond + conn.SetConnectTimeout(t) + + if _, err = conn.Do("select", db); err != nil { + return err + } + + conn.SetReadDeadline(time.Now().Add(t)) + + //ttl is second, but restore need millisecond + if _, err = conn.Do("restore", key, ttl*1e3, data); err != nil { + return err + } + + if err = xdel(m.DB, tp, key); err != nil { + return err } c.resp.writeStatus(OK) @@ -244,4 +392,5 @@ func init() { register("restore", restoreCommand) register("xdump", xdumpCommand) register("xmigrate", xmigrateCommand) + register("xmigratedb", xmigratedbCommand) } diff --git a/server/cmd_migrate_test.go b/server/cmd_migrate_test.go index cc71c18..1dfb2c0 100644 --- a/server/cmd_migrate_test.go +++ b/server/cmd_migrate_test.go @@ -1,11 +1,15 @@ package server import ( + "fmt" "github.com/siddontang/ledisdb/client/go/ledis" + "github.com/siddontang/ledisdb/config" + "os" "testing" + "time" ) -func TestMigrate(t *testing.T) { +func TestDumpRestore(t *testing.T) { c := getTestConn() defer c.Close() @@ -31,17 +35,94 @@ func TestMigrate(t *testing.T) { t.Fatal(err) } - testMigrate(c, "dump", "mtest_a", t) - testMigrate(c, "ldump", "mtest_la", t) - testMigrate(c, "hdump", "mtest_ha", t) - testMigrate(c, "sdump", "mtest_sa", t) - testMigrate(c, "zdump", "mtest_za", t) + testDumpRestore(c, "dump", "mtest_a", t) + testDumpRestore(c, "ldump", "mtest_la", t) + testDumpRestore(c, "hdump", "mtest_ha", t) + testDumpRestore(c, "sdump", "mtest_sa", t) + testDumpRestore(c, "zdump", "mtest_za", t) } -func testMigrate(c *ledis.Conn, dump string, key string, t *testing.T) { +func testDumpRestore(c *ledis.Conn, dump string, key string, t *testing.T) { if data, err := ledis.Bytes(c.Do(dump, key)); err != nil { t.Fatal(err) } else if _, err := c.Do("restore", key, 0, data); err != nil { t.Fatal(err) } } + +func TestMigrate(t *testing.T) { + data_dir := "/tmp/test_migrate" + os.RemoveAll(data_dir) + + s1Cfg := config.NewConfigDefault() + s1Cfg.DataDir = fmt.Sprintf("%s/s1", data_dir) + s1Cfg.Addr = "127.0.0.1:11185" + + s2Cfg := config.NewConfigDefault() + s2Cfg.DataDir = fmt.Sprintf("%s/s2", data_dir) + s2Cfg.Addr = "127.0.0.1:11186" + + s1, err := NewApp(s1Cfg) + if err != nil { + t.Fatal(err) + } + defer s1.Close() + + s2, err := NewApp(s2Cfg) + if err != nil { + t.Fatal(err) + } + defer s2.Close() + + go s1.Run() + + go s2.Run() + + time.Sleep(1 * time.Second) + + c1 := ledis.NewConn(s1Cfg.Addr) + defer c1.Close() + + c2 := ledis.NewConn(s2Cfg.Addr) + defer c2.Close() + + if _, err = c1.Do("set", "a", "1"); err != nil { + t.Fatal(err) + } + + timeout := 30000 + if _, err = c1.Do("xmigrate", "127.0.0.1", 11186, "KV", "a", 0, timeout); err != nil { + t.Fatal(err) + } + + if s, err := ledis.String(c2.Do("get", "a")); err != nil { + t.Fatal(err) + } else if s != "1" { + t.Fatal(s, "must 1") + } + + if s, err := ledis.String(c1.Do("get", "a")); err != nil && err != ledis.ErrNil { + t.Fatal(err) + } else if s != "" { + t.Fatal(s, "must empty") + } + + if num, err := ledis.Int(c2.Do("xmigratedb", "127.0.0.1", 11185, "KV", 10, 0, timeout)); err != nil { + t.Fatal(err) + } else if num != 1 { + t.Fatal(num, "must number 1") + } + + if s, err := ledis.String(c1.Do("get", "a")); err != nil { + t.Fatal(err) + } else if s != "1" { + t.Fatal(s, "must 1") + } + + if s, err := ledis.String(c2.Do("get", "a")); err != nil && err != ledis.ErrNil { + t.Fatal(err) + } else if s != "" { + t.Fatal(s, "must empty") + } + +}