]> Untitled Git - lemmy.git/blobdiff - crates/apub/src/lib.rs
Split activity table into sent and received parts (fixes #3103) (#3583)
[lemmy.git] / crates / apub / src / lib.rs
index f7ef22eece91375f1e06d24027578a2805ba16c0..9a45284f22fbf609dfbbdf0196847e685b8cdbf0 100644 (file)
@@ -1,21 +1,14 @@
 use crate::fetcher::post_or_comment::PostOrComment;
 use activitypub_federation::config::{Data, UrlVerifier};
 use async_trait::async_trait;
-use futures::future::join3;
 use lemmy_api_common::context::LemmyContext;
 use lemmy_db_schema::{
-  source::{
-    activity::{Activity, ActivityInsertForm},
-    instance::Instance,
-    local_site::LocalSite,
-  },
-  traits::Crud,
-  utils::DbPool,
+  source::{activity::ReceivedActivity, instance::Instance, local_site::LocalSite},
+  utils::{ActualDbPool, DbPool},
 };
 use lemmy_utils::error::{LemmyError, LemmyErrorType, LemmyResult};
 use moka::future::Cache;
 use once_cell::sync::Lazy;
-use serde::Serialize;
 use std::{sync::Arc, time::Duration};
 use url::Url;
 
@@ -41,12 +34,12 @@ static CONTEXT: Lazy<Vec<serde_json::Value>> = Lazy::new(|| {
 });
 
 #[derive(Clone)]
-pub struct VerifyUrlData(pub DbPool);
+pub struct VerifyUrlData(pub ActualDbPool);
 
 #[async_trait]
 impl UrlVerifier for VerifyUrlData {
   async fn verify(&self, url: &Url) -> Result<(), &'static str> {
-    let local_site_data = local_site_data_cached(&self.0)
+    let local_site_data = local_site_data_cached(&mut (&self.0).into())
       .await
       .expect("read local site data");
     check_apub_id_valid(url, &local_site_data)?;
@@ -102,7 +95,9 @@ pub(crate) struct LocalSiteData {
   blocked_instances: Vec<Instance>,
 }
 
-pub(crate) async fn local_site_data_cached(pool: &DbPool) -> LemmyResult<Arc<LocalSiteData>> {
+pub(crate) async fn local_site_data_cached(
+  pool: &mut DbPool<'_>,
+) -> LemmyResult<Arc<LocalSiteData>> {
   static CACHE: Lazy<Cache<(), Arc<LocalSiteData>>> = Lazy::new(|| {
     Cache::builder()
       .max_capacity(1)
@@ -112,18 +107,20 @@ pub(crate) async fn local_site_data_cached(pool: &DbPool) -> LemmyResult<Arc<Loc
   Ok(
     CACHE
       .try_get_with((), async {
-        let (local_site, allowed_instances, blocked_instances) = join3(
-          LocalSite::read(pool),
-          Instance::allowlist(pool),
-          Instance::blocklist(pool),
-        )
-        .await;
+        let (local_site, allowed_instances, blocked_instances) =
+          lemmy_db_schema::try_join_with_pool!(pool => (
+            // LocalSite may be missing
+            |pool| async {
+              Ok(LocalSite::read(pool).await.ok())
+            },
+            Instance::allowlist,
+            Instance::blocklist
+          ))?;
 
         Ok::<_, diesel::result::Error>(Arc::new(LocalSiteData {
-          // LocalSite may be missing
-          local_site: local_site.ok(),
-          allowed_instances: allowed_instances?,
-          blocked_instances: blocked_instances?,
+          local_site,
+          allowed_instances,
+          blocked_instances,
         }))
       })
       .await?,
@@ -144,7 +141,7 @@ pub(crate) async fn check_apub_id_valid_with_strictness(
     return Ok(());
   }
 
-  let local_site_data = local_site_data_cached(context.pool()).await?;
+  let local_site_data = local_site_data_cached(&mut context.pool()).await?;
   check_apub_id_valid(apub_id, &local_site_data).map_err(|err| match err {
     "Federation disabled" => LemmyErrorType::FederationDisabled,
     "Domain is blocked" => LemmyErrorType::DomainBlocked,
@@ -175,30 +172,16 @@ pub(crate) async fn check_apub_id_valid_with_strictness(
   Ok(())
 }
 
-/// Store a sent or received activity in the database.
+/// Store received activities in the database.
 ///
-/// Stored activities are served over the HTTP endpoint `GET /activities/{type_}/{id}`. This also
-/// ensures that the same activity cannot be received more than once.
-#[tracing::instrument(skip(data, activity))]
-async fn insert_activity<T>(
+/// This ensures that the same activity doesnt get received and processed more than once, which
+/// would be a waste of resources.
+#[tracing::instrument(skip(data))]
+async fn insert_received_activity(
   ap_id: &Url,
-  activity: &T,
-  local: bool,
-  sensitive: bool,
   data: &Data<LemmyContext>,
-) -> Result<(), LemmyError>
-where
-  T: Serialize,
-{
-  let ap_id = ap_id.clone().into();
-  let form = ActivityInsertForm {
-    ap_id,
-    data: serde_json::to_value(activity)?,
-    local: Some(local),
-    sensitive: Some(sensitive),
-    updated: None,
-  };
-  Activity::create(data.pool(), &form).await?;
+) -> Result<(), LemmyError> {
+  ReceivedActivity::create(&mut data.pool(), &ap_id.clone().into()).await?;
   Ok(())
 }