diff --git a/wal/file_store.go b/wal/file_store.go new file mode 100644 index 0000000..0d39efb --- /dev/null +++ b/wal/file_store.go @@ -0,0 +1,83 @@ +package wal + +import ( + "os" + "sync" +) + +const ( + defaultMaxLogFileSize = 1024 * 1024 * 1024 + defaultMaxLogFileNum = 10 +) + +type FileStore struct { + Store + + m sync.Mutex + + maxFileSize int + maxFileNum int + + first uint64 + last uint64 +} + +func NewFileStore(path string) (*FileStore, error) { + s := new(FileStore) + + if err := os.MkdirAll(path, 0755); err != nil { + return nil, err + } + + s.maxFileSize = defaultMaxLogFileSize + s.maxFileNum = defaultMaxLogFileNum + + s.first = 0 + s.last = 0 + + return s, nil +} + +func (s *FileStore) SetMaxFileSize(size int) { + s.maxFileSize = size +} + +func (s *FileStore) SetMaxFileNum(n int) { + s.maxFileNum = n +} + +func (s *FileStore) GetLog(id uint64, log *Log) error { + return nil +} + +func (s *FileStore) SeekLog(id uint64, log *Log) error { + return nil +} + +func (s *FileStore) FirstID() (uint64, error) { + return 0, nil +} + +func (s *FileStore) LastID() (uint64, error) { + return 0, nil +} + +func (s *FileStore) StoreLog(log *Log) error { + return nil +} + +func (s *FileStore) StoreLogs(logs []*Log) error { + return nil +} + +func (s *FileStore) DeleteRange(start, stop uint64) error { + return nil +} + +func (s *FileStore) Clear() error { + return nil +} + +func (s *FileStore) Close() error { + return nil +} diff --git a/wal/gen.go b/wal/gen.go new file mode 100644 index 0000000..9d84938 --- /dev/null +++ b/wal/gen.go @@ -0,0 +1,137 @@ +package wal + +import ( + "encoding/binary" + "fmt" + "os" + "path" + "sync" +) + +type FileIDGenerator struct { + LogIDGenerator + + m sync.Mutex + f *os.File + + id uint64 +} + +func NewFileIDGenerator(base string) (*FileIDGenerator, error) { + if err := os.MkdirAll(base, 0755); err != nil { + return nil, err + } + + g := new(FileIDGenerator) + + name := path.Join(base, "log.id") + + var err error + if g.f, err = os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0644); err != nil { + return nil, err + } + + s, _ := g.f.Stat() + if s.Size() == 0 { + g.id = 0 + } else if s.Size() == 8 { + if err = binary.Read(g.f, binary.BigEndian, &g.id); err != nil { + g.f.Close() + return nil, err + } else if g.id == InvalidLogID { + g.f.Close() + return nil, fmt.Errorf("read invalid log id in %s", name) + } + } else { + g.f.Close() + return nil, fmt.Errorf("log id file %s is invalid", name) + } + + return g, nil +} + +func (g *FileIDGenerator) Reset(id uint64) error { + g.m.Lock() + defer g.m.Unlock() + + if g.f == nil { + return fmt.Errorf("generator closed") + } + + if g.id < id { + g.id = id + } + + return nil +} + +func (g *FileIDGenerator) GenerateID() (uint64, error) { + g.m.Lock() + defer g.m.Unlock() + + if g.f == nil { + return 0, fmt.Errorf("generator closed") + } + + if _, err := g.f.Seek(0, os.SEEK_SET); err != nil { + return 0, nil + } + + id := g.id + 1 + + if err := binary.Write(g.f, binary.BigEndian, id); err != nil { + return 0, nil + } + + g.id = id + + return id, nil +} + +func (g *FileIDGenerator) Close() error { + g.m.Lock() + defer g.m.Unlock() + + if g.f != nil { + err := g.f.Close() + g.f = nil + return err + } + return nil +} + +type MemIDGenerator struct { + m sync.Mutex + + LogIDGenerator + + id uint64 +} + +func NewMemIDGenerator(baseID uint64) *MemIDGenerator { + g := &MemIDGenerator{id: baseID} + return g +} + +func (g *MemIDGenerator) Reset(id uint64) error { + g.m.Lock() + defer g.m.Unlock() + + if g.id < id { + g.id = id + } + return nil +} + +func (g *MemIDGenerator) GenerateID() (uint64, error) { + g.m.Lock() + defer g.m.Unlock() + + g.id++ + id := g.id + return id, nil +} + +func (g *MemIDGenerator) Close() error { + return nil +} diff --git a/wal/gen_test.go b/wal/gen_test.go new file mode 100644 index 0000000..2f60999 --- /dev/null +++ b/wal/gen_test.go @@ -0,0 +1,48 @@ +package wal + +import ( + "io/ioutil" + "os" + "testing" +) + +func testGenerator(t *testing.T, g LogIDGenerator, base uint64) { + for i := base; i < base+100; i++ { + id, err := g.GenerateID() + if err != nil { + t.Fatal(err) + } else if id != i { + t.Fatal(id, i) + } + } +} + +func TestGenerator(t *testing.T) { + base, err := ioutil.TempDir("", "wal") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(base) + + var g *FileIDGenerator + if g, err = NewFileIDGenerator(base); err != nil { + t.Fatal(err) + } else { + testGenerator(t, g, 1) + if err = g.Close(); err != nil { + t.Fatal(err) + } + } + + if g, err = NewFileIDGenerator(base); err != nil { + t.Fatal(err) + } else { + testGenerator(t, g, 101) + if err = g.Close(); err != nil { + t.Fatal(err) + } + } + + m := NewMemIDGenerator(100) + testGenerator(t, m, 101) +} diff --git a/wal/goleveldb_store.go b/wal/goleveldb_store.go new file mode 100644 index 0000000..0510432 --- /dev/null +++ b/wal/goleveldb_store.go @@ -0,0 +1,229 @@ +package wal + +import ( + "bytes" + "github.com/siddontang/go/num" + "github.com/siddontang/ledisdb/config" + "github.com/siddontang/ledisdb/store" + "os" + "sync" +) + +type GoLevelDBStore struct { + m sync.Mutex + db *store.DB + + cfg *config.Config + + first uint64 + last uint64 +} + +func (s *GoLevelDBStore) FirstID() (uint64, error) { + s.m.Lock() + defer s.m.Unlock() + return s.firstID() +} + +func (s *GoLevelDBStore) LastID() (uint64, error) { + s.m.Lock() + defer s.m.Unlock() + return s.lastID() +} + +func (s *GoLevelDBStore) firstID() (uint64, error) { + if s.first != InvalidLogID { + return s.first, nil + } + + it := s.db.NewIterator() + defer it.Close() + + it.SeekToFirst() + + if it.Valid() { + s.first = num.BytesToUint64(it.RawKey()) + } + + return s.first, nil +} + +func (s *GoLevelDBStore) lastID() (uint64, error) { + if s.last != InvalidLogID { + return s.last, nil + } + + it := s.db.NewIterator() + defer it.Close() + + it.SeekToLast() + + if it.Valid() { + s.last = num.BytesToUint64(it.RawKey()) + } + + return s.last, nil +} + +func (s *GoLevelDBStore) GetLog(id uint64, log *Log) error { + v, err := s.db.Get(num.Uint64ToBytes(id)) + if err != nil { + return err + } else if v == nil { + return ErrLogNotFound + } else { + return log.Decode(bytes.NewBuffer(v)) + } +} + +func (s *GoLevelDBStore) SeekLog(id uint64, log *Log) error { + it := s.db.NewIterator() + defer it.Close() + + it.Seek(num.Uint64ToBytes(id)) + + if !it.Valid() { + return ErrLogNotFound + } else { + return log.Decode(bytes.NewBuffer(it.RawValue())) + } +} + +func (s *GoLevelDBStore) StoreLog(log *Log) error { + return s.StoreLogs([]*Log{log}) +} + +func (s *GoLevelDBStore) StoreLogs(logs []*Log) error { + s.m.Lock() + defer s.m.Unlock() + + w := s.db.NewWriteBatch() + defer w.Rollback() + + last := s.last + + s.reset() + + var buf bytes.Buffer + for _, log := range logs { + buf.Reset() + + if log.ID <= last { + return ErrLessLogID + } + + last = log.ID + key := num.Uint64ToBytes(log.ID) + + if err := log.Encode(&buf); err != nil { + return err + } + w.Put(key, buf.Bytes()) + } + + return w.Commit() +} + +func (s *GoLevelDBStore) DeleteRange(min, max uint64) error { + s.m.Lock() + defer s.m.Unlock() + + var first, last uint64 + var err error + + first, err = s.firstID() + if err != nil { + return err + } + + last, err = s.lastID() + if err != nil { + return err + } + + min = num.MaxUint64(min, first) + max = num.MinUint64(max, last) + + w := s.db.NewWriteBatch() + defer w.Rollback() + + n := 0 + + s.reset() + + for i := min; i <= max; i++ { + w.Delete(num.Uint64ToBytes(i)) + n++ + if n > 1024 { + if err = w.Commit(); err != nil { + return err + } + n = 0 + } + } + + if err = w.Commit(); err != nil { + return err + } + return nil +} + +func (s *GoLevelDBStore) Clear() error { + s.m.Lock() + defer s.m.Unlock() + + if s.db != nil { + s.db.Close() + } + + os.RemoveAll(s.cfg.DBPath) + + return s.open() +} + +func (s *GoLevelDBStore) reset() { + s.first = InvalidLogID + s.last = InvalidLogID +} + +func (s *GoLevelDBStore) Close() error { + s.m.Lock() + defer s.m.Unlock() + + if s.db == nil { + return nil + } + + err := s.db.Close() + s.db = nil + return err +} + +func (s *GoLevelDBStore) open() error { + var err error + + s.first = InvalidLogID + s.last = InvalidLogID + + s.db, err = store.Open(s.cfg) + return err +} + +func NewGoLevelDBStore(base string) (*GoLevelDBStore, error) { + cfg := new(config.Config) + cfg.DBName = "goleveldb" + cfg.DBPath = base + cfg.LevelDB.BlockSize = 4 * 1024 * 1024 + cfg.LevelDB.CacheSize = 16 * 1024 * 1024 + cfg.LevelDB.WriteBufferSize = 4 * 1024 * 1024 + cfg.LevelDB.Compression = false + + s := new(GoLevelDBStore) + s.cfg = cfg + + if err := s.open(); err != nil { + return nil, err + } + + return s, nil +} diff --git a/wal/log.go b/wal/log.go new file mode 100644 index 0000000..8ff8c95 --- /dev/null +++ b/wal/log.go @@ -0,0 +1,73 @@ +package wal + +import ( + "encoding/binary" + "io" +) + +type Log struct { + ID uint64 + CreateTime uint32 + // 0 for no compression + // 1 for snappy compression + Compression uint8 + Data []byte +} + +func (l *Log) Encode(w io.Writer) error { + length := uint32(17) + buf := make([]byte, length) + + pos := 0 + binary.BigEndian.PutUint64(buf[pos:], l.ID) + pos += 8 + + binary.BigEndian.PutUint32(buf[pos:], l.CreateTime) + pos += 4 + + buf[pos] = l.Compression + pos++ + + binary.BigEndian.PutUint32(buf[pos:], uint32(len(l.Data))) + + if n, err := w.Write(buf); err != nil { + return err + } else if n != len(buf) { + return io.ErrShortWrite + } + + if n, err := w.Write(l.Data); err != nil { + return err + } else if n != len(l.Data) { + return io.ErrShortWrite + } + return nil +} + +func (l *Log) Decode(r io.Reader) error { + length := uint32(17) + buf := make([]byte, length) + + if _, err := io.ReadFull(r, buf); err != nil { + return err + } + + pos := 0 + l.ID = binary.BigEndian.Uint64(buf[pos:]) + pos += 8 + + l.CreateTime = binary.BigEndian.Uint32(buf[pos:]) + pos += 4 + + l.Compression = buf[pos] + pos++ + + length = binary.BigEndian.Uint32(buf[pos:]) + + l.Data = make([]byte, length) + if _, err := io.ReadFull(r, l.Data); err != nil { + return err + } + + return nil +} diff --git a/wal/log_test.go b/wal/log_test.go new file mode 100644 index 0000000..ea5d91e --- /dev/null +++ b/wal/log_test.go @@ -0,0 +1,27 @@ +package wal + +import ( + "bytes" + "reflect" + "testing" +) + +func TestLog(t *testing.T) { + l1 := &Log{ID: 1, CreateTime: 100, Compression: 0, Data: []byte("hello world")} + + var buf bytes.Buffer + + if err := l1.Encode(&buf); err != nil { + t.Fatal(err) + } + + l2 := &Log{} + + if err := l2.Decode(&buf); err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(l1, l2) { + t.Fatal("must equal") + } +} diff --git a/wal/store_test.go b/wal/store_test.go new file mode 100644 index 0000000..45636c6 --- /dev/null +++ b/wal/store_test.go @@ -0,0 +1,137 @@ +package wal + +import ( + "io/ioutil" + "os" + "testing" +) + +func TestGoLevelDBStore(t *testing.T) { + // Create a test dir + dir, err := ioutil.TempDir("", "wal") + if err != nil { + t.Fatalf("err: %v ", err) + } + defer os.RemoveAll(dir) + + // New level + l, err := NewGoLevelDBStore(dir) + if err != nil { + t.Fatalf("err: %v ", err) + } + defer l.Close() + + testLogs(t, l) +} + +func testLogs(t *testing.T, l Store) { + // Should be no first index + idx, err := l.FirstID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 0 { + t.Fatalf("bad idx: %d", idx) + } + + // Should be no last index + idx, err = l.LastID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 0 { + t.Fatalf("bad idx: %d", idx) + } + + // Try a filed fetch + var out Log + if err := l.GetLog(10, &out); err.Error() != "log not found" { + t.Fatalf("err: %v ", err) + } + + // Write out a log + log := Log{ + ID: 1, + Data: []byte("first"), + } + for i := 1; i <= 10; i++ { + log.ID = uint64(i) + if err := l.StoreLog(&log); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Attempt to write multiple logs + var logs []*Log + for i := 11; i <= 20; i++ { + nl := &Log{ + ID: uint64(i), + Data: []byte("first"), + } + logs = append(logs, nl) + } + if err := l.StoreLogs(logs); err != nil { + t.Fatalf("err: %v", err) + } + + // Try to fetch + if err := l.GetLog(10, &out); err != nil { + t.Fatalf("err: %v ", err) + } + + // Try to fetch + if err := l.GetLog(20, &out); err != nil { + t.Fatalf("err: %v ", err) + } + + // Check the lowest index + idx, err = l.FirstID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 1 { + t.Fatalf("bad idx: %d", idx) + } + + // Check the highest index + idx, err = l.LastID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 20 { + t.Fatalf("bad idx: %d", idx) + } + + // Delete a suffix + if err := l.DeleteRange(5, 20); err != nil { + t.Fatalf("err: %v ", err) + } + + // Verify they are all deleted + for i := 5; i <= 20; i++ { + if err := l.GetLog(uint64(i), &out); err != ErrLogNotFound { + t.Fatalf("err: %v ", err) + } + } + + // Index should be one + idx, err = l.FirstID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 1 { + t.Fatalf("bad idx: %d", idx) + } + idx, err = l.LastID() + if err != nil { + t.Fatalf("err: %v ", err) + } + if idx != 4 { + t.Fatalf("bad idx: %d", idx) + } + + // Should not be able to fetch + if err := l.GetLog(5, &out); err != ErrLogNotFound { + t.Fatalf("err: %v ", err) + } +} diff --git a/wal/wal.go b/wal/wal.go new file mode 100644 index 0000000..b32139c --- /dev/null +++ b/wal/wal.go @@ -0,0 +1,46 @@ +package wal + +import ( + "errors" +) + +const ( + InvalidLogID uint64 = 0 +) + +var ( + ErrLogNotFound = errors.New("log not found") + ErrLessLogID = errors.New("log id is less") +) + +type LogIDGenerator interface { + // Force reset to id, if current id is larger than id, nothing reset + Reset(id uint64) error + + // ID must be first at 1, and increased monotonously, 0 is invalid + GenerateID() (uint64, error) + + Close() error +} + +type Store interface { + GetLog(id uint64, log *Log) error + + // Get the first log which ID is equal or larger than id + SeekLog(id uint64, log *Log) error + + FirstID() (uint64, error) + LastID() (uint64, error) + + // if log id is less than current last id, return error + StoreLog(log *Log) error + StoreLogs(logs []*Log) error + + // Delete logs [start, stop] + DeleteRange(start, stop uint64) error + + // Clear all logs + Clear() error + + Close() error +}