transactional indexes

This commit is contained in:
Josh Baker 2016-09-29 15:54:51 -07:00
parent 6cd540fffb
commit 454b94d04a
2 changed files with 417 additions and 126 deletions

382
buntdb.go
View File

@ -245,7 +245,6 @@ type index struct {
} }
// clearCopy creates a copy of the index, but with an empty dataset. // clearCopy creates a copy of the index, but with an empty dataset.
// This is used with the DeleteAll command.
func (idx *index) clearCopy() *index { func (idx *index) clearCopy() *index {
// copy the index meta information // copy the index meta information
nidx := &index{ nidx := &index{
@ -265,6 +264,32 @@ func (idx *index) clearCopy() *index {
return nidx return nidx
} }
// rebuild rebuilds the index
func (idx *index) rebuild() {
// initialize trees
if idx.less != nil {
idx.btr = btree.New(btreeDegrees, idx)
}
if idx.rect != nil {
idx.rtr = rtree.New(idx)
}
// iterate through all keys and fill the index
idx.db.keys.Ascend(func(item btree.Item) bool {
dbi := item.(*dbItem)
if idx.pattern != "*" && !match.Match(dbi.key, idx.pattern) {
// does not match the pattern, conintue
return true
}
if idx.less != nil {
idx.btr.ReplaceOrInsert(dbi)
}
if idx.rect != nil {
idx.rtr.Insert(dbi)
}
return true
})
}
// CreateIndex builds a new index and populates it with items. // CreateIndex builds a new index and populates it with items.
// The items are ordered in an b-tree and can be retrieved using the // The items are ordered in an b-tree and can be retrieved using the
// Ascend* and Descend* methods. // Ascend* and Descend* methods.
@ -280,18 +305,37 @@ func (idx *index) clearCopy() *index {
// less function to handle the content format and comparison. // less function to handle the content format and comparison.
// There are some default less function that can be used such as // There are some default less function that can be used such as
// IndexString, IndexBinary, etc. // IndexString, IndexBinary, etc.
//
// Deprecated: Use Transactions
func (db *DB) CreateIndex(name, pattern string, func (db *DB) CreateIndex(name, pattern string,
less ...func(a, b string) bool) error { less ...func(a, b string) bool) error {
return db.createIndex(false, name, pattern, less, nil) return db.Update(func(tx *Tx) error {
return tx.CreateIndex(name, pattern, less...)
})
} }
// ReplaceIndex builds a new index and populates it with items. // ReplaceIndex builds a new index and populates it with items.
// The items are ordered in an b-tree and can be retrieved using the // The items are ordered in an b-tree and can be retrieved using the
// Ascend* and Descend* methods. // Ascend* and Descend* methods.
// If a previous index with the same name exists, that index will be deleted. // If a previous index with the same name exists, that index will be deleted.
//
// Deprecated: Use Transactions
func (db *DB) ReplaceIndex(name, pattern string, func (db *DB) ReplaceIndex(name, pattern string,
less ...func(a, b string) bool) error { less ...func(a, b string) bool) error {
return db.createIndex(true, name, pattern, less, nil) return db.Update(func(tx *Tx) error {
err := tx.CreateIndex(name, pattern, less...)
if err != nil {
if err == ErrIndexExists {
err := tx.DropIndex(name)
if err != nil {
return err
}
return tx.CreateIndex(name, pattern, less...)
}
return err
}
return nil
})
} }
// CreateSpatialIndex builds a new index and populates it with items. // CreateSpatialIndex builds a new index and populates it with items.
@ -308,135 +352,59 @@ func (db *DB) ReplaceIndex(name, pattern string,
// Thus min[0] must be less-than-or-equal-to max[0]. // Thus min[0] must be less-than-or-equal-to max[0].
// The IndexRect is a default function that can be used for the rect // The IndexRect is a default function that can be used for the rect
// parameter. // parameter.
//
// Deprecated: Use Transactions
func (db *DB) CreateSpatialIndex(name, pattern string, func (db *DB) CreateSpatialIndex(name, pattern string,
rect func(item string) (min, max []float64)) error { rect func(item string) (min, max []float64)) error {
return db.createIndex(false, name, pattern, nil, rect) return db.Update(func(tx *Tx) error {
return tx.CreateSpatialIndex(name, pattern, rect)
})
} }
// ReplaceSpatialIndex builds a new index and populates it with items. // ReplaceSpatialIndex builds a new index and populates it with items.
// The items are organized in an r-tree and can be retrieved using the // The items are organized in an r-tree and can be retrieved using the
// Intersects method. // Intersects method.
// If a previous index with the same name exists, that index will be deleted. // If a previous index with the same name exists, that index will be deleted.
//
// Deprecated: Use Transactions
func (db *DB) ReplaceSpatialIndex(name, pattern string, func (db *DB) ReplaceSpatialIndex(name, pattern string,
rect func(item string) (min, max []float64)) error { rect func(item string) (min, max []float64)) error {
return db.createIndex(true, name, pattern, nil, rect) return db.Update(func(tx *Tx) error {
} err := tx.CreateSpatialIndex(name, pattern, rect)
if err != nil {
// createIndex is called by CreateIndex() and CreateSpatialIndex() if err == ErrIndexExists {
func (db *DB) createIndex(replace bool, name string, pattern string, err := tx.DropIndex(name)
lessers []func(a, b string) bool, if err != nil {
rect func(item string) (min, max []float64), return err
) error {
db.mu.Lock()
defer db.mu.Unlock()
if db.closed {
return ErrDatabaseClosed
}
if name == "" {
// cannot create an index without a name.
// an empty name index is designated for the main "keys" tree.
return ErrIndexExists
}
// check if an index with that name already exists.
if _, ok := db.idxs[name]; ok {
if replace {
// the "replace" param is specified, simply delete the old index.
delete(db.idxs, name)
} else {
// index with name already exists. error.
return ErrIndexExists
}
}
// genreate a less function
var less func(a, b string) bool
switch len(lessers) {
default:
// multiple less functions specified.
// create a compound less function.
less = func(a, b string) bool {
for i := 0; i < len(lessers)-1; i++ {
if lessers[i](a, b) {
return true
}
if lessers[i](b, a) {
return false
} }
return tx.CreateSpatialIndex(name, pattern, rect)
} }
return lessers[len(lessers)-1](a, b) return err
} }
case 0: return nil
// no less function
case 1:
less = lessers[0]
}
// intialize new index
idx := &index{
name: name,
pattern: pattern,
less: less,
rect: rect,
db: db,
}
// initialize trees
if less != nil {
idx.btr = btree.New(btreeDegrees, idx)
}
if rect != nil {
idx.rtr = rtree.New(idx)
}
// iterate through all keys and fill the index
db.keys.Ascend(func(item btree.Item) bool {
dbi := item.(*dbItem)
if idx.pattern != "*" && !match.Match(dbi.key, idx.pattern) {
// does not match the pattern, conintue
return true
}
if less != nil {
idx.btr.ReplaceOrInsert(dbi)
}
if rect != nil {
idx.rtr.Insert(dbi)
}
return true
}) })
// save the index
db.idxs[name] = idx
return nil
} }
// DropIndex removes an index. // DropIndex removes an index.
//
// Deprecated: Use Transactions
func (db *DB) DropIndex(name string) error { func (db *DB) DropIndex(name string) error {
db.mu.Lock() return db.Update(func(tx *Tx) error {
defer db.mu.Unlock() return tx.DropIndex(name)
if db.closed { })
return ErrDatabaseClosed
}
if name == "" {
// cannot drop the default "keys" index
return ErrInvalidOperation
}
if _, ok := db.idxs[name]; !ok {
return ErrNotFound
}
// delete from the map.
// this is all that is needed to delete an index.
delete(db.idxs, name)
return nil
} }
// Indexes returns a list of index names. // Indexes returns a list of index names.
//
// Deprecated: Use Transactions
func (db *DB) Indexes() ([]string, error) { func (db *DB) Indexes() ([]string, error) {
db.mu.RLock() var names []string
defer db.mu.RUnlock() var err = db.View(func(tx *Tx) error {
if db.closed { var err error
return nil, ErrDatabaseClosed names, err = tx.Indexes()
} return err
names := make([]string, 0, len(db.idxs)) })
for name := range db.idxs { return names, err
names = append(names, name)
}
sort.Strings(names)
return names, nil
} }
// ReadConfig returns the database configuration. // ReadConfig returns the database configuration.
@ -985,9 +953,10 @@ type txWriteContext struct {
rbexps *btree.BTree // a tree of items ordered by expiration rbexps *btree.BTree // a tree of items ordered by expiration
rbidxs map[string]*index // the index trees. rbidxs map[string]*index // the index trees.
rollbacks map[string]*dbItem // cotnains details for rolling back tx. rollbackItems map[string]*dbItem // details for rolling back tx.
commits map[string]*dbItem // contains details for committing tx. commitItems map[string]*dbItem // details for committing tx.
itercount int // stack of iterators itercount int // stack of iterators
rollbackIndexes map[string]*index // details for dropped indexes.
} }
// ClearAll deletes all items from the database. // ClearAll deletes all items from the database.
@ -1019,7 +988,7 @@ func (tx *Tx) DeleteAll() error {
} }
// always clear out the commits // always clear out the commits
tx.wc.commits = make(map[string]*dbItem) tx.wc.commitItems = make(map[string]*dbItem)
return nil return nil
} }
@ -1042,10 +1011,13 @@ func (db *DB) begin(writable bool) (*Tx, error) {
return nil, ErrDatabaseClosed return nil, ErrDatabaseClosed
} }
if writable { if writable {
// writable transactions have a writeContext object that
// contains information about changes to the database.
tx.wc = &txWriteContext{} tx.wc = &txWriteContext{}
tx.wc.rollbacks = make(map[string]*dbItem) tx.wc.rollbackItems = make(map[string]*dbItem)
tx.wc.rollbackIndexes = make(map[string]*index)
if db.persist { if db.persist {
tx.wc.commits = make(map[string]*dbItem) tx.wc.commitItems = make(map[string]*dbItem)
} }
} }
return tx, nil return tx, nil
@ -1078,7 +1050,7 @@ func (tx *Tx) rollbackInner() {
tx.db.idxs = tx.wc.rbidxs tx.db.idxs = tx.wc.rbidxs
tx.db.exps = tx.wc.rbexps tx.db.exps = tx.wc.rbexps
} }
for key, item := range tx.wc.rollbacks { for key, item := range tx.wc.rollbackItems {
tx.db.deleteFromDatabase(&dbItem{key: key}) tx.db.deleteFromDatabase(&dbItem{key: key})
if item != nil { if item != nil {
// When an item is not nil, we will need to reinsert that item // When an item is not nil, we will need to reinsert that item
@ -1086,6 +1058,16 @@ func (tx *Tx) rollbackInner() {
tx.db.insertIntoDatabase(item) tx.db.insertIntoDatabase(item)
} }
} }
for name, idx := range tx.wc.rollbackIndexes {
delete(tx.db.idxs, name)
if idx != nil {
// When an index is not nil, we will need to rebuilt that index
// this could be an expensive process if the database has many
// items or the index is complex.
tx.db.idxs[name] = idx
idx.rebuild()
}
}
} }
// commit writes all changes to disk. // commit writes all changes to disk.
@ -1101,14 +1083,14 @@ func (tx *Tx) commit() error {
return ErrTxNotWritable return ErrTxNotWritable
} }
var err error var err error
if tx.db.persist && (len(tx.wc.commits) > 0 || tx.wc.rbkeys != nil) { if tx.db.persist && (len(tx.wc.commitItems) > 0 || tx.wc.rbkeys != nil) {
tx.db.buf.Reset() tx.db.buf.Reset()
// write a flushdb if a deleteAll was called. // write a flushdb if a deleteAll was called.
if tx.wc.rbkeys != nil { if tx.wc.rbkeys != nil {
tx.db.buf.WriteString("*1\r\n$7\r\nflushdb\r\n") tx.db.buf.WriteString("*1\r\n$7\r\nflushdb\r\n")
} }
// Each committed record is written to disk // Each committed record is written to disk
for key, item := range tx.wc.commits { for key, item := range tx.wc.commitItems {
if item == nil { if item == nil {
(&dbItem{key: key}).writeDeleteTo(tx.db.buf) (&dbItem{key: key}).writeDeleteTo(tx.db.buf)
} else { } else {
@ -1348,14 +1330,14 @@ func (tx *Tx) Set(key, value string, opts *SetOptions) (previousValue string,
// create a rollback entry with a nil value. A nil value indicates // create a rollback entry with a nil value. A nil value indicates
// that the entry should be deleted on rollback. When the value is // that the entry should be deleted on rollback. When the value is
// *not* nil, that means the entry should be reverted. // *not* nil, that means the entry should be reverted.
tx.wc.rollbacks[key] = nil tx.wc.rollbackItems[key] = nil
} else { } else {
// A previous item already exists in the database. Let's create a // A previous item already exists in the database. Let's create a
// rollback entry with the item as the value. We need to check the // rollback entry with the item as the value. We need to check the
// map to see if there isn't already an item that matches the // map to see if there isn't already an item that matches the
// same key. // same key.
if _, ok := tx.wc.rollbacks[key]; !ok { if _, ok := tx.wc.rollbackItems[key]; !ok {
tx.wc.rollbacks[key] = prev tx.wc.rollbackItems[key] = prev
} }
if !prev.expired() { if !prev.expired() {
previousValue, replaced = prev.val, true previousValue, replaced = prev.val, true
@ -1365,7 +1347,7 @@ func (tx *Tx) Set(key, value string, opts *SetOptions) (previousValue string,
// For commits we simply assign the item to the map. We use this map to // For commits we simply assign the item to the map. We use this map to
// write the entry to disk. // write the entry to disk.
if tx.db.persist { if tx.db.persist {
tx.wc.commits[key] = item tx.wc.commitItems[key] = item
} }
return previousValue, replaced, nil return previousValue, replaced, nil
} }
@ -1404,12 +1386,12 @@ func (tx *Tx) Delete(key string) (val string, err error) {
} }
// create a rollback entry if there has not been a deleteAll call. // create a rollback entry if there has not been a deleteAll call.
if tx.wc.rbkeys == nil { if tx.wc.rbkeys == nil {
if _, ok := tx.wc.rollbacks[key]; !ok { if _, ok := tx.wc.rollbackItems[key]; !ok {
tx.wc.rollbacks[key] = item tx.wc.rollbackItems[key] = item
} }
} }
if tx.db.persist { if tx.db.persist {
tx.wc.commits[key] = nil tx.wc.commitItems[key] = nil
} }
// Even though the item has been deleted, we still want to check // Even though the item has been deleted, we still want to check
// if it has expired. An expired item should not be returned. // if it has expired. An expired item should not be returned.
@ -1741,6 +1723,154 @@ func (tx *Tx) Len() (int, error) {
return tx.db.keys.Len(), nil return tx.db.keys.Len(), nil
} }
// CreateIndex builds a new index and populates it with items.
// The items are ordered in an b-tree and can be retrieved using the
// Ascend* and Descend* methods.
// An error will occur if an index with the same name already exists.
//
// When a pattern is provided, the index will be populated with
// keys that match the specified pattern. This is a very simple pattern
// match where '*' matches on any number characters and '?' matches on
// any one character.
// The less function compares if string 'a' is less than string 'b'.
// It allows for indexes to create custom ordering. It's possible
// that the strings may be textual or binary. It's up to the provided
// less function to handle the content format and comparison.
// There are some default less function that can be used such as
// IndexString, IndexBinary, etc.
func (tx *Tx) CreateIndex(name, pattern string,
less ...func(a, b string) bool) error {
return tx.createIndex(name, pattern, less, nil)
}
// CreateSpatialIndex builds a new index and populates it with items.
// The items are organized in an r-tree and can be retrieved using the
// Intersects method.
// An error will occur if an index with the same name already exists.
//
// The rect function converts a string to a rectangle. The rectangle is
// represented by two arrays, min and max. Both arrays may have a length
// between 1 and 20, and both arrays must match in length. A length of 1 is a
// one dimensional rectangle, and a length of 4 is a four dimension rectangle.
// There is support for up to 20 dimensions.
// The values of min must be less than the values of max at the same dimension.
// Thus min[0] must be less-than-or-equal-to max[0].
// The IndexRect is a default function that can be used for the rect
// parameter.
func (tx *Tx) CreateSpatialIndex(name, pattern string,
rect func(item string) (min, max []float64)) error {
return tx.createIndex(name, pattern, nil, rect)
}
// createIndex is called by CreateIndex() and CreateSpatialIndex()
func (tx *Tx) createIndex(name string, pattern string,
lessers []func(a, b string) bool,
rect func(item string) (min, max []float64),
) error {
if tx.db == nil {
return ErrTxClosed
} else if !tx.writable {
return ErrTxNotWritable
} else if tx.wc.itercount > 0 {
return ErrTxIterating
}
if name == "" {
// cannot create an index without a name.
// an empty name index is designated for the main "keys" tree.
return ErrIndexExists
}
// check if an index with that name already exists.
if _, ok := tx.db.idxs[name]; ok {
// index with name already exists. error.
return ErrIndexExists
}
// genreate a less function
var less func(a, b string) bool
switch len(lessers) {
default:
// multiple less functions specified.
// create a compound less function.
less = func(a, b string) bool {
for i := 0; i < len(lessers)-1; i++ {
if lessers[i](a, b) {
return true
}
if lessers[i](b, a) {
return false
}
}
return lessers[len(lessers)-1](a, b)
}
case 0:
// no less function
case 1:
less = lessers[0]
}
// intialize new index
idx := &index{
name: name,
pattern: pattern,
less: less,
rect: rect,
db: tx.db,
}
idx.rebuild()
// save the index
tx.db.idxs[name] = idx
if tx.wc.rbkeys == nil {
// store the index in the rollback map.
if _, ok := tx.wc.rollbackIndexes[name]; !ok {
// we use nil to indicate that the index should be removed upon rollback.
tx.wc.rollbackIndexes[name] = nil
}
}
return nil
}
// DropIndex removes an index.
func (tx *Tx) DropIndex(name string) error {
if tx.db == nil {
return ErrTxClosed
} else if !tx.writable {
return ErrTxNotWritable
} else if tx.wc.itercount > 0 {
return ErrTxIterating
}
if name == "" {
// cannot drop the default "keys" index
return ErrInvalidOperation
}
idx, ok := tx.db.idxs[name]
if !ok {
return ErrNotFound
}
// delete from the map.
// this is all that is needed to delete an index.
delete(tx.db.idxs, name)
if tx.wc.rbkeys == nil {
// store the index in the rollback map.
if _, ok := tx.wc.rollbackIndexes[name]; !ok {
// we use a non-nil copy of the index without the data to indicate that the
// index should be rebuilt upon rollback.
tx.wc.rollbackIndexes[name] = idx.clearCopy()
}
}
return nil
}
// Indexes returns a list of index names.
func (tx *Tx) Indexes() ([]string, error) {
if tx.db == nil {
return nil, ErrTxClosed
}
names := make([]string, 0, len(tx.db.idxs))
for name := range tx.db.idxs {
names = append(names, name)
}
sort.Strings(names)
return names, nil
}
// Rect is helper function that returns a string representation // Rect is helper function that returns a string representation
// of a rect. IndexRect() is the reverse function and can be used // of a rect. IndexRect() is the reverse function and can be used
// to generate a rect from a string. // to generate a rect from a string.

View File

@ -184,6 +184,167 @@ func TestMutatingIterator(t *testing.T) {
} }
} }
func TestIndexTransaction(t *testing.T) {
db := testOpen(t)
defer testClose(db)
var errFine = errors.New("this is fine")
ascend := func(tx *Tx, index string) ([]string, error) {
var vals []string
if err := tx.Ascend(index, func(key, val string) bool {
vals = append(vals, key, val)
return true
}); err != nil {
return nil, err
}
return vals, nil
}
ascendEqual := func(tx *Tx, index string, vals []string) error {
vals2, err := ascend(tx, index)
if err != nil {
return err
}
if len(vals) != len(vals2) {
return errors.New("invalid size match")
}
for i := 0; i < len(vals); i++ {
if vals[i] != vals2[i] {
return errors.New("invalid order")
}
}
return nil
}
// test creating an index and adding items
if err := db.Update(func(tx *Tx) error {
tx.Set("1", "3", nil)
tx.Set("2", "2", nil)
tx.Set("3", "1", nil)
if err := tx.CreateIndex("idx1", "*", IndexInt); err != nil {
return err
}
if err := ascendEqual(tx, "idx1", []string{"3", "1", "2", "2", "1", "3"}); err != nil {
return err
}
return nil
}); err != nil {
t.Fatal(err)
}
// test to see if the items persisted from previous transaction
// test add item.
// test force rollback.
if err := db.Update(func(tx *Tx) error {
if err := ascendEqual(tx, "idx1", []string{"3", "1", "2", "2", "1", "3"}); err != nil {
return err
}
tx.Set("4", "0", nil)
if err := ascendEqual(tx, "idx1", []string{"4", "0", "3", "1", "2", "2", "1", "3"}); err != nil {
return err
}
return errFine
}); err != errFine {
t.Fatalf("expected '%v', got '%v'", errFine, err)
}
// test to see if the rollback happened
if err := db.View(func(tx *Tx) error {
if err := ascendEqual(tx, "idx1", []string{"3", "1", "2", "2", "1", "3"}); err != nil {
return err
}
return nil
}); err != nil {
t.Fatalf("expected '%v', got '%v'", nil, err)
}
// del item, drop index, rollback
if err := db.Update(func(tx *Tx) error {
if err := tx.DropIndex("idx1"); err != nil {
return err
}
return errFine
}); err != errFine {
t.Fatalf("expected '%v', got '%v'", errFine, err)
}
// test to see if the rollback happened
if err := db.View(func(tx *Tx) error {
if err := ascendEqual(tx, "idx1", []string{"3", "1", "2", "2", "1", "3"}); err != nil {
return err
}
return nil
}); err != nil {
t.Fatalf("expected '%v', got '%v'", nil, err)
}
various := func(reterr error) error {
// del item 3, add index 2, add item 4, test index 1 and 2.
// flushdb, test index 1 and 2.
// add item 1 and 2, add index 2 and 3, test index 2 and 3
return db.Update(func(tx *Tx) error {
tx.Delete("3")
tx.CreateIndex("idx2", "*", IndexInt)
tx.Set("4", "0", nil)
if err := ascendEqual(tx, "idx1", []string{"4", "0", "2", "2", "1", "3"}); err != nil {
return fmt.Errorf("err: %v", err)
}
if err := ascendEqual(tx, "idx2", []string{"4", "0", "2", "2", "1", "3"}); err != nil {
return fmt.Errorf("err: %v", err)
}
tx.DeleteAll()
if err := ascendEqual(tx, "idx1", []string{}); err != nil {
return fmt.Errorf("err: %v", err)
}
if err := ascendEqual(tx, "idx2", []string{}); err != nil {
return fmt.Errorf("err: %v", err)
}
tx.Set("1", "3", nil)
tx.Set("2", "2", nil)
tx.CreateIndex("idx1", "*", IndexInt)
tx.CreateIndex("idx2", "*", IndexInt)
if err := ascendEqual(tx, "idx1", []string{"2", "2", "1", "3"}); err != nil {
return fmt.Errorf("err: %v", err)
}
if err := ascendEqual(tx, "idx2", []string{"2", "2", "1", "3"}); err != nil {
return fmt.Errorf("err: %v", err)
}
return reterr
})
}
// various rollback
if err := various(errFine); err != errFine {
t.Fatalf("expected '%v', got '%v'", errFine, err)
}
// test to see if the rollback happened
if err := db.View(func(tx *Tx) error {
if err := ascendEqual(tx, "idx1", []string{"3", "1", "2", "2", "1", "3"}); err != nil {
return fmt.Errorf("err: %v", err)
}
if err := ascendEqual(tx, "idx2", []string{"3", "1", "2", "2", "1", "3"}); err != ErrNotFound {
return fmt.Errorf("err: %v", err)
}
return nil
}); err != nil {
t.Fatalf("expected '%v', got '%v'", nil, err)
}
// various commit
if err := various(nil); err != nil {
t.Fatalf("expected '%v', got '%v'", nil, err)
}
// test to see if the commit happened
if err := db.View(func(tx *Tx) error {
if err := ascendEqual(tx, "idx1", []string{"2", "2", "1", "3"}); err != nil {
return fmt.Errorf("err: %v", err)
}
if err := ascendEqual(tx, "idx2", []string{"2", "2", "1", "3"}); err != nil {
return fmt.Errorf("err: %v", err)
}
return nil
}); err != nil {
t.Fatalf("expected '%v', got '%v'", nil, err)
}
}
func TestDeleteAll(t *testing.T) { func TestDeleteAll(t *testing.T) {
db := testOpen(t) db := testOpen(t)
defer testClose(db) defer testClose(db)