From 454b94d04ac5044b3a43b7364e794b42470040a3 Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Thu, 29 Sep 2016 15:54:51 -0700 Subject: [PATCH] transactional indexes --- buntdb.go | 382 +++++++++++++++++++++++++++++++++---------------- buntdb_test.go | 161 +++++++++++++++++++++ 2 files changed, 417 insertions(+), 126 deletions(-) diff --git a/buntdb.go b/buntdb.go index 27cc203..c46ba19 100644 --- a/buntdb.go +++ b/buntdb.go @@ -245,7 +245,6 @@ type index struct { } // clearCopy creates a copy of the index, but with an empty dataset. -// This is used with the DeleteAll command. func (idx *index) clearCopy() *index { // copy the index meta information nidx := &index{ @@ -265,6 +264,32 @@ func (idx *index) clearCopy() *index { return nidx } +// rebuild rebuilds the index +func (idx *index) rebuild() { + // initialize trees + if idx.less != nil { + idx.btr = btree.New(btreeDegrees, idx) + } + if idx.rect != nil { + idx.rtr = rtree.New(idx) + } + // iterate through all keys and fill the index + idx.db.keys.Ascend(func(item btree.Item) bool { + dbi := item.(*dbItem) + if idx.pattern != "*" && !match.Match(dbi.key, idx.pattern) { + // does not match the pattern, continue + return true + } + if idx.less != nil { + idx.btr.ReplaceOrInsert(dbi) + } + if idx.rect != nil { + idx.rtr.Insert(dbi) + } + return true + }) +} + // CreateIndex builds a new index and populates it with items. // The items are ordered in an b-tree and can be retrieved using the // Ascend* and Descend* methods. @@ -280,18 +305,37 @@ func (idx *index) clearCopy() *index { // less function to handle the content format and comparison. // There are some default less function that can be used such as // IndexString, IndexBinary, etc. +// +// Deprecated: Use Transactions func (db *DB) CreateIndex(name, pattern string, less ...func(a, b string) bool) error { - return db.createIndex(false, name, pattern, less, nil) + return db.Update(func(tx *Tx) error { + return tx.CreateIndex(name, pattern, less...) 
+ }) } // ReplaceIndex builds a new index and populates it with items. // The items are ordered in an b-tree and can be retrieved using the // Ascend* and Descend* methods. // If a previous index with the same name exists, that index will be deleted. +// +// Deprecated: Use Transactions func (db *DB) ReplaceIndex(name, pattern string, less ...func(a, b string) bool) error { - return db.createIndex(true, name, pattern, less, nil) + return db.Update(func(tx *Tx) error { + err := tx.CreateIndex(name, pattern, less...) + if err != nil { + if err == ErrIndexExists { + err := tx.DropIndex(name) + if err != nil { + return err + } + return tx.CreateIndex(name, pattern, less...) + } + return err + } + return nil + }) } // CreateSpatialIndex builds a new index and populates it with items. @@ -308,135 +352,59 @@ func (db *DB) ReplaceIndex(name, pattern string, // Thus min[0] must be less-than-or-equal-to max[0]. // The IndexRect is a default function that can be used for the rect // parameter. +// +// Deprecated: Use Transactions func (db *DB) CreateSpatialIndex(name, pattern string, rect func(item string) (min, max []float64)) error { - return db.createIndex(false, name, pattern, nil, rect) + return db.Update(func(tx *Tx) error { + return tx.CreateSpatialIndex(name, pattern, rect) + }) } // ReplaceSpatialIndex builds a new index and populates it with items. // The items are organized in an r-tree and can be retrieved using the // Intersects method. // If a previous index with the same name exists, that index will be deleted. 
+// +// Deprecated: Use Transactions func (db *DB) ReplaceSpatialIndex(name, pattern string, rect func(item string) (min, max []float64)) error { - return db.createIndex(true, name, pattern, nil, rect) -} - -// createIndex is called by CreateIndex() and CreateSpatialIndex() -func (db *DB) createIndex(replace bool, name string, pattern string, - lessers []func(a, b string) bool, - rect func(item string) (min, max []float64), -) error { - db.mu.Lock() - defer db.mu.Unlock() - if db.closed { - return ErrDatabaseClosed - } - if name == "" { - // cannot create an index without a name. - // an empty name index is designated for the main "keys" tree. - return ErrIndexExists - } - // check if an index with that name already exists. - if _, ok := db.idxs[name]; ok { - if replace { - // the "replace" param is specified, simply delete the old index. - delete(db.idxs, name) - } else { - // index with name already exists. error. - return ErrIndexExists - } - } - // genreate a less function - var less func(a, b string) bool - switch len(lessers) { - default: - // multiple less functions specified. - // create a compound less function. 
- less = func(a, b string) bool { - for i := 0; i < len(lessers)-1; i++ { - if lessers[i](a, b) { - return true - } - if lessers[i](b, a) { - return false + return db.Update(func(tx *Tx) error { + err := tx.CreateSpatialIndex(name, pattern, rect) + if err != nil { + if err == ErrIndexExists { + err := tx.DropIndex(name) + if err != nil { + return err } + return tx.CreateSpatialIndex(name, pattern, rect) } - return lessers[len(lessers)-1](a, b) + return err } - case 0: - // no less function - case 1: - less = lessers[0] - } - // intialize new index - idx := &index{ - name: name, - pattern: pattern, - less: less, - rect: rect, - db: db, - } - // initialize trees - if less != nil { - idx.btr = btree.New(btreeDegrees, idx) - } - if rect != nil { - idx.rtr = rtree.New(idx) - } - // iterate through all keys and fill the index - db.keys.Ascend(func(item btree.Item) bool { - dbi := item.(*dbItem) - if idx.pattern != "*" && !match.Match(dbi.key, idx.pattern) { - // does not match the pattern, conintue - return true - } - if less != nil { - idx.btr.ReplaceOrInsert(dbi) - } - if rect != nil { - idx.rtr.Insert(dbi) - } - return true + return nil }) - // save the index - db.idxs[name] = idx - return nil } // DropIndex removes an index. +// +// Deprecated: Use Transactions func (db *DB) DropIndex(name string) error { - db.mu.Lock() - defer db.mu.Unlock() - if db.closed { - return ErrDatabaseClosed - } - if name == "" { - // cannot drop the default "keys" index - return ErrInvalidOperation - } - if _, ok := db.idxs[name]; !ok { - return ErrNotFound - } - // delete from the map. - // this is all that is needed to delete an index. - delete(db.idxs, name) - return nil + return db.Update(func(tx *Tx) error { + return tx.DropIndex(name) + }) } // Indexes returns a list of index names. 
+// +// Deprecated: Use Transactions func (db *DB) Indexes() ([]string, error) { - db.mu.RLock() - defer db.mu.RUnlock() - if db.closed { - return nil, ErrDatabaseClosed - } - names := make([]string, 0, len(db.idxs)) - for name := range db.idxs { - names = append(names, name) - } - sort.Strings(names) - return names, nil + var names []string + var err = db.View(func(tx *Tx) error { + var err error + names, err = tx.Indexes() + return err + }) + return names, err } // ReadConfig returns the database configuration. @@ -985,9 +953,10 @@ type txWriteContext struct { rbexps *btree.BTree // a tree of items ordered by expiration rbidxs map[string]*index // the index trees. - rollbacks map[string]*dbItem // cotnains details for rolling back tx. - commits map[string]*dbItem // contains details for committing tx. - itercount int // stack of iterators + rollbackItems map[string]*dbItem // details for rolling back tx. + commitItems map[string]*dbItem // details for committing tx. + itercount int // stack of iterators + rollbackIndexes map[string]*index // details for dropped indexes. } // ClearAll deletes all items from the database. @@ -1019,7 +988,7 @@ func (tx *Tx) DeleteAll() error { } // always clear out the commits - tx.wc.commits = make(map[string]*dbItem) + tx.wc.commitItems = make(map[string]*dbItem) return nil } @@ -1042,10 +1011,13 @@ func (db *DB) begin(writable bool) (*Tx, error) { return nil, ErrDatabaseClosed } if writable { + // writable transactions have a writeContext object that + // contains information about changes to the database. 
tx.wc = &txWriteContext{} - tx.wc.rollbacks = make(map[string]*dbItem) + tx.wc.rollbackItems = make(map[string]*dbItem) + tx.wc.rollbackIndexes = make(map[string]*index) if db.persist { - tx.wc.commits = make(map[string]*dbItem) + tx.wc.commitItems = make(map[string]*dbItem) } } return tx, nil @@ -1078,7 +1050,7 @@ func (tx *Tx) rollbackInner() { tx.db.idxs = tx.wc.rbidxs tx.db.exps = tx.wc.rbexps } - for key, item := range tx.wc.rollbacks { + for key, item := range tx.wc.rollbackItems { tx.db.deleteFromDatabase(&dbItem{key: key}) if item != nil { // When an item is not nil, we will need to reinsert that item @@ -1086,6 +1058,16 @@ func (tx *Tx) rollbackInner() { tx.db.insertIntoDatabase(item) } } + for name, idx := range tx.wc.rollbackIndexes { + delete(tx.db.idxs, name) + if idx != nil { + // When an index is not nil, we will need to rebuild that index + // this could be an expensive process if the database has many + // items or the index is complex. + tx.db.idxs[name] = idx + idx.rebuild() + } + } } // commit writes all changes to disk. @@ -1101,14 +1083,14 @@ func (tx *Tx) commit() error { return ErrTxNotWritable } var err error - if tx.db.persist && (len(tx.wc.commits) > 0 || tx.wc.rbkeys != nil) { + if tx.db.persist && (len(tx.wc.commitItems) > 0 || tx.wc.rbkeys != nil) { tx.db.buf.Reset() // write a flushdb if a deleteAll was called. if tx.wc.rbkeys != nil { tx.db.buf.WriteString("*1\r\n$7\r\nflushdb\r\n") } // Each committed record is written to disk - for key, item := range tx.wc.commits { + for key, item := range tx.wc.commitItems { if item == nil { (&dbItem{key: key}).writeDeleteTo(tx.db.buf) } else { @@ -1348,14 +1330,14 @@ func (tx *Tx) Set(key, value string, opts *SetOptions) (previousValue string, // create a rollback entry with a nil value. A nil value indicates // that the entry should be deleted on rollback. When the value is // *not* nil, that means the entry should be reverted. 
- tx.wc.rollbacks[key] = nil + tx.wc.rollbackItems[key] = nil } else { // A previous item already exists in the database. Let's create a // rollback entry with the item as the value. We need to check the // map to see if there isn't already an item that matches the // same key. - if _, ok := tx.wc.rollbacks[key]; !ok { - tx.wc.rollbacks[key] = prev + if _, ok := tx.wc.rollbackItems[key]; !ok { + tx.wc.rollbackItems[key] = prev } if !prev.expired() { previousValue, replaced = prev.val, true @@ -1365,7 +1347,7 @@ func (tx *Tx) Set(key, value string, opts *SetOptions) (previousValue string, // For commits we simply assign the item to the map. We use this map to // write the entry to disk. if tx.db.persist { - tx.wc.commits[key] = item + tx.wc.commitItems[key] = item } return previousValue, replaced, nil } @@ -1404,12 +1386,12 @@ func (tx *Tx) Delete(key string) (val string, err error) { } // create a rollback entry if there has not been a deleteAll call. if tx.wc.rbkeys == nil { - if _, ok := tx.wc.rollbacks[key]; !ok { - tx.wc.rollbacks[key] = item + if _, ok := tx.wc.rollbackItems[key]; !ok { + tx.wc.rollbackItems[key] = item } } if tx.db.persist { - tx.wc.commits[key] = nil + tx.wc.commitItems[key] = nil } // Even though the item has been deleted, we still want to check // if it has expired. An expired item should not be returned. @@ -1741,6 +1723,154 @@ func (tx *Tx) Len() (int, error) { return tx.db.keys.Len(), nil } +// CreateIndex builds a new index and populates it with items. +// The items are ordered in an b-tree and can be retrieved using the +// Ascend* and Descend* methods. +// An error will occur if an index with the same name already exists. +// +// When a pattern is provided, the index will be populated with +// keys that match the specified pattern. This is a very simple pattern +// match where '*' matches on any number characters and '?' matches on +// any one character. +// The less function compares if string 'a' is less than string 'b'. 
+// It allows for indexes to create custom ordering. It's possible +// that the strings may be textual or binary. It's up to the provided +// less function to handle the content format and comparison. +// There are some default less function that can be used such as +// IndexString, IndexBinary, etc. +func (tx *Tx) CreateIndex(name, pattern string, + less ...func(a, b string) bool) error { + return tx.createIndex(name, pattern, less, nil) +} + +// CreateSpatialIndex builds a new index and populates it with items. +// The items are organized in an r-tree and can be retrieved using the +// Intersects method. +// An error will occur if an index with the same name already exists. +// +// The rect function converts a string to a rectangle. The rectangle is +// represented by two arrays, min and max. Both arrays may have a length +// between 1 and 20, and both arrays must match in length. A length of 1 is a +// one dimensional rectangle, and a length of 4 is a four dimension rectangle. +// There is support for up to 20 dimensions. +// The values of min must be less than the values of max at the same dimension. +// Thus min[0] must be less-than-or-equal-to max[0]. +// The IndexRect is a default function that can be used for the rect +// parameter. +func (tx *Tx) CreateSpatialIndex(name, pattern string, + rect func(item string) (min, max []float64)) error { + return tx.createIndex(name, pattern, nil, rect) +} + +// createIndex is called by CreateIndex() and CreateSpatialIndex() +func (tx *Tx) createIndex(name string, pattern string, + lessers []func(a, b string) bool, + rect func(item string) (min, max []float64), +) error { + if tx.db == nil { + return ErrTxClosed + } else if !tx.writable { + return ErrTxNotWritable + } else if tx.wc.itercount > 0 { + return ErrTxIterating + } + if name == "" { + // cannot create an index without a name. + // an empty name index is designated for the main "keys" tree. 
+ return ErrIndexExists + } + // check if an index with that name already exists. + if _, ok := tx.db.idxs[name]; ok { + // index with name already exists. error. + return ErrIndexExists + } + // generate a less function + var less func(a, b string) bool + switch len(lessers) { + default: + // multiple less functions specified. + // create a compound less function. + less = func(a, b string) bool { + for i := 0; i < len(lessers)-1; i++ { + if lessers[i](a, b) { + return true + } + if lessers[i](b, a) { + return false + } + } + return lessers[len(lessers)-1](a, b) + } + case 0: + // no less function + case 1: + less = lessers[0] + } + // initialize new index + idx := &index{ + name: name, + pattern: pattern, + less: less, + rect: rect, + db: tx.db, + } + idx.rebuild() + // save the index + tx.db.idxs[name] = idx + if tx.wc.rbkeys == nil { + // store the index in the rollback map. + if _, ok := tx.wc.rollbackIndexes[name]; !ok { + // we use nil to indicate that the index should be removed upon rollback. + tx.wc.rollbackIndexes[name] = nil + } + } + return nil +} + +// DropIndex removes an index. +func (tx *Tx) DropIndex(name string) error { + if tx.db == nil { + return ErrTxClosed + } else if !tx.writable { + return ErrTxNotWritable + } else if tx.wc.itercount > 0 { + return ErrTxIterating + } + if name == "" { + // cannot drop the default "keys" index + return ErrInvalidOperation + } + idx, ok := tx.db.idxs[name] + if !ok { + return ErrNotFound + } + // delete from the map. + // this is all that is needed to delete an index. + delete(tx.db.idxs, name) + if tx.wc.rbkeys == nil { + // store the index in the rollback map. + if _, ok := tx.wc.rollbackIndexes[name]; !ok { + // we use a non-nil copy of the index without the data to indicate that the + // index should be rebuilt upon rollback. + tx.wc.rollbackIndexes[name] = idx.clearCopy() + } + } + return nil +} + +// Indexes returns a list of index names. 
+func (tx *Tx) Indexes() ([]string, error) { + if tx.db == nil { + return nil, ErrTxClosed + } + names := make([]string, 0, len(tx.db.idxs)) + for name := range tx.db.idxs { + names = append(names, name) + } + sort.Strings(names) + return names, nil +} + // Rect is helper function that returns a string representation // of a rect. IndexRect() is the reverse function and can be used // to generate a rect from a string. diff --git a/buntdb_test.go b/buntdb_test.go index 4bd7d24..1b34dd5 100644 --- a/buntdb_test.go +++ b/buntdb_test.go @@ -184,6 +184,167 @@ func TestMutatingIterator(t *testing.T) { } } +func TestIndexTransaction(t *testing.T) { + db := testOpen(t) + defer testClose(db) + var errFine = errors.New("this is fine") + ascend := func(tx *Tx, index string) ([]string, error) { + var vals []string + if err := tx.Ascend(index, func(key, val string) bool { + vals = append(vals, key, val) + return true + }); err != nil { + return nil, err + } + return vals, nil + } + ascendEqual := func(tx *Tx, index string, vals []string) error { + vals2, err := ascend(tx, index) + if err != nil { + return err + } + if len(vals) != len(vals2) { + return errors.New("invalid size match") + } + for i := 0; i < len(vals); i++ { + if vals[i] != vals2[i] { + return errors.New("invalid order") + } + } + return nil + } + // test creating an index and adding items + if err := db.Update(func(tx *Tx) error { + tx.Set("1", "3", nil) + tx.Set("2", "2", nil) + tx.Set("3", "1", nil) + if err := tx.CreateIndex("idx1", "*", IndexInt); err != nil { + return err + } + if err := ascendEqual(tx, "idx1", []string{"3", "1", "2", "2", "1", "3"}); err != nil { + return err + } + return nil + }); err != nil { + t.Fatal(err) + } + + // test to see if the items persisted from previous transaction + // test add item. + // test force rollback. 
+ if err := db.Update(func(tx *Tx) error { + if err := ascendEqual(tx, "idx1", []string{"3", "1", "2", "2", "1", "3"}); err != nil { + return err + } + tx.Set("4", "0", nil) + if err := ascendEqual(tx, "idx1", []string{"4", "0", "3", "1", "2", "2", "1", "3"}); err != nil { + return err + } + return errFine + }); err != errFine { + t.Fatalf("expected '%v', got '%v'", errFine, err) + } + + // test to see if the rollback happened + if err := db.View(func(tx *Tx) error { + if err := ascendEqual(tx, "idx1", []string{"3", "1", "2", "2", "1", "3"}); err != nil { + return err + } + return nil + }); err != nil { + t.Fatalf("expected '%v', got '%v'", nil, err) + } + + // del item, drop index, rollback + if err := db.Update(func(tx *Tx) error { + if err := tx.DropIndex("idx1"); err != nil { + return err + } + return errFine + }); err != errFine { + t.Fatalf("expected '%v', got '%v'", errFine, err) + } + + // test to see if the rollback happened + if err := db.View(func(tx *Tx) error { + if err := ascendEqual(tx, "idx1", []string{"3", "1", "2", "2", "1", "3"}); err != nil { + return err + } + return nil + }); err != nil { + t.Fatalf("expected '%v', got '%v'", nil, err) + } + + various := func(reterr error) error { + // del item 3, add index 2, add item 4, test index 1 and 2. + // flushdb, test index 1 and 2. 
+ // add item 1 and 2, add index 2 and 3, test index 2 and 3 + return db.Update(func(tx *Tx) error { + tx.Delete("3") + tx.CreateIndex("idx2", "*", IndexInt) + tx.Set("4", "0", nil) + if err := ascendEqual(tx, "idx1", []string{"4", "0", "2", "2", "1", "3"}); err != nil { + return fmt.Errorf("err: %v", err) + } + if err := ascendEqual(tx, "idx2", []string{"4", "0", "2", "2", "1", "3"}); err != nil { + return fmt.Errorf("err: %v", err) + } + tx.DeleteAll() + if err := ascendEqual(tx, "idx1", []string{}); err != nil { + return fmt.Errorf("err: %v", err) + } + if err := ascendEqual(tx, "idx2", []string{}); err != nil { + return fmt.Errorf("err: %v", err) + } + tx.Set("1", "3", nil) + tx.Set("2", "2", nil) + tx.CreateIndex("idx1", "*", IndexInt) + tx.CreateIndex("idx2", "*", IndexInt) + if err := ascendEqual(tx, "idx1", []string{"2", "2", "1", "3"}); err != nil { + return fmt.Errorf("err: %v", err) + } + if err := ascendEqual(tx, "idx2", []string{"2", "2", "1", "3"}); err != nil { + return fmt.Errorf("err: %v", err) + } + return reterr + }) + } + // various rollback + if err := various(errFine); err != errFine { + t.Fatalf("expected '%v', got '%v'", errFine, err) + } + // test to see if the rollback happened + if err := db.View(func(tx *Tx) error { + if err := ascendEqual(tx, "idx1", []string{"3", "1", "2", "2", "1", "3"}); err != nil { + return fmt.Errorf("err: %v", err) + } + if err := ascendEqual(tx, "idx2", []string{"3", "1", "2", "2", "1", "3"}); err != ErrNotFound { + return fmt.Errorf("err: %v", err) + } + return nil + }); err != nil { + t.Fatalf("expected '%v', got '%v'", nil, err) + } + + // various commit + if err := various(nil); err != nil { + t.Fatalf("expected '%v', got '%v'", nil, err) + } + + // test to see if the commit happened + if err := db.View(func(tx *Tx) error { + if err := ascendEqual(tx, "idx1", []string{"2", "2", "1", "3"}); err != nil { + return fmt.Errorf("err: %v", err) + } + if err := ascendEqual(tx, "idx2", []string{"2", "2", "1", "3"}); 
err != nil { + return fmt.Errorf("err: %v", err) + } + return nil + }); err != nil { + t.Fatalf("expected '%v', got '%v'", nil, err) + } +} + func TestDeleteAll(t *testing.T) { db := testOpen(t) defer testClose(db)