From 7d8cb93b5323eb50bd90afde270685d1ecd07061 Mon Sep 17 00:00:00 2001
From: Nutomic <me@nutomic.com>
Date: Thu, 13 Jul 2023 16:12:01 +0200
Subject: [PATCH] Check for dead federated instances (fixes #2221) (#3427)

* Check for dead federated instances (fixes #2221)

* move to apub crate, use timestamp

* make it compile

* clippy

* use moka to cache blocklists, dead instances, restore orig scheduled tasks

* remove leftover last_alive var

* error handling

* wip

* fix alive check for instances without nodeinfo, add coalesce

* clippy

* move federation blocklist cache to #3486

* unused deps
---
 crates/apub/src/activities/mod.rs      | 31 +++++++-
 crates/db_schema/src/impls/instance.rs | 29 +++++++-
 crates/db_schema/src/impls/site.rs     |  1 -
 crates/db_schema/src/source/site.rs    | 15 ++--
 src/scheduled_tasks.rs                 | 98 ++++++++++++++------------
 5 files changed, 117 insertions(+), 57 deletions(-)

diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs
index 41a66c7d..e0b46e0e 100644
--- a/crates/apub/src/activities/mod.rs
+++ b/crates/apub/src/activities/mod.rs
@@ -13,11 +13,16 @@ use activitypub_federation::{
 };
 use anyhow::anyhow;
 use lemmy_api_common::context::LemmyContext;
-use lemmy_db_schema::{newtypes::CommunityId, source::community::Community};
+use lemmy_db_schema::{
+  newtypes::CommunityId,
+  source::{community::Community, instance::Instance},
+};
 use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView};
 use lemmy_utils::error::{LemmyError, LemmyErrorExt, LemmyErrorType};
+use moka::future::Cache;
+use once_cell::sync::Lazy;
 use serde::Serialize;
-use std::ops::Deref;
+use std::{ops::Deref, sync::Arc, time::Duration};
 use tracing::info;
 use url::{ParseError, Url};
 use uuid::Uuid;
@@ -30,6 +35,10 @@ pub mod following;
 pub mod unfederated;
 pub mod voting;
 
+/// Amount of time that the list of dead instances is cached. This is only updated once a day,
+/// so there is no harm in caching it for a longer time.
+pub static DEAD_INSTANCE_LIST_CACHE_DURATION: Duration = Duration::from_secs(30 * 60);
+
 /// Checks that the specified Url actually identifies a Person (by fetching it), and that the person
 /// doesn't have a site ban.
 #[tracing::instrument(skip_all)]
@@ -148,7 +157,7 @@ async fn send_lemmy_activity<Activity, ActorT>(
   data: &Data<LemmyContext>,
   activity: Activity,
   actor: &ActorT,
-  inbox: Vec<Url>,
+  mut inbox: Vec<Url>,
   sensitive: bool,
 ) -> Result<(), LemmyError>
 where
@@ -156,6 +165,22 @@ where
   ActorT: Actor,
   Activity: ActivityHandler<Error = LemmyError>,
 {
+  static CACHE: Lazy<Cache<(), Arc<Vec<String>>>> = Lazy::new(|| {
+    Cache::builder()
+      .max_capacity(1)
+      .time_to_live(DEAD_INSTANCE_LIST_CACHE_DURATION)
+      .build()
+  });
+  let dead_instances = CACHE
+    .try_get_with((), async {
+      Ok::<_, diesel::result::Error>(Arc::new(Instance::dead_instances(&mut data.pool()).await?))
+    })
+    .await?;
+
+  inbox.retain(|i| {
+    let domain = i.domain().expect("has domain").to_string();
+    !dead_instances.contains(&domain)
+  });
   info!("Sending activity {}", activity.id().to_string());
   let activity = WithContext::new(activity, CONTEXT.deref().clone());
 
diff --git a/crates/db_schema/src/impls/instance.rs b/crates/db_schema/src/impls/instance.rs
index 068e317f..d6a23a71 100644
--- a/crates/db_schema/src/impls/instance.rs
+++ b/crates/db_schema/src/impls/instance.rs
@@ -1,10 +1,17 @@
 use crate::{
+  diesel::dsl::IntervalDsl,
   newtypes::InstanceId,
   schema::{federation_allowlist, federation_blocklist, instance},
   source::instance::{Instance, InstanceForm},
   utils::{get_conn, naive_now, DbPool},
 };
-use diesel::{dsl::insert_into, result::Error, ExpressionMethods, QueryDsl};
+use diesel::{
+  dsl::{insert_into, now},
+  result::Error,
+  sql_types::{Nullable, Timestamp},
+  ExpressionMethods,
+  QueryDsl,
+};
 use diesel_async::RunQueryDsl;
 
 impl Instance {
@@ -46,6 +53,24 @@ impl Instance {
       .execute(conn)
       .await
   }
+
+  pub async fn read_all(pool: &mut DbPool<'_>) -> Result<Vec<Instance>, Error> {
+    let conn = &mut get_conn(pool).await?;
+    instance::table
+      .select(instance::all_columns)
+      .get_results(conn)
+      .await
+  }
+
+  pub async fn dead_instances(pool: &mut DbPool<'_>) -> Result<Vec<String>, Error> {
+    let conn = &mut get_conn(pool).await?;
+    instance::table
+      .select(instance::domain)
+      .filter(coalesce(instance::updated, instance::published).lt(now - 3.days()))
+      .get_results(conn)
+      .await
+  }
+
   #[cfg(test)]
   pub async fn delete_all(pool: &mut DbPool<'_>) -> Result<usize, Error> {
     let conn = &mut get_conn(pool).await?;
@@ -79,3 +104,5 @@ impl Instance {
       .await
   }
 }
+
+sql_function! { fn coalesce(x: Nullable<Timestamp>, y: Timestamp) -> Timestamp; }
diff --git a/crates/db_schema/src/impls/site.rs b/crates/db_schema/src/impls/site.rs
index 806def96..2820e9cd 100644
--- a/crates/db_schema/src/impls/site.rs
+++ b/crates/db_schema/src/impls/site.rs
@@ -81,7 +81,6 @@ impl Site {
     )
   }
 
-  // TODO this needs fixed
   pub async fn read_remote_sites(pool: &mut DbPool<'_>) -> Result<Vec<Self>, Error> {
     let conn = &mut get_conn(pool).await?;
     site.order_by(id).offset(1).get_results::<Self>(conn).await
diff --git a/crates/db_schema/src/source/site.rs b/crates/db_schema/src/source/site.rs
index f6a19b21..12b30c58 100644
--- a/crates/db_schema/src/source/site.rs
+++ b/crates/db_schema/src/source/site.rs
@@ -1,6 +1,7 @@
 use crate::newtypes::{DbUrl, InstanceId, SiteId};
 #[cfg(feature = "full")]
 use crate::schema::site;
+use chrono::NaiveDateTime;
 use serde::{Deserialize, Serialize};
 use serde_with::skip_serializing_none;
 #[cfg(feature = "full")]
@@ -18,8 +19,8 @@ pub struct Site {
   pub name: String,
   /// A sidebar for the site in markdown.
   pub sidebar: Option<String>,
-  pub published: chrono::NaiveDateTime,
-  pub updated: Option<chrono::NaiveDateTime>,
+  pub published: NaiveDateTime,
+  pub updated: Option<NaiveDateTime>,
   /// An icon URL.
   pub icon: Option<DbUrl>,
   /// A banner url.
@@ -29,7 +30,7 @@ pub struct Site {
   /// The federated actor_id.
   pub actor_id: DbUrl,
   /// The time the site was last refreshed.
-  pub last_refreshed_at: chrono::NaiveDateTime,
+  pub last_refreshed_at: NaiveDateTime,
   /// The site inbox
   pub inbox_url: DbUrl,
   pub private_key: Option<String>,
@@ -45,12 +46,12 @@ pub struct SiteInsertForm {
   #[builder(!default)]
   pub name: String,
   pub sidebar: Option<String>,
-  pub updated: Option<chrono::NaiveDateTime>,
+  pub updated: Option<NaiveDateTime>,
   pub icon: Option<DbUrl>,
   pub banner: Option<DbUrl>,
   pub description: Option<String>,
   pub actor_id: Option<DbUrl>,
-  pub last_refreshed_at: Option<chrono::NaiveDateTime>,
+  pub last_refreshed_at: Option<NaiveDateTime>,
   pub inbox_url: Option<DbUrl>,
   pub private_key: Option<String>,
   pub public_key: Option<String>,
@@ -65,13 +66,13 @@ pub struct SiteInsertForm {
 pub struct SiteUpdateForm {
   pub name: Option<String>,
   pub sidebar: Option<Option<String>>,
-  pub updated: Option<Option<chrono::NaiveDateTime>>,
+  pub updated: Option<Option<NaiveDateTime>>,
   // when you want to null out a column, you have to send Some(None)), since sending None means you just don't want to update that column.
   pub icon: Option<Option<DbUrl>>,
   pub banner: Option<Option<DbUrl>>,
   pub description: Option<Option<String>>,
   pub actor_id: Option<DbUrl>,
-  pub last_refreshed_at: Option<chrono::NaiveDateTime>,
+  pub last_refreshed_at: Option<NaiveDateTime>,
   pub inbox_url: Option<DbUrl>,
   pub private_key: Option<Option<String>>,
   pub public_key: Option<String>,
diff --git a/src/scheduled_tasks.rs b/src/scheduled_tasks.rs
index a052585e..f20e61e1 100644
--- a/src/scheduled_tasks.rs
+++ b/src/scheduled_tasks.rs
@@ -18,10 +18,13 @@ use lemmy_db_schema::{
   utils::{naive_now, DELETED_REPLACEMENT_TEXT},
 };
 use lemmy_routes::nodeinfo::NodeInfo;
-use lemmy_utils::{error::LemmyError, REQWEST_TIMEOUT};
+use lemmy_utils::{
+  error::{LemmyError, LemmyResult},
+  REQWEST_TIMEOUT,
+};
 use reqwest::blocking::Client;
 use std::{thread, time::Duration};
-use tracing::{error, info};
+use tracing::{error, info, warn};
 
 /// Schedules various cleanup tasks for lemmy in a background thread
 pub fn setup(
@@ -79,7 +82,9 @@ pub fn setup(
   // Update the Instance Software
   scheduler.every(CTimeUnits::days(1)).run(move || {
     let mut conn = PgConnection::establish(&db_url).expect("could not establish connection");
-    update_instance_software(&mut conn, &user_agent);
+    update_instance_software(&mut conn, &user_agent)
+      .map_err(|e| warn!("Failed to update instance software: {e}"))
+      .ok();
   });
 
   // Manually run the scheduler in an event loop
@@ -323,62 +328,65 @@ fn update_banned_when_expired(conn: &mut PgConnection) {
 }
 
 /// Updates the instance software and version
-fn update_instance_software(conn: &mut PgConnection, user_agent: &str) {
+///
+/// TODO: this should be async
+/// TODO: if instance has been dead for a long time, it should be checked less frequently
+fn update_instance_software(conn: &mut PgConnection, user_agent: &str) -> LemmyResult<()> {
   info!("Updating instances software and versions...");
 
-  let client = match Client::builder()
+  let client = Client::builder()
     .user_agent(user_agent)
     .timeout(REQWEST_TIMEOUT)
-    .build()
-  {
-    Ok(client) => client,
-    Err(e) => {
-      error!("Failed to build reqwest client: {}", e);
-      return;
-    }
-  };
+    .build()?;
 
-  let instances = match instance::table.get_results::<Instance>(conn) {
-    Ok(instances) => instances,
-    Err(e) => {
-      error!("Failed to get instances: {}", e);
-      return;
-    }
-  };
+  let instances = instance::table.get_results::<Instance>(conn)?;
 
   for instance in instances {
     let node_info_url = format!("https://{}/nodeinfo/2.0.json", instance.domain);
 
-    // Skip it if it can't connect
-    let res = client
-      .get(&node_info_url)
-      .send()
-      .ok()
-      .and_then(|t| t.json::<NodeInfo>().ok());
-
-    if let Some(node_info) = res {
-      let software = node_info.software.as_ref();
-      let form = InstanceForm::builder()
-        .domain(instance.domain)
-        .software(software.and_then(|s| s.name.clone()))
-        .version(software.and_then(|s| s.version.clone()))
-        .updated(Some(naive_now()))
-        .build();
-
-      match diesel::update(instance::table.find(instance.id))
-        .set(form)
-        .execute(conn)
-      {
-        Ok(_) => {
-          info!("Done.");
+    // The `updated` column is used to check if instances are alive. If it is more than three days
+    // in the past, no outgoing activities will be sent to that instance. However not every
+    // Fediverse instance has a valid Nodeinfo endpoint (its not required for Activitypub). That's
+    // why we always need to mark instances as updated if they are alive.
+    let default_form = InstanceForm::builder()
+      .domain(instance.domain.clone())
+      .updated(Some(naive_now()))
+      .build();
+    let form = match client.get(&node_info_url).send() {
+      Ok(res) if res.status().is_client_error() => {
+        // Instance doesnt have nodeinfo but sent a response, consider it alive
+        Some(default_form)
+      }
+      Ok(res) => match res.json::<NodeInfo>() {
+        Ok(node_info) => {
+          // Instance sent valid nodeinfo, write it to db
+          Some(
+            InstanceForm::builder()
+              .domain(instance.domain)
+              .updated(Some(naive_now()))
+              .software(node_info.software.and_then(|s| s.name))
+              .version(node_info.version.clone())
+              .build(),
+          )
         }
-        Err(e) => {
-          error!("Failed to update site instance software: {}", e);
-          return;
+        Err(_) => {
+          // No valid nodeinfo but valid HTTP response, consider instance alive
+          Some(default_form)
         }
+      },
+      Err(_) => {
+        // dead instance, do nothing
+        None
       }
+    };
+    if let Some(form) = form {
+      diesel::update(instance::table.find(instance.id))
+        .set(form)
+        .execute(conn)?;
     }
   }
+  info!("Finished updating instances software and versions...");
+  Ok(())
 }
 
 #[cfg(test)]
-- 
2.44.1