add base wal package

siddontang 2014-09-15 22:42:25 +08:00
parent 95cbcc6460
commit 63e2437611
8 changed files with 780 additions and 0 deletions

83
wal/file_store.go Normal file

@@ -0,0 +1,83 @@
package wal

import (
	"os"
	"sync"
)

const (
	defaultMaxLogFileSize = 1024 * 1024 * 1024
	defaultMaxLogFileNum  = 10
)

type FileStore struct {
	Store

	m sync.Mutex

	maxFileSize int
	maxFileNum  int

	first uint64
	last  uint64
}

func NewFileStore(path string) (*FileStore, error) {
	s := new(FileStore)

	if err := os.MkdirAll(path, 0755); err != nil {
		return nil, err
	}

	s.maxFileSize = defaultMaxLogFileSize
	s.maxFileNum = defaultMaxLogFileNum

	s.first = 0
	s.last = 0

	return s, nil
}

func (s *FileStore) SetMaxFileSize(size int) {
	s.maxFileSize = size
}

func (s *FileStore) SetMaxFileNum(n int) {
	s.maxFileNum = n
}

// The methods below are placeholders; the file-based store is not implemented in this commit.

func (s *FileStore) GetLog(id uint64, log *Log) error {
	return nil
}

func (s *FileStore) SeekLog(id uint64, log *Log) error {
	return nil
}

func (s *FileStore) FirstID() (uint64, error) {
	return 0, nil
}

func (s *FileStore) LastID() (uint64, error) {
	return 0, nil
}

func (s *FileStore) StoreLog(log *Log) error {
	return nil
}

func (s *FileStore) StoreLogs(logs []*Log) error {
	return nil
}

func (s *FileStore) DeleteRange(start, stop uint64) error {
	return nil
}

func (s *FileStore) Clear() error {
	return nil
}

func (s *FileStore) Close() error {
	return nil
}

137
wal/gen.go Normal file

@@ -0,0 +1,137 @@
package wal

import (
	"encoding/binary"
	"fmt"
	"os"
	"path"
	"sync"
)

type FileIDGenerator struct {
	LogIDGenerator

	m sync.Mutex

	f *os.File

	id uint64
}

func NewFileIDGenerator(base string) (*FileIDGenerator, error) {
	if err := os.MkdirAll(base, 0755); err != nil {
		return nil, err
	}

	g := new(FileIDGenerator)

	name := path.Join(base, "log.id")

	var err error
	if g.f, err = os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0644); err != nil {
		return nil, err
	}

	s, err := g.f.Stat()
	if err != nil {
		g.f.Close()
		return nil, err
	}

	if s.Size() == 0 {
		g.id = 0
	} else if s.Size() == 8 {
		if err = binary.Read(g.f, binary.BigEndian, &g.id); err != nil {
			g.f.Close()
			return nil, err
		} else if g.id == InvalidLogID {
			g.f.Close()
			return nil, fmt.Errorf("read invalid log id in %s", name)
		}
	} else {
		g.f.Close()
		return nil, fmt.Errorf("log id file %s is invalid", name)
	}

	return g, nil
}

func (g *FileIDGenerator) Reset(id uint64) error {
	g.m.Lock()
	defer g.m.Unlock()

	if g.f == nil {
		return fmt.Errorf("generator closed")
	}

	if g.id < id {
		g.id = id
	}

	return nil
}

func (g *FileIDGenerator) GenerateID() (uint64, error) {
	g.m.Lock()
	defer g.m.Unlock()

	if g.f == nil {
		return 0, fmt.Errorf("generator closed")
	}

	if _, err := g.f.Seek(0, os.SEEK_SET); err != nil {
		return 0, err
	}

	id := g.id + 1
	if err := binary.Write(g.f, binary.BigEndian, id); err != nil {
		return 0, err
	}

	g.id = id
	return id, nil
}

func (g *FileIDGenerator) Close() error {
	g.m.Lock()
	defer g.m.Unlock()

	if g.f != nil {
		err := g.f.Close()
		g.f = nil
		return err
	}

	return nil
}

type MemIDGenerator struct {
	m sync.Mutex

	LogIDGenerator

	id uint64
}

func NewMemIDGenerator(baseID uint64) *MemIDGenerator {
	g := &MemIDGenerator{id: baseID}
	return g
}

func (g *MemIDGenerator) Reset(id uint64) error {
	g.m.Lock()
	defer g.m.Unlock()

	if g.id < id {
		g.id = id
	}

	return nil
}

func (g *MemIDGenerator) GenerateID() (uint64, error) {
	g.m.Lock()
	defer g.m.Unlock()

	g.id++
	id := g.id
	return id, nil
}

func (g *MemIDGenerator) Close() error {
	return nil
}

48
wal/gen_test.go Normal file

@@ -0,0 +1,48 @@
package wal

import (
	"io/ioutil"
	"os"
	"testing"
)

func testGenerator(t *testing.T, g LogIDGenerator, base uint64) {
	for i := base; i < base+100; i++ {
		id, err := g.GenerateID()
		if err != nil {
			t.Fatal(err)
		} else if id != i {
			t.Fatal(id, i)
		}
	}
}

func TestGenerator(t *testing.T) {
	base, err := ioutil.TempDir("", "wal")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(base)

	var g *FileIDGenerator
	if g, err = NewFileIDGenerator(base); err != nil {
		t.Fatal(err)
	} else {
		testGenerator(t, g, 1)
		if err = g.Close(); err != nil {
			t.Fatal(err)
		}
	}

	if g, err = NewFileIDGenerator(base); err != nil {
		t.Fatal(err)
	} else {
		testGenerator(t, g, 101)
		if err = g.Close(); err != nil {
			t.Fatal(err)
		}
	}

	m := NewMemIDGenerator(100)
	testGenerator(t, m, 101)
}

229
wal/goleveldb_store.go Normal file

@@ -0,0 +1,229 @@
package wal

import (
	"bytes"
	"github.com/siddontang/go/num"
	"github.com/siddontang/ledisdb/config"
	"github.com/siddontang/ledisdb/store"
	"os"
	"sync"
)

type GoLevelDBStore struct {
	m sync.Mutex

	db *store.DB

	cfg *config.Config

	first uint64
	last  uint64
}

func (s *GoLevelDBStore) FirstID() (uint64, error) {
	s.m.Lock()
	defer s.m.Unlock()

	return s.firstID()
}

func (s *GoLevelDBStore) LastID() (uint64, error) {
	s.m.Lock()
	defer s.m.Unlock()

	return s.lastID()
}

func (s *GoLevelDBStore) firstID() (uint64, error) {
	if s.first != InvalidLogID {
		return s.first, nil
	}

	it := s.db.NewIterator()
	defer it.Close()

	it.SeekToFirst()
	if it.Valid() {
		s.first = num.BytesToUint64(it.RawKey())
	}

	return s.first, nil
}

func (s *GoLevelDBStore) lastID() (uint64, error) {
	if s.last != InvalidLogID {
		return s.last, nil
	}

	it := s.db.NewIterator()
	defer it.Close()

	it.SeekToLast()
	if it.Valid() {
		s.last = num.BytesToUint64(it.RawKey())
	}

	return s.last, nil
}

func (s *GoLevelDBStore) GetLog(id uint64, log *Log) error {
	v, err := s.db.Get(num.Uint64ToBytes(id))
	if err != nil {
		return err
	} else if v == nil {
		return ErrLogNotFound
	} else {
		return log.Decode(bytes.NewBuffer(v))
	}
}

func (s *GoLevelDBStore) SeekLog(id uint64, log *Log) error {
	it := s.db.NewIterator()
	defer it.Close()

	it.Seek(num.Uint64ToBytes(id))

	if !it.Valid() {
		return ErrLogNotFound
	} else {
		return log.Decode(bytes.NewBuffer(it.RawValue()))
	}
}

func (s *GoLevelDBStore) StoreLog(log *Log) error {
	return s.StoreLogs([]*Log{log})
}

func (s *GoLevelDBStore) StoreLogs(logs []*Log) error {
	s.m.Lock()
	defer s.m.Unlock()

	w := s.db.NewWriteBatch()
	defer w.Rollback()

	last := s.last
	s.reset()

	var buf bytes.Buffer
	for _, log := range logs {
		buf.Reset()

		if log.ID <= last {
			return ErrLessLogID
		}

		last = log.ID
		key := num.Uint64ToBytes(log.ID)

		if err := log.Encode(&buf); err != nil {
			return err
		}
		w.Put(key, buf.Bytes())
	}

	return w.Commit()
}

func (s *GoLevelDBStore) DeleteRange(min, max uint64) error {
	s.m.Lock()
	defer s.m.Unlock()

	var first, last uint64
	var err error

	first, err = s.firstID()
	if err != nil {
		return err
	}

	last, err = s.lastID()
	if err != nil {
		return err
	}

	min = num.MaxUint64(min, first)
	max = num.MinUint64(max, last)

	w := s.db.NewWriteBatch()
	defer w.Rollback()

	n := 0

	s.reset()

	for i := min; i <= max; i++ {
		w.Delete(num.Uint64ToBytes(i))

		n++
		if n > 1024 {
			if err = w.Commit(); err != nil {
				return err
			}
			n = 0
		}
	}

	if err = w.Commit(); err != nil {
		return err
	}

	return nil
}

func (s *GoLevelDBStore) Clear() error {
	s.m.Lock()
	defer s.m.Unlock()

	if s.db != nil {
		s.db.Close()
	}

	os.RemoveAll(s.cfg.DBPath)

	return s.open()
}

func (s *GoLevelDBStore) reset() {
	s.first = InvalidLogID
	s.last = InvalidLogID
}

func (s *GoLevelDBStore) Close() error {
	s.m.Lock()
	defer s.m.Unlock()

	if s.db == nil {
		return nil
	}

	err := s.db.Close()
	s.db = nil
	return err
}

func (s *GoLevelDBStore) open() error {
	var err error
	s.first = InvalidLogID
	s.last = InvalidLogID
	s.db, err = store.Open(s.cfg)
	return err
}

func NewGoLevelDBStore(base string) (*GoLevelDBStore, error) {
	cfg := new(config.Config)
	cfg.DBName = "goleveldb"
	cfg.DBPath = base
	cfg.LevelDB.BlockSize = 4 * 1024 * 1024
	cfg.LevelDB.CacheSize = 16 * 1024 * 1024
	cfg.LevelDB.WriteBufferSize = 4 * 1024 * 1024
	cfg.LevelDB.Compression = false

	s := new(GoLevelDBStore)
	s.cfg = cfg

	if err := s.open(); err != nil {
		return nil, err
	}

	return s, nil
}

73
wal/log.go Normal file

@@ -0,0 +1,73 @@
package wal

import (
	"encoding/binary"
	"io"
)

type Log struct {
	ID         uint64
	CreateTime uint32

	// 0 for no compression
	// 1 for snappy compression
	Compression uint8

	Data []byte
}

func (l *Log) Encode(w io.Writer) error {
	length := uint32(17)
	buf := make([]byte, length)

	pos := 0
	binary.BigEndian.PutUint64(buf[pos:], l.ID)
	pos += 8

	binary.BigEndian.PutUint32(buf[pos:], l.CreateTime)
	pos += 4

	buf[pos] = l.Compression
	pos++

	binary.BigEndian.PutUint32(buf[pos:], uint32(len(l.Data)))

	if n, err := w.Write(buf); err != nil {
		return err
	} else if n != len(buf) {
		return io.ErrShortWrite
	}

	if n, err := w.Write(l.Data); err != nil {
		return err
	} else if n != len(l.Data) {
		return io.ErrShortWrite
	}

	return nil
}

func (l *Log) Decode(r io.Reader) error {
	length := uint32(17)
	buf := make([]byte, length)

	if _, err := io.ReadFull(r, buf); err != nil {
		return err
	}

	pos := 0
	l.ID = binary.BigEndian.Uint64(buf[pos:])
	pos += 8

	l.CreateTime = binary.BigEndian.Uint32(buf[pos:])
	pos += 4

	l.Compression = buf[pos]
	pos++

	length = binary.BigEndian.Uint32(buf[pos:])

	l.Data = make([]byte, length)
	if _, err := io.ReadFull(r, l.Data); err != nil {
		return err
	}

	return nil
}
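
For reference, Encode above writes a fixed 17-byte big-endian header (8-byte ID, 4-byte CreateTime, 1-byte Compression flag, 4-byte data length) followed by the raw data. A minimal sketch of inspecting that layout; logLayoutSketch is hypothetical and not part of this commit, written in-package so no import path has to be assumed:

package wal

import (
	"bytes"
	"encoding/hex"
	"fmt"
)

// logLayoutSketch encodes a small log and prints the header and payload bytes.
// With ID=1, CreateTime=100, Compression=0 and 2 bytes of data, the result is
// 17+2 = 19 bytes in total.
func logLayoutSketch() {
	l := &Log{ID: 1, CreateTime: 100, Compression: 0, Data: []byte("hi")}

	var buf bytes.Buffer
	if err := l.Encode(&buf); err != nil {
		panic(err)
	}

	b := buf.Bytes()
	fmt.Println(len(b)) // 19

	// Header: 0000000000000001 (ID) 00000064 (CreateTime) 00 (Compression) 00000002 (data length)
	fmt.Println(hex.EncodeToString(b[:17]))

	// Payload follows the header unchanged.
	fmt.Println(string(b[17:])) // "hi"
}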

27
wal/log_test.go Normal file

@@ -0,0 +1,27 @@
package wal

import (
	"bytes"
	"reflect"
	"testing"
)

func TestLog(t *testing.T) {
	l1 := &Log{ID: 1, CreateTime: 100, Compression: 0, Data: []byte("hello world")}

	var buf bytes.Buffer

	if err := l1.Encode(&buf); err != nil {
		t.Fatal(err)
	}

	l2 := &Log{}
	if err := l2.Decode(&buf); err != nil {
		t.Fatal(err)
	}

	if !reflect.DeepEqual(l1, l2) {
		t.Fatal("must equal")
	}
}

137
wal/store_test.go Normal file

@@ -0,0 +1,137 @@
package wal

import (
	"io/ioutil"
	"os"
	"testing"
)

func TestGoLevelDBStore(t *testing.T) {
	// Create a test dir
	dir, err := ioutil.TempDir("", "wal")
	if err != nil {
		t.Fatalf("err: %v ", err)
	}
	defer os.RemoveAll(dir)

	// New store
	l, err := NewGoLevelDBStore(dir)
	if err != nil {
		t.Fatalf("err: %v ", err)
	}
	defer l.Close()

	testLogs(t, l)
}

func testLogs(t *testing.T, l Store) {
	// Should be no first index
	idx, err := l.FirstID()
	if err != nil {
		t.Fatalf("err: %v ", err)
	}
	if idx != 0 {
		t.Fatalf("bad idx: %d", idx)
	}

	// Should be no last index
	idx, err = l.LastID()
	if err != nil {
		t.Fatalf("err: %v ", err)
	}
	if idx != 0 {
		t.Fatalf("bad idx: %d", idx)
	}

	// Try a failed fetch
	var out Log
	if err := l.GetLog(10, &out); err != ErrLogNotFound {
		t.Fatalf("err: %v ", err)
	}

	// Write out logs one at a time
	log := Log{
		ID:   1,
		Data: []byte("first"),
	}
	for i := 1; i <= 10; i++ {
		log.ID = uint64(i)
		if err := l.StoreLog(&log); err != nil {
			t.Fatalf("err: %v", err)
		}
	}

	// Attempt to write multiple logs
	var logs []*Log
	for i := 11; i <= 20; i++ {
		nl := &Log{
			ID:   uint64(i),
			Data: []byte("first"),
		}

		logs = append(logs, nl)
	}

	if err := l.StoreLogs(logs); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Try to fetch
	if err := l.GetLog(10, &out); err != nil {
		t.Fatalf("err: %v ", err)
	}

	// Try to fetch
	if err := l.GetLog(20, &out); err != nil {
		t.Fatalf("err: %v ", err)
	}

	// Check the lowest index
	idx, err = l.FirstID()
	if err != nil {
		t.Fatalf("err: %v ", err)
	}
	if idx != 1 {
		t.Fatalf("bad idx: %d", idx)
	}

	// Check the highest index
	idx, err = l.LastID()
	if err != nil {
		t.Fatalf("err: %v ", err)
	}
	if idx != 20 {
		t.Fatalf("bad idx: %d", idx)
	}

	// Delete a suffix
	if err := l.DeleteRange(5, 20); err != nil {
		t.Fatalf("err: %v ", err)
	}

	// Verify they are all deleted
	for i := 5; i <= 20; i++ {
		if err := l.GetLog(uint64(i), &out); err != ErrLogNotFound {
			t.Fatalf("err: %v ", err)
		}
	}

	// First index should still be one
	idx, err = l.FirstID()
	if err != nil {
		t.Fatalf("err: %v ", err)
	}
	if idx != 1 {
		t.Fatalf("bad idx: %d", idx)
	}

	// Last index should now be four
	idx, err = l.LastID()
	if err != nil {
		t.Fatalf("err: %v ", err)
	}
	if idx != 4 {
		t.Fatalf("bad idx: %d", idx)
	}

	// Should not be able to fetch a deleted log
	if err := l.GetLog(5, &out); err != ErrLogNotFound {
		t.Fatalf("err: %v ", err)
	}
}

46
wal/wal.go Normal file

@@ -0,0 +1,46 @@
package wal

import (
	"errors"
)

const (
	InvalidLogID uint64 = 0
)

var (
	ErrLogNotFound = errors.New("log not found")
	ErrLessLogID   = errors.New("log id is less")
)

type LogIDGenerator interface {
	// Reset forces the current ID up to id; if the current ID is already
	// larger than id, nothing changes.
	Reset(id uint64) error

	// Generated IDs start at 1 and increase monotonically; 0 is invalid.
	GenerateID() (uint64, error)

	Close() error
}

type Store interface {
	GetLog(id uint64, log *Log) error

	// SeekLog gets the first log whose ID is equal to or larger than id.
	SeekLog(id uint64, log *Log) error

	FirstID() (uint64, error)
	LastID() (uint64, error)

	// StoreLog returns an error if the log ID is not greater than the
	// current last ID.
	StoreLog(log *Log) error
	StoreLogs(logs []*Log) error

	// DeleteRange deletes logs in the range [start, stop].
	DeleteRange(start, stop uint64) error

	// Clear removes all logs.
	Clear() error

	Close() error
}
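
For orientation, a minimal usage sketch combining a Store with a LogIDGenerator. The program is hypothetical and not part of this commit; the import path is assumed from the ledisdb import paths used in goleveldb_store.go.

package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"time"

	"github.com/siddontang/ledisdb/wal" // path assumed from this repository's layout
)

func main() {
	dir, err := ioutil.TempDir("", "wal-example")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	// Open a goleveldb-backed store and an in-memory ID generator.
	s, err := wal.NewGoLevelDBStore(dir)
	if err != nil {
		panic(err)
	}
	defer s.Close()

	g := wal.NewMemIDGenerator(0)
	defer g.Close()

	// Append one log entry; IDs start at 1 and must increase.
	id, err := g.GenerateID()
	if err != nil {
		panic(err)
	}

	l := &wal.Log{
		ID:         id,
		CreateTime: uint32(time.Now().Unix()),
		Data:       []byte("hello wal"),
	}
	if err := s.StoreLog(l); err != nil {
		panic(err)
	}

	// Read it back by ID.
	var out wal.Log
	if err := s.GetLog(id, &out); err != nil {
		panic(err)
	}
	fmt.Println(string(out.Data)) // hello wal
}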