mirror of https://github.com/tidwall/tile38.git
Code cleanup
This commit cleans up various Go code in the internal directory. - Ensures comments on exported functions - Changes all *Server receiver in all files to be "s", instead of mixed "c", "s", "server", etc. - Silenced Go warnings for if/else with returns. - Cleaned up import ordering.
This commit is contained in:
parent
981d9ece42
commit
c084aeedc2
|
@ -112,11 +112,11 @@ func TileXYToPixelXY(tileX, tileY int64) (pixelX, pixelY int64) {
|
|||
return tileX << 8, tileY << 8
|
||||
}
|
||||
|
||||
/// TileXYToQuadKey converts tile XY coordinates into a QuadKey at a specified level of detail.
|
||||
/// Param 'tileX' is the tile X coordinate.
|
||||
/// Param 'tileY' is the tile Y coordinate.
|
||||
/// Param 'levelOfDetail' is the Level of detail, from 1 (lowest detail) to N (highest detail).
|
||||
/// Returns a string containing the QuadKey.
|
||||
// TileXYToQuadKey converts tile XY coordinates into a QuadKey at a specified level of detail.
|
||||
// Param 'tileX' is the tile X coordinate.
|
||||
// Param 'tileY' is the tile Y coordinate.
|
||||
// Param 'levelOfDetail' is the Level of detail, from 1 (lowest detail) to N (highest detail).
|
||||
// Returns a string containing the QuadKey.
|
||||
func TileXYToQuadKey(tileX, tileY int64, levelOfDetail uint64) string {
|
||||
quadKey := make([]byte, levelOfDetail)
|
||||
for i, j := levelOfDetail, 0; i > 0; i, j = i-1, j+1 {
|
||||
|
@ -136,11 +136,11 @@ func TileXYToQuadKey(tileX, tileY int64, levelOfDetail uint64) string {
|
|||
return string(quadKey)
|
||||
}
|
||||
|
||||
/// QuadKeyToTileXY converts a QuadKey into tile XY coordinates.
|
||||
/// Param 'quadKey' is the quadKey of the tile.
|
||||
/// Return value 'tileX' is the output parameter receiving the tile X coordinate.
|
||||
/// Return value 'tileY is the output parameter receiving the tile Y coordinate.
|
||||
/// Return value 'levelOfDetail' is the output parameter receiving the level of detail.
|
||||
// QuadKeyToTileXY converts a QuadKey into tile XY coordinates.
|
||||
// Param 'quadKey' is the quadKey of the tile.
|
||||
// Return value 'tileX' is the output parameter receiving the tile X coordinate.
|
||||
// Return value 'tileY is the output parameter receiving the tile Y coordinate.
|
||||
// Return value 'levelOfDetail' is the output parameter receiving the level of detail.
|
||||
func QuadKeyToTileXY(quadKey string) (tileX, tileY int64, levelOfDetail uint64) {
|
||||
levelOfDetail = uint64(len(quadKey))
|
||||
for i := levelOfDetail; i > 0; i-- {
|
||||
|
|
|
@ -9,28 +9,29 @@ type Deadline struct {
|
|||
}
|
||||
|
||||
// New returns a new deadline object
|
||||
func New(deadline time.Time) *Deadline {
|
||||
return &Deadline{unixNano: deadline.UnixNano()}
|
||||
func New(dl time.Time) *Deadline {
|
||||
return &Deadline{unixNano: dl.UnixNano()}
|
||||
}
|
||||
|
||||
// Check the deadline and panic when reached
|
||||
//go:noinline
|
||||
func (deadline *Deadline) Check() {
|
||||
if deadline == nil || deadline.unixNano == 0 {
|
||||
func (dl *Deadline) Check() {
|
||||
if dl == nil || dl.unixNano == 0 {
|
||||
return
|
||||
}
|
||||
if !deadline.hit && time.Now().UnixNano() > deadline.unixNano {
|
||||
deadline.hit = true
|
||||
if !dl.hit && time.Now().UnixNano() > dl.unixNano {
|
||||
dl.hit = true
|
||||
panic("deadline")
|
||||
}
|
||||
}
|
||||
|
||||
// Hit returns true if the deadline has been hit
|
||||
func (deadline *Deadline) Hit() bool {
|
||||
return deadline.hit
|
||||
func (dl *Deadline) Hit() bool {
|
||||
return dl.hit
|
||||
}
|
||||
|
||||
// GetDeadlineTime returns the time object for the deadline, and an "empty" boolean
|
||||
func (deadline *Deadline) GetDeadlineTime() (time.Time) {
|
||||
return time.Unix(0, deadline.unixNano)
|
||||
// GetDeadlineTime returns the time object for the deadline, and an
|
||||
// "empty" boolean
|
||||
func (dl *Deadline) GetDeadlineTime() time.Time {
|
||||
return time.Unix(0, dl.unixNano)
|
||||
}
|
||||
|
|
|
@ -9,9 +9,7 @@ import (
|
|||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
const (
|
||||
amqpExpiresAfter = time.Second * 30
|
||||
)
|
||||
const amqpExpiresAfter = time.Second * 30
|
||||
|
||||
// AMQPConn is an endpoint connection
|
||||
type AMQPConn struct {
|
||||
|
|
|
@ -9,9 +9,7 @@ import (
|
|||
"github.com/tidwall/tile38/internal/log"
|
||||
)
|
||||
|
||||
const (
|
||||
disqueExpiresAfter = time.Second * 30
|
||||
)
|
||||
const disqueExpiresAfter = time.Second * 30
|
||||
|
||||
// DisqueConn is an endpoint connection
|
||||
type DisqueConn struct {
|
||||
|
|
|
@ -11,9 +11,7 @@ import (
|
|||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
const (
|
||||
grpcExpiresAfter = time.Second * 30
|
||||
)
|
||||
const grpcExpiresAfter = time.Second * 30
|
||||
|
||||
// GRPCConn is an endpoint connection
|
||||
type GRPCConn struct {
|
||||
|
|
|
@ -3,16 +3,14 @@ package endpoint
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/tidwall/gjson"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
const (
|
||||
kafkaExpiresAfter = time.Second * 30
|
||||
)
|
||||
const kafkaExpiresAfter = time.Second * 30
|
||||
|
||||
// KafkaConn is an endpoint connection
|
||||
type KafkaConn struct {
|
||||
|
|
|
@ -4,9 +4,7 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
localExpiresAfter = time.Second * 30
|
||||
)
|
||||
const localExpiresAfter = time.Second * 30
|
||||
|
||||
// LocalPublisher is used to publish local notifcations
|
||||
type LocalPublisher interface {
|
||||
|
|
|
@ -8,9 +8,7 @@ import (
|
|||
"github.com/nats-io/go-nats"
|
||||
)
|
||||
|
||||
const (
|
||||
natsExpiresAfter = time.Second * 30
|
||||
)
|
||||
const natsExpiresAfter = time.Second * 30
|
||||
|
||||
// NATSConn is an endpoint connection
|
||||
type NATSConn struct {
|
||||
|
|
|
@ -8,9 +8,7 @@ import (
|
|||
"github.com/gomodule/redigo/redis"
|
||||
)
|
||||
|
||||
const (
|
||||
redisExpiresAfter = time.Second * 30
|
||||
)
|
||||
const redisExpiresAfter = time.Second * 30
|
||||
|
||||
// RedisConn is an endpoint connection
|
||||
type RedisConn struct {
|
||||
|
|
|
@ -17,9 +17,7 @@ import (
|
|||
|
||||
var errCreateQueue = errors.New("Error while creating queue")
|
||||
|
||||
const (
|
||||
sqsExpiresAfter = time.Second * 30
|
||||
)
|
||||
const sqsExpiresAfter = time.Second * 30
|
||||
|
||||
// SQSConn is an endpoint connection
|
||||
type SQSConn struct {
|
||||
|
|
|
@ -2,6 +2,7 @@ package glob
|
|||
|
||||
import "strings"
|
||||
|
||||
// Glob structure for simple string matching
|
||||
type Glob struct {
|
||||
Pattern string
|
||||
Desc bool
|
||||
|
@ -9,10 +10,13 @@ type Glob struct {
|
|||
IsGlob bool
|
||||
}
|
||||
|
||||
func Match(pattern, name string) (matched bool, err error) {
|
||||
return wildcardMatch(pattern, name)
|
||||
// Match returns true when string matches pattern. Returns an error when the
|
||||
// pattern is invalid.
|
||||
func Match(pattern, str string) (matched bool, err error) {
|
||||
return wildcardMatch(pattern, str)
|
||||
}
|
||||
|
||||
// IsGlob returns true when the pattern is a valid glob
|
||||
func IsGlob(pattern string) bool {
|
||||
for i := 0; i < len(pattern); i++ {
|
||||
switch pattern[i] {
|
||||
|
@ -24,6 +28,7 @@ func IsGlob(pattern string) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// Parse returns a glob structure from the pattern.
|
||||
func Parse(pattern string, desc bool) *Glob {
|
||||
g := &Glob{Pattern: pattern, Desc: desc, Limits: []string{"", ""}}
|
||||
if strings.HasPrefix(pattern, "*") {
|
||||
|
|
|
@ -22,5 +22,4 @@ func BenchmarkLogPrintf(t *testing.B) {
|
|||
for i := 0; i < t.N; i++ {
|
||||
Printf("X %s", "Y")
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -31,8 +31,8 @@ func (err errAOFHook) Error() string {
|
|||
|
||||
var errInvalidAOF = errors.New("invalid aof file")
|
||||
|
||||
func (server *Server) loadAOF() error {
|
||||
fi, err := server.aof.Stat()
|
||||
func (s *Server) loadAOF() error {
|
||||
fi, err := s.aof.Stat()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -58,7 +58,7 @@ func (server *Server) loadAOF() error {
|
|||
var args [][]byte
|
||||
var packet [0xFFFF]byte
|
||||
for {
|
||||
n, err := server.aof.Read(packet[:])
|
||||
n, err := s.aof.Read(packet[:])
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
if len(buf) > 0 {
|
||||
|
@ -68,7 +68,7 @@ func (server *Server) loadAOF() error {
|
|||
}
|
||||
return err
|
||||
}
|
||||
server.aofsz += n
|
||||
s.aofsz += n
|
||||
data := packet[:n]
|
||||
if len(buf) > 0 {
|
||||
data = append(buf, data...)
|
||||
|
@ -88,7 +88,7 @@ func (server *Server) loadAOF() error {
|
|||
for _, arg := range args {
|
||||
msg.Args = append(msg.Args, string(arg))
|
||||
}
|
||||
if _, _, err := server.command(&msg, nil); err != nil {
|
||||
if _, _, err := s.command(&msg, nil); err != nil {
|
||||
if commandErrIsFatal(err) {
|
||||
return err
|
||||
}
|
||||
|
@ -117,96 +117,96 @@ func commandErrIsFatal(err error) bool {
|
|||
|
||||
// flushAOF flushes all aof buffer data to disk. Set sync to true to sync the
|
||||
// fsync the file.
|
||||
func (server *Server) flushAOF(sync bool) {
|
||||
if len(server.aofbuf) > 0 {
|
||||
_, err := server.aof.Write(server.aofbuf)
|
||||
func (s *Server) flushAOF(sync bool) {
|
||||
if len(s.aofbuf) > 0 {
|
||||
_, err := s.aof.Write(s.aofbuf)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if sync {
|
||||
if err := server.aof.Sync(); err != nil {
|
||||
if err := s.aof.Sync(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
if cap(server.aofbuf) > 1024*1024*32 {
|
||||
server.aofbuf = make([]byte, 0, 1024*1024*32)
|
||||
if cap(s.aofbuf) > 1024*1024*32 {
|
||||
s.aofbuf = make([]byte, 0, 1024*1024*32)
|
||||
} else {
|
||||
server.aofbuf = server.aofbuf[:0]
|
||||
s.aofbuf = s.aofbuf[:0]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (server *Server) writeAOF(args []string, d *commandDetails) error {
|
||||
func (s *Server) writeAOF(args []string, d *commandDetails) error {
|
||||
if d != nil && !d.updated {
|
||||
// just ignore writes if the command did not update
|
||||
return nil
|
||||
}
|
||||
|
||||
if server.shrinking {
|
||||
if s.shrinking {
|
||||
nargs := make([]string, len(args))
|
||||
copy(nargs, args)
|
||||
server.shrinklog = append(server.shrinklog, nargs)
|
||||
s.shrinklog = append(s.shrinklog, nargs)
|
||||
}
|
||||
|
||||
if server.aof != nil {
|
||||
atomic.StoreInt32(&server.aofdirty, 1) // prewrite optimization flag
|
||||
n := len(server.aofbuf)
|
||||
server.aofbuf = redcon.AppendArray(server.aofbuf, len(args))
|
||||
if s.aof != nil {
|
||||
atomic.StoreInt32(&s.aofdirty, 1) // prewrite optimization flag
|
||||
n := len(s.aofbuf)
|
||||
s.aofbuf = redcon.AppendArray(s.aofbuf, len(args))
|
||||
for _, arg := range args {
|
||||
server.aofbuf = redcon.AppendBulkString(server.aofbuf, arg)
|
||||
s.aofbuf = redcon.AppendBulkString(s.aofbuf, arg)
|
||||
}
|
||||
server.aofsz += len(server.aofbuf) - n
|
||||
s.aofsz += len(s.aofbuf) - n
|
||||
}
|
||||
|
||||
// notify aof live connections that we have new data
|
||||
server.fcond.L.Lock()
|
||||
server.fcond.Broadcast()
|
||||
server.fcond.L.Unlock()
|
||||
s.fcond.L.Lock()
|
||||
s.fcond.Broadcast()
|
||||
s.fcond.L.Unlock()
|
||||
|
||||
// process geofences
|
||||
if d != nil {
|
||||
// webhook geofences
|
||||
if server.config.followHost() == "" {
|
||||
if s.config.followHost() == "" {
|
||||
// for leader only
|
||||
if d.parent {
|
||||
// queue children
|
||||
for _, d := range d.children {
|
||||
if err := server.queueHooks(d); err != nil {
|
||||
if err := s.queueHooks(d); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// queue parent
|
||||
if err := server.queueHooks(d); err != nil {
|
||||
if err := s.queueHooks(d); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// live geofences
|
||||
server.lcond.L.Lock()
|
||||
if len(server.lives) > 0 {
|
||||
s.lcond.L.Lock()
|
||||
if len(s.lives) > 0 {
|
||||
if d.parent {
|
||||
// queue children
|
||||
for _, d := range d.children {
|
||||
server.lstack = append(server.lstack, d)
|
||||
s.lstack = append(s.lstack, d)
|
||||
}
|
||||
} else {
|
||||
// queue parent
|
||||
server.lstack = append(server.lstack, d)
|
||||
s.lstack = append(s.lstack, d)
|
||||
}
|
||||
server.lcond.Broadcast()
|
||||
s.lcond.Broadcast()
|
||||
}
|
||||
server.lcond.L.Unlock()
|
||||
s.lcond.L.Unlock()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (server *Server) getQueueCandidates(d *commandDetails) []*Hook {
|
||||
func (s *Server) getQueueCandidates(d *commandDetails) []*Hook {
|
||||
var candidates []*Hook
|
||||
// add the hooks with "outside" detection
|
||||
if len(server.hooksOut) > 0 {
|
||||
for _, hook := range server.hooksOut {
|
||||
if len(s.hooksOut) > 0 {
|
||||
for _, hook := range s.hooksOut {
|
||||
if hook.Key == d.key {
|
||||
candidates = append(candidates, hook)
|
||||
}
|
||||
|
@ -218,7 +218,7 @@ func (server *Server) getQueueCandidates(d *commandDetails) []*Hook {
|
|||
continue
|
||||
}
|
||||
rect := obj.Rect()
|
||||
server.hookTree.Search(
|
||||
s.hookTree.Search(
|
||||
[2]float64{rect.Min.X, rect.Min.Y},
|
||||
[2]float64{rect.Max.X, rect.Max.Y},
|
||||
func(_, _ [2]float64, value interface{}) bool {
|
||||
|
@ -243,13 +243,13 @@ func (server *Server) getQueueCandidates(d *commandDetails) []*Hook {
|
|||
return candidates
|
||||
}
|
||||
|
||||
func (server *Server) queueHooks(d *commandDetails) error {
|
||||
func (s *Server) queueHooks(d *commandDetails) error {
|
||||
// Create the slices that will store all messages and hooks
|
||||
var cmsgs, wmsgs []string
|
||||
var whooks []*Hook
|
||||
|
||||
// Compile a slice of potential hook recipients
|
||||
candidates := server.getQueueCandidates(d)
|
||||
candidates := s.getQueueCandidates(d)
|
||||
for _, hook := range candidates {
|
||||
// Calculate all matching fence messages for all candidates and append
|
||||
// them to the appropriate message slice
|
||||
|
@ -280,22 +280,22 @@ func (server *Server) queueHooks(d *commandDetails) error {
|
|||
// Publish all channel messages if any exist
|
||||
if len(cmsgs) > 0 {
|
||||
for _, m := range cmsgs {
|
||||
server.Publish(gjson.Get(m, "hook").String(), m)
|
||||
s.Publish(gjson.Get(m, "hook").String(), m)
|
||||
}
|
||||
}
|
||||
|
||||
// Queue the webhook messages in the buntdb database
|
||||
err := server.qdb.Update(func(tx *buntdb.Tx) error {
|
||||
err := s.qdb.Update(func(tx *buntdb.Tx) error {
|
||||
for _, msg := range wmsgs {
|
||||
server.qidx++ // increment the log id
|
||||
key := hookLogPrefix + uint64ToString(server.qidx)
|
||||
s.qidx++ // increment the log id
|
||||
key := hookLogPrefix + uint64ToString(s.qidx)
|
||||
_, _, err := tx.Set(key, msg, hookLogSetDefaults)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debugf("queued hook: %d", server.qidx)
|
||||
log.Debugf("queued hook: %d", s.qidx)
|
||||
}
|
||||
_, _, err := tx.Set("hook:idx", uint64ToString(server.qidx), nil)
|
||||
_, _, err := tx.Set("hook:idx", uint64ToString(s.qidx), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -365,7 +365,7 @@ func (s liveAOFSwitches) Error() string {
|
|||
return goingLive
|
||||
}
|
||||
|
||||
func (server *Server) cmdAOFMD5(msg *Message) (res resp.Value, err error) {
|
||||
func (s *Server) cmdAOFMD5(msg *Message) (res resp.Value, err error) {
|
||||
start := time.Now()
|
||||
vs := msg.Args[1:]
|
||||
var ok bool
|
||||
|
@ -388,7 +388,7 @@ func (server *Server) cmdAOFMD5(msg *Message) (res resp.Value, err error) {
|
|||
if err != nil || size < 0 {
|
||||
return NOMessage, errInvalidArgument(ssize)
|
||||
}
|
||||
sum, err := server.checksum(pos, size)
|
||||
sum, err := s.checksum(pos, size)
|
||||
if err != nil {
|
||||
return NOMessage, err
|
||||
}
|
||||
|
@ -402,8 +402,8 @@ func (server *Server) cmdAOFMD5(msg *Message) (res resp.Value, err error) {
|
|||
return res, nil
|
||||
}
|
||||
|
||||
func (server *Server) cmdAOF(msg *Message) (res resp.Value, err error) {
|
||||
if server.aof == nil {
|
||||
func (s *Server) cmdAOF(msg *Message) (res resp.Value, err error) {
|
||||
if s.aof == nil {
|
||||
return NOMessage, errors.New("aof disabled")
|
||||
}
|
||||
vs := msg.Args[1:]
|
||||
|
@ -420,7 +420,7 @@ func (server *Server) cmdAOF(msg *Message) (res resp.Value, err error) {
|
|||
if err != nil || pos < 0 {
|
||||
return NOMessage, errInvalidArgument(spos)
|
||||
}
|
||||
f, err := os.Open(server.aof.Name())
|
||||
f, err := os.Open(s.aof.Name())
|
||||
if err != nil {
|
||||
return NOMessage, err
|
||||
}
|
||||
|
@ -432,19 +432,19 @@ func (server *Server) cmdAOF(msg *Message) (res resp.Value, err error) {
|
|||
if n < pos {
|
||||
return NOMessage, errors.New("pos is too big, must be less that the aof_size of leader")
|
||||
}
|
||||
var s liveAOFSwitches
|
||||
s.pos = pos
|
||||
return NOMessage, s
|
||||
var ls liveAOFSwitches
|
||||
ls.pos = pos
|
||||
return NOMessage, ls
|
||||
}
|
||||
|
||||
func (server *Server) liveAOF(pos int64, conn net.Conn, rd *PipelineReader, msg *Message) error {
|
||||
server.mu.Lock()
|
||||
server.aofconnM[conn] = true
|
||||
server.mu.Unlock()
|
||||
func (s *Server) liveAOF(pos int64, conn net.Conn, rd *PipelineReader, msg *Message) error {
|
||||
s.mu.Lock()
|
||||
s.aofconnM[conn] = true
|
||||
s.mu.Unlock()
|
||||
defer func() {
|
||||
server.mu.Lock()
|
||||
delete(server.aofconnM, conn)
|
||||
server.mu.Unlock()
|
||||
s.mu.Lock()
|
||||
delete(s.aofconnM, conn)
|
||||
s.mu.Unlock()
|
||||
conn.Close()
|
||||
}()
|
||||
|
||||
|
@ -452,9 +452,9 @@ func (server *Server) liveAOF(pos int64, conn net.Conn, rd *PipelineReader, msg
|
|||
return err
|
||||
}
|
||||
|
||||
server.mu.RLock()
|
||||
f, err := os.Open(server.aof.Name())
|
||||
server.mu.RUnlock()
|
||||
s.mu.RLock()
|
||||
f, err := os.Open(s.aof.Name())
|
||||
s.mu.RUnlock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -516,9 +516,9 @@ func (server *Server) liveAOF(pos int64, conn net.Conn, rd *PipelineReader, msg
|
|||
}
|
||||
continue
|
||||
}
|
||||
server.fcond.L.Lock()
|
||||
server.fcond.Wait()
|
||||
server.fcond.L.Unlock()
|
||||
s.fcond.L.Lock()
|
||||
s.fcond.Wait()
|
||||
s.fcond.L.Unlock()
|
||||
}
|
||||
}()
|
||||
if err != nil {
|
||||
|
|
|
@ -83,15 +83,15 @@ func NewLegacyAOFReader(r io.Reader) *LegacyAOFReader {
|
|||
return rd
|
||||
}
|
||||
|
||||
func (c *Server) migrateAOF() error {
|
||||
_, err := os.Stat(path.Join(c.dir, "appendonly.aof"))
|
||||
func (s *Server) migrateAOF() error {
|
||||
_, err := os.Stat(path.Join(s.dir, "appendonly.aof"))
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
_, err = os.Stat(path.Join(c.dir, "aof"))
|
||||
_, err = os.Stat(path.Join(s.dir, "aof"))
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil
|
||||
|
@ -99,13 +99,13 @@ func (c *Server) migrateAOF() error {
|
|||
return err
|
||||
}
|
||||
log.Warn("Migrating aof to new format")
|
||||
newf, err := os.Create(path.Join(c.dir, "migrate.aof"))
|
||||
newf, err := os.Create(path.Join(s.dir, "migrate.aof"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer newf.Close()
|
||||
|
||||
oldf, err := os.Open(path.Join(c.dir, "aof"))
|
||||
oldf, err := os.Open(path.Join(s.dir, "aof"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -155,5 +155,5 @@ func (c *Server) migrateAOF() error {
|
|||
oldf.Close()
|
||||
newf.Close()
|
||||
log.Debugf("%d items: %.0f/sec", count, float64(count)/(float64(time.Now().Sub(start))/float64(time.Second)))
|
||||
return os.Rename(path.Join(c.dir, "migrate.aof"), path.Join(c.dir, "appendonly.aof"))
|
||||
return os.Rename(path.Join(s.dir, "migrate.aof"), path.Join(s.dir, "appendonly.aof"))
|
||||
}
|
||||
|
|
|
@ -14,12 +14,12 @@ import (
|
|||
)
|
||||
|
||||
// checksum performs a simple md5 checksum on the aof file
|
||||
func (c *Server) checksum(pos, size int64) (sum string, err error) {
|
||||
if pos+size > int64(c.aofsz) {
|
||||
func (s *Server) checksum(pos, size int64) (sum string, err error) {
|
||||
if pos+size > int64(s.aofsz) {
|
||||
return "", io.EOF
|
||||
}
|
||||
var f *os.File
|
||||
f, err = os.Open(c.aof.Name())
|
||||
f, err = os.Open(s.aof.Name())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ func (c *Server) checksum(pos, size int64) (sum string, err error) {
|
|||
sumr := md5.New()
|
||||
err = func() error {
|
||||
if size == 0 {
|
||||
n, err := f.Seek(int64(c.aofsz), 0)
|
||||
n, err := f.Seek(int64(s.aofsz), 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -74,8 +74,8 @@ func connAOFMD5(conn *RESPConn, pos, size int64) (sum string, err error) {
|
|||
return sum, nil
|
||||
}
|
||||
|
||||
func (c *Server) matchChecksums(conn *RESPConn, pos, size int64) (match bool, err error) {
|
||||
sum, err := c.checksum(pos, size)
|
||||
func (s *Server) matchChecksums(conn *RESPConn, pos, size int64) (match bool, err error) {
|
||||
sum, err := s.checksum(pos, size)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
return false, nil
|
||||
|
@ -138,16 +138,16 @@ func getEndOfLastValuePositionInFile(fname string, startPos int64) (int64, error
|
|||
|
||||
// followCheckSome is not a full checksum. It just "checks some" data.
|
||||
// We will do some various checksums on the leader until we find the correct position to start at.
|
||||
func (c *Server) followCheckSome(addr string, followc int) (pos int64, err error) {
|
||||
func (s *Server) followCheckSome(addr string, followc int) (pos int64, err error) {
|
||||
if core.ShowDebugMessages {
|
||||
log.Debug("follow:", addr, ":check some")
|
||||
}
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.followc.get() != followc {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.followc.get() != followc {
|
||||
return 0, errNoLongerFollowing
|
||||
}
|
||||
if c.aofsz < checksumsz {
|
||||
if s.aofsz < checksumsz {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
|
@ -158,9 +158,9 @@ func (c *Server) followCheckSome(addr string, followc int) (pos int64, err error
|
|||
defer conn.Close()
|
||||
|
||||
min := int64(0)
|
||||
max := int64(c.aofsz) - checksumsz
|
||||
limit := int64(c.aofsz)
|
||||
match, err := c.matchChecksums(conn, min, checksumsz)
|
||||
max := int64(s.aofsz) - checksumsz
|
||||
limit := int64(s.aofsz)
|
||||
match, err := s.matchChecksums(conn, min, checksumsz)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
@ -172,7 +172,7 @@ func (c *Server) followCheckSome(addr string, followc int) (pos int64, err error
|
|||
pos = min
|
||||
break
|
||||
} else {
|
||||
match, err = c.matchChecksums(conn, max, checksumsz)
|
||||
match, err = s.matchChecksums(conn, max, checksumsz)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
@ -186,10 +186,10 @@ func (c *Server) followCheckSome(addr string, followc int) (pos int64, err error
|
|||
}
|
||||
}
|
||||
fullpos := pos
|
||||
fname := c.aof.Name()
|
||||
fname := s.aof.Name()
|
||||
if pos == 0 {
|
||||
c.aof.Close()
|
||||
c.aof, err = os.Create(fname)
|
||||
s.aof.Close()
|
||||
s.aof, err = os.Create(fname)
|
||||
if err != nil {
|
||||
log.Fatalf("could not recreate aof, possible data loss. %s", err.Error())
|
||||
return 0, err
|
||||
|
@ -199,7 +199,7 @@ func (c *Server) followCheckSome(addr string, followc int) (pos int64, err error
|
|||
|
||||
// we want to truncate at a command location
|
||||
// search for nearest command
|
||||
pos, err = getEndOfLastValuePositionInFile(c.aof.Name(), fullpos)
|
||||
pos, err = getEndOfLastValuePositionInFile(s.aof.Name(), fullpos)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
@ -211,24 +211,24 @@ func (c *Server) followCheckSome(addr string, followc int) (pos int64, err error
|
|||
}
|
||||
log.Warnf("truncating aof to %d", pos)
|
||||
// any errror below are fatal.
|
||||
c.aof.Close()
|
||||
s.aof.Close()
|
||||
if err := os.Truncate(fname, pos); err != nil {
|
||||
log.Fatalf("could not truncate aof, possible data loss. %s", err.Error())
|
||||
return 0, err
|
||||
}
|
||||
c.aof, err = os.OpenFile(fname, os.O_CREATE|os.O_RDWR, 0600)
|
||||
s.aof, err = os.OpenFile(fname, os.O_CREATE|os.O_RDWR, 0600)
|
||||
if err != nil {
|
||||
log.Fatalf("could not create aof, possible data loss. %s", err.Error())
|
||||
return 0, err
|
||||
}
|
||||
// reset the entire system.
|
||||
log.Infof("reloading aof commands")
|
||||
c.reset()
|
||||
if err := c.loadAOF(); err != nil {
|
||||
s.reset()
|
||||
if err := s.loadAOF(); err != nil {
|
||||
log.Fatalf("could not reload aof, possible data loss. %s", err.Error())
|
||||
return 0, err
|
||||
}
|
||||
if int64(c.aofsz) != pos {
|
||||
if int64(s.aofsz) != pos {
|
||||
log.Fatalf("aof size mismatch during reload, possible data loss.")
|
||||
return 0, errors.New("?")
|
||||
}
|
||||
|
|
|
@ -52,7 +52,7 @@ func (arr byID) Swap(a, b int) {
|
|||
arr[a], arr[b] = arr[b], arr[a]
|
||||
}
|
||||
|
||||
func (c *Server) cmdClient(msg *Message, client *Client) (resp.Value, error) {
|
||||
func (s *Server) cmdClient(msg *Message, client *Client) (resp.Value, error) {
|
||||
start := time.Now()
|
||||
|
||||
if len(msg.Args) == 1 {
|
||||
|
@ -67,11 +67,11 @@ func (c *Server) cmdClient(msg *Message, client *Client) (resp.Value, error) {
|
|||
return NOMessage, errInvalidNumberOfArguments
|
||||
}
|
||||
var list []*Client
|
||||
c.connsmu.RLock()
|
||||
for _, cc := range c.conns {
|
||||
s.connsmu.RLock()
|
||||
for _, cc := range s.conns {
|
||||
list = append(list, cc)
|
||||
}
|
||||
c.connsmu.RUnlock()
|
||||
s.connsmu.RUnlock()
|
||||
sort.Sort(byID(list))
|
||||
now := time.Now()
|
||||
var buf []byte
|
||||
|
@ -190,8 +190,8 @@ func (c *Server) cmdClient(msg *Message, client *Client) (resp.Value, error) {
|
|||
}
|
||||
}
|
||||
var cclose *Client
|
||||
c.connsmu.RLock()
|
||||
for _, cc := range c.conns {
|
||||
s.connsmu.RLock()
|
||||
for _, cc := range s.conns {
|
||||
if useID && fmt.Sprintf("%d", cc.id) == id {
|
||||
cclose = cc
|
||||
break
|
||||
|
@ -200,7 +200,7 @@ func (c *Server) cmdClient(msg *Message, client *Client) (resp.Value, error) {
|
|||
break
|
||||
}
|
||||
}
|
||||
c.connsmu.RUnlock()
|
||||
s.connsmu.RUnlock()
|
||||
if cclose == nil {
|
||||
return NOMessage, errors.New("No such client")
|
||||
}
|
||||
|
|
|
@ -326,7 +326,7 @@ func (config *Config) getProperty(name string) string {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *Server) cmdConfigGet(msg *Message) (res resp.Value, err error) {
|
||||
func (s *Server) cmdConfigGet(msg *Message) (res resp.Value, err error) {
|
||||
start := time.Now()
|
||||
vs := msg.Args[1:]
|
||||
var ok bool
|
||||
|
@ -338,7 +338,7 @@ func (c *Server) cmdConfigGet(msg *Message) (res resp.Value, err error) {
|
|||
if len(vs) != 0 {
|
||||
return NOMessage, errInvalidNumberOfArguments
|
||||
}
|
||||
m := c.config.getProperties(name)
|
||||
m := s.config.getProperties(name)
|
||||
switch msg.OutputType {
|
||||
case JSON:
|
||||
data, err := json.Marshal(m)
|
||||
|
@ -352,7 +352,7 @@ func (c *Server) cmdConfigGet(msg *Message) (res resp.Value, err error) {
|
|||
}
|
||||
return
|
||||
}
|
||||
func (c *Server) cmdConfigSet(msg *Message) (res resp.Value, err error) {
|
||||
func (s *Server) cmdConfigSet(msg *Message) (res resp.Value, err error) {
|
||||
start := time.Now()
|
||||
vs := msg.Args[1:]
|
||||
var ok bool
|
||||
|
@ -370,19 +370,19 @@ func (c *Server) cmdConfigSet(msg *Message) (res resp.Value, err error) {
|
|||
if len(vs) != 0 {
|
||||
return NOMessage, errInvalidNumberOfArguments
|
||||
}
|
||||
if err := c.config.setProperty(name, value, false); err != nil {
|
||||
if err := s.config.setProperty(name, value, false); err != nil {
|
||||
return NOMessage, err
|
||||
}
|
||||
return OKMessage(msg, start), nil
|
||||
}
|
||||
func (c *Server) cmdConfigRewrite(msg *Message) (res resp.Value, err error) {
|
||||
func (s *Server) cmdConfigRewrite(msg *Message) (res resp.Value, err error) {
|
||||
start := time.Now()
|
||||
vs := msg.Args[1:]
|
||||
|
||||
if len(vs) != 0 {
|
||||
return NOMessage, errInvalidNumberOfArguments
|
||||
}
|
||||
c.config.write(true)
|
||||
s.config.write(true)
|
||||
return OKMessage(msg, start), nil
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ func randMassInsertPosition(minLat, minLon, maxLat, maxLon float64) (float64, fl
|
|||
return lat, lon
|
||||
}
|
||||
|
||||
func (c *Server) cmdMassInsert(msg *Message) (res resp.Value, err error) {
|
||||
func (s *Server) cmdMassInsert(msg *Message) (res resp.Value, err error) {
|
||||
start := time.Now()
|
||||
vs := msg.Args[1:]
|
||||
|
||||
|
@ -76,17 +76,17 @@ func (c *Server) cmdMassInsert(msg *Message) (res resp.Value, err error) {
|
|||
}
|
||||
|
||||
docmd := func(args []string) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
var nmsg Message
|
||||
nmsg = *msg
|
||||
nmsg._command = ""
|
||||
nmsg.Args = args
|
||||
_, d, err := c.command(&nmsg, nil)
|
||||
_, d, err := s.command(&nmsg, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.writeAOF(nmsg.Args, &d)
|
||||
return s.writeAOF(nmsg.Args, &d)
|
||||
|
||||
}
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
|
@ -146,7 +146,7 @@ func (c *Server) cmdMassInsert(msg *Message) (res resp.Value, err error) {
|
|||
return OKMessage(msg, start), nil
|
||||
}
|
||||
|
||||
func (c *Server) cmdSleep(msg *Message) (res resp.Value, err error) {
|
||||
func (s *Server) cmdSleep(msg *Message) (res resp.Value, err error) {
|
||||
start := time.Now()
|
||||
if len(msg.Args) != 2 {
|
||||
return NOMessage, errInvalidNumberOfArguments
|
||||
|
|
|
@ -9,12 +9,12 @@ import (
|
|||
)
|
||||
|
||||
// clearIDExpires clears a single item from the expires list.
|
||||
func (c *Server) clearIDExpires(key, id string) (cleared bool) {
|
||||
if c.expires.Len() > 0 {
|
||||
if idm, ok := c.expires.Get(key); ok {
|
||||
func (s *Server) clearIDExpires(key, id string) (cleared bool) {
|
||||
if s.expires.Len() > 0 {
|
||||
if idm, ok := s.expires.Get(key); ok {
|
||||
if _, ok := idm.(*rhh.Map).Delete(id); ok {
|
||||
if idm.(*rhh.Map).Len() == 0 {
|
||||
c.expires.Delete(key)
|
||||
s.expires.Delete(key)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
@ -24,31 +24,31 @@ func (c *Server) clearIDExpires(key, id string) (cleared bool) {
|
|||
}
|
||||
|
||||
// clearKeyExpires clears all items that are marked as expires from a single key.
|
||||
func (c *Server) clearKeyExpires(key string) {
|
||||
c.expires.Delete(key)
|
||||
func (s *Server) clearKeyExpires(key string) {
|
||||
s.expires.Delete(key)
|
||||
}
|
||||
|
||||
// moveKeyExpires moves all items that are marked as expires from a key to a newKey.
|
||||
func (c *Server) moveKeyExpires(key, newKey string) {
|
||||
if idm, ok := c.expires.Delete(key); ok {
|
||||
c.expires.Set(newKey, idm)
|
||||
func (s *Server) moveKeyExpires(key, newKey string) {
|
||||
if idm, ok := s.expires.Delete(key); ok {
|
||||
s.expires.Set(newKey, idm)
|
||||
}
|
||||
}
|
||||
|
||||
// expireAt marks an item as expires at a specific time.
|
||||
func (c *Server) expireAt(key, id string, at time.Time) {
|
||||
idm, ok := c.expires.Get(key)
|
||||
func (s *Server) expireAt(key, id string, at time.Time) {
|
||||
idm, ok := s.expires.Get(key)
|
||||
if !ok {
|
||||
idm = rhh.New(0)
|
||||
c.expires.Set(key, idm)
|
||||
s.expires.Set(key, idm)
|
||||
}
|
||||
idm.(*rhh.Map).Set(id, at.UnixNano())
|
||||
}
|
||||
|
||||
// getExpires returns the when an item expires.
|
||||
func (c *Server) getExpires(key, id string) (at time.Time, ok bool) {
|
||||
if c.expires.Len() > 0 {
|
||||
if idm, ok := c.expires.Get(key); ok {
|
||||
func (s *Server) getExpires(key, id string) (at time.Time, ok bool) {
|
||||
if s.expires.Len() > 0 {
|
||||
if idm, ok := s.expires.Get(key); ok {
|
||||
if atv, ok := idm.(*rhh.Map).Get(id); ok {
|
||||
return time.Unix(0, atv.(int64)), true
|
||||
}
|
||||
|
@ -58,8 +58,8 @@ func (c *Server) getExpires(key, id string) (at time.Time, ok bool) {
|
|||
}
|
||||
|
||||
// hasExpired returns true if an item has expired.
|
||||
func (c *Server) hasExpired(key, id string) bool {
|
||||
if at, ok := c.getExpires(key, id); ok {
|
||||
func (s *Server) hasExpired(key, id string) bool {
|
||||
if at, ok := s.getExpires(key, id); ok {
|
||||
return time.Now().After(at)
|
||||
}
|
||||
return false
|
||||
|
@ -70,26 +70,26 @@ const bgExpireSegmentSize = 20
|
|||
|
||||
// expirePurgeSweep is ran from backgroundExpiring operation and performs
|
||||
// segmented sweep of the expires list
|
||||
func (c *Server) expirePurgeSweep(rng *rand.Rand) (purged int) {
|
||||
func (s *Server) expirePurgeSweep(rng *rand.Rand) (purged int) {
|
||||
now := time.Now().UnixNano()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.expires.Len() == 0 {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.expires.Len() == 0 {
|
||||
return 0
|
||||
}
|
||||
for i := 0; i < bgExpireSegmentSize; i++ {
|
||||
if key, idm, ok := c.expires.GetPos(rng.Uint64()); ok {
|
||||
if key, idm, ok := s.expires.GetPos(rng.Uint64()); ok {
|
||||
id, atv, ok := idm.(*rhh.Map).GetPos(rng.Uint64())
|
||||
if ok {
|
||||
if now > atv.(int64) {
|
||||
// expired, purge from database
|
||||
msg := &Message{}
|
||||
msg.Args = []string{"del", key, id}
|
||||
_, d, err := c.cmdDel(msg)
|
||||
_, d, err := s.cmdDel(msg)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := c.writeAOF(msg.Args, &d); err != nil {
|
||||
if err := s.writeAOF(msg.Args, &d); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
purged++
|
||||
|
@ -97,21 +97,21 @@ func (c *Server) expirePurgeSweep(rng *rand.Rand) (purged int) {
|
|||
}
|
||||
}
|
||||
// recycle the lock
|
||||
c.mu.Unlock()
|
||||
c.mu.Lock()
|
||||
s.mu.Unlock()
|
||||
s.mu.Lock()
|
||||
}
|
||||
return purged
|
||||
}
|
||||
|
||||
// backgroundExpiring watches for when items that have expired must be purged
|
||||
// from the database. It's executes 10 times a seconds.
|
||||
func (c *Server) backgroundExpiring() {
|
||||
func (s *Server) backgroundExpiring() {
|
||||
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
for {
|
||||
if c.stopServer.on() {
|
||||
if s.stopServer.on() {
|
||||
return
|
||||
}
|
||||
purged := c.expirePurgeSweep(rng)
|
||||
purged := s.expirePurgeSweep(rng)
|
||||
if purged > bgExpireSegmentSize/4 {
|
||||
// do another purge immediately
|
||||
continue
|
||||
|
|
|
@ -6,8 +6,10 @@ import (
|
|||
"github.com/tidwall/geojson"
|
||||
)
|
||||
|
||||
// BinaryOp represents various operators for expressions
|
||||
type BinaryOp byte
|
||||
|
||||
// expression operator enum
|
||||
const (
|
||||
NOOP BinaryOp = iota
|
||||
AND
|
||||
|
@ -19,7 +21,8 @@ const (
|
|||
tokenRParen = ")"
|
||||
)
|
||||
|
||||
// areaExpression is (maybe negated) either an spatial object or operator + children (other expressions).
|
||||
// areaExpression is (maybe negated) either an spatial object or operator +
|
||||
// children (other expressions).
|
||||
type areaExpression struct {
|
||||
negate bool
|
||||
obj geojson.Object
|
||||
|
|
|
@ -102,7 +102,7 @@ func fenceMatch(
|
|||
if fence.roam.on {
|
||||
if details.command == "set" {
|
||||
roamNearbys, roamFaraways =
|
||||
fenceMatchRoam(sw.c, fence, details.key,
|
||||
fenceMatchRoam(sw.s, fence, details.key,
|
||||
details.id, details.obj)
|
||||
}
|
||||
if len(roamNearbys) == 0 && len(roamFaraways) == 0 {
|
||||
|
@ -272,7 +272,7 @@ func extendRoamMessage(
|
|||
math.Floor(match.meters*1000)/1000, 'f', -1, 64)
|
||||
if fence.roam.scan != "" {
|
||||
nmsg = append(nmsg, `,"scan":[`...)
|
||||
col := sw.c.getCol(fence.roam.key)
|
||||
col := sw.s.getCol(fence.roam.key)
|
||||
if col != nil {
|
||||
obj, _, ok := col.Get(match.id)
|
||||
if ok {
|
||||
|
@ -354,10 +354,10 @@ func fenceMatchObject(fence *liveFenceSwitches, obj geojson.Object) bool {
|
|||
}
|
||||
|
||||
func fenceMatchRoam(
|
||||
c *Server, fence *liveFenceSwitches,
|
||||
s *Server, fence *liveFenceSwitches,
|
||||
tkey, tid string, obj geojson.Object,
|
||||
) (nearbys, faraways []roamMatch) {
|
||||
col := c.getCol(fence.roam.key)
|
||||
col := s.getCol(fence.roam.key)
|
||||
if col == nil {
|
||||
return
|
||||
}
|
||||
|
@ -373,7 +373,7 @@ func fenceMatchRoam(
|
|||
col.Intersects(geojson.NewRect(rect), 0, nil, nil, func(
|
||||
id string, obj2 geojson.Object, fields []float64,
|
||||
) bool {
|
||||
if c.hasExpired(fence.roam.key, id) {
|
||||
if s.hasExpired(fence.roam.key, id) {
|
||||
return true
|
||||
}
|
||||
var idMatch bool
|
||||
|
@ -410,7 +410,7 @@ func fenceMatchRoam(
|
|||
})
|
||||
for id := range prevNearbys {
|
||||
obj2, _, ok := col.Get(id)
|
||||
if ok && !c.hasExpired(fence.roam.key, id) {
|
||||
if ok && !s.hasExpired(fence.roam.key, id) {
|
||||
faraways = append(faraways, roamMatch{
|
||||
id: id,
|
||||
obj: obj2,
|
||||
|
|
|
@ -18,7 +18,7 @@ var errNoLongerFollowing = errors.New("no longer following")
|
|||
|
||||
const checksumsz = 512 * 1024
|
||||
|
||||
func (c *Server) cmdFollow(msg *Message) (res resp.Value, err error) {
|
||||
func (s *Server) cmdFollow(msg *Message) (res resp.Value, err error) {
|
||||
start := time.Now()
|
||||
vs := msg.Args[1:]
|
||||
var ok bool
|
||||
|
@ -37,58 +37,58 @@ func (c *Server) cmdFollow(msg *Message) (res resp.Value, err error) {
|
|||
sport = strings.ToLower(sport)
|
||||
var update bool
|
||||
if host == "no" && sport == "one" {
|
||||
update = c.config.followHost() != "" || c.config.followPort() != 0
|
||||
c.config.setFollowHost("")
|
||||
c.config.setFollowPort(0)
|
||||
update = s.config.followHost() != "" || s.config.followPort() != 0
|
||||
s.config.setFollowHost("")
|
||||
s.config.setFollowPort(0)
|
||||
} else {
|
||||
n, err := strconv.ParseUint(sport, 10, 64)
|
||||
if err != nil {
|
||||
return NOMessage, errInvalidArgument(sport)
|
||||
}
|
||||
port := int(n)
|
||||
update = c.config.followHost() != host || c.config.followPort() != port
|
||||
auth := c.config.leaderAuth()
|
||||
update = s.config.followHost() != host || s.config.followPort() != port
|
||||
auth := s.config.leaderAuth()
|
||||
if update {
|
||||
c.mu.Unlock()
|
||||
s.mu.Unlock()
|
||||
conn, err := DialTimeout(fmt.Sprintf("%s:%d", host, port), time.Second*2)
|
||||
if err != nil {
|
||||
c.mu.Lock()
|
||||
s.mu.Lock()
|
||||
return NOMessage, fmt.Errorf("cannot follow: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
if auth != "" {
|
||||
if err := c.followDoLeaderAuth(conn, auth); err != nil {
|
||||
if err := s.followDoLeaderAuth(conn, auth); err != nil {
|
||||
return NOMessage, fmt.Errorf("cannot follow: %v", err)
|
||||
}
|
||||
}
|
||||
m, err := doServer(conn)
|
||||
if err != nil {
|
||||
c.mu.Lock()
|
||||
s.mu.Lock()
|
||||
return NOMessage, fmt.Errorf("cannot follow: %v", err)
|
||||
}
|
||||
if m["id"] == "" {
|
||||
c.mu.Lock()
|
||||
s.mu.Lock()
|
||||
return NOMessage, fmt.Errorf("cannot follow: invalid id")
|
||||
}
|
||||
if m["id"] == c.config.serverID() {
|
||||
c.mu.Lock()
|
||||
if m["id"] == s.config.serverID() {
|
||||
s.mu.Lock()
|
||||
return NOMessage, fmt.Errorf("cannot follow self")
|
||||
}
|
||||
if m["following"] != "" {
|
||||
c.mu.Lock()
|
||||
s.mu.Lock()
|
||||
return NOMessage, fmt.Errorf("cannot follow a follower")
|
||||
}
|
||||
c.mu.Lock()
|
||||
s.mu.Lock()
|
||||
}
|
||||
c.config.setFollowHost(host)
|
||||
c.config.setFollowPort(port)
|
||||
s.config.setFollowHost(host)
|
||||
s.config.setFollowPort(port)
|
||||
}
|
||||
c.config.write(false)
|
||||
s.config.write(false)
|
||||
if update {
|
||||
c.followc.add(1)
|
||||
if c.config.followHost() != "" {
|
||||
s.followc.add(1)
|
||||
if s.config.followHost() != "" {
|
||||
log.Infof("following new host '%s' '%s'.", host, sport)
|
||||
go c.follow(c.config.followHost(), c.config.followPort(), c.followc.get())
|
||||
go s.follow(s.config.followHost(), s.config.followPort(), s.followc.get())
|
||||
} else {
|
||||
log.Infof("following no one")
|
||||
}
|
||||
|
@ -97,7 +97,7 @@ func (c *Server) cmdFollow(msg *Message) (res resp.Value, err error) {
|
|||
}
|
||||
|
||||
// cmdReplConf is a command handler that sets replication configuration info
|
||||
func (c *Server) cmdReplConf(msg *Message, client *Client) (res resp.Value, err error) {
|
||||
func (s *Server) cmdReplConf(msg *Message, client *Client) (res resp.Value, err error) {
|
||||
start := time.Now()
|
||||
vs := msg.Args[1:]
|
||||
var ok bool
|
||||
|
@ -121,7 +121,7 @@ func (c *Server) cmdReplConf(msg *Message, client *Client) (res resp.Value, err
|
|||
}
|
||||
|
||||
// Apply the replication port to the client and return
|
||||
for _, c := range c.conns {
|
||||
for _, c := range s.conns {
|
||||
if c.remoteAddr == client.remoteAddr {
|
||||
c.replPort = port
|
||||
return OKMessage(msg, start), nil
|
||||
|
@ -147,30 +147,30 @@ func doServer(conn *RESPConn) (map[string]string, error) {
|
|||
return m, err
|
||||
}
|
||||
|
||||
func (c *Server) followHandleCommand(args []string, followc int, w io.Writer) (int, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.followc.get() != followc {
|
||||
return c.aofsz, errNoLongerFollowing
|
||||
func (s *Server) followHandleCommand(args []string, followc int, w io.Writer) (int, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.followc.get() != followc {
|
||||
return s.aofsz, errNoLongerFollowing
|
||||
}
|
||||
msg := &Message{Args: args}
|
||||
|
||||
_, d, err := c.command(msg, nil)
|
||||
_, d, err := s.command(msg, nil)
|
||||
if err != nil {
|
||||
if commandErrIsFatal(err) {
|
||||
return c.aofsz, err
|
||||
return s.aofsz, err
|
||||
}
|
||||
}
|
||||
if err := c.writeAOF(args, &d); err != nil {
|
||||
return c.aofsz, err
|
||||
if err := s.writeAOF(args, &d); err != nil {
|
||||
return s.aofsz, err
|
||||
}
|
||||
if len(c.aofbuf) > 10240 {
|
||||
c.flushAOF(false)
|
||||
if len(s.aofbuf) > 10240 {
|
||||
s.flushAOF(false)
|
||||
}
|
||||
return c.aofsz, nil
|
||||
return s.aofsz, nil
|
||||
}
|
||||
|
||||
func (c *Server) followDoLeaderAuth(conn *RESPConn, auth string) error {
|
||||
func (s *Server) followDoLeaderAuth(conn *RESPConn, auth string) error {
|
||||
v, err := conn.Do("auth", auth)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -184,14 +184,14 @@ func (c *Server) followDoLeaderAuth(conn *RESPConn, auth string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *Server) followStep(host string, port int, followc int) error {
|
||||
if c.followc.get() != followc {
|
||||
func (s *Server) followStep(host string, port int, followc int) error {
|
||||
if s.followc.get() != followc {
|
||||
return errNoLongerFollowing
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.fcup = false
|
||||
auth := c.config.leaderAuth()
|
||||
c.mu.Unlock()
|
||||
s.mu.Lock()
|
||||
s.fcup = false
|
||||
auth := s.config.leaderAuth()
|
||||
s.mu.Unlock()
|
||||
addr := fmt.Sprintf("%s:%d", host, port)
|
||||
|
||||
// check if we are following self
|
||||
|
@ -201,7 +201,7 @@ func (c *Server) followStep(host string, port int, followc int) error {
|
|||
}
|
||||
defer conn.Close()
|
||||
if auth != "" {
|
||||
if err := c.followDoLeaderAuth(conn, auth); err != nil {
|
||||
if err := s.followDoLeaderAuth(conn, auth); err != nil {
|
||||
return fmt.Errorf("cannot follow: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -213,7 +213,7 @@ func (c *Server) followStep(host string, port int, followc int) error {
|
|||
if m["id"] == "" {
|
||||
return fmt.Errorf("cannot follow: invalid id")
|
||||
}
|
||||
if m["id"] == c.config.serverID() {
|
||||
if m["id"] == s.config.serverID() {
|
||||
return fmt.Errorf("cannot follow self")
|
||||
}
|
||||
if m["following"] != "" {
|
||||
|
@ -221,13 +221,13 @@ func (c *Server) followStep(host string, port int, followc int) error {
|
|||
}
|
||||
|
||||
// verify checksum
|
||||
pos, err := c.followCheckSome(addr, followc)
|
||||
pos, err := s.followCheckSome(addr, followc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Send the replication port to the leader
|
||||
v, err := conn.Do("replconf", "listening-port", c.port)
|
||||
v, err := conn.Do("replconf", "listening-port", s.port)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -262,10 +262,10 @@ func (c *Server) followStep(host string, port int, followc int) error {
|
|||
|
||||
caughtUp := pos >= aofSize
|
||||
if caughtUp {
|
||||
c.mu.Lock()
|
||||
c.fcup = true
|
||||
c.fcuponce = true
|
||||
c.mu.Unlock()
|
||||
s.mu.Lock()
|
||||
s.fcup = true
|
||||
s.fcuponce = true
|
||||
s.mu.Unlock()
|
||||
log.Info("caught up")
|
||||
}
|
||||
nullw := ioutil.Discard
|
||||
|
@ -283,18 +283,18 @@ func (c *Server) followStep(host string, port int, followc int) error {
|
|||
svals[i] = vals[i].String()
|
||||
}
|
||||
|
||||
aofsz, err := c.followHandleCommand(svals, followc, nullw)
|
||||
aofsz, err := s.followHandleCommand(svals, followc, nullw)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !caughtUp {
|
||||
if aofsz >= int(aofSize) {
|
||||
caughtUp = true
|
||||
c.mu.Lock()
|
||||
c.flushAOF(false)
|
||||
c.fcup = true
|
||||
c.fcuponce = true
|
||||
c.mu.Unlock()
|
||||
s.mu.Lock()
|
||||
s.flushAOF(false)
|
||||
s.fcup = true
|
||||
s.fcuponce = true
|
||||
s.mu.Unlock()
|
||||
log.Info("caught up")
|
||||
}
|
||||
}
|
||||
|
@ -302,9 +302,9 @@ func (c *Server) followStep(host string, port int, followc int) error {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *Server) follow(host string, port int, followc int) {
|
||||
func (s *Server) follow(host string, port int, followc int) {
|
||||
for {
|
||||
err := c.followStep(host, port, followc)
|
||||
err := s.followStep(host, port, followc)
|
||||
if err == errNoLongerFollowing {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ func (a hooksByName) Swap(i, j int) {
|
|||
a[i], a[j] = a[j], a[i]
|
||||
}
|
||||
|
||||
func (c *Server) cmdSetHook(msg *Message, chanCmd bool) (
|
||||
func (s *Server) cmdSetHook(msg *Message, chanCmd bool) (
|
||||
res resp.Value, d commandDetails, err error,
|
||||
) {
|
||||
start := time.Now()
|
||||
|
@ -55,7 +55,7 @@ func (c *Server) cmdSetHook(msg *Message, chanCmd bool) (
|
|||
}
|
||||
for _, url := range strings.Split(urls, ",") {
|
||||
url = strings.TrimSpace(url)
|
||||
err := c.epc.Validate(url)
|
||||
err := s.epc.Validate(url)
|
||||
if err != nil {
|
||||
log.Errorf("sethook: %v", err)
|
||||
return resp.SimpleStringValue(""), d, errInvalidArgument(url)
|
||||
|
@ -108,15 +108,15 @@ func (c *Server) cmdSetHook(msg *Message, chanCmd bool) (
|
|||
}
|
||||
break
|
||||
}
|
||||
s, err := c.cmdSearchArgs(true, cmdlc, vs, types)
|
||||
defer s.Close()
|
||||
args, err := s.cmdSearchArgs(true, cmdlc, vs, types)
|
||||
defer args.Close()
|
||||
if err != nil {
|
||||
return NOMessage, d, err
|
||||
}
|
||||
if !s.fence {
|
||||
if !args.fence {
|
||||
return NOMessage, d, errors.New("missing FENCE argument")
|
||||
}
|
||||
s.cmd = cmdlc
|
||||
args.cmd = cmdlc
|
||||
cmsg := &Message{}
|
||||
*cmsg = *msg
|
||||
cmsg.Args = make([]string, len(commandvs))
|
||||
|
@ -130,33 +130,34 @@ func (c *Server) cmdSetHook(msg *Message, chanCmd bool) (
|
|||
sort.Sort(hookMetaByName(metas))
|
||||
|
||||
hook := &Hook{
|
||||
Key: s.key,
|
||||
Key: args.key,
|
||||
Name: name,
|
||||
Endpoints: endpoints,
|
||||
Fence: &s,
|
||||
Fence: &args,
|
||||
Message: cmsg,
|
||||
epm: c.epc,
|
||||
epm: s.epc,
|
||||
Metas: metas,
|
||||
channel: chanCmd,
|
||||
cond: sync.NewCond(&sync.Mutex{}),
|
||||
counter: &c.statsTotalMsgsSent,
|
||||
counter: &s.statsTotalMsgsSent,
|
||||
}
|
||||
if expiresSet {
|
||||
hook.expires =
|
||||
time.Now().Add(time.Duration(expires * float64(time.Second)))
|
||||
}
|
||||
if !chanCmd {
|
||||
hook.db = c.qdb
|
||||
hook.db = s.qdb
|
||||
}
|
||||
var wr bytes.Buffer
|
||||
hook.ScanWriter, err = c.newScanWriter(
|
||||
&wr, cmsg, s.key, s.output, s.precision, s.glob, false,
|
||||
s.cursor, s.limit, s.wheres, s.whereins, s.whereevals, s.nofields)
|
||||
hook.ScanWriter, err = s.newScanWriter(
|
||||
&wr, cmsg, args.key, args.output, args.precision, args.glob, false,
|
||||
args.cursor, args.limit, args.wheres, args.whereins, args.whereevals,
|
||||
args.nofields)
|
||||
if err != nil {
|
||||
|
||||
return NOMessage, d, err
|
||||
}
|
||||
prevHook := c.hooks[name]
|
||||
prevHook := s.hooks[name]
|
||||
if prevHook != nil {
|
||||
if prevHook.channel != chanCmd {
|
||||
return NOMessage, d,
|
||||
|
@ -167,7 +168,7 @@ func (c *Server) cmdSetHook(msg *Message, chanCmd bool) (
|
|||
// for good measure.
|
||||
prevHook.Signal()
|
||||
if !hook.expires.IsZero() {
|
||||
c.hookex.Push(hook)
|
||||
s.hookex.Push(hook)
|
||||
}
|
||||
switch msg.OutputType {
|
||||
case JSON:
|
||||
|
@ -177,22 +178,22 @@ func (c *Server) cmdSetHook(msg *Message, chanCmd bool) (
|
|||
}
|
||||
}
|
||||
prevHook.Close()
|
||||
delete(c.hooks, name)
|
||||
delete(c.hooksOut, name)
|
||||
delete(s.hooks, name)
|
||||
delete(s.hooksOut, name)
|
||||
}
|
||||
|
||||
d.updated = true
|
||||
d.timestamp = time.Now()
|
||||
|
||||
c.hooks[name] = hook
|
||||
s.hooks[name] = hook
|
||||
if hook.Fence.detect == nil || hook.Fence.detect["outside"] {
|
||||
c.hooksOut[name] = hook
|
||||
s.hooksOut[name] = hook
|
||||
}
|
||||
|
||||
// remove previous hook from spatial index
|
||||
if prevHook != nil && prevHook.Fence != nil && prevHook.Fence.obj != nil {
|
||||
rect := prevHook.Fence.obj.Rect()
|
||||
c.hookTree.Delete(
|
||||
s.hookTree.Delete(
|
||||
[2]float64{rect.Min.X, rect.Min.Y},
|
||||
[2]float64{rect.Max.X, rect.Max.Y},
|
||||
prevHook)
|
||||
|
@ -200,7 +201,7 @@ func (c *Server) cmdSetHook(msg *Message, chanCmd bool) (
|
|||
// add hook to spatial index
|
||||
if hook != nil && hook.Fence != nil && hook.Fence.obj != nil {
|
||||
rect := hook.Fence.obj.Rect()
|
||||
c.hookTree.Insert(
|
||||
s.hookTree.Insert(
|
||||
[2]float64{rect.Min.X, rect.Min.Y},
|
||||
[2]float64{rect.Max.X, rect.Max.Y},
|
||||
hook)
|
||||
|
@ -208,7 +209,7 @@ func (c *Server) cmdSetHook(msg *Message, chanCmd bool) (
|
|||
|
||||
hook.Open() // Opens a goroutine to notify the hook
|
||||
if !hook.expires.IsZero() {
|
||||
c.hookex.Push(hook)
|
||||
s.hookex.Push(hook)
|
||||
}
|
||||
switch msg.OutputType {
|
||||
case JSON:
|
||||
|
@ -219,7 +220,7 @@ func (c *Server) cmdSetHook(msg *Message, chanCmd bool) (
|
|||
return NOMessage, d, nil
|
||||
}
|
||||
|
||||
func (c *Server) cmdDelHook(msg *Message, chanCmd bool) (
|
||||
func (s *Server) cmdDelHook(msg *Message, chanCmd bool) (
|
||||
res resp.Value, d commandDetails, err error,
|
||||
) {
|
||||
start := time.Now()
|
||||
|
@ -233,15 +234,15 @@ func (c *Server) cmdDelHook(msg *Message, chanCmd bool) (
|
|||
if len(vs) != 0 {
|
||||
return NOMessage, d, errInvalidNumberOfArguments
|
||||
}
|
||||
if hook, ok := c.hooks[name]; ok && hook.channel == chanCmd {
|
||||
if hook, ok := s.hooks[name]; ok && hook.channel == chanCmd {
|
||||
hook.Close()
|
||||
// remove hook from maps
|
||||
delete(c.hooks, hook.Name)
|
||||
delete(c.hooksOut, hook.Name)
|
||||
delete(s.hooks, hook.Name)
|
||||
delete(s.hooksOut, hook.Name)
|
||||
// remove hook from spatial index
|
||||
if hook != nil && hook.Fence != nil && hook.Fence.obj != nil {
|
||||
rect := hook.Fence.obj.Rect()
|
||||
c.hookTree.Delete(
|
||||
s.hookTree.Delete(
|
||||
[2]float64{rect.Min.X, rect.Min.Y},
|
||||
[2]float64{rect.Max.X, rect.Max.Y},
|
||||
hook)
|
||||
|
@ -262,7 +263,7 @@ func (c *Server) cmdDelHook(msg *Message, chanCmd bool) (
|
|||
return
|
||||
}
|
||||
|
||||
func (c *Server) cmdPDelHook(msg *Message, channel bool) (
|
||||
func (s *Server) cmdPDelHook(msg *Message, channel bool) (
|
||||
res resp.Value, d commandDetails, err error,
|
||||
) {
|
||||
start := time.Now()
|
||||
|
@ -278,7 +279,7 @@ func (c *Server) cmdPDelHook(msg *Message, channel bool) (
|
|||
}
|
||||
|
||||
count := 0
|
||||
for name, hook := range c.hooks {
|
||||
for name, hook := range s.hooks {
|
||||
if hook.channel != channel {
|
||||
continue
|
||||
}
|
||||
|
@ -288,12 +289,12 @@ func (c *Server) cmdPDelHook(msg *Message, channel bool) (
|
|||
}
|
||||
hook.Close()
|
||||
// remove hook from maps
|
||||
delete(c.hooks, hook.Name)
|
||||
delete(c.hooksOut, hook.Name)
|
||||
delete(s.hooks, hook.Name)
|
||||
delete(s.hooksOut, hook.Name)
|
||||
// remove hook from spatial index
|
||||
if hook != nil && hook.Fence != nil && hook.Fence.obj != nil {
|
||||
rect := hook.Fence.obj.Rect()
|
||||
c.hookTree.Delete(
|
||||
s.hookTree.Delete(
|
||||
[2]float64{rect.Min.X, rect.Min.Y},
|
||||
[2]float64{rect.Max.X, rect.Max.Y},
|
||||
hook)
|
||||
|
@ -315,9 +316,9 @@ func (c *Server) cmdPDelHook(msg *Message, channel bool) (
|
|||
// possiblyExpireHook will evaluate a hook by it's name for expiration and
|
||||
// purge it from the database if needed. This operation is called from an
|
||||
// independent goroutine
|
||||
func (c *Server) possiblyExpireHook(name string) {
|
||||
c.mu.Lock()
|
||||
if h, ok := c.hooks[name]; ok {
|
||||
func (s *Server) possiblyExpireHook(name string) {
|
||||
s.mu.Lock()
|
||||
if h, ok := s.hooks[name]; ok {
|
||||
if !h.expires.IsZero() && time.Now().After(h.expires) {
|
||||
// purge from database
|
||||
msg := &Message{}
|
||||
|
@ -326,22 +327,22 @@ func (c *Server) possiblyExpireHook(name string) {
|
|||
} else {
|
||||
msg.Args = []string{"delhook", h.Name}
|
||||
}
|
||||
_, d, err := c.cmdDelHook(msg, h.channel)
|
||||
_, d, err := s.cmdDelHook(msg, h.channel)
|
||||
if err != nil {
|
||||
c.mu.Unlock()
|
||||
s.mu.Unlock()
|
||||
panic(err)
|
||||
}
|
||||
if err := c.writeAOF(msg.Args, &d); err != nil {
|
||||
c.mu.Unlock()
|
||||
if err := s.writeAOF(msg.Args, &d); err != nil {
|
||||
s.mu.Unlock()
|
||||
panic(err)
|
||||
}
|
||||
log.Debugf("purged hook %v", h.Name)
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (c *Server) cmdHooks(msg *Message, channel bool) (
|
||||
func (s *Server) cmdHooks(msg *Message, channel bool) (
|
||||
res resp.Value, err error,
|
||||
) {
|
||||
start := time.Now()
|
||||
|
@ -358,7 +359,7 @@ func (c *Server) cmdHooks(msg *Message, channel bool) (
|
|||
}
|
||||
|
||||
var hooks []*Hook
|
||||
for name, hook := range c.hooks {
|
||||
for name, hook := range s.hooks {
|
||||
if hook.channel != channel {
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -41,7 +41,7 @@ func jsonString(s string) string {
|
|||
return string(b)
|
||||
}
|
||||
|
||||
func isJsonNumber(data string) bool {
|
||||
func isJSONNumber(data string) bool {
|
||||
// Returns true if the given string can be encoded as a JSON number value.
|
||||
// See:
|
||||
// https://json.org
|
||||
|
@ -162,7 +162,7 @@ func jsonTimeFormat(t time.Time) string {
|
|||
return string(b)
|
||||
}
|
||||
|
||||
func (c *Server) cmdJget(msg *Message) (resp.Value, error) {
|
||||
func (s *Server) cmdJget(msg *Message) (resp.Value, error) {
|
||||
start := time.Now()
|
||||
|
||||
if len(msg.Args) < 3 {
|
||||
|
@ -187,7 +187,7 @@ func (c *Server) cmdJget(msg *Message) (resp.Value, error) {
|
|||
}
|
||||
}
|
||||
}
|
||||
col := c.getCol(key)
|
||||
col := s.getCol(key)
|
||||
if col == nil {
|
||||
if msg.OutputType == RESP {
|
||||
return resp.NullValue(), nil
|
||||
|
@ -233,7 +233,7 @@ func (c *Server) cmdJget(msg *Message) (resp.Value, error) {
|
|||
return NOMessage, nil
|
||||
}
|
||||
|
||||
func (c *Server) cmdJset(msg *Message) (res resp.Value, d commandDetails, err error) {
|
||||
func (s *Server) cmdJset(msg *Message) (res resp.Value, d commandDetails, err error) {
|
||||
// JSET key path value [RAW]
|
||||
start := time.Now()
|
||||
|
||||
|
@ -260,12 +260,12 @@ func (c *Server) cmdJset(msg *Message) (res resp.Value, d commandDetails, err er
|
|||
if !str && !raw {
|
||||
switch val {
|
||||
default:
|
||||
raw = isJsonNumber(val)
|
||||
raw = isJSONNumber(val)
|
||||
case "true", "false", "null":
|
||||
raw = true
|
||||
}
|
||||
}
|
||||
col := c.getCol(key)
|
||||
col := s.getCol(key)
|
||||
var createcol bool
|
||||
if col == nil {
|
||||
col = collection.New()
|
||||
|
@ -293,10 +293,10 @@ func (c *Server) cmdJset(msg *Message) (res resp.Value, d commandDetails, err er
|
|||
nmsg := *msg
|
||||
nmsg.Args = []string{"SET", key, id, "OBJECT", json}
|
||||
// SET key id OBJECT json
|
||||
return c.cmdSet(&nmsg, false)
|
||||
return s.cmdSet(&nmsg, false)
|
||||
}
|
||||
if createcol {
|
||||
c.setCol(key, col)
|
||||
s.setCol(key, col)
|
||||
}
|
||||
|
||||
d.key = key
|
||||
|
@ -305,7 +305,7 @@ func (c *Server) cmdJset(msg *Message) (res resp.Value, d commandDetails, err er
|
|||
d.timestamp = time.Now()
|
||||
d.updated = true
|
||||
|
||||
c.clearIDExpires(key, id)
|
||||
s.clearIDExpires(key, id)
|
||||
col.Set(d.id, d.obj, nil, nil)
|
||||
switch msg.OutputType {
|
||||
case JSON:
|
||||
|
@ -319,7 +319,7 @@ func (c *Server) cmdJset(msg *Message) (res resp.Value, d commandDetails, err er
|
|||
return NOMessage, d, nil
|
||||
}
|
||||
|
||||
func (c *Server) cmdJdel(msg *Message) (res resp.Value, d commandDetails, err error) {
|
||||
func (s *Server) cmdJdel(msg *Message) (res resp.Value, d commandDetails, err error) {
|
||||
start := time.Now()
|
||||
|
||||
if len(msg.Args) != 4 {
|
||||
|
@ -329,7 +329,7 @@ func (c *Server) cmdJdel(msg *Message) (res resp.Value, d commandDetails, err er
|
|||
id := msg.Args[2]
|
||||
path := msg.Args[3]
|
||||
|
||||
col := c.getCol(key)
|
||||
col := s.getCol(key)
|
||||
if col == nil {
|
||||
if msg.OutputType == RESP {
|
||||
return resp.IntegerValue(0), d, nil
|
||||
|
@ -362,7 +362,7 @@ func (c *Server) cmdJdel(msg *Message) (res resp.Value, d commandDetails, err er
|
|||
nmsg := *msg
|
||||
nmsg.Args = []string{"SET", key, id, "OBJECT", json}
|
||||
// SET key id OBJECT json
|
||||
return c.cmdSet(&nmsg, false)
|
||||
return s.cmdSet(&nmsg, false)
|
||||
}
|
||||
|
||||
d.key = key
|
||||
|
@ -371,7 +371,7 @@ func (c *Server) cmdJdel(msg *Message) (res resp.Value, d commandDetails, err er
|
|||
d.timestamp = time.Now()
|
||||
d.updated = true
|
||||
|
||||
c.clearIDExpires(d.key, d.id)
|
||||
s.clearIDExpires(d.key, d.id)
|
||||
col.Set(d.id, d.obj, nil, nil)
|
||||
switch msg.OutputType {
|
||||
case JSON:
|
||||
|
|
|
@ -21,7 +21,7 @@ func BenchmarkJSONMarshal(t *testing.B) {
|
|||
|
||||
func TestIsJsonNumber(t *testing.T) {
|
||||
test := func(expected bool, val string) {
|
||||
actual := isJsonNumber(val)
|
||||
actual := isJSONNumber(val)
|
||||
if expected != actual {
|
||||
t.Fatalf("Expected %t == isJsonNumber(\"%s\") but was %t", expected, val, actual)
|
||||
}
|
||||
|
@ -43,4 +43,4 @@ func TestIsJsonNumber(t *testing.T) {
|
|||
test(true, "1.0e10")
|
||||
test(true, "1E+5")
|
||||
test(true, "1E-10")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,7 +9,7 @@ import (
|
|||
"github.com/tidwall/tile38/internal/glob"
|
||||
)
|
||||
|
||||
func (c *Server) cmdKeys(msg *Message) (res resp.Value, err error) {
|
||||
func (s *Server) cmdKeys(msg *Message) (res resp.Value, err error) {
|
||||
var start = time.Now()
|
||||
vs := msg.Args[1:]
|
||||
|
||||
|
@ -74,17 +74,17 @@ func (c *Server) cmdKeys(msg *Message) (res resp.Value, err error) {
|
|||
// TODO: This can be further optimized by using glob.Parse and limits
|
||||
if pattern == "*" {
|
||||
everything = true
|
||||
c.cols.Scan(iterator)
|
||||
s.cols.Scan(iterator)
|
||||
} else if strings.HasSuffix(pattern, "*") {
|
||||
greaterPivot = pattern[:len(pattern)-1]
|
||||
if glob.IsGlob(greaterPivot) {
|
||||
c.cols.Scan(iterator)
|
||||
s.cols.Scan(iterator)
|
||||
} else {
|
||||
greater = true
|
||||
c.cols.Ascend(greaterPivot, iterator)
|
||||
s.cols.Ascend(greaterPivot, iterator)
|
||||
}
|
||||
} else {
|
||||
c.cols.Scan(iterator)
|
||||
s.cols.Scan(iterator)
|
||||
}
|
||||
if msg.OutputType == JSON {
|
||||
wr.WriteString(`],"elapsed":"` + time.Now().Sub(start).String() + "\"}")
|
||||
|
|
|
@ -7,7 +7,7 @@ import (
|
|||
"github.com/tidwall/resp"
|
||||
)
|
||||
|
||||
func (c *Server) cmdOutput(msg *Message) (res resp.Value, err error) {
|
||||
func (s *Server) cmdOutput(msg *Message) (res resp.Value, err error) {
|
||||
start := time.Now()
|
||||
vs := msg.Args[1:]
|
||||
var arg string
|
||||
|
|
|
@ -34,10 +34,10 @@ func newPubsub() *pubsub {
|
|||
}
|
||||
|
||||
// Publish a message to subscribers
|
||||
func (c *Server) Publish(channel string, message ...string) int {
|
||||
func (s *Server) Publish(channel string, message ...string) int {
|
||||
var msgs []submsg
|
||||
c.pubsub.mu.RLock()
|
||||
if hub := c.pubsub.hubs[pubsubChannel][channel]; hub != nil {
|
||||
s.pubsub.mu.RLock()
|
||||
if hub := s.pubsub.hubs[pubsubChannel][channel]; hub != nil {
|
||||
for target := range hub.targets {
|
||||
for _, message := range message {
|
||||
msgs = append(msgs, submsg{
|
||||
|
@ -49,7 +49,7 @@ func (c *Server) Publish(channel string, message ...string) int {
|
|||
}
|
||||
}
|
||||
}
|
||||
for pattern, hub := range c.pubsub.hubs[pubsubPattern] {
|
||||
for pattern, hub := range s.pubsub.hubs[pubsubPattern] {
|
||||
if match.Match(channel, pattern) {
|
||||
for target := range hub.targets {
|
||||
for _, message := range message {
|
||||
|
@ -64,7 +64,7 @@ func (c *Server) Publish(channel string, message ...string) int {
|
|||
}
|
||||
}
|
||||
}
|
||||
c.pubsub.mu.RUnlock()
|
||||
s.pubsub.mu.RUnlock()
|
||||
|
||||
for _, msg := range msgs {
|
||||
msg.target.cond.L.Lock()
|
||||
|
@ -137,21 +137,21 @@ func (sub liveSubscriptionSwitches) Error() string {
|
|||
return goingLive
|
||||
}
|
||||
|
||||
func (c *Server) cmdSubscribe(msg *Message) (resp.Value, error) {
|
||||
func (s *Server) cmdSubscribe(msg *Message) (resp.Value, error) {
|
||||
if len(msg.Args) < 2 {
|
||||
return resp.Value{}, errInvalidNumberOfArguments
|
||||
}
|
||||
return NOMessage, liveSubscriptionSwitches{}
|
||||
}
|
||||
|
||||
func (c *Server) cmdPsubscribe(msg *Message) (resp.Value, error) {
|
||||
func (s *Server) cmdPsubscribe(msg *Message) (resp.Value, error) {
|
||||
if len(msg.Args) < 2 {
|
||||
return resp.Value{}, errInvalidNumberOfArguments
|
||||
}
|
||||
return NOMessage, liveSubscriptionSwitches{}
|
||||
}
|
||||
|
||||
func (c *Server) cmdPublish(msg *Message) (resp.Value, error) {
|
||||
func (s *Server) cmdPublish(msg *Message) (resp.Value, error) {
|
||||
start := time.Now()
|
||||
if len(msg.Args) != 3 {
|
||||
return resp.Value{}, errInvalidNumberOfArguments
|
||||
|
@ -160,7 +160,7 @@ func (c *Server) cmdPublish(msg *Message) (resp.Value, error) {
|
|||
channel := msg.Args[1]
|
||||
message := msg.Args[2]
|
||||
//geofence := gjson.Valid(message) && gjson.Get(message, "fence").Bool()
|
||||
n := c.Publish(channel, message) //, geofence)
|
||||
n := s.Publish(channel, message) //, geofence)
|
||||
var res resp.Value
|
||||
switch msg.OutputType {
|
||||
case JSON:
|
||||
|
@ -173,7 +173,7 @@ func (c *Server) cmdPublish(msg *Message) (resp.Value, error) {
|
|||
return res, nil
|
||||
}
|
||||
|
||||
func (c *Server) liveSubscription(
|
||||
func (s *Server) liveSubscription(
|
||||
conn net.Conn,
|
||||
rd *PipelineReader,
|
||||
msg *Message,
|
||||
|
@ -280,7 +280,7 @@ func (c *Server) liveSubscription(
|
|||
write(b)
|
||||
}
|
||||
}
|
||||
c.statsTotalMsgsSent.add(1)
|
||||
s.statsTotalMsgsSent.add(1)
|
||||
}
|
||||
|
||||
m := [2]map[string]bool{
|
||||
|
@ -293,7 +293,7 @@ func (c *Server) liveSubscription(
|
|||
defer func() {
|
||||
for i := 0; i < 2; i++ {
|
||||
for channel := range m[i] {
|
||||
c.pubsub.unregister(i, channel, target)
|
||||
s.pubsub.unregister(i, channel, target)
|
||||
}
|
||||
}
|
||||
target.cond.L.Lock()
|
||||
|
@ -354,10 +354,10 @@ func (c *Server) liveSubscription(
|
|||
channel := msg.Args[i]
|
||||
if un {
|
||||
delete(m[kind], channel)
|
||||
c.pubsub.unregister(kind, channel, target)
|
||||
s.pubsub.unregister(kind, channel, target)
|
||||
} else {
|
||||
m[kind][channel] = true
|
||||
c.pubsub.register(kind, channel, target)
|
||||
s.pubsub.register(kind, channel, target)
|
||||
}
|
||||
writeSubscribe(msg.Command(), channel, len(m[0])+len(m[1]))
|
||||
}
|
||||
|
|
|
@ -8,7 +8,7 @@ import (
|
|||
"github.com/tidwall/tile38/internal/log"
|
||||
)
|
||||
|
||||
func (c *Server) cmdReadOnly(msg *Message) (res resp.Value, err error) {
|
||||
func (s *Server) cmdReadOnly(msg *Message) (res resp.Value, err error) {
|
||||
start := time.Now()
|
||||
vs := msg.Args[1:]
|
||||
var arg string
|
||||
|
@ -25,20 +25,20 @@ func (c *Server) cmdReadOnly(msg *Message) (res resp.Value, err error) {
|
|||
default:
|
||||
return NOMessage, errInvalidArgument(arg)
|
||||
case "yes":
|
||||
if !c.config.readOnly() {
|
||||
if !s.config.readOnly() {
|
||||
update = true
|
||||
c.config.setReadOnly(true)
|
||||
s.config.setReadOnly(true)
|
||||
log.Info("read only")
|
||||
}
|
||||
case "no":
|
||||
if c.config.readOnly() {
|
||||
if s.config.readOnly() {
|
||||
update = true
|
||||
c.config.setReadOnly(false)
|
||||
s.config.setReadOnly(false)
|
||||
log.Info("read write")
|
||||
}
|
||||
}
|
||||
if update {
|
||||
c.config.write(false)
|
||||
s.config.write(false)
|
||||
}
|
||||
return OKMessage(msg, start), nil
|
||||
}
|
||||
|
|
|
@ -10,15 +10,15 @@ import (
|
|||
"github.com/tidwall/tile38/internal/glob"
|
||||
)
|
||||
|
||||
func (c *Server) cmdScanArgs(vs []string) (
|
||||
s liveFenceSwitches, err error,
|
||||
func (s *Server) cmdScanArgs(vs []string) (
|
||||
ls liveFenceSwitches, err error,
|
||||
) {
|
||||
var t searchScanBaseTokens
|
||||
vs, t, err = c.parseSearchScanBaseTokens("scan", t, vs)
|
||||
vs, t, err = s.parseSearchScanBaseTokens("scan", t, vs)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
s.searchScanBaseTokens = t
|
||||
ls.searchScanBaseTokens = t
|
||||
if len(vs) != 0 {
|
||||
err = errInvalidNumberOfArguments
|
||||
return
|
||||
|
@ -26,13 +26,13 @@ func (c *Server) cmdScanArgs(vs []string) (
|
|||
return
|
||||
}
|
||||
|
||||
func (c *Server) cmdScan(msg *Message) (res resp.Value, err error) {
|
||||
func (s *Server) cmdScan(msg *Message) (res resp.Value, err error) {
|
||||
start := time.Now()
|
||||
vs := msg.Args[1:]
|
||||
|
||||
s, err := c.cmdScanArgs(vs)
|
||||
if s.usingLua() {
|
||||
defer s.Close()
|
||||
args, err := s.cmdScanArgs(vs)
|
||||
if args.usingLua() {
|
||||
defer args.Close()
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
res = NOMessage
|
||||
|
@ -45,9 +45,10 @@ func (c *Server) cmdScan(msg *Message) (res resp.Value, err error) {
|
|||
return NOMessage, err
|
||||
}
|
||||
wr := &bytes.Buffer{}
|
||||
sw, err := c.newScanWriter(
|
||||
wr, msg, s.key, s.output, s.precision, s.glob, false,
|
||||
s.cursor, s.limit, s.wheres, s.whereins, s.whereevals, s.nofields)
|
||||
sw, err := s.newScanWriter(
|
||||
wr, msg, args.key, args.output, args.precision, args.glob, false,
|
||||
args.cursor, args.limit, args.wheres, args.whereins, args.whereevals,
|
||||
args.nofields)
|
||||
if err != nil {
|
||||
return NOMessage, err
|
||||
}
|
||||
|
@ -58,15 +59,15 @@ func (c *Server) cmdScan(msg *Message) (res resp.Value, err error) {
|
|||
if sw.col != nil {
|
||||
if sw.output == outputCount && len(sw.wheres) == 0 &&
|
||||
len(sw.whereins) == 0 && sw.globEverything == true {
|
||||
count := sw.col.Count() - int(s.cursor)
|
||||
count := sw.col.Count() - int(args.cursor)
|
||||
if count < 0 {
|
||||
count = 0
|
||||
}
|
||||
sw.count = uint64(count)
|
||||
} else {
|
||||
g := glob.Parse(sw.globPattern, s.desc)
|
||||
g := glob.Parse(sw.globPattern, args.desc)
|
||||
if g.Limits[0] == "" && g.Limits[1] == "" {
|
||||
sw.col.Scan(s.desc, sw,
|
||||
sw.col.Scan(args.desc, sw,
|
||||
msg.Deadline,
|
||||
func(id string, o geojson.Object, fields []float64) bool {
|
||||
return sw.writeObject(ScanWriterParams{
|
||||
|
@ -77,7 +78,7 @@ func (c *Server) cmdScan(msg *Message) (res resp.Value, err error) {
|
|||
},
|
||||
)
|
||||
} else {
|
||||
sw.col.ScanRange(g.Limits[0], g.Limits[1], s.desc, sw,
|
||||
sw.col.ScanRange(g.Limits[0], g.Limits[1], args.desc, sw,
|
||||
msg.Deadline,
|
||||
func(id string, o geojson.Object, fields []float64) bool {
|
||||
return sw.writeObject(ScanWriterParams{
|
||||
|
@ -97,20 +98,3 @@ func (c *Server) cmdScan(msg *Message) (res resp.Value, err error) {
|
|||
}
|
||||
return sw.respOut, nil
|
||||
}
|
||||
|
||||
// type tCursor struct {
|
||||
// offset func() uint64
|
||||
// iter func(n uint64)
|
||||
// }
|
||||
|
||||
// func (cursor *tCursor) Offset() uint64 {
|
||||
// return cursor.offset()
|
||||
// }
|
||||
|
||||
// func (cursor *tCursor) Step(n uint64) {
|
||||
// cursor.iter(n)
|
||||
// }
|
||||
|
||||
// func newCursor(offset func() uint64, iter func(n uint64)) *tCursor {
|
||||
// return &tCursor{offset, iter}
|
||||
// }
|
||||
|
|
|
@ -31,7 +31,7 @@ const (
|
|||
|
||||
type scanWriter struct {
|
||||
mu sync.Mutex
|
||||
c *Server
|
||||
s *Server
|
||||
wr *bytes.Buffer
|
||||
msg *Message
|
||||
col *collection.Collection
|
||||
|
@ -72,7 +72,7 @@ type ScanWriterParams struct {
|
|||
skipTesting bool
|
||||
}
|
||||
|
||||
func (c *Server) newScanWriter(
|
||||
func (s *Server) newScanWriter(
|
||||
wr *bytes.Buffer, msg *Message, key string, output outputT,
|
||||
precision uint64, globPattern string, matchValues bool,
|
||||
cursor, limit uint64, wheres []whereT, whereins []whereinT, whereevals []whereevalT, nofields bool,
|
||||
|
@ -92,7 +92,7 @@ func (c *Server) newScanWriter(
|
|||
}
|
||||
}
|
||||
sw := &scanWriter{
|
||||
c: c,
|
||||
s: s,
|
||||
wr: wr,
|
||||
msg: msg,
|
||||
cursor: cursor,
|
||||
|
@ -113,7 +113,7 @@ func (c *Server) newScanWriter(
|
|||
sw.globSingle = true
|
||||
}
|
||||
}
|
||||
sw.col = c.getCol(key)
|
||||
sw.col = s.getCol(key)
|
||||
if sw.col != nil {
|
||||
sw.fmap = sw.col.FieldMap()
|
||||
sw.farr = sw.col.FieldArr()
|
||||
|
|
|
@ -37,16 +37,16 @@ var errTimeout = errors.New("timeout")
|
|||
// Go-routine-safe pool of read-to-go lua states
|
||||
type lStatePool struct {
|
||||
m sync.Mutex
|
||||
c *Server
|
||||
s *Server
|
||||
saved []*lua.LState
|
||||
total int
|
||||
}
|
||||
|
||||
// newPool returns a new pool of lua states
|
||||
func (c *Server) newPool() *lStatePool {
|
||||
func (s *Server) newPool() *lStatePool {
|
||||
pl := &lStatePool{
|
||||
saved: make([]*lua.LState, iniLuaPoolSize),
|
||||
c: c,
|
||||
s: s,
|
||||
}
|
||||
// Fill the pool with some ready handlers
|
||||
for i := 0; i < iniLuaPoolSize; i++ {
|
||||
|
@ -110,7 +110,7 @@ func (pl *lStatePool) New() *lua.LState {
|
|||
call := func(ls *lua.LState) int {
|
||||
evalCmd, args := getArgs(ls)
|
||||
var numRet int
|
||||
if res, err := pl.c.luaTile38Call(evalCmd, args[0], args[1:]...); err != nil {
|
||||
if res, err := pl.s.luaTile38Call(evalCmd, args[0], args[1:]...); err != nil {
|
||||
ls.RaiseError("ERR %s", err.Error())
|
||||
numRet = 0
|
||||
} else {
|
||||
|
@ -121,7 +121,7 @@ func (pl *lStatePool) New() *lua.LState {
|
|||
}
|
||||
pcall := func(ls *lua.LState) int {
|
||||
evalCmd, args := getArgs(ls)
|
||||
if res, err := pl.c.luaTile38Call(evalCmd, args[0], args[1:]...); err != nil {
|
||||
if res, err := pl.s.luaTile38Call(evalCmd, args[0], args[1:]...); err != nil {
|
||||
ls.Push(ConvertToLua(ls, resp.ErrorValue(err)))
|
||||
} else {
|
||||
ls.Push(ConvertToLua(ls, res))
|
||||
|
@ -220,7 +220,7 @@ func (sm *lScriptMap) Flush() {
|
|||
}
|
||||
|
||||
// NewScriptMap returns a new map with lua scripts
|
||||
func (c *Server) newScriptMap() *lScriptMap {
|
||||
func (s *Server) newScriptMap() *lScriptMap {
|
||||
return &lScriptMap{
|
||||
scripts: make(map[string]*lua.FunctionProto),
|
||||
}
|
||||
|
@ -370,7 +370,7 @@ func makeSafeErr(err error) error {
|
|||
}
|
||||
|
||||
// Run eval/evalro/evalna command or it's -sha variant
|
||||
func (c *Server) cmdEvalUnified(scriptIsSha bool, msg *Message) (res resp.Value, err error) {
|
||||
func (s *Server) cmdEvalUnified(scriptIsSha bool, msg *Message) (res resp.Value, err error) {
|
||||
start := time.Now()
|
||||
vs := msg.Args[1:]
|
||||
|
||||
|
@ -390,7 +390,7 @@ func (c *Server) cmdEvalUnified(scriptIsSha bool, msg *Message) (res resp.Value,
|
|||
return
|
||||
}
|
||||
|
||||
luaState, err := c.luapool.Get()
|
||||
luaState, err := s.luapool.Get()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -403,7 +403,7 @@ func (c *Server) cmdEvalUnified(scriptIsSha bool, msg *Message) (res resp.Value,
|
|||
defer luaState.RemoveContext()
|
||||
luaDeadline = lua.LNumber(float64(dlTime.UnixNano()) / 1e9)
|
||||
}
|
||||
defer c.luapool.Put(luaState)
|
||||
defer s.luapool.Put(luaState)
|
||||
|
||||
keysTbl := luaState.CreateTable(int(numkeys), 0)
|
||||
for i = 0; i < numkeys; i++ {
|
||||
|
@ -438,7 +438,7 @@ func (c *Server) cmdEvalUnified(scriptIsSha bool, msg *Message) (res resp.Value,
|
|||
"EVAL_CMD": lua.LString(msg.Command()),
|
||||
})
|
||||
|
||||
compiled, ok := c.luascripts.Get(shaSum)
|
||||
compiled, ok := s.luascripts.Get(shaSum)
|
||||
var fn *lua.LFunction
|
||||
if ok {
|
||||
fn = &lua.LFunction{
|
||||
|
@ -457,7 +457,7 @@ func (c *Server) cmdEvalUnified(scriptIsSha bool, msg *Message) (res resp.Value,
|
|||
if err != nil {
|
||||
return NOMessage, makeSafeErr(err)
|
||||
}
|
||||
c.luascripts.Put(shaSum, fn.Proto)
|
||||
s.luascripts.Put(shaSum, fn.Proto)
|
||||
}
|
||||
luaState.Push(fn)
|
||||
defer luaSetRawGlobals(
|
||||
|
@ -490,7 +490,7 @@ func (c *Server) cmdEvalUnified(scriptIsSha bool, msg *Message) (res resp.Value,
|
|||
return NOMessage, nil
|
||||
}
|
||||
|
||||
func (c *Server) cmdScriptLoad(msg *Message) (resp.Value, error) {
|
||||
func (s *Server) cmdScriptLoad(msg *Message) (resp.Value, error) {
|
||||
start := time.Now()
|
||||
vs := msg.Args[1:]
|
||||
|
||||
|
@ -502,17 +502,17 @@ func (c *Server) cmdScriptLoad(msg *Message) (resp.Value, error) {
|
|||
|
||||
shaSum := Sha1Sum(script)
|
||||
|
||||
luaState, err := c.luapool.Get()
|
||||
luaState, err := s.luapool.Get()
|
||||
if err != nil {
|
||||
return NOMessage, err
|
||||
}
|
||||
defer c.luapool.Put(luaState)
|
||||
defer s.luapool.Put(luaState)
|
||||
|
||||
fn, err := luaState.Load(strings.NewReader(script), "f_"+shaSum)
|
||||
if err != nil {
|
||||
return NOMessage, makeSafeErr(err)
|
||||
}
|
||||
c.luascripts.Put(shaSum, fn.Proto)
|
||||
s.luascripts.Put(shaSum, fn.Proto)
|
||||
|
||||
switch msg.OutputType {
|
||||
case JSON:
|
||||
|
@ -527,7 +527,7 @@ func (c *Server) cmdScriptLoad(msg *Message) (resp.Value, error) {
|
|||
return NOMessage, nil
|
||||
}
|
||||
|
||||
func (c *Server) cmdScriptExists(msg *Message) (resp.Value, error) {
|
||||
func (s *Server) cmdScriptExists(msg *Message) (resp.Value, error) {
|
||||
start := time.Now()
|
||||
vs := msg.Args[1:]
|
||||
|
||||
|
@ -539,7 +539,7 @@ func (c *Server) cmdScriptExists(msg *Message) (resp.Value, error) {
|
|||
if vs, shaSum, ok = tokenval(vs); !ok || shaSum == "" {
|
||||
return NOMessage, errInvalidNumberOfArguments
|
||||
}
|
||||
_, ok = c.luascripts.Get(shaSum)
|
||||
_, ok = s.luascripts.Get(shaSum)
|
||||
if ok {
|
||||
ires = 1
|
||||
} else {
|
||||
|
@ -569,9 +569,9 @@ func (c *Server) cmdScriptExists(msg *Message) (resp.Value, error) {
|
|||
return resp.SimpleStringValue(""), nil
|
||||
}
|
||||
|
||||
func (c *Server) cmdScriptFlush(msg *Message) (resp.Value, error) {
|
||||
func (s *Server) cmdScriptFlush(msg *Message) (resp.Value, error) {
|
||||
start := time.Now()
|
||||
c.luascripts.Flush()
|
||||
s.luascripts.Flush()
|
||||
|
||||
switch msg.OutputType {
|
||||
case JSON:
|
||||
|
@ -585,67 +585,67 @@ func (c *Server) cmdScriptFlush(msg *Message) (resp.Value, error) {
|
|||
return resp.SimpleStringValue(""), nil
|
||||
}
|
||||
|
||||
func (c *Server) commandInScript(msg *Message) (
|
||||
func (s *Server) commandInScript(msg *Message) (
|
||||
res resp.Value, d commandDetails, err error,
|
||||
) {
|
||||
switch msg.Command() {
|
||||
default:
|
||||
err = fmt.Errorf("unknown command '%s'", msg.Args[0])
|
||||
case "set":
|
||||
res, d, err = c.cmdSet(msg, true)
|
||||
res, d, err = s.cmdSet(msg, true)
|
||||
case "fset":
|
||||
res, d, err = c.cmdFset(msg)
|
||||
res, d, err = s.cmdFset(msg)
|
||||
case "del":
|
||||
res, d, err = c.cmdDel(msg)
|
||||
res, d, err = s.cmdDel(msg)
|
||||
case "pdel":
|
||||
res, d, err = c.cmdPdel(msg)
|
||||
res, d, err = s.cmdPdel(msg)
|
||||
case "drop":
|
||||
res, d, err = c.cmdDrop(msg)
|
||||
res, d, err = s.cmdDrop(msg)
|
||||
case "expire":
|
||||
res, d, err = c.cmdExpire(msg)
|
||||
res, d, err = s.cmdExpire(msg)
|
||||
case "rename":
|
||||
res, d, err = c.cmdRename(msg, false)
|
||||
res, d, err = s.cmdRename(msg, false)
|
||||
case "renamenx":
|
||||
res, d, err = c.cmdRename(msg, true)
|
||||
res, d, err = s.cmdRename(msg, true)
|
||||
case "persist":
|
||||
res, d, err = c.cmdPersist(msg)
|
||||
res, d, err = s.cmdPersist(msg)
|
||||
case "ttl":
|
||||
res, err = c.cmdTTL(msg)
|
||||
res, err = s.cmdTTL(msg)
|
||||
case "stats":
|
||||
res, err = c.cmdStats(msg)
|
||||
res, err = s.cmdStats(msg)
|
||||
case "scan":
|
||||
res, err = c.cmdScan(msg)
|
||||
res, err = s.cmdScan(msg)
|
||||
case "nearby":
|
||||
res, err = c.cmdNearby(msg)
|
||||
res, err = s.cmdNearby(msg)
|
||||
case "within":
|
||||
res, err = c.cmdWithin(msg)
|
||||
res, err = s.cmdWithin(msg)
|
||||
case "intersects":
|
||||
res, err = c.cmdIntersects(msg)
|
||||
res, err = s.cmdIntersects(msg)
|
||||
case "search":
|
||||
res, err = c.cmdSearch(msg)
|
||||
res, err = s.cmdSearch(msg)
|
||||
case "bounds":
|
||||
res, err = c.cmdBounds(msg)
|
||||
res, err = s.cmdBounds(msg)
|
||||
case "get":
|
||||
res, err = c.cmdGet(msg)
|
||||
res, err = s.cmdGet(msg)
|
||||
case "jget":
|
||||
res, err = c.cmdJget(msg)
|
||||
res, err = s.cmdJget(msg)
|
||||
case "jset":
|
||||
res, d, err = c.cmdJset(msg)
|
||||
res, d, err = s.cmdJset(msg)
|
||||
case "jdel":
|
||||
res, d, err = c.cmdJdel(msg)
|
||||
res, d, err = s.cmdJdel(msg)
|
||||
case "type":
|
||||
res, err = c.cmdType(msg)
|
||||
res, err = s.cmdType(msg)
|
||||
case "keys":
|
||||
res, err = c.cmdKeys(msg)
|
||||
res, err = s.cmdKeys(msg)
|
||||
case "test":
|
||||
res, err = c.cmdTest(msg)
|
||||
res, err = s.cmdTest(msg)
|
||||
case "server":
|
||||
res, err = c.cmdServer(msg)
|
||||
res, err = s.cmdServer(msg)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Server) luaTile38Call(evalcmd string, cmd string, args ...string) (resp.Value, error) {
|
||||
func (s *Server) luaTile38Call(evalcmd string, cmd string, args ...string) (resp.Value, error) {
|
||||
msg := &Message{}
|
||||
msg.OutputType = RESP
|
||||
msg.Args = append([]string{cmd}, args...)
|
||||
|
@ -668,18 +668,18 @@ func (c *Server) luaTile38Call(evalcmd string, cmd string, args ...string) (resp
|
|||
|
||||
switch evalcmd {
|
||||
case "eval", "evalsha":
|
||||
return c.luaTile38AtomicRW(msg)
|
||||
return s.luaTile38AtomicRW(msg)
|
||||
case "evalro", "evalrosha":
|
||||
return c.luaTile38AtomicRO(msg)
|
||||
return s.luaTile38AtomicRO(msg)
|
||||
case "evalna", "evalnasha":
|
||||
return c.luaTile38NonAtomic(msg)
|
||||
return s.luaTile38NonAtomic(msg)
|
||||
}
|
||||
|
||||
return resp.NullValue(), errCmdNotSupported
|
||||
}
|
||||
|
||||
// The eval command has already got the lock. No locking on the call from within the script.
|
||||
func (c *Server) luaTile38AtomicRW(msg *Message) (resp.Value, error) {
|
||||
func (s *Server) luaTile38AtomicRW(msg *Message) (resp.Value, error) {
|
||||
var write bool
|
||||
|
||||
switch msg.Command() {
|
||||
|
@ -689,16 +689,16 @@ func (c *Server) luaTile38AtomicRW(msg *Message) (resp.Value, error) {
|
|||
"rename", "renamenx":
|
||||
// write operations
|
||||
write = true
|
||||
if c.config.followHost() != "" {
|
||||
if s.config.followHost() != "" {
|
||||
return resp.NullValue(), errNotLeader
|
||||
}
|
||||
if c.config.readOnly() {
|
||||
if s.config.readOnly() {
|
||||
return resp.NullValue(), errReadOnly
|
||||
}
|
||||
case "get", "keys", "scan", "nearby", "within", "intersects", "hooks", "search",
|
||||
"ttl", "bounds", "server", "info", "type", "jget", "test":
|
||||
// read operations
|
||||
if c.config.followHost() != "" && !c.fcuponce {
|
||||
if s.config.followHost() != "" && !s.fcuponce {
|
||||
return resp.NullValue(), errCatchingUp
|
||||
}
|
||||
}
|
||||
|
@ -723,14 +723,14 @@ func (c *Server) luaTile38AtomicRW(msg *Message) (resp.Value, error) {
|
|||
}
|
||||
}()
|
||||
}
|
||||
return c.commandInScript(msg)
|
||||
return s.commandInScript(msg)
|
||||
}()
|
||||
if err != nil {
|
||||
return resp.NullValue(), err
|
||||
}
|
||||
|
||||
if write {
|
||||
if err := c.writeAOF(msg.Args, &d); err != nil {
|
||||
if err := s.writeAOF(msg.Args, &d); err != nil {
|
||||
return resp.NullValue(), err
|
||||
}
|
||||
}
|
||||
|
@ -738,7 +738,7 @@ func (c *Server) luaTile38AtomicRW(msg *Message) (resp.Value, error) {
|
|||
return res, nil
|
||||
}
|
||||
|
||||
func (c *Server) luaTile38AtomicRO(msg *Message) (resp.Value, error) {
|
||||
func (s *Server) luaTile38AtomicRO(msg *Message) (resp.Value, error) {
|
||||
switch msg.Command() {
|
||||
default:
|
||||
return resp.NullValue(), errCmdNotSupported
|
||||
|
@ -751,7 +751,7 @@ func (c *Server) luaTile38AtomicRO(msg *Message) (resp.Value, error) {
|
|||
case "get", "keys", "scan", "nearby", "within", "intersects", "hooks", "search",
|
||||
"ttl", "bounds", "server", "info", "type", "jget", "test":
|
||||
// read operations
|
||||
if c.config.followHost() != "" && !c.fcuponce {
|
||||
if s.config.followHost() != "" && !s.fcuponce {
|
||||
return resp.NullValue(), errCatchingUp
|
||||
}
|
||||
}
|
||||
|
@ -771,7 +771,7 @@ func (c *Server) luaTile38AtomicRO(msg *Message) (resp.Value, error) {
|
|||
}
|
||||
}()
|
||||
}
|
||||
return c.commandInScript(msg)
|
||||
return s.commandInScript(msg)
|
||||
}()
|
||||
if err != nil {
|
||||
return resp.NullValue(), err
|
||||
|
@ -780,7 +780,7 @@ func (c *Server) luaTile38AtomicRO(msg *Message) (resp.Value, error) {
|
|||
return res, nil
|
||||
}
|
||||
|
||||
func (c *Server) luaTile38NonAtomic(msg *Message) (resp.Value, error) {
|
||||
func (s *Server) luaTile38NonAtomic(msg *Message) (resp.Value, error) {
|
||||
var write bool
|
||||
|
||||
// choose the locking strategy
|
||||
|
@ -791,20 +791,20 @@ func (c *Server) luaTile38NonAtomic(msg *Message) (resp.Value, error) {
|
|||
"rename", "renamenx":
|
||||
// write operations
|
||||
write = true
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.config.followHost() != "" {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.config.followHost() != "" {
|
||||
return resp.NullValue(), errNotLeader
|
||||
}
|
||||
if c.config.readOnly() {
|
||||
if s.config.readOnly() {
|
||||
return resp.NullValue(), errReadOnly
|
||||
}
|
||||
case "get", "keys", "scan", "nearby", "within", "intersects", "hooks", "search",
|
||||
"ttl", "bounds", "server", "info", "type", "jget", "test":
|
||||
// read operations
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
if c.config.followHost() != "" && !c.fcuponce {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
if s.config.followHost() != "" && !s.fcuponce {
|
||||
return resp.NullValue(), errCatchingUp
|
||||
}
|
||||
}
|
||||
|
@ -829,14 +829,14 @@ func (c *Server) luaTile38NonAtomic(msg *Message) (resp.Value, error) {
|
|||
}
|
||||
}()
|
||||
}
|
||||
return c.commandInScript(msg)
|
||||
return s.commandInScript(msg)
|
||||
}()
|
||||
if err != nil {
|
||||
return resp.NullValue(), err
|
||||
}
|
||||
|
||||
if write {
|
||||
if err := c.writeAOF(msg.Args, &d); err != nil {
|
||||
if err := s.writeAOF(msg.Args, &d); err != nil {
|
||||
return resp.NullValue(), err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ func readMemStats() runtime.MemStats {
|
|||
return ms
|
||||
}
|
||||
|
||||
func (c *Server) cmdStats(msg *Message) (res resp.Value, err error) {
|
||||
func (s *Server) cmdStats(msg *Message) (res resp.Value, err error) {
|
||||
start := time.Now()
|
||||
vs := msg.Args[1:]
|
||||
var ms = []map[string]interface{}{}
|
||||
|
@ -59,7 +59,7 @@ func (c *Server) cmdStats(msg *Message) (res resp.Value, err error) {
|
|||
if !ok {
|
||||
break
|
||||
}
|
||||
col := c.getCol(key)
|
||||
col := s.getCol(key)
|
||||
if col != nil {
|
||||
m := make(map[string]interface{})
|
||||
m["num_points"] = col.PointCount()
|
||||
|
@ -95,7 +95,7 @@ func (c *Server) cmdStats(msg *Message) (res resp.Value, err error) {
|
|||
return res, nil
|
||||
}
|
||||
|
||||
func (c *Server) cmdServer(msg *Message) (res resp.Value, err error) {
|
||||
func (s *Server) cmdServer(msg *Message) (res resp.Value, err error) {
|
||||
start := time.Now()
|
||||
m := make(map[string]interface{})
|
||||
args := msg.Args[1:]
|
||||
|
@ -103,10 +103,10 @@ func (c *Server) cmdServer(msg *Message) (res resp.Value, err error) {
|
|||
// Switch on the type of stats requested
|
||||
switch len(args) {
|
||||
case 0:
|
||||
c.basicStats(m)
|
||||
s.basicStats(m)
|
||||
case 1:
|
||||
if strings.ToLower(args[0]) == "ext" {
|
||||
c.extStats(m)
|
||||
s.extStats(m)
|
||||
}
|
||||
default:
|
||||
return NOMessage, errInvalidNumberOfArguments
|
||||
|
@ -127,21 +127,21 @@ func (c *Server) cmdServer(msg *Message) (res resp.Value, err error) {
|
|||
}
|
||||
|
||||
// basicStats populates the passed map with basic system/go/tile38 statistics
|
||||
func (c *Server) basicStats(m map[string]interface{}) {
|
||||
m["id"] = c.config.serverID()
|
||||
if c.config.followHost() != "" {
|
||||
m["following"] = fmt.Sprintf("%s:%d", c.config.followHost(),
|
||||
c.config.followPort())
|
||||
m["caught_up"] = c.fcup
|
||||
m["caught_up_once"] = c.fcuponce
|
||||
func (s *Server) basicStats(m map[string]interface{}) {
|
||||
m["id"] = s.config.serverID()
|
||||
if s.config.followHost() != "" {
|
||||
m["following"] = fmt.Sprintf("%s:%d", s.config.followHost(),
|
||||
s.config.followPort())
|
||||
m["caught_up"] = s.fcup
|
||||
m["caught_up_once"] = s.fcuponce
|
||||
}
|
||||
m["http_transport"] = c.http
|
||||
m["http_transport"] = s.http
|
||||
m["pid"] = os.Getpid()
|
||||
m["aof_size"] = c.aofsz
|
||||
m["num_collections"] = c.cols.Len()
|
||||
m["num_hooks"] = len(c.hooks)
|
||||
m["aof_size"] = s.aofsz
|
||||
m["num_collections"] = s.cols.Len()
|
||||
m["num_hooks"] = len(s.hooks)
|
||||
sz := 0
|
||||
c.cols.Scan(func(key string, value interface{}) bool {
|
||||
s.cols.Scan(func(key string, value interface{}) bool {
|
||||
col := value.(*collection.Collection)
|
||||
sz += col.TotalWeight()
|
||||
return true
|
||||
|
@ -150,7 +150,7 @@ func (c *Server) basicStats(m map[string]interface{}) {
|
|||
points := 0
|
||||
objects := 0
|
||||
strings := 0
|
||||
c.cols.Scan(func(key string, value interface{}) bool {
|
||||
s.cols.Scan(func(key string, value interface{}) bool {
|
||||
col := value.(*collection.Collection)
|
||||
points += col.PointCount()
|
||||
objects += col.Count()
|
||||
|
@ -168,18 +168,18 @@ func (c *Server) basicStats(m map[string]interface{}) {
|
|||
m["mem_alloc"] = mem.Alloc
|
||||
m["heap_size"] = mem.HeapAlloc
|
||||
m["heap_released"] = mem.HeapReleased
|
||||
m["max_heap_size"] = c.config.maxMemory()
|
||||
m["max_heap_size"] = s.config.maxMemory()
|
||||
m["avg_item_size"] = avgsz
|
||||
m["version"] = core.Version
|
||||
m["pointer_size"] = (32 << uintptr(uint64(^uintptr(0))>>63)) / 8
|
||||
m["read_only"] = c.config.readOnly()
|
||||
m["read_only"] = s.config.readOnly()
|
||||
m["cpus"] = runtime.NumCPU()
|
||||
n, _ := runtime.ThreadCreateProfile(nil)
|
||||
m["threads"] = float64(n)
|
||||
}
|
||||
|
||||
// extStats populates the passed map with extended system/go/tile38 statistics
|
||||
func (c *Server) extStats(m map[string]interface{}) {
|
||||
func (s *Server) extStats(m map[string]interface{}) {
|
||||
n, _ := runtime.ThreadCreateProfile(nil)
|
||||
mem := readMemStats()
|
||||
|
||||
|
@ -246,37 +246,37 @@ func (c *Server) extStats(m map[string]interface{}) {
|
|||
// Tile38 Stats
|
||||
|
||||
// ID of the server
|
||||
m["tile38_id"] = c.config.serverID()
|
||||
m["tile38_id"] = s.config.serverID()
|
||||
// The process ID of the server
|
||||
m["tile38_pid"] = os.Getpid()
|
||||
// Version of Tile38 running
|
||||
m["tile38_version"] = core.Version
|
||||
// Maximum heap size allowed
|
||||
m["tile38_max_heap_size"] = c.config.maxMemory()
|
||||
m["tile38_max_heap_size"] = s.config.maxMemory()
|
||||
// Type of instance running
|
||||
if c.config.followHost() == "" {
|
||||
if s.config.followHost() == "" {
|
||||
m["tile38_type"] = "leader"
|
||||
} else {
|
||||
m["tile38_type"] = "follower"
|
||||
}
|
||||
// Whether or not the server is read-only
|
||||
m["tile38_read_only"] = c.config.readOnly()
|
||||
m["tile38_read_only"] = s.config.readOnly()
|
||||
// Size of pointer
|
||||
m["tile38_pointer_size"] = (32 << uintptr(uint64(^uintptr(0))>>63)) / 8
|
||||
// Uptime of the Tile38 server in seconds
|
||||
m["tile38_uptime_in_seconds"] = time.Since(c.started).Seconds()
|
||||
m["tile38_uptime_in_seconds"] = time.Since(s.started).Seconds()
|
||||
// Number of currently connected Tile38 clients
|
||||
c.connsmu.RLock()
|
||||
m["tile38_connected_clients"] = len(c.conns)
|
||||
c.connsmu.RUnlock()
|
||||
s.connsmu.RLock()
|
||||
m["tile38_connected_clients"] = len(s.conns)
|
||||
s.connsmu.RUnlock()
|
||||
// Whether or not a cluster is enabled
|
||||
m["tile38_cluster_enabled"] = false
|
||||
// Whether or not the Tile38 AOF is enabled
|
||||
m["tile38_aof_enabled"] = core.AppendOnly
|
||||
// Whether or not an AOF shrink is currently in progress
|
||||
m["tile38_aof_rewrite_in_progress"] = c.shrinking
|
||||
m["tile38_aof_rewrite_in_progress"] = s.shrinking
|
||||
// Length of time the last AOF shrink took
|
||||
m["tile38_aof_last_rewrite_time_sec"] = c.lastShrinkDuration.get() / int(time.Second)
|
||||
m["tile38_aof_last_rewrite_time_sec"] = s.lastShrinkDuration.get() / int(time.Second)
|
||||
// Duration of the on-going AOF rewrite operation if any
|
||||
var currentShrinkStart time.Time
|
||||
if currentShrinkStart.IsZero() {
|
||||
|
@ -285,24 +285,24 @@ func (c *Server) extStats(m map[string]interface{}) {
|
|||
m["tile38_aof_current_rewrite_time_sec"] = time.Since(currentShrinkStart).Seconds()
|
||||
}
|
||||
// Total size of the AOF in bytes
|
||||
m["tile38_aof_size"] = c.aofsz
|
||||
m["tile38_aof_size"] = s.aofsz
|
||||
// Whether or no the HTTP transport is being served
|
||||
m["tile38_http_transport"] = c.http
|
||||
m["tile38_http_transport"] = s.http
|
||||
// Number of connections accepted by the server
|
||||
m["tile38_total_connections_received"] = c.statsTotalConns.get()
|
||||
m["tile38_total_connections_received"] = s.statsTotalConns.get()
|
||||
// Number of commands processed by the server
|
||||
m["tile38_total_commands_processed"] = c.statsTotalCommands.get()
|
||||
m["tile38_total_commands_processed"] = s.statsTotalCommands.get()
|
||||
// Number of webhook messages sent by server
|
||||
m["tile38_total_messages_sent"] = c.statsTotalMsgsSent.get()
|
||||
m["tile38_total_messages_sent"] = s.statsTotalMsgsSent.get()
|
||||
// Number of key expiration events
|
||||
m["tile38_expired_keys"] = c.statsExpired.get()
|
||||
m["tile38_expired_keys"] = s.statsExpired.get()
|
||||
// Number of connected slaves
|
||||
m["tile38_connected_slaves"] = len(c.aofconnM)
|
||||
m["tile38_connected_slaves"] = len(s.aofconnM)
|
||||
|
||||
points := 0
|
||||
objects := 0
|
||||
strings := 0
|
||||
c.cols.Scan(func(key string, value interface{}) bool {
|
||||
s.cols.Scan(func(key string, value interface{}) bool {
|
||||
col := value.(*collection.Collection)
|
||||
points += col.PointCount()
|
||||
objects += col.Count()
|
||||
|
@ -317,9 +317,9 @@ func (c *Server) extStats(m map[string]interface{}) {
|
|||
// Number of string in the database
|
||||
m["tile38_num_strings"] = strings
|
||||
// Number of collections in the database
|
||||
m["tile38_num_collections"] = c.cols.Len()
|
||||
m["tile38_num_collections"] = s.cols.Len()
|
||||
// Number of hooks in the database
|
||||
m["tile38_num_hooks"] = len(c.hooks)
|
||||
m["tile38_num_hooks"] = len(s.hooks)
|
||||
|
||||
avgsz := 0
|
||||
if points != 0 {
|
||||
|
@ -330,7 +330,7 @@ func (c *Server) extStats(m map[string]interface{}) {
|
|||
m["tile38_avg_point_size"] = avgsz
|
||||
|
||||
sz := 0
|
||||
c.cols.Scan(func(key string, value interface{}) bool {
|
||||
s.cols.Scan(func(key string, value interface{}) bool {
|
||||
col := value.(*collection.Collection)
|
||||
sz += col.TotalWeight()
|
||||
return true
|
||||
|
@ -340,17 +340,17 @@ func (c *Server) extStats(m map[string]interface{}) {
|
|||
m["tile38_in_memory_size"] = sz
|
||||
}
|
||||
|
||||
func (c *Server) writeInfoServer(w *bytes.Buffer) {
|
||||
func (s *Server) writeInfoServer(w *bytes.Buffer) {
|
||||
fmt.Fprintf(w, "tile38_version:%s\r\n", core.Version)
|
||||
fmt.Fprintf(w, "redis_version:%s\r\n", core.Version) // Version of the Redis server
|
||||
fmt.Fprintf(w, "uptime_in_seconds:%d\r\n", int(time.Since(c.started).Seconds())) // Number of seconds since Redis server start
|
||||
fmt.Fprintf(w, "uptime_in_seconds:%d\r\n", int(time.Since(s.started).Seconds())) // Number of seconds since Redis server start
|
||||
}
|
||||
func (c *Server) writeInfoClients(w *bytes.Buffer) {
|
||||
c.connsmu.RLock()
|
||||
fmt.Fprintf(w, "connected_clients:%d\r\n", len(c.conns)) // Number of client connections (excluding connections from slaves)
|
||||
c.connsmu.RUnlock()
|
||||
func (s *Server) writeInfoClients(w *bytes.Buffer) {
|
||||
s.connsmu.RLock()
|
||||
fmt.Fprintf(w, "connected_clients:%d\r\n", len(s.conns)) // Number of client connections (excluding connections from slaves)
|
||||
s.connsmu.RUnlock()
|
||||
}
|
||||
func (c *Server) writeInfoMemory(w *bytes.Buffer) {
|
||||
func (s *Server) writeInfoMemory(w *bytes.Buffer) {
|
||||
mem := readMemStats()
|
||||
fmt.Fprintf(w, "used_memory:%d\r\n", mem.Alloc) // total number of bytes allocated by Redis using its allocator (either standard libc, jemalloc, or an alternative allocator such as tcmalloc
|
||||
}
|
||||
|
@ -360,10 +360,10 @@ func boolInt(t bool) int {
|
|||
}
|
||||
return 0
|
||||
}
|
||||
func (c *Server) writeInfoPersistence(w *bytes.Buffer) {
|
||||
func (s *Server) writeInfoPersistence(w *bytes.Buffer) {
|
||||
fmt.Fprintf(w, "aof_enabled:%d\r\n", boolInt(core.AppendOnly))
|
||||
fmt.Fprintf(w, "aof_rewrite_in_progress:%d\r\n", boolInt(c.shrinking)) // Flag indicating a AOF rewrite operation is on-going
|
||||
fmt.Fprintf(w, "aof_last_rewrite_time_sec:%d\r\n", c.lastShrinkDuration.get()/int(time.Second)) // Duration of the last AOF rewrite operation in seconds
|
||||
fmt.Fprintf(w, "aof_rewrite_in_progress:%d\r\n", boolInt(s.shrinking)) // Flag indicating a AOF rewrite operation is on-going
|
||||
fmt.Fprintf(w, "aof_last_rewrite_time_sec:%d\r\n", s.lastShrinkDuration.get()/int(time.Second)) // Duration of the last AOF rewrite operation in seconds
|
||||
|
||||
var currentShrinkStart time.Time // c.currentShrinkStart.get()
|
||||
if currentShrinkStart.IsZero() {
|
||||
|
@ -373,40 +373,40 @@ func (c *Server) writeInfoPersistence(w *bytes.Buffer) {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *Server) writeInfoStats(w *bytes.Buffer) {
|
||||
fmt.Fprintf(w, "total_connections_received:%d\r\n", c.statsTotalConns.get()) // Total number of connections accepted by the server
|
||||
fmt.Fprintf(w, "total_commands_processed:%d\r\n", c.statsTotalCommands.get()) // Total number of commands processed by the server
|
||||
fmt.Fprintf(w, "total_messages_sent:%d\r\n", c.statsTotalMsgsSent.get()) // Total number of commands processed by the server
|
||||
fmt.Fprintf(w, "expired_keys:%d\r\n", c.statsExpired.get()) // Total number of key expiration events
|
||||
func (s *Server) writeInfoStats(w *bytes.Buffer) {
|
||||
fmt.Fprintf(w, "total_connections_received:%d\r\n", s.statsTotalConns.get()) // Total number of connections accepted by the server
|
||||
fmt.Fprintf(w, "total_commands_processed:%d\r\n", s.statsTotalCommands.get()) // Total number of commands processed by the server
|
||||
fmt.Fprintf(w, "total_messages_sent:%d\r\n", s.statsTotalMsgsSent.get()) // Total number of commands processed by the server
|
||||
fmt.Fprintf(w, "expired_keys:%d\r\n", s.statsExpired.get()) // Total number of key expiration events
|
||||
}
|
||||
|
||||
// writeInfoReplication writes all replication data to the 'info' response
|
||||
func (c *Server) writeInfoReplication(w *bytes.Buffer) {
|
||||
if c.config.followHost() != "" {
|
||||
func (s *Server) writeInfoReplication(w *bytes.Buffer) {
|
||||
if s.config.followHost() != "" {
|
||||
fmt.Fprintf(w, "role:slave\r\n")
|
||||
fmt.Fprintf(w, "master_host:%s\r\n", c.config.followHost())
|
||||
fmt.Fprintf(w, "master_port:%v\r\n", c.config.followPort())
|
||||
fmt.Fprintf(w, "master_host:%s\r\n", s.config.followHost())
|
||||
fmt.Fprintf(w, "master_port:%v\r\n", s.config.followPort())
|
||||
} else {
|
||||
fmt.Fprintf(w, "role:master\r\n")
|
||||
var i int
|
||||
c.connsmu.RLock()
|
||||
for _, cc := range c.conns {
|
||||
s.connsmu.RLock()
|
||||
for _, cc := range s.conns {
|
||||
if cc.replPort != 0 {
|
||||
fmt.Fprintf(w, "slave%v:ip=%s,port=%v,state=online\r\n", i,
|
||||
strings.Split(cc.remoteAddr, ":")[0], cc.replPort)
|
||||
i++
|
||||
}
|
||||
}
|
||||
c.connsmu.RUnlock()
|
||||
s.connsmu.RUnlock()
|
||||
}
|
||||
fmt.Fprintf(w, "connected_slaves:%d\r\n", len(c.aofconnM)) // Number of connected slaves
|
||||
fmt.Fprintf(w, "connected_slaves:%d\r\n", len(s.aofconnM)) // Number of connected slaves
|
||||
}
|
||||
|
||||
func (c *Server) writeInfoCluster(w *bytes.Buffer) {
|
||||
func (s *Server) writeInfoCluster(w *bytes.Buffer) {
|
||||
fmt.Fprintf(w, "cluster_enabled:0\r\n")
|
||||
}
|
||||
|
||||
func (c *Server) cmdInfo(msg *Message) (res resp.Value, err error) {
|
||||
func (s *Server) cmdInfo(msg *Message) (res resp.Value, err error) {
|
||||
start := time.Now()
|
||||
|
||||
sections := []string{"server", "clients", "memory", "persistence", "stats", "replication", "cpu", "cluster", "keyspace"}
|
||||
|
@ -435,28 +435,28 @@ func (c *Server) cmdInfo(msg *Message) (res resp.Value, err error) {
|
|||
continue
|
||||
case "server":
|
||||
w.WriteString("# Server\r\n")
|
||||
c.writeInfoServer(w)
|
||||
s.writeInfoServer(w)
|
||||
case "clients":
|
||||
w.WriteString("# Clients\r\n")
|
||||
c.writeInfoClients(w)
|
||||
s.writeInfoClients(w)
|
||||
case "memory":
|
||||
w.WriteString("# Memory\r\n")
|
||||
c.writeInfoMemory(w)
|
||||
s.writeInfoMemory(w)
|
||||
case "persistence":
|
||||
w.WriteString("# Persistence\r\n")
|
||||
c.writeInfoPersistence(w)
|
||||
s.writeInfoPersistence(w)
|
||||
case "stats":
|
||||
w.WriteString("# Stats\r\n")
|
||||
c.writeInfoStats(w)
|
||||
s.writeInfoStats(w)
|
||||
case "replication":
|
||||
w.WriteString("# Replication\r\n")
|
||||
c.writeInfoReplication(w)
|
||||
s.writeInfoReplication(w)
|
||||
case "cpu":
|
||||
w.WriteString("# CPU\r\n")
|
||||
c.writeInfoCPU(w)
|
||||
s.writeInfoCPU(w)
|
||||
case "cluster":
|
||||
w.WriteString("# Cluster\r\n")
|
||||
c.writeInfoCluster(w)
|
||||
s.writeInfoCluster(w)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -516,13 +516,13 @@ func respValuesSimpleMap(m map[string]interface{}) []resp.Value {
|
|||
return vals
|
||||
}
|
||||
|
||||
func (c *Server) statsCollections(line string) (string, error) {
|
||||
func (s *Server) statsCollections(line string) (string, error) {
|
||||
start := time.Now()
|
||||
var key string
|
||||
var ms = []map[string]interface{}{}
|
||||
for len(line) > 0 {
|
||||
line, key = token(line)
|
||||
col := c.getCol(key)
|
||||
col := s.getCol(key)
|
||||
if col != nil {
|
||||
m := make(map[string]interface{})
|
||||
points := col.PointCount()
|
||||
|
|
|
@ -8,7 +8,7 @@ import (
|
|||
"syscall"
|
||||
)
|
||||
|
||||
func (c *Server) writeInfoCPU(w *bytes.Buffer) {
|
||||
func (s *Server) writeInfoCPU(w *bytes.Buffer) {
|
||||
var selfRu syscall.Rusage
|
||||
var cRu syscall.Rusage
|
||||
syscall.Getrusage(syscall.RUSAGE_SELF, &selfRu)
|
||||
|
|
|
@ -7,7 +7,7 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/yuin/gopher-lua"
|
||||
lua "github.com/yuin/gopher-lua"
|
||||
)
|
||||
|
||||
const defaultSearchOutput = outputObjects
|
||||
|
@ -249,7 +249,7 @@ type searchScanBaseTokens struct {
|
|||
clip bool
|
||||
}
|
||||
|
||||
func (c *Server) parseSearchScanBaseTokens(
|
||||
func (s *Server) parseSearchScanBaseTokens(
|
||||
cmd string, t searchScanBaseTokens, vs []string,
|
||||
) (
|
||||
vsout []string, tout searchScanBaseTokens, err error,
|
||||
|
@ -380,7 +380,7 @@ func (c *Server) parseSearchScanBaseTokens(
|
|||
}
|
||||
|
||||
var luaState *lua.LState
|
||||
luaState, err = c.luapool.Get()
|
||||
luaState, err = s.luapool.Get()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -406,7 +406,7 @@ func (c *Server) parseSearchScanBaseTokens(
|
|||
"ARGV": argsTbl,
|
||||
})
|
||||
|
||||
compiled, ok := c.luascripts.Get(shaSum)
|
||||
compiled, ok := s.luascripts.Get(shaSum)
|
||||
var fn *lua.LFunction
|
||||
if ok {
|
||||
fn = &lua.LFunction{
|
||||
|
@ -426,9 +426,9 @@ func (c *Server) parseSearchScanBaseTokens(
|
|||
err = makeSafeErr(err)
|
||||
return
|
||||
}
|
||||
c.luascripts.Put(shaSum, fn.Proto)
|
||||
s.luascripts.Put(shaSum, fn.Proto)
|
||||
}
|
||||
t.whereevals = append(t.whereevals, whereevalT{c, luaState, fn})
|
||||
t.whereevals = append(t.whereevals, whereevalT{s, luaState, fn})
|
||||
continue
|
||||
case "nofields":
|
||||
vs = nvs
|
||||
|
@ -735,12 +735,12 @@ loop:
|
|||
err = errInvalidArgument(tokenRParen)
|
||||
return
|
||||
}
|
||||
if parent, empty := ps.pop(); empty {
|
||||
parent, empty := ps.pop()
|
||||
if empty {
|
||||
err = errInvalidArgument(tokenRParen)
|
||||
return
|
||||
} else {
|
||||
ae = parent
|
||||
}
|
||||
ae = parent
|
||||
vsout = nvs
|
||||
case tokenNOT:
|
||||
negate = !negate
|
||||
|
@ -762,13 +762,12 @@ loop:
|
|||
if numChildren < 2 {
|
||||
err = errInvalidNumberOfArguments
|
||||
return
|
||||
} else {
|
||||
ae.children = append(
|
||||
ae.children[:numChildren-1],
|
||||
&areaExpression{
|
||||
op: AND,
|
||||
children: []*areaExpression{ae.children[numChildren-1]}})
|
||||
}
|
||||
ae.children = append(
|
||||
ae.children[:numChildren-1],
|
||||
&areaExpression{
|
||||
op: AND,
|
||||
children: []*areaExpression{ae.children[numChildren-1]}})
|
||||
case NOOP:
|
||||
ae.op = AND
|
||||
}
|
||||
|
@ -791,9 +790,8 @@ loop:
|
|||
if len(ae.children) < 2 {
|
||||
err = errInvalidNumberOfArguments
|
||||
return
|
||||
} else {
|
||||
ae = &areaExpression{op: OR, children: []*areaExpression{ae}}
|
||||
}
|
||||
ae = &areaExpression{op: OR, children: []*areaExpression{ae}}
|
||||
case NOOP:
|
||||
ae.op = OR
|
||||
}
|
||||
|
@ -802,20 +800,20 @@ loop:
|
|||
}
|
||||
vsout = nvs
|
||||
case "point", "circle", "object", "bounds", "hash", "quadkey", "tile", "get":
|
||||
if parsedVs, parsedObj, areaErr := s.parseArea(vsout, doClip); areaErr != nil {
|
||||
parsedVs, parsedObj, areaErr := s.parseArea(vsout, doClip)
|
||||
if areaErr != nil {
|
||||
err = areaErr
|
||||
return
|
||||
} else {
|
||||
newExpr := &areaExpression{negate: negate, obj: parsedObj, op: NOOP}
|
||||
negate = false
|
||||
needObj = false
|
||||
if ae == nil {
|
||||
ae = newExpr
|
||||
} else {
|
||||
ae.children = append(ae.children, newExpr)
|
||||
}
|
||||
vsout = parsedVs
|
||||
}
|
||||
newExpr := &areaExpression{negate: negate, obj: parsedObj, op: NOOP}
|
||||
negate = false
|
||||
needObj = false
|
||||
if ae == nil {
|
||||
ae = newExpr
|
||||
} else {
|
||||
ae.children = append(ae.children, newExpr)
|
||||
}
|
||||
vsout = parsedVs
|
||||
default:
|
||||
break loop
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue