From 06fce8810a79bca980d82b95ceff40dd2baad83c Mon Sep 17 00:00:00 2001 From: siddontang Date: Tue, 14 Mar 2017 21:55:16 +0800 Subject: [PATCH] revert goleveldb for replication compatibility --- .../syndtr/goleveldb/leveldb/batch.go | 377 +++++++---------- .../syndtr/goleveldb/leveldb/cache/cache.go | 37 +- .../syndtr/goleveldb/leveldb/comparer.go | 40 +- .../github.com/syndtr/goleveldb/leveldb/db.go | 78 ++-- .../syndtr/goleveldb/leveldb/db_compaction.go | 38 +- .../syndtr/goleveldb/leveldb/db_snapshot.go | 4 +- .../syndtr/goleveldb/leveldb/db_state.go | 46 +- .../goleveldb/leveldb/db_transaction.go | 68 +-- .../syndtr/goleveldb/leveldb/db_util.go | 2 +- .../syndtr/goleveldb/leveldb/db_write.go | 394 +++++++----------- .../syndtr/goleveldb/leveldb/errors.go | 1 - .../syndtr/goleveldb/leveldb/errors/errors.go | 6 +- .../syndtr/goleveldb/leveldb/iterator/iter.go | 15 +- .../goleveldb/leveldb/journal/journal.go | 43 +- .../syndtr/goleveldb/leveldb/key.go | 14 +- .../syndtr/goleveldb/leveldb/memdb/memdb.go | 12 +- .../syndtr/goleveldb/leveldb/opt/options.go | 24 -- .../syndtr/goleveldb/leveldb/session.go | 11 +- .../syndtr/goleveldb/leveldb/session_util.go | 35 +- .../goleveldb/leveldb/storage/file_storage.go | 6 +- .../leveldb/storage/file_storage_nacl.go | 34 -- .../goleveldb/leveldb/storage/mem_storage.go | 8 +- .../goleveldb/leveldb/storage/storage.go | 52 ++- .../syndtr/goleveldb/leveldb/table.go | 2 +- .../syndtr/goleveldb/leveldb/table/reader.go | 74 +--- .../syndtr/goleveldb/leveldb/table/writer.go | 2 +- .../syndtr/goleveldb/leveldb/util.go | 7 - .../syndtr/goleveldb/leveldb/version.go | 43 +- glide.lock | 6 +- glide.yaml | 2 +- store/rocksdb/batch.go | 5 +- store/store_test.go | 2 +- 32 files changed, 552 insertions(+), 936 deletions(-) delete mode 100644 _vendor/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_nacl.go diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/batch.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/batch.go index 2259200..5010067 100644 --- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/batch.go +++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/batch.go @@ -9,15 +9,13 @@ package leveldb import ( "encoding/binary" "fmt" - "io" "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/memdb" "github.com/syndtr/goleveldb/leveldb/storage" ) -// ErrBatchCorrupted records reason of batch corruption. This error will be -// wrapped with errors.ErrCorrupted. +// ErrBatchCorrupted records reason of batch corruption. type ErrBatchCorrupted struct { Reason string } @@ -31,9 +29,8 @@ func newErrBatchCorrupted(reason string) error { } const ( - batchHeaderLen = 8 + 4 - batchGrowRec = 3000 - batchBufioSize = 16 + batchHdrLen = 8 + 4 + batchGrowRec = 3000 ) // BatchReplay wraps basic batch operations. @@ -42,46 +39,34 @@ type BatchReplay interface { Delete(key []byte) } -type batchIndex struct { - keyType keyType - keyPos, keyLen int - valuePos, valueLen int -} - -func (index batchIndex) k(data []byte) []byte { - return data[index.keyPos : index.keyPos+index.keyLen] -} - -func (index batchIndex) v(data []byte) []byte { - if index.valueLen != 0 { - return data[index.valuePos : index.valuePos+index.valueLen] - } - return nil -} - -func (index batchIndex) kv(data []byte) (key, value []byte) { - return index.k(data), index.v(data) -} - // Batch is a write batch. type Batch struct { - data []byte - index []batchIndex - - // internalLen is sums of key/value pair length plus 8-bytes internal key. 
- internalLen int + data []byte + rLen, bLen int + seq uint64 + sync bool } func (b *Batch) grow(n int) { - o := len(b.data) - if cap(b.data)-o < n { - div := 1 - if len(b.index) > batchGrowRec { - div = len(b.index) / batchGrowRec + off := len(b.data) + if off == 0 { + off = batchHdrLen + if b.data != nil { + b.data = b.data[:off] + } + } + if cap(b.data)-off < n { + if b.data == nil { + b.data = make([]byte, off, off+n) + } else { + odata := b.data + div := 1 + if b.rLen > batchGrowRec { + div = b.rLen / batchGrowRec + } + b.data = make([]byte, off, off+n+(off-batchHdrLen)/div) + copy(b.data, odata) } - ndata := make([]byte, o, o+n+o/div) - copy(ndata, b.data) - b.data = ndata } } @@ -91,36 +76,32 @@ func (b *Batch) appendRec(kt keyType, key, value []byte) { n += binary.MaxVarintLen32 + len(value) } b.grow(n) - index := batchIndex{keyType: kt} - o := len(b.data) - data := b.data[:o+n] - data[o] = byte(kt) - o++ - o += binary.PutUvarint(data[o:], uint64(len(key))) - index.keyPos = o - index.keyLen = len(key) - o += copy(data[o:], key) + off := len(b.data) + data := b.data[:off+n] + data[off] = byte(kt) + off++ + off += binary.PutUvarint(data[off:], uint64(len(key))) + copy(data[off:], key) + off += len(key) if kt == keyTypeVal { - o += binary.PutUvarint(data[o:], uint64(len(value))) - index.valuePos = o - index.valueLen = len(value) - o += copy(data[o:], value) + off += binary.PutUvarint(data[off:], uint64(len(value))) + copy(data[off:], value) + off += len(value) } - b.data = data[:o] - b.index = append(b.index, index) - b.internalLen += index.keyLen + index.valueLen + 8 + b.data = data[:off] + b.rLen++ + // Include 8-byte ikey header + b.bLen += len(key) + len(value) + 8 } // Put appends 'put operation' of the given key/value pair to the batch. -// It is safe to modify the contents of the argument after Put returns but not -// before. +// It is safe to modify the contents of the argument after Put returns. func (b *Batch) Put(key, value []byte) { b.appendRec(keyTypeVal, key, value) } // Delete appends 'delete operation' of the given key to the batch. -// It is safe to modify the contents of the argument after Delete returns but -// not before. +// It is safe to modify the contents of the argument after Delete returns. func (b *Batch) Delete(key []byte) { b.appendRec(keyTypeDel, key, nil) } @@ -130,7 +111,7 @@ func (b *Batch) Delete(key []byte) { // The returned slice is not its own copy, so the contents should not be // modified. func (b *Batch) Dump() []byte { - return b.data + return b.encode() } // Load loads given slice into the batch. Previous contents of the batch @@ -138,212 +119,144 @@ func (b *Batch) Dump() []byte { // The given slice will not be copied and will be used as batch buffer, so // it is not safe to modify the contents of the slice. func (b *Batch) Load(data []byte) error { - return b.decode(data, -1) + return b.decode(0, data) } // Replay replays batch contents. func (b *Batch) Replay(r BatchReplay) error { - for _, index := range b.index { - switch index.keyType { + return b.decodeRec(func(i int, kt keyType, key, value []byte) error { + switch kt { case keyTypeVal: - r.Put(index.k(b.data), index.v(b.data)) + r.Put(key, value) case keyTypeDel: - r.Delete(index.k(b.data)) + r.Delete(key) } - } - return nil + return nil + }) } // Len returns number of records in the batch. func (b *Batch) Len() int { - return len(b.index) + return b.rLen } // Reset resets the batch. 
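+// Reset keeps the underlying buffer allocated, so a Batch can be recycled
+// across writes without reallocating. For illustration:
+//
+//	b.Put(key, value)
+//	if err := db.Write(b, nil); err != nil {
+//		// handle the write error
+//	}
+//	b.Reset() // counters cleared, buffer capacity retained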
func (b *Batch) Reset() { b.data = b.data[:0] - b.index = b.index[:0] - b.internalLen = 0 + b.seq = 0 + b.rLen = 0 + b.bLen = 0 + b.sync = false } -func (b *Batch) replayInternal(fn func(i int, kt keyType, k, v []byte) error) error { - for i, index := range b.index { - if err := fn(i, index.keyType, index.k(b.data), index.v(b.data)); err != nil { - return err - } - } - return nil +func (b *Batch) init(sync bool) { + b.sync = sync } func (b *Batch) append(p *Batch) { - ob := len(b.data) - oi := len(b.index) - b.data = append(b.data, p.data...) - b.index = append(b.index, p.index...) - b.internalLen += p.internalLen - - // Updating index offset. - if ob != 0 { - for ; oi < len(b.index); oi++ { - index := &b.index[oi] - index.keyPos += ob - if index.valueLen != 0 { - index.valuePos += ob - } - } + if p.rLen > 0 { + b.grow(len(p.data) - batchHdrLen) + b.data = append(b.data, p.data[batchHdrLen:]...) + b.rLen += p.rLen + b.bLen += p.bLen + } + if p.sync { + b.sync = true } } -func (b *Batch) decode(data []byte, expectedLen int) error { +// size returns sums of key/value pair length plus 8-bytes ikey. +func (b *Batch) size() int { + return b.bLen +} + +func (b *Batch) encode() []byte { + b.grow(0) + binary.LittleEndian.PutUint64(b.data, b.seq) + binary.LittleEndian.PutUint32(b.data[8:], uint32(b.rLen)) + + return b.data +} + +func (b *Batch) decode(prevSeq uint64, data []byte) error { + if len(data) < batchHdrLen { + return newErrBatchCorrupted("too short") + } + + b.seq = binary.LittleEndian.Uint64(data) + if b.seq < prevSeq { + return newErrBatchCorrupted("invalid sequence number") + } + b.rLen = int(binary.LittleEndian.Uint32(data[8:])) + if b.rLen < 0 { + return newErrBatchCorrupted("invalid records length") + } + // No need to be precise at this point, it won't be used anyway + b.bLen = len(data) - batchHdrLen b.data = data - b.index = b.index[:0] - b.internalLen = 0 - err := decodeBatch(data, func(i int, index batchIndex) error { - b.index = append(b.index, index) - b.internalLen += index.keyLen + index.valueLen + 8 - return nil - }) - if err != nil { - return err - } - if expectedLen >= 0 && len(b.index) != expectedLen { - return newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", expectedLen, len(b.index))) - } + return nil } -func (b *Batch) putMem(seq uint64, mdb *memdb.DB) error { - var ik []byte - for i, index := range b.index { - ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType) - if err := mdb.Put(ik, index.v(b.data)); err != nil { - return err - } - } - return nil -} - -func (b *Batch) revertMem(seq uint64, mdb *memdb.DB) error { - var ik []byte - for i, index := range b.index { - ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType) - if err := mdb.Delete(ik); err != nil { - return err - } - } - return nil -} - -func newBatch() interface{} { - return &Batch{} -} - -func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error { - var index batchIndex - for i, o := 0, 0; o < len(data); i++ { - // Key type. - index.keyType = keyType(data[o]) - if index.keyType > keyTypeVal { - return newErrBatchCorrupted(fmt.Sprintf("bad record: invalid type %#x", uint(index.keyType))) - } - o++ - - // Key. - x, n := binary.Uvarint(data[o:]) - o += n - if n <= 0 || o+int(x) > len(data) { - return newErrBatchCorrupted("bad record: invalid key length") - } - index.keyPos = o - index.keyLen = int(x) - o += index.keyLen - - // Value. 
-		if index.keyType == keyTypeVal {
-			x, n = binary.Uvarint(data[o:])
-			o += n
-			if n <= 0 || o+int(x) > len(data) {
-				return newErrBatchCorrupted("bad record: invalid value length")
-			}
-			index.valuePos = o
-			index.valueLen = int(x)
-			o += index.valueLen
-		} else {
-			index.valuePos = 0
-			index.valueLen = 0
-		}
-
-		if err := fn(i, index); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-func decodeBatchToMem(data []byte, expectSeq uint64, mdb *memdb.DB) (seq uint64, batchLen int, err error) {
-	seq, batchLen, err = decodeBatchHeader(data)
-	if err != nil {
-		return 0, 0, err
-	}
-	if seq < expectSeq {
-		return 0, 0, newErrBatchCorrupted("invalid sequence number")
-	}
-	data = data[batchHeaderLen:]
-	var ik []byte
-	var decodedLen int
-	err = decodeBatch(data, func(i int, index batchIndex) error {
-		if i >= batchLen {
+func (b *Batch) decodeRec(f func(i int, kt keyType, key, value []byte) error) error {
+	off := batchHdrLen
+	for i := 0; i < b.rLen; i++ {
+		if off >= len(b.data) {
 			return newErrBatchCorrupted("invalid records length")
 		}
-		ik = makeInternalKey(ik, index.k(data), seq+uint64(i), index.keyType)
-		if err := mdb.Put(ik, index.v(data)); err != nil {
-			return err
+
+		kt := keyType(b.data[off])
+		if kt > keyTypeVal {
+			return newErrBatchCorrupted("bad record: invalid type")
 		}
-		decodedLen++
-		return nil
-	})
-	if err == nil && decodedLen != batchLen {
-		err = newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", batchLen, decodedLen))
-	}
-	return
-}
+		off++
+
-func encodeBatchHeader(dst []byte, seq uint64, batchLen int) []byte {
-	dst = ensureBuffer(dst, batchHeaderLen)
-	binary.LittleEndian.PutUint64(dst, seq)
-	binary.LittleEndian.PutUint32(dst[8:], uint32(batchLen))
-	return dst
-}
+		x, n := binary.Uvarint(b.data[off:])
+		off += n
+		if n <= 0 || off+int(x) > len(b.data) {
+			return newErrBatchCorrupted("bad record: invalid key length")
+		}
+		key := b.data[off : off+int(x)]
+		off += int(x)
+		var value []byte
+		if kt == keyTypeVal {
+			x, n := binary.Uvarint(b.data[off:])
+			off += n
+			if n <= 0 || off+int(x) > len(b.data) {
+				return newErrBatchCorrupted("bad record: invalid value length")
+			}
+			value = b.data[off : off+int(x)]
+			off += int(x)
+		}
+
-func decodeBatchHeader(data []byte) (seq uint64, batchLen int, err error) {
-	if len(data) < batchHeaderLen {
-		return 0, 0, newErrBatchCorrupted("too short")
-	}
-
-	seq = binary.LittleEndian.Uint64(data)
-	batchLen = int(binary.LittleEndian.Uint32(data[8:]))
-	if batchLen < 0 {
-		return 0, 0, newErrBatchCorrupted("invalid records length")
-	}
-	return
-}
+		if err := f(i, kt, key, value); err != nil {
+			return err
+		}
+	}
+
-func batchesLen(batches []*Batch) int {
-	batchLen := 0
-	for _, batch := range batches {
-		batchLen += batch.Len()
-	}
-	return batchLen
-}
+	return nil
+}
+
-func writeBatchesWithHeader(wr io.Writer, batches []*Batch, seq uint64) error {
-	if _, err := wr.Write(encodeBatchHeader(nil, seq, batchesLen(batches))); err != nil {
-		return err
-	}
-	for _, batch := range batches {
-		if _, err := wr.Write(batch.data); err != nil {
-			return err
-		}
-	}
-	return nil
-}
+func (b *Batch) memReplay(to *memdb.DB) error {
+	var ikScratch []byte
+	return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
+		ikScratch = makeInternalKey(ikScratch, key, b.seq+uint64(i), kt)
+		return to.Put(ikScratch, value)
+	})
+}
+
+func (b *Batch) memDecodeAndReplay(prevSeq uint64, data []byte, to *memdb.DB) error {
+	if err := b.decode(prevSeq, data); err != nil {
+		return err
+	}
+	return b.memReplay(to)
+}
+
+func (b *Batch) revertMemReplay(to *memdb.DB) error {
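+	// Mirror image of memReplay: the same internal keys (b.seq+i) are
+	// rebuilt and deleted again; db_write.go uses this to roll the memdb
+	// back when a concurrent journal write fails.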
+	var ikScratch []byte
+	return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
+		ikScratch = makeInternalKey(ikScratch, key, b.seq+uint64(i), kt)
+		return to.Delete(ikScratch)
+	})
+}
diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go
index c5940b2..a287d0e 100644
--- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go
+++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go
@@ -16,7 +16,7 @@ import (
 )
 
 // Cacher provides interface to implements a caching functionality.
-// An implementation must be safe for concurrent use.
+// An implementation must be goroutine-safe.
 type Cacher interface {
 	// Capacity returns cache capacity.
 	Capacity() int
@@ -511,12 +511,18 @@ func (r *Cache) EvictAll() {
 	}
 }
 
-// Close closes the 'cache map' and forcefully releases all 'cache node'.
+// Close closes the 'cache map' and releases all 'cache node'.
 func (r *Cache) Close() error {
 	r.mu.Lock()
 	if !r.closed {
 		r.closed = true
 
+		if r.cacher != nil {
+			if err := r.cacher.Close(); err != nil {
+				return err
+			}
+		}
+
 		h := (*mNode)(r.mHead)
 		h.initBuckets()
 
@@ -535,37 +541,10 @@ func (r *Cache) Close() error {
 				for _, f := range n.onDel {
 					f()
 				}
-				n.onDel = nil
 			}
 		}
 	}
 	r.mu.Unlock()
-
-	// Avoid deadlock.
-	if r.cacher != nil {
-		if err := r.cacher.Close(); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-// CloseWeak closes the 'cache map' and evict all 'cache node' from cacher, but
-// unlike Close it doesn't forcefully releases 'cache node'.
-func (r *Cache) CloseWeak() error {
-	r.mu.Lock()
-	if !r.closed {
-		r.closed = true
-	}
-	r.mu.Unlock()
-
-	// Avoid deadlock.
-	if r.cacher != nil {
-		r.cacher.EvictAll()
-		if err := r.cacher.Close(); err != nil {
-			return err
-		}
-	}
 	return nil
 }
diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/comparer.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/comparer.go
index 448402b..248bf7c 100644
--- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/comparer.go
+++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/comparer.go
@@ -6,9 +6,7 @@
 
 package leveldb
 
-import (
-	"github.com/syndtr/goleveldb/leveldb/comparer"
-)
+import "github.com/syndtr/goleveldb/leveldb/comparer"
 
 type iComparer struct {
 	ucmp comparer.Comparer
@@ -35,12 +33,12 @@ func (icmp *iComparer) Name() string {
 }
 
 func (icmp *iComparer) Compare(a, b []byte) int {
-	x := icmp.uCompare(internalKey(a).ukey(), internalKey(b).ukey())
+	x := icmp.ucmp.Compare(internalKey(a).ukey(), internalKey(b).ukey())
 	if x == 0 {
 		if m, n := internalKey(a).num(), internalKey(b).num(); m > n {
-			return -1
+			x = -1
 		} else if m < n {
-			return 1
+			x = 1
 		}
 	}
 	return x
@@ -48,20 +46,30 @@ func (icmp *iComparer) Compare(a, b []byte) int {
 
 func (icmp *iComparer) Separator(dst, a, b []byte) []byte {
 	ua, ub := internalKey(a).ukey(), internalKey(b).ukey()
-	dst = icmp.uSeparator(dst, ua, ub)
-	if dst != nil && len(dst) < len(ua) && icmp.uCompare(ua, dst) < 0 {
-		// Append earliest possible number.
-		return append(dst, keyMaxNumBytes...)
+	dst = icmp.ucmp.Separator(dst, ua, ub)
+	if dst == nil {
+		return nil
 	}
-	return nil
+	if len(dst) < len(ua) && icmp.uCompare(ua, dst) < 0 {
+		dst = append(dst, keyMaxNumBytes...)
+	} else {
+		// Did not close possibilities that n maybe longer than len(ub).
+		dst = append(dst, a[len(a)-8:]...)
+ } + return dst } func (icmp *iComparer) Successor(dst, b []byte) []byte { ub := internalKey(b).ukey() - dst = icmp.uSuccessor(dst, ub) - if dst != nil && len(dst) < len(ub) && icmp.uCompare(ub, dst) < 0 { - // Append earliest possible number. - return append(dst, keyMaxNumBytes...) + dst = icmp.ucmp.Successor(dst, ub) + if dst == nil { + return nil } - return nil + if len(dst) < len(ub) && icmp.uCompare(ub, dst) < 0 { + dst = append(dst, keyMaxNumBytes...) + } else { + // Did not close possibilities that n maybe longer than len(ub). + dst = append(dst, b[len(b)-8:]...) + } + return dst } diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db.go index a02cb2c..eb6abd0 100644 --- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db.go +++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db.go @@ -53,13 +53,14 @@ type DB struct { aliveSnaps, aliveIters int32 // Write. - batchPool sync.Pool - writeMergeC chan writeMerge + writeC chan *Batch writeMergedC chan bool writeLockC chan struct{} writeAckC chan error writeDelay time.Duration writeDelayN int + journalC chan *Batch + journalAckC chan error tr *Transaction // Compaction. @@ -93,11 +94,12 @@ func openDB(s *session) (*DB, error) { // Snapshot snapsList: list.New(), // Write - batchPool: sync.Pool{New: newBatch}, - writeMergeC: make(chan writeMerge), + writeC: make(chan *Batch), writeMergedC: make(chan bool), writeLockC: make(chan struct{}, 1), writeAckC: make(chan error), + journalC: make(chan *Batch), + journalAckC: make(chan error), // Compaction tcompCmdC: make(chan cCmd), tcompPauseC: make(chan chan<- struct{}), @@ -142,10 +144,10 @@ func openDB(s *session) (*DB, error) { if readOnly { db.SetReadOnly() } else { - db.closeW.Add(2) + db.closeW.Add(3) go db.tCompaction() go db.mCompaction() - // go db.jWriter() + go db.jWriter() } s.logf("db@open done T·%v", time.Since(start)) @@ -160,10 +162,10 @@ func openDB(s *session) (*DB, error) { // os.ErrExist error. // // Open will return an error with type of ErrCorrupted if corruption -// detected in the DB. Use errors.IsCorrupted to test whether an error is -// due to corruption. Corrupted DB can be recovered with Recover function. +// detected in the DB. Corrupted DB can be recovered with Recover +// function. // -// The returned DB instance is safe for concurrent use. +// The returned DB instance is goroutine-safe. // The DB must be closed after use, by calling Close method. func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) { s, err := newSession(stor, o) @@ -200,13 +202,13 @@ func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) { // os.ErrExist error. // // OpenFile uses standard file-system backed storage implementation as -// described in the leveldb/storage package. +// desribed in the leveldb/storage package. // // OpenFile will return an error with type of ErrCorrupted if corruption -// detected in the DB. Use errors.IsCorrupted to test whether an error is -// due to corruption. Corrupted DB can be recovered with Recover function. +// detected in the DB. Corrupted DB can be recovered with Recover +// function. // -// The returned DB instance is safe for concurrent use. +// The returned DB instance is goroutine-safe. // The DB must be closed after use, by calling Close method. 
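+// For illustration, a typical call site:
+//
+//	db, err := leveldb.OpenFile("path/to/db", nil)
+//	if err != nil {
+//		// handle the open error
+//	}
+//	defer db.Close()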
func OpenFile(path string, o *opt.Options) (db *DB, err error) { stor, err := storage.OpenFile(path, o.GetReadOnly()) @@ -227,7 +229,7 @@ func OpenFile(path string, o *opt.Options) (db *DB, err error) { // The DB must already exist or it will returns an error. // Also, Recover will ignore ErrorIfMissing and ErrorIfExist options. // -// The returned DB instance is safe for concurrent use. +// The returned DB instance is goroutine-safe. // The DB must be closed after use, by calling Close method. func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) { s, err := newSession(stor, o) @@ -253,10 +255,10 @@ func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) { // The DB must already exist or it will returns an error. // Also, Recover will ignore ErrorIfMissing and ErrorIfExist options. // -// RecoverFile uses standard file-system backed storage implementation as described +// RecoverFile uses standard file-system backed storage implementation as desribed // in the leveldb/storage package. // -// The returned DB instance is safe for concurrent use. +// The returned DB instance is goroutine-safe. // The DB must be closed after use, by calling Close method. func RecoverFile(path string, o *opt.Options) (db *DB, err error) { stor, err := storage.OpenFile(path, false) @@ -502,11 +504,10 @@ func (db *DB) recoverJournal() error { checksum = db.s.o.GetStrict(opt.StrictJournalChecksum) writeBuffer = db.s.o.GetWriteBuffer() - jr *journal.Reader - mdb = memdb.New(db.s.icmp, writeBuffer) - buf = &util.Buffer{} - batchSeq uint64 - batchLen int + jr *journal.Reader + mdb = memdb.New(db.s.icmp, writeBuffer) + buf = &util.Buffer{} + batch = &Batch{} ) for _, fd := range fds { @@ -525,7 +526,7 @@ func (db *DB) recoverJournal() error { } // Flush memdb and remove obsolete journal file. - if !ofd.Zero() { + if !ofd.Nil() { if mdb.Len() > 0 { if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil { fr.Close() @@ -568,8 +569,7 @@ func (db *DB) recoverJournal() error { fr.Close() return errors.SetFd(err, fd) } - batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb) - if err != nil { + if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil { if !strict && errors.IsCorrupted(err) { db.s.logf("journal error: %v (skipped)", err) // We won't apply sequence number as it might be corrupted. @@ -581,7 +581,7 @@ func (db *DB) recoverJournal() error { } // Save sequence number. - db.seq = batchSeq + uint64(batchLen) + db.seq = batch.seq + uint64(batch.Len()) // Flush it if large enough. if mdb.Size() >= writeBuffer { @@ -624,7 +624,7 @@ func (db *DB) recoverJournal() error { } // Remove the last obsolete journal file. - if !ofd.Zero() { + if !ofd.Nil() { db.s.stor.Remove(ofd) } @@ -661,10 +661,9 @@ func (db *DB) recoverJournalRO() error { db.logf("journal@recovery RO·Mode F·%d", len(fds)) var ( - jr *journal.Reader - buf = &util.Buffer{} - batchSeq uint64 - batchLen int + jr *journal.Reader + buf = &util.Buffer{} + batch = &Batch{} ) for _, fd := range fds { @@ -704,8 +703,7 @@ func (db *DB) recoverJournalRO() error { fr.Close() return errors.SetFd(err, fd) } - batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb) - if err != nil { + if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil { if !strict && errors.IsCorrupted(err) { db.s.logf("journal error: %v (skipped)", err) // We won't apply sequence number as it might be corrupted. @@ -717,7 +715,7 @@ func (db *DB) recoverJournalRO() error { } // Save sequence number. 
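+			// batch.seq is the sequence number of the first record and record
+			// i replays at batch.seq+i, so the next free sequence number is
+			// batch.seq + batch.Len().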
- db.seq = batchSeq + uint64(batchLen) + db.seq = batch.seq + uint64(batch.Len()) } fr.Close() @@ -858,7 +856,7 @@ func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) { // NewIterator returns an iterator for the latest snapshot of the // underlying DB. -// The returned iterator is not safe for concurrent use, but it is safe to use +// The returned iterator is not goroutine-safe, but it is safe to use // multiple iterators concurrently, with each in a dedicated goroutine. // It is also safe to use an iterator concurrently with modifying its // underlying DB. The resultant key/value pairs are guaranteed to be @@ -1064,8 +1062,6 @@ func (db *DB) Close() error { if db.journal != nil { db.journal.Close() db.journalWriter.Close() - db.journal = nil - db.journalWriter = nil } if db.writeDelayN > 0 { @@ -1081,11 +1077,15 @@ func (db *DB) Close() error { if err1 := db.closer.Close(); err == nil { err = err1 } - db.closer = nil } - // Clear memdbs. - db.clearMems() + // NIL'ing pointers. + db.s = nil + db.mem = nil + db.frozenMem = nil + db.journal = nil + db.journalWriter = nil + db.closer = nil return err } diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go index b6563e8..5553202 100644 --- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go +++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go @@ -96,7 +96,7 @@ noerr: default: goto haserr } - case <-db.closeC: + case _, _ = <-db.closeC: return } } @@ -113,7 +113,7 @@ haserr: goto hasperr default: } - case <-db.closeC: + case _, _ = <-db.closeC: return } } @@ -126,7 +126,7 @@ hasperr: case db.writeLockC <- struct{}{}: // Hold write lock, so that write won't pass-through. db.compWriteLocking = true - case <-db.closeC: + case _, _ = <-db.closeC: if db.compWriteLocking { // We should release the lock or Close will hang. <-db.writeLockC @@ -195,7 +195,7 @@ func (db *DB) compactionTransact(name string, t compactionTransactInterface) { db.logf("%s exiting (persistent error %q)", name, perr) db.compactionExitTransact() } - case <-db.closeC: + case _, _ = <-db.closeC: db.logf("%s exiting", name) db.compactionExitTransact() } @@ -224,7 +224,7 @@ func (db *DB) compactionTransact(name string, t compactionTransactInterface) { } select { case <-backoffT.C: - case <-db.closeC: + case _, _ = <-db.closeC: db.logf("%s exiting", name) db.compactionExitTransact() } @@ -288,8 +288,8 @@ func (db *DB) memCompaction() { case <-db.compPerErrC: close(resumeC) resumeC = nil - case <-db.closeC: - db.compactionExitTransact() + case _, _ = <-db.closeC: + return } var ( @@ -337,8 +337,8 @@ func (db *DB) memCompaction() { select { case <-resumeC: close(resumeC) - case <-db.closeC: - db.compactionExitTransact() + case _, _ = <-db.closeC: + return } } @@ -378,7 +378,7 @@ func (b *tableCompactionBuilder) appendKV(key, value []byte) error { select { case ch := <-b.db.tcompPauseC: b.db.pauseCompaction(ch) - case <-b.db.closeC: + case _, _ = <-b.db.closeC: b.db.compactionExitTransact() default: } @@ -643,7 +643,7 @@ func (db *DB) tableNeedCompaction() bool { func (db *DB) pauseCompaction(ch chan<- struct{}) { select { case ch <- struct{}{}: - case <-db.closeC: + case _, _ = <-db.closeC: db.compactionExitTransact() } } @@ -688,7 +688,7 @@ func (db *DB) compTrigger(compC chan<- cCmd) { } } -// This will trigger auto compaction and/or wait for all compaction to be done. 
+// This will trigger auto compation and/or wait for all compaction to be done. func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) { ch := make(chan error) defer close(ch) @@ -697,14 +697,14 @@ func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) { case compC <- cAuto{ch}: case err = <-db.compErrC: return - case <-db.closeC: + case _, _ = <-db.closeC: return ErrClosed } // Wait cmd. select { case err = <-ch: case err = <-db.compErrC: - case <-db.closeC: + case _, _ = <-db.closeC: return ErrClosed } return err @@ -719,14 +719,14 @@ func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (e case compC <- cRange{level, min, max, ch}: case err := <-db.compErrC: return err - case <-db.closeC: + case _, _ = <-db.closeC: return ErrClosed } // Wait cmd. select { case err = <-ch: case err = <-db.compErrC: - case <-db.closeC: + case _, _ = <-db.closeC: return ErrClosed } return err @@ -758,7 +758,7 @@ func (db *DB) mCompaction() { default: panic("leveldb: unknown command") } - case <-db.closeC: + case _, _ = <-db.closeC: return } } @@ -791,7 +791,7 @@ func (db *DB) tCompaction() { case ch := <-db.tcompPauseC: db.pauseCompaction(ch) continue - case <-db.closeC: + case _, _ = <-db.closeC: return default: } @@ -806,7 +806,7 @@ func (db *DB) tCompaction() { case ch := <-db.tcompPauseC: db.pauseCompaction(ch) continue - case <-db.closeC: + case _, _ = <-db.closeC: return } } diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go index 2c69d2e..977f65b 100644 --- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go +++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go @@ -59,7 +59,7 @@ func (db *DB) releaseSnapshot(se *snapshotElement) { } } -// Gets minimum sequence that not being snapshotted. +// Gets minimum sequence that not being snapshoted. func (db *DB) minSeq() uint64 { db.snapsMu.Lock() defer db.snapsMu.Unlock() @@ -131,7 +131,7 @@ func (snap *Snapshot) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) } // NewIterator returns an iterator for the snapshot of the underlying DB. -// The returned iterator is not safe for concurrent use, but it is safe to use +// The returned iterator is not goroutine-safe, but it is safe to use // multiple iterators concurrently, with each in a dedicated goroutine. // It is also safe to use an iterator concurrently with modifying its // underlying DB. The resultant key/value pairs are guaranteed to be diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go index 65e1c54..40f454d 100644 --- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go +++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go @@ -7,7 +7,6 @@ package leveldb import ( - "errors" "sync/atomic" "time" @@ -16,10 +15,6 @@ import ( "github.com/syndtr/goleveldb/leveldb/storage" ) -var ( - errHasFrozenMem = errors.New("has frozen mem") -) - type memDB struct { db *DB *memdb.DB @@ -72,11 +67,12 @@ func (db *DB) sampleSeek(ikey internalKey) { } func (db *DB) mpoolPut(mem *memdb.DB) { - if !db.isClosed() { - select { - case db.memPool <- mem: - default: - } + defer func() { + recover() + }() + select { + case db.memPool <- mem: + default: } } @@ -104,13 +100,7 @@ func (db *DB) mpoolDrain() { case <-db.memPool: default: } - case <-db.closeC: - ticker.Stop() - // Make sure the pool is drained. 
- select { - case <-db.memPool: - case <-time.After(time.Second): - } + case _, _ = <-db.closeC: close(db.memPool) return } @@ -131,7 +121,7 @@ func (db *DB) newMem(n int) (mem *memDB, err error) { defer db.memMu.Unlock() if db.frozenMem != nil { - return nil, errHasFrozenMem + panic("still has frozen mem") } if db.journal == nil { @@ -158,26 +148,24 @@ func (db *DB) newMem(n int) (mem *memDB, err error) { func (db *DB) getMems() (e, f *memDB) { db.memMu.RLock() defer db.memMu.RUnlock() - if db.mem != nil { - db.mem.incref() - } else if !db.isClosed() { + if db.mem == nil { panic("nil effective mem") } + db.mem.incref() if db.frozenMem != nil { db.frozenMem.incref() } return db.mem, db.frozenMem } -// Get effective memdb. +// Get frozen memdb. func (db *DB) getEffectiveMem() *memDB { db.memMu.RLock() defer db.memMu.RUnlock() - if db.mem != nil { - db.mem.incref() - } else if !db.isClosed() { + if db.mem == nil { panic("nil effective mem") } + db.mem.incref() return db.mem } @@ -212,14 +200,6 @@ func (db *DB) dropFrozenMem() { db.memMu.Unlock() } -// Clear mems ptr; used by DB.Close(). -func (db *DB) clearMems() { - db.memMu.Lock() - db.mem = nil - db.frozenMem = nil - db.memMu.Unlock() -} - // Set closed flag; return true if not already closed. func (db *DB) setClosed() bool { return atomic.CompareAndSwapUint32(&db.closed, 0, 1) diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go index b8f7e7d..fca8803 100644 --- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go +++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go @@ -59,8 +59,8 @@ func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) { } // NewIterator returns an iterator for the latest snapshot of the transaction. -// The returned iterator is not safe for concurrent use, but it is safe to use -// multiple iterators concurrently, with each in a dedicated goroutine. +// The returned iterator is not goroutine-safe, but it is safe to use multiple +// iterators concurrently, with each in a dedicated goroutine. // It is also safe to use an iterator concurrently while writes to the // transaction. The resultant key/value pairs are guaranteed to be consistent. // @@ -167,8 +167,8 @@ func (tr *Transaction) Write(b *Batch, wo *opt.WriteOptions) error { if tr.closed { return errTransactionDone } - return b.replayInternal(func(i int, kt keyType, k, v []byte) error { - return tr.put(kt, k, v) + return b.decodeRec(func(i int, kt keyType, key, value []byte) error { + return tr.put(kt, key, value) }) } @@ -179,8 +179,7 @@ func (tr *Transaction) setDone() { <-tr.db.writeLockC } -// Commit commits the transaction. If error is not nil, then the transaction is -// not committed, it can then either be retried or discarded. +// Commit commits the transaction. // // Other methods should not be called after transaction has been committed. func (tr *Transaction) Commit() error { @@ -193,27 +192,24 @@ func (tr *Transaction) Commit() error { if tr.closed { return errTransactionDone } + defer tr.setDone() if err := tr.flush(); err != nil { - // Return error, lets user decide either to retry or discard - // transaction. + tr.discard() return err } if len(tr.tables) != 0 { // Committing transaction. 
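+		// The manifest commit below is retried up to three times; the retry
+		// loop is abandoned early only when the DB is closing.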
tr.rec.setSeqNum(tr.seq) tr.db.compCommitLk.Lock() - tr.stats.startTimer() - var cerr error + defer tr.db.compCommitLk.Unlock() for retry := 0; retry < 3; retry++ { - cerr = tr.db.s.commit(&tr.rec) - if cerr != nil { - tr.db.logf("transaction@commit error R·%d %q", retry, cerr) + if err := tr.db.s.commit(&tr.rec); err != nil { + tr.db.logf("transaction@commit error R·%d %q", retry, err) select { case <-time.After(time.Second): - case <-tr.db.closeC: + case _, _ = <-tr.db.closeC: tr.db.logf("transaction@commit exiting") - tr.db.compCommitLk.Unlock() - return cerr + return err } } else { // Success. Set db.seq. @@ -221,26 +217,9 @@ func (tr *Transaction) Commit() error { break } } - tr.stats.stopTimer() - if cerr != nil { - // Return error, lets user decide either to retry or discard - // transaction. - return cerr - } - - // Update compaction stats. This is safe as long as we hold compCommitLk. - tr.db.compStats.addStat(0, &tr.stats) - // Trigger table auto-compaction. tr.db.compTrigger(tr.db.tcompCmdC) - tr.db.compCommitLk.Unlock() - - // Additionally, wait compaction when certain threshold reached. - // Ignore error, returns error only if transaction can't be committed. - tr.db.waitCompaction() } - // Only mark as done if transaction committed successfully. - tr.setDone() return nil } @@ -266,20 +245,10 @@ func (tr *Transaction) Discard() { tr.lk.Unlock() } -func (db *DB) waitCompaction() error { - if db.s.tLen(0) >= db.s.o.GetWriteL0PauseTrigger() { - return db.compTriggerWait(db.tcompCmdC) - } - return nil -} - // OpenTransaction opens an atomic DB transaction. Only one transaction can be -// opened at a time. Subsequent call to Write and OpenTransaction will be blocked -// until in-flight transaction is committed or discarded. -// The returned transaction handle is safe for concurrent use. -// -// Transaction is expensive and can overwhelm compaction, especially if -// transaction size is small. Use with caution. +// opened at a time. Write will be blocked until the transaction is committed or +// discarded. +// The returned transaction handle is goroutine-safe. // // The transaction must be closed once done, either by committing or discarding // the transaction. @@ -294,7 +263,7 @@ func (db *DB) OpenTransaction() (*Transaction, error) { case db.writeLockC <- struct{}{}: case err := <-db.compPerErrC: return nil, err - case <-db.closeC: + case _, _ = <-db.closeC: return nil, ErrClosed } @@ -309,11 +278,6 @@ func (db *DB) OpenTransaction() (*Transaction, error) { } } - // Wait compaction when certain threshold reached. 
- if err := db.waitCompaction(); err != nil { - return nil, err - } - tr := &Transaction{ db: db, seq: db.seq, diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_util.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_util.go index 7ecd960..7fd386c 100644 --- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_util.go +++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_util.go @@ -62,7 +62,7 @@ func (db *DB) checkAndCleanFiles() error { case storage.TypeManifest: keep = fd.Num >= db.s.manifestFd.Num case storage.TypeJournal: - if !db.frozenJournalFd.Zero() { + if !db.frozenJournalFd.Nil() { keep = fd.Num >= db.frozenJournalFd.Num } else { keep = fd.Num >= db.journalFd.Num diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go index 5b6cb48..5576761 100644 --- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go +++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go @@ -14,42 +14,47 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) -func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error { - wr, err := db.journal.Next() +func (db *DB) writeJournal(b *Batch) error { + w, err := db.journal.Next() if err != nil { return err } - if err := writeBatchesWithHeader(wr, batches, seq); err != nil { + if _, err := w.Write(b.encode()); err != nil { return err } if err := db.journal.Flush(); err != nil { return err } - if sync { + if b.sync { return db.journalWriter.Sync() } return nil } +func (db *DB) jWriter() { + defer db.closeW.Done() + for { + select { + case b := <-db.journalC: + if b != nil { + db.journalAckC <- db.writeJournal(b) + } + case _, _ = <-db.closeC: + return + } + } +} + func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) { - retryLimit := 3 -retry: // Wait for pending memdb compaction. err = db.compTriggerWait(db.mcompCmdC) if err != nil { return } - retryLimit-- // Create new memdb and journal. mem, err = db.newMem(n) if err != nil { - if err == errHasFrozenMem { - if retryLimit <= 0 { - panic("BUG: still has frozen memdb") - } - goto retry - } return } @@ -64,29 +69,24 @@ retry: func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) { delayed := false - slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger() - pauseTrigger := db.s.o.GetWriteL0PauseTrigger() flush := func() (retry bool) { + v := db.s.version() + defer v.release() mdb = db.getEffectiveMem() - if mdb == nil { - err = ErrClosed - return false - } defer func() { if retry { mdb.decref() mdb = nil } }() - tLen := db.s.tLen(0) mdbFree = mdb.Free() switch { - case tLen >= slowdownTrigger && !delayed: + case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed: delayed = true time.Sleep(time.Millisecond) case mdbFree >= n: return false - case tLen >= pauseTrigger: + case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger(): delayed = true err = db.compTriggerWait(db.tcompCmdC) if err != nil { @@ -123,250 +123,159 @@ func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) { return } -type writeMerge struct { - sync bool - batch *Batch - keyType keyType - key, value []byte -} - -func (db *DB) unlockWrite(overflow bool, merged int, err error) { - for i := 0; i < merged; i++ { - db.writeAckC <- err - } - if overflow { - // Pass lock to the next write (that failed to merge). - db.writeMergedC <- false - } else { - // Release lock. - <-db.writeLockC - } -} - -// ourBatch if defined should equal with batch. 
-func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error { - // Try to flush memdb. This method would also trying to throttle writes - // if it is too fast and compaction cannot catch-up. - mdb, mdbFree, err := db.flush(batch.internalLen) - if err != nil { - db.unlockWrite(false, 0, err) - return err - } - defer mdb.decref() - - var ( - overflow bool - merged int - batches = []*Batch{batch} - ) - - if merge { - // Merge limit. - var mergeLimit int - if batch.internalLen > 128<<10 { - mergeLimit = (1 << 20) - batch.internalLen - } else { - mergeLimit = 128 << 10 - } - mergeCap := mdbFree - batch.internalLen - if mergeLimit > mergeCap { - mergeLimit = mergeCap - } - - merge: - for mergeLimit > 0 { - select { - case incoming := <-db.writeMergeC: - if incoming.batch != nil { - // Merge batch. - if incoming.batch.internalLen > mergeLimit { - overflow = true - break merge - } - batches = append(batches, incoming.batch) - mergeLimit -= incoming.batch.internalLen - } else { - // Merge put. - internalLen := len(incoming.key) + len(incoming.value) + 8 - if internalLen > mergeLimit { - overflow = true - break merge - } - if ourBatch == nil { - ourBatch = db.batchPool.Get().(*Batch) - ourBatch.Reset() - batches = append(batches, ourBatch) - } - // We can use same batch since concurrent write doesn't - // guarantee write order. - ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value) - mergeLimit -= internalLen - } - sync = sync || incoming.sync - merged++ - db.writeMergedC <- true - - default: - break merge - } - } - } - - // Seq number. - seq := db.seq + 1 - - // Write journal. - if err := db.writeJournal(batches, seq, sync); err != nil { - db.unlockWrite(overflow, merged, err) - return err - } - - // Put batches. - for _, batch := range batches { - if err := batch.putMem(seq, mdb.DB); err != nil { - panic(err) - } - seq += uint64(batch.Len()) - } - - // Incr seq number. - db.addSeq(uint64(batchesLen(batches))) - - // Rotate memdb if it's reach the threshold. - if batch.internalLen >= mdbFree { - db.rotateMem(0, false) - } - - db.unlockWrite(overflow, merged, nil) - return nil -} - -// Write apply the given batch to the DB. The batch records will be applied -// sequentially. Write might be used concurrently, when used concurrently and -// batch is small enough, write will try to merge the batches. Set NoWriteMerge -// option to true to disable write merge. +// Write apply the given batch to the DB. The batch will be applied +// sequentially. // -// It is safe to modify the contents of the arguments after Write returns but -// not before. Write will not modify content of the batch. -func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error { - if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 { - return err +// It is safe to modify the contents of the arguments after Write returns. +func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) { + err = db.ok() + if err != nil || b == nil || b.Len() == 0 { + return } - // If the batch size is larger than write buffer, it may justified to write - // using transaction instead. Using transaction the batch will be written - // into tables directly, skipping the journaling. - if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() { - tr, err := db.OpenTransaction() - if err != nil { - return err + b.init(wo.GetSync() && !db.s.o.GetNoSync()) + + if b.size() > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() { + // Writes using transaction. 
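+		// A batch bigger than the write buffer would immediately overflow
+		// the memdb, so it is written into tables directly through a
+		// transaction, skipping the journaling.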
+ tr, err1 := db.OpenTransaction() + if err1 != nil { + return err1 } - if err := tr.Write(batch, wo); err != nil { + if err1 := tr.Write(b, wo); err1 != nil { tr.Discard() - return err + return err1 } return tr.Commit() } - merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge() - sync := wo.GetSync() && !db.s.o.GetNoSync() + // The write happen synchronously. + select { + case db.writeC <- b: + if <-db.writeMergedC { + return <-db.writeAckC + } + // Continue, the write lock already acquired by previous writer + // and handed out to us. + case db.writeLockC <- struct{}{}: + case err = <-db.compPerErrC: + return + case _, _ = <-db.closeC: + return ErrClosed + } - // Acquire write lock. - if merge { + merged := 0 + danglingMerge := false + defer func() { + for i := 0; i < merged; i++ { + db.writeAckC <- err + } + if danglingMerge { + // Only one dangling merge at most, so this is safe. + db.writeMergedC <- false + } else { + <-db.writeLockC + } + }() + + mdb, mdbFree, err := db.flush(b.size()) + if err != nil { + return + } + defer mdb.decref() + + // Calculate maximum size of the batch. + m := 1 << 20 + if x := b.size(); x <= 128<<10 { + m = x + (128 << 10) + } + m = minInt(m, mdbFree) + + // Merge with other batch. +drain: + for b.size() < m && !b.sync { select { - case db.writeMergeC <- writeMerge{sync: sync, batch: batch}: - if <-db.writeMergedC { - // Write is merged. - return <-db.writeAckC + case nb := <-db.writeC: + if b.size()+nb.size() <= m { + b.append(nb) + db.writeMergedC <- true + merged++ + } else { + danglingMerge = true + break drain } - // Write is not merged, the write lock is handed to us. Continue. - case db.writeLockC <- struct{}{}: - // Write lock acquired. - case err := <-db.compPerErrC: - // Compaction error. - return err - case <-db.closeC: - // Closed - return ErrClosed + default: + break drain + } + } + + // Set batch first seq number relative from last seq. + b.seq = db.seq + 1 + + // Write journal concurrently if it is large enough. + if b.size() >= (128 << 10) { + // Push the write batch to the journal writer + select { + case db.journalC <- b: + // Write into memdb + if berr := b.memReplay(mdb.DB); berr != nil { + panic(berr) + } + case err = <-db.compPerErrC: + return + case _, _ = <-db.closeC: + err = ErrClosed + return + } + // Wait for journal writer + select { + case err = <-db.journalAckC: + if err != nil { + // Revert memdb if error detected + if berr := b.revertMemReplay(mdb.DB); berr != nil { + panic(berr) + } + return + } + case _, _ = <-db.closeC: + err = ErrClosed + return } } else { - select { - case db.writeLockC <- struct{}{}: - // Write lock acquired. - case err := <-db.compPerErrC: - // Compaction error. - return err - case <-db.closeC: - // Closed - return ErrClosed + err = db.writeJournal(b) + if err != nil { + return + } + if berr := b.memReplay(mdb.DB); berr != nil { + panic(berr) } } - return db.writeLocked(batch, nil, merge, sync) -} + // Set last seq number. + db.addSeq(uint64(b.Len())) -func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error { - if err := db.ok(); err != nil { - return err + if b.size() >= mdbFree { + db.rotateMem(0, false) } - - merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge() - sync := wo.GetSync() && !db.s.o.GetNoSync() - - // Acquire write lock. - if merge { - select { - case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}: - if <-db.writeMergedC { - // Write is merged. 
- return <-db.writeAckC - } - // Write is not merged, the write lock is handed to us. Continue. - case db.writeLockC <- struct{}{}: - // Write lock acquired. - case err := <-db.compPerErrC: - // Compaction error. - return err - case <-db.closeC: - // Closed - return ErrClosed - } - } else { - select { - case db.writeLockC <- struct{}{}: - // Write lock acquired. - case err := <-db.compPerErrC: - // Compaction error. - return err - case <-db.closeC: - // Closed - return ErrClosed - } - } - - batch := db.batchPool.Get().(*Batch) - batch.Reset() - batch.appendRec(kt, key, value) - return db.writeLocked(batch, batch, merge, sync) + return } // Put sets the value for the given key. It overwrites any previous value -// for that key; a DB is not a multi-map. Write merge also applies for Put, see -// Write. +// for that key; a DB is not a multi-map. // -// It is safe to modify the contents of the arguments after Put returns but not -// before. +// It is safe to modify the contents of the arguments after Put returns. func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error { - return db.putRec(keyTypeVal, key, value, wo) + b := new(Batch) + b.Put(key, value) + return db.Write(b, wo) } -// Delete deletes the value for the given key. Delete will not returns error if -// key doesn't exist. Write merge also applies for Delete, see Write. +// Delete deletes the value for the given key. // -// It is safe to modify the contents of the arguments after Delete returns but -// not before. +// It is safe to modify the contents of the arguments after Delete returns. func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error { - return db.putRec(keyTypeDel, key, nil, wo) + b := new(Batch) + b.Delete(key) + return db.Write(b, wo) } func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool { @@ -395,15 +304,12 @@ func (db *DB) CompactRange(r util.Range) error { case db.writeLockC <- struct{}{}: case err := <-db.compPerErrC: return err - case <-db.closeC: + case _, _ = <-db.closeC: return ErrClosed } // Check for overlaps in memdb. mdb := db.getEffectiveMem() - if mdb == nil { - return ErrClosed - } defer mdb.decref() if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) { // Memdb compaction. @@ -435,7 +341,7 @@ func (db *DB) SetReadOnly() error { db.compWriteLocking = true case err := <-db.compPerErrC: return err - case <-db.closeC: + case _, _ = <-db.closeC: return ErrClosed } @@ -444,7 +350,7 @@ func (db *DB) SetReadOnly() error { case db.compErrSetC <- ErrReadOnly: case perr := <-db.compPerErrC: return perr - case <-db.closeC: + case _, _ = <-db.closeC: return ErrClosed } diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/errors.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/errors.go index de26498..c8bd66a 100644 --- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/errors.go +++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/errors.go @@ -10,7 +10,6 @@ import ( "github.com/syndtr/goleveldb/leveldb/errors" ) -// Common errors. var ( ErrNotFound = errors.ErrNotFound ErrReadOnly = errors.New("leveldb: read-only mode") diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/errors/errors.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/errors/errors.go index 8d6146b..9a0f6e2 100644 --- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/errors/errors.go +++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/errors/errors.go @@ -15,7 +15,6 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) -// Common errors. 
var ( ErrNotFound = New("leveldb: not found") ErrReleased = util.ErrReleased @@ -35,10 +34,11 @@ type ErrCorrupted struct { } func (e *ErrCorrupted) Error() string { - if !e.Fd.Zero() { + if !e.Fd.Nil() { return fmt.Sprintf("%v [file=%v]", e.Err, e.Fd) + } else { + return e.Err.Error() } - return e.Err.Error() } // NewErrCorrupted creates new ErrCorrupted error. diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/iterator/iter.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/iterator/iter.go index 3b55532..c252286 100644 --- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/iterator/iter.go +++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/iterator/iter.go @@ -21,13 +21,13 @@ var ( // IteratorSeeker is the interface that wraps the 'seeks method'. type IteratorSeeker interface { // First moves the iterator to the first key/value pair. If the iterator - // only contains one key/value pair then First and Last would moves + // only contains one key/value pair then First and Last whould moves // to the same key/value pair. // It returns whether such pair exist. First() bool // Last moves the iterator to the last key/value pair. If the iterator - // only contains one key/value pair then First and Last would moves + // only contains one key/value pair then First and Last whould moves // to the same key/value pair. // It returns whether such pair exist. Last() bool @@ -48,7 +48,7 @@ type IteratorSeeker interface { Prev() bool } -// CommonIterator is the interface that wraps common iterator methods. +// CommonIterator is the interface that wraps common interator methods. type CommonIterator interface { IteratorSeeker @@ -71,15 +71,14 @@ type CommonIterator interface { // Iterator iterates over a DB's key/value pairs in key order. // -// When encounter an error any 'seeks method' will return false and will +// When encouter an error any 'seeks method' will return false and will // yield no key/value pairs. The error can be queried by calling the Error // method. Calling Release is still necessary. // // An iterator must be released after use, but it is not necessary to read // an iterator until exhaustion. -// Also, an iterator is not necessarily safe for concurrent use, but it is -// safe to use multiple iterators concurrently, with each in a dedicated -// goroutine. +// Also, an iterator is not necessarily goroutine-safe, but it is safe to use +// multiple iterators concurrently, with each in a dedicated goroutine. type Iterator interface { CommonIterator @@ -99,7 +98,7 @@ type Iterator interface { // // ErrorCallbackSetter implemented by indexed and merged iterator. type ErrorCallbackSetter interface { - // SetErrorCallback allows set an error callback of the corresponding + // SetErrorCallback allows set an error callback of the coresponding // iterator. Use nil to clear the callback. SetErrorCallback(f func(err error)) } diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/journal/journal.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/journal/journal.go index d094c3d..891098b 100644 --- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/journal/journal.go +++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/journal/journal.go @@ -180,37 +180,34 @@ func (r *Reader) nextChunk(first bool) error { checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4]) length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6]) chunkType := r.buf[r.j+6] - unprocBlock := r.n - r.j + if checksum == 0 && length == 0 && chunkType == 0 { // Drop entire block. 
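+			// A zero header normally marks preallocated file space that was
+			// never written; nothing in the rest of the block is usable.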
+ m := r.n - r.j r.i = r.n r.j = r.n - return r.corrupt(unprocBlock, "zero header", false) - } - if chunkType < fullChunkType || chunkType > lastChunkType { - // Drop entire block. - r.i = r.n - r.j = r.n - return r.corrupt(unprocBlock, fmt.Sprintf("invalid chunk type %#x", chunkType), false) - } - r.i = r.j + headerSize - r.j = r.j + headerSize + int(length) - if r.j > r.n { - // Drop entire block. - r.i = r.n - r.j = r.n - return r.corrupt(unprocBlock, "chunk length overflows block", false) - } else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() { - // Drop entire block. - r.i = r.n - r.j = r.n - return r.corrupt(unprocBlock, "checksum mismatch", false) + return r.corrupt(m, "zero header", false) + } else { + m := r.n - r.j + r.i = r.j + headerSize + r.j = r.j + headerSize + int(length) + if r.j > r.n { + // Drop entire block. + r.i = r.n + r.j = r.n + return r.corrupt(m, "chunk length overflows block", false) + } else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() { + // Drop entire block. + r.i = r.n + r.j = r.n + return r.corrupt(m, "checksum mismatch", false) + } } if first && chunkType != fullChunkType && chunkType != firstChunkType { - chunkLength := (r.j - r.i) + headerSize + m := r.j - r.i r.i = r.j // Report the error, but skip it. - return r.corrupt(chunkLength, "orphan chunk", true) + return r.corrupt(m+headerSize, "orphan chunk", true) } r.last = chunkType == fullChunkType || chunkType == lastChunkType return nil diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/key.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/key.go index ad8f51e..d0b80aa 100644 --- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/key.go +++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/key.go @@ -37,14 +37,14 @@ func (kt keyType) String() string { case keyTypeVal: return "v" } - return fmt.Sprintf("", uint(kt)) + return "x" } // Value types encoded as the last component of internal keys. // Don't modify; this value are saved to disk. const ( - keyTypeDel = keyType(0) - keyTypeVal = keyType(1) + keyTypeDel keyType = iota + keyTypeVal ) // keyTypeSeek defines the keyType that should be passed when constructing an @@ -79,7 +79,11 @@ func makeInternalKey(dst, ukey []byte, seq uint64, kt keyType) internalKey { panic("leveldb: invalid type") } - dst = ensureBuffer(dst, len(ukey)+8) + if n := len(ukey) + 8; cap(dst) < n { + dst = make([]byte, n) + } else { + dst = dst[:n] + } copy(dst, ukey) binary.LittleEndian.PutUint64(dst[len(ukey):], (seq<<8)|uint64(kt)) return internalKey(dst) @@ -139,5 +143,5 @@ func (ik internalKey) String() string { if ukey, seq, kt, err := parseInternalKey(ik); err == nil { return fmt.Sprintf("%s,%s%d", shorten(string(ukey)), kt, seq) } - return fmt.Sprintf("", []byte(ik)) + return "" } diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go index 18a19ed..1395bd9 100644 --- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go +++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go @@ -17,7 +17,6 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) -// Common errors. var ( ErrNotFound = errors.ErrNotFound ErrIterReleased = errors.New("leveldb/memdb: iterator released") @@ -386,7 +385,7 @@ func (p *DB) Find(key []byte) (rkey, value []byte, err error) { } // NewIterator returns an iterator of the DB. 
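+// For illustration (db being a *memdb.DB):
+//
+//	iter := db.NewIterator(&util.Range{Start: start, Limit: limit})
+//	for iter.Next() {
+//		// inspect iter.Key() / iter.Value()
+//	}
+//	iter.Release()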
-// The returned iterator is not safe for concurrent use, but it is safe to use
+// The returned iterator is not goroutine-safe, but it is safe to use
 // multiple iterators concurrently, with each in a dedicated goroutine.
 // It is also safe to use an iterator concurrently with modifying its
 // underlying DB. However, the resultant key/value pairs are not guaranteed
@@ -412,7 +411,7 @@ func (p *DB) Capacity() int {
 }
 
 // Size returns sum of keys and values length. Note that deleted
-// key/value will not be accounted for, but it will still consume
+// key/value will not be accounted for, but it will still consume
 // the buffer, since the buffer is append only.
 func (p *DB) Size() int {
 	p.mu.RLock()
@@ -454,14 +453,11 @@ func (p *DB) Reset() {
 	p.mu.Unlock()
 }
 
-// New creates a new initialized in-memory key/value DB. The capacity
+// New creates a new initialized in-memory key/value DB. The capacity
 // is the initial key/value buffer capacity. The capacity is advisory,
 // not enforced.
 //
-// This DB is append-only, deleting an entry would remove entry node but not
-// reclaim KV buffer.
-//
-// The returned DB instance is safe for concurrent use.
+// The returned DB instance is goroutine-safe.
 func New(cmp comparer.BasicComparer, capacity int) *DB {
 	p := &DB{
 		cmp: cmp,
diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go
index 44e7d9a..3d2bf1c 100644
--- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go
+++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go
@@ -312,11 +312,6 @@ type Options struct {
 	// The default is false.
 	NoSync bool
 
-	// NoWriteMerge allows disabling write merge.
-	//
-	// The default is false.
-	NoWriteMerge bool
-
 	// OpenFilesCacher provides cache algorithm for open files caching.
 	// Specify NoCacher to disable caching algorithm.
 	//
@@ -548,13 +543,6 @@ func (o *Options) GetNoSync() bool {
 	return o.NoSync
 }
 
-func (o *Options) GetNoWriteMerge() bool {
-	if o == nil {
-		return false
-	}
-	return o.NoWriteMerge
-}
-
 func (o *Options) GetOpenFilesCacher() Cacher {
 	if o == nil || o.OpenFilesCacher == nil {
 		return DefaultOpenFilesCacher
@@ -641,11 +629,6 @@ func (ro *ReadOptions) GetStrict(strict Strict) bool {
 // WriteOptions holds the optional parameters for 'write operation'. The
 // 'write operation' includes Write, Put and Delete.
 type WriteOptions struct {
-	// NoWriteMerge allows disabling write merge.
-	//
-	// The default is false.
-	NoWriteMerge bool
-
 	// Sync is whether to sync underlying writes from the OS buffer cache
 	// through to actual disk, if applicable. Setting Sync can result in
 	// slower writes.
@@ -661,13 +644,6 @@ type WriteOptions struct {
 	Sync bool
 }
 
-func (wo *WriteOptions) GetNoWriteMerge() bool {
-	if wo == nil {
-		return false
-	}
-	return wo.NoWriteMerge
-}
-
 func (wo *WriteOptions) GetSync() bool {
 	if wo == nil {
 		return false
diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/session.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/session.go
index ad68a87..b0d3fef 100644
--- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/session.go
+++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/session.go
@@ -18,8 +18,7 @@ import (
 	"github.com/syndtr/goleveldb/leveldb/storage"
 )
 
-// ErrManifestCorrupted records manifest corruption. This error will be
-// wrapped with errors.ErrCorrupted.
+// ErrManifestCorrupted records manifest corruption.
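Note: the GetNoWriteMerge getters deleted above follow the same nil-receiver pattern as the surviving GetSync, which is what lets every call site pass a nil *Options or *WriteOptions without checking first. The pattern, reduced to a standalone sketch rather than the opt package itself:

    type WriteOptions struct {
        Sync bool
    }

    // The getter tolerates a nil receiver, so callers never nil-check.
    func (wo *WriteOptions) GetSync() bool {
        if wo == nil {
            return false
        }
        return wo.Sync
    }

    func write(wo *WriteOptions) {
        sync := wo.GetSync() // safe even when wo == nil
        _ = sync
    }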
 type ErrManifestCorrupted struct {
 	Field  string
 	Reason string
@@ -43,11 +42,10 @@ type session struct {
 	stSeqNum uint64 // last mem compacted seq; need external synchronization
 
 	stor     storage.Storage
-	storLock storage.Locker
+	storLock storage.Lock
 	o        *cachedOptions
 	icmp     *iComparer
 	tops     *tOps
-	fileRef  map[int64]int
 
 	manifest       *journal.Writer
 	manifestWriter storage.Writer
@@ -70,7 +68,6 @@ func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
 	s = &session{
 		stor:     stor,
 		storLock: storLock,
-		fileRef:  make(map[int64]int),
 	}
 	s.setOptions(o)
 	s.tops = newTableOps(s)
@@ -90,12 +87,12 @@ func (s *session) close() {
 	}
 	s.manifest = nil
 	s.manifestWriter = nil
-	s.setVersion(&version{s: s, closing: true})
+	s.stVersion = nil
 }
 
 // Release session lock.
 func (s *session) release() {
-	s.storLock.Unlock()
+	s.storLock.Release()
 }
 
 // Create a new database session; need external synchronization.
diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go
index 9232893..674182f 100644
--- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go
+++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go
@@ -39,18 +39,6 @@ func (s *session) newTemp() storage.FileDesc {
 	return storage.FileDesc{storage.TypeTemp, num}
 }
 
-func (s *session) addFileRef(fd storage.FileDesc, ref int) int {
-	ref += s.fileRef[fd.Num]
-	if ref > 0 {
-		s.fileRef[fd.Num] = ref
-	} else if ref == 0 {
-		delete(s.fileRef, fd.Num)
-	} else {
-		panic(fmt.Sprintf("negative ref: %v", fd))
-	}
-	return ref
-}
-
 // Session state.
 
 // Get current version. This will incr version ref, must call
@@ -58,28 +46,21 @@ func (s *session) addFileRef(fd storage.FileDesc, ref int) int {
 func (s *session) version() *version {
 	s.vmu.Lock()
 	defer s.vmu.Unlock()
-	s.stVersion.incref()
+	s.stVersion.ref++
 	return s.stVersion
 }
 
-func (s *session) tLen(level int) int {
-	s.vmu.Lock()
-	defer s.vmu.Unlock()
-	return s.stVersion.tLen(level)
-}
-
 // Set current version to v.
 func (s *session) setVersion(v *version) {
 	s.vmu.Lock()
-	defer s.vmu.Unlock()
-	// Hold by session. It is important to call this first before releasing
-	// current version, otherwise the still used files might get released.
-	v.incref()
-	if s.stVersion != nil {
-		// Release current version.
-		s.stVersion.releaseNB()
+	v.ref = 1 // Held by session.
+	if old := s.stVersion; old != nil {
+		v.ref++ // Held by old version.
+		old.next = v
+		old.releaseNB()
 	}
 	s.stVersion = v
+	s.vmu.Unlock()
 }
 
 // Get current unused file number.
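Note: the reverted setVersion/releaseNB pair replaces per-file reference counts with a chain of versions: the session holds one reference on the current version, and each superseded version holds one reference on its successor until its own count drains. A rough standalone sketch of that discipline, with the struct reduced to the two fields that matter:

    type version struct {
        ref  int
        next *version // succeeding version, set when superseded
    }

    func setVersion(cur **version, v *version) {
        v.ref = 1 // held by the session
        if old := *cur; old != nil {
            v.ref++ // held by the old version until it drains
            old.next = v
            old.releaseNB()
        }
        *cur = v
    }

    func (v *version) releaseNB() {
        v.ref--
        if v.ref > 0 {
            return
        }
        if v.next != nil {
            v.next.releaseNB() // drop this version's hold on its successor
            v.next = nil
        }
    }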
@@ -216,7 +197,7 @@ func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
 		if s.manifestWriter != nil {
 			s.manifestWriter.Close()
 		}
-		if !s.manifestFd.Zero() {
+		if !s.manifestFd.Nil() {
 			s.stor.Remove(s.manifestFd)
 		}
 		s.manifestFd = fd
diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go
index e53434c..cbe1dc1 100644
--- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go
+++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go
@@ -32,7 +32,7 @@ type fileStorageLock struct {
 	fs *fileStorage
 }
 
-func (lock *fileStorageLock) Unlock() {
+func (lock *fileStorageLock) Release() {
 	if lock.fs != nil {
 		lock.fs.mu.Lock()
 		defer lock.fs.mu.Unlock()
@@ -116,7 +116,7 @@ func OpenFile(path string, readOnly bool) (Storage, error) {
 	return fs, nil
 }
 
-func (fs *fileStorage) Lock() (Locker, error) {
+func (fs *fileStorage) Lock() (Lock, error) {
 	fs.mu.Lock()
 	defer fs.mu.Unlock()
 	if fs.open < 0 {
@@ -323,7 +323,7 @@ func (fs *fileStorage) GetMeta() (fd FileDesc, err error) {
 		}
 	}
 	// Don't remove any files if there is no valid CURRENT file.
-	if fd.Zero() {
+	if fd.Nil() {
 		if cerr != nil {
 			err = cerr
 		} else {
diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_nacl.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_nacl.go
deleted file mode 100644
index 5545aee..0000000
--- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_nacl.go
+++ /dev/null
@@ -1,34 +0,0 @@
-// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
-// All rights reserved.
-//
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
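Note: with the Locker interface renamed back to Lock, the storage lock is released through Release rather than Unlock. Typical usage, sketched with an illustrative path and elided error handling:

    stor, err := storage.OpenFile("/path/to/db", false)
    if err != nil {
        // handle err
    }
    l, err := stor.Lock() // a second concurrent Lock fails with ErrLocked
    if err != nil {
        // handle err
    }
    defer l.Release() // was Unlock() on the newer side of this revert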
-
-// +build nacl
-
-package storage
-
-import (
-	"os"
-	"syscall"
-)
-
-func newFileLock(path string, readOnly bool) (fl fileLock, err error) {
-	return nil, syscall.ENOTSUP
-}
-
-func setFileLock(f *os.File, readOnly, lock bool) error {
-	return syscall.ENOTSUP
-}
-
-func rename(oldpath, newpath string) error {
-	return syscall.ENOTSUP
-}
-
-func isErrInvalid(err error) bool {
-	return false
-}
-
-func syncDir(name string) error {
-	return syscall.ENOTSUP
-}
diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go
index 9b0421f..9b70e15 100644
--- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go
+++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go
@@ -18,7 +18,7 @@ type memStorageLock struct {
 	ms *memStorage
 }
 
-func (lock *memStorageLock) Unlock() {
+func (lock *memStorageLock) Release() {
 	ms := lock.ms
 	ms.mu.Lock()
 	defer ms.mu.Unlock()
@@ -43,7 +43,7 @@ func NewMemStorage() Storage {
 	}
 }
 
-func (ms *memStorage) Lock() (Locker, error) {
+func (ms *memStorage) Lock() (Lock, error) {
 	ms.mu.Lock()
 	defer ms.mu.Unlock()
 	if ms.slock != nil {
@@ -69,7 +69,7 @@ func (ms *memStorage) SetMeta(fd FileDesc) error {
 func (ms *memStorage) GetMeta() (FileDesc, error) {
 	ms.mu.Lock()
 	defer ms.mu.Unlock()
-	if ms.meta.Zero() {
+	if ms.meta.Nil() {
 		return FileDesc{}, os.ErrNotExist
 	}
 	return ms.meta, nil
@@ -78,7 +78,7 @@ func (ms *memStorage) GetMeta() (FileDesc, error) {
 func (ms *memStorage) List(ft FileType) ([]FileDesc, error) {
 	ms.mu.Lock()
 	var fds []FileDesc
-	for x := range ms.files {
+	for x, _ := range ms.files {
 		fd := unpackFile(x)
 		if fd.Type&ft != 0 {
 			fds = append(fds, fd)
diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/storage/storage.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/storage/storage.go
index c16bce6..9b30b67 100644
--- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/storage/storage.go
+++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/storage/storage.go
@@ -11,12 +11,12 @@ import (
 	"errors"
 	"fmt"
 	"io"
+
+	"github.com/syndtr/goleveldb/leveldb/util"
 )
 
-// FileType represent a file type.
 type FileType int
 
-// File types.
 const (
 	TypeManifest FileType = 1 << iota
 	TypeJournal
@@ -40,7 +40,6 @@ func (t FileType) String() string {
 	return fmt.Sprintf("<unknown:%d>", t)
 }
 
-// Common error.
 var (
 	ErrInvalidFile = errors.New("leveldb/storage: invalid file for argument")
 	ErrLocked      = errors.New("leveldb/storage: already locked")
@@ -56,10 +55,11 @@ type ErrCorrupted struct {
 }
 
 func (e *ErrCorrupted) Error() string {
-	if !e.Fd.Zero() {
+	if !e.Fd.Nil() {
 		return fmt.Sprintf("%v [file=%v]", e.Err, e.Fd)
+	} else {
+		return e.Err.Error()
 	}
-	return e.Err.Error()
 }
 
 // Syncer is the interface that wraps basic Sync method.
@@ -83,12 +83,11 @@ type Writer interface {
 	Syncer
 }
 
-// Locker is the interface that wraps Unlock method.
-type Locker interface {
-	Unlock()
+type Lock interface {
+	util.Releaser
 }
 
-// FileDesc is a 'file descriptor'.
+// FileDesc is a file descriptor.
 type FileDesc struct {
 	Type FileType
 	Num  int64
@@ -109,12 +108,12 @@ func (fd FileDesc) String() string {
 	}
 }
 
-// Zero returns true if fd == (FileDesc{}).
-func (fd FileDesc) Zero() bool {
+// Nil returns true if fd == (FileDesc{}).
+func (fd FileDesc) Nil() bool {
 	return fd == (FileDesc{})
 }
 
-// FileDescOk returns true if fd is a valid 'file descriptor'.
+// FileDescOk returns true if fd is a valid file descriptor.
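Note: the GetMeta/SetMeta behaviour above is easy to exercise against the in-memory storage; before SetMeta the meta descriptor is still the zero FileDesc, so fd.Nil() is true and GetMeta fails. Sketch:

    ms := storage.NewMemStorage()
    if _, err := ms.GetMeta(); err != nil {
        // os.ErrNotExist: meta still holds the zero FileDesc
    }
    fd := storage.FileDesc{Type: storage.TypeManifest, Num: 1}
    if err := ms.SetMeta(fd); err != nil {
        // handle err
    }
    got, _ := ms.GetMeta() // got == fd
    _ = got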
 func FileDescOk(fd FileDesc) bool {
 	switch fd.Type {
 	case TypeManifest:
@@ -127,44 +126,43 @@ func FileDescOk(fd FileDesc) bool {
 	return fd.Num >= 0
 }
 
-// Storage is the storage. A storage instance must be safe for concurrent use.
+// Storage is the storage. A storage instance must be goroutine-safe.
 type Storage interface {
 	// Lock locks the storage. Any subsequent attempt to call Lock will fail
 	// until the last lock released.
-	// Caller should call Unlock method after use.
-	Lock() (Locker, error)
+	// After use the caller should call the Release method.
+	Lock() (Lock, error)
 
 	// Log logs a string. This is used for logging.
 	// An implementation may write to a file, stdout or simply do nothing.
 	Log(str string)
 
-	// SetMeta store 'file descriptor' that can later be acquired using GetMeta
-	// method. The 'file descriptor' should point to a valid file.
-	// SetMeta should be implemented in such way that changes should happen
+	// SetMeta sets meta to point to the given fd, which then can be acquired
+	// using the GetMeta method.
+	// SetMeta should be implemented in such way that changes should happen
 	// atomically.
 	SetMeta(fd FileDesc) error
 
-	// GetMeta returns 'file descriptor' stored in meta. The 'file descriptor'
-	// can be updated using SetMeta method.
-	// Returns os.ErrNotExist if meta doesn't store any 'file descriptor', or
-	// 'file descriptor' point to nonexistent file.
+	// GetMeta returns the manifest fd.
+	// Returns os.ErrNotExist if meta doesn't point to any fd, or point to fd
+	// that doesn't exist.
 	GetMeta() (FileDesc, error)
 
-	// List returns file descriptors that match the given file types.
+	// List returns fds that match the given file types.
 	// The file types may be OR'ed together.
 	List(ft FileType) ([]FileDesc, error)
 
-	// Open opens file with the given 'file descriptor' read-only.
+	// Open opens file with the given fd read-only.
 	// Returns os.ErrNotExist error if the file does not exist.
 	// Returns ErrClosed if the underlying storage is closed.
 	Open(fd FileDesc) (Reader, error)
 
-	// Create creates file with the given 'file descriptor', truncate if already
-	// exist and opens write-only.
+	// Create creates file with the given fd, truncate if already exist and
+	// opens write-only.
 	// Returns ErrClosed if the underlying storage is closed.
 	Create(fd FileDesc) (Writer, error)
 
-	// Remove removes file with the given 'file descriptor'.
+	// Remove removes file with the given fd.
 	// Returns ErrClosed if the underlying storage is closed.
 	Remove(fd FileDesc) error
diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/table.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/table.go
index 81d18a5..310ba6c 100644
--- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/table.go
+++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/table.go
@@ -434,7 +434,7 @@ func (t *tOps) close() {
 	t.bpool.Close()
 	t.cache.Close()
 	if t.bcache != nil {
-		t.bcache.CloseWeak()
+		t.bcache.Close()
 	}
 }
 
diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go
index 16cfbaa..ae61bec 100644
--- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go
+++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go
@@ -26,15 +26,12 @@ import (
 	"github.com/syndtr/goleveldb/leveldb/util"
 )
 
-// Reader errors.
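Note: the List contract above allows OR'ing file types because FileType is a bit mask (1 << iota). A small sketch of selecting two kinds of files in one call; stor is any storage.Storage, e.g. from NewMemStorage:

    // List both manifest and journal descriptors at once.
    fds, err := stor.List(storage.TypeManifest | storage.TypeJournal)
    if err != nil {
        // handle err
    }
    for _, fd := range fds {
        _ = fd // e.g. inspect fd.Type and fd.Num
    }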
 var (
 	ErrNotFound       = errors.ErrNotFound
 	ErrReaderReleased = errors.New("leveldb/table: reader released")
 	ErrIterReleased   = errors.New("leveldb/table: iterator released")
 )
 
-// ErrCorrupted describes error due to corruption. This error will be wrapped
-// with errors.ErrCorrupted.
 type ErrCorrupted struct {
 	Pos  int64
 	Size int64
@@ -64,7 +61,7 @@ type block struct {
 func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) {
 	index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
 		offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):]))
-		offset++ // shared always zero, since this is a restart point
+		offset += 1 // shared always zero, since this is a restart point
 		v1, n1 := binary.Uvarint(b.data[offset:]) // key length
 		_, n2 := binary.Uvarint(b.data[offset+n1:]) // value length
 		m := offset + n1 + n2
@@ -359,7 +356,7 @@ func (i *blockIter) Prev() bool {
 	i.value = nil
 	offset := i.block.restartOffset(ri)
 	if offset == i.offset {
-		ri--
+		ri -= 1
 		if ri < 0 {
 			i.dir = dirSOI
 			return false
@@ -581,7 +578,6 @@ func (r *Reader) readRawBlock(bh blockHandle, verifyChecksum bool) ([]byte, erro
 	case blockTypeSnappyCompression:
 		decLen, err := snappy.DecodedLen(data[:bh.length])
 		if err != nil {
-			r.bpool.Put(data)
 			return nil, r.newErrCorruptedBH(bh, err.Error())
 		}
 		decData := r.bpool.Get(decLen)
@@ -787,8 +783,8 @@ func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, verifyChe
 // table. And a nil Range.Limit is treated as a key after all keys in
 // the table.
 //
-// The returned iterator is not safe for concurrent use and should be released
-// after use.
+// The returned iterator is not goroutine-safe and should be released
+// after use.
 //
 // Also read Iterator documentation of the leveldb/iterator package.
 func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
@@ -830,21 +826,18 @@ func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bo
 	index := r.newBlockIter(indexBlock, nil, nil, true)
 	defer index.Release()
 	if !index.Seek(key) {
-		if err = index.Error(); err == nil {
+		err = index.Error()
+		if err == nil {
 			err = ErrNotFound
 		}
 		return
 	}
-
 	dataBH, n := decodeBlockHandle(index.Value())
 	if n == 0 {
 		r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
-		return nil, nil, r.err
+		return
 	}
-
-	// The filter should only used for exact match.
 	if filtered && r.filter != nil {
 		filterBlock, frel, ferr := r.getFilterBlock(true)
 		if ferr == nil {
@@ -854,53 +847,30 @@ func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bo
 			}
 			frel.Release()
 		} else if !errors.IsCorrupted(ferr) {
-			return nil, nil, ferr
+			err = ferr
+			return
 		}
 	}
-
 	data := r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
+	defer data.Release()
 	if !data.Seek(key) {
-		data.Release()
-		if err = data.Error(); err != nil {
-			return
-		}
-
-		// The nearest greater-than key is the first key of the next block.
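Note: the `offset += 1` in block.seek above works because a block entry is encoded as shared(varint), unshared(varint), valueLen(varint), key delta, value, and at a restart point the shared length is always zero, i.e. exactly one zero byte. A simplified sketch of decoding the entry at a restart offset; this is a standalone helper assuming an uncorrupted block, not goleveldb API:

    package main

    import "encoding/binary"

    func entryAtRestart(data []byte, offset int) (key, value []byte) {
        offset++ // skip the one-byte zero 'shared' varint
        kLen, n1 := binary.Uvarint(data[offset:])    // key length
        vLen, n2 := binary.Uvarint(data[offset+n1:]) // value length
        m := offset + n1 + n2
        return data[m : m+int(kLen)], data[m+int(kLen) : m+int(kLen)+int(vLen)]
    }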
-	if !index.Next() {
-		if err = index.Error(); err == nil {
-			err = ErrNotFound
-		}
-		return
-	}
-
-	dataBH, n = decodeBlockHandle(index.Value())
-	if n == 0 {
-		r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
-		return nil, nil, r.err
-	}
-
-	data = r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
-	if !data.Next() {
-		data.Release()
-		if err = data.Error(); err == nil {
-			err = ErrNotFound
-		}
-		return
+		err = data.Error()
+		if err == nil {
+			err = ErrNotFound
 		}
+		return
 	}
-
-	// Key doesn't use block buffer, no need to copy the buffer.
+	// Don't use block buffer, no need to copy the buffer.
 	rkey = data.Key()
 	if !noValue {
 		if r.bpool == nil {
 			value = data.Value()
 		} else {
-			// Value does use block buffer, and since the buffer will be
-			// recycled, it need to be copied.
+			// Use block buffer, and since the buffer will be recycled, the buffer
+			// needs to be copied.
 			value = append([]byte{}, data.Value()...)
 		}
 	}
-	data.Release()
 	return
 }
@@ -918,7 +888,7 @@ func (r *Reader) Find(key []byte, filtered bool, ro *opt.ReadOptions) (rkey, val
 	return r.find(key, filtered, ro, false)
 }
 
-// FindKey finds key that is greater than or equal to the given key.
+// FindKey finds key that is greater than or equal to the given key.
 // It returns ErrNotFound if the table doesn't contain such key.
 // If filtered is true then the nearest 'block' will be checked against
 // 'filter data' (if present) and will immediately return ErrNotFound if
@@ -1017,7 +987,7 @@ func (r *Reader) Release() {
 // NewReader creates a new initialized table reader for the file.
 // The fi, cache and bpool is optional and can be nil.
 //
-// The returned table reader instance is safe for concurrent use.
+// The returned table reader instance is goroutine-safe.
 func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.NamespaceGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) {
 	if f == nil {
 		return nil, errors.New("leveldb/table: nil file")
@@ -1069,8 +1039,9 @@ func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.Name
 		if errors.IsCorrupted(err) {
 			r.err = err
 			return r, nil
+		} else {
+			return nil, err
 		}
-		return nil, err
 	}
 
 	// Set data end.
@@ -1115,8 +1086,9 @@ func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.Name
 		if errors.IsCorrupted(err) {
 			r.err = err
 			return r, nil
+		} else {
+			return nil, err
 		}
-		return nil, err
 	}
 	if r.filter != nil {
 		r.filterBlock, err = r.readFilterBlock(r.filterBH)
diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/table/writer.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/table/writer.go
index b96b271..274dee6 100644
--- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/table/writer.go
+++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/table/writer.go
@@ -349,7 +349,7 @@ func (w *Writer) Close() error {
 
 // NewWriter creates a new initialized table writer for the file.
 //
-// Table writer is not safe for concurrent use.
+// Table writer is not goroutine-safe.
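Note: the tail of find above copies the value but not the key for a reason: the data block's backing buffer may come from a util.BufferPool and is recycled on Release, so any bytes that must survive the iterator have to be detached first. The idiom in isolation, where data and bpool stand in for the reader's fields:

    var value []byte
    if bpool == nil {
        value = data.Value() // buffer is never recycled; aliasing is fine
    } else {
        // the buffer returns to the pool on Release, so detach the bytes
        value = append([]byte{}, data.Value()...)
    }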
 func NewWriter(f io.Writer, o *opt.Options) *Writer {
 	w := &Writer{
 		writer: f,
diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/util.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/util.go
index e572a32..3b663d1 100644
--- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/util.go
+++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/util.go
@@ -89,10 +89,3 @@ func (p fdSorter) Swap(i, j int) {
 func sortFds(fds []storage.FileDesc) {
 	sort.Sort(fdSorter(fds))
 }
-
-func ensureBuffer(b []byte, n int) []byte {
-	if cap(b) < n {
-		return make([]byte, n)
-	}
-	return b[:n]
-}
diff --git a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/version.go b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/version.go
index 73f272a..d274eef 100644
--- a/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/version.go
+++ b/_vendor/vendor/github.com/syndtr/goleveldb/leveldb/version.go
@@ -34,48 +34,43 @@ type version struct {
 
 	cSeek unsafe.Pointer
 
-	closing  bool
-	ref      int
-	released bool
+	ref int
+	// Succeeding version.
+	next *version
 }
 
 func newVersion(s *session) *version {
 	return &version{s: s}
 }
 
-func (v *version) incref() {
-	if v.released {
-		panic("already released")
-	}
-
-	v.ref++
-	if v.ref == 1 {
-		// Incr file ref.
-		for _, tt := range v.levels {
-			for _, t := range tt {
-				v.s.addFileRef(t.fd, 1)
-			}
-		}
-	}
-}
-
 func (v *version) releaseNB() {
 	v.ref--
 	if v.ref > 0 {
 		return
-	} else if v.ref < 0 {
+	}
+	if v.ref < 0 {
 		panic("negative version ref")
 	}
 
+	nextTables := make(map[int64]bool)
+	for _, tt := range v.next.levels {
+		for _, t := range tt {
+			num := t.fd.Num
+			nextTables[num] = true
+		}
+	}
+
 	for _, tt := range v.levels {
 		for _, t := range tt {
-			if v.s.addFileRef(t.fd, -1) == 0 {
+			num := t.fd.Num
+			if _, ok := nextTables[num]; !ok {
 				v.s.tops.remove(t)
 			}
 		}
 	}
 
-	v.released = true
+	v.next.releaseNB()
+	v.next = nil
 }
 
 func (v *version) release() {
@@ -136,10 +131,6 @@ func (v *version) walkOverlapping(aux tFiles, ikey internalKey, f func(level int
 }
 
 func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue bool) (value []byte, tcomp bool, err error) {
-	if v.closing {
-		return nil, false, ErrClosed
-	}
-
 	ukey := ikey.ukey()
 
 	var (
diff --git a/glide.lock b/glide.lock
index 6968478..63462a6 100644
--- a/glide.lock
+++ b/glide.lock
@@ -1,5 +1,5 @@
-hash: b1848c39932685b6791ef2b3eaf9840bfe4591706f0a53cc83b52833be63d545
-updated: 2017-03-12T16:13:15.751664496+08:00
+hash: bb587732cf0325595c4805c0351a47c2410c86e3aa279c4cc7c8959d2221631c
+updated: 2017-03-14T21:51:57.391664488+08:00
 imports:
 - name: github.com/BurntSushi/toml
   version: bbd5bb678321a0d6e58f1099321dfa73391c1b6f
@@ -32,7 +32,7 @@ imports:
 - name: github.com/siddontang/rdb
   version: fc89ed2e418d27e3ea76e708e54276d2b44ae9cf
 - name: github.com/syndtr/goleveldb
-  version: 3c5717caf1475fd25964109a0fc640bd150fce43
+  version: cfa635847112c5dc4782e128fa7e0d05fdbfb394
   subpackages:
   - leveldb
   - leveldb/cache
diff --git a/glide.yaml b/glide.yaml
index 6eda9db..aa45cf7 100644
--- a/glide.yaml
+++ b/glide.yaml
@@ -24,7 +24,7 @@ import:
 - package: github.com/siddontang/rdb
   version: fc89ed2e418d27e3ea76e708e54276d2b44ae9cf
 - package: github.com/syndtr/goleveldb
-  version: 3c5717caf1475fd25964109a0fc640bd150fce43
+  version: cfa635847112c5dc4782e128fa7e0d05fdbfb394
   subpackages:
   - leveldb
   - leveldb/cache
diff --git a/store/rocksdb/batch.go b/store/rocksdb/batch.go
index 1d7ce0f..bb727e7 100644
--- a/store/rocksdb/batch.go
+++ b/store/rocksdb/batch.go
@@ -75,12 +75,9 @@ func (w *WriteBatch) commit(wb *WriteOptions) error {
 	return nil
 }
 
-const batchDataHeadLen = 12
-
 func (w *WriteBatch) Data() []byte {
 	var vallen C.size_t
 	value := C.rocksdb_writebatch_data(w.wbatch, &vallen)
-	buf := slice(unsafe.Pointer(value), int(vallen))
-	return buf[batchDataHeadLen:]
+	return slice(unsafe.Pointer(value), int(vallen))
 }
diff --git a/store/store_test.go b/store/store_test.go
index ffe954f..3e33a7c 100644
--- a/store/store_test.go
+++ b/store/store_test.go
@@ -358,7 +358,7 @@ func testBatchData(db *DB, t *testing.T) {
 
 	expected := []BatchItem{
 		{[]byte("a"), []byte("1")},
-		{[]byte("b"), []byte(nil)},
+		{[]byte("b"), []byte{}},
 		{[]byte("c"), nil},
 	}
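Note: returning the batch data verbatim, rather than skipping the 12 bytes the deleted batchDataHeadLen accounted for, matters for replication: in the LevelDB/RocksDB write-batch wire format those bytes are an 8-byte little-endian sequence number followed by a 4-byte record count, and a replica needs both to replay the batch at the right sequence. A sketch of splitting the header back out on the consumer side; splitBatch is a hypothetical helper, not part of this patch:

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    // splitBatch separates the 12-byte header from the records.
    // Layout: seq (8 bytes, LE) | count (4 bytes, LE) | records...
    func splitBatch(data []byte) (seq uint64, count uint32, recs []byte, err error) {
        if len(data) < 12 {
            return 0, 0, nil, fmt.Errorf("batch too short: %d bytes", len(data))
        }
        seq = binary.LittleEndian.Uint64(data[:8])
        count = binary.LittleEndian.Uint32(data[8:12])
        return seq, count, data[12:], nil
    }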