From: phiresky Date: Mon, 10 Jul 2023 10:27:49 +0000 (+0200) Subject: Improve api response times by doing send_activity asynchronously (#3493) X-Git-Url: http://these/git/%22https:/nerdica.net/photo/contact/80/README.ja.md?a=commitdiff_plain;h=b35757b429e015fa5705ac07ea6993342f675761;p=lemmy.git Improve api response times by doing send_activity asynchronously (#3493) * do send_activity after http response * move to util function * format * fix prometheus * make synchronous federation configurable * cargo fmt * empty * empty --------- Co-authored-by: Dessalines --- diff --git a/api_tests/run-federation-test.sh b/api_tests/run-federation-test.sh index ed4bba41..abced2ad 100755 --- a/api_tests/run-federation-test.sh +++ b/api_tests/run-federation-test.sh @@ -2,7 +2,7 @@ set -e export LEMMY_DATABASE_URL=postgres://lemmy:password@localhost:5432 - +export LEMMY_SYNCHRONOUS_FEDERATION=1 # currently this is true in debug by default, but still. pushd .. cargo build rm target/lemmy_server || true diff --git a/crates/api/src/lib.rs b/crates/api/src/lib.rs index 7ac3cec7..e4e496ba 100644 --- a/crates/api/src/lib.rs +++ b/crates/api/src/lib.rs @@ -17,7 +17,7 @@ mod site; #[async_trait::async_trait(?Send)] pub trait Perform { - type Response: serde::ser::Serialize + Send; + type Response: serde::ser::Serialize + Send + Clone + Sync; async fn perform(&self, context: &Data) -> Result; } diff --git a/crates/api_common/src/custom_emoji.rs b/crates/api_common/src/custom_emoji.rs index 550dd7a3..7f3461ca 100644 --- a/crates/api_common/src/custom_emoji.rs +++ b/crates/api_common/src/custom_emoji.rs @@ -43,7 +43,7 @@ pub struct DeleteCustomEmoji { pub auth: Sensitive, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] #[cfg_attr(feature = "full", derive(TS))] #[cfg_attr(feature = "full", ts(export))] /// The response for deleting a custom emoji. diff --git a/crates/api_common/src/site.rs b/crates/api_common/src/site.rs index 865acc0d..bc7687e3 100644 --- a/crates/api_common/src/site.rs +++ b/crates/api_common/src/site.rs @@ -395,7 +395,7 @@ pub struct PurgeComment { pub auth: Sensitive, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] #[cfg_attr(feature = "full", derive(TS))] #[cfg_attr(feature = "full", ts(export))] /// The response for purged items. diff --git a/crates/api_crud/src/lib.rs b/crates/api_crud/src/lib.rs index a10309fc..b9449ca6 100644 --- a/crates/api_crud/src/lib.rs +++ b/crates/api_crud/src/lib.rs @@ -12,7 +12,7 @@ mod user; #[async_trait::async_trait(?Send)] pub trait PerformCrud { - type Response: serde::ser::Serialize + Send; + type Response: serde::ser::Serialize + Send + Clone + Sync; async fn perform(&self, context: &Data) -> Result; } diff --git a/crates/api_crud/src/post/create.rs b/crates/api_crud/src/post/create.rs index 13d896b4..4264c26d 100644 --- a/crates/api_crud/src/post/create.rs +++ b/crates/api_crud/src/post/create.rs @@ -29,12 +29,14 @@ use lemmy_db_schema::{ use lemmy_db_views_actor::structs::CommunityView; use lemmy_utils::{ error::LemmyError, + spawn_try_task, utils::{ slurs::{check_slurs, check_slurs_opt}, validation::{check_url_scheme, clean_url_params, is_valid_body_field, is_valid_post_title}, }, + SYNCHRONOUS_FEDERATION, }; -use tracing::{warn, Instrument}; +use tracing::Instrument; use url::Url; use webmention::{Webmention, WebmentionError}; @@ -143,20 +145,30 @@ impl PerformCrud for CreatePost { // Mark the post as read mark_post_as_read(person_id, post_id, context.pool()).await?; - if let Some(url) = &updated_post.url { - let mut webmention = - Webmention::new::(updated_post.ap_id.clone().into(), url.clone().into())?; - webmention.set_checked(true); - match webmention - .send() - .instrument(tracing::info_span!("Sending webmention")) - .await - { - Ok(_) => {} - Err(WebmentionError::NoEndpointDiscovered(_)) => {} - Err(e) => warn!("Failed to send webmention: {}", e), + if let Some(url) = updated_post.url.clone() { + let task = async move { + let mut webmention = + Webmention::new::(updated_post.ap_id.clone().into(), url.clone().into())?; + webmention.set_checked(true); + match webmention + .send() + .instrument(tracing::info_span!("Sending webmention")) + .await + { + Err(WebmentionError::NoEndpointDiscovered(_)) => Ok(()), + Ok(_) => Ok(()), + Err(e) => Err(LemmyError::from_error_message( + e, + "Couldn't send webmention", + )), + } + }; + if *SYNCHRONOUS_FEDERATION { + task.await?; + } else { + spawn_try_task(task); } - } + }; build_post_response(context, community_id, person_id, post_id).await } diff --git a/crates/apub/src/lib.rs b/crates/apub/src/lib.rs index 653a1c19..1c36f985 100644 --- a/crates/apub/src/lib.rs +++ b/crates/apub/src/lib.rs @@ -201,7 +201,7 @@ where #[async_trait::async_trait] pub trait SendActivity: Sync { - type Response: Sync + Send; + type Response: Sync + Send + Clone; async fn send_activity( _request: &Self, diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index e5d07db2..2c71c58d 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -14,7 +14,11 @@ pub mod request; pub mod utils; pub mod version; +use error::LemmyError; +use futures::Future; +use once_cell::sync::Lazy; use std::time::Duration; +use tracing::Instrument; pub type ConnectionId = usize; @@ -31,3 +35,27 @@ macro_rules! location_info { ) }; } + +/// if true, all federation should happen synchronously. useful for debugging and testing. +/// defaults to true on debug mode, false on releasemode +/// override to true by setting env LEMMY_SYNCHRONOUS_FEDERATION=1 +/// override to false by setting env LEMMY_SYNCHRONOUS_FEDERATION="" +pub static SYNCHRONOUS_FEDERATION: Lazy = Lazy::new(|| { + std::env::var("LEMMY_SYNCHRONOUS_FEDERATION") + .map(|s| !s.is_empty()) + .unwrap_or(cfg!(debug_assertions)) +}); + +/// tokio::spawn, but accepts a future that may fail and also +/// * logs errors +/// * attaches the spawned task to the tracing span of the caller for better logging +pub fn spawn_try_task(task: impl Future> + Send + 'static) { + tokio::spawn( + async { + if let Err(e) = task.await { + tracing::warn!("error in spawn: {e}"); + } + } + .in_current_span(), // this makes sure the inner tracing gets the same context as where spawn was called + ); +} diff --git a/src/api_routes_http.rs b/src/api_routes_http.rs index ca0fa4c2..cb735f80 100644 --- a/src/api_routes_http.rs +++ b/src/api_routes_http.rs @@ -105,7 +105,7 @@ use lemmy_apub::{ }, SendActivity, }; -use lemmy_utils::rate_limit::RateLimitCell; +use lemmy_utils::{rate_limit::RateLimitCell, spawn_try_task, SYNCHRONOUS_FEDERATION}; use serde::Deserialize; pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) { @@ -382,8 +382,14 @@ where + 'static, { let res = data.perform(&context).await?; - SendActivity::send_activity(&data, &res, &apub_data).await?; - Ok(HttpResponse::Ok().json(res)) + let res_clone = res.clone(); + let fed_task = async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await }; + if *SYNCHRONOUS_FEDERATION { + fed_task.await?; + } else { + spawn_try_task(fed_task); + } + Ok(HttpResponse::Ok().json(&res)) } async fn route_get<'a, Data>( @@ -432,8 +438,14 @@ where + 'static, { let res = data.perform(&context).await?; - SendActivity::send_activity(&data, &res, &apub_data).await?; - Ok(HttpResponse::Ok().json(res)) + let res_clone = res.clone(); + let fed_task = async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await }; + if *SYNCHRONOUS_FEDERATION { + fed_task.await?; + } else { + spawn_try_task(fed_task); + } + Ok(HttpResponse::Ok().json(&res)) } async fn route_get_crud<'a, Data>( diff --git a/src/lib.rs b/src/lib.rs index c798db68..b6fd64ec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,12 @@ use lemmy_db_schema::{ utils::{build_db_pool, get_database_url, run_migrations}, }; use lemmy_routes::{feeds, images, nodeinfo, webfinger}; -use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, settings::SETTINGS}; +use lemmy_utils::{ + error::LemmyError, + rate_limit::RateLimitCell, + settings::SETTINGS, + SYNCHRONOUS_FEDERATION, +}; use reqwest::Client; use reqwest_middleware::ClientBuilder; use reqwest_tracing::TracingMiddleware; @@ -139,7 +144,7 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> { .http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT) .worker_count(settings.worker_count) .retry_count(settings.retry_count) - .debug(cfg!(debug_assertions)) + .debug(*SYNCHRONOUS_FEDERATION) .http_signature_compat(true) .url_verifier(Box::new(VerifyUrlData(context.pool().clone()))) .build() diff --git a/src/prometheus_metrics.rs b/src/prometheus_metrics.rs index 1ff47a54..4fe8150f 100644 --- a/src/prometheus_metrics.rs +++ b/src/prometheus_metrics.rs @@ -103,7 +103,7 @@ fn create_db_pool_metrics() -> DbPoolMetrics { .register(Box::new(metrics.available.clone())) .unwrap(); - return metrics; + metrics } async fn collect_db_pool_metrics(context: &PromContext) {