forked from mirror/buntdb
Ignore incomplete tail commands and allow for null padding
This commit allows for BuntDB to load data files that were previously considered invalid or corrupted. Now when the data file ends with an incomplete command, the data will be truncated at the end of the previously success command. Also when a null control character is encountered instead of an asterix, which indicates the start of a command, the null is ignored and the cursor moves to the next byte. This allows for null padding at the head and the tail. Fixes #71 https://github.com/tidwall/tile38/issues/600
This commit is contained in:
parent
4f7ef48197
commit
d7ed6a747a
103
buntdb.go
103
buntdb.go
|
@ -69,7 +69,6 @@ type DB struct {
|
|||
keys *btree.BTree // a tree of all item ordered by key
|
||||
exps *btree.BTree // a tree of items ordered by expiration
|
||||
idxs map[string]*index // the index trees.
|
||||
exmgr bool // indicates that expires manager is running.
|
||||
flushes int // a count of the number of disk flushes
|
||||
closed bool // set when the database has been closed
|
||||
config Config // the database configuration
|
||||
|
@ -135,9 +134,6 @@ type exctx struct {
|
|||
db *DB
|
||||
}
|
||||
|
||||
// Default number of btree degrees
|
||||
const btreeDegrees = 64
|
||||
|
||||
// Open opens a database at the provided path.
|
||||
// If the file does not exist then it will be created automatically.
|
||||
func Open(path string) (*DB, error) {
|
||||
|
@ -241,7 +237,8 @@ func (db *DB) Load(rd io.Reader) error {
|
|||
// cannot load into databases that persist to disk
|
||||
return ErrPersistenceActive
|
||||
}
|
||||
return db.readLoad(rd, time.Now())
|
||||
_, err := db.readLoad(rd, time.Now())
|
||||
return err
|
||||
}
|
||||
|
||||
// index represents a b-tree or r-tree index and also acts as the
|
||||
|
@ -755,46 +752,65 @@ func (db *DB) Shrink() error {
|
|||
}()
|
||||
}
|
||||
|
||||
var errValidEOF = errors.New("valid eof")
|
||||
|
||||
// readLoad reads from the reader and loads commands into the database.
|
||||
// modTime is the modified time of the reader, should be no greater than
|
||||
// the current time.Now().
|
||||
func (db *DB) readLoad(rd io.Reader, modTime time.Time) error {
|
||||
// Returns the number of bytes of the last command read and the error if any.
|
||||
func (db *DB) readLoad(rd io.Reader, modTime time.Time) (n int64, err error) {
|
||||
defer func() {
|
||||
if err == io.EOF {
|
||||
err = io.ErrUnexpectedEOF
|
||||
}
|
||||
}()
|
||||
totalSize := int64(0)
|
||||
data := make([]byte, 4096)
|
||||
parts := make([]string, 0, 8)
|
||||
r := bufio.NewReader(rd)
|
||||
for {
|
||||
// peek at the first byte. If it's a 'nul' control character then
|
||||
// ignore it and move to the next byte.
|
||||
c, err := r.ReadByte()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
err = nil
|
||||
}
|
||||
return totalSize, err
|
||||
}
|
||||
if c == 0 {
|
||||
// ignore nul control characters
|
||||
n += 1
|
||||
continue
|
||||
}
|
||||
if err := r.UnreadByte(); err != nil {
|
||||
return totalSize, err
|
||||
}
|
||||
|
||||
// read a single command.
|
||||
// first we should read the number of parts that the of the command
|
||||
cmdByteSize := int64(0)
|
||||
line, err := r.ReadBytes('\n')
|
||||
if err != nil {
|
||||
if len(line) > 0 {
|
||||
// got an eof but also data. this should be an unexpected eof.
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
return err
|
||||
return totalSize, err
|
||||
}
|
||||
if line[0] != '*' {
|
||||
return ErrInvalid
|
||||
return totalSize, ErrInvalid
|
||||
}
|
||||
cmdByteSize += int64(len(line))
|
||||
|
||||
// convert the string number to and int
|
||||
var n int
|
||||
if len(line) == 4 && line[len(line)-2] == '\r' {
|
||||
if line[1] < '0' || line[1] > '9' {
|
||||
return ErrInvalid
|
||||
return totalSize, ErrInvalid
|
||||
}
|
||||
n = int(line[1] - '0')
|
||||
} else {
|
||||
if len(line) < 5 || line[len(line)-2] != '\r' {
|
||||
return ErrInvalid
|
||||
return totalSize, ErrInvalid
|
||||
}
|
||||
for i := 1; i < len(line)-2; i++ {
|
||||
if line[i] < '0' || line[i] > '9' {
|
||||
return ErrInvalid
|
||||
return totalSize, ErrInvalid
|
||||
}
|
||||
n = n*10 + int(line[i]-'0')
|
||||
}
|
||||
|
@ -805,25 +821,26 @@ func (db *DB) readLoad(rd io.Reader, modTime time.Time) error {
|
|||
// read the number of bytes of the part.
|
||||
line, err := r.ReadBytes('\n')
|
||||
if err != nil {
|
||||
return err
|
||||
return totalSize, err
|
||||
}
|
||||
if line[0] != '$' {
|
||||
return ErrInvalid
|
||||
return totalSize, ErrInvalid
|
||||
}
|
||||
cmdByteSize += int64(len(line))
|
||||
// convert the string number to and int
|
||||
var n int
|
||||
if len(line) == 4 && line[len(line)-2] == '\r' {
|
||||
if line[1] < '0' || line[1] > '9' {
|
||||
return ErrInvalid
|
||||
return totalSize, ErrInvalid
|
||||
}
|
||||
n = int(line[1] - '0')
|
||||
} else {
|
||||
if len(line) < 5 || line[len(line)-2] != '\r' {
|
||||
return ErrInvalid
|
||||
return totalSize, ErrInvalid
|
||||
}
|
||||
for i := 1; i < len(line)-2; i++ {
|
||||
if line[i] < '0' || line[i] > '9' {
|
||||
return ErrInvalid
|
||||
return totalSize, ErrInvalid
|
||||
}
|
||||
n = n*10 + int(line[i]-'0')
|
||||
}
|
||||
|
@ -837,10 +854,10 @@ func (db *DB) readLoad(rd io.Reader, modTime time.Time) error {
|
|||
data = make([]byte, dataln)
|
||||
}
|
||||
if _, err = io.ReadFull(r, data[:n+2]); err != nil {
|
||||
return err
|
||||
return totalSize, err
|
||||
}
|
||||
if data[n] != '\r' || data[n+1] != '\n' {
|
||||
return ErrInvalid
|
||||
return totalSize, ErrInvalid
|
||||
}
|
||||
// copy string
|
||||
parts = append(parts, string(data[:n]))
|
||||
|
@ -855,15 +872,15 @@ func (db *DB) readLoad(rd io.Reader, modTime time.Time) error {
|
|||
(parts[0][2] == 't' || parts[0][2] == 'T') {
|
||||
// SET
|
||||
if len(parts) < 3 || len(parts) == 4 || len(parts) > 5 {
|
||||
return ErrInvalid
|
||||
return totalSize, ErrInvalid
|
||||
}
|
||||
if len(parts) == 5 {
|
||||
if strings.ToLower(parts[3]) != "ex" {
|
||||
return ErrInvalid
|
||||
return totalSize, ErrInvalid
|
||||
}
|
||||
ex, err := strconv.ParseUint(parts[4], 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
return totalSize, err
|
||||
}
|
||||
now := time.Now()
|
||||
dur := (time.Duration(ex) * time.Second) - now.Sub(modTime)
|
||||
|
@ -885,7 +902,7 @@ func (db *DB) readLoad(rd io.Reader, modTime time.Time) error {
|
|||
(parts[0][2] == 'l' || parts[0][2] == 'L') {
|
||||
// DEL
|
||||
if len(parts) != 2 {
|
||||
return ErrInvalid
|
||||
return totalSize, ErrInvalid
|
||||
}
|
||||
db.deleteFromDatabase(&dbItem{key: parts[1]})
|
||||
} else if (parts[0][0] == 'f' || parts[0][0] == 'F') &&
|
||||
|
@ -894,10 +911,10 @@ func (db *DB) readLoad(rd io.Reader, modTime time.Time) error {
|
|||
db.exps = btree.New(lessCtx(&exctx{db}))
|
||||
db.idxs = make(map[string]*index)
|
||||
} else {
|
||||
return ErrInvalid
|
||||
return totalSize, ErrInvalid
|
||||
}
|
||||
totalSize += cmdByteSize
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// load reads entries from the append only database file and fills the database.
|
||||
|
@ -910,10 +927,20 @@ func (db *DB) load() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := db.readLoad(db.file, fi.ModTime()); err != nil {
|
||||
return err
|
||||
n, err := db.readLoad(db.file, fi.ModTime())
|
||||
if err != nil {
|
||||
if err == io.ErrUnexpectedEOF {
|
||||
// The db file has ended mid-command, which is allowed but the
|
||||
// data file should be truncated to the end of the last valid
|
||||
// command
|
||||
if err := db.file.Truncate(n); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
pos, err := db.file.Seek(0, 2)
|
||||
pos, err := db.file.Seek(n, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1216,7 +1243,7 @@ func appendBulkString(buf []byte, s string) []byte {
|
|||
// writeSetTo writes an item as a single SET record to the a bufio Writer.
|
||||
func (dbi *dbItem) writeSetTo(buf []byte) []byte {
|
||||
if dbi.opts != nil && dbi.opts.ex {
|
||||
ex := dbi.opts.exat.Sub(time.Now()) / time.Second
|
||||
ex := time.Until(dbi.opts.exat) / time.Second
|
||||
buf = appendArray(buf, 5)
|
||||
buf = appendBulkString(buf, "set")
|
||||
buf = appendBulkString(buf, dbi.key)
|
||||
|
@ -1483,7 +1510,7 @@ func (tx *Tx) TTL(key string) (time.Duration, error) {
|
|||
} else if item.opts == nil || !item.opts.ex {
|
||||
return -1, nil
|
||||
}
|
||||
dur := item.opts.exat.Sub(time.Now())
|
||||
dur := time.Until(item.opts.exat)
|
||||
if dur < 0 {
|
||||
return 0, ErrNotFound
|
||||
}
|
||||
|
|
103
buntdb_test.go
103
buntdb_test.go
|
@ -1362,44 +1362,86 @@ func TestDatabaseFormat(t *testing.T) {
|
|||
db := testOpen(t)
|
||||
defer testClose(db)
|
||||
}()
|
||||
testBadFormat := func(resp string) {
|
||||
if err := os.RemoveAll("data.db"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
testFormat := func(t *testing.T, expectValid bool, resp string, do func(db *DB) error) {
|
||||
t.Helper()
|
||||
os.RemoveAll("data.db")
|
||||
if err := ioutil.WriteFile("data.db", []byte(resp), 0666); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll("data.db")
|
||||
|
||||
db, err := Open("data.db")
|
||||
if err == nil {
|
||||
if do != nil {
|
||||
if err := do(db); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
if err := db.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := os.RemoveAll("data.db"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Fatalf("invalid database should not be allowed")
|
||||
}
|
||||
if err == nil && !expectValid {
|
||||
t.Fatalf("expected invalid database")
|
||||
} else if err != nil && expectValid {
|
||||
t.Fatalf("expected valid database, got '%s'", err)
|
||||
}
|
||||
}
|
||||
testBadFormat("*3\r")
|
||||
testBadFormat("*3\n")
|
||||
testBadFormat("*a\r\n")
|
||||
testBadFormat("*2\r\n")
|
||||
testBadFormat("*2\r\n%3")
|
||||
testBadFormat("*2\r\n$")
|
||||
testBadFormat("*2\r\n$3\r\n")
|
||||
testBadFormat("*2\r\n$3\r\ndel")
|
||||
testBadFormat("*2\r\n$3\r\ndel\r\r")
|
||||
testBadFormat("*0\r\n*2\r\n$3\r\ndel\r\r")
|
||||
testBadFormat("*1\r\n$3\r\nnop\r\n")
|
||||
testBadFormat("*1\r\n$3\r\ndel\r\n")
|
||||
testBadFormat("*1\r\n$3\r\nset\r\n")
|
||||
testBadFormat("*5\r\n$3\r\nset\r\n$3\r\nvar\r\n$3\r\nval\r\n$2\r\nxx\r\n$2\r\n10\r\n")
|
||||
testBadFormat("*5\r\n$3\r\nset\r\n$3\r\nvar\r\n$3\r\nval\r\n$2\r\nex\r\n$2\r\naa\r\n")
|
||||
testBadFormat("*15\r\n$3\r\nset\r\n$3\r\nvar\r\n$3\r\nval\r\n$2\r\nex\r\n$2\r\naa\r\n")
|
||||
testBadFormat("*1A\r\n$3\r\nset\r\n$3\r\nvar\r\n$3\r\nval\r\n$2\r\nex\r\n$2\r\naa\r\n")
|
||||
testBadFormat("*5\r\n$13\r\nset\r\n$3\r\nvar\r\n$3\r\nval\r\n$2\r\nex\r\n$2\r\naa\r\n")
|
||||
testBadFormat("*5\r\n$1A\r\nset\r\n$3\r\nvar\r\n$3\r\nval\r\n$2\r\nex\r\n$2\r\naa\r\n")
|
||||
testBadFormat("*5\r\n$3\r\nset\r\n$5000\r\nvar\r\n$3\r\nval\r\n$2\r\nex\r\n$2\r\naa\r\n")
|
||||
|
||||
// basic valid commands
|
||||
testFormat(t, true, "*2\r\n$3\r\nDEL\r\n$5\r\nHELLO\r\n", nil)
|
||||
testFormat(t, true, "*3\r\n$3\r\nSET\r\n$5\r\nHELLO\r\n$5\r\nWORLD\r\n", nil)
|
||||
testFormat(t, true, "*1\r\n$7\r\nFLUSHDB\r\n", nil)
|
||||
|
||||
// commands with invalid names or arguments
|
||||
testFormat(t, false, "*3\r\n$3\r\nDEL\r\n$5\r\nHELLO\r\n$5\r\nWORLD\r\n", nil)
|
||||
testFormat(t, false, "*2\r\n$3\r\nSET\r\n$5\r\nHELLO\r\n", nil)
|
||||
testFormat(t, false, "*1\r\n$6\r\nSET123\r\n", nil)
|
||||
|
||||
// partial tail commands should be ignored but allowed
|
||||
pcmd := "*3\r\n$3\r\nSET\r\n$5\r\nHELLO\r\n$5\r\nWORLD\r\n"
|
||||
for i := 1; i < len(pcmd); i++ {
|
||||
cmd := "*3\r\n$3\r\nSET\r\n$5\r\nHELLO\r\n$5\r\nJELLO\r\n"
|
||||
testFormat(t, true, cmd+pcmd[:len(pcmd)-i],
|
||||
func(db *DB) error {
|
||||
return db.View(func(tx *Tx) error {
|
||||
val, err := tx.Get("HELLO")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if val != "JELLO" {
|
||||
return fmt.Errorf("expected '%s', got '%s'", "JELLO", val)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// commands with invalid formatting
|
||||
testFormat(t, false, "^3\r\n$3\r\nSET\r\n$5\r\nHELLO\r\n$5\r\nWORLD\r\n", nil)
|
||||
testFormat(t, false, "*3\n$3\r\nSET\r\n$5\r\nHELLO\r\n$5\r\nWORLD\r\n", nil)
|
||||
testFormat(t, false, "*\n$3\r\nSET\r\n$5\r\nHELLO\r\n$5\r\nWORLD\r\n", nil)
|
||||
testFormat(t, false, "*3\r\n^3\r\nSET\r\n$5\r\nHELLO\r\n$5\r\nWORLD\r\n", nil)
|
||||
testFormat(t, false, "*3\r\n$\r\nSET\r\n$5\r\nHELLO\r\n$5\r\nWORLD\r\n", nil)
|
||||
testFormat(t, false, "*3\r\n$3\nSET\r\n$5\r\nHELLO\r\n$5\r\nWORLD\r\n", nil)
|
||||
testFormat(t, false, "*3\r\n$3SET\r\n$5\r\nHELLO\r\n$5\r\nWORLD\r\n", nil)
|
||||
testFormat(t, false, "*3\r\n$3\r\nSET\r\n$5\r\nHELLO\r\n$5\r\nWORLD\r\n123\n", nil)
|
||||
|
||||
// commands with nuls
|
||||
testFormat(t, true, "\u0000*3\r\n$3\r\nSET\r\n$5\r\nHELLO\r\n$5\r\nWORLD\r\n"+
|
||||
"\u0000\u0000*3\r\n$3\r\nSET\r\n$5\r\nHELLO\r\n$5\r\nJELLO\r\n\u0000", func(db *DB) error {
|
||||
return db.View(func(tx *Tx) error {
|
||||
val, err := tx.Get("HELLO")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if val != "JELLO" {
|
||||
return fmt.Errorf("expected '%s', got '%s'", "JELLO", val)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestInsertsAndDeleted(t *testing.T) {
|
||||
|
@ -1812,7 +1854,7 @@ func TestBasic(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
if false {
|
||||
println(time.Now().Sub(start).String(), db.keys.Len())
|
||||
println(time.Since(start).String(), db.keys.Len())
|
||||
}
|
||||
// add some random rects
|
||||
if err := db.Update(func(tx *Tx) error {
|
||||
|
@ -1862,6 +1904,9 @@ func TestBasic(t *testing.T) {
|
|||
fmt.Fprintf(buf, "%s: %v,%v\n", key, min, max)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
expect := make([]string, 2)
|
||||
n := 0
|
||||
err = tx.Intersects("rects", "[0 0],[15 15]", func(key, val string) bool {
|
||||
|
|
Loading…
Reference in New Issue