Compare commits

...

3 Commits

Author SHA1 Message Date
Monkey 4c36b1ace3
Merge e9158aaee5 into cc9bcb0c0f 2024-10-17 10:31:53 +03:00
monkey92t e9158aaee5 add Broadcasting listen and read push
Signed-off-by: monkey92t <golang@88.com>
2023-04-24 22:16:43 +08:00
monkey92t 0761b27368 feat: cache init
Signed-off-by: monkey92t <golang@88.com>
2023-04-05 23:46:58 +08:00
1 changed files with 263 additions and 0 deletions

263
cache.go Normal file
View File

@ -0,0 +1,263 @@
package redis
import (
"context"
"fmt"
"net"
"strconv"
"sync/atomic"
"time"
"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto"
)
type Cache interface{}
type cache struct {
client *Client
// cluster? sentinel?
conn *Conn
prefix []string
closed int32 // atomic
}
func newCache() Cache {
// ?
return &cache{}
}
// ------------------------------------------------------------------------------------------
// extension method
func (c *Conn) readReply(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error {
return c.withConn(ctx, func(ctx context.Context, conn *pool.Conn) error {
return conn.WithReader(ctx, timeout, fn)
})
}
// ------------------------------------------------------------------------------------------
// Client-side caching command.
type trackingArgs struct {
redirect int
prefixes []string
broadcast bool
optIn bool
optOut bool
noLoop bool
}
func (c *cache) clientTracking(ctx context.Context, t *trackingArgs) *StringCmd {
args := make([]any, 0, 7+len(t.prefixes))
args = append(args, "CLIENT", "TRACKING", "ON")
if t.redirect > 0 {
args = append(args, "REDIRECT", t.redirect)
}
if len(t.prefixes) > 0 {
for _, prefix := range t.prefixes {
args = append(args, "PREFIX", prefix)
}
}
if t.optIn {
args = append(args, "OPTIN")
}
if t.optOut {
args = append(args, "OPTOUT")
}
if t.noLoop {
args = append(args, "NOLOOP")
}
cmd := NewStringCmd(ctx, args...)
_ = c.conn.Process(ctx, cmd)
return cmd
}
func (c *cache) trackingClose(ctx context.Context) error {
return c.conn.Process(ctx, NewStringCmd(ctx, "CLIENT", "TRACKING", "OFF"))
}
func (c *cache) cachingYes(ctx context.Context) error {
return c.conn.Process(ctx, NewStringCmd(ctx, "CLIENT", "CACHING", "YES"))
}
func (c *cache) cachingNo(ctx context.Context) error {
return c.conn.Process(ctx, NewStringCmd(ctx, "CLIENT", "CACHING", "NO"))
}
// ------------------------------------------------------------------------------------
// readInvalidate To read the expired message push from redis-server,
// we only read for invalidate messages, and consider any other data that is read as an error.
func (c *cache) readInvalidate(rd *proto.Reader) ([]string, error) {
line, err := rd.ReadLine()
if err != nil {
return nil, err
}
if line[0] != proto.RespPush {
return nil, fmt.Errorf("invalid data-%s", string(line))
}
n, err := strconv.Atoi(string(line[1:]))
if err != nil {
return nil, err
}
if n != 2 {
return nil, fmt.Errorf("got %d elements in the map, wanted %d", n, 2)
}
// read `invalidate`
s, err := rd.ReadString()
if err != nil {
return nil, err
}
if s != "invalidate" {
return nil, fmt.Errorf("not a client-side caching push message, data-%s", s)
}
n, err = rd.ReadArrayLen()
if err != nil {
return nil, err
}
keys := make([]string, 0, n)
for i := 0; i < n; i++ {
key, err := rd.ReadString()
if err != nil {
return nil, err
}
keys = append(keys, key)
}
return keys, nil
}
// ------------------------------------- Broadcasting -------------------------------------
func (c *cache) listen(timeout time.Duration) {
ctx := context.Background()
defer func() {
if err := recover(); err != nil {
internal.Logger.Printf(ctx, "redis cache: panic - %v", err)
}
}()
if timeout == 0 {
timeout = 30 * time.Second
}
internal.Logger.Printf(ctx, "redis cache: listen working, read timeout-%d second", int(timeout/time.Second))
// state, 0-normal, 1-need init track
const (
normal = 0
bad = 1
)
var state = normal
for {
if atomic.LoadInt32(&c.closed) == 1 {
_ = c.conn.Close()
internal.Logger.Printf(ctx, "redis cache: close, quit listen")
return
}
if state == bad {
internal.Logger.Printf(ctx, "redis cache: state bad")
if err := c.initTrack(ctx); err != nil {
internal.Logger.Printf(ctx, "redis cache: listen init track error-%s", err.Error())
time.Sleep(1 * time.Second)
continue
}
}
if err := c.conn.Ping(ctx).Err(); err != nil {
internal.Logger.Printf(ctx, "redis cache: listen ping error-%s", err.Error())
state = bad
continue
}
state = normal
var keys []string
err := c.conn.withConn(ctx, func(ctx context.Context, conn *pool.Conn) error {
return conn.WithReader(ctx, timeout, func(rd *proto.Reader) (err error) {
keys, err = c.readInvalidate(rd)
if err == nil {
return nil
}
// The timeout error is considered normal, and it is triggered when we fail
// to receive a notification. We handle it as nil.
// We cannot return the timeout error, as go-redis would consider it a network
// problem and close the network connection.
if isNetTimeout(err) {
err = nil
return err
}
// We only listen for redis-push notifications, so under normal circumstances,
// we should not receive any redis-error notifications.
// If we do, we need to handle them as errors; otherwise,
// go-redis may consider redis errors as normal occurrences.
if isRedisError(err) {
err = fmt.Errorf("redis cache: unexpected response redis-error-msg-%s", err.Error())
}
return err
})
})
// under normal circumstances, we should not receive any errors, including redis errors.
if err != nil {
state = bad
internal.Logger.Printf(ctx, "redis cache: read push data error-%s", err.Error())
continue
}
// it's possible that we may not receive any notifications for keys.
if len(keys) > 0 {
// handle keys
}
}
}
func (c *cache) initTrack(ctx context.Context) error {
internal.Logger.Printf(ctx, "redis cache: init track")
if c.conn != nil {
_ = c.conn.Close()
}
c.conn = c.client.Conn()
args := make([]any, 0, 3+2*len(c.prefix)+1)
args = append(args, "CLIENT", "TRACKING", "ON")
for _, prefix := range c.prefix {
args = append(args, "PREFIX", prefix)
}
args = append(args, "BCAST")
cmd := NewStringCmd(ctx, args...)
if err := c.conn.Process(ctx, cmd); err != nil {
_ = c.conn.Close()
return err
}
return nil
}
// isNetTimeout check err == net timeout
func isNetTimeout(err error) bool {
if err == nil {
return false
}
netErr, ok := err.(net.Error)
if !ok {
return false
}
return netErr.Timeout()
}