diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 1ad04ae..ab1bbab 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -1,6 +1,6 @@ { "ImportPath": "github.com/siddontang/ledisdb", - "GoVersion": "go1.3.2", + "GoVersion": "go1.3.3", "Packages": [ "./..." ], @@ -16,27 +16,35 @@ }, { "ImportPath": "github.com/siddontang/go/bson", - "Rev": "466d5bc779ad45f5923d0f59efbc5d696bf2099c" + "Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7" }, { "ImportPath": "github.com/siddontang/go/filelock", - "Rev": "466d5bc779ad45f5923d0f59efbc5d696bf2099c" + "Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7" }, { "ImportPath": "github.com/siddontang/go/hack", - "Rev": "466d5bc779ad45f5923d0f59efbc5d696bf2099c" + "Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7" + }, + { + "ImportPath": "github.com/siddontang/go/ioutil2", + "Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7" }, { "ImportPath": "github.com/siddontang/go/log", - "Rev": "466d5bc779ad45f5923d0f59efbc5d696bf2099c" + "Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7" }, { "ImportPath": "github.com/siddontang/go/num", - "Rev": "466d5bc779ad45f5923d0f59efbc5d696bf2099c" + "Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7" }, { "ImportPath": "github.com/siddontang/go/snappy", - "Rev": "466d5bc779ad45f5923d0f59efbc5d696bf2099c" + "Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7" + }, + { + "ImportPath": "github.com/siddontang/go/sync2", + "Rev": "ada3cebe1055442f2af2840ed42cd50f64bbc6f7" }, { "ImportPath": "github.com/siddontang/goleveldb/leveldb", diff --git a/bootstrap.sh b/bootstrap.sh index ca844b2..ae18f27 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -25,3 +25,4 @@ go get github.com/siddontang/go/log go get github.com/siddontang/go/snappy go get github.com/siddontang/go/num go get github.com/siddontang/go/filelock +go get github.com/siddontang/go/sync2 \ No newline at end of file diff --git a/doc/commands.md b/doc/commands.md index 4c9f3bc..4a36760 100644 --- a/doc/commands.md +++ b/doc/commands.md @@ -2566,15 +2566,7 @@ Very dangerous to use!!! Return information and statistic about the server in a format that is simple to parse by computers and easy to read by humans. -The optional parameter can be used to select a specific section of information: - -+ server: General information about the Redis server -+ client: Client connections section -+ mem: Memory consumption related information -+ goroutine: Goroutine num -+ persistence: Strorage related information - -When no parameter is provided, all will return. +The optional parameter can be used to select a specific section of information. When no parameter is provided, all will return. ### TIME diff --git a/ledis/batch.go b/ledis/batch.go index 61d5cd2..f5fe061 100644 --- a/ledis/batch.go +++ b/ledis/batch.go @@ -10,7 +10,7 @@ import ( type batch struct { l *Ledis - store.WriteBatch + *store.WriteBatch sync.Locker @@ -88,7 +88,7 @@ type multiBatchLocker struct { func (l *multiBatchLocker) Lock() {} func (l *multiBatchLocker) Unlock() {} -func (l *Ledis) newBatch(wb store.WriteBatch, locker sync.Locker, tx *Tx) *batch { +func (l *Ledis) newBatch(wb *store.WriteBatch, locker sync.Locker, tx *Tx) *batch { b := new(batch) b.l = l b.WriteBatch = wb diff --git a/ledis/ledis.go b/ledis/ledis.go index 75caedd..10fa49c 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -26,7 +26,7 @@ type Ledis struct { //for replication r *rpl.Replication rc chan struct{} - rbatch store.WriteBatch + rbatch *store.WriteBatch rwg sync.WaitGroup rhs []NewLogEventHandler @@ -198,3 +198,7 @@ func (l *Ledis) onDataExpired() { } } + +func (l *Ledis) StoreStat() *store.Stat { + return l.ldb.Stat() +} diff --git a/ledis/ledis_db.go b/ledis/ledis_db.go index 6a8eb9c..c24e295 100644 --- a/ledis/ledis_db.go +++ b/ledis/ledis_db.go @@ -14,7 +14,7 @@ type ibucket interface { NewIterator() *store.Iterator - NewWriteBatch() store.WriteBatch + NewWriteBatch() *store.WriteBatch RangeIterator(min []byte, max []byte, rangeType uint8) *store.RangeLimitIterator RevRangeIterator(min []byte, max []byte, rangeType uint8) *store.RangeLimitIterator diff --git a/server/info.go b/server/info.go index 2c729ed..8c4b00a 100644 --- a/server/info.go +++ b/server/info.go @@ -55,6 +55,7 @@ func newInfo(app *App) (i *info, err error) { func (i *info) addClients(delta int64) { atomic.AddInt64(&i.Clients.ConnectedClients, delta) } + func (i *info) Close() { } @@ -82,10 +83,8 @@ func (i *info) Dump(section string) []byte { i.dumpClients(buf) case "mem": i.dumpMem(buf) - case "persistence": - i.dumpPersistence(buf) - case "goroutine": - i.dumpGoroutine(buf) + case "store": + i.dumpStore(buf) case "replication": i.dumpReplication(buf) default: @@ -103,14 +102,12 @@ type infoPair struct { func (i *info) dumpAll(buf *bytes.Buffer) { i.dumpServer(buf) buf.Write(Delims) - i.dumpPersistence(buf) + i.dumpStore(buf) buf.Write(Delims) i.dumpClients(buf) buf.Write(Delims) i.dumpMem(buf) buf.Write(Delims) - i.dumpGoroutine(buf) - buf.Write(Delims) i.dumpReplication(buf) } @@ -121,7 +118,9 @@ func (i *info) dumpServer(buf *bytes.Buffer) { infoPair{"process_id", i.Server.ProceessId}, infoPair{"addr", i.app.cfg.Addr}, infoPair{"http_addr", i.app.cfg.HttpAddr}, - infoPair{"readonly", i.app.cfg.Readonly}) + infoPair{"readonly", i.app.cfg.Readonly}, + infoPair{"goroutine_num", runtime.NumGoroutine()}, + ) } func (i *info) dumpClients(buf *bytes.Buffer) { @@ -140,16 +139,21 @@ func (i *info) dumpMem(buf *bytes.Buffer) { infoPair{"mem_alloc_human", getMemoryHuman(mem.Alloc)}) } -func (i *info) dumpGoroutine(buf *bytes.Buffer) { - buf.WriteString("# Goroutine\r\n") +func (i *info) dumpStore(buf *bytes.Buffer) { + buf.WriteString("# Store\r\n") - i.dumpPairs(buf, infoPair{"goroutine_num", runtime.NumGoroutine()}) -} + s := i.app.ldb.StoreStat() -func (i *info) dumpPersistence(buf *bytes.Buffer) { - buf.WriteString("# Persistence\r\n") - - i.dumpPairs(buf, infoPair{"db_name", i.Persistence.DBName}) + i.dumpPairs(buf, infoPair{"name", i.Persistence.DBName}, + infoPair{"get", s.GetNum}, + infoPair{"get_missing", s.GetMissingNum}, + infoPair{"put", s.PutNum}, + infoPair{"delete", s.DeleteNum}, + infoPair{"iter", s.IterNum}, + infoPair{"iter_seek", s.IterSeekNum}, + infoPair{"iter_close", s.IterCloseNum}, + infoPair{"batch_commit", s.BatchCommitNum}, + ) } func (i *info) dumpReplication(buf *bytes.Buffer) { diff --git a/server/snapshot.go b/server/snapshot.go index 7240c7a..67192ea 100644 --- a/server/snapshot.go +++ b/server/snapshot.go @@ -190,7 +190,7 @@ func (s *snapshotStore) Create(d snapshotDumper) (*snapshot, time.Time, error) { if len(s.names) > 0 { lastTime, _ := parseSnapshotName(s.names[len(s.names)-1]) - if !now.After(lastTime) { + if now.Nanosecond() <= lastTime.Nanosecond() { return nil, time.Time{}, fmt.Errorf("create snapshot file time %s is behind %s ", now.Format(snapshotTimeFormat), lastTime.Format(snapshotTimeFormat)) } diff --git a/server/snapshot_test.go b/server/snapshot_test.go index d1e9230..e3ff625 100644 --- a/server/snapshot_test.go +++ b/server/snapshot_test.go @@ -21,6 +21,7 @@ func TestSnapshot(t *testing.T) { cfg := config.NewConfigDefault() cfg.Snapshot.MaxNum = 2 cfg.Snapshot.Path = path.Join(os.TempDir(), "snapshot") + defer os.RemoveAll(cfg.Snapshot.Path) d := new(testSnapshotDumper) diff --git a/store/db.go b/store/db.go index 99e6ed2..c238016 100644 --- a/store/db.go +++ b/store/db.go @@ -2,11 +2,14 @@ package store import ( "github.com/siddontang/ledisdb/store/driver" + "time" ) type DB struct { driver.IDB name string + + st *Stat } func (db *DB) String() string { @@ -14,28 +17,71 @@ func (db *DB) String() string { } func (db *DB) NewIterator() *Iterator { + db.st.IterNum.Add(1) + it := new(Iterator) it.it = db.IDB.NewIterator() + it.st = db.st return it } -func (db *DB) NewWriteBatch() WriteBatch { - return db.IDB.NewWriteBatch() +func (db *DB) Get(key []byte) ([]byte, error) { + v, err := db.IDB.Get(key) + db.st.statGet(v, err) + return v, err +} + +func (db *DB) Put(key []byte, value []byte) error { + db.st.PutNum.Add(1) + return db.IDB.Put(key, value) +} + +func (db *DB) Delete(key []byte) error { + db.st.DeleteNum.Add(1) + return db.IDB.Delete(key) +} + +func (db *DB) SyncPut(key []byte, value []byte) error { + db.st.SyncPutNum.Add(1) + return db.IDB.SyncPut(key, value) +} + +func (db *DB) SyncDelete(key []byte) error { + db.st.SyncDeleteNum.Add(1) + return db.IDB.SyncDelete(key) +} + +func (db *DB) NewWriteBatch() *WriteBatch { + db.st.BatchNum.Add(1) + wb := new(WriteBatch) + wb.IWriteBatch = db.IDB.NewWriteBatch() + wb.st = db.st + return wb } func (db *DB) NewSnapshot() (*Snapshot, error) { + db.st.SnapshotNum.Add(1) + var err error s := &Snapshot{} if s.ISnapshot, err = db.IDB.NewSnapshot(); err != nil { return nil, err } + s.st = db.st return s, nil } func (db *DB) Compact() error { - return db.IDB.Compact() + db.st.CompactNum.Add(1) + + t := time.Now() + err := db.IDB.Compact() + + db.st.CompactTotalTime.Add(time.Now().Sub(t)) + + return err } func (db *DB) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { @@ -66,5 +112,11 @@ func (db *DB) Begin() (*Tx, error) { return nil, err } - return &Tx{tx}, nil + db.st.TxNum.Add(1) + + return &Tx{tx, db.st}, nil +} + +func (db *DB) Stat() *Stat { + return db.st } diff --git a/store/iterator.go b/store/iterator.go index ff2536d..d81976b 100644 --- a/store/iterator.go +++ b/store/iterator.go @@ -40,6 +40,7 @@ type Limit struct { type Iterator struct { it driver.IIterator + st *Stat } // Returns a copy of key. @@ -105,6 +106,7 @@ func (it *Iterator) BufValue(b []byte) []byte { func (it *Iterator) Close() { if it.it != nil { + it.st.IterCloseNum.Add(1) it.it.Close() it.it = nil } @@ -115,22 +117,27 @@ func (it *Iterator) Valid() bool { } func (it *Iterator) Next() { + it.st.IterSeekNum.Add(1) it.it.Next() } func (it *Iterator) Prev() { + it.st.IterSeekNum.Add(1) it.it.Prev() } func (it *Iterator) SeekToFirst() { + it.st.IterSeekNum.Add(1) it.it.First() } func (it *Iterator) SeekToLast() { + it.st.IterSeekNum.Add(1) it.it.Last() } func (it *Iterator) Seek(key []byte) { + it.st.IterSeekNum.Add(1) it.it.Seek(key) } diff --git a/store/snapshot.go b/store/snapshot.go index 3f7538a..80524ce 100644 --- a/store/snapshot.go +++ b/store/snapshot.go @@ -6,11 +6,26 @@ import ( type Snapshot struct { driver.ISnapshot + st *Stat } func (s *Snapshot) NewIterator() *Iterator { it := new(Iterator) it.it = s.ISnapshot.NewIterator() + it.st = s.st + + s.st.IterNum.Add(1) return it } + +func (s *Snapshot) Get(key []byte) ([]byte, error) { + v, err := s.ISnapshot.Get(key) + s.st.statGet(v, err) + return v, err +} + +func (s *Snapshot) Close() { + s.st.SnapshotCloseNum.Add(1) + s.ISnapshot.Close() +} diff --git a/store/stat.go b/store/stat.go new file mode 100644 index 0000000..75436fe --- /dev/null +++ b/store/stat.go @@ -0,0 +1,57 @@ +package store + +import ( + "github.com/siddontang/go/sync2" +) + +type Stat struct { + GetNum sync2.AtomicInt64 + GetMissingNum sync2.AtomicInt64 + PutNum sync2.AtomicInt64 + DeleteNum sync2.AtomicInt64 + SyncPutNum sync2.AtomicInt64 + SyncDeleteNum sync2.AtomicInt64 + IterNum sync2.AtomicInt64 + IterSeekNum sync2.AtomicInt64 + IterCloseNum sync2.AtomicInt64 + SnapshotNum sync2.AtomicInt64 + SnapshotCloseNum sync2.AtomicInt64 + BatchNum sync2.AtomicInt64 + BatchCommitNum sync2.AtomicInt64 + BatchSyncCommitNum sync2.AtomicInt64 + TxNum sync2.AtomicInt64 + TxCommitNum sync2.AtomicInt64 + TxCloseNum sync2.AtomicInt64 + CompactNum sync2.AtomicInt64 + CompactTotalTime sync2.AtomicDuration +} + +func (st *Stat) statGet(v []byte, err error) { + st.GetNum.Add(1) + if v == nil && err == nil { + st.GetMissingNum.Add(1) + } +} + +func (st *Stat) Reset() { + *st = Stat{} + // st.GetNum.Set(0) + // st.GetMissingNum.Set(0) + // st.PutNum.Set(0) + // st.DeleteNum.Set(0) + // st.SyncPutNum.Set(0) + // st.SyncDeleteNum.Set(0) + // st.IterNum.Set(0) + // st.IterSeekNum.Set(0) + // st.IterCloseNum.Set(0) + // st.SnapshotNum.Set(0) + // st.SnapshotCloseNum.Set(0) + // st.BatchNum.Set(0) + // st.BatchCommitNum.Set(0) + // st.BatchSyncCommitNum.Set(0) + // st.TxNum.Set(0) + // st.TxCommitNum.Set(0) + // st.TxCloseNum.Set(0) + // st.CompactNum.Set(0) + // st.CompactTotalTime.Set(0) +} diff --git a/store/store.go b/store/store.go index 2edde30..d882e48 100644 --- a/store/store.go +++ b/store/store.go @@ -40,7 +40,7 @@ func Open(cfg *config.Config) (*DB, error) { return nil, err } - db := &DB{idb, s.String()} + db := &DB{idb, s.String(), &Stat{}} return db, nil } diff --git a/store/tx.go b/store/tx.go index 8ee72a8..494219d 100644 --- a/store/tx.go +++ b/store/tx.go @@ -6,17 +6,26 @@ import ( type Tx struct { driver.Tx + st *Stat } func (tx *Tx) NewIterator() *Iterator { it := new(Iterator) it.it = tx.Tx.NewIterator() + it.st = tx.st + + tx.st.IterNum.Add(1) return it } -func (tx *Tx) NewWriteBatch() WriteBatch { - return tx.Tx.NewWriteBatch() +func (tx *Tx) NewWriteBatch() *WriteBatch { + tx.st.BatchNum.Add(1) + + wb := new(WriteBatch) + wb.IWriteBatch = tx.Tx.NewWriteBatch() + wb.st = tx.st + return wb } func (tx *Tx) RangeIterator(min []byte, max []byte, rangeType uint8) *RangeLimitIterator { @@ -40,3 +49,28 @@ func (tx *Tx) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset func (tx *Tx) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator { return NewRevRangeLimitIterator(tx.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count}) } + +func (tx *Tx) Get(key []byte) ([]byte, error) { + v, err := tx.Tx.Get(key) + tx.st.statGet(v, err) + return v, err +} + +func (tx *Tx) Put(key []byte, value []byte) error { + tx.st.PutNum.Add(1) + return tx.Tx.Put(key, value) +} + +func (tx *Tx) Delete(key []byte) error { + tx.st.DeleteNum.Add(1) + return tx.Tx.Delete(key) +} + +func (tx *Tx) Commit() error { + tx.st.TxCommitNum.Add(1) + return tx.Tx.Commit() +} + +func (tx *Tx) Rollback() error { + return tx.Tx.Rollback() +} diff --git a/store/writebatch.go b/store/writebatch.go index 9fe21ac..e1235df 100644 --- a/store/writebatch.go +++ b/store/writebatch.go @@ -4,6 +4,41 @@ import ( "github.com/siddontang/ledisdb/store/driver" ) -type WriteBatch interface { +type WriteBatch struct { driver.IWriteBatch + st *Stat + putNum int64 + deleteNum int64 +} + +func (wb *WriteBatch) Put(key []byte, value []byte) { + wb.putNum++ + wb.IWriteBatch.Put(key, value) +} + +func (wb *WriteBatch) Delete(key []byte) { + wb.deleteNum++ + wb.IWriteBatch.Delete(key) +} + +func (wb *WriteBatch) Commit() error { + wb.st.BatchCommitNum.Add(1) + wb.st.PutNum.Add(wb.putNum) + wb.st.DeleteNum.Add(wb.deleteNum) + wb.putNum = 0 + wb.deleteNum = 0 + return wb.IWriteBatch.Commit() +} + +func (wb *WriteBatch) SyncCommit() error { + wb.st.BatchSyncCommitNum.Add(1) + wb.st.SyncPutNum.Add(wb.putNum) + wb.st.SyncDeleteNum.Add(wb.deleteNum) + wb.putNum = 0 + wb.deleteNum = 0 + return wb.IWriteBatch.SyncCommit() +} + +func (wb *WriteBatch) Rollback() error { + return wb.IWriteBatch.Rollback() }