diff --git a/config/config.go b/config/config.go index aa74c9d..aa3c1cf 100644 --- a/config/config.go +++ b/config/config.go @@ -62,9 +62,9 @@ type Config struct { DataDir string `toml:"data_dir"` - DBName string `toml:"db_name"` - - DBPath string `toml:"db_path"` + DBName string `toml:"db_name"` + DBPath string `toml:"db_path"` + DBSyncCommit int `toml:"db_sync_commit"` LevelDB LevelDBConfig `toml:"leveldb"` diff --git a/config/config.toml b/config/config.toml index 8a8cfba..690db30 100644 --- a/config/config.toml +++ b/config/config.toml @@ -35,6 +35,12 @@ db_name = "leveldb" # If not set, use data_dir/"db_name"_data db_path = "" +# Sync commit to disk if possible +# 0: no sync +# 1: sync every second +# 2: sync every commit +db_sync_commit = 0 + # enable replication or not use_replication = true diff --git a/etc/ledis.conf b/etc/ledis.conf index 27de6c5..690db30 100644 --- a/etc/ledis.conf +++ b/etc/ledis.conf @@ -35,6 +35,12 @@ db_name = "leveldb" # If not set, use data_dir/"db_name"_data db_path = "" +# Sync commit to disk if possible +# 0: no sync +# 1: sync every second +# 2: sync every commit +db_sync_commit = 0 + # enable replication or not use_replication = true @@ -81,7 +87,7 @@ compression = true [snapshot] # Path to store snapshot dump file # if not set, use data_dir/snapshot -# snapshot file name format is snap-2006-01-02T15:04:05.999999999.dmp +# snapshot file name format is dmp-2006-01-02T15:04:05.999999999 path = "" # Reserve newest max_num snapshot dump files diff --git a/rpl/goleveldb_store.go b/rpl/goleveldb_store.go index c00f36c..39bf63a 100644 --- a/rpl/goleveldb_store.go +++ b/rpl/goleveldb_store.go @@ -21,8 +21,6 @@ type GoLevelDBStore struct { first uint64 last uint64 - - lastCommit time.Time } func (s *GoLevelDBStore) FirstID() (uint64, error) { @@ -134,17 +132,7 @@ func (s *GoLevelDBStore) StoreLogs(logs []*Log) error { w.Put(key, buf.Bytes()) } - n := time.Now() - if s.cfg.Replication.SyncLog == 2 || - (s.cfg.Replication.SyncLog == 1 && n.Sub(s.lastCommit) > time.Second) { - err = w.SyncCommit() - } else { - err = w.Commit() - } - - s.lastCommit = n - - if err != nil { + if err = w.Commit(); err != nil { return err } @@ -268,7 +256,7 @@ func (s *GoLevelDBStore) open() error { return err } -func NewGoLevelDBStore(base string) (*GoLevelDBStore, error) { +func NewGoLevelDBStore(base string, syncLog int) (*GoLevelDBStore, error) { cfg := config.NewConfigDefault() cfg.DBName = "goleveldb" cfg.DBPath = base @@ -276,6 +264,7 @@ func NewGoLevelDBStore(base string) (*GoLevelDBStore, error) { cfg.LevelDB.CacheSize = 64 * 1024 * 1024 cfg.LevelDB.WriteBufferSize = 64 * 1024 * 1024 cfg.LevelDB.Compression = false + cfg.DBSyncCommit = syncLog s := new(GoLevelDBStore) s.cfg = cfg diff --git a/rpl/rpl.go b/rpl/rpl.go index 3eaad9a..d862132 100644 --- a/rpl/rpl.go +++ b/rpl/rpl.go @@ -49,7 +49,7 @@ func NewReplication(cfg *config.Config) (*Replication, error) { r.cfg = cfg var err error - if r.s, err = NewGoLevelDBStore(path.Join(base, "wal")); err != nil { + if r.s, err = NewGoLevelDBStore(path.Join(base, "wal"), cfg.Replication.SyncLog); err != nil { return nil, err } diff --git a/rpl/store_test.go b/rpl/store_test.go index ddb43f0..0dda1ce 100644 --- a/rpl/store_test.go +++ b/rpl/store_test.go @@ -16,7 +16,7 @@ func TestGoLevelDBStore(t *testing.T) { defer os.RemoveAll(dir) // New level - l, err := NewGoLevelDBStore(dir) + l, err := NewGoLevelDBStore(dir, 0) if err != nil { t.Fatalf("err: %v ", err) } diff --git a/store/db.go b/store/db.go index c238016..b2d116b 100644 --- a/store/db.go +++ b/store/db.go @@ -1,15 +1,27 @@ package store import ( + "github.com/siddontang/ledisdb/config" "github.com/siddontang/ledisdb/store/driver" + "sync" "time" ) type DB struct { - driver.IDB + db driver.IDB name string st *Stat + + cfg *config.Config + + lastCommit time.Time + + m sync.Mutex +} + +func (db *DB) Close() error { + return db.db.Close() } func (db *DB) String() string { @@ -20,43 +32,46 @@ func (db *DB) NewIterator() *Iterator { db.st.IterNum.Add(1) it := new(Iterator) - it.it = db.IDB.NewIterator() + it.it = db.db.NewIterator() it.st = db.st return it } func (db *DB) Get(key []byte) ([]byte, error) { - v, err := db.IDB.Get(key) + v, err := db.db.Get(key) db.st.statGet(v, err) return v, err } func (db *DB) Put(key []byte, value []byte) error { db.st.PutNum.Add(1) - return db.IDB.Put(key, value) + + if db.needSyncCommit() { + return db.db.SyncPut(key, value) + + } else { + return db.db.Put(key, value) + + } } func (db *DB) Delete(key []byte) error { db.st.DeleteNum.Add(1) - return db.IDB.Delete(key) -} -func (db *DB) SyncPut(key []byte, value []byte) error { - db.st.SyncPutNum.Add(1) - return db.IDB.SyncPut(key, value) -} - -func (db *DB) SyncDelete(key []byte) error { - db.st.SyncDeleteNum.Add(1) - return db.IDB.SyncDelete(key) + if db.needSyncCommit() { + return db.db.SyncDelete(key) + } else { + return db.db.Delete(key) + } } func (db *DB) NewWriteBatch() *WriteBatch { db.st.BatchNum.Add(1) wb := new(WriteBatch) - wb.IWriteBatch = db.IDB.NewWriteBatch() + wb.wb = db.db.NewWriteBatch() wb.st = db.st + wb.db = db return wb } @@ -65,7 +80,7 @@ func (db *DB) NewSnapshot() (*Snapshot, error) { var err error s := &Snapshot{} - if s.ISnapshot, err = db.IDB.NewSnapshot(); err != nil { + if s.ISnapshot, err = db.db.NewSnapshot(); err != nil { return nil, err } s.st = db.st @@ -77,7 +92,7 @@ func (db *DB) Compact() error { db.st.CompactNum.Add(1) t := time.Now() - err := db.IDB.Compact() + err := db.db.Compact() db.st.CompactTotalTime.Add(time.Now().Sub(t)) @@ -107,7 +122,7 @@ func (db *DB) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, off } func (db *DB) Begin() (*Tx, error) { - tx, err := db.IDB.Begin() + tx, err := db.db.Begin() if err != nil { return nil, err } @@ -120,3 +135,24 @@ func (db *DB) Begin() (*Tx, error) { func (db *DB) Stat() *Stat { return db.st } + +func (db *DB) needSyncCommit() bool { + if db.cfg.DBSyncCommit == 0 { + return false + } else if db.cfg.DBSyncCommit == 2 { + return true + } else { + n := time.Now() + need := false + db.m.Lock() + + if n.Sub(db.lastCommit) > time.Second { + need = true + } + db.lastCommit = n + + db.m.Unlock() + return need + } + +} diff --git a/store/stat.go b/store/stat.go index 75436fe..0b535d0 100644 --- a/store/stat.go +++ b/store/stat.go @@ -5,25 +5,22 @@ import ( ) type Stat struct { - GetNum sync2.AtomicInt64 - GetMissingNum sync2.AtomicInt64 - PutNum sync2.AtomicInt64 - DeleteNum sync2.AtomicInt64 - SyncPutNum sync2.AtomicInt64 - SyncDeleteNum sync2.AtomicInt64 - IterNum sync2.AtomicInt64 - IterSeekNum sync2.AtomicInt64 - IterCloseNum sync2.AtomicInt64 - SnapshotNum sync2.AtomicInt64 - SnapshotCloseNum sync2.AtomicInt64 - BatchNum sync2.AtomicInt64 - BatchCommitNum sync2.AtomicInt64 - BatchSyncCommitNum sync2.AtomicInt64 - TxNum sync2.AtomicInt64 - TxCommitNum sync2.AtomicInt64 - TxCloseNum sync2.AtomicInt64 - CompactNum sync2.AtomicInt64 - CompactTotalTime sync2.AtomicDuration + GetNum sync2.AtomicInt64 + GetMissingNum sync2.AtomicInt64 + PutNum sync2.AtomicInt64 + DeleteNum sync2.AtomicInt64 + IterNum sync2.AtomicInt64 + IterSeekNum sync2.AtomicInt64 + IterCloseNum sync2.AtomicInt64 + SnapshotNum sync2.AtomicInt64 + SnapshotCloseNum sync2.AtomicInt64 + BatchNum sync2.AtomicInt64 + BatchCommitNum sync2.AtomicInt64 + TxNum sync2.AtomicInt64 + TxCommitNum sync2.AtomicInt64 + TxCloseNum sync2.AtomicInt64 + CompactNum sync2.AtomicInt64 + CompactTotalTime sync2.AtomicDuration } func (st *Stat) statGet(v []byte, err error) { @@ -35,23 +32,4 @@ func (st *Stat) statGet(v []byte, err error) { func (st *Stat) Reset() { *st = Stat{} - // st.GetNum.Set(0) - // st.GetMissingNum.Set(0) - // st.PutNum.Set(0) - // st.DeleteNum.Set(0) - // st.SyncPutNum.Set(0) - // st.SyncDeleteNum.Set(0) - // st.IterNum.Set(0) - // st.IterSeekNum.Set(0) - // st.IterCloseNum.Set(0) - // st.SnapshotNum.Set(0) - // st.SnapshotCloseNum.Set(0) - // st.BatchNum.Set(0) - // st.BatchCommitNum.Set(0) - // st.BatchSyncCommitNum.Set(0) - // st.TxNum.Set(0) - // st.TxCommitNum.Set(0) - // st.TxCloseNum.Set(0) - // st.CompactNum.Set(0) - // st.CompactTotalTime.Set(0) } diff --git a/store/store.go b/store/store.go index d882e48..3ad8a54 100644 --- a/store/store.go +++ b/store/store.go @@ -40,7 +40,11 @@ func Open(cfg *config.Config) (*DB, error) { return nil, err } - db := &DB{idb, s.String(), &Stat{}} + db := new(DB) + db.db = idb + db.name = s.String() + db.st = &Stat{} + db.cfg = cfg return db, nil } diff --git a/store/tx.go b/store/tx.go index 494219d..6845ee5 100644 --- a/store/tx.go +++ b/store/tx.go @@ -5,13 +5,13 @@ import ( ) type Tx struct { - driver.Tx + tx driver.Tx st *Stat } func (tx *Tx) NewIterator() *Iterator { it := new(Iterator) - it.it = tx.Tx.NewIterator() + it.it = tx.tx.NewIterator() it.st = tx.st tx.st.IterNum.Add(1) @@ -23,7 +23,7 @@ func (tx *Tx) NewWriteBatch() *WriteBatch { tx.st.BatchNum.Add(1) wb := new(WriteBatch) - wb.IWriteBatch = tx.Tx.NewWriteBatch() + wb.wb = tx.tx.NewWriteBatch() wb.st = tx.st return wb } @@ -51,26 +51,26 @@ func (tx *Tx) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, off } func (tx *Tx) Get(key []byte) ([]byte, error) { - v, err := tx.Tx.Get(key) + v, err := tx.tx.Get(key) tx.st.statGet(v, err) return v, err } func (tx *Tx) Put(key []byte, value []byte) error { tx.st.PutNum.Add(1) - return tx.Tx.Put(key, value) + return tx.tx.Put(key, value) } func (tx *Tx) Delete(key []byte) error { tx.st.DeleteNum.Add(1) - return tx.Tx.Delete(key) + return tx.tx.Delete(key) } func (tx *Tx) Commit() error { tx.st.TxCommitNum.Add(1) - return tx.Tx.Commit() + return tx.tx.Commit() } func (tx *Tx) Rollback() error { - return tx.Tx.Rollback() + return tx.tx.Rollback() } diff --git a/store/writebatch.go b/store/writebatch.go index e1235df..bf4658c 100644 --- a/store/writebatch.go +++ b/store/writebatch.go @@ -5,20 +5,22 @@ import ( ) type WriteBatch struct { - driver.IWriteBatch + wb driver.IWriteBatch st *Stat putNum int64 deleteNum int64 + + db *DB } func (wb *WriteBatch) Put(key []byte, value []byte) { wb.putNum++ - wb.IWriteBatch.Put(key, value) + wb.wb.Put(key, value) } func (wb *WriteBatch) Delete(key []byte) { wb.deleteNum++ - wb.IWriteBatch.Delete(key) + wb.wb.Delete(key) } func (wb *WriteBatch) Commit() error { @@ -27,18 +29,13 @@ func (wb *WriteBatch) Commit() error { wb.st.DeleteNum.Add(wb.deleteNum) wb.putNum = 0 wb.deleteNum = 0 - return wb.IWriteBatch.Commit() -} - -func (wb *WriteBatch) SyncCommit() error { - wb.st.BatchSyncCommitNum.Add(1) - wb.st.SyncPutNum.Add(wb.putNum) - wb.st.SyncDeleteNum.Add(wb.deleteNum) - wb.putNum = 0 - wb.deleteNum = 0 - return wb.IWriteBatch.SyncCommit() + if wb.db == nil || !wb.db.needSyncCommit() { + return wb.wb.Commit() + } else { + return wb.wb.SyncCommit() + } } func (wb *WriteBatch) Rollback() error { - return wb.IWriteBatch.Rollback() + return wb.wb.Rollback() }