fix go test race error

This commit is contained in:
siddontang 2014-11-01 23:28:28 +08:00
parent c2a9131bf0
commit 7b5d950cc9
11 changed files with 138 additions and 72 deletions

View File

@ -22,5 +22,8 @@ clean:
test:
$(GO) test -tags '$(GO_BUILD_TAGS)' ./...
test_race:
$(GO) test -race -tags '$(GO_BUILD_TAGS)' ./...
pytest:
sh client/ledis-py/tests/all.sh

View File

@ -8,6 +8,7 @@ import (
"io"
"net"
"strconv"
"sync"
)
// Error represents an error returned in a command reply.
@ -16,6 +17,12 @@ type Error string
func (err Error) Error() string { return string(err) }
type Conn struct {
cm sync.Mutex
wm sync.Mutex
rm sync.Mutex
closed bool
client *Client
addr string
@ -42,6 +49,8 @@ func NewConn(addr string) *Conn {
co.rSize = 4096
co.wSize = 4096
co.closed = false
return co
}
@ -73,6 +82,9 @@ func (c *Conn) Send(cmd string, args ...interface{}) error {
return err
}
c.wm.Lock()
defer c.wm.Unlock()
if err := c.writeCommand(cmd, args); err != nil {
c.finalize()
return err
@ -86,6 +98,9 @@ func (c *Conn) Send(cmd string, args ...interface{}) error {
}
func (c *Conn) Receive() (interface{}, error) {
c.rm.Lock()
defer c.rm.Unlock()
if reply, err := c.readReply(); err != nil {
c.finalize()
return nil, err
@ -99,6 +114,9 @@ func (c *Conn) Receive() (interface{}, error) {
}
func (c *Conn) ReceiveBulkTo(w io.Writer) error {
c.rm.Lock()
defer c.rm.Unlock()
err := c.readBulkReplyTo(w)
if err != nil {
if _, ok := err.(Error); !ok {
@ -109,20 +127,26 @@ func (c *Conn) ReceiveBulkTo(w io.Writer) error {
}
func (c *Conn) finalize() {
if c.c != nil {
c.cm.Lock()
if !c.closed {
c.c.Close()
c.c = nil
c.closed = true
}
c.cm.Unlock()
}
func (c *Conn) connect() error {
if c.c != nil {
c.cm.Lock()
defer c.cm.Unlock()
if !c.closed && c.c != nil {
return nil
}
var err error
c.c, err = net.Dial(getProto(c.addr), c.addr)
if err != nil {
c.c = nil
return err
}

View File

@ -7,6 +7,7 @@ import (
"github.com/siddontang/go/ioutil2"
"io"
"io/ioutil"
"sync"
)
var (
@ -83,6 +84,8 @@ type SnapshotConfig struct {
}
type Config struct {
m sync.RWMutex `toml:"-"`
FileName string `toml:"-"`
Addr string `toml:"addr"`
@ -254,3 +257,16 @@ func (cfg *Config) Rewrite() error {
return cfg.DumpFile(cfg.FileName)
}
func (cfg *Config) GetReadonly() bool {
cfg.m.RLock()
b := cfg.Readonly
cfg.m.RUnlock()
return b
}
func (cfg *Config) SetReadonly(b bool) {
cfg.m.Lock()
cfg.Readonly = b
cfg.m.Unlock()
}

View File

@ -82,8 +82,7 @@ func Open(cfg *config.Config) (*Ledis, error) {
l.dbs[i] = l.newDB(i)
}
l.wg.Add(1)
go l.checkTTL()
l.checkTTL()
return l, nil
}
@ -96,12 +95,12 @@ func (l *Ledis) Close() {
if l.r != nil {
l.r.Close()
l.r = nil
//l.r = nil
}
if l.lock != nil {
l.lock.Close()
l.lock = nil
//l.lock = nil
}
}
@ -157,7 +156,7 @@ func (l *Ledis) flushAll() error {
}
func (l *Ledis) IsReadOnly() bool {
if l.cfg.Readonly {
if l.cfg.GetReadonly() {
return true
} else if l.r != nil {
if b, _ := l.r.CommitIDBehind(); b {
@ -168,8 +167,6 @@ func (l *Ledis) IsReadOnly() bool {
}
func (l *Ledis) checkTTL() {
defer l.wg.Done()
for i, db := range l.dbs {
c := newTTLChecker(db)
@ -187,23 +184,29 @@ func (l *Ledis) checkTTL() {
l.cfg.TTLCheckInterval = 1
}
tick := time.NewTicker(time.Duration(l.cfg.TTLCheckInterval) * time.Second)
defer tick.Stop()
l.wg.Add(1)
go func() {
defer l.wg.Done()
for {
select {
case <-tick.C:
if l.IsReadOnly() {
break
}
tick := time.NewTicker(time.Duration(l.cfg.TTLCheckInterval) * time.Second)
defer tick.Stop()
for _, c := range l.tcs {
c.check()
for {
select {
case <-tick.C:
if l.IsReadOnly() {
break
}
for _, c := range l.tcs {
c.check()
}
case <-l.quit:
return
}
case <-l.quit:
return
}
}
}()
}

View File

@ -85,7 +85,8 @@ func (db *DB) expireAt(t *batch, dataType byte, key []byte, when int64) {
t.Put(tk, mk)
t.Put(mk, PutInt64(when))
db.l.tcs[db.index].setNextCheckTime(when, false)
tc := db.l.tcs[db.index]
tc.setNextCheckTime(when, false)
}
func (db *DB) ttl(dataType byte, key []byte) (t int64, err error) {

View File

@ -32,6 +32,8 @@ type Replication struct {
wg sync.WaitGroup
nc chan struct{}
ncm sync.Mutex
}
func NewReplication(cfg *config.Config) (*Replication, error) {
@ -63,6 +65,7 @@ func NewReplication(cfg *config.Config) (*Replication, error) {
return nil, err
}
r.wg.Add(1)
go r.onPurgeExpired()
return r, nil
@ -73,6 +76,9 @@ func (r *Replication) Close() error {
r.wg.Wait()
r.m.Lock()
defer r.m.Unlock()
if r.s != nil {
r.s.Close()
r.s = nil
@ -124,14 +130,19 @@ func (r *Replication) Log(data []byte) (*Log, error) {
return nil, err
}
r.ncm.Lock()
close(r.nc)
r.nc = make(chan struct{})
r.ncm.Unlock()
return l, nil
}
func (r *Replication) WaitLog() <-chan struct{} {
return r.nc
r.ncm.Lock()
ch := r.nc
r.ncm.Unlock()
return ch
}
func (r *Replication) StoreLog(log *Log) error {
@ -255,7 +266,6 @@ func (r *Replication) ClearWithCommitID(id uint64) error {
}
func (r *Replication) onPurgeExpired() {
r.wg.Add(1)
defer r.wg.Done()
for {

View File

@ -149,13 +149,18 @@ func (app *App) Run() {
go app.httpServe()
for !app.closed {
conn, err := app.listener.Accept()
if err != nil {
continue
}
for {
select {
case <-app.quit:
return
default:
conn, err := app.listener.Accept()
if err != nil {
continue
}
newClientRESP(conn, app)
newClientRESP(conn, app)
}
}
}

View File

@ -3,6 +3,7 @@ package server
import (
"bytes"
"fmt"
"github.com/siddontang/go/sync2"
"github.com/siddontang/ledisdb/ledis"
"io"
"time"
@ -62,7 +63,7 @@ type client struct {
syncBuf bytes.Buffer
lastLogID uint64
lastLogID sync2.AtomicUint64
// reqErr chan error

View File

@ -110,16 +110,20 @@ func syncCommand(c *client) error {
return ErrCmdParams
}
c.lastLogID = logId - 1
lastLogID := logId - 1
stat, err := c.app.ldb.ReplicationStat()
if err != nil {
return err
}
if c.lastLogID > stat.LastID {
if lastLogID > stat.LastID {
return fmt.Errorf("invalid sync logid %d > %d + 1", logId, stat.LastID)
} else if c.lastLogID == stat.LastID {
}
c.lastLogID.Set(lastLogID)
if lastLogID == stat.LastID {
c.app.slaveAck(c)
}

View File

@ -72,8 +72,8 @@ func TestExpire(t *testing.T) {
if ttl, err := ledis.Int64(c.Do(ttl, key)); err != nil {
t.Fatal(err)
} else if ttl != exp {
t.Fatal(ttl)
} else if ttl == -1 {
t.Fatal("no ttl")
}
// expireat + ttl
@ -86,8 +86,8 @@ func TestExpire(t *testing.T) {
if ttl, err := ledis.Int64(c.Do(ttl, key)); err != nil {
t.Fatal(err)
} else if ttl != 3 {
t.Fatal(ttl)
} else if ttl == -1 {
t.Fatal("no ttl")
}
kErr := "not_exist_ttl"

View File

@ -19,6 +19,7 @@ import (
var (
errConnectMaster = errors.New("connect master error")
errReplClosed = errors.New("replication is closed")
)
type master struct {
@ -47,17 +48,16 @@ func newMaster(app *App) *master {
}
func (m *master) Close() {
ledis.AsyncNotify(m.quit)
m.quit <- struct{}{}
if m.conn != nil {
//for replication, we send quit command to close gracefully
m.conn.Send("quit")
m.conn.Close()
m.conn = nil
}
m.closeConn()
m.wg.Wait()
select {
case <-m.quit:
default:
}
}
func (m *master) resetConn() error {
@ -67,7 +67,6 @@ func (m *master) resetConn() error {
if m.conn != nil {
m.conn.Close()
m.conn = nil
}
m.conn = goledis.NewConn(m.addr)
@ -75,6 +74,15 @@ func (m *master) resetConn() error {
return nil
}
func (m *master) closeConn() {
if m.conn != nil {
//for replication, we send quit command to close gracefully
m.conn.Send("quit")
m.conn.Close()
}
}
func (m *master) stopReplication() error {
m.Close()
@ -87,9 +95,7 @@ func (m *master) startReplication(masterAddr string, restart bool) error {
m.addr = masterAddr
m.quit = make(chan struct{}, 1)
m.app.cfg.Readonly = true
m.app.cfg.SetReadonly(true)
m.wg.Add(1)
go m.runReplication(restart)
@ -123,28 +129,20 @@ func (m *master) runReplication(restart bool) {
if restart {
if err := m.fullSync(); err != nil {
if m.conn != nil {
//if conn == nil, other close the replication, not error
log.Error("restart fullsync error %s", err.Error())
}
log.Error("restart fullsync error %s", err.Error())
return
}
}
for {
if err := m.sync(); err != nil {
if m.conn != nil {
//if conn == nil, other close the replication, not error
log.Error("sync error %s", err.Error())
}
return
}
select {
case <-m.quit:
return
default:
break
if err := m.sync(); err != nil {
log.Error("sync error %s", err.Error())
return
}
}
}
}
@ -266,7 +264,7 @@ func (app *App) slaveof(masterAddr string, restart bool, readonly bool) error {
//in master mode and no slaveof, only set readonly
if len(app.cfg.SlaveOf) == 0 && len(masterAddr) == 0 {
app.cfg.Readonly = readonly
app.cfg.SetReadonly(readonly)
return nil
}
@ -281,7 +279,7 @@ func (app *App) slaveof(masterAddr string, restart bool, readonly bool) error {
return err
}
app.cfg.Readonly = readonly
app.cfg.SetReadonly(readonly)
} else {
return app.m.startReplication(masterAddr, restart)
}
@ -323,7 +321,7 @@ func (app *App) removeSlave(c *client, activeQuit bool) {
delete(app.slaves, addr)
log.Info("remove slave %s", addr)
if activeQuit {
asyncNotifyUint64(app.slaveSyncAck, c.lastLogID)
asyncNotifyUint64(app.slaveSyncAck, c.lastLogID.Get())
}
}
}
@ -339,7 +337,7 @@ func (app *App) slaveAck(c *client) {
return
}
asyncNotifyUint64(app.slaveSyncAck, c.lastLogID)
asyncNotifyUint64(app.slaveSyncAck, c.lastLogID.Get())
}
func asyncNotifyUint64(ch chan uint64, v uint64) {
@ -369,11 +367,12 @@ func (app *App) publishNewLog(l *rpl.Log) {
n := 0
logId := l.ID
for _, s := range app.slaves {
if s.lastLogID == logId {
lastLogID := s.lastLogID.Get()
if lastLogID == logId {
//slave has already owned this log
n++
} else if s.lastLogID > logId {
log.Error("invalid slave %s, lastlogid %d > %d", s.slaveListeningAddr, s.lastLogID, logId)
} else if lastLogID > logId {
log.Error("invalid slave %s, lastlogid %d > %d", s.slaveListeningAddr, lastLogID, logId)
}
}