"reqwest-retry",
"reqwest-tracing",
"serde",
+ "tokio",
"tracing",
"tracing-actix-web",
"tracing-error",
"smart-default",
"strum",
"strum_macros",
+ "tokio",
"tracing",
"tracing-error",
"typed-builder",
opentelemetry = { version = "0.17.0", features = ["rt-tokio"], optional = true }
opentelemetry-otlp = { version = "0.10.0", optional = true }
tracing-opentelemetry = { version = "0.17.2", optional = true }
+tokio = "1.21.2"
# https://join-lemmy.org/docs/en/administration/configuration.html
{
hostname: lemmy-alpha
- database: {
- # Username to connect to postgres
- user: "&££%^!£*!:::!"£:!:"
- }
}
utils::{
get_local_user_view_from_jwt,
is_admin,
+ local_site_rate_limit_to_rate_limit_config,
local_site_to_slur_regex,
site_description_length_check,
},
let site_view = SiteView::read_local(context.pool()).await?;
+ let rate_limit_config =
+ local_site_rate_limit_to_rate_limit_config(&site_view.local_site_rate_limit);
+ context
+ .settings_updated_channel()
+ .send(rate_limit_config)
+ .await?;
+
Ok(SiteResponse { site_view })
}
}
utils::{
get_local_user_view_from_jwt,
is_admin,
+ local_site_rate_limit_to_rate_limit_config,
local_site_to_slur_regex,
site_description_length_check,
},
let site_view = SiteView::read_local(context.pool()).await?;
+ let rate_limit_config =
+ local_site_rate_limit_to_rate_limit_config(&site_view.local_site_rate_limit);
+ context
+ .settings_updated_channel()
+ .send(rate_limit_config)
+ .await?;
+
let res = SiteResponse { site_view };
context.chat_server().do_send(SendAllMessage {
use lemmy_db_schema::{source::secret::Secret, utils::build_db_pool_for_tests};
use lemmy_utils::{
error::LemmyError,
- rate_limit::{rate_limiter::RateLimiter, RateLimit, RateLimitConfig},
+ rate_limit::{RateLimitCell, RateLimitConfig},
settings::SETTINGS,
};
use lemmy_websocket::{chat_server::ChatServer, LemmyContext};
use reqwest::{Client, Request, Response};
use reqwest_middleware::{ClientBuilder, Middleware, Next};
- use std::sync::{Arc, Mutex};
use task_local_extensions::Extensions;
struct BlockedMiddleware;
}
let rate_limit_config = RateLimitConfig::builder().build();
-
- let rate_limiter = RateLimit {
- rate_limiter: Arc::new(Mutex::new(RateLimiter::default())),
- rate_limit_config,
- };
+ let rate_limit_cell = RateLimitCell::new(rate_limit_config).await;
let chat_server = ChatServer::startup(
pool.clone(),
- rate_limiter,
|_, _, _, _| Box::pin(x()),
|_, _, _, _| Box::pin(x()),
client.clone(),
settings.clone(),
secret.clone(),
+ rate_limit_cell.clone(),
)
.start();
- LemmyContext::create(pool, chat_server, client, settings, secret)
+ LemmyContext::create(
+ pool,
+ chat_server,
+ client,
+ settings,
+ secret,
+ rate_limit_cell.clone(),
+ )
}
}
use futures::stream::{Stream, StreamExt};
use lemmy_api_common::utils::get_local_user_view_from_jwt;
use lemmy_db_schema::source::local_site::LocalSite;
-use lemmy_utils::{claims::Claims, rate_limit::RateLimit, REQWEST_TIMEOUT};
+use lemmy_utils::{claims::Claims, rate_limit::RateLimitCell, REQWEST_TIMEOUT};
use lemmy_websocket::LemmyContext;
use reqwest::Body;
use reqwest_middleware::{ClientWithMiddleware, RequestBuilder};
use serde::{Deserialize, Serialize};
-pub fn config(cfg: &mut web::ServiceConfig, client: ClientWithMiddleware, rate_limit: &RateLimit) {
+pub fn config(
+ cfg: &mut web::ServiceConfig,
+ client: ClientWithMiddleware,
+ rate_limit: &RateLimitCell,
+) {
cfg
.app_data(web::Data::new(client))
.service(
parking_lot = "0.12.1"
typed-builder = "0.10.0"
percent-encoding = "2.2.0"
+tokio = "1.21.2"
[build-dependencies]
rosetta-build = "0.1.2"
use crate::{error::LemmyError, utils::get_ip, IpAddr};
use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform};
use futures::future::{ok, Ready};
-use rate_limiter::{RateLimitType, RateLimiter};
+use rate_limiter::{RateLimitStorage, RateLimitType};
use serde::{Deserialize, Serialize};
use std::{
future::Future,
sync::{Arc, Mutex},
task::{Context, Poll},
};
+use tokio::sync::{mpsc, mpsc::Sender, OnceCell};
use typed_builder::TypedBuilder;
pub mod rate_limiter;
}
#[derive(Debug, Clone)]
-pub struct RateLimit {
- // it might be reasonable to use a std::sync::Mutex here, since we don't need to lock this
- // across await points
- pub rate_limiter: Arc<Mutex<RateLimiter>>,
+struct RateLimit {
+ pub rate_limiter: RateLimitStorage,
pub rate_limit_config: RateLimitConfig,
}
#[derive(Debug, Clone)]
-pub struct RateLimited {
- rate_limiter: Arc<Mutex<RateLimiter>>,
- rate_limit_config: RateLimitConfig,
+pub struct RateLimitedGuard {
+ rate_limit: Arc<Mutex<RateLimit>>,
type_: RateLimitType,
}
-pub struct RateLimitedMiddleware<S> {
- rate_limited: RateLimited,
- service: Rc<S>,
+/// Single instance of rate limit config and buckets, which is shared across all threads.
+#[derive(Clone)]
+pub struct RateLimitCell {
+ tx: Sender<RateLimitConfig>,
+ rate_limit: Arc<Mutex<RateLimit>>,
}
-impl RateLimit {
- pub fn message(&self) -> RateLimited {
+impl RateLimitCell {
+ /// Initialize cell if it wasnt initialized yet. Otherwise returns the existing cell.
+ pub async fn new(rate_limit_config: RateLimitConfig) -> &'static Self {
+ static LOCAL_INSTANCE: OnceCell<RateLimitCell> = OnceCell::const_new();
+ LOCAL_INSTANCE
+ .get_or_init(|| async {
+ let (tx, mut rx) = mpsc::channel::<RateLimitConfig>(4);
+ let rate_limit = Arc::new(Mutex::new(RateLimit {
+ rate_limiter: Default::default(),
+ rate_limit_config,
+ }));
+ let rate_limit2 = rate_limit.clone();
+ tokio::spawn(async move {
+ while let Some(r) = rx.recv().await {
+ rate_limit2
+ .lock()
+ .expect("Failed to lock rate limit mutex for updating")
+ .rate_limit_config = r;
+ }
+ });
+ RateLimitCell { tx, rate_limit }
+ })
+ .await
+ }
+
+ /// Call this when the config was updated, to update all in-memory cells.
+ pub async fn send(&self, config: RateLimitConfig) -> Result<(), LemmyError> {
+ self.tx.send(config).await?;
+ Ok(())
+ }
+
+ pub fn message(&self) -> RateLimitedGuard {
self.kind(RateLimitType::Message)
}
- pub fn post(&self) -> RateLimited {
+ pub fn post(&self) -> RateLimitedGuard {
self.kind(RateLimitType::Post)
}
- pub fn register(&self) -> RateLimited {
+ pub fn register(&self) -> RateLimitedGuard {
self.kind(RateLimitType::Register)
}
- pub fn image(&self) -> RateLimited {
+ pub fn image(&self) -> RateLimitedGuard {
self.kind(RateLimitType::Image)
}
- pub fn comment(&self) -> RateLimited {
+ pub fn comment(&self) -> RateLimitedGuard {
self.kind(RateLimitType::Comment)
}
- pub fn search(&self) -> RateLimited {
+ pub fn search(&self) -> RateLimitedGuard {
self.kind(RateLimitType::Search)
}
- fn kind(&self, type_: RateLimitType) -> RateLimited {
- RateLimited {
- rate_limiter: self.rate_limiter.clone(),
- rate_limit_config: self.rate_limit_config.clone(),
+ fn kind(&self, type_: RateLimitType) -> RateLimitedGuard {
+ RateLimitedGuard {
+ rate_limit: self.rate_limit.clone(),
type_,
}
}
}
-impl RateLimited {
+pub struct RateLimitedMiddleware<S> {
+ rate_limited: RateLimitedGuard,
+ service: Rc<S>,
+}
+
+impl RateLimitedGuard {
/// Returns true if the request passed the rate limit, false if it failed and should be rejected.
pub fn check(self, ip_addr: IpAddr) -> bool {
// Does not need to be blocking because the RwLock in settings never held across await points,
// and the operation here locks only long enough to clone
- let rate_limit = self.rate_limit_config;
+ let mut guard = self
+ .rate_limit
+ .lock()
+ .expect("Failed to lock rate limit mutex for reading");
+ let rate_limit = &guard.rate_limit_config;
let (kind, interval) = match self.type_ {
RateLimitType::Message => (rate_limit.message, rate_limit.message_per_second),
RateLimitType::Comment => (rate_limit.comment, rate_limit.comment_per_second),
RateLimitType::Search => (rate_limit.search, rate_limit.search_per_second),
};
- let mut limiter = self.rate_limiter.lock().expect("mutex poison error");
+ let limiter = &mut guard.rate_limiter;
limiter.check_rate_limit_full(self.type_, &ip_addr, kind, interval)
}
}
-impl<S> Transform<S, ServiceRequest> for RateLimited
+impl<S> Transform<S, ServiceRequest> for RateLimitedGuard
where
S: Service<ServiceRequest, Response = ServiceResponse, Error = actix_web::Error> + 'static,
S::Future: 'static,
/// Rate limiting based on rate type and IP addr
#[derive(Debug, Clone, Default)]
-pub struct RateLimiter {
+pub struct RateLimitStorage {
buckets: HashMap<RateLimitType, HashMap<IpAddr, RateLimitBucket>>,
}
-impl RateLimiter {
+impl RateLimitStorage {
fn insert_ip(&mut self, ip: &IpAddr) {
for rate_limit_type in RateLimitType::iter() {
if self.buckets.get(&rate_limit_type).is_none() {
use lemmy_utils::{
error::LemmyError,
location_info,
- rate_limit::RateLimit,
+ rate_limit::RateLimitCell,
settings::structs::Settings,
ConnectionId,
IpAddr,
/// The Secrets
pub(super) secret: Secret,
- /// Rate limiting based on rate type and IP addr
- pub(super) rate_limiter: RateLimit,
-
/// A list of the current captchas
pub(super) captchas: Vec<CaptchaItem>,
/// An HTTP Client
client: ClientWithMiddleware,
+
+ rate_limit_cell: RateLimitCell,
}
pub struct SessionInfo {
#![allow(clippy::too_many_arguments)]
pub fn startup(
pool: DbPool,
- rate_limiter: RateLimit,
message_handler: MessageHandlerType,
message_handler_crud: MessageHandlerCrudType,
client: ClientWithMiddleware,
settings: Settings,
secret: Secret,
+ rate_limit_cell: RateLimitCell,
) -> ChatServer {
ChatServer {
sessions: HashMap::new(),
user_rooms: HashMap::new(),
rng: rand::thread_rng(),
pool,
- rate_limiter,
captchas: Vec::new(),
message_handler,
message_handler_crud,
client,
settings,
secret,
+ rate_limit_cell,
}
}
msg: StandardMessage,
ctx: &mut Context<Self>,
) -> impl Future<Output = Result<String, LemmyError>> {
- let rate_limiter = self.rate_limiter.clone();
-
let ip: IpAddr = match self.sessions.get(&msg.id) {
Some(info) => info.ip.to_owned(),
None => IpAddr("blank_ip".to_string()),
client: self.client.to_owned(),
settings: self.settings.to_owned(),
secret: self.secret.to_owned(),
+ rate_limit_cell: self.rate_limit_cell.to_owned(),
};
let message_handler_crud = self.message_handler_crud;
let message_handler = self.message_handler;
+ let rate_limiter = self.rate_limit_cell.clone();
async move {
let json: Value = serde_json::from_str(&msg.msg)?;
let data = &json["data"].to_string();
use lemmy_db_schema::{source::secret::Secret, utils::DbPool};
use lemmy_utils::{
error::LemmyError,
+ rate_limit::RateLimitCell,
settings::{structs::Settings, SETTINGS},
};
use reqwest_middleware::ClientWithMiddleware;
client: ClientWithMiddleware,
settings: Settings,
secret: Secret,
+ rate_limit_cell: RateLimitCell,
}
impl LemmyContext {
client: ClientWithMiddleware,
settings: Settings,
secret: Secret,
+ settings_updated_channel: RateLimitCell,
) -> LemmyContext {
LemmyContext {
pool,
client,
settings,
secret,
+ rate_limit_cell: settings_updated_channel,
}
}
pub fn pool(&self) -> &DbPool {
pub fn secret(&self) -> &Secret {
&self.secret
}
+ pub fn settings_updated_channel(&self) -> &RateLimitCell {
+ &self.rate_limit_cell
+ }
}
impl Clone for LemmyContext {
client: self.client.clone(),
settings: self.settings.clone(),
secret: self.secret.clone(),
+ rate_limit_cell: self.rate_limit_cell.clone(),
}
}
}
use actix::prelude::*;
use actix_web::{web, Error, HttpRequest, HttpResponse};
use actix_web_actors::ws;
-use lemmy_utils::{rate_limit::RateLimit, utils::get_ip, ConnectionId, IpAddr};
+use lemmy_utils::{rate_limit::RateLimitCell, utils::get_ip, ConnectionId, IpAddr};
use std::time::{Duration, Instant};
use tracing::{debug, error, info};
req: HttpRequest,
stream: web::Payload,
context: web::Data<LemmyContext>,
- rate_limiter: web::Data<RateLimit>,
+ rate_limiter: web::Data<RateLimitCell>,
) -> Result<HttpResponse, Error> {
ws::start(
WsSession {
/// otherwise we drop connection.
hb: Instant,
/// A rate limiter for websocket joins
- rate_limiter: RateLimit,
+ rate_limiter: RateLimitCell,
}
impl Actor for WsSession {
websocket::*,
};
use lemmy_api_crud::PerformCrud;
-use lemmy_utils::rate_limit::RateLimit;
+use lemmy_utils::rate_limit::RateLimitCell;
use lemmy_websocket::{routes::chat_route, LemmyContext};
use serde::Deserialize;
-pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimit) {
+pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) {
cfg.service(
web::scope("/api/v3")
// Websocket
};
use lemmy_utils::{
error::LemmyError,
- rate_limit::{rate_limiter::RateLimiter, RateLimit},
+ rate_limit::RateLimitCell,
settings::{structs::Settings, SETTINGS},
};
use lemmy_websocket::{chat_server::ChatServer, LemmyContext};
use reqwest_middleware::ClientBuilder;
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use reqwest_tracing::TracingMiddleware;
-use std::{
- env,
- sync::{Arc, Mutex},
- thread,
- time::Duration,
-};
+use std::{env, thread, time::Duration};
use tracing_actix_web::TracingLogger;
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
// Set up the rate limiter
let rate_limit_config =
local_site_rate_limit_to_rate_limit_config(&site_view.local_site_rate_limit);
-
- // TODO this isn't live-updating
- // https://github.com/LemmyNet/lemmy/issues/2508
- let rate_limiter = RateLimit {
- rate_limiter: Arc::new(Mutex::new(RateLimiter::default())),
- rate_limit_config,
- };
+ let rate_limit_cell = RateLimitCell::new(rate_limit_config).await;
println!(
"Starting http server at {}:{}",
let chat_server = ChatServer::startup(
pool.clone(),
- rate_limiter.clone(),
|c, i, o, d| Box::pin(match_websocket_operation(c, i, o, d)),
|c, i, o, d| Box::pin(match_websocket_operation_crud(c, i, o, d)),
client.clone(),
settings.clone(),
secret.clone(),
+ rate_limit_cell.clone(),
)
.start();
client.clone(),
settings.to_owned(),
secret.to_owned(),
+ rate_limit_cell.clone(),
);
- let rate_limiter = rate_limiter.clone();
App::new()
- .wrap(actix_web::middleware::Logger::default())
+ .wrap(middleware::Logger::default())
.wrap(TracingLogger::<QuieterRootSpanBuilder>::new())
.app_data(Data::new(context))
- .app_data(Data::new(rate_limiter.clone()))
+ .app_data(Data::new(rate_limit_cell.clone()))
// The routes
- .configure(|cfg| api_routes::config(cfg, &rate_limiter))
+ .configure(|cfg| api_routes::config(cfg, rate_limit_cell))
.configure(|cfg| {
if federation_enabled {
lemmy_apub::http::routes::config(cfg);
}
})
.configure(feeds::config)
- .configure(|cfg| images::config(cfg, pictrs_client.clone(), &rate_limiter))
+ .configure(|cfg| images::config(cfg, pictrs_client.clone(), rate_limit_cell))
.configure(nodeinfo::config)
})
.bind((settings_bind.bind, settings_bind.port))?