]> Untitled Git - lemmy.git/blob - crates/apub/src/fetcher/community.rs
Merge pull request #1660 from LemmyNet/merge-apub-crates
[lemmy.git] / crates / apub / src / fetcher / community.rs
1 use crate::{
2   activities::community::announce::AnnounceActivity,
3   fetcher::{
4     fetch::fetch_remote_object,
5     is_deleted,
6     person::get_or_fetch_and_upsert_person,
7     should_refetch_actor,
8   },
9   objects::FromApub,
10   GroupExt,
11 };
12 use activitystreams::{
13   actor::ApActorExt,
14   collection::{CollectionExt, OrderedCollection},
15 };
16 use anyhow::Context;
17 use diesel::result::Error::NotFound;
18 use lemmy_api_common::blocking;
19 use lemmy_apub_lib::ActivityHandler;
20 use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable};
21 use lemmy_db_schema::source::community::{Community, CommunityModerator, CommunityModeratorForm};
22 use lemmy_db_views_actor::community_moderator_view::CommunityModeratorView;
23 use lemmy_utils::{location_info, LemmyError};
24 use lemmy_websocket::LemmyContext;
25 use log::debug;
26 use url::Url;
27
28 /// Get a community from its apub ID.
29 ///
30 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
31 /// Otherwise it is fetched from the remote instance, stored and returned.
32 pub async fn get_or_fetch_and_upsert_community(
33   apub_id: &Url,
34   context: &LemmyContext,
35   recursion_counter: &mut i32,
36 ) -> Result<Community, LemmyError> {
37   let apub_id_owned = apub_id.to_owned();
38   let community = blocking(context.pool(), move |conn| {
39     Community::read_from_apub_id(conn, &apub_id_owned.into())
40   })
41   .await?;
42
43   match community {
44     Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
45       debug!("Fetching and updating from remote community: {}", apub_id);
46       fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
47     }
48     Ok(c) => Ok(c),
49     Err(NotFound {}) => {
50       debug!("Fetching and creating remote community: {}", apub_id);
51       fetch_remote_community(apub_id, context, None, recursion_counter).await
52     }
53     Err(e) => Err(e.into()),
54   }
55 }
56
57 /// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
58 /// is set, this is an update for a community which is already known locally. If not, we don't know
59 /// the community yet and also pull the outbox, to get some initial posts.
60 async fn fetch_remote_community(
61   apub_id: &Url,
62   context: &LemmyContext,
63   old_community: Option<Community>,
64   request_counter: &mut i32,
65 ) -> Result<Community, LemmyError> {
66   let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, request_counter).await;
67
68   if let Some(c) = old_community.to_owned() {
69     if is_deleted(&group) {
70       blocking(context.pool(), move |conn| {
71         Community::update_deleted(conn, c.id, true)
72       })
73       .await??;
74     } else if group.is_err() {
75       // If fetching failed, return the existing data.
76       return Ok(c);
77     }
78   }
79
80   let group = group?;
81   let community =
82     Community::from_apub(&group, context, apub_id.to_owned(), request_counter, false).await?;
83
84   update_community_mods(&group, &community, context, request_counter).await?;
85
86   // only fetch outbox for new communities, otherwise this can create an infinite loop
87   if old_community.is_none() {
88     let outbox = group.inner.outbox()?.context(location_info!())?;
89     fetch_community_outbox(context, outbox, request_counter).await?
90   }
91
92   Ok(community)
93 }
94
95 async fn update_community_mods(
96   group: &GroupExt,
97   community: &Community,
98   context: &LemmyContext,
99   request_counter: &mut i32,
100 ) -> Result<(), LemmyError> {
101   let new_moderators = fetch_community_mods(context, group, request_counter).await?;
102   let community_id = community.id;
103   let current_moderators = blocking(context.pool(), move |conn| {
104     CommunityModeratorView::for_community(conn, community_id)
105   })
106   .await??;
107   // Remove old mods from database which arent in the moderators collection anymore
108   for mod_user in &current_moderators {
109     if !new_moderators.contains(&mod_user.moderator.actor_id.clone().into()) {
110       let community_moderator_form = CommunityModeratorForm {
111         community_id: mod_user.community.id,
112         person_id: mod_user.moderator.id,
113       };
114       blocking(context.pool(), move |conn| {
115         CommunityModerator::leave(conn, &community_moderator_form)
116       })
117       .await??;
118     }
119   }
120
121   // Add new mods to database which have been added to moderators collection
122   for mod_uri in new_moderators {
123     let mod_user = get_or_fetch_and_upsert_person(&mod_uri, context, request_counter).await?;
124
125     if !current_moderators
126       .clone()
127       .iter()
128       .map(|c| c.moderator.actor_id.clone())
129       .any(|x| x == mod_user.actor_id)
130     {
131       let community_moderator_form = CommunityModeratorForm {
132         community_id: community.id,
133         person_id: mod_user.id,
134       };
135       blocking(context.pool(), move |conn| {
136         CommunityModerator::join(conn, &community_moderator_form)
137       })
138       .await??;
139     }
140   }
141
142   Ok(())
143 }
144
145 async fn fetch_community_outbox(
146   context: &LemmyContext,
147   outbox: &Url,
148   recursion_counter: &mut i32,
149 ) -> Result<(), LemmyError> {
150   let outbox =
151     fetch_remote_object::<OrderedCollection>(context.client(), outbox, recursion_counter).await?;
152   let outbox_activities = outbox.items().context(location_info!())?.clone();
153   let mut outbox_activities = outbox_activities.many().context(location_info!())?;
154   if outbox_activities.len() > 20 {
155     outbox_activities = outbox_activities[0..20].to_vec();
156   }
157
158   for announce in outbox_activities {
159     // TODO: instead of converting like this, we should create a struct CommunityOutbox with
160     //       AnnounceActivity as inner type, but that gives me stackoverflow
161     let ser = serde_json::to_string(&announce)?;
162     let announce: AnnounceActivity = serde_json::from_str(&ser)?;
163     announce.receive(context, recursion_counter).await?;
164   }
165
166   Ok(())
167 }
168
169 pub(crate) async fn fetch_community_mods(
170   context: &LemmyContext,
171   group: &GroupExt,
172   recursion_counter: &mut i32,
173 ) -> Result<Vec<Url>, LemmyError> {
174   if let Some(mods_url) = &group.ext_one.moderators {
175     let mods =
176       fetch_remote_object::<OrderedCollection>(context.client(), mods_url, recursion_counter)
177         .await?;
178     let mods = mods
179       .items()
180       .map(|i| i.as_many())
181       .flatten()
182       .context(location_info!())?
183       .iter()
184       .filter_map(|i| i.as_xsd_any_uri())
185       .map(|u| u.to_owned())
186       .collect();
187     Ok(mods)
188   } else {
189     Ok(vec![])
190   }
191 }