]> Untitled Git - lemmy.git/blob - crates/apub/src/lib.rs
Split activity table into sent and received parts (fixes #3103) (#3583)
[lemmy.git] / crates / apub / src / lib.rs
1 use crate::fetcher::post_or_comment::PostOrComment;
2 use activitypub_federation::config::{Data, UrlVerifier};
3 use async_trait::async_trait;
4 use lemmy_api_common::context::LemmyContext;
5 use lemmy_db_schema::{
6   source::{activity::ReceivedActivity, instance::Instance, local_site::LocalSite},
7   utils::{ActualDbPool, DbPool},
8 };
9 use lemmy_utils::error::{LemmyError, LemmyErrorType, LemmyResult};
10 use moka::future::Cache;
11 use once_cell::sync::Lazy;
12 use std::{sync::Arc, time::Duration};
13 use url::Url;
14
15 pub mod activities;
16 pub(crate) mod activity_lists;
17 pub mod api;
18 pub(crate) mod collections;
19 pub mod fetcher;
20 pub mod http;
21 pub(crate) mod mentions;
22 pub mod objects;
23 pub mod protocol;
24
25 pub const FEDERATION_HTTP_FETCH_LIMIT: u32 = 50;
26 /// All incoming and outgoing federation actions read the blocklist/allowlist and slur filters
27 /// multiple times. This causes a huge number of database reads if we hit the db directly. So we
28 /// cache these values for a short time, which will already make a huge difference and ensures that
29 /// changes take effect quickly.
30 const BLOCKLIST_CACHE_DURATION: Duration = Duration::from_secs(60);
31
32 static CONTEXT: Lazy<Vec<serde_json::Value>> = Lazy::new(|| {
33   serde_json::from_str(include_str!("../assets/lemmy/context.json")).expect("parse context")
34 });
35
36 #[derive(Clone)]
37 pub struct VerifyUrlData(pub ActualDbPool);
38
39 #[async_trait]
40 impl UrlVerifier for VerifyUrlData {
41   async fn verify(&self, url: &Url) -> Result<(), &'static str> {
42     let local_site_data = local_site_data_cached(&mut (&self.0).into())
43       .await
44       .expect("read local site data");
45     check_apub_id_valid(url, &local_site_data)?;
46     Ok(())
47   }
48 }
49
50 /// Checks if the ID is allowed for sending or receiving.
51 ///
52 /// In particular, it checks for:
53 /// - federation being enabled (if its disabled, only local URLs are allowed)
54 /// - the correct scheme (either http or https)
55 /// - URL being in the allowlist (if it is active)
56 /// - URL not being in the blocklist (if it is active)
57 #[tracing::instrument(skip(local_site_data))]
58 fn check_apub_id_valid(apub_id: &Url, local_site_data: &LocalSiteData) -> Result<(), &'static str> {
59   let domain = apub_id.domain().expect("apud id has domain").to_string();
60
61   if !local_site_data
62     .local_site
63     .as_ref()
64     .map(|l| l.federation_enabled)
65     .unwrap_or(true)
66   {
67     return Err("Federation disabled");
68   }
69
70   if local_site_data
71     .blocked_instances
72     .iter()
73     .any(|i| domain.eq(&i.domain))
74   {
75     return Err("Domain is blocked");
76   }
77
78   // Only check this if there are instances in the allowlist
79   if !local_site_data.allowed_instances.is_empty()
80     && !local_site_data
81       .allowed_instances
82       .iter()
83       .any(|i| domain.eq(&i.domain))
84   {
85     return Err("Domain is not in allowlist");
86   }
87
88   Ok(())
89 }
90
91 #[derive(Clone)]
92 pub(crate) struct LocalSiteData {
93   local_site: Option<LocalSite>,
94   allowed_instances: Vec<Instance>,
95   blocked_instances: Vec<Instance>,
96 }
97
98 pub(crate) async fn local_site_data_cached(
99   pool: &mut DbPool<'_>,
100 ) -> LemmyResult<Arc<LocalSiteData>> {
101   static CACHE: Lazy<Cache<(), Arc<LocalSiteData>>> = Lazy::new(|| {
102     Cache::builder()
103       .max_capacity(1)
104       .time_to_live(BLOCKLIST_CACHE_DURATION)
105       .build()
106   });
107   Ok(
108     CACHE
109       .try_get_with((), async {
110         let (local_site, allowed_instances, blocked_instances) =
111           lemmy_db_schema::try_join_with_pool!(pool => (
112             // LocalSite may be missing
113             |pool| async {
114               Ok(LocalSite::read(pool).await.ok())
115             },
116             Instance::allowlist,
117             Instance::blocklist
118           ))?;
119
120         Ok::<_, diesel::result::Error>(Arc::new(LocalSiteData {
121           local_site,
122           allowed_instances,
123           blocked_instances,
124         }))
125       })
126       .await?,
127   )
128 }
129
130 pub(crate) async fn check_apub_id_valid_with_strictness(
131   apub_id: &Url,
132   is_strict: bool,
133   context: &LemmyContext,
134 ) -> Result<(), LemmyError> {
135   let domain = apub_id.domain().expect("apud id has domain").to_string();
136   let local_instance = context
137     .settings()
138     .get_hostname_without_port()
139     .expect("local hostname is valid");
140   if domain == local_instance {
141     return Ok(());
142   }
143
144   let local_site_data = local_site_data_cached(&mut context.pool()).await?;
145   check_apub_id_valid(apub_id, &local_site_data).map_err(|err| match err {
146     "Federation disabled" => LemmyErrorType::FederationDisabled,
147     "Domain is blocked" => LemmyErrorType::DomainBlocked,
148     "Domain is not in allowlist" => LemmyErrorType::DomainNotInAllowList,
149     _ => panic!("Could not handle apub error!"),
150   })?;
151
152   // Only check allowlist if this is a community, and there are instances in the allowlist
153   if is_strict && !local_site_data.allowed_instances.is_empty() {
154     // need to allow this explicitly because apub receive might contain objects from our local
155     // instance.
156     let mut allowed_and_local = local_site_data
157       .allowed_instances
158       .iter()
159       .map(|i| i.domain.clone())
160       .collect::<Vec<String>>();
161     let local_instance = context
162       .settings()
163       .get_hostname_without_port()
164       .expect("local hostname is valid");
165     allowed_and_local.push(local_instance);
166
167     let domain = apub_id.domain().expect("apud id has domain").to_string();
168     if !allowed_and_local.contains(&domain) {
169       return Err(LemmyErrorType::FederationDisabledByStrictAllowList)?;
170     }
171   }
172   Ok(())
173 }
174
175 /// Store received activities in the database.
176 ///
177 /// This ensures that the same activity doesnt get received and processed more than once, which
178 /// would be a waste of resources.
179 #[tracing::instrument(skip(data))]
180 async fn insert_received_activity(
181   ap_id: &Url,
182   data: &Data<LemmyContext>,
183 ) -> Result<(), LemmyError> {
184   ReceivedActivity::create(&mut data.pool(), &ap_id.clone().into()).await?;
185   Ok(())
186 }
187
188 #[async_trait::async_trait]
189 pub trait SendActivity: Sync {
190   type Response: Sync + Send + Clone;
191
192   async fn send_activity(
193     _request: &Self,
194     _response: &Self::Response,
195     _context: &Data<LemmyContext>,
196   ) -> Result<(), LemmyError> {
197     Ok(())
198   }
199 }