add role command

This commit is contained in:
siddontang 2015-02-03 14:15:30 +08:00
parent e569e0c553
commit fb9d8cc527
3 changed files with 144 additions and 2 deletions

View File

@ -182,9 +182,86 @@ func replconfCommand(c *client) error {
return nil return nil
} }
func roleCommand(c *client) error {
if len(c.args) != 0 {
return ErrCmdParams
}
c.app.m.Lock()
isMaster := len(c.app.cfg.SlaveOf) == 0
c.app.m.Unlock()
ay := make([]interface{}, 0, 5)
var lastId int64 = 0
stat, _ := c.app.ldb.ReplicationStat()
if stat != nil {
lastId = int64(stat.LastID)
}
if isMaster {
ay = append(ay, []byte("master"))
ay = append(ay, lastId)
items := make([]interface{}, 0, 3)
c.app.slock.Lock()
for addr, slave := range c.app.slaves {
host, port, _ := splitHostPort(addr)
items = append(items, []interface{}{[]byte(host),
strconv.AppendUint(nil, uint64(port), 10),
strconv.AppendUint(nil, slave.lastLogID.Get(), 10)})
}
c.app.slock.Unlock()
ay = append(ay, items)
} else {
host, port, _ := splitHostPort(c.app.cfg.Addr)
ay = append(ay, []byte("slave"))
ay = append(ay, []byte(host))
ay = append(ay, int64(port))
ay = append(ay, []byte(replStatetring(c.app.m.state.Get())))
ay = append(ay, lastId)
}
c.resp.writeArray(ay)
return nil
}
func replStatetring(r int32) string {
switch r {
case replConnectState:
return "connect"
case replConnectingState:
return "connecting"
case replSyncState:
return "sync"
case replConnectedState:
return "connected"
default:
return "unknown"
}
}
func splitHostPort(str string) (string, int16, error) {
host, port, err := net.SplitHostPort(str)
if err != nil {
return "", 0, err
}
p, err := strconv.ParseInt(port, 10, 16)
if err != nil {
return "", 0, err
}
return host, int16(p), nil
}
func init() { func init() {
register("slaveof", slaveofCommand) register("slaveof", slaveofCommand)
register("fullsync", fullsyncCommand) register("fullsync", fullsyncCommand)
register("sync", syncCommand) register("sync", syncCommand)
register("replconf", replconfCommand) register("replconf", replconfCommand)
register("role", roleCommand)
} }

View File

@ -2,6 +2,7 @@ package server
import ( import (
"fmt" "fmt"
goledis "github.com/siddontang/ledisdb/client/go/ledis"
"github.com/siddontang/ledisdb/config" "github.com/siddontang/ledisdb/config"
"os" "os"
"reflect" "reflect"
@ -120,6 +121,32 @@ func TestReplication(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
mStat, _ := master.ldb.ReplicationStat()
sStat, _ := slave.ldb.ReplicationStat()
if err = checkTestRole(masterCfg.Addr, []interface{}{
[]byte("master"),
int64(mStat.LastID),
[]interface{}{
[]interface{}{
[]byte("127.0.0.1"),
[]byte("11183"),
[]byte(fmt.Sprintf("%d", sStat.LastID)),
}},
}); err != nil {
t.Fatal(err)
}
if err = checkTestRole(slaveCfg.Addr, []interface{}{
[]byte("slave"),
[]byte("127.0.0.1"),
int64(11183),
[]byte("connected"),
int64(sStat.LastID),
}); err != nil {
t.Fatal(err)
}
slave.tryReSlaveof() slave.tryReSlaveof()
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
@ -129,5 +156,16 @@ func TestReplication(t *testing.T) {
if err = checkDataEqual(master, slave); err != nil { if err = checkDataEqual(master, slave); err != nil {
t.Fatal(err) t.Fatal(err)
} }
}
func checkTestRole(addr string, checkRoles []interface{}) error {
conn := goledis.NewConn(addr)
defer conn.Close()
roles, err := goledis.MultiBulk(conn.Do("ROLE"))
if err != nil {
return err
} else if !reflect.DeepEqual(roles, checkRoles) {
return fmt.Errorf("%v != %v", roles, checkRoles)
}
return nil
} }

View File

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"github.com/siddontang/go/log" "github.com/siddontang/go/log"
"github.com/siddontang/go/num" "github.com/siddontang/go/num"
"github.com/siddontang/go/sync2"
goledis "github.com/siddontang/ledisdb/client/go/ledis" goledis "github.com/siddontang/ledisdb/client/go/ledis"
"github.com/siddontang/ledisdb/ledis" "github.com/siddontang/ledisdb/ledis"
"github.com/siddontang/ledisdb/rpl" "github.com/siddontang/ledisdb/rpl"
@ -22,6 +23,17 @@ var (
errReplClosed = errors.New("replication is closed") errReplClosed = errors.New("replication is closed")
) )
const (
// slave needs to connect to its master
replConnectState int32 = iota + 1
// slave-master connection is in progress
replConnectingState
// perform the synchronization
replSyncState
// slave is online
replConnectedState
)
type master struct { type master struct {
sync.Mutex sync.Mutex
@ -36,6 +48,8 @@ type master struct {
wg sync.WaitGroup wg sync.WaitGroup
syncBuf bytes.Buffer syncBuf bytes.Buffer
state sync2.AtomicInt32
} }
func newMaster(app *App) *master { func newMaster(app *App) *master {
@ -44,6 +58,8 @@ func newMaster(app *App) *master {
m.quit = make(chan struct{}, 1) m.quit = make(chan struct{}, 1)
m.state.Set(replConnectState)
return m return m
} }
@ -58,6 +74,8 @@ func (m *master) Close() {
case <-m.quit: case <-m.quit:
default: default:
} }
m.state.Set(replConnectState)
} }
func (m *master) resetConn() error { func (m *master) resetConn() error {
@ -103,8 +121,12 @@ func (m *master) startReplication(masterAddr string, restart bool) error {
} }
func (m *master) runReplication(restart bool) { func (m *master) runReplication(restart bool) {
defer m.wg.Done() defer func() {
m.state.Set(replConnectState)
m.wg.Done()
}()
m.state.Set(replConnectingState)
if err := m.resetConn(); err != nil { if err := m.resetConn(); err != nil {
log.Errorf("reset conn error %s", err.Error()) log.Errorf("reset conn error %s", err.Error())
return return
@ -122,12 +144,15 @@ func (m *master) runReplication(restart bool) {
} }
} }
m.state.Set(replConnectedState)
if err := m.replConf(); err != nil { if err := m.replConf(); err != nil {
log.Errorf("replconf error %s", err.Error()) log.Errorf("replconf error %s", err.Error())
return return
} }
if restart { if restart {
m.state.Set(replSyncState)
if err := m.fullSync(); err != nil { if err := m.fullSync(); err != nil {
log.Errorf("restart fullsync error %s", err.Error()) log.Errorf("restart fullsync error %s", err.Error())
return return
@ -139,6 +164,7 @@ func (m *master) runReplication(restart bool) {
case <-m.quit: case <-m.quit:
return return
default: default:
m.state.Set(replConnectedState)
if err := m.sync(); err != nil { if err := m.sync(); err != nil {
log.Errorf("sync error %s", err.Error()) log.Errorf("sync error %s", err.Error())
return return
@ -250,6 +276,7 @@ func (m *master) sync() error {
return nil return nil
} }
m.state.Set(replSyncState)
if err = m.app.ldb.StoreLogsFromData(buf); err != nil { if err = m.app.ldb.StoreLogsFromData(buf); err != nil {
return err return err
} }