tile38/controller/follow.go

264 lines
5.7 KiB
Go
Raw Permalink Normal View History

2016-03-05 02:08:16 +03:00
package controller
import (
"errors"
"fmt"
"io"
"io/ioutil"
"strconv"
"strings"
"time"
2016-04-01 02:26:36 +03:00
"github.com/tidwall/resp"
2016-03-06 17:55:00 +03:00
"github.com/tidwall/tile38/controller/log"
2016-04-01 02:26:36 +03:00
"github.com/tidwall/tile38/controller/server"
2016-03-06 17:55:00 +03:00
"github.com/tidwall/tile38/core"
2016-03-05 02:08:16 +03:00
)
// errNoLongerFollowing is returned by the follow routines in this file when
// the follow counter they were started with no longer matches c.followc.
var errNoLongerFollowing = errors.New("no longer following")
// checksumsz equals 512 KiB. It is not referenced in this part of the file;
// NOTE(review): presumably the block size used for AOF checksums — confirm.
const checksumsz = 512 * 1024
2016-04-01 02:26:36 +03:00
func (c *Controller) cmdFollow(msg *server.Message) (res string, err error) {
start := time.Now()
vs := msg.Values[1:]
var ok bool
2016-03-05 02:08:16 +03:00
var host, sport string
2016-04-01 02:26:36 +03:00
if vs, host, ok = tokenval(vs); !ok || host == "" {
return "", errInvalidNumberOfArguments
2016-03-05 02:08:16 +03:00
}
2016-04-01 02:26:36 +03:00
if vs, sport, ok = tokenval(vs); !ok || sport == "" {
return "", errInvalidNumberOfArguments
2016-03-05 02:08:16 +03:00
}
2016-04-01 02:26:36 +03:00
if len(vs) != 0 {
return "", errInvalidNumberOfArguments
2016-03-05 02:08:16 +03:00
}
host = strings.ToLower(host)
sport = strings.ToLower(sport)
var update bool
pconfig := c.config
if host == "no" && sport == "one" {
update = c.config.FollowHost != "" || c.config.FollowPort != 0
c.config.FollowHost = ""
c.config.FollowPort = 0
} else {
n, err := strconv.ParseUint(sport, 10, 64)
if err != nil {
2016-04-01 02:26:36 +03:00
return "", errInvalidArgument(sport)
2016-03-05 02:08:16 +03:00
}
port := int(n)
update = c.config.FollowHost != host || c.config.FollowPort != port
2016-03-08 18:35:43 +03:00
auth := c.config.LeaderAuth
2016-03-05 02:08:16 +03:00
if update {
c.mu.Unlock()
2016-04-01 02:26:36 +03:00
conn, err := DialTimeout(fmt.Sprintf("%s:%d", host, port), time.Second*2)
2016-03-05 02:08:16 +03:00
if err != nil {
c.mu.Lock()
2016-04-01 02:26:36 +03:00
return "", fmt.Errorf("cannot follow: %v", err)
2016-03-05 02:08:16 +03:00
}
defer conn.Close()
2016-03-08 18:35:43 +03:00
if auth != "" {
if err := c.followDoLeaderAuth(conn, auth); err != nil {
2016-04-01 02:26:36 +03:00
return "", fmt.Errorf("cannot follow: %v", err)
2016-03-08 18:35:43 +03:00
}
}
2016-04-01 02:26:36 +03:00
m, err := doServer(conn)
2016-03-05 02:08:16 +03:00
if err != nil {
c.mu.Lock()
2016-04-01 02:26:36 +03:00
return "", fmt.Errorf("cannot follow: %v", err)
2016-03-05 02:08:16 +03:00
}
2016-04-01 02:26:36 +03:00
if m["id"] == "" {
2016-03-05 02:08:16 +03:00
c.mu.Lock()
2016-04-01 02:26:36 +03:00
return "", fmt.Errorf("cannot follow: invalid id")
2016-03-05 02:08:16 +03:00
}
2016-04-01 02:26:36 +03:00
if m["id"] == c.config.ServerID {
2016-03-05 02:08:16 +03:00
c.mu.Lock()
2016-04-01 02:26:36 +03:00
return "", fmt.Errorf("cannot follow self")
}
if m["following"] != "" {
c.mu.Lock()
return "", fmt.Errorf("cannot follow a follower")
2016-03-05 02:08:16 +03:00
}
c.mu.Lock()
}
c.config.FollowHost = host
c.config.FollowPort = port
}
2016-03-08 03:37:39 +03:00
if err := c.writeConfig(false); err != nil {
2016-03-05 02:08:16 +03:00
c.config = pconfig // revert
2016-04-01 02:26:36 +03:00
return "", err
2016-03-05 02:08:16 +03:00
}
if update {
c.followc++
if c.config.FollowHost != "" {
log.Infof("following new host '%s' '%s'.", host, sport)
go c.follow(c.config.FollowHost, c.config.FollowPort, c.followc)
} else {
log.Infof("following no one")
}
}
2016-04-01 02:26:36 +03:00
return server.OKMessage(msg, start), nil
}
func doServer(conn *Conn) (map[string]string, error) {
v, err := conn.Do("server")
if err != nil {
return nil, err
}
if v.Error() != nil {
return nil, v.Error()
}
arr := v.Array()
m := make(map[string]string)
for i := 0; i < len(arr)/2; i++ {
m[arr[i*2+0].String()] = arr[i*2+1].String()
}
return m, err
2016-03-05 02:08:16 +03:00
}
2016-04-01 02:26:36 +03:00
func (c *Controller) followHandleCommand(values []resp.Value, followc uint64, w io.Writer) (int, error) {
2016-03-05 02:08:16 +03:00
c.mu.Lock()
defer c.mu.Unlock()
2016-04-01 02:26:36 +03:00
if c.followc != followc {
return c.aofsz, errNoLongerFollowing
}
msg := &server.Message{
Command: strings.ToLower(values[0].String()),
Values: values,
}
_, d, err := c.command(msg, nil)
if err != nil {
if commandErrIsFatal(err) {
return c.aofsz, err
}
}
if err := c.writeAOF(resp.ArrayValue(values), &d); err != nil {
return c.aofsz, err
}
2016-03-05 02:08:16 +03:00
return c.aofsz, nil
}
2016-04-01 02:26:36 +03:00
func (c *Controller) followDoLeaderAuth(conn *Conn, auth string) error {
v, err := conn.Do("auth", auth)
2016-03-08 18:35:43 +03:00
if err != nil {
return err
}
2016-04-01 02:26:36 +03:00
if v.Error() != nil {
return v.Error()
2016-03-08 18:35:43 +03:00
}
2016-04-01 02:26:36 +03:00
if v.String() != "OK" {
return errors.New("cannot follow: auth no ok")
2016-03-08 18:35:43 +03:00
}
return nil
}
2016-03-05 02:08:16 +03:00
func (c *Controller) followStep(host string, port int, followc uint64) error {
c.mu.Lock()
if c.followc != followc {
c.mu.Unlock()
return errNoLongerFollowing
}
c.fcup = false
2016-03-08 18:35:43 +03:00
auth := c.config.LeaderAuth
2016-03-05 02:08:16 +03:00
c.mu.Unlock()
addr := fmt.Sprintf("%s:%d", host, port)
2016-04-01 02:26:36 +03:00
2016-03-05 02:08:16 +03:00
// check if we are following self
2016-04-01 02:26:36 +03:00
conn, err := DialTimeout(addr, time.Second*2)
2016-03-05 02:08:16 +03:00
if err != nil {
return fmt.Errorf("cannot follow: %v", err)
}
defer conn.Close()
2016-03-08 18:35:43 +03:00
if auth != "" {
if err := c.followDoLeaderAuth(conn, auth); err != nil {
return fmt.Errorf("cannot follow: %v", err)
}
}
2016-04-01 02:26:36 +03:00
m, err := doServer(conn)
2016-03-05 02:08:16 +03:00
if err != nil {
return fmt.Errorf("cannot follow: %v", err)
}
2016-04-01 02:26:36 +03:00
if m["id"] == "" {
return fmt.Errorf("cannot follow: invalid id")
}
if m["id"] == c.config.ServerID {
2016-03-05 02:08:16 +03:00
return fmt.Errorf("cannot follow self")
}
2016-04-01 02:26:36 +03:00
if m["following"] != "" {
2016-03-05 02:08:16 +03:00
return fmt.Errorf("cannot follow a follower")
}
2016-04-01 02:26:36 +03:00
2016-03-05 02:08:16 +03:00
// verify checksum
pos, err := c.followCheckSome(addr, followc)
if err != nil {
return err
}
2016-04-01 02:26:36 +03:00
v, err := conn.Do("aof", pos)
2016-03-05 02:08:16 +03:00
if err != nil {
return err
}
2016-04-01 02:26:36 +03:00
if v.Error() != nil {
return v.Error()
}
if v.String() != "OK" {
2016-03-05 02:08:16 +03:00
return errors.New("invalid response to aof live request")
}
2016-03-06 17:55:00 +03:00
if core.ShowDebugMessages {
2016-03-05 02:08:16 +03:00
log.Debug("follow:", addr, ":read aof")
}
2016-04-01 02:26:36 +03:00
aofSize, err := strconv.ParseInt(m["aof_size"], 10, 64)
if err != nil {
return err
}
caughtUp := pos >= aofSize
2016-03-05 02:08:16 +03:00
if caughtUp {
c.mu.Lock()
c.fcup = true
c.mu.Unlock()
log.Info("caught up")
}
nullw := ioutil.Discard
for {
2016-04-01 02:26:36 +03:00
v, telnet, _, err := conn.rd.ReadMultiBulk()
2016-03-05 02:08:16 +03:00
if err != nil {
return err
}
2016-04-01 02:26:36 +03:00
vals := v.Array()
if telnet || v.Type() != resp.Array {
return errors.New("invalid multibulk")
}
2016-04-01 03:58:02 +03:00
2016-04-01 02:26:36 +03:00
aofsz, err := c.followHandleCommand(vals, followc, nullw)
2016-03-05 02:08:16 +03:00
if err != nil {
return err
}
if !caughtUp {
2016-04-01 02:26:36 +03:00
if aofsz >= int(aofSize) {
2016-03-05 02:08:16 +03:00
caughtUp = true
c.mu.Lock()
c.fcup = true
c.mu.Unlock()
log.Info("caught up")
}
}
}
}
func (c *Controller) follow(host string, port int, followc uint64) {
for {
err := c.followStep(host, port, followc)
if err == errNoLongerFollowing {
return
}
if err != nil && err != io.EOF {
2016-03-08 18:35:43 +03:00
log.Error("follow: " + err.Error())
2016-03-05 02:08:16 +03:00
}
time.Sleep(time.Second)
}
}