Fix followers not receiving channel messages

This commit fixes a bug where the leader was not propagating to
the followers.

See #468
This commit is contained in:
tidwall 2024-06-26 08:30:50 -07:00
parent 7effd234ac
commit 193bce17c1
4 changed files with 96 additions and 3 deletions

View File

@ -169,16 +169,20 @@ func (s *Server) followHandleCommand(args []string, followc int, w io.Writer) (i
return s.aofsz, errNoLongerFollowing return s.aofsz, errNoLongerFollowing
} }
msg := &Message{Args: args} msg := &Message{Args: args}
_, d, err := s.command(msg, nil) _, d, err := s.command(msg, nil)
if err != nil { if err != nil {
if commandErrIsFatal(err) { if commandErrIsFatal(err) {
return s.aofsz, err return s.aofsz, err
} }
} }
switch msg.Command() {
case "publish":
// Avoid writing these commands to the AOF
default:
if err := s.writeAOF(args, &d); err != nil { if err := s.writeAOF(args, &d); err != nil {
return s.aofsz, err return s.aofsz, err
} }
}
if len(s.aofbuf) > 10240 { if len(s.aofbuf) > 10240 {
s.flushAOF(false) s.flushAOF(false)
} }

View File

@ -0,0 +1,81 @@
package server
import (
"net"
"sync"
"github.com/tidwall/redcon"
)
type pubQueue struct {
cond *sync.Cond
entries []pubQueueEntry // follower publish queue
closed bool
}
type pubQueueEntry struct {
channel string
messages []string
}
func (s *Server) startPublishQueue(wg *sync.WaitGroup) {
defer wg.Done()
var buf []byte
var conns []net.Conn
s.pubq.cond = sync.NewCond(&sync.Mutex{})
s.pubq.cond.L.Lock()
for {
for len(s.pubq.entries) > 0 {
entries := s.pubq.entries
s.pubq.entries = nil
s.pubq.cond.L.Unlock()
// Get follower connections
s.mu.RLock()
for conn := range s.aofconnM {
conns = append(conns, conn)
}
s.mu.RUnlock()
// Buffer the PUBLISH command pipeline
buf = buf[:0]
for _, entry := range entries {
for _, message := range entry.messages {
buf = redcon.AppendArray(buf, 3)
buf = redcon.AppendBulkString(buf, "PUBLISH")
buf = redcon.AppendBulkString(buf, entry.channel)
buf = redcon.AppendBulkString(buf, message)
}
}
// Publish to followers
for i, conn := range conns {
conn.Write(buf)
conns[i] = nil
}
conns = conns[:0]
s.pubq.cond.L.Lock()
}
if s.pubq.closed {
break
}
s.pubq.cond.Wait()
}
s.pubq.cond.L.Unlock()
}
func (s *Server) stopPublishQueue() {
s.pubq.cond.L.Lock()
s.pubq.closed = true
s.pubq.cond.Broadcast()
s.pubq.cond.L.Unlock()
}
func (s *Server) sendPublishQueue(channel string, message ...string) {
s.pubq.cond.L.Lock()
if !s.pubq.closed {
s.pubq.entries = append(s.pubq.entries, pubQueueEntry{
channel: channel,
messages: message,
})
}
s.pubq.cond.Broadcast()
s.pubq.cond.L.Unlock()
}

View File

@ -66,6 +66,7 @@ func (s *Server) Publish(channel string, message ...string) int {
} }
s.pubsub.mu.RUnlock() s.pubsub.mu.RUnlock()
// broadcast to clients
for _, msg := range msgs { for _, msg := range msgs {
msg.target.cond.L.Lock() msg.target.cond.L.Lock()
msg.target.msgs = append(msg.target.msgs, msg) msg.target.msgs = append(msg.target.msgs, msg)
@ -73,6 +74,9 @@ func (s *Server) Publish(channel string, message ...string) int {
msg.target.cond.L.Unlock() msg.target.cond.L.Unlock()
} }
// Broadcast to followers
s.sendPublishQueue(channel, message...)
return len(msgs) return len(msgs)
} }

View File

@ -140,6 +140,7 @@ type Server struct {
fcup bool // follow caught up fcup bool // follow caught up
fcuponce bool // follow caught up once fcuponce bool // follow caught up once
aofconnM map[net.Conn]io.Closer aofconnM map[net.Conn]io.Closer
pubq pubQueue
// lua scripts // lua scripts
luascripts *lScriptMap luascripts *lScriptMap
@ -422,9 +423,12 @@ func Serve(opts Options) error {
go s.backgroundExpiring(&bgwg) go s.backgroundExpiring(&bgwg)
bgwg.Add(1) bgwg.Add(1)
go s.backgroundSyncAOF(&bgwg) go s.backgroundSyncAOF(&bgwg)
bgwg.Add(1)
go s.startPublishQueue(&bgwg)
defer func() { defer func() {
log.Debug("Stopping background routines") log.Debug("Stopping background routines")
// Stop background routines // Stop background routines
s.stopPublishQueue()
s.followc.Add(1) // this will force any follow communication to die s.followc.Add(1) // this will force any follow communication to die
s.stopServer.Store(true) s.stopServer.Store(true)
if mln != nil { if mln != nil {