ledisdb/ledis/ledis.go

226 lines
3.8 KiB
Go
Raw Normal View History

2014-05-20 04:41:24 +04:00
package ledis
import (
"fmt"
"github.com/siddontang/go/filelock"
2014-09-24 05:46:36 +04:00
"github.com/siddontang/go/log"
"github.com/siddontang/ledisdb/config"
2014-09-22 13:50:51 +04:00
"github.com/siddontang/ledisdb/rpl"
2014-07-25 13:58:00 +04:00
"github.com/siddontang/ledisdb/store"
"io"
"os"
"path"
2014-05-29 11:07:14 +04:00
"sync"
"time"
2014-05-20 04:41:24 +04:00
)
type Ledis struct {
cfg *config.Config
2014-05-20 04:41:24 +04:00
2014-07-25 13:58:00 +04:00
ldb *store.DB
2015-03-04 04:15:28 +03:00
dbs []*DB
2014-05-27 12:05:24 +04:00
2014-06-05 11:59:10 +04:00
quit chan struct{}
2014-09-22 13:50:51 +04:00
wg sync.WaitGroup
2014-08-25 10:18:23 +04:00
//for replication
2014-09-22 13:50:51 +04:00
r *rpl.Replication
rc chan struct{}
2014-10-15 06:18:20 +04:00
rbatch *store.WriteBatch
2014-09-22 13:50:51 +04:00
rwg sync.WaitGroup
rhs []NewLogEventHandler
2014-08-30 13:39:44 +04:00
wLock sync.RWMutex //allow one write at same time
commitLock sync.Mutex //allow one write commit at same time
2014-09-17 13:54:04 +04:00
lock io.Closer
2015-03-04 04:15:28 +03:00
tcs []*ttlChecker
2014-05-20 04:41:24 +04:00
}
func Open(cfg *config.Config) (*Ledis, error) {
if len(cfg.DataDir) == 0 {
cfg.DataDir = config.DefaultDataDir
}
2015-03-04 04:15:28 +03:00
if cfg.Databases == 0 {
cfg.Databases = 16
2015-03-04 12:46:40 +03:00
} else if cfg.Databases > 256 {
cfg.Databases = 256
2015-03-04 04:15:28 +03:00
}
os.MkdirAll(cfg.DataDir, 0755)
var err error
2014-05-20 04:41:24 +04:00
l := new(Ledis)
2014-10-10 05:49:16 +04:00
l.cfg = cfg
2014-06-05 11:59:10 +04:00
if l.lock, err = filelock.Lock(path.Join(cfg.DataDir, "LOCK")); err != nil {
return nil, err
}
2014-06-05 11:59:10 +04:00
l.quit = make(chan struct{})
if l.ldb, err = store.Open(cfg); err != nil {
return nil, err
}
2014-05-20 04:41:24 +04:00
if cfg.UseReplication {
2014-09-22 13:50:51 +04:00
if l.r, err = rpl.NewReplication(cfg); err != nil {
2014-05-27 12:05:24 +04:00
return nil, err
}
2014-09-22 13:50:51 +04:00
2014-09-29 13:01:58 +04:00
l.rc = make(chan struct{}, 1)
2014-09-22 13:50:51 +04:00
l.rbatch = l.ldb.NewWriteBatch()
2014-09-27 16:13:13 +04:00
l.wg.Add(1)
2014-09-22 13:50:51 +04:00
go l.onReplication()
//first we must try wait all replication ok
//maybe some logs are not committed
l.WaitReplication()
2014-05-27 12:05:24 +04:00
} else {
2014-09-22 13:50:51 +04:00
l.r = nil
2014-05-27 12:05:24 +04:00
}
2015-03-04 04:15:28 +03:00
l.dbs = make([]*DB, cfg.Databases)
2015-03-04 12:46:40 +03:00
for i := 0; i < cfg.Databases; i++ {
l.dbs[i] = l.newDB(uint8(i))
2014-05-20 04:41:24 +04:00
}
2014-11-01 18:28:28 +03:00
l.checkTTL()
2014-05-20 04:41:24 +04:00
return l, nil
}
func (l *Ledis) Close() {
2014-06-05 11:59:10 +04:00
close(l.quit)
2014-09-22 13:50:51 +04:00
l.wg.Wait()
2014-06-05 11:59:10 +04:00
2014-05-20 04:41:24 +04:00
l.ldb.Close()
2014-06-05 11:59:10 +04:00
2014-09-22 13:50:51 +04:00
if l.r != nil {
l.r.Close()
2014-11-01 18:28:28 +03:00
//l.r = nil
2014-06-05 11:59:10 +04:00
}
if l.lock != nil {
l.lock.Close()
2014-11-01 18:28:28 +03:00
//l.lock = nil
}
2014-05-20 04:41:24 +04:00
}
func (l *Ledis) Select(index int) (*DB, error) {
2015-03-04 04:15:28 +03:00
if index < 0 || index >= len(l.dbs) {
2015-03-04 12:46:40 +03:00
return nil, fmt.Errorf("invalid db index %d, must in [0, %d]", index, len(l.dbs)-1)
2014-05-20 04:41:24 +04:00
}
return l.dbs[index], nil
}
2014-06-08 12:43:59 +04:00
// Flush All will clear all data and replication logs
2014-06-08 12:43:59 +04:00
func (l *Ledis) FlushAll() error {
l.wLock.Lock()
defer l.wLock.Unlock()
return l.flushAll()
}
func (l *Ledis) flushAll() error {
it := l.ldb.NewIterator()
defer it.Close()
2015-03-13 06:18:47 +03:00
it.SeekToFirst()
w := l.ldb.NewWriteBatch()
defer w.Rollback()
n := 0
for ; it.Valid(); it.Next() {
n++
if n == 10000 {
if err := w.Commit(); err != nil {
2015-01-12 05:32:03 +03:00
log.Fatalf("flush all commit error: %s", err.Error())
return err
}
n = 0
}
w.Delete(it.RawKey())
}
if err := w.Commit(); err != nil {
2015-01-12 05:32:03 +03:00
log.Fatalf("flush all commit error: %s", err.Error())
return err
}
if l.r != nil {
if err := l.r.Clear(); err != nil {
2015-01-12 05:32:03 +03:00
log.Fatalf("flush all replication clear error: %s", err.Error())
return err
2014-06-08 12:43:59 +04:00
}
}
return nil
}
2014-06-10 06:41:50 +04:00
2014-09-22 13:50:51 +04:00
func (l *Ledis) IsReadOnly() bool {
2014-11-01 18:28:28 +03:00
if l.cfg.GetReadonly() {
2014-09-22 13:50:51 +04:00
return true
} else if l.r != nil {
if b, _ := l.r.CommitIDBehind(); b {
return true
}
}
return false
}
func (l *Ledis) checkTTL() {
2015-03-04 04:15:28 +03:00
l.tcs = make([]*ttlChecker, len(l.dbs))
for i, db := range l.dbs {
c := newTTLChecker(db)
c.register(KVType, db.kvBatch, db.delete)
c.register(ListType, db.listBatch, db.lDelete)
c.register(HashType, db.hashBatch, db.hDelete)
c.register(ZSetType, db.zsetBatch, db.zDelete)
2015-03-03 04:21:00 +03:00
// c.register(BitType, db.binBatch, db.bDelete)
c.register(SetType, db.setBatch, db.sDelete)
l.tcs[i] = c
}
if l.cfg.TTLCheckInterval == 0 {
l.cfg.TTLCheckInterval = 1
}
2014-09-22 13:50:51 +04:00
2014-11-01 18:28:28 +03:00
l.wg.Add(1)
go func() {
defer l.wg.Done()
tick := time.NewTicker(time.Duration(l.cfg.TTLCheckInterval) * time.Second)
defer tick.Stop()
for {
select {
case <-tick.C:
if l.IsReadOnly() {
break
}
for _, c := range l.tcs {
c.check()
}
case <-l.quit:
return
}
}
2014-11-01 18:28:28 +03:00
}()
}
2014-10-15 06:41:43 +04:00
func (l *Ledis) StoreStat() *store.Stat {
return l.ldb.Stat()
}