added DeleteAll function

This commit is contained in:
Josh Baker 2016-09-02 19:19:17 -07:00
parent d570a6fba9
commit b559e28540
2 changed files with 184 additions and 38 deletions

137
buntdb.go
View File

@ -233,6 +233,24 @@ type index struct {
db *DB // the origin database db *DB // the origin database
} }
// clearCopy creates a copy of the index, but with an empty dataset.
func (idx *index) clearCopy() *index {
nidx := &index{
name: idx.name,
pattern: idx.pattern,
db: idx.db,
less: idx.less,
rect: idx.rect,
}
if nidx.less != nil {
nidx.btr = btree.New(btreeDegrees, nidx)
}
if nidx.rect != nil {
nidx.rtr = rtree.New(nidx)
}
return nidx
}
// CreateIndex builds a new index and populates it with items. // CreateIndex builds a new index and populates it with items.
// The items are ordered in an b-tree and can be retrieved using the // The items are ordered in an b-tree and can be retrieved using the
// Ascend* and Descend* methods. // Ascend* and Descend* methods.
@ -789,9 +807,6 @@ func (db *DB) readLoad(rd io.Reader, modTime time.Time) error {
if len(parts) == 0 { if len(parts) == 0 {
continue continue
} }
if len(parts[0]) != 3 {
return ErrInvalid
}
if (parts[0][0] == 's' || parts[0][1] == 'S') && if (parts[0][0] == 's' || parts[0][1] == 'S') &&
(parts[0][1] == 'e' || parts[0][1] == 'E') && (parts[0][1] == 'e' || parts[0][1] == 'E') &&
(parts[0][2] == 't' || parts[0][2] == 'T') { (parts[0][2] == 't' || parts[0][2] == 'T') {
@ -830,6 +845,11 @@ func (db *DB) readLoad(rd io.Reader, modTime time.Time) error {
return ErrInvalid return ErrInvalid
} }
db.deleteFromDatabase(&dbItem{key: parts[1]}) db.deleteFromDatabase(&dbItem{key: parts[1]})
} else if (parts[0][0] == 'f' || parts[0][1] == 'F') &&
strings.ToLower(parts[0]) == "flushdb" {
db.keys = btree.New(btreeDegrees, nil)
db.exps = btree.New(btreeDegrees, &exctx{db})
db.idxs = make(map[string]*index)
} else { } else {
return ErrInvalid return ErrInvalid
} }
@ -929,11 +949,54 @@ type Tx struct {
db *DB // the underlying database. db *DB // the underlying database.
writable bool // when false mutable operations fail. writable bool // when false mutable operations fail.
funcd bool // when true Commit and Rollback panic. funcd bool // when true Commit and Rollback panic.
wc *txWriteContext // context for writable transactions.
}
type txWriteContext struct {
// rollback when deleteAll is called
rbkeys *btree.BTree // a tree of all item ordered by key
rbexps *btree.BTree // a tree of items ordered by expiration
rbidxs map[string]*index // the index trees.
rollbacks map[string]*dbItem // cotnains details for rolling back tx. rollbacks map[string]*dbItem // cotnains details for rolling back tx.
commits map[string]*dbItem // contains details for committing tx. commits map[string]*dbItem // contains details for committing tx.
itercount int // stack of iterators itercount int // stack of iterators
} }
// DeleteAll deletes all items from the database.
func (tx *Tx) DeleteAll() error {
if tx.db == nil {
return ErrTxClosed
} else if !tx.writable {
return ErrTxNotWritable
} else if tx.wc.itercount > 0 {
return ErrTxIterating
}
// check to see if we've already deleted everything
if tx.wc.rbkeys == nil {
// we need to backup the live data in case of a rollback.
tx.wc.rbkeys = tx.db.keys
tx.wc.rbexps = tx.db.exps
tx.wc.rbidxs = tx.db.idxs
}
// now reset the live database trees
tx.db.keys = btree.New(btreeDegrees, nil)
tx.db.exps = btree.New(btreeDegrees, &exctx{tx.db})
tx.db.idxs = make(map[string]*index)
// finally re-create the indexes
for name, idx := range tx.wc.rbidxs {
tx.db.idxs[name] = idx.clearCopy()
}
// always clear out the commits
tx.wc.commits = make(map[string]*dbItem)
return nil
}
// begin opens a new transaction. // begin opens a new transaction.
// Multiple read-only transactions can be opened at the same time but there can // Multiple read-only transactions can be opened at the same time but there can
// only be one read/write transaction at a time. Attempting to open a read/write // only be one read/write transaction at a time. Attempting to open a read/write
@ -952,9 +1015,10 @@ func (db *DB) begin(writable bool) (*Tx, error) {
return nil, ErrDatabaseClosed return nil, ErrDatabaseClosed
} }
if writable { if writable {
tx.rollbacks = make(map[string]*dbItem) tx.wc = &txWriteContext{}
tx.wc.rollbacks = make(map[string]*dbItem)
if db.persist { if db.persist {
tx.commits = make(map[string]*dbItem) tx.wc.commits = make(map[string]*dbItem)
} }
} }
return tx, nil return tx, nil
@ -981,7 +1045,13 @@ func (tx *Tx) unlock() {
// rollbackInner handles the underlying rollback logic. // rollbackInner handles the underlying rollback logic.
// Intended to be called from Commit() and Rollback(). // Intended to be called from Commit() and Rollback().
func (tx *Tx) rollbackInner() { func (tx *Tx) rollbackInner() {
for key, item := range tx.rollbacks { // rollback the deleteAll if needed
if tx.wc.rbkeys != nil {
tx.db.keys = tx.wc.rbkeys
tx.db.idxs = tx.wc.rbidxs
tx.db.exps = tx.wc.rbexps
}
for key, item := range tx.wc.rollbacks {
tx.db.deleteFromDatabase(&dbItem{key: key}) tx.db.deleteFromDatabase(&dbItem{key: key})
if item != nil { if item != nil {
// When an item is not nil, we will need to reinsert that item // When an item is not nil, we will need to reinsert that item
@ -1004,10 +1074,14 @@ func (tx *Tx) commit() error {
return ErrTxNotWritable return ErrTxNotWritable
} }
var err error var err error
if tx.db.persist && len(tx.commits) > 0 { if tx.db.persist && (len(tx.wc.commits) > 0 || tx.wc.rbkeys != nil) {
// Each committed record is written to disk
tx.db.buf.Reset() tx.db.buf.Reset()
for key, item := range tx.commits { // write a flushdb if a deleteAll was called.
if tx.wc.rbkeys != nil {
tx.db.buf.WriteString("*1\r\n$7\r\nflushdb\r\n")
}
// Each committed record is written to disk
for key, item := range tx.wc.commits {
if item == nil { if item == nil {
(&dbItem{key: key}).writeDeleteTo(tx.db.buf) (&dbItem{key: key}).writeDeleteTo(tx.db.buf)
} else { } else {
@ -1025,7 +1099,6 @@ func (tx *Tx) commit() error {
} }
// Increment the number of flushes. The background syncing uses this. // Increment the number of flushes. The background syncing uses this.
tx.db.flushes++ tx.db.flushes++
} }
// Unlock the database and allow for another writable transaction. // Unlock the database and allow for another writable transaction.
tx.unlock() tx.unlock()
@ -1196,7 +1269,7 @@ func (tx *Tx) Set(key, value string, opts *SetOptions) (previousValue string,
return "", false, ErrTxClosed return "", false, ErrTxClosed
} else if !tx.writable { } else if !tx.writable {
return "", false, ErrTxNotWritable return "", false, ErrTxNotWritable
} else if tx.itercount > 0 { } else if tx.wc.itercount > 0 {
return "", false, ErrTxIterating return "", false, ErrTxIterating
} }
item := &dbItem{key: key, val: value} item := &dbItem{key: key, val: value}
@ -1209,27 +1282,32 @@ func (tx *Tx) Set(key, value string, opts *SetOptions) (previousValue string,
} }
// Insert the item into the keys tree. // Insert the item into the keys tree.
prev := tx.db.insertIntoDatabase(item) prev := tx.db.insertIntoDatabase(item)
// insert into the rollback map if there has not been a deleteAll.
if tx.wc.rbkeys == nil {
if prev == nil { if prev == nil {
// An item with the same key did not previously exist. Let's create a // An item with the same key did not previously exist. Let's
// rollback entry with a nil value. A nil value indicates that the // create a rollback entry with a nil value. A nil value indicates
// entry should be deleted on rollback. When the value is *not* nil, // that the entry should be deleted on rollback. When the value is
// that means the entry should be reverted. // *not* nil, that means the entry should be reverted.
tx.rollbacks[key] = nil tx.wc.rollbacks[key] = nil
} else { } else {
// A previous item already exists in the database. Let's create a // A previous item already exists in the database. Let's create a
// rollback entry with the item as the value. We need to check the map // rollback entry with the item as the value. We need to check the
// to see if there isn't already an item that matches the same key. // map to see if there isn't already an item that matches the
if _, ok := tx.rollbacks[key]; !ok { // same key.
tx.rollbacks[key] = prev if _, ok := tx.wc.rollbacks[key]; !ok {
tx.wc.rollbacks[key] = prev
} }
if !prev.expired() { if !prev.expired() {
previousValue, replaced = prev.val, true previousValue, replaced = prev.val, true
} }
} }
}
// For commits we simply assign the item to the map. We use this map to // For commits we simply assign the item to the map. We use this map to
// write the entry to disk. // write the entry to disk.
if tx.db.persist { if tx.db.persist {
tx.commits[key] = item tx.wc.commits[key] = item
} }
return previousValue, replaced, nil return previousValue, replaced, nil
} }
@ -1259,18 +1337,21 @@ func (tx *Tx) Delete(key string) (val string, err error) {
return "", ErrTxClosed return "", ErrTxClosed
} else if !tx.writable { } else if !tx.writable {
return "", ErrTxNotWritable return "", ErrTxNotWritable
} else if tx.itercount > 0 { } else if tx.wc.itercount > 0 {
return "", ErrTxIterating return "", ErrTxIterating
} }
item := tx.db.deleteFromDatabase(&dbItem{key: key}) item := tx.db.deleteFromDatabase(&dbItem{key: key})
if item == nil { if item == nil {
return "", ErrNotFound return "", ErrNotFound
} }
if _, ok := tx.rollbacks[key]; !ok { // create a rollback entry if there has not been a deleteAll call.
tx.rollbacks[key] = item if tx.wc.rbkeys == nil {
if _, ok := tx.wc.rollbacks[key]; !ok {
tx.wc.rollbacks[key] = item
}
} }
if tx.db.persist { if tx.db.persist {
tx.commits[key] = nil tx.wc.commits[key] = nil
} }
// Even though the item has been deleted, we still want to check // Even though the item has been deleted, we still want to check
// if it has expired. An expired item should not be returned. // if it has expired. An expired item should not be returned.
@ -1349,10 +1430,12 @@ func (tx *Tx) scan(desc, gt, lt bool, index, start, stop string,
} }
} }
// execute the scan on the underlying tree. // execute the scan on the underlying tree.
tx.itercount++ if tx.wc != nil {
tx.wc.itercount++
defer func() { defer func() {
tx.itercount-- tx.wc.itercount--
}() }()
}
if desc { if desc {
if gt { if gt {
if lt { if lt {

View File

@ -184,6 +184,69 @@ func TestMutatingIterator(t *testing.T) {
} }
} }
func TestDeleteAll(t *testing.T) {
db := testOpen(t)
defer testClose(db)
db.Update(func(tx *Tx) error {
tx.Set("hello1", "planet1", nil)
tx.Set("hello2", "planet2", nil)
tx.Set("hello3", "planet3", nil)
return nil
})
db.CreateIndex("all", "*", IndexString)
db.Update(func(tx *Tx) error {
tx.Set("hello1", "planet1.1", nil)
tx.DeleteAll()
tx.Set("bb", "11", nil)
tx.Set("aa", "**", nil)
tx.Delete("aa")
tx.Set("aa", "22", nil)
return nil
})
var res string
var res2 string
db.View(func(tx *Tx) error {
tx.Ascend("", func(key, val string) bool {
res += key + ":" + val + "\n"
return true
})
tx.Ascend("all", func(key, val string) bool {
res2 += key + ":" + val + "\n"
return true
})
return nil
})
if res != "aa:22\nbb:11\n" {
t.Fatal("fail")
}
if res2 != "bb:11\naa:22\n" {
t.Fatal("fail")
}
db = testReOpen(t, db)
defer testClose(db)
res = ""
res2 = ""
db.CreateIndex("all", "*", IndexString)
db.View(func(tx *Tx) error {
tx.Ascend("", func(key, val string) bool {
res += key + ":" + val + "\n"
return true
})
tx.Ascend("all", func(key, val string) bool {
res2 += key + ":" + val + "\n"
return true
})
return nil
})
if res != "aa:22\nbb:11\n" {
t.Fatal("fail")
}
if res2 != "bb:11\naa:22\n" {
t.Fatal("fail")
}
}
func TestVariousTx(t *testing.T) { func TestVariousTx(t *testing.T) {
db := testOpen(t) db := testOpen(t)
defer testClose(db) defer testClose(db)