From 66d2b3b8ba78791b62943b5cfc3927bb4ada08bd Mon Sep 17 00:00:00 2001 From: siddontang Date: Mon, 7 Jul 2014 12:37:16 +0800 Subject: [PATCH] optimize cgo leveldb reduce memory copy, reduce cgo call number --- leveldb/db.go | 28 ++++++++--- leveldb/iterator.go | 112 +++++++++++++++++++++++++++++++++++------ leveldb/leveldb_ext.cc | 86 +++++++++++++++++++++++++++++++ leveldb/leveldb_ext.h | 38 ++++++++++++++ leveldb/snapshot.go | 6 ++- 5 files changed, 246 insertions(+), 24 deletions(-) create mode 100644 leveldb/leveldb_ext.cc create mode 100644 leveldb/leveldb_ext.h diff --git a/leveldb/db.go b/leveldb/db.go index ed61b18..0cc16e2 100644 --- a/leveldb/db.go +++ b/leveldb/db.go @@ -4,6 +4,7 @@ package leveldb /* #cgo LDFLAGS: -lleveldb #include +#include "leveldb_ext.h" */ import "C" @@ -210,7 +211,7 @@ func (db *DB) Clear() error { num := 0 for ; it.Valid(); it.Next() { - bc.Delete(it.Key()) + bc.Delete(it.RawKey()) num++ if num == 1000 { num = 0 @@ -234,7 +235,11 @@ func (db *DB) SyncPut(key, value []byte) error { } func (db *DB) Get(key []byte) ([]byte, error) { - return db.get(db.readOpts, key) + return db.get(nil, db.readOpts, key) +} + +func (db *DB) BufGet(r []byte, key []byte) ([]byte, error) { + return db.get(r, db.readOpts, key) } func (db *DB) Delete(key []byte) error { @@ -317,7 +322,7 @@ func (db *DB) put(wo *WriteOptions, key, value []byte) error { return nil } -func (db *DB) get(ro *ReadOptions, key []byte) ([]byte, error) { +func (db *DB) get(r []byte, ro *ReadOptions, key []byte) ([]byte, error) { var errStr *C.char var vallen C.size_t var k *C.char @@ -325,8 +330,10 @@ func (db *DB) get(ro *ReadOptions, key []byte) ([]byte, error) { k = (*C.char)(unsafe.Pointer(&key[0])) } - value := C.leveldb_get( - db.db, ro.Opt, k, C.size_t(len(key)), &vallen, &errStr) + var value *C.char + + c := C.leveldb_get_ext( + db.db, ro.Opt, k, C.size_t(len(key)), &value, &vallen, &errStr) if errStr != nil { return nil, saveError(errStr) @@ -336,8 +343,15 @@ func (db *DB) get(ro *ReadOptions, key []byte) ([]byte, error) { return nil, nil } - defer C.leveldb_free(unsafe.Pointer(value)) - return C.GoBytes(unsafe.Pointer(value), C.int(vallen)), nil + defer C.leveldb_get_free_ext(unsafe.Pointer(c)) + + if int(C.int(vallen)) > len(r) { + return C.GoBytes(unsafe.Pointer(value), C.int(vallen)), nil + } else { + b := slice(unsafe.Pointer(value), int(C.int(vallen))) + n := copy(r, b) + return r[0:n], nil + } } func (db *DB) delete(wo *WriteOptions, key []byte) error { diff --git a/leveldb/iterator.go b/leveldb/iterator.go index 4976e6c..3e4192a 100644 --- a/leveldb/iterator.go +++ b/leveldb/iterator.go @@ -3,6 +3,7 @@ package leveldb // #cgo LDFLAGS: -lleveldb // #include // #include "leveldb/c.h" +// #include "leveldb_ext.h" import "C" import ( @@ -41,9 +42,11 @@ type Limit struct { } type Iterator struct { - it *C.leveldb_iterator_t + it *C.leveldb_iterator_t + isValid C.uchar } +//Returns a copy of key. func (it *Iterator) Key() []byte { var klen C.size_t kdata := C.leveldb_iter_key(it.it, &klen) @@ -54,6 +57,7 @@ func (it *Iterator) Key() []byte { return C.GoBytes(unsafe.Pointer(kdata), C.int(klen)) } +//Returns a copy of value. func (it *Iterator) Value() []byte { var vlen C.size_t vdata := C.leveldb_iter_value(it.it, &vlen) @@ -64,43 +68,95 @@ func (it *Iterator) Value() []byte { return C.GoBytes(unsafe.Pointer(vdata), C.int(vlen)) } +//Returns a reference of key. +//you must be careful that it will be changed after next iterate. +func (it *Iterator) RawKey() []byte { + var klen C.size_t + kdata := C.leveldb_iter_key(it.it, &klen) + if kdata == nil { + return nil + } + + return slice(unsafe.Pointer(kdata), int(C.int(klen))) +} + +//Returns a reference of value. +//you must be careful that it will be changed after next iterate. +func (it *Iterator) RawValue() []byte { + var vlen C.size_t + vdata := C.leveldb_iter_value(it.it, &vlen) + if vdata == nil { + return nil + } + + return slice(unsafe.Pointer(vdata), int(C.int(vlen))) +} + +//Copy key to b, if b len is small or nil, returns a new one +func (it *Iterator) BufKey(b []byte) []byte { + k := it.RawKey() + if k == nil { + return nil + } + if len(k) > len(b) { + b = make([]byte, len(k)) + } + + n := copy(b, k) + return b[0:n] +} + +//Copy value to b, if b len is small or nil, returns a new one +func (it *Iterator) BufValue(b []byte) []byte { + v := it.RawValue() + if v == nil { + return nil + } + if len(v) > len(b) { + b = make([]byte, len(v)) + } + + n := copy(b, v) + return b[0:n] +} + func (it *Iterator) Close() { C.leveldb_iter_destroy(it.it) it.it = nil } func (it *Iterator) Valid() bool { - return ucharToBool(C.leveldb_iter_valid(it.it)) + return ucharToBool(it.isValid) } func (it *Iterator) Next() { - C.leveldb_iter_next(it.it) + it.isValid = C.leveldb_iter_next_ext(it.it) } func (it *Iterator) Prev() { - C.leveldb_iter_prev(it.it) + it.isValid = C.leveldb_iter_prev_ext(it.it) } func (it *Iterator) SeekToFirst() { - C.leveldb_iter_seek_to_first(it.it) + it.isValid = C.leveldb_iter_seek_to_first_ext(it.it) } func (it *Iterator) SeekToLast() { - C.leveldb_iter_seek_to_last(it.it) + it.isValid = C.leveldb_iter_seek_to_last_ext(it.it) } func (it *Iterator) Seek(key []byte) { - C.leveldb_iter_seek(it.it, (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key))) + it.isValid = C.leveldb_iter_seek_ext(it.it, (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key))) } +//Finds by key, if not found, nil returns func (it *Iterator) Find(key []byte) []byte { it.Seek(key) if it.Valid() { - var klen C.size_t - kdata := C.leveldb_iter_key(it.it, &klen) - if kdata == nil { + k := it.RawKey() + if k == nil { return nil - } else if bytes.Equal(slice(unsafe.Pointer(kdata), int(C.int(klen))), key) { + } else if bytes.Equal(k, key) { return it.Value() } } @@ -108,6 +164,22 @@ func (it *Iterator) Find(key []byte) []byte { return nil } +//Finds by key, if not found, nil returns, else a reference of value returns +//you must be careful that it will be changed after next iterate. +func (it *Iterator) RawFind(key []byte) []byte { + it.Seek(key) + if it.Valid() { + k := it.RawKey() + if k == nil { + return nil + } else if bytes.Equal(k, key) { + return it.RawValue() + } + } + + return nil +} + type RangeLimitIterator struct { it *Iterator @@ -128,6 +200,14 @@ func (it *RangeLimitIterator) Value() []byte { return it.it.Value() } +func (it *RangeLimitIterator) RawKey() []byte { + return it.it.RawKey() +} + +func (it *RangeLimitIterator) RawValue() []byte { + return it.it.RawValue() +} + func (it *RangeLimitIterator) Valid() bool { if it.l.Offset < 0 { return false @@ -139,7 +219,7 @@ func (it *RangeLimitIterator) Valid() bool { if it.direction == IteratorForward { if it.r.Max != nil { - r := bytes.Compare(it.it.Key(), it.r.Max) + r := bytes.Compare(it.it.RawKey(), it.r.Max) if it.r.Type&RangeROpen > 0 { return !(r >= 0) } else { @@ -148,7 +228,7 @@ func (it *RangeLimitIterator) Valid() bool { } } else { if it.r.Min != nil { - r := bytes.Compare(it.it.Key(), it.r.Min) + r := bytes.Compare(it.it.RawKey(), it.r.Min) if it.r.Type&RangeLOpen > 0 { return !(r <= 0) } else { @@ -204,7 +284,7 @@ func rangeLimitIterator(i *Iterator, r *Range, l *Limit, direction uint8) *Range it.it.Seek(r.Min) if r.Type&RangeLOpen > 0 { - if it.it.Valid() && bytes.Equal(it.it.Key(), r.Min) { + if it.it.Valid() && bytes.Equal(it.it.RawKey(), r.Min) { it.it.Next() } } @@ -218,13 +298,13 @@ func rangeLimitIterator(i *Iterator, r *Range, l *Limit, direction uint8) *Range if !it.it.Valid() { it.it.SeekToLast() } else { - if !bytes.Equal(it.it.Key(), r.Max) { + if !bytes.Equal(it.it.RawKey(), r.Max) { it.it.Prev() } } if r.Type&RangeROpen > 0 { - if it.it.Valid() && bytes.Equal(it.it.Key(), r.Max) { + if it.it.Valid() && bytes.Equal(it.it.RawKey(), r.Max) { it.it.Prev() } } diff --git a/leveldb/leveldb_ext.cc b/leveldb/leveldb_ext.cc new file mode 100644 index 0000000..9ed2579 --- /dev/null +++ b/leveldb/leveldb_ext.cc @@ -0,0 +1,86 @@ +#include "leveldb_ext.h" + +#include +#include + +#include "leveldb/db.h" + +using namespace leveldb; + +extern "C" { + +static bool SaveError(char** errptr, const Status& s) { + assert(errptr != NULL); + if (s.ok()) { + return false; + } else if (*errptr == NULL) { + *errptr = strdup(s.ToString().c_str()); + } else { + free(*errptr); + *errptr = strdup(s.ToString().c_str()); + } + return true; +} + +void* leveldb_get_ext( + leveldb_t* db, + const leveldb_readoptions_t* options, + const char* key, size_t keylen, + char** valptr, + size_t* vallen, + char** errptr) { + + std::string *tmp = new(std::string); + + //very tricky, maybe changed with c++ leveldb upgrade + Status s = (*(DB**)db)->Get(*(ReadOptions*)options, Slice(key, keylen), tmp); + + if (s.ok()) { + *valptr = (char*)tmp->data(); + *vallen = tmp->size(); + } else { + delete(tmp); + tmp = NULL; + *valptr = NULL; + *vallen = 0; + if (!s.IsNotFound()) { + SaveError(errptr, s); + } + } + return tmp; +} + +void leveldb_get_free_ext(void* context) { + std::string* s = (std::string*)context; + + delete(s); +} + + +unsigned char leveldb_iter_seek_to_first_ext(leveldb_iterator_t* iter) { + leveldb_iter_seek_to_first(iter); + return leveldb_iter_valid(iter); +} + +unsigned char leveldb_iter_seek_to_last_ext(leveldb_iterator_t* iter) { + leveldb_iter_seek_to_last(iter); + return leveldb_iter_valid(iter); +} + +unsigned char leveldb_iter_seek_ext(leveldb_iterator_t* iter, const char* k, size_t klen) { + leveldb_iter_seek(iter, k, klen); + return leveldb_iter_valid(iter); +} + +unsigned char leveldb_iter_next_ext(leveldb_iterator_t* iter) { + leveldb_iter_next(iter); + return leveldb_iter_valid(iter); +} + +unsigned char leveldb_iter_prev_ext(leveldb_iterator_t* iter) { + leveldb_iter_prev(iter); + return leveldb_iter_valid(iter); +} + + +} \ No newline at end of file diff --git a/leveldb/leveldb_ext.h b/leveldb/leveldb_ext.h new file mode 100644 index 0000000..8b26010 --- /dev/null +++ b/leveldb/leveldb_ext.h @@ -0,0 +1,38 @@ +#ifndef LEVELDB_EXT_H +#define LEVELDB_EXT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "leveldb/c.h" + + +/* Returns NULL if not found. Otherwise stores the value in **valptr. + Stores the length of the value in *vallen. + Returns a context must be later to free*/ +extern void* leveldb_get_ext( + leveldb_t* db, + const leveldb_readoptions_t* options, + const char* key, size_t keylen, + char** valptr, + size_t* vallen, + char** errptr); + +// Free context returns by leveldb_get_ext +extern void leveldb_get_free_ext(void* context); + + +// Below iterator functions like leveldb iterator but returns valid status for iterator +extern unsigned char leveldb_iter_seek_to_first_ext(leveldb_iterator_t*); +extern unsigned char leveldb_iter_seek_to_last_ext(leveldb_iterator_t*); +extern unsigned char leveldb_iter_seek_ext(leveldb_iterator_t*, const char* k, size_t klen); +extern unsigned char leveldb_iter_next_ext(leveldb_iterator_t*); +extern unsigned char leveldb_iter_prev_ext(leveldb_iterator_t*); + + +#ifdef __cplusplus +} +#endif + +#endif \ No newline at end of file diff --git a/leveldb/snapshot.go b/leveldb/snapshot.go index b34f2a2..82c7363 100644 --- a/leveldb/snapshot.go +++ b/leveldb/snapshot.go @@ -22,7 +22,11 @@ func (s *Snapshot) Close() { } func (s *Snapshot) Get(key []byte) ([]byte, error) { - return s.db.get(s.readOpts, key) + return s.db.get(nil, s.readOpts, key) +} + +func (s *Snapshot) BufGet(r []byte, key []byte) ([]byte, error) { + return s.db.get(r, s.readOpts, key) } func (s *Snapshot) NewIterator() *Iterator {