]> Untitled Git - lemmy.git/blob - crates/utils/src/rate_limit/mod.rs
Merge remote-tracking branch 'origin/main' into 1462-jwt-revocation-on-pwd-change
[lemmy.git] / crates / utils / src / rate_limit / mod.rs
1 use crate::{
2   settings::structs::{RateLimitConfig, Settings},
3   utils::get_ip,
4   LemmyError,
5 };
6 use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform};
7 use futures::future::{ok, Ready};
8 use rate_limiter::{RateLimitType, RateLimiter};
9 use std::{
10   future::Future,
11   pin::Pin,
12   sync::Arc,
13   task::{Context, Poll},
14 };
15 use tokio::sync::Mutex;
16
17 pub mod rate_limiter;
18
19 #[derive(Debug, Clone)]
20 pub struct RateLimit {
21   // it might be reasonable to use a std::sync::Mutex here, since we don't need to lock this
22   // across await points
23   pub rate_limiter: Arc<Mutex<RateLimiter>>,
24 }
25
26 #[derive(Debug, Clone)]
27 pub struct RateLimited {
28   rate_limiter: Arc<Mutex<RateLimiter>>,
29   type_: RateLimitType,
30 }
31
32 pub struct RateLimitedMiddleware<S> {
33   rate_limited: RateLimited,
34   service: S,
35 }
36
37 impl RateLimit {
38   pub fn message(&self) -> RateLimited {
39     self.kind(RateLimitType::Message)
40   }
41
42   pub fn post(&self) -> RateLimited {
43     self.kind(RateLimitType::Post)
44   }
45
46   pub fn register(&self) -> RateLimited {
47     self.kind(RateLimitType::Register)
48   }
49
50   pub fn image(&self) -> RateLimited {
51     self.kind(RateLimitType::Image)
52   }
53
54   fn kind(&self, type_: RateLimitType) -> RateLimited {
55     RateLimited {
56       rate_limiter: self.rate_limiter.clone(),
57       type_,
58     }
59   }
60 }
61
62 impl RateLimited {
63   pub async fn wrap<T, E>(
64     self,
65     ip_addr: String,
66     fut: impl Future<Output = Result<T, E>>,
67   ) -> Result<T, E>
68   where
69     E: From<LemmyError>,
70   {
71     // Does not need to be blocking because the RwLock in settings never held across await points,
72     // and the operation here locks only long enough to clone
73     let rate_limit: RateLimitConfig = Settings::get().rate_limit();
74
75     // before
76     {
77       let mut limiter = self.rate_limiter.lock().await;
78
79       match self.type_ {
80         RateLimitType::Message => {
81           limiter.check_rate_limit_full(
82             self.type_,
83             &ip_addr,
84             rate_limit.message,
85             rate_limit.message_per_second,
86             false,
87           )?;
88
89           drop(limiter);
90           return fut.await;
91         }
92         RateLimitType::Post => {
93           limiter.check_rate_limit_full(
94             self.type_,
95             &ip_addr,
96             rate_limit.post,
97             rate_limit.post_per_second,
98             true,
99           )?;
100         }
101         RateLimitType::Register => {
102           limiter.check_rate_limit_full(
103             self.type_,
104             &ip_addr,
105             rate_limit.register,
106             rate_limit.register_per_second,
107             true,
108           )?;
109         }
110         RateLimitType::Image => {
111           limiter.check_rate_limit_full(
112             self.type_,
113             &ip_addr,
114             rate_limit.image,
115             rate_limit.image_per_second,
116             false,
117           )?;
118         }
119       };
120     }
121
122     let res = fut.await;
123
124     // after
125     {
126       let mut limiter = self.rate_limiter.lock().await;
127       if res.is_ok() {
128         match self.type_ {
129           RateLimitType::Post => {
130             limiter.check_rate_limit_full(
131               self.type_,
132               &ip_addr,
133               rate_limit.post,
134               rate_limit.post_per_second,
135               false,
136             )?;
137           }
138           RateLimitType::Register => {
139             limiter.check_rate_limit_full(
140               self.type_,
141               &ip_addr,
142               rate_limit.register,
143               rate_limit.register_per_second,
144               false,
145             )?;
146           }
147           _ => (),
148         };
149       }
150     }
151
152     res
153   }
154 }
155
156 impl<S> Transform<S> for RateLimited
157 where
158   S: Service<Request = ServiceRequest, Response = ServiceResponse, Error = actix_web::Error>,
159   S::Future: 'static,
160 {
161   type Request = S::Request;
162   type Response = S::Response;
163   type Error = actix_web::Error;
164   type InitError = ();
165   type Transform = RateLimitedMiddleware<S>;
166   type Future = Ready<Result<Self::Transform, Self::InitError>>;
167
168   fn new_transform(&self, service: S) -> Self::Future {
169     ok(RateLimitedMiddleware {
170       rate_limited: self.clone(),
171       service,
172     })
173   }
174 }
175
176 type FutResult<T, E> = dyn Future<Output = Result<T, E>>;
177
178 impl<S> Service for RateLimitedMiddleware<S>
179 where
180   S: Service<Request = ServiceRequest, Response = ServiceResponse, Error = actix_web::Error>,
181   S::Future: 'static,
182 {
183   type Request = S::Request;
184   type Response = S::Response;
185   type Error = actix_web::Error;
186   type Future = Pin<Box<FutResult<Self::Response, Self::Error>>>;
187
188   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
189     self.service.poll_ready(cx)
190   }
191
192   fn call(&mut self, req: S::Request) -> Self::Future {
193     let ip_addr = get_ip(&req.connection_info());
194
195     let fut = self
196       .rate_limited
197       .clone()
198       .wrap(ip_addr, self.service.call(req));
199
200     Box::pin(async move { fut.await.map_err(actix_web::Error::from) })
201   }
202 }