]> Untitled Git - lemmy.git/blob - crates/apub/src/fetcher/community.rs
6187ac10cf3fe58a33b9c74de7b2f34c680d003f
[lemmy.git] / crates / apub / src / fetcher / community.rs
1 use crate::{
2   fetcher::{
3     fetch::fetch_remote_object,
4     is_deleted,
5     person::get_or_fetch_and_upsert_person,
6     should_refetch_actor,
7   },
8   objects::FromApub,
9   GroupExt,
10 };
11 use activitystreams::{
12   actor::ApActorExt,
13   collection::{CollectionExt, OrderedCollection},
14 };
15 use anyhow::Context;
16 use diesel::result::Error::NotFound;
17 use lemmy_api_common::blocking;
18 use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable};
19 use lemmy_db_schema::{
20   source::community::{Community, CommunityModerator, CommunityModeratorForm},
21   DbUrl,
22 };
23 use lemmy_db_views_actor::community_moderator_view::CommunityModeratorView;
24 use lemmy_utils::{location_info, LemmyError};
25 use lemmy_websocket::LemmyContext;
26 use log::debug;
27 use url::Url;
28
29 /// Get a community from its apub ID.
30 ///
31 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
32 /// Otherwise it is fetched from the remote instance, stored and returned.
33 pub async fn get_or_fetch_and_upsert_community(
34   apub_id: &Url,
35   context: &LemmyContext,
36   recursion_counter: &mut i32,
37 ) -> Result<Community, LemmyError> {
38   let apub_id_owned = apub_id.to_owned();
39   let community = blocking(context.pool(), move |conn| {
40     Community::read_from_apub_id(conn, &apub_id_owned.into())
41   })
42   .await?;
43
44   match community {
45     Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
46       debug!("Fetching and updating from remote community: {}", apub_id);
47       fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
48     }
49     Ok(c) => Ok(c),
50     Err(NotFound {}) => {
51       debug!("Fetching and creating remote community: {}", apub_id);
52       fetch_remote_community(apub_id, context, None, recursion_counter).await
53     }
54     Err(e) => Err(e.into()),
55   }
56 }
57
58 /// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
59 /// is set, this is an update for a community which is already known locally. If not, we don't know
60 /// the community yet and also pull the outbox, to get some initial posts.
61 async fn fetch_remote_community(
62   apub_id: &Url,
63   context: &LemmyContext,
64   old_community: Option<Community>,
65   request_counter: &mut i32,
66 ) -> Result<Community, LemmyError> {
67   let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, request_counter).await;
68
69   if let Some(c) = old_community.to_owned() {
70     if is_deleted(&group) {
71       blocking(context.pool(), move |conn| {
72         Community::update_deleted(conn, c.id, true)
73       })
74       .await??;
75     } else if group.is_err() {
76       // If fetching failed, return the existing data.
77       return Ok(c);
78     }
79   }
80
81   let group = group?;
82   let community =
83     Community::from_apub(&group, context, apub_id.to_owned(), request_counter, false).await?;
84
85   update_community_mods(&group, &community, context, request_counter).await?;
86
87   // only fetch outbox for new communities, otherwise this can create an infinite loop
88   if old_community.is_none() {
89     let outbox = group.inner.outbox()?.context(location_info!())?;
90     fetch_community_outbox(context, outbox, &community, request_counter).await?
91   }
92
93   Ok(community)
94 }
95
96 async fn update_community_mods(
97   group: &GroupExt,
98   community: &Community,
99   context: &LemmyContext,
100   request_counter: &mut i32,
101 ) -> Result<(), LemmyError> {
102   let new_moderators = fetch_community_mods(context, group, request_counter).await?;
103   let community_id = community.id;
104   let current_moderators = blocking(context.pool(), move |conn| {
105     CommunityModeratorView::for_community(&conn, community_id)
106   })
107   .await??;
108   // Remove old mods from database which arent in the moderators collection anymore
109   for mod_user in &current_moderators {
110     if !new_moderators.contains(&&mod_user.moderator.actor_id.clone().into()) {
111       let community_moderator_form = CommunityModeratorForm {
112         community_id: mod_user.community.id,
113         person_id: mod_user.moderator.id,
114       };
115       blocking(context.pool(), move |conn| {
116         CommunityModerator::leave(conn, &community_moderator_form)
117       })
118       .await??;
119     }
120   }
121
122   // Add new mods to database which have been added to moderators collection
123   for mod_uri in new_moderators {
124     let mod_user = get_or_fetch_and_upsert_person(&mod_uri, context, request_counter).await?;
125     let current_mod_uris: Vec<DbUrl> = current_moderators
126       .clone()
127       .iter()
128       .map(|c| c.moderator.actor_id.clone())
129       .collect();
130     if !current_mod_uris.contains(&mod_user.actor_id) {
131       let community_moderator_form = CommunityModeratorForm {
132         community_id: community.id,
133         person_id: mod_user.id,
134       };
135       blocking(context.pool(), move |conn| {
136         CommunityModerator::join(conn, &community_moderator_form)
137       })
138       .await??;
139     }
140   }
141
142   Ok(())
143 }
144
145 async fn fetch_community_outbox(
146   context: &LemmyContext,
147   outbox: &Url,
148   community: &Community,
149   recursion_counter: &mut i32,
150 ) -> Result<(), LemmyError> {
151   let outbox =
152     fetch_remote_object::<OrderedCollection>(context.client(), outbox, recursion_counter).await?;
153   let outbox_activities = outbox.items().context(location_info!())?.clone();
154   let mut outbox_activities = outbox_activities.many().context(location_info!())?;
155   if outbox_activities.len() > 20 {
156     outbox_activities = outbox_activities[0..20].to_vec();
157   }
158
159   for activity in outbox_activities {
160     todo!("{:?} {:?} {:?}", activity, community, recursion_counter);
161     //receive_announce(context, activity, community, recursion_counter).await?;
162   }
163
164   Ok(())
165 }
166
167 pub(crate) async fn fetch_community_mods(
168   context: &LemmyContext,
169   group: &GroupExt,
170   recursion_counter: &mut i32,
171 ) -> Result<Vec<Url>, LemmyError> {
172   if let Some(mods_url) = &group.ext_one.moderators {
173     let mods =
174       fetch_remote_object::<OrderedCollection>(context.client(), mods_url, recursion_counter)
175         .await?;
176     let mods = mods
177       .items()
178       .map(|i| i.as_many())
179       .flatten()
180       .context(location_info!())?
181       .iter()
182       .filter_map(|i| i.as_xsd_any_uri())
183       .map(|u| u.to_owned())
184       .collect();
185     Ok(mods)
186   } else {
187     Ok(vec![])
188   }
189 }