upgrade: use pool for writerdict

This commit is contained in:
misu 2018-01-30 19:02:50 +09:00
parent f2a68e2d21
commit 2472e6d800
3 changed files with 15 additions and 76 deletions

View File

@ -5,11 +5,12 @@
package websocket
import (
"compress/flate"
"errors"
"io"
"strings"
"sync"
"compress/flate"
)
const (
@ -55,7 +56,7 @@ func isValidCompressionLevel(level int) bool {
return minCompressionLevel <= level && level <= maxCompressionLevel
}
func compressNoContextTakeover(w io.WriteCloser, level int, dict *[]byte) io.WriteCloser {
func compressNoContextTakeover(w io.WriteCloser, level int) io.WriteCloser {
p := &flateWriterPools[level-minCompressionLevel]
tw := &truncWriter{w: w}
fw, _ := p.Get().(*flate.Writer)
@ -67,12 +68,16 @@ func compressNoContextTakeover(w io.WriteCloser, level int, dict *[]byte) io.Wri
return &flateWriteWrapper{fw: fw, tw: tw, p: p}
}
func compressContextTakeover(w io.WriteCloser, level int, dict *[]byte) io.WriteCloser {
func compressContextTakeover(w io.WriteCloser, level int) io.WriteCloser {
p := &flateWriterDictPools[level-minCompressionLevel]
tw := &truncWriter{w: w}
fw, _ := flate.NewWriterDict(tw, level, *dict)
return &flateWriteWrapper{fw: fw, tw: tw, hasDict: true, dict: dict}
fw, _ := p.Get().(*flate.Writer)
if fw == nil {
fw, _ = flate.NewWriterDict(tw, level, []byte{})
} else {
fw.Reset(tw)
}
return &flateWriteWrapper{fw: fw, tw: tw, p: p}
}
// truncWriter is an io.Writer that writes all but the last four bytes of the
@ -115,9 +120,6 @@ type flateWriteWrapper struct {
fw *flate.Writer
tw *truncWriter
p *sync.Pool
hasDict bool
dict *[]byte
}
func (w *flateWriteWrapper) Write(p []byte) (int, error) {
@ -125,10 +127,6 @@ func (w *flateWriteWrapper) Write(p []byte) (int, error) {
return 0, errWriteClosed
}
if w.hasDict {
w.addDict(p)
}
return w.fw.Write(p)
}
@ -138,9 +136,7 @@ func (w *flateWriteWrapper) Close() error {
}
err1 := w.fw.Flush()
if !w.hasDict {
w.p.Put(w.fw)
}
w.fw = nil
if w.tw.p != [4]byte{0, 0, 0xff, 0xff} {
@ -153,16 +149,6 @@ func (w *flateWriteWrapper) Close() error {
return err2
}
// addDict adds payload to dict.
func (w *flateWriteWrapper) addDict(b []byte) {
*w.dict = append(*w.dict, b...)
if len(*w.dict) > maxWindowBits {
offset := len(*w.dict) - maxWindowBits
*w.dict = (*w.dict)[offset:]
}
}
type flateReadWrapper struct {
fr io.ReadCloser // flate.NewReader

View File

@ -79,45 +79,6 @@ func BenchmarkWriteWithCompressionOfContextTakeover(b *testing.B) {
b.ReportAllocs()
}
func BenchmarkCallWriteWithCompressionOfContextTakeover(b *testing.B) {
w := ioutil.Discard
c := newConn(fakeNetConn{Reader: nil, Writer: w}, false, 1024, 1024)
// messages := textMessages(100)
c.enableWriteCompression = true
c.contextTakeover = true
c.newCompressionWriter = compressContextTakeover
mw := &messageWriter{
c: c,
frameType: 2,
pos: maxFrameHeaderSize,
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
// c.txDict = &messages[i%len(messages)]
c.newCompressionWriter(mw, 2, &[]byte{})
}
b.ReportAllocs()
}
func BenchmarkCallWriteWithCompression(b *testing.B) {
w := ioutil.Discard
c := newConn(fakeNetConn{Reader: nil, Writer: w}, false, 1024, 1024)
// messages := textMessages(100)
c.enableWriteCompression = true
c.newCompressionWriter = compressNoContextTakeover
mw := &messageWriter{
c: c,
frameType: 2,
pos: maxFrameHeaderSize,
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
// c.txDict = &messages[i%len(messages)]
c.newCompressionWriter(mw, 2, nil)
}
b.ReportAllocs()
}
func BenchmarkReadWithCompression(b *testing.B) {
w := ioutil.Discard
c := newConn(fakeNetConn{Reader: nil, Writer: w}, false, 1024, 1024)

12
conn.go
View File

@ -243,7 +243,7 @@ type Conn struct {
enableWriteCompression bool
compressionLevel int
newCompressionWriter func(io.WriteCloser, int, *[]byte) io.WriteCloser
newCompressionWriter func(io.WriteCloser, int) io.WriteCloser
// Read fields
reader io.ReadCloser // the current reader returned to the application
@ -265,7 +265,6 @@ type Conn struct {
newDecompressionReader func(io.Reader, *[]byte) io.ReadCloser // arges may flateReadWrapper struct
contextTakeover bool
txDict *[]byte
rxDict *[]byte
}
@ -337,7 +336,6 @@ func newConnBRW(conn net.Conn, isServer bool, readBufferSize, writeBufferSize in
enableWriteCompression: true,
compressionLevel: defaultCompressionLevel,
txDict: &[]byte{},
rxDict: &[]byte{},
}
c.SetCloseHandler(nil)
@ -509,13 +507,7 @@ func (c *Conn) NextWriter(messageType int) (io.WriteCloser, error) {
c.writer = mw
if c.newCompressionWriter != nil && c.enableWriteCompression && isData(messageType) {
mw.compress = true
switch {
case c.contextTakeover:
c.writer = c.newCompressionWriter(c.writer, c.compressionLevel, c.txDict)
// no-context-takeover
default:
c.writer = c.newCompressionWriter(c.writer, c.compressionLevel, nil)
}
c.writer = c.newCompressionWriter(c.writer, c.compressionLevel)
}
return c.writer, nil
}