mirror of https://github.com/go-redis/redis.git
Compare commits
3 Commits
9a24a888b3
...
4c36b1ace3
Author | SHA1 | Date |
---|---|---|
Monkey | 4c36b1ace3 | |
monkey92t | e9158aaee5 | |
monkey92t | 0761b27368 |
|
@ -0,0 +1,263 @@
|
|||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9/internal"
|
||||
"github.com/redis/go-redis/v9/internal/pool"
|
||||
"github.com/redis/go-redis/v9/internal/proto"
|
||||
)
|
||||
|
||||
type Cache interface{}
|
||||
|
||||
type cache struct {
|
||||
client *Client
|
||||
|
||||
// cluster? sentinel?
|
||||
conn *Conn
|
||||
prefix []string
|
||||
|
||||
closed int32 // atomic
|
||||
}
|
||||
|
||||
func newCache() Cache {
|
||||
// ?
|
||||
return &cache{}
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------------
|
||||
|
||||
// extension method
|
||||
|
||||
func (c *Conn) readReply(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error {
|
||||
return c.withConn(ctx, func(ctx context.Context, conn *pool.Conn) error {
|
||||
return conn.WithReader(ctx, timeout, fn)
|
||||
})
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------------
|
||||
|
||||
// Client-side caching command.
|
||||
|
||||
type trackingArgs struct {
|
||||
redirect int
|
||||
prefixes []string
|
||||
broadcast bool
|
||||
optIn bool
|
||||
optOut bool
|
||||
noLoop bool
|
||||
}
|
||||
|
||||
func (c *cache) clientTracking(ctx context.Context, t *trackingArgs) *StringCmd {
|
||||
args := make([]any, 0, 7+len(t.prefixes))
|
||||
args = append(args, "CLIENT", "TRACKING", "ON")
|
||||
if t.redirect > 0 {
|
||||
args = append(args, "REDIRECT", t.redirect)
|
||||
}
|
||||
if len(t.prefixes) > 0 {
|
||||
for _, prefix := range t.prefixes {
|
||||
args = append(args, "PREFIX", prefix)
|
||||
}
|
||||
}
|
||||
if t.optIn {
|
||||
args = append(args, "OPTIN")
|
||||
}
|
||||
if t.optOut {
|
||||
args = append(args, "OPTOUT")
|
||||
}
|
||||
if t.noLoop {
|
||||
args = append(args, "NOLOOP")
|
||||
}
|
||||
cmd := NewStringCmd(ctx, args...)
|
||||
_ = c.conn.Process(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
func (c *cache) trackingClose(ctx context.Context) error {
|
||||
return c.conn.Process(ctx, NewStringCmd(ctx, "CLIENT", "TRACKING", "OFF"))
|
||||
}
|
||||
|
||||
func (c *cache) cachingYes(ctx context.Context) error {
|
||||
return c.conn.Process(ctx, NewStringCmd(ctx, "CLIENT", "CACHING", "YES"))
|
||||
}
|
||||
|
||||
func (c *cache) cachingNo(ctx context.Context) error {
|
||||
return c.conn.Process(ctx, NewStringCmd(ctx, "CLIENT", "CACHING", "NO"))
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------
|
||||
|
||||
// readInvalidate To read the expired message push from redis-server,
|
||||
// we only read for invalidate messages, and consider any other data that is read as an error.
|
||||
func (c *cache) readInvalidate(rd *proto.Reader) ([]string, error) {
|
||||
line, err := rd.ReadLine()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if line[0] != proto.RespPush {
|
||||
return nil, fmt.Errorf("invalid data-%s", string(line))
|
||||
}
|
||||
|
||||
n, err := strconv.Atoi(string(line[1:]))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if n != 2 {
|
||||
return nil, fmt.Errorf("got %d elements in the map, wanted %d", n, 2)
|
||||
}
|
||||
|
||||
// read `invalidate`
|
||||
s, err := rd.ReadString()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s != "invalidate" {
|
||||
return nil, fmt.Errorf("not a client-side caching push message, data-%s", s)
|
||||
}
|
||||
|
||||
n, err = rd.ReadArrayLen()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
keys := make([]string, 0, n)
|
||||
for i := 0; i < n; i++ {
|
||||
key, err := rd.ReadString()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
keys = append(keys, key)
|
||||
}
|
||||
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
// ------------------------------------- Broadcasting -------------------------------------
|
||||
|
||||
func (c *cache) listen(timeout time.Duration) {
|
||||
ctx := context.Background()
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
internal.Logger.Printf(ctx, "redis cache: panic - %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
if timeout == 0 {
|
||||
timeout = 30 * time.Second
|
||||
}
|
||||
internal.Logger.Printf(ctx, "redis cache: listen working, read timeout-%d second", int(timeout/time.Second))
|
||||
|
||||
// state, 0-normal, 1-need init track
|
||||
const (
|
||||
normal = 0
|
||||
bad = 1
|
||||
)
|
||||
var state = normal
|
||||
for {
|
||||
if atomic.LoadInt32(&c.closed) == 1 {
|
||||
_ = c.conn.Close()
|
||||
internal.Logger.Printf(ctx, "redis cache: close, quit listen")
|
||||
return
|
||||
}
|
||||
|
||||
if state == bad {
|
||||
internal.Logger.Printf(ctx, "redis cache: state bad")
|
||||
if err := c.initTrack(ctx); err != nil {
|
||||
internal.Logger.Printf(ctx, "redis cache: listen init track error-%s", err.Error())
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if err := c.conn.Ping(ctx).Err(); err != nil {
|
||||
internal.Logger.Printf(ctx, "redis cache: listen ping error-%s", err.Error())
|
||||
state = bad
|
||||
continue
|
||||
}
|
||||
state = normal
|
||||
|
||||
var keys []string
|
||||
err := c.conn.withConn(ctx, func(ctx context.Context, conn *pool.Conn) error {
|
||||
return conn.WithReader(ctx, timeout, func(rd *proto.Reader) (err error) {
|
||||
keys, err = c.readInvalidate(rd)
|
||||
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// The timeout error is considered normal, and it is triggered when we fail
|
||||
// to receive a notification. We handle it as nil.
|
||||
// We cannot return the timeout error, as go-redis would consider it a network
|
||||
// problem and close the network connection.
|
||||
if isNetTimeout(err) {
|
||||
err = nil
|
||||
return err
|
||||
}
|
||||
|
||||
// We only listen for redis-push notifications, so under normal circumstances,
|
||||
// we should not receive any redis-error notifications.
|
||||
// If we do, we need to handle them as errors; otherwise,
|
||||
// go-redis may consider redis errors as normal occurrences.
|
||||
if isRedisError(err) {
|
||||
err = fmt.Errorf("redis cache: unexpected response redis-error-msg-%s", err.Error())
|
||||
}
|
||||
|
||||
return err
|
||||
})
|
||||
})
|
||||
|
||||
// under normal circumstances, we should not receive any errors, including redis errors.
|
||||
if err != nil {
|
||||
state = bad
|
||||
|
||||
internal.Logger.Printf(ctx, "redis cache: read push data error-%s", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
// it's possible that we may not receive any notifications for keys.
|
||||
if len(keys) > 0 {
|
||||
// handle keys
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cache) initTrack(ctx context.Context) error {
|
||||
internal.Logger.Printf(ctx, "redis cache: init track")
|
||||
if c.conn != nil {
|
||||
_ = c.conn.Close()
|
||||
}
|
||||
c.conn = c.client.Conn()
|
||||
|
||||
args := make([]any, 0, 3+2*len(c.prefix)+1)
|
||||
args = append(args, "CLIENT", "TRACKING", "ON")
|
||||
for _, prefix := range c.prefix {
|
||||
args = append(args, "PREFIX", prefix)
|
||||
}
|
||||
args = append(args, "BCAST")
|
||||
cmd := NewStringCmd(ctx, args...)
|
||||
|
||||
if err := c.conn.Process(ctx, cmd); err != nil {
|
||||
_ = c.conn.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// isNetTimeout check err == net timeout
|
||||
func isNetTimeout(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
netErr, ok := err.(net.Error)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
return netErr.Timeout()
|
||||
}
|
Loading…
Reference in New Issue