]> Untitled Git - lemmy.git/blob - crates/utils/src/rate_limit/mod.rs
Rate limit websocket joins. (#2165)
[lemmy.git] / crates / utils / src / rate_limit / mod.rs
1 use crate::{settings::structs::RateLimitConfig, utils::get_ip, IpAddr};
2 use actix_web::{
3   dev::{Service, ServiceRequest, ServiceResponse, Transform},
4   HttpResponse,
5 };
6 use futures::future::{ok, Ready};
7 use parking_lot::Mutex;
8 use rate_limiter::{RateLimitType, RateLimiter};
9 use std::{
10   future::Future,
11   pin::Pin,
12   rc::Rc,
13   sync::Arc,
14   task::{Context, Poll},
15 };
16
17 pub mod rate_limiter;
18
19 #[derive(Debug, Clone)]
20 pub struct RateLimit {
21   // it might be reasonable to use a std::sync::Mutex here, since we don't need to lock this
22   // across await points
23   pub rate_limiter: Arc<Mutex<RateLimiter>>,
24   pub rate_limit_config: RateLimitConfig,
25 }
26
27 #[derive(Debug, Clone)]
28 pub struct RateLimited {
29   rate_limiter: Arc<Mutex<RateLimiter>>,
30   rate_limit_config: RateLimitConfig,
31   type_: RateLimitType,
32 }
33
34 pub struct RateLimitedMiddleware<S> {
35   rate_limited: RateLimited,
36   service: Rc<S>,
37 }
38
39 impl RateLimit {
40   pub fn message(&self) -> RateLimited {
41     self.kind(RateLimitType::Message)
42   }
43
44   pub fn post(&self) -> RateLimited {
45     self.kind(RateLimitType::Post)
46   }
47
48   pub fn register(&self) -> RateLimited {
49     self.kind(RateLimitType::Register)
50   }
51
52   pub fn image(&self) -> RateLimited {
53     self.kind(RateLimitType::Image)
54   }
55
56   pub fn comment(&self) -> RateLimited {
57     self.kind(RateLimitType::Comment)
58   }
59
60   fn kind(&self, type_: RateLimitType) -> RateLimited {
61     RateLimited {
62       rate_limiter: self.rate_limiter.clone(),
63       rate_limit_config: self.rate_limit_config.clone(),
64       type_,
65     }
66   }
67 }
68
69 impl RateLimited {
70   /// Returns true if the request passed the rate limit, false if it failed and should be rejected.
71   pub fn check(self, ip_addr: IpAddr) -> bool {
72     // Does not need to be blocking because the RwLock in settings never held across await points,
73     // and the operation here locks only long enough to clone
74     let rate_limit = self.rate_limit_config;
75
76     let (kind, interval) = match self.type_ {
77       RateLimitType::Message => (rate_limit.message, rate_limit.message_per_second),
78       RateLimitType::Post => (rate_limit.post, rate_limit.post_per_second),
79       RateLimitType::Register => (rate_limit.register, rate_limit.register_per_second),
80       RateLimitType::Image => (rate_limit.image, rate_limit.image_per_second),
81       RateLimitType::Comment => (rate_limit.comment, rate_limit.comment_per_second),
82     };
83     let mut limiter = self.rate_limiter.lock();
84
85     limiter.check_rate_limit_full(self.type_, &ip_addr, kind, interval)
86   }
87 }
88
89 impl<S> Transform<S, ServiceRequest> for RateLimited
90 where
91   S: Service<ServiceRequest, Response = ServiceResponse, Error = actix_web::Error> + 'static,
92   S::Future: 'static,
93 {
94   type Response = S::Response;
95   type Error = actix_web::Error;
96   type InitError = ();
97   type Transform = RateLimitedMiddleware<S>;
98   type Future = Ready<Result<Self::Transform, Self::InitError>>;
99
100   fn new_transform(&self, service: S) -> Self::Future {
101     ok(RateLimitedMiddleware {
102       rate_limited: self.clone(),
103       service: Rc::new(service),
104     })
105   }
106 }
107
108 type FutResult<T, E> = dyn Future<Output = Result<T, E>>;
109
110 impl<S> Service<ServiceRequest> for RateLimitedMiddleware<S>
111 where
112   S: Service<ServiceRequest, Response = ServiceResponse, Error = actix_web::Error> + 'static,
113   S::Future: 'static,
114 {
115   type Response = S::Response;
116   type Error = actix_web::Error;
117   type Future = Pin<Box<FutResult<Self::Response, Self::Error>>>;
118
119   fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
120     self.service.poll_ready(cx)
121   }
122
123   fn call(&self, req: ServiceRequest) -> Self::Future {
124     let ip_addr = get_ip(&req.connection_info());
125
126     let rate_limited = self.rate_limited.clone();
127     let service = self.service.clone();
128
129     Box::pin(async move {
130       if rate_limited.check(ip_addr) {
131         service.call(req).await
132       } else {
133         let (http_req, _) = req.into_parts();
134         // if rate limit was hit, respond with http 400
135         Ok(ServiceResponse::new(
136           http_req,
137           HttpResponse::BadRequest().finish(),
138         ))
139       }
140     })
141   }
142 }