]> Untitled Git - lemmy.git/blob - server/src/rate_limit/mod.rs
Merge remote-tracking branch 'weblate/main' into main
[lemmy.git] / server / src / rate_limit / mod.rs
1 use super::IPAddr;
2 use crate::{get_ip, LemmyError};
3 use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform};
4 use futures::future::{ok, Ready};
5 use lemmy_utils::settings::{RateLimitConfig, Settings};
6 use rate_limiter::{RateLimitType, RateLimiter};
7 use std::{
8   future::Future,
9   pin::Pin,
10   sync::Arc,
11   task::{Context, Poll},
12 };
13 use tokio::sync::Mutex;
14
15 pub mod rate_limiter;
16
17 #[derive(Debug, Clone)]
18 pub struct RateLimit {
19   // it might be reasonable to use a std::sync::Mutex here, since we don't need to lock this
20   // across await points
21   pub rate_limiter: Arc<Mutex<RateLimiter>>,
22 }
23
24 #[derive(Debug, Clone)]
25 pub struct RateLimited {
26   rate_limiter: Arc<Mutex<RateLimiter>>,
27   type_: RateLimitType,
28 }
29
30 pub struct RateLimitedMiddleware<S> {
31   rate_limited: RateLimited,
32   service: S,
33 }
34
35 impl RateLimit {
36   pub fn message(&self) -> RateLimited {
37     self.kind(RateLimitType::Message)
38   }
39
40   pub fn post(&self) -> RateLimited {
41     self.kind(RateLimitType::Post)
42   }
43
44   pub fn register(&self) -> RateLimited {
45     self.kind(RateLimitType::Register)
46   }
47
48   pub fn image(&self) -> RateLimited {
49     self.kind(RateLimitType::Image)
50   }
51
52   fn kind(&self, type_: RateLimitType) -> RateLimited {
53     RateLimited {
54       rate_limiter: self.rate_limiter.clone(),
55       type_,
56     }
57   }
58 }
59
60 impl RateLimited {
61   pub async fn wrap<T, E>(
62     self,
63     ip_addr: String,
64     fut: impl Future<Output = Result<T, E>>,
65   ) -> Result<T, E>
66   where
67     E: From<LemmyError>,
68   {
69     // Does not need to be blocking because the RwLock in settings never held across await points,
70     // and the operation here locks only long enough to clone
71     let rate_limit: RateLimitConfig = Settings::get().rate_limit;
72
73     // before
74     {
75       let mut limiter = self.rate_limiter.lock().await;
76
77       match self.type_ {
78         RateLimitType::Message => {
79           limiter.check_rate_limit_full(
80             self.type_,
81             &ip_addr,
82             rate_limit.message,
83             rate_limit.message_per_second,
84             false,
85           )?;
86
87           drop(limiter);
88           return fut.await;
89         }
90         RateLimitType::Post => {
91           limiter.check_rate_limit_full(
92             self.type_,
93             &ip_addr,
94             rate_limit.post,
95             rate_limit.post_per_second,
96             true,
97           )?;
98         }
99         RateLimitType::Register => {
100           limiter.check_rate_limit_full(
101             self.type_,
102             &ip_addr,
103             rate_limit.register,
104             rate_limit.register_per_second,
105             true,
106           )?;
107         }
108         RateLimitType::Image => {
109           limiter.check_rate_limit_full(
110             self.type_,
111             &ip_addr,
112             rate_limit.image,
113             rate_limit.image_per_second,
114             false,
115           )?;
116         }
117       };
118     }
119
120     let res = fut.await;
121
122     // after
123     {
124       let mut limiter = self.rate_limiter.lock().await;
125       if res.is_ok() {
126         match self.type_ {
127           RateLimitType::Post => {
128             limiter.check_rate_limit_full(
129               self.type_,
130               &ip_addr,
131               rate_limit.post,
132               rate_limit.post_per_second,
133               false,
134             )?;
135           }
136           RateLimitType::Register => {
137             limiter.check_rate_limit_full(
138               self.type_,
139               &ip_addr,
140               rate_limit.register,
141               rate_limit.register_per_second,
142               false,
143             )?;
144           }
145           _ => (),
146         };
147       }
148     }
149
150     res
151   }
152 }
153
154 impl<S> Transform<S> for RateLimited
155 where
156   S: Service<Request = ServiceRequest, Response = ServiceResponse, Error = actix_web::Error>,
157   S::Future: 'static,
158 {
159   type Request = S::Request;
160   type Response = S::Response;
161   type Error = actix_web::Error;
162   type InitError = ();
163   type Transform = RateLimitedMiddleware<S>;
164   type Future = Ready<Result<Self::Transform, Self::InitError>>;
165
166   fn new_transform(&self, service: S) -> Self::Future {
167     ok(RateLimitedMiddleware {
168       rate_limited: self.clone(),
169       service,
170     })
171   }
172 }
173
174 type FutResult<T, E> = dyn Future<Output = Result<T, E>>;
175
176 impl<S> Service for RateLimitedMiddleware<S>
177 where
178   S: Service<Request = ServiceRequest, Response = ServiceResponse, Error = actix_web::Error>,
179   S::Future: 'static,
180 {
181   type Request = S::Request;
182   type Response = S::Response;
183   type Error = actix_web::Error;
184   type Future = Pin<Box<FutResult<Self::Response, Self::Error>>>;
185
186   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
187     self.service.poll_ready(cx)
188   }
189
190   fn call(&mut self, req: S::Request) -> Self::Future {
191     let ip_addr = get_ip(&req.connection_info());
192
193     let fut = self
194       .rate_limited
195       .clone()
196       .wrap(ip_addr, self.service.call(req));
197
198     Box::pin(async move { fut.await.map_err(actix_web::Error::from) })
199   }
200 }