Flux Limiter
A high-performance rate limiter based on the Generic Cell Rate Algorithm (GCRA) with nanosecond precision and lock-free concurrent access.
Features
- Mathematically precise: Implements the GCRA algorithm with exact nanosecond timing
- High performance: Lock-free concurrent access using DashMap
- Generic client IDs: Works with any hashable client identifier (String, IpAddr, u64, etc.)
- Rich metadata: Returns detailed decision information for HTTP response construction
- Memory efficient: Automatic cleanup of stale client entries
- Robust error handling: Graceful handling of clock failures and configuration errors
- Testable: Clock abstraction enables deterministic testing
- Thread-safe: Safe to use across multiple threads
- Zero allocations: Efficient hot path with minimal overhead
What is Rate Limiting?
Rate limiting is a technique used to control the rate at which requests or operations are processed. It’s commonly used to:
- Protect services: Prevent abuse and ensure fair resource allocation
- Control costs: Limit API usage to manage infrastructure costs
- Ensure quality of service: Prevent individual users from degrading performance for others
- Comply with policies: Enforce usage limits and SLA agreements
Why Flux Limiter?
Flux Limiter stands out with its focus on:
- Correctness: Mathematically precise GCRA implementation
- Performance: Lock-free concurrency with O(1) operations
- Reliability: Comprehensive error handling and graceful degradation
- Observability: Rich metadata for monitoring and HTTP headers
- Flexibility: Generic design supporting various client ID types
Algorithm: GCRA
Flux Limiter implements the Generic Cell Rate Algorithm (GCRA), which is mathematically equivalent to the token bucket algorithm but offers several advantages:
- No background token refill processes
- Exact timing without floating-point precision loss
- Efficient state representation (one timestamp per client)
- Deterministic behavior with integer arithmetic
Performance Characteristics
- Memory: O(number of active clients)
- Time complexity: O(1) for check_request() operations
- Concurrency: Lock-free reads and writes via DashMap
- Precision: Nanosecond timing accuracy
- Throughput: Millions of operations per second
- Reliability: Graceful degradation on system clock issues
Next Steps
- Installation - Add Flux Limiter to your project
- Quick Start - Get started in minutes
- Architecture - Understand the design and implementation
Installation
Add Flux Limiter to your Rust project using Cargo.
Adding to Cargo.toml
Add this to your Cargo.toml:
[dependencies]
flux-limiter = "0.7.2"
Alternative: Using Cargo Add
cargo add has been built into Cargo since 1.62; on older toolchains, install cargo-edit first:
cargo add flux-limiter
Verify Installation
Create a simple example to verify the installation:
use flux_limiter::{FluxLimiter, FluxLimiterConfig, SystemClock};

fn main() {
    let config = FluxLimiterConfig::new(10.0, 5.0);
    let limiter = FluxLimiter::with_config(config, SystemClock).unwrap();

    match limiter.check_request("test_client") {
        Ok(decision) => {
            println!("Request allowed: {}", decision.allowed);
        }
        Err(e) => {
            eprintln!("Error: {}", e);
        }
    }
}
Run with:
cargo run
You should see output like:
Request allowed: true
Next Steps
Now that you have Flux Limiter installed, proceed to:
- Quick Start - Learn the basics
- Configuration - Understand rate and burst settings
Quick Start
Get started with Flux Limiter in just a few minutes.
Basic Example
#![allow(unused)]
fn main() {
    use flux_limiter::{FluxLimiter, FluxLimiterConfig, SystemClock};

    // Create a rate limiter: 10 requests per second with burst of 5
    let config = FluxLimiterConfig::new(10.0, 5.0);
    let limiter = FluxLimiter::with_config(config, SystemClock).unwrap();

    // Check if a request should be allowed
    match limiter.check_request("user_123") {
        Ok(decision) => {
            if decision.allowed {
                println!("Request allowed");
            } else {
                println!("Rate limited - retry after {:.2}s",
                    decision.retry_after_seconds.unwrap_or(0.0));
            }
        }
        Err(e) => {
            eprintln!("Rate limiter error: {}", e);
            // Handle error appropriately (e.g., allow request, log error)
        }
    }
}
Understanding the Example
Let’s break down what’s happening:
1. Create Configuration: FluxLimiterConfig::new(10.0, 5.0)
   - Rate: 10 requests per second
   - Burst: 5 additional requests allowed in bursts
   - Total capacity: ~6 requests can be made immediately
2. Create Rate Limiter: FluxLimiter::with_config(config, SystemClock)
   - Uses the configuration
   - Uses SystemClock as the production time source
   - Returns Result to handle configuration errors
3. Check Request: limiter.check_request("user_123")
   - Checks if the client "user_123" can make a request
   - Returns rich metadata about the decision
   - Automatically updates internal state
Decision Metadata
The FluxLimiterDecision struct provides detailed information:
#![allow(unused)]
fn main() {
pub struct FluxLimiterDecision {
    pub allowed: bool,                    // Whether to allow the request
    pub retry_after_seconds: Option<f64>, // When to retry (if denied)
    pub remaining_capacity: Option<f64>,  // Remaining burst capacity
    pub reset_time_nanos: u64,            // When the window resets
}
}
Using Decision Metadata
#![allow(unused)]
fn main() {
match limiter.check_request("user_123") {
Ok(decision) => {
if decision.allowed {
println!("Request allowed");
if let Some(remaining) = decision.remaining_capacity {
println!("Remaining capacity: {:.2}", remaining);
}
} else {
if let Some(retry_after) = decision.retry_after_seconds {
println!("Please retry after {:.2} seconds", retry_after);
}
}
}
Err(e) => {
eprintln!("Error: {}", e);
}
}
}
Multiple Clients
Flux Limiter automatically tracks state for each unique client:
#![allow(unused)]
fn main() {
let config = FluxLimiterConfig::new(10.0, 5.0);
let limiter = FluxLimiter::with_config(config, SystemClock).unwrap();
// Different clients have independent rate limits
limiter.check_request("user_1").unwrap();
limiter.check_request("user_2").unwrap();
limiter.check_request("user_3").unwrap();
// Each client is tracked separately
}
Thread Safety
Flux Limiter is thread-safe and can be shared across threads:
#![allow(unused)]
fn main() {
use std::sync::Arc;
use std::thread;
let config = FluxLimiterConfig::new(10.0, 5.0);
let limiter = Arc::new(FluxLimiter::with_config(config, SystemClock).unwrap());
let mut handles = vec![];
for i in 0..10 {
let limiter = Arc::clone(&limiter);
let handle = thread::spawn(move || {
let client_id = format!("user_{}", i);
limiter.check_request(client_id)
});
handles.push(handle);
}
for handle in handles {
let result = handle.join().unwrap();
println!("Result: {:?}", result);
}
}
Next Steps
- Configuration - Learn about rate and burst configuration
- Basic Usage - Explore common usage patterns
- Error Handling - Handle errors gracefully
Configuration
Learn how to configure Flux Limiter for your specific use case.
Configuration Basics
Flux Limiter uses FluxLimiterConfig for configuration:
#![allow(unused)]
fn main() {
use flux_limiter::FluxLimiterConfig;
let rate_per_second = 100.0; // sustained requests per second
let burst_capacity = 50.0;   // extra requests allowed in bursts
let config = FluxLimiterConfig::new(rate_per_second, burst_capacity);
}
Rate Parameter
The rate parameter defines the sustained requests per second.
- Must be positive (> 0)
- Defines the steady-state request rate
- Example: rate = 100.0 means 100 requests per second
#![allow(unused)]
fn main() {
// 100 requests per second sustained
let config = FluxLimiterConfig::new(100.0, 0.0);
}
Understanding Rate
The rate translates to a time interval between requests:
- rate = 1.0 → One request per second (1000 ms interval)
- rate = 10.0 → Ten requests per second (100 ms interval)
- rate = 100.0 → One hundred requests per second (10 ms interval)
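The interval is just the reciprocal of the rate; a quick illustrative calculation (not code from the crate):

let rate = 10.0_f64; // requests per second
let interval_nanos = (1_000_000_000.0 / rate) as u64; // GCRA's emission interval
assert_eq!(interval_nanos, 100_000_000); // 100 ms between requests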
Burst Parameter
The burst parameter defines additional capacity for handling bursts.
- Must be non-negative (≥ 0)
- Allows temporary spikes above the sustained rate
- Example: burst = 50.0 allows bursts of ~50 additional requests
#![allow(unused)]
fn main() {
// 100 requests per second with burst capacity of 50
let config = FluxLimiterConfig::new(100.0, 50.0);
}
Understanding Burst
- Total capacity: Approximately 1 + burst requests can be made immediately
- Burst recovery: Unused burst capacity accumulates at the configured rate
- Maximum burst: Capped at the configured burst value
Rate and Burst Example
With rate=10.0 and burst=5.0:
- Initial state: Client can make ~6 requests immediately (1 + 5 burst)
- After burst: Limited to sustained rate of 10 requests per second
- Recovery: Burst capacity recovers at 10 units per second
- After 1 second idle: Can make ~6 requests again
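A quick way to observe this behavior (a sketch built on the API shown above; the exact count may vary slightly because SystemClock advances while the loop runs):

use flux_limiter::{FluxLimiter, FluxLimiterConfig, SystemClock};

fn main() {
    let config = FluxLimiterConfig::new(10.0, 5.0);
    let limiter = FluxLimiter::with_config(config, SystemClock).unwrap();
    // A fresh client can burst: expect roughly 6 of 10 immediate requests to pass
    let allowed = (0..10)
        .filter(|_| {
            limiter
                .check_request("burst_demo")
                .map(|d| d.allowed)
                .unwrap_or(false)
        })
        .count();
    println!("{} of 10 immediate requests allowed", allowed);
}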
Builder Pattern
Use the builder pattern for clearer configuration:
#![allow(unused)]
fn main() {
let config = FluxLimiterConfig::new(0.0, 0.0) // placeholder values, overridden below
    .rate(100.0)  // 100 requests per second
    .burst(50.0); // allow bursts of up to 50 requests
}
This approach is especially useful when you want to:
- Start with default values and override specific settings
- Make configuration changes clearer and more readable
Common Configuration Patterns
Strict Rate Limiting (No Burst)
#![allow(unused)]
fn main() {
// Exactly 10 requests per second, no burst allowed
let config = FluxLimiterConfig::new(10.0, 0.0);
}
Use when:
- You need strict, predictable rate limiting
- Bursts would cause issues for your backend
- You want simple, consistent behavior
Flexible Rate Limiting (With Burst)
#![allow(unused)]
fn main() {
// 10 requests per second with burst of 20
let config = FluxLimiterConfig::new(10.0, 20.0);
}
Use when:
- User experience benefits from handling short bursts
- Your backend can handle temporary spikes
- You want to allow occasional bursty traffic
API Gateway Configuration
#![allow(unused)]
fn main() {
// 1000 requests per second with generous burst
let config = FluxLimiterConfig::new(1000.0, 500.0);
}
Use for:
- High-throughput API gateways
- Services that can handle significant load
- Preventing extreme abuse while allowing normal traffic
Free Tier API Configuration
#![allow(unused)]
fn main() {
// 10 requests per minute (0.1667 per second) with small burst
let rate_per_minute = 10.0 / 60.0;
let config = FluxLimiterConfig::new(rate_per_minute, 5.0);
}
Use for:
- Free tier API limits
- Generous limits that prevent abuse
- Pay-per-use API tiers
Configuration Validation
Configuration is validated when creating the rate limiter:
#![allow(unused)]
fn main() {
use flux_limiter::{FluxLimiter, FluxLimiterConfig, FluxLimiterError, SystemClock};
// Invalid configuration: negative rate
let config = FluxLimiterConfig::new(-10.0, 5.0);
match FluxLimiter::with_config(config, SystemClock) {
Ok(_) => println!("Valid configuration"),
Err(FluxLimiterError::InvalidRate) => {
eprintln!("Error: Rate must be positive");
}
Err(e) => eprintln!("Error: {}", e),
}
}
Validation Rules
- InvalidRate: Returned if rate ≤ 0
- InvalidBurst: Returned if burst < 0
Manual Validation
You can validate configuration before creating the limiter:
#![allow(unused)]
fn main() {
match config.validate() {
Ok(_) => println!("Configuration is valid"),
Err(FluxLimiterError::InvalidRate) => {
eprintln!("Rate must be positive");
}
Err(FluxLimiterError::InvalidBurst) => {
eprintln!("Burst must be non-negative");
}
Err(e) => eprintln!("Configuration error: {}", e),
}
}
Dynamic Configuration
While Flux Limiter doesn’t support runtime configuration changes, you can work around this:
#![allow(unused)]
fn main() {
use std::sync::Arc;
use parking_lot::RwLock;
struct DynamicRateLimiter {
limiter: Arc<RwLock<FluxLimiter<String, SystemClock>>>,
}
impl DynamicRateLimiter {
fn update_config(&self, new_config: FluxLimiterConfig) {
let new_limiter = FluxLimiter::with_config(new_config, SystemClock).unwrap();
*self.limiter.write() = new_limiter;
}
fn check_request(&self, client_id: String) -> Result<FluxLimiterDecision, FluxLimiterError> {
self.limiter.read().check_request(client_id)
}
}
}
Note: Changing configuration resets all client state.
Next Steps
- Basic Usage - Learn common usage patterns
- Advanced Usage - Explore advanced features
- Web Integration - Integrate with web frameworks
Basic Usage
Common patterns and best practices for using Flux Limiter.
Simple Rate Limiting
The most basic use case - check if a request should be allowed:
#![allow(unused)]
fn main() {
use flux_limiter::{FluxLimiter, FluxLimiterConfig, SystemClock};
let config = FluxLimiterConfig::new(10.0, 5.0);
let limiter = FluxLimiter::with_config(config, SystemClock).unwrap();
match limiter.check_request("user_123") {
Ok(decision) if decision.allowed => {
// Process request
println!("Request allowed");
}
Ok(decision) => {
// Rate limited
println!("Rate limited - retry after {:.2}s",
decision.retry_after_seconds.unwrap_or(0.0));
}
Err(e) => {
// Handle error
eprintln!("Error: {}", e);
}
}
}
Working with Different Client ID Types
String Client IDs
#![allow(unused)]
fn main() {
// User IDs, session IDs, API keys
let limiter = FluxLimiter::<String, _>::with_config(config, SystemClock).unwrap();
limiter.check_request("user_123".to_string())?;
limiter.check_request("session_abc".to_string())?;
}
IP Address Client IDs
#![allow(unused)]
fn main() {
use std::net::IpAddr;
let limiter = FluxLimiter::<IpAddr, _>::with_config(config, SystemClock).unwrap();
let client_ip: IpAddr = "192.168.1.1".parse().unwrap();
limiter.check_request(client_ip)?;
}
Numeric Client IDs
#![allow(unused)]
fn main() {
// High-performance numeric IDs
let limiter = FluxLimiter::<u64, _>::with_config(config, SystemClock).unwrap();
limiter.check_request(12345)?;
limiter.check_request(67890)?;
}
Custom Client IDs
#![allow(unused)]
fn main() {
use std::hash::{Hash, Hasher};
#[derive(Clone, PartialEq, Eq, Hash)]
struct ClientId {
user_id: u64,
endpoint: String,
}
let limiter = FluxLimiter::<ClientId, _>::with_config(config, SystemClock).unwrap();
let client = ClientId {
user_id: 123,
endpoint: "/api/data".to_string(),
};
limiter.check_request(client)?;
}
Accessing Configuration
You can query the current rate limiter configuration:
#![allow(unused)]
fn main() {
let limiter = FluxLimiter::with_config(config, SystemClock).unwrap();
println!("Rate: {} req/sec", limiter.rate());
println!("Burst: {}", limiter.burst());
}
Using Decision Metadata
Extracting All Metadata
#![allow(unused)]
fn main() {
match limiter.check_request("user_123") {
Ok(decision) => {
println!("Allowed: {}", decision.allowed);
if let Some(retry_after) = decision.retry_after_seconds {
println!("Retry after: {:.2}s", retry_after);
}
if let Some(remaining) = decision.remaining_capacity {
println!("Remaining capacity: {:.2}", remaining);
}
println!("Reset time: {} ns", decision.reset_time_nanos);
}
Err(e) => eprintln!("Error: {}", e),
}
}
Computing Reset Time
#![allow(unused)]
fn main() {
use std::time::{SystemTime, UNIX_EPOCH, Duration};
match limiter.check_request("user_123") {
Ok(decision) => {
// Convert reset_time_nanos to a timestamp
let reset_duration = Duration::from_nanos(decision.reset_time_nanos);
let reset_time = UNIX_EPOCH + reset_duration;
let now = SystemTime::now();
if let Ok(time_until_reset) = reset_time.duration_since(now) {
println!("Reset in {:.2}s", time_until_reset.as_secs_f64());
}
}
Err(e) => eprintln!("Error: {}", e),
}
}
Sharing Across Threads
Flux Limiter is thread-safe and designed to be shared:
#![allow(unused)]
fn main() {
use std::sync::Arc;
use std::thread;
let config = FluxLimiterConfig::new(100.0, 50.0);
let limiter = Arc::new(FluxLimiter::with_config(config, SystemClock).unwrap());
// Share across multiple threads
let handles: Vec<_> = (0..10)
.map(|i| {
let limiter = Arc::clone(&limiter);
thread::spawn(move || {
let client_id = format!("client_{}", i);
limiter.check_request(client_id)
})
})
.collect();
for handle in handles {
match handle.join().unwrap() {
Ok(decision) => println!("Allowed: {}", decision.allowed),
Err(e) => eprintln!("Error: {}", e),
}
}
}
Async Usage
While Flux Limiter is synchronous, it works seamlessly in async contexts:
use std::sync::Arc;
use flux_limiter::{FluxLimiter, FluxLimiterConfig, SystemClock};
#[tokio::main]
async fn main() {
let config = FluxLimiterConfig::new(100.0, 50.0);
let limiter = Arc::new(FluxLimiter::with_config(config, SystemClock).unwrap());
// Use in async tasks
let limiter_clone = Arc::clone(&limiter);
tokio::spawn(async move {
match limiter_clone.check_request("user_123") {
Ok(decision) if decision.allowed => {
// Process async request
println!("Processing request...");
}
Ok(_) => println!("Rate limited"),
Err(e) => eprintln!("Error: {}", e),
}
})
.await
.unwrap();
}
Multiple Rate Limiters
You can create different rate limiters for different purposes:
#![allow(unused)]
fn main() {
    // Global rate limiter
    let global_limiter = FluxLimiter::with_config(
        FluxLimiterConfig::new(1000.0, 500.0),
        SystemClock,
    ).unwrap();

    // Per-user rate limiter
    let user_limiter = FluxLimiter::with_config(
        FluxLimiterConfig::new(10.0, 5.0),
        SystemClock,
    ).unwrap();

    // Apply both limits (a nested fn cannot capture locals, so pass the limiters in)
    fn check_both_limits(
        global_limiter: &FluxLimiter<String, SystemClock>,
        user_limiter: &FluxLimiter<String, SystemClock>,
        user_id: &str,
    ) -> bool {
        match global_limiter.check_request("global".to_string()) {
            Ok(decision) if !decision.allowed => return false,
            Err(_) => return false,
            _ => {}
        }
        match user_limiter.check_request(user_id.to_string()) {
            Ok(decision) => decision.allowed,
            Err(_) => false,
        }
    }

    let _ = check_both_limits(&global_limiter, &user_limiter, "user_123");
}
Idiomatic Error Handling
#![allow(unused)]
fn main() {
use flux_limiter::{FluxLimiter, FluxLimiterError, SystemClock};
fn process_request(limiter: &FluxLimiter<String, SystemClock>, client_id: String) -> Result<(), String> {
let decision = limiter
.check_request(client_id)
.map_err(|e| format!("Rate limiter error: {}", e))?;
if !decision.allowed {
return Err(format!(
"Rate limited. Retry after {:.2}s",
decision.retry_after_seconds.unwrap_or(0.0)
));
}
// Process the request
Ok(())
}
}
Next Steps
- Error Handling - Handle errors gracefully
- Advanced Usage - Memory management and optimization
- Web Integration - Use with web frameworks
Error Handling
Comprehensive guide to handling errors in Flux Limiter.
Error Types
Flux Limiter provides a well-defined error hierarchy:
#![allow(unused)]
fn main() {
pub enum FluxLimiterError {
    InvalidRate,            // Configuration: rate ≤ 0
    InvalidBurst,           // Configuration: burst < 0
    ClockError(ClockError), // Runtime: clock failure
}

pub enum ClockError {
    SystemTimeError, // System time unavailable
}
}
Configuration Errors
Configuration errors occur when creating a rate limiter with invalid settings.
InvalidRate Error
#![allow(unused)]
fn main() {
use flux_limiter::{FluxLimiterConfig, FluxLimiter, SystemClock, FluxLimiterError};
// Invalid: rate must be positive
let config = FluxLimiterConfig::new(-10.0, 5.0);
match FluxLimiter::with_config(config, SystemClock) {
Ok(_) => println!("Success"),
Err(FluxLimiterError::InvalidRate) => {
eprintln!("Error: Rate must be positive (> 0)");
}
Err(e) => eprintln!("Other error: {}", e),
}
}
InvalidBurst Error
#![allow(unused)]
fn main() {
// Invalid: burst must be non-negative
let config = FluxLimiterConfig::new(10.0, -5.0);
match FluxLimiter::with_config(config, SystemClock) {
Ok(_) => println!("Success"),
Err(FluxLimiterError::InvalidBurst) => {
eprintln!("Error: Burst must be non-negative (≥ 0)");
}
Err(e) => eprintln!("Other error: {}", e),
}
}
Handling Configuration Errors
Configuration errors should be caught early, typically at application startup:
use flux_limiter::{FluxLimiter, FluxLimiterConfig, FluxLimiterError, SystemClock};

fn create_rate_limiter() -> Result<FluxLimiter<String, SystemClock>, String> {
let config = FluxLimiterConfig::new(100.0, 50.0);
FluxLimiter::with_config(config, SystemClock)
.map_err(|e| match e {
FluxLimiterError::InvalidRate => {
"Invalid configuration: rate must be positive".to_string()
}
FluxLimiterError::InvalidBurst => {
"Invalid configuration: burst must be non-negative".to_string()
}
_ => format!("Configuration error: {}", e),
})
}
fn main() {
let limiter = create_rate_limiter()
.expect("Failed to create rate limiter with valid configuration");
// Use limiter...
}
Runtime Clock Errors
Clock errors can occur during normal operation when the system clock is unavailable or behaves unexpectedly.
Understanding Clock Errors
Clock errors happen when:
- System time API fails
- Clock jumps backward (NTP adjustment)
- System suspend/resume causes time discontinuity
- Virtualization causes time skips
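Because the time source is injected through the Clock trait, these failure paths can be exercised deterministically with a clock that always fails. A sketch, assuming the Clock trait shape shown in Advanced Usage:

use flux_limiter::{Clock, ClockError, FluxLimiter, FluxLimiterConfig};

#[derive(Clone)]
struct FailingClock;

impl Clock for FailingClock {
    fn now(&self) -> Result<u64, ClockError> {
        // Simulate a broken time source on every call
        Err(ClockError::SystemTimeError)
    }
}

// Every check_request now surfaces a ClockError
let config = FluxLimiterConfig::new(10.0, 5.0);
let limiter = FluxLimiter::with_config(config, FailingClock).unwrap();
assert!(limiter.check_request("user_123").is_err());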
Basic Clock Error Handling
#![allow(unused)]
fn main() {
match limiter.check_request("user_123") {
Ok(decision) => {
if decision.allowed {
// Process request
} else {
// Rate limited
}
}
Err(FluxLimiterError::ClockError(_)) => {
eprintln!("System clock error detected");
// Implement your error policy
}
Err(e) => {
eprintln!("Unexpected error: {}", e);
}
}
}
Error Handling Policies
Different applications require different error handling strategies.
Fail-Open Policy
Allow requests when the rate limiter encounters errors:
#![allow(unused)]
fn main() {
fn should_allow_request(
limiter: &FluxLimiter<String, SystemClock>,
client_id: &str
) -> bool {
match limiter.check_request(client_id) {
Ok(decision) => decision.allowed,
Err(FluxLimiterError::ClockError(_)) => {
// Fail-open: allow request on clock error
eprintln!("Clock error - allowing request (fail-open policy)");
true
}
Err(e) => {
eprintln!("Rate limiter error: {} - allowing request", e);
true
}
}
}
}
Use when:
- Availability is more important than strict rate limiting
- False positives (allowing too many requests) are acceptable
- Your backend can handle temporary spikes
Fail-Closed Policy
Deny requests when the rate limiter encounters errors:
#![allow(unused)]
fn main() {
fn should_allow_request(
limiter: &FluxLimiter<String, SystemClock>,
client_id: &str
) -> bool {
match limiter.check_request(client_id) {
Ok(decision) => decision.allowed,
Err(FluxLimiterError::ClockError(_)) => {
// Fail-closed: deny request on clock error
eprintln!("Clock error - denying request (fail-closed policy)");
false
}
Err(e) => {
eprintln!("Rate limiter error: {} - denying request", e);
false
}
}
}
}
Use when:
- Security is paramount
- False negatives (denying legitimate requests) are acceptable
- Protecting backend from overload is critical
Fallback Policy
Use alternative rate limiting when clock errors occur:
#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
struct FallbackRateLimiter {
primary: FluxLimiter<String, SystemClock>,
fallback_counter: Arc<AtomicU64>,
fallback_limit: u64,
}
impl FallbackRateLimiter {
fn check_request(&self, client_id: String) -> bool {
match self.primary.check_request(client_id) {
Ok(decision) => decision.allowed,
Err(FluxLimiterError::ClockError(_)) => {
// Use simple counter as fallback
let count = self.fallback_counter.fetch_add(1, Ordering::Relaxed);
if count >= self.fallback_limit {
eprintln!("Fallback limit reached");
false
} else {
eprintln!("Using fallback counter: {}/{}", count, self.fallback_limit);
true
}
}
Err(e) => {
eprintln!("Unexpected error: {}", e);
false
}
}
}
}
}
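Constructing the fallback limiter might look like this (a sketch reusing the struct above; the fallback_limit value is illustrative):

use std::sync::atomic::AtomicU64;
use std::sync::Arc;

let fallback = FallbackRateLimiter {
    primary: FluxLimiter::with_config(FluxLimiterConfig::new(100.0, 50.0), SystemClock).unwrap(),
    fallback_counter: Arc::new(AtomicU64::new(0)),
    fallback_limit: 1_000, // hard cap on requests while the clock is unhealthy
};
assert!(fallback.check_request("user_123".to_string()));

Note that the counter in this sketch never resets; a production fallback would reset it periodically or use a time-windowed counter.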
Monitoring Clock Errors
Track clock errors for alerting and debugging:
#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicU64, Ordering};
static CLOCK_ERROR_COUNT: AtomicU64 = AtomicU64::new(0);
static TOTAL_REQUESTS: AtomicU64 = AtomicU64::new(0);
fn check_with_monitoring(
limiter: &FluxLimiter<String, SystemClock>,
client_id: String
) -> bool {
TOTAL_REQUESTS.fetch_add(1, Ordering::Relaxed);
match limiter.check_request(client_id) {
Ok(decision) => decision.allowed,
Err(FluxLimiterError::ClockError(e)) => {
CLOCK_ERROR_COUNT.fetch_add(1, Ordering::Relaxed);
let error_count = CLOCK_ERROR_COUNT.load(Ordering::Relaxed);
let total = TOTAL_REQUESTS.load(Ordering::Relaxed);
let error_rate = error_count as f64 / total as f64;
eprintln!("Clock error: {:?} (rate: {:.4}%)", e, error_rate * 100.0);
// Implement your policy
true // Fail-open
}
Err(e) => {
eprintln!("Unexpected error: {}", e);
false
}
}
}
}
Circuit Breaker Pattern
Temporarily bypass rate limiting after consecutive failures:
#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicU64, Ordering};
struct CircuitBreakerLimiter {
limiter: FluxLimiter<String, SystemClock>,
consecutive_failures: AtomicU64,
failure_threshold: u64,
bypassed: AtomicU64,
}
impl CircuitBreakerLimiter {
fn check_request(&self, client_id: String) -> bool {
// Check if circuit is open
if self.consecutive_failures.load(Ordering::Relaxed) >= self.failure_threshold {
self.bypassed.fetch_add(1, Ordering::Relaxed);
eprintln!("Circuit open - bypassing rate limiter");
return true;
}
match self.limiter.check_request(client_id) {
Ok(decision) => {
// Reset failure counter on success
self.consecutive_failures.store(0, Ordering::Relaxed);
decision.allowed
}
Err(FluxLimiterError::ClockError(_)) => {
let failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
if failures >= self.failure_threshold {
eprintln!("Opening circuit after {} consecutive failures", failures);
}
true // Fail-open
}
Err(e) => {
eprintln!("Unexpected error: {}", e);
false
}
}
}
}
}
Cleanup Error Handling
The cleanup_stale_clients method can also return clock errors:
#![allow(unused)]
fn main() {
// Cleanup errors are typically not critical
let one_hour_nanos = 60 * 60 * 1_000_000_000u64; // 1 hour in nanoseconds
match limiter.cleanup_stale_clients(one_hour_nanos) {
Ok(count) => {
println!("Cleaned up {} stale clients", count);
}
Err(FluxLimiterError::ClockError(_)) => {
eprintln!("Clock error during cleanup - will retry later");
// Cleanup failure is not critical - continue operation
}
Err(e) => {
eprintln!("Unexpected cleanup error: {}", e);
}
}
}
Best Practices
- Validate Configuration Early: Check configuration at startup, not runtime
- Choose an Error Policy: Decide on fail-open, fail-closed, or fallback
- Monitor Errors: Track error rates for alerting
- Log Contextually: Include client ID and error context in logs
- Handle Gracefully: Never panic - always return a decision
- Test Error Paths: Use TestClock to simulate failures
- Document Policy: Make your error handling policy explicit
Next Steps
- Advanced Usage - Memory management and optimization
- Production Considerations - Deploy with confidence
- Testing Architecture - Test error handling
Advanced Usage
Advanced features and optimization techniques for Flux Limiter.
Memory Management
Flux Limiter tracks state for each client, which can grow over time. Use cleanup to manage memory.
Understanding Memory Usage
- Memory: O(number of active clients)
- Per-client overhead: ~40-48 bytes (client ID + TAT timestamp)
- Example: 1 million clients ≈ 40-48 MB
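A back-of-envelope check of that example, using the upper ~48-byte estimate:

let active_clients: u64 = 1_000_000;
let bytes_per_client: u64 = 48; // client ID + TAT timestamp + map overhead
let approx_mb = active_clients * bytes_per_client / 1_000_000;
assert_eq!(approx_mb, 48); // ≈ 48 MB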
Manual Cleanup
#![allow(unused)]
fn main() {
// Clean up clients that haven't been seen for 1 hour
let one_hour_nanos = 60 * 60 * 1_000_000_000u64;
match limiter.cleanup_stale_clients(one_hour_nanos) {
Ok(removed_count) => {
println!("Cleaned up {} stale clients", removed_count);
}
Err(e) => {
eprintln!("Cleanup failed: {}", e);
// Cleanup failure is usually not critical - log and continue
}
}
}
Automatic Cleanup with Tokio
use std::sync::Arc;
use tokio::time::{interval, Duration};
use flux_limiter::{FluxLimiter, FluxLimiterConfig, SystemClock};
async fn start_cleanup_task(limiter: Arc<FluxLimiter<String, SystemClock>>) {
let mut cleanup_interval = interval(Duration::from_secs(3600)); // 1 hour
loop {
cleanup_interval.tick().await;
let threshold = 24 * 60 * 60 * 1_000_000_000u64; // 24 hours
match limiter.cleanup_stale_clients(threshold) {
Ok(count) => {
println!("Cleaned up {} stale clients", count);
}
Err(e) => {
eprintln!("Cleanup failed: {}", e);
// Consider implementing fallback cleanup or alerting
}
}
}
}
#[tokio::main]
async fn main() {
let config = FluxLimiterConfig::new(100.0, 50.0);
let limiter = Arc::new(FluxLimiter::with_config(config, SystemClock).unwrap());
// Start cleanup task in background
let limiter_clone = Arc::clone(&limiter);
tokio::spawn(start_cleanup_task(limiter_clone));
// Use limiter in your application...
}
Dynamic Cleanup Thresholds
Base cleanup threshold on the configured rate:
#![allow(unused)]
fn main() {
fn calculate_cleanup_threshold(limiter: &FluxLimiter<String, SystemClock>) -> u64 {
// Clean up clients inactive for 100x the rate interval
let rate = limiter.rate();
let rate_interval_nanos = (1_000_000_000.0 / rate) as u64;
rate_interval_nanos * 100
}
let threshold = calculate_cleanup_threshold(&limiter);
let _ = limiter.cleanup_stale_clients(threshold);
}
Custom Client ID Types
Create custom client IDs for complex rate limiting scenarios.
Composite Client IDs
Rate limit by multiple dimensions:
#![allow(unused)]
fn main() {
use std::hash::{Hash, Hasher};
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct CompositeClientId {
user_id: String,
api_endpoint: String,
}
let limiter = FluxLimiter::<CompositeClientId, _>::with_config(
FluxLimiterConfig::new(100.0, 50.0),
SystemClock
).unwrap();
// Different rate limits per user per endpoint
let client = CompositeClientId {
user_id: "user_123".to_string(),
api_endpoint: "/api/data".to_string(),
};
limiter.check_request(client)?;
}
Hierarchical Rate Limiting
#![allow(unused)]
fn main() {
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum RateLimitKey {
Global,
PerTenant(String),
PerUser(String, String), // tenant_id, user_id
}
let limiter = FluxLimiter::<RateLimitKey, _>::with_config(
FluxLimiterConfig::new(1000.0, 500.0),
SystemClock
).unwrap();
// Check multiple levels
fn check_hierarchical(
limiter: &FluxLimiter<RateLimitKey, SystemClock>,
tenant_id: &str,
user_id: &str,
) -> bool {
// Global limit
if !limiter.check_request(RateLimitKey::Global).unwrap().allowed {
return false;
}
// Tenant limit
if !limiter.check_request(RateLimitKey::PerTenant(tenant_id.to_string())).unwrap().allowed {
return false;
}
// User limit
limiter.check_request(RateLimitKey::PerUser(tenant_id.to_string(), user_id.to_string()))
.unwrap()
.allowed
}
}
Multiple Rate Limiters
Use different rate limiters for different purposes.
Tiered Rate Limiting
#![allow(unused)]
fn main() {
use std::sync::Arc;
struct TieredRateLimiter {
free_tier: Arc<FluxLimiter<String, SystemClock>>,
paid_tier: Arc<FluxLimiter<String, SystemClock>>,
premium_tier: Arc<FluxLimiter<String, SystemClock>>,
}
impl TieredRateLimiter {
fn new() -> Result<Self, FluxLimiterError> {
Ok(Self {
free_tier: Arc::new(FluxLimiter::with_config(
FluxLimiterConfig::new(10.0, 5.0), // 10 req/s
SystemClock
)?),
paid_tier: Arc::new(FluxLimiter::with_config(
FluxLimiterConfig::new(100.0, 50.0), // 100 req/s
SystemClock
)?),
premium_tier: Arc::new(FluxLimiter::with_config(
FluxLimiterConfig::new(1000.0, 500.0), // 1000 req/s
SystemClock
)?),
})
}
fn check_request(&self, tier: &str, client_id: String) -> Result<FluxLimiterDecision, FluxLimiterError> {
match tier {
"free" => self.free_tier.check_request(client_id),
"paid" => self.paid_tier.check_request(client_id),
"premium" => self.premium_tier.check_request(client_id),
_ => self.free_tier.check_request(client_id),
}
}
}
}
Per-Endpoint Rate Limiting
#![allow(unused)]
fn main() {
use std::collections::HashMap;
use std::sync::Arc;
struct EndpointRateLimiter {
limiters: HashMap<String, Arc<FluxLimiter<String, SystemClock>>>,
default: Arc<FluxLimiter<String, SystemClock>>,
}
impl EndpointRateLimiter {
fn new() -> Result<Self, FluxLimiterError> {
let mut limiters = HashMap::new();
// Strict limit for expensive endpoint
limiters.insert(
"/api/expensive".to_string(),
Arc::new(FluxLimiter::with_config(
FluxLimiterConfig::new(1.0, 2.0),
SystemClock
)?),
);
// Generous limit for cheap endpoint
limiters.insert(
"/api/cheap".to_string(),
Arc::new(FluxLimiter::with_config(
FluxLimiterConfig::new(1000.0, 500.0),
SystemClock
)?),
);
// Default limit
let default = Arc::new(FluxLimiter::with_config(
FluxLimiterConfig::new(100.0, 50.0),
SystemClock
)?);
Ok(Self { limiters, default })
}
fn check_request(&self, endpoint: &str, client_id: String) -> Result<FluxLimiterDecision, FluxLimiterError> {
let limiter = self.limiters.get(endpoint).unwrap_or(&self.default);
limiter.check_request(client_id)
}
}
}
Performance Optimization
Minimize String Allocations
Use references where possible:
#![allow(unused)]
fn main() {
// Taking an owned String forces every caller to allocate up front
fn check_request_owned(limiter: &FluxLimiter<String, SystemClock>, id: String) -> bool {
limiter.check_request(id).unwrap().allowed
}
// Accepting &str defers the allocation to the last possible moment
fn check_request_borrowed(limiter: &FluxLimiter<String, SystemClock>, id: &str) -> bool {
limiter.check_request(id.to_string()).unwrap().allowed
}
}
Or use numeric IDs for better performance:
#![allow(unused)]
fn main() {
// Faster: no string allocation
let limiter = FluxLimiter::<u64, _>::with_config(config, SystemClock).unwrap();
let user_id_numeric: u64 = 12345;
limiter.check_request(user_id_numeric).unwrap();
}
Batch Operations
Process multiple clients efficiently:
#![allow(unused)]
fn main() {
fn check_batch(
limiter: &FluxLimiter<String, SystemClock>,
client_ids: &[String],
) -> Vec<bool> {
client_ids
.iter()
.map(|id| limiter.check_request(id.clone())
.map(|d| d.allowed)
.unwrap_or(false))
.collect()
}
}
Parallel Processing
Use rayon for parallel rate limiting checks:
#![allow(unused)]
fn main() {
use rayon::prelude::*;
fn check_parallel(
limiter: &FluxLimiter<String, SystemClock>,
client_ids: Vec<String>,
) -> Vec<bool> {
client_ids
.par_iter()
.map(|id| limiter.check_request(id.clone())
.map(|d| d.allowed)
.unwrap_or(false))
.collect()
}
}
Custom Clock Implementation
Implement custom clock sources for special scenarios.
Mock Clock for Testing
#![allow(unused)]
fn main() {
use flux_limiter::{Clock, ClockError};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
#[derive(Clone)]
struct MockClock {
time: Arc<AtomicU64>,
}
impl MockClock {
fn new(initial_time_nanos: u64) -> Self {
Self {
time: Arc::new(AtomicU64::new(initial_time_nanos)),
}
}
fn set_time(&self, time_nanos: u64) {
self.time.store(time_nanos, Ordering::SeqCst);
}
fn advance(&self, duration_nanos: u64) {
self.time.fetch_add(duration_nanos, Ordering::SeqCst);
}
}
impl Clock for MockClock {
fn now(&self) -> Result<u64, ClockError> {
Ok(self.time.load(Ordering::SeqCst))
}
}
// Use in tests
let clock = MockClock::new(0);
let limiter = FluxLimiter::with_config(config, clock.clone()).unwrap();
// Control time in tests
clock.advance(1_000_000_000); // Advance 1 second
}
Monotonic Clock
Ensure time never goes backward:
#![allow(unused)]
fn main() {
use flux_limiter::{Clock, ClockError};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Clone)]
struct MonotonicClock {
last_time: Arc<AtomicU64>,
}
impl MonotonicClock {
fn new() -> Self {
Self {
last_time: Arc::new(AtomicU64::new(0)),
}
}
}
impl Clock for MonotonicClock {
fn now(&self) -> Result<u64, ClockError> {
let current_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|_| ClockError::SystemTimeError)?
.as_nanos() as u64;
// Ensure monotonicity
let mut last = self.last_time.load(Ordering::Acquire);
loop {
let next = current_time.max(last);
match self.last_time.compare_exchange_weak(
last,
next,
Ordering::Release,
Ordering::Acquire,
) {
Ok(_) => return Ok(next),
Err(x) => last = x,
}
}
}
}
}
Next Steps
- Web Integration - Use with web frameworks
- Production Considerations - Deploy with confidence
- Performance Design - Understand performance characteristics
Web Framework Integration
Learn how to integrate Flux Limiter with popular web frameworks.
Axum Integration
Basic Middleware
#![allow(unused)]
fn main() {
use axum::{
extract::{Request, State},
http::{HeaderMap, StatusCode},
middleware::Next,
response::Response,
};
use flux_limiter::{FluxLimiter, FluxLimiterError, SystemClock};
use std::sync::Arc;
type SharedLimiter = Arc<FluxLimiter<String, SystemClock>>;
// axum 0.7 style: Request and Next are no longer generic over the body type
async fn rate_limit_middleware(
State(limiter): State<SharedLimiter>,
request: Request,
next: Next,
) -> Result<Response, (StatusCode, HeaderMap, &'static str)> {
// Extract client ID (e.g., from IP or auth header)
let client_ip = extract_client_ip(&request);
match limiter.check_request(client_ip.clone()) {
Ok(decision) if decision.allowed => {
// Add rate limit headers
let mut response = next.run(request).await;
if let Some(remaining) = decision.remaining_capacity {
response.headers_mut().insert(
"X-RateLimit-Remaining",
remaining.to_string().parse().unwrap(),
);
}
Ok(response)
}
Ok(decision) => {
// Rate limited
let mut headers = HeaderMap::new();
if let Some(retry_after) = decision.retry_after_seconds {
headers.insert(
"Retry-After",
(retry_after.ceil() as u64).to_string().parse().unwrap(),
);
}
headers.insert("X-RateLimit-Remaining", "0".parse().unwrap());
Err((StatusCode::TOO_MANY_REQUESTS, headers, "Rate limited"))
}
Err(FluxLimiterError::ClockError(_)) => {
// Fail-open policy
eprintln!("Rate limiter clock error - allowing request");
Ok(next.run(request).await)
}
Err(e) => {
eprintln!("Rate limiter error: {}", e);
Err((StatusCode::INTERNAL_SERVER_ERROR, HeaderMap::new(), "Internal error"))
}
}
}
fn extract_client_ip(request: &Request) -> String {
request
.headers()
.get("X-Forwarded-For")
.and_then(|h| h.to_str().ok())
.and_then(|s| s.split(',').next())
.unwrap_or("unknown")
.to_string()
}
}
Complete Axum Example
use axum::{
routing::get,
Router,
middleware,
};
use flux_limiter::{FluxLimiter, FluxLimiterConfig, SystemClock};
use std::sync::Arc;
#[tokio::main]
async fn main() {
// Create rate limiter
let config = FluxLimiterConfig::new(10.0, 5.0);
let limiter = Arc::new(FluxLimiter::with_config(config, SystemClock).unwrap());
// Build router with middleware
let app = Router::new()
.route("/api/data", get(handler))
.layer(middleware::from_fn_with_state(
limiter.clone(),
rate_limit_middleware,
))
.with_state(limiter);
// Run server
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000")
.await
.unwrap();
axum::serve(listener, app).await.unwrap();
}
async fn handler() -> &'static str {
"Hello, world!"
}
Actix-Web Integration
Basic Middleware
#![allow(unused)]
fn main() {
use actix_web::{
body::EitherBody,
dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform},
http::header,
Error, HttpResponse,
};
use flux_limiter::{FluxLimiter, FluxLimiterError, SystemClock};
use std::sync::Arc;
use std::future::Future;
use std::pin::Pin;
pub struct RateLimitMiddleware {
limiter: Arc<FluxLimiter<String, SystemClock>>,
}
impl RateLimitMiddleware {
pub fn new(limiter: Arc<FluxLimiter<String, SystemClock>>) -> Self {
Self { limiter }
}
}
impl<S, B> Transform<S, ServiceRequest> for RateLimitMiddleware
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<EitherBody<B>>;
type Error = Error;
type Transform = RateLimitMiddlewareService<S>;
type InitError = ();
type Future = Pin<Box<dyn Future<Output = Result<Self::Transform, Self::InitError>>>>;
fn new_transform(&self, service: S) -> Self::Future {
let limiter = self.limiter.clone();
Box::pin(async move {
Ok(RateLimitMiddlewareService { service, limiter })
})
}
}
pub struct RateLimitMiddlewareService<S> {
service: S,
limiter: Arc<FluxLimiter<String, SystemClock>>,
}
impl<S, B> Service<ServiceRequest> for RateLimitMiddlewareService<S>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<EitherBody<B>>;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
// Service also requires poll_ready; delegate it to the wrapped service
forward_ready!(service);
fn call(&self, req: ServiceRequest) -> Self::Future {
let client_ip = req
.connection_info()
.realip_remote_addr()
.unwrap_or("unknown")
.to_string();
let decision = self.limiter.check_request(client_ip);
match decision {
Ok(decision) if decision.allowed => {
let fut = self.service.call(req);
Box::pin(async move {
let mut res = fut.await?;
if let Some(remaining) = decision.remaining_capacity {
res.headers_mut().insert(
header::HeaderName::from_static("x-ratelimit-remaining"),
header::HeaderValue::from_str(&remaining.to_string()).unwrap(),
);
}
Ok(res.map_into_left_body())
})
}
Ok(decision) => {
Box::pin(async move {
let mut response = HttpResponse::TooManyRequests();
if let Some(retry_after) = decision.retry_after_seconds {
response.insert_header((
"Retry-After",
(retry_after.ceil() as u64).to_string(),
));
}
Ok(req.into_response(response.finish()).map_into_right_body())
})
}
Err(FluxLimiterError::ClockError(_)) => {
// Fail-open policy
eprintln!("Clock error - allowing request");
let fut = self.service.call(req);
Box::pin(async move { fut.await.map(|res| res.map_into_left_body()) })
}
Err(e) => {
eprintln!("Rate limiter error: {}", e);
Box::pin(async move {
Ok(req.into_response(HttpResponse::InternalServerError().finish()).map_into_right_body())
})
}
}
}
}
}
Rocket Integration
Request Guard
#![allow(unused)]
fn main() {
use rocket::{
request::{FromRequest, Outcome, Request},
http::Status,
State,
};
use flux_limiter::{FluxLimiter, FluxLimiterDecision, SystemClock};
use std::sync::Arc;
pub struct RateLimited {
pub decision: FluxLimiterDecision,
}
#[rocket::async_trait]
impl<'r> FromRequest<'r> for RateLimited {
type Error = ();
async fn from_request(request: &'r Request<'_>) -> Outcome<Self, Self::Error> {
let limiter = request
.guard::<&State<Arc<FluxLimiter<String, SystemClock>>>>()
.await
.unwrap();
let client_ip = request
.client_ip()
.map(|ip| ip.to_string())
.unwrap_or_else(|| "unknown".to_string());
match limiter.check_request(client_ip) {
Ok(decision) if decision.allowed => {
Outcome::Success(RateLimited { decision })
}
Ok(_) => Outcome::Error((Status::TooManyRequests, ())),
Err(_) => {
// Fail-open
Outcome::Success(RateLimited {
decision: FluxLimiterDecision {
allowed: true,
retry_after_seconds: None,
remaining_capacity: None,
reset_time_nanos: 0,
},
})
}
}
}
}
// Usage in route
#[rocket::get("/api/data")]
fn data(_rate_limited: RateLimited) -> &'static str {
"Hello, world!"
}
}
Standard Rate Limit Headers
HTTP Response Headers
Implement standard rate limiting headers:
#![allow(unused)]
fn main() {
use axum::http::HeaderMap;
fn add_rate_limit_headers(
headers: &mut HeaderMap,
decision: &FluxLimiterDecision,
limit: f64,
) {
// X-RateLimit-Limit: Maximum requests allowed
headers.insert(
"X-RateLimit-Limit",
limit.to_string().parse().unwrap(),
);
// X-RateLimit-Remaining: Remaining requests in window
if let Some(remaining) = decision.remaining_capacity {
headers.insert(
"X-RateLimit-Remaining",
remaining.to_string().parse().unwrap(),
);
}
// X-RateLimit-Reset: When the limit resets (Unix timestamp)
let reset_seconds = decision.reset_time_nanos / 1_000_000_000;
headers.insert(
"X-RateLimit-Reset",
reset_seconds.to_string().parse().unwrap(),
);
// Retry-After: When to retry (for 429 responses)
if !decision.allowed {
if let Some(retry_after) = decision.retry_after_seconds {
headers.insert(
"Retry-After",
(retry_after.ceil() as u64).to_string().parse().unwrap(),
);
}
}
}
}
Multi-Tier Rate Limiting
Rate limit based on user tier:
#![allow(unused)]
fn main() {
use axum::{
extract::{Extension, Request, State},
http::StatusCode,
middleware::Next,
response::{IntoResponse, Response},
};
use std::collections::HashMap;
use std::sync::Arc;
struct TieredLimiters {
limiters: HashMap<String, Arc<FluxLimiter<String, SystemClock>>>,
}
impl TieredLimiters {
fn new() -> Result<Self, FluxLimiterError> {
let mut limiters = HashMap::new();
limiters.insert(
"free".to_string(),
Arc::new(FluxLimiter::with_config(
FluxLimiterConfig::new(10.0, 5.0),
SystemClock
)?),
);
limiters.insert(
"paid".to_string(),
Arc::new(FluxLimiter::with_config(
FluxLimiterConfig::new(100.0, 50.0),
SystemClock
)?),
);
Ok(Self { limiters })
}
fn get_limiter(&self, tier: &str) -> &Arc<FluxLimiter<String, SystemClock>> {
self.limiters.get(tier).unwrap_or_else(|| &self.limiters["free"])
}
}
async fn tiered_rate_limit(
State(limiters): State<Arc<TieredLimiters>>,
Extension(user_tier): Extension<String>,
request: Request,
next: Next,
) -> Response {
let client_ip = extract_client_ip(&request);
let limiter = limiters.get_limiter(&user_tier);
match limiter.check_request(client_ip) {
Ok(decision) if decision.allowed => next.run(request).await,
Ok(_) => {
(StatusCode::TOO_MANY_REQUESTS, "Rate limited").into_response()
}
Err(_) => next.run(request).await, // Fail-open
}
}
}
Background Cleanup Task
Automatically clean up stale clients:
use tokio::time::{interval, Duration};
use std::sync::Arc;
use flux_limiter::{FluxLimiter, FluxLimiterConfig, SystemClock};
async fn cleanup_task(limiter: Arc<FluxLimiter<String, SystemClock>>) {
let mut cleanup_interval = interval(Duration::from_secs(3600)); // 1 hour
loop {
cleanup_interval.tick().await;
let threshold = 24 * 60 * 60 * 1_000_000_000u64; // 24 hours
match limiter.cleanup_stale_clients(threshold) {
Ok(count) => {
println!("Rate limiter: cleaned up {} stale clients", count);
}
Err(e) => {
eprintln!("Rate limiter cleanup failed: {}", e);
}
}
}
}
// Start cleanup task when initializing the server
#[tokio::main]
async fn main() {
let config = FluxLimiterConfig::new(100.0, 50.0);
let limiter = Arc::new(FluxLimiter::with_config(config, SystemClock).unwrap());
// Start cleanup in background
tokio::spawn(cleanup_task(limiter.clone()));
// Build and run your web server...
}
Next Steps
- Production Considerations - Deploy with confidence
- Error Handling - Handle errors gracefully
- Advanced Usage - Optimize performance
Production Considerations
Best practices for deploying Flux Limiter in production environments.
Configuration
Choose Appropriate Limits
Consider your service’s capacity and SLAs:
#![allow(unused)]
fn main() {
// Calculate based on backend capacity
let backend_capacity = 10_000.0; // 10k requests/second
let safety_margin = 0.8; // 80% of capacity
let clients_count = 100.0; // Expected concurrent clients
let rate_per_client = (backend_capacity * safety_margin) / clients_count;
let config = FluxLimiterConfig::new(rate_per_client, rate_per_client * 0.5);
}
Environment-Based Configuration
#![allow(unused)]
fn main() {
use std::env;
fn create_rate_limiter() -> Result<FluxLimiter<String, SystemClock>, FluxLimiterError> {
let rate = env::var("RATE_LIMIT_RATE")
.unwrap_or_else(|_| "100.0".to_string())
.parse::<f64>()
.expect("Invalid RATE_LIMIT_RATE");
let burst = env::var("RATE_LIMIT_BURST")
.unwrap_or_else(|_| "50.0".to_string())
.parse::<f64>()
.expect("Invalid RATE_LIMIT_BURST");
let config = FluxLimiterConfig::new(rate, burst);
FluxLimiter::with_config(config, SystemClock)
}
}
Monitoring and Observability
Metrics Collection
Track rate limiting decisions for monitoring:
#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicU64, Ordering};
struct RateLimiterMetrics {
total_requests: AtomicU64,
allowed_requests: AtomicU64,
denied_requests: AtomicU64,
clock_errors: AtomicU64,
}
impl RateLimiterMetrics {
fn new() -> Self {
Self {
total_requests: AtomicU64::new(0),
allowed_requests: AtomicU64::new(0),
denied_requests: AtomicU64::new(0),
clock_errors: AtomicU64::new(0),
}
}
fn record_decision(&self, result: &Result<FluxLimiterDecision, FluxLimiterError>) {
self.total_requests.fetch_add(1, Ordering::Relaxed);
match result {
Ok(decision) if decision.allowed => {
self.allowed_requests.fetch_add(1, Ordering::Relaxed);
}
Ok(_) => {
self.denied_requests.fetch_add(1, Ordering::Relaxed);
}
Err(FluxLimiterError::ClockError(_)) => {
self.clock_errors.fetch_add(1, Ordering::Relaxed);
}
Err(_) => {}
}
}
fn get_stats(&self) -> (u64, u64, u64, u64) {
(
self.total_requests.load(Ordering::Relaxed),
self.allowed_requests.load(Ordering::Relaxed),
self.denied_requests.load(Ordering::Relaxed),
self.clock_errors.load(Ordering::Relaxed),
)
}
}
}
Prometheus Integration
#![allow(unused)]
fn main() {
use prometheus::{IntCounter, Registry};
use std::sync::Arc;
struct PrometheusRateLimiter {
limiter: Arc<FluxLimiter<String, SystemClock>>,
requests_total: IntCounter,
requests_allowed: IntCounter,
requests_denied: IntCounter,
clock_errors: IntCounter,
}
impl PrometheusRateLimiter {
fn new(
limiter: FluxLimiter<String, SystemClock>,
registry: &Registry,
) -> Result<Self, Box<dyn std::error::Error>> {
let requests_total = IntCounter::new(
"rate_limiter_requests_total",
"Total number of rate limit checks"
)?;
registry.register(Box::new(requests_total.clone()))?;
let requests_allowed = IntCounter::new(
"rate_limiter_requests_allowed",
"Number of allowed requests"
)?;
registry.register(Box::new(requests_allowed.clone()))?;
let requests_denied = IntCounter::new(
"rate_limiter_requests_denied",
"Number of denied requests"
)?;
registry.register(Box::new(requests_denied.clone()))?;
let clock_errors = IntCounter::new(
"rate_limiter_clock_errors",
"Number of clock errors"
)?;
registry.register(Box::new(clock_errors.clone()))?;
Ok(Self {
limiter: Arc::new(limiter),
requests_total,
requests_allowed,
requests_denied,
clock_errors,
})
}
fn check_request(&self, client_id: String) -> Result<FluxLimiterDecision, FluxLimiterError> {
self.requests_total.inc();
let result = self.limiter.check_request(client_id);
match &result {
Ok(decision) if decision.allowed => self.requests_allowed.inc(),
Ok(_) => self.requests_denied.inc(),
Err(FluxLimiterError::ClockError(_)) => self.clock_errors.inc(),
_ => {}
}
result
}
}
}
Logging
Implement structured logging:
#![allow(unused)]
fn main() {
use tracing::{info, warn, error};
fn check_request_with_logging(
limiter: &FluxLimiter<String, SystemClock>,
client_id: String,
) -> bool {
match limiter.check_request(client_id.clone()) {
Ok(decision) if decision.allowed => {
info!(
client_id = %client_id,
remaining = ?decision.remaining_capacity,
"Request allowed"
);
true
}
Ok(decision) => {
warn!(
client_id = %client_id,
retry_after = ?decision.retry_after_seconds,
"Request rate limited"
);
false
}
Err(FluxLimiterError::ClockError(e)) => {
error!(
client_id = %client_id,
error = ?e,
"Clock error in rate limiter - failing open"
);
true // Fail-open
}
Err(e) => {
error!(
client_id = %client_id,
error = ?e,
"Unexpected rate limiter error"
);
false
}
}
}
}
Memory Management
Automatic Cleanup
Implement periodic cleanup to prevent unbounded memory growth:
#![allow(unused)]
fn main() {
use tokio::time::{interval, Duration};
use std::sync::Arc;
use tracing::{error, info};
struct ManagedRateLimiter {
limiter: Arc<FluxLimiter<String, SystemClock>>,
}
impl ManagedRateLimiter {
fn new(config: FluxLimiterConfig) -> Result<Self, FluxLimiterError> {
let limiter = Arc::new(FluxLimiter::with_config(config, SystemClock)?);
// Start background cleanup task
let limiter_clone = Arc::clone(&limiter);
tokio::spawn(async move {
Self::cleanup_loop(limiter_clone).await;
});
Ok(Self { limiter })
}
async fn cleanup_loop(limiter: Arc<FluxLimiter<String, SystemClock>>) {
let mut interval = interval(Duration::from_secs(3600)); // 1 hour
loop {
interval.tick().await;
let threshold = 24 * 60 * 60 * 1_000_000_000u64; // 24 hours
match limiter.cleanup_stale_clients(threshold) {
Ok(count) => {
info!("Cleaned up {} stale rate limit entries", count);
}
Err(e) => {
error!("Rate limiter cleanup failed: {}", e);
}
}
}
}
fn check_request(&self, client_id: String) -> Result<FluxLimiterDecision, FluxLimiterError> {
self.limiter.check_request(client_id)
}
}
}
Memory Monitoring
Monitor memory usage and alert on growth:
#![allow(unused)]
fn main() {
use tracing::warn;
// Rough estimate: ~48 bytes per client (map overhead + String + u64 TAT).
// Substitute a real active-client count for your deployment.
fn estimate_memory_usage(_limiter: &FluxLimiter<String, SystemClock>) -> usize {
let estimated_clients = 1_000_000;
estimated_clients * 48
}
fn check_memory_threshold(limiter: &FluxLimiter<String, SystemClock>) {
let estimated_usage = estimate_memory_usage(limiter);
let threshold = 100 * 1024 * 1024; // 100 MB
if estimated_usage > threshold {
warn!(
"Rate limiter memory usage estimated at {} MB",
estimated_usage / (1024 * 1024)
);
}
}
}
Error Handling Strategy
Circuit Breaker for Clock Errors
Temporarily disable rate limiting after persistent failures:
#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::SystemTime;
use tracing::{error, info};
struct CircuitBreakerRateLimiter {
limiter: FluxLimiter<String, SystemClock>,
consecutive_failures: AtomicU64,
circuit_open: AtomicBool,
circuit_open_time: AtomicU64,
failure_threshold: u64,
reset_timeout_secs: u64,
}
impl CircuitBreakerRateLimiter {
fn new(
config: FluxLimiterConfig,
failure_threshold: u64,
reset_timeout_secs: u64,
) -> Result<Self, FluxLimiterError> {
Ok(Self {
limiter: FluxLimiter::with_config(config, SystemClock)?,
consecutive_failures: AtomicU64::new(0),
circuit_open: AtomicBool::new(false),
circuit_open_time: AtomicU64::new(0),
failure_threshold,
reset_timeout_secs,
})
}
fn check_request(&self, client_id: String) -> bool {
// Check if circuit should be reset
if self.circuit_open.load(Ordering::Relaxed) {
let open_time = self.circuit_open_time.load(Ordering::Relaxed);
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
if now - open_time > self.reset_timeout_secs {
info!("Attempting to close circuit breaker");
self.circuit_open.store(false, Ordering::Relaxed);
self.consecutive_failures.store(0, Ordering::Relaxed);
} else {
// Circuit still open - bypass rate limiting
return true;
}
}
match self.limiter.check_request(client_id) {
Ok(decision) => {
// Reset failure counter on success
self.consecutive_failures.store(0, Ordering::Relaxed);
decision.allowed
}
Err(FluxLimiterError::ClockError(_)) => {
let failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
if failures >= self.failure_threshold {
error!(
"Opening circuit breaker after {} consecutive clock failures",
failures
);
self.circuit_open.store(true, Ordering::Relaxed);
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
self.circuit_open_time.store(now, Ordering::Relaxed);
}
true // Fail-open
}
Err(e) => {
error!("Unexpected rate limiter error: {}", e);
false
}
}
}
}
}
Load Testing
Test your rate limiter configuration under load:
#![allow(unused)]
fn main() {
#[cfg(test)]
mod load_tests {
use super::*;
use std::sync::Arc;
use std::thread;
use std::time::Instant;
#[test]
fn test_concurrent_load() {
let config = FluxLimiterConfig::new(1000.0, 500.0);
let limiter = Arc::new(FluxLimiter::with_config(config, SystemClock).unwrap());
let start = Instant::now();
let threads: Vec<_> = (0..10)
.map(|i| {
let limiter = Arc::clone(&limiter);
thread::spawn(move || {
let mut allowed = 0;
let mut denied = 0;
for j in 0..10000 {
let client_id = format!("client_{}_{}", i, j % 100);
match limiter.check_request(client_id) {
Ok(decision) if decision.allowed => allowed += 1,
Ok(_) => denied += 1,
Err(_) => {}
}
}
(allowed, denied)
})
})
.collect();
let mut total_allowed = 0;
let mut total_denied = 0;
for handle in threads {
let (allowed, denied) = handle.join().unwrap();
total_allowed += allowed;
total_denied += denied;
}
let elapsed = start.elapsed();
let total_requests = total_allowed + total_denied;
println!("Processed {} requests in {:?}", total_requests, elapsed);
println!("Throughput: {:.2} req/s", total_requests as f64 / elapsed.as_secs_f64());
println!("Allowed: {}, Denied: {}", total_allowed, total_denied);
}
}
}
Deployment Checklist
- Configure appropriate rate and burst limits
- Implement error handling policy (fail-open/fail-closed)
- Set up monitoring and alerting
- Enable structured logging
- Configure automatic cleanup
- Load test under expected traffic
- Document rate limit headers in API docs
- Set up memory monitoring
- Test clock error handling
- Implement circuit breaker if needed
Next Steps
- Monitoring - Use decision metadata
- Performance - Understand performance characteristics
- Error Handling - Handle errors gracefully
Architecture Overview
This document provides a comprehensive overview of the Flux Limiter’s architecture, design decisions, and implementation details.
Design Philosophy
Flux Limiter is built on several key architectural principles:
- Lock-Free Concurrency: Uses atomic operations and lock-free data structures
- Zero-Allocation Hot Path: Minimizes memory allocation in rate limiting decisions
- Clock Abstraction: Enables testing and handles time-related failures
- Type Safety: Leverages Rust’s type system for correctness guarantees
- Graceful Degradation: Continues operation despite partial failures
Core Architecture
┌─────────────────────────────────────────────────────────────┐
│ Flux Limiter │
├─────────────────────────────────────────────────────────────┤
│ Client API │
│ ├─ check_request(client_id) -> Result<Decision, Error> │
│  ├─ cleanup_stale_clients(threshold) -> Result<usize, Error> │
│ └─ rate(), burst() -> f64 │
├─────────────────────────────────────────────────────────────┤
│ Core Components │
│ ├─ FluxLimiter<T, C> │ Main rate limiter struct │
│ ├─ FluxLimiterConfig │ Configuration management │
│ ├─ FluxLimiterDecision │ Rich decision metadata │
│ └─ FluxLimiterError │ Comprehensive error handling │
├─────────────────────────────────────────────────────────────┤
│ Algorithm Layer │
│ ├─ GCRA Implementation │ Generic Cell Rate Algorithm │
│ ├─ Nanosecond Precision │ u64 nanosecond calculations │
│ └─ TAT Tracking │ Theoretical Arrival Time │
├─────────────────────────────────────────────────────────────┤
│ Storage Layer │
│ ├─ DashMap<T, u64> │ Lock-free concurrent hash map │
│ ├─ Atomic Operations │ Thread-safe state updates │
│ └─ Memory Management │ Automatic cleanup mechanisms │
├─────────────────────────────────────────────────────────────┤
│ Time Abstraction │
│ ├─ Clock Trait │ Pluggable time source │
│ ├─ SystemClock │ Production time implementation │
│ └─ TestClock │ Deterministic test time │
└─────────────────────────────────────────────────────────────┘
Data Flow
Request → check_request(client_id)
↓
Clock::now() → Current Time (nanoseconds)
↓
DashMap::get(client_id) → Previous TAT
↓
GCRA Calculation
↓
Decision: Allow/Deny + Metadata
↓
DashMap::insert(client_id, new_TAT)
↓
Return FluxLimiterDecision
Key Design Principles
Performance First
- O(1) operations: Constant time complexity for rate limit checks
- Lock-free concurrency: No global locks or contention
- Nanosecond precision: Integer arithmetic avoids floating-point overhead
- Minimal allocations: Hot path reuses existing memory
Correctness Guaranteed
- Mathematical precision: Exact GCRA implementation
- Type safety: Rust’s type system prevents common errors
- Comprehensive testing: >95% code coverage with deterministic tests
- No silent failures: All errors are explicitly handled
Observability Built-in
- Rich metadata: Every decision includes detailed information
- Error transparency: Clear error types and recovery paths
- Memory tracking: Built-in cleanup and monitoring capabilities
- Production-ready: Designed for monitoring and debugging
Flexible and Extensible
- Generic client IDs: Works with any hashable type
- Pluggable time sources: Clock abstraction for testing
- Composable: Multiple limiters for complex scenarios
- Future-proof: Architecture supports future enhancements
System Requirements
Dependencies
- DashMap: Lock-free concurrent hash map
- Rust Standard Library: Minimal external dependencies
- No async runtime: Works with any async runtime (Tokio, async-std, etc.)
Performance Characteristics
- Memory: O(number of active clients)
- Time complexity: O(1) for check_request()
- Concurrency: Lock-free reads and writes
- Precision: Nanosecond timing accuracy
- Throughput: Millions of operations per second
Scalability
Clients    Memory Usage    Latency
1K         ~32KB           < 1μs
100K       ~3.2MB          < 1μs
1M         ~32MB           < 1μs
10M        ~320MB          < 1μs
These figures assume compact numeric client IDs (~32 bytes per entry); String IDs cost roughly ~48 bytes each (see the Memory Layout section).
Architecture Layers
1. API Layer
Public interface for rate limiting operations:
- check_request(): Primary rate limiting decision
- cleanup_stale_clients(): Memory management
- rate(), burst(): Configuration inspection
2. Algorithm Layer
GCRA implementation with nanosecond precision:
- Theoretical Arrival Time (TAT) calculation
- Conformance checking
- Metadata computation
3. Storage Layer
Thread-safe state management:
- DashMap for concurrent access
- Atomic operations for consistency
- Efficient memory layout
4. Time Abstraction Layer
Clock trait for time source flexibility:
- SystemClock for production
- TestClock for deterministic testing
- Custom clocks for special scenarios
Next Steps
- GCRA Algorithm - Understand the core algorithm
- Component Design - Explore individual components
- Concurrency Model - Learn about thread safety
- Performance Design - Understand performance characteristics
GCRA Algorithm
Deep dive into the Generic Cell Rate Algorithm implementation in Flux Limiter.
Algorithm Choice
Generic Cell Rate Algorithm (GCRA) was chosen over Token Bucket for several reasons:
- Mathematical Precision: Avoids floating-point precision issues
- Stateless Calculation: No background token refill processes
- Efficient State: One timestamp per client vs. token count + last refill
- Deterministic: Exact timing calculations with integer arithmetic
GCRA vs Token Bucket
Token Bucket
- Maintains a count of available tokens
- Tokens refill at a constant rate
- Requests consume tokens
- Requires background refill process or calculation on each check
GCRA
- Maintains Theoretical Arrival Time (TAT)
- Checks if current time conforms to rate
- Updates TAT for next request
- No background processes needed
Equivalence: GCRA and Token Bucket produce mathematically equivalent results, but GCRA is simpler and cheaper to implement.
Core Concepts
Theoretical Arrival Time (TAT)
The TAT represents the theoretical time when the next request should arrive to maintain the configured rate.
Initial state: TAT = 0
After request 1: TAT = current_time + rate_interval
After request 2: TAT = max(current_time, previous_TAT) + rate_interval
Rate Interval (T)
Time between consecutive requests at the configured rate:
T = 1 / rate_per_second
T_nanos = 1_000_000_000 / rate_per_second
Examples:
- rate = 10.0 → T = 0.1s = 100_000_000 nanos
- rate = 100.0 → T = 0.01s = 10_000_000 nanos
- rate = 1000.0 → T = 0.001s = 1_000_000 nanos
Tolerance (τ)
Maximum allowed deviation from the rate, determined by burst capacity:
τ = burst_capacity * rate_interval
τ_nanos = burst_capacity * (1_000_000_000 / rate_per_second)
Examples with rate = 10.0:
- burst = 0.0 → τ = 0 nanos (no burst)
- burst = 5.0 → τ = 500_000_000 nanos (0.5 seconds)
- burst = 10.0 → τ = 1_000_000_000 nanos (1 second)
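To make the conversion concrete, here is a minimal sketch of the rate/burst-to-nanosecond math above; the names are illustrative, not the crate's internal identifiers.
const NANOS_PER_SEC: f64 = 1_000_000_000.0;
// Convert user-facing rate/burst into the nanosecond quantities GCRA works with
fn to_nanos(rate_per_second: f64, burst_capacity: f64) -> (u64, u64) {
    let rate_nanos = (NANOS_PER_SEC / rate_per_second) as u64;         // T
    let tolerance_nanos = (burst_capacity * rate_nanos as f64) as u64; // τ
    (rate_nanos, tolerance_nanos)
}
fn main() {
    // rate = 10 req/s, burst = 5 → T = 100ms, τ = 500ms (in nanoseconds)
    assert_eq!(to_nanos(10.0, 5.0), (100_000_000, 500_000_000));
}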
Algorithm Implementation
Conformance Check
#![allow(unused)]
fn main() {
let current_time_nanos = clock.now()?;
let previous_tat_nanos = client_state
    .get(&client_id)
    .map(|entry| *entry.value())
    .unwrap_or(current_time_nanos);
// Check if request conforms (is within tolerance)
let is_conforming = current_time_nanos >=
previous_tat_nanos.saturating_sub(tolerance_nanos);
}
Conforming Request: current_time >= TAT - τ
This allows requests to arrive up to τ nanoseconds early (burst capacity).
TAT Update (Allowed Request)
#![allow(unused)]
fn main() {
if is_conforming {
// Allow request and update TAT
let new_tat_nanos = current_time_nanos
.max(previous_tat_nanos) + rate_nanos;
client_state.insert(client_id, new_tat_nanos);
// Return allowed decision
}
}
TAT Update Rule: TAT' = max(current_time, previous_TAT) + T
This ensures TAT advances by the rate interval while preventing it from going backward.
Retry After Calculation (Denied Request)
#![allow(unused)]
fn main() {
else {
// Deny request
let retry_after_nanos = previous_tat_nanos
.saturating_sub(tolerance_nanos)
.saturating_sub(current_time_nanos);
let retry_after_seconds = retry_after_nanos as f64 / 1_000_000_000.0;
// Return denied decision with retry_after
}
}
Retry After: Time until the request would conform to the rate.
Mathematical Foundation
Basic Equations
- Rate Interval: T = 1 / r, where r is the rate per second
- Tolerance: τ = b * T, where b is the burst capacity
- TAT Update: TAT' = max(t, TAT) + T, where t is the current time
- Conformance: request allowed if t >= TAT - τ
Burst Capacity
The burst capacity determines how many requests can be made immediately:
Total immediate capacity ≈ 1 + burst_capacity
Proof:
- First request at t=0: TAT becomes T
- Requests can then arrive at times 0, T-τ, 2T-τ, 3T-τ, ...
- Maximum accumulated burst: τ/T = burst_capacity
- Plus the current request: 1 + burst_capacity
Burst Recovery
After using burst capacity, it recovers at the configured rate:
Recovery rate = rate_per_second
Time to full recovery = burst_capacity / rate_per_second
Example with rate=10.0, burst=5.0:
- Can burst 6 requests immediately
- Recovers at 10 units/second
- Full recovery in 0.5 seconds
Precision Guarantees
Nanosecond Arithmetic
All calculations use u64 nanoseconds:
#![allow(unused)]
fn main() {
// No floating-point drift
let rate_nanos: u64 = (1_000_000_000.0 / rate_per_second) as u64;
let tolerance_nanos: u64 = (burst_capacity * rate_nanos as f64) as u64;
}
Benefits:
- Exact integer arithmetic
- No accumulating rounding errors
- Supports rates up to 1 billion requests/second
- Nanosecond precision for timing
Overflow Protection
Uses saturating arithmetic to prevent overflow:
#![allow(unused)]
fn main() {
let is_conforming = current_time_nanos >=
previous_tat_nanos.saturating_sub(tolerance_nanos);
}
Saturating operations:
- saturating_sub: returns 0 if the subtraction would underflow
- saturating_add: returns u64::MAX if the addition would overflow
- Prevents undefined behavior and panics
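Both methods come from the standard library; a quick demonstration:
fn main() {
    // saturating_sub clamps at 0 instead of wrapping or panicking
    let tat: u64 = 5;
    let tolerance: u64 = 10;
    assert_eq!(tat.saturating_sub(tolerance), 0);
    // saturating_add clamps at u64::MAX
    assert_eq!(u64::MAX.saturating_add(1), u64::MAX);
}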
Example Scenarios
Scenario 1: Sustained Rate
Configuration: rate=10.0, burst=0.0
t=0.0s: Request → Allowed (TAT=0.1s)
t=0.1s: Request → Allowed (TAT=0.2s)
t=0.2s: Request → Allowed (TAT=0.3s)
t=0.25s: Request → Denied (retry after 0.05s)
t=0.3s: Request → Allowed (TAT=0.4s)
Scenario 2: Burst Capacity
Configuration: rate=10.0, burst=5.0
t=0.0s: Request → Allowed (TAT=0.1s)
t=0.0s: Request → Allowed (TAT=0.2s) [burst]
t=0.0s: Request → Allowed (TAT=0.3s) [burst]
t=0.0s: Request → Allowed (TAT=0.4s) [burst]
t=0.0s: Request → Allowed (TAT=0.5s) [burst]
t=0.0s: Request → Allowed (TAT=0.6s) [burst]
t=0.0s: Request → Denied (retry after 0.1s)
t=0.1s: Request → Allowed (TAT=0.7s)
Scenario 3: Recovery After Idle
Configuration: rate=10.0, burst=5.0
t=0.0s: 6 requests → All allowed, TAT=0.6s
t=1.0s: Request → Allowed (TAT=1.1s) [burst recovered]
t=1.0s: 6 requests → All allowed, TAT=1.6s
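Scenario 2 can be replayed as a deterministic test using the TestClock time source (covered under Component Design and Testing Architecture); a sketch:
#[test]
fn scenario_burst_capacity() {
    let clock = TestClock::new(0.0);
    let config = FluxLimiterConfig::new(10.0, 5.0);
    let limiter = FluxLimiter::with_config(config, clock.clone()).unwrap();
    // 1 + burst = 6 requests allowed at t=0
    for _ in 0..6 {
        assert!(limiter.check_request("client1").unwrap().allowed);
    }
    // 7th request denied; conforms again after one rate interval
    assert!(!limiter.check_request("client1").unwrap().allowed);
    clock.advance(0.1);
    assert!(limiter.check_request("client1").unwrap().allowed);
}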
Metadata Calculation
Remaining Capacity
#![allow(unused)]
fn main() {
let elapsed = current_time_nanos.saturating_sub(previous_tat_nanos);
let recovered = (elapsed as f64 / rate_nanos as f64).min(burst_capacity);
// `consumed` (elided here) is the burst capacity this client has already used
let remaining_capacity = Some(burst_capacity - consumed + recovered);
}
Tracks how much burst capacity is currently available.
Reset Time
#![allow(unused)]
fn main() {
let reset_time_nanos = new_tat_nanos + tolerance_nanos;
}
When the rate limit window will fully reset (all burst capacity recovered).
Performance Characteristics
- Time Complexity: O(1) - constant time for all operations
- Space Complexity: O(1) per client - single u64 timestamp
- Calculation Overhead: ~10-20 CPU cycles for GCRA calculation
- Memory Access: Single DashMap lookup/insert
Next Steps
- Component Design - Explore the FluxLimiter struct
- Performance Design - Understand optimization techniques
- Testing Architecture - Learn about deterministic testing
Component Design
Detailed exploration of Flux Limiter’s core components and their design rationale.
FluxLimiter<T, C>
The main rate limiter struct uses generics for flexibility.
Structure Definition
#![allow(unused)]
fn main() {
pub struct FluxLimiter<T, C = SystemClock>
where
T: Hash + Eq + Clone, // Client identifier type
C: Clock, // Time source
{
rate_nanos: u64, // Rate interval in nanoseconds
tolerance_nanos: u64, // Burst tolerance in nanoseconds
client_state: Arc<DashMap<T, u64>>, // Client TAT storage
clock: C, // Time abstraction
}
}
Design Rationale
Generic Client ID (T):
- Supports String, IpAddr, u64, and custom types
- Constrained by Hash + Eq + Clone
- Zero-cost abstraction with no runtime overhead
Generic Clock (C):
- Default: SystemClock for production
- Alternative: TestClock for testing
- Custom: user-defined time sources
- Constrained by the Clock trait
Arc<DashMap<T, u64>>:
- Thread-safe shared ownership
- Lock-free concurrent access
- Minimal contention
- Efficient cloning
Nanosecond Storage:
- u64 for rate and tolerance
- Maintains precision throughout calculations
- Avoids floating-point arithmetic in the hot path
Public API
#![allow(unused)]
fn main() {
impl<T, C> FluxLimiter<T, C>
where
T: Hash + Eq + Clone,
C: Clock,
{
pub fn with_config(
config: FluxLimiterConfig,
clock: C,
) -> Result<Self, FluxLimiterError>
pub fn check_request(
&self,
client_id: T,
) -> Result<FluxLimiterDecision, FluxLimiterError>
pub fn cleanup_stale_clients(
&self,
stale_threshold_nanos: u64,
) -> Result<usize, FluxLimiterError>
pub fn rate(&self) -> f64
pub fn burst(&self) -> f64
}
}
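A short sketch that exercises each method of the public API:
fn main() -> Result<(), FluxLimiterError> {
    let config = FluxLimiterConfig::new(100.0, 50.0);
    let limiter = FluxLimiter::with_config(config, SystemClock)?;
    // Primary decision path
    let decision = limiter.check_request("user_42".to_string())?;
    println!("allowed: {}", decision.allowed);
    // Configuration inspection
    println!("rate: {}, burst: {}", limiter.rate(), limiter.burst());
    // Memory management: drop clients idle for more than one hour
    let removed = limiter.cleanup_stale_clients(3_600u64 * 1_000_000_000)?;
    println!("removed {} stale clients", removed);
    Ok(())
}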
FluxLimiterConfig
Configuration management with builder pattern support.
Structure Definition
#![allow(unused)]
fn main() {
#[derive(Debug, Clone)]
pub struct FluxLimiterConfig {
rate_per_second: f64, // User-friendly rate specification
burst_capacity: f64, // User-friendly burst specification
}
}
Design Rationale
User-Friendly Units:
- rate_per_second: intuitive "requests per second"
- burst_capacity: additional burst allowance
- Both converted to nanoseconds internally
Builder Pattern:
#![allow(unused)]
fn main() {
impl FluxLimiterConfig {
pub fn new(rate_per_second: f64, burst_capacity: f64) -> Self
pub fn rate(mut self, rate_per_second: f64) -> Self
pub fn burst(mut self, burst_capacity: f64) -> Self
pub fn validate(&self) -> Result<(), FluxLimiterError>
}
}
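A hypothetical construction using the builder methods above:
fn build_config() -> Result<FluxLimiterConfig, FluxLimiterError> {
    // Start from defaults, then override individual fields
    let config = FluxLimiterConfig::new(100.0, 50.0)
        .rate(200.0)   // override rate_per_second
        .burst(100.0); // override burst_capacity
    config.validate()?;
    Ok(config)
}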
Validation
#![allow(unused)]
fn main() {
pub fn validate(&self) -> Result<(), FluxLimiterError> {
if self.rate_per_second <= 0.0 {
return Err(FluxLimiterError::InvalidRate);
}
if self.burst_capacity < 0.0 {
return Err(FluxLimiterError::InvalidBurst);
}
Ok(())
}
}
Validation Rules:
- Rate must be positive (> 0.0)
- Burst must be non-negative (≥ 0.0)
- Checked at construction time
FluxLimiterDecision
Rich metadata returned from rate limiting decisions.
Structure Definition
#![allow(unused)]
fn main() {
#[derive(Debug, Clone, PartialEq)]
pub struct FluxLimiterDecision {
pub allowed: bool, // Primary decision
pub retry_after_seconds: Option<f64>, // When to retry (if denied)
pub remaining_capacity: Option<f64>, // Remaining burst capacity
pub reset_time_nanos: u64, // When window resets
}
}
Design Rationale
Rich Metadata Enables:
- HTTP rate limit headers (X-RateLimit-Remaining, Retry-After)
- Client-side backoff strategies
- Monitoring and observability
- Debugging and diagnostics
Field Details:
- allowed: bool
  - Primary decision: true = allow the request, false = deny
- retry_after_seconds: Option<f64>
  - Some(seconds) when denied
  - How long to wait before retrying
  - Used for the HTTP Retry-After header
- remaining_capacity: Option<f64>
  - Current burst capacity remaining
  - Useful for the X-RateLimit-Remaining header
  - Helps clients understand their quota
- reset_time_nanos: u64
  - Nanosecond timestamp when the limit resets
  - Convert for the HTTP X-RateLimit-Reset header
  - Absolute time, not relative
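As one illustration of mapping this metadata onto HTTP headers, here is a sketch; the helper function is hypothetical, but the header names follow the conventions above.
fn rate_limit_headers(decision: &FluxLimiterDecision) -> Vec<(String, String)> {
    let mut headers = Vec::new();
    if let Some(remaining) = decision.remaining_capacity {
        headers.push(("X-RateLimit-Remaining".to_string(), format!("{:.0}", remaining.floor())));
    }
    if let Some(retry_after) = decision.retry_after_seconds {
        // Retry-After takes whole seconds; round up so clients never retry early
        headers.push(("Retry-After".to_string(), (retry_after.ceil() as u64).to_string()));
    }
    // reset_time_nanos is absolute; expose it as UNIX seconds
    headers.push((
        "X-RateLimit-Reset".to_string(),
        (decision.reset_time_nanos / 1_000_000_000).to_string(),
    ));
    headers
}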
FluxLimiterError
Comprehensive error handling for robust production usage.
Enum Definition
#![allow(unused)]
fn main() {
#[derive(Debug, Clone, PartialEq)]
pub enum FluxLimiterError {
InvalidRate, // Configuration: rate ≤ 0
InvalidBurst, // Configuration: burst < 0
ClockError(ClockError), // Runtime: clock failure
}
#[derive(Debug, Clone, PartialEq)]
pub enum ClockError {
SystemTimeError, // System time unavailable
}
}
Design Rationale
Explicit Error Types:
- Configuration errors (InvalidRate, InvalidBurst)
- Runtime errors (ClockError)
- Clear separation of error categories
Error Display:
#![allow(unused)]
fn main() {
impl std::fmt::Display for FluxLimiterError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::InvalidRate => write!(f, "Invalid rate: must be positive"),
Self::InvalidBurst => write!(f, "Invalid burst: must be non-negative"),
Self::ClockError(e) => write!(f, "Clock error: {}", e),
}
}
}
}
Error Conversion:
#![allow(unused)]
fn main() {
impl From<ClockError> for FluxLimiterError {
fn from(error: ClockError) -> Self {
FluxLimiterError::ClockError(error)
}
}
}
Clock Trait
Abstraction for pluggable time sources.
Trait Definition
#![allow(unused)]
fn main() {
pub trait Clock: Send + Sync {
fn now(&self) -> Result<u64, ClockError>;
}
}
Design Rationale
Benefits:
- Enables deterministic testing
- Handles real-world clock issues
- Allows custom time sources
- Graceful error handling
Send + Sync Requirements:
- Thread-safe: Can be shared across threads
- Required for concurrent rate limiting
SystemClock
Production time source using system time:
#![allow(unused)]
fn main() {
#[derive(Clone, Copy, Debug)]
pub struct SystemClock;
impl Clock for SystemClock {
fn now(&self) -> Result<u64, ClockError> {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.map_err(|_| ClockError::SystemTimeError)
}
}
}
Handles Real-World Issues:
- System clock going backwards (NTP adjustments)
- Clock resolution limitations
- System suspend/resume
- Virtualization time skips
TestClock
Deterministic time source for testing:
#![allow(unused)]
fn main() {
pub struct TestClock {
time: Arc<AtomicU64>, // Current time in nanoseconds
should_fail: Arc<AtomicBool>, // Failure simulation flag
}
impl TestClock {
pub fn new(initial_time_secs: f64) -> Self
pub fn advance(&self, duration_secs: f64)
pub fn set_time(&self, time_secs: f64)
pub fn fail_next_call(&self)
}
}
Features:
- Precise time control
- Failure simulation
- Thread-safe
- Deterministic testing
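Beyond SystemClock and TestClock, the trait leaves room for user-defined sources. A sketch of a monotonic clock built on std::time::Instant (illustrative, not part of the crate):
use std::time::Instant;
pub struct MonotonicClock {
    origin: Instant,
}
impl MonotonicClock {
    pub fn new() -> Self {
        Self { origin: Instant::now() }
    }
}
impl Clock for MonotonicClock {
    fn now(&self) -> Result<u64, ClockError> {
        // Instant is monotonic, so this can never go backward or fail
        Ok(self.origin.elapsed().as_nanos() as u64)
    }
}
Its timestamps are relative to construction rather than the UNIX epoch, which is fine for GCRA: only differences between timestamps from the same clock matter.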
Memory Layout
FluxLimiter Size
#![allow(unused)]
fn main() {
FluxLimiter<String, SystemClock> {
rate_nanos: u64, // 8 bytes
tolerance_nanos: u64, // 8 bytes
client_state: Arc<..>, // 8 bytes (pointer)
clock: SystemClock, // 0 bytes (zero-sized type)
}
// Total: 24 bytes
}
Cache Efficiency:
- Small struct size fits in cache line
- Frequently accessed fields grouped
- Arc enables cheap cloning
Per-Client State
DashMap<String, u64> entry:
  String:   ~24 bytes (pointer + len + capacity)
  u64:       8 bytes
  Overhead: ~16 bytes (hash map metadata)
Total per client: ~48 bytes
Scalability
1,000 clients = ~48 KB
10,000 clients = ~480 KB
100,000 clients = ~4.8 MB
1,000,000 clients = ~48 MB
Thread Safety
All components are thread-safe:
- FluxLimiter: safe to share via Arc
- DashMap: lock-free concurrent access
- Clock: Send + Sync requirement
- Decisions: immutable after creation
Next Steps
- Concurrency Model - Understand thread safety
- Performance Design - Optimization techniques
- Error Handling Architecture - Error strategies
Concurrency Model
Understanding the lock-free concurrency design in Flux Limiter.
Lock-Free Design Philosophy
Flux Limiter is built on a lock-free concurrency model for maximum performance:
Multiple Threads
↓
FluxLimiter (Shared via Arc)
↓
Arc<DashMap<ClientId, TAT>>
↓
Lock-free hash map operations
Thread Safety Guarantees
1. Read Operations
Multiple concurrent readers without contention:
#![allow(unused)]
fn main() {
// Thread 1
limiter.check_request("client_1")?;
// Thread 2 (concurrent)
limiter.check_request("client_2")?;
// No lock contention - different shards
}
2. Write Operations
Atomic updates with minimal contention:
#![allow(unused)]
fn main() {
// Atomic TAT update
client_state.insert(client_id, new_tat);
}
Each client’s state is independent, allowing parallel updates.
3. Memory Ordering
Relaxed ordering is sufficient for TAT updates:
- TAT values are monotonically increasing
- No cross-client dependencies
- No ABA problem
4. Concurrent Access Patterns
Same Client, Different Threads:
#![allow(unused)]
fn main() {
// Thread 1
limiter.check_request("client_1")?; // TAT = 100ms
// Thread 2 (concurrent)
limiter.check_request("client_1")?; // TAT = 200ms
// DashMap serializes access to same key
// Ordering determined by DashMap's internal locking
}
Different Clients, Different Threads:
#![allow(unused)]
fn main() {
// Thread 1
limiter.check_request("client_1")?;
// Thread 2 (concurrent)
limiter.check_request("client_2")?;
// Lock-free - different shards, no contention
}
DashMap: The Core Concurrency Primitive
Why DashMap?
DashMap provides lock-free concurrent access through sharding:
DashMap<ClientId, TAT>
├─ Shard 0: HashMap + RwLock
├─ Shard 1: HashMap + RwLock
├─ Shard 2: HashMap + RwLock
├─ ...
└─ Shard N: HashMap + RwLock
Benefits:
- Better than std::HashMap + Mutex: Avoids global locking
- Better than RwLock: No reader/writer contention
- Better than custom lock-free map: Mature, battle-tested
- Segmented locking: Reduces contention vs. single lock
Sharding Strategy
Client IDs are hashed to shards:
shard_index = hash(client_id) % num_shards
Different clients are likely to land in different shards, while the same client always maps to the same shard, which keeps per-client updates consistent.
Operations on DashMap
Get (Read):
#![allow(unused)]
fn main() {
if let Some(previous_tat) = client_state.get(&client_id) {
// Read lock on single shard
// Other shards remain accessible
}
}
Insert (Write):
#![allow(unused)]
fn main() {
client_state.insert(client_id, new_tat);
// Write lock on single shard
// Other shards remain accessible
}
Cleanup (Iteration):
#![allow(unused)]
fn main() {
client_state.retain(|_, &mut tat| {
// Locks each shard sequentially
// Short lock duration per shard
current_time - tat < threshold
});
}
Memory Consistency
Happens-Before Relationships
DashMap provides happens-before guarantees:
#![allow(unused)]
fn main() {
// Thread 1
limiter.check_request("client_1")?; // Writes TAT = 100ms
// Thread 2 (later)
limiter.check_request("client_1")?; // Reads TAT >= 100ms
}
The write in Thread 1 happens-before the read in Thread 2.
Visibility Guarantees
All updates to client state are immediately visible:
#![allow(unused)]
fn main() {
// Thread 1
client_state.insert("client_1", 100);
// Thread 2
let tat = client_state.get("client_1"); // Sees 100
}
DashMap ensures proper memory synchronization.
Sharing Across Threads
Arc for Shared Ownership
#![allow(unused)]
fn main() {
use std::sync::Arc;
let config = FluxLimiterConfig::new(100.0, 50.0);
let limiter = Arc::new(FluxLimiter::with_config(config, SystemClock)?);
// Clone Arc for each thread (cheap)
let limiter1 = Arc::clone(&limiter);
let limiter2 = Arc::clone(&limiter);
// Use in different threads
thread::spawn(move || limiter1.check_request("client_1"));
thread::spawn(move || limiter2.check_request("client_2"));
}
Arc Benefits:
- Reference counting with atomic operations
- Thread-safe shared ownership
- Cheap to clone (increments counter)
- No data duplication
Internal Arc Usage
#![allow(unused)]
fn main() {
pub struct FluxLimiter<T, C> {
// ...
client_state: Arc<DashMap<T, u64>>, // Shared across clones
// ...
}
}
Cloning FluxLimiter shares the same state:
#![allow(unused)]
fn main() {
let limiter1 = limiter.clone();
let limiter2 = limiter.clone();
// Both share the same client_state
// Updates visible to both
}
Contention Analysis
Low Contention Scenarios
Different clients, concurrent access:
Threads: [T1: client_1] [T2: client_2] [T3: client_3]
Shards: [Shard 0 ] [Shard 1 ] [Shard 2 ]
Result: Lock-free, no contention
Medium Contention Scenarios
Same shard, different clients:
Threads: [T1: client_1] [T2: client_100]
Shards: [Shard 0 ] [Shard 0 ]
Result: Some contention, but short lock duration
High Contention Scenarios
Same client, concurrent access:
Threads: [T1: client_1] [T2: client_1] [T3: client_1]
Shards: [Shard 0 ] [Shard 0 ] [Shard 0 ]
Result: Serialized access (expected for consistency)
This is correct behavior: Rate limiting the same client from multiple threads should be serialized to maintain correctness.
Performance Under Concurrency
Scalability Characteristics
Threads Throughput Latency
1 1.0x 1.0μs
2 1.9x 1.0μs
4 3.7x 1.1μs
8 7.0x 1.2μs
16 12.5x 1.3μs
Nearly linear scalability with different clients.
Benchmark: Concurrent Access
#![allow(unused)]
fn main() {
#[bench]
fn bench_concurrent_different_clients(b: &mut Bencher) {
let limiter = Arc::new(FluxLimiter::with_config(
FluxLimiterConfig::new(1000.0, 500.0),
SystemClock
).unwrap());
b.iter(|| {
let handles: Vec<_> = (0..8)
.map(|i| {
let limiter = Arc::clone(&limiter);
thread::spawn(move || {
for j in 0..1000 {
let client_id = format!("client_{}_{}", i, j);
limiter.check_request(client_id).unwrap();
}
})
})
.collect();
for handle in handles {
handle.join().unwrap();
}
});
}
}
Clock Thread Safety
The Clock trait requires Send + Sync:
#![allow(unused)]
fn main() {
pub trait Clock: Send + Sync {
fn now(&self) -> Result<u64, ClockError>;
}
}
SystemClock:
#![allow(unused)]
fn main() {
impl Clock for SystemClock {
fn now(&self) -> Result<u64, ClockError> {
// SystemTime::now() is thread-safe
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.map_err(|_| ClockError::SystemTimeError)
}
}
}
TestClock:
#![allow(unused)]
fn main() {
pub struct TestClock {
time: Arc<AtomicU64>, // Atomic for thread safety
should_fail: Arc<AtomicBool>, // Atomic for thread safety
}
}
Both implementations are thread-safe.
Cleanup Concurrency
Cleanup can run concurrently with rate limiting:
#![allow(unused)]
fn main() {
// Thread 1: Rate limiting
limiter.check_request("client_1")?;
// Thread 2: Cleanup (concurrent)
limiter.cleanup_stale_clients(threshold)?;
// Thread 3: More rate limiting (concurrent)
limiter.check_request("client_2")?;
}
DashMap’s retain method:
- Locks each shard briefly
- Other shards remain accessible
- Minimal impact on concurrent operations
Best Practices
1. Share via Arc
#![allow(unused)]
fn main() {
// Good: Share limiter across threads
let limiter = Arc::new(FluxLimiter::with_config(config, clock)?);
// Bad: Create multiple limiters (separate state)
let limiter1 = FluxLimiter::with_config(config, clock1)?;
let limiter2 = FluxLimiter::with_config(config, clock2)?;
}
2. Use Different Client IDs
#![allow(unused)]
fn main() {
// Good: Different clients = low contention
for i in 0..1000 {
limiter.check_request(format!("client_{}", i))?;
}
// Bad: Same client = serialized access
for _ in 0..1000 {
limiter.check_request("same_client")?;
}
}
3. Avoid Blocking in Callbacks
#![allow(unused)]
fn main() {
// Good: Quick check
match limiter.check_request(client_id) {
Ok(decision) => decision.allowed,
Err(_) => false,
}
// Bad: Blocking I/O while holding client state
match limiter.check_request(client_id) {
Ok(decision) => {
// Don't do expensive work here
database.log_decision(decision)?; // Blocks!
decision.allowed
}
Err(_) => false,
}
}
4. Periodic Cleanup
#![allow(unused)]
fn main() {
use tokio::time::{interval, Duration};
// Background cleanup task (assumes `limiter` is an Arc-wrapped FluxLimiter clone
// and `threshold` is the stale cutoff in nanoseconds)
tokio::spawn(async move {
    let mut interval = interval(Duration::from_secs(3600));
loop {
interval.tick().await;
let _ = limiter.cleanup_stale_clients(threshold);
}
});
}
Next Steps
- Performance Design - Optimization techniques
- Testing Architecture - Concurrent testing strategies
- Design Decisions - Why these choices?
Error Handling Architecture
Comprehensive error handling design for production reliability.
Error Type Hierarchy
FluxLimiterError
├── InvalidRate (Configuration)
├── InvalidBurst (Configuration)
└── ClockError (Runtime)
└── SystemTimeError
Design Principles
1. Explicit Error Types
No silent failures - all errors are explicitly typed:
#![allow(unused)]
fn main() {
pub enum FluxLimiterError {
InvalidRate, // Rate ≤ 0
InvalidBurst, // Burst < 0
ClockError(ClockError), // System time failure
}
}
2. Configuration vs Runtime Errors
Configuration Errors (at startup):
- InvalidRate: caught during limiter creation
- InvalidBurst: caught during limiter creation
- Should never occur in production after startup
Runtime Errors (during operation):
- ClockError: can occur at any time
- Requires graceful handling
- Application chooses policy
3. Graceful Degradation
Applications can choose error handling policy:
- Fail-open: Allow requests on errors
- Fail-closed: Deny requests on errors
- Fallback: Use alternative rate limiting
Configuration Error Handling
Early Validation
Validate configuration at startup:
fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = FluxLimiterConfig::new(100.0, 50.0);
let limiter = FluxLimiter::with_config(config, SystemClock)
.map_err(|e| match e {
FluxLimiterError::InvalidRate => {
"Invalid rate: must be positive".to_string()
}
FluxLimiterError::InvalidBurst => {
"Invalid burst: must be non-negative".to_string()
}
_ => format!("Configuration error: {}", e),
})?;
// Use limiter...
Ok(())
}
Explicit Validation
#![allow(unused)]
fn main() {
impl FluxLimiterConfig {
pub fn validate(&self) -> Result<(), FluxLimiterError> {
if self.rate_per_second <= 0.0 {
return Err(FluxLimiterError::InvalidRate);
}
if self.burst_capacity < 0.0 {
return Err(FluxLimiterError::InvalidBurst);
}
Ok(())
}
}
}
When to use:
- Before storing configuration
- When loading from external sources
- Before creating rate limiter
Runtime Clock Errors
Clock Error Sources
Clock errors occur when:
- System time unavailable
  - System clock not accessible
  - Permissions issues
  - Platform limitations
- Time goes backward
  - NTP adjustments
  - Manual clock changes
  - Virtualization issues
- Time discontinuities
  - System suspend/resume
  - Hibernation
  - Container migrations
Error Propagation
Clock::now() → Result<u64, ClockError>
↓
FluxLimiter::check_request() → Result<Decision, FluxLimiterError>
↓
Application Layer → Implements error policy
Handling Clock Errors
Fail-Open Policy
Allow requests when clock fails:
#![allow(unused)]
fn main() {
match limiter.check_request(client_id) {
Ok(decision) => decision.allowed,
Err(FluxLimiterError::ClockError(_)) => {
eprintln!("Clock error - allowing request (fail-open)");
true
}
Err(e) => {
eprintln!("Unexpected error: {}", e);
true
}
}
}
Use when:
- Availability > strict rate limiting
- False positives acceptable
- Backend can handle spikes
Fail-Closed Policy
Deny requests when clock fails:
#![allow(unused)]
fn main() {
match limiter.check_request(client_id) {
Ok(decision) => decision.allowed,
Err(FluxLimiterError::ClockError(_)) => {
eprintln!("Clock error - denying request (fail-closed)");
false
}
Err(e) => {
eprintln!("Unexpected error: {}", e);
false
}
}
}
Use when:
- Security paramount
- False negatives acceptable
- Protecting backend critical
Fallback Policy
Use alternative when clock fails:
#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicU64, Ordering};
static FALLBACK_COUNTER: AtomicU64 = AtomicU64::new(0);
const FALLBACK_LIMIT: u64 = 1000;
match limiter.check_request(client_id) {
Ok(decision) => decision.allowed,
Err(FluxLimiterError::ClockError(_)) => {
let count = FALLBACK_COUNTER.fetch_add(1, Ordering::Relaxed);
count < FALLBACK_LIMIT
}
Err(_) => false,
}
}
Use when:
- Need graceful degradation
- Have fallback mechanism
- Want best effort
Error Monitoring
Tracking Error Rates
#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicU64, Ordering};
struct ErrorMetrics {
total_requests: AtomicU64,
clock_errors: AtomicU64,
config_errors: AtomicU64,
}
impl ErrorMetrics {
fn record_result(&self, result: &Result<FluxLimiterDecision, FluxLimiterError>) {
self.total_requests.fetch_add(1, Ordering::Relaxed);
if let Err(e) = result {
match e {
FluxLimiterError::ClockError(_) => {
self.clock_errors.fetch_add(1, Ordering::Relaxed);
}
FluxLimiterError::InvalidRate | FluxLimiterError::InvalidBurst => {
self.config_errors.fetch_add(1, Ordering::Relaxed);
}
}
}
}
fn error_rate(&self) -> f64 {
    // Tracks the clock-error rate specifically; config errors surface at startup
    let total = self.total_requests.load(Ordering::Relaxed);
    let errors = self.clock_errors.load(Ordering::Relaxed);
if total == 0 {
0.0
} else {
errors as f64 / total as f64
}
}
}
}
Alerting on Errors
#![allow(unused)]
fn main() {
fn check_with_alerting(
limiter: &FluxLimiter<String, SystemClock>,
metrics: &ErrorMetrics,
client_id: String,
) -> bool {
let result = limiter.check_request(client_id);
metrics.record_result(&result);
// Alert if error rate exceeds threshold
if metrics.error_rate() > 0.01 {
eprintln!(
"ALERT: Clock error rate exceeded 1%: {:.2}%",
metrics.error_rate() * 100.0
);
}
match result {
Ok(decision) => decision.allowed,
Err(FluxLimiterError::ClockError(_)) => true, // Fail-open
Err(_) => false,
}
}
}
Circuit Breaker Pattern
Temporarily bypass rate limiting after consecutive failures:
#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
struct CircuitBreaker {
consecutive_failures: AtomicU64,
circuit_open: AtomicBool,
threshold: u64,
}
impl CircuitBreaker {
fn new(threshold: u64) -> Self {
Self {
consecutive_failures: AtomicU64::new(0),
circuit_open: AtomicBool::new(false),
threshold,
}
}
fn record_success(&self) {
self.consecutive_failures.store(0, Ordering::Relaxed);
self.circuit_open.store(false, Ordering::Relaxed);
}
fn record_failure(&self) -> bool {
let failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
if failures >= self.threshold {
if !self.circuit_open.swap(true, Ordering::Relaxed) {
eprintln!("Circuit breaker opened after {} failures", failures);
}
true
} else {
false
}
}
fn is_open(&self) -> bool {
self.circuit_open.load(Ordering::Relaxed)
}
}
fn check_with_circuit_breaker(
limiter: &FluxLimiter<String, SystemClock>,
breaker: &CircuitBreaker,
client_id: String,
) -> bool {
if breaker.is_open() {
return true; // Bypass rate limiting
}
match limiter.check_request(client_id) {
Ok(decision) => {
breaker.record_success();
decision.allowed
}
Err(FluxLimiterError::ClockError(_)) => {
breaker.record_failure();
true // Fail-open
}
Err(_) => false,
}
}
}
Cleanup Error Handling
Cleanup errors are typically non-critical:
#![allow(unused)]
fn main() {
match limiter.cleanup_stale_clients(threshold) {
Ok(count) => {
info!("Cleaned up {} stale clients", count);
}
Err(FluxLimiterError::ClockError(_)) => {
warn!("Clock error during cleanup - will retry later");
// Cleanup failure is not critical - continue operation
}
Err(e) => {
error!("Unexpected cleanup error: {}", e);
}
}
}
Error Recovery Strategies
1. Retry with Backoff
#![allow(unused)]
fn main() {
use std::time::Duration;
async fn check_with_retry(
limiter: &FluxLimiter<String, SystemClock>,
client_id: String,
max_retries: u32,
) -> Result<FluxLimiterDecision, FluxLimiterError> {
let mut retries = 0;
let mut delay = Duration::from_millis(10);
loop {
match limiter.check_request(client_id.clone()) {
Ok(decision) => return Ok(decision),
Err(FluxLimiterError::ClockError(_)) if retries < max_retries => {
eprintln!("Clock error, retrying in {:?}", delay);
tokio::time::sleep(delay).await;
delay *= 2;
retries += 1;
}
Err(e) => return Err(e),
}
}
}
}
2. Degrade Gracefully
#![allow(unused)]
fn main() {
enum RateLimitStrategy {
Primary(FluxLimiter<String, SystemClock>),
Fallback(SimpleCounter), // hypothetical fallback; see the sketch after this block
}
impl RateLimitStrategy {
fn check_request(&mut self, client_id: String) -> bool {
match self {
Self::Primary(limiter) => {
match limiter.check_request(client_id) {
Ok(decision) => decision.allowed,
Err(FluxLimiterError::ClockError(_)) => {
// Switch to fallback
eprintln!("Switching to fallback strategy");
*self = Self::Fallback(SimpleCounter::new());
true
}
Err(_) => false,
}
}
Self::Fallback(counter) => counter.check(),
}
}
}
}
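SimpleCounter above is a stand-in for whatever degraded strategy the application prefers; a minimal sketch of such a counter (a crude fixed allowance, not part of the crate):
struct SimpleCounter {
    count: u64,
    limit: u64,
}
impl SimpleCounter {
    fn new() -> Self {
        // Arbitrary fixed allowance while degraded
        Self { count: 0, limit: 1000 }
    }
    fn check(&mut self) -> bool {
        self.count += 1;
        self.count <= self.limit
    }
}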
3. Log and Continue
#![allow(unused)]
fn main() {
fn check_with_logging(
limiter: &FluxLimiter<String, SystemClock>,
client_id: String,
) -> bool {
match limiter.check_request(client_id.clone()) {
Ok(decision) => {
trace!("Rate limit check succeeded for {}", client_id);
decision.allowed
}
Err(FluxLimiterError::ClockError(e)) => {
error!("Clock error for {}: {:?}", client_id, e);
true // Fail-open
}
Err(e) => {
error!("Unexpected error for {}: {:?}", client_id, e);
false
}
}
}
}
Testing Error Handling
Simulating Clock Failures
#![allow(unused)]
fn main() {
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_clock_error_handling() {
let clock = TestClock::new(0.0);
let limiter = FluxLimiter::with_config(
FluxLimiterConfig::new(10.0, 5.0),
clock.clone(),
).unwrap();
// Normal operation
assert!(limiter.check_request("client1").unwrap().allowed);
// Simulate clock failure
clock.fail_next_call();
assert!(matches!(
limiter.check_request("client1"),
Err(FluxLimiterError::ClockError(_))
));
// Recovery
assert!(limiter.check_request("client1").unwrap().allowed);
}
}
}
Best Practices
- Fail Fast on Configuration: Validate at startup
- Choose Error Policy: Fail-open, fail-closed, or fallback
- Monitor Errors: Track error rates and alert
- Log Contextually: Include client ID and error details
- Test Error Paths: Use TestClock to simulate failures
- Document Policy: Make error handling explicit
Next Steps
- Testing Architecture - Test error handling
- Production Considerations - Deploy with confidence
- Design Decisions - Why these choices?
Performance Design
Deep dive into performance optimization techniques and characteristics.
Hot Path Optimization
The check_request() method is optimized for minimal latency:
Execution Path
#![allow(unused)]
fn main() {
pub fn check_request(&self, client_id: T) -> Result<FluxLimiterDecision, FluxLimiterError> {
// 1. Single clock call (~50-100ns)
let current_time_nanos = self.clock.now()?;
// 2. Single map lookup (~10-50ns)
let previous_tat_nanos = self.client_state
.get(&client_id)
.map(|entry| *entry.value())
.unwrap_or(current_time_nanos);
// 3. Integer arithmetic (~5-10ns)
let is_conforming = current_time_nanos >=
previous_tat_nanos.saturating_sub(self.tolerance_nanos);
// 4. Conditional update (~10-50ns)
if is_conforming {
let new_tat_nanos = current_time_nanos
.max(previous_tat_nanos) + self.rate_nanos;
self.client_state.insert(client_id, new_tat_nanos);
}
// 5. Metadata calculation (~5-10ns)
// 6. Return decision
}
}
Total latency: ~100-250ns typical case
Optimization Techniques
- Single Clock Call: One time source access per request
- Single Map Operation: Either get or get+insert
- Integer Arithmetic: No floating-point operations in hot path
- No Allocations: Reuses existing memory
- Minimal Branching: Straight-line execution
Memory Layout
FluxLimiter Structure
#![allow(unused)]
fn main() {
FluxLimiter<String, SystemClock> {
rate_nanos: u64, // 8 bytes - cache-friendly
tolerance_nanos: u64, // 8 bytes
client_state: Arc<..>, // 8 bytes - pointer
clock: SystemClock, // 0 bytes - zero-sized type
}
// Total: 24 bytes - fits in single cache line
}
Cache Efficiency:
- Small struct size (24-32 bytes)
- Frequently accessed fields grouped
- Arc enables sharing without duplication
- Zero-sized clock type (SystemClock)
Per-Client State
DashMap<String, u64> entry:
  String:   ~24 bytes (pointer + len + capacity)
  u64:       8 bytes (TAT)
  Overhead: ~16 bytes (hash map metadata)
Total: ~48 bytes per client
Memory Scalability
Clients Memory Usage Example
1,000 ~48 KB Small API
10,000 ~480 KB Medium service
100,000 ~4.8 MB Large service
1,000,000 ~48 MB Very large service
10,000,000 ~480 MB Massive scale
Memory Growth: Linear with number of active clients
Algorithmic Complexity
Time Complexity
| Operation | Complexity | Notes |
|---|---|---|
| check_request() | O(1) | Hash map lookup + arithmetic |
| cleanup_stale_clients() | O(n) | Iterates all clients |
| rate() | O(1) | Simple field access |
| burst() | O(1) | Simple calculation |
Space Complexity
| Component | Complexity | Notes |
|---|---|---|
| Per-client state | O(1) | Single u64 per client |
| Total state | O(n) | Where n = active clients |
| Configuration | O(1) | Fixed size |
Concurrency Performance
Lock-Free Benefits
DashMap provides near-linear scalability:
Threads Throughput Latency
1 1.0x 100ns
2 1.9x 105ns
4 3.7x 110ns
8 7.0x 120ns
16 12.5x 130ns
Key factors:
- Different clients = different shards (low contention)
- Same client = same shard (necessary serialization)
- Short critical sections
Contention Patterns
Low Contention (different clients):
#![allow(unused)]
fn main() {
// Thread 1
limiter.check_request("client_1")?; // Shard 0
// Thread 2
limiter.check_request("client_2")?; // Shard 1
// No contention - lock-free
}
Medium Contention (same shard):
#![allow(unused)]
fn main() {
// Thread 1
limiter.check_request("client_1")?; // Shard 0
// Thread 2
limiter.check_request("client_100")?; // Shard 0
// Brief contention - short lock
}
High Contention (same client):
#![allow(unused)]
fn main() {
// Thread 1
limiter.check_request("client_1")?; // Shard 0
// Thread 2
limiter.check_request("client_1")?; // Shard 0
// Serialized - correct behavior
}
Benchmark Results
Single-Threaded Performance
test bench_check_request_allowed ... bench: 85 ns/iter (+/- 5)
test bench_check_request_denied ... bench: 82 ns/iter (+/- 4)
test bench_cleanup ... bench: 45 μs/iter (+/- 2)
Multi-Threaded Performance
test bench_concurrent_8_threads ... bench: 120 ns/iter (+/- 10)
test bench_concurrent_16_threads ... bench: 135 ns/iter (+/- 12)
Memory Overhead
test bench_memory_1k_clients ... ~48 KB
test bench_memory_100k_clients ... ~4.8 MB
test bench_memory_1m_clients ... ~48 MB
Optimization Decisions
Integer Arithmetic vs Floating-Point
Integer (chosen):
- ✅ Exact precision
- ✅ Faster arithmetic
- ✅ No rounding errors
- ✅ Cache-friendly (u64)
Floating-point (rejected):
- ❌ Precision loss
- ❌ Slower arithmetic
- ❌ Accumulating errors
- ❌ Larger memory footprint
DashMap vs Alternatives
DashMap (chosen):
- ✅ Lock-free reads/writes
- ✅ Good scalability
- ✅ Battle-tested
- ✅ Ergonomic API
Mutex (rejected):
- ❌ Global locking
- ❌ Poor scalability
- ❌ Read contention
RwLock (rejected):
- ❌ Reader/writer contention
- ❌ Write starvation possible
Custom lock-free map (rejected):
- ❌ Implementation complexity
- ❌ Maintenance burden
- ❌ Potential bugs
Nanosecond Precision
u64 nanoseconds (chosen):
- ✅ Maximum precision
- ✅ Integer arithmetic
- ✅ No overflow for 584 years
- ✅ Direct system time conversion
Milliseconds (rejected):
- ❌ Insufficient for high rates
- ❌ Precision loss
Duration (rejected):
- ❌ Larger memory footprint
- ❌ More complex arithmetic
Performance Tuning
Client ID Selection
Choose efficient client ID types:
#![allow(unused)]
fn main() {
// Fastest: Numeric IDs
let limiter = FluxLimiter::<u64, _>::with_config(config, clock)?;
limiter.check_request(user_id_numeric)?; // ~80ns
// Fast: IP addresses
let limiter = FluxLimiter::<IpAddr, _>::with_config(config, clock)?;
limiter.check_request(client_ip)?; // ~90ns
// Slower: Strings (requires allocation)
let limiter = FluxLimiter::<String, _>::with_config(config, clock)?;
limiter.check_request(user_id.to_string())?; // ~120ns
}
Cleanup Strategy
Balance memory usage vs. cleanup overhead:
#![allow(unused)]
fn main() {
use tokio::time::{interval, Duration};
// Frequent cleanup: lower memory, higher overhead
// (hour_nanos and week_nanos below are stale thresholds in nanoseconds)
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(600)); // 10 min
loop {
interval.tick().await;
let _ = limiter.cleanup_stale_clients(hour_nanos);
}
});
// Infrequent cleanup: Higher memory, lower overhead
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(86400)); // 24 hours
loop {
interval.tick().await;
let _ = limiter.cleanup_stale_clients(week_nanos);
}
});
}
Avoid Allocations
#![allow(unused)]
fn main() {
// Better: format once and clone per call (cloning is cheaper than re-formatting)
let client_id = format!("user_{}", user_id);
for _ in 0..100 {
    limiter.check_request(client_id.clone())?;
}
// Worse: re-format (and re-allocate) on every call
for _ in 0..100 {
limiter.check_request(format!("user_{}", user_id))?;
}
}
Profiling and Monitoring
Latency Monitoring
#![allow(unused)]
fn main() {
use std::time::Instant;
let start = Instant::now();
let decision = limiter.check_request(client_id)?;
let latency = start.elapsed();
if latency.as_micros() > 10 {
warn!("Slow rate limit check: {:?}", latency);
}
}
Memory Monitoring
#![allow(unused)]
fn main() {
fn estimate_memory_usage<T, C>(limiter: &FluxLimiter<T, C>) -> usize
where
T: Hash + Eq + Clone,
C: Clock,
{
// estimate_client_count() is a placeholder for however the application tracks
// its active-client count
let client_count = estimate_client_count();
let bytes_per_client = std::mem::size_of::<T>() + 8 + 16; // ID + TAT + map overhead
client_count * bytes_per_client
}
}
Throughput Tracking
#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicU64, Ordering};
static REQUEST_COUNT: AtomicU64 = AtomicU64::new(0);
fn track_throughput(limiter: &FluxLimiter<String, SystemClock>, client_id: String) -> bool {
REQUEST_COUNT.fetch_add(1, Ordering::Relaxed);
match limiter.check_request(client_id) {
Ok(decision) => decision.allowed,
Err(_) => false,
}
}
// Report throughput
fn report_throughput(duration: Duration) {
let count = REQUEST_COUNT.swap(0, Ordering::Relaxed);
let throughput = count as f64 / duration.as_secs_f64();
println!("Throughput: {:.2} req/s", throughput);
}
}
Performance Characteristics Summary
| Metric | Value |
|---|---|
| Latency (typical) | 100-250ns |
| Throughput (single thread) | ~10M req/s |
| Throughput (8 threads) | ~70M req/s |
| Memory per client | ~48 bytes |
| Scalability | Near-linear |
| Cleanup overhead | O(n) but infrequent |
Best Practices
- Use numeric client IDs when possible for best performance
- Share limiter via Arc to avoid state duplication
- Cleanup periodically to prevent memory growth
- Profile in production to identify bottlenecks
- Monitor latency and alert on degradation
- Batch operations when checking multiple clients (see the sketch below)
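For the last point, a sketch of a simple batching helper (hypothetical, not a crate API):
fn check_batch(limiter: &FluxLimiter<u64, SystemClock>, client_ids: &[u64]) -> Vec<bool> {
    client_ids
        .iter()
        .map(|&id| {
            limiter
                .check_request(id)
                .map(|decision| decision.allowed)
                .unwrap_or(false) // fail-closed on error; choose your own policy
        })
        .collect()
}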
Next Steps
- Concurrency Model - Understand thread safety
- Testing Architecture - Performance testing
- Design Decisions - Why these choices?
Testing Architecture
Comprehensive testing strategy for deterministic and reliable tests.
Test Organization
Tests are organized in tests/ratelimiter/ with clear separation of concerns:
tests/ratelimiter/
├── fixtures/
│ ├── test_clock.rs # TestClock implementation
│ └── mod.rs
├── gcra_algorithm_tests.rs # Core algorithm correctness
├── config_tests.rs # Configuration validation
├── error_tests.rs # Error handling and recovery
├── cleanup_tests.rs # Memory management
├── performance_tests.rs # Performance characteristics
├── decision_metadata_tests.rs # Decision metadata validation
└── main.rs # Test module organization
TestClock Design
The TestClock is the foundation for deterministic testing.
Implementation
#![allow(unused)]
fn main() {
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
pub struct TestClock {
time: Arc<AtomicU64>, // Current time in nanoseconds
should_fail: Arc<AtomicBool>, // Failure simulation flag
}
impl TestClock {
pub fn new(initial_time_secs: f64) -> Self {
Self {
time: Arc::new(AtomicU64::new((initial_time_secs * 1e9) as u64)),
should_fail: Arc::new(AtomicBool::new(false)),
}
}
pub fn advance(&self, duration_secs: f64) {
let duration_nanos = (duration_secs * 1e9) as u64;
self.time.fetch_add(duration_nanos, Ordering::SeqCst);
}
pub fn set_time(&self, time_secs: f64) {
self.time.store((time_secs * 1e9) as u64, Ordering::SeqCst);
}
pub fn fail_next_call(&self) {
self.should_fail.store(true, Ordering::SeqCst);
}
}
impl Clock for TestClock {
fn now(&self) -> Result<u64, ClockError> {
if self.should_fail.swap(false, Ordering::SeqCst) {
return Err(ClockError::SystemTimeError);
}
Ok(self.time.load(Ordering::SeqCst))
}
}
}
Key Features
- Deterministic Time: Controlled time progression
- Thread-Safe: Can be shared across test threads
- Failure Simulation: Can simulate clock errors
- Precise Control: Nanosecond-level manipulation
Usage Example
#![allow(unused)]
fn main() {
#[test]
fn test_rate_limiting() {
let clock = TestClock::new(0.0);
let limiter = FluxLimiter::with_config(
FluxLimiterConfig::new(10.0, 5.0),
clock.clone(),
).unwrap();
// First request at t=0
assert!(limiter.check_request("client1").unwrap().allowed);
// Advance time by 0.1 seconds
clock.advance(0.1);
// Second request should be allowed
assert!(limiter.check_request("client1").unwrap().allowed);
}
}
Test Categories
1. GCRA Algorithm Tests
Test core algorithm correctness:
#![allow(unused)]
fn main() {
#[test]
fn test_sustained_rate() {
let clock = TestClock::new(0.0);
let config = FluxLimiterConfig::new(10.0, 0.0); // 10 req/s, no burst
let limiter = FluxLimiter::with_config(config, clock.clone()).unwrap();
// First request allowed
assert!(limiter.check_request("client1").unwrap().allowed);
// Request 0.05s later (too early)
clock.advance(0.05);
assert!(!limiter.check_request("client1").unwrap().allowed);
// Request 0.1s after first (exactly on time)
clock.advance(0.05);
assert!(limiter.check_request("client1").unwrap().allowed);
}
}
2. Burst Capacity Tests
Verify burst handling:
#![allow(unused)]
fn main() {
#[test]
fn test_burst_capacity() {
let clock = TestClock::new(0.0);
let config = FluxLimiterConfig::new(10.0, 5.0); // 5 request burst
let limiter = FluxLimiter::with_config(config, clock.clone()).unwrap();
// Should allow ~6 requests immediately (1 + burst)
for _ in 0..6 {
assert!(limiter.check_request("client1").unwrap().allowed);
}
// 7th request should be denied
assert!(!limiter.check_request("client1").unwrap().allowed);
// After rate interval, allow one more
clock.advance(0.1);
assert!(limiter.check_request("client1").unwrap().allowed);
}
}
3. Configuration Tests
Validate configuration handling:
#![allow(unused)]
fn main() {
#[test]
fn test_invalid_rate() {
let config = FluxLimiterConfig::new(-10.0, 5.0);
let result = FluxLimiter::with_config(config, SystemClock);
assert!(matches!(result, Err(FluxLimiterError::InvalidRate)));
}
#[test]
fn test_invalid_burst() {
let config = FluxLimiterConfig::new(10.0, -5.0);
let result = FluxLimiter::with_config(config, SystemClock);
assert!(matches!(result, Err(FluxLimiterError::InvalidBurst)));
}
}
4. Error Handling Tests
Test error scenarios and recovery:
#![allow(unused)]
fn main() {
#[test]
fn test_clock_error_handling() {
let clock = TestClock::new(0.0);
let limiter = FluxLimiter::with_config(
FluxLimiterConfig::new(10.0, 5.0),
clock.clone(),
).unwrap();
// Normal operation
assert!(limiter.check_request("client1").unwrap().allowed);
// Simulate clock failure
clock.fail_next_call();
let result = limiter.check_request("client1");
assert!(matches!(result, Err(FluxLimiterError::ClockError(_))));
// Verify recovery
assert!(limiter.check_request("client1").unwrap().allowed);
}
#[test]
fn test_multiple_clock_failures() {
let clock = TestClock::new(0.0);
let limiter = FluxLimiter::with_config(
FluxLimiterConfig::new(10.0, 5.0),
clock.clone(),
).unwrap();
// Multiple consecutive failures
for _ in 0..5 {
clock.fail_next_call();
assert!(limiter.check_request("client1").is_err());
}
// Recovery
assert!(limiter.check_request("client1").unwrap().allowed);
}
}
5. Cleanup Tests
Test memory management:
#![allow(unused)]
fn main() {
#[test]
fn test_cleanup_stale_clients() {
let clock = TestClock::new(0.0);
let limiter = FluxLimiter::with_config(
FluxLimiterConfig::new(10.0, 5.0),
clock.clone(),
).unwrap();
// Create some client state
limiter.check_request("client1").unwrap();
limiter.check_request("client2").unwrap();
limiter.check_request("client3").unwrap();
// Advance time by 1 hour
clock.advance(3600.0);
// Cleanup clients older than 30 minutes
let threshold = 30 * 60 * 1_000_000_000u64;
let removed = limiter.cleanup_stale_clients(threshold).unwrap();
assert_eq!(removed, 3);
}
}
6. Concurrency Tests
Test thread safety:
#![allow(unused)]
fn main() {
#[test]
fn test_concurrent_access() {
use std::sync::Arc;
use std::thread;
let config = FluxLimiterConfig::new(100.0, 50.0);
let limiter = Arc::new(
FluxLimiter::with_config(config, SystemClock).unwrap()
);
let handles: Vec<_> = (0..10)
.map(|i| {
let limiter = Arc::clone(&limiter);
thread::spawn(move || {
for j in 0..1000 {
let client_id = format!("client_{}_{}", i, j);
limiter.check_request(client_id).unwrap();
}
})
})
.collect();
for handle in handles {
handle.join().unwrap();
}
}
}
7. Decision Metadata Tests
Verify decision metadata accuracy:
#![allow(unused)]
fn main() {
#[test]
fn test_retry_after_metadata() {
let clock = TestClock::new(0.0);
let config = FluxLimiterConfig::new(10.0, 0.0);
let limiter = FluxLimiter::with_config(config, clock.clone()).unwrap();
// First request allowed
limiter.check_request("client1").unwrap();
// Second request denied
let decision = limiter.check_request("client1").unwrap();
assert!(!decision.allowed);
// Verify retry_after is approximately 0.1 seconds
let retry_after = decision.retry_after_seconds.unwrap();
assert!((retry_after - 0.1).abs() < 0.001);
}
#[test]
fn test_remaining_capacity() {
let clock = TestClock::new(0.0);
let config = FluxLimiterConfig::new(10.0, 5.0);
let limiter = FluxLimiter::with_config(config, clock.clone()).unwrap();
// First request
let decision = limiter.check_request("client1").unwrap();
assert!(decision.allowed);
// Should have some remaining capacity
assert!(decision.remaining_capacity.is_some());
// Make more requests and verify capacity decreases
for _ in 0..5 {
limiter.check_request("client1").unwrap();
}
// Capacity should be depleted
let decision = limiter.check_request("client1").unwrap();
assert!(!decision.allowed);
}
}
Performance Testing
Latency Benchmarks
#![allow(unused)]
fn main() {
#[cfg(test)]
mod benchmarks {
use super::*;
use std::time::Instant;
#[test]
fn bench_check_request_latency() {
let limiter = FluxLimiter::with_config(
FluxLimiterConfig::new(1000.0, 500.0),
SystemClock,
).unwrap();
let iterations = 100_000;
let start = Instant::now();
for i in 0..iterations {
let client_id = format!("client_{}", i % 1000);
limiter.check_request(client_id).unwrap();
}
let elapsed = start.elapsed();
let avg_latency = elapsed.as_nanos() / iterations as u128;
println!("Average latency: {}ns", avg_latency);
assert!(avg_latency < 1000); // Should be under 1μs
}
}
}
Throughput Tests
#![allow(unused)]
fn main() {
#[test]
fn test_throughput() {
let limiter = Arc::new(
FluxLimiter::with_config(
FluxLimiterConfig::new(10_000.0, 5_000.0),
SystemClock,
).unwrap()
);
let start = Instant::now();
let threads = 8;
let requests_per_thread = 100_000;
let handles: Vec<_> = (0..threads)
.map(|t| {
let limiter = Arc::clone(&limiter);
thread::spawn(move || {
for i in 0..requests_per_thread {
let client_id = format!("client_{}_{}", t, i % 1000);
limiter.check_request(client_id).unwrap();
}
})
})
.collect();
for handle in handles {
handle.join().unwrap();
}
let elapsed = start.elapsed();
let total_requests = threads * requests_per_thread;
let throughput = total_requests as f64 / elapsed.as_secs_f64();
println!("Throughput: {:.2} req/s", throughput);
}
}
Test Utilities
Helper Functions
#![allow(unused)]
fn main() {
fn assert_allowed(result: Result<FluxLimiterDecision, FluxLimiterError>) {
match result {
Ok(decision) => assert!(decision.allowed, "Expected request to be allowed"),
Err(e) => panic!("Expected allowed decision, got error: {:?}", e),
}
}
fn assert_denied(result: Result<FluxLimiterDecision, FluxLimiterError>) {
match result {
Ok(decision) => assert!(!decision.allowed, "Expected request to be denied"),
Err(e) => panic!("Expected denied decision, got error: {:?}", e),
}
}
fn assert_error<T>(result: Result<T, FluxLimiterError>) {
assert!(result.is_err(), "Expected error, got success");
}
}
Test Fixtures
#![allow(unused)]
fn main() {
fn create_test_limiter(rate: f64, burst: f64) -> (FluxLimiter<String, TestClock>, TestClock) {
let clock = TestClock::new(0.0);
let config = FluxLimiterConfig::new(rate, burst);
let limiter = FluxLimiter::with_config(config, clock.clone()).unwrap();
(limiter, clock)
}
}
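A test built on the fixture:
#[test]
fn test_fixture_usage() {
    let (limiter, clock) = create_test_limiter(10.0, 0.0);
    assert!(limiter.check_request("client1".to_string()).unwrap().allowed);
    // Too early at the same instant: denied
    assert!(!limiter.check_request("client1".to_string()).unwrap().allowed);
    clock.advance(0.1);
    assert!(limiter.check_request("client1".to_string()).unwrap().allowed);
}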
Test Coverage
Aim for comprehensive coverage:
- ✅ Algorithm correctness
- ✅ Configuration validation
- ✅ Error handling and recovery
- ✅ Concurrency safety
- ✅ Memory management
- ✅ Decision metadata accuracy
- ✅ Performance characteristics
- ✅ Edge cases and boundary conditions
Best Practices
- Use TestClock for deterministic time control
- Test Error Paths including clock failures
- Verify Metadata not just allow/deny
- Test Concurrency with multiple threads
- Measure Performance with benchmarks
- Test Edge Cases like zero burst, high rates
- Cleanup After Tests to avoid state leakage
Next Steps
- Design Decisions - Understand the rationale
- Future Extensibility - Planned enhancements
Design Decisions
Detailed rationale behind key architectural choices in Flux Limiter.
Algorithm Selection: GCRA vs Token Bucket
Decision: Use GCRA
Rationale: The Generic Cell Rate Algorithm (GCRA) provides mathematical equivalence to Token Bucket with several implementation advantages.
GCRA Advantages
✅ Exact Mathematical Precision
- Integer arithmetic with u64 nanoseconds
- No floating-point precision loss
- No accumulating rounding errors
✅ Stateless Calculation
- No background token refill processes
- No timer management overhead
- Simpler implementation
✅ Efficient State
- One timestamp (u64) per client
- vs. token count + last refill time
- Smaller memory footprint
✅ Deterministic Behavior
- Exact timing with integer arithmetic
- Predictable results in tests
- No timer drift
Token Bucket Drawbacks
❌ Floating-Point Precision
- Token counts as floats
- Accumulating rounding errors
- Precision loss over time
❌ Background Processes
- Requires token refill mechanism
- Timer management complexity
- Potential drift issues
❌ Complex State
- Token count + timestamp
- More memory per client
- More complex updates
Mathematical Equivalence
Both algorithms produce the same rate limiting behavior:
GCRA: Request allowed if current_time >= TAT - tolerance
Token Bucket: Request allowed if tokens >= 1
The difference is implementation, not behavior.
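The equivalence can be checked numerically. The sketch below replays the same request times through both formulations (f64 seconds for brevity, unlike the crate's integer arithmetic) and asserts identical decisions:
fn gcra(times: &[f64], rate: f64, burst: f64) -> Vec<bool> {
    let t = 1.0 / rate;  // rate interval T
    let tau = burst * t; // tolerance τ
    let mut tat = 0.0_f64;
    times
        .iter()
        .map(|&now| {
            if now >= tat - tau {
                tat = tat.max(now) + t;
                true
            } else {
                false
            }
        })
        .collect()
}
fn token_bucket(times: &[f64], rate: f64, burst: f64) -> Vec<bool> {
    let capacity = burst + 1.0; // bucket holds the burst plus the current request
    let mut tokens = capacity;
    let mut last = 0.0_f64;
    times
        .iter()
        .map(|&now| {
            tokens = (tokens + (now - last) * rate).min(capacity);
            last = now;
            if tokens >= 1.0 {
                tokens -= 1.0;
                true
            } else {
                false
            }
        })
        .collect()
}
fn main() {
    let times = [0.0, 0.0, 0.05, 0.1, 0.12, 0.3];
    // Both algorithms make the same allow/deny decisions
    assert_eq!(gcra(&times, 10.0, 1.0), token_bucket(&times, 10.0, 1.0));
}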
Data Structure: DashMap
Decision: Use DashMap for Client State
Rationale: DashMap provides the best balance of performance, correctness, and ergonomics.
DashMap Benefits
✅ Lock-Free Concurrency
- Segmented locking
- Different shards = no contention
- Near-linear scalability
✅ Battle-Tested
- Mature library
- Well-documented
- Proven in production
✅ Good Performance
- ~100ns for get/insert
- Minimal overhead
- Efficient memory layout
✅ Ergonomic API
- Similar to std::HashMap
- Easy to use correctly
- Clear semantics
Alternative: Mutex
❌ Global Locking
- All operations serialize
- Poor scalability
- Read contention
Why rejected: Unacceptable performance bottleneck for concurrent access.
Alternative: RwLock
❌ Reader/Writer Contention
- Writers block all readers
- Write starvation possible
- Complex lock management
Why rejected: Still has contention issues, doesn’t scale well.
Alternative: Custom Lock-Free Map
❌ Implementation Complexity
- Requires deep concurrency expertise
- Bug-prone
- High maintenance burden
❌ Maturity Risk
- Unproven in production
- Potential correctness issues
- Performance unknowns
Why rejected: Not worth the complexity and risk when DashMap exists.
Time Representation: Nanoseconds
Decision: Use u64 Nanoseconds
Rationale: Nanoseconds provide maximum precision with efficient integer arithmetic.
Benefits
✅ Maximum Precision
- Supports very high rates (1B req/s)
- No precision loss
- Exact calculations
✅ Integer Arithmetic
- Faster than floating-point
- No rounding errors
- Deterministic results
✅ No Overflow
- u64::MAX nanoseconds = 584 years
- Sufficient for any realistic usage
- Saturating arithmetic for safety
✅ System Time Compatible
- Direct conversion from SystemTime
- No conversion overhead
- Natural representation
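A sketch of the arithmetic involved (helper names are illustrative):

use std::time::{SystemTime, UNIX_EPOCH};

const NANOS_PER_SEC: u64 = 1_000_000_000;

// u64 nanoseconds since the epoch: 2^64 ns ≈ 1.8e19 ns ≈ 584 years,
// so overflow is not a practical concern.
fn now_nanos() -> Result<u64, std::time::SystemTimeError> {
    Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos() as u64)
}

// The emission interval is the only floating-point touchpoint; after this
// one conversion, every GCRA step is pure integer arithmetic.
fn emission_interval_nanos(rate_per_sec: f64) -> u64 {
    (NANOS_PER_SEC as f64 / rate_per_sec) as u64 // e.g. 100 req/s -> 10_000_000 ns
}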
Alternative: Milliseconds
❌ Insufficient Precision
- Can’t support rates > 1000 req/s precisely
- Precision loss for high rates
- Rounding errors
Why rejected: Limits maximum rate and precision.
Alternative: Duration Type
❌ Memory Overhead
- Larger struct (2x u64)
- More complex arithmetic
- Cache inefficient
❌ Performance
- More expensive operations
- Additional abstractions
- Not necessary
Why rejected: Overhead without benefit for our use case.
Alternative: f64 Seconds
❌ Floating-Point Issues
- Precision loss
- Rounding errors
- Non-deterministic
❌ Performance
- Slower arithmetic
- Cache less friendly
- Conversion overhead
Why rejected: Precision and performance concerns.
Error Handling: Result Types
Decision: Use Result for All Errors
Rationale: Explicit error handling enables graceful degradation and better observability.
Benefits
✅ Explicit Error Handling
- Caller must handle errors
- No silent failures
- Clear error paths
✅ Graceful Degradation
- Application chooses policy
- Fail-open or fail-closed
- Fallback strategies possible
✅ Better Observability
- Errors can be logged
- Metrics can be collected
- Debugging easier
✅ Production-Ready
- Handles clock failures
- Recoverable errors
- Robust operation
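For example, a caller choosing a fail-open policy might look like this sketch (the wrapper and its name are illustrative; check_request and the decision type follow the installation example):

use flux_limiter::{FluxLimiter, SystemClock};

// Illustrative policy wrapper: returns true if the request may proceed.
fn allow_request(limiter: &FluxLimiter<String, SystemClock>, client: String) -> bool {
    match limiter.check_request(client) {
        Ok(decision) => decision.allowed,
        Err(e) => {
            // Fail open: log the clock/config error, let the request through.
            // A fail-closed caller would return false here instead.
            eprintln!("rate limiter error, failing open: {e}");
            true
        }
    }
}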
Alternative: Panics
❌ Difficult Recovery
- Can’t recover from panic
- Crashes entire application
- Poor user experience
❌ Poor Observability
- Hard to monitor
- Difficult to debug
- No graceful degradation
❌ Not Production-Ready
- Unacceptable for library code
- Forces policy on users
- Fragile
Why rejected: Panics are inappropriate for library code and prevent graceful error handling.
Alternative: Silent Failures (allow on error)
❌ Hidden Bugs
- Errors go unnoticed
- Hard to debug
- Incorrect behavior
❌ No Observability
- Can’t track error rates
- No alerting possible
- Silent degradation
Why rejected: Silent failures make debugging impossible and hide problems.
Generic Design: Client ID Types
Decision: Generic Client ID (T: Hash + Eq + Clone)
Rationale: Generics provide flexibility while maintaining type safety and performance.
Benefits
✅ Flexibility
- String: User IDs, API keys
- IpAddr: IP-based limiting
- u64: Numeric IDs
- Custom types: Complex scenarios
✅ Zero-Cost Abstraction
- No runtime overhead
- No boxing/dynamic dispatch
- Compile-time optimization
✅ Type Safety
- Compile-time type checking
- Can’t mix different ID types
- Clear API
✅ Performance
- Monomorphization
- Inlined operations
- Optimal code generation
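A sketch of the same limiter monomorphized for an IP-keyed use case (type parameters follow the signatures shown elsewhere on this page; String- and u64-keyed variants are built identically):

use std::net::IpAddr;
use flux_limiter::{FluxLimiter, FluxLimiterConfig, SystemClock};

fn main() {
    let config = FluxLimiterConfig::new(100.0, 50.0);
    // The client-ID type is fixed at compile time: no boxing, no dispatch.
    let by_ip: FluxLimiter<IpAddr, SystemClock> =
        FluxLimiter::with_config(config, SystemClock).unwrap();

    let ip: IpAddr = "203.0.113.7".parse().unwrap();
    println!("allowed: {}", by_ip.check_request(ip).unwrap().allowed);
}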
Alternative: Trait Object (dyn ClientId)
❌ Runtime Overhead
- Dynamic dispatch
- Heap allocation (Box)
- Slower performance
❌ Less Ergonomic
- Requires trait implementations
- More verbose usage
- Lifetime complexity
Why rejected: Performance overhead without significant benefit.
Alternative: String Only
❌ Limited Flexibility
- Forces string conversion
- Allocation overhead
- Can’t optimize for numeric IDs
❌ Performance
- Always allocates
- Slower hash/compare
- Memory overhead
Why rejected: Generics provide flexibility without cost.
Clock Abstraction
Decision: Clock Trait
Rationale: Abstracting time enables testing and handles real-world clock issues.
Benefits
✅ Testable
- TestClock for deterministic tests
- Precise time control
- Failure simulation
✅ Handles Real-World Issues
- Clock going backwards
- System suspend/resume
- Virtualization issues
✅ Flexible
- Custom time sources
- Mock implementations
- Production/test separation
✅ Error Handling
- Graceful clock failures
- Explicit error types
- Recoverable
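A sketch of the shape such an abstraction takes (the crate's actual trait may differ in method and error names):

use std::sync::atomic::{AtomicU64, Ordering};

// Assumed trait shape: a fallible nanosecond time source.
pub trait Clock {
    fn now_nanos(&self) -> Result<u64, std::time::SystemTimeError>;
}

// A deterministic clock for tests: time moves only when the test says so.
#[derive(Default)]
pub struct TestClock {
    now: AtomicU64,
}

impl TestClock {
    pub fn advance(&self, nanos: u64) {
        self.now.fetch_add(nanos, Ordering::SeqCst);
    }
}

impl Clock for TestClock {
    fn now_nanos(&self) -> Result<u64, std::time::SystemTimeError> {
        Ok(self.now.load(Ordering::SeqCst))
    }
}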
Alternative: Direct SystemTime
❌ Not Testable
- Can’t control time in tests
- Non-deterministic tests
- Hard to test edge cases
❌ No Error Handling
- SystemTime can fail
- Can’t recover
- Silent failures
Why rejected: Testing and error handling require abstraction.
Memory Management: Manual Cleanup
Decision: Explicit cleanup_stale_clients()
Rationale: Manual cleanup gives users control over memory/CPU tradeoff.
Benefits
✅ User Control
- Choose cleanup frequency
- Choose threshold
- Tune for workload
✅ No Background Threads
- Simpler implementation
- No thread overhead
- User controls when
✅ Predictable Performance
- Cleanup only when called
- No surprising pauses
- Controllable overhead
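For example, an application might run cleanup on its own cadence from a periodic task. The interval and threshold below are illustrative; cleanup_stale_clients is assumed to take the staleness threshold in nanoseconds and return the number of evicted entries, matching the async variant shown later in these docs.

use std::{sync::Arc, thread, time::Duration};
use flux_limiter::{FluxLimiter, SystemClock};

// Assuming the same Arc<FluxLimiter<..>> is shared with the request path.
fn spawn_janitor(limiter: Arc<FluxLimiter<String, SystemClock>>) {
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(60));
        let one_hour_nanos: u64 = 3_600 * 1_000_000_000;
        match limiter.cleanup_stale_clients(one_hour_nanos) {
            Ok(removed) => println!("evicted {removed} stale clients"),
            Err(e) => eprintln!("cleanup failed: {e}"),
        }
    });
}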
Alternative: Automatic Background Cleanup
❌ Thread Overhead
- Requires background thread
- Thread management complexity
- Resource usage
❌ Loss of Control
- Can’t control timing
- Fixed thresholds
- May not fit all workloads
❌ Complexity
- Thread lifecycle management
- Shutdown coordination
- Error handling
Why rejected: Manual cleanup is simpler and more flexible.
Alternative: Reference Counting
❌ Incorrect Semantics
- Clients don’t have clear ownership
- Memory leak if client keeps requesting
- Doesn’t solve problem
Why rejected: Doesn’t match use case.
Configuration: Builder Pattern
Decision: Support Builder Pattern
Rationale: Builder pattern provides ergonomic API while maintaining validation.
Benefits
✅ Ergonomic
- Clear, readable code
- Method chaining
- Self-documenting
✅ Flexible
- Can set individual fields
- Start with defaults
- Override as needed
✅ Validation
- Validate at build time
- Clear error messages
- Fail fast
Example
#![allow(unused)]
fn main() {
let config = FluxLimiterConfig::new(0.0, 0.0)
.rate(100.0)
.burst(50.0);
}
vs. the direct constructor:
#![allow(unused)]
fn main() {
let config = FluxLimiterConfig::new(100.0, 50.0);
}
Both are supported.
Decision Summary
| Decision | Chosen | Rejected | Rationale |
|---|---|---|---|
| Algorithm | GCRA | Token Bucket | Precision, simplicity |
| Storage | DashMap | Mutex, RwLock | Performance, scalability |
| Time | u64 nanos | ms, Duration, f64 | Precision, performance |
| Errors | Result | Panic, Silent | Observability, recovery |
| Client ID | Generic | Trait object, String | Flexibility, performance |
| Clock | Trait | Direct SystemTime | Testing, error handling |
| Cleanup | Manual | Automatic | Control, simplicity |
| Config | Builder | Constructor only | Ergonomics |
Next Steps
- Future Extensibility - Planned enhancements
- Performance Design - Performance implications
Future Extensibility
Planned architecture enhancements and extension points.
Planned Features
1. Async Support
Add async variants of key methods for better integration with async runtimes.
#![allow(unused)]
fn main() {
impl<T, C> FluxLimiter<T, C>
where
T: Hash + Eq + Clone + Send + Sync,
C: Clock + Send + Sync,
{
pub async fn check_request_async(
&self,
client_id: T,
) -> Result<FluxLimiterDecision, FluxLimiterError> {
// Non-blocking implementation
// Async clock sources
// Future-based cleanup
}
pub async fn cleanup_stale_clients_async(
&self,
stale_threshold_nanos: u64,
) -> Result<usize, FluxLimiterError> {
// Async cleanup without blocking
}
}
}
Benefits:
- Better integration with Tokio/async-std
- Non-blocking I/O for distributed backends
- Async clock sources
Implementation Strategy:
- Feature flag: async
- Separate module (e.g. flux_limiter::r#async, since async is a reserved keyword in Rust)
- Maintain backward compatibility
2. Persistence Layer
Support for persisting rate limit state across restarts.
#![allow(unused)]
fn main() {
#[async_trait]
pub trait StateStore: Send + Sync {
async fn load_state(
&self,
client_id: &str,
) -> Result<Option<u64>, StoreError>;
async fn save_state(
&self,
client_id: &str,
tat: u64,
) -> Result<(), StoreError>;
async fn cleanup_stale(
&self,
threshold: u64,
) -> Result<usize, StoreError>;
}
}
Implementations:
Redis Backend
#![allow(unused)]
fn main() {
pub struct RedisStateStore {
client: redis::Client,
key_prefix: String,
}
#[async_trait]
impl StateStore for RedisStateStore {
async fn load_state(&self, client_id: &str) -> Result<Option<u64>, StoreError> {
let key = format!("{}:{}", self.key_prefix, client_id);
// Load TAT from Redis
}
async fn save_state(&self, client_id: &str, tat: u64) -> Result<(), StoreError> {
let key = format!("{}:{}", self.key_prefix, client_id);
// Save TAT to Redis with TTL
}
}
}
Database Backend
#![allow(unused)]
fn main() {
pub struct DatabaseStateStore {
pool: sqlx::PgPool,
}
#[async_trait]
impl StateStore for DatabaseStateStore {
async fn load_state(&self, client_id: &str) -> Result<Option<u64>, StoreError> {
sqlx::query_scalar("SELECT tat FROM rate_limits WHERE client_id = $1")
.bind(client_id)
.fetch_optional(&self.pool)
.await
.map_err(Into::into)
}
}
}
Benefits:
- State survives restarts
- Distributed rate limiting
- Shared state across instances
Implementation Strategy:
- Feature flags: redis, postgres, etc.
- Trait-based design
- Fallback to in-memory
3. Observability Integration
Built-in metrics and tracing support.
#![allow(unused)]
fn main() {
pub trait RateLimiterMetrics: Send + Sync {
fn record_decision(
&self,
client_id: &str,
allowed: bool,
latency: Duration,
);
fn record_error(&self, error: &FluxLimiterError);
fn record_cleanup(&self, removed: usize, duration: Duration);
}
}
Implementations:
Prometheus Metrics
#![allow(unused)]
fn main() {
pub struct PrometheusMetrics {
requests_total: IntCounterVec,
requests_allowed: IntCounterVec,
requests_denied: IntCounterVec,
check_duration: HistogramVec,
clock_errors: IntCounter,
}
impl RateLimiterMetrics for PrometheusMetrics {
fn record_decision(&self, client_id: &str, allowed: bool, latency: Duration) {
self.requests_total.with_label_values(&[client_id]).inc();
if allowed {
self.requests_allowed.with_label_values(&[client_id]).inc();
} else {
self.requests_denied.with_label_values(&[client_id]).inc();
}
self.check_duration
.with_label_values(&[client_id])
.observe(latency.as_secs_f64());
}
}
}
OpenTelemetry Tracing
#![allow(unused)]
fn main() {
pub struct OpenTelemetryMetrics {
meter: Meter,
}
impl RateLimiterMetrics for OpenTelemetryMetrics {
fn record_decision(&self, client_id: &str, allowed: bool, latency: Duration) {
let span = tracing::span!(
tracing::Level::INFO,
"rate_limit_check",
client_id = %client_id,
allowed = allowed,
latency_us = latency.as_micros() as i64,
);
let _enter = span.enter();
}
}
}
Benefits:
- Built-in monitoring
- Standard metrics
- Distributed tracing
Implementation Strategy:
- Feature flags: prometheus, opentelemetry
- Optional trait implementations
- Zero-cost when disabled
4. Alternative Algorithms
Support for multiple rate limiting algorithms.
#![allow(unused)]
fn main() {
pub trait RateLimitAlgorithm<T>: Send + Sync {
fn check_request(
&self,
client_id: &T,
current_time: u64,
) -> FluxLimiterDecision;
fn cleanup_stale(
&self,
threshold: u64,
) -> Result<usize, FluxLimiterError>;
}
}
Implementations:
Token Bucket
#![allow(unused)]
fn main() {
pub struct TokenBucketAlgorithm {
rate: f64,
capacity: f64,
client_state: DashMap<String, TokenBucketState>,
}
struct TokenBucketState {
tokens: f64,
last_refill: u64,
}
}
Sliding Window
#![allow(unused)]
fn main() {
pub struct SlidingWindowAlgorithm {
window_size_nanos: u64,
max_requests: u64,
client_state: DashMap<String, VecDeque<u64>>,
}
}
Leaky Bucket
#![allow(unused)]
fn main() {
pub struct LeakyBucketAlgorithm {
rate: f64,
capacity: u64,
client_state: DashMap<String, LeakyBucketState>,
}
}
Benefits:
- Algorithm flexibility
- Use case specific optimization
- Easy comparison
Implementation Strategy:
- Feature flag: algorithms
- Separate modules
- Common trait interface
5. Distributed Rate Limiting
Coordinate rate limits across multiple instances.
#![allow(unused)]
fn main() {
pub struct DistributedFluxLimiter<T, C, S>
where
T: Hash + Eq + Clone + Send + Sync,
C: Clock,
S: StateStore,
{
local: FluxLimiter<T, C>,
store: S,
sync_interval: Duration,
}
impl<T, C, S> DistributedFluxLimiter<T, C, S> {
pub async fn check_request(
&self,
client_id: T,
) -> Result<FluxLimiterDecision, FluxLimiterError> {
// 1. Check local cache
// 2. If miss, fetch from store
// 3. Make decision
// 4. Update both local and remote
}
async fn sync_state(&self) -> Result<(), StoreError> {
// Periodic sync with remote store
}
}
}
Strategies:
- Best-effort: Local-first with periodic sync
- Strict: Remote check every time (slower but accurate)
- Hybrid: Local fast path, remote for edge cases
Benefits:
- Multi-instance coordination
- Cluster-wide rate limits
- Horizontal scaling
6. Adaptive Rate Limiting
Dynamically adjust rates based on backend health.
#![allow(unused)]
fn main() {
pub struct AdaptiveRateLimiter<T, C, H>
where
T: Hash + Eq + Clone,
C: Clock,
H: HealthCheck,
{
base_config: FluxLimiterConfig,
limiter: RwLock<FluxLimiter<T, C>>,
health_check: H,
adjustment_factor: AtomicU64, // Fixed-point percentage
}
impl<T, C, H> AdaptiveRateLimiter<T, C, H> {
pub async fn adjust_rate(&self) -> Result<(), FluxLimiterError> {
let health = self.health_check.check().await;
let new_factor = match health {
Health::Good => 1.0, // Normal rate
Health::Degraded => 0.7, // 70% of normal
Health::Bad => 0.3, // 30% of normal
};
// Update limiter with adjusted rate
let new_rate = self.base_config.rate() * new_factor;
// ... recreate limiter ...
}
}
}
Benefits:
- Protect degraded backends
- Auto-recovery
- Dynamic adjustment
7. Quota Management
Track cumulative usage over longer periods.
#![allow(unused)]
fn main() {
pub struct QuotaManager<T>
where
T: Hash + Eq + Clone,
{
limiter: FluxLimiter<T, SystemClock>,
quotas: DashMap<T, QuotaState>,
}
struct QuotaState {
total_allowed: u64,
period_start: u64,
quota_limit: u64,
}
impl<T> QuotaManager<T> {
pub fn check_request(
&self,
client_id: T,
) -> Result<QuotaDecision, FluxLimiterError> {
// 1. Check rate limit
// 2. Check quota
// 3. Update quota usage
// 4. Return combined decision
}
}
}
Benefits:
- Long-term limits (daily, monthly)
- Billing integration
- Multi-tier quotas
Extensibility Patterns
Plugin Architecture
#![allow(unused)]
fn main() {
pub trait FluxLimiterPlugin: Send + Sync {
fn before_check(&self, client_id: &str);
fn after_check(&self, client_id: &str, decision: &FluxLimiterDecision);
fn on_error(&self, client_id: &str, error: &FluxLimiterError);
}
pub struct PluggableFluxLimiter<T, C>
where
T: Hash + Eq + Clone,
C: Clock,
{
limiter: FluxLimiter<T, C>,
plugins: Vec<Box<dyn FluxLimiterPlugin>>,
}
}
Middleware Pattern
#![allow(unused)]
fn main() {
// A boxed check function; the alias keeps the middleware signature readable.
type CheckFn = Box<dyn Fn(&str) -> Result<FluxLimiterDecision, FluxLimiterError>>;
pub trait RateLimitMiddleware {
    fn wrap(&self, inner: CheckFn) -> CheckFn;
}
// Example: Logging middleware
pub struct LoggingMiddleware;
impl RateLimitMiddleware for LoggingMiddleware {
fn wrap(&self, inner: CheckFn) -> CheckFn {
Box::new(move |client_id| {
info!("Checking rate limit for {}", client_id);
let result = inner(client_id);
info!("Result: {:?}", result);
result
})
}
}
}
Backward Compatibility Strategy
Semantic Versioning
- Patch (x.y.Z): Bug fixes, performance improvements
- Minor (x.Y.0): New features, backward compatible
- Major (X.0.0): Breaking changes
Feature Flags
All new features behind feature flags:
[features]
default = []
async = ["dep:tokio"]
redis = ["dep:redis", "async"]
postgres = ["dep:sqlx", "async"]
prometheus = ["dep:prometheus"]
opentelemetry = ["dep:opentelemetry"]
algorithms = []
The dep: prefix lets a feature share its name with the optional dependency it enables.
Deprecation Path
- Deprecate: Mark the old API with #[deprecated]
- Transition Period: Support both old and new APIs for 2-3 minor versions
- Remove: Drop the deprecated API in the next major version
Type Aliases
Maintain the existing API surface while introducing the clock parameter:
#![allow(unused)]
fn main() {
// Existing users keep writing FluxLimiter<T>; a default type parameter
// supplies SystemClock, while new users can name the clock type explicitly.
pub type FluxLimiter<T, C = SystemClock> = core::FluxLimiter<T, C>;
}
Implementation Roadmap
Phase 1: Core Improvements (v0.5)
- Async support
- Enhanced metrics
- Performance optimizations
Phase 2: Persistence (v0.6)
- State store trait
- Redis backend
- PostgreSQL backend
Phase 3: Advanced Features (v0.7)
- Alternative algorithms
- Distributed coordination
- Adaptive rate limiting
Phase 4: Enterprise Features (v1.0)
- Quota management
- Multi-tier support
- Advanced monitoring
Contributing
We welcome contributions! Areas of interest:
- Performance: Benchmarks and optimizations
- Backends: New state store implementations
- Metrics: Monitoring integrations
- Algorithms: Alternative rate limiting strategies
- Documentation: Examples and guides
- Testing: More comprehensive tests
Next Steps
- Overview - Understand current architecture
- Design Decisions - Why we made these choices
- Contributing - How to contribute