From: Felix Ableitner <me@nutomic.com>
Date: Wed, 30 Sep 2020 16:19:14 +0000 (+0200)
Subject: Dont send to blocked instances, rewrite activity_sender
X-Git-Url: http://these/git/%7B%24%7B%60data:application/static/git-favicon.png?a=commitdiff_plain;h=a4cb067130f34f4be82698ab648ea5e418a351e6;p=lemmy.git

Dont send to blocked instances, rewrite activity_sender
---

diff --git a/lemmy_apub/src/activities.rs b/lemmy_apub/src/activities.rs
deleted file mode 100644
index 18781ef4..00000000
--- a/lemmy_apub/src/activities.rs
+++ /dev/null
@@ -1,49 +0,0 @@
-use crate::{activity_queue::send_activity, community::do_announce, insert_activity};
-use activitystreams::{
-  base::{Extends, ExtendsExt},
-  object::AsObject,
-};
-use lemmy_db::{community::Community, user::User_};
-use lemmy_utils::{settings::Settings, LemmyError};
-use lemmy_websocket::LemmyContext;
-use serde::{export::fmt::Debug, Serialize};
-use url::{ParseError, Url};
-use uuid::Uuid;
-
-pub async fn send_activity_to_community<T, Kind>(
-  creator: &User_,
-  community: &Community,
-  to: Vec<Url>,
-  activity: T,
-  context: &LemmyContext,
-) -> Result<(), LemmyError>
-where
-  T: AsObject<Kind> + Extends<Kind> + Serialize + Debug + Send + Clone + 'static,
-  Kind: Serialize,
-  <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
-{
-  // TODO: looks like call this sometimes with activity, and sometimes with any_base
-  insert_activity(creator.id, activity.clone(), true, context.pool()).await?;
-
-  // if this is a local community, we need to do an announce from the community instead
-  if community.local {
-    do_announce(activity.into_any_base()?, &community, creator, context).await?;
-  } else {
-    send_activity(context.activity_queue(), activity, creator, to)?;
-  }
-
-  Ok(())
-}
-
-pub(in crate) fn generate_activity_id<T>(kind: T) -> Result<Url, ParseError>
-where
-  T: ToString,
-{
-  let id = format!(
-    "{}/activities/{}/{}",
-    Settings::get().get_protocol_and_hostname(),
-    kind.to_string().to_lowercase(),
-    Uuid::new_v4()
-  );
-  Url::parse(&id)
-}
diff --git a/lemmy_apub/src/activity_queue.rs b/lemmy_apub/src/activity_queue.rs
index ece782c5..960e126b 100644
--- a/lemmy_apub/src/activity_queue.rs
+++ b/lemmy_apub/src/activity_queue.rs
@@ -1,4 +1,10 @@
-use crate::{check_is_apub_id_valid, extensions::signatures::sign, ActorType};
+use crate::{
+  check_is_apub_id_valid,
+  community::do_announce,
+  extensions::signatures::sign,
+  insert_activity,
+  ActorType,
+};
 use activitystreams::{
   base::{Extends, ExtendsExt},
   object::AsObject,
@@ -13,22 +19,148 @@ use background_jobs::{
   QueueHandle,
   WorkerConfig,
 };
+use itertools::Itertools;
+use lemmy_db::{community::Community, user::User_, DbPool};
 use lemmy_utils::{location_info, settings::Settings, LemmyError};
+use lemmy_websocket::LemmyContext;
 use log::warn;
 use reqwest::Client;
 use serde::{Deserialize, Serialize};
 use std::{collections::BTreeMap, future::Future, pin::Pin};
 use url::Url;
 
-pub fn send_activity<T, Kind>(
+pub async fn send_activity_single_dest<T, Kind>(
+  activity: T,
+  creator: &dyn ActorType,
+  to: Url,
+  context: &LemmyContext,
+) -> Result<(), LemmyError>
+where
+  T: AsObject<Kind> + Extends<Kind>,
+  Kind: Serialize,
+  <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
+{
+  if check_is_apub_id_valid(&to).is_ok() {
+    send_activity_internal(
+      context.activity_queue(),
+      activity,
+      creator,
+      vec![to],
+      context.pool(),
+    )
+    .await?;
+  }
+
+  Ok(())
+}
+
+pub async fn send_to_community_followers<T, Kind>(
+  activity: T,
+  community: &Community,
+  context: &LemmyContext,
+  sender_shared_inbox: Option<Url>,
+) -> Result<(), LemmyError>
+where
+  T: AsObject<Kind> + Extends<Kind>,
+  Kind: Serialize,
+  <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
+{
+  // dont send to the local instance, nor to the instance where the activity originally came from,
+  // because that would result in a database error (same data inserted twice)
+  let community_shared_inbox = community.get_shared_inbox_url()?;
+  let to: Vec<Url> = community
+    .get_follower_inboxes(context.pool())
+    .await?
+    .iter()
+    .filter(|inbox| Some(inbox) != sender_shared_inbox.as_ref().as_ref())
+    .filter(|inbox| inbox != &&community_shared_inbox)
+    .filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
+    .unique()
+    .map(|inbox| inbox.to_owned())
+    .collect();
+
+  send_activity_internal(
+    context.activity_queue(),
+    activity,
+    community,
+    to,
+    context.pool(),
+  )
+  .await?;
+
+  Ok(())
+}
+
+pub async fn send_to_community<T, Kind>(
+  creator: &User_,
+  community: &Community,
+  activity: T,
+  context: &LemmyContext,
+) -> Result<(), LemmyError>
+where
+  T: AsObject<Kind> + Extends<Kind>,
+  Kind: Serialize,
+  <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
+{
+  // if this is a local community, we need to do an announce from the community instead
+  if community.local {
+    do_announce(activity.into_any_base()?, &community, creator, context).await?;
+  } else {
+    let inbox = community.get_shared_inbox_url()?;
+    check_is_apub_id_valid(&inbox)?;
+    send_activity_internal(
+      context.activity_queue(),
+      activity,
+      creator,
+      vec![inbox],
+      context.pool(),
+    )
+    .await?;
+  }
+
+  Ok(())
+}
+
+pub async fn send_comment_mentions<T, Kind>(
+  creator: &User_,
+  mentions: Vec<Url>,
+  activity: T,
+  context: &LemmyContext,
+) -> Result<(), LemmyError>
+where
+  T: AsObject<Kind> + Extends<Kind>,
+  Kind: Serialize,
+  <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
+{
+  let mentions = mentions
+    .iter()
+    .filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
+    .map(|i| i.to_owned())
+    .collect();
+  send_activity_internal(
+    context.activity_queue(),
+    activity,
+    creator,
+    mentions,
+    context.pool(),
+  )
+  .await?;
+  Ok(())
+}
+
+/// Asynchronously sends the given `activity` from `actor` to every inbox URL in `to`.
+///
+/// The caller of this function needs to remove any blocked domains from `to`,
+/// using `check_is_apub_id_valid()`.
+async fn send_activity_internal<T, Kind>(
   activity_sender: &QueueHandle,
   activity: T,
   actor: &dyn ActorType,
   to: Vec<Url>,
+  pool: &DbPool,
 ) -> Result<(), LemmyError>
 where
-  T: AsObject<Kind>,
-  T: Extends<Kind>,
+  T: AsObject<Kind> + Extends<Kind>,
   Kind: Serialize,
   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
 {
@@ -36,13 +168,14 @@ where
     return Ok(());
   }
 
-  let activity = activity.into_any_base()?;
-  let serialised_activity = serde_json::to_string(&activity)?;
-
   for to_url in &to {
-    check_is_apub_id_valid(&to_url)?;
+    assert!(check_is_apub_id_valid(&to_url).is_ok());
   }
 
+  let activity = activity.into_any_base()?;
+  let serialised_activity = serde_json::to_string(&activity)?;
+  insert_activity(actor.user_id(), serialised_activity.clone(), true, pool).await?;
+
   // TODO: it would make sense to create a separate task for each destination server
   let message = SendActivityTask {
     activity: serialised_activity,
diff --git a/lemmy_apub/src/comment.rs b/lemmy_apub/src/comment.rs
index 4e5c173f..7f6885a1 100644
--- a/lemmy_apub/src/comment.rs
+++ b/lemmy_apub/src/comment.rs
@@ -1,5 +1,5 @@
 use crate::{
-  activities::{generate_activity_id, send_activity_to_community},
+  activity_queue::{send_comment_mentions, send_to_community},
   check_actor_domain,
   create_apub_response,
   create_apub_tombstone_response,
@@ -10,6 +10,7 @@ use crate::{
     get_or_fetch_and_insert_post,
     get_or_fetch_and_upsert_user,
   },
+  generate_activity_id,
   ActorType,
   ApubLikeableType,
   ApubObjectType,
@@ -219,7 +220,8 @@ impl ApubObjectType for Comment {
       // Set the mention tags
       .set_many_tags(maa.get_tags()?);
 
-    send_activity_to_community(&creator, &community, maa.inboxes, create, context).await?;
+    send_to_community(&creator, &community, create.clone(), context).await?;
+    send_comment_mentions(&creator, maa.inboxes, create, context).await?;
     Ok(())
   }
 
@@ -247,7 +249,8 @@ impl ApubObjectType for Comment {
       // Set the mention tags
       .set_many_tags(maa.get_tags()?);
 
-    send_activity_to_community(&creator, &community, maa.inboxes, update, context).await?;
+    send_to_community(&creator, &community, update.clone(), context).await?;
+    send_comment_mentions(&creator, maa.inboxes, update, context).await?;
     Ok(())
   }
 
@@ -270,14 +273,7 @@ impl ApubObjectType for Comment {
       .set_to(public())
       .set_many_ccs(vec![community.get_followers_url()?]);
 
-    send_activity_to_community(
-      &creator,
-      &community,
-      vec![community.get_shared_inbox_url()?],
-      delete,
-      context,
-    )
-    .await?;
+    send_to_community(&creator, &community, delete, context).await?;
     Ok(())
   }
 
@@ -313,14 +309,7 @@ impl ApubObjectType for Comment {
       .set_to(public())
       .set_many_ccs(vec![community.get_followers_url()?]);
 
-    send_activity_to_community(
-      &creator,
-      &community,
-      vec![community.get_shared_inbox_url()?],
-      undo,
-      context,
-    )
-    .await?;
+    send_to_community(&creator, &community, undo, context).await?;
     Ok(())
   }
 
@@ -343,14 +332,7 @@ impl ApubObjectType for Comment {
       .set_to(public())
       .set_many_ccs(vec![community.get_followers_url()?]);
 
-    send_activity_to_community(
-      &mod_,
-      &community,
-      vec![community.get_shared_inbox_url()?],
-      remove,
-      context,
-    )
-    .await?;
+    send_to_community(&mod_, &community, remove, context).await?;
     Ok(())
   }
 
@@ -382,14 +364,7 @@ impl ApubObjectType for Comment {
       .set_to(public())
       .set_many_ccs(vec![community.get_followers_url()?]);
 
-    send_activity_to_community(
-      &mod_,
-      &community,
-      vec![community.get_shared_inbox_url()?],
-      undo,
-      context,
-    )
-    .await?;
+    send_to_community(&mod_, &community, undo, context).await?;
     Ok(())
   }
 }
@@ -415,14 +390,7 @@ impl ApubLikeableType for Comment {
       .set_to(public())
       .set_many_ccs(vec![community.get_followers_url()?]);
 
-    send_activity_to_community(
-      &creator,
-      &community,
-      vec![community.get_shared_inbox_url()?],
-      like,
-      context,
-    )
-    .await?;
+    send_to_community(&creator, &community, like, context).await?;
     Ok(())
   }
 
@@ -445,14 +413,7 @@ impl ApubLikeableType for Comment {
       .set_to(public())
       .set_many_ccs(vec![community.get_followers_url()?]);
 
-    send_activity_to_community(
-      &creator,
-      &community,
-      vec![community.get_shared_inbox_url()?],
-      dislike,
-      context,
-    )
-    .await?;
+    send_to_community(&creator, &community, dislike, context).await?;
     Ok(())
   }
 
@@ -487,14 +448,7 @@ impl ApubLikeableType for Comment {
       .set_to(public())
       .set_many_ccs(vec![community.get_followers_url()?]);
 
-    send_activity_to_community(
-      &creator,
-      &community,
-      vec![community.get_shared_inbox_url()?],
-      undo,
-      context,
-    )
-    .await?;
+    send_to_community(&creator, &community, undo, context).await?;
     Ok(())
   }
 }
diff --git a/lemmy_apub/src/community.rs b/lemmy_apub/src/community.rs
index 715b765b..44f5e6e1 100644
--- a/lemmy_apub/src/community.rs
+++ b/lemmy_apub/src/community.rs
@@ -1,13 +1,13 @@
 use crate::{
-  activities::generate_activity_id,
-  activity_queue::send_activity,
+  activity_queue::{send_activity_single_dest, send_to_community_followers},
   check_actor_domain,
+  check_is_apub_id_valid,
   create_apub_response,
   create_apub_tombstone_response,
   create_tombstone,
   extensions::group_extensions::GroupExtension,
   fetcher::{get_or_fetch_and_upsert_actor, get_or_fetch_and_upsert_user},
-  insert_activity,
+  generate_activity_id,
   ActorType,
   FromApub,
   GroupExt,
@@ -167,9 +167,7 @@ impl ActorType for Community {
       .set_id(generate_activity_id(AcceptType::Accept)?)
       .set_to(to.clone());
 
-    insert_activity(self.creator_id, accept.clone(), true, context.pool()).await?;
-
-    send_activity(context.activity_queue(), accept, self, vec![to])?;
+    send_activity_single_dest(accept, self, to, context).await?;
     Ok(())
   }
 
@@ -183,14 +181,7 @@ impl ActorType for Community {
       .set_to(public())
       .set_many_ccs(vec![self.get_followers_url()?]);
 
-    insert_activity(self.creator_id, delete.clone(), true, context.pool()).await?;
-
-    let inboxes = self.get_follower_inboxes(context.pool()).await?;
-
-    // Note: For an accept, since it was automatic, no one pushed a button,
-    // the community was the actor.
-    // But for delete, the creator is the actor, and does the signing
-    send_activity(context.activity_queue(), delete, creator, inboxes)?;
+    send_to_community_followers(delete, self, context, None).await?;
     Ok(())
   }
 
@@ -215,14 +206,7 @@ impl ActorType for Community {
       .set_to(public())
       .set_many_ccs(vec![self.get_followers_url()?]);
 
-    insert_activity(self.creator_id, undo.clone(), true, context.pool()).await?;
-
-    let inboxes = self.get_follower_inboxes(context.pool()).await?;
-
-    // Note: For an accept, since it was automatic, no one pushed a button,
-    // the community was the actor.
-    // But for delete, the creator is the actor, and does the signing
-    send_activity(context.activity_queue(), undo, creator, inboxes)?;
+    send_to_community_followers(undo, self, context, None).await?;
     Ok(())
   }
 
@@ -236,14 +220,7 @@ impl ActorType for Community {
       .set_to(public())
       .set_many_ccs(vec![self.get_followers_url()?]);
 
-    insert_activity(mod_.id, remove.clone(), true, context.pool()).await?;
-
-    let inboxes = self.get_follower_inboxes(context.pool()).await?;
-
-    // Note: For an accept, since it was automatic, no one pushed a button,
-    // the community was the actor.
-    // But for delete, the creator is the actor, and does the signing
-    send_activity(context.activity_queue(), remove, mod_, inboxes)?;
+    send_to_community_followers(remove, self, context, None).await?;
     Ok(())
   }
 
@@ -265,14 +242,7 @@ impl ActorType for Community {
       .set_to(public())
       .set_many_ccs(vec![self.get_followers_url()?]);
 
-    insert_activity(mod_.id, undo.clone(), true, context.pool()).await?;
-
-    let inboxes = self.get_follower_inboxes(context.pool()).await?;
-
-    // Note: For an accept, since it was automatic, no one pushed a button,
-    // the community was the actor.
-    // But for remove , the creator is the actor, and does the signing
-    send_activity(context.activity_queue(), undo, mod_, inboxes)?;
+    send_to_community_followers(undo, self, context, None).await?;
     Ok(())
   }
 
@@ -305,6 +275,8 @@ impl ActorType for Community {
         ))?)
       })
       .filter_map(Result::ok)
+      // Don't send to blocked instances
+      .filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
       .unique()
       .collect();
 
@@ -513,19 +485,13 @@ pub async fn do_announce(
     .set_to(public())
     .set_many_ccs(vec![community.get_followers_url()?]);
 
-  insert_activity(community.creator_id, announce.clone(), true, context.pool()).await?;
-
-  let mut to: Vec<Url> = community.get_follower_inboxes(context.pool()).await?;
-
-  // dont send to the local instance, nor to the instance where the activity originally came from,
-  // because that would result in a database error (same data inserted twice)
-  // this seems to be the "easiest" stable alternative for remove_item()
-  let sender_shared_inbox = sender.get_shared_inbox_url()?;
-  to.retain(|x| x != &sender_shared_inbox);
-  let community_shared_inbox = community.get_shared_inbox_url()?;
-  to.retain(|x| x != &community_shared_inbox);
-
-  send_activity(context.activity_queue(), announce, community, to)?;
+  send_to_community_followers(
+    announce,
+    community,
+    context,
+    Some(sender.get_shared_inbox_url()?),
+  )
+  .await?;
 
   Ok(())
 }
diff --git a/lemmy_apub/src/lib.rs b/lemmy_apub/src/lib.rs
index 3f37c5d3..1f6e75e4 100644
--- a/lemmy_apub/src/lib.rs
+++ b/lemmy_apub/src/lib.rs
@@ -1,7 +1,6 @@
 #[macro_use]
 extern crate lazy_static;
 
-pub mod activities;
 pub mod activity_queue;
 pub mod comment;
 pub mod community;
@@ -43,6 +42,7 @@ use log::debug;
 use reqwest::Client;
 use serde::Serialize;
 use url::{ParseError, Url};
+use uuid::Uuid;
 
 type GroupExt = Ext2<ApActor<Group>, GroupExtension, PublicKeyExtension>;
 type PersonExt = Ext1<ApActor<Person>, PublicKeyExtension>;
@@ -360,3 +360,16 @@ where
   .await??;
   Ok(())
 }
+
+pub(in crate) fn generate_activity_id<T>(kind: T) -> Result<Url, ParseError>
+where
+  T: ToString,
+{
+  let id = format!(
+    "{}/activities/{}/{}",
+    Settings::get().get_protocol_and_hostname(),
+    kind.to_string().to_lowercase(),
+    Uuid::new_v4()
+  );
+  Url::parse(&id)
+}
diff --git a/lemmy_apub/src/post.rs b/lemmy_apub/src/post.rs
index 8f5ffbcb..2d615ea1 100644
--- a/lemmy_apub/src/post.rs
+++ b/lemmy_apub/src/post.rs
@@ -1,11 +1,12 @@
 use crate::{
-  activities::{generate_activity_id, send_activity_to_community},
+  activity_queue::send_to_community,
   check_actor_domain,
   create_apub_response,
   create_apub_tombstone_response,
   create_tombstone,
   extensions::page_extension::PageExtension,
   fetcher::{get_or_fetch_and_upsert_community, get_or_fetch_and_upsert_user},
+  generate_activity_id,
   ActorType,
   ApubLikeableType,
   ApubObjectType,
@@ -257,14 +258,7 @@ impl ApubObjectType for Post {
       .set_to(public())
       .set_many_ccs(vec![community.get_followers_url()?]);
 
-    send_activity_to_community(
-      creator,
-      &community,
-      vec![community.get_shared_inbox_url()?],
-      create,
-      context,
-    )
-    .await?;
+    send_to_community(creator, &community, create, context).await?;
     Ok(())
   }
 
@@ -285,14 +279,7 @@ impl ApubObjectType for Post {
       .set_to(public())
       .set_many_ccs(vec![community.get_followers_url()?]);
 
-    send_activity_to_community(
-      creator,
-      &community,
-      vec![community.get_shared_inbox_url()?],
-      update,
-      context,
-    )
-    .await?;
+    send_to_community(creator, &community, update, context).await?;
     Ok(())
   }
 
@@ -312,14 +299,7 @@ impl ApubObjectType for Post {
       .set_to(public())
       .set_many_ccs(vec![community.get_followers_url()?]);
 
-    send_activity_to_community(
-      creator,
-      &community,
-      vec![community.get_shared_inbox_url()?],
-      delete,
-      context,
-    )
-    .await?;
+    send_to_community(creator, &community, delete, context).await?;
     Ok(())
   }
 
@@ -351,14 +331,7 @@ impl ApubObjectType for Post {
       .set_to(public())
       .set_many_ccs(vec![community.get_followers_url()?]);
 
-    send_activity_to_community(
-      creator,
-      &community,
-      vec![community.get_shared_inbox_url()?],
-      undo,
-      context,
-    )
-    .await?;
+    send_to_community(creator, &community, undo, context).await?;
     Ok(())
   }
 
@@ -378,14 +351,7 @@ impl ApubObjectType for Post {
       .set_to(public())
       .set_many_ccs(vec![community.get_followers_url()?]);
 
-    send_activity_to_community(
-      mod_,
-      &community,
-      vec![community.get_shared_inbox_url()?],
-      remove,
-      context,
-    )
-    .await?;
+    send_to_community(mod_, &community, remove, context).await?;
     Ok(())
   }
 
@@ -413,14 +379,7 @@ impl ApubObjectType for Post {
       .set_to(public())
       .set_many_ccs(vec![community.get_followers_url()?]);
 
-    send_activity_to_community(
-      mod_,
-      &community,
-      vec![community.get_shared_inbox_url()?],
-      undo,
-      context,
-    )
-    .await?;
+    send_to_community(mod_, &community, undo, context).await?;
     Ok(())
   }
 }
@@ -443,14 +402,7 @@ impl ApubLikeableType for Post {
       .set_to(public())
       .set_many_ccs(vec![community.get_followers_url()?]);
 
-    send_activity_to_community(
-      &creator,
-      &community,
-      vec![community.get_shared_inbox_url()?],
-      like,
-      context,
-    )
-    .await?;
+    send_to_community(&creator, &community, like, context).await?;
     Ok(())
   }
 
@@ -470,14 +422,7 @@ impl ApubLikeableType for Post {
       .set_to(public())
       .set_many_ccs(vec![community.get_followers_url()?]);
 
-    send_activity_to_community(
-      &creator,
-      &community,
-      vec![community.get_shared_inbox_url()?],
-      dislike,
-      context,
-    )
-    .await?;
+    send_to_community(&creator, &community, dislike, context).await?;
     Ok(())
   }
 
@@ -509,14 +454,7 @@ impl ApubLikeableType for Post {
       .set_to(public())
       .set_many_ccs(vec![community.get_followers_url()?]);
 
-    send_activity_to_community(
-      &creator,
-      &community,
-      vec![community.get_shared_inbox_url()?],
-      undo,
-      context,
-    )
-    .await?;
+    send_to_community(&creator, &community, undo, context).await?;
     Ok(())
   }
 }
diff --git a/lemmy_apub/src/private_message.rs b/lemmy_apub/src/private_message.rs
index d61a7771..fd8e6c6b 100644
--- a/lemmy_apub/src/private_message.rs
+++ b/lemmy_apub/src/private_message.rs
@@ -1,11 +1,10 @@
 use crate::{
-  activities::generate_activity_id,
-  activity_queue::send_activity,
+  activity_queue::send_activity_single_dest,
   check_actor_domain,
   check_is_apub_id_valid,
   create_tombstone,
   fetcher::get_or_fetch_and_upsert_user,
-  insert_activity,
+  generate_activity_id,
   ActorType,
   ApubObjectType,
   FromApub,
@@ -130,9 +129,7 @@ impl ApubObjectType for PrivateMessage {
       .set_id(generate_activity_id(CreateType::Create)?)
       .set_to(to.clone());
 
-    insert_activity(creator.id, create.clone(), true, context.pool()).await?;
-
-    send_activity(context.activity_queue(), create, creator, vec![to])?;
+    send_activity_single_dest(create, creator, to, context).await?;
     Ok(())
   }
 
@@ -150,9 +147,7 @@ impl ApubObjectType for PrivateMessage {
       .set_id(generate_activity_id(UpdateType::Update)?)
       .set_to(to.clone());
 
-    insert_activity(creator.id, update.clone(), true, context.pool()).await?;
-
-    send_activity(context.activity_queue(), update, creator, vec![to])?;
+    send_activity_single_dest(update, creator, to, context).await?;
     Ok(())
   }
 
@@ -169,9 +164,7 @@ impl ApubObjectType for PrivateMessage {
       .set_id(generate_activity_id(DeleteType::Delete)?)
       .set_to(to.clone());
 
-    insert_activity(creator.id, delete.clone(), true, context.pool()).await?;
-
-    send_activity(context.activity_queue(), delete, creator, vec![to])?;
+    send_activity_single_dest(delete, creator, to, context).await?;
     Ok(())
   }
 
@@ -199,9 +192,7 @@ impl ApubObjectType for PrivateMessage {
       .set_id(generate_activity_id(UndoType::Undo)?)
       .set_to(to.clone());
 
-    insert_activity(creator.id, undo.clone(), true, context.pool()).await?;
-
-    send_activity(context.activity_queue(), undo, creator, vec![to])?;
+    send_activity_single_dest(undo, creator, to, context).await?;
     Ok(())
   }
 
diff --git a/lemmy_apub/src/user.rs b/lemmy_apub/src/user.rs
index 60af834c..3f6e6971 100644
--- a/lemmy_apub/src/user.rs
+++ b/lemmy_apub/src/user.rs
@@ -1,10 +1,9 @@
 use crate::{
-  activities::generate_activity_id,
-  activity_queue::send_activity,
+  activity_queue::send_activity_single_dest,
   check_actor_domain,
   create_apub_response,
   fetcher::get_or_fetch_and_upsert_actor,
-  insert_activity,
+  generate_activity_id,
   ActorType,
   FromApub,
   PersonExt,
@@ -126,9 +125,7 @@ impl ActorType for User_ {
     let follow_actor = get_or_fetch_and_upsert_actor(follow_actor_id, context).await?;
     let to = follow_actor.get_inbox_url()?;
 
-    insert_activity(self.id, follow.clone(), true, context.pool()).await?;
-
-    send_activity(context.activity_queue(), follow, self, vec![to])?;
+    send_activity_single_dest(follow, self, to, context).await?;
     Ok(())
   }
 
@@ -151,9 +148,7 @@ impl ActorType for User_ {
       .set_context(activitystreams::context())
       .set_id(generate_activity_id(UndoType::Undo)?);
 
-    insert_activity(self.id, undo.clone(), true, context.pool()).await?;
-
-    send_activity(context.activity_queue(), undo, self, vec![to])?;
+    send_activity_single_dest(undo, self, to, context).await?;
     Ok(())
   }