Push output directly to dst instead of buffering.

This commit is contained in:
Andy Balholm 2020-05-07 17:27:37 -07:00
parent 625cbb6f92
commit 7c7a5a10ef
2 changed files with 35 additions and 72 deletions

View File

@ -74,6 +74,7 @@ const (
type Writer struct {
dst io.Writer
options WriterOptions
err error
params encoderParams
hasher_ hasherHandle
@ -102,9 +103,6 @@ type Writer struct {
cmd_code_numbits_ uint
command_buf_ []uint32
literal_buf_ []byte
next_out_ []byte
available_out_ uint
total_out_ uint
tiny_buf_ struct {
u64 [2]uint64
u8 [16]byte
@ -145,7 +143,7 @@ func wrapPosition(position uint64) uint32 {
return result
}
func getBrotliStorage(s *Writer, size int) []byte {
func (s *Writer) getStorage(size int) []byte {
if len(s.storage) < size {
s.storage = make([]byte, size)
}
@ -1089,9 +1087,6 @@ func encoderInitState(s *Writer) {
s.hasher_.Common().is_prepared_ = false
}
s.cmd_code_numbits_ = 0
s.next_out_ = nil
s.available_out_ = 0
s.total_out_ = 0
s.stream_state_ = streamProcessing
s.is_last_block_emitted_ = false
s.is_initialized_ = false
@ -1209,11 +1204,10 @@ func extendLastCommand(s *Writer, bytes *uint32, wrapped_last_processed_pos *uin
}
/*
Processes the accumulated input data and sets |s.available_out_| to the length of
the new output meta-block, or to zero if no new output meta-block has been
created (in this case the processed input data is buffered internally).
If |s.available_out_| is positive, |s.next_out_| points to the start of the output
data. If |is_last| or |force_flush| is true, an output meta-block is
Processes the accumulated input data and writes
the new output meta-block to s.dest, if one has been
created (otherwise the processed input data is buffered internally).
If |is_last| or |force_flush| is true, an output meta-block is
always created. However, until |is_last| is true encoder may retain up
to 7 bits of the last byte of output. To force encoder to dump the remaining
bits use WriteMetadata() to append an empty meta-data block.
@ -1257,11 +1251,10 @@ func encodeData(s *Writer, is_last bool, force_flush bool) bool {
if delta == 0 && !is_last {
/* We have no new input data and we don't have to finish the stream, so
nothing to do. */
s.available_out_ = 0
return true
}
storage = getBrotliStorage(s, int(2*bytes+503))
storage = s.getStorage(int(2*bytes + 503))
storage[0] = byte(s.last_bytes_)
storage[1] = byte(s.last_bytes_ >> 8)
table = getHashTable(s, s.params.quality, uint(bytes), &table_size)
@ -1274,8 +1267,7 @@ func encodeData(s *Writer, is_last bool, force_flush bool) bool {
s.last_bytes_ = uint16(storage[storage_ix>>3])
s.last_bytes_bits_ = byte(storage_ix & 7)
updateLastProcessedPos(s)
s.next_out_ = storage[0:]
s.available_out_ = storage_ix >> 3
s.writeOutput(storage[:storage_ix>>3])
return true
}
{
@ -1334,7 +1326,6 @@ func encodeData(s *Writer, is_last bool, force_flush bool) bool {
hasherReset(s.hasher_)
}
s.available_out_ = 0
return true
}
}
@ -1350,8 +1341,6 @@ func encodeData(s *Writer, is_last bool, force_flush bool) bool {
if !is_last && s.input_pos_ == s.last_flush_pos_ {
/* We have no new input data and we don't have to finish the stream, so
nothing to do. */
s.available_out_ = 0
return true
}
@ -1360,7 +1349,7 @@ func encodeData(s *Writer, is_last bool, force_flush bool) bool {
assert(s.input_pos_-s.last_flush_pos_ <= 1<<24)
{
var metablock_size uint32 = uint32(s.input_pos_ - s.last_flush_pos_)
var storage []byte = getBrotliStorage(s, int(2*metablock_size+503))
var storage []byte = s.getStorage(int(2*metablock_size + 503))
var storage_ix uint = uint(s.last_bytes_bits_)
storage[0] = byte(s.last_bytes_)
storage[1] = byte(s.last_bytes_ >> 8)
@ -1387,8 +1376,7 @@ func encodeData(s *Writer, is_last bool, force_flush bool) bool {
emitting an uncompressed block. */
copy(s.saved_dist_cache_[:], s.dist_cache_[:])
s.next_out_ = storage[0:]
s.available_out_ = storage_ix >> 3
s.writeOutput(storage[:storage_ix>>3])
return true
}
}
@ -1428,7 +1416,6 @@ func writeMetadataHeader(s *Writer, block_size uint, header []byte) uint {
func injectBytePaddingBlock(s *Writer) {
var seal uint32 = uint32(s.last_bytes_)
var seal_bits uint = uint(s.last_bytes_bits_)
var destination []byte
s.last_bytes_ = 0
s.last_bytes_bits_ = 0
@ -1437,14 +1424,7 @@ func injectBytePaddingBlock(s *Writer) {
seal_bits += 6
/* If we have already created storage, then append to it.
Storage is valid until next block is being compressed. */
if s.next_out_ != nil {
destination = s.next_out_[s.available_out_:]
} else {
destination = s.tiny_buf_.u8[:]
s.next_out_ = destination
}
destination := s.tiny_buf_.u8[:]
destination[0] = byte(seal)
if seal_bits > 8 {
@ -1453,13 +1433,12 @@ func injectBytePaddingBlock(s *Writer) {
if seal_bits > 16 {
destination[2] = byte(seal >> 16)
}
s.available_out_ += (seal_bits + 7) >> 3
s.writeOutput(destination[:(seal_bits+7)>>3])
}
func checkFlushComplete(s *Writer) {
if s.stream_state_ == streamFlushRequested && s.available_out_ == 0 {
if s.stream_state_ == streamFlushRequested && s.err == nil {
s.stream_state_ = streamProcessing
s.next_out_ = nil
}
}
@ -1497,10 +1476,10 @@ func encoderCompressStreamFast(s *Writer, op int, available_in *uint, next_in *[
continue
}
/* Compress block only when internal output buffer is empty, stream is not
/* Compress block only when stream is not
finished, there is no pending flush request, and there is either
additional input or pending operation. */
if s.available_out_ == 0 && s.stream_state_ == streamProcessing && (*available_in != 0 || op != int(operationProcess)) {
if s.stream_state_ == streamProcessing && (*available_in != 0 || op != int(operationProcess)) {
var block_size uint = brotli_min_size_t(block_size_limit, *available_in)
var is_last bool = (*available_in == block_size) && (op == int(operationFinish))
var force_flush bool = (*available_in == block_size) && (op == int(operationFlush))
@ -1515,7 +1494,7 @@ func encoderCompressStreamFast(s *Writer, op int, available_in *uint, next_in *[
continue
}
storage = getBrotliStorage(s, int(max_out_size))
storage = s.getStorage(int(max_out_size))
storage[0] = byte(s.last_bytes_)
storage[1] = byte(s.last_bytes_ >> 8)
@ -1530,8 +1509,7 @@ func encoderCompressStreamFast(s *Writer, op int, available_in *uint, next_in *[
*next_in = (*next_in)[block_size:]
*available_in -= block_size
var out_bytes uint = storage_ix >> 3
s.next_out_ = storage
s.available_out_ = out_bytes
s.writeOutput(storage[:out_bytes])
s.last_bytes_ = uint16(storage[storage_ix>>3])
s.last_bytes_bits_ = byte(storage_ix & 7)
@ -1575,10 +1553,6 @@ func processMetadata(s *Writer, available_in *uint, next_in *[]byte) bool {
continue
}
if s.available_out_ != 0 {
break
}
if s.input_pos_ != s.last_flush_pos_ {
var result bool = encodeData(s, false, true)
if !result {
@ -1588,8 +1562,8 @@ func processMetadata(s *Writer, available_in *uint, next_in *[]byte) bool {
}
if s.stream_state_ == streamMetadataHead {
s.next_out_ = s.tiny_buf_.u8[:]
s.available_out_ = writeMetadataHeader(s, uint(s.remaining_metadata_bytes_), s.next_out_)
n := writeMetadataHeader(s, uint(s.remaining_metadata_bytes_), s.tiny_buf_.u8[:])
s.writeOutput(s.tiny_buf_.u8[:n])
s.stream_state_ = streamMetadataBody
continue
} else {
@ -1603,12 +1577,11 @@ func processMetadata(s *Writer, available_in *uint, next_in *[]byte) bool {
/* This guarantees progress in "TakeOutput" workflow. */
var c uint32 = brotli_min_uint32_t(s.remaining_metadata_bytes_, 16)
s.next_out_ = s.tiny_buf_.u8[:]
copy(s.next_out_, (*next_in)[:c])
copy(s.tiny_buf_.u8[:], (*next_in)[:c])
*next_in = (*next_in)[c:]
*available_in -= uint(c)
s.remaining_metadata_bytes_ -= c
s.available_out_ = uint(c)
s.writeOutput(s.tiny_buf_.u8[:c])
continue
}
@ -1681,9 +1654,9 @@ func encoderCompressStream(s *Writer, op int, available_in *uint, next_in *[]byt
continue
}
/* Compress data only when internal output buffer is empty, stream is not
/* Compress data only when stream is not
finished and there is no pending flush request. */
if s.available_out_ == 0 && s.stream_state_ == streamProcessing {
if s.stream_state_ == streamProcessing {
if remaining_block_size == 0 || op != int(operationProcess) {
var is_last bool = ((*available_in == 0) && op == int(operationFinish))
var force_flush bool = ((*available_in == 0) && op == int(operationFlush))
@ -1710,18 +1683,13 @@ func encoderCompressStream(s *Writer, op int, available_in *uint, next_in *[]byt
return true
}
func encoderHasMoreOutput(s *Writer) bool {
return s.available_out_ != 0
func (w *Writer) writeOutput(data []byte) {
if w.err != nil {
return
}
func encoderTakeOutput(s *Writer) []byte {
if s.available_out_ == 0 {
return nil
_, w.err = w.dst.Write(data)
if w.err == nil {
checkFlushComplete(w)
}
result := s.next_out_[:s.available_out_]
s.total_out_ += s.available_out_
s.available_out_ = 0
checkFlushComplete(s)
return result
}

View File

@ -67,6 +67,9 @@ func (w *Writer) writeChunk(p []byte, op int) (n int, err error) {
if w.dst == nil {
return 0, errWriterClosed
}
if w.err != nil {
return 0, w.err
}
for {
availableIn := uint(len(p))
@ -79,16 +82,8 @@ func (w *Writer) writeChunk(p []byte, op int) (n int, err error) {
return n, errEncode
}
outputData := encoderTakeOutput(w)
if len(outputData) > 0 {
_, err = w.dst.Write(outputData)
if err != nil {
return n, err
}
}
if len(p) == 0 {
return n, nil
if len(p) == 0 || w.err != nil {
return n, w.err
}
}
}