]> Untitled Git - lemmy.git/blobdiff - crates/apub/src/fetcher/community.rs
Rewrite fetcher (#1792)
[lemmy.git] / crates / apub / src / fetcher / community.rs
index e1211f33b23103b0a24ee00ab85f6aae7fef4969..99dd3f85e87074cead5988d6231ff1907248ab11 100644 (file)
 use crate::{
-  fetcher::{
-    fetch::fetch_remote_object,
-    get_or_fetch_and_upsert_user,
-    is_deleted,
-    should_refetch_actor,
-  },
-  inbox::user_inbox::receive_announce,
-  objects::FromApub,
-  GroupExt,
-};
-use activitystreams::{
-  actor::ApActorExt,
-  collection::{CollectionExt, OrderedCollection},
-  object::ObjectExt,
+  activities::community::announce::AnnounceActivity,
+  fetcher::{fetch::fetch_remote_object, object_id::ObjectId},
+  objects::community::Group,
 };
+use activitystreams::collection::{CollectionExt, OrderedCollection};
 use anyhow::Context;
-use diesel::result::Error::NotFound;
-use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable};
-use lemmy_db_schema::source::community::{Community, CommunityModerator, CommunityModeratorForm};
-use lemmy_structs::blocking;
+use lemmy_api_common::blocking;
+use lemmy_apub_lib::ActivityHandler;
+use lemmy_db_queries::Joinable;
+use lemmy_db_schema::source::{
+  community::{Community, CommunityModerator, CommunityModeratorForm},
+  person::Person,
+};
+use lemmy_db_views_actor::community_moderator_view::CommunityModeratorView;
 use lemmy_utils::{location_info, LemmyError};
 use lemmy_websocket::LemmyContext;
-use log::debug;
 use url::Url;
 
-/// Get a community from its apub ID.
-///
-/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
-/// Otherwise it is fetched from the remote instance, stored and returned.
-pub(crate) async fn get_or_fetch_and_upsert_community(
-  apub_id: &Url,
+pub(crate) async fn update_community_mods(
+  group: &Group,
+  community: &Community,
   context: &LemmyContext,
-  recursion_counter: &mut i32,
-) -> Result<Community, LemmyError> {
-  let apub_id_owned = apub_id.to_owned();
-  let community = blocking(context.pool(), move |conn| {
-    Community::read_from_apub_id(conn, &apub_id_owned.into())
+  request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+  let new_moderators = fetch_community_mods(context, group, request_counter).await?;
+  let community_id = community.id;
+  let current_moderators = blocking(context.pool(), move |conn| {
+    CommunityModeratorView::for_community(conn, community_id)
   })
-  .await?;
-
-  match community {
-    Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
-      debug!("Fetching and updating from remote community: {}", apub_id);
-      fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
-    }
-    Ok(c) => Ok(c),
-    Err(NotFound {}) => {
-      debug!("Fetching and creating remote community: {}", apub_id);
-      fetch_remote_community(apub_id, context, None, recursion_counter).await
+  .await??;
+  // Remove old mods from database which arent in the moderators collection anymore
+  for mod_user in &current_moderators {
+    if !new_moderators.contains(&mod_user.moderator.actor_id.clone().into()) {
+      let community_moderator_form = CommunityModeratorForm {
+        community_id: mod_user.community.id,
+        person_id: mod_user.moderator.id,
+      };
+      blocking(context.pool(), move |conn| {
+        CommunityModerator::leave(conn, &community_moderator_form)
+      })
+      .await??;
     }
-    Err(e) => Err(e.into()),
   }
-}
-
-/// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
-/// is set, this is an update for a community which is already known locally. If not, we don't know
-/// the community yet and also pull the outbox, to get some initial posts.
-async fn fetch_remote_community(
-  apub_id: &Url,
-  context: &LemmyContext,
-  old_community: Option<Community>,
-  recursion_counter: &mut i32,
-) -> Result<Community, LemmyError> {
-  let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await;
 
-  if let Some(c) = old_community.to_owned() {
-    if is_deleted(&group) {
+  // Add new mods to database which have been added to moderators collection
+  for mod_id in new_moderators {
+    let mod_id = ObjectId::new(mod_id);
+    let mod_user: Person = mod_id.dereference(context, request_counter).await?;
+
+    if !current_moderators
+      .clone()
+      .iter()
+      .map(|c| c.moderator.actor_id.clone())
+      .any(|x| x == mod_user.actor_id)
+    {
+      let community_moderator_form = CommunityModeratorForm {
+        community_id: community.id,
+        person_id: mod_user.id,
+      };
       blocking(context.pool(), move |conn| {
-        Community::update_deleted(conn, c.id, true)
+        CommunityModerator::join(conn, &community_moderator_form)
       })
       .await??;
-    } else if group.is_err() {
-      // If fetching failed, return the existing data.
-      return Ok(c);
     }
   }
 
-  let group = group?;
-  let community =
-    Community::from_apub(&group, context, apub_id.to_owned(), recursion_counter).await?;
-
-  // Also add the community moderators too
-  let attributed_to = group.inner.attributed_to().context(location_info!())?;
-  let creator_and_moderator_uris: Vec<&Url> = attributed_to
-    .as_many()
-    .context(location_info!())?
-    .iter()
-    .map(|a| a.as_xsd_any_uri().context(""))
-    .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
-
-  let mut creator_and_moderators = Vec::new();
-
-  for uri in creator_and_moderator_uris {
-    let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
-
-    creator_and_moderators.push(c_or_m);
-  }
-
-  // TODO: need to make this work to update mods of existing communities
-  if old_community.is_none() {
-    let community_id = community.id;
-    blocking(context.pool(), move |conn| {
-      for mod_ in creator_and_moderators {
-        let community_moderator_form = CommunityModeratorForm {
-          community_id,
-          person_id: mod_.id,
-        };
-
-        CommunityModerator::join(conn, &community_moderator_form)?;
-      }
-      Ok(()) as Result<(), LemmyError>
-    })
-    .await??;
-  }
-
-  // only fetch outbox for new communities, otherwise this can create an infinite loop
-  if old_community.is_none() {
-    let outbox = group.inner.outbox()?.context(location_info!())?;
-    fetch_community_outbox(context, outbox, &community, recursion_counter).await?
-  }
-
-  Ok(community)
+  Ok(())
 }
 
-async fn fetch_community_outbox(
+pub(crate) async fn fetch_community_outbox(
   context: &LemmyContext,
   outbox: &Url,
-  community: &Community,
   recursion_counter: &mut i32,
 ) -> Result<(), LemmyError> {
   let outbox =
@@ -137,9 +81,37 @@ async fn fetch_community_outbox(
     outbox_activities = outbox_activities[0..20].to_vec();
   }
 
-  for activity in outbox_activities {
-    receive_announce(context, activity, community, recursion_counter).await?;
+  for announce in outbox_activities {
+    // TODO: instead of converting like this, we should create a struct CommunityOutbox with
+    //       AnnounceActivity as inner type, but that gives me stackoverflow
+    let ser = serde_json::to_string(&announce)?;
+    let announce: AnnounceActivity = serde_json::from_str(&ser)?;
+    announce.receive(context, recursion_counter).await?;
   }
 
   Ok(())
 }
+
+async fn fetch_community_mods(
+  context: &LemmyContext,
+  group: &Group,
+  recursion_counter: &mut i32,
+) -> Result<Vec<Url>, LemmyError> {
+  if let Some(mods_url) = &group.moderators {
+    let mods =
+      fetch_remote_object::<OrderedCollection>(context.client(), mods_url, recursion_counter)
+        .await?;
+    let mods = mods
+      .items()
+      .map(|i| i.as_many())
+      .flatten()
+      .context(location_info!())?
+      .iter()
+      .filter_map(|i| i.as_xsd_any_uri())
+      .map(|u| u.to_owned())
+      .collect();
+    Ok(mods)
+  } else {
+    Ok(vec![])
+  }
+}