diff --git a/server/app.go b/server/app.go index 12ef2d7..4917afc 100644 --- a/server/app.go +++ b/server/app.go @@ -44,8 +44,9 @@ type App struct { rcm sync.Mutex rcs map[*respClient]struct{} - migrateM sync.Mutex - migrateClients map[string]*goledis.Client + migrateM sync.Mutex + migrateClients map[string]*goledis.Client + migrateKeyLockers map[string]*migrateKeyLocker } func netType(s string) string { @@ -76,6 +77,7 @@ func NewApp(cfg *config.Config) (*App, error) { app.rcs = make(map[*respClient]struct{}) app.migrateClients = make(map[string]*goledis.Client) + app.newMigrateKeyLockers() var err error diff --git a/server/cmd_migrate.go b/server/cmd_migrate.go index ffe47c7..51430d4 100644 --- a/server/cmd_migrate.go +++ b/server/cmd_migrate.go @@ -2,10 +2,13 @@ package server import ( "fmt" + "strings" + "sync" + "time" + + "github.com/siddontang/go/hack" goledis "github.com/siddontang/ledisdb/client/goledis" "github.com/siddontang/ledisdb/ledis" - "strings" - "time" ) func dumpCommand(c *client) error { @@ -110,15 +113,15 @@ func xdump(db *ledis.DB, tp string, key []byte) ([]byte, error) { var err error var data []byte switch strings.ToUpper(tp) { - case "KV": + case KVName: data, err = db.Dump(key) - case "HASH": + case HashName: data, err = db.HDump(key) - case "LIST": + case ListName: data, err = db.LDump(key) - case "SET": + case SetName: data, err = db.SDump(key) - case "ZSET": + case ZSetName: data, err = db.ZDump(key) default: err = fmt.Errorf("invalid key type %s", tp) @@ -129,15 +132,15 @@ func xdump(db *ledis.DB, tp string, key []byte) ([]byte, error) { func xdel(db *ledis.DB, tp string, key []byte) error { var err error switch strings.ToUpper(tp) { - case "KV": + case KVName: _, err = db.Del(key) - case "HASH": + case HashName: _, err = db.HClear(key) - case "LIST": + case ListName: _, err = db.LClear(key) - case "SET": + case SetName: _, err = db.SClear(key) - case "ZSET": + case ZSetName: _, err = db.ZClear(key) default: err = fmt.Errorf("invalid key type %s", tp) @@ -147,15 +150,15 @@ func xdel(db *ledis.DB, tp string, key []byte) error { func xttl(db *ledis.DB, tp string, key []byte) (int64, error) { switch strings.ToUpper(tp) { - case "KV": + case KVName: return db.TTL(key) - case "HASH": + case HashName: return db.HTTL(key) - case "LIST": + case ListName: return db.LTTL(key) - case "SET": + case SetName: return db.STTL(key) - case "ZSET": + case ZSetName: return db.ZTTL(key) default: return 0, fmt.Errorf("invalid key type %s", tp) @@ -164,15 +167,15 @@ func xttl(db *ledis.DB, tp string, key []byte) (int64, error) { func xscan(db *ledis.DB, tp string, count int) ([][]byte, error) { switch strings.ToUpper(tp) { - case "KV": + case KVName: return db.Scan(KV, nil, count, false, "") - case "HASH": + case HashName: return db.Scan(HASH, nil, count, false, "") - case "LIST": + case ListName: return db.Scan(LIST, nil, count, false, "") - case "SET": + case SetName: return db.Scan(SET, nil, count, false, "") - case "ZSET": + case ZSetName: return db.Scan(ZSET, nil, count, false, "") default: return nil, fmt.Errorf("invalid key type %s", tp) @@ -185,7 +188,7 @@ func xdumpCommand(c *client) error { return ErrCmdParams } - tp := string(args[0]) + tp := strings.ToUpper(string(args[0])) key := args[1] if data, err := xdump(c.db, tp, key); err != nil { @@ -211,6 +214,68 @@ func (app *App) getMigrateClient(addr string) *goledis.Client { return mc } +type migrateKeyLocker struct { + m sync.Mutex + + locks map[string]struct{} +} + +func (m *migrateKeyLocker) Lock(key []byte) bool { + m.m.Lock() + defer m.m.Unlock() + + k := hack.String(key) + _, ok := m.locks[k] + if ok { + return false + } + m.locks[k] = struct{}{} + return true +} + +func (m *migrateKeyLocker) Unlock(key []byte) { + m.m.Lock() + defer m.m.Unlock() + + delete(m.locks, hack.String(key)) +} + +func newMigrateKeyLocker() *migrateKeyLocker { + m := new(migrateKeyLocker) + + m.locks = make(map[string]struct{}) + + return m +} + +func (a *App) newMigrateKeyLockers() { + a.migrateKeyLockers = make(map[string]*migrateKeyLocker) + + a.migrateKeyLockers[KVName] = newMigrateKeyLocker() + a.migrateKeyLockers[HashName] = newMigrateKeyLocker() + a.migrateKeyLockers[ListName] = newMigrateKeyLocker() + a.migrateKeyLockers[SetName] = newMigrateKeyLocker() + a.migrateKeyLockers[ZSetName] = newMigrateKeyLocker() +} + +func (a *App) migrateKeyLock(tp string, key []byte) bool { + l, ok := a.migrateKeyLockers[strings.ToUpper(tp)] + if !ok { + return false + } + + return l.Lock(key) +} + +func (a *App) migrateKeyUnlock(tp string, key []byte) { + l, ok := a.migrateKeyLockers[strings.ToUpper(tp)] + if !ok { + return + } + + l.Unlock(key) +} + //XMIGRATEDB host port tp count db timeout //select count tp type keys and migrate //will block any other write operations @@ -227,7 +292,7 @@ func xmigratedbCommand(c *client) error { return fmt.Errorf("migrate in same server is not allowed") } - tp := string(args[2]) + tp := strings.ToUpper(string(args[2])) count, err := ledis.StrInt64(args[3], nil) if err != nil { @@ -250,13 +315,7 @@ func xmigratedbCommand(c *client) error { return fmt.Errorf("invalid timeout %d", timeout) } - m, err := c.db.Multi() - if err != nil { - return err - } - defer m.Close() - - keys, err := xscan(m.DB, tp, int(count)) + keys, err := xscan(c.db, tp, int(count)) if err != nil { return err } else if len(keys) == 0 { @@ -270,6 +329,7 @@ func xmigratedbCommand(c *client) error { if err != nil { return err } + defer conn.Close() //timeout is milliseconds t := time.Duration(timeout) * time.Millisecond @@ -278,13 +338,24 @@ func xmigratedbCommand(c *client) error { return err } + migrateNum := int64(0) for _, key := range keys { - data, err := xdump(m.DB, tp, key) - if err != nil { - return err + if !c.app.migrateKeyLock(tp, key) { + // other may also migrate this key, skip it + continue } - ttl, err := xttl(m.DB, tp, key) + defer c.app.migrateKeyUnlock(tp, key) + + data, err := xdump(c.db, tp, key) + if err != nil { + return err + } else if data == nil { + // no key now, skip it + continue + } + + ttl, err := xttl(c.db, tp, key) if err != nil { return err } @@ -296,13 +367,14 @@ func xmigratedbCommand(c *client) error { return err } - if err = xdel(m.DB, tp, key); err != nil { + if err = xdel(c.db, tp, key); err != nil { return err } + migrateNum++ } - c.resp.writeInteger(int64(len(keys))) + c.resp.writeInteger(migrateNum) return nil } @@ -323,7 +395,7 @@ func xmigrateCommand(c *client) error { return fmt.Errorf("migrate in same server is not allowed") } - tp := string(args[2]) + tp := strings.ToUpper(string(args[2])) key := args[3] db, err := ledis.StrUint64(args[4], nil) if err != nil { @@ -339,13 +411,14 @@ func xmigrateCommand(c *client) error { return fmt.Errorf("invalid timeout %d", timeout) } - m, err := c.db.Multi() - if err != nil { - return err + if !c.app.migrateKeyLock(tp, key) { + // other may also migrate this key, skip it + return fmt.Errorf("%s %s is in migrating yet", tp, key) } - defer m.Close() - data, err := xdump(m.DB, tp, key) + defer c.app.migrateKeyUnlock(tp, key) + + data, err := xdump(c.db, tp, key) if err != nil { return err } else if data == nil { @@ -353,7 +426,7 @@ func xmigrateCommand(c *client) error { return nil } - ttl, err := xttl(m.DB, tp, key) + ttl, err := xttl(c.db, tp, key) if err != nil { return err } @@ -364,6 +437,7 @@ func xmigrateCommand(c *client) error { if err != nil { return err } + defer conn.Close() //timeout is milliseconds t := time.Duration(timeout) * time.Millisecond @@ -379,7 +453,7 @@ func xmigrateCommand(c *client) error { return err } - if err = xdel(m.DB, tp, key); err != nil { + if err = xdel(c.db, tp, key); err != nil { return err } diff --git a/server/const.go b/server/const.go index 9804ad7..4d72743 100644 --- a/server/const.go +++ b/server/const.go @@ -34,6 +34,14 @@ const ( ZSET = ledis.ZSET ) +const ( + KVName = ledis.KVName + ListName = ledis.ListName + HashName = ledis.HashName + SetName = ledis.SetName + ZSetName = ledis.ZSetName +) + const ( GB uint64 = 1024 * 1024 * 1024 MB uint64 = 1024 * 1024