From 6dbb18799b3b5b3832b58aaed051fe0ac04df6e9 Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Sun, 2 Oct 2016 07:14:14 -0700 Subject: [PATCH] updated buffer writing --- buntdb.go | 109 ++++++++++++++++++++++++++----------------------- buntdb_test.go | 2 +- 2 files changed, 58 insertions(+), 53 deletions(-) diff --git a/buntdb.go b/buntdb.go index 43af1b1..25624bb 100644 --- a/buntdb.go +++ b/buntdb.go @@ -65,7 +65,7 @@ var ( type DB struct { mu sync.RWMutex // the gatekeeper for all fields file *os.File // the underlying file - buf *bytes.Buffer // a buffer to write to + buf []byte // a buffer to write to keys *btree.BTree // a tree of all item ordered by key exps *btree.BTree // a tree of items ordered by expiration idxs map[string]*index // the index trees. @@ -139,8 +139,6 @@ func Open(path string) (*DB, error) { db.keys = btree.New(btreeDegrees, nil) db.exps = btree.New(btreeDegrees, &exctx{db}) db.idxs = make(map[string]*index) - // initialize reusable blank buffer - db.buf = &bytes.Buffer{} // initialize default configuration db.config = Config{ SyncPolicy: EverySecond, @@ -198,17 +196,18 @@ func (db *DB) Save(wr io.Writer) error { db.mu.RLock() defer db.mu.RUnlock() // use a buffered writer and flush every 4MB - w := bufio.NewWriter(wr) + var buf []byte // iterated through every item in the database and write to the buffer db.keys.Ascend(func(item btree.Item) bool { dbi := item.(*dbItem) - dbi.writeSetTo(w) - if w.Buffered() > 1024*1024*4 { + buf = dbi.writeSetTo(buf) + if len(buf) > 1024*1024*4 { // flush when buffer is over 4MB - err = w.Flush() + _, err = wr.Write(buf) if err != nil { return false } + buf = buf[:0] } return true }) @@ -216,9 +215,11 @@ func (db *DB) Save(wr io.Writer) error { return err } // one final flush - err = w.Flush() - if err != nil { - return err + if len(buf) > 0 { + _, err = wr.Write(buf) + if err != nil { + return err + } } return nil } @@ -635,7 +636,7 @@ func (db *DB) Shrink() error { // we are going to read items in as chunks as to not hold up the database // for too long. - buf := &bytes.Buffer{} + var buf []byte pivot := "" done := false for !done { @@ -645,25 +646,28 @@ func (db *DB) Shrink() error { if db.closed { return ErrDatabaseClosed } - n := 0 done = true + var n int db.keys.AscendGreaterOrEqual(&dbItem{key: pivot}, func(item btree.Item) bool { dbi := item.(*dbItem) - if n > 100 { + // 1000 items or 64MB buffer + if n > 1000 || len(buf) > 64*1024*1024 { pivot = dbi.key done = false return false } - dbi.writeSetTo(buf) + buf = dbi.writeSetTo(buf) n++ return true }, ) - if _, err := f.Write(buf.Bytes()); err != nil { - return err + if len(buf) > 0 { + if _, err := f.Write(buf); err != nil { + return err + } + buf = buf[:0] } - buf.Reset() return nil }() if err != nil { @@ -1101,23 +1105,23 @@ func (tx *Tx) commit() error { } var err error if tx.db.persist && (len(tx.wc.commitItems) > 0 || tx.wc.rbkeys != nil) { - tx.db.buf.Reset() + tx.db.buf = tx.db.buf[:0] // write a flushdb if a deleteAll was called. if tx.wc.rbkeys != nil { - tx.db.buf.WriteString("*1\r\n$7\r\nflushdb\r\n") + tx.db.buf = append(tx.db.buf, "*1\r\n$7\r\nflushdb\r\n"...) } // Each committed record is written to disk for key, item := range tx.wc.commitItems { if item == nil { - (&dbItem{key: key}).writeDeleteTo(tx.db.buf) + tx.db.buf = (&dbItem{key: key}).writeDeleteTo(tx.db.buf) } else { - item.writeSetTo(tx.db.buf) + tx.db.buf = item.writeSetTo(tx.db.buf) } } // Flushing the buffer only once per transaction. // If this operation fails then the write did failed and we must // rollback. - if _, err = tx.db.file.Write(tx.db.buf.Bytes()); err != nil { + if _, err = tx.db.file.Write(tx.db.buf); err != nil { tx.rollbackInner() } if tx.db.config.SyncPolicy == Always { @@ -1165,46 +1169,47 @@ type dbItem struct { opts *dbItemOpts // optional meta information } -type byteWriter interface { - WriteByte(byte) error - WriteString(string) (int, error) +func appendArray(buf []byte, count int) []byte { + buf = append(buf, '*') + buf = append(buf, strconv.FormatInt(int64(count), 10)...) + buf = append(buf, '\r', '\n') + return buf } -// writeHead writes the resp header part -func writeHead(wr byteWriter, c byte, n int) int { - wr.WriteByte(c) - nn, _ := wr.WriteString(strconv.FormatInt(int64(n), 10)) - wr.WriteString("\r\n") - return nn + 3 -} - -// writeMultiBulk writes a resp array -func writeMultiBulk(wr byteWriter, bulks ...string) int { - n := writeHead(wr, '*', len(bulks)) - for _, bulk := range bulks { - nn := writeHead(wr, '$', len(bulk)) - wr.WriteString(bulk) - wr.WriteString("\r\n") - n += nn + len(bulk) + 2 - } - return n +func appendBulkString(buf []byte, s string) []byte { + buf = append(buf, '$') + buf = append(buf, strconv.FormatInt(int64(len(s)), 10)...) + buf = append(buf, '\r', '\n') + buf = append(buf, s...) + buf = append(buf, '\r', '\n') + return buf } // writeSetTo writes an item as a single SET record to the a bufio Writer. -func (dbi *dbItem) writeSetTo(wr byteWriter) int { +func (dbi *dbItem) writeSetTo(buf []byte) []byte { if dbi.opts != nil && dbi.opts.ex { - ex := strconv.FormatUint( - uint64(dbi.opts.exat.Sub(time.Now())/time.Second), - 10, - ) - return writeMultiBulk(wr, "set", dbi.key, dbi.val, "ex", ex) + ex := dbi.opts.exat.Sub(time.Now()) / time.Second + buf = appendArray(buf, 5) + buf = appendBulkString(buf, "set") + buf = appendBulkString(buf, dbi.key) + buf = appendBulkString(buf, dbi.val) + buf = appendBulkString(buf, "ex") + buf = appendBulkString(buf, strconv.FormatUint(uint64(ex), 10)) + } else { + buf = appendArray(buf, 3) + buf = appendBulkString(buf, "set") + buf = appendBulkString(buf, dbi.key) + buf = appendBulkString(buf, dbi.val) } - return writeMultiBulk(wr, "set", dbi.key, dbi.val) + return buf } // writeSetTo writes an item as a single DEL record to the a bufio Writer. -func (dbi *dbItem) writeDeleteTo(wr byteWriter) int { - return writeMultiBulk(wr, "del", dbi.key) +func (dbi *dbItem) writeDeleteTo(buf []byte) []byte { + buf = appendArray(buf, 2) + buf = appendBulkString(buf, "del") + buf = appendBulkString(buf, dbi.key) + return buf } // expired evaluates id the item has expired. This will always return false when diff --git a/buntdb_test.go b/buntdb_test.go index 1b34dd5..58d841e 100644 --- a/buntdb_test.go +++ b/buntdb_test.go @@ -593,7 +593,7 @@ func TestVariousTx(t *testing.T) { if _, err := db.file.Seek(0, 2); err != nil { t.Fatal(err) } - db.buf = &bytes.Buffer{} + db.buf = nil if err := db.CreateIndex("blank", "*", nil); err != nil { t.Fatal(err) }