optional no-persist with :memory: path. benchmarks

This commit is contained in:
Josh Baker 2016-07-23 10:20:38 -07:00
parent fe40e74afd
commit d121dd81dd
3 changed files with 395 additions and 17 deletions

View File

@ -79,6 +79,12 @@ func main() {
}
```
It's also possible to open a database that does not persist to disk by using `:memory:` as the path of the file.
```go
buntdb.Open(":memory:") // Open a file that does not persist to disk.
```
## Transactions
All reads and writes must be performed from inside a transaction. BuntDB can have one write transaction opened at a time, but can have many concurrent read transactions. Each transaction maintains a stable view of the database. In other words, once a transaction has begun, the data for that transaction cannot be changed by other transactions.

View File

@ -25,21 +25,29 @@ var (
// ErrTxNotWritable is returned when performing a write operation on a
// read-only transaction.
ErrTxNotWritable = errors.New("tx not writable")
// ErrTxClosed is returned when committing or rolling back a transaction
// that has already been committed or rolled back.
ErrTxClosed = errors.New("tx closed")
// ErrNotFound is returned when an item or index is not in the database.
ErrNotFound = errors.New("not found")
// ErrInvalid is returned when the database file is an invalid format.
ErrInvalid = errors.New("invalid database")
// ErrDatabaseClosed is returned when the database is closed.
ErrDatabaseClosed = errors.New("database closed")
// ErrIndexExists is returned when an index already exists in the database.
ErrIndexExists = errors.New("index exists")
// ErrInvalidOperation is returned when an operation cannot be completed.
ErrInvalidOperation = errors.New("invalid operation")
// ErrInvalidAutoShrink is returned for an invalid AutoShrink value.
ErrInvalidAutoShrink = errors.New("invalid autoshink")
// ErrInvalidSyncPolicy is returned for an invalid SyncPolicy value.
ErrInvalidSyncPolicy = errors.New("invalid sync policy")
)
@ -64,6 +72,7 @@ type DB struct {
closed bool // set when the database has been closed
aoflen int // the number of lines in the aof file
config Config // the database configuration
persist bool // do we write to disk
}
// SyncPolicy represents how often data is synced to disk.
@ -118,17 +127,20 @@ func Open(path string) (*DB, error) {
SyncPolicy: EverySecond,
AutoShrink: 5,
}
var err error
// Hardcoding 0666 as the default mode.
db.file, err = os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return nil, err
db.persist = path != ":memory:"
if db.persist {
var err error
// Hardcoding 0666 as the default mode.
db.file, err = os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return nil, err
}
if err := db.load(); err != nil {
db.file.Close()
return nil, err
}
db.bufw = bufio.NewWriter(db.file)
}
if err := db.load(); err != nil {
db.file.Close()
return nil, err
}
db.bufw = bufio.NewWriter(db.file)
// start the background manager.
go db.backgroundManager()
return db, nil
@ -143,7 +155,10 @@ func (db *DB) Close() error {
return ErrDatabaseClosed
}
db.closed = true
return db.file.Close()
if db.persist {
return db.file.Close()
}
return nil
}
// index represents a b-tree or r-tree index and also acts as the
@ -446,7 +461,8 @@ func (db *DB) backgroundManager() {
}
// execute a disk sync.
if db.config.SyncPolicy == EverySecond && flushes != db.flushes {
if db.persist && db.config.SyncPolicy == EverySecond &&
flushes != db.flushes {
db.file.Sync()
flushes = db.flushes
}
@ -461,13 +477,20 @@ func (db *DB) backgroundManager() {
}
}
// Shrink function description [here]
// Shrink will make the database file smaller by removing redundent
// log entries. This operation does not block the database.
func (db *DB) Shrink() error {
db.mu.Lock()
if db.closed {
db.mu.Unlock()
return ErrDatabaseClosed
}
if !db.persist {
// The database was opened with ":memory:" as the path.
// There is no persistence, and no need to do anything here.
db.mu.Unlock()
return nil
}
fname := db.file.Name()
tmpname := fname + ".tmp"
// the endpos is used to return to the end of the file when we are
@ -792,7 +815,9 @@ func (db *DB) begin(writable bool) (*Tx, error) {
}
if writable {
tx.rollbacks = make(map[string]*dbItem)
tx.commits = make(map[string]*dbItem)
if db.persist {
tx.commits = make(map[string]*dbItem)
}
}
tx.lock()
if db.closed {
@ -846,7 +871,7 @@ func (tx *Tx) commit() error {
return ErrTxNotWritable
}
var err error
if len(tx.commits) > 0 {
if tx.db.persist && len(tx.commits) > 0 {
// Each committed record is written to disk
for key, item := range tx.commits {
if item == nil {
@ -1050,7 +1075,9 @@ func (tx *Tx) Set(key, value string, opts *SetOptions) (previousValue string,
}
// For commits we simply assign the item to the map. We use this map to
// write the entry to disk.
tx.commits[key] = item
if tx.db.persist {
tx.commits[key] = item
}
return previousValue, replaced, nil
}
@ -1089,7 +1116,9 @@ func (tx *Tx) Delete(key string) (val string, err error) {
if _, ok := tx.rollbacks[key]; !ok {
tx.rollbacks[key] = item
}
tx.commits[key] = nil
if tx.db.persist {
tx.commits[key] = nil
}
// Even though the item has been deleted, we still want to check
// if it has expired. An expired item should not be returned.
if item.expired() {

View File

@ -8,6 +8,7 @@ import (
"io/ioutil"
"math/rand"
"os"
"strconv"
"strings"
"testing"
"time"
@ -517,6 +518,111 @@ func TestOpeningClosedDatabase(t *testing.T) {
if err := db.Close(); err != ErrDatabaseClosed {
t.Fatal("should not be able to close a closed database")
}
db, err = Open(":memory:")
if err != nil {
t.Fatal(err)
}
if err := db.Close(); err != nil {
t.Fatal(err)
}
if err := db.Close(); err != ErrDatabaseClosed {
t.Fatal("should not be able to close a closed database")
}
}
// test shrinking a database.
func TestShrink(t *testing.T) {
os.RemoveAll("data.db")
db, err := Open("data.db")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll("data.db")
defer db.Close()
if err := db.Shrink(); err != nil {
t.Fatal(err)
}
fi, err := os.Stat("data.db")
if err != nil {
t.Fatal(err)
}
if fi.Size() != 0 {
t.Fatalf("expected %v, got %v", 0, fi.Size())
}
// add 10 items
db.Update(func(tx *Tx) error {
for i := 0; i < 10; i++ {
tx.Set(fmt.Sprintf("key%d", i), fmt.Sprintf("val%d", i), nil)
}
return nil
})
// add the same 10 items
// this will create 10 duplicate log entries
db.Update(func(tx *Tx) error {
for i := 0; i < 10; i++ {
tx.Set(fmt.Sprintf("key%d", i), fmt.Sprintf("val%d", i), nil)
}
return nil
})
fi, err = os.Stat("data.db")
if err != nil {
t.Fatal(err)
}
sz1 := fi.Size()
if sz1 == 0 {
t.Fatalf("expected > 0, got %v", sz1)
}
if err := db.Shrink(); err != nil {
t.Fatal(err)
}
fi, err = os.Stat("data.db")
if err != nil {
t.Fatal(err)
}
sz2 := fi.Size()
if sz2 >= sz1 {
t.Fatalf("expected < %v, got %v", sz1, sz2)
}
db.Close()
if err := db.Shrink(); err != ErrDatabaseClosed {
t.Fatal("shrink on a closed databse should not be allowed")
}
// Now we will open a db that does not persist
db, err = Open(":memory:")
if err != nil {
t.Fatal(err)
}
defer db.Close()
// add 10 items
db.Update(func(tx *Tx) error {
for i := 0; i < 10; i++ {
tx.Set(fmt.Sprintf("key%d", i), fmt.Sprintf("val%d", i), nil)
}
return nil
})
// add the same 10 items
// this will create 10 duplicate log entries
db.Update(func(tx *Tx) error {
for i := 0; i < 10; i++ {
tx.Set(fmt.Sprintf("key%d", i), fmt.Sprintf("val%d", i), nil)
}
return nil
})
db.View(func(tx *Tx) error {
n, err := tx.Len()
if err != nil {
t.Fatal(err)
}
if n != 10 {
t.Fatalf("expecting %v, got %v", 10, n)
}
return nil
})
// this should succeed even though it's basically a noop.
if err := db.Shrink(); err != nil {
t.Fatal(err)
}
}
func TestVariousIndexOperations(t *testing.T) {
@ -875,3 +981,240 @@ func TestConfig(t *testing.T) {
t.Fatalf("expecting %v and %v, got %v and %v", 6, Always, c.AutoShrink, c.SyncPolicy)
}
}
func testUint64Hex(n uint64) string {
s := strconv.FormatUint(n, 16)
s = "0000000000000000" + s
return s[len(s)-16:]
}
func textHexUint64(s string) uint64 {
n, _ := strconv.ParseUint(s, 16, 64)
return n
}
func benchClose(t *testing.B, persist bool, db *DB) {
if persist {
os.RemoveAll("data.db")
}
db.Close()
}
func benchOpenFillData(t *testing.B, N int,
set, persist, random bool,
geo bool,
batch int) (db *DB, keys, vals []string) {
///
t.StopTimer()
rand.Seed(time.Now().UnixNano())
var err error
if persist {
os.RemoveAll("data.db")
db, err = Open("data.db")
} else {
db, err = Open(":memory:")
}
if err != nil {
t.Fatal(err)
}
keys = make([]string, N)
vals = make([]string, N)
perm := rand.Perm(N)
for i := 0; i < N; i++ {
if random && set {
keys[perm[i]] = testUint64Hex(uint64(i))
vals[perm[i]] = strconv.FormatInt(rand.Int63()%1000+1000, 10)
} else {
keys[i] = testUint64Hex(uint64(i))
vals[i] = strconv.FormatInt(rand.Int63()%1000+1000, 10)
}
}
if set {
t.StartTimer()
}
for i := 0; i < N; {
err := db.Update(func(tx *Tx) error {
var err error
for j := 0; j < batch && i < N; j++ {
_, _, err = tx.Set(keys[i], vals[i], nil)
i++
}
return err
})
if err != nil {
t.Fatal(err)
}
}
if set {
t.StopTimer()
}
var n uint64
err = db.View(func(tx *Tx) error {
err := tx.Ascend("", func(key, value string) bool {
n2 := textHexUint64(key)
if n2 != n {
t.Fatalf("expecting '%v', got '%v'", n2, n)
}
n++
return true
})
return err
})
if err != nil {
t.Fatal(err)
}
if n != uint64(N) {
t.Fatal("expecting '%v', got '%v'", N, n)
}
t.StartTimer()
return db, keys, vals
}
func benchSetGet(t *testing.B, set, persist, random bool, batch int) {
N := t.N
for N > 0 {
n := 0
if N >= 100000 {
n = 100000
} else {
n = N
}
func() {
db, keys, _ := benchOpenFillData(t, n, set, persist, random, false, batch)
defer benchClose(t, persist, db)
if !set {
for i := 0; i < n; {
err := db.View(func(tx *Tx) error {
var err error
for j := 0; j < batch && i < n; j++ {
_, err = tx.Get(keys[i])
i++
}
return err
})
if err != nil {
t.Fatal(err)
}
}
}
}()
N -= n
}
}
// Set Persist
func Benchmark_Set_Persist_Random_1(t *testing.B) {
benchSetGet(t, true, true, true, 1)
}
func Benchmark_Set_Persist_Random_10(t *testing.B) {
benchSetGet(t, true, true, true, 10)
}
func Benchmark_Set_Persist_Random_100(t *testing.B) {
benchSetGet(t, true, true, true, 100)
}
func Benchmark_Set_Persist_Sequential_1(t *testing.B) {
benchSetGet(t, true, true, false, 1)
}
func Benchmark_Set_Persist_Sequential_10(t *testing.B) {
benchSetGet(t, true, true, false, 10)
}
func Benchmark_Set_Persist_Sequential_100(t *testing.B) {
benchSetGet(t, true, true, false, 100)
}
// Set NoPersist
func Benchmark_Set_NoPersist_Random_1(t *testing.B) {
benchSetGet(t, true, false, true, 1)
}
func Benchmark_Set_NoPersist_Random_10(t *testing.B) {
benchSetGet(t, true, false, true, 10)
}
func Benchmark_Set_NoPersist_Random_100(t *testing.B) {
benchSetGet(t, true, false, true, 100)
}
func Benchmark_Set_NoPersist_Sequential_1(t *testing.B) {
benchSetGet(t, true, false, false, 1)
}
func Benchmark_Set_NoPersist_Sequential_10(t *testing.B) {
benchSetGet(t, true, false, false, 10)
}
func Benchmark_Set_NoPersist_Sequential_100(t *testing.B) {
benchSetGet(t, true, false, false, 100)
}
// Get
func Benchmark_Get_1(t *testing.B) {
benchSetGet(t, false, false, false, 1)
}
func Benchmark_Get_10(t *testing.B) {
benchSetGet(t, false, false, false, 10)
}
func Benchmark_Get_100(t *testing.B) {
benchSetGet(t, false, false, false, 100)
}
func benchScan(t *testing.B, asc bool, count int) {
N := count
db, _, _ := benchOpenFillData(t, N, false, false, false, false, 100)
defer benchClose(t, false, db)
for i := 0; i < t.N; i++ {
count := 0
err := db.View(func(tx *Tx) error {
if asc {
return tx.Ascend("", func(key, val string) bool {
count++
return true
})
} else {
return tx.Descend("", func(key, val string) bool {
count++
return true
})
}
})
if err != nil {
t.Fatal(err)
}
if count != N {
t.Fatalf("expecting '%v', got '%v'", N, count)
}
}
}
func Benchmark_Ascend_1(t *testing.B) {
benchScan(t, true, 1)
}
func Benchmark_Ascend_10(t *testing.B) {
benchScan(t, true, 10)
}
func Benchmark_Ascend_100(t *testing.B) {
benchScan(t, true, 100)
}
func Benchmark_Ascend_1000(t *testing.B) {
benchScan(t, true, 1000)
}
func Benchmark_Ascend_10000(t *testing.B) {
benchScan(t, true, 10000)
}
func Benchmark_Descend_1(t *testing.B) {
benchScan(t, false, 1)
}
func Benchmark_Descend_10(t *testing.B) {
benchScan(t, false, 10)
}
func Benchmark_Descend_100(t *testing.B) {
benchScan(t, false, 100)
}
func Benchmark_Descend_1000(t *testing.B) {
benchScan(t, false, 1000)
}
func Benchmark_Descend_10000(t *testing.B) {
benchScan(t, false, 10000)
}
/*
func Benchmark_Spatial_2D(t *testing.B) {
N := 100000
db, _, _ := benchOpenFillData(t, N, true, true, false, true, 100)
defer benchClose(t, false, db)
}
*/