ledisdb/rpl/goleveldb_store.go

226 lines
3.4 KiB
Go

package rpl
import (
"bytes"
"fmt"
"os"
"sync"
"time"
"git.internal/re/ledisdb/config"
"git.internal/re/ledisdb/store"
"github.com/siddontang/go/num"
)
type GoLevelDBStore struct {
LogStore
m sync.Mutex
db *store.DB
cfg *config.Config
first uint64
last uint64
buf bytes.Buffer
}
func (s *GoLevelDBStore) FirstID() (uint64, error) {
s.m.Lock()
id, err := s.firstID()
s.m.Unlock()
return id, err
}
func (s *GoLevelDBStore) LastID() (uint64, error) {
s.m.Lock()
id, err := s.lastID()
s.m.Unlock()
return id, err
}
func (s *GoLevelDBStore) firstID() (uint64, error) {
if s.first != InvalidLogID {
return s.first, nil
}
it := s.db.NewIterator()
defer it.Close()
it.SeekToFirst()
if it.Valid() {
s.first = num.BytesToUint64(it.RawKey())
}
return s.first, nil
}
func (s *GoLevelDBStore) lastID() (uint64, error) {
if s.last != InvalidLogID {
return s.last, nil
}
it := s.db.NewIterator()
defer it.Close()
it.SeekToLast()
if it.Valid() {
s.last = num.BytesToUint64(it.RawKey())
}
return s.last, nil
}
func (s *GoLevelDBStore) GetLog(id uint64, log *Log) error {
v, err := s.db.Get(num.Uint64ToBytes(id))
if err != nil {
return err
} else if v == nil {
return ErrLogNotFound
} else {
return log.Decode(bytes.NewBuffer(v))
}
}
func (s *GoLevelDBStore) StoreLog(log *Log) error {
s.m.Lock()
defer s.m.Unlock()
last, err := s.lastID()
if err != nil {
return err
}
s.last = InvalidLogID
s.buf.Reset()
if log.ID != last+1 {
return ErrStoreLogID
}
last = log.ID
key := num.Uint64ToBytes(log.ID)
if err := log.Encode(&s.buf); err != nil {
return err
}
if err = s.db.Put(key, s.buf.Bytes()); err != nil {
return err
}
s.last = last
return nil
}
func (s *GoLevelDBStore) PurgeExpired(n int64) error {
if n <= 0 {
return fmt.Errorf("invalid expired time %d", n)
}
t := uint32(time.Now().Unix() - int64(n))
s.m.Lock()
defer s.m.Unlock()
s.reset()
it := s.db.NewIterator()
it.SeekToFirst()
w := s.db.NewWriteBatch()
defer w.Rollback()
l := new(Log)
for ; it.Valid(); it.Next() {
v := it.RawValue()
if err := l.Unmarshal(v); err != nil {
return err
} else if l.CreateTime > t {
break
} else {
w.Delete(it.RawKey())
}
}
if err := w.Commit(); err != nil {
return err
}
return nil
}
func (s *GoLevelDBStore) Sync() error {
//no other way for sync, so ignore here
return nil
}
func (s *GoLevelDBStore) reset() {
s.first = InvalidLogID
s.last = InvalidLogID
}
func (s *GoLevelDBStore) Clear() error {
s.m.Lock()
defer s.m.Unlock()
if s.db != nil {
s.db.Close()
}
s.reset()
os.RemoveAll(s.cfg.DBPath)
return s.open()
}
func (s *GoLevelDBStore) Close() error {
s.m.Lock()
defer s.m.Unlock()
if s.db == nil {
return nil
}
err := s.db.Close()
s.db = nil
return err
}
func (s *GoLevelDBStore) open() error {
var err error
s.first = InvalidLogID
s.last = InvalidLogID
s.db, err = store.Open(s.cfg)
return err
}
func NewGoLevelDBStore(base string, syncLog int) (*GoLevelDBStore, error) {
cfg := config.NewConfigDefault()
cfg.DBName = "goleveldb"
cfg.DBPath = base
cfg.LevelDB.BlockSize = 16 * 1024 * 1024
cfg.LevelDB.CacheSize = 64 * 1024 * 1024
cfg.LevelDB.WriteBufferSize = 64 * 1024 * 1024
cfg.LevelDB.Compression = false
cfg.DBSyncCommit = syncLog
s := new(GoLevelDBStore)
s.cfg = cfg
if err := s.open(); err != nil {
return nil, err
}
return s, nil
}