moved follow counter to atomic

This commit is contained in:
Josh Baker 2017-09-30 07:34:08 -07:00
parent 920dc3adb6
commit 0db57db1f1
3 changed files with 14 additions and 17 deletions

View File

@ -138,13 +138,13 @@ func getEndOfLastValuePositionInFile(fname string, startPos int64) (int64, error
// followCheckSome is not a full checksum. It just "checks some" data. // followCheckSome is not a full checksum. It just "checks some" data.
// We will do some various checksums on the leader until we find the correct position to start at. // We will do some various checksums on the leader until we find the correct position to start at.
func (c *Controller) followCheckSome(addr string, followc uint64) (pos int64, err error) { func (c *Controller) followCheckSome(addr string, followc int) (pos int64, err error) {
if core.ShowDebugMessages { if core.ShowDebugMessages {
log.Debug("follow:", addr, ":check some") log.Debug("follow:", addr, ":check some")
} }
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
if c.followc != followc { if c.followc.get() != followc {
return 0, errNoLongerFollowing return 0, errNoLongerFollowing
} }
if c.aofsz < checksumsz { if c.aofsz < checksumsz {

View File

@ -78,7 +78,7 @@ type Controller struct {
aofsz int aofsz int
dir string dir string
config *Config config *Config
followc uint64 // counter increases when follow property changes followc aint // counter increases when follow property changes
follows map[*bytes.Buffer]bool follows map[*bytes.Buffer]bool
fcond *sync.Cond fcond *sync.Cond
lstack []*commandDetailsT lstack []*commandDetailsT
@ -183,12 +183,10 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
} }
c.fillExpiresList() c.fillExpiresList()
if c.config.followHost() != "" { if c.config.followHost() != "" {
go c.follow(c.config.followHost(), c.config.followPort(), c.followc) go c.follow(c.config.followHost(), c.config.followPort(), c.followc.get())
} }
defer func() { defer func() {
c.mu.Lock() c.followc.add(1) // this will force any follow communication to die
c.followc++ // this will force any follow communication to die
c.mu.Unlock()
}() }()
go c.processLives() go c.processLives()
go c.watchMemory() go c.watchMemory()
@ -204,8 +202,8 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
if cc, ok := c.conns[conn]; ok { if cc, ok := c.conns[conn]; ok {
cc.last = time.Now() cc.last = time.Now()
} }
c.statsTotalCommands.add(1)
c.mu.Unlock() c.mu.Unlock()
c.statsTotalCommands.add(1)
err := c.handleInputCommand(conn, msg, w) err := c.handleInputCommand(conn, msg, w)
if err != nil { if err != nil {
if err.Error() == "going live" { if err.Error() == "going live" {

View File

@ -85,10 +85,10 @@ func (c *Controller) cmdFollow(msg *server.Message) (res string, err error) {
} }
c.config.write(false) c.config.write(false)
if update { if update {
c.followc++ c.followc.add(1)
if c.config.followHost() != "" { if c.config.followHost() != "" {
log.Infof("following new host '%s' '%s'.", host, sport) log.Infof("following new host '%s' '%s'.", host, sport)
go c.follow(c.config.followHost(), c.config.followPort(), c.followc) go c.follow(c.config.followHost(), c.config.followPort(), c.followc.get())
} else { } else {
log.Infof("following no one") log.Infof("following no one")
} }
@ -112,10 +112,10 @@ func doServer(conn *Conn) (map[string]string, error) {
return m, err return m, err
} }
func (c *Controller) followHandleCommand(values []resp.Value, followc uint64, w io.Writer) (int, error) { func (c *Controller) followHandleCommand(values []resp.Value, followc int, w io.Writer) (int, error) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
if c.followc != followc { if c.followc.get() != followc {
return c.aofsz, errNoLongerFollowing return c.aofsz, errNoLongerFollowing
} }
msg := &server.Message{ msg := &server.Message{
@ -148,12 +148,11 @@ func (c *Controller) followDoLeaderAuth(conn *Conn, auth string) error {
return nil return nil
} }
func (c *Controller) followStep(host string, port int, followc uint64) error { func (c *Controller) followStep(host string, port int, followc int) error {
c.mu.Lock() if c.followc.get() != followc {
if c.followc != followc {
c.mu.Unlock()
return errNoLongerFollowing return errNoLongerFollowing
} }
c.mu.Lock()
c.fcup = false c.fcup = false
auth := c.config.leaderAuth() auth := c.config.leaderAuth()
c.mu.Unlock() c.mu.Unlock()
@ -247,7 +246,7 @@ func (c *Controller) followStep(host string, port int, followc uint64) error {
} }
} }
func (c *Controller) follow(host string, port int, followc uint64) { func (c *Controller) follow(host string, port int, followc int) {
for { for {
err := c.followStep(host, port, followc) err := c.followStep(host, port, followc)
if err == errNoLongerFollowing { if err == errNoLongerFollowing {