From 7ac01aef8b1ffbae777e3a3ef692a4ed02716526 Mon Sep 17 00:00:00 2001 From: Cyrus Katrak Date: Sun, 11 Dec 2016 20:16:54 -0800 Subject: [PATCH] write buffer pooling --- client.go | 10 ++++++++++ conn.go | 53 +++++++++++++++++++++++++++++++++++++++++++++++------ server.go | 15 +++++++++++++++ 3 files changed, 72 insertions(+), 6 deletions(-) diff --git a/client.go b/client.go index 78d9328..76e0e39 100644 --- a/client.go +++ b/client.go @@ -83,6 +83,15 @@ type Dialer struct { // If Jar is nil, cookies are not sent in requests and ignored // in responses. Jar http.CookieJar + + // WriteBufferPool specifies a pool of buffers to use for write methods. A + // nil value will cause a buffer to be allocated per connection. It is + // recommended to use a buffer pool for applications that have a large number + // of connections and a modest volume of writes. The provided buffer pool + // must not implement a new value instatiator (e.g. Do not implement + // sync.Pool.New()), and must not be shared across connections that have + // different values of WriteBufferSize. + WriteBufferPool BufferPool } var errMalformedURL = errors.New("malformed ws or wss URL") @@ -339,6 +348,7 @@ func (d *Dialer) Dial(urlStr string, requestHeader http.Header) (*Conn, *http.Re } conn := newConn(netConn, false, d.ReadBufferSize, d.WriteBufferSize) + conn.writePool = d.WriteBufferPool if err := req.Write(netConn); err != nil { return nil, nil, err diff --git a/conn.go b/conn.go index ce7f0a6..8d19cd3 100644 --- a/conn.go +++ b/conn.go @@ -109,6 +109,12 @@ type CloseError struct { Text string } +// BufferPool represents a pool of buffers. +type BufferPool interface { + Get() interface{} + Put(interface{}) +} + func (e *CloseError) Error() string { s := []byte("websocket: close ") s = strconv.AppendInt(s, int64(e.Code), 10) @@ -225,11 +231,13 @@ type Conn struct { subprotocol string // Write fields - mu chan bool // used as mutex to protect write to conn - writeBuf []byte // frame is constructed in this buffer. - writeDeadline time.Time - writer io.WriteCloser // the current writer returned to the application - isWriting bool // for best-effort concurrent write detection + mu chan bool // used as mutex to protect write to conn + writeBuf []byte // frame is constructed in this buffer. + writeBufferSize int + writePool BufferPool + writeDeadline time.Time + writer io.WriteCloser // the current writer returned to the application + isWriting bool // for best-effort concurrent write detection writeErrMu sync.Mutex writeErr error @@ -276,7 +284,7 @@ func newConn(conn net.Conn, isServer bool, readBufferSize, writeBufferSize int) conn: conn, mu: mu, readFinal: true, - writeBuf: make([]byte, writeBufferSize+maxFrameHeaderSize), + writeBufferSize: writeBufferSize, enableWriteCompression: true, } c.SetCloseHandler(nil) @@ -441,6 +449,9 @@ func (c *Conn) NextWriter(messageType int) (io.WriteCloser, error) { frameType: messageType, pos: maxFrameHeaderSize, } + if err := c.acquireWriteBuf(); err != nil { + return nil, err + } c.writer = mw if c.newCompressionWriter != nil && c.enableWriteCompression && isData(messageType) { w, err := c.newCompressionWriter(c.writer) @@ -644,6 +655,7 @@ func (w *messageWriter) ReadFrom(r io.Reader) (nn int64, err error) { } func (w *messageWriter) Close() error { + defer w.c.releaseWriteBuf() if w.err != nil { return w.err } @@ -666,6 +678,9 @@ func (c *Conn) WriteMessage(messageType int, data []byte) error { return err } mw := messageWriter{c: c, frameType: messageType, pos: maxFrameHeaderSize} + if err := c.acquireWriteBuf(); err != nil { + return err + } n := copy(c.writeBuf[mw.pos:], data) mw.pos += n data = data[n:] @@ -1041,3 +1056,29 @@ func FormatCloseMessage(closeCode int, text string) []byte { copy(buf[2:], text) return buf } + +func (c *Conn) acquireWriteBuf() error { + if c.writeBuf != nil { + return nil + } + n := c.writeBufferSize + maxFrameHeaderSize + if c.writePool != nil { + if i := c.writePool.Get(); i != nil { + p, ok := i.([]byte) + if !ok || len(p) != n { + return errors.New("bad value from write buffer pool") + } + c.writeBuf = p + return nil + } + } + c.writeBuf = make([]byte, n) + return nil +} + +func (c *Conn) releaseWriteBuf() { + if c.writePool != nil && c.writeBuf != nil { + c.writePool.Put(c.writeBuf) + c.writeBuf = nil + } +} diff --git a/server.go b/server.go index aaedebd..a6b5345 100644 --- a/server.go +++ b/server.go @@ -52,6 +52,15 @@ type Upgrader struct { // guarantee that compression will be supported. Currently only "no context // takeover" modes are supported. EnableCompression bool + + // WriteBufferPool specifies a pool of buffers to use for write methods. A + // nil value will cause a buffer to be allocated per connection. It is + // recommended to use a buffer pool for applications that have a large number + // of connections and a modest volume of writes. The provided buffer pool + // must not implement a new value instatiator (e.g. Do not implement + // sync.Pool.New()), and must not be shared across connections that have + // different values of WriteBufferSize. + WriteBufferPool BufferPool } func (u *Upgrader) returnError(w http.ResponseWriter, r *http.Request, status int, reason string) (*Conn, error) { @@ -173,6 +182,7 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade } c := newConn(netConn, true, u.ReadBufferSize, u.WriteBufferSize) + c.writePool = u.WriteBufferPool c.subprotocol = subprotocol if compress { @@ -180,6 +190,11 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade c.newDecompressionReader = decompressNoContextTakeover } + if err = c.acquireWriteBuf(); err != nil { + netConn.Close() + return nil, err + } + defer c.releaseWriteBuf() p := c.writeBuf[:0] p = append(p, "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: "...) p = append(p, computeAcceptKey(challengeKey)...)