Use key lock when migration

https://github.com/siddontang/ledisdb/issues/141

Now migration is only safe for xcodis, but I think it does not matter.
This commit is contained in:
siddontang 2015-03-07 12:47:35 +08:00
parent acabb99368
commit c6c82b979d
3 changed files with 131 additions and 47 deletions

View File

@ -44,8 +44,9 @@ type App struct {
rcm sync.Mutex
rcs map[*respClient]struct{}
migrateM sync.Mutex
migrateClients map[string]*goledis.Client
migrateM sync.Mutex
migrateClients map[string]*goledis.Client
migrateKeyLockers map[string]*migrateKeyLocker
}
func netType(s string) string {
@ -76,6 +77,7 @@ func NewApp(cfg *config.Config) (*App, error) {
app.rcs = make(map[*respClient]struct{})
app.migrateClients = make(map[string]*goledis.Client)
app.newMigrateKeyLockers()
var err error

View File

@ -2,10 +2,13 @@ package server
import (
"fmt"
"strings"
"sync"
"time"
"github.com/siddontang/go/hack"
goledis "github.com/siddontang/ledisdb/client/goledis"
"github.com/siddontang/ledisdb/ledis"
"strings"
"time"
)
func dumpCommand(c *client) error {
@ -110,15 +113,15 @@ func xdump(db *ledis.DB, tp string, key []byte) ([]byte, error) {
var err error
var data []byte
switch strings.ToUpper(tp) {
case "KV":
case KVName:
data, err = db.Dump(key)
case "HASH":
case HashName:
data, err = db.HDump(key)
case "LIST":
case ListName:
data, err = db.LDump(key)
case "SET":
case SetName:
data, err = db.SDump(key)
case "ZSET":
case ZSetName:
data, err = db.ZDump(key)
default:
err = fmt.Errorf("invalid key type %s", tp)
@ -129,15 +132,15 @@ func xdump(db *ledis.DB, tp string, key []byte) ([]byte, error) {
func xdel(db *ledis.DB, tp string, key []byte) error {
var err error
switch strings.ToUpper(tp) {
case "KV":
case KVName:
_, err = db.Del(key)
case "HASH":
case HashName:
_, err = db.HClear(key)
case "LIST":
case ListName:
_, err = db.LClear(key)
case "SET":
case SetName:
_, err = db.SClear(key)
case "ZSET":
case ZSetName:
_, err = db.ZClear(key)
default:
err = fmt.Errorf("invalid key type %s", tp)
@ -147,15 +150,15 @@ func xdel(db *ledis.DB, tp string, key []byte) error {
func xttl(db *ledis.DB, tp string, key []byte) (int64, error) {
switch strings.ToUpper(tp) {
case "KV":
case KVName:
return db.TTL(key)
case "HASH":
case HashName:
return db.HTTL(key)
case "LIST":
case ListName:
return db.LTTL(key)
case "SET":
case SetName:
return db.STTL(key)
case "ZSET":
case ZSetName:
return db.ZTTL(key)
default:
return 0, fmt.Errorf("invalid key type %s", tp)
@ -164,15 +167,15 @@ func xttl(db *ledis.DB, tp string, key []byte) (int64, error) {
func xscan(db *ledis.DB, tp string, count int) ([][]byte, error) {
switch strings.ToUpper(tp) {
case "KV":
case KVName:
return db.Scan(KV, nil, count, false, "")
case "HASH":
case HashName:
return db.Scan(HASH, nil, count, false, "")
case "LIST":
case ListName:
return db.Scan(LIST, nil, count, false, "")
case "SET":
case SetName:
return db.Scan(SET, nil, count, false, "")
case "ZSET":
case ZSetName:
return db.Scan(ZSET, nil, count, false, "")
default:
return nil, fmt.Errorf("invalid key type %s", tp)
@ -185,7 +188,7 @@ func xdumpCommand(c *client) error {
return ErrCmdParams
}
tp := string(args[0])
tp := strings.ToUpper(string(args[0]))
key := args[1]
if data, err := xdump(c.db, tp, key); err != nil {
@ -211,6 +214,68 @@ func (app *App) getMigrateClient(addr string) *goledis.Client {
return mc
}
type migrateKeyLocker struct {
m sync.Mutex
locks map[string]struct{}
}
func (m *migrateKeyLocker) Lock(key []byte) bool {
m.m.Lock()
defer m.m.Unlock()
k := hack.String(key)
_, ok := m.locks[k]
if ok {
return false
}
m.locks[k] = struct{}{}
return true
}
func (m *migrateKeyLocker) Unlock(key []byte) {
m.m.Lock()
defer m.m.Unlock()
delete(m.locks, hack.String(key))
}
func newMigrateKeyLocker() *migrateKeyLocker {
m := new(migrateKeyLocker)
m.locks = make(map[string]struct{})
return m
}
func (a *App) newMigrateKeyLockers() {
a.migrateKeyLockers = make(map[string]*migrateKeyLocker)
a.migrateKeyLockers[KVName] = newMigrateKeyLocker()
a.migrateKeyLockers[HashName] = newMigrateKeyLocker()
a.migrateKeyLockers[ListName] = newMigrateKeyLocker()
a.migrateKeyLockers[SetName] = newMigrateKeyLocker()
a.migrateKeyLockers[ZSetName] = newMigrateKeyLocker()
}
func (a *App) migrateKeyLock(tp string, key []byte) bool {
l, ok := a.migrateKeyLockers[strings.ToUpper(tp)]
if !ok {
return false
}
return l.Lock(key)
}
func (a *App) migrateKeyUnlock(tp string, key []byte) {
l, ok := a.migrateKeyLockers[strings.ToUpper(tp)]
if !ok {
return
}
l.Unlock(key)
}
//XMIGRATEDB host port tp count db timeout
//select count tp type keys and migrate
//will block any other write operations
@ -227,7 +292,7 @@ func xmigratedbCommand(c *client) error {
return fmt.Errorf("migrate in same server is not allowed")
}
tp := string(args[2])
tp := strings.ToUpper(string(args[2]))
count, err := ledis.StrInt64(args[3], nil)
if err != nil {
@ -250,13 +315,7 @@ func xmigratedbCommand(c *client) error {
return fmt.Errorf("invalid timeout %d", timeout)
}
m, err := c.db.Multi()
if err != nil {
return err
}
defer m.Close()
keys, err := xscan(m.DB, tp, int(count))
keys, err := xscan(c.db, tp, int(count))
if err != nil {
return err
} else if len(keys) == 0 {
@ -270,6 +329,7 @@ func xmigratedbCommand(c *client) error {
if err != nil {
return err
}
defer conn.Close()
//timeout is milliseconds
t := time.Duration(timeout) * time.Millisecond
@ -278,13 +338,24 @@ func xmigratedbCommand(c *client) error {
return err
}
migrateNum := int64(0)
for _, key := range keys {
data, err := xdump(m.DB, tp, key)
if err != nil {
return err
if !c.app.migrateKeyLock(tp, key) {
// other may also migrate this key, skip it
continue
}
ttl, err := xttl(m.DB, tp, key)
defer c.app.migrateKeyUnlock(tp, key)
data, err := xdump(c.db, tp, key)
if err != nil {
return err
} else if data == nil {
// no key now, skip it
continue
}
ttl, err := xttl(c.db, tp, key)
if err != nil {
return err
}
@ -296,13 +367,14 @@ func xmigratedbCommand(c *client) error {
return err
}
if err = xdel(m.DB, tp, key); err != nil {
if err = xdel(c.db, tp, key); err != nil {
return err
}
migrateNum++
}
c.resp.writeInteger(int64(len(keys)))
c.resp.writeInteger(migrateNum)
return nil
}
@ -323,7 +395,7 @@ func xmigrateCommand(c *client) error {
return fmt.Errorf("migrate in same server is not allowed")
}
tp := string(args[2])
tp := strings.ToUpper(string(args[2]))
key := args[3]
db, err := ledis.StrUint64(args[4], nil)
if err != nil {
@ -339,13 +411,14 @@ func xmigrateCommand(c *client) error {
return fmt.Errorf("invalid timeout %d", timeout)
}
m, err := c.db.Multi()
if err != nil {
return err
if !c.app.migrateKeyLock(tp, key) {
// other may also migrate this key, skip it
return fmt.Errorf("%s %s is in migrating yet", tp, key)
}
defer m.Close()
data, err := xdump(m.DB, tp, key)
defer c.app.migrateKeyUnlock(tp, key)
data, err := xdump(c.db, tp, key)
if err != nil {
return err
} else if data == nil {
@ -353,7 +426,7 @@ func xmigrateCommand(c *client) error {
return nil
}
ttl, err := xttl(m.DB, tp, key)
ttl, err := xttl(c.db, tp, key)
if err != nil {
return err
}
@ -364,6 +437,7 @@ func xmigrateCommand(c *client) error {
if err != nil {
return err
}
defer conn.Close()
//timeout is milliseconds
t := time.Duration(timeout) * time.Millisecond
@ -379,7 +453,7 @@ func xmigrateCommand(c *client) error {
return err
}
if err = xdel(m.DB, tp, key); err != nil {
if err = xdel(c.db, tp, key); err != nil {
return err
}

View File

@ -34,6 +34,14 @@ const (
ZSET = ledis.ZSET
)
const (
KVName = ledis.KVName
ListName = ledis.ListName
HashName = ledis.HashName
SetName = ledis.SetName
ZSetName = ledis.ZSetName
)
const (
GB uint64 = 1024 * 1024 * 1024
MB uint64 = 1024 * 1024