set -e
export LEMMY_DATABASE_URL=postgres://lemmy:password@localhost:5432
-
+export LEMMY_SYNCHRONOUS_FEDERATION=1 # currently this is true in debug by default, but still.
pushd ..
cargo build
rm target/lemmy_server || true
#[async_trait::async_trait(?Send)]
pub trait Perform {
- type Response: serde::ser::Serialize + Send;
+ type Response: serde::ser::Serialize + Send + Clone + Sync;
async fn perform(&self, context: &Data<LemmyContext>) -> Result<Self::Response, LemmyError>;
}
pub auth: Sensitive<String>,
}
-#[derive(Serialize, Deserialize)]
+#[derive(Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "full", derive(TS))]
#[cfg_attr(feature = "full", ts(export))]
/// The response for deleting a custom emoji.
pub auth: Sensitive<String>,
}
-#[derive(Serialize, Deserialize)]
+#[derive(Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "full", derive(TS))]
#[cfg_attr(feature = "full", ts(export))]
/// The response for purged items.
#[async_trait::async_trait(?Send)]
pub trait PerformCrud {
- type Response: serde::ser::Serialize + Send;
+ type Response: serde::ser::Serialize + Send + Clone + Sync;
async fn perform(&self, context: &Data<LemmyContext>) -> Result<Self::Response, LemmyError>;
}
use lemmy_db_views_actor::structs::CommunityView;
use lemmy_utils::{
error::LemmyError,
+ spawn_try_task,
utils::{
slurs::{check_slurs, check_slurs_opt},
validation::{check_url_scheme, clean_url_params, is_valid_body_field, is_valid_post_title},
},
+ SYNCHRONOUS_FEDERATION,
};
-use tracing::{warn, Instrument};
+use tracing::Instrument;
use url::Url;
use webmention::{Webmention, WebmentionError};
// Mark the post as read
mark_post_as_read(person_id, post_id, context.pool()).await?;
- if let Some(url) = &updated_post.url {
- let mut webmention =
- Webmention::new::<Url>(updated_post.ap_id.clone().into(), url.clone().into())?;
- webmention.set_checked(true);
- match webmention
- .send()
- .instrument(tracing::info_span!("Sending webmention"))
- .await
- {
- Ok(_) => {}
- Err(WebmentionError::NoEndpointDiscovered(_)) => {}
- Err(e) => warn!("Failed to send webmention: {}", e),
+ if let Some(url) = updated_post.url.clone() {
+ let task = async move {
+ let mut webmention =
+ Webmention::new::<Url>(updated_post.ap_id.clone().into(), url.clone().into())?;
+ webmention.set_checked(true);
+ match webmention
+ .send()
+ .instrument(tracing::info_span!("Sending webmention"))
+ .await
+ {
+ Err(WebmentionError::NoEndpointDiscovered(_)) => Ok(()),
+ Ok(_) => Ok(()),
+ Err(e) => Err(LemmyError::from_error_message(
+ e,
+ "Couldn't send webmention",
+ )),
+ }
+ };
+ if *SYNCHRONOUS_FEDERATION {
+ task.await?;
+ } else {
+ spawn_try_task(task);
}
- }
+ };
build_post_response(context, community_id, person_id, post_id).await
}
#[async_trait::async_trait]
pub trait SendActivity: Sync {
- type Response: Sync + Send;
+ type Response: Sync + Send + Clone;
async fn send_activity(
_request: &Self,
pub mod utils;
pub mod version;
+use error::LemmyError;
+use futures::Future;
+use once_cell::sync::Lazy;
use std::time::Duration;
+use tracing::Instrument;
pub type ConnectionId = usize;
)
};
}
+
+/// if true, all federation should happen synchronously. useful for debugging and testing.
+/// defaults to true on debug mode, false on releasemode
+/// override to true by setting env LEMMY_SYNCHRONOUS_FEDERATION=1
+/// override to false by setting env LEMMY_SYNCHRONOUS_FEDERATION=""
+pub static SYNCHRONOUS_FEDERATION: Lazy<bool> = Lazy::new(|| {
+ std::env::var("LEMMY_SYNCHRONOUS_FEDERATION")
+ .map(|s| !s.is_empty())
+ .unwrap_or(cfg!(debug_assertions))
+});
+
+/// tokio::spawn, but accepts a future that may fail and also
+/// * logs errors
+/// * attaches the spawned task to the tracing span of the caller for better logging
+pub fn spawn_try_task(task: impl Future<Output = Result<(), LemmyError>> + Send + 'static) {
+ tokio::spawn(
+ async {
+ if let Err(e) = task.await {
+ tracing::warn!("error in spawn: {e}");
+ }
+ }
+ .in_current_span(), // this makes sure the inner tracing gets the same context as where spawn was called
+ );
+}
},
SendActivity,
};
-use lemmy_utils::rate_limit::RateLimitCell;
+use lemmy_utils::{rate_limit::RateLimitCell, spawn_try_task, SYNCHRONOUS_FEDERATION};
use serde::Deserialize;
pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) {
+ 'static,
{
let res = data.perform(&context).await?;
- SendActivity::send_activity(&data, &res, &apub_data).await?;
- Ok(HttpResponse::Ok().json(res))
+ let res_clone = res.clone();
+ let fed_task = async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await };
+ if *SYNCHRONOUS_FEDERATION {
+ fed_task.await?;
+ } else {
+ spawn_try_task(fed_task);
+ }
+ Ok(HttpResponse::Ok().json(&res))
}
async fn route_get<'a, Data>(
+ 'static,
{
let res = data.perform(&context).await?;
- SendActivity::send_activity(&data, &res, &apub_data).await?;
- Ok(HttpResponse::Ok().json(res))
+ let res_clone = res.clone();
+ let fed_task = async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await };
+ if *SYNCHRONOUS_FEDERATION {
+ fed_task.await?;
+ } else {
+ spawn_try_task(fed_task);
+ }
+ Ok(HttpResponse::Ok().json(&res))
}
async fn route_get_crud<'a, Data>(
utils::{build_db_pool, get_database_url, run_migrations},
};
use lemmy_routes::{feeds, images, nodeinfo, webfinger};
-use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, settings::SETTINGS};
+use lemmy_utils::{
+ error::LemmyError,
+ rate_limit::RateLimitCell,
+ settings::SETTINGS,
+ SYNCHRONOUS_FEDERATION,
+};
use reqwest::Client;
use reqwest_middleware::ClientBuilder;
use reqwest_tracing::TracingMiddleware;
.http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT)
.worker_count(settings.worker_count)
.retry_count(settings.retry_count)
- .debug(cfg!(debug_assertions))
+ .debug(*SYNCHRONOUS_FEDERATION)
.http_signature_compat(true)
.url_verifier(Box::new(VerifyUrlData(context.pool().clone())))
.build()
.register(Box::new(metrics.available.clone()))
.unwrap();
- return metrics;
+ metrics
}
async fn collect_db_pool_metrics(context: &PromContext) {