diff --git a/buntdb.go b/buntdb.go index 4c5926d..21f8b7c 100644 --- a/buntdb.go +++ b/buntdb.go @@ -63,7 +63,7 @@ type Iterator func(key, val string) bool type DB struct { mu sync.RWMutex // the gatekeeper for all fields file *os.File // the underlying file - bufw *bufio.Writer // only write to this + buf *bytes.Buffer // a buffer to write to keys *btree.BTree // a tree of all item ordered by key exps *btree.BTree // a tree of items ordered by expiration idxs map[string]*index // the index trees. @@ -132,6 +132,7 @@ func Open(path string) (*DB, error) { db.keys = btree.New(btreeDegrees, nil) db.exps = btree.New(btreeDegrees, &exctx{db}) db.idxs = make(map[string]*index) + db.buf = &bytes.Buffer{} db.config = Config{ SyncPolicy: EverySecond, AutoShrinkPercentage: 100, @@ -149,7 +150,6 @@ func Open(path string) (*DB, error) { _ = db.file.Close() return nil, err } - db.bufw = bufio.NewWriter(db.file) } // start the background manager. go db.backgroundManager() @@ -172,7 +172,7 @@ func (db *DB) Close() error { } // Let's release all references to nil. This will help both with debugging // late usage panics and it provides a hint to the garbage collector - db.keys, db.exps, db.idxs, db.file, db.bufw = nil, nil, nil, nil, nil + db.keys, db.exps, db.idxs, db.file = nil, nil, nil, nil return nil } @@ -275,6 +275,7 @@ func (db *DB) createIndex( if rect != nil { idx.rtr = rtree.New(idx) } + db.keys.Ascend(func(item btree.Item) bool { dbi := item.(*dbItem) if !wildcardMatch(dbi.key, idx.pattern) { @@ -565,7 +566,7 @@ func (db *DB) Shrink() error { // we are going to read items in as chunks as to not hold up the database // for too long. - wr := bufio.NewWriter(f) + buf := &bytes.Buffer{} pivot := "" done := false for !done { @@ -585,14 +586,15 @@ func (db *DB) Shrink() error { done = false return false } - dbi.writeSetTo(wr) + dbi.writeSetTo(buf) n++ return true }, ) - if err := wr.Flush(); err != nil { + if _, err := f.Write(buf.Bytes()); err != nil { return err } + buf.Reset() return nil }() if err != nil { @@ -648,8 +650,6 @@ func (db *DB) Shrink() error { if err != nil { return err } - // reset the bufio writer - db.bufw = bufio.NewWriter(db.file) db.lastaofsz = int(pos) return nil }() @@ -949,17 +949,18 @@ func (tx *Tx) commit() error { var err error if tx.db.persist && len(tx.commits) > 0 { // Each committed record is written to disk + tx.db.buf.Reset() for key, item := range tx.commits { if item == nil { - (&dbItem{key: key}).writeDeleteTo(tx.db.bufw) + (&dbItem{key: key}).writeDeleteTo(tx.db.buf) } else { - item.writeSetTo(tx.db.bufw) + item.writeSetTo(tx.db.buf) } } // Flushing the buffer only once per transaction. // If this operation fails then the write did failed and we must // rollback. - if err = tx.db.bufw.Flush(); err != nil { + if _, err = tx.db.file.Write(tx.db.buf.Bytes()); err != nil { tx.rollbackInner() } if tx.db.config.SyncPolicy == Always { @@ -1009,14 +1010,14 @@ type dbItem struct { } // writeHead writes the resp header part -func writeHead(wr *bufio.Writer, c byte, n int) { +func writeHead(wr *bytes.Buffer, c byte, n int) { _ = wr.WriteByte(c) _, _ = wr.WriteString(strconv.FormatInt(int64(n), 10)) _, _ = wr.WriteString("\r\n") } // writeMultiBulk writes a resp array -func writeMultiBulk(wr *bufio.Writer, bulks ...string) { +func writeMultiBulk(wr *bytes.Buffer, bulks ...string) { writeHead(wr, '*', len(bulks)) for _, bulk := range bulks { writeHead(wr, '$', len(bulk)) @@ -1026,7 +1027,7 @@ func writeMultiBulk(wr *bufio.Writer, bulks ...string) { } // writeSetTo writes an item as a single SET record to the a bufio Writer. -func (dbi *dbItem) writeSetTo(wr *bufio.Writer) { +func (dbi *dbItem) writeSetTo(wr *bytes.Buffer) { if dbi.opts != nil && dbi.opts.ex { ex := strconv.FormatUint( uint64(dbi.opts.exat.Sub(time.Now())/time.Second), @@ -1039,7 +1040,7 @@ func (dbi *dbItem) writeSetTo(wr *bufio.Writer) { } // writeSetTo writes an item as a single DEL record to the a bufio Writer. -func (dbi *dbItem) writeDeleteTo(wr *bufio.Writer) { +func (dbi *dbItem) writeDeleteTo(wr *bytes.Buffer) { writeMultiBulk(wr, "del", dbi.key) } @@ -1171,11 +1172,8 @@ func (tx *Tx) Get(key string) (val string, err error) { return "", ErrTxClosed } item := tx.db.get(key) - if item == nil { - return "", ErrNotFound - } - if item.expired() { - // The item exists in the tree, but has expired. Let's assume that + if item == nil || item.expired() { + // The item does not exists or has expired. Let's assume that // the caller is only interested in items that have not expired. return "", ErrNotFound } diff --git a/buntdb_test.go b/buntdb_test.go index 1678b2e..4e84692 100644 --- a/buntdb_test.go +++ b/buntdb_test.go @@ -1,7 +1,6 @@ package buntdb import ( - "bufio" "bytes" "errors" "fmt" @@ -230,7 +229,7 @@ func TestVariousTx(t *testing.T) { if _, err := db.file.Seek(0, 2); err != nil { t.Fatal(err) } - db.bufw = bufio.NewWriter(db.file) + db.buf = &bytes.Buffer{} if err := db.CreateIndex("blank", "*", nil); err != nil { t.Fatal(err) }