]> Untitled Git - lemmy.git/blob - crates/apub/src/fetcher/community.rs
Rewrite delete activities (#1699)
[lemmy.git] / crates / apub / src / fetcher / community.rs
1 use crate::{
2   activities::community::announce::AnnounceActivity,
3   fetcher::{
4     fetch::fetch_remote_object,
5     is_deleted,
6     person::get_or_fetch_and_upsert_person,
7     should_refetch_actor,
8   },
9   objects::{community::Group, FromApub},
10 };
11 use activitystreams::collection::{CollectionExt, OrderedCollection};
12 use anyhow::Context;
13 use diesel::result::Error::NotFound;
14 use lemmy_api_common::blocking;
15 use lemmy_apub_lib::ActivityHandler;
16 use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable};
17 use lemmy_db_schema::source::community::{Community, CommunityModerator, CommunityModeratorForm};
18 use lemmy_db_views_actor::community_moderator_view::CommunityModeratorView;
19 use lemmy_utils::{location_info, LemmyError};
20 use lemmy_websocket::LemmyContext;
21 use log::debug;
22 use url::Url;
23
24 /// Get a community from its apub ID.
25 ///
26 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
27 /// Otherwise it is fetched from the remote instance, stored and returned.
28 pub(crate) async fn get_or_fetch_and_upsert_community(
29   apub_id: &Url,
30   context: &LemmyContext,
31   recursion_counter: &mut i32,
32 ) -> Result<Community, LemmyError> {
33   let apub_id_owned = apub_id.to_owned();
34   let community = blocking(context.pool(), move |conn| {
35     Community::read_from_apub_id(conn, &apub_id_owned.into())
36   })
37   .await?;
38
39   match community {
40     Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
41       debug!("Fetching and updating from remote community: {}", apub_id);
42       fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
43     }
44     Ok(c) => Ok(c),
45     Err(NotFound {}) => {
46       debug!("Fetching and creating remote community: {}", apub_id);
47       fetch_remote_community(apub_id, context, None, recursion_counter).await
48     }
49     Err(e) => Err(e.into()),
50   }
51 }
52
53 /// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
54 /// is set, this is an update for a community which is already known locally. If not, we don't know
55 /// the community yet and also pull the outbox, to get some initial posts.
56 async fn fetch_remote_community(
57   apub_id: &Url,
58   context: &LemmyContext,
59   old_community: Option<Community>,
60   request_counter: &mut i32,
61 ) -> Result<Community, LemmyError> {
62   let group = fetch_remote_object::<Group>(context.client(), apub_id, request_counter).await;
63
64   if let Some(c) = old_community.to_owned() {
65     if is_deleted(&group) {
66       blocking(context.pool(), move |conn| {
67         Community::update_deleted(conn, c.id, true)
68       })
69       .await??;
70     } else if group.is_err() {
71       // If fetching failed, return the existing data.
72       return Ok(c);
73     }
74   }
75
76   let group = group?;
77   let community = Community::from_apub(&group, context, apub_id, request_counter).await?;
78
79   update_community_mods(&group, &community, context, request_counter).await?;
80
81   // only fetch outbox for new communities, otherwise this can create an infinite loop
82   if old_community.is_none() {
83     fetch_community_outbox(context, &group.outbox, request_counter).await?
84   }
85
86   Ok(community)
87 }
88
89 async fn update_community_mods(
90   group: &Group,
91   community: &Community,
92   context: &LemmyContext,
93   request_counter: &mut i32,
94 ) -> Result<(), LemmyError> {
95   let new_moderators = fetch_community_mods(context, group, request_counter).await?;
96   let community_id = community.id;
97   let current_moderators = blocking(context.pool(), move |conn| {
98     CommunityModeratorView::for_community(conn, community_id)
99   })
100   .await??;
101   // Remove old mods from database which arent in the moderators collection anymore
102   for mod_user in &current_moderators {
103     if !new_moderators.contains(&mod_user.moderator.actor_id.clone().into()) {
104       let community_moderator_form = CommunityModeratorForm {
105         community_id: mod_user.community.id,
106         person_id: mod_user.moderator.id,
107       };
108       blocking(context.pool(), move |conn| {
109         CommunityModerator::leave(conn, &community_moderator_form)
110       })
111       .await??;
112     }
113   }
114
115   // Add new mods to database which have been added to moderators collection
116   for mod_uri in new_moderators {
117     let mod_user = get_or_fetch_and_upsert_person(&mod_uri, context, request_counter).await?;
118
119     if !current_moderators
120       .clone()
121       .iter()
122       .map(|c| c.moderator.actor_id.clone())
123       .any(|x| x == mod_user.actor_id)
124     {
125       let community_moderator_form = CommunityModeratorForm {
126         community_id: community.id,
127         person_id: mod_user.id,
128       };
129       blocking(context.pool(), move |conn| {
130         CommunityModerator::join(conn, &community_moderator_form)
131       })
132       .await??;
133     }
134   }
135
136   Ok(())
137 }
138
139 async fn fetch_community_outbox(
140   context: &LemmyContext,
141   outbox: &Url,
142   recursion_counter: &mut i32,
143 ) -> Result<(), LemmyError> {
144   let outbox =
145     fetch_remote_object::<OrderedCollection>(context.client(), outbox, recursion_counter).await?;
146   let outbox_activities = outbox.items().context(location_info!())?.clone();
147   let mut outbox_activities = outbox_activities.many().context(location_info!())?;
148   if outbox_activities.len() > 20 {
149     outbox_activities = outbox_activities[0..20].to_vec();
150   }
151
152   for announce in outbox_activities {
153     // TODO: instead of converting like this, we should create a struct CommunityOutbox with
154     //       AnnounceActivity as inner type, but that gives me stackoverflow
155     let ser = serde_json::to_string(&announce)?;
156     let announce: AnnounceActivity = serde_json::from_str(&ser)?;
157     announce.receive(context, recursion_counter).await?;
158   }
159
160   Ok(())
161 }
162
163 pub(crate) async fn fetch_community_mods(
164   context: &LemmyContext,
165   group: &Group,
166   recursion_counter: &mut i32,
167 ) -> Result<Vec<Url>, LemmyError> {
168   if let Some(mods_url) = &group.moderators {
169     let mods =
170       fetch_remote_object::<OrderedCollection>(context.client(), mods_url, recursion_counter)
171         .await?;
172     let mods = mods
173       .items()
174       .map(|i| i.as_many())
175       .flatten()
176       .context(location_info!())?
177       .iter()
178       .filter_map(|i| i.as_xsd_any_uri())
179       .map(|u| u.to_owned())
180       .collect();
181     Ok(mods)
182   } else {
183     Ok(vec![])
184   }
185 }