X-Git-Url: http://these/git/?a=blobdiff_plain;f=crates%2Fapub%2Fsrc%2Flib.rs;h=9a45284f22fbf609dfbbdf0196847e685b8cdbf0;hb=e9e76549a88cfbdab36f00d302cceabcaaa24f4c;hp=a5bc41d1fd0b2a192190fa074ec64cf7e2a47d0a;hpb=eb40aeb89b139d843cac1ecf9370a6b808b3aeea;p=lemmy.git diff --git a/crates/apub/src/lib.rs b/crates/apub/src/lib.rs index a5bc41d1..9a45284f 100644 --- a/crates/apub/src/lib.rs +++ b/crates/apub/src/lib.rs @@ -3,17 +3,13 @@ use activitypub_federation::config::{Data, UrlVerifier}; use async_trait::async_trait; use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ - source::{ - activity::{Activity, ActivityInsertForm}, - instance::Instance, - local_site::LocalSite, - }, - traits::Crud, - utils::DbPool, + source::{activity::ReceivedActivity, instance::Instance, local_site::LocalSite}, + utils::{ActualDbPool, DbPool}, }; -use lemmy_utils::{error::LemmyError, settings::structs::Settings}; +use lemmy_utils::error::{LemmyError, LemmyErrorType, LemmyResult}; +use moka::future::Cache; use once_cell::sync::Lazy; -use serde::Serialize; +use std::{sync::Arc, time::Duration}; use url::Url; pub mod activities; @@ -27,18 +23,23 @@ pub mod objects; pub mod protocol; pub const FEDERATION_HTTP_FETCH_LIMIT: u32 = 50; +/// All incoming and outgoing federation actions read the blocklist/allowlist and slur filters +/// multiple times. This causes a huge number of database reads if we hit the db directly. So we +/// cache these values for a short time, which will already make a huge difference and ensures that +/// changes take effect quickly. +const BLOCKLIST_CACHE_DURATION: Duration = Duration::from_secs(60); static CONTEXT: Lazy> = Lazy::new(|| { serde_json::from_str(include_str!("../assets/lemmy/context.json")).expect("parse context") }); #[derive(Clone)] -pub struct VerifyUrlData(pub DbPool); +pub struct VerifyUrlData(pub ActualDbPool); #[async_trait] impl UrlVerifier for VerifyUrlData { async fn verify(&self, url: &Url) -> Result<(), &'static str> { - let local_site_data = fetch_local_site_data(&self.0) + let local_site_data = local_site_data_cached(&mut (&self.0).into()) .await .expect("read local site data"); check_apub_id_valid(url, &local_site_data)?; @@ -53,9 +54,6 @@ impl UrlVerifier for VerifyUrlData { /// - the correct scheme (either http or https) /// - URL being in the allowlist (if it is active) /// - URL not being in the blocklist (if it is active) -/// -/// `use_strict_allowlist` should be true only when parsing a remote community, or when parsing a -/// post/comment in a local community. #[tracing::instrument(skip(local_site_data))] fn check_apub_id_valid(apub_id: &Url, local_site_data: &LocalSiteData) -> Result<(), &'static str> { let domain = apub_id.domain().expect("apud id has domain").to_string(); @@ -97,36 +95,59 @@ pub(crate) struct LocalSiteData { blocked_instances: Vec, } -pub(crate) async fn fetch_local_site_data( - pool: &DbPool, -) -> Result { - // LocalSite may be missing - let local_site = LocalSite::read(pool).await.ok(); - let allowed_instances = Instance::allowlist(pool).await?; - let blocked_instances = Instance::blocklist(pool).await?; - - Ok(LocalSiteData { - local_site, - allowed_instances, - blocked_instances, - }) +pub(crate) async fn local_site_data_cached( + pool: &mut DbPool<'_>, +) -> LemmyResult> { + static CACHE: Lazy>> = Lazy::new(|| { + Cache::builder() + .max_capacity(1) + .time_to_live(BLOCKLIST_CACHE_DURATION) + .build() + }); + Ok( + CACHE + .try_get_with((), async { + let (local_site, allowed_instances, blocked_instances) = + lemmy_db_schema::try_join_with_pool!(pool => ( + // LocalSite may be missing + |pool| async { + Ok(LocalSite::read(pool).await.ok()) + }, + Instance::allowlist, + Instance::blocklist + ))?; + + Ok::<_, diesel::result::Error>(Arc::new(LocalSiteData { + local_site, + allowed_instances, + blocked_instances, + })) + }) + .await?, + ) } -#[tracing::instrument(skip(settings, local_site_data))] -pub(crate) fn check_apub_id_valid_with_strictness( +pub(crate) async fn check_apub_id_valid_with_strictness( apub_id: &Url, is_strict: bool, - local_site_data: &LocalSiteData, - settings: &Settings, + context: &LemmyContext, ) -> Result<(), LemmyError> { let domain = apub_id.domain().expect("apud id has domain").to_string(); - let local_instance = settings + let local_instance = context + .settings() .get_hostname_without_port() .expect("local hostname is valid"); if domain == local_instance { return Ok(()); } - check_apub_id_valid(apub_id, local_site_data).map_err(LemmyError::from_message)?; + + let local_site_data = local_site_data_cached(&mut context.pool()).await?; + check_apub_id_valid(apub_id, &local_site_data).map_err(|err| match err { + "Federation disabled" => LemmyErrorType::FederationDisabled, + "Domain is blocked" => LemmyErrorType::DomainBlocked, + "Domain is not in allowlist" => LemmyErrorType::DomainNotInAllowList, + _ => panic!("Could not handle apub error!"), + })?; // Only check allowlist if this is a community, and there are instances in the allowlist if is_strict && !local_site_data.allowed_instances.is_empty() { @@ -137,51 +158,36 @@ pub(crate) fn check_apub_id_valid_with_strictness( .iter() .map(|i| i.domain.clone()) .collect::>(); - let local_instance = settings + let local_instance = context + .settings() .get_hostname_without_port() .expect("local hostname is valid"); allowed_and_local.push(local_instance); let domain = apub_id.domain().expect("apud id has domain").to_string(); if !allowed_and_local.contains(&domain) { - return Err(LemmyError::from_message( - "Federation forbidden by strict allowlist", - )); + return Err(LemmyErrorType::FederationDisabledByStrictAllowList)?; } } Ok(()) } -/// Store a sent or received activity in the database. +/// Store received activities in the database. /// -/// Stored activities are served over the HTTP endpoint `GET /activities/{type_}/{id}`. This also -/// ensures that the same activity cannot be received more than once. -#[tracing::instrument(skip(data, activity))] -async fn insert_activity( +/// This ensures that the same activity doesnt get received and processed more than once, which +/// would be a waste of resources. +#[tracing::instrument(skip(data))] +async fn insert_received_activity( ap_id: &Url, - activity: &T, - local: bool, - sensitive: bool, data: &Data, -) -> Result<(), LemmyError> -where - T: Serialize, -{ - let ap_id = ap_id.clone().into(); - let form = ActivityInsertForm { - ap_id, - data: serde_json::to_value(activity)?, - local: Some(local), - sensitive: Some(sensitive), - updated: None, - }; - Activity::create(data.pool(), &form).await?; +) -> Result<(), LemmyError> { + ReceivedActivity::create(&mut data.pool(), &ap_id.clone().into()).await?; Ok(()) } #[async_trait::async_trait] pub trait SendActivity: Sync { - type Response: Sync + Send; + type Response: Sync + Send + Clone; async fn send_activity( _request: &Self,