From 7d8cb93b5323eb50bd90afde270685d1ecd07061 Mon Sep 17 00:00:00 2001 From: Nutomic <me@nutomic.com> Date: Thu, 13 Jul 2023 16:12:01 +0200 Subject: [PATCH] Check for dead federated instances (fixes #2221) (#3427) * Check for dead federated instances (fixes #2221) * move to apub crate, use timestamp * make it compile * clippy * use moka to cache blocklists, dead instances, restore orig scheduled tasks * remove leftover last_alive var * error handling * wip * fix alive check for instances without nodeinfo, add coalesce * clippy * move federation blocklist cache to #3486 * unused deps --- crates/apub/src/activities/mod.rs | 31 +++++++- crates/db_schema/src/impls/instance.rs | 29 +++++++- crates/db_schema/src/impls/site.rs | 1 - crates/db_schema/src/source/site.rs | 15 ++-- src/scheduled_tasks.rs | 98 ++++++++++++++------------ 5 files changed, 117 insertions(+), 57 deletions(-) diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index 41a66c7d..e0b46e0e 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -13,11 +13,16 @@ use activitypub_federation::{ }; use anyhow::anyhow; use lemmy_api_common::context::LemmyContext; -use lemmy_db_schema::{newtypes::CommunityId, source::community::Community}; +use lemmy_db_schema::{ + newtypes::CommunityId, + source::{community::Community, instance::Instance}, +}; use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView}; use lemmy_utils::error::{LemmyError, LemmyErrorExt, LemmyErrorType}; +use moka::future::Cache; +use once_cell::sync::Lazy; use serde::Serialize; -use std::ops::Deref; +use std::{ops::Deref, sync::Arc, time::Duration}; use tracing::info; use url::{ParseError, Url}; use uuid::Uuid; @@ -30,6 +35,10 @@ pub mod following; pub mod unfederated; pub mod voting; +/// Amount of time that the list of dead instances is cached. This is only updated once a day, +/// so there is no harm in caching it for a longer time. 
+pub static DEAD_INSTANCE_LIST_CACHE_DURATION: Duration = Duration::from_secs(30 * 60); + /// Checks that the specified Url actually identifies a Person (by fetching it), and that the person /// doesn't have a site ban. #[tracing::instrument(skip_all)] @@ -148,7 +157,7 @@ async fn send_lemmy_activity<Activity, ActorT>( data: &Data<LemmyContext>, activity: Activity, actor: &ActorT, - inbox: Vec<Url>, + mut inbox: Vec<Url>, sensitive: bool, ) -> Result<(), LemmyError> where @@ -156,6 +165,22 @@ where ActorT: Actor, Activity: ActivityHandler<Error = LemmyError>, { + static CACHE: Lazy<Cache<(), Arc<Vec<String>>>> = Lazy::new(|| { + Cache::builder() + .max_capacity(1) + .time_to_live(DEAD_INSTANCE_LIST_CACHE_DURATION) + .build() + }); + let dead_instances = CACHE + .try_get_with((), async { + Ok::<_, diesel::result::Error>(Arc::new(Instance::dead_instances(&mut data.pool()).await?)) + }) + .await?; + + inbox.retain(|i| { + let domain = i.domain().expect("has domain").to_string(); + !dead_instances.contains(&domain) + }); info!("Sending activity {}", activity.id().to_string()); let activity = WithContext::new(activity, CONTEXT.deref().clone()); diff --git a/crates/db_schema/src/impls/instance.rs b/crates/db_schema/src/impls/instance.rs index 068e317f..d6a23a71 100644 --- a/crates/db_schema/src/impls/instance.rs +++ b/crates/db_schema/src/impls/instance.rs @@ -1,10 +1,17 @@ use crate::{ + diesel::dsl::IntervalDsl, newtypes::InstanceId, schema::{federation_allowlist, federation_blocklist, instance}, source::instance::{Instance, InstanceForm}, utils::{get_conn, naive_now, DbPool}, }; -use diesel::{dsl::insert_into, result::Error, ExpressionMethods, QueryDsl}; +use diesel::{ + dsl::{insert_into, now}, + result::Error, + sql_types::{Nullable, Timestamp}, + ExpressionMethods, + QueryDsl, +}; use diesel_async::RunQueryDsl; impl Instance { @@ -46,6 +53,24 @@ impl Instance { .execute(conn) .await } + + pub async fn read_all(pool: &mut DbPool<'_>) -> Result<Vec<Instance>, Error> 
{ + let conn = &mut get_conn(pool).await?; + instance::table + .select(instance::all_columns) + .get_results(conn) + .await + } + + pub async fn dead_instances(pool: &mut DbPool<'_>) -> Result<Vec<String>, Error> { + let conn = &mut get_conn(pool).await?; + instance::table + .select(instance::domain) + .filter(coalesce(instance::updated, instance::published).lt(now - 3.days())) + .get_results(conn) + .await + } + #[cfg(test)] pub async fn delete_all(pool: &mut DbPool<'_>) -> Result<usize, Error> { let conn = &mut get_conn(pool).await?; @@ -79,3 +104,5 @@ impl Instance { .await } } + +sql_function! { fn coalesce(x: Nullable<Timestamp>, y: Timestamp) -> Timestamp; } diff --git a/crates/db_schema/src/impls/site.rs b/crates/db_schema/src/impls/site.rs index 806def96..2820e9cd 100644 --- a/crates/db_schema/src/impls/site.rs +++ b/crates/db_schema/src/impls/site.rs @@ -81,7 +81,6 @@ impl Site { ) } - // TODO this needs fixed pub async fn read_remote_sites(pool: &mut DbPool<'_>) -> Result<Vec<Self>, Error> { let conn = &mut get_conn(pool).await?; site.order_by(id).offset(1).get_results::<Self>(conn).await diff --git a/crates/db_schema/src/source/site.rs b/crates/db_schema/src/source/site.rs index f6a19b21..12b30c58 100644 --- a/crates/db_schema/src/source/site.rs +++ b/crates/db_schema/src/source/site.rs @@ -1,6 +1,7 @@ use crate::newtypes::{DbUrl, InstanceId, SiteId}; #[cfg(feature = "full")] use crate::schema::site; +use chrono::NaiveDateTime; use serde::{Deserialize, Serialize}; use serde_with::skip_serializing_none; #[cfg(feature = "full")] @@ -18,8 +19,8 @@ pub struct Site { pub name: String, /// A sidebar for the site in markdown. pub sidebar: Option<String>, - pub published: chrono::NaiveDateTime, - pub updated: Option<chrono::NaiveDateTime>, + pub published: NaiveDateTime, + pub updated: Option<NaiveDateTime>, /// An icon URL. pub icon: Option<DbUrl>, /// A banner url. @@ -29,7 +30,7 @@ pub struct Site { /// The federated actor_id. 
pub actor_id: DbUrl, /// The time the site was last refreshed. - pub last_refreshed_at: chrono::NaiveDateTime, + pub last_refreshed_at: NaiveDateTime, /// The site inbox pub inbox_url: DbUrl, pub private_key: Option<String>, @@ -45,12 +46,12 @@ pub struct SiteInsertForm { #[builder(!default)] pub name: String, pub sidebar: Option<String>, - pub updated: Option<chrono::NaiveDateTime>, + pub updated: Option<NaiveDateTime>, pub icon: Option<DbUrl>, pub banner: Option<DbUrl>, pub description: Option<String>, pub actor_id: Option<DbUrl>, - pub last_refreshed_at: Option<chrono::NaiveDateTime>, + pub last_refreshed_at: Option<NaiveDateTime>, pub inbox_url: Option<DbUrl>, pub private_key: Option<String>, pub public_key: Option<String>, @@ -65,13 +66,13 @@ pub struct SiteInsertForm { pub struct SiteUpdateForm { pub name: Option<String>, pub sidebar: Option<Option<String>>, - pub updated: Option<Option<chrono::NaiveDateTime>>, + pub updated: Option<Option<NaiveDateTime>>, // when you want to null out a column, you have to send Some(None)), since sending None means you just don't want to update that column. 
pub icon: Option<Option<DbUrl>>, pub banner: Option<Option<DbUrl>>, pub description: Option<Option<String>>, pub actor_id: Option<DbUrl>, - pub last_refreshed_at: Option<chrono::NaiveDateTime>, + pub last_refreshed_at: Option<NaiveDateTime>, pub inbox_url: Option<DbUrl>, pub private_key: Option<Option<String>>, pub public_key: Option<String>, diff --git a/src/scheduled_tasks.rs b/src/scheduled_tasks.rs index a052585e..f20e61e1 100644 --- a/src/scheduled_tasks.rs +++ b/src/scheduled_tasks.rs @@ -18,10 +18,13 @@ use lemmy_db_schema::{ utils::{naive_now, DELETED_REPLACEMENT_TEXT}, }; use lemmy_routes::nodeinfo::NodeInfo; -use lemmy_utils::{error::LemmyError, REQWEST_TIMEOUT}; +use lemmy_utils::{ + error::{LemmyError, LemmyResult}, + REQWEST_TIMEOUT, +}; use reqwest::blocking::Client; use std::{thread, time::Duration}; -use tracing::{error, info}; +use tracing::{error, info, warn}; /// Schedules various cleanup tasks for lemmy in a background thread pub fn setup( @@ -79,7 +82,9 @@ pub fn setup( // Update the Instance Software scheduler.every(CTimeUnits::days(1)).run(move || { let mut conn = PgConnection::establish(&db_url).expect("could not establish connection"); - update_instance_software(&mut conn, &user_agent); + update_instance_software(&mut conn, &user_agent) + .map_err(|e| warn!("Failed to update instance software: {e}")) + .ok(); }); // Manually run the scheduler in an event loop @@ -323,62 +328,65 @@ fn update_banned_when_expired(conn: &mut PgConnection) { } /// Updates the instance software and version -fn update_instance_software(conn: &mut PgConnection, user_agent: &str) { +/// +/// TODO: this should be async +/// TODO: if instance has been dead for a long time, it should be checked less frequently +fn update_instance_software(conn: &mut PgConnection, user_agent: &str) -> LemmyResult<()> { info!("Updating instances software and versions..."); - let client = match Client::builder() + let client = Client::builder() .user_agent(user_agent) 
.timeout(REQWEST_TIMEOUT) - .build() - { - Ok(client) => client, - Err(e) => { - error!("Failed to build reqwest client: {}", e); - return; - } - }; + .build()?; - let instances = match instance::table.get_results::<Instance>(conn) { - Ok(instances) => instances, - Err(e) => { - error!("Failed to get instances: {}", e); - return; - } - }; + let instances = instance::table.get_results::<Instance>(conn)?; for instance in instances { let node_info_url = format!("https://{}/nodeinfo/2.0.json", instance.domain); - // Skip it if it can't connect - let res = client - .get(&node_info_url) - .send() - .ok() - .and_then(|t| t.json::<NodeInfo>().ok()); - - if let Some(node_info) = res { - let software = node_info.software.as_ref(); - let form = InstanceForm::builder() - .domain(instance.domain) - .software(software.and_then(|s| s.name.clone())) - .version(software.and_then(|s| s.version.clone())) - .updated(Some(naive_now())) - .build(); - - match diesel::update(instance::table.find(instance.id)) - .set(form) - .execute(conn) - { - Ok(_) => { - info!("Done."); + // The `updated` column is used to check if instances are alive. If it is more than three days + // in the past, no outgoing activities will be sent to that instance. However, not every + // Fediverse instance has a valid Nodeinfo endpoint (it's not required for ActivityPub). That's + // why we always need to mark instances as updated if they are alive. 
+ let default_form = InstanceForm::builder() + .domain(instance.domain.clone()) + .updated(Some(naive_now())) + .build(); + let form = match client.get(&node_info_url).send() { + Ok(res) if res.status().is_client_error() => { + // Instance doesn't have nodeinfo but sent a response, consider it alive + Some(default_form) + } + Ok(res) => match res.json::<NodeInfo>() { + Ok(node_info) => { + // Instance sent valid nodeinfo, write it to db + Some( + InstanceForm::builder() + .domain(instance.domain) + .updated(Some(naive_now())) + .software(node_info.software.and_then(|s| s.name)) + .version(node_info.version.clone()) + .build(), + ) } - Err(e) => { - error!("Failed to update site instance software: {}", e); - return; + Err(_) => { + // No valid nodeinfo but valid HTTP response, consider instance alive + Some(default_form) } + }, + Err(_) => { + // dead instance, do nothing + None } + }; + if let Some(form) = form { + diesel::update(instance::table.find(instance.id)) + .set(form) + .execute(conn)?; } } + info!("Finished updating instances software and versions..."); + Ok(()) } #[cfg(test)] -- 2.44.1