]> Untitled Git - lemmy.git/blob - lemmy_rate_limit/src/lib.rs
Move websocket code into workspace (#107)
[lemmy.git] / lemmy_rate_limit / src / lib.rs
1 #[macro_use]
2 extern crate strum_macros;
3
4 use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform};
5 use futures::future::{ok, Ready};
6 use lemmy_utils::{
7   settings::{RateLimitConfig, Settings},
8   utils::get_ip,
9   LemmyError,
10 };
11 use rate_limiter::{RateLimitType, RateLimiter};
12 use std::{
13   future::Future,
14   pin::Pin,
15   sync::Arc,
16   task::{Context, Poll},
17 };
18 use tokio::sync::Mutex;
19
20 pub mod rate_limiter;
21
22 #[derive(Debug, Clone)]
23 pub struct RateLimit {
24   // it might be reasonable to use a std::sync::Mutex here, since we don't need to lock this
25   // across await points
26   pub rate_limiter: Arc<Mutex<RateLimiter>>,
27 }
28
29 #[derive(Debug, Clone)]
30 pub struct RateLimited {
31   rate_limiter: Arc<Mutex<RateLimiter>>,
32   type_: RateLimitType,
33 }
34
35 pub struct RateLimitedMiddleware<S> {
36   rate_limited: RateLimited,
37   service: S,
38 }
39
40 impl RateLimit {
41   pub fn message(&self) -> RateLimited {
42     self.kind(RateLimitType::Message)
43   }
44
45   pub fn post(&self) -> RateLimited {
46     self.kind(RateLimitType::Post)
47   }
48
49   pub fn register(&self) -> RateLimited {
50     self.kind(RateLimitType::Register)
51   }
52
53   pub fn image(&self) -> RateLimited {
54     self.kind(RateLimitType::Image)
55   }
56
57   fn kind(&self, type_: RateLimitType) -> RateLimited {
58     RateLimited {
59       rate_limiter: self.rate_limiter.clone(),
60       type_,
61     }
62   }
63 }
64
65 impl RateLimited {
66   pub async fn wrap<T, E>(
67     self,
68     ip_addr: String,
69     fut: impl Future<Output = Result<T, E>>,
70   ) -> Result<T, E>
71   where
72     E: From<LemmyError>,
73   {
74     // Does not need to be blocking because the RwLock in settings never held across await points,
75     // and the operation here locks only long enough to clone
76     let rate_limit: RateLimitConfig = Settings::get().rate_limit;
77
78     // before
79     {
80       let mut limiter = self.rate_limiter.lock().await;
81
82       match self.type_ {
83         RateLimitType::Message => {
84           limiter.check_rate_limit_full(
85             self.type_,
86             &ip_addr,
87             rate_limit.message,
88             rate_limit.message_per_second,
89             false,
90           )?;
91
92           drop(limiter);
93           return fut.await;
94         }
95         RateLimitType::Post => {
96           limiter.check_rate_limit_full(
97             self.type_,
98             &ip_addr,
99             rate_limit.post,
100             rate_limit.post_per_second,
101             true,
102           )?;
103         }
104         RateLimitType::Register => {
105           limiter.check_rate_limit_full(
106             self.type_,
107             &ip_addr,
108             rate_limit.register,
109             rate_limit.register_per_second,
110             true,
111           )?;
112         }
113         RateLimitType::Image => {
114           limiter.check_rate_limit_full(
115             self.type_,
116             &ip_addr,
117             rate_limit.image,
118             rate_limit.image_per_second,
119             false,
120           )?;
121         }
122       };
123     }
124
125     let res = fut.await;
126
127     // after
128     {
129       let mut limiter = self.rate_limiter.lock().await;
130       if res.is_ok() {
131         match self.type_ {
132           RateLimitType::Post => {
133             limiter.check_rate_limit_full(
134               self.type_,
135               &ip_addr,
136               rate_limit.post,
137               rate_limit.post_per_second,
138               false,
139             )?;
140           }
141           RateLimitType::Register => {
142             limiter.check_rate_limit_full(
143               self.type_,
144               &ip_addr,
145               rate_limit.register,
146               rate_limit.register_per_second,
147               false,
148             )?;
149           }
150           _ => (),
151         };
152       }
153     }
154
155     res
156   }
157 }
158
159 impl<S> Transform<S> for RateLimited
160 where
161   S: Service<Request = ServiceRequest, Response = ServiceResponse, Error = actix_web::Error>,
162   S::Future: 'static,
163 {
164   type Request = S::Request;
165   type Response = S::Response;
166   type Error = actix_web::Error;
167   type InitError = ();
168   type Transform = RateLimitedMiddleware<S>;
169   type Future = Ready<Result<Self::Transform, Self::InitError>>;
170
171   fn new_transform(&self, service: S) -> Self::Future {
172     ok(RateLimitedMiddleware {
173       rate_limited: self.clone(),
174       service,
175     })
176   }
177 }
178
179 type FutResult<T, E> = dyn Future<Output = Result<T, E>>;
180
181 impl<S> Service for RateLimitedMiddleware<S>
182 where
183   S: Service<Request = ServiceRequest, Response = ServiceResponse, Error = actix_web::Error>,
184   S::Future: 'static,
185 {
186   type Request = S::Request;
187   type Response = S::Response;
188   type Error = actix_web::Error;
189   type Future = Pin<Box<FutResult<Self::Response, Self::Error>>>;
190
191   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
192     self.service.poll_ready(cx)
193   }
194
195   fn call(&mut self, req: S::Request) -> Self::Future {
196     let ip_addr = get_ip(&req.connection_info());
197
198     let fut = self
199       .rate_limited
200       .clone()
201       .wrap(ip_addr, self.service.call(req));
202
203     Box::pin(async move { fut.await.map_err(actix_web::Error::from) })
204   }
205 }