diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 2773e88..b02ba59 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -1,6 +1,6 @@ { "ImportPath": "github.com/siddontang/ledisdb", - "GoVersion": "go1.4.2", + "GoVersion": "go1.3.3", "Packages": [ "./..." ], @@ -11,8 +11,8 @@ }, { "ImportPath": "github.com/boltdb/bolt", - "Comment": "v1.0-28-gb8dbe11", - "Rev": "b8dbe1101d74c30e817227a99716591e3e6e6bc4" + "Comment": "v1.0-62-gee95430", + "Rev": "ee954308d64186f0fc9b7022b6178977848c17a3" }, { "ImportPath": "github.com/cupcake/rdb", @@ -24,39 +24,39 @@ }, { "ImportPath": "github.com/siddontang/go/bson", - "Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0" + "Rev": "530a23162549a31baa14dfa3b647a9eccee8878f" }, { "ImportPath": "github.com/siddontang/go/filelock", - "Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0" + "Rev": "530a23162549a31baa14dfa3b647a9eccee8878f" }, { "ImportPath": "github.com/siddontang/go/hack", - "Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0" + "Rev": "530a23162549a31baa14dfa3b647a9eccee8878f" }, { "ImportPath": "github.com/siddontang/go/ioutil2", - "Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0" + "Rev": "530a23162549a31baa14dfa3b647a9eccee8878f" }, { "ImportPath": "github.com/siddontang/go/log", - "Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0" + "Rev": "530a23162549a31baa14dfa3b647a9eccee8878f" }, { "ImportPath": "github.com/siddontang/go/num", - "Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0" + "Rev": "530a23162549a31baa14dfa3b647a9eccee8878f" }, { "ImportPath": "github.com/siddontang/go/snappy", - "Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0" + "Rev": "530a23162549a31baa14dfa3b647a9eccee8878f" }, { "ImportPath": "github.com/siddontang/go/sync2", - "Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0" + "Rev": "530a23162549a31baa14dfa3b647a9eccee8878f" }, { "ImportPath": "github.com/siddontang/goredis", - "Rev": "802ef3bdd5f642335f9ed132e024e5e2fd3d03ce" + "Rev": "760763f78400635ed7b9b115511b8ed06035e908" }, { "ImportPath": "github.com/siddontang/rdb", @@ -64,11 +64,11 @@ }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb", - "Rev": "e9e2c8f6d3b9c313fb4acaac5ab06285bcf30b04" + "Rev": "4875955338b0a434238a31165cb87255ab6e9e4a" }, { "ImportPath": "github.com/syndtr/gosnappy/snappy", - "Rev": "ce8acff4829e0c2458a67ead32390ac0a381c862" + "Rev": "156a073208e131d7d2e212cb749feae7c339e846" }, { "ImportPath": "github.com/ugorji/go/codec", diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/README.md b/Godeps/_workspace/src/github.com/boltdb/bolt/README.md index 4b5c63f..d106e36 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/README.md +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/README.md @@ -16,7 +16,7 @@ and setting values. That's it. ## Project Status -Bolt is stable and the API is fixed. Full unit test coverage and randomized +Bolt is stable and the API is fixed. Full unit test coverage and randomized black box testing are used to ensure database consistency and thread safety. Bolt is currently in high-load production environments serving databases as large as 1TB. Many companies such as Shopify and Heroku use Bolt-backed @@ -120,11 +120,53 @@ err := db.View(func(tx *bolt.Tx) error { }) ``` -You also get a consistent view of the database within this closure, however, +You also get a consistent view of the database within this closure, however, no mutating operations are allowed within a read-only transaction. You can only retrieve buckets, retrieve values, and copy the database within a read-only transaction. + +#### Batch read-write transactions + +Each `DB.Update()` waits for disk to commit the writes. This overhead +can be minimized by combining multiple updates with the `DB.Batch()` +function: + +```go +err := db.Batch(func(tx *bolt.Tx) error { + ... + return nil +}) +``` + +Concurrent Batch calls are opportunistically combined into larger +transactions. Batch is only useful when there are multiple goroutines +calling it. + +The trade-off is that `Batch` can call the given +function multiple times, if parts of the transaction fail. The +function must be idempotent and side effects must take effect only +after a successful return from `DB.Batch()`. + +For example: don't display messages from inside the function, instead +set variables in the enclosing scope: + +```go +var id uint64 +err := db.Batch(func(tx *bolt.Tx) error { + // Find last key in bucket, decode as bigendian uint64, increment + // by one, encode back to []byte, and add new key. + ... + id = newValue + return nil +}) +if err != nil { + return ... +} +fmt.Println("Allocated ID %d", id) +``` + + #### Managing transactions manually The `DB.View()` and `DB.Update()` functions are wrappers around the `DB.Begin()` @@ -145,7 +187,7 @@ if err != nil { defer tx.Rollback() // Use the transaction... -_, err := tx.CreateBucket([]byte("MyBucket") +_, err := tx.CreateBucket([]byte("MyBucket")) if err != nil { return err } @@ -216,6 +258,10 @@ set to a key which is different than the key not existing. Use the `Bucket.Delete()` function to delete a key from the bucket. +Please note that values returned from `Get()` are only valid while the +transaction is open. If you need to use a value outside of the transaction +then you must use `copy()` to copy it to another byte slice. + ### Iterating over keys @@ -328,7 +374,7 @@ func (*Bucket) DeleteBucket(key []byte) error ### Database backups -Bolt is a single file so it's easy to backup. You can use the `Tx.Copy()` +Bolt is a single file so it's easy to backup. You can use the `Tx.WriteTo()` function to write a consistent view of the database to a writer. If you call this from a read-only transaction, it will perform a hot backup and not block your other database reads and writes. It will also use `O_DIRECT` when available @@ -339,11 +385,12 @@ do database backups: ```go func BackupHandleFunc(w http.ResponseWriter, req *http.Request) { - err := db.View(func(tx bolt.Tx) error { + err := db.View(func(tx *bolt.Tx) error { w.Header().Set("Content-Type", "application/octet-stream") w.Header().Set("Content-Disposition", `attachment; filename="my.db"`) w.Header().Set("Content-Length", strconv.Itoa(int(tx.Size()))) - return tx.Copy(w) + _, err := tx.WriteTo(w) + return err }) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -385,7 +432,7 @@ go func() { // Grab the current stats and diff them. stats := db.Stats() diff := stats.Sub(&prev) - + // Encode stats to JSON and print to STDERR. json.NewEncoder(os.Stderr).Encode(diff) @@ -404,7 +451,7 @@ or to provide an HTTP endpoint that will perform a fixed-length sample. For more information on getting started with Bolt, check out the following articles: * [Intro to BoltDB: Painless Performant Persistence](http://npf.io/2014/07/intro-to-boltdb-painless-performant-persistence/) by [Nate Finch](https://github.com/natefinch). - +* [Bolt -- an embedded key/value database for Go](https://www.progville.com/go/bolt-embedded-db-golang/) by Progville ## Comparison with other databases @@ -453,9 +500,9 @@ lock-free MVCC using a single writer and multiple readers. The two projects have somewhat diverged. LMDB heavily focuses on raw performance while Bolt has focused on simplicity and ease of use. For example, LMDB allows -several unsafe actions such as direct writes and append writes for the sake of -performance. Bolt opts to disallow actions which can leave the database in a -corrupted state. The only exception to this in Bolt is `DB.NoSync`. +several unsafe actions such as direct writes for the sake of performance. Bolt +opts to disallow actions which can leave the database in a corrupted state. The +only exception to this in Bolt is `DB.NoSync`. There are also a few differences in API. LMDB requires a maximum mmap size when opening an `mdb_env` whereas Bolt will handle incremental mmap resizing @@ -503,11 +550,23 @@ Here are a few things to note when evaluating and using Bolt: However, this is expected and the OS will release memory as needed. Bolt can handle databases much larger than the available physical RAM. +* Because of the way pages are laid out on disk, Bolt cannot truncate data files + and return free pages back to the disk. Instead, Bolt maintains a free list + of unused pages within its data file. These free pages can be reused by later + transactions. This works well for many use cases as databases generally tend + to grow. However, it's important to note that deleting large chunks of data + will not allow you to reclaim that space on disk. + + For more information on page allocation, [see this comment][page-allocation]. + +[page-allocation]: https://github.com/boltdb/bolt/issues/308#issuecomment-74811638 + ## Other Projects Using Bolt Below is a list of public, open source projects that use Bolt: +* [Operation Go: A Routine Mission](http://gocode.io) - An online programming game for Golang using Bolt for user accounts and a leaderboard. * [Bazil](https://github.com/bazillion/bazil) - A file system that lets your data reside where it is most convenient for it to reside. * [DVID](https://github.com/janelia-flyem/dvid) - Added Bolt as optional storage engine and testing it against Basho-tuned leveldb. * [Skybox Analytics](https://github.com/skybox/skybox) - A standalone funnel analysis tool for web analytics. @@ -526,6 +585,6 @@ Below is a list of public, open source projects that use Bolt: * [bleve](http://www.blevesearch.com/) - A pure Go search engine similar to ElasticSearch that uses Bolt as the default storage backend. * [tentacool](https://github.com/optiflows/tentacool) - REST api server to manage system stuff (IP, DNS, Gateway...) on a linux server. * [SkyDB](https://github.com/skydb/sky) - Behavioral analytics database. +* [Seaweed File System](https://github.com/chrislusf/weed-fs) - Highly scalable distributed key~file system with O(1) disk read. If you are using Bolt in a project please send a pull request to add it to the list. - diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/batch.go b/Godeps/_workspace/src/github.com/boltdb/bolt/batch.go new file mode 100644 index 0000000..bef1f4a --- /dev/null +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/batch.go @@ -0,0 +1,135 @@ +package bolt + +import ( + "errors" + "fmt" + "sync" + "time" +) + +// Batch calls fn as part of a batch. It behaves similar to Update, +// except: +// +// 1. concurrent Batch calls can be combined into a single Bolt +// transaction. +// +// 2. the function passed to Batch may be called multiple times, +// regardless of whether it returns error or not. +// +// This means that Batch function side effects must be idempotent and +// take permanent effect only after a successful return is seen in +// caller. +// +// Batch is only useful when there are multiple goroutines calling it. +func (db *DB) Batch(fn func(*Tx) error) error { + errCh := make(chan error, 1) + + db.batchMu.Lock() + if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) { + // There is no existing batch, or the existing batch is full; start a new one. + db.batch = &batch{ + db: db, + } + db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger) + } + db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh}) + if len(db.batch.calls) >= db.MaxBatchSize { + // wake up batch, it's ready to run + go db.batch.trigger() + } + db.batchMu.Unlock() + + err := <-errCh + if err == trySolo { + err = db.Update(fn) + } + return err +} + +type call struct { + fn func(*Tx) error + err chan<- error +} + +type batch struct { + db *DB + timer *time.Timer + start sync.Once + calls []call +} + +// trigger runs the batch if it hasn't already been run. +func (b *batch) trigger() { + b.start.Do(b.run) +} + +// run performs the transactions in the batch and communicates results +// back to DB.Batch. +func (b *batch) run() { + b.db.batchMu.Lock() + b.timer.Stop() + // Make sure no new work is added to this batch, but don't break + // other batches. + if b.db.batch == b { + b.db.batch = nil + } + b.db.batchMu.Unlock() + +retry: + for len(b.calls) > 0 { + var failIdx = -1 + err := b.db.Update(func(tx *Tx) error { + for i, c := range b.calls { + if err := safelyCall(c.fn, tx); err != nil { + failIdx = i + return err + } + } + return nil + }) + + if failIdx >= 0 { + // take the failing transaction out of the batch. it's + // safe to shorten b.calls here because db.batch no longer + // points to us, and we hold the mutex anyway. + c := b.calls[failIdx] + b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1] + // tell the submitter re-run it solo, continue with the rest of the batch + c.err <- trySolo + continue retry + } + + // pass success, or bolt internal errors, to all callers + for _, c := range b.calls { + if c.err != nil { + c.err <- err + } + } + break retry + } +} + +// trySolo is a special sentinel error value used for signaling that a +// transaction function should be re-run. It should never be seen by +// callers. +var trySolo = errors.New("batch function returned an error and should be re-run solo") + +type panicked struct { + reason interface{} +} + +func (p panicked) Error() string { + if err, ok := p.reason.(error); ok { + return err.Error() + } + return fmt.Sprintf("panic: %v", p.reason) +} + +func safelyCall(fn func(*Tx) error, tx *Tx) (err error) { + defer func() { + if p := recover(); p != nil { + err = panicked{p} + } + }() + return fn(tx) +} diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/batch_benchmark_test.go b/Godeps/_workspace/src/github.com/boltdb/bolt/batch_benchmark_test.go new file mode 100644 index 0000000..b745a37 --- /dev/null +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/batch_benchmark_test.go @@ -0,0 +1,170 @@ +package bolt_test + +import ( + "bytes" + "encoding/binary" + "errors" + "hash/fnv" + "sync" + "testing" + + "github.com/boltdb/bolt" +) + +func validateBatchBench(b *testing.B, db *TestDB) { + var rollback = errors.New("sentinel error to cause rollback") + validate := func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte("bench")) + h := fnv.New32a() + buf := make([]byte, 4) + for id := uint32(0); id < 1000; id++ { + binary.LittleEndian.PutUint32(buf, id) + h.Reset() + h.Write(buf[:]) + k := h.Sum(nil) + v := bucket.Get(k) + if v == nil { + b.Errorf("not found id=%d key=%x", id, k) + continue + } + if g, e := v, []byte("filler"); !bytes.Equal(g, e) { + b.Errorf("bad value for id=%d key=%x: %s != %q", id, k, g, e) + } + if err := bucket.Delete(k); err != nil { + return err + } + } + // should be empty now + c := bucket.Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + b.Errorf("unexpected key: %x = %q", k, v) + } + return rollback + } + if err := db.Update(validate); err != nil && err != rollback { + b.Error(err) + } +} + +func BenchmarkDBBatchAutomatic(b *testing.B) { + db := NewTestDB() + defer db.Close() + db.MustCreateBucket([]byte("bench")) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + start := make(chan struct{}) + var wg sync.WaitGroup + + for round := 0; round < 1000; round++ { + wg.Add(1) + + go func(id uint32) { + defer wg.Done() + <-start + + h := fnv.New32a() + buf := make([]byte, 4) + binary.LittleEndian.PutUint32(buf, id) + h.Write(buf[:]) + k := h.Sum(nil) + insert := func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("bench")) + return b.Put(k, []byte("filler")) + } + if err := db.Batch(insert); err != nil { + b.Error(err) + return + } + }(uint32(round)) + } + close(start) + wg.Wait() + } + + b.StopTimer() + validateBatchBench(b, db) +} + +func BenchmarkDBBatchSingle(b *testing.B) { + db := NewTestDB() + defer db.Close() + db.MustCreateBucket([]byte("bench")) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + start := make(chan struct{}) + var wg sync.WaitGroup + + for round := 0; round < 1000; round++ { + wg.Add(1) + go func(id uint32) { + defer wg.Done() + <-start + + h := fnv.New32a() + buf := make([]byte, 4) + binary.LittleEndian.PutUint32(buf, id) + h.Write(buf[:]) + k := h.Sum(nil) + insert := func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("bench")) + return b.Put(k, []byte("filler")) + } + if err := db.Update(insert); err != nil { + b.Error(err) + return + } + }(uint32(round)) + } + close(start) + wg.Wait() + } + + b.StopTimer() + validateBatchBench(b, db) +} + +func BenchmarkDBBatchManual10x100(b *testing.B) { + db := NewTestDB() + defer db.Close() + db.MustCreateBucket([]byte("bench")) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + start := make(chan struct{}) + var wg sync.WaitGroup + + for major := 0; major < 10; major++ { + wg.Add(1) + go func(id uint32) { + defer wg.Done() + <-start + + insert100 := func(tx *bolt.Tx) error { + h := fnv.New32a() + buf := make([]byte, 4) + for minor := uint32(0); minor < 100; minor++ { + binary.LittleEndian.PutUint32(buf, uint32(id*100+minor)) + h.Reset() + h.Write(buf[:]) + k := h.Sum(nil) + b := tx.Bucket([]byte("bench")) + if err := b.Put(k, []byte("filler")); err != nil { + return err + } + } + return nil + } + if err := db.Update(insert100); err != nil { + b.Fatal(err) + } + }(uint32(major)) + } + close(start) + wg.Wait() + } + + b.StopTimer() + validateBatchBench(b, db) +} diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/batch_example_test.go b/Godeps/_workspace/src/github.com/boltdb/bolt/batch_example_test.go new file mode 100644 index 0000000..74eff8a --- /dev/null +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/batch_example_test.go @@ -0,0 +1,148 @@ +package bolt_test + +import ( + "encoding/binary" + "fmt" + "io/ioutil" + "log" + "math/rand" + "net/http" + "net/http/httptest" + "os" + + "github.com/boltdb/bolt" +) + +// Set this to see how the counts are actually updated. +const verbose = false + +// Counter updates a counter in Bolt for every URL path requested. +type counter struct { + db *bolt.DB +} + +func (c counter) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + // Communicates the new count from a successful database + // transaction. + var result uint64 + + increment := func(tx *bolt.Tx) error { + b, err := tx.CreateBucketIfNotExists([]byte("hits")) + if err != nil { + return err + } + key := []byte(req.URL.String()) + // Decode handles key not found for us. + count := decode(b.Get(key)) + 1 + b.Put(key, encode(count)) + // All good, communicate new count. + result = count + return nil + } + if err := c.db.Batch(increment); err != nil { + http.Error(rw, err.Error(), 500) + return + } + + if verbose { + log.Printf("server: %s: %d", req.URL.String(), result) + } + + rw.Header().Set("Content-Type", "application/octet-stream") + fmt.Fprintf(rw, "%d\n", result) +} + +func client(id int, base string, paths []string) error { + // Process paths in random order. + rng := rand.New(rand.NewSource(int64(id))) + permutation := rng.Perm(len(paths)) + + for i := range paths { + path := paths[permutation[i]] + resp, err := http.Get(base + path) + if err != nil { + return err + } + defer resp.Body.Close() + buf, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + if verbose { + log.Printf("client: %s: %s", path, buf) + } + } + return nil +} + +func ExampleDB_Batch() { + // Open the database. + db, _ := bolt.Open(tempfile(), 0666, nil) + defer os.Remove(db.Path()) + defer db.Close() + + // Start our web server + count := counter{db} + srv := httptest.NewServer(count) + defer srv.Close() + + // Decrease the batch size to make things more interesting. + db.MaxBatchSize = 3 + + // Get every path multiple times concurrently. + const clients = 10 + paths := []string{ + "/foo", + "/bar", + "/baz", + "/quux", + "/thud", + "/xyzzy", + } + errors := make(chan error, clients) + for i := 0; i < clients; i++ { + go func(id int) { + errors <- client(id, srv.URL, paths) + }(i) + } + // Check all responses to make sure there's no error. + for i := 0; i < clients; i++ { + if err := <-errors; err != nil { + fmt.Printf("client error: %v", err) + return + } + } + + // Check the final result + db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("hits")) + c := b.Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + fmt.Printf("hits to %s: %d\n", k, decode(v)) + } + return nil + }) + + // Output: + // hits to /bar: 10 + // hits to /baz: 10 + // hits to /foo: 10 + // hits to /quux: 10 + // hits to /thud: 10 + // hits to /xyzzy: 10 +} + +// encode marshals a counter. +func encode(n uint64) []byte { + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, n) + return buf +} + +// decode unmarshals a counter. Nil buffers are decoded as 0. +func decode(buf []byte) uint64 { + if buf == nil { + return 0 + } + return binary.BigEndian.Uint64(buf) +} diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/batch_test.go b/Godeps/_workspace/src/github.com/boltdb/bolt/batch_test.go new file mode 100644 index 0000000..0b5075f --- /dev/null +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/batch_test.go @@ -0,0 +1,167 @@ +package bolt_test + +import ( + "testing" + "time" + + "github.com/boltdb/bolt" +) + +// Ensure two functions can perform updates in a single batch. +func TestDB_Batch(t *testing.T) { + db := NewTestDB() + defer db.Close() + db.MustCreateBucket([]byte("widgets")) + + // Iterate over multiple updates in separate goroutines. + n := 2 + ch := make(chan error) + for i := 0; i < n; i++ { + go func(i int) { + ch <- db.Batch(func(tx *bolt.Tx) error { + return tx.Bucket([]byte("widgets")).Put(u64tob(uint64(i)), []byte{}) + }) + }(i) + } + + // Check all responses to make sure there's no error. + for i := 0; i < n; i++ { + if err := <-ch; err != nil { + t.Fatal(err) + } + } + + // Ensure data is correct. + db.MustView(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("widgets")) + for i := 0; i < n; i++ { + if v := b.Get(u64tob(uint64(i))); v == nil { + t.Errorf("key not found: %d", i) + } + } + return nil + }) +} + +func TestDB_Batch_Panic(t *testing.T) { + db := NewTestDB() + defer db.Close() + + var sentinel int + var bork = &sentinel + var problem interface{} + var err error + + // Execute a function inside a batch that panics. + func() { + defer func() { + if p := recover(); p != nil { + problem = p + } + }() + err = db.Batch(func(tx *bolt.Tx) error { + panic(bork) + }) + }() + + // Verify there is no error. + if g, e := err, error(nil); g != e { + t.Fatalf("wrong error: %v != %v", g, e) + } + // Verify the panic was captured. + if g, e := problem, bork; g != e { + t.Fatalf("wrong error: %v != %v", g, e) + } +} + +func TestDB_BatchFull(t *testing.T) { + db := NewTestDB() + defer db.Close() + db.MustCreateBucket([]byte("widgets")) + + const size = 3 + // buffered so we never leak goroutines + ch := make(chan error, size) + put := func(i int) { + ch <- db.Batch(func(tx *bolt.Tx) error { + return tx.Bucket([]byte("widgets")).Put(u64tob(uint64(i)), []byte{}) + }) + } + + db.MaxBatchSize = size + // high enough to never trigger here + db.MaxBatchDelay = 1 * time.Hour + + go put(1) + go put(2) + + // Give the batch a chance to exhibit bugs. + time.Sleep(10 * time.Millisecond) + + // not triggered yet + select { + case <-ch: + t.Fatalf("batch triggered too early") + default: + } + + go put(3) + + // Check all responses to make sure there's no error. + for i := 0; i < size; i++ { + if err := <-ch; err != nil { + t.Fatal(err) + } + } + + // Ensure data is correct. + db.MustView(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("widgets")) + for i := 1; i <= size; i++ { + if v := b.Get(u64tob(uint64(i))); v == nil { + t.Errorf("key not found: %d", i) + } + } + return nil + }) +} + +func TestDB_BatchTime(t *testing.T) { + db := NewTestDB() + defer db.Close() + db.MustCreateBucket([]byte("widgets")) + + const size = 1 + // buffered so we never leak goroutines + ch := make(chan error, size) + put := func(i int) { + ch <- db.Batch(func(tx *bolt.Tx) error { + return tx.Bucket([]byte("widgets")).Put(u64tob(uint64(i)), []byte{}) + }) + } + + db.MaxBatchSize = 1000 + db.MaxBatchDelay = 0 + + go put(1) + + // Batch must trigger by time alone. + + // Check all responses to make sure there's no error. + for i := 0; i < size; i++ { + if err := <-ch; err != nil { + t.Fatal(err) + } + } + + // Ensure data is correct. + db.MustView(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("widgets")) + for i := 1; i <= size; i++ { + if v := b.Get(u64tob(uint64(i))); v == nil { + t.Errorf("key not found: %d", i) + } + } + return nil + }) +} diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/bolt_386.go b/Godeps/_workspace/src/github.com/boltdb/bolt/bolt_386.go index cc21894..e659bfb 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/bolt_386.go +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/bolt_386.go @@ -2,3 +2,6 @@ package bolt // maxMapSize represents the largest mmap size supported by Bolt. const maxMapSize = 0x7FFFFFFF // 2GB + +// maxAllocSize is the size used when creating array pointers. +const maxAllocSize = 0xFFFFFFF diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/bolt_amd64.go b/Godeps/_workspace/src/github.com/boltdb/bolt/bolt_amd64.go index 4262932..cca6b7e 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/bolt_amd64.go +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/bolt_amd64.go @@ -2,3 +2,6 @@ package bolt // maxMapSize represents the largest mmap size supported by Bolt. const maxMapSize = 0xFFFFFFFFFFFF // 256TB + +// maxAllocSize is the size used when creating array pointers. +const maxAllocSize = 0x7FFFFFFF diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/bolt_arm.go b/Godeps/_workspace/src/github.com/boltdb/bolt/bolt_arm.go index cc21894..e659bfb 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/bolt_arm.go +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/bolt_arm.go @@ -2,3 +2,6 @@ package bolt // maxMapSize represents the largest mmap size supported by Bolt. const maxMapSize = 0x7FFFFFFF // 2GB + +// maxAllocSize is the size used when creating array pointers. +const maxAllocSize = 0xFFFFFFF diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/bucket.go b/Godeps/_workspace/src/github.com/boltdb/bolt/bucket.go index 470689b..6766992 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/bucket.go +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/bucket.go @@ -252,6 +252,7 @@ func (b *Bucket) DeleteBucket(key []byte) error { // Get retrieves the value for a key in the bucket. // Returns a nil value if the key does not exist or if the key is a nested bucket. +// The returned value is only valid for the life of the transaction. func (b *Bucket) Get(key []byte) []byte { k, v, flags := b.Cursor().seek(key) diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/cmd/bolt/info.go b/Godeps/_workspace/src/github.com/boltdb/bolt/cmd/bolt/info.go index 5606e68..cb01e38 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/cmd/bolt/info.go +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/cmd/bolt/info.go @@ -22,5 +22,5 @@ func Info(path string) { // Print basic database info. var info = db.Info() - printf("Page Size: %d", info.PageSize) + printf("Page Size: %d\n", info.PageSize) } diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/cursor.go b/Godeps/_workspace/src/github.com/boltdb/bolt/cursor.go index 0d8ed16..006c548 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/cursor.go +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/cursor.go @@ -10,6 +10,8 @@ import ( // Cursors see nested buckets with value == nil. // Cursors can be obtained from a transaction and are valid as long as the transaction is open. // +// Keys and values returned from the cursor are only valid for the life of the transaction. +// // Changing data while traversing with a cursor may cause it to be invalidated // and return unexpected keys and/or values. You must reposition your cursor // after mutating data. @@ -25,6 +27,7 @@ func (c *Cursor) Bucket() *Bucket { // First moves the cursor to the first item in the bucket and returns its key and value. // If the bucket is empty then a nil key and value are returned. +// The returned key and value are only valid for the life of the transaction. func (c *Cursor) First() (key []byte, value []byte) { _assert(c.bucket.tx.db != nil, "tx closed") c.stack = c.stack[:0] @@ -41,6 +44,7 @@ func (c *Cursor) First() (key []byte, value []byte) { // Last moves the cursor to the last item in the bucket and returns its key and value. // If the bucket is empty then a nil key and value are returned. +// The returned key and value are only valid for the life of the transaction. func (c *Cursor) Last() (key []byte, value []byte) { _assert(c.bucket.tx.db != nil, "tx closed") c.stack = c.stack[:0] @@ -58,6 +62,7 @@ func (c *Cursor) Last() (key []byte, value []byte) { // Next moves the cursor to the next item in the bucket and returns its key and value. // If the cursor is at the end of the bucket then a nil key and value are returned. +// The returned key and value are only valid for the life of the transaction. func (c *Cursor) Next() (key []byte, value []byte) { _assert(c.bucket.tx.db != nil, "tx closed") k, v, flags := c.next() @@ -69,6 +74,7 @@ func (c *Cursor) Next() (key []byte, value []byte) { // Prev moves the cursor to the previous item in the bucket and returns its key and value. // If the cursor is at the beginning of the bucket then a nil key and value are returned. +// The returned key and value are only valid for the life of the transaction. func (c *Cursor) Prev() (key []byte, value []byte) { _assert(c.bucket.tx.db != nil, "tx closed") @@ -100,6 +106,7 @@ func (c *Cursor) Prev() (key []byte, value []byte) { // Seek moves the cursor to a given key and returns it. // If the key does not exist then the next key is used. If no keys // follow, a nil key is returned. +// The returned key and value are only valid for the life of the transaction. func (c *Cursor) Seek(seek []byte) (key []byte, value []byte) { k, v, flags := c.seek(seek) diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/db.go b/Godeps/_workspace/src/github.com/boltdb/bolt/db.go index 773620d..8f0e90b 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/db.go +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/db.go @@ -27,6 +27,12 @@ const magic uint32 = 0xED0CDAED // must be synchronzied using the msync(2) syscall. const IgnoreNoSync = runtime.GOOS == "openbsd" +// Default values if not set in a DB instance. +const ( + DefaultMaxBatchSize int = 1000 + DefaultMaxBatchDelay = 10 * time.Millisecond +) + // DB represents a collection of buckets persisted to a file on disk. // All data access is performed through transactions which can be obtained through the DB. // All the functions on DB will return a ErrDatabaseNotOpen if accessed before Open() is called. @@ -49,9 +55,25 @@ type DB struct { // THIS IS UNSAFE. PLEASE USE WITH CAUTION. NoSync bool + // MaxBatchSize is the maximum size of a batch. Default value is + // copied from DefaultMaxBatchSize in Open. + // + // If <=0, disables batching. + // + // Do not change concurrently with calls to Batch. + MaxBatchSize int + + // MaxBatchDelay is the maximum delay before a batch starts. + // Default value is copied from DefaultMaxBatchDelay in Open. + // + // If <=0, effectively disables batching. + // + // Do not change concurrently with calls to Batch. + MaxBatchDelay time.Duration + path string file *os.File - dataref []byte + dataref []byte // mmap'ed readonly, write throws SEGV data *[maxMapSize]byte datasz int meta0 *meta @@ -63,6 +85,9 @@ type DB struct { freelist *freelist stats Stats + batchMu sync.Mutex + batch *batch + rwlock sync.Mutex // Allows only one writer at a time. metalock sync.Mutex // Protects meta page access. mmaplock sync.RWMutex // Protects mmap access during remapping. @@ -99,6 +124,10 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) { options = DefaultOptions } + // Set default values for later DB operations. + db.MaxBatchSize = DefaultMaxBatchSize + db.MaxBatchDelay = DefaultMaxBatchDelay + // Open data file and separate sync handler for metadata writes. db.path = path @@ -215,7 +244,7 @@ func (db *DB) munmap() error { } // mmapSize determines the appropriate size for the mmap given the current size -// of the database. The minimum size is 4MB and doubles until it reaches 1GB. +// of the database. The minimum size is 1MB and doubles until it reaches 1GB. // Returns an error if the new mmap size is greater than the max allowed. func (db *DB) mmapSize(size int) (int, error) { // Double the size from 1MB until 1GB. @@ -231,9 +260,9 @@ func (db *DB) mmapSize(size int) (int, error) { } // If larger than 1GB then grow by 1GB at a time. - sz := int64(size) + int64(maxMmapStep) + sz := int64(size) if remainder := sz % int64(maxMmapStep); remainder > 0 { - sz -= remainder + sz += int64(maxMmapStep) - remainder } // Ensure that the mmap size is a multiple of the page size. diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/db_test.go b/Godeps/_workspace/src/github.com/boltdb/bolt/db_test.go index 4f05ad9..ad17e87 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/db_test.go +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/db_test.go @@ -1,6 +1,7 @@ package bolt_test import ( + "encoding/binary" "errors" "flag" "fmt" @@ -125,6 +126,56 @@ func TestOpen_Size(t *testing.T) { } } +// Ensure that opening a database beyond the max step size does not increase its size. +// https://github.com/boltdb/bolt/issues/303 +func TestOpen_Size_Large(t *testing.T) { + if testing.Short() { + t.Skip("short mode") + } + + // Open a data file. + db := NewTestDB() + path := db.Path() + defer db.Close() + + // Insert until we get above the minimum 4MB size. + var index uint64 + for i := 0; i < 10000; i++ { + ok(t, db.Update(func(tx *bolt.Tx) error { + b, _ := tx.CreateBucketIfNotExists([]byte("data")) + for j := 0; j < 1000; j++ { + ok(t, b.Put(u64tob(index), make([]byte, 50))) + index++ + } + return nil + })) + } + + // Close database and grab the size. + db.DB.Close() + sz := fileSize(path) + if sz == 0 { + t.Fatalf("unexpected new file size: %d", sz) + } else if sz < (1 << 30) { + t.Fatalf("expected larger initial size: %d", sz) + } + + // Reopen database, update, and check size again. + db0, err := bolt.Open(path, 0666, nil) + ok(t, err) + ok(t, db0.Update(func(tx *bolt.Tx) error { return tx.Bucket([]byte("data")).Put([]byte{0}, []byte{0}) })) + ok(t, db0.Close()) + newSz := fileSize(path) + if newSz == 0 { + t.Fatalf("unexpected new file size: %d", newSz) + } + + // Compare the original size with the new size. + if sz != newSz { + t.Fatalf("unexpected file growth: %d => %d", sz, newSz) + } +} + // Ensure that a re-opened database is consistent. func TestOpen_Check(t *testing.T) { path := tempfile() @@ -567,6 +618,34 @@ func NewTestDB() *TestDB { return &TestDB{db} } +// MustView executes a read-only function. Panic on error. +func (db *TestDB) MustView(fn func(tx *bolt.Tx) error) { + if err := db.DB.View(func(tx *bolt.Tx) error { + return fn(tx) + }); err != nil { + panic(err.Error()) + } +} + +// MustUpdate executes a read-write function. Panic on error. +func (db *TestDB) MustUpdate(fn func(tx *bolt.Tx) error) { + if err := db.DB.View(func(tx *bolt.Tx) error { + return fn(tx) + }); err != nil { + panic(err.Error()) + } +} + +// MustCreateBucket creates a new bucket. Panic on error. +func (db *TestDB) MustCreateBucket(name []byte) { + if err := db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucket([]byte(name)) + return err + }); err != nil { + panic(err.Error()) + } +} + // Close closes the database and deletes the underlying file. func (db *TestDB) Close() { // Log statistics. @@ -699,3 +778,13 @@ func fileSize(path string) int64 { func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) } func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) } + +// u64tob converts a uint64 into an 8-byte slice. +func u64tob(v uint64) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, v) + return b +} + +// btou64 converts an 8-byte slice into an uint64. +func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) } diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/page.go b/Godeps/_workspace/src/github.com/boltdb/bolt/page.go index b3dc473..58e43c4 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/page.go +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/page.go @@ -8,7 +8,6 @@ import ( const pageHeaderSize = int(unsafe.Offsetof(((*page)(nil)).ptr)) -const maxAllocSize = 0xFFFFFFF const minKeysPerPage = 2 const branchPageElementSize = int(unsafe.Sizeof(branchPageElement{})) diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/tx.go b/Godeps/_workspace/src/github.com/boltdb/bolt/tx.go index c041d73..fda6a21 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/tx.go +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/tx.go @@ -252,37 +252,42 @@ func (tx *Tx) close() { } // Copy writes the entire database to a writer. -// A reader transaction is maintained during the copy so it is safe to continue -// using the database while a copy is in progress. -// Copy will write exactly tx.Size() bytes into the writer. +// This function exists for backwards compatibility. Use WriteTo() in func (tx *Tx) Copy(w io.Writer) error { - var f *os.File - var err error + _, err := tx.WriteTo(w) + return err +} +// WriteTo writes the entire database to a writer. +// If err == nil then exactly tx.Size() bytes will be written into the writer. +func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) { // Attempt to open reader directly. + var f *os.File if f, err = os.OpenFile(tx.db.path, os.O_RDONLY|odirect, 0); err != nil { // Fallback to a regular open if that doesn't work. if f, err = os.OpenFile(tx.db.path, os.O_RDONLY, 0); err != nil { - return err + return 0, err } } // Copy the meta pages. tx.db.metalock.Lock() - _, err = io.CopyN(w, f, int64(tx.db.pageSize*2)) + n, err = io.CopyN(w, f, int64(tx.db.pageSize*2)) tx.db.metalock.Unlock() if err != nil { _ = f.Close() - return fmt.Errorf("meta copy: %s", err) + return n, fmt.Errorf("meta copy: %s", err) } // Copy data pages. - if _, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2)); err != nil { + wn, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2)) + n += wn + if err != nil { _ = f.Close() - return err + return n, err } - return f.Close() + return n, f.Close() } // CopyFile copies the entire database to file at the given path. diff --git a/Godeps/_workspace/src/github.com/siddontang/go/log/log.go b/Godeps/_workspace/src/github.com/siddontang/go/log/log.go index 2fd6fd0..74cd76a 100644 --- a/Godeps/_workspace/src/github.com/siddontang/go/log/log.go +++ b/Godeps/_workspace/src/github.com/siddontang/go/log/log.go @@ -6,6 +6,7 @@ import ( "runtime" "strconv" "sync" + "sync/atomic" "time" ) @@ -31,35 +32,45 @@ const TimeFormat = "2006/01/02 15:04:05" const maxBufPoolSize = 16 -type Logger struct { - sync.Mutex +type atomicInt32 int32 - level int +func (i *atomicInt32) Set(n int) { + atomic.StoreInt32((*int32)(i), int32(n)) +} + +func (i *atomicInt32) Get() int { + return int(atomic.LoadInt32((*int32)(i))) +} + +type Logger struct { + level atomicInt32 flag int + hMutex sync.Mutex handler Handler quit chan struct{} msg chan []byte - bufs [][]byte + bufMutex sync.Mutex + bufs [][]byte wg sync.WaitGroup - closed bool + closed atomicInt32 } //new a logger with specified handler and flag func New(handler Handler, flag int) *Logger { var l = new(Logger) - l.level = LevelInfo + l.level.Set(LevelInfo) l.handler = handler l.flag = flag l.quit = make(chan struct{}) - l.closed = false + l.closed.Set(0) l.msg = make(chan []byte, 1024) @@ -88,7 +99,9 @@ func (l *Logger) run() { for { select { case msg := <-l.msg: + l.hMutex.Lock() l.handler.Write(msg) + l.hMutex.Unlock() l.putBuf(msg) case <-l.quit: //we must log all msg @@ -100,7 +113,7 @@ func (l *Logger) run() { } func (l *Logger) popBuf() []byte { - l.Lock() + l.bufMutex.Lock() var buf []byte if len(l.bufs) == 0 { buf = make([]byte, 0, 1024) @@ -108,25 +121,25 @@ func (l *Logger) popBuf() []byte { buf = l.bufs[len(l.bufs)-1] l.bufs = l.bufs[0 : len(l.bufs)-1] } - l.Unlock() + l.bufMutex.Unlock() return buf } func (l *Logger) putBuf(buf []byte) { - l.Lock() + l.bufMutex.Lock() if len(l.bufs) < maxBufPoolSize { buf = buf[0:0] l.bufs = append(l.bufs, buf) } - l.Unlock() + l.bufMutex.Unlock() } func (l *Logger) Close() { - if l.closed { + if l.closed.Get() == 1 { return } - l.closed = true + l.closed.Set(1) close(l.quit) @@ -139,16 +152,30 @@ func (l *Logger) Close() { //set log level, any log level less than it will not log func (l *Logger) SetLevel(level int) { - l.level = level + l.level.Set(level) } -func (l *Logger) Output(callDepth int, level int, s string) { - if l.closed { - //closed +func (l *Logger) SetHandler(h Handler) { + if l.closed.Get() == 1 { return } - if l.level > level { + l.hMutex.Lock() + if l.handler != nil { + l.handler.Close() + } + l.handler = h + l.hMutex.Unlock() +} + +func (l *Logger) Output(callDepth int, level int, s string) { + if l.closed.Get() == 1 { + // closed + return + } + + if l.level.Get() > level { + // higher level can be logged return } @@ -261,6 +288,10 @@ func SetLevel(level int) { std.SetLevel(level) } +func SetHandler(h Handler) { + std.SetHandler(h) +} + func Trace(v ...interface{}) { std.Output(2, LevelTrace, fmt.Sprint(v...)) } diff --git a/Godeps/_workspace/src/github.com/siddontang/go/log/log_test.go b/Godeps/_workspace/src/github.com/siddontang/go/log/log_test.go index dd3f904..2e29b31 100644 --- a/Godeps/_workspace/src/github.com/siddontang/go/log/log_test.go +++ b/Godeps/_workspace/src/github.com/siddontang/go/log/log_test.go @@ -16,7 +16,14 @@ func TestStdStreamLog(t *testing.T) { Info("hello world") + SetHandler(os.Stderr) + Infof("%s %d", "Hello", 123) + + SetLevel(LevelError) + + Infof("%s %d", "Hello", 123) + Fatalf("%s %d", "Hello", 123) } func TestRotatingFileLog(t *testing.T) { diff --git a/Godeps/_workspace/src/github.com/siddontang/go/sync2/semaphore.go b/Godeps/_workspace/src/github.com/siddontang/go/sync2/semaphore.go new file mode 100644 index 0000000..d310da7 --- /dev/null +++ b/Godeps/_workspace/src/github.com/siddontang/go/sync2/semaphore.go @@ -0,0 +1,65 @@ +package sync2 + +import ( + "sync" + "sync/atomic" + "time" +) + +func NewSemaphore(initialCount int) *Semaphore { + res := &Semaphore{ + counter: int64(initialCount), + } + res.cond.L = &res.lock + return res +} + +type Semaphore struct { + lock sync.Mutex + cond sync.Cond + counter int64 +} + +func (s *Semaphore) Release() { + s.lock.Lock() + s.counter += 1 + if s.counter >= 0 { + s.cond.Signal() + } + s.lock.Unlock() +} + +func (s *Semaphore) Acquire() { + s.lock.Lock() + for s.counter < 1 { + s.cond.Wait() + } + s.counter -= 1 + s.lock.Unlock() +} + +func (s *Semaphore) AcquireTimeout(timeout time.Duration) bool { + done := make(chan bool, 1) + // Gate used to communicate between the threads and decide what the result + // is. If the main thread decides, we have timed out, otherwise we succeed. + decided := new(int32) + go func() { + s.Acquire() + if atomic.SwapInt32(decided, 1) == 0 { + done <- true + } else { + // If we already decided the result, and this thread did not win + s.Release() + } + }() + select { + case <-done: + return true + case <-time.NewTimer(timeout).C: + if atomic.SwapInt32(decided, 1) == 1 { + // The other thread already decided the result + return true + } + return false + } +} diff --git a/Godeps/_workspace/src/github.com/siddontang/go/sync2/semaphore_test.go b/Godeps/_workspace/src/github.com/siddontang/go/sync2/semaphore_test.go new file mode 100644 index 0000000..8c48694 --- /dev/null +++ b/Godeps/_workspace/src/github.com/siddontang/go/sync2/semaphore_test.go @@ -0,0 +1,41 @@ +// Copyright 2012, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync2 + +import ( + "testing" + "time" +) + +func TestSemaNoTimeout(t *testing.T) { + s := NewSemaphore(1) + s.Acquire() + released := false + go func() { + time.Sleep(10 * time.Millisecond) + released = true + s.Release() + }() + s.Acquire() + if !released { + t.Errorf("want true, got false") + } +} + +func TestSemaTimeout(t *testing.T) { + s := NewSemaphore(1) + s.Acquire() + go func() { + time.Sleep(10 * time.Millisecond) + s.Release() + }() + if ok := s.AcquireTimeout(5 * time.Millisecond); ok { + t.Errorf("want false, got true") + } + time.Sleep(10 * time.Millisecond) + if ok := s.AcquireTimeout(5 * time.Millisecond); !ok { + t.Errorf("want true, got false") + } +} diff --git a/Godeps/_workspace/src/github.com/siddontang/goredis/conn.go b/Godeps/_workspace/src/github.com/siddontang/goredis/conn.go index 4a1f4e6..465041b 100644 --- a/Godeps/_workspace/src/github.com/siddontang/goredis/conn.go +++ b/Godeps/_workspace/src/github.com/siddontang/goredis/conn.go @@ -2,6 +2,7 @@ package goredis import ( "bufio" + "fmt" "io" "net" "sync/atomic" @@ -16,9 +17,7 @@ func (s *sizeWriter) Write(p []byte) (int, error) { } type Conn struct { - c net.Conn - br *bufio.Reader - bw *bufio.Writer + c net.Conn respReader *RespReader respWriter *RespWriter @@ -34,23 +33,33 @@ func Connect(addr string) (*Conn, error) { } func ConnectWithSize(addr string, readSize int, writeSize int) (*Conn, error) { - c := new(Conn) - - var err error - c.c, err = net.Dial(getProto(addr), addr) + conn, err := net.Dial(getProto(addr), addr) if err != nil { return nil, err } - c.br = bufio.NewReaderSize(io.TeeReader(c.c, &c.totalReadSize), readSize) - c.bw = bufio.NewWriterSize(io.MultiWriter(c.c, &c.totalWriteSize), writeSize) + return NewConnWithSize(conn, readSize, writeSize) +} - c.respReader = NewRespReader(c.br) - c.respWriter = NewRespWriter(c.bw) +func NewConn(conn net.Conn) (*Conn, error) { + return NewConnWithSize(conn, 1024, 1024) +} + +func NewConnWithSize(conn net.Conn, readSize int, writeSize int) (*Conn, error) { + c := new(Conn) + + c.c = conn + + br := bufio.NewReaderSize(io.TeeReader(c.c, &c.totalReadSize), readSize) + bw := bufio.NewWriterSize(io.MultiWriter(c.c, &c.totalWriteSize), writeSize) + + c.respReader = NewRespReader(br) + c.respWriter = NewRespWriter(bw) atomic.StoreInt32(&c.closed, 0) return c, nil + } func (c *Conn) Close() { @@ -75,14 +84,23 @@ func (c *Conn) GetTotalWriteSize() int64 { return int64(c.totalWriteSize) } -func (c *Conn) SetReadDeadline(t time.Time) { - c.c.SetReadDeadline(t) +func (c *Conn) SetReadDeadline(t time.Time) error { + return c.c.SetReadDeadline(t) } -func (c *Conn) SetWriteDeadline(t time.Time) { - c.c.SetWriteDeadline(t) +func (c *Conn) SetWriteDeadline(t time.Time) error { + return c.c.SetWriteDeadline(t) } +func (c *Conn) RemoteAddr() net.Addr { + return c.c.RemoteAddr() +} + +func (c *Conn) LocalAddr() net.Addr { + return c.c.LocalAddr() +} + +// Send RESP command and receive the reply func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { if err := c.Send(cmd, args...); err != nil { return nil, err @@ -91,6 +109,7 @@ func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { return c.Receive() } +// Send RESP command func (c *Conn) Send(cmd string, args ...interface{}) error { if err := c.respWriter.WriteCommand(cmd, args...); err != nil { c.Close() @@ -100,6 +119,7 @@ func (c *Conn) Send(cmd string, args ...interface{}) error { return nil } +// Receive RESP reply func (c *Conn) Receive() (interface{}, error) { if reply, err := c.respReader.Parse(); err != nil { c.Close() @@ -113,6 +133,7 @@ func (c *Conn) Receive() (interface{}, error) { } } +// Receive RESP bulk string reply into writer w func (c *Conn) ReceiveBulkTo(w io.Writer) error { err := c.respReader.ParseBulkTo(w) if err != nil { @@ -123,6 +144,31 @@ func (c *Conn) ReceiveBulkTo(w io.Writer) error { return err } +// Receive RESP command request, must array of bulk stirng +func (c *Conn) ReceiveRequest() ([][]byte, error) { + return c.respReader.ParseRequest() +} + +// Send RESP value, must be string, int64, []byte, error, nil or []interface{} +func (c *Conn) SendValue(v interface{}) error { + switch v := v.(type) { + case string: + return c.respWriter.FlushString(v) + case int64: + return c.respWriter.FlushInteger(v) + case []byte: + return c.respWriter.FlushBulk(v) + case []interface{}: + return c.respWriter.FlushArray(v) + case error: + return c.respWriter.FlushError(v) + case nil: + return c.respWriter.FlushBulk(nil) + default: + return fmt.Errorf("invalid type %T for send RESP value", v) + } +} + func (c *Client) newConn(addr string, pass string) (*Conn, error) { co, err := ConnectWithSize(addr, c.readBufferSize, c.writeBufferSize) if err != nil { diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go index d50a008..323353b 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go @@ -347,12 +347,14 @@ func recoverTable(s *session, o *opt.Options) error { return err } iter := tr.NewIterator(nil, nil) - iter.(iterator.ErrorCallbackSetter).SetErrorCallback(func(err error) { - if errors.IsCorrupted(err) { - s.logf("table@recovery block corruption @%d %q", file.Num(), err) - tcorruptedBlock++ - } - }) + if itererr, ok := iter.(iterator.ErrorCallbackSetter); ok { + itererr.SetErrorCallback(func(err error) { + if errors.IsCorrupted(err) { + s.logf("table@recovery block corruption @%d %q", file.Num(), err) + tcorruptedBlock++ + } + }) + } // Scan the table. for iter.Next() { diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go index 4607e5d..011a94a 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go @@ -8,6 +8,7 @@ package leveldb import ( "errors" + "math/rand" "runtime" "sync" "sync/atomic" @@ -80,6 +81,10 @@ func (db *DB) newIterator(seq uint64, slice *util.Range, ro *opt.ReadOptions) *d return iter } +func (db *DB) iterSamplingRate() int { + return rand.Intn(2 * db.s.o.GetIteratorSamplingRate()) +} + type dir int const ( @@ -98,11 +103,21 @@ type dbIter struct { seq uint64 strict bool - dir dir - key []byte - value []byte - err error - releaser util.Releaser + smaplingGap int + dir dir + key []byte + value []byte + err error + releaser util.Releaser +} + +func (i *dbIter) sampleSeek() { + ikey := i.iter.Key() + i.smaplingGap -= len(ikey) + len(i.iter.Value()) + for i.smaplingGap < 0 { + i.smaplingGap += i.db.iterSamplingRate() + i.db.sampleSeek(ikey) + } } func (i *dbIter) setErr(err error) { @@ -175,6 +190,7 @@ func (i *dbIter) Seek(key []byte) bool { func (i *dbIter) next() bool { for { if ukey, seq, kt, kerr := parseIkey(i.iter.Key()); kerr == nil { + i.sampleSeek() if seq <= i.seq { switch kt { case ktDel: @@ -225,6 +241,7 @@ func (i *dbIter) prev() bool { if i.iter.Valid() { for { if ukey, seq, kt, kerr := parseIkey(i.iter.Key()); kerr == nil { + i.sampleSeek() if seq <= i.seq { if !del && i.icmp.uCompare(ukey, i.key) < 0 { return true @@ -266,6 +283,7 @@ func (i *dbIter) Prev() bool { case dirForward: for i.iter.Prev() { if ukey, _, _, kerr := parseIkey(i.iter.Key()); kerr == nil { + i.sampleSeek() if i.icmp.uCompare(ukey, i.key) < 0 { goto cont } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go index 24ecab5..d4db9d6 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go @@ -48,6 +48,15 @@ func (db *DB) addSeq(delta uint64) { atomic.AddUint64(&db.seq, delta) } +func (db *DB) sampleSeek(ikey iKey) { + v := db.s.version() + if v.sampleSeek(ikey) { + // Trigger table compaction. + db.compSendTrigger(db.tcompCmdC) + } + v.release() +} + func (db *DB) mpoolPut(mem *memdb.DB) { defer func() { recover() diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go index 0acb567..38bfbf1 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go @@ -405,19 +405,21 @@ func (h *dbHarness) compactRange(min, max string) { t.Log("DB range compaction done") } -func (h *dbHarness) sizeAssert(start, limit string, low, hi uint64) { - t := h.t - db := h.db - - s, err := db.SizeOf([]util.Range{ +func (h *dbHarness) sizeOf(start, limit string) uint64 { + sz, err := h.db.SizeOf([]util.Range{ {[]byte(start), []byte(limit)}, }) if err != nil { - t.Error("SizeOf: got error: ", err) + h.t.Error("SizeOf: got error: ", err) } - if s.Sum() < low || s.Sum() > hi { - t.Errorf("sizeof %q to %q not in range, want %d - %d, got %d", - shorten(start), shorten(limit), low, hi, s.Sum()) + return sz.Sum() +} + +func (h *dbHarness) sizeAssert(start, limit string, low, hi uint64) { + sz := h.sizeOf(start, limit) + if sz < low || sz > hi { + h.t.Errorf("sizeOf %q to %q not in range, want %d - %d, got %d", + shorten(start), shorten(limit), low, hi, sz) } } @@ -2577,3 +2579,87 @@ func TestDB_TableCompactionBuilder(t *testing.T) { } v.release() } + +func testDB_IterTriggeredCompaction(t *testing.T, limitDiv int) { + const ( + vSize = 200 * opt.KiB + tSize = 100 * opt.MiB + mIter = 100 + n = tSize / vSize + ) + + h := newDbHarnessWopt(t, &opt.Options{ + Compression: opt.NoCompression, + DisableBlockCache: true, + }) + defer h.close() + + key := func(x int) string { + return fmt.Sprintf("v%06d", x) + } + + // Fill. + value := strings.Repeat("x", vSize) + for i := 0; i < n; i++ { + h.put(key(i), value) + } + h.compactMem() + + // Delete all. + for i := 0; i < n; i++ { + h.delete(key(i)) + } + h.compactMem() + + var ( + limit = n / limitDiv + + startKey = key(0) + limitKey = key(limit) + maxKey = key(n) + slice = &util.Range{Limit: []byte(limitKey)} + + initialSize0 = h.sizeOf(startKey, limitKey) + initialSize1 = h.sizeOf(limitKey, maxKey) + ) + + t.Logf("inital size %s [rest %s]", shortenb(int(initialSize0)), shortenb(int(initialSize1))) + + for r := 0; true; r++ { + if r >= mIter { + t.Fatal("taking too long to compact") + } + + // Iterates. + iter := h.db.NewIterator(slice, h.ro) + for iter.Next() { + } + if err := iter.Error(); err != nil { + t.Fatalf("Iter err: %v", err) + } + iter.Release() + + // Wait compaction. + h.waitCompaction() + + // Check size. + size0 := h.sizeOf(startKey, limitKey) + size1 := h.sizeOf(limitKey, maxKey) + t.Logf("#%03d size %s [rest %s]", r, shortenb(int(size0)), shortenb(int(size1))) + if size0 < initialSize0/10 { + break + } + } + + if initialSize1 > 0 { + h.sizeAssert(limitKey, maxKey, initialSize1/4-opt.MiB, initialSize1+opt.MiB) + } +} + +func TestDB_IterTriggeredCompaction(t *testing.T) { + testDB_IterTriggeredCompaction(t, 1) +} + +func TestDB_IterTriggeredCompactionHalf(t *testing.T) { + testDB_IterTriggeredCompaction(t, 2) +} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go index 86828f4..61f0ead 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go @@ -34,10 +34,11 @@ var ( DefaultCompactionTotalSize = 10 * MiB DefaultCompactionTotalSizeMultiplier = 10.0 DefaultCompressionType = SnappyCompression - DefaultOpenFilesCacher = LRUCacher - DefaultOpenFilesCacheCapacity = 500 + DefaultIteratorSamplingRate = 1 * MiB DefaultMaxMemCompationLevel = 2 DefaultNumLevel = 7 + DefaultOpenFilesCacher = LRUCacher + DefaultOpenFilesCacheCapacity = 500 DefaultWriteBuffer = 4 * MiB DefaultWriteL0PauseTrigger = 12 DefaultWriteL0SlowdownTrigger = 8 @@ -153,7 +154,7 @@ type Options struct { BlockCacher Cacher // BlockCacheCapacity defines the capacity of the 'sorted table' block caching. - // Use -1 for zero, this has same effect with specifying NoCacher to BlockCacher. + // Use -1 for zero, this has same effect as specifying NoCacher to BlockCacher. // // The default value is 8MiB. BlockCacheCapacity int @@ -288,6 +289,13 @@ type Options struct { // The default value is nil. Filter filter.Filter + // IteratorSamplingRate defines approximate gap (in bytes) between read + // sampling of an iterator. The samples will be used to determine when + // compaction should be triggered. + // + // The default is 1MiB. + IteratorSamplingRate int + // MaxMemCompationLevel defines maximum level a newly compacted 'memdb' // will be pushed into if doesn't creates overlap. This should less than // NumLevel. Use -1 for level-0. @@ -308,7 +316,7 @@ type Options struct { OpenFilesCacher Cacher // OpenFilesCacheCapacity defines the capacity of the open files caching. - // Use -1 for zero, this has same effect with specifying NoCacher to OpenFilesCacher. + // Use -1 for zero, this has same effect as specifying NoCacher to OpenFilesCacher. // // The default value is 500. OpenFilesCacheCapacity int @@ -355,9 +363,9 @@ func (o *Options) GetBlockCacher() Cacher { } func (o *Options) GetBlockCacheCapacity() int { - if o == nil || o.BlockCacheCapacity <= 0 { + if o == nil || o.BlockCacheCapacity == 0 { return DefaultBlockCacheCapacity - } else if o.BlockCacheCapacity == -1 { + } else if o.BlockCacheCapacity < 0 { return 0 } return o.BlockCacheCapacity @@ -492,12 +500,19 @@ func (o *Options) GetFilter() filter.Filter { return o.Filter } +func (o *Options) GetIteratorSamplingRate() int { + if o == nil || o.IteratorSamplingRate <= 0 { + return DefaultIteratorSamplingRate + } + return o.IteratorSamplingRate +} + func (o *Options) GetMaxMemCompationLevel() int { level := DefaultMaxMemCompationLevel if o != nil { if o.MaxMemCompationLevel > 0 { level = o.MaxMemCompationLevel - } else if o.MaxMemCompationLevel == -1 { + } else if o.MaxMemCompationLevel < 0 { level = 0 } } @@ -525,9 +540,9 @@ func (o *Options) GetOpenFilesCacher() Cacher { } func (o *Options) GetOpenFilesCacheCapacity() int { - if o == nil || o.OpenFilesCacheCapacity <= 0 { + if o == nil || o.OpenFilesCacheCapacity == 0 { return DefaultOpenFilesCacheCapacity - } else if o.OpenFilesCacheCapacity == -1 { + } else if o.OpenFilesCacheCapacity < 0 { return 0 } return o.OpenFilesCacheCapacity diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go index 5ab7b53..88a52f5 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go @@ -136,9 +136,8 @@ func (v *version) get(ikey iKey, ro *opt.ReadOptions, noValue bool) (value []byt if !tseek { if tset == nil { tset = &tSet{level, t} - } else if tset.table.consumeSeek() <= 0 { + } else { tseek = true - tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset)) } } @@ -203,6 +202,28 @@ func (v *version) get(ikey iKey, ro *opt.ReadOptions, noValue bool) (value []byt return true }) + if tseek && tset.table.consumeSeek() <= 0 { + tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset)) + } + + return +} + +func (v *version) sampleSeek(ikey iKey) (tcomp bool) { + var tset *tSet + + v.walkOverlapping(ikey, func(level int, t *tFile) bool { + if tset == nil { + tset = &tSet{level, t} + return true + } else { + if tset.table.consumeSeek() <= 0 { + tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset)) + } + return false + } + }, nil) + return } diff --git a/Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/decode.go b/Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/decode.go index d93c1b9..552a17b 100644 --- a/Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/decode.go +++ b/Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/decode.go @@ -7,10 +7,15 @@ package snappy import ( "encoding/binary" "errors" + "io" ) -// ErrCorrupt reports that the input is invalid. -var ErrCorrupt = errors.New("snappy: corrupt input") +var ( + // ErrCorrupt reports that the input is invalid. + ErrCorrupt = errors.New("snappy: corrupt input") + // ErrUnsupported reports that the input isn't supported. + ErrUnsupported = errors.New("snappy: unsupported input") +) // DecodedLen returns the length of the decoded block. func DecodedLen(src []byte) (int, error) { @@ -122,3 +127,166 @@ func Decode(dst, src []byte) ([]byte, error) { } return dst[:d], nil } + +// NewReader returns a new Reader that decompresses from r, using the framing +// format described at +// https://code.google.com/p/snappy/source/browse/trunk/framing_format.txt +func NewReader(r io.Reader) *Reader { + return &Reader{ + r: r, + decoded: make([]byte, maxUncompressedChunkLen), + buf: make([]byte, MaxEncodedLen(maxUncompressedChunkLen)+checksumSize), + } +} + +// Reader is an io.Reader than can read Snappy-compressed bytes. +type Reader struct { + r io.Reader + err error + decoded []byte + buf []byte + // decoded[i:j] contains decoded bytes that have not yet been passed on. + i, j int + readHeader bool +} + +// Reset discards any buffered data, resets all state, and switches the Snappy +// reader to read from r. This permits reusing a Reader rather than allocating +// a new one. +func (r *Reader) Reset(reader io.Reader) { + r.r = reader + r.err = nil + r.i = 0 + r.j = 0 + r.readHeader = false +} + +func (r *Reader) readFull(p []byte) (ok bool) { + if _, r.err = io.ReadFull(r.r, p); r.err != nil { + if r.err == io.ErrUnexpectedEOF { + r.err = ErrCorrupt + } + return false + } + return true +} + +// Read satisfies the io.Reader interface. +func (r *Reader) Read(p []byte) (int, error) { + if r.err != nil { + return 0, r.err + } + for { + if r.i < r.j { + n := copy(p, r.decoded[r.i:r.j]) + r.i += n + return n, nil + } + if !r.readFull(r.buf[:4]) { + return 0, r.err + } + chunkType := r.buf[0] + if !r.readHeader { + if chunkType != chunkTypeStreamIdentifier { + r.err = ErrCorrupt + return 0, r.err + } + r.readHeader = true + } + chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16 + if chunkLen > len(r.buf) { + r.err = ErrUnsupported + return 0, r.err + } + + // The chunk types are specified at + // https://code.google.com/p/snappy/source/browse/trunk/framing_format.txt + switch chunkType { + case chunkTypeCompressedData: + // Section 4.2. Compressed data (chunk type 0x00). + if chunkLen < checksumSize { + r.err = ErrCorrupt + return 0, r.err + } + buf := r.buf[:chunkLen] + if !r.readFull(buf) { + return 0, r.err + } + checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24 + buf = buf[checksumSize:] + + n, err := DecodedLen(buf) + if err != nil { + r.err = err + return 0, r.err + } + if n > len(r.decoded) { + r.err = ErrCorrupt + return 0, r.err + } + if _, err := Decode(r.decoded, buf); err != nil { + r.err = err + return 0, r.err + } + if crc(r.decoded[:n]) != checksum { + r.err = ErrCorrupt + return 0, r.err + } + r.i, r.j = 0, n + continue + + case chunkTypeUncompressedData: + // Section 4.3. Uncompressed data (chunk type 0x01). + if chunkLen < checksumSize { + r.err = ErrCorrupt + return 0, r.err + } + buf := r.buf[:checksumSize] + if !r.readFull(buf) { + return 0, r.err + } + checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24 + // Read directly into r.decoded instead of via r.buf. + n := chunkLen - checksumSize + if !r.readFull(r.decoded[:n]) { + return 0, r.err + } + if crc(r.decoded[:n]) != checksum { + r.err = ErrCorrupt + return 0, r.err + } + r.i, r.j = 0, n + continue + + case chunkTypeStreamIdentifier: + // Section 4.1. Stream identifier (chunk type 0xff). + if chunkLen != len(magicBody) { + r.err = ErrCorrupt + return 0, r.err + } + if !r.readFull(r.buf[:len(magicBody)]) { + return 0, r.err + } + for i := 0; i < len(magicBody); i++ { + if r.buf[i] != magicBody[i] { + r.err = ErrCorrupt + return 0, r.err + } + } + continue + } + + if chunkType <= 0x7f { + // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f). + r.err = ErrUnsupported + return 0, r.err + + } else { + // Section 4.4 Padding (chunk type 0xfe). + // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd). + if !r.readFull(r.buf[:chunkLen]) { + return 0, r.err + } + } + } +} diff --git a/Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/encode.go b/Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/encode.go index b2371db..dda3724 100644 --- a/Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/encode.go +++ b/Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/encode.go @@ -6,6 +6,7 @@ package snappy import ( "encoding/binary" + "io" ) // We limit how far copy back-references can go, the same as the C++ code. @@ -172,3 +173,86 @@ func MaxEncodedLen(srcLen int) int { // This last factor dominates the blowup, so the final estimate is: return 32 + srcLen + srcLen/6 } + +// NewWriter returns a new Writer that compresses to w, using the framing +// format described at +// https://code.google.com/p/snappy/source/browse/trunk/framing_format.txt +func NewWriter(w io.Writer) *Writer { + return &Writer{ + w: w, + enc: make([]byte, MaxEncodedLen(maxUncompressedChunkLen)), + } +} + +// Writer is an io.Writer than can write Snappy-compressed bytes. +type Writer struct { + w io.Writer + err error + enc []byte + buf [checksumSize + chunkHeaderSize]byte + wroteHeader bool +} + +// Reset discards the writer's state and switches the Snappy writer to write to +// w. This permits reusing a Writer rather than allocating a new one. +func (w *Writer) Reset(writer io.Writer) { + w.w = writer + w.err = nil + w.wroteHeader = false +} + +// Write satisfies the io.Writer interface. +func (w *Writer) Write(p []byte) (n int, errRet error) { + if w.err != nil { + return 0, w.err + } + if !w.wroteHeader { + copy(w.enc, magicChunk) + if _, err := w.w.Write(w.enc[:len(magicChunk)]); err != nil { + w.err = err + return n, err + } + w.wroteHeader = true + } + for len(p) > 0 { + var uncompressed []byte + if len(p) > maxUncompressedChunkLen { + uncompressed, p = p[:maxUncompressedChunkLen], p[maxUncompressedChunkLen:] + } else { + uncompressed, p = p, nil + } + checksum := crc(uncompressed) + + // Compress the buffer, discarding the result if the improvement + // isn't at least 12.5%. + chunkType := uint8(chunkTypeCompressedData) + chunkBody, err := Encode(w.enc, uncompressed) + if err != nil { + w.err = err + return n, err + } + if len(chunkBody) >= len(uncompressed)-len(uncompressed)/8 { + chunkType, chunkBody = chunkTypeUncompressedData, uncompressed + } + + chunkLen := 4 + len(chunkBody) + w.buf[0] = chunkType + w.buf[1] = uint8(chunkLen >> 0) + w.buf[2] = uint8(chunkLen >> 8) + w.buf[3] = uint8(chunkLen >> 16) + w.buf[4] = uint8(checksum >> 0) + w.buf[5] = uint8(checksum >> 8) + w.buf[6] = uint8(checksum >> 16) + w.buf[7] = uint8(checksum >> 24) + if _, err = w.w.Write(w.buf[:]); err != nil { + w.err = err + return n, err + } + if _, err = w.w.Write(chunkBody); err != nil { + w.err = err + return n, err + } + n += len(uncompressed) + } + return n, nil +} diff --git a/Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/snappy.go b/Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/snappy.go index 2f1b790..043bf3d 100644 --- a/Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/snappy.go +++ b/Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/snappy.go @@ -8,6 +8,10 @@ // The C++ snappy implementation is at http://code.google.com/p/snappy/ package snappy +import ( + "hash/crc32" +) + /* Each encoded block begins with the varint-encoded length of the decoded data, followed by a sequence of chunks. Chunks begin and end on byte boundaries. The @@ -36,3 +40,29 @@ const ( tagCopy2 = 0x02 tagCopy4 = 0x03 ) + +const ( + checksumSize = 4 + chunkHeaderSize = 4 + magicChunk = "\xff\x06\x00\x00" + magicBody + magicBody = "sNaPpY" + // https://code.google.com/p/snappy/source/browse/trunk/framing_format.txt says + // that "the uncompressed data in a chunk must be no longer than 65536 bytes". + maxUncompressedChunkLen = 65536 +) + +const ( + chunkTypeCompressedData = 0x00 + chunkTypeUncompressedData = 0x01 + chunkTypePadding = 0xfe + chunkTypeStreamIdentifier = 0xff +) + +var crcTable = crc32.MakeTable(crc32.Castagnoli) + +// crc implements the checksum specified in section 3 of +// https://code.google.com/p/snappy/source/browse/trunk/framing_format.txt +func crc(b []byte) uint32 { + c := crc32.Update(0, crcTable, b) + return uint32(c>>15|c<<17) + 0xa282ead8 +} diff --git a/Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/snappy_test.go b/Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/snappy_test.go index 7ba8392..0623385 100644 --- a/Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/snappy_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/snappy_test.go @@ -18,7 +18,10 @@ import ( "testing" ) -var download = flag.Bool("download", false, "If true, download any missing files before running benchmarks") +var ( + download = flag.Bool("download", false, "If true, download any missing files before running benchmarks") + testdata = flag.String("testdata", "testdata", "Directory containing the test data") +) func roundtrip(b, ebuf, dbuf []byte) error { e, err := Encode(ebuf, b) @@ -55,11 +58,11 @@ func TestSmallCopy(t *testing.T) { } func TestSmallRand(t *testing.T) { - rand.Seed(27354294) + rng := rand.New(rand.NewSource(27354294)) for n := 1; n < 20000; n += 23 { b := make([]byte, n) - for i, _ := range b { - b[i] = uint8(rand.Uint32()) + for i := range b { + b[i] = uint8(rng.Uint32()) } if err := roundtrip(b, nil, nil); err != nil { t.Fatal(err) @@ -70,7 +73,7 @@ func TestSmallRand(t *testing.T) { func TestSmallRegular(t *testing.T) { for n := 1; n < 20000; n += 23 { b := make([]byte, n) - for i, _ := range b { + for i := range b { b[i] = uint8(i%10 + 'a') } if err := roundtrip(b, nil, nil); err != nil { @@ -79,6 +82,120 @@ func TestSmallRegular(t *testing.T) { } } +func cmp(a, b []byte) error { + if len(a) != len(b) { + return fmt.Errorf("got %d bytes, want %d", len(a), len(b)) + } + for i := range a { + if a[i] != b[i] { + return fmt.Errorf("byte #%d: got 0x%02x, want 0x%02x", i, a[i], b[i]) + } + } + return nil +} + +func TestFramingFormat(t *testing.T) { + // src is comprised of alternating 1e5-sized sequences of random + // (incompressible) bytes and repeated (compressible) bytes. 1e5 was chosen + // because it is larger than maxUncompressedChunkLen (64k). + src := make([]byte, 1e6) + rng := rand.New(rand.NewSource(1)) + for i := 0; i < 10; i++ { + if i%2 == 0 { + for j := 0; j < 1e5; j++ { + src[1e5*i+j] = uint8(rng.Intn(256)) + } + } else { + for j := 0; j < 1e5; j++ { + src[1e5*i+j] = uint8(i) + } + } + } + + buf := new(bytes.Buffer) + if _, err := NewWriter(buf).Write(src); err != nil { + t.Fatalf("Write: encoding: %v", err) + } + dst, err := ioutil.ReadAll(NewReader(buf)) + if err != nil { + t.Fatalf("ReadAll: decoding: %v", err) + } + if err := cmp(dst, src); err != nil { + t.Fatal(err) + } +} + +func TestReaderReset(t *testing.T) { + gold := bytes.Repeat([]byte("All that is gold does not glitter,\n"), 10000) + buf := new(bytes.Buffer) + if _, err := NewWriter(buf).Write(gold); err != nil { + t.Fatalf("Write: %v", err) + } + encoded, invalid, partial := buf.String(), "invalid", "partial" + r := NewReader(nil) + for i, s := range []string{encoded, invalid, partial, encoded, partial, invalid, encoded, encoded} { + if s == partial { + r.Reset(strings.NewReader(encoded)) + if _, err := r.Read(make([]byte, 101)); err != nil { + t.Errorf("#%d: %v", i, err) + continue + } + continue + } + r.Reset(strings.NewReader(s)) + got, err := ioutil.ReadAll(r) + switch s { + case encoded: + if err != nil { + t.Errorf("#%d: %v", i, err) + continue + } + if err := cmp(got, gold); err != nil { + t.Errorf("#%d: %v", i, err) + continue + } + case invalid: + if err == nil { + t.Errorf("#%d: got nil error, want non-nil", i) + continue + } + } + } +} + +func TestWriterReset(t *testing.T) { + gold := bytes.Repeat([]byte("Not all those who wander are lost;\n"), 10000) + var gots, wants [][]byte + const n = 20 + w, failed := NewWriter(nil), false + for i := 0; i <= n; i++ { + buf := new(bytes.Buffer) + w.Reset(buf) + want := gold[:len(gold)*i/n] + if _, err := w.Write(want); err != nil { + t.Errorf("#%d: Write: %v", i, err) + failed = true + continue + } + got, err := ioutil.ReadAll(NewReader(buf)) + if err != nil { + t.Errorf("#%d: ReadAll: %v", i, err) + failed = true + continue + } + gots = append(gots, got) + wants = append(wants, want) + } + if failed { + return + } + for i := range gots { + if err := cmp(gots[i], wants[i]); err != nil { + t.Errorf("#%d: %v", i, err) + } + } +} + func benchDecode(b *testing.B, src []byte) { encoded, err := Encode(nil, src) if err != nil { @@ -102,7 +219,7 @@ func benchEncode(b *testing.B, src []byte) { } } -func readFile(b *testing.B, filename string) []byte { +func readFile(b testing.TB, filename string) []byte { src, err := ioutil.ReadFile(filename) if err != nil { b.Fatalf("failed reading %s: %s", filename, err) @@ -144,7 +261,7 @@ func BenchmarkWordsEncode1e5(b *testing.B) { benchWords(b, 1e5, false) } func BenchmarkWordsEncode1e6(b *testing.B) { benchWords(b, 1e6, false) } // testFiles' values are copied directly from -// https://code.google.com/p/snappy/source/browse/trunk/snappy_unittest.cc. +// https://raw.githubusercontent.com/google/snappy/master/snappy_unittest.cc // The label field is unused in snappy-go. var testFiles = []struct { label string @@ -152,29 +269,36 @@ var testFiles = []struct { }{ {"html", "html"}, {"urls", "urls.10K"}, - {"jpg", "house.jpg"}, - {"pdf", "mapreduce-osdi-1.pdf"}, + {"jpg", "fireworks.jpeg"}, + {"jpg_200", "fireworks.jpeg"}, + {"pdf", "paper-100k.pdf"}, {"html4", "html_x_4"}, - {"cp", "cp.html"}, - {"c", "fields.c"}, - {"lsp", "grammar.lsp"}, - {"xls", "kennedy.xls"}, {"txt1", "alice29.txt"}, {"txt2", "asyoulik.txt"}, {"txt3", "lcet10.txt"}, {"txt4", "plrabn12.txt"}, - {"bin", "ptt5"}, - {"sum", "sum"}, - {"man", "xargs.1"}, {"pb", "geo.protodata"}, {"gaviota", "kppkn.gtb"}, } // The test data files are present at this canonical URL. -const baseURL = "https://snappy.googlecode.com/svn/trunk/testdata/" +const baseURL = "https://raw.githubusercontent.com/google/snappy/master/testdata/" func downloadTestdata(basename string) (errRet error) { - filename := filepath.Join("testdata", basename) + filename := filepath.Join(*testdata, basename) + if stat, err := os.Stat(filename); err == nil && stat.Size() != 0 { + return nil + } + + if !*download { + return fmt.Errorf("test data not found; skipping benchmark without the -download flag") + } + // Download the official snappy C++ implementation reference test data + // files for benchmarking. + if err := os.Mkdir(*testdata, 0777); err != nil && !os.IsExist(err) { + return fmt.Errorf("failed to create testdata: %s", err) + } + f, err := os.Create(filename) if err != nil { return fmt.Errorf("failed to create %s: %s", filename, err) @@ -185,36 +309,27 @@ func downloadTestdata(basename string) (errRet error) { os.Remove(filename) } }() - resp, err := http.Get(baseURL + basename) + url := baseURL + basename + resp, err := http.Get(url) if err != nil { - return fmt.Errorf("failed to download %s: %s", baseURL+basename, err) + return fmt.Errorf("failed to download %s: %s", url, err) } defer resp.Body.Close() + if s := resp.StatusCode; s != http.StatusOK { + return fmt.Errorf("downloading %s: HTTP status code %d (%s)", url, s, http.StatusText(s)) + } _, err = io.Copy(f, resp.Body) if err != nil { - return fmt.Errorf("failed to write %s: %s", filename, err) + return fmt.Errorf("failed to download %s to %s: %s", url, filename, err) } return nil } func benchFile(b *testing.B, n int, decode bool) { - filename := filepath.Join("testdata", testFiles[n].filename) - if stat, err := os.Stat(filename); err != nil || stat.Size() == 0 { - if !*download { - b.Fatal("test data not found; skipping benchmark without the -download flag") - } - // Download the official snappy C++ implementation reference test data - // files for benchmarking. - if err := os.Mkdir("testdata", 0777); err != nil && !os.IsExist(err) { - b.Fatalf("failed to create testdata: %s", err) - } - for _, tf := range testFiles { - if err := downloadTestdata(tf.filename); err != nil { - b.Fatalf("failed to download testdata: %s", err) - } - } + if err := downloadTestdata(testFiles[n].filename); err != nil { + b.Fatalf("failed to download testdata: %s", err) } - data := readFile(b, filename) + data := readFile(b, filepath.Join(*testdata, testFiles[n].filename)) if decode { benchDecode(b, data) } else { @@ -235,12 +350,6 @@ func Benchmark_UFlat8(b *testing.B) { benchFile(b, 8, true) } func Benchmark_UFlat9(b *testing.B) { benchFile(b, 9, true) } func Benchmark_UFlat10(b *testing.B) { benchFile(b, 10, true) } func Benchmark_UFlat11(b *testing.B) { benchFile(b, 11, true) } -func Benchmark_UFlat12(b *testing.B) { benchFile(b, 12, true) } -func Benchmark_UFlat13(b *testing.B) { benchFile(b, 13, true) } -func Benchmark_UFlat14(b *testing.B) { benchFile(b, 14, true) } -func Benchmark_UFlat15(b *testing.B) { benchFile(b, 15, true) } -func Benchmark_UFlat16(b *testing.B) { benchFile(b, 16, true) } -func Benchmark_UFlat17(b *testing.B) { benchFile(b, 17, true) } func Benchmark_ZFlat0(b *testing.B) { benchFile(b, 0, false) } func Benchmark_ZFlat1(b *testing.B) { benchFile(b, 1, false) } func Benchmark_ZFlat2(b *testing.B) { benchFile(b, 2, false) } @@ -253,9 +362,3 @@ func Benchmark_ZFlat8(b *testing.B) { benchFile(b, 8, false) } func Benchmark_ZFlat9(b *testing.B) { benchFile(b, 9, false) } func Benchmark_ZFlat10(b *testing.B) { benchFile(b, 10, false) } func Benchmark_ZFlat11(b *testing.B) { benchFile(b, 11, false) } -func Benchmark_ZFlat12(b *testing.B) { benchFile(b, 12, false) } -func Benchmark_ZFlat13(b *testing.B) { benchFile(b, 13, false) } -func Benchmark_ZFlat14(b *testing.B) { benchFile(b, 14, false) } -func Benchmark_ZFlat15(b *testing.B) { benchFile(b, 15, false) } -func Benchmark_ZFlat16(b *testing.B) { benchFile(b, 16, false) } -func Benchmark_ZFlat17(b *testing.B) { benchFile(b, 17, false) }