diff --git a/README.md b/README.md index 5a8436b..9e4ee0a 100644 --- a/README.md +++ b/README.md @@ -167,18 +167,9 @@ See [Clients](https://github.com/siddontang/ledisdb/wiki/Clients) to find or con + `pcall` and `xpcall` are not supported in lua, you can see the readme in [golua](https://github.com/aarzilli/golua). -## Thanks - -Gmail: cenqichao@gmail.com - -Gmail: chendahui007@gmail.com - -Gmail: cppgohan@gmail.com - -Gmail: tiaotiaoyly@gmail.com - -Gmail: wyk4true@gmail.com +## Requirement ++ go version >= 1.3 ## Feedback diff --git a/config/config.toml b/config/config.toml index 4e04cb1..98da90e 100644 --- a/config/config.toml +++ b/config/config.toml @@ -111,7 +111,7 @@ path = "" # If sync is true, the new log must be sent to some slaves, and then commit. # It will reduce performance but have better high availability. -sync = true +sync = false # If sync is true, wait at last wait_sync_time milliseconds for slave syncing this log wait_sync_time = 500 diff --git a/etc/ledis.conf b/etc/ledis.conf index 4e04cb1..98da90e 100644 --- a/etc/ledis.conf +++ b/etc/ledis.conf @@ -111,7 +111,7 @@ path = "" # If sync is true, the new log must be sent to some slaves, and then commit. # It will reduce performance but have better high availability. -sync = true +sync = false # If sync is true, wait at last wait_sync_time milliseconds for slave syncing this log wait_sync_time = 500 diff --git a/ledis/batch.go b/ledis/batch.go index 171a3ea..c9064df 100644 --- a/ledis/batch.go +++ b/ledis/batch.go @@ -100,12 +100,13 @@ type commitDataGetter interface { func (l *Ledis) handleCommit(g commitDataGetter, c commiter) error { l.commitLock.Lock() - defer l.commitLock.Unlock() var err error if l.r != nil { var rl *rpl.Log if rl, err = l.r.Log(g.Data()); err != nil { + l.commitLock.Unlock() + log.Fatal("write wal error %s", err.Error()) return err } @@ -113,19 +114,25 @@ func (l *Ledis) handleCommit(g commitDataGetter, c commiter) error { l.propagate(rl) if err = c.Commit(); err != nil { + l.commitLock.Unlock() + log.Fatal("commit error %s", err.Error()) l.noticeReplication() return err } if err = l.r.UpdateCommitID(rl.ID); err != nil { + l.commitLock.Unlock() + log.Fatal("update commit id error %s", err.Error()) l.noticeReplication() return err } - - return nil } else { - return c.Commit() + err = c.Commit() } + + l.commitLock.Unlock() + + return err } diff --git a/rpl/file_store.go b/rpl/file_store.go index 823b1cd..13d86c8 100644 --- a/rpl/file_store.go +++ b/rpl/file_store.go @@ -163,8 +163,12 @@ func (s *FileStore) LastID() (uint64, error) { func (s *FileStore) StoreLog(l *Log) error { s.wm.Lock() - defer s.wm.Unlock() + err := s.storeLog(l) + s.wm.Unlock() + return err +} +func (s *FileStore) storeLog(l *Log) error { err := s.w.StoreLog(l) if err == nil { return nil @@ -172,23 +176,24 @@ func (s *FileStore) StoreLog(l *Log) error { return err } - s.rm.Lock() - var r *tableReader - if r, err = s.w.Flush(); err != nil { + r, err = s.w.Flush() + + if err != nil { log.Error("write table flush error %s, can not store now", err.Error()) s.w.Close() - s.rm.Unlock() - return err } + s.rm.Lock() s.rs = append(s.rs, r) s.rm.Unlock() - return s.w.StoreLog(l) + err = s.w.StoreLog(l) + + return err } func (s *FileStore) PurgeExpired(n int64) error { diff --git a/rpl/file_table.go b/rpl/file_table.go index 814475e..7b37cd8 100644 --- a/rpl/file_table.go +++ b/rpl/file_table.go @@ -546,13 +546,18 @@ func (t *tableWriter) Flush() (*tableReader, error) { } func (t *tableWriter) StoreLog(l *Log) error { + t.Lock() + err := t.storeLog(l) + t.Unlock() + + return err +} + +func (t *tableWriter) storeLog(l *Log) error { if l.ID == 0 { return ErrStoreLogID } - t.Lock() - defer t.Unlock() - if t.closed { return fmt.Errorf("table writer is closed") } @@ -588,9 +593,11 @@ func (t *tableWriter) StoreLog(l *Log) error { offsetPos := t.offsetPos - if err := l.Encode(t.wb); err != nil { + if err = l.Encode(t.wb); err != nil { return err - } else if err = t.wb.Flush(); err != nil { + } + + if err = t.wb.Flush(); err != nil { return err } @@ -652,12 +659,14 @@ func (t *tableWriter) GetLog(id uint64, l *Log) error { func (t *tableWriter) Sync() error { t.Lock() - defer t.Unlock() + var err error if t.wf != nil { - return t.wf.Sync() + err = t.wf.Sync() } - return nil + t.Unlock() + + return err } func (t *tableWriter) getLog(l *Log, pos int64) error { diff --git a/rpl/log.go b/rpl/log.go index 8500fbf..0b10fd4 100644 --- a/rpl/log.go +++ b/rpl/log.go @@ -4,8 +4,11 @@ import ( "bytes" "encoding/binary" "io" + "sync" ) +const LogHeadSize = 17 + type Log struct { ID uint64 CreateTime uint32 @@ -15,7 +18,7 @@ type Log struct { } func (l *Log) HeadSize() int { - return 17 + return LogHeadSize } func (l *Log) Size() int { @@ -23,7 +26,7 @@ func (l *Log) Size() int { } func (l *Log) Marshal() ([]byte, error) { - buf := bytes.NewBuffer(make([]byte, l.HeadSize()+len(l.Data))) + buf := bytes.NewBuffer(make([]byte, l.Size())) buf.Reset() if err := l.Encode(buf); err != nil { @@ -39,25 +42,32 @@ func (l *Log) Unmarshal(b []byte) error { return l.Decode(buf) } +var headPool = sync.Pool{ + New: func() interface{} { return make([]byte, LogHeadSize) }, +} + func (l *Log) Encode(w io.Writer) error { - if err := binary.Write(w, binary.BigEndian, l.ID); err != nil { + b := headPool.Get().([]byte) + pos := 0 + + binary.BigEndian.PutUint64(b[pos:], l.ID) + pos += 8 + binary.BigEndian.PutUint32(b[pos:], uint32(l.CreateTime)) + pos += 4 + b[pos] = l.Compression + pos++ + binary.BigEndian.PutUint32(b[pos:], uint32(len(l.Data))) + + n, err := w.Write(b) + headPool.Put(b) + + if err != nil { return err + } else if n != LogHeadSize { + return io.ErrShortWrite } - if err := binary.Write(w, binary.BigEndian, l.CreateTime); err != nil { - return err - } - - if _, err := w.Write([]byte{l.Compression}); err != nil { - return err - } - - dataLen := uint32(len(l.Data)) - if err := binary.Write(w, binary.BigEndian, dataLen); err != nil { - return err - } - - if n, err := w.Write(l.Data); err != nil { + if n, err = w.Write(l.Data); err != nil { return err } else if n != len(l.Data) { return io.ErrShortWrite @@ -86,9 +96,10 @@ func (l *Log) Decode(r io.Reader) error { } func (l *Log) DecodeHead(r io.Reader) (uint32, error) { - buf := make([]byte, l.HeadSize()) + buf := headPool.Get().([]byte) if _, err := io.ReadFull(r, buf); err != nil { + headPool.Put(buf) return 0, err } @@ -104,5 +115,7 @@ func (l *Log) DecodeHead(r io.Reader) (uint32, error) { length := binary.BigEndian.Uint32(buf[pos:]) + headPool.Put(buf) + return length, nil } diff --git a/rpl/rpl.go b/rpl/rpl.go index 1e15b6f..d232992 100644 --- a/rpl/rpl.go +++ b/rpl/rpl.go @@ -114,10 +114,10 @@ func (r *Replication) Log(data []byte) (*Log, error) { } r.m.Lock() - defer r.m.Unlock() lastID, err := r.s.LastID() if err != nil { + r.m.Unlock() return nil, err } @@ -125,6 +125,7 @@ func (r *Replication) Log(data []byte) (*Log, error) { if lastID < commitId { lastID = commitId } else if lastID > commitId { + r.m.Unlock() return nil, ErrCommitIDBehind } @@ -141,9 +142,12 @@ func (r *Replication) Log(data []byte) (*Log, error) { l.Data = data if err = r.s.StoreLog(l); err != nil { + r.m.Unlock() return nil, err } + r.m.Unlock() + r.ncm.Lock() close(r.nc) r.nc = make(chan struct{}) @@ -161,22 +165,24 @@ func (r *Replication) WaitLog() <-chan struct{} { func (r *Replication) StoreLog(log *Log) error { r.m.Lock() - defer r.m.Unlock() + err := r.s.StoreLog(log) + r.m.Unlock() - return r.s.StoreLog(log) + return err } func (r *Replication) FirstLogID() (uint64, error) { r.m.Lock() - defer r.m.Unlock() id, err := r.s.FirstID() + r.m.Unlock() + return id, err } func (r *Replication) LastLogID() (uint64, error) { r.m.Lock() - defer r.m.Unlock() id, err := r.s.LastID() + r.m.Unlock() return id, err } @@ -189,9 +195,10 @@ func (r *Replication) LastCommitID() (uint64, error) { func (r *Replication) UpdateCommitID(id uint64) error { r.m.Lock() - defer r.m.Unlock() + err := r.updateCommitID(id, r.cfg.Replication.SyncLog == 2) + r.m.Unlock() - return r.updateCommitID(id, r.cfg.Replication.SyncLog == 2) + return err } func (r *Replication) Stat() (*Stat, error) { @@ -231,14 +238,17 @@ func (r *Replication) updateCommitID(id uint64, force bool) error { func (r *Replication) CommitIDBehind() (bool, error) { r.m.Lock() - defer r.m.Unlock() id, err := r.s.LastID() if err != nil { + r.m.Unlock() return false, err } - return id > r.commitID, nil + behind := id > r.commitID + r.m.Unlock() + + return behind, nil } func (r *Replication) GetLog(id uint64, log *Log) error {