Load and Save for :memory: databases

This commit is contained in:
Josh Baker 2016-08-28 17:35:46 -07:00
parent 7fb2c48afb
commit 9f0b48341d
2 changed files with 145 additions and 27 deletions

116
buntdb.go
View File

@ -50,6 +50,10 @@ var (
// ErrShrinkInProcess is returned when a shrink operation is in-process.
ErrShrinkInProcess = errors.New("shrink is in-process")
// ErrPersistenceActive is returned when post-loading data from an database
// not opened with Open(":memory:").
ErrPersistenceActive = errors.New("persistence active")
)
// DB represents a collection of key-value pairs that persist on disk.
@ -171,6 +175,48 @@ func (db *DB) Close() error {
return nil
}
// Save writes a snapshot of the database to a writer. This operation blocks all
// writes, but not reads.
func (db *DB) Save(wr io.Writer) error {
var err error
db.mu.RLock()
defer db.mu.RUnlock()
w := bufio.NewWriter(wr)
db.keys.Ascend(
func(item btree.Item) bool {
dbi := item.(*dbItem)
dbi.writeSetTo(w)
if w.Buffered() > 1024*20 { // 20MB buffer
err = w.Flush()
if err != nil {
return false
}
}
return true
},
)
if err != nil {
return err
}
err = w.Flush()
if err != nil {
return err
}
return nil
}
// Load loads commands from reader. This operation blocks all reads and writes.
// Note that this can only work for fully in-memory databases opened with
// Open(":memory:").
func (db *DB) Load(rd io.Reader) error {
db.mu.Lock()
defer db.mu.Unlock()
if db.persist {
return ErrPersistenceActive
}
return db.readLoad(rd, time.Now())
}
// index represents a b-tree or r-tree index and also acts as the
// b-tree/r-tree context for itself.
type index struct {
@ -652,20 +698,13 @@ func (db *DB) Shrink() error {
var errValidEOF = errors.New("valid eof")
// load reads entries from the append only database file and fills the database.
// The file format uses the Redis append only file format, which is and a series
// of RESP commands. For more information on RESP please read
// http://redis.io/topics/protocol. The only supported RESP commands are DEL and
// SET.
func (db *DB) load() error {
fi, err := db.file.Stat()
if err != nil {
return err
}
modTime := fi.ModTime()
// readLoad reads from the reader and loads commands into the database.
// modTime is the modified time of the reader, should be no greater than
// the current time.Now().
func (db *DB) readLoad(rd io.Reader, modTime time.Time) error {
data := make([]byte, 4096)
parts := make([]string, 0, 8)
r := bufio.NewReader(db.file)
r := bufio.NewReader(rd)
for {
// read a single command.
// first we should read the number of parts that the of the command
@ -797,6 +836,22 @@ func (db *DB) load() error {
return ErrInvalid
}
}
return nil
}
// load reads entries from the append only database file and fills the database.
// The file format uses the Redis append only file format, which is and a series
// of RESP commands. For more information on RESP please read
// http://redis.io/topics/protocol. The only supported RESP commands are DEL and
// SET.
func (db *DB) load() error {
fi, err := db.file.Stat()
if err != nil {
return err
}
if err := db.readLoad(db.file, fi.ModTime()); err != nil {
return err
}
pos, err := db.file.Seek(0, 2)
if err != nil {
return err
@ -1012,39 +1067,46 @@ type dbItem struct {
opts *dbItemOpts // optional meta information
}
type byteWriter interface {
WriteByte(byte) error
WriteString(string) (int, error)
}
// writeHead writes the resp header part
func writeHead(wr *bytes.Buffer, c byte, n int) {
_ = wr.WriteByte(c)
_, _ = wr.WriteString(strconv.FormatInt(int64(n), 10))
_, _ = wr.WriteString("\r\n")
func writeHead(wr byteWriter, c byte, n int) int {
wr.WriteByte(c)
nn, _ := wr.WriteString(strconv.FormatInt(int64(n), 10))
wr.WriteString("\r\n")
return nn + 3
}
// writeMultiBulk writes a resp array
func writeMultiBulk(wr *bytes.Buffer, bulks ...string) {
writeHead(wr, '*', len(bulks))
func writeMultiBulk(wr byteWriter, bulks ...string) int {
n := writeHead(wr, '*', len(bulks))
for _, bulk := range bulks {
writeHead(wr, '$', len(bulk))
_, _ = wr.WriteString(bulk)
_, _ = wr.WriteString("\r\n")
nn := writeHead(wr, '$', len(bulk))
wr.WriteString(bulk)
wr.WriteString("\r\n")
n += nn + len(bulk) + 2
}
return n
}
// writeSetTo writes an item as a single SET record to the a bufio Writer.
func (dbi *dbItem) writeSetTo(wr *bytes.Buffer) {
func (dbi *dbItem) writeSetTo(wr byteWriter) int {
if dbi.opts != nil && dbi.opts.ex {
ex := strconv.FormatUint(
uint64(dbi.opts.exat.Sub(time.Now())/time.Second),
10,
)
writeMultiBulk(wr, "set", dbi.key, dbi.val, "ex", ex)
} else {
writeMultiBulk(wr, "set", dbi.key, dbi.val)
return writeMultiBulk(wr, "set", dbi.key, dbi.val, "ex", ex)
}
return writeMultiBulk(wr, "set", dbi.key, dbi.val)
}
// writeSetTo writes an item as a single DEL record to the a bufio Writer.
func (dbi *dbItem) writeDeleteTo(wr *bytes.Buffer) {
writeMultiBulk(wr, "del", dbi.key)
func (dbi *dbItem) writeDeleteTo(wr byteWriter) int {
return writeMultiBulk(wr, "del", dbi.key)
}
// expired evaluates id the item has expired. This will always return false when

View File

@ -89,6 +89,62 @@ func TestBackgroudOperations(t *testing.T) {
t.Fatalf("expecting '%v', got '%v'", 200, n)
}
}
func TestSaveLoad(t *testing.T) {
db, _ := Open(":memory:")
defer db.Close()
if err := db.Update(func(tx *Tx) error {
for i := 0; i < 20; i++ {
_, _, err := tx.Set(fmt.Sprintf("key:%d", i), fmt.Sprintf("planet:%d", i), nil)
if err != nil {
return err
}
}
return nil
}); err != nil {
t.Fatal(err)
}
f, err := os.Create("temp.db")
if err != nil {
t.Fatal(err)
}
defer func() {
f.Close()
os.RemoveAll("temp.db")
}()
if err := db.Save(f); err != nil {
t.Fatal(err)
}
if err := f.Close(); err != nil {
t.Fatal(err)
}
db.Close()
db, _ = Open(":memory:")
defer db.Close()
f, err = os.Open("temp.db")
if err != nil {
t.Fatal(err)
}
defer f.Close()
if err := db.Load(f); err != nil {
t.Fatal(err)
}
if err := db.View(func(tx *Tx) error {
for i := 0; i < 20; i++ {
ex := fmt.Sprintf("planet:%d", i)
val, err := tx.Get(fmt.Sprintf("key:%d", i))
if err != nil {
return err
}
if ex != val {
t.Fatalf("expected %s, got %s", ex, val)
}
}
return nil
}); err != nil {
t.Fatal(err)
}
}
func TestVariousTx(t *testing.T) {
db := testOpen(t)
defer testClose(db)