diff --git a/Cargo.lock b/Cargo.lock index 9339e1d..a05bf96 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -394,6 +394,24 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "concread" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cba00cef522c2597dfbb0a8d1b0ac8ac2b99714f50cc354cda71da63164da0be" +dependencies = [ + "ahash", + "arc-swap", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", + "lru", + "smallvec", + "sptr", + "tokio", + "tracing", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -428,6 +446,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-queue" version = "0.3.11" @@ -962,6 +989,17 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "leaky-bucket" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a396bb213c2d09ed6c5495fd082c991b6ab39c9daf4fff59e6727f85c73e4c5" +dependencies = [ + "parking_lot", + "pin-project-lite", + "tokio", +] + [[package]] name = "libc" version = "0.2.153" @@ -1117,13 +1155,14 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.11" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ + "hermit-abi 0.3.9", "libc", "wasi", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] 
[[package]] @@ -1173,16 +1212,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "num_cpus" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" -dependencies = [ - "hermit-abi 0.3.9", - "libc", -] - [[package]] name = "object" version = "0.32.2" @@ -1812,9 +1841,11 @@ dependencies = [ "async-trait", "cidr", "clap 4.5.4", + "concread", "futures-util", "http 1.1.0", "kdl", + "leaky-bucket", "log", "miette", "pandora-module-utils", @@ -2193,6 +2224,12 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "sptr" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b9b39299b249ad65f3b7e96443bad61c02ca5cd3589f46cb6d610a0fd6c0d6a" + [[package]] name = "static-files-module" version = "0.2.0" @@ -2448,27 +2485,26 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.37.0" +version = "1.39.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" +checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5" dependencies = [ "backtrace", "bytes", "libc", "mio", - "num_cpus", "pin-project-lite", "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] name = "tokio-macros" -version = "2.2.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", diff --git a/source/river/Cargo.toml b/source/river/Cargo.toml index 4eb3f91..f19e321 100644 
--- a/source/river/Cargo.toml +++ b/source/river/Cargo.toml @@ -24,17 +24,19 @@ rustdoc-args = ["--cfg", "doc_cfg"] [dependencies] async-trait = "0.1.79" +cidr = "0.2.3" +concread = "0.5.3" +futures-util = "0.3.30" +http = "1.0.0" +kdl = "4.6.0" +leaky-bucket = "1.1.2" log = "0.4.21" +miette = { version = "5.10.0", features = ["fancy"] } regex = "1.10.4" +thiserror = "1.0.61" tokio = "1.37.0" # TODO: check for implicit feature usage toml = "0.8.12" tracing = "0.1.40" -kdl = "4.6.0" -miette = { version = "5.10.0", features = ["fancy"] } -thiserror = "1.0.61" -http = "1.0.0" -futures-util = "0.3.30" -cidr = "0.2.3" [dependencies.static-files-module] version = "0.2" diff --git a/source/river/assets/test-config.kdl b/source/river/assets/test-config.kdl index c0de1aa..d6e1e2c 100644 --- a/source/river/assets/test-config.kdl +++ b/source/river/assets/test-config.kdl @@ -31,6 +31,45 @@ services { "0.0.0.0:4443" cert-path="./assets/test.crt" key-path="./assets/test.key" offer-h2=true } + // Apply Rate limiting to this service + // + // Note that ALL rules are applied, and a request must receive a token from all + // applicable rules. + // + // For example: + // + // A request to URI `/index.html` from IP 1.2.3.4 will only need to get a token from + // the `source-ip` rule. 
+ // + // A request to URI `/static/style.css` from IP 1.2.3.4 will need to get a token from + // BOTH the `source-ip` rule (from the `1.2.3.4` bucket), AND the `specific-uri` rule + // (from the `/static/style.css` bucket) + rate-limiting { + // This rate limiting rule is based on the source IP address + // + // * Up to the last 4000 IP addresses will be remembered + // * Each IP address can make a burst of 10 requests + // * The bucket for each IP will refill at a rate of 1 request per 10 milliseconds + rule kind="source-ip" \ + max-buckets=4000 tokens-per-bucket=10 refill-qty=1 refill-rate-ms=10 + + // This rate limiting is based on the specific URI path + // + // * Up to the last 2000 URI paths will be remembered + // * Each URI path can make a burst of 20 requests + // * The bucket for each URI will refill at a rate of 5 requests per 1 millisecond + rule kind="specific-uri" pattern="static/.*" \ + max-buckets=2000 tokens-per-bucket=20 refill-qty=5 refill-rate-ms=1 + + // This rate limiting is based on ANY URI paths that match the pattern + // + // * A single bucket will be used for all URIs that match the pattern + // * We allow a burst of up to 50 requests for any MP4 files + // * The bucket for all MP4 files will refill at a rate of 2 requests per 3 milliseconds + rule kind="any-matching-uri" pattern=r".*\.mp4" \ + tokens-per-bucket=50 refill-qty=2 refill-rate-ms=3 + } + // Connectors are the "upstream" interfaces that we connect with. We can name as many // as we'd like, at least one is required. By default, connectors are distributed // round-robin. @@ -52,7 +91,7 @@ services { // This section is optional. 
path-control { request-filters { - filter kind="block-cidr-range" addrs="192.168.0.0/16, 10.0.0.0/8, 2001:0db8::0/32, 127.0.0.1" + filter kind="block-cidr-range" addrs="192.168.0.0/16, 10.0.0.0/8, 2001:0db8::0/32" } upstream-request { filter kind="remove-header-key-regex" pattern=".*(secret|SECRET).*" diff --git a/source/river/src/config/internal.rs b/source/river/src/config/internal.rs index e610533..db96c94 100644 --- a/source/river/src/config/internal.rs +++ b/source/river/src/config/internal.rs @@ -14,7 +14,10 @@ use pingora::{ }; use tracing::warn; -use crate::proxy::request_selector::{null_selector, RequestSelector}; +use crate::proxy::{ + rate_limiting::AllRateConfig, + request_selector::{null_selector, RequestSelector}, +}; /// River's internal configuration #[derive(Debug, Clone)] @@ -109,6 +112,12 @@ impl Config { } } +/// +#[derive(Debug, Default, Clone, PartialEq)] +pub struct RateLimitingConfig { + pub(crate) rules: Vec, +} + /// Add Path Control Modifiers /// /// Note that we use `BTreeMap` and NOT `HashMap`, as we want to maintain the @@ -141,6 +150,7 @@ pub struct ProxyConfig { pub(crate) upstream_options: UpstreamOptions, pub(crate) upstreams: Vec, pub(crate) path_control: PathControl, + pub(crate) rate_limiting: RateLimitingConfig, } #[derive(Debug, PartialEq, Clone)] diff --git a/source/river/src/config/kdl/mod.rs b/source/river/src/config/kdl/mod.rs index 2c9c703..68b8ea2 100644 --- a/source/river/src/config/kdl/mod.rs +++ b/source/river/src/config/kdl/mod.rs @@ -4,19 +4,27 @@ use std::{ path::PathBuf, }; -use kdl::{KdlDocument, KdlEntry, KdlNode}; -use miette::{bail, Diagnostic, SourceSpan}; -use pingora::{protocols::ALPN, upstreams::peer::HttpPeer}; - use crate::{ config::internal::{ Config, DiscoveryKind, FileServerConfig, HealthCheckKind, ListenerConfig, ListenerKind, PathControl, ProxyConfig, SelectionKind, TlsConfig, UpstreamOptions, }, - proxy::request_selector::{ - null_selector, source_addr_and_uri_path_selector, uri_path_selector, 
RequestSelector, + proxy::{ + rate_limiting::{ + multi::{MultiRaterConfig, MultiRequestKeyKind}, + single::{SingleInstanceConfig, SingleRequestKeyKind}, + AllRateConfig, RegexShim, + }, + request_selector::{ + null_selector, source_addr_and_uri_path_selector, uri_path_selector, RequestSelector, + }, }, }; +use kdl::{KdlDocument, KdlEntry, KdlNode, KdlValue}; +use miette::{bail, Diagnostic, SourceSpan}; +use pingora::{protocols::ALPN, upstreams::peer::HttpPeer}; + +use super::internal::RateLimitingConfig; #[cfg(test)] mod test; @@ -33,7 +41,7 @@ impl TryFrom for Config { upgrade_socket, pid_file, } = extract_system_data(&value)?; - let (basic_proxies, file_servers) = extract_services(&value)?; + let (basic_proxies, file_servers) = extract_services(threads_per_service, &value)?; Ok(Config { threads_per_service, @@ -67,12 +75,14 @@ impl Default for SystemData { /// Extract all services from the top level document fn extract_services( + threads_per_service: usize, doc: &KdlDocument, ) -> miette::Result<(Vec, Vec)> { let service_node = utils::required_child_doc(doc, doc, "services")?; let services = utils::wildcard_argless_child_docs(doc, service_node)?; - let proxy_node_set = HashSet::from(["listeners", "connectors", "path-control"]); + let proxy_node_set = + HashSet::from(["listeners", "connectors", "path-control", "rate-limiting"]); let file_server_node_set = HashSet::from(["listeners", "file-server"]); let mut proxies = vec![]; @@ -96,7 +106,7 @@ fn extract_services( if fingerprint_set.is_subset(&proxy_node_set) { // If the contained nodes are a strict subset of proxy node config fields, // then treat this section as a proxy node - proxies.push(extract_service(doc, name, service)?); + proxies.push(extract_service(threads_per_service, doc, name, service)?); } else if fingerprint_set.is_subset(&file_server_node_set) { // If the contained nodes are a strict subset of the file server config // fields, then treat this section as a file server node @@ -221,6 +231,7 @@ fn 
extract_file_server( /// Extracts a single service from the `services` block fn extract_service( + threads_per_service: usize, doc: &KdlDocument, name: &str, node: &KdlDocument, @@ -281,15 +292,137 @@ fn extract_service( } } + // Rate limiting + let mut rl = RateLimitingConfig::default(); + if let Some(rl_node) = utils::optional_child_doc(doc, node, "rate-limiting") { + let nodes = utils::data_nodes(doc, rl_node)?; + for (node, name, args) in nodes.iter() { + if *name == "rule" { + let vals = utils::str_value_args(doc, args)?; + let valslice = vals + .iter() + .map(|(k, v)| (*k, v.value())) + .collect::>(); + rl.rules + .push(make_rate_limiter(threads_per_service, doc, node, valslice)?); + } else { + return Err( + Bad::docspan(format!("Unknown name: '{name}'"), doc, node.span()).into(), + ); + } + } + } + Ok(ProxyConfig { name: name.to_string(), listeners: list_cfgs, upstreams: conn_cfgs, path_control: pc, upstream_options: load_balance.unwrap_or_default(), + rate_limiting: rl, }) } +fn make_rate_limiter( + threads_per_service: usize, + doc: &KdlDocument, + node: &KdlNode, + args: BTreeMap<&str, &KdlValue>, +) -> miette::Result { + let take_num = |key: &str| -> miette::Result { + let Some(val) = args.get(key) else { + return Err(Bad::docspan(format!("Missing key: '{key}'"), doc, node.span()).into()); + }; + let Some(val) = val.as_i64().and_then(|v| usize::try_from(v).ok()) else { + return Err(Bad::docspan( + format!( + "'{key} should have a positive integer value, got '{:?}' instead", + val + ), + doc, + node.span(), + ) + .into()); + }; + Ok(val) + }; + let take_str = |key: &str| -> miette::Result<&str> { + let Some(val) = args.get(key) else { + return Err(Bad::docspan(format!("Missing key: '{key}'"), doc, node.span()).into()); + }; + let Some(val) = val.as_string() else { + return Err(Bad::docspan( + format!("'{key} should have a string value, got '{:?}' instead", val), + doc, + node.span(), + ) + .into()); + }; + Ok(val) + }; + + // mandatory/common fields + let 
kind = take_str("kind")?; + let tokens_per_bucket = take_num("tokens-per-bucket")?; + let refill_qty = take_num("refill-qty")?; + let refill_rate_ms = take_num("refill-rate-ms")?; + + let multi_cfg = || -> miette::Result { + let max_buckets = take_num("max-buckets")?; + Ok(MultiRaterConfig { + threads: threads_per_service, + max_buckets, + max_tokens_per_bucket: tokens_per_bucket, + refill_interval_millis: refill_rate_ms, + refill_qty, + }) + }; + + let single_cfg = || SingleInstanceConfig { + max_tokens_per_bucket: tokens_per_bucket, + refill_interval_millis: refill_rate_ms, + refill_qty, + }; + + let regex_pattern = || -> miette::Result { + let pattern = take_str("pattern")?; + let Ok(pattern) = RegexShim::new(pattern) else { + return Err(Bad::docspan( + format!("'{pattern} should be a valid regular expression"), + doc, + node.span(), + ) + .into()); + }; + Ok(pattern) + }; + + match kind { + "source-ip" => Ok(AllRateConfig::Multi { + kind: MultiRequestKeyKind::SourceIp, + config: multi_cfg()?, + }), + "specific-uri" => Ok(AllRateConfig::Multi { + kind: MultiRequestKeyKind::Uri { + pattern: regex_pattern()?, + }, + config: multi_cfg()?, + }), + "any-matching-uri" => Ok(AllRateConfig::Single { + kind: SingleRequestKeyKind::UriGroup { + pattern: regex_pattern()?, + }, + config: single_cfg(), + }), + other => Err(Bad::docspan( + format!("'{other} is not a known kind of rate limiting"), + doc, + node.span(), + ) + .into()), + } +} + /// Extracts the `load-balance` structure from the `connectors` section fn extract_load_balance(doc: &KdlDocument, node: &KdlNode) -> miette::Result { let items = utils::data_nodes( diff --git a/source/river/src/config/kdl/test.rs b/source/river/src/config/kdl/test.rs index 08a62b9..6cd2d89 100644 --- a/source/river/src/config/kdl/test.rs +++ b/source/river/src/config/kdl/test.rs @@ -6,7 +6,10 @@ use crate::{ config::internal::{ FileServerConfig, ListenerConfig, ListenerKind, ProxyConfig, UpstreamOptions, }, - 
proxy::request_selector::uri_path_selector, + proxy::{ + rate_limiting::{multi::MultiRaterConfig, AllRateConfig, RegexShim}, + request_selector::uri_path_selector, + }, }; #[test] @@ -77,7 +80,7 @@ fn load_test() { ("kind".to_string(), "block-cidr-range".to_string()), ( "addrs".to_string(), - "192.168.0.0/16, 10.0.0.0/8, 2001:0db8::0/32, 127.0.0.1".to_string(), + "192.168.0.0/16, 10.0.0.0/8, 2001:0db8::0/32".to_string(), ), ])], }, @@ -87,6 +90,42 @@ fn load_test() { health_checks: crate::config::internal::HealthCheckKind::None, discovery: crate::config::internal::DiscoveryKind::Static, }, + rate_limiting: crate::config::internal::RateLimitingConfig { + rules: vec![ + AllRateConfig::Multi { + config: MultiRaterConfig { + threads: 8, + max_buckets: 4000, + max_tokens_per_bucket: 10, + refill_interval_millis: 10, + refill_qty: 1, + }, + kind: crate::proxy::rate_limiting::multi::MultiRequestKeyKind::SourceIp, + }, + AllRateConfig::Multi { + config: MultiRaterConfig { + threads: 8, + max_buckets: 2000, + max_tokens_per_bucket: 20, + refill_interval_millis: 1, + refill_qty: 5, + }, + kind: crate::proxy::rate_limiting::multi::MultiRequestKeyKind::Uri { + pattern: RegexShim::new("static/.*").unwrap(), + }, + }, + AllRateConfig::Single { + config: crate::proxy::rate_limiting::single::SingleInstanceConfig { + max_tokens_per_bucket: 50, + refill_interval_millis: 3, + refill_qty: 2, + }, + kind: crate::proxy::rate_limiting::single::SingleRequestKeyKind::UriGroup { + pattern: RegexShim::new(r".*\.mp4").unwrap(), + }, + }, + ], + }, }, ProxyConfig { name: "Example2".into(), @@ -104,6 +143,7 @@ fn load_test() { request_filters: vec![], }, upstream_options: UpstreamOptions::default(), + rate_limiting: crate::config::internal::RateLimitingConfig { rules: vec![] }, }, ], file_servers: vec![FileServerConfig { @@ -147,6 +187,7 @@ fn load_test() { upstream_options, upstreams, path_control, + rate_limiting, } = abp; assert_eq!(*name, ebp.name); assert_eq!(*listeners, ebp.listeners); @@ 
-160,6 +201,7 @@ fn load_test() { assert_eq!(a.sni, e.sni); }); assert_eq!(*path_control, ebp.path_control); + assert_eq!(*rate_limiting, ebp.rate_limiting); } for (afs, efs) in val.file_servers.iter().zip(expected.file_servers.iter()) { diff --git a/source/river/src/config/toml.rs b/source/river/src/config/toml.rs index c45da01..f2004ef 100644 --- a/source/river/src/config/toml.rs +++ b/source/river/src/config/toml.rs @@ -8,7 +8,7 @@ use std::{ use pingora::upstreams::peer::HttpPeer; use serde::{Deserialize, Serialize}; -use super::internal::UpstreamOptions; +use super::internal::{RateLimitingConfig, UpstreamOptions}; /// Configuration used for TOML formatted files #[derive(Serialize, Deserialize, Debug, PartialEq)] @@ -153,6 +153,7 @@ impl From for super::internal::ProxyConfig { upstreams: vec![other.connector.into()], path_control: other.path_control.into(), upstream_options: UpstreamOptions::default(), + rate_limiting: RateLimitingConfig::default(), } } } @@ -205,7 +206,7 @@ pub mod test { use crate::config::{ apply_toml, - internal::{self, UpstreamOptions}, + internal::{self, RateLimitingConfig, UpstreamOptions}, toml::{ConnectorConfig, ListenerConfig, ProxyConfig, System}, }; @@ -365,6 +366,7 @@ pub mod test { request_filters: vec![], }, upstream_options: UpstreamOptions::default(), + rate_limiting: RateLimitingConfig::default(), }, internal::ProxyConfig { name: "Example2".into(), @@ -382,6 +384,7 @@ pub mod test { request_filters: vec![], }, upstream_options: UpstreamOptions::default(), + rate_limiting: RateLimitingConfig::default(), }, ], file_servers: Vec::new(), diff --git a/source/river/src/proxy/mod.rs b/source/river/src/proxy/mod.rs index b85a1e4..b875d88 100644 --- a/source/river/src/proxy/mod.rs +++ b/source/river/src/proxy/mod.rs @@ -9,7 +9,7 @@ use std::collections::{BTreeMap, BTreeSet}; use async_trait::async_trait; use futures_util::FutureExt; -use pingora::{server::Server, Error}; +use pingora::{server::Server, Error, ErrorType}; use 
pingora_core::{upstreams::peer::HttpPeer, Result}; use pingora_http::{RequestHeader, ResponseHeader}; use pingora_load_balancing::{ @@ -30,13 +30,22 @@ use crate::{ }, }; -use self::request_filters::RequestFilterMod; +use self::{ + rate_limiting::{multi::MultiRaterInstance, single::SingleInstance, Outcome}, + request_filters::RequestFilterMod, +}; +pub mod rate_limiting; pub mod request_filters; pub mod request_modifiers; pub mod request_selector; pub mod response_modifiers; +pub struct RateLimiters { + request_filter_stage_multi: Vec, + request_filter_stage_single: Vec, +} + /// The [RiverProxyService] is intended to capture the behaviors used to extend /// the [HttpProxy] functionality by providing a [ProxyHttp] trait implementation. /// @@ -50,6 +59,7 @@ pub struct RiverProxyService { /// Load Balancer pub load_balancer: LoadBalancer, pub request_selector: RequestSelector, + pub rate_limiters: RateLimiters, } /// Create a proxy service, with the type parameters chosen based on the config file @@ -100,12 +110,32 @@ where .expect("static should not error"); // end of TODO + let mut request_filter_stage_multi = vec![]; + let mut request_filter_stage_single = vec![]; + + for rule in conf.rate_limiting.rules { + match rule { + rate_limiting::AllRateConfig::Single { kind, config } => { + let rater = SingleInstance::new(config, kind); + request_filter_stage_single.push(rater); + } + rate_limiting::AllRateConfig::Multi { kind, config } => { + let rater = MultiRaterInstance::new(config, kind); + request_filter_stage_multi.push(rater); + } + } + } + let mut my_proxy = pingora_proxy::http_proxy_service_with_name( &server.configuration, Self { modifiers, load_balancer: upstreams, request_selector: conf.upstream_options.selector, + rate_limiters: RateLimiters { + request_filter_stage_multi, + request_filter_stage_single, + }, }, &conf.name, ); @@ -168,7 +198,7 @@ impl Modifiers { } other => { tracing::warn!("Unknown request filter: '{other}'"); - return 
Err(Error::new(pingora::ErrorType::Custom("Bad configuration"))); + return Err(Error::new(ErrorType::Custom("Bad configuration"))); } }; request_filter_mods.push(f); @@ -186,7 +216,7 @@ impl Modifiers { } other => { tracing::warn!("Unknown upstream request filter: '{other}'"); - return Err(Error::new(pingora::ErrorType::Custom("Bad configuration"))); + return Err(Error::new(ErrorType::Custom("Bad configuration"))); } }; upstream_request_filters.push(f); @@ -204,7 +234,7 @@ impl Modifiers { } other => { tracing::warn!("Unknown upstream response filter: '{other}'"); - return Err(Error::new(pingora::ErrorType::Custom("Bad configuration"))); + return Err(Error::new(ErrorType::Custom("Bad configuration"))); } }; upstream_response_filters.push(f); @@ -242,6 +272,39 @@ where where Self::CTX: Send + Sync, { + let multis = self + .rate_limiters + .request_filter_stage_multi + .iter() + .filter_map(|l| l.get_ticket(session)); + + let singles = self + .rate_limiters + .request_filter_stage_single + .iter() + .filter_map(|l| l.get_ticket(session)); + + // Attempt to get all tokens + // + // TODO: If https://github.com/udoprog/leaky-bucket/issues/17 is resolved we could + // remember the buckets that we did get approved for, and "return" the unused tokens. + // + // For now, if some tickets succeed but subsequent tickets fail, the preceding + // approved tokens are just "burned". + // + // TODO: If https://github.com/udoprog/leaky-bucket/issues/34 is resolved we could + // support a "max debt" number, allowing us to delay if acquisition of the token + // would happen soon-ish, instead of immediately 429-ing if the token we need is + // about to become available. 
+ if singles + .chain(multis) + .any(|t| t.now_or_never() == Outcome::Declined) + { + tracing::trace!("Rejecting due to rate limiting failure"); + session.downstream_session.respond_error(429).await; + return Ok(true); + } + for filter in &self.modifiers.request_filters { match filter.request_filter(session, ctx).await { // If Ok true: we're done handling this request diff --git a/source/river/src/proxy/rate_limiting/mod.rs b/source/river/src/proxy/rate_limiting/mod.rs new file mode 100644 index 0000000..f6ba307 --- /dev/null +++ b/source/river/src/proxy/rate_limiting/mod.rs @@ -0,0 +1,80 @@ +use std::{ops::Deref, sync::Arc}; + +use leaky_bucket::RateLimiter; +use regex::Regex; + +use self::{ + multi::{MultiRaterConfig, MultiRequestKeyKind}, + single::{SingleInstanceConfig, SingleRequestKeyKind}, +}; + +// +// We have two kinds of rate limiters: +// +// * "Multi" rate limiters use a cache of buckets. These are used when we remember +// multiple bucket keys, like tracking all of the source IP addresses +// * "Single" rate limiters use a single bucket, for example `any-matching-uri`, +// which uses a single bucket for all matching URIs +pub mod multi; +pub mod single; + +#[derive(Debug, PartialEq, Clone)] +pub enum AllRateConfig { + Single { + kind: SingleRequestKeyKind, + config: SingleInstanceConfig, + }, + Multi { + kind: MultiRequestKeyKind, + config: MultiRaterConfig, + }, +} + +#[derive(Debug, Clone)] +pub struct RegexShim(pub Regex); + +impl PartialEq for RegexShim { + fn eq(&self, other: &Self) -> bool { + self.0.as_str().eq(other.0.as_str()) + } +} + +impl Deref for RegexShim { + type Target = Regex; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl RegexShim { + pub fn new(pattern: &str) -> Result { + Ok(Self(Regex::new(pattern)?)) + } +} + +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum Outcome { + Approved, + Declined, +} + +/// A claim ticket for the leaky bucket queue +/// +/// You are expected to call [`Ticket::wait()`] to wait for 
your turn to perform +/// the rate limited option. +#[must_use = "You must call `Ticket::wait()` to wait your turn!"] +pub struct Ticket { + limiter: Arc, +} + +impl Ticket { + /// Try to get a token immediately from the bucket. + pub fn now_or_never(self) -> Outcome { + if self.limiter.try_acquire(1) { + Outcome::Approved + } else { + Outcome::Declined + } + } +} diff --git a/source/river/src/proxy/rate_limiting/multi.rs b/source/river/src/proxy/rate_limiting/multi.rs new file mode 100644 index 0000000..247bd99 --- /dev/null +++ b/source/river/src/proxy/rate_limiting/multi.rs @@ -0,0 +1,293 @@ +//! Rate Limiting +//! +//! This is an implementation of request rate limiting. +//! +//! See the [`Rater`] structure for more details + +use std::{fmt::Debug, hash::Hash, net::IpAddr, sync::Arc, time::Duration}; + +use concread::arcache::{ARCache, ARCacheBuilder}; +use leaky_bucket::RateLimiter; +use pandora_module_utils::pingora::SocketAddr; +use pingora_proxy::Session; + +use crate::proxy::rate_limiting::Ticket; + +use super::RegexShim; + +#[derive(Debug, Clone, PartialEq)] +pub struct MultiRaterInstanceConfig { + pub rater_cfg: MultiRaterConfig, + pub kind: MultiRequestKeyKind, +} + +/// Configuration for the [`Rater`] +#[derive(Debug, PartialEq, Clone)] +pub struct MultiRaterConfig { + /// The number of expected concurrent threads - should match the number of + /// tokio threadpool workers + pub threads: usize, + /// The peak number of leaky buckets we aim to have live at once + /// + /// NOTE: This is not a hard limit of the amount of memory used. See [`ARCacheBuilder`] + /// for docs on calculating actual memory usage based on these parameters + pub max_buckets: usize, + /// The max and initial number of tokens in the leaky bucket - this is the number of + /// requests that can go through without any waiting if the bucket is full + pub max_tokens_per_bucket: usize, + /// The interval between "refills" of the bucket, e.g. 
the bucket refills `refill_qty` + /// every `refill_interval_millis` + pub refill_interval_millis: usize, + /// The number of tokens added to the bucket every `refill_interval_millis` + pub refill_qty: usize, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum MultiRequestKeyKind { + SourceIp, + Uri { pattern: RegexShim }, +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum MultiRequestKey { + Source(IpAddr), + Uri(String), +} + +#[derive(Debug)] +pub struct MultiRaterInstance { + pub rater: Rater, + pub kind: MultiRequestKeyKind, +} + +impl MultiRaterInstance { + pub fn new(config: MultiRaterConfig, kind: MultiRequestKeyKind) -> Self { + Self { + rater: Rater::new(config), + kind, + } + } + + pub fn get_ticket(&self, session: &Session) -> Option { + let key = self.get_key(session)?; + Some(self.rater.get_ticket(key)) + } + + pub fn get_key(&self, session: &Session) -> Option { + match &self.kind { + MultiRequestKeyKind::SourceIp => { + let src = session.downstream_session.client_addr()?; + let src_ip = match src { + SocketAddr::Inet(addr) => addr.ip(), + SocketAddr::Unix(_) => return None, + }; + Some(MultiRequestKey::Source(src_ip)) + } + MultiRequestKeyKind::Uri { pattern } => { + let uri_path = session.downstream_session.req_header().uri.path(); + if pattern.is_match(uri_path) { + Some(MultiRequestKey::Uri(uri_path.to_string())) + } else { + None + } + } + } + } +} + +/// A concurrent rate limiting structure +/// +/// ## Implementation details and notes +/// +/// For performance and resource reasons, this provides an *approximation* of exact rate +/// limiting. Currently, there are a few "false positive" cases that can permit more than +/// the expected number of actions to occur. +/// +/// Rater is currently modeled as a Least Recently Used (LRU) cache of leaky buckets mapped +/// by a key. This is done to provide a bounded quantity of leaky buckets, without requiring +/// a worker task to "cull" the oldest buckets. 
Instead, unused buckets will naturally +/// "fall out" of the cache if they are not used. +/// +/// ### Too many unique keys at too high of a rate +/// +/// If there is a very high diversity of Keys provided, it is possible that keys could +/// be evicted from the cache before they would naturally expire or be refilled. In this +/// case, Rater will appear to not apply rate limiting, as the evicted bucket will be +/// replaced with a new, initially full bucket. This can be mitigated by choosing a +/// bucket storage capacity that is large enough to hold enough buckets to handle the +/// expected requests per second. e.g. if there is room for 1M buckets, and a bucket would +/// refill one token every 100ms, then we would expect to be able to handle at least 100K +/// requests with unique keys per second without evicting the buckets before the bucket +/// would refill anyway. +/// +/// ### A burst of previously-unseen keys +/// +/// If a number of requests appear around the same time for a Key that is not resident +/// in the cache, it is possible that all worker threads will create a new bucket and +/// attempt to add their buckets to the cache, though only one will be persisted, and +/// the others will be lost. +/// +/// For example if there are N worker threads, and N requests with the same key arrive +/// at roughly the same time, it is possible that we will create N new leaky buckets, +/// each that will give one immediately-ready token for the request. However, in the +/// worst case (N - 1) of these tokens won't "count", as (N - 1) of these buckets +/// will be thrown away, and not counted in the one bucket that was persisted +/// +/// This worst case is extremely unlikely, as it would require N requests with the same Key +/// to arrive in the time window necessary to write to the cache, and for all N requests +/// to be distributed to N different worker threads that all attempt to find the Key +/// at the same time. 
+/// +/// There is no mitigation for this currently, other than treating the "max tokens per +/// bucket" as an approximate value, with up to "number of worker threads" of false +/// positives as an acceptable bound. +pub struct Rater +where + Key: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static, +{ + cache: ARCache>, + max_tokens_per_bucket: usize, + refill_interval_millis: usize, + refill_qty: usize, +} + +impl Debug for Rater +where + Key: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("Rater { ... }") + } +} + +impl Rater +where + Key: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static, +{ + /// Create a new rate limiter with the given configuration. + /// + /// See [`MultiRaterConfig`] for configuration options. + pub fn new(config: MultiRaterConfig) -> Self { + let MultiRaterConfig { + threads, + max_buckets, + max_tokens_per_bucket, + refill_interval_millis, + refill_qty, + } = config; + let cache = ARCacheBuilder::new() + .set_expected_workload( + // total + // + // total number of items you want to have in memory + max_buckets, + // threads + // + // the number of read threads you expect concurrently (AT LEAST 1) + threads.max(1), + // ex_ro_miss + // + // the expected average number of cache misses per read operation + 1, + // ex_rw_miss + // + // the expected average number of writes or write cache misses per operation + 1, + // read_cache + // + // ? + false, + ) + .build() + .expect("Creation of rate limiter should not fail"); + + Self { + cache, + max_tokens_per_bucket, + refill_interval_millis, + refill_qty: refill_qty.min(max_tokens_per_bucket), + } + } + + /// Obtain a ticket for the given Key. + /// + /// If the Key does not exist already, it will be created. 
+ pub fn get_ticket(&self, key: Key) -> Ticket { + let mut reader = self.cache.read(); + + if let Some(find) = reader.get(&key) { + // Rate limiter DID exist in the cache + tracing::trace!(?key, "rate limiting cache hit",); + Ticket { + limiter: find.clone(), + } + } else { + let new_limiter = Arc::new(self.new_rate_limiter()); + tracing::debug!(?key, "rate limiting cache miss",); + reader.insert(key, new_limiter.clone()); + reader.finish(); + Ticket { + limiter: new_limiter, + } + } + } + + fn new_rate_limiter(&self) -> RateLimiter { + RateLimiter::builder() + .initial(self.max_tokens_per_bucket) + .max(self.max_tokens_per_bucket) + .interval(Duration::from_millis(self.refill_interval_millis as u64)) + .refill(self.refill_qty) + .fair(true) + .build() + } +} + +#[cfg(test)] +mod test { + use crate::proxy::rate_limiting::Outcome; + + use super::*; + use std::time::Instant; + use tokio::time::interval; + + #[tokio::test] + async fn smoke() { + let _ = tracing_subscriber::fmt::try_init(); + let config = MultiRaterConfig { + threads: 2, + max_buckets: 5, + max_tokens_per_bucket: 3, + refill_interval_millis: 10, + refill_qty: 1, + }; + + let rater = Arc::new(Rater::new(config.clone())); + let mut sleeper = interval(Duration::from_millis(6)); + let start = Instant::now(); + let mut approved = 0; + for i in 0..100 { + sleeper.tick().await; + let ticket = rater.get_ticket("bob".to_string()); + + match ticket.now_or_never() { + Outcome::Approved => { + approved += 1; + tracing::info!("Approved {i}!") + } + Outcome::Declined => tracing::info!("Declined {i}!"), + } + } + let duration = start.elapsed(); + let duration = duration.as_secs_f64(); + let approved = approved as f64; + + let expected_rate = 1000.0f64 / config.refill_interval_millis as f64; + let expected_ttl = (duration * expected_rate) + config.max_tokens_per_bucket as f64; + + // Did we get +/-10% of the expected number of approvals? 
+ tracing::info!(expected_ttl, actual_ttl = approved, "Rates"); + assert!(approved > (expected_ttl * 0.9f64)); + assert!(approved < (expected_ttl * 1.1f64)); + } +} diff --git a/source/river/src/proxy/rate_limiting/single.rs b/source/river/src/proxy/rate_limiting/single.rs new file mode 100644 index 0000000..42d968e --- /dev/null +++ b/source/river/src/proxy/rate_limiting/single.rs @@ -0,0 +1,68 @@ +use std::{sync::Arc, time::Duration}; + +use leaky_bucket::RateLimiter; +use pingora_proxy::Session; + +use super::{RegexShim, Ticket}; + +#[derive(Debug, PartialEq, Clone)] +pub struct SingleInstanceConfig { + /// The max and initial number of tokens in the leaky bucket - this is the number of + /// requests that can go through without any waiting if the bucket is full + pub max_tokens_per_bucket: usize, + /// The interval between "refills" of the bucket, e.g. the bucket refills `refill_qty` + /// every `refill_interval_millis` + pub refill_interval_millis: usize, + /// The number of tokens added to the bucket every `refill_interval_millis` + pub refill_qty: usize, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum SingleRequestKeyKind { + UriGroup { pattern: RegexShim }, +} + +#[derive(Debug)] +pub struct SingleInstance { + pub limiter: Arc, + pub kind: SingleRequestKeyKind, +} + +impl SingleInstance { + /// Create a new rate limiter with the given configuration. + /// + /// See [`SingleInstanceConfig`] for configuration options. 
+ pub fn new(config: SingleInstanceConfig, kind: SingleRequestKeyKind) -> Self { + let SingleInstanceConfig { + max_tokens_per_bucket, + refill_interval_millis, + refill_qty, + } = config; + + let limiter = RateLimiter::builder() + .initial(max_tokens_per_bucket) + .max(max_tokens_per_bucket) + .interval(Duration::from_millis(refill_interval_millis as u64)) + .refill(refill_qty) + .fair(true) + .build(); + let limiter = Arc::new(limiter); + + Self { limiter, kind } + } + + pub fn get_ticket(&self, session: &Session) -> Option { + match &self.kind { + SingleRequestKeyKind::UriGroup { pattern } => { + let uri_path = session.downstream_session.req_header().uri.path(); + if pattern.is_match(uri_path) { + Some(Ticket { + limiter: self.limiter.clone(), + }) + } else { + None + } + } + } + } +} diff --git a/source/river/src/proxy/request_filters.rs b/source/river/src/proxy/request_filters.rs index e267fdd..144ce7c 100644 --- a/source/river/src/proxy/request_filters.rs +++ b/source/river/src/proxy/request_filters.rs @@ -50,7 +50,8 @@ impl RequestFilterMod for CidrRangeFilter { async fn request_filter(&self, session: &mut Session, _ctx: &mut RiverContext) -> Result { let Some(addr) = session.downstream_session.client_addr() else { // Unable to determine source address, assuming it should be blocked - return Err(Error::new_down(ErrorType::Custom("Missing Client Address"))); + session.downstream_session.respond_error(401).await; + return Ok(true); }; let SocketAddr::Inet(addr) = addr else { // CIDR filters don't apply to UDS @@ -59,7 +60,8 @@ impl RequestFilterMod for CidrRangeFilter { let ip_addr = addr.ip(); if self.blocks.iter().any(|b| b.contains(&ip_addr)) { - Err(Error::new_down(ErrorType::ConnectRefused)) + session.downstream_session.respond_error(401).await; + Ok(true) } else { Ok(false) } diff --git a/user-manual/src/config/kdl.md b/user-manual/src/config/kdl.md index a148169..8c34872 100644 --- a/user-manual/src/config/kdl.md +++ b/user-manual/src/config/kdl.md @@ 
-97,6 +97,16 @@ services { filter kind="upsert-header" key="x-with-love-from" value="river" } } + rate-limiting { + rule kind="source-ip" \ + max-buckets=4000 tokens-per-bucket=10 refill-qty=1 refill-rate-ms=10 + + rule kind="specific-uri" pattern="static/.*" \ + max-buckets=2000 tokens-per-bucket=20 refill-qty=5 refill-rate-ms=1 + + rule kind="any-matching-uri" pattern=r".*\.mp4" \ + tokens-per-bucket=50 refill-qty=2 refill-rate-ms=3 + } } Example3 { listeners { @@ -255,6 +265,112 @@ Filters at this stage are the earliest. Currently supported filters: * Arguments: `key="KEY" value="VALUE"`, where `KEY` is a valid HTTP header key, and `VALUE` is a valid HTTP header value * The given header will be added or replaced to `VALUE` +### `services.$NAME.rate-limiting` + +This section contains the configuration for rate limiting rules. + +Rate limiting rules are used to limit the total number of requests made by downstream clients, +based on various criteria. + +Note that Rate limiting is on a **per service** basis, services do not share rate limiting +information. + +This section is optional. + +Example: + +``` +rate-limiting { + rule kind="source-ip" \ + max-buckets=4000 tokens-per-bucket=10 refill-qty=1 refill-rate-ms=10 + + rule kind="specific-uri" pattern="static/.*" \ + max-buckets=2000 tokens-per-bucket=20 refill-qty=5 refill-rate-ms=1 + + rule kind="any-matching-uri" pattern=r".*\.mp4" \ + tokens-per-bucket=50 refill-qty=2 refill-rate-ms=3 +} +``` + +#### `services.$NAME.rate-limiting.rule` + +Rules are used to specify rate limiting parameters, and applicability of rules to a given request. + +##### Leaky Buckets + +Rate limiting in River uses a [Leaky Bucket] model for determining whether a request can be served +immediately, or if it should be rejected. For a given rule, a "bucket" of "tokens" is created, where +one "token" is required for each request. + +The bucket for a rule starts with a configurable `tokens-per-bucket` number. 
When a request arrives,
+it attempts to take one token from the bucket. If one is available, it is served immediately. Otherwise,
+the request is rejected immediately.
+
+The bucket is refilled at a configurable rate, specified by `refill-rate-ms`, and adds a configurable
+number of tokens specified by `refill-qty`. The number of tokens in the bucket will never exceed the
+initial `tokens-per-bucket` number.
+
+Once a refill occurs, additional requests may be served.
+
+[Leaky Bucket]: https://en.wikipedia.org/wiki/Leaky_bucket
+
+##### How many buckets?
+
+Some rules require many buckets. For example, rules based on the source IP address will create a bucket
+for each unique source IP address observed in a request. We refer to these as "multi" rules.
+
+However, each of these buckets requires space to contain the metadata, and to avoid unbounded growth,
+we allow for a configurable `max-buckets` number, which serves to influence the total memory required
+for storing buckets. This uses an [Adaptive Replacement Cache]
+to allow for concurrent access to these buckets, as well as the ability to automatically evict buckets that
+are not actively being used (somewhat similar to an LRU or "Least Recently Used" cache).
+
+[Adaptive Replacement Cache]: https://docs.rs/concread/latest/concread/arcache/index.html
+
+There is a trade-off here: The larger `max-buckets` is, the longer that River can "remember" a bucket
+for a given factor, such as specific IP addresses. However, it also requires more resident memory to
+retain this information.
+
+If `max-buckets` is set too low, then buckets will be "evicted" from the cache, meaning that subsequent
+requests matching that bucket will require the creation of a new bucket (with a full set of tokens),
+potentially defeating the objective of accurate rate limiting.
+
+For "single" rules, or rules that do not have multiple buckets, a single bucket will be shared by all
+requests matching the rule.
+
+##### Gotta claim 'em all
+
+When multiple rules apply to a single request, for example rules based on both source IP address,
+and the URI path, then a request must claim ALL applicable tokens before proceeding. If a given IP
+address is making its first request, but to a URI that has an empty bucket, it will immediately
+obtain the IP address token, but the request will be rejected as the URI bucket claim failed.
+
+##### Kinds of Rules
+
+Currently three kinds of rules are supported:
+
+* `kind="source-ip"` - this tracks the IP address of the requestor.
+  * This rule is a "multi" rule: A unique bucket will be created for
+    the IPv4 or IPv6 address of the requestor.
+  * The `max-buckets` parameter controls how many IP addresses will be remembered.
+* `kind="specific-uri" pattern="REGEX"` - This tracks the URI path of the request, such as `static/images/example.jpg`
+  * This rule is a "multi" rule: if the request's URI path matches the provided `REGEX`,
+    the full URI path will be assigned to a given bucket
+  * For example, if the regex `static/.*` was provided:
+    * `index.html` would not match this rule, and would not require obtaining a token
+    * `static/images/example.jpg` would match this rule, and would require obtaining a token
+    * `static/styles/example.css` would also match this rule, and would require obtaining a token
+  * Note that `static/images/example.jpg` and `static/styles/example.css` would each have a UNIQUE
+    bucket.
+* `kind="any-matching-uri" pattern="REGEX"` - This tracks the URI path of the request, such as `static/videos/example.mp4` + * This is a "single" rule: ANY path matching `REGEX` will share a single bucket + * For example, if the regex `.*\.mp4` was provided: + * `index.html` would not match this rule, and would not require obtaining a token + * `static/videos/example1.mp4` would match this rule, and would require obtaining a token + * `static/videos/example2.mp4` would also match this rule, and would require obtaining a token + * Note that `static/videos/example1.mp4` and `static/videos/example2.mp4` would share a SINGLE bucket + (also shared with any other path containing an MP4 file) + ### `services.$NAME.file-server` This section is only allowed when `connectors` and `path-control` are not present.