Concurrency Model

Understanding the lock-free concurrency design in Flux Limiter.

Lock-Free Design Philosophy

Flux Limiter is built on a lock-free concurrency model for maximum performance:

Multiple Threads
      ↓
FluxLimiter (Shared via Arc)
      ↓
Arc<DashMap<ClientId, TAT>>
      ↓
Lock-free hash map operations

Thread Safety Guarantees

1. Read Operations

Multiple concurrent readers without contention:

// Thread 1
limiter.check_request("client_1")?;

// Thread 2 (concurrent)
limiter.check_request("client_2")?;

// No lock contention - different shards

2. Write Operations

Atomic updates with minimal contention:

// Atomic TAT update
client_state.insert(client_id, new_tat);

Each client’s state is independent, allowing parallel updates.

3. Memory Ordering

Relaxed ordering is sufficient for TAT updates (see the sketch after this list):

  • TAT values are monotonically increasing
  • No cross-client dependencies
  • No ABA problem
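
As an illustration of this reasoning (Flux Limiter actually stores TATs inside DashMap rather than in raw atomics, so this is only a sketch, not the implementation), a monotonically advancing TAT held in a hypothetical AtomicU64 can be updated safely with relaxed ordering:

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical per-client TAT stored directly in an atomic
let tat = Arc::new(AtomicU64::new(0));

let handles: Vec<_> = (0..4)
    .map(|i| {
        let tat = Arc::clone(&tat);
        thread::spawn(move || {
            // fetch_max keeps the value monotonic, so Relaxed is enough:
            // no other memory depends on observing this store in a particular order.
            tat.fetch_max((i as u64 + 1) * 100, Ordering::Relaxed);
        })
    })
    .collect();

for handle in handles {
    handle.join().unwrap();
}
assert_eq!(tat.load(Ordering::Relaxed), 400);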

4. Concurrent Access Patterns

Same Client, Different Threads:

// Thread 1
limiter.check_request("client_1")?; // TAT = 100ms

// Thread 2 (concurrent)
limiter.check_request("client_1")?; // TAT = 200ms

// DashMap serializes access to same key
// Ordering determined by DashMap's internal locking

Different Clients, Different Threads:

// Thread 1
limiter.check_request("client_1")?;

// Thread 2 (concurrent)
limiter.check_request("client_2")?;

// Lock-free - different shards, no contention

DashMap: The Core Concurrency Primitive

Why DashMap?

DashMap provides highly concurrent, effectively lock-free access by sharding the map:

DashMap<ClientId, TAT>
    ├─ Shard 0: HashMap + RwLock
    ├─ Shard 1: HashMap + RwLock
    ├─ Shard 2: HashMap + RwLock
    ├─ ...
    └─ Shard N: HashMap + RwLock

Benefits (see the sketch after this list):

  • Better than HashMap + Mutex: avoids a single global lock
  • Better than RwLock<HashMap>: no global reader/writer contention
  • Better than a custom lock-free map: mature and battle-tested
  • Segmented locking: reduces contention compared to a single lock
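
A minimal, self-contained sketch of the sharding benefit (using dashmap directly, outside of Flux Limiter): writers touching different keys usually land on different shards and proceed without blocking each other.

use dashmap::DashMap;
use std::sync::Arc;
use std::thread;

let map: Arc<DashMap<String, u64>> = Arc::new(DashMap::new());

let handles: Vec<_> = (0..4)
    .map(|i| {
        let map = Arc::clone(&map);
        thread::spawn(move || {
            // Different keys -> (very likely) different shards -> no blocking
            map.insert(format!("client_{}", i), i as u64);
        })
    })
    .collect();

for handle in handles {
    handle.join().unwrap();
}
assert_eq!(map.len(), 4);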

Sharding Strategy

// Client IDs are hashed to shards (illustrative; DashMap computes this internally)
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn shard_index<K: Hash>(key: &K, num_shards: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() as usize) % num_shards
}

// Different clients likely land in different shards;
// the same client always maps to the same shard (consistency).

Operations on DashMap

Get (Read):

if let Some(previous_tat) = client_state.get(&client_id) {
    // Read lock on single shard
    // Other shards remain accessible
}

Insert (Write):

client_state.insert(client_id, new_tat);
// Write lock on single shard
// Other shards remain accessible

Cleanup (Iteration):

client_state.retain(|_, &mut tat| {
    // Locks each shard sequentially; short lock duration per shard
    // saturating_sub avoids underflow when a client's TAT is still in the future
    current_time.saturating_sub(tat) < threshold
});

Memory Consistency

Happens-Before Relationships

DashMap provides happens-before guarantees:

// Thread 1
limiter.check_request("client_1")?; // Writes TAT = 100ms

// Thread 2 (later)
limiter.check_request("client_1")?; // Reads TAT >= 100ms

The write in Thread 1 happens-before the read in Thread 2.

Visibility Guarantees

Updates to client state become visible to all subsequent reads of the same key:

// Thread 1
client_state.insert("client_1", 100);

// Thread 2 (later)
let tat = client_state.get("client_1"); // Sees 100

DashMap ensures proper memory synchronization.

Sharing Across Threads

Arc for Shared Ownership

use std::sync::Arc;
use std::thread;

let config = FluxLimiterConfig::new(100.0, 50.0);
let limiter = Arc::new(FluxLimiter::with_config(config, SystemClock)?);

// Clone Arc for each thread (cheap)
let limiter1 = Arc::clone(&limiter);
let limiter2 = Arc::clone(&limiter);

// Use in different threads
thread::spawn(move || limiter1.check_request("client_1"));
thread::spawn(move || limiter2.check_request("client_2"));

Arc Benefits (illustrated in the sketch after this list):

  • Reference counting with atomic operations
  • Thread-safe shared ownership
  • Cheap to clone (increments counter)
  • No data duplication
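
A quick std-only illustration: cloning an Arc only bumps an atomic reference count; the underlying data is never copied.

use std::sync::Arc;

let state = Arc::new(vec![0u8; 1024]); // one allocation
let a = Arc::clone(&state);            // +1 to the atomic counter, no copy
let b = Arc::clone(&state);            // +1 again

assert_eq!(Arc::strong_count(&state), 3);
drop(a);
drop(b);
assert_eq!(Arc::strong_count(&state), 1);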

Internal Arc Usage

pub struct FluxLimiter<T, C> {
    // ...
    client_state: Arc<DashMap<T, u64>>, // Shared across clones
    // ...
}

Cloning FluxLimiter shares the same state:

let limiter1 = limiter.clone();
let limiter2 = limiter.clone();

// Both share the same client_state
// Updates visible to both

Contention Analysis

Low Contention Scenarios

Different clients, concurrent access:

Threads: [T1: client_1] [T2: client_2] [T3: client_3]
Shards:  [Shard 0    ] [Shard 1    ] [Shard 2    ]
Result:  Lock-free, no contention

Medium Contention Scenarios

Same shard, different clients:

Threads: [T1: client_1] [T2: client_100]
Shards:  [Shard 0    ] [Shard 0      ]
Result:  Some contention, but short lock duration

High Contention Scenarios

Same client, concurrent access:

Threads: [T1: client_1] [T2: client_1] [T3: client_1]
Shards:  [Shard 0    ] [Shard 0    ] [Shard 0    ]
Result:  Serialized access (expected for consistency)

This is correct behavior: Rate limiting the same client from multiple threads should be serialized to maintain correctness.
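
As a sketch of the high-contention case (assuming the FluxLimiter API used throughout this chapter), eight threads all rate-limiting the same client funnel every TAT update through a single DashMap entry:

use std::sync::Arc;
use std::thread;

let limiter = Arc::new(FluxLimiter::with_config(
    FluxLimiterConfig::new(1000.0, 500.0),
    SystemClock,
)?);

let handles: Vec<_> = (0..8)
    .map(|_| {
        let limiter = Arc::clone(&limiter);
        thread::spawn(move || {
            for _ in 0..1_000 {
                // Every call updates the same key, so DashMap serializes the
                // TAT updates; this is the expected cost of per-client consistency.
                let _ = limiter.check_request("hot_client");
            }
        })
    })
    .collect();

for handle in handles {
    handle.join().unwrap();
}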

Performance Under Concurrency

Scalability Characteristics

Threads  Throughput (relative)  Latency (per request)
1        1.0x                   1.0μs
2        1.9x                   1.0μs
4        3.7x                   1.1μs
8        7.0x                   1.2μs
16       12.5x                  1.3μs

Nearly linear scalability with different clients.

Benchmark: Concurrent Access

#[bench]
fn bench_concurrent_different_clients(b: &mut Bencher) {
    let limiter = Arc::new(FluxLimiter::with_config(
        FluxLimiterConfig::new(1000.0, 500.0),
        SystemClock
    ).unwrap());

    b.iter(|| {
        let handles: Vec<_> = (0..8)
            .map(|i| {
                let limiter = Arc::clone(&limiter);
                thread::spawn(move || {
                    for j in 0..1000 {
                        let client_id = format!("client_{}_{}", i, j);
                        limiter.check_request(client_id).unwrap();
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }
    });
}

Clock Thread Safety

The Clock trait requires Send + Sync:

pub trait Clock: Send + Sync {
    fn now(&self) -> Result<u64, ClockError>;
}

SystemClock:

use std::time::{SystemTime, UNIX_EPOCH};

impl Clock for SystemClock {
    fn now(&self) -> Result<u64, ClockError> {
        // SystemTime::now() is thread-safe
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_nanos() as u64)
            .map_err(|_| ClockError::SystemTimeError)
    }
}

TestClock:

pub struct TestClock {
    time: Arc<AtomicU64>,        // Atomic for thread safety
    should_fail: Arc<AtomicBool>, // Atomic for thread safety
}

Both implementations are thread-safe.
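
As a self-contained sketch of the same pattern (a stand-in, not the crate's actual TestClock), an atomic-backed clock can be advanced from one thread and read from another without any extra locking:

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

// Illustrative only: a shared, atomically updated "current time" in nanoseconds
#[derive(Clone)]
struct MockClock {
    nanos: Arc<AtomicU64>,
}

impl MockClock {
    fn now(&self) -> u64 {
        self.nanos.load(Ordering::SeqCst)
    }
    fn advance(&self, by_nanos: u64) {
        self.nanos.fetch_add(by_nanos, Ordering::SeqCst);
    }
}

let clock = MockClock { nanos: Arc::new(AtomicU64::new(0)) };
let reader = clock.clone();

let handle = thread::spawn(move || reader.now()); // concurrent read is safe
clock.advance(1_000_000); // advance by 1 ms
handle.join().unwrap();
assert!(clock.now() >= 1_000_000);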

Cleanup Concurrency

Cleanup can run concurrently with rate limiting:

// Thread 1: Rate limiting
limiter.check_request("client_1")?;

// Thread 2: Cleanup (concurrent)
limiter.cleanup_stale_clients(threshold)?;

// Thread 3: More rate limiting (concurrent)
limiter.check_request("client_2")?;

DashMap’s retain method (sketched after this list):

  • Locks each shard briefly
  • Other shards remain accessible
  • Minimal impact on concurrent operations
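
A small dashmap-only sketch of this behavior: one thread keeps inserting while another runs retain, and the writer is only briefly delayed when both touch the same shard.

use dashmap::DashMap;
use std::sync::Arc;
use std::thread;

let state: Arc<DashMap<String, u64>> = Arc::new(DashMap::new());
for i in 0..10_000u64 {
    state.insert(format!("client_{}", i), i);
}

let writer = Arc::clone(&state);
let handle = thread::spawn(move || {
    for i in 10_000..20_000u64 {
        writer.insert(format!("client_{}", i), i);
    }
});

// "Cleanup": drop entries below an arbitrary cutoff while the writer runs.
// retain locks one shard at a time, so most inserts proceed unimpeded.
state.retain(|_, &mut tat| tat >= 5_000);

handle.join().unwrap();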

Best Practices

1. Share via Arc

// Good: Share limiter across threads
let limiter = Arc::new(FluxLimiter::with_config(config, clock)?);

// Bad: Create multiple limiters (separate state)
let limiter1 = FluxLimiter::with_config(config, clock1)?;
let limiter2 = FluxLimiter::with_config(config, clock2)?;

2. Use Different Client IDs

// Good: Different clients = low contention
for i in 0..1000 {
    limiter.check_request(format!("client_{}", i))?;
}

// Bad: Same client = serialized access
for _ in 0..1000 {
    limiter.check_request("same_client")?;
}

3. Avoid Blocking in Callbacks

// Good: Quick check
match limiter.check_request(client_id) {
    Ok(decision) => decision.allowed,
    Err(_) => false,
}

// Bad: Blocking I/O on the hot path (slows every rate-limit check)
match limiter.check_request(client_id) {
    Ok(decision) => {
        // Don't do expensive work here
        database.log_decision(decision)?; // Blocks!
        decision.allowed
    }
    Err(_) => false,
}

4. Periodic Cleanup

use tokio::time::{interval, Duration};

// Background cleanup task
tokio::spawn(async move {
    let mut interval = interval(Duration::from_secs(3600));
    loop {
        interval.tick().await;
        let _ = limiter.cleanup_stale_clients(threshold);
    }
});
