tile38/internal/server/live.go

207 lines
4.2 KiB
Go
Raw Normal View History

package server
2016-03-05 02:08:16 +03:00
import (
"bytes"
"errors"
2016-04-01 22:46:39 +03:00
"fmt"
2016-03-05 02:08:16 +03:00
"io"
"net"
"sync"
"github.com/tidwall/redcon"
"github.com/tidwall/tile38/internal/log"
2016-03-05 02:08:16 +03:00
)
type liveBuffer struct {
key string
glob string
fence *liveFenceSwitches
2018-11-24 01:53:33 +03:00
details []*commandDetails
2016-03-05 02:08:16 +03:00
cond *sync.Cond
}
2021-12-09 19:24:26 +03:00
func (s *Server) processLives() {
s.lcond.L.Lock()
defer s.lcond.L.Unlock()
2016-03-05 02:08:16 +03:00
for {
2021-12-09 19:24:26 +03:00
if s.stopServer.on() {
2018-11-23 12:14:26 +03:00
return
}
2021-12-09 19:24:26 +03:00
for len(s.lstack) > 0 {
item := s.lstack[0]
s.lstack = s.lstack[1:]
if len(s.lstack) == 0 {
s.lstack = nil
2016-03-05 02:08:16 +03:00
}
2021-12-09 19:24:26 +03:00
for lb := range s.lives {
2016-03-05 02:08:16 +03:00
lb.cond.L.Lock()
if lb.key != "" && lb.key == item.key {
lb.details = append(lb.details, item)
lb.cond.Broadcast()
}
lb.cond.L.Unlock()
}
}
2021-12-09 19:24:26 +03:00
s.lcond.Wait()
2016-03-05 02:08:16 +03:00
}
}
2018-08-14 03:05:30 +03:00
func writeLiveMessage(
conn net.Conn,
message []byte,
wrapRESP bool,
connType Type, websocket bool,
2018-08-14 03:05:30 +03:00
) error {
2016-04-01 22:46:39 +03:00
if len(message) == 0 {
return nil
}
2016-03-05 02:08:16 +03:00
if websocket {
return WriteWebSocketMessage(conn, message)
2016-04-01 22:46:39 +03:00
}
var err error
switch connType {
case RESP:
2016-04-01 22:46:39 +03:00
if wrapRESP {
_, err = fmt.Fprintf(conn, "$%d\r\n%s\r\n", len(message), string(message))
} else {
_, err = conn.Write(message)
}
case Native:
2016-04-05 17:15:13 +03:00
_, err = fmt.Fprintf(conn, "$%d %s\r\n", len(message), string(message))
2016-03-05 02:08:16 +03:00
}
2016-04-01 22:46:39 +03:00
return err
2016-03-05 02:08:16 +03:00
}
2021-12-09 19:24:26 +03:00
func (s *Server) goLive(
inerr error, conn net.Conn, rd *PipelineReader, msg *Message, websocket bool,
) error {
2016-03-05 02:08:16 +03:00
addr := conn.RemoteAddr().String()
log.Info("live " + addr)
defer func() {
log.Info("not live " + addr)
}()
2021-12-09 19:24:26 +03:00
switch lfs := inerr.(type) {
2018-08-14 03:05:30 +03:00
default:
return errors.New("invalid live type switches")
case liveAOFSwitches:
2021-12-09 19:24:26 +03:00
return s.liveAOF(lfs.pos, conn, rd, msg)
2018-08-14 03:05:30 +03:00
case liveSubscriptionSwitches:
2021-12-09 19:24:26 +03:00
return s.liveSubscription(conn, rd, msg, websocket)
2020-08-12 22:38:35 +03:00
case liveMonitorSwitches:
2021-12-09 19:24:26 +03:00
return s.liveMonitor(conn, rd, msg)
2018-08-14 03:05:30 +03:00
case liveFenceSwitches:
// fallthrough
2016-03-05 02:08:16 +03:00
}
2018-08-14 03:05:30 +03:00
// everything below is for live geofences
2016-03-05 02:08:16 +03:00
lb := &liveBuffer{
cond: sync.NewCond(&sync.Mutex{}),
}
var err error
var sw *scanWriter
var wr bytes.Buffer
2021-12-09 19:24:26 +03:00
lfs := inerr.(liveFenceSwitches)
lb.glob = lfs.glob
lb.key = lfs.key
lb.fence = &lfs
s.mu.RLock()
sw, err = s.newScanWriter(
&wr, msg, lfs.key, lfs.output, lfs.precision, lfs.glob, false,
lfs.cursor, lfs.limit, lfs.wheres, lfs.whereins, lfs.whereevals, lfs.nofields)
s.mu.RUnlock()
2018-08-14 03:05:30 +03:00
2016-03-05 02:08:16 +03:00
// everything below if for live SCAN, NEARBY, WITHIN, INTERSECTS
if err != nil {
return err
}
2021-12-09 19:24:26 +03:00
s.lcond.L.Lock()
s.lives[lb] = true
s.lcond.L.Unlock()
2016-03-05 02:08:16 +03:00
defer func() {
2021-12-09 19:24:26 +03:00
s.lcond.L.Lock()
delete(s.lives, lb)
s.lcond.L.Unlock()
2016-03-05 02:08:16 +03:00
conn.Close()
}()
var mustQuit bool
go func() {
defer func() {
lb.cond.L.Lock()
mustQuit = true
lb.cond.Broadcast()
lb.cond.L.Unlock()
conn.Close()
}()
for {
vs, err := rd.ReadMessages()
2016-03-05 02:08:16 +03:00
if err != nil {
2017-01-13 00:22:14 +03:00
if err != io.EOF && !(websocket && err == io.ErrUnexpectedEOF) {
2016-03-05 02:08:16 +03:00
log.Error(err)
}
return
}
for _, v := range vs {
if v == nil {
continue
}
switch v.Command() {
default:
log.Error("received a live command that was not QUIT")
return
case "quit", "":
return
}
2016-03-05 02:08:16 +03:00
}
}
}()
2016-04-01 22:46:39 +03:00
outputType := msg.OutputType
connType := msg.ConnType
if websocket {
outputType = JSON
2016-04-01 22:46:39 +03:00
}
var livemsg []byte
switch outputType {
case JSON:
livemsg = redcon.AppendBulkString(nil, `{"ok":true,"live":true}`)
case RESP:
livemsg = redcon.AppendOK(nil)
2016-04-01 22:46:39 +03:00
}
2018-08-14 03:05:30 +03:00
if err := writeLiveMessage(conn, livemsg, false, connType, websocket); err != nil {
2016-03-05 02:08:16 +03:00
return nil // nil return is fine here
}
for {
lb.cond.L.Lock()
if mustQuit {
lb.cond.L.Unlock()
return nil
}
for len(lb.details) > 0 {
details := lb.details[0]
lb.details = lb.details[1:]
if len(lb.details) == 0 {
lb.details = nil
}
fence := lb.fence
lb.cond.L.Unlock()
2018-08-14 06:27:22 +03:00
var msgs []string
func() {
// safely lock the fence because we are outside the main loop
2021-12-09 19:24:26 +03:00
s.mu.RLock()
defer s.mu.RUnlock()
2018-08-14 06:27:22 +03:00
msgs = FenceMatch("", sw, fence, nil, details)
}()
2016-03-19 17:16:19 +03:00
for _, msg := range msgs {
2018-08-14 03:05:30 +03:00
if err := writeLiveMessage(conn, []byte(msg), true, connType, websocket); err != nil {
2016-03-19 17:16:19 +03:00
return nil // nil return is fine here
2016-03-05 02:08:16 +03:00
}
}
2021-12-09 19:24:26 +03:00
s.statsTotalMsgsSent.add(len(msgs))
2016-03-05 02:08:16 +03:00
lb.cond.L.Lock()
2019-03-14 21:23:23 +03:00
2016-03-05 02:08:16 +03:00
}
lb.cond.Wait()
lb.cond.L.Unlock()
}
}