diff --git a/internal/server/aofshrink.go b/internal/server/aofshrink.go index efd93a3e..257b102b 100644 --- a/internal/server/aofshrink.go +++ b/internal/server/aofshrink.go @@ -19,12 +19,9 @@ const maxids = 32 const maxchunk = 4 * 1024 * 1024 func (s *Server) aofshrink() { - if s.aof == nil { - return - } start := time.Now() s.mu.Lock() - if s.shrinking { + if s.aof == nil || s.shrinking { s.mu.Unlock() return } diff --git a/tests/aof_test.go b/tests/aof_test.go index 6ffb0b1b..7a68058a 100644 --- a/tests/aof_test.go +++ b/tests/aof_test.go @@ -7,6 +7,9 @@ import ( "errors" "fmt" "math/rand" + "net" + "net/http" + "sync/atomic" "time" "github.com/gomodule/redigo/redis" @@ -16,6 +19,7 @@ func subTestAOF(g *testGroup) { g.regSubTest("loading", aof_loading_test) g.regSubTest("AOF", aof_AOF_test) g.regSubTest("AOFMD5", aof_AOFMD5_test) + g.regSubTest("AOFSHRINK", aof_AOFSHRINK_test) } func loadAOFAndClose(aof any) error { @@ -131,6 +135,34 @@ func aof_AOFMD5_test(mc *mockServer) error { ) } +func openFollower(mc *mockServer) (conn redis.Conn, err error) { + conn, err = redis.Dial("tcp", fmt.Sprintf(":%d", mc.port), + redis.DialReadTimeout(time.Second)) + if err != nil { + return nil, err + } + defer func() { + if err != nil { + conn.Close() + conn = nil + } + }() + if err := conn.Send("AOF", 0); err != nil { + return nil, err + } + if err := conn.Flush(); err != nil { + return nil, err + } + str, err := redis.String(conn.Receive()) + if err != nil { + return nil, err + } + if str != "OK" { + return nil, fmt.Errorf("expected '%s', got '%s'", "OK", str) + } + return conn, nil +} + func aof_AOF_test(mc *mockServer) error { var argss [][]interface{} for i := 0; i < 10000; i++ { @@ -144,10 +176,9 @@ func aof_AOF_test(mc *mockServer) error { } } readAll := func() (conn redis.Conn, err error) { - conn, err = redis.Dial("tcp", fmt.Sprintf(":%d", mc.port), - redis.DialReadTimeout(time.Second)) + conn, err = openFollower(mc) if err != nil { - return nil, err + return } defer func() { if err != nil { @@ -155,19 +186,6 @@ func aof_AOF_test(mc *mockServer) error { conn = nil } }() - if err := conn.Send("AOF", 0); err != nil { - return nil, err - } - if err := conn.Flush(); err != nil { - return nil, err - } - str, err := redis.String(conn.Receive()) - if err != nil { - return nil, err - } - if str != "OK" { - return nil, fmt.Errorf("expected '%s', got '%s'", "OK", str) - } var t bool for i := 0; i < len(argss); i++ { args, err := redis.Values(conn.Receive()) @@ -216,3 +234,42 @@ func aof_AOF_test(mc *mockServer) error { Do("AOF", 1000000000000).Err("pos is too big, must be less that the aof_size of leader"), ) } + +func aof_AOFSHRINK_test(mc *mockServer) error { + var err error + haddr := fmt.Sprintf("localhost:%d", getNextPort()) + ln, err := net.Listen("tcp", haddr) + if err != nil { + return err + } + defer ln.Close() + var msgs atomic.Int32 + go func() { + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + msgs.Add(1) + // println(r.URL.Path) + }) + http.Serve(ln, mux) + }() + err = mc.DoBatch( + Do("SETCHAN", "mychan", "INTERSECTS", "mi:0", "BOUNDS", -10, -10, 10, 10).Str("1"), + Do("SETHOOK", "myhook", "http://"+haddr, "INTERSECTS", "mi:0", "BOUNDS", -10, -10, 10, 10).Str("1"), + Do("MASSINSERT", 5, 10000).OK(), + ) + if err != nil { + return err + } + err = mc.DoBatch( + Do("AOFSHRINK").OK(), + Do("MASSINSERT", 5, 10000).OK(), + ) + if err != nil { + return err + } + nmsgs := msgs.Load() + if nmsgs == 0 { + return fmt.Errorf("expected > 0, got %d", nmsgs) + } + return err +} diff --git a/tests/mock_test.go b/tests/mock_test.go index c87015b1..f4354ac8 100644 --- a/tests/mock_test.go +++ b/tests/mock_test.go @@ -60,7 +60,7 @@ type MockServerOptions struct { var nextPort int32 = 10000 -func getRandPort() int { +func getNextPort() int { // choose a valid port between 10000-50000 for { port := int(atomic.AddInt32(&nextPort, 1)) @@ -82,7 +82,7 @@ func mockOpenServer(opts MockServerOptions) (*mockServer, error) { log.SetOutput(logOutput) rand.Seed(time.Now().UnixNano()) - port := getRandPort() + port := getNextPort() dir := fmt.Sprintf("data-mock-%d", port) if !opts.Silent { fmt.Printf("Starting test server at port %d\n", port) @@ -101,7 +101,7 @@ func mockOpenServer(opts MockServerOptions) (*mockServer, error) { shutdown := make(chan bool) s := &mockServer{port: port, dir: dir, shutdown: shutdown} if opts.Metrics { - s.mport = getRandPort() + s.mport = getNextPort() } var ferrt int32 // atomic flag for when ferr has been set var ferr error // ferr for when the server fails to start