Cleanup buffers manipulation

Vladimir Mihailenco 2018-08-15 11:53:15 +03:00
parent 7c26d1ceb6
commit 5146fb0c57
7 changed files with 111 additions and 132 deletions
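Note: the change repeated across these files replaces the exported deadline setters (SetWriteTimeout/SetReadTimeout) plus direct access to the exported reader cn.Rd with two callback helpers on pool.Conn, WithWriter and WithReader, which set the deadline, lend the connection's buffer to the writer or reader, and take it back when the callback returns. A minimal sketch of the new calling convention, using only identifiers that appear in the diffs below; the wrapper function itself is hypothetical, not part of the commit:

	// processOne is an illustrative helper, not part of this commit.
	func processOne(cn *pool.Conn, opt *Options, cmd Cmder) error {
		// Marshal the command into the lent write buffer; the single
		// network write happens inside WithWriter after fn returns.
		err := cn.WithWriter(opt.WriteTimeout, func(wb *proto.WriteBuffer) error {
			return writeCmd(wb, cmd)
		})
		if err != nil {
			return err
		}
		// Parse the reply through the lent read buffer.
		return cn.WithReader(opt.ReadTimeout, func(rd proto.Reader) error {
			return cmd.readReply(rd)
		})
	}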

View File

@@ -1333,19 +1333,19 @@ func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder
 func (c *ClusterClient) pipelineProcessCmds(
 	node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
 ) error {
-	cn.SetWriteTimeout(c.opt.WriteTimeout)
-
-	err := writeCmd(cn, cmds...)
+	err := cn.WithWriter(c.opt.WriteTimeout, func(wb *proto.WriteBuffer) error {
+		return writeCmd(wb, cmds...)
+	})
 	if err != nil {
 		setCmdsErr(cmds, err)
 		failedCmds[node] = cmds
 		return err
 	}

-	// Set read timeout for all commands.
-	cn.SetReadTimeout(c.opt.ReadTimeout)
-
-	return c.pipelineReadCmds(cn.Rd, cmds, failedCmds)
+	err = cn.WithReader(c.opt.ReadTimeout, func(rd proto.Reader) error {
+		return c.pipelineReadCmds(rd, cmds, failedCmds)
+	})
+	return err
 }

 func (c *ClusterClient) pipelineReadCmds(
@@ -1476,23 +1476,24 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
 func (c *ClusterClient) txPipelineProcessCmds(
 	node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
 ) error {
-	cn.SetWriteTimeout(c.opt.WriteTimeout)
-	if err := txPipelineWriteMulti(cn, cmds); err != nil {
+	err := cn.WithWriter(c.opt.WriteTimeout, func(wb *proto.WriteBuffer) error {
+		return txPipelineWriteMulti(wb, cmds)
+	})
+	if err != nil {
 		setCmdsErr(cmds, err)
 		failedCmds[node] = cmds
 		return err
 	}

-	// Set read timeout for all commands.
-	cn.SetReadTimeout(c.opt.ReadTimeout)
-
-	err := c.txPipelineReadQueued(cn.Rd, cmds, failedCmds)
-	if err != nil {
-		setCmdsErr(cmds, err)
-		return err
-	}
-
-	return pipelineReadCmds(cn.Rd, cmds)
+	err = cn.WithReader(c.opt.ReadTimeout, func(rd proto.Reader) error {
+		err := c.txPipelineReadQueued(rd, cmds, failedCmds)
+		if err != nil {
+			setCmdsErr(cmds, err)
+			return err
+		}
+		return pipelineReadCmds(rd, cmds)
+	})
+	return err
 }

 func (c *ClusterClient) txPipelineReadQueued(
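Both cluster paths above now funnel all I/O through the callbacks while keeping the failedCmds bookkeeping for per-node retries. A hedged usage sketch at the public API level that exercises this path (NewClusterClient, ClusterOptions, and Pipeline are existing go-redis API; the addresses and keys are placeholders, and the hash tag keeps both keys in one slot):

	// clusterPipelineExample is illustrative, not part of this commit.
	func clusterPipelineExample() error {
		c := redis.NewClusterClient(&redis.ClusterOptions{
			Addrs: []string{":7000", ":7001", ":7002"},
		})
		pipe := c.Pipeline()
		pipe.Set("key{tag}", "value", 0)
		pipe.Get("key{tag}")
		// Commands are grouped per node; each batch goes through
		// pipelineProcessCmds, and failed batches are remapped for retry.
		_, err := pipe.Exec()
		return err
	}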

View File

@@ -9,7 +9,6 @@ import (
 	"time"

 	"github.com/go-redis/redis/internal"
-	"github.com/go-redis/redis/internal/pool"
 	"github.com/go-redis/redis/internal/proto"
 	"github.com/go-redis/redis/internal/util"
 )
@@ -44,17 +43,14 @@ func cmdsFirstErr(cmds []Cmder) error {
 	return nil
 }

-func writeCmd(cn *pool.Conn, cmds ...Cmder) error {
-	wb := cn.PrepareWriteBuffer()
-
+func writeCmd(wb *proto.WriteBuffer, cmds ...Cmder) error {
 	for _, cmd := range cmds {
 		err := wb.Append(cmd.Args())
 		if err != nil {
 			return err
 		}
 	}
-
-	err := cn.FlushWriteBuffer(wb)
-	return err
+	return nil
 }

 func cmdString(cmd Cmder, val interface{}) string {
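writeCmd no longer reaches into the connection: it only appends marshalled commands to the proto.WriteBuffer it is handed, and the flush moved into Conn.WithWriter. One consequence, sketched below with a hypothetical helper, is that any number of commands can be marshalled and then flushed with a single network write:

	// flushBatch is illustrative, not part of this commit.
	func flushBatch(cn *pool.Conn, timeout time.Duration, cmds []Cmder) error {
		return cn.WithWriter(timeout, func(wb *proto.WriteBuffer) error {
			return writeCmd(wb, cmds...) // append only; no I/O here
		})
	}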

View File

@@ -8,16 +8,21 @@ import (
 	"github.com/go-redis/redis/internal/proto"
 )

+func makeBuffer() []byte {
+	const defaulBufSize = 4096
+	return make([]byte, defaulBufSize)
+}
+
 var noDeadline = time.Time{}

 type Conn struct {
 	netConn net.Conn

-	Rd proto.Reader
-	wb *proto.WriteBuffer
-
-	concurrentReadWrite bool
+	buf      []byte
+	rd       proto.Reader
+	rdLocked bool
+	wb       *proto.WriteBuffer

 	InitedAt time.Time
 	pooled   bool
 	usedAt   atomic.Value
@@ -26,9 +31,9 @@ type Conn struct {
 func NewConn(netConn net.Conn) *Conn {
 	cn := &Conn{
 		netConn: netConn,
+		buf:     makeBuffer(),
 	}
-	buf := proto.NewElasticBufReader(netConn)
-	cn.Rd = proto.NewReader(buf)
+	cn.rd = proto.NewReader(proto.NewElasticBufReader(netConn))
 	cn.wb = proto.NewWriteBuffer()
 	cn.SetUsedAt(time.Now())
 	return cn
@@ -44,27 +49,25 @@ func (cn *Conn) SetUsedAt(tm time.Time) {
 func (cn *Conn) SetNetConn(netConn net.Conn) {
 	cn.netConn = netConn
-	cn.Rd.Reset(netConn)
+	cn.rd.Reset(netConn)
 }

-func (cn *Conn) SetReadTimeout(timeout time.Duration) {
+func (cn *Conn) setReadTimeout(timeout time.Duration) error {
 	now := time.Now()
 	cn.SetUsedAt(now)
 	if timeout > 0 {
-		cn.netConn.SetReadDeadline(now.Add(timeout))
-	} else {
-		cn.netConn.SetReadDeadline(noDeadline)
+		return cn.netConn.SetReadDeadline(now.Add(timeout))
 	}
+	return cn.netConn.SetReadDeadline(noDeadline)
 }

-func (cn *Conn) SetWriteTimeout(timeout time.Duration) {
+func (cn *Conn) setWriteTimeout(timeout time.Duration) error {
 	now := time.Now()
 	cn.SetUsedAt(now)
 	if timeout > 0 {
-		cn.netConn.SetWriteDeadline(now.Add(timeout))
-	} else {
-		cn.netConn.SetWriteDeadline(noDeadline)
+		return cn.netConn.SetWriteDeadline(now.Add(timeout))
 	}
+	return cn.netConn.SetWriteDeadline(noDeadline)
 }

 func (cn *Conn) Write(b []byte) (int, error) {
func (cn *Conn) Write(b []byte) (int, error) { func (cn *Conn) Write(b []byte) (int, error) {
@@ -75,28 +78,43 @@ func (cn *Conn) RemoteAddr() net.Addr {
 	return cn.netConn.RemoteAddr()
 }

-func (cn *Conn) EnableConcurrentReadWrite() {
-	cn.concurrentReadWrite = true
-	cn.wb.ResetBuffer(make([]byte, 4096))
+func (cn *Conn) LockReaderBuffer() {
+	cn.rdLocked = true
+	cn.rd.ResetBuffer(makeBuffer())
 }

-func (cn *Conn) PrepareWriteBuffer() *proto.WriteBuffer {
-	if cn.concurrentReadWrite {
-		cn.wb.Reset()
-	} else {
-		cn.wb.ResetBuffer(cn.Rd.Buffer())
-	}
-	return cn.wb
-}
-
-func (cn *Conn) FlushWriteBuffer(wb *proto.WriteBuffer) error {
-	_, err := cn.netConn.Write(wb.Bytes())
-	if !cn.concurrentReadWrite {
-		cn.Rd.ResetBuffer(wb.Buffer())
-	}
-	return err
-}
+func (cn *Conn) WithReader(timeout time.Duration, fn func(rd proto.Reader) error) error {
+	_ = cn.setReadTimeout(timeout)
+
+	if !cn.rdLocked {
+		cn.rd.ResetBuffer(cn.buf)
+	}
+
+	err := fn(cn.rd)
+
+	if !cn.rdLocked {
+		cn.buf = cn.rd.Buffer()
+	}
+
+	return err
+}
+
+func (cn *Conn) WithWriter(timeout time.Duration, fn func(wb *proto.WriteBuffer) error) error {
+	_ = cn.setWriteTimeout(timeout)
+
+	cn.wb.ResetBuffer(cn.buf)
+
+	firstErr := fn(cn.wb)
+
+	_, err := cn.netConn.Write(cn.wb.Bytes())
+	cn.buf = cn.wb.Buffer()
+	if err != nil && firstErr == nil {
+		firstErr = err
+	}
+	return firstErr
+}

 func (cn *Conn) Close() error {
 	return cn.netConn.Close()
 }
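The key invariant in the new Conn is that one 4096-byte slice (cn.buf) is shared by reads and writes: WithWriter installs it into the WriteBuffer and reclaims it via Buffer(), and WithReader does the same for the reader, which is safe as long as a connection never reads and writes at the same time. LockReaderBuffer opts a connection out for the pub/sub case by giving the reader a private buffer, after which cn.buf stays with the writer. Note also that WithWriter flushes whatever fn appended even when fn failed, and reports fn's error in preference to the write error. A standalone toy model of the handoff, not go-redis code:

	package main

	import "fmt"

	// lender models cn.buf: one slice lent to whichever side is active.
	type lender struct{ buf []byte }

	func (l *lender) with(fn func(b []byte) []byte) {
		b := l.buf    // lend the buffer
		l.buf = nil   // nobody else may touch it meanwhile
		l.buf = fn(b) // reclaim it (possibly grown) afterwards
	}

	func main() {
		l := &lender{buf: make([]byte, 4096)}
		l.with(func(b []byte) []byte {
			fmt.Println("writer borrowed", len(b), "bytes")
			return b
		})
		fmt.Println("owner has it back:", len(l.buf))
	}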

View File

@@ -288,13 +288,6 @@ func (p *ConnPool) popIdle() *Conn {
 }

 func (p *ConnPool) Put(cn *Conn) {
-	buf := cn.Rd.Bytes()
-	if len(buf) > 0 {
-		internal.Logf("connection has unread data: %.100q", buf)
-		p.Remove(cn)
-		return
-	}
-
 	if !cn.pooled {
 		p.Remove(cn)
 		return

View File

@@ -19,7 +19,6 @@ type ElasticBufReader struct {
 func NewElasticBufReader(rd io.Reader) *ElasticBufReader {
 	return &ElasticBufReader{
-		buf: make([]byte, defaultBufSize),
-		rd:  rd,
+		rd: rd,
 	}
 }
@@ -89,44 +88,6 @@ func (b *ElasticBufReader) readErr() error {
 	return err
 }

-func (b *ElasticBufReader) Read(p []byte) (n int, err error) {
-	n = len(p)
-	if n == 0 {
-		return 0, b.readErr()
-	}
-
-	if b.r == b.w {
-		if b.err != nil {
-			return 0, b.readErr()
-		}
-
-		if len(p) >= len(b.buf) {
-			// Large read, empty buffer.
-			// Read directly into p to avoid copy.
-			n, b.err = b.rd.Read(p)
-			if n < 0 {
-				panic(errNegativeRead)
-			}
-			return n, b.readErr()
-		}
-
-		// One read.
-		// Do not use b.fill, which will loop.
-		b.r = 0
-		b.w = 0
-		n, b.err = b.rd.Read(b.buf)
-		if n < 0 {
-			panic(errNegativeRead)
-		}
-		if n == 0 {
-			return 0, b.readErr()
-		}
-		b.w += n
-	}
-
-	// copy as much as we can
-	n = copy(p, b.buf[b.r:b.w])
-	b.r += n
-	return n, nil
-}
-
 func (b *ElasticBufReader) ReadSlice(delim byte) (line []byte, err error) {
 	for {
 		// Search buffer.
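With its Read method and constructor allocation both gone, ElasticBufReader no longer doubles as an io.Reader, and it starts out with no buffer of its own; the owner has to install one, which is exactly what NewConn and WithReader do in the pool's Conn above. A hedged sketch of that obligation, where newReader is a hypothetical helper and netConn is assumed to be an established net.Conn:

	// newReader is illustrative, not part of this commit.
	func newReader(netConn net.Conn) proto.Reader {
		rd := proto.NewReader(proto.NewElasticBufReader(netConn))
		rd.ResetBuffer(make([]byte, 4096)) // must happen before the first read
		return rd
	}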

View File

@@ -7,9 +7,10 @@ import (
 	"github.com/go-redis/redis/internal"
 	"github.com/go-redis/redis/internal/pool"
+	"github.com/go-redis/redis/internal/proto"
 )

-// PubSub implements Pub/Sub commands as described in
+// PubSub implements Pub/Sub commands bas described in
 // http://redis.io/topics/pubsub. Message receiving is NOT safe
 // for concurrent use by multiple goroutines.
 //
@@ -62,7 +63,7 @@ func (c *PubSub) _conn(newChannels []string) (*pool.Conn, error) {
 	if err != nil {
 		return nil, err
 	}
-	cn.EnableConcurrentReadWrite()
+	cn.LockReaderBuffer()

 	if err := c.resubscribe(cn); err != nil {
 		_ = c.closeConn(cn)
@@ -74,8 +75,9 @@ func (c *PubSub) _conn(newChannels []string) (*pool.Conn, error) {
 }

 func (c *PubSub) writeCmd(cn *pool.Conn, cmd Cmder) error {
-	cn.SetWriteTimeout(c.opt.WriteTimeout)
-	return writeCmd(cn, cmd)
+	return cn.WithWriter(c.opt.WriteTimeout, func(wb *proto.WriteBuffer) error {
+		return writeCmd(wb, cmd)
+	})
 }

 func (c *PubSub) resubscribe(cn *pool.Conn) error {
@@ -339,8 +341,10 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
 		return nil, err
 	}

-	cn.SetReadTimeout(timeout)
-	err = c.cmd.readReply(cn.Rd)
+	err = cn.WithReader(timeout, func(rd proto.Reader) error {
+		return c.cmd.readReply(rd)
+	})
+
 	c.releaseConn(cn, err, timeout > 0)
 	if err != nil {
 		return nil, err
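Pub/sub is the one case where a connection reads (waiting for messages, often with a long or zero deadline) while writes such as SUBSCRIBE/UNSUBSCRIBE may happen concurrently, hence the LockReaderBuffer call above: the reader keeps a private buffer, so a concurrent WithWriter cannot borrow it mid-read. A hedged usage sketch against the existing public API, with receiveOne as a hypothetical helper and the channel name a placeholder:

	// receiveOne is illustrative, not part of this commit.
	func receiveOne(client *redis.Client) (interface{}, error) {
		pubsub := client.Subscribe("mychannel")
		defer pubsub.Close()
		// ReceiveTimeout lands in the cn.WithReader call shown above.
		return pubsub.ReceiveTimeout(time.Second)
	}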

View File

@@ -156,8 +156,10 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
 			return err
 		}

-		cn.SetWriteTimeout(c.opt.WriteTimeout)
-		if err := writeCmd(cn, cmd); err != nil {
+		err = cn.WithWriter(c.opt.WriteTimeout, func(wb *proto.WriteBuffer) error {
+			return writeCmd(wb, cmd)
+		})
+		if err != nil {
 			c.releaseConn(cn, err)
 			cmd.setErr(err)
 			if internal.IsRetryableError(err, true) {
@@ -166,8 +168,9 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
 			return err
 		}

-		cn.SetReadTimeout(c.cmdTimeout(cmd))
-		err = cmd.readReply(cn.Rd)
+		err = cn.WithReader(c.cmdTimeout(cmd), func(rd proto.Reader) error {
+			return cmd.readReply(rd)
+		})
 		c.releaseConn(cn, err)
 		if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) {
 			continue
@@ -256,15 +259,18 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) error {
 }

 func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
-	cn.SetWriteTimeout(c.opt.WriteTimeout)
-	if err := writeCmd(cn, cmds...); err != nil {
+	err := cn.WithWriter(c.opt.WriteTimeout, func(wb *proto.WriteBuffer) error {
+		return writeCmd(wb, cmds...)
+	})
+	if err != nil {
 		setCmdsErr(cmds, err)
 		return true, err
 	}

-	// Set read timeout for all commands.
-	cn.SetReadTimeout(c.opt.ReadTimeout)
-	return true, pipelineReadCmds(cn.Rd, cmds)
+	err = cn.WithReader(c.opt.ReadTimeout, func(rd proto.Reader) error {
+		return pipelineReadCmds(rd, cmds)
+	})
+	return true, err
 }

 func pipelineReadCmds(rd proto.Reader, cmds []Cmder) error {
@@ -278,34 +284,34 @@ func pipelineReadCmds(rd proto.Reader, cmds []Cmder) error {
 }

 func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
-	cn.SetWriteTimeout(c.opt.WriteTimeout)
-	err := txPipelineWriteMulti(cn, cmds)
+	err := cn.WithWriter(c.opt.WriteTimeout, func(wb *proto.WriteBuffer) error {
+		return txPipelineWriteMulti(wb, cmds)
+	})
 	if err != nil {
 		setCmdsErr(cmds, err)
 		return true, err
 	}

-	// Set read timeout for all commands.
-	cn.SetReadTimeout(c.opt.ReadTimeout)
-
-	err = c.txPipelineReadQueued(cn.Rd, cmds)
-	if err != nil {
-		setCmdsErr(cmds, err)
-		return false, err
-	}
-
-	return false, pipelineReadCmds(cn.Rd, cmds)
+	err = cn.WithReader(c.opt.ReadTimeout, func(rd proto.Reader) error {
+		err := txPipelineReadQueued(rd, cmds)
+		if err != nil {
+			setCmdsErr(cmds, err)
+			return err
+		}
+		return pipelineReadCmds(rd, cmds)
+	})
+	return false, err
 }

-func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error {
+func txPipelineWriteMulti(wb *proto.WriteBuffer, cmds []Cmder) error {
 	multiExec := make([]Cmder, 0, len(cmds)+2)
 	multiExec = append(multiExec, NewStatusCmd("MULTI"))
 	multiExec = append(multiExec, cmds...)
 	multiExec = append(multiExec, NewSliceCmd("EXEC"))
-	return writeCmd(cn, multiExec...)
+	return writeCmd(wb, multiExec...)
 }

-func (c *baseClient) txPipelineReadQueued(rd proto.Reader, cmds []Cmder) error {
+func txPipelineReadQueued(rd proto.Reader, cmds []Cmder) error {
 	// Parse queued replies.
 	var statusCmd StatusCmd
 	err := statusCmd.readReply(rd)
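txPipelineWriteMulti frames the user's commands between MULTI and EXEC and hands them to writeCmd, so the whole transaction leaves in one buffered write; inside WithReader, txPipelineReadQueued (now a plain function, since it no longer needs the client) consumes the queued acknowledgements before pipelineReadCmds parses the EXEC results. A hedged usage sketch that reaches this path through the existing TxPipelined API, with txExample as a hypothetical helper and placeholder key names:

	// txExample is illustrative, not part of this commit.
	func txExample(client *redis.Client) error {
		_, err := client.TxPipelined(func(pipe redis.Pipeliner) error {
			pipe.Incr("counter")
			pipe.Expire("counter", time.Hour)
			return nil // queued commands are sent as MULTI ... EXEC in one write
		})
		return err
	}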