diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..e3f1130 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,9 @@ +language: go +go: 1.3.3 +before_install: + - go get github.com/tools/godep + - go get code.google.com/p/go.tools/cmd/cover + - go install -race std +script: + - godep go test -cover ./... +# - godep go test -race ./... diff --git a/Makefile b/Makefile index cfdddda..d52de40 100644 --- a/Makefile +++ b/Makefile @@ -22,5 +22,5 @@ clean: test: $(GO) test -tags '$(GO_BUILD_TAGS)' ./... -pytest: - sh client/ledis-py/tests/all.sh +test_race: + $(GO) test -race -tags '$(GO_BUILD_TAGS)' ./... diff --git a/client/go/ledis/conn.go b/client/go/ledis/conn.go index c74ab3f..1c988b6 100644 --- a/client/go/ledis/conn.go +++ b/client/go/ledis/conn.go @@ -8,6 +8,7 @@ import ( "io" "net" "strconv" + "sync" ) // Error represents an error returned in a command reply. @@ -16,6 +17,12 @@ type Error string func (err Error) Error() string { return string(err) } type Conn struct { + cm sync.Mutex + wm sync.Mutex + rm sync.Mutex + + closed bool + client *Client addr string @@ -42,6 +49,8 @@ func NewConn(addr string) *Conn { co.rSize = 4096 co.wSize = 4096 + co.closed = false + return co } @@ -73,6 +82,9 @@ func (c *Conn) Send(cmd string, args ...interface{}) error { return err } + c.wm.Lock() + defer c.wm.Unlock() + if err := c.writeCommand(cmd, args); err != nil { c.finalize() return err @@ -86,6 +98,9 @@ func (c *Conn) Send(cmd string, args ...interface{}) error { } func (c *Conn) Receive() (interface{}, error) { + c.rm.Lock() + defer c.rm.Unlock() + if reply, err := c.readReply(); err != nil { c.finalize() return nil, err @@ -99,6 +114,9 @@ func (c *Conn) Receive() (interface{}, error) { } func (c *Conn) ReceiveBulkTo(w io.Writer) error { + c.rm.Lock() + defer c.rm.Unlock() + err := c.readBulkReplyTo(w) if err != nil { if _, ok := err.(Error); !ok { @@ -109,20 +127,26 @@ func (c *Conn) ReceiveBulkTo(w io.Writer) error { } func (c *Conn) finalize() { - if c.c != nil { + c.cm.Lock() + if !c.closed { c.c.Close() - c.c = nil + c.closed = true } + c.cm.Unlock() } func (c *Conn) connect() error { - if c.c != nil { + c.cm.Lock() + defer c.cm.Unlock() + + if !c.closed && c.c != nil { return nil } var err error c.c, err = net.Dial(getProto(c.addr), c.addr) if err != nil { + c.c = nil return err } diff --git a/config/config.go b/config/config.go index e75bca4..498c748 100644 --- a/config/config.go +++ b/config/config.go @@ -7,6 +7,7 @@ import ( "github.com/siddontang/go/ioutil2" "io" "io/ioutil" + "sync" ) var ( @@ -83,6 +84,8 @@ type SnapshotConfig struct { } type Config struct { + m sync.RWMutex `toml:"-"` + FileName string `toml:"-"` Addr string `toml:"addr"` @@ -254,3 +257,16 @@ func (cfg *Config) Rewrite() error { return cfg.DumpFile(cfg.FileName) } + +func (cfg *Config) GetReadonly() bool { + cfg.m.RLock() + b := cfg.Readonly + cfg.m.RUnlock() + return b +} + +func (cfg *Config) SetReadonly(b bool) { + cfg.m.Lock() + cfg.Readonly = b + cfg.m.Unlock() +} diff --git a/ledis/ledis.go b/ledis/ledis.go index a5e926e..6e390c5 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -82,8 +82,7 @@ func Open(cfg *config.Config) (*Ledis, error) { l.dbs[i] = l.newDB(i) } - l.wg.Add(1) - go l.checkTTL() + l.checkTTL() return l, nil } @@ -96,12 +95,12 @@ func (l *Ledis) Close() { if l.r != nil { l.r.Close() - l.r = nil + //l.r = nil } if l.lock != nil { l.lock.Close() - l.lock = nil + //l.lock = nil } } @@ -157,7 +156,7 @@ func (l *Ledis) flushAll() error { } func (l *Ledis) IsReadOnly() bool { - if l.cfg.Readonly { + if l.cfg.GetReadonly() { return true } else if l.r != nil { if b, _ := l.r.CommitIDBehind(); b { @@ -168,8 +167,6 @@ func (l *Ledis) IsReadOnly() bool { } func (l *Ledis) checkTTL() { - defer l.wg.Done() - for i, db := range l.dbs { c := newTTLChecker(db) @@ -187,23 +184,29 @@ func (l *Ledis) checkTTL() { l.cfg.TTLCheckInterval = 1 } - tick := time.NewTicker(time.Duration(l.cfg.TTLCheckInterval) * time.Second) - defer tick.Stop() + l.wg.Add(1) + go func() { + defer l.wg.Done() - for { - select { - case <-tick.C: - if l.IsReadOnly() { - break - } + tick := time.NewTicker(time.Duration(l.cfg.TTLCheckInterval) * time.Second) + defer tick.Stop() - for _, c := range l.tcs { - c.check() + for { + select { + case <-tick.C: + if l.IsReadOnly() { + break + } + + for _, c := range l.tcs { + c.check() + } + case <-l.quit: + return } - case <-l.quit: - return } - } + + }() } diff --git a/ledis/t_ttl.go b/ledis/t_ttl.go index a912d26..f16a735 100644 --- a/ledis/t_ttl.go +++ b/ledis/t_ttl.go @@ -85,7 +85,8 @@ func (db *DB) expireAt(t *batch, dataType byte, key []byte, when int64) { t.Put(tk, mk) t.Put(mk, PutInt64(when)) - db.l.tcs[db.index].setNextCheckTime(when, false) + tc := db.l.tcs[db.index] + tc.setNextCheckTime(when, false) } func (db *DB) ttl(dataType byte, key []byte) (t int64, err error) { diff --git a/rpl/rpl.go b/rpl/rpl.go index 151d17c..067c0c2 100644 --- a/rpl/rpl.go +++ b/rpl/rpl.go @@ -32,6 +32,8 @@ type Replication struct { wg sync.WaitGroup nc chan struct{} + + ncm sync.Mutex } func NewReplication(cfg *config.Config) (*Replication, error) { @@ -63,6 +65,7 @@ func NewReplication(cfg *config.Config) (*Replication, error) { return nil, err } + r.wg.Add(1) go r.onPurgeExpired() return r, nil @@ -73,6 +76,9 @@ func (r *Replication) Close() error { r.wg.Wait() + r.m.Lock() + defer r.m.Unlock() + if r.s != nil { r.s.Close() r.s = nil @@ -124,14 +130,19 @@ func (r *Replication) Log(data []byte) (*Log, error) { return nil, err } + r.ncm.Lock() close(r.nc) r.nc = make(chan struct{}) + r.ncm.Unlock() return l, nil } func (r *Replication) WaitLog() <-chan struct{} { - return r.nc + r.ncm.Lock() + ch := r.nc + r.ncm.Unlock() + return ch } func (r *Replication) StoreLog(log *Log) error { @@ -251,7 +262,6 @@ func (r *Replication) ClearWithCommitID(id uint64) error { } func (r *Replication) onPurgeExpired() { - r.wg.Add(1) defer r.wg.Done() for { diff --git a/server/app.go b/server/app.go index 0e03b3f..a021865 100644 --- a/server/app.go +++ b/server/app.go @@ -149,13 +149,18 @@ func (app *App) Run() { go app.httpServe() - for !app.closed { - conn, err := app.listener.Accept() - if err != nil { - continue - } + for { + select { + case <-app.quit: + return + default: + conn, err := app.listener.Accept() + if err != nil { + continue + } - newClientRESP(conn, app) + newClientRESP(conn, app) + } } } diff --git a/server/client.go b/server/client.go index 47d8f85..d39241b 100644 --- a/server/client.go +++ b/server/client.go @@ -3,6 +3,7 @@ package server import ( "bytes" "fmt" + "github.com/siddontang/go/sync2" "github.com/siddontang/ledisdb/ledis" "io" "time" @@ -62,7 +63,7 @@ type client struct { syncBuf bytes.Buffer - lastLogID uint64 + lastLogID sync2.AtomicUint64 // reqErr chan error diff --git a/server/cmd_replication.go b/server/cmd_replication.go index b401f35..bc26968 100644 --- a/server/cmd_replication.go +++ b/server/cmd_replication.go @@ -110,16 +110,20 @@ func syncCommand(c *client) error { return ErrCmdParams } - c.lastLogID = logId - 1 + lastLogID := logId - 1 stat, err := c.app.ldb.ReplicationStat() if err != nil { return err } - if c.lastLogID > stat.LastID { + if lastLogID > stat.LastID { return fmt.Errorf("invalid sync logid %d > %d + 1", logId, stat.LastID) - } else if c.lastLogID == stat.LastID { + } + + c.lastLogID.Set(lastLogID) + + if lastLogID == stat.LastID { c.app.slaveAck(c) } diff --git a/server/cmd_ttl_test.go b/server/cmd_ttl_test.go index c9d388c..d851b83 100644 --- a/server/cmd_ttl_test.go +++ b/server/cmd_ttl_test.go @@ -72,8 +72,8 @@ func TestExpire(t *testing.T) { if ttl, err := ledis.Int64(c.Do(ttl, key)); err != nil { t.Fatal(err) - } else if ttl != exp { - t.Fatal(ttl) + } else if ttl == -1 { + t.Fatal("no ttl") } // expireat + ttl @@ -86,8 +86,8 @@ func TestExpire(t *testing.T) { if ttl, err := ledis.Int64(c.Do(ttl, key)); err != nil { t.Fatal(err) - } else if ttl != 3 { - t.Fatal(ttl) + } else if ttl == -1 { + t.Fatal("no ttl") } kErr := "not_exist_ttl" diff --git a/server/replication.go b/server/replication.go index 24a5c1a..3f95388 100644 --- a/server/replication.go +++ b/server/replication.go @@ -19,6 +19,7 @@ import ( var ( errConnectMaster = errors.New("connect master error") + errReplClosed = errors.New("replication is closed") ) type master struct { @@ -47,17 +48,16 @@ func newMaster(app *App) *master { } func (m *master) Close() { - ledis.AsyncNotify(m.quit) + m.quit <- struct{}{} - if m.conn != nil { - //for replication, we send quit command to close gracefully - m.conn.Send("quit") - - m.conn.Close() - m.conn = nil - } + m.closeConn() m.wg.Wait() + + select { + case <-m.quit: + default: + } } func (m *master) resetConn() error { @@ -67,7 +67,6 @@ func (m *master) resetConn() error { if m.conn != nil { m.conn.Close() - m.conn = nil } m.conn = goledis.NewConn(m.addr) @@ -75,6 +74,15 @@ func (m *master) resetConn() error { return nil } +func (m *master) closeConn() { + if m.conn != nil { + //for replication, we send quit command to close gracefully + m.conn.Send("quit") + + m.conn.Close() + } +} + func (m *master) stopReplication() error { m.Close() @@ -87,9 +95,7 @@ func (m *master) startReplication(masterAddr string, restart bool) error { m.addr = masterAddr - m.quit = make(chan struct{}, 1) - - m.app.cfg.Readonly = true + m.app.cfg.SetReadonly(true) m.wg.Add(1) go m.runReplication(restart) @@ -123,28 +129,20 @@ func (m *master) runReplication(restart bool) { if restart { if err := m.fullSync(); err != nil { - if m.conn != nil { - //if conn == nil, other close the replication, not error - log.Error("restart fullsync error %s", err.Error()) - } + log.Error("restart fullsync error %s", err.Error()) return } } for { - if err := m.sync(); err != nil { - if m.conn != nil { - //if conn == nil, other close the replication, not error - log.Error("sync error %s", err.Error()) - } - return - } - select { case <-m.quit: return default: - break + if err := m.sync(); err != nil { + log.Error("sync error %s", err.Error()) + return + } } } } @@ -266,7 +264,7 @@ func (app *App) slaveof(masterAddr string, restart bool, readonly bool) error { //in master mode and no slaveof, only set readonly if len(app.cfg.SlaveOf) == 0 && len(masterAddr) == 0 { - app.cfg.Readonly = readonly + app.cfg.SetReadonly(readonly) return nil } @@ -281,7 +279,7 @@ func (app *App) slaveof(masterAddr string, restart bool, readonly bool) error { return err } - app.cfg.Readonly = readonly + app.cfg.SetReadonly(readonly) } else { return app.m.startReplication(masterAddr, restart) } @@ -323,7 +321,7 @@ func (app *App) removeSlave(c *client, activeQuit bool) { delete(app.slaves, addr) log.Info("remove slave %s", addr) if activeQuit { - asyncNotifyUint64(app.slaveSyncAck, c.lastLogID) + asyncNotifyUint64(app.slaveSyncAck, c.lastLogID.Get()) } } } @@ -339,7 +337,7 @@ func (app *App) slaveAck(c *client) { return } - asyncNotifyUint64(app.slaveSyncAck, c.lastLogID) + asyncNotifyUint64(app.slaveSyncAck, c.lastLogID.Get()) } func asyncNotifyUint64(ch chan uint64, v uint64) { @@ -369,11 +367,12 @@ func (app *App) publishNewLog(l *rpl.Log) { n := 0 logId := l.ID for _, s := range app.slaves { - if s.lastLogID == logId { + lastLogID := s.lastLogID.Get() + if lastLogID == logId { //slave has already owned this log n++ - } else if s.lastLogID > logId { - log.Error("invalid slave %s, lastlogid %d > %d", s.slaveListeningAddr, s.lastLogID, logId) + } else if lastLogID > logId { + log.Error("invalid slave %s, lastlogid %d > %d", s.slaveListeningAddr, lastLogID, logId) } }