Merge branch 'develop'

siddontang 2015-05-03 15:36:41 +08:00
commit 4525ed025b
35 changed files with 1720 additions and 158 deletions

Godeps/Godeps.json

@@ -1,6 +1,6 @@
 {
 	"ImportPath": "github.com/siddontang/ledisdb",
-	"GoVersion": "go1.4.2",
+	"GoVersion": "go1.3.3",
 	"Packages": [
 		"./..."
 	],
@@ -11,8 +11,8 @@
 	},
 	{
 		"ImportPath": "github.com/boltdb/bolt",
-		"Comment": "v1.0-28-gb8dbe11",
-		"Rev": "b8dbe1101d74c30e817227a99716591e3e6e6bc4"
+		"Comment": "v1.0-62-gee95430",
+		"Rev": "ee954308d64186f0fc9b7022b6178977848c17a3"
 	},
 	{
 		"ImportPath": "github.com/cupcake/rdb",
@@ -24,39 +24,39 @@
 	},
 	{
 		"ImportPath": "github.com/siddontang/go/bson",
-		"Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0"
+		"Rev": "530a23162549a31baa14dfa3b647a9eccee8878f"
 	},
 	{
 		"ImportPath": "github.com/siddontang/go/filelock",
-		"Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0"
+		"Rev": "530a23162549a31baa14dfa3b647a9eccee8878f"
 	},
 	{
 		"ImportPath": "github.com/siddontang/go/hack",
-		"Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0"
+		"Rev": "530a23162549a31baa14dfa3b647a9eccee8878f"
 	},
 	{
 		"ImportPath": "github.com/siddontang/go/ioutil2",
-		"Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0"
+		"Rev": "530a23162549a31baa14dfa3b647a9eccee8878f"
 	},
 	{
 		"ImportPath": "github.com/siddontang/go/log",
-		"Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0"
+		"Rev": "530a23162549a31baa14dfa3b647a9eccee8878f"
 	},
 	{
 		"ImportPath": "github.com/siddontang/go/num",
-		"Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0"
+		"Rev": "530a23162549a31baa14dfa3b647a9eccee8878f"
 	},
 	{
 		"ImportPath": "github.com/siddontang/go/snappy",
-		"Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0"
+		"Rev": "530a23162549a31baa14dfa3b647a9eccee8878f"
 	},
 	{
 		"ImportPath": "github.com/siddontang/go/sync2",
-		"Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0"
+		"Rev": "530a23162549a31baa14dfa3b647a9eccee8878f"
 	},
 	{
 		"ImportPath": "github.com/siddontang/goredis",
-		"Rev": "802ef3bdd5f642335f9ed132e024e5e2fd3d03ce"
+		"Rev": "760763f78400635ed7b9b115511b8ed06035e908"
 	},
 	{
 		"ImportPath": "github.com/siddontang/rdb",
@@ -64,11 +64,11 @@
 	},
 	{
 		"ImportPath": "github.com/syndtr/goleveldb/leveldb",
-		"Rev": "e9e2c8f6d3b9c313fb4acaac5ab06285bcf30b04"
+		"Rev": "4875955338b0a434238a31165cb87255ab6e9e4a"
 	},
 	{
 		"ImportPath": "github.com/syndtr/gosnappy/snappy",
-		"Rev": "ce8acff4829e0c2458a67ead32390ac0a381c862"
+		"Rev": "156a073208e131d7d2e212cb749feae7c339e846"
 	},
 	{
 		"ImportPath": "github.com/ugorji/go/codec",


@@ -16,7 +16,7 @@ and setting values. That's it.
 ## Project Status
 Bolt is stable and the API is fixed. Full unit test coverage and randomized
 black box testing are used to ensure database consistency and thread safety.
 Bolt is currently in high-load production environments serving databases as
 large as 1TB. Many companies such as Shopify and Heroku use Bolt-backed
@@ -120,11 +120,53 @@ err := db.View(func(tx *bolt.Tx) error {
 })
 ```
 You also get a consistent view of the database within this closure, however,
 no mutating operations are allowed within a read-only transaction. You can only
 retrieve buckets, retrieve values, and copy the database within a read-only
 transaction.
+
+#### Batch read-write transactions
+
+Each `DB.Update()` waits for disk to commit the writes. This overhead
+can be minimized by combining multiple updates with the `DB.Batch()`
+function:
+
+```go
+err := db.Batch(func(tx *bolt.Tx) error {
+	...
+	return nil
+})
+```
+
+Concurrent Batch calls are opportunistically combined into larger
+transactions. Batch is only useful when there are multiple goroutines
+calling it.
+
+The trade-off is that `Batch` can call the given
+function multiple times, if parts of the transaction fail. The
+function must be idempotent and side effects must take effect only
+after a successful return from `DB.Batch()`.
+
+For example: don't display messages from inside the function, instead
+set variables in the enclosing scope:
+
+```go
+var id uint64
+err := db.Batch(func(tx *bolt.Tx) error {
+	// Find last key in bucket, decode as bigendian uint64, increment
+	// by one, encode back to []byte, and add new key.
+	...
+	id = newValue
+	return nil
+})
+if err != nil {
+	return ...
+}
+fmt.Printf("Allocated ID %d\n", id)
+```
+
 #### Managing transactions manually
 The `DB.View()` and `DB.Update()` functions are wrappers around the `DB.Begin()`
@@ -145,7 +187,7 @@ if err != nil {
 defer tx.Rollback()
 
 // Use the transaction...
-_, err := tx.CreateBucket([]byte("MyBucket")
+_, err := tx.CreateBucket([]byte("MyBucket"))
 if err != nil {
 	return err
 }
@@ -216,6 +258,10 @@ set to a key which is different than the key not existing.
 Use the `Bucket.Delete()` function to delete a key from the bucket.
+
+Please note that values returned from `Get()` are only valid while the
+transaction is open. If you need to use a value outside of the transaction
+then you must use `copy()` to copy it to another byte slice.
 
 ### Iterating over keys
@@ -328,7 +374,7 @@ func (*Bucket) DeleteBucket(key []byte) error
 ### Database backups
-Bolt is a single file so it's easy to backup. You can use the `Tx.Copy()`
+Bolt is a single file so it's easy to backup. You can use the `Tx.WriteTo()`
 function to write a consistent view of the database to a writer. If you call
 this from a read-only transaction, it will perform a hot backup and not block
 your other database reads and writes. It will also use `O_DIRECT` when available
@@ -339,11 +385,12 @@ do database backups:
 ```go
 func BackupHandleFunc(w http.ResponseWriter, req *http.Request) {
-	err := db.View(func(tx bolt.Tx) error {
+	err := db.View(func(tx *bolt.Tx) error {
 		w.Header().Set("Content-Type", "application/octet-stream")
 		w.Header().Set("Content-Disposition", `attachment; filename="my.db"`)
 		w.Header().Set("Content-Length", strconv.Itoa(int(tx.Size())))
-		return tx.Copy(w)
+		_, err := tx.WriteTo(w)
+		return err
 	})
 	if err != nil {
 		http.Error(w, err.Error(), http.StatusInternalServerError)
@@ -385,7 +432,7 @@ go func() {
 		// Grab the current stats and diff them.
 		stats := db.Stats()
 		diff := stats.Sub(&prev)
 
 		// Encode stats to JSON and print to STDERR.
 		json.NewEncoder(os.Stderr).Encode(diff)
@@ -404,7 +451,7 @@ or to provide an HTTP endpoint that will perform a fixed-length sample.
 For more information on getting started with Bolt, check out the following articles:
 * [Intro to BoltDB: Painless Performant Persistence](http://npf.io/2014/07/intro-to-boltdb-painless-performant-persistence/) by [Nate Finch](https://github.com/natefinch).
+* [Bolt -- an embedded key/value database for Go](https://www.progville.com/go/bolt-embedded-db-golang/) by Progville
 
 ## Comparison with other databases
@@ -453,9 +500,9 @@ lock-free MVCC using a single writer and multiple readers.
 The two projects have somewhat diverged. LMDB heavily focuses on raw performance
 while Bolt has focused on simplicity and ease of use. For example, LMDB allows
-several unsafe actions such as direct writes and append writes for the sake of
-performance. Bolt opts to disallow actions which can leave the database in a
-corrupted state. The only exception to this in Bolt is `DB.NoSync`.
+several unsafe actions such as direct writes for the sake of performance. Bolt
+opts to disallow actions which can leave the database in a corrupted state. The
+only exception to this in Bolt is `DB.NoSync`.
 
 There are also a few differences in API. LMDB requires a maximum mmap size when
 opening an `mdb_env` whereas Bolt will handle incremental mmap resizing
@@ -503,11 +550,23 @@ Here are a few things to note when evaluating and using Bolt:
   However, this is expected and the OS will release memory as needed. Bolt can
   handle databases much larger than the available physical RAM.
+
+* Because of the way pages are laid out on disk, Bolt cannot truncate data files
+  and return free pages back to the disk. Instead, Bolt maintains a free list
+  of unused pages within its data file. These free pages can be reused by later
+  transactions. This works well for many use cases as databases generally tend
+  to grow. However, it's important to note that deleting large chunks of data
+  will not allow you to reclaim that space on disk.
+
+  For more information on page allocation, [see this comment][page-allocation].
+
+[page-allocation]: https://github.com/boltdb/bolt/issues/308#issuecomment-74811638
 
 ## Other Projects Using Bolt
 
 Below is a list of public, open source projects that use Bolt:
+* [Operation Go: A Routine Mission](http://gocode.io) - An online programming game for Golang using Bolt for user accounts and a leaderboard.
 * [Bazil](https://github.com/bazillion/bazil) - A file system that lets your data reside where it is most convenient for it to reside.
 * [DVID](https://github.com/janelia-flyem/dvid) - Added Bolt as optional storage engine and testing it against Basho-tuned leveldb.
 * [Skybox Analytics](https://github.com/skybox/skybox) - A standalone funnel analysis tool for web analytics.
@@ -526,6 +585,6 @@ Below is a list of public, open source projects that use Bolt:
 * [bleve](http://www.blevesearch.com/) - A pure Go search engine similar to ElasticSearch that uses Bolt as the default storage backend.
 * [tentacool](https://github.com/optiflows/tentacool) - REST api server to manage system stuff (IP, DNS, Gateway...) on a linux server.
 * [SkyDB](https://github.com/skydb/sky) - Behavioral analytics database.
+* [Seaweed File System](https://github.com/chrislusf/weed-fs) - Highly scalable distributed key~file system with O(1) disk read.
 
 If you are using Bolt in a project please send a pull request to add it to the list.
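The `Get()` value-lifetime note added to the README above is the easiest rule in this change to trip over. A minimal sketch of the copy-out pattern it describes, assuming an open `db`; the bucket name "MyBucket" and key "answer" are hypothetical:

```go
var valCopy []byte
err := db.View(func(tx *bolt.Tx) error {
	v := tx.Bucket([]byte("MyBucket")).Get([]byte("answer"))
	// v points into the read-only mmap and is only valid inside this
	// transaction; copy it out before the closure returns.
	valCopy = make([]byte, len(v))
	copy(valCopy, v)
	return nil
})
```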

Godeps/_workspace/src/github.com/boltdb/bolt/batch.go

@@ -0,0 +1,135 @@
package bolt
import (
"errors"
"fmt"
"sync"
"time"
)
// Batch calls fn as part of a batch. It behaves similar to Update,
// except:
//
// 1. concurrent Batch calls can be combined into a single Bolt
// transaction.
//
// 2. the function passed to Batch may be called multiple times,
// regardless of whether it returns error or not.
//
// This means that Batch function side effects must be idempotent and
// take permanent effect only after a successful return is seen in
// caller.
//
// Batch is only useful when there are multiple goroutines calling it.
func (db *DB) Batch(fn func(*Tx) error) error {
errCh := make(chan error, 1)
db.batchMu.Lock()
if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) {
// There is no existing batch, or the existing batch is full; start a new one.
db.batch = &batch{
db: db,
}
db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger)
}
db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
if len(db.batch.calls) >= db.MaxBatchSize {
// wake up batch, it's ready to run
go db.batch.trigger()
}
db.batchMu.Unlock()
err := <-errCh
if err == trySolo {
err = db.Update(fn)
}
return err
}
type call struct {
fn func(*Tx) error
err chan<- error
}
type batch struct {
db *DB
timer *time.Timer
start sync.Once
calls []call
}
// trigger runs the batch if it hasn't already been run.
func (b *batch) trigger() {
b.start.Do(b.run)
}
// run performs the transactions in the batch and communicates results
// back to DB.Batch.
func (b *batch) run() {
b.db.batchMu.Lock()
b.timer.Stop()
// Make sure no new work is added to this batch, but don't break
// other batches.
if b.db.batch == b {
b.db.batch = nil
}
b.db.batchMu.Unlock()
retry:
for len(b.calls) > 0 {
var failIdx = -1
err := b.db.Update(func(tx *Tx) error {
for i, c := range b.calls {
if err := safelyCall(c.fn, tx); err != nil {
failIdx = i
return err
}
}
return nil
})
if failIdx >= 0 {
// take the failing transaction out of the batch. it's
// safe to shorten b.calls here because db.batch no longer
// points to us, and we hold the mutex anyway.
c := b.calls[failIdx]
b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
// tell the submitter to re-run it solo; continue with the rest of the batch
c.err <- trySolo
continue retry
}
// pass success, or bolt internal errors, to all callers
for _, c := range b.calls {
if c.err != nil {
c.err <- err
}
}
break retry
}
}
// trySolo is a special sentinel error value used for signaling that a
// transaction function should be re-run. It should never be seen by
// callers.
var trySolo = errors.New("batch function returned an error and should be re-run solo")
type panicked struct {
reason interface{}
}
func (p panicked) Error() string {
if err, ok := p.reason.(error); ok {
return err.Error()
}
return fmt.Sprintf("panic: %v", p.reason)
}
func safelyCall(fn func(*Tx) error, tx *Tx) (err error) {
defer func() {
if p := recover(); p != nil {
err = panicked{p}
}
}()
return fn(tx)
}
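Because `Batch` may re-run `fn` (or hand it back to the caller for a solo `Update` via the `trySolo` sentinel), side effects must stay outside the function. A hedged usage sketch, assuming an open `db`, an existing "jobs" bucket, and the enclosing file importing `encoding/binary` and `log`:

```go
var seq uint64
err := db.Batch(func(tx *bolt.Tx) error {
	b := tx.Bucket([]byte("jobs"))
	// NextSequence may run more than once across retries; only the value
	// from the attempt that commits is observed after Batch returns.
	n, err := b.NextSequence()
	if err != nil {
		return err
	}
	key := make([]byte, 8)
	binary.BigEndian.PutUint64(key, n)
	seq = n
	return b.Put(key, []byte("payload"))
})
if err == nil {
	log.Printf("committed job %d", seq) // side effects only after success
}
```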


@@ -0,0 +1,170 @@
package bolt_test
import (
"bytes"
"encoding/binary"
"errors"
"hash/fnv"
"sync"
"testing"
"github.com/boltdb/bolt"
)
func validateBatchBench(b *testing.B, db *TestDB) {
var rollback = errors.New("sentinel error to cause rollback")
validate := func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte("bench"))
h := fnv.New32a()
buf := make([]byte, 4)
for id := uint32(0); id < 1000; id++ {
binary.LittleEndian.PutUint32(buf, id)
h.Reset()
h.Write(buf[:])
k := h.Sum(nil)
v := bucket.Get(k)
if v == nil {
b.Errorf("not found id=%d key=%x", id, k)
continue
}
if g, e := v, []byte("filler"); !bytes.Equal(g, e) {
b.Errorf("bad value for id=%d key=%x: %s != %q", id, k, g, e)
}
if err := bucket.Delete(k); err != nil {
return err
}
}
// should be empty now
c := bucket.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
b.Errorf("unexpected key: %x = %q", k, v)
}
return rollback
}
if err := db.Update(validate); err != nil && err != rollback {
b.Error(err)
}
}
func BenchmarkDBBatchAutomatic(b *testing.B) {
db := NewTestDB()
defer db.Close()
db.MustCreateBucket([]byte("bench"))
b.ResetTimer()
for i := 0; i < b.N; i++ {
start := make(chan struct{})
var wg sync.WaitGroup
for round := 0; round < 1000; round++ {
wg.Add(1)
go func(id uint32) {
defer wg.Done()
<-start
h := fnv.New32a()
buf := make([]byte, 4)
binary.LittleEndian.PutUint32(buf, id)
h.Write(buf[:])
k := h.Sum(nil)
insert := func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("bench"))
return b.Put(k, []byte("filler"))
}
if err := db.Batch(insert); err != nil {
b.Error(err)
return
}
}(uint32(round))
}
close(start)
wg.Wait()
}
b.StopTimer()
validateBatchBench(b, db)
}
func BenchmarkDBBatchSingle(b *testing.B) {
db := NewTestDB()
defer db.Close()
db.MustCreateBucket([]byte("bench"))
b.ResetTimer()
for i := 0; i < b.N; i++ {
start := make(chan struct{})
var wg sync.WaitGroup
for round := 0; round < 1000; round++ {
wg.Add(1)
go func(id uint32) {
defer wg.Done()
<-start
h := fnv.New32a()
buf := make([]byte, 4)
binary.LittleEndian.PutUint32(buf, id)
h.Write(buf[:])
k := h.Sum(nil)
insert := func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("bench"))
return b.Put(k, []byte("filler"))
}
if err := db.Update(insert); err != nil {
b.Error(err)
return
}
}(uint32(round))
}
close(start)
wg.Wait()
}
b.StopTimer()
validateBatchBench(b, db)
}
func BenchmarkDBBatchManual10x100(b *testing.B) {
db := NewTestDB()
defer db.Close()
db.MustCreateBucket([]byte("bench"))
b.ResetTimer()
for i := 0; i < b.N; i++ {
start := make(chan struct{})
var wg sync.WaitGroup
for major := 0; major < 10; major++ {
wg.Add(1)
go func(id uint32) {
defer wg.Done()
<-start
insert100 := func(tx *bolt.Tx) error {
h := fnv.New32a()
buf := make([]byte, 4)
for minor := uint32(0); minor < 100; minor++ {
binary.LittleEndian.PutUint32(buf, uint32(id*100+minor))
h.Reset()
h.Write(buf[:])
k := h.Sum(nil)
b := tx.Bucket([]byte("bench"))
if err := b.Put(k, []byte("filler")); err != nil {
return err
}
}
return nil
}
if err := db.Update(insert100); err != nil {
b.Fatal(err)
}
}(uint32(major))
}
close(start)
wg.Wait()
}
b.StopTimer()
validateBatchBench(b, db)
}


@@ -0,0 +1,148 @@
package bolt_test
import (
"encoding/binary"
"fmt"
"io/ioutil"
"log"
"math/rand"
"net/http"
"net/http/httptest"
"os"
"github.com/boltdb/bolt"
)
// Set this to see how the counts are actually updated.
const verbose = false
// Counter updates a counter in Bolt for every URL path requested.
type counter struct {
db *bolt.DB
}
func (c counter) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
// Communicates the new count from a successful database
// transaction.
var result uint64
increment := func(tx *bolt.Tx) error {
b, err := tx.CreateBucketIfNotExists([]byte("hits"))
if err != nil {
return err
}
key := []byte(req.URL.String())
// Decode handles key not found for us.
count := decode(b.Get(key)) + 1
b.Put(key, encode(count))
// All good, communicate new count.
result = count
return nil
}
if err := c.db.Batch(increment); err != nil {
http.Error(rw, err.Error(), 500)
return
}
if verbose {
log.Printf("server: %s: %d", req.URL.String(), result)
}
rw.Header().Set("Content-Type", "application/octet-stream")
fmt.Fprintf(rw, "%d\n", result)
}
func client(id int, base string, paths []string) error {
// Process paths in random order.
rng := rand.New(rand.NewSource(int64(id)))
permutation := rng.Perm(len(paths))
for i := range paths {
path := paths[permutation[i]]
resp, err := http.Get(base + path)
if err != nil {
return err
}
defer resp.Body.Close()
buf, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if verbose {
log.Printf("client: %s: %s", path, buf)
}
}
return nil
}
func ExampleDB_Batch() {
// Open the database.
db, _ := bolt.Open(tempfile(), 0666, nil)
defer os.Remove(db.Path())
defer db.Close()
// Start our web server
count := counter{db}
srv := httptest.NewServer(count)
defer srv.Close()
// Decrease the batch size to make things more interesting.
db.MaxBatchSize = 3
// Get every path multiple times concurrently.
const clients = 10
paths := []string{
"/foo",
"/bar",
"/baz",
"/quux",
"/thud",
"/xyzzy",
}
errors := make(chan error, clients)
for i := 0; i < clients; i++ {
go func(id int) {
errors <- client(id, srv.URL, paths)
}(i)
}
// Check all responses to make sure there's no error.
for i := 0; i < clients; i++ {
if err := <-errors; err != nil {
fmt.Printf("client error: %v", err)
return
}
}
// Check the final result
db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("hits"))
c := b.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
fmt.Printf("hits to %s: %d\n", k, decode(v))
}
return nil
})
// Output:
// hits to /bar: 10
// hits to /baz: 10
// hits to /foo: 10
// hits to /quux: 10
// hits to /thud: 10
// hits to /xyzzy: 10
}
// encode marshals a counter.
func encode(n uint64) []byte {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, n)
return buf
}
// decode unmarshals a counter. Nil buffers are decoded as 0.
func decode(buf []byte) uint64 {
if buf == nil {
return 0
}
return binary.BigEndian.Uint64(buf)
}


@@ -0,0 +1,167 @@
package bolt_test
import (
"testing"
"time"
"github.com/boltdb/bolt"
)
// Ensure two functions can perform updates in a single batch.
func TestDB_Batch(t *testing.T) {
db := NewTestDB()
defer db.Close()
db.MustCreateBucket([]byte("widgets"))
// Iterate over multiple updates in separate goroutines.
n := 2
ch := make(chan error)
for i := 0; i < n; i++ {
go func(i int) {
ch <- db.Batch(func(tx *bolt.Tx) error {
return tx.Bucket([]byte("widgets")).Put(u64tob(uint64(i)), []byte{})
})
}(i)
}
// Check all responses to make sure there's no error.
for i := 0; i < n; i++ {
if err := <-ch; err != nil {
t.Fatal(err)
}
}
// Ensure data is correct.
db.MustView(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("widgets"))
for i := 0; i < n; i++ {
if v := b.Get(u64tob(uint64(i))); v == nil {
t.Errorf("key not found: %d", i)
}
}
return nil
})
}
func TestDB_Batch_Panic(t *testing.T) {
db := NewTestDB()
defer db.Close()
var sentinel int
var bork = &sentinel
var problem interface{}
var err error
// Execute a function inside a batch that panics.
func() {
defer func() {
if p := recover(); p != nil {
problem = p
}
}()
err = db.Batch(func(tx *bolt.Tx) error {
panic(bork)
})
}()
// Verify there is no error.
if g, e := err, error(nil); g != e {
t.Fatalf("wrong error: %v != %v", g, e)
}
// Verify the panic was captured.
if g, e := problem, bork; g != e {
t.Fatalf("wrong error: %v != %v", g, e)
}
}
func TestDB_BatchFull(t *testing.T) {
db := NewTestDB()
defer db.Close()
db.MustCreateBucket([]byte("widgets"))
const size = 3
// buffered so we never leak goroutines
ch := make(chan error, size)
put := func(i int) {
ch <- db.Batch(func(tx *bolt.Tx) error {
return tx.Bucket([]byte("widgets")).Put(u64tob(uint64(i)), []byte{})
})
}
db.MaxBatchSize = size
// high enough to never trigger here
db.MaxBatchDelay = 1 * time.Hour
go put(1)
go put(2)
// Give the batch a chance to exhibit bugs.
time.Sleep(10 * time.Millisecond)
// not triggered yet
select {
case <-ch:
t.Fatalf("batch triggered too early")
default:
}
go put(3)
// Check all responses to make sure there's no error.
for i := 0; i < size; i++ {
if err := <-ch; err != nil {
t.Fatal(err)
}
}
// Ensure data is correct.
db.MustView(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("widgets"))
for i := 1; i <= size; i++ {
if v := b.Get(u64tob(uint64(i))); v == nil {
t.Errorf("key not found: %d", i)
}
}
return nil
})
}
func TestDB_BatchTime(t *testing.T) {
db := NewTestDB()
defer db.Close()
db.MustCreateBucket([]byte("widgets"))
const size = 1
// buffered so we never leak goroutines
ch := make(chan error, size)
put := func(i int) {
ch <- db.Batch(func(tx *bolt.Tx) error {
return tx.Bucket([]byte("widgets")).Put(u64tob(uint64(i)), []byte{})
})
}
db.MaxBatchSize = 1000
db.MaxBatchDelay = 0
go put(1)
// Batch must trigger by time alone.
// Check all responses to make sure there's no error.
for i := 0; i < size; i++ {
if err := <-ch; err != nil {
t.Fatal(err)
}
}
// Ensure data is correct.
db.MustView(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("widgets"))
for i := 1; i <= size; i++ {
if v := b.Get(u64tob(uint64(i))); v == nil {
t.Errorf("key not found: %d", i)
}
}
return nil
})
}


@@ -2,3 +2,6 @@ package bolt
 
 // maxMapSize represents the largest mmap size supported by Bolt.
 const maxMapSize = 0x7FFFFFFF // 2GB
+
+// maxAllocSize is the size used when creating array pointers.
+const maxAllocSize = 0xFFFFFFF


@@ -2,3 +2,6 @@ package bolt
 
 // maxMapSize represents the largest mmap size supported by Bolt.
 const maxMapSize = 0xFFFFFFFFFFFF // 256TB
+
+// maxAllocSize is the size used when creating array pointers.
+const maxAllocSize = 0x7FFFFFFF


@@ -2,3 +2,6 @@ package bolt
 
 // maxMapSize represents the largest mmap size supported by Bolt.
 const maxMapSize = 0x7FFFFFFF // 2GB
+
+// maxAllocSize is the size used when creating array pointers.
+const maxAllocSize = 0xFFFFFFF


@@ -252,6 +252,7 @@ func (b *Bucket) DeleteBucket(key []byte) error {
 
 // Get retrieves the value for a key in the bucket.
 // Returns a nil value if the key does not exist or if the key is a nested bucket.
+// The returned value is only valid for the life of the transaction.
 func (b *Bucket) Get(key []byte) []byte {
 	k, v, flags := b.Cursor().seek(key)


@@ -22,5 +22,5 @@ func Info(path string) {
 
 	// Print basic database info.
 	var info = db.Info()
-	printf("Page Size: %d", info.PageSize)
+	printf("Page Size: %d\n", info.PageSize)
 }


@@ -10,6 +10,8 @@ import (
 // Cursors see nested buckets with value == nil.
 // Cursors can be obtained from a transaction and are valid as long as the transaction is open.
 //
+// Keys and values returned from the cursor are only valid for the life of the transaction.
+//
 // Changing data while traversing with a cursor may cause it to be invalidated
 // and return unexpected keys and/or values. You must reposition your cursor
 // after mutating data.
@@ -25,6 +27,7 @@ func (c *Cursor) Bucket() *Bucket {
 
 // First moves the cursor to the first item in the bucket and returns its key and value.
 // If the bucket is empty then a nil key and value are returned.
+// The returned key and value are only valid for the life of the transaction.
 func (c *Cursor) First() (key []byte, value []byte) {
 	_assert(c.bucket.tx.db != nil, "tx closed")
 	c.stack = c.stack[:0]
@@ -41,6 +44,7 @@ func (c *Cursor) First() (key []byte, value []byte) {
 
 // Last moves the cursor to the last item in the bucket and returns its key and value.
 // If the bucket is empty then a nil key and value are returned.
+// The returned key and value are only valid for the life of the transaction.
 func (c *Cursor) Last() (key []byte, value []byte) {
 	_assert(c.bucket.tx.db != nil, "tx closed")
 	c.stack = c.stack[:0]
@@ -58,6 +62,7 @@ func (c *Cursor) Last() (key []byte, value []byte) {
 
 // Next moves the cursor to the next item in the bucket and returns its key and value.
 // If the cursor is at the end of the bucket then a nil key and value are returned.
+// The returned key and value are only valid for the life of the transaction.
 func (c *Cursor) Next() (key []byte, value []byte) {
 	_assert(c.bucket.tx.db != nil, "tx closed")
 	k, v, flags := c.next()
@@ -69,6 +74,7 @@ func (c *Cursor) Next() (key []byte, value []byte) {
 
 // Prev moves the cursor to the previous item in the bucket and returns its key and value.
 // If the cursor is at the beginning of the bucket then a nil key and value are returned.
+// The returned key and value are only valid for the life of the transaction.
 func (c *Cursor) Prev() (key []byte, value []byte) {
 	_assert(c.bucket.tx.db != nil, "tx closed")
@@ -100,6 +106,7 @@ func (c *Cursor) Prev() (key []byte, value []byte) {
 
 // Seek moves the cursor to a given key and returns it.
 // If the key does not exist then the next key is used. If no keys
 // follow, a nil key is returned.
+// The returned key and value are only valid for the life of the transaction.
 func (c *Cursor) Seek(seek []byte) (key []byte, value []byte) {
 	k, v, flags := c.seek(seek)


@@ -27,6 +27,12 @@ const magic uint32 = 0xED0CDAED
 // must be synchronized using the msync(2) syscall.
 const IgnoreNoSync = runtime.GOOS == "openbsd"
 
+// Default values if not set in a DB instance.
+const (
+	DefaultMaxBatchSize  int = 1000
+	DefaultMaxBatchDelay     = 10 * time.Millisecond
+)
+
 // DB represents a collection of buckets persisted to a file on disk.
 // All data access is performed through transactions which can be obtained through the DB.
 // All the functions on DB will return a ErrDatabaseNotOpen if accessed before Open() is called.
@@ -49,9 +55,25 @@ type DB struct {
 	// THIS IS UNSAFE. PLEASE USE WITH CAUTION.
 	NoSync bool
 
+	// MaxBatchSize is the maximum size of a batch. Default value is
+	// copied from DefaultMaxBatchSize in Open.
+	//
+	// If <=0, disables batching.
+	//
+	// Do not change concurrently with calls to Batch.
+	MaxBatchSize int
+
+	// MaxBatchDelay is the maximum delay before a batch starts.
+	// Default value is copied from DefaultMaxBatchDelay in Open.
+	//
+	// If <=0, effectively disables batching.
+	//
+	// Do not change concurrently with calls to Batch.
+	MaxBatchDelay time.Duration
+
 	path     string
 	file     *os.File
-	dataref  []byte
+	dataref  []byte // mmap'ed readonly, write throws SEGV
 	data     *[maxMapSize]byte
 	datasz   int
 	meta0    *meta
@@ -63,6 +85,9 @@ type DB struct {
 	freelist *freelist
 	stats    Stats
 
+	batchMu sync.Mutex
+	batch   *batch
+
 	rwlock   sync.Mutex   // Allows only one writer at a time.
 	metalock sync.Mutex   // Protects meta page access.
 	mmaplock sync.RWMutex // Protects mmap access during remapping.
@@ -99,6 +124,10 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
 		options = DefaultOptions
 	}
 
+	// Set default values for later DB operations.
+	db.MaxBatchSize = DefaultMaxBatchSize
+	db.MaxBatchDelay = DefaultMaxBatchDelay
+
 	// Open data file and separate sync handler for metadata writes.
 	db.path = path
@@ -215,7 +244,7 @@ func (db *DB) munmap() error {
 }
 
 // mmapSize determines the appropriate size for the mmap given the current size
-// of the database. The minimum size is 4MB and doubles until it reaches 1GB.
+// of the database. The minimum size is 1MB and doubles until it reaches 1GB.
 // Returns an error if the new mmap size is greater than the max allowed.
 func (db *DB) mmapSize(size int) (int, error) {
 	// Double the size from 1MB until 1GB.
@@ -231,9 +260,9 @@ func (db *DB) mmapSize(size int) (int, error) {
 	}
 
 	// If larger than 1GB then grow by 1GB at a time.
-	sz := int64(size) + int64(maxMmapStep)
+	sz := int64(size)
 	if remainder := sz % int64(maxMmapStep); remainder > 0 {
-		sz -= remainder
+		sz += int64(maxMmapStep) - remainder
 	}
 
 	// Ensure that the mmap size is a multiple of the page size.
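The new batching knobs are plain fields on `DB`, filled in with the defaults inside `Open`. A short sketch of overriding them after opening; the values are illustrative, and the standard library `log` and `time` are assumed:

```go
db, err := bolt.Open("my.db", 0600, nil)
if err != nil {
	log.Fatal(err)
}
defer db.Close()

// Open already copied DefaultMaxBatchSize (1000) and DefaultMaxBatchDelay
// (10ms) into these fields; override them before any goroutine calls
// db.Batch, never concurrently with it.
db.MaxBatchSize = 100
db.MaxBatchDelay = 5 * time.Millisecond
```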


@@ -1,6 +1,7 @@
 package bolt_test
 
 import (
+	"encoding/binary"
 	"errors"
 	"flag"
 	"fmt"
@@ -125,6 +126,56 @@ func TestOpen_Size(t *testing.T) {
 	}
 }
 
+// Ensure that opening a database beyond the max step size does not increase its size.
+// https://github.com/boltdb/bolt/issues/303
+func TestOpen_Size_Large(t *testing.T) {
+	if testing.Short() {
+		t.Skip("short mode")
+	}
+
+	// Open a data file.
+	db := NewTestDB()
+	path := db.Path()
+	defer db.Close()
+
+	// Insert until we get above the minimum 4MB size.
+	var index uint64
+	for i := 0; i < 10000; i++ {
+		ok(t, db.Update(func(tx *bolt.Tx) error {
+			b, _ := tx.CreateBucketIfNotExists([]byte("data"))
+			for j := 0; j < 1000; j++ {
+				ok(t, b.Put(u64tob(index), make([]byte, 50)))
+				index++
+			}
+			return nil
+		}))
+	}
+
+	// Close database and grab the size.
+	db.DB.Close()
+	sz := fileSize(path)
+	if sz == 0 {
+		t.Fatalf("unexpected new file size: %d", sz)
+	} else if sz < (1 << 30) {
+		t.Fatalf("expected larger initial size: %d", sz)
+	}
+
+	// Reopen database, update, and check size again.
+	db0, err := bolt.Open(path, 0666, nil)
+	ok(t, err)
+	ok(t, db0.Update(func(tx *bolt.Tx) error { return tx.Bucket([]byte("data")).Put([]byte{0}, []byte{0}) }))
+	ok(t, db0.Close())
+	newSz := fileSize(path)
+	if newSz == 0 {
+		t.Fatalf("unexpected new file size: %d", newSz)
+	}
+
+	// Compare the original size with the new size.
+	if sz != newSz {
+		t.Fatalf("unexpected file growth: %d => %d", sz, newSz)
+	}
+}
+
 // Ensure that a re-opened database is consistent.
 func TestOpen_Check(t *testing.T) {
 	path := tempfile()
@@ -567,6 +618,34 @@ func NewTestDB() *TestDB {
 	return &TestDB{db}
 }
 
+// MustView executes a read-only function. Panic on error.
+func (db *TestDB) MustView(fn func(tx *bolt.Tx) error) {
+	if err := db.DB.View(func(tx *bolt.Tx) error {
+		return fn(tx)
+	}); err != nil {
+		panic(err.Error())
+	}
+}
+
+// MustUpdate executes a read-write function. Panic on error.
+func (db *TestDB) MustUpdate(fn func(tx *bolt.Tx) error) {
+	if err := db.DB.Update(func(tx *bolt.Tx) error {
+		return fn(tx)
+	}); err != nil {
+		panic(err.Error())
+	}
+}
+
+// MustCreateBucket creates a new bucket. Panic on error.
+func (db *TestDB) MustCreateBucket(name []byte) {
+	if err := db.Update(func(tx *bolt.Tx) error {
+		_, err := tx.CreateBucket(name)
+		return err
+	}); err != nil {
+		panic(err.Error())
+	}
+}
+
 // Close closes the database and deletes the underlying file.
 func (db *TestDB) Close() {
 	// Log statistics.
@@ -699,3 +778,13 @@ func fileSize(path string) int64 {
 
 func warn(v ...interface{})              { fmt.Fprintln(os.Stderr, v...) }
 func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) }
+
+// u64tob converts a uint64 into an 8-byte slice.
+func u64tob(v uint64) []byte {
+	b := make([]byte, 8)
+	binary.BigEndian.PutUint64(b, v)
+	return b
+}
+
+// btou64 converts an 8-byte slice into an uint64.
+func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) }


@@ -8,7 +8,6 @@ import (
 
 const pageHeaderSize = int(unsafe.Offsetof(((*page)(nil)).ptr))
 
-const maxAllocSize = 0xFFFFFFF
 const minKeysPerPage = 2
 
 const branchPageElementSize = int(unsafe.Sizeof(branchPageElement{}))


@@ -252,37 +252,42 @@ func (tx *Tx) close() {
 }
 
 // Copy writes the entire database to a writer.
-// A reader transaction is maintained during the copy so it is safe to continue
-// using the database while a copy is in progress.
-// Copy will write exactly tx.Size() bytes into the writer.
+// This function exists for backwards compatibility. Use WriteTo() instead.
 func (tx *Tx) Copy(w io.Writer) error {
-	var f *os.File
-	var err error
+	_, err := tx.WriteTo(w)
+	return err
+}
 
+// WriteTo writes the entire database to a writer.
+// If err == nil then exactly tx.Size() bytes will be written into the writer.
+func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) {
 	// Attempt to open reader directly.
+	var f *os.File
 	if f, err = os.OpenFile(tx.db.path, os.O_RDONLY|odirect, 0); err != nil {
 		// Fallback to a regular open if that doesn't work.
 		if f, err = os.OpenFile(tx.db.path, os.O_RDONLY, 0); err != nil {
-			return err
+			return 0, err
 		}
 	}
 
 	// Copy the meta pages.
 	tx.db.metalock.Lock()
-	_, err = io.CopyN(w, f, int64(tx.db.pageSize*2))
+	n, err = io.CopyN(w, f, int64(tx.db.pageSize*2))
 	tx.db.metalock.Unlock()
 	if err != nil {
 		_ = f.Close()
-		return fmt.Errorf("meta copy: %s", err)
+		return n, fmt.Errorf("meta copy: %s", err)
 	}
 
 	// Copy data pages.
-	if _, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2)); err != nil {
+	wn, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2))
+	n += wn
+	if err != nil {
 		_ = f.Close()
-		return err
+		return n, err
 	}
 
-	return f.Close()
+	return n, f.Close()
 }
 
 // CopyFile copies the entire database to file at the given path.
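With `WriteTo`, a read transaction becomes a byte-count-reporting source, so a file backup fits in one `View` call. A sketch assuming an open `db` and the standard library `os` and `log`; the backup path is illustrative:

```go
err := db.View(func(tx *bolt.Tx) error {
	f, err := os.Create("/tmp/backup.db")
	if err != nil {
		return err
	}
	defer f.Close()

	// WriteTo reports how many bytes reached the writer; this should
	// equal tx.Size() whenever err == nil.
	n, err := tx.WriteTo(f)
	if err != nil {
		return err
	}
	log.Printf("backup: wrote %d of %d bytes", n, tx.Size())
	return f.Sync()
})
```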


@@ -6,6 +6,7 @@ import (
 	"runtime"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
 )
 
@@ -31,35 +32,45 @@ const TimeFormat = "2006/01/02 15:04:05"
 const maxBufPoolSize = 16
 
-type Logger struct {
-	sync.Mutex
-	level int
+type atomicInt32 int32
+
+func (i *atomicInt32) Set(n int) {
+	atomic.StoreInt32((*int32)(i), int32(n))
+}
+
+func (i *atomicInt32) Get() int {
+	return int(atomic.LoadInt32((*int32)(i)))
+}
+
+type Logger struct {
+	level atomicInt32
 	flag int
 
+	hMutex  sync.Mutex
 	handler Handler
 
 	quit chan struct{}
 	msg  chan []byte
 
-	bufs [][]byte
+	bufMutex sync.Mutex
+	bufs     [][]byte
 
 	wg sync.WaitGroup
 
-	closed bool
+	closed atomicInt32
 }
 
 //new a logger with specified handler and flag
 func New(handler Handler, flag int) *Logger {
 	var l = new(Logger)
 
-	l.level = LevelInfo
+	l.level.Set(LevelInfo)
 	l.handler = handler
 
 	l.flag = flag
 	l.quit = make(chan struct{})
-	l.closed = false
+	l.closed.Set(0)
 
 	l.msg = make(chan []byte, 1024)
@@ -88,7 +99,9 @@ func (l *Logger) run() {
 	for {
 		select {
 		case msg := <-l.msg:
+			l.hMutex.Lock()
 			l.handler.Write(msg)
+			l.hMutex.Unlock()
 			l.putBuf(msg)
 		case <-l.quit:
 			//we must log all msg
@@ -100,7 +113,7 @@ func (l *Logger) run() {
 }
 
 func (l *Logger) popBuf() []byte {
-	l.Lock()
+	l.bufMutex.Lock()
 	var buf []byte
 	if len(l.bufs) == 0 {
 		buf = make([]byte, 0, 1024)
@@ -108,25 +121,25 @@ func (l *Logger) popBuf() []byte {
 		buf = l.bufs[len(l.bufs)-1]
 		l.bufs = l.bufs[0 : len(l.bufs)-1]
 	}
-	l.Unlock()
+	l.bufMutex.Unlock()
 
 	return buf
 }
 
 func (l *Logger) putBuf(buf []byte) {
-	l.Lock()
+	l.bufMutex.Lock()
 	if len(l.bufs) < maxBufPoolSize {
 		buf = buf[0:0]
 		l.bufs = append(l.bufs, buf)
 	}
-	l.Unlock()
+	l.bufMutex.Unlock()
}
 
 func (l *Logger) Close() {
-	if l.closed {
+	if l.closed.Get() == 1 {
 		return
 	}
-	l.closed = true
+	l.closed.Set(1)
 
 	close(l.quit)
@@ -139,16 +152,30 @@ func (l *Logger) Close() {
 
 //set log level, any log level less than it will not log
 func (l *Logger) SetLevel(level int) {
-	l.level = level
+	l.level.Set(level)
 }
 
-func (l *Logger) Output(callDepth int, level int, s string) {
-	if l.closed {
-		//closed
-		return
-	}
-
-	if l.level > level {
+func (l *Logger) SetHandler(h Handler) {
+	if l.closed.Get() == 1 {
+		return
+	}
+
+	l.hMutex.Lock()
+	if l.handler != nil {
+		l.handler.Close()
+	}
+	l.handler = h
+	l.hMutex.Unlock()
+}
+
+func (l *Logger) Output(callDepth int, level int, s string) {
+	if l.closed.Get() == 1 {
+		// closed
+		return
+	}
+
+	if l.level.Get() > level {
+		// higher level can be logged
 		return
 	}
@@ -261,6 +288,10 @@ func SetLevel(level int) {
 	std.SetLevel(level)
 }
 
+func SetHandler(h Handler) {
+	std.SetHandler(h)
+}
+
 func Trace(v ...interface{}) {
 	std.Output(2, LevelTrace, fmt.Sprint(v...))
 }
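With `level` and `closed` made atomic and the handler guarded by `hMutex`, the handler can now be swapped while the background goroutine is draining `l.msg`. A hedged sketch of the new package-level entry point, using only names that appear in this diff (note that `SetHandler` closes the handler it replaces, and `*os.File` works as a `Handler` here because it provides both `Write` and `Close`):

```go
log.SetHandler(os.Stderr) // swap output to stderr at runtime
log.SetLevel(log.LevelError)
log.Infof("%s", "dropped: below the error level")
```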


@@ -16,7 +16,14 @@ func TestStdStreamLog(t *testing.T) {
 
 	Info("hello world")
 
+	SetHandler(os.Stderr)
+
 	Infof("%s %d", "Hello", 123)
+
+	SetLevel(LevelError)
+	Infof("%s %d", "Hello", 123)
+
+	Fatalf("%s %d", "Hello", 123)
 }
 
 func TestRotatingFileLog(t *testing.T) {
func TestRotatingFileLog(t *testing.T) { func TestRotatingFileLog(t *testing.T) {


@@ -0,0 +1,65 @@
package sync2
import (
"sync"
"sync/atomic"
"time"
)
func NewSemaphore(initialCount int) *Semaphore {
res := &Semaphore{
counter: int64(initialCount),
}
res.cond.L = &res.lock
return res
}
type Semaphore struct {
lock sync.Mutex
cond sync.Cond
counter int64
}
func (s *Semaphore) Release() {
s.lock.Lock()
s.counter += 1
if s.counter >= 0 {
s.cond.Signal()
}
s.lock.Unlock()
}
func (s *Semaphore) Acquire() {
s.lock.Lock()
for s.counter < 1 {
s.cond.Wait()
}
s.counter -= 1
s.lock.Unlock()
}
func (s *Semaphore) AcquireTimeout(timeout time.Duration) bool {
done := make(chan bool, 1)
// Gate used to communicate between the threads and decide what the result
// is. If the main thread decides, we have timed out, otherwise we succeed.
decided := new(int32)
go func() {
s.Acquire()
if atomic.SwapInt32(decided, 1) == 0 {
done <- true
} else {
// If we already decided the result, and this thread did not win
s.Release()
}
}()
select {
case <-done:
return true
case <-time.NewTimer(timeout).C:
if atomic.SwapInt32(decided, 1) == 1 {
// The other thread already decided the result
return true
}
return false
}
}
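A usage sketch for the new counting semaphore, bounding concurrent work to two slots; the worker body is illustrative and `sync` is assumed alongside `time`:

```go
sem := sync2.NewSemaphore(2)

var wg sync.WaitGroup
for i := 0; i < 8; i++ {
	wg.Add(1)
	go func(id int) {
		defer wg.Done()
		sem.Acquire()
		defer sem.Release()
		// At most two goroutines run this section at once.
	}(i)
}
wg.Wait()

// Or give up after a deadline instead of blocking forever.
if sem.AcquireTimeout(100 * time.Millisecond) {
	defer sem.Release()
}
```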


@@ -0,0 +1,41 @@
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package sync2
import (
"testing"
"time"
)
func TestSemaNoTimeout(t *testing.T) {
s := NewSemaphore(1)
s.Acquire()
released := false
go func() {
time.Sleep(10 * time.Millisecond)
released = true
s.Release()
}()
s.Acquire()
if !released {
t.Errorf("want true, got false")
}
}
func TestSemaTimeout(t *testing.T) {
s := NewSemaphore(1)
s.Acquire()
go func() {
time.Sleep(10 * time.Millisecond)
s.Release()
}()
if ok := s.AcquireTimeout(5 * time.Millisecond); ok {
t.Errorf("want false, got true")
}
time.Sleep(10 * time.Millisecond)
if ok := s.AcquireTimeout(5 * time.Millisecond); !ok {
t.Errorf("want true, got false")
}
}


@@ -2,6 +2,7 @@ package goredis
 
 import (
 	"bufio"
+	"fmt"
 	"io"
 	"net"
 	"sync/atomic"
@@ -16,9 +17,7 @@ func (s *sizeWriter) Write(p []byte) (int, error) {
 }
 
 type Conn struct {
 	c net.Conn
-	br *bufio.Reader
-	bw *bufio.Writer
 
 	respReader *RespReader
 	respWriter *RespWriter
@@ -34,23 +33,33 @@ func Connect(addr string) (*Conn, error) {
 }
 
 func ConnectWithSize(addr string, readSize int, writeSize int) (*Conn, error) {
-	c := new(Conn)
-
-	var err error
-	c.c, err = net.Dial(getProto(addr), addr)
+	conn, err := net.Dial(getProto(addr), addr)
 	if err != nil {
 		return nil, err
 	}
 
-	c.br = bufio.NewReaderSize(io.TeeReader(c.c, &c.totalReadSize), readSize)
-	c.bw = bufio.NewWriterSize(io.MultiWriter(c.c, &c.totalWriteSize), writeSize)
+	return NewConnWithSize(conn, readSize, writeSize)
+}
 
-	c.respReader = NewRespReader(c.br)
-	c.respWriter = NewRespWriter(c.bw)
+func NewConn(conn net.Conn) (*Conn, error) {
+	return NewConnWithSize(conn, 1024, 1024)
+}
+
+func NewConnWithSize(conn net.Conn, readSize int, writeSize int) (*Conn, error) {
+	c := new(Conn)
+	c.c = conn
+
+	br := bufio.NewReaderSize(io.TeeReader(c.c, &c.totalReadSize), readSize)
+	bw := bufio.NewWriterSize(io.MultiWriter(c.c, &c.totalWriteSize), writeSize)
+
+	c.respReader = NewRespReader(br)
+	c.respWriter = NewRespWriter(bw)
 
 	atomic.StoreInt32(&c.closed, 0)
 
 	return c, nil
 }
 
 func (c *Conn) Close() {
@@ -75,14 +84,23 @@ func (c *Conn) GetTotalWriteSize() int64 {
 	return int64(c.totalWriteSize)
 }
 
-func (c *Conn) SetReadDeadline(t time.Time) {
-	c.c.SetReadDeadline(t)
+func (c *Conn) SetReadDeadline(t time.Time) error {
+	return c.c.SetReadDeadline(t)
 }
 
-func (c *Conn) SetWriteDeadline(t time.Time) {
-	c.c.SetWriteDeadline(t)
+func (c *Conn) SetWriteDeadline(t time.Time) error {
+	return c.c.SetWriteDeadline(t)
 }
 
+func (c *Conn) RemoteAddr() net.Addr {
+	return c.c.RemoteAddr()
+}
+
+func (c *Conn) LocalAddr() net.Addr {
+	return c.c.LocalAddr()
+}
+
+// Send a RESP command and receive the reply
 func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) {
 	if err := c.Send(cmd, args...); err != nil {
 		return nil, err
@@ -91,6 +109,7 @@ func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) {
 	return c.Receive()
 }
 
+// Send a RESP command
 func (c *Conn) Send(cmd string, args ...interface{}) error {
 	if err := c.respWriter.WriteCommand(cmd, args...); err != nil {
 		c.Close()
@@ -100,6 +119,7 @@ func (c *Conn) Send(cmd string, args ...interface{}) error {
 	return nil
 }
 
+// Receive a RESP reply
 func (c *Conn) Receive() (interface{}, error) {
 	if reply, err := c.respReader.Parse(); err != nil {
 		c.Close()
@@ -113,6 +133,7 @@ func (c *Conn) Receive() (interface{}, error) {
 	}
 }
 
+// Receive a RESP bulk string reply into writer w
 func (c *Conn) ReceiveBulkTo(w io.Writer) error {
 	err := c.respReader.ParseBulkTo(w)
 	if err != nil {
@@ -123,6 +144,31 @@ func (c *Conn) ReceiveBulkTo(w io.Writer) error {
 	return err
 }
 
+// Receive a RESP command request; it must be an array of bulk strings
+func (c *Conn) ReceiveRequest() ([][]byte, error) {
+	return c.respReader.ParseRequest()
+}
+
+// Send a RESP value; it must be a string, int64, []byte, error, nil or []interface{}
+func (c *Conn) SendValue(v interface{}) error {
+	switch v := v.(type) {
+	case string:
+		return c.respWriter.FlushString(v)
+	case int64:
+		return c.respWriter.FlushInteger(v)
+	case []byte:
+		return c.respWriter.FlushBulk(v)
+	case []interface{}:
+		return c.respWriter.FlushArray(v)
+	case error:
+		return c.respWriter.FlushError(v)
+	case nil:
+		return c.respWriter.FlushBulk(nil)
+	default:
+		return fmt.Errorf("invalid type %T for send RESP value", v)
+	}
+}
+
 func (c *Client) newConn(addr string, pass string) (*Conn, error) {
 	co, err := ConnectWithSize(addr, c.readBufferSize, c.writeBufferSize)
 	if err != nil {
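The new `NewConn`/`ReceiveRequest`/`SendValue` trio makes this type usable on the server side of a RESP session, not only as a client. A hedged sketch of a one-command echo loop over an accepted socket; the listener setup and command handling are illustrative, and `net`, `strings`, `errors`, and `log` are assumed:

```go
ln, err := net.Listen("tcp", "127.0.0.1:6380")
if err != nil {
	log.Fatal(err)
}
nc, err := ln.Accept()
if err != nil {
	log.Fatal(err)
}
c, err := goredis.NewConn(nc) // wraps nc with the default 1024-byte buffers
if err != nil {
	log.Fatal(err)
}
for {
	req, err := c.ReceiveRequest() // e.g. [][]byte{[]byte("PING")}
	if err != nil {
		return
	}
	if len(req) > 0 && strings.EqualFold(string(req[0]), "PING") {
		c.SendValue("PONG") // a string goes out via FlushString
	} else {
		c.SendValue(errors.New("ERR unknown command"))
	}
}
```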


@@ -347,12 +347,14 @@ func recoverTable(s *session, o *opt.Options) error {
 		return err
 	}
 	iter := tr.NewIterator(nil, nil)
-	iter.(iterator.ErrorCallbackSetter).SetErrorCallback(func(err error) {
-		if errors.IsCorrupted(err) {
-			s.logf("table@recovery block corruption @%d %q", file.Num(), err)
-			tcorruptedBlock++
-		}
-	})
+	if itererr, ok := iter.(iterator.ErrorCallbackSetter); ok {
+		itererr.SetErrorCallback(func(err error) {
+			if errors.IsCorrupted(err) {
+				s.logf("table@recovery block corruption @%d %q", file.Num(), err)
+				tcorruptedBlock++
+			}
+		})
+	}
 
 	// Scan the table.
 	for iter.Next() {


@@ -8,6 +8,7 @@ package leveldb
 
 import (
 	"errors"
+	"math/rand"
 	"runtime"
 	"sync"
 	"sync/atomic"
@@ -80,6 +81,10 @@ func (db *DB) newIterator(seq uint64, slice *util.Range, ro *opt.ReadOptions) *d
 	return iter
 }
 
+func (db *DB) iterSamplingRate() int {
+	return rand.Intn(2 * db.s.o.GetIteratorSamplingRate())
+}
+
 type dir int
 
 const (
@@ -98,11 +103,21 @@ type dbIter struct {
 	seq    uint64
 	strict bool
 
-	dir      dir
-	key      []byte
-	value    []byte
-	err      error
-	releaser util.Releaser
+	smaplingGap int
+	dir         dir
+	key         []byte
+	value       []byte
+	err         error
+	releaser    util.Releaser
+}
+
+func (i *dbIter) sampleSeek() {
+	ikey := i.iter.Key()
+	i.smaplingGap -= len(ikey) + len(i.iter.Value())
+	for i.smaplingGap < 0 {
+		i.smaplingGap += i.db.iterSamplingRate()
+		i.db.sampleSeek(ikey)
+	}
 }
 
 func (i *dbIter) setErr(err error) {
@@ -175,6 +190,7 @@ func (i *dbIter) Seek(key []byte) bool {
 func (i *dbIter) next() bool {
 	for {
 		if ukey, seq, kt, kerr := parseIkey(i.iter.Key()); kerr == nil {
+			i.sampleSeek()
 			if seq <= i.seq {
 				switch kt {
 				case ktDel:
@@ -225,6 +241,7 @@ func (i *dbIter) prev() bool {
 	if i.iter.Valid() {
 		for {
 			if ukey, seq, kt, kerr := parseIkey(i.iter.Key()); kerr == nil {
+				i.sampleSeek()
 				if seq <= i.seq {
 					if !del && i.icmp.uCompare(ukey, i.key) < 0 {
 						return true
@@ -266,6 +283,7 @@ func (i *dbIter) Prev() bool {
 	case dirForward:
 		for i.iter.Prev() {
 			if ukey, _, _, kerr := parseIkey(i.iter.Key()); kerr == nil {
+				i.sampleSeek()
 				if i.icmp.uCompare(ukey, i.key) < 0 {
 					goto cont
 				}


@@ -48,6 +48,15 @@ func (db *DB) addSeq(delta uint64) {
 	atomic.AddUint64(&db.seq, delta)
 }
 
+func (db *DB) sampleSeek(ikey iKey) {
+	v := db.s.version()
+	if v.sampleSeek(ikey) {
+		// Trigger table compaction.
+		db.compSendTrigger(db.tcompCmdC)
+	}
+	v.release()
+}
+
 func (db *DB) mpoolPut(mem *memdb.DB) {
 	defer func() {
 		recover()


@ -405,19 +405,21 @@ func (h *dbHarness) compactRange(min, max string) {
t.Log("DB range compaction done") t.Log("DB range compaction done")
} }
func (h *dbHarness) sizeAssert(start, limit string, low, hi uint64) { func (h *dbHarness) sizeOf(start, limit string) uint64 {
t := h.t sz, err := h.db.SizeOf([]util.Range{
db := h.db
s, err := db.SizeOf([]util.Range{
{[]byte(start), []byte(limit)}, {[]byte(start), []byte(limit)},
}) })
if err != nil { if err != nil {
t.Error("SizeOf: got error: ", err) h.t.Error("SizeOf: got error: ", err)
} }
if s.Sum() < low || s.Sum() > hi { return sz.Sum()
t.Errorf("sizeof %q to %q not in range, want %d - %d, got %d", }
shorten(start), shorten(limit), low, hi, s.Sum())
func (h *dbHarness) sizeAssert(start, limit string, low, hi uint64) {
sz := h.sizeOf(start, limit)
if sz < low || sz > hi {
h.t.Errorf("sizeOf %q to %q not in range, want %d - %d, got %d",
shorten(start), shorten(limit), low, hi, sz)
} }
} }
@ -2577,3 +2579,87 @@ func TestDB_TableCompactionBuilder(t *testing.T) {
} }
v.release() v.release()
} }
func testDB_IterTriggeredCompaction(t *testing.T, limitDiv int) {
const (
vSize = 200 * opt.KiB
tSize = 100 * opt.MiB
mIter = 100
n = tSize / vSize
)
h := newDbHarnessWopt(t, &opt.Options{
Compression: opt.NoCompression,
DisableBlockCache: true,
})
defer h.close()
key := func(x int) string {
return fmt.Sprintf("v%06d", x)
}
// Fill.
value := strings.Repeat("x", vSize)
for i := 0; i < n; i++ {
h.put(key(i), value)
}
h.compactMem()
// Delete all.
for i := 0; i < n; i++ {
h.delete(key(i))
}
h.compactMem()
var (
limit = n / limitDiv
startKey = key(0)
limitKey = key(limit)
maxKey = key(n)
slice = &util.Range{Limit: []byte(limitKey)}
initialSize0 = h.sizeOf(startKey, limitKey)
initialSize1 = h.sizeOf(limitKey, maxKey)
)
t.Logf("inital size %s [rest %s]", shortenb(int(initialSize0)), shortenb(int(initialSize1)))
for r := 0; true; r++ {
if r >= mIter {
t.Fatal("taking too long to compact")
}
// Iterates.
iter := h.db.NewIterator(slice, h.ro)
for iter.Next() {
}
if err := iter.Error(); err != nil {
t.Fatalf("Iter err: %v", err)
}
iter.Release()
// Wait compaction.
h.waitCompaction()
// Check size.
size0 := h.sizeOf(startKey, limitKey)
size1 := h.sizeOf(limitKey, maxKey)
t.Logf("#%03d size %s [rest %s]", r, shortenb(int(size0)), shortenb(int(size1)))
if size0 < initialSize0/10 {
break
}
}
if initialSize1 > 0 {
h.sizeAssert(limitKey, maxKey, initialSize1/4-opt.MiB, initialSize1+opt.MiB)
}
}
func TestDB_IterTriggeredCompaction(t *testing.T) {
testDB_IterTriggeredCompaction(t, 1)
}
func TestDB_IterTriggeredCompactionHalf(t *testing.T) {
testDB_IterTriggeredCompaction(t, 2)
}

View File

@ -34,10 +34,11 @@ var (
DefaultCompactionTotalSize = 10 * MiB DefaultCompactionTotalSize = 10 * MiB
DefaultCompactionTotalSizeMultiplier = 10.0 DefaultCompactionTotalSizeMultiplier = 10.0
DefaultCompressionType = SnappyCompression DefaultCompressionType = SnappyCompression
DefaultOpenFilesCacher = LRUCacher DefaultIteratorSamplingRate = 1 * MiB
DefaultOpenFilesCacheCapacity = 500
DefaultMaxMemCompationLevel = 2 DefaultMaxMemCompationLevel = 2
DefaultNumLevel = 7 DefaultNumLevel = 7
DefaultOpenFilesCacher = LRUCacher
DefaultOpenFilesCacheCapacity = 500
DefaultWriteBuffer = 4 * MiB DefaultWriteBuffer = 4 * MiB
DefaultWriteL0PauseTrigger = 12 DefaultWriteL0PauseTrigger = 12
DefaultWriteL0SlowdownTrigger = 8 DefaultWriteL0SlowdownTrigger = 8
@ -153,7 +154,7 @@ type Options struct {
BlockCacher Cacher BlockCacher Cacher
// BlockCacheCapacity defines the capacity of the 'sorted table' block caching. // BlockCacheCapacity defines the capacity of the 'sorted table' block caching.
// Use -1 for zero, this has same effect with specifying NoCacher to BlockCacher. // Use -1 for zero, this has same effect as specifying NoCacher to BlockCacher.
// //
// The default value is 8MiB. // The default value is 8MiB.
BlockCacheCapacity int BlockCacheCapacity int
@ -288,6 +289,13 @@ type Options struct {
// The default value is nil. // The default value is nil.
Filter filter.Filter Filter filter.Filter
// IteratorSamplingRate defines the approximate gap (in bytes) between read
// sampling of an iterator. The samples will be used to determine when
// compaction should be triggered.
//
// The default is 1MiB.
IteratorSamplingRate int
// MaxMemCompationLevel defines maximum level a newly compacted 'memdb' // MaxMemCompationLevel defines maximum level a newly compacted 'memdb'
// will be pushed into if it doesn't create overlap. This should be less than // will be pushed into if it doesn't create overlap. This should be less than
// NumLevel. Use -1 for level-0. // NumLevel. Use -1 for level-0.
@ -308,7 +316,7 @@ type Options struct {
OpenFilesCacher Cacher OpenFilesCacher Cacher
// OpenFilesCacheCapacity defines the capacity of the open files caching. // OpenFilesCacheCapacity defines the capacity of the open files caching.
// Use -1 for zero, this has same effect with specifying NoCacher to OpenFilesCacher. // Use -1 for zero, this has same effect as specifying NoCacher to OpenFilesCacher.
// //
// The default value is 500. // The default value is 500.
OpenFilesCacheCapacity int OpenFilesCacheCapacity int
@ -355,9 +363,9 @@ func (o *Options) GetBlockCacher() Cacher {
} }
func (o *Options) GetBlockCacheCapacity() int { func (o *Options) GetBlockCacheCapacity() int {
if o == nil || o.BlockCacheCapacity <= 0 { if o == nil || o.BlockCacheCapacity == 0 {
return DefaultBlockCacheCapacity return DefaultBlockCacheCapacity
} else if o.BlockCacheCapacity == -1 { } else if o.BlockCacheCapacity < 0 {
return 0 return 0
} }
return o.BlockCacheCapacity return o.BlockCacheCapacity
@ -492,12 +500,19 @@ func (o *Options) GetFilter() filter.Filter {
return o.Filter return o.Filter
} }
func (o *Options) GetIteratorSamplingRate() int {
if o == nil || o.IteratorSamplingRate <= 0 {
return DefaultIteratorSamplingRate
}
return o.IteratorSamplingRate
}
func (o *Options) GetMaxMemCompationLevel() int { func (o *Options) GetMaxMemCompationLevel() int {
level := DefaultMaxMemCompationLevel level := DefaultMaxMemCompationLevel
if o != nil { if o != nil {
if o.MaxMemCompationLevel > 0 { if o.MaxMemCompationLevel > 0 {
level = o.MaxMemCompationLevel level = o.MaxMemCompationLevel
} else if o.MaxMemCompationLevel == -1 { } else if o.MaxMemCompationLevel < 0 {
level = 0 level = 0
} }
} }
@ -525,9 +540,9 @@ func (o *Options) GetOpenFilesCacher() Cacher {
} }
func (o *Options) GetOpenFilesCacheCapacity() int { func (o *Options) GetOpenFilesCacheCapacity() int {
if o == nil || o.OpenFilesCacheCapacity <= 0 { if o == nil || o.OpenFilesCacheCapacity == 0 {
return DefaultOpenFilesCacheCapacity return DefaultOpenFilesCacheCapacity
} else if o.OpenFilesCacheCapacity == -1 { } else if o.OpenFilesCacheCapacity < 0 {
return 0 return 0
} }
return o.OpenFilesCacheCapacity return o.OpenFilesCacheCapacity
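The two capacity hunks above loosen the sentinel handling: any negative value now disables the cache, while only an exact zero falls back to the default (previously -1 was the sole "off" switch). A small sketch of the resulting semantics, with illustrative values:

package main

import (
	"fmt"

	"github.com/syndtr/goleveldb/leveldb/opt"
)

func main() {
	off := &opt.Options{BlockCacheCapacity: -1} // any negative: cache disabled
	def := &opt.Options{}                       // zero: fall back to the default
	fmt.Println(off.GetBlockCacheCapacity())    // 0
	fmt.Println(def.GetBlockCacheCapacity())    // 8388608 (8MiB default)
}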

View File

@ -136,9 +136,8 @@ func (v *version) get(ikey iKey, ro *opt.ReadOptions, noValue bool) (value []byt
if !tseek { if !tseek {
if tset == nil { if tset == nil {
tset = &tSet{level, t} tset = &tSet{level, t}
} else if tset.table.consumeSeek() <= 0 { } else {
tseek = true tseek = true
tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
} }
} }
@ -203,6 +202,28 @@ func (v *version) get(ikey iKey, ro *opt.ReadOptions, noValue bool) (value []byt
return true return true
}) })
if tseek && tset.table.consumeSeek() <= 0 {
tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
}
return
}
func (v *version) sampleSeek(ikey iKey) (tcomp bool) {
var tset *tSet
v.walkOverlapping(ikey, func(level int, t *tFile) bool {
if tset == nil {
tset = &tSet{level, t}
return true
} else {
if tset.table.consumeSeek() <= 0 {
tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
}
return false
}
}, nil)
return return
} }
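sampleSeek charges at most one table per sampled read: the first overlapping table only records itself, and a second overlapping table drains the first one's seek budget; whoever drains it to zero wins a single CAS and schedules the compaction. A toy sketch of that budget-plus-CAS pattern, using stand-in types rather than goleveldb's real tFile and tSet:

package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

type table struct{ seekLeft int32 }

func (t *table) consumeSeek() int32 { return atomic.AddInt32(&t.seekLeft, -1) }

var cSeek unsafe.Pointer // *table pending compaction; nil when none

// noteSeek reports whether this call scheduled t for compaction.
func noteSeek(t *table) bool {
	if t.consumeSeek() <= 0 {
		return atomic.CompareAndSwapPointer(&cSeek, nil, unsafe.Pointer(t))
	}
	return false
}

func main() {
	t := &table{seekLeft: 2}
	fmt.Println(noteSeek(t), noteSeek(t), noteSeek(t)) // false true false
}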

View File

@ -7,10 +7,15 @@ package snappy
import ( import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"io"
) )
// ErrCorrupt reports that the input is invalid. var (
var ErrCorrupt = errors.New("snappy: corrupt input") // ErrCorrupt reports that the input is invalid.
ErrCorrupt = errors.New("snappy: corrupt input")
// ErrUnsupported reports that the input isn't supported.
ErrUnsupported = errors.New("snappy: unsupported input")
)
// DecodedLen returns the length of the decoded block. // DecodedLen returns the length of the decoded block.
func DecodedLen(src []byte) (int, error) { func DecodedLen(src []byte) (int, error) {
@ -122,3 +127,166 @@ func Decode(dst, src []byte) ([]byte, error) {
} }
return dst[:d], nil return dst[:d], nil
} }
// NewReader returns a new Reader that decompresses from r, using the framing
// format described at
// https://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
func NewReader(r io.Reader) *Reader {
return &Reader{
r: r,
decoded: make([]byte, maxUncompressedChunkLen),
buf: make([]byte, MaxEncodedLen(maxUncompressedChunkLen)+checksumSize),
}
}
// Reader is an io.Reader that can read Snappy-compressed bytes.
type Reader struct {
r io.Reader
err error
decoded []byte
buf []byte
// decoded[i:j] contains decoded bytes that have not yet been passed on.
i, j int
readHeader bool
}
// Reset discards any buffered data, resets all state, and switches the Snappy
// reader to read from r. This permits reusing a Reader rather than allocating
// a new one.
func (r *Reader) Reset(reader io.Reader) {
r.r = reader
r.err = nil
r.i = 0
r.j = 0
r.readHeader = false
}
func (r *Reader) readFull(p []byte) (ok bool) {
if _, r.err = io.ReadFull(r.r, p); r.err != nil {
if r.err == io.ErrUnexpectedEOF {
r.err = ErrCorrupt
}
return false
}
return true
}
// Read satisfies the io.Reader interface.
func (r *Reader) Read(p []byte) (int, error) {
if r.err != nil {
return 0, r.err
}
for {
if r.i < r.j {
n := copy(p, r.decoded[r.i:r.j])
r.i += n
return n, nil
}
if !r.readFull(r.buf[:4]) {
return 0, r.err
}
chunkType := r.buf[0]
if !r.readHeader {
if chunkType != chunkTypeStreamIdentifier {
r.err = ErrCorrupt
return 0, r.err
}
r.readHeader = true
}
chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
if chunkLen > len(r.buf) {
r.err = ErrUnsupported
return 0, r.err
}
// The chunk types are specified at
// https://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
switch chunkType {
case chunkTypeCompressedData:
// Section 4.2. Compressed data (chunk type 0x00).
if chunkLen < checksumSize {
r.err = ErrCorrupt
return 0, r.err
}
buf := r.buf[:chunkLen]
if !r.readFull(buf) {
return 0, r.err
}
checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
buf = buf[checksumSize:]
n, err := DecodedLen(buf)
if err != nil {
r.err = err
return 0, r.err
}
if n > len(r.decoded) {
r.err = ErrCorrupt
return 0, r.err
}
if _, err := Decode(r.decoded, buf); err != nil {
r.err = err
return 0, r.err
}
if crc(r.decoded[:n]) != checksum {
r.err = ErrCorrupt
return 0, r.err
}
r.i, r.j = 0, n
continue
case chunkTypeUncompressedData:
// Section 4.3. Uncompressed data (chunk type 0x01).
if chunkLen < checksumSize {
r.err = ErrCorrupt
return 0, r.err
}
buf := r.buf[:checksumSize]
if !r.readFull(buf) {
return 0, r.err
}
checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
// Read directly into r.decoded instead of via r.buf.
n := chunkLen - checksumSize
if !r.readFull(r.decoded[:n]) {
return 0, r.err
}
if crc(r.decoded[:n]) != checksum {
r.err = ErrCorrupt
return 0, r.err
}
r.i, r.j = 0, n
continue
case chunkTypeStreamIdentifier:
// Section 4.1. Stream identifier (chunk type 0xff).
if chunkLen != len(magicBody) {
r.err = ErrCorrupt
return 0, r.err
}
if !r.readFull(r.buf[:len(magicBody)]) {
return 0, r.err
}
for i := 0; i < len(magicBody); i++ {
if r.buf[i] != magicBody[i] {
r.err = ErrCorrupt
return 0, r.err
}
}
continue
}
if chunkType <= 0x7f {
// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
r.err = ErrUnsupported
return 0, r.err
} else {
// Section 4.4 Padding (chunk type 0xfe).
// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
if !r.readFull(r.buf[:chunkLen]) {
return 0, r.err
}
}
}
}

View File

@ -6,6 +6,7 @@ package snappy
import ( import (
"encoding/binary" "encoding/binary"
"io"
) )
// We limit how far copy back-references can go, the same as the C++ code. // We limit how far copy back-references can go, the same as the C++ code.
@ -172,3 +173,86 @@ func MaxEncodedLen(srcLen int) int {
// This last factor dominates the blowup, so the final estimate is: // This last factor dominates the blowup, so the final estimate is:
return 32 + srcLen + srcLen/6 return 32 + srcLen + srcLen/6
} }
// NewWriter returns a new Writer that compresses to w, using the framing
// format described at
// https://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
func NewWriter(w io.Writer) *Writer {
return &Writer{
w: w,
enc: make([]byte, MaxEncodedLen(maxUncompressedChunkLen)),
}
}
// Writer is an io.Writer that can write Snappy-compressed bytes.
type Writer struct {
w io.Writer
err error
enc []byte
buf [checksumSize + chunkHeaderSize]byte
wroteHeader bool
}
// Reset discards the writer's state and switches the Snappy writer to write to
// w. This permits reusing a Writer rather than allocating a new one.
func (w *Writer) Reset(writer io.Writer) {
w.w = writer
w.err = nil
w.wroteHeader = false
}
// Write satisfies the io.Writer interface.
func (w *Writer) Write(p []byte) (n int, errRet error) {
if w.err != nil {
return 0, w.err
}
if !w.wroteHeader {
copy(w.enc, magicChunk)
if _, err := w.w.Write(w.enc[:len(magicChunk)]); err != nil {
w.err = err
return n, err
}
w.wroteHeader = true
}
for len(p) > 0 {
var uncompressed []byte
if len(p) > maxUncompressedChunkLen {
uncompressed, p = p[:maxUncompressedChunkLen], p[maxUncompressedChunkLen:]
} else {
uncompressed, p = p, nil
}
checksum := crc(uncompressed)
// Compress the buffer, discarding the result if the improvement
// isn't at least 12.5%.
chunkType := uint8(chunkTypeCompressedData)
chunkBody, err := Encode(w.enc, uncompressed)
if err != nil {
w.err = err
return n, err
}
if len(chunkBody) >= len(uncompressed)-len(uncompressed)/8 {
chunkType, chunkBody = chunkTypeUncompressedData, uncompressed
}
chunkLen := 4 + len(chunkBody)
w.buf[0] = chunkType
w.buf[1] = uint8(chunkLen >> 0)
w.buf[2] = uint8(chunkLen >> 8)
w.buf[3] = uint8(chunkLen >> 16)
w.buf[4] = uint8(checksum >> 0)
w.buf[5] = uint8(checksum >> 8)
w.buf[6] = uint8(checksum >> 16)
w.buf[7] = uint8(checksum >> 24)
if _, err = w.w.Write(w.buf[:]); err != nil {
w.err = err
return n, err
}
if _, err = w.w.Write(chunkBody); err != nil {
w.err = err
return n, err
}
n += len(uncompressed)
}
return n, nil
}
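Together with the Reader added earlier, this gives the package a streaming API for the framing format. A minimal round-trip sketch, assuming the vendored import path github.com/syndtr/gosnappy/snappy from Godeps:

package main

import (
	"bytes"
	"fmt"
	"io/ioutil"

	"github.com/syndtr/gosnappy/snappy"
)

func main() {
	var buf bytes.Buffer
	// Write emits complete chunks synchronously, so no Flush/Close is needed.
	if _, err := snappy.NewWriter(&buf).Write([]byte("hello, snappy framing")); err != nil {
		panic(err)
	}
	out, err := ioutil.ReadAll(snappy.NewReader(&buf))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", out)
}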

View File

@ -8,6 +8,10 @@
// The C++ snappy implementation is at http://code.google.com/p/snappy/ // The C++ snappy implementation is at http://code.google.com/p/snappy/
package snappy package snappy
import (
"hash/crc32"
)
/* /*
Each encoded block begins with the varint-encoded length of the decoded data, Each encoded block begins with the varint-encoded length of the decoded data,
followed by a sequence of chunks. Chunks begin and end on byte boundaries. The followed by a sequence of chunks. Chunks begin and end on byte boundaries. The
@ -36,3 +40,29 @@ const (
tagCopy2 = 0x02 tagCopy2 = 0x02
tagCopy4 = 0x03 tagCopy4 = 0x03
) )
const (
checksumSize = 4
chunkHeaderSize = 4
magicChunk = "\xff\x06\x00\x00" + magicBody
magicBody = "sNaPpY"
// https://code.google.com/p/snappy/source/browse/trunk/framing_format.txt says
// that "the uncompressed data in a chunk must be no longer than 65536 bytes".
maxUncompressedChunkLen = 65536
)
const (
chunkTypeCompressedData = 0x00
chunkTypeUncompressedData = 0x01
chunkTypePadding = 0xfe
chunkTypeStreamIdentifier = 0xff
)
var crcTable = crc32.MakeTable(crc32.Castagnoli)
// crc implements the checksum specified in section 3 of
// https://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
func crc(b []byte) uint32 {
c := crc32.Update(0, crcTable, b)
return uint32(c>>15|c<<17) + 0xa282ead8
}
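The rotate-and-add in crc is the "masked" CRC-32C required by section 3 of the framing format; masking keeps checksums well distributed even when the input itself contains embedded CRCs. The same computation as a standalone sketch:

package main

import (
	"fmt"
	"hash/crc32"
)

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// maskedCRC rotates the CRC-32C right by 15 bits and adds a constant,
// exactly as the framing-format spec prescribes.
func maskedCRC(b []byte) uint32 {
	c := crc32.Update(0, castagnoli, b)
	return uint32(c>>15|c<<17) + 0xa282ead8
}

func main() {
	fmt.Printf("%08x\n", maskedCRC([]byte("sNaPpY")))
}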

View File

@ -18,7 +18,10 @@ import (
"testing" "testing"
) )
var download = flag.Bool("download", false, "If true, download any missing files before running benchmarks") var (
download = flag.Bool("download", false, "If true, download any missing files before running benchmarks")
testdata = flag.String("testdata", "testdata", "Directory containing the test data")
)
func roundtrip(b, ebuf, dbuf []byte) error { func roundtrip(b, ebuf, dbuf []byte) error {
e, err := Encode(ebuf, b) e, err := Encode(ebuf, b)
@ -55,11 +58,11 @@ func TestSmallCopy(t *testing.T) {
} }
func TestSmallRand(t *testing.T) { func TestSmallRand(t *testing.T) {
rand.Seed(27354294) rng := rand.New(rand.NewSource(27354294))
for n := 1; n < 20000; n += 23 { for n := 1; n < 20000; n += 23 {
b := make([]byte, n) b := make([]byte, n)
for i, _ := range b { for i := range b {
b[i] = uint8(rand.Uint32()) b[i] = uint8(rng.Uint32())
} }
if err := roundtrip(b, nil, nil); err != nil { if err := roundtrip(b, nil, nil); err != nil {
t.Fatal(err) t.Fatal(err)
@ -70,7 +73,7 @@ func TestSmallRand(t *testing.T) {
func TestSmallRegular(t *testing.T) { func TestSmallRegular(t *testing.T) {
for n := 1; n < 20000; n += 23 { for n := 1; n < 20000; n += 23 {
b := make([]byte, n) b := make([]byte, n)
for i, _ := range b { for i := range b {
b[i] = uint8(i%10 + 'a') b[i] = uint8(i%10 + 'a')
} }
if err := roundtrip(b, nil, nil); err != nil { if err := roundtrip(b, nil, nil); err != nil {
@ -79,6 +82,120 @@ func TestSmallRegular(t *testing.T) {
} }
} }
func cmp(a, b []byte) error {
if len(a) != len(b) {
return fmt.Errorf("got %d bytes, want %d", len(a), len(b))
}
for i := range a {
if a[i] != b[i] {
return fmt.Errorf("byte #%d: got 0x%02x, want 0x%02x", i, a[i], b[i])
}
}
return nil
}
func TestFramingFormat(t *testing.T) {
// src consists of alternating 1e5-sized sequences of random
// (incompressible) bytes and repeated (compressible) bytes. 1e5 was chosen
// because it is larger than maxUncompressedChunkLen (64k).
src := make([]byte, 1e6)
rng := rand.New(rand.NewSource(1))
for i := 0; i < 10; i++ {
if i%2 == 0 {
for j := 0; j < 1e5; j++ {
src[1e5*i+j] = uint8(rng.Intn(256))
}
} else {
for j := 0; j < 1e5; j++ {
src[1e5*i+j] = uint8(i)
}
}
}
buf := new(bytes.Buffer)
if _, err := NewWriter(buf).Write(src); err != nil {
t.Fatalf("Write: encoding: %v", err)
}
dst, err := ioutil.ReadAll(NewReader(buf))
if err != nil {
t.Fatalf("ReadAll: decoding: %v", err)
}
if err := cmp(dst, src); err != nil {
t.Fatal(err)
}
}
func TestReaderReset(t *testing.T) {
gold := bytes.Repeat([]byte("All that is gold does not glitter,\n"), 10000)
buf := new(bytes.Buffer)
if _, err := NewWriter(buf).Write(gold); err != nil {
t.Fatalf("Write: %v", err)
}
encoded, invalid, partial := buf.String(), "invalid", "partial"
r := NewReader(nil)
for i, s := range []string{encoded, invalid, partial, encoded, partial, invalid, encoded, encoded} {
if s == partial {
r.Reset(strings.NewReader(encoded))
if _, err := r.Read(make([]byte, 101)); err != nil {
t.Errorf("#%d: %v", i, err)
continue
}
continue
}
r.Reset(strings.NewReader(s))
got, err := ioutil.ReadAll(r)
switch s {
case encoded:
if err != nil {
t.Errorf("#%d: %v", i, err)
continue
}
if err := cmp(got, gold); err != nil {
t.Errorf("#%d: %v", i, err)
continue
}
case invalid:
if err == nil {
t.Errorf("#%d: got nil error, want non-nil", i)
continue
}
}
}
}
func TestWriterReset(t *testing.T) {
gold := bytes.Repeat([]byte("Not all those who wander are lost;\n"), 10000)
var gots, wants [][]byte
const n = 20
w, failed := NewWriter(nil), false
for i := 0; i <= n; i++ {
buf := new(bytes.Buffer)
w.Reset(buf)
want := gold[:len(gold)*i/n]
if _, err := w.Write(want); err != nil {
t.Errorf("#%d: Write: %v", i, err)
failed = true
continue
}
got, err := ioutil.ReadAll(NewReader(buf))
if err != nil {
t.Errorf("#%d: ReadAll: %v", i, err)
failed = true
continue
}
gots = append(gots, got)
wants = append(wants, want)
}
if failed {
return
}
for i := range gots {
if err := cmp(gots[i], wants[i]); err != nil {
t.Errorf("#%d: %v", i, err)
}
}
}
func benchDecode(b *testing.B, src []byte) { func benchDecode(b *testing.B, src []byte) {
encoded, err := Encode(nil, src) encoded, err := Encode(nil, src)
if err != nil { if err != nil {
@ -102,7 +219,7 @@ func benchEncode(b *testing.B, src []byte) {
} }
} }
func readFile(b *testing.B, filename string) []byte { func readFile(b testing.TB, filename string) []byte {
src, err := ioutil.ReadFile(filename) src, err := ioutil.ReadFile(filename)
if err != nil { if err != nil {
b.Fatalf("failed reading %s: %s", filename, err) b.Fatalf("failed reading %s: %s", filename, err)
@ -144,7 +261,7 @@ func BenchmarkWordsEncode1e5(b *testing.B) { benchWords(b, 1e5, false) }
func BenchmarkWordsEncode1e6(b *testing.B) { benchWords(b, 1e6, false) } func BenchmarkWordsEncode1e6(b *testing.B) { benchWords(b, 1e6, false) }
// testFiles' values are copied directly from // testFiles' values are copied directly from
// https://code.google.com/p/snappy/source/browse/trunk/snappy_unittest.cc. // https://raw.githubusercontent.com/google/snappy/master/snappy_unittest.cc
// The label field is unused in snappy-go. // The label field is unused in snappy-go.
var testFiles = []struct { var testFiles = []struct {
label string label string
@ -152,29 +269,36 @@ var testFiles = []struct {
}{ }{
{"html", "html"}, {"html", "html"},
{"urls", "urls.10K"}, {"urls", "urls.10K"},
{"jpg", "house.jpg"}, {"jpg", "fireworks.jpeg"},
{"pdf", "mapreduce-osdi-1.pdf"}, {"jpg_200", "fireworks.jpeg"},
{"pdf", "paper-100k.pdf"},
{"html4", "html_x_4"}, {"html4", "html_x_4"},
{"cp", "cp.html"},
{"c", "fields.c"},
{"lsp", "grammar.lsp"},
{"xls", "kennedy.xls"},
{"txt1", "alice29.txt"}, {"txt1", "alice29.txt"},
{"txt2", "asyoulik.txt"}, {"txt2", "asyoulik.txt"},
{"txt3", "lcet10.txt"}, {"txt3", "lcet10.txt"},
{"txt4", "plrabn12.txt"}, {"txt4", "plrabn12.txt"},
{"bin", "ptt5"},
{"sum", "sum"},
{"man", "xargs.1"},
{"pb", "geo.protodata"}, {"pb", "geo.protodata"},
{"gaviota", "kppkn.gtb"}, {"gaviota", "kppkn.gtb"},
} }
// The test data files are present at this canonical URL. // The test data files are present at this canonical URL.
const baseURL = "https://snappy.googlecode.com/svn/trunk/testdata/" const baseURL = "https://raw.githubusercontent.com/google/snappy/master/testdata/"
func downloadTestdata(basename string) (errRet error) { func downloadTestdata(basename string) (errRet error) {
filename := filepath.Join("testdata", basename) filename := filepath.Join(*testdata, basename)
if stat, err := os.Stat(filename); err == nil && stat.Size() != 0 {
return nil
}
if !*download {
return fmt.Errorf("test data not found; skipping benchmark without the -download flag")
}
// Download the official snappy C++ implementation reference test data
// files for benchmarking.
if err := os.Mkdir(*testdata, 0777); err != nil && !os.IsExist(err) {
return fmt.Errorf("failed to create testdata: %s", err)
}
f, err := os.Create(filename) f, err := os.Create(filename)
if err != nil { if err != nil {
return fmt.Errorf("failed to create %s: %s", filename, err) return fmt.Errorf("failed to create %s: %s", filename, err)
@ -185,36 +309,27 @@ func downloadTestdata(basename string) (errRet error) {
os.Remove(filename) os.Remove(filename)
} }
}() }()
resp, err := http.Get(baseURL + basename) url := baseURL + basename
resp, err := http.Get(url)
if err != nil { if err != nil {
return fmt.Errorf("failed to download %s: %s", baseURL+basename, err) return fmt.Errorf("failed to download %s: %s", url, err)
} }
defer resp.Body.Close() defer resp.Body.Close()
if s := resp.StatusCode; s != http.StatusOK {
return fmt.Errorf("downloading %s: HTTP status code %d (%s)", url, s, http.StatusText(s))
}
_, err = io.Copy(f, resp.Body) _, err = io.Copy(f, resp.Body)
if err != nil { if err != nil {
return fmt.Errorf("failed to write %s: %s", filename, err) return fmt.Errorf("failed to download %s to %s: %s", url, filename, err)
} }
return nil return nil
} }
func benchFile(b *testing.B, n int, decode bool) { func benchFile(b *testing.B, n int, decode bool) {
filename := filepath.Join("testdata", testFiles[n].filename) if err := downloadTestdata(testFiles[n].filename); err != nil {
if stat, err := os.Stat(filename); err != nil || stat.Size() == 0 { b.Fatalf("failed to download testdata: %s", err)
if !*download {
b.Fatal("test data not found; skipping benchmark without the -download flag")
}
// Download the official snappy C++ implementation reference test data
// files for benchmarking.
if err := os.Mkdir("testdata", 0777); err != nil && !os.IsExist(err) {
b.Fatalf("failed to create testdata: %s", err)
}
for _, tf := range testFiles {
if err := downloadTestdata(tf.filename); err != nil {
b.Fatalf("failed to download testdata: %s", err)
}
}
} }
data := readFile(b, filename) data := readFile(b, filepath.Join(*testdata, testFiles[n].filename))
if decode { if decode {
benchDecode(b, data) benchDecode(b, data)
} else { } else {
@ -235,12 +350,6 @@ func Benchmark_UFlat8(b *testing.B) { benchFile(b, 8, true) }
func Benchmark_UFlat9(b *testing.B) { benchFile(b, 9, true) } func Benchmark_UFlat9(b *testing.B) { benchFile(b, 9, true) }
func Benchmark_UFlat10(b *testing.B) { benchFile(b, 10, true) } func Benchmark_UFlat10(b *testing.B) { benchFile(b, 10, true) }
func Benchmark_UFlat11(b *testing.B) { benchFile(b, 11, true) } func Benchmark_UFlat11(b *testing.B) { benchFile(b, 11, true) }
func Benchmark_UFlat12(b *testing.B) { benchFile(b, 12, true) }
func Benchmark_UFlat13(b *testing.B) { benchFile(b, 13, true) }
func Benchmark_UFlat14(b *testing.B) { benchFile(b, 14, true) }
func Benchmark_UFlat15(b *testing.B) { benchFile(b, 15, true) }
func Benchmark_UFlat16(b *testing.B) { benchFile(b, 16, true) }
func Benchmark_UFlat17(b *testing.B) { benchFile(b, 17, true) }
func Benchmark_ZFlat0(b *testing.B) { benchFile(b, 0, false) } func Benchmark_ZFlat0(b *testing.B) { benchFile(b, 0, false) }
func Benchmark_ZFlat1(b *testing.B) { benchFile(b, 1, false) } func Benchmark_ZFlat1(b *testing.B) { benchFile(b, 1, false) }
func Benchmark_ZFlat2(b *testing.B) { benchFile(b, 2, false) } func Benchmark_ZFlat2(b *testing.B) { benchFile(b, 2, false) }
@ -253,9 +362,3 @@ func Benchmark_ZFlat8(b *testing.B) { benchFile(b, 8, false) }
func Benchmark_ZFlat9(b *testing.B) { benchFile(b, 9, false) } func Benchmark_ZFlat9(b *testing.B) { benchFile(b, 9, false) }
func Benchmark_ZFlat10(b *testing.B) { benchFile(b, 10, false) } func Benchmark_ZFlat10(b *testing.B) { benchFile(b, 10, false) }
func Benchmark_ZFlat11(b *testing.B) { benchFile(b, 11, false) } func Benchmark_ZFlat11(b *testing.B) { benchFile(b, 11, false) }
func Benchmark_ZFlat12(b *testing.B) { benchFile(b, 12, false) }
func Benchmark_ZFlat13(b *testing.B) { benchFile(b, 13, false) }
func Benchmark_ZFlat14(b *testing.B) { benchFile(b, 14, false) }
func Benchmark_ZFlat15(b *testing.B) { benchFile(b, 15, false) }
func Benchmark_ZFlat16(b *testing.B) { benchFile(b, 16, false) }
func Benchmark_ZFlat17(b *testing.B) { benchFile(b, 17, false) }

View File

@ -38,6 +38,7 @@ test_rpl:
$(GO) test --race -tags '$(GO_BUILD_TAGS)' ./rpl $(GO) test --race -tags '$(GO_BUILD_TAGS)' ./rpl
clean: clean:
rm -rf Godeps/_workspace/pkg/
$(GO) clean -i ./... $(GO) clean -i ./...
fmt: fmt:

View File

@ -113,5 +113,7 @@ func main() {
<-sc <-sc
println("ledis-server is closing")
app.Close() app.Close()
println("ledis-server is closed")
} }
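These two messages bracket the existing shutdown path: main blocks on the signal channel sc, then closes the app. A hypothetical sketch of the plumbing this relies on (the channel name and exact signal set are assumptions, not lifted from main.go):

package main

import (
	"os"
	"os/signal"
	"syscall"
)

func main() {
	// Subscribe before serving; SIGTERM must be in the set for the new
	// SHUTDOWN command (see the handler change below) to take effect.
	sc := make(chan os.Signal, 1)
	signal.Notify(sc, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)

	// ... start the server here ...

	<-sc
	println("ledis-server is closing")
	// app.Close() would run here
	println("ledis-server is closed")
}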

View File

@ -230,3 +230,10 @@ func (l *Ledis) checkTTL() {
func (l *Ledis) StoreStat() *store.Stat { func (l *Ledis) StoreStat() *store.Stat {
return l.ldb.Stat() return l.ldb.Stat()
} }
func (l *Ledis) CompactStore() error {
l.wLock.Lock()
defer l.wLock.Unlock()
return l.ldb.Compact()
}
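Because CompactStore holds wLock for the whole run, writers stall until the underlying store finishes compacting. A minimal sketch of calling it from embedding code; config.NewConfigDefault and the DataDir value are assumptions about the usual setup, not part of this diff:

package main

import (
	"github.com/siddontang/ledisdb/config"
	"github.com/siddontang/ledisdb/ledis"
)

func main() {
	cfg := config.NewConfigDefault()
	cfg.DataDir = "/tmp/ledis-compact"

	l, err := ledis.Open(cfg)
	if err != nil {
		panic(err)
	}
	defer l.Close()

	// Expect writes to block while this runs.
	if err := l.CompactStore(); err != nil {
		panic(err)
	}
}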

View File

@ -13,6 +13,7 @@ import (
"net" "net"
"runtime" "runtime"
"strconv" "strconv"
"syscall"
"time" "time"
) )
@ -165,6 +166,13 @@ func (c *respClient) handleRequest(reqData [][]byte) error {
c.resp.writeStatus(OK) c.resp.writeStatus(OK)
c.resp.flush() c.resp.flush()
c.conn.Close() c.conn.Close()
return errClientQuit
} else if c.cmd == "shutdown" {
c.conn.Close()
// deliver SIGTERM to ourselves so main's signal handler can run the normal shutdown path
syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
return errClientQuit return errClientQuit
} }
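Because the handler closes the connection before signalling, a client should not wait for a reply to SHUTDOWN. A hedged sketch that speaks raw RESP over TCP (127.0.0.1:6380 assumes ledisdb's default listen address):

package main

import (
	"fmt"
	"net"
)

func main() {
	c, err := net.Dial("tcp", "127.0.0.1:6380")
	if err != nil {
		panic(err)
	}
	defer c.Close()
	// *1 array holding one $8 bulk string: the bare SHUTDOWN command.
	fmt.Fprintf(c, "*1\r\n$8\r\nshutdown\r\n")
}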