diff --git a/controller/checksum.go b/controller/checksum.go index e9c047dd..bca5100e 100644 --- a/controller/checksum.go +++ b/controller/checksum.go @@ -138,13 +138,13 @@ func getEndOfLastValuePositionInFile(fname string, startPos int64) (int64, error // followCheckSome is not a full checksum. It just "checks some" data. // We will do some various checksums on the leader until we find the correct position to start at. -func (c *Controller) followCheckSome(addr string, followc uint64) (pos int64, err error) { +func (c *Controller) followCheckSome(addr string, followc int) (pos int64, err error) { if core.ShowDebugMessages { log.Debug("follow:", addr, ":check some") } c.mu.Lock() defer c.mu.Unlock() - if c.followc != followc { + if c.followc.get() != followc { return 0, errNoLongerFollowing } if c.aofsz < checksumsz { diff --git a/controller/controller.go b/controller/controller.go index e84ff733..7e658f40 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -78,7 +78,7 @@ type Controller struct { aofsz int dir string config *Config - followc uint64 // counter increases when follow property changes + followc aint // counter increases when follow property changes follows map[*bytes.Buffer]bool fcond *sync.Cond lstack []*commandDetailsT @@ -183,12 +183,10 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http } c.fillExpiresList() if c.config.followHost() != "" { - go c.follow(c.config.followHost(), c.config.followPort(), c.followc) + go c.follow(c.config.followHost(), c.config.followPort(), c.followc.get()) } defer func() { - c.mu.Lock() - c.followc++ // this will force any follow communication to die - c.mu.Unlock() + c.followc.add(1) // this will force any follow communication to die }() go c.processLives() go c.watchMemory() @@ -204,8 +202,8 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http if cc, ok := c.conns[conn]; ok { cc.last = time.Now() } - c.statsTotalCommands.add(1) c.mu.Unlock() + c.statsTotalCommands.add(1) err := c.handleInputCommand(conn, msg, w) if err != nil { if err.Error() == "going live" { diff --git a/controller/follow.go b/controller/follow.go index 901ec9f6..18827323 100644 --- a/controller/follow.go +++ b/controller/follow.go @@ -85,10 +85,10 @@ func (c *Controller) cmdFollow(msg *server.Message) (res string, err error) { } c.config.write(false) if update { - c.followc++ + c.followc.add(1) if c.config.followHost() != "" { log.Infof("following new host '%s' '%s'.", host, sport) - go c.follow(c.config.followHost(), c.config.followPort(), c.followc) + go c.follow(c.config.followHost(), c.config.followPort(), c.followc.get()) } else { log.Infof("following no one") } @@ -112,10 +112,10 @@ func doServer(conn *Conn) (map[string]string, error) { return m, err } -func (c *Controller) followHandleCommand(values []resp.Value, followc uint64, w io.Writer) (int, error) { +func (c *Controller) followHandleCommand(values []resp.Value, followc int, w io.Writer) (int, error) { c.mu.Lock() defer c.mu.Unlock() - if c.followc != followc { + if c.followc.get() != followc { return c.aofsz, errNoLongerFollowing } msg := &server.Message{ @@ -148,12 +148,11 @@ func (c *Controller) followDoLeaderAuth(conn *Conn, auth string) error { return nil } -func (c *Controller) followStep(host string, port int, followc uint64) error { - c.mu.Lock() - if c.followc != followc { - c.mu.Unlock() +func (c *Controller) followStep(host string, port int, followc int) error { + if c.followc.get() != followc { return errNoLongerFollowing } + c.mu.Lock() c.fcup = false auth := c.config.leaderAuth() c.mu.Unlock() @@ -247,7 +246,7 @@ func (c *Controller) followStep(host string, port int, followc uint64) error { } } -func (c *Controller) follow(host string, port int, followc uint64) { +func (c *Controller) follow(host string, port int, followc int) { for { err := c.followStep(host, port, followc) if err == errNoLongerFollowing {