]> Untitled Git - lemmy.git/blob - crates/apub/src/lib.rs
8d818602262eb031971edd65649f1ac1f9dc7804
[lemmy.git] / crates / apub / src / lib.rs
1 use crate::fetcher::post_or_comment::PostOrComment;
2 use activitypub_federation::config::{Data, UrlVerifier};
3 use async_trait::async_trait;
4 use lemmy_api_common::context::LemmyContext;
5 use lemmy_db_schema::{
6   source::{
7     activity::{Activity, ActivityInsertForm},
8     instance::Instance,
9     local_site::LocalSite,
10   },
11   traits::Crud,
12   utils::{ActualDbPool, DbPool},
13 };
14 use lemmy_utils::error::{LemmyError, LemmyErrorType, LemmyResult};
15 use moka::future::Cache;
16 use once_cell::sync::Lazy;
17 use serde::Serialize;
18 use std::{sync::Arc, time::Duration};
19 use url::Url;
20
21 pub mod activities;
22 pub(crate) mod activity_lists;
23 pub mod api;
24 pub(crate) mod collections;
25 pub mod fetcher;
26 pub mod http;
27 pub(crate) mod mentions;
28 pub mod objects;
29 pub mod protocol;
30
31 pub const FEDERATION_HTTP_FETCH_LIMIT: u32 = 50;
32 /// All incoming and outgoing federation actions read the blocklist/allowlist and slur filters
33 /// multiple times. This causes a huge number of database reads if we hit the db directly. So we
34 /// cache these values for a short time, which will already make a huge difference and ensures that
35 /// changes take effect quickly.
36 const BLOCKLIST_CACHE_DURATION: Duration = Duration::from_secs(60);
37
38 static CONTEXT: Lazy<Vec<serde_json::Value>> = Lazy::new(|| {
39   serde_json::from_str(include_str!("../assets/lemmy/context.json")).expect("parse context")
40 });
41
42 #[derive(Clone)]
43 pub struct VerifyUrlData(pub ActualDbPool);
44
45 #[async_trait]
46 impl UrlVerifier for VerifyUrlData {
47   async fn verify(&self, url: &Url) -> Result<(), &'static str> {
48     let local_site_data = local_site_data_cached(&mut (&self.0).into())
49       .await
50       .expect("read local site data");
51     check_apub_id_valid(url, &local_site_data)?;
52     Ok(())
53   }
54 }
55
56 /// Checks if the ID is allowed for sending or receiving.
57 ///
58 /// In particular, it checks for:
59 /// - federation being enabled (if its disabled, only local URLs are allowed)
60 /// - the correct scheme (either http or https)
61 /// - URL being in the allowlist (if it is active)
62 /// - URL not being in the blocklist (if it is active)
63 #[tracing::instrument(skip(local_site_data))]
64 fn check_apub_id_valid(apub_id: &Url, local_site_data: &LocalSiteData) -> Result<(), &'static str> {
65   let domain = apub_id.domain().expect("apud id has domain").to_string();
66
67   if !local_site_data
68     .local_site
69     .as_ref()
70     .map(|l| l.federation_enabled)
71     .unwrap_or(true)
72   {
73     return Err("Federation disabled");
74   }
75
76   if local_site_data
77     .blocked_instances
78     .iter()
79     .any(|i| domain.eq(&i.domain))
80   {
81     return Err("Domain is blocked");
82   }
83
84   // Only check this if there are instances in the allowlist
85   if !local_site_data.allowed_instances.is_empty()
86     && !local_site_data
87       .allowed_instances
88       .iter()
89       .any(|i| domain.eq(&i.domain))
90   {
91     return Err("Domain is not in allowlist");
92   }
93
94   Ok(())
95 }
96
97 #[derive(Clone)]
98 pub(crate) struct LocalSiteData {
99   local_site: Option<LocalSite>,
100   allowed_instances: Vec<Instance>,
101   blocked_instances: Vec<Instance>,
102 }
103
104 pub(crate) async fn local_site_data_cached(
105   pool: &mut DbPool<'_>,
106 ) -> LemmyResult<Arc<LocalSiteData>> {
107   static CACHE: Lazy<Cache<(), Arc<LocalSiteData>>> = Lazy::new(|| {
108     Cache::builder()
109       .max_capacity(1)
110       .time_to_live(BLOCKLIST_CACHE_DURATION)
111       .build()
112   });
113   Ok(
114     CACHE
115       .try_get_with((), async {
116         let (local_site, allowed_instances, blocked_instances) =
117           lemmy_db_schema::try_join_with_pool!(pool => (
118             // LocalSite may be missing
119             |pool| async {
120               Ok(LocalSite::read(pool).await.ok())
121             },
122             Instance::allowlist,
123             Instance::blocklist
124           ))?;
125
126         Ok::<_, diesel::result::Error>(Arc::new(LocalSiteData {
127           local_site,
128           allowed_instances,
129           blocked_instances,
130         }))
131       })
132       .await?,
133   )
134 }
135
136 pub(crate) async fn check_apub_id_valid_with_strictness(
137   apub_id: &Url,
138   is_strict: bool,
139   context: &LemmyContext,
140 ) -> Result<(), LemmyError> {
141   let domain = apub_id.domain().expect("apud id has domain").to_string();
142   let local_instance = context
143     .settings()
144     .get_hostname_without_port()
145     .expect("local hostname is valid");
146   if domain == local_instance {
147     return Ok(());
148   }
149
150   let local_site_data = local_site_data_cached(&mut context.pool()).await?;
151   check_apub_id_valid(apub_id, &local_site_data).map_err(|err| match err {
152     "Federation disabled" => LemmyErrorType::FederationDisabled,
153     "Domain is blocked" => LemmyErrorType::DomainBlocked,
154     "Domain is not in allowlist" => LemmyErrorType::DomainNotInAllowList,
155     _ => panic!("Could not handle apub error!"),
156   })?;
157
158   // Only check allowlist if this is a community, and there are instances in the allowlist
159   if is_strict && !local_site_data.allowed_instances.is_empty() {
160     // need to allow this explicitly because apub receive might contain objects from our local
161     // instance.
162     let mut allowed_and_local = local_site_data
163       .allowed_instances
164       .iter()
165       .map(|i| i.domain.clone())
166       .collect::<Vec<String>>();
167     let local_instance = context
168       .settings()
169       .get_hostname_without_port()
170       .expect("local hostname is valid");
171     allowed_and_local.push(local_instance);
172
173     let domain = apub_id.domain().expect("apud id has domain").to_string();
174     if !allowed_and_local.contains(&domain) {
175       return Err(LemmyErrorType::FederationDisabledByStrictAllowList)?;
176     }
177   }
178   Ok(())
179 }
180
181 /// Store a sent or received activity in the database.
182 ///
183 /// Stored activities are served over the HTTP endpoint `GET /activities/{type_}/{id}`. This also
184 /// ensures that the same activity cannot be received more than once.
185 #[tracing::instrument(skip(data, activity))]
186 async fn insert_activity<T>(
187   ap_id: &Url,
188   activity: &T,
189   local: bool,
190   sensitive: bool,
191   data: &Data<LemmyContext>,
192 ) -> Result<(), LemmyError>
193 where
194   T: Serialize,
195 {
196   let ap_id = ap_id.clone().into();
197   let form = ActivityInsertForm {
198     ap_id,
199     data: serde_json::to_value(activity)?,
200     local: Some(local),
201     sensitive: Some(sensitive),
202     updated: None,
203   };
204   Activity::create(&mut data.pool(), &form).await?;
205   Ok(())
206 }
207
208 #[async_trait::async_trait]
209 pub trait SendActivity: Sync {
210   type Response: Sync + Send + Clone;
211
212   async fn send_activity(
213     _request: &Self,
214     _response: &Self::Response,
215     _context: &Data<LemmyContext>,
216   ) -> Result<(), LemmyError> {
217     Ok(())
218   }
219 }