From b559e285403d4527bc3ef790c7b0323fda9d2d5b Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Fri, 2 Sep 2016 19:19:17 -0700 Subject: [PATCH] added DeleteAll function --- buntdb.go | 159 +++++++++++++++++++++++++++++++++++++------------ buntdb_test.go | 63 ++++++++++++++++++++ 2 files changed, 184 insertions(+), 38 deletions(-) diff --git a/buntdb.go b/buntdb.go index e181950..23d9350 100644 --- a/buntdb.go +++ b/buntdb.go @@ -233,6 +233,24 @@ type index struct { db *DB // the origin database } +// clearCopy creates a copy of the index, but with an empty dataset. +func (idx *index) clearCopy() *index { + nidx := &index{ + name: idx.name, + pattern: idx.pattern, + db: idx.db, + less: idx.less, + rect: idx.rect, + } + if nidx.less != nil { + nidx.btr = btree.New(btreeDegrees, nidx) + } + if nidx.rect != nil { + nidx.rtr = rtree.New(nidx) + } + return nidx +} + // CreateIndex builds a new index and populates it with items. // The items are ordered in an b-tree and can be retrieved using the // Ascend* and Descend* methods. @@ -789,9 +807,6 @@ func (db *DB) readLoad(rd io.Reader, modTime time.Time) error { if len(parts) == 0 { continue } - if len(parts[0]) != 3 { - return ErrInvalid - } if (parts[0][0] == 's' || parts[0][1] == 'S') && (parts[0][1] == 'e' || parts[0][1] == 'E') && (parts[0][2] == 't' || parts[0][2] == 'T') { @@ -830,6 +845,11 @@ func (db *DB) readLoad(rd io.Reader, modTime time.Time) error { return ErrInvalid } db.deleteFromDatabase(&dbItem{key: parts[1]}) + } else if (parts[0][0] == 'f' || parts[0][1] == 'F') && + strings.ToLower(parts[0]) == "flushdb" { + db.keys = btree.New(btreeDegrees, nil) + db.exps = btree.New(btreeDegrees, &exctx{db}) + db.idxs = make(map[string]*index) } else { return ErrInvalid } @@ -926,14 +946,57 @@ func (db *DB) get(key string) *dbItem { // // All transactions must be committed or rolled-back when done. type Tx struct { - db *DB // the underlying database. - writable bool // when false mutable operations fail. - funcd bool // when true Commit and Rollback panic. + db *DB // the underlying database. + writable bool // when false mutable operations fail. + funcd bool // when true Commit and Rollback panic. + wc *txWriteContext // context for writable transactions. +} + +type txWriteContext struct { + // rollback when deleteAll is called + rbkeys *btree.BTree // a tree of all item ordered by key + rbexps *btree.BTree // a tree of items ordered by expiration + rbidxs map[string]*index // the index trees. + rollbacks map[string]*dbItem // cotnains details for rolling back tx. commits map[string]*dbItem // contains details for committing tx. itercount int // stack of iterators } +// DeleteAll deletes all items from the database. +func (tx *Tx) DeleteAll() error { + if tx.db == nil { + return ErrTxClosed + } else if !tx.writable { + return ErrTxNotWritable + } else if tx.wc.itercount > 0 { + return ErrTxIterating + } + + // check to see if we've already deleted everything + if tx.wc.rbkeys == nil { + // we need to backup the live data in case of a rollback. + tx.wc.rbkeys = tx.db.keys + tx.wc.rbexps = tx.db.exps + tx.wc.rbidxs = tx.db.idxs + } + + // now reset the live database trees + tx.db.keys = btree.New(btreeDegrees, nil) + tx.db.exps = btree.New(btreeDegrees, &exctx{tx.db}) + tx.db.idxs = make(map[string]*index) + + // finally re-create the indexes + for name, idx := range tx.wc.rbidxs { + tx.db.idxs[name] = idx.clearCopy() + } + + // always clear out the commits + tx.wc.commits = make(map[string]*dbItem) + + return nil +} + // begin opens a new transaction. // Multiple read-only transactions can be opened at the same time but there can // only be one read/write transaction at a time. Attempting to open a read/write @@ -952,9 +1015,10 @@ func (db *DB) begin(writable bool) (*Tx, error) { return nil, ErrDatabaseClosed } if writable { - tx.rollbacks = make(map[string]*dbItem) + tx.wc = &txWriteContext{} + tx.wc.rollbacks = make(map[string]*dbItem) if db.persist { - tx.commits = make(map[string]*dbItem) + tx.wc.commits = make(map[string]*dbItem) } } return tx, nil @@ -981,7 +1045,13 @@ func (tx *Tx) unlock() { // rollbackInner handles the underlying rollback logic. // Intended to be called from Commit() and Rollback(). func (tx *Tx) rollbackInner() { - for key, item := range tx.rollbacks { + // rollback the deleteAll if needed + if tx.wc.rbkeys != nil { + tx.db.keys = tx.wc.rbkeys + tx.db.idxs = tx.wc.rbidxs + tx.db.exps = tx.wc.rbexps + } + for key, item := range tx.wc.rollbacks { tx.db.deleteFromDatabase(&dbItem{key: key}) if item != nil { // When an item is not nil, we will need to reinsert that item @@ -1004,10 +1074,14 @@ func (tx *Tx) commit() error { return ErrTxNotWritable } var err error - if tx.db.persist && len(tx.commits) > 0 { - // Each committed record is written to disk + if tx.db.persist && (len(tx.wc.commits) > 0 || tx.wc.rbkeys != nil) { tx.db.buf.Reset() - for key, item := range tx.commits { + // write a flushdb if a deleteAll was called. + if tx.wc.rbkeys != nil { + tx.db.buf.WriteString("*1\r\n$7\r\nflushdb\r\n") + } + // Each committed record is written to disk + for key, item := range tx.wc.commits { if item == nil { (&dbItem{key: key}).writeDeleteTo(tx.db.buf) } else { @@ -1025,7 +1099,6 @@ func (tx *Tx) commit() error { } // Increment the number of flushes. The background syncing uses this. tx.db.flushes++ - } // Unlock the database and allow for another writable transaction. tx.unlock() @@ -1196,7 +1269,7 @@ func (tx *Tx) Set(key, value string, opts *SetOptions) (previousValue string, return "", false, ErrTxClosed } else if !tx.writable { return "", false, ErrTxNotWritable - } else if tx.itercount > 0 { + } else if tx.wc.itercount > 0 { return "", false, ErrTxIterating } item := &dbItem{key: key, val: value} @@ -1209,27 +1282,32 @@ func (tx *Tx) Set(key, value string, opts *SetOptions) (previousValue string, } // Insert the item into the keys tree. prev := tx.db.insertIntoDatabase(item) - if prev == nil { - // An item with the same key did not previously exist. Let's create a - // rollback entry with a nil value. A nil value indicates that the - // entry should be deleted on rollback. When the value is *not* nil, - // that means the entry should be reverted. - tx.rollbacks[key] = nil - } else { - // A previous item already exists in the database. Let's create a - // rollback entry with the item as the value. We need to check the map - // to see if there isn't already an item that matches the same key. - if _, ok := tx.rollbacks[key]; !ok { - tx.rollbacks[key] = prev - } - if !prev.expired() { - previousValue, replaced = prev.val, true + + // insert into the rollback map if there has not been a deleteAll. + if tx.wc.rbkeys == nil { + if prev == nil { + // An item with the same key did not previously exist. Let's + // create a rollback entry with a nil value. A nil value indicates + // that the entry should be deleted on rollback. When the value is + // *not* nil, that means the entry should be reverted. + tx.wc.rollbacks[key] = nil + } else { + // A previous item already exists in the database. Let's create a + // rollback entry with the item as the value. We need to check the + // map to see if there isn't already an item that matches the + // same key. + if _, ok := tx.wc.rollbacks[key]; !ok { + tx.wc.rollbacks[key] = prev + } + if !prev.expired() { + previousValue, replaced = prev.val, true + } } } // For commits we simply assign the item to the map. We use this map to // write the entry to disk. if tx.db.persist { - tx.commits[key] = item + tx.wc.commits[key] = item } return previousValue, replaced, nil } @@ -1259,18 +1337,21 @@ func (tx *Tx) Delete(key string) (val string, err error) { return "", ErrTxClosed } else if !tx.writable { return "", ErrTxNotWritable - } else if tx.itercount > 0 { + } else if tx.wc.itercount > 0 { return "", ErrTxIterating } item := tx.db.deleteFromDatabase(&dbItem{key: key}) if item == nil { return "", ErrNotFound } - if _, ok := tx.rollbacks[key]; !ok { - tx.rollbacks[key] = item + // create a rollback entry if there has not been a deleteAll call. + if tx.wc.rbkeys == nil { + if _, ok := tx.wc.rollbacks[key]; !ok { + tx.wc.rollbacks[key] = item + } } if tx.db.persist { - tx.commits[key] = nil + tx.wc.commits[key] = nil } // Even though the item has been deleted, we still want to check // if it has expired. An expired item should not be returned. @@ -1349,10 +1430,12 @@ func (tx *Tx) scan(desc, gt, lt bool, index, start, stop string, } } // execute the scan on the underlying tree. - tx.itercount++ - defer func() { - tx.itercount-- - }() + if tx.wc != nil { + tx.wc.itercount++ + defer func() { + tx.wc.itercount-- + }() + } if desc { if gt { if lt { diff --git a/buntdb_test.go b/buntdb_test.go index 3cacf77..13d2d3b 100644 --- a/buntdb_test.go +++ b/buntdb_test.go @@ -184,6 +184,69 @@ func TestMutatingIterator(t *testing.T) { } } +func TestDeleteAll(t *testing.T) { + db := testOpen(t) + defer testClose(db) + + db.Update(func(tx *Tx) error { + tx.Set("hello1", "planet1", nil) + tx.Set("hello2", "planet2", nil) + tx.Set("hello3", "planet3", nil) + return nil + }) + db.CreateIndex("all", "*", IndexString) + db.Update(func(tx *Tx) error { + tx.Set("hello1", "planet1.1", nil) + tx.DeleteAll() + tx.Set("bb", "11", nil) + tx.Set("aa", "**", nil) + tx.Delete("aa") + tx.Set("aa", "22", nil) + return nil + }) + var res string + var res2 string + db.View(func(tx *Tx) error { + tx.Ascend("", func(key, val string) bool { + res += key + ":" + val + "\n" + return true + }) + tx.Ascend("all", func(key, val string) bool { + res2 += key + ":" + val + "\n" + return true + }) + return nil + }) + if res != "aa:22\nbb:11\n" { + t.Fatal("fail") + } + if res2 != "bb:11\naa:22\n" { + t.Fatal("fail") + } + db = testReOpen(t, db) + defer testClose(db) + res = "" + res2 = "" + db.CreateIndex("all", "*", IndexString) + db.View(func(tx *Tx) error { + tx.Ascend("", func(key, val string) bool { + res += key + ":" + val + "\n" + return true + }) + tx.Ascend("all", func(key, val string) bool { + res2 += key + ":" + val + "\n" + return true + }) + return nil + }) + if res != "aa:22\nbb:11\n" { + t.Fatal("fail") + } + if res2 != "bb:11\naa:22\n" { + t.Fatal("fail") + } +} + func TestVariousTx(t *testing.T) { db := testOpen(t) defer testClose(db)