Skip to content

Commit db73f09

Browse files
committed
Slowly creating the seams
Signed-off-by: Alex Snaps <[email protected]>
1 parent b318247 commit db73f09

File tree

5 files changed

+255
-80
lines changed

5 files changed

+255
-80
lines changed

limitador/src/storage/disk/rocksdb_storage.rs

+48-10
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,50 @@ impl CounterStorage for RocksDbStorage {
4242

4343
#[tracing::instrument(skip_all)]
4444
fn check_and_update(
45+
&self,
46+
counters: &[Counter],
47+
delta: u64,
48+
) -> Result<Authorization, StorageErr> {
49+
let mut keys: Vec<Vec<u8>> = Vec::with_capacity(counters.len());
50+
51+
for counter in counters {
52+
let key = key_for_counter(counter);
53+
let slice: &[u8] = key.as_ref();
54+
let entry = {
55+
let span = debug_span!("datastore");
56+
let _entered = span.enter();
57+
self.db.get(slice)?
58+
};
59+
let (val, _) = match entry {
60+
None => (0, Duration::from_secs(counter.limit().seconds())),
61+
Some(raw) => {
62+
let slice: &[u8] = raw.as_ref();
63+
let value: ExpiringValue = slice.try_into()?;
64+
(value.value(), value.ttl())
65+
}
66+
};
67+
68+
if counter.max_value() < val + delta {
69+
return Ok(Authorization::Limited(
70+
counter.limit().name().map(|n| n.to_string()),
71+
));
72+
}
73+
74+
keys.push(key);
75+
}
76+
77+
for (idx, counter) in counters.iter().enumerate() {
78+
self.insert_or_update(&keys[idx], counter, delta)?;
79+
}
80+
81+
Ok(Authorization::Ok)
82+
}
83+
84+
#[tracing::instrument(skip_all)]
85+
fn check_and_update_loading(
4586
&self,
4687
counters: &mut Vec<Counter>,
4788
delta: u64,
48-
load_counters: bool,
4989
) -> Result<Authorization, StorageErr> {
5090
let mut keys: Vec<Vec<u8>> = Vec::with_capacity(counters.len());
5191

@@ -66,15 +106,13 @@ impl CounterStorage for RocksDbStorage {
66106
}
67107
};
68108

69-
if load_counters {
70-
counter.set_expires_in(ttl);
71-
counter.set_remaining(
72-
counter
73-
.max_value()
74-
.checked_sub(val + delta)
75-
.unwrap_or_default(),
76-
);
77-
}
109+
counter.set_expires_in(ttl);
110+
counter.set_remaining(
111+
counter
112+
.max_value()
113+
.checked_sub(val + delta)
114+
.unwrap_or_default(),
115+
);
78116

79117
if counter.max_value() < val + delta {
80118
return Ok(Authorization::Limited(

limitador/src/storage/distributed/mod.rs

+80-20
Original file line numberDiff line numberDiff line change
@@ -90,25 +90,14 @@ impl CounterStorage for CrInMemoryStorage {
9090
#[tracing::instrument(skip_all)]
9191
fn check_and_update(
9292
&self,
93-
counters: &mut Vec<Counter>,
93+
counters: &[Counter],
9494
delta: u64,
95-
load_counters: bool,
9695
) -> Result<Authorization, StorageErr> {
97-
let mut first_limited = None;
9896
let mut counter_values_to_update: Vec<Vec<u8>> = Vec::new();
9997
let now = SystemTime::now();
10098

101-
let mut process_counter =
102-
|counter: &mut Counter, value: u64, delta: u64| -> Option<Authorization> {
103-
if load_counters {
104-
let remaining = counter.max_value().checked_sub(value + delta);
105-
counter.set_remaining(remaining.unwrap_or(0));
106-
if first_limited.is_none() && remaining.is_none() {
107-
first_limited = Some(Authorization::Limited(
108-
counter.limit().name().map(|n| n.to_owned()),
109-
));
110-
}
111-
}
99+
let process_counter =
100+
|counter: &Counter, value: u64, delta: u64| -> Option<Authorization> {
112101
if !Self::counter_is_within_limits(counter, Some(&value), delta) {
113102
return Some(Authorization::Limited(
114103
counter.limit().name().map(|n| n.to_owned()),
@@ -118,7 +107,7 @@ impl CounterStorage for CrInMemoryStorage {
118107
};
119108

120109
// Process simple counters
121-
for counter in counters.iter_mut() {
110+
for counter in counters.iter() {
122111
let key = encode_counter_to_key(counter);
123112

124113
// most of the time the counter should exist, so first try with a read only lock
@@ -132,9 +121,7 @@ impl CounterStorage for CrInMemoryStorage {
132121
if let Some(limited) =
133122
process_counter(counter, store_value.value.read(), delta)
134123
{
135-
if !load_counters {
136-
return Ok(limited);
137-
}
124+
return Ok(limited);
138125
}
139126
counter_values_to_update.push(key);
140127
true
@@ -157,10 +144,83 @@ impl CounterStorage for CrInMemoryStorage {
157144
}));
158145

159146
if let Some(limited) = process_counter(counter, store_value.value.read(), delta) {
160-
if !load_counters {
161-
return Ok(limited);
147+
return Ok(limited);
148+
}
149+
counter_values_to_update.push(key);
150+
}
151+
}
152+
153+
// Update counters
154+
let limits = self.limits.read().unwrap();
155+
counter_values_to_update.into_iter().for_each(|key| {
156+
let store_value = limits.get(&key).unwrap();
157+
self.increment_counter(store_value.clone(), delta, now);
158+
});
159+
160+
Ok(Authorization::Ok)
161+
}
162+
163+
#[tracing::instrument(skip_all)]
164+
fn check_and_update_loading(
165+
&self,
166+
counters: &mut Vec<Counter>,
167+
delta: u64,
168+
) -> Result<Authorization, StorageErr> {
169+
let mut first_limited = None;
170+
let mut counter_values_to_update: Vec<Vec<u8>> = Vec::new();
171+
let now = SystemTime::now();
172+
173+
let mut process_counter =
174+
|counter: &mut Counter, value: u64, delta: u64| -> Option<Authorization> {
175+
let remaining = counter.max_value().checked_sub(value + delta);
176+
counter.set_remaining(remaining.unwrap_or(0));
177+
if first_limited.is_none() && remaining.is_none() {
178+
first_limited = Some(Authorization::Limited(
179+
counter.limit().name().map(|n| n.to_owned()),
180+
));
181+
}
182+
if !Self::counter_is_within_limits(counter, Some(&value), delta) {
183+
return Some(Authorization::Limited(
184+
counter.limit().name().map(|n| n.to_owned()),
185+
));
186+
}
187+
None
188+
};
189+
190+
// Process simple counters
191+
for counter in counters.iter_mut() {
192+
let key = encode_counter_to_key(counter);
193+
194+
// most of the time the counter should exist, so first try with a read only lock
195+
// since that will allow us to have higher concurrency
196+
let counter_existed = {
197+
let key = key.clone();
198+
let limits = self.limits.read().unwrap();
199+
match limits.get(&key) {
200+
None => false,
201+
Some(store_value) => {
202+
let _ = process_counter(counter, store_value.value.read(), delta);
203+
counter_values_to_update.push(key);
204+
true
162205
}
163206
}
207+
};
208+
209+
// we need to take the slow path since we need to mutate the limits map.
210+
if !counter_existed {
211+
// try again with a write lock to create the counter if it's still missing.
212+
let mut limits = self.limits.write().unwrap();
213+
let store_value = limits.entry(key.clone()).or_insert(Arc::new(CounterEntry {
214+
key: key.clone(),
215+
counter: counter.clone(),
216+
value: CrCounterValue::new(
217+
self.identifier.clone(),
218+
counter.max_value(),
219+
counter.window(),
220+
),
221+
}));
222+
223+
let _ = process_counter(counter, store_value.value.read(), delta);
164224
counter_values_to_update.push(key);
165225
}
166226
}

limitador/src/storage/in_memory.rs

+70-20
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,71 @@ impl CounterStorage for InMemoryStorage {
6969

7070
#[tracing::instrument(skip_all)]
7171
fn check_and_update(
72+
&self,
73+
counters: &[Counter],
74+
delta: u64,
75+
) -> Result<Authorization, StorageErr> {
76+
let limits_by_namespace = self.simple_limits.read().unwrap();
77+
let mut counter_values_to_update: Vec<(&AtomicExpiringValue, Duration)> = Vec::new();
78+
let mut qualified_counter_values_to_updated: Vec<(Arc<AtomicExpiringValue>, Duration)> =
79+
Vec::new();
80+
let now = SystemTime::now();
81+
82+
let process_counter =
83+
|counter: &Counter, value: u64, delta: u64| -> Option<Authorization> {
84+
if !Self::counter_is_within_limits(counter, Some(&value), delta) {
85+
return Some(Authorization::Limited(
86+
counter.limit().name().map(|n| n.to_owned()),
87+
));
88+
}
89+
None
90+
};
91+
92+
// Process simple counters
93+
for counter in counters.iter().filter(|c| !c.is_qualified()) {
94+
let atomic_expiring_value: &AtomicExpiringValue =
95+
limits_by_namespace.get(counter.limit()).unwrap();
96+
97+
if let Some(limited) = process_counter(counter, atomic_expiring_value.value(), delta) {
98+
return Ok(limited);
99+
}
100+
counter_values_to_update.push((atomic_expiring_value, counter.window()));
101+
}
102+
103+
// Process qualified counters
104+
for counter in counters.iter().filter(|c| c.is_qualified()) {
105+
let value = match self.qualified_counters.get(counter) {
106+
None => self.qualified_counters.get_with_by_ref(counter, || {
107+
Arc::new(AtomicExpiringValue::new(0, now + counter.window()))
108+
}),
109+
Some(counter) => counter,
110+
};
111+
112+
if let Some(limited) = process_counter(counter, value.value(), delta) {
113+
return Ok(limited);
114+
}
115+
116+
qualified_counter_values_to_updated.push((value, counter.window()));
117+
}
118+
119+
// Update counters
120+
counter_values_to_update.iter().for_each(|(v, ttl)| {
121+
v.update(delta, *ttl, now);
122+
});
123+
qualified_counter_values_to_updated
124+
.iter()
125+
.for_each(|(v, ttl)| {
126+
v.update(delta, *ttl, now);
127+
});
128+
129+
Ok(Authorization::Ok)
130+
}
131+
132+
#[tracing::instrument(skip_all)]
133+
fn check_and_update_loading(
72134
&self,
73135
counters: &mut Vec<Counter>,
74136
delta: u64,
75-
load_counters: bool,
76137
) -> Result<Authorization, StorageErr> {
77138
let limits_by_namespace = self.simple_limits.read().unwrap();
78139
let mut first_limited = None;
@@ -83,14 +144,12 @@ impl CounterStorage for InMemoryStorage {
83144

84145
let mut process_counter =
85146
|counter: &mut Counter, value: u64, delta: u64| -> Option<Authorization> {
86-
if load_counters {
87-
let remaining = counter.max_value().checked_sub(value + delta);
88-
counter.set_remaining(remaining.unwrap_or_default());
89-
if first_limited.is_none() && remaining.is_none() {
90-
first_limited = Some(Authorization::Limited(
91-
counter.limit().name().map(|n| n.to_owned()),
92-
));
93-
}
147+
let remaining = counter.max_value().checked_sub(value + delta);
148+
counter.set_remaining(remaining.unwrap_or_default());
149+
if first_limited.is_none() && remaining.is_none() {
150+
first_limited = Some(Authorization::Limited(
151+
counter.limit().name().map(|n| n.to_owned()),
152+
));
94153
}
95154
if !Self::counter_is_within_limits(counter, Some(&value), delta) {
96155
return Some(Authorization::Limited(
@@ -105,11 +164,7 @@ impl CounterStorage for InMemoryStorage {
105164
let atomic_expiring_value: &AtomicExpiringValue =
106165
limits_by_namespace.get(counter.limit()).unwrap();
107166

108-
if let Some(limited) = process_counter(counter, atomic_expiring_value.value(), delta) {
109-
if !load_counters {
110-
return Ok(limited);
111-
}
112-
}
167+
let _ = process_counter(counter, atomic_expiring_value.value(), delta);
113168
counter_values_to_update.push((atomic_expiring_value, counter.window()));
114169
}
115170

@@ -122,12 +177,7 @@ impl CounterStorage for InMemoryStorage {
122177
Some(counter) => counter,
123178
};
124179

125-
if let Some(limited) = process_counter(counter, value.value(), delta) {
126-
if !load_counters {
127-
return Ok(limited);
128-
}
129-
}
130-
180+
let _ = process_counter(counter, value.value(), delta);
131181
qualified_counter_values_to_updated.push((value, counter.window()));
132182
}
133183

limitador/src/storage/mod.rs

+10-3
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,11 @@ impl Storage {
136136
delta: u64,
137137
load_counters: bool,
138138
) -> Result<Authorization, StorageErr> {
139-
self.counters
140-
.check_and_update(counters, delta, load_counters)
139+
if load_counters {
140+
self.counters.check_and_update_loading(counters, delta)
141+
} else {
142+
self.counters.check_and_update(counters, delta)
143+
}
141144
}
142145

143146
pub fn get_counters(&self, namespace: &Namespace) -> Result<HashSet<Counter>, StorageErr> {
@@ -281,10 +284,14 @@ pub trait CounterStorage: Sync + Send {
281284
fn add_counter(&self, limit: &Limit) -> Result<(), StorageErr>;
282285
fn update_counter(&self, counter: &Counter, delta: u64) -> Result<(), StorageErr>;
283286
fn check_and_update(
287+
&self,
288+
counters: &[Counter],
289+
delta: u64,
290+
) -> Result<Authorization, StorageErr>;
291+
fn check_and_update_loading(
284292
&self,
285293
counters: &mut Vec<Counter>,
286294
delta: u64,
287-
load_counters: bool,
288295
) -> Result<Authorization, StorageErr>;
289296
fn get_counters(&self, limits: &HashSet<Arc<Limit>>) -> Result<HashSet<Counter>, StorageErr>; // todo revise typing here?
290297
fn delete_counters(&self, limits: &HashSet<Arc<Limit>>) -> Result<(), StorageErr>; // todo revise typing here?

0 commit comments

Comments
 (0)