tile38/vendor/github.com/tidwall/buntdb/buntdb.go

2129 lines
62 KiB
Go
Raw Normal View History

2016-09-12 07:25:09 +03:00
// Package buntdb implements a low-level in-memory key/value store in pure Go.
// It persists to disk, is ACID compliant, and uses locking for multiple
// readers and a single writer. Bunt is ideal for projects that need
// a dependable database, and favor speed over data size.
package buntdb
import (
"bufio"
"errors"
"io"
"os"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/tidwall/btree"
"github.com/tidwall/gjson"
"github.com/tidwall/grect"
2016-09-12 07:25:09 +03:00
"github.com/tidwall/match"
"github.com/tidwall/rtree"
)
// Sentinel errors returned by this package. They are plain error values and
// are intended to be compared directly against a returned error.
var (
	// ErrTxNotWritable is returned when performing a write operation on a
	// read-only transaction.
	ErrTxNotWritable = errors.New("tx not writable")
	// ErrTxClosed is returned when committing or rolling back a transaction
	// that has already been committed or rolled back.
	ErrTxClosed = errors.New("tx closed")
	// ErrNotFound is returned when an item or index is not in the database.
	ErrNotFound = errors.New("not found")
	// ErrInvalid is returned when the database file has an invalid format.
	ErrInvalid = errors.New("invalid database")
	// ErrDatabaseClosed is returned when the database is closed.
	ErrDatabaseClosed = errors.New("database closed")
	// ErrIndexExists is returned when an index already exists in the database.
	ErrIndexExists = errors.New("index exists")
	// ErrInvalidOperation is returned when an operation cannot be completed.
	ErrInvalidOperation = errors.New("invalid operation")
	// ErrInvalidSyncPolicy is returned for an invalid SyncPolicy value.
	ErrInvalidSyncPolicy = errors.New("invalid sync policy")
	// ErrShrinkInProcess is returned when a shrink operation is in-process.
	ErrShrinkInProcess = errors.New("shrink is in-process")
	// ErrPersistenceActive is returned when post-loading data into a database
	// that was not opened with Open(":memory:").
	ErrPersistenceActive = errors.New("persistence active")
	// ErrTxIterating is returned when Set or Delete are called while iterating.
	ErrTxIterating = errors.New("tx is iterating")
)
// DB represents a collection of key-value pairs that persist on disk.
// Transactions are used for all forms of data access to the DB.
type DB struct {
mu sync.RWMutex // the gatekeeper for all fields
file *os.File // the underlying file
buf []byte // a buffer to write to
2016-09-12 07:25:09 +03:00
keys *btree.BTree // a tree of all item ordered by key
exps *btree.BTree // a tree of items ordered by expiration
idxs map[string]*index // the index trees.
exmgr bool // indicates that expires manager is running.
flushes int // a count of the number of disk flushes
closed bool // set when the database has been closed
config Config // the database configuration
persist bool // do we write to disk
shrinking bool // when an aof shrink is in-process.
lastaofsz int // the size of the last shrink aof size
}
// SyncPolicy represents how often data is synced to disk.
type SyncPolicy int

// The supported sync policies. Every constant is explicitly typed as
// SyncPolicy so that a value inferred from one of them (for example
// `p := EverySecond`) has type SyncPolicy rather than int and can be
// assigned to Config.SyncPolicy without a conversion.
const (
	// Never is used to disable syncing data to disk.
	// The faster and less safe method.
	Never SyncPolicy = 0
	// EverySecond is used to sync data to disk every second.
	// It's pretty fast and you can lose 1 second of data if there
	// is a disaster.
	// This is the recommended setting.
	EverySecond SyncPolicy = 1
	// Always is used to sync data after every write to disk.
	// Slow. Very safe.
	Always SyncPolicy = 2
)
// Config represents database configuration options. These
// options are used to change various behaviors of the database.
type Config struct {
// SyncPolicy adjusts how often the data is synced to disk.
// This value can be Never, EverySecond, or Always.
// The default is EverySecond.
SyncPolicy SyncPolicy
// AutoShrinkPercentage is used by the background process to trigger
// a shrink of the aof file when the size of the file is larger than the
// percentage of the result of the previous shrunk file.
// For example, if this value is 100, and the last shrink process
// resulted in a 100mb file, then the new aof file must be 200mb before
// a shrink is triggered.
AutoShrinkPercentage int
// AutoShrinkMinSize defines the minimum size of the aof file before
// an automatic shrink can occur.
AutoShrinkMinSize int
// AutoShrinkDisabled turns off automatic background shrinking
AutoShrinkDisabled bool
// OnExpired is used to custom handle the deletion option when a key
// has been expired.
OnExpired func(keys []string)
2016-09-12 07:25:09 +03:00
}
// exctx is a simple b-tree context for ordering by expiration.
// Open passes an instance of it as the context of the db.exps tree.
type exctx struct {
	db *DB // the database that owns the expiration tree
}
// btreeDegrees is the default number of btree degrees. It is the degree
// passed to btree.New for the key tree, the expiration tree, and every
// b-tree backed index.
const btreeDegrees = 64
// Open opens a database at the provided path.
// If the file does not exist then it will be created automatically.
// The special path ":memory:" opens a database that does not persist
// to disk.
func Open(path string) (*DB, error) {
	// start from the default configuration; anything other than the
	// ":memory:" path is backed by a file.
	db := &DB{
		idxs: make(map[string]*index),
		config: Config{
			SyncPolicy:           EverySecond,
			AutoShrinkPercentage: 100,
			AutoShrinkMinSize:    32 * 1024 * 1024,
		},
		persist: path != ":memory:",
	}
	// the expiration tree needs a context that points back at the db, so
	// the trees are created once the db value exists.
	db.keys = btree.New(btreeDegrees, nil)
	db.exps = btree.New(btreeDegrees, &exctx{db})

	if db.persist {
		// hardcoding 0666 as the default mode.
		file, openErr := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666)
		if openErr != nil {
			return nil, openErr
		}
		db.file = file
		// bring the in-memory state up to date with what is on disk
		if loadErr := db.load(); loadErr != nil {
			// the load error is the one worth reporting; the close error
			// is intentionally dropped.
			_ = db.file.Close()
			return nil, loadErr
		}
	}

	// the background manager runs in its own goroutine from here on.
	go db.backgroundManager()
	return db, nil
}
// Close releases all database resources.
// All transactions must be closed before closing the database.
// It returns ErrDatabaseClosed when the database was already closed, or
// the error reported while closing the underlying file.
func (db *DB) Close() error {
	db.mu.Lock()
	defer db.mu.Unlock()

	if db.closed {
		return ErrDatabaseClosed
	}
	db.closed = true

	if db.persist {
		// best-effort sync before closing; its error is ignored on purpose
		db.file.Sync()
		if closeErr := db.file.Close(); closeErr != nil {
			return closeErr
		}
	}

	// Drop every reference. Late usage then fails loudly, which helps with
	// debugging, and the garbage collector gets a hint that the data is
	// no longer needed.
	db.keys = nil
	db.exps = nil
	db.idxs = nil
	db.file = nil
	return nil
}
// Save writes a snapshot of the database to a writer. This operation blocks all
// writes, but not reads. This can be used for snapshots and backups for pure
// in-memory databases using the ":memory:". Database that persist to disk
// can be snapshotted by simply copying the database file.
func (db *DB) Save(wr io.Writer) error {
var err error
db.mu.RLock()
defer db.mu.RUnlock()
// use a buffered writer and flush every 4MB
var buf []byte
2016-09-12 07:25:09 +03:00
// iterated through every item in the database and write to the buffer
db.keys.Ascend(func(item btree.Item) bool {
dbi := item.(*dbItem)
buf = dbi.writeSetTo(buf)
if len(buf) > 1024*1024*4 {
2016-09-12 07:25:09 +03:00
// flush when buffer is over 4MB
_, err = wr.Write(buf)
2016-09-12 07:25:09 +03:00
if err != nil {
return false
}
buf = buf[:0]
2016-09-12 07:25:09 +03:00
}
return true
})
if err != nil {
return err
}
// one final flush
if len(buf) > 0 {
_, err = wr.Write(buf)
if err != nil {
return err
}
2016-09-12 07:25:09 +03:00
}
return nil
}
// Load loads commands from reader. This operation blocks all reads and writes.
// Note that this can only work for fully in-memory databases opened with
// Open(":memory:").
//
// It returns ErrDatabaseClosed when the database has been closed and
// ErrPersistenceActive when the database persists to disk.
func (db *DB) Load(rd io.Reader) error {
	db.mu.Lock()
	defer db.mu.Unlock()
	// Close releases the trees that loaded items would be inserted into,
	// so refuse to load into a closed database.
	if db.closed {
		return ErrDatabaseClosed
	}
	if db.persist {
		// cannot load into databases that persist to disk
		return ErrPersistenceActive
	}
	return db.readLoad(rd, time.Now())
}
// index represents a b-tree or r-tree index and also acts as the
// b-tree/r-tree context for itself.
type index struct {
	btr     *btree.BTree                           // contains the items; created when less is set
	rtr     *rtree.RTree                           // contains the items; created when rect is set
	name    string                                 // name of the index
	pattern string                                 // a required key pattern; "*" matches every key
	less    func(a, b string) bool                 // less comparison function
	rect    func(item string) (min, max []float64) // rect from string function
	db      *DB                                    // the origin database
	opts    IndexOptions                           // index options
}
// match matches the pattern to the key
func (idx *index) match(key string) bool {
if idx.pattern == "*" {
return true
}
if idx.opts.CaseInsensitiveKeyMatching {
for i := 0; i < len(key); i++ {
if key[i] >= 'A' && key[i] <= 'Z' {
key = strings.ToLower(key)
break
}
}
}
return match.Match(key, idx.pattern)
2016-09-12 07:25:09 +03:00
}
// clearCopy creates a copy of the index, but with an empty dataset.
func (idx *index) clearCopy() *index {
// copy the index meta information
nidx := &index{
name: idx.name,
pattern: idx.pattern,
db: idx.db,
less: idx.less,
rect: idx.rect,
opts: idx.opts,
2016-09-12 07:25:09 +03:00
}
// initialize with empty trees
if nidx.less != nil {
nidx.btr = btree.New(btreeDegrees, nidx)
}
if nidx.rect != nil {
nidx.rtr = rtree.New(nidx)
}
return nidx
}
// rebuild rebuilds the index: it replaces the index trees with empty ones
// and then fills them with every database item whose key matches the
// index pattern.
func (idx *index) rebuild() {
	ordered, spatial := idx.less != nil, idx.rect != nil

	// start over with empty trees
	if ordered {
		idx.btr = btree.New(btreeDegrees, idx)
	}
	if spatial {
		idx.rtr = rtree.New(idx)
	}

	// walk all keys and add the matching items to the index
	idx.db.keys.Ascend(func(it btree.Item) bool {
		entry := it.(*dbItem)
		if idx.match(entry.key) {
			if ordered {
				idx.btr.ReplaceOrInsert(entry)
			}
			if spatial {
				idx.rtr.Insert(entry)
			}
		}
		// items that do not match the pattern are skipped; always continue
		return true
	})
}
2016-09-12 07:25:09 +03:00
// CreateIndex builds a new index and populates it with items.
// The items are ordered in an b-tree and can be retrieved using the
// Ascend* and Descend* methods.
// An error will occur if an index with the same name already exists.
//
// When a pattern is provided, the index will be populated with
// keys that match the specified pattern. This is a very simple pattern
// match where '*' matches on any number characters and '?' matches on
// any one character.
// The less function compares if string 'a' is less than string 'b'.
// It allows for indexes to create custom ordering. It's possible
// that the strings may be textual or binary. It's up to the provided
// less function to handle the content format and comparison.
// There are some default less function that can be used such as
// IndexString, IndexBinary, etc.
//
// Deprecated: Use Transactions
2016-09-12 07:25:09 +03:00
func (db *DB) CreateIndex(name, pattern string,
less ...func(a, b string) bool) error {
return db.Update(func(tx *Tx) error {
return tx.CreateIndex(name, pattern, less...)
})
2016-09-12 07:25:09 +03:00
}
// ReplaceIndex builds a new index and populates it with items.
// The items are ordered in an b-tree and can be retrieved using the
// Ascend* and Descend* methods.
// If a previous index with the same name exists, that index will be deleted.
//
// Deprecated: Use Transactions
2016-09-12 07:25:09 +03:00
func (db *DB) ReplaceIndex(name, pattern string,
less ...func(a, b string) bool) error {
return db.Update(func(tx *Tx) error {
err := tx.CreateIndex(name, pattern, less...)
if err != nil {
if err == ErrIndexExists {
err := tx.DropIndex(name)
if err != nil {
return err
}
return tx.CreateIndex(name, pattern, less...)
}
return err
}
return nil
})
2016-09-12 07:25:09 +03:00
}
// CreateSpatialIndex builds a new index and populates it with items.
// The items are organized in an r-tree and can be retrieved using the
// Intersects method.
// An error will occur if an index with the same name already exists.
//
// The rect function converts a string to a rectangle. The rectangle is
// represented by two arrays, min and max. Both arrays may have a length
// between 1 and 20, and both arrays must match in length. A length of 1 is a
// one dimensional rectangle, and a length of 4 is a four dimension rectangle.
// There is support for up to 20 dimensions.
// The values of min must be less than the values of max at the same dimension.
// Thus min[0] must be less-than-or-equal-to max[0].
// The IndexRect is a default function that can be used for the rect
// parameter.
//
// Deprecated: Use Transactions
2016-09-12 07:25:09 +03:00
func (db *DB) CreateSpatialIndex(name, pattern string,
rect func(item string) (min, max []float64)) error {
return db.Update(func(tx *Tx) error {
return tx.CreateSpatialIndex(name, pattern, rect)
})
2016-09-12 07:25:09 +03:00
}
// ReplaceSpatialIndex builds a new index and populates it with items.
// The items are organized in an r-tree and can be retrieved using the
// Intersects method.
// If a previous index with the same name exists, that index will be deleted.
//
// Deprecated: Use Transactions
2016-09-12 07:25:09 +03:00
func (db *DB) ReplaceSpatialIndex(name, pattern string,
rect func(item string) (min, max []float64)) error {
return db.Update(func(tx *Tx) error {
err := tx.CreateSpatialIndex(name, pattern, rect)
if err != nil {
if err == ErrIndexExists {
err := tx.DropIndex(name)
if err != nil {
return err
2016-09-12 07:25:09 +03:00
}
return tx.CreateSpatialIndex(name, pattern, rect)
2016-09-12 07:25:09 +03:00
}
return err
2016-09-12 07:25:09 +03:00
}
return nil
2016-09-12 07:25:09 +03:00
})
}
// DropIndex removes an index.
//
// Deprecated: Use Transactions
2016-09-12 07:25:09 +03:00
func (db *DB) DropIndex(name string) error {
return db.Update(func(tx *Tx) error {
return tx.DropIndex(name)
})
2016-09-12 07:25:09 +03:00
}
// Indexes returns a list of index names.
//
// Deprecated: Use Transactions
2016-09-12 07:25:09 +03:00
func (db *DB) Indexes() ([]string, error) {
var names []string
var err = db.View(func(tx *Tx) error {
var err error
names, err = tx.Indexes()
return err
})
return names, err
2016-09-12 07:25:09 +03:00
}
// ReadConfig copies the current database configuration into config.
// It returns ErrDatabaseClosed, leaving config untouched, when the
// database has been closed.
func (db *DB) ReadConfig(config *Config) error {
	db.mu.RLock()
	defer db.mu.RUnlock()

	if !db.closed {
		*config = db.config
		return nil
	}
	return ErrDatabaseClosed
}
// SetConfig replaces the database configuration with config.
// It returns ErrDatabaseClosed when the database has been closed and
// ErrInvalidSyncPolicy when config.SyncPolicy is not one of Never,
// EverySecond, or Always; in both cases the configuration is left as is.
func (db *DB) SetConfig(config Config) error {
	db.mu.Lock()
	defer db.mu.Unlock()

	if db.closed {
		return ErrDatabaseClosed
	}
	switch config.SyncPolicy {
	case Never, EverySecond, Always:
		// a known policy: accept the whole configuration
		db.config = config
		return nil
	}
	return ErrInvalidSyncPolicy
}
// insertIntoDatabase performs inserts an item in to the database and updates
// all indexes. If a previous item with the same key already exists, that item
// will be replaced with the new one, and return the previous item.
func (db *DB) insertIntoDatabase(item *dbItem) *dbItem {
var pdbi *dbItem
prev := db.keys.ReplaceOrInsert(item)
if prev != nil {
// A previous item was removed from the keys tree. Let's
// fully delete this item from all indexes.
pdbi = prev.(*dbItem)
if pdbi.opts != nil && pdbi.opts.ex {
// Remove it from the exipres tree.
db.exps.Delete(pdbi)
}
for _, idx := range db.idxs {
if idx.btr != nil {
// Remove it from the btree index.
idx.btr.Delete(pdbi)
}
if idx.rtr != nil {
// Remove it from the rtree index.
idx.rtr.Remove(pdbi)
}
}
}
if item.opts != nil && item.opts.ex {
// The new item has eviction options. Add it to the
// expires tree
db.exps.ReplaceOrInsert(item)
}
for _, idx := range db.idxs {
if !idx.match(item.key) {
2016-09-12 07:25:09 +03:00
continue