From 7c7a5a10efd3d2621c941256b0c92e8d96974ff3 Mon Sep 17 00:00:00 2001 From: Andy Balholm Date: Thu, 7 May 2020 17:27:37 -0700 Subject: [PATCH] Push output directly to dst instead of buffering. --- encode.go | 92 ++++++++++++++++++------------------------------------- writer.go | 15 +++------ 2 files changed, 35 insertions(+), 72 deletions(-) diff --git a/encode.go b/encode.go index 3192b9b..375a470 100644 --- a/encode.go +++ b/encode.go @@ -74,6 +74,7 @@ const ( type Writer struct { dst io.Writer options WriterOptions + err error params encoderParams hasher_ hasherHandle @@ -102,9 +103,6 @@ type Writer struct { cmd_code_numbits_ uint command_buf_ []uint32 literal_buf_ []byte - next_out_ []byte - available_out_ uint - total_out_ uint tiny_buf_ struct { u64 [2]uint64 u8 [16]byte @@ -145,7 +143,7 @@ func wrapPosition(position uint64) uint32 { return result } -func getBrotliStorage(s *Writer, size int) []byte { +func (s *Writer) getStorage(size int) []byte { if len(s.storage) < size { s.storage = make([]byte, size) } @@ -1089,9 +1087,6 @@ func encoderInitState(s *Writer) { s.hasher_.Common().is_prepared_ = false } s.cmd_code_numbits_ = 0 - s.next_out_ = nil - s.available_out_ = 0 - s.total_out_ = 0 s.stream_state_ = streamProcessing s.is_last_block_emitted_ = false s.is_initialized_ = false @@ -1209,11 +1204,10 @@ func extendLastCommand(s *Writer, bytes *uint32, wrapped_last_processed_pos *uin } /* - Processes the accumulated input data and sets |s.available_out_| to the length of - the new output meta-block, or to zero if no new output meta-block has been - created (in this case the processed input data is buffered internally). - If |s.available_out_| is positive, |s.next_out_| points to the start of the output - data. If |is_last| or |force_flush| is true, an output meta-block is + Processes the accumulated input data and writes + the new output meta-block to s.dest, if one has been + created (otherwise the processed input data is buffered internally). + If |is_last| or |force_flush| is true, an output meta-block is always created. However, until |is_last| is true encoder may retain up to 7 bits of the last byte of output. To force encoder to dump the remaining bits use WriteMetadata() to append an empty meta-data block. @@ -1257,11 +1251,10 @@ func encodeData(s *Writer, is_last bool, force_flush bool) bool { if delta == 0 && !is_last { /* We have no new input data and we don't have to finish the stream, so nothing to do. */ - s.available_out_ = 0 return true } - storage = getBrotliStorage(s, int(2*bytes+503)) + storage = s.getStorage(int(2*bytes + 503)) storage[0] = byte(s.last_bytes_) storage[1] = byte(s.last_bytes_ >> 8) table = getHashTable(s, s.params.quality, uint(bytes), &table_size) @@ -1274,8 +1267,7 @@ func encodeData(s *Writer, is_last bool, force_flush bool) bool { s.last_bytes_ = uint16(storage[storage_ix>>3]) s.last_bytes_bits_ = byte(storage_ix & 7) updateLastProcessedPos(s) - s.next_out_ = storage[0:] - s.available_out_ = storage_ix >> 3 + s.writeOutput(storage[:storage_ix>>3]) return true } { @@ -1334,7 +1326,6 @@ func encodeData(s *Writer, is_last bool, force_flush bool) bool { hasherReset(s.hasher_) } - s.available_out_ = 0 return true } } @@ -1350,8 +1341,6 @@ func encodeData(s *Writer, is_last bool, force_flush bool) bool { if !is_last && s.input_pos_ == s.last_flush_pos_ { /* We have no new input data and we don't have to finish the stream, so nothing to do. */ - s.available_out_ = 0 - return true } @@ -1360,7 +1349,7 @@ func encodeData(s *Writer, is_last bool, force_flush bool) bool { assert(s.input_pos_-s.last_flush_pos_ <= 1<<24) { var metablock_size uint32 = uint32(s.input_pos_ - s.last_flush_pos_) - var storage []byte = getBrotliStorage(s, int(2*metablock_size+503)) + var storage []byte = s.getStorage(int(2*metablock_size + 503)) var storage_ix uint = uint(s.last_bytes_bits_) storage[0] = byte(s.last_bytes_) storage[1] = byte(s.last_bytes_ >> 8) @@ -1387,8 +1376,7 @@ func encodeData(s *Writer, is_last bool, force_flush bool) bool { emitting an uncompressed block. */ copy(s.saved_dist_cache_[:], s.dist_cache_[:]) - s.next_out_ = storage[0:] - s.available_out_ = storage_ix >> 3 + s.writeOutput(storage[:storage_ix>>3]) return true } } @@ -1428,7 +1416,6 @@ func writeMetadataHeader(s *Writer, block_size uint, header []byte) uint { func injectBytePaddingBlock(s *Writer) { var seal uint32 = uint32(s.last_bytes_) var seal_bits uint = uint(s.last_bytes_bits_) - var destination []byte s.last_bytes_ = 0 s.last_bytes_bits_ = 0 @@ -1437,14 +1424,7 @@ func injectBytePaddingBlock(s *Writer) { seal_bits += 6 - /* If we have already created storage, then append to it. - Storage is valid until next block is being compressed. */ - if s.next_out_ != nil { - destination = s.next_out_[s.available_out_:] - } else { - destination = s.tiny_buf_.u8[:] - s.next_out_ = destination - } + destination := s.tiny_buf_.u8[:] destination[0] = byte(seal) if seal_bits > 8 { @@ -1453,13 +1433,12 @@ func injectBytePaddingBlock(s *Writer) { if seal_bits > 16 { destination[2] = byte(seal >> 16) } - s.available_out_ += (seal_bits + 7) >> 3 + s.writeOutput(destination[:(seal_bits+7)>>3]) } func checkFlushComplete(s *Writer) { - if s.stream_state_ == streamFlushRequested && s.available_out_ == 0 { + if s.stream_state_ == streamFlushRequested && s.err == nil { s.stream_state_ = streamProcessing - s.next_out_ = nil } } @@ -1497,10 +1476,10 @@ func encoderCompressStreamFast(s *Writer, op int, available_in *uint, next_in *[ continue } - /* Compress block only when internal output buffer is empty, stream is not + /* Compress block only when stream is not finished, there is no pending flush request, and there is either additional input or pending operation. */ - if s.available_out_ == 0 && s.stream_state_ == streamProcessing && (*available_in != 0 || op != int(operationProcess)) { + if s.stream_state_ == streamProcessing && (*available_in != 0 || op != int(operationProcess)) { var block_size uint = brotli_min_size_t(block_size_limit, *available_in) var is_last bool = (*available_in == block_size) && (op == int(operationFinish)) var force_flush bool = (*available_in == block_size) && (op == int(operationFlush)) @@ -1515,7 +1494,7 @@ func encoderCompressStreamFast(s *Writer, op int, available_in *uint, next_in *[ continue } - storage = getBrotliStorage(s, int(max_out_size)) + storage = s.getStorage(int(max_out_size)) storage[0] = byte(s.last_bytes_) storage[1] = byte(s.last_bytes_ >> 8) @@ -1530,8 +1509,7 @@ func encoderCompressStreamFast(s *Writer, op int, available_in *uint, next_in *[ *next_in = (*next_in)[block_size:] *available_in -= block_size var out_bytes uint = storage_ix >> 3 - s.next_out_ = storage - s.available_out_ = out_bytes + s.writeOutput(storage[:out_bytes]) s.last_bytes_ = uint16(storage[storage_ix>>3]) s.last_bytes_bits_ = byte(storage_ix & 7) @@ -1575,10 +1553,6 @@ func processMetadata(s *Writer, available_in *uint, next_in *[]byte) bool { continue } - if s.available_out_ != 0 { - break - } - if s.input_pos_ != s.last_flush_pos_ { var result bool = encodeData(s, false, true) if !result { @@ -1588,8 +1562,8 @@ func processMetadata(s *Writer, available_in *uint, next_in *[]byte) bool { } if s.stream_state_ == streamMetadataHead { - s.next_out_ = s.tiny_buf_.u8[:] - s.available_out_ = writeMetadataHeader(s, uint(s.remaining_metadata_bytes_), s.next_out_) + n := writeMetadataHeader(s, uint(s.remaining_metadata_bytes_), s.tiny_buf_.u8[:]) + s.writeOutput(s.tiny_buf_.u8[:n]) s.stream_state_ = streamMetadataBody continue } else { @@ -1603,12 +1577,11 @@ func processMetadata(s *Writer, available_in *uint, next_in *[]byte) bool { /* This guarantees progress in "TakeOutput" workflow. */ var c uint32 = brotli_min_uint32_t(s.remaining_metadata_bytes_, 16) - s.next_out_ = s.tiny_buf_.u8[:] - copy(s.next_out_, (*next_in)[:c]) + copy(s.tiny_buf_.u8[:], (*next_in)[:c]) *next_in = (*next_in)[c:] *available_in -= uint(c) s.remaining_metadata_bytes_ -= c - s.available_out_ = uint(c) + s.writeOutput(s.tiny_buf_.u8[:c]) continue } @@ -1681,9 +1654,9 @@ func encoderCompressStream(s *Writer, op int, available_in *uint, next_in *[]byt continue } - /* Compress data only when internal output buffer is empty, stream is not + /* Compress data only when stream is not finished and there is no pending flush request. */ - if s.available_out_ == 0 && s.stream_state_ == streamProcessing { + if s.stream_state_ == streamProcessing { if remaining_block_size == 0 || op != int(operationProcess) { var is_last bool = ((*available_in == 0) && op == int(operationFinish)) var force_flush bool = ((*available_in == 0) && op == int(operationFlush)) @@ -1710,18 +1683,13 @@ func encoderCompressStream(s *Writer, op int, available_in *uint, next_in *[]byt return true } -func encoderHasMoreOutput(s *Writer) bool { - return s.available_out_ != 0 -} - -func encoderTakeOutput(s *Writer) []byte { - if s.available_out_ == 0 { - return nil +func (w *Writer) writeOutput(data []byte) { + if w.err != nil { + return } - result := s.next_out_[:s.available_out_] - s.total_out_ += s.available_out_ - s.available_out_ = 0 - checkFlushComplete(s) - return result + _, w.err = w.dst.Write(data) + if w.err == nil { + checkFlushComplete(w) + } } diff --git a/writer.go b/writer.go index ec333f9..63676b4 100644 --- a/writer.go +++ b/writer.go @@ -67,6 +67,9 @@ func (w *Writer) writeChunk(p []byte, op int) (n int, err error) { if w.dst == nil { return 0, errWriterClosed } + if w.err != nil { + return 0, w.err + } for { availableIn := uint(len(p)) @@ -79,16 +82,8 @@ func (w *Writer) writeChunk(p []byte, op int) (n int, err error) { return n, errEncode } - outputData := encoderTakeOutput(w) - - if len(outputData) > 0 { - _, err = w.dst.Write(outputData) - if err != nil { - return n, err - } - } - if len(p) == 0 { - return n, nil + if len(p) == 0 || w.err != nil { + return n, w.err } } }