From 07e3705b2210795615207c18f5e2a24e7239de64 Mon Sep 17 00:00:00 2001 From: siddontang Date: Mon, 1 Dec 2014 17:50:48 +0800 Subject: [PATCH] add migrate command, improved later --- client/go/ledis/conn.go | 11 +++- server/app.go | 14 +++++ server/cmd_migrate.go | 136 ++++++++++++++++++++++++++++++++++++++++ server/const.go | 5 +- 4 files changed, 163 insertions(+), 3 deletions(-) diff --git a/client/go/ledis/conn.go b/client/go/ledis/conn.go index 1c988b6..2b8e411 100644 --- a/client/go/ledis/conn.go +++ b/client/go/ledis/conn.go @@ -9,6 +9,7 @@ import ( "net" "strconv" "sync" + "time" ) // Error represents an error returned in a command reply. @@ -40,6 +41,8 @@ type Conn struct { // Scratch space for formatting integers and floats. numScratch [40]byte + + connectTimeout time.Duration } func NewConn(addr string) *Conn { @@ -69,6 +72,12 @@ func (c *Conn) Close() { } } +func (c *Conn) SetConnectTimeout(t time.Duration) { + c.cm.Lock() + c.connectTimeout = t + c.cm.Unlock() +} + func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { if err := c.Send(cmd, args...); err != nil { return nil, err @@ -144,7 +153,7 @@ func (c *Conn) connect() error { } var err error - c.c, err = net.Dial(getProto(c.addr), c.addr) + c.c, err = net.DialTimeout(getProto(c.addr), c.addr, c.connectTimeout) if err != nil { c.c = nil return err diff --git a/server/app.go b/server/app.go index 393da09..09cba2c 100644 --- a/server/app.go +++ b/server/app.go @@ -1,6 +1,7 @@ package server import ( + goledis "github.com/siddontang/ledisdb/client/go/ledis" "github.com/siddontang/ledisdb/config" "github.com/siddontang/ledisdb/ledis" "net" @@ -42,6 +43,9 @@ type App struct { rcm sync.Mutex rcs map[*respClient]struct{} + + migrateConnM sync.Mutex + migrateConns map[string]*goledis.Conn } func netType(s string) string { @@ -71,6 +75,8 @@ func NewApp(cfg *config.Config) (*App, error) { app.rcs = make(map[*respClient]struct{}) + app.migrateConns = make(map[string]*goledis.Conn) + var err error if app.info, err = newInfo(app); err != nil { @@ -132,6 +138,14 @@ func (app *App) Close() { app.listener.Close() + //close all migrate connections + app.migrateConnM.Lock() + for k, c := range app.migrateConns { + c.Close() + delete(app.migrateConns, k) + } + app.migrateConnM.Unlock() + if app.httpListener != nil { app.httpListener.Close() } diff --git a/server/cmd_migrate.go b/server/cmd_migrate.go index c18f6f0..1295bb5 100644 --- a/server/cmd_migrate.go +++ b/server/cmd_migrate.go @@ -1,7 +1,11 @@ package server import ( + "fmt" + goledis "github.com/siddontang/ledisdb/client/go/ledis" "github.com/siddontang/ledisdb/ledis" + "strings" + "time" ) func dumpCommand(c *client) error { @@ -79,6 +83,7 @@ func zdumpCommand(c *client) error { return nil } +// unlike redis, restore will try to delete old key first func restoreCommand(c *client) error { args := c.args if len(args) != 3 { @@ -101,6 +106,135 @@ func restoreCommand(c *client) error { return nil } +func xdump(db *ledis.DB, tp string, key []byte) ([]byte, error) { + var err error + var data []byte + switch tp { + case "kv": + data, err = db.Dump(key) + case "hash": + data, err = db.HDump(key) + case "list": + data, err = db.LDump(key) + case "set": + data, err = db.SDump(key) + case "zset": + data, err = db.ZDump(key) + default: + err = fmt.Errorf("invalid key type %s", tp) + } + return data, err +} + +func xdel(db *ledis.DB, tp string, key []byte) error { + var err error + switch tp { + case "kv": + _, err = db.Del(key) + case "hash": + _, err = db.HClear(key) + case "list": + _, err = db.LClear(key) + case "set": + _, err = db.SClear(key) + case "zset": + _, err = db.ZClear(key) + default: + err = fmt.Errorf("invalid key type %s", tp) + } + return err +} + +func xdumpCommand(c *client) error { + args := c.args + if len(args) != 2 { + return ErrCmdParams + } + + tp := string(args[0]) + key := args[1] + + if data, err := xdump(c.db, tp, key); err != nil { + return err + } else { + c.resp.writeBulk(data) + } + return nil +} + +//XMIGRATE host port type key destination-db timeout [COPY] +func xmigrateCommand(c *client) error { + args := c.args + + if len(args) != 6 && len(args) != 7 { + return ErrCmdParams + } + + addr := fmt.Sprintf("%s:%d", string(args[0]), string(args[1])) + tp := string(args[2]) + key := args[3] + db, err := ledis.StrUint64(args[4], nil) + if err != nil { + return err + } else if db >= uint64(ledis.MaxDBNumber) { + return fmt.Errorf("invalid db index %d, must < %d", db, ledis.MaxDBNumber) + } + var timeout int64 + timeout, err = ledis.StrInt64(args[5], nil) + if err != nil { + return err + } else if timeout < 0 { + return fmt.Errorf("invalid timeout %d", timeout) + } + + onlyCopy := false + if len(args) == 7 { + if strings.ToUpper(string(args[6])) == "COPY" { + onlyCopy = true + } + } + + var m *ledis.Multi + if m, err = c.db.Multi(); err != nil { + return err + } + defer m.Close() + + var data []byte + data, err = xdump(m.DB, tp, key) + if err != nil { + return err + } else if data == nil { + c.resp.writeStatus(NOKEY) + return nil + } + + c.app.migrateConnM.Lock() + defer c.app.migrateConnM.Unlock() + + conn, ok := c.app.migrateConns[addr] + if !ok { + conn = goledis.NewConn(addr) + c.app.migrateConns[addr] = conn + } + + //timeout is milliseconds + conn.SetConnectTimeout(time.Duration(timeout) * time.Millisecond) + + if _, err = conn.Do("restore", key, data); err != nil { + return err + } + + if !onlyCopy { + if err = xdel(m.DB, tp, key); err != nil { + return err + } + } + + c.resp.writeStatus(OK) + return nil +} + func init() { register("dump", dumpCommand) register("ldump", ldumpCommand) @@ -108,4 +242,6 @@ func init() { register("sdump", sdumpCommand) register("zdump", zdumpCommand) register("restore", restoreCommand) + register("xdump", xdumpCommand) + register("xmigrate", xmigrateCommand) } diff --git a/server/const.go b/server/const.go index f1123e3..dc55e24 100644 --- a/server/const.go +++ b/server/const.go @@ -20,8 +20,9 @@ var ( NullBulk = []byte("-1") NullArray = []byte("-1") - PONG = "PONG" - OK = "OK" + PONG = "PONG" + OK = "OK" + NOKEY = "NOKEY" ) const (