forked from mirror/redis
feat(pool): add check for badConnection
* fix: badConn check(#2053) * fix: internalpool test * fix: sentinel test * fix: conncheck ut * fix: remove maxBadConnRetries * fix: add connCheck.deadline check Signed-off-by: monkey92t <golang@88.com>
This commit is contained in:
parent
f5fbb367e7
commit
a8a7665ddf
|
@ -0,0 +1,49 @@
|
||||||
|
// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos
|
||||||
|
|
||||||
|
package pool
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var errUnexpectedRead = errors.New("unexpected read from socket")
|
||||||
|
|
||||||
|
func connCheck(conn net.Conn) error {
|
||||||
|
// Reset previous timeout.
|
||||||
|
_ = conn.SetDeadline(time.Time{})
|
||||||
|
|
||||||
|
sysConn, ok := conn.(syscall.Conn)
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
rawConn, err := sysConn.SyscallConn()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var sysErr error
|
||||||
|
err = rawConn.Read(func(fd uintptr) bool {
|
||||||
|
var buf [1]byte
|
||||||
|
n, err := syscall.Read(int(fd), buf[:])
|
||||||
|
switch {
|
||||||
|
case n == 0 && err == nil:
|
||||||
|
sysErr = io.EOF
|
||||||
|
case n > 0:
|
||||||
|
sysErr = errUnexpectedRead
|
||||||
|
case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:
|
||||||
|
sysErr = nil
|
||||||
|
default:
|
||||||
|
sysErr = err
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return sysErr
|
||||||
|
}
|
|
@ -0,0 +1,9 @@
|
||||||
|
// +build !linux,!darwin,!dragonfly,!freebsd,!netbsd,!openbsd,!solaris,!illumos
|
||||||
|
|
||||||
|
package pool
|
||||||
|
|
||||||
|
import "net"
|
||||||
|
|
||||||
|
func connCheck(conn net.Conn) error {
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,48 @@
|
||||||
|
//go:build linux || darwin || dragonfly || freebsd || netbsd || openbsd || solaris || illumos
|
||||||
|
// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos
|
||||||
|
|
||||||
|
package pool
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
"net/http/httptest"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
. "github.com/onsi/ginkgo"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = Describe("tests conn_check with real conns", func() {
|
||||||
|
var ts *httptest.Server
|
||||||
|
var conn net.Conn
|
||||||
|
var err error
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
ts = httptest.NewServer(nil)
|
||||||
|
conn, err = net.DialTimeout(ts.Listener.Addr().Network(), ts.Listener.Addr().String(), time.Second)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
ts.Close()
|
||||||
|
})
|
||||||
|
|
||||||
|
It("good conn check", func() {
|
||||||
|
Expect(connCheck(conn)).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
Expect(conn.Close()).NotTo(HaveOccurred())
|
||||||
|
Expect(connCheck(conn)).To(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("bad conn check", func() {
|
||||||
|
Expect(conn.Close()).NotTo(HaveOccurred())
|
||||||
|
Expect(connCheck(conn)).To(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("check conn deadline", func() {
|
||||||
|
Expect(conn.SetDeadline(time.Now())).NotTo(HaveOccurred())
|
||||||
|
time.Sleep(time.Millisecond * 10)
|
||||||
|
Expect(connCheck(conn)).NotTo(HaveOccurred())
|
||||||
|
Expect(conn.Close()).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
})
|
|
@ -1,9 +1,14 @@
|
||||||
package pool
|
package pool
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"net"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (cn *Conn) SetCreatedAt(tm time.Time) {
|
func (cn *Conn) SetCreatedAt(tm time.Time) {
|
||||||
cn.createdAt = tm
|
cn.createdAt = tm
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cn *Conn) NetConn() net.Conn {
|
||||||
|
return cn.netConn
|
||||||
|
}
|
|
@ -2,9 +2,12 @@ package pool_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
|
"syscall"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
|
@ -32,5 +35,87 @@ func perform(n int, cbs ...func(int)) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func dummyDialer(context.Context) (net.Conn, error) {
|
func dummyDialer(context.Context) (net.Conn, error) {
|
||||||
return &net.TCPConn{}, nil
|
// return &net.TCPConn{}, nil
|
||||||
|
return newDummyConn(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDummyConn() net.Conn {
|
||||||
|
return &dummyConn{
|
||||||
|
rawConn: &dummyRawConn{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ net.Conn = (*dummyConn)(nil)
|
||||||
|
var _ syscall.Conn = (*dummyConn)(nil)
|
||||||
|
|
||||||
|
type dummyConn struct {
|
||||||
|
rawConn *dummyRawConn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyConn) SyscallConn() (syscall.RawConn, error) {
|
||||||
|
return d.rawConn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var errDummy = fmt.Errorf("dummyConn err")
|
||||||
|
|
||||||
|
func (d *dummyConn) Read(b []byte) (n int, err error) {
|
||||||
|
return 0, errDummy
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyConn) Write(b []byte) (n int, err error) {
|
||||||
|
return 0, errDummy
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyConn) Close() error {
|
||||||
|
d.rawConn.Close()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyConn) LocalAddr() net.Addr {
|
||||||
|
return &net.TCPAddr{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyConn) RemoteAddr() net.Addr {
|
||||||
|
return &net.TCPAddr{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyConn) SetDeadline(t time.Time) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyConn) SetReadDeadline(t time.Time) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyConn) SetWriteDeadline(t time.Time) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ syscall.RawConn = (*dummyRawConn)(nil)
|
||||||
|
|
||||||
|
type dummyRawConn struct {
|
||||||
|
closed bool
|
||||||
|
mux sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyRawConn) Control(f func(fd uintptr)) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyRawConn) Read(f func(fd uintptr) (done bool)) error {
|
||||||
|
d.mux.Lock()
|
||||||
|
defer d.mux.Unlock()
|
||||||
|
if d.closed {
|
||||||
|
return fmt.Errorf("dummyRawConn closed")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyRawConn) Write(f func(fd uintptr) (done bool)) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (d *dummyRawConn) Close() {
|
||||||
|
d.mux.Lock()
|
||||||
|
d.closed = true
|
||||||
|
d.mux.Unlock()
|
||||||
}
|
}
|
||||||
|
|
|
@ -542,7 +542,7 @@ func (p *ConnPool) reapStaleConn() *Conn {
|
||||||
|
|
||||||
func (p *ConnPool) isStaleConn(cn *Conn) bool {
|
func (p *ConnPool) isStaleConn(cn *Conn) bool {
|
||||||
if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
|
if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
|
||||||
return false
|
return connCheck(cn.netConn) != nil
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
@ -553,5 +553,5 @@ func (p *ConnPool) isStaleConn(cn *Conn) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
return false
|
return connCheck(cn.netConn) != nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -323,6 +323,8 @@ var _ = Describe("conns reaper", func() {
|
||||||
cn.SetUsedAt(time.Now().Add(-2 * idleTimeout))
|
cn.SetUsedAt(time.Now().Add(-2 * idleTimeout))
|
||||||
case "aged":
|
case "aged":
|
||||||
cn.SetCreatedAt(time.Now().Add(-2 * maxAge))
|
cn.SetCreatedAt(time.Now().Add(-2 * maxAge))
|
||||||
|
case "connCheck":
|
||||||
|
_ = cn.Close()
|
||||||
}
|
}
|
||||||
conns = append(conns, cn)
|
conns = append(conns, cn)
|
||||||
staleConns = append(staleConns, cn)
|
staleConns = append(staleConns, cn)
|
||||||
|
@ -409,6 +411,7 @@ var _ = Describe("conns reaper", func() {
|
||||||
|
|
||||||
assert("idle")
|
assert("idle")
|
||||||
assert("aged")
|
assert("aged")
|
||||||
|
assert("connCheck")
|
||||||
})
|
})
|
||||||
|
|
||||||
var _ = Describe("race", func() {
|
var _ = Describe("race", func() {
|
||||||
|
|
|
@ -87,7 +87,7 @@ var _ = Describe("pool", func() {
|
||||||
client.Pool().Put(ctx, cn)
|
client.Pool().Put(ctx, cn)
|
||||||
|
|
||||||
err = client.Ping(ctx).Err()
|
err = client.Ping(ctx).Err()
|
||||||
Expect(err).To(MatchError("bad connection"))
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
val, err := client.Ping(ctx).Result()
|
val, err := client.Ping(ctx).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
|
@ -192,7 +192,7 @@ var _ = Describe("NewFailoverClusterClient", func() {
|
||||||
err = master.Shutdown(ctx).Err()
|
err = master.Shutdown(ctx).Err()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Eventually(func() error {
|
Eventually(func() error {
|
||||||
return sentinelMaster.Ping(ctx).Err()
|
return master.Ping(ctx).Err()
|
||||||
}, "15s", "100ms").Should(HaveOccurred())
|
}, "15s", "100ms").Should(HaveOccurred())
|
||||||
|
|
||||||
// Check that client picked up new master.
|
// Check that client picked up new master.
|
||||||
|
|
|
@ -142,9 +142,6 @@ var _ = Describe("Tx", func() {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = do()
|
|
||||||
Expect(err).To(MatchError("bad connection"))
|
|
||||||
|
|
||||||
err = do()
|
err = do()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue