add migrate command, improved later

This commit is contained in:
siddontang 2014-12-01 17:50:48 +08:00
parent 0ee6860979
commit 07e3705b22
4 changed files with 163 additions and 3 deletions

View File

@ -9,6 +9,7 @@ import (
"net"
"strconv"
"sync"
"time"
)
// Error represents an error returned in a command reply.
@ -40,6 +41,8 @@ type Conn struct {
// Scratch space for formatting integers and floats.
numScratch [40]byte
connectTimeout time.Duration
}
func NewConn(addr string) *Conn {
@ -69,6 +72,12 @@ func (c *Conn) Close() {
}
}
func (c *Conn) SetConnectTimeout(t time.Duration) {
c.cm.Lock()
c.connectTimeout = t
c.cm.Unlock()
}
func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) {
if err := c.Send(cmd, args...); err != nil {
return nil, err
@ -144,7 +153,7 @@ func (c *Conn) connect() error {
}
var err error
c.c, err = net.Dial(getProto(c.addr), c.addr)
c.c, err = net.DialTimeout(getProto(c.addr), c.addr, c.connectTimeout)
if err != nil {
c.c = nil
return err

View File

@ -1,6 +1,7 @@
package server
import (
goledis "github.com/siddontang/ledisdb/client/go/ledis"
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/ledis"
"net"
@ -42,6 +43,9 @@ type App struct {
rcm sync.Mutex
rcs map[*respClient]struct{}
migrateConnM sync.Mutex
migrateConns map[string]*goledis.Conn
}
func netType(s string) string {
@ -71,6 +75,8 @@ func NewApp(cfg *config.Config) (*App, error) {
app.rcs = make(map[*respClient]struct{})
app.migrateConns = make(map[string]*goledis.Conn)
var err error
if app.info, err = newInfo(app); err != nil {
@ -132,6 +138,14 @@ func (app *App) Close() {
app.listener.Close()
//close all migrate connections
app.migrateConnM.Lock()
for k, c := range app.migrateConns {
c.Close()
delete(app.migrateConns, k)
}
app.migrateConnM.Unlock()
if app.httpListener != nil {
app.httpListener.Close()
}

View File

@ -1,7 +1,11 @@
package server
import (
"fmt"
goledis "github.com/siddontang/ledisdb/client/go/ledis"
"github.com/siddontang/ledisdb/ledis"
"strings"
"time"
)
func dumpCommand(c *client) error {
@ -79,6 +83,7 @@ func zdumpCommand(c *client) error {
return nil
}
// unlike redis, restore will try to delete old key first
func restoreCommand(c *client) error {
args := c.args
if len(args) != 3 {
@ -101,6 +106,135 @@ func restoreCommand(c *client) error {
return nil
}
func xdump(db *ledis.DB, tp string, key []byte) ([]byte, error) {
var err error
var data []byte
switch tp {
case "kv":
data, err = db.Dump(key)
case "hash":
data, err = db.HDump(key)
case "list":
data, err = db.LDump(key)
case "set":
data, err = db.SDump(key)
case "zset":
data, err = db.ZDump(key)
default:
err = fmt.Errorf("invalid key type %s", tp)
}
return data, err
}
func xdel(db *ledis.DB, tp string, key []byte) error {
var err error
switch tp {
case "kv":
_, err = db.Del(key)
case "hash":
_, err = db.HClear(key)
case "list":
_, err = db.LClear(key)
case "set":
_, err = db.SClear(key)
case "zset":
_, err = db.ZClear(key)
default:
err = fmt.Errorf("invalid key type %s", tp)
}
return err
}
func xdumpCommand(c *client) error {
args := c.args
if len(args) != 2 {
return ErrCmdParams
}
tp := string(args[0])
key := args[1]
if data, err := xdump(c.db, tp, key); err != nil {
return err
} else {
c.resp.writeBulk(data)
}
return nil
}
//XMIGRATE host port type key destination-db timeout [COPY]
func xmigrateCommand(c *client) error {
args := c.args
if len(args) != 6 && len(args) != 7 {
return ErrCmdParams
}
addr := fmt.Sprintf("%s:%d", string(args[0]), string(args[1]))
tp := string(args[2])
key := args[3]
db, err := ledis.StrUint64(args[4], nil)
if err != nil {
return err
} else if db >= uint64(ledis.MaxDBNumber) {
return fmt.Errorf("invalid db index %d, must < %d", db, ledis.MaxDBNumber)
}
var timeout int64
timeout, err = ledis.StrInt64(args[5], nil)
if err != nil {
return err
} else if timeout < 0 {
return fmt.Errorf("invalid timeout %d", timeout)
}
onlyCopy := false
if len(args) == 7 {
if strings.ToUpper(string(args[6])) == "COPY" {
onlyCopy = true
}
}
var m *ledis.Multi
if m, err = c.db.Multi(); err != nil {
return err
}
defer m.Close()
var data []byte
data, err = xdump(m.DB, tp, key)
if err != nil {
return err
} else if data == nil {
c.resp.writeStatus(NOKEY)
return nil
}
c.app.migrateConnM.Lock()
defer c.app.migrateConnM.Unlock()
conn, ok := c.app.migrateConns[addr]
if !ok {
conn = goledis.NewConn(addr)
c.app.migrateConns[addr] = conn
}
//timeout is milliseconds
conn.SetConnectTimeout(time.Duration(timeout) * time.Millisecond)
if _, err = conn.Do("restore", key, data); err != nil {
return err
}
if !onlyCopy {
if err = xdel(m.DB, tp, key); err != nil {
return err
}
}
c.resp.writeStatus(OK)
return nil
}
func init() {
register("dump", dumpCommand)
register("ldump", ldumpCommand)
@ -108,4 +242,6 @@ func init() {
register("sdump", sdumpCommand)
register("zdump", zdumpCommand)
register("restore", restoreCommand)
register("xdump", xdumpCommand)
register("xmigrate", xmigrateCommand)
}

View File

@ -22,6 +22,7 @@ var (
PONG = "PONG"
OK = "OK"
NOKEY = "NOKEY"
)
const (