write buffer pooling

This commit is contained in:
Cyrus Katrak 2016-12-11 20:16:54 -08:00
parent 0868951cdb
commit 7ac01aef8b
3 changed files with 72 additions and 6 deletions

View File

@ -83,6 +83,15 @@ type Dialer struct {
// If Jar is nil, cookies are not sent in requests and ignored
// in responses.
Jar http.CookieJar
// WriteBufferPool specifies a pool of buffers to use for write methods. A
// nil value will cause a buffer to be allocated per connection. It is
// recommended to use a buffer pool for applications that have a large number
// of connections and a modest volume of writes. The provided buffer pool
// must not implement a new value instatiator (e.g. Do not implement
// sync.Pool.New()), and must not be shared across connections that have
// different values of WriteBufferSize.
WriteBufferPool BufferPool
}
var errMalformedURL = errors.New("malformed ws or wss URL")
@ -339,6 +348,7 @@ func (d *Dialer) Dial(urlStr string, requestHeader http.Header) (*Conn, *http.Re
}
conn := newConn(netConn, false, d.ReadBufferSize, d.WriteBufferSize)
conn.writePool = d.WriteBufferPool
if err := req.Write(netConn); err != nil {
return nil, nil, err

43
conn.go
View File

@ -109,6 +109,12 @@ type CloseError struct {
Text string
}
// BufferPool represents a pool of buffers.
type BufferPool interface {
Get() interface{}
Put(interface{})
}
func (e *CloseError) Error() string {
s := []byte("websocket: close ")
s = strconv.AppendInt(s, int64(e.Code), 10)
@ -227,6 +233,8 @@ type Conn struct {
// Write fields
mu chan bool // used as mutex to protect write to conn
writeBuf []byte // frame is constructed in this buffer.
writeBufferSize int
writePool BufferPool
writeDeadline time.Time
writer io.WriteCloser // the current writer returned to the application
isWriting bool // for best-effort concurrent write detection
@ -276,7 +284,7 @@ func newConn(conn net.Conn, isServer bool, readBufferSize, writeBufferSize int)
conn: conn,
mu: mu,
readFinal: true,
writeBuf: make([]byte, writeBufferSize+maxFrameHeaderSize),
writeBufferSize: writeBufferSize,
enableWriteCompression: true,
}
c.SetCloseHandler(nil)
@ -441,6 +449,9 @@ func (c *Conn) NextWriter(messageType int) (io.WriteCloser, error) {
frameType: messageType,
pos: maxFrameHeaderSize,
}
if err := c.acquireWriteBuf(); err != nil {
return nil, err
}
c.writer = mw
if c.newCompressionWriter != nil && c.enableWriteCompression && isData(messageType) {
w, err := c.newCompressionWriter(c.writer)
@ -644,6 +655,7 @@ func (w *messageWriter) ReadFrom(r io.Reader) (nn int64, err error) {
}
func (w *messageWriter) Close() error {
defer w.c.releaseWriteBuf()
if w.err != nil {
return w.err
}
@ -666,6 +678,9 @@ func (c *Conn) WriteMessage(messageType int, data []byte) error {
return err
}
mw := messageWriter{c: c, frameType: messageType, pos: maxFrameHeaderSize}
if err := c.acquireWriteBuf(); err != nil {
return err
}
n := copy(c.writeBuf[mw.pos:], data)
mw.pos += n
data = data[n:]
@ -1041,3 +1056,29 @@ func FormatCloseMessage(closeCode int, text string) []byte {
copy(buf[2:], text)
return buf
}
func (c *Conn) acquireWriteBuf() error {
if c.writeBuf != nil {
return nil
}
n := c.writeBufferSize + maxFrameHeaderSize
if c.writePool != nil {
if i := c.writePool.Get(); i != nil {
p, ok := i.([]byte)
if !ok || len(p) != n {
return errors.New("bad value from write buffer pool")
}
c.writeBuf = p
return nil
}
}
c.writeBuf = make([]byte, n)
return nil
}
func (c *Conn) releaseWriteBuf() {
if c.writePool != nil && c.writeBuf != nil {
c.writePool.Put(c.writeBuf)
c.writeBuf = nil
}
}

View File

@ -52,6 +52,15 @@ type Upgrader struct {
// guarantee that compression will be supported. Currently only "no context
// takeover" modes are supported.
EnableCompression bool
// WriteBufferPool specifies a pool of buffers to use for write methods. A
// nil value will cause a buffer to be allocated per connection. It is
// recommended to use a buffer pool for applications that have a large number
// of connections and a modest volume of writes. The provided buffer pool
// must not implement a new value instatiator (e.g. Do not implement
// sync.Pool.New()), and must not be shared across connections that have
// different values of WriteBufferSize.
WriteBufferPool BufferPool
}
func (u *Upgrader) returnError(w http.ResponseWriter, r *http.Request, status int, reason string) (*Conn, error) {
@ -173,6 +182,7 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade
}
c := newConn(netConn, true, u.ReadBufferSize, u.WriteBufferSize)
c.writePool = u.WriteBufferPool
c.subprotocol = subprotocol
if compress {
@ -180,6 +190,11 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade
c.newDecompressionReader = decompressNoContextTakeover
}
if err = c.acquireWriteBuf(); err != nil {
netConn.Close()
return nil, err
}
defer c.releaseWriteBuf()
p := c.writeBuf[:0]
p = append(p, "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: "...)
p = append(p, computeAcceptKey(challengeKey)...)