add sync commit config for store

This commit is contained in:
siddontang 2014-10-16 09:35:35 +08:00
parent 028b9fbd55
commit 522f77a7a5
11 changed files with 115 additions and 99 deletions

View File

@ -62,9 +62,9 @@ type Config struct {
DataDir string `toml:"data_dir"`
DBName string `toml:"db_name"`
DBPath string `toml:"db_path"`
DBName string `toml:"db_name"`
DBPath string `toml:"db_path"`
DBSyncCommit int `toml:"db_sync_commit"`
LevelDB LevelDBConfig `toml:"leveldb"`

View File

@ -35,6 +35,12 @@ db_name = "leveldb"
# If not set, use data_dir/"db_name"_data
db_path = ""
# Sync commit to disk if possible
# 0: no sync
# 1: sync every second
# 2: sync every commit
db_sync_commit = 0
# enable replication or not
use_replication = true

View File

@ -35,6 +35,12 @@ db_name = "leveldb"
# If not set, use data_dir/"db_name"_data
db_path = ""
# Sync commit to disk if possible
# 0: no sync
# 1: sync every second
# 2: sync every commit
db_sync_commit = 0
# enable replication or not
use_replication = true
@ -81,7 +87,7 @@ compression = true
[snapshot]
# Path to store snapshot dump file
# if not set, use data_dir/snapshot
# snapshot file name format is snap-2006-01-02T15:04:05.999999999.dmp
# snapshot file name format is dmp-2006-01-02T15:04:05.999999999
path = ""
# Reserve newest max_num snapshot dump files

View File

@ -21,8 +21,6 @@ type GoLevelDBStore struct {
first uint64
last uint64
lastCommit time.Time
}
func (s *GoLevelDBStore) FirstID() (uint64, error) {
@ -134,17 +132,7 @@ func (s *GoLevelDBStore) StoreLogs(logs []*Log) error {
w.Put(key, buf.Bytes())
}
n := time.Now()
if s.cfg.Replication.SyncLog == 2 ||
(s.cfg.Replication.SyncLog == 1 && n.Sub(s.lastCommit) > time.Second) {
err = w.SyncCommit()
} else {
err = w.Commit()
}
s.lastCommit = n
if err != nil {
if err = w.Commit(); err != nil {
return err
}
@ -268,7 +256,7 @@ func (s *GoLevelDBStore) open() error {
return err
}
func NewGoLevelDBStore(base string) (*GoLevelDBStore, error) {
func NewGoLevelDBStore(base string, syncLog int) (*GoLevelDBStore, error) {
cfg := config.NewConfigDefault()
cfg.DBName = "goleveldb"
cfg.DBPath = base
@ -276,6 +264,7 @@ func NewGoLevelDBStore(base string) (*GoLevelDBStore, error) {
cfg.LevelDB.CacheSize = 64 * 1024 * 1024
cfg.LevelDB.WriteBufferSize = 64 * 1024 * 1024
cfg.LevelDB.Compression = false
cfg.DBSyncCommit = syncLog
s := new(GoLevelDBStore)
s.cfg = cfg

View File

@ -49,7 +49,7 @@ func NewReplication(cfg *config.Config) (*Replication, error) {
r.cfg = cfg
var err error
if r.s, err = NewGoLevelDBStore(path.Join(base, "wal")); err != nil {
if r.s, err = NewGoLevelDBStore(path.Join(base, "wal"), cfg.Replication.SyncLog); err != nil {
return nil, err
}

View File

@ -16,7 +16,7 @@ func TestGoLevelDBStore(t *testing.T) {
defer os.RemoveAll(dir)
// New level
l, err := NewGoLevelDBStore(dir)
l, err := NewGoLevelDBStore(dir, 0)
if err != nil {
t.Fatalf("err: %v ", err)
}

View File

@ -1,15 +1,27 @@
package store
import (
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/store/driver"
"sync"
"time"
)
type DB struct {
driver.IDB
db driver.IDB
name string
st *Stat
cfg *config.Config
lastCommit time.Time
m sync.Mutex
}
func (db *DB) Close() error {
return db.db.Close()
}
func (db *DB) String() string {
@ -20,43 +32,46 @@ func (db *DB) NewIterator() *Iterator {
db.st.IterNum.Add(1)
it := new(Iterator)
it.it = db.IDB.NewIterator()
it.it = db.db.NewIterator()
it.st = db.st
return it
}
func (db *DB) Get(key []byte) ([]byte, error) {
v, err := db.IDB.Get(key)
v, err := db.db.Get(key)
db.st.statGet(v, err)
return v, err
}
func (db *DB) Put(key []byte, value []byte) error {
db.st.PutNum.Add(1)
return db.IDB.Put(key, value)
if db.needSyncCommit() {
return db.db.SyncPut(key, value)
} else {
return db.db.Put(key, value)
}
}
func (db *DB) Delete(key []byte) error {
db.st.DeleteNum.Add(1)
return db.IDB.Delete(key)
}
func (db *DB) SyncPut(key []byte, value []byte) error {
db.st.SyncPutNum.Add(1)
return db.IDB.SyncPut(key, value)
}
func (db *DB) SyncDelete(key []byte) error {
db.st.SyncDeleteNum.Add(1)
return db.IDB.SyncDelete(key)
if db.needSyncCommit() {
return db.db.SyncDelete(key)
} else {
return db.db.Delete(key)
}
}
func (db *DB) NewWriteBatch() *WriteBatch {
db.st.BatchNum.Add(1)
wb := new(WriteBatch)
wb.IWriteBatch = db.IDB.NewWriteBatch()
wb.wb = db.db.NewWriteBatch()
wb.st = db.st
wb.db = db
return wb
}
@ -65,7 +80,7 @@ func (db *DB) NewSnapshot() (*Snapshot, error) {
var err error
s := &Snapshot{}
if s.ISnapshot, err = db.IDB.NewSnapshot(); err != nil {
if s.ISnapshot, err = db.db.NewSnapshot(); err != nil {
return nil, err
}
s.st = db.st
@ -77,7 +92,7 @@ func (db *DB) Compact() error {
db.st.CompactNum.Add(1)
t := time.Now()
err := db.IDB.Compact()
err := db.db.Compact()
db.st.CompactTotalTime.Add(time.Now().Sub(t))
@ -107,7 +122,7 @@ func (db *DB) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, off
}
func (db *DB) Begin() (*Tx, error) {
tx, err := db.IDB.Begin()
tx, err := db.db.Begin()
if err != nil {
return nil, err
}
@ -120,3 +135,24 @@ func (db *DB) Begin() (*Tx, error) {
func (db *DB) Stat() *Stat {
return db.st
}
func (db *DB) needSyncCommit() bool {
if db.cfg.DBSyncCommit == 0 {
return false
} else if db.cfg.DBSyncCommit == 2 {
return true
} else {
n := time.Now()
need := false
db.m.Lock()
if n.Sub(db.lastCommit) > time.Second {
need = true
}
db.lastCommit = n
db.m.Unlock()
return need
}
}

View File

@ -5,25 +5,22 @@ import (
)
type Stat struct {
GetNum sync2.AtomicInt64
GetMissingNum sync2.AtomicInt64
PutNum sync2.AtomicInt64
DeleteNum sync2.AtomicInt64
SyncPutNum sync2.AtomicInt64
SyncDeleteNum sync2.AtomicInt64
IterNum sync2.AtomicInt64
IterSeekNum sync2.AtomicInt64
IterCloseNum sync2.AtomicInt64
SnapshotNum sync2.AtomicInt64
SnapshotCloseNum sync2.AtomicInt64
BatchNum sync2.AtomicInt64
BatchCommitNum sync2.AtomicInt64
BatchSyncCommitNum sync2.AtomicInt64
TxNum sync2.AtomicInt64
TxCommitNum sync2.AtomicInt64
TxCloseNum sync2.AtomicInt64
CompactNum sync2.AtomicInt64
CompactTotalTime sync2.AtomicDuration
GetNum sync2.AtomicInt64
GetMissingNum sync2.AtomicInt64
PutNum sync2.AtomicInt64
DeleteNum sync2.AtomicInt64
IterNum sync2.AtomicInt64
IterSeekNum sync2.AtomicInt64
IterCloseNum sync2.AtomicInt64
SnapshotNum sync2.AtomicInt64
SnapshotCloseNum sync2.AtomicInt64
BatchNum sync2.AtomicInt64
BatchCommitNum sync2.AtomicInt64
TxNum sync2.AtomicInt64
TxCommitNum sync2.AtomicInt64
TxCloseNum sync2.AtomicInt64
CompactNum sync2.AtomicInt64
CompactTotalTime sync2.AtomicDuration
}
func (st *Stat) statGet(v []byte, err error) {
@ -35,23 +32,4 @@ func (st *Stat) statGet(v []byte, err error) {
func (st *Stat) Reset() {
*st = Stat{}
// st.GetNum.Set(0)
// st.GetMissingNum.Set(0)
// st.PutNum.Set(0)
// st.DeleteNum.Set(0)
// st.SyncPutNum.Set(0)
// st.SyncDeleteNum.Set(0)
// st.IterNum.Set(0)
// st.IterSeekNum.Set(0)
// st.IterCloseNum.Set(0)
// st.SnapshotNum.Set(0)
// st.SnapshotCloseNum.Set(0)
// st.BatchNum.Set(0)
// st.BatchCommitNum.Set(0)
// st.BatchSyncCommitNum.Set(0)
// st.TxNum.Set(0)
// st.TxCommitNum.Set(0)
// st.TxCloseNum.Set(0)
// st.CompactNum.Set(0)
// st.CompactTotalTime.Set(0)
}

View File

@ -40,7 +40,11 @@ func Open(cfg *config.Config) (*DB, error) {
return nil, err
}
db := &DB{idb, s.String(), &Stat{}}
db := new(DB)
db.db = idb
db.name = s.String()
db.st = &Stat{}
db.cfg = cfg
return db, nil
}

View File

@ -5,13 +5,13 @@ import (
)
type Tx struct {
driver.Tx
tx driver.Tx
st *Stat
}
func (tx *Tx) NewIterator() *Iterator {
it := new(Iterator)
it.it = tx.Tx.NewIterator()
it.it = tx.tx.NewIterator()
it.st = tx.st
tx.st.IterNum.Add(1)
@ -23,7 +23,7 @@ func (tx *Tx) NewWriteBatch() *WriteBatch {
tx.st.BatchNum.Add(1)
wb := new(WriteBatch)
wb.IWriteBatch = tx.Tx.NewWriteBatch()
wb.wb = tx.tx.NewWriteBatch()
wb.st = tx.st
return wb
}
@ -51,26 +51,26 @@ func (tx *Tx) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, off
}
func (tx *Tx) Get(key []byte) ([]byte, error) {
v, err := tx.Tx.Get(key)
v, err := tx.tx.Get(key)
tx.st.statGet(v, err)
return v, err
}
func (tx *Tx) Put(key []byte, value []byte) error {
tx.st.PutNum.Add(1)
return tx.Tx.Put(key, value)
return tx.tx.Put(key, value)
}
func (tx *Tx) Delete(key []byte) error {
tx.st.DeleteNum.Add(1)
return tx.Tx.Delete(key)
return tx.tx.Delete(key)
}
func (tx *Tx) Commit() error {
tx.st.TxCommitNum.Add(1)
return tx.Tx.Commit()
return tx.tx.Commit()
}
func (tx *Tx) Rollback() error {
return tx.Tx.Rollback()
return tx.tx.Rollback()
}

View File

@ -5,20 +5,22 @@ import (
)
type WriteBatch struct {
driver.IWriteBatch
wb driver.IWriteBatch
st *Stat
putNum int64
deleteNum int64
db *DB
}
func (wb *WriteBatch) Put(key []byte, value []byte) {
wb.putNum++
wb.IWriteBatch.Put(key, value)
wb.wb.Put(key, value)
}
func (wb *WriteBatch) Delete(key []byte) {
wb.deleteNum++
wb.IWriteBatch.Delete(key)
wb.wb.Delete(key)
}
func (wb *WriteBatch) Commit() error {
@ -27,18 +29,13 @@ func (wb *WriteBatch) Commit() error {
wb.st.DeleteNum.Add(wb.deleteNum)
wb.putNum = 0
wb.deleteNum = 0
return wb.IWriteBatch.Commit()
}
func (wb *WriteBatch) SyncCommit() error {
wb.st.BatchSyncCommitNum.Add(1)
wb.st.SyncPutNum.Add(wb.putNum)
wb.st.SyncDeleteNum.Add(wb.deleteNum)
wb.putNum = 0
wb.deleteNum = 0
return wb.IWriteBatch.SyncCommit()
if wb.db == nil || !wb.db.needSyncCommit() {
return wb.wb.Commit()
} else {
return wb.wb.SyncCommit()
}
}
func (wb *WriteBatch) Rollback() error {
return wb.IWriteBatch.Rollback()
return wb.wb.Rollback()
}