tile38/controller/follow.go

231 lines
5.1 KiB
Go

package controller
import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"strconv"
"strings"
"time"
"github.com/tidwall/tile38/client"
"github.com/tidwall/tile38/controller/log"
"github.com/tidwall/tile38/core"
)
// errNoLongerFollowing is returned by followStep when the controller's follow
// generation counter (c.followc) no longer matches the generation the caller
// was started with. The follow loop treats it as the signal to stop retrying.
var errNoLongerFollowing = errors.New("no longer following")
// checksumsz is 512 KiB (512 * 1024).
// NOTE(review): not referenced in the visible part of this file; presumably
// the size in bytes of the AOF region hashed by followCheckSome — confirm.
const checksumsz = 512 * 1024
// cmdFollow handles the FOLLOW command. line must hold exactly two tokens:
// either "<host> <port>" to start following a leader, or "no one" to stop
// following. Both tokens are lower-cased before use.
//
// When the requested target differs from the current configuration, the
// candidate leader is probed first: a connection is dialed (2s timeout), the
// leader auth is sent when configured, and the server stats are fetched to
// reject following ourselves or following another follower.
//
// Locking: the function releases c.mu around the network probe and re-acquires
// it afterwards, so it must be entered with c.mu held and always returns with
// c.mu held, on every path.
//
// On success the configuration is written via writeConfig; if that fails the
// previous configuration is restored and the error returned. When the target
// changed, c.followc is incremented and, for a non-empty host, a new follow
// goroutine is started with the new generation.
func (c *Controller) cmdFollow(line string) error {
	var host, sport string
	if line, host = token(line); host == "" {
		return errInvalidNumberOfArguments
	}
	if line, sport = token(line); sport == "" {
		return errInvalidNumberOfArguments
	}
	if line != "" {
		return errInvalidNumberOfArguments
	}
	host = strings.ToLower(host)
	sport = strings.ToLower(sport)
	var update bool
	pconfig := c.config // snapshot used to revert if writeConfig fails
	if host == "no" && sport == "one" {
		update = c.config.FollowHost != "" || c.config.FollowPort != 0
		c.config.FollowHost = ""
		c.config.FollowPort = 0
	} else {
		// bitSize 16 limits the value to 0..65535, so the conversion to int
		// below can never overflow or yield a negative port.
		n, err := strconv.ParseUint(sport, 10, 16)
		if err != nil {
			return errInvalidArgument(sport)
		}
		port := int(n)
		update = c.config.FollowHost != host || c.config.FollowPort != port
		if update {
			// Snapshot the config values the probe needs while c.mu is still
			// held; the probe itself runs without the lock.
			auth := c.config.LeaderAuth
			serverID := c.config.ServerID
			probe := func() error {
				c.mu.Unlock()
				// Re-acquire on every exit path so the caller's lock state is
				// restored no matter which check fails.
				defer c.mu.Lock()
				conn, err := client.DialTimeout(fmt.Sprintf("%s:%d", host, port), time.Second*2)
				if err != nil {
					return fmt.Errorf("cannot follow: %v", err)
				}
				defer conn.Close()
				if auth != "" {
					if err := c.followDoLeaderAuth(conn, auth); err != nil {
						return fmt.Errorf("cannot follow: %v", err)
					}
				}
				msg, err := conn.Server()
				if err != nil {
					return fmt.Errorf("cannot follow: %v", err)
				}
				if msg.Stats.ServerID == serverID {
					return fmt.Errorf("cannot follow self")
				}
				if msg.Stats.Following != "" {
					return fmt.Errorf("cannot follow a follower")
				}
				return nil
			}
			if err := probe(); err != nil {
				return err
			}
		}
		c.config.FollowHost = host
		c.config.FollowPort = port
	}
	if err := c.writeConfig(false); err != nil {
		c.config = pconfig // revert
		return err
	}
	if update {
		// Bumping the generation invalidates any follow goroutine that was
		// started for a previous target.
		c.followc++
		if c.config.FollowHost != "" {
			log.Infof("following new host '%s' '%s'.", host, sport)
			go c.follow(c.config.FollowHost, c.config.FollowPort, c.followc)
		} else {
			log.Infof("following no one")
		}
	}
	return nil
}
// followHandleCommand is called for each command read from the leader's AOF
// stream. It runs under c.mu and returns the controller's current AOF size
// (c.aofsz) together with an error.
//
// followc is the follow generation the calling stream was started with. If it
// no longer matches c.followc the stream is stale and errNoLongerFollowing is
// returned so the caller can stop consuming it.
//
// line and w are currently unused: applying the command and appending it to
// the local AOF is disabled (see the commented-out code below).
func (c *Controller) followHandleCommand(line string, followc uint64, w io.Writer) (int, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	// Without this check a stream belonging to an old generation would keep
	// being processed for as long as the old leader keeps the connection open.
	if c.followc != followc {
		return c.aofsz, errNoLongerFollowing
	}
	// TODO: command application and AOF persistence are still disabled:
	// _, d, err := c.command(line, w)
	// if err != nil {
	// 	return c.aofsz, err
	// }
	// if err := c.writeAOF(line, &d); err != nil {
	// 	return c.aofsz, err
	// }
	return c.aofsz, nil
}
// followDoLeaderAuth sends "AUTH <auth>" on conn and decodes the reply as a
// client.Standard JSON message.
//
// It returns nil when the reply reports OK. Otherwise it returns the transport
// or JSON decoding error, the error text supplied by the server, or a generic
// "auth not ok" error when the server gave no reason. The message carries no
// "cannot follow:" prefix because the callers in this file add that prefix
// themselves when wrapping the error.
func (c *Controller) followDoLeaderAuth(conn *client.Conn, auth string) error {
	data, err := conn.Do("AUTH " + auth)
	if err != nil {
		return err
	}
	var msg client.Standard
	if err := json.Unmarshal(data, &msg); err != nil {
		return err
	}
	if msg.OK {
		return nil
	}
	if msg.Err != "" {
		return errors.New(msg.Err)
	}
	return errors.New("auth not ok")
}
// followStep performs one connect-and-stream attempt against the leader at
// host:port on behalf of follow generation followc.
//
// Steps:
//  1. Under c.mu, return errNoLongerFollowing if followc is stale; otherwise
//     clear c.fcup and snapshot the config values needed later.
//  2. Dial the leader (2s timeout), authenticate when a leader auth is
//     configured, and fetch its server stats; refuse to follow ourselves or
//     another follower.
//  3. Obtain the starting AOF position from followCheckSome and issue
//     "aof <pos>"; any reply other than client.LiveJSON is turned into an error.
//  4. Read commands from the live stream and hand each one to
//     followHandleCommand, setting c.fcup once the reported AOF size reaches
//     the leader size observed in step 2.
//
// The function only returns with a non-nil error: the read loop ends when
// reading or handling a command fails.
func (c *Controller) followStep(host string, port int, followc uint64) error {
	c.mu.Lock()
	if c.followc != followc {
		c.mu.Unlock()
		return errNoLongerFollowing
	}
	c.fcup = false
	// Copy everything needed from c.config while the lock is held; c.config is
	// modified under c.mu elsewhere, so it must not be read after unlocking.
	auth := c.config.LeaderAuth
	serverID := c.config.ServerID
	c.mu.Unlock()

	addr := fmt.Sprintf("%s:%d", host, port)
	conn, err := client.DialTimeout(addr, time.Second*2)
	if err != nil {
		return fmt.Errorf("cannot follow: %v", err)
	}
	defer conn.Close()
	if auth != "" {
		if err := c.followDoLeaderAuth(conn, auth); err != nil {
			return fmt.Errorf("cannot follow: %v", err)
		}
	}
	stats, err := conn.Server()
	if err != nil {
		return fmt.Errorf("cannot follow: %v", err)
	}
	// check if we are following self
	if stats.Stats.ServerID == serverID {
		return fmt.Errorf("cannot follow self")
	}
	if stats.Stats.Following != "" {
		return fmt.Errorf("cannot follow a follower")
	}

	// verify checksum
	pos, err := c.followCheckSome(addr, followc)
	if err != nil {
		return err
	}
	msg, err := conn.Do(fmt.Sprintf("aof %d", pos))
	if err != nil {
		return err
	}
	if string(msg) != client.LiveJSON {
		// Not the live acknowledgement: surface the server's "err" field when
		// the reply has one, otherwise report a generic failure.
		var m map[string]interface{}
		err := json.Unmarshal(msg, &m)
		if err != nil {
			return err
		}
		if errs, ok := m["err"].(string); ok && errs != "" {
			return errors.New(errs)
		}
		return errors.New("invalid response to aof live request")
	}
	if core.ShowDebugMessages {
		log.Debug("follow:", addr, ":read aof")
	}

	// markCaughtUp records under c.mu that the follower has reached the leader
	// AOF size observed above, and logs it.
	markCaughtUp := func() {
		c.mu.Lock()
		c.fcup = true
		c.mu.Unlock()
		log.Info("caught up")
	}

	caughtUp := pos >= int64(stats.Stats.AOFSize)
	if caughtUp {
		markCaughtUp()
	}
	nullw := ioutil.Discard
	rd := NewAOFReader(conn.Reader())
	for {
		buf, err := rd.ReadCommand()
		if err != nil {
			return err
		}
		aofsz, err := c.followHandleCommand(string(buf), followc, nullw)
		if err != nil {
			return err
		}
		if !caughtUp && aofsz >= stats.Stats.AOFSize {
			caughtUp = true
			markCaughtUp()
		}
	}
}
// follow is the retry loop behind a FOLLOW target. It calls followStep again
// and again for the given host, port and follow generation, pausing one second
// after every attempt. The loop ends only when followStep reports
// errNoLongerFollowing. Any other error except io.EOF is logged before the
// next attempt.
func (c *Controller) follow(host string, port int, followc uint64) {
	for {
		switch stepErr := c.followStep(host, port, followc); stepErr {
		case errNoLongerFollowing:
			return
		case nil, io.EOF:
			// Nothing to report; just retry after the pause.
		default:
			log.Error("follow: " + stepErr.Error())
		}
		time.Sleep(time.Second)
	}
}