From 39d6db56bd21566b7d8ceafb4e6d277163c05b4a Mon Sep 17 00:00:00 2001 From: siddontang Date: Sat, 13 Sep 2014 23:06:36 +0800 Subject: [PATCH 1/7] change open file and mkdir mode open file use 0644 mkdir use 0755 --- cmd/ledis-dump/main.go | 2 +- ledis/binlog.go | 2 +- server/replication.go | 4 ++-- store/boltdb/db.go | 2 +- store/goleveldb/db.go | 2 +- store/hyperleveldb/db.go | 2 +- store/leveldb/db.go | 2 +- store/rocksdb/db.go | 2 +- store/store.go | 2 +- 9 files changed, 10 insertions(+), 10 deletions(-) diff --git a/cmd/ledis-dump/main.go b/cmd/ledis-dump/main.go index bf02ad7..b6a798d 100644 --- a/cmd/ledis-dump/main.go +++ b/cmd/ledis-dump/main.go @@ -23,7 +23,7 @@ func main() { var err error var f *os.File - if f, err = os.OpenFile(*dumpFile, os.O_CREATE|os.O_WRONLY, os.ModePerm); err != nil { + if f, err = os.OpenFile(*dumpFile, os.O_CREATE|os.O_WRONLY, 0644); err != nil { println(err.Error()) return } diff --git a/ledis/binlog.go b/ledis/binlog.go index 077398a..f26323e 100644 --- a/ledis/binlog.go +++ b/ledis/binlog.go @@ -117,7 +117,7 @@ func NewBinLog(cfg *config.Config) (*BinLog, error) { l.path = path.Join(cfg.DataDir, "binlog") - if err := os.MkdirAll(l.path, os.ModePerm); err != nil { + if err := os.MkdirAll(l.path, 0755); err != nil { return nil, err } diff --git a/server/replication.go b/server/replication.go index 267a29b..445a813 100644 --- a/server/replication.go +++ b/server/replication.go @@ -38,7 +38,7 @@ func (m *MasterInfo) Save(filePath string) error { filePathBak := fmt.Sprintf("%s.bak", filePath) var fd *os.File - fd, err = os.OpenFile(filePathBak, os.O_CREATE|os.O_WRONLY, os.ModePerm) + fd, err = os.OpenFile(filePathBak, os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return err } @@ -250,7 +250,7 @@ func (m *master) fullSync() error { } dumpPath := path.Join(m.app.cfg.DataDir, "master.dump") - f, err := os.OpenFile(dumpPath, os.O_CREATE|os.O_WRONLY, os.ModePerm) + f, err := os.OpenFile(dumpPath, os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return err } diff --git a/store/boltdb/db.go b/store/boltdb/db.go index f2cb1f3..15a0570 100644 --- a/store/boltdb/db.go +++ b/store/boltdb/db.go @@ -18,7 +18,7 @@ func (s Store) String() string { } func (s Store) Open(dbPath string, cfg *config.Config) (driver.IDB, error) { - os.MkdirAll(dbPath, os.ModePerm) + os.MkdirAll(dbPath, 0755) name := path.Join(dbPath, "ledis_bolt.db") db := new(DB) var err error diff --git a/store/goleveldb/db.go b/store/goleveldb/db.go index 2a13f50..dad6a90 100644 --- a/store/goleveldb/db.go +++ b/store/goleveldb/db.go @@ -47,7 +47,7 @@ type DB struct { } func (s Store) Open(path string, cfg *config.Config) (driver.IDB, error) { - if err := os.MkdirAll(path, os.ModePerm); err != nil { + if err := os.MkdirAll(path, 0755); err != nil { return nil, err } diff --git a/store/hyperleveldb/db.go b/store/hyperleveldb/db.go index d6d7aeb..071dd38 100644 --- a/store/hyperleveldb/db.go +++ b/store/hyperleveldb/db.go @@ -28,7 +28,7 @@ func (s Store) String() string { } func (s Store) Open(path string, cfg *config.Config) (driver.IDB, error) { - if err := os.MkdirAll(path, os.ModePerm); err != nil { + if err := os.MkdirAll(path, 0755); err != nil { return nil, err } diff --git a/store/leveldb/db.go b/store/leveldb/db.go index 92a2419..ab29928 100644 --- a/store/leveldb/db.go +++ b/store/leveldb/db.go @@ -28,7 +28,7 @@ func (s Store) String() string { } func (s Store) Open(path string, cfg *config.Config) (driver.IDB, error) { - if err := os.MkdirAll(path, os.ModePerm); err != nil { + if err := 
os.MkdirAll(path, 0755); err != nil { return nil, err } diff --git a/store/rocksdb/db.go b/store/rocksdb/db.go index 1c79229..f3fb406 100644 --- a/store/rocksdb/db.go +++ b/store/rocksdb/db.go @@ -29,7 +29,7 @@ func (s Store) String() string { } func (s Store) Open(path string, cfg *config.Config) (driver.IDB, error) { - if err := os.MkdirAll(path, os.ModePerm); err != nil { + if err := os.MkdirAll(path, 0755); err != nil { return nil, err } diff --git a/store/store.go b/store/store.go index d620c03..aa4b485 100644 --- a/store/store.go +++ b/store/store.go @@ -27,7 +27,7 @@ func Open(cfg *config.Config) (*DB, error) { path := getStorePath(cfg) - if err := os.MkdirAll(path, os.ModePerm); err != nil { + if err := os.MkdirAll(path, 0755); err != nil { return nil, err } From 95cbcc6460d932fee47d1ce5765925c9d81d55cf Mon Sep 17 00:00:00 2001 From: siddontang Date: Mon, 15 Sep 2014 22:42:15 +0800 Subject: [PATCH 2/7] add dbpath config --- config/config.go | 1 + config/config.toml | 3 +++ etc/ledis.conf | 3 +++ store/store.go | 6 +++++- 4 files changed, 12 insertions(+), 1 deletion(-) diff --git a/config/config.go b/config/config.go index ca93d29..1464edf 100644 --- a/config/config.go +++ b/config/config.go @@ -50,6 +50,7 @@ type Config struct { DataDir string `toml:"data_dir"` DBName string `toml:"db_name"` + DBPath string `toml:"db_path"` LevelDB LevelDBConfig `toml:"leveldb"` diff --git a/config/config.toml b/config/config.toml index 2a3a246..f271a70 100644 --- a/config/config.toml +++ b/config/config.toml @@ -27,6 +27,9 @@ slaveof = "" # db_name = "leveldb" +# if not set, use data_dir/"db_name"_data +db_path = "" + [leveldb] compression = false block_size = 32768 diff --git a/etc/ledis.conf b/etc/ledis.conf index d3adbd8..c0606eb 100644 --- a/etc/ledis.conf +++ b/etc/ledis.conf @@ -29,6 +29,9 @@ slaveof = "" # db_name = "leveldb" +# if not set, use data_dir/"db_name"_data +db_path = "" + [leveldb] compression = false block_size = 32768 diff --git a/store/store.go b/store/store.go index aa4b485..2edde30 100644 --- a/store/store.go +++ b/store/store.go @@ -16,7 +16,11 @@ import ( ) func getStorePath(cfg *config.Config) string { - return path.Join(cfg.DataDir, fmt.Sprintf("%s_data", cfg.DBName)) + if len(cfg.DBPath) > 0 { + return cfg.DBPath + } else { + return path.Join(cfg.DataDir, fmt.Sprintf("%s_data", cfg.DBName)) + } } func Open(cfg *config.Config) (*DB, error) { From 63e24376113bd69756205663cd9850929649e9dd Mon Sep 17 00:00:00 2001 From: siddontang Date: Mon, 15 Sep 2014 22:42:25 +0800 Subject: [PATCH 3/7] add base awl package --- wal/file_store.go | 83 +++++++++++++++ wal/gen.go | 137 ++++++++++++++++++++++++ wal/gen_test.go | 48 +++++++++ wal/goleveldb_store.go | 229 +++++++++++++++++++++++++++++++++++++++++ wal/log.go | 73 +++++++++++++ wal/log_test.go | 27 +++++ wal/store_test.go | 137 ++++++++++++++++++++++++ wal/wal.go | 46 +++++++++ 8 files changed, 780 insertions(+) create mode 100644 wal/file_store.go create mode 100644 wal/gen.go create mode 100644 wal/gen_test.go create mode 100644 wal/goleveldb_store.go create mode 100644 wal/log.go create mode 100644 wal/log_test.go create mode 100644 wal/store_test.go create mode 100644 wal/wal.go diff --git a/wal/file_store.go b/wal/file_store.go new file mode 100644 index 0000000..0d39efb --- /dev/null +++ b/wal/file_store.go @@ -0,0 +1,83 @@ +package wal + +import ( + "os" + "sync" +) + +const ( + defaultMaxLogFileSize = 1024 * 1024 * 1024 + defaultMaxLogFileNum = 10 +) + +type FileStore struct { + Store + + m sync.Mutex + + maxFileSize 
int + maxFileNum int + + first uint64 + last uint64 +} + +func NewFileStore(path string) (*FileStore, error) { + s := new(FileStore) + + if err := os.MkdirAll(path, 0755); err != nil { + return nil, err + } + + s.maxFileSize = defaultMaxLogFileSize + s.maxFileNum = defaultMaxLogFileNum + + s.first = 0 + s.last = 0 + + return s, nil +} + +func (s *FileStore) SetMaxFileSize(size int) { + s.maxFileSize = size +} + +func (s *FileStore) SetMaxFileNum(n int) { + s.maxFileNum = n +} + +func (s *FileStore) GetLog(id uint64, log *Log) error { + return nil +} + +func (s *FileStore) SeekLog(id uint64, log *Log) error { + return nil +} + +func (s *FileStore) FirstID() (uint64, error) { + return 0, nil +} + +func (s *FileStore) LastID() (uint64, error) { + return 0, nil +} + +func (s *FileStore) StoreLog(log *Log) error { + return nil +} + +func (s *FileStore) StoreLogs(logs []*Log) error { + return nil +} + +func (s *FileStore) DeleteRange(start, stop uint64) error { + return nil +} + +func (s *FileStore) Clear() error { + return nil +} + +func (s *FileStore) Close() error { + return nil +} diff --git a/wal/gen.go b/wal/gen.go new file mode 100644 index 0000000..9d84938 --- /dev/null +++ b/wal/gen.go @@ -0,0 +1,137 @@ +package wal + +import ( + "encoding/binary" + "fmt" + "os" + "path" + "sync" +) + +type FileIDGenerator struct { + LogIDGenerator + + m sync.Mutex + f *os.File + + id uint64 +} + +func NewFileIDGenerator(base string) (*FileIDGenerator, error) { + if err := os.MkdirAll(base, 0755); err != nil { + return nil, err + } + + g := new(FileIDGenerator) + + name := path.Join(base, "log.id") + + var err error + if g.f, err = os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0644); err != nil { + return nil, err + } + + s, _ := g.f.Stat() + if s.Size() == 0 { + g.id = 0 + } else if s.Size() == 8 { + if err = binary.Read(g.f, binary.BigEndian, &g.id); err != nil { + g.f.Close() + return nil, err + } else if g.id == InvalidLogID { + g.f.Close() + return nil, fmt.Errorf("read invalid log id in %s", name) + } + } else { + g.f.Close() + return nil, fmt.Errorf("log id file %s is invalid", name) + } + + return g, nil +} + +func (g *FileIDGenerator) Reset(id uint64) error { + g.m.Lock() + defer g.m.Unlock() + + if g.f == nil { + return fmt.Errorf("generator closed") + } + + if g.id < id { + g.id = id + } + + return nil +} + +func (g *FileIDGenerator) GenerateID() (uint64, error) { + g.m.Lock() + defer g.m.Unlock() + + if g.f == nil { + return 0, fmt.Errorf("generator closed") + } + + if _, err := g.f.Seek(0, os.SEEK_SET); err != nil { + return 0, nil + } + + id := g.id + 1 + + if err := binary.Write(g.f, binary.BigEndian, id); err != nil { + return 0, nil + } + + g.id = id + + return id, nil +} + +func (g *FileIDGenerator) Close() error { + g.m.Lock() + defer g.m.Unlock() + + if g.f != nil { + err := g.f.Close() + g.f = nil + return err + } + return nil +} + +type MemIDGenerator struct { + m sync.Mutex + + LogIDGenerator + + id uint64 +} + +func NewMemIDGenerator(baseID uint64) *MemIDGenerator { + g := &MemIDGenerator{id: baseID} + return g +} + +func (g *MemIDGenerator) Reset(id uint64) error { + g.m.Lock() + defer g.m.Unlock() + + if g.id < id { + g.id = id + } + return nil +} + +func (g *MemIDGenerator) GenerateID() (uint64, error) { + g.m.Lock() + defer g.m.Unlock() + + g.id++ + id := g.id + return id, nil +} + +func (g *MemIDGenerator) Close() error { + return nil +} diff --git a/wal/gen_test.go b/wal/gen_test.go new file mode 100644 index 0000000..2f60999 --- /dev/null +++ b/wal/gen_test.go @@ -0,0 +1,48 @@ 
+package wal + +import ( + "io/ioutil" + "os" + "testing" +) + +func testGenerator(t *testing.T, g LogIDGenerator, base uint64) { + for i := base; i < base+100; i++ { + id, err := g.GenerateID() + if err != nil { + t.Fatal(err) + } else if id != i { + t.Fatal(id, i) + } + } +} + +func TestGenerator(t *testing.T) { + base, err := ioutil.TempDir("", "wal") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(base) + + var g *FileIDGenerator + if g, err = NewFileIDGenerator(base); err != nil { + t.Fatal(err) + } else { + testGenerator(t, g, 1) + if err = g.Close(); err != nil { + t.Fatal(err) + } + } + + if g, err = NewFileIDGenerator(base); err != nil { + t.Fatal(err) + } else { + testGenerator(t, g, 101) + if err = g.Close(); err != nil { + t.Fatal(err) + } + } + + m := NewMemIDGenerator(100) + testGenerator(t, m, 101) +} diff --git a/wal/goleveldb_store.go b/wal/goleveldb_store.go new file mode 100644 index 0000000..0510432 --- /dev/null +++ b/wal/goleveldb_store.go @@ -0,0 +1,229 @@ +package wal + +import ( + "bytes" + "github.com/siddontang/go/num" + "github.com/siddontang/ledisdb/config" + "github.com/siddontang/ledisdb/store" + "os" + "sync" +) + +type GoLevelDBStore struct { + m sync.Mutex + db *store.DB + + cfg *config.Config + + first uint64 + last uint64 +} + +func (s *GoLevelDBStore) FirstID() (uint64, error) { + s.m.Lock() + defer s.m.Unlock() + return s.firstID() +} + +func (s *GoLevelDBStore) LastID() (uint64, error) { + s.m.Lock() + defer s.m.Unlock() + return s.lastID() +} + +func (s *GoLevelDBStore) firstID() (uint64, error) { + if s.first != InvalidLogID { + return s.first, nil + } + + it := s.db.NewIterator() + defer it.Close() + + it.SeekToFirst() + + if it.Valid() { + s.first = num.BytesToUint64(it.RawKey()) + } + + return s.first, nil +} + +func (s *GoLevelDBStore) lastID() (uint64, error) { + if s.last != InvalidLogID { + return s.last, nil + } + + it := s.db.NewIterator() + defer it.Close() + + it.SeekToLast() + + if it.Valid() { + s.last = num.BytesToUint64(it.RawKey()) + } + + return s.last, nil +} + +func (s *GoLevelDBStore) GetLog(id uint64, log *Log) error { + v, err := s.db.Get(num.Uint64ToBytes(id)) + if err != nil { + return err + } else if v == nil { + return ErrLogNotFound + } else { + return log.Decode(bytes.NewBuffer(v)) + } +} + +func (s *GoLevelDBStore) SeekLog(id uint64, log *Log) error { + it := s.db.NewIterator() + defer it.Close() + + it.Seek(num.Uint64ToBytes(id)) + + if !it.Valid() { + return ErrLogNotFound + } else { + return log.Decode(bytes.NewBuffer(it.RawValue())) + } +} + +func (s *GoLevelDBStore) StoreLog(log *Log) error { + return s.StoreLogs([]*Log{log}) +} + +func (s *GoLevelDBStore) StoreLogs(logs []*Log) error { + s.m.Lock() + defer s.m.Unlock() + + w := s.db.NewWriteBatch() + defer w.Rollback() + + last := s.last + + s.reset() + + var buf bytes.Buffer + for _, log := range logs { + buf.Reset() + + if log.ID <= last { + return ErrLessLogID + } + + last = log.ID + key := num.Uint64ToBytes(log.ID) + + if err := log.Encode(&buf); err != nil { + return err + } + w.Put(key, buf.Bytes()) + } + + return w.Commit() +} + +func (s *GoLevelDBStore) DeleteRange(min, max uint64) error { + s.m.Lock() + defer s.m.Unlock() + + var first, last uint64 + var err error + + first, err = s.firstID() + if err != nil { + return err + } + + last, err = s.lastID() + if err != nil { + return err + } + + min = num.MaxUint64(min, first) + max = num.MinUint64(max, last) + + w := s.db.NewWriteBatch() + defer w.Rollback() + + n := 0 + + s.reset() + + for i := 
min; i <= max; i++ { + w.Delete(num.Uint64ToBytes(i)) + n++ + if n > 1024 { + if err = w.Commit(); err != nil { + return err + } + n = 0 + } + } + + if err = w.Commit(); err != nil { + return err + } + return nil +} + +func (s *GoLevelDBStore) Clear() error { + s.m.Lock() + defer s.m.Unlock() + + if s.db != nil { + s.db.Close() + } + + os.RemoveAll(s.cfg.DBPath) + + return s.open() +} + +func (s *GoLevelDBStore) reset() { + s.first = InvalidLogID + s.last = InvalidLogID +} + +func (s *GoLevelDBStore) Close() error { + s.m.Lock() + defer s.m.Unlock() + + if s.db == nil { + return nil + } + + err := s.db.Close() + s.db = nil + return err +} + +func (s *GoLevelDBStore) open() error { + var err error + + s.first = InvalidLogID + s.last = InvalidLogID + + s.db, err = store.Open(s.cfg) + return err +} + +func NewGoLevelDBStore(base string) (*GoLevelDBStore, error) { + cfg := new(config.Config) + cfg.DBName = "goleveldb" + cfg.DBPath = base + cfg.LevelDB.BlockSize = 4 * 1024 * 1024 + cfg.LevelDB.CacheSize = 16 * 1024 * 1024 + cfg.LevelDB.WriteBufferSize = 4 * 1024 * 1024 + cfg.LevelDB.Compression = false + + s := new(GoLevelDBStore) + s.cfg = cfg + + if err := s.open(); err != nil { + return nil, err + } + + return s, nil +} diff --git a/wal/log.go b/wal/log.go new file mode 100644 index 0000000..8ff8c95 --- /dev/null +++ b/wal/log.go @@ -0,0 +1,73 @@ +package wal + +import ( + "encoding/binary" + "io" +) + +type Log struct { + ID uint64 + CreateTime uint32 + // 0 for no compression + // 1 for snappy compression + Compression uint8 + Data []byte +} + +func (l *Log) Encode(w io.Writer) error { + length := uint32(17) + buf := make([]byte, length) + + pos := 0 + binary.BigEndian.PutUint64(buf[pos:], l.ID) + pos += 8 + + binary.BigEndian.PutUint32(buf[pos:], l.CreateTime) + pos += 4 + + buf[pos] = l.Compression + pos++ + + binary.BigEndian.PutUint32(buf[pos:], uint32(len(l.Data))) + + if n, err := w.Write(buf); err != nil { + return err + } else if n != len(buf) { + return io.ErrShortWrite + } + + if n, err := w.Write(l.Data); err != nil { + return err + } else if n != len(l.Data) { + return io.ErrShortWrite + } + return nil +} + +func (l *Log) Decode(r io.Reader) error { + length := uint32(17) + buf := make([]byte, length) + + if _, err := io.ReadFull(r, buf); err != nil { + return err + } + + pos := 0 + l.ID = binary.BigEndian.Uint64(buf[pos:]) + pos += 8 + + l.CreateTime = binary.BigEndian.Uint32(buf[pos:]) + pos += 4 + + l.Compression = buf[pos] + pos++ + + length = binary.BigEndian.Uint32(buf[pos:]) + + l.Data = make([]byte, length) + if _, err := io.ReadFull(r, l.Data); err != nil { + return err + } + + return nil +} diff --git a/wal/log_test.go b/wal/log_test.go new file mode 100644 index 0000000..ea5d91e --- /dev/null +++ b/wal/log_test.go @@ -0,0 +1,27 @@ +package wal + +import ( + "bytes" + "reflect" + "testing" +) + +func TestLog(t *testing.T) { + l1 := &Log{ID: 1, CreateTime: 100, Compression: 0, Data: []byte("hello world")} + + var buf bytes.Buffer + + if err := l1.Encode(&buf); err != nil { + t.Fatal(err) + } + + l2 := &Log{} + + if err := l2.Decode(&buf); err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(l1, l2) { + t.Fatal("must equal") + } +} diff --git a/wal/store_test.go b/wal/store_test.go new file mode 100644 index 0000000..45636c6 --- /dev/null +++ b/wal/store_test.go @@ -0,0 +1,137 @@ +package wal + +import ( + "io/ioutil" + "os" + "testing" +) + +func TestGoLevelDBStore(t *testing.T) { + // Create a test dir + dir, err := ioutil.TempDir("", "wal") + if err != nil { + 
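For reference, a standalone sketch (not part of this patch) of the record layout that Log.Encode in wal/log.go above writes: a 17-byte big-endian header of ID (8 bytes), CreateTime (4), Compression (1) and payload length (4), followed by the raw payload. A later patch in this series drops the Compression byte, shrinking the header to 16 bytes. The helper name below is illustrative only.

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeLogHead mirrors the fixed-size header written by Log.Encode in this
// patch: ID, CreateTime, Compression and payload length, all big-endian.
func encodeLogHead(id uint64, createTime uint32, compression uint8, dataLen uint32) []byte {
	buf := make([]byte, 17)
	binary.BigEndian.PutUint64(buf[0:8], id)
	binary.BigEndian.PutUint32(buf[8:12], createTime)
	buf[12] = compression
	binary.BigEndian.PutUint32(buf[13:17], dataLen)
	return buf
}

func main() {
	data := []byte("hello world")
	head := encodeLogHead(1, 100, 0, uint32(len(data)))
	fmt.Printf("% x + %q\n", head, data) // 17-byte header, then the raw payload
}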
t.Fatalf("err: %v ", err) + } + defer os.RemoveAll(dir) + + // New level + l, err := NewGoLevelDBStore(dir) + if err != nil { + t.Fatalf("err: %v ", err) + } + defer l.Close() + + testLogs(t, l) +} + +func testLogs(t *testing.T, l Store) { + // Should be no first index + idx, err := l.FirstID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 0 { + t.Fatalf("bad idx: %d", idx) + } + + // Should be no last index + idx, err = l.LastID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 0 { + t.Fatalf("bad idx: %d", idx) + } + + // Try a filed fetch + var out Log + if err := l.GetLog(10, &out); err.Error() != "log not found" { + t.Fatalf("err: %v ", err) + } + + // Write out a log + log := Log{ + ID: 1, + Data: []byte("first"), + } + for i := 1; i <= 10; i++ { + log.ID = uint64(i) + if err := l.StoreLog(&log); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Attempt to write multiple logs + var logs []*Log + for i := 11; i <= 20; i++ { + nl := &Log{ + ID: uint64(i), + Data: []byte("first"), + } + logs = append(logs, nl) + } + if err := l.StoreLogs(logs); err != nil { + t.Fatalf("err: %v", err) + } + + // Try to fetch + if err := l.GetLog(10, &out); err != nil { + t.Fatalf("err: %v ", err) + } + + // Try to fetch + if err := l.GetLog(20, &out); err != nil { + t.Fatalf("err: %v ", err) + } + + // Check the lowest index + idx, err = l.FirstID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 1 { + t.Fatalf("bad idx: %d", idx) + } + + // Check the highest index + idx, err = l.LastID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 20 { + t.Fatalf("bad idx: %d", idx) + } + + // Delete a suffix + if err := l.DeleteRange(5, 20); err != nil { + t.Fatalf("err: %v ", err) + } + + // Verify they are all deleted + for i := 5; i <= 20; i++ { + if err := l.GetLog(uint64(i), &out); err != ErrLogNotFound { + t.Fatalf("err: %v ", err) + } + } + + // Index should be one + idx, err = l.FirstID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 1 { + t.Fatalf("bad idx: %d", idx) + } + idx, err = l.LastID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 4 { + t.Fatalf("bad idx: %d", idx) + } + + // Should not be able to fetch + if err := l.GetLog(5, &out); err != ErrLogNotFound { + t.Fatalf("err: %v ", err) + } +} diff --git a/wal/wal.go b/wal/wal.go new file mode 100644 index 0000000..b32139c --- /dev/null +++ b/wal/wal.go @@ -0,0 +1,46 @@ +package wal + +import ( + "errors" +) + +const ( + InvalidLogID uint64 = 0 +) + +var ( + ErrLogNotFound = errors.New("log not found") + ErrLessLogID = errors.New("log id is less") +) + +type LogIDGenerator interface { + // Force reset to id, if current id is larger than id, nothing reset + Reset(id uint64) error + + // ID must be first at 1, and increased monotonously, 0 is invalid + GenerateID() (uint64, error) + + Close() error +} + +type Store interface { + GetLog(id uint64, log *Log) error + + // Get the first log which ID is equal or larger than id + SeekLog(id uint64, log *Log) error + + FirstID() (uint64, error) + LastID() (uint64, error) + + // if log id is less than current last id, return error + StoreLog(log *Log) error + StoreLogs(logs []*Log) error + + // Delete logs [start, stop] + DeleteRange(start, stop uint64) error + + // Clear all logs + Clear() error + + Close() error +} From 63f8f82727d3e99740fc887b7183b174a22f11a3 Mon Sep 17 00:00:00 2001 From: siddontang Date: Tue, 16 Sep 2014 08:39:52 +0800 Subject: [PATCH 4/7] remove id gen, update --- wal/gen.go | 137 
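As a quick orientation, a minimal usage sketch of the Store interface above against the goleveldb backend added in this patch; it uses only calls that appear in this series (NewGoLevelDBStore, StoreLog, GetLog, FirstID, LastID, Close). Note that a later patch in the series replaces DeleteRange with Purge/PurgeExpired.

package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"time"

	"github.com/siddontang/ledisdb/wal"
)

func main() {
	dir, err := ioutil.TempDir("", "wal-example")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	s, err := wal.NewGoLevelDBStore(dir)
	if err != nil {
		panic(err)
	}
	defer s.Close()

	// IDs must be strictly increasing; storing an ID at or below the current
	// last ID fails with ErrLessLogID.
	l := &wal.Log{ID: 1, CreateTime: uint32(time.Now().Unix()), Data: []byte("hello")}
	if err := s.StoreLog(l); err != nil {
		panic(err)
	}

	var out wal.Log
	if err := s.GetLog(1, &out); err != nil {
		panic(err)
	}

	first, _ := s.FirstID()
	last, _ := s.LastID()
	fmt.Println(first, last, string(out.Data))
}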
----------------------------------------- wal/gen_test.go | 48 --------------- wal/goleveldb_store.go | 16 ++++- wal/log.go | 18 ++++++ wal/log_test.go | 13 ++++ wal/wal.go | 10 --- 6 files changed, 44 insertions(+), 198 deletions(-) delete mode 100644 wal/gen.go delete mode 100644 wal/gen_test.go diff --git a/wal/gen.go b/wal/gen.go deleted file mode 100644 index 9d84938..0000000 --- a/wal/gen.go +++ /dev/null @@ -1,137 +0,0 @@ -package wal - -import ( - "encoding/binary" - "fmt" - "os" - "path" - "sync" -) - -type FileIDGenerator struct { - LogIDGenerator - - m sync.Mutex - f *os.File - - id uint64 -} - -func NewFileIDGenerator(base string) (*FileIDGenerator, error) { - if err := os.MkdirAll(base, 0755); err != nil { - return nil, err - } - - g := new(FileIDGenerator) - - name := path.Join(base, "log.id") - - var err error - if g.f, err = os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0644); err != nil { - return nil, err - } - - s, _ := g.f.Stat() - if s.Size() == 0 { - g.id = 0 - } else if s.Size() == 8 { - if err = binary.Read(g.f, binary.BigEndian, &g.id); err != nil { - g.f.Close() - return nil, err - } else if g.id == InvalidLogID { - g.f.Close() - return nil, fmt.Errorf("read invalid log id in %s", name) - } - } else { - g.f.Close() - return nil, fmt.Errorf("log id file %s is invalid", name) - } - - return g, nil -} - -func (g *FileIDGenerator) Reset(id uint64) error { - g.m.Lock() - defer g.m.Unlock() - - if g.f == nil { - return fmt.Errorf("generator closed") - } - - if g.id < id { - g.id = id - } - - return nil -} - -func (g *FileIDGenerator) GenerateID() (uint64, error) { - g.m.Lock() - defer g.m.Unlock() - - if g.f == nil { - return 0, fmt.Errorf("generator closed") - } - - if _, err := g.f.Seek(0, os.SEEK_SET); err != nil { - return 0, nil - } - - id := g.id + 1 - - if err := binary.Write(g.f, binary.BigEndian, id); err != nil { - return 0, nil - } - - g.id = id - - return id, nil -} - -func (g *FileIDGenerator) Close() error { - g.m.Lock() - defer g.m.Unlock() - - if g.f != nil { - err := g.f.Close() - g.f = nil - return err - } - return nil -} - -type MemIDGenerator struct { - m sync.Mutex - - LogIDGenerator - - id uint64 -} - -func NewMemIDGenerator(baseID uint64) *MemIDGenerator { - g := &MemIDGenerator{id: baseID} - return g -} - -func (g *MemIDGenerator) Reset(id uint64) error { - g.m.Lock() - defer g.m.Unlock() - - if g.id < id { - g.id = id - } - return nil -} - -func (g *MemIDGenerator) GenerateID() (uint64, error) { - g.m.Lock() - defer g.m.Unlock() - - g.id++ - id := g.id - return id, nil -} - -func (g *MemIDGenerator) Close() error { - return nil -} diff --git a/wal/gen_test.go b/wal/gen_test.go deleted file mode 100644 index 2f60999..0000000 --- a/wal/gen_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package wal - -import ( - "io/ioutil" - "os" - "testing" -) - -func testGenerator(t *testing.T, g LogIDGenerator, base uint64) { - for i := base; i < base+100; i++ { - id, err := g.GenerateID() - if err != nil { - t.Fatal(err) - } else if id != i { - t.Fatal(id, i) - } - } -} - -func TestGenerator(t *testing.T) { - base, err := ioutil.TempDir("", "wal") - if err != nil { - t.Fatal(err) - } - defer os.RemoveAll(base) - - var g *FileIDGenerator - if g, err = NewFileIDGenerator(base); err != nil { - t.Fatal(err) - } else { - testGenerator(t, g, 1) - if err = g.Close(); err != nil { - t.Fatal(err) - } - } - - if g, err = NewFileIDGenerator(base); err != nil { - t.Fatal(err) - } else { - testGenerator(t, g, 101) - if err = g.Close(); err != nil { - t.Fatal(err) - } - } - - m := 
NewMemIDGenerator(100) - testGenerator(t, m, 101) -} diff --git a/wal/goleveldb_store.go b/wal/goleveldb_store.go index 0510432..2c9b09d 100644 --- a/wal/goleveldb_store.go +++ b/wal/goleveldb_store.go @@ -100,9 +100,12 @@ func (s *GoLevelDBStore) StoreLogs(logs []*Log) error { w := s.db.NewWriteBatch() defer w.Rollback() - last := s.last + last, err := s.lastID() + if err != nil { + return err + } - s.reset() + s.last = InvalidLogID var buf bytes.Buffer for _, log := range logs { @@ -121,7 +124,12 @@ func (s *GoLevelDBStore) StoreLogs(logs []*Log) error { w.Put(key, buf.Bytes()) } - return w.Commit() + if err := w.Commit(); err != nil { + return err + } + + s.last = last + return nil } func (s *GoLevelDBStore) DeleteRange(min, max uint64) error { @@ -165,6 +173,7 @@ func (s *GoLevelDBStore) DeleteRange(min, max uint64) error { if err = w.Commit(); err != nil { return err } + return nil } @@ -176,6 +185,7 @@ func (s *GoLevelDBStore) Clear() error { s.db.Close() } + s.reset() os.RemoveAll(s.cfg.DBPath) return s.open() diff --git a/wal/log.go b/wal/log.go index 8ff8c95..d567c60 100644 --- a/wal/log.go +++ b/wal/log.go @@ -1,6 +1,7 @@ package wal import ( + "bytes" "encoding/binary" "io" ) @@ -14,6 +15,23 @@ type Log struct { Data []byte } +func (l *Log) Marshal() ([]byte, error) { + buf := bytes.NewBuffer(make([]byte, 17+len(l.Data))) + buf.Reset() + + if err := l.Encode(buf); err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +func (l *Log) Unmarshal(b []byte) error { + buf := bytes.NewBuffer(b) + + return l.Decode(buf) +} + func (l *Log) Encode(w io.Writer) error { length := uint32(17) buf := make([]byte, length) diff --git a/wal/log_test.go b/wal/log_test.go index ea5d91e..cfd8c22 100644 --- a/wal/log_test.go +++ b/wal/log_test.go @@ -24,4 +24,17 @@ func TestLog(t *testing.T) { if !reflect.DeepEqual(l1, l2) { t.Fatal("must equal") } + + if buf, err := l1.Marshal(); err != nil { + t.Fatal(err) + } else { + if err = l2.Unmarshal(buf); err != nil { + t.Fatal(err) + } + } + + if !reflect.DeepEqual(l1, l2) { + t.Fatal("must equal") + } + } diff --git a/wal/wal.go b/wal/wal.go index b32139c..cc18870 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -13,16 +13,6 @@ var ( ErrLessLogID = errors.New("log id is less") ) -type LogIDGenerator interface { - // Force reset to id, if current id is larger than id, nothing reset - Reset(id uint64) error - - // ID must be first at 1, and increased monotonously, 0 is invalid - GenerateID() (uint64, error) - - Close() error -} - type Store interface { GetLog(id uint64, log *Log) error From 3a37b2e29713023dd80ef5dc9dee112bec09f58d Mon Sep 17 00:00:00 2001 From: siddontang Date: Tue, 16 Sep 2014 08:41:38 +0800 Subject: [PATCH 5/7] update store clear test --- wal/store_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/wal/store_test.go b/wal/store_test.go index 45636c6..5b32f3e 100644 --- a/wal/store_test.go +++ b/wal/store_test.go @@ -134,4 +134,24 @@ func testLogs(t *testing.T, l Store) { if err := l.GetLog(5, &out); err != ErrLogNotFound { t.Fatalf("err: %v ", err) } + + if err := l.Clear(); err != nil { + t.Fatal(err) + } + + idx, err = l.FirstID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 0 { + t.Fatalf("bad idx: %d", idx) + } + + idx, err = l.LastID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 0 { + t.Fatalf("bad idx: %d", idx) + } } From 8b8745be9251d0e8eb5782431a4d73e283691c68 Mon Sep 17 00:00:00 2001 From: siddontang Date: Wed, 17 Sep 2014 17:54:04 +0800 Subject: [PATCH 6/7] 
refactor, can not build and run at all --- cmd/ledis-binlog/main.go | 85 ------ cmd/ledis-load/main.go | 15 +- config/config.go | 35 +-- config/config.toml | 10 +- etc/ledis.conf | 7 +- ledis/batch.go | 40 ++- ledis/binlog.go | 400 ----------------------------- ledis/binlog_test.go | 49 ---- ledis/const.go | 6 - ledis/dump.go | 77 +----- ledis/dump_test.go | 2 +- ledis/{binlog_util.go => event.go} | 157 +++++------ ledis/event_test.go | 34 +++ ledis/ledis.go | 18 +- ledis/ledis_test.go | 2 - ledis/t_hash.go | 2 - ledis/t_kv.go | 5 - ledis/t_zset.go | 4 +- ledis/tx.go | 6 - wal/file_store.go | 165 +++++++++++- wal/goleveldb_store.go | 57 +++- wal/log.go | 36 +-- wal/log_test.go | 1 - wal/store_test.go | 40 ++- wal/wal.go | 20 +- 25 files changed, 450 insertions(+), 823 deletions(-) delete mode 100644 cmd/ledis-binlog/main.go delete mode 100644 ledis/binlog.go delete mode 100644 ledis/binlog_test.go rename ledis/{binlog_util.go => event.go} (63%) create mode 100644 ledis/event_test.go diff --git a/cmd/ledis-binlog/main.go b/cmd/ledis-binlog/main.go deleted file mode 100644 index 3725920..0000000 --- a/cmd/ledis-binlog/main.go +++ /dev/null @@ -1,85 +0,0 @@ -package main - -import ( - "bufio" - "flag" - "fmt" - "github.com/siddontang/ledisdb/ledis" - "os" - "time" -) - -var TimeFormat = "2006-01-02 15:04:05" - -var startDateTime = flag.String("start-datetime", "", - "Start reading the binary log at the first event having a timestamp equal to or later than the datetime argument.") -var stopDateTime = flag.String("stop-datetime", "", - "Stop reading the binary log at the first event having a timestamp equal to or earlier than the datetime argument.") - -var startTime uint32 = 0 -var stopTime uint32 = 0xFFFFFFFF - -func main() { - flag.Usage = func() { - fmt.Fprintf(os.Stderr, "Usage of %s [options] log_file\n", os.Args[0]) - flag.PrintDefaults() - } - - flag.Parse() - - logFile := flag.Arg(0) - f, err := os.Open(logFile) - if err != nil { - println(err.Error()) - return - } - defer f.Close() - - var t time.Time - - if len(*startDateTime) > 0 { - if t, err = time.Parse(TimeFormat, *startDateTime); err != nil { - println("parse start-datetime error: ", err.Error()) - return - } - - startTime = uint32(t.Unix()) - } - - if len(*stopDateTime) > 0 { - if t, err = time.Parse(TimeFormat, *stopDateTime); err != nil { - println("parse stop-datetime error: ", err.Error()) - return - } - - stopTime = uint32(t.Unix()) - } - - rb := bufio.NewReaderSize(f, 4096) - err = ledis.ReadEventFromReader(rb, printEvent) - if err != nil { - println("read event error: ", err.Error()) - return - } -} - -func printEvent(head *ledis.BinLogHead, event []byte) error { - if head.CreateTime < startTime || head.CreateTime > stopTime { - return nil - } - - t := time.Unix(int64(head.CreateTime), 0) - - fmt.Printf("%s ", t.Format(TimeFormat)) - - s, err := ledis.FormatBinLogEvent(event) - if err != nil { - fmt.Printf("%s", err.Error()) - } else { - fmt.Printf(s) - } - - fmt.Printf("\n") - - return nil -} diff --git a/cmd/ledis-load/main.go b/cmd/ledis-load/main.go index 34165b8..1edb250 100644 --- a/cmd/ledis-load/main.go +++ b/cmd/ledis-load/main.go @@ -57,18 +57,5 @@ func loadDump(cfg *config.Config, ldb *ledis.Ledis) error { return err } - var head *ledis.BinLogAnchor - head, err = ldb.LoadDumpFile(*dumpPath) - - if err != nil { - return err - } - - //master enable binlog, here output this like mysql - if head.LogFileIndex != 0 && head.LogPos != 0 { - format := "MASTER_LOG_FILE='binlog.%07d', MASTER_LOG_POS=%d;\n" - 
fmt.Printf(format, head.LogFileIndex, head.LogPos) - } - - return nil + return ldb.LoadDumpFile(*dumpPath) } diff --git a/config/config.go b/config/config.go index 1464edf..b54da8b 100644 --- a/config/config.go +++ b/config/config.go @@ -16,14 +16,6 @@ const ( DefaultDataDir string = "./var" ) -const ( - MaxBinLogFileSize int = 1024 * 1024 * 1024 - MaxBinLogFileNum int = 10000 - - DefaultBinLogFileSize int = MaxBinLogFileSize - DefaultBinLogFileNum int = 10 -) - type LevelDBConfig struct { Compression bool `toml:"compression"` BlockSize int `toml:"block_size"` @@ -37,9 +29,8 @@ type LMDBConfig struct { NoSync bool `toml:"nosync"` } -type BinLogConfig struct { - MaxFileSize int `toml:"max_file_size"` - MaxFileNum int `toml:"max_file_num"` +type WALConfig struct { + Path string `toml:"path"` } type Config struct { @@ -52,11 +43,13 @@ type Config struct { DBName string `toml:"db_name"` DBPath string `toml:"db_path"` + UseWAL bool `toml:use_wal` + LevelDB LevelDBConfig `toml:"leveldb"` LMDB LMDBConfig `toml:"lmdb"` - BinLog BinLogConfig `toml:"binlog"` + WAL WALConfig `toml:wal` SlaveOf string `toml:"slaveof"` @@ -93,10 +86,6 @@ func NewConfigDefault() *Config { cfg.DBName = DefaultDBName - // disable binlog - cfg.BinLog.MaxFileNum = 0 - cfg.BinLog.MaxFileSize = 0 - // disable replication cfg.SlaveOf = "" @@ -126,17 +115,3 @@ func (cfg *LevelDBConfig) Adjust() { cfg.MaxOpenFiles = 1024 } } - -func (cfg *BinLogConfig) Adjust() { - if cfg.MaxFileSize <= 0 { - cfg.MaxFileSize = DefaultBinLogFileSize - } else if cfg.MaxFileSize > MaxBinLogFileSize { - cfg.MaxFileSize = MaxBinLogFileSize - } - - if cfg.MaxFileNum <= 0 { - cfg.MaxFileNum = DefaultBinLogFileNum - } else if cfg.MaxFileNum > MaxBinLogFileNum { - cfg.MaxFileNum = MaxBinLogFileNum - } -} diff --git a/config/config.toml b/config/config.toml index f271a70..ae08c47 100644 --- a/config/config.toml +++ b/config/config.toml @@ -30,6 +30,8 @@ db_name = "leveldb" # if not set, use data_dir/"db_name"_data db_path = "" +use_wal = true + [leveldb] compression = false block_size = 32768 @@ -41,8 +43,10 @@ max_open_files = 1024 map_size = 524288000 nosync = true -[binlog] -max_file_size = 0 -max_file_num = 0 +[wal] +# if not set, use data_dir/wal +path = "" + + diff --git a/etc/ledis.conf b/etc/ledis.conf index c0606eb..0d46aee 100644 --- a/etc/ledis.conf +++ b/etc/ledis.conf @@ -43,9 +43,8 @@ max_open_files = 1024 map_size = 524288000 nosync = true -[binlog] -# Set either size or num to 0 to disable binlog -max_file_size = 0 -max_file_num = 0 +[wal] +# if not set, use data_dir/wal +path = "" diff --git a/ledis/batch.go b/ledis/batch.go index b23cc47..6f97457 100644 --- a/ledis/batch.go +++ b/ledis/batch.go @@ -12,9 +12,11 @@ type batch struct { sync.Locker - logs [][]byte + eb *eventBatch tx *Tx + + noLogging bool } func (b *batch) Commit() error { @@ -23,17 +25,6 @@ func (b *batch) Commit() error { err := b.WriteBatch.Commit() - if b.l.binlog != nil { - if err == nil { - if b.tx == nil { - b.l.binlog.Log(b.logs...) - } else { - b.tx.logs = append(b.tx.logs, b.logs...) 
- } - } - b.logs = [][]byte{} - } - return err } @@ -42,29 +33,28 @@ func (b *batch) Lock() { } func (b *batch) Unlock() { - if b.l.binlog != nil { - b.logs = [][]byte{} - } + b.noLogging = false b.WriteBatch.Rollback() b.Locker.Unlock() } func (b *batch) Put(key []byte, value []byte) { - if b.l.binlog != nil { - buf := encodeBinLogPut(key, value) - b.logs = append(b.logs, buf) - } b.WriteBatch.Put(key, value) } func (b *batch) Delete(key []byte) { - if b.l.binlog != nil { - buf := encodeBinLogDelete(key) - b.logs = append(b.logs, buf) - } + b.WriteBatch.Delete(key) } +func (b *batch) LogEanbled() bool { + return !b.noLogging && b.l.log != nil +} + +func (b *batch) DisableLog(d bool) { + b.noLogging = d +} + type dbBatchLocker struct { l *sync.Mutex wrLock *sync.RWMutex @@ -100,6 +90,8 @@ func (l *Ledis) newBatch(wb store.WriteBatch, locker sync.Locker, tx *Tx) *batch b.tx = tx b.Locker = locker - b.logs = [][]byte{} + b.eb = new(eventBatch) + b.noLogging = false + return b } diff --git a/ledis/binlog.go b/ledis/binlog.go deleted file mode 100644 index f26323e..0000000 --- a/ledis/binlog.go +++ /dev/null @@ -1,400 +0,0 @@ -package ledis - -import ( - "bufio" - "encoding/binary" - "fmt" - "github.com/siddontang/go-log/log" - "github.com/siddontang/ledisdb/config" - "io" - "io/ioutil" - "os" - "path" - "strconv" - "strings" - "sync" - "time" -) - -type BinLogHead struct { - CreateTime uint32 - BatchId uint32 - PayloadLen uint32 -} - -func (h *BinLogHead) Len() int { - return 12 -} - -func (h *BinLogHead) Write(w io.Writer) error { - if err := binary.Write(w, binary.BigEndian, h.CreateTime); err != nil { - return err - } - - if err := binary.Write(w, binary.BigEndian, h.BatchId); err != nil { - return err - } - - if err := binary.Write(w, binary.BigEndian, h.PayloadLen); err != nil { - return err - } - - return nil -} - -func (h *BinLogHead) handleReadError(err error) error { - if err == io.EOF { - return io.ErrUnexpectedEOF - } else { - return err - } -} - -func (h *BinLogHead) Read(r io.Reader) error { - var err error - if err = binary.Read(r, binary.BigEndian, &h.CreateTime); err != nil { - return err - } - - if err = binary.Read(r, binary.BigEndian, &h.BatchId); err != nil { - return h.handleReadError(err) - } - - if err = binary.Read(r, binary.BigEndian, &h.PayloadLen); err != nil { - return h.handleReadError(err) - } - - return nil -} - -func (h *BinLogHead) InSameBatch(ho *BinLogHead) bool { - if h.CreateTime == ho.CreateTime && h.BatchId == ho.BatchId { - return true - } else { - return false - } -} - -/* -index file format: -ledis-bin.00001 -ledis-bin.00002 -ledis-bin.00003 - -log file format - -Log: Head|PayloadData - -Head: createTime|batchId|payloadData - -*/ - -type BinLog struct { - sync.Mutex - - path string - - cfg *config.BinLogConfig - - logFile *os.File - - logWb *bufio.Writer - - indexName string - logNames []string - lastLogIndex int64 - - batchId uint32 - - ch chan struct{} -} - -func NewBinLog(cfg *config.Config) (*BinLog, error) { - l := new(BinLog) - - l.cfg = &cfg.BinLog - l.cfg.Adjust() - - l.path = path.Join(cfg.DataDir, "binlog") - - if err := os.MkdirAll(l.path, 0755); err != nil { - return nil, err - } - - l.logNames = make([]string, 0, 16) - - l.ch = make(chan struct{}) - - if err := l.loadIndex(); err != nil { - return nil, err - } - - return l, nil -} - -func (l *BinLog) flushIndex() error { - data := strings.Join(l.logNames, "\n") - - bakName := fmt.Sprintf("%s.bak", l.indexName) - f, err := os.OpenFile(bakName, os.O_WRONLY|os.O_CREATE, 0666) - if err != nil 
{ - log.Error("create binlog bak index error %s", err.Error()) - return err - } - - if _, err := f.WriteString(data); err != nil { - log.Error("write binlog index error %s", err.Error()) - f.Close() - return err - } - - f.Close() - - if err := os.Rename(bakName, l.indexName); err != nil { - log.Error("rename binlog bak index error %s", err.Error()) - return err - } - - return nil -} - -func (l *BinLog) loadIndex() error { - l.indexName = path.Join(l.path, fmt.Sprintf("ledis-bin.index")) - if _, err := os.Stat(l.indexName); os.IsNotExist(err) { - //no index file, nothing to do - } else { - indexData, err := ioutil.ReadFile(l.indexName) - if err != nil { - return err - } - - lines := strings.Split(string(indexData), "\n") - for _, line := range lines { - line = strings.Trim(line, "\r\n ") - if len(line) == 0 { - continue - } - - if _, err := os.Stat(path.Join(l.path, line)); err != nil { - log.Error("load index line %s error %s", line, err.Error()) - return err - } else { - l.logNames = append(l.logNames, line) - } - } - } - if l.cfg.MaxFileNum > 0 && len(l.logNames) > l.cfg.MaxFileNum { - //remove oldest logfile - if err := l.Purge(len(l.logNames) - l.cfg.MaxFileNum); err != nil { - return err - } - } - - var err error - if len(l.logNames) == 0 { - l.lastLogIndex = 1 - } else { - lastName := l.logNames[len(l.logNames)-1] - - if l.lastLogIndex, err = strconv.ParseInt(path.Ext(lastName)[1:], 10, 64); err != nil { - log.Error("invalid logfile name %s", err.Error()) - return err - } - - //like mysql, if server restart, a new binlog will create - l.lastLogIndex++ - } - - return nil -} - -func (l *BinLog) getLogFile() string { - return l.FormatLogFileName(l.lastLogIndex) -} - -func (l *BinLog) openNewLogFile() error { - var err error - lastName := l.getLogFile() - - logPath := path.Join(l.path, lastName) - if l.logFile, err = os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY, 0666); err != nil { - log.Error("open new logfile error %s", err.Error()) - return err - } - - if l.cfg.MaxFileNum > 0 && len(l.logNames) == l.cfg.MaxFileNum { - l.purge(1) - } - - l.logNames = append(l.logNames, lastName) - - if l.logWb == nil { - l.logWb = bufio.NewWriterSize(l.logFile, 1024) - } else { - l.logWb.Reset(l.logFile) - } - - if err = l.flushIndex(); err != nil { - return err - } - - return nil -} - -func (l *BinLog) checkLogFileSize() bool { - if l.logFile == nil { - return false - } - - st, _ := l.logFile.Stat() - if st.Size() >= int64(l.cfg.MaxFileSize) { - l.closeLog() - return true - } - - return false -} - -func (l *BinLog) closeLog() { - if l.logFile == nil { - return - } - - l.lastLogIndex++ - - l.logFile.Close() - l.logFile = nil -} - -func (l *BinLog) purge(n int) { - if len(l.logNames) < n { - n = len(l.logNames) - } - for i := 0; i < n; i++ { - logPath := path.Join(l.path, l.logNames[i]) - os.Remove(logPath) - } - - copy(l.logNames[0:], l.logNames[n:]) - l.logNames = l.logNames[0 : len(l.logNames)-n] -} - -func (l *BinLog) Close() { - if l.logFile != nil { - l.logFile.Close() - l.logFile = nil - } -} - -func (l *BinLog) LogNames() []string { - return l.logNames -} - -func (l *BinLog) LogFileName() string { - return l.getLogFile() -} - -func (l *BinLog) LogFilePos() int64 { - if l.logFile == nil { - return 0 - } else { - st, _ := l.logFile.Stat() - return st.Size() - } -} - -func (l *BinLog) LogFileIndex() int64 { - return l.lastLogIndex -} - -func (l *BinLog) FormatLogFileName(index int64) string { - return fmt.Sprintf("ledis-bin.%07d", index) -} - -func (l *BinLog) FormatLogFilePath(index int64) string 
{ - return path.Join(l.path, l.FormatLogFileName(index)) -} - -func (l *BinLog) LogPath() string { - return l.path -} - -func (l *BinLog) Purge(n int) error { - l.Lock() - defer l.Unlock() - - if len(l.logNames) == 0 { - return nil - } - - if n >= len(l.logNames) { - n = len(l.logNames) - //can not purge current log file - if l.logNames[n-1] == l.getLogFile() { - n = n - 1 - } - } - - l.purge(n) - - return l.flushIndex() -} - -func (l *BinLog) PurgeAll() error { - l.Lock() - defer l.Unlock() - - l.closeLog() - - l.purge(len(l.logNames)) - - return l.openNewLogFile() -} - -func (l *BinLog) Log(args ...[]byte) error { - l.Lock() - defer l.Unlock() - - var err error - - if l.logFile == nil { - if err = l.openNewLogFile(); err != nil { - return err - } - } - - head := &BinLogHead{} - - head.CreateTime = uint32(time.Now().Unix()) - head.BatchId = l.batchId - - l.batchId++ - - for _, data := range args { - head.PayloadLen = uint32(len(data)) - - if err := head.Write(l.logWb); err != nil { - return err - } - - if _, err := l.logWb.Write(data); err != nil { - return err - } - } - - if err = l.logWb.Flush(); err != nil { - log.Error("write log error %s", err.Error()) - return err - } - - l.checkLogFileSize() - - close(l.ch) - l.ch = make(chan struct{}) - - return nil -} - -func (l *BinLog) Wait() <-chan struct{} { - return l.ch -} diff --git a/ledis/binlog_test.go b/ledis/binlog_test.go deleted file mode 100644 index ea62bd9..0000000 --- a/ledis/binlog_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package ledis - -import ( - "github.com/siddontang/ledisdb/config" - "io/ioutil" - "os" - "testing" -) - -func TestBinLog(t *testing.T) { - cfg := new(config.Config) - - cfg.BinLog.MaxFileNum = 1 - cfg.BinLog.MaxFileSize = 1024 - cfg.DataDir = "/tmp/ledis_binlog" - - os.RemoveAll(cfg.DataDir) - - b, err := NewBinLog(cfg) - if err != nil { - t.Fatal(err) - } - - if err := b.Log(make([]byte, 1024)); err != nil { - t.Fatal(err) - } - - if err := b.Log(make([]byte, 1024)); err != nil { - t.Fatal(err) - } - - if fs, err := ioutil.ReadDir(b.LogPath()); err != nil { - t.Fatal(err) - } else if len(fs) != 2 { - t.Fatal(len(fs)) - } - - if err := b.PurgeAll(); err != nil { - t.Fatal(err) - } - - if fs, err := ioutil.ReadDir(b.LogPath()); err != nil { - t.Fatal(err) - } else if len(fs) != 2 { - t.Fatal(len(fs)) - } else if b.LogFilePos() != 0 { - t.Fatal(b.LogFilePos()) - } -} diff --git a/ledis/const.go b/ledis/const.go index e889f4e..9108736 100644 --- a/ledis/const.go +++ b/ledis/const.go @@ -81,12 +81,6 @@ var ( ErrScoreMiss = errors.New("zset score miss") ) -const ( - BinLogTypeDeletion uint8 = 0x0 - BinLogTypePut uint8 = 0x1 - BinLogTypeCommand uint8 = 0x2 -) - const ( DBAutoCommit uint8 = 0x0 DBInTransaction uint8 = 0x1 diff --git a/ledis/dump.go b/ledis/dump.go index f162481..6f3d81c 100644 --- a/ledis/dump.go +++ b/ledis/dump.go @@ -9,42 +9,6 @@ import ( "os" ) -//dump format -// fileIndex(bigendian int64)|filePos(bigendian int64) -// |keylen(bigendian int32)|key|valuelen(bigendian int32)|value...... 
-// -//key and value are both compressed for fast transfer dump on network using snappy - -type BinLogAnchor struct { - LogFileIndex int64 - LogPos int64 -} - -func (m *BinLogAnchor) WriteTo(w io.Writer) error { - if err := binary.Write(w, binary.BigEndian, m.LogFileIndex); err != nil { - return err - } - - if err := binary.Write(w, binary.BigEndian, m.LogPos); err != nil { - return err - } - return nil -} - -func (m *BinLogAnchor) ReadFrom(r io.Reader) error { - err := binary.Read(r, binary.BigEndian, &m.LogFileIndex) - if err != nil { - return err - } - - err = binary.Read(r, binary.BigEndian, &m.LogPos) - if err != nil { - return err - } - - return nil -} - func (l *Ledis) DumpFile(path string) error { f, err := os.Create(path) if err != nil { @@ -56,18 +20,11 @@ func (l *Ledis) DumpFile(path string) error { } func (l *Ledis) Dump(w io.Writer) error { - m := new(BinLogAnchor) - var err error l.wLock.Lock() defer l.wLock.Unlock() - if l.binlog != nil { - m.LogFileIndex = l.binlog.LogFileIndex() - m.LogPos = l.binlog.LogFilePos() - } - wb := bufio.NewWriterSize(w, 4096) if err = m.WriteTo(wb); err != nil { return err @@ -118,7 +75,7 @@ func (l *Ledis) Dump(w io.Writer) error { return nil } -func (l *Ledis) LoadDumpFile(path string) (*BinLogAnchor, error) { +func (l *Ledis) LoadDumpFile(path string) error { f, err := os.Open(path) if err != nil { return nil, err @@ -128,19 +85,12 @@ func (l *Ledis) LoadDumpFile(path string) (*BinLogAnchor, error) { return l.LoadDump(f) } -func (l *Ledis) LoadDump(r io.Reader) (*BinLogAnchor, error) { +func (l *Ledis) LoadDump(r io.Reader) error { l.wLock.Lock() defer l.wLock.Unlock() - info := new(BinLogAnchor) - rb := bufio.NewReaderSize(r, 4096) - err := info.ReadFrom(rb) - if err != nil { - return nil, err - } - var keyLen uint16 var valueLen uint32 @@ -154,33 +104,33 @@ func (l *Ledis) LoadDump(r io.Reader) (*BinLogAnchor, error) { for { if err = binary.Read(rb, binary.BigEndian, &keyLen); err != nil && err != io.EOF { - return nil, err + return err } else if err == io.EOF { break } if _, err = io.CopyN(&keyBuf, rb, int64(keyLen)); err != nil { - return nil, err + return err } if key, err = snappy.Decode(deKeyBuf, keyBuf.Bytes()); err != nil { - return nil, err + return err } if err = binary.Read(rb, binary.BigEndian, &valueLen); err != nil { - return nil, err + return err } if _, err = io.CopyN(&valueBuf, rb, int64(valueLen)); err != nil { - return nil, err + return err } if value, err = snappy.Decode(deValueBuf, valueBuf.Bytes()); err != nil { - return nil, err + return err } if err = l.ldb.Put(key, value); err != nil { - return nil, err + return err } keyBuf.Reset() @@ -190,10 +140,11 @@ func (l *Ledis) LoadDump(r io.Reader) (*BinLogAnchor, error) { deKeyBuf = nil deValueBuf = nil - //if binlog enable, we will delete all binlogs and open a new one for handling simply - if l.binlog != nil { - l.binlog.PurgeAll() + //to do remove all wal log + + if l.log != nil { + l.log.Clear() } - return info, nil + return nil } diff --git a/ledis/dump_test.go b/ledis/dump_test.go index e29d928..8a1b2fa 100644 --- a/ledis/dump_test.go +++ b/ledis/dump_test.go @@ -38,7 +38,7 @@ func TestDump(t *testing.T) { t.Fatal(err) } - if _, err := slave.LoadDumpFile("/tmp/testdb.dump"); err != nil { + if err := slave.LoadDumpFile("/tmp/testdb.dump"); err != nil { t.Fatal(err) } diff --git a/ledis/binlog_util.go b/ledis/event.go similarity index 63% rename from ledis/binlog_util.go rename to ledis/event.go index da058bd..674e9c7 100644 --- a/ledis/binlog_util.go +++ 
b/ledis/event.go @@ -1,97 +1,108 @@ package ledis import ( + "bytes" "encoding/binary" "errors" - "fmt" + "io" "strconv" ) -var ( - errBinLogDeleteType = errors.New("invalid bin log delete type") - errBinLogPutType = errors.New("invalid bin log put type") - errBinLogCommandType = errors.New("invalid bin log command type") +const ( + kTypeDeleteEvent uint8 = 0 + kTypePutEvent uint8 = 1 ) -func encodeBinLogDelete(key []byte) []byte { - buf := make([]byte, 1+len(key)) - buf[0] = BinLogTypeDeletion - copy(buf[1:], key) - return buf +var ( + errInvalidPutEvent = errors.New("invalid put event") + errInvalidDeleteEvent = errors.New("invalid delete event") + errInvalidEvent = errors.New("invalid event") +) + +type eventBatch struct { + bytes.Buffer } -func decodeBinLogDelete(sz []byte) ([]byte, error) { - if len(sz) < 1 || sz[0] != BinLogTypeDeletion { - return nil, errBinLogDeleteType +type event struct { + key []byte + value []byte //value = nil for delete event +} + +func (b *eventBatch) Put(key []byte, value []byte) { + l := uint32(len(key) + len(value) + 1 + 2) + binary.Write(b, binary.BigEndian, l) + b.WriteByte(kTypePutEvent) + keyLen := uint16(len(key)) + binary.Write(b, binary.BigEndian, keyLen) + b.Write(key) + b.Write(value) +} + +func (b *eventBatch) Delete(key []byte) { + l := uint32(len(key) + 1) + binary.Write(b, binary.BigEndian, l) + b.WriteByte(kTypeDeleteEvent) + b.Write(key) +} + +func decodeEventBatch(data []byte) (ev []event, err error) { + ev = make([]event, 0, 16) + for { + if len(data) == 0 { + return ev, nil + } + + if len(data) < 4 { + return nil, io.ErrUnexpectedEOF + } + + l := binary.BigEndian.Uint32(data) + data = data[4:] + if uint32(len(data)) < l { + return nil, io.ErrUnexpectedEOF + } + + var e event + if err := decodeEvent(&e, data[0:l]); err != nil { + return nil, err + } + ev = append(ev, e) + data = data[l:] + } +} + +func decodeEvent(e *event, b []byte) error { + if len(b) == 0 { + return errInvalidEvent } - return sz[1:], nil -} + switch b[0] { + case kTypePutEvent: + if len(b[1:]) < 2 { + return errInvalidPutEvent + } -func encodeBinLogPut(key []byte, value []byte) []byte { - buf := make([]byte, 3+len(key)+len(value)) - buf[0] = BinLogTypePut - pos := 1 - binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) - pos += 2 - copy(buf[pos:], key) - pos += len(key) - copy(buf[pos:], value) + keyLen := binary.BigEndian.Uint16(b[1:3]) + b = b[3:] + if len(b) < int(keyLen) { + return errInvalidPutEvent + } - return buf -} - -func decodeBinLogPut(sz []byte) ([]byte, []byte, error) { - if len(sz) < 3 || sz[0] != BinLogTypePut { - return nil, nil, errBinLogPutType - } - - keyLen := int(binary.BigEndian.Uint16(sz[1:])) - if 3+keyLen > len(sz) { - return nil, nil, errBinLogPutType - } - - return sz[3 : 3+keyLen], sz[3+keyLen:], nil -} - -func FormatBinLogEvent(event []byte) (string, error) { - logType := uint8(event[0]) - - var err error - var k []byte - var v []byte - - var buf []byte = make([]byte, 0, 1024) - - switch logType { - case BinLogTypePut: - k, v, err = decodeBinLogPut(event) - buf = append(buf, "PUT "...) - case BinLogTypeDeletion: - k, err = decodeBinLogDelete(event) - buf = append(buf, "DELETE "...) + e.key = b[0:keyLen] + e.value = b[keyLen:] + case kTypeDeleteEvent: + e.value = nil + e.key = b[1:] default: - err = errInvalidBinLogEvent + return errInvalidEvent } - if err != nil { - return "", err - } - - if buf, err = formatDataKey(buf, k); err != nil { - return "", err - } - - if v != nil && len(v) != 0 { - buf = append(buf, fmt.Sprintf(" %q", v)...) 
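For reference, a standalone sketch (not the ledis code itself) of the framing that the new eventBatch in ledis/event.go above appears to write: each record is a big-endian uint32 length covering everything after it, a one-byte type (0 = delete, 1 = put), and for puts a uint16 key length before the key and value. The helper names below are illustrative only.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// appendPut frames a put record: length | type 1 | key length | key | value.
func appendPut(buf *bytes.Buffer, key, value []byte) {
	binary.Write(buf, binary.BigEndian, uint32(len(key)+len(value)+1+2))
	buf.WriteByte(1) // kTypePutEvent
	binary.Write(buf, binary.BigEndian, uint16(len(key)))
	buf.Write(key)
	buf.Write(value)
}

// appendDelete frames a delete record: length | type 0 | key.
func appendDelete(buf *bytes.Buffer, key []byte) {
	binary.Write(buf, binary.BigEndian, uint32(len(key)+1))
	buf.WriteByte(0) // kTypeDeleteEvent
	buf.Write(key)
}

func main() {
	var buf bytes.Buffer
	appendPut(&buf, []byte("k1"), []byte("v1"))
	appendDelete(&buf, []byte("k2"))
	fmt.Printf("% x\n", buf.Bytes())
}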
- } - - return String(buf), nil + return nil } -func formatDataKey(buf []byte, k []byte) ([]byte, error) { +func formatEventKey(buf []byte, k []byte) ([]byte, error) { if len(k) < 2 { - return nil, errInvalidBinLogEvent + return nil, errInvalidEvent } buf = append(buf, fmt.Sprintf("DB:%2d ", k[0])...) @@ -208,7 +219,7 @@ func formatDataKey(buf []byte, k []byte) ([]byte, error) { buf = strconv.AppendQuote(buf, String(key)) } default: - return nil, errInvalidBinLogEvent + return nil, errInvalidEvent } return buf, nil diff --git a/ledis/event_test.go b/ledis/event_test.go new file mode 100644 index 0000000..0349ea7 --- /dev/null +++ b/ledis/event_test.go @@ -0,0 +1,34 @@ +package ledis + +import ( + "reflect" + "testing" +) + +func TestEvent(t *testing.T) { + k1 := []byte("k1") + v1 := []byte("v1") + k2 := []byte("k2") + k3 := []byte("k3") + v3 := []byte("v3") + + b := new(eventBatch) + + b.Put(k1, v1) + b.Delete(k2) + b.Put(k3, v3) + + buf := b.Bytes() + + ev2 := []event{ + event{k1, v1}, + event{k2, nil}, + event{k3, v3}, + } + + if ev, err := decodeEventBatch(buf); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(ev, ev2) { + t.Fatal("not equal") + } +} diff --git a/ledis/ledis.go b/ledis/ledis.go index f3c1c8c..c4ac42f 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -5,6 +5,7 @@ import ( "github.com/siddontang/go-log/log" "github.com/siddontang/ledisdb/config" "github.com/siddontang/ledisdb/store" + "github.com/siddontang/ledisdb/wal" "sync" "time" ) @@ -18,10 +19,12 @@ type Ledis struct { quit chan struct{} jobs *sync.WaitGroup - binlog *BinLog + log wal.Store wLock sync.RWMutex //allow one write at same time commitLock sync.Mutex //allow one write commit at same time + + readOnly bool } func Open(cfg *config.Config) (*Ledis, error) { @@ -41,13 +44,10 @@ func Open(cfg *config.Config) (*Ledis, error) { l.ldb = ldb - if cfg.BinLog.MaxFileNum > 0 && cfg.BinLog.MaxFileSize > 0 { - l.binlog, err = NewBinLog(cfg) - if err != nil { + if cfg.UseWAL { + if l.log, err = wal.NewStore(cfg); err != nil { return nil, err } - } else { - l.binlog = nil } for i := uint8(0); i < MaxDBNumber; i++ { @@ -65,9 +65,9 @@ func (l *Ledis) Close() { l.ldb.Close() - if l.binlog != nil { - l.binlog.Close() - l.binlog = nil + if l.log != nil { + l.log.Close() + l.log = nil } } diff --git a/ledis/ledis_test.go b/ledis/ledis_test.go index d5a5476..45f1c7f 100644 --- a/ledis/ledis_test.go +++ b/ledis/ledis_test.go @@ -14,8 +14,6 @@ func getTestDB() *DB { f := func() { cfg := new(config.Config) cfg.DataDir = "/tmp/test_ledis" - // cfg.BinLog.MaxFileSize = 1073741824 - // cfg.BinLog.MaxFileNum = 3 os.RemoveAll(cfg.DataDir) diff --git a/ledis/t_hash.go b/ledis/t_hash.go index 8ee199e..952ddae 100644 --- a/ledis/t_hash.go +++ b/ledis/t_hash.go @@ -183,8 +183,6 @@ func (db *DB) HSet(key []byte, field []byte, value []byte) (int64, error) { return 0, err } - //todo add binlog - err = t.Commit() return n, err } diff --git a/ledis/t_kv.go b/ledis/t_kv.go index 1dd540a..fd13436 100644 --- a/ledis/t_kv.go +++ b/ledis/t_kv.go @@ -77,8 +77,6 @@ func (db *DB) incr(key []byte, delta int64) (int64, error) { t.Put(key, StrPutInt64(n)) - //todo binlog - err = t.Commit() return n, err } @@ -244,7 +242,6 @@ func (db *DB) MSet(args ...KVPair) error { t.Put(key, value) - //todo binlog } err = t.Commit() @@ -297,8 +294,6 @@ func (db *DB) SetNX(key []byte, value []byte) (int64, error) { } else { t.Put(key, value) - //todo binlog - err = t.Commit() } diff --git a/ledis/t_zset.go b/ledis/t_zset.go index 47af6ec..50fc6aa 100644 --- 
a/ledis/t_zset.go +++ b/ledis/t_zset.go @@ -305,7 +305,6 @@ func (db *DB) ZAdd(key []byte, args ...ScorePair) (int64, error) { return 0, err } - //todo add binlog err := t.Commit() return num, err } @@ -862,7 +861,6 @@ func (db *DB) ZUnionStore(destKey []byte, srcKeys [][]byte, weights []int64, agg sk := db.zEncodeSizeKey(destKey) t.Put(sk, PutInt64(num)) - //todo add binlog if err := t.Commit(); err != nil { return 0, err } @@ -930,7 +928,7 @@ func (db *DB) ZInterStore(destKey []byte, srcKeys [][]byte, weights []int64, agg var num int64 = int64(len(destMap)) sk := db.zEncodeSizeKey(destKey) t.Put(sk, PutInt64(num)) - //todo add binlog + if err := t.Commit(); err != nil { return 0, err } diff --git a/ledis/tx.go b/ledis/tx.go index 6339bae..2d96bd3 100644 --- a/ledis/tx.go +++ b/ledis/tx.go @@ -15,8 +15,6 @@ type Tx struct { *DB tx *store.Tx - - logs [][]byte } func (db *DB) IsTransaction() bool { @@ -71,10 +69,6 @@ func (tx *Tx) Commit() error { err := tx.tx.Commit() tx.tx = nil - if len(tx.logs) > 0 { - tx.l.binlog.Log(tx.logs...) - } - tx.l.commitLock.Unlock() tx.l.wLock.Unlock() diff --git a/wal/file_store.go b/wal/file_store.go index 0d39efb..5eb800f 100644 --- a/wal/file_store.go +++ b/wal/file_store.go @@ -1,25 +1,44 @@ package wal import ( + "fmt" + "github.com/siddontang/go-log/log" + "io/ioutil" "os" + "path" + "strconv" + "strings" "sync" ) const ( defaultMaxLogFileSize = 1024 * 1024 * 1024 - defaultMaxLogFileNum = 10 ) +/* +index file format: +ledis-bin.00001 +ledis-bin.00002 +ledis-bin.00003 +*/ + type FileStore struct { Store m sync.Mutex maxFileSize int - maxFileNum int first uint64 last uint64 + + logFile *os.File + logNames []string + nextLogIndex int64 + + indexName string + + path string } func NewFileStore(path string) (*FileStore, error) { @@ -29,12 +48,19 @@ func NewFileStore(path string) (*FileStore, error) { return nil, err } + s.path = path + s.maxFileSize = defaultMaxLogFileSize - s.maxFileNum = defaultMaxLogFileNum s.first = 0 s.last = 0 + s.logNames = make([]string, 0, 16) + + if err := s.loadIndex(); err != nil { + return nil, err + } + return s, nil } @@ -42,10 +68,6 @@ func (s *FileStore) SetMaxFileSize(size int) { s.maxFileSize = size } -func (s *FileStore) SetMaxFileNum(n int) { - s.maxFileNum = n -} - func (s *FileStore) GetLog(id uint64, log *Log) error { return nil } @@ -70,7 +92,11 @@ func (s *FileStore) StoreLogs(logs []*Log) error { return nil } -func (s *FileStore) DeleteRange(start, stop uint64) error { +func (s *FileStore) Purge(n uint64) error { + return nil +} + +func (s *FileStore) PuregeExpired(n int) error { return nil } @@ -81,3 +107,126 @@ func (s *FileStore) Clear() error { func (s *FileStore) Close() error { return nil } + +func (s *FileStore) flushIndex() error { + data := strings.Join(s.logNames, "\n") + + bakName := fmt.Sprintf("%s.bak", s.indexName) + f, err := os.OpenFile(bakName, os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + log.Error("create bak index error %s", err.Error()) + return err + } + + if _, err := f.WriteString(data); err != nil { + log.Error("write index error %s", err.Error()) + f.Close() + return err + } + + f.Close() + + if err := os.Rename(bakName, s.indexName); err != nil { + log.Error("rename bak index error %s", err.Error()) + return err + } + + return nil +} + +func (s *FileStore) fileExists(name string) bool { + p := path.Join(s.path, name) + _, err := os.Stat(p) + return !os.IsNotExist(err) +} + +func (s *FileStore) loadIndex() error { + s.indexName = path.Join(s.path, fmt.Sprintf("ledis-bin.index")) + if 
_, err := os.Stat(s.indexName); os.IsNotExist(err) { + //no index file, nothing to do + } else { + indexData, err := ioutil.ReadFile(s.indexName) + if err != nil { + return err + } + + lines := strings.Split(string(indexData), "\n") + for _, line := range lines { + line = strings.Trim(line, "\r\n ") + if len(line) == 0 { + continue + } + + if s.fileExists(line) { + s.logNames = append(s.logNames, line) + } else { + log.Info("log %s has not exists", line) + } + } + } + + var err error + if len(s.logNames) == 0 { + s.nextLogIndex = 1 + } else { + lastName := s.logNames[len(s.logNames)-1] + + if s.nextLogIndex, err = strconv.ParseInt(path.Ext(lastName)[1:], 10, 64); err != nil { + log.Error("invalid logfile name %s", err.Error()) + return err + } + + //like mysql, if server restart, a new log will create + s.nextLogIndex++ + } + + return nil +} + +func (s *FileStore) openNewLogFile() error { + var err error + lastName := s.formatLogFileName(s.nextLogIndex) + + logPath := path.Join(s.path, lastName) + if s.logFile, err = os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY, 0644); err != nil { + log.Error("open new logfile error %s", err.Error()) + return err + } + + s.logNames = append(s.logNames, lastName) + + if err = s.flushIndex(); err != nil { + return err + } + + return nil +} + +func (s *FileStore) checkLogFileSize() bool { + if s.logFile == nil { + return false + } + + st, _ := s.logFile.Stat() + if st.Size() >= int64(s.maxFileSize) { + s.closeLog() + return true + } + + return false +} + +func (s *FileStore) closeLog() { + if s.logFile == nil { + return + } + + s.nextLogIndex++ + + s.logFile.Close() + s.logFile = nil +} + +func (s *FileStore) formatLogFileName(index int64) string { + return fmt.Sprintf("ledis-bin.%07d", index) +} diff --git a/wal/goleveldb_store.go b/wal/goleveldb_store.go index 2c9b09d..4e78eb8 100644 --- a/wal/goleveldb_store.go +++ b/wal/goleveldb_store.go @@ -2,11 +2,13 @@ package wal import ( "bytes" + "fmt" "github.com/siddontang/go/num" "github.com/siddontang/ledisdb/config" "github.com/siddontang/ledisdb/store" "os" "sync" + "time" ) type GoLevelDBStore struct { @@ -132,7 +134,7 @@ func (s *GoLevelDBStore) StoreLogs(logs []*Log) error { return nil } -func (s *GoLevelDBStore) DeleteRange(min, max uint64) error { +func (s *GoLevelDBStore) Purge(n uint64) error { s.m.Lock() defer s.m.Unlock() @@ -149,25 +151,16 @@ func (s *GoLevelDBStore) DeleteRange(min, max uint64) error { return err } - min = num.MaxUint64(min, first) - max = num.MinUint64(max, last) + start := first + stop := num.MinUint64(last, first+n) w := s.db.NewWriteBatch() defer w.Rollback() - n := 0 - s.reset() - for i := min; i <= max; i++ { + for i := start; i < stop; i++ { w.Delete(num.Uint64ToBytes(i)) - n++ - if n > 1024 { - if err = w.Commit(); err != nil { - return err - } - n = 0 - } } if err = w.Commit(); err != nil { @@ -177,6 +170,44 @@ func (s *GoLevelDBStore) DeleteRange(min, max uint64) error { return nil } +func (s *GoLevelDBStore) PurgeExpired(n int) error { + if n <= 0 { + return fmt.Errorf("invalid expired time %d", n) + } + + t := uint32(time.Now().Unix() - int64(n)) + + s.m.Lock() + defer s.m.Unlock() + + s.reset() + + it := s.db.NewIterator() + it.SeekToFirst() + + w := s.db.NewWriteBatch() + defer w.Rollback() + + l := new(Log) + for ; it.Valid(); it.Next() { + v := it.RawValue() + + if err := l.Unmarshal(v); err != nil { + return err + } else if l.CreateTime > t { + break + } else { + w.Delete(it.RawKey()) + } + } + + if err := w.Commit(); err != nil { + return err + } + + return nil 
+} + func (s *GoLevelDBStore) Clear() error { s.m.Lock() defer s.m.Unlock() diff --git a/wal/log.go b/wal/log.go index d567c60..c150513 100644 --- a/wal/log.go +++ b/wal/log.go @@ -4,19 +4,31 @@ import ( "bytes" "encoding/binary" "io" + "time" ) type Log struct { ID uint64 CreateTime uint32 - // 0 for no compression - // 1 for snappy compression - Compression uint8 - Data []byte + + Data []byte +} + +func NewLog(id uint64, data []byte) *Log { + l := new(Log) + l.ID = id + l.CreateTime = uint32(time.Now().Unix()) + l.Data = data + + return l +} + +func (l *Log) HeadSize() int { + return 16 } func (l *Log) Marshal() ([]byte, error) { - buf := bytes.NewBuffer(make([]byte, 17+len(l.Data))) + buf := bytes.NewBuffer(make([]byte, l.HeadSize()+len(l.Data))) buf.Reset() if err := l.Encode(buf); err != nil { @@ -33,8 +45,7 @@ func (l *Log) Unmarshal(b []byte) error { } func (l *Log) Encode(w io.Writer) error { - length := uint32(17) - buf := make([]byte, length) + buf := make([]byte, l.HeadSize()) pos := 0 binary.BigEndian.PutUint64(buf[pos:], l.ID) @@ -43,9 +54,6 @@ func (l *Log) Encode(w io.Writer) error { binary.BigEndian.PutUint32(buf[pos:], l.CreateTime) pos += 4 - buf[pos] = l.Compression - pos++ - binary.BigEndian.PutUint32(buf[pos:], uint32(len(l.Data))) if n, err := w.Write(buf); err != nil { @@ -63,8 +71,7 @@ func (l *Log) Encode(w io.Writer) error { } func (l *Log) Decode(r io.Reader) error { - length := uint32(17) - buf := make([]byte, length) + buf := make([]byte, l.HeadSize()) if _, err := io.ReadFull(r, buf); err != nil { return err @@ -77,10 +84,7 @@ func (l *Log) Decode(r io.Reader) error { l.CreateTime = binary.BigEndian.Uint32(buf[pos:]) pos += 4 - l.Compression = buf[pos] - pos++ - - length = binary.BigEndian.Uint32(buf[pos:]) + length := binary.BigEndian.Uint32(buf[pos:]) l.Data = make([]byte, length) if _, err := io.ReadFull(r, l.Data); err != nil { diff --git a/wal/log_test.go b/wal/log_test.go index cfd8c22..46109cd 100644 --- a/wal/log_test.go +++ b/wal/log_test.go @@ -36,5 +36,4 @@ func TestLog(t *testing.T) { if !reflect.DeepEqual(l1, l2) { t.Fatal("must equal") } - } diff --git a/wal/store_test.go b/wal/store_test.go index 5b32f3e..030bff0 100644 --- a/wal/store_test.go +++ b/wal/store_test.go @@ -4,6 +4,7 @@ import ( "io/ioutil" "os" "testing" + "time" ) func TestGoLevelDBStore(t *testing.T) { @@ -103,12 +104,12 @@ func testLogs(t *testing.T, l Store) { } // Delete a suffix - if err := l.DeleteRange(5, 20); err != nil { + if err := l.Purge(5); err != nil { t.Fatalf("err: %v ", err) } // Verify they are all deleted - for i := 5; i <= 20; i++ { + for i := 1; i <= 5; i++ { if err := l.GetLog(uint64(i), &out); err != ErrLogNotFound { t.Fatalf("err: %v ", err) } @@ -119,14 +120,14 @@ func testLogs(t *testing.T, l Store) { if err != nil { t.Fatalf("err: %v ", err) } - if idx != 1 { + if idx != 6 { t.Fatalf("bad idx: %d", idx) } idx, err = l.LastID() if err != nil { t.Fatalf("err: %v ", err) } - if idx != 4 { + if idx != 20 { t.Fatalf("bad idx: %d", idx) } @@ -154,4 +155,35 @@ func testLogs(t *testing.T, l Store) { if idx != 0 { t.Fatalf("bad idx: %d", idx) } + + now := uint32(time.Now().Unix()) + logs = []*Log{} + for i := 1; i <= 20; i++ { + nl := &Log{ + ID: uint64(i), + CreateTime: now - 20, + Data: []byte("first"), + } + logs = append(logs, nl) + } + + if err := l.PurgeExpired(1); err != nil { + t.Fatal(err) + } + + idx, err = l.FirstID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 0 { + t.Fatalf("bad idx: %d", idx) + } + + idx, err = l.LastID() + if err 
!= nil { + t.Fatalf("err: %v ", err) + } + if idx != 0 { + t.Fatalf("bad idx: %d", idx) + } } diff --git a/wal/wal.go b/wal/wal.go index cc18870..b879619 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -2,6 +2,8 @@ package wal import ( "errors" + "github.com/siddontang/ledisdb/config" + "path" ) const ( @@ -26,11 +28,25 @@ type Store interface { StoreLog(log *Log) error StoreLogs(logs []*Log) error - // Delete logs [start, stop] - DeleteRange(start, stop uint64) error + // Delete first n logs + Purge(n uint64) error + + // Delete logs before n seconds + PurgeExpired(n int) error // Clear all logs Clear() error Close() error } + +func NewStore(cfg *config.Config) (Store, error) { + //now we only support goleveldb + + base := cfg.WAL.Path + if len(base) == 0 { + base = path.Join(cfg.DataDir, "wal") + } + + return NewGoLevelDBStore(base) +} From 8c70bbfdbec7c3c4bc5e8b82c2e91e3466adf05d Mon Sep 17 00:00:00 2001 From: siddontang Date: Wed, 17 Sep 2014 23:06:42 +0800 Subject: [PATCH 7/7] update, can not run at all --- bootstrap.sh | 2 ++ ledis/batch.go | 34 +++++++++++++++++++++++++++++++--- ledis/const.go | 5 ++++- ledis/ledis.go | 9 +++++---- ledis/meta.go | 36 ++++++++++++++++++++++++++++++++++++ ledis/replication.go | 4 ++-- ledis/scan.go | 20 ++++++++++---------- ledis/t_ttl.go | 4 +++- ledis/tx.go | 2 ++ ledis/util.go | 12 ++++++++++++ 10 files changed, 107 insertions(+), 21 deletions(-) create mode 100644 ledis/meta.go diff --git a/bootstrap.sh b/bootstrap.sh index ee260b7..a93c219 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -15,3 +15,5 @@ go get github.com/ugorji/go/codec go get github.com/BurntSushi/toml go get github.com/siddontang/go-bson/bson + +go get github.com/siddontang/go/num diff --git a/ledis/batch.go b/ledis/batch.go index 6f97457..f754116 100644 --- a/ledis/batch.go +++ b/ledis/batch.go @@ -20,29 +20,52 @@ type batch struct { } func (b *batch) Commit() error { + if b.l.replMode { + return ErrWriteInReplMode + } + b.l.commitLock.Lock() defer b.l.commitLock.Unlock() + if b.LogEanbled() { + + } + err := b.WriteBatch.Commit() return err } +// only use in expire cycle +func (b *batch) expireCommit() error { + b.l.commitLock.Lock() + defer b.l.commitLock.Unlock() + + return b.WriteBatch.Commit() +} + func (b *batch) Lock() { b.Locker.Lock() } func (b *batch) Unlock() { b.noLogging = false + b.eb.Reset() b.WriteBatch.Rollback() b.Locker.Unlock() } func (b *batch) Put(key []byte, value []byte) { + if b.LogEanbled() { + b.eb.Put(key, value) + } b.WriteBatch.Put(key, value) } func (b *batch) Delete(key []byte) { + if b.LogEanbled() { + b.eb.Delete(key) + } b.WriteBatch.Delete(key) } @@ -51,8 +74,9 @@ func (b *batch) LogEanbled() bool { return !b.noLogging && b.l.log != nil } -func (b *batch) DisableLog(d bool) { - b.noLogging = d +// only for expire cycle +func (b *batch) disableLog() { + b.noLogging = true } type dbBatchLocker struct { @@ -90,7 +114,11 @@ func (l *Ledis) newBatch(wb store.WriteBatch, locker sync.Locker, tx *Tx) *batch b.tx = tx b.Locker = locker - b.eb = new(eventBatch) + if tx != nil { + b.eb = tx.eb + } else { + b.eb = new(eventBatch) + } b.noLogging = false return b diff --git a/ledis/const.go b/ledis/const.go index 9108736..7ed8b05 100644 --- a/ledis/const.go +++ b/ledis/const.go @@ -23,6 +23,8 @@ const ( ExpTimeType byte = 101 ExpMetaType byte = 102 + + MetaType byte = 201 ) var ( @@ -78,7 +80,8 @@ const ( ) var ( - ErrScoreMiss = errors.New("zset score miss") + ErrScoreMiss = errors.New("zset score miss") + ErrWriteInReplMode = errors.New("write in replication 
mode") ) const ( diff --git a/ledis/ledis.go b/ledis/ledis.go index c4ac42f..666829d 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -24,7 +24,7 @@ type Ledis struct { wLock sync.RWMutex //allow one write at same time commitLock sync.Mutex //allow one write commit at same time - readOnly bool + replMode bool } func Open(cfg *config.Config) (*Ledis, error) { @@ -89,9 +89,10 @@ func (l *Ledis) FlushAll() error { return nil } -// very dangerous to use -func (l *Ledis) DataDB() *store.DB { - return l.ldb +// for replication mode, any write operations will fail, +// except clear expired data in expire cycle +func (l *Ledis) SetReplictionMode(b bool) { + l.replMode = b } func (l *Ledis) activeExpireCycle() { diff --git a/ledis/meta.go b/ledis/meta.go new file mode 100644 index 0000000..1aec07e --- /dev/null +++ b/ledis/meta.go @@ -0,0 +1,36 @@ +package ledis + +import ( + "github.com/siddontang/go/num" +) + +var ( + lastCommitIDKey = []byte{} +) + +func init() { + f := func(name string) []byte { + b := make([]byte, 0, 2+len(name)) + b = append(b, []byte{255, MetaType}...) + b = append(b, name...) + return b + } + + lastCommitIDKey = f("last_commit_id") +} + +func (l *Ledis) GetLastCommitID() (uint64, error) { + return Uint64(l.ldb.Get(lastCommitIDKey)) +} + +func (l *Ledis) GetLastLogID() (uint64, error) { + if l.log == nil { + return 0, nil + } + + return l.log.LastID() +} + +func setLastCommitID(t *batch, id uint64) { + t.Put(lastCommitIDKey, num.Uint64ToBytes(id)) +} diff --git a/ledis/replication.go b/ledis/replication.go index 804573d..fa12f1b 100644 --- a/ledis/replication.go +++ b/ledis/replication.go @@ -5,7 +5,7 @@ import ( "bytes" "errors" "github.com/siddontang/go-log/log" - "github.com/siddontang/ledisdb/store/driver" + "github.com/siddontang/ledisdb/store" "io" "os" "time" @@ -26,7 +26,7 @@ var ( ) type replBatch struct { - wb driver.IWriteBatch + wb store.WriteBatch events [][]byte l *Ledis diff --git a/ledis/scan.go b/ledis/scan.go index 09e2b5c..f7fca13 100644 --- a/ledis/scan.go +++ b/ledis/scan.go @@ -24,17 +24,17 @@ func (db *DB) scan(dataType byte, key []byte, count int, inclusive bool, match s if err = checkKeySize(key); err != nil { return nil, err } - if minKey, err = db.encodeMetaKey(dataType, key); err != nil { + if minKey, err = db.encodeScanKey(dataType, key); err != nil { return nil, err } } else { - if minKey, err = db.encodeMinKey(dataType); err != nil { + if minKey, err = db.encodeScanMinKey(dataType); err != nil { return nil, err } } - if maxKey, err = db.encodeMaxKey(dataType); err != nil { + if maxKey, err = db.encodeScanMaxKey(dataType); err != nil { return nil, err } @@ -54,7 +54,7 @@ func (db *DB) scan(dataType byte, key []byte, count int, inclusive bool, match s } for i := 0; it.Valid() && i < count && bytes.Compare(it.RawKey(), maxKey) < 0; it.Next() { - if k, err := db.decodeMetaKey(dataType, it.Key()); err != nil { + if k, err := db.decodeScanKey(dataType, it.Key()); err != nil { continue } else if r != nil && !r.Match(k) { continue @@ -67,12 +67,12 @@ func (db *DB) scan(dataType byte, key []byte, count int, inclusive bool, match s return v, nil } -func (db *DB) encodeMinKey(dataType byte) ([]byte, error) { - return db.encodeMetaKey(dataType, nil) +func (db *DB) encodeScanMinKey(dataType byte) ([]byte, error) { + return db.encodeScanKey(dataType, nil) } -func (db *DB) encodeMaxKey(dataType byte) ([]byte, error) { - k, err := db.encodeMetaKey(dataType, nil) +func (db *DB) encodeScanMaxKey(dataType byte) ([]byte, error) { + k, err := 
db.encodeScanKey(dataType, nil) if err != nil { return nil, err } @@ -80,7 +80,7 @@ func (db *DB) encodeMaxKey(dataType byte) ([]byte, error) { return k, nil } -func (db *DB) encodeMetaKey(dataType byte, key []byte) ([]byte, error) { +func (db *DB) encodeScanKey(dataType byte, key []byte) ([]byte, error) { switch dataType { case KVType: return db.encodeKVKey(key), nil @@ -98,7 +98,7 @@ func (db *DB) encodeMetaKey(dataType byte, key []byte) ([]byte, error) { return nil, errDataType } } -func (db *DB) decodeMetaKey(dataType byte, ek []byte) ([]byte, error) { +func (db *DB) decodeScanKey(dataType byte, ek []byte) ([]byte, error) { if len(ek) < 2 || ek[0] != db.index || ek[1] != dataType { return nil, errMetaKey } diff --git a/ledis/t_ttl.go b/ledis/t_ttl.go index 3d12606..a2e9cba 100644 --- a/ledis/t_ttl.go +++ b/ledis/t_ttl.go @@ -174,6 +174,8 @@ func (eli *elimination) active() { t.Lock() + t.disableLog() + if exp, err := Int64(dbGet(mk)); err == nil { // check expire again if exp <= now { @@ -181,7 +183,7 @@ func (eli *elimination) active() { t.Delete(tk) t.Delete(mk) - t.Commit() + t.expireCommit() } } diff --git a/ledis/tx.go b/ledis/tx.go index 2d96bd3..2966c16 100644 --- a/ledis/tx.go +++ b/ledis/tx.go @@ -15,6 +15,8 @@ type Tx struct { *DB tx *store.Tx + + eb *eventBatch } func (db *DB) IsTransaction() bool { diff --git a/ledis/util.go b/ledis/util.go index 770bca1..258c972 100644 --- a/ledis/util.go +++ b/ledis/util.go @@ -43,6 +43,18 @@ func Int64(v []byte, err error) (int64, error) { return int64(binary.LittleEndian.Uint64(v)), nil } +func Uint64(v []byte, err error) (uint64, error) { + if err != nil { + return 0, err + } else if v == nil || len(v) == 0 { + return 0, nil + } else if len(v) != 8 { + return 0, errIntNumber + } + + return binary.LittleEndian.Uint64(v), nil +} + func PutInt64(v int64) []byte { var b []byte pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b))