Remove SendActivity and Perform traits, rely on channel (#3596)
authorNutomic <me@nutomic.com>
Wed, 19 Jul 2023 13:49:41 +0000 (15:49 +0200)
committerGitHub <noreply@github.com>
Wed, 19 Jul 2023 13:49:41 +0000 (09:49 -0400)
* Remove SendActivity and Perform traits, rely on channel

These traits arent necessary anymore now that websocket is removed.
Removing them allows us to use normal actix http handler methods
which are much more flexible, and allow using different middlewares
as well as setting response attributes.

* compiling and create post federating

* clippy

* rename methods, join outgoing activities task

* fix api tests

* no unwrap

* conditional compile

* add back getrandom

* make crates optional

* fmt

12 files changed:
Cargo.lock
crates/api_common/Cargo.toml
crates/api_common/src/build_response.rs
crates/api_common/src/lib.rs
crates/api_common/src/send_activity.rs [new file with mode: 0644]
crates/api_crud/src/lib.rs
crates/api_crud/src/post/create.rs
crates/api_crud/src/post/mod.rs
crates/apub/src/activities/create_or_update/post.rs
crates/apub/src/activities/mod.rs
src/api_routes_http.rs
src/lib.rs

index 407cb9aae2a99a86061b4936598c04aedc664a88..b55aeb6b33a26cf87a442a781030a2c360ef9d6b 100644 (file)
@@ -2633,6 +2633,7 @@ dependencies = [
 name = "lemmy_api_common"
 version = "0.18.1"
 dependencies = [
+ "activitypub_federation",
  "actix-web",
  "anyhow",
  "chrono",
@@ -2644,6 +2645,7 @@ dependencies = [
  "lemmy_db_views_actor",
  "lemmy_db_views_moderator",
  "lemmy_utils",
+ "once_cell",
  "percent-encoding",
  "regex",
  "reqwest",
index bef5ab285ff1b32ae4f5d216f4b31a70bd99a203..8a23a4cb2475425b59c54ad26e2b53b8c8ea658a 100644 (file)
@@ -22,6 +22,7 @@ full = [
   "lemmy_db_views/full",
   "lemmy_db_views_actor/full",
   "lemmy_db_views_moderator/full",
+  "activitypub_federation",
   "percent-encoding",
   "encoding",
   "reqwest-middleware",
@@ -32,6 +33,7 @@ full = [
   "reqwest",
   "actix-web",
   "futures",
+  "once_cell",
 ]
 
 [dependencies]
@@ -40,6 +42,7 @@ lemmy_db_views_moderator = { workspace = true }
 lemmy_db_views_actor = { workspace = true }
 lemmy_db_schema = { workspace = true }
 lemmy_utils = { workspace = true, optional = true }
+activitypub_federation = { workspace = true, optional = true }
 serde = { workspace = true }
 serde_with = { workspace = true }
 url = { workspace = true }
@@ -59,5 +62,7 @@ uuid = { workspace = true, optional = true }
 tokio = { workspace = true, optional = true }
 reqwest = { workspace = true, optional = true }
 ts-rs = { workspace = true, optional = true }
+once_cell = { workspace = true, optional = true }
 actix-web = { workspace = true, optional = true }
+# necessary for wasmt compilation
 getrandom = { version = "0.2.10", features = ["js"] }
index acb7355bdea64c81c78f7c4a161dca42dd810bc7..a61da7f0b7627a2ce3fcc6b5e547aae78d2cc9f9 100644 (file)
@@ -64,7 +64,7 @@ pub async fn build_community_response(
 }
 
 pub async fn build_post_response(
-  context: &Data<LemmyContext>,
+  context: &LemmyContext,
   community_id: CommunityId,
   person_id: PersonId,
   post_id: PostId,
index 224e114a55161d335cb9ce955139b6ceca042226..652cbaf43a40141d39d510fb163faa13dca3ec4f 100644 (file)
@@ -10,6 +10,8 @@ pub mod post;
 pub mod private_message;
 #[cfg(feature = "full")]
 pub mod request;
+#[cfg(feature = "full")]
+pub mod send_activity;
 pub mod sensitive;
 pub mod site;
 #[cfg(feature = "full")]
diff --git a/crates/api_common/src/send_activity.rs b/crates/api_common/src/send_activity.rs
new file mode 100644 (file)
index 0000000..a2bc9a6
--- /dev/null
@@ -0,0 +1,58 @@
+use crate::context::LemmyContext;
+use activitypub_federation::config::Data;
+use futures::future::BoxFuture;
+use lemmy_db_schema::source::post::Post;
+use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION};
+use once_cell::sync::{Lazy, OnceCell};
+use tokio::sync::{
+  mpsc,
+  mpsc::{UnboundedReceiver, UnboundedSender},
+  Mutex,
+};
+
+type MatchOutgoingActivitiesBoxed =
+  Box<for<'a> fn(SendActivityData, &'a Data<LemmyContext>) -> BoxFuture<'a, LemmyResult<()>>>;
+
+/// This static is necessary so that activities can be sent out synchronously for tests.
+pub static MATCH_OUTGOING_ACTIVITIES: OnceCell<MatchOutgoingActivitiesBoxed> = OnceCell::new();
+
+#[derive(Debug)]
+pub enum SendActivityData {
+  CreatePost(Post),
+}
+
+static ACTIVITY_CHANNEL: Lazy<ActivityChannel> = Lazy::new(|| {
+  let (sender, receiver) = mpsc::unbounded_channel();
+  ActivityChannel {
+    sender,
+    receiver: Mutex::new(receiver),
+  }
+});
+
+pub struct ActivityChannel {
+  sender: UnboundedSender<SendActivityData>,
+  receiver: Mutex<UnboundedReceiver<SendActivityData>>,
+}
+
+impl ActivityChannel {
+  pub async fn retrieve_activity() -> Option<SendActivityData> {
+    let mut lock = ACTIVITY_CHANNEL.receiver.lock().await;
+    lock.recv().await
+  }
+
+  pub async fn submit_activity(
+    data: SendActivityData,
+    context: &Data<LemmyContext>,
+  ) -> LemmyResult<()> {
+    if *SYNCHRONOUS_FEDERATION {
+      MATCH_OUTGOING_ACTIVITIES
+        .get()
+        .expect("retrieve function pointer")(data, context)
+      .await?;
+    } else {
+      let lock = &ACTIVITY_CHANNEL.sender;
+      lock.send(data)?;
+    }
+    Ok(())
+  }
+}
index b9449ca69df40d69508a810771dcf266fe98f6f4..e793428650f4d20a1173b0935a05c47ea2c5fc31 100644 (file)
@@ -5,7 +5,7 @@ use lemmy_utils::error::LemmyError;
 mod comment;
 mod community;
 mod custom_emoji;
-mod post;
+pub mod post;
 mod private_message;
 mod site;
 mod user;
index a7aafe81249e0b9f2a25799eaf82cc01ce11311a..16a6f0004389cebb54c8a4676de9abe72014b13e 100644 (file)
@@ -1,10 +1,11 @@
-use crate::PerformCrud;
-use actix_web::web::Data;
+use activitypub_federation::config::Data;
+use actix_web::web::Json;
 use lemmy_api_common::{
   build_response::build_post_response,
   context::LemmyContext,
   post::{CreatePost, PostResponse},
   request::fetch_site_data,
+  send_activity::{ActivityChannel, SendActivityData},
   utils::{
     check_community_ban,
     check_community_deleted_or_removed,
@@ -40,147 +41,145 @@ use tracing::Instrument;
 use url::Url;
 use webmention::{Webmention, WebmentionError};
 
-#[async_trait::async_trait(?Send)]
-impl PerformCrud for CreatePost {
-  type Response = PostResponse;
-
-  #[tracing::instrument(skip(context))]
-  async fn perform(&self, context: &Data<LemmyContext>) -> Result<PostResponse, LemmyError> {
-    let data: &CreatePost = self;
-    let local_user_view = local_user_view_from_jwt(&data.auth, context).await?;
-    let local_site = LocalSite::read(&mut context.pool()).await?;
-
-    let slur_regex = local_site_to_slur_regex(&local_site);
-    check_slurs(&data.name, &slur_regex)?;
-    check_slurs_opt(&data.body, &slur_regex)?;
-    honeypot_check(&data.honeypot)?;
-
-    let data_url = data.url.as_ref();
-    let url = data_url.map(clean_url_params).map(Into::into); // TODO no good way to handle a "clear"
-
-    is_valid_post_title(&data.name)?;
-    is_valid_body_field(&data.body, true)?;
-    check_url_scheme(&data.url)?;
-
-    check_community_ban(
-      local_user_view.person.id,
-      data.community_id,
+#[tracing::instrument(skip(context))]
+pub async fn create_post(
+  data: Json<CreatePost>,
+  context: Data<LemmyContext>,
+) -> Result<Json<PostResponse>, LemmyError> {
+  let local_user_view = local_user_view_from_jwt(&data.auth, &context).await?;
+  let local_site = LocalSite::read(&mut context.pool()).await?;
+
+  let slur_regex = local_site_to_slur_regex(&local_site);
+  check_slurs(&data.name, &slur_regex)?;
+  check_slurs_opt(&data.body, &slur_regex)?;
+  honeypot_check(&data.honeypot)?;
+
+  let data_url = data.url.as_ref();
+  let url = data_url.map(clean_url_params).map(Into::into); // TODO no good way to handle a "clear"
+
+  is_valid_post_title(&data.name)?;
+  is_valid_body_field(&data.body, true)?;
+  check_url_scheme(&data.url)?;
+
+  check_community_ban(
+    local_user_view.person.id,
+    data.community_id,
+    &mut context.pool(),
+  )
+  .await?;
+  check_community_deleted_or_removed(data.community_id, &mut context.pool()).await?;
+
+  let community_id = data.community_id;
+  let community = Community::read(&mut context.pool(), community_id).await?;
+  if community.posting_restricted_to_mods {
+    let community_id = data.community_id;
+    let is_mod = CommunityView::is_mod_or_admin(
       &mut context.pool(),
+      local_user_view.local_user.person_id,
+      community_id,
     )
     .await?;
-    check_community_deleted_or_removed(data.community_id, &mut context.pool()).await?;
+    if !is_mod {
+      return Err(LemmyErrorType::OnlyModsCanPostInCommunity)?;
+    }
+  }
 
-    let community_id = data.community_id;
-    let community = Community::read(&mut context.pool(), community_id).await?;
-    if community.posting_restricted_to_mods {
-      let community_id = data.community_id;
-      let is_mod = CommunityView::is_mod_or_admin(
+  // Fetch post links and pictrs cached image
+  let (metadata_res, thumbnail_url) =
+    fetch_site_data(context.client(), context.settings(), data_url, true).await;
+  let (embed_title, embed_description, embed_video_url) = metadata_res
+    .map(|u| (u.title, u.description, u.embed_video_url))
+    .unwrap_or_default();
+
+  let language_id = match data.language_id {
+    Some(lid) => Some(lid),
+    None => {
+      default_post_language(
         &mut context.pool(),
-        local_user_view.local_user.person_id,
         community_id,
+        local_user_view.local_user.id,
       )
-      .await?;
-      if !is_mod {
-        return Err(LemmyErrorType::OnlyModsCanPostInCommunity)?;
-      }
+      .await?
     }
-
-    // Fetch post links and pictrs cached image
-    let (metadata_res, thumbnail_url) =
-      fetch_site_data(context.client(), context.settings(), data_url, true).await;
-    let (embed_title, embed_description, embed_video_url) = metadata_res
-      .map(|u| (u.title, u.description, u.embed_video_url))
-      .unwrap_or_default();
-
-    let language_id = match data.language_id {
-      Some(lid) => Some(lid),
-      None => {
-        default_post_language(
-          &mut context.pool(),
-          community_id,
-          local_user_view.local_user.id,
-        )
-        .await?
-      }
-    };
-    CommunityLanguage::is_allowed_community_language(
-      &mut context.pool(),
-      language_id,
-      community_id,
-    )
+  };
+  CommunityLanguage::is_allowed_community_language(&mut context.pool(), language_id, community_id)
     .await?;
 
-    let post_form = PostInsertForm::builder()
-      .name(data.name.trim().to_owned())
-      .url(url)
-      .body(data.body.clone())
-      .community_id(data.community_id)
-      .creator_id(local_user_view.person.id)
-      .nsfw(data.nsfw)
-      .embed_title(embed_title)
-      .embed_description(embed_description)
-      .embed_video_url(embed_video_url)
-      .language_id(language_id)
-      .thumbnail_url(thumbnail_url)
-      .build();
-
-    let inserted_post = Post::create(&mut context.pool(), &post_form)
-      .await
-      .with_lemmy_type(LemmyErrorType::CouldntCreatePost)?;
-
-    let inserted_post_id = inserted_post.id;
-    let protocol_and_hostname = context.settings().get_protocol_and_hostname();
-    let apub_id = generate_local_apub_endpoint(
-      EndpointType::Post,
-      &inserted_post_id.to_string(),
-      &protocol_and_hostname,
-    )?;
-    let updated_post = Post::update(
-      &mut context.pool(),
-      inserted_post_id,
-      &PostUpdateForm::builder().ap_id(Some(apub_id)).build(),
-    )
+  let post_form = PostInsertForm::builder()
+    .name(data.name.trim().to_owned())
+    .url(url)
+    .body(data.body.clone())
+    .community_id(data.community_id)
+    .creator_id(local_user_view.person.id)
+    .nsfw(data.nsfw)
+    .embed_title(embed_title)
+    .embed_description(embed_description)
+    .embed_video_url(embed_video_url)
+    .language_id(language_id)
+    .thumbnail_url(thumbnail_url)
+    .build();
+
+  let inserted_post = Post::create(&mut context.pool(), &post_form)
     .await
     .with_lemmy_type(LemmyErrorType::CouldntCreatePost)?;
 
-    // They like their own post by default
-    let person_id = local_user_view.person.id;
-    let post_id = inserted_post.id;
-    let like_form = PostLikeForm {
-      post_id,
-      person_id,
-      score: 1,
-    };
+  let inserted_post_id = inserted_post.id;
+  let protocol_and_hostname = context.settings().get_protocol_and_hostname();
+  let apub_id = generate_local_apub_endpoint(
+    EndpointType::Post,
+    &inserted_post_id.to_string(),
+    &protocol_and_hostname,
+  )?;
+  let updated_post = Post::update(
+    &mut context.pool(),
+    inserted_post_id,
+    &PostUpdateForm::builder().ap_id(Some(apub_id)).build(),
+  )
+  .await
+  .with_lemmy_type(LemmyErrorType::CouldntCreatePost)?;
+
+  // They like their own post by default
+  let person_id = local_user_view.person.id;
+  let post_id = inserted_post.id;
+  let like_form = PostLikeForm {
+    post_id,
+    person_id,
+    score: 1,
+  };
+
+  PostLike::like(&mut context.pool(), &like_form)
+    .await
+    .with_lemmy_type(LemmyErrorType::CouldntLikePost)?;
+
+  ActivityChannel::submit_activity(SendActivityData::CreatePost(updated_post.clone()), &context)
+    .await?;
 
-    PostLike::like(&mut context.pool(), &like_form)
-      .await
-      .with_lemmy_type(LemmyErrorType::CouldntLikePost)?;
-
-    // Mark the post as read
-    mark_post_as_read(person_id, post_id, &mut context.pool()).await?;
-
-    if let Some(url) = updated_post.url.clone() {
-      let task = async move {
-        let mut webmention =
-          Webmention::new::<Url>(updated_post.ap_id.clone().into(), url.clone().into())?;
-        webmention.set_checked(true);
-        match webmention
-          .send()
-          .instrument(tracing::info_span!("Sending webmention"))
-          .await
-        {
-          Err(WebmentionError::NoEndpointDiscovered(_)) => Ok(()),
-          Ok(_) => Ok(()),
-          Err(e) => Err(e).with_lemmy_type(LemmyErrorType::CouldntSendWebmention),
-        }
-      };
-      if *SYNCHRONOUS_FEDERATION {
-        task.await?;
-      } else {
-        spawn_try_task(task);
+  // Mark the post as read
+  mark_post_as_read(person_id, post_id, &mut context.pool()).await?;
+
+  if let Some(url) = updated_post.url.clone() {
+    let task = async move {
+      let mut webmention =
+        Webmention::new::<Url>(updated_post.ap_id.clone().into(), url.clone().into())?;
+      webmention.set_checked(true);
+      match webmention
+        .send()
+        .instrument(tracing::info_span!("Sending webmention"))
+        .await
+      {
+        Err(WebmentionError::NoEndpointDiscovered(_)) => Ok(()),
+        Ok(_) => Ok(()),
+        Err(e) => Err(e).with_lemmy_type(LemmyErrorType::CouldntSendWebmention),
       }
     };
+    if *SYNCHRONOUS_FEDERATION {
+      task.await?;
+    } else {
+      spawn_try_task(task);
+    }
+  };
 
-    build_post_response(context, community_id, person_id, post_id).await
-  }
+  Ok(Json(
+    build_post_response(&context, community_id, person_id, post_id).await?,
+  ))
 }
index d3d789a02ab30217a8be7e11ee3dee5b707360cf..43795556146d20d9495163e2d03ae36dab9d3f10 100644 (file)
@@ -1,4 +1,4 @@
-mod create;
+pub mod create;
 mod delete;
 mod read;
 mod remove;
index 77199056d53f044c236f9d74aac1b569daeb1568..4767114f98b55edeb1eda6ab82743f85729e15d2 100644 (file)
@@ -24,7 +24,7 @@ use activitypub_federation::{
 };
 use lemmy_api_common::{
   context::LemmyContext,
-  post::{CreatePost, EditPost, PostResponse},
+  post::{EditPost, PostResponse},
 };
 use lemmy_db_schema::{
   aggregates::structs::PostAggregates,
@@ -39,25 +39,6 @@ use lemmy_db_schema::{
 use lemmy_utils::error::{LemmyError, LemmyErrorType};
 use url::Url;
 
-#[async_trait::async_trait]
-impl SendActivity for CreatePost {
-  type Response = PostResponse;
-
-  async fn send_activity(
-    _request: &Self,
-    response: &Self::Response,
-    context: &Data<LemmyContext>,
-  ) -> Result<(), LemmyError> {
-    CreateOrUpdatePage::send(
-      &response.post_view.post,
-      response.post_view.creator.id,
-      CreateOrUpdateType::Create,
-      context,
-    )
-    .await
-  }
-}
-
 #[async_trait::async_trait]
 impl SendActivity for EditPost {
   type Response = PostResponse;
@@ -68,10 +49,10 @@ impl SendActivity for EditPost {
     context: &Data<LemmyContext>,
   ) -> Result<(), LemmyError> {
     CreateOrUpdatePage::send(
-      &response.post_view.post,
+      response.post_view.post.clone(),
       response.post_view.creator.id,
       CreateOrUpdateType::Update,
-      context,
+      context.reset_request_count(),
     )
     .await
   }
@@ -102,12 +83,12 @@ impl CreateOrUpdatePage {
 
   #[tracing::instrument(skip_all)]
   pub(crate) async fn send(
-    post: &Post,
+    post: Post,
     person_id: PersonId,
     kind: CreateOrUpdateType,
-    context: &Data<LemmyContext>,
+    context: Data<LemmyContext>,
   ) -> Result<(), LemmyError> {
-    let post = ApubPost(post.clone());
+    let post = ApubPost(post);
     let community_id = post.community_id;
     let person: ApubPerson = Person::read(&mut context.pool(), person_id).await?.into();
     let community: ApubCommunity = Community::read(&mut context.pool(), community_id)
@@ -115,8 +96,8 @@ impl CreateOrUpdatePage {
       .into();
 
     let create_or_update =
-      CreateOrUpdatePage::new(post, &person, &community, kind, context).await?;
-    let is_mod_action = create_or_update.object.is_mod_action(context).await?;
+      CreateOrUpdatePage::new(post, &person, &community, kind, &context).await?;
+    let is_mod_action = create_or_update.object.is_mod_action(&context).await?;
     let activity = AnnouncableActivities::CreateOrUpdatePost(create_or_update);
     send_activity_in_community(
       activity,
@@ -124,7 +105,7 @@ impl CreateOrUpdatePage {
       &community,
       vec![],
       is_mod_action,
-      context,
+      &context,
     )
     .await?;
     Ok(())
index 4fd8da536f20f7e01a39cd545c5e6165e7ea1a4a..02ad0b6b19e32f8bfbc99fa079842fd2d685301a 100644 (file)
@@ -1,5 +1,6 @@
 use crate::{
   objects::{community::ApubCommunity, person::ApubPerson},
+  protocol::activities::{create_or_update::page::CreateOrUpdatePage, CreateOrUpdateType},
   CONTEXT,
 };
 use activitypub_federation::{
@@ -11,7 +12,10 @@ use activitypub_federation::{
   traits::{ActivityHandler, Actor},
 };
 use anyhow::anyhow;
-use lemmy_api_common::context::LemmyContext;
+use lemmy_api_common::{
+  context::LemmyContext,
+  send_activity::{ActivityChannel, SendActivityData},
+};
 use lemmy_db_schema::{
   newtypes::CommunityId,
   source::{
@@ -21,7 +25,11 @@ use lemmy_db_schema::{
   },
 };
 use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView};
-use lemmy_utils::error::{LemmyError, LemmyErrorExt, LemmyErrorType};
+use lemmy_utils::{
+  error::{LemmyError, LemmyErrorExt, LemmyErrorType, LemmyResult},
+  spawn_try_task,
+  SYNCHRONOUS_FEDERATION,
+};
 use moka::future::Cache;
 use once_cell::sync::Lazy;
 use serde::Serialize;
@@ -197,3 +205,33 @@ where
 
   Ok(())
 }
+
+pub async fn handle_outgoing_activities(context: Data<LemmyContext>) -> LemmyResult<()> {
+  while let Some(data) = ActivityChannel::retrieve_activity().await {
+    match_outgoing_activities(data, &context.reset_request_count()).await?
+  }
+  Ok(())
+}
+
+pub async fn match_outgoing_activities(
+  data: SendActivityData,
+  context: &Data<LemmyContext>,
+) -> LemmyResult<()> {
+  let fed_task = match data {
+    SendActivityData::CreatePost(post) => {
+      let creator_id = post.creator_id;
+      CreateOrUpdatePage::send(
+        post,
+        creator_id,
+        CreateOrUpdateType::Create,
+        context.reset_request_count(),
+      )
+    }
+  };
+  if *SYNCHRONOUS_FEDERATION {
+    fed_task.await?;
+  } else {
+    spawn_try_task(fed_task);
+  }
+  Ok(())
+}
index cb735f807c73903b1b72ecdc5e2ed0fee52ef9f1..bc4340e3cccf5baf61d8fc18fc506ca12791436a 100644 (file)
@@ -52,7 +52,6 @@ use lemmy_api_common::{
     VerifyEmail,
   },
   post::{
-    CreatePost,
     CreatePostLike,
     CreatePostReport,
     DeletePost,
@@ -93,7 +92,7 @@ use lemmy_api_common::{
     PurgePost,
   },
 };
-use lemmy_api_crud::PerformCrud;
+use lemmy_api_crud::{post::create::create_post, PerformCrud};
 use lemmy_apub::{
   api::{
     list_comments::list_comments,
@@ -175,7 +174,7 @@ pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) {
         web::resource("/post")
           .guard(guard::Post())
           .wrap(rate_limit.post())
-          .route(web::post().to(route_post_crud::<CreatePost>)),
+          .route(web::post().to(create_post)),
       )
       .service(
         web::scope("/post")
index 55bb91606341cdbfb8c4747cea93a884108c97fc..e07ae2685d478736c756b320de429bd2f1914e5c 100644 (file)
@@ -21,12 +21,17 @@ use lemmy_api_common::{
   context::LemmyContext,
   lemmy_db_views::structs::SiteView,
   request::build_user_agent,
+  send_activity::MATCH_OUTGOING_ACTIVITIES,
   utils::{
     check_private_instance_and_federation_enabled,
     local_site_rate_limit_to_rate_limit_config,
   },
 };
-use lemmy_apub::{VerifyUrlData, FEDERATION_HTTP_FETCH_LIMIT};
+use lemmy_apub::{
+  activities::{handle_outgoing_activities, match_outgoing_activities},
+  VerifyUrlData,
+  FEDERATION_HTTP_FETCH_LIMIT,
+};
 use lemmy_db_schema::{
   source::secret::Secret,
   utils::{build_db_pool, get_database_url, run_migrations},
@@ -165,9 +170,17 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> {
     .build()
     .expect("Should always be buildable");
 
+  MATCH_OUTGOING_ACTIVITIES
+    .set(Box::new(move |d, c| {
+      Box::pin(match_outgoing_activities(d, c))
+    }))
+    .expect("set function pointer");
+  let request_data = federation_config.to_request_data();
+  let outgoing_activities_task = tokio::task::spawn(handle_outgoing_activities(request_data));
+
   // Create Http server with websocket support
   HttpServer::new(move || {
-    let cors_origin = std::env::var("LEMMY_CORS_ORIGIN");
+    let cors_origin = env::var("LEMMY_CORS_ORIGIN");
     let cors_config = match (cors_origin, cfg!(debug_assertions)) {
       (Ok(origin), false) => Cors::default()
         .allowed_origin(&origin)
@@ -213,6 +226,9 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> {
   .run()
   .await?;
 
+  // Wait for outgoing apub sends to complete
+  outgoing_activities_task.await??;
+
   Ok(())
 }