From 4eeaffbc00417258f732a13f2fb9a5b1b7254f82 Mon Sep 17 00:00:00 2001 From: siddontang Date: Sat, 7 Mar 2015 16:51:27 +0800 Subject: [PATCH] simplify migration code handle ALL for xmigrate type --- server/cmd_migrate.go | 145 ++++++++++++++++++++++--------------- server/cmd_migrate_test.go | 15 ++++ server/const.go | 2 + 3 files changed, 102 insertions(+), 60 deletions(-) diff --git a/server/cmd_migrate.go b/server/cmd_migrate.go index 51430d4..c3233de 100644 --- a/server/cmd_migrate.go +++ b/server/cmd_migrate.go @@ -1,6 +1,7 @@ package server import ( + "errors" "fmt" "strings" "sync" @@ -301,11 +302,9 @@ func xmigratedbCommand(c *client) error { count = 10 } - db, err := ledis.StrUint64(args[4], nil) + db, err := parseMigrateDB(c, args[4]) if err != nil { return err - } else if db >= uint64(c.app.cfg.Databases) { - return fmt.Errorf("invalid db index %d, must < %d", db, c.app.cfg.Databases) } timeout, err := ledis.StrInt64(args[5], nil) @@ -323,52 +322,21 @@ func xmigratedbCommand(c *client) error { return nil } - mc := c.app.getMigrateClient(addr) - - conn, err := mc.Get() + conn, err := getMigrateDBConn(c, addr, db) if err != nil { return err } defer conn.Close() - //timeout is milliseconds - t := time.Duration(timeout) * time.Millisecond - - if _, err = conn.Do("select", db); err != nil { - return err - } - migrateNum := int64(0) for _, key := range keys { - if !c.app.migrateKeyLock(tp, key) { - // other may also migrate this key, skip it - continue - } - - defer c.app.migrateKeyUnlock(tp, key) - - data, err := xdump(c.db, tp, key) + err = migrateKey(c, conn, tp, key, timeout) if err != nil { - return err - } else if data == nil { - // no key now, skip it - continue - } - - ttl, err := xttl(c.db, tp, key) - if err != nil { - return err - } - - conn.SetReadDeadline(time.Now().Add(t)) - - //ttl is second, but restore need millisecond - if _, err = conn.Do("restore", key, ttl*1e3, data); err != nil { - return err - } - - if err = xdel(c.db, tp, key); err != nil { - return err + if err == errNoKey || err == errKeyInMigrating { + continue + } else { + return err + } } migrateNum++ @@ -379,6 +347,16 @@ func xmigratedbCommand(c *client) error { return nil } +func parseMigrateDB(c *client, arg []byte) (uint64, error) { + db, err := ledis.StrUint64(arg, nil) + if err != nil { + return 0, err + } else if db >= uint64(c.app.cfg.Databases) { + return 0, fmt.Errorf("invalid db index %d, must < %d", db, c.app.cfg.Databases) + } + return db, nil +} + //XMIGRATE host port type key destination-db timeout //will block any other write operations //maybe only for xcodis @@ -397,11 +375,9 @@ func xmigrateCommand(c *client) error { tp := strings.ToUpper(string(args[2])) key := args[3] - db, err := ledis.StrUint64(args[4], nil) + db, err := parseMigrateDB(c, args[4]) if err != nil { return err - } else if db >= uint64(c.app.cfg.Databases) { - return fmt.Errorf("invalid db index %d, must < %d", db, c.app.cfg.Databases) } timeout, err := ledis.StrInt64(args[5], nil) @@ -411,9 +387,57 @@ func xmigrateCommand(c *client) error { return fmt.Errorf("invalid timeout %d", timeout) } + conn, err := getMigrateDBConn(c, addr, db) + if err != nil { + return err + } + defer conn.Close() + + if tp == "ALL" { + // if tp is ALL, we will migrate the key in all types + // this feature is useful for xcodis RESTORE or other commands that we don't know the data type exactly + err = migrateAllTypeKeys(c, conn, key, timeout) + } else { + err = migrateKey(c, conn, tp, key, timeout) + if err != nil { + if err == errNoKey { + c.resp.writeStatus(NOKEY) + return nil + } else { + return err + } + } + } + + c.resp.writeStatus(OK) + return nil +} + +func getMigrateDBConn(c *client, addr string, db uint64) (*goledis.Conn, error) { + mc := c.app.getMigrateClient(addr) + + conn, err := mc.Get() + if err != nil { + return nil, err + } + + if _, err = conn.Do("select", db); err != nil { + conn.Close() + return nil, err + } + + return conn, nil +} + +var ( + errNoKey = errors.New("migrate key is not exists") + errKeyInMigrating = errors.New("key is in migrating yet") +) + +func migrateKey(c *client, conn *goledis.Conn, tp string, key []byte, timeout int64) error { if !c.app.migrateKeyLock(tp, key) { // other may also migrate this key, skip it - return fmt.Errorf("%s %s is in migrating yet", tp, key) + return errKeyInMigrating } defer c.app.migrateKeyUnlock(tp, key) @@ -422,8 +446,7 @@ func xmigrateCommand(c *client) error { if err != nil { return err } else if data == nil { - c.resp.writeStatus(NOKEY) - return nil + return errNoKey } ttl, err := xttl(c.db, tp, key) @@ -431,21 +454,9 @@ func xmigrateCommand(c *client) error { return err } - mc := c.app.getMigrateClient(addr) - - conn, err := mc.Get() - if err != nil { - return err - } - defer conn.Close() - //timeout is milliseconds t := time.Duration(timeout) * time.Millisecond - if _, err = conn.Do("select", db); err != nil { - return err - } - conn.SetReadDeadline(time.Now().Add(t)) //ttl is second, but restore need millisecond @@ -457,7 +468,21 @@ func xmigrateCommand(c *client) error { return err } - c.resp.writeStatus(OK) + return nil +} + +func migrateAllTypeKeys(c *client, conn *goledis.Conn, key []byte, timeout int64) error { + for _, tp := range TypeNames { + err := migrateKey(c, conn, tp, key, timeout) + if err != nil { + if err == errNoKey || err == errKeyInMigrating { + continue + } else { + return err + } + } + } + return nil } diff --git a/server/cmd_migrate_test.go b/server/cmd_migrate_test.go index 0bfc6cb..0545676 100644 --- a/server/cmd_migrate_test.go +++ b/server/cmd_migrate_test.go @@ -125,4 +125,19 @@ func TestMigrate(t *testing.T) { t.Fatal(s, "must empty") } + if _, err = c1.Do("xmigrate", "127.0.0.1", 11186, "ALL", "a", 0, timeout); err != nil { + t.Fatal(err) + } + + if s, err := ledis.String(c2.Do("get", "a")); err != nil { + t.Fatal(err) + } else if s != "1" { + t.Fatal(s, "must 1") + } + + if s, err := ledis.String(c1.Do("get", "a")); err != nil && err != ledis.ErrNil { + t.Fatal(err) + } else if s != "" { + t.Fatal(s, "must empty") + } } diff --git a/server/const.go b/server/const.go index 4d72743..d5c7308 100644 --- a/server/const.go +++ b/server/const.go @@ -47,3 +47,5 @@ const ( MB uint64 = 1024 * 1024 KB uint64 = 1024 ) + +var TypeNames = []string{KVName, ListName, HashName, SetName, ZSetName}