add base bin log support

This commit is contained in:
siddontang 2014-05-27 16:05:24 +08:00
parent bd38e693f2
commit f66ffb18dc
6 changed files with 419 additions and 6 deletions

273
ledis/binlog.go Normal file
View File

@ -0,0 +1,273 @@
package ledis
import (
"bufio"
"encoding/binary"
"encoding/json"
"fmt"
"github.com/siddontang/go-log/log"
"io"
"os"
"path"
"strconv"
"strings"
"sync"
"time"
)
/*
index file format:
ledis-bin.00001
ledis-bin.00002
ledis-bin.00003
log file format:
timestamp(bigendian uint32, seconds)|PayloadLen(bigendian uint32)|PayloadData|LogId
*/
// BinLogConfig holds the binlog settings decoded from JSON: the directory
// binlog files live in, the rotation size of a single log file, and the
// maximum number of log files kept before the oldest is deleted.
// Zero/negative values are replaced by defaults in adjust().
type BinLogConfig struct {
	Path        string `json:"path"`
	MaxFileSize int    `json:"max_file_size"`
	MaxFileNum  int    `json:"max_file_num"`
}
// adjust clamps the configured file size and file count into their allowed
// ranges, substituting the defaults for missing (non-positive) values.
func (cfg *BinLogConfig) adjust() {
	switch {
	case cfg.MaxFileSize <= 0:
		cfg.MaxFileSize = DefaultBinLogFileSize
	case cfg.MaxFileSize > MaxBinLogFileSize:
		cfg.MaxFileSize = MaxBinLogFileSize
	}

	switch {
	case cfg.MaxFileNum <= 0:
		cfg.MaxFileNum = DefaultBinLogFileNum
	case cfg.MaxFileNum > MaxBinLogFileNum:
		cfg.MaxFileNum = MaxBinLogFileNum
	}
}
// BinLog manages a set of rotating binlog files plus an index file listing
// them. The embedded Mutex serializes access for external callers (the
// commit/snapshot paths lock around Log and SavePoint).
type BinLog struct {
	sync.Mutex

	cfg *BinLogConfig

	// currently open log file; nil until the first write
	logFile *os.File

	// buffered writer over logFile, reused across rotations via Reset
	logWb *bufio.Writer

	// path of the ledis-bin.index file
	indexName string

	// base names of live log files, oldest first
	logNames []string

	// numeric suffix of the newest log file (ledis-bin.<lastLogIndex>)
	lastLogIndex int
}
// NewBinLog decodes a BinLogConfig from raw JSON and builds a BinLog from it.
func NewBinLog(data json.RawMessage) (*BinLog, error) {
	cfg := new(BinLogConfig)
	if err := json.Unmarshal(data, cfg); err != nil {
		return nil, err
	}
	return NewBinLogWithConfig(cfg)
}
// NewBinLogWithConfig normalizes cfg, creates the binlog directory if
// needed, loads the existing index file, and returns a ready-to-use BinLog.
func NewBinLogWithConfig(cfg *BinLogConfig) (*BinLog, error) {
	cfg.adjust()

	b := &BinLog{
		cfg:      cfg,
		logNames: make([]string, 0, cfg.MaxFileNum),
	}

	if err := os.MkdirAll(cfg.Path, os.ModePerm); err != nil {
		return nil, err
	}

	if err := b.loadIndex(); err != nil {
		return nil, err
	}

	return b, nil
}
// Close flushes any buffered log data and closes the current log file.
//
// Fix: the original left the closed *os.File in b.logFile, so a later
// SavePoint or openLogFile would Stat a closed handle. Clearing the field
// makes Close idempotent and lets a subsequent Log reopen cleanly.
func (b *BinLog) Close() {
	if b.logFile != nil {
		if b.logWb != nil {
			// Log flushes after every entry, but flush defensively in case
			// data is ever left buffered.
			b.logWb.Flush()
		}

		b.logFile.Close()
		b.logFile = nil
	}
}
// deleteOldest removes the oldest binlog file from disk and drops its name
// from b.logNames. The in-memory list is trimmed even if the file could not
// be removed, so a stale file on disk never blocks rotation.
//
// Fix: the original silently discarded the os.Remove error; it is now
// logged (a missing file is not an error — it was already gone).
func (b *BinLog) deleteOldest() {
	logPath := path.Join(b.cfg.Path, b.logNames[0])

	if err := os.Remove(logPath); err != nil && !os.IsNotExist(err) {
		log.Error("remove oldest logfile error %s", err.Error())
	}

	copy(b.logNames[0:], b.logNames[1:])
	b.logNames = b.logNames[0 : len(b.logNames)-1]
}
// flushIndex atomically rewrites the index file: the current log file names
// (one per line) are written to a .bak file which is then renamed over the
// real index, so a crash never leaves a half-written index.
//
// Fix: open the bak file with O_TRUNC. Without it, a leftover bak file
// longer than the new content keeps its stale tail bytes, and the rename
// installs a corrupt index.
func (b *BinLog) flushIndex() error {
	data := strings.Join(b.logNames, "\n")

	bakName := fmt.Sprintf("%s.bak", b.indexName)
	f, err := os.OpenFile(bakName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
	if err != nil {
		log.Error("create binlog bak index error %s", err.Error())
		return err
	}

	if _, err := f.WriteString(data); err != nil {
		log.Error("write binlog index error %s", err.Error())
		f.Close()
		return err
	}

	f.Close()

	if err := os.Rename(bakName, b.indexName); err != nil {
		log.Error("rename binlog bak index error %s", err.Error())
		return err
	}

	return nil
}
// loadIndex reads the index file (creating it if absent), populates
// b.logNames with the recorded log file names, trims the list down to
// MaxFileNum by deleting the oldest files, rewrites the index so disk and
// memory agree, and derives the next log file index from the newest entry.
func (b *BinLog) loadIndex() error {
	b.indexName = path.Join(b.cfg.Path, BinLogIndexFile)
	// O_CREATE: first start has no index yet; an empty index is valid
	fd, err := os.OpenFile(b.indexName, os.O_CREATE|os.O_RDWR, 0666)
	if err != nil {
		return err
	}

	//maybe we will check valid later?
	rb := bufio.NewReader(fd)
	for {
		line, err := rb.ReadString('\n')
		if err != nil && err != io.EOF {
			fd.Close()
			return err
		}

		line = strings.Trim(line, "\r\n ")

		if len(line) > 0 {
			b.logNames = append(b.logNames, line)
		}

		// enforce the retention limit while loading: once MaxFileNum names
		// are held, drop the oldest file before taking the next line
		if len(b.logNames) == b.cfg.MaxFileNum {
			//remove oldest logfile
			b.deleteOldest()
		}

		// EOF is checked after processing so the final unterminated line
		// is not lost
		if err == io.EOF {
			break
		}
	}

	fd.Close()

	// persist the (possibly trimmed) list before any writes happen
	if err := b.flushIndex(); err != nil {
		return err
	}

	if len(b.logNames) == 0 {
		b.lastLogIndex = 1
	} else {
		// names look like "ledis-bin.00042"; the extension minus the dot
		// is the numeric log index
		lastName := b.logNames[len(b.logNames)-1]

		if b.lastLogIndex, err = strconv.Atoi(path.Ext(lastName)[1:]); err != nil {
			log.Error("invalid logfile name %s", err.Error())
			return err
		}

		//like mysql, if server restart, a new binlog will create
		b.lastLogIndex++
	}

	return nil
}
// getLogName builds the base name of the log file for the current index,
// e.g. "ledis-bin.00001" (five-digit, zero-padded suffix).
func (b *BinLog) getLogName() string {
	return BinLogBaseName + fmt.Sprintf(".%05d", b.lastLogIndex)
}
// openNewLogFile opens the log file for the current lastLogIndex, points the
// shared bufio writer at it, enforces the file-count limit, and persists the
// updated name list to the index file.
func (b *BinLog) openNewLogFile() error {
	var err error
	lastName := b.getLogName()

	logPath := path.Join(b.cfg.Path, lastName)
	if b.logFile, err = os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY, 0666); err != nil {
		log.Error("open new logfile error %s", err.Error())
		return err
	}

	// make room before recording the new name so len never exceeds MaxFileNum
	if len(b.logNames) == b.cfg.MaxFileNum {
		b.deleteOldest()
	}

	b.logNames = append(b.logNames, lastName)

	// reuse the buffered writer across rotations instead of reallocating
	if b.logWb == nil {
		b.logWb = bufio.NewWriterSize(b.logFile, 1024)
	} else {
		b.logWb.Reset(b.logFile)
	}

	// flush the index only after the name list reflects the new file
	if err = b.flushIndex(); err != nil {
		return err
	}

	return nil
}
// openLogFile makes sure b.logFile points at a writable log file, rotating
// to a new file once the current one has reached MaxFileSize.
//
// Fix: the original discarded the error from Stat (`st, _ :=`) and then
// dereferenced the possibly-nil FileInfo, which would panic; the error is
// now handled and propagated.
func (b *BinLog) openLogFile() error {
	if b.logFile == nil {
		return b.openNewLogFile()
	}

	// check file size to decide whether we must rotate
	st, err := b.logFile.Stat()
	if err != nil {
		log.Error("stat logfile error %s", err.Error())
		return err
	}

	if st.Size() >= int64(b.cfg.MaxFileSize) {
		//must use new file
		b.lastLogIndex++
		b.logFile.Close()
		return b.openNewLogFile()
	}

	return nil
}
// Log appends one binlog entry per payload in args. Each entry is encoded as
// timestamp(uint32 BE) | payload length(uint32 BE) | payload, matching the
// log file format documented at the top of this file.
//
// Fix: the original passed a plain `int` (payLoadLen) to binary.Write,
// which encoding/binary rejects because int is not a fixed-size value;
// since that error was ignored, the 4-byte length field was silently never
// written and the on-disk format was corrupt. The length is now encoded as
// uint32, and every write error is checked.
func (b *BinLog) Log(args ...[]byte) error {
	var err error

	for _, data := range args {
		createTime := uint32(time.Now().Unix())
		payLoadLen := uint32(len(data))

		// may rotate to a new file if the current one is full
		if err = b.openLogFile(); err != nil {
			return err
		}

		if err = binary.Write(b.logWb, binary.BigEndian, createTime); err != nil {
			log.Error("write log error %s", err.Error())
			return err
		}

		if err = binary.Write(b.logWb, binary.BigEndian, payLoadLen); err != nil {
			log.Error("write log error %s", err.Error())
			return err
		}

		if _, err = b.logWb.Write(data); err != nil {
			log.Error("write log error %s", err.Error())
			return err
		}

		if err = b.logWb.Flush(); err != nil {
			log.Error("write log error %s", err.Error())
			return err
		}
	}

	return nil
}
// SavePoint reports the current binlog position: the name of the newest log
// file and the write offset within it. Before any write it returns ("", 0).
//
// Fix: the original ignored the Stat error (`st, _ :=`) and would panic on
// a nil FileInfo; on error we now log it and fall back to offset 0.
func (b *BinLog) SavePoint() (string, int64) {
	if b.logFile == nil {
		return "", 0
	}

	name := b.logNames[len(b.logNames)-1]

	st, err := b.logFile.Stat()
	if err != nil {
		log.Error("stat logfile error %s", err.Error())
		return name, 0
	}

	return name, st.Size()
}

36
ledis/binlog_test.go Normal file
View File

@ -0,0 +1,36 @@
package ledis
import (
"io/ioutil"
"os"
"testing"
)
// TestBinLog verifies size-based rotation together with the MaxFileNum
// retention limit: with a 1KB size cap and a single retained log file, two
// 1KB writes must leave exactly two files on disk (the index file plus the
// newest log file; the older log is deleted).
func TestBinLog(t *testing.T) {
	cfg := &BinLogConfig{
		Path:        "/tmp/ledis_binlog",
		MaxFileNum:  1,
		MaxFileSize: 1024,
	}

	os.RemoveAll(cfg.Path)

	b, err := NewBinLogWithConfig(cfg)
	if err != nil {
		t.Fatal(err)
	}

	payload := make([]byte, 1024)
	for i := 0; i < 2; i++ {
		if err := b.Log(payload); err != nil {
			t.Fatal(err)
		}
	}

	fs, err := ioutil.ReadDir(cfg.Path)
	if err != nil {
		t.Fatal(err)
	}
	if len(fs) != 2 {
		t.Fatal(len(fs))
	}
}

View File

@ -38,3 +38,20 @@ var (
ErrHashFieldSize = errors.New("invalid hash field size") ErrHashFieldSize = errors.New("invalid hash field size")
ErrZSetMemberSize = errors.New("invalid zset member size") ErrZSetMemberSize = errors.New("invalid zset member size")
) )
// BinLogBaseName is the prefix of rotated binlog files (ledis-bin.00001, ...).
const BinLogBaseName = "ledis-bin"

// BinLogIndexFile lists the live binlog file names, one per line.
const BinLogIndexFile = "ledis-bin.index"

const (
	// MaxBinLogFileSize caps a single binlog file at 1GB.
	MaxBinLogFileSize int = 1024 * 1024 * 1024

	// MaxBinLogFileNum caps how many binlog files may be retained.
	MaxBinLogFileNum int = 10000

	DefaultBinLogFileSize int = MaxBinLogFileSize
	DefaultBinLogFileNum  int = 10
)

//like leveldb
const (
	// BinLogTypeDeletion marks a key-deletion record in a binlog payload.
	BinLogTypeDeletion uint8 = 0x0

	// BinLogTypeValue marks a key/value put record in a binlog payload.
	BinLogTypeValue uint8 = 0x1
)

View File

@ -8,6 +8,8 @@ import (
type Config struct { type Config struct {
DataDB leveldb.Config `json:"data_db"` DataDB leveldb.Config `json:"data_db"`
BinLog BinLogConfig `json:"binlog"`
} }
type DB struct { type DB struct {
@ -26,6 +28,8 @@ type Ledis struct {
ldb *leveldb.DB ldb *leveldb.DB
dbs [MaxDBNumber]*DB dbs [MaxDBNumber]*DB
binlog *BinLog
} }
func Open(configJson json.RawMessage) (*Ledis, error) { func Open(configJson json.RawMessage) (*Ledis, error) {
@ -47,6 +51,15 @@ func OpenWithConfig(cfg *Config) (*Ledis, error) {
l := new(Ledis) l := new(Ledis)
l.ldb = ldb l.ldb = ldb
if len(cfg.BinLog.Path) > 0 {
l.binlog, err = NewBinLogWithConfig(&cfg.BinLog)
if err != nil {
return nil, err
}
} else {
l.binlog = nil
}
for i := uint8(0); i < MaxDBNumber; i++ { for i := uint8(0); i < MaxDBNumber; i++ {
l.dbs[i] = newDB(l, i) l.dbs[i] = newDB(l, i)
} }
@ -61,10 +74,10 @@ func newDB(l *Ledis, index uint8) *DB {
d.index = index d.index = index
d.kvTx = &tx{wb: d.db.NewWriteBatch()} d.kvTx = newTx(l)
d.listTx = &tx{wb: d.db.NewWriteBatch()} d.listTx = newTx(l)
d.hashTx = &tx{wb: d.db.NewWriteBatch()} d.hashTx = newTx(l)
d.zsetTx = &tx{wb: d.db.NewWriteBatch()} d.zsetTx = newTx(l)
return d return d
} }
@ -80,3 +93,16 @@ func (l *Ledis) Select(index int) (*DB, error) {
return l.dbs[index], nil return l.dbs[index], nil
} }
// Snapshot returns a leveldb snapshot together with the binlog save point
// (newest log file name and its offset). The snapshot and save point are
// taken under the binlog lock so they refer to the same moment; with no
// binlog configured the save point is empty.
func (l *Ledis) Snapshot() (*leveldb.Snapshot, string, int64) {
	if l.binlog == nil {
		return l.ldb.NewSnapshot(), "", 0
	} else {
		// hold the binlog lock across both operations so no commit can
		// slip between taking the snapshot and reading the save point
		l.binlog.Lock()
		s := l.ldb.NewSnapshot()
		fileName, offset := l.binlog.SavePoint()
		l.binlog.Unlock()

		return s, fileName, offset
	}
}

View File

@ -18,6 +18,12 @@ func getTestDB() *DB {
"block_size" : 32768, "block_size" : 32768,
"write_buffer_size" : 2097152, "write_buffer_size" : 2097152,
"cache_size" : 20971520 "cache_size" : 20971520
},
"binlog" : {
"path" : "/tmp/testdb_binlog",
"max_file_size" : 1073741824,
"max_file_num" : 3
} }
} }
`) `)

View File

@ -1,6 +1,7 @@
package ledis package ledis
import ( import (
"encoding/binary"
"github.com/siddontang/go-leveldb/leveldb" "github.com/siddontang/go-leveldb/leveldb"
"sync" "sync"
) )
@ -9,6 +10,19 @@ type tx struct {
m sync.Mutex m sync.Mutex
wb *leveldb.WriteBatch wb *leveldb.WriteBatch
binlog *BinLog
batch [][]byte
}
func newTx(l *Ledis) *tx {
t := new(tx)
t.wb = l.ldb.NewWriteBatch()
t.batch = make([][]byte, 0, 4)
t.binlog = l.binlog
return t
} }
func (t *tx) Close() { func (t *tx) Close() {
@ -17,10 +31,36 @@ func (t *tx) Close() {
func (t *tx) Put(key []byte, value []byte) { func (t *tx) Put(key []byte, value []byte) {
t.wb.Put(key, value) t.wb.Put(key, value)
if t.binlog != nil {
buf := make([]byte, 9+len(key)+len(value))
buf[0] = BinLogTypeValue
pos := 1
binary.BigEndian.PutUint32(buf[pos:], uint32(len(key)))
pos += 4
copy(buf[pos:], key)
pos += len(key)
binary.BigEndian.PutUint32(buf[pos:], uint32(len(value)))
pos += 4
copy(buf[pos:], value)
t.batch = append(t.batch, buf)
}
} }
func (t *tx) Delete(key []byte) { func (t *tx) Delete(key []byte) {
t.wb.Delete(key) t.wb.Delete(key)
if t.binlog != nil {
buf := make([]byte, 5+len(key))
buf[0] = BinLogTypeDeletion
pos := 1
binary.BigEndian.PutUint32(buf[pos:], uint32(len(key)))
pos += 4
copy(buf[pos:], key)
t.batch = append(t.batch, buf)
}
} }
func (t *tx) Lock() { func (t *tx) Lock() {
@ -28,12 +68,27 @@ func (t *tx) Lock() {
} }
func (t *tx) Unlock() { func (t *tx) Unlock() {
t.batch = t.batch[0:0]
t.wb.Rollback() t.wb.Rollback()
t.m.Unlock() t.m.Unlock()
} }
func (t *tx) Commit() error { func (t *tx) Commit() error {
err := t.wb.Commit() var err error
if t.binlog != nil {
t.binlog.Lock()
err = t.wb.Commit()
if err != nil {
t.binlog.Unlock()
return err
}
err = t.binlog.Log(t.batch...)
t.binlog.Unlock()
} else {
err = t.wb.Commit()
}
return err return err
} }