add xmigrate

xmigrate will be used in xcodis
This commit is contained in:
siddontang 2014-12-03 16:27:52 +08:00
parent 07e3705b22
commit 3055e6b2fc
5 changed files with 346 additions and 78 deletions

View File

@ -8,6 +8,7 @@ import (
"io" "io"
"net" "net"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
) )
@ -78,6 +79,22 @@ func (c *Conn) SetConnectTimeout(t time.Duration) {
c.cm.Unlock() c.cm.Unlock()
} }
func (c *Conn) SetReadDeadline(t time.Time) {
c.cm.Lock()
if c.c != nil {
c.c.SetReadDeadline(t)
}
c.cm.Unlock()
}
func (c *Conn) SetWriteDeadline(t time.Time) {
c.cm.Lock()
if c.c != nil {
c.c.SetWriteDeadline(t)
}
c.cm.Unlock()
}
func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) {
if err := c.Send(cmd, args...); err != nil { if err := c.Send(cmd, args...); err != nil {
return nil, err return nil, err
@ -87,6 +104,21 @@ func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) {
} }
func (c *Conn) Send(cmd string, args ...interface{}) error { func (c *Conn) Send(cmd string, args ...interface{}) error {
var err error
for i := 0; i < 2; i++ {
if err = c.send(cmd, args...); err != nil {
if e, ok := err.(*net.OpError); ok && strings.Contains(e.Error(), "use of closed network connection") {
//send to a closed connection, try again
continue
}
} else {
return nil
}
}
return err
}
func (c *Conn) send(cmd string, args ...interface{}) error {
if err := c.connect(); err != nil { if err := c.connect(); err != nil {
return err return err
} }
@ -138,7 +170,9 @@ func (c *Conn) ReceiveBulkTo(w io.Writer) error {
func (c *Conn) finalize() { func (c *Conn) finalize() {
c.cm.Lock() c.cm.Lock()
if !c.closed { if !c.closed {
if c.c != nil {
c.c.Close() c.c.Close()
}
c.closed = true c.closed = true
} }
c.cm.Unlock() c.cm.Unlock()

View File

@ -44,8 +44,8 @@ type App struct {
rcm sync.Mutex rcm sync.Mutex
rcs map[*respClient]struct{} rcs map[*respClient]struct{}
migrateConnM sync.Mutex migrateM sync.Mutex
migrateConns map[string]*goledis.Conn migrateClients map[string]*goledis.Client
} }
func netType(s string) string { func netType(s string) string {
@ -75,7 +75,7 @@ func NewApp(cfg *config.Config) (*App, error) {
app.rcs = make(map[*respClient]struct{}) app.rcs = make(map[*respClient]struct{})
app.migrateConns = make(map[string]*goledis.Conn) app.migrateClients = make(map[string]*goledis.Client)
var err error var err error
@ -139,12 +139,12 @@ func (app *App) Close() {
app.listener.Close() app.listener.Close()
//close all migrate connections //close all migrate connections
app.migrateConnM.Lock() app.migrateM.Lock()
for k, c := range app.migrateConns { for k, c := range app.migrateClients {
c.Close() c.Close()
delete(app.migrateConns, k) delete(app.migrateClients, k)
} }
app.migrateConnM.Unlock() app.migrateM.Unlock()
if app.httpListener != nil { if app.httpListener != nil {
app.httpListener.Close() app.httpListener.Close()

View File

@ -19,6 +19,8 @@ var txUnsupportedCmds = map[string]struct{}{
"flushall": struct{}{}, "flushall": struct{}{},
"flushdb": struct{}{}, "flushdb": struct{}{},
"eval": struct{}{}, "eval": struct{}{},
"xmigrate": struct{}{},
"xmigratedb": struct{}{},
} }
var scriptUnsupportedCmds = map[string]struct{}{ var scriptUnsupportedCmds = map[string]struct{}{
@ -30,6 +32,8 @@ var scriptUnsupportedCmds = map[string]struct{}{
"rollback": struct{}{}, "rollback": struct{}{},
"flushall": struct{}{}, "flushall": struct{}{},
"flushdb": struct{}{}, "flushdb": struct{}{},
"xmigrate": struct{}{},
"xmigratedb": struct{}{},
} }
type responseWriter interface { type responseWriter interface {

View File

@ -109,16 +109,16 @@ func restoreCommand(c *client) error {
func xdump(db *ledis.DB, tp string, key []byte) ([]byte, error) { func xdump(db *ledis.DB, tp string, key []byte) ([]byte, error) {
var err error var err error
var data []byte var data []byte
switch tp { switch strings.ToUpper(tp) {
case "kv": case "KV":
data, err = db.Dump(key) data, err = db.Dump(key)
case "hash": case "HASH":
data, err = db.HDump(key) data, err = db.HDump(key)
case "list": case "LIST":
data, err = db.LDump(key) data, err = db.LDump(key)
case "set": case "SET":
data, err = db.SDump(key) data, err = db.SDump(key)
case "zset": case "ZSET":
data, err = db.ZDump(key) data, err = db.ZDump(key)
default: default:
err = fmt.Errorf("invalid key type %s", tp) err = fmt.Errorf("invalid key type %s", tp)
@ -128,16 +128,16 @@ func xdump(db *ledis.DB, tp string, key []byte) ([]byte, error) {
func xdel(db *ledis.DB, tp string, key []byte) error { func xdel(db *ledis.DB, tp string, key []byte) error {
var err error var err error
switch tp { switch strings.ToUpper(tp) {
case "kv": case "KV":
_, err = db.Del(key) _, err = db.Del(key)
case "hash": case "HASH":
_, err = db.HClear(key) _, err = db.HClear(key)
case "list": case "LIST":
_, err = db.LClear(key) _, err = db.LClear(key)
case "set": case "SET":
_, err = db.SClear(key) _, err = db.SClear(key)
case "zset": case "ZSET":
_, err = db.ZClear(key) _, err = db.ZClear(key)
default: default:
err = fmt.Errorf("invalid key type %s", tp) err = fmt.Errorf("invalid key type %s", tp)
@ -145,6 +145,40 @@ func xdel(db *ledis.DB, tp string, key []byte) error {
return err return err
} }
func xttl(db *ledis.DB, tp string, key []byte) (int64, error) {
switch strings.ToUpper(tp) {
case "KV":
return db.TTL(key)
case "HASH":
return db.HTTL(key)
case "LIST":
return db.LTTL(key)
case "SET":
return db.STTL(key)
case "ZSET":
return db.ZTTL(key)
default:
return 0, fmt.Errorf("invalid key type %s", tp)
}
}
func xscan(db *ledis.DB, tp string, count int) ([][]byte, error) {
switch strings.ToUpper(tp) {
case "KV":
return db.Scan(nil, count, false, "")
case "HASH":
return db.HScan(nil, count, false, "")
case "LIST":
return db.LScan(nil, count, false, "")
case "SET":
return db.SScan(nil, count, false, "")
case "ZSET":
return db.ZScan(nil, count, false, "")
default:
return nil, fmt.Errorf("invalid key type %s", tp)
}
}
func xdumpCommand(c *client) error { func xdumpCommand(c *client) error {
args := c.args args := c.args
if len(args) != 2 { if len(args) != 2 {
@ -162,15 +196,131 @@ func xdumpCommand(c *client) error {
return nil return nil
} }
//XMIGRATE host port type key destination-db timeout [COPY] func (app *App) getMigrateClient(addr string) *goledis.Client {
func xmigrateCommand(c *client) error { app.migrateM.Lock()
args := c.args
if len(args) != 6 && len(args) != 7 { mc, ok := app.migrateClients[addr]
if !ok {
mc = goledis.NewClient(&goledis.Config{addr, 4, 0, 0})
app.migrateClients[addr] = mc
}
app.migrateM.Unlock()
return mc
}
//XMIGRATEDB host port tp count db timeout
//select count tp type keys and migrate
//will block any other write operations
//maybe only for xcodis
func xmigratedbCommand(c *client) error {
args := c.args
if len(args) != 6 {
return ErrCmdParams return ErrCmdParams
} }
addr := fmt.Sprintf("%s:%d", string(args[0]), string(args[1])) addr := fmt.Sprintf("%s:%s", string(args[0]), string(args[1]))
if addr == c.app.cfg.Addr {
//same server can not migrate
return fmt.Errorf("migrate in same server is not allowed")
}
tp := string(args[2])
count, err := ledis.StrInt64(args[3], nil)
if err != nil {
return err
} else if count <= 0 {
count = 10
}
db, err := ledis.StrUint64(args[4], nil)
if err != nil {
return err
} else if db >= uint64(ledis.MaxDBNumber) {
return fmt.Errorf("invalid db index %d, must < %d", db, ledis.MaxDBNumber)
}
timeout, err := ledis.StrInt64(args[5], nil)
if err != nil {
return err
} else if timeout < 0 {
return fmt.Errorf("invalid timeout %d", timeout)
}
m, err := c.db.Multi()
if err != nil {
return err
}
defer m.Close()
keys, err := xscan(m.DB, tp, int(count))
if err != nil {
return err
} else if len(keys) == 0 {
c.resp.writeInteger(0)
return nil
}
mc := c.app.getMigrateClient(addr)
conn := mc.Get()
//timeout is milliseconds
t := time.Duration(timeout) * time.Millisecond
conn.SetConnectTimeout(t)
if _, err = conn.Do("select", db); err != nil {
return err
}
for _, key := range keys {
data, err := xdump(m.DB, tp, key)
if err != nil {
return err
}
ttl, err := xttl(m.DB, tp, key)
if err != nil {
return err
}
conn.SetReadDeadline(time.Now().Add(t))
//ttl is second, but restore need millisecond
if _, err = conn.Do("restore", key, ttl*1e3, data); err != nil {
return err
}
if err = xdel(m.DB, tp, key); err != nil {
return err
}
}
c.resp.writeInteger(int64(len(keys)))
return nil
}
//XMIGRATE host port type key destination-db timeout
//will block any other write operations
//maybe only for xcodis
func xmigrateCommand(c *client) error {
args := c.args
if len(args) != 6 {
return ErrCmdParams
}
addr := fmt.Sprintf("%s:%s", string(args[0]), string(args[1]))
if addr == c.app.cfg.Addr {
//same server can not migrate
return fmt.Errorf("migrate in same server is not allowed")
}
tp := string(args[2]) tp := string(args[2])
key := args[3] key := args[3]
db, err := ledis.StrUint64(args[4], nil) db, err := ledis.StrUint64(args[4], nil)
@ -179,29 +329,21 @@ func xmigrateCommand(c *client) error {
} else if db >= uint64(ledis.MaxDBNumber) { } else if db >= uint64(ledis.MaxDBNumber) {
return fmt.Errorf("invalid db index %d, must < %d", db, ledis.MaxDBNumber) return fmt.Errorf("invalid db index %d, must < %d", db, ledis.MaxDBNumber)
} }
var timeout int64
timeout, err = ledis.StrInt64(args[5], nil) timeout, err := ledis.StrInt64(args[5], nil)
if err != nil { if err != nil {
return err return err
} else if timeout < 0 { } else if timeout < 0 {
return fmt.Errorf("invalid timeout %d", timeout) return fmt.Errorf("invalid timeout %d", timeout)
} }
onlyCopy := false m, err := c.db.Multi()
if len(args) == 7 { if err != nil {
if strings.ToUpper(string(args[6])) == "COPY" {
onlyCopy = true
}
}
var m *ledis.Multi
if m, err = c.db.Multi(); err != nil {
return err return err
} }
defer m.Close() defer m.Close()
var data []byte data, err := xdump(m.DB, tp, key)
data, err = xdump(m.DB, tp, key)
if err != nil { if err != nil {
return err return err
} else if data == nil { } else if data == nil {
@ -209,27 +351,33 @@ func xmigrateCommand(c *client) error {
return nil return nil
} }
c.app.migrateConnM.Lock() ttl, err := xttl(m.DB, tp, key)
defer c.app.migrateConnM.Unlock() if err != nil {
return err
conn, ok := c.app.migrateConns[addr]
if !ok {
conn = goledis.NewConn(addr)
c.app.migrateConns[addr] = conn
} }
mc := c.app.getMigrateClient(addr)
conn := mc.Get()
//timeout is milliseconds //timeout is milliseconds
conn.SetConnectTimeout(time.Duration(timeout) * time.Millisecond) t := time.Duration(timeout) * time.Millisecond
conn.SetConnectTimeout(t)
if _, err = conn.Do("restore", key, data); err != nil { if _, err = conn.Do("select", db); err != nil {
return err
}
conn.SetReadDeadline(time.Now().Add(t))
//ttl is second, but restore need millisecond
if _, err = conn.Do("restore", key, ttl*1e3, data); err != nil {
return err return err
} }
if !onlyCopy {
if err = xdel(m.DB, tp, key); err != nil { if err = xdel(m.DB, tp, key); err != nil {
return err return err
} }
}
c.resp.writeStatus(OK) c.resp.writeStatus(OK)
return nil return nil
@ -244,4 +392,5 @@ func init() {
register("restore", restoreCommand) register("restore", restoreCommand)
register("xdump", xdumpCommand) register("xdump", xdumpCommand)
register("xmigrate", xmigrateCommand) register("xmigrate", xmigrateCommand)
register("xmigratedb", xmigratedbCommand)
} }

View File

@ -1,11 +1,15 @@
package server package server
import ( import (
"fmt"
"github.com/siddontang/ledisdb/client/go/ledis" "github.com/siddontang/ledisdb/client/go/ledis"
"github.com/siddontang/ledisdb/config"
"os"
"testing" "testing"
"time"
) )
func TestMigrate(t *testing.T) { func TestDumpRestore(t *testing.T) {
c := getTestConn() c := getTestConn()
defer c.Close() defer c.Close()
@ -31,17 +35,94 @@ func TestMigrate(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
testMigrate(c, "dump", "mtest_a", t) testDumpRestore(c, "dump", "mtest_a", t)
testMigrate(c, "ldump", "mtest_la", t) testDumpRestore(c, "ldump", "mtest_la", t)
testMigrate(c, "hdump", "mtest_ha", t) testDumpRestore(c, "hdump", "mtest_ha", t)
testMigrate(c, "sdump", "mtest_sa", t) testDumpRestore(c, "sdump", "mtest_sa", t)
testMigrate(c, "zdump", "mtest_za", t) testDumpRestore(c, "zdump", "mtest_za", t)
} }
func testMigrate(c *ledis.Conn, dump string, key string, t *testing.T) { func testDumpRestore(c *ledis.Conn, dump string, key string, t *testing.T) {
if data, err := ledis.Bytes(c.Do(dump, key)); err != nil { if data, err := ledis.Bytes(c.Do(dump, key)); err != nil {
t.Fatal(err) t.Fatal(err)
} else if _, err := c.Do("restore", key, 0, data); err != nil { } else if _, err := c.Do("restore", key, 0, data); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
func TestMigrate(t *testing.T) {
data_dir := "/tmp/test_migrate"
os.RemoveAll(data_dir)
s1Cfg := config.NewConfigDefault()
s1Cfg.DataDir = fmt.Sprintf("%s/s1", data_dir)
s1Cfg.Addr = "127.0.0.1:11185"
s2Cfg := config.NewConfigDefault()
s2Cfg.DataDir = fmt.Sprintf("%s/s2", data_dir)
s2Cfg.Addr = "127.0.0.1:11186"
s1, err := NewApp(s1Cfg)
if err != nil {
t.Fatal(err)
}
defer s1.Close()
s2, err := NewApp(s2Cfg)
if err != nil {
t.Fatal(err)
}
defer s2.Close()
go s1.Run()
go s2.Run()
time.Sleep(1 * time.Second)
c1 := ledis.NewConn(s1Cfg.Addr)
defer c1.Close()
c2 := ledis.NewConn(s2Cfg.Addr)
defer c2.Close()
if _, err = c1.Do("set", "a", "1"); err != nil {
t.Fatal(err)
}
timeout := 30000
if _, err = c1.Do("xmigrate", "127.0.0.1", 11186, "KV", "a", 0, timeout); err != nil {
t.Fatal(err)
}
if s, err := ledis.String(c2.Do("get", "a")); err != nil {
t.Fatal(err)
} else if s != "1" {
t.Fatal(s, "must 1")
}
if s, err := ledis.String(c1.Do("get", "a")); err != nil && err != ledis.ErrNil {
t.Fatal(err)
} else if s != "" {
t.Fatal(s, "must empty")
}
if num, err := ledis.Int(c2.Do("xmigratedb", "127.0.0.1", 11185, "KV", 10, 0, timeout)); err != nil {
t.Fatal(err)
} else if num != 1 {
t.Fatal(num, "must number 1")
}
if s, err := ledis.String(c1.Do("get", "a")); err != nil {
t.Fatal(err)
} else if s != "1" {
t.Fatal(s, "must 1")
}
if s, err := ledis.String(c2.Do("get", "a")); err != nil && err != ledis.ErrNil {
t.Fatal(err)
} else if s != "" {
t.Fatal(s, "must empty")
}
}