mirror of https://github.com/tidwall/tile38.git
207 lines
4.2 KiB
Go
207 lines
4.2 KiB
Go
package server
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"sync"
|
|
|
|
"github.com/tidwall/redcon"
|
|
"github.com/tidwall/tile38/internal/log"
|
|
)
|
|
|
|
type liveBuffer struct {
|
|
key string
|
|
glob string
|
|
fence *liveFenceSwitches
|
|
details []*commandDetails
|
|
cond *sync.Cond
|
|
}
|
|
|
|
func (s *Server) processLives() {
|
|
s.lcond.L.Lock()
|
|
defer s.lcond.L.Unlock()
|
|
for {
|
|
if s.stopServer.on() {
|
|
return
|
|
}
|
|
for len(s.lstack) > 0 {
|
|
item := s.lstack[0]
|
|
s.lstack = s.lstack[1:]
|
|
if len(s.lstack) == 0 {
|
|
s.lstack = nil
|
|
}
|
|
for lb := range s.lives {
|
|
lb.cond.L.Lock()
|
|
if lb.key != "" && lb.key == item.key {
|
|
lb.details = append(lb.details, item)
|
|
lb.cond.Broadcast()
|
|
}
|
|
lb.cond.L.Unlock()
|
|
}
|
|
}
|
|
s.lcond.Wait()
|
|
}
|
|
}
|
|
|
|
func writeLiveMessage(
|
|
conn net.Conn,
|
|
message []byte,
|
|
wrapRESP bool,
|
|
connType Type, websocket bool,
|
|
) error {
|
|
if len(message) == 0 {
|
|
return nil
|
|
}
|
|
if websocket {
|
|
return WriteWebSocketMessage(conn, message)
|
|
}
|
|
var err error
|
|
switch connType {
|
|
case RESP:
|
|
if wrapRESP {
|
|
_, err = fmt.Fprintf(conn, "$%d\r\n%s\r\n", len(message), string(message))
|
|
} else {
|
|
_, err = conn.Write(message)
|
|
}
|
|
case Native:
|
|
_, err = fmt.Fprintf(conn, "$%d %s\r\n", len(message), string(message))
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (s *Server) goLive(
|
|
inerr error, conn net.Conn, rd *PipelineReader, msg *Message, websocket bool,
|
|
) error {
|
|
addr := conn.RemoteAddr().String()
|
|
log.Info("live " + addr)
|
|
defer func() {
|
|
log.Info("not live " + addr)
|
|
}()
|
|
switch lfs := inerr.(type) {
|
|
default:
|
|
return errors.New("invalid live type switches")
|
|
case liveAOFSwitches:
|
|
return s.liveAOF(lfs.pos, conn, rd, msg)
|
|
case liveSubscriptionSwitches:
|
|
return s.liveSubscription(conn, rd, msg, websocket)
|
|
case liveMonitorSwitches:
|
|
return s.liveMonitor(conn, rd, msg)
|
|
case liveFenceSwitches:
|
|
// fallthrough
|
|
}
|
|
|
|
// everything below is for live geofences
|
|
lb := &liveBuffer{
|
|
cond: sync.NewCond(&sync.Mutex{}),
|
|
}
|
|
var err error
|
|
var sw *scanWriter
|
|
var wr bytes.Buffer
|
|
lfs := inerr.(liveFenceSwitches)
|
|
lb.glob = lfs.glob
|
|
lb.key = lfs.key
|
|
lb.fence = &lfs
|
|
s.mu.RLock()
|
|
sw, err = s.newScanWriter(
|
|
&wr, msg, lfs.key, lfs.output, lfs.precision, lfs.glob, false,
|
|
lfs.cursor, lfs.limit, lfs.wheres, lfs.whereins, lfs.whereevals, lfs.nofields)
|
|
s.mu.RUnlock()
|
|
|
|
// everything below if for live SCAN, NEARBY, WITHIN, INTERSECTS
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.lcond.L.Lock()
|
|
s.lives[lb] = true
|
|
s.lcond.L.Unlock()
|
|
defer func() {
|
|
s.lcond.L.Lock()
|
|
delete(s.lives, lb)
|
|
s.lcond.L.Unlock()
|
|
conn.Close()
|
|
}()
|
|
|
|
var mustQuit bool
|
|
go func() {
|
|
defer func() {
|
|
lb.cond.L.Lock()
|
|
mustQuit = true
|
|
lb.cond.Broadcast()
|
|
lb.cond.L.Unlock()
|
|
conn.Close()
|
|
}()
|
|
for {
|
|
vs, err := rd.ReadMessages()
|
|
if err != nil {
|
|
if err != io.EOF && !(websocket && err == io.ErrUnexpectedEOF) {
|
|
log.Error(err)
|
|
}
|
|
return
|
|
}
|
|
for _, v := range vs {
|
|
if v == nil {
|
|
continue
|
|
}
|
|
switch v.Command() {
|
|
default:
|
|
log.Error("received a live command that was not QUIT")
|
|
return
|
|
case "quit", "":
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
outputType := msg.OutputType
|
|
connType := msg.ConnType
|
|
if websocket {
|
|
outputType = JSON
|
|
}
|
|
var livemsg []byte
|
|
switch outputType {
|
|
case JSON:
|
|
livemsg = redcon.AppendBulkString(nil, `{"ok":true,"live":true}`)
|
|
case RESP:
|
|
livemsg = redcon.AppendOK(nil)
|
|
}
|
|
if err := writeLiveMessage(conn, livemsg, false, connType, websocket); err != nil {
|
|
return nil // nil return is fine here
|
|
}
|
|
for {
|
|
lb.cond.L.Lock()
|
|
if mustQuit {
|
|
lb.cond.L.Unlock()
|
|
return nil
|
|
}
|
|
for len(lb.details) > 0 {
|
|
details := lb.details[0]
|
|
lb.details = lb.details[1:]
|
|
if len(lb.details) == 0 {
|
|
lb.details = nil
|
|
}
|
|
fence := lb.fence
|
|
lb.cond.L.Unlock()
|
|
var msgs []string
|
|
func() {
|
|
// safely lock the fence because we are outside the main loop
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
msgs = FenceMatch("", sw, fence, nil, details)
|
|
}()
|
|
for _, msg := range msgs {
|
|
if err := writeLiveMessage(conn, []byte(msg), true, connType, websocket); err != nil {
|
|
return nil // nil return is fine here
|
|
}
|
|
}
|
|
s.statsTotalMsgsSent.add(len(msgs))
|
|
lb.cond.L.Lock()
|
|
|
|
}
|
|
lb.cond.Wait()
|
|
lb.cond.L.Unlock()
|
|
}
|
|
}
|