Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Web Framework Integration

Learn how to integrate Flux Limiter with popular web frameworks.

Axum Integration

Basic Middleware

#![allow(unused)]
fn main() {
use axum::{
    extract::State,
    http::{Request, StatusCode, HeaderMap},
    middleware::Next,
    response::{Response, IntoResponse},
};
use flux_limiter::{FluxLimiter, FluxLimiterError, SystemClock};
use std::sync::Arc;

type SharedLimiter = Arc<FluxLimiter<String, SystemClock>>;

/// Axum middleware that rate-limits each request by client identifier.
///
/// The identifier comes from `extract_client_ip` and is handed to
/// `FluxLimiter::check_request`. Outcomes:
///
/// * **Allowed** – the request is forwarded to `next`; when the decision
///   carries a `remaining_capacity` it is attached to the response as
///   `X-RateLimit-Remaining`.
/// * **Denied** – `429 Too Many Requests` with `X-RateLimit-Remaining: 0`
///   and, when the decision carries `retry_after_seconds`, a `Retry-After`
///   header rounded up to whole seconds.
/// * **`FluxLimiterError::ClockError`** – fail-open: the error is logged to
///   stderr and the request is forwarded unchanged.
/// * **Any other limiter error** – logged to stderr and answered with
///   `500 Internal Server Error`.
///
/// NOTE(review): `Request<B>` / `Next<B>` is the axum 0.6 middleware
/// signature, whereas `axum::serve` (used by the complete example on this
/// page) is an axum 0.7+ API — confirm which axum version is targeted.
async fn rate_limit_middleware<B>(
    State(limiter): State<SharedLimiter>,
    request: Request<B>,
    next: Next<B>,
) -> Result<Response, (StatusCode, HeaderMap, &'static str)> {
    // Extract client ID (e.g., from IP or auth header)
    let client_ip = extract_client_ip(&request);

    // The identifier is not needed afterwards, so it is moved rather than cloned.
    match limiter.check_request(client_ip) {
        Ok(decision) if decision.allowed => {
            // Run the inner handler first, then decorate its response.
            let mut response = next.run(request).await;
            if let Some(remaining) = decision.remaining_capacity {
                response.headers_mut().insert(
                    "X-RateLimit-Remaining",
                    remaining.to_string().parse().unwrap(),
                );
            }
            Ok(response)
        }
        Ok(decision) => {
            // Rate limited: tell the client how long to back off, if known.
            let mut headers = HeaderMap::new();
            if let Some(retry_after) = decision.retry_after_seconds {
                headers.insert(
                    "Retry-After",
                    (retry_after.ceil() as u64).to_string().parse().unwrap(),
                );
            }
            headers.insert("X-RateLimit-Remaining", "0".parse().unwrap());

            Err((StatusCode::TOO_MANY_REQUESTS, headers, "Rate limited"))
        }
        Err(FluxLimiterError::ClockError(_)) => {
            // Fail-open policy
            eprintln!("Rate limiter clock error - allowing request");
            Ok(next.run(request).await)
        }
        Err(e) => {
            eprintln!("Rate limiter error: {}", e);
            Err((StatusCode::INTERNAL_SERVER_ERROR, HeaderMap::new(), "Internal error"))
        }
    }
}

/// Derives a client identifier from the request's `X-Forwarded-For` header.
///
/// Returns the first comma-separated entry of the header with surrounding
/// whitespace removed. Falls back to `"unknown"` when the header is absent,
/// cannot be read as a string (`to_str` fails), or its first entry is empty.
/// Every request that hits the fallback therefore shares one identifier.
///
/// NOTE(security): `X-Forwarded-For` is sent by the client unless a proxy in
/// front of the service overwrites it, so a caller can choose its own
/// identifier. Only rely on this when such a proxy is in place — confirm
/// against the deployment.
fn extract_client_ip<B>(request: &Request<B>) -> String {
    request
        .headers()
        .get("X-Forwarded-For")
        .and_then(|h| h.to_str().ok())
        .and_then(|s| s.split(',').next())
        // Normalise so " 1.2.3.4" and "1.2.3.4" map to the same limiter key.
        .map(str::trim)
        // An empty first entry carries no identity; treat it like a missing header.
        .filter(|ip| !ip.is_empty())
        .unwrap_or("unknown")
        .to_string()
}
}

Complete Axum Example

use axum::{
    routing::get,
    Router,
    middleware,
};
use flux_limiter::{FluxLimiter, FluxLimiterConfig, SystemClock};
use std::sync::Arc;

/// Example entry point: builds one shared limiter, installs
/// `rate_limit_middleware` on the router, and serves on `0.0.0.0:3000`.
///
/// # Panics
///
/// Panics (with a descriptive message) if the limiter cannot be built from
/// the configuration, if the listening socket cannot be bound, or if the
/// server terminates with an error.
#[tokio::main]
async fn main() {
    // Create rate limiter
    let config = FluxLimiterConfig::new(10.0, 5.0);
    let limiter = Arc::new(
        FluxLimiter::with_config(config, SystemClock)
            .expect("failed to build rate limiter from configuration"),
    );

    // Build router with middleware; the same limiter instance is shared by
    // the middleware layer and the router state.
    let app = Router::new()
        .route("/api/data", get(handler))
        .layer(middleware::from_fn_with_state(
            limiter.clone(),
            rate_limit_middleware,
        ))
        .with_state(limiter);

    // Run server
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000")
        .await
        .expect("failed to bind 0.0.0.0:3000");

    axum::serve(listener, app)
        .await
        .expect("server terminated with an error");
}

/// Demo route handler: always responds with a fixed greeting.
async fn handler() -> &'static str {
    const GREETING: &str = "Hello, world!";
    GREETING
}

Actix-Web Integration

Basic Middleware

#![allow(unused)]
fn main() {
use actix_web::{
    dev::{Service, ServiceRequest, ServiceResponse, Transform},
    Error, HttpResponse, http::header,
};
use flux_limiter::{FluxLimiter, FluxLimiterError, SystemClock};
use std::sync::Arc;
use std::future::Future;
use std::pin::Pin;

/// Middleware factory holding the rate limiter that wrapped services use.
pub struct RateLimitMiddleware {
    /// Reference-counted handle to the limiter.
    limiter: Arc<FluxLimiter<String, SystemClock>>,
}

impl RateLimitMiddleware {
    /// Creates the middleware around an already-constructed, shared limiter.
    pub fn new(limiter: Arc<FluxLimiter<String, SystemClock>>) -> Self {
        RateLimitMiddleware { limiter }
    }
}

impl<S, B> Transform<S, ServiceRequest> for RateLimitMiddleware
where
    S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
    S::Future: 'static,
    B: 'static,
{
    type Response = ServiceResponse<B>;
    type Error = Error;
    type Transform = RateLimitMiddlewareService<S>;
    type InitError = ();
    type Future = Pin<Box<dyn Future<Output = Result<Self::Transform, Self::InitError>>>>;

    fn new_transform(&self, service: S) -> Self::Future {
        let limiter = self.limiter.clone();
        Box::pin(async move {
            Ok(RateLimitMiddlewareService { service, limiter })
        })
    }
}

pub struct RateLimitMiddlewareService<S> {
    service: S,
    limiter: Arc<FluxLimiter<String, SystemClock>>,
}

impl<S, B> Service<ServiceRequest> for RateLimitMiddlewareService<S>
where
    S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
    S::Future: 'static,
    B: 'static,
{
    type Response = ServiceResponse<B>;
    type Error = Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;

    fn call(&self, req: ServiceRequest) -> Self::Future {
        let client_ip = req
            .connection_info()
            .realip_remote_addr()
            .unwrap_or("unknown")
            .to_string();

        let decision = self.limiter.check_request(client_ip);

        match decision {
            Ok(decision) if decision.allowed => {
                let fut = self.service.call(req);
                Box::pin(async move {
                    let mut res = fut.await?;
                    if let Some(remaining) = decision.remaining_capacity {
                        res.headers_mut().insert(
                            header::HeaderName::from_static("x-ratelimit-remaining"),
                            header::HeaderValue::from_str(&remaining.to_string()).unwrap(),
                        );
                    }
                    Ok(res)
                })
            }
            Ok(decision) => {
                Box::pin(async move {
                    let mut response = HttpResponse::TooManyRequests();
                    if let Some(retry_after) = decision.retry_after_seconds {
                        response.insert_header((
                            "Retry-After",
                            (retry_after.ceil() as u64).to_string(),
                        ));
                    }
                    Ok(req.into_response(response.finish()))
                })
            }
            Err(FluxLimiterError::ClockError(_)) => {
                // Fail-open policy
                eprintln!("Clock error - allowing request");
                let fut = self.service.call(req);
                Box::pin(async move { fut.await })
            }
            Err(e) => {
                eprintln!("Rate limiter error: {}", e);
                Box::pin(async move {
                    Ok(req.into_response(HttpResponse::InternalServerError().finish()))
                })
            }
        }
    }
}
}

Rocket Integration

Request Guard

#![allow(unused)]
fn main() {
use rocket::{
    request::{FromRequest, Outcome, Request},
    http::Status,
    State,
};
use flux_limiter::{FluxLimiter, FluxLimiterDecision, SystemClock};
use std::sync::Arc;

pub struct RateLimited {
    pub decision: FluxLimiterDecision,
}

#[rocket::async_trait]
impl<'r> FromRequest<'r> for RateLimited {
    type Error = ();

    async fn from_request(request: &'r Request<'_>) -> Outcome<Self, Self::Error> {
        let limiter = request
            .guard::<&State<Arc<FluxLimiter<String, SystemClock>>>>()
            .await
            .unwrap();

        let client_ip = request
            .client_ip()
            .map(|ip| ip.to_string())
            .unwrap_or_else(|| "unknown".to_string());

        match limiter.check_request(client_ip) {
            Ok(decision) if decision.allowed => {
                Outcome::Success(RateLimited { decision })
            }
            Ok(_) => Outcome::Error((Status::TooManyRequests, ())),
            Err(_) => {
                // Fail-open
                Outcome::Success(RateLimited {
                    decision: FluxLimiterDecision {
                        allowed: true,
                        retry_after_seconds: None,
                        remaining_capacity: None,
                        reset_time_nanos: 0,
                    },
                })
            }
        }
    }
}

// Usage in route
/// Example route: only reached when the `RateLimited` guard succeeds; the
/// guard value itself is not used.
#[rocket::get("/api/data")]
fn data(_rate_limited: RateLimited) -> &'static str {
    const BODY: &str = "Hello, world!";
    BODY
}
}

Standard Rate Limit Headers

HTTP Response Headers

Implement standard rate limiting headers:

#![allow(unused)]
fn main() {
use axum::http::HeaderMap;

/// Writes the rate-limit response headers for `decision` into `headers`.
///
/// * `X-RateLimit-Limit` – always set, from `limit`.
/// * `X-RateLimit-Remaining` – set when `decision.remaining_capacity` is present.
/// * `X-RateLimit-Reset` – always set: `decision.reset_time_nanos` divided
///   down to whole seconds.
/// * `Retry-After` – set only for denied decisions that carry
///   `retry_after_seconds`, rounded up to whole seconds.
///
/// # Panics
///
/// Panics if a formatted value cannot be parsed into a header value.
fn add_rate_limit_headers(
    headers: &mut HeaderMap,
    decision: &FluxLimiterDecision,
    limit: f64,
) {
    // One place for the "stringify, parse, insert" step shared by every header.
    let mut put = |name: &'static str, value: String| {
        headers.insert(name, value.parse().unwrap());
    };

    // Maximum requests allowed.
    put("X-RateLimit-Limit", limit.to_string());

    // Remaining requests, when the limiter reports them.
    if let Some(remaining) = decision.remaining_capacity {
        put("X-RateLimit-Remaining", remaining.to_string());
    }

    // When the limit resets: nanoseconds reduced to whole seconds.
    let reset_seconds = decision.reset_time_nanos / 1_000_000_000;
    put("X-RateLimit-Reset", reset_seconds.to_string());

    // Back-off hint, only meaningful on denied (429) responses.
    match decision.retry_after_seconds {
        Some(retry_after) if !decision.allowed => {
            put("Retry-After", (retry_after.ceil() as u64).to_string());
        }
        _ => {}
    }
}
}

Multi-Tier Rate Limiting

Rate limit based on user tier:

#![allow(unused)]
fn main() {
use axum::{
    extract::{State, Extension},
    http::Request,
    middleware::Next,
    response::Response,
};
use std::collections::HashMap;
use std::sync::Arc;

/// One rate limiter per user tier, looked up by tier name.
struct TieredLimiters {
    /// Tier name mapped to that tier's shared limiter.
    limiters: HashMap<String, Arc<FluxLimiter<String, SystemClock>>>,
}

impl TieredLimiters {
    /// Builds the `"free"` and `"paid"` limiters.
    ///
    /// Returns the first error reported by `FluxLimiter::with_config`.
    fn new() -> Result<Self, FluxLimiterError> {
        // (tier name, the two arguments handed to `FluxLimiterConfig::new`)
        let tiers = [("free", 10.0, 5.0), ("paid", 100.0, 50.0)];

        let mut limiters = HashMap::new();
        for (tier, cfg_a, cfg_b) in tiers {
            let limiter =
                FluxLimiter::with_config(FluxLimiterConfig::new(cfg_a, cfg_b), SystemClock)?;
            limiters.insert(tier.to_string(), Arc::new(limiter));
        }

        Ok(Self { limiters })
    }

    /// Returns the limiter registered for `tier`, or the `"free"` limiter
    /// when `tier` is not a known tier name.
    fn get_limiter(&self, tier: &str) -> &Arc<FluxLimiter<String, SystemClock>> {
        match self.limiters.get(tier) {
            Some(limiter) => limiter,
            None => &self.limiters["free"],
        }
    }
}

/// Axum middleware that applies the limiter matching the caller's tier.
///
/// The tier name is read from a `String` request extension and the client
/// key from `extract_client_ip`. Denied requests get `429` with the body
/// `"Rate limited"`; allowed requests and limiter errors (fail-open) are
/// forwarded to `next`.
async fn tiered_rate_limit<B>(
    State(limiters): State<Arc<TieredLimiters>>,
    Extension(user_tier): Extension<String>,
    request: Request<B>,
    next: Next<B>,
) -> Response {
    let client_key = extract_client_ip(&request);

    // A limiter error is treated exactly like an explicit allow (fail-open).
    let allowed = match limiters.get_limiter(&user_tier).check_request(client_key) {
        Ok(decision) => decision.allowed,
        Err(_) => true,
    };

    if !allowed {
        return (StatusCode::TOO_MANY_REQUESTS, "Rate limited").into_response();
    }

    next.run(request).await
}
}

Background Cleanup Task

Automatically clean up stale clients:

use tokio::time::{interval, Duration};
use std::sync::Arc;

async fn cleanup_task(limiter: Arc<FluxLimiter<String, SystemClock>>) {
    let mut cleanup_interval = interval(Duration::from_secs(3600)); // 1 hour

    loop {
        cleanup_interval.tick().await;

        let threshold = 24 * 60 * 60 * 1_000_000_000u64; // 24 hours

        match limiter.cleanup_stale_clients(threshold) {
            Ok(count) => {
                println!("Rate limiter: cleaned up {} stale clients", count);
            }
            Err(e) => {
                eprintln!("Rate limiter cleanup failed: {}", e);
            }
        }
    }
}

// Start cleanup task when initializing the server
/// Example entry point showing where the cleanup task is spawned: right
/// after the shared limiter is created and before the web server is built.
#[tokio::main]
async fn main() {
    let limiter = FluxLimiter::with_config(FluxLimiterConfig::new(100.0, 50.0), SystemClock)
        .map(Arc::new)
        .unwrap();

    // Start cleanup in background
    tokio::spawn(cleanup_task(Arc::clone(&limiter)));

    // Build and run your web server...
}

Next Steps