]> Untitled Git - lemmy.git/blob - crates/apub/src/lib.rs
Cache federation blocklist (#3486)
[lemmy.git] / crates / apub / src / lib.rs
1 use crate::fetcher::post_or_comment::PostOrComment;
2 use activitypub_federation::config::{Data, UrlVerifier};
3 use async_trait::async_trait;
4 use futures::future::join3;
5 use lemmy_api_common::context::LemmyContext;
6 use lemmy_db_schema::{
7   source::{
8     activity::{Activity, ActivityInsertForm},
9     instance::Instance,
10     local_site::LocalSite,
11   },
12   traits::Crud,
13   utils::DbPool,
14 };
15 use lemmy_utils::error::{LemmyError, LemmyResult};
16 use moka::future::Cache;
17 use once_cell::sync::Lazy;
18 use serde::Serialize;
19 use std::{sync::Arc, time::Duration};
20 use url::Url;
21
22 pub mod activities;
23 pub(crate) mod activity_lists;
24 pub mod api;
25 pub(crate) mod collections;
26 pub mod fetcher;
27 pub mod http;
28 pub(crate) mod mentions;
29 pub mod objects;
30 pub mod protocol;
31
32 pub const FEDERATION_HTTP_FETCH_LIMIT: u32 = 50;
33 /// All incoming and outgoing federation actions read the blocklist/allowlist and slur filters
34 /// multiple times. This causes a huge number of database reads if we hit the db directly. So we
35 /// cache these values for a short time, which will already make a huge difference and ensures that
36 /// changes take effect quickly.
37 const BLOCKLIST_CACHE_DURATION: Duration = Duration::from_secs(60);
38
39 static CONTEXT: Lazy<Vec<serde_json::Value>> = Lazy::new(|| {
40   serde_json::from_str(include_str!("../assets/lemmy/context.json")).expect("parse context")
41 });
42
43 #[derive(Clone)]
44 pub struct VerifyUrlData(pub DbPool);
45
46 #[async_trait]
47 impl UrlVerifier for VerifyUrlData {
48   async fn verify(&self, url: &Url) -> Result<(), &'static str> {
49     let local_site_data = local_site_data_cached(&self.0)
50       .await
51       .expect("read local site data");
52     check_apub_id_valid(url, &local_site_data)?;
53     Ok(())
54   }
55 }
56
57 /// Checks if the ID is allowed for sending or receiving.
58 ///
59 /// In particular, it checks for:
60 /// - federation being enabled (if its disabled, only local URLs are allowed)
61 /// - the correct scheme (either http or https)
62 /// - URL being in the allowlist (if it is active)
63 /// - URL not being in the blocklist (if it is active)
64 #[tracing::instrument(skip(local_site_data))]
65 fn check_apub_id_valid(apub_id: &Url, local_site_data: &LocalSiteData) -> Result<(), &'static str> {
66   let domain = apub_id.domain().expect("apud id has domain").to_string();
67
68   if !local_site_data
69     .local_site
70     .as_ref()
71     .map(|l| l.federation_enabled)
72     .unwrap_or(true)
73   {
74     return Err("Federation disabled");
75   }
76
77   if local_site_data
78     .blocked_instances
79     .iter()
80     .any(|i| domain.eq(&i.domain))
81   {
82     return Err("Domain is blocked");
83   }
84
85   // Only check this if there are instances in the allowlist
86   if !local_site_data.allowed_instances.is_empty()
87     && !local_site_data
88       .allowed_instances
89       .iter()
90       .any(|i| domain.eq(&i.domain))
91   {
92     return Err("Domain is not in allowlist");
93   }
94
95   Ok(())
96 }
97
98 #[derive(Clone)]
99 pub(crate) struct LocalSiteData {
100   local_site: Option<LocalSite>,
101   allowed_instances: Vec<Instance>,
102   blocked_instances: Vec<Instance>,
103 }
104
105 pub(crate) async fn local_site_data_cached(pool: &DbPool) -> LemmyResult<Arc<LocalSiteData>> {
106   static CACHE: Lazy<Cache<(), Arc<LocalSiteData>>> = Lazy::new(|| {
107     Cache::builder()
108       .max_capacity(1)
109       .time_to_live(BLOCKLIST_CACHE_DURATION)
110       .build()
111   });
112   Ok(
113     CACHE
114       .try_get_with((), async {
115         let (local_site, allowed_instances, blocked_instances) = join3(
116           LocalSite::read(pool),
117           Instance::allowlist(pool),
118           Instance::blocklist(pool),
119         )
120         .await;
121
122         Ok::<_, diesel::result::Error>(Arc::new(LocalSiteData {
123           // LocalSite may be missing
124           local_site: local_site.ok(),
125           allowed_instances: allowed_instances?,
126           blocked_instances: blocked_instances?,
127         }))
128       })
129       .await?,
130   )
131 }
132
133 pub(crate) async fn check_apub_id_valid_with_strictness(
134   apub_id: &Url,
135   is_strict: bool,
136   context: &LemmyContext,
137 ) -> Result<(), LemmyError> {
138   let domain = apub_id.domain().expect("apud id has domain").to_string();
139   let local_instance = context
140     .settings()
141     .get_hostname_without_port()
142     .expect("local hostname is valid");
143   if domain == local_instance {
144     return Ok(());
145   }
146
147   let local_site_data = local_site_data_cached(context.pool()).await?;
148   check_apub_id_valid(apub_id, &local_site_data).map_err(LemmyError::from_message)?;
149
150   // Only check allowlist if this is a community, and there are instances in the allowlist
151   if is_strict && !local_site_data.allowed_instances.is_empty() {
152     // need to allow this explicitly because apub receive might contain objects from our local
153     // instance.
154     let mut allowed_and_local = local_site_data
155       .allowed_instances
156       .iter()
157       .map(|i| i.domain.clone())
158       .collect::<Vec<String>>();
159     let local_instance = context
160       .settings()
161       .get_hostname_without_port()
162       .expect("local hostname is valid");
163     allowed_and_local.push(local_instance);
164
165     let domain = apub_id.domain().expect("apud id has domain").to_string();
166     if !allowed_and_local.contains(&domain) {
167       return Err(LemmyError::from_message(
168         "Federation forbidden by strict allowlist",
169       ));
170     }
171   }
172   Ok(())
173 }
174
175 /// Store a sent or received activity in the database.
176 ///
177 /// Stored activities are served over the HTTP endpoint `GET /activities/{type_}/{id}`. This also
178 /// ensures that the same activity cannot be received more than once.
179 #[tracing::instrument(skip(data, activity))]
180 async fn insert_activity<T>(
181   ap_id: &Url,
182   activity: &T,
183   local: bool,
184   sensitive: bool,
185   data: &Data<LemmyContext>,
186 ) -> Result<(), LemmyError>
187 where
188   T: Serialize,
189 {
190   let ap_id = ap_id.clone().into();
191   let form = ActivityInsertForm {
192     ap_id,
193     data: serde_json::to_value(activity)?,
194     local: Some(local),
195     sensitive: Some(sensitive),
196     updated: None,
197   };
198   Activity::create(data.pool(), &form).await?;
199   Ok(())
200 }
201
202 #[async_trait::async_trait]
203 pub trait SendActivity: Sync {
204   type Response: Sync + Send;
205
206   async fn send_activity(
207     _request: &Self,
208     _response: &Self::Response,
209     _context: &Data<LemmyContext>,
210   ) -> Result<(), LemmyError> {
211     Ok(())
212   }
213 }