updated buffer writing

This commit is contained in:
Josh Baker 2016-10-02 07:14:14 -07:00
parent f6366c8b4b
commit 6dbb18799b
2 changed files with 58 additions and 53 deletions

109
buntdb.go
View File

@ -65,7 +65,7 @@ var (
type DB struct { type DB struct {
mu sync.RWMutex // the gatekeeper for all fields mu sync.RWMutex // the gatekeeper for all fields
file *os.File // the underlying file file *os.File // the underlying file
buf *bytes.Buffer // a buffer to write to buf []byte // a buffer to write to
keys *btree.BTree // a tree of all item ordered by key keys *btree.BTree // a tree of all item ordered by key
exps *btree.BTree // a tree of items ordered by expiration exps *btree.BTree // a tree of items ordered by expiration
idxs map[string]*index // the index trees. idxs map[string]*index // the index trees.
@ -139,8 +139,6 @@ func Open(path string) (*DB, error) {
db.keys = btree.New(btreeDegrees, nil) db.keys = btree.New(btreeDegrees, nil)
db.exps = btree.New(btreeDegrees, &exctx{db}) db.exps = btree.New(btreeDegrees, &exctx{db})
db.idxs = make(map[string]*index) db.idxs = make(map[string]*index)
// initialize reusable blank buffer
db.buf = &bytes.Buffer{}
// initialize default configuration // initialize default configuration
db.config = Config{ db.config = Config{
SyncPolicy: EverySecond, SyncPolicy: EverySecond,
@ -198,17 +196,18 @@ func (db *DB) Save(wr io.Writer) error {
db.mu.RLock() db.mu.RLock()
defer db.mu.RUnlock() defer db.mu.RUnlock()
// use a buffered writer and flush every 4MB // use a buffered writer and flush every 4MB
w := bufio.NewWriter(wr) var buf []byte
// iterate through every item in the database and write it to the buffer // iterate through every item in the database and write it to the buffer
db.keys.Ascend(func(item btree.Item) bool { db.keys.Ascend(func(item btree.Item) bool {
dbi := item.(*dbItem) dbi := item.(*dbItem)
dbi.writeSetTo(w) buf = dbi.writeSetTo(buf)
if w.Buffered() > 1024*1024*4 { if len(buf) > 1024*1024*4 {
// flush when buffer is over 4MB // flush when buffer is over 4MB
err = w.Flush() _, err = wr.Write(buf)
if err != nil { if err != nil {
return false return false
} }
buf = buf[:0]
} }
return true return true
}) })
@ -216,9 +215,11 @@ func (db *DB) Save(wr io.Writer) error {
return err return err
} }
// one final flush // one final flush
err = w.Flush() if len(buf) > 0 {
if err != nil { _, err = wr.Write(buf)
return err if err != nil {
return err
}
} }
return nil return nil
} }
@ -635,7 +636,7 @@ func (db *DB) Shrink() error {
// we are going to read items in chunks so as not to hold up the database // we are going to read items in chunks so as not to hold up the database
// for too long. // for too long.
buf := &bytes.Buffer{} var buf []byte
pivot := "" pivot := ""
done := false done := false
for !done { for !done {
@ -645,25 +646,28 @@ func (db *DB) Shrink() error {
if db.closed { if db.closed {
return ErrDatabaseClosed return ErrDatabaseClosed
} }
n := 0
done = true done = true
var n int
db.keys.AscendGreaterOrEqual(&dbItem{key: pivot}, db.keys.AscendGreaterOrEqual(&dbItem{key: pivot},
func(item btree.Item) bool { func(item btree.Item) bool {
dbi := item.(*dbItem) dbi := item.(*dbItem)
if n > 100 { // 1000 items or 64MB buffer
if n > 1000 || len(buf) > 64*1024*1024 {
pivot = dbi.key pivot = dbi.key
done = false done = false
return false return false
} }
dbi.writeSetTo(buf) buf = dbi.writeSetTo(buf)
n++ n++
return true return true
}, },
) )
if _, err := f.Write(buf.Bytes()); err != nil { if len(buf) > 0 {
return err if _, err := f.Write(buf); err != nil {
return err
}
buf = buf[:0]
} }
buf.Reset()
return nil return nil
}() }()
if err != nil { if err != nil {
@ -1101,23 +1105,23 @@ func (tx *Tx) commit() error {
} }
var err error var err error
if tx.db.persist && (len(tx.wc.commitItems) > 0 || tx.wc.rbkeys != nil) { if tx.db.persist && (len(tx.wc.commitItems) > 0 || tx.wc.rbkeys != nil) {
tx.db.buf.Reset() tx.db.buf = tx.db.buf[:0]
// write a flushdb if a deleteAll was called. // write a flushdb if a deleteAll was called.
if tx.wc.rbkeys != nil { if tx.wc.rbkeys != nil {
tx.db.buf.WriteString("*1\r\n$7\r\nflushdb\r\n") tx.db.buf = append(tx.db.buf, "*1\r\n$7\r\nflushdb\r\n"...)
} }
// Each committed record is written to disk // Each committed record is written to disk
for key, item := range tx.wc.commitItems { for key, item := range tx.wc.commitItems {
if item == nil { if item == nil {
(&dbItem{key: key}).writeDeleteTo(tx.db.buf) tx.db.buf = (&dbItem{key: key}).writeDeleteTo(tx.db.buf)
} else { } else {
item.writeSetTo(tx.db.buf) tx.db.buf = item.writeSetTo(tx.db.buf)
} }
} }
// Flushing the buffer only once per transaction. // Flushing the buffer only once per transaction.
// If this operation fails then the write failed and we must // If this operation fails then the write failed and we must
// rollback. // rollback.
if _, err = tx.db.file.Write(tx.db.buf.Bytes()); err != nil { if _, err = tx.db.file.Write(tx.db.buf); err != nil {
tx.rollbackInner() tx.rollbackInner()
} }
if tx.db.config.SyncPolicy == Always { if tx.db.config.SyncPolicy == Always {
@ -1165,46 +1169,47 @@ type dbItem struct {
opts *dbItemOpts // optional meta information opts *dbItemOpts // optional meta information
} }
type byteWriter interface { func appendArray(buf []byte, count int) []byte {
WriteByte(byte) error buf = append(buf, '*')
WriteString(string) (int, error) buf = append(buf, strconv.FormatInt(int64(count), 10)...)
buf = append(buf, '\r', '\n')
return buf
} }
// writeHead writes the resp header part func appendBulkString(buf []byte, s string) []byte {
func writeHead(wr byteWriter, c byte, n int) int { buf = append(buf, '$')
wr.WriteByte(c) buf = append(buf, strconv.FormatInt(int64(len(s)), 10)...)
nn, _ := wr.WriteString(strconv.FormatInt(int64(n), 10)) buf = append(buf, '\r', '\n')
wr.WriteString("\r\n") buf = append(buf, s...)
return nn + 3 buf = append(buf, '\r', '\n')
} return buf
// writeMultiBulk writes a resp array
func writeMultiBulk(wr byteWriter, bulks ...string) int {
n := writeHead(wr, '*', len(bulks))
for _, bulk := range bulks {
nn := writeHead(wr, '$', len(bulk))
wr.WriteString(bulk)
wr.WriteString("\r\n")
n += nn + len(bulk) + 2
}
return n
} }
// writeSetTo writes an item as a single SET record to a bufio Writer. // writeSetTo writes an item as a single SET record to a bufio Writer.
func (dbi *dbItem) writeSetTo(wr byteWriter) int { func (dbi *dbItem) writeSetTo(buf []byte) []byte {
if dbi.opts != nil && dbi.opts.ex { if dbi.opts != nil && dbi.opts.ex {
ex := strconv.FormatUint( ex := dbi.opts.exat.Sub(time.Now()) / time.Second
uint64(dbi.opts.exat.Sub(time.Now())/time.Second), buf = appendArray(buf, 5)
10, buf = appendBulkString(buf, "set")
) buf = appendBulkString(buf, dbi.key)
return writeMultiBulk(wr, "set", dbi.key, dbi.val, "ex", ex) buf = appendBulkString(buf, dbi.val)
buf = appendBulkString(buf, "ex")
buf = appendBulkString(buf, strconv.FormatUint(uint64(ex), 10))
} else {
buf = appendArray(buf, 3)
buf = appendBulkString(buf, "set")
buf = appendBulkString(buf, dbi.key)
buf = appendBulkString(buf, dbi.val)
} }
return writeMultiBulk(wr, "set", dbi.key, dbi.val) return buf
} }
// writeDeleteTo writes an item as a single DEL record to a bufio Writer. // writeDeleteTo writes an item as a single DEL record to a bufio Writer.
func (dbi *dbItem) writeDeleteTo(wr byteWriter) int { func (dbi *dbItem) writeDeleteTo(buf []byte) []byte {
return writeMultiBulk(wr, "del", dbi.key) buf = appendArray(buf, 2)
buf = appendBulkString(buf, "del")
buf = appendBulkString(buf, dbi.key)
return buf
} }
// expired evaluates if the item has expired. This will always return false when

View File

@ -593,7 +593,7 @@ func TestVariousTx(t *testing.T) {
if _, err := db.file.Seek(0, 2); err != nil { if _, err := db.file.Seek(0, 2); err != nil {
t.Fatal(err) t.Fatal(err)
} }
db.buf = &bytes.Buffer{} db.buf = nil
if err := db.CreateIndex("blank", "*", nil); err != nil { if err := db.CreateIndex("blank", "*", nil); err != nil {
t.Fatal(err) t.Fatal(err)
} }