Merge branch 'develop' into rpl-feature

This commit is contained in:
siddontang 2014-11-02 14:27:35 +08:00
commit 014954c3b9
12 changed files with 146 additions and 74 deletions

9
.travis.yml Normal file
View File

@ -0,0 +1,9 @@
language: go
go: 1.3.3
before_install:
- go get github.com/tools/godep
- go get code.google.com/p/go.tools/cmd/cover
- go install -race std
script:
- godep go test -cover ./...
# - godep go test -race ./...

View File

@ -22,5 +22,5 @@ clean:
test: test:
$(GO) test -tags '$(GO_BUILD_TAGS)' ./... $(GO) test -tags '$(GO_BUILD_TAGS)' ./...
pytest: test_race:
sh client/ledis-py/tests/all.sh $(GO) test -race -tags '$(GO_BUILD_TAGS)' ./...

View File

@ -8,6 +8,7 @@ import (
"io" "io"
"net" "net"
"strconv" "strconv"
"sync"
) )
// Error represents an error returned in a command reply. // Error represents an error returned in a command reply.
@ -16,6 +17,12 @@ type Error string
func (err Error) Error() string { return string(err) } func (err Error) Error() string { return string(err) }
type Conn struct { type Conn struct {
cm sync.Mutex
wm sync.Mutex
rm sync.Mutex
closed bool
client *Client client *Client
addr string addr string
@ -42,6 +49,8 @@ func NewConn(addr string) *Conn {
co.rSize = 4096 co.rSize = 4096
co.wSize = 4096 co.wSize = 4096
co.closed = false
return co return co
} }
@ -73,6 +82,9 @@ func (c *Conn) Send(cmd string, args ...interface{}) error {
return err return err
} }
c.wm.Lock()
defer c.wm.Unlock()
if err := c.writeCommand(cmd, args); err != nil { if err := c.writeCommand(cmd, args); err != nil {
c.finalize() c.finalize()
return err return err
@ -86,6 +98,9 @@ func (c *Conn) Send(cmd string, args ...interface{}) error {
} }
func (c *Conn) Receive() (interface{}, error) { func (c *Conn) Receive() (interface{}, error) {
c.rm.Lock()
defer c.rm.Unlock()
if reply, err := c.readReply(); err != nil { if reply, err := c.readReply(); err != nil {
c.finalize() c.finalize()
return nil, err return nil, err
@ -99,6 +114,9 @@ func (c *Conn) Receive() (interface{}, error) {
} }
func (c *Conn) ReceiveBulkTo(w io.Writer) error { func (c *Conn) ReceiveBulkTo(w io.Writer) error {
c.rm.Lock()
defer c.rm.Unlock()
err := c.readBulkReplyTo(w) err := c.readBulkReplyTo(w)
if err != nil { if err != nil {
if _, ok := err.(Error); !ok { if _, ok := err.(Error); !ok {
@ -109,20 +127,26 @@ func (c *Conn) ReceiveBulkTo(w io.Writer) error {
} }
func (c *Conn) finalize() { func (c *Conn) finalize() {
if c.c != nil { c.cm.Lock()
if !c.closed {
c.c.Close() c.c.Close()
c.c = nil c.closed = true
} }
c.cm.Unlock()
} }
func (c *Conn) connect() error { func (c *Conn) connect() error {
if c.c != nil { c.cm.Lock()
defer c.cm.Unlock()
if !c.closed && c.c != nil {
return nil return nil
} }
var err error var err error
c.c, err = net.Dial(getProto(c.addr), c.addr) c.c, err = net.Dial(getProto(c.addr), c.addr)
if err != nil { if err != nil {
c.c = nil
return err return err
} }

View File

@ -7,6 +7,7 @@ import (
"github.com/siddontang/go/ioutil2" "github.com/siddontang/go/ioutil2"
"io" "io"
"io/ioutil" "io/ioutil"
"sync"
) )
var ( var (
@ -83,6 +84,8 @@ type SnapshotConfig struct {
} }
type Config struct { type Config struct {
m sync.RWMutex `toml:"-"`
FileName string `toml:"-"` FileName string `toml:"-"`
Addr string `toml:"addr"` Addr string `toml:"addr"`
@ -254,3 +257,16 @@ func (cfg *Config) Rewrite() error {
return cfg.DumpFile(cfg.FileName) return cfg.DumpFile(cfg.FileName)
} }
func (cfg *Config) GetReadonly() bool {
cfg.m.RLock()
b := cfg.Readonly
cfg.m.RUnlock()
return b
}
func (cfg *Config) SetReadonly(b bool) {
cfg.m.Lock()
cfg.Readonly = b
cfg.m.Unlock()
}

View File

@ -82,8 +82,7 @@ func Open(cfg *config.Config) (*Ledis, error) {
l.dbs[i] = l.newDB(i) l.dbs[i] = l.newDB(i)
} }
l.wg.Add(1) l.checkTTL()
go l.checkTTL()
return l, nil return l, nil
} }
@ -96,12 +95,12 @@ func (l *Ledis) Close() {
if l.r != nil { if l.r != nil {
l.r.Close() l.r.Close()
l.r = nil //l.r = nil
} }
if l.lock != nil { if l.lock != nil {
l.lock.Close() l.lock.Close()
l.lock = nil //l.lock = nil
} }
} }
@ -157,7 +156,7 @@ func (l *Ledis) flushAll() error {
} }
func (l *Ledis) IsReadOnly() bool { func (l *Ledis) IsReadOnly() bool {
if l.cfg.Readonly { if l.cfg.GetReadonly() {
return true return true
} else if l.r != nil { } else if l.r != nil {
if b, _ := l.r.CommitIDBehind(); b { if b, _ := l.r.CommitIDBehind(); b {
@ -168,8 +167,6 @@ func (l *Ledis) IsReadOnly() bool {
} }
func (l *Ledis) checkTTL() { func (l *Ledis) checkTTL() {
defer l.wg.Done()
for i, db := range l.dbs { for i, db := range l.dbs {
c := newTTLChecker(db) c := newTTLChecker(db)
@ -187,6 +184,10 @@ func (l *Ledis) checkTTL() {
l.cfg.TTLCheckInterval = 1 l.cfg.TTLCheckInterval = 1
} }
l.wg.Add(1)
go func() {
defer l.wg.Done()
tick := time.NewTicker(time.Duration(l.cfg.TTLCheckInterval) * time.Second) tick := time.NewTicker(time.Duration(l.cfg.TTLCheckInterval) * time.Second)
defer tick.Stop() defer tick.Stop()
@ -205,6 +206,8 @@ func (l *Ledis) checkTTL() {
} }
} }
}()
} }
func (l *Ledis) StoreStat() *store.Stat { func (l *Ledis) StoreStat() *store.Stat {

View File

@ -85,7 +85,8 @@ func (db *DB) expireAt(t *batch, dataType byte, key []byte, when int64) {
t.Put(tk, mk) t.Put(tk, mk)
t.Put(mk, PutInt64(when)) t.Put(mk, PutInt64(when))
db.l.tcs[db.index].setNextCheckTime(when, false) tc := db.l.tcs[db.index]
tc.setNextCheckTime(when, false)
} }
func (db *DB) ttl(dataType byte, key []byte) (t int64, err error) { func (db *DB) ttl(dataType byte, key []byte) (t int64, err error) {

View File

@ -32,6 +32,8 @@ type Replication struct {
wg sync.WaitGroup wg sync.WaitGroup
nc chan struct{} nc chan struct{}
ncm sync.Mutex
} }
func NewReplication(cfg *config.Config) (*Replication, error) { func NewReplication(cfg *config.Config) (*Replication, error) {
@ -63,6 +65,7 @@ func NewReplication(cfg *config.Config) (*Replication, error) {
return nil, err return nil, err
} }
r.wg.Add(1)
go r.onPurgeExpired() go r.onPurgeExpired()
return r, nil return r, nil
@ -73,6 +76,9 @@ func (r *Replication) Close() error {
r.wg.Wait() r.wg.Wait()
r.m.Lock()
defer r.m.Unlock()
if r.s != nil { if r.s != nil {
r.s.Close() r.s.Close()
r.s = nil r.s = nil
@ -124,14 +130,19 @@ func (r *Replication) Log(data []byte) (*Log, error) {
return nil, err return nil, err
} }
r.ncm.Lock()
close(r.nc) close(r.nc)
r.nc = make(chan struct{}) r.nc = make(chan struct{})
r.ncm.Unlock()
return l, nil return l, nil
} }
func (r *Replication) WaitLog() <-chan struct{} { func (r *Replication) WaitLog() <-chan struct{} {
return r.nc r.ncm.Lock()
ch := r.nc
r.ncm.Unlock()
return ch
} }
func (r *Replication) StoreLog(log *Log) error { func (r *Replication) StoreLog(log *Log) error {
@ -251,7 +262,6 @@ func (r *Replication) ClearWithCommitID(id uint64) error {
} }
func (r *Replication) onPurgeExpired() { func (r *Replication) onPurgeExpired() {
r.wg.Add(1)
defer r.wg.Done() defer r.wg.Done()
for { for {

View File

@ -149,7 +149,11 @@ func (app *App) Run() {
go app.httpServe() go app.httpServe()
for !app.closed { for {
select {
case <-app.quit:
return
default:
conn, err := app.listener.Accept() conn, err := app.listener.Accept()
if err != nil { if err != nil {
continue continue
@ -157,6 +161,7 @@ func (app *App) Run() {
newClientRESP(conn, app) newClientRESP(conn, app)
} }
}
} }
func (app *App) httpServe() { func (app *App) httpServe() {

View File

@ -3,6 +3,7 @@ package server
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/siddontang/go/sync2"
"github.com/siddontang/ledisdb/ledis" "github.com/siddontang/ledisdb/ledis"
"io" "io"
"time" "time"
@ -62,7 +63,7 @@ type client struct {
syncBuf bytes.Buffer syncBuf bytes.Buffer
lastLogID uint64 lastLogID sync2.AtomicUint64
// reqErr chan error // reqErr chan error

View File

@ -110,16 +110,20 @@ func syncCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
c.lastLogID = logId - 1 lastLogID := logId - 1
stat, err := c.app.ldb.ReplicationStat() stat, err := c.app.ldb.ReplicationStat()
if err != nil { if err != nil {
return err return err
} }
if c.lastLogID > stat.LastID { if lastLogID > stat.LastID {
return fmt.Errorf("invalid sync logid %d > %d + 1", logId, stat.LastID) return fmt.Errorf("invalid sync logid %d > %d + 1", logId, stat.LastID)
} else if c.lastLogID == stat.LastID { }
c.lastLogID.Set(lastLogID)
if lastLogID == stat.LastID {
c.app.slaveAck(c) c.app.slaveAck(c)
} }

View File

@ -72,8 +72,8 @@ func TestExpire(t *testing.T) {
if ttl, err := ledis.Int64(c.Do(ttl, key)); err != nil { if ttl, err := ledis.Int64(c.Do(ttl, key)); err != nil {
t.Fatal(err) t.Fatal(err)
} else if ttl != exp { } else if ttl == -1 {
t.Fatal(ttl) t.Fatal("no ttl")
} }
// expireat + ttl // expireat + ttl
@ -86,8 +86,8 @@ func TestExpire(t *testing.T) {
if ttl, err := ledis.Int64(c.Do(ttl, key)); err != nil { if ttl, err := ledis.Int64(c.Do(ttl, key)); err != nil {
t.Fatal(err) t.Fatal(err)
} else if ttl != 3 { } else if ttl == -1 {
t.Fatal(ttl) t.Fatal("no ttl")
} }
kErr := "not_exist_ttl" kErr := "not_exist_ttl"

View File

@ -19,6 +19,7 @@ import (
var ( var (
errConnectMaster = errors.New("connect master error") errConnectMaster = errors.New("connect master error")
errReplClosed = errors.New("replication is closed")
) )
type master struct { type master struct {
@ -47,17 +48,16 @@ func newMaster(app *App) *master {
} }
func (m *master) Close() { func (m *master) Close() {
ledis.AsyncNotify(m.quit) m.quit <- struct{}{}
if m.conn != nil { m.closeConn()
//for replication, we send quit command to close gracefully
m.conn.Send("quit")
m.conn.Close()
m.conn = nil
}
m.wg.Wait() m.wg.Wait()
select {
case <-m.quit:
default:
}
} }
func (m *master) resetConn() error { func (m *master) resetConn() error {
@ -67,7 +67,6 @@ func (m *master) resetConn() error {
if m.conn != nil { if m.conn != nil {
m.conn.Close() m.conn.Close()
m.conn = nil
} }
m.conn = goledis.NewConn(m.addr) m.conn = goledis.NewConn(m.addr)
@ -75,6 +74,15 @@ func (m *master) resetConn() error {
return nil return nil
} }
func (m *master) closeConn() {
if m.conn != nil {
//for replication, we send quit command to close gracefully
m.conn.Send("quit")
m.conn.Close()
}
}
func (m *master) stopReplication() error { func (m *master) stopReplication() error {
m.Close() m.Close()
@ -87,9 +95,7 @@ func (m *master) startReplication(masterAddr string, restart bool) error {
m.addr = masterAddr m.addr = masterAddr
m.quit = make(chan struct{}, 1) m.app.cfg.SetReadonly(true)
m.app.cfg.Readonly = true
m.wg.Add(1) m.wg.Add(1)
go m.runReplication(restart) go m.runReplication(restart)
@ -123,28 +129,20 @@ func (m *master) runReplication(restart bool) {
if restart { if restart {
if err := m.fullSync(); err != nil { if err := m.fullSync(); err != nil {
if m.conn != nil {
//if conn == nil, other close the replication, not error
log.Error("restart fullsync error %s", err.Error()) log.Error("restart fullsync error %s", err.Error())
}
return return
} }
} }
for { for {
if err := m.sync(); err != nil {
if m.conn != nil {
//if conn == nil, other close the replication, not error
log.Error("sync error %s", err.Error())
}
return
}
select { select {
case <-m.quit: case <-m.quit:
return return
default: default:
break if err := m.sync(); err != nil {
log.Error("sync error %s", err.Error())
return
}
} }
} }
} }
@ -266,7 +264,7 @@ func (app *App) slaveof(masterAddr string, restart bool, readonly bool) error {
//in master mode and no slaveof, only set readonly //in master mode and no slaveof, only set readonly
if len(app.cfg.SlaveOf) == 0 && len(masterAddr) == 0 { if len(app.cfg.SlaveOf) == 0 && len(masterAddr) == 0 {
app.cfg.Readonly = readonly app.cfg.SetReadonly(readonly)
return nil return nil
} }
@ -281,7 +279,7 @@ func (app *App) slaveof(masterAddr string, restart bool, readonly bool) error {
return err return err
} }
app.cfg.Readonly = readonly app.cfg.SetReadonly(readonly)
} else { } else {
return app.m.startReplication(masterAddr, restart) return app.m.startReplication(masterAddr, restart)
} }
@ -323,7 +321,7 @@ func (app *App) removeSlave(c *client, activeQuit bool) {
delete(app.slaves, addr) delete(app.slaves, addr)
log.Info("remove slave %s", addr) log.Info("remove slave %s", addr)
if activeQuit { if activeQuit {
asyncNotifyUint64(app.slaveSyncAck, c.lastLogID) asyncNotifyUint64(app.slaveSyncAck, c.lastLogID.Get())
} }
} }
} }
@ -339,7 +337,7 @@ func (app *App) slaveAck(c *client) {
return return
} }
asyncNotifyUint64(app.slaveSyncAck, c.lastLogID) asyncNotifyUint64(app.slaveSyncAck, c.lastLogID.Get())
} }
func asyncNotifyUint64(ch chan uint64, v uint64) { func asyncNotifyUint64(ch chan uint64, v uint64) {
@ -369,11 +367,12 @@ func (app *App) publishNewLog(l *rpl.Log) {
n := 0 n := 0
logId := l.ID logId := l.ID
for _, s := range app.slaves { for _, s := range app.slaves {
if s.lastLogID == logId { lastLogID := s.lastLogID.Get()
if lastLogID == logId {
//slave has already owned this log //slave has already owned this log
n++ n++
} else if s.lastLogID > logId { } else if lastLogID > logId {
log.Error("invalid slave %s, lastlogid %d > %d", s.slaveListeningAddr, s.lastLogID, logId) log.Error("invalid slave %s, lastlogid %d > %d", s.slaveListeningAddr, lastLogID, logId)
} }
} }