]> Untitled Git - lemmy.git/blob - crates/utils/src/rate_limit/mod.rs
Merge branch 'split_user_table' into strictly_type_db_ids
[lemmy.git] / crates / utils / src / rate_limit / mod.rs
1 use crate::{
2   settings::structs::{RateLimitConfig, Settings},
3   utils::get_ip,
4   IpAddr,
5   LemmyError,
6 };
7 use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform};
8 use futures::future::{ok, Ready};
9 use rate_limiter::{RateLimitType, RateLimiter};
10 use std::{
11   future::Future,
12   pin::Pin,
13   sync::Arc,
14   task::{Context, Poll},
15 };
16 use tokio::sync::Mutex;
17
18 pub mod rate_limiter;
19
20 #[derive(Debug, Clone)]
21 pub struct RateLimit {
22   // it might be reasonable to use a std::sync::Mutex here, since we don't need to lock this
23   // across await points
24   pub rate_limiter: Arc<Mutex<RateLimiter>>,
25 }
26
27 #[derive(Debug, Clone)]
28 pub struct RateLimited {
29   rate_limiter: Arc<Mutex<RateLimiter>>,
30   type_: RateLimitType,
31 }
32
33 pub struct RateLimitedMiddleware<S> {
34   rate_limited: RateLimited,
35   service: S,
36 }
37
38 impl RateLimit {
39   pub fn message(&self) -> RateLimited {
40     self.kind(RateLimitType::Message)
41   }
42
43   pub fn post(&self) -> RateLimited {
44     self.kind(RateLimitType::Post)
45   }
46
47   pub fn register(&self) -> RateLimited {
48     self.kind(RateLimitType::Register)
49   }
50
51   pub fn image(&self) -> RateLimited {
52     self.kind(RateLimitType::Image)
53   }
54
55   fn kind(&self, type_: RateLimitType) -> RateLimited {
56     RateLimited {
57       rate_limiter: self.rate_limiter.clone(),
58       type_,
59     }
60   }
61 }
62
63 impl RateLimited {
64   pub async fn wrap<T, E>(
65     self,
66     ip_addr: IpAddr,
67     fut: impl Future<Output = Result<T, E>>,
68   ) -> Result<T, E>
69   where
70     E: From<LemmyError>,
71   {
72     // Does not need to be blocking because the RwLock in settings never held across await points,
73     // and the operation here locks only long enough to clone
74     let rate_limit: RateLimitConfig = Settings::get().rate_limit();
75
76     // before
77     {
78       let mut limiter = self.rate_limiter.lock().await;
79
80       match self.type_ {
81         RateLimitType::Message => {
82           limiter.check_rate_limit_full(
83             self.type_,
84             &ip_addr,
85             rate_limit.message,
86             rate_limit.message_per_second,
87             false,
88           )?;
89
90           drop(limiter);
91           return fut.await;
92         }
93         RateLimitType::Post => {
94           limiter.check_rate_limit_full(
95             self.type_,
96             &ip_addr,
97             rate_limit.post,
98             rate_limit.post_per_second,
99             true,
100           )?;
101         }
102         RateLimitType::Register => {
103           limiter.check_rate_limit_full(
104             self.type_,
105             &ip_addr,
106             rate_limit.register,
107             rate_limit.register_per_second,
108             true,
109           )?;
110         }
111         RateLimitType::Image => {
112           limiter.check_rate_limit_full(
113             self.type_,
114             &ip_addr,
115             rate_limit.image,
116             rate_limit.image_per_second,
117             false,
118           )?;
119         }
120       };
121     }
122
123     let res = fut.await;
124
125     // after
126     {
127       let mut limiter = self.rate_limiter.lock().await;
128       if res.is_ok() {
129         match self.type_ {
130           RateLimitType::Post => {
131             limiter.check_rate_limit_full(
132               self.type_,
133               &ip_addr,
134               rate_limit.post,
135               rate_limit.post_per_second,
136               false,
137             )?;
138           }
139           RateLimitType::Register => {
140             limiter.check_rate_limit_full(
141               self.type_,
142               &ip_addr,
143               rate_limit.register,
144               rate_limit.register_per_second,
145               false,
146             )?;
147           }
148           _ => (),
149         };
150       }
151     }
152
153     res
154   }
155 }
156
157 impl<S> Transform<S> for RateLimited
158 where
159   S: Service<Request = ServiceRequest, Response = ServiceResponse, Error = actix_web::Error>,
160   S::Future: 'static,
161 {
162   type Request = S::Request;
163   type Response = S::Response;
164   type Error = actix_web::Error;
165   type InitError = ();
166   type Transform = RateLimitedMiddleware<S>;
167   type Future = Ready<Result<Self::Transform, Self::InitError>>;
168
169   fn new_transform(&self, service: S) -> Self::Future {
170     ok(RateLimitedMiddleware {
171       rate_limited: self.clone(),
172       service,
173     })
174   }
175 }
176
177 type FutResult<T, E> = dyn Future<Output = Result<T, E>>;
178
179 impl<S> Service for RateLimitedMiddleware<S>
180 where
181   S: Service<Request = ServiceRequest, Response = ServiceResponse, Error = actix_web::Error>,
182   S::Future: 'static,
183 {
184   type Request = S::Request;
185   type Response = S::Response;
186   type Error = actix_web::Error;
187   type Future = Pin<Box<FutResult<Self::Response, Self::Error>>>;
188
189   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
190     self.service.poll_ready(cx)
191   }
192
193   fn call(&mut self, req: S::Request) -> Self::Future {
194     let ip_addr = get_ip(&req.connection_info());
195
196     let fut = self
197       .rate_limited
198       .clone()
199       .wrap(ip_addr, self.service.call(req));
200
201     Box::pin(async move { fut.await.map_err(actix_web::Error::from) })
202   }
203 }