From 9f0b48341d020e5960ee32cafe22463baff86604 Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Sun, 28 Aug 2016 17:35:46 -0700 Subject: [PATCH] Load and Save for :memory: databases --- buntdb.go | 116 +++++++++++++++++++++++++++++++++++++------------ buntdb_test.go | 56 ++++++++++++++++++++++++ 2 files changed, 145 insertions(+), 27 deletions(-) diff --git a/buntdb.go b/buntdb.go index f7b2a1f..774aa83 100644 --- a/buntdb.go +++ b/buntdb.go @@ -50,6 +50,10 @@ var ( // ErrShrinkInProcess is returned when a shrink operation is in-process. ErrShrinkInProcess = errors.New("shrink is in-process") + + // ErrPersistenceActive is returned when post-loading data from an database + // not opened with Open(":memory:"). + ErrPersistenceActive = errors.New("persistence active") ) // DB represents a collection of key-value pairs that persist on disk. @@ -171,6 +175,48 @@ func (db *DB) Close() error { return nil } +// Save writes a snapshot of the database to a writer. This operation blocks all +// writes, but not reads. +func (db *DB) Save(wr io.Writer) error { + var err error + db.mu.RLock() + defer db.mu.RUnlock() + w := bufio.NewWriter(wr) + db.keys.Ascend( + func(item btree.Item) bool { + dbi := item.(*dbItem) + dbi.writeSetTo(w) + if w.Buffered() > 1024*20 { // 20MB buffer + err = w.Flush() + if err != nil { + return false + } + } + return true + }, + ) + if err != nil { + return err + } + err = w.Flush() + if err != nil { + return err + } + return nil +} + +// Load loads commands from reader. This operation blocks all reads and writes. +// Note that this can only work for fully in-memory databases opened with +// Open(":memory:"). +func (db *DB) Load(rd io.Reader) error { + db.mu.Lock() + defer db.mu.Unlock() + if db.persist { + return ErrPersistenceActive + } + return db.readLoad(rd, time.Now()) +} + // index represents a b-tree or r-tree index and also acts as the // b-tree/r-tree context for itself. type index struct { @@ -652,20 +698,13 @@ func (db *DB) Shrink() error { var errValidEOF = errors.New("valid eof") -// load reads entries from the append only database file and fills the database. -// The file format uses the Redis append only file format, which is and a series -// of RESP commands. For more information on RESP please read -// http://redis.io/topics/protocol. The only supported RESP commands are DEL and -// SET. -func (db *DB) load() error { - fi, err := db.file.Stat() - if err != nil { - return err - } - modTime := fi.ModTime() +// readLoad reads from the reader and loads commands into the database. +// modTime is the modified time of the reader, should be no greater than +// the current time.Now(). +func (db *DB) readLoad(rd io.Reader, modTime time.Time) error { data := make([]byte, 4096) parts := make([]string, 0, 8) - r := bufio.NewReader(db.file) + r := bufio.NewReader(rd) for { // read a single command. // first we should read the number of parts that the of the command @@ -797,6 +836,22 @@ func (db *DB) load() error { return ErrInvalid } } + return nil +} + +// load reads entries from the append only database file and fills the database. +// The file format uses the Redis append only file format, which is and a series +// of RESP commands. For more information on RESP please read +// http://redis.io/topics/protocol. The only supported RESP commands are DEL and +// SET. +func (db *DB) load() error { + fi, err := db.file.Stat() + if err != nil { + return err + } + if err := db.readLoad(db.file, fi.ModTime()); err != nil { + return err + } pos, err := db.file.Seek(0, 2) if err != nil { return err @@ -1012,39 +1067,46 @@ type dbItem struct { opts *dbItemOpts // optional meta information } +type byteWriter interface { + WriteByte(byte) error + WriteString(string) (int, error) +} + // writeHead writes the resp header part -func writeHead(wr *bytes.Buffer, c byte, n int) { - _ = wr.WriteByte(c) - _, _ = wr.WriteString(strconv.FormatInt(int64(n), 10)) - _, _ = wr.WriteString("\r\n") +func writeHead(wr byteWriter, c byte, n int) int { + wr.WriteByte(c) + nn, _ := wr.WriteString(strconv.FormatInt(int64(n), 10)) + wr.WriteString("\r\n") + return nn + 3 } // writeMultiBulk writes a resp array -func writeMultiBulk(wr *bytes.Buffer, bulks ...string) { - writeHead(wr, '*', len(bulks)) +func writeMultiBulk(wr byteWriter, bulks ...string) int { + n := writeHead(wr, '*', len(bulks)) for _, bulk := range bulks { - writeHead(wr, '$', len(bulk)) - _, _ = wr.WriteString(bulk) - _, _ = wr.WriteString("\r\n") + nn := writeHead(wr, '$', len(bulk)) + wr.WriteString(bulk) + wr.WriteString("\r\n") + n += nn + len(bulk) + 2 } + return n } // writeSetTo writes an item as a single SET record to the a bufio Writer. -func (dbi *dbItem) writeSetTo(wr *bytes.Buffer) { +func (dbi *dbItem) writeSetTo(wr byteWriter) int { if dbi.opts != nil && dbi.opts.ex { ex := strconv.FormatUint( uint64(dbi.opts.exat.Sub(time.Now())/time.Second), 10, ) - writeMultiBulk(wr, "set", dbi.key, dbi.val, "ex", ex) - } else { - writeMultiBulk(wr, "set", dbi.key, dbi.val) + return writeMultiBulk(wr, "set", dbi.key, dbi.val, "ex", ex) } + return writeMultiBulk(wr, "set", dbi.key, dbi.val) } // writeSetTo writes an item as a single DEL record to the a bufio Writer. -func (dbi *dbItem) writeDeleteTo(wr *bytes.Buffer) { - writeMultiBulk(wr, "del", dbi.key) +func (dbi *dbItem) writeDeleteTo(wr byteWriter) int { + return writeMultiBulk(wr, "del", dbi.key) } // expired evaluates id the item has expired. This will always return false when diff --git a/buntdb_test.go b/buntdb_test.go index ffd40e4..da6909f 100644 --- a/buntdb_test.go +++ b/buntdb_test.go @@ -89,6 +89,62 @@ func TestBackgroudOperations(t *testing.T) { t.Fatalf("expecting '%v', got '%v'", 200, n) } } +func TestSaveLoad(t *testing.T) { + db, _ := Open(":memory:") + defer db.Close() + if err := db.Update(func(tx *Tx) error { + for i := 0; i < 20; i++ { + _, _, err := tx.Set(fmt.Sprintf("key:%d", i), fmt.Sprintf("planet:%d", i), nil) + if err != nil { + return err + } + } + return nil + }); err != nil { + t.Fatal(err) + } + f, err := os.Create("temp.db") + if err != nil { + t.Fatal(err) + } + defer func() { + f.Close() + os.RemoveAll("temp.db") + }() + if err := db.Save(f); err != nil { + t.Fatal(err) + } + if err := f.Close(); err != nil { + t.Fatal(err) + } + db.Close() + db, _ = Open(":memory:") + defer db.Close() + f, err = os.Open("temp.db") + if err != nil { + t.Fatal(err) + } + defer f.Close() + if err := db.Load(f); err != nil { + t.Fatal(err) + } + if err := db.View(func(tx *Tx) error { + for i := 0; i < 20; i++ { + ex := fmt.Sprintf("planet:%d", i) + val, err := tx.Get(fmt.Sprintf("key:%d", i)) + if err != nil { + return err + } + if ex != val { + t.Fatalf("expected %s, got %s", ex, val) + } + } + return nil + }); err != nil { + t.Fatal(err) + } +} + func TestVariousTx(t *testing.T) { db := testOpen(t) defer testClose(db)