Improve api response times by doing send_activity asynchronously (#3493)
authorphiresky <phireskyde+git@gmail.com>
Mon, 10 Jul 2023 10:27:49 +0000 (12:27 +0200)
committerGitHub <noreply@github.com>
Mon, 10 Jul 2023 10:27:49 +0000 (12:27 +0200)
* do send_activity after http response

* move to util function

* format

* fix prometheus

* make synchronous federation configurable

* cargo fmt

* empty

* empty

---------

Co-authored-by: Dessalines <dessalines@users.noreply.github.com>
api_tests/run-federation-test.sh
crates/api/src/lib.rs
crates/api_common/src/custom_emoji.rs
crates/api_common/src/site.rs
crates/api_crud/src/lib.rs
crates/api_crud/src/post/create.rs
crates/apub/src/lib.rs
crates/utils/src/lib.rs
src/api_routes_http.rs
src/lib.rs
src/prometheus_metrics.rs

index ed4bba4167f9e8c90ba10137cba7e4da40a29ded..abced2ad6c7f95e73786625b68f148d2bc549042 100755 (executable)
@@ -2,7 +2,7 @@
 set -e
 
 export LEMMY_DATABASE_URL=postgres://lemmy:password@localhost:5432
-
+export LEMMY_SYNCHRONOUS_FEDERATION=1 # currently this is true in debug by default, but still.
 pushd ..
 cargo build
 rm target/lemmy_server || true
index 7ac3cec726faa2eabe956635bb8983b6aa573c81..e4e496ba99ce61464cd05849ccd9b55379373c58 100644 (file)
@@ -17,7 +17,7 @@ mod site;
 
 #[async_trait::async_trait(?Send)]
 pub trait Perform {
-  type Response: serde::ser::Serialize + Send;
+  type Response: serde::ser::Serialize + Send + Clone + Sync;
 
   async fn perform(&self, context: &Data<LemmyContext>) -> Result<Self::Response, LemmyError>;
 }
index 550dd7a3fc53bd8e6ec25ece75d4e363cf6cbf1c..7f3461ca794c8c2b5351980bfbd8959b0a99874d 100644 (file)
@@ -43,7 +43,7 @@ pub struct DeleteCustomEmoji {
   pub auth: Sensitive<String>,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Serialize, Deserialize, Clone)]
 #[cfg_attr(feature = "full", derive(TS))]
 #[cfg_attr(feature = "full", ts(export))]
 /// The response for deleting a custom emoji.
index 865acc0dc2f81bb84717bd82fc82c255b20f5e1a..bc7687e3ce55957fd0637ccd7285a09d966afd0e 100644 (file)
@@ -395,7 +395,7 @@ pub struct PurgeComment {
   pub auth: Sensitive<String>,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Serialize, Deserialize, Clone)]
 #[cfg_attr(feature = "full", derive(TS))]
 #[cfg_attr(feature = "full", ts(export))]
 /// The response for purged items.
index a10309fc9491cdfe4131abc4641a82198127183c..b9449ca69df40d69508a810771dcf266fe98f6f4 100644 (file)
@@ -12,7 +12,7 @@ mod user;
 
 #[async_trait::async_trait(?Send)]
 pub trait PerformCrud {
-  type Response: serde::ser::Serialize + Send;
+  type Response: serde::ser::Serialize + Send + Clone + Sync;
 
   async fn perform(&self, context: &Data<LemmyContext>) -> Result<Self::Response, LemmyError>;
 }
index 13d896b413cfa562ec81b5919f45d8d2674ba307..4264c26d4e759e065d4acffb48eb6d3f14ebe8f2 100644 (file)
@@ -29,12 +29,14 @@ use lemmy_db_schema::{
 use lemmy_db_views_actor::structs::CommunityView;
 use lemmy_utils::{
   error::LemmyError,
+  spawn_try_task,
   utils::{
     slurs::{check_slurs, check_slurs_opt},
     validation::{check_url_scheme, clean_url_params, is_valid_body_field, is_valid_post_title},
   },
+  SYNCHRONOUS_FEDERATION,
 };
-use tracing::{warn, Instrument};
+use tracing::Instrument;
 use url::Url;
 use webmention::{Webmention, WebmentionError};
 
@@ -143,20 +145,30 @@ impl PerformCrud for CreatePost {
     // Mark the post as read
     mark_post_as_read(person_id, post_id, context.pool()).await?;
 
-    if let Some(url) = &updated_post.url {
-      let mut webmention =
-        Webmention::new::<Url>(updated_post.ap_id.clone().into(), url.clone().into())?;
-      webmention.set_checked(true);
-      match webmention
-        .send()
-        .instrument(tracing::info_span!("Sending webmention"))
-        .await
-      {
-        Ok(_) => {}
-        Err(WebmentionError::NoEndpointDiscovered(_)) => {}
-        Err(e) => warn!("Failed to send webmention: {}", e),
+    if let Some(url) = updated_post.url.clone() {
+      let task = async move {
+        let mut webmention =
+          Webmention::new::<Url>(updated_post.ap_id.clone().into(), url.clone().into())?;
+        webmention.set_checked(true);
+        match webmention
+          .send()
+          .instrument(tracing::info_span!("Sending webmention"))
+          .await
+        {
+          Err(WebmentionError::NoEndpointDiscovered(_)) => Ok(()),
+          Ok(_) => Ok(()),
+          Err(e) => Err(LemmyError::from_error_message(
+            e,
+            "Couldn't send webmention",
+          )),
+        }
+      };
+      if *SYNCHRONOUS_FEDERATION {
+        task.await?;
+      } else {
+        spawn_try_task(task);
       }
-    }
+    };
 
     build_post_response(context, community_id, person_id, post_id).await
   }
index 653a1c194d5a2d6ce7097cb4a2ccf30aba483d8b..1c36f9852ecf823f2d68a833ee6fec8004ff6beb 100644 (file)
@@ -201,7 +201,7 @@ where
 
 #[async_trait::async_trait]
 pub trait SendActivity: Sync {
-  type Response: Sync + Send;
+  type Response: Sync + Send + Clone;
 
   async fn send_activity(
     _request: &Self,
index e5d07db2c61e498e666090cf1c2397be5ff0f2c1..2c71c58d032df655b335119696a5066822991824 100644 (file)
@@ -14,7 +14,11 @@ pub mod request;
 pub mod utils;
 pub mod version;
 
+use error::LemmyError;
+use futures::Future;
+use once_cell::sync::Lazy;
 use std::time::Duration;
+use tracing::Instrument;
 
 pub type ConnectionId = usize;
 
@@ -31,3 +35,27 @@ macro_rules! location_info {
     )
   };
 }
+
+/// if true, all federation should happen synchronously. useful for debugging and testing.
+/// defaults to true on debug mode, false on releasemode
+/// override to true by setting env LEMMY_SYNCHRONOUS_FEDERATION=1
+/// override to false by setting env LEMMY_SYNCHRONOUS_FEDERATION=""
+pub static SYNCHRONOUS_FEDERATION: Lazy<bool> = Lazy::new(|| {
+  std::env::var("LEMMY_SYNCHRONOUS_FEDERATION")
+    .map(|s| !s.is_empty())
+    .unwrap_or(cfg!(debug_assertions))
+});
+
+/// tokio::spawn, but accepts a future that may fail and also
+/// * logs errors
+/// * attaches the spawned task to the tracing span of the caller for better logging
+pub fn spawn_try_task(task: impl Future<Output = Result<(), LemmyError>> + Send + 'static) {
+  tokio::spawn(
+    async {
+      if let Err(e) = task.await {
+        tracing::warn!("error in spawn: {e}");
+      }
+    }
+    .in_current_span(), // this makes sure the inner tracing gets the same context as where spawn was called
+  );
+}
index ca0fa4c225917d41f4acc2d1c133b3980ee6844a..cb735f807c73903b1b72ecdc5e2ed0fee52ef9f1 100644 (file)
@@ -105,7 +105,7 @@ use lemmy_apub::{
   },
   SendActivity,
 };
-use lemmy_utils::rate_limit::RateLimitCell;
+use lemmy_utils::{rate_limit::RateLimitCell, spawn_try_task, SYNCHRONOUS_FEDERATION};
 use serde::Deserialize;
 
 pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) {
@@ -382,8 +382,14 @@ where
     + 'static,
 {
   let res = data.perform(&context).await?;
-  SendActivity::send_activity(&data, &res, &apub_data).await?;
-  Ok(HttpResponse::Ok().json(res))
+  let res_clone = res.clone();
+  let fed_task = async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await };
+  if *SYNCHRONOUS_FEDERATION {
+    fed_task.await?;
+  } else {
+    spawn_try_task(fed_task);
+  }
+  Ok(HttpResponse::Ok().json(&res))
 }
 
 async fn route_get<'a, Data>(
@@ -432,8 +438,14 @@ where
     + 'static,
 {
   let res = data.perform(&context).await?;
-  SendActivity::send_activity(&data, &res, &apub_data).await?;
-  Ok(HttpResponse::Ok().json(res))
+  let res_clone = res.clone();
+  let fed_task = async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await };
+  if *SYNCHRONOUS_FEDERATION {
+    fed_task.await?;
+  } else {
+    spawn_try_task(fed_task);
+  }
+  Ok(HttpResponse::Ok().json(&res))
 }
 
 async fn route_get_crud<'a, Data>(
index c798db68bc23a0f0a1db7357f97312778991119c..b6fd64ec8096c0640549774339bf11bbc1553cbc 100644 (file)
@@ -26,7 +26,12 @@ use lemmy_db_schema::{
   utils::{build_db_pool, get_database_url, run_migrations},
 };
 use lemmy_routes::{feeds, images, nodeinfo, webfinger};
-use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, settings::SETTINGS};
+use lemmy_utils::{
+  error::LemmyError,
+  rate_limit::RateLimitCell,
+  settings::SETTINGS,
+  SYNCHRONOUS_FEDERATION,
+};
 use reqwest::Client;
 use reqwest_middleware::ClientBuilder;
 use reqwest_tracing::TracingMiddleware;
@@ -139,7 +144,7 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> {
     .http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT)
     .worker_count(settings.worker_count)
     .retry_count(settings.retry_count)
-    .debug(cfg!(debug_assertions))
+    .debug(*SYNCHRONOUS_FEDERATION)
     .http_signature_compat(true)
     .url_verifier(Box::new(VerifyUrlData(context.pool().clone())))
     .build()
index 1ff47a54ba9e9e6761612d33389ac5e9bbc272e7..4fe8150f2b908a26b94cb6137e98a4c7cbc46124 100644 (file)
@@ -103,7 +103,7 @@ fn create_db_pool_metrics() -> DbPoolMetrics {
     .register(Box::new(metrics.available.clone()))
     .unwrap();
 
-  return metrics;
+  metrics
 }
 
 async fn collect_db_pool_metrics(context: &PromContext) {