refactor to calculate wire representation lazily

This commit is contained in:
Alexander Emelin 2017-01-25 02:11:48 +03:00
parent c369553f89
commit cf2ca6925a
2 changed files with 103 additions and 84 deletions

171
conn.go
View File

@ -660,27 +660,13 @@ func (w *messageWriter) Close() error {
return nil return nil
} }
// PreparedMessage allows to prepare message to be sent into connections // netConn is a fake network connection used to get PreparedMessage
// using WritePreparedMessage method. By doing so, you can avoid the overhead // prebuilt payloads.
// of framing the same payload into WebSocket messages multiple times when type netConn struct {
// that same payload is to be sent out on multiple connections - i.e. PUB/SUB bytes.Buffer
// scenarios with many active subscribers.
// This is especially useful when compression is used as permessage compression
// is pretty CPU and memory expensive.
type PreparedMessage struct {
messageType int
compression bool
compressionLevel int
payload []byte
compressedPayload []byte
} }
// netConn is a fake connection to be used to get PreparedMessage prebuilt payloads. func (netConn) Read(p []byte) (int, error) { return 0, nil }
// TODO: this is a simplest solution I've found. Is it hacky? Better to refactor a package in some way?
type netConn struct {
io.Reader
io.Writer
}
// netAddr is a fake net.Addr implementation to be used in netConn. // netAddr is a fake net.Addr implementation to be used in netConn.
type netAddr int type netAddr int
@ -698,92 +684,121 @@ func (c netConn) SetWriteDeadline(t time.Time) error { return nil }
var ( var (
preparingServerConnPool = sync.Pool{New: func() interface{} { preparingServerConnPool = sync.Pool{New: func() interface{} {
var buf bytes.Buffer var buf bytes.Buffer
return newConn(&netConn{Reader: nil, Writer: &buf}, true, 0, 0) return newConn(&netConn{Buffer: buf}, true, 0, 0)
}} }}
preparingClientConnPool = sync.Pool{New: func() interface{} { preparingClientConnPool = sync.Pool{New: func() interface{} {
var buf bytes.Buffer var buf bytes.Buffer
return newConn(&netConn{Reader: nil, Writer: &buf}, false, 0, 0) return newConn(&netConn{Buffer: buf}, false, 0, 0)
}} }}
) )
// NewPreparedMessage returns ready to use PreparedMessage with uncompressed (always) // PreparedMessage caches on the wire representations of a message payload.
// and compressed (only if compression flag is true) prebuilt payloads. // Use PreparedMessage to efficiently send a message payload to multiple
// TODO: client or server message? Options as last argument (with compression level only at moment). // connections. PreparedMessage is especially useful when compression
func NewPreparedMessage(messageType int, data []byte, compression bool, compressionLevel int) (*PreparedMessage, error) { // is used because the CPU and memory expensive compression operation
m := &PreparedMessage{messageType: messageType} // can be executed once for a given set of compression options.
type PreparedMessage struct {
frameType int
data []byte
mu sync.Mutex
frames map[frameKey]*preparedFrame
}
c := preparingServerConnPool.Get().(*Conn) // frameKey defines a unique set of options to cache prepared frames in PreparedMessage.
defer func() { type frameKey struct {
c.conn.(*netConn).Writer.(*bytes.Buffer).Reset() isServer bool
c.enableWriteCompression = false compress bool
c.newCompressionWriter = nil compressionLevel int
preparingServerConnPool.Put(c) }
}()
w, err := c.NextWriter(messageType) // preparedFrame contains data in wire representation.
if err != nil { type preparedFrame struct {
return nil, err once sync.Once
data []byte
}
// NewPreparedMessage returns initialized PreparedMessage. You can then send
// it to connection using WritePreparedMessage method. Valid wire representation
// will be calculated lazily only once for a set of current connection options.
func NewPreparedMessage(messageType int, data []byte) *PreparedMessage {
if !isData(messageType) {
panic("Prepared message type can only be TextMessage or BinaryMessage")
} }
if _, err = w.Write(data); err != nil { return &PreparedMessage{
return nil, err frameType: messageType,
data: data,
frames: make(map[frameKey]*preparedFrame),
} }
err = w.Close() }
if err != nil {
return nil, err func (pm *PreparedMessage) frame(key frameKey) (int, []byte, error) {
pm.mu.Lock()
frame, ok := pm.frames[key]
if !ok {
frame = &preparedFrame{}
pm.frames[key] = frame
} }
pm.mu.Unlock()
// We always need uncompressed payload because even if application enables var writeErr error
// compression we can't guarantee it will be negotiated with client.
m.payload = c.conn.(*netConn).Writer.(*bytes.Buffer).Bytes()
if compression { frame.once.Do(func() {
// Create compressed payload only if application uses compression. // Create frame data once for a given frameKey.
var c *Conn
m.compression = true if key.isServer {
m.compressionLevel = compressionLevel c = preparingServerConnPool.Get().(*Conn)
} else {
c.conn.(*netConn).Writer.(*bytes.Buffer).Reset() c = preparingClientConnPool.Get().(*Conn)
c.enableWriteCompression = true
c.newCompressionWriter = compressNoContextTakeover
c.SetCompressionLevel(compressionLevel)
w, err = c.NextWriter(messageType)
if err != nil {
return nil, err
} }
if _, err = w.Write(data); err != nil {
return nil, err
}
err = w.Close()
if err != nil {
return nil, err
}
m.compressedPayload = c.conn.(*netConn).Writer.(*bytes.Buffer).Bytes()
}
return m, nil defer func() {
c.conn.(*netConn).Buffer.Reset()
c.enableWriteCompression = false
c.newCompressionWriter = nil
c.SetCompressionLevel(0)
if key.isServer {
preparingServerConnPool.Put(c)
} else {
preparingClientConnPool.Put(c)
}
}()
if key.compress {
c.enableWriteCompression = true
c.newCompressionWriter = compressNoContextTakeover
c.SetCompressionLevel(key.compressionLevel)
}
writeErr := c.WriteMessage(pm.frameType, pm.data)
if writeErr == nil {
preparedData := c.conn.(*netConn).Buffer.Bytes()
data := make([]byte, len(preparedData))
copy(data, preparedData)
frame.data = data
}
})
return pm.frameType, frame.data, writeErr
} }
// WritePreparedMessage writes prepared message into connection. // WritePreparedMessage writes prepared message into connection.
func (c *Conn) WritePreparedMessage(msg *PreparedMessage) error { func (c *Conn) WritePreparedMessage(msg *PreparedMessage) error {
frameType, frameData, err := msg.frame(frameKey{
isServer: c.isServer,
compress: c.newCompressionWriter != nil && c.enableWriteCompression,
compressionLevel: c.compressionLevel,
})
if err != nil {
return err
}
if c.isWriting { if c.isWriting {
panic("concurrent write to websocket connection") panic("concurrent write to websocket connection")
} }
c.isWriting = true c.isWriting = true
err = c.write(frameType, c.writeDeadline, frameData, nil)
var err error
if c.newCompressionWriter != nil && c.enableWriteCompression && isData(msg.messageType) {
err = c.write(msg.messageType, c.writeDeadline, msg.compressedPayload)
} else {
err = c.write(msg.messageType, c.writeDeadline, msg.payload)
}
if !c.isWriting { if !c.isWriting {
panic("concurrent write to websocket connection") panic("concurrent write to websocket connection")
} }
c.isWriting = false c.isWriting = false
return err return err
} }

View File

@ -620,7 +620,7 @@ func BenchmarkBroadcastNoCompressionPrepared(b *testing.B) {
for j := 0; j < b.N; j++ { for j := 0; j < b.N; j++ {
for i := 0; i < bench.numMessages; i++ { for i := 0; i < bench.numMessages; i++ {
msg := bench.messages[i%len(bench.messages)] msg := bench.messages[i%len(bench.messages)]
preparedMsg, _ := NewPreparedMessage(TextMessage, msg, false, 1) preparedMsg := NewPreparedMessage(TextMessage, msg)
for _, c := range conns { for _, c := range conns {
c.messages <- preparedMsg c.messages <- preparedMsg
} }
@ -638,7 +638,7 @@ func BenchmarkBroadcastWithCompressionPrepared(b *testing.B) {
for j := 0; j < b.N; j++ { for j := 0; j < b.N; j++ {
for i := 0; i < bench.numMessages; i++ { for i := 0; i < bench.numMessages; i++ {
msg := bench.messages[i%len(bench.messages)] msg := bench.messages[i%len(bench.messages)]
preparedMsg, _ := NewPreparedMessage(TextMessage, msg, true, 1) preparedMsg := NewPreparedMessage(TextMessage, msg)
for _, c := range conns { for _, c := range conns {
c.messages <- preparedMsg c.messages <- preparedMsg
} }
@ -655,7 +655,7 @@ func TestPreparedMessageBytesStreamUncompressed(t *testing.T) {
var b1 bytes.Buffer var b1 bytes.Buffer
c := newConn(fakeNetConn{Reader: nil, Writer: &b1}, true, 1024, 1024) c := newConn(fakeNetConn{Reader: nil, Writer: &b1}, true, 1024, 1024)
for _, msg := range messages { for _, msg := range messages {
preparedMsg, _ := NewPreparedMessage(TextMessage, msg, false, 1) preparedMsg := NewPreparedMessage(TextMessage, msg)
c.WritePreparedMessage(preparedMsg) c.WritePreparedMessage(preparedMsg)
} }
out1 := b1.Bytes() out1 := b1.Bytes()
@ -679,8 +679,10 @@ func TestPreparedMessageBytesStreamCompressed(t *testing.T) {
c := newConn(fakeNetConn{Reader: nil, Writer: &b1}, true, 1024, 1024) c := newConn(fakeNetConn{Reader: nil, Writer: &b1}, true, 1024, 1024)
c.enableWriteCompression = true c.enableWriteCompression = true
c.newCompressionWriter = compressNoContextTakeover c.newCompressionWriter = compressNoContextTakeover
for _, msg := range messages { for i, msg := range messages {
preparedMsg, _ := NewPreparedMessage(TextMessage, msg, true, 1) preparedMsg := NewPreparedMessage(TextMessage, msg)
level := i%(maxCompressionLevel-minCompressionLevel+1) - 2
c.SetCompressionLevel(level)
c.WritePreparedMessage(preparedMsg) c.WritePreparedMessage(preparedMsg)
} }
out1 := b1.Bytes() out1 := b1.Bytes()
@ -689,7 +691,9 @@ func TestPreparedMessageBytesStreamCompressed(t *testing.T) {
c = newConn(fakeNetConn{Reader: nil, Writer: &b2}, true, 1024, 1024) c = newConn(fakeNetConn{Reader: nil, Writer: &b2}, true, 1024, 1024)
c.enableWriteCompression = true c.enableWriteCompression = true
c.newCompressionWriter = compressNoContextTakeover c.newCompressionWriter = compressNoContextTakeover
for _, msg := range messages { for i, msg := range messages {
level := i%(maxCompressionLevel-minCompressionLevel+1) - 2
c.SetCompressionLevel(level)
c.WriteMessage(TextMessage, msg) c.WriteMessage(TextMessage, msg)
} }
out2 := b2.Bytes() out2 := b2.Bytes()