add support for read and print bin log

This commit is contained in:
siddontang 2014-07-11 13:28:34 +08:00
parent a41b1ef669
commit eec14806d2
2 changed files with 169 additions and 4 deletions

View File

@ -3,6 +3,8 @@ package ledis
import (
"encoding/binary"
"errors"
"fmt"
"strconv"
)
var (
@ -60,3 +62,149 @@ func encodeBinLogCommand(commandType uint8, args ...[]byte) []byte {
func decodeBinLogCommand(sz []byte) (uint8, [][]byte, error) {
return 0, nil, errBinLogCommandType
}
func FormatBinLogEvent(event []byte) (string, error) {
logType := uint8(event[0])
var err error
var k []byte
var v []byte
var buf []byte = make([]byte, 0, 1024)
switch logType {
case BinLogTypePut:
k, v, err = decodeBinLogPut(event)
buf = append(buf, "PUT "...)
case BinLogTypeDeletion:
k, err = decodeBinLogDelete(event)
buf = append(buf, "DELETE "...)
default:
err = errInvalidBinLogEvent
}
if err != nil {
return "", err
}
if buf, err = formatDataKey(buf, k); err != nil {
return "", err
}
if v != nil && len(v) != 0 {
buf = append(buf, fmt.Sprintf(" %q", v)...)
}
return String(buf), nil
}
func formatDataKey(buf []byte, k []byte) ([]byte, error) {
if len(k) < 2 {
return nil, errInvalidBinLogEvent
}
buf = append(buf, fmt.Sprintf("DB:%2d ", k[0])...)
buf = append(buf, fmt.Sprintf("%s ", TypeName[k[1]])...)
db := new(DB)
db.index = k[0]
//to do format at respective place
switch k[1] {
case KVType:
if key, err := db.decodeKVKey(k); err != nil {
return nil, err
} else {
buf = strconv.AppendQuote(buf, String(key))
}
case HashType:
if key, field, err := db.hDecodeHashKey(k); err != nil {
return nil, err
} else {
buf = strconv.AppendQuote(buf, String(key))
buf = append(buf, ' ')
buf = strconv.AppendQuote(buf, String(field))
}
case HSizeType:
if key, err := db.hDecodeSizeKey(k); err != nil {
return nil, err
} else {
buf = strconv.AppendQuote(buf, String(key))
}
case ListType:
if key, seq, err := db.lDecodeListKey(k); err != nil {
return nil, err
} else {
buf = strconv.AppendQuote(buf, String(key))
buf = append(buf, ' ')
buf = strconv.AppendInt(buf, int64(seq), 10)
}
case LMetaType:
if key, err := db.lDecodeMetaKey(k); err != nil {
return nil, err
} else {
buf = strconv.AppendQuote(buf, String(key))
}
case ZSetType:
if key, m, err := db.zDecodeSetKey(k); err != nil {
return nil, err
} else {
buf = strconv.AppendQuote(buf, String(key))
buf = append(buf, ' ')
buf = strconv.AppendQuote(buf, String(m))
}
case ZSizeType:
if key, err := db.zDecodeSizeKey(k); err != nil {
return nil, err
} else {
buf = strconv.AppendQuote(buf, String(key))
}
case ZScoreType:
if key, m, score, err := db.zDecodeScoreKey(k); err != nil {
return nil, err
} else {
buf = strconv.AppendQuote(buf, String(key))
buf = append(buf, ' ')
buf = strconv.AppendQuote(buf, String(m))
buf = append(buf, ' ')
buf = strconv.AppendInt(buf, score, 10)
}
case BitType:
if key, seq, err := db.bDecodeBinKey(k); err != nil {
return nil, err
} else {
buf = strconv.AppendQuote(buf, String(key))
buf = append(buf, ' ')
buf = strconv.AppendUint(buf, uint64(seq), 10)
}
case BitMetaType:
if key, err := db.bDecodeMetaKey(k); err != nil {
return nil, err
} else {
buf = strconv.AppendQuote(buf, String(key))
}
case ExpTimeType:
if tp, key, t, err := db.expDecodeTimeKey(k); err != nil {
return nil, err
} else {
buf = append(buf, TypeName[tp]...)
buf = append(buf, ' ')
buf = strconv.AppendQuote(buf, String(key))
buf = append(buf, ' ')
buf = strconv.AppendInt(buf, t, 10)
}
case ExpMetaType:
if tp, key, err := db.expDecodeMetaKey(k); err != nil {
return nil, err
} else {
buf = append(buf, TypeName[tp]...)
buf = append(buf, ' ')
buf = strconv.AppendQuote(buf, String(key))
}
default:
return nil, errInvalidBinLogEvent
}
return buf, nil
}

View File

@ -10,6 +10,10 @@ import (
"os"
)
var (
ErrSkipEvent = errors.New("skip to next event")
)
var (
errInvalidBinLogEvent = errors.New("invalid binglog event")
errInvalidBinLogFile = errors.New("invalid binlog file")
@ -71,7 +75,7 @@ func (l *Ledis) replicateCommandEvent(event []byte) error {
return errors.New("command event not supported now")
}
func (l *Ledis) ReplicateFromReader(rb io.Reader) error {
func ReadEventFromReader(rb io.Reader, f func(createTime uint32, event []byte) error) error {
var createTime uint32
var dataLen uint32
var dataBuf bytes.Buffer
@ -94,9 +98,9 @@ func (l *Ledis) ReplicateFromReader(rb io.Reader) error {
return err
}
err = l.ReplicateEvent(dataBuf.Bytes())
if err != nil {
log.Fatal("replication error %s, skip to next", err.Error())
err = f(createTime, dataBuf.Bytes())
if err != nil && err != ErrSkipEvent {
return err
}
dataBuf.Reset()
@ -105,6 +109,19 @@ func (l *Ledis) ReplicateFromReader(rb io.Reader) error {
return nil
}
func (l *Ledis) ReplicateFromReader(rb io.Reader) error {
f := func(createTime uint32, event []byte) error {
err := l.ReplicateEvent(event)
if err != nil {
log.Fatal("replication error %s, skip to next", err.Error())
return ErrSkipEvent
}
return nil
}
return ReadEventFromReader(rb, f)
}
func (l *Ledis) ReplicateFromData(data []byte) error {
rb := bytes.NewReader(data)