diff --git a/internal/server/aof.go b/internal/server/aof.go index 6d083e11..ffab4957 100644 --- a/internal/server/aof.go +++ b/internal/server/aof.go @@ -41,10 +41,7 @@ func (s *Server) loadAOF() (err error) { ps := float64(count) / (float64(d) / float64(time.Second)) suf := []string{"bytes/s", "KB/s", "MB/s", "GB/s", "TB/s"} bps := float64(fi.Size()) / (float64(d) / float64(time.Second)) - for i := 0; bps > 1024; i++ { - if len(suf) == 1 { - break - } + for i := 0; bps > 1024 && len(suf) > 1; i++ { bps /= 1024 suf = suf[1:] } @@ -123,11 +120,7 @@ func commandErrIsFatal(err error) bool { // FSET (and other writable commands) may return errors that we need // to ignore during the loading process. These errors may occur (though unlikely) // due to the aof rewrite operation. - switch err { - case errKeyNotFound, errIDNotFound: - return false - } - return true + return !(err == errKeyNotFound || err == errIDNotFound) } // flushAOF flushes all aof buffer data to disk. Set sync to true to sync the diff --git a/tests/aof_test.go b/tests/aof_test.go index d9cb68b4..5f74b25a 100644 --- a/tests/aof_test.go +++ b/tests/aof_test.go @@ -1,35 +1,89 @@ package tests import ( + "bytes" + "errors" + "fmt" "testing" ) func subTestAOF(t *testing.T, mc *mockServer) { runStep(t, mc, "loading", aof_loading_test) + // runStep(t, mc, "AOFMD5", aof_AOFMD5_test) +} + +func loadAOFAndClose(aof any) error { + var aofb []byte + switch aof := aof.(type) { + case []byte: + aofb = []byte(aof) + case string: + aofb = []byte(aof) + default: + return errors.New("aof is not string or bytes") + } + mc, err := mockOpenServer(MockServerOptions{ + Silent: true, + Metrics: false, + AOFData: aofb, + }) + if mc != nil { + mc.Close() + } + return err } func aof_loading_test(mc *mockServer) error { - // aof, err := mc.readAOF() - // if err != nil { - // return err - // } + var err error + // invalid command + err = loadAOFAndClose("asdfasdf\r\n") + if err == nil || err.Error() != "unknown command 'asdfasdf'" { + return fmt.Errorf("expected '%v', got '%v'", + "unknown command 'asdfasdf'", err) + } - // aof = append(aof, "asdfasdf\r\n"...) - // aof = nil - // mc2, err := mockOpenServer(MockServerOptions{ - // Silent: false, - // Metrics: false, - // AOFData: aof, - // }) - // if err != nil { - // return err - // } - // defer mc2.Close() + // incomplete command + err = loadAOFAndClose("set fleet truck point 10 10\r\nasdfasdf") + if err != nil { + return err + } - // time.Sleep(time.Minute) + // big aof file + var aof string + for i := 0; i < 10000; i++ { + aof += fmt.Sprintf("SET fleet truck%d POINT 10 10\r\n", i) + } + err = loadAOFAndClose(aof) + if err != nil { + return err + } - // ` + // extra zeros at various places + aof = "" + for i := 0; i < 1000; i++ { + if i%10 == 0 { + aof += string(bytes.Repeat([]byte{0}, 100)) + } + aof += fmt.Sprintf("SET fleet truck%d POINT 10 10\r\n", i) + } + aof += string(bytes.Repeat([]byte{0}, 5000)) + err = loadAOFAndClose(aof) + if err != nil { + return err + } + + // bad protocol + aof = "*2\r\n$1\r\nh\r\n+OK\r\n" + err = loadAOFAndClose(aof) + if fmt.Sprintf("%v", err) != "Protocol error: expected '$', got '+'" { + return fmt.Errorf("expected '%v', got '%v'", + "Protocol error: expected '$', got '+'", err) + } return nil } + +// func aof_AOFMD5_test(mc *mockServer) error { +// return nil +// } diff --git a/tests/mock_test.go b/tests/mock_test.go index cbec7fe3..a67e5f17 100644 --- a/tests/mock_test.go +++ b/tests/mock_test.go @@ -4,11 +4,11 @@ import ( "errors" "fmt" "io" - "log" "math/rand" "os" "path/filepath" "strings" + "sync/atomic" "time" "github.com/gomodule/redigo/redis" @@ -39,7 +39,6 @@ type mockServer struct { conn redis.Conn ioJSON bool dir string - // alt *mockServer } func (mc *mockServer) readAOF() ([]byte, error) { @@ -73,35 +72,41 @@ func mockOpenServer(opts MockServerOptions) (*mockServer, error) { if os.Getenv("PRINTLOG") == "1" { logOutput = os.Stderr } - s := &mockServer{port: port} + s := &mockServer{port: port, dir: dir} tlog.SetOutput(logOutput) + var ferrt int32 // atomic flag for when ferr has been set + var ferr error // ferr for when the server fails to start go func() { sopts := server.Options{ - Host: "localhost", - Port: port, - Dir: dir, - UseHTTP: true, - DevMode: true, + Host: "localhost", + Port: port, + Dir: dir, + UseHTTP: true, + DevMode: true, + AppendOnly: true, } if opts.Metrics { sopts.MetricsAddr = ":4321" } if err := server.Serve(sopts); err != nil { - log.Fatal(err) + ferr = err + atomic.StoreInt32(&ferrt, 1) } }() - if err := s.waitForStartup(); err != nil { + if err := s.waitForStartup(&ferr, &ferrt); err != nil { s.Close() return nil, err } - s.dir = dir return s, nil } -func (s *mockServer) waitForStartup() error { +func (s *mockServer) waitForStartup(ferr *error, ferrt *int32) error { var lerr error start := time.Now() for { + if atomic.LoadInt32(ferrt) != 0 { + return *ferr + } if time.Since(start) > time.Second*5 { if lerr != nil { return lerr @@ -131,6 +136,9 @@ func (mc *mockServer) Close() { if mc.conn != nil { mc.conn.Close() } + if mc.dir != "" { + os.RemoveAll(mc.dir) + } } func (mc *mockServer) ResetConn() { diff --git a/tests/tests_test.go b/tests/tests_test.go index 415a44d0..5894bb20 100644 --- a/tests/tests_test.go +++ b/tests/tests_test.go @@ -47,14 +47,6 @@ func TestAll(t *testing.T) { } defer mc.Close() - // mc2, err := mockOpenServer(false, false) - // if err != nil { - // t.Fatal(err) - // } - // defer mc2.Close() - // mc.alt = mc2 - // mc2.alt = mc - runSubTest(t, "keys", mc, subTestKeys) runSubTest(t, "json", mc, subTestJSON) runSubTest(t, "search", mc, subTestSearch)