ledisdb/rpl/rpl.go

337 lines
5.6 KiB
Go
Raw Normal View History

2014-09-22 13:50:51 +04:00
package rpl
import (
"encoding/binary"
"os"
"path"
"sync"
"time"
2015-05-04 17:42:28 +03:00
"github.com/siddontang/go/log"
"github.com/siddontang/go/snappy"
"github.com/siddontang/ledisdb/config"
2014-09-22 13:50:51 +04:00
)
// Stat is a snapshot of the replication log positions, as produced by
// (*Replication).Stat.
type Stat struct {
	// FirstID and LastID are the first and last log IDs reported by the
	// underlying log store.
	FirstID, LastID uint64
	// CommitID is the replication's in-memory commit ID at snapshot time.
	CommitID uint64
}
2014-09-22 13:50:51 +04:00
type Replication struct {
m sync.Mutex
cfg *config.Config
s LogStore
2014-11-11 10:20:26 +03:00
commitID uint64
commitLog *os.File
2014-09-22 13:50:51 +04:00
quit chan struct{}
wg sync.WaitGroup
nc chan struct{}
2014-11-01 18:28:28 +03:00
ncm sync.Mutex
2014-09-22 13:50:51 +04:00
}
func NewReplication(cfg *config.Config) (*Replication, error) {
if len(cfg.Replication.Path) == 0 {
cfg.Replication.Path = path.Join(cfg.DataDir, "rpl")
}
base := cfg.Replication.Path
r := new(Replication)
r.quit = make(chan struct{})
r.nc = make(chan struct{})
2014-09-22 13:50:51 +04:00
r.cfg = cfg
var err error
2014-11-07 11:35:54 +03:00
switch cfg.Replication.StoreName {
case "goleveldb":
if r.s, err = NewGoLevelDBStore(path.Join(base, "wal"), cfg.Replication.SyncLog); err != nil {
return nil, err
}
default:
2014-11-15 16:20:12 +03:00
if r.s, err = NewFileStore(path.Join(base, "ldb"), cfg); err != nil {
2014-11-07 11:35:54 +03:00
return nil, err
}
2014-09-22 13:50:51 +04:00
}
if r.commitLog, err = os.OpenFile(path.Join(base, "commit.log"), os.O_RDWR|os.O_CREATE, 0644); err != nil {
return nil, err
}
if s, _ := r.commitLog.Stat(); s.Size() == 0 {
r.commitID = 0
} else if err = binary.Read(r.commitLog, binary.BigEndian, &r.commitID); err != nil {
return nil, err
}
2017-03-18 09:17:37 +03:00
log.Infof("staring replication with commit ID %d", r.commitID)
2014-11-01 18:28:28 +03:00
r.wg.Add(1)
2014-11-11 10:20:26 +03:00
go r.run()
2014-09-22 13:50:51 +04:00
return r, nil
}
func (r *Replication) Close() error {
close(r.quit)
r.wg.Wait()
2014-11-01 18:28:28 +03:00
r.m.Lock()
defer r.m.Unlock()
2017-03-18 09:17:37 +03:00
log.Infof("closing replication with commit ID %d", r.commitID)
2014-09-22 13:50:51 +04:00
if r.s != nil {
r.s.Close()
r.s = nil
}
2014-11-10 09:13:29 +03:00
if err := r.updateCommitID(r.commitID, true); err != nil {
2015-01-12 05:32:03 +03:00
log.Errorf("update commit id err %s", err.Error())
2014-11-10 09:13:29 +03:00
}
2014-09-22 13:50:51 +04:00
if r.commitLog != nil {
r.commitLog.Close()
r.commitLog = nil
}
return nil
}
func (r *Replication) Log(data []byte) (*Log, error) {
2014-09-27 06:08:45 +04:00
if r.cfg.Replication.Compression {
//todo optimize
var err error
if data, err = snappy.Encode(nil, data); err != nil {
return nil, err
}
}
2014-09-22 13:50:51 +04:00
r.m.Lock()
lastID, err := r.s.LastID()
if err != nil {
r.m.Unlock()
2014-09-22 13:50:51 +04:00
return nil, err
}
commitId := r.commitID
if lastID < commitId {
lastID = commitId
2014-11-18 12:50:22 +03:00
} else if lastID > commitId {
r.m.Unlock()
2014-11-18 12:50:22 +03:00
return nil, ErrCommitIDBehind
2014-09-22 13:50:51 +04:00
}
l := new(Log)
l.ID = lastID + 1
l.CreateTime = uint32(time.Now().Unix())
2014-09-27 06:08:45 +04:00
if r.cfg.Replication.Compression {
l.Compression = 1
} else {
l.Compression = 0
}
2014-09-22 13:50:51 +04:00
l.Data = data
if err = r.s.StoreLog(l); err != nil {
r.m.Unlock()
2014-09-22 13:50:51 +04:00
return nil, err
}
r.m.Unlock()
2014-11-01 18:28:28 +03:00
r.ncm.Lock()
close(r.nc)
r.nc = make(chan struct{})
2014-11-01 18:28:28 +03:00
r.ncm.Unlock()
2014-09-22 13:50:51 +04:00
return l, nil
}
func (r *Replication) WaitLog() <-chan struct{} {
2014-11-01 18:28:28 +03:00
r.ncm.Lock()
ch := r.nc
r.ncm.Unlock()
return ch
}
2014-09-22 13:50:51 +04:00
func (r *Replication) StoreLog(log *Log) error {
r.m.Lock()
err := r.s.StoreLog(log)
r.m.Unlock()
2014-09-22 13:50:51 +04:00
return err
2014-09-22 13:50:51 +04:00
}
func (r *Replication) FirstLogID() (uint64, error) {
r.m.Lock()
id, err := r.s.FirstID()
r.m.Unlock()
2014-09-22 13:50:51 +04:00
return id, err
}
func (r *Replication) LastLogID() (uint64, error) {
r.m.Lock()
id, err := r.s.LastID()
r.m.Unlock()
2014-09-22 13:50:51 +04:00
return id, err
}
// LastCommitID returns the in-memory commit ID. The error result is always
// nil.
func (r *Replication) LastCommitID() (uint64, error) {
	r.m.Lock()
	defer r.m.Unlock()

	return r.commitID, nil
}
func (r *Replication) UpdateCommitID(id uint64) error {
r.m.Lock()
err := r.updateCommitID(id, r.cfg.Replication.SyncLog == 2)
r.m.Unlock()
2014-09-22 13:50:51 +04:00
return err
}
// Stat returns the store's first and last log IDs together with the current
// commit ID, all read under a single acquisition of r.m. If either store
// lookup fails, the result is nil and that error is returned.
func (r *Replication) Stat() (*Stat, error) {
	r.m.Lock()
	defer r.m.Unlock()

	first, err := r.s.FirstID()
	if err != nil {
		return nil, err
	}

	last, err := r.s.LastID()
	if err != nil {
		return nil, err
	}

	return &Stat{
		FirstID:  first,
		LastID:   last,
		CommitID: r.commitID,
	}, nil
}
2014-11-10 09:13:29 +03:00
func (r *Replication) updateCommitID(id uint64, force bool) error {
2014-11-11 10:20:26 +03:00
if force {
2014-11-10 09:13:29 +03:00
if _, err := r.commitLog.Seek(0, os.SEEK_SET); err != nil {
return err
}
if err := binary.Write(r.commitLog, binary.BigEndian, id); err != nil {
return err
}
2014-09-22 13:50:51 +04:00
}
r.commitID = id
return nil
}
func (r *Replication) CommitIDBehind() (bool, error) {
r.m.Lock()
id, err := r.s.LastID()
if err != nil {
r.m.Unlock()
2014-09-22 13:50:51 +04:00
return false, err
}
behind := id > r.commitID
r.m.Unlock()
return behind, nil
2014-09-22 13:50:51 +04:00
}
// GetLog asks the log store for the entry with the given id, filling log, and
// returns the store's error.
//
// Unlike most other methods on Replication it does not acquire r.m.
// NOTE(review): presumably the store is safe for this unlocked read — confirm
// against the LogStore implementations.
func (r *Replication) GetLog(id uint64, log *Log) error {
return r.s.GetLog(id, log)
}
func (r *Replication) NextNeedCommitLog(log *Log) error {
2014-09-22 13:50:51 +04:00
r.m.Lock()
defer r.m.Unlock()
id, err := r.s.LastID()
if err != nil {
return err
}
if id <= r.commitID {
return ErrNoBehindLog
}
return r.s.GetLog(r.commitID+1, log)
}
// Clear is shorthand for ClearWithCommitID(0): it clears the log store and
// resets the commit ID to 0.
func (r *Replication) Clear() error {
return r.ClearWithCommitID(0)
}
func (r *Replication) ClearWithCommitID(id uint64) error {
r.m.Lock()
defer r.m.Unlock()
if err := r.s.Clear(); err != nil {
return err
}
2014-11-10 09:13:29 +03:00
return r.updateCommitID(id, true)
}
2014-11-11 10:20:26 +03:00
func (r *Replication) run() {
2014-09-22 13:50:51 +04:00
defer r.wg.Done()
2014-11-11 10:20:26 +03:00
syncTc := time.NewTicker(1 * time.Second)
purgeTc := time.NewTicker(1 * time.Hour)
2014-09-22 13:50:51 +04:00
for {
select {
2014-11-11 10:20:26 +03:00
case <-purgeTc.C:
2014-09-22 13:50:51 +04:00
n := (r.cfg.Replication.ExpiredLogDays * 24 * 3600)
r.m.Lock()
2014-11-11 10:20:26 +03:00
err := r.s.PurgeExpired(int64(n))
r.m.Unlock()
if err != nil {
2015-01-12 05:32:03 +03:00
log.Errorf("purge expired log error %s", err.Error())
2014-09-22 13:50:51 +04:00
}
2014-11-11 10:20:26 +03:00
case <-syncTc.C:
if r.cfg.Replication.SyncLog == 1 {
r.m.Lock()
err := r.s.Sync()
r.m.Unlock()
if err != nil {
2015-01-12 05:32:03 +03:00
log.Errorf("sync store error %s", err.Error())
2014-11-11 10:20:26 +03:00
}
}
if r.cfg.Replication.SyncLog != 2 {
//we will sync commit id every 1 second
r.m.Lock()
err := r.updateCommitID(r.commitID, true)
r.m.Unlock()
if err != nil {
2015-01-12 05:32:03 +03:00
log.Errorf("sync commitid error %s", err.Error())
2014-11-11 10:20:26 +03:00
}
}
2014-09-22 13:50:51 +04:00
case <-r.quit:
2014-11-11 10:20:26 +03:00
syncTc.Stop()
purgeTc.Stop()
2014-09-22 13:50:51 +04:00
return
}
}
}