From: Dessalines Date: Mon, 20 Apr 2020 19:03:32 +0000 (-0400) Subject: Merge branch 'asonix-abstract_websocket_sends' into abstract_websocket_sends X-Git-Url: http://these/git/readmes/%7B%60%24%7BwebArchiveUrl%7D/save/static/gitweb.css?a=commitdiff_plain;h=c0f4d5260f3a6a3642a419ee69014e9edf53388e;p=lemmy.git Merge branch 'asonix-abstract_websocket_sends' into abstract_websocket_sends --- c0f4d5260f3a6a3642a419ee69014e9edf53388e diff --cc server/src/rate_limit/mod.rs index a95c51b5,de45002e..fec8a569 --- a/server/src/rate_limit/mod.rs +++ b/server/src/rate_limit/mod.rs @@@ -2,17 -2,193 +2,193 @@@ pub mod rate_limiter use super::{IPAddr, Settings}; use crate::api::APIError; + use crate::get_ip; + use crate::settings::RateLimitConfig; + use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform}; use failure::Error; + use futures::future::{ok, Ready}; -use log::warn; +use log::debug; - use rate_limiter::RateLimiter; + use rate_limiter::{RateLimitType, RateLimiter}; use std::collections::HashMap; + use std::future::Future; + use std::pin::Pin; use std::sync::Arc; - use std::sync::Mutex; + use std::task::{Context, Poll}; use std::time::SystemTime; use strum::IntoEnumIterator; + use tokio::sync::Mutex; #[derive(Debug, Clone)] - pub struct RateLimitInfo { + pub struct RateLimit { pub rate_limiter: Arc>, - pub ip: IPAddr, + } + + #[derive(Debug, Clone)] + pub struct RateLimited { + rate_limiter: Arc>, + type_: RateLimitType, + } + + pub struct RateLimitedMiddleware { + rate_limited: RateLimited, + service: S, + } + + impl RateLimit { + pub fn message(&self) -> RateLimited { + self.kind(RateLimitType::Message) + } + + pub fn post(&self) -> RateLimited { + self.kind(RateLimitType::Post) + } + + pub fn register(&self) -> RateLimited { + self.kind(RateLimitType::Register) + } + + fn kind(&self, type_: RateLimitType) -> RateLimited { + RateLimited { + rate_limiter: self.rate_limiter.clone(), + type_, + } + } + } + + impl RateLimited { + pub async fn wrap( + self, + ip_addr: String, + fut: impl Future>, + ) -> Result + where + E: From, + { + let rate_limit: RateLimitConfig = actix_web::web::block(move || { + // needs to be in a web::block because the RwLock in settings is from stdlib + Ok(Settings::get().rate_limit) as Result<_, failure::Error> + }) + .await + .map_err(|e| match e { + actix_web::error::BlockingError::Error(e) => e, + _ => APIError::err("Operation canceled").into(), + })?; + + // before + { + let mut limiter = self.rate_limiter.lock().await; + + match self.type_ { + RateLimitType::Message => { + limiter.check_rate_limit_full( + self.type_, + &ip_addr, + rate_limit.message, + rate_limit.message_per_second, + false, + )?; + + return fut.await; + } + RateLimitType::Post => { + limiter.check_rate_limit_full( + self.type_.clone(), + &ip_addr, + rate_limit.post, + rate_limit.post_per_second, + true, + )?; + } + RateLimitType::Register => { + limiter.check_rate_limit_full( + self.type_, + &ip_addr, + rate_limit.register, + rate_limit.register_per_second, + true, + )?; + } + }; + } + + let res = fut.await; + + // after + { + let mut limiter = self.rate_limiter.lock().await; + if res.is_ok() { + match self.type_ { + RateLimitType::Post => { + limiter.check_rate_limit_full( + self.type_, + &ip_addr, + rate_limit.post, + rate_limit.post_per_second, + false, + )?; + } + RateLimitType::Register => { + limiter.check_rate_limit_full( + self.type_, + &ip_addr, + rate_limit.register, + rate_limit.register_per_second, + false, + )?; + } + _ => (), + }; + } + } + + res + } + } + + impl Transform for RateLimited + where + S: Service, + S::Future: 'static, + { + type Request = S::Request; + type Response = S::Response; + type Error = actix_web::Error; + type InitError = (); + type Transform = RateLimitedMiddleware; + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + ok(RateLimitedMiddleware { + rate_limited: self.clone(), + service, + }) + } + } + + type FutResult = dyn Future>; + + impl Service for RateLimitedMiddleware + where + S: Service, + S::Future: 'static, + { + type Request = S::Request; + type Response = S::Response; + type Error = actix_web::Error; + type Future = Pin>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(cx) + } + + fn call(&mut self, req: S::Request) -> Self::Future { + let ip_addr = get_ip(&req.connection_info()); + + let fut = self + .rate_limited + .clone() + .wrap(ip_addr, self.service.call(req)); + + Box::pin(async move { fut.await.map_err(actix_web::Error::from) }) + } }