mirror of https://github.com/go-redis/redis.git
Add Redis Timeseries support (#2688)
* Add Redis Timeseries support * Small fixes * Make timeseries interface public * remove bloom renaming
This commit is contained in:
parent
5bbd80d943
commit
017466b6cc
|
@ -53,4 +53,5 @@ URI
|
|||
url
|
||||
variadic
|
||||
RedisStack
|
||||
RedisGears
|
||||
RedisGears
|
||||
RedisTimeseries
|
59
command.go
59
command.go
|
@ -1351,6 +1351,65 @@ func (cmd *MapStringIntCmd) readReply(rd *proto.Reader) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------
|
||||
type MapStringSliceInterfaceCmd struct {
|
||||
baseCmd
|
||||
val map[string][]interface{}
|
||||
}
|
||||
|
||||
func NewMapStringSliceInterfaceCmd(ctx context.Context, args ...interface{}) *MapStringSliceInterfaceCmd {
|
||||
return &MapStringSliceInterfaceCmd{
|
||||
baseCmd: baseCmd{
|
||||
ctx: ctx,
|
||||
args: args,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (cmd *MapStringSliceInterfaceCmd) String() string {
|
||||
return cmdString(cmd, cmd.val)
|
||||
}
|
||||
|
||||
func (cmd *MapStringSliceInterfaceCmd) SetVal(val map[string][]interface{}) {
|
||||
cmd.val = val
|
||||
}
|
||||
|
||||
func (cmd *MapStringSliceInterfaceCmd) Result() (map[string][]interface{}, error) {
|
||||
return cmd.val, cmd.err
|
||||
}
|
||||
|
||||
func (cmd *MapStringSliceInterfaceCmd) Val() map[string][]interface{} {
|
||||
return cmd.val
|
||||
}
|
||||
|
||||
func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) {
|
||||
n, err := rd.ReadMapLen()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmd.val = make(map[string][]interface{}, n)
|
||||
for i := 0; i < n; i++ {
|
||||
k, err := rd.ReadString()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
nn, err := rd.ReadArrayLen()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmd.val[k] = make([]interface{}, nn)
|
||||
for j := 0; j < nn; j++ {
|
||||
value, err := rd.ReadReply()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmd.val[k][j] = value
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
type StringStructMapCmd struct {
|
||||
|
|
|
@ -507,6 +507,7 @@ type Cmdable interface {
|
|||
|
||||
gearsCmdable
|
||||
probabilisticCmdable
|
||||
TimeseriesCmdable
|
||||
}
|
||||
|
||||
type StatefulCmdable interface {
|
||||
|
|
|
@ -0,0 +1,929 @@
|
|||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
|
||||
"github.com/redis/go-redis/v9/internal/proto"
|
||||
)
|
||||
|
||||
type TimeseriesCmdable interface {
|
||||
TSAdd(ctx context.Context, key string, timestamp interface{}, value float64) *IntCmd
|
||||
TSAddWithArgs(ctx context.Context, key string, timestamp interface{}, value float64, options *TSOptions) *IntCmd
|
||||
TSCreate(ctx context.Context, key string) *StatusCmd
|
||||
TSCreateWithArgs(ctx context.Context, key string, options *TSOptions) *StatusCmd
|
||||
TSAlter(ctx context.Context, key string, options *TSAlterOptions) *StatusCmd
|
||||
TSCreateRule(ctx context.Context, sourceKey string, destKey string, aggregator Aggregator, bucketDuration int) *StatusCmd
|
||||
TSCreateRuleWithArgs(ctx context.Context, sourceKey string, destKey string, aggregator Aggregator, bucketDuration int, options *TSCreateRuleOptions) *StatusCmd
|
||||
TSIncrBy(ctx context.Context, Key string, timestamp float64) *IntCmd
|
||||
TSIncrByWithArgs(ctx context.Context, key string, timestamp float64, options *TSIncrDecrOptions) *IntCmd
|
||||
TSDecrBy(ctx context.Context, Key string, timestamp float64) *IntCmd
|
||||
TSDecrByWithArgs(ctx context.Context, key string, timestamp float64, options *TSIncrDecrOptions) *IntCmd
|
||||
TSDel(ctx context.Context, Key string, fromTimestamp int, toTimestamp int) *IntCmd
|
||||
TSDeleteRule(ctx context.Context, sourceKey string, destKey string) *StatusCmd
|
||||
TSGet(ctx context.Context, key string) *TSTimestampValueCmd
|
||||
TSGetWithArgs(ctx context.Context, key string, options *TSGetOptions) *TSTimestampValueCmd
|
||||
TSInfo(ctx context.Context, key string) *MapStringInterfaceCmd
|
||||
TSInfoWithArgs(ctx context.Context, key string, options *TSInfoOptions) *MapStringInterfaceCmd
|
||||
TSMAdd(ctx context.Context, ktvSlices [][]interface{}) *IntSliceCmd
|
||||
TSQueryIndex(ctx context.Context, filterExpr []string) *StringSliceCmd
|
||||
TSRevRange(ctx context.Context, key string, fromTimestamp int, toTimestamp int) *TSTimestampValueSliceCmd
|
||||
TSRevRangeWithArgs(ctx context.Context, key string, fromTimestamp int, toTimestamp int, options *TSRevRangeOptions) *TSTimestampValueSliceCmd
|
||||
TSRange(ctx context.Context, key string, fromTimestamp int, toTimestamp int) *TSTimestampValueSliceCmd
|
||||
TSRangeWithArgs(ctx context.Context, key string, fromTimestamp int, toTimestamp int, options *TSRangeOptions) *TSTimestampValueSliceCmd
|
||||
TSMRange(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string) *MapStringSliceInterfaceCmd
|
||||
TSMRangeWithArgs(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string, options *TSMRangeOptions) *MapStringSliceInterfaceCmd
|
||||
TSMRevRange(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string) *MapStringSliceInterfaceCmd
|
||||
TSMRevRangeWithArgs(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string, options *TSMRevRangeOptions) *MapStringSliceInterfaceCmd
|
||||
TSMGet(ctx context.Context, filters []string) *MapStringSliceInterfaceCmd
|
||||
TSMGetWithArgs(ctx context.Context, filters []string, options *TSMGetOptions) *MapStringSliceInterfaceCmd
|
||||
}
|
||||
|
||||
type TSOptions struct {
|
||||
Retention int
|
||||
ChunkSize int
|
||||
Encoding string
|
||||
DuplicatePolicy string
|
||||
Labels map[string]string
|
||||
}
|
||||
type TSIncrDecrOptions struct {
|
||||
Timestamp int64
|
||||
Retention int
|
||||
ChunkSize int
|
||||
Uncompressed bool
|
||||
Labels map[string]string
|
||||
}
|
||||
|
||||
type TSAlterOptions struct {
|
||||
Retention int
|
||||
ChunkSize int
|
||||
DuplicatePolicy string
|
||||
Labels map[string]string
|
||||
}
|
||||
|
||||
type TSCreateRuleOptions struct {
|
||||
alignTimestamp int64
|
||||
}
|
||||
|
||||
type TSGetOptions struct {
|
||||
Latest bool
|
||||
}
|
||||
|
||||
type TSInfoOptions struct {
|
||||
Debug bool
|
||||
}
|
||||
type Aggregator int
|
||||
|
||||
const (
|
||||
Invalid = Aggregator(iota)
|
||||
Avg
|
||||
Sum
|
||||
Min
|
||||
Max
|
||||
Range
|
||||
Count
|
||||
First
|
||||
Last
|
||||
StdP
|
||||
StdS
|
||||
VarP
|
||||
VarS
|
||||
Twa
|
||||
)
|
||||
|
||||
func (a Aggregator) String() string {
|
||||
switch a {
|
||||
case Invalid:
|
||||
return ""
|
||||
case Avg:
|
||||
return "AVG"
|
||||
case Sum:
|
||||
return "SUM"
|
||||
case Min:
|
||||
return "MIN"
|
||||
case Max:
|
||||
return "MAX"
|
||||
case Range:
|
||||
return "RANGE"
|
||||
case Count:
|
||||
return "COUNT"
|
||||
case First:
|
||||
return "FIRST"
|
||||
case Last:
|
||||
return "LAST"
|
||||
case StdP:
|
||||
return "STD.P"
|
||||
case StdS:
|
||||
return "STD.S"
|
||||
case VarP:
|
||||
return "VAR.P"
|
||||
case VarS:
|
||||
return "VAR.S"
|
||||
case Twa:
|
||||
return "TWA"
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
type TSRangeOptions struct {
|
||||
Latest bool
|
||||
FilterByTS []int
|
||||
FilterByValue []int
|
||||
Count int
|
||||
Align interface{}
|
||||
Aggregator Aggregator
|
||||
BucketDuration int
|
||||
BucketTimestamp interface{}
|
||||
Empty bool
|
||||
}
|
||||
|
||||
type TSRevRangeOptions struct {
|
||||
Latest bool
|
||||
FilterByTS []int
|
||||
FilterByValue []int
|
||||
Count int
|
||||
Align interface{}
|
||||
Aggregator Aggregator
|
||||
BucketDuration int
|
||||
BucketTimestamp interface{}
|
||||
Empty bool
|
||||
}
|
||||
|
||||
type TSMRangeOptions struct {
|
||||
Latest bool
|
||||
FilterByTS []int
|
||||
FilterByValue []int
|
||||
WithLabels bool
|
||||
SelectedLabels []interface{}
|
||||
Count int
|
||||
Align interface{}
|
||||
Aggregator Aggregator
|
||||
BucketDuration int
|
||||
BucketTimestamp interface{}
|
||||
Empty bool
|
||||
GroupByLabel interface{}
|
||||
Reducer interface{}
|
||||
}
|
||||
|
||||
type TSMRevRangeOptions struct {
|
||||
Latest bool
|
||||
FilterByTS []int
|
||||
FilterByValue []int
|
||||
WithLabels bool
|
||||
SelectedLabels []interface{}
|
||||
Count int
|
||||
Align interface{}
|
||||
Aggregator Aggregator
|
||||
BucketDuration int
|
||||
BucketTimestamp interface{}
|
||||
Empty bool
|
||||
GroupByLabel interface{}
|
||||
Reducer interface{}
|
||||
}
|
||||
|
||||
type TSMGetOptions struct {
|
||||
Latest bool
|
||||
WithLabels bool
|
||||
SelectedLabels []interface{}
|
||||
}
|
||||
|
||||
// TSAdd - Adds one or more observations to a t-digest sketch.
|
||||
// For more information - https://redis.io/commands/ts.add/
|
||||
func (c cmdable) TSAdd(ctx context.Context, key string, timestamp interface{}, value float64) *IntCmd {
|
||||
args := []interface{}{"TS.ADD", key, timestamp, value}
|
||||
cmd := NewIntCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSAddWithArgs - Adds one or more observations to a t-digest sketch.
|
||||
// This function also allows for specifying additional options such as:
|
||||
// Retention, ChunkSize, Encoding, DuplicatePolicy and Labels.
|
||||
// For more information - https://redis.io/commands/ts.add/
|
||||
func (c cmdable) TSAddWithArgs(ctx context.Context, key string, timestamp interface{}, value float64, options *TSOptions) *IntCmd {
|
||||
args := []interface{}{"TS.ADD", key, timestamp, value}
|
||||
if options != nil {
|
||||
if options.Retention != 0 {
|
||||
args = append(args, "RETENTION", options.Retention)
|
||||
}
|
||||
if options.ChunkSize != 0 {
|
||||
args = append(args, "CHUNK_SIZE", options.ChunkSize)
|
||||
|
||||
}
|
||||
if options.Encoding != "" {
|
||||
args = append(args, "ENCODING", options.Encoding)
|
||||
}
|
||||
|
||||
if options.DuplicatePolicy != "" {
|
||||
args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy)
|
||||
}
|
||||
if options.Labels != nil {
|
||||
args = append(args, "LABELS")
|
||||
for label, value := range options.Labels {
|
||||
args = append(args, label, value)
|
||||
}
|
||||
}
|
||||
}
|
||||
cmd := NewIntCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSCreate - Creates a new time-series key.
|
||||
// For more information - https://redis.io/commands/ts.create/
|
||||
func (c cmdable) TSCreate(ctx context.Context, key string) *StatusCmd {
|
||||
args := []interface{}{"TS.CREATE", key}
|
||||
cmd := NewStatusCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSCreateWithArgs - Creates a new time-series key with additional options.
|
||||
// This function allows for specifying additional options such as:
|
||||
// Retention, ChunkSize, Encoding, DuplicatePolicy and Labels.
|
||||
// For more information - https://redis.io/commands/ts.create/
|
||||
func (c cmdable) TSCreateWithArgs(ctx context.Context, key string, options *TSOptions) *StatusCmd {
|
||||
args := []interface{}{"TS.CREATE", key}
|
||||
if options != nil {
|
||||
if options.Retention != 0 {
|
||||
args = append(args, "RETENTION", options.Retention)
|
||||
}
|
||||
if options.ChunkSize != 0 {
|
||||
args = append(args, "CHUNK_SIZE", options.ChunkSize)
|
||||
|
||||
}
|
||||
if options.Encoding != "" {
|
||||
args = append(args, "ENCODING", options.Encoding)
|
||||
}
|
||||
|
||||
if options.DuplicatePolicy != "" {
|
||||
args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy)
|
||||
}
|
||||
if options.Labels != nil {
|
||||
args = append(args, "LABELS")
|
||||
for label, value := range options.Labels {
|
||||
args = append(args, label, value)
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
cmd := NewStatusCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSAlter - Alters an existing time-series key with additional options.
|
||||
// This function allows for specifying additional options such as:
|
||||
// Retention, ChunkSize and DuplicatePolicy.
|
||||
// For more information - https://redis.io/commands/ts.alter/
|
||||
func (c cmdable) TSAlter(ctx context.Context, key string, options *TSAlterOptions) *StatusCmd {
|
||||
args := []interface{}{"TS.ALTER", key}
|
||||
if options != nil {
|
||||
if options.Retention != 0 {
|
||||
args = append(args, "RETENTION", options.Retention)
|
||||
}
|
||||
if options.ChunkSize != 0 {
|
||||
args = append(args, "CHUNK_SIZE", options.ChunkSize)
|
||||
|
||||
}
|
||||
if options.DuplicatePolicy != "" {
|
||||
args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy)
|
||||
}
|
||||
if options.Labels != nil {
|
||||
args = append(args, "LABELS")
|
||||
for label, value := range options.Labels {
|
||||
args = append(args, label, value)
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
cmd := NewStatusCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSCreateRule - Creates a compaction rule from sourceKey to destKey.
|
||||
// For more information - https://redis.io/commands/ts.createrule/
|
||||
func (c cmdable) TSCreateRule(ctx context.Context, sourceKey string, destKey string, aggregator Aggregator, bucketDuration int) *StatusCmd {
|
||||
args := []interface{}{"TS.CREATERULE", sourceKey, destKey, "AGGREGATION", aggregator.String(), bucketDuration}
|
||||
cmd := NewStatusCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSCreateRuleWithArgs - Creates a compaction rule from sourceKey to destKey with additional option.
|
||||
// This function allows for specifying additional option such as:
|
||||
// alignTimestamp.
|
||||
// For more information - https://redis.io/commands/ts.createrule/
|
||||
func (c cmdable) TSCreateRuleWithArgs(ctx context.Context, sourceKey string, destKey string, aggregator Aggregator, bucketDuration int, options *TSCreateRuleOptions) *StatusCmd {
|
||||
args := []interface{}{"TS.CREATERULE", sourceKey, destKey, "AGGREGATION", aggregator.String(), bucketDuration}
|
||||
if options != nil {
|
||||
if options.alignTimestamp != 0 {
|
||||
args = append(args, options.alignTimestamp)
|
||||
}
|
||||
}
|
||||
cmd := NewStatusCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSIncrBy - Increments the value of a time-series key by the specified timestamp.
|
||||
// For more information - https://redis.io/commands/ts.incrby/
|
||||
func (c cmdable) TSIncrBy(ctx context.Context, Key string, timestamp float64) *IntCmd {
|
||||
args := []interface{}{"TS.INCRBY", Key, timestamp}
|
||||
cmd := NewIntCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSIncrByWithArgs - Increments the value of a time-series key by the specified timestamp with additional options.
|
||||
// This function allows for specifying additional options such as:
|
||||
// Timestamp, Retention, ChunkSize, Uncompressed and Labels.
|
||||
// For more information - https://redis.io/commands/ts.incrby/
|
||||
func (c cmdable) TSIncrByWithArgs(ctx context.Context, key string, timestamp float64, options *TSIncrDecrOptions) *IntCmd {
|
||||
args := []interface{}{"TS.INCRBY", key, timestamp}
|
||||
if options != nil {
|
||||
if options.Timestamp != 0 {
|
||||
args = append(args, "TIMESTAMP", options.Timestamp)
|
||||
}
|
||||
if options.Retention != 0 {
|
||||
args = append(args, "RETENTION", options.Retention)
|
||||
}
|
||||
if options.ChunkSize != 0 {
|
||||
args = append(args, "CHUNK_SIZE", options.ChunkSize)
|
||||
|
||||
}
|
||||
if options.Uncompressed {
|
||||
args = append(args, "UNCOMPRESSED")
|
||||
}
|
||||
if options.Labels != nil {
|
||||
args = append(args, "LABELS")
|
||||
for label, value := range options.Labels {
|
||||
args = append(args, label, value)
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
cmd := NewIntCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSDecrBy - Decrements the value of a time-series key by the specified timestamp.
|
||||
// For more information - https://redis.io/commands/ts.decrby/
|
||||
func (c cmdable) TSDecrBy(ctx context.Context, Key string, timestamp float64) *IntCmd {
|
||||
args := []interface{}{"TS.DECRBY", Key, timestamp}
|
||||
cmd := NewIntCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSDecrByWithArgs - Decrements the value of a time-series key by the specified timestamp with additional options.
|
||||
// This function allows for specifying additional options such as:
|
||||
// Timestamp, Retention, ChunkSize, Uncompressed and Labels.
|
||||
// For more information - https://redis.io/commands/ts.decrby/
|
||||
func (c cmdable) TSDecrByWithArgs(ctx context.Context, key string, timestamp float64, options *TSIncrDecrOptions) *IntCmd {
|
||||
args := []interface{}{"TS.DECRBY", key, timestamp}
|
||||
if options != nil {
|
||||
if options.Timestamp != 0 {
|
||||
args = append(args, "TIMESTAMP", options.Timestamp)
|
||||
}
|
||||
if options.Retention != 0 {
|
||||
args = append(args, "RETENTION", options.Retention)
|
||||
}
|
||||
if options.ChunkSize != 0 {
|
||||
args = append(args, "CHUNK_SIZE", options.ChunkSize)
|
||||
|
||||
}
|
||||
if options.Uncompressed {
|
||||
args = append(args, "UNCOMPRESSED")
|
||||
}
|
||||
if options.Labels != nil {
|
||||
args = append(args, "LABELS")
|
||||
for label, value := range options.Labels {
|
||||
args = append(args, label, value)
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
cmd := NewIntCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSDel - Deletes a range of samples from a time-series key.
|
||||
// For more information - https://redis.io/commands/ts.del/
|
||||
func (c cmdable) TSDel(ctx context.Context, Key string, fromTimestamp int, toTimestamp int) *IntCmd {
|
||||
args := []interface{}{"TS.DEL", Key, fromTimestamp, toTimestamp}
|
||||
cmd := NewIntCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSDeleteRule - Deletes a compaction rule from sourceKey to destKey.
|
||||
// For more information - https://redis.io/commands/ts.deleterule/
|
||||
func (c cmdable) TSDeleteRule(ctx context.Context, sourceKey string, destKey string) *StatusCmd {
|
||||
args := []interface{}{"TS.DELETERULE", sourceKey, destKey}
|
||||
cmd := NewStatusCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSGetWithArgs - Gets the last sample of a time-series key with additional option.
|
||||
// This function allows for specifying additional option such as:
|
||||
// Latest.
|
||||
// For more information - https://redis.io/commands/ts.get/
|
||||
func (c cmdable) TSGetWithArgs(ctx context.Context, key string, options *TSGetOptions) *TSTimestampValueCmd {
|
||||
args := []interface{}{"TS.GET", key}
|
||||
if options != nil {
|
||||
if options.Latest {
|
||||
args = append(args, "LATEST")
|
||||
}
|
||||
}
|
||||
cmd := newTSTimestampValueCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSGet - Gets the last sample of a time-series key.
|
||||
// For more information - https://redis.io/commands/ts.get/
|
||||
func (c cmdable) TSGet(ctx context.Context, key string) *TSTimestampValueCmd {
|
||||
args := []interface{}{"TS.GET", key}
|
||||
cmd := newTSTimestampValueCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
type TSTimestampValue struct {
|
||||
Timestamp int64
|
||||
Value float64
|
||||
}
|
||||
type TSTimestampValueCmd struct {
|
||||
baseCmd
|
||||
val TSTimestampValue
|
||||
}
|
||||
|
||||
func newTSTimestampValueCmd(ctx context.Context, args ...interface{}) *TSTimestampValueCmd {
|
||||
return &TSTimestampValueCmd{
|
||||
baseCmd: baseCmd{
|
||||
ctx: ctx,
|
||||
args: args,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (cmd *TSTimestampValueCmd) String() string {
|
||||
return cmdString(cmd, cmd.val)
|
||||
}
|
||||
|
||||
func (cmd *TSTimestampValueCmd) SetVal(val TSTimestampValue) {
|
||||
cmd.val = val
|
||||
}
|
||||
|
||||
func (cmd *TSTimestampValueCmd) Result() (TSTimestampValue, error) {
|
||||
return cmd.val, cmd.err
|
||||
}
|
||||
|
||||
func (cmd *TSTimestampValueCmd) Val() TSTimestampValue {
|
||||
return cmd.val
|
||||
}
|
||||
|
||||
func (cmd *TSTimestampValueCmd) readReply(rd *proto.Reader) (err error) {
|
||||
n, err := rd.ReadMapLen()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmd.val = TSTimestampValue{}
|
||||
for i := 0; i < n; i++ {
|
||||
timestamp, err := rd.ReadInt()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
value, err := rd.ReadString()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmd.val.Timestamp = timestamp
|
||||
cmd.val.Value, err = strconv.ParseFloat(value, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// TSInfo - Returns information about a time-series key.
|
||||
// For more information - https://redis.io/commands/ts.info/
|
||||
func (c cmdable) TSInfo(ctx context.Context, key string) *MapStringInterfaceCmd {
|
||||
args := []interface{}{"TS.INFO", key}
|
||||
cmd := NewMapStringInterfaceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSInfoWithArgs - Returns information about a time-series key with additional option.
|
||||
// This function allows for specifying additional option such as:
|
||||
// Debug.
|
||||
// For more information - https://redis.io/commands/ts.info/
|
||||
func (c cmdable) TSInfoWithArgs(ctx context.Context, key string, options *TSInfoOptions) *MapStringInterfaceCmd {
|
||||
args := []interface{}{"TS.INFO", key}
|
||||
if options != nil {
|
||||
if options.Debug {
|
||||
args = append(args, "DEBUG")
|
||||
}
|
||||
}
|
||||
cmd := NewMapStringInterfaceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSMAdd - Adds multiple samples to multiple time-series keys.
|
||||
// For more information - https://redis.io/commands/ts.madd/
|
||||
func (c cmdable) TSMAdd(ctx context.Context, ktvSlices [][]interface{}) *IntSliceCmd {
|
||||
args := []interface{}{"TS.MADD"}
|
||||
for _, ktv := range ktvSlices {
|
||||
args = append(args, ktv...)
|
||||
}
|
||||
cmd := NewIntSliceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSQueryIndex - Returns all the keys matching the filter expression.
|
||||
// For more information - https://redis.io/commands/ts.queryindex/
|
||||
func (c cmdable) TSQueryIndex(ctx context.Context, filterExpr []string) *StringSliceCmd {
|
||||
args := []interface{}{"TS.QUERYINDEX"}
|
||||
for _, f := range filterExpr {
|
||||
args = append(args, f)
|
||||
}
|
||||
cmd := NewStringSliceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSRevRange - Returns a range of samples from a time-series key in reverse order.
|
||||
// For more information - https://redis.io/commands/ts.revrange/
|
||||
func (c cmdable) TSRevRange(ctx context.Context, key string, fromTimestamp int, toTimestamp int) *TSTimestampValueSliceCmd {
|
||||
args := []interface{}{"TS.REVRANGE", key, fromTimestamp, toTimestamp}
|
||||
cmd := newTSTimestampValueSliceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSRevRangeWithArgs - Returns a range of samples from a time-series key in reverse order with additional options.
|
||||
// This function allows for specifying additional options such as:
|
||||
// Latest, FilterByTS, FilterByValue, Count, Align, Aggregator,
|
||||
// BucketDuration, BucketTimestamp and Empty.
|
||||
// For more information - https://redis.io/commands/ts.revrange/
|
||||
func (c cmdable) TSRevRangeWithArgs(ctx context.Context, key string, fromTimestamp int, toTimestamp int, options *TSRevRangeOptions) *TSTimestampValueSliceCmd {
|
||||
args := []interface{}{"TS.REVRANGE", key, fromTimestamp, toTimestamp}
|
||||
if options != nil {
|
||||
if options.Latest {
|
||||
args = append(args, "LATEST")
|
||||
}
|
||||
if options.FilterByTS != nil {
|
||||
args = append(args, "FILTER_BY_TS")
|
||||
for _, f := range options.FilterByTS {
|
||||
args = append(args, f)
|
||||
}
|
||||
}
|
||||
if options.FilterByValue != nil {
|
||||
args = append(args, "FILTER_BY_VALUE")
|
||||
for _, f := range options.FilterByValue {
|
||||
args = append(args, f)
|
||||
}
|
||||
}
|
||||
if options.Count != 0 {
|
||||
args = append(args, "COUNT", options.Count)
|
||||
}
|
||||
if options.Align != nil {
|
||||
args = append(args, "ALIGN", options.Align)
|
||||
}
|
||||
if options.Aggregator != 0 {
|
||||
args = append(args, "AGGREGATION", options.Aggregator.String())
|
||||
}
|
||||
if options.BucketDuration != 0 {
|
||||
args = append(args, options.BucketDuration)
|
||||
}
|
||||
if options.BucketTimestamp != nil {
|
||||
args = append(args, "BUCKETTIMESTAMP", options.BucketTimestamp)
|
||||
}
|
||||
if options.Empty {
|
||||
args = append(args, "EMPTY")
|
||||
}
|
||||
}
|
||||
cmd := newTSTimestampValueSliceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSRange - Returns a range of samples from a time-series key.
|
||||
// For more information - https://redis.io/commands/ts.range/
|
||||
func (c cmdable) TSRange(ctx context.Context, key string, fromTimestamp int, toTimestamp int) *TSTimestampValueSliceCmd {
|
||||
args := []interface{}{"TS.RANGE", key, fromTimestamp, toTimestamp}
|
||||
cmd := newTSTimestampValueSliceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSRangeWithArgs - Returns a range of samples from a time-series key with additional options.
|
||||
// This function allows for specifying additional options such as:
|
||||
// Latest, FilterByTS, FilterByValue, Count, Align, Aggregator,
|
||||
// BucketDuration, BucketTimestamp and Empty.
|
||||
// For more information - https://redis.io/commands/ts.range/
|
||||
func (c cmdable) TSRangeWithArgs(ctx context.Context, key string, fromTimestamp int, toTimestamp int, options *TSRangeOptions) *TSTimestampValueSliceCmd {
|
||||
args := []interface{}{"TS.RANGE", key, fromTimestamp, toTimestamp}
|
||||
if options != nil {
|
||||
if options.Latest {
|
||||
args = append(args, "LATEST")
|
||||
}
|
||||
if options.FilterByTS != nil {
|
||||
args = append(args, "FILTER_BY_TS")
|
||||
for _, f := range options.FilterByTS {
|
||||
args = append(args, f)
|
||||
}
|
||||
}
|
||||
if options.FilterByValue != nil {
|
||||
args = append(args, "FILTER_BY_VALUE")
|
||||
for _, f := range options.FilterByValue {
|
||||
args = append(args, f)
|
||||
}
|
||||
}
|
||||
if options.Count != 0 {
|
||||
args = append(args, "COUNT", options.Count)
|
||||
}
|
||||
if options.Align != nil {
|
||||
args = append(args, "ALIGN", options.Align)
|
||||
}
|
||||
if options.Aggregator != 0 {
|
||||
args = append(args, "AGGREGATION", options.Aggregator.String())
|
||||
}
|
||||
if options.BucketDuration != 0 {
|
||||
args = append(args, options.BucketDuration)
|
||||
}
|
||||
if options.BucketTimestamp != nil {
|
||||
args = append(args, "BUCKETTIMESTAMP", options.BucketTimestamp)
|
||||
}
|
||||
if options.Empty {
|
||||
args = append(args, "EMPTY")
|
||||
}
|
||||
}
|
||||
cmd := newTSTimestampValueSliceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
type TSTimestampValueSliceCmd struct {
|
||||
baseCmd
|
||||
val []TSTimestampValue
|
||||
}
|
||||
|
||||
func newTSTimestampValueSliceCmd(ctx context.Context, args ...interface{}) *TSTimestampValueSliceCmd {
|
||||
return &TSTimestampValueSliceCmd{
|
||||
baseCmd: baseCmd{
|
||||
ctx: ctx,
|
||||
args: args,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (cmd *TSTimestampValueSliceCmd) String() string {
|
||||
return cmdString(cmd, cmd.val)
|
||||
}
|
||||
|
||||
func (cmd *TSTimestampValueSliceCmd) SetVal(val []TSTimestampValue) {
|
||||
cmd.val = val
|
||||
}
|
||||
|
||||
func (cmd *TSTimestampValueSliceCmd) Result() ([]TSTimestampValue, error) {
|
||||
return cmd.val, cmd.err
|
||||
}
|
||||
|
||||
func (cmd *TSTimestampValueSliceCmd) Val() []TSTimestampValue {
|
||||
return cmd.val
|
||||
}
|
||||
|
||||
func (cmd *TSTimestampValueSliceCmd) readReply(rd *proto.Reader) (err error) {
|
||||
n, err := rd.ReadArrayLen()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmd.val = make([]TSTimestampValue, n)
|
||||
for i := 0; i < n; i++ {
|
||||
_, _ = rd.ReadArrayLen()
|
||||
timestamp, err := rd.ReadInt()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
value, err := rd.ReadString()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmd.val[i].Timestamp = timestamp
|
||||
cmd.val[i].Value, err = strconv.ParseFloat(value, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// TSMRange - Returns a range of samples from multiple time-series keys.
|
||||
// For more information - https://redis.io/commands/ts.mrange/
|
||||
func (c cmdable) TSMRange(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string) *MapStringSliceInterfaceCmd {
|
||||
args := []interface{}{"TS.MRANGE", fromTimestamp, toTimestamp, "FILTER"}
|
||||
for _, f := range filterExpr {
|
||||
args = append(args, f)
|
||||
}
|
||||
cmd := NewMapStringSliceInterfaceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSMRangeWithArgs - Returns a range of samples from multiple time-series keys with additional options.
|
||||
// This function allows for specifying additional options such as:
|
||||
// Latest, FilterByTS, FilterByValue, WithLabels, SelectedLabels,
|
||||
// Count, Align, Aggregator, BucketDuration, BucketTimestamp,
|
||||
// Empty, GroupByLabel and Reducer.
|
||||
// For more information - https://redis.io/commands/ts.mrange/
|
||||
func (c cmdable) TSMRangeWithArgs(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string, options *TSMRangeOptions) *MapStringSliceInterfaceCmd {
|
||||
args := []interface{}{"TS.MRANGE", fromTimestamp, toTimestamp}
|
||||
if options != nil {
|
||||
if options.Latest {
|
||||
args = append(args, "LATEST")
|
||||
}
|
||||
if options.FilterByTS != nil {
|
||||
args = append(args, "FILTER_BY_TS")
|
||||
for _, f := range options.FilterByTS {
|
||||
args = append(args, f)
|
||||
}
|
||||
}
|
||||
if options.FilterByValue != nil {
|
||||
args = append(args, "FILTER_BY_VALUE")
|
||||
for _, f := range options.FilterByValue {
|
||||
args = append(args, f)
|
||||
}
|
||||
}
|
||||
if options.WithLabels {
|
||||
args = append(args, "WITHLABELS")
|
||||
}
|
||||
if options.SelectedLabels != nil {
|
||||
args = append(args, "SELECTED_LABELS")
|
||||
args = append(args, options.SelectedLabels...)
|
||||
}
|
||||
if options.Count != 0 {
|
||||
args = append(args, "COUNT", options.Count)
|
||||
}
|
||||
if options.Align != nil {
|
||||
args = append(args, "ALIGN", options.Align)
|
||||
}
|
||||
if options.Aggregator != 0 {
|
||||
args = append(args, "AGGREGATION", options.Aggregator.String())
|
||||
}
|
||||
if options.BucketDuration != 0 {
|
||||
args = append(args, options.BucketDuration)
|
||||
}
|
||||
if options.BucketTimestamp != nil {
|
||||
args = append(args, "BUCKETTIMESTAMP", options.BucketTimestamp)
|
||||
}
|
||||
if options.Empty {
|
||||
args = append(args, "EMPTY")
|
||||
}
|
||||
}
|
||||
args = append(args, "FILTER")
|
||||
for _, f := range filterExpr {
|
||||
args = append(args, f)
|
||||
}
|
||||
if options != nil {
|
||||
if options.GroupByLabel != nil {
|
||||
args = append(args, "GROUPBY", options.GroupByLabel)
|
||||
}
|
||||
if options.Reducer != nil {
|
||||
args = append(args, "REDUCE", options.Reducer)
|
||||
}
|
||||
}
|
||||
cmd := NewMapStringSliceInterfaceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSMRevRange - Returns a range of samples from multiple time-series keys in reverse order.
|
||||
// For more information - https://redis.io/commands/ts.mrevrange/
|
||||
func (c cmdable) TSMRevRange(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string) *MapStringSliceInterfaceCmd {
|
||||
args := []interface{}{"TS.MREVRANGE", fromTimestamp, toTimestamp, "FILTER"}
|
||||
for _, f := range filterExpr {
|
||||
args = append(args, f)
|
||||
}
|
||||
cmd := NewMapStringSliceInterfaceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSMRevRangeWithArgs - Returns a range of samples from multiple time-series keys in reverse order with additional options.
|
||||
// This function allows for specifying additional options such as:
|
||||
// Latest, FilterByTS, FilterByValue, WithLabels, SelectedLabels,
|
||||
// Count, Align, Aggregator, BucketDuration, BucketTimestamp,
|
||||
// Empty, GroupByLabel and Reducer.
|
||||
// For more information - https://redis.io/commands/ts.mrevrange/
|
||||
func (c cmdable) TSMRevRangeWithArgs(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string, options *TSMRevRangeOptions) *MapStringSliceInterfaceCmd {
|
||||
args := []interface{}{"TS.MREVRANGE", fromTimestamp, toTimestamp}
|
||||
if options != nil {
|
||||
if options.Latest {
|
||||
args = append(args, "LATEST")
|
||||
}
|
||||
if options.FilterByTS != nil {
|
||||
args = append(args, "FILTER_BY_TS")
|
||||
for _, f := range options.FilterByTS {
|
||||
args = append(args, f)
|
||||
}
|
||||
}
|
||||
if options.FilterByValue != nil {
|
||||
args = append(args, "FILTER_BY_VALUE")
|
||||
for _, f := range options.FilterByValue {
|
||||
args = append(args, f)
|
||||
}
|
||||
}
|
||||
if options.WithLabels {
|
||||
args = append(args, "WITHLABELS")
|
||||
}
|
||||
if options.SelectedLabels != nil {
|
||||
args = append(args, "SELECTED_LABELS")
|
||||
args = append(args, options.SelectedLabels...)
|
||||
}
|
||||
if options.Count != 0 {
|
||||
args = append(args, "COUNT", options.Count)
|
||||
}
|
||||
if options.Align != nil {
|
||||
args = append(args, "ALIGN", options.Align)
|
||||
}
|
||||
if options.Aggregator != 0 {
|
||||
args = append(args, "AGGREGATION", options.Aggregator.String())
|
||||
}
|
||||
if options.BucketDuration != 0 {
|
||||
args = append(args, options.BucketDuration)
|
||||
}
|
||||
if options.BucketTimestamp != nil {
|
||||
args = append(args, "BUCKETTIMESTAMP", options.BucketTimestamp)
|
||||
}
|
||||
if options.Empty {
|
||||
args = append(args, "EMPTY")
|
||||
}
|
||||
}
|
||||
args = append(args, "FILTER")
|
||||
for _, f := range filterExpr {
|
||||
args = append(args, f)
|
||||
}
|
||||
if options != nil {
|
||||
if options.GroupByLabel != nil {
|
||||
args = append(args, "GROUPBY", options.GroupByLabel)
|
||||
}
|
||||
if options.Reducer != nil {
|
||||
args = append(args, "REDUCE", options.Reducer)
|
||||
}
|
||||
}
|
||||
cmd := NewMapStringSliceInterfaceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSMGet - Returns the last sample of multiple time-series keys.
|
||||
// For more information - https://redis.io/commands/ts.mget/
|
||||
func (c cmdable) TSMGet(ctx context.Context, filters []string) *MapStringSliceInterfaceCmd {
|
||||
args := []interface{}{"TS.MGET", "FILTER"}
|
||||
for _, f := range filters {
|
||||
args = append(args, f)
|
||||
}
|
||||
cmd := NewMapStringSliceInterfaceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// TSMGetWithArgs - Returns the last sample of multiple time-series keys with additional options.
|
||||
// This function allows for specifying additional options such as:
|
||||
// Latest, WithLabels and SelectedLabels.
|
||||
// For more information - https://redis.io/commands/ts.mget/
|
||||
func (c cmdable) TSMGetWithArgs(ctx context.Context, filters []string, options *TSMGetOptions) *MapStringSliceInterfaceCmd {
|
||||
args := []interface{}{"TS.MGET"}
|
||||
if options != nil {
|
||||
if options.Latest {
|
||||
args = append(args, "LATEST")
|
||||
}
|
||||
if options.WithLabels {
|
||||
args = append(args, "WITHLABELS")
|
||||
}
|
||||
if options.SelectedLabels != nil {
|
||||
args = append(args, "SELECTED_LABELS")
|
||||
args = append(args, options.SelectedLabels...)
|
||||
}
|
||||
}
|
||||
args = append(args, "FILTER")
|
||||
for _, f := range filters {
|
||||
args = append(args, f)
|
||||
}
|
||||
cmd := NewMapStringSliceInterfaceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
|
@ -0,0 +1,951 @@
|
|||
package redis_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
. "github.com/bsm/ginkgo/v2"
|
||||
. "github.com/bsm/gomega"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
||||
ctx := context.TODO()
|
||||
var client *redis.Client
|
||||
|
||||
BeforeEach(func() {
|
||||
client = redis.NewClient(&redis.Options{Addr: ":6379"})
|
||||
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
Expect(client.Close()).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs"), func() {
|
||||
result, err := client.TSCreate(ctx, "1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo("OK"))
|
||||
// Test TSCreateWithArgs
|
||||
opt := &redis.TSOptions{Retention: 5}
|
||||
result, err = client.TSCreateWithArgs(ctx, "2", opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo("OK"))
|
||||
opt = &redis.TSOptions{Labels: map[string]string{"Redis": "Labs"}}
|
||||
result, err = client.TSCreateWithArgs(ctx, "3", opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo("OK"))
|
||||
opt = &redis.TSOptions{Labels: map[string]string{"Time": "Series"}, Retention: 20}
|
||||
result, err = client.TSCreateWithArgs(ctx, "4", opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo("OK"))
|
||||
resultInfo, err := client.TSInfo(ctx, "4").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series"))
|
||||
// Test chunk size
|
||||
opt = &redis.TSOptions{ChunkSize: 128}
|
||||
result, err = client.TSCreateWithArgs(ctx, "ts-cs-1", opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo("OK"))
|
||||
resultInfo, err = client.TSInfo(ctx, "ts-cs-1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultInfo["chunkSize"]).To(BeEquivalentTo(128))
|
||||
// Test duplicate policy
|
||||
duplicate_policies := []string{"BLOCK", "LAST", "FIRST", "MIN", "MAX"}
|
||||
for _, dup := range duplicate_policies {
|
||||
keyName := "ts-dup-" + dup
|
||||
opt = &redis.TSOptions{DuplicatePolicy: dup}
|
||||
result, err = client.TSCreateWithArgs(ctx, keyName, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo("OK"))
|
||||
resultInfo, err = client.TSInfo(ctx, keyName).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(strings.ToUpper(resultInfo["duplicatePolicy"].(string))).To(BeEquivalentTo(dup))
|
||||
|
||||
}
|
||||
})
|
||||
It("should TSAdd and TSAddWithArgs", Label("timeseries", "tsadd", "tsaddWithArgs"), func() {
|
||||
result, err := client.TSAdd(ctx, "1", 1, 1).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo(1))
|
||||
// Test TSAddWithArgs
|
||||
opt := &redis.TSOptions{Retention: 10}
|
||||
result, err = client.TSAddWithArgs(ctx, "2", 2, 3, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo(2))
|
||||
opt = &redis.TSOptions{Labels: map[string]string{"Redis": "Labs"}}
|
||||
result, err = client.TSAddWithArgs(ctx, "3", 3, 2, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo(3))
|
||||
opt = &redis.TSOptions{Labels: map[string]string{"Redis": "Labs", "Time": "Series"}, Retention: 10}
|
||||
result, err = client.TSAddWithArgs(ctx, "4", 4, 2, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo(4))
|
||||
resultInfo, err := client.TSInfo(ctx, "4").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series"))
|
||||
// Test chunk size
|
||||
opt = &redis.TSOptions{ChunkSize: 128}
|
||||
result, err = client.TSAddWithArgs(ctx, "ts-cs-1", 1, 10, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo(1))
|
||||
resultInfo, err = client.TSInfo(ctx, "ts-cs-1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultInfo["chunkSize"]).To(BeEquivalentTo(128))
|
||||
// Test duplicate policy
|
||||
// LAST
|
||||
opt = &redis.TSOptions{DuplicatePolicy: "LAST"}
|
||||
result, err = client.TSAddWithArgs(ctx, "tsal-1", 1, 5, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo(1))
|
||||
result, err = client.TSAddWithArgs(ctx, "tsal-1", 1, 10, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo(1))
|
||||
resultGet, err := client.TSGet(ctx, "tsal-1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultGet.Value).To(BeEquivalentTo(10))
|
||||
// FIRST
|
||||
opt = &redis.TSOptions{DuplicatePolicy: "FIRST"}
|
||||
result, err = client.TSAddWithArgs(ctx, "tsaf-1", 1, 5, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo(1))
|
||||
result, err = client.TSAddWithArgs(ctx, "tsaf-1", 1, 10, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo(1))
|
||||
resultGet, err = client.TSGet(ctx, "tsaf-1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultGet.Value).To(BeEquivalentTo(5))
|
||||
// MAX
|
||||
opt = &redis.TSOptions{DuplicatePolicy: "MAX"}
|
||||
result, err = client.TSAddWithArgs(ctx, "tsam-1", 1, 5, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo(1))
|
||||
result, err = client.TSAddWithArgs(ctx, "tsam-1", 1, 10, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo(1))
|
||||
resultGet, err = client.TSGet(ctx, "tsam-1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultGet.Value).To(BeEquivalentTo(10))
|
||||
// MIN
|
||||
opt = &redis.TSOptions{DuplicatePolicy: "MIN"}
|
||||
result, err = client.TSAddWithArgs(ctx, "tsami-1", 1, 5, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo(1))
|
||||
result, err = client.TSAddWithArgs(ctx, "tsami-1", 1, 10, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo(1))
|
||||
resultGet, err = client.TSGet(ctx, "tsami-1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultGet.Value).To(BeEquivalentTo(5))
|
||||
|
||||
})
|
||||
|
||||
It("should TSAlter", Label("timeseries", "tsalter"), func() {
|
||||
result, err := client.TSCreate(ctx, "1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo("OK"))
|
||||
resultInfo, err := client.TSInfo(ctx, "1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultInfo["retentionTime"]).To(BeEquivalentTo(0))
|
||||
|
||||
opt := &redis.TSAlterOptions{Retention: 10}
|
||||
resultAlter, err := client.TSAlter(ctx, "1", opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultAlter).To(BeEquivalentTo("OK"))
|
||||
|
||||
resultInfo, err = client.TSInfo(ctx, "1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultInfo["retentionTime"]).To(BeEquivalentTo(10))
|
||||
|
||||
resultInfo, err = client.TSInfo(ctx, "1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultInfo["labels"]).To(BeEquivalentTo(map[interface{}]interface{}{}))
|
||||
|
||||
opt = &redis.TSAlterOptions{Labels: map[string]string{"Time": "Series"}}
|
||||
resultAlter, err = client.TSAlter(ctx, "1", opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultAlter).To(BeEquivalentTo("OK"))
|
||||
|
||||
resultInfo, err = client.TSInfo(ctx, "1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series"))
|
||||
Expect(resultInfo["retentionTime"]).To(BeEquivalentTo(10))
|
||||
Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo(redis.Nil))
|
||||
opt = &redis.TSAlterOptions{DuplicatePolicy: "min"}
|
||||
resultAlter, err = client.TSAlter(ctx, "1", opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultAlter).To(BeEquivalentTo("OK"))
|
||||
|
||||
resultInfo, err = client.TSInfo(ctx, "1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo("min"))
|
||||
|
||||
})
|
||||
|
||||
It("should TSCreateRule and TSDeleteRule", Label("timeseries", "tscreaterule", "tsdeleterule"), func() {
|
||||
result, err := client.TSCreate(ctx, "1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo("OK"))
|
||||
result, err = client.TSCreate(ctx, "2").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo("OK"))
|
||||
result, err = client.TSCreateRule(ctx, "1", "2", redis.Avg, 100).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo("OK"))
|
||||
for i := 0; i < 50; i++ {
|
||||
resultAdd, err := client.TSAdd(ctx, "1", 100+i*2, 1).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultAdd).To(BeEquivalentTo(100 + i*2))
|
||||
resultAdd, err = client.TSAdd(ctx, "1", 100+i*2+1, 2).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultAdd).To(BeEquivalentTo(100 + i*2 + 1))
|
||||
|
||||
}
|
||||
resultAdd, err := client.TSAdd(ctx, "1", 100*2, 1.5).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultAdd).To(BeEquivalentTo(100 * 2))
|
||||
resultGet, err := client.TSGet(ctx, "2").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultGet.Value).To(BeEquivalentTo(1.5))
|
||||
Expect(resultGet.Timestamp).To(BeEquivalentTo(100))
|
||||
|
||||
resultDeleteRule, err := client.TSDeleteRule(ctx, "1", "2").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultDeleteRule).To(BeEquivalentTo("OK"))
|
||||
resultInfo, err := client.TSInfo(ctx, "1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultInfo["rules"]).To(BeEquivalentTo(map[interface{}]interface{}{}))
|
||||
|
||||
})
|
||||
|
||||
It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs"), func() {
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err := client.TSIncrBy(ctx, "1", 1).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
result, err := client.TSGet(ctx, "1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result.Value).To(BeEquivalentTo(100))
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err := client.TSDecrBy(ctx, "1", 1).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
result, err = client.TSGet(ctx, "1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result.Value).To(BeEquivalentTo(0))
|
||||
|
||||
opt := &redis.TSIncrDecrOptions{Timestamp: 5}
|
||||
_, err = client.TSIncrByWithArgs(ctx, "2", 1.5, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
result, err = client.TSGet(ctx, "2").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result.Timestamp).To(BeEquivalentTo(5))
|
||||
Expect(result.Value).To(BeEquivalentTo(1.5))
|
||||
|
||||
opt = &redis.TSIncrDecrOptions{Timestamp: 7}
|
||||
_, err = client.TSIncrByWithArgs(ctx, "2", 2.25, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
result, err = client.TSGet(ctx, "2").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result.Timestamp).To(BeEquivalentTo(7))
|
||||
Expect(result.Value).To(BeEquivalentTo(3.75))
|
||||
|
||||
opt = &redis.TSIncrDecrOptions{Timestamp: 15}
|
||||
_, err = client.TSDecrByWithArgs(ctx, "2", 1.5, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
result, err = client.TSGet(ctx, "2").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result.Timestamp).To(BeEquivalentTo(15))
|
||||
Expect(result.Value).To(BeEquivalentTo(2.25))
|
||||
|
||||
// Test chunk size INCRBY
|
||||
opt = &redis.TSIncrDecrOptions{ChunkSize: 128}
|
||||
_, err = client.TSIncrByWithArgs(ctx, "3", 10, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
resultInfo, err := client.TSInfo(ctx, "3").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultInfo["chunkSize"]).To(BeEquivalentTo(128))
|
||||
|
||||
// Test chunk size DECRBY
|
||||
opt = &redis.TSIncrDecrOptions{ChunkSize: 128}
|
||||
_, err = client.TSDecrByWithArgs(ctx, "4", 10, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
resultInfo, err = client.TSInfo(ctx, "4").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultInfo["chunkSize"]).To(BeEquivalentTo(128))
|
||||
})
|
||||
|
||||
It("should TSGet", Label("timeseries", "tsget"), func() {
|
||||
opt := &redis.TSOptions{DuplicatePolicy: "max"}
|
||||
resultGet, err := client.TSAddWithArgs(ctx, "foo", 2265985, 151, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultGet).To(BeEquivalentTo(2265985))
|
||||
result, err := client.TSGet(ctx, "foo").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result.Timestamp).To(BeEquivalentTo(2265985))
|
||||
Expect(result.Value).To(BeEquivalentTo(151))
|
||||
|
||||
})
|
||||
|
||||
It("should TSGet Latest", Label("timeseries", "tsgetlatest"), func() {
|
||||
resultGet, err := client.TSCreate(ctx, "tsgl-1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultGet).To(BeEquivalentTo("OK"))
|
||||
resultGet, err = client.TSCreate(ctx, "tsgl-2").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultGet).To(BeEquivalentTo("OK"))
|
||||
resultGet, err = client.TSCreateRule(ctx, "tsgl-1", "tsgl-2", redis.Sum, 10).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultGet).To(BeEquivalentTo("OK"))
|
||||
_, err = client.TSAdd(ctx, "tsgl-1", 1, 1).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "tsgl-1", 2, 3).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "tsgl-1", 11, 7).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "tsgl-1", 13, 1).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
result, errGet := client.TSGet(ctx, "tsgl-2").Result()
|
||||
Expect(errGet).NotTo(HaveOccurred())
|
||||
Expect(result.Timestamp).To(BeEquivalentTo(0))
|
||||
Expect(result.Value).To(BeEquivalentTo(4))
|
||||
result, errGet = client.TSGetWithArgs(ctx, "tsgl-2", &redis.TSGetOptions{Latest: true}).Result()
|
||||
Expect(errGet).NotTo(HaveOccurred())
|
||||
Expect(result.Timestamp).To(BeEquivalentTo(10))
|
||||
Expect(result.Value).To(BeEquivalentTo(8))
|
||||
})
|
||||
|
||||
It("should TSInfo", Label("timeseries", "tsinfo"), func() {
|
||||
resultGet, err := client.TSAdd(ctx, "foo", 2265985, 151).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultGet).To(BeEquivalentTo(2265985))
|
||||
result, err := client.TSInfo(ctx, "foo").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["firstTimestamp"]).To(BeEquivalentTo(2265985))
|
||||
|
||||
})
|
||||
|
||||
It("should TSMAdd", Label("timeseries", "tsmadd"), func() {
|
||||
resultGet, err := client.TSCreate(ctx, "a").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultGet).To(BeEquivalentTo("OK"))
|
||||
ktvSlices := make([][]interface{}, 3)
|
||||
for i := 0; i < 3; i++ {
|
||||
ktvSlices[i] = make([]interface{}, 3)
|
||||
ktvSlices[i][0] = "a"
|
||||
for j := 1; j < 3; j++ {
|
||||
ktvSlices[i][j] = (i + j) * j
|
||||
}
|
||||
}
|
||||
result, err := client.TSMAdd(ctx, ktvSlices).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo([]int64{1, 2, 3}))
|
||||
|
||||
})
|
||||
|
||||
It("should TSMGet and TSMGetWithArgs", Label("timeseries", "tsmget", "tsmgetWithArgs"), func() {
|
||||
opt := &redis.TSOptions{Labels: map[string]string{"Test": "This"}}
|
||||
resultCreate, err := client.TSCreateWithArgs(ctx, "a", opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
opt = &redis.TSOptions{Labels: map[string]string{"Test": "This", "Taste": "That"}}
|
||||
resultCreate, err = client.TSCreateWithArgs(ctx, "b", opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
_, err = client.TSAdd(ctx, "a", "*", 15).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "b", "*", 25).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
result, err := client.TSMGet(ctx, []string{"Test=This"}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["a"][1].([]interface{})[1]).To(BeEquivalentTo(15))
|
||||
Expect(result["b"][1].([]interface{})[1]).To(BeEquivalentTo(25))
|
||||
mgetOpt := &redis.TSMGetOptions{WithLabels: true}
|
||||
result, err = client.TSMGetWithArgs(ctx, []string{"Test=This"}, mgetOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "Taste": "That"}))
|
||||
|
||||
resultCreate, err = client.TSCreate(ctx, "c").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
opt = &redis.TSOptions{Labels: map[string]string{"is_compaction": "true"}}
|
||||
resultCreate, err = client.TSCreateWithArgs(ctx, "d", opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
resultCreateRule, err := client.TSCreateRule(ctx, "c", "d", redis.Sum, 10).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreateRule).To(BeEquivalentTo("OK"))
|
||||
_, err = client.TSAdd(ctx, "c", 1, 1).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "c", 2, 3).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "c", 11, 7).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "c", 13, 1).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
result, err = client.TSMGet(ctx, []string{"is_compaction=true"}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(0), 4.0}))
|
||||
mgetOpt = &redis.TSMGetOptions{Latest: true}
|
||||
result, err = client.TSMGetWithArgs(ctx, []string{"is_compaction=true"}, mgetOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(10), 8.0}))
|
||||
|
||||
})
|
||||
|
||||
It("should TSQueryIndex", Label("timeseries", "tsqueryindex"), func() {
|
||||
opt := &redis.TSOptions{Labels: map[string]string{"Test": "This"}}
|
||||
resultCreate, err := client.TSCreateWithArgs(ctx, "a", opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
opt = &redis.TSOptions{Labels: map[string]string{"Test": "This", "Taste": "That"}}
|
||||
resultCreate, err = client.TSCreateWithArgs(ctx, "b", opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
result, err := client.TSQueryIndex(ctx, []string{"Test=This"}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(2))
|
||||
result, err = client.TSQueryIndex(ctx, []string{"Taste=That"}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(1))
|
||||
|
||||
})
|
||||
|
||||
It("should TSDel and TSRange", Label("timeseries", "tsdel", "tsrange"), func() {
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err := client.TSAdd(ctx, "a", i, float64(i%7)).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
resultDelete, err := client.TSDel(ctx, "a", 0, 21).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultDelete).To(BeEquivalentTo(22))
|
||||
|
||||
resultRange, err := client.TSRange(ctx, "a", 0, 21).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultRange).To(BeEquivalentTo([]redis.TSTimestampValue{}))
|
||||
|
||||
resultRange, err = client.TSRange(ctx, "a", 22, 22).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 22, Value: 1}))
|
||||
})
|
||||
|
||||
It("should TSRange, TSRangeWithArgs", Label("timeseries", "tsrange", "tsrangeWithArgs"), func() {
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err := client.TSAdd(ctx, "a", i, float64(i%7)).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
}
|
||||
result, err := client.TSRange(ctx, "a", 0, 200).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(100))
|
||||
for i := 0; i < 100; i++ {
|
||||
client.TSAdd(ctx, "a", i+200, float64(i%7))
|
||||
}
|
||||
result, err = client.TSRange(ctx, "a", 0, 500).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(200))
|
||||
fts := make([]int, 0)
|
||||
for i := 10; i < 20; i++ {
|
||||
fts = append(fts, i)
|
||||
}
|
||||
opt := &redis.TSRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}}
|
||||
result, err = client.TSRangeWithArgs(ctx, "a", 0, 500, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(2))
|
||||
opt = &redis.TSRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "+"}
|
||||
result, err = client.TSRangeWithArgs(ctx, "a", 0, 10, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 0, Value: 10}, {Timestamp: 10, Value: 1}}))
|
||||
opt = &redis.TSRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "5"}
|
||||
result, err = client.TSRangeWithArgs(ctx, "a", 0, 10, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 0, Value: 5}, {Timestamp: 5, Value: 6}}))
|
||||
opt = &redis.TSRangeOptions{Aggregator: redis.Twa, BucketDuration: 10}
|
||||
result, err = client.TSRangeWithArgs(ctx, "a", 0, 10, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 0, Value: 2.55}, {Timestamp: 10, Value: 3}}))
|
||||
// Test Range Latest
|
||||
resultCreate, err := client.TSCreate(ctx, "t1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
resultCreate, err = client.TSCreate(ctx, "t2").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
resultRule, err := client.TSCreateRule(ctx, "t1", "t2", redis.Sum, 10).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultRule).To(BeEquivalentTo("OK"))
|
||||
_, errAdd := client.TSAdd(ctx, "t1", 1, 1).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t1", 2, 3).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t1", 11, 7).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t1", 13, 1).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
resultRange, err := client.TSRange(ctx, "t1", 0, 20).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 1, Value: 1}))
|
||||
|
||||
opt = &redis.TSRangeOptions{Latest: true}
|
||||
resultRange, err = client.TSRangeWithArgs(ctx, "t2", 0, 10, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 0, Value: 4}))
|
||||
// Test Bucket Timestamp
|
||||
resultCreate, err = client.TSCreate(ctx, "t3").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
_, errAdd = client.TSAdd(ctx, "t3", 15, 1).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t3", 17, 4).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t3", 51, 3).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t3", 73, 5).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t3", 75, 3).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
|
||||
opt = &redis.TSRangeOptions{Aggregator: redis.Max, Align: 0, BucketDuration: 10}
|
||||
resultRange, err = client.TSRangeWithArgs(ctx, "t3", 0, 100, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 10, Value: 4}))
|
||||
Expect(len(resultRange)).To(BeEquivalentTo(3))
|
||||
|
||||
opt = &redis.TSRangeOptions{Aggregator: redis.Max, Align: 0, BucketDuration: 10, BucketTimestamp: "+"}
|
||||
resultRange, err = client.TSRangeWithArgs(ctx, "t3", 0, 100, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 20, Value: 4}))
|
||||
Expect(len(resultRange)).To(BeEquivalentTo(3))
|
||||
// Test Empty
|
||||
_, errAdd = client.TSAdd(ctx, "t4", 15, 1).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t4", 17, 4).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t4", 51, 3).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t4", 73, 5).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t4", 75, 3).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
|
||||
opt = &redis.TSRangeOptions{Aggregator: redis.Max, Align: 0, BucketDuration: 10}
|
||||
resultRange, err = client.TSRangeWithArgs(ctx, "t4", 0, 100, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 10, Value: 4}))
|
||||
Expect(len(resultRange)).To(BeEquivalentTo(3))
|
||||
|
||||
opt = &redis.TSRangeOptions{Aggregator: redis.Max, Align: 0, BucketDuration: 10, Empty: true}
|
||||
resultRange, err = client.TSRangeWithArgs(ctx, "t4", 0, 100, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 10, Value: 4}))
|
||||
Expect(len(resultRange)).To(BeEquivalentTo(7))
|
||||
})
|
||||
|
||||
It("should TSRevRange, TSRevRangeWithArgs", Label("timeseries", "tsrevrange", "tsrevrangeWithArgs"), func() {
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err := client.TSAdd(ctx, "a", i, float64(i%7)).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
}
|
||||
result, err := client.TSRange(ctx, "a", 0, 200).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(100))
|
||||
for i := 0; i < 100; i++ {
|
||||
client.TSAdd(ctx, "a", i+200, float64(i%7))
|
||||
}
|
||||
result, err = client.TSRange(ctx, "a", 0, 500).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(200))
|
||||
|
||||
opt := &redis.TSRevRangeOptions{Aggregator: redis.Avg, BucketDuration: 10}
|
||||
result, err = client.TSRevRangeWithArgs(ctx, "a", 0, 500, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(20))
|
||||
|
||||
opt = &redis.TSRevRangeOptions{Count: 10}
|
||||
result, err = client.TSRevRangeWithArgs(ctx, "a", 0, 500, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(10))
|
||||
|
||||
fts := make([]int, 0)
|
||||
for i := 10; i < 20; i++ {
|
||||
fts = append(fts, i)
|
||||
}
|
||||
opt = &redis.TSRevRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}}
|
||||
result, err = client.TSRevRangeWithArgs(ctx, "a", 0, 500, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(2))
|
||||
|
||||
opt = &redis.TSRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "+"}
|
||||
result, err = client.TSRevRangeWithArgs(ctx, "a", 0, 10, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 10, Value: 1}, {Timestamp: 0, Value: 10}}))
|
||||
|
||||
opt = &redis.TSRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "1"}
|
||||
result, err = client.TSRevRangeWithArgs(ctx, "a", 0, 10, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1, Value: 10}, {Timestamp: 0, Value: 1}}))
|
||||
|
||||
opt = &redis.TSRevRangeOptions{Aggregator: redis.Twa, BucketDuration: 10}
|
||||
result, err = client.TSRevRangeWithArgs(ctx, "a", 0, 10, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 10, Value: 3}, {Timestamp: 0, Value: 2.55}}))
|
||||
// Test Range Latest
|
||||
resultCreate, err := client.TSCreate(ctx, "t1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
resultCreate, err = client.TSCreate(ctx, "t2").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
resultRule, err := client.TSCreateRule(ctx, "t1", "t2", redis.Sum, 10).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultRule).To(BeEquivalentTo("OK"))
|
||||
_, errAdd := client.TSAdd(ctx, "t1", 1, 1).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t1", 2, 3).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t1", 11, 7).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t1", 13, 1).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
resultRange, err := client.TSRange(ctx, "t2", 0, 10).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 0, Value: 4}))
|
||||
opt = &redis.TSRevRangeOptions{Latest: true}
|
||||
resultRange, err = client.TSRevRangeWithArgs(ctx, "t2", 0, 10, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 10, Value: 8}))
|
||||
resultRange, err = client.TSRevRangeWithArgs(ctx, "t2", 0, 9, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 0, Value: 4}))
|
||||
// Test Bucket Timestamp
|
||||
resultCreate, err = client.TSCreate(ctx, "t3").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
_, errAdd = client.TSAdd(ctx, "t3", 15, 1).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t3", 17, 4).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t3", 51, 3).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t3", 73, 5).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t3", 75, 3).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
|
||||
opt = &redis.TSRevRangeOptions{Aggregator: redis.Max, Align: 0, BucketDuration: 10}
|
||||
resultRange, err = client.TSRevRangeWithArgs(ctx, "t3", 0, 100, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 70, Value: 5}))
|
||||
Expect(len(resultRange)).To(BeEquivalentTo(3))
|
||||
|
||||
opt = &redis.TSRevRangeOptions{Aggregator: redis.Max, Align: 0, BucketDuration: 10, BucketTimestamp: "+"}
|
||||
resultRange, err = client.TSRevRangeWithArgs(ctx, "t3", 0, 100, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 80, Value: 5}))
|
||||
Expect(len(resultRange)).To(BeEquivalentTo(3))
|
||||
// Test Empty
|
||||
_, errAdd = client.TSAdd(ctx, "t4", 15, 1).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t4", 17, 4).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t4", 51, 3).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t4", 73, 5).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
_, errAdd = client.TSAdd(ctx, "t4", 75, 3).Result()
|
||||
Expect(errAdd).NotTo(HaveOccurred())
|
||||
|
||||
opt = &redis.TSRevRangeOptions{Aggregator: redis.Max, Align: 0, BucketDuration: 10}
|
||||
resultRange, err = client.TSRevRangeWithArgs(ctx, "t4", 0, 100, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 70, Value: 5}))
|
||||
Expect(len(resultRange)).To(BeEquivalentTo(3))
|
||||
|
||||
opt = &redis.TSRevRangeOptions{Aggregator: redis.Max, Align: 0, BucketDuration: 10, Empty: true}
|
||||
resultRange, err = client.TSRevRangeWithArgs(ctx, "t4", 0, 100, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultRange[0]).To(BeEquivalentTo(redis.TSTimestampValue{Timestamp: 70, Value: 5}))
|
||||
Expect(len(resultRange)).To(BeEquivalentTo(7))
|
||||
|
||||
})
|
||||
|
||||
It("should TSMRange and TSMRangeWithArgs", Label("timeseries", "tsmrange", "tsmrangeWithArgs"), func() {
|
||||
createOpt := &redis.TSOptions{Labels: map[string]string{"Test": "This", "team": "ny"}}
|
||||
resultCreate, err := client.TSCreateWithArgs(ctx, "a", createOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
createOpt = &redis.TSOptions{Labels: map[string]string{"Test": "This", "Taste": "That", "team": "sf"}}
|
||||
resultCreate, err = client.TSCreateWithArgs(ctx, "b", createOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err := client.TSAdd(ctx, "a", i, float64(i%7)).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "b", i, float64(i%11)).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
result, err := client.TSMRange(ctx, 0, 200, []string{"Test=This"}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(2))
|
||||
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(100))
|
||||
// Test Count
|
||||
mrangeOpt := &redis.TSMRangeOptions{Count: 10}
|
||||
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(10))
|
||||
// Test Aggregation and BucketDuration
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err := client.TSAdd(ctx, "a", i+200, float64(i%7)).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
mrangeOpt = &redis.TSMRangeOptions{Aggregator: redis.Avg, BucketDuration: 10}
|
||||
result, err = client.TSMRangeWithArgs(ctx, 0, 500, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(2))
|
||||
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(20))
|
||||
// Test WithLabels
|
||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{}))
|
||||
mrangeOpt = &redis.TSMRangeOptions{WithLabels: true}
|
||||
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "team": "ny"}))
|
||||
// Test SelectedLabels
|
||||
mrangeOpt = &redis.TSMRangeOptions{SelectedLabels: []interface{}{"team"}}
|
||||
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "ny"}))
|
||||
Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "sf"}))
|
||||
// Test FilterBy
|
||||
fts := make([]int, 0)
|
||||
for i := 10; i < 20; i++ {
|
||||
fts = append(fts, i)
|
||||
}
|
||||
mrangeOpt = &redis.TSMRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}}
|
||||
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(15), 1.0}, []interface{}{int64(16), 2.0}}))
|
||||
// Test GroupBy
|
||||
mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "Test", Reducer: "sum"}
|
||||
result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 2.0}, []interface{}{int64(2), 4.0}, []interface{}{int64(3), 6.0}}))
|
||||
|
||||
mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "Test", Reducer: "max"}
|
||||
result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}}))
|
||||
|
||||
mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "team", Reducer: "min"}
|
||||
result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(2))
|
||||
Expect(result["team=ny"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}}))
|
||||
Expect(result["team=sf"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}}))
|
||||
// Test Align
|
||||
mrangeOpt = &redis.TSMRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "-"}
|
||||
result, err = client.TSMRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 10.0}, []interface{}{int64(10), 1.0}}))
|
||||
|
||||
mrangeOpt = &redis.TSMRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: 5}
|
||||
result, err = client.TSMRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 5.0}, []interface{}{int64(5), 6.0}}))
|
||||
|
||||
})
|
||||
|
||||
It("should TSMRangeWithArgs Latest", Label("timeseries", "tsmrangeWithArgs", "tsmrangelatest"), func() {
|
||||
resultCreate, err := client.TSCreate(ctx, "a").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
opt := &redis.TSOptions{Labels: map[string]string{"is_compaction": "true"}}
|
||||
resultCreate, err = client.TSCreateWithArgs(ctx, "b", opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
|
||||
resultCreate, err = client.TSCreate(ctx, "c").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
opt = &redis.TSOptions{Labels: map[string]string{"is_compaction": "true"}}
|
||||
resultCreate, err = client.TSCreateWithArgs(ctx, "d", opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
|
||||
resultCreateRule, err := client.TSCreateRule(ctx, "a", "b", redis.Sum, 10).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreateRule).To(BeEquivalentTo("OK"))
|
||||
resultCreateRule, err = client.TSCreateRule(ctx, "c", "d", redis.Sum, 10).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreateRule).To(BeEquivalentTo("OK"))
|
||||
|
||||
_, err = client.TSAdd(ctx, "a", 1, 1).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "a", 2, 3).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "a", 11, 7).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "a", 13, 1).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
_, err = client.TSAdd(ctx, "c", 1, 1).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "c", 2, 3).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "c", 11, 7).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "c", 13, 1).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
mrangeOpt := &redis.TSMRangeOptions{Latest: true}
|
||||
result, err := client.TSMRangeWithArgs(ctx, 0, 10, []string{"is_compaction=true"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["b"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 4.0}, []interface{}{int64(10), 8.0}}))
|
||||
Expect(result["d"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 4.0}, []interface{}{int64(10), 8.0}}))
|
||||
|
||||
})
|
||||
It("should TSMRevRange and TSMRevRangeWithArgs", Label("timeseries", "tsmrevrange", "tsmrevrangeWithArgs"), func() {
|
||||
createOpt := &redis.TSOptions{Labels: map[string]string{"Test": "This", "team": "ny"}}
|
||||
resultCreate, err := client.TSCreateWithArgs(ctx, "a", createOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
createOpt = &redis.TSOptions{Labels: map[string]string{"Test": "This", "Taste": "That", "team": "sf"}}
|
||||
resultCreate, err = client.TSCreateWithArgs(ctx, "b", createOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err := client.TSAdd(ctx, "a", i, float64(i%7)).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "b", i, float64(i%11)).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
result, err := client.TSMRevRange(ctx, 0, 200, []string{"Test=This"}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(2))
|
||||
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(100))
|
||||
// Test Count
|
||||
mrangeOpt := &redis.TSMRevRangeOptions{Count: 10}
|
||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(10))
|
||||
// Test Aggregation and BucketDuration
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err := client.TSAdd(ctx, "a", i+200, float64(i%7)).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
mrangeOpt = &redis.TSMRevRangeOptions{Aggregator: redis.Avg, BucketDuration: 10}
|
||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 500, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(2))
|
||||
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(20))
|
||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{}))
|
||||
// Test WithLabels
|
||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{}))
|
||||
mrangeOpt = &redis.TSMRevRangeOptions{WithLabels: true}
|
||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "team": "ny"}))
|
||||
// Test SelectedLabels
|
||||
mrangeOpt = &redis.TSMRevRangeOptions{SelectedLabels: []interface{}{"team"}}
|
||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "ny"}))
|
||||
Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "sf"}))
|
||||
// Test FilterBy
|
||||
fts := make([]int, 0)
|
||||
for i := 10; i < 20; i++ {
|
||||
fts = append(fts, i)
|
||||
}
|
||||
mrangeOpt = &redis.TSMRevRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}}
|
||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(16), 2.0}, []interface{}{int64(15), 1.0}}))
|
||||
// Test GroupBy
|
||||
mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "Test", Reducer: "sum"}
|
||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 6.0}, []interface{}{int64(2), 4.0}, []interface{}{int64(1), 2.0}, []interface{}{int64(0), 0.0}}))
|
||||
|
||||
mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "Test", Reducer: "max"}
|
||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}}))
|
||||
|
||||
mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "team", Reducer: "min"}
|
||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(2))
|
||||
Expect(result["team=ny"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}}))
|
||||
Expect(result["team=sf"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}}))
|
||||
// Test Align
|
||||
mrangeOpt = &redis.TSMRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "-"}
|
||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 1.0}, []interface{}{int64(0), 10.0}}))
|
||||
|
||||
mrangeOpt = &redis.TSMRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: 1}
|
||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(1), 10.0}, []interface{}{int64(0), 1.0}}))
|
||||
|
||||
})
|
||||
|
||||
It("should TSMRevRangeWithArgs Latest", Label("timeseries", "tsmrevrangeWithArgs", "tsmrevrangelatest"), func() {
|
||||
resultCreate, err := client.TSCreate(ctx, "a").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
opt := &redis.TSOptions{Labels: map[string]string{"is_compaction": "true"}}
|
||||
resultCreate, err = client.TSCreateWithArgs(ctx, "b", opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
|
||||
resultCreate, err = client.TSCreate(ctx, "c").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
opt = &redis.TSOptions{Labels: map[string]string{"is_compaction": "true"}}
|
||||
resultCreate, err = client.TSCreateWithArgs(ctx, "d", opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreate).To(BeEquivalentTo("OK"))
|
||||
|
||||
resultCreateRule, err := client.TSCreateRule(ctx, "a", "b", redis.Sum, 10).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreateRule).To(BeEquivalentTo("OK"))
|
||||
resultCreateRule, err = client.TSCreateRule(ctx, "c", "d", redis.Sum, 10).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultCreateRule).To(BeEquivalentTo("OK"))
|
||||
|
||||
_, err = client.TSAdd(ctx, "a", 1, 1).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "a", 2, 3).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "a", 11, 7).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "a", 13, 1).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
_, err = client.TSAdd(ctx, "c", 1, 1).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "c", 2, 3).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "c", 11, 7).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, err = client.TSAdd(ctx, "c", 13, 1).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
mrangeOpt := &redis.TSMRevRangeOptions{Latest: true}
|
||||
result, err := client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"is_compaction=true"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result["b"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 8.0}, []interface{}{int64(0), 4.0}}))
|
||||
Expect(result["d"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 8.0}, []interface{}{int64(0), 4.0}}))
|
||||
|
||||
})
|
||||
|
||||
})
|
Loading…
Reference in New Issue