add sync interface

This commit is contained in:
siddontang 2014-10-09 13:05:55 +08:00
parent 884de89cb8
commit 761431cf5b
14 changed files with 119 additions and 0 deletions

View File

@ -98,7 +98,14 @@ func (db *DB) Delete(key []byte) error {
return b.Delete(key)
})
return err
}
func (db *DB) SyncPut(key []byte, value []byte) error {
return db.Put(key, value)
}
func (db *DB) SyncDelete(key []byte) error {
return db.Delete(key)
}
func (db *DB) NewIterator() driver.IIterator {
@ -152,6 +159,10 @@ func (db *DB) BatchPut(writes []driver.Write) error {
return err
}
func (db *DB) SyncBatchPut(writes []driver.Write) error {
return db.BatchPut(writes)
}
func (db *DB) Compact() error {
return nil
}

View File

@ -48,6 +48,10 @@ func (t *Tx) BatchPut(writes []driver.Write) error {
return nil
}
func (t *Tx) SyncBatchPut(writes []driver.Write) error {
return t.BatchPut(writes)
}
func (t *Tx) Rollback() error {
return t.tx.Rollback()
}

View File

@ -2,6 +2,7 @@ package driver
type BatchPuter interface {
BatchPut([]Write) error
SyncBatchPut([]Write) error
}
type Write struct {
@ -29,6 +30,10 @@ func (w *WriteBatch) Commit() error {
return w.batch.BatchPut(w.wb)
}
func (w *WriteBatch) SyncCommit() error {
return w.batch.SyncBatchPut(w.wb)
}
func (w *WriteBatch) Rollback() error {
w.wb = w.wb[0:0]
return nil

View File

@ -16,6 +16,9 @@ type IDB interface {
Put(key []byte, value []byte) error
Delete(key []byte) error
SyncPut(key []byte, value []byte) error
SyncDelete(key []byte) error
NewIterator() IIterator
NewWriteBatch() IWriteBatch
@ -53,6 +56,7 @@ type IWriteBatch interface {
Put(key []byte, value []byte)
Delete(key []byte)
Commit() error
SyncCommit() error
Rollback() error
}

View File

@ -21,6 +21,10 @@ func (w *WriteBatch) Commit() error {
return w.db.db.Write(w.wbatch, nil)
}
func (w *WriteBatch) SyncCommit() error {
return w.db.db.Write(w.wbatch, w.db.syncOpts)
}
func (w *WriteBatch) Rollback() error {
w.wbatch.Reset()
return nil

View File

@ -41,6 +41,8 @@ type DB struct {
iteratorOpts *opt.ReadOptions
syncOpts *opt.WriteOptions
cache cache.Cache
filter filter.Filter
@ -102,6 +104,9 @@ func (db *DB) initOpts() {
db.iteratorOpts = &opt.ReadOptions{}
db.iteratorOpts.DontFillCache = true
db.syncOpts = &opt.WriteOptions{}
db.syncOpts.Sync = true
}
func newOptions(cfg *config.LevelDBConfig) *opt.Options {
@ -147,6 +152,14 @@ func (db *DB) Delete(key []byte) error {
return db.db.Delete(key, nil)
}
func (db *DB) SyncPut(key []byte, value []byte) error {
return db.db.Put(key, value, db.syncOpts)
}
func (db *DB) SyncDelete(key []byte) error {
return db.db.Delete(key, db.syncOpts)
}
func (db *DB) NewWriteBatch() driver.IWriteBatch {
wb := &WriteBatch{
db: db,

View File

@ -46,6 +46,10 @@ func (w *WriteBatch) Commit() error {
return w.commit(w.db.writeOpts)
}
func (w *WriteBatch) SyncCommit() error {
return w.commit(w.db.syncOpts)
}
func (w *WriteBatch) Rollback() error {
C.leveldb_writebatch_clear(w.wbatch)
return nil

View File

@ -81,6 +81,8 @@ type DB struct {
writeOpts *WriteOptions
iteratorOpts *ReadOptions
syncOpts *WriteOptions
cache *Cache
filter *FilterPolicy
@ -132,6 +134,9 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) {
db.readOpts = NewReadOptions()
db.writeOpts = NewWriteOptions()
db.syncOpts = NewWriteOptions()
db.syncOpts.SetSync(true)
db.iteratorOpts = NewReadOptions()
db.iteratorOpts.SetFillCache(false)
}
@ -171,6 +176,14 @@ func (db *DB) Delete(key []byte) error {
return db.delete(db.writeOpts, key)
}
func (db *DB) SyncPut(key []byte, value []byte) error {
return db.put(db.syncOpts, key, value)
}
func (db *DB) SyncDelete(key []byte) error {
return db.delete(db.syncOpts, key)
}
func (db *DB) NewWriteBatch() driver.IWriteBatch {
wb := &WriteBatch{
db: db,

View File

@ -46,6 +46,10 @@ func (w *WriteBatch) Commit() error {
return w.commit(w.db.writeOpts)
}
func (w *WriteBatch) SyncCommit() error {
return w.commit(w.db.syncOpts)
}
func (w *WriteBatch) Rollback() error {
C.leveldb_writebatch_clear(w.wbatch)
return nil

View File

@ -81,6 +81,8 @@ type DB struct {
writeOpts *WriteOptions
iteratorOpts *ReadOptions
syncOpts *WriteOptions
cache *Cache
filter *FilterPolicy
@ -132,6 +134,9 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) {
db.readOpts = NewReadOptions()
db.writeOpts = NewWriteOptions()
db.syncOpts = NewWriteOptions()
db.syncOpts.SetSync(true)
db.iteratorOpts = NewReadOptions()
db.iteratorOpts.SetFillCache(false)
}
@ -171,6 +176,14 @@ func (db *DB) Delete(key []byte) error {
return db.delete(db.writeOpts, key)
}
func (db *DB) SyncPut(key []byte, value []byte) error {
return db.put(db.syncOpts, key, value)
}
func (db *DB) SyncDelete(key []byte) error {
return db.delete(db.syncOpts, key)
}
func (db *DB) NewWriteBatch() driver.IWriteBatch {
wb := &WriteBatch{
db: db,

View File

@ -122,6 +122,14 @@ func (db MDB) BatchPut(writes []driver.Write) error {
return itr.err
}
func (db MDB) SyncBatchPut(writes []driver.Write) error {
if err := db.BatchPut(writes); err != nil {
return err
}
return db.env.Sync(1)
}
func (db MDB) Get(key []byte) ([]byte, error) {
tx, err := db.env.BeginTxn(nil, mdb.RDONLY)
if err != nil {
@ -148,6 +156,22 @@ func (db MDB) Delete(key []byte) error {
return itr.Error()
}
func (db MDB) SyncPut(key []byte, value []byte) error {
if err := db.Put(key, value); err != nil {
return err
}
return db.env.Sync(1)
}
func (db MDB) SyncDelete(key []byte) error {
if err := db.Delete(key); err != nil {
return err
}
return db.env.Sync(1)
}
type MDBIterator struct {
key []byte
value []byte

View File

@ -74,7 +74,10 @@ func (t *Tx) BatchPut(writes []driver.Write) error {
itr.setState()
return itr.Close()
}
func (t *Tx) SyncBatchPut(writes []driver.Write) error {
return t.BatchPut(writes)
}
func (t *Tx) Rollback() error {

View File

@ -45,6 +45,10 @@ func (w *WriteBatch) Commit() error {
return w.commit(w.db.writeOpts)
}
func (w *WriteBatch) SyncCommit() error {
return w.commit(w.db.syncOpts)
}
func (w *WriteBatch) Rollback() error {
C.rocksdb_writebatch_clear(w.wbatch)
return nil

View File

@ -85,6 +85,8 @@ type DB struct {
writeOpts *WriteOptions
iteratorOpts *ReadOptions
syncOpts *WriteOptions
cache *Cache
filter *FilterPolicy
@ -152,6 +154,9 @@ func (db *DB) initOptions(cfg *config.LevelDBConfig) {
db.readOpts = NewReadOptions()
db.writeOpts = NewWriteOptions()
db.syncOpts = NewWriteOptions()
db.syncOpts.SetSync(true)
db.iteratorOpts = NewReadOptions()
db.iteratorOpts.SetFillCache(false)
}
@ -197,6 +202,14 @@ func (db *DB) Delete(key []byte) error {
return db.delete(db.writeOpts, key)
}
func (db *DB) SyncPut(key []byte, value []byte) error {
return db.put(db.syncOpts, key, value)
}
func (db *DB) SyncDelete(key []byte) error {
return db.delete(db.syncOpts, key)
}
func (db *DB) NewWriteBatch() driver.IWriteBatch {
wb := &WriteBatch{
db: db,