aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--.bleep2
-rw-r--r--pingora-core/src/protocols/http/compression/brotli.rs1
-rw-r--r--pingora-core/src/protocols/http/compression/gzip.rs89
-rw-r--r--pingora-core/src/protocols/http/compression/mod.rs47
4 files changed, 114 insertions, 25 deletions
diff --git a/.bleep b/.bleep
index 9ab32a4..f05f155 100644
--- a/.bleep
+++ b/.bleep
@@ -1 +1 @@
-e68f6024370efed50aebc8741171956acabf9c35 \ No newline at end of file
+4c6da000f956f3c13473dee0c6302ac0126418dd \ No newline at end of file
diff --git a/pingora-core/src/protocols/http/compression/brotli.rs b/pingora-core/src/protocols/http/compression/brotli.rs
index 956f87d..89f7b4e 100644
--- a/pingora-core/src/protocols/http/compression/brotli.rs
+++ b/pingora-core/src/protocols/http/compression/brotli.rs
@@ -42,7 +42,6 @@ impl Decompressor {
impl Encode for Decompressor {
fn encode(&mut self, input: &[u8], end: bool) -> Result<Bytes> {
- // reserve at most 16k
const MAX_INIT_COMPRESSED_SIZE_CAP: usize = 4 * 1024;
// Brotli compress ratio can be 3.5 to 4.5
const ESTIMATED_COMPRESSION_RATIO: usize = 4;
diff --git a/pingora-core/src/protocols/http/compression/gzip.rs b/pingora-core/src/protocols/http/compression/gzip.rs
index d64c961..f7f997d 100644
--- a/pingora-core/src/protocols/http/compression/gzip.rs
+++ b/pingora-core/src/protocols/http/compression/gzip.rs
@@ -12,15 +12,65 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use super::Encode;
+use super::{Encode, COMPRESSION_ERROR};
use bytes::Bytes;
-use flate2::write::GzEncoder;
-use pingora_error::Result;
+use flate2::write::{GzDecoder, GzEncoder};
+use pingora_error::{OrErr, Result};
use std::io::Write;
use std::time::{Duration, Instant};
-// TODO: unzip
+pub struct Decompressor {
+ decompress: GzDecoder<Vec<u8>>,
+ total_in: usize,
+ total_out: usize,
+ duration: Duration,
+}
+
+impl Decompressor {
+ pub fn new() -> Self {
+ Decompressor {
+ decompress: GzDecoder::new(vec![]),
+ total_in: 0,
+ total_out: 0,
+ duration: Duration::new(0, 0),
+ }
+ }
+}
+
+impl Encode for Decompressor {
+ fn encode(&mut self, input: &[u8], end: bool) -> Result<Bytes> {
+ const MAX_INIT_COMPRESSED_SIZE_CAP: usize = 4 * 1024;
+ const ESTIMATED_COMPRESSION_RATIO: usize = 3; // estimated 2.5-3x compression
+ let start = Instant::now();
+ self.total_in += input.len();
+ // cap the buf size amplification, there is a DoS risk of always allocate
+ // 3x the memory of the input buffer
+ let reserve_size = if input.len() < MAX_INIT_COMPRESSED_SIZE_CAP {
+ input.len() * ESTIMATED_COMPRESSION_RATIO
+ } else {
+ input.len()
+ };
+ self.decompress.get_mut().reserve(reserve_size);
+ self.decompress
+ .write_all(input)
+ .or_err(COMPRESSION_ERROR, "while decompress Gzip")?;
+ // write to vec will never fail, only possible error is that the input data
+ // was not actually gzip compressed
+ if end {
+ self.decompress
+ .try_finish()
+ .or_err(COMPRESSION_ERROR, "while decompress Gzip")?;
+ }
+ self.total_out += self.decompress.get_ref().len();
+ self.duration += start.elapsed();
+ Ok(std::mem::take(self.decompress.get_mut()).into()) // into() Bytes will drop excess capacity
+ }
+
+ fn stat(&self) -> (&'static str, usize, usize, Duration) {
+ ("de-gzip", self.total_in, self.total_out, self.duration)
+ }
+}
pub struct Compressor {
// TODO: enum for other compression algorithms
@@ -66,6 +116,20 @@ impl Encode for Compressor {
}
use std::ops::{Deref, DerefMut};
+impl Deref for Decompressor {
+ type Target = GzDecoder<Vec<u8>>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.decompress
+ }
+}
+
+impl DerefMut for Decompressor {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.decompress
+ }
+}
+
impl Deref for Compressor {
type Target = GzEncoder<Vec<u8>>;
@@ -100,4 +164,21 @@ mod tests_stream {
assert!(compressor.get_ref().is_empty());
}
+
+ #[test]
+ fn gunzip_data() {
+ let mut decompressor = Decompressor::new();
+
+ let compressed_bytes = &[
+ 0x1f, 0x8b, 0x08, 0, 0, 0, 0, 0, 0, 255, 75, 76, 74, 78, 73, 77, 75, 7, 0, 166, 106,
+ 42, 49, 7, 0, 0, 0,
+ ];
+ let decompressed = decompressor.encode(compressed_bytes, true).unwrap();
+
+ assert_eq!(&decompressed[..], b"abcdefg");
+ assert_eq!(decompressor.total_in, compressed_bytes.len());
+ assert_eq!(decompressor.total_out, decompressed.len());
+
+ assert!(decompressor.get_ref().is_empty());
+ }
}
diff --git a/pingora-core/src/protocols/http/compression/mod.rs b/pingora-core/src/protocols/http/compression/mod.rs
index ad1cd0b..6236a0d 100644
--- a/pingora-core/src/protocols/http/compression/mod.rs
+++ b/pingora-core/src/protocols/http/compression/mod.rs
@@ -67,10 +67,10 @@ pub struct ResponseCompressionCtx(CtxInner);
enum CtxInner {
HeaderPhase {
- decompress_enable: bool,
// Store the preferred list to compare with content-encoding
accept_encoding: Vec<Algorithm>,
encoding_levels: [u32; Algorithm::COUNT],
+ decompress_enable: [bool; Algorithm::COUNT],
},
BodyPhase(Option<Box<dyn Encode + Send + Sync>>),
}
@@ -81,9 +81,9 @@ impl ResponseCompressionCtx {
/// The `decompress_enable` flag will tell the ctx to decompress if needed.
pub fn new(compression_level: u32, decompress_enable: bool) -> Self {
Self(CtxInner::HeaderPhase {
- decompress_enable,
accept_encoding: Vec::new(),
encoding_levels: [compression_level; Algorithm::COUNT],
+ decompress_enable: [decompress_enable; Algorithm::COUNT],
})
}
@@ -93,9 +93,9 @@ impl ResponseCompressionCtx {
match &self.0 {
CtxInner::HeaderPhase {
decompress_enable,
- accept_encoding: _,
encoding_levels: levels,
- } => levels.iter().any(|l| *l != 0) || *decompress_enable,
+ ..
+ } => levels.iter().any(|l| *l != 0) || decompress_enable.iter().any(|d| *d),
CtxInner::BodyPhase(c) => c.is_some(),
}
}
@@ -104,11 +104,7 @@ impl ResponseCompressionCtx {
/// algorithm name, in bytes, out bytes, time took for the compression
pub fn get_info(&self) -> Option<(&'static str, usize, usize, Duration)> {
match &self.0 {
- CtxInner::HeaderPhase {
- decompress_enable: _,
- accept_encoding: _,
- encoding_levels: _,
- } => None,
+ CtxInner::HeaderPhase { .. } => None,
CtxInner::BodyPhase(c) => c.as_ref().map(|c| c.stat()),
}
}
@@ -119,9 +115,8 @@ impl ResponseCompressionCtx {
pub fn adjust_level(&mut self, new_level: u32) {
match &mut self.0 {
CtxInner::HeaderPhase {
- decompress_enable: _,
- accept_encoding: _,
encoding_levels: levels,
+ ..
} => {
*levels = [new_level; Algorithm::COUNT];
}
@@ -135,9 +130,8 @@ impl ResponseCompressionCtx {
pub fn adjust_algorithm_level(&mut self, algorithm: Algorithm, new_level: u32) {
match &mut self.0 {
CtxInner::HeaderPhase {
- decompress_enable: _,
- accept_encoding: _,
encoding_levels: levels,
+ ..
} => {
levels[algorithm.index()] = new_level;
}
@@ -145,17 +139,29 @@ impl ResponseCompressionCtx {
}
}
- /// Adjust the decompression flag.
+ /// Adjust the decompression flag for all compression algorithms.
/// # Panic
/// This function will panic if it has already started encoding the response body.
pub fn adjust_decompression(&mut self, enabled: bool) {
match &mut self.0 {
CtxInner::HeaderPhase {
- decompress_enable,
- accept_encoding: _,
- encoding_levels: _,
+ decompress_enable, ..
} => {
- *decompress_enable = enabled;
+ *decompress_enable = [enabled; Algorithm::COUNT];
+ }
+ CtxInner::BodyPhase(_) => panic!("Wrong phase: BodyPhase"),
+ }
+ }
+
+ /// Adjust the decompression flag for a specific algorithm.
+ /// # Panic
+ /// This function will panic if it has already started encoding the response body.
+ pub fn adjust_algorithm_decompression(&mut self, algorithm: Algorithm, enabled: bool) {
+ match &mut self.0 {
+ CtxInner::HeaderPhase {
+ decompress_enable, ..
+ } => {
+ decompress_enable[algorithm.index()] = enabled;
}
CtxInner::BodyPhase(_) => panic!("Wrong phase: BodyPhase"),
}
@@ -208,7 +214,9 @@ impl ResponseCompressionCtx {
let encoder = match action {
Action::Noop => None,
Action::Compress(algorithm) => algorithm.compressor(levels[algorithm.index()]),
- Action::Decompress(algorithm) => algorithm.decompressor(*decompress_enable),
+ Action::Decompress(algorithm) => {
+ algorithm.decompressor(decompress_enable[algorithm.index()])
+ }
};
if encoder.is_some() {
adjust_response_header(resp, &action);
@@ -317,6 +325,7 @@ impl Algorithm {
None
} else {
match self {
+ Self::Gzip => Some(Box::new(gzip::Decompressor::new())),
Self::Brotli => Some(Box::new(brotli::Decompressor::new())),
_ => None, // not implemented
}