Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions limitador/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,10 @@
#![allow(clippy::multiple_crate_versions)]

use crate::counter::Counter;
use crate::errors::LimitadorError;

Check warning on line 197 in limitador/src/lib.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/limitador/limitador/limitador/src/lib.rs
use crate::limit::{Context, Limit, Namespace};
use crate::storage::in_memory::InMemoryStorage;
use crate::storage::{AsyncCounterStorage, AsyncStorage, Authorization, CounterStorage, Storage};
use crate::storage::{AsyncCounterStorage, AsyncStorage, Authorization, CounterKey, CounterStorage, Storage};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

Expand Down Expand Up @@ -404,7 +404,7 @@
if counters.is_empty() {
return Ok(CheckResult {
limited: false,
counters,
counters: Vec::default(),
limit_name: None,
});
}
Expand Down Expand Up @@ -478,16 +478,12 @@
&self,
namespace: &Namespace,
ctx: &Context,
) -> LimitadorResult<Vec<Counter>> {
) -> LimitadorResult<Vec<CounterKey>> {
let limits = self.storage.get_limits(namespace);
limits
.iter()
.filter(|lim| lim.applies(ctx))
.filter_map(|lim| match Counter::new(Arc::clone(lim), ctx) {
Ok(None) => None,
Ok(Some(c)) => Some(Ok(c)),
Err(e) => Some(Err(e)),
})
.map(|lim| lim.into())
.collect()
}
}
Expand Down
58 changes: 48 additions & 10 deletions limitador/src/storage/disk/rocksdb_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@
}

impl CounterStorage for RocksDbStorage {
#[tracing::instrument(skip_all)]

Check failure on line 24 in limitador/src/storage/disk/rocksdb_storage.rs

View workflow job for this annotation

GitHub Actions / Check

method `is_within_limits` is not a member of trait `CounterStorage`

Check failure on line 24 in limitador/src/storage/disk/rocksdb_storage.rs

View workflow job for this annotation

GitHub Actions / Clippy

method `is_within_limits` is not a member of trait `CounterStorage`

Check failure on line 24 in limitador/src/storage/disk/rocksdb_storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

method `is_within_limits` is not a member of trait `CounterStorage`
fn is_within_limits(&self, counter: &Counter, delta: u64) -> Result<bool, StorageErr> {
let key = key_for_counter(counter);
let value = self.insert_or_update(&key, counter, 0)?;
Ok(counter.max_value() >= value.value() + delta)
}

#[tracing::instrument(skip_all)]

Check failure on line 31 in limitador/src/storage/disk/rocksdb_storage.rs

View workflow job for this annotation

GitHub Actions / Check

method `add_counter` is not a member of trait `CounterStorage`

Check failure on line 31 in limitador/src/storage/disk/rocksdb_storage.rs

View workflow job for this annotation

GitHub Actions / Clippy

method `add_counter` is not a member of trait `CounterStorage`

Check failure on line 31 in limitador/src/storage/disk/rocksdb_storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

method `add_counter` is not a member of trait `CounterStorage`
fn add_counter(&self, _limit: &Limit) -> Result<(), StorageErr> {
Ok(())
}

#[tracing::instrument(skip_all)]

Check failure on line 36 in limitador/src/storage/disk/rocksdb_storage.rs

View workflow job for this annotation

GitHub Actions / Check

method `update_counter` is not a member of trait `CounterStorage`

Check failure on line 36 in limitador/src/storage/disk/rocksdb_storage.rs

View workflow job for this annotation

GitHub Actions / Clippy

method `update_counter` is not a member of trait `CounterStorage`

Check failure on line 36 in limitador/src/storage/disk/rocksdb_storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

method `update_counter` is not a member of trait `CounterStorage`
fn update_counter(&self, counter: &Counter, delta: u64) -> Result<(), StorageErr> {
let key = key_for_counter(counter);
self.insert_or_update(&key, counter, delta)?;
Expand All @@ -42,10 +42,50 @@

#[tracing::instrument(skip_all)]
fn check_and_update(
&self,
counters: &[Counter],
delta: u64,
) -> Result<Authorization, StorageErr> {
let mut keys: Vec<Vec<u8>> = Vec::with_capacity(counters.len());

for counter in counters {
let key = key_for_counter(counter);
let slice: &[u8] = key.as_ref();
let entry = {
let span = debug_span!("datastore");
let _entered = span.enter();
self.db.get(slice)?
};
let (val, _) = match entry {
None => (0, Duration::from_secs(counter.limit().seconds())),
Some(raw) => {
let slice: &[u8] = raw.as_ref();
let value: ExpiringValue = slice.try_into()?;
(value.value(), value.ttl())
}
};

if counter.max_value() < val + delta {
return Ok(Authorization::Limited(
counter.limit().name().map(|n| n.to_string()),
));
}

keys.push(key);
}

for (idx, counter) in counters.iter().enumerate() {
self.insert_or_update(&keys[idx], counter, delta)?;
}

Ok(Authorization::Ok)
}

#[tracing::instrument(skip_all)]
fn check_and_update_loading(
&self,
counters: &mut Vec<Counter>,
delta: u64,
load_counters: bool,
) -> Result<Authorization, StorageErr> {
let mut keys: Vec<Vec<u8>> = Vec::with_capacity(counters.len());

Expand All @@ -66,15 +106,13 @@
}
};

if load_counters {
counter.set_expires_in(ttl);
counter.set_remaining(
counter
.max_value()
.checked_sub(val + delta)
.unwrap_or_default(),
);
}
counter.set_expires_in(ttl);
counter.set_remaining(
counter
.max_value()
.checked_sub(val + delta)
.unwrap_or_default(),
);

if counter.max_value() < val + delta {
return Ok(Authorization::Limited(
Expand Down
100 changes: 80 additions & 20 deletions limitador/src/storage/distributed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
}

impl CounterStorage for CrInMemoryStorage {
#[tracing::instrument(skip_all)]

Check failure on line 31 in limitador/src/storage/distributed/mod.rs

View workflow job for this annotation

GitHub Actions / Check

method `is_within_limits` is not a member of trait `CounterStorage`

Check failure on line 31 in limitador/src/storage/distributed/mod.rs

View workflow job for this annotation

GitHub Actions / Clippy

method `is_within_limits` is not a member of trait `CounterStorage`

Check failure on line 31 in limitador/src/storage/distributed/mod.rs

View workflow job for this annotation

GitHub Actions / Test Suite

method `is_within_limits` is not a member of trait `CounterStorage`
fn is_within_limits(&self, counter: &Counter, delta: u64) -> Result<bool, StorageErr> {
let limits = self.limits.read().unwrap();

Expand All @@ -40,7 +40,7 @@
Ok(counter.max_value() >= value + delta)
}

#[tracing::instrument(skip_all)]

Check failure on line 43 in limitador/src/storage/distributed/mod.rs

View workflow job for this annotation

GitHub Actions / Check

method `add_counter` is not a member of trait `CounterStorage`

Check failure on line 43 in limitador/src/storage/distributed/mod.rs

View workflow job for this annotation

GitHub Actions / Clippy

method `add_counter` is not a member of trait `CounterStorage`

Check failure on line 43 in limitador/src/storage/distributed/mod.rs

View workflow job for this annotation

GitHub Actions / Test Suite

method `add_counter` is not a member of trait `CounterStorage`
fn add_counter(&self, limit: &Limit) -> Result<(), StorageErr> {
if limit.variables().is_empty() {
let mut limits = self.limits.write().unwrap();
Expand All @@ -60,7 +60,7 @@
Ok(())
}

#[tracing::instrument(skip_all)]

Check failure on line 63 in limitador/src/storage/distributed/mod.rs

View workflow job for this annotation

GitHub Actions / Check

method `update_counter` is not a member of trait `CounterStorage`

Check failure on line 63 in limitador/src/storage/distributed/mod.rs

View workflow job for this annotation

GitHub Actions / Clippy

method `update_counter` is not a member of trait `CounterStorage`

Check failure on line 63 in limitador/src/storage/distributed/mod.rs

View workflow job for this annotation

GitHub Actions / Test Suite

method `update_counter` is not a member of trait `CounterStorage`
fn update_counter(&self, counter: &Counter, delta: u64) -> Result<(), StorageErr> {
let mut limits = self.limits.write().unwrap();
let now = SystemTime::now();
Expand Down Expand Up @@ -91,25 +91,14 @@
#[tracing::instrument(skip_all)]
fn check_and_update(
&self,
counters: &mut Vec<Counter>,
counters: &[Counter],
delta: u64,
load_counters: bool,
) -> Result<Authorization, StorageErr> {
let mut first_limited = None;
let mut counter_values_to_update: Vec<Vec<u8>> = Vec::new();
let now = SystemTime::now();

let mut process_counter =
|counter: &mut Counter, value: u64, delta: u64| -> Option<Authorization> {
if load_counters {
let remaining = counter.max_value().checked_sub(value + delta);
counter.set_remaining(remaining.unwrap_or(0));
if first_limited.is_none() && remaining.is_none() {
first_limited = Some(Authorization::Limited(
counter.limit().name().map(|n| n.to_owned()),
));
}
}
let process_counter =
|counter: &Counter, value: u64, delta: u64| -> Option<Authorization> {
if !Self::counter_is_within_limits(counter, Some(&value), delta) {
return Some(Authorization::Limited(
counter.limit().name().map(|n| n.to_owned()),
Expand All @@ -119,7 +108,7 @@
};

// Process simple counters
for counter in counters.iter_mut() {
for counter in counters.iter() {
let key = encode_counter_to_key(counter);

// most of the time the counter should exist, so first try with a read only lock
Expand All @@ -133,9 +122,7 @@
if let Some(limited) =
process_counter(counter, store_value.value.read(), delta)
{
if !load_counters {
return Ok(limited);
}
return Ok(limited);
}
counter_values_to_update.push(key);
true
Expand All @@ -158,10 +145,83 @@
}));

if let Some(limited) = process_counter(counter, store_value.value.read(), delta) {
if !load_counters {
return Ok(limited);
return Ok(limited);
}
counter_values_to_update.push(key);
}
}

// Update counters
let limits = self.limits.read().unwrap();
counter_values_to_update.into_iter().for_each(|key| {
let store_value = limits.get(&key).unwrap();
self.increment_counter(store_value.clone(), delta, now);
});

Ok(Authorization::Ok)
}

#[tracing::instrument(skip_all)]
fn check_and_update_loading(
&self,
counters: &mut Vec<Counter>,
delta: u64,
) -> Result<Authorization, StorageErr> {
let mut first_limited = None;
let mut counter_values_to_update: Vec<Vec<u8>> = Vec::new();
let now = SystemTime::now();

let mut process_counter =
|counter: &mut Counter, value: u64, delta: u64| -> Option<Authorization> {
let remaining = counter.max_value().checked_sub(value + delta);
counter.set_remaining(remaining.unwrap_or(0));
if first_limited.is_none() && remaining.is_none() {
first_limited = Some(Authorization::Limited(
counter.limit().name().map(|n| n.to_owned()),
));
}
if !Self::counter_is_within_limits(counter, Some(&value), delta) {
return Some(Authorization::Limited(
counter.limit().name().map(|n| n.to_owned()),
));
}
None
};

// Process simple counters
for counter in counters.iter_mut() {
let key = encode_counter_to_key(counter);

// most of the time the counter should exist, so first try with a read only lock
// since that will allow us to have higher concurrency
let counter_existed = {
let key = key.clone();
let limits = self.limits.read().unwrap();
match limits.get(&key) {
None => false,
Some(store_value) => {
let _ = process_counter(counter, store_value.value.read(), delta);
counter_values_to_update.push(key);
true
}
}
};

// we need to take the slow path since we need to mutate the limits map.
if !counter_existed {
// try again with a write lock to create the counter if it's still missing.
let mut limits = self.limits.write().unwrap();
let store_value = limits.entry(key.clone()).or_insert(Arc::new(CounterEntry {
key: key.clone(),
counter: counter.clone(),
value: CrCounterValue::new(
self.identifier.clone(),
counter.max_value(),
counter.window(),
),
}));

let _ = process_counter(counter, store_value.value.read(), delta);
counter_values_to_update.push(key);
}
}
Expand Down
Loading
Loading