replaced bufferedwriter with bytes.buffer

This commit is contained in:
Josh Baker 2016-08-19 10:06:03 -07:00
parent 9301f51db3
commit 18e872e092
2 changed files with 19 additions and 22 deletions

View File

@ -63,7 +63,7 @@ type Iterator func(key, val string) bool
type DB struct { type DB struct {
mu sync.RWMutex // the gatekeeper for all fields mu sync.RWMutex // the gatekeeper for all fields
file *os.File // the underlying file file *os.File // the underlying file
bufw *bufio.Writer // only write to this buf *bytes.Buffer // a buffer to write to
keys *btree.BTree // a tree of all item ordered by key keys *btree.BTree // a tree of all item ordered by key
exps *btree.BTree // a tree of items ordered by expiration exps *btree.BTree // a tree of items ordered by expiration
idxs map[string]*index // the index trees. idxs map[string]*index // the index trees.
@ -132,6 +132,7 @@ func Open(path string) (*DB, error) {
db.keys = btree.New(btreeDegrees, nil) db.keys = btree.New(btreeDegrees, nil)
db.exps = btree.New(btreeDegrees, &exctx{db}) db.exps = btree.New(btreeDegrees, &exctx{db})
db.idxs = make(map[string]*index) db.idxs = make(map[string]*index)
db.buf = &bytes.Buffer{}
db.config = Config{ db.config = Config{
SyncPolicy: EverySecond, SyncPolicy: EverySecond,
AutoShrinkPercentage: 100, AutoShrinkPercentage: 100,
@ -149,7 +150,6 @@ func Open(path string) (*DB, error) {
_ = db.file.Close() _ = db.file.Close()
return nil, err return nil, err
} }
db.bufw = bufio.NewWriter(db.file)
} }
// start the background manager. // start the background manager.
go db.backgroundManager() go db.backgroundManager()
@ -172,7 +172,7 @@ func (db *DB) Close() error {
} }
// Let's release all references to nil. This will help both with debugging // Let's release all references to nil. This will help both with debugging
// late usage panics and it provides a hint to the garbage collector // late usage panics and it provides a hint to the garbage collector
db.keys, db.exps, db.idxs, db.file, db.bufw = nil, nil, nil, nil, nil db.keys, db.exps, db.idxs, db.file = nil, nil, nil, nil
return nil return nil
} }
@ -275,6 +275,7 @@ func (db *DB) createIndex(
if rect != nil { if rect != nil {
idx.rtr = rtree.New(idx) idx.rtr = rtree.New(idx)
} }
db.keys.Ascend(func(item btree.Item) bool { db.keys.Ascend(func(item btree.Item) bool {
dbi := item.(*dbItem) dbi := item.(*dbItem)
if !wildcardMatch(dbi.key, idx.pattern) { if !wildcardMatch(dbi.key, idx.pattern) {
@ -565,7 +566,7 @@ func (db *DB) Shrink() error {
// we are going to read items in as chunks as to not hold up the database // we are going to read items in as chunks as to not hold up the database
// for too long. // for too long.
wr := bufio.NewWriter(f) buf := &bytes.Buffer{}
pivot := "" pivot := ""
done := false done := false
for !done { for !done {
@ -585,14 +586,15 @@ func (db *DB) Shrink() error {
done = false done = false
return false return false
} }
dbi.writeSetTo(wr) dbi.writeSetTo(buf)
n++ n++
return true return true
}, },
) )
if err := wr.Flush(); err != nil { if _, err := f.Write(buf.Bytes()); err != nil {
return err return err
} }
buf.Reset()
return nil return nil
}() }()
if err != nil { if err != nil {
@ -648,8 +650,6 @@ func (db *DB) Shrink() error {
if err != nil { if err != nil {
return err return err
} }
// reset the bufio writer
db.bufw = bufio.NewWriter(db.file)
db.lastaofsz = int(pos) db.lastaofsz = int(pos)
return nil return nil
}() }()
@ -949,17 +949,18 @@ func (tx *Tx) commit() error {
var err error var err error
if tx.db.persist && len(tx.commits) > 0 { if tx.db.persist && len(tx.commits) > 0 {
// Each committed record is written to disk // Each committed record is written to disk
tx.db.buf.Reset()
for key, item := range tx.commits { for key, item := range tx.commits {
if item == nil { if item == nil {
(&dbItem{key: key}).writeDeleteTo(tx.db.bufw) (&dbItem{key: key}).writeDeleteTo(tx.db.buf)
} else { } else {
item.writeSetTo(tx.db.bufw) item.writeSetTo(tx.db.buf)
} }
} }
// Flushing the buffer only once per transaction. // Flushing the buffer only once per transaction.
// If this operation fails then the write did failed and we must // If this operation fails then the write did failed and we must
// rollback. // rollback.
if err = tx.db.bufw.Flush(); err != nil { if _, err = tx.db.file.Write(tx.db.buf.Bytes()); err != nil {
tx.rollbackInner() tx.rollbackInner()
} }
if tx.db.config.SyncPolicy == Always { if tx.db.config.SyncPolicy == Always {
@ -1009,14 +1010,14 @@ type dbItem struct {
} }
// writeHead writes the resp header part // writeHead writes the resp header part
func writeHead(wr *bufio.Writer, c byte, n int) { func writeHead(wr *bytes.Buffer, c byte, n int) {
_ = wr.WriteByte(c) _ = wr.WriteByte(c)
_, _ = wr.WriteString(strconv.FormatInt(int64(n), 10)) _, _ = wr.WriteString(strconv.FormatInt(int64(n), 10))
_, _ = wr.WriteString("\r\n") _, _ = wr.WriteString("\r\n")
} }
// writeMultiBulk writes a resp array // writeMultiBulk writes a resp array
func writeMultiBulk(wr *bufio.Writer, bulks ...string) { func writeMultiBulk(wr *bytes.Buffer, bulks ...string) {
writeHead(wr, '*', len(bulks)) writeHead(wr, '*', len(bulks))
for _, bulk := range bulks { for _, bulk := range bulks {
writeHead(wr, '$', len(bulk)) writeHead(wr, '$', len(bulk))
@ -1026,7 +1027,7 @@ func writeMultiBulk(wr *bufio.Writer, bulks ...string) {
} }
// writeSetTo writes an item as a single SET record to the a bufio Writer. // writeSetTo writes an item as a single SET record to the a bufio Writer.
func (dbi *dbItem) writeSetTo(wr *bufio.Writer) { func (dbi *dbItem) writeSetTo(wr *bytes.Buffer) {
if dbi.opts != nil && dbi.opts.ex { if dbi.opts != nil && dbi.opts.ex {
ex := strconv.FormatUint( ex := strconv.FormatUint(
uint64(dbi.opts.exat.Sub(time.Now())/time.Second), uint64(dbi.opts.exat.Sub(time.Now())/time.Second),
@ -1039,7 +1040,7 @@ func (dbi *dbItem) writeSetTo(wr *bufio.Writer) {
} }
// writeSetTo writes an item as a single DEL record to the a bufio Writer. // writeSetTo writes an item as a single DEL record to the a bufio Writer.
func (dbi *dbItem) writeDeleteTo(wr *bufio.Writer) { func (dbi *dbItem) writeDeleteTo(wr *bytes.Buffer) {
writeMultiBulk(wr, "del", dbi.key) writeMultiBulk(wr, "del", dbi.key)
} }
@ -1171,11 +1172,8 @@ func (tx *Tx) Get(key string) (val string, err error) {
return "", ErrTxClosed return "", ErrTxClosed
} }
item := tx.db.get(key) item := tx.db.get(key)
if item == nil { if item == nil || item.expired() {
return "", ErrNotFound // The item does not exists or has expired. Let's assume that
}
if item.expired() {
// The item exists in the tree, but has expired. Let's assume that
// the caller is only interested in items that have not expired. // the caller is only interested in items that have not expired.
return "", ErrNotFound return "", ErrNotFound
} }

View File

@ -1,7 +1,6 @@
package buntdb package buntdb
import ( import (
"bufio"
"bytes" "bytes"
"errors" "errors"
"fmt" "fmt"
@ -230,7 +229,7 @@ func TestVariousTx(t *testing.T) {
if _, err := db.file.Seek(0, 2); err != nil { if _, err := db.file.Seek(0, 2); err != nil {
t.Fatal(err) t.Fatal(err)
} }
db.bufw = bufio.NewWriter(db.file) db.buf = &bytes.Buffer{}
if err := db.CreateIndex("blank", "*", nil); err != nil { if err := db.CreateIndex("blank", "*", nil); err != nil {
t.Fatal(err) t.Fatal(err)
} }