From ce8e0d4fdc75d40466f51b5a589e62f6f8ee569a Mon Sep 17 00:00:00 2001 From: siddontang Date: Mon, 9 Mar 2015 10:00:29 +0800 Subject: [PATCH] retry migration when key is in migrating --- server/cmd_migrate.go | 64 +++++++++++++++++++++++++++++++++---------- 1 file changed, 50 insertions(+), 14 deletions(-) diff --git a/server/cmd_migrate.go b/server/cmd_migrate.go index c3233de..afc2800 100644 --- a/server/cmd_migrate.go +++ b/server/cmd_migrate.go @@ -8,6 +8,7 @@ import ( "time" "github.com/siddontang/go/hack" + "github.com/siddontang/go/log" goledis "github.com/siddontang/ledisdb/client/goledis" "github.com/siddontang/ledisdb/ledis" ) @@ -110,6 +111,29 @@ func restoreCommand(c *client) error { return nil } +// maybe only used in xcodis for redis data port +func xrestoreCommand(c *client) error { + args := c.args + if len(args) != 4 { + return ErrCmdParams + } + + // tp := strings.ToUpper(string(args[2])) + key := args[1] + ttl, err := ledis.StrInt64(args[2], nil) + if err != nil { + return err + } + data := args[3] + + if err = c.db.Restore(key, ttl, data); err != nil { + return err + } else { + c.resp.writeStatus(OK) + } + + return nil +} func xdump(db *ledis.DB, tp string, key []byte) ([]byte, error) { var err error var data []byte @@ -393,19 +417,30 @@ func xmigrateCommand(c *client) error { } defer conn.Close() - if tp == "ALL" { - // if tp is ALL, we will migrate the key in all types - // this feature is useful for xcodis RESTORE or other commands that we don't know the data type exactly - err = migrateAllTypeKeys(c, conn, key, timeout) - } else { - err = migrateKey(c, conn, tp, key, timeout) - if err != nil { - if err == errNoKey { - c.resp.writeStatus(NOKEY) - return nil - } else { - return err - } + // if key is in migrating, we will wait 500ms and retry again + for i := 0; i < 10; i++ { + if tp == "ALL" { + // if tp is ALL, we will migrate the key in all types + // this feature is useful for xcodis RESTORE or other commands that we don't know the data type exactly + err = migrateAllTypeKeys(c, conn, key, timeout) + } else { + err = migrateKey(c, conn, tp, key, timeout) + } + + if err != errKeyInMigrating { + break + } else { + log.Infof("%s key %s is in migrating, wait 500ms and retry", tp, key) + time.Sleep(500 * time.Millisecond) + } + } + + if err != nil { + if err == errNoKey { + c.resp.writeStatus(NOKEY) + return nil + } else { + return err } } @@ -475,7 +510,7 @@ func migrateAllTypeKeys(c *client, conn *goledis.Conn, key []byte, timeout int64 for _, tp := range TypeNames { err := migrateKey(c, conn, tp, key, timeout) if err != nil { - if err == errNoKey || err == errKeyInMigrating { + if err == errNoKey { continue } else { return err @@ -493,6 +528,7 @@ func init() { register("sdump", sdumpCommand) register("zdump", zdumpCommand) register("restore", restoreCommand) + register("xrestore", xrestoreCommand) register("xdump", xdumpCommand) register("xmigrate", xmigrateCommand) register("xmigratedb", xmigratedbCommand)