From 8a0c59b101805f2e45272100134ac4ad83bd69ee Mon Sep 17 00:00:00 2001 From: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com> Date: Thu, 11 Jul 2024 11:57:56 +0300 Subject: [PATCH] TimeSeries insertion filters for close samples (#3003) * TimeSeries insertion filters for close samples * fix * fix * fix * fix * fix --------- Co-authored-by: Vladyslav Vildanov <117659936+vladvildanov@users.noreply.github.com> --- timeseries_commands.go | 56 ++++++++++---- timeseries_commands_test.go | 149 ++++++++++++++++++++++++++++++++++-- 2 files changed, 186 insertions(+), 19 deletions(-) diff --git a/timeseries_commands.go b/timeseries_commands.go index 6f1b2fa4..82d8cdfc 100644 --- a/timeseries_commands.go +++ b/timeseries_commands.go @@ -40,25 +40,32 @@ type TimeseriesCmdable interface { } type TSOptions struct { - Retention int - ChunkSize int - Encoding string - DuplicatePolicy string - Labels map[string]string + Retention int + ChunkSize int + Encoding string + DuplicatePolicy string + Labels map[string]string + IgnoreMaxTimeDiff int64 + IgnoreMaxValDiff float64 } type TSIncrDecrOptions struct { - Timestamp int64 - Retention int - ChunkSize int - Uncompressed bool - Labels map[string]string + Timestamp int64 + Retention int + ChunkSize int + Uncompressed bool + DuplicatePolicy string + Labels map[string]string + IgnoreMaxTimeDiff int64 + IgnoreMaxValDiff float64 } type TSAlterOptions struct { - Retention int - ChunkSize int - DuplicatePolicy string - Labels map[string]string + Retention int + ChunkSize int + DuplicatePolicy string + Labels map[string]string + IgnoreMaxTimeDiff int64 + IgnoreMaxValDiff float64 } type TSCreateRuleOptions struct { @@ -223,6 +230,9 @@ func (c cmdable) TSAddWithArgs(ctx context.Context, key string, timestamp interf args = append(args, label, value) } } + if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 { + args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff) + } } cmd := NewIntCmd(ctx, args...) _ = c(ctx, cmd) @@ -264,6 +274,9 @@ func (c cmdable) TSCreateWithArgs(ctx context.Context, key string, options *TSOp args = append(args, label, value) } } + if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 { + args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff) + } } cmd := NewStatusCmd(ctx, args...) _ = c(ctx, cmd) @@ -292,6 +305,9 @@ func (c cmdable) TSAlter(ctx context.Context, key string, options *TSAlterOption args = append(args, label, value) } } + if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 { + args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff) + } } cmd := NewStatusCmd(ctx, args...) _ = c(ctx, cmd) @@ -351,12 +367,18 @@ func (c cmdable) TSIncrByWithArgs(ctx context.Context, key string, timestamp flo if options.Uncompressed { args = append(args, "UNCOMPRESSED") } + if options.DuplicatePolicy != "" { + args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy) + } if options.Labels != nil { args = append(args, "LABELS") for label, value := range options.Labels { args = append(args, label, value) } } + if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 { + args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff) + } } cmd := NewIntCmd(ctx, args...) _ = c(ctx, cmd) @@ -391,12 +413,18 @@ func (c cmdable) TSDecrByWithArgs(ctx context.Context, key string, timestamp flo if options.Uncompressed { args = append(args, "UNCOMPRESSED") } + if options.DuplicatePolicy != "" { + args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy) + } if options.Labels != nil { args = append(args, "LABELS") for label, value := range options.Labels { args = append(args, label, value) } } + if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 { + args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff) + } } cmd := NewIntCmd(ctx, args...) _ = c(ctx, cmd) diff --git a/timeseries_commands_test.go b/timeseries_commands_test.go index 563f24e7..c62367a7 100644 --- a/timeseries_commands_test.go +++ b/timeseries_commands_test.go @@ -23,7 +23,7 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() { Expect(client.Close()).NotTo(HaveOccurred()) }) - It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs"), func() { + It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs", "NonRedisEnterprise"), func() { result, err := client.TSCreate(ctx, "1").Result() Expect(err).NotTo(HaveOccurred()) Expect(result).To(BeEquivalentTo("OK")) @@ -62,10 +62,60 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() { resultInfo, err = client.TSInfo(ctx, keyName).Result() Expect(err).NotTo(HaveOccurred()) Expect(strings.ToUpper(resultInfo["duplicatePolicy"].(string))).To(BeEquivalentTo(dup)) - } + // Test insertion filters + opt = &redis.TSOptions{IgnoreMaxTimeDiff: 5, DuplicatePolicy: "LAST", IgnoreMaxValDiff: 10.0} + result, err = client.TSCreateWithArgs(ctx, "ts-if-1", opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo("OK")) + resultAdd, err := client.TSAdd(ctx, "ts-if-1", 1000, 1.0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(1000)) + resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1010, 11.0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(1010)) + resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1013, 10.0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(1010)) + resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1020, 11.5).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(1020)) + resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1021, 22.0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(1021)) + + rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1021).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(rangePoints)).To(BeEquivalentTo(4)) + Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{ + {Timestamp: 1000, Value: 1.0}, + {Timestamp: 1010, Value: 11.0}, + {Timestamp: 1020, Value: 11.5}, + {Timestamp: 1021, Value: 22.0}})) + // Test insertion filters with other duplicate policy + opt = &redis.TSOptions{IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0} + result, err = client.TSCreateWithArgs(ctx, "ts-if-2", opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo("OK")) + resultAdd1, err := client.TSAdd(ctx, "ts-if-1", 1000, 1.0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd1).To(BeEquivalentTo(1000)) + resultAdd1, err = client.TSAdd(ctx, "ts-if-1", 1010, 11.0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd1).To(BeEquivalentTo(1010)) + resultAdd1, err = client.TSAdd(ctx, "ts-if-1", 1013, 10.0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd1).To(BeEquivalentTo(1013)) + + rangePoints, err = client.TSRange(ctx, "ts-if-1", 1000, 1013).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(rangePoints)).To(BeEquivalentTo(3)) + Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{ + {Timestamp: 1000, Value: 1.0}, + {Timestamp: 1010, Value: 11.0}, + {Timestamp: 1013, Value: 10.0}})) }) - It("should TSAdd and TSAddWithArgs", Label("timeseries", "tsadd", "tsaddWithArgs"), func() { + It("should TSAdd and TSAddWithArgs", Label("timeseries", "tsadd", "tsaddWithArgs", "NonRedisEnterprise"), func() { result, err := client.TSAdd(ctx, "1", 1, 1).Result() Expect(err).NotTo(HaveOccurred()) Expect(result).To(BeEquivalentTo(1)) @@ -138,9 +188,23 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() { resultGet, err = client.TSGet(ctx, "tsami-1").Result() Expect(err).NotTo(HaveOccurred()) Expect(resultGet.Value).To(BeEquivalentTo(5)) + // Insertion filters + opt = &redis.TSOptions{IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"} + result, err = client.TSAddWithArgs(ctx, "ts-if-1", 1000, 1.0, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo(1000)) + + result, err = client.TSAddWithArgs(ctx, "ts-if-1", 1004, 3.0, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo(1000)) + + rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1004).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(rangePoints)).To(BeEquivalentTo(1)) + Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: 1.0}})) }) - It("should TSAlter", Label("timeseries", "tsalter"), func() { + It("should TSAlter", Label("timeseries", "tsalter", "NonRedisEnterprise"), func() { result, err := client.TSCreate(ctx, "1").Result() Expect(err).NotTo(HaveOccurred()) Expect(result).To(BeEquivalentTo("OK")) @@ -179,6 +243,33 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() { resultInfo, err = client.TSInfo(ctx, "1").Result() Expect(err).NotTo(HaveOccurred()) Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo("min")) + // Test insertion filters + resultAdd, err := client.TSAdd(ctx, "ts-if-1", 1000, 1.0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(1000)) + resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1010, 11.0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(1010)) + resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1013, 10.0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(1013)) + + alterOpt := &redis.TSAlterOptions{IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"} + resultAlter, err = client.TSAlter(ctx, "ts-if-1", alterOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAlter).To(BeEquivalentTo("OK")) + + resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1015, 11.5).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(1013)) + + rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1013).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(rangePoints)).To(BeEquivalentTo(3)) + Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{ + {Timestamp: 1000, Value: 1.0}, + {Timestamp: 1010, Value: 11.0}, + {Timestamp: 1013, Value: 10.0}})) }) It("should TSCreateRule and TSDeleteRule", Label("timeseries", "tscreaterule", "tsdeleterule"), func() { @@ -216,7 +307,7 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() { Expect(resultInfo["rules"]).To(BeEquivalentTo(map[interface{}]interface{}{})) }) - It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs"), func() { + It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs", "NonRedisEnterprise"), func() { for i := 0; i < 100; i++ { _, err := client.TSIncrBy(ctx, "1", 1).Result() Expect(err).NotTo(HaveOccurred()) @@ -277,6 +368,54 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() { resultInfo, err = client.TSInfo(ctx, "4").Result() Expect(err).NotTo(HaveOccurred()) Expect(resultInfo["chunkSize"]).To(BeEquivalentTo(128)) + + // Test insertion filters INCRBY + opt = &redis.TSIncrDecrOptions{Timestamp: 1000, IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"} + res, err := client.TSIncrByWithArgs(ctx, "ts-if-1", 1.0, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(BeEquivalentTo(1000)) + + res, err = client.TSIncrByWithArgs(ctx, "ts-if-1", 3.0, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(BeEquivalentTo(1000)) + + rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1004).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(rangePoints)).To(BeEquivalentTo(1)) + Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: 1.0}})) + + res, err = client.TSIncrByWithArgs(ctx, "ts-if-1", 10.1, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(BeEquivalentTo(1000)) + + rangePoints, err = client.TSRange(ctx, "ts-if-1", 1000, 1004).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(rangePoints)).To(BeEquivalentTo(1)) + Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: 11.1}})) + + // Test insertion filters DECRBY + opt = &redis.TSIncrDecrOptions{Timestamp: 1000, IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"} + res, err = client.TSDecrByWithArgs(ctx, "ts-if-2", 1.0, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(BeEquivalentTo(1000)) + + res, err = client.TSDecrByWithArgs(ctx, "ts-if-2", 3.0, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(BeEquivalentTo(1000)) + + rangePoints, err = client.TSRange(ctx, "ts-if-2", 1000, 1004).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(rangePoints)).To(BeEquivalentTo(1)) + Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: -1.0}})) + + res, err = client.TSDecrByWithArgs(ctx, "ts-if-2", 10.1, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(BeEquivalentTo(1000)) + + rangePoints, err = client.TSRange(ctx, "ts-if-2", 1000, 1004).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(rangePoints)).To(BeEquivalentTo(1)) + Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: -11.1}})) }) It("should TSGet", Label("timeseries", "tsget"), func() {