forked from mirror/ledisdb
Merge branch 'develop'
This commit is contained in:
commit
d161d9247d
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"ImportPath": "github.com/siddontang/ledisdb",
|
||||
"GoVersion": "go1.3.2",
|
||||
"GoVersion": "go1.3.3",
|
||||
"Packages": [
|
||||
"./..."
|
||||
],
|
||||
|
@ -16,27 +16,35 @@
|
|||
},
|
||||
{
|
||||
"ImportPath": "github.com/siddontang/go/bson",
|
||||
"Rev": "466d5bc779ad45f5923d0f59efbc5d696bf2099c"
|
||||
"Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/siddontang/go/filelock",
|
||||
"Rev": "466d5bc779ad45f5923d0f59efbc5d696bf2099c"
|
||||
"Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/siddontang/go/hack",
|
||||
"Rev": "466d5bc779ad45f5923d0f59efbc5d696bf2099c"
|
||||
"Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/siddontang/go/ioutil2",
|
||||
"Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/siddontang/go/log",
|
||||
"Rev": "466d5bc779ad45f5923d0f59efbc5d696bf2099c"
|
||||
"Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/siddontang/go/num",
|
||||
"Rev": "466d5bc779ad45f5923d0f59efbc5d696bf2099c"
|
||||
"Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/siddontang/go/snappy",
|
||||
"Rev": "466d5bc779ad45f5923d0f59efbc5d696bf2099c"
|
||||
"Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/siddontang/go/sync2",
|
||||
"Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/siddontang/goleveldb/leveldb",
|
||||
|
|
|
@ -25,3 +25,4 @@ go get github.com/siddontang/go/log
|
|||
go get github.com/siddontang/go/snappy
|
||||
go get github.com/siddontang/go/num
|
||||
go get github.com/siddontang/go/filelock
|
||||
go get github.com/siddontang/go/sync2
|
|
@ -2566,15 +2566,7 @@ Very dangerous to use!!!
|
|||
|
||||
Return information and statistic about the server in a format that is simple to parse by computers and easy to read by humans.
|
||||
|
||||
The optional parameter can be used to select a specific section of information:
|
||||
|
||||
+ server: General information about the Redis server
|
||||
+ client: Client connections section
|
||||
+ mem: Memory consumption related information
|
||||
+ goroutine: Goroutine num
|
||||
+ persistence: Strorage related information
|
||||
|
||||
When no parameter is provided, all will return.
|
||||
The optional parameter can be used to select a specific section of information. When no parameter is provided, all will return.
|
||||
|
||||
### TIME
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@ import (
|
|||
type batch struct {
|
||||
l *Ledis
|
||||
|
||||
store.WriteBatch
|
||||
*store.WriteBatch
|
||||
|
||||
sync.Locker
|
||||
|
||||
|
@ -88,7 +88,7 @@ type multiBatchLocker struct {
|
|||
func (l *multiBatchLocker) Lock() {}
|
||||
func (l *multiBatchLocker) Unlock() {}
|
||||
|
||||
func (l *Ledis) newBatch(wb store.WriteBatch, locker sync.Locker, tx *Tx) *batch {
|
||||
func (l *Ledis) newBatch(wb *store.WriteBatch, locker sync.Locker, tx *Tx) *batch {
|
||||
b := new(batch)
|
||||
b.l = l
|
||||
b.WriteBatch = wb
|
||||
|
|
|
@ -26,7 +26,7 @@ type Ledis struct {
|
|||
//for replication
|
||||
r *rpl.Replication
|
||||
rc chan struct{}
|
||||
rbatch store.WriteBatch
|
||||
rbatch *store.WriteBatch
|
||||
rwg sync.WaitGroup
|
||||
rhs []NewLogEventHandler
|
||||
|
||||
|
@ -198,3 +198,7 @@ func (l *Ledis) onDataExpired() {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
func (l *Ledis) StoreStat() *store.Stat {
|
||||
return l.ldb.Stat()
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@ type ibucket interface {
|
|||
|
||||
NewIterator() *store.Iterator
|
||||
|
||||
NewWriteBatch() store.WriteBatch
|
||||
NewWriteBatch() *store.WriteBatch
|
||||
|
||||
RangeIterator(min []byte, max []byte, rangeType uint8) *store.RangeLimitIterator
|
||||
RevRangeIterator(min []byte, max []byte, rangeType uint8) *store.RangeLimitIterator
|
||||
|
|
|
@ -55,6 +55,7 @@ func newInfo(app *App) (i *info, err error) {
|
|||
func (i *info) addClients(delta int64) {
|
||||
atomic.AddInt64(&i.Clients.ConnectedClients, delta)
|
||||
}
|
||||
|
||||
func (i *info) Close() {
|
||||
|
||||
}
|
||||
|
@ -82,10 +83,8 @@ func (i *info) Dump(section string) []byte {
|
|||
i.dumpClients(buf)
|
||||
case "mem":
|
||||
i.dumpMem(buf)
|
||||
case "persistence":
|
||||
i.dumpPersistence(buf)
|
||||
case "goroutine":
|
||||
i.dumpGoroutine(buf)
|
||||
case "store":
|
||||
i.dumpStore(buf)
|
||||
case "replication":
|
||||
i.dumpReplication(buf)
|
||||
default:
|
||||
|
@ -103,14 +102,12 @@ type infoPair struct {
|
|||
func (i *info) dumpAll(buf *bytes.Buffer) {
|
||||
i.dumpServer(buf)
|
||||
buf.Write(Delims)
|
||||
i.dumpPersistence(buf)
|
||||
i.dumpStore(buf)
|
||||
buf.Write(Delims)
|
||||
i.dumpClients(buf)
|
||||
buf.Write(Delims)
|
||||
i.dumpMem(buf)
|
||||
buf.Write(Delims)
|
||||
i.dumpGoroutine(buf)
|
||||
buf.Write(Delims)
|
||||
i.dumpReplication(buf)
|
||||
}
|
||||
|
||||
|
@ -121,7 +118,9 @@ func (i *info) dumpServer(buf *bytes.Buffer) {
|
|||
infoPair{"process_id", i.Server.ProceessId},
|
||||
infoPair{"addr", i.app.cfg.Addr},
|
||||
infoPair{"http_addr", i.app.cfg.HttpAddr},
|
||||
infoPair{"readonly", i.app.cfg.Readonly})
|
||||
infoPair{"readonly", i.app.cfg.Readonly},
|
||||
infoPair{"goroutine_num", runtime.NumGoroutine()},
|
||||
)
|
||||
}
|
||||
|
||||
func (i *info) dumpClients(buf *bytes.Buffer) {
|
||||
|
@ -140,16 +139,21 @@ func (i *info) dumpMem(buf *bytes.Buffer) {
|
|||
infoPair{"mem_alloc_human", getMemoryHuman(mem.Alloc)})
|
||||
}
|
||||
|
||||
func (i *info) dumpGoroutine(buf *bytes.Buffer) {
|
||||
buf.WriteString("# Goroutine\r\n")
|
||||
func (i *info) dumpStore(buf *bytes.Buffer) {
|
||||
buf.WriteString("# Store\r\n")
|
||||
|
||||
i.dumpPairs(buf, infoPair{"goroutine_num", runtime.NumGoroutine()})
|
||||
}
|
||||
s := i.app.ldb.StoreStat()
|
||||
|
||||
func (i *info) dumpPersistence(buf *bytes.Buffer) {
|
||||
buf.WriteString("# Persistence\r\n")
|
||||
|
||||
i.dumpPairs(buf, infoPair{"db_name", i.Persistence.DBName})
|
||||
i.dumpPairs(buf, infoPair{"name", i.Persistence.DBName},
|
||||
infoPair{"get", s.GetNum},
|
||||
infoPair{"get_missing", s.GetMissingNum},
|
||||
infoPair{"put", s.PutNum},
|
||||
infoPair{"delete", s.DeleteNum},
|
||||
infoPair{"iter", s.IterNum},
|
||||
infoPair{"iter_seek", s.IterSeekNum},
|
||||
infoPair{"iter_close", s.IterCloseNum},
|
||||
infoPair{"batch_commit", s.BatchCommitNum},
|
||||
)
|
||||
}
|
||||
|
||||
func (i *info) dumpReplication(buf *bytes.Buffer) {
|
||||
|
|
|
@ -190,7 +190,7 @@ func (s *snapshotStore) Create(d snapshotDumper) (*snapshot, time.Time, error) {
|
|||
|
||||
if len(s.names) > 0 {
|
||||
lastTime, _ := parseSnapshotName(s.names[len(s.names)-1])
|
||||
if !now.After(lastTime) {
|
||||
if now.Nanosecond() <= lastTime.Nanosecond() {
|
||||
return nil, time.Time{}, fmt.Errorf("create snapshot file time %s is behind %s ",
|
||||
now.Format(snapshotTimeFormat), lastTime.Format(snapshotTimeFormat))
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ func TestSnapshot(t *testing.T) {
|
|||
cfg := config.NewConfigDefault()
|
||||
cfg.Snapshot.MaxNum = 2
|
||||
cfg.Snapshot.Path = path.Join(os.TempDir(), "snapshot")
|
||||
|
||||
defer os.RemoveAll(cfg.Snapshot.Path)
|
||||
|
||||
d := new(testSnapshotDumper)
|
||||
|
|
60
store/db.go
60
store/db.go
|
@ -2,11 +2,14 @@ package store
|
|||
|
||||
import (
|
||||
"github.com/siddontang/ledisdb/store/driver"
|
||||
"time"
|
||||
)
|
||||
|
||||
type DB struct {
|
||||
driver.IDB
|
||||
name string
|
||||
|
||||
st *Stat
|
||||
}
|
||||
|
||||
func (db *DB) String() string {
|
||||
|
@ -14,28 +17,71 @@ func (db *DB) String() string {
|
|||
}
|
||||
|
||||
func (db *DB) NewIterator() *Iterator {
|
||||
db.st.IterNum.Add(1)
|
||||
|
||||
it := new(Iterator)
|
||||
it.it = db.IDB.NewIterator()
|
||||
it.st = db.st
|
||||
|
||||
return it
|
||||
}
|
||||
|
||||
func (db *DB) NewWriteBatch() WriteBatch {
|
||||
return db.IDB.NewWriteBatch()
|
||||
func (db *DB) Get(key []byte) ([]byte, error) {
|
||||
v, err := db.IDB.Get(key)
|
||||
db.st.statGet(v, err)
|
||||
return v, err
|
||||
}
|
||||
|
||||
func (db *DB) Put(key []byte, value []byte) error {
|
||||
db.st.PutNum.Add(1)
|
||||
return db.IDB.Put(key, value)
|
||||
}
|
||||
|
||||
func (db *DB) Delete(key []byte) error {
|
||||
db.st.DeleteNum.Add(1)
|
||||
return db.IDB.Delete(key)
|
||||
}
|
||||
|
||||
func (db *DB) SyncPut(key []byte, value []byte) error {
|
||||
db.st.SyncPutNum.Add(1)
|
||||
return db.IDB.SyncPut(key, value)
|
||||
}
|
||||
|
||||
func (db *DB) SyncDelete(key []byte) error {
|
||||
db.st.SyncDeleteNum.Add(1)
|
||||
return db.IDB.SyncDelete(key)
|
||||
}
|
||||
|
||||
func (db *DB) NewWriteBatch() *WriteBatch {
|
||||
db.st.BatchNum.Add(1)
|
||||
wb := new(WriteBatch)
|
||||
wb.IWriteBatch = db.IDB.NewWriteBatch()
|
||||
wb.st = db.st
|
||||
return wb
|
||||
}
|
||||
|
||||
func (db *DB) NewSnapshot() (*Snapshot, error) {
|
||||
db.st.SnapshotNum.Add(1)
|
||||
|
||||
var err error
|
||||
s := &Snapshot{}
|
||||
if s.ISnapshot, err = db.IDB.NewSnapshot(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.st = db.st
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (db *DB) Compact() error {
|
||||
return db.IDB.Compact()
|
||||
db.st.CompactNum.Add(1)
|
||||
|
||||
t := time.Now()
|
||||
err := db.IDB.Compact()
|
||||
|
||||
db.st.CompactTotalTime.Add(time.Now().Sub(t))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (db *DB) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
|
||||
|
@ -66,5 +112,11 @@ func (db *DB) Begin() (*Tx, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return &Tx{tx}, nil
|
||||
db.st.TxNum.Add(1)
|
||||
|
||||
return &Tx{tx, db.st}, nil
|
||||
}
|
||||
|
||||
func (db *DB) Stat() *Stat {
|
||||
return db.st
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ type Limit struct {
|
|||
|
||||
type Iterator struct {
|
||||
it driver.IIterator
|
||||
st *Stat
|
||||
}
|
||||
|
||||
// Returns a copy of key.
|
||||
|
@ -105,6 +106,7 @@ func (it *Iterator) BufValue(b []byte) []byte {
|
|||
|
||||
func (it *Iterator) Close() {
|
||||
if it.it != nil {
|
||||
it.st.IterCloseNum.Add(1)
|
||||
it.it.Close()
|
||||
it.it = nil
|
||||
}
|
||||
|
@ -115,22 +117,27 @@ func (it *Iterator) Valid() bool {
|
|||
}
|
||||
|
||||
func (it *Iterator) Next() {
|
||||
it.st.IterSeekNum.Add(1)
|
||||
it.it.Next()
|
||||
}
|
||||
|
||||
func (it *Iterator) Prev() {
|
||||
it.st.IterSeekNum.Add(1)
|
||||
it.it.Prev()
|
||||
}
|
||||
|
||||
func (it *Iterator) SeekToFirst() {
|
||||
it.st.IterSeekNum.Add(1)
|
||||
it.it.First()
|
||||
}
|
||||
|
||||
func (it *Iterator) SeekToLast() {
|
||||
it.st.IterSeekNum.Add(1)
|
||||
it.it.Last()
|
||||
}
|
||||
|
||||
func (it *Iterator) Seek(key []byte) {
|
||||
it.st.IterSeekNum.Add(1)
|
||||
it.it.Seek(key)
|
||||
}
|
||||
|
||||
|
|
|
@ -6,11 +6,26 @@ import (
|
|||
|
||||
type Snapshot struct {
|
||||
driver.ISnapshot
|
||||
st *Stat
|
||||
}
|
||||
|
||||
func (s *Snapshot) NewIterator() *Iterator {
|
||||
it := new(Iterator)
|
||||
it.it = s.ISnapshot.NewIterator()
|
||||
it.st = s.st
|
||||
|
||||
s.st.IterNum.Add(1)
|
||||
|
||||
return it
|
||||
}
|
||||
|
||||
func (s *Snapshot) Get(key []byte) ([]byte, error) {
|
||||
v, err := s.ISnapshot.Get(key)
|
||||
s.st.statGet(v, err)
|
||||
return v, err
|
||||
}
|
||||
|
||||
func (s *Snapshot) Close() {
|
||||
s.st.SnapshotCloseNum.Add(1)
|
||||
s.ISnapshot.Close()
|
||||
}
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"github.com/siddontang/go/sync2"
|
||||
)
|
||||
|
||||
type Stat struct {
|
||||
GetNum sync2.AtomicInt64
|
||||
GetMissingNum sync2.AtomicInt64
|
||||
PutNum sync2.AtomicInt64
|
||||
DeleteNum sync2.AtomicInt64
|
||||
SyncPutNum sync2.AtomicInt64
|
||||
SyncDeleteNum sync2.AtomicInt64
|
||||
IterNum sync2.AtomicInt64
|
||||
IterSeekNum sync2.AtomicInt64
|
||||
IterCloseNum sync2.AtomicInt64
|
||||
SnapshotNum sync2.AtomicInt64
|
||||
SnapshotCloseNum sync2.AtomicInt64
|
||||
BatchNum sync2.AtomicInt64
|
||||
BatchCommitNum sync2.AtomicInt64
|
||||
BatchSyncCommitNum sync2.AtomicInt64
|
||||
TxNum sync2.AtomicInt64
|
||||
TxCommitNum sync2.AtomicInt64
|
||||
TxCloseNum sync2.AtomicInt64
|
||||
CompactNum sync2.AtomicInt64
|
||||
CompactTotalTime sync2.AtomicDuration
|
||||
}
|
||||
|
||||
func (st *Stat) statGet(v []byte, err error) {
|
||||
st.GetNum.Add(1)
|
||||
if v == nil && err == nil {
|
||||
st.GetMissingNum.Add(1)
|
||||
}
|
||||
}
|
||||
|
||||
func (st *Stat) Reset() {
|
||||
*st = Stat{}
|
||||
// st.GetNum.Set(0)
|
||||
// st.GetMissingNum.Set(0)
|
||||
// st.PutNum.Set(0)
|
||||
// st.DeleteNum.Set(0)
|
||||
// st.SyncPutNum.Set(0)
|
||||
// st.SyncDeleteNum.Set(0)
|
||||
// st.IterNum.Set(0)
|
||||
// st.IterSeekNum.Set(0)
|
||||
// st.IterCloseNum.Set(0)
|
||||
// st.SnapshotNum.Set(0)
|
||||
// st.SnapshotCloseNum.Set(0)
|
||||
// st.BatchNum.Set(0)
|
||||
// st.BatchCommitNum.Set(0)
|
||||
// st.BatchSyncCommitNum.Set(0)
|
||||
// st.TxNum.Set(0)
|
||||
// st.TxCommitNum.Set(0)
|
||||
// st.TxCloseNum.Set(0)
|
||||
// st.CompactNum.Set(0)
|
||||
// st.CompactTotalTime.Set(0)
|
||||
}
|
|
@ -40,7 +40,7 @@ func Open(cfg *config.Config) (*DB, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
db := &DB{idb, s.String()}
|
||||
db := &DB{idb, s.String(), &Stat{}}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
|
38
store/tx.go
38
store/tx.go
|
@ -6,17 +6,26 @@ import (
|
|||
|
||||
type Tx struct {
|
||||
driver.Tx
|
||||
st *Stat
|
||||
}
|
||||
|
||||
func (tx *Tx) NewIterator() *Iterator {
|
||||
it := new(Iterator)
|
||||
it.it = tx.Tx.NewIterator()
|
||||
it.st = tx.st
|
||||
|
||||
tx.st.IterNum.Add(1)
|
||||
|
||||
return it
|
||||
}
|
||||
|
||||
func (tx *Tx) NewWriteBatch() WriteBatch {
|
||||
return tx.Tx.NewWriteBatch()
|
||||
func (tx *Tx) NewWriteBatch() *WriteBatch {
|
||||
tx.st.BatchNum.Add(1)
|
||||
|
||||
wb := new(WriteBatch)
|
||||
wb.IWriteBatch = tx.Tx.NewWriteBatch()
|
||||
wb.st = tx.st
|
||||
return wb
|
||||
}
|
||||
|
||||
func (tx *Tx) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator {
|
||||
|
@ -40,3 +49,28 @@ func (tx *Tx) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset
|
|||
func (tx *Tx) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
|
||||
return NewRevRangeLimitIterator(tx.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
|
||||
}
|
||||
|
||||
func (tx *Tx) Get(key []byte) ([]byte, error) {
|
||||
v, err := tx.Tx.Get(key)
|
||||
tx.st.statGet(v, err)
|
||||
return v, err
|
||||
}
|
||||
|
||||
func (tx *Tx) Put(key []byte, value []byte) error {
|
||||
tx.st.PutNum.Add(1)
|
||||
return tx.Tx.Put(key, value)
|
||||
}
|
||||
|
||||
func (tx *Tx) Delete(key []byte) error {
|
||||
tx.st.DeleteNum.Add(1)
|
||||
return tx.Tx.Delete(key)
|
||||
}
|
||||
|
||||
func (tx *Tx) Commit() error {
|
||||
tx.st.TxCommitNum.Add(1)
|
||||
return tx.Tx.Commit()
|
||||
}
|
||||
|
||||
func (tx *Tx) Rollback() error {
|
||||
return tx.Tx.Rollback()
|
||||
}
|
||||
|
|
|
@ -4,6 +4,41 @@ import (
|
|||
"github.com/siddontang/ledisdb/store/driver"
|
||||
)
|
||||
|
||||
type WriteBatch interface {
|
||||
type WriteBatch struct {
|
||||
driver.IWriteBatch
|
||||
st *Stat
|
||||
putNum int64
|
||||
deleteNum int64
|
||||
}
|
||||
|
||||
func (wb *WriteBatch) Put(key []byte, value []byte) {
|
||||
wb.putNum++
|
||||
wb.IWriteBatch.Put(key, value)
|
||||
}
|
||||
|
||||
func (wb *WriteBatch) Delete(key []byte) {
|
||||
wb.deleteNum++
|
||||
wb.IWriteBatch.Delete(key)
|
||||
}
|
||||
|
||||
func (wb *WriteBatch) Commit() error {
|
||||
wb.st.BatchCommitNum.Add(1)
|
||||
wb.st.PutNum.Add(wb.putNum)
|
||||
wb.st.DeleteNum.Add(wb.deleteNum)
|
||||
wb.putNum = 0
|
||||
wb.deleteNum = 0
|
||||
return wb.IWriteBatch.Commit()
|
||||
}
|
||||
|
||||
func (wb *WriteBatch) SyncCommit() error {
|
||||
wb.st.BatchSyncCommitNum.Add(1)
|
||||
wb.st.SyncPutNum.Add(wb.putNum)
|
||||
wb.st.SyncDeleteNum.Add(wb.deleteNum)
|
||||
wb.putNum = 0
|
||||
wb.deleteNum = 0
|
||||
return wb.IWriteBatch.SyncCommit()
|
||||
}
|
||||
|
||||
func (wb *WriteBatch) Rollback() error {
|
||||
return wb.IWriteBatch.Rollback()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue