diff --git a/ledis/binlog_util.go b/ledis/binlog_util.go index bc1cd63..766d3e6 100644 --- a/ledis/binlog_util.go +++ b/ledis/binlog_util.go @@ -3,6 +3,8 @@ package ledis import ( "encoding/binary" "errors" + "fmt" + "strconv" ) var ( @@ -60,3 +62,149 @@ func encodeBinLogCommand(commandType uint8, args ...[]byte) []byte { func decodeBinLogCommand(sz []byte) (uint8, [][]byte, error) { return 0, nil, errBinLogCommandType } + +func FormatBinLogEvent(event []byte) (string, error) { + logType := uint8(event[0]) + + var err error + var k []byte + var v []byte + + var buf []byte = make([]byte, 0, 1024) + + switch logType { + case BinLogTypePut: + k, v, err = decodeBinLogPut(event) + buf = append(buf, "PUT "...) + case BinLogTypeDeletion: + k, err = decodeBinLogDelete(event) + buf = append(buf, "DELETE "...) + default: + err = errInvalidBinLogEvent + } + + if err != nil { + return "", err + } + + if buf, err = formatDataKey(buf, k); err != nil { + return "", err + } + + if v != nil && len(v) != 0 { + buf = append(buf, fmt.Sprintf(" %q", v)...) + } + + return String(buf), nil +} + +func formatDataKey(buf []byte, k []byte) ([]byte, error) { + if len(k) < 2 { + return nil, errInvalidBinLogEvent + } + + buf = append(buf, fmt.Sprintf("DB:%2d ", k[0])...) + buf = append(buf, fmt.Sprintf("%s ", TypeName[k[1]])...) + + db := new(DB) + db.index = k[0] + + //to do format at respective place + + switch k[1] { + case KVType: + if key, err := db.decodeKVKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case HashType: + if key, field, err := db.hDecodeHashKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(field)) + } + case HSizeType: + if key, err := db.hDecodeSizeKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case ListType: + if key, seq, err := db.lDecodeListKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendInt(buf, int64(seq), 10) + } + case LMetaType: + if key, err := db.lDecodeMetaKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case ZSetType: + if key, m, err := db.zDecodeSetKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(m)) + } + case ZSizeType: + if key, err := db.zDecodeSizeKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case ZScoreType: + if key, m, score, err := db.zDecodeScoreKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(m)) + buf = append(buf, ' ') + buf = strconv.AppendInt(buf, score, 10) + } + case BitType: + if key, seq, err := db.bDecodeBinKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendUint(buf, uint64(seq), 10) + } + case BitMetaType: + if key, err := db.bDecodeMetaKey(k); err != nil { + return nil, err + } else { + buf = strconv.AppendQuote(buf, String(key)) + } + case ExpTimeType: + if tp, key, t, err := db.expDecodeTimeKey(k); err != nil { + return nil, err + } else { + buf = append(buf, TypeName[tp]...) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(key)) + buf = append(buf, ' ') + buf = strconv.AppendInt(buf, t, 10) + } + case ExpMetaType: + if tp, key, err := db.expDecodeMetaKey(k); err != nil { + return nil, err + } else { + buf = append(buf, TypeName[tp]...) + buf = append(buf, ' ') + buf = strconv.AppendQuote(buf, String(key)) + } + default: + return nil, errInvalidBinLogEvent + } + + return buf, nil +} diff --git a/ledis/replication.go b/ledis/replication.go index e19da6a..bd6c192 100644 --- a/ledis/replication.go +++ b/ledis/replication.go @@ -10,6 +10,10 @@ import ( "os" ) +var ( + ErrSkipEvent = errors.New("skip to next event") +) + var ( errInvalidBinLogEvent = errors.New("invalid binglog event") errInvalidBinLogFile = errors.New("invalid binlog file") @@ -71,7 +75,7 @@ func (l *Ledis) replicateCommandEvent(event []byte) error { return errors.New("command event not supported now") } -func (l *Ledis) ReplicateFromReader(rb io.Reader) error { +func ReadEventFromReader(rb io.Reader, f func(createTime uint32, event []byte) error) error { var createTime uint32 var dataLen uint32 var dataBuf bytes.Buffer @@ -94,9 +98,9 @@ func (l *Ledis) ReplicateFromReader(rb io.Reader) error { return err } - err = l.ReplicateEvent(dataBuf.Bytes()) - if err != nil { - log.Fatal("replication error %s, skip to next", err.Error()) + err = f(createTime, dataBuf.Bytes()) + if err != nil && err != ErrSkipEvent { + return err } dataBuf.Reset() @@ -105,6 +109,19 @@ func (l *Ledis) ReplicateFromReader(rb io.Reader) error { return nil } +func (l *Ledis) ReplicateFromReader(rb io.Reader) error { + f := func(createTime uint32, event []byte) error { + err := l.ReplicateEvent(event) + if err != nil { + log.Fatal("replication error %s, skip to next", err.Error()) + return ErrSkipEvent + } + return nil + } + + return ReadEventFromReader(rb, f) +} + func (l *Ledis) ReplicateFromData(data []byte) error { rb := bytes.NewReader(data)