From: phiresky <phireskyde+git@gmail.com>
Date: Mon, 10 Jul 2023 10:27:49 +0000 (+0200)
Subject: Improve api response times by doing send_activity asynchronously (#3493)
X-Git-Url: http://these/git/readmes/%7B%60%24%7BarchiveTodayUrl%7D/%22%7Burl%7D/static/git-favicon.png?a=commitdiff_plain;h=b35757b429e015fa5705ac07ea6993342f675761;p=lemmy.git

Improve api response times by doing send_activity asynchronously (#3493)

* do send_activity after http response

* move to util function

* format

* fix prometheus

* make synchronous federation configurable

* cargo fmt

* empty

* empty

---------

Co-authored-by: Dessalines <dessalines@users.noreply.github.com>
---

diff --git a/api_tests/run-federation-test.sh b/api_tests/run-federation-test.sh
index ed4bba41..abced2ad 100755
--- a/api_tests/run-federation-test.sh
+++ b/api_tests/run-federation-test.sh
@@ -2,7 +2,7 @@
 set -e
 
 export LEMMY_DATABASE_URL=postgres://lemmy:password@localhost:5432
-
+export LEMMY_SYNCHRONOUS_FEDERATION=1 # currently this is true in debug by default, but still.
 pushd ..
 cargo build
 rm target/lemmy_server || true
diff --git a/crates/api/src/lib.rs b/crates/api/src/lib.rs
index 7ac3cec7..e4e496ba 100644
--- a/crates/api/src/lib.rs
+++ b/crates/api/src/lib.rs
@@ -17,7 +17,7 @@ mod site;
 
 #[async_trait::async_trait(?Send)]
 pub trait Perform {
-  type Response: serde::ser::Serialize + Send;
+  type Response: serde::ser::Serialize + Send + Clone + Sync;
 
   async fn perform(&self, context: &Data<LemmyContext>) -> Result<Self::Response, LemmyError>;
 }
diff --git a/crates/api_common/src/custom_emoji.rs b/crates/api_common/src/custom_emoji.rs
index 550dd7a3..7f3461ca 100644
--- a/crates/api_common/src/custom_emoji.rs
+++ b/crates/api_common/src/custom_emoji.rs
@@ -43,7 +43,7 @@ pub struct DeleteCustomEmoji {
   pub auth: Sensitive<String>,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Serialize, Deserialize, Clone)]
 #[cfg_attr(feature = "full", derive(TS))]
 #[cfg_attr(feature = "full", ts(export))]
 /// The response for deleting a custom emoji.
diff --git a/crates/api_common/src/site.rs b/crates/api_common/src/site.rs
index 865acc0d..bc7687e3 100644
--- a/crates/api_common/src/site.rs
+++ b/crates/api_common/src/site.rs
@@ -395,7 +395,7 @@ pub struct PurgeComment {
   pub auth: Sensitive<String>,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Serialize, Deserialize, Clone)]
 #[cfg_attr(feature = "full", derive(TS))]
 #[cfg_attr(feature = "full", ts(export))]
 /// The response for purged items.
diff --git a/crates/api_crud/src/lib.rs b/crates/api_crud/src/lib.rs
index a10309fc..b9449ca6 100644
--- a/crates/api_crud/src/lib.rs
+++ b/crates/api_crud/src/lib.rs
@@ -12,7 +12,7 @@ mod user;
 
 #[async_trait::async_trait(?Send)]
 pub trait PerformCrud {
-  type Response: serde::ser::Serialize + Send;
+  type Response: serde::ser::Serialize + Send + Clone + Sync;
 
   async fn perform(&self, context: &Data<LemmyContext>) -> Result<Self::Response, LemmyError>;
 }
diff --git a/crates/api_crud/src/post/create.rs b/crates/api_crud/src/post/create.rs
index 13d896b4..4264c26d 100644
--- a/crates/api_crud/src/post/create.rs
+++ b/crates/api_crud/src/post/create.rs
@@ -29,12 +29,14 @@ use lemmy_db_schema::{
 use lemmy_db_views_actor::structs::CommunityView;
 use lemmy_utils::{
   error::LemmyError,
+  spawn_try_task,
   utils::{
     slurs::{check_slurs, check_slurs_opt},
     validation::{check_url_scheme, clean_url_params, is_valid_body_field, is_valid_post_title},
   },
+  SYNCHRONOUS_FEDERATION,
 };
-use tracing::{warn, Instrument};
+use tracing::Instrument;
 use url::Url;
 use webmention::{Webmention, WebmentionError};
 
@@ -143,20 +145,30 @@ impl PerformCrud for CreatePost {
     // Mark the post as read
     mark_post_as_read(person_id, post_id, context.pool()).await?;
 
-    if let Some(url) = &updated_post.url {
-      let mut webmention =
-        Webmention::new::<Url>(updated_post.ap_id.clone().into(), url.clone().into())?;
-      webmention.set_checked(true);
-      match webmention
-        .send()
-        .instrument(tracing::info_span!("Sending webmention"))
-        .await
-      {
-        Ok(_) => {}
-        Err(WebmentionError::NoEndpointDiscovered(_)) => {}
-        Err(e) => warn!("Failed to send webmention: {}", e),
+    if let Some(url) = updated_post.url.clone() {
+      let task = async move {
+        let mut webmention =
+          Webmention::new::<Url>(updated_post.ap_id.clone().into(), url.clone().into())?;
+        webmention.set_checked(true);
+        match webmention
+          .send()
+          .instrument(tracing::info_span!("Sending webmention"))
+          .await
+        {
+          Err(WebmentionError::NoEndpointDiscovered(_)) => Ok(()),
+          Ok(_) => Ok(()),
+          Err(e) => Err(LemmyError::from_error_message(
+            e,
+            "Couldn't send webmention",
+          )),
+        }
+      };
+      if *SYNCHRONOUS_FEDERATION {
+        task.await?;
+      } else {
+        spawn_try_task(task);
       }
-    }
+    };
 
     build_post_response(context, community_id, person_id, post_id).await
   }
diff --git a/crates/apub/src/lib.rs b/crates/apub/src/lib.rs
index 653a1c19..1c36f985 100644
--- a/crates/apub/src/lib.rs
+++ b/crates/apub/src/lib.rs
@@ -201,7 +201,7 @@ where
 
 #[async_trait::async_trait]
 pub trait SendActivity: Sync {
-  type Response: Sync + Send;
+  type Response: Sync + Send + Clone;
 
   async fn send_activity(
     _request: &Self,
diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs
index e5d07db2..2c71c58d 100644
--- a/crates/utils/src/lib.rs
+++ b/crates/utils/src/lib.rs
@@ -14,7 +14,11 @@ pub mod request;
 pub mod utils;
 pub mod version;
 
+use error::LemmyError;
+use futures::Future;
+use once_cell::sync::Lazy;
 use std::time::Duration;
+use tracing::Instrument;
 
 pub type ConnectionId = usize;
 
@@ -31,3 +35,27 @@ macro_rules! location_info {
     )
   };
 }
+
+/// if true, all federation should happen synchronously. useful for debugging and testing.
+/// defaults to true on debug mode, false on releasemode
+/// override to true by setting env LEMMY_SYNCHRONOUS_FEDERATION=1
+/// override to false by setting env LEMMY_SYNCHRONOUS_FEDERATION=""
+pub static SYNCHRONOUS_FEDERATION: Lazy<bool> = Lazy::new(|| {
+  std::env::var("LEMMY_SYNCHRONOUS_FEDERATION")
+    .map(|s| !s.is_empty())
+    .unwrap_or(cfg!(debug_assertions))
+});
+
+/// tokio::spawn, but accepts a future that may fail and also
+/// * logs errors
+/// * attaches the spawned task to the tracing span of the caller for better logging
+pub fn spawn_try_task(task: impl Future<Output = Result<(), LemmyError>> + Send + 'static) {
+  tokio::spawn(
+    async {
+      if let Err(e) = task.await {
+        tracing::warn!("error in spawn: {e}");
+      }
+    }
+    .in_current_span(), // this makes sure the inner tracing gets the same context as where spawn was called
+  );
+}
diff --git a/src/api_routes_http.rs b/src/api_routes_http.rs
index ca0fa4c2..cb735f80 100644
--- a/src/api_routes_http.rs
+++ b/src/api_routes_http.rs
@@ -105,7 +105,7 @@ use lemmy_apub::{
   },
   SendActivity,
 };
-use lemmy_utils::rate_limit::RateLimitCell;
+use lemmy_utils::{rate_limit::RateLimitCell, spawn_try_task, SYNCHRONOUS_FEDERATION};
 use serde::Deserialize;
 
 pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) {
@@ -382,8 +382,14 @@ where
     + 'static,
 {
   let res = data.perform(&context).await?;
-  SendActivity::send_activity(&data, &res, &apub_data).await?;
-  Ok(HttpResponse::Ok().json(res))
+  let res_clone = res.clone();
+  let fed_task = async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await };
+  if *SYNCHRONOUS_FEDERATION {
+    fed_task.await?;
+  } else {
+    spawn_try_task(fed_task);
+  }
+  Ok(HttpResponse::Ok().json(&res))
 }
 
 async fn route_get<'a, Data>(
@@ -432,8 +438,14 @@ where
     + 'static,
 {
   let res = data.perform(&context).await?;
-  SendActivity::send_activity(&data, &res, &apub_data).await?;
-  Ok(HttpResponse::Ok().json(res))
+  let res_clone = res.clone();
+  let fed_task = async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await };
+  if *SYNCHRONOUS_FEDERATION {
+    fed_task.await?;
+  } else {
+    spawn_try_task(fed_task);
+  }
+  Ok(HttpResponse::Ok().json(&res))
 }
 
 async fn route_get_crud<'a, Data>(
diff --git a/src/lib.rs b/src/lib.rs
index c798db68..b6fd64ec 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -26,7 +26,12 @@ use lemmy_db_schema::{
   utils::{build_db_pool, get_database_url, run_migrations},
 };
 use lemmy_routes::{feeds, images, nodeinfo, webfinger};
-use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, settings::SETTINGS};
+use lemmy_utils::{
+  error::LemmyError,
+  rate_limit::RateLimitCell,
+  settings::SETTINGS,
+  SYNCHRONOUS_FEDERATION,
+};
 use reqwest::Client;
 use reqwest_middleware::ClientBuilder;
 use reqwest_tracing::TracingMiddleware;
@@ -139,7 +144,7 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> {
     .http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT)
     .worker_count(settings.worker_count)
     .retry_count(settings.retry_count)
-    .debug(cfg!(debug_assertions))
+    .debug(*SYNCHRONOUS_FEDERATION)
     .http_signature_compat(true)
     .url_verifier(Box::new(VerifyUrlData(context.pool().clone())))
     .build()
diff --git a/src/prometheus_metrics.rs b/src/prometheus_metrics.rs
index 1ff47a54..4fe8150f 100644
--- a/src/prometheus_metrics.rs
+++ b/src/prometheus_metrics.rs
@@ -103,7 +103,7 @@ fn create_db_pool_metrics() -> DbPoolMetrics {
     .register(Box::new(metrics.available.clone()))
     .unwrap();
 
-  return metrics;
+  metrics
 }
 
 async fn collect_db_pool_metrics(context: &PromContext) {