Flux Limiter

A high-performance rate limiter based on the Generic Cell Rate Algorithm (GCRA) with nanosecond precision and lock-free concurrent access.

Features

  • Mathematically precise: Implements the GCRA algorithm with exact nanosecond timing
  • High performance: Lock-free concurrent access using DashMap
  • Generic client IDs: Works with any hashable client identifier (String, IpAddr, u64, etc.)
  • Rich metadata: Returns detailed decision information for HTTP response construction
  • Memory efficient: Automatic cleanup of stale client entries
  • Robust error handling: Graceful handling of clock failures and configuration errors
  • Testable: Clock abstraction enables deterministic testing
  • Thread-safe: Safe to use across multiple threads
  • Zero allocations: Efficient hot path with minimal overhead

What is Rate Limiting?

Rate limiting is a technique used to control the rate at which requests or operations are processed. It’s commonly used to:

  • Protect services: Prevent abuse and ensure fair resource allocation
  • Control costs: Limit API usage to manage infrastructure costs
  • Ensure quality of service: Prevent individual users from degrading performance for others
  • Comply with policies: Enforce usage limits and SLAs

Why Flux Limiter?

Flux Limiter stands out with its focus on:

  1. Correctness: Mathematically precise GCRA implementation
  2. Performance: Lock-free concurrency with O(1) operations
  3. Reliability: Comprehensive error handling and graceful degradation
  4. Observability: Rich metadata for monitoring and HTTP headers
  5. Flexibility: Generic design supporting various client ID types

Algorithm: GCRA

Flux Limiter implements the Generic Cell Rate Algorithm (GCRA), which is mathematically equivalent to the token bucket algorithm but offers several advantages:

  • No background token refill processes
  • Exact timing without floating-point precision loss
  • Efficient state representation (one timestamp per client)
  • Deterministic behavior with integer arithmetic
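
The core GCRA update fits in a few lines. The sketch below shows the idea, assuming per-client state is a single "theoretical arrival time" (TAT) in nanoseconds; the names are illustrative, not Flux Limiter's internals.

// GCRA sketch: tat is the client's stored theoretical arrival time.
// All values are nanoseconds; names here are illustrative only.
fn gcra_check(tat: &mut u64, now: u64, rate: f64, burst: f64) -> bool {
    let emission_interval = (1_000_000_000.0 / rate) as u64; // ns between requests
    let delay_tolerance = (emission_interval as f64 * burst) as u64; // burst allowance

    let t = (*tat).max(now); // never schedule in the past
    if now + delay_tolerance >= t {
        *tat = t + emission_interval; // conforming: consume one slot
        true
    } else {
        false // non-conforming: rate limited
    }
}

Note there is no background refill: the state advances only when requests arrive, which is why a single timestamp per client suffices.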

Performance Characteristics

  • Memory: O(number of active clients)
  • Time complexity: O(1) for check_request() operations
  • Concurrency: Lock-free reads and writes via DashMap
  • Precision: Nanosecond timing accuracy
  • Throughput: Millions of operations per second
  • Reliability: Graceful degradation on system clock issues
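
The throughput figure is straightforward to sanity-check. Below is a rough single-threaded micro-benchmark sketch using the API shown throughout this book; absolute numbers will vary with hardware.

use std::time::Instant;
use flux_limiter::{FluxLimiter, FluxLimiterConfig, SystemClock};

fn main() {
    // A very high rate keeps the hot path busy without denials dominating.
    let config = FluxLimiterConfig::new(1_000_000_000.0, 0.0);
    let limiter = FluxLimiter::with_config(config, SystemClock).unwrap();

    let iterations = 1_000_000u32;
    let start = Instant::now();
    for _ in 0..iterations {
        let _ = limiter.check_request("bench_client");
    }
    let elapsed = start.elapsed();
    println!(
        "{} checks in {:?} ({:.0} ops/sec)",
        iterations,
        elapsed,
        iterations as f64 / elapsed.as_secs_f64()
    );
}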

Installation

Add Flux Limiter to your Rust project using Cargo.

Adding to Cargo.toml

Add this to your Cargo.toml:

[dependencies]
flux-limiter = "0.7.2"

Alternative: Using Cargo Add

With Cargo 1.62 or newer (where cargo add is built in), or with the cargo-edit plugin, you can use:

cargo add flux-limiter

Verify Installation

Create a simple example to verify the installation:

use flux_limiter::{FluxLimiter, FluxLimiterConfig, SystemClock};

fn main() {
    let config = FluxLimiterConfig::new(10.0, 5.0); // 10 requests/sec with a burst of 5
    let limiter = FluxLimiter::with_config(config, SystemClock).unwrap();

    match limiter.check_request("test_client") {
        Ok(decision) => {
            println!("Request allowed: {}", decision.allowed);
        }
        Err(e) => {
            eprintln!("Error: {}", e);
        }
    }
}

Run with:

cargo run

You should see output like:

Request allowed: true

Quick Start

Get started with Flux Limiter in just a few minutes.

Basic Example

#![allow(unused)]
fn main() {
use flux_limiter::{FluxLimiter, FluxLimiterConfig, SystemClock};

// Create a rate limiter: 10 requests per second with burst of 5
let config = FluxLimiterConfig::new(10.0, 5.0);
let limiter = FluxLimiter::with_config(config, SystemClock).unwrap();

// Check if a request should be allowed
match limiter.check_request("user_123") {
    Ok(decision) => {
        if decision.allowed {
            println!("Request allowed");
        } else {
            println!("Rate limited - retry after {:.2}s",
                     decision.retry_after_seconds.unwrap_or(0.0));
        }
    }
    Err(e) => {
        eprintln!("Rate limiter error: {}", e);
        // Handle error appropriately (e.g., allow request, log error)
    }
}
}

Understanding the Example

Let’s break down what’s happening:

  1. Create Configuration: FluxLimiterConfig::new(10.0, 5.0)

    • Rate: 10 requests per second
    • Burst: 5 additional requests allowed in bursts
    • Total capacity: ~6 requests can be made immediately
  2. Create Rate Limiter: FluxLimiter::with_config(config, SystemClock)

    • Uses the configuration
    • Uses SystemClock for production time source
    • Returns Result to handle configuration errors
  3. Check Request: limiter.check_request("user_123")

    • Checks if the client “user_123” can make a request
    • Returns rich metadata about the decision
    • Automatically updates internal state

Decision Metadata

The FluxLimiterDecision struct provides detailed information:

#![allow(unused)]
fn main() {
pub struct FluxLimiterDecision {
    pub allowed: bool,                    // Whether to allow the request
    pub retry_after_seconds: Option<f64>, // When to retry (if denied)
    pub remaining_capacity: Option<f64>,  // Remaining burst capacity
    pub reset_time_nanos: u64,           // When the window resets
}
}

Using Decision Metadata

#![allow(unused)]
fn main() {
match limiter.check_request("user_123") {
    Ok(decision) => {
        if decision.allowed {
            println!("Request allowed");
            if let Some(remaining) = decision.remaining_capacity {
                println!("Remaining capacity: {:.2}", remaining);
            }
        } else {
            if let Some(retry_after) = decision.retry_after_seconds {
                println!("Please retry after {:.2} seconds", retry_after);
            }
        }
    }
    Err(e) => {
        eprintln!("Error: {}", e);
    }
}
}

Multiple Clients

Flux Limiter automatically tracks state for each unique client:

#![allow(unused)]
fn main() {
let config = FluxLimiterConfig::new(10.0, 5.0);
let limiter = FluxLimiter::with_config(config, SystemClock).unwrap();

// Different clients have independent rate limits
limiter.check_request("user_1").unwrap();
limiter.check_request("user_2").unwrap();
limiter.check_request("user_3").unwrap();

// Each client is tracked separately
}

Thread Safety

Flux Limiter is thread-safe and can be shared across threads:

#![allow(unused)]
fn main() {
use std::sync::Arc;
use std::thread;

let config = FluxLimiterConfig::new(10.0, 5.0);
let limiter = Arc::new(FluxLimiter::with_config(config, SystemClock).unwrap());

let mut handles = vec![];

for i in 0..10 {
    let limiter = Arc::clone(&limiter);
    let handle = thread::spawn(move || {
        let client_id = format!("user_{}", i);
        limiter.check_request(client_id)
    });
    handles.push(handle);
}

for handle in handles {
    let result = handle.join().unwrap();
    println!("Result: {:?}", result);
}
}

Configuration

Learn how to configure Flux Limiter for your specific use case.

Configuration Basics

Flux Limiter uses FluxLimiterConfig for configuration:

#![allow(unused)]
fn main() {
use flux_limiter::FluxLimiterConfig;

let config = FluxLimiterConfig::new(rate_per_second, burst_capacity);
}

Rate Parameter

The rate parameter defines the sustained requests per second.

  • Must be positive (> 0)
  • Defines the steady-state request rate
  • Example: rate = 100.0 means 100 requests per second
#![allow(unused)]
fn main() {
// 100 requests per second sustained
let config = FluxLimiterConfig::new(100.0, 0.0);
}

Understanding Rate

The rate translates to a time interval between requests:

  • rate = 1.0 → One request per second (1000ms interval)
  • rate = 10.0 → Ten requests per second (100ms interval)
  • rate = 100.0 → One hundred requests per second (10ms interval)
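
The conversion is plain arithmetic: interval_nanos = 1_000_000_000 / rate. A quick sketch (not a Flux Limiter API):

#![allow(unused)]
fn main() {
// Convert a requests-per-second rate into the interval between requests.
fn emission_interval_nanos(rate: f64) -> u64 {
    (1_000_000_000.0 / rate) as u64
}

assert_eq!(emission_interval_nanos(1.0), 1_000_000_000); // 1s
assert_eq!(emission_interval_nanos(10.0), 100_000_000);  // 100ms
assert_eq!(emission_interval_nanos(100.0), 10_000_000);  // 10ms
}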

Burst Parameter

The burst parameter defines additional capacity for handling bursts.

  • Must be non-negative (≥ 0)
  • Allows temporary spikes above the sustained rate
  • Example: burst = 50.0 allows bursts of ~50 additional requests
#![allow(unused)]
fn main() {
// 100 requests per second with burst capacity of 50
let config = FluxLimiterConfig::new(100.0, 50.0);
}

Understanding Burst

  • Total capacity: Approximately 1 + burst requests can be made immediately
  • Burst recovery: Unused burst capacity accumulates at the configured rate
  • Maximum burst: Capped at the configured burst value

Rate and Burst Example

With rate=10.0 and burst=5.0:

  1. Initial state: Client can make ~6 requests immediately (1 + 5 burst)
  2. After burst: Limited to sustained rate of 10 requests per second
  3. Recovery: Burst capacity recovers at 10 units per second
  4. After 1 second idle: Can make ~6 requests again
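
To see this concretely, the sketch below exhausts the initial capacity and reports when the first denial occurs. It uses the real system clock, so the exact count can vary slightly with timing.

use flux_limiter::{FluxLimiter, FluxLimiterConfig, SystemClock};

fn main() {
    let config = FluxLimiterConfig::new(10.0, 5.0);
    let limiter = FluxLimiter::with_config(config, SystemClock).unwrap();

    // Fire requests back-to-back until the first denial.
    let mut allowed = 0;
    loop {
        let decision = limiter.check_request("burst_demo").unwrap();
        if !decision.allowed {
            break;
        }
        allowed += 1;
    }

    // Expect roughly 6 immediate requests (1 + burst of 5).
    println!("Allowed {} requests before rate limiting", allowed);
}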

Builder Pattern

Use the builder pattern for clearer configuration:

#![allow(unused)]
fn main() {
// The new(0.0, 0.0) values are placeholders; validation happens when the
// limiter is created, after the builder methods have overridden them.
let config = FluxLimiterConfig::new(0.0, 0.0)
    .rate(100.0)        // 100 requests per second
    .burst(50.0);       // Allow bursts of up to 50 requests
}

This approach is especially useful when you want to:

  • Start with default values and override specific settings
  • Make configuration changes clearer and more readable

Common Configuration Patterns

Strict Rate Limiting (No Burst)

#![allow(unused)]
fn main() {
// Exactly 10 requests per second, no burst allowed
let config = FluxLimiterConfig::new(10.0, 0.0);
}

Use when:

  • You need strict, predictable rate limiting
  • Bursts would cause issues for your backend
  • You want simple, consistent behavior

Flexible Rate Limiting (With Burst)

#![allow(unused)]
fn main() {
// 10 requests per second with burst of 20
let config = FluxLimiterConfig::new(10.0, 20.0);
}

Use when:

  • User experience benefits from handling short bursts
  • Your backend can handle temporary spikes
  • You want to allow occasional bursty traffic

API Gateway Configuration

#![allow(unused)]
fn main() {
// 1000 requests per second with generous burst
let config = FluxLimiterConfig::new(1000.0, 500.0);
}

Use for:

  • High-throughput API gateways
  • Services that can handle significant load
  • Preventing extreme abuse while allowing normal traffic

Free Tier API Configuration

#![allow(unused)]
fn main() {
// 10 requests per minute (~0.167 per second) with a small burst
let rate_per_second = 10.0 / 60.0;
let config = FluxLimiterConfig::new(rate_per_second, 5.0);
}

Use for:

  • Free tier API limits
  • Generous limits that prevent abuse
  • Pay-per-use API tiers

Configuration Validation

Configuration is validated when creating the rate limiter:

#![allow(unused)]
fn main() {
use flux_limiter::{FluxLimiter, FluxLimiterConfig, FluxLimiterError, SystemClock};

// Invalid configuration: negative rate
let config = FluxLimiterConfig::new(-10.0, 5.0);
match FluxLimiter::with_config(config, SystemClock) {
    Ok(_) => println!("Valid configuration"),
    Err(FluxLimiterError::InvalidRate) => {
        eprintln!("Error: Rate must be positive");
    }
    Err(e) => eprintln!("Error: {}", e),
}
}

Validation Rules

  • InvalidRate: Returned if rate ≤ 0
  • InvalidBurst: Returned if burst < 0

Manual Validation

You can validate configuration before creating the limiter:

#![allow(unused)]
fn main() {
match config.validate() {
    Ok(_) => println!("Configuration is valid"),
    Err(FluxLimiterError::InvalidRate) => {
        eprintln!("Rate must be positive");
    }
    Err(FluxLimiterError::InvalidBurst) => {
        eprintln!("Burst must be non-negative");
    }
    Err(e) => eprintln!("Configuration error: {}", e),
}
}

Dynamic Configuration

While Flux Limiter doesn’t support runtime configuration changes, you can work around this:

#![allow(unused)]
fn main() {
use flux_limiter::{FluxLimiter, FluxLimiterConfig, FluxLimiterDecision, FluxLimiterError, SystemClock};
use std::sync::Arc;
use parking_lot::RwLock;

struct DynamicRateLimiter {
    limiter: Arc<RwLock<FluxLimiter<String, SystemClock>>>,
}

impl DynamicRateLimiter {
    fn update_config(&self, new_config: FluxLimiterConfig) {
        let new_limiter = FluxLimiter::with_config(new_config, SystemClock).unwrap();
        *self.limiter.write() = new_limiter;
    }

    fn check_request(&self, client_id: String) -> Result<FluxLimiterDecision, FluxLimiterError> {
        self.limiter.read().check_request(client_id)
    }
}
}

Note: Changing configuration resets all client state.

Basic Usage

Common patterns and best practices for using Flux Limiter.

Simple Rate Limiting

The most basic use case is checking whether a request should be allowed:

#![allow(unused)]
fn main() {
use flux_limiter::{FluxLimiter, FluxLimiterConfig, SystemClock};

let config = FluxLimiterConfig::new(10.0, 5.0);
let limiter = FluxLimiter::with_config(config, SystemClock).unwrap();

match limiter.check_request("user_123") {
    Ok(decision) if decision.allowed => {
        // Process request
        println!("Request allowed");
    }
    Ok(decision) => {
        // Rate limited
        println!("Rate limited - retry after {:.2}s",
                 decision.retry_after_seconds.unwrap_or(0.0));
    }
    Err(e) => {
        // Handle error
        eprintln!("Error: {}", e);
    }
}
}

Working with Different Client ID Types

String Client IDs

#![allow(unused)]
fn main() {
// User IDs, session IDs, API keys
let limiter = FluxLimiter::<String, _>::with_config(config, SystemClock).unwrap();

limiter.check_request("user_123".to_string())?;
limiter.check_request("session_abc".to_string())?;
}

IP Address Client IDs

#![allow(unused)]
fn main() {
use std::net::IpAddr;

let limiter = FluxLimiter::<IpAddr, _>::with_config(config, SystemClock).unwrap();

let client_ip: IpAddr = "192.168.1.1".parse().unwrap();
limiter.check_request(client_ip)?;
}

Numeric Client IDs

#![allow(unused)]
fn main() {
// High-performance numeric IDs
let limiter = FluxLimiter::<u64, _>::with_config(config, SystemClock).unwrap();

limiter.check_request(12345)?;
limiter.check_request(67890)?;
}

Custom Client IDs

#![allow(unused)]
fn main() {

#[derive(Clone, PartialEq, Eq, Hash)]
struct ClientId {
    user_id: u64,
    endpoint: String,
}

let limiter = FluxLimiter::<ClientId, _>::with_config(config, SystemClock).unwrap();

let client = ClientId {
    user_id: 123,
    endpoint: "/api/data".to_string(),
};

limiter.check_request(client)?;
}

Accessing Configuration

You can query the current rate limiter configuration:

#![allow(unused)]
fn main() {
let limiter = FluxLimiter::with_config(config, SystemClock).unwrap();

println!("Rate: {} req/sec", limiter.rate());
println!("Burst: {}", limiter.burst());
}

Using Decision Metadata

Extracting All Metadata

#![allow(unused)]
fn main() {
match limiter.check_request("user_123") {
    Ok(decision) => {
        println!("Allowed: {}", decision.allowed);

        if let Some(retry_after) = decision.retry_after_seconds {
            println!("Retry after: {:.2}s", retry_after);
        }

        if let Some(remaining) = decision.remaining_capacity {
            println!("Remaining capacity: {:.2}", remaining);
        }

        println!("Reset time: {} ns", decision.reset_time_nanos);
    }
    Err(e) => eprintln!("Error: {}", e),
}
}

Computing Reset Time

#![allow(unused)]
fn main() {
use std::time::{SystemTime, UNIX_EPOCH, Duration};

match limiter.check_request("user_123") {
    Ok(decision) => {
        // Convert reset_time_nanos to a timestamp
        let reset_duration = Duration::from_nanos(decision.reset_time_nanos);
        let reset_time = UNIX_EPOCH + reset_duration;

        let now = SystemTime::now();
        if let Ok(time_until_reset) = reset_time.duration_since(now) {
            println!("Reset in {:.2}s", time_until_reset.as_secs_f64());
        }
    }
    Err(e) => eprintln!("Error: {}", e),
}
}

Sharing Across Threads

Flux Limiter is thread-safe and designed to be shared:

#![allow(unused)]
fn main() {
use std::sync::Arc;
use std::thread;

let config = FluxLimiterConfig::new(100.0, 50.0);
let limiter = Arc::new(FluxLimiter::with_config(config, SystemClock).unwrap());

// Share across multiple threads
let handles: Vec<_> = (0..10)
    .map(|i| {
        let limiter = Arc::clone(&limiter);
        thread::spawn(move || {
            let client_id = format!("client_{}", i);
            limiter.check_request(client_id)
        })
    })
    .collect();

for handle in handles {
    match handle.join().unwrap() {
        Ok(decision) => println!("Allowed: {}", decision.allowed),
        Err(e) => eprintln!("Error: {}", e),
    }
}
}

Async Usage

While Flux Limiter is synchronous, it works seamlessly in async contexts:

use std::sync::Arc;
use tokio;

#[tokio::main]
async fn main() {
    let config = FluxLimiterConfig::new(100.0, 50.0);
    let limiter = Arc::new(FluxLimiter::with_config(config, SystemClock).unwrap());

    // Use in async tasks
    let limiter_clone = Arc::clone(&limiter);
    tokio::spawn(async move {
        match limiter_clone.check_request("user_123") {
            Ok(decision) if decision.allowed => {
                // Process async request
                println!("Processing request...");
            }
            Ok(_) => println!("Rate limited"),
            Err(e) => eprintln!("Error: {}", e),
        }
    })
    .await
    .unwrap();
}

Multiple Rate Limiters

You can create different rate limiters for different purposes:

#![allow(unused)]
fn main() {
// Global rate limiter
let global_limiter = FluxLimiter::<String, _>::with_config(
    FluxLimiterConfig::new(1000.0, 500.0),
    SystemClock
).unwrap();

// Per-user rate limiter
let user_limiter = FluxLimiter::<String, _>::with_config(
    FluxLimiterConfig::new(10.0, 5.0),
    SystemClock
).unwrap();

// Apply both limits (a nested fn can't capture locals, so the limiters
// are passed in explicitly)
fn check_both_limits(
    global_limiter: &FluxLimiter<String, SystemClock>,
    user_limiter: &FluxLimiter<String, SystemClock>,
    user_id: &str,
) -> bool {
    match global_limiter.check_request("global".to_string()) {
        Ok(decision) if !decision.allowed => return false,
        Err(_) => return false,
        _ => {}
    }

    match user_limiter.check_request(user_id.to_string()) {
        Ok(decision) => decision.allowed,
        Err(_) => false,
    }
}

let allowed = check_both_limits(&global_limiter, &user_limiter, "user_123");
}

Idiomatic Error Handling

#![allow(unused)]
fn main() {
use flux_limiter::{FluxLimiter, FluxLimiterError};

fn process_request(limiter: &FluxLimiter<String, SystemClock>, client_id: String) -> Result<(), String> {
    let decision = limiter
        .check_request(client_id)
        .map_err(|e| format!("Rate limiter error: {}", e))?;

    if !decision.allowed {
        return Err(format!(
            "Rate limited. Retry after {:.2}s",
            decision.retry_after_seconds.unwrap_or(0.0)
        ));
    }

    // Process the request
    Ok(())
}
}

Error Handling

Comprehensive guide to handling errors in Flux Limiter.

Error Types

Flux Limiter provides a well-defined error hierarchy:

#![allow(unused)]
fn main() {
pub enum FluxLimiterError {
    InvalidRate,           // Configuration: rate ≤ 0
    InvalidBurst,          // Configuration: burst < 0
    ClockError(ClockError), // Runtime: clock failure
}

pub enum ClockError {
    SystemTimeError,       // System time unavailable
}
}

Configuration Errors

Configuration errors occur when creating a rate limiter with invalid settings.

InvalidRate Error

#![allow(unused)]
fn main() {
use flux_limiter::{FluxLimiterConfig, FluxLimiter, SystemClock, FluxLimiterError};

// Invalid: rate must be positive
let config = FluxLimiterConfig::new(-10.0, 5.0);

match FluxLimiter::with_config(config, SystemClock) {
    Ok(_) => println!("Success"),
    Err(FluxLimiterError::InvalidRate) => {
        eprintln!("Error: Rate must be positive (> 0)");
    }
    Err(e) => eprintln!("Other error: {}", e),
}
}

InvalidBurst Error

#![allow(unused)]
fn main() {
// Invalid: burst must be non-negative
let config = FluxLimiterConfig::new(10.0, -5.0);

match FluxLimiter::with_config(config, SystemClock) {
    Ok(_) => println!("Success"),
    Err(FluxLimiterError::InvalidBurst) => {
        eprintln!("Error: Burst must be non-negative (≥ 0)");
    }
    Err(e) => eprintln!("Other error: {}", e),
}
}

Handling Configuration Errors

Configuration errors should be caught early, typically at application startup:

use flux_limiter::{FluxLimiter, FluxLimiterConfig, FluxLimiterError, SystemClock};

fn create_rate_limiter() -> Result<FluxLimiter<String, SystemClock>, String> {
    let config = FluxLimiterConfig::new(100.0, 50.0);

    FluxLimiter::with_config(config, SystemClock)
        .map_err(|e| match e {
            FluxLimiterError::InvalidRate => {
                "Invalid configuration: rate must be positive".to_string()
            }
            FluxLimiterError::InvalidBurst => {
                "Invalid configuration: burst must be non-negative".to_string()
            }
            _ => format!("Configuration error: {}", e),
        })
}

fn main() {
    let limiter = create_rate_limiter()
        .expect("Failed to create rate limiter with valid configuration");

    // Use limiter...
}

Runtime Clock Errors

Clock errors can occur during normal operation when the system clock is unavailable or behaves unexpectedly.

Understanding Clock Errors

Clock errors happen when:

  • System time API fails
  • Clock jumps backward (NTP adjustment)
  • System suspend/resume causes time discontinuity
  • Virtualization causes time skips
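
You can reproduce this failure mode deterministically with a clock that always fails. A sketch, assuming the Clock trait shape shown in the Advanced Usage chapter and that construction only validates the configuration:

#![allow(unused)]
fn main() {
use flux_limiter::{Clock, ClockError, FluxLimiter, FluxLimiterConfig, FluxLimiterError};

// A clock that fails on every read, for exercising error paths.
#[derive(Clone)]
struct FailingClock;

impl Clock for FailingClock {
    fn now(&self) -> Result<u64, ClockError> {
        Err(ClockError::SystemTimeError)
    }
}

let config = FluxLimiterConfig::new(10.0, 5.0);
let limiter = FluxLimiter::with_config(config, FailingClock).unwrap();

// Every check now surfaces a ClockError, so each policy below can be tested.
assert!(matches!(
    limiter.check_request("user_123"),
    Err(FluxLimiterError::ClockError(_))
));
}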

Basic Clock Error Handling

#![allow(unused)]
fn main() {
match limiter.check_request("user_123") {
    Ok(decision) => {
        if decision.allowed {
            // Process request
        } else {
            // Rate limited
        }
    }
    Err(FluxLimiterError::ClockError(_)) => {
        eprintln!("System clock error detected");
        // Implement your error policy
    }
    Err(e) => {
        eprintln!("Unexpected error: {}", e);
    }
}
}

Error Handling Policies

Different applications require different error handling strategies.

Fail-Open Policy

Allow requests when the rate limiter encounters errors:

#![allow(unused)]
fn main() {
fn should_allow_request(
    limiter: &FluxLimiter<String, SystemClock>,
    client_id: &str
) -> bool {
    match limiter.check_request(client_id) {
        Ok(decision) => decision.allowed,
        Err(FluxLimiterError::ClockError(_)) => {
            // Fail-open: allow request on clock error
            eprintln!("Clock error - allowing request (fail-open policy)");
            true
        }
        Err(e) => {
            eprintln!("Rate limiter error: {} - allowing request", e);
            true
        }
    }
}
}

Use when:

  • Availability is more important than strict rate limiting
  • False positives (allowing too many requests) are acceptable
  • Your backend can handle temporary spikes

Fail-Closed Policy

Deny requests when the rate limiter encounters errors:

#![allow(unused)]
fn main() {
fn should_allow_request(
    limiter: &FluxLimiter<String, SystemClock>,
    client_id: &str
) -> bool {
    match limiter.check_request(client_id) {
        Ok(decision) => decision.allowed,
        Err(FluxLimiterError::ClockError(_)) => {
            // Fail-closed: deny request on clock error
            eprintln!("Clock error - denying request (fail-closed policy)");
            false
        }
        Err(e) => {
            eprintln!("Rate limiter error: {} - denying request", e);
            false
        }
    }
}
}

Use when:

  • Security is paramount
  • False negatives (denying legitimate requests) are acceptable
  • Protecting backend from overload is critical

Fallback Policy

Use alternative rate limiting when clock errors occur:

#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

struct FallbackRateLimiter {
    primary: FluxLimiter<String, SystemClock>,
    fallback_counter: Arc<AtomicU64>,
    fallback_limit: u64,
}

impl FallbackRateLimiter {
    fn check_request(&self, client_id: String) -> bool {
        match self.primary.check_request(client_id) {
            Ok(decision) => decision.allowed,
            Err(FluxLimiterError::ClockError(_)) => {
                // Use simple counter as fallback
                let count = self.fallback_counter.fetch_add(1, Ordering::Relaxed);

                if count >= self.fallback_limit {
                    eprintln!("Fallback limit reached");
                    false
                } else {
                    eprintln!("Using fallback counter: {}/{}", count, self.fallback_limit);
                    true
                }
            }
            Err(e) => {
                eprintln!("Unexpected error: {}", e);
                false
            }
        }
    }
}
}

Monitoring Clock Errors

Track clock errors for alerting and debugging:

#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicU64, Ordering};

static CLOCK_ERROR_COUNT: AtomicU64 = AtomicU64::new(0);
static TOTAL_REQUESTS: AtomicU64 = AtomicU64::new(0);

fn check_with_monitoring(
    limiter: &FluxLimiter<String, SystemClock>,
    client_id: String
) -> bool {
    TOTAL_REQUESTS.fetch_add(1, Ordering::Relaxed);

    match limiter.check_request(client_id) {
        Ok(decision) => decision.allowed,
        Err(FluxLimiterError::ClockError(e)) => {
            CLOCK_ERROR_COUNT.fetch_add(1, Ordering::Relaxed);

            let error_count = CLOCK_ERROR_COUNT.load(Ordering::Relaxed);
            let total = TOTAL_REQUESTS.load(Ordering::Relaxed);
            let error_rate = error_count as f64 / total as f64;

            eprintln!("Clock error: {:?} (rate: {:.4}%)", e, error_rate * 100.0);

            // Implement your policy
            true // Fail-open
        }
        Err(e) => {
            eprintln!("Unexpected error: {}", e);
            false
        }
    }
}
}

Circuit Breaker Pattern

Temporarily bypass rate limiting after consecutive failures:

#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicU64, Ordering};

struct CircuitBreakerLimiter {
    limiter: FluxLimiter<String, SystemClock>,
    consecutive_failures: AtomicU64,
    failure_threshold: u64,
    bypassed: AtomicU64,
}

impl CircuitBreakerLimiter {
    fn check_request(&self, client_id: String) -> bool {
        // Check if circuit is open
        if self.consecutive_failures.load(Ordering::Relaxed) >= self.failure_threshold {
            self.bypassed.fetch_add(1, Ordering::Relaxed);
            eprintln!("Circuit open - bypassing rate limiter");
            return true;
        }

        match self.limiter.check_request(client_id) {
            Ok(decision) => {
                // Reset failure counter on success
                self.consecutive_failures.store(0, Ordering::Relaxed);
                decision.allowed
            }
            Err(FluxLimiterError::ClockError(_)) => {
                let failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;

                if failures >= self.failure_threshold {
                    eprintln!("Opening circuit after {} consecutive failures", failures);
                }

                true // Fail-open
            }
            Err(e) => {
                eprintln!("Unexpected error: {}", e);
                false
            }
        }
    }
}
}

Cleanup Error Handling

The cleanup_stale_clients method can also return clock errors:

#![allow(unused)]
fn main() {
// Cleanup errors are typically not critical
let one_hour_nanos = 60 * 60 * 1_000_000_000u64;

match limiter.cleanup_stale_clients(one_hour_nanos) {
    Ok(count) => {
        println!("Cleaned up {} stale clients", count);
    }
    Err(FluxLimiterError::ClockError(_)) => {
        eprintln!("Clock error during cleanup - will retry later");
        // Cleanup failure is not critical - continue operation
    }
    Err(e) => {
        eprintln!("Unexpected cleanup error: {}", e);
    }
}
}

Best Practices

  1. Validate Configuration Early: Check configuration at startup, not runtime
  2. Choose an Error Policy: Decide on fail-open, fail-closed, or fallback
  3. Monitor Errors: Track error rates for alerting
  4. Log Contextually: Include client ID and error context in logs
  5. Handle Gracefully: Never panic - always return a decision
  6. Test Error Paths: Use TestClock to simulate failures
  7. Document Policy: Make your error handling policy explicit
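
For item 6, here is a sketch of a deterministic test (the MockClock in the Advanced Usage chapter is a fuller version of the same idea; the FixedClock name here is illustrative):

use flux_limiter::{Clock, ClockError, FluxLimiter, FluxLimiterConfig};

// Deterministic test double: time is frozen at a fixed instant.
#[derive(Clone)]
struct FixedClock(u64);

impl Clock for FixedClock {
    fn now(&self) -> Result<u64, ClockError> {
        Ok(self.0)
    }
}

#[test]
fn denies_once_capacity_is_exhausted() {
    let limiter = FluxLimiter::with_config(
        FluxLimiterConfig::new(10.0, 0.0), // 10 req/s, no burst
        FixedClock(0),                     // time never advances
    )
    .unwrap();

    assert!(limiter.check_request("t").unwrap().allowed);  // first request fits
    assert!(!limiter.check_request("t").unwrap().allowed); // no time has passed
}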

Advanced Usage

Advanced features and optimization techniques for Flux Limiter.

Memory Management

Flux Limiter tracks state for each client, which can grow over time. Use cleanup to manage memory.

Understanding Memory Usage

  • Memory: O(number of active clients)
  • Per-client overhead: ~40-48 bytes (client ID + TAT, the stored "theoretical arrival time" timestamp)
  • Example: 1 million clients ≈ 40-48 MB
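
The arithmetic behind that example, as a sketch (the 48-byte figure is the rough per-entry estimate from above, not a measured constant):

#![allow(unused)]
fn main() {
// Rough memory estimate: entries ≈ active clients, ~48 bytes each.
let active_clients: u64 = 1_000_000;
let bytes_per_client: u64 = 48; // client ID + TAT timestamp + map overhead
let estimated_bytes = active_clients * bytes_per_client;
println!("~{} MB", estimated_bytes / (1024 * 1024)); // ≈ 45 MB
}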

Manual Cleanup

#![allow(unused)]
fn main() {
// Clean up clients that haven't been seen for 1 hour
let one_hour_nanos = 60 * 60 * 1_000_000_000u64;

match limiter.cleanup_stale_clients(one_hour_nanos) {
    Ok(removed_count) => {
        println!("Cleaned up {} stale clients", removed_count);
    }
    Err(e) => {
        eprintln!("Cleanup failed: {}", e);
        // Cleanup failure is usually not critical - log and continue
    }
}
}

Automatic Cleanup with Tokio

use flux_limiter::{FluxLimiter, FluxLimiterConfig, SystemClock};
use std::sync::Arc;
use tokio::time::{interval, Duration};

async fn start_cleanup_task(limiter: Arc<FluxLimiter<String, SystemClock>>) {
    let mut cleanup_interval = interval(Duration::from_secs(3600)); // 1 hour

    loop {
        cleanup_interval.tick().await;

        let threshold = 24 * 60 * 60 * 1_000_000_000u64; // 24 hours

        match limiter.cleanup_stale_clients(threshold) {
            Ok(count) => {
                println!("Cleaned up {} stale clients", count);
            }
            Err(e) => {
                eprintln!("Cleanup failed: {}", e);
                // Consider implementing fallback cleanup or alerting
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let config = FluxLimiterConfig::new(100.0, 50.0);
    let limiter = Arc::new(FluxLimiter::with_config(config, SystemClock).unwrap());

    // Start cleanup task in background
    let limiter_clone = Arc::clone(&limiter);
    tokio::spawn(start_cleanup_task(limiter_clone));

    // Use limiter in your application...
}

Dynamic Cleanup Thresholds

Base cleanup threshold on the configured rate:

#![allow(unused)]
fn main() {
fn calculate_cleanup_threshold(limiter: &FluxLimiter<String, SystemClock>) -> u64 {
    // Clean up clients inactive for 100x the rate interval
    let rate = limiter.rate();
    let rate_interval_nanos = (1_000_000_000.0 / rate) as u64;
    rate_interval_nanos * 100
}

let threshold = calculate_cleanup_threshold(&limiter);
let _ = limiter.cleanup_stale_clients(threshold);
}

Custom Client ID Types

Create custom client IDs for complex rate limiting scenarios.

Composite Client IDs

Rate limit by multiple dimensions:

#![allow(unused)]
fn main() {

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct CompositeClientId {
    user_id: String,
    api_endpoint: String,
}

let limiter = FluxLimiter::<CompositeClientId, _>::with_config(
    FluxLimiterConfig::new(100.0, 50.0),
    SystemClock
).unwrap();

// Different rate limits per user per endpoint
let client = CompositeClientId {
    user_id: "user_123".to_string(),
    api_endpoint: "/api/data".to_string(),
};

limiter.check_request(client)?;
}

Hierarchical Rate Limiting

#![allow(unused)]
fn main() {
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum RateLimitKey {
    Global,
    PerTenant(String),
    PerUser(String, String), // tenant_id, user_id
}

let limiter = FluxLimiter::<RateLimitKey, _>::with_config(
    FluxLimiterConfig::new(1000.0, 500.0),
    SystemClock
).unwrap();

// Check multiple levels
fn check_hierarchical(
    limiter: &FluxLimiter<RateLimitKey, SystemClock>,
    tenant_id: &str,
    user_id: &str,
) -> bool {
    // Global limit
    if !limiter.check_request(RateLimitKey::Global).unwrap().allowed {
        return false;
    }

    // Tenant limit
    if !limiter.check_request(RateLimitKey::PerTenant(tenant_id.to_string())).unwrap().allowed {
        return false;
    }

    // User limit
    limiter.check_request(RateLimitKey::PerUser(tenant_id.to_string(), user_id.to_string()))
        .unwrap()
        .allowed
}
}

Multiple Rate Limiters

Use different rate limiters for different purposes.

Tiered Rate Limiting

#![allow(unused)]
fn main() {
use std::sync::Arc;

struct TieredRateLimiter {
    free_tier: Arc<FluxLimiter<String, SystemClock>>,
    paid_tier: Arc<FluxLimiter<String, SystemClock>>,
    premium_tier: Arc<FluxLimiter<String, SystemClock>>,
}

impl TieredRateLimiter {
    fn new() -> Result<Self, FluxLimiterError> {
        Ok(Self {
            free_tier: Arc::new(FluxLimiter::with_config(
                FluxLimiterConfig::new(10.0, 5.0), // 10 req/s
                SystemClock
            )?),
            paid_tier: Arc::new(FluxLimiter::with_config(
                FluxLimiterConfig::new(100.0, 50.0), // 100 req/s
                SystemClock
            )?),
            premium_tier: Arc::new(FluxLimiter::with_config(
                FluxLimiterConfig::new(1000.0, 500.0), // 1000 req/s
                SystemClock
            )?),
        })
    }

    fn check_request(&self, tier: &str, client_id: String) -> Result<FluxLimiterDecision, FluxLimiterError> {
        match tier {
            "free" => self.free_tier.check_request(client_id),
            "paid" => self.paid_tier.check_request(client_id),
            "premium" => self.premium_tier.check_request(client_id),
            _ => self.free_tier.check_request(client_id),
        }
    }
}
}

Per-Endpoint Rate Limiting

#![allow(unused)]
fn main() {
use std::collections::HashMap;
use std::sync::Arc;

struct EndpointRateLimiter {
    limiters: HashMap<String, Arc<FluxLimiter<String, SystemClock>>>,
    default: Arc<FluxLimiter<String, SystemClock>>,
}

impl EndpointRateLimiter {
    fn new() -> Result<Self, FluxLimiterError> {
        let mut limiters = HashMap::new();

        // Strict limit for expensive endpoint
        limiters.insert(
            "/api/expensive".to_string(),
            Arc::new(FluxLimiter::with_config(
                FluxLimiterConfig::new(1.0, 2.0),
                SystemClock
            )?),
        );

        // Generous limit for cheap endpoint
        limiters.insert(
            "/api/cheap".to_string(),
            Arc::new(FluxLimiter::with_config(
                FluxLimiterConfig::new(1000.0, 500.0),
                SystemClock
            )?),
        );

        // Default limit
        let default = Arc::new(FluxLimiter::with_config(
            FluxLimiterConfig::new(100.0, 50.0),
            SystemClock
        )?);

        Ok(Self { limiters, default })
    }

    fn check_request(&self, endpoint: &str, client_id: String) -> Result<FluxLimiterDecision, FluxLimiterError> {
        let limiter = self.limiters.get(endpoint).unwrap_or(&self.default);
        limiter.check_request(client_id)
    }
}
}

Performance Optimization

Minimize String Allocations

Use references where possible:

#![allow(unused)]
fn main() {
// Taking an owned String forces every caller to allocate or move a String
fn check_request_owned(limiter: &FluxLimiter<String, SystemClock>, id: String) -> bool {
    limiter.check_request(id).unwrap().allowed
}

// Accepting &str lets callers pass borrowed data; the single allocation
// happens at the rate limiter boundary
fn check_request_borrowed(limiter: &FluxLimiter<String, SystemClock>, id: &str) -> bool {
    limiter.check_request(id.to_string()).unwrap().allowed
}
}

Or use numeric IDs for better performance:

#![allow(unused)]
fn main() {
// Faster: no string allocation
let limiter = FluxLimiter::<u64, _>::with_config(config, SystemClock).unwrap();
limiter.check_request(user_id_numeric).unwrap();
}

Batch Operations

Process multiple clients efficiently:

#![allow(unused)]
fn main() {
fn check_batch(
    limiter: &FluxLimiter<String, SystemClock>,
    client_ids: &[String],
) -> Vec<bool> {
    client_ids
        .iter()
        .map(|id| limiter.check_request(id.clone())
            .map(|d| d.allowed)
            .unwrap_or(false))
        .collect()
}
}

Parallel Processing

Use rayon for parallel rate limiting checks:

#![allow(unused)]
fn main() {
use rayon::prelude::*;

fn check_parallel(
    limiter: &FluxLimiter<String, SystemClock>,
    client_ids: Vec<String>,
) -> Vec<bool> {
    client_ids
        .par_iter()
        .map(|id| limiter.check_request(id.clone())
            .map(|d| d.allowed)
            .unwrap_or(false))
        .collect()
}
}

Custom Clock Implementation

Implement custom clock sources for special scenarios.

Mock Clock for Testing

#![allow(unused)]
fn main() {
use flux_limiter::{Clock, ClockError};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

#[derive(Clone)]
struct MockClock {
    time: Arc<AtomicU64>,
}

impl MockClock {
    fn new(initial_time_nanos: u64) -> Self {
        Self {
            time: Arc::new(AtomicU64::new(initial_time_nanos)),
        }
    }

    fn set_time(&self, time_nanos: u64) {
        self.time.store(time_nanos, Ordering::SeqCst);
    }

    fn advance(&self, duration_nanos: u64) {
        self.time.fetch_add(duration_nanos, Ordering::SeqCst);
    }
}

impl Clock for MockClock {
    fn now(&self) -> Result<u64, ClockError> {
        Ok(self.time.load(Ordering::SeqCst))
    }
}

// Use in tests
let clock = MockClock::new(0);
let limiter = FluxLimiter::with_config(config, clock.clone()).unwrap();

// Control time in tests
clock.advance(1_000_000_000); // Advance 1 second
}

Monotonic Clock

Ensure time never goes backward:

#![allow(unused)]
fn main() {
use flux_limiter::{Clock, ClockError};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

#[derive(Clone)]
struct MonotonicClock {
    last_time: Arc<AtomicU64>,
}

impl MonotonicClock {
    fn new() -> Self {
        Self {
            last_time: Arc::new(AtomicU64::new(0)),
        }
    }
}

impl Clock for MonotonicClock {
    fn now(&self) -> Result<u64, ClockError> {
        let current_time = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_err(|_| ClockError::SystemTimeError)?
            .as_nanos() as u64;

        // Ensure monotonicity
        let mut last = self.last_time.load(Ordering::Acquire);
        loop {
            let next = current_time.max(last);
            match self.last_time.compare_exchange_weak(
                last,
                next,
                Ordering::Release,
                Ordering::Acquire,
            ) {
                Ok(_) => return Ok(next),
                Err(x) => last = x,
            }
        }
    }
}
}

Web Framework Integration

Learn how to integrate Flux Limiter with popular web frameworks.

Axum Integration

Basic Middleware

#![allow(unused)]
fn main() {
use axum::{
    extract::State,
    http::{Request, StatusCode, HeaderMap},
    middleware::Next,
    response::{Response, IntoResponse},
};
use flux_limiter::{FluxLimiter, FluxLimiterError, SystemClock};
use std::sync::Arc;

type SharedLimiter = Arc<FluxLimiter<String, SystemClock>>;

async fn rate_limit_middleware<B>(
    State(limiter): State<SharedLimiter>,
    request: Request<B>,
    next: Next<B>,
) -> Result<Response, (StatusCode, HeaderMap, &'static str)> {
    // Extract client ID (e.g., from IP or auth header)
    let client_ip = extract_client_ip(&request);

    match limiter.check_request(client_ip.clone()) {
        Ok(decision) if decision.allowed => {
            // Add rate limit headers
            let mut response = next.run(request).await;
            if let Some(remaining) = decision.remaining_capacity {
                response.headers_mut().insert(
                    "X-RateLimit-Remaining",
                    remaining.to_string().parse().unwrap(),
                );
            }
            Ok(response)
        }
        Ok(decision) => {
            // Rate limited
            let mut headers = HeaderMap::new();
            if let Some(retry_after) = decision.retry_after_seconds {
                headers.insert(
                    "Retry-After",
                    (retry_after.ceil() as u64).to_string().parse().unwrap(),
                );
            }
            headers.insert("X-RateLimit-Remaining", "0".parse().unwrap());

            Err((StatusCode::TOO_MANY_REQUESTS, headers, "Rate limited"))
        }
        Err(FluxLimiterError::ClockError(_)) => {
            // Fail-open policy
            eprintln!("Rate limiter clock error - allowing request");
            Ok(next.run(request).await)
        }
        Err(e) => {
            eprintln!("Rate limiter error: {}", e);
            Err((StatusCode::INTERNAL_SERVER_ERROR, HeaderMap::new(), "Internal error"))
        }
    }
}

fn extract_client_ip<B>(request: &Request<B>) -> String {
    request
        .headers()
        .get("X-Forwarded-For")
        .and_then(|h| h.to_str().ok())
        .and_then(|s| s.split(',').next())
        .unwrap_or("unknown")
        .to_string()
}
}

Complete Axum Example

use axum::{
    routing::get,
    Router,
    middleware,
};
use flux_limiter::{FluxLimiter, FluxLimiterConfig, SystemClock};
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // Create rate limiter
    let config = FluxLimiterConfig::new(10.0, 5.0);
    let limiter = Arc::new(FluxLimiter::with_config(config, SystemClock).unwrap());

    // Build router with middleware
    let app = Router::new()
        .route("/api/data", get(handler))
        .layer(middleware::from_fn_with_state(
            limiter.clone(),
            rate_limit_middleware,
        ))
        .with_state(limiter);

    // Run server
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000")
        .await
        .unwrap();

    axum::serve(listener, app).await.unwrap();
}

async fn handler() -> &'static str {
    "Hello, world!"
}

Actix-Web Integration

Basic Middleware

#![allow(unused)]
fn main() {
use actix_web::{
    dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform},
    Error, HttpResponse, http::header,
};
use flux_limiter::{FluxLimiter, FluxLimiterError, SystemClock};
use std::sync::Arc;
use std::future::Future;
use std::pin::Pin;

pub struct RateLimitMiddleware {
    limiter: Arc<FluxLimiter<String, SystemClock>>,
}

impl RateLimitMiddleware {
    pub fn new(limiter: Arc<FluxLimiter<String, SystemClock>>) -> Self {
        Self { limiter }
    }
}

impl<S, B> Transform<S, ServiceRequest> for RateLimitMiddleware
where
    S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
    S::Future: 'static,
    B: 'static,
{
    type Response = ServiceResponse<B>;
    type Error = Error;
    type Transform = RateLimitMiddlewareService<S>;
    type InitError = ();
    type Future = Pin<Box<dyn Future<Output = Result<Self::Transform, Self::InitError>>>>;

    fn new_transform(&self, service: S) -> Self::Future {
        let limiter = self.limiter.clone();
        Box::pin(async move {
            Ok(RateLimitMiddlewareService { service, limiter })
        })
    }
}

pub struct RateLimitMiddlewareService<S> {
    service: S,
    limiter: Arc<FluxLimiter<String, SystemClock>>,
}

impl<S, B> Service<ServiceRequest> for RateLimitMiddlewareService<S>
where
    S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
    S::Future: 'static,
    B: 'static,
{
    type Response = ServiceResponse<B>;
    type Error = Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;

    // Delegate readiness checks to the wrapped service
    forward_ready!(service);

    fn call(&self, req: ServiceRequest) -> Self::Future {
        let client_ip = req
            .connection_info()
            .realip_remote_addr()
            .unwrap_or("unknown")
            .to_string();

        let decision = self.limiter.check_request(client_ip);

        match decision {
            Ok(decision) if decision.allowed => {
                let fut = self.service.call(req);
                Box::pin(async move {
                    let mut res = fut.await?;
                    if let Some(remaining) = decision.remaining_capacity {
                        res.headers_mut().insert(
                            header::HeaderName::from_static("x-ratelimit-remaining"),
                            header::HeaderValue::from_str(&remaining.to_string()).unwrap(),
                        );
                    }
                    Ok(res)
                })
            }
            Ok(decision) => {
                Box::pin(async move {
                    let mut response = HttpResponse::TooManyRequests();
                    if let Some(retry_after) = decision.retry_after_seconds {
                        response.insert_header((
                            "Retry-After",
                            (retry_after.ceil() as u64).to_string(),
                        ));
                    }
                    Ok(req.into_response(response.finish()))
                })
            }
            Err(FluxLimiterError::ClockError(_)) => {
                // Fail-open policy
                eprintln!("Clock error - allowing request");
                let fut = self.service.call(req);
                Box::pin(async move { fut.await })
            }
            Err(e) => {
                eprintln!("Rate limiter error: {}", e);
                Box::pin(async move {
                    Ok(req.into_response(HttpResponse::InternalServerError().finish()))
                })
            }
        }
    }
}
}

Rocket Integration

Request Guard

#![allow(unused)]
fn main() {
use rocket::{
    request::{FromRequest, Outcome, Request},
    http::Status,
    State,
};
use flux_limiter::{FluxLimiter, FluxLimiterDecision, SystemClock};
use std::sync::Arc;

pub struct RateLimited {
    pub decision: FluxLimiterDecision,
}

#[rocket::async_trait]
impl<'r> FromRequest<'r> for RateLimited {
    type Error = ();

    async fn from_request(request: &'r Request<'_>) -> Outcome<Self, Self::Error> {
        // Fetch the managed rate limiter; fail the request if it isn't registered
        let limiter = match request
            .guard::<&State<Arc<FluxLimiter<String, SystemClock>>>>()
            .await
        {
            Outcome::Success(limiter) => limiter,
            _ => return Outcome::Error((Status::InternalServerError, ())),
        };

        let client_ip = request
            .client_ip()
            .map(|ip| ip.to_string())
            .unwrap_or_else(|| "unknown".to_string());

        match limiter.check_request(client_ip) {
            Ok(decision) if decision.allowed => {
                Outcome::Success(RateLimited { decision })
            }
            Ok(_) => Outcome::Error((Status::TooManyRequests, ())),
            Err(_) => {
                // Fail-open
                Outcome::Success(RateLimited {
                    decision: FluxLimiterDecision {
                        allowed: true,
                        retry_after_seconds: None,
                        remaining_capacity: None,
                        reset_time_nanos: 0,
                    },
                })
            }
        }
    }
}

// Usage in route
#[rocket::get("/api/data")]
fn data(_rate_limited: RateLimited) -> &'static str {
    "Hello, world!"
}
}

Standard Rate Limit Headers

HTTP Response Headers

Implement standard rate limiting headers:

#![allow(unused)]
fn main() {
use axum::http::HeaderMap;

fn add_rate_limit_headers(
    headers: &mut HeaderMap,
    decision: &FluxLimiterDecision,
    limit: f64,
) {
    // X-RateLimit-Limit: Maximum requests allowed
    headers.insert(
        "X-RateLimit-Limit",
        limit.to_string().parse().unwrap(),
    );

    // X-RateLimit-Remaining: Remaining requests in window
    if let Some(remaining) = decision.remaining_capacity {
        headers.insert(
            "X-RateLimit-Remaining",
            remaining.to_string().parse().unwrap(),
        );
    }

    // X-RateLimit-Reset: When the limit resets (Unix timestamp)
    let reset_seconds = decision.reset_time_nanos / 1_000_000_000;
    headers.insert(
        "X-RateLimit-Reset",
        reset_seconds.to_string().parse().unwrap(),
    );

    // Retry-After: When to retry (for 429 responses)
    if !decision.allowed {
        if let Some(retry_after) = decision.retry_after_seconds {
            headers.insert(
                "Retry-After",
                (retry_after.ceil() as u64).to_string().parse().unwrap(),
            );
        }
    }
}
}

Multi-Tier Rate Limiting

Rate limit based on user tier:

#![allow(unused)]
fn main() {
use axum::{
    extract::{State, Extension},
    http::{Request, StatusCode},
    middleware::Next,
    response::{IntoResponse, Response},
};
use std::collections::HashMap;
use std::sync::Arc;

struct TieredLimiters {
    limiters: HashMap<String, Arc<FluxLimiter<String, SystemClock>>>,
}

impl TieredLimiters {
    fn new() -> Result<Self, FluxLimiterError> {
        let mut limiters = HashMap::new();

        limiters.insert(
            "free".to_string(),
            Arc::new(FluxLimiter::with_config(
                FluxLimiterConfig::new(10.0, 5.0),
                SystemClock
            )?),
        );

        limiters.insert(
            "paid".to_string(),
            Arc::new(FluxLimiter::with_config(
                FluxLimiterConfig::new(100.0, 50.0),
                SystemClock
            )?),
        );

        Ok(Self { limiters })
    }

    fn get_limiter(&self, tier: &str) -> &Arc<FluxLimiter<String, SystemClock>> {
        self.limiters.get(tier).unwrap_or_else(|| &self.limiters["free"])
    }
}

async fn tiered_rate_limit<B>(
    State(limiters): State<Arc<TieredLimiters>>,
    Extension(user_tier): Extension<String>,
    request: Request<B>,
    next: Next<B>,
) -> Response {
    let client_ip = extract_client_ip(&request);
    let limiter = limiters.get_limiter(&user_tier);

    match limiter.check_request(client_ip) {
        Ok(decision) if decision.allowed => next.run(request).await,
        Ok(_) => {
            (StatusCode::TOO_MANY_REQUESTS, "Rate limited").into_response()
        }
        Err(_) => next.run(request).await, // Fail-open
    }
}
}

Background Cleanup Task

Automatically clean up stale clients:

use tokio::time::{interval, Duration};
use std::sync::Arc;

async fn cleanup_task(limiter: Arc<FluxLimiter<String, SystemClock>>) {
    let mut cleanup_interval = interval(Duration::from_secs(3600)); // 1 hour

    loop {
        cleanup_interval.tick().await;

        let threshold = 24 * 60 * 60 * 1_000_000_000u64; // 24 hours

        match limiter.cleanup_stale_clients(threshold) {
            Ok(count) => {
                println!("Rate limiter: cleaned up {} stale clients", count);
            }
            Err(e) => {
                eprintln!("Rate limiter cleanup failed: {}", e);
            }
        }
    }
}

// Start cleanup task when initializing the server
#[tokio::main]
async fn main() {
    let config = FluxLimiterConfig::new(100.0, 50.0);
    let limiter = Arc::new(FluxLimiter::with_config(config, SystemClock).unwrap());

    // Start cleanup in background
    tokio::spawn(cleanup_task(limiter.clone()));

    // Build and run your web server...
}

Production Considerations

Best practices for deploying Flux Limiter in production environments.

Configuration

Choose Appropriate Limits

Consider your service’s capacity and SLAs:

#![allow(unused)]
fn main() {
// Calculate based on backend capacity
let backend_capacity = 10_000.0; // 10k requests/second
let safety_margin = 0.8; // 80% of capacity
let clients_count = 100.0; // Expected concurrent clients

let rate_per_client = (backend_capacity * safety_margin) / clients_count;
let config = FluxLimiterConfig::new(rate_per_client, rate_per_client * 0.5);
}

Environment-Based Configuration

#![allow(unused)]
fn main() {
use std::env;

fn create_rate_limiter() -> Result<FluxLimiter<String, SystemClock>, FluxLimiterError> {
    let rate = env::var("RATE_LIMIT_RATE")
        .unwrap_or_else(|_| "100.0".to_string())
        .parse::<f64>()
        .expect("Invalid RATE_LIMIT_RATE");

    let burst = env::var("RATE_LIMIT_BURST")
        .unwrap_or_else(|_| "50.0".to_string())
        .parse::<f64>()
        .expect("Invalid RATE_LIMIT_BURST");

    let config = FluxLimiterConfig::new(rate, burst);
    FluxLimiter::with_config(config, SystemClock)
}
}

Monitoring and Observability

Metrics Collection

Track rate limiting decisions for monitoring:

#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicU64, Ordering};

struct RateLimiterMetrics {
    total_requests: AtomicU64,
    allowed_requests: AtomicU64,
    denied_requests: AtomicU64,
    clock_errors: AtomicU64,
}

impl RateLimiterMetrics {
    fn new() -> Self {
        Self {
            total_requests: AtomicU64::new(0),
            allowed_requests: AtomicU64::new(0),
            denied_requests: AtomicU64::new(0),
            clock_errors: AtomicU64::new(0),
        }
    }

    fn record_decision(&self, result: &Result<FluxLimiterDecision, FluxLimiterError>) {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        match result {
            Ok(decision) if decision.allowed => {
                self.allowed_requests.fetch_add(1, Ordering::Relaxed);
            }
            Ok(_) => {
                self.denied_requests.fetch_add(1, Ordering::Relaxed);
            }
            Err(FluxLimiterError::ClockError(_)) => {
                self.clock_errors.fetch_add(1, Ordering::Relaxed);
            }
            Err(_) => {}
        }
    }

    fn get_stats(&self) -> (u64, u64, u64, u64) {
        (
            self.total_requests.load(Ordering::Relaxed),
            self.allowed_requests.load(Ordering::Relaxed),
            self.denied_requests.load(Ordering::Relaxed),
            self.clock_errors.load(Ordering::Relaxed),
        )
    }
}
}

Prometheus Integration

#![allow(unused)]
fn main() {
use prometheus::{IntCounter, Registry};
use std::sync::Arc;

struct PrometheusRateLimiter {
    limiter: Arc<FluxLimiter<String, SystemClock>>,
    requests_total: IntCounter,
    requests_allowed: IntCounter,
    requests_denied: IntCounter,
    clock_errors: IntCounter,
}

impl PrometheusRateLimiter {
    fn new(
        limiter: FluxLimiter<String, SystemClock>,
        registry: &Registry,
    ) -> Result<Self, Box<dyn std::error::Error>> {
        let requests_total = IntCounter::new(
            "rate_limiter_requests_total",
            "Total number of rate limit checks"
        )?;
        registry.register(Box::new(requests_total.clone()))?;

        let requests_allowed = IntCounter::new(
            "rate_limiter_requests_allowed",
            "Number of allowed requests"
        )?;
        registry.register(Box::new(requests_allowed.clone()))?;

        let requests_denied = IntCounter::new(
            "rate_limiter_requests_denied",
            "Number of denied requests"
        )?;
        registry.register(Box::new(requests_denied.clone()))?;

        let clock_errors = IntCounter::new(
            "rate_limiter_clock_errors",
            "Number of clock errors"
        )?;
        registry.register(Box::new(clock_errors.clone()))?;

        Ok(Self {
            limiter: Arc::new(limiter),
            requests_total,
            requests_allowed,
            requests_denied,
            clock_errors,
        })
    }

    fn check_request(&self, client_id: String) -> Result<FluxLimiterDecision, FluxLimiterError> {
        self.requests_total.inc();

        let result = self.limiter.check_request(client_id);

        match &result {
            Ok(decision) if decision.allowed => self.requests_allowed.inc(),
            Ok(_) => self.requests_denied.inc(),
            Err(FluxLimiterError::ClockError(_)) => self.clock_errors.inc(),
            _ => {}
        }

        result
    }
}
}

Logging

Implement structured logging:

#![allow(unused)]
fn main() {
use tracing::{info, warn, error};

fn check_request_with_logging(
    limiter: &FluxLimiter<String, SystemClock>,
    client_id: String,
) -> bool {
    match limiter.check_request(client_id.clone()) {
        Ok(decision) if decision.allowed => {
            info!(
                client_id = %client_id,
                remaining = ?decision.remaining_capacity,
                "Request allowed"
            );
            true
        }
        Ok(decision) => {
            warn!(
                client_id = %client_id,
                retry_after = ?decision.retry_after_seconds,
                "Request rate limited"
            );
            false
        }
        Err(FluxLimiterError::ClockError(e)) => {
            error!(
                client_id = %client_id,
                error = ?e,
                "Clock error in rate limiter - failing open"
            );
            true // Fail-open
        }
        Err(e) => {
            error!(
                client_id = %client_id,
                error = ?e,
                "Unexpected rate limiter error"
            );
            false
        }
    }
}
}

Memory Management

Automatic Cleanup

Implement periodic cleanup to prevent unbounded memory growth:

#![allow(unused)]
fn main() {
use flux_limiter::{FluxLimiter, FluxLimiterConfig, FluxLimiterDecision, FluxLimiterError, SystemClock};
use tokio::time::{interval, Duration};
use std::sync::Arc;
use tracing::{error, info};

struct ManagedRateLimiter {
    limiter: Arc<FluxLimiter<String, SystemClock>>,
}

impl ManagedRateLimiter {
    fn new(config: FluxLimiterConfig) -> Result<Self, FluxLimiterError> {
        let limiter = Arc::new(FluxLimiter::with_config(config, SystemClock)?);

        // Start background cleanup task
        let limiter_clone = Arc::clone(&limiter);
        tokio::spawn(async move {
            Self::cleanup_loop(limiter_clone).await;
        });

        Ok(Self { limiter })
    }

    async fn cleanup_loop(limiter: Arc<FluxLimiter<String, SystemClock>>) {
        let mut interval = interval(Duration::from_secs(3600)); // 1 hour

        loop {
            interval.tick().await;

            let threshold = 24 * 60 * 60 * 1_000_000_000u64; // 24 hours

            match limiter.cleanup_stale_clients(threshold) {
                Ok(count) => {
                    info!("Cleaned up {} stale rate limit entries", count);
                }
                Err(e) => {
                    error!("Rate limiter cleanup failed: {}", e);
                }
            }
        }
    }

    fn check_request(&self, client_id: String) -> Result<FluxLimiterDecision, FluxLimiterError> {
        self.limiter.check_request(client_id)
    }
}
}

Memory Monitoring

Monitor memory usage and alert on growth:

#![allow(unused)]
fn main() {
use tracing::warn;

fn estimate_memory_usage(limiter: &FluxLimiter<String, SystemClock>) -> usize {
    // Estimate: ~48 bytes per client (DashMap overhead + String + u64).
    // The limiter does not expose a client count, so substitute a figure
    // appropriate for your load.
    let estimated_clients = 1_000_000; // Adjust based on your load
    estimated_clients * 48
}

fn check_memory_threshold(limiter: &FluxLimiter<String, SystemClock>) {
    let estimated_usage = estimate_memory_usage(limiter);
    let threshold = 100 * 1024 * 1024; // 100 MB

    if estimated_usage > threshold {
        warn!(
            "Rate limiter memory usage estimated at {} MB",
            estimated_usage / (1024 * 1024)
        );
    }
}
}

Error Handling Strategy

Circuit Breaker for Clock Errors

Temporarily disable rate limiting after persistent failures:

#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
use std::time::SystemTime;
use tracing::{error, info};

struct CircuitBreakerRateLimiter {
    limiter: FluxLimiter<String, SystemClock>,
    consecutive_failures: AtomicU64,
    circuit_open: AtomicBool,
    circuit_open_time: AtomicU64,
    failure_threshold: u64,
    reset_timeout_secs: u64,
}

impl CircuitBreakerRateLimiter {
    fn new(
        config: FluxLimiterConfig,
        failure_threshold: u64,
        reset_timeout_secs: u64,
    ) -> Result<Self, FluxLimiterError> {
        Ok(Self {
            limiter: FluxLimiter::with_config(config, SystemClock)?,
            consecutive_failures: AtomicU64::new(0),
            circuit_open: AtomicBool::new(false),
            circuit_open_time: AtomicU64::new(0),
            failure_threshold,
            reset_timeout_secs,
        })
    }

    fn check_request(&self, client_id: String) -> bool {
        // Check if circuit should be reset
        if self.circuit_open.load(Ordering::Relaxed) {
            let open_time = self.circuit_open_time.load(Ordering::Relaxed);
            let now = SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            if now - open_time > self.reset_timeout_secs {
                info!("Attempting to close circuit breaker");
                self.circuit_open.store(false, Ordering::Relaxed);
                self.consecutive_failures.store(0, Ordering::Relaxed);
            } else {
                // Circuit still open - bypass rate limiting
                return true;
            }
        }

        match self.limiter.check_request(client_id) {
            Ok(decision) => {
                // Reset failure counter on success
                self.consecutive_failures.store(0, Ordering::Relaxed);
                decision.allowed
            }
            Err(FluxLimiterError::ClockError(_)) => {
                let failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;

                if failures >= self.failure_threshold {
                    error!(
                        "Opening circuit breaker after {} consecutive clock failures",
                        failures
                    );
                    self.circuit_open.store(true, Ordering::Relaxed);
                    let now = SystemTime::now()
                        .duration_since(SystemTime::UNIX_EPOCH)
                        .unwrap()
                        .as_secs();
                    self.circuit_open_time.store(now, Ordering::Relaxed);
                }

                true // Fail-open
            }
            Err(e) => {
                error!("Unexpected rate limiter error: {}", e);
                false
            }
        }
    }
}
}

Load Testing

Test your rate limiter configuration under load:

#![allow(unused)]
fn main() {
#[cfg(test)]
mod load_tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;
    use std::time::Instant;

    #[test]
    fn test_concurrent_load() {
        let config = FluxLimiterConfig::new(1000.0, 500.0);
        let limiter = Arc::new(FluxLimiter::with_config(config, SystemClock).unwrap());

        let start = Instant::now();
        let threads: Vec<_> = (0..10)
            .map(|i| {
                let limiter = Arc::clone(&limiter);
                thread::spawn(move || {
                    let mut allowed = 0;
                    let mut denied = 0;

                    for j in 0..10000 {
                        let client_id = format!("client_{}_{}", i, j % 100);
                        match limiter.check_request(client_id) {
                            Ok(decision) if decision.allowed => allowed += 1,
                            Ok(_) => denied += 1,
                            Err(_) => {}
                        }
                    }

                    (allowed, denied)
                })
            })
            .collect();

        let mut total_allowed = 0;
        let mut total_denied = 0;

        for handle in threads {
            let (allowed, denied) = handle.join().unwrap();
            total_allowed += allowed;
            total_denied += denied;
        }

        let elapsed = start.elapsed();
        let total_requests = total_allowed + total_denied;

        println!("Processed {} requests in {:?}", total_requests, elapsed);
        println!("Throughput: {:.2} req/s", total_requests as f64 / elapsed.as_secs_f64());
        println!("Allowed: {}, Denied: {}", total_allowed, total_denied);
    }
}
}

Deployment Checklist

  • Configure appropriate rate and burst limits
  • Implement error handling policy (fail-open/fail-closed)
  • Set up monitoring and alerting
  • Enable structured logging
  • Configure automatic cleanup
  • Load test under expected traffic
  • Document rate limit headers in API docs
  • Set up memory monitoring
  • Test clock error handling
  • Implement circuit breaker if needed

Next Steps

Architecture Overview

This document provides a comprehensive overview of the Flux Limiter’s architecture, design decisions, and implementation details.

Design Philosophy

Flux Limiter is built on several key architectural principles:

  1. Lock-Free Concurrency: Uses atomic operations and lock-free data structures
  2. Zero-Allocation Hot Path: Minimizes memory allocation in rate limiting decisions
  3. Clock Abstraction: Enables testing and handles time-related failures
  4. Type Safety: Leverages Rust’s type system for correctness guarantees
  5. Graceful Degradation: Continues operation despite partial failures

Core Architecture

┌─────────────────────────────────────────────────────────────┐
│                    Flux Limiter                             │
├─────────────────────────────────────────────────────────────┤
│  Client API                                                 │
│  ├─ check_request(client_id) -> Result<Decision, Error>     │
│  ├─ cleanup_stale_clients(threshold) -> Result<usize, Error>│
│  └─ rate(), burst() -> f64                                  │
├─────────────────────────────────────────────────────────────┤
│  Core Components                                            │
│  ├─ FluxLimiter<T, C>     │ Main rate limiter struct        │
│  ├─ FluxLimiterConfig     │ Configuration management        │
│  ├─ FluxLimiterDecision   │ Rich decision metadata          │
│  └─ FluxLimiterError      │ Comprehensive error handling    │
├─────────────────────────────────────────────────────────────┤
│  Algorithm Layer                                            │
│  ├─ GCRA Implementation   │ Generic Cell Rate Algorithm     │
│  ├─ Nanosecond Precision  │ u64 nanosecond calculations     │
│  └─ TAT Tracking          │ Theoretical Arrival Time        │
├─────────────────────────────────────────────────────────────┤
│  Storage Layer                                              │
│  ├─ DashMap<T, u64>       │ Lock-free concurrent hash map   │
│  ├─ Atomic Operations     │ Thread-safe state updates       │
│  └─ Memory Management     │ Automatic cleanup mechanisms    │
├─────────────────────────────────────────────────────────────┤
│  Time Abstraction                                           │
│  ├─ Clock Trait           │ Pluggable time source           │
│  ├─ SystemClock           │ Production time implementation  │
│  └─ TestClock             │ Deterministic test time         │
└─────────────────────────────────────────────────────────────┘

Data Flow

Request → check_request(client_id)
    ↓
Clock::now() → Current Time (nanoseconds)
    ↓
DashMap::get(client_id) → Previous TAT
    ↓
GCRA Calculation
    ↓
Decision: Allow/Deny + Metadata
    ↓
DashMap::insert(client_id, new_TAT)
    ↓
Return FluxLimiterDecision

Key Design Principles

Performance First

  • O(1) operations: Constant time complexity for rate limit checks
  • Lock-free concurrency: No global locks or contention
  • Nanosecond precision: Integer arithmetic avoids floating-point overhead
  • Minimal allocations: Hot path reuses existing memory

Correctness Guaranteed

  • Mathematical precision: Exact GCRA implementation
  • Type safety: Rust’s type system prevents common errors
  • Comprehensive testing: >95% code coverage with deterministic tests
  • No silent failures: All errors are explicitly handled

Observability Built-in

  • Rich metadata: Every decision includes detailed information
  • Error transparency: Clear error types and recovery paths
  • Memory tracking: Built-in cleanup and monitoring capabilities
  • Production-ready: Designed for monitoring and debugging

Flexible and Extensible

  • Generic client IDs: Works with any hashable type
  • Pluggable time sources: Clock abstraction for testing
  • Composable: Multiple limiters for complex scenarios
  • Future-proof: Architecture supports future enhancements

System Requirements

Dependencies

  • DashMap: Lock-free concurrent hash map
  • Rust Standard Library: Minimal external dependencies
  • No async runtime: Works with any async runtime (Tokio, async-std, etc.)

Performance Characteristics

  • Memory: O(number of active clients)
  • Time complexity: O(1) for check_request()
  • Concurrency: Lock-free reads and writes
  • Precision: Nanosecond timing accuracy
  • Throughput: Millions of operations per second

Scalability

Clients     Memory Usage    Latency
1K          ~48KB          < 1μs
100K        ~4.8MB         < 1μs
1M          ~48MB          < 1μs
10M         ~480MB         < 1μs

Architecture Layers

1. API Layer

Public interface for rate limiting operations:

  • check_request(): Primary rate limiting decision
  • cleanup_stale_clients(): Memory management
  • rate(), burst(): Configuration inspection
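
A short tour of this surface, following the conventions from the quick start (note that the cleanup threshold is expressed in nanoseconds):

#![allow(unused)]
fn main() {
let config = FluxLimiterConfig::new(100.0, 50.0);
let limiter = FluxLimiter::with_config(config, SystemClock).unwrap();

let decision = limiter.check_request("client_1".to_string()).unwrap();
println!("allowed = {}", decision.allowed);
println!("rate = {}, burst = {}", limiter.rate(), limiter.burst());

// Drop entries idle for more than an hour (threshold in nanoseconds)
let removed = limiter.cleanup_stale_clients(3_600 * 1_000_000_000).unwrap();
println!("removed {} stale entries", removed);
}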

2. Algorithm Layer

GCRA implementation with nanosecond precision:

  • Theoretical Arrival Time (TAT) calculation
  • Conformance checking
  • Metadata computation

3. Storage Layer

Thread-safe state management:

  • DashMap for concurrent access
  • Atomic operations for consistency
  • Efficient memory layout

4. Time Abstraction Layer

Clock trait for time source flexibility:

  • SystemClock for production
  • TestClock for deterministic testing
  • Custom clocks for special scenarios
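
As a sketch of a custom source, assuming the crate's Clock trait and ClockError are in scope, a monotonic clock built on std::time::Instant (immune to wall-clock adjustments) might look like:

#![allow(unused)]
fn main() {
use std::time::Instant;

struct MonotonicClock {
    start: Instant, // process-local anchor point
}

impl Clock for MonotonicClock {
    fn now(&self) -> Result<u64, ClockError> {
        // Instant is monotonic, so this never goes backward and never fails
        Ok(self.start.elapsed().as_nanos() as u64)
    }
}
}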

Next Steps

GCRA Algorithm

Deep dive into the Generic Cell Rate Algorithm implementation in Flux Limiter.

Algorithm Choice

Generic Cell Rate Algorithm (GCRA) was chosen over Token Bucket for several reasons:

  1. Mathematical Precision: Avoids floating-point precision issues
  2. Stateless Calculation: No background token refill processes
  3. Efficient State: One timestamp per client vs. token count + last refill
  4. Deterministic: Exact timing calculations with integer arithmetic

GCRA vs Token Bucket

Token Bucket

  • Maintains a count of available tokens
  • Tokens refill at a constant rate
  • Requests consume tokens
  • Requires background refill process or calculation on each check

GCRA

  • Maintains Theoretical Arrival Time (TAT)
  • Checks if current time conforms to rate
  • Updates TAT for next request
  • No background processes needed

Equivalence: GCRA and Token Bucket produce mathematically equivalent results, but GCRA is more efficient to implement.

Core Concepts

Theoretical Arrival Time (TAT)

The TAT represents the theoretical time when the next request should arrive to maintain the configured rate.

Initial state:    TAT = 0
After request 1:  TAT = current_time + rate_interval
After request 2:  TAT = max(current_time, previous_TAT) + rate_interval

Rate Interval (T)

Time between consecutive requests at the configured rate:

T = 1 / rate_per_second
T_nanos = 1_000_000_000 / rate_per_second

Examples:

  • rate = 10.0 → T = 0.1s = 100_000_000 nanos
  • rate = 100.0 → T = 0.01s = 10_000_000 nanos
  • rate = 1000.0 → T = 0.001s = 1_000_000 nanos

Tolerance (τ)

Maximum allowed deviation from the rate, determined by burst capacity:

τ = burst_capacity * rate_interval
τ_nanos = burst_capacity * (1_000_000_000 / rate_per_second)

Examples with rate = 10.0:

  • burst = 0.0 → τ = 0 nanos (no burst)
  • burst = 5.0 → τ = 500_000_000 nanos (0.5 seconds)
  • burst = 10.0 → τ = 1_000_000_000 nanos (1 second)
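
Both conversions follow directly from the formulas above; a minimal sketch (the function names are illustrative, not part of the crate):

#![allow(unused)]
fn main() {
fn rate_interval_nanos(rate_per_second: f64) -> u64 {
    (1_000_000_000.0 / rate_per_second) as u64
}

fn tolerance_nanos(burst_capacity: f64, rate_nanos: u64) -> u64 {
    (burst_capacity * rate_nanos as f64) as u64
}

let t = rate_interval_nanos(10.0); // 100_000_000 nanos
let tau = tolerance_nanos(5.0, t); // 500_000_000 nanos
assert_eq!((t, tau), (100_000_000, 500_000_000));
}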

Algorithm Implementation

Conformance Check

#![allow(unused)]
fn main() {
let current_time_nanos = clock.now()?;
let previous_tat_nanos = client_state
    .get(&client_id)
    .map(|entry| *entry.value())
    .unwrap_or(current_time_nanos);

// Check if request conforms (is within tolerance)
let is_conforming = current_time_nanos >=
    previous_tat_nanos.saturating_sub(tolerance_nanos);
}

Conforming Request: current_time >= TAT - τ

This allows requests to arrive up to τ nanoseconds early (burst capacity).

TAT Update (Allowed Request)

#![allow(unused)]
fn main() {
if is_conforming {
    // Allow request and update TAT
    let new_tat_nanos = current_time_nanos
        .max(previous_tat_nanos) + rate_nanos;

    client_state.insert(client_id, new_tat_nanos);

    // Return allowed decision
}
}

TAT Update Rule: TAT' = max(current_time, previous_TAT) + T

This ensures TAT advances by the rate interval while preventing it from going backward.

Retry After Calculation (Denied Request)

#![allow(unused)]
fn main() {
if !is_conforming {
    // Deny request
    let retry_after_nanos = previous_tat_nanos
        .saturating_sub(tolerance_nanos)
        .saturating_sub(current_time_nanos);

    let retry_after_seconds = retry_after_nanos as f64 / 1_000_000_000.0;

    // Return denied decision with retry_after
}
}

Retry After: Time until the request would conform to the rate.

Mathematical Foundation

Basic Equations

  1. Rate Interval: T = 1 / r where r is rate per second
  2. Tolerance: τ = b * T where b is burst capacity
  3. TAT Update: TAT' = max(t, TAT) + T where t is current time
  4. Conformance: Request allowed if t >= TAT - τ

Burst Capacity

The burst capacity determines how many requests can be made immediately:

Total immediate capacity ≈ 1 + burst_capacity

Proof:

  • First request at t=0: TAT becomes T
  • The k-th extra request at t=0 conforms while kT - τ ≤ 0, i.e. while k ≤ τ/T
  • With full tolerance available: τ/T = burst_capacity extra requests
  • Plus the first request: 1 + burst_capacity total

Burst Recovery

After using burst capacity, it recovers at the configured rate:

Recovery rate = rate_per_second
Time to full recovery = burst_capacity / rate_per_second

Example with rate=10.0, burst=5.0:

  • Can burst 6 requests immediately
  • Recovers at 10 units/second
  • Full recovery in 0.5 seconds

Precision Guarantees

Nanosecond Arithmetic

All calculations use u64 nanoseconds:

#![allow(unused)]
fn main() {
// No floating-point drift
let rate_nanos: u64 = (1_000_000_000.0 / rate_per_second) as u64;
let tolerance_nanos: u64 = (burst_capacity * rate_nanos as f64) as u64;
}

Benefits:

  • Exact integer arithmetic
  • No accumulating rounding errors
  • Supports rates up to 1 billion requests/second
  • Nanosecond precision for timing

Overflow Protection

Uses saturating arithmetic to prevent overflow:

#![allow(unused)]
fn main() {
let is_conforming = current_time_nanos >=
    previous_tat_nanos.saturating_sub(tolerance_nanos);
}

Saturating operations:

  • saturating_sub: Returns 0 if underflow would occur
  • saturating_add: Returns u64::MAX if overflow would occur
  • Prevents undefined behavior and panics
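
A quick demonstration of the clamping behavior (plain std, nothing limiter-specific):

#![allow(unused)]
fn main() {
let a: u64 = 5;
assert_eq!(a.saturating_sub(10), 0);              // would underflow; clamps to 0
assert_eq!(u64::MAX.saturating_add(1), u64::MAX); // would overflow; clamps to MAX
}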

Example Scenarios

Scenario 1: Sustained Rate

Configuration: rate=10.0, burst=0.0

t=0.0s:   Request → Allowed (TAT=0.1s)
t=0.1s:   Request → Allowed (TAT=0.2s)
t=0.2s:   Request → Allowed (TAT=0.3s)
t=0.25s:  Request → Denied (retry after 0.05s)
t=0.3s:   Request → Allowed (TAT=0.4s)
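
This trace can be reproduced deterministically with the TestClock described in the testing chapter (a sketch, assuming TestClock implements Clone as shown there):

#![allow(unused)]
fn main() {
let clock = TestClock::new(0.0);
let limiter = FluxLimiter::with_config(
    FluxLimiterConfig::new(10.0, 0.0), // rate=10.0, burst=0.0
    clock.clone(),
).unwrap();

assert!(limiter.check_request("client").unwrap().allowed);  // t=0.0s
clock.advance(0.1);
assert!(limiter.check_request("client").unwrap().allowed);  // t=0.1s
clock.advance(0.1);
assert!(limiter.check_request("client").unwrap().allowed);  // t=0.2s
clock.advance(0.05);
assert!(!limiter.check_request("client").unwrap().allowed); // t=0.25s: too early
clock.advance(0.05);
assert!(limiter.check_request("client").unwrap().allowed);  // t=0.3s
}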

Scenario 2: Burst Capacity

Configuration: rate=10.0, burst=5.0

t=0.0s:   Request → Allowed (TAT=0.1s)
t=0.0s:   Request → Allowed (TAT=0.2s)  [burst]
t=0.0s:   Request → Allowed (TAT=0.3s)  [burst]
t=0.0s:   Request → Allowed (TAT=0.4s)  [burst]
t=0.0s:   Request → Allowed (TAT=0.5s)  [burst]
t=0.0s:   Request → Allowed (TAT=0.6s)  [burst]
t=0.0s:   Request → Denied (retry after 0.1s)
t=0.1s:   Request → Allowed (TAT=0.7s)

Scenario 3: Recovery After Idle

Configuration: rate=10.0, burst=5.0

t=0.0s:   6 requests → All allowed, TAT=0.6s
t=1.0s:   Request → Allowed (TAT=1.1s)  [burst recovered]
t=1.0s:   6 requests → All allowed, TAT=1.6s

Metadata Calculation

Remaining Capacity

#![allow(unused)]
fn main() {
// `consumed` (not shown here) is the burst capacity already used,
// derived from how far the stored TAT sits ahead of the current time.
let elapsed = current_time_nanos.saturating_sub(previous_tat_nanos);
let recovered = (elapsed as f64 / rate_nanos as f64).min(burst_capacity);
let remaining_capacity = Some(burst_capacity - consumed + recovered);
}

Tracks how much burst capacity is currently available.

Reset Time

#![allow(unused)]
fn main() {
let reset_time_nanos = new_tat_nanos + tolerance_nanos;
}

When the rate limit window will fully reset (all burst capacity recovered).

Performance Characteristics

  • Time Complexity: O(1) - constant time for all operations
  • Space Complexity: O(1) per client - single u64 timestamp
  • Calculation Overhead: ~10-20 CPU cycles for GCRA calculation
  • Memory Access: Single DashMap lookup/insert

Next Steps

Component Design

Detailed exploration of Flux Limiter’s core components and their design rationale.

FluxLimiter<T, C>

The main rate limiter struct uses generics for flexibility.

Structure Definition

#![allow(unused)]
fn main() {
pub struct FluxLimiter<T, C = SystemClock>
where
    T: Hash + Eq + Clone,  // Client identifier type
    C: Clock,              // Time source
{
    rate_nanos: u64,                    // Rate interval in nanoseconds
    tolerance_nanos: u64,               // Burst tolerance in nanoseconds
    client_state: Arc<DashMap<T, u64>>, // Client TAT storage
    clock: C,                           // Time abstraction
}
}

Design Rationale

Generic Client ID (T):

  • Supports String, IpAddr, u64, custom types
  • Constrains: Hash + Eq + Clone
  • Zero-cost abstraction - no runtime overhead

Generic Clock (C):

  • Default: SystemClock for production
  • Alternative: TestClock for testing
  • Custom: User-defined time sources
  • Constrains: Clock trait

Arc:

  • Thread-safe shared ownership
  • Lock-free concurrent access
  • Minimal contention
  • Efficient cloning

Nanosecond Storage:

  • u64 for rate and tolerance
  • Maintains precision throughout calculations
  • Avoids floating-point arithmetic in hot path
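
For example, std::net::IpAddr satisfies Hash + Eq + Clone and works directly as a client ID (a sketch using the documentation address 203.0.113.7):

#![allow(unused)]
fn main() {
use std::net::IpAddr;

let config = FluxLimiterConfig::new(100.0, 50.0);
let limiter: FluxLimiter<IpAddr, SystemClock> =
    FluxLimiter::with_config(config, SystemClock).unwrap();

let ip: IpAddr = "203.0.113.7".parse().unwrap();
let decision = limiter.check_request(ip).unwrap();
println!("allowed = {}", decision.allowed);
}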

Public API

#![allow(unused)]
fn main() {
impl<T, C> FluxLimiter<T, C>
where
    T: Hash + Eq + Clone,
    C: Clock,
{
    pub fn with_config(
        config: FluxLimiterConfig,
        clock: C,
    ) -> Result<Self, FluxLimiterError>

    pub fn check_request(
        &self,
        client_id: T,
    ) -> Result<FluxLimiterDecision, FluxLimiterError>

    pub fn cleanup_stale_clients(
        &self,
        stale_threshold_nanos: u64,
    ) -> Result<usize, FluxLimiterError>

    pub fn rate(&self) -> f64
    pub fn burst(&self) -> f64
}
}

FluxLimiterConfig

Configuration management with builder pattern support.

Structure Definition

#![allow(unused)]
fn main() {
#[derive(Debug, Clone)]
pub struct FluxLimiterConfig {
    rate_per_second: f64,   // User-friendly rate specification
    burst_capacity: f64,    // User-friendly burst specification
}
}

Design Rationale

User-Friendly Units:

  • rate_per_second: Intuitive “requests per second”
  • burst_capacity: Additional burst allowance
  • Converted to nanoseconds internally

Builder Pattern:

#![allow(unused)]
fn main() {
impl FluxLimiterConfig {
    pub fn new(rate_per_second: f64, burst_capacity: f64) -> Self
    pub fn rate(mut self, rate_per_second: f64) -> Self
    pub fn burst(mut self, burst_capacity: f64) -> Self
    pub fn validate(&self) -> Result<(), FluxLimiterError>
}
}

Validation

#![allow(unused)]
fn main() {
pub fn validate(&self) -> Result<(), FluxLimiterError> {
    if self.rate_per_second <= 0.0 {
        return Err(FluxLimiterError::InvalidRate);
    }
    if self.burst_capacity < 0.0 {
        return Err(FluxLimiterError::InvalidBurst);
    }
    Ok(())
}
}

Validation Rules:

  • Rate must be positive (> 0.0)
  • Burst must be non-negative (≥ 0.0)
  • Checked at construction time
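
A minimal check of these rules, using validate() as defined above:

#![allow(unused)]
fn main() {
let bad = FluxLimiterConfig::new(-1.0, 5.0);
assert!(bad.validate().is_err()); // rate must be positive

let ok = FluxLimiterConfig::new(10.0, 0.0);
assert!(ok.validate().is_ok()); // zero burst is allowed
}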

FluxLimiterDecision

Rich metadata returned from rate limiting decisions.

Structure Definition

#![allow(unused)]
fn main() {
#[derive(Debug, Clone, PartialEq)]
pub struct FluxLimiterDecision {
    pub allowed: bool,                    // Primary decision
    pub retry_after_seconds: Option<f64>, // When to retry (if denied)
    pub remaining_capacity: Option<f64>,  // Remaining burst capacity
    pub reset_time_nanos: u64,           // When window resets
}
}

Design Rationale

Rich Metadata Enables:

  • HTTP rate limit headers (X-RateLimit-Remaining, Retry-After)
  • Client-side backoff strategies
  • Monitoring and observability
  • Debugging and diagnostics

Field Details:

  1. allowed: bool

    • Primary decision
    • true = allow request, false = deny
  2. retry_after_seconds: Option<f64>

    • Some(seconds) when denied
    • How long to wait before retrying
    • Used for HTTP Retry-After header
  3. remaining_capacity: Option<f64>

    • Current burst capacity remaining
    • Useful for X-RateLimit-Remaining header
    • Helps clients understand their quota
  4. reset_time_nanos: u64

    • Nanosecond timestamp when limit resets
    • Convert to HTTP X-RateLimit-Reset header
    • Absolute time, not relative
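
As a sketch of how these fields map onto HTTP headers (the helper below is illustrative, not part of the crate's API):

#![allow(unused)]
fn main() {
fn rate_limit_headers(decision: &FluxLimiterDecision) -> Vec<(String, String)> {
    let mut headers = Vec::new();
    if let Some(remaining) = decision.remaining_capacity {
        headers.push(("X-RateLimit-Remaining".to_string(), format!("{:.0}", remaining)));
    }
    if let Some(retry_after) = decision.retry_after_seconds {
        // Round up so clients never retry too early
        headers.push(("Retry-After".to_string(), format!("{}", retry_after.ceil() as u64)));
    }
    // reset_time_nanos is absolute; convert to whole seconds since the epoch
    headers.push((
        "X-RateLimit-Reset".to_string(),
        (decision.reset_time_nanos / 1_000_000_000).to_string(),
    ));
    headers
}
}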

FluxLimiterError

Comprehensive error handling for robust production usage.

Enum Definition

#![allow(unused)]
fn main() {
#[derive(Debug, Clone, PartialEq)]
pub enum FluxLimiterError {
    InvalidRate,           // Configuration: rate ≤ 0
    InvalidBurst,          // Configuration: burst < 0
    ClockError(ClockError), // Runtime: clock failure
}

#[derive(Debug, Clone, PartialEq)]
pub enum ClockError {
    SystemTimeError,       // System time unavailable
}
}

Design Rationale

Explicit Error Types:

  • Configuration errors (InvalidRate, InvalidBurst)
  • Runtime errors (ClockError)
  • Clear separation of error categories

Error Display:

#![allow(unused)]
fn main() {
impl std::fmt::Display for FluxLimiterError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::InvalidRate => write!(f, "Invalid rate: must be positive"),
            Self::InvalidBurst => write!(f, "Invalid burst: must be non-negative"),
            Self::ClockError(e) => write!(f, "Clock error: {}", e),
        }
    }
}
}

Error Conversion:

#![allow(unused)]
fn main() {
impl From<ClockError> for FluxLimiterError {
    fn from(error: ClockError) -> Self {
        FluxLimiterError::ClockError(error)
    }
}
}
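
This conversion is what lets ? propagate clock failures automatically in functions that return FluxLimiterError; a minimal sketch:

#![allow(unused)]
fn main() {
fn current_nanos<C: Clock>(clock: &C) -> Result<u64, FluxLimiterError> {
    let now = clock.now()?; // ClockError -> FluxLimiterError via From
    Ok(now)
}
}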

Clock Trait

Abstraction for pluggable time sources.

Trait Definition

#![allow(unused)]
fn main() {
pub trait Clock: Send + Sync {
    fn now(&self) -> Result<u64, ClockError>;
}
}

Design Rationale

Benefits:

  • Enables deterministic testing
  • Handles real-world clock issues
  • Allows custom time sources
  • Graceful error handling

Send + Sync Requirements:

  • Thread-safe: Can be shared across threads
  • Required for concurrent rate limiting

SystemClock

Production time source using system time:

#![allow(unused)]
fn main() {
#[derive(Clone, Copy, Debug)]
pub struct SystemClock;

impl Clock for SystemClock {
    fn now(&self) -> Result<u64, ClockError> {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_nanos() as u64)
            .map_err(|_| ClockError::SystemTimeError)
    }
}
}

Handles Real-World Issues:

  • System clock going backwards (NTP adjustments)
  • Clock resolution limitations
  • System suspend/resume
  • Virtualization time skips

TestClock

Deterministic time source for testing:

#![allow(unused)]
fn main() {
pub struct TestClock {
    time: Arc<AtomicU64>,        // Current time in nanoseconds
    should_fail: Arc<AtomicBool>, // Failure simulation flag
}

impl TestClock {
    pub fn new(initial_time_secs: f64) -> Self
    pub fn advance(&self, duration_secs: f64)
    pub fn set_time(&self, time_secs: f64)
    pub fn fail_next_call(&self)
}
}

Features:

  • Precise time control
  • Failure simulation
  • Thread-safe
  • Deterministic testing

Memory Layout

FluxLimiter Size

FluxLimiter<String, SystemClock> {
    rate_nanos: u64,         // 8 bytes
    tolerance_nanos: u64,    // 8 bytes
    client_state: Arc<..>,   // 8 bytes (pointer)
    clock: SystemClock,      // 0 bytes (zero-sized type)
}
// Total: 24 bytes

Cache Efficiency:

  • Small struct size fits in cache line
  • Frequently accessed fields grouped
  • Arc enables cheap cloning

Per-Client State

DashMap<String, u64> entry:
    String: ~24 bytes (pointer + len + capacity)
    u64:    8 bytes
    Overhead: ~16 bytes (hash map metadata)
// Total per client: ~48 bytes

Scalability

1,000 clients     = ~48 KB
10,000 clients    = ~480 KB
100,000 clients   = ~4.8 MB
1,000,000 clients = ~48 MB

Thread Safety

All components are thread-safe:

  • FluxLimiter: Safe to share via Arc
  • DashMap: Lock-free concurrent access
  • Clock: Send + Sync requirement
  • Decisions: Immutable after creation

Next Steps

Concurrency Model

Understanding the lock-free concurrency design in Flux Limiter.

Lock-Free Design Philosophy

Flux Limiter is built on a lock-free concurrency model for maximum performance:

Multiple Threads
      ↓
FluxLimiter (Shared via Arc)
      ↓
Arc<DashMap<ClientId, TAT>>
      ↓
Lock-free hash map operations

Thread Safety Guarantees

1. Read Operations

Multiple concurrent readers without contention:

#![allow(unused)]
fn main() {
// Thread 1
limiter.check_request("client_1")?;

// Thread 2 (concurrent)
limiter.check_request("client_2")?;

// No lock contention - different shards
}

2. Write Operations

Atomic updates with minimal contention:

#![allow(unused)]
fn main() {
// Atomic TAT update
client_state.insert(client_id, new_tat);
}

Each client’s state is independent, allowing parallel updates.

3. Memory Ordering

Relaxed ordering is sufficient for TAT updates:

  • TAT values are monotonically increasing
  • No cross-client dependencies
  • No ABA problem

4. Concurrent Access Patterns

Same Client, Different Threads:

#![allow(unused)]
fn main() {
// Thread 1
limiter.check_request("client_1")?; // TAT = 100ms

// Thread 2 (concurrent)
limiter.check_request("client_1")?; // TAT = 200ms

// DashMap serializes access to same key
// Ordering determined by DashMap's internal locking
}

Different Clients, Different Threads:

#![allow(unused)]
fn main() {
// Thread 1
limiter.check_request("client_1")?;

// Thread 2 (concurrent)
limiter.check_request("client_2")?;

// Lock-free - different shards, no contention
}

DashMap: The Core Concurrency Primitive

Why DashMap?

DashMap provides lock-free concurrent access through sharding:

DashMap<ClientId, TAT>
    ├─ Shard 0: HashMap + RwLock
    ├─ Shard 1: HashMap + RwLock
    ├─ Shard 2: HashMap + RwLock
    ├─ ...
    └─ Shard N: HashMap + RwLock

Benefits:

  • Better than std::HashMap + Mutex: Avoids global locking
  • Better than RwLock: No reader/writer contention
  • Better than custom lock-free map: Mature, battle-tested
  • Segmented locking: Reduces contention vs. single lock

Sharding Strategy

// Client IDs are hashed to shards
shard_index = hash(client_id) % num_shards

// Different clients likely in different shards
// Same client always in same shard (consistency)

Operations on DashMap

Get (Read):

#![allow(unused)]
fn main() {
if let Some(previous_tat) = client_state.get(&client_id) {
    // Read lock on single shard
    // Other shards remain accessible
}
}

Insert (Write):

#![allow(unused)]
fn main() {
client_state.insert(client_id, new_tat);
// Write lock on single shard
// Other shards remain accessible
}

Cleanup (Iteration):

#![allow(unused)]
fn main() {
client_state.retain(|_, &mut tat| {
    // Locks each shard sequentially; short lock duration per shard.
    // saturating_sub avoids underflow when a TAT sits in the future.
    current_time.saturating_sub(tat) < threshold
});
}

Memory Consistency

Happens-Before Relationships

DashMap provides happens-before guarantees:

#![allow(unused)]
fn main() {
// Thread 1
limiter.check_request("client_1")?; // Writes TAT = 100ms

// Thread 2 (later)
limiter.check_request("client_1")?; // Reads TAT >= 100ms
}

The write in Thread 1 happens-before the read in Thread 2.

Visibility Guarantees

All updates to client state are immediately visible:

#![allow(unused)]
fn main() {
// Thread 1
client_state.insert("client_1", 100);

// Thread 2
let tat = client_state.get("client_1"); // Sees 100
}

DashMap ensures proper memory synchronization.

Sharing Across Threads

Arc for Shared Ownership

#![allow(unused)]
fn main() {
use std::sync::Arc;
use std::thread;

let config = FluxLimiterConfig::new(100.0, 50.0);
let limiter = Arc::new(FluxLimiter::with_config(config, SystemClock)?);

// Clone Arc for each thread (cheap)
let limiter1 = Arc::clone(&limiter);
let limiter2 = Arc::clone(&limiter);

// Use in different threads
thread::spawn(move || limiter1.check_request("client_1"));
thread::spawn(move || limiter2.check_request("client_2"));
}

Arc Benefits:

  • Reference counting with atomic operations
  • Thread-safe shared ownership
  • Cheap to clone (increments counter)
  • No data duplication

Internal Arc Usage

#![allow(unused)]
fn main() {
pub struct FluxLimiter<T, C> {
    // ...
    client_state: Arc<DashMap<T, u64>>, // Shared across clones
    // ...
}
}

Cloning FluxLimiter shares the same state:

#![allow(unused)]
fn main() {
let limiter1 = limiter.clone();
let limiter2 = limiter.clone();

// Both share the same client_state
// Updates visible to both
}

Contention Analysis

Low Contention Scenarios

Different clients, concurrent access:

Threads: [T1: client_1] [T2: client_2] [T3: client_3]
Shards:  [Shard 0    ] [Shard 1    ] [Shard 2    ]
Result:  Lock-free, no contention

Medium Contention Scenarios

Same shard, different clients:

Threads: [T1: client_1] [T2: client_100]
Shards:  [Shard 0    ] [Shard 0      ]
Result:  Some contention, but short lock duration

High Contention Scenarios

Same client, concurrent access:

Threads: [T1: client_1] [T2: client_1] [T3: client_1]
Shards:  [Shard 0    ] [Shard 0    ] [Shard 0    ]
Result:  Serialized access (expected for consistency)

This is correct behavior: rate limiting the same client from multiple threads must be serialized to maintain correctness.

Performance Under Concurrency

Scalability Characteristics

Threads  Throughput  Latency
1        1.0x        1.0μs
2        1.9x        1.0μs
4        3.7x        1.1μs
8        7.0x        1.2μs
16       12.5x       1.3μs

Nearly linear scalability with different clients.

Benchmark: Concurrent Access

#![allow(unused)]
fn main() {
#[bench]
fn bench_concurrent_different_clients(b: &mut Bencher) {
    let limiter = Arc::new(FluxLimiter::with_config(
        FluxLimiterConfig::new(1000.0, 500.0),
        SystemClock
    ).unwrap());

    b.iter(|| {
        let handles: Vec<_> = (0..8)
            .map(|i| {
                let limiter = Arc::clone(&limiter);
                thread::spawn(move || {
                    for j in 0..1000 {
                        let client_id = format!("client_{}_{}", i, j);
                        limiter.check_request(client_id).unwrap();
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }
    });
}
}

Clock Thread Safety

The Clock trait requires Send + Sync:

#![allow(unused)]
fn main() {
pub trait Clock: Send + Sync {
    fn now(&self) -> Result<u64, ClockError>;
}
}

SystemClock:

#![allow(unused)]
fn main() {
impl Clock for SystemClock {
    fn now(&self) -> Result<u64, ClockError> {
        // SystemTime::now() is thread-safe
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_nanos() as u64)
            .map_err(|_| ClockError::SystemTimeError)
    }
}
}

TestClock:

#![allow(unused)]
fn main() {
pub struct TestClock {
    time: Arc<AtomicU64>,        // Atomic for thread safety
    should_fail: Arc<AtomicBool>, // Atomic for thread safety
}
}

Both implementations are thread-safe.

Cleanup Concurrency

Cleanup can run concurrently with rate limiting:

#![allow(unused)]
fn main() {
// Thread 1: Rate limiting
limiter.check_request("client_1")?;

// Thread 2: Cleanup (concurrent)
limiter.cleanup_stale_clients(threshold)?;

// Thread 3: More rate limiting (concurrent)
limiter.check_request("client_2")?;
}

DashMap’s retain method:

  • Locks each shard briefly
  • Other shards remain accessible
  • Minimal impact on concurrent operations

Best Practices

1. Share via Arc

#![allow(unused)]
fn main() {
// Good: Share limiter across threads
let limiter = Arc::new(FluxLimiter::with_config(config, clock)?);

// Bad: Create multiple limiters (separate state)
let limiter1 = FluxLimiter::with_config(config, clock1)?;
let limiter2 = FluxLimiter::with_config(config, clock2)?;
}

2. Use Different Client IDs

#![allow(unused)]
fn main() {
// Good: Different clients = low contention
for i in 0..1000 {
    limiter.check_request(format!("client_{}", i))?;
}

// Bad: Same client = serialized access
for _ in 0..1000 {
    limiter.check_request("same_client")?;
}
}

3. Avoid Blocking in Callbacks

#![allow(unused)]
fn main() {
// Good: Quick check
match limiter.check_request(client_id) {
    Ok(decision) => decision.allowed,
    Err(_) => false,
}

// Bad: Blocking I/O while holding client state
match limiter.check_request(client_id) {
    Ok(decision) => {
        // Don't do expensive work here
        database.log_decision(decision)?; // Blocks!
        decision.allowed
    }
    Err(_) => false,
}
}

4. Periodic Cleanup

#![allow(unused)]
fn main() {
use tokio::time::{interval, Duration};

// Background cleanup task
tokio::spawn(async move {
    let mut interval = interval(Duration::from_secs(3600));
    loop {
        interval.tick().await;
        let _ = limiter.cleanup_stale_clients(threshold);
    }
});
}

Next Steps

Error Handling Architecture

Comprehensive error handling design for production reliability.

Error Type Hierarchy

FluxLimiterError
├── InvalidRate        (Configuration)
├── InvalidBurst       (Configuration)
└── ClockError         (Runtime)
    └── SystemTimeError

Design Principles

1. Explicit Error Types

No silent failures - all errors are explicitly typed:

#![allow(unused)]
fn main() {
pub enum FluxLimiterError {
    InvalidRate,           // Rate ≤ 0
    InvalidBurst,          // Burst < 0
    ClockError(ClockError), // System time failure
}
}

2. Configuration vs Runtime Errors

Configuration Errors (at startup):

  • InvalidRate: Caught during limiter creation
  • InvalidBurst: Caught during limiter creation
  • Should never occur in production after startup

Runtime Errors (during operation):

  • ClockError: Can occur anytime
  • Requires graceful handling
  • Application chooses policy

3. Graceful Degradation

Applications can choose error handling policy:

  • Fail-open: Allow requests on errors
  • Fail-closed: Deny requests on errors
  • Fallback: Use alternative rate limiting

Configuration Error Handling

Early Validation

Validate configuration at startup:

use flux_limiter::{FluxLimiter, FluxLimiterConfig, FluxLimiterError, SystemClock};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = FluxLimiterConfig::new(100.0, 50.0);

    let limiter = FluxLimiter::with_config(config, SystemClock)
        .map_err(|e| match e {
            FluxLimiterError::InvalidRate => {
                "Invalid rate: must be positive".to_string()
            }
            FluxLimiterError::InvalidBurst => {
                "Invalid burst: must be non-negative".to_string()
            }
            _ => format!("Configuration error: {}", e),
        })?;

    // Use limiter...
    Ok(())
}

Explicit Validation

#![allow(unused)]
fn main() {
impl FluxLimiterConfig {
    pub fn validate(&self) -> Result<(), FluxLimiterError> {
        if self.rate_per_second <= 0.0 {
            return Err(FluxLimiterError::InvalidRate);
        }
        if self.burst_capacity < 0.0 {
            return Err(FluxLimiterError::InvalidBurst);
        }
        Ok(())
    }
}
}

When to use:

  • Before storing configuration
  • When loading from external sources
  • Before creating rate limiter
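
For example, when loading from the environment (the variable names below are illustrative):

#![allow(unused)]
fn main() {
// Sketch: validate externally supplied configuration before
// constructing the limiter, so bad input fails fast at startup.
fn load_config() -> Result<FluxLimiterConfig, Box<dyn std::error::Error>> {
    let rate: f64 = std::env::var("RATE_LIMIT_RPS")?.parse()?;
    let burst: f64 = std::env::var("RATE_LIMIT_BURST")?.parse()?;

    let config = FluxLimiterConfig::new(rate, burst);
    config.validate().map_err(|e| format!("invalid config: {}", e))?;
    Ok(config)
}
}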

Runtime Clock Errors

Clock Error Sources

Clock errors occur when:

  1. System Time Unavailable

    • System clock not accessible
    • Permissions issues
    • Platform limitations
  2. Time Goes Backward

    • NTP adjustments
    • Manual clock changes
    • Virtualization issues
  3. Time Discontinuities

    • System suspend/resume
    • Hibernation
    • Container migrations

Error Propagation

Clock::now() → Result<u64, ClockError>
     ↓
FluxLimiter::check_request() → Result<Decision, FluxLimiterError>
     ↓
Application Layer → Implements error policy

Handling Clock Errors

Fail-Open Policy

Allow requests when clock fails:

#![allow(unused)]
fn main() {
match limiter.check_request(client_id) {
    Ok(decision) => decision.allowed,
    Err(FluxLimiterError::ClockError(_)) => {
        eprintln!("Clock error - allowing request (fail-open)");
        true
    }
    Err(e) => {
        eprintln!("Unexpected error: {}", e);
        true
    }
}
}

Use when:

  • Availability > strict rate limiting
  • False positives acceptable
  • Backend can handle spikes

Fail-Closed Policy

Deny requests when clock fails:

#![allow(unused)]
fn main() {
match limiter.check_request(client_id) {
    Ok(decision) => decision.allowed,
    Err(FluxLimiterError::ClockError(_)) => {
        eprintln!("Clock error - denying request (fail-closed)");
        false
    }
    Err(e) => {
        eprintln!("Unexpected error: {}", e);
        false
    }
}
}

Use when:

  • Security paramount
  • False negatives acceptable
  • Protecting backend critical

Fallback Policy

Use alternative when clock fails:

#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicU64, Ordering};

static FALLBACK_COUNTER: AtomicU64 = AtomicU64::new(0);
const FALLBACK_LIMIT: u64 = 1000;

match limiter.check_request(client_id) {
    Ok(decision) => decision.allowed,
    Err(FluxLimiterError::ClockError(_)) => {
        let count = FALLBACK_COUNTER.fetch_add(1, Ordering::Relaxed);
        count < FALLBACK_LIMIT
    }
    Err(_) => false,
}
}

Use when:

  • Need graceful degradation
  • Have fallback mechanism
  • Want best effort

Error Monitoring

Tracking Error Rates

#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicU64, Ordering};

struct ErrorMetrics {
    total_requests: AtomicU64,
    clock_errors: AtomicU64,
    config_errors: AtomicU64,
}

impl ErrorMetrics {
    fn record_result(&self, result: &Result<FluxLimiterDecision, FluxLimiterError>) {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        if let Err(e) = result {
            match e {
                FluxLimiterError::ClockError(_) => {
                    self.clock_errors.fetch_add(1, Ordering::Relaxed);
                }
                FluxLimiterError::InvalidRate | FluxLimiterError::InvalidBurst => {
                    self.config_errors.fetch_add(1, Ordering::Relaxed);
                }
            }
        }
    }

    fn error_rate(&self) -> f64 {
        let total = self.total_requests.load(Ordering::Relaxed);
        let errors = self.clock_errors.load(Ordering::Relaxed);

        if total == 0 {
            0.0
        } else {
            errors as f64 / total as f64
        }
    }
}
}

Alerting on Errors

#![allow(unused)]
fn main() {
fn check_with_alerting(
    limiter: &FluxLimiter<String, SystemClock>,
    metrics: &ErrorMetrics,
    client_id: String,
) -> bool {
    let result = limiter.check_request(client_id);
    metrics.record_result(&result);

    // Alert if error rate exceeds threshold
    if metrics.error_rate() > 0.01 {
        eprintln!(
            "ALERT: Clock error rate exceeded 1%: {:.2}%",
            metrics.error_rate() * 100.0
        );
    }

    match result {
        Ok(decision) => decision.allowed,
        Err(FluxLimiterError::ClockError(_)) => true, // Fail-open
        Err(_) => false,
    }
}
}

Circuit Breaker Pattern

Temporarily bypass rate limiting after consecutive failures:

#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};

struct CircuitBreaker {
    consecutive_failures: AtomicU64,
    circuit_open: AtomicBool,
    threshold: u64,
}

impl CircuitBreaker {
    fn new(threshold: u64) -> Self {
        Self {
            consecutive_failures: AtomicU64::new(0),
            circuit_open: AtomicBool::new(false),
            threshold,
        }
    }

    fn record_success(&self) {
        self.consecutive_failures.store(0, Ordering::Relaxed);
        self.circuit_open.store(false, Ordering::Relaxed);
    }

    fn record_failure(&self) -> bool {
        let failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;

        if failures >= self.threshold {
            if !self.circuit_open.swap(true, Ordering::Relaxed) {
                eprintln!("Circuit breaker opened after {} failures", failures);
            }
            true
        } else {
            false
        }
    }

    fn is_open(&self) -> bool {
        self.circuit_open.load(Ordering::Relaxed)
    }
}

fn check_with_circuit_breaker(
    limiter: &FluxLimiter<String, SystemClock>,
    breaker: &CircuitBreaker,
    client_id: String,
) -> bool {
    if breaker.is_open() {
        return true; // Bypass rate limiting
    }

    match limiter.check_request(client_id) {
        Ok(decision) => {
            breaker.record_success();
            decision.allowed
        }
        Err(FluxLimiterError::ClockError(_)) => {
            breaker.record_failure();
            true // Fail-open
        }
        Err(_) => false,
    }
}
}

Cleanup Error Handling

Cleanup errors are typically non-critical:

#![allow(unused)]
fn main() {
use tracing::{error, info, warn};

match limiter.cleanup_stale_clients(threshold) {
    Ok(count) => {
        info!("Cleaned up {} stale clients", count);
    }
    Err(FluxLimiterError::ClockError(_)) => {
        warn!("Clock error during cleanup - will retry later");
        // Cleanup failure is not critical - continue operation
    }
    Err(e) => {
        error!("Unexpected cleanup error: {}", e);
    }
}
}

Error Recovery Strategies

1. Retry with Backoff

#![allow(unused)]
fn main() {
use std::time::Duration;

async fn check_with_retry(
    limiter: &FluxLimiter<String, SystemClock>,
    client_id: String,
    max_retries: u32,
) -> Result<FluxLimiterDecision, FluxLimiterError> {
    let mut retries = 0;
    let mut delay = Duration::from_millis(10);

    loop {
        match limiter.check_request(client_id.clone()) {
            Ok(decision) => return Ok(decision),
            Err(FluxLimiterError::ClockError(_)) if retries < max_retries => {
                eprintln!("Clock error, retrying in {:?}", delay);
                tokio::time::sleep(delay).await;
                delay *= 2;
                retries += 1;
            }
            Err(e) => return Err(e),
        }
    }
}
}

2. Degrade Gracefully

#![allow(unused)]
fn main() {
enum RateLimitStrategy {
    Primary(FluxLimiter<String, SystemClock>),
    Fallback(SimpleCounter),
}

impl RateLimitStrategy {
    fn check_request(&mut self, client_id: String) -> bool {
        match self {
            Self::Primary(limiter) => {
                match limiter.check_request(client_id) {
                    Ok(decision) => decision.allowed,
                    Err(FluxLimiterError::ClockError(_)) => {
                        // Switch to fallback
                        eprintln!("Switching to fallback strategy");
                        *self = Self::Fallback(SimpleCounter::new());
                        true
                    }
                    Err(_) => false,
                }
            }
            Self::Fallback(counter) => counter.check(),
        }
    }
}
}

3. Log and Continue

#![allow(unused)]
fn main() {
use tracing::{error, trace};

fn check_with_logging(
    limiter: &FluxLimiter<String, SystemClock>,
    client_id: String,
) -> bool {
    match limiter.check_request(client_id.clone()) {
        Ok(decision) => {
            trace!("Rate limit check succeeded for {}", client_id);
            decision.allowed
        }
        Err(FluxLimiterError::ClockError(e)) => {
            error!("Clock error for {}: {:?}", client_id, e);
            true // Fail-open
        }
        Err(e) => {
            error!("Unexpected error for {}: {:?}", client_id, e);
            false
        }
    }
}
}

Testing Error Handling

Simulating Clock Failures

#![allow(unused)]
fn main() {
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_clock_error_handling() {
        let clock = TestClock::new(0.0);
        let limiter = FluxLimiter::with_config(
            FluxLimiterConfig::new(10.0, 5.0),
            clock.clone(),
        ).unwrap();

        // Normal operation
        assert!(limiter.check_request("client1").unwrap().allowed);

        // Simulate clock failure
        clock.fail_next_call();
        assert!(matches!(
            limiter.check_request("client1"),
            Err(FluxLimiterError::ClockError(_))
        ));

        // Recovery
        assert!(limiter.check_request("client1").unwrap().allowed);
    }
}
}

Best Practices

  1. Fail Fast on Configuration: Validate at startup
  2. Choose Error Policy: Fail-open, fail-closed, or fallback
  3. Monitor Errors: Track error rates and alert
  4. Log Contextually: Include client ID and error details
  5. Test Error Paths: Use TestClock to simulate failures
  6. Document Policy: Make error handling explicit

Next Steps

Performance Design

Deep dive into performance optimization techniques and characteristics.

Hot Path Optimization

The check_request() method is optimized for minimal latency:

Execution Path

#![allow(unused)]
fn main() {
pub fn check_request(&self, client_id: T) -> Result<FluxLimiterDecision, FluxLimiterError> {
    // 1. Single clock call (~50-100ns)
    let current_time_nanos = self.clock.now()?;

    // 2. Single map lookup (~10-50ns)
    let previous_tat_nanos = self.client_state
        .get(&client_id)
        .map(|entry| *entry.value())
        .unwrap_or(current_time_nanos);

    // 3. Integer arithmetic (~5-10ns)
    let is_conforming = current_time_nanos >=
        previous_tat_nanos.saturating_sub(self.tolerance_nanos);

    // 4. Conditional update (~10-50ns)
    if is_conforming {
        let new_tat_nanos = current_time_nanos
            .max(previous_tat_nanos) + self.rate_nanos;
        self.client_state.insert(client_id, new_tat_nanos);
    }

    // 5. Metadata calculation (~5-10ns)
    // 6. Return decision
}
}

Total latency: ~100-250ns typical case

Optimization Techniques

  1. Single Clock Call: One time source access per request
  2. Single Map Operation: Either get or get+insert
  3. Integer Arithmetic: No floating-point operations in hot path
  4. No Allocations: Reuses existing memory
  5. Minimal Branching: Straight-line execution

Memory Layout

FluxLimiter Structure

FluxLimiter<String, SystemClock> {
    rate_nanos: u64,         // 8 bytes - cache-friendly
    tolerance_nanos: u64,    // 8 bytes
    client_state: Arc<..>,   // 8 bytes - pointer
    clock: SystemClock,      // 0 bytes - zero-sized type
}
// Total: 24 bytes - fits in single cache line

Cache Efficiency:

  • Small struct size (24-32 bytes)
  • Frequently accessed fields grouped
  • Arc enables sharing without duplication
  • Zero-sized clock type (SystemClock)

Per-Client State

DashMap<String, u64> entry:
    String:   ~24 bytes (pointer + len + capacity)
    u64:      8 bytes (TAT)
    Overhead: ~16 bytes (hash map metadata)
// Total: ~48 bytes per client

Memory Scalability

Clients         Memory Usage    Example
1,000           ~48 KB          Small API
10,000          ~480 KB         Medium service
100,000         ~4.8 MB         Large service
1,000,000       ~48 MB          Very large service
10,000,000      ~480 MB         Massive scale

Memory Growth: Linear with number of active clients

Algorithmic Complexity

Time Complexity

Operation                  Complexity   Notes
check_request()            O(1)         Hash map lookup + arithmetic
cleanup_stale_clients()    O(n)         Iterates all clients
rate()                     O(1)         Simple field access
burst()                    O(1)         Simple calculation

Space Complexity

Component           Complexity   Notes
Per-client state    O(1)         Single u64 per client
Total state         O(n)         Where n = active clients
Configuration       O(1)         Fixed size

Concurrency Performance

Lock-Free Benefits

DashMap provides near-linear scalability:

Threads  Throughput   Latency
1        1.0x         100ns
2        1.9x         105ns
4        3.7x         110ns
8        7.0x         120ns
16       12.5x        130ns

Key factors:

  • Different clients = different shards (low contention)
  • Same client = same shard (necessary serialization)
  • Short critical sections

Contention Patterns

Low Contention (different clients):

#![allow(unused)]
fn main() {
// Thread 1
limiter.check_request("client_1")?; // Shard 0
// Thread 2
limiter.check_request("client_2")?; // Shard 1
// No contention - lock-free
}

Medium Contention (same shard):

#![allow(unused)]
fn main() {
// Thread 1
limiter.check_request("client_1")?; // Shard 0
// Thread 2
limiter.check_request("client_100")?; // Shard 0
// Brief contention - short lock
}

High Contention (same client):

#![allow(unused)]
fn main() {
// Thread 1
limiter.check_request("client_1")?; // Shard 0
// Thread 2
limiter.check_request("client_1")?; // Shard 0
// Serialized - correct behavior
}

Benchmark Results

Single-Threaded Performance

test bench_check_request_allowed ... bench:    85 ns/iter (+/- 5)
test bench_check_request_denied  ... bench:    82 ns/iter (+/- 4)
test bench_cleanup               ... bench:    45 μs/iter (+/- 2)

Multi-Threaded Performance

test bench_concurrent_8_threads  ... bench:    120 ns/iter (+/- 10)
test bench_concurrent_16_threads ... bench:    135 ns/iter (+/- 12)

Memory Overhead

test bench_memory_1k_clients     ... ~48 KB
test bench_memory_100k_clients   ... ~4.8 MB
test bench_memory_1m_clients     ... ~48 MB

Optimization Decisions

Integer Arithmetic vs Floating-Point

Integer (chosen):

  • ✅ Exact precision
  • ✅ Faster arithmetic
  • ✅ No rounding errors
  • ✅ Cache-friendly (u64)

Floating-point (rejected):

  • ❌ Precision loss
  • ❌ Slower arithmetic
  • ❌ Accumulating errors
  • ❌ Larger memory footprint

DashMap vs Alternatives

DashMap (chosen):

  • ✅ Lock-free reads/writes
  • ✅ Good scalability
  • ✅ Battle-tested
  • ✅ Ergonomic API

Mutex (rejected):

  • ❌ Global locking
  • ❌ Poor scalability
  • ❌ Read contention

RwLock (rejected):

  • ❌ Reader/writer contention
  • ❌ Write starvation possible

Custom lock-free map (rejected):

  • ❌ Implementation complexity
  • ❌ Maintenance burden
  • ❌ Potential bugs

Nanosecond Precision

u64 nanoseconds (chosen):

  • ✅ Maximum precision
  • ✅ Integer arithmetic
  • ✅ No overflow for 584 years (checked below)
  • ✅ Direct system time conversion

Milliseconds (rejected):

  • ❌ Insufficient for high rates
  • ❌ Precision loss

Duration (rejected):

  • ❌ Larger memory footprint
  • ❌ More complex arithmetic
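
The 584-year headroom claimed above is easy to verify:

#![allow(unused)]
fn main() {
// u64::MAX nanoseconds expressed in years, using a 365.25-day year
let years = u64::MAX as f64 / 1e9 / (365.25 * 24.0 * 3600.0);
println!("u64 nanoseconds overflow after ~{:.0} years", years); // ~584
}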

Performance Tuning

Client ID Selection

Choose efficient client ID types:

#![allow(unused)]
fn main() {
// Fastest: Numeric IDs
let limiter = FluxLimiter::<u64, _>::with_config(config, clock)?;
limiter.check_request(user_id_numeric)?; // ~80ns

// Fast: IP addresses
let limiter = FluxLimiter::<IpAddr, _>::with_config(config, clock)?;
limiter.check_request(client_ip)?; // ~90ns

// Slower: Strings (requires allocation)
let limiter = FluxLimiter::<String, _>::with_config(config, clock)?;
limiter.check_request(user_id.to_string())?; // ~120ns
}

Cleanup Strategy

Balance memory usage vs. cleanup overhead:

#![allow(unused)]
fn main() {
use tokio::time::{interval, Duration};

let hour_nanos: u64 = 3_600 * 1_000_000_000;           // stale threshold: 1 hour
let week_nanos: u64 = 7 * 24 * 3_600 * 1_000_000_000;  // stale threshold: 1 week

// Frequent cleanup: Lower memory, higher overhead
tokio::spawn(async move {
    let mut interval = interval(Duration::from_secs(600)); // 10 min
    loop {
        interval.tick().await;
        let _ = limiter.cleanup_stale_clients(hour_nanos);
    }
});

// Infrequent cleanup: Higher memory, lower overhead
tokio::spawn(async move {
    let mut interval = interval(Duration::from_secs(86400)); // 24 hours
    loop {
        interval.tick().await;
        let _ = limiter.cleanup_stale_clients(week_nanos);
    }
});
}

Avoid Allocations

#![allow(unused)]
fn main() {
// Better: build the string once; clone() copies it but skips re-formatting
let client_id = format!("user_{}", user_id);
for _ in 0..100 {
    limiter.check_request(client_id.clone())?;
}

// Worse: format! allocates and formats on every iteration
for _ in 0..100 {
    limiter.check_request(format!("user_{}", user_id))?;
}
}

Profiling and Monitoring

Latency Monitoring

#![allow(unused)]
fn main() {
use std::time::Instant;
use tracing::warn;

let start = Instant::now();
let decision = limiter.check_request(client_id)?;
let latency = start.elapsed();

if latency.as_micros() > 10 {
    warn!("Slow rate limit check: {:?}", latency);
}
}

Memory Monitoring

#![allow(unused)]
fn main() {
fn estimate_memory_usage<T, C>(limiter: &FluxLimiter<T, C>) -> usize
where
    T: Hash + Eq + Clone,
    C: Clock,
{
    // estimate_client_count() is a placeholder for however your
    // application tracks the number of active clients.
    let client_count = estimate_client_count();
    let bytes_per_client = std::mem::size_of::<T>() + 8 + 16; // ID + TAT + overhead
    client_count * bytes_per_client
}
}

Throughput Tracking

#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicU64, Ordering};

static REQUEST_COUNT: AtomicU64 = AtomicU64::new(0);

fn track_throughput(limiter: &FluxLimiter<String, SystemClock>, client_id: String) -> bool {
    REQUEST_COUNT.fetch_add(1, Ordering::Relaxed);

    match limiter.check_request(client_id) {
        Ok(decision) => decision.allowed,
        Err(_) => false,
    }
}

// Report throughput
fn report_throughput(duration: Duration) {
    let count = REQUEST_COUNT.swap(0, Ordering::Relaxed);
    let throughput = count as f64 / duration.as_secs_f64();
    println!("Throughput: {:.2} req/s", throughput);
}
}

Performance Characteristics Summary

Metric                       Value
Latency (typical)            100-250ns
Throughput (single thread)   ~10M req/s
Throughput (8 threads)       ~70M req/s
Memory per client            ~48 bytes
Scalability                  Near-linear
Cleanup overhead             O(n) but infrequent

Best Practices

  1. Use numeric client IDs when possible for best performance
  2. Share limiter via Arc to avoid state duplication
  3. Cleanup periodically to prevent memory growth
  4. Profile in production to identify bottlenecks
  5. Monitor latency and alert on degradation
  6. Batch operations when checking multiple clients (see the sketch below)
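
On item 6: the documented API has no batch method, so "batching" here just means amortizing the loop in one helper; a sketch:

#![allow(unused)]
fn main() {
// Checks a set of client IDs and returns one allow/deny flag per ID.
// Errors are treated as deny here; adapt to your error policy.
fn check_many(
    limiter: &FluxLimiter<String, SystemClock>,
    client_ids: impl IntoIterator<Item = String>,
) -> Vec<bool> {
    client_ids
        .into_iter()
        .map(|id| matches!(limiter.check_request(id), Ok(d) if d.allowed))
        .collect()
}
}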

Next Steps

Testing Architecture

Comprehensive testing strategy for deterministic and reliable tests.

Test Organization

Tests are organized in tests/ratelimiter/ with clear separation of concerns:

tests/ratelimiter/
├── fixtures/
│   ├── test_clock.rs      # TestClock implementation
│   └── mod.rs
├── gcra_algorithm_tests.rs # Core algorithm correctness
├── config_tests.rs        # Configuration validation
├── error_tests.rs         # Error handling and recovery
├── cleanup_tests.rs       # Memory management
├── performance_tests.rs   # Performance characteristics
├── decision_metadata_tests.rs # Decision metadata validation
└── main.rs               # Test module organization
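
With this layout, main.rs only declares the modules, so Cargo compiles the whole directory as a single integration-test binary. A sketch of its contents, using the module names from the tree above:

// tests/ratelimiter/main.rs
mod fixtures;

mod cleanup_tests;
mod config_tests;
mod decision_metadata_tests;
mod error_tests;
mod gcra_algorithm_tests;
mod performance_tests;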

TestClock Design

The TestClock is the foundation for deterministic testing.

Implementation

#![allow(unused)]
fn main() {
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};

#[derive(Clone)] // tests clone the clock to share it with the limiter
pub struct TestClock {
    should_fail: Arc<AtomicBool>, // Failure simulation flag
}

impl TestClock {
    pub fn new(initial_time_secs: f64) -> Self {
        Self {
            time: Arc::new(AtomicU64::new((initial_time_secs * 1e9) as u64)),
            should_fail: Arc::new(AtomicBool::new(false)),
        }
    }

    pub fn advance(&self, duration_secs: f64) {
        let duration_nanos = (duration_secs * 1e9) as u64;
        self.time.fetch_add(duration_nanos, Ordering::SeqCst);
    }

    pub fn set_time(&self, time_secs: f64) {
        self.time.store((time_secs * 1e9) as u64, Ordering::SeqCst);
    }

    pub fn fail_next_call(&self) {
        self.should_fail.store(true, Ordering::SeqCst);
    }
}

impl Clock for TestClock {
    fn now(&self) -> Result<u64, ClockError> {
        if self.should_fail.swap(false, Ordering::SeqCst) {
            return Err(ClockError::SystemTimeError);
        }
        Ok(self.time.load(Ordering::SeqCst))
    }
}
}

Key Features

  1. Deterministic Time: Controlled time progression
  2. Thread-Safe: Can be shared across test threads
  3. Failure Simulation: Can simulate clock errors
  4. Precise Control: Nanosecond-level manipulation

Usage Example

#![allow(unused)]
fn main() {
#[test]
fn test_rate_limiting() {
    let clock = TestClock::new(0.0);
    let limiter = FluxLimiter::with_config(
        FluxLimiterConfig::new(10.0, 5.0),
        clock.clone(),
    ).unwrap();

    // First request at t=0
    assert!(limiter.check_request("client1").unwrap().allowed);

    // Advance time by 0.1 seconds
    clock.advance(0.1);

    // Second request should be allowed
    assert!(limiter.check_request("client1").unwrap().allowed);
}
}

Test Categories

1. GCRA Algorithm Tests

Test core algorithm correctness:

#![allow(unused)]
fn main() {
#[test]
fn test_sustained_rate() {
    let clock = TestClock::new(0.0);
    let config = FluxLimiterConfig::new(10.0, 0.0); // 10 req/s, no burst
    let limiter = FluxLimiter::with_config(config, clock.clone()).unwrap();

    // First request allowed
    assert!(limiter.check_request("client1").unwrap().allowed);

    // Request 0.05s later (too early)
    clock.advance(0.05);
    assert!(!limiter.check_request("client1").unwrap().allowed);

    // Request 0.1s after first (exactly on time)
    clock.advance(0.05);
    assert!(limiter.check_request("client1").unwrap().allowed);
}
}

2. Burst Capacity Tests

Verify burst handling:

#![allow(unused)]
fn main() {
#[test]
fn test_burst_capacity() {
    let clock = TestClock::new(0.0);
    let config = FluxLimiterConfig::new(10.0, 5.0); // 5 request burst
    let limiter = FluxLimiter::with_config(config, clock.clone()).unwrap();

    // Should allow exactly 6 requests immediately (1 + burst of 5)
    for _ in 0..6 {
        assert!(limiter.check_request("client1").unwrap().allowed);
    }

    // 7th request should be denied
    assert!(!limiter.check_request("client1").unwrap().allowed);

    // After rate interval, allow one more
    clock.advance(0.1);
    assert!(limiter.check_request("client1").unwrap().allowed);
}
}

3. Configuration Tests

Validate configuration handling:

#![allow(unused)]
fn main() {
#[test]
fn test_invalid_rate() {
    let config = FluxLimiterConfig::new(-10.0, 5.0);
    let result = FluxLimiter::with_config(config, SystemClock);

    assert!(matches!(result, Err(FluxLimiterError::InvalidRate)));
}

#[test]
fn test_invalid_burst() {
    let config = FluxLimiterConfig::new(10.0, -5.0);
    let result = FluxLimiter::with_config(config, SystemClock);

    assert!(matches!(result, Err(FluxLimiterError::InvalidBurst)));
}
}

4. Error Handling Tests

Test error scenarios and recovery:

#![allow(unused)]
fn main() {
#[test]
fn test_clock_error_handling() {
    let clock = TestClock::new(0.0);
    let limiter = FluxLimiter::with_config(
        FluxLimiterConfig::new(10.0, 5.0),
        clock.clone(),
    ).unwrap();

    // Normal operation
    assert!(limiter.check_request("client1").unwrap().allowed);

    // Simulate clock failure
    clock.fail_next_call();
    let result = limiter.check_request("client1");
    assert!(matches!(result, Err(FluxLimiterError::ClockError(_))));

    // Verify recovery
    assert!(limiter.check_request("client1").unwrap().allowed);
}

#[test]
fn test_multiple_clock_failures() {
    let clock = TestClock::new(0.0);
    let limiter = FluxLimiter::with_config(
        FluxLimiterConfig::new(10.0, 5.0),
        clock.clone(),
    ).unwrap();

    // Multiple consecutive failures
    for _ in 0..5 {
        clock.fail_next_call();
        assert!(limiter.check_request("client1").is_err());
    }

    // Recovery
    assert!(limiter.check_request("client1").unwrap().allowed);
}
}

5. Cleanup Tests

Test memory management:

#![allow(unused)]
fn main() {
#[test]
fn test_cleanup_stale_clients() {
    let clock = TestClock::new(0.0);
    let limiter = FluxLimiter::with_config(
        FluxLimiterConfig::new(10.0, 5.0),
        clock.clone(),
    ).unwrap();

    // Create some client state
    limiter.check_request("client1").unwrap();
    limiter.check_request("client2").unwrap();
    limiter.check_request("client3").unwrap();

    // Advance time by 1 hour
    clock.advance(3600.0);

    // Cleanup clients older than 30 minutes
    let threshold = 30 * 60 * 1_000_000_000u64;
    let removed = limiter.cleanup_stale_clients(threshold).unwrap();

    assert_eq!(removed, 3);
}
}

6. Concurrency Tests

Test thread safety:

#![allow(unused)]
fn main() {
#[test]
fn test_concurrent_access() {
    use std::sync::Arc;
    use std::thread;

    let config = FluxLimiterConfig::new(100.0, 50.0);
    let limiter = Arc::new(
        FluxLimiter::with_config(config, SystemClock).unwrap()
    );

    let handles: Vec<_> = (0..10)
        .map(|i| {
            let limiter = Arc::clone(&limiter);
            thread::spawn(move || {
                for j in 0..1000 {
                    let client_id = format!("client_{}_{}", i, j);
                    limiter.check_request(client_id).unwrap();
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
}
}

7. Decision Metadata Tests

Verify decision metadata accuracy:

#![allow(unused)]
fn main() {
#[test]
fn test_retry_after_metadata() {
    let clock = TestClock::new(0.0);
    let config = FluxLimiterConfig::new(10.0, 0.0);
    let limiter = FluxLimiter::with_config(config, clock.clone()).unwrap();

    // First request allowed
    limiter.check_request("client1").unwrap();

    // Second request denied
    let decision = limiter.check_request("client1").unwrap();
    assert!(!decision.allowed);

    // Verify retry_after is approximately 0.1 seconds
    let retry_after = decision.retry_after_seconds.unwrap();
    assert!((retry_after - 0.1).abs() < 0.001);
}

#[test]
fn test_remaining_capacity() {
    let clock = TestClock::new(0.0);
    let config = FluxLimiterConfig::new(10.0, 5.0);
    let limiter = FluxLimiter::with_config(config, clock.clone()).unwrap();

    // First request
    let decision = limiter.check_request("client1").unwrap();
    assert!(decision.allowed);

    // Should have some remaining capacity
    assert!(decision.remaining_capacity.is_some());

    // Make more requests and verify capacity decreases
    for _ in 0..5 {
        limiter.check_request("client1").unwrap();
    }

    // Capacity should be depleted
    let decision = limiter.check_request("client1").unwrap();
    assert!(!decision.allowed);
}
}

Performance Testing

Latency Benchmarks

#![allow(unused)]
fn main() {
#[cfg(test)]
mod benchmarks {
    use super::*;
    use std::time::Instant;

    #[test]
    fn bench_check_request_latency() {
        let limiter = FluxLimiter::with_config(
            FluxLimiterConfig::new(1000.0, 500.0),
            SystemClock,
        ).unwrap();

        let iterations = 100_000;
        let start = Instant::now();

        for i in 0..iterations {
            let client_id = format!("client_{}", i % 1000);
            limiter.check_request(client_id).unwrap();
        }

        let elapsed = start.elapsed();
        let avg_latency = elapsed.as_nanos() / iterations;

        println!("Average latency: {}ns", avg_latency);
        assert!(avg_latency < 1000); // Should be under 1μs
    }
}
}

Throughput Tests

#![allow(unused)]
fn main() {
#[test]
fn test_throughput() {
    use std::sync::Arc;
    use std::thread;
    use std::time::Instant;

    let limiter = Arc::new(
        FluxLimiter::with_config(
            FluxLimiterConfig::new(10_000.0, 5_000.0),
            SystemClock,
        ).unwrap()
    );

    let start = Instant::now();
    let threads = 8;
    let requests_per_thread = 100_000;

    let handles: Vec<_> = (0..threads)
        .map(|t| {
            let limiter = Arc::clone(&limiter);
            thread::spawn(move || {
                for i in 0..requests_per_thread {
                    let client_id = format!("client_{}_{}", t, i % 1000);
                    limiter.check_request(client_id).unwrap();
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    let elapsed = start.elapsed();
    let total_requests = threads * requests_per_thread;
    let throughput = total_requests as f64 / elapsed.as_secs_f64();

    println!("Throughput: {:.2} req/s", throughput);
}
}

Test Utilities

Helper Functions

#![allow(unused)]
fn main() {
fn assert_allowed(result: Result<FluxLimiterDecision, FluxLimiterError>) {
    match result {
        Ok(decision) => assert!(decision.allowed, "Expected request to be allowed"),
        Err(e) => panic!("Expected allowed decision, got error: {:?}", e),
    }
}

fn assert_denied(result: Result<FluxLimiterDecision, FluxLimiterError>) {
    match result {
        Ok(decision) => assert!(!decision.allowed, "Expected request to be denied"),
        Err(e) => panic!("Expected denied decision, got error: {:?}", e),
    }
}

fn assert_error<T>(result: Result<T, FluxLimiterError>) {
    assert!(result.is_err(), "Expected error, got success");
}
}

Test Fixtures

#![allow(unused)]
fn main() {
fn create_test_limiter(rate: f64, burst: f64) -> (FluxLimiter<String, TestClock>, TestClock) {
    let clock = TestClock::new(0.0);
    let config = FluxLimiterConfig::new(rate, burst);
    let limiter = FluxLimiter::with_config(config, clock.clone()).unwrap();
    (limiter, clock)
}
}

Test Coverage

Aim for comprehensive coverage:

  • ✅ Algorithm correctness
  • ✅ Configuration validation
  • ✅ Error handling and recovery
  • ✅ Concurrency safety
  • ✅ Memory management
  • ✅ Decision metadata accuracy
  • ✅ Performance characteristics
  • ✅ Edge cases and boundary conditions

Best Practices

  1. Use TestClock for deterministic time control
  2. Test Error Paths including clock failures
  3. Verify Metadata not just allow/deny
  4. Test Concurrency with multiple threads
  5. Measure Performance with benchmarks
  6. Test Edge Cases like zero burst, high rates
  7. Cleanup After Tests to avoid state leakage

Next Steps

Design Decisions

Detailed rationale behind key architectural choices in Flux Limiter.

Algorithm Selection: GCRA vs Token Bucket

Decision: Use GCRA

Rationale: The Generic Cell Rate Algorithm (GCRA) provides mathematical equivalence to Token Bucket with several implementation advantages.

GCRA Advantages

Exact Mathematical Precision

  • Integer arithmetic with u64 nanoseconds
  • No floating-point precision loss
  • No accumulating rounding errors

Stateless Calculation

  • No background token refill processes
  • No timer management overhead
  • Simpler implementation

Efficient State

  • One timestamp (u64) per client
  • vs. token count + last refill time
  • Smaller memory footprint

Deterministic Behavior

  • Exact timing with integer arithmetic
  • Predictable results in tests
  • No timer drift

Token Bucket Drawbacks

Floating-Point Precision

  • Token counts as floats
  • Accumulating rounding errors
  • Precision loss over time

Background Processes

  • Requires token refill mechanism
  • Timer management complexity
  • Potential drift issues

Complex State

  • Token count + timestamp
  • More memory per client
  • More complex updates

Mathematical Equivalence

Both algorithms produce the same rate limiting behavior:

GCRA: Request allowed if current_time >= TAT - tolerance
Token Bucket: Request allowed if tokens >= 1

The difference is implementation, not behavior.
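
To make the equivalence concrete, here is a minimal sketch of one GCRA step under the definitions above; the names are illustrative, not the library's internals. Assume emission_interval = 1e9 / rate and tolerance = burst * emission_interval, all in nanoseconds.

#![allow(unused)]
fn main() {
/// One GCRA decision: returns (allowed, new_tat).
fn gcra_check(now: u64, tat: u64, emission_interval: u64, tolerance: u64) -> (bool, u64) {
    if now >= tat.saturating_sub(tolerance) {
        // Allowed: push the theoretical arrival time (TAT) forward one interval.
        let new_tat = tat.max(now).saturating_add(emission_interval);
        (true, new_tat)
    } else {
        // Denied: state is unchanged.
        (false, tat)
    }
}
}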

Data Structure: DashMap

Decision: Use DashMap for Client State

Rationale: DashMap provides the best balance of performance, correctness, and ergonomics.

DashMap Benefits

Lock-Free Concurrency

  • Segmented locking
  • Different shards = no contention
  • Near-linear scalability

Battle-Tested

  • Mature library
  • Well-documented
  • Proven in production

Good Performance

  • ~100ns for get/insert
  • Minimal overhead
  • Efficient memory layout

Ergonomic API

  • Similar to std::HashMap
  • Easy to use correctly
  • Clear semantics
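
To illustrate the ergonomics, a sketch of a per-client TAT update using DashMap's entry API (the actual key and field types inside Flux Limiter may differ). entry locks only the shard holding the key, so unrelated clients never contend:

#![allow(unused)]
fn main() {
use dashmap::DashMap;

let state: DashMap<String, u64> = DashMap::new(); // one u64 TAT per client

let now: u64 = 1_000_000_000;               // current time, nanoseconds
let emission_interval: u64 = 100_000_000;   // 10 req/s
let tolerance: u64 = 5 * emission_interval; // burst of 5

let mut tat = state.entry("client1".to_string()).or_insert(now);
if now >= tat.saturating_sub(tolerance) {
    *tat = (*tat).max(now) + emission_interval; // allowed: advance the TAT
}
}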

Alternative: Mutex

Global Locking

  • All operations serialize
  • Poor scalability
  • Read contention

Why rejected: Unacceptable performance bottleneck for concurrent access.

Alternative: RwLock

Reader/Writer Contention

  • Writers block all readers
  • Write starvation possible
  • Complex lock management

Why rejected: Still has contention issues, doesn’t scale well.

Alternative: Custom Lock-Free Map

Implementation Complexity

  • Requires deep concurrency expertise
  • Bug-prone
  • High maintenance burden

Maturity Risk

  • Unproven in production
  • Potential correctness issues
  • Performance unknowns

Why rejected: Not worth the complexity and risk when DashMap exists.

Time Representation: Nanoseconds

Decision: Use u64 Nanoseconds

Rationale: Nanoseconds provide maximum precision with efficient integer arithmetic.

Benefits

Maximum Precision

  • Supports very high rates (1B req/s)
  • No precision loss
  • Exact calculations

Integer Arithmetic

  • Faster than floating-point
  • No rounding errors
  • Deterministic results

No Overflow

  • u64::MAX nanoseconds = 584 years
  • Sufficient for any realistic usage
  • Saturating arithmetic for safety

System Time Compatible

  • Direct conversion from SystemTime
  • No conversion overhead
  • Natural representation
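
A small worked example of the integer arithmetic this enables (the conversion shown is generic, not a specific library function):

#![allow(unused)]
fn main() {
// A rate of 250 req/s becomes an exact integer emission interval:
let rate = 250.0_f64;
let emission_interval_nanos = (1_000_000_000.0 / rate) as u64; // 4_000_000 ns

// All subsequent scheduling is exact u64 arithmetic with no float drift:
let tat: u64 = 1_000_000_000;
let next_tat = tat.saturating_add(emission_interval_nanos);
assert_eq!(next_tat, 1_004_000_000);
}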

Alternative: Milliseconds

Insufficient Precision

  • Can’t support rates > 1000 req/s precisely
  • Precision loss for high rates
  • Rounding errors

Why rejected: Limits maximum rate and precision.

Alternative: Duration Type

Memory Overhead

  • Larger struct (2x u64)
  • More complex arithmetic
  • Cache inefficient

Performance

  • More expensive operations
  • Additional abstractions
  • Not necessary

Why rejected: Overhead without benefit for our use case.

Alternative: f64 Seconds

Floating-Point Issues

  • Precision loss
  • Rounding errors
  • Non-deterministic

Performance

  • Slower arithmetic
  • Cache less friendly
  • Conversion overhead

Why rejected: Precision and performance concerns.

Error Handling: Result Types

Decision: Use Result for All Errors

Rationale: Explicit error handling enables graceful degradation and better observability.

Benefits

Explicit Error Handling

  • Caller must handle errors
  • No silent failures
  • Clear error paths

Graceful Degradation

  • Application chooses policy
  • Fail-open or fail-closed
  • Fallback strategies possible

Better Observability

  • Errors can be logged
  • Metrics can be collected
  • Debugging easier

Production-Ready

  • Handles clock failures
  • Recoverable errors
  • Robust operation
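
For example, the caller can pick its own policy when the clock fails. This sketch assumes only the Result-returning check_request shown earlier; FAIL_OPEN is a hypothetical application setting, not part of the library:

#![allow(unused)]
fn main() {
const FAIL_OPEN: bool = true; // hypothetical policy knob

fn allow_request(limiter: &FluxLimiter<String, SystemClock>, client_id: String) -> bool {
    match limiter.check_request(client_id) {
        Ok(decision) => decision.allowed,
        Err(e) => {
            eprintln!("rate limiter error: {}", e);
            FAIL_OPEN // fail-open: allow on error; set false to fail closed
        }
    }
}
}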

Alternative: Panics

Difficult Recovery

  • Can’t recover from panic
  • Crashes entire application
  • Poor user experience

Poor Observability

  • Hard to monitor
  • Difficult to debug
  • No graceful degradation

Not Production-Ready

  • Unacceptable for library code
  • Forces policy on users
  • Fragile

Why rejected: Panics are inappropriate for library code and prevent graceful error handling.

Alternative: Silent Failures (allow on error)

Hidden Bugs

  • Errors go unnoticed
  • Hard to debug
  • Incorrect behavior

No Observability

  • Can’t track error rates
  • No alerting possible
  • Silent degradation

Why rejected: Silent failures make debugging impossible and hide problems.

Generic Design: Client ID Types

Decision: Generic Client ID (T: Hash + Eq + Clone)

Rationale: Generics provide flexibility while maintaining type safety and performance.

Benefits

Flexibility

  • String: User IDs, API keys
  • IpAddr: IP-based limiting
  • u64: Numeric IDs
  • Custom types: Complex scenarios

Zero-Cost Abstraction

  • No runtime overhead
  • No boxing/dynamic dispatch
  • Compile-time optimization

Type Safety

  • Compile-time type checking
  • Can’t mix different ID types
  • Clear API

Performance

  • Monomorphization
  • Inlined operations
  • Optimal code generation
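
To make the flexibility concrete, a sketch of a custom composite key; ApiKey is a hypothetical type, but anything satisfying the bounds works the same way:

#![allow(unused)]
fn main() {
use std::net::IpAddr;

// Limit per (tenant, source IP) pair.
#[derive(Hash, PartialEq, Eq, Clone)]
struct ApiKey {
    tenant_id: u64,
    source_ip: IpAddr,
}

// Satisfies T: Hash + Eq + Clone, so it drops straight in:
// let limiter = FluxLimiter::<ApiKey, _>::with_config(config, clock)?;
}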

Alternative: Trait Object (dyn ClientId)

Runtime Overhead

  • Dynamic dispatch
  • Heap allocation (Box)
  • Slower performance

Less Ergonomic

  • Requires trait implementations
  • More verbose usage
  • Lifetime complexity

Why rejected: Performance overhead without significant benefit.

Alternative: String Only

Limited Flexibility

  • Forces string conversion
  • Allocation overhead
  • Can’t optimize for numeric IDs

Performance

  • Always allocates
  • Slower hash/compare
  • Memory overhead

Why rejected: Generics provide flexibility without cost.

Clock Abstraction

Decision: Clock Trait

Rationale: Abstracting time enables testing and handles real-world clock issues.

Benefits

Testable

  • TestClock for deterministic tests
  • Precise time control
  • Failure simulation

Handles Real-World Issues

  • Clock going backwards
  • System suspend/resume
  • Virtualization issues

Flexible

  • Custom time sources
  • Mock implementations
  • Production/test separation

Error Handling

  • Graceful clock failures
  • Explicit error types
  • Recoverable
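
As a sketch of what a production implementation of the trait looks like (the library ships its own SystemClock; this assumes, following the TestClock above, that Clock::now returns u64 nanoseconds since the Unix epoch):

#![allow(unused)]
fn main() {
use std::time::{SystemTime, UNIX_EPOCH};

struct WallClock;

impl Clock for WallClock {
    fn now(&self) -> Result<u64, ClockError> {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_nanos() as u64)             // nanoseconds since epoch
            .map_err(|_| ClockError::SystemTimeError) // e.g. clock went backwards
    }
}
}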

Alternative: Direct SystemTime

Not Testable

  • Can’t control time in tests
  • Non-deterministic tests
  • Hard to test edge cases

No Error Handling

  • SystemTime can fail
  • Can’t recover
  • Silent failures

Why rejected: Testing and error handling require abstraction.

Memory Management: Manual Cleanup

Decision: Explicit cleanup_stale_clients()

Rationale: Manual cleanup gives users control over memory/CPU tradeoff.

Benefits

User Control

  • Choose cleanup frequency
  • Choose threshold
  • Tune for workload

No Background Threads

  • Simpler implementation
  • No thread overhead
  • User controls when

Predictable Performance

  • Cleanup only when called
  • No surprising pauses
  • Controllable overhead

Alternative: Automatic Background Cleanup

Thread Overhead

  • Requires background thread
  • Thread management complexity
  • Resource usage

Loss of Control

  • Can’t control timing
  • Fixed thresholds
  • May not fit all workloads

Complexity

  • Thread lifecycle management
  • Shutdown coordination
  • Error handling

Why rejected: Manual cleanup is simpler and more flexible.

Alternative: Reference Counting

Incorrect Semantics

  • Clients don’t have clear ownership
  • Memory leak if client keeps requesting
  • Doesn’t solve problem

Why rejected: Doesn’t match use case.

Configuration: Builder Pattern

Decision: Support Builder Pattern

Rationale: Builder pattern provides ergonomic API while maintaining validation.

Benefits

Ergonomic

  • Clear, readable code
  • Method chaining
  • Self-documenting

Flexible

  • Can set individual fields
  • Start with defaults
  • Override as needed

Validation

  • Validate at build time
  • Clear error messages
  • Fail fast

Example

#![allow(unused)]
fn main() {
let config = FluxLimiterConfig::new(0.0, 0.0)
    .rate(100.0)
    .burst(50.0);
}

vs.

#![allow(unused)]
fn main() {
let config = FluxLimiterConfig::new(100.0, 50.0);
}

Both are supported.

Decision Summary

Decision    Chosen       Rejected               Rationale
--------    ------       --------               ---------
Algorithm   GCRA         Token Bucket           Precision, simplicity
Storage     DashMap      Mutex, RwLock          Performance, scalability
Time        u64 nanos    ms, Duration, f64      Precision, performance
Errors      Result       Panic, Silent          Observability, recovery
Client ID   Generic      Trait object, String   Flexibility, performance
Clock       Trait        Direct SystemTime      Testing, error handling
Cleanup     Manual       Automatic              Control, simplicity
Config      Builder      Constructor only       Ergonomics

Next Steps

Future Extensibility

Planned architecture enhancements and extension points.

Planned Features

1. Async Support

Add async variants of key methods for better integration with async runtimes.

#![allow(unused)]
fn main() {
impl<T, C> FluxLimiter<T, C>
where
    T: Hash + Eq + Clone + Send + Sync,
    C: Clock + Send + Sync,
{
    pub async fn check_request_async(
        &self,
        client_id: T,
    ) -> Result<FluxLimiterDecision, FluxLimiterError> {
        // Non-blocking implementation
        // Async clock sources
        // Future-based cleanup
    }

    pub async fn cleanup_stale_clients_async(
        &self,
        stale_threshold_nanos: u64,
    ) -> Result<usize, FluxLimiterError> {
        // Async cleanup without blocking
    }
}
}

Benefits:

  • Better integration with Tokio/async-std
  • Non-blocking I/O for distributed backends
  • Async clock sources

Implementation Strategy:

  • Feature flag: async
  • Separate module (e.g. flux_limiter::asynchronous, since async is a reserved keyword in Rust)
  • Maintain backward compatibility

2. Persistence Layer

Support for persisting rate limit state across restarts.

#![allow(unused)]
fn main() {
#[async_trait]
pub trait StateStore: Send + Sync {
    async fn load_state(
        &self,
        client_id: &str,
    ) -> Result<Option<u64>, StoreError>;

    async fn save_state(
        &self,
        client_id: &str,
        tat: u64,
    ) -> Result<(), StoreError>;

    async fn cleanup_stale(
        &self,
        threshold: u64,
    ) -> Result<usize, StoreError>;
}
}

Implementations:

Redis Backend

#![allow(unused)]
fn main() {
pub struct RedisStateStore {
    client: redis::Client,
    key_prefix: String,
}

#[async_trait]
impl StateStore for RedisStateStore {
    async fn load_state(&self, client_id: &str) -> Result<Option<u64>, StoreError> {
        let key = format!("{}:{}", self.key_prefix, client_id);
        // Load TAT from Redis (sketch)
        todo!()
    }

    async fn save_state(&self, client_id: &str, tat: u64) -> Result<(), StoreError> {
        let key = format!("{}:{}", self.key_prefix, client_id);
        // Save TAT to Redis with TTL (sketch)
        todo!()
    }
    // cleanup_stale omitted for brevity
}
}

Database Backend

#![allow(unused)]
fn main() {
pub struct DatabaseStateStore {
    pool: sqlx::PgPool,
}

#[async_trait]
impl StateStore for DatabaseStateStore {
    async fn load_state(&self, client_id: &str) -> Result<Option<u64>, StoreError> {
        sqlx::query_scalar("SELECT tat FROM rate_limits WHERE client_id = $1")
            .bind(client_id)
            .fetch_optional(&self.pool)
            .await
            .map_err(Into::into)
    }
    // save_state and cleanup_stale omitted for brevity
}
}

Benefits:

  • State survives restarts
  • Distributed rate limiting
  • Shared state across instances

Implementation Strategy:

  • Feature flags: redis, postgres, etc.
  • Trait-based design
  • Fallback to in-memory

3. Observability Integration

Built-in metrics and tracing support.

#![allow(unused)]
fn main() {
pub trait RateLimiterMetrics: Send + Sync {
    fn record_decision(
        &self,
        client_id: &str,
        allowed: bool,
        latency: Duration,
    );

    fn record_error(&self, error: &FluxLimiterError);

    fn record_cleanup(&self, removed: usize, duration: Duration);
}
}

Implementations:

Prometheus Metrics

#![allow(unused)]
fn main() {
pub struct PrometheusMetrics {
    requests_total: IntCounterVec,
    requests_allowed: IntCounterVec,
    requests_denied: IntCounterVec,
    check_duration: HistogramVec,
    clock_errors: IntCounter,
}

impl RateLimiterMetrics for PrometheusMetrics {
    fn record_decision(&self, client_id: &str, allowed: bool, latency: Duration) {
        self.requests_total.with_label_values(&[client_id]).inc();

        if allowed {
            self.requests_allowed.with_label_values(&[client_id]).inc();
        } else {
            self.requests_denied.with_label_values(&[client_id]).inc();
        }

        self.check_duration
            .with_label_values(&[client_id])
            .observe(latency.as_secs_f64());
    }
}
}

OpenTelemetry Tracing

#![allow(unused)]
fn main() {
pub struct OpenTelemetryMetrics {
    meter: Meter,
}

impl RateLimiterMetrics for OpenTelemetryMetrics {
    fn record_decision(&self, client_id: &str, allowed: bool, latency: Duration) {
        let span = tracing::span!(
            tracing::Level::INFO,
            "rate_limit_check",
            client_id = %client_id,
            allowed = allowed,
            latency_us = latency.as_micros() as i64,
        );
        let _enter = span.enter();
    }
}
}

Benefits:

  • Built-in monitoring
  • Standard metrics
  • Distributed tracing

Implementation Strategy:

  • Feature flags: prometheus, opentelemetry
  • Optional trait implementations
  • Zero-cost when disabled

4. Alternative Algorithms

Support for multiple rate limiting algorithms.

#![allow(unused)]
fn main() {
pub trait RateLimitAlgorithm<T>: Send + Sync {
    fn check_request(
        &self,
        client_id: &T,
        current_time: u64,
    ) -> FluxLimiterDecision;

    fn cleanup_stale(
        &self,
        threshold: u64,
    ) -> Result<usize, FluxLimiterError>;
}
}

Implementations:

Token Bucket

#![allow(unused)]
fn main() {
pub struct TokenBucketAlgorithm {
    rate: f64,
    capacity: f64,
    client_state: DashMap<String, TokenBucketState>,
}

struct TokenBucketState {
    tokens: f64,
    last_refill: u64,
}
}

Sliding Window

#![allow(unused)]
fn main() {
pub struct SlidingWindowAlgorithm {
    window_size_nanos: u64,
    max_requests: u64,
    client_state: DashMap<String, VecDeque<u64>>,
}
}

Leaky Bucket

#![allow(unused)]
fn main() {
pub struct LeakyBucketAlgorithm {
    rate: f64,
    capacity: u64,
    client_state: DashMap<String, LeakyBucketState>,
}
}

Benefits:

  • Algorithm flexibility
  • Use case specific optimization
  • Easy comparison

Implementation Strategy:

  • Feature flag: algorithms
  • Separate modules
  • Common trait interface

5. Distributed Rate Limiting

Coordinate rate limits across multiple instances.

#![allow(unused)]
fn main() {
pub struct DistributedFluxLimiter<T, C, S>
where
    T: Hash + Eq + Clone + Send + Sync,
    C: Clock,
    S: StateStore,
{
    local: FluxLimiter<T, C>,
    store: S,
    sync_interval: Duration,
}

impl<T, C, S> DistributedFluxLimiter<T, C, S> {
    pub async fn check_request(
        &self,
        client_id: T,
    ) -> Result<FluxLimiterDecision, FluxLimiterError> {
        // 1. Check local cache
        // 2. If miss, fetch from store
        // 3. Make decision
        // 4. Update both local and remote
    }

    async fn sync_state(&self) -> Result<(), StoreError> {
        // Periodic sync with remote store
    }
}
}

Strategies:

  • Best-effort: Local-first with periodic sync
  • Strict: Remote check every time (slower but accurate)
  • Hybrid: Local fast path, remote for edge cases

Benefits:

  • Multi-instance coordination
  • Cluster-wide rate limits
  • Horizontal scaling

6. Adaptive Rate Limiting

Dynamically adjust rates based on backend health.

#![allow(unused)]
fn main() {
pub struct AdaptiveRateLimiter<T, C, H>
where
    T: Hash + Eq + Clone,
    C: Clock,
    H: HealthCheck,
{
    base_config: FluxLimiterConfig,
    limiter: RwLock<FluxLimiter<T, C>>,
    health_check: H,
    adjustment_factor: AtomicU64, // Fixed-point percentage
}

impl<T, C, H> AdaptiveRateLimiter<T, C, H> {
    pub async fn adjust_rate(&self) -> Result<(), FluxLimiterError> {
        let health = self.health_check.check().await;

        let new_factor = match health {
            Health::Good => 1.0,      // Normal rate
            Health::Degraded => 0.7,  // 70% of normal
            Health::Bad => 0.3,       // 30% of normal
        };

        // Update limiter with adjusted rate
        let new_rate = self.base_config.rate() * new_factor;
        // ... recreate limiter ...
    }
}
}

Benefits:

  • Protect degraded backends
  • Auto-recovery
  • Dynamic adjustment

7. Quota Management

Track cumulative usage over longer periods.

#![allow(unused)]
fn main() {
pub struct QuotaManager<T>
where
    T: Hash + Eq + Clone,
{
    limiter: FluxLimiter<T, SystemClock>,
    quotas: DashMap<T, QuotaState>,
}

struct QuotaState {
    total_allowed: u64,
    period_start: u64,
    quota_limit: u64,
}

impl<T> QuotaManager<T> {
    pub fn check_request(
        &self,
        client_id: T,
    ) -> Result<QuotaDecision, FluxLimiterError> {
        // 1. Check rate limit
        // 2. Check quota
        // 3. Update quota usage
        // 4. Return combined decision
    }
}
}

Benefits:

  • Long-term limits (daily, monthly)
  • Billing integration
  • Multi-tier quotas

Extensibility Patterns

Plugin Architecture

#![allow(unused)]
fn main() {
pub trait FluxLimiterPlugin: Send + Sync {
    fn before_check(&self, client_id: &str);
    fn after_check(&self, client_id: &str, decision: &FluxLimiterDecision);
    fn on_error(&self, client_id: &str, error: &FluxLimiterError);
}

pub struct PluggableFluxLimiter<T, C>
where
    T: Hash + Eq + Clone,
    C: Clock,
{
    limiter: FluxLimiter<T, C>,
    plugins: Vec<Box<dyn FluxLimiterPlugin>>,
}
}

Middleware Pattern

#![allow(unused)]
fn main() {
pub trait RateLimitMiddleware {
    fn wrap(
        &self,
        inner: Box<dyn Fn(&str) -> Result<FluxLimiterDecision, FluxLimiterError>>,
    ) -> Box<dyn Fn(&str) -> Result<FluxLimiterDecision, FluxLimiterError>>;
}

// Example: Logging middleware
pub struct LoggingMiddleware;

impl RateLimitMiddleware for LoggingMiddleware {
    fn wrap(
        &self,
        inner: Box<dyn Fn(&str) -> Result<FluxLimiterDecision, FluxLimiterError>>,
    ) -> Box<dyn Fn(&str) -> Result<FluxLimiterDecision, FluxLimiterError>> {
        Box::new(move |client_id| {
            info!("Checking rate limit for {}", client_id);
            let result = inner(client_id);
            info!("Result: {:?}", result);
            result
        })
    }
}
}

Backward Compatibility Strategy

Semantic Versioning

  • Patch (0.x.Y): Bug fixes, performance improvements
  • Minor (0.X.0): New features, backward compatible
  • Major (X.0.0): Breaking changes

Feature Flags

All new features behind feature flags:

[features]
default = []
async = ["tokio"]
redis = ["redis", "async"]
prometheus = ["prometheus"]
opentelemetry = ["opentelemetry"]
algorithms = []

Deprecation Path

  1. Deprecate: Mark old API with #[deprecated]
  2. Transition Period: Support both old and new (2-3 minor versions)
  3. Remove: Remove in next major version
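
A sketch of step 1 using Rust's built-in attribute; the method name check and the version number are purely illustrative, and this would live inside the library crate:

#![allow(unused)]
fn main() {
use std::hash::Hash;

impl<T, C> FluxLimiter<T, C>
where
    T: Hash + Eq + Clone,
    C: Clock,
{
    // Old name forwards to the new API and warns at compile time.
    #[deprecated(since = "0.8.0", note = "use `check_request` instead")]
    pub fn check(&self, client_id: T) -> Result<FluxLimiterDecision, FluxLimiterError> {
        self.check_request(client_id)
    }
}
}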

Type Aliases

Maintain existing API surface:

#![allow(unused)]
fn main() {
// A default type parameter keeps the existing single-parameter spelling
// compiling while letting new users name the clock explicitly:
pub type FluxLimiter<T, C = SystemClock> = core::FluxLimiter<T, C>;

// Existing users: FluxLimiter<String>
// New users:      FluxLimiter<String, TestClock>
}

Implementation Roadmap

Phase 1: Core Improvements (v0.5)

  • Async support
  • Enhanced metrics
  • Performance optimizations

Phase 2: Persistence (v0.6)

  • State store trait
  • Redis backend
  • PostgreSQL backend

Phase 3: Advanced Features (v0.7)

  • Alternative algorithms
  • Distributed coordination
  • Adaptive rate limiting

Phase 4: Enterprise Features (v1.0)

  • Quota management
  • Multi-tier support
  • Advanced monitoring

Contributing

We welcome contributions! Areas of interest:

  1. Performance: Benchmarks and optimizations
  2. Backends: New state store implementations
  3. Metrics: Monitoring integrations
  4. Algorithms: Alternative rate limiting strategies
  5. Documentation: Examples and guides
  6. Testing: More comprehensive tests

Next Steps