forked from mirror/ledisdb
parent
c6c82b979d
commit
4eeaffbc00
|
@ -1,6 +1,7 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -301,11 +302,9 @@ func xmigratedbCommand(c *client) error {
|
|||
count = 10
|
||||
}
|
||||
|
||||
db, err := ledis.StrUint64(args[4], nil)
|
||||
db, err := parseMigrateDB(c, args[4])
|
||||
if err != nil {
|
||||
return err
|
||||
} else if db >= uint64(c.app.cfg.Databases) {
|
||||
return fmt.Errorf("invalid db index %d, must < %d", db, c.app.cfg.Databases)
|
||||
}
|
||||
|
||||
timeout, err := ledis.StrInt64(args[5], nil)
|
||||
|
@ -323,52 +322,21 @@ func xmigratedbCommand(c *client) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
mc := c.app.getMigrateClient(addr)
|
||||
|
||||
conn, err := mc.Get()
|
||||
conn, err := getMigrateDBConn(c, addr, db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
//timeout is milliseconds
|
||||
t := time.Duration(timeout) * time.Millisecond
|
||||
|
||||
if _, err = conn.Do("select", db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
migrateNum := int64(0)
|
||||
for _, key := range keys {
|
||||
if !c.app.migrateKeyLock(tp, key) {
|
||||
// other may also migrate this key, skip it
|
||||
continue
|
||||
}
|
||||
|
||||
defer c.app.migrateKeyUnlock(tp, key)
|
||||
|
||||
data, err := xdump(c.db, tp, key)
|
||||
err = migrateKey(c, conn, tp, key, timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if data == nil {
|
||||
// no key now, skip it
|
||||
if err == errNoKey || err == errKeyInMigrating {
|
||||
continue
|
||||
}
|
||||
|
||||
ttl, err := xttl(c.db, tp, key)
|
||||
if err != nil {
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
|
||||
conn.SetReadDeadline(time.Now().Add(t))
|
||||
|
||||
//ttl is second, but restore need millisecond
|
||||
if _, err = conn.Do("restore", key, ttl*1e3, data); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = xdel(c.db, tp, key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
migrateNum++
|
||||
|
@ -379,6 +347,16 @@ func xmigratedbCommand(c *client) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func parseMigrateDB(c *client, arg []byte) (uint64, error) {
|
||||
db, err := ledis.StrUint64(arg, nil)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
} else if db >= uint64(c.app.cfg.Databases) {
|
||||
return 0, fmt.Errorf("invalid db index %d, must < %d", db, c.app.cfg.Databases)
|
||||
}
|
||||
return db, nil
|
||||
}
|
||||
|
||||
//XMIGRATE host port type key destination-db timeout
|
||||
//will block any other write operations
|
||||
//maybe only for xcodis
|
||||
|
@ -397,11 +375,9 @@ func xmigrateCommand(c *client) error {
|
|||
|
||||
tp := strings.ToUpper(string(args[2]))
|
||||
key := args[3]
|
||||
db, err := ledis.StrUint64(args[4], nil)
|
||||
db, err := parseMigrateDB(c, args[4])
|
||||
if err != nil {
|
||||
return err
|
||||
} else if db >= uint64(c.app.cfg.Databases) {
|
||||
return fmt.Errorf("invalid db index %d, must < %d", db, c.app.cfg.Databases)
|
||||
}
|
||||
|
||||
timeout, err := ledis.StrInt64(args[5], nil)
|
||||
|
@ -411,9 +387,57 @@ func xmigrateCommand(c *client) error {
|
|||
return fmt.Errorf("invalid timeout %d", timeout)
|
||||
}
|
||||
|
||||
conn, err := getMigrateDBConn(c, addr, db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
if tp == "ALL" {
|
||||
// if tp is ALL, we will migrate the key in all types
|
||||
// this feature is useful for xcodis RESTORE or other commands that we don't know the data type exactly
|
||||
err = migrateAllTypeKeys(c, conn, key, timeout)
|
||||
} else {
|
||||
err = migrateKey(c, conn, tp, key, timeout)
|
||||
if err != nil {
|
||||
if err == errNoKey {
|
||||
c.resp.writeStatus(NOKEY)
|
||||
return nil
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
c.resp.writeStatus(OK)
|
||||
return nil
|
||||
}
|
||||
|
||||
func getMigrateDBConn(c *client, addr string, db uint64) (*goledis.Conn, error) {
|
||||
mc := c.app.getMigrateClient(addr)
|
||||
|
||||
conn, err := mc.Get()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if _, err = conn.Do("select", db); err != nil {
|
||||
conn.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
var (
|
||||
errNoKey = errors.New("migrate key is not exists")
|
||||
errKeyInMigrating = errors.New("key is in migrating yet")
|
||||
)
|
||||
|
||||
func migrateKey(c *client, conn *goledis.Conn, tp string, key []byte, timeout int64) error {
|
||||
if !c.app.migrateKeyLock(tp, key) {
|
||||
// other may also migrate this key, skip it
|
||||
return fmt.Errorf("%s %s is in migrating yet", tp, key)
|
||||
return errKeyInMigrating
|
||||
}
|
||||
|
||||
defer c.app.migrateKeyUnlock(tp, key)
|
||||
|
@ -422,8 +446,7 @@ func xmigrateCommand(c *client) error {
|
|||
if err != nil {
|
||||
return err
|
||||
} else if data == nil {
|
||||
c.resp.writeStatus(NOKEY)
|
||||
return nil
|
||||
return errNoKey
|
||||
}
|
||||
|
||||
ttl, err := xttl(c.db, tp, key)
|
||||
|
@ -431,21 +454,9 @@ func xmigrateCommand(c *client) error {
|
|||
return err
|
||||
}
|
||||
|
||||
mc := c.app.getMigrateClient(addr)
|
||||
|
||||
conn, err := mc.Get()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
//timeout is milliseconds
|
||||
t := time.Duration(timeout) * time.Millisecond
|
||||
|
||||
if _, err = conn.Do("select", db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn.SetReadDeadline(time.Now().Add(t))
|
||||
|
||||
//ttl is second, but restore need millisecond
|
||||
|
@ -457,7 +468,21 @@ func xmigrateCommand(c *client) error {
|
|||
return err
|
||||
}
|
||||
|
||||
c.resp.writeStatus(OK)
|
||||
return nil
|
||||
}
|
||||
|
||||
func migrateAllTypeKeys(c *client, conn *goledis.Conn, key []byte, timeout int64) error {
|
||||
for _, tp := range TypeNames {
|
||||
err := migrateKey(c, conn, tp, key, timeout)
|
||||
if err != nil {
|
||||
if err == errNoKey || err == errKeyInMigrating {
|
||||
continue
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -125,4 +125,19 @@ func TestMigrate(t *testing.T) {
|
|||
t.Fatal(s, "must empty")
|
||||
}
|
||||
|
||||
if _, err = c1.Do("xmigrate", "127.0.0.1", 11186, "ALL", "a", 0, timeout); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if s, err := ledis.String(c2.Do("get", "a")); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if s != "1" {
|
||||
t.Fatal(s, "must 1")
|
||||
}
|
||||
|
||||
if s, err := ledis.String(c1.Do("get", "a")); err != nil && err != ledis.ErrNil {
|
||||
t.Fatal(err)
|
||||
} else if s != "" {
|
||||
t.Fatal(s, "must empty")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -47,3 +47,5 @@ const (
|
|||
MB uint64 = 1024 * 1024
|
||||
KB uint64 = 1024
|
||||
)
|
||||
|
||||
var TypeNames = []string{KVName, ListName, HashName, SetName, ZSetName}
|
||||
|
|
Loading…
Reference in New Issue