mirror of https://github.com/ledisdb/ledisdb.git
226 lines
3.4 KiB
Go
226 lines
3.4 KiB
Go
package rpl
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ledisdb/ledisdb/config"
|
|
"github.com/ledisdb/ledisdb/store"
|
|
"github.com/siddontang/go/num"
|
|
)
|
|
|
|
type GoLevelDBStore struct {
|
|
LogStore
|
|
|
|
m sync.Mutex
|
|
db *store.DB
|
|
|
|
cfg *config.Config
|
|
|
|
first uint64
|
|
last uint64
|
|
|
|
buf bytes.Buffer
|
|
}
|
|
|
|
func (s *GoLevelDBStore) FirstID() (uint64, error) {
|
|
s.m.Lock()
|
|
id, err := s.firstID()
|
|
s.m.Unlock()
|
|
|
|
return id, err
|
|
}
|
|
|
|
func (s *GoLevelDBStore) LastID() (uint64, error) {
|
|
s.m.Lock()
|
|
id, err := s.lastID()
|
|
s.m.Unlock()
|
|
|
|
return id, err
|
|
}
|
|
|
|
func (s *GoLevelDBStore) firstID() (uint64, error) {
|
|
if s.first != InvalidLogID {
|
|
return s.first, nil
|
|
}
|
|
|
|
it := s.db.NewIterator()
|
|
defer it.Close()
|
|
|
|
it.SeekToFirst()
|
|
|
|
if it.Valid() {
|
|
s.first = num.BytesToUint64(it.RawKey())
|
|
}
|
|
|
|
return s.first, nil
|
|
}
|
|
|
|
func (s *GoLevelDBStore) lastID() (uint64, error) {
|
|
if s.last != InvalidLogID {
|
|
return s.last, nil
|
|
}
|
|
|
|
it := s.db.NewIterator()
|
|
defer it.Close()
|
|
|
|
it.SeekToLast()
|
|
|
|
if it.Valid() {
|
|
s.last = num.BytesToUint64(it.RawKey())
|
|
}
|
|
|
|
return s.last, nil
|
|
}
|
|
|
|
func (s *GoLevelDBStore) GetLog(id uint64, log *Log) error {
|
|
v, err := s.db.Get(num.Uint64ToBytes(id))
|
|
if err != nil {
|
|
return err
|
|
} else if v == nil {
|
|
return ErrLogNotFound
|
|
} else {
|
|
return log.Decode(bytes.NewBuffer(v))
|
|
}
|
|
}
|
|
|
|
func (s *GoLevelDBStore) StoreLog(log *Log) error {
|
|
s.m.Lock()
|
|
defer s.m.Unlock()
|
|
|
|
last, err := s.lastID()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.last = InvalidLogID
|
|
|
|
s.buf.Reset()
|
|
|
|
if log.ID != last+1 {
|
|
return ErrStoreLogID
|
|
}
|
|
|
|
last = log.ID
|
|
key := num.Uint64ToBytes(log.ID)
|
|
|
|
if err := log.Encode(&s.buf); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err = s.db.Put(key, s.buf.Bytes()); err != nil {
|
|
return err
|
|
}
|
|
|
|
s.last = last
|
|
return nil
|
|
}
|
|
|
|
func (s *GoLevelDBStore) PurgeExpired(n int64) error {
|
|
if n <= 0 {
|
|
return fmt.Errorf("invalid expired time %d", n)
|
|
}
|
|
|
|
t := uint32(time.Now().Unix() - int64(n))
|
|
|
|
s.m.Lock()
|
|
defer s.m.Unlock()
|
|
|
|
s.reset()
|
|
|
|
it := s.db.NewIterator()
|
|
it.SeekToFirst()
|
|
|
|
w := s.db.NewWriteBatch()
|
|
defer w.Rollback()
|
|
|
|
l := new(Log)
|
|
for ; it.Valid(); it.Next() {
|
|
v := it.RawValue()
|
|
|
|
if err := l.Unmarshal(v); err != nil {
|
|
return err
|
|
} else if l.CreateTime > t {
|
|
break
|
|
} else {
|
|
w.Delete(it.RawKey())
|
|
}
|
|
}
|
|
|
|
if err := w.Commit(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *GoLevelDBStore) Sync() error {
|
|
//no other way for sync, so ignore here
|
|
return nil
|
|
}
|
|
|
|
func (s *GoLevelDBStore) reset() {
|
|
s.first = InvalidLogID
|
|
s.last = InvalidLogID
|
|
}
|
|
|
|
func (s *GoLevelDBStore) Clear() error {
|
|
s.m.Lock()
|
|
defer s.m.Unlock()
|
|
|
|
if s.db != nil {
|
|
s.db.Close()
|
|
}
|
|
|
|
s.reset()
|
|
os.RemoveAll(s.cfg.DBPath)
|
|
|
|
return s.open()
|
|
}
|
|
|
|
func (s *GoLevelDBStore) Close() error {
|
|
s.m.Lock()
|
|
defer s.m.Unlock()
|
|
|
|
if s.db == nil {
|
|
return nil
|
|
}
|
|
|
|
err := s.db.Close()
|
|
s.db = nil
|
|
return err
|
|
}
|
|
|
|
func (s *GoLevelDBStore) open() error {
|
|
var err error
|
|
|
|
s.first = InvalidLogID
|
|
s.last = InvalidLogID
|
|
|
|
s.db, err = store.Open(s.cfg)
|
|
return err
|
|
}
|
|
|
|
func NewGoLevelDBStore(base string, syncLog int) (*GoLevelDBStore, error) {
|
|
cfg := config.NewConfigDefault()
|
|
cfg.DBName = "goleveldb"
|
|
cfg.DBPath = base
|
|
cfg.LevelDB.BlockSize = 16 * 1024 * 1024
|
|
cfg.LevelDB.CacheSize = 64 * 1024 * 1024
|
|
cfg.LevelDB.WriteBufferSize = 64 * 1024 * 1024
|
|
cfg.LevelDB.Compression = false
|
|
cfg.DBSyncCommit = syncLog
|
|
|
|
s := new(GoLevelDBStore)
|
|
s.cfg = cfg
|
|
|
|
if err := s.open(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return s, nil
|
|
}
|