1 use crate::error::LemmyError;
2 use actix_web::dev::{ConnectionInfo, Service, ServiceRequest, ServiceResponse, Transform};
3 use enum_map::enum_map;
4 use futures::future::{ok, Ready};
5 use rate_limiter::{InstantSecs, RateLimitStorage, RateLimitType};
6 use serde::{Deserialize, Serialize};
9 net::{IpAddr, Ipv4Addr, SocketAddr},
14 task::{Context, Poll},
17 use tokio::sync::{mpsc, mpsc::Sender, OnceCell};
18 use typed_builder::TypedBuilder;
22 #[derive(Debug, Deserialize, Serialize, Clone, TypedBuilder)]
23 pub struct RateLimitConfig {
24 #[builder(default = 180)]
25 /// Maximum number of messages created in interval
27 #[builder(default = 60)]
28 /// Interval length for message limit, in seconds
29 pub message_per_second: i32,
30 #[builder(default = 6)]
31 /// Maximum number of posts created in interval
33 #[builder(default = 300)]
34 /// Interval length for post limit, in seconds
35 pub post_per_second: i32,
36 #[builder(default = 3)]
37 /// Maximum number of registrations in interval
39 #[builder(default = 3600)]
40 /// Interval length for registration limit, in seconds
41 pub register_per_second: i32,
42 #[builder(default = 6)]
43 /// Maximum number of image uploads in interval
45 #[builder(default = 3600)]
46 /// Interval length for image uploads, in seconds
47 pub image_per_second: i32,
48 #[builder(default = 6)]
49 /// Maximum number of comments created in interval
51 #[builder(default = 600)]
52 /// Interval length for comment limit, in seconds
53 pub comment_per_second: i32,
54 #[builder(default = 60)]
55 /// Maximum number of searches created in interval
57 #[builder(default = 600)]
58 /// Interval length for search limit, in seconds
59 pub search_per_second: i32,
62 #[derive(Debug, Clone)]
64 pub rate_limiter: RateLimitStorage,
65 pub rate_limit_config: RateLimitConfig,
68 #[derive(Debug, Clone)]
69 pub struct RateLimitedGuard {
70 rate_limit: Arc<Mutex<RateLimit>>,
74 /// Single instance of rate limit config and buckets, which is shared across all threads.
76 pub struct RateLimitCell {
77 tx: Sender<RateLimitConfig>,
78 rate_limit: Arc<Mutex<RateLimit>>,
82 /// Initialize cell if it wasnt initialized yet. Otherwise returns the existing cell.
83 pub async fn new(rate_limit_config: RateLimitConfig) -> &'static Self {
84 static LOCAL_INSTANCE: OnceCell<RateLimitCell> = OnceCell::const_new();
86 .get_or_init(|| async {
87 let (tx, mut rx) = mpsc::channel::<RateLimitConfig>(4);
88 let rate_limit = Arc::new(Mutex::new(RateLimit {
89 rate_limiter: Default::default(),
92 let rate_limit2 = rate_limit.clone();
93 tokio::spawn(async move {
94 while let Some(r) = rx.recv().await {
97 .expect("Failed to lock rate limit mutex for updating")
98 .rate_limit_config = r;
101 RateLimitCell { tx, rate_limit }
106 /// Call this when the config was updated, to update all in-memory cells.
107 pub async fn send(&self, config: RateLimitConfig) -> Result<(), LemmyError> {
108 self.tx.send(config).await?;
112 /// Remove buckets older than the given duration
113 pub fn remove_older_than(&self, mut duration: Duration) {
117 .expect("Failed to lock rate limit mutex for reading");
118 let rate_limit = &guard.rate_limit_config;
120 // If any rate limit interval is greater than `duration`, then the largest interval is used instead. This preserves buckets that would not pass the rate limit check.
121 let max_interval_secs = enum_map! {
122 RateLimitType::Message => rate_limit.message_per_second,
123 RateLimitType::Post => rate_limit.post_per_second,
124 RateLimitType::Register => rate_limit.register_per_second,
125 RateLimitType::Image => rate_limit.image_per_second,
126 RateLimitType::Comment => rate_limit.comment_per_second,
127 RateLimitType::Search => rate_limit.search_per_second,
131 .and_then(|max| u64::try_from(max).ok())
134 duration = std::cmp::max(duration, Duration::from_secs(max_interval_secs));
138 .remove_older_than(duration, InstantSecs::now())
141 pub fn message(&self) -> RateLimitedGuard {
142 self.kind(RateLimitType::Message)
145 pub fn post(&self) -> RateLimitedGuard {
146 self.kind(RateLimitType::Post)
149 pub fn register(&self) -> RateLimitedGuard {
150 self.kind(RateLimitType::Register)
153 pub fn image(&self) -> RateLimitedGuard {
154 self.kind(RateLimitType::Image)
157 pub fn comment(&self) -> RateLimitedGuard {
158 self.kind(RateLimitType::Comment)
161 pub fn search(&self) -> RateLimitedGuard {
162 self.kind(RateLimitType::Search)
165 fn kind(&self, type_: RateLimitType) -> RateLimitedGuard {
167 rate_limit: self.rate_limit.clone(),
173 pub struct RateLimitedMiddleware<S> {
174 rate_limited: RateLimitedGuard,
178 impl RateLimitedGuard {
179 /// Returns true if the request passed the rate limit, false if it failed and should be rejected.
180 pub fn check(self, ip_addr: IpAddr) -> bool {
181 // Does not need to be blocking because the RwLock in settings never held across await points,
182 // and the operation here locks only long enough to clone
186 .expect("Failed to lock rate limit mutex for reading");
187 let rate_limit = &guard.rate_limit_config;
189 let (kind, interval) = match self.type_ {
190 RateLimitType::Message => (rate_limit.message, rate_limit.message_per_second),
191 RateLimitType::Post => (rate_limit.post, rate_limit.post_per_second),
192 RateLimitType::Register => (rate_limit.register, rate_limit.register_per_second),
193 RateLimitType::Image => (rate_limit.image, rate_limit.image_per_second),
194 RateLimitType::Comment => (rate_limit.comment, rate_limit.comment_per_second),
195 RateLimitType::Search => (rate_limit.search, rate_limit.search_per_second),
197 let limiter = &mut guard.rate_limiter;
199 limiter.check_rate_limit_full(self.type_, ip_addr, kind, interval, InstantSecs::now())
203 impl<S> Transform<S, ServiceRequest> for RateLimitedGuard
205 S: Service<ServiceRequest, Response = ServiceResponse, Error = actix_web::Error> + 'static,
208 type Response = S::Response;
209 type Error = actix_web::Error;
211 type Transform = RateLimitedMiddleware<S>;
212 type Future = Ready<Result<Self::Transform, Self::InitError>>;
214 fn new_transform(&self, service: S) -> Self::Future {
215 ok(RateLimitedMiddleware {
216 rate_limited: self.clone(),
217 service: Rc::new(service),
222 type FutResult<T, E> = dyn Future<Output = Result<T, E>>;
224 impl<S> Service<ServiceRequest> for RateLimitedMiddleware<S>
226 S: Service<ServiceRequest, Response = ServiceResponse, Error = actix_web::Error> + 'static,
229 type Response = S::Response;
230 type Error = actix_web::Error;
231 type Future = Pin<Box<FutResult<Self::Response, Self::Error>>>;
233 fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
234 self.service.poll_ready(cx)
237 fn call(&self, req: ServiceRequest) -> Self::Future {
238 let ip_addr = get_ip(&req.connection_info());
240 let rate_limited = self.rate_limited.clone();
241 let service = self.service.clone();
243 Box::pin(async move {
244 if rate_limited.check(ip_addr) {
245 service.call(req).await
247 let (http_req, _) = req.into_parts();
248 Ok(ServiceResponse::from_err(
249 LemmyError::from_message("rate_limit_error"),
257 fn get_ip(conn_info: &ConnectionInfo) -> IpAddr {
259 .realip_remote_addr()
261 .unwrap_or(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))
264 fn parse_ip(addr: &str) -> Option<IpAddr> {
265 if let Some(s) = addr.strip_suffix(']') {
266 IpAddr::from_str(s.get(1..)?).ok()
267 } else if let Ok(ip) = IpAddr::from_str(addr) {
269 } else if let Ok(socket) = SocketAddr::from_str(addr) {
287 for addr in ip_addrs {
288 assert!(super::parse_ip(addr).is_some(), "failed to parse {addr}");