diff --git a/.github/wordlist.txt b/.github/wordlist.txt index 19f01b10..fcaba724 100644 --- a/.github/wordlist.txt +++ b/.github/wordlist.txt @@ -54,4 +54,5 @@ url variadic RedisStack RedisGears +RedisTimeseries RediSearch diff --git a/command.go b/command.go index 1bd4d5db..549322a9 100644 --- a/command.go +++ b/command.go @@ -1351,6 +1351,65 @@ func (cmd *MapStringIntCmd) readReply(rd *proto.Reader) error { return nil } +// ------------------------------------------------------------------------------ +type MapStringSliceInterfaceCmd struct { + baseCmd + val map[string][]interface{} +} + +func NewMapStringSliceInterfaceCmd(ctx context.Context, args ...interface{}) *MapStringSliceInterfaceCmd { + return &MapStringSliceInterfaceCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: args, + }, + } +} + +func (cmd *MapStringSliceInterfaceCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *MapStringSliceInterfaceCmd) SetVal(val map[string][]interface{}) { + cmd.val = val +} + +func (cmd *MapStringSliceInterfaceCmd) Result() (map[string][]interface{}, error) { + return cmd.val, cmd.err +} + +func (cmd *MapStringSliceInterfaceCmd) Val() map[string][]interface{} { + return cmd.val +} + +func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) { + n, err := rd.ReadMapLen() + if err != nil { + return err + } + cmd.val = make(map[string][]interface{}, n) + for i := 0; i < n; i++ { + k, err := rd.ReadString() + if err != nil { + return err + } + nn, err := rd.ReadArrayLen() + if err != nil { + return err + } + cmd.val[k] = make([]interface{}, nn) + for j := 0; j < nn; j++ { + value, err := rd.ReadReply() + if err != nil { + return err + } + cmd.val[k][j] = value + } + } + + return nil +} + //------------------------------------------------------------------------------ type StringStructMapCmd struct { diff --git a/commands.go b/commands.go index 07c8e2c8..ce383025 100644 --- a/commands.go +++ b/commands.go @@ -507,6 +507,7 @@ type Cmdable interface { gearsCmdable probabilisticCmdable + TimeseriesCmdable } type StatefulCmdable interface { diff --git a/redis_timeseries.go b/redis_timeseries.go new file mode 100644 index 00000000..5ead2fa5 --- /dev/null +++ b/redis_timeseries.go @@ -0,0 +1,929 @@ +package redis + +import ( + "context" + "strconv" + + "github.com/redis/go-redis/v9/internal/proto" +) + +type TimeseriesCmdable interface { + TSAdd(ctx context.Context, key string, timestamp interface{}, value float64) *IntCmd + TSAddWithArgs(ctx context.Context, key string, timestamp interface{}, value float64, options *TSOptions) *IntCmd + TSCreate(ctx context.Context, key string) *StatusCmd + TSCreateWithArgs(ctx context.Context, key string, options *TSOptions) *StatusCmd + TSAlter(ctx context.Context, key string, options *TSAlterOptions) *StatusCmd + TSCreateRule(ctx context.Context, sourceKey string, destKey string, aggregator Aggregator, bucketDuration int) *StatusCmd + TSCreateRuleWithArgs(ctx context.Context, sourceKey string, destKey string, aggregator Aggregator, bucketDuration int, options *TSCreateRuleOptions) *StatusCmd + TSIncrBy(ctx context.Context, Key string, timestamp float64) *IntCmd + TSIncrByWithArgs(ctx context.Context, key string, timestamp float64, options *TSIncrDecrOptions) *IntCmd + TSDecrBy(ctx context.Context, Key string, timestamp float64) *IntCmd + TSDecrByWithArgs(ctx context.Context, key string, timestamp float64, options *TSIncrDecrOptions) *IntCmd + TSDel(ctx context.Context, Key string, fromTimestamp int, toTimestamp int) *IntCmd + TSDeleteRule(ctx context.Context, sourceKey string, destKey string) *StatusCmd + TSGet(ctx context.Context, key string) *TSTimestampValueCmd + TSGetWithArgs(ctx context.Context, key string, options *TSGetOptions) *TSTimestampValueCmd + TSInfo(ctx context.Context, key string) *MapStringInterfaceCmd + TSInfoWithArgs(ctx context.Context, key string, options *TSInfoOptions) *MapStringInterfaceCmd + TSMAdd(ctx context.Context, ktvSlices [][]interface{}) *IntSliceCmd + TSQueryIndex(ctx context.Context, filterExpr []string) *StringSliceCmd + TSRevRange(ctx context.Context, key string, fromTimestamp int, toTimestamp int) *TSTimestampValueSliceCmd + TSRevRangeWithArgs(ctx context.Context, key string, fromTimestamp int, toTimestamp int, options *TSRevRangeOptions) *TSTimestampValueSliceCmd + TSRange(ctx context.Context, key string, fromTimestamp int, toTimestamp int) *TSTimestampValueSliceCmd + TSRangeWithArgs(ctx context.Context, key string, fromTimestamp int, toTimestamp int, options *TSRangeOptions) *TSTimestampValueSliceCmd + TSMRange(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string) *MapStringSliceInterfaceCmd + TSMRangeWithArgs(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string, options *TSMRangeOptions) *MapStringSliceInterfaceCmd + TSMRevRange(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string) *MapStringSliceInterfaceCmd + TSMRevRangeWithArgs(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string, options *TSMRevRangeOptions) *MapStringSliceInterfaceCmd + TSMGet(ctx context.Context, filters []string) *MapStringSliceInterfaceCmd + TSMGetWithArgs(ctx context.Context, filters []string, options *TSMGetOptions) *MapStringSliceInterfaceCmd +} + +type TSOptions struct { + Retention int + ChunkSize int + Encoding string + DuplicatePolicy string + Labels map[string]string +} +type TSIncrDecrOptions struct { + Timestamp int64 + Retention int + ChunkSize int + Uncompressed bool + Labels map[string]string +} + +type TSAlterOptions struct { + Retention int + ChunkSize int + DuplicatePolicy string + Labels map[string]string +} + +type TSCreateRuleOptions struct { + alignTimestamp int64 +} + +type TSGetOptions struct { + Latest bool +} + +type TSInfoOptions struct { + Debug bool +} +type Aggregator int + +const ( + Invalid = Aggregator(iota) + Avg + Sum + Min + Max + Range + Count + First + Last + StdP + StdS + VarP + VarS + Twa +) + +func (a Aggregator) String() string { + switch a { + case Invalid: + return "" + case Avg: + return "AVG" + case Sum: + return "SUM" + case Min: + return "MIN" + case Max: + return "MAX" + case Range: + return "RANGE" + case Count: + return "COUNT" + case First: + return "FIRST" + case Last: + return "LAST" + case StdP: + return "STD.P" + case StdS: + return "STD.S" + case VarP: + return "VAR.P" + case VarS: + return "VAR.S" + case Twa: + return "TWA" + default: + return "" + } +} + +type TSRangeOptions struct { + Latest bool + FilterByTS []int + FilterByValue []int + Count int + Align interface{} + Aggregator Aggregator + BucketDuration int + BucketTimestamp interface{} + Empty bool +} + +type TSRevRangeOptions struct { + Latest bool + FilterByTS []int + FilterByValue []int + Count int + Align interface{} + Aggregator Aggregator + BucketDuration int + BucketTimestamp interface{} + Empty bool +} + +type TSMRangeOptions struct { + Latest bool + FilterByTS []int + FilterByValue []int + WithLabels bool + SelectedLabels []interface{} + Count int + Align interface{} + Aggregator Aggregator + BucketDuration int + BucketTimestamp interface{} + Empty bool + GroupByLabel interface{} + Reducer interface{} +} + +type TSMRevRangeOptions struct { + Latest bool + FilterByTS []int + FilterByValue []int + WithLabels bool + SelectedLabels []interface{} + Count int + Align interface{} + Aggregator Aggregator + BucketDuration int + BucketTimestamp interface{} + Empty bool + GroupByLabel interface{} + Reducer interface{} +} + +type TSMGetOptions struct { + Latest bool + WithLabels bool + SelectedLabels []interface{} +} + +// TSAdd - Adds one or more observations to a t-digest sketch. +// For more information - https://redis.io/commands/ts.add/ +func (c cmdable) TSAdd(ctx context.Context, key string, timestamp interface{}, value float64) *IntCmd { + args := []interface{}{"TS.ADD", key, timestamp, value} + cmd := NewIntCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSAddWithArgs - Adds one or more observations to a t-digest sketch. +// This function also allows for specifying additional options such as: +// Retention, ChunkSize, Encoding, DuplicatePolicy and Labels. +// For more information - https://redis.io/commands/ts.add/ +func (c cmdable) TSAddWithArgs(ctx context.Context, key string, timestamp interface{}, value float64, options *TSOptions) *IntCmd { + args := []interface{}{"TS.ADD", key, timestamp, value} + if options != nil { + if options.Retention != 0 { + args = append(args, "RETENTION", options.Retention) + } + if options.ChunkSize != 0 { + args = append(args, "CHUNK_SIZE", options.ChunkSize) + + } + if options.Encoding != "" { + args = append(args, "ENCODING", options.Encoding) + } + + if options.DuplicatePolicy != "" { + args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy) + } + if options.Labels != nil { + args = append(args, "LABELS") + for label, value := range options.Labels { + args = append(args, label, value) + } + } + } + cmd := NewIntCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSCreate - Creates a new time-series key. +// For more information - https://redis.io/commands/ts.create/ +func (c cmdable) TSCreate(ctx context.Context, key string) *StatusCmd { + args := []interface{}{"TS.CREATE", key} + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSCreateWithArgs - Creates a new time-series key with additional options. +// This function allows for specifying additional options such as: +// Retention, ChunkSize, Encoding, DuplicatePolicy and Labels. +// For more information - https://redis.io/commands/ts.create/ +func (c cmdable) TSCreateWithArgs(ctx context.Context, key string, options *TSOptions) *StatusCmd { + args := []interface{}{"TS.CREATE", key} + if options != nil { + if options.Retention != 0 { + args = append(args, "RETENTION", options.Retention) + } + if options.ChunkSize != 0 { + args = append(args, "CHUNK_SIZE", options.ChunkSize) + + } + if options.Encoding != "" { + args = append(args, "ENCODING", options.Encoding) + } + + if options.DuplicatePolicy != "" { + args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy) + } + if options.Labels != nil { + args = append(args, "LABELS") + for label, value := range options.Labels { + args = append(args, label, value) + + } + } + } + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSAlter - Alters an existing time-series key with additional options. +// This function allows for specifying additional options such as: +// Retention, ChunkSize and DuplicatePolicy. +// For more information - https://redis.io/commands/ts.alter/ +func (c cmdable) TSAlter(ctx context.Context, key string, options *TSAlterOptions) *StatusCmd { + args := []interface{}{"TS.ALTER", key} + if options != nil { + if options.Retention != 0 { + args = append(args, "RETENTION", options.Retention) + } + if options.ChunkSize != 0 { + args = append(args, "CHUNK_SIZE", options.ChunkSize) + + } + if options.DuplicatePolicy != "" { + args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy) + } + if options.Labels != nil { + args = append(args, "LABELS") + for label, value := range options.Labels { + args = append(args, label, value) + + } + } + } + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSCreateRule - Creates a compaction rule from sourceKey to destKey. +// For more information - https://redis.io/commands/ts.createrule/ +func (c cmdable) TSCreateRule(ctx context.Context, sourceKey string, destKey string, aggregator Aggregator, bucketDuration int) *StatusCmd { + args := []interface{}{"TS.CREATERULE", sourceKey, destKey, "AGGREGATION", aggregator.String(), bucketDuration} + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSCreateRuleWithArgs - Creates a compaction rule from sourceKey to destKey with additional option. +// This function allows for specifying additional option such as: +// alignTimestamp. +// For more information - https://redis.io/commands/ts.createrule/ +func (c cmdable) TSCreateRuleWithArgs(ctx context.Context, sourceKey string, destKey string, aggregator Aggregator, bucketDuration int, options *TSCreateRuleOptions) *StatusCmd { + args := []interface{}{"TS.CREATERULE", sourceKey, destKey, "AGGREGATION", aggregator.String(), bucketDuration} + if options != nil { + if options.alignTimestamp != 0 { + args = append(args, options.alignTimestamp) + } + } + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSIncrBy - Increments the value of a time-series key by the specified timestamp. +// For more information - https://redis.io/commands/ts.incrby/ +func (c cmdable) TSIncrBy(ctx context.Context, Key string, timestamp float64) *IntCmd { + args := []interface{}{"TS.INCRBY", Key, timestamp} + cmd := NewIntCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSIncrByWithArgs - Increments the value of a time-series key by the specified timestamp with additional options. +// This function allows for specifying additional options such as: +// Timestamp, Retention, ChunkSize, Uncompressed and Labels. +// For more information - https://redis.io/commands/ts.incrby/ +func (c cmdable) TSIncrByWithArgs(ctx context.Context, key string, timestamp float64, options *TSIncrDecrOptions) *IntCmd { + args := []interface{}{"TS.INCRBY", key, timestamp} + if options != nil { + if options.Timestamp != 0 { + args = append(args, "TIMESTAMP", options.Timestamp) + } + if options.Retention != 0 { + args = append(args, "RETENTION", options.Retention) + } + if options.ChunkSize != 0 { + args = append(args, "CHUNK_SIZE", options.ChunkSize) + + } + if options.Uncompressed { + args = append(args, "UNCOMPRESSED") + } + if options.Labels != nil { + args = append(args, "LABELS") + for label, value := range options.Labels { + args = append(args, label, value) + + } + } + } + cmd := NewIntCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSDecrBy - Decrements the value of a time-series key by the specified timestamp. +// For more information - https://redis.io/commands/ts.decrby/ +func (c cmdable) TSDecrBy(ctx context.Context, Key string, timestamp float64) *IntCmd { + args := []interface{}{"TS.DECRBY", Key, timestamp} + cmd := NewIntCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSDecrByWithArgs - Decrements the value of a time-series key by the specified timestamp with additional options. +// This function allows for specifying additional options such as: +// Timestamp, Retention, ChunkSize, Uncompressed and Labels. +// For more information - https://redis.io/commands/ts.decrby/ +func (c cmdable) TSDecrByWithArgs(ctx context.Context, key string, timestamp float64, options *TSIncrDecrOptions) *IntCmd { + args := []interface{}{"TS.DECRBY", key, timestamp} + if options != nil { + if options.Timestamp != 0 { + args = append(args, "TIMESTAMP", options.Timestamp) + } + if options.Retention != 0 { + args = append(args, "RETENTION", options.Retention) + } + if options.ChunkSize != 0 { + args = append(args, "CHUNK_SIZE", options.ChunkSize) + + } + if options.Uncompressed { + args = append(args, "UNCOMPRESSED") + } + if options.Labels != nil { + args = append(args, "LABELS") + for label, value := range options.Labels { + args = append(args, label, value) + + } + } + } + cmd := NewIntCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSDel - Deletes a range of samples from a time-series key. +// For more information - https://redis.io/commands/ts.del/ +func (c cmdable) TSDel(ctx context.Context, Key string, fromTimestamp int, toTimestamp int) *IntCmd { + args := []interface{}{"TS.DEL", Key, fromTimestamp, toTimestamp} + cmd := NewIntCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSDeleteRule - Deletes a compaction rule from sourceKey to destKey. +// For more information - https://redis.io/commands/ts.deleterule/ +func (c cmdable) TSDeleteRule(ctx context.Context, sourceKey string, destKey string) *StatusCmd { + args := []interface{}{"TS.DELETERULE", sourceKey, destKey} + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSGetWithArgs - Gets the last sample of a time-series key with additional option. +// This function allows for specifying additional option such as: +// Latest. +// For more information - https://redis.io/commands/ts.get/ +func (c cmdable) TSGetWithArgs(ctx context.Context, key string, options *TSGetOptions) *TSTimestampValueCmd { + args := []interface{}{"TS.GET", key} + if options != nil { + if options.Latest { + args = append(args, "LATEST") + } + } + cmd := newTSTimestampValueCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSGet - Gets the last sample of a time-series key. +// For more information - https://redis.io/commands/ts.get/ +func (c cmdable) TSGet(ctx context.Context, key string) *TSTimestampValueCmd { + args := []interface{}{"TS.GET", key} + cmd := newTSTimestampValueCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +type TSTimestampValue struct { + Timestamp int64 + Value float64 +} +type TSTimestampValueCmd struct { + baseCmd + val TSTimestampValue +} + +func newTSTimestampValueCmd(ctx context.Context, args ...interface{}) *TSTimestampValueCmd { + return &TSTimestampValueCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: args, + }, + } +} + +func (cmd *TSTimestampValueCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *TSTimestampValueCmd) SetVal(val TSTimestampValue) { + cmd.val = val +} + +func (cmd *TSTimestampValueCmd) Result() (TSTimestampValue, error) { + return cmd.val, cmd.err +} + +func (cmd *TSTimestampValueCmd) Val() TSTimestampValue { + return cmd.val +} + +func (cmd *TSTimestampValueCmd) readReply(rd *proto.Reader) (err error) { + n, err := rd.ReadMapLen() + if err != nil { + return err + } + cmd.val = TSTimestampValue{} + for i := 0; i < n; i++ { + timestamp, err := rd.ReadInt() + if err != nil { + return err + } + value, err := rd.ReadString() + if err != nil { + return err + } + cmd.val.Timestamp = timestamp + cmd.val.Value, err = strconv.ParseFloat(value, 64) + if err != nil { + return err + } + } + + return nil +} + +// TSInfo - Returns information about a time-series key. +// For more information - https://redis.io/commands/ts.info/ +func (c cmdable) TSInfo(ctx context.Context, key string) *MapStringInterfaceCmd { + args := []interface{}{"TS.INFO", key} + cmd := NewMapStringInterfaceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSInfoWithArgs - Returns information about a time-series key with additional option. +// This function allows for specifying additional option such as: +// Debug. +// For more information - https://redis.io/commands/ts.info/ +func (c cmdable) TSInfoWithArgs(ctx context.Context, key string, options *TSInfoOptions) *MapStringInterfaceCmd { + args := []interface{}{"TS.INFO", key} + if options != nil { + if options.Debug { + args = append(args, "DEBUG") + } + } + cmd := NewMapStringInterfaceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSMAdd - Adds multiple samples to multiple time-series keys. +// For more information - https://redis.io/commands/ts.madd/ +func (c cmdable) TSMAdd(ctx context.Context, ktvSlices [][]interface{}) *IntSliceCmd { + args := []interface{}{"TS.MADD"} + for _, ktv := range ktvSlices { + args = append(args, ktv...) + } + cmd := NewIntSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSQueryIndex - Returns all the keys matching the filter expression. +// For more information - https://redis.io/commands/ts.queryindex/ +func (c cmdable) TSQueryIndex(ctx context.Context, filterExpr []string) *StringSliceCmd { + args := []interface{}{"TS.QUERYINDEX"} + for _, f := range filterExpr { + args = append(args, f) + } + cmd := NewStringSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSRevRange - Returns a range of samples from a time-series key in reverse order. +// For more information - https://redis.io/commands/ts.revrange/ +func (c cmdable) TSRevRange(ctx context.Context, key string, fromTimestamp int, toTimestamp int) *TSTimestampValueSliceCmd { + args := []interface{}{"TS.REVRANGE", key, fromTimestamp, toTimestamp} + cmd := newTSTimestampValueSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSRevRangeWithArgs - Returns a range of samples from a time-series key in reverse order with additional options. +// This function allows for specifying additional options such as: +// Latest, FilterByTS, FilterByValue, Count, Align, Aggregator, +// BucketDuration, BucketTimestamp and Empty. +// For more information - https://redis.io/commands/ts.revrange/ +func (c cmdable) TSRevRangeWithArgs(ctx context.Context, key string, fromTimestamp int, toTimestamp int, options *TSRevRangeOptions) *TSTimestampValueSliceCmd { + args := []interface{}{"TS.REVRANGE", key, fromTimestamp, toTimestamp} + if options != nil { + if options.Latest { + args = append(args, "LATEST") + } + if options.FilterByTS != nil { + args = append(args, "FILTER_BY_TS") + for _, f := range options.FilterByTS { + args = append(args, f) + } + } + if options.FilterByValue != nil { + args = append(args, "FILTER_BY_VALUE") + for _, f := range options.FilterByValue { + args = append(args, f) + } + } + if options.Count != 0 { + args = append(args, "COUNT", options.Count) + } + if options.Align != nil { + args = append(args, "ALIGN", options.Align) + } + if options.Aggregator != 0 { + args = append(args, "AGGREGATION", options.Aggregator.String()) + } + if options.BucketDuration != 0 { + args = append(args, options.BucketDuration) + } + if options.BucketTimestamp != nil { + args = append(args, "BUCKETTIMESTAMP", options.BucketTimestamp) + } + if options.Empty { + args = append(args, "EMPTY") + } + } + cmd := newTSTimestampValueSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSRange - Returns a range of samples from a time-series key. +// For more information - https://redis.io/commands/ts.range/ +func (c cmdable) TSRange(ctx context.Context, key string, fromTimestamp int, toTimestamp int) *TSTimestampValueSliceCmd { + args := []interface{}{"TS.RANGE", key, fromTimestamp, toTimestamp} + cmd := newTSTimestampValueSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSRangeWithArgs - Returns a range of samples from a time-series key with additional options. +// This function allows for specifying additional options such as: +// Latest, FilterByTS, FilterByValue, Count, Align, Aggregator, +// BucketDuration, BucketTimestamp and Empty. +// For more information - https://redis.io/commands/ts.range/ +func (c cmdable) TSRangeWithArgs(ctx context.Context, key string, fromTimestamp int, toTimestamp int, options *TSRangeOptions) *TSTimestampValueSliceCmd { + args := []interface{}{"TS.RANGE", key, fromTimestamp, toTimestamp} + if options != nil { + if options.Latest { + args = append(args, "LATEST") + } + if options.FilterByTS != nil { + args = append(args, "FILTER_BY_TS") + for _, f := range options.FilterByTS { + args = append(args, f) + } + } + if options.FilterByValue != nil { + args = append(args, "FILTER_BY_VALUE") + for _, f := range options.FilterByValue { + args = append(args, f) + } + } + if options.Count != 0 { + args = append(args, "COUNT", options.Count) + } + if options.Align != nil { + args = append(args, "ALIGN", options.Align) + } + if options.Aggregator != 0 { + args = append(args, "AGGREGATION", options.Aggregator.String()) + } + if options.BucketDuration != 0 { + args = append(args, options.BucketDuration) + } + if options.BucketTimestamp != nil { + args = append(args, "BUCKETTIMESTAMP", options.BucketTimestamp) + } + if options.Empty { + args = append(args, "EMPTY") + } + } + cmd := newTSTimestampValueSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +type TSTimestampValueSliceCmd struct { + baseCmd + val []TSTimestampValue +} + +func newTSTimestampValueSliceCmd(ctx context.Context, args ...interface{}) *TSTimestampValueSliceCmd { + return &TSTimestampValueSliceCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: args, + }, + } +} + +func (cmd *TSTimestampValueSliceCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *TSTimestampValueSliceCmd) SetVal(val []TSTimestampValue) { + cmd.val = val +} + +func (cmd *TSTimestampValueSliceCmd) Result() ([]TSTimestampValue, error) { + return cmd.val, cmd.err +} + +func (cmd *TSTimestampValueSliceCmd) Val() []TSTimestampValue { + return cmd.val +} + +func (cmd *TSTimestampValueSliceCmd) readReply(rd *proto.Reader) (err error) { + n, err := rd.ReadArrayLen() + if err != nil { + return err + } + cmd.val = make([]TSTimestampValue, n) + for i := 0; i < n; i++ { + _, _ = rd.ReadArrayLen() + timestamp, err := rd.ReadInt() + if err != nil { + return err + } + value, err := rd.ReadString() + if err != nil { + return err + } + cmd.val[i].Timestamp = timestamp + cmd.val[i].Value, err = strconv.ParseFloat(value, 64) + if err != nil { + return err + } + } + + return nil +} + +// TSMRange - Returns a range of samples from multiple time-series keys. +// For more information - https://redis.io/commands/ts.mrange/ +func (c cmdable) TSMRange(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string) *MapStringSliceInterfaceCmd { + args := []interface{}{"TS.MRANGE", fromTimestamp, toTimestamp, "FILTER"} + for _, f := range filterExpr { + args = append(args, f) + } + cmd := NewMapStringSliceInterfaceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSMRangeWithArgs - Returns a range of samples from multiple time-series keys with additional options. +// This function allows for specifying additional options such as: +// Latest, FilterByTS, FilterByValue, WithLabels, SelectedLabels, +// Count, Align, Aggregator, BucketDuration, BucketTimestamp, +// Empty, GroupByLabel and Reducer. +// For more information - https://redis.io/commands/ts.mrange/ +func (c cmdable) TSMRangeWithArgs(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string, options *TSMRangeOptions) *MapStringSliceInterfaceCmd { + args := []interface{}{"TS.MRANGE", fromTimestamp, toTimestamp} + if options != nil { + if options.Latest { + args = append(args, "LATEST") + } + if options.FilterByTS != nil { + args = append(args, "FILTER_BY_TS") + for _, f := range options.FilterByTS { + args = append(args, f) + } + } + if options.FilterByValue != nil { + args = append(args, "FILTER_BY_VALUE") + for _, f := range options.FilterByValue { + args = append(args, f) + } + } + if options.WithLabels { + args = append(args, "WITHLABELS") + } + if options.SelectedLabels != nil { + args = append(args, "SELECTED_LABELS") + args = append(args, options.SelectedLabels...) + } + if options.Count != 0 { + args = append(args, "COUNT", options.Count) + } + if options.Align != nil { + args = append(args, "ALIGN", options.Align) + } + if options.Aggregator != 0 { + args = append(args, "AGGREGATION", options.Aggregator.String()) + } + if options.BucketDuration != 0 { + args = append(args, options.BucketDuration) + } + if options.BucketTimestamp != nil { + args = append(args, "BUCKETTIMESTAMP", options.BucketTimestamp) + } + if options.Empty { + args = append(args, "EMPTY") + } + } + args = append(args, "FILTER") + for _, f := range filterExpr { + args = append(args, f) + } + if options != nil { + if options.GroupByLabel != nil { + args = append(args, "GROUPBY", options.GroupByLabel) + } + if options.Reducer != nil { + args = append(args, "REDUCE", options.Reducer) + } + } + cmd := NewMapStringSliceInterfaceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSMRevRange - Returns a range of samples from multiple time-series keys in reverse order. +// For more information - https://redis.io/commands/ts.mrevrange/ +func (c cmdable) TSMRevRange(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string) *MapStringSliceInterfaceCmd { + args := []interface{}{"TS.MREVRANGE", fromTimestamp, toTimestamp, "FILTER"} + for _, f := range filterExpr { + args = append(args, f) + } + cmd := NewMapStringSliceInterfaceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSMRevRangeWithArgs - Returns a range of samples from multiple time-series keys in reverse order with additional options. +// This function allows for specifying additional options such as: +// Latest, FilterByTS, FilterByValue, WithLabels, SelectedLabels, +// Count, Align, Aggregator, BucketDuration, BucketTimestamp, +// Empty, GroupByLabel and Reducer. +// For more information - https://redis.io/commands/ts.mrevrange/ +func (c cmdable) TSMRevRangeWithArgs(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string, options *TSMRevRangeOptions) *MapStringSliceInterfaceCmd { + args := []interface{}{"TS.MREVRANGE", fromTimestamp, toTimestamp} + if options != nil { + if options.Latest { + args = append(args, "LATEST") + } + if options.FilterByTS != nil { + args = append(args, "FILTER_BY_TS") + for _, f := range options.FilterByTS { + args = append(args, f) + } + } + if options.FilterByValue != nil { + args = append(args, "FILTER_BY_VALUE") + for _, f := range options.FilterByValue { + args = append(args, f) + } + } + if options.WithLabels { + args = append(args, "WITHLABELS") + } + if options.SelectedLabels != nil { + args = append(args, "SELECTED_LABELS") + args = append(args, options.SelectedLabels...) + } + if options.Count != 0 { + args = append(args, "COUNT", options.Count) + } + if options.Align != nil { + args = append(args, "ALIGN", options.Align) + } + if options.Aggregator != 0 { + args = append(args, "AGGREGATION", options.Aggregator.String()) + } + if options.BucketDuration != 0 { + args = append(args, options.BucketDuration) + } + if options.BucketTimestamp != nil { + args = append(args, "BUCKETTIMESTAMP", options.BucketTimestamp) + } + if options.Empty { + args = append(args, "EMPTY") + } + } + args = append(args, "FILTER") + for _, f := range filterExpr { + args = append(args, f) + } + if options != nil { + if options.GroupByLabel != nil { + args = append(args, "GROUPBY", options.GroupByLabel) + } + if options.Reducer != nil { + args = append(args, "REDUCE", options.Reducer) + } + } + cmd := NewMapStringSliceInterfaceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSMGet - Returns the last sample of multiple time-series keys. +// For more information - https://redis.io/commands/ts.mget/ +func (c cmdable) TSMGet(ctx context.Context, filters []string) *MapStringSliceInterfaceCmd { + args := []interface{}{"TS.MGET", "FILTER"} + for _, f := range filters { + args = append(args, f) + } + cmd := NewMapStringSliceInterfaceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TSMGetWithArgs - Returns the last sample of multiple time-series keys with additional options. +// This function allows for specifying additional options such as: +// Latest, WithLabels and SelectedLabels. +// For more information - https://redis.io/commands/ts.mget/ +func (c cmdable) TSMGetWithArgs(ctx context.Context, filters []string, options *TSMGetOptions) *MapStringSliceInterfaceCmd { + args := []interface{}{"TS.MGET"} + if options != nil { + if options.Latest { + args = append(args, "LATEST") + } + if options.WithLabels { + args = append(args, "WITHLABELS") + } + if options.SelectedLabels != nil { + args = append(args, "SELECTED_LABELS") + args = append(args, options.SelectedLabels...) + } + } + args = append(args, "FILTER") + for _, f := range filters { + args = append(args, f) + } + cmd := NewMapStringSliceInterfaceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} diff --git a/redis_timeseries_test.go b/redis_timeseries_test.go new file mode 100644 index 00000000..291274de --- /dev/null +++ b/redis_timeseries_test.go @@ -0,0 +1,951 @@ +package redis_test + +import ( + "context" + "strings" + + . "github.com/bsm/ginkgo/v2" + . "github.com/bsm/gomega" + "github.com/redis/go-redis/v9" +) + +var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() { + ctx := context.TODO() + var client *redis.Client + + BeforeEach(func() { + client = redis.NewClient(&redis.Options{Addr: ":6379"}) + Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + Expect(client.Close()).NotTo(HaveOccurred()) + }) + + It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs"), func() { + result, err := client.TSCreate(ctx, "1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo("OK")) + // Test TSCreateWithArgs + opt := &redis.TSOptions{Retention: 5} + result, err = client.TSCreateWithArgs(ctx, "2", opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo("OK")) + opt = &redis.TSOptions{Labels: map[string]string{"Redis": "Labs"}} + result, err = client.TSCreateWithArgs(ctx, "3", opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo("OK")) + opt = &redis.TSOptions{Labels: map[string]string{"Time": "Series"}, Retention: 20} + result, err = client.TSCreateWithArgs(ctx, "4", opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo("OK")) + resultInfo, err := client.TSInfo(ctx, "4").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series")) + // Test chunk size + opt = &redis.TSOptions{ChunkSize: 128} + result, err = client.TSCreateWithArgs(ctx, "ts-cs-1", opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo("OK")) + resultInfo, err = client.TSInfo(ctx, "ts-cs-1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultInfo["chunkSize"]).To(BeEquivalentTo(128)) + // Test duplicate policy + duplicate_policies := []string{"BLOCK", "LAST", "FIRST", "MIN", "MAX"} + for _, dup := range duplicate_policies { + keyName := "ts-dup-" + dup + opt = &redis.TSOptions{DuplicatePolicy: dup} + result, err = client.TSCreateWithArgs(ctx, keyName, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo("OK")) + resultInfo, err = client.TSInfo(ctx, keyName).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(strings.ToUpper(resultInfo["duplicatePolicy"].(string))).To(BeEquivalentTo(dup)) + + } + }) + It("should TSAdd and TSAddWithArgs", Label("timeseries", "tsadd", "tsaddWithArgs"), func() { + result, err := client.TSAdd(ctx, "1", 1, 1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo(1)) + // Test TSAddWithArgs + opt := &redis.TSOptions{Retention: 10} + result, err = client.TSAddWithArgs(ctx, "2", 2, 3, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo(2)) + opt = &redis.TSOptions{Labels: map[string]string{"Redis": "Labs"}} + result, err = client.TSAddWithArgs(ctx, "3", 3, 2, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo(3)) + opt = &redis.TSOptions{Labels: map[string]string{"Redis": "Labs", "Time": "Series"}, Retention: 10} + result, err = client.TSAddWithArgs(ctx, "4", 4, 2, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo(4)) + resultInfo, err := client.TSInfo(ctx, "4").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series")) + // Test chunk size + opt = &redis.TSOptions{ChunkSize: 128} + result, err = client.TSAddWithArgs(ctx, "ts-cs-1", 1, 10, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo(1)) + resultInfo, err = client.TSInfo(ctx, "ts-cs-1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultInfo["chunkSize"]).To(BeEquivalentTo(128)) + // Test duplicate policy + // LAST + opt = &redis.TSOptions{DuplicatePolicy: "LAST"} + result, err = client.TSAddWithArgs(ctx, "tsal-1", 1, 5, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo(1)) + result, err = client.TSAddWithArgs(ctx, "tsal-1", 1, 10, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo(1)) + resultGet, err := client.TSGet(ctx, "tsal-1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultGet.Value).To(BeEquivalentTo(10)) + // FIRST + opt = &redis.TSOptions{DuplicatePolicy: "FIRST"} + result, err = client.TSAddWithArgs(ctx, "tsaf-1", 1, 5, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo(1)) + result, err = client.TSAddWithArgs(ctx, "tsaf-1", 1, 10, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo(1)) + resultGet, err = client.TSGet(ctx, "tsaf-1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultGet.Value).To(BeEquivalentTo(5)) + // MAX + opt = &redis.TSOptions{DuplicatePolicy: "MAX"} + result, err = client.TSAddWithArgs(ctx, "tsam-1", 1, 5, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo(1)) + result, err = client.TSAddWithArgs(ctx, "tsam-1", 1, 10, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo(1)) + resultGet, err = client.TSGet(ctx, "tsam-1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultGet.Value).To(BeEquivalentTo(10)) + // MIN + opt = &redis.TSOptions{DuplicatePolicy: "MIN"} + result, err = client.TSAddWithArgs(ctx, "tsami-1", 1, 5, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo(1)) + result, err = client.TSAddWithArgs(ctx, "tsami-1", 1, 10, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo(1)) + resultGet, err = client.TSGet(ctx, "tsami-1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultGet.Value).To(BeEquivalentTo(5)) + + }) + + It("should TSAlter", Label("timeseries", "tsalter"), func() { + result, err := client.TSCreate(ctx, "1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo("OK")) + resultInfo, err := client.TSInfo(ctx, "1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultInfo["retentionTime"]).To(BeEquivalentTo(0)) + + opt := &redis.TSAlterOptions{Retention: 10} + resultAlter, err := client.TSAlter(ctx, "1", opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAlter).To(BeEquivalentTo("OK")) + + resultInfo, err = client.TSInfo(ctx, "1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultInfo["retentionTime"]).To(BeEquivalentTo(10)) + + resultInfo, err = client.TSInfo(ctx, "1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultInfo["labels"]).To(BeEquivalentTo(map[interface{}]interface{}{})) + + opt = &redis.TSAlterOptions{Labels: map[string]string{"Time": "Series"}} + resultAlter, err = client.TSAlter(ctx, "1", opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAlter).To(BeEquivalentTo("OK")) + + resultInfo, err = client.TSInfo(ctx, "1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series")) + Expect(resultInfo["retentionTime"]).To(BeEquivalentTo(10)) + Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo(redis.Nil)) + opt = &redis.TSAlterOptions{DuplicatePolicy: "min"} + resultAlter, err = client.TSAlter(ctx, "1", opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAlter).To(BeEquivalentTo("OK")) + + resultInfo, err = client.TSInfo(ctx, "1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo("min")) + + }) + + It("should TSCreateRule and TSDeleteRule", Label("timeseries", "tscreaterule", "tsdeleterule"), func() { + result, err := client.TSCreate(ctx, "1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo("OK")) + result, err = client.TSCreate(ctx, "2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo("OK")) + result, err = client.TSCreateRule(ctx, "1", "2", redis.Avg, 100).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo("OK")) + for i := 0; i < 50; i++ { + resultAdd, err := client.TSAdd(ctx, "1", 100+i*2, 1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(100 + i*2)) + resultAdd, err = client.TSAdd(ctx, "1", 100+i*2+1, 2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(100 + i*2 + 1)) + + } + resultAdd, err := client.TSAdd(ctx, "1", 100*2, 1.5).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(100 * 2)) + resultGet, err := client.TSGet(ctx, "2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultGet.Value).To(BeEquivalentTo(1.5)) + Expect(resultGet.Timestamp).To(BeEquivalentTo(100)) + + resultDeleteRule, err := client.TSDeleteRule(ctx, "1", "2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultDeleteRule).To(BeEquivalentTo("OK")) + resultInfo, err := client.TSInfo(ctx, "1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultInfo["rules"]).To(BeEquivalentTo(map[interface{}]interface{}{})) + + }) + + It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs"), func() { + for i := 0; i < 100; i++ { + _, err := client.TSIncrBy(ctx, "1", 1).Result() + Expect(err).NotTo(HaveOccurred()) + } + result, err := client.TSGet(ctx, "1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result.Value).To(BeEquivalentTo(100)) + + for i := 0; i < 100; i++ { + _, err := client.TSDecrBy(ctx, "1", 1).Result() + Expect(err).NotTo(HaveOccurred()) + } + result, err = client.TSGet(ctx, "1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result.Value).To(BeEquivalentTo(0)) + + opt := &redis.TSIncrDecrOptions{Timestamp: 5} + _, err = client.TSIncrByWithArgs(ctx, "2", 1.5, opt).Result() + Expect(err).NotTo(HaveOccurred()) + + result, err = client.TSGet(ctx, "2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result.Timestamp).To(BeEquivalentTo(5)) + Expect(result.Value).To(BeEquivalentTo(1.5)) + + opt = &redis.TSIncrDecrOptions{Timestamp: 7} + _, err = client.TSIncrByWithArgs(ctx, "2", 2.25, opt).Result() + Expect(err).NotTo(HaveOccurred()) + + result, err = client.TSGet(ctx, "2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result.Timestamp).To(BeEquivalentTo(7)) + Expect(result.Value).To(BeEquivalentTo(3.75)) + + opt = &redis.TSIncrDecrOptions{Timestamp: 15} + _, err = client.TSDecrByWithArgs(ctx, "2", 1.5, opt).Result() + Expect(err).NotTo(HaveOccurred()) + + result, err = client.TSGet(ctx, "2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result.Timestamp).To(BeEquivalentTo(15)) + Expect(result.Value).To(BeEquivalentTo(2.25)) + + // Test chunk size INCRBY + opt = &redis.TSIncrDecrOptions{ChunkSize: 128} + _, err = client.TSIncrByWithArgs(ctx, "3", 10, opt).Result() + Expect(err).NotTo(HaveOccurred()) + + resultInfo, err := client.TSInfo(ctx, "3").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultInfo["chunkSize"]).To(BeEquivalentTo(128)) + + // Test chunk size DECRBY + opt = &redis.TSIncrDecrOptions{ChunkSize: 128} + _, err = client.TSDecrByWithArgs(ctx, "4", 10, opt).Result() + Expect(err).NotTo(HaveOccurred()) + + resultInfo, err = client.TSInfo(ctx, "4").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultInfo["chunkSize"]).To(BeEquivalentTo(128)) + }) + + It("should TSGet", Label("timeseries", "tsget"), func() { + opt := &redis.TSOptions{DuplicatePolicy: "max"} + resultGet, err := client.TSAddWithArgs(ctx, "foo", 2265985, 151, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultGet).To(BeEquivalentTo(2265985)) + result, err := client.TSGet(ctx, "foo").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result.Timestamp).To(BeEquivalentTo(2265985)) + Expect(result.Value).To(BeEquivalentTo(151)) + + }) + + It("should TSGet Latest", Label("timeseries", "tsgetlatest"), func() { + resultGet, err := client.TSCreate(ctx, "tsgl-1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultGet).To(BeEquivalentTo("OK")) + resultGet, err = client.TSCreate(ctx, "tsgl-2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultGet).To(BeEquivalentTo("OK")) + resultGet, err = client.TSCreateRule(ctx, "tsgl-1", "tsgl-2", redis.Sum, 10).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultGet).To(BeEquivalentTo("OK")) + _, err = client.TSAdd(ctx, "tsgl-1", 1, 1).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "tsgl-1", 2, 3).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "tsgl-1", 11, 7).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "tsgl-1", 13, 1).Result() + Expect(err).NotTo(HaveOccurred()) + result, errGet := client.TSGet(ctx, "tsgl-2").Result() + Expect(errGet).NotTo(HaveOccurred()) + Expect(result.Timestamp).To(BeEquivalentTo(0)) + Expect(result.Value).To(BeEquivalentTo(4)) + result, errGet = client.TSGetWithArgs(ctx, "tsgl-2", &redis.TSGetOptions{Latest: true}).Result() + Expect(errGet).NotTo(HaveOccurred()) + Expect(result.Timestamp).To(BeEquivalentTo(10)) + Expect(result.Value).To(BeEquivalentTo(8)) + }) + + It("should TSInfo", Label("timeseries", "tsinfo"), func() { + resultGet, err := client.TSAdd(ctx, "foo", 2265985, 151).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultGet).To(BeEquivalentTo(2265985)) + result, err := client.TSInfo(ctx, "foo").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["firstTimestamp"]).To(BeEquivalentTo(2265985)) + + }) + + It("should TSMAdd", Label("timeseries", "tsmadd"), func() { + resultGet, err := client.TSCreate(ctx, "a").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultGet).To(BeEquivalentTo("OK")) + ktvSlices := make([][]interface{}, 3) + for i := 0; i < 3; i++ { + ktvSlices[i] = make([]interface{}, 3) + ktvSlices[i][0] = "a" + for j := 1; j < 3; j++ { + ktvSlices[i][j] = (i + j) * j + } + } + result, err := client.TSMAdd(ctx, ktvSlices).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo([]int64{1, 2, 3})) + + }) + + It("should TSMGet and TSMGetWithArgs", Label("timeseries", "tsmget", "tsmgetWithArgs"), func() { + opt := &redis.TSOptions{Labels: map[string]string{"Test": "This"}} + resultCreate, err := client.TSCreateWithArgs(ctx, "a", opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + opt = &redis.TSOptions{Labels: map[string]string{"Test": "This", "Taste": "That"}} + resultCreate, err = client.TSCreateWithArgs(ctx, "b", opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + _, err = client.TSAdd(ctx, "a", "*", 15).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "b", "*", 25).Result() + Expect(err).NotTo(HaveOccurred()) + + result, err := client.TSMGet(ctx, []string{"Test=This"}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["a"][1].([]interface{})[1]).To(BeEquivalentTo(15)) + Expect(result["b"][1].([]interface{})[1]).To(BeEquivalentTo(25)) + mgetOpt := &redis.TSMGetOptions{WithLabels: true} + result, err = client.TSMGetWithArgs(ctx, []string{"Test=This"}, mgetOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "Taste": "That"})) + + resultCreate, err = client.TSCreate(ctx, "c").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + opt = &redis.TSOptions{Labels: map[string]string{"is_compaction": "true"}} + resultCreate, err = client.TSCreateWithArgs(ctx, "d", opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + resultCreateRule, err := client.TSCreateRule(ctx, "c", "d", redis.Sum, 10).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreateRule).To(BeEquivalentTo("OK")) + _, err = client.TSAdd(ctx, "c", 1, 1).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "c", 2, 3).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "c", 11, 7).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "c", 13, 1).Result() + Expect(err).NotTo(HaveOccurred()) + result, err = client.TSMGet(ctx, []string{"is_compaction=true"}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(0), 4.0})) + mgetOpt = &redis.TSMGetOptions{Latest: true} + result, err = client.TSMGetWithArgs(ctx, []string{"is_compaction=true"}, mgetOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(10), 8.0})) + + }) + + It("should TSQueryIndex", Label("timeseries", "tsqueryindex"), func() { + opt := &redis.TSOptions{Labels: map[string]string{"Test": "This"}} + resultCreate, err := client.TSCreateWithArgs(ctx, "a", opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + opt = &redis.TSOptions{Labels: map[string]string{"Test": "This", "Taste": "That"}} + resultCreate, err = client.TSCreateWithArgs(ctx, "b", opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + result, err := client.TSQueryIndex(ctx, []string{"Test=This"}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(2)) + result, err = client.TSQueryIndex(ctx, []string{"Taste=That"}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(1)) + + }) + + It("should TSDel and TSRange", Label("timeseries", "tsdel", "tsrange"), func() { + for i := 0; i < 100; i++ { + _, err := client.TSAdd(ctx, "a", i, float64(i%7)).Result() + Expect(err).NotTo(HaveOccurred()) + } + resultDelete, err := client.TSDel(ctx, "a", 0, 21).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultDelete).To(BeEquivalentTo(22)) + + resultRange, err := client.TSRange(ctx, "a", 0, 21).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultRange).To(BeEquivalentTo([]redis.TSTimestampValue{})) + + resultRange, err = client.TSRange(ctx, "a", 22, 22).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 22, Value: 1})) + }) + + It("should TSRange, TSRangeWithArgs", Label("timeseries", "tsrange", "tsrangeWithArgs"), func() { + for i := 0; i < 100; i++ { + _, err := client.TSAdd(ctx, "a", i, float64(i%7)).Result() + Expect(err).NotTo(HaveOccurred()) + + } + result, err := client.TSRange(ctx, "a", 0, 200).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(100)) + for i := 0; i < 100; i++ { + client.TSAdd(ctx, "a", i+200, float64(i%7)) + } + result, err = client.TSRange(ctx, "a", 0, 500).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(200)) + fts := make([]int, 0) + for i := 10; i < 20; i++ { + fts = append(fts, i) + } + opt := &redis.TSRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}} + result, err = client.TSRangeWithArgs(ctx, "a", 0, 500, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(2)) + opt = &redis.TSRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "+"} + result, err = client.TSRangeWithArgs(ctx, "a", 0, 10, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 0, Value: 10}, {Timestamp: 10, Value: 1}})) + opt = &redis.TSRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "5"} + result, err = client.TSRangeWithArgs(ctx, "a", 0, 10, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 0, Value: 5}, {Timestamp: 5, Value: 6}})) + opt = &redis.TSRangeOptions{Aggregator: redis.Twa, BucketDuration: 10} + result, err = client.TSRangeWithArgs(ctx, "a", 0, 10, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 0, Value: 2.55}, {Timestamp: 10, Value: 3}})) + // Test Range Latest + resultCreate, err := client.TSCreate(ctx, "t1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + resultCreate, err = client.TSCreate(ctx, "t2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + resultRule, err := client.TSCreateRule(ctx, "t1", "t2", redis.Sum, 10).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultRule).To(BeEquivalentTo("OK")) + _, errAdd := client.TSAdd(ctx, "t1", 1, 1).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t1", 2, 3).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t1", 11, 7).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t1", 13, 1).Result() + Expect(errAdd).NotTo(HaveOccurred()) + resultRange, err := client.TSRange(ctx, "t1", 0, 20).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 1, Value: 1})) + + opt = &redis.TSRangeOptions{Latest: true} + resultRange, err = client.TSRangeWithArgs(ctx, "t2", 0, 10, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 0, Value: 4})) + // Test Bucket Timestamp + resultCreate, err = client.TSCreate(ctx, "t3").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + _, errAdd = client.TSAdd(ctx, "t3", 15, 1).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t3", 17, 4).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t3", 51, 3).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t3", 73, 5).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t3", 75, 3).Result() + Expect(errAdd).NotTo(HaveOccurred()) + + opt = &redis.TSRangeOptions{Aggregator: redis.Max, Align: 0, BucketDuration: 10} + resultRange, err = client.TSRangeWithArgs(ctx, "t3", 0, 100, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 10, Value: 4})) + Expect(len(resultRange)).To(BeEquivalentTo(3)) + + opt = &redis.TSRangeOptions{Aggregator: redis.Max, Align: 0, BucketDuration: 10, BucketTimestamp: "+"} + resultRange, err = client.TSRangeWithArgs(ctx, "t3", 0, 100, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 20, Value: 4})) + Expect(len(resultRange)).To(BeEquivalentTo(3)) + // Test Empty + _, errAdd = client.TSAdd(ctx, "t4", 15, 1).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t4", 17, 4).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t4", 51, 3).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t4", 73, 5).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t4", 75, 3).Result() + Expect(errAdd).NotTo(HaveOccurred()) + + opt = &redis.TSRangeOptions{Aggregator: redis.Max, Align: 0, BucketDuration: 10} + resultRange, err = client.TSRangeWithArgs(ctx, "t4", 0, 100, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 10, Value: 4})) + Expect(len(resultRange)).To(BeEquivalentTo(3)) + + opt = &redis.TSRangeOptions{Aggregator: redis.Max, Align: 0, BucketDuration: 10, Empty: true} + resultRange, err = client.TSRangeWithArgs(ctx, "t4", 0, 100, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 10, Value: 4})) + Expect(len(resultRange)).To(BeEquivalentTo(7)) + }) + + It("should TSRevRange, TSRevRangeWithArgs", Label("timeseries", "tsrevrange", "tsrevrangeWithArgs"), func() { + for i := 0; i < 100; i++ { + _, err := client.TSAdd(ctx, "a", i, float64(i%7)).Result() + Expect(err).NotTo(HaveOccurred()) + + } + result, err := client.TSRange(ctx, "a", 0, 200).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(100)) + for i := 0; i < 100; i++ { + client.TSAdd(ctx, "a", i+200, float64(i%7)) + } + result, err = client.TSRange(ctx, "a", 0, 500).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(200)) + + opt := &redis.TSRevRangeOptions{Aggregator: redis.Avg, BucketDuration: 10} + result, err = client.TSRevRangeWithArgs(ctx, "a", 0, 500, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(20)) + + opt = &redis.TSRevRangeOptions{Count: 10} + result, err = client.TSRevRangeWithArgs(ctx, "a", 0, 500, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(10)) + + fts := make([]int, 0) + for i := 10; i < 20; i++ { + fts = append(fts, i) + } + opt = &redis.TSRevRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}} + result, err = client.TSRevRangeWithArgs(ctx, "a", 0, 500, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(2)) + + opt = &redis.TSRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "+"} + result, err = client.TSRevRangeWithArgs(ctx, "a", 0, 10, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 10, Value: 1}, {Timestamp: 0, Value: 10}})) + + opt = &redis.TSRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "1"} + result, err = client.TSRevRangeWithArgs(ctx, "a", 0, 10, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1, Value: 10}, {Timestamp: 0, Value: 1}})) + + opt = &redis.TSRevRangeOptions{Aggregator: redis.Twa, BucketDuration: 10} + result, err = client.TSRevRangeWithArgs(ctx, "a", 0, 10, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 10, Value: 3}, {Timestamp: 0, Value: 2.55}})) + // Test Range Latest + resultCreate, err := client.TSCreate(ctx, "t1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + resultCreate, err = client.TSCreate(ctx, "t2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + resultRule, err := client.TSCreateRule(ctx, "t1", "t2", redis.Sum, 10).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultRule).To(BeEquivalentTo("OK")) + _, errAdd := client.TSAdd(ctx, "t1", 1, 1).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t1", 2, 3).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t1", 11, 7).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t1", 13, 1).Result() + Expect(errAdd).NotTo(HaveOccurred()) + resultRange, err := client.TSRange(ctx, "t2", 0, 10).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 0, Value: 4})) + opt = &redis.TSRevRangeOptions{Latest: true} + resultRange, err = client.TSRevRangeWithArgs(ctx, "t2", 0, 10, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 10, Value: 8})) + resultRange, err = client.TSRevRangeWithArgs(ctx, "t2", 0, 9, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 0, Value: 4})) + // Test Bucket Timestamp + resultCreate, err = client.TSCreate(ctx, "t3").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + _, errAdd = client.TSAdd(ctx, "t3", 15, 1).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t3", 17, 4).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t3", 51, 3).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t3", 73, 5).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t3", 75, 3).Result() + Expect(errAdd).NotTo(HaveOccurred()) + + opt = &redis.TSRevRangeOptions{Aggregator: redis.Max, Align: 0, BucketDuration: 10} + resultRange, err = client.TSRevRangeWithArgs(ctx, "t3", 0, 100, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 70, Value: 5})) + Expect(len(resultRange)).To(BeEquivalentTo(3)) + + opt = &redis.TSRevRangeOptions{Aggregator: redis.Max, Align: 0, BucketDuration: 10, BucketTimestamp: "+"} + resultRange, err = client.TSRevRangeWithArgs(ctx, "t3", 0, 100, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 80, Value: 5})) + Expect(len(resultRange)).To(BeEquivalentTo(3)) + // Test Empty + _, errAdd = client.TSAdd(ctx, "t4", 15, 1).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t4", 17, 4).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t4", 51, 3).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t4", 73, 5).Result() + Expect(errAdd).NotTo(HaveOccurred()) + _, errAdd = client.TSAdd(ctx, "t4", 75, 3).Result() + Expect(errAdd).NotTo(HaveOccurred()) + + opt = &redis.TSRevRangeOptions{Aggregator: redis.Max, Align: 0, BucketDuration: 10} + resultRange, err = client.TSRevRangeWithArgs(ctx, "t4", 0, 100, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 70, Value: 5})) + Expect(len(resultRange)).To(BeEquivalentTo(3)) + + opt = &redis.TSRevRangeOptions{Aggregator: redis.Max, Align: 0, BucketDuration: 10, Empty: true} + resultRange, err = client.TSRevRangeWithArgs(ctx, "t4", 0, 100, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 70, Value: 5})) + Expect(len(resultRange)).To(BeEquivalentTo(7)) + + }) + + It("should TSMRange and TSMRangeWithArgs", Label("timeseries", "tsmrange", "tsmrangeWithArgs"), func() { + createOpt := &redis.TSOptions{Labels: map[string]string{"Test": "This", "team": "ny"}} + resultCreate, err := client.TSCreateWithArgs(ctx, "a", createOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + createOpt = &redis.TSOptions{Labels: map[string]string{"Test": "This", "Taste": "That", "team": "sf"}} + resultCreate, err = client.TSCreateWithArgs(ctx, "b", createOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + + for i := 0; i < 100; i++ { + _, err := client.TSAdd(ctx, "a", i, float64(i%7)).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "b", i, float64(i%11)).Result() + Expect(err).NotTo(HaveOccurred()) + } + + result, err := client.TSMRange(ctx, 0, 200, []string{"Test=This"}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(2)) + Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(100)) + // Test Count + mrangeOpt := &redis.TSMRangeOptions{Count: 10} + result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(10)) + // Test Aggregation and BucketDuration + for i := 0; i < 100; i++ { + _, err := client.TSAdd(ctx, "a", i+200, float64(i%7)).Result() + Expect(err).NotTo(HaveOccurred()) + } + mrangeOpt = &redis.TSMRangeOptions{Aggregator: redis.Avg, BucketDuration: 10} + result, err = client.TSMRangeWithArgs(ctx, 0, 500, []string{"Test=This"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(2)) + Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(20)) + // Test WithLabels + Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{})) + mrangeOpt = &redis.TSMRangeOptions{WithLabels: true} + result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "team": "ny"})) + // Test SelectedLabels + mrangeOpt = &redis.TSMRangeOptions{SelectedLabels: []interface{}{"team"}} + result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "ny"})) + Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "sf"})) + // Test FilterBy + fts := make([]int, 0) + for i := 10; i < 20; i++ { + fts = append(fts, i) + } + mrangeOpt = &redis.TSMRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}} + result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(15), 1.0}, []interface{}{int64(16), 2.0}})) + // Test GroupBy + mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "Test", Reducer: "sum"} + result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 2.0}, []interface{}{int64(2), 4.0}, []interface{}{int64(3), 6.0}})) + + mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "Test", Reducer: "max"} + result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}})) + + mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "team", Reducer: "min"} + result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(2)) + Expect(result["team=ny"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}})) + Expect(result["team=sf"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}})) + // Test Align + mrangeOpt = &redis.TSMRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "-"} + result, err = client.TSMRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 10.0}, []interface{}{int64(10), 1.0}})) + + mrangeOpt = &redis.TSMRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: 5} + result, err = client.TSMRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 5.0}, []interface{}{int64(5), 6.0}})) + + }) + + It("should TSMRangeWithArgs Latest", Label("timeseries", "tsmrangeWithArgs", "tsmrangelatest"), func() { + resultCreate, err := client.TSCreate(ctx, "a").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + opt := &redis.TSOptions{Labels: map[string]string{"is_compaction": "true"}} + resultCreate, err = client.TSCreateWithArgs(ctx, "b", opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + + resultCreate, err = client.TSCreate(ctx, "c").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + opt = &redis.TSOptions{Labels: map[string]string{"is_compaction": "true"}} + resultCreate, err = client.TSCreateWithArgs(ctx, "d", opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + + resultCreateRule, err := client.TSCreateRule(ctx, "a", "b", redis.Sum, 10).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreateRule).To(BeEquivalentTo("OK")) + resultCreateRule, err = client.TSCreateRule(ctx, "c", "d", redis.Sum, 10).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreateRule).To(BeEquivalentTo("OK")) + + _, err = client.TSAdd(ctx, "a", 1, 1).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "a", 2, 3).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "a", 11, 7).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "a", 13, 1).Result() + Expect(err).NotTo(HaveOccurred()) + + _, err = client.TSAdd(ctx, "c", 1, 1).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "c", 2, 3).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "c", 11, 7).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "c", 13, 1).Result() + Expect(err).NotTo(HaveOccurred()) + mrangeOpt := &redis.TSMRangeOptions{Latest: true} + result, err := client.TSMRangeWithArgs(ctx, 0, 10, []string{"is_compaction=true"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["b"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 4.0}, []interface{}{int64(10), 8.0}})) + Expect(result["d"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 4.0}, []interface{}{int64(10), 8.0}})) + + }) + It("should TSMRevRange and TSMRevRangeWithArgs", Label("timeseries", "tsmrevrange", "tsmrevrangeWithArgs"), func() { + createOpt := &redis.TSOptions{Labels: map[string]string{"Test": "This", "team": "ny"}} + resultCreate, err := client.TSCreateWithArgs(ctx, "a", createOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + createOpt = &redis.TSOptions{Labels: map[string]string{"Test": "This", "Taste": "That", "team": "sf"}} + resultCreate, err = client.TSCreateWithArgs(ctx, "b", createOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + + for i := 0; i < 100; i++ { + _, err := client.TSAdd(ctx, "a", i, float64(i%7)).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "b", i, float64(i%11)).Result() + Expect(err).NotTo(HaveOccurred()) + } + result, err := client.TSMRevRange(ctx, 0, 200, []string{"Test=This"}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(2)) + Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(100)) + // Test Count + mrangeOpt := &redis.TSMRevRangeOptions{Count: 10} + result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(10)) + // Test Aggregation and BucketDuration + for i := 0; i < 100; i++ { + _, err := client.TSAdd(ctx, "a", i+200, float64(i%7)).Result() + Expect(err).NotTo(HaveOccurred()) + } + mrangeOpt = &redis.TSMRevRangeOptions{Aggregator: redis.Avg, BucketDuration: 10} + result, err = client.TSMRevRangeWithArgs(ctx, 0, 500, []string{"Test=This"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(2)) + Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(20)) + Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{})) + // Test WithLabels + Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{})) + mrangeOpt = &redis.TSMRevRangeOptions{WithLabels: true} + result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "team": "ny"})) + // Test SelectedLabels + mrangeOpt = &redis.TSMRevRangeOptions{SelectedLabels: []interface{}{"team"}} + result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "ny"})) + Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "sf"})) + // Test FilterBy + fts := make([]int, 0) + for i := 10; i < 20; i++ { + fts = append(fts, i) + } + mrangeOpt = &redis.TSMRevRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}} + result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(16), 2.0}, []interface{}{int64(15), 1.0}})) + // Test GroupBy + mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "Test", Reducer: "sum"} + result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 6.0}, []interface{}{int64(2), 4.0}, []interface{}{int64(1), 2.0}, []interface{}{int64(0), 0.0}})) + + mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "Test", Reducer: "max"} + result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}})) + + mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "team", Reducer: "min"} + result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(2)) + Expect(result["team=ny"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}})) + Expect(result["team=sf"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}})) + // Test Align + mrangeOpt = &redis.TSMRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "-"} + result, err = client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 1.0}, []interface{}{int64(0), 10.0}})) + + mrangeOpt = &redis.TSMRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: 1} + result, err = client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(1), 10.0}, []interface{}{int64(0), 1.0}})) + + }) + + It("should TSMRevRangeWithArgs Latest", Label("timeseries", "tsmrevrangeWithArgs", "tsmrevrangelatest"), func() { + resultCreate, err := client.TSCreate(ctx, "a").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + opt := &redis.TSOptions{Labels: map[string]string{"is_compaction": "true"}} + resultCreate, err = client.TSCreateWithArgs(ctx, "b", opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + + resultCreate, err = client.TSCreate(ctx, "c").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + opt = &redis.TSOptions{Labels: map[string]string{"is_compaction": "true"}} + resultCreate, err = client.TSCreateWithArgs(ctx, "d", opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreate).To(BeEquivalentTo("OK")) + + resultCreateRule, err := client.TSCreateRule(ctx, "a", "b", redis.Sum, 10).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreateRule).To(BeEquivalentTo("OK")) + resultCreateRule, err = client.TSCreateRule(ctx, "c", "d", redis.Sum, 10).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultCreateRule).To(BeEquivalentTo("OK")) + + _, err = client.TSAdd(ctx, "a", 1, 1).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "a", 2, 3).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "a", 11, 7).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "a", 13, 1).Result() + Expect(err).NotTo(HaveOccurred()) + + _, err = client.TSAdd(ctx, "c", 1, 1).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "c", 2, 3).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "c", 11, 7).Result() + Expect(err).NotTo(HaveOccurred()) + _, err = client.TSAdd(ctx, "c", 13, 1).Result() + Expect(err).NotTo(HaveOccurred()) + mrangeOpt := &redis.TSMRevRangeOptions{Latest: true} + result, err := client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"is_compaction=true"}, mrangeOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result["b"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 8.0}, []interface{}{int64(0), 4.0}})) + Expect(result["d"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 8.0}, []interface{}{int64(0), 4.0}})) + + }) + +})