diff --git a/store/boltdb/db.go b/store/boltdb/db.go index 15a0570..ac8cc03 100644 --- a/store/boltdb/db.go +++ b/store/boltdb/db.go @@ -98,7 +98,14 @@ func (db *DB) Delete(key []byte) error { return b.Delete(key) }) return err +} +func (db *DB) SyncPut(key []byte, value []byte) error { + return db.Put(key, value) +} + +func (db *DB) SyncDelete(key []byte) error { + return db.Delete(key) } func (db *DB) NewIterator() driver.IIterator { @@ -152,6 +159,10 @@ func (db *DB) BatchPut(writes []driver.Write) error { return err } +func (db *DB) SyncBatchPut(writes []driver.Write) error { + return db.BatchPut(writes) +} + func (db *DB) Compact() error { return nil } diff --git a/store/boltdb/tx.go b/store/boltdb/tx.go index 058a5f7..8dba000 100644 --- a/store/boltdb/tx.go +++ b/store/boltdb/tx.go @@ -48,6 +48,10 @@ func (t *Tx) BatchPut(writes []driver.Write) error { return nil } +func (t *Tx) SyncBatchPut(writes []driver.Write) error { + return t.BatchPut(writes) +} + func (t *Tx) Rollback() error { return t.tx.Rollback() } diff --git a/store/driver/batch.go b/store/driver/batch.go index 6b79c21..5fc461f 100644 --- a/store/driver/batch.go +++ b/store/driver/batch.go @@ -2,6 +2,7 @@ package driver type BatchPuter interface { BatchPut([]Write) error + SyncBatchPut([]Write) error } type Write struct { @@ -29,6 +30,10 @@ func (w *WriteBatch) Commit() error { return w.batch.BatchPut(w.wb) } +func (w *WriteBatch) SyncCommit() error { + return w.batch.SyncBatchPut(w.wb) +} + func (w *WriteBatch) Rollback() error { w.wb = w.wb[0:0] return nil diff --git a/store/driver/driver.go b/store/driver/driver.go index 859b840..e4312ce 100644 --- a/store/driver/driver.go +++ b/store/driver/driver.go @@ -16,6 +16,9 @@ type IDB interface { Put(key []byte, value []byte) error Delete(key []byte) error + SyncPut(key []byte, value []byte) error + SyncDelete(key []byte) error + NewIterator() IIterator NewWriteBatch() IWriteBatch @@ -53,6 +56,7 @@ type IWriteBatch interface { Put(key []byte, value []byte) Delete(key []byte) Commit() error + SyncCommit() error Rollback() error } diff --git a/store/goleveldb/batch.go b/store/goleveldb/batch.go index 74902b2..85b78c6 100644 --- a/store/goleveldb/batch.go +++ b/store/goleveldb/batch.go @@ -21,6 +21,10 @@ func (w *WriteBatch) Commit() error { return w.db.db.Write(w.wbatch, nil) } +func (w *WriteBatch) SyncCommit() error { + return w.db.db.Write(w.wbatch, w.db.syncOpts) +} + func (w *WriteBatch) Rollback() error { w.wbatch.Reset() return nil diff --git a/store/goleveldb/db.go b/store/goleveldb/db.go index dad6a90..5e76f74 100644 --- a/store/goleveldb/db.go +++ b/store/goleveldb/db.go @@ -41,6 +41,8 @@ type DB struct { iteratorOpts *opt.ReadOptions + syncOpts *opt.WriteOptions + cache cache.Cache filter filter.Filter @@ -102,6 +104,9 @@ func (db *DB) initOpts() { db.iteratorOpts = &opt.ReadOptions{} db.iteratorOpts.DontFillCache = true + + db.syncOpts = &opt.WriteOptions{} + db.syncOpts.Sync = true } func newOptions(cfg *config.LevelDBConfig) *opt.Options { @@ -147,6 +152,14 @@ func (db *DB) Delete(key []byte) error { return db.db.Delete(key, nil) } +func (db *DB) SyncPut(key []byte, value []byte) error { + return db.db.Put(key, value, db.syncOpts) +} + +func (db *DB) SyncDelete(key []byte) error { + return db.db.Delete(key, db.syncOpts) +} + func (db *DB) NewWriteBatch() driver.IWriteBatch { wb := &WriteBatch{ db: db, diff --git a/store/hyperleveldb/batch.go b/store/hyperleveldb/batch.go index 149f0b4..8084335 100644 --- a/store/hyperleveldb/batch.go +++ b/store/hyperleveldb/batch.go @@ -46,6 +46,10 @@ func (w *WriteBatch) Commit() error { return w.commit(w.db.writeOpts) } +func (w *WriteBatch) SyncCommit() error { + return w.commit(w.db.syncOpts) +} + func (w *WriteBatch) Rollback() error { C.leveldb_writebatch_clear(w.wbatch) return nil diff --git a/store/hyperleveldb/db.go b/store/hyperleveldb/db.go index 071dd38..0f13a0b 100644 --- a/store/hyperleveldb/db.go +++ b/store/hyperleveldb/db.go @@ -81,6 +81,8 @@ type DB struct { writeOpts *WriteOptions iteratorOpts *ReadOptions + syncOpts *WriteOptions + cache *Cache filter *FilterPolicy @@ -132,6 +134,9 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) { db.readOpts = NewReadOptions() db.writeOpts = NewWriteOptions() + db.syncOpts = NewWriteOptions() + db.syncOpts.SetSync(true) + db.iteratorOpts = NewReadOptions() db.iteratorOpts.SetFillCache(false) } @@ -171,6 +176,14 @@ func (db *DB) Delete(key []byte) error { return db.delete(db.writeOpts, key) } +func (db *DB) SyncPut(key []byte, value []byte) error { + return db.put(db.syncOpts, key, value) +} + +func (db *DB) SyncDelete(key []byte) error { + return db.delete(db.syncOpts, key) +} + func (db *DB) NewWriteBatch() driver.IWriteBatch { wb := &WriteBatch{ db: db, diff --git a/store/leveldb/batch.go b/store/leveldb/batch.go index 08a7d46..caadc03 100644 --- a/store/leveldb/batch.go +++ b/store/leveldb/batch.go @@ -46,6 +46,10 @@ func (w *WriteBatch) Commit() error { return w.commit(w.db.writeOpts) } +func (w *WriteBatch) SyncCommit() error { + return w.commit(w.db.syncOpts) +} + func (w *WriteBatch) Rollback() error { C.leveldb_writebatch_clear(w.wbatch) return nil diff --git a/store/leveldb/db.go b/store/leveldb/db.go index ab29928..3705f78 100644 --- a/store/leveldb/db.go +++ b/store/leveldb/db.go @@ -81,6 +81,8 @@ type DB struct { writeOpts *WriteOptions iteratorOpts *ReadOptions + syncOpts *WriteOptions + cache *Cache filter *FilterPolicy @@ -132,6 +134,9 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) { db.readOpts = NewReadOptions() db.writeOpts = NewWriteOptions() + db.syncOpts = NewWriteOptions() + db.syncOpts.SetSync(true) + db.iteratorOpts = NewReadOptions() db.iteratorOpts.SetFillCache(false) } @@ -171,6 +176,14 @@ func (db *DB) Delete(key []byte) error { return db.delete(db.writeOpts, key) } +func (db *DB) SyncPut(key []byte, value []byte) error { + return db.put(db.syncOpts, key, value) +} + +func (db *DB) SyncDelete(key []byte) error { + return db.delete(db.syncOpts, key) +} + func (db *DB) NewWriteBatch() driver.IWriteBatch { wb := &WriteBatch{ db: db, diff --git a/store/mdb/mdb.go b/store/mdb/mdb.go index ca79706..6bf26f6 100644 --- a/store/mdb/mdb.go +++ b/store/mdb/mdb.go @@ -122,6 +122,14 @@ func (db MDB) BatchPut(writes []driver.Write) error { return itr.err } +func (db MDB) SyncBatchPut(writes []driver.Write) error { + if err := db.BatchPut(writes); err != nil { + return err + } + + return db.env.Sync(1) +} + func (db MDB) Get(key []byte) ([]byte, error) { tx, err := db.env.BeginTxn(nil, mdb.RDONLY) if err != nil { @@ -148,6 +156,22 @@ func (db MDB) Delete(key []byte) error { return itr.Error() } +func (db MDB) SyncPut(key []byte, value []byte) error { + if err := db.Put(key, value); err != nil { + return err + } + + return db.env.Sync(1) +} + +func (db MDB) SyncDelete(key []byte) error { + if err := db.Delete(key); err != nil { + return err + } + + return db.env.Sync(1) +} + type MDBIterator struct { key []byte value []byte diff --git a/store/mdb/tx.go b/store/mdb/tx.go index e98a5f6..e2e3be0 100644 --- a/store/mdb/tx.go +++ b/store/mdb/tx.go @@ -74,7 +74,10 @@ func (t *Tx) BatchPut(writes []driver.Write) error { itr.setState() return itr.Close() +} +func (t *Tx) SyncBatchPut(writes []driver.Write) error { + return t.BatchPut(writes) } func (t *Tx) Rollback() error { diff --git a/store/rocksdb/batch.go b/store/rocksdb/batch.go index b69c383..017fc88 100644 --- a/store/rocksdb/batch.go +++ b/store/rocksdb/batch.go @@ -45,6 +45,10 @@ func (w *WriteBatch) Commit() error { return w.commit(w.db.writeOpts) } +func (w *WriteBatch) SyncCommit() error { + return w.commit(w.db.syncOpts) +} + func (w *WriteBatch) Rollback() error { C.rocksdb_writebatch_clear(w.wbatch) return nil diff --git a/store/rocksdb/db.go b/store/rocksdb/db.go index f3fb406..f5cecf4 100644 --- a/store/rocksdb/db.go +++ b/store/rocksdb/db.go @@ -85,6 +85,8 @@ type DB struct { writeOpts *WriteOptions iteratorOpts *ReadOptions + syncOpts *WriteOptions + cache *Cache filter *FilterPolicy @@ -152,6 +154,9 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) { db.readOpts = NewReadOptions() db.writeOpts = NewWriteOptions() + db.syncOpts = NewWriteOptions() + db.syncOpts.SetSync(true) + db.iteratorOpts = NewReadOptions() db.iteratorOpts.SetFillCache(false) } @@ -197,6 +202,14 @@ func (db *DB) Delete(key []byte) error { return db.delete(db.writeOpts, key) } +func (db *DB) SyncPut(key []byte, value []byte) error { + return db.put(db.syncOpts, key, value) +} + +func (db *DB) SyncDelete(key []byte) error { + return db.delete(db.syncOpts, key) +} + func (db *DB) NewWriteBatch() driver.IWriteBatch { wb := &WriteBatch{ db: db,